From ff9b2a1adb47a9401846c38f73812ff33d91a396 Mon Sep 17 00:00:00 2001 From: Jan Jakes Date: Tue, 24 Oct 2023 14:32:16 +0200 Subject: [PATCH] Pass Doctrine entity into SendingQueue::processSending(), replace simple usages [MAILPOET-5682] --- .../Workers/SendingQueue/SendingQueue.php | 70 +++++++++---------- 1 file changed, 34 insertions(+), 36 deletions(-) diff --git a/mailpoet/lib/Cron/Workers/SendingQueue/SendingQueue.php b/mailpoet/lib/Cron/Workers/SendingQueue/SendingQueue.php index e61428d7e5..aa215a039e 100644 --- a/mailpoet/lib/Cron/Workers/SendingQueue/SendingQueue.php +++ b/mailpoet/lib/Cron/Workers/SendingQueue/SendingQueue.php @@ -133,12 +133,6 @@ class SendingQueue { continue; } - $legacyTask = ScheduledTask::findOne($task->getId()); - $legacyQueue = $legacyTask ? SendingTask::createFromScheduledTask($legacyTask) : null; - if (!$legacyQueue) { - continue; - } - if ($task->getInProgress()) { if ($this->isTimeout($task)) { $this->stopProgress($task); @@ -152,7 +146,7 @@ class SendingQueue { try { $this->scheduledTasksRepository->touchAllByIds([$task->getId()]); - $this->processSending($legacyQueue, (int)$timer); + $this->processSending($task, (int)$timer); } catch (\Exception $e) { $this->stopProgress($task); throw $e; @@ -162,24 +156,30 @@ class SendingQueue { } } - private function processSending(SendingTask $queue, int $timer): void { + private function processSending(ScheduledTaskEntity $task, int $timer): void { + $legacyTask = ScheduledTask::findOne($task->getId()); + $legacyQueue = $legacyTask ? SendingTask::createFromScheduledTask($legacyTask) : null; + if (!$legacyQueue) { + return; + } + $this->loggerFactory->getLogger(LoggerFactory::TOPIC_NEWSLETTERS)->info( 'sending queue processing', - ['task_id' => $queue->taskId] + ['task_id' => $task->getId()] ); - $this->deleteTaskIfNewsletterDoesNotExist($queue); + $this->deleteTaskIfNewsletterDoesNotExist($legacyQueue); - $newsletterEntity = $this->newsletterTask->getNewsletterFromQueue($queue); + $newsletterEntity = $this->newsletterTask->getNewsletterFromQueue($legacyQueue); if (!$newsletterEntity) { return; } // pre-process newsletter (render, replace shortcodes/links, etc.) - $newsletterEntity = $this->newsletterTask->preProcessNewsletter($newsletterEntity, $queue); + $newsletterEntity = $this->newsletterTask->preProcessNewsletter($newsletterEntity, $legacyQueue); if (!$newsletterEntity) { - $this->deleteTask($queue); + $this->deleteTask($legacyQueue); return; } @@ -211,32 +211,29 @@ class SendingQueue { if ($newsletterSegmentsIds && !$this->checkDeletedSegments($segmentIdsToCheck)) { $this->loggerFactory->getLogger(LoggerFactory::TOPIC_NEWSLETTERS)->info( 'pause task in sending queue due deleted or trashed segment', - ['task_id' => $queue->taskId] + ['task_id' => $task->getId()] ); - $queue->status = ScheduledTaskEntity::STATUS_PAUSED; - $queue->save(); + $legacyQueue->status = ScheduledTaskEntity::STATUS_PAUSED; + $legacyQueue->save(); $this->wp->setTransient(self::EMAIL_WITH_INVALID_SEGMENT_OPTION, $newsletter->subject); return; } // get subscribers - $subscriberBatches = new BatchIterator($queue->taskId, $this->getBatchSize()); + $subscriberBatches = new BatchIterator($task->getId(), $this->getBatchSize()); if ($subscriberBatches->count() === 0) { $this->loggerFactory->getLogger(LoggerFactory::TOPIC_NEWSLETTERS)->info( 'no subscribers to process', - ['task_id' => $queue->taskId] + ['task_id' => $task->getId()] ); - $task = $queue->getSendingQueueEntity()->getTask(); - if ($task) { - $this->scheduledTasksRepository->invalidateTask($task); - } + $this->scheduledTasksRepository->invalidateTask($task); return; } /** @var int[] $subscribersToProcessIds - it's required for PHPStan */ foreach ($subscriberBatches as $subscribersToProcessIds) { $this->loggerFactory->getLogger(LoggerFactory::TOPIC_NEWSLETTERS)->info( 'subscriber batch processing', - ['newsletter_id' => $newsletter->id, 'task_id' => $queue->taskId, 'subscriber_batch_count' => count($subscribersToProcessIds)] + ['newsletter_id' => $newsletter->id, 'task_id' => $task->getId(), 'subscriber_batch_count' => count($subscribersToProcessIds)] ); if (!empty($newsletterSegmentsIds[0])) { // Check that subscribers are in segments @@ -245,10 +242,10 @@ class SendingQueue { } catch (InvalidStateException $exception) { $this->loggerFactory->getLogger(LoggerFactory::TOPIC_NEWSLETTERS)->info( 'paused task in sending queue due to problem finding subscribers: ' . $exception->getMessage(), - ['task_id' => $queue->taskId] + ['task_id' => $task->getId()] ); - $queue->status = ScheduledTaskEntity::STATUS_PAUSED; - $queue->save(); + $legacyQueue->status = ScheduledTaskEntity::STATUS_PAUSED; + $legacyQueue->save(); return; } $foundSubscribers = empty($foundSubscribersIds) ? [] : SubscriberModel::whereIn('id', $foundSubscribersIds) @@ -272,8 +269,9 @@ class SendingQueue { $subscribersToProcessIds, $foundSubscribersIds ); - $queue->removeSubscribers($subscribersToRemove); - if (!$queue->countToProcess) { + + $legacyQueue->removeSubscribers($subscribersToRemove); + if (!$legacyQueue->countToProcess) { $this->newsletterTask->markNewsletterAsSent($newsletterEntity); continue; } @@ -284,15 +282,15 @@ class SendingQueue { } $this->loggerFactory->getLogger(LoggerFactory::TOPIC_NEWSLETTERS)->info( 'before queue chunk processing', - ['newsletter_id' => $newsletter->id, 'task_id' => $queue->taskId, 'found_subscribers_count' => count($foundSubscribers)] + ['newsletter_id' => $newsletter->id, 'task_id' => $task->getId(), 'found_subscribers_count' => count($foundSubscribers)] ); // reschedule bounce task to run sooner, if needed $this->reScheduleBounceTask(); if ($newsletterEntity->getStatus() !== NewsletterEntity::STATUS_CORRUPT) { - $queue = $this->processQueue( - $queue, + $legacyQueue = $this->processQueue( + $legacyQueue, $_newsletter, $foundSubscribers, $timer @@ -307,22 +305,22 @@ class SendingQueue { } $this->loggerFactory->getLogger(LoggerFactory::TOPIC_NEWSLETTERS)->info( 'after queue chunk processing', - ['newsletter_id' => $newsletter->id, 'task_id' => $queue->taskId] + ['newsletter_id' => $newsletter->id, 'task_id' => $task->getId()] ); - if ($queue->status === ScheduledTaskEntity::STATUS_COMPLETED) { + if ($legacyQueue->status === ScheduledTaskEntity::STATUS_COMPLETED) { $this->loggerFactory->getLogger(LoggerFactory::TOPIC_NEWSLETTERS)->info( 'completed newsletter sending', - ['newsletter_id' => $newsletter->id, 'task_id' => $queue->taskId] + ['newsletter_id' => $newsletter->id, 'task_id' => $task->getId()] ); $this->newsletterTask->markNewsletterAsSent($newsletterEntity); $this->statsNotificationsScheduler->schedule($newsletterEntity); } $this->enforceSendingAndExecutionLimits($timer); } else { - $this->sendingQueuesRepository->pause($queue->getSendingQueueEntity()); + $this->sendingQueuesRepository->pause($legacyQueue->getSendingQueueEntity()); $this->loggerFactory->getLogger(LoggerFactory::TOPIC_NEWSLETTERS)->error( 'Can\'t send corrupt newsletter', - ['newsletter_id' => $newsletter->id, 'task_id' => $queue->taskId] + ['newsletter_id' => $newsletter->id, 'task_id' => $task->getId()] ); } }