Rewrite Scheduler::processPostNotificationNewsletter() using Doctrine

[MAILPOET-4375]
This commit is contained in:
Jan Jakes
2023-10-24 11:49:59 +02:00
committed by Aschepikov
parent 6517c697d6
commit 1ef88a5fb5
2 changed files with 46 additions and 34 deletions

View File

@@ -152,7 +152,7 @@ class Scheduler {
} elseif ($newsletter->getType() === NewsletterEntity::TYPE_WELCOME) { } elseif ($newsletter->getType() === NewsletterEntity::TYPE_WELCOME) {
$this->processWelcomeNewsletter($newsletter, $task); $this->processWelcomeNewsletter($newsletter, $task);
} elseif ($newsletter->getType() === NewsletterEntity::TYPE_NOTIFICATION) { } elseif ($newsletter->getType() === NewsletterEntity::TYPE_NOTIFICATION) {
$this->processPostNotificationNewsletter($newsletter, $legacyQueue); $this->processPostNotificationNewsletter($newsletter, $task);
} elseif ($newsletter->getType() === NewsletterEntity::TYPE_STANDARD) { } elseif ($newsletter->getType() === NewsletterEntity::TYPE_STANDARD) {
$this->processScheduledStandardNewsletter($newsletter, $legacyQueue); $this->processScheduledStandardNewsletter($newsletter, $legacyQueue);
} elseif ($newsletter->getType() === NewsletterEntity::TYPE_AUTOMATIC) { } elseif ($newsletter->getType() === NewsletterEntity::TYPE_AUTOMATIC) {
@@ -198,10 +198,10 @@ class Scheduler {
return true; return true;
} }
public function processPostNotificationNewsletter(NewsletterEntity $newsletter, SendingTask $queue) { public function processPostNotificationNewsletter(NewsletterEntity $newsletter, ScheduledTaskEntity $task) {
$this->loggerFactory->getLogger(LoggerFactory::TOPIC_POST_NOTIFICATIONS)->info( $this->loggerFactory->getLogger(LoggerFactory::TOPIC_POST_NOTIFICATIONS)->info(
'process post notification in scheduler', 'process post notification in scheduler',
['newsletter_id' => $newsletter->getId(), 'task_id' => $queue->taskId] ['newsletter_id' => $newsletter->getId(), 'task_id' => $task->getId()]
); );
// ensure that segments exist // ensure that segments exist
@@ -209,24 +209,21 @@ class Scheduler {
if (empty($segments)) { if (empty($segments)) {
$this->loggerFactory->getLogger(LoggerFactory::TOPIC_POST_NOTIFICATIONS)->info( $this->loggerFactory->getLogger(LoggerFactory::TOPIC_POST_NOTIFICATIONS)->info(
'post notification no segments', 'post notification no segments',
['newsletter_id' => $newsletter->getId(), 'task_id' => $queue->taskId] ['newsletter_id' => $newsletter->getId(), 'task_id' => $task->getId()]
); );
return $this->deleteQueueOrUpdateNextRunDate($queue, $newsletter); $this->deleteQueueOrUpdateNextRunDate($task, $newsletter);
return false;
} }
// ensure that subscribers are in segments // ensure that subscribers are in segments
$taskModel = $queue->task(); $subscribersCount = $this->subscribersFinder->addSubscribersToTaskFromSegments($task, $segments, $newsletter->getFilterSegmentId());
$taskEntity = $this->scheduledTasksRepository->findOneById($taskModel->id);
if ($taskEntity instanceof ScheduledTaskEntity) {
$subscribersCount = $this->subscribersFinder->addSubscribersToTaskFromSegments($taskEntity, $segments, $newsletter->getFilterSegmentId());
}
if (empty($subscribersCount)) { if (empty($subscribersCount)) {
$this->loggerFactory->getLogger(LoggerFactory::TOPIC_POST_NOTIFICATIONS)->info( $this->loggerFactory->getLogger(LoggerFactory::TOPIC_POST_NOTIFICATIONS)->info(
'post notification no subscribers', 'post notification no subscribers',
['newsletter_id' => $newsletter->getId(), 'task_id' => $queue->taskId, 'segment_ids' => $segments] ['newsletter_id' => $newsletter->getId(), 'task_id' => $task->getId(), 'segment_ids' => $segments]
); );
return $this->deleteQueueOrUpdateNextRunDate($queue, $newsletter); $this->deleteQueueOrUpdateNextRunDate($task, $newsletter);
return false;
} }
// create a duplicate newsletter that acts as a history record // create a duplicate newsletter that acts as a history record
@@ -235,25 +232,28 @@ class Scheduler {
} catch (\Exception $exception) { } catch (\Exception $exception) {
$this->loggerFactory->getLogger(LoggerFactory::TOPIC_POST_NOTIFICATIONS)->error( $this->loggerFactory->getLogger(LoggerFactory::TOPIC_POST_NOTIFICATIONS)->error(
'creating post notification history failed', 'creating post notification history failed',
['newsletter_id' => $newsletter->getId(), 'task_id' => $queue->taskId, 'error' => $exception->getMessage()] ['newsletter_id' => $newsletter->getId(), 'task_id' => $task->getId(), 'error' => $exception->getMessage()]
); );
return false; return false;
} }
// queue newsletter for delivery // queue newsletter for delivery
$queue->newsletterId = (int)$notificationHistory->getId(); $queue = $task->getSendingQueue();
$queue->updateCount(); if (!$queue) {
$queue->status = null; $this->loggerFactory->getLogger(LoggerFactory::TOPIC_POST_NOTIFICATIONS)->error(
$queue->save(); 'post notification no queue',
$this->updateScheduledTaskEntity($queue); ['newsletter_id' => $newsletter->getId(), 'task_id' => $task->getId()]
);
// Because there is mixed usage of the old and new model, we want to be sure about the correct state return false;
$this->newslettersRepository->refresh($notificationHistory); }
$queue->getSendingQueueEntity(); // This call refreshes sending queue entity $queue->setNewsletter($notificationHistory);
$this->sendingQueuesRepository->updateCounts($queue);
$task->setStatus(null);
$this->scheduledTasksRepository->flush();
$this->loggerFactory->getLogger(LoggerFactory::TOPIC_POST_NOTIFICATIONS)->info( $this->loggerFactory->getLogger(LoggerFactory::TOPIC_POST_NOTIFICATIONS)->info(
'post notification set status to sending', 'post notification set status to sending',
['newsletter_id' => $newsletter->getId(), 'task_id' => $queue->taskId] ['newsletter_id' => $newsletter->getId(), 'task_id' => $task->getId()]
); );
return true; return true;
} }
@@ -418,21 +418,17 @@ class Scheduler {
return true; return true;
} }
public function deleteQueueOrUpdateNextRunDate($queue, NewsletterEntity $newsletter) { public function deleteQueueOrUpdateNextRunDate(ScheduledTaskEntity $task, NewsletterEntity $newsletter) {
if ($newsletter->getOptionValue('intervalType') === PostNotificationScheduler::INTERVAL_IMMEDIATELY) { if ($newsletter->getOptionValue('intervalType') === PostNotificationScheduler::INTERVAL_IMMEDIATELY) {
$queue->delete(); $this->deleteByTask($task);
$this->updateScheduledTaskEntity($queue, true);
return;
} else { } else {
$nextRunDate = $this->scheduler->getNextRunDate($newsletter->getOptionValue('schedule')); $nextRunDate = $this->scheduler->getNextRunDateTime($newsletter->getOptionValue('schedule'));
if (!$nextRunDate) { if (!$nextRunDate) {
$queue->delete(); $this->deleteByTask($task);
$this->updateScheduledTaskEntity($queue, true);
return; return;
} }
$queue->scheduledAt = $nextRunDate; $task->setScheduledAt($nextRunDate);
$queue->save(); $this->scheduledTasksRepository->flush();
$this->updateScheduledTaskEntity($queue);
} }
} }

View File

@@ -205,4 +205,20 @@ class SendingQueuesRepository extends Repository {
$queue->setMeta($meta); $queue->setMeta($meta);
$this->flush(); $this->flush();
} }
public function updateCounts(SendingQueueEntity $queue, ?int $count = null): void {
if ($count) {
// increment/decrement counts based on known subscriber count, don't exceed the bounds
$queue->setCountProcessed(min($queue->getCountProcessed() + $count, $queue->getCountTotal()));
$queue->setCountToProcess(max($queue->getCountToProcess() - $count, 0));
} else {
// query DB to update counts, slower but more accurate, to be used if count isn't known
$task = $queue->getTask();
$processed = $task ? $this->scheduledTaskSubscribersRepository->countProcessed($task) : 0;
$unprocessed = $task ? $this->scheduledTaskSubscribersRepository->countUnprocessed($task) : 0;
$queue->setCountProcessed($processed);
$queue->setCountToProcess($unprocessed);
$queue->setCountTotal($processed + $unprocessed);
}
}
} }