Sending queue refactoring WIP [MAILPOET-903]

This commit is contained in:
stoletniy
2017-12-21 23:50:43 +03:00
parent bf8b0c81df
commit c0c57f6b67
17 changed files with 684 additions and 171 deletions

View File

@ -0,0 +1,237 @@
<?php
namespace MailPoet\Cron\Workers\SendingQueue;
use Carbon\Carbon;
use MailPoet\Cron\CronHelper;
use MailPoet\Cron\Workers\SimpleWorker;
use MailPoet\Mailer\MailerLog;
use MailPoet\Models\ScheduledTask;
use MailPoet\Models\ScheduledTaskSubscriber;
use MailPoet\Models\SendingQueue;
if(!defined('ABSPATH')) exit;
class Migration extends SimpleWorker {
const TASK_TYPE = 'migration';
const BATCH_SIZE = 20;
function checkProcessingRequirements() {
// if migration was completed, don't run it again
$completed_tasks = self::getCompletedTasks();
return empty($completed_tasks);
}
function prepareTask(ScheduledTask $task) {
$unmigrated_queues_count = $this->getUnmigratedQueues()->count();
$unmigrated_queue_subscribers = $this->getTaskIdsForUnmigratedSubscribers();
if($unmigrated_queues_count == 0
&& count($unmigrated_queue_subscribers) == 0
) {
// nothing to migrate
$this->complete($task);
$this->resumeSending();
return false;
}
// pause sending while the migration is in process
$this->pauseSending();
return parent::prepareTask($task);
}
private function pauseSending() {
$mailer_log = MailerLog::getMailerLog();
if(MailerLog::isSendingPaused($mailer_log)) {
// sending is already paused
return false;
}
$mailer_log = MailerLog::setError(
$mailer_log,
'migration',
__('Your sending queue data is being migrated to allow better performance, sending is paused while the migration is in progress and will resume automatically upon completion. This may take a few minutes.')
);
return MailerLog::pauseSending($mailer_log);
}
private function resumeSending() {
$mailer_log = MailerLog::getMailerLog();
if(!MailerLog::isSendingPaused($mailer_log)) {
// sending is not paused
return false;
}
$error = MailerLog::getError($mailer_log);
// only resume sending if it was paused by migration
if(isset($error['operation']) && $error['operation'] === 'migration') {
return MailerLog::resumeSending();
}
}
function processTask(ScheduledTask $task) {
$this->migrateSendingQueues();
$this->migrateSubscribers();
$this->complete($task);
$this->resumeSending();
return true;
}
function getUnmigratedQueues() {
return SendingQueue::where('task_id', 0)
->whereNull('type');
}
function getTaskIdsForUnmigratedSubscribers() {
global $wpdb;
$query = sprintf(
'SELECT queues.`task_id` FROM %1$s queues INNER JOIN %2$s tasks ON queues.`task_id` = tasks.`id` ' .
'WHERE tasks.`type` = "sending" AND (tasks.`status` IS NULL OR tasks.`status` = "paused") ' .
'AND queues.`subscribers` != "" AND queues.`subscribers` != "N;"' .
'AND queues.`count_total` > (SELECT COUNT(*) FROM %3$s subs WHERE subs.`task_id` = queues.`task_id`)',
MP_SENDING_QUEUES_TABLE,
MP_SCHEDULED_TASKS_TABLE,
MP_SCHEDULED_TASK_SUBSCRIBERS_TABLE
);
return $wpdb->get_col($query);
}
/*
* Migrate all sending queues without converting subscriber data
*/
function migrateSendingQueues() {
global $wpdb;
$queues = $this->getUnmigratedQueues()
->select('id')
->findArray();
$column_list = array(
'status',
'priority',
'scheduled_at',
'processed_at',
'created_at',
'updated_at',
'deleted_at'
);
if(!empty($queues)) {
foreach(array_chunk($queues, self::BATCH_SIZE) as $queue_batch) {
// abort if execution limit is reached
CronHelper::enforceExecutionLimit($this->timer);
foreach($queue_batch as $queue) {
// create a new scheduled task of type "sending"
$wpdb->query(sprintf(
'INSERT IGNORE INTO %1$s (`type`, %2$s) ' .
'SELECT "sending", %2$s FROM %3$s WHERE `id` = %4$s',
MP_SCHEDULED_TASKS_TABLE,
'`' . join('`, `', $column_list) . '`',
MP_SENDING_QUEUES_TABLE,
$queue['id']
));
// link the queue with the task via task_id
$new_task_id = $wpdb->insert_id;
$wpdb->query(sprintf(
'UPDATE %1$s SET `task_id` = %2$s WHERE `id` = %3$s',
MP_SENDING_QUEUES_TABLE,
$new_task_id,
$queue['id']
));
}
}
}
return true;
}
/*
* Migrate subscribers for in-progress sending tasks from the `subscribers` field to a separate table
*/
function migrateSubscribers() {
global $wpdb;
// find in-progress queues that have serialized subscribers
$task_ids = $this->getTaskIdsForUnmigratedSubscribers();
// check if subscribers for each one were already migrated
if(!empty($task_ids)) {
$task_ids = $wpdb->get_col(sprintf(
'SELECT queues.`task_id` FROM %1$s queues WHERE queues.`task_id` IN(' . join(',', array_map('intval', $task_ids)) . ') ' .
'AND queues.`count_total` > (SELECT COUNT(*) FROM %2$s subs WHERE subs.`task_id` = queues.`task_id`)',
MP_SENDING_QUEUES_TABLE,
MP_SCHEDULED_TASK_SUBSCRIBERS_TABLE
));
}
if(!empty($task_ids)) {
foreach($task_ids as $task_id) {
// abort if execution limit is reached
CronHelper::enforceExecutionLimit($this->timer);
$this->migrateTaskSubscribers($task_id);
}
}
return true;
}
function migrateTaskSubscribers($task_id) {
global $wpdb;
$migrated_unprocessed_count = ScheduledTaskSubscriber::getUnprocessedCount($task_id);
$migrated_processed_count = ScheduledTaskSubscriber::getProcessedCount($task_id);
$subscribers = $wpdb->get_var(sprintf(
'SELECT `subscribers` FROM %1$s WHERE `task_id` = %2$d ' .
'AND (`count_processed` > %3$d OR `count_to_process` > %4$d)',
MP_SENDING_QUEUES_TABLE,
$task_id,
$migrated_unprocessed_count,
$migrated_processed_count
));
// sanity check
if(empty($subscribers)) {
return false;
}
$subscribers = unserialize($subscribers);
if(!empty($subscribers['to_process'])) {
$subscribers_to_migrate = array_slice($subscribers['to_process'], $migrated_unprocessed_count);
foreach($subscribers_to_migrate as $sub_id) {
// abort if execution limit is reached
CronHelper::enforceExecutionLimit($this->timer);
ScheduledTaskSubscriber::createOrUpdate(array(
'task_id' => $task_id,
'subscriber_id' => $sub_id,
'processed' => ScheduledTaskSubscriber::STATUS_UNPROCESSED
));
}
}
if(!empty($subscribers['processed'])) {
$subscribers_to_migrate = array_slice($subscribers['processed'], $migrated_processed_count);
foreach($subscribers_to_migrate as $sub_id) {
// abort if execution limit is reached
CronHelper::enforceExecutionLimit($this->timer);
ScheduledTaskSubscriber::createOrUpdate(array(
'task_id' => $task_id,
'subscriber_id' => $sub_id,
'processed' => ScheduledTaskSubscriber::STATUS_PROCESSED
));
}
}
return true;
}
static function getNextRunDate() {
// run migration immediately
return Carbon::now();
}
}

