Reschedule failing cron tasks progressively [MAILPOET-2181]

This commit is contained in:
wxa
2019-07-28 16:42:47 +03:00
committed by M. Shull
parent e1b8462254
commit 8b867a7b4f
9 changed files with 86 additions and 27 deletions

View File

@ -130,6 +130,7 @@ class Migrator {
'created_at timestamp NULL,', // must be NULL, see comment at the top 'created_at timestamp NULL,', // must be NULL, see comment at the top
'updated_at timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,', 'updated_at timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,',
'deleted_at timestamp NULL,', 'deleted_at timestamp NULL,',
'reschedule_count int(11) NOT NULL DEFAULT 0,',
'meta longtext,', 'meta longtext,',
'PRIMARY KEY (id),', 'PRIMARY KEY (id),',
'KEY type (type),', 'KEY type (type),',

View File

@ -8,8 +8,6 @@ use MailPoet\Services\Bridge;
if (!defined('ABSPATH')) exit; if (!defined('ABSPATH')) exit;
abstract class KeyCheckWorker extends SimpleWorker { abstract class KeyCheckWorker extends SimpleWorker {
const UNAVAILABLE_SERVICE_RESCHEDULE_TIMEOUT = 60;
public $bridge; public $bridge;
function init() { function init() {
@ -26,7 +24,7 @@ abstract class KeyCheckWorker extends SimpleWorker {
} }
if (empty($result['code']) || $result['code'] == Bridge::CHECK_ERROR_UNAVAILABLE) { if (empty($result['code']) || $result['code'] == Bridge::CHECK_ERROR_UNAVAILABLE) {
$this->reschedule($task, self::UNAVAILABLE_SERVICE_RESCHEDULE_TIMEOUT); $task->rescheduleProgressively();
return false; return false;
} }

View File

@ -13,16 +13,13 @@ use MailPoet\Models\SubscriberSegment;
use MailPoet\Segments\SubscribersFinder; use MailPoet\Segments\SubscribersFinder;
use MailPoet\Tasks\Sending as SendingTask; use MailPoet\Tasks\Sending as SendingTask;
use MailPoet\Newsletter\Scheduler\Scheduler as NewsletterScheduler; use MailPoet\Newsletter\Scheduler\Scheduler as NewsletterScheduler;
use MailPoet\WP\Functions as WPFunctions;
if (!defined('ABSPATH')) exit; if (!defined('ABSPATH')) exit;
class Scheduler { class Scheduler {
const UNCONFIRMED_SUBSCRIBER_RESCHEDULE_TIMEOUT = 5;
const TASK_BATCH_SIZE = 5; const TASK_BATCH_SIZE = 5;
public $timer; public $timer;
private $wp;
/** @var SubscribersFinder */ /** @var SubscribersFinder */
private $subscribers_finder; private $subscribers_finder;
@ -31,7 +28,6 @@ class Scheduler {
$this->timer = ($timer) ? $timer : microtime(true); $this->timer = ($timer) ? $timer : microtime(true);
// abort if execution limit is reached // abort if execution limit is reached
CronHelper::enforceExecutionLimit($this->timer); CronHelper::enforceExecutionLimit($this->timer);
$this->wp = new WPFunctions();
$this->subscribers_finder = $subscribers_finder; $this->subscribers_finder = $subscribers_finder;
} }
@ -177,12 +173,8 @@ class Scheduler {
} }
// check if subscriber is confirmed (subscribed) // check if subscriber is confirmed (subscribed)
if ($subscriber->status !== Subscriber::STATUS_SUBSCRIBED) { if ($subscriber->status !== Subscriber::STATUS_SUBSCRIBED) {
// reschedule delivery in 5 minutes // reschedule delivery
$scheduled_at = Carbon::createFromTimestamp($this->wp->currentTime('timestamp')); $queue->rescheduleProgressively();
$queue->scheduled_at = $scheduled_at->addMinutes(
self::UNCONFIRMED_SUBSCRIBER_RESCHEDULE_TIMEOUT
);
$queue->save();
return false; return false;
} }
return true; return true;

View File

@ -50,12 +50,20 @@ abstract class SimpleWorker {
return false; return false;
} }
$task = null;
try {
foreach ($scheduled_tasks as $i => $task) { foreach ($scheduled_tasks as $i => $task) {
$this->prepareTask($task); $this->prepareTask($task);
} }
foreach ($running_tasks as $i => $task) { foreach ($running_tasks as $i => $task) {
$this->processTask($task); $this->processTask($task);
} }
} catch (\Exception $e) {
if ($task) {
$task->rescheduleProgressively();
}
throw $e;
}
return true; return true;
} }

View File

@ -2,6 +2,7 @@
namespace MailPoet\Models; namespace MailPoet\Models;
use Carbon\Carbon;
use MailPoet\Util\Helpers; use MailPoet\Util\Helpers;
use MailPoet\WP\Functions as WPFunctions; use MailPoet\WP\Functions as WPFunctions;
@ -14,6 +15,7 @@ if (!defined('ABSPATH')) exit;
* @property string|null $type * @property string|null $type
* @property int $priority * @property int $priority
* @property string|null $scheduled_at * @property string|null $scheduled_at
* @property int $reschedule_count
* @property string|array|null $meta * @property string|array|null $meta
*/ */
class ScheduledTask extends Model { class ScheduledTask extends Model {
@ -26,6 +28,9 @@ class ScheduledTask extends Model {
const PRIORITY_MEDIUM = 5; const PRIORITY_MEDIUM = 5;
const PRIORITY_LOW = 10; const PRIORITY_LOW = 10;
const BASIC_RESCHEDULE_TIMEOUT = 5; //minutes
const MAX_RESCHEDULE_TIMEOUT = 1440; //minutes
private $wp; private $wp;
function __construct() { function __construct() {
@ -131,6 +136,16 @@ class ScheduledTask extends Model {
} }
} }
function rescheduleProgressively() {
$scheduled_at = Carbon::createFromTimestamp($this->wp->currentTime('timestamp'));
$timeout = min(self::BASIC_RESCHEDULE_TIMEOUT * pow(2, $this->reschedule_count), self::MAX_RESCHEDULE_TIMEOUT);
$this->scheduled_at = $scheduled_at->addMinutes($timeout);
$this->reschedule_count++;
$this->status = ScheduledTask::STATUS_SCHEDULED;
$this->save();
return $timeout;
}
static function touchAllByIds(array $ids) { static function touchAllByIds(array $ids) {
ScheduledTask::rawExecute( ScheduledTask::rawExecute(
'UPDATE `' . ScheduledTask::$_table . '`' . 'UPDATE `' . ScheduledTask::$_table . '`' .

View File

@ -35,11 +35,14 @@ class KeyCheckWorkerTest extends \MailPoetTest {
'checkKey' => function () { 'checkKey' => function () {
throw new \Exception; throw new \Exception;
}, },
'reschedule' => Expected::once(),
], ],
$this $this
); );
$task = $this->createRunningTask(); $task = Stub::make(
ScheduledTask::class,
['rescheduleProgressively' => Expected::once()],
$this
);
$result = $worker->processTaskStrategy($task); $result = $worker->processTaskStrategy($task);
expect($result)->false(); expect($result)->false();
} }
@ -49,11 +52,14 @@ class KeyCheckWorkerTest extends \MailPoetTest {
$this->worker, $this->worker,
[ [
'checkKey' => ['code' => Bridge::CHECK_ERROR_UNAVAILABLE], 'checkKey' => ['code' => Bridge::CHECK_ERROR_UNAVAILABLE],
'reschedule' => Expected::once(),
], ],
$this $this
); );
$task = $this->createRunningTask(); $task = Stub::make(
ScheduledTask::class,
['rescheduleProgressively' => Expected::once()],
$this
);
$result = $worker->processTaskStrategy($task); $result = $worker->processTaskStrategy($task);
expect($result)->false(); expect($result)->false();
} }

