settings = $settings;
    $this->bridge = $bridge;
    parent::__construct();
    $this->subscribersRepository = $subscribersRepository;
    $this->scheduledTasksRepository = $scheduledTasksRepository;
    $this->sendingQueuesRepository = $sendingQueuesRepository;
    $this->statisticsBouncesRepository = $statisticsBouncesRepository;
  }

  /**
   * Lazily creates the API client from the 'mailpoet_api_key' entry of the
   * mailer configuration setting. Does nothing when a client is already set.
   */
  public function init() {
    if (!$this->api) {
      $this->api = new API($this->settings->get(Mailer::MAILER_CONFIG_SETTING_NAME)['mailpoet_api_key']);
    }
  }

  /**
   * Reports whether this worker should run, as decided by
   * Bridge::isMailpoetSendingServiceEnabled().
   */
  public function checkProcessingRequirements() {
    return $this->bridge->isMailpoetSendingServiceEnabled();
  }

  /**
   * Prepares the subscribers of the given task via BounceTask::prepareSubscribers().
   *
   * @param ScheduledTask $task
   * @param mixed $timer Not used by this method.
   * @return bool False when the task has no unprocessed subscribers (its
   *   ScheduledTaskSubscriber rows are deleted in that case), true otherwise.
   */
  public function prepareTaskStrategy(ScheduledTask $task, $timer) {
    BounceTask::prepareSubscribers($task);

    if (!ScheduledTaskSubscriber::getUnprocessedCount($task->id)) {
      ScheduledTaskSubscriber::where('task_id', $task->id)->deleteMany();
      return false;
    }
    return true;
  }

  /**
   * Processes the task's subscribers in batches of self::BATCH_SIZE ids.
   *
   * For every batch: enforces the cron execution limit, loads the e-mails
   * returned by getUndeletedSubscribersEmailsByIds(), runs the bounce check
   * on them and marks the batch ids as processed.
   *
   * @param ScheduledTask $task
   * @param mixed $timer Passed through to CronHelper::enforceExecutionLimit().
   * @return bool Always true; when there are no batches the task's
   *   ScheduledTaskSubscriber rows are deleted first.
   */
  public function processTaskStrategy(ScheduledTask $task, $timer) {
    $subscriberBatches = new BatchIterator($task->id, self::BATCH_SIZE);

    if (count($subscriberBatches) === 0) {
      ScheduledTaskSubscriber::where('task_id', $task->id)->deleteMany();
      return true; // mark completed
    }

    $taskSubscribers = new TaskSubscribers($task);

    foreach ($subscriberBatches as $subscribersToProcessIds) {
      // abort if execution limit is reached
      $this->cronHelper->enforceExecutionLimit($timer);

      $subscriberEmails = $this->subscribersRepository->getUndeletedSubscribersEmailsByIds($subscribersToProcessIds);
      $subscriberEmails = array_column($subscriberEmails, 'email');

      $this->processEmails($task, $subscriberEmails);

      $taskSubscribers->updateProcessedSubscribers($subscribersToProcessIds);
    }

    return true;
  }

  /**
   * Sends the given e-mail addresses to the API's bounce check and processes
   * the response.
   *
   * @param ScheduledTask $task Task the check belongs to.
   * @param string[] $subscriberEmails Addresses to check.
   */
  public function processEmails($task, array $subscriberEmails) {
    // A batch can end up empty (e.g. all of its subscribers were filtered out);
    // there is nothing to check then, so skip the API call and the follow-up lookups.
    if (!$subscriberEmails) {
      return;
    }
    $checkedEmails = $this->api->checkBounces($subscriberEmails);
    $this->processApiResponse($task, (array)$checkedEmails);
  }

  /**
   * Marks subscribers reported as hard-bounced as bounced and records bounce
   * statistics for them, then flushes the subscribers repository.
   *
   * Entries without both an 'address' and a 'bounce' key, entries whose
   * 'bounce' value is not self::BOUNCED_HARD, and addresses that match no
   * subscriber are skipped.
   *
   * @param ScheduledTask $task Task the response belongs to.
   * @param array $checkedEmails Entries shaped like ['address' => ..., 'bounce' => ...].
   */
  public function processApiResponse($task, array $checkedEmails) {
    // Resolve the task entity and its predecessor once per response instead of
    // repeating the lookup for every bounced subscriber.
    $taskEntity = $this->findTaskEntity($task);
    $previousTask = $this->findPreviousTask($taskEntity);

    foreach ($checkedEmails as $email) {
      if (!isset($email['address'], $email['bounce'])) {
        continue;
      }
      if ($email['bounce'] !== self::BOUNCED_HARD) {
        continue;
      }
      $subscriber = $this->subscribersRepository->findOneBy(['email' => $email['address']]);
      if (!$subscriber instanceof SubscriberEntity) {
        continue;
      }
      $subscriber->setStatus(SubscriberEntity::STATUS_BOUNCED);
      // The status is updated even when the task entity is missing;
      // only the statistics need it.
      if ($taskEntity instanceof ScheduledTaskEntity) {
        $this->saveBouncedStatistics($subscriber, $taskEntity, $previousTask);
      }
    }
    $this->subscribersRepository->flush();
  }

  /**
   * Computes the next run: start of the day after the current time reported
   * by $this->wp, plus a random offset of 0-5 hours, 0-59 minutes and
   * 0-59 seconds.
   */
  public function getNextRunDate() {
    $date = Carbon::createFromTimestamp($this->wp->currentTime('timestamp'));
    return $date->startOfDay()
      ->addDay()
      ->addHours(rand(0, 5))
      ->addMinutes(rand(0, 59))
      ->addSeconds(rand(0, 59));
  }

  /**
   * Looks up the entity for the given task by its id.
   *
   * @return ScheduledTaskEntity|null Null when the repository returns no entity.
   */
  private function findTaskEntity(ScheduledTask $task): ?ScheduledTaskEntity {
    $taskEntity = $this->scheduledTasksRepository->findOneById($task->id);
    return $taskEntity instanceof ScheduledTaskEntity ? $taskEntity : null;
  }

  /**
   * Returns whatever the repository reports as the task preceding the given
   * one, or null when no task entity is available.
   */
  private function findPreviousTask(?ScheduledTaskEntity $taskEntity): ?ScheduledTaskEntity {
    if (!$taskEntity instanceof ScheduledTaskEntity) {
      return null;
    }
    return $this->scheduledTasksRepository->findPreviousTask($taskEntity);
  }

  /**
   * Persists one StatisticsBounceEntity per sending queue returned by
   * findAllForSubscriberSentBetween() for the subscriber, bounded by the
   * current task's scheduled date and, when a previous task exists, that
   * task's scheduled date. Queues without a newsletter are skipped.
   * Nothing is flushed here; the caller flushes.
   */
  private function saveBouncedStatistics(SubscriberEntity $subscriber, ScheduledTaskEntity $taskEntity, ?ScheduledTaskEntity $previousTask): void {
    $dateFrom = null;
    if ($previousTask instanceof ScheduledTaskEntity) {
      $dateFrom = $previousTask->getScheduledAt();
    }
    $queues = $this->sendingQueuesRepository->findAllForSubscriberSentBetween($subscriber, $taskEntity->getScheduledAt(), $dateFrom);
    foreach ($queues as $queue) {
      $newsletter = $queue->getNewsletter();
      if ($newsletter instanceof NewsletterEntity) {
        $statistics = new StatisticsBounceEntity($newsletter, $queue, $subscriber);
        $this->statisticsBouncesRepository->persist($statistics);
      }
    }
  }
}