05 Creating Features

Now that we've looked at the data sources and the features that already exist, let's try creating some of our own. We know we are working on a model to predict the propensity of a user to click on a given ad, if served.

Creating a FeaturePackage from a Batch Data Source

Instructions for Creating a Batch FeaturePackage

  1. We decide we want to create a feature based on the number of ads a user has been shown on a given partner site over the past 7 days.
  2. To create this feature, we convert the ad_impressions_batch dataset into a temporary table - this allows us to query the data source directly using SQL.

    import tecton

    ds = tecton.get_virtual_data_source('ad_impressions_batch')
    ds.dataframe().createOrReplaceTempView("tmp_ad_impressions")
    
  3. We then write the query for generating this feature - allowing some configurable parameters for flexibility.

    prediction_date = "2020-07-09"
    days_back = 7
    
    seven_days_cnt = spark.sql(f"""
          SELECT
            user_uuid,
            partner_id,
            count(*) as user_partner_total_impressions_{days_back}_days
          FROM
            tmp_ad_impressions
          WHERE timestamp < to_timestamp('{prediction_date}')
          AND timestamp > date_sub(to_timestamp('{prediction_date}'), {days_back})
          GROUP BY
            user_uuid, partner_id""")
    
    seven_days_cnt.show()
    
  4. After letting the query run, we are happy with the results, so we decide to add this to the Feature Store as a user_partner_total_impressions_7_days feature. This is done by adding a new Python file to the git repo you checked out previously, starting with a feature definition file. A blank one has been included below to fill out; a filled-in sketch follows these steps. For more detail on any of the parameters, please refer to our documentation.

    a. Filename: user_partner_total_impressions_7_days.py

    from tecton import sql_transformation, TemporalFeaturePackage, DataSourceConfig, MaterializationConfig
    from feature_repo.shared import entities as e, data_sources
    from datetime import datetime
    
    @sql_transformation(inputs=data_sources.ad_impressions_batch, has_context=True)
    def TRANSFORMATION_NAME(context, ad_impressions_batch):
        return f"""
        SELECT
            user_uuid,
            partner_id,
            count(*) as user_partner_total_impressions_7_days,
            to_timestamp('{context.feature_data_end_time}') as timestamp
        FROM
            {ad_impressions_batch}
        GROUP BY
            user_uuid, partner_id
                    """
    
    FEATURE_PACKAGE_NAME = TemporalFeaturePackage(
        name="CHANGE_ME_feature_name", # use underscores when naming
        description="[feature type] UPDATE DESCRIPTION OF FEATURE HERE",
        transformation=TRANSFORMATION_NAME,
        entities=[], # reference entity in feature store config
        materialization=MaterializationConfig(
            online_enabled=False,
            offline_enabled=False,
            feature_start_time=,
            schedule_interval='1day',
            serving_ttl='1day',
            data_lookback_period='7days'
        ),
    )
    

    b. Place the file in this location: {project_location}/ad-serving-tutorial/feature_store/feature_repo/shared/features/

  5. Run tecton plan to see if there are any errors or conflicts with the Feature Store. You should see a new Transformation and FeaturePackage. When satisfied with the update, run tecton apply.
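
For reference, here is one way the batch template above might be filled in for this feature. This is a sketch, not verbatim tutorial code: the transformation and package names, the entity references (e.user, e.partner), the feature_start_time, and the enabled flags are assumptions for illustration - match them to the entities and dates defined in your own feature repo.

    from tecton import sql_transformation, TemporalFeaturePackage, MaterializationConfig
    from feature_repo.shared import entities as e, data_sources
    from datetime import datetime

    @sql_transformation(inputs=data_sources.ad_impressions_batch, has_context=True)
    def user_partner_total_impressions_7_days_transformer(context, ad_impressions_batch):
        # Count impressions per (user, partner) pair. The 7-day input window is
        # enforced by data_lookback_period in the MaterializationConfig below.
        return f"""
        SELECT
            user_uuid,
            partner_id,
            count(*) as user_partner_total_impressions_7_days,
            to_timestamp('{context.feature_data_end_time}') as timestamp
        FROM
            {ad_impressions_batch}
        GROUP BY
            user_uuid, partner_id
        """

    user_partner_total_impressions_7_days = TemporalFeaturePackage(
        name="user_partner_total_impressions_7_days",
        description="[batch feature] Count of ads shown to a user on a partner site over the past 7 days",
        transformation=user_partner_total_impressions_7_days_transformer,
        entities=[e.user, e.partner],  # hypothetical entity names - reference the ones in your feature store config
        materialization=MaterializationConfig(
            online_enabled=False,  # flip to True once you are happy with the feature
            offline_enabled=False,
            feature_start_time=datetime(2020, 7, 1),  # assumed start date for this tutorial's data
            schedule_interval='1day',
            serving_ttl='1day',
            data_lookback_period='7days',
        ),
    )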

Optional Exercise: Creating a Streaming FeaturePackage

One of the great things about Tecton is the ability to create and use features directly from stream-processing sources. These pipelines can be very challenging to set up from an infrastructure standpoint - but Tecton makes it much simpler.

Instructions for Creating a Streaming FeaturePackage

  1. For this streaming FeaturePackage, we will use a class called TemporalAggregateFeaturePackage. It is very similar to the TemporalFeaturePackage class we used for the batch feature, with some additions around common aggregations (count, min, max, sum). Streaming data arrives in near real time, and we can take advantage of it by computing windowed aggregations directly from the stream.
  2. To add this feature to the Feature Store, fill out a feature definition file. A blank one has been included below to fill out during the video; a filled-in sketch follows these steps. For more detail on any of the parameters, please refer to our documentation.

    from datetime import datetime
    from tecton import TemporalAggregateFeaturePackage, FeatureAggregation, DataSourceConfig, sql_transformation, MaterializationConfig
    from feature_repo.shared import data_sources, entities
    
    @sql_transformation(inputs=data_sources.ad_impressions_stream)
    def TRANSFORMER_NAME(input_df):
        return f"""
            select
                user_uuid,
                ad_id,
                1 as impression,
                timestamp
            from
                {input_df}
            """
    
    FEATURE_PACKAGE_NAME = TemporalAggregateFeaturePackage(
        name="",
        description="[Stream Feature] ",
        entities=[, ], # two entities are used in this transformation
        transformation=TRANSFORMER_NAME,
        aggregation_slide_period="",
        aggregations=[FeatureAggregation(column="", function="", time_windows=[])],
        materialization=MaterializationConfig(
            online_enabled=,
            offline_enabled=,
            feature_start_time=,
        ),
    )
    
  3. Paste the feature definition into a new Python file in your local Feature Store configuration. It should be placed alongside the other features that have been defined.

  4. Run tecton plan to see if there are any errors or conflicts with the Feature Store. You should see a new Transformation and FeaturePackage. When satisfied with the update, run tecton apply.
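
For reference, here is one way the streaming template might be filled in, counting the impressions a user has seen for an ad over several sliding windows. This is a sketch, not verbatim tutorial code: the names, entity references (e.user, e.ad), slide period, time windows, and feature_start_time are assumptions for illustration - adjust them to your feature repo and use case.

    from datetime import datetime
    from tecton import TemporalAggregateFeaturePackage, FeatureAggregation, sql_transformation, MaterializationConfig
    from feature_repo.shared import data_sources, entities as e

    @sql_transformation(inputs=data_sources.ad_impressions_stream)
    def user_ad_impression_transformer(input_df):
        # Emit one row per impression; the aggregations below do the counting.
        return f"""
            select
                user_uuid,
                ad_id,
                1 as impression,
                timestamp
            from
                {input_df}
            """

    user_ad_impression_counts = TemporalAggregateFeaturePackage(
        name="user_ad_impression_counts",
        description="[Stream Feature] Impressions a user has seen for an ad over sliding windows",
        entities=[e.user, e.ad],  # hypothetical entity names - two entities are used in this transformation
        transformation=user_ad_impression_transformer,
        aggregation_slide_period="1h",
        aggregations=[FeatureAggregation(column="impression", function="count",
                                         time_windows=["1h", "12h", "24h"])],
        materialization=MaterializationConfig(
            online_enabled=True,
            offline_enabled=True,
            feature_start_time=datetime(2020, 7, 1),  # assumed start date for this tutorial's data
        ),
    )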