Skip to content

Commit

Permalink
Removes redundant calls to raise_on_failed_jobs() in entire test suit…
Browse files Browse the repository at this point in the history
…e. Refactors tests where necessary.
  • Loading branch information
willi-mueller committed Sep 9, 2024
1 parent 2606217 commit 864f642
Show file tree
Hide file tree
Showing 22 changed files with 79 additions and 114 deletions.
4 changes: 3 additions & 1 deletion dlt/helpers/airflow_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,9 @@ def _run(

if self.abort_task_if_any_job_failed is not None:
dlt.config["load.raise_on_failed_jobs"] = self.abort_task_if_any_job_failed
logger.info("Set load.abort_task_if_any_job_failed to {self.abort_task_if_any_job_failed}")
logger.info(
"Set load.abort_task_if_any_job_failed to {self.abort_task_if_any_job_failed}"
)

if self.log_progress_period > 0 and task_pipeline.collector == NULL_COLLECTOR:
task_pipeline.collector = log(log_period=self.log_progress_period, logger=logger.LOGGER)
Expand Down
2 changes: 1 addition & 1 deletion docs/examples/chess_production/chess_production.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,4 +167,4 @@ def load_data_with_retry(pipeline, data):
)
# get data for a few famous players
data = chess(max_players=MAX_PLAYERS)
load_info = load_data_with_retry(pipeline, data)
load_data_with_retry(pipeline, data)
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,3 @@ def bigquery_insert(
load_info = pipeline.run(resource(url=OWID_DISASTERS_URL))

print(load_info)

# make sure nothing failed
load_info.raise_on_failed_jobs()
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,8 @@
keywords: [parent child relationship, parent key]
---
This example demonstrates handling data with parent-child relationships using
the `dlt` library. You learn how to integrate specific fields (e.g., primary,
foreign keys) from a parent record into each child record.
This example demonstrates handling data with parent-child relationships using the `dlt` library.
You learn how to integrate specific fields (e.g., primary, foreign keys) from a parent record into each child record.
In this example, we'll explore how to:
Expand Down
2 changes: 1 addition & 1 deletion docs/website/docs/running-in-production/running.md
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ def check(ex: Exception):
If any job in the package **fails terminally** it will be moved to `failed_jobs` folder and assigned
such status.
By default, **an exceptions is raised** and on the first failed job, the load package will be aborted with `LoadClientJobFailed` (terminal exception).
Such package will be completed but its load id is not added to the `_dlt_loads` table.
Such package will be completed but its load id is not added to the `_dlt_loads` table.
All the jobs that were running in parallel are completed before raising. The dlt state, if present, will not be visible to `dlt`.
Here is an example `config.toml` to disable this behavior:

Expand Down
1 change: 0 additions & 1 deletion tests/destinations/test_custom_destination.py
Original file line number Diff line number Diff line change
Expand Up @@ -507,7 +507,6 @@ def sink_func_with_spec(
[1, 2, 3], table_name="items"
)


# check destination with additional config params
@dlt.destination(spec=MyDestinationSpec)
def sink_func_with_spec_and_additional_params(
Expand Down
22 changes: 9 additions & 13 deletions tests/extract/test_incremental.py
Original file line number Diff line number Diff line change
Expand Up @@ -207,8 +207,8 @@ def some_data(created_at=dlt.sources.incremental("created_at")):
pipeline_name=uniq_id(),
destination=dlt.destinations.duckdb(credentials=duckdb.connect(":memory:")),
)
p.run(some_data()).raise_on_failed_jobs()
p.run(some_data()).raise_on_failed_jobs()
p.run(some_data())
p.run(some_data())

with p.sql_client() as c:
with c.execute_query("SELECT created_at, id FROM some_data order by created_at, id") as cur:
Expand Down Expand Up @@ -248,8 +248,8 @@ def some_data(created_at=dlt.sources.incremental("created_at")):
pipeline_name=uniq_id(),
destination=dlt.destinations.duckdb(credentials=duckdb.connect(":memory:")),
)
p.run(some_data()).raise_on_failed_jobs()
p.run(some_data()).raise_on_failed_jobs()
p.run(some_data())
p.run(some_data())

with p.sql_client() as c:
with c.execute_query("SELECT created_at, id FROM some_data order by created_at, id") as cur:
Expand Down Expand Up @@ -455,7 +455,7 @@ def some_data(created_at=dlt.sources.incremental("created_at")):
pipeline_name=uniq_id(),
destination=dlt.destinations.duckdb(credentials=duckdb.connect(":memory:")),
)
p.run(some_data()).raise_on_failed_jobs()
p.run(some_data())

