diff --git a/composer.json b/composer.json index d6086d15db..1abbafed7f 100644 --- a/composer.json +++ b/composer.json @@ -9,7 +9,8 @@ "j4mie/paris": "1.5.4", "swiftmailer/swiftmailer": "^5.4", "phpseclib/phpseclib": "*", - "nesbot/carbon": "*" + "nesbot/carbon": "*", + "mtdowling/cron-expression": "^1.0" }, "require-dev": { "codeception/codeception": "*", diff --git a/lib/Config/Initializer.php b/lib/Config/Initializer.php index 402b085974..5828912cb6 100644 --- a/lib/Config/Initializer.php +++ b/lib/Config/Initializer.php @@ -2,6 +2,7 @@ namespace MailPoet\Config; use MailPoet\Models; +use MailPoet\Queue\Supervisor; use MailPoet\Router; if(!defined('ABSPATH')) exit; @@ -25,7 +26,8 @@ class Initializer { $this->setupAnalytics(); $this->setupPermissions(); $this->setupChangelog(); - $this->setupPublicAPI(); + $this->setupPublicAPI(); + $this->runQueueSupervisor(); } function setupDB() { @@ -34,7 +36,8 @@ class Initializer { \ORM::configure('password', Env::$db_password); \ORM::configure('logging', WP_DEBUG); \ORM::configure('driver_options', array( - \PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES utf8' + \PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES utf8', + \PDO::MYSQL_ATTR_INIT_COMMAND => 'SET TIME_ZONE = "+00:00"' )); $subscribers = Env::$db_prefix . 'subscribers'; @@ -49,6 +52,7 @@ class Initializer { $subscriber_custom_field = Env::$db_prefix . 'subscriber_custom_field'; $newsletter_option_fields = Env::$db_prefix . 'newsletter_option_fields'; $newsletter_option = Env::$db_prefix . 'newsletter_option'; + $queue = Env::$db_prefix . 'queue'; define('MP_SUBSCRIBERS_TABLE', $subscribers); define('MP_SETTINGS_TABLE', $settings); @@ -62,6 +66,7 @@ class Initializer { define('MP_SUBSCRIBER_CUSTOM_FIELD_TABLE', $subscriber_custom_field); define('MP_NEWSLETTER_OPTION_FIELDS_TABLE', $newsletter_option_fields); define('MP_NEWSLETTER_OPTION_TABLE', $newsletter_option); + define('MP_QUEUE_TABLE', $queue); } function setupActivator() { @@ -115,5 +120,10 @@ class Initializer { function setupPublicAPI() { $publicAPI = new PublicAPI(); $publicAPI->init(); - } -} + } + + function runQueueSupervisor() { + $supervisor = new Supervisor(); + $supervisor->checkQueue(); + } +} \ No newline at end of file diff --git a/lib/Config/Migrator.php b/lib/Config/Migrator.php index 81bd028fee..f5de58d680 100644 --- a/lib/Config/Migrator.php +++ b/lib/Config/Migrator.php @@ -21,6 +21,7 @@ class Migrator { 'subscriber_custom_field', 'newsletter_option_fields', 'newsletter_option', + 'queue', 'forms' ); } @@ -199,6 +200,21 @@ class Migrator { return $this->sqlify(__FUNCTION__, $attributes); } + function queue() { + $attributes = array( + 'id mediumint(9) NOT NULL AUTO_INCREMENT,', + 'newsletter_id mediumint(9) NOT NULL,', + 'subscribers longtext,', + 'total mediumint(9) NOT NULL DEFAULT 0,', + 'processed mediumint(9) NOT NULL DEFAULT 0,', + 'created_at TIMESTAMP NOT NULL DEFAULT 0,', + 'updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,', + 'deleted_at TIMESTAMP NULL DEFAULT NULL,', + 'PRIMARY KEY (id)', + ); + return $this->sqlify(__FUNCTION__, $attributes); + } + function forms() { $attributes = array( 'id mediumint(9) NOT NULL AUTO_INCREMENT,', diff --git a/lib/Mailer/SMTP.php b/lib/Mailer/SMTP.php index 08a80501f7..6e74ff03c6 100644 --- a/lib/Mailer/SMTP.php +++ b/lib/Mailer/SMTP.php @@ -20,6 +20,7 @@ class SMTP { $message = $this->createMessage($newsletter, $subscriber); $result = $this->mailer->send($message); } catch (\Exception $e) { + return $e->getMessage(); $result = false; } return ($result === 1); diff --git a/lib/Models/Queue.php b/lib/Models/Queue.php new file mode 100644 index 0000000000..9d97b60a86 --- /dev/null +++ b/lib/Models/Queue.php @@ -0,0 +1,12 @@ +checkRequestMethod(); set_time_limit(0); - header('Connection: close'); - header('X-MailPoet-Queue: started'); - ob_end_flush(); - ob_flush(); - flush();*/ + ignore_user_abort(); + ob_start(); + list ($this->queue, $this->queueData) = $this->getQueue(); + } + + function start() { + $_start = function ($queue, $queueData) { + $queue->value = serialize($queueData); + $queue->updated_at = date('Y-m-d H:i:s'); + $queue->save(); + $this->setHeadersAndFlush($queueData['status']); + $this->callSelf(); + }; + if(!$this->queue) { + $queue = Setting::create(); + $queueData = array( + 'status' => 'started', + 'executionCounter' => 0 + ); + $queue->name = 'queue'; + $_start($queue, $queueData); + } + $queue = $this->queue; + $queueData = $this->queueData; + if($queueData['status'] === 'stopped' || $queueData['status'] === 'paused') { + if($queueData['status'] !== 'paused') $queueData['executionCounter'] = 0; + $queueData['status'] = 'started'; + $_start($queue, $queueData); + } + $this->setHeadersAndFlush('already running', true); } function process() { + if(!$this->queue) { + $this->setHeadersAndFlush('not running', true); + } + $queue = $this->queue; + $queueData = $this->queueData; + if($queueData['status'] !== 'started') { + $this->setHeadersAndFlush('not running', true); + } + // TODO: check if the queue is already being executed + $currentTime = Carbon::now('UTC'); + $lastUpdateTime = Carbon::createFromFormat('Y-m-d H:i:s', $queue->updated_at, 'UTC'); + $timeSinceLastStart = $currentTime->diffInSeconds($lastUpdateTime); + $this->setHeadersAndFlush('processing'); + sleep(30); // THIS WILL BE REPLACED BY SENDING LOGIC + list($queue, $queueData) = $this->getQueue(); + $queueData['executionCounter']++; + $queue->value = serialize($queueData); + $queue->save(); + // TODO: remove + $setting = Setting::create(); + $setting->name = date('H:i:s'); + $setting->save(); + $this->callSelf(); } function pause() { + $this->updateQueue('paused'); } function stop() { + $this->updateQueue('stopped'); + } + + function updateQueue($status = false) { + if (!$status) return; + if(!$this->queue || $this->queueData['status'] !== 'started') { + $this->setHeadersAndFlush('not running', true); + } + $queue = $this->queue; + $queueData = $this->queueData; + $queueData['status'] = $status; + $queue->value = serialize($queueData); + $queue->save(); + $this->setHeadersAndFlush($queueData['status'], true); + } + + function setHeadersAndFlush($status, $terminate = false) { + ob_end_clean(); + header('Connection: close'); + header('X-MailPoet-Queue: ' . $status); + ob_end_flush(); + ob_flush(); + flush(); + if($terminate) exit; + } + + function callSelf() { + stream_context_set_default(array('http' => array('method' => 'HEAD'))); + get_headers(home_url() . '/?mailpoet-api§ion=queue&action=process', 1); + exit; + } + + function getQueue() { + $queue = Setting::where('name', 'queue') + ->findOne(); + $queueData = ($queue) ? unserialize($queue->value) : false; + return array( + $queue, + $queueData + ); + } + + function checkRequestMethod() { + $method = $_SERVER['REQUEST_METHOD']; + if($method !== 'HEAD') { + header('HTTP/1.0 405 Method Not Allowed'); + exit; + } } } \ No newline at end of file diff --git a/lib/Queue/Supervisor.php b/lib/Queue/Supervisor.php new file mode 100644 index 0000000000..79b7442c96 --- /dev/null +++ b/lib/Queue/Supervisor.php @@ -0,0 +1,48 @@ +queue, $this->queueData) = $this->getQueue(); + } + + function checkQueue() { + if(!$this->queue) { + $this->startQueue(); + } else { + if($this->queueData['status'] !== 'paused' && + $this->queueData['status'] !== 'stopped' + ) { + return; + } + $currentTime = Carbon::now('UTC'); + $lastUpdateTime = Carbon::createFromFormat('Y-m-d H:i:s', $this->queue->updated_at, 'UTC'); + $timeSinceLastStart = $currentTime->diffInSeconds($lastUpdateTime); + if($timeSinceLastStart < 50) return; + $this->queueData['status'] = 'paused'; + $this->queue->value = serialize($this->queueData); + $this->queue->save(); + $this->startQueue(); + } + } + + function startQueue() { + stream_context_set_default(array('http' => array('method' => 'HEAD'))); + get_headers(home_url() . '/?mailpoet-api§ion=queue&action=start', 1); + } + + function getQueue() { + $queue = Setting::where('name', 'queue') + ->findOne(); + $queueData = ($queue) ? unserialize($queue->value) : false; + return array( + $queue, + $queueData + ); + } +} \ No newline at end of file