Add a single-instance simple worker [MAILPOET-2385]
This commit is contained in:
@@ -118,6 +118,7 @@ abstract class SimpleWorker {
|
||||
function reschedule(ScheduledTask $task, $timeout) {
|
||||
$scheduled_at = Carbon::createFromTimestamp($this->wp->currentTime('timestamp'));
|
||||
$task->scheduled_at = $scheduled_at->addMinutes($timeout);
|
||||
$task->setExpr('updated_at', 'NOW()');
|
||||
$task->status = ScheduledTask::STATUS_SCHEDULED;
|
||||
$task->save();
|
||||
}
|
||||
|
71
lib/Cron/Workers/SingleInstanceSimpleWorker.php
Normal file
71
lib/Cron/Workers/SingleInstanceSimpleWorker.php
Normal file
@@ -0,0 +1,71 @@
|
||||
<?php
|
||||
|
||||
namespace MailPoet\Cron\Workers;
|
||||
|
||||
use Carbon\Carbon;
|
||||
use MailPoet\Cron\CronHelper;
|
||||
use MailPoet\Models\ScheduledTask;
|
||||
use MailPoet\WP\Functions as WPFunctions;
|
||||
|
||||
abstract class SingleInstanceSimpleWorker extends SimpleWorker {
|
||||
const TASK_RUN_TIMEOUT = 120;
|
||||
const TIMED_OUT_TASK_RESCHEDULE_TIMEOUT = 5;
|
||||
|
||||
function processTask(ScheduledTask $task) {
|
||||
// abort if execution limit is reached
|
||||
CronHelper::enforceExecutionLimit($this->timer);
|
||||
|
||||
if ($this->isInProgress($task)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
$this->startProgress($task);
|
||||
$completed = $this->processTaskStrategy($task);
|
||||
if ($completed) {
|
||||
$this->complete($task);
|
||||
}
|
||||
$this->stopProgress($task);
|
||||
|
||||
return (bool)$completed;
|
||||
}
|
||||
|
||||
function isInProgress(ScheduledTask $task) {
|
||||
$meta = $task->getMeta();
|
||||
$current_time = Carbon::createFromTimestamp(WPFunctions::get()->currentTime('timestamp'));
|
||||
$updated_at = Carbon::createFromTimestamp(strtotime($task->updated_at));
|
||||
|
||||
// If the task is running for too long consider it stuck and reschedule
|
||||
if (!empty($task->updated_at) && $updated_at->diffInMinutes($current_time, false) > self::TASK_RUN_TIMEOUT) {
|
||||
$this->ensureMetaIsArray($task);
|
||||
$task->meta = array_merge($task->getMeta(), ['in_progress' => null]);
|
||||
$this->reschedule($task, self::TIMED_OUT_TASK_RESCHEDULE_TIMEOUT);
|
||||
return true;
|
||||
} elseif (!empty($meta['in_progress'])) {
|
||||
// Do not run multiple instances of the task
|
||||
return true;
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
function startProgress(ScheduledTask $task) {
|
||||
$this->ensureMetaIsArray($task);
|
||||
$task->meta = array_merge($task->getMeta(), ['in_progress' => true]);
|
||||
$task->save();
|
||||
}
|
||||
|
||||
function stopProgress(ScheduledTask $task) {
|
||||
$this->ensureMetaIsArray($task);
|
||||
$task->meta = array_merge($task->getMeta(), ['in_progress' => null]);
|
||||
$task->save();
|
||||
}
|
||||
|
||||
private function ensureMetaIsArray(ScheduledTask $task) {
|
||||
$meta = $task->getMeta();
|
||||
if (empty($meta)) {
|
||||
$task->meta = [];
|
||||
} elseif (!is_array($meta)) {
|
||||
$task->meta = (array)$meta;
|
||||
}
|
||||
}
|
||||
}
|
@@ -0,0 +1,6 @@
|
||||
<?php
|
||||
namespace MailPoet\Cron\Workers;
|
||||
|
||||
class SingleInstanceSimpleWorkerMockImplementation extends SingleInstanceSimpleWorker {
|
||||
const TASK_TYPE = 'mock_simple_worker';
|
||||
}
|
@@ -0,0 +1,72 @@
|
||||
<?php
|
||||
namespace MailPoet\Test\Cron\Workers;
|
||||
|
||||
use Carbon\Carbon;
|
||||
use MailPoet\Models\ScheduledTask;
|
||||
|
||||
require_once('SingleInstanceSimpleWorkerMockImplementation.php');
|
||||
use MailPoet\Cron\Workers\SingleInstanceSimpleWorkerMockImplementation as MockSimpleWorker;
|
||||
|
||||
class SingleInstanceSimpleWorkerTest extends \MailPoetTest {
|
||||
function _before() {
|
||||
$this->worker = $this->getMockBuilder(MockSimpleWorker::class)
|
||||
->setMethods(['processTaskStrategy'])
|
||||
->getMock();
|
||||
}
|
||||
|
||||
function testItWillNotRunInMultipleInstances() {
|
||||
$this->worker->expects($this->once())
|
||||
->method('processTaskStrategy')
|
||||
->willReturn(true);
|
||||
$task = $this->createScheduledTask();
|
||||
expect($this->worker->isInProgress($task))->equals(false);
|
||||
expect($this->worker->processTask($task))->equals(true);
|
||||
$this->worker->startProgress($task);
|
||||
expect($this->worker->isInProgress($task))->equals(true);
|
||||
expect($this->worker->processTask($task))->equals(false);
|
||||
}
|
||||
|
||||
function testItWillKeepTheInProgressFlagOnFail() {
|
||||
$task = $this->createScheduledTask();
|
||||
$this->worker->expects($this->once())
|
||||
->method('processTaskStrategy')
|
||||
->willThrowException(new \Exception('test error'));
|
||||
try {
|
||||
$this->worker->processTask($task);
|
||||
$this->fail('An exception should be thrown');
|
||||
} catch (\Exception $e) {
|
||||
expect($e->getMessage())->equals('test error');
|
||||
expect($this->worker->isInProgress($task))->equals(true);
|
||||
}
|
||||
}
|
||||
|
||||
function testItWillRescheduleTaskIfItIsRunningForTooLong() {
|
||||
$this->worker->expects($this->once())
|
||||
->method('processTaskStrategy')
|
||||
->willReturn(true);
|
||||
$task = $this->createScheduledTask();
|
||||
$task = ScheduledTask::findOne($task->id); // make sure `updated_at` is set by the DB
|
||||
expect($this->worker->processTask($task))->equals(true);
|
||||
$scheduled_at = $task->scheduled_at;
|
||||
$task->updated_at = Carbon::createFromTimestamp(strtotime($task->updated_at))
|
||||
->subMinutes(MockSimpleWorker::TASK_RUN_TIMEOUT + 1);
|
||||
expect($this->worker->processTask($task))->equals(false);
|
||||
$task = ScheduledTask::findOne($task->id);
|
||||
expect($scheduled_at < $task->scheduled_at)->true();
|
||||
expect($task->status)->equals(ScheduledTask::STATUS_SCHEDULED);
|
||||
expect($this->worker->isInProgress($task))->equals(false);
|
||||
}
|
||||
|
||||
private function createScheduledTask() {
|
||||
$task = ScheduledTask::create();
|
||||
$task->type = MockSimpleWorker::TASK_TYPE;
|
||||
$task->status = null;
|
||||
$task->scheduled_at = Carbon::createFromTimestamp(current_time('timestamp'));
|
||||
$task->save();
|
||||
return $task;
|
||||
}
|
||||
|
||||
function _after() {
|
||||
\ORM::raw_execute('TRUNCATE ' . ScheduledTask::$_table);
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user