diff --git a/docs/how-to-guides/adding-a-new-offline-store.md b/docs/how-to-guides/adding-a-new-offline-store.md
index 06e04de687..27b70421c3 100644
--- a/docs/how-to-guides/adding-a-new-offline-store.md
+++ b/docs/how-to-guides/adding-a-new-offline-store.md
@@ -54,7 +54,7 @@ There are two methods that deal with reading data from the offline stores`get_hi
         data_source: DataSource,
         join_key_columns: List[str],
         feature_name_columns: List[str],
-        event_timestamp_column: str,
+        timestamp_field: str,
         created_timestamp_column: Optional[str],
         start_date: datetime,
         end_date: datetime) -> RetrievalJob:
@@ -63,7 +63,7 @@ There are two methods that deal with reading data from the offline stores`get_hi
         data_source,
         join_key_columns,
         feature_name_columns,
-        event_timestamp_column,
+        timestamp_field,
         created_timestamp_column,
         start_date,
         end_date)
@@ -165,14 +165,14 @@ class CustomFileDataSource(FileSource):
     """Custom data source class for local files"""
     def __init__(
         self,
-        event_timestamp_column: Optional[str] = "",
+        timestamp_field: Optional[str] = "",
         path: Optional[str] = None,
         field_mapping: Optional[Dict[str, str]] = None,
         created_timestamp_column: Optional[str] = "",
         date_partition_column: Optional[str] = "",
     ):
         super(CustomFileDataSource, self).__init__(
-            event_timestamp_column,
-            created_timestamp_column,
-            field_mapping,
-            date_partition_column,
+            timestamp_field=timestamp_field,
+            created_timestamp_column=created_timestamp_column,
+            field_mapping=field_mapping,
+            date_partition_column=date_partition_column,
         )
@@ -189,7 +189,7 @@ class CustomFileDataSource(FileSource):
         return CustomFileDataSource(
             field_mapping=dict(data_source.field_mapping),
             path=path,
-            event_timestamp_column=data_source.event_timestamp_column,
+            timestamp_field=data_source.timestamp_field,
             created_timestamp_column=data_source.created_timestamp_column,
             date_partition_column=data_source.date_partition_column,
         )
@@ -203,7 +203,7 @@ class CustomFileDataSource(FileSource):
             ),
         )
 
-        data_source_proto.event_timestamp_column = self.event_timestamp_column
+        data_source_proto.timestamp_field = self.timestamp_field
         data_source_proto.created_timestamp_column = self.created_timestamp_column
         data_source_proto.date_partition_column = self.date_partition_column
 
diff --git a/docs/reference/data-sources/spark.md b/docs/reference/data-sources/spark.md
index d0bc495924..2c1d1ec879 100644
--- a/docs/reference/data-sources/spark.md
+++ b/docs/reference/data-sources/spark.md
@@ -45,7 +45,7 @@ from feast.infra.offline_stores.contrib.spark_offline_store.spark_source import
 my_spark_source = SparkSource(
     path=f"{CURRENT_DIR}/data/driver_hourly_stats",
     file_format="parquet",
-    event_timestamp_column="event_timestamp",
+    timestamp_field="event_timestamp",
     created_timestamp_column="created",
 )
 ```
diff --git a/docs/reference/feature-repository/README.md b/docs/reference/feature-repository/README.md
index 90a6799c27..aba6ffb408 100644
--- a/docs/reference/feature-repository/README.md
+++ b/docs/reference/feature-repository/README.md
@@ -26,7 +26,7 @@ Typically, users store their feature repositories in a Git repository, especiall
 The structure of a feature repository is as follows:
 
 * The root of the repository should contain a `feature_store.yaml` file and may contain a `.feastignore` file.
-* The repository should contain Python files that contain feature definitions. 
+* The repository should contain Python files that contain feature definitions.
 * The repository can contain other files as well, including documentation and potentially data files.
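To see the renamed offline store hook in one piece rather than as hunks, here is a minimal sketch of how a custom offline store would implement it after this change. The class name and body are hypothetical; the signature mirrors the first two hunks of this patch (the `config` parameter sits just above the visible context):

```python
from datetime import datetime
from typing import List, Optional

from feast.data_source import DataSource
from feast.infra.offline_stores.offline_store import OfflineStore, RetrievalJob
from feast.repo_config import RepoConfig


class MyCustomOfflineStore(OfflineStore):
    @staticmethod
    def pull_latest_from_table_or_query(
        config: RepoConfig,
        data_source: DataSource,
        join_key_columns: List[str],
        feature_name_columns: List[str],
        timestamp_field: str,  # renamed from event_timestamp_column
        created_timestamp_column: Optional[str],
        start_date: datetime,
        end_date: datetime,
    ) -> RetrievalJob:
        # Return the latest row per join key whose timestamp_field value
        # falls within [start_date, end_date]; created_timestamp_column,
        # when set, breaks ties between rows with equal event timestamps.
        raise NotImplementedError
```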
An example structure of a feature repository is shown below:
@@ -98,7 +98,7 @@ from feast import BigQuerySource, Entity, Feature, FeatureView, ValueType
 
 driver_locations_source = BigQuerySource(
     table_ref="rh_prod.ride_hailing_co.drivers",
-    event_timestamp_column="event_timestamp",
+    timestamp_field="event_timestamp",
     created_timestamp_column="created_timestamp",
 )
 
diff --git a/docs/reference/feature-repository/registration-inferencing.md b/docs/reference/feature-repository/registration-inferencing.md
index 1bd5f0f1a8..84faf949e1 100644
--- a/docs/reference/feature-repository/registration-inferencing.md
+++ b/docs/reference/feature-repository/registration-inferencing.md
@@ -2,6 +2,6 @@
 
 ## Overview
 
-* FeatureView - When the `features` parameter is left out of the feature view definition, upon a `feast apply` call, Feast will automatically consider every column in the data source as a feature to be registered other than the specific timestamp columns associated with the underlying data source definition (e.g. event_timestamp_column) and the columns associated with the feature view's entities.
-* DataSource - When the `event_timestamp_column` parameter is left out of the data source definition, upon a 'feast apply' call, Feast will automatically find the sole timestamp column in the table underlying the data source and use that as the `event_timestamp_column`. If there are no columns of timestamp type or multiple columns of timestamp type, `feast apply` will throw an exception.
+* FeatureView - When the `features` parameter is left out of the feature view definition, upon a `feast apply` call, Feast will automatically consider every column in the data source as a feature to be registered other than the specific timestamp columns associated with the underlying data source definition (e.g. timestamp_field) and the columns associated with the feature view's entities.
+* DataSource - When the `timestamp_field` parameter is left out of the data source definition, upon a `feast apply` call, Feast will automatically find the sole timestamp column in the table underlying the data source and use that as the `timestamp_field`. If there are no columns of timestamp type or multiple columns of timestamp type, `feast apply` will throw an exception.
 * Entity - When the `value_type` parameter is left out of the entity definition, upon a `feast apply` call, Feast will automatically find the column corresponding with the entity's `join_key` and take that column's data type to be the `value_type`. If the column doesn't exist, `feast apply` will throw an exception.
diff --git a/docs/tutorials/validating-historical-features.md b/docs/tutorials/validating-historical-features.md
index 19ae4ef434..e9124b08ed 100644
--- a/docs/tutorials/validating-historical-features.md
+++ b/docs/tutorials/validating-historical-features.md
@@ -1,12 +1,12 @@
 # Validating historical features with Great Expectations
 
-In this tutorial, we will use the public dataset of Chicago taxi trips to present data validation capabilities of Feast. 
-- The original dataset is stored in BigQuery and consists of raw data for each taxi trip (one row per trip) since 2013. 
+In this tutorial, we will use the public dataset of Chicago taxi trips to present the data validation capabilities of Feast.
+- The original dataset is stored in BigQuery and consists of raw data for each taxi trip (one row per trip) since 2013.
 - We will generate several training datasets (aka historical features in Feast) for different periods and evaluate expectations made on one dataset against another.
 
 Types of features we're ingesting and generating:
-- Features that aggregate raw data with daily intervals (eg, trips per day, average fare or speed for a specific day, etc.). 
-- Features using SQL while pulling data from BigQuery (like total trips time or total miles travelled). 
+- Features that aggregate raw data with daily intervals (e.g., trips per day, average fare or speed for a specific day, etc.).
+- Features using SQL while pulling data from BigQuery (like total trip time or total miles travelled).
 - Features calculated on the fly when requested using Feast's on-demand transformations
 
 Our plan:
@@ -31,7 +31,7 @@ Install Feast Python SDK and great expectations:
 ```
 
-### 1. Dataset preparation (Optional) 
+### 1. Dataset preparation (Optional)
 
 **You can skip this step if you don't have a GCP account. Please use the parquet files that come with this tutorial instead.**
@@ -56,15 +56,15 @@ Running some basic aggregations while pulling data from BigQuery. Grouping by ta
 
 ```python
-data_query = """SELECT 
+data_query = """SELECT
     taxi_id,
     TIMESTAMP_TRUNC(trip_start_timestamp, DAY) as day,
     SUM(trip_miles) as total_miles_travelled,
     SUM(trip_seconds) as total_trip_seconds,
     SUM(fare) as total_earned,
     COUNT(*) as trip_count
-FROM `bigquery-public-data.chicago_taxi_trips.taxi_trips` 
-WHERE 
+FROM `bigquery-public-data.chicago_taxi_trips.taxi_trips`
+WHERE
     trip_miles > 0 AND trip_seconds > 60 AND
     trip_start_timestamp BETWEEN '2019-01-01' and '2020-12-31' AND
     trip_total < 1000
@@ -84,7 +84,7 @@ pyarrow.parquet.write_table(driver_stats_table, "trips_stats.parquet")
 
 def entities_query(year):
     return f"""SELECT
     distinct taxi_id
-FROM `bigquery-public-data.chicago_taxi_trips.taxi_trips` 
+FROM `bigquery-public-data.chicago_taxi_trips.taxi_trips`
 WHERE
     trip_miles > 0 AND trip_seconds > 0 AND
     trip_start_timestamp BETWEEN '{year}-01-01' and '{year}-12-31'
@@ -120,7 +120,7 @@ from google.protobuf.duration_pb2 import Duration
 
 ```python
 batch_source = FileSource(
-    event_timestamp_column="day",
+    timestamp_field="day",
     path="trips_stats.parquet",  # using parquet file that we created on previous step
     file_format=ParquetFormat()
 )
@@ -141,7 +141,7 @@ trips_stats_fv = FeatureView(
         Feature("total_trip_seconds", ValueType.DOUBLE),
         Feature("total_earned", ValueType.DOUBLE),
         Feature("trip_count", ValueType.INT64),
-        
+
     ],
     ttl=Duration(seconds=86400),
     batch_source=batch_source,
@@ -317,8 +317,8 @@ store.create_saved_dataset(
 
 A dataset profiler is a function that accepts a dataset and generates a set of its characteristics. These characteristics will then be used to evaluate (validate) subsequent datasets.
 
-**Important: datasets are not compared to each other! 
-Feast use a reference dataset and a profiler function to generate a reference profile. 
+**Important: datasets are not compared to each other!
+Feast uses a reference dataset and a profiler function to generate a reference profile.
 This profile will then be used during validation of the tested dataset.**
 
@@ -523,37 +523,37 @@ def stats_profiler(ds: PandasDataset) -> ExpectationSuite:
         max_value=60,
         mostly=0.99  # allow some outliers
     )
-    
+
     ds.expect_column_values_to_be_between(
         "total_miles_travelled",
         min_value=0,
         max_value=500,
         mostly=0.99  # allow some outliers
     )
-    
+
     # expectation of means based on observed values
     observed_mean = ds.trip_count.mean()
     ds.expect_column_mean_to_be_between("trip_count",
                                         min_value=observed_mean * (1 - DELTA),
                                         max_value=observed_mean * (1 + DELTA))
-    
+
     observed_mean = ds.earned_per_hour.mean()
     ds.expect_column_mean_to_be_between("earned_per_hour",
                                         min_value=observed_mean * (1 - DELTA),
                                         max_value=observed_mean * (1 + DELTA))
-    
-    
+
+
     # expectation of quantiles
     qs = [0.5, 0.75, 0.9, 0.95]
     observed_quantiles = ds.avg_fare.quantile(qs)
-    
+
     ds.expect_column_quantile_values_to_be_between(
         "avg_fare",
         quantile_ranges={
             "quantiles": qs,
             "value_ranges": [[None, max_value] for max_value in observed_quantiles]
-        }) 
-    
+        })
+
     return ds.get_expectation_suite()
 ```
@@ -663,7 +663,7 @@ _ = job.to_df(validation_reference=validation_reference)
 
 Validation successfully passed as no exceptions were raised.
 
-### 5. Validating new historical retrieval 
+### 5. Validating new historical retrieval
 
 Creating new timestamps for Dec 2020:
 
diff --git a/examples/java-demo/feature_repo/driver_repo.py b/examples/java-demo/feature_repo/driver_repo.py
index 233593ff02..e39f5d8cee 100644
--- a/examples/java-demo/feature_repo/driver_repo.py
+++ b/examples/java-demo/feature_repo/driver_repo.py
@@ -7,7 +7,7 @@
 
 driver_hourly_stats = FileSource(
     path="data/driver_stats_with_string.parquet",
-    event_timestamp_column="event_timestamp",
+    timestamp_field="event_timestamp",
     created_timestamp_column="created",
 )
 driver = Entity(name="driver_id", value_type=ValueType.INT64, description="driver id",)
diff --git a/java/serving/src/test/java/feast/serving/util/DataGenerator.java b/java/serving/src/test/java/feast/serving/util/DataGenerator.java
index edbb99b339..261c9ac134 100644
--- a/java/serving/src/test/java/feast/serving/util/DataGenerator.java
+++ b/java/serving/src/test/java/feast/serving/util/DataGenerator.java
@@ -158,7 +158,7 @@ public static FeatureTableSpec createFeatureTableSpec(
         .setMaxAge(Duration.newBuilder().setSeconds(3600).build())
         .setBatchSource(
             DataSource.newBuilder()
-                .setEventTimestampColumn("ts")
+                .setTimestampField("ts")
                 .setType(DataSource.SourceType.BATCH_FILE)
                 .setFileOptions(
                     FileOptions.newBuilder()
@@ -204,7 +204,7 @@ public static DataSource createFileDataSourceSpec(
         .setType(DataSource.SourceType.BATCH_FILE)
         .setFileOptions(
             FileOptions.newBuilder().setFileFormat(createParquetFormat()).setUri(fileURL).build())
-        .setEventTimestampColumn(timestampColumn)
+        .setTimestampField(timestampColumn)
         .setDatePartitionColumn(datePartitionColumn)
         .build();
   }
@@ -215,7 +215,7 @@ public static DataSource createBigQueryDataSourceSpec(
         .setType(DataSource.SourceType.BATCH_BIGQUERY)
         .setBigqueryOptions(
             DataSource.BigQueryOptions.newBuilder().setTableRef(bigQueryTableRef).build())
-        .setEventTimestampColumn(timestampColumn)
+        .setTimestampField(timestampColumn)
         .setDatePartitionColumn(datePartitionColumn)
         .build();
   }
@@ -230,7 +230,7 @@ public static DataSource createKafkaDataSourceSpec(
                 .setBootstrapServers(servers)
                 .setMessageFormat(createProtoFormat("class.path"))
                 .build())
-        .setEventTimestampColumn(timestampColumn)
+        .setTimestampField(timestampColumn)
         .build();
   }
 
@@ -292,7 +292,7 @@ public static DataSource createKinesisDataSourceSpec(
                 .setStreamName("stream")
                 .setRecordFormat(createProtoFormat(classPath))
                 .build())
-        .setEventTimestampColumn(timestampColumn)
+        .setTimestampField(timestampColumn)
         .build();
   }
 
diff --git a/protos/feast/core/DataSource.proto b/protos/feast/core/DataSource.proto
index 24af97e90c..961b5ac188 100644
--- a/protos/feast/core/DataSource.proto
+++ b/protos/feast/core/DataSource.proto
@@ -66,7 +66,7 @@ message DataSource {
   map<string, string> field_mapping = 2;
 
   // Must specify event timestamp column name
-  string event_timestamp_column = 3;
+  string timestamp_field = 3;
 
   // (Optional) Specify partition column
   // useful for file sources
diff --git a/sdk/python/feast/data_source.py b/sdk/python/feast/data_source.py
index db9eabfb9f..fee3626c6b 100644
--- a/sdk/python/feast/data_source.py
+++ b/sdk/python/feast/data_source.py
@@ -12,7 +12,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-
 import enum
 import warnings
 from abc import ABC, abstractmethod
@@ -155,7 +154,7 @@ class DataSource(ABC):
 
     Args:
         name: Name of data source, which should be unique within a project
-        event_timestamp_column (optional): Event timestamp column used for point in time
+        event_timestamp_column (optional): (Deprecated) Event timestamp column used for point in time
             joins of feature values.
         created_timestamp_column (optional): Timestamp column indicating when the row
             was created, used for deduplicating rows.
@@ -167,10 +166,12 @@ class DataSource(ABC):
         tags (optional): A dictionary of key-value pairs to store arbitrary metadata.
         owner (optional): The owner of the data source, typically the email of the primary
             maintainer.
+        timestamp_field (optional): Event timestamp field used for point in time
+            joins of feature values.
     """
 
     name: str
-    event_timestamp_column: str
+    timestamp_field: str
     created_timestamp_column: str
     field_mapping: Dict[str, str]
     date_partition_column: str
@@ -188,12 +189,13 @@ def __init__(
         tags: Optional[Dict[str, str]] = None,
         owner: Optional[str] = "",
         name: Optional[str] = None,
+        timestamp_field: Optional[str] = None,
     ):
         """
         Creates a DataSource object.
         Args:
             name: Name of data source, which should be unique within a project
-            event_timestamp_column (optional): Event timestamp column used for point in time
+            event_timestamp_column (optional): (Deprecated) Event timestamp column used for point in time
                 joins of feature values.
             created_timestamp_column (optional): Timestamp column indicating when the row
                 was created, used for deduplicating rows.
@@ -205,19 +207,27 @@ def __init__(
             tags (optional): A dictionary of key-value pairs to store arbitrary metadata.
             owner (optional): The owner of the data source, typically the email of the primary
                 maintainer.
+            timestamp_field (optional): Event timestamp field used for point
+                in time joins of feature values.
         """
         if not name:
             warnings.warn(
                 (
                     "Names for data sources need to be supplied. "
-                    "Data sources without names will no tbe supported after Feast 0.23."
+                    "Data sources without names will not be supported after Feast 0.23."
                 ),
                 UserWarning,
             )
         self.name = name or ""
-        self.event_timestamp_column = (
-            event_timestamp_column if event_timestamp_column else ""
-        )
+        if not timestamp_field and event_timestamp_column:
+            warnings.warn(
+                (
+                    "The argument 'event_timestamp_column' is being deprecated. Please use 'timestamp_field' instead. "
+                    "Feast 0.23 and onwards will not support the argument 'event_timestamp_column' for datasources."
+                ),
+                DeprecationWarning,
+            )
+        self.timestamp_field = timestamp_field or event_timestamp_column or ""
         self.created_timestamp_column = (
             created_timestamp_column if created_timestamp_column else ""
         )
@@ -241,7 +251,7 @@ def __eq__(self, other):
 
         if (
             self.name != other.name
-            or self.event_timestamp_column != other.event_timestamp_column
+            or self.timestamp_field != other.timestamp_field
             or self.created_timestamp_column != other.created_timestamp_column
             or self.field_mapping != other.field_mapping
             or self.date_partition_column != other.date_partition_column
@@ -347,6 +357,7 @@ def __init__(
         description: Optional[str] = "",
         tags: Optional[Dict[str, str]] = None,
         owner: Optional[str] = "",
+        timestamp_field: Optional[str] = "",
     ):
         super().__init__(
             event_timestamp_column=event_timestamp_column,
@@ -357,6 +368,7 @@ def __init__(
             tags=tags,
             owner=owner,
             name=name,
+            timestamp_field=timestamp_field,
         )
         self.kafka_options = KafkaOptions(
             bootstrap_servers=bootstrap_servers,
@@ -384,14 +396,15 @@ def __eq__(self, other):
     def from_proto(data_source: DataSourceProto):
         return KafkaSource(
             name=data_source.name,
+            event_timestamp_column=data_source.timestamp_field,
             field_mapping=dict(data_source.field_mapping),
             bootstrap_servers=data_source.kafka_options.bootstrap_servers,
             message_format=StreamFormat.from_proto(
                 data_source.kafka_options.message_format
             ),
             topic=data_source.kafka_options.topic,
-            event_timestamp_column=data_source.event_timestamp_column,
             created_timestamp_column=data_source.created_timestamp_column,
+            timestamp_field=data_source.timestamp_field,
             date_partition_column=data_source.date_partition_column,
             description=data_source.description,
             tags=dict(data_source.tags),
@@ -409,7 +422,7 @@ def to_proto(self) -> DataSourceProto:
             owner=self.owner,
         )
 
-        data_source_proto.event_timestamp_column = self.event_timestamp_column
+        data_source_proto.timestamp_field = self.timestamp_field
         data_source_proto.created_timestamp_column = self.created_timestamp_column
         data_source_proto.date_partition_column = self.date_partition_column
         return data_source_proto
@@ -509,14 +522,15 @@ def get_table_column_names_and_types(
     def from_proto(data_source: DataSourceProto):
         return KinesisSource(
             name=data_source.name,
+            event_timestamp_column=data_source.timestamp_field,
             field_mapping=dict(data_source.field_mapping),
             record_format=StreamFormat.from_proto(
                 data_source.kinesis_options.record_format
             ),
             region=data_source.kinesis_options.region,
             stream_name=data_source.kinesis_options.stream_name,
-            event_timestamp_column=data_source.event_timestamp_column,
             created_timestamp_column=data_source.created_timestamp_column,
+            timestamp_field=data_source.timestamp_field,
             date_partition_column=data_source.date_partition_column,
             description=data_source.description,
             tags=dict(data_source.tags),
@@ -543,6 +557,7 @@ def __init__(
         description: Optional[str] = "",
         tags: Optional[Dict[str, str]] = None,
         owner: Optional[str] = "",
+        timestamp_field: Optional[str] = "",
     ):
         super().__init__(
             name=name,
@@ -553,6 +568,7 @@ def __init__(
             description=description,
             tags=tags,
             owner=owner,
+            timestamp_field=timestamp_field,
         )
         self.kinesis_options = KinesisOptions(
             record_format=record_format, region=region, stream_name=stream_name
@@ -588,7 +604,7 @@ def to_proto(self) -> DataSourceProto:
             owner=self.owner,
         )
 
-        data_source_proto.event_timestamp_column = self.event_timestamp_column
+        data_source_proto.timestamp_field = self.timestamp_field
         data_source_proto.created_timestamp_column = self.created_timestamp_column
         data_source_proto.date_partition_column = self.date_partition_column
 
@@ -603,7 +619,7 @@ class PushSource(DataSource):
     name: str
     schema: Dict[str, ValueType]
     batch_source: DataSource
-    event_timestamp_column: str
+    timestamp_field: str
 
     def __init__(
         self,
@@ -614,6 +630,7 @@ def __init__(
         description: Optional[str] = "",
         tags: Optional[Dict[str, str]] = None,
         owner: Optional[str] = "",
+        timestamp_field: Optional[str] = None,
     ):
         """
         Creates a PushSource object.
@@ -622,23 +639,33 @@ def __init__(
             schema: Schema mapping from the input feature name to a ValueType
             batch_source: The batch source that backs this push source. It's used when materializing from the offline
                 store to the online store, and when retrieving historical features.
-            event_timestamp_column (optional): Event timestamp column used for point in time
+            event_timestamp_column (optional): (Deprecated) Event timestamp column used for point in time
                 joins of feature values.
             description (optional): A human-readable description.
             tags (optional): A dictionary of key-value pairs to store arbitrary metadata.
             owner (optional): The owner of the data source, typically the email of the primary
                 maintainer.
+            timestamp_field (optional): Event timestamp field used for point in time
+                joins of feature values.
+
         """
         super().__init__(name=name, description=description, tags=tags, owner=owner)
         self.schema = schema
         self.batch_source = batch_source
         if not self.batch_source:
             raise ValueError(f"batch_source is needed for push source {self.name}")
-        self.event_timestamp_column = event_timestamp_column
-        if not self.event_timestamp_column:
-            raise ValueError(
-                f"event_timestamp_column is needed for push source {self.name}"
+        if not timestamp_field and event_timestamp_column:
+            warnings.warn(
+                (
+                    "The argument 'event_timestamp_column' is being deprecated. Please use 'timestamp_field' instead. "
+                    "Feast 0.23 and onwards will not support the argument 'event_timestamp_column' for datasources."
+                ),
+                DeprecationWarning,
             )
+        self.timestamp_field = timestamp_field or event_timestamp_column
+
+        if not self.timestamp_field:
+            raise ValueError(f"timestamp_field is needed for push source {self.name}")
 
     def validate(self, config: RepoConfig):
         pass
@@ -662,7 +689,7 @@ def from_proto(data_source: DataSourceProto):
             name=data_source.name,
             schema=schema,
             batch_source=batch_source,
-            event_timestamp_column=data_source.event_timestamp_column,
+            timestamp_field=data_source.timestamp_field,
             description=data_source.description,
             tags=dict(data_source.tags),
             owner=data_source.owner,
@@ -683,7 +710,7 @@ def to_proto(self) -> DataSourceProto:
             name=self.name,
             type=DataSourceProto.PUSH_SOURCE,
             push_options=options,
-            event_timestamp_column=self.event_timestamp_column,
+            timestamp_field=self.timestamp_field,
             description=self.description,
             tags=self.tags,
             owner=self.owner,
diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py
index b060d286fd..6f4de5473b 100644
--- a/sdk/python/feast/feature_store.py
+++ b/sdk/python/feast/feature_store.py
@@ -521,7 +521,7 @@ def _plan(
         >>> driver = Entity(name="driver_id", value_type=ValueType.INT64, description="driver id")
         >>> driver_hourly_stats = FileSource(
         ...     path="feature_repo/data/driver_stats.parquet",
-        ...     event_timestamp_column="event_timestamp",
+        ...     timestamp_field="event_timestamp",
         ...     created_timestamp_column="created",
         ... )
         >>> driver_hourly_stats_view = FeatureView(
@@ -632,7 +632,7 @@ def apply(
         >>> driver = Entity(name="driver_id", value_type=ValueType.INT64, description="driver id")
         >>> driver_hourly_stats = FileSource(
         ...     path="feature_repo/data/driver_stats.parquet",
-        ...     event_timestamp_column="event_timestamp",
+        ...     timestamp_field="event_timestamp",
         ...     created_timestamp_column="created",
         ... )
         >>> driver_hourly_stats_view = FeatureView(
diff --git a/sdk/python/feast/inference.py b/sdk/python/feast/inference.py
index 51c4e9d78e..3cf5614975 100644
--- a/sdk/python/feast/inference.py
+++ b/sdk/python/feast/inference.py
@@ -80,10 +80,7 @@ def update_data_sources_with_inferred_event_timestamp_col(
     for data_source in data_sources:
         if isinstance(data_source, RequestDataSource):
             continue
-        if (
-            data_source.event_timestamp_column is None
-            or data_source.event_timestamp_column == ""
-        ):
+        if data_source.timestamp_field is None or data_source.timestamp_field == "":
             # prepare right match pattern for data source
             ts_column_type_regex_pattern = ""
             # TODO(adchia): Move Spark source inference out of this logic
@@ -102,7 +99,7 @@ def update_data_sources_with_inferred_event_timestamp_col(
                 raise RegistryInferenceFailure(
                     "DataSource",
                     f"""
-                    DataSource inferencing of event_timestamp_column is currently only supported
+                    DataSource inferencing of timestamp_field is currently only supported
                     for FileSource, SparkSource, BigQuerySource, RedshiftSource, and SnowflakeSource.
                     Attempting to infer from {data_source}.
                     """,
@@ -117,7 +114,7 @@ def update_data_sources_with_inferred_event_timestamp_col(
             )
 
             # loop through table columns to find singular match
-            event_timestamp_column, matched_flag = None, False
+            timestamp_field, matched_flag = None, False
             for (
                 col_name,
                 col_datatype,
@@ -132,10 +129,10 @@ def update_data_sources_with_inferred_event_timestamp_col(
                         """,
                     )
                 matched_flag = True
-                event_timestamp_column = col_name
+                timestamp_field = col_name
         if matched_flag:
-            assert event_timestamp_column
-            data_source.event_timestamp_column = event_timestamp_column
+            assert timestamp_field
+            data_source.timestamp_field = timestamp_field
         else:
             raise RegistryInferenceFailure(
                 "DataSource",
@@ -163,17 +160,15 @@ def update_feature_views_with_inferred_features(
     for fv in fvs:
         if not fv.features:
             columns_to_exclude = {
-                fv.batch_source.event_timestamp_column,
+                fv.batch_source.timestamp_field,
                 fv.batch_source.created_timestamp_column,
             } | {
                 entity_name_to_join_key_map[entity_name] for entity_name in fv.entities
             }
 
-            if fv.batch_source.event_timestamp_column in fv.batch_source.field_mapping:
+            if fv.batch_source.timestamp_field in fv.batch_source.field_mapping:
                 columns_to_exclude.add(
-                    fv.batch_source.field_mapping[
-                        fv.batch_source.event_timestamp_column
-                    ]
+                    fv.batch_source.field_mapping[fv.batch_source.timestamp_field]
                 )
 
             if (
                 fv.batch_source.created_timestamp_column
diff --git a/sdk/python/feast/infra/offline_stores/bigquery_source.py b/sdk/python/feast/infra/offline_stores/bigquery_source.py
index a29b0eb81a..eca9bdf21e 100644
--- a/sdk/python/feast/infra/offline_stores/bigquery_source.py
+++ b/sdk/python/feast/infra/offline_stores/bigquery_source.py
@@ -27,13 +27,14 @@ def __init__(
         description: Optional[str] = "",
         tags: Optional[Dict[str, str]] = None,
         owner: Optional[str] = "",
+        timestamp_field: Optional[str] = None,
     ):
         """Create a BigQuerySource from an existing table or query.
 
         Args:
             table (optional): The BigQuery table where features can be found.
             table_ref (optional): (Deprecated) The BigQuery table where features can be found.
-            event_timestamp_column: Event timestamp column used for point in time joins of feature values.
+            event_timestamp_column: (Deprecated) Event timestamp column used for point in time joins of feature values.
             created_timestamp_column (optional): Timestamp column when row was created, used for deduplicating rows.
             field_mapping: A dictionary mapping of column names in this data source to feature names in a feature table
                 or view. Only used for feature columns, not entities or timestamp columns.
@@ -44,6 +45,8 @@ def __init__(
             tags (optional): A dictionary of key-value pairs to store arbitrary metadata.
             owner (optional): The owner of the bigquery source, typically the email of the primary
                 maintainer.
+            timestamp_field (optional): Event timestamp field used for point in time
+                joins of feature values.
         Example:
             >>> from feast import BigQuerySource
             >>> my_bigquery_source = BigQuerySource(table="gcp_project:bq_dataset.bq_table")
@@ -93,6 +96,7 @@ def __init__(
             description=description,
             tags=tags,
             owner=owner,
+            timestamp_field=timestamp_field,
         )
 
     # Note: Python requires redefining hash in child classes that override __eq__
@@ -109,7 +113,7 @@ def __eq__(self, other):
             self.name == other.name
             and self.bigquery_options.table_ref == other.bigquery_options.table_ref
             and self.bigquery_options.query == other.bigquery_options.query
-            and self.event_timestamp_column == other.event_timestamp_column
+            and self.timestamp_field == other.timestamp_field
             and self.created_timestamp_column == other.created_timestamp_column
             and self.field_mapping == other.field_mapping
             and self.description == other.description
@@ -134,7 +138,7 @@ def from_proto(data_source: DataSourceProto):
             name=data_source.name,
             field_mapping=dict(data_source.field_mapping),
             table_ref=data_source.bigquery_options.table_ref,
-            event_timestamp_column=data_source.event_timestamp_column,
+            timestamp_field=data_source.timestamp_field,
             created_timestamp_column=data_source.created_timestamp_column,
             query=data_source.bigquery_options.query,
             description=data_source.description,
@@ -153,7 +157,7 @@ def to_proto(self) -> DataSourceProto:
             owner=self.owner,
         )
 
-        data_source_proto.event_timestamp_column = self.event_timestamp_column
+        data_source_proto.timestamp_field = self.timestamp_field
         data_source_proto.created_timestamp_column = self.created_timestamp_column
 
         return data_source_proto
diff --git a/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark_source.py b/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark_source.py
index 37f33bb20e..b9d3d57301 100644
--- a/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark_source.py
+++ b/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark_source.py
@@ -43,6 +43,7 @@ def __init__(
         description: Optional[str] = "",
         tags: Optional[Dict[str, str]] = None,
         owner: Optional[str] = "",
+        timestamp_field: Optional[str] = None,
     ):
         # If no name, use the table_ref as the default name
         _name = name
@@ -69,6 +70,7 @@ def __init__(
             description=description,
             tags=tags,
             owner=owner,
+            timestamp_field=timestamp_field,
         )
         warnings.warn(
             "The spark data source API is an experimental feature in alpha development. "
@@ -137,7 +139,7 @@ def from_proto(data_source: DataSourceProto) -> Any:
             query=spark_options.query,
             path=spark_options.path,
             file_format=spark_options.file_format,
-            event_timestamp_column=data_source.event_timestamp_column,
+            timestamp_field=data_source.timestamp_field,
             created_timestamp_column=data_source.created_timestamp_column,
             description=data_source.description,
             tags=dict(data_source.tags),
@@ -155,7 +157,7 @@ def to_proto(self) -> DataSourceProto:
             owner=self.owner,
         )
 
-        data_source_proto.event_timestamp_column = self.event_timestamp_column
+        data_source_proto.timestamp_field = self.timestamp_field
         data_source_proto.created_timestamp_column = self.created_timestamp_column
 
         return data_source_proto
diff --git a/sdk/python/feast/infra/offline_stores/file.py b/sdk/python/feast/infra/offline_stores/file.py
index 383adaaf5d..cb6e874f8a 100644
--- a/sdk/python/feast/infra/offline_stores/file.py
+++ b/sdk/python/feast/infra/offline_stores/file.py
@@ -186,9 +186,7 @@ def evaluate_historical_retrieval():
 
             # Load feature view data from sources and join them incrementally
             for feature_view, features in feature_views_to_features.items():
-                event_timestamp_column = (
-                    feature_view.batch_source.event_timestamp_column
-                )
+                event_timestamp_column = feature_view.batch_source.timestamp_field
                 created_timestamp_column = (
                     feature_view.batch_source.created_timestamp_column
                 )
diff --git a/sdk/python/feast/infra/offline_stores/file_source.py b/sdk/python/feast/infra/offline_stores/file_source.py
index 64d8be06af..12ff0437bf 100644
--- a/sdk/python/feast/infra/offline_stores/file_source.py
+++ b/sdk/python/feast/infra/offline_stores/file_source.py
@@ -31,6 +31,7 @@ def __init__(
         description: Optional[str] = "",
         tags: Optional[Dict[str, str]] = None,
         owner: Optional[str] = "",
+        timestamp_field: Optional[str] = "",
     ):
         """Create a FileSource from a file containing feature data. Only Parquet format supported.
 
         Args:
 
             path: File path to file containing feature data. Must contain an event_timestamp column, entity columns and
                 feature columns.
-            event_timestamp_column: Event timestamp column used for point in time joins of feature values.
+            event_timestamp_column (optional): (Deprecated) Event timestamp column used for point in time joins of feature values.
             created_timestamp_column (optional): Timestamp column when row was created, used for deduplicating rows.
             file_format (optional): Explicitly set the file format. Allows Feast to bypass inferring the file format.
             field_mapping: A dictionary mapping of column names in this data source to feature names in a feature table
@@ -50,10 +51,12 @@ def __init__(
             tags (optional): A dictionary of key-value pairs to store arbitrary metadata.
             owner (optional): The owner of the file source, typically the email of the primary
                 maintainer.
+            timestamp_field (optional): Event timestamp field used for point in time
+                joins of feature values.
         Examples:
             >>> from feast import FileSource
-            >>> file_source = FileSource(path="my_features.parquet", event_timestamp_column="event_timestamp")
+            >>> file_source = FileSource(path="my_features.parquet", timestamp_field="event_timestamp")
         """
         if path is None:
             raise ValueError(
@@ -83,6 +86,7 @@ def __init__(
             description=description,
             tags=tags,
             owner=owner,
+            timestamp_field=timestamp_field,
         )
 
     # Note: Python requires redefining hash in child classes that override __eq__
@@ -96,7 +100,7 @@ def __eq__(self, other):
         return (
             self.name == other.name
             and self.file_options.file_format == other.file_options.file_format
-            and self.event_timestamp_column == other.event_timestamp_column
+            and self.timestamp_field == other.timestamp_field
             and self.created_timestamp_column == other.created_timestamp_column
             and self.field_mapping == other.field_mapping
             and self.file_options.s3_endpoint_override
@@ -120,7 +124,7 @@ def from_proto(data_source: DataSourceProto):
             field_mapping=dict(data_source.field_mapping),
             file_format=FileFormat.from_proto(data_source.file_options.file_format),
             path=data_source.file_options.uri,
-            event_timestamp_column=data_source.event_timestamp_column,
+            timestamp_field=data_source.timestamp_field,
             created_timestamp_column=data_source.created_timestamp_column,
             s3_endpoint_override=data_source.file_options.s3_endpoint_override,
             description=data_source.description,
@@ -139,7 +143,7 @@ def to_proto(self) -> DataSourceProto:
             owner=self.owner,
         )
 
-        data_source_proto.event_timestamp_column = self.event_timestamp_column
+        data_source_proto.timestamp_field = self.timestamp_field
         data_source_proto.created_timestamp_column = self.created_timestamp_column
 
         return data_source_proto
diff --git a/sdk/python/feast/infra/offline_stores/offline_utils.py b/sdk/python/feast/infra/offline_stores/offline_utils.py
index 68440df205..c55e91caf8 100644
--- a/sdk/python/feast/infra/offline_stores/offline_utils.py
+++ b/sdk/python/feast/infra/offline_stores/offline_utils.py
@@ -130,7 +130,7 @@ def get_feature_view_query_context(
         else:
             ttl_seconds = 0
 
-        event_timestamp_column = feature_view.batch_source.event_timestamp_column
+        timestamp_field = feature_view.batch_source.timestamp_field
         created_timestamp_column = feature_view.batch_source.created_timestamp_column
 
         min_event_timestamp = None
@@ -150,7 +150,7 @@ def get_feature_view_query_context(
             ],
             field_mapping=feature_view.batch_source.field_mapping,
             event_timestamp_column=reverse_field_mapping.get(
-                event_timestamp_column, event_timestamp_column
+                timestamp_field, timestamp_field
             ),
             created_timestamp_column=reverse_field_mapping.get(
                 created_timestamp_column, created_timestamp_column
diff --git a/sdk/python/feast/infra/offline_stores/redshift_source.py b/sdk/python/feast/infra/offline_stores/redshift_source.py
index 5beb50a2fb..f099e307cc 100644
--- a/sdk/python/feast/infra/offline_stores/redshift_source.py
+++ b/sdk/python/feast/infra/offline_stores/redshift_source.py
@@ -28,12 +28,13 @@ def __init__(
         tags: Optional[Dict[str, str]] = None,
         owner: Optional[str] = "",
         database: Optional[str] = "",
+        timestamp_field: Optional[str] = "",
     ):
         """
         Creates a RedshiftSource object.
 
         Args:
-            event_timestamp_column (optional): Event timestamp column used for point in
+            event_timestamp_column (optional): (Deprecated) Event timestamp column used for point in
                 time joins of feature values.
             table (optional): Redshift table where the features are stored.
             schema (optional): Redshift schema in which the table is located.
@@ -49,6 +50,8 @@ def __init__(
             owner (optional): The owner of the redshift source, typically the email of the primary
                 maintainer.
             database (optional): The Redshift database name.
+            timestamp_field (optional): Event timestamp field used for point in time
+                joins of feature values.
         """
         # The default Redshift schema is named "public".
         _schema = "public" if table and not schema else schema
@@ -87,6 +90,7 @@ def __init__(
             description=description,
             tags=tags,
             owner=owner,
+            timestamp_field=timestamp_field,
         )
 
     @staticmethod
@@ -104,7 +108,7 @@ def from_proto(data_source: DataSourceProto):
             field_mapping=dict(data_source.field_mapping),
             table=data_source.redshift_options.table,
             schema=data_source.redshift_options.schema,
-            event_timestamp_column=data_source.event_timestamp_column,
+            timestamp_field=data_source.timestamp_field,
             created_timestamp_column=data_source.created_timestamp_column,
             query=data_source.redshift_options.query,
             description=data_source.description,
@@ -129,7 +133,7 @@ def __eq__(self, other):
             and self.redshift_options.schema == other.redshift_options.schema
             and self.redshift_options.query == other.redshift_options.query
             and self.redshift_options.database == other.redshift_options.database
-            and self.event_timestamp_column == other.event_timestamp_column
+            and self.timestamp_field == other.timestamp_field
             and self.created_timestamp_column == other.created_timestamp_column
             and self.field_mapping == other.field_mapping
             and self.description == other.description
@@ -173,7 +177,7 @@ def to_proto(self) -> DataSourceProto:
             owner=self.owner,
         )
 
-        data_source_proto.event_timestamp_column = self.event_timestamp_column
+        data_source_proto.timestamp_field = self.timestamp_field
         data_source_proto.created_timestamp_column = self.created_timestamp_column
 
         return data_source_proto
diff --git a/sdk/python/feast/infra/offline_stores/snowflake_source.py b/sdk/python/feast/infra/offline_stores/snowflake_source.py
index 839e090f02..1d24cba44a 100644
--- a/sdk/python/feast/infra/offline_stores/snowflake_source.py
+++ b/sdk/python/feast/infra/offline_stores/snowflake_source.py
@@ -28,6 +28,7 @@ def __init__(
         description: Optional[str] = "",
         tags: Optional[Dict[str, str]] = None,
         owner: Optional[str] = "",
+        timestamp_field: Optional[str] = "",
     ):
         """
         Creates a SnowflakeSource object.
@@ -37,7 +38,7 @@ def __init__(
             warehouse (optional): Snowflake warehouse where the database is stored.
             schema (optional): Snowflake schema in which the table is located.
             table (optional): Snowflake table where the features are stored.
-            event_timestamp_column (optional): Event timestamp column used for point in
+            event_timestamp_column (optional): (Deprecated) Event timestamp column used for point in
                 time joins of feature values.
             query (optional): The query to be executed to obtain the features.
             created_timestamp_column (optional): Timestamp column indicating when the
@@ -95,6 +96,7 @@ def __init__(
             description=description,
             tags=tags,
             owner=owner,
+            timestamp_field=timestamp_field,
         )
 
     @staticmethod
@@ -114,7 +116,7 @@ def from_proto(data_source: DataSourceProto):
             schema=data_source.snowflake_options.schema,
             table=data_source.snowflake_options.table,
             warehouse=data_source.snowflake_options.warehouse,
-            event_timestamp_column=data_source.event_timestamp_column,
+            timestamp_field=data_source.timestamp_field,
             created_timestamp_column=data_source.created_timestamp_column,
             query=data_source.snowflake_options.query,
             description=data_source.description,
@@ -139,7 +141,7 @@ def __eq__(self, other):
             and self.snowflake_options.table == other.snowflake_options.table
             and self.snowflake_options.query == other.snowflake_options.query
             and self.snowflake_options.warehouse == other.snowflake_options.warehouse
-            and self.event_timestamp_column == other.event_timestamp_column
+            and self.timestamp_field == other.timestamp_field
             and self.created_timestamp_column == other.created_timestamp_column
             and self.field_mapping == other.field_mapping
             and self.description == other.description
@@ -188,7 +190,7 @@ def to_proto(self) -> DataSourceProto:
             owner=self.owner,
         )
 
-        data_source_proto.event_timestamp_column = self.event_timestamp_column
+        data_source_proto.timestamp_field = self.timestamp_field
         data_source_proto.created_timestamp_column = self.created_timestamp_column
 
         return data_source_proto
diff --git a/sdk/python/feast/infra/provider.py b/sdk/python/feast/infra/provider.py
index 8e0f8c7803..b1d4ea39f4 100644
--- a/sdk/python/feast/infra/provider.py
+++ b/sdk/python/feast/infra/provider.py
@@ -258,7 +258,7 @@ def _get_column_names(
     the query to the offline store.
     """
     # if we have mapped fields, use the original field names in the call to the offline store
-    event_timestamp_column = feature_view.batch_source.event_timestamp_column
+    event_timestamp_column = feature_view.batch_source.timestamp_field
     feature_names = [feature.name for feature in feature_view.features]
     created_timestamp_column = feature_view.batch_source.created_timestamp_column
     join_keys = [
@@ -382,7 +382,7 @@ def _convert_arrow_to_proto(
     event_timestamps = [
         _coerce_datetime(val)
         for val in pandas.to_datetime(
-            table.column(feature_view.batch_source.event_timestamp_column).to_numpy(
+            table.column(feature_view.batch_source.timestamp_field).to_numpy(
                 zero_copy_only=False
             )
         )
diff --git a/sdk/python/feast/templates/aws/driver_repo.py b/sdk/python/feast/templates/aws/driver_repo.py
index 69cc21a647..8c2f884490 100644
--- a/sdk/python/feast/templates/aws/driver_repo.py
+++ b/sdk/python/feast/templates/aws/driver_repo.py
@@ -23,7 +23,7 @@
     table="feast_driver_hourly_stats",
     # The event timestamp is used for point-in-time joins and for ensuring only
     # features within the TTL are returned
-    event_timestamp_column="event_timestamp",
+    timestamp_field="event_timestamp",
    # The (optional) created timestamp is used to ensure there are no duplicate
     # feature rows in the offline store or when building training datasets
     created_timestamp_column="created",
diff --git a/sdk/python/feast/templates/gcp/driver_repo.py b/sdk/python/feast/templates/gcp/driver_repo.py
index 003cd9fa25..e2e1250c62 100644
--- a/sdk/python/feast/templates/gcp/driver_repo.py
+++ b/sdk/python/feast/templates/gcp/driver_repo.py
@@ -23,7 +23,7 @@
    table_ref="feast-oss.demo_data.driver_hourly_stats_2",
     # The event timestamp is used for point-in-time joins and for ensuring only
     # features within the TTL are returned
-    event_timestamp_column="event_timestamp",
+    timestamp_field="event_timestamp",
     # The (optional) created timestamp is used to ensure there are no duplicate
     # feature rows in the offline store or when building training datasets
     created_timestamp_column="created",
diff --git a/sdk/python/feast/templates/local/example.py b/sdk/python/feast/templates/local/example.py
index 6ab549b8c5..4ce2533325 100644
--- a/sdk/python/feast/templates/local/example.py
+++ b/sdk/python/feast/templates/local/example.py
@@ -9,7 +9,7 @@
 # for more info.
 driver_hourly_stats = FileSource(
     path="%PARQUET_PATH%",
-    event_timestamp_column="event_timestamp",
+    timestamp_field="event_timestamp",
     created_timestamp_column="created",
 )
diff --git a/sdk/python/feast/templates/snowflake/driver_repo.py b/sdk/python/feast/templates/snowflake/driver_repo.py
index 0ecdad7f05..b05080038c 100644
--- a/sdk/python/feast/templates/snowflake/driver_repo.py
+++ b/sdk/python/feast/templates/snowflake/driver_repo.py
@@ -27,7 +27,7 @@
     warehouse="SNOWFLAKE_WAREHOUSE",
     # The event timestamp is used for point-in-time joins and for ensuring only
     # features within the TTL are returned
-    event_timestamp_column="event_timestamp",
+    timestamp_field="event_timestamp",
     # The (optional) created timestamp is used to ensure there are no duplicate
     # feature rows in the offline store or when building training datasets
     created_timestamp_column="created",
diff --git a/sdk/python/feast/templates/spark/example.py b/sdk/python/feast/templates/spark/example.py
index b1741fd688..ce565923d8 100644
--- a/sdk/python/feast/templates/spark/example.py
+++ b/sdk/python/feast/templates/spark/example.py
@@ -25,14 +25,14 @@
     name="driver_hourly_stats",
     path=f"{CURRENT_DIR}/data/driver_hourly_stats.parquet",
     file_format="parquet",
-    event_timestamp_column="event_timestamp",
+    timestamp_field="event_timestamp",
     created_timestamp_column="created",
 )
 customer_daily_profile = SparkSource(
     name="customer_daily_profile",
     path=f"{CURRENT_DIR}/data/customer_daily_profile.parquet",
     file_format="parquet",
-    event_timestamp_column="event_timestamp",
+    timestamp_field="event_timestamp",
     created_timestamp_column="created",
 )
diff --git a/sdk/python/tests/doctest/test_all.py b/sdk/python/tests/doctest/test_all.py
index bf3a09db1e..7d29c406e9 100644
--- a/sdk/python/tests/doctest/test_all.py
+++ b/sdk/python/tests/doctest/test_all.py
@@ -21,7 +21,7 @@ def setup_feature_store():
     )
     driver_hourly_stats = FileSource(
         path="feature_repo/data/driver_stats.parquet",
-        event_timestamp_column="event_timestamp",
+        timestamp_field="event_timestamp",
         created_timestamp_column="created",
     )
     driver_hourly_stats_view = FeatureView(
diff --git a/sdk/python/tests/example_repos/example_feature_repo_1.py b/sdk/python/tests/example_repos/example_feature_repo_1.py
index 2b39cb421f..db9a0ff991 100644
--- a/sdk/python/tests/example_repos/example_feature_repo_1.py
+++ b/sdk/python/tests/example_repos/example_feature_repo_1.py
@@ -12,31 +12,31 @@
 
 driver_locations_source = BigQuerySource(
     table="feast-oss.public.drivers",
-    event_timestamp_column="event_timestamp",
+    timestamp_field="event_timestamp",
     created_timestamp_column="created_timestamp",
 )
 
 driver_locations_source_query = BigQuerySource(
     query="SELECT * from feast-oss.public.drivers",
-    event_timestamp_column="event_timestamp",
+    timestamp_field="event_timestamp",
     created_timestamp_column="created_timestamp",
 )
 
 driver_locations_source_query_2 = BigQuerySource(
     query="SELECT lat * 2 FROM feast-oss.public.drivers",
-    event_timestamp_column="event_timestamp",
+    timestamp_field="event_timestamp",
     created_timestamp_column="created_timestamp",
 )
 
 customer_profile_source = BigQuerySource(
     name="customer_profile_source",
     table_ref="feast-oss.public.customers",
-    event_timestamp_column="event_timestamp",
+    timestamp_field="event_timestamp",
 )
 
 customer_driver_combined_source = BigQuerySource(
-    table_ref="feast-oss.public.customer_driver",
-    event_timestamp_column="event_timestamp",
+    table_ref="feast-oss.public.customer_driver",
+    timestamp_field="event_timestamp",
 )
 
 driver_locations_push_source = PushSource(
diff --git a/sdk/python/tests/example_repos/example_feature_repo_2.py b/sdk/python/tests/example_repos/example_feature_repo_2.py
index 96da67ac92..fe95291bda 100644
--- a/sdk/python/tests/example_repos/example_feature_repo_2.py
+++ b/sdk/python/tests/example_repos/example_feature_repo_2.py
@@ -4,7 +4,7 @@
 
 driver_hourly_stats = FileSource(
     path="%PARQUET_PATH%",  # placeholder to be replaced by the test
-    event_timestamp_column="event_timestamp",
+    timestamp_field="event_timestamp",
     created_timestamp_column="created",
 )
 
@@ -28,7 +28,7 @@
 
 global_daily_stats = FileSource(
     path="%PARQUET_PATH_GLOBAL%",  # placeholder to be replaced by the test
-    event_timestamp_column="event_timestamp",
+    timestamp_field="event_timestamp",
     created_timestamp_column="created",
 )
 
diff --git a/sdk/python/tests/example_repos/example_feature_repo_with_entity_join_key.py b/sdk/python/tests/example_repos/example_feature_repo_with_entity_join_key.py
index bdabd14d2a..110ea163de 100644
--- a/sdk/python/tests/example_repos/example_feature_repo_with_entity_join_key.py
+++ b/sdk/python/tests/example_repos/example_feature_repo_with_entity_join_key.py
@@ -4,7 +4,7 @@
 
 driver_hourly_stats = FileSource(
     path="%PARQUET_PATH%",  # placeholder to be replaced by the test
-    event_timestamp_column="event_timestamp",
+    timestamp_field="event_timestamp",
     created_timestamp_column="created",
 )
 
diff --git a/sdk/python/tests/integration/feature_repos/repo_configuration.py b/sdk/python/tests/integration/feature_repos/repo_configuration.py
index 3029b2e774..1774a3eff9 100644
--- a/sdk/python/tests/integration/feature_repos/repo_configuration.py
+++ b/sdk/python/tests/integration/feature_repos/repo_configuration.py
@@ -209,37 +209,37 @@ def construct_universal_data_sources(
     customer_ds = data_source_creator.create_data_source(
         datasets.customer_df,
         destination_name="customer_profile",
-        event_timestamp_column="event_timestamp",
+        timestamp_field="event_timestamp",
         created_timestamp_column="created",
     )
     driver_ds = data_source_creator.create_data_source(
         datasets.driver_df,
         destination_name="driver_hourly",
-        event_timestamp_column="event_timestamp",
+        timestamp_field="event_timestamp",
         created_timestamp_column="created",
     )
     location_ds = data_source_creator.create_data_source(
         datasets.location_df,
         destination_name="location_hourly",
-        event_timestamp_column="event_timestamp",
+        timestamp_field="event_timestamp",
         created_timestamp_column="created",
     )
     orders_ds = data_source_creator.create_data_source(
         datasets.orders_df,
         destination_name="orders",
-        event_timestamp_column="event_timestamp",
+        timestamp_field="event_timestamp",
         created_timestamp_column=None,
     )
     global_ds = data_source_creator.create_data_source(
         datasets.global_df,
         destination_name="global",
-        event_timestamp_column="event_timestamp",
+        timestamp_field="event_timestamp",
         created_timestamp_column="created",
     )
     field_mapping_ds = data_source_creator.create_data_source(
         datasets.field_mapping_df,
         destination_name="field_mapping",
-        event_timestamp_column="event_timestamp",
+        timestamp_field="event_timestamp",
         created_timestamp_column="created",
         field_mapping={"column_name": "feature_name"},
     )
diff --git a/sdk/python/tests/integration/feature_repos/universal/data_source_creator.py b/sdk/python/tests/integration/feature_repos/universal/data_source_creator.py
index dcefa29df1..2a13cff3be 100644
--- a/sdk/python/tests/integration/feature_repos/universal/data_source_creator.py
+++ b/sdk/python/tests/integration/feature_repos/universal/data_source_creator.py
@@ -1,5 +1,5 @@
 from abc import ABC, abstractmethod
-from typing import Dict
+from typing import Dict, Optional
 
 import pandas as pd
 
@@ -17,6 +17,7 @@ def create_data_source(
         event_timestamp_column="ts",
         created_timestamp_column="created_ts",
         field_mapping: Dict[str, str] = None,
+        timestamp_field: Optional[str] = None,
     ) -> DataSource:
         """
         Create a data source based on the dataframe. Implementing this method requires the underlying implementation to
@@ -27,9 +28,11 @@ def create_data_source(
             df: The dataframe to be used to create the data source.
             destination_name: This str is used by the implementing classes to
                 isolate the multiple dataframes from each other.
-            event_timestamp_column: Pass through for the underlying data source.
+            event_timestamp_column: (Deprecated) Pass through for the underlying data source.
             created_timestamp_column: Pass through for the underlying data source.
             field_mapping: Pass through for the underlying data source.
+            timestamp_field: Pass through for the underlying data source.
+
 
         Returns:
             A Data source object, pointing to a table or file that is uploaded/persisted for the purpose of the
diff --git a/sdk/python/tests/integration/feature_repos/universal/data_sources/bigquery.py b/sdk/python/tests/integration/feature_repos/universal/data_sources/bigquery.py
index 3b8fd9410a..cb7113bf66 100644
--- a/sdk/python/tests/integration/feature_repos/universal/data_sources/bigquery.py
+++ b/sdk/python/tests/integration/feature_repos/universal/data_sources/bigquery.py
@@ -53,7 +53,7 @@ def create_data_source(
         self,
         df: pd.DataFrame,
         destination_name: str,
-        event_timestamp_column="ts",
+        timestamp_field="ts",
         created_timestamp_column="created_ts",
         field_mapping: Dict[str, str] = None,
         **kwargs,
@@ -75,7 +75,7 @@ def create_data_source(
 
         return BigQuerySource(
             table_ref=destination_name,
-            event_timestamp_column=event_timestamp_column,
+            timestamp_field=timestamp_field,
             created_timestamp_column=created_timestamp_column,
             field_mapping=field_mapping or {"ts_1": "ts"},
         )
diff --git a/sdk/python/tests/integration/feature_repos/universal/data_sources/file.py b/sdk/python/tests/integration/feature_repos/universal/data_sources/file.py
index afc974b843..4ae067728c 100644
--- a/sdk/python/tests/integration/feature_repos/universal/data_sources/file.py
+++ b/sdk/python/tests/integration/feature_repos/universal/data_sources/file.py
@@ -29,7 +29,7 @@ def create_data_source(
         self,
         df: pd.DataFrame,
         destination_name: str,
-        event_timestamp_column="ts",
+        timestamp_field="ts",
         created_timestamp_column="created_ts",
         field_mapping: Dict[str, str] = None,
     ) -> DataSource:
@@ -46,7 +46,7 @@ def create_data_source(
         return FileSource(
             file_format=ParquetFormat(),
             path=f"{f.name}",
-            event_timestamp_column=event_timestamp_column,
+            timestamp_field=timestamp_field,
             created_timestamp_column=created_timestamp_column,
             field_mapping=field_mapping or {"ts_1": "ts"},
         )
@@ -113,7 +113,7 @@ def create_data_source(
         df: pd.DataFrame,
         destination_name: Optional[str] = None,
         suffix: Optional[str] = None,
-        event_timestamp_column="ts",
+        timestamp_field="ts",
         created_timestamp_column="created_ts",
         field_mapping: Dict[str, str] = None,
     ) -> DataSource:
@@ -127,7 +127,7 @@ def create_data_source(
         return FileSource(
             file_format=ParquetFormat(),
             path=f"s3://{self.bucket}/{filename}",
-            event_timestamp_column=event_timestamp_column,
+            timestamp_field=timestamp_field,
             created_timestamp_column=created_timestamp_column,
             field_mapping=field_mapping or {"ts_1": "ts"},
             s3_endpoint_override=f"http://{host}:{port}",
diff --git a/sdk/python/tests/integration/feature_repos/universal/data_sources/redshift.py b/sdk/python/tests/integration/feature_repos/universal/data_sources/redshift.py
index c1a31db94b..db007d83ad 100644
--- a/sdk/python/tests/integration/feature_repos/universal/data_sources/redshift.py
+++ b/sdk/python/tests/integration/feature_repos/universal/data_sources/redshift.py
@@ -38,7 +38,7 @@ def create_data_source(
         df: pd.DataFrame,
         destination_name: str,
         suffix: Optional[str] = None,
-        event_timestamp_column="ts",
+        timestamp_field="ts",
         created_timestamp_column="created_ts",
         field_mapping: Dict[str, str] = None,
     ) -> DataSource:
@@ -61,7 +61,7 @@ def create_data_source(
 
         return RedshiftSource(
             table=destination_name,
-            event_timestamp_column=event_timestamp_column,
+            timestamp_field=timestamp_field,
             created_timestamp_column=created_timestamp_column,
             field_mapping=field_mapping or {"ts_1": "ts"},
             database=self.offline_store_config.database,
diff --git a/sdk/python/tests/integration/feature_repos/universal/data_sources/snowflake.py b/sdk/python/tests/integration/feature_repos/universal/data_sources/snowflake.py
index f7a677303a..5be3b7383e 100644
--- a/sdk/python/tests/integration/feature_repos/universal/data_sources/snowflake.py
+++ b/sdk/python/tests/integration/feature_repos/universal/data_sources/snowflake.py
@@ -38,7 +38,7 @@ def create_data_source(
         df: pd.DataFrame,
         destination_name: str,
         suffix: Optional[str] = None,
-        event_timestamp_column="ts",
+        timestamp_field="ts",
         created_timestamp_column="created_ts",
         field_mapping: Dict[str, str] = None,
     ) -> DataSource:
@@ -53,7 +53,7 @@ def create_data_source(
 
         return SnowflakeSource(
             table=destination_name,
-            event_timestamp_column=event_timestamp_column,
+            timestamp_field=timestamp_field,
             created_timestamp_column=created_timestamp_column,
             field_mapping=field_mapping or {"ts_1": "ts"},
             warehouse=self.offline_store_config.warehouse,
diff --git a/sdk/python/tests/integration/feature_repos/universal/data_sources/spark_data_source_creator.py b/sdk/python/tests/integration/feature_repos/universal/data_sources/spark_data_source_creator.py
index ecefba7acb..2724db50a6 100644
--- a/sdk/python/tests/integration/feature_repos/universal/data_sources/spark_data_source_creator.py
+++ b/sdk/python/tests/integration/feature_repos/universal/data_sources/spark_data_source_creator.py
@@ -59,23 +59,21 @@ def create_data_source(
         self,
         df: pd.DataFrame,
         destination_name: str,
-        event_timestamp_column="ts",
+        timestamp_field="ts",
         created_timestamp_column="created_ts",
         field_mapping: Dict[str, str] = None,
         **kwargs,
     ) -> DataSource:
-        if event_timestamp_column in df:
-            df[event_timestamp_column] = pd.to_datetime(
-                df[event_timestamp_column], utc=True
-            )
+        if timestamp_field in df:
+            df[timestamp_field] = pd.to_datetime(df[timestamp_field], utc=True)
 
         # Make sure the field mapping is correct and convert the datetime datasources.
if field_mapping: timestamp_mapping = {value: key for key, value in field_mapping.items()} if ( - event_timestamp_column in timestamp_mapping - and timestamp_mapping[event_timestamp_column] in df + timestamp_field in timestamp_mapping + and timestamp_mapping[timestamp_field] in df ): - col = timestamp_mapping[event_timestamp_column] + col = timestamp_mapping[timestamp_field] df[col] = pd.to_datetime(df[col], utc=True) destination_name = self.get_prefixed_table_name(destination_name) if not self.spark_session: @@ -93,7 +91,7 @@ def create_data_source( return SparkSource( table=destination_name, - event_timestamp_column=event_timestamp_column, + timestamp_field=timestamp_field, created_timestamp_column=created_timestamp_column, # maps certain column names to other names field_mapping=field_mapping or {"ts_1": "ts"}, diff --git a/sdk/python/tests/integration/offline_store/test_universal_historical_retrieval.py b/sdk/python/tests/integration/offline_store/test_universal_historical_retrieval.py index 43081333d9..be354058d0 100644 --- a/sdk/python/tests/integration/offline_store/test_universal_historical_retrieval.py +++ b/sdk/python/tests/integration/offline_store/test_universal_historical_retrieval.py @@ -93,23 +93,23 @@ def get_expected_training_df( ): # Convert all pandas dataframes into records with UTC timestamps customer_records = convert_timestamp_records_to_utc( - customer_df.to_dict("records"), customer_fv.batch_source.event_timestamp_column + customer_df.to_dict("records"), customer_fv.batch_source.timestamp_field ) driver_records = convert_timestamp_records_to_utc( - driver_df.to_dict("records"), driver_fv.batch_source.event_timestamp_column + driver_df.to_dict("records"), driver_fv.batch_source.timestamp_field ) order_records = convert_timestamp_records_to_utc( orders_df.to_dict("records"), event_timestamp ) location_records = convert_timestamp_records_to_utc( - location_df.to_dict("records"), location_fv.batch_source.event_timestamp_column + location_df.to_dict("records"), location_fv.batch_source.timestamp_field ) global_records = convert_timestamp_records_to_utc( - global_df.to_dict("records"), global_fv.batch_source.event_timestamp_column + global_df.to_dict("records"), global_fv.batch_source.timestamp_field ) field_mapping_records = convert_timestamp_records_to_utc( field_mapping_df.to_dict("records"), - field_mapping_fv.batch_source.event_timestamp_column, + field_mapping_fv.batch_source.timestamp_field, ) entity_rows = convert_timestamp_records_to_utc( entity_df.to_dict("records"), event_timestamp @@ -120,7 +120,7 @@ def get_expected_training_df( for entity_row in entity_rows: customer_record = find_asof_record( customer_records, - ts_key=customer_fv.batch_source.event_timestamp_column, + ts_key=customer_fv.batch_source.timestamp_field, ts_start=entity_row[event_timestamp] - customer_fv.ttl, ts_end=entity_row[event_timestamp], filter_keys=["customer_id"], @@ -128,7 +128,7 @@ def get_expected_training_df( ) driver_record = find_asof_record( driver_records, - ts_key=driver_fv.batch_source.event_timestamp_column, + ts_key=driver_fv.batch_source.timestamp_field, ts_start=entity_row[event_timestamp] - driver_fv.ttl, ts_end=entity_row[event_timestamp], filter_keys=["driver_id"], @@ -136,7 +136,7 @@ def get_expected_training_df( ) order_record = find_asof_record( order_records, - ts_key=customer_fv.batch_source.event_timestamp_column, + ts_key=customer_fv.batch_source.timestamp_field, ts_start=entity_row[event_timestamp] - order_fv.ttl, ts_end=entity_row[event_timestamp], 
filter_keys=["customer_id", "driver_id"], @@ -144,7 +144,7 @@ def get_expected_training_df( ) origin_record = find_asof_record( location_records, - ts_key=location_fv.batch_source.event_timestamp_column, + ts_key=location_fv.batch_source.timestamp_field, ts_start=order_record[event_timestamp] - location_fv.ttl, ts_end=order_record[event_timestamp], filter_keys=["location_id"], @@ -152,7 +152,7 @@ def get_expected_training_df( ) destination_record = find_asof_record( location_records, - ts_key=location_fv.batch_source.event_timestamp_column, + ts_key=location_fv.batch_source.timestamp_field, ts_start=order_record[event_timestamp] - location_fv.ttl, ts_end=order_record[event_timestamp], filter_keys=["location_id"], @@ -160,14 +160,14 @@ def get_expected_training_df( ) global_record = find_asof_record( global_records, - ts_key=global_fv.batch_source.event_timestamp_column, + ts_key=global_fv.batch_source.timestamp_field, ts_start=order_record[event_timestamp] - global_fv.ttl, ts_end=order_record[event_timestamp], ) field_mapping_record = find_asof_record( field_mapping_records, - ts_key=field_mapping_fv.batch_source.event_timestamp_column, + ts_key=field_mapping_fv.batch_source.timestamp_field, ts_start=order_record[event_timestamp] - field_mapping_fv.ttl, ts_end=order_record[event_timestamp], ) @@ -680,7 +680,7 @@ def test_historical_features_from_bigquery_sources_containing_backfills(environm driver_stats_data_source = environment.data_source_creator.create_data_source( df=driver_stats_df, destination_name=f"test_driver_stats_{int(time.time_ns())}_{random.randint(1000, 9999)}", - event_timestamp_column="event_timestamp", + timestamp_field="event_timestamp", created_timestamp_column="created", ) diff --git a/sdk/python/tests/integration/registration/test_feature_store.py b/sdk/python/tests/integration/registration/test_feature_store.py index c7345d3f4d..a1eab11070 100644 --- a/sdk/python/tests/integration/registration/test_feature_store.py +++ b/sdk/python/tests/integration/registration/test_feature_store.py @@ -167,7 +167,7 @@ def test_apply_feature_view_success(test_feature_store): batch_source = FileSource( file_format=ParquetFormat(), path="file://feast/*", - event_timestamp_column="ts_col", + timestamp_field="ts_col", created_timestamp_column="timestamp", date_partition_column="date_partition_col", ) @@ -290,7 +290,7 @@ def test_apply_feature_view_integration(test_feature_store): batch_source = FileSource( file_format=ParquetFormat(), path="file://feast/*", - event_timestamp_column="ts_col", + timestamp_field="ts_col", created_timestamp_column="timestamp", date_partition_column="date_partition_col", ) @@ -359,7 +359,7 @@ def test_apply_object_and_read(test_feature_store): batch_source = FileSource( file_format=ParquetFormat(), path="file://feast/*", - event_timestamp_column="ts_col", + timestamp_field="ts_col", created_timestamp_column="timestamp", ) diff --git a/sdk/python/tests/integration/registration/test_inference.py b/sdk/python/tests/integration/registration/test_inference.py index 54af9f6ff2..19511220b7 100644 --- a/sdk/python/tests/integration/registration/test_inference.py +++ b/sdk/python/tests/integration/registration/test_inference.py @@ -127,14 +127,14 @@ def test_update_file_data_source_with_inferred_event_timestamp_col(simple_datase data_sources, RepoConfig(provider="local", project="test") ) actual_event_timestamp_cols = [ - source.event_timestamp_column for source in data_sources + source.timestamp_field for source in data_sources ] assert actual_event_timestamp_cols 
== ["ts_1", "ts_1", "ts_1"] with prep_file_source(df=df_with_two_viable_timestamp_cols) as file_source: with pytest.raises(RegistryInferenceFailure): - # two viable event_timestamp_columns + # two viable timestamp_fields update_data_sources_with_inferred_event_timestamp_col( [file_source], RepoConfig(provider="local", project="test") ) @@ -146,15 +146,16 @@ def test_update_data_sources_with_inferred_event_timestamp_col(universal_data_so (_, _, data_sources) = universal_data_sources data_sources_copy = deepcopy(data_sources) - # remove defined event_timestamp_column to allow for inference + # remove defined timestamp_field to allow for inference for data_source in data_sources_copy.values(): + data_source.timestamp_field = None data_source.event_timestamp_column = None update_data_sources_with_inferred_event_timestamp_col( data_sources_copy.values(), RepoConfig(provider="local", project="test"), ) actual_event_timestamp_cols = [ - source.event_timestamp_column for source in data_sources_copy.values() + source.timestamp_field for source in data_sources_copy.values() ] assert actual_event_timestamp_cols == ["event_timestamp"] * len( diff --git a/sdk/python/tests/integration/registration/test_registry.py b/sdk/python/tests/integration/registration/test_registry.py index 88369ba76b..0d10f3aedc 100644 --- a/sdk/python/tests/integration/registration/test_registry.py +++ b/sdk/python/tests/integration/registration/test_registry.py @@ -167,7 +167,7 @@ def test_apply_feature_view_success(test_registry): batch_source = FileSource( file_format=ParquetFormat(), path="file://feast/*", - event_timestamp_column="ts_col", + timestamp_field="ts_col", created_timestamp_column="timestamp", ) @@ -240,7 +240,7 @@ def test_modify_feature_views_success(test_registry): batch_source = FileSource( file_format=ParquetFormat(), path="file://feast/*", - event_timestamp_column="ts_col", + timestamp_field="ts_col", created_timestamp_column="timestamp", ) @@ -360,7 +360,7 @@ def test_apply_feature_view_integration(test_registry): batch_source = FileSource( file_format=ParquetFormat(), path="file://feast/*", - event_timestamp_column="ts_col", + timestamp_field="ts_col", created_timestamp_column="timestamp", ) @@ -435,7 +435,7 @@ def test_apply_data_source(test_registry: Registry): name="test_source", file_format=ParquetFormat(), path="file://feast/*", - event_timestamp_column="ts_col", + timestamp_field="ts_col", created_timestamp_column="timestamp", ) @@ -469,7 +469,7 @@ def test_apply_data_source(test_registry: Registry): assert registry_data_source == batch_source # Check that change to batch source propagates - batch_source.event_timestamp_column = "new_ts_col" + batch_source.timestamp_field = "new_ts_col" test_registry.apply_data_source(batch_source, project, commit=False) test_registry.apply_feature_view(fv1, project, commit=True) registry_feature_views = test_registry.list_feature_views(project) diff --git a/sdk/python/tests/integration/registration/test_universal_odfv_feature_inference.py b/sdk/python/tests/integration/registration/test_universal_odfv_feature_inference.py index e8cc1552e2..1ad3917395 100644 --- a/sdk/python/tests/integration/registration/test_universal_odfv_feature_inference.py +++ b/sdk/python/tests/integration/registration/test_universal_odfv_feature_inference.py @@ -55,7 +55,7 @@ def test_infer_odfv_list_features(environment, infer_features, tmp_path): items_df.to_parquet(output_path) fake_items_src = FileSource( path=output_path, - event_timestamp_column="event_timestamp", + 
timestamp_field="event_timestamp",
         created_timestamp_column="created",
     )
     items = create_item_embeddings_feature_view(fake_items_src)
diff --git a/sdk/python/tests/integration/scaffolding/test_partial_apply.py b/sdk/python/tests/integration/scaffolding/test_partial_apply.py
index ce06e26e4f..c66529080f 100644
--- a/sdk/python/tests/integration/scaffolding/test_partial_apply.py
+++ b/sdk/python/tests/integration/scaffolding/test_partial_apply.py
@@ -21,7 +21,7 @@ def test_partial() -> None:
 
     driver_locations_source = BigQuerySource(
         table_ref="feast-oss.public.drivers",
-        event_timestamp_column="event_timestamp",
+        timestamp_field="event_timestamp",
         created_timestamp_column="created_timestamp",
     )
 
diff --git a/sdk/python/tests/utils/data_source_utils.py b/sdk/python/tests/utils/data_source_utils.py
index 5a5baceef0..d98c047cd6 100644
--- a/sdk/python/tests/utils/data_source_utils.py
+++ b/sdk/python/tests/utils/data_source_utils.py
@@ -18,7 +18,7 @@ def prep_file_source(df, event_timestamp_column=None) -> Iterator[FileSource]:
         file_source = FileSource(
             file_format=ParquetFormat(),
             path=f.name,
-            event_timestamp_column=event_timestamp_column,
+            timestamp_field=event_timestamp_column,
         )
         yield file_source
 
@@ -42,9 +42,9 @@ def simple_bq_source_using_table_ref_arg(
     job = client.load_table_from_dataframe(df, table_ref)
     job.result()
 
     return BigQuerySource(
-        table_ref=table_ref, event_timestamp_column=event_timestamp_column,
+        table_ref=table_ref, timestamp_field=event_timestamp_column,
     )
 
 
 def simple_bq_source_using_query_arg(df, event_timestamp_column=None) -> BigQuerySource:
@@ -54,5 +54,5 @@ def simple_bq_source_using_query_arg(df, event_timestamp_column=None) -> BigQuer
     return BigQuerySource(
         name=bq_source_using_table_ref.table_ref,
         query=f"SELECT * FROM {bq_source_using_table_ref.table_ref}",
-        event_timestamp_column=event_timestamp_column,
+        timestamp_field=event_timestamp_column,
     )
diff --git a/sdk/python/tests/utils/online_write_benchmark.py b/sdk/python/tests/utils/online_write_benchmark.py
index 1b7b15ac96..77d3fe5838 100644
--- a/sdk/python/tests/utils/online_write_benchmark.py
+++ b/sdk/python/tests/utils/online_write_benchmark.py
@@ -37,7 +37,7 @@ def create_driver_hourly_stats_feature_view(source):
 def create_driver_hourly_stats_source(parquet_path):
     return FileSource(
         path=parquet_path,
-        event_timestamp_column="event_timestamp",
+        timestamp_field="event_timestamp",
         created_timestamp_column="created",
     )
diff --git a/ui/feature_repo/features.py b/ui/feature_repo/features.py
index 8d9f5c66f1..e27d0f50d3 100644
--- a/ui/feature_repo/features.py
+++ b/ui/feature_repo/features.py
@@ -23,7 +23,7 @@
 zipcode_source = FileSource(
     name="zipcode",
     path="data/zipcode_table.parquet",
-    event_timestamp_column="event_timestamp",
+    timestamp_field="event_timestamp",
     created_timestamp_column="created_timestamp",
 )
 
@@ -96,7 +96,7 @@
 credit_history_source = FileSource(
     name="credit_history",
     path="data/credit_history.parquet",
-    event_timestamp_column="event_timestamp",
+    timestamp_field="event_timestamp",
     created_timestamp_column="created_timestamp",
 )
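Reviewer note: every hunk in this patch is the same mechanical rename — the `event_timestamp_column` keyword becomes `timestamp_field` — while defaults, values, and all other arguments stay as they were. Below is a minimal sketch of what the rename means for user code; the parquet path is hypothetical, and the keywords are the ones exercised by the test fixtures above.

```python
from feast import FileSource
from feast.data_format import ParquetFormat

# Old spelling, deprecated by this patch:
#   FileSource(..., event_timestamp_column="event_timestamp", ...)

# New spelling: only the keyword changes; the column it names does not.
driver_stats_source = FileSource(
    file_format=ParquetFormat(),
    path="data/driver_stats.parquet",   # hypothetical path
    timestamp_field="event_timestamp",  # was: event_timestamp_column="event_timestamp"
    created_timestamp_column="created",
)
```

Whether the old keyword keeps working during a deprecation window is not decided by these hunks alone; the `(Deprecated)` docstring note in `data_source_creator.py` suggests it does, but the compatibility shim lives outside this diff.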