View File

@ -6,10 +6,12 @@ use MailPoet\Cron\Workers\SendingQueue\Tasks\Links;
use MailPoet\Cron\Workers\SendingQueue\Tasks\Mailer as MailerTask;
use MailPoet\Cron\Workers\SendingQueue\Tasks\Newsletter as NewsletterTask;
use MailPoet\Mailer\MailerLog;
use MailPoet\Models\SendingQueue as SendingQueueModel;
use MailPoet\Models\ScheduledTask as ScheduledTaskModel;
use MailPoet\Models\StatisticsNewsletters as StatisticsNewslettersModel;
use MailPoet\Models\Subscriber as SubscriberModel;
use MailPoet\Segments\SubscribersFinder;
use MailPoet\Tasks\Sending as SendingTask;
use MailPoet\Tasks\Subscribers\BatchIterator;
use MailPoet\WP\Hooks as WPHooks;
if(!defined('ABSPATH')) exit;
@ -45,11 +47,7 @@ class SendingQueue {
// get newsletter segments
$newsletter_segments_ids = $this->newsletter_task->getNewsletterSegments($newsletter);
// get subscribers
$queue->subscribers = $queue->getSubscribers();
$subscriber_batches = array_chunk(
$queue->subscribers['to_process'],
$this->batch_size
);
$subscriber_batches = new BatchIterator($queue->task_id, $this->batch_size);
foreach($subscriber_batches as $subscribers_to_process_ids) {
if(!empty($newsletter_segments_ids[0])) {
// Check that subscribers are in segments
@ -72,7 +70,7 @@ class SendingQueue {
$found_subscribers_ids
);
$queue->removeSubscribers($subscribers_to_remove);
if(!count($queue->subscribers['to_process'])) {
if(!$queue->count_to_process) {
$this->newsletter_task->markNewsletterAsSent($newsletter, $queue);
continue;
}
@ -82,7 +80,7 @@ class SendingQueue {
$newsletter,
$found_subscribers
);
if($queue->status === SendingQueueModel::STATUS_COMPLETED) {
if($queue->status === ScheduledTaskModel::STATUS_COMPLETED) {
$this->newsletter_task->markNewsletterAsSent($newsletter, $queue);
}
$this->enforceSendingAndExecutionLimits();
@ -179,7 +177,7 @@ class SendingQueue {
// update the sent count
$this->mailer_task->updateSentCount();
// enforce execution limits if queue is still being processed
if($queue->status !== SendingQueueModel::STATUS_COMPLETED) {
if($queue->status !== ScheduledTaskModel::STATUS_COMPLETED) {
$this->enforceSendingAndExecutionLimits();
}
return $queue;
@ -193,11 +191,6 @@ class SendingQueue {
}
static function getRunningQueues() {
return SendingQueueModel::orderByAsc('priority')
->orderByAsc('created_at')
->whereNull('deleted_at')
->whereNull('status')
->whereNull('type')
->findMany();
return SendingTask::getRunningQueues();
}
}