View File

@ -21,10 +21,6 @@ use MailPoet\Segments\SubscribersFinder;
use MailPoet\Tasks\Sending as SendingTask; use MailPoet\Tasks\Sending as SendingTask;
class SchedulerTest extends \MailPoetTest { class SchedulerTest extends \MailPoetTest {
function testItDefinesConstants() {
expect(Scheduler::UNCONFIRMED_SUBSCRIBER_RESCHEDULE_TIMEOUT)->equals(5);
}
function testItConstructs() { function testItConstructs() {
$scheduler = new Scheduler($this->makeEmpty(SubscribersFinder::class)); $scheduler = new Scheduler($this->makeEmpty(SubscribersFinder::class));
expect($scheduler->timer)->greaterOrEquals(5); expect($scheduler->timer)->greaterOrEquals(5);
@ -329,7 +325,7 @@ class SchedulerTest extends \MailPoetTest {
$updated_queue = SendingTask::createFromQueue(SendingQueue::findOne($queue->id)); $updated_queue = SendingTask::createFromQueue(SendingQueue::findOne($queue->id));
expect(Carbon::parse($updated_queue->scheduled_at))->equals( expect(Carbon::parse($updated_queue->scheduled_at))->equals(
Carbon::createFromTimestamp(current_time('timestamp')) Carbon::createFromTimestamp(current_time('timestamp'))
->addMinutes(Scheduler::UNCONFIRMED_SUBSCRIBER_RESCHEDULE_TIMEOUT) ->addMinutes(ScheduledTask::BASIC_RESCHEDULE_TIMEOUT)
); );
} }

View File

@ -180,6 +180,31 @@ class SimpleWorkerTest extends \MailPoetTest {
expect($task->status)->equals(ScheduledTask::STATUS_SCHEDULED); expect($task->status)->equals(ScheduledTask::STATUS_SCHEDULED);
} }
function testWillRescheduleATaskIfItFails() {
$task = $this->createRunningTask();
$worker = Stub::construct(
$this->worker,
[],
[
'processTaskStrategy' => function () {
throw new \Exception('test error');
},
],
$this
);
$scheduled_at = $task->scheduled_at;
try {
$worker->process();
$this->fail('An exception should be thrown');
} catch (\Exception $e) {
expect($e->getMessage())->equals('test error');
$task = ScheduledTask::findOne($task->id);
expect($scheduled_at < $task->scheduled_at)->true();
expect($task->status)->equals(ScheduledTask::STATUS_SCHEDULED);
expect($task->reschedule_count)->equals(1);
}
}
function testItCalculatesNextRunDateWithinNextWeekBoundaries() { function testItCalculatesNextRunDateWithinNextWeekBoundaries() {
$current_date = Carbon::createFromTimestamp(current_time('timestamp')); $current_date = Carbon::createFromTimestamp(current_time('timestamp'));
$next_run_date = MockSimpleWorker::getNextRunDate(); $next_run_date = MockSimpleWorker::getNextRunDate();

View File

@ -137,6 +137,24 @@ class ScheduledTaskTest extends \MailPoetTest {
expect($task->meta)->equals($meta); expect($task->meta)->equals($meta);
} }
function testItCanRescheduleTasksProgressively() {
$task = $this->task;
$task->status = null;
$scheduled_at = $task->scheduled_at;
$timeout = $task->rescheduleProgressively();
expect($timeout)->equals(ScheduledTask::BASIC_RESCHEDULE_TIMEOUT);
expect($scheduled_at < $task->scheduled_at)->true();
expect($task->status)->equals(ScheduledTask::STATUS_SCHEDULED);
$timeout = $task->rescheduleProgressively();
expect($timeout)->equals(ScheduledTask::BASIC_RESCHEDULE_TIMEOUT * 2);
$task->reschedule_count = 123456; // too many
$timeout = $task->rescheduleProgressively();
expect($timeout)->equals(ScheduledTask::MAX_RESCHEDULE_TIMEOUT);
}
function _after() { function _after() {
\ORM::raw_execute('TRUNCATE ' . ScheduledTask::$_table); \ORM::raw_execute('TRUNCATE ' . ScheduledTask::$_table);
\ORM::raw_execute('TRUNCATE ' . ScheduledTaskSubscriber::$_table); \ORM::raw_execute('TRUNCATE ' . ScheduledTaskSubscriber::$_table);