with p.sql_client() as c:
with c.execute_query(
Expand Down Expand Up @@ -1325,12 +1325,11 @@ def some_data(
):
yield from source_items

info = p.run(some_data())
info.raise_on_failed_jobs()
p.run(some_data())
norm_info = p.last_trace.last_normalize_info
assert norm_info.row_counts["some_data"] == 20
# load incrementally
info = p.run(some_data())
p.run(some_data())
norm_info = p.last_trace.last_normalize_info
assert "some_data" not in norm_info.row_counts

Expand Down Expand Up @@ -2560,11 +2559,8 @@ def test_source():
pip_1_name = "test_pydantic_columns_validator_" + uniq_id()
pipeline = dlt.pipeline(pipeline_name=pip_1_name, destination="duckdb")

info = pipeline.run(test_source())
info.raise_on_failed_jobs()

info = pipeline.run(test_source_incremental())
info.raise_on_failed_jobs()
pipeline.run(test_source())
pipeline.run(test_source_incremental())

# verify that right steps are at right place
steps = test_source().table_name._pipe._steps
Expand Down
4 changes: 1 addition & 3 deletions tests/load/bigquery/test_bigquery_table_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
drop_active_pipeline_data,
TABLE_UPDATE,
sequence_generator,
empty_schema,
)

# mark all tests as essential, do not remove
Expand Down Expand Up @@ -1019,8 +1018,7 @@ def sources() -> List[DltResource]:
dlt.resource([{"col2": "ABC"}], name="hints"),
table_description="Once upon a time a small table got hinted twice.",
)
info = pipeline.run(mod_hints)
info.raise_on_failed_jobs()
pipeline.run(mod_hints)
assert pipeline.last_trace.last_normalize_info.row_counts["hints"] == 1

with pipeline.sql_client() as c:
Expand Down
1 change: 0 additions & 1 deletion tests/load/pipeline/test_csv_loading.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,6 @@ def test_custom_csv_no_header(
table_name="no_header",
loader_file_format=file_format,
)
info.raise_on_failed_jobs()
print(info)
assert_only_table_columns(pipeline, "no_header", [col["name"] for col in columns])
rows = load_tables_to_dicts(pipeline, "no_header")
Expand Down
4 changes: 2 additions & 2 deletions tests/load/pipeline/test_drop.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import os
from typing import Any, Iterator, Dict, Any, List
from typing import Iterator, Dict, Any, List
from unittest import mock
from itertools import chain

