- Implements scheduler worker for welcome and post notifications

- Updates sending queue worker to save rendered newsletter body
- Updates sending queue router to schedule post notification newsletters
This commit is contained in:
Vlad
2016-03-20 22:01:01 -04:00
parent ad31b143d2
commit 3f6caf5fa4
8 changed files with 145 additions and 42 deletions

View File

@ -149,8 +149,8 @@ class Hooks {
} }
function setupCronWorkers() { function setupCronWorkers() {
add_action('mailpoet_cron_worker', array($this, 'runSchedulerWorker'), 10, 1); add_action('mailpoet_scheduler_worker', array($this, 'runSchedulerWorker'), 10, 1);
add_action('mailpoet_cron_worker', array($this, 'runSendingQueueWorker'), 10, 1); add_action('mailpoet_queue_worker', array($this, 'runSendingQueueWorker'), 10, 1);
} }
function runSchedulerWorker($timer) { function runSchedulerWorker($timer) {

View File

@ -214,6 +214,9 @@ class Migrator {
$attributes = array( $attributes = array(
'id mediumint(9) NOT NULL AUTO_INCREMENT,', 'id mediumint(9) NOT NULL AUTO_INCREMENT,',
'newsletter_id mediumint(9) NOT NULL,', 'newsletter_id mediumint(9) NOT NULL,',
'newsletter_rendered_body longtext,',
'newsletter_rendered_body_hash varchar(250) NULL DEFAULT NULL,',
'newsletter_rendered_subject varchar(250) NULL DEFAULT NULL,',
'subscribers longtext,', 'subscribers longtext,',
'status varchar(12) NULL DEFAULT NULL,', 'status varchar(12) NULL DEFAULT NULL,',
'priority mediumint(9) NOT NULL DEFAULT 0,', 'priority mediumint(9) NOT NULL DEFAULT 0,',

View File

@ -185,9 +185,9 @@ class Populator {
'newsletter_type' => 'notification', 'newsletter_type' => 'notification',
), ),
array( array(
'name' => 'lastSentData', 'name' => 'segments',
'newsletter_type' => 'notification', 'newsletter_type' => 'notification',
), )
); );
} }

View File

@ -37,7 +37,8 @@ class Daemon {
} }
$this->abortIfStopped($daemon); $this->abortIfStopped($daemon);
try { try {
do_action('mailpoet_cron_worker', $this->timer); do_action('mailpoet_scheduler_worker', $this->timer);
do_action('mailpoet_queue_worker', $this->timer);
} catch(\Exception $e) { } catch(\Exception $e) {
} }
$elapsed_time = microtime(true) - $this->timer; $elapsed_time = microtime(true) - $this->timer;

View File

@ -4,8 +4,11 @@ namespace MailPoet\Cron\Workers;
use Carbon\Carbon; use Carbon\Carbon;
use MailPoet\Cron\CronHelper; use MailPoet\Cron\CronHelper;
use MailPoet\Models\Newsletter; use MailPoet\Models\Newsletter;
use MailPoet\Models\Segment;
use MailPoet\Models\SendingQueue; use MailPoet\Models\SendingQueue;
use MailPoet\Models\SubscriberSegment; use MailPoet\Models\SubscriberSegment;
use MailPoet\Util\Helpers;
use Cron\CronExpression as Cron;
if(!defined('ABSPATH')) exit; if(!defined('ABSPATH')) exit;
@ -18,23 +21,29 @@ class Scheduler {
} }
function process() { function process() {
$this->processScheduledNewsletters();
}
function processScheduledQueues() {
$scheduled_queues = SendingQueue::where('status', 'scheduled') $scheduled_queues = SendingQueue::where('status', 'scheduled')
->whereLte('scheduled_at', Carbon::now() ->whereLte('scheduled_at', Carbon::now()->format('Y-m-d H:i:s'))
->format('Y-m-d H:i:s'))
->findMany(); ->findMany();
if(!count($scheduled_queues)) return; if(!count($scheduled_queues)) return;
foreach($scheduled_queues as $queue) { foreach($scheduled_queues as $queue) {
$newsletter = Newsletter::filter('filterWithOptions') $newsletter = Newsletter::filter('filterWithOptions')
->findOne($queue->newsletter_id) ->findOne($queue->newsletter_id);
->asArray(); if(!$newsletter) {
$queue->delete();
} elseif($newsletter->type === 'welcome') {
$this->processWelcomeNewsletter($newsletter, $queue);
} elseif($newsletter->type === 'notification') {
$this->processPostNotificationNewsletter($newsletter, $queue);
}
CronHelper::checkExecutionTimer($this->timer);
}
}
function processWelcomeNewsletter($newsletter, $queue) {
$subscriber = unserialize($queue->subscribers); $subscriber = unserialize($queue->subscribers);
$subscriber_in_segment = $subscriber_in_segment =
SubscriberSegment::where('subscriber_id', $subscriber['to_process'][0]) SubscriberSegment::where('subscriber_id', $subscriber['to_process'][0])
->where('segment_id', $newsletter['segment']) ->where('segment_id', $newsletter->segment)
->findOne(); ->findOne();
if(!$subscriber_in_segment) { if(!$subscriber_in_segment) {
$queue->delete(); $queue->delete();
@ -42,7 +51,37 @@ class Scheduler {
$queue->status = null; $queue->status = null;
$queue->save(); $queue->save();
} }
CronHelper::checkExecutionTimer($this->timer); }
}
function processPostNotificationNewsletter($newsletter, $queue) {
$subscriber_ids = array();
$segments = Segment::whereIn('id', unserialize($newsletter->segments))
->findMany();
foreach($segments as $segment) {
$subscriber_ids = array_merge(
$subscriber_ids,
Helpers::arrayColumn(
$segment->subscribers()->findArray(),
'id'
)
);
}
if(empty($subscriber_ids)) return;
// TODO: check if newsletter contents changed since last time it was sent
$subscriber_ids = array_unique($subscriber_ids);
$queue->subscribers = serialize(
array(
'to_process' => $subscriber_ids
)
);
$queue->count_total = $queue->count_to_process = count($subscriber_ids);
$queue->status = null;
$queue->save();
$new_queue = SendingQueue::create();
$new_queue->newsletter_id = $newsletter->id;
$schedule = Cron::factory($newsletter->schedule);
$new_queue->scheduled_at = $schedule->getNextRunDate()->format('Y-m-d H:i:s');
$new_queue->status = 'scheduled';
$new_queue->save();
} }
} }

