Rotate scheduled tasks

Scheduled tasks should rotate so that they don't block
sending when they cannot be send.

[MAILPOET-1391]
This commit is contained in:
Pavel Dohnal
2018-06-04 16:00:32 +01:00
parent 24b63d324b
commit b8ce6ff88c
5 changed files with 52 additions and 1 deletions

View File

@ -5,6 +5,7 @@ namespace MailPoet\Cron\Workers;
use Carbon\Carbon; use Carbon\Carbon;
use MailPoet\Cron\CronHelper; use MailPoet\Cron\CronHelper;
use MailPoet\Models\Newsletter; use MailPoet\Models\Newsletter;
use MailPoet\Models\ScheduledTask;
use MailPoet\Models\Subscriber; use MailPoet\Models\Subscriber;
use MailPoet\Models\SubscriberSegment; use MailPoet\Models\SubscriberSegment;
use MailPoet\Segments\SubscribersFinder; use MailPoet\Segments\SubscribersFinder;
@ -29,6 +30,7 @@ class Scheduler {
function process() { function process() {
$scheduled_queues = self::getScheduledQueues(); $scheduled_queues = self::getScheduledQueues();
if(!count($scheduled_queues)) return false; if(!count($scheduled_queues)) return false;
$this->updateTasks($scheduled_queues);
foreach($scheduled_queues as $i => $queue) { foreach($scheduled_queues as $i => $queue) {
$newsletter = Newsletter::filter('filterWithOptions') $newsletter = Newsletter::filter('filterWithOptions')
->findOne($queue->newsletter_id); ->findOne($queue->newsletter_id);
@ -177,6 +179,13 @@ class Scheduler {
false; false;
} }
private function updateTasks(array $scheduled_queues) {
$ids = array_map(function ($queue) {
return $queue->task_id;
}, $scheduled_queues);
ScheduledTask::touchAllByIds($ids);
}
static function getScheduledQueues() { static function getScheduledQueues() {
return SendingTask::getScheduledQueues(self::TASK_BATCH_SIZE); return SendingTask::getScheduledQueues(self::TASK_BATCH_SIZE);
} }

View File

@ -74,4 +74,12 @@ class ScheduledTask extends Model {
parent::save(); parent::save();
return $this; return $this;
} }
static function touchAllByIds(array $ids) {
ScheduledTask::rawExecute(
'UPDATE `' . ScheduledTask::$_table . '`' .
'SET `updated_at` = NOW() ' .
'WHERE `id` IN (' . join(',', $ids) . ')'
);
}
} }

View File

@ -227,6 +227,7 @@ class Sending {
->whereLte('scheduled_at', Carbon::createFromTimestamp(WPFunctions::currentTime('timestamp'))) ->whereLte('scheduled_at', Carbon::createFromTimestamp(WPFunctions::currentTime('timestamp')))
->where('type', 'sending') ->where('type', 'sending')
->whereNotEqual('status', ScheduledTask::STATUS_PAUSED) ->whereNotEqual('status', ScheduledTask::STATUS_PAUSED)
->orderByAsc('updated_at')
->limit($amount) ->limit($amount)
->findMany(); ->findMany();
$result = array(); $result = array();

View File

@ -571,6 +571,20 @@ class SchedulerTest extends \MailPoetTest {
$scheduler->process(); $scheduler->process();
} }
function testItUpdatesUpdateTime() {
$originalUpdated = Carbon::createFromTimestamp(current_time('timestamp'))->subHours(5)->toDateTimeString();
$newsletter = $this->_createNewsletter(Newsletter::TYPE_WELCOME, Newsletter::STATUS_DRAFT);
$queue = $this->_createQueue($newsletter->id);
$queue->scheduled_at = Carbon::createFromTimestamp(current_time('timestamp'));
$queue->updated_at = $originalUpdated;
$queue->save();
$scheduler = new Scheduler();
$scheduler->timer = microtime(true);
$scheduler->process();
$newQueue = ScheduledTask::findOne($queue->task_id);
expect($newQueue->updated_at)->notEquals($originalUpdated);
}
function _createNewsletterSegment($newsletter_id, $segment_id) { function _createNewsletterSegment($newsletter_id, $segment_id) {
$newsletter_segment = NewsletterSegment::create(); $newsletter_segment = NewsletterSegment::create();
$newsletter_segment->newsletter_id = $newsletter_id; $newsletter_segment->newsletter_id = $newsletter_id;
@ -685,4 +699,4 @@ class SchedulerTest extends \MailPoetTest {
\ORM::raw_execute('TRUNCATE ' . Segment::$_table); \ORM::raw_execute('TRUNCATE ' . Segment::$_table);
\ORM::raw_execute('TRUNCATE ' . NewsletterSegment::$_table); \ORM::raw_execute('TRUNCATE ' . NewsletterSegment::$_table);
} }
} }

View File

@ -176,6 +176,25 @@ class SendingTest extends \MailPoetTest {
expect(SendingTask::getRunningQueues($amount))->count($amount); expect(SendingTask::getRunningQueues($amount))->count($amount);
} }
function testItGetsBatchOfRunningQueuesSortedByUpdatedTime() {
$this->_after();
$sending1 = $this->createNewSendingTask(['status' => ScheduledTask::STATUS_SCHEDULED]);
$sending1->updated_at = '2017-05-04 14:00:00';
$sending1->save();
$sending2 = $this->createNewSendingTask(['status' => ScheduledTask::STATUS_SCHEDULED]);
$sending2->updated_at = '2017-05-04 16:00:00';
$sending2->save();
$sending3 = $this->createNewSendingTask(['status' => ScheduledTask::STATUS_SCHEDULED]);
$sending3->updated_at = '2017-05-04 15:00:00';
$sending3->save();
$queues = SendingTask::getScheduledQueues(3);
expect($queues[0]->task_id)->equals($sending1->id());
expect($queues[1]->task_id)->equals($sending3->id());
expect($queues[2]->task_id)->equals($sending2->id());
}
function createNewNewsletter() { function createNewNewsletter() {
$newsletter = Newsletter::create(); $newsletter = Newsletter::create();
$newsletter->type = Newsletter::TYPE_STANDARD; $newsletter->type = Newsletter::TYPE_STANDARD;