- Implements queue worker class

This commit is contained in:
MrCasual
2015-11-25 15:45:33 -05:00
parent 5ce5f0bf8a
commit 4208b148b4
6 changed files with 119 additions and 14 deletions

View File

@ -55,7 +55,8 @@ class Initializer {
$subscriber_custom_field = Env::$db_prefix . 'subscriber_custom_field'; $subscriber_custom_field = Env::$db_prefix . 'subscriber_custom_field';
$newsletter_option_fields = Env::$db_prefix . 'newsletter_option_fields'; $newsletter_option_fields = Env::$db_prefix . 'newsletter_option_fields';
$newsletter_option = Env::$db_prefix . 'newsletter_option'; $newsletter_option = Env::$db_prefix . 'newsletter_option';
$queue = Env::$db_prefix . 'queue'; $queues = Env::$db_prefix . 'queues';
$newsletter_statistics = Env::$db_prefix . 'newsletter_statistics';
define('MP_SUBSCRIBERS_TABLE', $subscribers); define('MP_SUBSCRIBERS_TABLE', $subscribers);
define('MP_SETTINGS_TABLE', $settings); define('MP_SETTINGS_TABLE', $settings);
@ -71,7 +72,8 @@ class Initializer {
define('MP_SUBSCRIBER_CUSTOM_FIELD_TABLE', $subscriber_custom_field); define('MP_SUBSCRIBER_CUSTOM_FIELD_TABLE', $subscriber_custom_field);
define('MP_NEWSLETTER_OPTION_FIELDS_TABLE', $newsletter_option_fields); define('MP_NEWSLETTER_OPTION_FIELDS_TABLE', $newsletter_option_fields);
define('MP_NEWSLETTER_OPTION_TABLE', $newsletter_option); define('MP_NEWSLETTER_OPTION_TABLE', $newsletter_option);
define('MP_QUEUE_TABLE', $queue); define('MP_QUEUES_TABLE', $queues);
define('MP_NEWSLETTER_STATISTICS_TABLE', $newsletter_statistics);
} }
function setupActivator() { function setupActivator() {

View File

@ -21,7 +21,8 @@ class Migrator {
'subscriber_custom_field', 'subscriber_custom_field',
'newsletter_option_fields', 'newsletter_option_fields',
'newsletter_option', 'newsletter_option',
'queue', 'queues',
'newsletter_statistics',
'forms' 'forms'
); );
} }
@ -204,13 +205,33 @@ class Migrator {
return $this->sqlify(__FUNCTION__, $attributes); return $this->sqlify(__FUNCTION__, $attributes);
} }
function queue() { function queues() {
$attributes = array( $attributes = array(
'id mediumint(9) NOT NULL AUTO_INCREMENT,', 'id mediumint(9) NOT NULL AUTO_INCREMENT,',
'newsletter_id mediumint(9) NOT NULL,', 'newsletter_id mediumint(9) NOT NULL,',
'subscribers longtext,', 'subscribers longtext,',
'total mediumint(9) NOT NULL DEFAULT 0,', 'status varchar(12) NOT NULL,',
'processed mediumint(9) NOT NULL DEFAULT 0,', 'priority mediumint(9) NOT NULL DEFAULT 0,',
'count_total mediumint(9) NOT NULL DEFAULT 0,',
'count_processed mediumint(9) NOT NULL DEFAULT 0,',
'count_to_process mediumint(9) NOT NULL DEFAULT 0,',
'count_failed mediumint(9) NOT NULL DEFAULT 0,',
'processed_at TIMESTAMP NOT NULL DEFAULT 0,',
'created_at TIMESTAMP NOT NULL DEFAULT 0,',
'updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,',
'deleted_at TIMESTAMP NULL DEFAULT NULL,',
'PRIMARY KEY (id)',
);
return $this->sqlify(__FUNCTION__, $attributes);
}
function newsletter_statistics() {
$attributes = array(
'id mediumint(9) NOT NULL AUTO_INCREMENT,',
'newsletter_id mediumint(9) NOT NULL,',
'subscriber_id mediumint(9) NOT NULL,',
'queue_id mediumint(9) NOT NULL,',
'sent_at TIMESTAMP NOT NULL DEFAULT 0,',
'created_at TIMESTAMP NOT NULL DEFAULT 0,', 'created_at TIMESTAMP NOT NULL DEFAULT 0,',
'updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,', 'updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,',
'deleted_at TIMESTAMP NULL DEFAULT NULL,', 'deleted_at TIMESTAMP NULL DEFAULT NULL,',
@ -244,4 +265,4 @@ class Migrator {
return implode("\n", $sql); return implode("\n", $sql);
} }
} }

View File

@ -0,0 +1,12 @@
<?php
namespace MailPoet\Models;
if(!defined('ABSPATH')) exit;
class NewsletterStatistics extends Model {
public static $_table = MP_NEWSLETTER_STATISTICS_TABLE;
function __construct() {
parent::__construct();
}
}

12
lib/Models/Queue.php Normal file
View File

@ -0,0 +1,12 @@
<?php
namespace MailPoet\Models;
if(!defined('ABSPATH')) exit;
class Queue extends Model {
public static $_table = MP_QUEUES_TABLE;
function __construct() {
parent::__construct();
}
}

View File

@ -15,6 +15,7 @@ class Daemon {
list ($this->queue, $this->queueData) = $this->getQueue(); list ($this->queue, $this->queueData) = $this->getQueue();
$this->refreshedToken = $this->refreshToken(); $this->refreshedToken = $this->refreshToken();
$this->payload = $payload; $this->payload = $payload;
$this->timer = microtime(true);
} }
function start() { function start() {
@ -36,8 +37,8 @@ class Daemon {
'status' => 'started', 'status' => 'started',
'token' => $this->refreshedToken, 'token' => $this->refreshedToken,
'counter' => ($queueData['status'] === 'paused') ? 'counter' => ($queueData['status'] === 'paused') ?
$queueData['counter'] $queueData['counter'] :
: 0 0
); );
$_SESSION['queue'] = array('result' => true); $_SESSION['queue'] = array('result' => true);
$this->manageSession('end'); $this->manageSession('end');
@ -63,11 +64,8 @@ class Daemon {
$this->abortWithError('invalid token'); $this->abortWithError('invalid token');
} }
/* $worker = new Worker();
* LOGIC WILL HAPPEN HERE $worker->process();
*
*/
sleep(15);
// after each execution, read queue in case it's status was modified // after each execution, read queue in case it's status was modified
list($queue, $queueData) = $this->getQueue(); list($queue, $queueData) = $this->getQueue();

