Rewrite SendingQueue::process() and progress/timeout functions using Doctrine

[MAILPOET-5682]
This commit is contained in:
Jan Jakes
2023-10-24 14:24:14 +02:00
committed by Rodrigo Primo
parent 38cc3bd56f
commit 3394568792

View File

@@ -127,17 +127,19 @@ class SendingQueue {
public function process($timer = false) { public function process($timer = false) {
$timer = $timer ?: microtime(true); $timer = $timer ?: microtime(true);
$this->enforceSendingAndExecutionLimits($timer); $this->enforceSendingAndExecutionLimits($timer);
foreach ($this->scheduledTasksRepository->findRunningSendingTasks(self::TASK_BATCH_SIZE) as $taskEntity) { foreach ($this->scheduledTasksRepository->findRunningSendingTasks(self::TASK_BATCH_SIZE) as $task) {
$task = ScheduledTask::findOne($taskEntity->getId()); $queue = $task->getSendingQueue();
if (!$task instanceof ScheduledTask) continue; if (!$queue) {
continue;
}
$queue = SendingTask::createFromScheduledTask($task); $legacyTask = ScheduledTask::findOne($task->getId());
if (!$queue instanceof SendingTask) continue; $legacyQueue = $legacyTask ? SendingTask::createFromScheduledTask($legacyTask) : null;
if (!$legacyQueue) {
continue;
}
$task = $queue->task(); if ($task->getInProgress()) {
if (!$task instanceof ScheduledTask) continue;
if ($this->isInProgress($task)) {
if ($this->isTimeout($task)) { if ($this->isTimeout($task)) {
$this->stopProgress($task); $this->stopProgress($task);
} else { } else {
@@ -149,8 +151,8 @@ class SendingQueue {
$this->startProgress($task); $this->startProgress($task);
try { try {
$this->scheduledTasksRepository->touchAllByIds([$queue->taskId]); $this->scheduledTasksRepository->touchAllByIds([$task->getId()]);
$this->processSending($queue, (int)$timer); $this->processSending($legacyQueue, (int)$timer);
} catch (\Exception $e) { } catch (\Exception $e) {
$this->stopProgress($task); $this->stopProgress($task);
throw $e; throw $e;
@@ -540,31 +542,20 @@ class SendingQueue {
} }
} }
private function isInProgress(ScheduledTask $task): bool { private function startProgress(ScheduledTaskEntity $task): void {
if (!empty($task->inProgress)) { $task->setInProgress(true);
// Do not run multiple instances of the task $this->scheduledTasksRepository->flush();
return true;
}
return false;
} }
private function startProgress(ScheduledTask $task): void { private function stopProgress(ScheduledTaskEntity $task): void {
$task->inProgress = true; $task->setInProgress(true);
$task->save(); $this->scheduledTasksRepository->flush();
} }
private function stopProgress(ScheduledTask $task): void { private function isTimeout(ScheduledTaskEntity $task): bool {
$task->inProgress = false;
$task->save();
}
private function isTimeout(ScheduledTask $task): bool {
$currentTime = Carbon::createFromTimestamp($this->wp->currentTime('timestamp')); $currentTime = Carbon::createFromTimestamp($this->wp->currentTime('timestamp'));
$updated = strtotime((string)$task->updatedAt); $updatedAt = new Carbon($task->getUpdatedAt());
if ($updated !== false) { if ($updatedAt->diffInSeconds($currentTime, false) > $this->getExecutionLimit()) {
$updatedAt = Carbon::createFromTimestamp($updated);
}
if (isset($updatedAt) && $updatedAt->diffInSeconds($currentTime, false) > $this->getExecutionLimit()) {
return true; return true;
} }