Rotate sending queues

This should prevent sending queues to be stuck when a newsletter cannot be send.

[MAILPOET-1391]
This commit is contained in:
Pavel Dohnal
2018-06-05 11:05:59 +01:00
parent b8ce6ff88c
commit 2c290c6e4c
4 changed files with 42 additions and 2 deletions

View File

@@ -34,6 +34,7 @@ class SendingQueue {
$this->enforceSendingAndExecutionLimits(); $this->enforceSendingAndExecutionLimits();
foreach(self::getRunningQueues() as $queue) { foreach(self::getRunningQueues() as $queue) {
if(!$queue instanceof SendingTask) continue; if(!$queue instanceof SendingTask) continue;
ScheduledTaskModel::touchAllByIds(array($queue->task_id));
$newsletter = $this->newsletter_task->getNewsletterFromQueue($queue); $newsletter = $this->newsletter_task->getNewsletterFromQueue($queue);
if(!$newsletter) { if(!$newsletter) {

View File

@@ -239,7 +239,7 @@ class Sending {
static function getRunningQueues($amount = self::RESULT_BATCH_SIZE) { static function getRunningQueues($amount = self::RESULT_BATCH_SIZE) {
$tasks = ScheduledTask::orderByAsc('priority') $tasks = ScheduledTask::orderByAsc('priority')
->orderByAsc('created_at') ->orderByAsc('updated_at')
->whereNull('deleted_at') ->whereNull('deleted_at')
->whereNull('status') ->whereNull('status')
->where('type', 'sending') ->where('type', 'sending')

View File

@@ -385,6 +385,26 @@ class SendingQueueTest extends \MailPoetTest {
expect($statistics)->notEquals(false); expect($statistics)->notEquals(false);
} }
function testItUpdatesUpdateTime() {
$originalUpdated = Carbon::createFromTimestamp(current_time('timestamp'))->subHours(5)->toDateTimeString();
$this->queue->scheduled_at = Carbon::createFromTimestamp(current_time('timestamp'));
$this->queue->updated_at = $originalUpdated;
$this->queue->save();
$this->newsletter->type = Newsletter::TYPE_WELCOME;
$this->newsletter_segment->delete();
$sending_queue_worker = new SendingQueueWorker(
$timer = false,
Stub::makeEmpty(new MailerTask(), array(), $this)
);
$sending_queue_worker->process();
$newQueue = ScheduledTask::findOne($this->queue->task_id);
expect($newQueue->updated_at)->notEquals($originalUpdated);
}
function testItCanProcessWelcomeNewsletters() { function testItCanProcessWelcomeNewsletters() {
$this->newsletter->type = Newsletter::TYPE_WELCOME; $this->newsletter->type = Newsletter::TYPE_WELCOME;
$this->newsletter_segment->delete(); $this->newsletter_segment->delete();
@@ -631,4 +651,4 @@ class SendingQueueTest extends \MailPoetTest {
\ORM::raw_execute('TRUNCATE ' . NewsletterSegment::$_table); \ORM::raw_execute('TRUNCATE ' . NewsletterSegment::$_table);
\ORM::raw_execute('TRUNCATE ' . StatisticsNewsletters::$_table); \ORM::raw_execute('TRUNCATE ' . StatisticsNewsletters::$_table);
} }
} }

View File

@@ -195,6 +195,25 @@ class SendingTest extends \MailPoetTest {
expect($queues[2]->task_id)->equals($sending2->id()); expect($queues[2]->task_id)->equals($sending2->id());
} }
function testItGetsBatchOfScheduledQueuesSortedByUpdatedTime() {
$this->_after();
$sending1 = $this->createNewSendingTask(['status' => null]);
$sending1->updated_at = '2017-05-04 14:00:00';
$sending1->save();
$sending2 = $this->createNewSendingTask(['status' => null]);
$sending2->updated_at = '2017-05-04 16:00:00';
$sending2->save();
$sending3 = $this->createNewSendingTask(['status' => null]);
$sending3->updated_at = '2017-05-04 15:00:00';
$sending3->save();
$queues = SendingTask::getRunningQueues(3);
expect($queues[0]->task_id)->equals($sending1->id());
expect($queues[1]->task_id)->equals($sending3->id());
expect($queues[2]->task_id)->equals($sending2->id());
}
function createNewNewsletter() { function createNewNewsletter() {
$newsletter = Newsletter::create(); $newsletter = Newsletter::create();
$newsletter->type = Newsletter::TYPE_STANDARD; $newsletter->type = Newsletter::TYPE_STANDARD;