Merge pull request #997 from mailpoet/scheduled_task_subscribers
Extract subscribers to a separate table in the Bounce worker [MAILPOET-987]
This commit is contained in:
@@ -70,6 +70,7 @@ class Database {
|
|||||||
$subscriber_custom_field = Env::$db_prefix . 'subscriber_custom_field';
|
$subscriber_custom_field = Env::$db_prefix . 'subscriber_custom_field';
|
||||||
$newsletter_segment = Env::$db_prefix . 'newsletter_segment';
|
$newsletter_segment = Env::$db_prefix . 'newsletter_segment';
|
||||||
$scheduled_tasks = Env::$db_prefix . 'scheduled_tasks';
|
$scheduled_tasks = Env::$db_prefix . 'scheduled_tasks';
|
||||||
|
$scheduled_task_subscribers = Env::$db_prefix . 'scheduled_task_subscribers';
|
||||||
$sending_queues = Env::$db_prefix . 'sending_queues';
|
$sending_queues = Env::$db_prefix . 'sending_queues';
|
||||||
$newsletters = Env::$db_prefix . 'newsletters';
|
$newsletters = Env::$db_prefix . 'newsletters';
|
||||||
$newsletter_templates = Env::$db_prefix . 'newsletter_templates';
|
$newsletter_templates = Env::$db_prefix . 'newsletter_templates';
|
||||||
@@ -92,6 +93,7 @@ class Database {
|
|||||||
define('MP_SUBSCRIBER_SEGMENT_TABLE', $subscriber_segment);
|
define('MP_SUBSCRIBER_SEGMENT_TABLE', $subscriber_segment);
|
||||||
define('MP_SUBSCRIBER_CUSTOM_FIELD_TABLE', $subscriber_custom_field);
|
define('MP_SUBSCRIBER_CUSTOM_FIELD_TABLE', $subscriber_custom_field);
|
||||||
define('MP_SCHEDULED_TASKS_TABLE', $scheduled_tasks);
|
define('MP_SCHEDULED_TASKS_TABLE', $scheduled_tasks);
|
||||||
|
define('MP_SCHEDULED_TASK_SUBSCRIBERS_TABLE', $scheduled_task_subscribers);
|
||||||
define('MP_SENDING_QUEUES_TABLE', $sending_queues);
|
define('MP_SENDING_QUEUES_TABLE', $sending_queues);
|
||||||
define('MP_NEWSLETTERS_TABLE', $newsletters);
|
define('MP_NEWSLETTERS_TABLE', $newsletters);
|
||||||
define('MP_NEWSLETTER_TEMPLATES_TABLE', $newsletter_templates);
|
define('MP_NEWSLETTER_TEMPLATES_TABLE', $newsletter_templates);
|
||||||
|
@@ -18,6 +18,7 @@ class Migrator {
|
|||||||
'settings',
|
'settings',
|
||||||
'custom_fields',
|
'custom_fields',
|
||||||
'scheduled_tasks',
|
'scheduled_tasks',
|
||||||
|
'scheduled_task_subscribers',
|
||||||
'sending_queues',
|
'sending_queues',
|
||||||
'subscribers',
|
'subscribers',
|
||||||
'subscriber_segment',
|
'subscriber_segment',
|
||||||
@@ -109,12 +110,8 @@ class Migrator {
|
|||||||
$attributes = array(
|
$attributes = array(
|
||||||
'id mediumint(9) NOT NULL AUTO_INCREMENT,',
|
'id mediumint(9) NOT NULL AUTO_INCREMENT,',
|
||||||
'type varchar(90) NULL DEFAULT NULL,',
|
'type varchar(90) NULL DEFAULT NULL,',
|
||||||
'subscribers longtext,',
|
|
||||||
'status varchar(12) NULL DEFAULT NULL,',
|
'status varchar(12) NULL DEFAULT NULL,',
|
||||||
'priority mediumint(9) NOT NULL DEFAULT 0,',
|
'priority mediumint(9) NOT NULL DEFAULT 0,',
|
||||||
'count_total mediumint(9) NOT NULL DEFAULT 0,',
|
|
||||||
'count_processed mediumint(9) NOT NULL DEFAULT 0,',
|
|
||||||
'count_to_process mediumint(9) NOT NULL DEFAULT 0,',
|
|
||||||
'scheduled_at TIMESTAMP NULL,',
|
'scheduled_at TIMESTAMP NULL,',
|
||||||
'processed_at TIMESTAMP NULL,',
|
'processed_at TIMESTAMP NULL,',
|
||||||
'created_at TIMESTAMP NULL,',
|
'created_at TIMESTAMP NULL,',
|
||||||
@@ -125,6 +122,17 @@ class Migrator {
|
|||||||
return $this->sqlify(__FUNCTION__, $attributes);
|
return $this->sqlify(__FUNCTION__, $attributes);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
function scheduledTaskSubscribers() {
|
||||||
|
$attributes = array(
|
||||||
|
'task_id mediumint(9) NOT NULL,',
|
||||||
|
'subscriber_id mediumint(9) NOT NULL,',
|
||||||
|
'processed int(1) NOT NULL,',
|
||||||
|
'created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,',
|
||||||
|
'PRIMARY KEY (task_id, subscriber_id)',
|
||||||
|
);
|
||||||
|
return $this->sqlify(__FUNCTION__, $attributes);
|
||||||
|
}
|
||||||
|
|
||||||
function sendingQueues() {
|
function sendingQueues() {
|
||||||
$attributes = array(
|
$attributes = array(
|
||||||
'id mediumint(9) NOT NULL AUTO_INCREMENT,',
|
'id mediumint(9) NOT NULL AUTO_INCREMENT,',
|
||||||
|
@@ -4,6 +4,10 @@ namespace MailPoet\Cron\Workers;
|
|||||||
use MailPoet\Cron\CronHelper;
|
use MailPoet\Cron\CronHelper;
|
||||||
use MailPoet\Mailer\Mailer;
|
use MailPoet\Mailer\Mailer;
|
||||||
use MailPoet\Models\ScheduledTask;
|
use MailPoet\Models\ScheduledTask;
|
||||||
|
use MailPoet\Models\ScheduledTaskSubscriber;
|
||||||
|
use MailPoet\Tasks\Bounce as BounceTask;
|
||||||
|
use MailPoet\Tasks\Subscribers as TaskSubscribers;
|
||||||
|
use MailPoet\Tasks\Subscribers\BatchIterator;
|
||||||
use MailPoet\Models\Subscriber;
|
use MailPoet\Models\Subscriber;
|
||||||
use MailPoet\Services\Bridge;
|
use MailPoet\Services\Bridge;
|
||||||
use MailPoet\Services\Bridge\API;
|
use MailPoet\Services\Bridge\API;
|
||||||
@@ -33,42 +37,25 @@ class Bounce extends SimpleWorker {
|
|||||||
}
|
}
|
||||||
|
|
||||||
function prepareTask(ScheduledTask $task) {
|
function prepareTask(ScheduledTask $task) {
|
||||||
$subscribers = Subscriber::select('id')
|
BounceTask::prepareSubscribers($task);
|
||||||
->whereNull('deleted_at')
|
|
||||||
->whereIn('status', array(
|
|
||||||
Subscriber::STATUS_SUBSCRIBED,
|
|
||||||
Subscriber::STATUS_UNCONFIRMED
|
|
||||||
))
|
|
||||||
->findArray();
|
|
||||||
$subscribers = Helpers::arrayColumn($subscribers, 'id');
|
|
||||||
|
|
||||||
if(empty($subscribers)) {
|
if(!ScheduledTaskSubscriber::getUnprocessedCount($task->id)) {
|
||||||
$task->delete();
|
$task->delete();
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
// update current task
|
|
||||||
$task->subscribers = serialize(
|
|
||||||
array(
|
|
||||||
'to_process' => $subscribers
|
|
||||||
)
|
|
||||||
);
|
|
||||||
$task->count_total = $task->count_to_process = count($subscribers);
|
|
||||||
|
|
||||||
return parent::prepareTask($task);
|
return parent::prepareTask($task);
|
||||||
}
|
}
|
||||||
|
|
||||||
function processTask(ScheduledTask $task) {
|
function processTask(ScheduledTask $task) {
|
||||||
$task->subscribers = $task->getSubscribers();
|
$subscriber_batches = new BatchIterator($task->id, self::BATCH_SIZE);
|
||||||
if(empty($task->subscribers['to_process'])) {
|
|
||||||
|
if(count($subscriber_batches) === 0) {
|
||||||
$task->delete();
|
$task->delete();
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
$subscriber_batches = array_chunk(
|
$task_subscribers = new TaskSubscribers($task);
|
||||||
$task->subscribers['to_process'],
|
|
||||||
self::BATCH_SIZE
|
|
||||||
);
|
|
||||||
|
|
||||||
foreach($subscriber_batches as $subscribers_to_process_ids) {
|
foreach($subscriber_batches as $subscribers_to_process_ids) {
|
||||||
// abort if execution limit is reached
|
// abort if execution limit is reached
|
||||||
@@ -82,7 +69,7 @@ class Bounce extends SimpleWorker {
|
|||||||
|
|
||||||
$this->processEmails($subscriber_emails);
|
$this->processEmails($subscriber_emails);
|
||||||
|
|
||||||
$task->updateProcessedSubscribers($subscribers_to_process_ids);
|
$task_subscribers->updateProcessedSubscribers($subscribers_to_process_ids);
|
||||||
}
|
}
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
|
@@ -64,11 +64,11 @@ class SendingQueue {
|
|||||||
}
|
}
|
||||||
// if some subscribers weren't found, remove them from the processing list
|
// if some subscribers weren't found, remove them from the processing list
|
||||||
if(count($found_subscribers_ids) !== count($subscribers_to_process_ids)) {
|
if(count($found_subscribers_ids) !== count($subscribers_to_process_ids)) {
|
||||||
$subscibers_to_remove = array_diff(
|
$subscribers_to_remove = array_diff(
|
||||||
$subscribers_to_process_ids,
|
$subscribers_to_process_ids,
|
||||||
$found_subscribers_ids
|
$found_subscribers_ids
|
||||||
);
|
);
|
||||||
$queue->removeNonexistentSubscribers($subscibers_to_remove);
|
$queue->removeSubscribers($subscribers_to_remove);
|
||||||
if(!count($queue->subscribers['to_process'])) {
|
if(!count($queue->subscribers['to_process'])) {
|
||||||
$this->newsletter_task->markNewsletterAsSent($newsletter, $queue);
|
$this->newsletter_task->markNewsletterAsSent($newsletter, $queue);
|
||||||
continue;
|
continue;
|
||||||
|
@@ -12,85 +12,18 @@ class ScheduledTask extends Model {
|
|||||||
const PRIORITY_LOW = 10;
|
const PRIORITY_LOW = 10;
|
||||||
|
|
||||||
function complete() {
|
function complete() {
|
||||||
|
$this->processed_at = current_time('mysql');
|
||||||
$this->set('status', self::STATUS_COMPLETED);
|
$this->set('status', self::STATUS_COMPLETED);
|
||||||
$this->save();
|
$this->save();
|
||||||
return ($this->getErrors() === false && $this->id() > 0);
|
return ($this->getErrors() === false && $this->id() > 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
function save() {
|
function save() {
|
||||||
if(!is_serialized($this->subscribers)) {
|
|
||||||
$this->set('subscribers', serialize($this->subscribers));
|
|
||||||
}
|
|
||||||
// set the default priority to medium
|
// set the default priority to medium
|
||||||
if(!$this->priority) {
|
if(!$this->priority) {
|
||||||
$this->priority = self::PRIORITY_MEDIUM;
|
$this->priority = self::PRIORITY_MEDIUM;
|
||||||
}
|
}
|
||||||
parent::save();
|
parent::save();
|
||||||
$this->subscribers = $this->getSubscribers();
|
|
||||||
return $this;
|
return $this;
|
||||||
}
|
}
|
||||||
|
}
|
||||||
function getSubscribers() {
|
|
||||||
if(!is_serialized($this->subscribers)) {
|
|
||||||
return $this->subscribers;
|
|
||||||
}
|
|
||||||
$subscribers = unserialize($this->subscribers);
|
|
||||||
if(empty($subscribers['processed'])) {
|
|
||||||
$subscribers['processed'] = array();
|
|
||||||
}
|
|
||||||
return $subscribers;
|
|
||||||
}
|
|
||||||
|
|
||||||
function isSubscriberProcessed($subscriber_id) {
|
|
||||||
$subscribers = $this->getSubscribers();
|
|
||||||
return in_array($subscriber_id, $subscribers['processed']);
|
|
||||||
}
|
|
||||||
|
|
||||||
function asArray() {
|
|
||||||
$model = parent::asArray();
|
|
||||||
$model['subscribers'] = (is_serialized($this->subscribers))
|
|
||||||
? unserialize($this->subscribers)
|
|
||||||
: $this->subscribers;
|
|
||||||
return $model;
|
|
||||||
}
|
|
||||||
|
|
||||||
function removeNonexistentSubscribers($subscribers_to_remove) {
|
|
||||||
$subscribers = $this->getSubscribers();
|
|
||||||
$subscribers['to_process'] = array_values(
|
|
||||||
array_diff(
|
|
||||||
$subscribers['to_process'],
|
|
||||||
$subscribers_to_remove
|
|
||||||
)
|
|
||||||
);
|
|
||||||
$this->subscribers = $subscribers;
|
|
||||||
$this->updateCount();
|
|
||||||
}
|
|
||||||
|
|
||||||
function updateProcessedSubscribers($processed_subscribers) {
|
|
||||||
$subscribers = $this->getSubscribers();
|
|
||||||
$subscribers['processed'] = array_merge(
|
|
||||||
$subscribers['processed'],
|
|
||||||
$processed_subscribers
|
|
||||||
);
|
|
||||||
$subscribers['to_process'] = array_values(
|
|
||||||
array_diff(
|
|
||||||
$subscribers['to_process'],
|
|
||||||
$processed_subscribers
|
|
||||||
)
|
|
||||||
);
|
|
||||||
$this->subscribers = $subscribers;
|
|
||||||
$this->updateCount();
|
|
||||||
}
|
|
||||||
|
|
||||||
function updateCount() {
|
|
||||||
$this->subscribers = $this->getSubscribers();
|
|
||||||
$this->count_processed = count($this->subscribers['processed']);
|
|
||||||
$this->count_to_process = count($this->subscribers['to_process']);
|
|
||||||
$this->count_total = $this->count_processed + $this->count_to_process;
|
|
||||||
if(!$this->count_to_process) {
|
|
||||||
$this->processed_at = current_time('mysql');
|
|
||||||
$this->status = self::STATUS_COMPLETED;
|
|
||||||
}
|
|
||||||
return $this->save();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
58
lib/Models/ScheduledTaskSubscriber.php
Normal file
58
lib/Models/ScheduledTaskSubscriber.php
Normal file
@@ -0,0 +1,58 @@
|
|||||||
|
<?php
|
||||||
|
namespace MailPoet\Models;
|
||||||
|
|
||||||
|
if(!defined('ABSPATH')) exit;
|
||||||
|
|
||||||
|
class ScheduledTaskSubscriber extends Model {
|
||||||
|
const STATUS_UNPROCESSED = 0;
|
||||||
|
const STATUS_PROCESSED = 1;
|
||||||
|
|
||||||
|
public static $_table = MP_SCHEDULED_TASK_SUBSCRIBERS_TABLE;
|
||||||
|
public static $_id_column = array('task_id', 'subscriber_id');
|
||||||
|
|
||||||
|
static function createOrUpdate($data = array()) {
|
||||||
|
if(!is_array($data) || empty($data['task_id']) || empty($data['subscriber_id'])) return;
|
||||||
|
|
||||||
|
$task_subscriber = self::where('subscriber_id', $data['subscriber_id'])
|
||||||
|
->where('task_id', $data['task_id'])
|
||||||
|
->findOne();
|
||||||
|
|
||||||
|
if(empty($task_subscriber)) $task_subscriber = self::create();
|
||||||
|
|
||||||
|
$task_subscriber->task_id = $data['task_id'];
|
||||||
|
$task_subscriber->subscriber_id = $data['subscriber_id'];
|
||||||
|
$task_subscriber->processed = !empty($data['processed']) ? self::STATUS_PROCESSED : self::STATUS_UNPROCESSED;
|
||||||
|
$task_subscriber->save();
|
||||||
|
|
||||||
|
return $task_subscriber;
|
||||||
|
}
|
||||||
|
|
||||||
|
static function addSubscribers($task_id, array $subscriber_ids) {
|
||||||
|
foreach($subscriber_ids as $subscriber_id) {
|
||||||
|
self::createOrUpdate(array(
|
||||||
|
'task_id' => $task_id,
|
||||||
|
'subscriber_id' => $subscriber_id
|
||||||
|
));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static function getUnprocessedCount($task_id) {
|
||||||
|
return self::getCount($task_id, self::STATUS_UNPROCESSED);
|
||||||
|
}
|
||||||
|
|
||||||
|
static function getProcessedCount($task_id) {
|
||||||
|
return self::getCount($task_id, self::STATUS_PROCESSED);
|
||||||
|
}
|
||||||
|
|
||||||
|
static function getTotalCount($task_id) {
|
||||||
|
return self::getCount($task_id);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static function getCount($task_id, $processed = null) {
|
||||||
|
$orm = self::where('task_id', $task_id);
|
||||||
|
if(!is_null($processed)) {
|
||||||
|
$orm->where('processed', $processed);
|
||||||
|
}
|
||||||
|
return $orm->count();
|
||||||
|
}
|
||||||
|
}
|
@@ -92,7 +92,7 @@ class SendingQueue extends Model {
|
|||||||
return $model;
|
return $model;
|
||||||
}
|
}
|
||||||
|
|
||||||
function removeNonexistentSubscribers($subscribers_to_remove) {
|
function removeSubscribers($subscribers_to_remove) {
|
||||||
$subscribers = $this->getSubscribers();
|
$subscribers = $this->getSubscribers();
|
||||||
$subscribers['to_process'] = array_values(
|
$subscribers['to_process'] = array_values(
|
||||||
array_diff(
|
array_diff(
|
||||||
|
28
lib/Tasks/Bounce.php
Normal file
28
lib/Tasks/Bounce.php
Normal file
@@ -0,0 +1,28 @@
|
|||||||
|
<?php
|
||||||
|
namespace MailPoet\Tasks;
|
||||||
|
|
||||||
|
use MailPoet\Models\ScheduledTask;
|
||||||
|
use MailPoet\Models\ScheduledTaskSubscriber;
|
||||||
|
use MailPoet\Models\Subscriber;
|
||||||
|
|
||||||
|
if(!defined('ABSPATH')) exit;
|
||||||
|
|
||||||
|
class Bounce {
|
||||||
|
static function prepareSubscribers(ScheduledTask $task) {
|
||||||
|
// Prepare subscribers on the DB side for performance reasons
|
||||||
|
Subscriber::rawExecute(
|
||||||
|
'INSERT INTO ' . MP_SCHEDULED_TASK_SUBSCRIBERS_TABLE . '
|
||||||
|
(task_id, subscriber_id, processed)
|
||||||
|
SELECT ? as task_id, s.`id` as subscriber_id, ? as processed
|
||||||
|
FROM ' . MP_SUBSCRIBERS_TABLE . ' s
|
||||||
|
WHERE s.`deleted_at` IS NULL
|
||||||
|
AND s.`status` IN (?, ?)',
|
||||||
|
array(
|
||||||
|
$task->id,
|
||||||
|
ScheduledTaskSubscriber::STATUS_UNPROCESSED,
|
||||||
|
Subscriber::STATUS_SUBSCRIBED,
|
||||||
|
Subscriber::STATUS_UNCONFIRMED
|
||||||
|
)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
48
lib/Tasks/Subscribers.php
Normal file
48
lib/Tasks/Subscribers.php
Normal file
@@ -0,0 +1,48 @@
|
|||||||
|
<?php
|
||||||
|
namespace MailPoet\Tasks;
|
||||||
|
|
||||||
|
use MailPoet\Models\ScheduledTask;
|
||||||
|
use MailPoet\Models\ScheduledTaskSubscriber;
|
||||||
|
|
||||||
|
if(!defined('ABSPATH')) exit;
|
||||||
|
|
||||||
|
class Subscribers {
|
||||||
|
private $task;
|
||||||
|
|
||||||
|
public function __construct(ScheduledTask $task) {
|
||||||
|
$this->task = $task;
|
||||||
|
}
|
||||||
|
|
||||||
|
function getSubscribers() {
|
||||||
|
return ScheduledTaskSubscriber::where('task_id', $this->task->id);
|
||||||
|
}
|
||||||
|
|
||||||
|
function isSubscriberProcessed($subscriber_id) {
|
||||||
|
$subscriber = $this->getSubscribers()
|
||||||
|
->where('subscriber_id', $subscriber_id)
|
||||||
|
->findOne();
|
||||||
|
return !empty($subscriber);
|
||||||
|
}
|
||||||
|
|
||||||
|
function removeSubscribers($subscribers_to_remove) {
|
||||||
|
$this->getSubscribers()
|
||||||
|
->whereIn('subscriber_id', $subscribers_to_remove)
|
||||||
|
->deleteMany();
|
||||||
|
$this->checkCompleted();
|
||||||
|
}
|
||||||
|
|
||||||
|
function updateProcessedSubscribers(array $processed_subscribers) {
|
||||||
|
$this->getSubscribers()
|
||||||
|
->whereIn('subscriber_id', $processed_subscribers)
|
||||||
|
->findResultSet()
|
||||||
|
->set('processed', ScheduledTaskSubscriber::STATUS_PROCESSED)
|
||||||
|
->save();
|
||||||
|
$this->checkCompleted();
|
||||||
|
}
|
||||||
|
|
||||||
|
private function checkCompleted() {
|
||||||
|
if(!ScheduledTaskSubscriber::getUnprocessedCount($this->task->id)) {
|
||||||
|
$this->task->complete();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
61
lib/Tasks/Subscribers/BatchIterator.php
Normal file
61
lib/Tasks/Subscribers/BatchIterator.php
Normal file
@@ -0,0 +1,61 @@
|
|||||||
|
<?php
|
||||||
|
namespace MailPoet\Tasks\Subscribers;
|
||||||
|
|
||||||
|
use MailPoet\Models\ScheduledTaskSubscriber;
|
||||||
|
use MailPoet\Util\Helpers;
|
||||||
|
|
||||||
|
if(!defined('ABSPATH')) exit;
|
||||||
|
|
||||||
|
class BatchIterator implements \Iterator, \Countable {
|
||||||
|
private $task_id;
|
||||||
|
private $batch_size;
|
||||||
|
private $last_processed_id = 0;
|
||||||
|
private $batch_last_id;
|
||||||
|
|
||||||
|
function __construct($task_id, $batch_size) {
|
||||||
|
if($task_id <= 0) {
|
||||||
|
throw new \Exception('Task ID must be greater than zero');
|
||||||
|
} elseif($batch_size <= 0) {
|
||||||
|
throw new \Exception('Batch size must be greater than zero');
|
||||||
|
}
|
||||||
|
$this->task_id = (int)$task_id;
|
||||||
|
$this->batch_size = (int)$batch_size;
|
||||||
|
}
|
||||||
|
|
||||||
|
function rewind() {
|
||||||
|
$this->last_processed_id = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
function current() {
|
||||||
|
$subscribers = $this->getSubscribers()
|
||||||
|
->orderByAsc('subscriber_id')
|
||||||
|
->limit($this->batch_size)
|
||||||
|
->findArray();
|
||||||
|
$subscribers = Helpers::arrayColumn($subscribers, 'subscriber_id');
|
||||||
|
$this->batch_last_id = end($subscribers);
|
||||||
|
return $subscribers;
|
||||||
|
}
|
||||||
|
|
||||||
|
function key() {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
function next() {
|
||||||
|
$this->last_processed_id = $this->batch_last_id;
|
||||||
|
}
|
||||||
|
|
||||||
|
function valid() {
|
||||||
|
return $this->count() > 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
function count() {
|
||||||
|
return $this->getSubscribers()->count();
|
||||||
|
}
|
||||||
|
|
||||||
|
private function getSubscribers() {
|
||||||
|
return ScheduledTaskSubscriber::select('subscriber_id')
|
||||||
|
->where('task_id', $this->task_id)
|
||||||
|
->whereGt('subscriber_id', $this->last_processed_id)
|
||||||
|
->where('processed', ScheduledTaskSubscriber::STATUS_UNPROCESSED);
|
||||||
|
}
|
||||||
|
}
|
@@ -19,6 +19,7 @@ $models = array(
|
|||||||
'NewsletterOptionField',
|
'NewsletterOptionField',
|
||||||
'Segment',
|
'Segment',
|
||||||
'ScheduledTask',
|
'ScheduledTask',
|
||||||
|
'ScheduledTaskSubscriber',
|
||||||
'SendingQueue',
|
'SendingQueue',
|
||||||
'Setting',
|
'Setting',
|
||||||
'Subscriber',
|
'Subscriber',
|
||||||
|
@@ -38,6 +38,6 @@ class PopulatorTest extends MailPoetTest {
|
|||||||
function _after() {
|
function _after() {
|
||||||
ORM::raw_execute('TRUNCATE ' . Newsletter::$_table);
|
ORM::raw_execute('TRUNCATE ' . Newsletter::$_table);
|
||||||
ORM::raw_execute('TRUNCATE ' . SendingQueue::$_table);
|
ORM::raw_execute('TRUNCATE ' . SendingQueue::$_table);
|
||||||
update_option('mailpoet_db_version', Env::$version);
|
update_option('mailpoet_db_version', MAILPOET_VERSION);
|
||||||
}
|
}
|
||||||
}
|
}
|
@@ -4,6 +4,7 @@ use Carbon\Carbon;
|
|||||||
use MailPoet\Cron\Workers\Bounce;
|
use MailPoet\Cron\Workers\Bounce;
|
||||||
use MailPoet\Mailer\Mailer;
|
use MailPoet\Mailer\Mailer;
|
||||||
use MailPoet\Models\ScheduledTask;
|
use MailPoet\Models\ScheduledTask;
|
||||||
|
use MailPoet\Models\ScheduledTaskSubscriber;
|
||||||
use MailPoet\Models\Setting;
|
use MailPoet\Models\Setting;
|
||||||
use MailPoet\Models\Subscriber;
|
use MailPoet\Models\Subscriber;
|
||||||
use MailPoet\Services\Bridge\API;
|
use MailPoet\Services\Bridge\API;
|
||||||
@@ -58,10 +59,10 @@ class BounceTest extends MailPoetTest {
|
|||||||
|
|
||||||
function testItPreparesTask() {
|
function testItPreparesTask() {
|
||||||
$task = $this->createScheduledTask();
|
$task = $this->createScheduledTask();
|
||||||
expect(empty($task->subscribers['to_process']))->true();
|
expect(ScheduledTaskSubscriber::getUnprocessedCount($task->id))->isEmpty();
|
||||||
$this->worker->prepareTask($task);
|
$this->worker->prepareTask($task);
|
||||||
expect($task->status)->null();
|
expect($task->status)->null();
|
||||||
expect(!empty($task->subscribers['to_process']))->true();
|
expect(ScheduledTaskSubscriber::getUnprocessedCount($task->id))->notEmpty();
|
||||||
}
|
}
|
||||||
|
|
||||||
function testItDeletesTaskIfThereAreNoSubscribersToProcessWhenProcessingTask() {
|
function testItDeletesTaskIfThereAreNoSubscribersToProcessWhenProcessingTask() {
|
||||||
@@ -76,9 +77,9 @@ class BounceTest extends MailPoetTest {
|
|||||||
function testItProcessesTask() {
|
function testItProcessesTask() {
|
||||||
$task = $this->createRunningTask();
|
$task = $this->createRunningTask();
|
||||||
$this->worker->prepareTask($task);
|
$this->worker->prepareTask($task);
|
||||||
expect(!empty($task->subscribers['to_process']))->true();
|
expect(ScheduledTaskSubscriber::getUnprocessedCount($task->id))->notEmpty();
|
||||||
$this->worker->processTask($task);
|
$this->worker->processTask($task);
|
||||||
expect(!empty($task->subscribers['processed']))->true();
|
expect(ScheduledTaskSubscriber::getProcessedCount($task->id))->notEmpty();
|
||||||
}
|
}
|
||||||
|
|
||||||
function testItSetsSubscriberStatusAsBounced() {
|
function testItSetsSubscriberStatusAsBounced() {
|
||||||
@@ -125,6 +126,7 @@ class BounceTest extends MailPoetTest {
|
|||||||
function _after() {
|
function _after() {
|
||||||
ORM::raw_execute('TRUNCATE ' . Setting::$_table);
|
ORM::raw_execute('TRUNCATE ' . Setting::$_table);
|
||||||
ORM::raw_execute('TRUNCATE ' . ScheduledTask::$_table);
|
ORM::raw_execute('TRUNCATE ' . ScheduledTask::$_table);
|
||||||
|
ORM::raw_execute('TRUNCATE ' . ScheduledTaskSubscriber::$_table);
|
||||||
ORM::raw_execute('TRUNCATE ' . Subscriber::$_table);
|
ORM::raw_execute('TRUNCATE ' . Subscriber::$_table);
|
||||||
}
|
}
|
||||||
}
|
}
|
88
tests/unit/Models/ScheduledTaskSubscriberTest.php
Normal file
88
tests/unit/Models/ScheduledTaskSubscriberTest.php
Normal file
@@ -0,0 +1,88 @@
|
|||||||
|
<?php
|
||||||
|
namespace MailPoet\Test\Models;
|
||||||
|
|
||||||
|
use Codeception\Util\Fixtures;
|
||||||
|
use MailPoet\Models\ScheduledTask;
|
||||||
|
use MailPoet\Models\ScheduledTaskSubscriber;
|
||||||
|
use MailPoet\Models\Subscriber;
|
||||||
|
|
||||||
|
class ScheduledTaskSubscriberTest extends \MailPoetTest {
|
||||||
|
function _before() {
|
||||||
|
$task = ScheduledTask::create();
|
||||||
|
$task->hydrate(array(
|
||||||
|
'status' => ScheduledTask::STATUS_SCHEDULED
|
||||||
|
));
|
||||||
|
$this->task = $task->save();
|
||||||
|
|
||||||
|
$subscriber = Subscriber::create();
|
||||||
|
$subscriber->hydrate(Fixtures::get('subscriber_template'));
|
||||||
|
$this->subscriber = $subscriber->save();
|
||||||
|
|
||||||
|
$this->task_subscriber = ScheduledTaskSubscriber::createOrUpdate(array(
|
||||||
|
'task_id' => $this->task->id,
|
||||||
|
'subscriber_id' => $this->subscriber->id
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
function testItCanBeCreated() {
|
||||||
|
expect($this->task_subscriber->task_id)->equals($this->task->id);
|
||||||
|
expect($this->task_subscriber->subscriber_id)->equals($this->subscriber->id);
|
||||||
|
expect($this->task_subscriber->processed)->equals(ScheduledTaskSubscriber::STATUS_UNPROCESSED);
|
||||||
|
}
|
||||||
|
|
||||||
|
function testItCanBeUpdated() {
|
||||||
|
$task_subscriber = ScheduledTaskSubscriber::createOrUpdate(array(
|
||||||
|
'task_id' => $this->task->id,
|
||||||
|
'subscriber_id' => $this->subscriber->id,
|
||||||
|
'processed' => ScheduledTaskSubscriber::STATUS_PROCESSED
|
||||||
|
));
|
||||||
|
expect($task_subscriber->processed)->equals(ScheduledTaskSubscriber::STATUS_PROCESSED);
|
||||||
|
}
|
||||||
|
|
||||||
|
function testItCanAddMultipleSubscribers() {
|
||||||
|
ScheduledTaskSubscriber::deleteMany();
|
||||||
|
$subscriber_ids = array(321, 654, 987); // sorted random ids
|
||||||
|
ScheduledTaskSubscriber::addSubscribers($this->task->id, $subscriber_ids);
|
||||||
|
$task_subscribers = ScheduledTaskSubscriber::where('task_id', $this->task->id)
|
||||||
|
->orderByAsc('subscriber_id')
|
||||||
|
->findMany();
|
||||||
|
expect(count($task_subscribers))->equals(count($subscriber_ids));
|
||||||
|
expect($task_subscribers[0]->subscriber_id)->equals($subscriber_ids[0]);
|
||||||
|
expect($task_subscribers[1]->subscriber_id)->equals($subscriber_ids[1]);
|
||||||
|
expect($task_subscribers[2]->subscriber_id)->equals($subscriber_ids[2]);
|
||||||
|
}
|
||||||
|
|
||||||
|
function testItCangetUnprocessedCount() {
|
||||||
|
$count = ScheduledTaskSubscriber::getUnprocessedCount($this->task->id);
|
||||||
|
expect($count)->equals(1);
|
||||||
|
$this->task_subscriber->processed = ScheduledTaskSubscriber::STATUS_PROCESSED;
|
||||||
|
$this->task_subscriber->save();
|
||||||
|
$count = ScheduledTaskSubscriber::getUnprocessedCount($this->task->id);
|
||||||
|
expect($count)->equals(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
function testItCanGetProcessedCount() {
|
||||||
|
$count = ScheduledTaskSubscriber::getProcessedCount($this->task->id);
|
||||||
|
expect($count)->equals(0);
|
||||||
|
$this->task_subscriber->processed = ScheduledTaskSubscriber::STATUS_PROCESSED;
|
||||||
|
$this->task_subscriber->save();
|
||||||
|
$count = ScheduledTaskSubscriber::getProcessedCount($this->task->id);
|
||||||
|
expect($count)->equals(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
function testItCanGetTotalCount() {
|
||||||
|
ScheduledTaskSubscriber::createOrUpdate(array(
|
||||||
|
'task_id' => $this->task->id,
|
||||||
|
'subscriber_id' => 555, // random new ID
|
||||||
|
'processed' => ScheduledTaskSubscriber::STATUS_PROCESSED
|
||||||
|
));
|
||||||
|
$count = ScheduledTaskSubscriber::getTotalCount($this->task->id);
|
||||||
|
expect($count)->equals(2);
|
||||||
|
}
|
||||||
|
|
||||||
|
function _after() {
|
||||||
|
\ORM::raw_execute('TRUNCATE ' . ScheduledTask::$_table);
|
||||||
|
\ORM::raw_execute('TRUNCATE ' . ScheduledTaskSubscriber::$_table);
|
||||||
|
\ORM::raw_execute('TRUNCATE ' . Subscriber::$_table);
|
||||||
|
}
|
||||||
|
}
|
27
tests/unit/Models/ScheduledTaskTest.php
Normal file
27
tests/unit/Models/ScheduledTaskTest.php
Normal file
@@ -0,0 +1,27 @@
|
|||||||
|
<?php
|
||||||
|
namespace MailPoet\Test\Models;
|
||||||
|
|
||||||
|
use MailPoet\Models\ScheduledTask;
|
||||||
|
|
||||||
|
class ScheduledTaskTest extends \MailPoetTest {
|
||||||
|
function _before() {
|
||||||
|
$this->task = ScheduledTask::create();
|
||||||
|
$this->task->hydrate(array(
|
||||||
|
'status' => ScheduledTask::STATUS_SCHEDULED
|
||||||
|
));
|
||||||
|
$this->task->save();
|
||||||
|
}
|
||||||
|
|
||||||
|
function testItCanBeCompleted() {
|
||||||
|
$this->task->complete();
|
||||||
|
expect($this->task->status)->equals(ScheduledTask::STATUS_COMPLETED);
|
||||||
|
}
|
||||||
|
|
||||||
|
function testItSetsDefaultPriority() {
|
||||||
|
expect($this->task->priority)->equals(ScheduledTask::PRIORITY_MEDIUM);
|
||||||
|
}
|
||||||
|
|
||||||
|
function _after() {
|
||||||
|
\ORM::raw_execute('TRUNCATE ' . ScheduledTask::$_table);
|
||||||
|
}
|
||||||
|
}
|
64
tests/unit/Tasks/Subscribers/BatchIteratorTest.php
Normal file
64
tests/unit/Tasks/Subscribers/BatchIteratorTest.php
Normal file
@@ -0,0 +1,64 @@
|
|||||||
|
<?php
|
||||||
|
namespace MailPoet\Test\Tasks\Subscribers;
|
||||||
|
|
||||||
|
use MailPoet\Models\ScheduledTaskSubscriber;
|
||||||
|
use MailPoet\Tasks\Subscribers\BatchIterator;
|
||||||
|
|
||||||
|
class BatchIteratorTest extends \MailPoetTest {
|
||||||
|
function _before() {
|
||||||
|
$this->task_id = 123; // random ID
|
||||||
|
$this->batch_size = 2;
|
||||||
|
$this->subscriber_count = 10;
|
||||||
|
for($i = 0; $i < $this->subscriber_count; $i++) {
|
||||||
|
ScheduledTaskSubscriber::createOrUpdate(array(
|
||||||
|
'task_id' => $this->task_id,
|
||||||
|
'subscriber_id' => $i + 1,
|
||||||
|
));
|
||||||
|
}
|
||||||
|
$this->iterator = new BatchIterator($this->task_id, $this->batch_size);
|
||||||
|
}
|
||||||
|
|
||||||
|
function testItFailsToConstructWithWrongArguments() {
|
||||||
|
try {
|
||||||
|
$iterator = new BatchIterator(0, 0);
|
||||||
|
$this->fail('Exception was not thrown');
|
||||||
|
} catch(\Exception $e) {
|
||||||
|
// No exception handling necessary
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
function testItConstructs() {
|
||||||
|
$iterator = new BatchIterator(123, 456); // random IDs
|
||||||
|
expect_that($iterator instanceof BatchIterator);
|
||||||
|
}
|
||||||
|
|
||||||
|
function testItIterates() {
|
||||||
|
$iterations = ceil($this->subscriber_count / $this->batch_size);
|
||||||
|
$i = 0;
|
||||||
|
foreach($this->iterator as $batch) {
|
||||||
|
$i++;
|
||||||
|
|
||||||
|
// process subscribers
|
||||||
|
ScheduledTaskSubscriber::where('task_id', $this->task_id)
|
||||||
|
->whereIn('subscriber_id', $batch)
|
||||||
|
->findResultSet()
|
||||||
|
->set('processed', ScheduledTaskSubscriber::STATUS_PROCESSED)
|
||||||
|
->save();
|
||||||
|
|
||||||
|
if($i < $iterations) {
|
||||||
|
expect(count($batch))->equals($this->batch_size);
|
||||||
|
} else {
|
||||||
|
expect(count($batch))->lessOrEquals($this->batch_size);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
expect($i)->equals($iterations);
|
||||||
|
}
|
||||||
|
|
||||||
|
function testItCanBeCounted() {
|
||||||
|
expect(count($this->iterator))->equals($this->subscriber_count);
|
||||||
|
}
|
||||||
|
|
||||||
|
function _after() {
|
||||||
|
\ORM::raw_execute('TRUNCATE ' . ScheduledTaskSubscriber::$_table);
|
||||||
|
}
|
||||||
|
}
|
Reference in New Issue
Block a user