Pass cron worker timer in process() method instead of a constructor
[MAILPOET-2538]
This commit is contained in:
committed by
Jack Kitterhing
parent
afecae15f5
commit
382df5e034
@@ -21,7 +21,7 @@ class Migration extends SimpleWorker {
|
||||
return empty($completed_tasks);
|
||||
}
|
||||
|
||||
function prepareTaskStrategy(ScheduledTask $task) {
|
||||
function prepareTaskStrategy(ScheduledTask $task, $timer) {
|
||||
$unmigrated_columns = $this->checkUnmigratedColumnsExist();
|
||||
$unmigrated_queues_count = 0;
|
||||
$unmigrated_queue_subscribers = [];
|
||||
@@ -73,9 +73,9 @@ class Migration extends SimpleWorker {
|
||||
}
|
||||
}
|
||||
|
||||
function processTaskStrategy(ScheduledTask $task) {
|
||||
$this->migrateSendingQueues();
|
||||
$this->migrateSubscribers();
|
||||
function processTaskStrategy(ScheduledTask $task, $timer) {
|
||||
$this->migrateSendingQueues($timer);
|
||||
$this->migrateSubscribers($timer);
|
||||
$this->resumeSending();
|
||||
return true;
|
||||
}
|
||||
@@ -108,7 +108,7 @@ class Migration extends SimpleWorker {
|
||||
/*
|
||||
* Migrate all sending queues without converting subscriber data
|
||||
*/
|
||||
function migrateSendingQueues() {
|
||||
function migrateSendingQueues($timer) {
|
||||
global $wpdb;
|
||||
|
||||
$queues = $this->getUnmigratedQueues()
|
||||
@@ -128,7 +128,7 @@ class Migration extends SimpleWorker {
|
||||
if (!empty($queues)) {
|
||||
foreach (array_chunk($queues, self::BATCH_SIZE) as $queue_batch) {
|
||||
// abort if execution limit is reached
|
||||
$this->cron_helper->enforceExecutionLimit($this->timer);
|
||||
$this->cron_helper->enforceExecutionLimit($timer);
|
||||
|
||||
foreach ($queue_batch as $queue) {
|
||||
// create a new scheduled task of type "sending"
|
||||
@@ -158,7 +158,7 @@ class Migration extends SimpleWorker {
|
||||
/*
|
||||
* Migrate subscribers for in-progress sending tasks from the `subscribers` field to a separate table
|
||||
*/
|
||||
function migrateSubscribers() {
|
||||
function migrateSubscribers($timer) {
|
||||
global $wpdb;
|
||||
|
||||
// find in-progress queues that have serialized subscribers
|
||||
@@ -177,16 +177,16 @@ class Migration extends SimpleWorker {
|
||||
if (!empty($task_ids)) {
|
||||
foreach ($task_ids as $task_id) {
|
||||
// abort if execution limit is reached
|
||||
$this->cron_helper->enforceExecutionLimit($this->timer);
|
||||
$this->cron_helper->enforceExecutionLimit($timer);
|
||||
|
||||
$this->migrateTaskSubscribers($task_id);
|
||||
$this->migrateTaskSubscribers($task_id, $timer);
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
function migrateTaskSubscribers($task_id) {
|
||||
function migrateTaskSubscribers($task_id, $timer) {
|
||||
global $wpdb;
|
||||
|
||||
$migrated_unprocessed_count = ScheduledTaskSubscriber::getUnprocessedCount($task_id);
|
||||
@@ -212,7 +212,7 @@ class Migration extends SimpleWorker {
|
||||
$subscribers_to_migrate = array_slice($subscribers['to_process'], $migrated_unprocessed_count);
|
||||
foreach ($subscribers_to_migrate as $sub_id) {
|
||||
// abort if execution limit is reached
|
||||
$this->cron_helper->enforceExecutionLimit($this->timer);
|
||||
$this->cron_helper->enforceExecutionLimit($timer);
|
||||
|
||||
ScheduledTaskSubscriber::createOrUpdate([
|
||||
'task_id' => $task_id,
|
||||
@@ -226,7 +226,7 @@ class Migration extends SimpleWorker {
|
||||
$subscribers_to_migrate = array_slice($subscribers['processed'], $migrated_processed_count);
|
||||
foreach ($subscribers_to_migrate as $sub_id) {
|
||||
// abort if execution limit is reached
|
||||
$this->cron_helper->enforceExecutionLimit($this->timer);
|
||||
$this->cron_helper->enforceExecutionLimit($timer);
|
||||
|
||||
ScheduledTaskSubscriber::createOrUpdate([
|
||||
'task_id' => $task_id,
|
||||
|
@@ -25,7 +25,6 @@ use function MailPoet\Util\array_column;
|
||||
class SendingQueue {
|
||||
public $mailer_task;
|
||||
public $newsletter_task;
|
||||
public $timer;
|
||||
public $batch_size;
|
||||
const BATCH_SIZE = 20;
|
||||
const TASK_BATCH_SIZE = 5;
|
||||
@@ -54,7 +53,6 @@ class SendingQueue {
|
||||
LoggerFactory $logger_factory,
|
||||
NewslettersRepository $newsletters_repository,
|
||||
CronHelper $cron_helper,
|
||||
$timer = false,
|
||||
$mailer_task = false,
|
||||
$newsletter_task = false
|
||||
) {
|
||||
@@ -62,7 +60,6 @@ class SendingQueue {
|
||||
$this->stats_notifications_scheduler = $stats_notifications_scheduler;
|
||||
$this->mailer_task = ($mailer_task) ? $mailer_task : new MailerTask();
|
||||
$this->newsletter_task = ($newsletter_task) ? $newsletter_task : new NewsletterTask();
|
||||
$this->timer = ($timer) ? $timer : microtime(true);
|
||||
$this->mailerMetaInfo = new MetaInfo;
|
||||
$wp = new WPFunctions;
|
||||
$this->batch_size = $wp->applyFilters('mailpoet_cron_worker_sending_queue_batch_size', self::BATCH_SIZE);
|
||||
@@ -71,8 +68,9 @@ class SendingQueue {
|
||||
$this->cron_helper = $cron_helper;
|
||||
}
|
||||
|
||||
function process() {
|
||||
$this->enforceSendingAndExecutionLimits();
|
||||
function process($timer = false) {
|
||||
$timer = $timer ?: microtime(true);
|
||||
$this->enforceSendingAndExecutionLimits($timer);
|
||||
foreach (self::getRunningQueues() as $queue) {
|
||||
if (!$queue instanceof SendingTask) continue;
|
||||
ScheduledTaskModel::touchAllByIds([$queue->task_id]);
|
||||
@@ -147,7 +145,8 @@ class SendingQueue {
|
||||
$queue = $this->processQueue(
|
||||
$queue,
|
||||
$_newsletter,
|
||||
$found_subscribers
|
||||
$found_subscribers,
|
||||
$timer
|
||||
);
|
||||
$this->logger_factory->getLogger(LoggerFactory::TOPIC_NEWSLETTERS)->addInfo(
|
||||
'after queue chunk processing',
|
||||
@@ -161,12 +160,12 @@ class SendingQueue {
|
||||
$this->newsletter_task->markNewsletterAsSent($newsletter, $queue);
|
||||
$this->stats_notifications_scheduler->schedule($this->newsletters_repository->findOneById($newsletter->id));
|
||||
}
|
||||
$this->enforceSendingAndExecutionLimits();
|
||||
$this->enforceSendingAndExecutionLimits($timer);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
function processQueue($queue, $newsletter, $subscribers) {
|
||||
function processQueue($queue, $newsletter, $subscribers, $timer) {
|
||||
// determine if processing is done in bulk or individually
|
||||
$processing_method = $this->mailer_task->getProcessingMethod();
|
||||
$prepared_newsletters = [];
|
||||
@@ -204,6 +203,7 @@ class SendingQueue {
|
||||
$prepared_newsletters[0],
|
||||
$prepared_subscribers[0],
|
||||
$statistics[0],
|
||||
$timer,
|
||||
['unsubscribe_url' => $unsubscribe_urls[0], 'meta' => $metas[0]]
|
||||
);
|
||||
$prepared_newsletters = [];
|
||||
@@ -220,6 +220,7 @@ class SendingQueue {
|
||||
$prepared_newsletters,
|
||||
$prepared_subscribers,
|
||||
$statistics,
|
||||
$timer,
|
||||
['unsubscribe_url' => $unsubscribe_urls, 'meta' => $metas]
|
||||
);
|
||||
}
|
||||
@@ -228,7 +229,7 @@ class SendingQueue {
|
||||
|
||||
function sendNewsletter(
|
||||
SendingTask $sending_task, $prepared_subscriber_id, $prepared_newsletter,
|
||||
$prepared_subscriber, $statistics, $extra_params = []
|
||||
$prepared_subscriber, $statistics, $timer, $extra_params = []
|
||||
) {
|
||||
// send newsletter
|
||||
$send_result = $this->mailer_task->send(
|
||||
@@ -241,13 +242,14 @@ class SendingQueue {
|
||||
$send_result,
|
||||
[$prepared_subscriber],
|
||||
[$prepared_subscriber_id],
|
||||
[$statistics]
|
||||
[$statistics],
|
||||
$timer
|
||||
);
|
||||
}
|
||||
|
||||
function sendNewsletters(
|
||||
SendingTask $sending_task, $prepared_subscribers_ids, $prepared_newsletters,
|
||||
$prepared_subscribers, $statistics, $extra_params = []
|
||||
$prepared_subscribers, $statistics, $timer, $extra_params = []
|
||||
) {
|
||||
// send newsletters
|
||||
$send_result = $this->mailer_task->sendBulk(
|
||||
@@ -260,7 +262,8 @@ class SendingQueue {
|
||||
$send_result,
|
||||
$prepared_subscribers,
|
||||
$prepared_subscribers_ids,
|
||||
$statistics
|
||||
$statistics,
|
||||
$timer
|
||||
);
|
||||
}
|
||||
|
||||
@@ -269,7 +272,8 @@ class SendingQueue {
|
||||
$send_result,
|
||||
array $prepared_subscribers,
|
||||
array $prepared_subscribers_ids,
|
||||
array $statistics
|
||||
array $statistics,
|
||||
$timer
|
||||
) {
|
||||
// log error message and schedule retry/pause sending
|
||||
if ($send_result['response'] === false) {
|
||||
@@ -292,14 +296,14 @@ class SendingQueue {
|
||||
$this->mailer_task->updateSentCount();
|
||||
// enforce execution limits if queue is still being processed
|
||||
if ($sending_task->status !== ScheduledTaskModel::STATUS_COMPLETED) {
|
||||
$this->enforceSendingAndExecutionLimits();
|
||||
$this->enforceSendingAndExecutionLimits($timer);
|
||||
}
|
||||
return $sending_task;
|
||||
}
|
||||
|
||||
function enforceSendingAndExecutionLimits() {
|
||||
function enforceSendingAndExecutionLimits($timer) {
|
||||
// abort if execution limit is reached
|
||||
$this->cron_helper->enforceExecutionLimit($this->timer);
|
||||
$this->cron_helper->enforceExecutionLimit($timer);
|
||||
// abort if sending limit has been reached
|
||||
MailerLog::enforceExecutionRequirements();
|
||||
}
|
||||
|
Reference in New Issue
Block a user