diff --git a/mailpoet/lib/Cron/Workers/Scheduler.php b/mailpoet/lib/Cron/Workers/Scheduler.php index 58f9b77ecd..0fb8130b13 100644 --- a/mailpoet/lib/Cron/Workers/Scheduler.php +++ b/mailpoet/lib/Cron/Workers/Scheduler.php @@ -152,7 +152,7 @@ class Scheduler { } elseif ($newsletter->getType() === NewsletterEntity::TYPE_WELCOME) { $this->processWelcomeNewsletter($newsletter, $task); } elseif ($newsletter->getType() === NewsletterEntity::TYPE_NOTIFICATION) { - $this->processPostNotificationNewsletter($newsletter, $legacyQueue); + $this->processPostNotificationNewsletter($newsletter, $task); } elseif ($newsletter->getType() === NewsletterEntity::TYPE_STANDARD) { $this->processScheduledStandardNewsletter($newsletter, $legacyQueue); } elseif ($newsletter->getType() === NewsletterEntity::TYPE_AUTOMATIC) { @@ -198,10 +198,10 @@ class Scheduler { return true; } - public function processPostNotificationNewsletter(NewsletterEntity $newsletter, SendingTask $queue) { + public function processPostNotificationNewsletter(NewsletterEntity $newsletter, ScheduledTaskEntity $task) { $this->loggerFactory->getLogger(LoggerFactory::TOPIC_POST_NOTIFICATIONS)->info( 'process post notification in scheduler', - ['newsletter_id' => $newsletter->getId(), 'task_id' => $queue->taskId] + ['newsletter_id' => $newsletter->getId(), 'task_id' => $task->getId()] ); // ensure that segments exist @@ -209,24 +209,21 @@ class Scheduler { if (empty($segments)) { $this->loggerFactory->getLogger(LoggerFactory::TOPIC_POST_NOTIFICATIONS)->info( 'post notification no segments', - ['newsletter_id' => $newsletter->getId(), 'task_id' => $queue->taskId] + ['newsletter_id' => $newsletter->getId(), 'task_id' => $task->getId()] ); - return $this->deleteQueueOrUpdateNextRunDate($queue, $newsletter); + $this->deleteQueueOrUpdateNextRunDate($task, $newsletter); + return false; } // ensure that subscribers are in segments - $taskModel = $queue->task(); - $taskEntity = $this->scheduledTasksRepository->findOneById($taskModel->id); - if ($taskEntity instanceof ScheduledTaskEntity) { - $subscribersCount = $this->subscribersFinder->addSubscribersToTaskFromSegments($taskEntity, $segments, $newsletter->getFilterSegmentId()); - } - + $subscribersCount = $this->subscribersFinder->addSubscribersToTaskFromSegments($task, $segments, $newsletter->getFilterSegmentId()); if (empty($subscribersCount)) { $this->loggerFactory->getLogger(LoggerFactory::TOPIC_POST_NOTIFICATIONS)->info( 'post notification no subscribers', - ['newsletter_id' => $newsletter->getId(), 'task_id' => $queue->taskId, 'segment_ids' => $segments] + ['newsletter_id' => $newsletter->getId(), 'task_id' => $task->getId(), 'segment_ids' => $segments] ); - return $this->deleteQueueOrUpdateNextRunDate($queue, $newsletter); + $this->deleteQueueOrUpdateNextRunDate($task, $newsletter); + return false; } // create a duplicate newsletter that acts as a history record @@ -235,25 +232,28 @@ class Scheduler { } catch (\Exception $exception) { $this->loggerFactory->getLogger(LoggerFactory::TOPIC_POST_NOTIFICATIONS)->error( 'creating post notification history failed', - ['newsletter_id' => $newsletter->getId(), 'task_id' => $queue->taskId, 'error' => $exception->getMessage()] + ['newsletter_id' => $newsletter->getId(), 'task_id' => $task->getId(), 'error' => $exception->getMessage()] ); return false; } // queue newsletter for delivery - $queue->newsletterId = (int)$notificationHistory->getId(); - $queue->updateCount(); - $queue->status = null; - $queue->save(); - $this->updateScheduledTaskEntity($queue); - - // Because there is mixed usage of the old and new model, we want to be sure about the correct state - $this->newslettersRepository->refresh($notificationHistory); - $queue->getSendingQueueEntity(); // This call refreshes sending queue entity + $queue = $task->getSendingQueue(); + if (!$queue) { + $this->loggerFactory->getLogger(LoggerFactory::TOPIC_POST_NOTIFICATIONS)->error( + 'post notification no queue', + ['newsletter_id' => $newsletter->getId(), 'task_id' => $task->getId()] + ); + return false; + } + $queue->setNewsletter($notificationHistory); + $this->sendingQueuesRepository->updateCounts($queue); + $task->setStatus(null); + $this->scheduledTasksRepository->flush(); $this->loggerFactory->getLogger(LoggerFactory::TOPIC_POST_NOTIFICATIONS)->info( 'post notification set status to sending', - ['newsletter_id' => $newsletter->getId(), 'task_id' => $queue->taskId] + ['newsletter_id' => $newsletter->getId(), 'task_id' => $task->getId()] ); return true; } @@ -418,21 +418,17 @@ class Scheduler { return true; } - public function deleteQueueOrUpdateNextRunDate($queue, NewsletterEntity $newsletter) { + public function deleteQueueOrUpdateNextRunDate(ScheduledTaskEntity $task, NewsletterEntity $newsletter) { if ($newsletter->getOptionValue('intervalType') === PostNotificationScheduler::INTERVAL_IMMEDIATELY) { - $queue->delete(); - $this->updateScheduledTaskEntity($queue, true); - return; + $this->deleteByTask($task); } else { - $nextRunDate = $this->scheduler->getNextRunDate($newsletter->getOptionValue('schedule')); + $nextRunDate = $this->scheduler->getNextRunDateTime($newsletter->getOptionValue('schedule')); if (!$nextRunDate) { - $queue->delete(); - $this->updateScheduledTaskEntity($queue, true); + $this->deleteByTask($task); return; } - $queue->scheduledAt = $nextRunDate; - $queue->save(); - $this->updateScheduledTaskEntity($queue); + $task->setScheduledAt($nextRunDate); + $this->scheduledTasksRepository->flush(); } } diff --git a/mailpoet/lib/Newsletter/Sending/SendingQueuesRepository.php b/mailpoet/lib/Newsletter/Sending/SendingQueuesRepository.php index b94092fba7..2ec5933109 100644 --- a/mailpoet/lib/Newsletter/Sending/SendingQueuesRepository.php +++ b/mailpoet/lib/Newsletter/Sending/SendingQueuesRepository.php @@ -205,4 +205,20 @@ class SendingQueuesRepository extends Repository { $queue->setMeta($meta); $this->flush(); } + + public function updateCounts(SendingQueueEntity $queue, ?int $count = null): void { + if ($count) { + // increment/decrement counts based on known subscriber count, don't exceed the bounds + $queue->setCountProcessed(min($queue->getCountProcessed() + $count, $queue->getCountTotal())); + $queue->setCountToProcess(max($queue->getCountToProcess() - $count, 0)); + } else { + // query DB to update counts, slower but more accurate, to be used if count isn't known + $task = $queue->getTask(); + $processed = $task ? $this->scheduledTaskSubscribersRepository->countProcessed($task) : 0; + $unprocessed = $task ? $this->scheduledTaskSubscribersRepository->countUnprocessed($task) : 0; + $queue->setCountProcessed($processed); + $queue->setCountToProcess($unprocessed); + $queue->setCountTotal($processed + $unprocessed); + } + } }