Skip to content

Commit

Permalink
feat: add ALLOWED_ORGS to limit the orgs that will be run
Browse files Browse the repository at this point in the history
  • Loading branch information
helllllllder committed May 28, 2024
1 parent 36aea8a commit 5344a7a
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 12 deletions.
10 changes: 10 additions & 0 deletions db/redis/connection.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
import redis
import settings

pool = redis.ConnectionPool.from_url(
settings.REDIS_URL, max_connections=2, decode_responses=True
)


def get_connection():
return redis.Redis(connection_pool=pool)
Empty file removed db/redis/connections.py
Empty file.
22 changes: 12 additions & 10 deletions flowrun/storage/postgresql.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,18 +31,19 @@ def list_by_timestamp_and_org(
return flowrun_query


get_active_orgs = "SELECT id FROM orgs_org WHERE is_active = TRUE"
get_active_orgs = "SELECT id FROM orgs_org WHERE is_active = TRUE "
org_query_attrs = []
if settings.IS_LAST_ORG_BATCH:
get_active_orgs += "AND id > (%s)"
org_query_attrs = [
settings.ORG_RANGE_FROM,
]
else:
get_active_orgs += "AND id BETWEEN (%s) AND (%s)"
org_query_attrs = [settings.ORG_RANGE_FROM, settings.ORG_RANGE_TO]

get_active_orgs += "order by id;"
if settings.ALLOWED_ORGS:
get_active_orgs += "AND proj_uuid IN %s "
org_query_attrs.append(settings.ALLOWED_ORGS)

# Commented to be used in the next version V5
# if settings.IS_LAST_ORG_BATCH:
# get_active_orgs += "AND id > (%s)"
# org_query_attrs.append(settings.ORG_RANGE_FROM)

get_active_orgs += "order by id;" # FETCH FIRST (%s) ROWS ONLY;"


class OrgPostgreSQL(BaseRetrieveStorage):
Expand All @@ -51,5 +52,6 @@ def list_active(self) -> list[dict]:
flowrun_query = cur.execute(
get_active_orgs,
*org_query_attrs,
# settings.ORGS_BATCH_SIZE,
).fetchall()
return flowrun_query
5 changes: 4 additions & 1 deletion run.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import settings

# from db.redis.connection import get_connection as get_redis_connection
from shared.processors import BulkObjectETLProcessor

from flowrun.storage.elasticsearch import FlowRunElasticSearch
Expand Down Expand Up @@ -53,7 +54,9 @@ def main():
dsn=settings.SENTRY_DSN,
enable_tracing=True,
)

# redis = get_redis_connection()
# batch_position = redis.get(settings.REDIS_BATCH_POSITION)
# if batch_position
logging.info("Service running on bulk process single thread mode")
bulk_process()

Expand Down
7 changes: 6 additions & 1 deletion settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,17 @@
FLOW_RUN_BATCH_LIMIT = int(os.environ.get("FLOW_RUN_BATCH_LIMIT", 100))
EMPTY_ORG_SLEEP = float(os.environ.get("EMPTY_ORG_SLEEP", 0.1))
BATCH_PROCESSING_TIME_LIMIT = int(os.environ.get("BATCH_PROCESSING_TIME_LIMIT", 30))
START_RUN_OFFSET = int(os.environ.get("START_RUN_OFFSET", 30))
START_RUN_OFFSET = int(os.environ.get("START_RUN_OFFSET", 60))
FLOW_LAST_INDEXED_FIELD = os.environ.get("FLOW_LAST_INDEXED_FIELD", "modified_on")
ORG_RANGE_FROM = int(os.environ.get("ORG_RANGE_FROM", 0))
ORG_RANGE_TO = int(os.environ.get("ORG_RANGE_TO", 2000))
IS_LAST_ORG_BATCH = bool(int(os.environ.get("IS_LAST_ORG_BATCH", "0")))

if os.environ.get("ALLOWED_ORGS", "") != "":
ALLOWED_ORGS = tuple(org for org in os.environ.get("ALLOWED_ORGS").split(","))
else:
ALLOWED_ORGS = ()

CONNECTION_TYPE = os.environ.get("CONNECTION_TYPE", "pool")
WAIT_TIME_RETRY = int(os.environ.get("WAIT_TIME_RETRY", 10))

Expand Down

0 comments on commit 5344a7a

Please sign in to comment.