Add a limit to fetch 5 scheduled tasks at a time [MAILPOET-1366]

This commit is contained in:
Tautvidas Sipavičius
2018-05-02 23:27:30 +03:00
parent 576796db22
commit a588e95762
7 changed files with 94 additions and 15 deletions

View File

@ -24,7 +24,7 @@ class WordPress {
static function checkExecutionRequirements() { static function checkExecutionRequirements() {
// migration // migration
$migration_disabled = Setting::getValue('cron_trigger.method') === 'none'; $migration_disabled = Setting::getValue('cron_trigger.method') === 'none';
$migration_due_tasks = MigrationWorker::getAllDueTasks(); $migration_due_tasks = MigrationWorker::getDueTasks();
$migration_completed_tasks = MigrationWorker::getCompletedTasks(); $migration_completed_tasks = MigrationWorker::getCompletedTasks();
$migration_future_tasks = MigrationWorker::getFutureTasks(); $migration_future_tasks = MigrationWorker::getFutureTasks();
// sending queue // sending queue
@ -35,14 +35,14 @@ class WordPress {
// sending service // sending service
$mp_sending_enabled = Bridge::isMPSendingServiceEnabled(); $mp_sending_enabled = Bridge::isMPSendingServiceEnabled();
// bounce sync // bounce sync
$bounce_due_tasks = BounceWorker::getAllDueTasks(); $bounce_due_tasks = BounceWorker::getDueTasks();
$bounce_future_tasks = BounceWorker::getFutureTasks(); $bounce_future_tasks = BounceWorker::getFutureTasks();
// sending service key check // sending service key check
$msskeycheck_due_tasks = SendingServiceKeyCheckWorker::getAllDueTasks(); $msskeycheck_due_tasks = SendingServiceKeyCheckWorker::getDueTasks();
$msskeycheck_future_tasks = SendingServiceKeyCheckWorker::getFutureTasks(); $msskeycheck_future_tasks = SendingServiceKeyCheckWorker::getFutureTasks();
// premium key check // premium key check
$premium_key_specified = Bridge::isPremiumKeySpecified(); $premium_key_specified = Bridge::isPremiumKeySpecified();
$premium_keycheck_due_tasks = PremiumKeyCheckWorker::getAllDueTasks(); $premium_keycheck_due_tasks = PremiumKeyCheckWorker::getDueTasks();
$premium_keycheck_future_tasks = PremiumKeyCheckWorker::getFutureTasks(); $premium_keycheck_future_tasks = PremiumKeyCheckWorker::getFutureTasks();
// check requirements for each worker // check requirements for each worker
$sending_queue_active = (($scheduled_queues || $running_queues) && !$sending_limit_reached && !$sending_is_paused); $sending_queue_active = (($scheduled_queues || $running_queues) && !$sending_limit_reached && !$sending_is_paused);

View File

@ -18,6 +18,7 @@ require_once(ABSPATH . 'wp-includes/pluggable.php');
class Scheduler { class Scheduler {
public $timer; public $timer;
const UNCONFIRMED_SUBSCRIBER_RESCHEDULE_TIMEOUT = 5; const UNCONFIRMED_SUBSCRIBER_RESCHEDULE_TIMEOUT = 5;
const TASK_BATCH_SIZE = 5;
function __construct($timer = false) { function __construct($timer = false) {
$this->timer = ($timer) ? $timer : microtime(true); $this->timer = ($timer) ? $timer : microtime(true);
@ -177,6 +178,6 @@ class Scheduler {
} }
static function getScheduledQueues() { static function getScheduledQueues() {
return SendingTask::getScheduledQueues(); return SendingTask::getScheduledQueues(self::TASK_BATCH_SIZE);
} }
} }

View File

@ -21,6 +21,7 @@ class SendingQueue {
public $newsletter_task; public $newsletter_task;
public $timer; public $timer;
const BATCH_SIZE = 20; const BATCH_SIZE = 20;
const TASK_BATCH_SIZE = 5;
function __construct($timer = false, $mailer_task = false, $newsletter_task = false) { function __construct($timer = false, $mailer_task = false, $newsletter_task = false) {
$this->mailer_task = ($mailer_task) ? $mailer_task : new MailerTask(); $this->mailer_task = ($mailer_task) ? $mailer_task : new MailerTask();
@ -195,6 +196,6 @@ class SendingQueue {
} }
static function getRunningQueues() { static function getRunningQueues() {
return SendingTask::getRunningQueues(); return SendingTask::getRunningQueues(self::TASK_BATCH_SIZE);
} }
} }

View File

@ -12,6 +12,8 @@ if(!defined('ABSPATH')) exit;
abstract class SimpleWorker { abstract class SimpleWorker {
public $timer; public $timer;
const TASK_BATCH_SIZE = 5;
function __construct($timer = false) { function __construct($timer = false) {
if(!defined('static::TASK_TYPE')) { if(!defined('static::TASK_TYPE')) {
throw new \Exception('Constant TASK_TYPE is not defined on subclass ' . get_class($this)); throw new \Exception('Constant TASK_TYPE is not defined on subclass ' . get_class($this));
@ -121,6 +123,7 @@ abstract class SimpleWorker {
->$dateWhere('scheduled_at', Carbon::createFromTimestamp(WPFunctions::currentTime('timestamp'))) ->$dateWhere('scheduled_at', Carbon::createFromTimestamp(WPFunctions::currentTime('timestamp')))
->whereNull('deleted_at') ->whereNull('deleted_at')
->where('status', ScheduledTask::STATUS_SCHEDULED) ->where('status', ScheduledTask::STATUS_SCHEDULED)
->limit(self::TASK_BATCH_SIZE)
->findMany(); ->findMany();
} }
@ -129,10 +132,11 @@ abstract class SimpleWorker {
->whereLte('scheduled_at', Carbon::createFromTimestamp(WPFunctions::currentTime('timestamp'))) ->whereLte('scheduled_at', Carbon::createFromTimestamp(WPFunctions::currentTime('timestamp')))
->whereNull('deleted_at') ->whereNull('deleted_at')
->whereNull('status') ->whereNull('status')
->limit(self::TASK_BATCH_SIZE)
->findMany(); ->findMany();
} }
static function getAllDueTasks() { static function getDueTasks() {
$scheduled_tasks = self::getScheduledTasks(); $scheduled_tasks = self::getScheduledTasks();
$running_tasks = self::getRunningTasks(); $running_tasks = self::getRunningTasks();
return array_merge((array)$scheduled_tasks, (array)$running_tasks); return array_merge((array)$scheduled_tasks, (array)$running_tasks);
@ -146,6 +150,7 @@ abstract class SimpleWorker {
return ScheduledTask::where('type', static::TASK_TYPE) return ScheduledTask::where('type', static::TASK_TYPE)
->whereNull('deleted_at') ->whereNull('deleted_at')
->where('status', ScheduledTask::STATUS_COMPLETED) ->where('status', ScheduledTask::STATUS_COMPLETED)
->limit(self::TASK_BATCH_SIZE)
->findMany(); ->findMany();
} }
} }

View File

@ -221,10 +221,11 @@ class Sending {
return in_array($prop, $this->common_fields); return in_array($prop, $this->common_fields);
} }
static function getScheduledQueues() { static function getScheduledQueues($amount = 5) {
$tasks = ScheduledTask::where('status', ScheduledTask::STATUS_SCHEDULED) $tasks = ScheduledTask::where('status', ScheduledTask::STATUS_SCHEDULED)
->whereLte('scheduled_at', Carbon::createFromTimestamp(WPFunctions::currentTime('timestamp'))) ->whereLte('scheduled_at', Carbon::createFromTimestamp(WPFunctions::currentTime('timestamp')))
->where('type', 'sending') ->where('type', 'sending')
->limit($amount)
->findMany(); ->findMany();
$result = array(); $result = array();
foreach($tasks as $task) { foreach($tasks as $task) {
@ -233,12 +234,13 @@ class Sending {
return array_filter($result); return array_filter($result);
} }
static function getRunningQueues() { static function getRunningQueues($amount = 5) {
$tasks = ScheduledTask::orderByAsc('priority') $tasks = ScheduledTask::orderByAsc('priority')
->orderByAsc('created_at') ->orderByAsc('created_at')
->whereNull('deleted_at') ->whereNull('deleted_at')
->whereNull('status') ->whereNull('status')
->where('type', 'sending') ->where('type', 'sending')
->limit($amount)
->findMany(); ->findMany();
$result = array(); $result = array();
foreach($tasks as $task) { foreach($tasks as $task) {

View File

@ -66,14 +66,38 @@ class SimpleWorkerTest extends \MailPoetTest {
expect(MockSimpleWorker::getScheduledTasks())->notEmpty(); expect(MockSimpleWorker::getScheduledTasks())->notEmpty();
} }
function testItCanGetABatchOfScheduledTasks() {
expect(MockSimpleWorker::getScheduledTasks())->isEmpty();
for($i = 0; $i < MockSimpleWorker::TASK_BATCH_SIZE + 5; $i += 1) {
$this->createScheduledTask();
}
expect(count(MockSimpleWorker::getScheduledTasks()))->equals(MockSimpleWorker::TASK_BATCH_SIZE);
}
function testItCanGetRunningTasks() { function testItCanGetRunningTasks() {
expect(MockSimpleWorker::getRunningTasks())->isEmpty(); expect(MockSimpleWorker::getRunningTasks())->isEmpty();
$this->createRunningTask(); $this->createRunningTask();
expect(MockSimpleWorker::getRunningTasks())->notEmpty(); expect(MockSimpleWorker::getRunningTasks())->notEmpty();
} }
function testItCanGetAllDueTasks() { function testItCanGetBatchOfRunningTasks() {
expect(MockSimpleWorker::getAllDueTasks())->isEmpty(); expect(MockSimpleWorker::getRunningTasks())->isEmpty();
for($i = 0; $i < MockSimpleWorker::TASK_BATCH_SIZE + 5; $i += 1) {
$this->createRunningTask();
}
expect(count(MockSimpleWorker::getRunningTasks()))->equals(MockSimpleWorker::TASK_BATCH_SIZE);
}
function testItCanGetBatchOfCompletedTasks() {
expect(MockSimpleWorker::getCompletedTasks())->isEmpty();
for($i = 0; $i < MockSimpleWorker::TASK_BATCH_SIZE + 5; $i += 1) {
$this->createCompletedTask();
}
expect(count(MockSimpleWorker::getCompletedTasks()))->equals(MockSimpleWorker::TASK_BATCH_SIZE);
}
function testItCanGetDueTasks() {
expect(MockSimpleWorker::getDueTasks())->isEmpty();
// scheduled for now // scheduled for now
$this->createScheduledTask(); $this->createScheduledTask();
@ -91,7 +115,7 @@ class SimpleWorkerTest extends \MailPoetTest {
$task->status = ScheduledTask::STATUS_COMPLETED; $task->status = ScheduledTask::STATUS_COMPLETED;
$task->save(); $task->save();
expect(count(MockSimpleWorker::getAllDueTasks()))->equals(2); expect(count(MockSimpleWorker::getDueTasks()))->equals(2);
} }
function testItCanGetFutureTasks() { function testItCanGetFutureTasks() {
@ -193,6 +217,15 @@ class SimpleWorkerTest extends \MailPoetTest {
return $task; return $task;
} }
private function createCompletedTask() {
$task = ScheduledTask::create();
$task->type = MockSimpleWorker::TASK_TYPE;
$task->status = ScheduledTask::STATUS_COMPLETED;
$task->scheduled_at = Carbon::createFromTimestamp(current_time('timestamp'));
$task->save();
return $task;
}
function _after() { function _after() {
\ORM::raw_execute('TRUNCATE ' . Setting::$_table); \ORM::raw_execute('TRUNCATE ' . Setting::$_table);
\ORM::raw_execute('TRUNCATE ' . ScheduledTask::$_table); \ORM::raw_execute('TRUNCATE ' . ScheduledTask::$_table);

View File

@ -140,6 +140,14 @@ class SendingTest extends \MailPoetTest {
expect($tasks)->isEmpty(); expect($tasks)->isEmpty();
} }
function testItGetsBatchOfScheduledQueues() {
$amount = 5;
for($i = 0; $i < $amount + 3; $i += 1) {
$this->createNewSendingTask(ScheduledTask::STATUS_SCHEDULED);
}
expect(count(SendingTask::getScheduledQueues($amount)))->equals($amount);
}
function testItGetsRunningQueues() { function testItGetsRunningQueues() {
$this->sending->status = null; $this->sending->status = null;
$this->sending->scheduled_at = Carbon::createFromTimestamp(current_time('timestamp'))->subHours(1); $this->sending->scheduled_at = Carbon::createFromTimestamp(current_time('timestamp'))->subHours(1);
@ -156,6 +164,35 @@ class SendingTest extends \MailPoetTest {
expect($tasks)->isEmpty(); expect($tasks)->isEmpty();
} }
function testItGetsBatchOfRunningQueues() {
$amount = 5;
for($i = 0; $i < $amount + 3; $i += 1) {
$this->createNewSendingTask(null);
}
expect(count(SendingTask::getRunningQueues($amount)))->equals($amount);
}
function createNewSendingTask($status = null) {
$newsletter = Newsletter::create();
$newsletter->type = Newsletter::TYPE_STANDARD;
$newsletter->save();
$task = ScheduledTask::create();
$task->type = SendingTask::TASK_TYPE;
$task->save();
$queue = SendingQueue::create();
$queue->newsletter_id = $newsletter->id;
$queue->task_id = $task->id;
$queue->save();
$sending = SendingTask::create($task, $queue);
$sending->setSubscribers(array(123, 456)); // random IDs
$sending->status = $status;
$sending->scheduled_at = Carbon::createFromTimestamp(current_time('timestamp'))->subHours(1);
$sending->save();
}
function _after() { function _after() {
\ORM::raw_execute('TRUNCATE ' . Newsletter::$_table); \ORM::raw_execute('TRUNCATE ' . Newsletter::$_table);
\ORM::raw_execute('TRUNCATE ' . ScheduledTask::$_table); \ORM::raw_execute('TRUNCATE ' . ScheduledTask::$_table);