Skip to main content
Version: 0.8

Customize Spark Materialization Jobs

Overview

When defining a Batch Feature View or a Stream Feature View, you can set the batch_compute and stream_compute parameters, which can be set to cluster config objects that are listed in the following table.

Data PlatformSimple Cluster Config ObjectCustom Cluster Config Object
DatabricksDatabricksClusterConfigDatabricksJsonClusterConfig
EMREMRClusterConfigEMRJsonClusterConfig
SnowflakeNot supportedNot supported

Compared to the simple cluster config objects, custom cluster config objects are more flexible. With the custom config objects, you can specify a wide range of configuration settings for your Spark cluster. For example, you can specify the number of worker nodes, the amount of memory and CPU to allocate to each worker, the Spark version to use, and the type of storage to use.

An example Batch Feature View that uses a DatabricksJsonClusterConfig object is shown below.

@batch_feature_view(
...,
batch_compute=DatabricksJsonClusterConfig(
json=my_json
), # my_json is your JSON string containing your spark cluster configurations
...,
)
def feature_view_1():
return ...

The JSON you use to specify your cluster config must conform to the Databricks schema or EMR schema that is required to submit jobs to the cluster.

After a custom cluster config object is instantiated (via the instantiation of a Feature View), Tecton internally updates the contents of the JSON string; some settings are appended and others are overwritten. Using the Web UI, you can see both the original JSON string and the updated JSON string under the materialization tab of your feature view.

The following sections describe how to use DatabricksJsonClusterConfig or EMRJsonClusterConfig to customize Spark materialization jobs.

Configuring Databricks job clusters

Tecton uses the Runs Submit endpoint of the Databricks Jobs API 2.0 to programmatically create and manage materialization jobs that run on Databricks. When a DatabricksJsonClusterConfig object is instantiated, a JSON string that conforms to the Runs Submit request schema must provided.

Use new_cluster for your cluster configurations; existing cluster reuse is not supported.

Materialization jobs are executed as a notebook_task. Other task types, such as spark_jar_task, spark_python_task ,spark_submit_task , and pipeline_task are not supported.

Settings overridden by Tecton

Tecton overrides the following contents of the JSON value that is passed to a DatabricksJsonClusterConfig object: run_name, cluster_log_conf and notebook_task.

"run_name": "orch-<your_feature_view_name>-<your_feature_view_id>-<YOUR_TASK_TYPE>_<your_task_timestamp>-<your_task_attempt_number>",
"new_cluster": {
...,
"cluster_log_conf": {
"s3": {
"destination": "s3://<your_bucket>/logs/<your_run_name>",
"region": "<your_deployment_region>"
}
}
},
"notebook_task": {
"notebook_path": "/Users/<your_user_name>/materialization_notebook.py",
"base_parameters": {
"materialization_params": "s3://<your_bucket>/intermediate-data/materialization_params/<your_run_name>"
}
}

Settings appended by Tecton

Tecton appends the following contents to the JSON value that is passed to a DatabricksJsonClusterConfig object.

  • init_scripts:

    "init_scripts": [
    ...,
    {
    "s3": {
    "destination": "s3://<your_bucket>/internal/intermediate-data/script_store/<commit_hash>/chronocollector_init_script_v2_esr.sh",
    "region": "<your_deployment_region>"
    }
    }
    ]
  • custom_tags based on feature view metadata. Do not specify tags with the same key as shown below, since duplicate keys are not allowed for tags.

    "custom_tags": [
    ...,
    {
    "key": "feature_view_id",
    "value": "<your_feature_view_id>"
    },
    {
    "key": "tecton-owned",
    "value": "true"
    },
    {
    "key": "tecton-cluster-name",
    "value": "<your_cluster_name>"
    },
    {
    "key": "tecton_feature_view",
    "value": "<your_feature_view_name>"
    },
    {
    "key": "tecton_deployment",
    "value": "<your_deployment_name>"
    },
    {
    "key": "tecton_workspace",
    "value": "<your_workspace_name>"
    }
    ]
  • spark_conf:

    "spark_conf": {
    ...,
    "spark.sql.streaming.metricsEnabled": "true",
    "spark.metrics.namespace": "${spark.app.name}",
    "*.sink.statsd.prefix": "spark",
    "*.sink.statsd.host": "0.0.0.0",
    "*.sink.statsd.period": "5",
    "*.sink.statsd.unit": "seconds",
    "*.sink.statsd.port": "3031",
    "*.source.jvm.class": "org.apache.spark.metrics.source.JvmSource",
    "*.sink.statsd.class": "org.apache.spark.metrics.sink.StatsdSink",
    "spark.app.name": "<your_feature_view_name>__<your_feature_view_id>"
    }
  • spark_env_vars that are required by materialization jobs:

    "spark_env_vars": {
    ...,
    "AWS_STS_REGIONAL_ENDPOINTS": "regional",
    "TECTON_CLUSTER_NAME": "<your_cluster_name>"
    }
  • libraries that are required by materialization jobs:

    "libraries": [
    ...,
    {
    "jar": "s3://<your_bucket>/internal/intermediate-data/script_store/<commit_hash>/tecton-udfs-spark-3.jar"
    },
    {
    "jar": "s3://<your_bucket>/internal/intermediate-data/script_store/<commit_hash>/tecton-online-store-sink-spark-3.jar"
    },
    {
    "whl": "s3://<your_bucket>/internal/intermediate-data/script_store/<commit_hash>/tecton_materialization-latest-py3-none-any.whl"
    }
    ]

