Add processing of soft errors

When processing soft error we only save error to particular subscriber and continue sending.

[MAILPOET-1154]
This commit is contained in:
Rostislav Wolny
2018-09-11 18:33:54 +02:00
parent ea4c5f46fb
commit 624c6d900e
9 changed files with 120 additions and 9 deletions

View File

@ -131,6 +131,8 @@ class Migrator {
'task_id int(11) unsigned NOT NULL,', 'task_id int(11) unsigned NOT NULL,',
'subscriber_id int(11) unsigned NOT NULL,', 'subscriber_id int(11) unsigned NOT NULL,',
'processed int(1) NOT NULL,', 'processed int(1) NOT NULL,',
'failed int(1) NOT NULL,',
'error text NULL,',
'created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,', 'created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,',
'PRIMARY KEY (task_id, subscriber_id),', 'PRIMARY KEY (task_id, subscriber_id),',
'KEY subscriber_id (subscriber_id)' 'KEY subscriber_id (subscriber_id)'

View File

@ -3,13 +3,34 @@ namespace MailPoet\Cron\Workers\SendingQueue;
use MailPoet\Mailer\MailerError; use MailPoet\Mailer\MailerError;
use MailPoet\Mailer\MailerLog; use MailPoet\Mailer\MailerLog;
use MailPoet\Tasks\Sending as SendingTask;
class SendingErrorHandler { class SendingErrorHandler {
function processError(MailerError $error) { function processError(
MailerError $error,
SendingTask $sending_task,
array $prepared_subscribers_ids,
array $prepared_subscribers
) {
if($error->getLevel() === MailerError::LEVEL_HARD) {
return $this->processHardError($error);
}
$this->processSoftError($error, $sending_task, $prepared_subscribers_ids, $prepared_subscribers);
}
private function processHardError(MailerError $error) {
if($error->getRetryInterval() !== null) { if($error->getRetryInterval() !== null) {
MailerLog::processNonBlockingError($error->getOperation(), $error->getMessageWithFailedSubscribers(), $error->getRetryInterval()); MailerLog::processNonBlockingError($error->getOperation(), $error->getMessageWithFailedSubscribers(), $error->getRetryInterval());
} else { } else {
MailerLog::processError($error->getOperation(), $error->getMessageWithFailedSubscribers()); MailerLog::processError($error->getOperation(), $error->getMessageWithFailedSubscribers());
} }
} }
private function processSoftError(MailerError $error, SendingTask $sending_task, $prepared_subscribers_ids, $prepared_subscribers) {
foreach($error->getSubscriberErrors() as $subscriber_error) {
$subscriber_id_index = array_search($subscriber_error->getEmail(), $prepared_subscribers);
$message = $subscriber_error->getMessage() ?: $error->getMessage();
$sending_task->saveSubscriberError($prepared_subscribers_ids[$subscriber_id_index], $message);
}
}
} }

View File

@ -167,7 +167,13 @@ class SendingQueue {
$prepared_subscriber, $prepared_subscriber,
$extra_params $extra_params
); );
return $this->processSendResult($sending_task, $send_result, [$prepared_subscriber_id], [$statistics]); return $this->processSendResult(
$sending_task,
$send_result,
[$prepared_subscriber],
[$prepared_subscriber_id],
[$statistics]
);
} }
function sendNewsletters( function sendNewsletters(
@ -180,19 +186,27 @@ class SendingQueue {
$prepared_subscribers, $prepared_subscribers,
$extra_params $extra_params
); );
return $this->processSendResult($sending_task, $send_result, $prepared_subscribers_ids, $statistics); return $this->processSendResult(
$sending_task,
$send_result,
$prepared_subscribers,
$prepared_subscribers_ids,
$statistics
);
} }
private function processSendResult( private function processSendResult(
SendingTask $sending_task, SendingTask $sending_task,
$send_result, $send_result,
array $prepared_subscribers_ids, array $statistics array $prepared_subscribers,
array $prepared_subscribers_ids,
array $statistics
) { ) {
// log error message and schedule retry/pause sending // log error message and schedule retry/pause sending
if($send_result['response'] === false) { if($send_result['response'] === false) {
$error = $send_result['error']; $error = $send_result['error'];
assert($error instanceof MailerError); assert($error instanceof MailerError);
$this->error_handler->processError($error); $this->error_handler->processError($error, $sending_task, $prepared_subscribers_ids, $prepared_subscribers);
} }
// update processed/to process list // update processed/to process list
if(!$sending_task->updateProcessedSubscribers($prepared_subscribers_ids)) { if(!$sending_task->updateProcessedSubscribers($prepared_subscribers_ids)) {

View File

@ -67,7 +67,7 @@ class MailerError {
} }
/** /**
* @return array * @return SubscriberError[]
*/ */
function getSubscriberErrors() { function getSubscriberErrors() {
return $this->subscribers_errors; return $this->subscribers_errors;

View File

@ -7,6 +7,9 @@ class ScheduledTaskSubscriber extends Model {
const STATUS_UNPROCESSED = 0; const STATUS_UNPROCESSED = 0;
const STATUS_PROCESSED = 1; const STATUS_PROCESSED = 1;
const FAIL_STATUS_OK = 0;
const FAIL_STATUS_FAILED = 1;
public static $_table = MP_SCHEDULED_TASK_SUBSCRIBERS_TABLE; public static $_table = MP_SCHEDULED_TASK_SUBSCRIBERS_TABLE;
public static $_id_column = array('task_id', 'subscriber_id'); public static $_id_column = array('task_id', 'subscriber_id');
@ -19,6 +22,7 @@ class ScheduledTaskSubscriber extends Model {
return; return;
} }
$data['processed'] = !empty($data['processed']) ? self::STATUS_PROCESSED : self::STATUS_UNPROCESSED; $data['processed'] = !empty($data['processed']) ? self::STATUS_PROCESSED : self::STATUS_UNPROCESSED;
$data['failed'] = !empty($data['failed']) ? self::FAIL_STATUS_FAILED : self::FAIL_STATUS_OK;
return parent::_createOrUpdate($data, array( return parent::_createOrUpdate($data, array(
'subscriber_id' => $data['subscriber_id'], 'subscriber_id' => $data['subscriber_id'],
'task_id' => $data['task_id'] 'task_id' => $data['task_id']

View File

@ -166,6 +166,11 @@ class Sending {
return $this->updateCount()->getErrors() === false; return $this->updateCount()->getErrors() === false;
} }
public function saveSubscriberError($subcriber_id, $error_message) {
$this->task_subscribers->saveSubscriberError($subcriber_id, $error_message);
return $this->updateCount()->getErrors() === false;
}
function updateCount() { function updateCount() {
$this->queue->count_processed = ScheduledTaskSubscriber::getProcessedCount($this->task->id); $this->queue->count_processed = ScheduledTaskSubscriber::getProcessedCount($this->task->id);
$this->queue->count_to_process = ScheduledTaskSubscriber::getUnprocessedCount($this->task->id); $this->queue->count_to_process = ScheduledTaskSubscriber::getUnprocessedCount($this->task->id);

View File

@ -53,6 +53,15 @@ class Subscribers {
$this->checkCompleted(); $this->checkCompleted();
} }
function saveSubscriberError($subcriber_id, $error_message) {
$this->getSubscribers()
->where('subscriber_id', $subcriber_id)
->findResultSet()
->set('failed', ScheduledTaskSubscriber::FAIL_STATUS_FAILED)
->set('error', $error_message)
->save();
}
private function checkCompleted($count = null) { private function checkCompleted($count = null) {
if(!$count && !ScheduledTaskSubscriber::getUnprocessedCount($this->task->id)) { if(!$count && !ScheduledTaskSubscriber::getUnprocessedCount($this->task->id)) {
$this->task->complete(); $this->task->complete();

View File

@ -0,0 +1,56 @@
<?php
namespace MailPoet\Test\Cron\Workers;
use Codeception\Stub;
use Codeception\Stub\Expected;
use MailPoet\Cron\Workers\SendingQueue\SendingErrorHandler;
use MailPoet\Mailer\MailerError;
use MailPoet\Mailer\SubscriberError;
use MailPoet\Tasks\Sending as SendingTask;
class SendingErrorHandlerTest extends \MailPoetTest {
/** @var SendingErrorHandler */
private $error_handler;
function _before() {
$this->error_handler = new SendingErrorHandler();
}
function testItShouldProcessSoftErrorCorrectly() {
$subscribers = [
'john@doe.com',
'john@rambo.com',
];
$subscriber_ids = [1, 2];
$subscriber_errors = [
new SubscriberError('john@doe.com', 'Subscriber Message'),
new SubscriberError('john@rambo.com', null),
];
$error = new MailerError(
MailerError::OPERATION_SEND,
MailerError::LEVEL_SOFT,
'Error Message',
null, $subscriber_errors
);
$sending_task = Stub::make(
SendingTask::class,
[
'saveSubscriberError' => Expected::exactly(
2,
function($id, $message) {
if($id === 2) {
expect($message)->equals('Error Message');
} else {
expect($message)->equals('Subscriber Message');
}
}
),
],
$this
);
$this->error_handler->processError($error, $sending_task, $subscriber_ids, $subscribers);
}
}

View File

@ -147,8 +147,8 @@ class SendingQueueTest extends \MailPoetTest {
$sending_queue_worker->sendNewsletters( $sending_queue_worker->sendNewsletters(
$this->queue, $this->queue,
$prepared_subscribers = array(), $prepared_subscribers = array(),
$prepared_newsletters = false, $prepared_newsletters = [],
$prepared_subscribers = false, $prepared_subscribers = [],
$statistics[] = array( $statistics[] = array(
'newsletter_id' => 1, 'newsletter_id' => 1,
'subscriber_id' => 1, 'subscriber_id' => 1,
@ -608,7 +608,7 @@ class SendingQueueTest extends \MailPoetTest {
$prepared_subscribers = [], $prepared_subscribers = [],
$prepared_newsletters = [], $prepared_newsletters = [],
$prepared_subscribers = [], $prepared_subscribers = [],
$statistics = false $statistics = []
); );
$this->fail('Paused sending exception was not thrown.'); $this->fail('Paused sending exception was not thrown.');
} catch(\Exception $e) { } catch(\Exception $e) {