Add check if sending queue is in progress

[MAILPOET-3608]
This commit is contained in:
Jan Lysý
2021-05-19 10:18:08 +02:00
committed by Veljko V
parent 74dfdd1e4b
commit 6c54c5900c

View File

@ -94,117 +94,135 @@ class SendingQueue {
$this->enforceSendingAndExecutionLimits($timer); $this->enforceSendingAndExecutionLimits($timer);
foreach (self::getRunningQueues() as $queue) { foreach (self::getRunningQueues() as $queue) {
if (!$queue instanceof SendingTask) continue; if (!$queue instanceof SendingTask) continue;
ScheduledTaskModel::touchAllByIds([$queue->taskId]);
$task = $queue->task();
if (!$task instanceof ScheduledTask) continue;
if ($this->isInProgress($task)) continue;
$this->startProgress($task);
try {
ScheduledTaskModel::touchAllByIds([$queue->taskId]);
$this->processSending($queue, $timer);
} catch (\Exception $e) {
$this->stopProgress($task);
throw $e;
}
$this->stopProgress($task);
}
}
private function processSending(SendingTask $queue, int $timer): void {
$this->loggerFactory->getLogger(LoggerFactory::TOPIC_NEWSLETTERS)->addInfo(
'sending queue processing',
['task_id' => $queue->taskId]
);
$newsletter = $this->newsletterTask->getNewsletterFromQueue($queue);
if (!$newsletter) {
return;
}
// pre-process newsletter (render, replace shortcodes/links, etc.)
$newsletter = $this->newsletterTask->preProcessNewsletter($newsletter, $queue);
if (!$newsletter) {
$this->loggerFactory->getLogger(LoggerFactory::TOPIC_NEWSLETTERS)->addInfo( $this->loggerFactory->getLogger(LoggerFactory::TOPIC_NEWSLETTERS)->addInfo(
'sending queue processing', 'delete task in sending queue',
['task_id' => $queue->taskId] ['task_id' => $queue->taskId]
); );
$newsletter = $this->newsletterTask->getNewsletterFromQueue($queue); $queue->delete();
if (!$newsletter) { return;
continue; }
} // clone the original object to be used for processing
// pre-process newsletter (render, replace shortcodes/links, etc.) $_newsletter = (object)$newsletter->asArray();
$newsletter = $this->newsletterTask->preProcessNewsletter($newsletter, $queue); $options = $newsletter->options()->findMany();
if (!$newsletter) { if (!empty($options)) {
$this->loggerFactory->getLogger(LoggerFactory::TOPIC_NEWSLETTERS)->addInfo( $options = array_column($options, 'value', 'name');
'delete task in sending queue', }
['task_id' => $queue->taskId] $_newsletter->options = $options;
); // configure mailer
$queue->delete(); $this->mailerTask->configureMailer($newsletter);
continue; // get newsletter segments
} $newsletterSegmentsIds = $this->newsletterTask->getNewsletterSegments($newsletter);
// clone the original object to be used for processing // Pause task in case some of related segments was deleted or trashed
$_newsletter = (object)$newsletter->asArray(); if ($newsletterSegmentsIds && !$this->checkDeletedSegments($newsletterSegmentsIds)) {
$options = $newsletter->options()->findMany(); $this->loggerFactory->getLogger(LoggerFactory::TOPIC_NEWSLETTERS)->addInfo(
if (!empty($options)) { 'pause task in sending queue due deleted or trashed segment',
$options = array_column($options, 'value', 'name'); ['task_id' => $queue->taskId]
} );
$_newsletter->options = $options; $queue->status = ScheduledTaskEntity::STATUS_PAUSED;
// configure mailer $queue->save();
$this->mailerTask->configureMailer($newsletter); $this->wp->setTransient(self::EMAIL_WITH_INVALID_SEGMENT_OPTION, $newsletter->subject);
// get newsletter segments return;
$newsletterSegmentsIds = $this->newsletterTask->getNewsletterSegments($newsletter); }
// Pause task in case some of related segments was deleted or trashed
if ($newsletterSegmentsIds && !$this->checkDeletedSegments($newsletterSegmentsIds)) {
$this->loggerFactory->getLogger(LoggerFactory::TOPIC_NEWSLETTERS)->addInfo(
'pause task in sending queue due deleted or trashed segment',
['task_id' => $queue->taskId]
);
$queue->status = ScheduledTaskEntity::STATUS_PAUSED;
$queue->save();
$this->wp->setTransient(self::EMAIL_WITH_INVALID_SEGMENT_OPTION, $newsletter->subject);
continue;
}
// get subscribers // get subscribers
$subscriberBatches = new BatchIterator($queue->taskId, $this->getBatchSize()); $subscriberBatches = new BatchIterator($queue->taskId, $this->getBatchSize());
foreach ($subscriberBatches as $subscribersToProcessIds) { foreach ($subscriberBatches as $subscribersToProcessIds) {
$this->loggerFactory->getLogger(LoggerFactory::TOPIC_NEWSLETTERS)->addInfo( $this->loggerFactory->getLogger(LoggerFactory::TOPIC_NEWSLETTERS)->addInfo(
'subscriber batch processing', 'subscriber batch processing',
['newsletter_id' => $newsletter->id, 'task_id' => $queue->taskId, 'subscriber_batch_count' => count($subscribersToProcessIds)] ['newsletter_id' => $newsletter->id, 'task_id' => $queue->taskId, 'subscriber_batch_count' => count($subscribersToProcessIds)]
);
if (!empty($newsletterSegmentsIds[0])) {
// Check that subscribers are in segments
$foundSubscribersIds = $this->subscribersFinder->findSubscribersInSegments($subscribersToProcessIds, $newsletterSegmentsIds);
$foundSubscribers = SubscriberModel::whereIn('id', $subscribersToProcessIds)
->whereNull('deleted_at')
->findMany();
} else {
// No segments = Welcome emails or some Automatic emails.
// Welcome emails or some Automatic emails use segments only for scheduling and store them as a newsletter option
$foundSubscribers = SubscriberModel::whereIn('id', $subscribersToProcessIds)
->where('status', SubscriberModel::STATUS_SUBSCRIBED)
->whereNull('deleted_at')
->findMany();
$foundSubscribersIds = SubscriberModel::extractSubscribersIds($foundSubscribers);
}
// if some subscribers weren't found, remove them from the processing list
if (count($foundSubscribersIds) !== count($subscribersToProcessIds)) {
$subscribersToRemove = array_diff(
$subscribersToProcessIds,
$foundSubscribersIds
); );
if (!empty($newsletterSegmentsIds[0])) { $queue->removeSubscribers($subscribersToRemove);
// Check that subscribers are in segments if (!$queue->countToProcess) {
$foundSubscribersIds = $this->subscribersFinder->findSubscribersInSegments($subscribersToProcessIds, $newsletterSegmentsIds); $this->newsletterTask->markNewsletterAsSent($newsletter, $queue);
$foundSubscribers = SubscriberModel::whereIn('id', $subscribersToProcessIds) continue;
->whereNull('deleted_at')
->findMany();
} else {
// No segments = Welcome emails or some Automatic emails.
// Welcome emails or some Automatic emails use segments only for scheduling and store them as a newsletter option
$foundSubscribers = SubscriberModel::whereIn('id', $subscribersToProcessIds)
->where('status', SubscriberModel::STATUS_SUBSCRIBED)
->whereNull('deleted_at')
->findMany();
$foundSubscribersIds = SubscriberModel::extractSubscribersIds($foundSubscribers);
} }
// if some subscribers weren't found, remove them from the processing list // if there aren't any subscribers to process in batch (e.g. all unsubscribed or were deleted) continue with next batch
if (count($foundSubscribersIds) !== count($subscribersToProcessIds)) { if (count($foundSubscribersIds) === 0) {
$subscribersToRemove = array_diff( continue;
$subscribersToProcessIds,
$foundSubscribersIds
);
$queue->removeSubscribers($subscribersToRemove);
if (!$queue->countToProcess) {
$this->newsletterTask->markNewsletterAsSent($newsletter, $queue);
continue;
}
// if there aren't any subscribers to process in batch (e.g. all unsubscribed or were deleted) continue with next batch
if (count($foundSubscribersIds) === 0) {
continue;
}
} }
$this->loggerFactory->getLogger(LoggerFactory::TOPIC_NEWSLETTERS)->addInfo( }
'before queue chunk processing', $this->loggerFactory->getLogger(LoggerFactory::TOPIC_NEWSLETTERS)->addInfo(
['newsletter_id' => $newsletter->id, 'task_id' => $queue->taskId, 'found_subscribers_count' => count($foundSubscribers)] 'before queue chunk processing',
); ['newsletter_id' => $newsletter->id, 'task_id' => $queue->taskId, 'found_subscribers_count' => count($foundSubscribers)]
);
// reschedule bounce task to run sooner, if needed // reschedule bounce task to run sooner, if needed
$this->reScheduleBounceTask(); $this->reScheduleBounceTask();
$queue = $this->processQueue( $queue = $this->processQueue(
$queue, $queue,
$_newsletter, $_newsletter,
$foundSubscribers, $foundSubscribers,
$timer $timer
); );
$this->loggerFactory->getLogger(LoggerFactory::TOPIC_NEWSLETTERS)->addInfo(
'after queue chunk processing',
['newsletter_id' => $newsletter->id, 'task_id' => $queue->taskId]
);
if ($queue->status === ScheduledTaskModel::STATUS_COMPLETED) {
$this->loggerFactory->getLogger(LoggerFactory::TOPIC_NEWSLETTERS)->addInfo( $this->loggerFactory->getLogger(LoggerFactory::TOPIC_NEWSLETTERS)->addInfo(
'after queue chunk processing', 'completed newsletter sending',
['newsletter_id' => $newsletter->id, 'task_id' => $queue->taskId] ['newsletter_id' => $newsletter->id, 'task_id' => $queue->taskId]
); );
if ($queue->status === ScheduledTaskModel::STATUS_COMPLETED) { $this->newsletterTask->markNewsletterAsSent($newsletter, $queue);
$this->loggerFactory->getLogger(LoggerFactory::TOPIC_NEWSLETTERS)->addInfo( $newsletter = $this->newslettersRepository->findOneById($newsletter->id);
'completed newsletter sending', assert($newsletter instanceof NewsletterEntity);
['newsletter_id' => $newsletter->id, 'task_id' => $queue->taskId] $this->statsNotificationsScheduler->schedule($newsletter);
);
$this->newsletterTask->markNewsletterAsSent($newsletter, $queue);
$newsletter = $this->newslettersRepository->findOneById($newsletter->id);
assert($newsletter instanceof NewsletterEntity);
$this->statsNotificationsScheduler->schedule($newsletter);
}
$this->enforceSendingAndExecutionLimits($timer);
} }
$this->enforceSendingAndExecutionLimits($timer);
} }
} }
@ -393,4 +411,27 @@ class SendingQueue {
} }
} }
} }
private function isInProgress(ScheduledTask $task): bool {
if (!empty($task->inProgress)) {
// Do not run multiple instances of the task
return true;
}
return false;
}
private function startProgress(ScheduledTask $task): void {
$task->inProgress = true;
$task->save();
}
private function stopProgress(ScheduledTask $task): void {
$task->inProgress = false;
$task->save();
}
public function getExecutionLimit(): int {
// Convert seconds to minutes
return (int)round($this->cronHelper->getDaemonExecutionLimit() * 2 / 60);
}
} }