Example Databricks cluster configuration

The following example first shows a sample JSON value that is passed to a DatabricksJsonClusterConfig object. Next, the final JSON value (including JSON settings overridden and appended by Tecton) is shown. In the final JSON value, settings appended by Tecton are highlighted.

JSON value passed to the DatabricksJsonClusterConfig object
{
"new_cluster": {
"num_workers": 0,
"spark_version": "11.3.x-scala2.12",
"node_type_id": "m5.large",
"aws_attributes": {
"ebs_volume_type": "GENERAL_PURPOSE_SSD",
"ebs_volume_count": 1,
"ebs_volume_size": 100,
"instance_profile_arn": "arn:aws:iam::your_account_id:instance-profile/your-role",
"availability": "SPOT",
"zone_id": "auto"
},
"custom_tags": [
{
"key": "test_key",
"value": "test_value"
}
],
"spark_conf": {
"spark.databricks.service.server.enabled": "true",
"spark.hadoop.fs.s3a.acl.default": "BucketOwnerFullControl",
"spark.sql.sources.partitionOverwriteMode": "dynamic",
"spark.databricks.cluster.profile": "singleNode",
"spark.sql.legacy.parquet.datetimeRebaseModeInRead": "CORRECTED",
"spark.sql.legacy.parquet.int96RebaseModeInRead": "CORRECTED",
"spark.sql.legacy.parquet.int96RebaseModeInWrite": "CORRECTED",
"spark.master": "local[*]",
"spark.hadoop.hive.metastore.glue.catalogid": "your_account_id",
"spark.databricks.hive.metastore.glueCatalog.enabled": "true"
},
"spark_env_vars": {
"TEST_ENV_VAR": "IGNORE_ME"
}
},
"libraries": [
{
"jar": "s3://tecton.ai.public/jars/snowflake-jdbc-3.13.6.jar"
}
]
}
Final JSON value (with settings appended by Tecton highlighted) that is passed to Databricks
{
"new_cluster": {
"num_workers": 0,
"spark_version": "11.3.x-scala2.12",
"node_type_id": "m5.large",
"aws_attributes": {
"ebs_volume_type": "GENERAL_PURPOSE_SSD",
"ebs_volume_count": 1,
"ebs_volume_size": 100,
"instance_profile_arn": "arn:aws:iam::your_account_id:instance-profile/your-role",
"availability": "SPOT",
"zone_id": "auto"
},
"init_scripts": [
{
"s3": {
"destination": "s3://path_to_script_store/chronocollector_init_script_v2_esr.sh",
"region": "us-west-2"
}
}
],
"cluster_log_conf": {
"s3": {
"destination": "s3://path_to_log_store/your_run_name",
"region": "us-west-2"
}
},
"custom_tags": [
{
"key": "test_key",
"value": "test_value"
},
{
"key": "feature_view_id",
"value": "fv_id"
},
{
"key": "tecton-owned",
"value": "true"
},
{
"key": "tecton-cluster-name",
"value": "dev-blair"
},
{
"key": "tecton_feature_view",
"value": "test_bfv"
},
{
"key": "tecton_deployment",
"value": "dev-blair"
},
{
"key": "tecton_workspace",
"value": "db-1"
}
],
"spark_conf": {
"spark.databricks.cluster.profile": "singleNode",
"spark.sql.legacy.parquet.int96RebaseModeInWrite": "CORRECTED",
"spark.hadoop.hive.metastore.glue.catalogid": "your_account_id",
"spark.databricks.hive.metastore.glueCatalog.enabled": "true",
"spark.hadoop.fs.s3a.acl.default": "BucketOwnerFullControl",
"spark.sql.sources.partitionOverwriteMode": "dynamic",
"spark.sql.legacy.parquet.int96RebaseModeInRead": "CORRECTED",
"spark.master": "local[*]",
"spark.databricks.service.server.enabled": "true",
"spark.sql.legacy.parquet.datetimeRebaseModeInRead": "CORRECTED",
"spark.sql.streaming.metricsEnabled": "true",
"spark.metrics.namespace": "${spark.app.name}",
"*.sink.statsd.prefix": "spark",
"*.sink.statsd.host": "0.0.0.0",
"*.sink.statsd.period": "5",
"*.sink.statsd.unit": "seconds",
"*.sink.statsd.port": "3031",
"*.source.jvm.class": "org.apache.spark.metrics.source.JvmSource",
"*.sink.statsd.class": "org.apache.spark.metrics.sink.StatsdSink",
"spark.app.name": "test_bfv__fv_id"
},
"spark_env_vars": {
"TEST_ENV_VAR": "IGNORE_ME",
"AWS_STS_REGIONAL_ENDPOINTS": "regional",
"TECTON_CLUSTER_NAME": "dev-blair"
}
},
"notebook_task": {
"notebook_path": "/Users/bot@tecton.ai/materialization_notebook.py",
"base_parameters": {
"materialization_params": "s3://path_to_param_store/your_run_name"
}
},
"run_name": "orch-test_bfv-fv_id-BATCH_2022.12.28.00:00:00-1",
"libraries": [
{
"jar": "s3://tecton.ai.public/jars/snowflake-jdbc-3.13.6.jar"
},
{
"jar": "s3://path_to_script_store/tecton-udfs-spark-3.jar"
},
{
"jar": "s3://path_to_script_store/tecton-online-store-sink-spark-3.jar"
},
{
"whl": "s3://path_to_script_store/tecton_materialization-latest-py3-none-any.whl"
}
]
}

