Rewrite SendingQueue::process() and progress/timeout functions using Doctrine
[MAILPOET-5682]
This commit is contained in:
@@ -127,17 +127,19 @@ class SendingQueue {
|
|||||||
public function process($timer = false) {
|
public function process($timer = false) {
|
||||||
$timer = $timer ?: microtime(true);
|
$timer = $timer ?: microtime(true);
|
||||||
$this->enforceSendingAndExecutionLimits($timer);
|
$this->enforceSendingAndExecutionLimits($timer);
|
||||||
foreach ($this->scheduledTasksRepository->findRunningSendingTasks(self::TASK_BATCH_SIZE) as $taskEntity) {
|
foreach ($this->scheduledTasksRepository->findRunningSendingTasks(self::TASK_BATCH_SIZE) as $task) {
|
||||||
$task = ScheduledTask::findOne($taskEntity->getId());
|
$queue = $task->getSendingQueue();
|
||||||
if (!$task instanceof ScheduledTask) continue;
|
if (!$queue) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
$queue = SendingTask::createFromScheduledTask($task);
|
$legacyTask = ScheduledTask::findOne($task->getId());
|
||||||
if (!$queue instanceof SendingTask) continue;
|
$legacyQueue = $legacyTask ? SendingTask::createFromScheduledTask($legacyTask) : null;
|
||||||
|
if (!$legacyQueue) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
$task = $queue->task();
|
if ($task->getInProgress()) {
|
||||||
if (!$task instanceof ScheduledTask) continue;
|
|
||||||
|
|
||||||
if ($this->isInProgress($task)) {
|
|
||||||
if ($this->isTimeout($task)) {
|
if ($this->isTimeout($task)) {
|
||||||
$this->stopProgress($task);
|
$this->stopProgress($task);
|
||||||
} else {
|
} else {
|
||||||
@@ -149,8 +151,8 @@ class SendingQueue {
|
|||||||
$this->startProgress($task);
|
$this->startProgress($task);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
$this->scheduledTasksRepository->touchAllByIds([$queue->taskId]);
|
$this->scheduledTasksRepository->touchAllByIds([$task->getId()]);
|
||||||
$this->processSending($queue, (int)$timer);
|
$this->processSending($legacyQueue, (int)$timer);
|
||||||
} catch (\Exception $e) {
|
} catch (\Exception $e) {
|
||||||
$this->stopProgress($task);
|
$this->stopProgress($task);
|
||||||
throw $e;
|
throw $e;
|
||||||
@@ -540,31 +542,20 @@ class SendingQueue {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private function isInProgress(ScheduledTask $task): bool {
|
private function startProgress(ScheduledTaskEntity $task): void {
|
||||||
if (!empty($task->inProgress)) {
|
$task->setInProgress(true);
|
||||||
// Do not run multiple instances of the task
|
$this->scheduledTasksRepository->flush();
|
||||||
return true;
|
|
||||||
}
|
|
||||||
return false;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private function startProgress(ScheduledTask $task): void {
|
private function stopProgress(ScheduledTaskEntity $task): void {
|
||||||
$task->inProgress = true;
|
$task->setInProgress(true);
|
||||||
$task->save();
|
$this->scheduledTasksRepository->flush();
|
||||||
}
|
}
|
||||||
|
|
||||||
private function stopProgress(ScheduledTask $task): void {
|
private function isTimeout(ScheduledTaskEntity $task): bool {
|
||||||
$task->inProgress = false;
|
|
||||||
$task->save();
|
|
||||||
}
|
|
||||||
|
|
||||||
private function isTimeout(ScheduledTask $task): bool {
|
|
||||||
$currentTime = Carbon::createFromTimestamp($this->wp->currentTime('timestamp'));
|
$currentTime = Carbon::createFromTimestamp($this->wp->currentTime('timestamp'));
|
||||||
$updated = strtotime((string)$task->updatedAt);
|
$updatedAt = new Carbon($task->getUpdatedAt());
|
||||||
if ($updated !== false) {
|
if ($updatedAt->diffInSeconds($currentTime, false) > $this->getExecutionLimit()) {
|
||||||
$updatedAt = Carbon::createFromTimestamp($updated);
|
|
||||||
}
|
|
||||||
if (isset($updatedAt) && $updatedAt->diffInSeconds($currentTime, false) > $this->getExecutionLimit()) {
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user