diff --git a/lib/Cron/Workers/SendingQueue/SendingQueue.php b/lib/Cron/Workers/SendingQueue/SendingQueue.php index ac1adf1595..9efa7caaeb 100644 --- a/lib/Cron/Workers/SendingQueue/SendingQueue.php +++ b/lib/Cron/Workers/SendingQueue/SendingQueue.php @@ -4,7 +4,6 @@ namespace MailPoet\Cron\Workers\SendingQueue; use MailPoet\Cron\CronHelper; use MailPoet\Cron\Workers\SendingQueue\Tasks\Mailer as MailerTask; use MailPoet\Cron\Workers\SendingQueue\Tasks\Newsletter as NewsletterTask; -use MailPoet\Cron\Workers\SendingQueue\Tasks\Subscribers as SubscribersTask; use MailPoet\Mailer\MailerLog; use MailPoet\Models\SendingQueue as SendingQueueModel; use MailPoet\Models\StatisticsNewsletters as StatisticsNewslettersModel; @@ -16,6 +15,7 @@ class SendingQueue { public $mailer_task; public $newsletter_task; public $timer; + const BATCH_SIZE = 50; function __construct($timer = false, $mailer_task = false, $newsletter_task = false) { $this->mailer_task = ($mailer_task) ? $mailer_task : new MailerTask(); @@ -39,8 +39,10 @@ class SendingQueue { $this->mailer_task->configureMailer($newsletter); // get subscribers $queue->subscribers = $queue->getSubscribers(); - $subscriber_batches = - SubscribersTask::splitSubscribersIntoBatches($queue->subscribers['to_process']); + $subscriber_batches = array_chunk( + $queue->subscribers['to_process'], + self::BATCH_SIZE + ); foreach($subscriber_batches as $subscribers_to_process_ids) { // abort if execution limit is reached CronHelper::enforceExecutionLimit($this->timer); @@ -51,13 +53,12 @@ class SendingQueue { }, $found_subscribers); // if some subscribers weren't found, remove them from the processing list if(count($found_subscribers_ids) !== count($subscribers_to_process_ids)) { - $queue->subscribers = SubscribersTask::updateToProcessList( - $found_subscribers_ids, + $subscibers_to_remove = array_diff( $subscribers_to_process_ids, - $queue->subscribers + $found_subscribers_ids ); + $queue->removeNonexistentSubscribers($subscibers_to_remove); if(!count($queue->subscribers['to_process'])) { - $this->updateQueue($queue); $this->newsletter_task->markNewsletterAsSent($newsletter); continue; } @@ -140,22 +141,14 @@ class SendingQueue { ); if(!$send_result) { // update failed/to process list - $queue->subscribers = SubscribersTask::updateFailedList( - $prepared_subscribers_ids, - $queue->subscribers - ); - $queue = $this->updateQueue($queue); + $queue->updateFailedSubscribers($prepared_subscribers_ids); } else { // update processed/to process list - $queue->subscribers = SubscribersTask::updateProcessedList( - $prepared_subscribers_ids, - $queue->subscribers - ); + $queue->updateProcessedSubscribers($prepared_subscribers_ids); // log statistics StatisticsNewslettersModel::createMultiple($statistics); // update the sent count $this->mailer_task->updateSentCount(); - $queue = $this->updateQueue($queue); // enforce sending limit if there are still subscribers left to process if($queue->count_to_process) { MailerLog::enforceSendingLimit(); @@ -170,18 +163,4 @@ class SendingQueue { ->whereNull('status') ->findMany(); } - - function updateQueue($queue) { - $queue->count_processed = - count($queue->subscribers['processed']) + count($queue->subscribers['failed']); - $queue->count_to_process = count($queue->subscribers['to_process']); - $queue->count_failed = count($queue->subscribers['failed']); - $queue->count_total = - $queue->count_processed + $queue->count_to_process; - if(!$queue->count_to_process) { - $queue->processed_at = current_time('mysql'); - $queue->status = SendingQueueModel::STATUS_COMPLETED; - } - return $queue->save(); - } } \ No newline at end of file diff --git a/lib/Cron/Workers/SendingQueue/Tasks/Subscribers.php b/lib/Cron/Workers/SendingQueue/Tasks/Subscribers.php deleted file mode 100644 index c33fb30dae..0000000000 --- a/lib/Cron/Workers/SendingQueue/Tasks/Subscribers.php +++ /dev/null @@ -1,63 +0,0 @@ -has_one(__NAMESPACE__.'\Newsletter', 'id', 'newsletter_id'); + return $this->has_one(__NAMESPACE__ . '\Newsletter', 'id', 'newsletter_id'); } function pause() { @@ -84,4 +84,62 @@ class SendingQueue extends Model { : $this->subscribers; return $model; } + + function removeNonexistentSubscribers($subscribers_to_remove) { + $subscribers = $this->getSubscribers(); + $subscribers['to_process'] = array_values( + array_diff( + $subscribers['to_process'], + $subscribers_to_remove + ) + ); + $this->subscribers = $subscribers; + $this->updateCount(); + } + + function updateFailedSubscribers($failed_subscribers) { + $subscribers = $this->getSubscribers(); + $subscribers['failed'] = array_merge( + $subscribers['failed'], + $failed_subscribers + ); + $subscribers['to_process'] = array_values( + array_diff( + $subscribers['to_process'], + $failed_subscribers + ) + ); + $this->subscribers = $subscribers; + $this->updateCount(); + } + + function updateProcessedSubscribers($processed_subscribers) { + $subscribers = $this->getSubscribers(); + $subscribers['processed'] = array_merge( + $subscribers['processed'], + $processed_subscribers + ); + $subscribers['to_process'] = array_values( + array_diff( + $subscribers['to_process'], + $processed_subscribers + ) + ); + $this->subscribers = $subscribers; + $this->updateCount(); + } + + function updateCount() { + $this->subscribers = $this->getSubscribers(); + $this->count_processed = + count($this->subscribers['processed']) + count($this->subscribers['failed']); + $this->count_to_process = count($this->subscribers['to_process']); + $this->count_failed = count($this->subscribers['failed']); + $this->count_total = $this->count_processed + $this->count_to_process; + if(!$this->count_to_process) { + $this->processed_at = current_time('mysql'); + $this->status = self::STATUS_COMPLETED; + } + return $this->save(); + } } \ No newline at end of file diff --git a/tests/unit/Cron/Workers/SendingQueue/Tasks/SubscribersTest.php b/tests/unit/Cron/Workers/SendingQueue/Tasks/SubscribersTest.php deleted file mode 100644 index c0f7b271f0..0000000000 --- a/tests/unit/Cron/Workers/SendingQueue/Tasks/SubscribersTest.php +++ /dev/null @@ -1,96 +0,0 @@ -equals(200 / Subscribers::BATCH_SIZE); - expect(count($split_subscribers[0]))->equals(Subscribers::BATCH_SIZE); - } - - function testItCanRemoveNonexistentSubscribersFromListOfSubscribersToProcess() { - $queue_subscribers = array( - 'to_process' => range(1, 5) - ); - $subscribers_found = array( - 1, - 2, - 5 - ); - $subscribers_to_process = array( - 3, - 4 - ); - $updated_queue_subscribers = Subscribers::updateToProcessList( - $subscribers_found, - $subscribers_to_process, - $queue_subscribers - ); - expect($updated_queue_subscribers)->equals( - array( - 'to_process' => array( - 1, - 2, - 5 - ) - ) - ); - } - - function testItCanUpdateListOfFailedSubscribers() { - $queue_subscribers = array( - 'to_process' => range(1, 5), - 'failed' => array() - ); - $failed_subscribers = array( - 1 - ); - $updated_queue_subscribers = Subscribers::updateFailedList( - $failed_subscribers, - $queue_subscribers - ); - expect($updated_queue_subscribers)->equals( - array( - 'to_process' => array( - 2, - 3, - 4, - 5 - ), - 'failed' => array( - 1 - ) - ) - ); - } - - function testItCanUpdateListOfProcessedSubscribers() { - $queue_subscribers = array( - 'to_process' => range(1, 5), - 'processed' => array() - ); - $processed_subscribers = array( - 1 - ); - $updated_queue_subscribers = Subscribers::updateProcessedList( - $processed_subscribers, - $queue_subscribers - ); - expect($updated_queue_subscribers)->equals( - array( - 'to_process' => array( - 2, - 3, - 4, - 5 - ), - 'processed' => array( - 1 - ) - ) - ); - } -} \ No newline at end of file