- Moves logic of the Subscribers task to the Sending Queue model

This commit is contained in:
Vlad
2016-09-19 21:53:13 -04:00
parent d5107be65e
commit 2fa5e5ead2
4 changed files with 69 additions and 191 deletions

View File

@ -4,7 +4,6 @@ namespace MailPoet\Cron\Workers\SendingQueue;
use MailPoet\Cron\CronHelper; use MailPoet\Cron\CronHelper;
use MailPoet\Cron\Workers\SendingQueue\Tasks\Mailer as MailerTask; use MailPoet\Cron\Workers\SendingQueue\Tasks\Mailer as MailerTask;
use MailPoet\Cron\Workers\SendingQueue\Tasks\Newsletter as NewsletterTask; use MailPoet\Cron\Workers\SendingQueue\Tasks\Newsletter as NewsletterTask;
use MailPoet\Cron\Workers\SendingQueue\Tasks\Subscribers as SubscribersTask;
use MailPoet\Mailer\MailerLog; use MailPoet\Mailer\MailerLog;
use MailPoet\Models\SendingQueue as SendingQueueModel; use MailPoet\Models\SendingQueue as SendingQueueModel;
use MailPoet\Models\StatisticsNewsletters as StatisticsNewslettersModel; use MailPoet\Models\StatisticsNewsletters as StatisticsNewslettersModel;
@ -16,6 +15,7 @@ class SendingQueue {
public $mailer_task; public $mailer_task;
public $newsletter_task; public $newsletter_task;
public $timer; public $timer;
const BATCH_SIZE = 50;
function __construct($timer = false, $mailer_task = false, $newsletter_task = false) { function __construct($timer = false, $mailer_task = false, $newsletter_task = false) {
$this->mailer_task = ($mailer_task) ? $mailer_task : new MailerTask(); $this->mailer_task = ($mailer_task) ? $mailer_task : new MailerTask();
@ -39,8 +39,10 @@ class SendingQueue {
$this->mailer_task->configureMailer($newsletter); $this->mailer_task->configureMailer($newsletter);
// get subscribers // get subscribers
$queue->subscribers = $queue->getSubscribers(); $queue->subscribers = $queue->getSubscribers();
$subscriber_batches = $subscriber_batches = array_chunk(
SubscribersTask::splitSubscribersIntoBatches($queue->subscribers['to_process']); $queue->subscribers['to_process'],
self::BATCH_SIZE
);
foreach($subscriber_batches as $subscribers_to_process_ids) { foreach($subscriber_batches as $subscribers_to_process_ids) {
// abort if execution limit is reached // abort if execution limit is reached
CronHelper::enforceExecutionLimit($this->timer); CronHelper::enforceExecutionLimit($this->timer);
@ -51,13 +53,12 @@ class SendingQueue {
}, $found_subscribers); }, $found_subscribers);
// if some subscribers weren't found, remove them from the processing list // if some subscribers weren't found, remove them from the processing list
if(count($found_subscribers_ids) !== count($subscribers_to_process_ids)) { if(count($found_subscribers_ids) !== count($subscribers_to_process_ids)) {
$queue->subscribers = SubscribersTask::updateToProcessList( $subscibers_to_remove = array_diff(
$found_subscribers_ids,
$subscribers_to_process_ids, $subscribers_to_process_ids,
$queue->subscribers $found_subscribers_ids
); );
$queue->removeNonexistentSubscribers($subscibers_to_remove);
if(!count($queue->subscribers['to_process'])) { if(!count($queue->subscribers['to_process'])) {
$this->updateQueue($queue);
$this->newsletter_task->markNewsletterAsSent($newsletter); $this->newsletter_task->markNewsletterAsSent($newsletter);
continue; continue;
} }
@ -140,22 +141,14 @@ class SendingQueue {
); );
if(!$send_result) { if(!$send_result) {
// update failed/to process list // update failed/to process list
$queue->subscribers = SubscribersTask::updateFailedList( $queue->updateFailedSubscribers($prepared_subscribers_ids);
$prepared_subscribers_ids,
$queue->subscribers
);
$queue = $this->updateQueue($queue);
} else { } else {
// update processed/to process list // update processed/to process list
$queue->subscribers = SubscribersTask::updateProcessedList( $queue->updateProcessedSubscribers($prepared_subscribers_ids);
$prepared_subscribers_ids,
$queue->subscribers
);
// log statistics // log statistics
StatisticsNewslettersModel::createMultiple($statistics); StatisticsNewslettersModel::createMultiple($statistics);
// update the sent count // update the sent count
$this->mailer_task->updateSentCount(); $this->mailer_task->updateSentCount();
$queue = $this->updateQueue($queue);
// enforce sending limit if there are still subscribers left to process // enforce sending limit if there are still subscribers left to process
if($queue->count_to_process) { if($queue->count_to_process) {
MailerLog::enforceSendingLimit(); MailerLog::enforceSendingLimit();
@ -170,18 +163,4 @@ class SendingQueue {
->whereNull('status') ->whereNull('status')
->findMany(); ->findMany();
} }
function updateQueue($queue) {
$queue->count_processed =
count($queue->subscribers['processed']) + count($queue->subscribers['failed']);
$queue->count_to_process = count($queue->subscribers['to_process']);
$queue->count_failed = count($queue->subscribers['failed']);
$queue->count_total =
$queue->count_processed + $queue->count_to_process;
if(!$queue->count_to_process) {
$queue->processed_at = current_time('mysql');
$queue->status = SendingQueueModel::STATUS_COMPLETED;
}
return $queue->save();
}
} }

View File

@ -1,63 +0,0 @@
<?php
namespace MailPoet\Cron\Workers\SendingQueue\Tasks;
if(!defined('ABSPATH')) exit;
class Subscribers {
const BATCH_SIZE = 50;
static function splitSubscribersIntoBatches(array $subscribers) {
return array_chunk(
$subscribers,
self::BATCH_SIZE
);
}
static function updateToProcessList(
$found_subscribers_ids,
$subscribers_to_process_ids,
$queue_subscribers
) {
// compare existing subscribers to the ones that are queued for processing
$subscibers_to_exclude = array_diff(
$subscribers_to_process_ids,
$found_subscribers_ids
);
// remove nonexistent subscribers from the processing list
$queue_subscribers['to_process'] = array_values(
array_diff(
$queue_subscribers['to_process'],
$subscibers_to_exclude
)
);
return $queue_subscribers;
}
static function updateFailedList($failed_subscribers, $queue_subscribers) {
$queue_subscribers['failed'] = array_merge(
$queue_subscribers['failed'],
$failed_subscribers
);
$queue_subscribers['to_process'] = array_values(
array_diff(
$queue_subscribers['to_process'],
$failed_subscribers
)
);
return $queue_subscribers;
}
static function updateProcessedList($processed_subscribers, $queue_subscribers) {
$queue_subscribers['processed'] = array_merge(
$queue_subscribers['processed'],
$processed_subscribers
);
$queue_subscribers['to_process'] = array_values(
array_diff(
$queue_subscribers['to_process'],
$processed_subscribers
)
);
return $queue_subscribers;
}
}

View File

@ -10,7 +10,7 @@ class SendingQueue extends Model {
const STATUS_PAUSED = 'paused'; const STATUS_PAUSED = 'paused';
function newsletter() { function newsletter() {
return $this->has_one(__NAMESPACE__.'\Newsletter', 'id', 'newsletter_id'); return $this->has_one(__NAMESPACE__ . '\Newsletter', 'id', 'newsletter_id');
} }
function pause() { function pause() {
@ -84,4 +84,62 @@ class SendingQueue extends Model {
: $this->subscribers; : $this->subscribers;
return $model; return $model;
} }
function removeNonexistentSubscribers($subscribers_to_remove) {
$subscribers = $this->getSubscribers();
$subscribers['to_process'] = array_values(
array_diff(
$subscribers['to_process'],
$subscribers_to_remove
)
);
$this->subscribers = $subscribers;
$this->updateCount();
}
function updateFailedSubscribers($failed_subscribers) {
$subscribers = $this->getSubscribers();
$subscribers['failed'] = array_merge(
$subscribers['failed'],
$failed_subscribers
);
$subscribers['to_process'] = array_values(
array_diff(
$subscribers['to_process'],
$failed_subscribers
)
);
$this->subscribers = $subscribers;
$this->updateCount();
}
function updateProcessedSubscribers($processed_subscribers) {
$subscribers = $this->getSubscribers();
$subscribers['processed'] = array_merge(
$subscribers['processed'],
$processed_subscribers
);
$subscribers['to_process'] = array_values(
array_diff(
$subscribers['to_process'],
$processed_subscribers
)
);
$this->subscribers = $subscribers;
$this->updateCount();
}
function updateCount() {
$this->subscribers = $this->getSubscribers();
$this->count_processed =
count($this->subscribers['processed']) + count($this->subscribers['failed']);
$this->count_to_process = count($this->subscribers['to_process']);
$this->count_failed = count($this->subscribers['failed']);
$this->count_total = $this->count_processed + $this->count_to_process;
if(!$this->count_to_process) {
$this->processed_at = current_time('mysql');
$this->status = self::STATUS_COMPLETED;
}
return $this->save();
}
} }

View File

@ -1,96 +0,0 @@
<?php
use MailPoet\Cron\Workers\SendingQueue\Tasks\Subscribers;
if(!defined('ABSPATH')) exit;
class SubscribersTaskTest extends MailPoetTest {
function testItCanSplitSubscribersInBatches() {
$subscribers = range(1, 200);
$split_subscribers = Subscribers::splitSubscribersIntoBatches($subscribers);
expect(count($split_subscribers))->equals(200 / Subscribers::BATCH_SIZE);
expect(count($split_subscribers[0]))->equals(Subscribers::BATCH_SIZE);
}
function testItCanRemoveNonexistentSubscribersFromListOfSubscribersToProcess() {
$queue_subscribers = array(
'to_process' => range(1, 5)
);
$subscribers_found = array(
1,
2,
5
);
$subscribers_to_process = array(
3,
4
);
$updated_queue_subscribers = Subscribers::updateToProcessList(
$subscribers_found,
$subscribers_to_process,
$queue_subscribers
);
expect($updated_queue_subscribers)->equals(
array(
'to_process' => array(
1,
2,
5
)
)
);
}
function testItCanUpdateListOfFailedSubscribers() {
$queue_subscribers = array(
'to_process' => range(1, 5),
'failed' => array()
);
$failed_subscribers = array(
1
);
$updated_queue_subscribers = Subscribers::updateFailedList(
$failed_subscribers,
$queue_subscribers
);
expect($updated_queue_subscribers)->equals(
array(
'to_process' => array(
2,
3,
4,
5
),
'failed' => array(
1
)
)
);
}
function testItCanUpdateListOfProcessedSubscribers() {
$queue_subscribers = array(
'to_process' => range(1, 5),
'processed' => array()
);
$processed_subscribers = array(
1
);
$updated_queue_subscribers = Subscribers::updateProcessedList(
$processed_subscribers,
$queue_subscribers
);
expect($updated_queue_subscribers)->equals(
array(
'to_process' => array(
2,
3,
4,
5
),
'processed' => array(
1
)
)
);
}
}