diff --git a/marketplace/settings.py b/marketplace/settings.py index ca3aa756..c2d0ad6f 100644 --- a/marketplace/settings.py +++ b/marketplace/settings.py @@ -11,15 +11,20 @@ """ import os -from pathlib import Path -from datetime import timedelta import urllib - import environ import sentry_sdk + +from pathlib import Path + +from datetime import timedelta + from sentry_sdk.integrations.django import DjangoIntegration + from corsheaders.defaults import default_headers +from celery.schedules import crontab + # Build paths inside the project like this: BASE_DIR / "subdir". BASE_DIR = Path(__file__).resolve().parent.parent @@ -385,6 +390,10 @@ seconds=env.int("SYNC_FACEBOOK_CATALOGS_TIME", default=5400) ), }, + "task-cleanup-vtex-logs-and-uploads": { + "task": "task_cleanup_vtex_logs_and_uploads", + "schedule": crontab(minute=0, hour=0), + }, } diff --git a/marketplace/wpp_products/migrations/0010_auto_20240515_1158.py b/marketplace/wpp_products/migrations/0010_auto_20240515_1158.py new file mode 100644 index 00000000..8dff8528 --- /dev/null +++ b/marketplace/wpp_products/migrations/0010_auto_20240515_1158.py @@ -0,0 +1,51 @@ +# Generated by Django 3.2.4 on 2024-05-15 14:58 + +from django.db import migrations, models +import django.db.models.deletion + + +class Migration(migrations.Migration): + dependencies = [ + ("applications", "0017_alter_app_platform"), + ("wpp_products", "0009_webhooklog"), + ] + + operations = [ + migrations.CreateModel( + name="ProductUploadLog", + fields=[ + ( + "id", + models.BigAutoField( + auto_created=True, + primary_key=True, + serialize=False, + verbose_name="ID", + ), + ), + ("sku_id", models.IntegerField()), + ("created_on", models.DateTimeField(auto_now=True)), + ( + "vtex_app", + models.ForeignKey( + blank=True, + limit_choices_to={"code": "vtex"}, + null=True, + on_delete=django.db.models.deletion.CASCADE, + related_name="vtex_product_upload_logs", + to="applications.app", + ), + ), + ], + ), + migrations.AddIndex( + model_name="productuploadlog", + index=models.Index(fields=["sku_id"], name="wpp_product_sku_id_7dda12_idx"), + ), + migrations.AddIndex( + model_name="productuploadlog", + index=models.Index( + fields=["created_on"], name="wpp_product_created_bdfb44_idx" + ), + ), + ] diff --git a/marketplace/wpp_products/models.py b/marketplace/wpp_products/models.py index 030b4848..66a22f3d 100644 --- a/marketplace/wpp_products/models.py +++ b/marketplace/wpp_products/models.py @@ -169,3 +169,22 @@ class WebhookLog(models.Model): null=True, limit_choices_to={"code": "vtex"}, ) + + +class ProductUploadLog(models.Model): + sku_id = models.IntegerField() + created_on = models.DateTimeField(auto_now=True) + vtex_app = models.ForeignKey( + App, + on_delete=models.CASCADE, + related_name="vtex_product_upload_logs", + blank=True, + null=True, + limit_choices_to={"code": "vtex"}, + ) + + class Meta: + indexes = [ + models.Index(fields=["sku_id"]), + models.Index(fields=["created_on"]), + ] diff --git a/marketplace/wpp_products/tasks.py b/marketplace/wpp_products/tasks.py index 52962bdc..8bc23acc 100644 --- a/marketplace/wpp_products/tasks.py +++ b/marketplace/wpp_products/tasks.py @@ -9,7 +9,12 @@ from marketplace.clients.facebook.client import FacebookClient from marketplace.services.vtex.exceptions import NoVTEXAppConfiguredException -from marketplace.wpp_products.models import Catalog, WebhookLog +from marketplace.wpp_products.models import ( + Catalog, + ProductUploadLog, + UploadProduct, + WebhookLog, +) from marketplace.clients.flows.client import FlowsClient from marketplace.celery import app as celery_app from marketplace.services.vtex.generic_service import ( @@ -312,3 +317,15 @@ def task_upload_vtex_products(**kwargs): print(f"Upload task for App: {app_vtex_uuid} is already in progress.") print(f"Processing upload for App: {app_vtex_uuid}") + + +@celery_app.task(name="task_cleanup_vtex_logs_and_uploads") +def task_cleanup_vtex_logs_and_uploads(): + # Delete all records from the ProductUploadLog and WebhookLog tables + ProductUploadLog.objects.all().delete() + WebhookLog.objects.all().delete() + + # Delete all UploadProduct records with "success" status + UploadProduct.objects.filter(status="success").delete() + + print("Logs and successful uploads have been cleaned up.") diff --git a/marketplace/wpp_products/utils.py b/marketplace/wpp_products/utils.py index c6582ad1..86aafb94 100644 --- a/marketplace/wpp_products/utils.py +++ b/marketplace/wpp_products/utils.py @@ -7,7 +7,7 @@ from django.db.models import QuerySet from marketplace.clients.facebook.client import FacebookClient -from marketplace.wpp_products.models import Catalog, UploadProduct +from marketplace.wpp_products.models import Catalog, ProductUploadLog, UploadProduct from marketplace.services.facebook.service import ( FacebookService, ) @@ -45,13 +45,19 @@ def process_and_upload( csv_content = self.product_manager.convert_to_csv(products) if self.send_to_meta(csv_content): self.product_manager.mark_products_as_sent(products_ids) - # Clear CSV buffer from memory - del csv_content + self.log_sent_products(products_ids) + + else: + self.product_manager.mark_products_as_error(products_ids) + + # Clear CSV buffer from memory + del csv_content redis_client.expire(lock_key, lock_expiration_time) except Exception as e: print(f"Error on 'process_and_upload': {e}") + self.product_manager.mark_products_as_error(products_ids) def send_to_meta(self, csv_content: io.BytesIO) -> bool: """Sends the CSV content to Meta and returns the upload status.""" @@ -83,6 +89,25 @@ def send_to_meta(self, csv_content: io.BytesIO) -> bool: print(f"Error sending data to Meta: {e}") return False + def log_sent_products(self, product_ids: List[str]): + """Logs the successfully sent products to the log table.""" + for product_id in product_ids: + # Extract SKU ID from "sku_id#seller_id" + sku_id = self.extract_sku_id(product_id) + ProductUploadLog.objects.create( + sku_id=sku_id, vtex_app=self.catalog.vtex_app + ) + + print(f"Logged {len(product_ids)} products as sent.") + + def extract_sku_id(self, product_id: str) -> int: + """Extract sku_id from facebook_product_id.""" + sku_part = product_id.split("#")[0] + if sku_part.isdigit(): + return int(sku_part) + else: + raise ValueError(f"Invalid SKU ID, error: {sku_part} is not a number") + class ProductUploadManager: def convert_to_csv(self, products: QuerySet, include_header=True) -> io.BytesIO: @@ -110,6 +135,13 @@ def mark_products_as_sent(self, product_ids: List[str]): print(f"{updated_count} products successfully marked as sent.") + def mark_products_as_error(self, product_ids: List[str]): + updated_count = UploadProduct.objects.filter( + facebook_product_id__in=product_ids, status="processing" + ).update(status="error") + + print(f"{updated_count} products marked as error.") + class ProductBatchFetcher(ProductUploadManager): def __init__(self, catalog, batch_size):