- Updates cron/mailer/scheduler code to work with the new sending queue

task scheduler and mailer log
This commit is contained in:
Vlad
2016-07-20 19:12:31 -04:00
parent a5569a6a55
commit 9b9cb1455a
7 changed files with 71 additions and 91 deletions

View File

@ -5,6 +5,7 @@ use MailPoet\Cron\CronHelper;
use MailPoet\Cron\Supervisor; use MailPoet\Cron\Supervisor;
use MailPoet\Cron\Workers\Scheduler as SchedulerWorker; use MailPoet\Cron\Workers\Scheduler as SchedulerWorker;
use MailPoet\Cron\Workers\SendingQueue\SendingQueue as SendingQueueWorker; use MailPoet\Cron\Workers\SendingQueue\SendingQueue as SendingQueueWorker;
use MailPoet\Mailer\MailerLog;
use MailPoet\Models\Setting; use MailPoet\Models\Setting;
if(!defined('ABSPATH')) exit; if(!defined('ABSPATH')) exit;
@ -45,14 +46,17 @@ class TaskScheduler {
function configureWordpressScheduler() { function configureWordpressScheduler() {
$scheduled_queues = SchedulerWorker::getScheduledQueues(); $scheduled_queues = SchedulerWorker::getScheduledQueues();
$running_queues = SendingQueueWorker::getRunningQueues(); $running_queues = SendingQueueWorker::getRunningQueues();
// run cron only when there are scheduled queues ready to be processed $sending_limit_reached = MailerLog::isSendingLimitReached();
// or are already being processed // run cron only when:
if($scheduled_queues || $running_queues) { // 1) there are scheduled queues ready to be processed
// 2) queues are already being processed
// 3) sending limit has not been reached
if(($scheduled_queues || $running_queues) && !$sending_limit_reached) {
return $this->configureMailpoetScheduler(); return $this->configureMailpoetScheduler();
} }
// stop (delete) daemon since the WP task scheduler is enabled // in all other cases stop (delete) the daemon
$cron_daemon = CronHelper::getDaemon(); $cron_daemon = CronHelper::getDaemon();
if ($cron_daemon) { if($cron_daemon) {
CronHelper::deleteDaemon(); CronHelper::deleteDaemon();
} }
return; return;

View File

@ -81,7 +81,7 @@ class CronHelper {
throw new \Exception(__('Site URL is unreachable.')); throw new \Exception(__('Site URL is unreachable.'));
} }
static function checkExecutionTimer($timer) { static function enforceExecutionLimit($timer) {
$elapsed_time = microtime(true) - $timer; $elapsed_time = microtime(true) - $timer;
if($elapsed_time >= self::DAEMON_EXECUTION_LIMIT) { if($elapsed_time >= self::DAEMON_EXECUTION_LIMIT) {
throw new \Exception(__('Maximum execution time has been reached.')); throw new \Exception(__('Maximum execution time has been reached.'));

View File

@ -20,7 +20,8 @@ class Scheduler {
function __construct($timer = false) { function __construct($timer = false) {
$this->timer = ($timer) ? $timer : microtime(true); $this->timer = ($timer) ? $timer : microtime(true);
CronHelper::checkExecutionTimer($this->timer); // abort if execution limit is reached
CronHelper::enforceExecutionLimit($this->timer);
} }
function process() { function process() {
@ -38,7 +39,7 @@ class Scheduler {
} elseif($newsletter->type === 'standard') { } elseif($newsletter->type === 'standard') {
$this->processScheduledStandardNewsletter($newsletter, $queue); $this->processScheduledStandardNewsletter($newsletter, $queue);
} }
CronHelper::checkExecutionTimer($this->timer); CronHelper::enforceExecutionLimit($this->timer);
} }
} }

View File

@ -5,7 +5,7 @@ use MailPoet\Cron\CronHelper;
use MailPoet\Cron\Workers\SendingQueue\Tasks\Mailer as MailerTask; use MailPoet\Cron\Workers\SendingQueue\Tasks\Mailer as MailerTask;
use MailPoet\Cron\Workers\SendingQueue\Tasks\Newsletter as NewsletterTask; use MailPoet\Cron\Workers\SendingQueue\Tasks\Newsletter as NewsletterTask;
use MailPoet\Cron\Workers\SendingQueue\Tasks\Subscribers as SubscribersTask; use MailPoet\Cron\Workers\SendingQueue\Tasks\Subscribers as SubscribersTask;
use MailPoet\Models\Newsletter as NewsletterModel; use MailPoet\Cron\Workers\SendingQueue\Tasks\TaskScheduler as TaskSchedulerTask;
use MailPoet\Models\SendingQueue as SendingQueueModel; use MailPoet\Models\SendingQueue as SendingQueueModel;
use MailPoet\Models\StatisticsNewsletters as StatisticsNewslettersModel; use MailPoet\Models\StatisticsNewsletters as StatisticsNewslettersModel;
use MailPoet\Models\Subscriber as SubscriberModel; use MailPoet\Models\Subscriber as SubscriberModel;
@ -23,10 +23,12 @@ class SendingQueue {
$this->mailer_task = new MailerTask(); $this->mailer_task = new MailerTask();
$this->newsletter_task = new NewsletterTask(); $this->newsletter_task = new NewsletterTask();
$this->timer = ($timer) ? $timer : microtime(true); $this->timer = ($timer) ? $timer : microtime(true);
// abort if sending and/or execution limit are reached
$this->mailer_task->enforceSendingLimit();
CronHelper::enforceExecutionLimit($this->timer);
} }
function process() { function process() {
$this->mailer_task->checkSendingLimit();
foreach(self::getRunningQueues() as $queue) { foreach(self::getRunningQueues() as $queue) {
// get and pre-process newsletter (render, replace shortcodes/links, etc.) // get and pre-process newsletter (render, replace shortcodes/links, etc.)
$newsletter = $this->newsletter_task->getAndPreProcess($queue->asArray()); $newsletter = $this->newsletter_task->getAndPreProcess($queue->asArray());
@ -67,8 +69,14 @@ class SendingQueue {
$newsletter, $newsletter,
$found_subscribers $found_subscribers
); );
if($queue->status === SendingQueueModel::STATUS_COMPLETED) {
$this->newsletter_task->markNewsletterAsSent($queue->newsletter_id);
}
// abort if execution limit is reached
CronHelper::enforceExecutionLimit($this->timer);
} }
} }
TaskSchedulerTask::complete();
} }
function processQueue($queue, $newsletter, $subscribers) { function processQueue($queue, $newsletter, $subscribers) {
@ -141,6 +149,7 @@ class SendingQueue {
$prepared_subscribers_ids, $prepared_subscribers_ids,
$queue->subscribers $queue->subscribers
); );
$queue = $this->updateQueue($queue);
} else { } else {
// update processed/to process list // update processed/to process list
$queue->subscribers = SubscribersTask::updateProcessedList( $queue->subscribers = SubscribersTask::updateProcessedList(
@ -152,22 +161,14 @@ class SendingQueue {
// keep track of sent items // keep track of sent items
$this->mailer_task->updateMailerLog(); $this->mailer_task->updateMailerLog();
$subscribers_to_process_count = count($queue->subscribers['to_process']); $subscribers_to_process_count = count($queue->subscribers['to_process']);
$queue = $this->updateQueue($queue);
// check sending limit if there are still subscribers left to send
if(isset($subscribers_to_process_count)) {
$this->mailer_task->enforceSendingLimit();
}
} }
$queue = $this->updateQueue($queue);
if($subscribers_to_process_count) {
$this->mailer_task->checkSendingLimit();
}
CronHelper::checkExecutionTimer($this->timer);
return $queue; return $queue;
} }
function getQueues() {
return SendingQueueModel::orderByDesc('priority')
->whereNull('deleted_at')
->whereNull('status')
->findMany();
}
static function getRunningQueues() { static function getRunningQueues() {
return SendingQueueModel::orderByDesc('priority') return SendingQueueModel::orderByDesc('priority')
->whereNull('deleted_at') ->whereNull('deleted_at')
@ -185,13 +186,6 @@ class SendingQueue {
if(!$queue->count_to_process) { if(!$queue->count_to_process) {
$queue->processed_at = current_time('mysql'); $queue->processed_at = current_time('mysql');
$queue->status = SendingQueueModel::STATUS_COMPLETED; $queue->status = SendingQueueModel::STATUS_COMPLETED;
// if it's a standard or post notificaiton newsletter, update its status to sent
$newsletter = NewsletterModel::findOne($queue->newsletter_id);
if($newsletter->type === NewsletterModel::TYPE_STANDARD ||
$newsletter->type === NewsletterModel::TYPE_NOTIFICATION_HISTORY
) {
$newsletter->setStatus(NewsletterModel::STATUS_SENT);
}
} }
return $queue->save(); return $queue->save();
} }

View File

@ -2,19 +2,17 @@
namespace MailPoet\Cron\Workers\SendingQueue\Tasks; namespace MailPoet\Cron\Workers\SendingQueue\Tasks;
use MailPoet\Mailer\Mailer as MailerFactory; use MailPoet\Mailer\Mailer as MailerFactory;
use MailPoet\Models\Setting; use MailPoet\Mailer\MailerLog;
if(!defined('ABSPATH')) exit; if(!defined('ABSPATH')) exit;
class Mailer { class Mailer {
public $mta_config;
public $mta_log;
public $mailer; public $mailer;
public $mailer_log;
function __construct() { function __construct() {
$this->mta_config = $this->getMailerConfig();
$this->mta_log = $this->getMailerLog();
$this->mailer = $this->configureMailer(); $this->mailer = $this->configureMailer();
$this->mailer_log = $this->getMailerLog();
} }
function configureMailer(array $newsletter = null) { function configureMailer(array $newsletter = null) {
@ -40,33 +38,17 @@ class Mailer {
return $this->mailer; return $this->mailer;
} }
function getMailerConfig() {
$mta_config = Setting::getValue('mta');
if(!$mta_config) {
throw new \Exception(__('Mailer is not configured'));
}
return $mta_config;
}
function getMailerLog() { function getMailerLog() {
$mta_log = Setting::getValue('mta_log'); return MailerLog::getMailerLog();
if(!$mta_log) {
$mta_log = array(
'sent' => 0,
'started' => time()
);
Setting::setValue('mta_log', $mta_log);
}
return $mta_log;
} }
function updateMailerLog() { function updateMailerLog() {
$this->mta_log['sent']++; $this->mailer_log['sent']++;
Setting::setValue('mta_log', $this->mta_log); return MailerLog::updateMailerLog($this->mailer_log);
} }
function getProcessingMethod() { function getProcessingMethod() {
return ($this->mta_config['method'] === 'MailPoet') ? return ($this->mailer->mailer['method'] === MailerFactory::METHOD_MAILPOET) ?
'bulk' : 'bulk' :
'individual'; 'individual';
} }
@ -82,22 +64,9 @@ class Mailer {
); );
} }
function checkSendingLimit() { function enforceSendingLimit() {
if($this->mta_config['method'] === 'MailPoet') return; if(MailerLog::isSendingLimitReached()) {
$frequency_interval = (int)$this->mta_config['frequency']['interval'] * 60;
$frequency_limit = (int)$this->mta_config['frequency']['emails'];
$elapsed_time = time() - (int)$this->mta_log['started'];
if($this->mta_log['sent'] === $frequency_limit &&
$elapsed_time <= $frequency_interval
) {
throw new \Exception(__('Sending frequency limit has been reached')); throw new \Exception(__('Sending frequency limit has been reached'));
} }
if($elapsed_time > $frequency_interval) {
$this->mta_log = array(
'sent' => 0,
'started' => time()
);
Setting::setValue('mta_log', $this->mta_log);
}
} }
} }

