diff --git a/metadata-ingestion/source_docs/bigquery.md b/metadata-ingestion/source_docs/bigquery.md
index 98fb78305647a2..97abdad6268f0e 100644
--- a/metadata-ingestion/source_docs/bigquery.md
+++ b/metadata-ingestion/source_docs/bigquery.md
@@ -215,20 +215,23 @@ Note that a `.` is used to denote nested fields in the YAML recipe.
By default, we extract usage stats for the last day, with the recommendation that this source is executed every day.
-| Field | Required | Default | Description |
-|-----------------------------|----------|----------------------------------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
-| `projects` | | | |
-| `extra_client_options` | | | |
-| `env` | | `"PROD"` | Environment to use in namespace when constructing URNs. |
-| `start_time` | | Last full day in UTC (or hour, depending on `bucket_duration`) | Earliest date of usage logs to consider. |
-| `end_time` | | Last full day in UTC (or hour, depending on `bucket_duration`) | Latest date of usage logs to consider. |
-| `top_n_queries` | | `10` | Number of top queries to save to each table. |
-| `include_operational_stats` | | `true` | Whether to display operational stats. |
-| `extra_client_options` | | | Additional options to pass to `google.cloud.logging_v2.client.Client`. |
-| `query_log_delay` | | | To account for the possibility that the query event arrives after the read event in the audit logs, we wait for at least `query_log_delay` additional events to be processed before attempting to resolve BigQuery job information from the logs. If `query_log_delay` is `None`, it gets treated as an unlimited delay, which prioritizes correctness at the expense of memory usage. |
-| `max_query_duration` | | `15` | Correction to pad `start_time` and `end_time` with. For handling the case where the read happens within our time range but the query completion event is delayed and happens after the configured end time. |
-| `table_pattern.allow` | | | List of regex patterns for tables to include in ingestion. |
-| `table_pattern.deny` | | | List of regex patterns for tables to exclude in ingestion. |
+| Field | Required | Default | Description |
+|---------------------------------|----------|----------------------------------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| `projects`                      |          |                                                                | List of BigQuery project IDs to fetch usage logs from.                                                                                                                                                                                                                                                                                                                                   |
+| `env` | | `"PROD"` | Environment to use in namespace when constructing URNs. |
+| `start_time` | | Last full day in UTC (or hour, depending on `bucket_duration`) | Earliest date of usage logs to consider. |
+| `end_time` | | Last full day in UTC (or hour, depending on `bucket_duration`) | Latest date of usage logs to consider. |
+| `top_n_queries` | | `10` | Number of top queries to save to each table. |
+| `include_operational_stats` | | `true` | Whether to display operational stats. |
+| `extra_client_options` | | | Additional options to pass to `google.cloud.logging_v2.client.Client`. |
+| `query_log_delay` | | | To account for the possibility that the query event arrives after the read event in the audit logs, we wait for at least `query_log_delay` additional events to be processed before attempting to resolve BigQuery job information from the logs. If `query_log_delay` is `None`, it gets treated as an unlimited delay, which prioritizes correctness at the expense of memory usage. |
+| `max_query_duration`            |          | `15`                                                           | Correction to pad `start_time` and `end_time` with, to handle the case where a read happens within the time range but the corresponding query completion event is delayed past the configured end time.                                                                                                                                                                                 |
+| `table_pattern.allow` | | | List of regex patterns for tables to include in ingestion. |
+| `table_pattern.deny` | | | List of regex patterns for tables to exclude in ingestion. |
+| `user_email_pattern.allow`      |          | `[".*"]`                                                       | List of regex patterns for user emails to include in usage.                                                                                                                                                                                                                                                                                                                              |
+| `user_email_pattern.deny`       |          |                                                                | List of regex patterns for user emails to exclude from usage.                                                                                                                                                                                                                                                                                                                            |
+| `user_email_pattern.ignoreCase` |          | `True`                                                         | Whether to ignore case when matching user email patterns.                                                                                                                                                                                                                                                                                                                                |
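+
+For example, to drop usage events generated by service accounts, a recipe snippet
+along these lines should work (the project ID and email pattern are placeholders):
+
+```yml
+source:
+  type: bigquery-usage
+  config:
+    projects:
+      - my-project-id
+    user_email_pattern:
+      deny:
+        - ".*@service-accounts\\.example\\.com"
+```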
## Compatibility
diff --git a/metadata-ingestion/source_docs/redshift.md b/metadata-ingestion/source_docs/redshift.md
index 8cf4631d04a8c8..8446800b0c9b9f 100644
--- a/metadata-ingestion/source_docs/redshift.md
+++ b/metadata-ingestion/source_docs/redshift.md
@@ -234,21 +234,24 @@ Note that a `.` is used to denote nested fields in the YAML recipe.
By default, we extract usage stats for the last day, with the recommendation that this source is executed every day.
-| Field | Required | Default | Description |
-|-----------------------------|----------|----------------------------------------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
-| `username` | | | Redshift username. |
-| `password` | | | Redshift password. |
-| `host_port` | ✅ | | Redshift host URL. |
-| `database` | | | Redshift database. |
-| `env` | | `"PROD"` | Environment to use in namespace when constructing URNs. |
-| `platform_instance` | | None | The Platform instance to use while constructing URNs. |
-| `options.` | | | Any options specified here will be passed to SQLAlchemy's `create_engine` as kwargs. See https://docs.sqlalchemy.org/en/14/core/engines.html#sqlalchemy.create_engine for details. |
-| `email_domain` | ✅ | | Email domain of your organisation so users can be displayed on UI appropriately. |
-| `start_time` | | Last full day in UTC (or hour, depending on `bucket_duration`) | Earliest date of usage to consider. |
-| `end_time` | | Last full day in UTC (or hour, depending on `bucket_duration`) | Latest date of usage to consider. |
-| `top_n_queries` | | `10` | Number of top queries to save to each table. |
-| `include_operational_stats` | | `true` | Whether to display operational stats. |
-| `bucket_duration` | | `"DAY"` | Size of the time window to aggregate usage stats. |
+| Field | Required | Default | Description |
+|---------------------------------|----------|----------------------------------------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| `username` | | | Redshift username. |
+| `password` | | | Redshift password. |
+| `host_port` | ✅ | | Redshift host URL. |
+| `database` | | | Redshift database. |
+| `env` | | `"PROD"` | Environment to use in namespace when constructing URNs. |
+| `platform_instance`             |          | `None`                                                         | The platform instance to use while constructing URNs.                                                                                                                                     |
+| `options.`                      |          |                                                                | Any options specified here will be passed to SQLAlchemy's `create_engine` as kwargs. See https://docs.sqlalchemy.org/en/14/core/engines.html#sqlalchemy.create_engine for details.        |
+| `email_domain`                  | ✅        |                                                                | Email domain of your organisation, so users can be displayed appropriately in the UI.                                                                                                     |
+| `start_time` | | Last full day in UTC (or hour, depending on `bucket_duration`) | Earliest date of usage to consider. |
+| `end_time` | | Last full day in UTC (or hour, depending on `bucket_duration`) | Latest date of usage to consider. |
+| `top_n_queries` | | `10` | Number of top queries to save to each table. |
+| `include_operational_stats` | | `true` | Whether to display operational stats. |
+| `bucket_duration` | | `"DAY"` | Size of the time window to aggregate usage stats. |
+| `user_email_pattern.allow`      |          | `[".*"]`                                                       | List of regex patterns for user emails to include in usage.                                                                                                                               |
+| `user_email_pattern.deny`       |          |                                                                | List of regex patterns for user emails to exclude from usage.                                                                                                                             |
+| `user_email_pattern.ignoreCase` |          | `True`                                                         | Whether to ignore case when matching user email patterns.                                                                                                                                 |
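+
+For example, to restrict usage statistics to users from your own email domain, a
+recipe snippet like the following should work (host and domain are placeholders):
+
+```yml
+source:
+  type: redshift-usage
+  config:
+    host_port: my-cluster.example.redshift.amazonaws.com:5439
+    email_domain: example.com
+    user_email_pattern:
+      allow:
+        - ".*@example\\.com"
+```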
## Questions
diff --git a/metadata-ingestion/source_docs/snowflake.md b/metadata-ingestion/source_docs/snowflake.md
index 07bcefd2917c7f..04448c73a03947 100644
--- a/metadata-ingestion/source_docs/snowflake.md
+++ b/metadata-ingestion/source_docs/snowflake.md
@@ -199,25 +199,27 @@ Snowflake integration also supports prevention of redundant reruns for the same
Note that a `.` is used to denote nested fields in the YAML recipe.
-
-| Field | Required | Default | Description |
-|-----------------------------|----------|---------------------------------------------------------------------|----------------------------------------------------------------------------------|
-| `username` | | | Snowflake username. |
-| `password` | | | Snowflake password. |
-| `host_port` | ✅ | | Snowflake host URL. |
-| `warehouse` | | | Snowflake warehouse. |
-| `role` | | | Snowflake role. |
-| `env` | | `"PROD"` | Environment to use in namespace when constructing URNs. |
-| `bucket_duration` | | `"DAY"` | Duration to bucket usage events by. Can be `"DAY"` or `"HOUR"`. |
-| `email_domain` | | | Email domain of your organisation so users can be displayed on UI appropriately. |
-| `start_time` | | Last full day in UTC (or hour, depending on `bucket_duration`) | Earliest date of usage logs to consider. |
-| `end_time` | | Last full day in UTC (or hour, depending on `bucket_duration`) | Latest date of usage logs to consider. |
-| `top_n_queries` | | `10` | Number of top queries to save to each table. |
-| `include_operational_stats` | | `true` | Whether to display operational stats. |
-| `database_pattern` | | `"^UTIL_DB$" ` `"^SNOWFLAKE$"` `"^SNOWFLAKE_SAMPLE_DATA$" | Allow/deny patterns for db in snowflake dataset names. |
-| `schema_pattern` | | | Allow/deny patterns for schema in snowflake dataset names. |
-| `view_pattern` | | | Allow/deny patterns for views in snowflake dataset names. |
-| `table_pattern` | | | Allow/deny patterns for tables in snowflake dataset names. |
+| Field | Required | Default | Description |
+|---------------------------------|----------|---------------------------------------------------------------------|----------------------------------------------------------------------------------|
+| `username` | | | Snowflake username. |
+| `password` | | | Snowflake password. |
+| `host_port` | ✅ | | Snowflake host URL. |
+| `warehouse` | | | Snowflake warehouse. |
+| `role` | | | Snowflake role. |
+| `env` | | `"PROD"` | Environment to use in namespace when constructing URNs. |
+| `bucket_duration` | | `"DAY"` | Duration to bucket usage events by. Can be `"DAY"` or `"HOUR"`. |
+| `email_domain`                  |          |                                                                       | Email domain of your organisation, so users can be displayed appropriately in the UI. |
+| `start_time` | | Last full day in UTC (or hour, depending on `bucket_duration`) | Earliest date of usage logs to consider. |
+| `end_time` | | Last full day in UTC (or hour, depending on `bucket_duration`) | Latest date of usage logs to consider. |
+| `top_n_queries` | | `10` | Number of top queries to save to each table. |
+| `include_operational_stats` | | `true` | Whether to display operational stats. |
+| `database_pattern`              |          | `"^UTIL_DB$"` `"^SNOWFLAKE$"` `"^SNOWFLAKE_SAMPLE_DATA$"`             | Allow/deny patterns for databases in Snowflake dataset names.                     |
+| `schema_pattern`                |          |                                                                       | Allow/deny patterns for schemas in Snowflake dataset names.                       |
+| `view_pattern`                  |          |                                                                       | Allow/deny patterns for views in Snowflake dataset names.                         |
+| `table_pattern`                 |          |                                                                       | Allow/deny patterns for tables in Snowflake dataset names.                        |
+| `user_email_pattern.allow`      |          | `[".*"]`                                                              | List of regex patterns for user emails to include in usage.                       |
+| `user_email_pattern.deny`       |          |                                                                       | List of regex patterns for user emails to exclude from usage.                     |
+| `user_email_pattern.ignoreCase` |          | `True`                                                                | Whether to ignore case when matching user email patterns.                         |
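+
+As a sketch, denying a single (case-sensitive) service user would look roughly like
+this; the account URL and email below are placeholders:
+
+```yml
+source:
+  type: snowflake-usage
+  config:
+    host_port: my_account.snowflakecomputing.com
+    user_email_pattern:
+      deny:
+        - "etl_service@example\\.com"
+      ignoreCase: false
+```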
:::caution
diff --git a/metadata-ingestion/source_docs/trino.md b/metadata-ingestion/source_docs/trino.md
index 9d3ec330e84f7d..7835c85684b718 100644
--- a/metadata-ingestion/source_docs/trino.md
+++ b/metadata-ingestion/source_docs/trino.md
@@ -110,17 +110,20 @@ Note that a `.` is used to denote nested fields in the YAML recipe.
By default, we extract usage stats for the last day, with the recommendation that this source is executed every day.
-| Field | Required | Default | Description |
-| ---------------------- | -------- | -------------------------------------------------------------- | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
-| `database` | yes | | The name of the catalog from getting the usage |
-| `audit_catalog` | yes | | The catalog name where the audit table can be found |
-| `audit_schema` | yes | | The schema name where the audit table can be found |
-| `email_domain` | yes | | The email domain which will be appended to the users |
-| `env` | | `"PROD"` | Environment to use in namespace when constructing URNs. |
-| `bucket_duration` | | `"DAY"` | Duration to bucket usage events by. Can be `"DAY"` or `"HOUR"`. |
-| `start_time` | | Last full day in UTC (or hour, depending on `bucket_duration`) | Earliest date of usage logs to consider. |
-| `end_time` | | Last full day in UTC (or hour, depending on `bucket_duration`) | Latest date of usage logs to consider. |
-| `top_n_queries` | | `10` | Number of top queries to save to each table. |
+| Field | Required | Default | Description |
+|---------------------------------|----------|----------------------------------------------------------------|-----------------------------------------------------------------|
+| `database`                      | yes      |                                                                | The name of the catalog to fetch usage for.                      |
+| `audit_catalog`                 | yes      |                                                                | The catalog where the audit table can be found.                  |
+| `audit_schema`                  | yes      |                                                                | The schema where the audit table can be found.                   |
+| `email_domain`                  | yes      |                                                                | The email domain appended to usernames.                          |
+| `env` | | `"PROD"` | Environment to use in namespace when constructing URNs. |
+| `bucket_duration` | | `"DAY"` | Duration to bucket usage events by. Can be `"DAY"` or `"HOUR"`. |
+| `start_time` | | Last full day in UTC (or hour, depending on `bucket_duration`) | Earliest date of usage logs to consider. |
+| `end_time` | | Last full day in UTC (or hour, depending on `bucket_duration`) | Latest date of usage logs to consider. |
+| `top_n_queries` | | `10` | Number of top queries to save to each table. |
+| `user_email_pattern.allow`      |          | `[".*"]`                                                       | List of regex patterns for user emails to include in usage.      |
+| `user_email_pattern.deny`       |          |                                                                | List of regex patterns for user emails to exclude from usage.    |
+| `user_email_pattern.ignoreCase` |          | `True`                                                         | Whether to ignore case when matching user email patterns.        |
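+
+For instance, to exclude a scheduler account from usage statistics, something like
+the following snippet should work (catalog, schema, and email values are placeholders):
+
+```yml
+source:
+  type: starburst-trino-usage
+  config:
+    database: my_catalog
+    audit_catalog: my_audit_catalog
+    audit_schema: my_audit_schema
+    email_domain: example.com
+    user_email_pattern:
+      deny:
+        - "scheduler@example\\.com"
+```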
## Questions
diff --git a/metadata-ingestion/src/datahub/ingestion/source/usage/bigquery_usage.py b/metadata-ingestion/src/datahub/ingestion/source/usage/bigquery_usage.py
index 66ec0ec537486a..04a3e9e32a917e 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/usage/bigquery_usage.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/usage/bigquery_usage.py
@@ -742,7 +742,11 @@ def _aggregate_enriched_read_events(
agg_bucket = datasets[floored_ts].setdefault(
resource,
- AggregatedDataset(bucket_start_time=floored_ts, resource=resource),
+ AggregatedDataset(
+ bucket_start_time=floored_ts,
+ resource=resource,
+ user_email_pattern=self.config.user_email_pattern,
+ ),
)
agg_bucket.add_read_entry(event.actor_email, event.query, event.fieldsRead)
num_aggregated += 1
diff --git a/metadata-ingestion/src/datahub/ingestion/source/usage/redshift_usage.py b/metadata-ingestion/src/datahub/ingestion/source/usage/redshift_usage.py
index 1cac07c65a1e4d..3c75665964be09 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/usage/redshift_usage.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/usage/redshift_usage.py
@@ -319,7 +319,11 @@ def _aggregate_access_events(
agg_bucket = datasets[floored_ts].setdefault(
resource,
- AggregatedDataset(bucket_start_time=floored_ts, resource=resource),
+ AggregatedDataset(
+ bucket_start_time=floored_ts,
+ resource=resource,
+ user_email_pattern=self.config.user_email_pattern,
+ ),
)
# current limitation in user stats UI, we need to provide email to show users
diff --git a/metadata-ingestion/src/datahub/ingestion/source/usage/snowflake_usage.py b/metadata-ingestion/src/datahub/ingestion/source/usage/snowflake_usage.py
index 1cb62bd8b1f953..0c6af64a0da10c 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/usage/snowflake_usage.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/usage/snowflake_usage.py
@@ -452,7 +452,11 @@ def _aggregate_access_events(
resource = object.objectName
agg_bucket = datasets[floored_ts].setdefault(
resource,
- AggregatedDataset(bucket_start_time=floored_ts, resource=resource),
+ AggregatedDataset(
+ bucket_start_time=floored_ts,
+ resource=resource,
+ user_email_pattern=self.config.user_email_pattern,
+ ),
)
agg_bucket.add_read_entry(
event.email,
diff --git a/metadata-ingestion/src/datahub/ingestion/source/usage/starburst_trino_usage.py b/metadata-ingestion/src/datahub/ingestion/source/usage/starburst_trino_usage.py
index 6e404b03eb2b6f..5fd9714607ddd9 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/usage/starburst_trino_usage.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/usage/starburst_trino_usage.py
@@ -217,7 +217,11 @@ def _aggregate_access_events(
agg_bucket = datasets[floored_ts].setdefault(
resource,
- AggregatedDataset(bucket_start_time=floored_ts, resource=resource),
+ AggregatedDataset(
+ bucket_start_time=floored_ts,
+ resource=resource,
+ user_email_pattern=self.config.user_email_pattern,
+ ),
)
# add @unknown.com to username
diff --git a/metadata-ingestion/src/datahub/ingestion/source/usage/usage_common.py b/metadata-ingestion/src/datahub/ingestion/source/usage/usage_common.py
index 3ee2ad9491cd77..b599ead282fe2c 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/usage/usage_common.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/usage/usage_common.py
@@ -6,6 +6,7 @@
import pydantic
import datahub.emitter.mce_builder as builder
+from datahub.configuration.common import AllowDenyPattern
from datahub.configuration.time_window_config import (
BaseTimeWindowConfig,
BucketDuration,
@@ -27,6 +28,7 @@
class GenericAggregatedDataset(Generic[ResourceType]):
bucket_start_time: datetime
resource: ResourceType
+ user_email_pattern: AllowDenyPattern = AllowDenyPattern.allow_all()
readCount: int = 0
queryCount: int = 0
@@ -45,8 +47,13 @@ def add_read_entry(
query: Optional[str],
fields: List[str],
) -> None:
+
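+        # Skip the event entirely when the user's email is excluded by the
+        # configured user_email_pattern (allow/deny regexes).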
+ if not self.user_email_pattern.allowed(user_email):
+ return
+
self.readCount += 1
self.userFreq[user_email] += 1
+
if query:
self.queryCount += 1
self.queryFreq[query] += 1
@@ -115,6 +122,7 @@ def make_usage_workunit(
class BaseUsageConfig(BaseTimeWindowConfig):
top_n_queries: pydantic.PositiveInt = 10
+ user_email_pattern: AllowDenyPattern = AllowDenyPattern.allow_all()
include_operational_stats: bool = True
@pydantic.validator("top_n_queries")
diff --git a/metadata-ingestion/tests/unit/test_usage_common.py b/metadata-ingestion/tests/unit/test_usage_common.py
index 021a988422688a..8d929214156a5c 100644
--- a/metadata-ingestion/tests/unit/test_usage_common.py
+++ b/metadata-ingestion/tests/unit/test_usage_common.py
@@ -3,6 +3,7 @@
import pytest
from pydantic import ValidationError
+from datahub.configuration.common import AllowDenyPattern
from datahub.configuration.time_window_config import BucketDuration, get_time_bucket
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.workunit import MetadataWorkUnit
@@ -20,11 +21,8 @@
def test_add_one_query_without_columns():
test_email = "test_email@test.com"
test_query = "select * from test"
-
event_time = datetime(2020, 1, 1)
-
floored_ts = get_time_bucket(event_time, BucketDuration.DAY)
-
resource = "test_db.test_schema.test_table"
ta = _TestAggregatedDataset(bucket_start_time=floored_ts, resource=resource)
@@ -40,16 +38,75 @@ def test_add_one_query_without_columns():
assert len(ta.columnFreq) == 0
-def test_multiple_query_without_columns():
+def test_add_one_query_with_ignored_user():
+ test_email = "test_email@test.com"
+ test_query = "select * from test"
+ event_time = datetime(2020, 1, 1)
+ floored_ts = get_time_bucket(event_time, BucketDuration.DAY)
+ resource = "test_db.test_schema.test_table"
+
+ ta = _TestAggregatedDataset(
+ bucket_start_time=floored_ts,
+ resource=resource,
+        user_email_pattern=AllowDenyPattern(deny=["test_email@test.com"]),
+ )
+ ta.add_read_entry(
+ test_email,
+ test_query,
+ [],
+ )
+
+ assert ta.queryCount == 0
+ assert ta.queryFreq[test_query] == 0
+ assert ta.userFreq[test_email] == 0
+ assert len(ta.columnFreq) == 0
+
+
+def test_multiple_query_with_ignored_user():
test_email = "test_email@test.com"
test_email2 = "test_email2@test.com"
test_query = "select * from test"
test_query2 = "select * from test2"
-
event_time = datetime(2020, 1, 1)
-
floored_ts = get_time_bucket(event_time, BucketDuration.DAY)
+ resource = "test_db.test_schema.test_table"
+ ta = _TestAggregatedDataset(
+ bucket_start_time=floored_ts,
+ resource=resource,
+        user_email_pattern=AllowDenyPattern(deny=["test_email@test.com"]),
+ )
+ ta.add_read_entry(
+ test_email,
+ test_query,
+ [],
+ )
+ ta.add_read_entry(
+ test_email,
+ test_query,
+ [],
+ )
+ ta.add_read_entry(
+ test_email2,
+ test_query2,
+ [],
+ )
+
+ assert ta.queryCount == 1
+ assert ta.queryFreq[test_query] == 0
+ assert ta.userFreq[test_email] == 0
+ assert ta.queryFreq[test_query2] == 1
+ assert ta.userFreq[test_email2] == 1
+ assert len(ta.columnFreq) == 0
+
+
+def test_multiple_query_without_columns():
+ test_email = "test_email@test.com"
+ test_email2 = "test_email2@test.com"
+ test_query = "select * from test"
+ test_query2 = "select * from test2"
+ event_time = datetime(2020, 1, 1)
+ floored_ts = get_time_bucket(event_time, BucketDuration.DAY)
resource = "test_db.test_schema.test_table"
ta = _TestAggregatedDataset(bucket_start_time=floored_ts, resource=resource)
@@ -58,13 +115,11 @@ def test_multiple_query_without_columns():
test_query,
[],
)
-
ta.add_read_entry(
test_email,
test_query,
[],
)
-
ta.add_read_entry(
test_email2,
test_query2,
@@ -83,9 +138,7 @@ def test_make_usage_workunit():
test_email = "test_email@test.com"
test_query = "select * from test"
event_time = datetime(2020, 1, 1)
-
floored_ts = get_time_bucket(event_time, BucketDuration.DAY)
-
resource = "test_db.test_schema.test_table"
ta = _TestAggregatedDataset(bucket_start_time=floored_ts, resource=resource)
@@ -97,6 +150,7 @@ def test_make_usage_workunit():
wu: MetadataWorkUnit = ta.make_usage_workunit(
bucket_duration=BucketDuration.DAY, urn_builder=lambda x: x, top_n_queries=10
)
+
assert wu.id == "2020-01-01T00:00:00-test_db.test_schema.test_table"
assert isinstance(wu.get_metadata()["metadata"], MetadataChangeProposalWrapper)
du: DatasetUsageStatisticsClass = wu.get_metadata()["metadata"].aspect
@@ -110,16 +164,12 @@ def test_query_trimming():
test_query: str = "select * from test where a > 10 and b > 20 order by a asc"
top_n_queries: int = 10
total_budget_for_query_list: int = 200
-
event_time = datetime(2020, 1, 1)
-
floored_ts = get_time_bucket(event_time, BucketDuration.DAY)
-
resource = "test_db.test_schema.test_table"
ta = _TestAggregatedDataset(bucket_start_time=floored_ts, resource=resource)
ta.total_budget_for_query_list = total_budget_for_query_list
-
ta.add_read_entry(
test_email,
test_query,
@@ -130,6 +180,7 @@ def test_query_trimming():
urn_builder=lambda x: x,
top_n_queries=top_n_queries,
)
+
assert wu.id == "2020-01-01T00:00:00-test_db.test_schema.test_table"
assert isinstance(wu.get_metadata()["metadata"], MetadataChangeProposalWrapper)
du: DatasetUsageStatisticsClass = wu.get_metadata()["metadata"].aspect