60
lib/Queue/Worker.php Normal file
View File

@ -0,0 +1,60 @@
<?php
namespace MailPoet\Queue;
use MailPoet\Models\Newsletter;
use MailPoet\Models\NewsletterStatistics;
use MailPoet\Models\Queue;
if(!defined('ABSPATH')) exit;
class Worker {
function __construct($timer = false) {
$this->timer = $timer;
$this->timer = microtime(true);
}
function process() {
$queues = Queue::orderByDesc('priority')
->whereNotIn('status', array(
'paused',
'completed'
))
->findResultSet();
foreach ($queues as $queue) {
$newsletter = Newsletter::findOne($queue->newsletter_id)
->asArray();
$subscribers = json_decode($queue->subscribers, true);
if(!isset($subscribers['failed'])) $subscribers['failed'] = array();
if(!isset($subscribers['processed'])) $subscribers['processed'] = array();
$subscribersToProcess = $subscribers['to_process'];
foreach ($subscribersToProcess as $subscriber) {
if(microtime(true) - $this->timer >= 28) break;
// TODO: remove
sleep(1);
$newsletterStatistics = NewsletterStatistics::create();
$newsletterStatistics->subscriber_id = $subscriber;
$newsletterStatistics->newsletter_id = $newsletter['id'];
$newsletterStatistics->queue_id = $queue->id;
$newsletterStatistics->save();
$subscribers['processed'][] = $subscriber;
$subscribers['to_process'] = array_values(
array_diff(
$subscribers['to_process'],
$subscribers['processed']
)
);
$queue->count_processed = count($subscribers['processed']);
$queue->count_to_process = count($subscribers['to_process']);
$queue->count_failed = count($subscribers['failed']);
$queue->count_total =
$queue->count_processed + $queue->count_to_process + $queue->count_failed;
if(!$queue->count_to_process) {
$queue->processed_at = date('Y-m-d H:i:s');
$queue->status = 'completed';
}
$queue->subscribers = json_encode($subscribers);
$queue->save();
}
}
}
}