
[FEA] Sequential / Session-based recommendation and time series support - Group by sorting values by timestamp #641

Closed
gabrielspmoreira opened this issue Mar 8, 2021 · 6 comments


@gabrielspmoreira
Member

gabrielspmoreira commented Mar 8, 2021

Is your feature request related to a problem? Please describe.
In order to support session-based / sequential recommendation and time series use cases, NVTabular needs to be able to group data by some columns (e.g. user id, session id), sorted by another column (usually a timestamp), and aggregate the remaining columns with aggregation functions that take order into account: 'list', 'first', 'last'.

Describe the solution you'd like

groupby_features = ['user_id', 'session_id', 'timestamp', 'prod_id', 'prod_categ'] >> ops.Groupby(
    groupby_cols=['user_id', 'session_id'], sort_column=['timestamp'],
    aggregated_cols={'prod_id': 'list',
                     'prod_categ': 'list',
                     'timestamp': ['first', 'last']}
)
processor = nvtabular.Workflow(groupby_features)

Describe alternatives you've considered
It might be OK if some NVT ops do not support the resulting list columns, like Categorify and Standardize, because those ops can be applied before the grouping.
But the LambdaOp should support list columns to allow, for example, extracting the length of the lists or truncating the lists.
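For illustration, a minimal pandas sketch of the kind of list-column transforms a LambdaOp would need to support (column names here are made up):

```python
import pandas as pd

# Hypothetical post-groupby frame with a list column (illustrative names).
sessions = pd.DataFrame({
    "session_id": [1, 2],
    "prod_id_list": [[10, 20, 30], [40, 50]],
})

# Length of each session's sequence.
sessions["seq_len"] = sessions["prod_id_list"].map(len)

# Truncate each list to a maximum length, keeping the most recent items.
MAX_LEN = 2
sessions["prod_id_trunc"] = sessions["prod_id_list"].map(lambda xs: xs[-MAX_LEN:])
```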

Additional context
This feature can be accomplished by different data frame frameworks:

Pandas
In this example, the preliminary sorting of the rows is respected by the group by, so that the aggregated columns ('item_id') will be sorted by timestamp.

interactions_df.sort_values('timestamp', inplace=True)
sessions_df = interactions_df.groupby('session_id').agg({'item_id': 'list'})

P.S.: cudf / dask_cudf also support the 'list' aggregation function, but they do not guarantee that the data frame ordering will be respected, as pandas does.
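A fuller pandas sketch of the sort-then-group pattern, including the order-sensitive 'first' and 'last' aggregations (toy data, illustrative names):

```python
import pandas as pd

# Toy interactions, deliberately out of time order.
interactions_df = pd.DataFrame({
    "session_id": [1, 1, 2, 1, 2],
    "item_id":    [30, 10, 70, 20, 60],
    "timestamp":  [3, 1, 2, 2, 1],
})

interactions_df = interactions_df.sort_values("timestamp")

# pandas groupby preserves the pre-sorted row order within each group
# (sort=True only affects the order of the group keys), so the
# aggregated lists come out time-ordered.
sessions_df = interactions_df.groupby("session_id").agg(
    item_seq=("item_id", list),
    start_ts=("timestamp", "first"),
    end_ts=("timestamp", "last"),
)
```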

PySpark

import pyspark.sql.functions as F
from pyspark.sql.window import Window

# This window ensures that the aggregated columns are sorted by another column: timestamp
session_window = Window.partitionBy('user_session').orderBy('timestamp')

df.select('user_session',
          F.collect_list('product_idx').over(session_window).alias('product_seq'),
          F.first('event_time_ts').over(session_window).alias('session_start_ts'),
          F.last('event_time_ts').over(session_window).alias('session_end_ts'),
          ) \
  .groupBy('user_session').agg(
      # F.collect_list() over the window produces an incrementally growing list
      # per row (sorted by time), so F.max() keeps the longest list, i.e. the
      # full sequence of the session.
      F.max('product_seq').alias('product_seq'),
      F.min('session_start_ts').alias('session_start_ts'),
      F.max('session_end_ts').alias('session_end_ts'),
  )

This issue was extracted from #355, which is broader in scope, so that this part can be implemented independently.
Other related issues: #92 #325

@kkranen
Contributor

kkranen commented Mar 8, 2021

This feature is essential in our time series workflows as well. The core pseudocode looks like this:

df = load_some_df()
df['primary_id'] = flatten_ids(['ids', 'to', 'combine'])
df = df.sort_values('time_id')
grouped = df.groupby('primary_id')

The changes proposed here would cover this case.
We also allow for per-group scaling down to the standard distribution (zero mean, unit variance). This would likely also be relevant to session-based recommendation (scaling values by user or another key).
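The per-group scaling mentioned above can be sketched in pandas with groupby().transform — a standard z-score within each group (names are illustrative):

```python
import pandas as pd

# Illustrative frame: scale 'value' to zero mean / unit std within each group.
df = pd.DataFrame({
    "primary_id": ["a", "a", "a", "b", "b"],
    "value":      [1.0, 2.0, 3.0, 10.0, 20.0],
})

# transform() broadcasts each group's statistic back to the original rows,
# so the result aligns with df and can be assigned as a new column.
grp = df.groupby("primary_id")["value"]
df["value_scaled"] = (df["value"] - grp.transform("mean")) / grp.transform("std")
```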

@kkranen
Contributor

kkranen commented Mar 8, 2021

On another note, on the dataloading side, it will likely also be pertinent to allow sliding-window tensor building at access time on the collected lists. As an example (ignoring NVT conventions to illustrate the point):

values = [10, 20, 30]
session_loader = nvt.session_loader(values, window=2)
session_loader[0]  # Result is [10, 20]
session_loader[1]  # Result is [20, 30]
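The sliding-window behaviour sketched above could look like this in plain Python (SessionLoader is a hypothetical stand-in for the proposed loader, not an NVTabular API):

```python
class SessionLoader:
    """Index i returns the window of `window` consecutive items starting at i."""

    def __init__(self, values, window):
        self.values = values
        self.window = window

    def __len__(self):
        # Number of full windows that fit in the sequence.
        return max(len(self.values) - self.window + 1, 0)

    def __getitem__(self, i):
        if not 0 <= i < len(self):
            raise IndexError(i)
        return self.values[i:i + self.window]


loader = SessionLoader([10, 20, 30], window=2)
# loader[0] -> [10, 20]; loader[1] -> [20, 30]
```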

@rjzamora
Collaborator

Thanks for writing up this issue @gabrielspmoreira - As you know, the transform ops available in NVTabular are all "partition-wise" operations (they do not require any shuffling of data between distinct dask.DataFrame partitions). There are stat operators that do move data around, but the statistics/groupby results are always expected to be relatively small, and the results are always persisted to disk at the end of the fit stage. The GroupbyJoin operator, for example, is still a partition-wise transform, because the (relatively small) groupby statistics are joined to the original data during the transform stage.

With the above in mind, I want to clarify: Is this issue requesting a transform operator that does not follow the "partition-wise" convention? That is, is the goal to introduce a new type of transform that cannot be performed with a linear pass over the underlying ddf partitions, or would this be a new "statistics" operation (only to be performed at "fit" time)? In order to avoid a departure from the current/simple transform convention, it may actually make sense to separate this feature from the usual Workflow/fit/transform pipeline altogether, and just introduce a DataFrame.groupby method.
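To illustrate why an order-aware groupby cannot stay partition-wise when a key's rows span multiple partitions, here is a toy simulation with partitions as plain Python lists (not actual dask/NVTabular code):

```python
from collections import defaultdict

# Simulate two dataframe partitions as lists of (session_id, item_id) rows.
part0 = [(1, 10), (2, 60)]
part1 = [(1, 20), (1, 30)]  # session 1's rows also appear here


def groupby_lists(rows):
    """Collect item_ids into a per-key list, preserving row order."""
    out = defaultdict(list)
    for key, item in rows:
        out[key].append(item)
    return dict(out)


# A purely partition-wise pass produces incomplete per-key lists ...
per_partition = [groupby_lists(part0), groupby_lists(part1)]

# ... so a cross-partition shuffle/merge step is unavoidable for a global groupby.
merged = groupby_lists(part0 + part1)
```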

cc @benfred (since I'm certainly interested in your thoughts here as well)

@gabrielspmoreira
Member Author

Thanks for your analysis @rjzamora. I understand that a GroupBy op requires a full shuffle, because other rows from the same group (e.g. user id or session id) might not be in the same partition, right?

In this sense, I understand that it would be easier to implement a DataFrame.groupby method outside the workflow. It should then be possible to do some custom sequence-wise transforms on the list columns after grouping, like truncating the lists to a maximum length, counting the length of each sequence, or getting the first value of a sequence. Such ops could be performed using a LambdaOp within our workflow, or users could implement them with dask_cudf apply outside the NVTabular workflow.

Another option would be having the GroupBy op as the first step in our pipeline, and then performing the subsequent ops (e.g. Categorify, LogOp, ...) on the list columns.

During inference, the task of sequential and session-based recommendation is to provide a ranked list of items for a user or session, represented by a sequence of their past interactions. So Triton should receive a batch with the last user interactions and return the recommendation list. The GroupBy op (ordered by the timestamp column) should be performed at inference the same way it was during preprocessing. So I understand that the GroupBy op should be within our workflow.

@rjzamora
Collaborator

rjzamora commented Mar 24, 2021

Thanks for your analysis @rjzamora. I understand that a GroupBy op requires a full shuffle, because other rows from the same group (e.g. user id or session id) might not be in the same partition, right?

Exactly right - I was using the term "partition-wise" to describe a transform that does not require a row/record to move between partitions, but I'm not sure of the best language here.

Another option would be having the GroupBy op as the first step in our pipeline, and then performing the subsequent ops (e.g. Categorify, LogOp, ...) on the list columns.

The only reason I was suggesting a Dataset.groupby method (in lieu of a Groupby operator) is to make it impossible for users to use something like ops.Groupby in the "wrong" way. If the grouping logic belongs to the Dataset object, and is designed to produce a new Dataset object, we are effectively requiring this operation to be the first step in a typical NVT pipeline. In practice, users will get much better downstream transform/dataloading performance if the result of the initial Dataset.groupby operation is persisted to disk (because it would relieve downstream memory pressure), but an intermediate to_parquet call would not be a requirement.



Note that my current thinking is a bit different (see below), but I still think that ops.Groupby cannot move data between partitions.

During inference, the task of sequential and session-based recommendation is to provide a ranked list of items for a user or session, represented by a sequence of their past interactions. So Triton should receive a batch with the last user interactions and return the recommendation list. The GroupBy op (ordered by the timestamp column) should be performed at inference the same way it was during preprocessing. So I understand that the GroupBy op should be within our workflow.

Ah - This is a great point. Then I agree that we ultimately do need something like ops.Groupby. However, this operation will probably need to be a partition-wise transformation, and we will still need a new Dataset.<something> method to handle our data-shuffling needs for the training data. Perhaps it would be sufficient to enable the user to shuffle the initial Dataset object by the groupby keys? For example, if training/inference needs to group a batch of data by ['user_id', 'session_id'], this can be a partition-wise operation if the input data is guaranteed to have unique 'user_id'-'session_id' pairs within the same partition. So, perhaps we add both a Dataset.group_shuffle method and ops.Groupby? The only problem with this approach is that the partitioning may change if the user persists the Dataset.group_shuffle result to disk and then tries to read it back for processing. We would need a new explicit option to partition by file in both the read and the write (to_parquet).
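The proposed Dataset.group_shuffle idea could be sketched as a hash partitioner over the groupby keys (group_shuffle here is a hypothetical illustration over plain tuples, not the actual API):

```python
def group_shuffle(rows, key_fn, npartitions):
    """Hash-partition rows by the groupby keys so that every key lands
    wholly in one partition; grouping then becomes partition-wise."""
    parts = [[] for _ in range(npartitions)]
    for row in rows:
        parts[hash(key_fn(row)) % npartitions].append(row)
    return parts


# (user_id, session_id, item_id) rows, with one key split across the input order.
rows = [("u1", "s1", 10), ("u2", "s1", 20), ("u1", "s1", 30)]
parts = group_shuffle(rows, key_fn=lambda r: (r[0], r[1]), npartitions=4)
# Every ('user_id', 'session_id') pair now appears in exactly one partition.
```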

@gabrielspmoreira
Member Author

I have created a separate issue #734 for the support of truncating list columns to a maximum sequence length.
