*/
  private $stepRunners;

  /** @var WorkflowRunLogStorage */
  private $workflowRunLogStorage;

  /** @var Hooks */
  private $hooks;

  /**
   * Stores the injected collaborators on the instance; performs no other work.
   */
  public function __construct(
    ActionScheduler $actionScheduler,
    ActionStepRunner $actionStepRunner,
    Hooks $hooks,
    WordPress $wordPress,
    WorkflowRunStorage $workflowRunStorage,
    WorkflowRunLogStorage $workflowRunLogStorage,
    WorkflowStorage $workflowStorage
  ) {
    $this->actionScheduler = $actionScheduler;
    $this->actionStepRunner = $actionStepRunner;
    $this->hooks = $hooks;
    $this->wordPress = $wordPress;
    $this->workflowRunStorage = $workflowRunStorage;
    $this->workflowRunLogStorage = $workflowRunLogStorage;
    $this->workflowStorage = $workflowStorage;
  }

  /**
   * Wires this handler into WordPress.
   *
   * Registers handle() as the callback for Hooks::WORKFLOW_STEP, registers the
   * injected ActionStepRunner for Step::TYPE_ACTION, and then fires
   * Hooks::STEP_RUNNER_INITIALIZE with [$this] as its argument
   * (presumably so other code can call addStepRunner() - confirm with listeners).
   */
  public function initialize(): void {
    $this->wordPress->addAction(Hooks::WORKFLOW_STEP, [$this, 'handle']);
    $this->addStepRunner(Step::TYPE_ACTION, $this->actionStepRunner);
    $this->wordPress->doAction(Hooks::STEP_RUNNER_INITIALIZE, [$this]);
  }

  /**
   * Registers the runner used for steps of the given type.
   * A runner previously registered for the same type is replaced.
   */
  public function addStepRunner(string $stepType, StepRunner $stepRunner): void {
    $this->stepRunners[$stepType] = $stepRunner;
  }

  /**
   * Callback for the Hooks::WORKFLOW_STEP action: runs one workflow step.
   *
   * Any error raised while handling the step marks the workflow run as failed
   * and is rethrown. Errors that are not Exception instances are wrapped in an
   * Exception (original kept as "previous").
   *
   * @param mixed $args Expected to be an array with 'workflow_run_id' and 'step_id'.
   * @throws Exception When the args are unusable or handling the step fails.
   */
  public function handle($args): void {
    // Without a run id there is nothing to execute and nothing to mark as failed,
    // so reject such args the same way as non-array args.
    // TODO: validate the remaining args (value types, 'step_id').
    if (!is_array($args) || !isset($args['workflow_run_id'])) {
      throw new InvalidStateException();
    }

    // Action Scheduler catches only Exception instances, not other errors.
    // We need to convert them to exceptions to be processed and logged.
    try {
      $this->handleStep($args);
    } catch (Throwable $e) {
      try {
        $this->workflowRunStorage->updateStatus((int)$args['workflow_run_id'], WorkflowRun::STATUS_FAILED);
      } catch (Throwable $statusError) {
        // Deliberately not rethrown: a failure to persist the status must not
        // replace the original error (the root cause) or escape unconverted.
      }

      if (!$e instanceof Exception) {
        throw new Exception($e->getMessage(), intval($e->getCode()), $e);
      }
      throw $e;
    }
  }

  /**
   * Executes a single step of a workflow run and schedules what comes next.
   *
   * Flow:
   *  1. Load the run and its workflow version; fail if either is missing or
   *     the run's status is not WorkflowRun::STATUS_RUNNING.
   *  2. An empty step id means there is nothing left to run: mark the run complete.
   *  3. Run the step with the runner registered for its type, recording a
   *     WorkflowRunLog whether the runner succeeds or throws.
   *  4. Enqueue the first entry of the step's next steps, or mark the run
   *     complete when there is none.
   *
   * @param array $args Array with 'workflow_run_id' and (optionally null) 'step_id'.
   */
  private function handleStep(array $args): void {
    $workflowRunId = $args['workflow_run_id'];
    $stepId = $args['step_id'] ?? null;

    $workflowRun = $this->workflowRunStorage->getWorkflowRun($workflowRunId);
    if (!$workflowRun) {
      throw Exceptions::workflowRunNotFound($workflowRunId);
    }

    $runStatus = $workflowRun->getStatus();
    if ($runStatus !== WorkflowRun::STATUS_RUNNING) {
      throw Exceptions::workflowRunNotRunning($workflowRunId, $runStatus);
    }

    $workflow = $this->workflowStorage->getWorkflow($workflowRun->getWorkflowId(), $workflowRun->getVersionId());
    if (!$workflow) {
      throw Exceptions::workflowVersionNotFound($workflowRun->getWorkflowId(), $workflowRun->getVersionId());
    }

    // complete workflow run
    if (!$stepId) {
      $this->workflowRunStorage->updateStatus($workflowRunId, WorkflowRun::STATUS_COMPLETE);
      return;
    }

    $step = $workflow->getStep($stepId);
    if (!$step) {
      throw Exceptions::workflowStepNotFound($stepId);
    }

    $stepType = $step->getType();
    if (!isset($this->stepRunners[$stepType])) {
      // no runner was registered for this step type
      throw new InvalidStateException();
    }

    $log = new WorkflowRunLog($workflowRun->getId(), $step->getId());
    try {
      $this->stepRunners[$stepType]->run($step, $workflow, $workflowRun);
      $log->markCompletedSuccessfully();
    } catch (Throwable $e) {
      $log->markFailed();
      $log->setError($e);
      throw $e;
    } finally {
      // Runs on success and on failure, so the log is stored in both cases.
      try {
        $this->hooks->doWorkflowStepAfterRun($log);
      } catch (Throwable $hookError) {
        // Ignore integration errors
      }
      $this->workflowRunLogStorage->createWorkflowRunLog($log);
    }

    // Only the first of the step's next steps is followed.
    $nextStep = $step->getNextSteps()[0] ?? null;
    $nextStepArgs = [
      [
        'workflow_run_id' => $workflowRunId,
        'step_id' => $nextStep ? $nextStep->getId() : null,
      ],
    ];

    // next step scheduled by action
    if ($this->actionScheduler->hasScheduledAction(Hooks::WORKFLOW_STEP, $nextStepArgs)) {
      return;
    }

    // no need to schedule a new step if the next step is null, complete the run
    if (!$nextStep) {
      $this->workflowRunStorage->updateStatus($workflowRunId, WorkflowRun::STATUS_COMPLETE);
      return;
    }

    // enqueue next step
    $this->actionScheduler->enqueue(Hooks::WORKFLOW_STEP, $nextStepArgs);

    // TODO: allow long-running steps (that are not done here yet)
  }
}