diff --git a/lib/Config/Initializer.php b/lib/Config/Initializer.php index 77bcd7e5a9..f2750fadb7 100644 --- a/lib/Config/Initializer.php +++ b/lib/Config/Initializer.php @@ -107,7 +107,7 @@ class Initializer { $this->setupShortcodes(); $this->setupHooks(); $this->setupImages(); - $this->runQueueSupervisor(); + $this->setupTaskScheduler(); $this->plugin_initialized = true; } catch(\Exception $e) { @@ -192,14 +192,9 @@ class Initializer { $router->init(); } - function runQueueSupervisor() { - if(php_sapi_name() === 'cli') return; - try { - $supervisor = new Supervisor(); - $supervisor->checkDaemon(); - } catch(\Exception $e) { - // Prevent Daemon exceptions from breaking out and breaking UI - } + function setupTaskScheduler() { + $task_scheduler = new TaskScheduler(); + $task_scheduler->init(); } function setupImages() { diff --git a/lib/Config/TaskScheduler.php b/lib/Config/TaskScheduler.php new file mode 100644 index 0000000000..9d6808965f --- /dev/null +++ b/lib/Config/TaskScheduler.php @@ -0,0 +1,65 @@ +method = self::getCurrentMethod(); + } + + function init() { + // configure task scheduler only outside of cli environment + if(php_sapi_name() === 'cli') return; + switch($this->method) { + case self::METHOD_MAILPOET: + return $this->configureMailpoetScheduler(); + break; + case self::METHOD_WORDPRESS: + return $this->configureWordpressScheduler(); + break; + default: + throw new \Exception(__("Task scheduler is not configured")); + break; + }; + } + + function configureMailpoetScheduler() { + try { + $supervisor = new Supervisor(); + $supervisor->checkDaemon(); + } catch(\Exception $e) { + // exceptions should not prevent the rest of the site loading + } + } + + function configureWordpressScheduler() { + $scheduled_queues = SchedulerWorker::getScheduledQueues(); + $running_queues = SendingQueueWorker::getRunningQueues(); + // run cron only when there are scheduled queues ready to be processed + // or are already being processed + if($scheduled_queues || $running_queues) { + return $this->configureMailpoetScheduler(); + } + return; + } + + static function getAvailableMethods() { + return array( + 'mailpoet' => self::METHOD_MAILPOET, + 'wordpress' => self::METHOD_WORDPRESS + ); + } + + static function getCurrentMethod() { + return Setting::getValue('task_scheduler.method'); + } +} \ No newline at end of file diff --git a/lib/Cron/Workers/Scheduler.php b/lib/Cron/Workers/Scheduler.php index 157b80ded8..db1d7a2bbb 100644 --- a/lib/Cron/Workers/Scheduler.php +++ b/lib/Cron/Workers/Scheduler.php @@ -24,9 +24,7 @@ class Scheduler { } function process() { - $scheduled_queues = SendingQueue::where('status', 'scheduled') - ->whereLte('scheduled_at', Carbon::createFromTimestamp(current_time('timestamp'))) - ->findMany(); + $scheduled_queues = self::getScheduledQueues(); if(!count($scheduled_queues)) return; foreach($scheduled_queues as $i => $queue) { $newsletter = Newsletter::filter('filterWithOptions') @@ -185,4 +183,10 @@ class Scheduler { $notification_history : false; } + + static function getScheduledQueues() { + return SendingQueue::where('status', 'scheduled') + ->whereLte('scheduled_at', Carbon::createFromTimestamp(current_time('timestamp'))) + ->findMany(); + } } \ No newline at end of file diff --git a/lib/Cron/Workers/SendingQueue/SendingQueue.php b/lib/Cron/Workers/SendingQueue/SendingQueue.php index 25fc18551a..7888a6960e 100644 --- a/lib/Cron/Workers/SendingQueue/SendingQueue.php +++ b/lib/Cron/Workers/SendingQueue/SendingQueue.php @@ -27,7 +27,7 @@ class SendingQueue { function process() { $this->mailer_task->checkSendingLimit(); - foreach($this->getQueues() as $queue) { + foreach(self::getRunningQueues() as $queue) { // get and pre-process newsletter (render, replace shortcodes/links, etc.) $newsletter = $this->newsletter_task->getAndPreProcess($queue->asArray()); if(!$newsletter) { @@ -168,6 +168,13 @@ class SendingQueue { ->findMany(); } + static function getRunningQueues() { + return SendingQueueModel::orderByDesc('priority') + ->whereNull('deleted_at') + ->whereNull('status') + ->findMany(); + } + function updateQueue($queue) { $queue->count_processed = count($queue->subscribers['processed']) + count($queue->subscribers['failed']);