Refactor simple workers to use a ScheduledTask model [MAILPOET-940]

This commit is contained in:
stoletniy
2017-06-26 14:16:00 +03:00
parent 7be01f0e4e
commit 6e929dcf79
11 changed files with 328 additions and 193 deletions

View File

@ -69,6 +69,7 @@ class Database {
$subscriber_segment = Env::$db_prefix . 'subscriber_segment'; $subscriber_segment = Env::$db_prefix . 'subscriber_segment';
$subscriber_custom_field = Env::$db_prefix . 'subscriber_custom_field'; $subscriber_custom_field = Env::$db_prefix . 'subscriber_custom_field';
$newsletter_segment = Env::$db_prefix . 'newsletter_segment'; $newsletter_segment = Env::$db_prefix . 'newsletter_segment';
$scheduled_tasks = Env::$db_prefix . 'scheduled_tasks';
$sending_queues = Env::$db_prefix . 'sending_queues'; $sending_queues = Env::$db_prefix . 'sending_queues';
$newsletters = Env::$db_prefix . 'newsletters'; $newsletters = Env::$db_prefix . 'newsletters';
$newsletter_templates = Env::$db_prefix . 'newsletter_templates'; $newsletter_templates = Env::$db_prefix . 'newsletter_templates';
@ -90,6 +91,7 @@ class Database {
define('MP_SUBSCRIBERS_TABLE', $subscribers); define('MP_SUBSCRIBERS_TABLE', $subscribers);
define('MP_SUBSCRIBER_SEGMENT_TABLE', $subscriber_segment); define('MP_SUBSCRIBER_SEGMENT_TABLE', $subscriber_segment);
define('MP_SUBSCRIBER_CUSTOM_FIELD_TABLE', $subscriber_custom_field); define('MP_SUBSCRIBER_CUSTOM_FIELD_TABLE', $subscriber_custom_field);
define('MP_SCHEDULED_TASKS_TABLE', $scheduled_tasks);
define('MP_SENDING_QUEUES_TABLE', $sending_queues); define('MP_SENDING_QUEUES_TABLE', $sending_queues);
define('MP_NEWSLETTERS_TABLE', $newsletters); define('MP_NEWSLETTERS_TABLE', $newsletters);
define('MP_NEWSLETTER_TEMPLATES_TABLE', $newsletter_templates); define('MP_NEWSLETTER_TEMPLATES_TABLE', $newsletter_templates);

View File

@ -17,6 +17,7 @@ class Migrator {
'segments', 'segments',
'settings', 'settings',
'custom_fields', 'custom_fields',
'scheduled_tasks',
'sending_queues', 'sending_queues',
'subscribers', 'subscribers',
'subscriber_segment', 'subscriber_segment',
@ -104,6 +105,26 @@ class Migrator {
return $this->sqlify(__FUNCTION__, $attributes); return $this->sqlify(__FUNCTION__, $attributes);
} }
function scheduledTasks() {
$attributes = array(
'id mediumint(9) NOT NULL AUTO_INCREMENT,',
'type varchar(90) NULL DEFAULT NULL,',
'subscribers longtext,',
'status varchar(12) NULL DEFAULT NULL,',
'priority mediumint(9) NOT NULL DEFAULT 0,',
'count_total mediumint(9) NOT NULL DEFAULT 0,',
'count_processed mediumint(9) NOT NULL DEFAULT 0,',
'count_to_process mediumint(9) NOT NULL DEFAULT 0,',
'scheduled_at TIMESTAMP NULL,',
'processed_at TIMESTAMP NULL,',
'created_at TIMESTAMP NULL,',
'updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,',
'deleted_at TIMESTAMP NULL,',
'PRIMARY KEY (id)',
);
return $this->sqlify(__FUNCTION__, $attributes);
}
function sendingQueues() { function sendingQueues() {
$attributes = array( $attributes = array(
'id mediumint(9) NOT NULL AUTO_INCREMENT,', 'id mediumint(9) NOT NULL AUTO_INCREMENT,',

View File

@ -28,20 +28,20 @@ class WordPress {
// sending service // sending service
$mp_sending_enabled = Bridge::isMPSendingServiceEnabled(); $mp_sending_enabled = Bridge::isMPSendingServiceEnabled();
// bounce sync // bounce sync
$bounce_due_queues = BounceWorker::getAllDueQueues(); $bounce_due_tasks = BounceWorker::getAllDueTasks();
$bounce_future_queues = BounceWorker::getFutureQueues(); $bounce_future_tasks = BounceWorker::getFutureTasks();
// sending service key check // sending service key check
$sskeycheck_due_queues = SendingServiceKeyCheckWorker::getAllDueQueues(); $msskeycheck_due_tasks = SendingServiceKeyCheckWorker::getAllDueTasks();
$sskeycheck_future_queues = SendingServiceKeyCheckWorker::getFutureQueues(); $msskeycheck_future_tasks = SendingServiceKeyCheckWorker::getFutureTasks();
// premium key check // premium key check
$premium_key_specified = Bridge::isPremiumKeySpecified(); $premium_key_specified = Bridge::isPremiumKeySpecified();
$premium_keycheck_due_queues = PremiumKeyCheckWorker::getAllDueQueues(); $premium_keycheck_due_tasks = PremiumKeyCheckWorker::getAllDueTasks();
$premium_keycheck_future_queues = PremiumKeyCheckWorker::getFutureQueues(); $premium_keycheck_future_tasks = PremiumKeyCheckWorker::getFutureTasks();
// check requirements for each worker // check requirements for each worker
$sending_queue_active = (($scheduled_queues || $running_queues) && !$sending_limit_reached && !$sending_is_paused); $sending_queue_active = (($scheduled_queues || $running_queues) && !$sending_limit_reached && !$sending_is_paused);
$bounce_sync_active = ($mp_sending_enabled && ($bounce_due_queues || !$bounce_future_queues)); $bounce_sync_active = ($mp_sending_enabled && ($bounce_due_tasks || !$bounce_future_tasks));
$sending_service_key_check_active = ($mp_sending_enabled && ($sskeycheck_due_queues || !$sskeycheck_future_queues)); $sending_service_key_check_active = ($mp_sending_enabled && ($msskeycheck_due_tasks || !$msskeycheck_future_tasks));
$premium_key_check_active = ($premium_key_specified && ($premium_keycheck_due_queues || !$premium_keycheck_future_queues)); $premium_key_check_active = ($premium_key_specified && ($premium_keycheck_due_tasks || !$premium_keycheck_future_tasks));
return ($sending_queue_active || $bounce_sync_active || $sending_service_key_check_active || $premium_key_check_active); return ($sending_queue_active || $bounce_sync_active || $sending_service_key_check_active || $premium_key_check_active);
} }

View File

@ -3,7 +3,7 @@ namespace MailPoet\Cron\Workers;
use MailPoet\Cron\CronHelper; use MailPoet\Cron\CronHelper;
use MailPoet\Mailer\Mailer; use MailPoet\Mailer\Mailer;
use MailPoet\Models\SendingQueue; use MailPoet\Models\ScheduledTask;
use MailPoet\Models\Subscriber; use MailPoet\Models\Subscriber;
use MailPoet\Services\Bridge; use MailPoet\Services\Bridge;
use MailPoet\Services\Bridge\API; use MailPoet\Services\Bridge\API;
@ -32,7 +32,7 @@ class Bounce extends SimpleWorker {
return Bridge::isMPSendingServiceEnabled(); return Bridge::isMPSendingServiceEnabled();
} }
function prepareQueue(SendingQueue $queue) { function prepareTask(ScheduledTask $task) {
$subscribers = Subscriber::select('id') $subscribers = Subscriber::select('id')
->whereNull('deleted_at') ->whereNull('deleted_at')
->whereIn('status', array( ->whereIn('status', array(
@ -43,30 +43,30 @@ class Bounce extends SimpleWorker {
$subscribers = Helpers::arrayColumn($subscribers, 'id'); $subscribers = Helpers::arrayColumn($subscribers, 'id');
if(empty($subscribers)) { if(empty($subscribers)) {
$queue->delete(); $task->delete();
return false; return false;
} }
// update current queue // update current task
$queue->subscribers = serialize( $task->subscribers = serialize(
array( array(
'to_process' => $subscribers 'to_process' => $subscribers
) )
); );
$queue->count_total = $queue->count_to_process = count($subscribers); $task->count_total = $task->count_to_process = count($subscribers);
return parent::prepareQueue($queue); return parent::prepareTask($task);
} }
function processQueue(SendingQueue $queue) { function processTask(ScheduledTask $task) {
$queue->subscribers = $queue->getSubscribers(); $task->subscribers = $task->getSubscribers();
if(empty($queue->subscribers['to_process'])) { if(empty($task->subscribers['to_process'])) {
$queue->delete(); $task->delete();
return false; return false;
} }
$subscriber_batches = array_chunk( $subscriber_batches = array_chunk(
$queue->subscribers['to_process'], $task->subscribers['to_process'],
self::BATCH_SIZE self::BATCH_SIZE
); );
@ -82,7 +82,7 @@ class Bounce extends SimpleWorker {
$this->processEmails($subscriber_emails); $this->processEmails($subscriber_emails);
$queue->updateProcessedSubscribers($subscribers_to_process_ids); $task->updateProcessedSubscribers($subscribers_to_process_ids);
} }
return true; return true;

View File

@ -3,7 +3,7 @@ namespace MailPoet\Cron\Workers\KeyCheck;
use MailPoet\Cron\CronHelper; use MailPoet\Cron\CronHelper;
use MailPoet\Cron\Workers\SimpleWorker; use MailPoet\Cron\Workers\SimpleWorker;
use MailPoet\Models\SendingQueue; use MailPoet\Models\ScheduledTask;
use MailPoet\Services\Bridge; use MailPoet\Services\Bridge;
if(!defined('ABSPATH')) exit; if(!defined('ABSPATH')) exit;
@ -19,7 +19,7 @@ abstract class KeyCheckWorker extends SimpleWorker {
} }
} }
function processQueueStrategy(SendingQueue $queue) { function processTaskStrategy(ScheduledTask $task) {
try { try {
$result = $this->checkKey(); $result = $this->checkKey();
} catch (\Exception $e) { } catch (\Exception $e) {
@ -27,7 +27,7 @@ abstract class KeyCheckWorker extends SimpleWorker {
} }
if(empty($result['code']) || $result['code'] == Bridge::CHECK_ERROR_UNAVAILABLE) { if(empty($result['code']) || $result['code'] == Bridge::CHECK_ERROR_UNAVAILABLE) {
$this->reschedule($queue, self::UNAVAILABLE_SERVICE_RESCHEDULE_TIMEOUT); $this->reschedule($task, self::UNAVAILABLE_SERVICE_RESCHEDULE_TIMEOUT);
return false; return false;
} }

View File

@ -3,7 +3,7 @@ namespace MailPoet\Cron\Workers;
use Carbon\Carbon; use Carbon\Carbon;
use MailPoet\Cron\CronHelper; use MailPoet\Cron\CronHelper;
use MailPoet\Models\SendingQueue; use MailPoet\Models\ScheduledTask;
if(!defined('ABSPATH')) exit; if(!defined('ABSPATH')) exit;
@ -32,45 +32,44 @@ abstract class SimpleWorker {
$this->init(); $this->init();
} }
$scheduled_queues = self::getScheduledQueues(); $scheduled_tasks = self::getScheduledTasks();
$running_queues = self::getRunningQueues(); $running_tasks = self::getRunningTasks();
if(!$scheduled_queues && !$running_queues) { if(!$scheduled_tasks && !$running_tasks) {
self::schedule(); self::schedule();
return false; return false;
} }
foreach($scheduled_queues as $i => $queue) { foreach($scheduled_tasks as $i => $task) {
$this->prepareQueue($queue); $this->prepareTask($task);
} }
foreach($running_queues as $i => $queue) { foreach($running_tasks as $i => $task) {
$this->processQueue($queue); $this->processTask($task);
} }
return true; return true;
} }
static function schedule() { static function schedule() {
$already_scheduled = SendingQueue::where('type', static::TASK_TYPE) $already_scheduled = ScheduledTask::where('type', static::TASK_TYPE)
->whereNull('deleted_at') ->whereNull('deleted_at')
->where('status', SendingQueue::STATUS_SCHEDULED) ->where('status', ScheduledTask::STATUS_SCHEDULED)
->findMany(); ->findMany();
if($already_scheduled) { if($already_scheduled) {
return false; return false;
} }
$queue = SendingQueue::create(); $task = ScheduledTask::create();
$queue->type = static::TASK_TYPE; $task->type = static::TASK_TYPE;
$queue->status = SendingQueue::STATUS_SCHEDULED; $task->status = ScheduledTask::STATUS_SCHEDULED;
$queue->priority = SendingQueue::PRIORITY_LOW; $task->priority = ScheduledTask::PRIORITY_LOW;
$queue->scheduled_at = self::getNextRunDate(); $task->scheduled_at = self::getNextRunDate();
$queue->newsletter_id = 0; $task->save();
$queue->save(); return $task;
return $queue;
} }
function prepareQueue(SendingQueue $queue) { function prepareTask(ScheduledTask $task) {
$queue->status = null; $task->status = null;
$queue->save(); $task->save();
// abort if execution limit is reached // abort if execution limit is reached
CronHelper::enforceExecutionLimit($this->timer); CronHelper::enforceExecutionLimit($this->timer);
@ -78,32 +77,32 @@ abstract class SimpleWorker {
return true; return true;
} }
function processQueue(SendingQueue $queue) { function processTask(ScheduledTask $task) {
// abort if execution limit is reached // abort if execution limit is reached
CronHelper::enforceExecutionLimit($this->timer); CronHelper::enforceExecutionLimit($this->timer);
if($this->processQueueStrategy($queue)) { if($this->processTaskStrategy($task)) {
$this->complete($queue); $this->complete($task);
return true; return true;
} }
return false; return false;
} }
function processQueueStrategy(SendingQueue $queue) { function processTaskStrategy(ScheduledTask $task) {
return true; return true;
} }
function complete(SendingQueue $queue) { function complete(ScheduledTask $task) {
$queue->processed_at = current_time('mysql'); $task->processed_at = current_time('mysql');
$queue->status = SendingQueue::STATUS_COMPLETED; $task->status = ScheduledTask::STATUS_COMPLETED;
$queue->save(); $task->save();
} }
function reschedule(SendingQueue $queue, $timeout) { function reschedule(ScheduledTask $task, $timeout) {
$scheduled_at = Carbon::createFromTimestamp(current_time('timestamp')); $scheduled_at = Carbon::createFromTimestamp(current_time('timestamp'));
$queue->scheduled_at = $scheduled_at->addMinutes($timeout); $task->scheduled_at = $scheduled_at->addMinutes($timeout);
$queue->save(); $task->save();
} }
static function getNextRunDate() { static function getNextRunDate() {
@ -114,30 +113,30 @@ abstract class SimpleWorker {
return $date; return $date;
} }
static function getScheduledQueues($future = false) { static function getScheduledTasks($future = false) {
$dateWhere = ($future) ? 'whereGt' : 'whereLte'; $dateWhere = ($future) ? 'whereGt' : 'whereLte';
return SendingQueue::where('type', static::TASK_TYPE) return ScheduledTask::where('type', static::TASK_TYPE)
->$dateWhere('scheduled_at', Carbon::createFromTimestamp(current_time('timestamp'))) ->$dateWhere('scheduled_at', Carbon::createFromTimestamp(current_time('timestamp')))
->whereNull('deleted_at') ->whereNull('deleted_at')
->where('status', SendingQueue::STATUS_SCHEDULED) ->where('status', ScheduledTask::STATUS_SCHEDULED)
->findMany(); ->findMany();
} }
static function getRunningQueues() { static function getRunningTasks() {
return SendingQueue::where('type', static::TASK_TYPE) return ScheduledTask::where('type', static::TASK_TYPE)
->whereLte('scheduled_at', Carbon::createFromTimestamp(current_time('timestamp'))) ->whereLte('scheduled_at', Carbon::createFromTimestamp(current_time('timestamp')))
->whereNull('deleted_at') ->whereNull('deleted_at')
->whereNull('status') ->whereNull('status')
->findMany(); ->findMany();
} }
static function getAllDueQueues() { static function getAllDueTasks() {
$scheduled_queues = self::getScheduledQueues(); $scheduled_tasks = self::getScheduledTasks();
$running_queues = self::getRunningQueues(); $running_tasks = self::getRunningTasks();
return array_merge((array)$scheduled_queues, (array)$running_queues); return array_merge((array)$scheduled_tasks, (array)$running_tasks);
} }
static function getFutureQueues() { static function getFutureTasks() {
return self::getScheduledQueues(true); return self::getScheduledTasks(true);
} }
} }

View File

@ -0,0 +1,117 @@
<?php
namespace MailPoet\Models;
if(!defined('ABSPATH')) exit;
class ScheduledTask extends Model {
public static $_table = MP_SCHEDULED_TASKS_TABLE;
const STATUS_COMPLETED = 'completed';
const STATUS_SCHEDULED = 'scheduled';
const STATUS_PAUSED = 'paused';
const PRIORITY_HIGH = 1;
const PRIORITY_MEDIUM = 5;
const PRIORITY_LOW = 10;
function pause() {
if($this->count_processed === $this->count_total) {
return false;
} else {
$this->set('status', self::STATUS_PAUSED);
$this->save();
return ($this->getErrors() === false && $this->id() > 0);
}
}
function resume() {
if($this->count_processed === $this->count_total) {
return $this->complete();
} else {
$this->setExpr('status', 'NULL');
$this->save();
return ($this->getErrors() === false && $this->id() > 0);
}
}
function complete() {
$this->set('status', self::STATUS_COMPLETED);
$this->save();
return ($this->getErrors() === false && $this->id() > 0);
}
function save() {
if(!is_serialized($this->subscribers)) {
$this->set('subscribers', serialize($this->subscribers));
}
// set the default priority to medium
if(!$this->priority) {
$this->priority = self::PRIORITY_MEDIUM;
}
parent::save();
$this->subscribers = $this->getSubscribers();
return $this;
}
function getSubscribers() {
if(!is_serialized($this->subscribers)) {
return $this->subscribers;
}
$subscribers = unserialize($this->subscribers);
if(empty($subscribers['processed'])) {
$subscribers['processed'] = array();
}
return $subscribers;
}
function isSubscriberProcessed($subscriber_id) {
$subscribers = $this->getSubscribers();
return in_array($subscriber_id, $subscribers['processed']);
}
function asArray() {
$model = parent::asArray();
$model['subscribers'] = (is_serialized($this->subscribers))
? unserialize($this->subscribers)
: $this->subscribers;
return $model;
}
function removeNonexistentSubscribers($subscribers_to_remove) {
$subscribers = $this->getSubscribers();
$subscribers['to_process'] = array_values(
array_diff(
$subscribers['to_process'],
$subscribers_to_remove
)
);
$this->subscribers = $subscribers;
$this->updateCount();
}
function updateProcessedSubscribers($processed_subscribers) {
$subscribers = $this->getSubscribers();
$subscribers['processed'] = array_merge(
$subscribers['processed'],
$processed_subscribers
);
$subscribers['to_process'] = array_values(
array_diff(
$subscribers['to_process'],
$processed_subscribers
)
);
$this->subscribers = $subscribers;
$this->updateCount();
}
function updateCount() {
$this->subscribers = $this->getSubscribers();
$this->count_processed = count($this->subscribers['processed']);
$this->count_to_process = count($this->subscribers['to_process']);
$this->count_total = $this->count_processed + $this->count_to_process;
if(!$this->count_to_process) {
$this->processed_at = current_time('mysql');
$this->status = self::STATUS_COMPLETED;
}
return $this->save();
}
}

View File

@ -18,6 +18,7 @@ $models = array(
'NewsletterOption', 'NewsletterOption',
'NewsletterOptionField', 'NewsletterOptionField',
'Segment', 'Segment',
'ScheduledTask',
'SendingQueue', 'SendingQueue',
'Setting', 'Setting',
'Subscriber', 'Subscriber',

View File

@ -3,7 +3,7 @@
use Carbon\Carbon; use Carbon\Carbon;
use MailPoet\Cron\Workers\Bounce; use MailPoet\Cron\Workers\Bounce;
use MailPoet\Mailer\Mailer; use MailPoet\Mailer\Mailer;
use MailPoet\Models\SendingQueue; use MailPoet\Models\ScheduledTask;
use MailPoet\Models\Setting; use MailPoet\Models\Setting;
use MailPoet\Models\Subscriber; use MailPoet\Models\Subscriber;
use MailPoet\Services\Bridge\API; use MailPoet\Services\Bridge\API;
@ -48,37 +48,37 @@ class BounceTest extends MailPoetTest {
expect($this->worker->checkProcessingRequirements())->true(); expect($this->worker->checkProcessingRequirements())->true();
} }
function testItDeletesQueueIfThereAreNoSubscribersWhenPreparingQueue() { function testItDeletesTaskIfThereAreNoSubscribersWhenPreparingTask() {
Subscriber::deleteMany(); Subscriber::deleteMany();
$queue = $this->createScheduledQueue(); $task = $this->createScheduledTask();
$result = $this->worker->prepareQueue($queue); $result = $this->worker->prepareTask($task);
expect(SendingQueue::findOne($queue->id))->isEmpty(); expect(ScheduledTask::findOne($task->id))->isEmpty();
expect($result)->false(); expect($result)->false();
} }
function testItPreparesQueue() { function testItPreparesTask() {
$queue = $this->createScheduledQueue(); $task = $this->createScheduledTask();
expect(empty($queue->subscribers['to_process']))->true(); expect(empty($task->subscribers['to_process']))->true();
$this->worker->prepareQueue($queue); $this->worker->prepareTask($task);
expect($queue->status)->null(); expect($task->status)->null();
expect(!empty($queue->subscribers['to_process']))->true(); expect(!empty($task->subscribers['to_process']))->true();
} }
function testItDeletesQueueIfThereAreNoSubscribersToProcessWhenProcessingQueue() { function testItDeletesTaskIfThereAreNoSubscribersToProcessWhenProcessingTask() {
$queue = $this->createScheduledQueue(); $task = $this->createScheduledTask();
$queue->subscribers = null; $task->subscribers = null;
$queue->save(); $task->save();
$result = $this->worker->processQueue($queue); $result = $this->worker->processTask($task);
expect(SendingQueue::findOne($queue->id))->isEmpty(); expect(ScheduledTask::findOne($task->id))->isEmpty();
expect($result)->false(); expect($result)->false();
} }
function testItProcessesQueue() { function testItProcessesTask() {
$queue = $this->createRunningQueue(); $task = $this->createRunningTask();
$this->worker->prepareQueue($queue); $this->worker->prepareTask($task);
expect(!empty($queue->subscribers['to_process']))->true(); expect(!empty($task->subscribers['to_process']))->true();
$this->worker->processQueue($queue); $this->worker->processTask($task);
expect(!empty($queue->subscribers['processed']))->true(); expect(!empty($task->subscribers['processed']))->true();
} }
function testItSetsSubscriberStatusAsBounced() { function testItSetsSubscriberStatusAsBounced() {
@ -104,29 +104,27 @@ class BounceTest extends MailPoetTest {
); );
} }
private function createScheduledQueue() { private function createScheduledTask() {
$queue = SendingQueue::create(); $task = ScheduledTask::create();
$queue->type = 'bounce'; $task->type = 'bounce';
$queue->status = SendingQueue::STATUS_SCHEDULED; $task->status = ScheduledTask::STATUS_SCHEDULED;
$queue->scheduled_at = Carbon::createFromTimestamp(current_time('timestamp')); $task->scheduled_at = Carbon::createFromTimestamp(current_time('timestamp'));
$queue->newsletter_id = 0; $task->save();
$queue->save(); return $task;
return $queue;
} }
private function createRunningQueue() { private function createRunningTask() {
$queue = SendingQueue::create(); $task = ScheduledTask::create();
$queue->type = 'bounce'; $task->type = 'bounce';
$queue->status = null; $task->status = null;
$queue->scheduled_at = Carbon::createFromTimestamp(current_time('timestamp')); $task->scheduled_at = Carbon::createFromTimestamp(current_time('timestamp'));
$queue->newsletter_id = 0; $task->save();
$queue->save(); return $task;
return $queue;
} }
function _after() { function _after() {
ORM::raw_execute('TRUNCATE ' . Setting::$_table); ORM::raw_execute('TRUNCATE ' . Setting::$_table);
ORM::raw_execute('TRUNCATE ' . SendingQueue::$_table); ORM::raw_execute('TRUNCATE ' . ScheduledTask::$_table);
ORM::raw_execute('TRUNCATE ' . Subscriber::$_table); ORM::raw_execute('TRUNCATE ' . Subscriber::$_table);
} }
} }

View File

@ -2,7 +2,7 @@
use Carbon\Carbon; use Carbon\Carbon;
use Codeception\Util\Stub; use Codeception\Util\Stub;
use MailPoet\Models\SendingQueue; use MailPoet\Models\ScheduledTask;
use MailPoet\Models\Setting; use MailPoet\Models\Setting;
use MailPoet\Services\Bridge; use MailPoet\Services\Bridge;
@ -20,8 +20,8 @@ class KeyCheckWorkerTest extends MailPoetTest {
} }
function testItReturnsTrueOnSuccessfulKeyCheck() { function testItReturnsTrueOnSuccessfulKeyCheck() {
$queue = $this->createRunningQueue(); $task = $this->createRunningTask();
$result = $this->worker->processQueueStrategy($queue); $result = $this->worker->processTaskStrategy($task);
expect($result)->true(); expect($result)->true();
} }
@ -34,8 +34,8 @@ class KeyCheckWorkerTest extends MailPoetTest {
), ),
$this $this
); );
$queue = $this->createRunningQueue(); $task = $this->createRunningTask();
$result = $worker->processQueueStrategy($queue); $result = $worker->processTaskStrategy($task);
expect($result)->false(); expect($result)->false();
} }
@ -48,23 +48,22 @@ class KeyCheckWorkerTest extends MailPoetTest {
), ),
$this $this
); );
$queue = $this->createRunningQueue(); $task = $this->createRunningTask();
$result = $worker->processQueueStrategy($queue); $result = $worker->processTaskStrategy($task);
expect($result)->false(); expect($result)->false();
} }
private function createRunningQueue() { private function createRunningTask() {
$queue = SendingQueue::create(); $task = ScheduledTask::create();
$queue->type = MockKeyCheckWorker::TASK_TYPE; $task->type = MockKeyCheckWorker::TASK_TYPE;
$queue->status = null; $task->status = null;
$queue->scheduled_at = Carbon::createFromTimestamp(current_time('timestamp')); $task->scheduled_at = Carbon::createFromTimestamp(current_time('timestamp'));
$queue->newsletter_id = 0; $task->save();
$queue->save(); return $task;
return $queue;
} }
function _after() { function _after() {
ORM::raw_execute('TRUNCATE ' . Setting::$_table); ORM::raw_execute('TRUNCATE ' . Setting::$_table);
ORM::raw_execute('TRUNCATE ' . SendingQueue::$_table); ORM::raw_execute('TRUNCATE ' . ScheduledTask::$_table);
} }
} }

