Add service for automatic decrease sending batch size
[MAILPOET-3588]
This commit is contained in:
@@ -7,6 +7,13 @@ use MailPoet\Mailer\MailerLog;
|
||||
use MailPoet\Tasks\Sending as SendingTask;
|
||||
|
||||
class SendingErrorHandler {
|
||||
/** @var SendingThrottlingHandler */
|
||||
private $throttlingHandler;
|
||||
|
||||
public function __construct(SendingThrottlingHandler $throttlingHandler) {
|
||||
$this->throttlingHandler = $throttlingHandler;
|
||||
}
|
||||
|
||||
public function processError(
|
||||
MailerError $error,
|
||||
SendingTask $sendingTask,
|
||||
@@ -23,7 +30,11 @@ class SendingErrorHandler {
|
||||
if ($error->getRetryInterval() !== null) {
|
||||
MailerLog::processNonBlockingError($error->getOperation(), $error->getMessageWithFailedSubscribers(), $error->getRetryInterval());
|
||||
} else {
|
||||
MailerLog::processError($error->getOperation(), $error->getMessageWithFailedSubscribers());
|
||||
$throttledBatchSize = null;
|
||||
if ($error->getOperation() === MailerError::OPERATION_CONNECT) {
|
||||
$throttledBatchSize = $this->throttlingHandler->throttleBatchSize();
|
||||
}
|
||||
MailerLog::processError($error->getOperation(), $error->getMessageWithFailedSubscribers(), null, false, $throttledBatchSize);
|
||||
}
|
||||
}
|
||||
|
||||
|
@@ -30,7 +30,6 @@ class SendingQueue {
|
||||
public $mailerTask;
|
||||
public $newsletterTask;
|
||||
public $batchSize;
|
||||
const BATCH_SIZE = 20;
|
||||
const TASK_BATCH_SIZE = 5;
|
||||
const EMAIL_WITH_INVALID_SEGMENT_OPTION = 'mailpoet_email_with_invalid_segment';
|
||||
|
||||
@@ -40,6 +39,9 @@ class SendingQueue {
|
||||
/** @var SendingErrorHandler */
|
||||
private $errorHandler;
|
||||
|
||||
/** @var SendingThrottlingHandler */
|
||||
private $throttlingHandler;
|
||||
|
||||
/** @var MetaInfo */
|
||||
private $mailerMetaInfo;
|
||||
|
||||
@@ -63,6 +65,7 @@ class SendingQueue {
|
||||
|
||||
public function __construct(
|
||||
SendingErrorHandler $errorHandler,
|
||||
SendingThrottlingHandler $throttlingHandler,
|
||||
StatsNotificationsScheduler $statsNotificationsScheduler,
|
||||
LoggerFactory $loggerFactory,
|
||||
NewslettersRepository $newslettersRepository,
|
||||
@@ -74,6 +77,7 @@ class SendingQueue {
|
||||
$newsletterTask = false
|
||||
) {
|
||||
$this->errorHandler = $errorHandler;
|
||||
$this->throttlingHandler = $throttlingHandler;
|
||||
$this->statsNotificationsScheduler = $statsNotificationsScheduler;
|
||||
$this->subscribersFinder = $subscriberFinder;
|
||||
$this->mailerTask = ($mailerTask) ? $mailerTask : new MailerTask();
|
||||
@@ -81,7 +85,7 @@ class SendingQueue {
|
||||
$this->segmentsRepository = $segmentsRepository;
|
||||
$this->mailerMetaInfo = new MetaInfo;
|
||||
$this->wp = $wp;
|
||||
$this->batchSize = $wp->applyFilters('mailpoet_cron_worker_sending_queue_batch_size', self::BATCH_SIZE);
|
||||
$this->batchSize = $this->throttlingHandler->getBatchSize();
|
||||
$this->loggerFactory = $loggerFactory;
|
||||
$this->newslettersRepository = $newslettersRepository;
|
||||
$this->cronHelper = $cronHelper;
|
||||
@@ -361,6 +365,7 @@ class SendingQueue {
|
||||
if ($sendingTask->status !== ScheduledTaskModel::STATUS_COMPLETED) {
|
||||
$this->enforceSendingAndExecutionLimits($timer);
|
||||
}
|
||||
$this->throttlingHandler->processSuccess();
|
||||
return $sendingTask;
|
||||
}
|
||||
|
||||
|
83
lib/Cron/Workers/SendingQueue/SendingThrottlingHandler.php
Normal file
83
lib/Cron/Workers/SendingQueue/SendingThrottlingHandler.php
Normal file
@@ -0,0 +1,83 @@
|
||||
<?php
|
||||
|
||||
namespace MailPoet\Cron\Workers\SendingQueue;
|
||||
|
||||
use MailPoet\Logging\LoggerFactory;
|
||||
use MailPoet\Settings\SettingsController;
|
||||
use MailPoet\WP\Functions as WPFunctions;
|
||||
use MailPoetVendor\Monolog\Logger;
|
||||
|
||||
class SendingThrottlingHandler {
|
||||
public const BATCH_SIZE = 20;
|
||||
public const SETTINGS_KEY = 'mta_throttling';
|
||||
public const SUCCESS_THRESHOLD_TO_INCREASE = 10;
|
||||
|
||||
/** @var Logger */
|
||||
private $logger;
|
||||
|
||||
/** @var SettingsController */
|
||||
private $settings;
|
||||
|
||||
/** @var WPFunctions */
|
||||
private $wp;
|
||||
|
||||
public function __construct(
|
||||
LoggerFactory $loggerFactory,
|
||||
SettingsController $settings,
|
||||
WPFunctions $wp
|
||||
) {
|
||||
$this->logger = $loggerFactory->getLogger(LoggerFactory::TOPIC_MSS);
|
||||
$this->settings = $settings;
|
||||
$this->wp = $wp;
|
||||
}
|
||||
|
||||
public function getBatchSize(): int {
|
||||
$throttlingSettings = $this->loadSettings();
|
||||
if (isset($throttlingSettings['batch_size'])) {
|
||||
return $throttlingSettings['batch_size'];
|
||||
}
|
||||
return $this->getMaxBatchSize();
|
||||
}
|
||||
|
||||
private function getMaxBatchSize(): int {
|
||||
return $this->wp->applyFilters('mailpoet_cron_worker_sending_queue_batch_size', self::BATCH_SIZE);
|
||||
}
|
||||
|
||||
public function throttleBatchSize(): int {
|
||||
$batchSize = $this->getBatchSize();
|
||||
if ($batchSize > 1) {
|
||||
$batchSize = (int)ceil($this->getBatchSize() / 2);
|
||||
$throttlingSettings['batch_size'] = $batchSize;
|
||||
$this->logger->error("MailPoet throttling: decrease batch_size to: {$batchSize}");
|
||||
$this->saveSettings($throttlingSettings);
|
||||
}
|
||||
|
||||
return $batchSize;
|
||||
}
|
||||
|
||||
public function processSuccess(): void {
|
||||
$throttlingSettings = $this->loadSettings();
|
||||
if (!isset($throttlingSettings['batch_size'])) {
|
||||
return;
|
||||
}
|
||||
$throttlingSettings['success_in_row'] = isset($throttlingSettings['success_in_row']) ? ++$throttlingSettings['success_in_row'] : 1;
|
||||
$this->logger->info("MailPoet throttling: increase success_in_row to: {$throttlingSettings['success_in_row']}");
|
||||
if ($throttlingSettings['success_in_row'] === self::SUCCESS_THRESHOLD_TO_INCREASE) {
|
||||
unset($throttlingSettings['success_in_row']);
|
||||
$throttlingSettings['batch_size'] = min($this->getMaxBatchSize(), $throttlingSettings['batch_size'] * 2);
|
||||
$this->logger->info("MailPoet throttling: increase batch_size to: {$throttlingSettings['batch_size']}");
|
||||
if ($this->getMaxBatchSize() === $throttlingSettings['batch_size']) {
|
||||
unset($throttlingSettings['batch_size']);
|
||||
}
|
||||
}
|
||||
$this->saveSettings($throttlingSettings);
|
||||
}
|
||||
|
||||
private function loadSettings(): ?array {
|
||||
return $this->settings->get(self::SETTINGS_KEY);
|
||||
}
|
||||
|
||||
private function saveSettings(array $settings): void {
|
||||
$this->settings->set(self::SETTINGS_KEY, $settings);
|
||||
}
|
||||
}
|
@@ -146,6 +146,7 @@ class ContainerConfigurator implements IContainerConfigurator {
|
||||
$container->autowire(\MailPoet\Cron\Triggers\WordPress::class)->setPublic(true);
|
||||
$container->autowire(\MailPoet\Cron\Workers\WorkersFactory::class)->setPublic(true);
|
||||
$container->autowire(\MailPoet\Cron\Workers\SendingQueue\SendingErrorHandler::class)->setPublic(true);
|
||||
$container->autowire(\MailPoet\Cron\Workers\SendingQueue\SendingThrottlingHandler::class)->setPublic(true);
|
||||
$container->autowire(\MailPoet\Cron\Workers\StatsNotifications\Scheduler::class);
|
||||
$container->autowire(\MailPoet\Cron\Workers\StatsNotifications\StatsNotificationsRepository::class)->setPublic(true);
|
||||
$container->autowire(\MailPoet\Cron\Workers\StatsNotifications\NewsletterLinkRepository::class)->setPublic(true);
|
||||
|
@@ -104,9 +104,11 @@ class MailerLog {
|
||||
*
|
||||
* @throws \Exception
|
||||
*/
|
||||
public static function processError($operation, $errorMessage, $errorCode = null, $pauseSending = false) {
|
||||
public static function processError($operation, $errorMessage, $errorCode = null, $pauseSending = false, $throttledBatchSize = null) {
|
||||
$mailerLog = self::getMailerLog();
|
||||
if (!isset($throttledBatchSize) || $throttledBatchSize === 1) {
|
||||
$mailerLog['retry_attempt']++;
|
||||
}
|
||||
$mailerLog['retry_at'] = time() + self::RETRY_INTERVAL;
|
||||
$mailerLog = self::setError($mailerLog, $operation, $errorMessage, $errorCode);
|
||||
self::updateMailerLog($mailerLog);
|
||||
|
@@ -9,6 +9,7 @@ use MailPoet\Config\Populator;
|
||||
use MailPoet\Cron\CronHelper;
|
||||
use MailPoet\Cron\Workers\SendingQueue\SendingErrorHandler;
|
||||
use MailPoet\Cron\Workers\SendingQueue\SendingQueue as SendingQueueWorker;
|
||||
use MailPoet\Cron\Workers\SendingQueue\SendingThrottlingHandler;
|
||||
use MailPoet\Cron\Workers\SendingQueue\Tasks\Mailer as MailerTask;
|
||||
use MailPoet\Cron\Workers\SendingQueue\Tasks\Newsletter as NewsletterTask;
|
||||
use MailPoet\Cron\Workers\StatsNotifications\Scheduler as StatsNotificationsScheduler;
|
||||
@@ -151,7 +152,7 @@ class SendingQueueTest extends \MailPoetTest {
|
||||
}
|
||||
|
||||
public function testItConstructs() {
|
||||
expect($this->sendingQueueWorker->batchSize)->equals(SendingQueueWorker::BATCH_SIZE);
|
||||
expect($this->sendingQueueWorker->batchSize)->equals(SendingThrottlingHandler::BATCH_SIZE);
|
||||
expect($this->sendingQueueWorker->mailerTask instanceof MailerTask);
|
||||
expect($this->sendingQueueWorker->newsletterTask instanceof NewsletterTask);
|
||||
}
|
||||
@@ -632,7 +633,7 @@ class SendingQueueTest extends \MailPoetTest {
|
||||
public function testItDoesNotCallMailerWithEmptyBatch() {
|
||||
$queue = $this->queue;
|
||||
$subscribers = [];
|
||||
while (count($subscribers) < 2 * SendingQueueWorker::BATCH_SIZE) {
|
||||
while (count($subscribers) < 2 * SendingThrottlingHandler::BATCH_SIZE) {
|
||||
$subscribers[] = 1234564545 + count($subscribers);
|
||||
}
|
||||
$subscribers[] = $this->subscriber->id();
|
||||
|
Reference in New Issue
Block a user