Pass Doctrine entity into SendingQueue::processSending(), replace simple usages

[MAILPOET-5682]
This commit is contained in:
Jan Jakes
2023-10-24 14:32:16 +02:00
committed by Jan Jakeš
parent 6307f6c155
commit ff9b2a1adb

View File

@ -133,12 +133,6 @@ class SendingQueue {
continue; continue;
} }
$legacyTask = ScheduledTask::findOne($task->getId());
$legacyQueue = $legacyTask ? SendingTask::createFromScheduledTask($legacyTask) : null;
if (!$legacyQueue) {
continue;
}
if ($task->getInProgress()) { if ($task->getInProgress()) {
if ($this->isTimeout($task)) { if ($this->isTimeout($task)) {
$this->stopProgress($task); $this->stopProgress($task);
@ -152,7 +146,7 @@ class SendingQueue {
try { try {
$this->scheduledTasksRepository->touchAllByIds([$task->getId()]); $this->scheduledTasksRepository->touchAllByIds([$task->getId()]);
$this->processSending($legacyQueue, (int)$timer); $this->processSending($task, (int)$timer);
} catch (\Exception $e) { } catch (\Exception $e) {
$this->stopProgress($task); $this->stopProgress($task);
throw $e; throw $e;
@ -162,24 +156,30 @@ class SendingQueue {
} }
} }
private function processSending(SendingTask $queue, int $timer): void { private function processSending(ScheduledTaskEntity $task, int $timer): void {
$legacyTask = ScheduledTask::findOne($task->getId());
$legacyQueue = $legacyTask ? SendingTask::createFromScheduledTask($legacyTask) : null;
if (!$legacyQueue) {
return;
}
$this->loggerFactory->getLogger(LoggerFactory::TOPIC_NEWSLETTERS)->info( $this->loggerFactory->getLogger(LoggerFactory::TOPIC_NEWSLETTERS)->info(
'sending queue processing', 'sending queue processing',
['task_id' => $queue->taskId] ['task_id' => $task->getId()]
); );
$this->deleteTaskIfNewsletterDoesNotExist($queue); $this->deleteTaskIfNewsletterDoesNotExist($legacyQueue);
$newsletterEntity = $this->newsletterTask->getNewsletterFromQueue($queue); $newsletterEntity = $this->newsletterTask->getNewsletterFromQueue($legacyQueue);
if (!$newsletterEntity) { if (!$newsletterEntity) {
return; return;
} }
// pre-process newsletter (render, replace shortcodes/links, etc.) // pre-process newsletter (render, replace shortcodes/links, etc.)
$newsletterEntity = $this->newsletterTask->preProcessNewsletter($newsletterEntity, $queue); $newsletterEntity = $this->newsletterTask->preProcessNewsletter($newsletterEntity, $legacyQueue);
if (!$newsletterEntity) { if (!$newsletterEntity) {
$this->deleteTask($queue); $this->deleteTask($legacyQueue);
return; return;
} }
@ -211,32 +211,29 @@ class SendingQueue {
if ($newsletterSegmentsIds && !$this->checkDeletedSegments($segmentIdsToCheck)) { if ($newsletterSegmentsIds && !$this->checkDeletedSegments($segmentIdsToCheck)) {
$this->loggerFactory->getLogger(LoggerFactory::TOPIC_NEWSLETTERS)->info( $this->loggerFactory->getLogger(LoggerFactory::TOPIC_NEWSLETTERS)->info(
'pause task in sending queue due deleted or trashed segment', 'pause task in sending queue due deleted or trashed segment',
['task_id' => $queue->taskId] ['task_id' => $task->getId()]
); );
$queue->status = ScheduledTaskEntity::STATUS_PAUSED; $legacyQueue->status = ScheduledTaskEntity::STATUS_PAUSED;
$queue->save(); $legacyQueue->save();
$this->wp->setTransient(self::EMAIL_WITH_INVALID_SEGMENT_OPTION, $newsletter->subject); $this->wp->setTransient(self::EMAIL_WITH_INVALID_SEGMENT_OPTION, $newsletter->subject);
return; return;
} }
// get subscribers // get subscribers
$subscriberBatches = new BatchIterator($queue->taskId, $this->getBatchSize()); $subscriberBatches = new BatchIterator($task->getId(), $this->getBatchSize());
if ($subscriberBatches->count() === 0) { if ($subscriberBatches->count() === 0) {
$this->loggerFactory->getLogger(LoggerFactory::TOPIC_NEWSLETTERS)->info( $this->loggerFactory->getLogger(LoggerFactory::TOPIC_NEWSLETTERS)->info(
'no subscribers to process', 'no subscribers to process',
['task_id' => $queue->taskId] ['task_id' => $task->getId()]
); );
$task = $queue->getSendingQueueEntity()->getTask(); $this->scheduledTasksRepository->invalidateTask($task);
if ($task) {
$this->scheduledTasksRepository->invalidateTask($task);
}
return; return;
} }
/** @var int[] $subscribersToProcessIds - it's required for PHPStan */ /** @var int[] $subscribersToProcessIds - it's required for PHPStan */
foreach ($subscriberBatches as $subscribersToProcessIds) { foreach ($subscriberBatches as $subscribersToProcessIds) {
$this->loggerFactory->getLogger(LoggerFactory::TOPIC_NEWSLETTERS)->info( $this->loggerFactory->getLogger(LoggerFactory::TOPIC_NEWSLETTERS)->info(
'subscriber batch processing', 'subscriber batch processing',
['newsletter_id' => $newsletter->id, 'task_id' => $queue->taskId, 'subscriber_batch_count' => count($subscribersToProcessIds)] ['newsletter_id' => $newsletter->id, 'task_id' => $task->getId(), 'subscriber_batch_count' => count($subscribersToProcessIds)]
); );
if (!empty($newsletterSegmentsIds[0])) { if (!empty($newsletterSegmentsIds[0])) {
// Check that subscribers are in segments // Check that subscribers are in segments
@ -245,10 +242,10 @@ class SendingQueue {
} catch (InvalidStateException $exception) { } catch (InvalidStateException $exception) {
$this->loggerFactory->getLogger(LoggerFactory::TOPIC_NEWSLETTERS)->info( $this->loggerFactory->getLogger(LoggerFactory::TOPIC_NEWSLETTERS)->info(
'paused task in sending queue due to problem finding subscribers: ' . $exception->getMessage(), 'paused task in sending queue due to problem finding subscribers: ' . $exception->getMessage(),
['task_id' => $queue->taskId] ['task_id' => $task->getId()]
); );
$queue->status = ScheduledTaskEntity::STATUS_PAUSED; $legacyQueue->status = ScheduledTaskEntity::STATUS_PAUSED;
$queue->save(); $legacyQueue->save();
return; return;
} }
$foundSubscribers = empty($foundSubscribersIds) ? [] : SubscriberModel::whereIn('id', $foundSubscribersIds) $foundSubscribers = empty($foundSubscribersIds) ? [] : SubscriberModel::whereIn('id', $foundSubscribersIds)
@ -272,8 +269,9 @@ class SendingQueue {
$subscribersToProcessIds, $subscribersToProcessIds,
$foundSubscribersIds $foundSubscribersIds
); );
$queue->removeSubscribers($subscribersToRemove);
if (!$queue->countToProcess) { $legacyQueue->removeSubscribers($subscribersToRemove);
if (!$legacyQueue->countToProcess) {
$this->newsletterTask->markNewsletterAsSent($newsletterEntity); $this->newsletterTask->markNewsletterAsSent($newsletterEntity);
continue; continue;
} }
@ -284,15 +282,15 @@ class SendingQueue {
} }
$this->loggerFactory->getLogger(LoggerFactory::TOPIC_NEWSLETTERS)->info( $this->loggerFactory->getLogger(LoggerFactory::TOPIC_NEWSLETTERS)->info(
'before queue chunk processing', 'before queue chunk processing',
['newsletter_id' => $newsletter->id, 'task_id' => $queue->taskId, 'found_subscribers_count' => count($foundSubscribers)] ['newsletter_id' => $newsletter->id, 'task_id' => $task->getId(), 'found_subscribers_count' => count($foundSubscribers)]
); );
// reschedule bounce task to run sooner, if needed // reschedule bounce task to run sooner, if needed
$this->reScheduleBounceTask(); $this->reScheduleBounceTask();
if ($newsletterEntity->getStatus() !== NewsletterEntity::STATUS_CORRUPT) { if ($newsletterEntity->getStatus() !== NewsletterEntity::STATUS_CORRUPT) {
$queue = $this->processQueue( $legacyQueue = $this->processQueue(
$queue, $legacyQueue,
$_newsletter, $_newsletter,
$foundSubscribers, $foundSubscribers,
$timer $timer
@ -307,22 +305,22 @@ class SendingQueue {
} }
$this->loggerFactory->getLogger(LoggerFactory::TOPIC_NEWSLETTERS)->info( $this->loggerFactory->getLogger(LoggerFactory::TOPIC_NEWSLETTERS)->info(
'after queue chunk processing', 'after queue chunk processing',
['newsletter_id' => $newsletter->id, 'task_id' => $queue->taskId] ['newsletter_id' => $newsletter->id, 'task_id' => $task->getId()]
); );
if ($queue->status === ScheduledTaskEntity::STATUS_COMPLETED) { if ($legacyQueue->status === ScheduledTaskEntity::STATUS_COMPLETED) {
$this->loggerFactory->getLogger(LoggerFactory::TOPIC_NEWSLETTERS)->info( $this->loggerFactory->getLogger(LoggerFactory::TOPIC_NEWSLETTERS)->info(
'completed newsletter sending', 'completed newsletter sending',
['newsletter_id' => $newsletter->id, 'task_id' => $queue->taskId] ['newsletter_id' => $newsletter->id, 'task_id' => $task->getId()]
); );
$this->newsletterTask->markNewsletterAsSent($newsletterEntity); $this->newsletterTask->markNewsletterAsSent($newsletterEntity);
$this->statsNotificationsScheduler->schedule($newsletterEntity); $this->statsNotificationsScheduler->schedule($newsletterEntity);
} }
$this->enforceSendingAndExecutionLimits($timer); $this->enforceSendingAndExecutionLimits($timer);
} else { } else {
$this->sendingQueuesRepository->pause($queue->getSendingQueueEntity()); $this->sendingQueuesRepository->pause($legacyQueue->getSendingQueueEntity());
$this->loggerFactory->getLogger(LoggerFactory::TOPIC_NEWSLETTERS)->error( $this->loggerFactory->getLogger(LoggerFactory::TOPIC_NEWSLETTERS)->error(
'Can\'t send corrupt newsletter', 'Can\'t send corrupt newsletter',
['newsletter_id' => $newsletter->id, 'task_id' => $queue->taskId] ['newsletter_id' => $newsletter->id, 'task_id' => $task->getId()]
); );
} }
} }