View File

@ -10,7 +10,6 @@ use MailPoet\Models\Subscriber;
use MailPoet\Newsletter\Renderer\Renderer; use MailPoet\Newsletter\Renderer\Renderer;
use MailPoet\Newsletter\Shortcodes\Shortcodes; use MailPoet\Newsletter\Shortcodes\Shortcodes;
use MailPoet\Util\Helpers; use MailPoet\Util\Helpers;
use MailPoet\Util\Security;
if(!defined('ABSPATH')) exit; if(!defined('ABSPATH')) exit;
@ -37,6 +36,8 @@ class SendingQueue {
if(!$newsletter) { if(!$newsletter) {
continue; continue;
} }
$newsletter = $newsletter->asArray();
$newsletter['body'] = $this->getNewsletterBodyAndSubject($queue, $newsletter);
$queue->subscribers = (object) unserialize($queue->subscribers); $queue->subscribers = (object) unserialize($queue->subscribers);
if(!isset($queue->subscribers->processed)) { if(!isset($queue->subscribers->processed)) {
$queue->subscribers->processed = array(); $queue->subscribers->processed = array();
@ -44,8 +45,6 @@ class SendingQueue {
if(!isset($queue->subscribers->failed)) { if(!isset($queue->subscribers->failed)) {
$queue->subscribers->failed = array(); $queue->subscribers->failed = array();
} }
$newsletter = $newsletter->asArray();
$newsletter['body'] = $this->renderNewsletter($newsletter);
$mailer = $this->configureMailer($newsletter); $mailer = $this->configureMailer($newsletter);
foreach(array_chunk($queue->subscribers->to_process, self::batch_size) as foreach(array_chunk($queue->subscribers->to_process, self::batch_size) as
$subscribers_ids) { $subscribers_ids) {
@ -67,6 +66,20 @@ class SendingQueue {
} }
} }
function getNewsletterBodyAndSubject($queue, $newsletter) {
// check if newsletter has been rendered, in which case return its contents
// or render & and for future use
if($queue->newsletter_rendered_body === null) {
$newsletter['body'] = $this->renderNewsletter($newsletter);
$queue->newsletter_rendered_body = json_encode($newsletter['body']);
$queue->newsletter_rendered_body_hash = md5($newsletter['body']['text']);
$queue->save();
} else {
$newsletter['body'] = json_decode($queue->newsletter_rendered_body);
}
return $newsletter['body'];
}
function processBulkSubscribers($mailer, $newsletter, $subscribers, $queue) { function processBulkSubscribers($mailer, $newsletter, $subscribers, $queue) {
foreach($subscribers as $subscriber) { foreach($subscribers as $subscriber) {
$processed_newsletters[] = $processed_newsletters[] =

View File

@ -24,28 +24,33 @@ class Scheduler {
'#' . $newsletter['nthWeekDay']; '#' . $newsletter['nthWeekDay'];
switch($interval_type) { switch($interval_type) {
case 'immediately': case 'immediately':
$cron = '* * * * *'; $schedule = '* * * * *';
break; break;
case 'immediate': //daily case 'immediate':
$cron = sprintf('0 %s * * *', $hour); case 'daily':
$schedule = sprintf('0 %s * * *', $hour);
break; break;
case 'weekly': case 'weekly':
$cron = sprintf('0 %s * * %s', $hour, $week_day); $schedule = sprintf('0 %s * * %s', $hour, $week_day);
break; break;
case 'monthly': case 'monthly':
$cron = sprintf('0 %s %s * *', $hour, $month_day); $schedule = sprintf('0 %s %s * *', $hour, $month_day);
break; break;
case 'nthWeekDay': case 'nthWeekDay':
$cron = sprintf('0 %s ? * %s%s', $hour, $week_day, $nth_week_day); $schedule = sprintf('0 %s ? * %s%s', $hour, $week_day, $nth_week_day);
break; break;
} }
$option_field = NewsletterOptionField::where('name', 'schedule') $option_field = NewsletterOptionField::where('name', 'schedule')
->findOne() ->findOne()
->asArray(); ->asArray();
$relation = NewsletterOption::where('option_field_id', $option_field['id'])
->findOne();
if(!$relation) {
$relation = NewsletterOption::create(); $relation = NewsletterOption::create();
$relation->newsletter_id = $newsletter['id']; $relation->newsletter_id = $newsletter['id'];
$relation->option_field_id = $option_field['id']; $relation->option_field_id = $option_field['id'];
$relation->value = $cron; }
$relation->value = $schedule;
$relation->save(); $relation->save();
} }

View File

@ -2,14 +2,16 @@
namespace MailPoet\Router; namespace MailPoet\Router;
use MailPoet\Models\Newsletter; use MailPoet\Models\Newsletter;
use MailPoet\Models\NewsletterOption;
use MailPoet\Models\NewsletterOptionField;
use MailPoet\Models\Segment; use MailPoet\Models\Segment;
use MailPoet\Util\Helpers; use MailPoet\Util\Helpers;
use Cron\CronExpression as Cron;
if(!defined('ABSPATH')) exit; if(!defined('ABSPATH')) exit;
class SendingQueue { class SendingQueue {
function add($data) { function add($data) {
// check if mailer is properly configured
try { try {
new Mailer(false); new Mailer(false);
} catch(\Exception $e) { } catch(\Exception $e) {
@ -19,18 +21,58 @@ class SendingQueue {
); );
} }
$queue = \MailPoet\Models\SendingQueue::whereNull('status') $newsletter = Newsletter::filter('filterWithOptions')
->where('newsletter_id', $data['newsletter_id']) ->findOne($data['newsletter_id']);
->findArray(); if(!$newsletter) {
return array(
'result' => false,
'errors' => array(__('Newsletter does not exist.'))
);
}
$queue = \MailPoet\Models\SendingQueue::whereNull('status')
->where('newsletter_id', $newsletter->id)
->findOne();
if(!empty($queue)) { if(!empty($queue)) {
return array( return array(
'result' => false, 'result' => false,
'errors' => array(__('Send operation is already in progress.')) 'errors' => array(__('Send operation is already in progress.'))
); );
} }
if($newsletter->type === 'notification') {
$option_field = NewsletterOptionField::where('name', 'segments')
->where('newsletter_type', 'notification')
->findOne();
$relation = NewsletterOption::where('option_field_id', $option_field->id)
->findOne();
if(!$relation) {
$relation = NewsletterOption::create();
$relation->newsletter_id = $newsletter->id;
$relation->option_field_id = $option_field->id;
}
$relation->value = serialize($data['segments']);
$relation->save();
$queue = \MailPoet\Models\SendingQueue::where('status', 'scheduled')
->where('newsletter_id', $newsletter->id)
->findOne();
if(!$queue) {
$queue = \MailPoet\Models\SendingQueue::create(); $queue = \MailPoet\Models\SendingQueue::create();
$queue->newsletter_id = $data['newsletter_id']; $queue->newsletter_id = $newsletter->id;
}
$schedule = Cron::factory($newsletter->schedule);
$queue->scheduled_at = $schedule->getNextRunDate()->format('Y-m-d H:i:s');
$queue->status = 'scheduled';
$queue->save();
return array(
'result' => true,
'data' => array(__('Newsletter was scheduled for sending.'))
);
}
$queue = \MailPoet\Models\SendingQueue::create();
$queue->newsletter_id = $newsletter->id;
$subscriber_ids = array(); $subscriber_ids = array();
$segments = Segment::whereIn('id', $data['segments']) $segments = Segment::whereIn('id', $data['segments'])
->findMany(); ->findMany();