Skip to content

Commit

Permalink
Saves uploads sent to Facebook and creates a task to clear logs
Browse files Browse the repository at this point in the history
  • Loading branch information
elitonzky committed May 15, 2024
1 parent cf2fb3c commit 26db2a5
Show file tree
Hide file tree
Showing 5 changed files with 135 additions and 7 deletions.
15 changes: 12 additions & 3 deletions marketplace/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,20 @@
"""

import os
from pathlib import Path
from datetime import timedelta
import urllib

import environ
import sentry_sdk

from pathlib import Path

from datetime import timedelta

from sentry_sdk.integrations.django import DjangoIntegration

from corsheaders.defaults import default_headers

from celery.schedules import crontab


# Build paths inside the project like this: BASE_DIR / "subdir".
BASE_DIR = Path(__file__).resolve().parent.parent
Expand Down Expand Up @@ -385,6 +390,10 @@
seconds=env.int("SYNC_FACEBOOK_CATALOGS_TIME", default=5400)
),
},
"task-cleanup-vtex-logs-and-uploads": {
"task": "task_cleanup_vtex_logs_and_uploads",
"schedule": crontab(minute=0, hour=0),
},
}


Expand Down
51 changes: 51 additions & 0 deletions marketplace/wpp_products/migrations/0010_auto_20240515_1158.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
# Generated by Django 3.2.4 on 2024-05-15 14:58

from django.db import migrations, models
import django.db.models.deletion


class Migration(migrations.Migration):
dependencies = [
("applications", "0017_alter_app_platform"),
("wpp_products", "0009_webhooklog"),
]

operations = [
migrations.CreateModel(
name="ProductUploadLog",
fields=[
(
"id",
models.BigAutoField(
auto_created=True,
primary_key=True,
serialize=False,
verbose_name="ID",
),
),
("sku_id", models.IntegerField()),
("created_on", models.DateTimeField(auto_now=True)),
(
"vtex_app",
models.ForeignKey(
blank=True,
limit_choices_to={"code": "vtex"},
null=True,
on_delete=django.db.models.deletion.CASCADE,
related_name="vtex_product_upload_logs",
to="applications.app",
),
),
],
),
migrations.AddIndex(
model_name="productuploadlog",
index=models.Index(fields=["sku_id"], name="wpp_product_sku_id_7dda12_idx"),
),
migrations.AddIndex(
model_name="productuploadlog",
index=models.Index(
fields=["created_on"], name="wpp_product_created_bdfb44_idx"
),
),
]
19 changes: 19 additions & 0 deletions marketplace/wpp_products/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,3 +169,22 @@ class WebhookLog(models.Model):
null=True,
limit_choices_to={"code": "vtex"},
)


class ProductUploadLog(models.Model):
sku_id = models.IntegerField()
created_on = models.DateTimeField(auto_now=True)
vtex_app = models.ForeignKey(
App,
on_delete=models.CASCADE,
related_name="vtex_product_upload_logs",
blank=True,
null=True,
limit_choices_to={"code": "vtex"},
)

class Meta:
indexes = [
models.Index(fields=["sku_id"]),
models.Index(fields=["created_on"]),
]
19 changes: 18 additions & 1 deletion marketplace/wpp_products/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,12 @@
from marketplace.clients.facebook.client import FacebookClient
from marketplace.services.vtex.exceptions import NoVTEXAppConfiguredException

from marketplace.wpp_products.models import Catalog, WebhookLog
from marketplace.wpp_products.models import (
Catalog,
ProductUploadLog,
UploadProduct,
WebhookLog,
)
from marketplace.clients.flows.client import FlowsClient
from marketplace.celery import app as celery_app
from marketplace.services.vtex.generic_service import (
Expand Down Expand Up @@ -312,3 +317,15 @@ def task_upload_vtex_products(**kwargs):
print(f"Upload task for App: {app_vtex_uuid} is already in progress.")

print(f"Processing upload for App: {app_vtex_uuid}")


@celery_app.task(name="task_cleanup_vtex_logs_and_uploads")
def task_cleanup_vtex_logs_and_uploads():
# Delete all records from the ProductUploadLog and WebhookLog tables
ProductUploadLog.objects.all().delete()
WebhookLog.objects.all().delete()

# Delete all UploadProduct records with "success" status
UploadProduct.objects.filter(status="success").delete()

print("Logs and successful uploads have been cleaned up.")
38 changes: 35 additions & 3 deletions marketplace/wpp_products/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from django.db.models import QuerySet

from marketplace.clients.facebook.client import FacebookClient
from marketplace.wpp_products.models import Catalog, UploadProduct
from marketplace.wpp_products.models import Catalog, ProductUploadLog, UploadProduct
from marketplace.services.facebook.service import (
FacebookService,
)
Expand Down Expand Up @@ -45,13 +45,19 @@ def process_and_upload(
csv_content = self.product_manager.convert_to_csv(products)
if self.send_to_meta(csv_content):
self.product_manager.mark_products_as_sent(products_ids)
# Clear CSV buffer from memory
del csv_content
self.log_sent_products(products_ids)

else:
self.product_manager.mark_products_as_error(products_ids)

# Clear CSV buffer from memory
del csv_content

redis_client.expire(lock_key, lock_expiration_time)

except Exception as e:
print(f"Error on 'process_and_upload': {e}")
self.product_manager.mark_products_as_error(products_ids)

def send_to_meta(self, csv_content: io.BytesIO) -> bool:
"""Sends the CSV content to Meta and returns the upload status."""
Expand Down Expand Up @@ -83,6 +89,25 @@ def send_to_meta(self, csv_content: io.BytesIO) -> bool:
print(f"Error sending data to Meta: {e}")
return False

def log_sent_products(self, product_ids: List[str]):
"""Logs the successfully sent products to the log table."""
for product_id in product_ids:
# Extract SKU ID from "sku_id#seller_id"
sku_id = self.extract_sku_id(product_id)
ProductUploadLog.objects.create(
sku_id=sku_id, vtex_app=self.catalog.vtex_app
)

print(f"Logged {len(product_ids)} products as sent.")

def extract_sku_id(self, product_id: str) -> int:
"""Extract sku_id from facebook_product_id."""
sku_part = product_id.split("#")[0]
if sku_part.isdigit():
return int(sku_part)
else:
raise ValueError(f"Invalid SKU ID, error: {sku_part} is not a number")


class ProductUploadManager:
def convert_to_csv(self, products: QuerySet, include_header=True) -> io.BytesIO:
Expand Down Expand Up @@ -110,6 +135,13 @@ def mark_products_as_sent(self, product_ids: List[str]):

print(f"{updated_count} products successfully marked as sent.")

def mark_products_as_error(self, product_ids: List[str]):
updated_count = UploadProduct.objects.filter(
facebook_product_id__in=product_ids, status="processing"
).update(status="error")

print(f"{updated_count} products marked as error.")


class ProductBatchFetcher(ProductUploadManager):
def __init__(self, catalog, batch_size):
Expand Down

0 comments on commit 26db2a5

Please sign in to comment.