Extract subscribers to a separate table in the Bounce worker [MAILPOET-940]

This commit is contained in:
stoletniy
2017-07-11 16:30:44 +03:00
parent b24c51d800
commit 6a9b8d88c2
8 changed files with 162 additions and 83 deletions

View File

@ -70,6 +70,7 @@ class Database {
$subscriber_custom_field = Env::$db_prefix . 'subscriber_custom_field'; $subscriber_custom_field = Env::$db_prefix . 'subscriber_custom_field';
$newsletter_segment = Env::$db_prefix . 'newsletter_segment'; $newsletter_segment = Env::$db_prefix . 'newsletter_segment';
$scheduled_tasks = Env::$db_prefix . 'scheduled_tasks'; $scheduled_tasks = Env::$db_prefix . 'scheduled_tasks';
$scheduled_task_subscribers = Env::$db_prefix . 'scheduled_task_subscribers';
$sending_queues = Env::$db_prefix . 'sending_queues'; $sending_queues = Env::$db_prefix . 'sending_queues';
$newsletters = Env::$db_prefix . 'newsletters'; $newsletters = Env::$db_prefix . 'newsletters';
$newsletter_templates = Env::$db_prefix . 'newsletter_templates'; $newsletter_templates = Env::$db_prefix . 'newsletter_templates';
@ -92,6 +93,7 @@ class Database {
define('MP_SUBSCRIBER_SEGMENT_TABLE', $subscriber_segment); define('MP_SUBSCRIBER_SEGMENT_TABLE', $subscriber_segment);
define('MP_SUBSCRIBER_CUSTOM_FIELD_TABLE', $subscriber_custom_field); define('MP_SUBSCRIBER_CUSTOM_FIELD_TABLE', $subscriber_custom_field);
define('MP_SCHEDULED_TASKS_TABLE', $scheduled_tasks); define('MP_SCHEDULED_TASKS_TABLE', $scheduled_tasks);
define('MP_SCHEDULED_TASK_SUBSCRIBERS_TABLE', $scheduled_task_subscribers);
define('MP_SENDING_QUEUES_TABLE', $sending_queues); define('MP_SENDING_QUEUES_TABLE', $sending_queues);
define('MP_NEWSLETTERS_TABLE', $newsletters); define('MP_NEWSLETTERS_TABLE', $newsletters);
define('MP_NEWSLETTER_TEMPLATES_TABLE', $newsletter_templates); define('MP_NEWSLETTER_TEMPLATES_TABLE', $newsletter_templates);

View File

@ -18,6 +18,7 @@ class Migrator {
'settings', 'settings',
'custom_fields', 'custom_fields',
'scheduled_tasks', 'scheduled_tasks',
'scheduled_task_subscribers',
'sending_queues', 'sending_queues',
'subscribers', 'subscribers',
'subscriber_segment', 'subscriber_segment',
@ -109,12 +110,8 @@ class Migrator {
$attributes = array( $attributes = array(
'id mediumint(9) NOT NULL AUTO_INCREMENT,', 'id mediumint(9) NOT NULL AUTO_INCREMENT,',
'type varchar(90) NULL DEFAULT NULL,', 'type varchar(90) NULL DEFAULT NULL,',
'subscribers longtext,',
'status varchar(12) NULL DEFAULT NULL,', 'status varchar(12) NULL DEFAULT NULL,',
'priority mediumint(9) NOT NULL DEFAULT 0,', 'priority mediumint(9) NOT NULL DEFAULT 0,',
'count_total mediumint(9) NOT NULL DEFAULT 0,',
'count_processed mediumint(9) NOT NULL DEFAULT 0,',
'count_to_process mediumint(9) NOT NULL DEFAULT 0,',
'scheduled_at TIMESTAMP NULL,', 'scheduled_at TIMESTAMP NULL,',
'processed_at TIMESTAMP NULL,', 'processed_at TIMESTAMP NULL,',
'created_at TIMESTAMP NULL,', 'created_at TIMESTAMP NULL,',
@ -125,6 +122,17 @@ class Migrator {
return $this->sqlify(__FUNCTION__, $attributes); return $this->sqlify(__FUNCTION__, $attributes);
} }
function scheduledTaskSubscribers() {
$attributes = array(
'task_id mediumint(9) NOT NULL,',
'subscriber_id mediumint(9) NOT NULL,',
'processed int(1) NULL,',
'created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,',
'PRIMARY KEY (task_id, subscriber_id)',
);
return $this->sqlify(__FUNCTION__, $attributes);
}
function sendingQueues() { function sendingQueues() {
$attributes = array( $attributes = array(
'id mediumint(9) NOT NULL AUTO_INCREMENT,', 'id mediumint(9) NOT NULL AUTO_INCREMENT,',

View File

@ -4,6 +4,8 @@ namespace MailPoet\Cron\Workers;
use MailPoet\Cron\CronHelper; use MailPoet\Cron\CronHelper;
use MailPoet\Mailer\Mailer; use MailPoet\Mailer\Mailer;
use MailPoet\Models\ScheduledTask; use MailPoet\Models\ScheduledTask;
use MailPoet\Models\ScheduledTaskSubscriber;
use MailPoet\Tasks\Subscribers\BatchIterator;
use MailPoet\Models\Subscriber; use MailPoet\Models\Subscriber;
use MailPoet\Services\Bridge; use MailPoet\Services\Bridge;
use MailPoet\Services\Bridge\API; use MailPoet\Services\Bridge\API;
@ -48,28 +50,19 @@ class Bounce extends SimpleWorker {
} }
// update current task // update current task
$task->subscribers = serialize( ScheduledTaskSubscriber::addSubscribers($task->id, $subscribers);
array(
'to_process' => $subscribers
)
);
$task->count_total = $task->count_to_process = count($subscribers);
return parent::prepareTask($task); return parent::prepareTask($task);
} }
function processTask(ScheduledTask $task) { function processTask(ScheduledTask $task) {
$task->subscribers = $task->getSubscribers(); $subscriber_batches = new BatchIterator($task->id, self::BATCH_SIZE);
if(empty($task->subscribers['to_process'])) {
if(count($subscriber_batches) === 0) {
$task->delete(); $task->delete();
return false; return false;
} }
$subscriber_batches = array_chunk(
$task->subscribers['to_process'],
self::BATCH_SIZE
);
foreach($subscriber_batches as $subscribers_to_process_ids) { foreach($subscriber_batches as $subscribers_to_process_ids) {
// abort if execution limit is reached // abort if execution limit is reached
CronHelper::enforceExecutionLimit($this->timer); CronHelper::enforceExecutionLimit($this->timer);

View File

@ -11,6 +11,10 @@ class ScheduledTask extends Model {
const PRIORITY_MEDIUM = 5; const PRIORITY_MEDIUM = 5;
const PRIORITY_LOW = 10; const PRIORITY_LOW = 10;
function taskSubscribers() {
return $this->has_many(__NAMESPACE__ . '\ScheduledTaskSubscriber', 'task_id', 'id');
}
function complete() { function complete() {
$this->set('status', self::STATUS_COMPLETED); $this->set('status', self::STATUS_COMPLETED);
$this->save(); $this->save();
@ -18,79 +22,28 @@ class ScheduledTask extends Model {
} }
function save() { function save() {
if(!is_serialized($this->subscribers)) {
$this->set('subscribers', serialize($this->subscribers));
}
// set the default priority to medium // set the default priority to medium
if(!$this->priority) { if(!$this->priority) {
$this->priority = self::PRIORITY_MEDIUM; $this->priority = self::PRIORITY_MEDIUM;
} }
parent::save(); parent::save();
$this->subscribers = $this->getSubscribers();
return $this; return $this;
} }
function getSubscribers() { function updateProcessedSubscribers(array $processed_subscribers) {
if(!is_serialized($this->subscribers)) { $this->taskSubscribers()
return $this->subscribers; ->whereIn('subscriber_id', $processed_subscribers)
} ->findResultSet()
$subscribers = unserialize($this->subscribers); ->set('processed', ScheduledTaskSubscriber::STATUS_PROCESSED)
if(empty($subscribers['processed'])) { ->save();
$subscribers['processed'] = array(); $this->checkCompleted();
}
return $subscribers;
} }
function isSubscriberProcessed($subscriber_id) { private function checkCompleted() {
$subscribers = $this->getSubscribers(); if(!ScheduledTaskSubscriber::getToProcessCount($this->id)) {
return in_array($subscriber_id, $subscribers['processed']);
}
function asArray() {
$model = parent::asArray();
$model['subscribers'] = (is_serialized($this->subscribers))
? unserialize($this->subscribers)
: $this->subscribers;
return $model;
}
function removeNonexistentSubscribers($subscribers_to_remove) {
$subscribers = $this->getSubscribers();
$subscribers['to_process'] = array_values(
array_diff(
$subscribers['to_process'],
$subscribers_to_remove
)
);
$this->subscribers = $subscribers;
$this->updateCount();
}
function updateProcessedSubscribers($processed_subscribers) {
$subscribers = $this->getSubscribers();
$subscribers['processed'] = array_merge(
$subscribers['processed'],
$processed_subscribers
);
$subscribers['to_process'] = array_values(
array_diff(
$subscribers['to_process'],
$processed_subscribers
)
);
$this->subscribers = $subscribers;
$this->updateCount();
}
function updateCount() {
$this->subscribers = $this->getSubscribers();
$this->count_processed = count($this->subscribers['processed']);
$this->count_to_process = count($this->subscribers['to_process']);
$this->count_total = $this->count_processed + $this->count_to_process;
if(!$this->count_to_process) {
$this->processed_at = current_time('mysql'); $this->processed_at = current_time('mysql');
$this->status = self::STATUS_COMPLETED; $this->status = self::STATUS_COMPLETED;
}
return $this->save(); return $this->save();
} }
} }
}

View File

@ -0,0 +1,61 @@
<?php
namespace MailPoet\Models;
if(!defined('ABSPATH')) exit;
class ScheduledTaskSubscriber extends Model {
const STATUS_TO_PROCESS = 0;
const STATUS_PROCESSED = 1;
public static $_table = MP_SCHEDULED_TASK_SUBSCRIBERS_TABLE;
public static $_id_column = array('task_id', 'subscriber_id');
static function createOrUpdate($data = array()) {
if(!is_array($data) || empty($data['task_id']) || empty($data['subscriber_id'])) return;
$task_subscriber = self::where('subscriber_id', $data['subscriber_id'])
->where('task_id', $data['task_id'])
->findOne();
if(empty($task_subscriber)) $task_subscriber = self::create();
$task_subscriber->task_id = $data['task_id'];
$task_subscriber->subscriber_id = $data['subscriber_id'];
$task_subscriber->processed = self::STATUS_TO_PROCESS;
$task_subscriber->save();
return $task_subscriber;
}
static function addSubscribers($task_id, array $subscriber_ids) {
foreach($subscriber_ids as $subscriber_id) {
self::createOrUpdate(array(
'task_id' => $task_id,
'subscriber_id' => $subscriber_id
));
}
}
static function getToProcessCount($task_id) {
return self::getCount($task_id, false);
}
static function getProcessedCount($task_id) {
return self::getCount($task_id, true);
}
static function getTotalCount($task_id) {
return self::getCount($task_id);
}
private static function getCount($task_id, $processed = null) {
$orm = self::where('task_id', $task_id);
if(!is_null($processed)) {
$orm->where(
'processed',
($processed) ? self::STATUS_PROCESSED : self::STATUS_TO_PROCESS
);
}
return $orm->count();
}
}

View File

@ -0,0 +1,59 @@
<?php
namespace MailPoet\Tasks\Subscribers;
use MailPoet\Models\ScheduledTaskSubscriber;
use MailPoet\Util\Helpers;
if(!defined('ABSPATH')) exit;
class BatchIterator implements \Iterator, \Countable {
private $task_id;
private $batch_size;
private $offset = 0;
function __construct($task_id, $batch_size) {
if($task_id <= 0) {
throw new \Exception('Task ID must be greater than zero');
} elseif($batch_size <= 0) {
throw new \Exception('Batch size must be greater than zero');
}
$this->task_id = (int)$task_id;
$this->batch_size = (int)$batch_size;
}
function rewind() {
$this->offset = 0;
}
function current() {
$subscribers = $this->getSubscribers()
->orderByAsc('subscriber_id')
->limit($this->batch_size)
->offset($this->offset)
->findArray();
$subscribers = Helpers::arrayColumn($subscribers, 'subscriber_id');
return $subscribers;
}
function key() {
return $this->offset;
}
function next() {
$this->offset += $this->batch_size;
}
function valid() {
return $this->offset < $this->count();
}
function count() {
return $this->getSubscribers()->count();
}
private function getSubscribers() {
return ScheduledTaskSubscriber::select('subscriber_id')
->where('task_id', $this->task_id)
->where('processed', ScheduledTaskSubscriber::STATUS_TO_PROCESS);
}
}

View File

@ -19,6 +19,7 @@ $models = array(
'NewsletterOptionField', 'NewsletterOptionField',
'Segment', 'Segment',
'ScheduledTask', 'ScheduledTask',
'ScheduledTaskSubscriber',
'SendingQueue', 'SendingQueue',
'Setting', 'Setting',
'Subscriber', 'Subscriber',

View File

@ -4,6 +4,7 @@ use Carbon\Carbon;
use MailPoet\Cron\Workers\Bounce; use MailPoet\Cron\Workers\Bounce;
use MailPoet\Mailer\Mailer; use MailPoet\Mailer\Mailer;
use MailPoet\Models\ScheduledTask; use MailPoet\Models\ScheduledTask;
use MailPoet\Models\ScheduledTaskSubscriber;
use MailPoet\Models\Setting; use MailPoet\Models\Setting;
use MailPoet\Models\Subscriber; use MailPoet\Models\Subscriber;
use MailPoet\Services\Bridge\API; use MailPoet\Services\Bridge\API;
@ -58,10 +59,10 @@ class BounceTest extends MailPoetTest {
function testItPreparesTask() { function testItPreparesTask() {
$task = $this->createScheduledTask(); $task = $this->createScheduledTask();
expect(empty($task->subscribers['to_process']))->true(); expect(ScheduledTaskSubscriber::getToProcessCount($task->id))->isEmpty();
$this->worker->prepareTask($task); $this->worker->prepareTask($task);
expect($task->status)->null(); expect($task->status)->null();
expect(!empty($task->subscribers['to_process']))->true(); expect(ScheduledTaskSubscriber::getToProcessCount($task->id))->notEmpty();
} }
function testItDeletesTaskIfThereAreNoSubscribersToProcessWhenProcessingTask() { function testItDeletesTaskIfThereAreNoSubscribersToProcessWhenProcessingTask() {
@ -76,9 +77,9 @@ class BounceTest extends MailPoetTest {
function testItProcessesTask() { function testItProcessesTask() {
$task = $this->createRunningTask(); $task = $this->createRunningTask();
$this->worker->prepareTask($task); $this->worker->prepareTask($task);
expect(!empty($task->subscribers['to_process']))->true(); expect(ScheduledTaskSubscriber::getToProcessCount($task->id))->notEmpty();
$this->worker->processTask($task); $this->worker->processTask($task);
expect(!empty($task->subscribers['processed']))->true(); expect(ScheduledTaskSubscriber::getProcessedCount($task->id))->notEmpty();
} }
function testItSetsSubscriberStatusAsBounced() { function testItSetsSubscriberStatusAsBounced() {
@ -125,6 +126,7 @@ class BounceTest extends MailPoetTest {
function _after() { function _after() {
ORM::raw_execute('TRUNCATE ' . Setting::$_table); ORM::raw_execute('TRUNCATE ' . Setting::$_table);
ORM::raw_execute('TRUNCATE ' . ScheduledTask::$_table); ORM::raw_execute('TRUNCATE ' . ScheduledTask::$_table);
ORM::raw_execute('TRUNCATE ' . ScheduledTaskSubscriber::$_table);
ORM::raw_execute('TRUNCATE ' . Subscriber::$_table); ORM::raw_execute('TRUNCATE ' . Subscriber::$_table);
} }
} }