Skip to content

Commit

Permalink
feat: save updates products on redis queue (#586)
Browse files Browse the repository at this point in the history
* feat: save updates products on redis queue

* fix: remove unnecessary imports

* fix: new configured apps start as v2
  • Loading branch information
elitonzky authored Dec 12, 2024
1 parent 6c79553 commit 39c03e1
Show file tree
Hide file tree
Showing 4 changed files with 166 additions and 37 deletions.
8 changes: 5 additions & 3 deletions marketplace/services/vtex/generic_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ def configure(
"currency_pt_br",
"unifies_id_with_seller",
]
app.config["use_sync_v2"] = True
app.config["store_domain"] = store_domain
app.configured = True
app.save()
Expand Down Expand Up @@ -170,7 +171,8 @@ def __init__(
api_credentials: APICredentials,
catalog: Catalog,
skus_ids: list,
webhook: dict,
webhook: Optional[dict] = None,
sellers_ids: list[str] = None,
product_feed: Optional[ProductFeed] = None,
):
"""
Expand All @@ -183,6 +185,7 @@ def __init__(
self.product_feed = product_feed
self.app = self.catalog.app
self.webhook = webhook
self.sellers_ids = sellers_ids if sellers_ids else []
self.product_manager = ProductFacebookManager()

def webhook_product_insert(self):
Expand Down Expand Up @@ -230,13 +233,12 @@ def process_batch_sync(self):
pvt_service = self.get_private_service(
self.api_credentials.app_key, self.api_credentials.app_token
)
seller_ids = self._get_sellers_ids(pvt_service)

# Fetch product data
products_dto = pvt_service.update_webhook_product_info(
domain=self.api_credentials.domain,
skus_ids=self.skus_ids,
seller_ids=seller_ids,
seller_ids=self.sellers_ids,
catalog=self.catalog,
)
if not products_dto:
Expand Down
2 changes: 1 addition & 1 deletion marketplace/services/vtex/utils/data_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ def process_single_sku(self, sku_id):

# Define the sellers to be synchronized
sellers_to_sync = []
if self.use_sku_sellers:
if self.use_sku_sellers and not self.update_product:
sku_sellers = product_details.get("SkuSellers")
for seller in sku_sellers:
seller_id = seller.get("SellerId")
Expand Down
148 changes: 115 additions & 33 deletions marketplace/wpp_products/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

from celery import shared_task

from django_redis import get_redis_connection
from django.db import reset_queries, close_old_connections
from django.db.models import Exists, OuterRef
from django.core.cache import cache
Expand All @@ -29,11 +30,10 @@
from marketplace.services.vtex.generic_service import APICredentials
from marketplace.applications.models import App

from django_redis import get_redis_connection

from marketplace.wpp_products.utils import (
ProductBatchUploader,
ProductUploader,
RedisQueue,
SellerSyncUtils,
UploadManager,
ProductSyncMetaPolices,
Expand Down Expand Up @@ -425,10 +425,24 @@ def send_sync(app_uuid: str, webhook: dict):
celery_queue = app.config.get("celery_queue_name", "product_synchronization")

if use_sync_v2:
logger.info(f"App {app_uuid} uses Sync v2. Forwarding to batch update task.")
logger.info(f"App {app_uuid} uses Sync v2. Enqueuing for batch update.")

# Extract seller_id from webhook
seller_id = _extract_sellers_ids(webhook)
if not seller_id:
raise ValueError(f"Seller ID not found in webhook. App:{str(app.uuid)}")

# Enqueue the seller and SKU in the task_enqueue_webhook
celery_app.send_task(
"task_update_batch_products",
kwargs={"app_uuid": app_uuid, "webhook": webhook},
"task_enqueue_webhook",
kwargs={"app_uuid": app_uuid, "seller": seller_id, "sku_id": sku_id},
queue=celery_queue,
ignore_result=True,
)
# Dequeue
celery_app.send_task(
"task_dequeue_webhooks",
kwargs={"app_uuid": app_uuid, "celery_queue": celery_queue},
queue=celery_queue,
ignore_result=True,
)
Expand Down Expand Up @@ -544,39 +558,34 @@ def task_sync_product_policies():


@celery_app.task(name="task_update_batch_products")
def task_update_batch_products(**kwargs):
def task_update_batch_products(app_uuid: str, seller: str, sku_id: str):
"""
Processes product updates for a VTEX app based on a webhook.
Processes product updates for a VTEX app based on a seller and SKU.
"""
start_time = datetime.now()
vtex_base_service = VtexServiceBase()

app_uuid = kwargs.get("app_uuid")
webhook = kwargs.get("webhook")

sku_id = webhook.get("IdSku")
seller_an = webhook.get("An")
seller_chain = webhook.get("SellerChain")

try:
logger.info(
f"Processing product update for App UUID: {app_uuid}, SKU_ID: {sku_id}. "
f"'An': {seller_an}, 'SellerChain': {seller_chain}."
f"Processing product update for App UUID: {app_uuid}, SKU_ID: {sku_id}, Seller: {seller}."
)

# Fetch app configuration from cache or database
cache_key = f"app_cache_{app_uuid}"
vtex_app = cache.get(cache_key)
if not vtex_app:
vtex_app = App.objects.get(uuid=app_uuid, configured=True, code="vtex")
cache.set(cache_key, vtex_app, timeout=300)

# Ensure the app is configured for synchronization
# Ensure synchronization is enabled for the app
if not vtex_app.config.get("initial_sync_completed", False):
logger.info(f"Initial sync not completed for App: {app_uuid}. Task ending.")
return

# Get VTEX credentials
api_credentials = vtex_base_service.get_vtex_credentials_or_raise(vtex_app)

# Fetch catalog
catalog = vtex_app.vtex_catalogs.first()
if not catalog:
logger.info(f"No catalog found for VTEX app: {vtex_app.uuid}")
Expand All @@ -587,35 +596,108 @@ def task_update_batch_products(**kwargs):
api_credentials=api_credentials,
catalog=catalog,
skus_ids=[sku_id],
webhook=webhook,
sellers_ids=[seller],
)

# Process products
# Process product updates in batch
products = vtex_update_service.process_batch_sync()
if products is None:
logger.info(
f"No products to process for VTEX app: {app_uuid}. Task ending."
f"No products to process for App: {app_uuid}, SKU: {sku_id}. Task ending."
)
return

# Log webhook
# Log webhook after successful processing
close_old_connections()
WebhookLog.objects.create(sku_id=sku_id, data=webhook, vtex_app=vtex_app)
WebhookLog.objects.create(
sku_id=sku_id, data={"IdSku": sku_id, "An": seller}, vtex_app=vtex_app
)

except Exception as e:
logger.error(
f"An error occurred during the updating Webhook VTEX products for App: {app_uuid}, {e}"
f"An error occurred during the processing of SKU: {sku_id} for App: {app_uuid}. Error: {e}"
)

# Log task duration
end_time = datetime.now()
duration = (end_time - start_time).total_seconds()
minutes, seconds = divmod(duration, 60)
finally:
# Log task duration
end_time = datetime.now()
duration = (end_time - start_time).total_seconds()
minutes, seconds = divmod(duration, 60)

logger.info(
f"Finished processing update for SKU: {sku_id}, App: {app_uuid}. "
f"Task completed in {int(minutes)} minutes and {int(seconds)} seconds."
)
logger.info(
f"Finished processing update for SKU: {sku_id}, App: {app_uuid}. "
f"Task completed in {int(minutes)} minutes and {int(seconds)} seconds."
)

# Start upload task
UploadManager.check_and_start_upload(app_uuid)
# Start upload task
UploadManager.check_and_start_upload(app_uuid)


@celery_app.task(name="task_enqueue_webhook")
def task_enqueue_webhook(app_uuid: str, seller: str, sku_id: str):
    """
    Enqueues the seller and SKU in Redis for batch processing.

    The pair is stored as a single ``"<seller>#<sku_id>"`` string in the
    per-app queue ``webhook_queue:<app_uuid>``. Any failure is logged and
    swallowed so the Celery task itself never raises.

    Args:
        app_uuid: UUID of the app; used to build the queue key.
        seller: Seller identifier taken from the webhook.
        sku_id: SKU identifier taken from the webhook.
    """
    try:
        queue = RedisQueue(f"webhook_queue:{app_uuid}")
        value = f"{seller}#{sku_id}"

        # insert() skips values that are already waiting in the queue
        queue.insert(value)

        # Use the module logger (not print) so the message follows the same
        # logging configuration as every other task in this module.
        logger.info(
            f"Webhook enqueued for App: {app_uuid}, Item: {value}, Total Enqueue: {queue.length()}"
        )
    except Exception as e:
        logger.error(f"Failed to enqueue webhook for App: {app_uuid}, {e}")


@celery_app.task(name="task_dequeue_webhooks")
def task_dequeue_webhooks(app_uuid: str, celery_queue: str):
    """
    Dequeues webhooks from Redis and dispatches them to `task_update_batch_products`.

    A Redis key ``lock:webhook_queue:<app_uuid>`` is used as a lock so that only
    one drain loop runs per app at a time; if the lock is already held the task
    returns immediately. The lock is deleted when the loop finishes or fails.

    Args:
        app_uuid: UUID of the app whose queue should be drained.
        celery_queue: Name of the Celery queue the update tasks are sent to.
    """
    queue_key = f"webhook_queue:{app_uuid}"
    queue = RedisQueue(queue_key)
    lock_key = f"lock:{queue_key}"
    redis = queue.redis

    lock_ttl_seconds = 60 * 5  # Lock expires in 5 minutes

    # Attempt to acquire the lock
    if not redis.set(lock_key, "locked", nx=True, ex=lock_ttl_seconds):
        logger.info(f"Task already running for App: {app_uuid}. Skipping dequeue.")
        return

    try:
        logger.info(
            f"Starting dequeue process for App: {app_uuid}. Total items: {queue.length()}"
        )

        while True:
            # Renew the lock to ensure we don't lose it while processing
            redis.expire(lock_key, lock_ttl_seconds)

            item = queue.remove()  # Fetch the next item from the queue
            if not item:
                break  # Exit when the queue is empty

            # Items are written as "<seller>#<sku_id>". Split on the LAST '#'
            # so a seller id that itself contains '#' still parses
            # (assumes SKU ids never contain '#' - TODO confirm).
            seller, separator, sku_id = item.rpartition("#")
            if not separator or not seller or not sku_id:
                # The item is already removed from the queue; skip it instead
                # of raising, which would abort the loop and strand the
                # remaining items.
                logger.error(
                    f"Discarding malformed queue item for App: {app_uuid}, Item: {item!r}"
                )
                continue

            celery_app.send_task(
                "task_update_batch_products",
                kwargs={
                    "app_uuid": app_uuid,
                    "seller": seller,
                    "sku_id": sku_id,
                },
                queue=celery_queue,
                ignore_result=True,
            )
            logger.info(f"Dispatched task for Seller: {seller}, SKU: {sku_id}")

    except Exception as e:
        logger.error(f"Error during dequeue process for App: {app_uuid}, {e}")
    finally:
        # Release the lock at the end of the task
        redis.delete(lock_key)
45 changes: 45 additions & 0 deletions marketplace/wpp_products/utils.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import io
import logging
import json
import time

from typing import List, Dict, Any

Expand Down Expand Up @@ -547,3 +548,47 @@ def log_sent_products(self, product_ids: List[str]):
sku_id=sku_id, vtex_app=self.catalog.vtex_app
)
print(f"Logged {len(product_ids)} products as sent.")


class RedisQueue:
    """De-duplicated FIFO queue stored in a Redis sorted set (ZSET).

    Each member is scored with the time it was inserted, so the member with
    the lowest score is the oldest one. A value that is already waiting in
    the queue is not inserted again.
    """

    # Seconds the queue key stays alive after the latest successful insert.
    QUEUE_TTL_SECONDS = 3600 * 24  # 24 hours

    def __init__(self, queue_key):
        self.queue_key = queue_key
        self.redis = get_redis_connection()

    @staticmethod
    def _to_text(member):
        """Return *member* as ``str``, decoding UTF-8 when the client returns bytes."""
        return member.decode("utf-8") if isinstance(member, bytes) else member

    def insert(self, value):
        """Add an item to the ZSET queue with a timestamp score.

        Returns True when the item was added, False when it was already queued.
        """
        # ZADD with NX only creates new members and reports how many were
        # added, so the existence check and the insert are one command rather
        # than a ZSCORE followed by a ZADD.
        added = self.redis.zadd(self.queue_key, {value: time.time()}, nx=True)
        if not added:
            print(value, "already exists")
            return False  # Skip insertion if it exists

        self.redis.expire(self.queue_key, self.QUEUE_TTL_SECONDS)
        return True

    def remove(self):
        """Remove and return the first item from the queue (FIFO), or None if empty."""
        # ZPOPMIN fetches and deletes the lowest-scored member in a single
        # command, so two consumers cannot both receive the same item.
        popped = self.redis.zpopmin(self.queue_key)
        if not popped:
            return None
        member, _score = popped[0]
        return self._to_text(member)

    def order(self):
        """List all items in the queue in order."""
        items = self.redis.zrange(self.queue_key, 0, -1, withscores=False)
        return [self._to_text(item) for item in items]

    def length(self):
        """Returns the total number of items in the queue."""
        return self.redis.zcard(self.queue_key)

    def get_batch(self, batch_size):
        """Remove and return up to *batch_size* of the oldest items.

        A non-positive *batch_size* returns an empty list and leaves the queue
        untouched (a ZRANGE with stop index ``batch_size - 1 == -1`` would
        otherwise select the whole queue).
        """
        if batch_size <= 0:
            return []
        popped = self.redis.zpopmin(self.queue_key, batch_size)
        return [self._to_text(member) for member, _score in popped]

0 comments on commit 39c03e1

Please sign in to comment.