diff --git a/lib/Cron/Workers/WooCommerceSync.php b/lib/Cron/Workers/WooCommerceSync.php index aa50229bc3..9cfa7a8c38 100644 --- a/lib/Cron/Workers/WooCommerceSync.php +++ b/lib/Cron/Workers/WooCommerceSync.php @@ -3,9 +3,9 @@ namespace MailPoet\Cron\Workers; use MailPoet\Entities\ScheduledTaskEntity; -use MailPoet\Segments\WooCommerce; use MailPoet\Segments\WooCommerce as WooCommerceSegment; use MailPoet\WooCommerce\Helper as WooCommerceHelper; +use MailPoetVendor\Doctrine\DBAL\Connection; class WooCommerceSync extends SimpleWorker { const TASK_TYPE = 'woocommerce_sync'; @@ -18,12 +18,17 @@ class WooCommerceSync extends SimpleWorker { /** @var WooCommerceHelper */ private $woocommerceHelper; + /** @var Connection */ + private $connection; + public function __construct( WooCommerceSegment $woocommerceSegment, - WooCommerceHelper $woocommerceHelper + WooCommerceHelper $woocommerceHelper, + Connection $connection ) { $this->woocommerceSegment = $woocommerceSegment; $this->woocommerceHelper = $woocommerceHelper; + $this->connection = $connection; parent::__construct(); } @@ -32,15 +37,29 @@ class WooCommerceSync extends SimpleWorker { } public function processTaskStrategy(ScheduledTaskEntity $task, $timer) { - $countOfSynchronized = $task->getMeta()['count_of_synchronized'] ?? 0; - $count = $this->woocommerceSegment->synchronizeCustomers($countOfSynchronized); + $lastProcessedOrderId = $task->getMeta()['last_processed_order_id'] ?? 0; + $highestOrderId = $this->getHighestOrderId(); - $countOfSynchronized += $count; - $task->setMeta(['count_of_synchronized' => $countOfSynchronized]); + $lastProcessedOrderId = $this->woocommerceSegment->synchronizeCustomers($lastProcessedOrderId, $highestOrderId); + + $meta = $task->getMeta() ?? []; + $meta['last_processed_order_id'] = $lastProcessedOrderId; + $task->setMeta($meta); + $this->scheduledTasksRepository->persist($task); $this->scheduledTasksRepository->flush(); - if ($count === WooCommerce::BATCH_SIZE) { + + if ($lastProcessedOrderId !== $highestOrderId) { return false; } return true; } + + private function getHighestOrderId(): int { + global $wpdb; + return (int)$this->connection->fetchOne(" + SELECT MAX(wpp.ID) + FROM {$wpdb->posts} wpp + WHERE wpp.post_type = 'shop_order' + "); + } } diff --git a/lib/Segments/WooCommerce.php b/lib/Segments/WooCommerce.php index 115742d31d..b4661ab0d4 100644 --- a/lib/Segments/WooCommerce.php +++ b/lib/Segments/WooCommerce.php @@ -139,7 +139,6 @@ class WooCommerce { public function synchronizeGuestCustomer($orderId) { $wcOrder = $this->woocommerceHelper->wcGetOrder($orderId); - $wcSegment = $this->segmentsRepository->getWooCommerceSegment(); if (!$wcOrder instanceof \WC_Order) return; $signupConfirmation = $this->settings->get('signup_confirmation'); @@ -148,7 +147,8 @@ class WooCommerce { $status = SubscriberEntity::STATUS_SUBSCRIBED; } - $insertedEmails = $this->insertSubscribersFromOrders($orderId, $status); + $processedOrders = $this->insertSubscribersFromOrders(null, $orderId, $status); + $insertedEmails = array_keys($processedOrders); if (empty($insertedEmails[0])) { return false; @@ -170,19 +170,17 @@ class WooCommerce { } } - public function synchronizeCustomers(int $countOfSynchronized = 0): int { - if ($countOfSynchronized === 0) { - $this->resetSynchronization(); - } + public function synchronizeCustomers(int $lastProcessedOrderId = 0, ?int $highestOrderId = null): int { $this->wpSegment->synchronizeUsers(); // synchronize registered users $this->markRegisteredCustomers(); - $insertedUsersEmails = $this->insertSubscribersFromOrders(); - $this->updateNames($insertedUsersEmails); + $processedOrders = $this->insertSubscribersFromOrders($lastProcessedOrderId); + $this->updateNames($processedOrders); - if (count($insertedUsersEmails) < self::BATCH_SIZE) { + $lastProcessedOrderId = end($processedOrders); + if (!$highestOrderId || $lastProcessedOrderId === $highestOrderId) { $this->insertUsersToSegment(); $this->unsubscribeUsersFromSegment(); $this->removeOrphanedSubscribers(); @@ -190,15 +188,7 @@ class WooCommerce { $this->updateGlobalStatus(); } - return count($insertedUsersEmails); - } - - public function resetSynchronization(): void { - $subscribersTable = $this->entityManager->getClassMetadata(SubscriberEntity::class)->getTableName(); - $this->connection->executeQuery(" - UPDATE {$subscribersTable} - SET is_woocommerce_synced = 0 - "); + return (int)$lastProcessedOrderId; } private function ensureColumnCollation(): void { @@ -249,42 +239,50 @@ class WooCommerce { ", ['capabilities' => $wpdb->prefix . 'capabilities', 'source' => Source::WOOCOMMERCE_USER]); } - private function insertSubscribersFromOrders($orderId = null, $status = SubscriberEntity::STATUS_SUBSCRIBED): array { + /** + * @return array + */ + private function insertSubscribersFromOrders($lastProcessedOrderId = null, $orderId = null, $status = SubscriberEntity::STATUS_SUBSCRIBED): array { global $wpdb; $validator = new ModelValidator(); $orderId = !is_null($orderId) ? (int)$orderId : null; $subscribersTable = $this->entityManager->getClassMetadata(SubscriberEntity::class)->getTableName(); - $subQuery = " AND wppm.meta_value NOT IN ( - SELECT email - FROM {$subscribersTable} - WHERE is_woocommerce_synced = 1 - ORDER BY email - )"; - $parameters = ['batchSize' => self::BATCH_SIZE]; + $parameters = []; + $parametersType = []; if ($orderId) { $parameters['orderId'] = $orderId; } + if ($lastProcessedOrderId !== null) { + $parameters['lowestOrderId'] = $lastProcessedOrderId; + $parameters['highestOrderId'] = $lastProcessedOrderId + self::BATCH_SIZE; + $parametersType['lowestOrderId'] = \PDO::PARAM_INT; + $parametersType['highestOrderId'] = \PDO::PARAM_INT; + } - $usersEmails = $this->connection->executeQuery(' - SELECT DISTINCT wppm.meta_value as email FROM `' . $wpdb->prefix . 'postmeta` wppm - JOIN `' . $wpdb->prefix . 'posts` p ON wppm.post_id = p.ID AND p.post_type = "shop_order" - WHERE wppm.meta_key = "_billing_email" AND wppm.meta_value != "" - ' . ($orderId ? ' AND p.ID = :orderId' : $subQuery) . ' - ORDER BY wppm.meta_value - LIMIT :batchSize - ', $parameters, ['batchSize' => \PDO::PARAM_INT])->fetchAllAssociative(); - $usersEmails = array_column($usersEmails, 'email'); + $result = $this->connection->executeQuery(" + SELECT wpp.id AS order_id, wppm.meta_value AS email + FROM `{$wpdb->posts}` wpp + JOIN `{$wpdb->postmeta}` wppm ON wpp.ID = wppm.post_id AND wppm.meta_key = '_billing_email' AND wppm.meta_value != '' + WHERE wpp.post_type = 'shop_order' + " . ($orderId ? ' AND wpp.ID = :orderId' : '') . " + " . ($lastProcessedOrderId !== null ? ' AND (wpp.ID > :lowestOrderId AND wpp.ID <= :highestOrderId)' : '') . " + ORDER BY wpp.id + ", $parameters, $parametersType)->fetchAllAssociative(); - $subscribersValues = []; - $insertedUsersEmails = []; - $now = (Carbon::createFromTimestamp($this->wp->currentTime('timestamp')))->format('Y-m-d H:i:s'); - $source = Source::WOOCOMMERCE_USER; - foreach ($usersEmails as $email) { - if (!$validator->validateEmail($email)) { + $processedOrders = []; + foreach ($result as $item) { + if (!$validator->validateEmail($item['email'])) { continue; } - $insertedUsersEmails[] = $email; + // because data in result are sorted by id, we can replace the previous order id + $processedOrders[(string)$item['email']] = (int)$item['order_id']; + } + + $subscribersValues = []; + $now = (Carbon::createFromTimestamp($this->wp->currentTime('timestamp')))->format('Y-m-d H:i:s'); + $source = Source::WOOCOMMERCE_USER; + foreach ($processedOrders as $email => $orderId) { $subscribersValues[] = "(1, '{$email}', '{$status}', '{$now}', '{$now}', '{$source}')"; } @@ -296,25 +294,19 @@ class WooCommerce { '); } - return $insertedUsersEmails; + return $processedOrders; } - private function updateNames(array $emails): int { + /** + * @param array $orders + */ + private function updateNames(array $orders): void { global $wpdb; - if (!$emails) { - return 0; + if (!$orders) { + return; } $subscribersTable = $this->entityManager->getClassMetadata(SubscriberEntity::class)->getTableName(); - // select latest order ID with emails - $postIdsResult = $this->connection->executeQuery(" - SELECT MAX(post_id) AS post_id, meta_value AS email - FROM {$wpdb->postmeta} - WHERE meta_key = \"_billing_email\" - AND meta_value IN (:emails) - GROUP BY meta_value - ", ['emails' => $emails], ['emails' => Connection::PARAM_STR_ARRAY])->fetchAllAssociative(); - $subscribersData = array_combine(array_column($postIdsResult, 'post_id'), $postIdsResult); $metaKeys = [ '_billing_first_name', '_billing_last_name', @@ -324,10 +316,15 @@ class WooCommerce { FROM {$wpdb->postmeta} WHERE meta_key IN (:metaKeys) AND post_id IN (:postIds) ", - ['metaKeys' => $metaKeys, 'postIds' => array_column($postIdsResult, 'post_id')], + ['metaKeys' => $metaKeys, 'postIds' => array_values($orders)], ['metaKeys' => Connection::PARAM_STR_ARRAY, 'postIds' => Connection::PARAM_INT_ARRAY] )->fetchAllAssociative(); + $subscribersData = []; + foreach ($orders as $email => $postId) { + $subscribersData[$postId]['email'] = $email; + } + foreach ($metaData as $row) { if (!$row['meta_value']) { continue; @@ -335,18 +332,14 @@ class WooCommerce { $subscribersData[$row['post_id']][$row['meta_key']] = $row['meta_value']; } - $count = 0; $now = (Carbon::now())->format('Y-m-d H:i:s'); foreach ($subscribersData as $subscriber) { $data = []; - $data['is_woocommerce_synced'] = 1; $data['woocommerce_synced_at'] = $now; if (!empty($subscriber['_billing_first_name'])) $data['first_name'] = $subscriber['_billing_first_name']; if (!empty($subscriber['_billing_last_name'])) $data['last_name'] = $subscriber['_billing_last_name']; $this->connection->update($subscribersTable, $data, ['email' => $subscriber['email']]); - $count++; } - return $count; } private function insertUsersToSegment(): void { diff --git a/tests/integration/Cron/Workers/WooCommerceSyncTest.php b/tests/integration/Cron/Workers/WooCommerceSyncTest.php index afb4f7ea09..ef462fe153 100644 --- a/tests/integration/Cron/Workers/WooCommerceSyncTest.php +++ b/tests/integration/Cron/Workers/WooCommerceSyncTest.php @@ -9,19 +9,22 @@ use MailPoet\Test\DataFactories\ScheduledTask as ScheduledTaskFactory; use MailPoet\WooCommerce\Helper as WooCommerceHelper; use MailPoet\WP\Functions as WPFunctions; use MailPoetVendor\Carbon\Carbon; +use MailPoetVendor\Doctrine\DBAL\Connection; class WooCommerceSyncTest extends \MailPoetTest { public $worker; public $woocommerceHelper; public $woocommerceSegment; + public $connection; /** @var ScheduledTaskFactory */ private $scheduledTaskFactory; public function _before() { $this->woocommerceSegment = $this->createMock(WooCommerceSegment::class); $this->woocommerceHelper = $this->createMock(WooCommerceHelper::class); + $this->connection = $this->createMock(Connection::class); $this->scheduledTaskFactory = new ScheduledTaskFactory(); - $this->worker = new WooCommerceSync($this->woocommerceSegment, $this->woocommerceHelper); + $this->worker = new WooCommerceSync($this->woocommerceSegment, $this->woocommerceHelper, $this->connection); } public function testItWillNotRunIfWooCommerceIsDisabled() { diff --git a/tests/integration/Segments/WooCommerceTest.php b/tests/integration/Segments/WooCommerceTest.php index d1ded51ea3..9d6aaa3cad 100644 --- a/tests/integration/Segments/WooCommerceTest.php +++ b/tests/integration/Segments/WooCommerceTest.php @@ -26,9 +26,6 @@ class WooCommerceTest extends \MailPoetTest { private $userEmails = []; - /** @var SegmentEntity */ - private $wooCommerceSegment; - /** @var WooCommerceSegment */ private $wooCommerce;