Configuring EMR job clusters

note

Tecton supports EMR versions from 6.5 to 6.9.

Tecton uses the RunJobFlow request action of the EMR API to start a new EMR cluster. When you instantiate an EMRJsonClusterConfig object, you must provide a JSON string that conforms with the RunJobFlow request schema. The following are required parameters: ReleaseLabel, Instances, ServiceRole, JobFlowRole. These parameters are explained in the EMR RunJobFlow documentation that is linked to earlier in this paragraph.

Settings overridden by Tecton

Tecton overrides the following contents of the JSON value that is passed to a EMRJsonClusterConfig object.

  • The name of the job flow:

    “Name”:<your_run_name>
  • LogUri , based on feature view metadata:

    "LogUri": "s3://<your_bucket>/logs/<your_run_name>"
  • Applications to include all required dependencies :

    "Applications": [
    {
    "Name": "Hadoop"
    },
    {
    "Name": "Hive"
    },
    {
    "Name": "Spark"
    },
    {
    "Name": "Livy"
    }
    ]

Settings appended by Tecton

Tecton appends the following contents to the JSON value that is passed to a EMRJsonClusterConfig object.

Bootstrap actions appended by Tecton

  • chronocollector initialization

    {
    "Name": "s3://<your_bucket>/internal/intermediate-data/script_store/<commit_hash>/emr_chronocollector_init_script_v2.sh",
    "ScriptBootstrapAction": {
    "Path": "s3://<your_bucket>/internal/intermediate-data/script_store/<commit_hash>/emr_chronocollector_init_script_v2.sh",
    "Args": []
    }
    }
  • The Delta JAR installation for Python:

    {
    "Name": "s3://<your_bucket>/internal/intermediate-data/script_store/<commit_hash>/install_delta.sh",
    "ScriptBootstrapAction": {
    "Path": "s3://<your_bucket>/internal/intermediate-data/script_store/<commit_hash>/install_delta.sh",
    "Args": [
    "s3://<your_bucket>/internal/intermediate-data/script_store/<commit_hash>/delta-core.jar"
    ]
    }
    }
  • Installation of Tecton JARs from S3 to the destination /home/hadoop/extrajars/*, including tecton-udfs-spark-3.jar and tecton-online-store-sink-spark-3.jar and delta-core.jar if you're using EMR 6.5:

    {
    "Name": "s3://<your_bucket>/internal/intermediate-data/script_store/<commit_hash>/install_jars_from_s3.sh",
    "ScriptBootstrapAction": {
    "Path": "s3://<your_bucket>/internal/intermediate-data/script_store/<commit_hash>/install_jars_from_s3.sh",
    "Args": [
    "s3://<your_bucket>/internal/intermediate-data/script_store/<commit_hash>/tecton-udfs-spark-3.jar",
    "s3://<your_bucket>/internal/intermediate-data/script_store/<commit_hash>/tecton-online-store-sink-spark-3.jar",
    "s3://<your_bucket>/internal/intermediate-data/script_store/<commit_hash>/delta-core.jar"
    ]
    }
    }
  • Installation of the materialization library from S3 to the destination /home/hadoop/extrajars/*

    {
    "Name": "s3://<your_bucket>/internal/intermediate-data/script_store/<commit_hash>/install_python_libraries_from_s3.sh",
    "ScriptBootstrapAction": {
    "Path": "s3://<your_bucket>/internal/intermediate-data/script_store/<commit_hash>/install_python_libraries_from_s3.sh",
    "Args": [
    "s3://<your_bucket>/internal/intermediate-data/script_store/<commit_hash>/tecton_materialization-latest-py3-none-any.whl"
    ]
    }
    }
  • Installation of pyarrow==5.0.0 if you’re using EMR 6.5:

    {
    "Name": "s3://<your_bucket>/internal/intermediate-data/script_store/<commit_hash>/install_python_libraries_from_pypi.sh",
    "ScriptBootstrapAction": {
    "Path": "s3://<your_bucket>/internal/intermediate-data/script_store/<commit_hash>/install_python_libraries_from_pypi.sh",
    "Args": ["pyarrow==5.0.0"]
    }
    }

Steps appended by Tecton

  • A first step to enable debugging:

    {
    "Name": "Enable debugging",
    "ActionOnFailure": "TERMINATE_JOB_FLOW",
    "HadoopJarStep": {
    "Properties": [],
    "Jar": "command-runner.jar",
    "Args": ["state-pusher-script"]
    }
    }
  • A step for materialization, where transformation logic is run. Tecton will fill in the executable and its arguments to the spark-submit command and append several required dependency packages to the --packages argument and --jars argument:

    {
    "Name": "Tecton Materialization",
    "ActionOnFailure": "TERMINATE_CLUSTER",
    "HadoopJarStep": {
    "Properties": [],
    "Jar": "command-runner.jar",
    "Args": [
    "spark-submit",
    ...,
    "--packages",
    "org.apache.hudi:hudi-spark3.1.2-bundle_2.12:0.10.1,org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1,com.qubole.spark:spark-sql-kinesis_2.12:1.2.0_spark-3.0,xerces:xercesImpl:2.8.0",
    "--jars",
    "/home/hadoop/extrajars/*",
    "s3://<your_bucket>/internal/intermediate-data/script_store/<commit_hash>/materialization_entrypoint.py",
    "--materialization-params",
    "s3://<your_bucket>/internal/intermediate-data/materialization_params/<your_run_name>",
    "--materialization-step",
    "1",
    "--spark-session-name",
    "<your_feature_view_name>__<your_feature_view_id>"
    ]
    }
    }

If you would like to specify any additional library dependencies via --jars, --packages or --py-files, see following section.

  • A Tecton Feature Store Writer step:

    {
    "Name": "Tecton Feature Store Writer",
    "ActionOnFailure": "TERMINATE_CLUSTER",
    "HadoopJarStep": {
    "Properties": [],
    "Jar": "command-runner.jar",
    "Args": [
    "spark-submit",
    "--packages",
    "org.apache.hudi:hudi-spark3.1.2-bundle_2.12:0.10.1,org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1,com.qubole.spark:spark-sql-kinesis_2.12:1.2.0_spark-3.0,xerces:xercesImpl:2.8.0",
    "--jars",
    "/home/hadoop/extrajars/*",
    "--conf",
    "spark.driver.maxResultSize=4G",
    "--conf",
    "spark.sql.hive.caseSensitiveInferenceMode=NEVER_INFER",
    "--conf",
    "spark.serializer=org.apache.spark.serializer.KryoSerializer",
    "--conf",
    "spark.delta.logStore.class=io.delta.storage.DynamoDBLogStore",
    "--conf",
    "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog",
    "--conf",
    "spark.delta.DynamoDBLogStore.tableName=delta",
    "--conf",
    "spark.delta.DynamoDBLogStore.region=us-west-2",
    "--conf",
    "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension",
    "s3://<your_bucket>/internal/intermediate-data/script_store/<commit_hash>/materialization_entrypoint.py",
    "--materialization-params",
    "s3://<your_bucket>/internal/intermediate-data/materialization_params/<your_run_name>",
    "--materialization-step",
    "2",
    "--spark-session-name",
    "<your_feature_view_name>__<your_feature_view_id>"
    ]
    }
    }

Configurations appended by Tecton

Properties for the spark-defaults, spark-metrics, yarn-env, spark-env and livy-env classifications. Do not specify duplicate Properties keys as shown below, since duplicate keys are not allowed.

"Configurations": [
...,
{
"Classification": "spark-defaults",
"Configurations": [...],
"Properties": {
...,
"spark.sql.streaming.metricsEnabled": "true",
"spark.metrics.namespace": "${spark.app.name}",
"spark.app.name": "<your_feature_view_name>__<your_feature_view_id>"
}
},
{
"Classification": "yarn-env",
"Configurations": [
...,
{
"Classification": "export",
"Configurations": [...],
"Properties": {
...,
"CLUSTER_REGION": "us-west-2",
"AWS_STS_REGIONAL_ENDPOINTS": "regional",
"TECTON_CLUSTER_NAME": "<your_cluster_name>"
}
}
],
"Properties": {...}
},
{
"Classification": "spark-metrics",
"Configurations": [...],
"Properties": {
...,
"*.sink.statsd.prefix": "spark",
"*.sink.statsd.host": "0.0.0.0",
"*.sink.statsd.period": "5",
"*.sink.statsd.unit": "seconds",
"*.sink.statsd.port": "3031",
"*.source.jvm.class": "org.apache.spark.metrics.source.JvmSource",
"*.sink.statsd.class": "org.apache.spark.metrics.sink.StatsdSink"
}
},
{
"Classification": "spark-env",
"Configurations": [
...,
{
"Classification": "export",
"Configurations": [...],
"Properties": {
...,
"CLUSTER_REGION": "us-west-2",
"AWS_STS_REGIONAL_ENDPOINTS": "regional",
"TECTON_CLUSTER_NAME": "<your_cluster_name>"
}
}
],
"Properties": {...}
},
{
"Classification": "livy-env",
"Configurations": [
...,
{
"Classification": "export",
"Configurations": [...],
"Properties": {
...,
"CLUSTER_REGION": "us-west-2",
"AWS_STS_REGIONAL_ENDPOINTS": "regional",
"TECTON_CLUSTER_NAME": "<your_cluster_name>"
}
}
],
"Properties": {...}
}
]

If you use a Glue or Iceberg catalog as your metastore, you may want to specify hive-site and spark-hive-site settings.

Tags appended by Tecton

Do not specify tags with the same key as shown below, since duplicate keys are not allowed for tags.

"Tags": [
...,
{
"Key": "feature_view_id",
"Value": "<your_feature_view_id>"
},
{
"Key": "tecton-owned",
"Value": "true"
},
{
"Key": "tecton-cluster-name",
"Value": "<your_cluster_name>"
},
{
"Key": "tecton_feature_view",
"Value": "<your_feature_view_name>"
},
{
"Key": "tecton_deployment",
"Value": "<your_deployment_name>"
},
{
"Key": "tecton_workspace",
"Value": "<your_workspace_name>"
}
]

Attaching custom libraries to EMR materialization jobs

  1. Download and install your dependency libraries to your job clusters. We recommend doing so either via a bootstrap action or a step.

  2. Pass your dependency libraries as arguments to the spark-submit command in Tecton Materialization step. For example, if you want to attach all custom jars installed in /home/hadoop/customjars/* directory, a python library in an accessible S3 location and the protobuf maven package, the Tecton Materialization step config in your JSON could read as follows:

    {
    "Name": "Tecton Materialization",
    "HadoopJarStep": {
    "Args": [
    "spark-submit",
    "--packages",
    "com.google.protobuf:protobuf-java:3.22.0",
    "--jars",
    "/home/hadoop/customjars/*",
    "--py-files",
    "s3://<path_to_your_python_library>"
    ]
    }
    }

Example EMR cluster configuration

The following example first shows a sample JSON value that is passed to a EMRJsonClusterConfig object. Next, the final JSON value (including JSON settings overridden and appended by Tecton) is shown. In the final JSON value, settings appended by Tecton are highlighted.

JSON value passed to the EMRJsonClusterConfig object
{
"ReleaseLabel": "emr-6.9.0",
"Instances": {
"InstanceGroups": [],
"InstanceFleets": [
{
"InstanceFleetType": "CORE",
"TargetOnDemandCapacity": 0,
"TargetSpotCapacity": 1,
"InstanceTypeConfigs": [
{
"InstanceType": "r4.xlarge",
"BidPriceAsPercentageOfOnDemandPrice": 100,
"Configurations": []
}
]
},
{
"InstanceFleetType": "MASTER",
"TargetOnDemandCapacity": 0,
"TargetSpotCapacity": 1,
"InstanceTypeConfigs": [
{
"InstanceType": "r4.xlarge",
"BidPriceAsPercentageOfOnDemandPrice": 100,
"Configurations": []
}
]
}
],
"Ec2SubnetIds": ["subnet-your_net_a", "subnet-your_net_b"],
"EmrManagedMasterSecurityGroup": "sg-your_group_a",
"EmrManagedSlaveSecurityGroup": "sg-your_group_b",
"ServiceAccessSecurityGroup": "sg-your_group_c",
"AdditionalMasterSecurityGroups": [],
"AdditionalSlaveSecurityGroups": []
},
"Configurations": [
{
"Classification": "spark-defaults",
"Configurations": [],
"Properties": {
"spark.driver.maxResultSize": "4G",
"spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension,org.apache.spark.sql.hudi.HoodieSparkSessionExtension",
"spark.sql.catalogImplementation": "hive",
"spark.hive.metastore.client.factory.class": "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory",
"spark.hadoop.hive.metastore.glue.catalogid": "your_account_id",
"spark.hive.metastore.glue.catalogid": "your_account_id",
"spark.sql.legacy.parquet.datetimeRebaseModeInRead": "CORRECTED",
"spark.sql.legacy.parquet.int96RebaseModeInRead": "CORRECTED",
"spark.sql.legacy.parquet.int96RebaseModeInWrite": "CORRECTED"
}
},
{
"Classification": "hive-site",
"Configurations": [],
"Properties": {
"hive.metastore.client.factory.class": "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory",
"hive.metastore.glue.catalogid": "your_account_id"
}
},
{
"Classification": "spark-hive-site",
"Configurations": [],
"Properties": {
"hive.metastore.client.factory.class": "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory",
"hive.metastore.glue.catalogid": "your_account_id"
}
},
{
"Classification": "yarn-env",
"Configurations": [
{
"Classification": "export",
"Configurations": [],
"Properties": {
"TEST_ENV_VAR": "TEST_VALUE"
}
}
],
"Properties": {}
}
],
"BootstrapActions": [
{
"Name": "install_jars_from_s3",
"ScriptBootstrapAction": {
"Path": "s3://tecton-dev-shared/blair/install_jars_from_s3.sh",
"Args": [
"s3://tecton.ai.public/jars/spark-snowflake_2.12-2.9.1-spark_3.0.jar",
"s3://tecton.ai.public/jars/snowflake-jdbc-3.13.6.jar"
]
}
},
{
"Name": "install_python_libraries_from_pypi",
"ScriptBootstrapAction": {
"Path": "s3://tecton-dev-shared/blair/install_python_libraries_from_pypi.sh",
"Args": ["apache-sedona"]
}
}
],
"Steps": [
{
"Name": "Test Custom Step",
"ActionOnFailure": "CONTINUE",
"HadoopJarStep": {
"Properties": [],
"Jar": "command-runner.jar",
"Args": ["state-pusher-script"]
}
},
{
"Name": "Tecton Materialization",
"ActionOnFailure": "TERMINATE_CLUSTER",
"HadoopJarStep": {
"Properties": [],
"Jar": "command-runner.jar",
"Args": [
"spark-submit",
"--executor-memory",
"4g",
"--packages",
"com.google.protobuf:protobuf-java:3.22.0",
"--jars",
"/home/hadoop/customjars/*",
"--py-files",
"s3://tecton-dev-shared/blair/test.whl"
]
}
}
],
"JobFlowRole": "your-job-flow-role",
"ServiceRole": "your-service-role",
"CustomAmiId": "your-custom-AMI-ID"
}
Final JSON value (with settings appended by Tecton highlighted) that is passed to EMR
{
"Name": "orch-test_bfv-fv_id-BATCH_2022.11.02.00:00:00-1",
"LogUri": "s3://path_to_log_store/your_run_name",
"ReleaseLabel": "emr-6.9.0",
"Instances": {
"InstanceGroups": [],
"InstanceFleets": [
{
"InstanceFleetType": "CORE",
"TargetOnDemandCapacity": 0,
"TargetSpotCapacity": 1,
"InstanceTypeConfigs": [
{
"InstanceType": "r4.xlarge",
"BidPriceAsPercentageOfOnDemandPrice": 100,
"Configurations": []
}
]
},
{
"InstanceFleetType": "MASTER",
"TargetOnDemandCapacity": 0,
"TargetSpotCapacity": 1,
"InstanceTypeConfigs": [
{
"InstanceType": "r4.xlarge",
"BidPriceAsPercentageOfOnDemandPrice": 100,
"Configurations": []
}
]
}
],
"Ec2SubnetIds": ["subnet-your_net_a", "subnet-your_net_b"],
"EmrManagedMasterSecurityGroup": "sg-your_group_a",
"EmrManagedSlaveSecurityGroup": "sg-your_group_b",
"ServiceAccessSecurityGroup": "sg-your_group_c",
"AdditionalMasterSecurityGroups": [],
"AdditionalSlaveSecurityGroups": []
},
"Steps": [
{
"Name": "Enable debugging",
"ActionOnFailure": "TERMINATE_JOB_FLOW",
"HadoopJarStep": {
"Properties": [],
"Jar": "command-runner.jar",
"Args": ["state-pusher-script"]
}
},
{
"Name": "Test Custom Step",
"ActionOnFailure": "CONTINUE",
"HadoopJarStep": {
"Properties": [],
"Jar": "command-runner.jar",
"Args": ["state-pusher-script"]
}
},
{
"Name": "Tecton Materialization",
"ActionOnFailure": "TERMINATE_CLUSTER",
"HadoopJarStep": {
"Properties": [],
"Jar": "command-runner.jar",
"Args": [
"spark-submit",
"--executor-memory",
"4g",
"--packages",
"com.google.protobuf:protobuf-java:3.22.0,org.apache.hudi:hudi-spark3.1.2-bundle_2.12:0.10.1,org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1,com.qubole.spark:spark-sql-kinesis_2.12:1.2.0_spark-3.0,xerces:xercesImpl:2.8.0",
"--jars",
"/home/hadoop/customjars/*,/home/hadoop/extrajars/*",
"--py-files",
"s3://tecton-dev-shared/blair/test.whl",
"s3://path_to_script_store/materialization_entrypoint.py",
"--materialization-params",
"s3://path_to_param_store/your_run_name",
"--materialization-step",
"1",
"--spark-session-name",
"test_bfv__fv_id"
]
}
},
{
"Name": "Tecton Feature Store Writer",
"ActionOnFailure": "TERMINATE_CLUSTER",
"HadoopJarStep": {
"Properties": [],
"Jar": "command-runner.jar",
"Args": [
"spark-submit",
"--packages",
"org.apache.hudi:hudi-spark3.1.2-bundle_2.12:0.10.1,org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1,com.qubole.spark:spark-sql-kinesis_2.12:1.2.0_spark-3.0,xerces:xercesImpl:2.8.0",
"--jars",
"/home/hadoop/extrajars/*",
"--conf",
"spark.driver.maxResultSize=4G",
"--conf",
"spark.sql.hive.caseSensitiveInferenceMode=NEVER_INFER",
"--conf",
"spark.serializer=org.apache.spark.serializer.KryoSerializer",
"--conf",
"spark.sql.sources.partitionOverwriteMode=dynamic",
"--conf",
"spark.delta.logStore.class=io.delta.storage.DynamoDBLogStore",
"--conf",
"spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog",
"--conf",
"spark.delta.DynamoDBLogStore.tableName=delta",
"--conf",
"spark.delta.DynamoDBLogStore.region=us-west-2",
"--conf",
"spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension",
"s3://path_to_script_store/materialization_entrypoint.py",
"--materialization-params",
"s3://path_to_param_store/your_run_name",
"--materialization-step",
"2",
"--spark-session-name",
"test_bfv__fv_id"
]
}
}
],
"BootstrapActions": [
{
"Name": "install_jars_from_s3",
"ScriptBootstrapAction": {
"Path": "s3://tecton-dev-shared/blair/install_jars_from_s3.sh",
"Args": [
"s3://tecton.ai.public/jars/spark-snowflake_2.12-2.9.1-spark_3.0.jar",
"s3://tecton.ai.public/jars/snowflake-jdbc-3.13.6.jar"
]
}
},
{
"Name": "install_python_libraries_from_pypi",
"ScriptBootstrapAction": {
"Path": "s3://tecton-dev-shared/blair/install_python_libraries_from_pypi.sh",
"Args": ["apache-sedona"]
}
},
{
"Name": "s3://path_to_script_store/install_python_libraries_from_s3.sh",
"ScriptBootstrapAction": {
"Path": "s3://path_to_script_store/install_python_libraries_from_s3.sh",
"Args": [
"s3://path_to_script_store/tecton_materialization-latest-py3-none-any.whl"
]
}
},
{
"Name": "s3://path_to_script_store/install_jars_from_s3.sh",
"ScriptBootstrapAction": {
"Path": "s3://path_to_script_store/install_jars_from_s3.sh",
"Args": [
"s3://path_to_script_store/tecton-udfs-spark-3.jar",
"s3://path_to_script_store/tecton-online-store-sink-spark-3.jar"
]
}
},
{
"Name": "s3://path_to_script_store/emr_chronocollector_init_script_v2.sh",
"ScriptBootstrapAction": {
"Path": "s3://path_to_script_store/emr_chronocollector_init_script_v2.sh",
"Args": []
}
}
],
"Applications": [
{
"Name": "Hadoop",
"Args": [],
"AdditionalInfo": {}
},
{
"Name": "Hive",
"Args": [],
"AdditionalInfo": {}
},
{
"Name": "Spark",
"Args": [],
"AdditionalInfo": {}
},
{
"Name": "Livy",
"Args": [],
"AdditionalInfo": {}
}
],
"Configurations": [
{
"Classification": "spark-defaults",
"Configurations": [],
"Properties": {
"spark.sql.legacy.parquet.datetimeRebaseModeInRead": "CORRECTED",
"spark.sql.legacy.parquet.int96RebaseModeInRead": "CORRECTED",
"spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension,org.apache.spark.sql.hudi.HoodieSparkSessionExtension",
"spark.hive.metastore.client.factory.class": "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory",
"spark.hive.metastore.glue.catalogid": "your_account_id",
"spark.sql.catalogImplementation": "hive",
"spark.hadoop.hive.metastore.glue.catalogid": "your_account_id",
"spark.sql.legacy.parquet.int96RebaseModeInWrite": "CORRECTED",
"spark.sql.streaming.metricsEnabled": "true",
"spark.metrics.namespace": "${spark.app.name}",
"spark.app.name": "test_bfv__fv_id"
}
},
{
"Classification": "hive-site",
"Configurations": [],
"Properties": {
"hive.metastore.client.factory.class": "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory",
"hive.metastore.glue.catalogid": "your_account_id"
}
},
{
"Classification": "spark-hive-site",
"Configurations": [],
"Properties": {
"hive.metastore.client.factory.class": "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory",
"hive.metastore.glue.catalogid": "your_account_id"
}
},
{
"Classification": "yarn-env",
"Configurations": [
{
"Classification": "export",
"Configurations": [],
"Properties": {
"TEST_ENV_VAR": "TEST_VALUE",
"CLUSTER_REGION": "us-west-2",
"AWS_STS_REGIONAL_ENDPOINTS": "regional",
"TECTON_CLUSTER_NAME": "dev-blair"
}
}
],
"Properties": {}
},
{
"Classification": "spark-metrics",
"Configurations": [],
"Properties": {
"*.sink.statsd.prefix": "spark",
"*.sink.statsd.host": "0.0.0.0",
"*.sink.statsd.period": "5",
"*.sink.statsd.unit": "seconds",
"*.sink.statsd.port": "3031",
"*.source.jvm.class": "org.apache.spark.metrics.source.JvmSource",
"*.sink.statsd.class": "org.apache.spark.metrics.sink.StatsdSink"
}
},
{
"Classification": "spark-env",
"Configurations": [
{
"Classification": "export",
"Configurations": [],
"Properties": {
"CLUSTER_REGION": "us-west-2",
"AWS_STS_REGIONAL_ENDPOINTS": "regional",
"TECTON_CLUSTER_NAME": "dev-blair"
}
}
],
"Properties": {}
},
{
"Classification": "livy-env",
"Configurations": [
{
"Classification": "export",
"Configurations": [],
"Properties": {
"CLUSTER_REGION": "us-west-2",
"AWS_STS_REGIONAL_ENDPOINTS": "regional",
"TECTON_CLUSTER_NAME": "dev-blair"
}
}
],
"Properties": {}
}
],
"JobFlowRole": "your-job-flow-role",
"ServiceRole": "your-service-role",
"CustomAmiId": "your-custom-AMI-ID",
"Tags": [
{
"Key": "feature_view_id",
"Value": "fv_id"
},
{
"Key": "tecton-owned",
"Value": "true"
},
{
"Key": "tecton-cluster-name",
"Value": "dev-blair"
},
{
"Key": "tecton_feature_view",
"Value": "test_bfv"
},
{
"Key": "tecton_deployment",
"Value": "dev-blair"
},
{
"Key": "tecton_workspace",
"Value": "emr-1"
},
{
"Key": "tecton-accessible:dev-shared",
"Value": "true"
}
]
}

Migrating simple cluster config objects to custom cluster config objects

To migrate simple cluster config objects to custom cluster config objects, follow these steps.

  1. Upgrade your Tecton SDK version to 0.6
  2. For each Feature View that uses simple cluster config objects:
    1. Replace the simple cluster config objects with new custom cluster config objects.
    2. As needed, update batch_compute and/or stream_compute in your Feature view declaration to use the new custom cluster config objects.
    3. In the Tecton CLI, run tecton apply to apply your changes. The new configuration will be used for any future jobs.

Was this page helpful?

🧠 Hi! Ask me anything about Tecton!

Floating button icon