View File

@ -4,7 +4,7 @@ use Carbon\Carbon;
use Codeception\Util\Stub; use Codeception\Util\Stub;
use MailPoet\Cron\CronHelper; use MailPoet\Cron\CronHelper;
use MailPoet\Cron\Workers\SimpleWorker; use MailPoet\Cron\Workers\SimpleWorker;
use MailPoet\Models\SendingQueue; use MailPoet\Models\ScheduledTask;
use MailPoet\Models\Setting; use MailPoet\Models\Setting;
require_once('SimpleWorkerMockImplementation.php'); require_once('SimpleWorkerMockImplementation.php');
@ -46,68 +46,68 @@ class SimpleWorkerTest extends MailPoetTest {
} }
function testItSchedulesTask() { function testItSchedulesTask() {
expect(SendingQueue::where('type', MockSimpleWorker::TASK_TYPE)->findMany())->isEmpty(); expect(ScheduledTask::where('type', MockSimpleWorker::TASK_TYPE)->findMany())->isEmpty();
MockSimpleWorker::schedule(); MockSimpleWorker::schedule();
expect(SendingQueue::where('type', MockSimpleWorker::TASK_TYPE)->findMany())->notEmpty(); expect(ScheduledTask::where('type', MockSimpleWorker::TASK_TYPE)->findMany())->notEmpty();
} }
function testItDoesNotScheduleTaskTwice() { function testItDoesNotScheduleTaskTwice() {
expect(count(SendingQueue::where('type', MockSimpleWorker::TASK_TYPE)->findMany()))->equals(0); expect(count(ScheduledTask::where('type', MockSimpleWorker::TASK_TYPE)->findMany()))->equals(0);
MockSimpleWorker::schedule(); MockSimpleWorker::schedule();
expect(count(SendingQueue::where('type', MockSimpleWorker::TASK_TYPE)->findMany()))->equals(1); expect(count(ScheduledTask::where('type', MockSimpleWorker::TASK_TYPE)->findMany()))->equals(1);
MockSimpleWorker::schedule(); MockSimpleWorker::schedule();
expect(count(SendingQueue::where('type', MockSimpleWorker::TASK_TYPE)->findMany()))->equals(1); expect(count(ScheduledTask::where('type', MockSimpleWorker::TASK_TYPE)->findMany()))->equals(1);
} }
function testItCanGetScheduledQueues() { function testItCanGetScheduledTasks() {
expect(MockSimpleWorker::getScheduledQueues())->isEmpty(); expect(MockSimpleWorker::getScheduledTasks())->isEmpty();
$this->createScheduledQueue(); $this->createScheduledTask();
expect(MockSimpleWorker::getScheduledQueues())->notEmpty(); expect(MockSimpleWorker::getScheduledTasks())->notEmpty();
} }
function testItCanGetRunningQueues() { function testItCanGetRunningTasks() {
expect(MockSimpleWorker::getRunningQueues())->isEmpty(); expect(MockSimpleWorker::getRunningTasks())->isEmpty();
$this->createRunningQueue(); $this->createRunningTask();
expect(MockSimpleWorker::getRunningQueues())->notEmpty(); expect(MockSimpleWorker::getRunningTasks())->notEmpty();
} }
function testItCanGetAllDueQueues() { function testItCanGetAllDueTasks() {
expect(MockSimpleWorker::getAllDueQueues())->isEmpty(); expect(MockSimpleWorker::getAllDueTasks())->isEmpty();
// scheduled for now // scheduled for now
$this->createScheduledQueue(); $this->createScheduledTask();
// running // running
$this->createRunningQueue(); $this->createRunningTask();
// scheduled in the future (should not be retrieved) // scheduled in the future (should not be retrieved)
$queue = $this->createScheduledQueue(); $task = $this->createScheduledTask();
$queue->scheduled_at = Carbon::createFromTimestamp(current_time('timestamp'))->addDays(7); $task->scheduled_at = Carbon::createFromTimestamp(current_time('timestamp'))->addDays(7);
$queue->save(); $task->save();
// completed (should not be retrieved) // completed (should not be retrieved)
$queue = $this->createRunningQueue(); $task = $this->createRunningTask();
$queue->status = SendingQueue::STATUS_COMPLETED; $task->status = ScheduledTask::STATUS_COMPLETED;
$queue->save(); $task->save();
expect(count(MockSimpleWorker::getAllDueQueues()))->equals(2); expect(count(MockSimpleWorker::getAllDueTasks()))->equals(2);
} }
function testItCanGetFutureQueues() { function testItCanGetFutureTasks() {
expect(MockSimpleWorker::getFutureQueues())->isEmpty(); expect(MockSimpleWorker::getFutureTasks())->isEmpty();
$queue = $this->createScheduledQueue(); $task = $this->createScheduledTask();
$queue->scheduled_at = Carbon::createFromTimestamp(current_time('timestamp'))->addDays(7); $task->scheduled_at = Carbon::createFromTimestamp(current_time('timestamp'))->addDays(7);
$queue->save(); $task->save();
expect(count(MockSimpleWorker::getFutureQueues()))->notEmpty(); expect(count(MockSimpleWorker::getFutureTasks()))->notEmpty();
} }
function testItFailsToProcessWithoutQueues() { function testItFailsToProcessWithoutTasks() {
expect($this->worker->process())->false(); expect($this->worker->process())->false();
} }
function testItFailsToProcessWithoutProcessingRequirementsMet() { function testItFailsToProcessWithoutProcessingRequirementsMet() {
$this->createScheduledQueue(); $this->createScheduledTask();
$this->createRunningQueue(); $this->createRunningTask();
$worker = Stub::make( $worker = Stub::make(
$this->worker, $this->worker,
array('checkProcessingRequirements' => false), array('checkProcessingRequirements' => false),
@ -126,42 +126,42 @@ class SimpleWorkerTest extends MailPoetTest {
} }
function testItProcesses() { function testItProcesses() {
$this->createScheduledQueue(); $this->createScheduledTask();
$this->createRunningQueue(); $this->createRunningTask();
expect($this->worker->process())->true(); expect($this->worker->process())->true();
} }
function testItPreparesQueue() { function testItPreparesTask() {
$queue = $this->createScheduledQueue(); $task = $this->createScheduledTask();
$this->worker->prepareQueue($queue); $this->worker->prepareTask($task);
expect($queue->status)->null(); expect($task->status)->null();
} }
function testItProcessesQueue() { function testItProcessesTask() {
$queue = $this->createRunningQueue(); $task = $this->createRunningTask();
$result = $this->worker->processQueue($queue); $result = $this->worker->processTask($task);
expect($queue->status)->equals(SendingQueue::STATUS_COMPLETED); expect($task->status)->equals(ScheduledTask::STATUS_COMPLETED);
expect($result)->equals(true); expect($result)->equals(true);
} }
function testItReturnsFalseIfInnerProcessingFunctionReturnsFalse() { function testItReturnsFalseIfInnerProcessingFunctionReturnsFalse() {
$queue = $this->createRunningQueue(); $task = $this->createRunningTask();
$worker = Stub::construct( $worker = Stub::construct(
$this->worker, $this->worker,
array(), array(),
array('processQueueStrategy' => false), array('processTaskStrategy' => false),
$this $this
); );
$result = $worker->processQueue($queue); $result = $worker->processTask($task);
expect($queue->status)->equals(null); expect($task->status)->equals(null);
expect($result)->equals(false); expect($result)->equals(false);
} }
function testItCanRescheduleTasks() { function testItCanRescheduleTasks() {
$queue = $this->createRunningQueue(); $task = $this->createRunningTask();
$scheduled_at = $queue->scheduled_at; $scheduled_at = $task->scheduled_at;
$this->worker->reschedule($queue, 10); $this->worker->reschedule($task, 10);
expect($scheduled_at < $queue->scheduled_at)->true(); expect($scheduled_at < $task->scheduled_at)->true();
} }
function testItCalculatesNextRunDateWithinNextWeekBoundaries() { function testItCalculatesNextRunDateWithinNextWeekBoundaries() {
@ -174,28 +174,26 @@ class SimpleWorkerTest extends MailPoetTest {
expect($difference)->greaterOrEquals(0); expect($difference)->greaterOrEquals(0);
} }
private function createScheduledQueue() { private function createScheduledTask() {
$queue = SendingQueue::create(); $task = ScheduledTask::create();
$queue->type = MockSimpleWorker::TASK_TYPE; $task->type = MockSimpleWorker::TASK_TYPE;
$queue->status = SendingQueue::STATUS_SCHEDULED; $task->status = ScheduledTask::STATUS_SCHEDULED;
$queue->scheduled_at = Carbon::createFromTimestamp(current_time('timestamp')); $task->scheduled_at = Carbon::createFromTimestamp(current_time('timestamp'));
$queue->newsletter_id = 0; $task->save();
$queue->save(); return $task;
return $queue;
} }
private function createRunningQueue() { private function createRunningTask() {
$queue = SendingQueue::create(); $task = ScheduledTask::create();
$queue->type = MockSimpleWorker::TASK_TYPE; $task->type = MockSimpleWorker::TASK_TYPE;
$queue->status = null; $task->status = null;
$queue->scheduled_at = Carbon::createFromTimestamp(current_time('timestamp')); $task->scheduled_at = Carbon::createFromTimestamp(current_time('timestamp'));
$queue->newsletter_id = 0; $task->save();
$queue->save(); return $task;
return $queue;
} }
function _after() { function _after() {
ORM::raw_execute('TRUNCATE ' . Setting::$_table); ORM::raw_execute('TRUNCATE ' . Setting::$_table);
ORM::raw_execute('TRUNCATE ' . SendingQueue::$_table); ORM::raw_execute('TRUNCATE ' . ScheduledTask::$_table);
} }
} }