Refactor CronWorkerRunner to use Doctrine instead of Paris

[MAILPOET-3844]
This commit is contained in:
Rodrigo Primo
2021-09-22 15:09:43 -03:00
committed by Veljko V
parent 2c78db9e04
commit 498ceabc8c
5 changed files with 115 additions and 119 deletions

View File

@@ -3,7 +3,6 @@
namespace MailPoet\Cron; namespace MailPoet\Cron;
use MailPoet\Entities\ScheduledTaskEntity; use MailPoet\Entities\ScheduledTaskEntity;
use MailPoet\Models\ScheduledTask;
use MailPoet\Newsletter\Sending\ScheduledTasksRepository; use MailPoet\Newsletter\Sending\ScheduledTasksRepository;
use MailPoet\WP\Functions as WPFunctions; use MailPoet\WP\Functions as WPFunctions;
use MailPoetVendor\Carbon\Carbon; use MailPoetVendor\Carbon\Carbon;
@@ -66,16 +65,10 @@ class CronWorkerRunner {
try { try {
foreach ($dueTasks as $task) { foreach ($dueTasks as $task) {
$parisTask = ScheduledTask::getFromDoctrineEntity($task); $this->prepareTask($worker, $task);
if ($parisTask) {
$this->prepareTask($worker, $parisTask);
}
} }
foreach ($runningTasks as $task) { foreach ($runningTasks as $task) {
$parisTask = ScheduledTask::getFromDoctrineEntity($task); $this->processTask($worker, $task);
if ($parisTask) {
$this->processTask($worker, $parisTask);
}
} }
} catch (\Exception $e) { } catch (\Exception $e) {
if ($task && $e->getCode() !== CronHelper::DAEMON_EXECUTION_LIMIT_REACHED) { if ($task && $e->getCode() !== CronHelper::DAEMON_EXECUTION_LIMIT_REACHED) {
@@ -95,21 +88,20 @@ class CronWorkerRunner {
return $this->scheduledTasksRepository->findRunningByType($worker->getTaskType(), self::TASK_BATCH_SIZE); return $this->scheduledTasksRepository->findRunningByType($worker->getTaskType(), self::TASK_BATCH_SIZE);
} }
private function prepareTask(CronWorkerInterface $worker, ScheduledTask $task) { private function prepareTask(CronWorkerInterface $worker, ScheduledTaskEntity $task) {
// abort if execution limit is reached // abort if execution limit is reached
$this->cronHelper->enforceExecutionLimit($this->timer); $this->cronHelper->enforceExecutionLimit($this->timer);
$doctrineTask = $this->convertTaskClassToDoctrine($task); $prepareCompleted = $worker->prepareTaskStrategy($task, $this->timer);
if ($doctrineTask) {
$prepareCompleted = $worker->prepareTaskStrategy($doctrineTask, $this->timer); if ($prepareCompleted) {
if ($prepareCompleted) { $task->setStatus(null);
$task->status = null; $this->scheduledTasksRepository->persist($task);
$task->save(); $this->scheduledTasksRepository->flush();
}
} }
} }
private function processTask(CronWorkerInterface $worker, ScheduledTask $task) { private function processTask(CronWorkerInterface $worker, ScheduledTaskEntity $task) {
// abort if execution limit is reached // abort if execution limit is reached
$this->cronHelper->enforceExecutionLimit($this->timer); $this->cronHelper->enforceExecutionLimit($this->timer);
@@ -125,11 +117,7 @@ class CronWorkerRunner {
$this->startProgress($task); $this->startProgress($task);
try { try {
$completed = false; $completed = $worker->processTaskStrategy($task, $this->timer);
$doctrineTask = $this->convertTaskClassToDoctrine($task);
if ($doctrineTask) {
$completed = $worker->processTaskStrategy($doctrineTask, $this->timer);
}
} catch (\Exception $e) { } catch (\Exception $e) {
$this->stopProgress($task); $this->stopProgress($task);
throw $e; throw $e;
@@ -144,18 +132,20 @@ class CronWorkerRunner {
return (bool)$completed; return (bool)$completed;
} }
private function rescheduleOutdated(ScheduledTask $task) { private function rescheduleOutdated(ScheduledTaskEntity $task) {
$currentTime = Carbon::createFromTimestamp($this->wp->currentTime('timestamp')); $currentTime = Carbon::createFromTimestamp($this->wp->currentTime('timestamp'));
$updated = strtotime((string)$task->updatedAt);
if ($updated === false) { if (empty($task->getUpdatedAt())) {
// missing updatedAt, consider this task outdated (set year to 2000) and reschedule // missing updatedAt, consider this task outdated (set year to 2000) and reschedule
$updatedAt = Carbon::createFromDate(2000); $updatedAt = Carbon::createFromDate(2000);
} else if (!$task->getUpdatedAt() instanceof Carbon) {
$updatedAt = new Carbon($task->getUpdatedAt());
} else { } else {
$updatedAt = Carbon::createFromTimestamp($updated); $updatedAt = $task->getUpdatedAt();
} }
// If the task is running for too long consider it stuck and reschedule // If the task is running for too long consider it stuck and reschedule
if (!empty($task->updatedAt) && $updatedAt->diffInMinutes($currentTime, false) > self::TASK_RUN_TIMEOUT) { if (!empty($task->getUpdatedAt()) && $updatedAt->diffInMinutes($currentTime, false) > self::TASK_RUN_TIMEOUT) {
$this->stopProgress($task); $this->stopProgress($task);
$this->cronWorkerScheduler->reschedule($task, self::TIMED_OUT_TASK_RESCHEDULE_TIMEOUT); $this->cronWorkerScheduler->reschedule($task, self::TIMED_OUT_TASK_RESCHEDULE_TIMEOUT);
return true; return true;
@@ -163,39 +153,30 @@ class CronWorkerRunner {
return false; return false;
} }
private function isInProgress(ScheduledTask $task) { private function isInProgress(ScheduledTaskEntity $task) {
if (!empty($task->inProgress)) { if ($task->getInProgress()) {
// Do not run multiple instances of the task // Do not run multiple instances of the task
return true; return true;
} }
return false; return false;
} }
private function startProgress(ScheduledTask $task) { private function startProgress(ScheduledTaskEntity $task) {
$task->inProgress = true; $task->setInProgress(true);
$task->save(); $this->scheduledTasksRepository->persist($task);
$this->scheduledTasksRepository->flush();
} }
private function stopProgress(ScheduledTask $task) { private function stopProgress(ScheduledTaskEntity $task) {
$task->inProgress = false; $task->setInProgress(false);
$task->save(); $this->scheduledTasksRepository->persist($task);
$this->scheduledTasksRepository->flush();
} }
private function complete(ScheduledTask $task) { private function complete(ScheduledTaskEntity $task) {
$task->processedAt = $this->wp->currentTime('mysql'); $task->setProcessedAt(new Carbon());
$task->status = ScheduledTask::STATUS_COMPLETED; $task->setStatus(ScheduledTaskEntity::STATUS_COMPLETED);
$task->save(); $this->scheduledTasksRepository->persist($task);
} $this->scheduledTasksRepository->flush();
// temporary function to convert an ScheduledTask object to ScheduledTaskEntity while we don't migrate the rest of
// the code in this class to use Doctrine entities
private function convertTaskClassToDoctrine(ScheduledTask $parisTask): ?ScheduledTaskEntity {
$doctrineTask = $this->scheduledTasksRepository->findOneById($parisTask->id);
if (!$doctrineTask instanceof ScheduledTaskEntity) {
return null;
}
return $doctrineTask;
} }
} }

View File

@@ -88,9 +88,9 @@ class Bounce extends SimpleWorker {
return true; // mark completed return true; // mark completed
} }
$parisTask = ScheduledTask::getFromDoctrineEntity($task); $parisTask = ScheduledTask::findOne($task->getId());
if ($parisTask) { if ($parisTask instanceof ScheduledTask) {
$taskSubscribers = new TaskSubscribers($parisTask); $taskSubscribers = new TaskSubscribers($parisTask);
foreach ($subscriberBatches as $subscribersToProcessIds) { foreach ($subscriberBatches as $subscribersToProcessIds) {

View File

@@ -68,6 +68,12 @@ class ScheduledTaskEntity {
*/ */
private $meta; private $meta;
/**
* @ORM\Column(type="boolean", nullable=true)
* @var bool|null
*/
private $inProgress;
/** /**
* @ORM\Column(type="integer", options={"default" : 0}) * @ORM\Column(type="integer", options={"default" : 0})
* @var int * @var int
@@ -168,6 +174,20 @@ class ScheduledTaskEntity {
$this->meta = $meta; $this->meta = $meta;
} }
/**
* @return bool|null
*/
public function getInProgress() {
return $this->inProgress;
}
/**
* @param bool|null $inProgress
*/
public function setInProgress($inProgress) {
$this->inProgress = $inProgress;
}
public function getRescheduleCount(): int { public function getRescheduleCount(): int {
return $this->rescheduleCount; return $this->rescheduleCount;
} }

View File

@@ -181,16 +181,4 @@ class ScheduledTask extends Model {
return $query->findMany(); return $query->findMany();
} }
// temporary function to convert an ScheduledTaskEntity object to ScheduledTask while we don't migrate the rest of
// the code in this class to use Doctrine entities
public static function getFromDoctrineEntity(ScheduledTaskEntity $doctrineTask): ?ScheduledTask {
$parisTask = self::findOne($doctrineTask->getId());
if (!$parisTask instanceof ScheduledTask) {
return null;
}
return $parisTask;
}
} }

View File

@@ -7,10 +7,11 @@ use Codeception\Stub\Expected;
use MailPoet\Cron\CronHelper; use MailPoet\Cron\CronHelper;
use MailPoet\Cron\CronWorkerRunner; use MailPoet\Cron\CronWorkerRunner;
use MailPoet\Cron\Workers\SimpleWorkerMockImplementation; use MailPoet\Cron\Workers\SimpleWorkerMockImplementation;
use MailPoet\Models\ScheduledTask; use MailPoet\Entities\ScheduledTaskEntity;
use MailPoet\Newsletter\Sending\ScheduledTasksRepository;
use MailPoet\Test\DataFactories\ScheduledTask as ScheduledTaskFactory;
use MailPoet\WP\Functions as WPFunctions; use MailPoet\WP\Functions as WPFunctions;
use MailPoetVendor\Carbon\Carbon; use MailPoetVendor\Carbon\Carbon;
use MailPoetVendor\Idiorm\ORM;
require_once __DIR__ . '/Workers/SimpleWorkerMockImplementation.php'; require_once __DIR__ . '/Workers/SimpleWorkerMockImplementation.php';
@@ -21,10 +22,13 @@ class CronWorkerRunnerTest extends \MailPoetTest {
/** @var CronHelper */ /** @var CronHelper */
private $cronHelper; private $cronHelper;
/** @var ScheduledTasksRepository */
private $scheduledTasksRepository;
public function _before() { public function _before() {
$this->cronWorkerRunner = $this->diContainer->get(CronWorkerRunner::class); $this->cronWorkerRunner = $this->diContainer->get(CronWorkerRunner::class);
$this->cronHelper = $this->diContainer->get(CronHelper::class); $this->cronHelper = $this->diContainer->get(CronHelper::class);
ORM::raw_execute('TRUNCATE ' . ScheduledTask::$_table); $this->scheduledTasksRepository = $this->diContainer->get(ScheduledTasksRepository::class);
} }
public function testItCanInitBeforeProcessing() { public function testItCanInitBeforeProcessing() {
@@ -44,9 +48,9 @@ class CronWorkerRunnerTest extends \MailPoetTest {
$task = $this->createScheduledTask(); $task = $this->createScheduledTask();
$result = $this->cronWorkerRunner->run($worker); $result = $this->cronWorkerRunner->run($worker);
expect($result)->true(); expect($result)->true();
$scheduledTask = ScheduledTask::findOne($task->id); $scheduledTask = $this->scheduledTasksRepository->findOneById($task->getId());
assert($scheduledTask instanceof ScheduledTask); assert($scheduledTask instanceof ScheduledTaskEntity);
expect($scheduledTask->status)->null(); expect($scheduledTask->getStatus())->null();
} }
public function testItProcessesTask() { public function testItProcessesTask() {
@@ -58,9 +62,9 @@ class CronWorkerRunnerTest extends \MailPoetTest {
$task = $this->createRunningTask(); $task = $this->createRunningTask();
$result = $this->cronWorkerRunner->run($worker); $result = $this->cronWorkerRunner->run($worker);
expect($result)->true(); expect($result)->true();
$scheduledTask = ScheduledTask::findOne($task->id); $scheduledTask = $this->scheduledTasksRepository->findOneById($task->getId());
assert($scheduledTask instanceof ScheduledTask); assert($scheduledTask instanceof ScheduledTaskEntity);
expect($scheduledTask->status)->same(ScheduledTask::STATUS_COMPLETED); expect($scheduledTask->getStatus())->same(ScheduledTaskEntity::STATUS_COMPLETED);
} }
public function testItFailsToProcessWithoutTasks() { public function testItFailsToProcessWithoutTasks() {
@@ -98,9 +102,9 @@ class CronWorkerRunnerTest extends \MailPoetTest {
$result = $this->cronWorkerRunner->run($worker); $result = $this->cronWorkerRunner->run($worker);
expect($result)->false(); expect($result)->false();
$scheduledTask = ScheduledTask::findOne(); $scheduledTask = $this->scheduledTasksRepository->findAll()[0];
assert($scheduledTask instanceof ScheduledTask); assert($scheduledTask instanceof ScheduledTaskEntity);
expect($scheduledTask->scheduledAt)->same($inOneWeek->format('Y-m-d H:i:s')); expect($scheduledTask->getScheduledAt())->same($inOneWeek);
} }
public function testItWillRescheduleTaskIfItIsRunningForTooLong() { public function testItWillRescheduleTaskIfItIsRunningForTooLong() {
@@ -110,25 +114,29 @@ class CronWorkerRunnerTest extends \MailPoetTest {
$worker->__construct(); $worker->__construct();
$task = $this->createRunningTask(); $task = $this->createRunningTask();
$task = ScheduledTask::findOne($task->id); // make sure `updated_at` is set by the DB $task = $this->scheduledTasksRepository->findOneById($task->getId()); // make sure `updated_at` is set by the DB
assert($task instanceof ScheduledTask); assert($task instanceof ScheduledTaskEntity);
$result = $this->cronWorkerRunner->run($worker); $result = $this->cronWorkerRunner->run($worker);
expect($result)->true(); expect($result)->true();
$scheduledAt = $task->scheduledAt; $scheduledAt = $task->getScheduledAt();
$task->updatedAt = Carbon::createFromTimestamp((int)strtotime((string)$task->updatedAt)) $newUpdatedAt = $task->getUpdatedAt()->subMinutes(CronWorkerRunner::TASK_RUN_TIMEOUT + 1); // @phpstan-ignore-line
->subMinutes(CronWorkerRunner::TASK_RUN_TIMEOUT + 1); $task->setUpdatedAt($newUpdatedAt);
$task->save(); $this->scheduledTasksRepository->persist($task);
$this->scheduledTasksRepository->flush();
$result = $this->cronWorkerRunner->run($worker); $result = $this->cronWorkerRunner->run($worker);
expect($result)->true(); expect($result)->true();
$task = ScheduledTask::findOne($task->id); $task = $this->scheduledTasksRepository->findOneById($task->getId());
assert($task instanceof ScheduledTask); assert($task instanceof ScheduledTaskEntity);
expect($task->scheduledAt)->greaterThan($scheduledAt); expect($task->getScheduledAt())->greaterThan($scheduledAt);
expect($task->status)->same(ScheduledTask::STATUS_SCHEDULED); expect($task->getStatus())->same(ScheduledTaskEntity::STATUS_SCHEDULED);
expect($task->inProgress)->isEmpty(); expect($task->getInProgress())->isEmpty();
// reset the state of the updatedAt field. this is needed to reset the state of TimestampListener::now otherwise it will impact other tests.
$task->getUpdatedAt()->addMinutes(CronWorkerRunner::TASK_RUN_TIMEOUT + 1); // @phpstan-ignore-line
} }
public function testItWillRescheduleATaskIfItFails() { public function testItWillRescheduleATaskIfItFails() {
@@ -139,18 +147,18 @@ class CronWorkerRunnerTest extends \MailPoetTest {
]); ]);
$task = $this->createRunningTask(); $task = $this->createRunningTask();
$scheduledAt = $task->scheduledAt; $scheduledAt = $task->getScheduledAt();
try { try {
$this->cronWorkerRunner->run($worker); $this->cronWorkerRunner->run($worker);
$this->fail('An exception should be thrown'); $this->fail('An exception should be thrown');
} catch (\Exception $e) { } catch (\Exception $e) {
expect($e->getMessage())->equals('test error'); expect($e->getMessage())->equals('test error');
$task = ScheduledTask::findOne($task->id); $task = $this->scheduledTasksRepository->findOneById($task->getId());
assert($task instanceof ScheduledTask); assert($task instanceof ScheduledTaskEntity);
expect($task->scheduledAt)->greaterThan($scheduledAt); expect($task->getScheduledAt())->greaterThan($scheduledAt);
expect($task->status)->same(ScheduledTask::STATUS_SCHEDULED); expect($task->getStatus())->same(ScheduledTaskEntity::STATUS_SCHEDULED);
expect($task->rescheduleCount)->equals(1); expect($task->getRescheduleCount())->equals(1);
expect($task->inProgress)->isEmpty(); expect($task->getInProgress())->isEmpty();
} }
} }
@@ -162,17 +170,17 @@ class CronWorkerRunnerTest extends \MailPoetTest {
]); ]);
$task = $this->createRunningTask(); $task = $this->createRunningTask();
$scheduledAt = $task->scheduledAt; $scheduledAt = $task->getScheduledAt();
try { try {
$this->cronWorkerRunner->run($worker); $this->cronWorkerRunner->run($worker);
$this->fail('An exception should be thrown'); $this->fail('An exception should be thrown');
} catch (\Exception $e) { } catch (\Exception $e) {
expect($e->getCode())->same(CronHelper::DAEMON_EXECUTION_LIMIT_REACHED); expect($e->getCode())->same(CronHelper::DAEMON_EXECUTION_LIMIT_REACHED);
$task = ScheduledTask::findOne($task->id); $task = $this->scheduledTasksRepository->findOneById($task->getId());
assert($task instanceof ScheduledTask); assert($task instanceof ScheduledTaskEntity);
expect($scheduledAt)->equals($task->scheduledAt); expect($scheduledAt)->equals($task->getScheduledAt());
expect($task->status)->null(); expect($task->getStatus())->null();
expect($task->rescheduleCount)->equals(0); expect($task->getRescheduleCount())->equals(0);
} }
} }
@@ -183,8 +191,7 @@ class CronWorkerRunnerTest extends \MailPoetTest {
]); ]);
$task = $this->createRunningTask(); $task = $this->createRunningTask();
$task->inProgress = true; $task->setInProgress(true);
$task->save();
$this->cronWorkerRunner->run($worker); $this->cronWorkerRunner->run($worker);
} }
@@ -205,25 +212,25 @@ class CronWorkerRunnerTest extends \MailPoetTest {
} }
} }
private function createScheduledTask() { private function createScheduledTask(): ScheduledTaskEntity {
$task = ScheduledTask::create(); return $this->createTask(ScheduledTaskEntity::STATUS_SCHEDULED);
$task->type = SimpleWorkerMockImplementation::TASK_TYPE;
$task->status = ScheduledTask::STATUS_SCHEDULED;
$task->scheduledAt = Carbon::createFromTimestamp(WPFunctions::get()->currentTime('timestamp'));
$task->save();
return $task;
} }
private function createRunningTask() { private function createRunningTask(): ScheduledTaskEntity {
$task = ScheduledTask::create(); return $this->createTask(null);
$task->type = SimpleWorkerMockImplementation::TASK_TYPE; }
$task->status = null;
$task->scheduledAt = Carbon::createFromTimestamp(WPFunctions::get()->currentTime('timestamp')); private function createTask($status): ScheduledTaskEntity {
$task->save(); $factory = new ScheduledTaskFactory();
return $task;
return $factory->create(
SimpleWorkerMockImplementation::TASK_TYPE,
$status,
Carbon::createFromTimestamp(WPFunctions::get()->currentTime('timestamp'))
);
} }
public function _after() { public function _after() {
ORM::raw_execute('TRUNCATE ' . ScheduledTask::$_table); $this->truncateEntity(ScheduledTaskEntity::class);
} }
} }