- Adds session support
- Renames Queue to Daemon - Updates router methods
This commit is contained in:
@@ -1,7 +1,7 @@
|
|||||||
<?php
|
<?php
|
||||||
namespace MailPoet\Config;
|
namespace MailPoet\Config;
|
||||||
|
|
||||||
use MailPoet\Queue\Queue;
|
use MailPoet\Queue\Daemon;
|
||||||
|
|
||||||
if(!defined('ABSPATH')) exit;
|
if(!defined('ABSPATH')) exit;
|
||||||
|
|
||||||
@@ -33,7 +33,7 @@ class PublicAPI {
|
|||||||
}
|
}
|
||||||
|
|
||||||
function queue() {
|
function queue() {
|
||||||
$queue = new Queue($this->payload);
|
$queue = new Daemon($this->payload);
|
||||||
if(method_exists($queue, $this->action)) {
|
if(method_exists($queue, $this->action)) {
|
||||||
call_user_func(
|
call_user_func(
|
||||||
array(
|
array(
|
||||||
|
@@ -8,7 +8,7 @@ require_once(ABSPATH . 'wp-includes/pluggable.php');
|
|||||||
|
|
||||||
if(!defined('ABSPATH')) exit;
|
if(!defined('ABSPATH')) exit;
|
||||||
|
|
||||||
class Queue {
|
class Daemon {
|
||||||
function __construct($payload = array()) {
|
function __construct($payload = array()) {
|
||||||
set_time_limit(0);
|
set_time_limit(0);
|
||||||
ignore_user_abort();
|
ignore_user_abort();
|
||||||
@@ -18,9 +18,10 @@ class Queue {
|
|||||||
}
|
}
|
||||||
|
|
||||||
function start() {
|
function start() {
|
||||||
if(!isset($this->payload['token'])) {
|
if(!isset($this->payload['session'])) {
|
||||||
$this->abortWithError('missing token');
|
$this->abortWithError('missing session ID');
|
||||||
}
|
}
|
||||||
|
$this->manageSession('start');
|
||||||
$queue = $this->queue;
|
$queue = $this->queue;
|
||||||
$queueData = $this->queueData;
|
$queueData = $this->queueData;
|
||||||
if(!$queue) {
|
if(!$queue) {
|
||||||
@@ -29,33 +30,30 @@ class Queue {
|
|||||||
$queue->value = serialize(array('status' => 'stopped'));
|
$queue->value = serialize(array('status' => 'stopped'));
|
||||||
$queue->save();
|
$queue->save();
|
||||||
}
|
}
|
||||||
if(!preg_match('!stopped|paused!', $queueData['status'])
|
if($queueData['status'] !== 'started') {
|
||||||
) {
|
$_SESSION['queue'] = 'started';
|
||||||
$queueData = array(
|
$queueData = array(
|
||||||
'status' => 'started',
|
'status' => 'started',
|
||||||
'token' => $this->refreshedToken,
|
'token' => $this->refreshedToken,
|
||||||
'executionCounter' => ($queueData['status'] === 'paused') ?
|
'executionCounter' => ($queueData['status'] === 'paused') ?
|
||||||
$queueData['executionCounter']
|
$queueData['executionCounter']
|
||||||
: 0,
|
: 0
|
||||||
'log' => array(
|
|
||||||
'token' => $this->payload['token'],
|
|
||||||
'message' => 'started'
|
|
||||||
)
|
|
||||||
);
|
);
|
||||||
|
$_SESSION['queue'] = array('result' => true);
|
||||||
|
$this->manageSession('end');
|
||||||
$queue->value = serialize($queueData);
|
$queue->value = serialize($queueData);
|
||||||
$queue->save();
|
$queue->save();
|
||||||
$this->callSelf();
|
$this->callSelf();
|
||||||
} else {
|
} else {
|
||||||
$queueData['log'] = array(
|
$_SESSION['queue'] = array(
|
||||||
'token' => $this->payload['token'],
|
'result' => false,
|
||||||
'status' => 'already started'
|
'error' => 'already started'
|
||||||
);
|
);
|
||||||
$queue->value = serialize($queueData);
|
|
||||||
$queue->save();
|
|
||||||
}
|
}
|
||||||
|
$this->manageSession('end');
|
||||||
}
|
}
|
||||||
|
|
||||||
function process() {
|
function run() {
|
||||||
if(!$this->queue || $this->queueData['status'] !== 'started') {
|
if(!$this->queue || $this->queueData['status'] !== 'started') {
|
||||||
$this->abortWithError('not running');
|
$this->abortWithError('not running');
|
||||||
}
|
}
|
||||||
@@ -88,7 +86,7 @@ class Queue {
|
|||||||
);
|
);
|
||||||
wp_remote_get(
|
wp_remote_get(
|
||||||
site_url() .
|
site_url() .
|
||||||
'/?mailpoet-api§ion=queue&action=process&payload=' . urlencode($payload),
|
'/?mailpoet-api§ion=queue&action=run&payload=' . urlencode($payload),
|
||||||
$args
|
$args
|
||||||
);
|
);
|
||||||
exit;
|
exit;
|
||||||
@@ -122,4 +120,19 @@ class Queue {
|
|||||||
function refreshToken() {
|
function refreshToken() {
|
||||||
return Security::generateRandomString(5);
|
return Security::generateRandomString(5);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
function manageSession($action) {
|
||||||
|
switch ($action) {
|
||||||
|
case 'start':
|
||||||
|
if(session_id()) {
|
||||||
|
session_write_close();
|
||||||
|
}
|
||||||
|
session_id($this->payload['session']);
|
||||||
|
session_start();
|
||||||
|
break;
|
||||||
|
case 'end':
|
||||||
|
session_write_close();
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
@@ -4,7 +4,6 @@ namespace MailPoet\Queue;
|
|||||||
use Carbon\Carbon;
|
use Carbon\Carbon;
|
||||||
use MailPoet\Config\Env;
|
use MailPoet\Config\Env;
|
||||||
use MailPoet\Models\Setting;
|
use MailPoet\Models\Setting;
|
||||||
use MailPoet\Util\Security;
|
|
||||||
|
|
||||||
if(!defined('ABSPATH')) exit;
|
if(!defined('ABSPATH')) exit;
|
||||||
|
|
||||||
@@ -20,8 +19,9 @@ class Supervisor {
|
|||||||
return $this->startQueue();
|
return $this->startQueue();
|
||||||
} else {
|
} else {
|
||||||
if(!$this->forceStart && ($this->queueData['status'] === 'paused' ||
|
if(!$this->forceStart && ($this->queueData['status'] === 'paused' ||
|
||||||
$this->queueData['status'] === 'stopped'
|
$this->queueData['status'] === 'stopped'
|
||||||
)) {
|
)
|
||||||
|
) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
$currentTime = Carbon::now('UTC');
|
$currentTime = Carbon::now('UTC');
|
||||||
@@ -39,14 +39,16 @@ class Supervisor {
|
|||||||
}
|
}
|
||||||
|
|
||||||
function startQueue() {
|
function startQueue() {
|
||||||
|
if(!session_id()) session_start();
|
||||||
|
$sessionId = session_id();
|
||||||
|
session_write_close();
|
||||||
$args = array(
|
$args = array(
|
||||||
'timeout' => 1,
|
'timeout' => 1,
|
||||||
'user-agent' => 'MailPoet (www.mailpoet.com)'
|
'user-agent' => 'MailPoet (www.mailpoet.com)'
|
||||||
);
|
);
|
||||||
$token = Security::generateRandomString(5);
|
|
||||||
$payload = json_encode(
|
$payload = json_encode(
|
||||||
array(
|
array(
|
||||||
'token' => $token
|
'session' => $sessionId
|
||||||
)
|
)
|
||||||
);
|
);
|
||||||
wp_remote_get(
|
wp_remote_get(
|
||||||
@@ -55,8 +57,11 @@ class Supervisor {
|
|||||||
urlencode($payload),
|
urlencode($payload),
|
||||||
$args
|
$args
|
||||||
);
|
);
|
||||||
list ($queue, $queueData) = $this->getQueue();
|
session_start();
|
||||||
return ($queueData && $queueData['token'] === $token) ? true : false;
|
$queueStatus = $_SESSION['queue'];
|
||||||
|
unset($_SESSION['queue']);
|
||||||
|
session_write_close();
|
||||||
|
return $queueStatus;
|
||||||
}
|
}
|
||||||
|
|
||||||
function getQueue() {
|
function getQueue() {
|
||||||
|
@@ -1,6 +1,7 @@
|
|||||||
<?php
|
<?php
|
||||||
namespace MailPoet\Router;
|
namespace MailPoet\Router;
|
||||||
|
|
||||||
|
use MailPoet\Queue\Daemon;
|
||||||
use MailPoet\Queue\Supervisor;
|
use MailPoet\Queue\Supervisor;
|
||||||
|
|
||||||
if(!defined('ABSPATH')) exit;
|
if(!defined('ABSPATH')) exit;
|
||||||
@@ -17,35 +18,27 @@ class Queue {
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
function pause() {
|
function update($data) {
|
||||||
wp_send_json(
|
switch ($data['action']) {
|
||||||
array(
|
case 'stop':
|
||||||
'result' => ($this->updateQueueStatus('paused') ?
|
$status = 'stopped';
|
||||||
true :
|
break;
|
||||||
false
|
default:
|
||||||
)
|
$status = 'paused';
|
||||||
)
|
break;
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
function stop() {
|
|
||||||
wp_send_json(
|
|
||||||
array(
|
|
||||||
'result' => ($this->updateQueueStatus('stopped') ?
|
|
||||||
true :
|
|
||||||
false
|
|
||||||
)
|
|
||||||
)
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
private function updateQueueStatus($status) {
|
|
||||||
$queue = new \MailPoet\Queue\Queue();
|
|
||||||
if(!$queue->queue || $queue->queueData['status'] !== 'started') {
|
|
||||||
return false;
|
|
||||||
}
|
}
|
||||||
$queue->queueData['status'] = $status;
|
$queue = new Daemon();
|
||||||
$queue->queue->value = serialize($queue->queueData);
|
if(!$queue->queue || $queue->queueData['status'] !== 'started') {
|
||||||
return $queue->queue->save();
|
$result = false;
|
||||||
|
} else {
|
||||||
|
$queue->queueData['status'] = $status;
|
||||||
|
$queue->queue->value = serialize($queue->queueData);
|
||||||
|
$result = $queue->queue->save();
|
||||||
|
}
|
||||||
|
wp_send_json(
|
||||||
|
array(
|
||||||
|
'result' => $result
|
||||||
|
)
|
||||||
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
Reference in New Issue
Block a user