Convert variable names to camel case
[MAILPOET-1796]
This commit is contained in:
@ -17,26 +17,26 @@ class Migration extends SimpleWorker {
|
||||
|
||||
public function checkProcessingRequirements() {
|
||||
// if migration was completed, don't run it again
|
||||
$completed_tasks = $this->getCompletedTasks();
|
||||
return empty($completed_tasks);
|
||||
$completedTasks = $this->getCompletedTasks();
|
||||
return empty($completedTasks);
|
||||
}
|
||||
|
||||
public function prepareTaskStrategy(ScheduledTask $task, $timer) {
|
||||
$unmigrated_columns = $this->checkUnmigratedColumnsExist();
|
||||
$unmigrated_queues_count = 0;
|
||||
$unmigrated_queue_subscribers = [];
|
||||
$unmigratedColumns = $this->checkUnmigratedColumnsExist();
|
||||
$unmigratedQueuesCount = 0;
|
||||
$unmigratedQueueSubscribers = [];
|
||||
|
||||
if ($unmigrated_columns) {
|
||||
$unmigrated_queues_count = $this->getUnmigratedQueues()->count();
|
||||
$unmigrated_queue_subscribers = $this->getTaskIdsForUnmigratedSubscribers();
|
||||
if ($unmigratedColumns) {
|
||||
$unmigratedQueuesCount = $this->getUnmigratedQueues()->count();
|
||||
$unmigratedQueueSubscribers = $this->getTaskIdsForUnmigratedSubscribers();
|
||||
}
|
||||
|
||||
if (!$unmigrated_columns ||
|
||||
($unmigrated_queues_count == 0
|
||||
&& count($unmigrated_queue_subscribers) == 0)
|
||||
if (!$unmigratedColumns ||
|
||||
($unmigratedQueuesCount == 0
|
||||
&& count($unmigratedQueueSubscribers) == 0)
|
||||
) {
|
||||
// nothing to migrate, complete task
|
||||
$task->processed_at = WPFunctions::get()->currentTime('mysql');
|
||||
$task->processedAt = WPFunctions::get()->currentTime('mysql');
|
||||
$task->status = ScheduledTask::STATUS_COMPLETED;
|
||||
$task->save();
|
||||
$this->resumeSending();
|
||||
@ -49,26 +49,26 @@ class Migration extends SimpleWorker {
|
||||
}
|
||||
|
||||
public function pauseSending() {
|
||||
$mailer_log = MailerLog::getMailerLog();
|
||||
if (MailerLog::isSendingPaused($mailer_log)) {
|
||||
$mailerLog = MailerLog::getMailerLog();
|
||||
if (MailerLog::isSendingPaused($mailerLog)) {
|
||||
// sending is already paused
|
||||
return false;
|
||||
}
|
||||
$mailer_log = MailerLog::setError(
|
||||
$mailer_log,
|
||||
$mailerLog = MailerLog::setError(
|
||||
$mailerLog,
|
||||
'migration',
|
||||
WPFunctions::get()->__('Your sending queue data is being migrated to allow better performance, sending is paused while the migration is in progress and will resume automatically upon completion. This may take a few minutes.')
|
||||
);
|
||||
return MailerLog::pauseSending($mailer_log);
|
||||
return MailerLog::pauseSending($mailerLog);
|
||||
}
|
||||
|
||||
public function resumeSending() {
|
||||
$mailer_log = MailerLog::getMailerLog();
|
||||
if (!MailerLog::isSendingPaused($mailer_log)) {
|
||||
$mailerLog = MailerLog::getMailerLog();
|
||||
if (!MailerLog::isSendingPaused($mailerLog)) {
|
||||
// sending is not paused
|
||||
return false;
|
||||
}
|
||||
$error = MailerLog::getError($mailer_log);
|
||||
$error = MailerLog::getError($mailerLog);
|
||||
// only resume sending if it was paused by migration
|
||||
if (isset($error['operation']) && $error['operation'] === 'migration') {
|
||||
return MailerLog::resumeSending();
|
||||
@ -84,8 +84,8 @@ class Migration extends SimpleWorker {
|
||||
|
||||
private function checkUnmigratedColumnsExist() {
|
||||
global $wpdb;
|
||||
$existing_columns = $wpdb->get_col('DESC ' . SendingQueueModel::$_table);
|
||||
return in_array('type', $existing_columns);
|
||||
$existingColumns = $wpdb->get_col('DESC ' . SendingQueueModel::$_table);
|
||||
return in_array('type', $existingColumns);
|
||||
}
|
||||
|
||||
public function getUnmigratedQueues() {
|
||||
@ -117,7 +117,7 @@ class Migration extends SimpleWorker {
|
||||
->select('id')
|
||||
->findArray();
|
||||
|
||||
$column_list = [
|
||||
$columnList = [
|
||||
'status',
|
||||
'priority',
|
||||
'scheduled_at',
|
||||
@ -128,26 +128,26 @@ class Migration extends SimpleWorker {
|
||||
];
|
||||
|
||||
if (!empty($queues)) {
|
||||
foreach (array_chunk($queues, self::BATCH_SIZE) as $queue_batch) {
|
||||
foreach (array_chunk($queues, self::BATCH_SIZE) as $queueBatch) {
|
||||
// abort if execution limit is reached
|
||||
$this->cron_helper->enforceExecutionLimit($timer);
|
||||
$this->cronHelper->enforceExecutionLimit($timer);
|
||||
|
||||
foreach ($queue_batch as $queue) {
|
||||
foreach ($queueBatch as $queue) {
|
||||
// create a new scheduled task of type "sending"
|
||||
$wpdb->query(sprintf(
|
||||
'INSERT IGNORE INTO %1$s (`type`, %2$s) ' .
|
||||
'SELECT "sending", %2$s FROM %3$s WHERE `id` = %4$s',
|
||||
MP_SCHEDULED_TASKS_TABLE,
|
||||
'`' . join('`, `', $column_list) . '`',
|
||||
'`' . join('`, `', $columnList) . '`',
|
||||
MP_SENDING_QUEUES_TABLE,
|
||||
$queue['id']
|
||||
));
|
||||
// link the queue with the task via task_id
|
||||
$new_task_id = $wpdb->insert_id;
|
||||
$newTaskId = $wpdb->insertId;
|
||||
$wpdb->query(sprintf(
|
||||
'UPDATE %1$s SET `task_id` = %2$s WHERE `id` = %3$s',
|
||||
MP_SENDING_QUEUES_TABLE,
|
||||
$new_task_id,
|
||||
$newTaskId,
|
||||
$queue['id']
|
||||
));
|
||||
}
|
||||
@ -164,43 +164,43 @@ class Migration extends SimpleWorker {
|
||||
global $wpdb;
|
||||
|
||||
// find in-progress queues that have serialized subscribers
|
||||
$task_ids = $this->getTaskIdsForUnmigratedSubscribers();
|
||||
$taskIds = $this->getTaskIdsForUnmigratedSubscribers();
|
||||
|
||||
// check if subscribers for each one were already migrated
|
||||
if (!empty($task_ids)) {
|
||||
$task_ids = $wpdb->get_col(sprintf(
|
||||
'SELECT queues.`task_id` FROM %1$s queues WHERE queues.`task_id` IN(' . join(',', array_map('intval', $task_ids)) . ') ' .
|
||||
if (!empty($taskIds)) {
|
||||
$taskIds = $wpdb->get_col(sprintf(
|
||||
'SELECT queues.`task_id` FROM %1$s queues WHERE queues.`task_id` IN(' . join(',', array_map('intval', $taskIds)) . ') ' .
|
||||
'AND queues.`count_total` > (SELECT COUNT(*) FROM %2$s subs WHERE subs.`task_id` = queues.`task_id`)',
|
||||
MP_SENDING_QUEUES_TABLE,
|
||||
MP_SCHEDULED_TASK_SUBSCRIBERS_TABLE
|
||||
));
|
||||
}
|
||||
|
||||
if (!empty($task_ids)) {
|
||||
foreach ($task_ids as $task_id) {
|
||||
if (!empty($taskIds)) {
|
||||
foreach ($taskIds as $taskId) {
|
||||
// abort if execution limit is reached
|
||||
$this->cron_helper->enforceExecutionLimit($timer);
|
||||
$this->cronHelper->enforceExecutionLimit($timer);
|
||||
|
||||
$this->migrateTaskSubscribers($task_id, $timer);
|
||||
$this->migrateTaskSubscribers($taskId, $timer);
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
public function migrateTaskSubscribers($task_id, $timer) {
|
||||
public function migrateTaskSubscribers($taskId, $timer) {
|
||||
global $wpdb;
|
||||
|
||||
$migrated_unprocessed_count = ScheduledTaskSubscriber::getUnprocessedCount($task_id);
|
||||
$migrated_processed_count = ScheduledTaskSubscriber::getProcessedCount($task_id);
|
||||
$migratedUnprocessedCount = ScheduledTaskSubscriber::getUnprocessedCount($taskId);
|
||||
$migratedProcessedCount = ScheduledTaskSubscriber::getProcessedCount($taskId);
|
||||
|
||||
$subscribers = $wpdb->get_var(sprintf(
|
||||
'SELECT `subscribers` FROM %1$s WHERE `task_id` = %2$d ' .
|
||||
'AND (`count_processed` > %3$d OR `count_to_process` > %4$d)',
|
||||
MP_SENDING_QUEUES_TABLE,
|
||||
$task_id,
|
||||
$migrated_unprocessed_count,
|
||||
$migrated_processed_count
|
||||
$taskId,
|
||||
$migratedUnprocessedCount,
|
||||
$migratedProcessedCount
|
||||
));
|
||||
|
||||
// sanity check
|
||||
@ -211,28 +211,28 @@ class Migration extends SimpleWorker {
|
||||
$subscribers = unserialize($subscribers);
|
||||
|
||||
if (!empty($subscribers['to_process'])) {
|
||||
$subscribers_to_migrate = array_slice($subscribers['to_process'], $migrated_unprocessed_count);
|
||||
foreach ($subscribers_to_migrate as $sub_id) {
|
||||
$subscribersToMigrate = array_slice($subscribers['to_process'], $migratedUnprocessedCount);
|
||||
foreach ($subscribersToMigrate as $subId) {
|
||||
// abort if execution limit is reached
|
||||
$this->cron_helper->enforceExecutionLimit($timer);
|
||||
$this->cronHelper->enforceExecutionLimit($timer);
|
||||
|
||||
ScheduledTaskSubscriber::createOrUpdate([
|
||||
'task_id' => $task_id,
|
||||
'subscriber_id' => $sub_id,
|
||||
'task_id' => $taskId,
|
||||
'subscriber_id' => $subId,
|
||||
'processed' => ScheduledTaskSubscriber::STATUS_UNPROCESSED,
|
||||
]);
|
||||
}
|
||||
}
|
||||
|
||||
if (!empty($subscribers['processed'])) {
|
||||
$subscribers_to_migrate = array_slice($subscribers['processed'], $migrated_processed_count);
|
||||
foreach ($subscribers_to_migrate as $sub_id) {
|
||||
$subscribersToMigrate = array_slice($subscribers['processed'], $migratedProcessedCount);
|
||||
foreach ($subscribersToMigrate as $subId) {
|
||||
// abort if execution limit is reached
|
||||
$this->cron_helper->enforceExecutionLimit($timer);
|
||||
$this->cronHelper->enforceExecutionLimit($timer);
|
||||
|
||||
ScheduledTaskSubscriber::createOrUpdate([
|
||||
'task_id' => $task_id,
|
||||
'subscriber_id' => $sub_id,
|
||||
'task_id' => $taskId,
|
||||
'subscriber_id' => $subId,
|
||||
'processed' => ScheduledTaskSubscriber::STATUS_PROCESSED,
|
||||
]);
|
||||
}
|
||||
|
Reference in New Issue
Block a user