From 6c54c5900ca7df6051b8e6a3a70b59e3669fbba8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Lys=C3=BD?= Date: Wed, 19 May 2021 10:18:08 +0200 Subject: [PATCH] Add check if sending queue is in progress [MAILPOET-3608] --- .../Workers/SendingQueue/SendingQueue.php | 235 ++++++++++-------- 1 file changed, 138 insertions(+), 97 deletions(-) diff --git a/lib/Cron/Workers/SendingQueue/SendingQueue.php b/lib/Cron/Workers/SendingQueue/SendingQueue.php index 77b8984b52..55f4664bcb 100644 --- a/lib/Cron/Workers/SendingQueue/SendingQueue.php +++ b/lib/Cron/Workers/SendingQueue/SendingQueue.php @@ -94,117 +94,135 @@ class SendingQueue { $this->enforceSendingAndExecutionLimits($timer); foreach (self::getRunningQueues() as $queue) { if (!$queue instanceof SendingTask) continue; - ScheduledTaskModel::touchAllByIds([$queue->taskId]); + $task = $queue->task(); + if (!$task instanceof ScheduledTask) continue; + if ($this->isInProgress($task)) continue; + + + $this->startProgress($task); + + try { + ScheduledTaskModel::touchAllByIds([$queue->taskId]); + $this->processSending($queue, $timer); + } catch (\Exception $e) { + $this->stopProgress($task); + throw $e; + } + + $this->stopProgress($task); + } + } + + private function processSending(SendingTask $queue, int $timer): void { + $this->loggerFactory->getLogger(LoggerFactory::TOPIC_NEWSLETTERS)->addInfo( + 'sending queue processing', + ['task_id' => $queue->taskId] + ); + $newsletter = $this->newsletterTask->getNewsletterFromQueue($queue); + if (!$newsletter) { + return; + } + // pre-process newsletter (render, replace shortcodes/links, etc.) + $newsletter = $this->newsletterTask->preProcessNewsletter($newsletter, $queue); + if (!$newsletter) { $this->loggerFactory->getLogger(LoggerFactory::TOPIC_NEWSLETTERS)->addInfo( - 'sending queue processing', + 'delete task in sending queue', ['task_id' => $queue->taskId] ); - $newsletter = $this->newsletterTask->getNewsletterFromQueue($queue); - if (!$newsletter) { - continue; - } - // pre-process newsletter (render, replace shortcodes/links, etc.) - $newsletter = $this->newsletterTask->preProcessNewsletter($newsletter, $queue); - if (!$newsletter) { - $this->loggerFactory->getLogger(LoggerFactory::TOPIC_NEWSLETTERS)->addInfo( - 'delete task in sending queue', - ['task_id' => $queue->taskId] - ); - $queue->delete(); - continue; - } - // clone the original object to be used for processing - $_newsletter = (object)$newsletter->asArray(); - $options = $newsletter->options()->findMany(); - if (!empty($options)) { - $options = array_column($options, 'value', 'name'); - } - $_newsletter->options = $options; - // configure mailer - $this->mailerTask->configureMailer($newsletter); - // get newsletter segments - $newsletterSegmentsIds = $this->newsletterTask->getNewsletterSegments($newsletter); - // Pause task in case some of related segments was deleted or trashed - if ($newsletterSegmentsIds && !$this->checkDeletedSegments($newsletterSegmentsIds)) { - $this->loggerFactory->getLogger(LoggerFactory::TOPIC_NEWSLETTERS)->addInfo( - 'pause task in sending queue due deleted or trashed segment', - ['task_id' => $queue->taskId] - ); - $queue->status = ScheduledTaskEntity::STATUS_PAUSED; - $queue->save(); - $this->wp->setTransient(self::EMAIL_WITH_INVALID_SEGMENT_OPTION, $newsletter->subject); - continue; - } + $queue->delete(); + return; + } + // clone the original object to be used for processing + $_newsletter = (object)$newsletter->asArray(); + $options = $newsletter->options()->findMany(); + if (!empty($options)) { + $options = array_column($options, 'value', 'name'); + } + $_newsletter->options = $options; + // configure mailer + $this->mailerTask->configureMailer($newsletter); + // get newsletter segments + $newsletterSegmentsIds = $this->newsletterTask->getNewsletterSegments($newsletter); + // Pause task in case some of related segments was deleted or trashed + if ($newsletterSegmentsIds && !$this->checkDeletedSegments($newsletterSegmentsIds)) { + $this->loggerFactory->getLogger(LoggerFactory::TOPIC_NEWSLETTERS)->addInfo( + 'pause task in sending queue due deleted or trashed segment', + ['task_id' => $queue->taskId] + ); + $queue->status = ScheduledTaskEntity::STATUS_PAUSED; + $queue->save(); + $this->wp->setTransient(self::EMAIL_WITH_INVALID_SEGMENT_OPTION, $newsletter->subject); + return; + } - // get subscribers - $subscriberBatches = new BatchIterator($queue->taskId, $this->getBatchSize()); - foreach ($subscriberBatches as $subscribersToProcessIds) { - $this->loggerFactory->getLogger(LoggerFactory::TOPIC_NEWSLETTERS)->addInfo( - 'subscriber batch processing', - ['newsletter_id' => $newsletter->id, 'task_id' => $queue->taskId, 'subscriber_batch_count' => count($subscribersToProcessIds)] + // get subscribers + $subscriberBatches = new BatchIterator($queue->taskId, $this->getBatchSize()); + foreach ($subscriberBatches as $subscribersToProcessIds) { + $this->loggerFactory->getLogger(LoggerFactory::TOPIC_NEWSLETTERS)->addInfo( + 'subscriber batch processing', + ['newsletter_id' => $newsletter->id, 'task_id' => $queue->taskId, 'subscriber_batch_count' => count($subscribersToProcessIds)] + ); + if (!empty($newsletterSegmentsIds[0])) { + // Check that subscribers are in segments + $foundSubscribersIds = $this->subscribersFinder->findSubscribersInSegments($subscribersToProcessIds, $newsletterSegmentsIds); + $foundSubscribers = SubscriberModel::whereIn('id', $subscribersToProcessIds) + ->whereNull('deleted_at') + ->findMany(); + } else { + // No segments = Welcome emails or some Automatic emails. + // Welcome emails or some Automatic emails use segments only for scheduling and store them as a newsletter option + $foundSubscribers = SubscriberModel::whereIn('id', $subscribersToProcessIds) + ->where('status', SubscriberModel::STATUS_SUBSCRIBED) + ->whereNull('deleted_at') + ->findMany(); + $foundSubscribersIds = SubscriberModel::extractSubscribersIds($foundSubscribers); + } + // if some subscribers weren't found, remove them from the processing list + if (count($foundSubscribersIds) !== count($subscribersToProcessIds)) { + $subscribersToRemove = array_diff( + $subscribersToProcessIds, + $foundSubscribersIds ); - if (!empty($newsletterSegmentsIds[0])) { - // Check that subscribers are in segments - $foundSubscribersIds = $this->subscribersFinder->findSubscribersInSegments($subscribersToProcessIds, $newsletterSegmentsIds); - $foundSubscribers = SubscriberModel::whereIn('id', $subscribersToProcessIds) - ->whereNull('deleted_at') - ->findMany(); - } else { - // No segments = Welcome emails or some Automatic emails. - // Welcome emails or some Automatic emails use segments only for scheduling and store them as a newsletter option - $foundSubscribers = SubscriberModel::whereIn('id', $subscribersToProcessIds) - ->where('status', SubscriberModel::STATUS_SUBSCRIBED) - ->whereNull('deleted_at') - ->findMany(); - $foundSubscribersIds = SubscriberModel::extractSubscribersIds($foundSubscribers); + $queue->removeSubscribers($subscribersToRemove); + if (!$queue->countToProcess) { + $this->newsletterTask->markNewsletterAsSent($newsletter, $queue); + continue; } - // if some subscribers weren't found, remove them from the processing list - if (count($foundSubscribersIds) !== count($subscribersToProcessIds)) { - $subscribersToRemove = array_diff( - $subscribersToProcessIds, - $foundSubscribersIds - ); - $queue->removeSubscribers($subscribersToRemove); - if (!$queue->countToProcess) { - $this->newsletterTask->markNewsletterAsSent($newsletter, $queue); - continue; - } - // if there aren't any subscribers to process in batch (e.g. all unsubscribed or were deleted) continue with next batch - if (count($foundSubscribersIds) === 0) { - continue; - } + // if there aren't any subscribers to process in batch (e.g. all unsubscribed or were deleted) continue with next batch + if (count($foundSubscribersIds) === 0) { + continue; } - $this->loggerFactory->getLogger(LoggerFactory::TOPIC_NEWSLETTERS)->addInfo( - 'before queue chunk processing', - ['newsletter_id' => $newsletter->id, 'task_id' => $queue->taskId, 'found_subscribers_count' => count($foundSubscribers)] - ); + } + $this->loggerFactory->getLogger(LoggerFactory::TOPIC_NEWSLETTERS)->addInfo( + 'before queue chunk processing', + ['newsletter_id' => $newsletter->id, 'task_id' => $queue->taskId, 'found_subscribers_count' => count($foundSubscribers)] + ); - // reschedule bounce task to run sooner, if needed - $this->reScheduleBounceTask(); + // reschedule bounce task to run sooner, if needed + $this->reScheduleBounceTask(); - $queue = $this->processQueue( - $queue, - $_newsletter, - $foundSubscribers, - $timer - ); + $queue = $this->processQueue( + $queue, + $_newsletter, + $foundSubscribers, + $timer + ); + $this->loggerFactory->getLogger(LoggerFactory::TOPIC_NEWSLETTERS)->addInfo( + 'after queue chunk processing', + ['newsletter_id' => $newsletter->id, 'task_id' => $queue->taskId] + ); + if ($queue->status === ScheduledTaskModel::STATUS_COMPLETED) { $this->loggerFactory->getLogger(LoggerFactory::TOPIC_NEWSLETTERS)->addInfo( - 'after queue chunk processing', + 'completed newsletter sending', ['newsletter_id' => $newsletter->id, 'task_id' => $queue->taskId] ); - if ($queue->status === ScheduledTaskModel::STATUS_COMPLETED) { - $this->loggerFactory->getLogger(LoggerFactory::TOPIC_NEWSLETTERS)->addInfo( - 'completed newsletter sending', - ['newsletter_id' => $newsletter->id, 'task_id' => $queue->taskId] - ); - $this->newsletterTask->markNewsletterAsSent($newsletter, $queue); - $newsletter = $this->newslettersRepository->findOneById($newsletter->id); - assert($newsletter instanceof NewsletterEntity); - $this->statsNotificationsScheduler->schedule($newsletter); - } - $this->enforceSendingAndExecutionLimits($timer); + $this->newsletterTask->markNewsletterAsSent($newsletter, $queue); + $newsletter = $this->newslettersRepository->findOneById($newsletter->id); + assert($newsletter instanceof NewsletterEntity); + $this->statsNotificationsScheduler->schedule($newsletter); } + $this->enforceSendingAndExecutionLimits($timer); } } @@ -393,4 +411,27 @@ class SendingQueue { } } } + + private function isInProgress(ScheduledTask $task): bool { + if (!empty($task->inProgress)) { + // Do not run multiple instances of the task + return true; + } + return false; + } + + private function startProgress(ScheduledTask $task): void { + $task->inProgress = true; + $task->save(); + } + + private function stopProgress(ScheduledTask $task): void { + $task->inProgress = false; + $task->save(); + } + + public function getExecutionLimit(): int { + // Convert seconds to minutes + return (int)round($this->cronHelper->getDaemonExecutionLimit() * 2 / 60); + } }