Skip to content

Commit

Permalink
Make Dataprep system test self-sufficient
Browse files Browse the repository at this point in the history
  • Loading branch information
moiseenkov committed Oct 6, 2023
1 parent 1fc2867 commit 128296f
Show file tree
Hide file tree
Showing 5 changed files with 643 additions and 49 deletions.
85 changes: 85 additions & 0 deletions airflow/providers/google/cloud/hooks/dataprep.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,20 @@ def run_job_group(self, body_request: dict) -> dict[str, Any]:
self._raise_for_status(response)
return response.json()

@retry(stop=stop_after_attempt(5), wait=wait_exponential(multiplier=1, max=10))
def create_flow(self, *, body_request: dict) -> dict:
    """
    Create a new Dataprep flow.

    :param body_request: Body of the POST request to be sent.
        For more details check https://clouddataprep.com/documentation/api#operation/createFlow
    :return: The created flow as parsed JSON.
    """
    url: str = urljoin(self._base_url, "/v4/flows")
    payload = json.dumps(body_request)
    response = requests.post(url, headers=self._headers, data=payload)
    self._raise_for_status(response)
    return response.json()

@retry(stop=stop_after_attempt(5), wait=wait_exponential(multiplier=1, max=10))
def copy_flow(
self, *, flow_id: int, name: str = "", description: str = "", copy_datasources: bool = False
Expand Down Expand Up @@ -205,3 +219,74 @@ def _raise_for_status(self, response: requests.models.Response) -> None:
except HTTPError:
self.log.error(response.json().get("exception"))
raise

@retry(stop=stop_after_attempt(5), wait=wait_exponential(multiplier=1, max=10))
def create_imported_dataset(self, *, body_request: dict) -> dict:
    """
    Create an imported dataset.

    :param body_request: Body of the POST request to be sent.
        For more details check https://clouddataprep.com/documentation/api#operation/createImportedDataset
    :return: The created imported dataset as parsed JSON.
    """
    url: str = urljoin(self._base_url, "/v4/importedDatasets")
    payload = json.dumps(body_request)
    response = requests.post(url, headers=self._headers, data=payload)
    self._raise_for_status(response)
    return response.json()

@retry(stop=stop_after_attempt(5), wait=wait_exponential(multiplier=1, max=10))
def create_wrangled_dataset(self, *, body_request: dict) -> dict:
    """
    Create a wrangled dataset.

    :param body_request: Body of the POST request to be sent.
        For more details check
        https://api.trifacta.com/dataprep-enterprise-cloud/index.html#tag/WrangledDataset
    :return: The created wrangled dataset as parsed JSON.
    """
    url: str = urljoin(self._base_url, "/v4/wrangledDatasets")
    payload = json.dumps(body_request)
    response = requests.post(url, headers=self._headers, data=payload)
    self._raise_for_status(response)
    return response.json()

@retry(stop=stop_after_attempt(5), wait=wait_exponential(multiplier=1, max=10))
def create_output_object(self, *, body_request: dict) -> dict:
    """
    Create an output object.

    :param body_request: Body of the POST request to be sent.
        For more details check
        https://api.trifacta.com/dataprep-premium/index.html#operation/createOutputObject
    :return: The created output object as parsed JSON.
    """
    url: str = urljoin(self._base_url, "/v4/outputObjects")
    payload = json.dumps(body_request)
    response = requests.post(url, headers=self._headers, data=payload)
    self._raise_for_status(response)
    return response.json()

@retry(stop=stop_after_attempt(5), wait=wait_exponential(multiplier=1, max=10))
def create_write_settings(self, *, body_request: dict) -> dict:
    """
    Create write settings.

    :param body_request: Body of the POST request to be sent.
        For more details check
        https://api.trifacta.com/dataprep-premium/index.html#operation/createWriteSetting
    :return: The created write settings as parsed JSON.
    """
    url: str = urljoin(self._base_url, "/v4/writeSettings")
    payload = json.dumps(body_request)
    response = requests.post(url, headers=self._headers, data=payload)
    self._raise_for_status(response)
    return response.json()

@retry(stop=stop_after_attempt(5), wait=wait_exponential(multiplier=1, max=10))
def delete_imported_dataset(self, *, dataset_id: int) -> None:
    """
    Delete an imported dataset by its identifier.

    :param dataset_id: ID of the imported dataset for removal.
    """
    url: str = urljoin(self._base_url, f"/v4/importedDatasets/{dataset_id}")
    response = requests.delete(url, headers=self._headers)
    self._raise_for_status(response)
4 changes: 2 additions & 2 deletions airflow/providers/google/cloud/operators/dataprep.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,13 @@ def __init__(
**kwargs,
) -> None:
super().__init__(**kwargs)
self.dataprep_conn_id = (dataprep_conn_id,)
self.dataprep_conn_id = dataprep_conn_id
self.job_group_id = job_group_id

def execute(self, context: Context) -> dict:
    """Fetch and return the jobs belonging to the configured job group."""
    self.log.info("Fetching data for job with id: %d ...", self.job_group_id)
    hook = GoogleDataprepHook(dataprep_conn_id=self.dataprep_conn_id)
    return hook.get_jobs_for_job_group(job_id=int(self.job_group_id))
Expand Down
Loading

0 comments on commit 128296f

Please sign in to comment.