Process queues to save statistics
[MAILPOET-3765]
This commit is contained in:
@@ -2,11 +2,13 @@
|
|||||||
|
|
||||||
namespace MailPoet\Cron\Workers;
|
namespace MailPoet\Cron\Workers;
|
||||||
|
|
||||||
|
use MailPoet\Entities\ScheduledTaskEntity;
|
||||||
use MailPoet\Entities\SubscriberEntity;
|
use MailPoet\Entities\SubscriberEntity;
|
||||||
use MailPoet\Mailer\Mailer;
|
use MailPoet\Mailer\Mailer;
|
||||||
use MailPoet\Models\ScheduledTask;
|
use MailPoet\Models\ScheduledTask;
|
||||||
use MailPoet\Models\ScheduledTaskSubscriber;
|
use MailPoet\Models\ScheduledTaskSubscriber;
|
||||||
use MailPoet\Models\Subscriber;
|
use MailPoet\Newsletter\Sending\ScheduledTasksRepository;
|
||||||
|
use MailPoet\Newsletter\Sending\SendingQueuesRepository;
|
||||||
use MailPoet\Services\Bridge;
|
use MailPoet\Services\Bridge;
|
||||||
use MailPoet\Services\Bridge\API;
|
use MailPoet\Services\Bridge\API;
|
||||||
use MailPoet\Settings\SettingsController;
|
use MailPoet\Settings\SettingsController;
|
||||||
@@ -35,15 +37,25 @@ class Bounce extends SimpleWorker {
|
|||||||
/** @var SubscribersRepository */
|
/** @var SubscribersRepository */
|
||||||
private $subscribersRepository;
|
private $subscribersRepository;
|
||||||
|
|
||||||
|
/** @var ScheduledTasksRepository */
|
||||||
|
private $scheduledTasksRepository;
|
||||||
|
|
||||||
|
/** @var SendingQueuesRepository */
|
||||||
|
private $sendingQueuesRepository;
|
||||||
|
|
||||||
public function __construct(
|
public function __construct(
|
||||||
SettingsController $settings,
|
SettingsController $settings,
|
||||||
SubscribersRepository $subscribersRepository,
|
SubscribersRepository $subscribersRepository,
|
||||||
|
ScheduledTasksRepository $scheduledTasksRepository,
|
||||||
|
SendingQueuesRepository $sendingQueuesRepository,
|
||||||
Bridge $bridge
|
Bridge $bridge
|
||||||
) {
|
) {
|
||||||
$this->settings = $settings;
|
$this->settings = $settings;
|
||||||
$this->bridge = $bridge;
|
$this->bridge = $bridge;
|
||||||
parent::__construct();
|
parent::__construct();
|
||||||
$this->subscribersRepository = $subscribersRepository;
|
$this->subscribersRepository = $subscribersRepository;
|
||||||
|
$this->scheduledTasksRepository = $scheduledTasksRepository;
|
||||||
|
$this->sendingQueuesRepository = $sendingQueuesRepository;
|
||||||
}
|
}
|
||||||
|
|
||||||
public function init() {
|
public function init() {
|
||||||
@@ -83,7 +95,7 @@ class Bounce extends SimpleWorker {
|
|||||||
$subscriberEmails = $this->subscribersRepository->getUndeletedSubscribersEmailsByIds($subscribersToProcessIds);
|
$subscriberEmails = $this->subscribersRepository->getUndeletedSubscribersEmailsByIds($subscribersToProcessIds);
|
||||||
$subscriberEmails = array_column($subscriberEmails, 'email');
|
$subscriberEmails = array_column($subscriberEmails, 'email');
|
||||||
|
|
||||||
$this->processEmails($subscriberEmails);
|
$this->processEmails($task, $subscriberEmails);
|
||||||
|
|
||||||
$taskSubscribers->updateProcessedSubscribers($subscribersToProcessIds);
|
$taskSubscribers->updateProcessedSubscribers($subscribersToProcessIds);
|
||||||
}
|
}
|
||||||
@@ -91,12 +103,13 @@ class Bounce extends SimpleWorker {
|
|||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
public function processEmails(array $subscriberEmails) {
|
public function processEmails($task, array $subscriberEmails) {
|
||||||
$checkedEmails = $this->api->checkBounces($subscriberEmails);
|
$checkedEmails = $this->api->checkBounces($subscriberEmails);
|
||||||
$this->processApiResponse((array)$checkedEmails);
|
$this->processApiResponse($task, (array)$checkedEmails);
|
||||||
}
|
}
|
||||||
|
|
||||||
public function processApiResponse(array $checkedEmails) {
|
public function processApiResponse($task, array $checkedEmails) {
|
||||||
|
$previousTask = $this->findPreviousTask($task);
|
||||||
foreach ($checkedEmails as $email) {
|
foreach ($checkedEmails as $email) {
|
||||||
if (!isset($email['address'], $email['bounce'])) {
|
if (!isset($email['address'], $email['bounce'])) {
|
||||||
continue;
|
continue;
|
||||||
@@ -105,6 +118,7 @@ class Bounce extends SimpleWorker {
|
|||||||
$subscriber = $this->subscribersRepository->findOneBy(['email' => $email['address']]);
|
$subscriber = $this->subscribersRepository->findOneBy(['email' => $email['address']]);
|
||||||
if (!$subscriber instanceof SubscriberEntity) continue;
|
if (!$subscriber instanceof SubscriberEntity) continue;
|
||||||
$subscriber->setStatus(SubscriberEntity::STATUS_BOUNCED);
|
$subscriber->setStatus(SubscriberEntity::STATUS_BOUNCED);
|
||||||
|
$this->saveBouncedStatistics($subscriber, $task, $previousTask);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
$this->subscribersRepository->flush();
|
$this->subscribersRepository->flush();
|
||||||
@@ -118,4 +132,23 @@ class Bounce extends SimpleWorker {
|
|||||||
->addMinutes(rand(0, 59))
|
->addMinutes(rand(0, 59))
|
||||||
->addSeconds(rand(0, 59));
|
->addSeconds(rand(0, 59));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private function findPreviousTask(ScheduledTask $task): ?ScheduledTaskEntity {
|
||||||
|
$taskEntity = $this->scheduledTasksRepository->findOneById($task->id);
|
||||||
|
if (!$taskEntity instanceof ScheduledTaskEntity) return null;
|
||||||
|
return $this->scheduledTasksRepository->findPreviousTask($taskEntity);
|
||||||
|
}
|
||||||
|
|
||||||
|
private function saveBouncedStatistics(SubscriberEntity $subscriber, ScheduledTask $task, ?ScheduledTaskEntity $previousTask) {
|
||||||
|
$taskEntity = $this->scheduledTasksRepository->findOneById($task->id);
|
||||||
|
if (!$taskEntity instanceof ScheduledTaskEntity) return null;
|
||||||
|
$dateFrom = null;
|
||||||
|
if ($previousTask instanceof ScheduledTaskEntity) {
|
||||||
|
$dateFrom = $previousTask->getScheduledAt();
|
||||||
|
}
|
||||||
|
$queues = $this->sendingQueuesRepository->findAllForSubscriberSentBetween($subscriber, $taskEntity->getScheduledAt(), $dateFrom);
|
||||||
|
foreach ($queues as $queue) {
|
||||||
|
// todo save statistics
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@@ -74,6 +74,19 @@ class ScheduledTasksRepository extends Repository {
|
|||||||
return $queryBuilder->getQuery()->getOneOrNullResult();
|
return $queryBuilder->getQuery()->getOneOrNullResult();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public function findPreviousTask(ScheduledTaskEntity $task): ?ScheduledTaskEntity {
|
||||||
|
return $this->doctrineRepository->createQueryBuilder('st')
|
||||||
|
->select('st')
|
||||||
|
->where('st.type = :type')
|
||||||
|
->setParameter('type', $task->getType())
|
||||||
|
->andWhere('st.createdAt < :created')
|
||||||
|
->setParameter('created', $task->getCreatedAt())
|
||||||
|
->orderBy('st.scheduledAt', 'DESC')
|
||||||
|
->setMaxResults(1)
|
||||||
|
->getQuery()
|
||||||
|
->getOneOrNullResult();
|
||||||
|
}
|
||||||
|
|
||||||
protected function getEntityClassName() {
|
protected function getEntityClassName() {
|
||||||
return ScheduledTaskEntity::class;
|
return ScheduledTaskEntity::class;
|
||||||
}
|
}
|
||||||
|
@@ -4,6 +4,7 @@ namespace MailPoet\Newsletter\Sending;
|
|||||||
|
|
||||||
use MailPoet\Doctrine\Repository;
|
use MailPoet\Doctrine\Repository;
|
||||||
use MailPoet\Entities\NewsletterEntity;
|
use MailPoet\Entities\NewsletterEntity;
|
||||||
|
use MailPoet\Entities\ScheduledTaskEntity;
|
||||||
use MailPoet\Entities\SendingQueueEntity;
|
use MailPoet\Entities\SendingQueueEntity;
|
||||||
use MailPoet\Entities\SubscriberEntity;
|
use MailPoet\Entities\SubscriberEntity;
|
||||||
use MailPoetVendor\Doctrine\ORM\EntityManager;
|
use MailPoetVendor\Doctrine\ORM\EntityManager;
|
||||||
@@ -45,4 +46,36 @@ class SendingQueuesRepository extends Repository {
|
|||||||
if (is_null($task)) return false;
|
if (is_null($task)) return false;
|
||||||
return $this->scheduledTaskSubscribersRepository->isSubscriberProcessed($task, $subscriber);
|
return $this->scheduledTaskSubscribersRepository->isSubscriberProcessed($task, $subscriber);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return SendingQueueEntity[]
|
||||||
|
*/
|
||||||
|
public function findAllForSubscriberSentBetween(
|
||||||
|
SubscriberEntity $subscriber,
|
||||||
|
?\DateTimeInterface $dateTo,
|
||||||
|
?\DateTimeInterface $dateFrom
|
||||||
|
): array {
|
||||||
|
$qb = $this->entityManager->createQueryBuilder()
|
||||||
|
->select('s, n')
|
||||||
|
->from(SendingQueueEntity::class, 's')
|
||||||
|
->join('s.task', 't')
|
||||||
|
->join('t.subscribers', 'tsub')
|
||||||
|
->join('tsub.subscriber', 'sub')
|
||||||
|
->join('s.newsletter', 'n')
|
||||||
|
->where('t.status = :status')
|
||||||
|
->setParameter('status', ScheduledTaskEntity::STATUS_COMPLETED)
|
||||||
|
->andWhere('t.type = :sendingType')
|
||||||
|
->setParameter('sendingType', 'sending')
|
||||||
|
->andWhere('sub.id = :subscriber')
|
||||||
|
->setParameter('subscriber', $subscriber);
|
||||||
|
if ($dateTo) {
|
||||||
|
$qb->andWhere('t.updatedAt < :dateTo')
|
||||||
|
->setParameter('dateTo', $dateTo);
|
||||||
|
}
|
||||||
|
if ($dateFrom) {
|
||||||
|
$qb->andWhere('t.updatedAt > :dateFrom')
|
||||||
|
->setParameter('dateFrom', $dateFrom);
|
||||||
|
}
|
||||||
|
return $qb->getQuery()->getResult();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@@ -4,16 +4,22 @@ namespace MailPoet\Test\Cron\Workers;
|
|||||||
|
|
||||||
use MailPoet\Cron\Workers\Bounce;
|
use MailPoet\Cron\Workers\Bounce;
|
||||||
use MailPoet\Cron\Workers\Bounce\BounceTestMockAPI as MockAPI;
|
use MailPoet\Cron\Workers\Bounce\BounceTestMockAPI as MockAPI;
|
||||||
|
use MailPoet\Entities\NewsletterEntity;
|
||||||
use MailPoet\Entities\ScheduledTaskEntity;
|
use MailPoet\Entities\ScheduledTaskEntity;
|
||||||
use MailPoet\Entities\ScheduledTaskSubscriberEntity;
|
use MailPoet\Entities\ScheduledTaskSubscriberEntity;
|
||||||
|
use MailPoet\Entities\SendingQueueEntity;
|
||||||
|
use MailPoet\Entities\StatisticsBounceEntity;
|
||||||
use MailPoet\Entities\SubscriberEntity;
|
use MailPoet\Entities\SubscriberEntity;
|
||||||
use MailPoet\Mailer\Mailer;
|
use MailPoet\Mailer\Mailer;
|
||||||
use MailPoet\Models\ScheduledTask;
|
use MailPoet\Models\ScheduledTask;
|
||||||
use MailPoet\Models\ScheduledTaskSubscriber;
|
use MailPoet\Models\ScheduledTaskSubscriber;
|
||||||
|
use MailPoet\Newsletter\Sending\ScheduledTasksRepository;
|
||||||
|
use MailPoet\Newsletter\Sending\SendingQueuesRepository;
|
||||||
use MailPoet\Services\Bridge;
|
use MailPoet\Services\Bridge;
|
||||||
use MailPoet\Services\Bridge\API;
|
use MailPoet\Services\Bridge\API;
|
||||||
use MailPoet\Settings\SettingsController;
|
use MailPoet\Settings\SettingsController;
|
||||||
use MailPoet\Settings\SettingsRepository;
|
use MailPoet\Settings\SettingsRepository;
|
||||||
|
use MailPoet\Statistics\StatisticsBouncesRepository;
|
||||||
use MailPoet\Subscribers\SubscribersRepository;
|
use MailPoet\Subscribers\SubscribersRepository;
|
||||||
use MailPoet\WP\Functions as WPFunctions;
|
use MailPoet\WP\Functions as WPFunctions;
|
||||||
use MailPoetVendor\Carbon\Carbon;
|
use MailPoetVendor\Carbon\Carbon;
|
||||||
@@ -33,6 +39,7 @@ class BounceTest extends \MailPoetTest {
|
|||||||
|
|
||||||
public function _before() {
|
public function _before() {
|
||||||
parent::_before();
|
parent::_before();
|
||||||
|
$this->cleanup();
|
||||||
$this->emails = [
|
$this->emails = [
|
||||||
'soft_bounce@example.com',
|
'soft_bounce@example.com',
|
||||||
'hard_bounce@example.com',
|
'hard_bounce@example.com',
|
||||||
@@ -50,6 +57,9 @@ class BounceTest extends \MailPoetTest {
|
|||||||
$this->worker = new Bounce(
|
$this->worker = new Bounce(
|
||||||
$this->diContainer->get(SettingsController::class),
|
$this->diContainer->get(SettingsController::class),
|
||||||
$this->subscribersRepository,
|
$this->subscribersRepository,
|
||||||
|
$this->diContainer->get(ScheduledTasksRepository::class),
|
||||||
|
$this->diContainer->get(SendingQueuesRepository::class),
|
||||||
|
$this->diContainer->get(StatisticsBouncesRepository::class),
|
||||||
$this->diContainer->get(Bridge::class)
|
$this->diContainer->get(Bridge::class)
|
||||||
);
|
);
|
||||||
|
|
||||||
@@ -67,6 +77,9 @@ class BounceTest extends \MailPoetTest {
|
|||||||
$worker = new Bounce(
|
$worker = new Bounce(
|
||||||
$this->diContainer->get(SettingsController::class),
|
$this->diContainer->get(SettingsController::class),
|
||||||
$this->subscribersRepository,
|
$this->subscribersRepository,
|
||||||
|
$this->diContainer->get(ScheduledTasksRepository::class),
|
||||||
|
$this->diContainer->get(SendingQueuesRepository::class),
|
||||||
|
$this->diContainer->get(StatisticsBouncesRepository::class),
|
||||||
$this->diContainer->get(Bridge::class)
|
$this->diContainer->get(Bridge::class)
|
||||||
);
|
);
|
||||||
$worker->init();
|
$worker->init();
|
||||||
@@ -122,7 +135,8 @@ class BounceTest extends \MailPoetTest {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public function testItSetsSubscriberStatusAsBounced() {
|
public function testItSetsSubscriberStatusAsBounced() {
|
||||||
$this->worker->processEmails($this->emails);
|
$task = $this->createRunningTask();
|
||||||
|
$this->worker->processEmails($task, $this->emails);
|
||||||
|
|
||||||
$subscribers = $this->subscribersRepository->findAll();
|
$subscribers = $this->subscribersRepository->findAll();
|
||||||
|
|
||||||
@@ -160,10 +174,13 @@ class BounceTest extends \MailPoetTest {
|
|||||||
return $task;
|
return $task;
|
||||||
}
|
}
|
||||||
|
|
||||||
public function _after() {
|
public function cleanup() {
|
||||||
$this->diContainer->get(SettingsRepository::class)->truncate();
|
$this->diContainer->get(SettingsRepository::class)->truncate();
|
||||||
$this->truncateEntity(SubscriberEntity::class);
|
$this->truncateEntity(SubscriberEntity::class);
|
||||||
$this->truncateEntity(ScheduledTaskEntity::class);
|
$this->truncateEntity(ScheduledTaskEntity::class);
|
||||||
$this->truncateEntity(ScheduledTaskSubscriberEntity::class);
|
$this->truncateEntity(ScheduledTaskSubscriberEntity::class);
|
||||||
|
$this->truncateEntity(StatisticsBounceEntity::class);
|
||||||
|
$this->truncateEntity(NewsletterEntity::class);
|
||||||
|
$this->truncateEntity(SendingQueueEntity::class);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user