Extract common and task-specific methods for subscribers [MAILPOET-940]
This commit is contained in:
@@ -5,6 +5,8 @@ use MailPoet\Cron\CronHelper;
|
|||||||
use MailPoet\Mailer\Mailer;
|
use MailPoet\Mailer\Mailer;
|
||||||
use MailPoet\Models\ScheduledTask;
|
use MailPoet\Models\ScheduledTask;
|
||||||
use MailPoet\Models\ScheduledTaskSubscriber;
|
use MailPoet\Models\ScheduledTaskSubscriber;
|
||||||
|
use MailPoet\Tasks\Bounce as BounceTask;
|
||||||
|
use MailPoet\Tasks\Subscribers as TaskSubscribers;
|
||||||
use MailPoet\Tasks\Subscribers\BatchIterator;
|
use MailPoet\Tasks\Subscribers\BatchIterator;
|
||||||
use MailPoet\Models\Subscriber;
|
use MailPoet\Models\Subscriber;
|
||||||
use MailPoet\Services\Bridge;
|
use MailPoet\Services\Bridge;
|
||||||
@@ -35,21 +37,7 @@ class Bounce extends SimpleWorker {
|
|||||||
}
|
}
|
||||||
|
|
||||||
function prepareTask(ScheduledTask $task) {
|
function prepareTask(ScheduledTask $task) {
|
||||||
// Prepare subscribers on the DB side for performance reasons
|
BounceTask::prepareSubscribers($task);
|
||||||
Subscriber::rawExecute(
|
|
||||||
'INSERT INTO ' . MP_SCHEDULED_TASK_SUBSCRIBERS_TABLE . '
|
|
||||||
(task_id, subscriber_id, processed)
|
|
||||||
SELECT ? as task_id, s.`id` as subscriber_id, ? as processed
|
|
||||||
FROM ' . MP_SUBSCRIBERS_TABLE . ' s
|
|
||||||
WHERE s.`deleted_at` IS NULL
|
|
||||||
AND s.`status` IN (?, ?)',
|
|
||||||
array(
|
|
||||||
$task->id,
|
|
||||||
ScheduledTaskSubscriber::STATUS_TO_PROCESS,
|
|
||||||
Subscriber::STATUS_SUBSCRIBED,
|
|
||||||
Subscriber::STATUS_UNCONFIRMED
|
|
||||||
)
|
|
||||||
);
|
|
||||||
|
|
||||||
if(!ScheduledTaskSubscriber::getToProcessCount($task->id)) {
|
if(!ScheduledTaskSubscriber::getToProcessCount($task->id)) {
|
||||||
$task->delete();
|
$task->delete();
|
||||||
@@ -67,6 +55,8 @@ class Bounce extends SimpleWorker {
|
|||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
$task_subscribers = new TaskSubscribers($task);
|
||||||
|
|
||||||
foreach($subscriber_batches as $subscribers_to_process_ids) {
|
foreach($subscriber_batches as $subscribers_to_process_ids) {
|
||||||
// abort if execution limit is reached
|
// abort if execution limit is reached
|
||||||
CronHelper::enforceExecutionLimit($this->timer);
|
CronHelper::enforceExecutionLimit($this->timer);
|
||||||
@@ -79,7 +69,7 @@ class Bounce extends SimpleWorker {
|
|||||||
|
|
||||||
$this->processEmails($subscriber_emails);
|
$this->processEmails($subscriber_emails);
|
||||||
|
|
||||||
$task->updateProcessedSubscribers($subscribers_to_process_ids);
|
$task_subscribers->updateProcessedSubscribers($subscribers_to_process_ids);
|
||||||
}
|
}
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
|
@@ -11,10 +11,6 @@ class ScheduledTask extends Model {
|
|||||||
const PRIORITY_MEDIUM = 5;
|
const PRIORITY_MEDIUM = 5;
|
||||||
const PRIORITY_LOW = 10;
|
const PRIORITY_LOW = 10;
|
||||||
|
|
||||||
function taskSubscribers() {
|
|
||||||
return $this->has_many(__NAMESPACE__ . '\ScheduledTaskSubscriber', 'task_id', 'id');
|
|
||||||
}
|
|
||||||
|
|
||||||
function complete() {
|
function complete() {
|
||||||
$this->set('status', self::STATUS_COMPLETED);
|
$this->set('status', self::STATUS_COMPLETED);
|
||||||
$this->save();
|
$this->save();
|
||||||
@@ -29,21 +25,4 @@ class ScheduledTask extends Model {
|
|||||||
parent::save();
|
parent::save();
|
||||||
return $this;
|
return $this;
|
||||||
}
|
}
|
||||||
|
|
||||||
function updateProcessedSubscribers(array $processed_subscribers) {
|
|
||||||
$this->taskSubscribers()
|
|
||||||
->whereIn('subscriber_id', $processed_subscribers)
|
|
||||||
->findResultSet()
|
|
||||||
->set('processed', ScheduledTaskSubscriber::STATUS_PROCESSED)
|
|
||||||
->save();
|
|
||||||
$this->checkCompleted();
|
|
||||||
}
|
|
||||||
|
|
||||||
private function checkCompleted() {
|
|
||||||
if(!ScheduledTaskSubscriber::getToProcessCount($this->id)) {
|
|
||||||
$this->processed_at = current_time('mysql');
|
|
||||||
$this->status = self::STATUS_COMPLETED;
|
|
||||||
return $this->save();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
28
lib/Tasks/Bounce.php
Normal file
28
lib/Tasks/Bounce.php
Normal file
@@ -0,0 +1,28 @@
|
|||||||
|
<?php
|
||||||
|
namespace MailPoet\Tasks;
|
||||||
|
|
||||||
|
use MailPoet\Models\ScheduledTask;
|
||||||
|
use MailPoet\Models\ScheduledTaskSubscriber;
|
||||||
|
use MailPoet\Models\Subscriber;
|
||||||
|
|
||||||
|
if(!defined('ABSPATH')) exit;
|
||||||
|
|
||||||
|
class Bounce {
|
||||||
|
static function prepareSubscribers(ScheduledTask $task) {
|
||||||
|
// Prepare subscribers on the DB side for performance reasons
|
||||||
|
Subscriber::rawExecute(
|
||||||
|
'INSERT INTO ' . MP_SCHEDULED_TASK_SUBSCRIBERS_TABLE . '
|
||||||
|
(task_id, subscriber_id, processed)
|
||||||
|
SELECT ? as task_id, s.`id` as subscriber_id, ? as processed
|
||||||
|
FROM ' . MP_SUBSCRIBERS_TABLE . ' s
|
||||||
|
WHERE s.`deleted_at` IS NULL
|
||||||
|
AND s.`status` IN (?, ?)',
|
||||||
|
array(
|
||||||
|
$task->id,
|
||||||
|
ScheduledTaskSubscriber::STATUS_TO_PROCESS,
|
||||||
|
Subscriber::STATUS_SUBSCRIBED,
|
||||||
|
Subscriber::STATUS_UNCONFIRMED
|
||||||
|
)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
49
lib/Tasks/Subscribers.php
Normal file
49
lib/Tasks/Subscribers.php
Normal file
@@ -0,0 +1,49 @@
|
|||||||
|
<?php
|
||||||
|
namespace MailPoet\Tasks;
|
||||||
|
|
||||||
|
use MailPoet\Models\ScheduledTask;
|
||||||
|
use MailPoet\Models\ScheduledTaskSubscriber;
|
||||||
|
|
||||||
|
if(!defined('ABSPATH')) exit;
|
||||||
|
|
||||||
|
class Subscribers {
|
||||||
|
private $task;
|
||||||
|
|
||||||
|
public function __construct(ScheduledTask $task) {
|
||||||
|
$this->task = $task;
|
||||||
|
}
|
||||||
|
|
||||||
|
function getSubscribers() {
|
||||||
|
return ScheduledTaskSubscriber::where('task_id', $this->task->id);
|
||||||
|
}
|
||||||
|
|
||||||
|
function isSubscriberProcessed($subscriber_id) {
|
||||||
|
$subscriber = $this->getSubscribers()
|
||||||
|
->where('subscriber_id', $subscriber_id)
|
||||||
|
->findOne();
|
||||||
|
return !empty($subscriber);
|
||||||
|
}
|
||||||
|
|
||||||
|
function removeNonexistentSubscribers($subscribers_to_remove) {
|
||||||
|
$this->getSubscribers()
|
||||||
|
->whereIn('subscriber_id', $subscribers_to_remove)
|
||||||
|
->deleteMany();
|
||||||
|
$this->checkCompleted();
|
||||||
|
}
|
||||||
|
|
||||||
|
function updateProcessedSubscribers(array $processed_subscribers) {
|
||||||
|
$this->getSubscribers()
|
||||||
|
->whereIn('subscriber_id', $processed_subscribers)
|
||||||
|
->findResultSet()
|
||||||
|
->set('processed', ScheduledTaskSubscriber::STATUS_PROCESSED)
|
||||||
|
->save();
|
||||||
|
$this->checkCompleted();
|
||||||
|
}
|
||||||
|
|
||||||
|
private function checkCompleted() {
|
||||||
|
if(!ScheduledTaskSubscriber::getToProcessCount($this->task->id)) {
|
||||||
|
$this->task->processed_at = current_time('mysql');
|
||||||
|
return $this->task->complete();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
Reference in New Issue
Block a user