diff --git a/mailpoet/lib/Newsletter/Sending/ScheduledTaskSubscribersRepository.php b/mailpoet/lib/Newsletter/Sending/ScheduledTaskSubscribersRepository.php index d995666808..99f04412e5 100644 --- a/mailpoet/lib/Newsletter/Sending/ScheduledTaskSubscribersRepository.php +++ b/mailpoet/lib/Newsletter/Sending/ScheduledTaskSubscribersRepository.php @@ -7,6 +7,7 @@ use MailPoet\Entities\ScheduledTaskEntity; use MailPoet\Entities\ScheduledTaskSubscriberEntity; use MailPoet\Entities\SubscriberEntity; use MailPoet\InvalidStateException; +use MailPoetVendor\Doctrine\DBAL\Connection; use MailPoetVendor\Doctrine\ORM\QueryBuilder; /** @@ -86,6 +87,33 @@ class ScheduledTaskSubscribersRepository extends Repository { ->execute(); } + /** + * @param int[] $subscriberIds + */ + public function updateProcessedSubscribers(ScheduledTaskEntity $task, array $subscriberIds): void { + if ($subscriberIds) { + $this->entityManager->createQueryBuilder() + ->update(ScheduledTaskSubscriberEntity::class, 'sts') + ->set('sts.processed', ScheduledTaskSubscriberEntity::STATUS_PROCESSED) + ->where('sts.subscriber IN (:subscriberIds)') + ->andWhere('sts.task = :task') + ->setParameter('subscriberIds', $subscriberIds, Connection::PARAM_INT_ARRAY) + ->setParameter('task', $task) + ->getQuery() + ->execute(); + } + + $this->checkCompleted($task); + } + + private function checkCompleted(ScheduledTaskEntity $task): void { + $count = $this->countBy(['task' => $task, 'processed' => ScheduledTaskSubscriberEntity::STATUS_UNPROCESSED]); + if ($count === 0) { + $task->setStatus(ScheduledTaskEntity::STATUS_COMPLETED); + $this->entityManager->flush(); + } + } + private function getBaseSubscribersIdsBatchForTaskQuery(int $taskId, int $lastProcessedSubscriberId): QueryBuilder { return $this->entityManager ->createQueryBuilder()