errorHandler = $errorHandler; $this->statsNotificationsScheduler = $statsNotificationsScheduler; $this->mailerTask = ($mailerTask) ? $mailerTask : new MailerTask(); $this->newsletterTask = ($newsletterTask) ? $newsletterTask : new NewsletterTask(); $this->mailerMetaInfo = new MetaInfo; $wp = new WPFunctions; $this->batchSize = $wp->applyFilters('mailpoet_cron_worker_sending_queue_batch_size', self::BATCH_SIZE); $this->loggerFactory = $loggerFactory; $this->newslettersRepository = $newslettersRepository; $this->cronHelper = $cronHelper; } public function process($timer = false) { $timer = $timer ?: microtime(true); $this->enforceSendingAndExecutionLimits($timer); foreach (self::getRunningQueues() as $queue) { if (!$queue instanceof SendingTask) continue; ScheduledTaskModel::touchAllByIds([$queue->taskId]); $this->loggerFactory->getLogger(LoggerFactory::TOPIC_NEWSLETTERS)->addInfo( 'sending queue processing', ['task_id' => $queue->taskId] ); $newsletter = $this->newsletterTask->getNewsletterFromQueue($queue); if (!$newsletter) { continue; } // pre-process newsletter (render, replace shortcodes/links, etc.) $newsletter = $this->newsletterTask->preProcessNewsletter($newsletter, $queue); if (!$newsletter) { $this->loggerFactory->getLogger(LoggerFactory::TOPIC_NEWSLETTERS)->addInfo( 'delete task in sending queue', ['task_id' => $queue->taskId] ); $queue->delete(); continue; } // clone the original object to be used for processing $_newsletter = (object)$newsletter->asArray(); $options = $newsletter->options()->findMany(); if (!empty($options)) { $options = array_column($options, 'value', 'name'); } $_newsletter->options = $options; // configure mailer $this->mailerTask->configureMailer($newsletter); // get newsletter segments $newsletterSegmentsIds = $this->newsletterTask->getNewsletterSegments($newsletter); // get subscribers $subscriberBatches = new BatchIterator($queue->taskId, $this->batchSize); foreach ($subscriberBatches as $subscribersToProcessIds) { $this->loggerFactory->getLogger(LoggerFactory::TOPIC_NEWSLETTERS)->addInfo( 'subscriber batch processing', ['newsletter_id' => $newsletter->id, 'task_id' => $queue->taskId, 'subscriber_batch_count' => count($subscribersToProcessIds)] ); if (!empty($newsletterSegmentsIds[0])) { // Check that subscribers are in segments $finder = new SubscribersFinder(); $foundSubscribersIds = $finder->findSubscribersInSegments($subscribersToProcessIds, $newsletterSegmentsIds); $foundSubscribers = SubscriberModel::whereIn('id', $subscribersToProcessIds) ->whereNull('deleted_at') ->findMany(); } else { // No segments = Welcome emails $foundSubscribers = SubscriberModel::whereIn('id', $subscribersToProcessIds) ->where('status', SubscriberModel::STATUS_SUBSCRIBED) ->whereNull('deleted_at') ->findMany(); $foundSubscribersIds = SubscriberModel::extractSubscribersIds($foundSubscribers); } // if some subscribers weren't found, remove them from the processing list if (count($foundSubscribersIds) !== count($subscribersToProcessIds)) { $subscribersToRemove = array_diff( $subscribersToProcessIds, $foundSubscribersIds ); $queue->removeSubscribers($subscribersToRemove); if (!$queue->countToProcess) { $this->newsletterTask->markNewsletterAsSent($newsletter, $queue); continue; } } $this->loggerFactory->getLogger(LoggerFactory::TOPIC_NEWSLETTERS)->addInfo( 'before queue chunk processing', ['newsletter_id' => $newsletter->id, 'task_id' => $queue->taskId, 'found_subscribers_count' => count($foundSubscribers)] ); $queue = $this->processQueue( $queue, $_newsletter, $foundSubscribers, $timer ); $this->loggerFactory->getLogger(LoggerFactory::TOPIC_NEWSLETTERS)->addInfo( 'after queue chunk processing', ['newsletter_id' => $newsletter->id, 'task_id' => $queue->taskId] ); if ($queue->status === ScheduledTaskModel::STATUS_COMPLETED) { $this->loggerFactory->getLogger(LoggerFactory::TOPIC_NEWSLETTERS)->addInfo( 'completed newsletter sending', ['newsletter_id' => $newsletter->id, 'task_id' => $queue->taskId] ); $this->newsletterTask->markNewsletterAsSent($newsletter, $queue); $this->statsNotificationsScheduler->schedule($this->newslettersRepository->findOneById($newsletter->id)); } $this->enforceSendingAndExecutionLimits($timer); } } } public function processQueue($queue, $newsletter, $subscribers, $timer) { // determine if processing is done in bulk or individually $processingMethod = $this->mailerTask->getProcessingMethod(); $preparedNewsletters = []; $preparedSubscribers = []; $preparedSubscribersIds = []; $unsubscribeUrls = []; $statistics = []; $metas = []; foreach ($subscribers as $subscriber) { // render shortcodes and replace subscriber data in tracked links $preparedNewsletters[] = $this->newsletterTask->prepareNewsletterForSending( $newsletter, $subscriber, $queue ); // format subscriber name/address according to mailer settings $preparedSubscribers[] = $this->mailerTask->prepareSubscriberForSending( $subscriber ); $preparedSubscribersIds[] = $subscriber->id; // save personalized unsubsribe link $unsubscribeUrls[] = Links::getUnsubscribeUrl($queue, $subscriber->id); $metas[] = $this->mailerMetaInfo->getNewsletterMetaInfo($newsletter, $subscriber); // keep track of values for statistics purposes $statistics[] = [ 'newsletter_id' => $newsletter->id, 'subscriber_id' => $subscriber->id, 'queue_id' => $queue->id, ]; if ($processingMethod === 'individual') { $queue = $this->sendNewsletter( $queue, $preparedSubscribersIds[0], $preparedNewsletters[0], $preparedSubscribers[0], $statistics[0], $timer, ['unsubscribe_url' => $unsubscribeUrls[0], 'meta' => $metas[0]] ); $preparedNewsletters = []; $preparedSubscribers = []; $preparedSubscribersIds = []; $unsubscribeUrls = []; $statistics = []; } } if ($processingMethod === 'bulk') { $queue = $this->sendNewsletters( $queue, $preparedSubscribersIds, $preparedNewsletters, $preparedSubscribers, $statistics, $timer, ['unsubscribe_url' => $unsubscribeUrls, 'meta' => $metas] ); } return $queue; } public function sendNewsletter( SendingTask $sendingTask, $preparedSubscriberId, $preparedNewsletter, $preparedSubscriber, $statistics, $timer, $extraParams = [] ) { // send newsletter $sendResult = $this->mailerTask->send( $preparedNewsletter, $preparedSubscriber, $extraParams ); return $this->processSendResult( $sendingTask, $sendResult, [$preparedSubscriber], [$preparedSubscriberId], [$statistics], $timer ); } public function sendNewsletters( SendingTask $sendingTask, $preparedSubscribersIds, $preparedNewsletters, $preparedSubscribers, $statistics, $timer, $extraParams = [] ) { // send newsletters $sendResult = $this->mailerTask->sendBulk( $preparedNewsletters, $preparedSubscribers, $extraParams ); return $this->processSendResult( $sendingTask, $sendResult, $preparedSubscribers, $preparedSubscribersIds, $statistics, $timer ); } private function processSendResult( SendingTask $sendingTask, $sendResult, array $preparedSubscribers, array $preparedSubscribersIds, array $statistics, $timer ) { // log error message and schedule retry/pause sending if ($sendResult['response'] === false) { $error = $sendResult['error']; assert($error instanceof MailerError); $this->errorHandler->processError($error, $sendingTask, $preparedSubscribersIds, $preparedSubscribers); } // update processed/to process list if (!$sendingTask->updateProcessedSubscribers($preparedSubscribersIds)) { MailerLog::processError( 'processed_list_update', sprintf('QUEUE-%d-PROCESSED-LIST-UPDATE', $sendingTask->id), null, true ); } // log statistics StatisticsNewslettersModel::createMultiple($statistics); // update the sent count $this->mailerTask->updateSentCount(); // enforce execution limits if queue is still being processed if ($sendingTask->status !== ScheduledTaskModel::STATUS_COMPLETED) { $this->enforceSendingAndExecutionLimits($timer); } return $sendingTask; } public function enforceSendingAndExecutionLimits($timer) { // abort if execution limit is reached $this->cronHelper->enforceExecutionLimit($timer); // abort if sending limit has been reached MailerLog::enforceExecutionRequirements(); } public static function getRunningQueues() { return SendingTask::getRunningQueues(self::TASK_BATCH_SIZE); } }