diff --git a/backend/onyx/background/celery/tasks/indexing/tasks.py b/backend/onyx/background/celery/tasks/indexing/tasks.py index 366f8cc23a4..efa46b3d96c 100644 --- a/backend/onyx/background/celery/tasks/indexing/tasks.py +++ b/backend/onyx/background/celery/tasks/indexing/tasks.py @@ -257,6 +257,18 @@ def check_for_indexing(self: Task, *, tenant_id: str | None) -> int | None: for cc_pair_id in cc_pair_ids: lock_beat.reacquire() + # debugging logic - remove after we're done + if ( + tenant_id == "tenant_i-043470d740845ec56" + or tenant_id == "tenant_82b497ce-88aa-4fbd-841a-92cae43529c8" + ): + logger.info( + f"check_for_indexing lock: " + f"tenant={tenant_id} " + f"cc_pair={cc_pair_id} " + f"ttl={redis_client.ttl(OnyxRedisLocks.CHECK_INDEXING_BEAT_LOCK)}" + ) + redis_connector = RedisConnector(tenant_id, cc_pair_id) with get_session_with_tenant(tenant_id) as db_session: search_settings_list: list[SearchSettings] = get_active_search_settings( diff --git a/backend/onyx/background/celery/tasks/vespa/tasks.py b/backend/onyx/background/celery/tasks/vespa/tasks.py index db9f20f77db..ce1040b2aff 100644 --- a/backend/onyx/background/celery/tasks/vespa/tasks.py +++ b/backend/onyx/background/celery/tasks/vespa/tasks.py @@ -218,6 +218,8 @@ def try_generate_stale_document_sync_tasks( total_tasks_generated = 0 cc_pairs = get_connector_credential_pairs(db_session) for cc_pair in cc_pairs: + lock_beat.reacquire() + rc = RedisConnectorCredentialPair(tenant_id, cc_pair.id) rc.set_skip_docs(docs_to_skip) result = rc.generate_tasks(celery_app, db_session, r, lock_beat, tenant_id) @@ -786,7 +788,7 @@ def monitor_vespa_sync(self: Task, tenant_id: str | None) -> bool: # print current queue lengths phase_start = time.monotonic() # we don't need every tenant polling redis for this info. - if not MULTI_TENANT or random.randint(1, 100) == 100: + if not MULTI_TENANT or random.randint(1, 10) == 10: r_celery = self.app.broker_connection().channel().client # type: ignore n_celery = celery_get_queue_length("celery", r_celery) n_indexing = celery_get_queue_length(