diff --git a/lib/Config/Database.php b/lib/Config/Database.php index ca7c5f4c6b..23c7e59be7 100644 --- a/lib/Config/Database.php +++ b/lib/Config/Database.php @@ -70,6 +70,7 @@ class Database { $subscriber_custom_field = Env::$db_prefix . 'subscriber_custom_field'; $newsletter_segment = Env::$db_prefix . 'newsletter_segment'; $scheduled_tasks = Env::$db_prefix . 'scheduled_tasks'; + $scheduled_task_subscribers = Env::$db_prefix . 'scheduled_task_subscribers'; $sending_queues = Env::$db_prefix . 'sending_queues'; $newsletters = Env::$db_prefix . 'newsletters'; $newsletter_templates = Env::$db_prefix . 'newsletter_templates'; @@ -92,6 +93,7 @@ class Database { define('MP_SUBSCRIBER_SEGMENT_TABLE', $subscriber_segment); define('MP_SUBSCRIBER_CUSTOM_FIELD_TABLE', $subscriber_custom_field); define('MP_SCHEDULED_TASKS_TABLE', $scheduled_tasks); + define('MP_SCHEDULED_TASK_SUBSCRIBERS_TABLE', $scheduled_task_subscribers); define('MP_SENDING_QUEUES_TABLE', $sending_queues); define('MP_NEWSLETTERS_TABLE', $newsletters); define('MP_NEWSLETTER_TEMPLATES_TABLE', $newsletter_templates); diff --git a/lib/Config/Migrator.php b/lib/Config/Migrator.php index 841626c710..7707a7e3aa 100644 --- a/lib/Config/Migrator.php +++ b/lib/Config/Migrator.php @@ -18,6 +18,7 @@ class Migrator { 'settings', 'custom_fields', 'scheduled_tasks', + 'scheduled_task_subscribers', 'sending_queues', 'subscribers', 'subscriber_segment', @@ -109,12 +110,8 @@ class Migrator { $attributes = array( 'id mediumint(9) NOT NULL AUTO_INCREMENT,', 'type varchar(90) NULL DEFAULT NULL,', - 'subscribers longtext,', 'status varchar(12) NULL DEFAULT NULL,', 'priority mediumint(9) NOT NULL DEFAULT 0,', - 'count_total mediumint(9) NOT NULL DEFAULT 0,', - 'count_processed mediumint(9) NOT NULL DEFAULT 0,', - 'count_to_process mediumint(9) NOT NULL DEFAULT 0,', 'scheduled_at TIMESTAMP NULL,', 'processed_at TIMESTAMP NULL,', 'created_at TIMESTAMP NULL,', @@ -125,6 +122,17 @@ class Migrator { return $this->sqlify(__FUNCTION__, $attributes); } + function scheduledTaskSubscribers() { + $attributes = array( + 'task_id mediumint(9) NOT NULL,', + 'subscriber_id mediumint(9) NOT NULL,', + 'processed int(1) NULL,', + 'created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,', + 'PRIMARY KEY (task_id, subscriber_id)', + ); + return $this->sqlify(__FUNCTION__, $attributes); + } + function sendingQueues() { $attributes = array( 'id mediumint(9) NOT NULL AUTO_INCREMENT,', diff --git a/lib/Cron/Workers/Bounce.php b/lib/Cron/Workers/Bounce.php index 10ef580f56..537d79e963 100644 --- a/lib/Cron/Workers/Bounce.php +++ b/lib/Cron/Workers/Bounce.php @@ -4,6 +4,8 @@ namespace MailPoet\Cron\Workers; use MailPoet\Cron\CronHelper; use MailPoet\Mailer\Mailer; use MailPoet\Models\ScheduledTask; +use MailPoet\Models\ScheduledTaskSubscriber; +use MailPoet\Tasks\Subscribers\BatchIterator; use MailPoet\Models\Subscriber; use MailPoet\Services\Bridge; use MailPoet\Services\Bridge\API; @@ -48,28 +50,19 @@ class Bounce extends SimpleWorker { } // update current task - $task->subscribers = serialize( - array( - 'to_process' => $subscribers - ) - ); - $task->count_total = $task->count_to_process = count($subscribers); + ScheduledTaskSubscriber::addSubscribers($task->id, $subscribers); return parent::prepareTask($task); } function processTask(ScheduledTask $task) { - $task->subscribers = $task->getSubscribers(); - if(empty($task->subscribers['to_process'])) { + $subscriber_batches = new BatchIterator($task->id, self::BATCH_SIZE); + + if(count($subscriber_batches) === 0) { $task->delete(); return false; } - $subscriber_batches = array_chunk( - $task->subscribers['to_process'], - self::BATCH_SIZE - ); - foreach($subscriber_batches as $subscribers_to_process_ids) { // abort if execution limit is reached CronHelper::enforceExecutionLimit($this->timer); diff --git a/lib/Models/ScheduledTask.php b/lib/Models/ScheduledTask.php index 9edb31e688..a8bb61afa0 100644 --- a/lib/Models/ScheduledTask.php +++ b/lib/Models/ScheduledTask.php @@ -11,6 +11,10 @@ class ScheduledTask extends Model { const PRIORITY_MEDIUM = 5; const PRIORITY_LOW = 10; + function taskSubscribers() { + return $this->has_many(__NAMESPACE__ . '\ScheduledTaskSubscriber', 'task_id', 'id'); + } + function complete() { $this->set('status', self::STATUS_COMPLETED); $this->save(); @@ -18,79 +22,28 @@ class ScheduledTask extends Model { } function save() { - if(!is_serialized($this->subscribers)) { - $this->set('subscribers', serialize($this->subscribers)); - } // set the default priority to medium if(!$this->priority) { $this->priority = self::PRIORITY_MEDIUM; } parent::save(); - $this->subscribers = $this->getSubscribers(); return $this; } - function getSubscribers() { - if(!is_serialized($this->subscribers)) { - return $this->subscribers; - } - $subscribers = unserialize($this->subscribers); - if(empty($subscribers['processed'])) { - $subscribers['processed'] = array(); - } - return $subscribers; + function updateProcessedSubscribers(array $processed_subscribers) { + $this->taskSubscribers() + ->whereIn('subscriber_id', $processed_subscribers) + ->findResultSet() + ->set('processed', ScheduledTaskSubscriber::STATUS_PROCESSED) + ->save(); + $this->checkCompleted(); } - function isSubscriberProcessed($subscriber_id) { - $subscribers = $this->getSubscribers(); - return in_array($subscriber_id, $subscribers['processed']); - } - - function asArray() { - $model = parent::asArray(); - $model['subscribers'] = (is_serialized($this->subscribers)) - ? unserialize($this->subscribers) - : $this->subscribers; - return $model; - } - - function removeNonexistentSubscribers($subscribers_to_remove) { - $subscribers = $this->getSubscribers(); - $subscribers['to_process'] = array_values( - array_diff( - $subscribers['to_process'], - $subscribers_to_remove - ) - ); - $this->subscribers = $subscribers; - $this->updateCount(); - } - - function updateProcessedSubscribers($processed_subscribers) { - $subscribers = $this->getSubscribers(); - $subscribers['processed'] = array_merge( - $subscribers['processed'], - $processed_subscribers - ); - $subscribers['to_process'] = array_values( - array_diff( - $subscribers['to_process'], - $processed_subscribers - ) - ); - $this->subscribers = $subscribers; - $this->updateCount(); - } - - function updateCount() { - $this->subscribers = $this->getSubscribers(); - $this->count_processed = count($this->subscribers['processed']); - $this->count_to_process = count($this->subscribers['to_process']); - $this->count_total = $this->count_processed + $this->count_to_process; - if(!$this->count_to_process) { + private function checkCompleted() { + if(!ScheduledTaskSubscriber::getToProcessCount($this->id)) { $this->processed_at = current_time('mysql'); $this->status = self::STATUS_COMPLETED; + return $this->save(); } - return $this->save(); } -} \ No newline at end of file +} diff --git a/lib/Models/ScheduledTaskSubscriber.php b/lib/Models/ScheduledTaskSubscriber.php new file mode 100644 index 0000000000..c0a9debb89 --- /dev/null +++ b/lib/Models/ScheduledTaskSubscriber.php @@ -0,0 +1,61 @@ +where('task_id', $data['task_id']) + ->findOne(); + + if(empty($task_subscriber)) $task_subscriber = self::create(); + + $task_subscriber->task_id = $data['task_id']; + $task_subscriber->subscriber_id = $data['subscriber_id']; + $task_subscriber->processed = self::STATUS_TO_PROCESS; + $task_subscriber->save(); + + return $task_subscriber; + } + + static function addSubscribers($task_id, array $subscriber_ids) { + foreach($subscriber_ids as $subscriber_id) { + self::createOrUpdate(array( + 'task_id' => $task_id, + 'subscriber_id' => $subscriber_id + )); + } + } + + static function getToProcessCount($task_id) { + return self::getCount($task_id, false); + } + + static function getProcessedCount($task_id) { + return self::getCount($task_id, true); + } + + static function getTotalCount($task_id) { + return self::getCount($task_id); + } + + private static function getCount($task_id, $processed = null) { + $orm = self::where('task_id', $task_id); + if(!is_null($processed)) { + $orm->where( + 'processed', + ($processed) ? self::STATUS_PROCESSED : self::STATUS_TO_PROCESS + ); + } + return $orm->count(); + } +} diff --git a/lib/Tasks/Subscribers/BatchIterator.php b/lib/Tasks/Subscribers/BatchIterator.php new file mode 100644 index 0000000000..f1102bfc7f --- /dev/null +++ b/lib/Tasks/Subscribers/BatchIterator.php @@ -0,0 +1,59 @@ +task_id = (int)$task_id; + $this->batch_size = (int)$batch_size; + } + + function rewind() { + $this->offset = 0; + } + + function current() { + $subscribers = $this->getSubscribers() + ->orderByAsc('subscriber_id') + ->limit($this->batch_size) + ->offset($this->offset) + ->findArray(); + $subscribers = Helpers::arrayColumn($subscribers, 'subscriber_id'); + return $subscribers; + } + + function key() { + return $this->offset; + } + + function next() { + $this->offset += $this->batch_size; + } + + function valid() { + return $this->offset < $this->count(); + } + + function count() { + return $this->getSubscribers()->count(); + } + + private function getSubscribers() { + return ScheduledTaskSubscriber::select('subscriber_id') + ->where('task_id', $this->task_id) + ->where('processed', ScheduledTaskSubscriber::STATUS_TO_PROCESS); + } +} diff --git a/tests/_bootstrap.php b/tests/_bootstrap.php index 85e9ec5b1b..7a8ad81072 100644 --- a/tests/_bootstrap.php +++ b/tests/_bootstrap.php @@ -19,6 +19,7 @@ $models = array( 'NewsletterOptionField', 'Segment', 'ScheduledTask', + 'ScheduledTaskSubscriber', 'SendingQueue', 'Setting', 'Subscriber', diff --git a/tests/unit/Cron/Workers/BounceTest.php b/tests/unit/Cron/Workers/BounceTest.php index 0da973c21d..f87a24ab91 100644 --- a/tests/unit/Cron/Workers/BounceTest.php +++ b/tests/unit/Cron/Workers/BounceTest.php @@ -4,6 +4,7 @@ use Carbon\Carbon; use MailPoet\Cron\Workers\Bounce; use MailPoet\Mailer\Mailer; use MailPoet\Models\ScheduledTask; +use MailPoet\Models\ScheduledTaskSubscriber; use MailPoet\Models\Setting; use MailPoet\Models\Subscriber; use MailPoet\Services\Bridge\API; @@ -58,10 +59,10 @@ class BounceTest extends MailPoetTest { function testItPreparesTask() { $task = $this->createScheduledTask(); - expect(empty($task->subscribers['to_process']))->true(); + expect(ScheduledTaskSubscriber::getToProcessCount($task->id))->isEmpty(); $this->worker->prepareTask($task); expect($task->status)->null(); - expect(!empty($task->subscribers['to_process']))->true(); + expect(ScheduledTaskSubscriber::getToProcessCount($task->id))->notEmpty(); } function testItDeletesTaskIfThereAreNoSubscribersToProcessWhenProcessingTask() { @@ -76,9 +77,9 @@ class BounceTest extends MailPoetTest { function testItProcessesTask() { $task = $this->createRunningTask(); $this->worker->prepareTask($task); - expect(!empty($task->subscribers['to_process']))->true(); + expect(ScheduledTaskSubscriber::getToProcessCount($task->id))->notEmpty(); $this->worker->processTask($task); - expect(!empty($task->subscribers['processed']))->true(); + expect(ScheduledTaskSubscriber::getProcessedCount($task->id))->notEmpty(); } function testItSetsSubscriberStatusAsBounced() { @@ -125,6 +126,7 @@ class BounceTest extends MailPoetTest { function _after() { ORM::raw_execute('TRUNCATE ' . Setting::$_table); ORM::raw_execute('TRUNCATE ' . ScheduledTask::$_table); + ORM::raw_execute('TRUNCATE ' . ScheduledTaskSubscriber::$_table); ORM::raw_execute('TRUNCATE ' . Subscriber::$_table); } } \ No newline at end of file