Skip to content

Commit

Permalink
Add Dataplex Catalog Entry Group operators (apache#45751)
Browse files Browse the repository at this point in the history
Co-authored-by: Ulada Zakharava <[email protected]>
  • Loading branch information
VladaZakharova and Ulada Zakharava authored Jan 18, 2025
1 parent 375baaf commit 3b50a06
Show file tree
Hide file tree
Showing 12 changed files with 1,392 additions and 8 deletions.
94 changes: 94 additions & 0 deletions docs/apache-airflow-providers-google/operators/cloud/dataplex.rst
Original file line number Diff line number Diff line change
Expand Up @@ -417,3 +417,97 @@ To get a Data Profile scan job you can use:
:dedent: 4
:start-after: [START howto_dataplex_get_data_profile_job_operator]
:end-before: [END howto_dataplex_get_data_profile_job_operator]


Google Dataplex Catalog Operators
=================================

Dataplex Catalog provides a unified inventory of Google Cloud resources, such as BigQuery, and other resources,
such as on-premises resources. Dataplex Catalog automatically retrieves metadata for Google Cloud resources,
and you bring metadata for third-party resources into Dataplex Catalog.

For more information about Dataplex Catalog visit `Dataplex Catalog production documentation <Product documentation <https://cloud.google.com/dataplex/docs/catalog-overview>`__

.. _howto/operator:DataplexCatalogCreateEntryGroupOperator:

Create an EntryGroup
--------------------

To create an Entry Group in specific location in Dataplex Catalog you can
use :class:`~airflow.providers.google.cloud.operators.dataplex.DataplexCatalogCreateEntryGroupOperator`
For more information about the available fields to pass when creating an Entry Group, visit `Entry Group resource configuration. <https://cloud.google.com/dataplex/docs/reference/rest/v1/projects.locations.entryGroups#EntryGroup>`__

A simple Entry Group configuration can look as followed:

.. exampleinclude:: /../../providers/tests/system/google/cloud/dataplex/example_dataplex_catalog.py
:language: python
:dedent: 0
:start-after: [START howto_dataplex_entry_group_configuration]
:end-before: [END howto_dataplex_entry_group_configuration]

With this configuration you can create an Entry Group resource:

:class:`~airflow.providers.google.cloud.operators.dataplex.DataplexCatalogCreateEntryGroupOperator`

.. exampleinclude:: /../../providers/tests/system/google/cloud/dataplex/example_dataplex_catalog.py
:language: python
:dedent: 4
:start-after: [START howto_operator_dataplex_catalog_create_entry_group]
:end-before: [END howto_operator_dataplex_catalog_create_entry_group]

.. _howto/operator:DataplexCatalogDeleteEntryGroupOperator:

Delete an EntryGroup
--------------------

To delete an Entry Group in specific location in Dataplex Catalog you can
use :class:`~airflow.providers.google.cloud.operators.dataplex.DataplexCatalogDeleteEntryGroupOperator`

.. exampleinclude:: /../../providers/tests/system/google/cloud/dataplex/example_dataplex_catalog.py
:language: python
:dedent: 4
:start-after: [START howto_operator_dataplex_catalog_delete_entry_group]
:end-before: [END howto_operator_dataplex_catalog_delete_entry_group]

.. _howto/operator:DataplexCatalogListEntryGroupsOperator:

List EntryGroups
----------------

To list all Entry Groups in specific location in Dataplex Catalog you can
use :class:`~airflow.providers.google.cloud.operators.dataplex.DataplexCatalogListEntryGroupsOperator`.
This operator also supports filtering and ordering the result of the operation.

.. exampleinclude:: /../../providers/tests/system/google/cloud/dataplex/example_dataplex_catalog.py
:language: python
:dedent: 4
:start-after: [START howto_operator_dataplex_catalog_list_entry_groups]
:end-before: [END howto_operator_dataplex_catalog_list_entry_groups]

.. _howto/operator:DataplexCatalogGetEntryGroupOperator:

Get an EntryGroup
-----------------

To retrieve an Entry Group in specific location in Dataplex Catalog you can
use :class:`~airflow.providers.google.cloud.operators.dataplex.DataplexCatalogGetEntryGroupOperator`

.. exampleinclude:: /../../providers/tests/system/google/cloud/dataplex/example_dataplex_catalog.py
:language: python
:dedent: 4
:start-after: [START howto_operator_dataplex_catalog_get_entry_group]
:end-before: [END howto_operator_dataplex_catalog_get_entry_group]

.. _howto/operator:DataplexCatalogUpdateEntryGroupOperator:

Update an EntryGroup
--------------------

To update an Entry Group in specific location in Dataplex Catalog you can
use :class:`~airflow.providers.google.cloud.operators.dataplex.DataplexCatalogUpdateEntryGroupOperator`

.. exampleinclude:: /../../providers/tests/system/google/cloud/dataplex/example_dataplex_catalog.py
:language: python
:dedent: 4
:start-after: [START howto_operator_dataplex_catalog_update_entry_group]
:end-before: [END howto_operator_dataplex_catalog_update_entry_group]
2 changes: 2 additions & 0 deletions docs/spelling_wordlist.txt
Original file line number Diff line number Diff line change
Expand Up @@ -574,6 +574,8 @@ encodable
encryptor
enqueue
enqueued
EntryGroup
EntryGroups
entrypoint
entrypoints
Enum
Expand Down
2 changes: 1 addition & 1 deletion generated/provider_dependencies.json
Original file line number Diff line number Diff line change
Expand Up @@ -656,7 +656,7 @@
"google-cloud-datacatalog>=3.23.0",
"google-cloud-dataflow-client>=0.8.6",
"google-cloud-dataform>=0.5.0",
"google-cloud-dataplex>=1.10.0",
"google-cloud-dataplex>=2.6.0",
"google-cloud-dataproc-metastore>=1.12.0",
"google-cloud-dataproc>=5.12.0",
"google-cloud-dlp>=3.12.0",
Expand Down
212 changes: 211 additions & 1 deletion providers/src/airflow/providers/google/cloud/hooks/dataplex.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,22 @@

import time
from collections.abc import Sequence
from copy import deepcopy
from typing import TYPE_CHECKING, Any

from google.api_core.client_options import ClientOptions
from google.api_core.gapic_v1.method import DEFAULT, _MethodDefault
from google.cloud.dataplex_v1 import DataplexServiceClient, DataScanServiceAsyncClient, DataScanServiceClient
from google.cloud.dataplex_v1 import (
DataplexServiceClient,
DataScanServiceAsyncClient,
DataScanServiceClient,
)
from google.cloud.dataplex_v1.services.catalog_service import CatalogServiceClient
from google.cloud.dataplex_v1.types import (
Asset,
DataScan,
DataScanJob,
EntryGroup,
Lake,
Task,
Zone,
Expand All @@ -47,6 +54,7 @@
from google.api_core.operation import Operation
from google.api_core.retry import Retry
from google.api_core.retry_async import AsyncRetry
from google.cloud.dataplex_v1.services.catalog_service.pagers import ListEntryGroupsPager
from googleapiclient.discovery import Resource

PATH_DATA_SCAN = "projects/{project_id}/locations/{region}/dataScans/{data_scan_id}"
Expand Down Expand Up @@ -110,6 +118,14 @@ def get_dataplex_data_scan_client(self) -> DataScanServiceClient:
credentials=self.get_credentials(), client_info=CLIENT_INFO, client_options=client_options
)

def get_dataplex_catalog_client(self) -> CatalogServiceClient:
"""Return CatalogServiceClient."""
client_options = ClientOptions(api_endpoint="dataplex.googleapis.com:443")

return CatalogServiceClient(
credentials=self.get_credentials(), client_info=CLIENT_INFO, client_options=client_options
)

def wait_for_operation(self, timeout: float | None, operation: Operation):
"""Wait for long-lasting operation to complete."""
try:
Expand All @@ -118,6 +134,200 @@ def wait_for_operation(self, timeout: float | None, operation: Operation):
error = operation.exception(timeout=timeout)
raise AirflowException(error)

@GoogleBaseHook.fallback_to_default_project_id
def create_entry_group(
self,
location: str,
entry_group_id: str,
entry_group_configuration: EntryGroup | dict,
project_id: str = PROVIDE_PROJECT_ID,
validate_only: bool = False,
retry: Retry | _MethodDefault = DEFAULT,
timeout: float | None = None,
metadata: Sequence[tuple[str, str]] = (),
) -> Operation:
"""
Create an Entry resource.
:param location: Required. The ID of the Google Cloud location that the task belongs to.
:param entry_group_id: Required. EntryGroup identifier.
:param entry_group_configuration: Required. EntryGroup configuration body.
:param project_id: Optional. The ID of the Google Cloud project that the task belongs to.
:param validate_only: Optional. If set, performs request validation, but does not actually execute
the create request.
:param retry: Optional. A retry object used to retry requests. If `None` is specified, requests
will not be retried.
:param timeout: Optional. The amount of time, in seconds, to wait for the request to complete.
Note that if `retry` is specified, the timeout applies to each individual attempt.
:param metadata: Optional. Additional metadata that is provided to the method.
"""
client = self.get_dataplex_catalog_client()
return client.create_entry_group(
request={
"parent": client.common_location_path(project_id, location),
"entry_group_id": entry_group_id,
"entry_group": entry_group_configuration,
"validate_only": validate_only,
},
retry=retry,
timeout=timeout,
metadata=metadata,
)

@GoogleBaseHook.fallback_to_default_project_id
def get_entry_group(
self,
location: str,
entry_group_id: str,
project_id: str = PROVIDE_PROJECT_ID,
retry: Retry | _MethodDefault = DEFAULT,
timeout: float | None = None,
metadata: Sequence[tuple[str, str]] = (),
) -> EntryGroup:
"""
Get an EntryGroup resource.
:param location: Required. The ID of the Google Cloud location that the task belongs to.
:param entry_group_id: Required. EntryGroup identifier.
:param project_id: Optional. The ID of the Google Cloud project that the task belongs to.
:param retry: Optional. A retry object used to retry requests. If `None` is specified, requests
will not be retried.
:param timeout: Optional. The amount of time, in seconds, to wait for the request to complete.
Note that if `retry` is specified, the timeout applies to each individual attempt.
:param metadata: Optional. Additional metadata that is provided to the method.
"""
client = self.get_dataplex_catalog_client()
return client.get_entry_group(
request={
"name": client.entry_group_path(project_id, location, entry_group_id),
},
retry=retry,
timeout=timeout,
metadata=metadata,
)

@GoogleBaseHook.fallback_to_default_project_id
def delete_entry_group(
self,
location: str,
entry_group_id: str,
project_id: str = PROVIDE_PROJECT_ID,
retry: Retry | _MethodDefault = DEFAULT,
timeout: float | None = None,
metadata: Sequence[tuple[str, str]] = (),
) -> Operation:
"""
Delete an EntryGroup resource.
:param location: Required. The ID of the Google Cloud location that the task belongs to.
:param entry_group_id: Required. EntryGroup identifier.
:param project_id: Optional. The ID of the Google Cloud project that the task belongs to.
:param retry: Optional. A retry object used to retry requests. If `None` is specified, requests
will not be retried.
:param timeout: Optional. The amount of time, in seconds, to wait for the request to complete.
Note that if `retry` is specified, the timeout applies to each individual attempt.
:param metadata: Optional. Additional metadata that is provided to the method.
"""
client = self.get_dataplex_catalog_client()
return client.delete_entry_group(
request={
"name": client.entry_group_path(project_id, location, entry_group_id),
},
retry=retry,
timeout=timeout,
metadata=metadata,
)

@GoogleBaseHook.fallback_to_default_project_id
def list_entry_groups(
self,
location: str,
filter_by: str | None = None,
order_by: str | None = None,
page_size: int | None = None,
page_token: str | None = None,
project_id: str = PROVIDE_PROJECT_ID,
retry: Retry | _MethodDefault = DEFAULT,
timeout: float | None = None,
metadata: Sequence[tuple[str, str]] = (),
) -> ListEntryGroupsPager:
"""
List EntryGroups resources from specific location.
:param location: Required. The ID of the Google Cloud location that the task belongs to.
:param filter_by: Optional. Filter to apply on the list results.
:param order_by: Optional. Fields to order the results by.
:param page_size: Optional. Maximum number of EntryGroups to return on one page.
:param page_token: Optional. Token to retrieve the next page of results.
:param project_id: Optional. The ID of the Google Cloud project that the task belongs to.
:param retry: Optional. A retry object used to retry requests. If `None` is specified, requests
will not be retried.
:param timeout: Optional. The amount of time, in seconds, to wait for the request to complete.
Note that if `retry` is specified, the timeout applies to each individual attempt.
:param metadata: Optional. Additional metadata that is provided to the method.
"""
client = self.get_dataplex_catalog_client()
return client.list_entry_groups(
request={
"parent": client.common_location_path(project_id, location),
"filter": filter_by,
"order_by": order_by,
"page_size": page_size,
"page_token": page_token,
},
retry=retry,
timeout=timeout,
metadata=metadata,
)

@GoogleBaseHook.fallback_to_default_project_id
def update_entry_group(
self,
location: str,
entry_group_id: str,
entry_group_configuration: dict | EntryGroup,
project_id: str = PROVIDE_PROJECT_ID,
update_mask: list[str] | FieldMask | None = None,
validate_only: bool | None = False,
retry: Retry | _MethodDefault = DEFAULT,
timeout: float | None = None,
metadata: Sequence[tuple[str, str]] = (),
) -> Operation:
"""
Update an EntryGroup resource.
:param entry_group_id: Required. ID of the EntryGroup to update.
:param entry_group_configuration: Required. The updated configuration body of the EntryGroup.
:param location: Required. The ID of the Google Cloud location that the task belongs to.
:param update_mask: Optional. Names of fields whose values to overwrite on an entry group.
If this parameter is absent or empty, all modifiable fields are overwritten. If such
fields are non-required and omitted in the request body, their values are emptied.
:param project_id: Optional. The ID of the Google Cloud project that the task belongs to.
:param validate_only: Optional. The service validates the request without performing any mutations.
:param retry: Optional. A retry object used to retry requests. If `None` is specified, requests
will not be retried.
:param timeout: Optional. The amount of time, in seconds, to wait for the request to complete.
Note that if `retry` is specified, the timeout applies to each individual attempt.
:param metadata: Optional. Additional metadata that is provided to the method.
"""
client = self.get_dataplex_catalog_client()
_entry_group = (
deepcopy(entry_group_configuration)
if isinstance(entry_group_configuration, dict)
else EntryGroup.to_dict(entry_group_configuration)
)
_entry_group["name"] = client.entry_group_path(project_id, location, entry_group_id)
return client.update_entry_group(
request={
"entry_group": _entry_group,
"update_mask": FieldMask(paths=update_mask) if type(update_mask) is list else update_mask,
"validate_only": validate_only,
},
retry=retry,
timeout=timeout,
metadata=metadata,
)

@GoogleBaseHook.fallback_to_default_project_id
def create_task(
self,
Expand Down
Loading

0 comments on commit 3b50a06

Please sign in to comment.