Sync subscribers sequentially from orders

[MAILPOET-3954]
This commit is contained in:
Jan Lysý
2021-11-24 15:34:34 +01:00
committed by Veljko V
parent 00f4913739
commit 9bd6cb159c
4 changed files with 83 additions and 71 deletions

View File

@@ -3,9 +3,9 @@
namespace MailPoet\Cron\Workers;
use MailPoet\Entities\ScheduledTaskEntity;
use MailPoet\Segments\WooCommerce;
use MailPoet\Segments\WooCommerce as WooCommerceSegment;
use MailPoet\WooCommerce\Helper as WooCommerceHelper;
use MailPoetVendor\Doctrine\DBAL\Connection;
class WooCommerceSync extends SimpleWorker {
const TASK_TYPE = 'woocommerce_sync';
@@ -18,12 +18,17 @@ class WooCommerceSync extends SimpleWorker {
/** @var WooCommerceHelper */
private $woocommerceHelper;
/** @var Connection */
private $connection;
public function __construct(
WooCommerceSegment $woocommerceSegment,
WooCommerceHelper $woocommerceHelper
WooCommerceHelper $woocommerceHelper,
Connection $connection
) {
$this->woocommerceSegment = $woocommerceSegment;
$this->woocommerceHelper = $woocommerceHelper;
$this->connection = $connection;
parent::__construct();
}
@@ -32,15 +37,29 @@ class WooCommerceSync extends SimpleWorker {
}
public function processTaskStrategy(ScheduledTaskEntity $task, $timer) {
$countOfSynchronized = $task->getMeta()['count_of_synchronized'] ?? 0;
$count = $this->woocommerceSegment->synchronizeCustomers($countOfSynchronized);
$lastProcessedOrderId = $task->getMeta()['last_processed_order_id'] ?? 0;
$highestOrderId = $this->getHighestOrderId();
$countOfSynchronized += $count;
$task->setMeta(['count_of_synchronized' => $countOfSynchronized]);
$lastProcessedOrderId = $this->woocommerceSegment->synchronizeCustomers($lastProcessedOrderId, $highestOrderId);
$meta = $task->getMeta() ?? [];
$meta['last_processed_order_id'] = $lastProcessedOrderId;
$task->setMeta($meta);
$this->scheduledTasksRepository->persist($task);
$this->scheduledTasksRepository->flush();
if ($count === WooCommerce::BATCH_SIZE) {
if ($lastProcessedOrderId !== $highestOrderId) {
return false;
}
return true;
}
private function getHighestOrderId(): int {
global $wpdb;
return (int)$this->connection->fetchOne("
SELECT MAX(wpp.ID)
FROM {$wpdb->posts} wpp
WHERE wpp.post_type = 'shop_order'
");
}
}

View File

@@ -139,7 +139,6 @@ class WooCommerce {
public function synchronizeGuestCustomer($orderId) {
$wcOrder = $this->woocommerceHelper->wcGetOrder($orderId);
$wcSegment = $this->segmentsRepository->getWooCommerceSegment();
if (!$wcOrder instanceof \WC_Order) return;
$signupConfirmation = $this->settings->get('signup_confirmation');
@@ -148,7 +147,8 @@ class WooCommerce {
$status = SubscriberEntity::STATUS_SUBSCRIBED;
}
$insertedEmails = $this->insertSubscribersFromOrders($orderId, $status);
$processedOrders = $this->insertSubscribersFromOrders(null, $orderId, $status);
$insertedEmails = array_keys($processedOrders);
if (empty($insertedEmails[0])) {
return false;
@@ -170,19 +170,17 @@ class WooCommerce {
}
}
public function synchronizeCustomers(int $countOfSynchronized = 0): int {
if ($countOfSynchronized === 0) {
$this->resetSynchronization();
}
public function synchronizeCustomers(int $lastProcessedOrderId = 0, ?int $highestOrderId = null): int {
$this->wpSegment->synchronizeUsers(); // synchronize registered users
$this->markRegisteredCustomers();
$insertedUsersEmails = $this->insertSubscribersFromOrders();
$this->updateNames($insertedUsersEmails);
$processedOrders = $this->insertSubscribersFromOrders($lastProcessedOrderId);
$this->updateNames($processedOrders);
if (count($insertedUsersEmails) < self::BATCH_SIZE) {
$lastProcessedOrderId = end($processedOrders);
if (!$highestOrderId || $lastProcessedOrderId === $highestOrderId) {
$this->insertUsersToSegment();
$this->unsubscribeUsersFromSegment();
$this->removeOrphanedSubscribers();
@@ -190,15 +188,7 @@ class WooCommerce {
$this->updateGlobalStatus();
}
return count($insertedUsersEmails);
}
public function resetSynchronization(): void {
$subscribersTable = $this->entityManager->getClassMetadata(SubscriberEntity::class)->getTableName();
$this->connection->executeQuery("
UPDATE {$subscribersTable}
SET is_woocommerce_synced = 0
");
return (int)$lastProcessedOrderId;
}
private function ensureColumnCollation(): void {
@@ -249,42 +239,50 @@ class WooCommerce {
", ['capabilities' => $wpdb->prefix . 'capabilities', 'source' => Source::WOOCOMMERCE_USER]);
}
private function insertSubscribersFromOrders($orderId = null, $status = SubscriberEntity::STATUS_SUBSCRIBED): array {
/**
* @return array<string, int>
*/
private function insertSubscribersFromOrders($lastProcessedOrderId = null, $orderId = null, $status = SubscriberEntity::STATUS_SUBSCRIBED): array {
global $wpdb;
$validator = new ModelValidator();
$orderId = !is_null($orderId) ? (int)$orderId : null;
$subscribersTable = $this->entityManager->getClassMetadata(SubscriberEntity::class)->getTableName();
$subQuery = " AND wppm.meta_value NOT IN (
SELECT email
FROM {$subscribersTable}
WHERE is_woocommerce_synced = 1
ORDER BY email
)";
$parameters = ['batchSize' => self::BATCH_SIZE];
$parameters = [];
$parametersType = [];
if ($orderId) {
$parameters['orderId'] = $orderId;
}
if ($lastProcessedOrderId !== null) {
$parameters['lowestOrderId'] = $lastProcessedOrderId;
$parameters['highestOrderId'] = $lastProcessedOrderId + self::BATCH_SIZE;
$parametersType['lowestOrderId'] = \PDO::PARAM_INT;
$parametersType['highestOrderId'] = \PDO::PARAM_INT;
}
$usersEmails = $this->connection->executeQuery('
SELECT DISTINCT wppm.meta_value as email FROM `' . $wpdb->prefix . 'postmeta` wppm
JOIN `' . $wpdb->prefix . 'posts` p ON wppm.post_id = p.ID AND p.post_type = "shop_order"
WHERE wppm.meta_key = "_billing_email" AND wppm.meta_value != ""
' . ($orderId ? ' AND p.ID = :orderId' : $subQuery) . '
ORDER BY wppm.meta_value
LIMIT :batchSize
', $parameters, ['batchSize' => \PDO::PARAM_INT])->fetchAllAssociative();
$usersEmails = array_column($usersEmails, 'email');
$result = $this->connection->executeQuery("
SELECT wpp.id AS order_id, wppm.meta_value AS email
FROM `{$wpdb->posts}` wpp
JOIN `{$wpdb->postmeta}` wppm ON wpp.ID = wppm.post_id AND wppm.meta_key = '_billing_email' AND wppm.meta_value != ''
WHERE wpp.post_type = 'shop_order'
" . ($orderId ? ' AND wpp.ID = :orderId' : '') . "
" . ($lastProcessedOrderId !== null ? ' AND (wpp.ID > :lowestOrderId AND wpp.ID <= :highestOrderId)' : '') . "
ORDER BY wpp.id
", $parameters, $parametersType)->fetchAllAssociative();
$subscribersValues = [];
$insertedUsersEmails = [];
$now = (Carbon::createFromTimestamp($this->wp->currentTime('timestamp')))->format('Y-m-d H:i:s');
$source = Source::WOOCOMMERCE_USER;
foreach ($usersEmails as $email) {
if (!$validator->validateEmail($email)) {
$processedOrders = [];
foreach ($result as $item) {
if (!$validator->validateEmail($item['email'])) {
continue;
}
$insertedUsersEmails[] = $email;
// because data in result are sorted by id, we can replace the previous order id
$processedOrders[(string)$item['email']] = (int)$item['order_id'];
}
$subscribersValues = [];
$now = (Carbon::createFromTimestamp($this->wp->currentTime('timestamp')))->format('Y-m-d H:i:s');
$source = Source::WOOCOMMERCE_USER;
foreach ($processedOrders as $email => $orderId) {
$subscribersValues[] = "(1, '{$email}', '{$status}', '{$now}', '{$now}', '{$source}')";
}
@@ -296,25 +294,19 @@ class WooCommerce {
');
}
return $insertedUsersEmails;
return $processedOrders;
}
private function updateNames(array $emails): int {
/**
* @param array<string, int> $orders
*/
private function updateNames(array $orders): void {
global $wpdb;
if (!$emails) {
return 0;
if (!$orders) {
return;
}
$subscribersTable = $this->entityManager->getClassMetadata(SubscriberEntity::class)->getTableName();
// select latest order ID with emails
$postIdsResult = $this->connection->executeQuery("
SELECT MAX(post_id) AS post_id, meta_value AS email
FROM {$wpdb->postmeta}
WHERE meta_key = \"_billing_email\"
AND meta_value IN (:emails)
GROUP BY meta_value
", ['emails' => $emails], ['emails' => Connection::PARAM_STR_ARRAY])->fetchAllAssociative();
$subscribersData = array_combine(array_column($postIdsResult, 'post_id'), $postIdsResult);
$metaKeys = [
'_billing_first_name',
'_billing_last_name',
@@ -324,10 +316,15 @@ class WooCommerce {
FROM {$wpdb->postmeta}
WHERE meta_key IN (:metaKeys) AND post_id IN (:postIds)
",
['metaKeys' => $metaKeys, 'postIds' => array_column($postIdsResult, 'post_id')],
['metaKeys' => $metaKeys, 'postIds' => array_values($orders)],
['metaKeys' => Connection::PARAM_STR_ARRAY, 'postIds' => Connection::PARAM_INT_ARRAY]
)->fetchAllAssociative();
$subscribersData = [];
foreach ($orders as $email => $postId) {
$subscribersData[$postId]['email'] = $email;
}
foreach ($metaData as $row) {
if (!$row['meta_value']) {
continue;
@@ -335,18 +332,14 @@ class WooCommerce {
$subscribersData[$row['post_id']][$row['meta_key']] = $row['meta_value'];
}
$count = 0;
$now = (Carbon::now())->format('Y-m-d H:i:s');
foreach ($subscribersData as $subscriber) {
$data = [];
$data['is_woocommerce_synced'] = 1;
$data['woocommerce_synced_at'] = $now;
if (!empty($subscriber['_billing_first_name'])) $data['first_name'] = $subscriber['_billing_first_name'];
if (!empty($subscriber['_billing_last_name'])) $data['last_name'] = $subscriber['_billing_last_name'];
$this->connection->update($subscribersTable, $data, ['email' => $subscriber['email']]);
$count++;
}
return $count;
}
private function insertUsersToSegment(): void {

View File

@@ -9,19 +9,22 @@ use MailPoet\Test\DataFactories\ScheduledTask as ScheduledTaskFactory;
use MailPoet\WooCommerce\Helper as WooCommerceHelper;
use MailPoet\WP\Functions as WPFunctions;
use MailPoetVendor\Carbon\Carbon;
use MailPoetVendor\Doctrine\DBAL\Connection;
class WooCommerceSyncTest extends \MailPoetTest {
public $worker;
public $woocommerceHelper;
public $woocommerceSegment;
public $connection;
/** @var ScheduledTaskFactory */
private $scheduledTaskFactory;
public function _before() {
$this->woocommerceSegment = $this->createMock(WooCommerceSegment::class);
$this->woocommerceHelper = $this->createMock(WooCommerceHelper::class);
$this->connection = $this->createMock(Connection::class);
$this->scheduledTaskFactory = new ScheduledTaskFactory();
$this->worker = new WooCommerceSync($this->woocommerceSegment, $this->woocommerceHelper);
$this->worker = new WooCommerceSync($this->woocommerceSegment, $this->woocommerceHelper, $this->connection);
}
public function testItWillNotRunIfWooCommerceIsDisabled() {

View File

@@ -26,9 +26,6 @@ class WooCommerceTest extends \MailPoetTest {
private $userEmails = [];
/** @var SegmentEntity */
private $wooCommerceSegment;
/** @var WooCommerceSegment */
private $wooCommerce;