type = self::TASK_TYPE; $task->save(); } if (!$queue instanceof SendingQueue) { $queue = SendingQueue::create(); $queue->newsletterId = 0; $queue->taskId = $task->id; $queue->save(); } if ($task->type !== self::TASK_TYPE) { throw new \Exception('Only tasks of type "' . self::TASK_TYPE . '" are accepted by this class'); } $this->task = $task; $this->queue = $queue; $this->taskSubscribers = new Subscribers($task); } public static function create(ScheduledTask $task = null, SendingQueue $queue = null) { return new self($task, $queue); } public static function createManyFromTasks($tasks) { if (empty($tasks)) { return []; } $tasksIds = array_map(function($task) { return $task->id; }, $tasks); $queues = SendingQueue::whereIn('task_id', $tasksIds)->findMany(); $queuesIndex = []; foreach ($queues as $queue) { $queuesIndex[$queue->taskId] = $queue; } $result = []; foreach ($tasks as $task) { if (!empty($queuesIndex[$task->id])) { $result[] = self::create($task, $queuesIndex[$task->id]); } else { static::handleInvalidTask($task); } } return $result; } public static function handleInvalidTask(ScheduledTask $task) { $loggerFactory = LoggerFactory::getInstance(); $loggerFactory->getLogger(LoggerFactory::TOPIC_NEWSLETTERS)->error( 'invalid sending task found', ['task_id' => $task->id] ); $task->status = ScheduledTask::STATUS_INVALID; $task->save(); } public static function createFromScheduledTask(ScheduledTask $task) { $queue = SendingQueue::where('task_id', $task->id)->findOne(); if (!$queue) { return false; } return self::create($task, $queue); } public static function createFromQueue(SendingQueue $queue) { $task = $queue->task()->findOne(); if (!$task) { return false; } return self::create($task, $queue); } public static function getByNewsletterId($newsletterId) { $queue = SendingQueue::where('newsletter_id', $newsletterId) ->orderByDesc('updated_at') ->findOne(); if (!$queue instanceof SendingQueue) { return false; } return self::createFromQueue($queue); } public function asArray() { $queue = array_intersect_key( $this->queue->asArray(), array_flip($this->queueFields) ); $task = $this->task->asArray(); return array_merge($task, $queue); } public function getErrors() { $queueErrors = $this->queue->getErrors(); $taskErrors = $this->task->getErrors(); if (empty($queueErrors) && empty($taskErrors)) { return false; } return array_merge((array)$queueErrors, (array)$taskErrors); } public function save() { $this->queue->save(); $this->task->save(); $errors = $this->getErrors(); if ($errors) { $loggerFactory = LoggerFactory::getInstance(); $loggerFactory->getLogger(LoggerFactory::TOPIC_NEWSLETTERS)->error( 'error saving sending task', ['task_id' => $this->task->id, 'queue_id' => $this->queue->id, 'errors' => $errors] ); } return $this; } public function delete() { $this->taskSubscribers->removeAllSubscribers(); $this->task->delete(); $this->queue->delete(); } public function queue() { return $this->queue; } public function getSendingQueueEntity(): SendingQueueEntity { $sendingQueuesRepository = ContainerWrapper::getInstance()->get(SendingQueuesRepository::class); $sendingQueueEntity = $sendingQueuesRepository->findOneById($this->queue->id); if (!$sendingQueueEntity) { throw new InvalidStateException(); } $sendingQueuesRepository->refresh($sendingQueueEntity); return $sendingQueueEntity; } public function task() { return $this->task; } public function taskSubscribers() { return $this->taskSubscribers; } public function getSubscribers($processed = null) { $scheduledTaskSubscribersRepository = ContainerWrapper::getInstance()->get(ScheduledTaskSubscribersRepository::class); if (is_null($processed)) { $subscribers = $scheduledTaskSubscribersRepository->findBy(['task' => $this->task->id]); } else if ($processed) { $subscribers = $scheduledTaskSubscribersRepository->findBy( ['task' => $this->task->id, 'processed' => ScheduledTaskSubscriberEntity::STATUS_PROCESSED] ); } else { $subscribers = $scheduledTaskSubscribersRepository->findBy( ['task' => $this->task->id, 'processed' => ScheduledTaskSubscriberEntity::STATUS_UNPROCESSED] ); } return array_map( function(ScheduledTaskSubscriberEntity $scheduledTaskSubscriber) { return (string)$scheduledTaskSubscriber->getSubscriberId(); }, $subscribers ); } public function setSubscribers(array $subscriberIds) { $this->taskSubscribers->setSubscribers($subscriberIds); $this->updateCount(); } public function removeSubscribers(array $subscriberIds) { $this->taskSubscribers->removeSubscribers($subscriberIds); $this->updateCount(); } public function removeAllSubscribers() { $this->taskSubscribers->removeAllSubscribers(); $this->updateCount(); } public function updateProcessedSubscribers(array $processedSubscribers) { $this->taskSubscribers->updateProcessedSubscribers($processedSubscribers); return $this->updateCount(count($processedSubscribers))->getErrors() === false; } public function saveSubscriberError($subcriberId, $errorMessage) { $this->taskSubscribers->saveSubscriberError($subcriberId, $errorMessage); return $this->updateCount()->getErrors() === false; } public function updateCount(?int $count = null) { if ($count) { // increment/decrement counts based on known subscriber count, don't exceed the bounds $this->queue->countProcessed = min($this->queue->countProcessed + $count, $this->queue->countTotal); $this->queue->countToProcess = max($this->queue->countToProcess - $count, 0); } else { // query DB to update counts, slower but more accurate, to be used if count isn't known $this->queue->countProcessed = ScheduledTaskSubscriber::getProcessedCount($this->task->id); $this->queue->countToProcess = ScheduledTaskSubscriber::getUnprocessedCount($this->task->id); $this->queue->countTotal = $this->queue->countProcessed + $this->queue->countToProcess; } return $this->queue->save(); } public function hydrate(array $data) { foreach ($data as $k => $v) { $this->__set($k, $v); } } public function validate() { return $this->queue->validate() && $this->task->validate(); } public function getMeta() { return $this->queue->getMeta(); } public function __isset($prop) { $prop = Helpers::camelCaseToUnderscore($prop); if ($this->isQueueProperty($prop)) { return isset($this->queue->$prop); } else { return isset($this->task->$prop); } } public function __get($prop) { $prop = Helpers::camelCaseToUnderscore($prop); if ($this->isQueueProperty($prop)) { return $this->queue->$prop; } else { return $this->task->$prop; } } public function __set($prop, $value) { $prop = Helpers::camelCaseToUnderscore($prop); if ($this->isCommonProperty($prop)) { $this->queue->$prop = $value; $this->task->$prop = $value; } elseif ($this->isQueueProperty($prop)) { $this->queue->$prop = $value; } else { $this->task->$prop = $value; } } public function __call($name, $args) { $obj = method_exists($this->queue, $name) ? $this->queue : $this->task; $callback = [$obj, $name]; if (is_callable($callback)) { return call_user_func_array($callback, $args); } } private function isQueueProperty($prop) { return in_array($prop, $this->queueFields); } private function isCommonProperty($prop) { return in_array($prop, $this->commonFields); } }