Replace Paris models with Doctrine entities in BatchIterator
[MAILPOET-4357]
This commit is contained in:
@@ -7,6 +7,7 @@ use MailPoet\Entities\ScheduledTaskEntity;
|
||||
use MailPoet\Entities\ScheduledTaskSubscriberEntity;
|
||||
use MailPoet\Entities\SubscriberEntity;
|
||||
use MailPoet\InvalidStateException;
|
||||
use MailPoetVendor\Doctrine\ORM\QueryBuilder;
|
||||
|
||||
/**
|
||||
* @extends Repository<ScheduledTaskSubscriberEntity>
|
||||
@@ -53,4 +54,45 @@ class ScheduledTaskSubscribersRepository extends Repository {
|
||||
$this->flush();
|
||||
return $taskSubscriber;
|
||||
}
|
||||
|
||||
public function countSubscriberIdsBatchForTask(int $taskId, int $lastProcessedSubscriberId): int {
|
||||
$queryBuilder = $this->entityManager
|
||||
->createQueryBuilder()
|
||||
->select('count(sts.subscriber)');
|
||||
$queryBuilder = $this->prepareSubscriberIdsBatchForTaskQuery($queryBuilder, $taskId, $lastProcessedSubscriberId);
|
||||
$countSubscribers = $queryBuilder
|
||||
->getQuery()
|
||||
->getSingleScalarResult();
|
||||
|
||||
if (is_numeric($countSubscribers)) {
|
||||
return (int)$countSubscribers;
|
||||
} else {
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
public function getSubscriberIdsBatchForTask(int $taskId, int $lastProcessedSubscriberId, int $limit): array {
|
||||
$queryBuilder = $this->entityManager
|
||||
->createQueryBuilder()
|
||||
->select('IDENTITY(sts.subscriber) AS subscriber_id');
|
||||
$queryBuilder = $this->prepareSubscriberIdsBatchForTaskQuery($queryBuilder, $taskId, $lastProcessedSubscriberId);
|
||||
$subscribersIds = $queryBuilder
|
||||
->orderBy('sts.subscriber', 'asc')
|
||||
->setMaxResults($limit)
|
||||
->getQuery()
|
||||
->getSingleColumnResult();
|
||||
|
||||
return $subscribersIds;
|
||||
}
|
||||
|
||||
private function prepareSubscriberIdsBatchForTaskQuery(QueryBuilder $queryBuilder, int $taskId, int $lastProcessedSubscriberId): QueryBuilder {
|
||||
return $queryBuilder
|
||||
->from(ScheduledTaskSubscriberEntity::class, 'sts')
|
||||
->andWhere('sts.task = :taskId')
|
||||
->andWhere('sts.subscriber > :lastProcessedSubscriberId')
|
||||
->andWhere('sts.processed = :status')
|
||||
->setParameter('taskId', $taskId)
|
||||
->setParameter('lastProcessedSubscriberId', $lastProcessedSubscriberId)
|
||||
->setParameter('status', ScheduledTaskSubscriberEntity::STATUS_UNPROCESSED);
|
||||
}
|
||||
}
|
||||
|
@@ -2,7 +2,8 @@
|
||||
|
||||
namespace MailPoet\Tasks\Subscribers;
|
||||
|
||||
use MailPoet\Models\ScheduledTaskSubscriber;
|
||||
use MailPoet\DI\ContainerWrapper;
|
||||
use MailPoet\Newsletter\Sending\ScheduledTaskSubscribersRepository;
|
||||
|
||||
/**
|
||||
* @implements \Iterator<null, array>
|
||||
@@ -13,6 +14,9 @@ class BatchIterator implements \Iterator, \Countable {
|
||||
private $lastProcessedId = 0;
|
||||
private $batchLastId;
|
||||
|
||||
/** @var ScheduledTaskSubscribersRepository */
|
||||
private $scheduledTaskSubscribersRepository;
|
||||
|
||||
public function __construct(
|
||||
$taskId,
|
||||
$batchSize
|
||||
@@ -24,6 +28,7 @@ class BatchIterator implements \Iterator, \Countable {
|
||||
}
|
||||
$this->taskId = (int)$taskId;
|
||||
$this->batchSize = (int)$batchSize;
|
||||
$this->scheduledTaskSubscribersRepository = ContainerWrapper::getInstance()->get(ScheduledTaskSubscribersRepository::class);
|
||||
}
|
||||
|
||||
public function rewind(): void {
|
||||
@@ -35,11 +40,7 @@ class BatchIterator implements \Iterator, \Countable {
|
||||
*/
|
||||
#[\ReturnTypeWillChange]
|
||||
public function current() {
|
||||
$subscribers = $this->getSubscribers()
|
||||
->orderByAsc('subscriber_id')
|
||||
->limit($this->batchSize)
|
||||
->findArray();
|
||||
$subscribers = array_column($subscribers, 'subscriber_id');
|
||||
$subscribers = $this->scheduledTaskSubscribersRepository->getSubscriberIdsBatchForTask($this->taskId, $this->lastProcessedId, $this->batchSize);
|
||||
$this->batchLastId = end($subscribers);
|
||||
return $subscribers;
|
||||
}
|
||||
@@ -61,13 +62,6 @@ class BatchIterator implements \Iterator, \Countable {
|
||||
}
|
||||
|
||||
public function count(): int {
|
||||
return $this->getSubscribers()->count();
|
||||
}
|
||||
|
||||
private function getSubscribers() {
|
||||
return ScheduledTaskSubscriber::select('subscriber_id')
|
||||
->where('task_id', $this->taskId)
|
||||
->whereGt('subscriber_id', $this->lastProcessedId)
|
||||
->where('processed', ScheduledTaskSubscriber::STATUS_UNPROCESSED);
|
||||
return $this->scheduledTaskSubscribersRepository->countSubscriberIdsBatchForTask($this->taskId, $this->lastProcessedId);
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user