Change SimpleWorker::prepareTaskStrategy() to use Doctrine

This commit changes the method prepareTaskStrategy() in the class
SimpleWorker and all its child classes to use ScheduledTaskEntity
instead of ScheduledTask.

[MAILPOET-2996]
This commit is contained in:
Rodrigo Primo
2021-09-28 16:12:59 -03:00
committed by Veljko V
parent 7d87b042e8
commit 57c80ea763
9 changed files with 90 additions and 63 deletions

View File

@@ -2,6 +2,7 @@
namespace MailPoet\Cron; namespace MailPoet\Cron;
use MailPoet\Entities\ScheduledTaskEntity;
use MailPoet\Models\ScheduledTask; use MailPoet\Models\ScheduledTask;
interface CronWorkerInterface { interface CronWorkerInterface {
@@ -20,11 +21,11 @@ interface CronWorkerInterface {
public function init(); public function init();
/** /**
* @param ScheduledTask $task * @param ScheduledTaskEntity $task
* @param float $timer * @param float $timer
* @return bool * @return bool
*/ */
public function prepareTaskStrategy(ScheduledTask $task, $timer); public function prepareTaskStrategy(ScheduledTaskEntity $task, $timer);
/** /**
* @param ScheduledTask $task * @param ScheduledTask $task

View File

@@ -68,13 +68,13 @@ class CronWorkerRunner {
$parisTask = null; $parisTask = null;
foreach ($dueTasks as $task) { foreach ($dueTasks as $task) {
$parisTask = $this->convertTaskClass($task); $parisTask = ScheduledTask::getFromDoctrineEntity($task);
if ($parisTask) { if ($parisTask) {
$this->prepareTask($worker, $parisTask); $this->prepareTask($worker, $parisTask);
} }
} }
foreach ($runningTasks as $task) { foreach ($runningTasks as $task) {
$parisTask = $this->convertTaskClass($task); $parisTask = ScheduledTask::getFromDoctrineEntity($task);
if ($parisTask) { if ($parisTask) {
$this->processTask($worker, $parisTask); $this->processTask($worker, $parisTask);
} }
@@ -101,12 +101,15 @@ class CronWorkerRunner {
// abort if execution limit is reached // abort if execution limit is reached
$this->cronHelper->enforceExecutionLimit($this->timer); $this->cronHelper->enforceExecutionLimit($this->timer);
$prepareCompleted = $worker->prepareTaskStrategy($task, $this->timer); $doctrineTask = $this->convertTaskClassToDoctrine($task);
if ($doctrineTask) {
$prepareCompleted = $worker->prepareTaskStrategy($doctrineTask, $this->timer);
if ($prepareCompleted) { if ($prepareCompleted) {
$task->status = null; $task->status = null;
$task->save(); $task->save();
} }
} }
}
private function processTask(CronWorkerInterface $worker, ScheduledTask $task) { private function processTask(CronWorkerInterface $worker, ScheduledTask $task) {
// abort if execution limit is reached // abort if execution limit is reached
@@ -182,15 +185,15 @@ class CronWorkerRunner {
$task->save(); $task->save();
} }
// temporary function to convert an ScheduledTaskEntity object to ScheduledTask while we don't migrate the rest of // temporary function to convert an ScheduledTask object to ScheduledTaskEntity while we don't migrate the rest of
// the code in this class to use Doctrine entities // the code in this class to use Doctrine entities
private function convertTaskClass(ScheduledTaskEntity $doctrineTask): ?ScheduledTask { private function convertTaskClassToDoctrine(ScheduledTask $parisTask): ?ScheduledTaskEntity {
$parisTask = ScheduledTask::findOne($doctrineTask->getId()); $doctrineTask = $this->scheduledTasksRepository->findOneById($parisTask->id);
if (!$parisTask instanceof ScheduledTask) { if (!$doctrineTask instanceof ScheduledTaskEntity) {
return null; return null;
} }
return $parisTask; return $doctrineTask;
} }
} }

View File

@@ -70,11 +70,11 @@ class Bounce extends SimpleWorker {
return $this->bridge->isMailpoetSendingServiceEnabled(); return $this->bridge->isMailpoetSendingServiceEnabled();
} }
public function prepareTaskStrategy(ScheduledTask $task, $timer) { public function prepareTaskStrategy(ScheduledTaskEntity $task, $timer) {
BounceTask::prepareSubscribers($task); BounceTask::prepareSubscribers($task);
if (!ScheduledTaskSubscriber::getUnprocessedCount($task->id)) { if (!ScheduledTaskSubscriber::getUnprocessedCount($task->getId())) {
ScheduledTaskSubscriber::where('task_id', $task->id)->deleteMany(); ScheduledTaskSubscriber::where('task_id', $task->getId())->deleteMany();
return false; return false;
} }
return true; return true;

View File

@@ -3,6 +3,7 @@
namespace MailPoet\Cron\Workers\SendingQueue; namespace MailPoet\Cron\Workers\SendingQueue;
use MailPoet\Cron\Workers\SimpleWorker; use MailPoet\Cron\Workers\SimpleWorker;
use MailPoet\Entities\ScheduledTaskEntity;
use MailPoet\Mailer\MailerLog; use MailPoet\Mailer\MailerLog;
use MailPoet\Models\ScheduledTask; use MailPoet\Models\ScheduledTask;
use MailPoet\Models\ScheduledTaskSubscriber; use MailPoet\Models\ScheduledTaskSubscriber;
@@ -20,7 +21,7 @@ class Migration extends SimpleWorker {
return empty($completedTasks); return empty($completedTasks);
} }
public function prepareTaskStrategy(ScheduledTask $task, $timer) { public function prepareTaskStrategy(ScheduledTaskEntity $task, $timer) {
$unmigratedColumns = $this->checkUnmigratedColumnsExist(); $unmigratedColumns = $this->checkUnmigratedColumnsExist();
$unmigratedQueuesCount = 0; $unmigratedQueuesCount = 0;
$unmigratedQueueSubscribers = []; $unmigratedQueueSubscribers = [];
@@ -35,9 +36,10 @@ class Migration extends SimpleWorker {
&& count($unmigratedQueueSubscribers) == 0) && count($unmigratedQueueSubscribers) == 0)
) { ) {
// nothing to migrate, complete task // nothing to migrate, complete task
$task->processedAt = WPFunctions::get()->currentTime('mysql'); $task->setProcessedAt(Carbon::createFromTimestamp(WPFunctions::get()->currentTime('timestamp')));
$task->status = ScheduledTask::STATUS_COMPLETED; $task->setStatus(ScheduledTask::STATUS_COMPLETED);
$task->save(); $this->scheduledTasksRepository->persist($task);
$this->scheduledTasksRepository->flush();
$this->resumeSending(); $this->resumeSending();
return false; return false;
} }

View File

@@ -7,6 +7,7 @@ use MailPoet\Cron\CronWorkerInterface;
use MailPoet\Cron\CronWorkerRunner; use MailPoet\Cron\CronWorkerRunner;
use MailPoet\Cron\CronWorkerScheduler; use MailPoet\Cron\CronWorkerScheduler;
use MailPoet\DI\ContainerWrapper; use MailPoet\DI\ContainerWrapper;
use MailPoet\Entities\ScheduledTaskEntity;
use MailPoet\Models\ScheduledTask; use MailPoet\Models\ScheduledTask;
use MailPoet\Newsletter\Sending\ScheduledTasksRepository; use MailPoet\Newsletter\Sending\ScheduledTasksRepository;
use MailPoet\WP\Functions as WPFunctions; use MailPoet\WP\Functions as WPFunctions;
@@ -68,7 +69,7 @@ abstract class SimpleWorker implements CronWorkerInterface {
public function init() { public function init() {
} }
public function prepareTaskStrategy(ScheduledTask $task, $timer) { public function prepareTaskStrategy(ScheduledTaskEntity $task, $timer) {
return true; return true;
} }

View File

@@ -191,4 +191,16 @@ class ScheduledTask extends Model {
return $query->findMany(); return $query->findMany();
} }
// temporary function to convert an ScheduledTaskEntity object to ScheduledTask while we don't migrate the rest of
// the code in this class to use Doctrine entities
public static function getFromDoctrineEntity(ScheduledTaskEntity $doctrineTask): ?ScheduledTask {
$parisTask = self::findOne($doctrineTask->getId());
if (!$parisTask instanceof ScheduledTask) {
return null;
}
return $parisTask;
}
} }

View File

@@ -2,12 +2,12 @@
namespace MailPoet\Tasks; namespace MailPoet\Tasks;
use MailPoet\Models\ScheduledTask; use MailPoet\Entities\ScheduledTaskEntity;
use MailPoet\Models\ScheduledTaskSubscriber; use MailPoet\Models\ScheduledTaskSubscriber;
use MailPoet\Models\Subscriber; use MailPoet\Models\Subscriber;
class Bounce { class Bounce {
public static function prepareSubscribers(ScheduledTask $task) { public static function prepareSubscribers(ScheduledTaskEntity $task) {
// Prepare subscribers on the DB side for performance reasons // Prepare subscribers on the DB side for performance reasons
Subscriber::rawExecute( Subscriber::rawExecute(
'INSERT IGNORE INTO ' . MP_SCHEDULED_TASK_SUBSCRIBERS_TABLE . ' 'INSERT IGNORE INTO ' . MP_SCHEDULED_TASK_SUBSCRIBERS_TABLE . '
@@ -17,7 +17,7 @@ class Bounce {
WHERE s.`deleted_at` IS NULL WHERE s.`deleted_at` IS NULL
AND s.`status` IN (?, ?)', AND s.`status` IN (?, ?)',
[ [
$task->id, $task->getId(),
ScheduledTaskSubscriber::STATUS_UNPROCESSED, ScheduledTaskSubscriber::STATUS_UNPROCESSED,
Subscriber::STATUS_SUBSCRIBED, Subscriber::STATUS_SUBSCRIBED,
Subscriber::STATUS_UNCONFIRMED, Subscriber::STATUS_UNCONFIRMED,

View File

@@ -20,6 +20,7 @@ use MailPoet\Settings\SettingsController;
use MailPoet\Settings\SettingsRepository; use MailPoet\Settings\SettingsRepository;
use MailPoet\Statistics\StatisticsBouncesRepository; use MailPoet\Statistics\StatisticsBouncesRepository;
use MailPoet\Subscribers\SubscribersRepository; use MailPoet\Subscribers\SubscribersRepository;
use MailPoet\Test\DataFactories\ScheduledTask as ScheduledTaskFactory;
use MailPoet\WP\Functions as WPFunctions; use MailPoet\WP\Functions as WPFunctions;
use MailPoetVendor\Carbon\Carbon; use MailPoetVendor\Carbon\Carbon;
@@ -36,6 +37,9 @@ class BounceTest extends \MailPoetTest {
/** @var SubscribersRepository */ /** @var SubscribersRepository */
private $subscribersRepository; private $subscribersRepository;
/** @var ScheduledTaskFactory */
private $scheduledTaskFactory;
public function _before() { public function _before() {
parent::_before(); parent::_before();
$this->cleanup(); $this->cleanup();
@@ -45,6 +49,7 @@ class BounceTest extends \MailPoetTest {
'good_address@example.com', 'good_address@example.com',
]; ];
$this->subscribersRepository = $this->diContainer->get(SubscribersRepository::class); $this->subscribersRepository = $this->diContainer->get(SubscribersRepository::class);
$this->scheduledTaskFactory = new ScheduledTaskFactory();
foreach ($this->emails as $email) { foreach ($this->emails as $email) {
$subscriber = new SubscriberEntity(); $subscriber = new SubscriberEntity();
@@ -93,47 +98,47 @@ class BounceTest extends \MailPoetTest {
// 1st run - subscribers will be processed // 1st run - subscribers will be processed
$task = $this->createScheduledTask(); $task = $this->createScheduledTask();
$this->worker->prepareTaskStrategy($task, microtime(true)); $this->worker->prepareTaskStrategy($task, microtime(true));
expect(ScheduledTaskSubscriber::where('task_id', $task->id)->findMany())->notEmpty(); expect(ScheduledTaskSubscriber::where('task_id', $task->getId())->findMany())->notEmpty();
// 2nd run - nothing more to process, ScheduledTaskSubscriber will be cleaned up // 2nd run - nothing more to process, ScheduledTaskSubscriber will be cleaned up
$this->truncateEntity(SubscriberEntity::class); $this->truncateEntity(SubscriberEntity::class);
$task = $this->createScheduledTask(); $task = $this->createScheduledTask();
$this->worker->prepareTaskStrategy($task, microtime(true)); $this->worker->prepareTaskStrategy($task, microtime(true));
expect(ScheduledTaskSubscriber::where('task_id', $task->id)->findMany())->isEmpty(); expect(ScheduledTaskSubscriber::where('task_id', $task->getId())->findMany())->isEmpty();
} }
public function testItPreparesTask() { public function testItPreparesTask() {
$task = $this->createScheduledTask(); $task = $this->createScheduledTask();
expect(ScheduledTaskSubscriber::getUnprocessedCount($task->id))->isEmpty(); expect(ScheduledTaskSubscriber::getUnprocessedCount($task->getId()))->isEmpty();
$result = $this->worker->prepareTaskStrategy($task, microtime(true)); $result = $this->worker->prepareTaskStrategy($task, microtime(true));
expect($result)->true(); expect($result)->true();
expect(ScheduledTaskSubscriber::getUnprocessedCount($task->id))->notEmpty(); expect(ScheduledTaskSubscriber::getUnprocessedCount($task->getId()))->notEmpty();
} }
public function testItDeletesAllSubscribersIfThereAreNoSubscribersToProcessWhenProcessingTask() { public function testItDeletesAllSubscribersIfThereAreNoSubscribersToProcessWhenProcessingTask() {
// prepare subscribers // prepare subscribers
$task = $this->createScheduledTask(); $task = $this->createScheduledTask();
$this->worker->prepareTaskStrategy($task, microtime(true)); $this->worker->prepareTaskStrategy($task, microtime(true));
expect(ScheduledTaskSubscriber::where('task_id', $task->id)->findMany())->notEmpty(); expect(ScheduledTaskSubscriber::where('task_id', $task->getId())->findMany())->notEmpty();
// process - no subscribers found, ScheduledTaskSubscriber will be cleaned up // process - no subscribers found, ScheduledTaskSubscriber will be cleaned up
$this->truncateEntity(SubscriberEntity::class); $this->truncateEntity(SubscriberEntity::class);
$task = $this->createScheduledTask(); $task = ScheduledTask::getFromDoctrineEntity($this->createScheduledTask());
$this->worker->processTaskStrategy($task, microtime(true)); $this->worker->processTaskStrategy($task, microtime(true)); // @phpstan-ignore-line
expect(ScheduledTaskSubscriber::where('task_id', $task->id)->findMany())->isEmpty(); expect(ScheduledTaskSubscriber::where('task_id', $task->id)->findMany())->isEmpty(); // @phpstan-ignore-line
} }
public function testItProcessesTask() { public function testItProcessesTask() {
$task = $this->createRunningTask(); $task = $this->createRunningTask();
$this->worker->prepareTaskStrategy($task, microtime(true)); $this->worker->prepareTaskStrategy($task, microtime(true));
expect(ScheduledTaskSubscriber::getUnprocessedCount($task->id))->notEmpty(); expect(ScheduledTaskSubscriber::getUnprocessedCount($task->getId()))->notEmpty();
$this->worker->processTaskStrategy($task, microtime(true)); $this->worker->processTaskStrategy(ScheduledTask::getFromDoctrineEntity($task), microtime(true)); // @phpstan-ignore-line
expect(ScheduledTaskSubscriber::getProcessedCount($task->id))->notEmpty(); expect(ScheduledTaskSubscriber::getProcessedCount($task->getId()))->notEmpty();
} }
public function testItSetsSubscriberStatusAsBounced() { public function testItSetsSubscriberStatusAsBounced() {
$task = $this->createRunningTask(); $task = $this->createRunningTask();
$this->worker->processEmails($task, $this->emails); $this->worker->processEmails(ScheduledTask::getFromDoctrineEntity($task), $this->emails);
$subscribers = $this->subscribersRepository->findAll(); $subscribers = $this->subscribersRepository->findAll();
@@ -153,11 +158,12 @@ class BounceTest extends \MailPoetTest {
$this->createScheduledTaskSubscriber($oldSendingTask, $subscriber); $this->createScheduledTaskSubscriber($oldSendingTask, $subscriber);
// create previous bounce task // create previous bounce task
$previousBounceTask = $this->createRunningTask(); $previousBounceTask = $this->createRunningTask();
$previousBounceTask->status = ScheduledTask::STATUS_COMPLETED; $previousBounceTask->setStatus(ScheduledTask::STATUS_COMPLETED);
$previousBounceTask->createdAt = Carbon::now()->subDays(6); $previousBounceTask->setCreatedAt(Carbon::now()->subDays(6));
$previousBounceTask->scheduledAt = Carbon::now()->subDays(4); $previousBounceTask->setScheduledAt(Carbon::now()->subDays(4));
$previousBounceTask->updatedAt = Carbon::now()->subDays(4); $previousBounceTask->setUpdatedAt(Carbon::now()->subDays(4));
$previousBounceTask->save(); $this->entityManager->persist($previousBounceTask);
$this->entityManager->flush();
// create data that should be used for the current bounce task run // create data that should be used for the current bounce task run
$newsletter = $this->createNewsletter(); $newsletter = $this->createNewsletter();
$sendingTask = $this->createSendingTask() ; $sendingTask = $this->createSendingTask() ;
@@ -169,7 +175,7 @@ class BounceTest extends \MailPoetTest {
$this->entityManager->flush(); $this->entityManager->flush();
$this->entityManager->clear(); $this->entityManager->clear();
// run the code // run the code
$this->worker->processEmails($this->createRunningTask(), $this->emails); $this->worker->processEmails(ScheduledTask::getFromDoctrineEntity($this->createRunningTask()), $this->emails);
// test it // test it
$statisticsRepository = $this->diContainer->get(StatisticsBouncesRepository::class); $statisticsRepository = $this->diContainer->get(StatisticsBouncesRepository::class);
$statistics = $statisticsRepository->findAll(); $statistics = $statisticsRepository->findAll();
@@ -187,22 +193,20 @@ class BounceTest extends \MailPoetTest {
); );
} }
private function createScheduledTask() { private function createScheduledTask(): ScheduledTaskEntity {
$task = ScheduledTask::create(); return $this->scheduledTaskFactory->create(
$task->type = 'bounce'; 'bounce',
$task->status = ScheduledTask::STATUS_SCHEDULED; ScheduledTaskEntity::STATUS_SCHEDULED,
$task->scheduledAt = Carbon::createFromTimestamp(WPFunctions::get()->currentTime('timestamp')); Carbon::createFromTimestamp(WPFunctions::get()->currentTime('timestamp'))
$task->save(); );
return $task;
} }
private function createRunningTask() { private function createRunningTask(): ScheduledTaskEntity {
$task = ScheduledTask::create(); return $this->scheduledTaskFactory->create(
$task->type = 'bounce'; 'bounce',
$task->status = null; null,
$task->scheduledAt = Carbon::createFromTimestamp(WPFunctions::get()->currentTime('timestamp')); Carbon::createFromTimestamp(WPFunctions::get()->currentTime('timestamp'))
$task->save(); );
return $task;
} }
private function createNewsletter(): NewsletterEntity { private function createNewsletter(): NewsletterEntity {

View File

@@ -4,6 +4,7 @@ namespace MailPoet\Test\Cron\Workers;
use Codeception\Stub; use Codeception\Stub;
use MailPoet\Cron\Workers\SendingQueue\Migration; use MailPoet\Cron\Workers\SendingQueue\Migration;
use MailPoet\Entities\ScheduledTaskEntity;
use MailPoet\Mailer\MailerLog; use MailPoet\Mailer\MailerLog;
use MailPoet\Models\ScheduledTask; use MailPoet\Models\ScheduledTask;
use MailPoet\Models\ScheduledTaskSubscriber; use MailPoet\Models\ScheduledTaskSubscriber;
@@ -11,6 +12,7 @@ use MailPoet\Models\SendingQueue;
use MailPoet\Models\Subscriber; use MailPoet\Models\Subscriber;
use MailPoet\Settings\SettingsRepository; use MailPoet\Settings\SettingsRepository;
use MailPoet\Tasks\Sending as SendingTask; use MailPoet\Tasks\Sending as SendingTask;
use MailPoet\Test\DataFactories\ScheduledTask as ScheduledTaskFactory;
use MailPoet\WP\Functions as WPFunctions; use MailPoet\WP\Functions as WPFunctions;
use MailPoetVendor\Carbon\Carbon; use MailPoetVendor\Carbon\Carbon;
use MailPoetVendor\Idiorm\ORM; use MailPoetVendor\Idiorm\ORM;
@@ -23,6 +25,8 @@ class MigrationTest extends \MailPoetTest {
public $queueRunning; public $queueRunning;
public $subscriberProcessed; public $subscriberProcessed;
public $subscriberToProcess; public $subscriberToProcess;
/** @var ScheduledTaskFactory */
private $scheduledTaskFactory;
/** @var Migration */ /** @var Migration */
private $worker; private $worker;
@@ -48,6 +52,7 @@ class MigrationTest extends \MailPoetTest {
$this->queueCompleted = $this->createSendingQueue(SendingQueue::STATUS_COMPLETED); $this->queueCompleted = $this->createSendingQueue(SendingQueue::STATUS_COMPLETED);
$this->queueScheduled = $this->createSendingQueue(SendingQueue::STATUS_SCHEDULED); $this->queueScheduled = $this->createSendingQueue(SendingQueue::STATUS_SCHEDULED);
$this->scheduledTaskFactory = new ScheduledTaskFactory();
$this->worker = new Migration(); $this->worker = new Migration();
} }
@@ -82,7 +87,7 @@ class MigrationTest extends \MailPoetTest {
SendingQueue::deleteMany(); SendingQueue::deleteMany();
$task = $this->createScheduledTask(); $task = $this->createScheduledTask();
$this->worker->prepareTaskStrategy($task, microtime(true)); $this->worker->prepareTaskStrategy($task, microtime(true));
$task = ScheduledTask::findOne($task->id); $task = ScheduledTask::findOne($task->getId());
assert($task instanceof ScheduledTask); assert($task instanceof ScheduledTask);
expect($task->status)->equals(ScheduledTask::STATUS_COMPLETED); expect($task->status)->equals(ScheduledTask::STATUS_COMPLETED);
} }
@@ -137,12 +142,11 @@ class MigrationTest extends \MailPoetTest {
} }
private function createScheduledTask() { private function createScheduledTask() {
$task = ScheduledTask::create(); return $this->scheduledTaskFactory->create(
$task->type = Migration::TASK_TYPE; Migration::TASK_TYPE,
$task->status = ScheduledTask::STATUS_SCHEDULED; ScheduledTaskEntity::STATUS_SCHEDULED,
$task->scheduledAt = Carbon::createFromTimestamp(WPFunctions::get()->currentTime('timestamp')); Carbon::createFromTimestamp(WPFunctions::get()->currentTime('timestamp'))
$task->save(); );
return $task;
} }
private function createRunningTask() { private function createRunningTask() {