diff --git a/lib/Cron/CronWorkerRunner.php b/lib/Cron/CronWorkerRunner.php index 8e76485dc3..49d81d996e 100644 --- a/lib/Cron/CronWorkerRunner.php +++ b/lib/Cron/CronWorkerRunner.php @@ -3,7 +3,6 @@ namespace MailPoet\Cron; use MailPoet\Entities\ScheduledTaskEntity; -use MailPoet\Models\ScheduledTask; use MailPoet\Newsletter\Sending\ScheduledTasksRepository; use MailPoet\WP\Functions as WPFunctions; use MailPoetVendor\Carbon\Carbon; @@ -66,16 +65,10 @@ class CronWorkerRunner { try { foreach ($dueTasks as $task) { - $parisTask = ScheduledTask::getFromDoctrineEntity($task); - if ($parisTask) { - $this->prepareTask($worker, $parisTask); - } + $this->prepareTask($worker, $task); } foreach ($runningTasks as $task) { - $parisTask = ScheduledTask::getFromDoctrineEntity($task); - if ($parisTask) { - $this->processTask($worker, $parisTask); - } + $this->processTask($worker, $task); } } catch (\Exception $e) { if ($task && $e->getCode() !== CronHelper::DAEMON_EXECUTION_LIMIT_REACHED) { @@ -95,21 +88,20 @@ class CronWorkerRunner { return $this->scheduledTasksRepository->findRunningByType($worker->getTaskType(), self::TASK_BATCH_SIZE); } - private function prepareTask(CronWorkerInterface $worker, ScheduledTask $task) { + private function prepareTask(CronWorkerInterface $worker, ScheduledTaskEntity $task) { // abort if execution limit is reached $this->cronHelper->enforceExecutionLimit($this->timer); - $doctrineTask = $this->convertTaskClassToDoctrine($task); - if ($doctrineTask) { - $prepareCompleted = $worker->prepareTaskStrategy($doctrineTask, $this->timer); - if ($prepareCompleted) { - $task->status = null; - $task->save(); - } + $prepareCompleted = $worker->prepareTaskStrategy($task, $this->timer); + + if ($prepareCompleted) { + $task->setStatus(null); + $this->scheduledTasksRepository->persist($task); + $this->scheduledTasksRepository->flush(); } } - private function processTask(CronWorkerInterface $worker, ScheduledTask $task) { + private function processTask(CronWorkerInterface $worker, ScheduledTaskEntity $task) { // abort if execution limit is reached $this->cronHelper->enforceExecutionLimit($this->timer); @@ -125,11 +117,7 @@ class CronWorkerRunner { $this->startProgress($task); try { - $completed = false; - $doctrineTask = $this->convertTaskClassToDoctrine($task); - if ($doctrineTask) { - $completed = $worker->processTaskStrategy($doctrineTask, $this->timer); - } + $completed = $worker->processTaskStrategy($task, $this->timer); } catch (\Exception $e) { $this->stopProgress($task); throw $e; @@ -144,18 +132,20 @@ class CronWorkerRunner { return (bool)$completed; } - private function rescheduleOutdated(ScheduledTask $task) { + private function rescheduleOutdated(ScheduledTaskEntity $task) { $currentTime = Carbon::createFromTimestamp($this->wp->currentTime('timestamp')); - $updated = strtotime((string)$task->updatedAt); - if ($updated === false) { + + if (empty($task->getUpdatedAt())) { // missing updatedAt, consider this task outdated (set year to 2000) and reschedule $updatedAt = Carbon::createFromDate(2000); + } else if (!$task->getUpdatedAt() instanceof Carbon) { + $updatedAt = new Carbon($task->getUpdatedAt()); } else { - $updatedAt = Carbon::createFromTimestamp($updated); + $updatedAt = $task->getUpdatedAt(); } // If the task is running for too long consider it stuck and reschedule - if (!empty($task->updatedAt) && $updatedAt->diffInMinutes($currentTime, false) > self::TASK_RUN_TIMEOUT) { + if (!empty($task->getUpdatedAt()) && $updatedAt->diffInMinutes($currentTime, false) > self::TASK_RUN_TIMEOUT) { $this->stopProgress($task); $this->cronWorkerScheduler->reschedule($task, self::TIMED_OUT_TASK_RESCHEDULE_TIMEOUT); return true; @@ -163,39 +153,30 @@ class CronWorkerRunner { return false; } - private function isInProgress(ScheduledTask $task) { - if (!empty($task->inProgress)) { + private function isInProgress(ScheduledTaskEntity $task) { + if ($task->getInProgress()) { // Do not run multiple instances of the task return true; } return false; } - private function startProgress(ScheduledTask $task) { - $task->inProgress = true; - $task->save(); + private function startProgress(ScheduledTaskEntity $task) { + $task->setInProgress(true); + $this->scheduledTasksRepository->persist($task); + $this->scheduledTasksRepository->flush(); } - private function stopProgress(ScheduledTask $task) { - $task->inProgress = false; - $task->save(); + private function stopProgress(ScheduledTaskEntity $task) { + $task->setInProgress(false); + $this->scheduledTasksRepository->persist($task); + $this->scheduledTasksRepository->flush(); } - private function complete(ScheduledTask $task) { - $task->processedAt = $this->wp->currentTime('mysql'); - $task->status = ScheduledTask::STATUS_COMPLETED; - $task->save(); - } - - // temporary function to convert an ScheduledTask object to ScheduledTaskEntity while we don't migrate the rest of - // the code in this class to use Doctrine entities - private function convertTaskClassToDoctrine(ScheduledTask $parisTask): ?ScheduledTaskEntity { - $doctrineTask = $this->scheduledTasksRepository->findOneById($parisTask->id); - - if (!$doctrineTask instanceof ScheduledTaskEntity) { - return null; - } - - return $doctrineTask; + private function complete(ScheduledTaskEntity $task) { + $task->setProcessedAt(new Carbon()); + $task->setStatus(ScheduledTaskEntity::STATUS_COMPLETED); + $this->scheduledTasksRepository->persist($task); + $this->scheduledTasksRepository->flush(); } } diff --git a/lib/Cron/Workers/Bounce.php b/lib/Cron/Workers/Bounce.php index 29b786111f..86ea4ea426 100644 --- a/lib/Cron/Workers/Bounce.php +++ b/lib/Cron/Workers/Bounce.php @@ -88,9 +88,9 @@ class Bounce extends SimpleWorker { return true; // mark completed } - $parisTask = ScheduledTask::getFromDoctrineEntity($task); + $parisTask = ScheduledTask::findOne($task->getId()); - if ($parisTask) { + if ($parisTask instanceof ScheduledTask) { $taskSubscribers = new TaskSubscribers($parisTask); foreach ($subscriberBatches as $subscribersToProcessIds) { diff --git a/lib/Entities/ScheduledTaskEntity.php b/lib/Entities/ScheduledTaskEntity.php index d8f6594a02..305571106b 100644 --- a/lib/Entities/ScheduledTaskEntity.php +++ b/lib/Entities/ScheduledTaskEntity.php @@ -68,6 +68,12 @@ class ScheduledTaskEntity { */ private $meta; + /** + * @ORM\Column(type="boolean", nullable=true) + * @var bool|null + */ + private $inProgress; + /** * @ORM\Column(type="integer", options={"default" : 0}) * @var int @@ -168,6 +174,20 @@ class ScheduledTaskEntity { $this->meta = $meta; } + /** + * @return bool|null + */ + public function getInProgress() { + return $this->inProgress; + } + + /** + * @param bool|null $inProgress + */ + public function setInProgress($inProgress) { + $this->inProgress = $inProgress; + } + public function getRescheduleCount(): int { return $this->rescheduleCount; } diff --git a/lib/Models/ScheduledTask.php b/lib/Models/ScheduledTask.php index f273c16af7..8d72dbf08f 100644 --- a/lib/Models/ScheduledTask.php +++ b/lib/Models/ScheduledTask.php @@ -181,16 +181,4 @@ class ScheduledTask extends Model { return $query->findMany(); } - - // temporary function to convert an ScheduledTaskEntity object to ScheduledTask while we don't migrate the rest of - // the code in this class to use Doctrine entities - public static function getFromDoctrineEntity(ScheduledTaskEntity $doctrineTask): ?ScheduledTask { - $parisTask = self::findOne($doctrineTask->getId()); - - if (!$parisTask instanceof ScheduledTask) { - return null; - } - - return $parisTask; - } } diff --git a/tests/integration/Cron/CronWorkerRunnerTest.php b/tests/integration/Cron/CronWorkerRunnerTest.php index 21a06aed62..a5c0bb89bc 100644 --- a/tests/integration/Cron/CronWorkerRunnerTest.php +++ b/tests/integration/Cron/CronWorkerRunnerTest.php @@ -7,10 +7,11 @@ use Codeception\Stub\Expected; use MailPoet\Cron\CronHelper; use MailPoet\Cron\CronWorkerRunner; use MailPoet\Cron\Workers\SimpleWorkerMockImplementation; -use MailPoet\Models\ScheduledTask; +use MailPoet\Entities\ScheduledTaskEntity; +use MailPoet\Newsletter\Sending\ScheduledTasksRepository; +use MailPoet\Test\DataFactories\ScheduledTask as ScheduledTaskFactory; use MailPoet\WP\Functions as WPFunctions; use MailPoetVendor\Carbon\Carbon; -use MailPoetVendor\Idiorm\ORM; require_once __DIR__ . '/Workers/SimpleWorkerMockImplementation.php'; @@ -21,10 +22,13 @@ class CronWorkerRunnerTest extends \MailPoetTest { /** @var CronHelper */ private $cronHelper; + /** @var ScheduledTasksRepository */ + private $scheduledTasksRepository; + public function _before() { $this->cronWorkerRunner = $this->diContainer->get(CronWorkerRunner::class); $this->cronHelper = $this->diContainer->get(CronHelper::class); - ORM::raw_execute('TRUNCATE ' . ScheduledTask::$_table); + $this->scheduledTasksRepository = $this->diContainer->get(ScheduledTasksRepository::class); } public function testItCanInitBeforeProcessing() { @@ -44,9 +48,9 @@ class CronWorkerRunnerTest extends \MailPoetTest { $task = $this->createScheduledTask(); $result = $this->cronWorkerRunner->run($worker); expect($result)->true(); - $scheduledTask = ScheduledTask::findOne($task->id); - assert($scheduledTask instanceof ScheduledTask); - expect($scheduledTask->status)->null(); + $scheduledTask = $this->scheduledTasksRepository->findOneById($task->getId()); + assert($scheduledTask instanceof ScheduledTaskEntity); + expect($scheduledTask->getStatus())->null(); } public function testItProcessesTask() { @@ -58,9 +62,9 @@ class CronWorkerRunnerTest extends \MailPoetTest { $task = $this->createRunningTask(); $result = $this->cronWorkerRunner->run($worker); expect($result)->true(); - $scheduledTask = ScheduledTask::findOne($task->id); - assert($scheduledTask instanceof ScheduledTask); - expect($scheduledTask->status)->same(ScheduledTask::STATUS_COMPLETED); + $scheduledTask = $this->scheduledTasksRepository->findOneById($task->getId()); + assert($scheduledTask instanceof ScheduledTaskEntity); + expect($scheduledTask->getStatus())->same(ScheduledTaskEntity::STATUS_COMPLETED); } public function testItFailsToProcessWithoutTasks() { @@ -98,9 +102,9 @@ class CronWorkerRunnerTest extends \MailPoetTest { $result = $this->cronWorkerRunner->run($worker); expect($result)->false(); - $scheduledTask = ScheduledTask::findOne(); - assert($scheduledTask instanceof ScheduledTask); - expect($scheduledTask->scheduledAt)->same($inOneWeek->format('Y-m-d H:i:s')); + $scheduledTask = $this->scheduledTasksRepository->findAll()[0]; + assert($scheduledTask instanceof ScheduledTaskEntity); + expect($scheduledTask->getScheduledAt())->same($inOneWeek); } public function testItWillRescheduleTaskIfItIsRunningForTooLong() { @@ -110,25 +114,29 @@ class CronWorkerRunnerTest extends \MailPoetTest { $worker->__construct(); $task = $this->createRunningTask(); - $task = ScheduledTask::findOne($task->id); // make sure `updated_at` is set by the DB - assert($task instanceof ScheduledTask); + $task = $this->scheduledTasksRepository->findOneById($task->getId()); // make sure `updated_at` is set by the DB + assert($task instanceof ScheduledTaskEntity); $result = $this->cronWorkerRunner->run($worker); expect($result)->true(); - $scheduledAt = $task->scheduledAt; - $task->updatedAt = Carbon::createFromTimestamp((int)strtotime((string)$task->updatedAt)) - ->subMinutes(CronWorkerRunner::TASK_RUN_TIMEOUT + 1); - $task->save(); + $scheduledAt = $task->getScheduledAt(); + $newUpdatedAt = $task->getUpdatedAt()->subMinutes(CronWorkerRunner::TASK_RUN_TIMEOUT + 1); // @phpstan-ignore-line + $task->setUpdatedAt($newUpdatedAt); + $this->scheduledTasksRepository->persist($task); + $this->scheduledTasksRepository->flush(); $result = $this->cronWorkerRunner->run($worker); expect($result)->true(); - $task = ScheduledTask::findOne($task->id); - assert($task instanceof ScheduledTask); - expect($task->scheduledAt)->greaterThan($scheduledAt); - expect($task->status)->same(ScheduledTask::STATUS_SCHEDULED); - expect($task->inProgress)->isEmpty(); + $task = $this->scheduledTasksRepository->findOneById($task->getId()); + assert($task instanceof ScheduledTaskEntity); + expect($task->getScheduledAt())->greaterThan($scheduledAt); + expect($task->getStatus())->same(ScheduledTaskEntity::STATUS_SCHEDULED); + expect($task->getInProgress())->isEmpty(); + + // reset the state of the updatedAt field. this is needed to reset the state of TimestampListener::now otherwise it will impact other tests. + $task->getUpdatedAt()->addMinutes(CronWorkerRunner::TASK_RUN_TIMEOUT + 1); // @phpstan-ignore-line } public function testItWillRescheduleATaskIfItFails() { @@ -139,18 +147,18 @@ class CronWorkerRunnerTest extends \MailPoetTest { ]); $task = $this->createRunningTask(); - $scheduledAt = $task->scheduledAt; + $scheduledAt = $task->getScheduledAt(); try { $this->cronWorkerRunner->run($worker); $this->fail('An exception should be thrown'); } catch (\Exception $e) { expect($e->getMessage())->equals('test error'); - $task = ScheduledTask::findOne($task->id); - assert($task instanceof ScheduledTask); - expect($task->scheduledAt)->greaterThan($scheduledAt); - expect($task->status)->same(ScheduledTask::STATUS_SCHEDULED); - expect($task->rescheduleCount)->equals(1); - expect($task->inProgress)->isEmpty(); + $task = $this->scheduledTasksRepository->findOneById($task->getId()); + assert($task instanceof ScheduledTaskEntity); + expect($task->getScheduledAt())->greaterThan($scheduledAt); + expect($task->getStatus())->same(ScheduledTaskEntity::STATUS_SCHEDULED); + expect($task->getRescheduleCount())->equals(1); + expect($task->getInProgress())->isEmpty(); } } @@ -162,17 +170,17 @@ class CronWorkerRunnerTest extends \MailPoetTest { ]); $task = $this->createRunningTask(); - $scheduledAt = $task->scheduledAt; + $scheduledAt = $task->getScheduledAt(); try { $this->cronWorkerRunner->run($worker); $this->fail('An exception should be thrown'); } catch (\Exception $e) { expect($e->getCode())->same(CronHelper::DAEMON_EXECUTION_LIMIT_REACHED); - $task = ScheduledTask::findOne($task->id); - assert($task instanceof ScheduledTask); - expect($scheduledAt)->equals($task->scheduledAt); - expect($task->status)->null(); - expect($task->rescheduleCount)->equals(0); + $task = $this->scheduledTasksRepository->findOneById($task->getId()); + assert($task instanceof ScheduledTaskEntity); + expect($scheduledAt)->equals($task->getScheduledAt()); + expect($task->getStatus())->null(); + expect($task->getRescheduleCount())->equals(0); } } @@ -183,8 +191,7 @@ class CronWorkerRunnerTest extends \MailPoetTest { ]); $task = $this->createRunningTask(); - $task->inProgress = true; - $task->save(); + $task->setInProgress(true); $this->cronWorkerRunner->run($worker); } @@ -205,25 +212,25 @@ class CronWorkerRunnerTest extends \MailPoetTest { } } - private function createScheduledTask() { - $task = ScheduledTask::create(); - $task->type = SimpleWorkerMockImplementation::TASK_TYPE; - $task->status = ScheduledTask::STATUS_SCHEDULED; - $task->scheduledAt = Carbon::createFromTimestamp(WPFunctions::get()->currentTime('timestamp')); - $task->save(); - return $task; + private function createScheduledTask(): ScheduledTaskEntity { + return $this->createTask(ScheduledTaskEntity::STATUS_SCHEDULED); } - private function createRunningTask() { - $task = ScheduledTask::create(); - $task->type = SimpleWorkerMockImplementation::TASK_TYPE; - $task->status = null; - $task->scheduledAt = Carbon::createFromTimestamp(WPFunctions::get()->currentTime('timestamp')); - $task->save(); - return $task; + private function createRunningTask(): ScheduledTaskEntity { + return $this->createTask(null); + } + + private function createTask($status): ScheduledTaskEntity { + $factory = new ScheduledTaskFactory(); + + return $factory->create( + SimpleWorkerMockImplementation::TASK_TYPE, + $status, + Carbon::createFromTimestamp(WPFunctions::get()->currentTime('timestamp')) + ); } public function _after() { - ORM::raw_execute('TRUNCATE ' . ScheduledTask::$_table); + $this->truncateEntity(ScheduledTaskEntity::class); } }