Rewrite SendingQueue::process() and progress/timeout functions using Doctrine

[MAILPOET-5682]
This commit is contained in:
Jan Jakes
2023-10-24 14:24:14 +02:00
committed by Rodrigo Primo
parent 38cc3bd56f
commit 3394568792

View File

@@ -127,17 +127,19 @@ class SendingQueue {
public function process($timer = false) {
$timer = $timer ?: microtime(true);
$this->enforceSendingAndExecutionLimits($timer);
foreach ($this->scheduledTasksRepository->findRunningSendingTasks(self::TASK_BATCH_SIZE) as $taskEntity) {
$task = ScheduledTask::findOne($taskEntity->getId());
if (!$task instanceof ScheduledTask) continue;
foreach ($this->scheduledTasksRepository->findRunningSendingTasks(self::TASK_BATCH_SIZE) as $task) {
$queue = $task->getSendingQueue();
if (!$queue) {
continue;
}
$queue = SendingTask::createFromScheduledTask($task);
if (!$queue instanceof SendingTask) continue;
$legacyTask = ScheduledTask::findOne($task->getId());
$legacyQueue = $legacyTask ? SendingTask::createFromScheduledTask($legacyTask) : null;
if (!$legacyQueue) {
continue;
}
$task = $queue->task();
if (!$task instanceof ScheduledTask) continue;
if ($this->isInProgress($task)) {
if ($task->getInProgress()) {
if ($this->isTimeout($task)) {
$this->stopProgress($task);
} else {
@@ -149,8 +151,8 @@ class SendingQueue {
$this->startProgress($task);
try {
$this->scheduledTasksRepository->touchAllByIds([$queue->taskId]);
$this->processSending($queue, (int)$timer);
$this->scheduledTasksRepository->touchAllByIds([$task->getId()]);
$this->processSending($legacyQueue, (int)$timer);
} catch (\Exception $e) {
$this->stopProgress($task);
throw $e;
@@ -540,31 +542,20 @@ class SendingQueue {
}
}
private function isInProgress(ScheduledTask $task): bool {
if (!empty($task->inProgress)) {
// Do not run multiple instances of the task
return true;
}
return false;
private function startProgress(ScheduledTaskEntity $task): void {
$task->setInProgress(true);
$this->scheduledTasksRepository->flush();
}
private function startProgress(ScheduledTask $task): void {
$task->inProgress = true;
$task->save();
private function stopProgress(ScheduledTaskEntity $task): void {
$task->setInProgress(true);
$this->scheduledTasksRepository->flush();
}
private function stopProgress(ScheduledTask $task): void {
$task->inProgress = false;
$task->save();
}
private function isTimeout(ScheduledTask $task): bool {
private function isTimeout(ScheduledTaskEntity $task): bool {
$currentTime = Carbon::createFromTimestamp($this->wp->currentTime('timestamp'));
$updated = strtotime((string)$task->updatedAt);
if ($updated !== false) {
$updatedAt = Carbon::createFromTimestamp($updated);
}
if (isset($updatedAt) && $updatedAt->diffInSeconds($currentTime, false) > $this->getExecutionLimit()) {
$updatedAt = new Carbon($task->getUpdatedAt());
if ($updatedAt->diffInSeconds($currentTime, false) > $this->getExecutionLimit()) {
return true;
}