- Addresses issues identified during code review
This commit is contained in:
@ -10,91 +10,93 @@ require_once(ABSPATH . 'wp-includes/pluggable.php');
|
||||
if(!defined('ABSPATH')) exit;
|
||||
|
||||
class Daemon {
|
||||
function __construct($requestPayload = array()) {
|
||||
public $daemon;
|
||||
public $request_payload;
|
||||
public $refreshed_token;
|
||||
public $timer;
|
||||
|
||||
function __construct($request_payload = array()) {
|
||||
set_time_limit(0);
|
||||
ignore_user_abort();
|
||||
list ($this->daemon, $this->daemonData) = $this->getDaemon();
|
||||
$this->refreshedToken = $this->refreshToken();
|
||||
$this->requestPayload = $requestPayload;
|
||||
$this->daemon = $this->getDaemon();
|
||||
$this->refreshed_token = $this->refreshToken();
|
||||
$this->request_payload = $request_payload;
|
||||
$this->timer = microtime(true);
|
||||
}
|
||||
|
||||
function start() {
|
||||
if(!isset($this->requestPayload['session'])) {
|
||||
if(!isset($this->request_payload['session'])) {
|
||||
$this->abortWithError(__('Missing session ID.'));
|
||||
}
|
||||
$this->manageSession('start');
|
||||
$daemon = $this->daemon;
|
||||
$daemonData = $this->daemonData;
|
||||
if(!$daemon) {
|
||||
$daemon = Setting::create();
|
||||
$daemon->name = 'cron_daemon';
|
||||
$daemonData = array(
|
||||
'status' => 'starting',
|
||||
'counter' => 0
|
||||
$this->saveDaemon(
|
||||
array(
|
||||
'status' => 'starting',
|
||||
'counter' => 0
|
||||
)
|
||||
);
|
||||
$daemon->value = json_encode($daemonData);
|
||||
$daemon->save();
|
||||
}
|
||||
if($daemonData['status'] === 'started') {
|
||||
if($daemon['status'] === 'started') {
|
||||
$_SESSION['cron_daemon'] = array(
|
||||
'result' => false,
|
||||
'errors' => array(__('Daemon already running.'))
|
||||
);
|
||||
}
|
||||
if($daemonData['status'] === 'starting') {
|
||||
if($daemon['status'] === 'starting') {
|
||||
$_SESSION['cron_daemon'] = 'started';
|
||||
$_SESSION['cron_daemon'] = array('result' => true);
|
||||
$daemonData['status'] = 'started';
|
||||
$daemonData['token'] = $this->refreshedToken;
|
||||
$this->manageSession('end');
|
||||
$daemon->value = json_encode($daemonData);
|
||||
$daemon->save();
|
||||
$daemon['status'] = 'started';
|
||||
$daemon['token'] = $this->refreshed_token;
|
||||
$this->saveDaemon($daemon);
|
||||
$this->callSelf();
|
||||
}
|
||||
$this->manageSession('end');
|
||||
}
|
||||
|
||||
function run() {
|
||||
$allowedStatuses = array(
|
||||
$allowed_statuses = array(
|
||||
'stopping',
|
||||
'starting',
|
||||
'started'
|
||||
);
|
||||
if(!$this->daemon || !in_array($this->daemonData['status'], $allowedStatuses)) {
|
||||
if(!$this->daemon || !in_array($this->daemon['status'], $allowed_statuses)) {
|
||||
$this->abortWithError(__('Invalid daemon status.'));
|
||||
}
|
||||
if(!isset($this->requestPayload['token']) ||
|
||||
$this->requestPayload['token'] !== $this->daemonData['token']
|
||||
if(!isset($this->request_payload['token']) ||
|
||||
$this->request_payload['token'] !== $this->daemon['token']
|
||||
) {
|
||||
$this->abortWithError('Invalid token.');
|
||||
}
|
||||
try {
|
||||
$sendingQueue = new SendingQueue($this->timer);
|
||||
$sendingQueue->process();
|
||||
$sending_queue = new SendingQueue($this->timer);
|
||||
$sending_queue->process();
|
||||
} catch(Exception $e) {
|
||||
}
|
||||
$elapsedTime = microtime(true) - $this->timer;
|
||||
if($elapsedTime < 30) {
|
||||
sleep(30 - $elapsedTime);
|
||||
$elapsed_time = microtime(true) - $this->timer;
|
||||
if($elapsed_time < 30) {
|
||||
sleep(30 - $elapsed_time);
|
||||
}
|
||||
// after each execution, read daemon in case it's status was modified
|
||||
list($daemon, $daemonData) = $this->getDaemon();
|
||||
if($daemonData['status'] === 'stopping') $daemonData['status'] = 'stopped';
|
||||
if($daemonData['status'] === 'starting') $daemonData['status'] = 'started';
|
||||
$daemonData['token'] = $this->refreshedToken;
|
||||
$daemonData['counter']++;
|
||||
$daemon->value = json_encode($daemonData);
|
||||
$daemon->save();
|
||||
if($daemonData['status'] === 'started') $this->callSelf();
|
||||
$daemon = $this->getDaemon();
|
||||
if($daemon['status'] === 'stopping') $daemon['status'] = 'stopped';
|
||||
if($daemon['status'] === 'starting') $daemon['status'] = 'started';
|
||||
$daemon['token'] = $this->refreshed_token;
|
||||
$daemon['counter']++;
|
||||
$this->saveDaemon($daemon);
|
||||
if($daemon['status'] === 'started') $this->callSelf();
|
||||
}
|
||||
|
||||
function getDaemon() {
|
||||
$daemon = Setting::where('name', 'cron_daemon')
|
||||
->findOne();
|
||||
return array(
|
||||
($daemon) ? $daemon : null,
|
||||
($daemon) ? json_decode($daemon->value, true) : null
|
||||
return Setting::getValue('cron_daemon', null);
|
||||
}
|
||||
|
||||
function saveDaemon($daemon_data) {
|
||||
return Setting::setValue(
|
||||
'cron_daemon',
|
||||
$daemon_data
|
||||
);
|
||||
}
|
||||
|
||||
@ -108,7 +110,7 @@ class Daemon {
|
||||
if(session_id()) {
|
||||
session_write_close();
|
||||
}
|
||||
session_id($this->requestPayload['session']);
|
||||
session_id($this->request_payload['session']);
|
||||
session_start();
|
||||
break;
|
||||
case 'end':
|
||||
@ -118,7 +120,7 @@ class Daemon {
|
||||
}
|
||||
|
||||
function callSelf() {
|
||||
$payload = json_encode(array('token' => $this->refreshedToken));
|
||||
$payload = json_encode(array('token' => $this->refreshed_token));
|
||||
Supervisor::accessRemoteUrl(
|
||||
'/?mailpoet-api§ion=queue&action=run&request_payload=' . urlencode($payload)
|
||||
);
|
||||
|
@ -8,38 +8,39 @@ use MailPoet\Models\Setting;
|
||||
if(!defined('ABSPATH')) exit;
|
||||
|
||||
class Supervisor {
|
||||
function __construct($forceStart = false) {
|
||||
$this->forceStart = $forceStart;
|
||||
public $daemon;
|
||||
|
||||
function __construct($force_start = false) {
|
||||
$this->force_start = $force_start;
|
||||
if(!Env::isPluginActivated()) {
|
||||
throw new \Exception(__('MailPoet is not activated.'));
|
||||
}
|
||||
list ($this->daemon, $this->daemonData) = $this->getDaemon();
|
||||
$this->daemon = $this->getDaemon();
|
||||
}
|
||||
|
||||
function checkDaemon() {
|
||||
if(!$this->daemon) {
|
||||
return $this->startDaemon();
|
||||
}
|
||||
if(!$this->forceStart && (
|
||||
$this->daemonData['status'] === 'stopped' ||
|
||||
$this->daemonData['status'] === 'stopping')
|
||||
if(!$this->force_start && (
|
||||
$this->daemon['value']['status'] === 'stopped' ||
|
||||
$this->daemon['value']['status'] === 'stopping')
|
||||
) {
|
||||
return $this->daemonData['status'];
|
||||
return $this->daemon['value']['status'];
|
||||
}
|
||||
$timeSinceLastRun = $this->getDaemonLastRunTime();
|
||||
if($timeSinceLastRun < 40) {
|
||||
if(!$this->forceStart) {
|
||||
$time_since_last_run = $this->getDaemonLastRunTime();
|
||||
if($time_since_last_run < 40) {
|
||||
if(!$this->force_start) {
|
||||
return;
|
||||
}
|
||||
if($this->daemonData['status'] === 'stopping' ||
|
||||
$this->daemonData['status'] === 'starting'
|
||||
if($this->daemon['value']['status'] === 'stopping' ||
|
||||
$this->daemon['value']['status'] === 'starting'
|
||||
) {
|
||||
return $this->daemonData['status'];
|
||||
return $this->daemon['value']['status'];
|
||||
}
|
||||
}
|
||||
$this->daemonData['status'] = 'starting';
|
||||
$this->daemon->value = json_encode($this->daemonData);
|
||||
$this->daemon->save();
|
||||
$this->daemon['value']['status'] = 'starting';
|
||||
$this->saveDaemon($this->daemon['value']);
|
||||
return $this->startDaemon();
|
||||
}
|
||||
|
||||
@ -50,7 +51,8 @@ class Supervisor {
|
||||
$_SESSION['cron_daemon'] = null;
|
||||
$requestPayload = json_encode(array('session' => $sessionId));
|
||||
self::accessRemoteUrl(
|
||||
'/?mailpoet-api§ion=queue&action=start&request_payload=' . urlencode($requestPayload)
|
||||
'/?mailpoet-api§ion=queue&action=start&request_payload=' .
|
||||
urlencode($requestPayload)
|
||||
);
|
||||
session_start();
|
||||
$daemonStatus = $_SESSION['cron_daemon'];
|
||||
@ -62,10 +64,16 @@ class Supervisor {
|
||||
function getDaemon() {
|
||||
$daemon = Setting::where('name', 'cron_daemon')
|
||||
->findOne();
|
||||
$daemonData = ($daemon) ? json_decode($daemon->value, true) : false;
|
||||
return array(
|
||||
$daemon,
|
||||
$daemonData
|
||||
if(!$daemon) return false;
|
||||
$daemon = $daemon->asArray();
|
||||
$daemon['value'] = unserialize($daemon['value']);
|
||||
return $daemon;
|
||||
}
|
||||
|
||||
function saveDaemon($daemon_data) {
|
||||
return Setting::setValue(
|
||||
'cron_daemon',
|
||||
$daemon_data
|
||||
);
|
||||
}
|
||||
|
||||
@ -98,11 +106,11 @@ class Supervisor {
|
||||
}
|
||||
|
||||
function getDaemonLastRunTime() {
|
||||
$currentTime = Carbon::now('UTC');
|
||||
$lastUpdateTime = Carbon::createFromFormat(
|
||||
$current_time = Carbon::now('UTC');
|
||||
$last_update_time = Carbon::createFromFormat(
|
||||
'Y-m-d H:i:s',
|
||||
$this->daemon->updated_at, 'UTC'
|
||||
$this->daemon['updated_at'], 'UTC'
|
||||
);
|
||||
return $currentTime->diffInSeconds($lastUpdateTime);
|
||||
return $current_time->diffInSeconds($last_update_time);
|
||||
}
|
||||
}
|
@ -10,36 +10,41 @@ use MailPoet\Newsletter\Renderer\Renderer;
|
||||
if(!defined('ABSPATH')) exit;
|
||||
|
||||
class SendingQueue {
|
||||
public $timer;
|
||||
|
||||
function __construct($timer = false) {
|
||||
$this->timer = ($timer) ? $timer : microtime(true);
|
||||
}
|
||||
|
||||
function process() {
|
||||
// TODO: implement mailer sending frequency limits
|
||||
foreach($this->getQueues() as $queue) {
|
||||
$newsletter = Newsletter::findOne($queue->newsletter_id)
|
||||
->asArray();
|
||||
if(!$newsletter) {
|
||||
continue;
|
||||
};
|
||||
$newsletter = $this->renderNewsletter($newsletter);
|
||||
$mailer = $this->configureMailerForNewsletter($newsletter);
|
||||
$newsletter = $this->renderNewsletter($newsletter);
|
||||
$subscribers = json_decode($queue->subscribers, true);
|
||||
$subscribersToProcess = $subscribers['to_process'];
|
||||
$subscribers_to_process = $subscribers['to_process'];
|
||||
if(!isset($subscribers['processed'])) $subscribers['processed'] = array();
|
||||
if(!isset($subscribers['failed'])) $subscribers['failed'] = array();
|
||||
foreach(array_chunk($subscribersToProcess, 200) as $subscriberIds) {
|
||||
$dbSubscribers = Subscriber::whereIn('id', $subscriberIds)
|
||||
foreach(array_chunk($subscribers_to_process, 200) as $subscriber_ids) {
|
||||
$db_subscribers = Subscriber::whereIn('id', $subscriber_ids)
|
||||
->findArray();
|
||||
foreach($dbSubscribers as $dbSubscriber) {
|
||||
foreach($db_subscribers as $db_subscriber) {
|
||||
$this->checkExecutionTimer();
|
||||
$result = $this->sendNewsletter(
|
||||
$mailer,
|
||||
$this->processNewsletter($newsletter),
|
||||
$dbSubscriber);
|
||||
$db_subscriber);
|
||||
if($result) {
|
||||
$this->updateStatistics($newsletter['id'], $dbSubscriber['id'], $queue->id);
|
||||
$subscribers['processed'][] = $dbSubscriber['id'];
|
||||
} else $subscribers['failed'][] = $dbSubscriber['id'];
|
||||
$this->updateStatistics($newsletter['id'], $db_subscriber['id'], $queue->id);
|
||||
$subscribers['processed'][] = $db_subscriber['id'];
|
||||
} else {
|
||||
$subscribers['failed'][] = $db_subscriber['id'];
|
||||
}
|
||||
$this->updateQueue($queue, $subscribers);
|
||||
}
|
||||
}
|
||||
@ -52,18 +57,18 @@ class SendingQueue {
|
||||
}
|
||||
|
||||
function sendNewsletter($mailer, $newsletter, $subscriber) {
|
||||
return $mailer->mailerInstance->send(
|
||||
return $mailer->mailer_instance->send(
|
||||
$newsletter,
|
||||
$mailer->transformSubscriber($subscriber)
|
||||
);
|
||||
}
|
||||
|
||||
function updateStatistics($newsletterId, $subscriberId, $queueId) {
|
||||
$newsletterStatistics = NewsletterStatistics::create();
|
||||
$newsletterStatistics->subscriber_id = $newsletterId;
|
||||
$newsletterStatistics->newsletter_id = $subscriberId;
|
||||
$newsletterStatistics->queue_id = $queueId;
|
||||
$newsletterStatistics->save();
|
||||
function updateStatistics($newsletter_id, $subscriber_id, $queue_id) {
|
||||
$newsletter_statistic = NewsletterStatistics::create();
|
||||
$newsletter_statistic->subscriber_id = $newsletter_id;
|
||||
$newsletter_statistic->newsletter_id = $subscriber_id;
|
||||
$newsletter_statistic->queue_id = $queue_id;
|
||||
$newsletter_statistic->save();
|
||||
}
|
||||
|
||||
function updateQueue($queue, $subscribers) {
|
||||
@ -93,20 +98,24 @@ class SendingQueue {
|
||||
'name' => $newsletter['sender_name'],
|
||||
'address' => $newsletter['sender_address']
|
||||
);
|
||||
} else $sender = false;
|
||||
} else {
|
||||
$sender = false;
|
||||
}
|
||||
if(!empty($newsletter['reply_to_address']) && !empty($newsletter['reply_to_name'])) {
|
||||
$replyTo = array(
|
||||
$reply_to = array(
|
||||
'name' => $newsletter['reply_to_name'],
|
||||
'address' => $newsletter['reply_to_address']
|
||||
);
|
||||
} else $replyTo = false;
|
||||
$mailer = new Mailer($method = false, $sender, $replyTo);
|
||||
} else {
|
||||
$reply_to = false;
|
||||
}
|
||||
$mailer = new Mailer($method = false, $sender, $reply_to);
|
||||
return $mailer;
|
||||
}
|
||||
|
||||
function checkExecutionTimer() {
|
||||
$elapsedTime = microtime(true) - $this->timer;
|
||||
if($elapsedTime >= 30) throw new \Exception('Maximum execution time reached.');
|
||||
$elapsed_time = microtime(true) - $this->timer;
|
||||
if($elapsed_time >= 30) throw new \Exception('Maximum execution time reached.');
|
||||
}
|
||||
|
||||
function getQueues() {
|
||||
@ -118,7 +127,8 @@ class SendingQueue {
|
||||
|
||||
function renderNewsletter($newsletter) {
|
||||
$renderer = new Renderer(json_decode($newsletter['body'], true));
|
||||
$newsletter['body'] = $renderer->renderAll();
|
||||
// TODO: update once text rendering is implemented/enderer returns an array
|
||||
$newsletter['body'] = array('html' => $renderer->render(), 'text' => '');
|
||||
return $newsletter;
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user