tecton.declarative.Aggregation

class tecton.declarative.Aggregation(column, function, time_window, name=None)

This class describes a single aggregation that is applied in a batch or stream feature view.

Parameters
  • column (str) – Column name of the feature we are aggregating.

  • function (Union[str, AggregationFunction]) – One of the built-in aggregation functions.

  • time_window (datetime.timedelta) – Duration to aggregate over. Example: datetime.timedelta(days=30).

  • name (str) – The name of this feature. Defaults to an autogenerated name, e.g. transaction_count_7d_1d.

function can be one of the predefined numeric aggregation functions: "count", "sum", "mean", "min", "max", "var_samp", "var_pop", "variance" (alias for "var_samp"), "stddev_samp", "stddev_pop", or "stddev" (alias for "stddev_samp"). For these numeric aggregations, pass the function name as a string. Nulls are handled as in the corresponding Spark SQL function: SUM, MEAN, MIN, MAX, VAR_SAMP, VAR_POP, VAR, STDDEV_SAMP, STDDEV_POP, and STDDEV of all nulls is null, while COUNT of all nulls is 0.
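The null-handling rule above can be illustrated in plain Python. This is a sketch of the semantics only, not Tecton or Spark code; the function names here are invented for illustration:

```python
# Plain-Python illustration of the Spark-SQL-style null handling described
# above: SUM of an all-null column is null (None here), while COUNT of an
# all-null column is 0. Non-null values are aggregated normally.
def spark_style_sum(values):
    non_null = [v for v in values if v is not None]
    return sum(non_null) if non_null else None  # all nulls -> null

def spark_style_count(values):
    # COUNT ignores nulls, so an all-null column counts as 0.
    return len([v for v in values if v is not None])

all_nulls = [None, None, None]
mixed = [None, 3, 7]

print(spark_style_sum(all_nulls))    # None
print(spark_style_count(all_nulls))  # 0
print(spark_style_sum(mixed))        # 10
print(spark_style_count(mixed))      # 2
```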

In addition to the numeric aggregations, Aggregation supports a last distinct N aggregation that computes the last N distinct values of the column by timestamp. Currently only string columns are supported as input, so the resulting feature value is a list of strings. The values in the list are ordered ascending by timestamp, and nulls are excluded from the aggregated list.

You can use it via the last_distinct() helper function like this:

import datetime

from tecton import batch_feature_view
from tecton.aggregation_functions import last_distinct
from tecton.declarative import Aggregation

@batch_feature_view(
    ...
    aggregations=[Aggregation(
        column='my_column',
        function=last_distinct(15),
        time_window=datetime.timedelta(days=7))],
    ...
)
def my_fv(data_source):
    pass
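The last distinct N semantics can be sketched in plain Python. This is an illustration only, not Tecton's implementation; the helper name is invented, and re-ranking a repeated value by its latest timestamp is an assumption:

```python
# Sketch of "last distinct N": keep the last n distinct non-null string
# values, ordered ascending by timestamp, as described above.
def last_distinct_n(rows, n):
    """rows is a list of (timestamp, value) tuples."""
    latest_ts = {}
    for ts, value in sorted(rows, key=lambda r: r[0]):
        if value is None:
            continue  # nulls are excluded from the aggregated list
        latest_ts[value] = ts  # assumption: a repeat moves the value later
    # Order distinct values ascending by timestamp and keep the last n.
    ordered = sorted(latest_ts, key=lambda v: latest_ts[v])
    return ordered[-n:]

rows = [(1, 'a'), (2, None), (3, 'b'), (4, 'a'), (5, 'c'), (6, 'd')]
print(last_distinct_n(rows, 3))  # ['a', 'c', 'd']
```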

Methods

__init__(column, function, time_window, name=None)

Method generated by attrs for class Aggregation.