Expand Down Expand Up @@ -355,7 +355,7 @@ def test_run_pipeline_after_partial_drop(destination_config: DestinationTestConf

attached.extract(droppable_source()) # TODO: individual steps cause pipeline.run() never raises
attached.normalize()
attached.load(raise_on_failed_jobs=True)
attached.load()


@pytest.mark.parametrize(
Expand Down
22 changes: 8 additions & 14 deletions tests/load/pipeline/test_duckdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,17 +30,15 @@ def test_duck_case_names(destination_config: DestinationTestConfiguration) -> No
os.environ["SCHEMA__NAMING"] = "duck_case"
pipeline = destination_config.setup_pipeline("test_duck_case_names")
# create tables and columns with emojis and other special characters
info = pipeline.run(
pipeline.run(
airtable_emojis().with_resources("📆 Schedule", "🦚Peacock", "🦚WidePeacock"),
loader_file_format=destination_config.file_format,
)
info.raise_on_failed_jobs()
info = pipeline.run(
pipeline.run(
[{"🐾Feet": 2, "1+1": "two", "\nhey": "value"}],
table_name="🦚Peacocks🦚",
loader_file_format=destination_config.file_format,
)
info.raise_on_failed_jobs()
table_counts = load_table_counts(
pipeline, *[t["name"] for t in pipeline.default_schema.data_tables()]
)
Expand Down Expand Up @@ -103,13 +101,12 @@ def test_duck_precision_types(destination_config: DestinationTestConfiguration)
"col5_int": 2**64 // 2 - 1,
}
]
info = pipeline.run(
pipeline.run(
row,
table_name="row",
loader_file_format=destination_config.file_format,
columns=TABLE_UPDATE_ALL_TIMESTAMP_PRECISIONS + TABLE_UPDATE_ALL_INT_PRECISIONS,
)
info.raise_on_failed_jobs()

with pipeline.sql_client() as client:
table = client.native_connection.sql("SELECT * FROM row").arrow()
Expand Down Expand Up @@ -163,14 +160,13 @@ class EventV1(BaseModel):

event = {"ver": 1, "id": "id1", "details": {"detail_id": "detail_1", "is_complete": False}}

info = pipeline.run(
pipeline.run(
[event],
table_name="events",
columns=EventV1,
loader_file_format="parquet",
schema_contract="evolve",
)
info.raise_on_failed_jobs()
print(pipeline.default_schema.to_pretty_yaml())

# we will use a different pipeline with a separate schema but writing to the same dataset and to the same table
Expand All @@ -196,14 +192,13 @@ class EventV2(BaseModel):
"test_new_nested_prop_parquet_2", dataset_name="test_dataset"
)
pipeline.destination = duck_factory # type: ignore
info = pipeline.run(
pipeline.run(
[event],
table_name="events",
columns=EventV2,
loader_file_format="parquet",
schema_contract="evolve",
)
info.raise_on_failed_jobs()
print(pipeline.default_schema.to_pretty_yaml())


Expand All @@ -216,8 +211,7 @@ def test_jsonl_reader(destination_config: DestinationTestConfiguration) -> None:
pipeline = destination_config.setup_pipeline("test_jsonl_reader")

data = [{"a": 1, "b": 2}, {"a": 1}]
info = pipeline.run(data, table_name="data", loader_file_format="jsonl")
info.raise_on_failed_jobs()
pipeline.run(data, table_name="data", loader_file_format="jsonl")


@pytest.mark.parametrize(
Expand All @@ -242,8 +236,8 @@ def _get_shuffled_events(repeat: int = 1):

pipeline = destination_config.setup_pipeline("test_provoke_parallel_parquet_same_table")

info = pipeline.run(_get_shuffled_events(50))
info.raise_on_failed_jobs()
pipeline.run(_get_shuffled_events(50))

assert_data_table_counts(
pipeline,
expected_counts={
Expand Down
5 changes: 0 additions & 5 deletions tests/load/pipeline/test_filesystem_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ def test_pipeline_merge_write_disposition(default_buckets_env: str) -> None:
"""Run pipeline twice with merge write disposition
Regardless wether primary key is set or not, filesystem appends
"""
import pyarrow.parquet as pq # Module is evaluated by other tests

os.environ["DATA_WRITER__DISABLE_COMPRESSION"] = "True"

Expand Down Expand Up @@ -102,7 +101,6 @@ def test_pipeline_csv_filesystem_destination(item_type: TestDataItemFormat) -> N

item, rows, _ = arrow_table_all_data_types(item_type, include_json=False, include_time=True)
info = pipeline.run(item, table_name="table", loader_file_format="csv")
info.raise_on_failed_jobs()
job = info.load_packages[0].jobs["completed_jobs"][0].file_path
assert job.endswith("csv")
with open(job, "r", encoding="utf-8", newline="") as f:
Expand All @@ -128,7 +126,6 @@ def test_csv_options(item_type: TestDataItemFormat) -> None:

item, rows, _ = arrow_table_all_data_types(item_type, include_json=False, include_time=True)
info = pipeline.run(item, table_name="table", loader_file_format="csv")
info.raise_on_failed_jobs()
job = info.load_packages[0].jobs["completed_jobs"][0].file_path
assert job.endswith("csv")
with open(job, "r", encoding="utf-8", newline="") as f:
Expand Down Expand Up @@ -157,7 +154,6 @@ def test_csv_quoting_style(item_type: TestDataItemFormat) -> None:

item, _, _ = arrow_table_all_data_types(item_type, include_json=False, include_time=True)
info = pipeline.run(item, table_name="table", loader_file_format="csv")
info.raise_on_failed_jobs()
job = info.load_packages[0].jobs["completed_jobs"][0].file_path
assert job.endswith("csv")
with open(job, "r", encoding="utf-8", newline="") as f:
Expand Down Expand Up @@ -693,7 +689,6 @@ def test_delta_table_empty_source(
Tests both empty Arrow table and `dlt.mark.materialize_table_schema()`.
"""
from dlt.common.libs.pyarrow import pyarrow as pa
from dlt.common.libs.deltalake import ensure_delta_compatible_arrow_data, get_delta_tables
from tests.pipeline.utils import users_materialize_table_schema

Expand Down
10 changes: 5 additions & 5 deletions tests/load/pipeline/test_pipelines.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
from dlt.common import json, sleep
from dlt.common.pipeline import SupportsPipeline
from dlt.common.destination import Destination
from dlt.common.destination.exceptions import DestinationHasFailedJobs
from dlt.common.destination.reference import WithStagingDataset
from dlt.common.schema.exceptions import CannotCoerceColumnException
from dlt.common.schema.schema import Schema
Expand All @@ -24,6 +23,7 @@
from dlt.destinations.job_client_impl import SqlJobClientBase
from dlt.extract.exceptions import ResourceNameMissing
from dlt.extract.source import DltSource
from dlt.load.exceptions import LoadClientJobFailed
from dlt.pipeline.exceptions import (
CannotRestorePipelineException,
PipelineConfigMissing,
Expand Down Expand Up @@ -767,10 +767,10 @@ def test_snowflake_custom_stage(destination_config: DestinationTestConfiguration
"""Using custom stage name instead of the table stage"""
os.environ["DESTINATION__SNOWFLAKE__STAGE_NAME"] = "my_non_existing_stage"
pipeline, data = simple_nested_pipeline(destination_config, f"custom_stage_{uniq_id()}", False)
info = pipeline.run(data(), loader_file_format=destination_config.file_format)
with pytest.raises(DestinationHasFailedJobs) as f_jobs:
info.raise_on_failed_jobs()
assert "MY_NON_EXISTING_STAGE" in f_jobs.value.failed_jobs[0].failed_message
with pytest.raises(LoadClientJobFailed) as f_jobs:
pipeline.run(data(), loader_file_format=destination_config.file_format)

assert "MY_NON_EXISTING_STAGE" in f_jobs.value.failed_message

drop_active_pipeline_data()

Expand Down
4 changes: 1 addition & 3 deletions tests/load/qdrant/test_restore_state.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
from typing import TYPE_CHECKING
import pytest
from qdrant_client import models

import dlt
from tests.load.utils import destinations_configs, DestinationTestConfiguration

from dlt.common.destination.reference import JobClientBase, WithStateSync
from dlt.destinations.impl.qdrant.qdrant_job_client import QdrantClient


Expand Down Expand Up @@ -37,7 +35,7 @@ def dummy_table():
pipeline.extract(dummy_table)

pipeline.normalize()
info = pipeline.load(raise_on_failed_jobs=True)
info = pipeline.load()

client: QdrantClient
with pipeline.destination_client() as client: # type: ignore[assignment]
Expand Down
10 changes: 4 additions & 6 deletions tests/load/sources/sql_database/test_sql_database_source.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
import os
import re
from copy import deepcopy
from datetime import datetime # noqa: I251
from typing import Any, Callable, cast, List, Optional, Set

import pytest
Expand Down Expand Up @@ -505,7 +503,7 @@ def test_all_types_no_precision_hints(
source.resources[table_name].add_map(unwrap_json_connector_x("json_col"))
pipeline.extract(source)
pipeline.normalize(loader_file_format="parquet")
pipeline.load().raise_on_failed_jobs()
pipeline.load()

schema = pipeline.default_schema
# print(pipeline.default_schema.to_pretty_yaml())
Expand Down Expand Up @@ -605,7 +603,7 @@ def test_deferred_reflect_in_source(
pipeline.extract(source)
# use insert values to convert parquet into INSERT
pipeline.normalize(loader_file_format="insert_values")
pipeline.load().raise_on_failed_jobs()
pipeline.load()
precision_table = pipeline.default_schema.get_table("has_precision")
assert_precision_columns(
precision_table["columns"],
Expand Down Expand Up @@ -661,7 +659,7 @@ def test_deferred_reflect_in_resource(
pipeline.extract(table)
# use insert values to convert parquet into INSERT
pipeline.normalize(loader_file_format="insert_values")
pipeline.load().raise_on_failed_jobs()
pipeline.load()
precision_table = pipeline.default_schema.get_table("has_precision")
assert_precision_columns(
precision_table["columns"],
Expand Down Expand Up @@ -954,7 +952,7 @@ def dummy_source():
pipeline.extract(source)

pipeline.normalize()
pipeline.load().raise_on_failed_jobs()
pipeline.load()

channel_rows = load_tables_to_dicts(pipeline, "chat_channel")["chat_channel"]
assert channel_rows and all(row["active"] for row in channel_rows)
Expand Down
11 changes: 9 additions & 2 deletions tests/load/test_dummy_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,8 +227,15 @@ def test_spool_job_failed() -> None:
assert len(started_files) == 0

# test the whole
loader_config = LoaderConfiguration(raise_on_failed_jobs = False, workers=1, pool_type="none")
load = setup_loader(client_config=DummyClientConfiguration(fail_prob=1.0), loader_config=loader_config)
loader_config = LoaderConfiguration(
raise_on_failed_jobs=False,
workers=1,
pool_type="none",
)
load = setup_loader(
client_config=DummyClientConfiguration(fail_prob=1.0),
loader_config=loader_config,
)
load_id, schema = prepare_load_package(load.load_storage, NORMALIZED_FILES)
run_all(load)

Expand Down
Loading

0 comments on commit 864f642

Please sign in to comment.