Pause running tasks that have deleted or trashed segments
[MAILPOET-3418]
This commit is contained in:
committed by
Veljko V
parent
6183258489
commit
b3da3ec1bc
@ -9,6 +9,7 @@ use MailPoet\Cron\Workers\SendingQueue\Tasks\Mailer as MailerTask;
|
|||||||
use MailPoet\Cron\Workers\SendingQueue\Tasks\Newsletter as NewsletterTask;
|
use MailPoet\Cron\Workers\SendingQueue\Tasks\Newsletter as NewsletterTask;
|
||||||
use MailPoet\Cron\Workers\StatsNotifications\Scheduler as StatsNotificationsScheduler;
|
use MailPoet\Cron\Workers\StatsNotifications\Scheduler as StatsNotificationsScheduler;
|
||||||
use MailPoet\Entities\NewsletterEntity;
|
use MailPoet\Entities\NewsletterEntity;
|
||||||
|
use MailPoet\Entities\ScheduledTaskEntity;
|
||||||
use MailPoet\Logging\LoggerFactory;
|
use MailPoet\Logging\LoggerFactory;
|
||||||
use MailPoet\Mailer\MailerError;
|
use MailPoet\Mailer\MailerError;
|
||||||
use MailPoet\Mailer\MailerLog;
|
use MailPoet\Mailer\MailerLog;
|
||||||
@ -18,6 +19,7 @@ use MailPoet\Models\ScheduledTask as ScheduledTaskModel;
|
|||||||
use MailPoet\Models\StatisticsNewsletters as StatisticsNewslettersModel;
|
use MailPoet\Models\StatisticsNewsletters as StatisticsNewslettersModel;
|
||||||
use MailPoet\Models\Subscriber as SubscriberModel;
|
use MailPoet\Models\Subscriber as SubscriberModel;
|
||||||
use MailPoet\Newsletter\NewslettersRepository;
|
use MailPoet\Newsletter\NewslettersRepository;
|
||||||
|
use MailPoet\Segments\SegmentsRepository;
|
||||||
use MailPoet\Segments\SubscribersFinder;
|
use MailPoet\Segments\SubscribersFinder;
|
||||||
use MailPoet\Tasks\Sending as SendingTask;
|
use MailPoet\Tasks\Sending as SendingTask;
|
||||||
use MailPoet\Tasks\Subscribers\BatchIterator;
|
use MailPoet\Tasks\Subscribers\BatchIterator;
|
||||||
@ -52,6 +54,9 @@ class SendingQueue {
|
|||||||
/** @var SubscribersFinder */
|
/** @var SubscribersFinder */
|
||||||
private $subscribersFinder;
|
private $subscribersFinder;
|
||||||
|
|
||||||
|
/** @var SegmentsRepository */
|
||||||
|
private $segmentsRepository;
|
||||||
|
|
||||||
public function __construct(
|
public function __construct(
|
||||||
SendingErrorHandler $errorHandler,
|
SendingErrorHandler $errorHandler,
|
||||||
StatsNotificationsScheduler $statsNotificationsScheduler,
|
StatsNotificationsScheduler $statsNotificationsScheduler,
|
||||||
@ -59,6 +64,7 @@ class SendingQueue {
|
|||||||
NewslettersRepository $newslettersRepository,
|
NewslettersRepository $newslettersRepository,
|
||||||
CronHelper $cronHelper,
|
CronHelper $cronHelper,
|
||||||
SubscribersFinder $subscriberFinder,
|
SubscribersFinder $subscriberFinder,
|
||||||
|
SegmentsRepository $segmentsRepository,
|
||||||
$mailerTask = false,
|
$mailerTask = false,
|
||||||
$newsletterTask = false
|
$newsletterTask = false
|
||||||
) {
|
) {
|
||||||
@ -67,6 +73,7 @@ class SendingQueue {
|
|||||||
$this->subscribersFinder = $subscriberFinder;
|
$this->subscribersFinder = $subscriberFinder;
|
||||||
$this->mailerTask = ($mailerTask) ? $mailerTask : new MailerTask();
|
$this->mailerTask = ($mailerTask) ? $mailerTask : new MailerTask();
|
||||||
$this->newsletterTask = ($newsletterTask) ? $newsletterTask : new NewsletterTask();
|
$this->newsletterTask = ($newsletterTask) ? $newsletterTask : new NewsletterTask();
|
||||||
|
$this->segmentsRepository = $segmentsRepository;
|
||||||
$this->mailerMetaInfo = new MetaInfo;
|
$this->mailerMetaInfo = new MetaInfo;
|
||||||
$wp = new WPFunctions;
|
$wp = new WPFunctions;
|
||||||
$this->batchSize = $wp->applyFilters('mailpoet_cron_worker_sending_queue_batch_size', self::BATCH_SIZE);
|
$this->batchSize = $wp->applyFilters('mailpoet_cron_worker_sending_queue_batch_size', self::BATCH_SIZE);
|
||||||
@ -111,6 +118,17 @@ class SendingQueue {
|
|||||||
$this->mailerTask->configureMailer($newsletter);
|
$this->mailerTask->configureMailer($newsletter);
|
||||||
// get newsletter segments
|
// get newsletter segments
|
||||||
$newsletterSegmentsIds = $this->newsletterTask->getNewsletterSegments($newsletter);
|
$newsletterSegmentsIds = $this->newsletterTask->getNewsletterSegments($newsletter);
|
||||||
|
// Pause task in case some of related segments was deleted or trashed
|
||||||
|
if ($newsletterSegmentsIds && !$this->checkDeletedSegments($newsletterSegmentsIds)) {
|
||||||
|
$this->loggerFactory->getLogger(LoggerFactory::TOPIC_NEWSLETTERS)->addInfo(
|
||||||
|
'pause task in sending queue due deleted or trashed segment',
|
||||||
|
['task_id' => $queue->taskId]
|
||||||
|
);
|
||||||
|
$queue->status = ScheduledTaskEntity::STATUS_PAUSED;
|
||||||
|
$queue->save();
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
// get subscribers
|
// get subscribers
|
||||||
$subscriberBatches = new BatchIterator($queue->taskId, $this->batchSize);
|
$subscriberBatches = new BatchIterator($queue->taskId, $this->batchSize);
|
||||||
foreach ($subscriberBatches as $subscribersToProcessIds) {
|
foreach ($subscriberBatches as $subscribersToProcessIds) {
|
||||||
@ -280,6 +298,29 @@ class SendingQueue {
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Checks whether some of segments was deleted or trashed
|
||||||
|
* @param int[] $segmentIds
|
||||||
|
* @return bool
|
||||||
|
*/
|
||||||
|
private function checkDeletedSegments(array $segmentIds) {
|
||||||
|
if (count($segmentIds) === 0) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
$segmentIds = array_unique($segmentIds);
|
||||||
|
$segments = $this->segmentsRepository->findBy(['id' => $segmentIds]);
|
||||||
|
// Some segment was deleted from DB
|
||||||
|
if (count($segmentIds) > count($segments)) {
|
||||||
|
return false;
|
||||||
|
};
|
||||||
|
foreach ($segments as $segment) {
|
||||||
|
if ($segment->getDeletedAt() !== null) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
private function processSendResult(
|
private function processSendResult(
|
||||||
SendingTask $sendingTask,
|
SendingTask $sendingTask,
|
||||||
$sendResult,
|
$sendResult,
|
||||||
|
@ -12,7 +12,6 @@ use MailPoet\Cron\Workers\SendingQueue\SendingQueue as SendingQueueWorker;
|
|||||||
use MailPoet\Cron\Workers\SendingQueue\Tasks\Mailer as MailerTask;
|
use MailPoet\Cron\Workers\SendingQueue\Tasks\Mailer as MailerTask;
|
||||||
use MailPoet\Cron\Workers\SendingQueue\Tasks\Newsletter as NewsletterTask;
|
use MailPoet\Cron\Workers\SendingQueue\Tasks\Newsletter as NewsletterTask;
|
||||||
use MailPoet\Cron\Workers\StatsNotifications\Scheduler as StatsNotificationsScheduler;
|
use MailPoet\Cron\Workers\StatsNotifications\Scheduler as StatsNotificationsScheduler;
|
||||||
use MailPoet\DI\ContainerWrapper;
|
|
||||||
use MailPoet\Entities\NewsletterEntity;
|
use MailPoet\Entities\NewsletterEntity;
|
||||||
use MailPoet\Entities\NewsletterLinkEntity;
|
use MailPoet\Entities\NewsletterLinkEntity;
|
||||||
use MailPoet\Entities\NewsletterPostEntity;
|
use MailPoet\Entities\NewsletterPostEntity;
|
||||||
@ -40,6 +39,7 @@ use MailPoet\Newsletter\Links\Links;
|
|||||||
use MailPoet\Newsletter\NewslettersRepository;
|
use MailPoet\Newsletter\NewslettersRepository;
|
||||||
use MailPoet\Router\Endpoints\Track;
|
use MailPoet\Router\Endpoints\Track;
|
||||||
use MailPoet\Router\Router;
|
use MailPoet\Router\Router;
|
||||||
|
use MailPoet\Segments\SegmentsRepository;
|
||||||
use MailPoet\Segments\SubscribersFinder;
|
use MailPoet\Segments\SubscribersFinder;
|
||||||
use MailPoet\Settings\SettingsController;
|
use MailPoet\Settings\SettingsController;
|
||||||
use MailPoet\Settings\SettingsRepository;
|
use MailPoet\Settings\SettingsRepository;
|
||||||
@ -72,6 +72,8 @@ class SendingQueueTest extends \MailPoetTest {
|
|||||||
private $newslettersRepository;
|
private $newslettersRepository;
|
||||||
/** @var SubscribersFinder */
|
/** @var SubscribersFinder */
|
||||||
private $subscribersFinder;
|
private $subscribersFinder;
|
||||||
|
/** @var SegmentsRepository */
|
||||||
|
private $segmentsRepository;
|
||||||
|
|
||||||
public function _before() {
|
public function _before() {
|
||||||
parent::_before();
|
parent::_before();
|
||||||
@ -120,8 +122,9 @@ class SendingQueueTest extends \MailPoetTest {
|
|||||||
$this->statsNotificationsWorker = Stub::makeEmpty(StatsNotificationsScheduler::class);
|
$this->statsNotificationsWorker = Stub::makeEmpty(StatsNotificationsScheduler::class);
|
||||||
$this->loggerFactory = LoggerFactory::getInstance();
|
$this->loggerFactory = LoggerFactory::getInstance();
|
||||||
$this->cronHelper = $this->diContainer->get(CronHelper::class);
|
$this->cronHelper = $this->diContainer->get(CronHelper::class);
|
||||||
|
$this->newslettersRepository = $this->diContainer->get(NewslettersRepository::class);
|
||||||
|
$this->segmentsRepository = $this->diContainer->get(SegmentsRepository::class);
|
||||||
$this->sendingQueueWorker = $this->getSendingQueueWorker(Stub::makeEmpty(NewslettersRepository::class, ['findOneById' => new NewsletterEntity()]));
|
$this->sendingQueueWorker = $this->getSendingQueueWorker(Stub::makeEmpty(NewslettersRepository::class, ['findOneById' => new NewsletterEntity()]));
|
||||||
$this->newslettersRepository = ContainerWrapper::getInstance()->get(NewslettersRepository::class);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private function getDirectUnsubscribeURL() {
|
private function getDirectUnsubscribeURL() {
|
||||||
@ -165,7 +168,8 @@ class SendingQueueTest extends \MailPoetTest {
|
|||||||
$this->loggerFactory,
|
$this->loggerFactory,
|
||||||
Stub::makeEmpty(NewslettersRepository::class),
|
Stub::makeEmpty(NewslettersRepository::class),
|
||||||
$this->cronHelper,
|
$this->cronHelper,
|
||||||
$this->subscribersFinder
|
$this->subscribersFinder,
|
||||||
|
$this->segmentsRepository
|
||||||
);
|
);
|
||||||
try {
|
try {
|
||||||
$sendingQueueWorker->process();
|
$sendingQueueWorker->process();
|
||||||
@ -188,6 +192,7 @@ class SendingQueueTest extends \MailPoetTest {
|
|||||||
Stub::makeEmpty(NewslettersRepository::class),
|
Stub::makeEmpty(NewslettersRepository::class),
|
||||||
$this->cronHelper,
|
$this->cronHelper,
|
||||||
$this->subscribersFinder,
|
$this->subscribersFinder,
|
||||||
|
$this->segmentsRepository,
|
||||||
Stub::make(
|
Stub::make(
|
||||||
new MailerTask(),
|
new MailerTask(),
|
||||||
[
|
[
|
||||||
@ -227,6 +232,7 @@ class SendingQueueTest extends \MailPoetTest {
|
|||||||
Stub::makeEmpty(NewslettersRepository::class),
|
Stub::makeEmpty(NewslettersRepository::class),
|
||||||
$this->cronHelper,
|
$this->cronHelper,
|
||||||
$this->subscribersFinder,
|
$this->subscribersFinder,
|
||||||
|
$this->segmentsRepository,
|
||||||
Stub::make(
|
Stub::make(
|
||||||
new MailerTask(),
|
new MailerTask(),
|
||||||
[
|
[
|
||||||
@ -264,7 +270,8 @@ class SendingQueueTest extends \MailPoetTest {
|
|||||||
$this->loggerFactory,
|
$this->loggerFactory,
|
||||||
Stub::makeEmpty(NewslettersRepository::class),
|
Stub::makeEmpty(NewslettersRepository::class),
|
||||||
$this->cronHelper,
|
$this->cronHelper,
|
||||||
$this->subscribersFinder
|
$this->subscribersFinder,
|
||||||
|
$this->segmentsRepository
|
||||||
);
|
);
|
||||||
$sendingQueueWorker->process();
|
$sendingQueueWorker->process();
|
||||||
}
|
}
|
||||||
@ -743,6 +750,7 @@ class SendingQueueTest extends \MailPoetTest {
|
|||||||
Stub::makeEmpty(NewslettersRepository::class),
|
Stub::makeEmpty(NewslettersRepository::class),
|
||||||
$this->cronHelper,
|
$this->cronHelper,
|
||||||
$this->subscribersFinder,
|
$this->subscribersFinder,
|
||||||
|
$this->segmentsRepository,
|
||||||
Stub::make(
|
Stub::make(
|
||||||
new MailerTask(),
|
new MailerTask(),
|
||||||
[
|
[
|
||||||
@ -846,6 +854,46 @@ class SendingQueueTest extends \MailPoetTest {
|
|||||||
expect($refetchedTask->scheduledAt)->equals($inOneHour);
|
expect($refetchedTask->scheduledAt)->equals($inOneHour);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public function testItPauseSendingTaskThatHasTrashedSegment() {
|
||||||
|
$newsletter = $this->createNewsletter(NewsletterEntity::TYPE_STANDARD, 'Subject', NewsletterEntity::STATUS_SENDING);
|
||||||
|
$queue = $this->createQueueWithTaskAndSegment($newsletter, null, ['html' => 'Hello', 'text' => 'Hello']);
|
||||||
|
$segment = $this->createSegment('Segment test', SegmentEntity::TYPE_DEFAULT);
|
||||||
|
$segment->setDeletedAt(new \DateTime());
|
||||||
|
$this->entityManager->flush();
|
||||||
|
$this->addSegmentToNewsletter($newsletter, $segment);
|
||||||
|
|
||||||
|
$sendingQueueWorker = $this->getSendingQueueWorker();
|
||||||
|
$sendingQueueWorker->process();
|
||||||
|
|
||||||
|
$task = $queue->getTask();
|
||||||
|
assert($task instanceof ScheduledTaskEntity);
|
||||||
|
$this->entityManager->refresh($task);
|
||||||
|
$this->entityManager->refresh($newsletter);
|
||||||
|
expect($task->getStatus())->equals(ScheduledTaskEntity::STATUS_PAUSED);
|
||||||
|
expect($newsletter->getStatus())->equals(NewsletterEntity::STATUS_SENDING);
|
||||||
|
}
|
||||||
|
|
||||||
|
public function testItPauseSendingTaskThatHasDeletedSegment() {
|
||||||
|
$newsletter = $this->createNewsletter(NewsletterEntity::TYPE_STANDARD, 'Subject', NewsletterEntity::STATUS_SENDING);
|
||||||
|
$queue = $this->createQueueWithTaskAndSegment($newsletter, null, ['html' => 'Hello', 'text' => 'Hello']);
|
||||||
|
$segment = $this->createSegment('Segment test', SegmentEntity::TYPE_DEFAULT);
|
||||||
|
$this->addSegmentToNewsletter($newsletter, $segment);
|
||||||
|
$this->entityManager->createQueryBuilder()->delete(SegmentEntity::class, 's')
|
||||||
|
->where('s.id = :id')
|
||||||
|
->setParameter('id', $segment->getId())
|
||||||
|
->getQuery()
|
||||||
|
->execute();
|
||||||
|
$sendingQueueWorker = $this->getSendingQueueWorker();
|
||||||
|
$sendingQueueWorker->process();
|
||||||
|
|
||||||
|
$task = $queue->getTask();
|
||||||
|
assert($task instanceof ScheduledTaskEntity);
|
||||||
|
$this->entityManager->refresh($task);
|
||||||
|
$this->entityManager->refresh($newsletter);
|
||||||
|
expect($task->getStatus())->equals(ScheduledTaskEntity::STATUS_PAUSED);
|
||||||
|
expect($newsletter->getStatus())->equals(NewsletterEntity::STATUS_SENDING);
|
||||||
|
}
|
||||||
|
|
||||||
public function _after() {
|
public function _after() {
|
||||||
$this->truncateEntity(SubscriberEntity::class);
|
$this->truncateEntity(SubscriberEntity::class);
|
||||||
$this->truncateEntity(SubscriberSegmentEntity::class);
|
$this->truncateEntity(SubscriberSegmentEntity::class);
|
||||||
@ -861,6 +909,50 @@ class SendingQueueTest extends \MailPoetTest {
|
|||||||
$this->diContainer->get(SettingsRepository::class)->truncate();
|
$this->diContainer->get(SettingsRepository::class)->truncate();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private function createNewsletter(string $type, $subject, string $status = NewsletterEntity::STATUS_DRAFT): NewsletterEntity {
|
||||||
|
$newsletter = new NewsletterEntity();
|
||||||
|
$newsletter->setType($type);
|
||||||
|
$newsletter->setSubject($subject);
|
||||||
|
$newsletter->setBody(Fixtures::get('newsletter_body_template'));
|
||||||
|
$newsletter->setStatus($status);
|
||||||
|
$this->entityManager->persist($newsletter);
|
||||||
|
$this->entityManager->flush();
|
||||||
|
return $newsletter;
|
||||||
|
}
|
||||||
|
|
||||||
|
private function createSegment(string $name, string $type): SegmentEntity {
|
||||||
|
$segment = new SegmentEntity($name, $type, 'Description');
|
||||||
|
$this->entityManager->persist($segment);
|
||||||
|
$this->entityManager->flush();
|
||||||
|
return $segment;
|
||||||
|
}
|
||||||
|
|
||||||
|
private function addSegmentToNewsletter(NewsletterEntity $newsletter, SegmentEntity $segment) {
|
||||||
|
$newsletterSegment = new NewsletterSegmentEntity($newsletter, $segment);
|
||||||
|
$newsletter->getNewsletterSegments()->add($newsletterSegment);
|
||||||
|
$this->entityManager->persist($newsletterSegment);
|
||||||
|
$this->entityManager->flush();
|
||||||
|
}
|
||||||
|
|
||||||
|
private function createQueueWithTaskAndSegment(NewsletterEntity $newsletter, $status = null, $body = null): SendingQueueEntity {
|
||||||
|
$task = new ScheduledTaskEntity();
|
||||||
|
$task->setType(SendingTask::TASK_TYPE);
|
||||||
|
$task->setStatus($status);
|
||||||
|
$this->entityManager->persist($task);
|
||||||
|
|
||||||
|
$queue = new SendingQueueEntity();
|
||||||
|
$queue->setNewsletter($newsletter);
|
||||||
|
$queue->setTask($task);
|
||||||
|
if ($body) {
|
||||||
|
$queue->setNewsletterRenderedBody($body);
|
||||||
|
}
|
||||||
|
$this->entityManager->persist($queue);
|
||||||
|
$newsletter->getQueues()->add($queue);
|
||||||
|
|
||||||
|
$this->entityManager->flush();
|
||||||
|
return $queue;
|
||||||
|
}
|
||||||
|
|
||||||
private function getSendingQueueWorker($newsletterRepositoryMock = null, $mailerMock = null) {
|
private function getSendingQueueWorker($newsletterRepositoryMock = null, $mailerMock = null) {
|
||||||
return new SendingQueueWorker(
|
return new SendingQueueWorker(
|
||||||
$this->sendingErrorHandler,
|
$this->sendingErrorHandler,
|
||||||
@ -869,6 +961,7 @@ class SendingQueueTest extends \MailPoetTest {
|
|||||||
$newsletterRepositoryMock ?: $this->newslettersRepository,
|
$newsletterRepositoryMock ?: $this->newslettersRepository,
|
||||||
$this->cronHelper,
|
$this->cronHelper,
|
||||||
$this->subscribersFinder,
|
$this->subscribersFinder,
|
||||||
|
$this->segmentsRepository,
|
||||||
$mailerMock
|
$mailerMock
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user