Merge SingleInstanceSimpleWorker into SimpleWorker [MAILPOET-2385]

This commit is contained in:
wxa
2019-09-30 08:02:09 +03:00
committed by Jack Kitterhing
parent 0805dc365f
commit 12c526e120
8 changed files with 119 additions and 157 deletions

View File

@ -6,9 +6,10 @@ use MailPoet\Models\ScheduledTask;
use MailPoet\Settings\SettingsController;
use MailPoet\Subscribers\InactiveSubscribersController;
class InactiveSubscribers extends SingleInstanceSimpleWorker {
class InactiveSubscribers extends SimpleWorker {
const TASK_TYPE = 'inactive_subscribers';
const BATCH_SIZE = 1000;
const SUPPORT_MULTIPLE_INSTANCES = false;
/** @var InactiveSubscribersController */
private $inactive_subscribers_controller;

View File

@ -14,6 +14,10 @@ abstract class SimpleWorker {
const TASK_BATCH_SIZE = 5;
const AUTOMATIC_SCHEDULING = true;
const SUPPORT_MULTIPLE_INSTANCES = true;
const TASK_RUN_TIMEOUT = 120;
const TIMED_OUT_TASK_RESCHEDULE_TIMEOUT = 5;
function __construct($timer = false) {
if (static::TASK_TYPE === null) {
throw new \Exception('Constant TASK_TYPE is not defined on subclass ' . get_class($this));
@ -97,12 +101,34 @@ abstract class SimpleWorker {
// abort if execution limit is reached
CronHelper::enforceExecutionLimit($this->timer);
if ($this->processTaskStrategy($task)) {
$this->complete($task);
return true;
if (!static::SUPPORT_MULTIPLE_INSTANCES) {
if ($this->isInProgress($task)) {
return false;
}
if ($this->rescheduleOutdated($task)) {
return false;
}
$this->startProgress($task);
}
return false;
try {
$completed = $this->processTaskStrategy($task);
} catch (\Exception $e) {
if (!static::SUPPORT_MULTIPLE_INSTANCES) {
$this->stopProgress($task);
}
throw $e;
}
if ($completed) {
$this->complete($task);
}
if (!static::SUPPORT_MULTIPLE_INSTANCES) {
$this->stopProgress($task);
}
return (bool)$completed;
}
function processTaskStrategy(ScheduledTask $task) {
@ -123,6 +149,39 @@ abstract class SimpleWorker {
$task->save();
}
private function isInProgress(ScheduledTask $task) {
$meta = $task->getMeta();
if (!empty($meta['in_progress'])) {
// Do not run multiple instances of the task
return true;
}
return false;
}
private function startProgress(ScheduledTask $task) {
$task->meta = array_merge($task->getMeta(), ['in_progress' => true]);
$task->save();
}
private function stopProgress(ScheduledTask $task) {
$task->meta = array_merge($task->getMeta(), ['in_progress' => null]);
$task->save();
}
private function rescheduleOutdated(ScheduledTask $task) {
$current_time = Carbon::createFromTimestamp(WPFunctions::get()->currentTime('timestamp'));
$updated_at = Carbon::createFromTimestamp(strtotime($task->updated_at));
// If the task is running for too long consider it stuck and reschedule
if (!empty($task->updated_at) && $updated_at->diffInMinutes($current_time, false) > self::TASK_RUN_TIMEOUT) {
$this->stopProgress($task);
$this->reschedule($task, self::TIMED_OUT_TASK_RESCHEDULE_TIMEOUT);
return true;
}
return false;
}
static function getNextRunDate() {
$wp = new WPFunctions();
$date = Carbon::createFromTimestamp($wp->currentTime('timestamp'));

View File

@ -1,74 +0,0 @@
<?php
namespace MailPoet\Cron\Workers;
use Carbon\Carbon;
use MailPoet\Cron\CronHelper;
use MailPoet\Models\ScheduledTask;
use MailPoet\WP\Functions as WPFunctions;
abstract class SingleInstanceSimpleWorker extends SimpleWorker {
const TASK_RUN_TIMEOUT = 120;
const TIMED_OUT_TASK_RESCHEDULE_TIMEOUT = 5;
function processTask(ScheduledTask $task) {
// abort if execution limit is reached
CronHelper::enforceExecutionLimit($this->timer);
if ($this->isInProgress($task)) {
return false;
}
if ($this->rescheduleOutdated($task)) {
return false;
}
$this->startProgress($task);
try {
$completed = $this->processTaskStrategy($task);
} catch (\Exception $e) {
$this->stopProgress($task);
throw $e;
}
if ($completed) {
$this->complete($task);
}
$this->stopProgress($task);
return (bool)$completed;
}
private function isInProgress(ScheduledTask $task) {
$meta = $task->getMeta();
if (!empty($meta['in_progress'])) {
// Do not run multiple instances of the task
return true;
}
return false;
}
private function startProgress(ScheduledTask $task) {
$task->meta = array_merge($task->getMeta(), ['in_progress' => true]);
$task->save();
}
private function stopProgress(ScheduledTask $task) {
$task->meta = array_merge($task->getMeta(), ['in_progress' => null]);
$task->save();
}
private function rescheduleOutdated(ScheduledTask $task) {
$current_time = Carbon::createFromTimestamp(WPFunctions::get()->currentTime('timestamp'));
$updated_at = Carbon::createFromTimestamp(strtotime($task->updated_at));
// If the task is running for too long consider it stuck and reschedule
if (!empty($task->updated_at) && $updated_at->diffInMinutes($current_time, false) > self::TASK_RUN_TIMEOUT) {
$this->stopProgress($task);
$this->reschedule($task, self::TIMED_OUT_TASK_RESCHEDULE_TIMEOUT);
return true;
}
return false;
}
}

View File

@ -7,8 +7,9 @@ use MailPoet\Segments\WooCommerce as WooCommerceSegment;
use MailPoet\WooCommerce\Helper as WooCommerceHelper;
use MailPoet\WP\Functions as WPFunctions;
class WooCommerceSync extends SingleInstanceSimpleWorker {
class WooCommerceSync extends SimpleWorker {
const TASK_TYPE = 'woocommerce_sync';
const SUPPORT_MULTIPLE_INSTANCES = false;
/** @var WooCommerceSegment */
private $woocommerce_segment;

View File

@ -3,6 +3,7 @@ namespace MailPoet\Cron\Workers;
class SimpleWorkerMockImplementation extends SimpleWorker {
const TASK_TYPE = 'mock_simple_worker';
const SUPPORT_MULTIPLE_INSTANCES = false;
function init() {
// to be mocked

View File

@ -205,6 +205,57 @@ class SimpleWorkerTest extends \MailPoetTest {
}
}
function testItWillNotRunInMultipleInstances() {
$worker = $this->getMockBuilder(MockSimpleWorker::class)
->setMethods(['processTaskStrategy'])
->getMock();
$worker->expects($this->once())
->method('processTaskStrategy')
->willReturn(true);
$task = $this->createRunningTask();
expect(empty($task->getMeta()['in_progress']))->equals(true);
expect($worker->processTask($task))->equals(true);
$task->meta = ['in_progress' => true];
expect($worker->processTask($task))->equals(false);
}
function testItWillResetTheInProgressFlagOnFail() {
$worker = $this->getMockBuilder(MockSimpleWorker::class)
->setMethods(['processTaskStrategy'])
->getMock();
$worker->expects($this->once())
->method('processTaskStrategy')
->willThrowException(new \Exception('test error'));
$task = $this->createRunningTask();
try {
$worker->processTask($task);
$this->fail('An exception should be thrown');
} catch (\Exception $e) {
expect($e->getMessage())->equals('test error');
expect(empty($task->getMeta()['in_progress']))->equals(true);
}
}
function testItWillRescheduleTaskIfItIsRunningForTooLong() {
$worker = $this->getMockBuilder(MockSimpleWorker::class)
->setMethods(['processTaskStrategy'])
->getMock();
$worker->expects($this->once())
->method('processTaskStrategy')
->willReturn(true);
$task = $this->createRunningTask();
$task = ScheduledTask::findOne($task->id); // make sure `updated_at` is set by the DB
expect($worker->processTask($task))->equals(true);
$scheduled_at = $task->scheduled_at;
$task->updated_at = Carbon::createFromTimestamp(strtotime($task->updated_at))
->subMinutes(MockSimpleWorker::TASK_RUN_TIMEOUT + 1);
expect($worker->processTask($task))->equals(false);
$task = ScheduledTask::findOne($task->id);
expect($scheduled_at < $task->scheduled_at)->true();
expect($task->status)->equals(ScheduledTask::STATUS_SCHEDULED);
expect(empty($task->getMeta()['in_progress']))->equals(true);
}
function testItCalculatesNextRunDateWithinNextWeekBoundaries() {
$current_date = Carbon::createFromTimestamp(current_time('timestamp'));
$next_run_date = MockSimpleWorker::getNextRunDate();

View File

@ -1,6 +0,0 @@
<?php
namespace MailPoet\Cron\Workers;
class SingleInstanceSimpleWorkerMockImplementation extends SingleInstanceSimpleWorker {
const TASK_TYPE = 'mock_simple_worker';
}

View File

@ -1,71 +0,0 @@
<?php
namespace MailPoet\Test\Cron\Workers;
use Carbon\Carbon;
use MailPoet\Models\ScheduledTask;
require_once('SingleInstanceSimpleWorkerMockImplementation.php');
use MailPoet\Cron\Workers\SingleInstanceSimpleWorkerMockImplementation as MockSimpleWorker;
class SingleInstanceSimpleWorkerTest extends \MailPoetTest {
function _before() {
$this->worker = $this->getMockBuilder(MockSimpleWorker::class)
->setMethods(['processTaskStrategy'])
->getMock();
}
function testItWillNotRunInMultipleInstances() {
$this->worker->expects($this->once())
->method('processTaskStrategy')
->willReturn(true);
$task = $this->createScheduledTask();
expect(empty($task->getMeta()['in_progress']))->equals(true);
expect($this->worker->processTask($task))->equals(true);
$task->meta = ['in_progress' => true];
expect($this->worker->processTask($task))->equals(false);
}
function testItWillResetTheInProgressFlagOnFail() {
$task = $this->createScheduledTask();
$this->worker->expects($this->once())
->method('processTaskStrategy')
->willThrowException(new \Exception('test error'));
try {
$this->worker->processTask($task);
$this->fail('An exception should be thrown');
} catch (\Exception $e) {
expect($e->getMessage())->equals('test error');
expect(empty($task->getMeta()['in_progress']))->equals(true);
}
}
function testItWillRescheduleTaskIfItIsRunningForTooLong() {
$this->worker->expects($this->once())
->method('processTaskStrategy')
->willReturn(true);
$task = $this->createScheduledTask();
$task = ScheduledTask::findOne($task->id); // make sure `updated_at` is set by the DB
expect($this->worker->processTask($task))->equals(true);
$scheduled_at = $task->scheduled_at;
$task->updated_at = Carbon::createFromTimestamp(strtotime($task->updated_at))
->subMinutes(MockSimpleWorker::TASK_RUN_TIMEOUT + 1);
expect($this->worker->processTask($task))->equals(false);
$task = ScheduledTask::findOne($task->id);
expect($scheduled_at < $task->scheduled_at)->true();
expect($task->status)->equals(ScheduledTask::STATUS_SCHEDULED);
expect(empty($task->getMeta()['in_progress']))->equals(true);
}
private function createScheduledTask() {
$task = ScheduledTask::create();
$task->type = MockSimpleWorker::TASK_TYPE;
$task->status = null;
$task->scheduled_at = Carbon::createFromTimestamp(current_time('timestamp'));
$task->save();
return $task;
}
function _after() {
\ORM::raw_execute('TRUNCATE ' . ScheduledTask::$_table);
}
}