Use Doctrine in InactiveSubscribersController

[MAILPOET-3774]
This commit is contained in:
Jan Lysý
2021-09-22 15:53:54 +02:00
committed by Veljko V
parent 93e038371d
commit ddb025c0ec

View File

@@ -1,28 +1,33 @@
<?php
<?php declare(strict_types = 1);
namespace MailPoet\Subscribers;
use MailPoet\Config\MP2Migrator;
use MailPoet\Models\ScheduledTask;
use MailPoet\Models\ScheduledTaskSubscriber;
use MailPoet\Models\SendingQueue;
use MailPoet\Models\StatisticsOpens;
use MailPoet\Models\Subscriber;
use MailPoet\Entities\ScheduledTaskEntity;
use MailPoet\Entities\ScheduledTaskSubscriberEntity;
use MailPoet\Entities\SendingQueueEntity;
use MailPoet\Entities\SubscriberEntity;
use MailPoet\Settings\SettingsRepository;
use MailPoetVendor\Carbon\Carbon;
use MailPoetVendor\Idiorm\ORM;
use MailPoetVendor\Doctrine\DBAL\Connection;
use MailPoetVendor\Doctrine\ORM\EntityManager;
class InactiveSubscribersController {
private $inactivesTaskIdsTableCreated = false;
private $inactiveTaskIdsTableCreated = false;
/** @var SettingsRepository */
private $settingsRepository;
/** @var EntityManager */
private $entityManager;
public function __construct(
EntityManager $entityManager,
SettingsRepository $settingsRepository
) {
$this->settingsRepository = $settingsRepository;
$this->entityManager = $entityManager;
}
public function markInactiveSubscribers(int $daysToInactive, int $batchSize, ?int $startId = null) {
@@ -35,15 +40,15 @@ class InactiveSubscribersController {
return $this->activateSubscribers($thresholdDate, $batchSize);
}
/**
* @return void
*/
public function reactivateInactiveSubscribers() {
$reactivateAllInactiveQuery = sprintf(
"UPDATE %s SET status = '%s' WHERE status = '%s';",
Subscriber::$_table, Subscriber::STATUS_SUBSCRIBED, Subscriber::STATUS_INACTIVE
);
ORM::rawExecute($reactivateAllInactiveQuery);
public function reactivateInactiveSubscribers(): void {
$subscribersTable = $this->entityManager->getClassMetadata(SubscriberEntity::class)->getTableName();
$reactivateAllInactiveQuery = "
UPDATE {$subscribersTable} SET status = :statusSubscribed WHERE status = :statusInactive
";
$this->entityManager->getConnection()->executeQuery($reactivateAllInactiveQuery, [
'statusSubscribed' => SubscriberEntity::STATUS_SUBSCRIBED,
'statusInactive' => SubscriberEntity::STATUS_INACTIVE,
]);
}
private function getThresholdDate(int $daysToInactive): Carbon {
@@ -55,11 +60,11 @@ class InactiveSubscribersController {
* @return int|bool
*/
private function deactivateSubscribers(Carbon $thresholdDate, int $batchSize, ?int $startId = null) {
$subscribersTable = Subscriber::$_table;
$scheduledTasksTable = ScheduledTask::$_table;
$scheduledTaskSubcribresTable = ScheduledTaskSubscriber::$_table;
$statisticsOpensTable = StatisticsOpens::$_table;
$sendingQueuesTable = SendingQueue::$_table;
$subscribersTable = $this->entityManager->getClassMetadata(SubscriberEntity::class)->getTableName();
$scheduledTasksTable = $this->entityManager->getClassMetadata(ScheduledTaskEntity::class)->getTableName();
$scheduledTaskSubscribersTable = $this->entityManager->getClassMetadata(ScheduledTaskSubscriberEntity::class)->getTableName();
$sendingQueuesTable = $this->entityManager->getClassMetadata(SendingQueueEntity::class)->getTableName();
$connection = $this->entityManager->getConnection();
$thresholdDateIso = $thresholdDate->toDateTimeString();
$dayAgo = new Carbon();
@@ -74,46 +79,59 @@ class InactiveSubscribersController {
// We take into account only emails which have at least one opening tracked
// to ensure that tracking was enabled for the particular email
if (!$this->inactivesTaskIdsTableCreated) {
$inactivesTaskIdsTable = sprintf("
CREATE TEMPORARY TABLE IF NOT EXISTS inactives_task_ids
(INDEX task_id_ids (id))
SELECT DISTINCT task_id as id FROM $sendingQueuesTable as sq
JOIN $scheduledTasksTable as st ON sq.task_id = st.id
WHERE st.processed_at > '%s'
AND st.processed_at < '%s'",
$thresholdDateIso, $dayAgoIso
);
ORM::rawExecute($inactivesTaskIdsTable);
$this->inactivesTaskIdsTableCreated = true;
$inactiveTaskIdsTable = 'inactive_task_ids';
if (!$this->inactiveTaskIdsTableCreated) {
$inactiveTaskIdsTableSql = "
CREATE TEMPORARY TABLE IF NOT EXISTS {$inactiveTaskIdsTable}
(INDEX task_id_ids (id))
SELECT DISTINCT task_id as id FROM {$sendingQueuesTable} as sq
JOIN {$scheduledTasksTable} as st ON sq.task_id = st.id
WHERE st.processed_at > :thresholdDate
AND st.processed_at < :dayAgo
";
$connection->executeQuery($inactiveTaskIdsTableSql, [
'thresholdDate' => $thresholdDateIso,
'dayAgo' => $dayAgoIso,
]);
$this->inactiveTaskIdsTableCreated = true;
}
// Select subscribers who received a recent tracked email but didn't open it
$startId = (int)$startId;
$endId = $startId + $batchSize;
$inactiveSubscriberIdsTmpTable = 'inactive_subscriber_ids';
ORM::rawExecute("
CREATE TEMPORARY TABLE IF NOT EXISTS $inactiveSubscriberIdsTmpTable
$connection->executeQuery("
CREATE TEMPORARY TABLE IF NOT EXISTS {$inactiveSubscriberIdsTmpTable}
(UNIQUE subscriber_id (id))
SELECT DISTINCT s.id FROM $subscribersTable as s
JOIN $scheduledTaskSubcribresTable as sts USE INDEX (subscriber_id) ON s.id = sts.subscriber_id
JOIN inactives_task_ids task_ids ON task_ids.id = sts.task_id
WHERE s.last_subscribed_at < ? AND s.status = ? AND s.id >= ? AND s.id < ?",
[$thresholdDateIso, Subscriber::STATUS_SUBSCRIBED, $startId, $endId]
);
SELECT DISTINCT s.id FROM {$subscribersTable} as s
JOIN {$scheduledTaskSubscribersTable} as sts USE INDEX (subscriber_id) ON s.id = sts.subscriber_id
JOIN {$inactiveTaskIdsTable} task_ids ON task_ids.id = sts.task_id
WHERE s.last_subscribed_at < :thresholdDate
AND s.status = :status
AND s.id >= :startId
AND s.id < :endId
",
[
'thresholdDate' => $thresholdDateIso,
'status' => SubscriberEntity::STATUS_SUBSCRIBED,
'startId' => $startId,
'endId' => $endId,
]);
$idsToDeactivate = ORM::forTable($inactiveSubscriberIdsTmpTable)->rawQuery("
SELECT isi.id FROM $inactiveSubscriberIdsTmpTable isi
LEFT OUTER JOIN $subscribersTable as s ON isi.id = s.id AND GREATEST(
$result = $connection->executeQuery("
SELECT isi.id FROM {$inactiveSubscriberIdsTmpTable} isi
LEFT OUTER JOIN {$subscribersTable} as s ON isi.id = s.id AND GREATEST(
COALESCE(s.last_engagement_at, 0),
COALESCE(s.last_subscribed_at, 0),
COALESCE(s.created_at, 0)
) > ?
WHERE s.id IS NULL",
[$thresholdDateIso]
)->findArray();
) > :thresholdDate
WHERE s.id IS NULL
", [
'thresholdDate' => $thresholdDateIso,
]);
$idsToDeactivate = $result->fetchAllAssociative();
ORM::rawExecute("DROP TABLE $inactiveSubscriberIdsTmpTable");
$connection->executeQuery("DROP TABLE {$inactiveSubscriberIdsTmpTable}");
$idsToDeactivate = array_map(
function ($id) {
@@ -124,39 +142,50 @@ class InactiveSubscribersController {
if (!count($idsToDeactivate)) {
return 0;
}
ORM::rawExecute(sprintf(
"UPDATE %s SET status='" . Subscriber::STATUS_INACTIVE . "' WHERE id IN (%s);",
$subscribersTable,
implode(',', $idsToDeactivate)
));
$connection->executeQuery("UPDATE {$subscribersTable} SET status = :statusInactive WHERE id IN (:idsToDeactivate)", [
'statusInactive' => SubscriberEntity::STATUS_INACTIVE,
'idsToDeactivate' => $idsToDeactivate,
], ['idsToDeactivate' => Connection::PARAM_INT_ARRAY]);
return count($idsToDeactivate);
}
private function activateSubscribers(Carbon $thresholdDate, int $batchSize): int {
$subscribersTable = Subscriber::$_table;
$statsOpensTable = StatisticsOpens::$_table;
$subscribersTable = $this->entityManager->getClassMetadata(SubscriberEntity::class)->getTableName();
$connection = $this->entityManager->getConnection();
$mp2MigrationDate = $this->getMP2MigrationDate();
if ($mp2MigrationDate && $mp2MigrationDate > $thresholdDate) {
// If MP2 migration occurred during detection interval re-activate all subscribers created before migration
$idsToActivate = ORM::forTable($subscribersTable)->select("$subscribersTable.id")
->whereLt("$subscribersTable.created_at", $mp2MigrationDate)
->where("$subscribersTable.status", Subscriber::STATUS_INACTIVE)
->limit($batchSize)
->findArray();
$idsToActivate = $connection->executeQuery("
SELECT id
FROM {$subscribersTable}
WHERE created_at < :migrationDate
AND status = :statusInactive
LIMIT :batchSize
", [
'migrationDate' => $mp2MigrationDate,
'statusInactive' => SubscriberEntity::STATUS_INACTIVE,
'batchSize' => $batchSize,
], ['batchSize' => \PDO::PARAM_INT])->fetchAllAssociative();
} else {
$idsToActivate = ORM::forTable($subscribersTable)->select("$subscribersTable.id")
->leftOuterJoin($subscribersTable, "$subscribersTable.id = s2.id AND GREATEST(
$idsToActivate = $connection->executeQuery("
SELECT s.id
FROM {$subscribersTable} s
LEFT OUTER JOIN {$subscribersTable} s2 ON s.id = s2.id AND GREATEST(
COALESCE(s2.last_engagement_at, 0),
COALESCE(s2.last_subscribed_at, 0),
COALESCE(s2.created_at, 0)
) > '$thresholdDate'", 's2')
->whereLt("$subscribersTable.last_subscribed_at", $thresholdDate)
->where("$subscribersTable.status", Subscriber::STATUS_INACTIVE)
->whereRaw("s2.id IS NOT NULL")
->limit($batchSize)
->groupByExpr("$subscribersTable.id")
->findArray();
) > :thresholdDate
WHERE s.last_subscribed_at < :thresholdDate
AND s.status = :statusInactive
AND s2.id IS NOT NULL
GROUP BY s.id
LIMIT :batchSize
", [
'thresholdDate' => $thresholdDate,
'statusInactive' => SubscriberEntity::STATUS_INACTIVE,
'batchSize' => $batchSize,
], ['batchSize' => \PDO::PARAM_INT])->fetchAllAssociative();
}
$idsToActivate = array_map(
@@ -167,11 +196,10 @@ class InactiveSubscribersController {
if (!count($idsToActivate)) {
return 0;
}
ORM::rawExecute(sprintf(
"UPDATE %s SET status='" . Subscriber::STATUS_SUBSCRIBED . "' WHERE id IN (%s);",
$subscribersTable,
implode(',', $idsToActivate)
));
$connection->executeQuery("UPDATE {$subscribersTable} SET status = :statusSubscribed WHERE id IN (:idsToActivate)", [
'statusSubscribed' => SubscriberEntity::STATUS_SUBSCRIBED,
'idsToActivate' => $idsToActivate,
], ['idsToActivate' => Connection::PARAM_INT_ARRAY]);
return count($idsToActivate);
}