Replace old model usages by Doctrine
[MAILPOET-4303]
This commit is contained in:
@ -4,11 +4,10 @@ namespace MailPoet\Cron\Workers;
|
|||||||
|
|
||||||
use MailPoet\Entities\NewsletterEntity;
|
use MailPoet\Entities\NewsletterEntity;
|
||||||
use MailPoet\Entities\ScheduledTaskEntity;
|
use MailPoet\Entities\ScheduledTaskEntity;
|
||||||
|
use MailPoet\Entities\ScheduledTaskSubscriberEntity;
|
||||||
use MailPoet\Entities\StatisticsBounceEntity;
|
use MailPoet\Entities\StatisticsBounceEntity;
|
||||||
use MailPoet\Entities\SubscriberEntity;
|
use MailPoet\Entities\SubscriberEntity;
|
||||||
use MailPoet\Mailer\Mailer;
|
use MailPoet\Mailer\Mailer;
|
||||||
use MailPoet\Models\ScheduledTask;
|
|
||||||
use MailPoet\Models\ScheduledTaskSubscriber;
|
|
||||||
use MailPoet\Newsletter\Sending\ScheduledTaskSubscribersRepository;
|
use MailPoet\Newsletter\Sending\ScheduledTaskSubscribersRepository;
|
||||||
use MailPoet\Newsletter\Sending\SendingQueuesRepository;
|
use MailPoet\Newsletter\Sending\SendingQueuesRepository;
|
||||||
use MailPoet\Services\Bridge;
|
use MailPoet\Services\Bridge;
|
||||||
@ -16,7 +15,6 @@ use MailPoet\Services\Bridge\API;
|
|||||||
use MailPoet\Settings\SettingsController;
|
use MailPoet\Settings\SettingsController;
|
||||||
use MailPoet\Statistics\StatisticsBouncesRepository;
|
use MailPoet\Statistics\StatisticsBouncesRepository;
|
||||||
use MailPoet\Subscribers\SubscribersRepository;
|
use MailPoet\Subscribers\SubscribersRepository;
|
||||||
use MailPoet\Tasks\Subscribers as TaskSubscribers;
|
|
||||||
use MailPoet\Tasks\Subscribers\BatchIterator;
|
use MailPoet\Tasks\Subscribers\BatchIterator;
|
||||||
use MailPoetVendor\Carbon\Carbon;
|
use MailPoetVendor\Carbon\Carbon;
|
||||||
|
|
||||||
@ -78,8 +76,8 @@ class Bounce extends SimpleWorker {
|
|||||||
public function prepareTaskStrategy(ScheduledTaskEntity $task, $timer) {
|
public function prepareTaskStrategy(ScheduledTaskEntity $task, $timer) {
|
||||||
$this->scheduledTaskSubscribersRepository->createSubscribersForBounceWorker($task);
|
$this->scheduledTaskSubscribersRepository->createSubscribersForBounceWorker($task);
|
||||||
|
|
||||||
if (!ScheduledTaskSubscriber::getUnprocessedCount($task->getId())) {
|
if (!$this->scheduledTaskSubscribersRepository->countBy(['task' => $task, 'processed' => ScheduledTaskSubscriberEntity::STATUS_UNPROCESSED])) {
|
||||||
ScheduledTaskSubscriber::where('task_id', $task->getId())->deleteMany();
|
$this->scheduledTaskSubscribersRepository->deleteByScheduledTask($task);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
return true;
|
return true;
|
||||||
@ -89,27 +87,21 @@ class Bounce extends SimpleWorker {
|
|||||||
$subscriberBatches = new BatchIterator($task->getId(), self::BATCH_SIZE);
|
$subscriberBatches = new BatchIterator($task->getId(), self::BATCH_SIZE);
|
||||||
|
|
||||||
if (count($subscriberBatches) === 0) {
|
if (count($subscriberBatches) === 0) {
|
||||||
ScheduledTaskSubscriber::where('task_id', $task->getId())->deleteMany();
|
$this->scheduledTaskSubscribersRepository->deleteByScheduledTask($task);
|
||||||
return true; // mark completed
|
return true; // mark completed
|
||||||
}
|
}
|
||||||
|
|
||||||
$parisTask = ScheduledTask::findOne($task->getId());
|
/** @var int[] $subscribersToProcessIds - it's required for PHPStan */
|
||||||
|
foreach ($subscriberBatches as $subscribersToProcessIds) {
|
||||||
|
// abort if execution limit is reached
|
||||||
|
$this->cronHelper->enforceExecutionLimit($timer);
|
||||||
|
|
||||||
if ($parisTask instanceof ScheduledTask) {
|
$subscriberEmails = $this->subscribersRepository->getUndeletedSubscribersEmailsByIds($subscribersToProcessIds);
|
||||||
$taskSubscribers = new TaskSubscribers($parisTask);
|
$subscriberEmails = array_column($subscriberEmails, 'email');
|
||||||
|
|
||||||
/** @var int[] $subscribersToProcessIds - it's required for PHPStan */
|
$this->processEmails($task, $subscriberEmails);
|
||||||
foreach ($subscriberBatches as $subscribersToProcessIds) {
|
|
||||||
// abort if execution limit is reached
|
|
||||||
$this->cronHelper->enforceExecutionLimit($timer);
|
|
||||||
|
|
||||||
$subscriberEmails = $this->subscribersRepository->getUndeletedSubscribersEmailsByIds($subscribersToProcessIds);
|
$this->scheduledTaskSubscribersRepository->updateProcessedSubscribers($task, $subscribersToProcessIds);
|
||||||
$subscriberEmails = array_column($subscriberEmails, 'email');
|
|
||||||
|
|
||||||
$this->processEmails($task, $subscriberEmails);
|
|
||||||
|
|
||||||
$taskSubscribers->updateProcessedSubscribers($subscribersToProcessIds);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
|
@ -125,6 +125,15 @@ class ScheduledTaskSubscribersRepository extends Repository {
|
|||||||
$stmt->executeQuery();
|
$stmt->executeQuery();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public function deleteByScheduledTask(ScheduledTaskEntity $scheduledTask): void {
|
||||||
|
$this->entityManager->createQueryBuilder()
|
||||||
|
->delete(ScheduledTaskSubscriberEntity::class, 'sts')
|
||||||
|
->where('sts.task = :task')
|
||||||
|
->setParameter('task', $scheduledTask)
|
||||||
|
->getQuery()
|
||||||
|
->execute();
|
||||||
|
}
|
||||||
|
|
||||||
private function checkCompleted(ScheduledTaskEntity $task): void {
|
private function checkCompleted(ScheduledTaskEntity $task): void {
|
||||||
$count = $this->countBy(['task' => $task, 'processed' => ScheduledTaskSubscriberEntity::STATUS_UNPROCESSED]);
|
$count = $this->countBy(['task' => $task, 'processed' => ScheduledTaskSubscriberEntity::STATUS_UNPROCESSED]);
|
||||||
if ($count === 0) {
|
if ($count === 0) {
|
||||||
|
Reference in New Issue
Block a user