- Runs mailer and cron execution limit checks at the same and in 3
locations: before processing, after each send operations and after queue processing - Fixes an issue with sending continuing when newsletter is trashed - Updates unit tests
This commit is contained in:
@ -21,13 +21,10 @@ class SendingQueue {
|
|||||||
$this->mailer_task = ($mailer_task) ? $mailer_task : new MailerTask();
|
$this->mailer_task = ($mailer_task) ? $mailer_task : new MailerTask();
|
||||||
$this->newsletter_task = ($newsletter_task) ? $newsletter_task : new NewsletterTask();
|
$this->newsletter_task = ($newsletter_task) ? $newsletter_task : new NewsletterTask();
|
||||||
$this->timer = ($timer) ? $timer : microtime(true);
|
$this->timer = ($timer) ? $timer : microtime(true);
|
||||||
// abort if execution limit is reached
|
|
||||||
CronHelper::enforceExecutionLimit($this->timer);
|
|
||||||
// abort if mailing is paused or sending limit has been reached
|
|
||||||
MailerLog::enforceExecutionRequirements();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
function process() {
|
function process() {
|
||||||
|
$this->enforceSendingAndExecutionLimits();
|
||||||
foreach(self::getRunningQueues() as $queue) {
|
foreach(self::getRunningQueues() as $queue) {
|
||||||
// get and pre-process newsletter (render, replace shortcodes/links, etc.)
|
// get and pre-process newsletter (render, replace shortcodes/links, etc.)
|
||||||
$newsletter = $this->newsletter_task->getAndPreProcess($queue);
|
$newsletter = $this->newsletter_task->getAndPreProcess($queue);
|
||||||
@ -44,8 +41,6 @@ class SendingQueue {
|
|||||||
self::BATCH_SIZE
|
self::BATCH_SIZE
|
||||||
);
|
);
|
||||||
foreach($subscriber_batches as $subscribers_to_process_ids) {
|
foreach($subscriber_batches as $subscribers_to_process_ids) {
|
||||||
// abort if execution limit is reached
|
|
||||||
CronHelper::enforceExecutionLimit($this->timer);
|
|
||||||
$found_subscribers = SubscriberModel::whereIn('id', $subscribers_to_process_ids)
|
$found_subscribers = SubscriberModel::whereIn('id', $subscribers_to_process_ids)
|
||||||
->whereNull('deleted_at')
|
->whereNull('deleted_at')
|
||||||
->findMany();
|
->findMany();
|
||||||
@ -72,8 +67,7 @@ class SendingQueue {
|
|||||||
if($queue->status === SendingQueueModel::STATUS_COMPLETED) {
|
if($queue->status === SendingQueueModel::STATUS_COMPLETED) {
|
||||||
$this->newsletter_task->markNewsletterAsSent($newsletter);
|
$this->newsletter_task->markNewsletterAsSent($newsletter);
|
||||||
}
|
}
|
||||||
// abort if sending limit has been reached
|
$this->enforceSendingAndExecutionLimits();
|
||||||
MailerLog::enforceExecutionRequirements();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -120,8 +114,6 @@ class SendingQueue {
|
|||||||
$prepared_subscribers_ids = array();
|
$prepared_subscribers_ids = array();
|
||||||
$statistics = array();
|
$statistics = array();
|
||||||
}
|
}
|
||||||
// abort if sending limit has been reached
|
|
||||||
MailerLog::enforceExecutionRequirements();
|
|
||||||
}
|
}
|
||||||
if($processing_method === 'bulk') {
|
if($processing_method === 'bulk') {
|
||||||
$queue = $this->sendNewsletters(
|
$queue = $this->sendNewsletters(
|
||||||
@ -157,13 +149,20 @@ class SendingQueue {
|
|||||||
StatisticsNewslettersModel::createMultiple($statistics);
|
StatisticsNewslettersModel::createMultiple($statistics);
|
||||||
// update the sent count
|
// update the sent count
|
||||||
$this->mailer_task->updateSentCount();
|
$this->mailer_task->updateSentCount();
|
||||||
// abort if sending limit has been reached
|
// enforce execution limits if queue is still being processed
|
||||||
if($queue->count_to_process) {
|
if($queue->status !== SendingQueueModel::STATUS_COMPLETED) {
|
||||||
MailerLog::enforceExecutionRequirements();
|
$this->enforceSendingAndExecutionLimits();
|
||||||
}
|
}
|
||||||
return $queue;
|
return $queue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
function enforceSendingAndExecutionLimits() {
|
||||||
|
// abort if execution limit is reached
|
||||||
|
CronHelper::enforceExecutionLimit($this->timer);
|
||||||
|
// abort if sending limit has been reached
|
||||||
|
MailerLog::enforceExecutionRequirements();
|
||||||
|
}
|
||||||
|
|
||||||
static function getRunningQueues() {
|
static function getRunningQueues() {
|
||||||
return SendingQueueModel::orderByAsc('priority')
|
return SendingQueueModel::orderByAsc('priority')
|
||||||
->whereNull('deleted_at')
|
->whereNull('deleted_at')
|
||||||
|
@ -21,7 +21,7 @@ class Newsletter {
|
|||||||
}
|
}
|
||||||
|
|
||||||
function getAndPreProcess($queue) {
|
function getAndPreProcess($queue) {
|
||||||
$newsletter = $queue->newsletter()->findOne();
|
$newsletter = $queue->newsletter()->whereNull('deleted_at')->findOne();
|
||||||
if(!$newsletter) {
|
if(!$newsletter) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
@ -5,7 +5,6 @@ use Codeception\Util\Fixtures;
|
|||||||
use Codeception\Util\Stub;
|
use Codeception\Util\Stub;
|
||||||
use MailPoet\API\Endpoints\Cron;
|
use MailPoet\API\Endpoints\Cron;
|
||||||
use MailPoet\Config\Populator;
|
use MailPoet\Config\Populator;
|
||||||
use MailPoet\Cron\CronHelper;
|
|
||||||
use MailPoet\Cron\Workers\SendingQueue\SendingQueue as SendingQueueWorker;
|
use MailPoet\Cron\Workers\SendingQueue\SendingQueue as SendingQueueWorker;
|
||||||
use MailPoet\Cron\Workers\SendingQueue\Tasks\Mailer as MailerTask;
|
use MailPoet\Cron\Workers\SendingQueue\Tasks\Mailer as MailerTask;
|
||||||
use MailPoet\Cron\Workers\SendingQueue\Tasks\Newsletter as NewsletterTask;
|
use MailPoet\Cron\Workers\SendingQueue\Tasks\Newsletter as NewsletterTask;
|
||||||
@ -56,25 +55,96 @@ class SendingQueueTest extends MailPoetTest {
|
|||||||
expect($sending_queue_worker->timer)->equals($timer);
|
expect($sending_queue_worker->timer)->equals($timer);
|
||||||
}
|
}
|
||||||
|
|
||||||
function testItEnforcesExecutionLimitBeforeStart() {
|
function testItEnforcesExecutionLimitsBeforeQueueProcessing() {
|
||||||
$timer = microtime(true) - CronHelper::DAEMON_EXECUTION_LIMIT;
|
$sending_queue_worker = Stub::make(
|
||||||
|
new SendingQueueWorker(),
|
||||||
|
array(
|
||||||
|
'processQueue' => Stub::never(),
|
||||||
|
'enforceSendingAndExecutionLimits' => Stub::exactly(1, function() {
|
||||||
|
throw new \Exception();
|
||||||
|
})
|
||||||
|
), $this);
|
||||||
|
$sending_queue_worker->__construct();
|
||||||
try {
|
try {
|
||||||
$sending_queue_worker = new SendingQueueWorker($timer);
|
$sending_queue_worker->process();
|
||||||
self::fail('Maximum execution time limit exception was not thrown.');
|
self::fail('Execution limits function was not called.');
|
||||||
} catch(\Exception $e) {
|
} catch(\Exception $e) {
|
||||||
expect($e->getMessage())->equals('Maximum execution time has been reached.');
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
function testItEnforcesExecutionLimitDuringProcessing() {
|
function testItEnforcesExecutionLimitsAfterSendingWhenQueueStatusIsNotSetToComplete() {
|
||||||
try {
|
$sending_queue_worker = Stub::make(
|
||||||
$sending_queue_worker = $this->sending_queue_worker;
|
new SendingQueueWorker(),
|
||||||
$sending_queue_worker->timer = microtime(true) - CronHelper::DAEMON_EXECUTION_LIMIT;
|
array(
|
||||||
$sending_queue_worker->process();
|
'enforceSendingAndExecutionLimits' => Stub::exactly(1, function() { })
|
||||||
self::fail('Maximum execution time limit exception was not thrown.');
|
), $this);
|
||||||
} catch(\Exception $e) {
|
$sending_queue_worker->__construct(
|
||||||
expect($e->getMessage())->equals('Maximum execution time has been reached.');
|
$timer = false,
|
||||||
}
|
Stub::make(
|
||||||
|
new MailerTask(),
|
||||||
|
array(
|
||||||
|
'send' => function() { }
|
||||||
|
)
|
||||||
|
)
|
||||||
|
);
|
||||||
|
$sending_queue_worker->sendNewsletters(
|
||||||
|
$this->queue,
|
||||||
|
$prepared_subscribers = array(),
|
||||||
|
$prepared_newsletters = false,
|
||||||
|
$prepared_subscribers = false,
|
||||||
|
$statistics[] = array(
|
||||||
|
'newsletter_id' => 1,
|
||||||
|
'subscriber_id' => 1,
|
||||||
|
'queue_id' => $this->queue->id
|
||||||
|
)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
function testItDoesNotEnforceExecutionLimitsAfterSendingWhenQueueStatusIsSetToComplete() {
|
||||||
|
// when sending is done and there are no more subscribers to process, continue
|
||||||
|
// without enforcing execution limits. this allows the newsletter to be marked as sent
|
||||||
|
// in the process() method and after that execution limits will be enforced
|
||||||
|
$queue = $this->queue;
|
||||||
|
$queue->status = SendingQueue::STATUS_COMPLETED;
|
||||||
|
$sending_queue_worker = Stub::make(
|
||||||
|
new SendingQueueWorker(),
|
||||||
|
array(
|
||||||
|
'enforceSendingAndExecutionLimits' => Stub::never()
|
||||||
|
), $this);
|
||||||
|
$sending_queue_worker->__construct(
|
||||||
|
$timer = false,
|
||||||
|
Stub::make(
|
||||||
|
new MailerTask(),
|
||||||
|
array(
|
||||||
|
'send' => function() { }
|
||||||
|
)
|
||||||
|
)
|
||||||
|
);
|
||||||
|
$sending_queue_worker->sendNewsletters(
|
||||||
|
$queue,
|
||||||
|
$prepared_subscribers = array(),
|
||||||
|
$prepared_newsletters = array(),
|
||||||
|
$prepared_subscribers = array(),
|
||||||
|
$statistics[] = array(
|
||||||
|
'newsletter_id' => 1,
|
||||||
|
'subscriber_id' => 1,
|
||||||
|
'queue_id' => $queue->id
|
||||||
|
)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
function testItEnforcesExecutionLimitsAfterQueueProcessing() {
|
||||||
|
$sending_queue_worker = Stub::make(
|
||||||
|
new SendingQueueWorker(),
|
||||||
|
array(
|
||||||
|
'processQueue' => function() {
|
||||||
|
// this function returns a queue object
|
||||||
|
return (object)array('status' => null);
|
||||||
|
},
|
||||||
|
'enforceSendingAndExecutionLimits' => Stub::exactly(2, function() { })
|
||||||
|
), $this);
|
||||||
|
$sending_queue_worker->__construct();
|
||||||
|
$sending_queue_worker->process();
|
||||||
}
|
}
|
||||||
|
|
||||||
function testItDeletesQueueWhenNewsletterIsNotFound() {
|
function testItDeletesQueueWhenNewsletterIsNotFound() {
|
||||||
@ -92,6 +162,21 @@ class SendingQueueTest extends MailPoetTest {
|
|||||||
expect($queue)->false(false);
|
expect($queue)->false(false);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
function testItDeletesQueueWhenNewsletterIsTrashed() {
|
||||||
|
// queue exists
|
||||||
|
$queue = SendingQueue::findOne($this->queue->id);
|
||||||
|
expect($queue)->notEquals(false);
|
||||||
|
|
||||||
|
// trash newsletter
|
||||||
|
Newsletter::findOne($this->newsletter->id)
|
||||||
|
->trash();
|
||||||
|
|
||||||
|
// queue no longer exists
|
||||||
|
$this->sending_queue_worker->process();
|
||||||
|
$queue = SendingQueue::findOne($this->queue->id);
|
||||||
|
expect($queue)->false(false);
|
||||||
|
}
|
||||||
|
|
||||||
function testItCanProcessSubscribersOneByOne() {
|
function testItCanProcessSubscribersOneByOne() {
|
||||||
$sending_queue_worker = new SendingQueueWorker(
|
$sending_queue_worker = new SendingQueueWorker(
|
||||||
$timer = false,
|
$timer = false,
|
||||||
|
Reference in New Issue
Block a user