Move query with inserting task subscribers to repository
[MAILPOET-4303]
This commit is contained in:
@@ -9,13 +9,13 @@ use MailPoet\Entities\SubscriberEntity;
|
|||||||
use MailPoet\Mailer\Mailer;
|
use MailPoet\Mailer\Mailer;
|
||||||
use MailPoet\Models\ScheduledTask;
|
use MailPoet\Models\ScheduledTask;
|
||||||
use MailPoet\Models\ScheduledTaskSubscriber;
|
use MailPoet\Models\ScheduledTaskSubscriber;
|
||||||
|
use MailPoet\Newsletter\Sending\ScheduledTaskSubscribersRepository;
|
||||||
use MailPoet\Newsletter\Sending\SendingQueuesRepository;
|
use MailPoet\Newsletter\Sending\SendingQueuesRepository;
|
||||||
use MailPoet\Services\Bridge;
|
use MailPoet\Services\Bridge;
|
||||||
use MailPoet\Services\Bridge\API;
|
use MailPoet\Services\Bridge\API;
|
||||||
use MailPoet\Settings\SettingsController;
|
use MailPoet\Settings\SettingsController;
|
||||||
use MailPoet\Statistics\StatisticsBouncesRepository;
|
use MailPoet\Statistics\StatisticsBouncesRepository;
|
||||||
use MailPoet\Subscribers\SubscribersRepository;
|
use MailPoet\Subscribers\SubscribersRepository;
|
||||||
use MailPoet\Tasks\Bounce as BounceTask;
|
|
||||||
use MailPoet\Tasks\Subscribers as TaskSubscribers;
|
use MailPoet\Tasks\Subscribers as TaskSubscribers;
|
||||||
use MailPoet\Tasks\Subscribers\BatchIterator;
|
use MailPoet\Tasks\Subscribers\BatchIterator;
|
||||||
use MailPoetVendor\Carbon\Carbon;
|
use MailPoetVendor\Carbon\Carbon;
|
||||||
@@ -45,11 +45,15 @@ class Bounce extends SimpleWorker {
|
|||||||
/** @var StatisticsBouncesRepository */
|
/** @var StatisticsBouncesRepository */
|
||||||
private $statisticsBouncesRepository;
|
private $statisticsBouncesRepository;
|
||||||
|
|
||||||
|
/** @var ScheduledTaskSubscribersRepository */
|
||||||
|
private $scheduledTaskSubscribersRepository;
|
||||||
|
|
||||||
public function __construct(
|
public function __construct(
|
||||||
SettingsController $settings,
|
SettingsController $settings,
|
||||||
SubscribersRepository $subscribersRepository,
|
SubscribersRepository $subscribersRepository,
|
||||||
SendingQueuesRepository $sendingQueuesRepository,
|
SendingQueuesRepository $sendingQueuesRepository,
|
||||||
StatisticsBouncesRepository $statisticsBouncesRepository,
|
StatisticsBouncesRepository $statisticsBouncesRepository,
|
||||||
|
ScheduledTaskSubscribersRepository $scheduledTaskSubscribersRepository,
|
||||||
Bridge $bridge
|
Bridge $bridge
|
||||||
) {
|
) {
|
||||||
$this->settings = $settings;
|
$this->settings = $settings;
|
||||||
@@ -58,6 +62,7 @@ class Bounce extends SimpleWorker {
|
|||||||
$this->subscribersRepository = $subscribersRepository;
|
$this->subscribersRepository = $subscribersRepository;
|
||||||
$this->sendingQueuesRepository = $sendingQueuesRepository;
|
$this->sendingQueuesRepository = $sendingQueuesRepository;
|
||||||
$this->statisticsBouncesRepository = $statisticsBouncesRepository;
|
$this->statisticsBouncesRepository = $statisticsBouncesRepository;
|
||||||
|
$this->scheduledTaskSubscribersRepository = $scheduledTaskSubscribersRepository;
|
||||||
}
|
}
|
||||||
|
|
||||||
public function init() {
|
public function init() {
|
||||||
@@ -71,7 +76,7 @@ class Bounce extends SimpleWorker {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public function prepareTaskStrategy(ScheduledTaskEntity $task, $timer) {
|
public function prepareTaskStrategy(ScheduledTaskEntity $task, $timer) {
|
||||||
BounceTask::prepareSubscribers($task);
|
$this->scheduledTaskSubscribersRepository->createSubscribersForBounceWorker($task);
|
||||||
|
|
||||||
if (!ScheduledTaskSubscriber::getUnprocessedCount($task->getId())) {
|
if (!ScheduledTaskSubscriber::getUnprocessedCount($task->getId())) {
|
||||||
ScheduledTaskSubscriber::where('task_id', $task->getId())->deleteMany();
|
ScheduledTaskSubscriber::where('task_id', $task->getId())->deleteMany();
|
||||||
|
@@ -106,6 +106,25 @@ class ScheduledTaskSubscribersRepository extends Repository {
|
|||||||
$this->checkCompleted($task);
|
$this->checkCompleted($task);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public function createSubscribersForBounceWorker(ScheduledTaskEntity $scheduledTaskEntity): void {
|
||||||
|
$scheduledTaskSubscribersTable = $this->entityManager->getClassMetadata(ScheduledTaskSubscriberEntity::class)->getTableName();
|
||||||
|
$subscribersTable = $this->entityManager->getClassMetadata(SubscriberEntity::class)->getTableName();
|
||||||
|
|
||||||
|
$stmt = $this->entityManager->getConnection()->prepare("
|
||||||
|
INSERT IGNORE INTO " . $scheduledTaskSubscribersTable . "
|
||||||
|
(task_id, subscriber_id, processed)
|
||||||
|
SELECT :taskId AS task_id, s.`id` AS subscriber_id, :unprocessed AS processed
|
||||||
|
FROM " . $subscribersTable . " s
|
||||||
|
WHERE s.`deleted_at` IS NULL
|
||||||
|
AND s.`status` IN (:subscribed, :unconfirmed)
|
||||||
|
");
|
||||||
|
$stmt->bindValue('taskId', $scheduledTaskEntity->getId());
|
||||||
|
$stmt->bindValue('unprocessed', ScheduledTaskSubscriberEntity::STATUS_UNPROCESSED);
|
||||||
|
$stmt->bindValue('subscribed', SubscriberEntity::STATUS_SUBSCRIBED);
|
||||||
|
$stmt->bindValue('unconfirmed', SubscriberEntity::STATUS_UNCONFIRMED);
|
||||||
|
$stmt->executeQuery();
|
||||||
|
}
|
||||||
|
|
||||||
private function checkCompleted(ScheduledTaskEntity $task): void {
|
private function checkCompleted(ScheduledTaskEntity $task): void {
|
||||||
$count = $this->countBy(['task' => $task, 'processed' => ScheduledTaskSubscriberEntity::STATUS_UNPROCESSED]);
|
$count = $this->countBy(['task' => $task, 'processed' => ScheduledTaskSubscriberEntity::STATUS_UNPROCESSED]);
|
||||||
if ($count === 0) {
|
if ($count === 0) {
|
||||||
|
@@ -1,27 +0,0 @@
|
|||||||
<?php // phpcs:ignore SlevomatCodingStandard.TypeHints.DeclareStrictTypes.DeclareStrictTypesMissing
|
|
||||||
|
|
||||||
namespace MailPoet\Tasks;
|
|
||||||
|
|
||||||
use MailPoet\Entities\ScheduledTaskEntity;
|
|
||||||
use MailPoet\Models\ScheduledTaskSubscriber;
|
|
||||||
use MailPoet\Models\Subscriber;
|
|
||||||
|
|
||||||
class Bounce {
|
|
||||||
public static function prepareSubscribers(ScheduledTaskEntity $task) {
|
|
||||||
// Prepare subscribers on the DB side for performance reasons
|
|
||||||
Subscriber::rawExecute(
|
|
||||||
'INSERT IGNORE INTO ' . MP_SCHEDULED_TASK_SUBSCRIBERS_TABLE . '
|
|
||||||
(task_id, subscriber_id, processed)
|
|
||||||
SELECT ? as task_id, s.`id` as subscriber_id, ? as processed
|
|
||||||
FROM ' . MP_SUBSCRIBERS_TABLE . ' s
|
|
||||||
WHERE s.`deleted_at` IS NULL
|
|
||||||
AND s.`status` IN (?, ?)',
|
|
||||||
[
|
|
||||||
$task->getId(),
|
|
||||||
ScheduledTaskSubscriber::STATUS_UNPROCESSED,
|
|
||||||
Subscriber::STATUS_SUBSCRIBED,
|
|
||||||
Subscriber::STATUS_UNCONFIRMED,
|
|
||||||
]
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
|
@@ -67,6 +67,7 @@ class BounceTest extends \MailPoetTest {
|
|||||||
$this->subscribersRepository,
|
$this->subscribersRepository,
|
||||||
$this->diContainer->get(SendingQueuesRepository::class),
|
$this->diContainer->get(SendingQueuesRepository::class),
|
||||||
$this->diContainer->get(StatisticsBouncesRepository::class),
|
$this->diContainer->get(StatisticsBouncesRepository::class),
|
||||||
|
$this->diContainer->get(ScheduledTaskSubscribersRepository::class),
|
||||||
$this->diContainer->get(Bridge::class)
|
$this->diContainer->get(Bridge::class)
|
||||||
);
|
);
|
||||||
|
|
||||||
@@ -86,6 +87,7 @@ class BounceTest extends \MailPoetTest {
|
|||||||
$this->subscribersRepository,
|
$this->subscribersRepository,
|
||||||
$this->diContainer->get(SendingQueuesRepository::class),
|
$this->diContainer->get(SendingQueuesRepository::class),
|
||||||
$this->diContainer->get(StatisticsBouncesRepository::class),
|
$this->diContainer->get(StatisticsBouncesRepository::class),
|
||||||
|
$this->diContainer->get(ScheduledTaskSubscribersRepository::class),
|
||||||
$this->diContainer->get(Bridge::class)
|
$this->diContainer->get(Bridge::class)
|
||||||
);
|
);
|
||||||
$worker->init();
|
$worker->init();
|
||||||
|
Reference in New Issue
Block a user