From 84dfa88a1a4da9bed9d9d475fff755adbc2ca475 Mon Sep 17 00:00:00 2001 From: stoletniy Date: Tue, 13 Feb 2018 20:36:01 +0300 Subject: [PATCH] Don't load subscriber IDs in memory when preparing tasks from static segments [MAILPOET-903] --- lib/API/JSON/v1/SendingQueue.php | 6 +- lib/Cron/Workers/Scheduler.php | 10 +- lib/Segments/SubscribersFinder.php | 92 ++++++++++++++++++- tests/unit/Cron/Workers/SchedulerTest.php | 4 +- tests/unit/Segments/SubscribersFinderTest.php | 57 +++++++++++- 5 files changed, 152 insertions(+), 17 deletions(-) diff --git a/lib/API/JSON/v1/SendingQueue.php b/lib/API/JSON/v1/SendingQueue.php index 2b425a4c4a..4610fd70b7 100644 --- a/lib/API/JSON/v1/SendingQueue.php +++ b/lib/API/JSON/v1/SendingQueue.php @@ -78,16 +78,14 @@ class SendingQueue extends APIEndpoint { } else { $segments = $newsletter->segments()->findArray(); $finder = new SubscribersFinder(); - $subscribers = $finder->getSubscribersByList($segments); - $subscribers = Helpers::flattenArray($subscribers); - if(!count($subscribers)) { + $subscribers_count = $finder->addSubscribersToTaskFromSegments($queue->task(), $segments); + if(!$subscribers_count) { return $this->errorResponse(array( APIError::UNKNOWN => __('There are no subscribers in that list!', 'mailpoet') )); } $queue->status = null; $queue->scheduled_at = null; - $queue->setSubscribers($subscribers); // set newsletter status $newsletter->setStatus(Newsletter::STATUS_SENDING); diff --git a/lib/Cron/Workers/Scheduler.php b/lib/Cron/Workers/Scheduler.php index b8aa8a982b..91c24e35cd 100644 --- a/lib/Cron/Workers/Scheduler.php +++ b/lib/Cron/Workers/Scheduler.php @@ -78,10 +78,9 @@ class Scheduler { // ensure that subscribers are in segments $finder = new SubscribersFinder(); - $subscribers = $finder->getSubscribersByList($segments); - $subscribers = Helpers::flattenArray($subscribers); + $subscribers_count = $finder->addSubscribersToTaskFromSegments($queue->task(), $segments); - if(empty($subscribers)) { + if(empty($subscribers_count)) { return $this->deleteQueueOrUpdateNextRunDate($queue, $newsletter); } @@ -91,7 +90,6 @@ class Scheduler { // queue newsletter for delivery $queue->newsletter_id = $notification_history->id; - $queue->setSubscribers($subscribers); $queue->status = null; $queue->save(); // update notification status @@ -102,10 +100,8 @@ class Scheduler { function processScheduledStandardNewsletter($newsletter, $queue) { $segments = $newsletter->segments()->findArray(); $finder = new SubscribersFinder(); - $subscribers = $finder->getSubscribersByList($segments); - $subscribers = Helpers::flattenArray($subscribers); + $subscribers_count = $finder->addSubscribersToTaskFromSegments($queue->task(), $segments); // update current queue - $queue->setSubscribers($subscribers); $queue->status = null; $queue->save(); // update newsletter status diff --git a/lib/Segments/SubscribersFinder.php b/lib/Segments/SubscribersFinder.php index 85975f268c..da6cb967dc 100644 --- a/lib/Segments/SubscribersFinder.php +++ b/lib/Segments/SubscribersFinder.php @@ -2,8 +2,11 @@ namespace MailPoet\Segments; +use MailPoet\Models\ScheduledTask; +use MailPoet\Models\ScheduledTaskSubscriber; use MailPoet\Models\Segment; use MailPoet\Models\Subscriber; +use MailPoet\Util\Helpers; use MailPoet\WP\Hooks; class SubscribersFinder { @@ -18,7 +21,7 @@ class SubscribersFinder { } private function findSubscribersInSegment($segment, $subscribers_to_process_ids) { - if($segment['type'] === Segment::TYPE_DEFAULT || $segment['type'] === Segment::TYPE_WP_USERS) { + if($this->isStaticSegment($segment)) { $subscribers = Subscriber::findSubscribersInSegments($subscribers_to_process_ids, array($segment['id']))->findMany(); return Subscriber::extractSubscribersIds($subscribers); } @@ -41,7 +44,7 @@ class SubscribersFinder { } private function getSubscribers($segment) { - if($segment['type'] === Segment::TYPE_DEFAULT || $segment['type'] === Segment::TYPE_WP_USERS) { + if($this->isStaticSegment($segment)) { return Subscriber::getSubscribedInSegments(array($segment['id']))->findArray(); } $finders = Hooks::applyFilters('mailpoet_get_subscribers_in_segment_finders', array()); @@ -54,6 +57,91 @@ class SubscribersFinder { return array(); } + private function isStaticSegment($segment) { + return $segment['type'] === Segment::TYPE_DEFAULT || $segment['type'] === Segment::TYPE_WP_USERS; + } + + function addSubscribersToTaskFromSegments(ScheduledTask $task, array $segments) { + // Prepare subscribers on the DB side for performance reasons + $staticSegments = array(); + $dynamicSegments = array(); + foreach($segments as $segment) { + if($this->isStaticSegment($segment)) { + $staticSegments[] = $segment; + } else { + $dynamicSegments[] = $segment; + } + } + $count = 0; + if(!empty($staticSegments)) { + $count += $this->addSubscribersToTaskFromStaticSegments($task, $staticSegments); + } + if(!empty($dynamicSegments)) { + $count += $this->addSubscribersToTaskFromDynamicSegments($task, $dynamicSegments); + } + return $count; + } + + private function addSubscribersToTaskFromStaticSegments(ScheduledTask $task, array $segments) { + $segment_ids = Helpers::arrayColumn($segments, 'id'); + Subscriber::rawExecute( + 'INSERT INTO ' . MP_SCHEDULED_TASK_SUBSCRIBERS_TABLE . ' + (task_id, subscriber_id, processed) + SELECT DISTINCT ? as task_id, subscribers.`id` as subscriber_id, ? as processed + FROM ' . MP_SUBSCRIBER_SEGMENT_TABLE . ' relation + JOIN ' . MP_SUBSCRIBERS_TABLE . ' subscribers ON subscribers.id = relation.subscriber_id + WHERE subscribers.`deleted_at` IS NULL + AND subscribers.`status` = ? + AND relation.`status` = ? + AND relation.`segment_id` IN (' . join(',', array_map('intval', $segment_ids)) . ')', + array( + $task->id, + ScheduledTaskSubscriber::STATUS_UNPROCESSED, + Subscriber::STATUS_SUBSCRIBED, + Subscriber::STATUS_SUBSCRIBED + ) + ); + return \ORM::get_last_statement()->rowCount(); + } + + private function addSubscribersToTaskFromDynamicSegments(ScheduledTask $task, array $segments) { + $count = 0; + foreach($segments as $segment) { + $count += $this->addSubscribersToTaskFromDynamicSegment($task, $segment); + } + return $count; + } + + private function addSubscribersToTaskFromDynamicSegment(ScheduledTask $task, $segment) { + $finders = Hooks::applyFilters('mailpoet_get_subscribers_in_segment_finders', array()); + $count = 0; + foreach($finders as $finder) { + $subscribers = $finder->getSubscriberIdsInSegment($segment); + if($subscribers) { + $count += $this->addSubscribersToTaskByIds($task, $subscribers); + } + } + return $count; + } + + private function addSubscribersToTaskByIds(ScheduledTask $task, array $subscribers) { + Subscriber::rawExecute( + 'INSERT INTO ' . MP_SCHEDULED_TASK_SUBSCRIBERS_TABLE . ' + (task_id, subscriber_id, processed) + SELECT DISTINCT ? as task_id, subscribers.`id` as subscriber_id, ? as processed + FROM ' . MP_SUBSCRIBERS_TABLE . ' subscribers + WHERE subscribers.`deleted_at` IS NULL + AND subscribers.`status` = ? + AND subscribers.`id` IN (' . join(',', array_map('intval', $subscribers)) . ')', + array( + $task->id, + ScheduledTaskSubscriber::STATUS_UNPROCESSED, + Subscriber::STATUS_SUBSCRIBED + ) + ); + return \ORM::get_last_statement()->rowCount(); + } + private function unique($subscribers) { $result = array(); foreach($subscribers as $subscriber) { diff --git a/tests/unit/Cron/Workers/SchedulerTest.php b/tests/unit/Cron/Workers/SchedulerTest.php index ec00a802da..04d4985674 100644 --- a/tests/unit/Cron/Workers/SchedulerTest.php +++ b/tests/unit/Cron/Workers/SchedulerTest.php @@ -381,7 +381,7 @@ class SchedulerTest extends \MailPoetTest { $updated_newsletter = Newsletter::findOne($newsletter->id); expect($updated_newsletter->status)->equals(Newsletter::STATUS_SENDING); // SubscribersFinder is used for getting subscribers - $finder->verifyInvoked('getSubscribersByList'); + $finder->verifyInvoked('addSubscribersToTaskFromSegments'); } function testItFailsToProcessPostNotificationNewsletterWhenSegmentsDontExist() { @@ -447,7 +447,7 @@ class SchedulerTest extends \MailPoetTest { ->findOne(); expect($updated_notification_history->status)->equals(Newsletter::STATUS_SENDING); // SubscribersFinder is used for getting subscribers - $finder->verifyInvoked('getSubscribersByList'); + $finder->verifyInvoked('addSubscribersToTaskFromSegments'); } function testItFailsToProcessWhenScheduledQueuesNotFound() { diff --git a/tests/unit/Segments/SubscribersFinderTest.php b/tests/unit/Segments/SubscribersFinderTest.php index ceb5d4097a..4e88cb1644 100644 --- a/tests/unit/Segments/SubscribersFinderTest.php +++ b/tests/unit/Segments/SubscribersFinderTest.php @@ -5,14 +5,19 @@ namespace MailPoet\Segments; require_once('FinderMock.php'); use Codeception\Util\Stub; +use MailPoet\Models\ScheduledTask; +use MailPoet\Models\ScheduledTaskSubscriber; use MailPoet\Models\Segment; use MailPoet\Models\Subscriber; use MailPoet\Models\SubscriberSegment; +use MailPoet\Tasks\Sending as SendingTask; use MailPoet\WP\Hooks; class SubscribersFinderTest extends \MailPoetTest { function _before() { + \ORM::raw_execute('TRUNCATE ' . ScheduledTask::$_table); + \ORM::raw_execute('TRUNCATE ' . ScheduledTaskSubscriber::$_table); \ORM::raw_execute('TRUNCATE ' . Segment::$_table); \ORM::raw_execute('TRUNCATE ' . SubscriberSegment::$_table); \ORM::raw_execute('TRUNCATE ' . Subscriber::$_table); @@ -45,6 +50,7 @@ class SubscribersFinderTest extends \MailPoetTest { )); SubscriberSegment::resubscribeToAllSegments($this->subscriber_2); SubscriberSegment::resubscribeToAllSegments($this->subscriber_3); + $this->sending = SendingTask::create(); } function testGetSubscribersInSegmentDefaultSegment() { @@ -70,7 +76,7 @@ class SubscribersFinderTest extends \MailPoetTest { $mock ->expects($this->once()) ->method('getSubscriberIdsInSegment') - ->will($this->returnValue(array($this->subscriber_1))); + ->will($this->returnValue(array($this->subscriber_1->id))); remove_all_filters('mailpoet_get_subscribers_in_segment_finders'); Hooks::addFilter('mailpoet_get_subscribers_in_segment_finders', function () use ($mock) { @@ -89,7 +95,7 @@ class SubscribersFinderTest extends \MailPoetTest { $mock ->expects($this->exactly(2)) ->method('getSubscriberIdsInSegment') - ->will($this->returnValue(array($this->subscriber_1))); + ->will($this->returnValue(array($this->subscriber_1->id))); remove_all_filters('mailpoet_get_subscribers_in_segment_finders'); Hooks::addFilter('mailpoet_get_subscribers_in_segment_finders', function () use ($mock) { @@ -146,4 +152,51 @@ class SubscribersFinderTest extends \MailPoetTest { expect($subscribers)->count(1); } + function testItAddsSubscribersToTaskFromStaticSegments() { + $finder = new SubscribersFinder(); + $subscribers_count = $finder->addSubscribersToTaskFromSegments( + $this->sending->task(), + array( + array('id' => $this->segment_1->id, 'type' => Segment::TYPE_DEFAULT), + array('id' => $this->segment_2->id, 'type' => Segment::TYPE_DEFAULT), + ) + ); + expect($subscribers_count)->equals(1); + expect($this->sending->getSubscribers())->equals(array($this->subscriber_2->id)); + } + + function testItDoesNotAddSubscribersToTaskFromNoSegment() { + $finder = new SubscribersFinder(); + $subscribers_count = $finder->addSubscribersToTaskFromSegments( + $this->sending->task(), + array( + array('id' => $this->segment_1->id, 'type' => 'UNKNOWN SEGMENT'), + ) + ); + expect($subscribers_count)->equals(0); + } + + function testItAddsSubscribersToTaskFromDynamicSegments() { + $mock = Stub::makeEmpty('MailPoet\Segments\FinderMock', array('getSubscriberIdsInSegment')); + $mock + ->expects($this->once()) + ->method('getSubscriberIdsInSegment') + ->will($this->returnValue(array($this->subscriber_1->id))); + + remove_all_filters('mailpoet_get_subscribers_in_segment_finders'); + Hooks::addFilter('mailpoet_get_subscribers_in_segment_finders', function () use ($mock) { + return array($mock); + }); + + $finder = new SubscribersFinder(); + $subscribers_count = $finder->addSubscribersToTaskFromSegments( + $this->sending->task(), + array( + array('id' => $this->segment_2->id, 'type' => ''), + ) + ); + expect($subscribers_count)->equals(1); + expect($this->sending->getSubscribers())->equals(array($this->subscriber_1->id)); + } + }