diff --git a/dlt/extract/resource.py b/dlt/extract/resource.py index c6ca1660f4..d3d5603d28 100644 --- a/dlt/extract/resource.py +++ b/dlt/extract/resource.py @@ -354,6 +354,7 @@ def add_limit(self: TDltResourceImpl, max_items: int) -> TDltResourceImpl: # no 1. Transformers won't be limited. They should process all the data they receive fully to avoid inconsistencies in generated datasets. 2. Each yielded item may contain several records. `add_limit` only limits the "number of yields", not the total number of records. 3. Async resources with a limit added may occasionally produce one item more than the limit on some runs. This behavior is not deterministic. + 4. Parallelized sync resources with a limit added will usually produce one item less than the limit. This behavior is not deterministic. Args: max_items (int): The maximum number of items to yield diff --git a/tests/pipeline/test_resources_evaluation.py b/tests/pipeline/test_resources_evaluation.py index 542d0209d6..937263081c 100644 --- a/tests/pipeline/test_resources_evaluation.py +++ b/tests/pipeline/test_resources_evaluation.py @@ -217,6 +217,19 @@ async def async_resource1(): assert len(result) == 13 +@pytest.mark.parametrize("parallelized", [False, True]) +def test_limit_sync_resource(parallelized: bool) -> None: + @dlt.resource(parallelized=parallelized) + def sync_resource1(): + for i in range(1, 10): + yield i + + limit = 5 + result = list(sync_resource1().add_limit(limit)) + allowed_result_range = range(limit - int(parallelized), limit + 1) + assert len(result) in allowed_result_range + + @pytest.mark.parametrize("parallelized", [True, False]) def test_parallelized_resource(parallelized: bool) -> None: os.environ["EXTRACT__NEXT_ITEM_MODE"] = "fifo"