From 12c526e12044b3da099d319a68d500df3f3a048c Mon Sep 17 00:00:00 2001 From: wxa Date: Mon, 30 Sep 2019 08:02:09 +0300 Subject: [PATCH] Merge SingleInstanceSimpleWorker into SimpleWorker [MAILPOET-2385] --- lib/Cron/Workers/InactiveSubscribers.php | 3 +- lib/Cron/Workers/SimpleWorker.php | 67 ++++++++++++++++- .../Workers/SingleInstanceSimpleWorker.php | 74 ------------------- lib/Cron/Workers/WooCommerceSync.php | 3 +- .../SimpleWorkerMockImplementation.php | 1 + .../Cron/Workers/SimpleWorkerTest.php | 51 +++++++++++++ ...InstanceSimpleWorkerMockImplementation.php | 6 -- .../SingleInstanceSimpleWorkerTest.php | 71 ------------------ 8 files changed, 119 insertions(+), 157 deletions(-) delete mode 100644 lib/Cron/Workers/SingleInstanceSimpleWorker.php delete mode 100644 tests/integration/Cron/Workers/SingleInstanceSimpleWorkerMockImplementation.php delete mode 100644 tests/integration/Cron/Workers/SingleInstanceSimpleWorkerTest.php diff --git a/lib/Cron/Workers/InactiveSubscribers.php b/lib/Cron/Workers/InactiveSubscribers.php index 50d17862a4..e0cf50d966 100644 --- a/lib/Cron/Workers/InactiveSubscribers.php +++ b/lib/Cron/Workers/InactiveSubscribers.php @@ -6,9 +6,10 @@ use MailPoet\Models\ScheduledTask; use MailPoet\Settings\SettingsController; use MailPoet\Subscribers\InactiveSubscribersController; -class InactiveSubscribers extends SingleInstanceSimpleWorker { +class InactiveSubscribers extends SimpleWorker { const TASK_TYPE = 'inactive_subscribers'; const BATCH_SIZE = 1000; + const SUPPORT_MULTIPLE_INSTANCES = false; /** @var InactiveSubscribersController */ private $inactive_subscribers_controller; diff --git a/lib/Cron/Workers/SimpleWorker.php b/lib/Cron/Workers/SimpleWorker.php index 9ee921d310..add2a79fd3 100644 --- a/lib/Cron/Workers/SimpleWorker.php +++ b/lib/Cron/Workers/SimpleWorker.php @@ -14,6 +14,10 @@ abstract class SimpleWorker { const TASK_BATCH_SIZE = 5; const AUTOMATIC_SCHEDULING = true; + const SUPPORT_MULTIPLE_INSTANCES = true; + const TASK_RUN_TIMEOUT = 120; + const TIMED_OUT_TASK_RESCHEDULE_TIMEOUT = 5; + function __construct($timer = false) { if (static::TASK_TYPE === null) { throw new \Exception('Constant TASK_TYPE is not defined on subclass ' . get_class($this)); @@ -97,12 +101,34 @@ abstract class SimpleWorker { // abort if execution limit is reached CronHelper::enforceExecutionLimit($this->timer); - if ($this->processTaskStrategy($task)) { - $this->complete($task); - return true; + if (!static::SUPPORT_MULTIPLE_INSTANCES) { + if ($this->isInProgress($task)) { + return false; + } + if ($this->rescheduleOutdated($task)) { + return false; + } + $this->startProgress($task); } - return false; + try { + $completed = $this->processTaskStrategy($task); + } catch (\Exception $e) { + if (!static::SUPPORT_MULTIPLE_INSTANCES) { + $this->stopProgress($task); + } + throw $e; + } + + if ($completed) { + $this->complete($task); + } + + if (!static::SUPPORT_MULTIPLE_INSTANCES) { + $this->stopProgress($task); + } + + return (bool)$completed; } function processTaskStrategy(ScheduledTask $task) { @@ -123,6 +149,39 @@ abstract class SimpleWorker { $task->save(); } + private function isInProgress(ScheduledTask $task) { + $meta = $task->getMeta(); + if (!empty($meta['in_progress'])) { + // Do not run multiple instances of the task + return true; + } + return false; + } + + private function startProgress(ScheduledTask $task) { + $task->meta = array_merge($task->getMeta(), ['in_progress' => true]); + $task->save(); + } + + private function stopProgress(ScheduledTask $task) { + $task->meta = array_merge($task->getMeta(), ['in_progress' => null]); + $task->save(); + } + + private function rescheduleOutdated(ScheduledTask $task) { + $current_time = Carbon::createFromTimestamp(WPFunctions::get()->currentTime('timestamp')); + $updated_at = Carbon::createFromTimestamp(strtotime($task->updated_at)); + + // If the task is running for too long consider it stuck and reschedule + if (!empty($task->updated_at) && $updated_at->diffInMinutes($current_time, false) > self::TASK_RUN_TIMEOUT) { + $this->stopProgress($task); + $this->reschedule($task, self::TIMED_OUT_TASK_RESCHEDULE_TIMEOUT); + return true; + } + + return false; + } + static function getNextRunDate() { $wp = new WPFunctions(); $date = Carbon::createFromTimestamp($wp->currentTime('timestamp')); diff --git a/lib/Cron/Workers/SingleInstanceSimpleWorker.php b/lib/Cron/Workers/SingleInstanceSimpleWorker.php deleted file mode 100644 index e4ce1f8d89..0000000000 --- a/lib/Cron/Workers/SingleInstanceSimpleWorker.php +++ /dev/null @@ -1,74 +0,0 @@ -timer); - - if ($this->isInProgress($task)) { - return false; - } - if ($this->rescheduleOutdated($task)) { - return false; - } - - $this->startProgress($task); - - try { - $completed = $this->processTaskStrategy($task); - } catch (\Exception $e) { - $this->stopProgress($task); - throw $e; - } - - if ($completed) { - $this->complete($task); - } - $this->stopProgress($task); - - return (bool)$completed; - } - - private function isInProgress(ScheduledTask $task) { - $meta = $task->getMeta(); - if (!empty($meta['in_progress'])) { - // Do not run multiple instances of the task - return true; - } - return false; - } - - private function startProgress(ScheduledTask $task) { - $task->meta = array_merge($task->getMeta(), ['in_progress' => true]); - $task->save(); - } - - private function stopProgress(ScheduledTask $task) { - $task->meta = array_merge($task->getMeta(), ['in_progress' => null]); - $task->save(); - } - - private function rescheduleOutdated(ScheduledTask $task) { - $current_time = Carbon::createFromTimestamp(WPFunctions::get()->currentTime('timestamp')); - $updated_at = Carbon::createFromTimestamp(strtotime($task->updated_at)); - - // If the task is running for too long consider it stuck and reschedule - if (!empty($task->updated_at) && $updated_at->diffInMinutes($current_time, false) > self::TASK_RUN_TIMEOUT) { - $this->stopProgress($task); - $this->reschedule($task, self::TIMED_OUT_TASK_RESCHEDULE_TIMEOUT); - return true; - } - - return false; - } -} diff --git a/lib/Cron/Workers/WooCommerceSync.php b/lib/Cron/Workers/WooCommerceSync.php index 5b665f786b..3b43a1f61c 100644 --- a/lib/Cron/Workers/WooCommerceSync.php +++ b/lib/Cron/Workers/WooCommerceSync.php @@ -7,8 +7,9 @@ use MailPoet\Segments\WooCommerce as WooCommerceSegment; use MailPoet\WooCommerce\Helper as WooCommerceHelper; use MailPoet\WP\Functions as WPFunctions; -class WooCommerceSync extends SingleInstanceSimpleWorker { +class WooCommerceSync extends SimpleWorker { const TASK_TYPE = 'woocommerce_sync'; + const SUPPORT_MULTIPLE_INSTANCES = false; /** @var WooCommerceSegment */ private $woocommerce_segment; diff --git a/tests/integration/Cron/Workers/SimpleWorkerMockImplementation.php b/tests/integration/Cron/Workers/SimpleWorkerMockImplementation.php index ac964f292c..14f5d47b9c 100644 --- a/tests/integration/Cron/Workers/SimpleWorkerMockImplementation.php +++ b/tests/integration/Cron/Workers/SimpleWorkerMockImplementation.php @@ -3,6 +3,7 @@ namespace MailPoet\Cron\Workers; class SimpleWorkerMockImplementation extends SimpleWorker { const TASK_TYPE = 'mock_simple_worker'; + const SUPPORT_MULTIPLE_INSTANCES = false; function init() { // to be mocked diff --git a/tests/integration/Cron/Workers/SimpleWorkerTest.php b/tests/integration/Cron/Workers/SimpleWorkerTest.php index c838f338dc..04cc1de9b2 100644 --- a/tests/integration/Cron/Workers/SimpleWorkerTest.php +++ b/tests/integration/Cron/Workers/SimpleWorkerTest.php @@ -205,6 +205,57 @@ class SimpleWorkerTest extends \MailPoetTest { } } + function testItWillNotRunInMultipleInstances() { + $worker = $this->getMockBuilder(MockSimpleWorker::class) + ->setMethods(['processTaskStrategy']) + ->getMock(); + $worker->expects($this->once()) + ->method('processTaskStrategy') + ->willReturn(true); + $task = $this->createRunningTask(); + expect(empty($task->getMeta()['in_progress']))->equals(true); + expect($worker->processTask($task))->equals(true); + $task->meta = ['in_progress' => true]; + expect($worker->processTask($task))->equals(false); + } + + function testItWillResetTheInProgressFlagOnFail() { + $worker = $this->getMockBuilder(MockSimpleWorker::class) + ->setMethods(['processTaskStrategy']) + ->getMock(); + $worker->expects($this->once()) + ->method('processTaskStrategy') + ->willThrowException(new \Exception('test error')); + $task = $this->createRunningTask(); + try { + $worker->processTask($task); + $this->fail('An exception should be thrown'); + } catch (\Exception $e) { + expect($e->getMessage())->equals('test error'); + expect(empty($task->getMeta()['in_progress']))->equals(true); + } + } + + function testItWillRescheduleTaskIfItIsRunningForTooLong() { + $worker = $this->getMockBuilder(MockSimpleWorker::class) + ->setMethods(['processTaskStrategy']) + ->getMock(); + $worker->expects($this->once()) + ->method('processTaskStrategy') + ->willReturn(true); + $task = $this->createRunningTask(); + $task = ScheduledTask::findOne($task->id); // make sure `updated_at` is set by the DB + expect($worker->processTask($task))->equals(true); + $scheduled_at = $task->scheduled_at; + $task->updated_at = Carbon::createFromTimestamp(strtotime($task->updated_at)) + ->subMinutes(MockSimpleWorker::TASK_RUN_TIMEOUT + 1); + expect($worker->processTask($task))->equals(false); + $task = ScheduledTask::findOne($task->id); + expect($scheduled_at < $task->scheduled_at)->true(); + expect($task->status)->equals(ScheduledTask::STATUS_SCHEDULED); + expect(empty($task->getMeta()['in_progress']))->equals(true); + } + function testItCalculatesNextRunDateWithinNextWeekBoundaries() { $current_date = Carbon::createFromTimestamp(current_time('timestamp')); $next_run_date = MockSimpleWorker::getNextRunDate(); diff --git a/tests/integration/Cron/Workers/SingleInstanceSimpleWorkerMockImplementation.php b/tests/integration/Cron/Workers/SingleInstanceSimpleWorkerMockImplementation.php deleted file mode 100644 index a4701801ce..0000000000 --- a/tests/integration/Cron/Workers/SingleInstanceSimpleWorkerMockImplementation.php +++ /dev/null @@ -1,6 +0,0 @@ -worker = $this->getMockBuilder(MockSimpleWorker::class) - ->setMethods(['processTaskStrategy']) - ->getMock(); - } - - function testItWillNotRunInMultipleInstances() { - $this->worker->expects($this->once()) - ->method('processTaskStrategy') - ->willReturn(true); - $task = $this->createScheduledTask(); - expect(empty($task->getMeta()['in_progress']))->equals(true); - expect($this->worker->processTask($task))->equals(true); - $task->meta = ['in_progress' => true]; - expect($this->worker->processTask($task))->equals(false); - } - - function testItWillResetTheInProgressFlagOnFail() { - $task = $this->createScheduledTask(); - $this->worker->expects($this->once()) - ->method('processTaskStrategy') - ->willThrowException(new \Exception('test error')); - try { - $this->worker->processTask($task); - $this->fail('An exception should be thrown'); - } catch (\Exception $e) { - expect($e->getMessage())->equals('test error'); - expect(empty($task->getMeta()['in_progress']))->equals(true); - } - } - - function testItWillRescheduleTaskIfItIsRunningForTooLong() { - $this->worker->expects($this->once()) - ->method('processTaskStrategy') - ->willReturn(true); - $task = $this->createScheduledTask(); - $task = ScheduledTask::findOne($task->id); // make sure `updated_at` is set by the DB - expect($this->worker->processTask($task))->equals(true); - $scheduled_at = $task->scheduled_at; - $task->updated_at = Carbon::createFromTimestamp(strtotime($task->updated_at)) - ->subMinutes(MockSimpleWorker::TASK_RUN_TIMEOUT + 1); - expect($this->worker->processTask($task))->equals(false); - $task = ScheduledTask::findOne($task->id); - expect($scheduled_at < $task->scheduled_at)->true(); - expect($task->status)->equals(ScheduledTask::STATUS_SCHEDULED); - expect(empty($task->getMeta()['in_progress']))->equals(true); - } - - private function createScheduledTask() { - $task = ScheduledTask::create(); - $task->type = MockSimpleWorker::TASK_TYPE; - $task->status = null; - $task->scheduled_at = Carbon::createFromTimestamp(current_time('timestamp')); - $task->save(); - return $task; - } - - function _after() { - \ORM::raw_execute('TRUNCATE ' . ScheduledTask::$_table); - } -}