Add method for update processed subscribers

[MAILPOET-4369]
This commit is contained in:
Jan Lysý
2023-02-21 12:38:43 +01:00
committed by David Remer
parent 6a1e734633
commit 20e435872f

View File

@ -7,6 +7,7 @@ use MailPoet\Entities\ScheduledTaskEntity;
use MailPoet\Entities\ScheduledTaskSubscriberEntity; use MailPoet\Entities\ScheduledTaskSubscriberEntity;
use MailPoet\Entities\SubscriberEntity; use MailPoet\Entities\SubscriberEntity;
use MailPoet\InvalidStateException; use MailPoet\InvalidStateException;
use MailPoetVendor\Doctrine\DBAL\Connection;
use MailPoetVendor\Doctrine\ORM\QueryBuilder; use MailPoetVendor\Doctrine\ORM\QueryBuilder;
/** /**
@ -86,6 +87,33 @@ class ScheduledTaskSubscribersRepository extends Repository {
->execute(); ->execute();
} }
/**
* @param int[] $subscriberIds
*/
public function updateProcessedSubscribers(ScheduledTaskEntity $task, array $subscriberIds): void {
if ($subscriberIds) {
$this->entityManager->createQueryBuilder()
->update(ScheduledTaskSubscriberEntity::class, 'sts')
->set('sts.processed', ScheduledTaskSubscriberEntity::STATUS_PROCESSED)
->where('sts.subscriber IN (:subscriberIds)')
->andWhere('sts.task = :task')
->setParameter('subscriberIds', $subscriberIds, Connection::PARAM_INT_ARRAY)
->setParameter('task', $task)
->getQuery()
->execute();
}
$this->checkCompleted($task);
}
private function checkCompleted(ScheduledTaskEntity $task): void {
$count = $this->countBy(['task' => $task, 'processed' => ScheduledTaskSubscriberEntity::STATUS_UNPROCESSED]);
if ($count === 0) {
$task->setStatus(ScheduledTaskEntity::STATUS_COMPLETED);
$this->entityManager->flush();
}
}
private function getBaseSubscribersIdsBatchForTaskQuery(int $taskId, int $lastProcessedSubscriberId): QueryBuilder { private function getBaseSubscribersIdsBatchForTaskQuery(int $taskId, int $lastProcessedSubscriberId): QueryBuilder {
return $this->entityManager return $this->entityManager
->createQueryBuilder() ->createQueryBuilder()