Implement subscriber filtering in scheduled tasks

If a filterSegmentId is provided, ensure only subscribers who belong to
that segment get included in the scheduled task.

MAILPOET-5509
This commit is contained in:
John Oleksowicz
2023-08-11 15:40:04 -05:00
committed by Aschepikov
parent 3745b17465
commit f52b1343f8
2 changed files with 72 additions and 38 deletions

View File

@ -72,7 +72,7 @@ class SubscribersFinder {
*
* @return float|int
*/
public function addSubscribersToTaskFromSegments(ScheduledTaskEntity $task, array $segmentIds) {
public function addSubscribersToTaskFromSegments(ScheduledTaskEntity $task, array $segmentIds, ?int $filterSegmentId = null) {
// Prepare subscribers on the DB side for performance reasons
$staticSegmentIds = [];
$dynamicSegmentIds = [];
@ -88,10 +88,10 @@ class SubscribersFinder {
}
$count = 0;
if (!empty($staticSegmentIds)) {
$count += $this->addSubscribersToTaskFromStaticSegments($task, $staticSegmentIds);
$count += $this->addSubscribersToTaskFromStaticSegments($task, $staticSegmentIds, $filterSegmentId);
}
if (!empty($dynamicSegmentIds)) {
$count += $this->addSubscribersToTaskFromDynamicSegments($task, $dynamicSegmentIds);
$count += $this->addSubscribersToTaskFromDynamicSegments($task, $dynamicSegmentIds, $filterSegmentId);
}
return $count;
}
@ -102,41 +102,37 @@ class SubscribersFinder {
*
* @return int
*/
private function addSubscribersToTaskFromStaticSegments(ScheduledTaskEntity $task, array $segmentIds) {
$processedStatus = ScheduledTaskSubscriberEntity::STATUS_UNPROCESSED;
$subscribersStatus = SubscriberEntity::STATUS_SUBSCRIBED;
$relationStatus = SubscriberEntity::STATUS_SUBSCRIBED;
private function addSubscribersToTaskFromStaticSegments(ScheduledTaskEntity $task, array $segmentIds, ?int $filterSegmentId = null) {
$scheduledTaskSubscriberTable = $this->entityManager->getClassMetadata(ScheduledTaskSubscriberEntity::class)->getTableName();
$subscriberSegmentTable = $this->entityManager->getClassMetadata(SubscriberSegmentEntity::class)->getTableName();
$subscriberTable = $this->entityManager->getClassMetadata(SubscriberEntity::class)->getTableName();
$connection = $this->entityManager->getConnection();
$selectQueryBuilder = $connection->createQueryBuilder();
$selectQueryBuilder
->select('DISTINCT :task_id as task_id', 'subscribers.id as subscriber_id', ':processed as processed')
->from($subscriberSegmentTable, 'relation')
->join('relation', $subscriberTable, 'subscribers', 'subscribers.id = relation.subscriber_id')
->where('subscribers.deleted_at IS NULL')
->andWhere('subscribers.status = :subscribers_status')
->andWhere('relation.status = :relation_status')
->andWhere($selectQueryBuilder->expr()->in('relation.segment_id', ':segment_ids'))
->setParameter('task_id', $task->getId(), ParameterType::INTEGER)
->setParameter('processed', ScheduledTaskSubscriberEntity::STATUS_UNPROCESSED, ParameterType::INTEGER)
->setParameter('subscribers_status', SubscriberEntity::STATUS_SUBSCRIBED, ParameterType::STRING)
->setParameter('relation_status', SubscriberEntity::STATUS_SUBSCRIBED, ParameterType::STRING)
->setParameter('segment_ids', $segmentIds, Connection::PARAM_INT_ARRAY);
$result = $connection->executeQuery(
"INSERT IGNORE INTO $scheduledTaskSubscriberTable
(task_id, subscriber_id, processed)
SELECT DISTINCT ? as task_id, subscribers.`id` as subscriber_id, ? as processed
FROM $subscriberSegmentTable relation
JOIN $subscriberTable subscribers ON subscribers.id = relation.subscriber_id
WHERE subscribers.`deleted_at` IS NULL
AND subscribers.`status` = ?
AND relation.`status` = ?
AND relation.`segment_id` IN (?)",
[
$task->getId(),
$processedStatus,
$subscribersStatus,
$relationStatus,
$segmentIds,
],
[
ParameterType::INTEGER,
ParameterType::INTEGER,
ParameterType::STRING,
ParameterType::STRING,
Connection::PARAM_INT_ARRAY,
]
);
if ($filterSegmentId) {
$filterSegmentSubscriberIds = $this->segmentSubscriberRepository->findSubscribersIdsInSegment($filterSegmentId);
$selectQueryBuilder
->andWhere($selectQueryBuilder->expr()->in('subscribers.id', ':filterSegmentSubscriberIds'))
->setParameter('filterSegmentSubscriberIds', $filterSegmentSubscriberIds, Connection::PARAM_INT_ARRAY);
}
// queryBuilder doesn't support INSERT IGNORE directly
$sql = "INSERT IGNORE INTO $scheduledTaskSubscriberTable (task_id, subscriber_id, processed) " . $selectQueryBuilder->getSQL();
$result = $connection->executeQuery($sql, $selectQueryBuilder->getParameters(), $selectQueryBuilder->getParameterTypes());
return (int)$result->rowCount();
}
@ -147,19 +143,25 @@ class SubscribersFinder {
*
* @return int
*/
private function addSubscribersToTaskFromDynamicSegments(ScheduledTaskEntity $task, array $segmentIds) {
private function addSubscribersToTaskFromDynamicSegments(ScheduledTaskEntity $task, array $segmentIds, ?int $filterSegmentId = null) {
$count = 0;
foreach ($segmentIds as $segmentId) {
$count += $this->addSubscribersToTaskFromDynamicSegment($task, (int)$segmentId);
$count += $this->addSubscribersToTaskFromDynamicSegment($task, (int)$segmentId, $filterSegmentId);
}
return $count;
}
private function addSubscribersToTaskFromDynamicSegment(ScheduledTaskEntity $task, int $segmentId) {
private function addSubscribersToTaskFromDynamicSegment(ScheduledTaskEntity $task, int $segmentId, ?int $filterSegmentId) {
$count = 0;
$subscribers = $this->segmentSubscriberRepository->getSubscriberIdsInSegment($segmentId);
if ($subscribers) {
$count += $this->addSubscribersToTaskByIds($task, $subscribers);
$subscriberIds = $this->segmentSubscriberRepository->getSubscriberIdsInSegment($segmentId);
if ($filterSegmentId) {
$filterSegmentSubscriberIds = $this->segmentSubscriberRepository->getSubscriberIdsInSegment($filterSegmentId);
$subscriberIds = array_intersect($subscriberIds, $filterSegmentSubscriberIds);
}
if ($subscriberIds) {
$count += $this->addSubscribersToTaskByIds($task, $subscriberIds);
}
return $count;
}

View File

@ -164,6 +164,38 @@ class SubscribersFinderTest extends \MailPoetTest {
expect($subscribersIds)->equals([$this->subscriber2->getId()]);
}
public function testItDoesNotAddSubscribersToTaskIfFilteredOutByFilterSegment(): void {
$staticSegment = (new SegmentFactory())->withType(SegmentEntity::TYPE_DEFAULT)->create();
$dynamicSegment = (new DynamicSegment())->withEngagementScoreFilter(0, 'higherThan')->create();
(new SubscriberFactory())->withEngagementScore(50)->withSegments([$staticSegment])->create();
(new SubscriberFactory())->withEngagementScore(30)->withSegments([$staticSegment])->create();
(new SubscriberFactory())->withEngagementScore(60)->create();
(new SubscriberFactory())->withEngagementScore(20)->create();
$filterSegment = (new DynamicSegment())->withEngagementScoreFilter(40, 'higherThan')->create();
$this->assertIsInt($staticSegment->getId());
$this->assertIsInt($dynamicSegment->getId());
// Without filtering
$task = (new ScheduledTaskFactory())->create(SendingTask::TASK_TYPE, ScheduledTaskEntity::STATUS_SCHEDULED, new Carbon());
$staticCount = $this->subscribersFinder->addSubscribersToTaskFromSegments($task, [$staticSegment->getId()]);
expect($staticCount)->equals(2);
$task = (new ScheduledTaskFactory())->create(SendingTask::TASK_TYPE, ScheduledTaskEntity::STATUS_SCHEDULED, new Carbon());
$dynamicCount = $this->subscribersFinder->addSubscribersToTaskFromSegments($task, [$dynamicSegment->getId()]);
expect($dynamicCount)->equals(4);
// With filtering
$task = (new ScheduledTaskFactory())->create(SendingTask::TASK_TYPE, ScheduledTaskEntity::STATUS_SCHEDULED, new Carbon());
$staticCount = $this->subscribersFinder->addSubscribersToTaskFromSegments($task, [$staticSegment->getId()], $filterSegment->getId());
expect($staticCount)->equals(1);
$task = (new ScheduledTaskFactory())->create(SendingTask::TASK_TYPE, ScheduledTaskEntity::STATUS_SCHEDULED, new Carbon());
$dynamicCount = $this->subscribersFinder->addSubscribersToTaskFromSegments($task, [$dynamicSegment->getId()], $filterSegment->getId());
expect($dynamicCount)->equals(2);
}
public function testItCanFilterSubscribersBasedOnDynamicSegment(): void {
$subscriber1 = (new SubscriberFactory())->withEngagementScore(50)->withSegments([$this->segment1])->create();
$subscriber2 = (new SubscriberFactory())->withEngagementScore(30)->withSegments([$this->segment1])->create();