Files
piratepoet/mailpoet/lib/Cron/Workers/SendingQueue/SendingQueue.php
David Remer 2298b5b061 Be more restrictive when deleting a queue task
getNewsletterFromQueue() has too many conditions under which it returns null. Therefore we can not rely on that method when we wont to delete running tasks with no associated newsletter.
[MAILPOET-4708]
2022-10-10 17:13:17 +02:00

513 lines
17 KiB
PHP

<?php
namespace MailPoet\Cron\Workers\SendingQueue;
use MailPoet\Cron\CronHelper;
use MailPoet\Cron\Workers\Bounce;
use MailPoet\Cron\Workers\SendingQueue\Tasks\Links;
use MailPoet\Cron\Workers\SendingQueue\Tasks\Mailer as MailerTask;
use MailPoet\Cron\Workers\SendingQueue\Tasks\Newsletter as NewsletterTask;
use MailPoet\Cron\Workers\StatsNotifications\Scheduler as StatsNotificationsScheduler;
use MailPoet\Entities\NewsletterEntity;
use MailPoet\Entities\ScheduledTaskEntity;
use MailPoet\Entities\SubscriberEntity;
use MailPoet\Logging\LoggerFactory;
use MailPoet\Mailer\MailerError;
use MailPoet\Mailer\MailerLog;
use MailPoet\Mailer\MetaInfo;
use MailPoet\Models\Newsletter;
use MailPoet\Models\ScheduledTask;
use MailPoet\Models\StatisticsNewsletters as StatisticsNewslettersModel;
use MailPoet\Models\Subscriber as SubscriberModel;
use MailPoet\Newsletter\NewslettersRepository;
use MailPoet\Newsletter\Sending\ScheduledTasksRepository;
use MailPoet\Segments\SegmentsRepository;
use MailPoet\Segments\SubscribersFinder;
use MailPoet\Subscribers\SubscribersRepository;
use MailPoet\Tasks\Sending as SendingTask;
use MailPoet\Tasks\Subscribers\BatchIterator;
use MailPoet\WP\Functions as WPFunctions;
use MailPoetVendor\Carbon\Carbon;
class SendingQueue {
/** @var MailerTask */
public $mailerTask;
/** @var NewsletterTask */
public $newsletterTask;
const TASK_TYPE = 'sending';
const TASK_BATCH_SIZE = 5;
const EMAIL_WITH_INVALID_SEGMENT_OPTION = 'mailpoet_email_with_invalid_segment';
/** @var StatsNotificationsScheduler */
public $statsNotificationsScheduler;
/** @var SendingErrorHandler */
private $errorHandler;
/** @var SendingThrottlingHandler */
private $throttlingHandler;
/** @var MetaInfo */
private $mailerMetaInfo;
/** @var LoggerFactory */
private $loggerFactory;
/** @var NewslettersRepository */
private $newslettersRepository;
/** @var CronHelper */
private $cronHelper;
/** @var SubscribersFinder */
private $subscribersFinder;
/** @var SegmentsRepository */
private $segmentsRepository;
/** @var WPFunctions */
private $wp;
/** @var Links */
private $links;
/** @var ScheduledTasksRepository */
private $scheduledTasksRepository;
/** @var SubscribersRepository */
private $subscribersRepository;
public function __construct(
SendingErrorHandler $errorHandler,
SendingThrottlingHandler $throttlingHandler,
StatsNotificationsScheduler $statsNotificationsScheduler,
LoggerFactory $loggerFactory,
NewslettersRepository $newslettersRepository,
CronHelper $cronHelper,
SubscribersFinder $subscriberFinder,
SegmentsRepository $segmentsRepository,
WPFunctions $wp,
Links $links,
ScheduledTasksRepository $scheduledTasksRepository,
MailerTask $mailerTask,
SubscribersRepository $subscribersRepository,
$newsletterTask = false
) {
$this->errorHandler = $errorHandler;
$this->throttlingHandler = $throttlingHandler;
$this->statsNotificationsScheduler = $statsNotificationsScheduler;
$this->subscribersFinder = $subscriberFinder;
$this->mailerTask = $mailerTask;
$this->newsletterTask = ($newsletterTask) ? $newsletterTask : new NewsletterTask();
$this->segmentsRepository = $segmentsRepository;
$this->mailerMetaInfo = new MetaInfo;
$this->wp = $wp;
$this->loggerFactory = $loggerFactory;
$this->newslettersRepository = $newslettersRepository;
$this->cronHelper = $cronHelper;
$this->links = $links;
$this->scheduledTasksRepository = $scheduledTasksRepository;
$this->subscribersRepository = $subscribersRepository;
}
public function process($timer = false) {
$timer = $timer ?: microtime(true);
$this->enforceSendingAndExecutionLimits($timer);
foreach ($this->scheduledTasksRepository->findRunningSendingTasks(self::TASK_BATCH_SIZE) as $taskEntity) {
$task = ScheduledTask::findOne($taskEntity->getId());
if (!$task instanceof ScheduledTask) continue;
$queue = SendingTask::createFromScheduledTask($task);
if (!$queue instanceof SendingTask) continue;
$task = $queue->task();
if (!$task instanceof ScheduledTask) continue;
if ($this->isInProgress($task)) {
if ($this->isTimeout($task)) {
$this->stopProgress($task);
} else {
continue;
}
}
$this->startProgress($task);
try {
$this->scheduledTasksRepository->touchAllByIds([$queue->taskId]);
$this->processSending($queue, (int)$timer);
} catch (\Exception $e) {
$this->stopProgress($task);
throw $e;
}
$this->stopProgress($task);
}
}
private function processSending(SendingTask $queue, int $timer): void {
$this->loggerFactory->getLogger(LoggerFactory::TOPIC_NEWSLETTERS)->info(
'sending queue processing',
['task_id' => $queue->taskId]
);
$this->deleteTaskIfNewsletterDoesNotExist($queue);
$newsletterEntity = $this->newsletterTask->getNewsletterFromQueue($queue);
if (!$newsletterEntity) {
return;
}
// pre-process newsletter (render, replace shortcodes/links, etc.)
$newsletterEntity = $this->newsletterTask->preProcessNewsletter($newsletterEntity, $queue);
if (!$newsletterEntity) {
$this->deleteTask($queue);
return;
}
$newsletter = Newsletter::findOne($newsletterEntity->getId());
if (!$newsletter) {
return;
}
// clone the original object to be used for processing
$_newsletter = (object)$newsletter->asArray();
$_newsletter->options = $newsletterEntity->getOptionsAsArray();
// configure mailer
$this->mailerTask->configureMailer($newsletter);
// get newsletter segments
$newsletterSegmentsIds = $newsletterEntity->getSegmentIds();
// Pause task in case some of related segments was deleted or trashed
if ($newsletterSegmentsIds && !$this->checkDeletedSegments($newsletterSegmentsIds)) {
$this->loggerFactory->getLogger(LoggerFactory::TOPIC_NEWSLETTERS)->info(
'pause task in sending queue due deleted or trashed segment',
['task_id' => $queue->taskId]
);
$queue->status = ScheduledTaskEntity::STATUS_PAUSED;
$queue->save();
$this->wp->setTransient(self::EMAIL_WITH_INVALID_SEGMENT_OPTION, $newsletter->subject);
return;
}
// get subscribers
$subscriberBatches = new BatchIterator($queue->taskId, $this->getBatchSize());
/** @var int[] $subscribersToProcessIds - it's required for PHPStan */
foreach ($subscriberBatches as $subscribersToProcessIds) {
$this->loggerFactory->getLogger(LoggerFactory::TOPIC_NEWSLETTERS)->info(
'subscriber batch processing',
['newsletter_id' => $newsletter->id, 'task_id' => $queue->taskId, 'subscriber_batch_count' => count($subscribersToProcessIds)]
);
if (!empty($newsletterSegmentsIds[0])) {
// Check that subscribers are in segments
$foundSubscribersIds = $this->subscribersFinder->findSubscribersInSegments($subscribersToProcessIds, $newsletterSegmentsIds);
$foundSubscribers = empty($foundSubscribersIds) ? [] : SubscriberModel::whereIn('id', $foundSubscribersIds)
->whereNull('deleted_at')
->findMany();
} else {
// No segments = Welcome emails or some Automatic emails.
// Welcome emails or some Automatic emails use segments only for scheduling and store them as a newsletter option
$foundSubscribers = SubscriberModel::whereIn('id', $subscribersToProcessIds)
->where('status', SubscriberModel::STATUS_SUBSCRIBED)
->whereNull('deleted_at')
->findMany();
$foundSubscribersIds = SubscriberModel::extractSubscribersIds($foundSubscribers);
}
// if some subscribers weren't found, remove them from the processing list
if (count($foundSubscribersIds) !== count($subscribersToProcessIds)) {
$subscribersToRemove = array_diff(
$subscribersToProcessIds,
$foundSubscribersIds
);
$queue->removeSubscribers($subscribersToRemove);
if (!$queue->countToProcess) {
$this->newsletterTask->markNewsletterAsSent($newsletterEntity, $queue);
continue;
}
// if there aren't any subscribers to process in batch (e.g. all unsubscribed or were deleted) continue with next batch
if (count($foundSubscribersIds) === 0) {
continue;
}
}
$this->loggerFactory->getLogger(LoggerFactory::TOPIC_NEWSLETTERS)->info(
'before queue chunk processing',
['newsletter_id' => $newsletter->id, 'task_id' => $queue->taskId, 'found_subscribers_count' => count($foundSubscribers)]
);
// reschedule bounce task to run sooner, if needed
$this->reScheduleBounceTask();
$queue = $this->processQueue(
$queue,
$_newsletter,
$foundSubscribers,
$timer
);
$this->loggerFactory->getLogger(LoggerFactory::TOPIC_NEWSLETTERS)->info(
'after queue chunk processing',
['newsletter_id' => $newsletter->id, 'task_id' => $queue->taskId]
);
if ($queue->status === ScheduledTaskEntity::STATUS_COMPLETED) {
$this->loggerFactory->getLogger(LoggerFactory::TOPIC_NEWSLETTERS)->info(
'completed newsletter sending',
['newsletter_id' => $newsletter->id, 'task_id' => $queue->taskId]
);
$this->newsletterTask->markNewsletterAsSent($newsletterEntity, $queue);
$this->statsNotificationsScheduler->schedule($newsletterEntity);
}
$this->enforceSendingAndExecutionLimits($timer);
}
}
public function getBatchSize(): int {
return $this->throttlingHandler->getBatchSize();
}
public function processQueue($queue, $newsletter, $subscribers, $timer) {
// determine if processing is done in bulk or individually
$processingMethod = $this->mailerTask->getProcessingMethod();
$preparedNewsletters = [];
$preparedSubscribers = [];
$preparedSubscribersIds = [];
$unsubscribeUrls = [];
$statistics = [];
$metas = [];
$newsletterEntity = $this->newslettersRepository->findOneById($newsletter->id);
foreach ($subscribers as $subscriber) {
$subscriberEntity = $this->subscribersRepository->findOneById($subscriber->id);
if (!$subscriberEntity instanceof SubscriberEntity) {
continue;
}
if (!$newsletterEntity instanceof NewsletterEntity) {
continue;
}
// render shortcodes and replace subscriber data in tracked links
$preparedNewsletters[] =
$this->newsletterTask->prepareNewsletterForSending(
$newsletterEntity,
$subscriberEntity,
$queue
);
// format subscriber name/address according to mailer settings
$preparedSubscribers[] = $this->mailerTask->prepareSubscriberForSending(
$subscriber
);
$preparedSubscribersIds[] = $subscriber->id;
// create personalized instant unsubsribe link
$unsubscribeUrls[] = $this->links->getUnsubscribeUrl($queue, $subscriber->id);
$metas[] = $this->mailerMetaInfo->getNewsletterMetaInfo($newsletter, $subscriberEntity);
// keep track of values for statistics purposes
$statistics[] = [
'newsletter_id' => $newsletter->id,
'subscriber_id' => $subscriber->id,
'queue_id' => $queue->id,
];
if ($processingMethod === 'individual') {
$queue = $this->sendNewsletter(
$queue,
$preparedSubscribersIds[0],
$preparedNewsletters[0],
$preparedSubscribers[0],
$statistics[0],
$timer,
['unsubscribe_url' => $unsubscribeUrls[0], 'meta' => $metas[0]]
);
$preparedNewsletters = [];
$preparedSubscribers = [];
$preparedSubscribersIds = [];
$unsubscribeUrls = [];
$statistics = [];
$metas = [];
}
}
if ($processingMethod === 'bulk') {
$queue = $this->sendNewsletters(
$queue,
$preparedSubscribersIds,
$preparedNewsletters,
$preparedSubscribers,
$statistics,
$timer,
['unsubscribe_url' => $unsubscribeUrls, 'meta' => $metas]
);
}
return $queue;
}
public function sendNewsletter(
SendingTask $sendingTask, $preparedSubscriberId, $preparedNewsletter,
$preparedSubscriber, $statistics, $timer, $extraParams = []
) {
// send newsletter
$sendResult = $this->mailerTask->send(
$preparedNewsletter,
$preparedSubscriber,
$extraParams
);
return $this->processSendResult(
$sendingTask,
$sendResult,
[$preparedSubscriber],
[$preparedSubscriberId],
[$statistics],
$timer
);
}
public function sendNewsletters(
SendingTask $sendingTask, $preparedSubscribersIds, $preparedNewsletters,
$preparedSubscribers, $statistics, $timer, $extraParams = []
) {
// send newsletters
$sendResult = $this->mailerTask->sendBulk(
$preparedNewsletters,
$preparedSubscribers,
$extraParams
);
return $this->processSendResult(
$sendingTask,
$sendResult,
$preparedSubscribers,
$preparedSubscribersIds,
$statistics,
$timer
);
}
/**
* Checks whether some of segments was deleted or trashed
* @param int[] $segmentIds
*/
private function checkDeletedSegments(array $segmentIds): bool {
if (count($segmentIds) === 0) {
return true;
}
$segmentIds = array_unique($segmentIds);
$segments = $this->segmentsRepository->findBy(['id' => $segmentIds]);
// Some segment was deleted from DB
if (count($segmentIds) > count($segments)) {
return false;
}
foreach ($segments as $segment) {
if ($segment->getDeletedAt() !== null) {
return false;
}
}
return true;
}
private function processSendResult(
SendingTask $sendingTask,
$sendResult,
array $preparedSubscribers,
array $preparedSubscribersIds,
array $statistics,
$timer
) {
// log error message and schedule retry/pause sending
if ($sendResult['response'] === false) {
$error = $sendResult['error'];
assert($error instanceof MailerError);
$this->errorHandler->processError($error, $sendingTask, $preparedSubscribersIds, $preparedSubscribers);
} elseif (!$sendingTask->updateProcessedSubscribers($preparedSubscribersIds)) { // update processed/to process list
MailerLog::processError(
'processed_list_update',
sprintf('QUEUE-%d-PROCESSED-LIST-UPDATE', $sendingTask->id),
null,
true
);
}
// log statistics
StatisticsNewslettersModel::createMultiple($statistics);
// update the sent count
$this->mailerTask->updateSentCount();
// enforce execution limits if queue is still being processed
if ($sendingTask->status !== ScheduledTaskEntity::STATUS_COMPLETED) {
$this->enforceSendingAndExecutionLimits($timer);
}
$this->throttlingHandler->processSuccess();
return $sendingTask;
}
public function enforceSendingAndExecutionLimits($timer) {
// abort if execution limit is reached
$this->cronHelper->enforceExecutionLimit($timer);
// abort if sending limit has been reached
MailerLog::enforceExecutionRequirements();
}
private function reScheduleBounceTask() {
$bounceTasks = $this->scheduledTasksRepository->findFutureScheduledByType(Bounce::TASK_TYPE);
if (count($bounceTasks)) {
$bounceTask = reset($bounceTasks);
if (Carbon::createFromTimestamp((int)current_time('timestamp'))->addHours(42)->lessThan($bounceTask->getScheduledAt())) {
$randomOffset = rand(-6 * 60 * 60, 6 * 60 * 60);
$bounceTask->setScheduledAt(Carbon::createFromTimestamp((int)current_time('timestamp'))->addSeconds((36 * 60 * 60) + $randomOffset));
$this->scheduledTasksRepository->persist($bounceTask);
$this->scheduledTasksRepository->flush();
}
}
}
private function isInProgress(ScheduledTask $task): bool {
if (!empty($task->inProgress)) {
// Do not run multiple instances of the task
return true;
}
return false;
}
private function startProgress(ScheduledTask $task): void {
$task->inProgress = true;
$task->save();
}
private function stopProgress(ScheduledTask $task): void {
$task->inProgress = false;
$task->save();
}
private function isTimeout(ScheduledTask $task): bool {
$currentTime = Carbon::createFromTimestamp($this->wp->currentTime('timestamp'));
$updated = strtotime((string)$task->updatedAt);
if ($updated !== false) {
$updatedAt = Carbon::createFromTimestamp($updated);
}
if (isset($updatedAt) && $updatedAt->diffInSeconds($currentTime, false) > $this->getExecutionLimit()) {
return true;
}
return false;
}
private function getExecutionLimit(): int {
return $this->cronHelper->getDaemonExecutionLimit() * 3;
}
private function deleteTaskIfNewsletterDoesNotExist(SendingTask $sendingTask) {
$sendingQueue = $sendingTask->getSendingQueueEntity();
$newsletter = $sendingQueue->getNewsletter();
if ($newsletter !== null) {
return;
}
$this->deleteTask($sendingTask);
}
private function deleteTask(SendingTask $queue) {
$this->loggerFactory->getLogger(LoggerFactory::TOPIC_NEWSLETTERS)->info(
'delete task in sending queue',
['task_id' => $queue->taskId]
);
$queue->delete();
}
}