- Refactors cron supervisor/daemon/router

This commit is contained in:
Vlad
2016-01-28 21:38:23 -05:00
parent 2cf03ec0a3
commit d4d575cda4
4 changed files with 132 additions and 153 deletions

View File

@@ -156,7 +156,9 @@ class Initializer {
} }
function runQueueSupervisor() { function runQueueSupervisor() {
if(php_sapi_name() === 'cli') return; if(php_sapi_name() === 'cli' ||
!Env::isPluginActivated()
) return;
try { try {
$supervisor = new Supervisor(); $supervisor = new Supervisor();
$supervisor->checkDaemon(); $supervisor->checkDaemon();

View File

@@ -18,58 +18,23 @@ class Daemon {
function __construct($request_payload = array()) { function __construct($request_payload = array()) {
set_time_limit(0); set_time_limit(0);
ignore_user_abort(); ignore_user_abort();
$this->daemon = $this->getDaemon(); $this->daemon = Supervisor::getDaemon();
$this->refreshed_token = $this->refreshToken(); $this->token = Security::generateRandomString();
$this->request_payload = $request_payload; $this->request_payload = $request_payload;
$this->timer = microtime(true); $this->timer = microtime(true);
} }
function start() { function run() {
if(!isset($this->request_payload['session'])) {
$this->abortWithError(__('Missing session ID.'));
}
$this->manageSession('start');
$daemon = $this->daemon; $daemon = $this->daemon;
if(!$daemon) { if(!$daemon) {
$this->saveDaemon( $this->abortWithError(__('Daemon does not exist.'));
array(
'status' => 'starting',
'counter' => 0
)
);
}
if($daemon['status'] === 'started') {
$_SESSION['cron_daemon'] = array(
'result' => false,
'errors' => array(__('Daemon already running.'))
);
}
if($daemon['status'] === 'starting') {
$_SESSION['cron_daemon'] = 'started';
$_SESSION['cron_daemon'] = array('result' => true);
$this->manageSession('end');
$daemon['status'] = 'started';
$daemon['token'] = $this->refreshed_token;
$this->saveDaemon($daemon);
$this->callSelf();
}
$this->manageSession('end');
}
function run() {
$allowed_statuses = array(
'stopping',
'starting',
'started'
);
if(!$this->daemon || !in_array($this->daemon['status'], $allowed_statuses)) {
$this->abortWithError(__('Invalid daemon status.'));
} }
if(!isset($this->request_payload['token']) || if(!isset($this->request_payload['token']) ||
$this->request_payload['token'] !== $this->daemon['token'] $this->request_payload['token'] !== $daemon['token']
) { ) {
$this->abortWithError('Invalid token.'); $this->abortWithError(__('Invalid or missing token.'));
} }
$this->abortIfStopped($daemon);
try { try {
$sending_queue = new SendingQueue($this->timer); $sending_queue = new SendingQueue($this->timer);
$sending_queue->process(); $sending_queue->process();
@@ -79,54 +44,33 @@ class Daemon {
if($elapsed_time < 30) { if($elapsed_time < 30) {
sleep(30 - $elapsed_time); sleep(30 - $elapsed_time);
} }
// after each execution, read daemon in case its status was modified // after each execution, re-read daemon data in case its status has changed
$daemon = $this->getDaemon(); $daemon = Supervisor::getDaemon();
// if the token has changed, abort further processing
if($daemon['status'] === 'stopping') $daemon['status'] = 'stopped'; if ($daemon['token'] !== $this->request_payload['token']) {
if($daemon['status'] === 'starting') $daemon['status'] = 'started'; exit;
}
$daemon['token'] = $this->refreshed_token;
$daemon['counter']++; $daemon['counter']++;
$this->abortIfStopped($daemon);
$this->saveDaemon($daemon); if($daemon['status'] === 'starting') {
$daemon['status'] = 'started';
if($daemon['status'] === 'started') $this->callSelf(); }
$daemon['token'] = $this->token;
Supervisor::saveDaemon($daemon);
$this->callSelf();
} }
function getDaemon() { function abortIfStopped($daemon) {
return Setting::getValue('cron_daemon'); if($daemon['status'] === 'stopped') exit;
} if($daemon['status'] === 'stopping') {
$daemon['status'] = 'stopped';
function saveDaemon($daemon_data) { Supervisor::saveDaemon($daemon);
$daemon_data['updated_at'] = time(); exit;
return Setting::setValue(
'cron_daemon',
$daemon_data
);
}
function refreshToken() {
return Security::generateRandomString();
}
function manageSession($action) {
switch($action) {
case 'start':
if(session_id()) {
session_write_close();
}
session_id($this->request_payload['session']);
session_start();
break;
case 'end':
session_write_close();
break;
} }
} }
function callSelf() { function callSelf() {
$payload = serialize(array('token' => $this->refreshed_token)); $payload = serialize(array('token' => $this->token));
Supervisor::accessRemoteUrl( Supervisor::accessRemoteUrl(
'/?mailpoet-api&section=queue&action=run&request_payload=' . '/?mailpoet-api&section=queue&action=run&request_payload=' .
base64_encode($payload) base64_encode($payload)
@@ -134,12 +78,7 @@ class Daemon {
exit; exit;
} }
function abortWithError($error) { function abortWithError($message) {
wp_send_json( exit('[mailpoet_cron_error:' . base64_encode($message) . ']');
array(
'result' => false,
'errors' => array($error)
));
exit;
} }
} }

View File

@@ -1,76 +1,98 @@
<?php <?php
namespace MailPoet\Cron; namespace MailPoet\Cron;
use MailPoet\Config\Env;
use MailPoet\Models\Setting; use MailPoet\Models\Setting;
use MailPoet\Util\Security;
if(!defined('ABSPATH')) exit; if(!defined('ABSPATH')) exit;
class Supervisor { class Supervisor {
public $daemon; public $daemon;
public $token;
public $force_run;
function __construct($force_start = false) { function __construct($force_run = false) {
$this->force_start = $force_start; $this->daemon = self::getDaemon();
if(!Env::isPluginActivated()) { $this->token = Security::generateRandomString();
throw new \Exception(__('MailPoet is not activated.')); $this->force_run = $force_run;
}
$this->daemon = $this->getDaemon();
} }
function checkDaemon() { function checkDaemon() {
if(!$this->daemon) { $daemon = $this->daemon;
return $this->startDaemon(); if(!$daemon) {
$daemon = $this->createDaemon();
return $this->runDaemon($daemon);
} }
if( // if the daemon is stopped, return its status and do nothing
!$this->force_start if(!$this->force_run &&
&& isset($this->daemon['status']) isset($daemon['status']) &&
&& in_array($this->daemon['status'], array('stopped', 'stopping')) $daemon['status'] === 'stopped'
) { ) {
return $this->daemon['status']; return $this->formatDaemonStatusMessage($daemon['status']);
}
$elapsed_time = time() - (int)$this->daemon['updated_at'];
if($elapsed_time < 40) {
if(!$this->force_start) {
return;
}
if($this->daemon['status'] === 'stopping' ||
$this->daemon['status'] === 'starting'
) {
return $this->daemon['status'];
}
} }
$this->daemon['status'] = 'starting'; $elapsed_time = time() - (int) $daemon['updated_at'];
$this->saveDaemon($this->daemon); // if it's been less than 40 seconds since last execution and we're not
return $this->startDaemon(); // force-running the daemon, return its status and do nothing
if($elapsed_time < 40 && !$this->force_run) {
return $this->formatDaemonStatusMessage($daemon['status']);
}
// if it's been less than 40 seconds since last execution, we are
// force-running the daemon and it's either being started or stopped,
// return its status and do nothing
elseif($elapsed_time < 40 &&
$this->force_run &&
in_array($daemon['status'], array(
'stopping',
'starting'
))
) {
return $this->formatDaemonStatusMessage($daemon['status']);
}
// re-create (restart) daemon
$this->createDaemon();
return $this->runDaemon();
} }
function startDaemon() { function runDaemon() {
if(!session_id()) session_start(); $payload = serialize(array('token' => $this->token));
$sessionId = session_id(); $request = self::accessRemoteUrl(
session_write_close(); '/?mailpoet-api&section=queue&action=run&request_payload=' .
$_SESSION['cron_daemon'] = null; base64_encode($payload)
$requestPayload = serialize(array('session' => $sessionId));
self::accessRemoteUrl(
'/?mailpoet-api&section=queue&action=start&request_payload=' .
base64_encode($requestPayload)
); );
session_start(); preg_match('/\[(mailpoet_cron_error:.*?)\]/i', $request, $status);
if (!isset($_SESSION['cron_daemon'])) { $daemon = self::getDaemon();
throw new \Exception(__('Session cannot be read.')); if(!empty($status) || !$daemon) {
if(!$daemon) {
$message = __('Daemon failed to run.');
} else {
list(, $message) = explode(':', $status[0]);
$message = base64_decode($message);
}
return $this->formatResult(
false,
$message
);
} }
$daemonStatus = $_SESSION['cron_daemon']; return $this->formatDaemonStatusMessage($daemon['status']);
unset($_SESSION['daemon']);
session_write_close();
return $daemonStatus;
} }
function getDaemon() { function createDaemon() {
$daemon = array(
'status' => 'starting',
'counter' => 0,
'token' => $this->token
);
self::saveDaemon($daemon);
return $daemon;
}
static function getDaemon() {
return Setting::getValue('cron_daemon'); return Setting::getValue('cron_daemon');
} }
function saveDaemon($daemon_data) { static function saveDaemon($daemon_data) {
$daemon_data['updated_at'] = time();
return Setting::setValue( return Setting::setValue(
'cron_daemon', 'cron_daemon',
$daemon_data $daemon_data
@@ -82,10 +104,11 @@ class Supervisor {
'timeout' => 1, 'timeout' => 1,
'user-agent' => 'MailPoet (www.mailpoet.com) Cron' 'user-agent' => 'MailPoet (www.mailpoet.com) Cron'
); );
wp_remote_get( $result = wp_remote_get(
self::getSiteUrl() . $url, self::getSiteUrl() . $url,
$args $args
); );
return wp_remote_retrieve_body($result);
} }
static function getSiteUrl() { static function getSiteUrl() {
@@ -101,7 +124,29 @@ class Supervisor {
// connect to the URL without port // connect to the URL without port
$fp = @fsockopen($server['host'], $server['port'], $errno, $errstr, 1); $fp = @fsockopen($server['host'], $server['port'], $errno, $errstr, 1);
if($fp) return preg_replace('!(?=:\d+):\d+!', '$1', site_url()); if($fp) return preg_replace('!(?=:\d+):\d+!', '$1', site_url());
// throw an error if all connections fail // throw an error if all connection attempts failed
throw new \Exception(__('Site URL is unreachable.')); throw new \Exception(__('Site URL is unreachable.'));
} }
private function formatDaemonStatusMessage($status) {
return $this->formatResultMessage(
true,
sprintf(
__('Daemon is currently %.'),
__($status)
)
);
}
private function formatResultMessage($result, $message) {
$formattedResult = array(
'result' => $result
);
if(!$result) {
$formattedResult['errors'] = array($message);
} else {
$formattedResult['message'] = $message;
}
return $formattedResult;
}
} }

View File

@@ -2,7 +2,6 @@
namespace MailPoet\Router; namespace MailPoet\Router;
use Carbon\Carbon; use Carbon\Carbon;
use MailPoet\Cron\Daemon;
use MailPoet\Cron\Supervisor; use MailPoet\Cron\Supervisor;
use MailPoet\Models\Setting; use MailPoet\Models\Setting;
@@ -10,23 +9,17 @@ if(!defined('ABSPATH')) exit;
class Cron { class Cron {
function start() { function start() {
$supervisor = new Supervisor($forceStart = true); $supervisor = new Supervisor($force_run = true);
wp_send_json( wp_send_json($supervisor->checkDaemon());
array(
'result' => $supervisor->checkDaemon() ? true : false
)
);
} }
function stop() { function stop() {
$daemon = new Daemon(); $daemon = Supervisor::getDaemon();
if(!$daemon->daemon || if(!$daemon || $daemon['status'] !== 'started') {
$daemon->daemon['status'] !== 'started'
) {
$result = false; $result = false;
} else { } else {
$daemon->daemon['status'] = 'stopping'; $daemon['status'] = 'stopping';
$result = $daemon->saveDaemon($daemon->daemon); Supervisor::saveDaemon($daemon);
} }
wp_send_json( wp_send_json(
array( array(