Don't load subscriber IDs in memory when preparing tasks from static segments [MAILPOET-903]

This commit is contained in:
stoletniy
2018-02-13 20:36:01 +03:00
parent 4c0f5bb456
commit 84dfa88a1a
5 changed files with 152 additions and 17 deletions

View File

@ -78,16 +78,14 @@ class SendingQueue extends APIEndpoint {
} else {
$segments = $newsletter->segments()->findArray();
$finder = new SubscribersFinder();
$subscribers = $finder->getSubscribersByList($segments);
$subscribers = Helpers::flattenArray($subscribers);
if(!count($subscribers)) {
$subscribers_count = $finder->addSubscribersToTaskFromSegments($queue->task(), $segments);
if(!$subscribers_count) {
return $this->errorResponse(array(
APIError::UNKNOWN => __('There are no subscribers in that list!', 'mailpoet')
));
}
$queue->status = null;
$queue->scheduled_at = null;
$queue->setSubscribers($subscribers);
// set newsletter status
$newsletter->setStatus(Newsletter::STATUS_SENDING);

View File

@ -78,10 +78,9 @@ class Scheduler {
// ensure that subscribers are in segments
$finder = new SubscribersFinder();
$subscribers = $finder->getSubscribersByList($segments);
$subscribers = Helpers::flattenArray($subscribers);
$subscribers_count = $finder->addSubscribersToTaskFromSegments($queue->task(), $segments);
if(empty($subscribers)) {
if(empty($subscribers_count)) {
return $this->deleteQueueOrUpdateNextRunDate($queue, $newsletter);
}
@ -91,7 +90,6 @@ class Scheduler {
// queue newsletter for delivery
$queue->newsletter_id = $notification_history->id;
$queue->setSubscribers($subscribers);
$queue->status = null;
$queue->save();
// update notification status
@ -102,10 +100,8 @@ class Scheduler {
function processScheduledStandardNewsletter($newsletter, $queue) {
$segments = $newsletter->segments()->findArray();
$finder = new SubscribersFinder();
$subscribers = $finder->getSubscribersByList($segments);
$subscribers = Helpers::flattenArray($subscribers);
$subscribers_count = $finder->addSubscribersToTaskFromSegments($queue->task(), $segments);
// update current queue
$queue->setSubscribers($subscribers);
$queue->status = null;
$queue->save();
// update newsletter status

View File

@ -2,8 +2,11 @@
namespace MailPoet\Segments;
use MailPoet\Models\ScheduledTask;
use MailPoet\Models\ScheduledTaskSubscriber;
use MailPoet\Models\Segment;
use MailPoet\Models\Subscriber;
use MailPoet\Util\Helpers;
use MailPoet\WP\Hooks;
class SubscribersFinder {
@ -18,7 +21,7 @@ class SubscribersFinder {
}
private function findSubscribersInSegment($segment, $subscribers_to_process_ids) {
if($segment['type'] === Segment::TYPE_DEFAULT || $segment['type'] === Segment::TYPE_WP_USERS) {
if($this->isStaticSegment($segment)) {
$subscribers = Subscriber::findSubscribersInSegments($subscribers_to_process_ids, array($segment['id']))->findMany();
return Subscriber::extractSubscribersIds($subscribers);
}
@ -41,7 +44,7 @@ class SubscribersFinder {
}
private function getSubscribers($segment) {
if($segment['type'] === Segment::TYPE_DEFAULT || $segment['type'] === Segment::TYPE_WP_USERS) {
if($this->isStaticSegment($segment)) {
return Subscriber::getSubscribedInSegments(array($segment['id']))->findArray();
}
$finders = Hooks::applyFilters('mailpoet_get_subscribers_in_segment_finders', array());
@ -54,6 +57,91 @@ class SubscribersFinder {
return array();
}
private function isStaticSegment($segment) {
return $segment['type'] === Segment::TYPE_DEFAULT || $segment['type'] === Segment::TYPE_WP_USERS;
}
function addSubscribersToTaskFromSegments(ScheduledTask $task, array $segments) {
// Prepare subscribers on the DB side for performance reasons
$staticSegments = array();
$dynamicSegments = array();
foreach($segments as $segment) {
if($this->isStaticSegment($segment)) {
$staticSegments[] = $segment;
} else {
$dynamicSegments[] = $segment;
}
}
$count = 0;
if(!empty($staticSegments)) {
$count += $this->addSubscribersToTaskFromStaticSegments($task, $staticSegments);
}
if(!empty($dynamicSegments)) {
$count += $this->addSubscribersToTaskFromDynamicSegments($task, $dynamicSegments);
}
return $count;
}
private function addSubscribersToTaskFromStaticSegments(ScheduledTask $task, array $segments) {
$segment_ids = Helpers::arrayColumn($segments, 'id');
Subscriber::rawExecute(
'INSERT INTO ' . MP_SCHEDULED_TASK_SUBSCRIBERS_TABLE . '
(task_id, subscriber_id, processed)
SELECT DISTINCT ? as task_id, subscribers.`id` as subscriber_id, ? as processed
FROM ' . MP_SUBSCRIBER_SEGMENT_TABLE . ' relation
JOIN ' . MP_SUBSCRIBERS_TABLE . ' subscribers ON subscribers.id = relation.subscriber_id
WHERE subscribers.`deleted_at` IS NULL
AND subscribers.`status` = ?
AND relation.`status` = ?
AND relation.`segment_id` IN (' . join(',', array_map('intval', $segment_ids)) . ')',
array(
$task->id,
ScheduledTaskSubscriber::STATUS_UNPROCESSED,
Subscriber::STATUS_SUBSCRIBED,
Subscriber::STATUS_SUBSCRIBED
)
);
return \ORM::get_last_statement()->rowCount();
}
private function addSubscribersToTaskFromDynamicSegments(ScheduledTask $task, array $segments) {
$count = 0;
foreach($segments as $segment) {
$count += $this->addSubscribersToTaskFromDynamicSegment($task, $segment);
}
return $count;
}
private function addSubscribersToTaskFromDynamicSegment(ScheduledTask $task, $segment) {
$finders = Hooks::applyFilters('mailpoet_get_subscribers_in_segment_finders', array());
$count = 0;
foreach($finders as $finder) {
$subscribers = $finder->getSubscriberIdsInSegment($segment);
if($subscribers) {
$count += $this->addSubscribersToTaskByIds($task, $subscribers);
}
}
return $count;
}
private function addSubscribersToTaskByIds(ScheduledTask $task, array $subscribers) {
Subscriber::rawExecute(
'INSERT INTO ' . MP_SCHEDULED_TASK_SUBSCRIBERS_TABLE . '
(task_id, subscriber_id, processed)
SELECT DISTINCT ? as task_id, subscribers.`id` as subscriber_id, ? as processed
FROM ' . MP_SUBSCRIBERS_TABLE . ' subscribers
WHERE subscribers.`deleted_at` IS NULL
AND subscribers.`status` = ?
AND subscribers.`id` IN (' . join(',', array_map('intval', $subscribers)) . ')',
array(
$task->id,
ScheduledTaskSubscriber::STATUS_UNPROCESSED,
Subscriber::STATUS_SUBSCRIBED
)
);
return \ORM::get_last_statement()->rowCount();
}
private function unique($subscribers) {
$result = array();
foreach($subscribers as $subscriber) {

View File

@ -381,7 +381,7 @@ class SchedulerTest extends \MailPoetTest {
$updated_newsletter = Newsletter::findOne($newsletter->id);
expect($updated_newsletter->status)->equals(Newsletter::STATUS_SENDING);
// SubscribersFinder is used for getting subscribers
$finder->verifyInvoked('getSubscribersByList');
$finder->verifyInvoked('addSubscribersToTaskFromSegments');
}
function testItFailsToProcessPostNotificationNewsletterWhenSegmentsDontExist() {
@ -447,7 +447,7 @@ class SchedulerTest extends \MailPoetTest {
->findOne();
expect($updated_notification_history->status)->equals(Newsletter::STATUS_SENDING);
// SubscribersFinder is used for getting subscribers
$finder->verifyInvoked('getSubscribersByList');
$finder->verifyInvoked('addSubscribersToTaskFromSegments');
}
function testItFailsToProcessWhenScheduledQueuesNotFound() {

View File

@ -5,14 +5,19 @@ namespace MailPoet\Segments;
require_once('FinderMock.php');
use Codeception\Util\Stub;
use MailPoet\Models\ScheduledTask;
use MailPoet\Models\ScheduledTaskSubscriber;
use MailPoet\Models\Segment;
use MailPoet\Models\Subscriber;
use MailPoet\Models\SubscriberSegment;
use MailPoet\Tasks\Sending as SendingTask;
use MailPoet\WP\Hooks;
class SubscribersFinderTest extends \MailPoetTest {
function _before() {
\ORM::raw_execute('TRUNCATE ' . ScheduledTask::$_table);
\ORM::raw_execute('TRUNCATE ' . ScheduledTaskSubscriber::$_table);
\ORM::raw_execute('TRUNCATE ' . Segment::$_table);
\ORM::raw_execute('TRUNCATE ' . SubscriberSegment::$_table);
\ORM::raw_execute('TRUNCATE ' . Subscriber::$_table);
@ -45,6 +50,7 @@ class SubscribersFinderTest extends \MailPoetTest {
));
SubscriberSegment::resubscribeToAllSegments($this->subscriber_2);
SubscriberSegment::resubscribeToAllSegments($this->subscriber_3);
$this->sending = SendingTask::create();
}
function testGetSubscribersInSegmentDefaultSegment() {
@ -70,7 +76,7 @@ class SubscribersFinderTest extends \MailPoetTest {
$mock
->expects($this->once())
->method('getSubscriberIdsInSegment')
->will($this->returnValue(array($this->subscriber_1)));
->will($this->returnValue(array($this->subscriber_1->id)));
remove_all_filters('mailpoet_get_subscribers_in_segment_finders');
Hooks::addFilter('mailpoet_get_subscribers_in_segment_finders', function () use ($mock) {
@ -89,7 +95,7 @@ class SubscribersFinderTest extends \MailPoetTest {
$mock
->expects($this->exactly(2))
->method('getSubscriberIdsInSegment')
->will($this->returnValue(array($this->subscriber_1)));
->will($this->returnValue(array($this->subscriber_1->id)));
remove_all_filters('mailpoet_get_subscribers_in_segment_finders');
Hooks::addFilter('mailpoet_get_subscribers_in_segment_finders', function () use ($mock) {
@ -146,4 +152,51 @@ class SubscribersFinderTest extends \MailPoetTest {
expect($subscribers)->count(1);
}
function testItAddsSubscribersToTaskFromStaticSegments() {
$finder = new SubscribersFinder();
$subscribers_count = $finder->addSubscribersToTaskFromSegments(
$this->sending->task(),
array(
array('id' => $this->segment_1->id, 'type' => Segment::TYPE_DEFAULT),
array('id' => $this->segment_2->id, 'type' => Segment::TYPE_DEFAULT),
)
);
expect($subscribers_count)->equals(1);
expect($this->sending->getSubscribers())->equals(array($this->subscriber_2->id));
}
function testItDoesNotAddSubscribersToTaskFromNoSegment() {
$finder = new SubscribersFinder();
$subscribers_count = $finder->addSubscribersToTaskFromSegments(
$this->sending->task(),
array(
array('id' => $this->segment_1->id, 'type' => 'UNKNOWN SEGMENT'),
)
);
expect($subscribers_count)->equals(0);
}
function testItAddsSubscribersToTaskFromDynamicSegments() {
$mock = Stub::makeEmpty('MailPoet\Segments\FinderMock', array('getSubscriberIdsInSegment'));
$mock
->expects($this->once())
->method('getSubscriberIdsInSegment')
->will($this->returnValue(array($this->subscriber_1->id)));
remove_all_filters('mailpoet_get_subscribers_in_segment_finders');
Hooks::addFilter('mailpoet_get_subscribers_in_segment_finders', function () use ($mock) {
return array($mock);
});
$finder = new SubscribersFinder();
$subscribers_count = $finder->addSubscribersToTaskFromSegments(
$this->sending->task(),
array(
array('id' => $this->segment_2->id, 'type' => ''),
)
);
expect($subscribers_count)->equals(1);
expect($this->sending->getSubscribers())->equals(array($this->subscriber_1->id));
}
}