Rewrite SendingQueue::process() and progress/timeout functions using Doctrine
[MAILPOET-5682]
This commit is contained in:
@@ -127,17 +127,19 @@ class SendingQueue {
|
||||
public function process($timer = false) {
|
||||
$timer = $timer ?: microtime(true);
|
||||
$this->enforceSendingAndExecutionLimits($timer);
|
||||
foreach ($this->scheduledTasksRepository->findRunningSendingTasks(self::TASK_BATCH_SIZE) as $taskEntity) {
|
||||
$task = ScheduledTask::findOne($taskEntity->getId());
|
||||
if (!$task instanceof ScheduledTask) continue;
|
||||
foreach ($this->scheduledTasksRepository->findRunningSendingTasks(self::TASK_BATCH_SIZE) as $task) {
|
||||
$queue = $task->getSendingQueue();
|
||||
if (!$queue) {
|
||||
continue;
|
||||
}
|
||||
|
||||
$queue = SendingTask::createFromScheduledTask($task);
|
||||
if (!$queue instanceof SendingTask) continue;
|
||||
$legacyTask = ScheduledTask::findOne($task->getId());
|
||||
$legacyQueue = $legacyTask ? SendingTask::createFromScheduledTask($legacyTask) : null;
|
||||
if (!$legacyQueue) {
|
||||
continue;
|
||||
}
|
||||
|
||||
$task = $queue->task();
|
||||
if (!$task instanceof ScheduledTask) continue;
|
||||
|
||||
if ($this->isInProgress($task)) {
|
||||
if ($task->getInProgress()) {
|
||||
if ($this->isTimeout($task)) {
|
||||
$this->stopProgress($task);
|
||||
} else {
|
||||
@@ -149,8 +151,8 @@ class SendingQueue {
|
||||
$this->startProgress($task);
|
||||
|
||||
try {
|
||||
$this->scheduledTasksRepository->touchAllByIds([$queue->taskId]);
|
||||
$this->processSending($queue, (int)$timer);
|
||||
$this->scheduledTasksRepository->touchAllByIds([$task->getId()]);
|
||||
$this->processSending($legacyQueue, (int)$timer);
|
||||
} catch (\Exception $e) {
|
||||
$this->stopProgress($task);
|
||||
throw $e;
|
||||
@@ -540,31 +542,20 @@ class SendingQueue {
|
||||
}
|
||||
}
|
||||
|
||||
private function isInProgress(ScheduledTask $task): bool {
|
||||
if (!empty($task->inProgress)) {
|
||||
// Do not run multiple instances of the task
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
private function startProgress(ScheduledTaskEntity $task): void {
|
||||
$task->setInProgress(true);
|
||||
$this->scheduledTasksRepository->flush();
|
||||
}
|
||||
|
||||
private function startProgress(ScheduledTask $task): void {
|
||||
$task->inProgress = true;
|
||||
$task->save();
|
||||
private function stopProgress(ScheduledTaskEntity $task): void {
|
||||
$task->setInProgress(true);
|
||||
$this->scheduledTasksRepository->flush();
|
||||
}
|
||||
|
||||
private function stopProgress(ScheduledTask $task): void {
|
||||
$task->inProgress = false;
|
||||
$task->save();
|
||||
}
|
||||
|
||||
private function isTimeout(ScheduledTask $task): bool {
|
||||
private function isTimeout(ScheduledTaskEntity $task): bool {
|
||||
$currentTime = Carbon::createFromTimestamp($this->wp->currentTime('timestamp'));
|
||||
$updated = strtotime((string)$task->updatedAt);
|
||||
if ($updated !== false) {
|
||||
$updatedAt = Carbon::createFromTimestamp($updated);
|
||||
}
|
||||
if (isset($updatedAt) && $updatedAt->diffInSeconds($currentTime, false) > $this->getExecutionLimit()) {
|
||||
$updatedAt = new Carbon($task->getUpdatedAt());
|
||||
if ($updatedAt->diffInSeconds($currentTime, false) > $this->getExecutionLimit()) {
|
||||
return true;
|
||||
}
|
||||
|
||||
|
Reference in New Issue
Block a user