Add SimpleWorker to calculate Subscriber email_counts
[MAILPOET-4177]
This commit is contained in:
@ -67,6 +67,7 @@ class Daemon {
|
||||
yield $this->workersFactory->createBounceWorker();
|
||||
yield $this->workersFactory->createExportFilesCleanupWorker();
|
||||
yield $this->workersFactory->createBeamerkWorker();
|
||||
yield $this->workersFactory->createSubscribersEmailCountsWorker();
|
||||
yield $this->workersFactory->createInactiveSubscribersWorker();
|
||||
yield $this->workersFactory->createUnsubscribeTokensWorker();
|
||||
yield $this->workersFactory->createWooCommerceSyncWorker();
|
||||
|
@ -19,6 +19,7 @@ use MailPoet\Cron\Workers\StatsNotifications\AutomatedEmails;
|
||||
use MailPoet\Cron\Workers\StatsNotifications\Worker as StatsNotificationsWorker;
|
||||
use MailPoet\Cron\Workers\SubscriberLinkTokens;
|
||||
use MailPoet\Cron\Workers\SubscribersCountCacheRecalculation;
|
||||
use MailPoet\Cron\Workers\SubscribersEmailCount;
|
||||
use MailPoet\Cron\Workers\SubscribersEngagementScore;
|
||||
use MailPoet\Cron\Workers\SubscribersLastEngagement;
|
||||
use MailPoet\Cron\Workers\SubscribersStatsReport;
|
||||
@ -179,6 +180,12 @@ class WordPress {
|
||||
'scheduled_in' => [self::SCHEDULED_IN_THE_PAST],
|
||||
'status' => ['null', ScheduledTask::STATUS_SCHEDULED],
|
||||
]);
|
||||
// subscribers emails count
|
||||
$subscribersEmailsCount = $this->getTasksCount([
|
||||
'type' => SubscribersEmailCount::TASK_TYPE,
|
||||
'scheduled_in' => [self::SCHEDULED_IN_THE_PAST],
|
||||
'status' => ['null', ScheduledTask::STATUS_SCHEDULED],
|
||||
]);
|
||||
// inactive subscribers check
|
||||
$inactiveSubscribersTasks = $this->getTasksCount([
|
||||
'type' => InactiveSubscribers::TASK_TYPE,
|
||||
@ -282,6 +289,7 @@ class WordPress {
|
||||
|| $subscribersStatsReportActive
|
||||
|| $statsNotificationsTasks
|
||||
|| $autoStatsNotificationsTasks
|
||||
|| $subscribersEmailsCount
|
||||
|| $inactiveSubscribersTasks
|
||||
|| $wooCommerceSyncTasks
|
||||
|| $authorizedEmailAddressesTasks
|
||||
|
70
mailpoet/lib/Cron/Workers/SubscribersEmailCount.php
Normal file
70
mailpoet/lib/Cron/Workers/SubscribersEmailCount.php
Normal file
@ -0,0 +1,70 @@
|
||||
<?php declare(strict_types = 1);
|
||||
|
||||
namespace MailPoet\Cron\Workers;
|
||||
|
||||
use MailPoet\Entities\ScheduledTaskEntity;
|
||||
use MailPoet\Entities\SubscriberEntity;
|
||||
use MailPoet\Subscribers\SubscribersEmailCountsController;
|
||||
use MailPoetVendor\Doctrine\ORM\EntityManager;
|
||||
|
||||
class SubscribersEmailCount extends SimpleWorker {
|
||||
const TASK_TYPE = 'subscribers_email_count';
|
||||
const BATCH_SIZE = 1000;
|
||||
const SUPPORT_MULTIPLE_INSTANCES = false;
|
||||
|
||||
/** @var SubscribersEmailCountsController */
|
||||
private $subscribersEmailCountsController;
|
||||
|
||||
/** @var EntityManager */
|
||||
private $entityManager;
|
||||
|
||||
public function __construct(
|
||||
SubscribersEmailCountsController $subscribersEmailCountsController,
|
||||
EntityManager $entityManager
|
||||
) {
|
||||
$this->subscribersEmailCountsController = $subscribersEmailCountsController;
|
||||
$this->entityManager = $entityManager;
|
||||
parent::__construct();
|
||||
}
|
||||
|
||||
public function processTaskStrategy(ScheduledTaskEntity $task, $timer) {
|
||||
$previousTask = $this->findPreviousTask($task);
|
||||
$dateFromLastRun = null;
|
||||
if ($previousTask instanceof ScheduledTaskEntity) {
|
||||
$dateFromLastRun = $previousTask->getScheduledAt();
|
||||
}
|
||||
|
||||
$meta = $task->getMeta();
|
||||
$lastSubscriberId = $meta['last_subscriber_id'] ?? 0;
|
||||
$highestSubscriberId = $meta['highest_subscriber_id'] ?? $this->getHighestSubscriberId();
|
||||
$meta['highest_subscriber_id'] = $highestSubscriberId;
|
||||
$task->setMeta($meta);
|
||||
|
||||
while ($lastSubscriberId <= $highestSubscriberId) {
|
||||
[$count, $lastSubscriberId] = $this->subscribersEmailCountsController->updateSubscribersEmailCounts($dateFromLastRun, self::BATCH_SIZE, $lastSubscriberId);
|
||||
if ($count === 0) {
|
||||
break;
|
||||
}
|
||||
|
||||
$meta['last_subscriber_id'] = $lastSubscriberId++;
|
||||
$task->setMeta($meta);
|
||||
$this->scheduledTasksRepository->persist($task);
|
||||
$this->scheduledTasksRepository->flush();
|
||||
$this->cronHelper->enforceExecutionLimit($timer);
|
||||
};
|
||||
|
||||
$this->schedule();
|
||||
return true;
|
||||
}
|
||||
|
||||
private function findPreviousTask(ScheduledTaskEntity $task): ?ScheduledTaskEntity {
|
||||
return $this->scheduledTasksRepository->findPreviousTask($task);
|
||||
}
|
||||
|
||||
private function getHighestSubscriberId(): int {
|
||||
$subscribersTable = $this->entityManager->getClassMetadata(SubscriberEntity::class)->getTableName();
|
||||
$result = $this->entityManager->getConnection()->executeQuery("SELECT MAX(id) FROM $subscribersTable LIMIT 1;")->fetchNumeric();
|
||||
/** @var int[] $result - it's required for PHPStan */
|
||||
return is_array($result) && isset($result[0]) ? (int)$result[0] : 0;
|
||||
}
|
||||
}
|
@ -132,4 +132,9 @@ class WorkersFactory {
|
||||
public function createNewsletterTemplateThumbnailsWorker() {
|
||||
return $this->container->get(NewsletterTemplateThumbnails::class);
|
||||
}
|
||||
|
||||
/** @return SubscribersEmailCount */
|
||||
public function createSubscribersEmailCountsWorker() {
|
||||
return $this->container->get(SubscribersEmailCount::class);
|
||||
}
|
||||
}
|
||||
|
@ -203,6 +203,7 @@ class ContainerConfigurator implements IContainerConfigurator {
|
||||
$container->autowire(\MailPoet\Cron\Workers\WooCommerceSync::class)->setPublic(true);
|
||||
$container->autowire(\MailPoet\Cron\Workers\ExportFilesCleanup::class)->setPublic(true);
|
||||
$container->autowire(\MailPoet\Cron\Workers\Beamer::class)->setPublic(true);
|
||||
$container->autowire(\MailPoet\Cron\Workers\SubscribersEmailCount::class)->setPublic(true);
|
||||
$container->autowire(\MailPoet\Cron\Workers\InactiveSubscribers::class)->setPublic(true);
|
||||
$container->autowire(\MailPoet\Cron\Workers\UnsubscribeTokens::class)->setPublic(true);
|
||||
$container->autowire(\MailPoet\Cron\Workers\SubscriberLinkTokens::class)->setPublic(true);
|
||||
@ -302,6 +303,7 @@ class ContainerConfigurator implements IContainerConfigurator {
|
||||
$container->autowire(\MailPoet\Subscribers\ConfirmationEmailMailer::class)->setPublic(true);
|
||||
$container->autowire(\MailPoet\Subscribers\RequiredCustomFieldValidator::class)->setPublic(true);
|
||||
$container->autowire(\MailPoet\Subscribers\SubscriberActions::class)->setPublic(true);
|
||||
$container->autowire(\MailPoet\Subscribers\SubscribersEmailCountsController::class);
|
||||
$container->autowire(\MailPoet\Subscribers\InactiveSubscribersController::class);
|
||||
$container->autowire(\MailPoet\Subscribers\LinkTokens::class)->setPublic(true);
|
||||
$container->autowire(\MailPoet\Subscribers\SubscribersRepository::class)->setPublic(true);
|
||||
|
@ -296,6 +296,7 @@ class DaemonHttpRunnerTest extends \MailPoetTest {
|
||||
'createMigrationWorker' => $worker,
|
||||
'createWooCommerceSyncWorker' => $worker,
|
||||
'createExportFilesCleanupWorker' => $worker,
|
||||
'createSubscribersEmailCountsWorker' => $worker,
|
||||
'createInactiveSubscribersWorker' => $worker,
|
||||
'createAuthorizedSendingEmailsCheckWorker' => $worker,
|
||||
'createWooCommercePastOrdersWorker' => $worker,
|
||||
|
@ -52,6 +52,7 @@ class DaemonTest extends \MailPoetTest {
|
||||
'createMigrationWorker' => $this->createSimpleWorkerMock(),
|
||||
'createWooCommerceSyncWorker' => $this->createSimpleWorkerMock(),
|
||||
'createExportFilesCleanupWorker' => $this->createSimpleWorkerMock(),
|
||||
'createSubscribersEmailCountsWorker' => $this->createSimpleWorkerMock(),
|
||||
'createInactiveSubscribersWorker' => $this->createSimpleWorkerMock(),
|
||||
'createAuthorizedSendingEmailsCheckWorker' => $this->createSimpleWorkerMock(),
|
||||
'createWooCommercePastOrdersWorker' => $this->createSimpleWorkerMock(),
|
||||
|
@ -0,0 +1,178 @@
|
||||
<?php declare(strict_types=1);
|
||||
|
||||
namespace MailPoet\Cron\Workers;
|
||||
|
||||
use MailPoet\Entities\NewsletterEntity;
|
||||
use MailPoet\Entities\ScheduledTaskEntity;
|
||||
use MailPoet\Entities\ScheduledTaskSubscriberEntity;
|
||||
use MailPoet\Entities\SendingQueueEntity;
|
||||
use MailPoet\Entities\SubscriberEntity;
|
||||
use MailPoet\Models\ScheduledTask;
|
||||
use MailPoet\Newsletter\Sending\ScheduledTasksRepository;
|
||||
use MailPoet\Subscribers\SubscribersRepository;
|
||||
use MailPoet\Tasks\Sending;
|
||||
use MailPoet\Test\DataFactories\ScheduledTask as ScheduledTaskFactory;
|
||||
use MailPoet\WP\Functions as WPFunctions;
|
||||
use MailPoetVendor\Carbon\Carbon;
|
||||
|
||||
class SubscribersLifetimeEmailCountTest extends \MailPoetTest {
|
||||
|
||||
/** @var SubscribersEmailCount */
|
||||
private $worker;
|
||||
|
||||
/** @var SubscribersRepository */
|
||||
private $subscribersRepository;
|
||||
|
||||
/** @var ScheduledTaskFactory */
|
||||
private $scheduledTaskFactory;
|
||||
|
||||
/** @var NewsletterEntity */
|
||||
private $newsletter;
|
||||
|
||||
/** @var ScheduledTasksRepository */
|
||||
private $scheduledTasksRepository;
|
||||
|
||||
public function _before() {
|
||||
parent::_before();
|
||||
$this->worker = $this->diContainer->get(SubscribersEmailCount::class);
|
||||
$this->subscribersRepository = $this->diContainer->get(SubscribersRepository::class);
|
||||
$this->scheduledTaskFactory = new ScheduledTaskFactory();
|
||||
$this->scheduledTasksRepository = $this->diContainer->get(ScheduledTasksRepository::class);
|
||||
$this->truncateEntity(SubscriberEntity::class);
|
||||
$this->truncateEntity(ScheduledTaskEntity::class);
|
||||
$this->truncateEntity(ScheduledTaskSubscriberEntity::class);
|
||||
$this->truncateEntity(SendingQueueEntity::class);
|
||||
$this->truncateEntity(NewsletterEntity::class);
|
||||
$this->newsletter = new NewsletterEntity();
|
||||
$this->newsletter->setSubject('Subject');
|
||||
$this->newsletter->setType(NewsletterEntity::TYPE_STANDARD);
|
||||
$this->newsletter->setStatus(NewsletterEntity::STATUS_SENT);
|
||||
$this->entityManager->persist($this->newsletter);
|
||||
$this->entityManager->flush();
|
||||
}
|
||||
|
||||
public function testItCalculatesTotalSubscribersEmailCountsOnFirstRun() {
|
||||
$subscriber1 = $this->createSubscriber('s1@email.com', 100);
|
||||
$this->createCompletedSendingTasksForSubscriber($subscriber1, 80, 90);
|
||||
$subscriber2 = $this->createSubscriber('s2@email.com', 90);
|
||||
$this->createCompletedSendingTasksForSubscriber($subscriber2, 8, 80);
|
||||
|
||||
$this->worker->processTaskStrategy(new ScheduledTaskEntity(), microtime(true));
|
||||
|
||||
$this->entityManager->clear();
|
||||
$subscriber1 = $this->subscribersRepository->findOneById($subscriber1->getId());
|
||||
assert($subscriber1 instanceof SubscriberEntity);
|
||||
expect($subscriber1->getEmailsCount())->equals(80);
|
||||
$subscriber2 = $this->subscribersRepository->findOneById($subscriber2->getId());
|
||||
assert($subscriber2 instanceof SubscriberEntity);
|
||||
expect($subscriber2->getEmailsCount())->equals(8);
|
||||
}
|
||||
|
||||
public function testItUpdatesSubscribersEmailCountsAfterFirstRun() {
|
||||
$subscriber1 = $this->createSubscriber('s1@email.com', 100, SubscriberEntity::STATUS_SUBSCRIBED, 80);
|
||||
|
||||
// create previous completed task
|
||||
$previousEmailCountsTask = $this->createRunningTask();
|
||||
$previousEmailCountsTask->setStatus(ScheduledTask::STATUS_COMPLETED);
|
||||
$previousEmailCountsTask->setCreatedAt(Carbon::now()->subDays(2));
|
||||
$previousEmailCountsTask->setScheduledAt(Carbon::now()->subDays(1));
|
||||
$previousEmailCountsTask->setUpdatedAt(Carbon::now()->subDays(1));
|
||||
$this->entityManager->persist($previousEmailCountsTask);
|
||||
$this->entityManager->flush();
|
||||
|
||||
// Emails to be added on next run
|
||||
$this->createCompletedSendingTasksForSubscriber($subscriber1, 1, 2);
|
||||
|
||||
$this->worker->processTaskStrategy($this->createRunningTask(), microtime(true));
|
||||
|
||||
$this->entityManager->clear();
|
||||
$subscriber1 = $this->subscribersRepository->findOneById($subscriber1->getId());
|
||||
assert($subscriber1 instanceof SubscriberEntity);
|
||||
expect($subscriber1->getEmailsCount())->equals(81);
|
||||
|
||||
}
|
||||
|
||||
public function testItSchedulesNextRunWhenFinished() {
|
||||
$this->worker->processTaskStrategy(new ScheduledTaskEntity(), microtime(true));
|
||||
|
||||
$task = $this->scheduledTasksRepository->findOneBy(
|
||||
['type' => SubscribersEmailCount::TASK_TYPE, 'status' => ScheduledTaskEntity::STATUS_SCHEDULED]
|
||||
);
|
||||
|
||||
assert($task instanceof ScheduledTaskEntity);
|
||||
expect($task)->isInstanceOf(ScheduledTaskEntity::class);
|
||||
expect($task->getScheduledAt())->greaterThan(new Carbon());
|
||||
}
|
||||
|
||||
private function createRunningTask(): ScheduledTaskEntity {
|
||||
return $this->scheduledTaskFactory->create(
|
||||
SubscribersEmailCount::TASK_TYPE,
|
||||
null,
|
||||
Carbon::createFromTimestamp(WPFunctions::get()->currentTime('timestamp'))
|
||||
);
|
||||
}
|
||||
|
||||
private function createSubscriber(
|
||||
string $email,
|
||||
int $createdDaysAgo = 0,
|
||||
string $status = SubscriberEntity::STATUS_SUBSCRIBED,
|
||||
int $emailCounts = 0
|
||||
): SubscriberEntity {
|
||||
$createdAt = (new Carbon())->subDays($createdDaysAgo);
|
||||
$subscriber = new SubscriberEntity();
|
||||
$subscriber->setEmail($email);
|
||||
$subscriber->setStatus($status);
|
||||
$subscriber->setCreatedAt($createdAt);
|
||||
$subscriber->setEmailsCount($emailCounts);
|
||||
$this->entityManager->persist($subscriber);
|
||||
// we need to set lastSubscribeAt after persist due to LastSubscribedAtListener
|
||||
$subscriber->setLastSubscribedAt($createdAt);
|
||||
$this->entityManager->flush();
|
||||
return $subscriber;
|
||||
}
|
||||
|
||||
private function createCompletedSendingTasksForSubscriber(SubscriberEntity $subscriber, int $numTasks = 1, int $processedDaysAgo = 0): void {
|
||||
for ($i = 0; $i < $numTasks; $i++) {
|
||||
[$task] = $this->createCompletedSendingTask($processedDaysAgo);
|
||||
$this->addSubscriberToTask($subscriber, $task);
|
||||
}
|
||||
}
|
||||
|
||||
private function createCompletedSendingTask(int $processedDaysAgo = 0): array {
|
||||
$processedAt = (new Carbon())->subDays($processedDaysAgo)->addHours(2);
|
||||
$task = new ScheduledTaskEntity();
|
||||
$task->setType(Sending::TASK_TYPE);
|
||||
$task->setStatus(ScheduledTaskEntity::STATUS_COMPLETED);
|
||||
$task->setCreatedAt($processedAt);
|
||||
$task->setProcessedAt($processedAt);
|
||||
$this->entityManager->persist($task);
|
||||
$this->entityManager->flush();
|
||||
$queue = new SendingQueueEntity();
|
||||
$queue->setTask($task);
|
||||
$queue->setNewsletter($this->newsletter);
|
||||
$this->entityManager->persist($queue);
|
||||
$this->entityManager->flush();
|
||||
return [$task, $queue];
|
||||
}
|
||||
|
||||
private function addSubscriberToTask(
|
||||
SubscriberEntity $subscriber,
|
||||
ScheduledTaskEntity $task,
|
||||
int $daysAgo = 0
|
||||
): ScheduledTaskSubscriberEntity {
|
||||
$createdAt = (new Carbon())->subDays($daysAgo);
|
||||
$taskSubscriber = new ScheduledTaskSubscriberEntity($task, $subscriber);
|
||||
$taskSubscriber->setCreatedAt($createdAt);
|
||||
$this->entityManager->persist($taskSubscriber);
|
||||
$this->entityManager->flush();
|
||||
return $taskSubscriber;
|
||||
}
|
||||
|
||||
public function _after(): void {
|
||||
$this->truncateEntity(SubscriberEntity::class);
|
||||
$this->truncateEntity(ScheduledTaskEntity::class);
|
||||
$this->truncateEntity(ScheduledTaskSubscriberEntity::class);
|
||||
$this->truncateEntity(SendingQueueEntity::class);
|
||||
$this->truncateEntity(NewsletterEntity::class);
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user