diff --git a/lib/Config/Migrator.php b/lib/Config/Migrator.php index 9e9ad103e3..4dacf1a795 100644 --- a/lib/Config/Migrator.php +++ b/lib/Config/Migrator.php @@ -106,6 +106,7 @@ class Migrator { function sendingQueues() { $attributes = array( 'id mediumint(9) NOT NULL AUTO_INCREMENT,', + 'type varchar(12) NULL DEFAULT NULL,', 'newsletter_id mediumint(9) NOT NULL,', 'newsletter_rendered_body longtext,', 'newsletter_rendered_subject varchar(250) NULL DEFAULT NULL,', diff --git a/lib/Cron/Daemon.php b/lib/Cron/Daemon.php index d8e13389cb..6b11d2ddff 100644 --- a/lib/Cron/Daemon.php +++ b/lib/Cron/Daemon.php @@ -2,6 +2,7 @@ namespace MailPoet\Cron; use MailPoet\Cron\Workers\Scheduler as SchedulerWorker; use MailPoet\Cron\Workers\SendingQueue\SendingQueue as SendingQueueWorker; +use MailPoet\Cron\Workers\Bounce as BounceWorker; if(!defined('ABSPATH')) exit; require_once(ABSPATH . 'wp-includes/pluggable.php'); @@ -43,6 +44,7 @@ class Daemon { try { $this->executeScheduleWorker(); $this->executeQueueWorker(); + $this->executeBounceWorker(); } catch(\Exception $e) { // continue processing, no need to handle errors } @@ -74,6 +76,11 @@ class Daemon { return $queue->process(); } + function executeBounceWorker() { + $bounce = new BounceWorker($this->timer); + return $bounce->process(); + } + function callSelf() { CronHelper::accessDaemon($this->token, self::REQUEST_TIMEOUT); return $this->terminateRequest(); diff --git a/lib/Cron/Triggers/WordPress.php b/lib/Cron/Triggers/WordPress.php index 5885cfa9fc..b8c5174753 100644 --- a/lib/Cron/Triggers/WordPress.php +++ b/lib/Cron/Triggers/WordPress.php @@ -4,6 +4,7 @@ namespace MailPoet\Cron\Triggers; use MailPoet\Cron\CronHelper; use MailPoet\Cron\Workers\Scheduler as SchedulerWorker; use MailPoet\Cron\Workers\SendingQueue\SendingQueue as SendingQueueWorker; +use MailPoet\Cron\Workers\Bounce as BounceWorker; use MailPoet\Mailer\MailerLog; if(!defined('ABSPATH')) exit; @@ -19,7 +20,11 @@ class WordPress { $scheduled_queues = SchedulerWorker::getScheduledQueues(); $running_queues = SendingQueueWorker::getRunningQueues(); $sending_limit_reached = MailerLog::isSendingLimitReached(); - return (($scheduled_queues || $running_queues) && !$sending_limit_reached); + $bounce_sync_available = BounceWorker::checkBounceSyncAvailable(); + $bounce_due_queues = BounceWorker::getAllDueQueues(); + $bounce_future_queues = BounceWorker::getFutureQueues(); + return (($scheduled_queues || $running_queues) && !$sending_limit_reached) + || ($bounce_sync_available && ($bounce_due_queues || !$bounce_future_queues)); } static function cleanup() { diff --git a/lib/Cron/Workers/Bounce.php b/lib/Cron/Workers/Bounce.php new file mode 100644 index 0000000000..cfa30dff48 --- /dev/null +++ b/lib/Cron/Workers/Bounce.php @@ -0,0 +1,197 @@ +timer = ($timer) ? $timer : microtime(true); + // abort if execution limit is reached + CronHelper::enforceExecutionLimit($this->timer); + } + + static function checkBounceSyncAvailable() { + $mailer_config = Mailer::getMailerConfig(); + return !empty($mailer_config['method']) + && $mailer_config['method'] === Mailer::METHOD_MAILPOET; + } + + function initApi() { + if(!$this->api) { + $mailer_config = Mailer::getMailerConfig(); + $this->api = new Bounce\API($mailer_config['mailpoet_api_key']); + } + } + + function process() { + if(!self::checkBounceSyncAvailable()) { + return false; + } + + $this->initApi(); + + $scheduled_queues = self::getScheduledQueues(); + $running_queues = self::getRunningQueues(); + + if(!$scheduled_queues && !$running_queues) { + self::scheduleBounceSync(); + return false; + } + + foreach($scheduled_queues as $i => $queue) { + $this->prepareBounceQueue($queue); + } + foreach($running_queues as $i => $queue) { + $this->processBounceQueue($queue); + } + + return true; + } + + static function scheduleBounceSync() { + $already_scheduled = SendingQueue::where('type', 'bounce') + ->whereNull('deleted_at') + ->where('status', SendingQueue::STATUS_SCHEDULED) + ->findMany(); + if($already_scheduled) { + return false; + } + $queue = SendingQueue::create(); + $queue->type = 'bounce'; + $queue->status = SendingQueue::STATUS_SCHEDULED; + $queue->priority = SendingQueue::PRIORITY_LOW; + $queue->scheduled_at = self::getNextRunDate(); + $queue->newsletter_id = 0; + $queue->save(); + return $queue; + } + + function prepareBounceQueue(SendingQueue $queue) { + $subscribers = Subscriber::select('id') + ->whereNull('deleted_at') + ->whereIn('status', array( + Subscriber::STATUS_SUBSCRIBED, + Subscriber::STATUS_UNCONFIRMED + )) + ->findArray(); + $subscribers = Helpers::arrayColumn($subscribers, 'id'); + + if(empty($subscribers)) { + $queue->delete(); + return false; + } + + // update current queue + $queue->subscribers = serialize( + array( + 'to_process' => $subscribers + ) + ); + $queue->count_total = $queue->count_to_process = count($subscribers); + $queue->status = null; + $queue->save(); + + // abort if execution limit is reached + CronHelper::enforceExecutionLimit($this->timer); + + return true; + } + + function processBounceQueue(SendingQueue $queue) { + $queue->subscribers = $queue->getSubscribers(); + if(empty($queue->subscribers['to_process'])) { + $queue->delete(); + return false; + } + + $subscriber_batches = array_chunk( + $queue->subscribers['to_process'], + self::BATCH_SIZE + ); + + foreach($subscriber_batches as $subscribers_to_process_ids) { + // abort if execution limit is reached + CronHelper::enforceExecutionLimit($this->timer); + + $subscriber_emails = Subscriber::select('email') + ->whereIn('id', $subscribers_to_process_ids) + ->whereNull('deleted_at') + ->findArray(); + $subscriber_emails = Helpers::arrayColumn($subscriber_emails, 'email'); + + $this->processEmails($subscriber_emails); + + $queue->updateProcessedSubscribers($subscribers_to_process_ids); + } + + return true; + } + + function processEmails(array $subscriber_emails) { + $checked_emails = $this->api->check($subscriber_emails); + $this->processApiResponse((array)$checked_emails); + } + + function processApiResponse(array $checked_emails) { + foreach($checked_emails as $email) { + if(!isset($email['address'], $email['bounce'])) { + continue; + } + if($email['bounce'] === self::BOUNCED_HARD) { + $subscriber = Subscriber::findOne($email['address']); + $subscriber->status = Subscriber::STATUS_BOUNCED; + $subscriber->save(); + } + } + } + + static function getNextRunDate() { + $date = Carbon::createFromTimestamp(current_time('timestamp')); + // Random day of the next week + $date->setISODate($date->format('o'), $date->format('W') + 1, mt_rand(1, 7)); + $date->startOfDay(); + return $date; + } + + static function getScheduledQueues($future = false) { + $dateWhere = ($future) ? 'whereGt' : 'whereLte'; + return SendingQueue::where('type', 'bounce') + ->$dateWhere('scheduled_at', Carbon::createFromTimestamp(current_time('timestamp'))) + ->whereNull('deleted_at') + ->where('status', SendingQueue::STATUS_SCHEDULED) + ->findMany(); + } + + static function getRunningQueues() { + return SendingQueue::where('type', 'bounce') + ->whereLte('scheduled_at', Carbon::createFromTimestamp(current_time('timestamp'))) + ->whereNull('deleted_at') + ->whereNull('status') + ->findMany(); + } + + static function getAllDueQueues() { + $scheduled_queues = self::getScheduledQueues(); + $running_queues = self::getRunningQueues(); + return array_merge((array)$scheduled_queues, (array)$running_queues); + } + + static function getFutureQueues() { + return self::getScheduledQueues(true); + } +} diff --git a/lib/Cron/Workers/Bounce/API.php b/lib/Cron/Workers/Bounce/API.php new file mode 100644 index 0000000000..c36a8fe5d3 --- /dev/null +++ b/lib/Cron/Workers/Bounce/API.php @@ -0,0 +1,41 @@ +api_key = $api_key; + } + + function check(array $emails) { + $result = wp_remote_post( + $this->url, + $this->request($emails) + ); + if(wp_remote_retrieve_response_code($result) === 201) { + return json_decode(wp_remote_retrieve_body($result), true); + } + return false; + } + + private function auth() { + return 'Basic ' . base64_encode('api:' . $this->api_key); + } + + private function request($body) { + return array( + 'timeout' => 10, + 'httpversion' => '1.0', + 'method' => 'POST', + 'headers' => array( + 'Content-Type' => 'application/json', + 'Authorization' => $this->auth() + ), + 'body' => json_encode($body) + ); + } +} diff --git a/lib/Cron/Workers/Scheduler.php b/lib/Cron/Workers/Scheduler.php index bb84a124b8..68fb8c0f33 100644 --- a/lib/Cron/Workers/Scheduler.php +++ b/lib/Cron/Workers/Scheduler.php @@ -187,6 +187,7 @@ class Scheduler { static function getScheduledQueues() { return SendingQueue::where('status', 'scheduled') ->whereLte('scheduled_at', Carbon::createFromTimestamp(current_time('timestamp'))) + ->whereNull('type') ->findMany(); } } \ No newline at end of file diff --git a/lib/Cron/Workers/SendingQueue/SendingQueue.php b/lib/Cron/Workers/SendingQueue/SendingQueue.php index a3fbfa9cb0..19aaba7440 100644 --- a/lib/Cron/Workers/SendingQueue/SendingQueue.php +++ b/lib/Cron/Workers/SendingQueue/SendingQueue.php @@ -167,6 +167,7 @@ class SendingQueue { return SendingQueueModel::orderByAsc('priority') ->whereNull('deleted_at') ->whereNull('status') + ->whereNull('type') ->findMany(); } } \ No newline at end of file diff --git a/tests/unit/Cron/Workers/BounceTest.php b/tests/unit/Cron/Workers/BounceTest.php new file mode 100644 index 0000000000..1aab174fde --- /dev/null +++ b/tests/unit/Cron/Workers/BounceTest.php @@ -0,0 +1,208 @@ +emails = array( + 'soft_bounce@example.com', + 'hard_bounce@example.com', + 'good_address@example.com' + ); + + foreach ($this->emails as $email) { + Subscriber::createOrUpdate(array( + 'status' => Subscriber::STATUS_SUBSCRIBED, + 'email' => $email + )); + } + + $this->bounce = new Bounce(microtime(true)); + + $api = + + $this->bounce->api = new MailPoet\Cron\Workers\Bounce\MockAPI('key'); + } + + function testItConstructs() { + expect($this->bounce->timer)->notEmpty(); + } + + function testItDefinesConstants() { + expect(Bounce::BATCH_SIZE)->equals(100); + } + + function testItChecksIfCurrentSendingMethodIsMailpoet() { + expect(Bounce::checkBounceSyncAvailable())->false(); + $this->setMailPoetSendingMethod(); + expect(Bounce::checkBounceSyncAvailable())->true(); + } + + function testItThrowsExceptionWhenExecutionLimitIsReached() { + try { + $bounce = new Bounce(microtime(true) - CronHelper::DAEMON_EXECUTION_LIMIT); + self::fail('Maximum execution time limit exception was not thrown.'); + } catch(\Exception $e) { + expect($e->getMessage())->equals('Maximum execution time has been reached.'); + } + } + + function testItSchedulesBounceSync() { + expect(SendingQueue::where('type', 'bounce')->findMany())->isEmpty(); + Bounce::scheduleBounceSync(); + expect(SendingQueue::where('type', 'bounce')->findMany())->notEmpty(); + } + + function testItDoesNotScheduleBounceSyncTwice() { + expect(count(SendingQueue::where('type', 'bounce')->findMany()))->equals(0); + Bounce::scheduleBounceSync(); + expect(count(SendingQueue::where('type', 'bounce')->findMany()))->equals(1); + Bounce::scheduleBounceSync(); + expect(count(SendingQueue::where('type', 'bounce')->findMany()))->equals(1); + } + + function testItCanGetScheduledQueues() { + expect(Bounce::getScheduledQueues())->isEmpty(); + $this->createScheduledQueue(); + expect(Bounce::getScheduledQueues())->notEmpty(); + } + + function testItCanGetRunningQueues() { + expect(Bounce::getRunningQueues())->isEmpty(); + $this->createRunningQueue(); + expect(Bounce::getRunningQueues())->notEmpty(); + } + + function testItCanGetAllDueQueues() { + expect(Bounce::getAllDueQueues())->isEmpty(); + + // scheduled for now + $this->createScheduledQueue(); + + // running + $this->createRunningQueue(); + + // scheduled in the future (should not be retrieved) + $queue = $this->createScheduledQueue(); + $queue->scheduled_at = Carbon::createFromTimestamp(current_time('timestamp'))->addDays(7); + $queue->save(); + + // completed (should not be retrieved) + $queue = $this->createRunningQueue(); + $queue->status = SendingQueue::STATUS_COMPLETED; + $queue->save(); + + expect(count(Bounce::getAllDueQueues()))->equals(2); + } + + function testItCanGetFutureQueues() { + expect(Bounce::getFutureQueues())->isEmpty(); + $queue = $this->createScheduledQueue(); + $queue->scheduled_at = Carbon::createFromTimestamp(current_time('timestamp'))->addDays(7); + $queue->save(); + expect(count(Bounce::getFutureQueues()))->notEmpty(); + } + + function testItFailsToProcessWithoutMailPoetMethodSetUp() { + expect($this->bounce->process())->false(); + } + + function testItFailsToProcessWithoutQueues() { + $this->setMailPoetSendingMethod(); + expect($this->bounce->process())->false(); + } + + function testItProcesses() { + $this->setMailPoetSendingMethod(); + $this->createScheduledQueue(); + $this->createRunningQueue(); + expect($this->bounce->process())->true(); + } + + function testItPreparesBounceQueue() { + $queue = $this->createScheduledQueue(); + expect(empty($queue->subscribers['to_process']))->true(); + $this->bounce->prepareBounceQueue($queue); + expect($queue->status)->null(); + expect(!empty($queue->subscribers['to_process']))->true(); + } + + function testItProcessesBounceQueue() { + $queue = $this->createRunningQueue(); + $this->bounce->prepareBounceQueue($queue); + expect(!empty($queue->subscribers['to_process']))->true(); + $this->bounce->processBounceQueue($queue); + expect(!empty($queue->subscribers['processed']))->true(); + } + + function testItSetsSubscriberStatusAsBounced() { + $emails = Subscriber::select('email')->findArray(); + $emails = Helpers::arrayColumn($emails, 'email'); + + $this->bounce->processEmails($emails); + + $subscribers = Subscriber::findMany(); + + expect($subscribers[0]->status)->equals(Subscriber::STATUS_SUBSCRIBED); + expect($subscribers[1]->status)->equals(Subscriber::STATUS_BOUNCED); + expect($subscribers[2]->status)->equals(Subscriber::STATUS_SUBSCRIBED); + } + + function testItCalculatesNextRunDateWithinNextWeekBoundaries() { + $current_date = Carbon::createFromTimestamp(current_time('timestamp')); + $next_run_date = Bounce::getNextRunDate(); + $difference = $next_run_date->diffInDays($current_date); + // Subtract days left in the current week + $difference -= (Carbon::DAYS_PER_WEEK - $current_date->format('N')); + expect($difference)->lessOrEquals(7); + expect($difference)->greaterOrEquals(0); + } + + private function setMailPoetSendingMethod() { + Setting::setValue( + Mailer::MAILER_CONFIG_SETTING_NAME, + array( + 'method' => 'MailPoet', + 'mailpoet_api_key' => 'some_key', + ) + ); + } + + private function createScheduledQueue() { + $queue = SendingQueue::create(); + $queue->type = 'bounce'; + $queue->status = SendingQueue::STATUS_SCHEDULED; + $queue->scheduled_at = Carbon::createFromTimestamp(current_time('timestamp')); + $queue->newsletter_id = 0; + $queue->save(); + return $queue; + } + + private function createRunningQueue() { + $queue = SendingQueue::create(); + $queue->type = 'bounce'; + $queue->status = null; + $queue->scheduled_at = Carbon::createFromTimestamp(current_time('timestamp')); + $queue->newsletter_id = 0; + $queue->save(); + return $queue; + } + + function _after() { + ORM::raw_execute('TRUNCATE ' . Setting::$_table); + ORM::raw_execute('TRUNCATE ' . SendingQueue::$_table); + ORM::raw_execute('TRUNCATE ' . Subscriber::$_table); + } +} \ No newline at end of file diff --git a/tests/unit/Cron/Workers/BounceTestMockAPI.php b/tests/unit/Cron/Workers/BounceTestMockAPI.php new file mode 100644 index 0000000000..1f1a6ebbbd --- /dev/null +++ b/tests/unit/Cron/Workers/BounceTestMockAPI.php @@ -0,0 +1,18 @@ + $email, + 'bounce' => preg_match('/(hard|soft)/', $email, $m) ? $m[1] : null, + ); + }, + $emails + ); + } +}