- Updates queue worker based on code review comments

This commit is contained in:
Vlad
2016-02-11 10:23:41 -05:00
parent 7522084ccb
commit 9b584296a5
4 changed files with 22 additions and 16 deletions

View File

@@ -7,8 +7,9 @@ use MailPoet\Util\Security;
if(!defined('ABSPATH')) exit; if(!defined('ABSPATH')) exit;
class CronHelper { class CronHelper {
static $daemon_execution_limit = 20; const daemon_execution_limit = 20;
static $daemon_execution_timeout = 35; const daemon_execution_timeout = 35;
const daemon_request_timeout = 2;
static function createDaemon($token) { static function createDaemon($token) {
$daemon = array( $daemon = array(
@@ -36,7 +37,7 @@ class CronHelper {
return Security::generateRandomString(); return Security::generateRandomString();
} }
static function accessDaemon($token, $timeout = 2) { static function accessDaemon($token, $timeout = self::daemon_request_timeout) {
$payload = serialize(array('token' => $token)); $payload = serialize(array('token' => $token));
$url = '/?mailpoet-api&section=queue&action=run&request_payload=' . $url = '/?mailpoet-api&section=queue&action=run&request_payload=' .
base64_encode($payload); base64_encode($payload);

View File

@@ -11,7 +11,7 @@ class Daemon {
public $daemon; public $daemon;
public $request_payload; public $request_payload;
public $refreshed_token; public $refreshed_token;
private $daemon_request_timeout = 3; const daemon_request_timeout = 5;
private $timer; private $timer;
function __construct($request_payload = array()) { function __construct($request_payload = array()) {
@@ -40,8 +40,8 @@ class Daemon {
} catch(Exception $e) { } catch(Exception $e) {
} }
$elapsed_time = microtime(true) - $this->timer; $elapsed_time = microtime(true) - $this->timer;
if($elapsed_time < CronHelper::$daemon_execution_limit) { if($elapsed_time < CronHelper::daemon_execution_limit) {
sleep(CronHelper::$daemon_execution_limit - $elapsed_time); sleep(CronHelper::daemon_execution_limit - $elapsed_time);
} }
// after each execution, re-read daemon data in case it was deleted or // after each execution, re-read daemon data in case it was deleted or
// its status has changed // its status has changed
@@ -73,7 +73,7 @@ class Daemon {
} }
function callSelf() { function callSelf() {
CronHelper::accessDaemon($this->token, $this->daemon_request_timeout); CronHelper::accessDaemon($this->token, self::daemon_request_timeout);
exit; exit;
} }
} }

View File

@@ -31,13 +31,13 @@ class Supervisor {
$elapsed_time = time() - (int) $daemon['updated_at']; $elapsed_time = time() - (int) $daemon['updated_at'];
// if it's been less than 40 seconds since last execution and we're not // if it's been less than 40 seconds since last execution and we're not
// force-running the daemon, return its status and do nothing // force-running the daemon, return its status and do nothing
if($elapsed_time < CronHelper::$daemon_execution_timeout && !$this->force_run) { if($elapsed_time < CronHelper::daemon_execution_timeout && !$this->force_run) {
return $this->formatDaemonStatusMessage($daemon['status']); return $this->formatDaemonStatusMessage($daemon['status']);
} }
// if it's been less than 40 seconds since last execution, we are // if it's been less than 40 seconds since last execution, we are
// force-running the daemon and it's either being started or stopped, // force-running the daemon and it's either being started or stopped,
// return its status and do nothing // return its status and do nothing
elseif($elapsed_time < CronHelper::$daemon_execution_timeout && elseif($elapsed_time < CronHelper::daemon_execution_timeout &&
$this->force_run && $this->force_run &&
in_array($daemon['status'], array( in_array($daemon['status'], array(
'stopping', 'stopping',

View File

@@ -16,11 +16,16 @@ if(!defined('ABSPATH')) exit;
class SendingQueue { class SendingQueue {
public $mailer_config; public $mailer_config;
public $mailer_log; public $mailer_log;
public $processing_method;
private $timer; private $timer;
const batch_size = 50;
function __construct($timer = false) { function __construct($timer = false) {
$this->mailer_config = $this->getMailerConfig(); $this->mailer_config = $this->getMailerConfig();
$this->mailer_log = $this->getMailerLog(); $this->mailer_log = $this->getMailerLog();
$this->processing_method = ($this->mailer_config['method'] === 'MailPoet') ?
'processBulkSubscribers' :
'processIndividualSubscriber';
$this->timer = ($timer) ? $timer : microtime(true); $this->timer = ($timer) ? $timer : microtime(true);
} }
@@ -40,17 +45,14 @@ class SendingQueue {
$newsletter = $newsletter->asArray(); $newsletter = $newsletter->asArray();
$newsletter['body'] = $this->renderNewsletter($newsletter); $newsletter['body'] = $this->renderNewsletter($newsletter);
$mailer = $this->configureMailer($newsletter); $mailer = $this->configureMailer($newsletter);
foreach(array_chunk($queue->subscribers->to_process, 5) as foreach(array_chunk($queue->subscribers->to_process, self::batch_size) as
$subscribers_ids) { $subscribers_ids) {
$subscribers = Subscriber::whereIn('id', $subscribers_ids) $subscribers = Subscriber::whereIn('id', $subscribers_ids)
->findArray(); ->findArray();
$processing_method = ($this->mailer_config['method'] === 'MailPoet') ?
'processBulkSubscribers' :
'processIndividualSubscriber';
$queue->subscribers = call_user_func_array( $queue->subscribers = call_user_func_array(
array( array(
$this, $this,
$processing_method $this->processing_method
), ),
array( array(
$mailer, $mailer,
@@ -59,6 +61,7 @@ class SendingQueue {
$queue $queue
) )
); );
die('aaaa');
} }
} }
} }
@@ -101,6 +104,7 @@ class SendingQueue {
$this->updateQueue($queue); $this->updateQueue($queue);
$this->checkSendingLimit(); $this->checkSendingLimit();
$this->checkExecutionTimer(); $this->checkExecutionTimer();
die('zzz');
return $queue->subscribers; return $queue->subscribers;
} }
@@ -185,7 +189,7 @@ class SendingQueue {
function checkExecutionTimer() { function checkExecutionTimer() {
$elapsed_time = microtime(true) - $this->timer; $elapsed_time = microtime(true) - $this->timer;
if($elapsed_time >= CronHelper::$daemon_execution_limit) { if($elapsed_time >= CronHelper::daemon_execution_limit) {
throw new \Exception(__('Maximum execution time reached.')); throw new \Exception(__('Maximum execution time reached.'));
} }
} }
@@ -217,7 +221,7 @@ class SendingQueue {
$queue->processed_at = date('Y-m-d H:i:s'); $queue->processed_at = date('Y-m-d H:i:s');
$queue->status = 'completed'; $queue->status = 'completed';
} }
$queue->subscribers = serialize($queue->subscribers); $queue->subscribers = serialize((array) $queue->subscribers);
$queue->save(); $queue->save();
} }
@@ -227,6 +231,7 @@ class SendingQueue {
} }
function checkSendingLimit() { function checkSendingLimit() {
// TODO: enforce sending frequency
} }
function getMailerConfig() { function getMailerConfig() {