Delete at most 10000 rows per query from scheduled task subscribers in cleanup.

The DB may crash when too many rows are deleted in a single query.
The scheduled_task_subscribers table can contain a very large number of records.
This commit adds a limit when cleaning up inconsistent data from scheduled task subscribers.
[MAILPOET-6155]
This commit is contained in:
Rostislav Wolny
2024-08-02 14:41:17 +02:00
committed by Aschepikov
parent 4733bde082
commit 5a7462a152
2 changed files with 46 additions and 9 deletions

View File

@@ -12,11 +12,15 @@ use MailPoet\Entities\SegmentEntity;
use MailPoet\Entities\SendingQueueEntity; use MailPoet\Entities\SendingQueueEntity;
use MailPoet\Entities\SubscriberEntity; use MailPoet\Entities\SubscriberEntity;
use MailPoet\Entities\SubscriberSegmentEntity; use MailPoet\Entities\SubscriberSegmentEntity;
use MailPoetVendor\Doctrine\DBAL\ArrayParameterType;
use MailPoetVendor\Doctrine\DBAL\ParameterType;
use MailPoetVendor\Doctrine\ORM\EntityManager; use MailPoetVendor\Doctrine\ORM\EntityManager;
use MailPoetVendor\Doctrine\ORM\Query; use MailPoetVendor\Doctrine\ORM\Query;
use MailPoetVendor\Doctrine\ORM\QueryBuilder; use MailPoetVendor\Doctrine\ORM\QueryBuilder;
class DataInconsistencyRepository { class DataInconsistencyRepository {
const DELETE_ROWS_LIMIT = 10000;
private EntityManager $entityManager; private EntityManager $entityManager;
public function __construct( public function __construct(
@@ -115,6 +119,19 @@ class DataInconsistencyRepository {
->execute(); ->execute();
// delete the scheduled tasks subscribers // delete the scheduled tasks subscribers
$stsTable = $this->entityManager->getClassMetadata(ScheduledTaskSubscriberEntity::class)->getTableName();
$this->entityManager->getConnection()->executeStatement(
"DELETE sts_top FROM $stsTable sts_top
JOIN (
SELECT sts.`task_id`, sts.`subscriber_id` FROM $stsTable sts
WHERE `task_id` IN (:ids)
LIMIT :limit
) as to_delete ON sts_top.`task_id` = to_delete.`task_id` AND sts_top.`subscriber_id` = to_delete.`subscriber_id`",
['limit' => self::DELETE_ROWS_LIMIT, 'ids' => $ids],
['limit' => ParameterType::INTEGER, 'ids' => ArrayParameterType::INTEGER]
);
$qb = $this->entityManager->createQueryBuilder(); $qb = $this->entityManager->createQueryBuilder();
$qb->delete(ScheduledTaskSubscriberEntity::class, 'sts') $qb->delete(ScheduledTaskSubscriberEntity::class, 'sts')
->where($qb->expr()->in('sts.task', ':ids')) ->where($qb->expr()->in('sts.task', ':ids'))
@@ -128,11 +145,21 @@ class DataInconsistencyRepository {
/**
 * Deletes scheduled task subscriber rows whose `task_id` has no matching scheduled task.
 *
 * Rows are removed in batches of at most self::DELETE_ROWS_LIMIT per statement, so a single
 * query never has to delete an unbounded number of rows.
 *
 * @return int Total number of rows deleted across all batches.
 */
public function cleanupOrphanedScheduledTaskSubscribers(): int {
  $stTable = $this->entityManager->getClassMetadata(ScheduledTaskEntity::class)->getTableName();
  $stsTable = $this->entityManager->getClassMetadata(ScheduledTaskSubscriberEntity::class)->getTableName();
  $connection = $this->entityManager->getConnection();
  $deletedCount = 0;
  do {
    // The orphaned rows are selected (with the LIMIT) in a derived table and the outer DELETE
    // matches them back by the (task_id, subscriber_id) pair.
    $deletedInBatch = (int)$connection->executeStatement(
      "DELETE sts_top FROM $stsTable sts_top
       JOIN (
         SELECT sts.`task_id`, sts.`subscriber_id` FROM $stsTable sts
         LEFT JOIN $stTable st ON st.`id` = sts.`task_id`
         WHERE st.`id` IS NULL
         LIMIT :limit
       ) as to_delete ON sts_top.`task_id` = to_delete.`task_id` AND sts_top.`subscriber_id` = to_delete.`subscriber_id`",
      ['limit' => self::DELETE_ROWS_LIMIT],
      ['limit' => ParameterType::INTEGER]
    );
    $deletedCount += $deletedInBatch;
    // Every selected row is itself deleted, so a batch that removed fewer rows than the limit
    // means the selection was not full and no orphans remain. Driving the loop by the
    // affected-row count guarantees termination and avoids an extra count query per batch.
  } while ($deletedInBatch >= self::DELETE_ROWS_LIMIT);
  return $deletedCount;
}
public function cleanupSendingQueuesWithoutNewsletter(): int { public function cleanupSendingQueuesWithoutNewsletter(): int {

View File

@@ -67,19 +67,29 @@ class DataInconsistencyRepositoryTest extends \MailPoetTest {
} }
/**
 * Orphaned task subscribers (rows whose task was removed) are counted and cleaned up,
 * while rows that belong to a still-existing task are left in place.
 */
public function testItHandlesOrphanedScheduledTaskSubscribers(): void {
  $taskToDelete = (new ScheduledTask())->create(SendingQueueWorker::TASK_TYPE, ScheduledTaskEntity::STATUS_SCHEDULED);
  $taskToKeep = (new ScheduledTask())->create(SendingQueueWorker::TASK_TYPE, ScheduledTaskEntity::STATUS_SCHEDULED);
  $subscriber1 = (new Subscriber())->create();
  (new ScheduledTaskSubscriber())->createProcessed($taskToDelete, $subscriber1);
  (new ScheduledTaskSubscriber())->createProcessed($taskToKeep, $subscriber1);
  $subscriber2 = (new Subscriber())->create();
  (new ScheduledTaskSubscriber())->createProcessed($taskToDelete, $subscriber2);
  (new ScheduledTaskSubscriber())->createProcessed($taskToKeep, $subscriber2);

  // Removing the task leaves its two task-subscriber rows behind as orphans
  $this->entityManager->remove($taskToDelete);
  $this->entityManager->flush();

  $taskSubscriberCount = $this->entityManager->getRepository(ScheduledTaskSubscriberEntity::class)->count([]);
  verify($taskSubscriberCount)->equals(4);
  verify($this->repository->getOrphanedScheduledTasksSubscribersCount())->equals(2);

  // The cleanup reports how many rows it deleted
  $deletedCount = $this->repository->cleanupOrphanedScheduledTaskSubscribers();
  verify($deletedCount)->equals(2);
  verify($this->repository->getOrphanedScheduledTasksSubscribersCount())->equals(0);

  // The rows that belong to the task we kept must be untouched
  $taskSubscriberCount = $this->entityManager->getRepository(ScheduledTaskSubscriberEntity::class)->count([]);
  verify($taskSubscriberCount)->equals(2);
}
public function testItHandlesSendingQueuesWithoutNewsletter(): void { public function testItHandlesSendingQueuesWithoutNewsletter(): void {