View File

@ -104,8 +104,8 @@ class Newsletter {
); );
} }
function markNewsletterAsSent($queue_id) { function markNewsletterAsSent($newsletter_id) {
$newsletter = NewsletterModel::findOne($queue_id); $newsletter = NewsletterModel::findOne($newsletter_id);
// if it's a standard newsletter, update its status // if it's a standard newsletter, update its status
if($newsletter->type === NewsletterModel::TYPE_STANDARD) { if($newsletter->type === NewsletterModel::TYPE_STANDARD) {
$newsletter->setStatus(NewsletterModel::STATUS_SENT); $newsletter->setStatus(NewsletterModel::STATUS_SENT);

View File

@ -12,9 +12,18 @@ class Mailer {
public $sender; public $sender;
public $reply_to; public $reply_to;
public $mailer_instance; public $mailer_instance;
const MAILER_CONFIG = 'mta';
const SENDING_LIMIT_INTERVAL_MULTIPLIER = 60;
const METHOD_MAILPOET = 'MailPoet';
const METHOD_MAILGUN = 'MailGun';
const METHOD_ELASTICEMAIL = 'ElasticEmail';
const METHOD_AMAZONSES = 'AmazonSES';
const METHOD_SENDGRID = 'SendGrid';
const METHOD_PHPMAIL = 'PHPMail';
const METHOD_SMTP = 'SMTP';
function __construct($mailer = false, $sender = false, $reply_to = false) { function __construct($mailer = false, $sender = false, $reply_to = false) {
$this->mailer = $this->getMailer($mailer); $this->mailer = self::getMailer($mailer);
$this->sender = $this->getSender($sender); $this->sender = $this->getSender($sender);
$this->reply_to = $this->getReplyTo($reply_to); $this->reply_to = $this->getReplyTo($reply_to);
$this->mailer_instance = $this->buildMailer(); $this->mailer_instance = $this->buildMailer();
@ -27,7 +36,7 @@ class Mailer {
function buildMailer() { function buildMailer() {
switch($this->mailer['method']) { switch($this->mailer['method']) {
case 'AmazonSES': case self::METHOD_AMAZONSES:
$mailer_instance = new $this->mailer['class']( $mailer_instance = new $this->mailer['class'](
$this->mailer['region'], $this->mailer['region'],
$this->mailer['access_key'], $this->mailer['access_key'],
@ -35,43 +44,43 @@ class Mailer {
$this->sender, $this->sender,
$this->reply_to $this->reply_to
); );
break; break;
case 'ElasticEmail': case self::METHOD_ELASTICEMAIL:
$mailer_instance = new $this->mailer['class']( $mailer_instance = new $this->mailer['class'](
$this->mailer['api_key'], $this->mailer['api_key'],
$this->sender, $this->sender,
$this->reply_to $this->reply_to
); );
break; break;
case 'MailGun': case self::METHOD_MAILGUN:
$mailer_instance = new $this->mailer['class']( $mailer_instance = new $this->mailer['class'](
$this->mailer['domain'], $this->mailer['domain'],
$this->mailer['api_key'], $this->mailer['api_key'],
$this->sender, $this->sender,
$this->reply_to $this->reply_to
); );
break; break;
case 'MailPoet': case self::METHOD_MAILPOET:
$mailer_instance = new $this->mailer['class']( $mailer_instance = new $this->mailer['class'](
$this->mailer['mailpoet_api_key'], $this->mailer['mailpoet_api_key'],
$this->sender, $this->sender,
$this->reply_to $this->reply_to
); );
break; break;
case 'SendGrid': case self::METHOD_SENDGRID:
$mailer_instance = new $this->mailer['class']( $mailer_instance = new $this->mailer['class'](
$this->mailer['api_key'], $this->mailer['api_key'],
$this->sender, $this->sender,
$this->reply_to $this->reply_to
); );
break; break;
case 'PHPMail': case self::METHOD_PHPMAIL:
$mailer_instance = new $this->mailer['class']( $mailer_instance = new $this->mailer['class'](
$this->sender, $this->sender,
$this->reply_to $this->reply_to
); );
break; break;
case 'SMTP': case self::METHOD_SMTP:
$mailer_instance = new $this->mailer['class']( $mailer_instance = new $this->mailer['class'](
$this->mailer['host'], $this->mailer['host'],
$this->mailer['port'], $this->mailer['port'],
@ -82,19 +91,22 @@ class Mailer {
$this->sender, $this->sender,
$this->reply_to $this->reply_to
); );
break; break;
default: default:
throw new \Exception(__('Mailing method does not exist')); throw new \Exception(__('Mailing method does not exist'));
} }
return $mailer_instance; return $mailer_instance;
} }
function getMailer($mailer = false) { static function getMailer($mailer = false) {
if(!$mailer) { if(!$mailer) {
$mailer = Setting::getValue('mta'); $mailer = Setting::getValue(self::MAILER_CONFIG);
if(!$mailer || !isset($mailer['method'])) throw new \Exception(__('Mailer is not configured')); if(!$mailer || !isset($mailer['method'])) throw new \Exception(__('Mailer is not configured'));
} }
$mailer['class'] = 'MailPoet\\Mailer\\Methods\\' . $mailer['method']; $mailer['class'] = 'MailPoet\\Mailer\\Methods\\' . $mailer['method'];
$mailer['frequency_interval'] =
(int)$mailer['frequency']['interval'] * self::SENDING_LIMIT_INTERVAL_MULTIPLIER;
$mailer['frequency_limit'] = (int)$mailer['frequency']['emails'];
return $mailer; return $mailer;
} }