diff --git a/lib/Cron/CronWorkerInterface.php b/lib/Cron/CronWorkerInterface.php new file mode 100644 index 0000000000..ab4029aefe --- /dev/null +++ b/lib/Cron/CronWorkerInterface.php @@ -0,0 +1,38 @@ +timer = microtime(true); + $this->cron_helper = $cron_helper; + $this->cron_worker_scheduler = $cron_worker_scheduler; + $this->wp = $wp; + } + + function run(CronWorkerInterface $worker) { + // abort if execution limit is reached + $this->cron_helper->enforceExecutionLimit($this->timer); + $due_tasks = $this->getDueTasks($worker); + $running_tasks = $this->getRunningTasks($worker); + + if (!$worker->checkProcessingRequirements()) { + foreach (array_merge($due_tasks, $running_tasks) as $task) { + $task->delete(); + } + return false; + } + + $worker->init(); + + if (!$due_tasks && !$running_tasks) { + if ($worker->scheduleAutomatically()) { + $this->cron_worker_scheduler->schedule($worker->getTaskType(), $worker->getNextRunDate()); + } + return false; + } + + $task = null; + try { + foreach ($due_tasks as $i => $task) { + $this->prepareTask($worker, $task); + } + foreach ($running_tasks as $i => $task) { + $this->processTask($worker, $task); + } + } catch (\Exception $e) { + if ($task && $e->getCode() !== CronHelper::DAEMON_EXECUTION_LIMIT_REACHED) { + $task->rescheduleProgressively(); + } + throw $e; + } + + return true; + } + + private function getDueTasks(CronWorkerInterface $worker) { + return ScheduledTask::findDueByType($worker->getTaskType(), self::TASK_BATCH_SIZE); + } + + private function getRunningTasks(CronWorkerInterface $worker) { + return ScheduledTask::findRunningByType($worker->getTaskType(), self::TASK_BATCH_SIZE); + } + + private function prepareTask(CronWorkerInterface $worker, ScheduledTask $task) { + // abort if execution limit is reached + $this->cron_helper->enforceExecutionLimit($this->timer); + + $prepare_completed = $worker->prepareTaskStrategy($task, $this->timer); + if ($prepare_completed) { + $task->status = null; + $task->save(); + } + } + + private function processTask(CronWorkerInterface $worker, ScheduledTask $task) { + // abort if execution limit is reached + $this->cron_helper->enforceExecutionLimit($this->timer); + + if (!$worker->supportsMultipleInstances()) { + if ($this->rescheduleOutdated($task)) { + return false; + } + if ($this->isInProgress($task)) { + return false; + } + } + + $this->startProgress($task); + + try { + $completed = $worker->processTaskStrategy($task, $this->timer); + } catch (\Exception $e) { + $this->stopProgress($task); + throw $e; + } + + if ($completed) { + $this->complete($task); + } + + $this->stopProgress($task); + + return (bool)$completed; + } + + private function rescheduleOutdated(ScheduledTask $task) { + $current_time = Carbon::createFromTimestamp($this->wp->currentTime('timestamp')); + $updated_at = Carbon::createFromTimestamp(strtotime((string)$task->updated_at)); + + // If the task is running for too long consider it stuck and reschedule + if (!empty($task->updated_at) && $updated_at->diffInMinutes($current_time, false) > self::TASK_RUN_TIMEOUT) { + $this->stopProgress($task); + $this->cron_worker_scheduler->reschedule($task, self::TIMED_OUT_TASK_RESCHEDULE_TIMEOUT); + return true; + } + return false; + } + + private function isInProgress(ScheduledTask $task) { + if (!empty($task->in_progress)) { + // Do not run multiple instances of the task + return true; + } + return false; + } + + private function startProgress(ScheduledTask $task) { + $task->in_progress = true; + $task->save(); + } + + private function stopProgress(ScheduledTask $task) { + $task->in_progress = false; + $task->save(); + } + + private function complete(ScheduledTask $task) { + $task->processed_at = $this->wp->currentTime('mysql'); + $task->status = ScheduledTask::STATUS_COMPLETED; + $task->save(); + } +} diff --git a/lib/Cron/Daemon.php b/lib/Cron/Daemon.php index b30c42dcf0..2b2c3cba6a 100644 --- a/lib/Cron/Daemon.php +++ b/lib/Cron/Daemon.php @@ -7,15 +7,23 @@ use MailPoet\Cron\Workers\WorkersFactory; class Daemon { public $timer; - /** @var WorkersFactory */ - private $workers_factory; - /** @var CronHelper */ private $cron_helper; - function __construct(WorkersFactory $workers_factory, CronHelper $cron_helper) { + /** @var CronWorkerRunner */ + private $cron_worker_runner; + + /** @var WorkersFactory */ + private $workers_factory; + + function __construct( + CronHelper $cron_helper, + CronWorkerRunner $cron_worker_runner, + WorkersFactory $workers_factory + ) { $this->timer = microtime(true); $this->workers_factory = $workers_factory; + $this->cron_worker_runner = $cron_worker_runner; $this->cron_helper = $cron_helper; } @@ -26,7 +34,11 @@ class Daemon { $errors = []; foreach ($this->getWorkers() as $worker) { try { - $worker->process($this->timer); + if ($worker instanceof CronWorkerInterface) { + $this->cron_worker_runner->run($worker); + } else { + $worker->process($this->timer); // BC for workers not implementing CronWorkerInterface + } } catch (\Exception $e) { $worker_class_name_parts = explode('\\', get_class($worker)); $errors[] = [ @@ -46,9 +58,9 @@ class Daemon { private function getWorkers() { yield $this->workers_factory->createMigrationWorker(); - yield $this->workers_factory->createStatsNotificationsWorker(); - yield $this->workers_factory->createScheduleWorker(); - yield $this->workers_factory->createQueueWorker(); + yield $this->workers_factory->createStatsNotificationsWorker(); // not CronWorkerInterface compatible + yield $this->workers_factory->createScheduleWorker(); // not CronWorkerInterface compatible + yield $this->workers_factory->createQueueWorker(); // not CronWorkerInterface compatible yield $this->workers_factory->createSendingServiceKeyCheckWorker(); yield $this->workers_factory->createPremiumKeyCheckWorker(); yield $this->workers_factory->createBounceWorker(); diff --git a/lib/Cron/Workers/SendingQueue/Migration.php b/lib/Cron/Workers/SendingQueue/Migration.php index b2f76eecdd..123e6493f7 100644 --- a/lib/Cron/Workers/SendingQueue/Migration.php +++ b/lib/Cron/Workers/SendingQueue/Migration.php @@ -35,8 +35,10 @@ class Migration extends SimpleWorker { ($unmigrated_queues_count == 0 && count($unmigrated_queue_subscribers) == 0) ) { - // nothing to migrate - $this->complete($task); + // nothing to migrate, complete task + $task->processed_at = WPFunctions::get()->currentTime('mysql'); + $task->status = ScheduledTask::STATUS_COMPLETED; + $task->save(); $this->resumeSending(); return false; } diff --git a/lib/Cron/Workers/SimpleWorker.php b/lib/Cron/Workers/SimpleWorker.php index 7f9d846fbd..741eadbbea 100644 --- a/lib/Cron/Workers/SimpleWorker.php +++ b/lib/Cron/Workers/SimpleWorker.php @@ -3,21 +3,23 @@ namespace MailPoet\Cron\Workers; use MailPoet\Cron\CronHelper; +use MailPoet\Cron\CronWorkerInterface; +use MailPoet\Cron\CronWorkerRunner; use MailPoet\Cron\CronWorkerScheduler; use MailPoet\DI\ContainerWrapper; use MailPoet\Models\ScheduledTask; use MailPoet\WP\Functions as WPFunctions; use MailPoetVendor\Carbon\Carbon; -abstract class SimpleWorker { - private $wp; +abstract class SimpleWorker implements CronWorkerInterface { const TASK_TYPE = null; - const TASK_BATCH_SIZE = 5; const AUTOMATIC_SCHEDULING = true; - const SUPPORT_MULTIPLE_INSTANCES = true; - const TASK_RUN_TIMEOUT = 120; - const TIMED_OUT_TASK_RESCHEDULE_TIMEOUT = 5; + + public $timer; + + /** @var WPFunctions */ + private $wp; /** @var CronHelper */ protected $cron_helper; @@ -35,6 +37,18 @@ abstract class SimpleWorker { $this->cron_worker_scheduler = ContainerWrapper::getInstance()->get(CronWorkerScheduler::class); } + function getTaskType() { + return static::TASK_TYPE; + } + + function supportsMultipleInstances() { + return static::SUPPORT_MULTIPLE_INSTANCES; + } + + function schedule() { + $this->cron_worker_scheduler->schedule(static::TASK_TYPE, $this->getNextRunDate()); + } + function checkProcessingRequirements() { return true; } @@ -42,95 +56,6 @@ abstract class SimpleWorker { function init() { } - function process($timer = false) { - $timer = $timer ?: microtime(true); - - // abort if execution limit is reached - $this->cron_helper->enforceExecutionLimit($timer); - $scheduled_tasks = $this->getDueTasks(); - $running_tasks = $this->getRunningTasks(); - - if (!$this->checkProcessingRequirements()) { - foreach (array_merge($scheduled_tasks, $running_tasks) as $task) { - $task->delete(); - } - return false; - } - - $this->init(); - - - if (!$scheduled_tasks && !$running_tasks) { - if (static::AUTOMATIC_SCHEDULING) { - $this->schedule(); - } - return false; - } - - $task = null; - try { - foreach ($scheduled_tasks as $i => $task) { - $this->prepareTask($task, $timer); - } - foreach ($running_tasks as $i => $task) { - $this->processTask($task, $timer); - } - } catch (\Exception $e) { - if ($task && $e->getCode() !== CronHelper::DAEMON_EXECUTION_LIMIT_REACHED) { - $task->rescheduleProgressively(); - } - throw $e; - } - - return true; - } - - function schedule() { - $this->cron_worker_scheduler->schedule(static::TASK_TYPE, static::getNextRunDate()); - } - - function prepareTask(ScheduledTask $task, $timer) { - // abort if execution limit is reached - $this->cron_helper->enforceExecutionLimit($timer); - - $prepare_completed = $this->prepareTaskStrategy($task, $timer); - if ($prepare_completed) { - $task->status = null; - $task->save(); - } - } - - function processTask(ScheduledTask $task, $timer) { - // abort if execution limit is reached - $this->cron_helper->enforceExecutionLimit($timer); - - if (!static::SUPPORT_MULTIPLE_INSTANCES) { - if ($this->rescheduleOutdated($task)) { - return false; - } - if ($this->isInProgress($task)) { - return false; - } - } - - $this->startProgress($task); - - try { - $completed = $this->processTaskStrategy($task, $timer); - } catch (\Exception $e) { - $this->stopProgress($task); - throw $e; - } - - if ($completed) { - $this->complete($task); - } - - $this->stopProgress($task); - - return (bool)$completed; - } - function prepareTaskStrategy(ScheduledTask $task, $timer) { return true; } @@ -139,66 +64,19 @@ abstract class SimpleWorker { return true; } - function complete(ScheduledTask $task) { - $task->processed_at = $this->wp->currentTime('mysql'); - $task->status = ScheduledTask::STATUS_COMPLETED; - $task->save(); - } - - function reschedule(ScheduledTask $task, $timeout) { - $this->cron_worker_scheduler->reschedule($task, $timeout); - } - - private function isInProgress(ScheduledTask $task) { - if (!empty($task->in_progress)) { - // Do not run multiple instances of the task - return true; - } - return false; - } - - private function startProgress(ScheduledTask $task) { - $task->in_progress = true; - $task->save(); - } - - private function stopProgress(ScheduledTask $task) { - $task->in_progress = false; - $task->save(); - } - - private function rescheduleOutdated(ScheduledTask $task) { - $current_time = Carbon::createFromTimestamp(WPFunctions::get()->currentTime('timestamp')); - $updated_at = Carbon::createFromTimestamp(strtotime($task->updated_at)); - - // If the task is running for too long consider it stuck and reschedule - if (!empty($task->updated_at) && $updated_at->diffInMinutes($current_time, false) > self::TASK_RUN_TIMEOUT) { - $this->stopProgress($task); - $this->reschedule($task, self::TIMED_OUT_TASK_RESCHEDULE_TIMEOUT); - return true; - } - - return false; - } - function getNextRunDate() { - $wp = new WPFunctions(); - $date = Carbon::createFromTimestamp($wp->currentTime('timestamp')); - // Random day of the next week + // random day of the next week + $date = Carbon::createFromTimestamp($this->wp->currentTime('timestamp')); $date->setISODate((int)$date->format('o'), ((int)$date->format('W')) + 1, mt_rand(1, 7)); $date->startOfDay(); return $date; } - function getDueTasks() { - return ScheduledTask::findDueByType(static::TASK_TYPE, self::TASK_BATCH_SIZE); + function scheduleAutomatically() { + return static::AUTOMATIC_SCHEDULING; } - function getRunningTasks() { - return ScheduledTask::findRunningByType(static::TASK_TYPE, self::TASK_BATCH_SIZE); - } - - function getCompletedTasks() { - return ScheduledTask::findCompletedByType(static::TASK_TYPE, self::TASK_BATCH_SIZE); + protected function getCompletedTasks() { + return ScheduledTask::findCompletedByType(static::TASK_TYPE, CronWorkerRunner::TASK_BATCH_SIZE); } } diff --git a/lib/DI/ContainerConfigurator.php b/lib/DI/ContainerConfigurator.php index fa7e2564c7..4b87c5e677 100644 --- a/lib/DI/ContainerConfigurator.php +++ b/lib/DI/ContainerConfigurator.php @@ -127,6 +127,7 @@ class ContainerConfigurator implements IContainerConfigurator { // Cron $container->autowire(\MailPoet\Cron\CronHelper::class)->setPublic(true); $container->autowire(\MailPoet\Cron\CronTrigger::class)->setPublic(true); + $container->autowire(\MailPoet\Cron\CronWorkerRunner::class)->setPublic(true); $container->autowire(\MailPoet\Cron\CronWorkerScheduler::class)->setPublic(true); $container->autowire(\MailPoet\Cron\Daemon::class)->setPublic(true); $container->autowire(\MailPoet\Cron\DaemonHttpRunner::class)->setPublic(true);