Configure Materialization of Spark-based Feature Views
Overview
When defining a Batch Feature View or a Stream Feature View, you can set the batch_compute and stream_compute parameters to one of the cluster config objects listed in the following table.
Data Platform | Simple Cluster Config Object | Custom Cluster Config Object |
---|---|---|
Databricks | DatabricksClusterConfig | DatabricksJsonClusterConfig |
EMR | EMRClusterConfig | EMRJsonClusterConfig |
Snowflake | Not supported | Not supported |
Compared to the simple cluster config objects, custom cluster config objects are more flexible. With the custom config objects, you can specify a wide range of configuration settings for your Spark cluster. For example, you can specify the number of worker nodes, the amount of memory and CPU to allocate to each worker, the Spark version to use, and the type of storage to use.
An example Batch Feature View that uses a DatabricksJsonClusterConfig object is shown below.
@batch_feature_view(
    ...,
    batch_compute=DatabricksJsonClusterConfig(
        json=my_json
    ),  # my_json is your JSON string containing your Spark cluster configuration
    ...,
)
def feature_view_1():
    return ...
The JSON you use to specify your cluster config must conform to the Databricks schema or EMR schema that is required to submit jobs to the cluster.
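As a minimal sketch of how the JSON string might be assembled, the cluster settings can be kept in a Python dict and serialized with json.dumps, which avoids hand-written quoting mistakes. The field values are illustrative placeholders borrowed from the Databricks example later on this page, and the variable name cluster_settings is an assumption made for this sketch.

```python
import json

# Illustrative cluster settings; replace the values with ones valid for your workspace.
cluster_settings = {
    "new_cluster": {
        "num_workers": 0,
        "spark_version": "11.3.x-scala2.12",
        "node_type_id": "m5.large",
        "spark_conf": {
            "spark.sql.sources.partitionOverwriteMode": "dynamic",
        },
    },
}

# Serialize to the JSON string that is passed as json=my_json.
my_json = json.dumps(cluster_settings, indent=2)

# Round-trip check: the string parses back to the same structure.
assert json.loads(my_json) == cluster_settings
```

Building the string from a dict also makes it easy to share common settings across several Feature Views before serializing.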
After a custom cluster config object is instantiated (via the instantiation of a Feature View), Tecton internally updates the contents of the JSON string; some settings are appended and others are overwritten. Using the Web UI, you can see both the original JSON string and the updated JSON string under the materialization tab of your feature view.
The following sections describe how to use DatabricksJsonClusterConfig or EMRJsonClusterConfig to customize Spark materialization jobs.
Configuring Databricks job clusters
Tecton uses the Runs Submit endpoint of the Databricks Jobs API 2.0 to programmatically create and manage materialization jobs that run on Databricks. When a DatabricksJsonClusterConfig object is instantiated, a JSON string that conforms to the Runs Submit request schema must be provided.
Use new_cluster for your cluster configurations; existing cluster reuse is not supported.
Materialization jobs are executed as a notebook_task. Other task types, such as spark_jar_task, spark_python_task, spark_submit_task, and pipeline_task, are not supported.
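The restrictions above can be checked locally before running tecton apply. The following is a rough sketch, not part of the Tecton SDK: the function name check_databricks_json is hypothetical, and it assumes that cluster reuse would be requested through the existing_cluster_id field of the Runs Submit request.

```python
import json

# Task types the text above lists as unsupported for materialization jobs.
UNSUPPORTED_TASKS = (
    "spark_jar_task",
    "spark_python_task",
    "spark_submit_task",
    "pipeline_task",
)


def check_databricks_json(config_json: str) -> list:
    """Return a list of human-readable problems found in the config string."""
    config = json.loads(config_json)
    problems = []
    if "new_cluster" not in config:
        problems.append("missing new_cluster")
    if "existing_cluster_id" in config:
        problems.append("existing_cluster_id is set; cluster reuse is not supported")
    for task in UNSUPPORTED_TASKS:
        if task in config:
            problems.append(f"unsupported task type: {task}")
    return problems


# A config that only defines new_cluster passes the check.
print(check_databricks_json('{"new_cluster": {"num_workers": 0}}'))
```

An empty list means none of the listed problems were found; it does not guarantee that Databricks will accept the request.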
Settings overridden by Tecton
Tecton overrides the following contents of the JSON value that is passed to a DatabricksJsonClusterConfig object: run_name, cluster_log_conf, and notebook_task.
"run_name": "orch-<your_feature_view_name>-<your_feature_view_id>-<YOUR_TASK_TYPE>_<your_task_timestamp>-<your_task_attempt_number>",
"new_cluster": {
...,
"cluster_log_conf": {
"s3": {
"destination": "s3://<your_bucket>/logs/<your_run_name>",
"region": "<your_deployment_region>"
}
}
},
"notebook_task": {
"notebook_path": "/Users/<your_user_name>/materialization_notebook.py",
"base_parameters": {
"materialization_params": "s3://<your_bucket>/intermediate-data/materialization_params/<your_run_name>"
}
}
Settings appended by Tecton
Tecton appends the following contents to the JSON value that is passed to a DatabricksJsonClusterConfig object.
- init_scripts:
"init_scripts": [
...,
{
"s3": {
"destination": "s3://<your_bucket>/internal/intermediate-data/script_store/<commit_hash>/chronocollector_init_script_v2_esr.sh",
"region": "<your_deployment_region>"
}
}
]
- custom_tags based on feature view metadata. Do not specify tags whose keys match those shown below, since duplicate keys are not allowed for tags.
"custom_tags": [
...,
{
"key": "feature_view_id",
"value": "<your_feature_view_id>"
},
{
"key": "tecton-owned",
"value": "true"
},
{
"key": "tecton-cluster-name",
"value": "<your_cluster_name>"
},
{
"key": "tecton_feature_view",
"value": "<your_feature_view_name>"
},
{
"key": "tecton_deployment",
"value": "<your_deployment_name>"
},
{
"key": "tecton_workspace",
"value": "<your_workspace_name>"
}
]
- spark_conf:
"spark_conf": {
...,
"spark.sql.streaming.metricsEnabled": "true",
"spark.metrics.namespace": "${spark.app.name}",
"*.sink.statsd.prefix": "spark",
"*.sink.statsd.host": "0.0.0.0",
"*.sink.statsd.period": "5",
"*.sink.statsd.unit": "seconds",
"*.sink.statsd.port": "3031",
"*.source.jvm.class": "org.apache.spark.metrics.source.JvmSource",
"*.sink.statsd.class": "org.apache.spark.metrics.sink.StatsdSink",
"spark.app.name": "<your_feature_view_name>__<your_feature_view_id>"
}
- spark_env_vars that are required by materialization jobs:
"spark_env_vars": {
...,
"AWS_STS_REGIONAL_ENDPOINTS": "regional",
"TECTON_CLUSTER_NAME": "<your_cluster_name>"
}
- libraries that are required by materialization jobs:
DatabricksJsonClusterConfig allows for specification of additional libraries, such as Maven or PyPI packages, that are used for materialization jobs on Databricks. This is done via the Databricks Jobs API, which supports the dynamic inclusion of libraries when creating new clusters for each job execution. Please keep the following considerations in mind:
  - Compatibility: Ensure that the libraries you add are compatible with the Databricks Runtime (DBR) version and other libraries in your environment.
  - Cluster Lifecycle: Each materialization job creates a new cluster. There is no need to restart existing clusters for updates, as each job uses a freshly configured cluster.
"libraries": [
...,
{
"jar": "s3://<your_bucket>/internal/intermediate-data/script_store/<commit_hash>/tecton-udfs-spark-3.jar"
},
{
"jar": "s3://<your_bucket>/internal/intermediate-data/script_store/<commit_hash>/tecton-online-store-sink-spark-3.jar"
},
{
"whl": "s3://<your_bucket>/internal/intermediate-data/script_store/<commit_hash>/tecton_materialization-latest-py3-none-any.whl"
}
]
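Because Tecton appends its own custom_tags, a collision with a user-supplied key is easy to miss. The sketch below shows one way to detect such collisions before applying; the helper name conflicting_tag_keys is hypothetical, and the set of keys is copied from the custom_tags listing above rather than taken from any Tecton API.

```python
import json

# Tag keys that Tecton appends, per the custom_tags listing above.
TECTON_TAG_KEYS = {
    "feature_view_id",
    "tecton-owned",
    "tecton-cluster-name",
    "tecton_feature_view",
    "tecton_deployment",
    "tecton_workspace",
}


def conflicting_tag_keys(config_json: str) -> list:
    """Return user-specified custom_tags keys that collide with Tecton's."""
    config = json.loads(config_json)
    tags = config.get("new_cluster", {}).get("custom_tags", [])
    return sorted(tag["key"] for tag in tags if tag.get("key") in TECTON_TAG_KEYS)
```

For instance, a config whose custom_tags contain test_key and tecton_workspace would yield a one-element list naming tecton_workspace.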
Example Databricks cluster configuration
The following example first shows a sample JSON value that is passed to a DatabricksJsonClusterConfig object. Next, the final JSON value (including JSON settings overridden and appended by Tecton) is shown. In the final JSON value, settings appended by Tecton are highlighted.
JSON value passed to the DatabricksJsonClusterConfig object
{
"new_cluster": {
"num_workers": 0,
"spark_version": "11.3.x-scala2.12",
"node_type_id": "m5.large",
"aws_attributes": {
"ebs_volume_type": "GENERAL_PURPOSE_SSD",
"ebs_volume_count": 1,
"ebs_volume_size": 100,
"instance_profile_arn": "arn:aws:iam::your_account_id:instance-profile/your-role",
"availability": "SPOT",
"zone_id": "auto"
},
"custom_tags": [
{
"key": "test_key",
"value": "test_value"
}
],
"spark_conf": {
"spark.databricks.service.server.enabled": "true",
"spark.hadoop.fs.s3a.acl.default": "BucketOwnerFullControl",
"spark.sql.sources.partitionOverwriteMode": "dynamic",
"spark.databricks.cluster.profile": "singleNode",
"spark.sql.legacy.parquet.datetimeRebaseModeInRead": "CORRECTED",
"spark.sql.legacy.parquet.int96RebaseModeInRead": "CORRECTED",
"spark.sql.legacy.parquet.int96RebaseModeInWrite": "CORRECTED",
"spark.master": "local[*]",
"spark.hadoop.hive.metastore.glue.catalogid": "your_account_id",
"spark.databricks.hive.metastore.glueCatalog.enabled": "true"
},
"spark_env_vars": {
"TEST_ENV_VAR": "IGNORE_ME"
}
},
"libraries": [
{
"jar": "s3://tecton.ai.public/jars/snowflake-jdbc-3.13.6.jar"
}
]
}
Final JSON value (with settings appended by Tecton highlighted) that is passed to Databricks
{
"new_cluster": {
"num_workers": 0,
"spark_version": "11.3.x-scala2.12",
"node_type_id": "m5.large",
"aws_attributes": {
"ebs_volume_type": "GENERAL_PURPOSE_SSD",
"ebs_volume_count": 1,
"ebs_volume_size": 100,
"instance_profile_arn": "arn:aws:iam::your_account_id:instance-profile/your-role",
"availability": "SPOT",
"zone_id": "auto"
},
"init_scripts": [
{
"s3": {
"destination": "s3://path_to_script_store/chronocollector_init_script_v2_esr.sh",
"region": "us-west-2"
}
}
],
"cluster_log_conf": {
"s3": {
"destination": "s3://path_to_log_store/your_run_name",
"region": "us-west-2"
}
},
"custom_tags": [
{
"key": "test_key",
"value": "test_value"
},
{
"key": "feature_view_id",
"value": "fv_id"
},
{
"key": "tecton-owned",
"value": "true"
},
{
"key": "tecton-cluster-name",
"value": "dev-blair"
},
{
"key": "tecton_feature_view",
"value": "test_bfv"
},
{
"key": "tecton_deployment",
"value": "dev-blair"
},
{
"key": "tecton_workspace",
"value": "db-1"
}
],
"spark_conf": {
"spark.databricks.cluster.profile": "singleNode",
"spark.sql.legacy.parquet.int96RebaseModeInWrite": "CORRECTED",
"spark.hadoop.hive.metastore.glue.catalogid": "your_account_id",
"spark.databricks.hive.metastore.glueCatalog.enabled": "true",
"spark.hadoop.fs.s3a.acl.default": "BucketOwnerFullControl",
"spark.sql.sources.partitionOverwriteMode": "dynamic",
"spark.sql.legacy.parquet.int96RebaseModeInRead": "CORRECTED",
"spark.master": "local[*]",
"spark.databricks.service.server.enabled": "true",
"spark.sql.legacy.parquet.datetimeRebaseModeInRead": "CORRECTED",
"spark.sql.streaming.metricsEnabled": "true",
"spark.metrics.namespace": "${spark.app.name}",
"*.sink.statsd.prefix": "spark",
"*.sink.statsd.host": "0.0.0.0",
"*.sink.statsd.period": "5",
"*.sink.statsd.unit": "seconds",
"*.sink.statsd.port": "3031",
"*.source.jvm.class": "org.apache.spark.metrics.source.JvmSource",
"*.sink.statsd.class": "org.apache.spark.metrics.sink.StatsdSink",
"spark.app.name": "test_bfv__fv_id"
},
"spark_env_vars": {
"TEST_ENV_VAR": "IGNORE_ME",
"AWS_STS_REGIONAL_ENDPOINTS": "regional",
"TECTON_CLUSTER_NAME": "dev-blair"
}
},
"notebook_task": {
"notebook_path": "/Users/bot@tecton.ai/materialization_notebook.py",
"base_parameters": {
"materialization_params": "s3://path_to_param_store/your_run_name"
}
},
"run_name": "orch-test_bfv-fv_id-BATCH_2022.12.28.00:00:00-1",
"libraries": [
{
"jar": "s3://tecton.ai.public/jars/snowflake-jdbc-3.13.6.jar"
},
{
"jar": "s3://path_to_script_store/tecton-udfs-spark-3.jar"
},
{
"jar": "s3://path_to_script_store/tecton-online-store-sink-spark-3.jar"
},
{
"whl": "s3://path_to_script_store/tecton_materialization-latest-py3-none-any.whl"
}
]
}
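When comparing the submitted and final JSON values by eye is tedious, a small set difference can list what was added to any one mapping. This is only an illustrative helper (appended_keys is a hypothetical name), shown here with the spark_env_vars values from the two JSON documents above.

```python
def appended_keys(submitted: dict, final: dict) -> list:
    """Return keys present in the final mapping but not in the submitted one."""
    return sorted(set(final) - set(submitted))


# spark_env_vars from the two JSON values above.
submitted_env = {"TEST_ENV_VAR": "IGNORE_ME"}
final_env = {
    "TEST_ENV_VAR": "IGNORE_ME",
    "AWS_STS_REGIONAL_ENDPOINTS": "regional",
    "TECTON_CLUSTER_NAME": "dev-blair",
}

print(appended_keys(submitted_env, final_env))
# prints ['AWS_STS_REGIONAL_ENDPOINTS', 'TECTON_CLUSTER_NAME']
```

The same function can be applied to spark_conf or to the top-level keys of the request.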
Configuring EMR job clusters
See the list of supported EMR versions.
Required Settings
Tecton uses the RunJobFlow request action of the EMR API to start a new EMR cluster. When you instantiate an EMRJsonClusterConfig object, you must provide a JSON string that conforms to the RunJobFlow request schema.
The following are required parameters:
- ReleaseLabel
- Instances
- ServiceRole
- JobFlowRole
These parameters are explained in the EMR RunJobFlow documentation.
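A quick local check for the four required parameters might look like the sketch below. The helper name missing_emr_keys is hypothetical and is not part of the Tecton SDK; it only verifies that the top-level keys exist, not that their values are valid.

```python
import json

# Top-level keys the text above lists as required.
REQUIRED_EMR_KEYS = ("ReleaseLabel", "Instances", "ServiceRole", "JobFlowRole")


def missing_emr_keys(config_json: str) -> list:
    """Return the required RunJobFlow keys that are absent from the config."""
    config = json.loads(config_json)
    return [key for key in REQUIRED_EMR_KEYS if key not in config]
```

Calling it on a string that only sets ReleaseLabel would report Instances, ServiceRole, and JobFlowRole as missing.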
Setting Python Version in EMR Cluster
The system Python on EMR clusters depends on the EMR version: EMR 6.X uses Python 3.7. Because Tecton will sunset support for Python 3.7 in the 1.3 SDK release, we recommend using a custom Python 3.9+ installation in your EMR 6.X cluster. To set up the custom Python installation:
- Add the following Python installation Bootstrap Action to your EMR cluster configuration.
To properly use the custom Python installation:
  - Be sure to add the Python installation Bootstrap action before any library installations from PyPI or S3 to ensure the libraries are installed to the correct Python installation.
  - The PATH variable is updated to point to the custom Python installation by updating /etc/environment. In any Bootstrap actions used to install Python libraries (from PyPI or S3), run source /etc/environment to ensure the custom Python installation is referenced.
Tecton currently offers the following Python tarballs:
- For EMR 6.X, use:
  - Python 3.9.13: s3://tecton.ai.public/install_scripts/emr-6.X/python3.9.13.tar.gz
- For EMR 7.X, use:
  - Python 3.9.13: s3://tecton.ai.public/install_scripts/emr-7.X/python3.9.13.tar.gz
Example for EMR 6.X, Python 3.9.13:
{
"BootstrapActions": [
{
"Name": "Install Python 3.9",
"ScriptBootstrapAction": {
"Path": "s3://tecton.ai.public/install_scripts/install_python_tarball.sh",
"Args": [
"s3://tecton.ai.public/install_scripts/emr-6.X/python3.9.13.tar.gz"
]
}
}
]
}
- Set your Python path in the spark-defaults classification of the Configurations field, for both spark.pyspark.python and spark.pyspark.driver.python, so that Spark references the custom Python executable. The example below uses the Python path from the Tecton-provided tarball. Be sure to update it with the appropriate path if you use a custom tarball.
Example:
{
"Configurations": [
{
"Classification": "spark-defaults",
"Properties": {
"spark.pyspark.python": "/usr/local/opt/python-tarball/bin/python3.9",
"spark.pyspark.driver.python": "/usr/local/opt/python-tarball/bin/python3.9"
}
}
]
}
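The two steps above can be applied to an existing EMR config dict in one pass. The following is a sketch under stated assumptions: the function name with_custom_python is hypothetical, the script, tarball, and interpreter paths are the EMR 6.X values shown above, and the input is assumed to be plain JSON-compatible data.

```python
import json

INSTALL_SCRIPT = "s3://tecton.ai.public/install_scripts/install_python_tarball.sh"
PYTHON_TARBALL = "s3://tecton.ai.public/install_scripts/emr-6.X/python3.9.13.tar.gz"
PYTHON_BIN = "/usr/local/opt/python-tarball/bin/python3.9"


def with_custom_python(config: dict) -> dict:
    """Return a copy of config with the Python bootstrap action placed first
    and spark-defaults pointing at the custom interpreter."""
    result = json.loads(json.dumps(config))  # deep copy of JSON-compatible data
    install_action = {
        "Name": "Install Python 3.9",
        "ScriptBootstrapAction": {"Path": INSTALL_SCRIPT, "Args": [PYTHON_TARBALL]},
    }
    # Prepend so that later library-install actions see the new interpreter.
    result["BootstrapActions"] = [install_action] + result.get("BootstrapActions", [])

    configurations = result.setdefault("Configurations", [])
    spark_defaults = next(
        (c for c in configurations if c.get("Classification") == "spark-defaults"),
        None,
    )
    if spark_defaults is None:
        spark_defaults = {"Classification": "spark-defaults", "Properties": {}}
        configurations.append(spark_defaults)
    props = spark_defaults.setdefault("Properties", {})
    props["spark.pyspark.python"] = PYTHON_BIN
    props["spark.pyspark.driver.python"] = PYTHON_BIN
    return result
```

Prepending the action, rather than appending it, reflects the ordering requirement described above. Any custom bootstrap scripts still need to run source /etc/environment themselves.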
Settings overridden by Tecton
Tecton overrides the following contents of the JSON value that is passed to an EMRJsonClusterConfig object.
- The name of the job flow:
"Name": "<your_run_name>"
- LogUri, based on feature view metadata:
"LogUri": "s3://<your_bucket>/logs/<your_run_name>"
- Applications to include all required dependencies:
"Applications": [
{
"Name": "Hadoop"
},
{
"Name": "Hive"
},
{
"Name": "Spark"
},
{
"Name": "Livy"
}
]
Settings appended by Tecton
Tecton appends the following contents to the JSON value that is passed to an EMRJsonClusterConfig object.
Bootstrap actions appended by Tecton
- chronocollector initialization:
{
"Name": "s3://<your_bucket>/internal/intermediate-data/script_store/<commit_hash>/emr_chronocollector_init_script_v2.sh",
"ScriptBootstrapAction": {
"Path": "s3://<your_bucket>/internal/intermediate-data/script_store/<commit_hash>/emr_chronocollector_init_script_v2.sh",
"Args": []
}
}
- The Delta JAR installation for Python:
{
"Name": "s3://<your_bucket>/internal/intermediate-data/script_store/<commit_hash>/install_delta.sh",
"ScriptBootstrapAction": {
"Path": "s3://<your_bucket>/internal/intermediate-data/script_store/<commit_hash>/install_delta.sh",
"Args": [
"s3://<your_bucket>/internal/intermediate-data/script_store/<commit_hash>/delta-core.jar"
]
}
}
- Installation of Tecton JARs from S3 to the destination /home/hadoop/extrajars/*, including tecton-udfs-spark-3.jar and tecton-online-store-sink-spark-3.jar, plus delta-core.jar if you're using EMR 6.5:
{
"Name": "s3://<your_bucket>/internal/intermediate-data/script_store/<commit_hash>/install_jars_from_s3.sh",
"ScriptBootstrapAction": {
"Path": "s3://<your_bucket>/internal/intermediate-data/script_store/<commit_hash>/install_jars_from_s3.sh",
"Args": [
"s3://<your_bucket>/internal/intermediate-data/script_store/<commit_hash>/tecton-udfs-spark-3.jar",
"s3://<your_bucket>/internal/intermediate-data/script_store/<commit_hash>/tecton-online-store-sink-spark-3.jar",
"s3://<your_bucket>/internal/intermediate-data/script_store/<commit_hash>/delta-core.jar"
]
}
}
- Installation of the materialization library from S3 to the destination /home/hadoop/extrajars/*:
{
"Name": "s3://<your_bucket>/internal/intermediate-data/script_store/<commit_hash>/install_python_libraries_from_s3.sh",
"ScriptBootstrapAction": {
"Path": "s3://<your_bucket>/internal/intermediate-data/script_store/<commit_hash>/install_python_libraries_from_s3.sh",
"Args": [
"s3://<your_bucket>/internal/intermediate-data/script_store/<commit_hash>/tecton_materialization-latest-py3-none-any.whl"
]
}
}
- Installation of pyarrow==5.0.0 if you're using EMR 6.5:
{
"Name": "s3://<your_bucket>/internal/intermediate-data/script_store/<commit_hash>/install_python_libraries_from_pypi.sh",
"ScriptBootstrapAction": {
"Path": "s3://<your_bucket>/internal/intermediate-data/script_store/<commit_hash>/install_python_libraries_from_pypi.sh",
"Args": ["pyarrow==5.0.0"]
}
}
Steps appended by Tecton
- A first step to enable debugging:
{
"Name": "Enable debugging",
"ActionOnFailure": "TERMINATE_JOB_FLOW",
"HadoopJarStep": {
"Properties": [],
"Jar": "command-runner.jar",
"Args": ["state-pusher-script"]
}
}
- A step for materialization, where transformation logic is run. Tecton fills in the executable and its arguments to the spark-submit command and appends several required dependency packages to the --packages and --jars arguments:
{
"Name": "Tecton Materialization",
"ActionOnFailure": "TERMINATE_CLUSTER",
"HadoopJarStep": {
"Properties": [],
"Jar": "command-runner.jar",
"Args": [
"spark-submit",
...,
"--packages",
"org.apache.hudi:hudi-spark3.1.2-bundle_2.12:0.10.1,org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1,com.qubole.spark:spark-sql-kinesis_2.12:1.2.0_spark-3.0,xerces:xercesImpl:2.8.0",
"--jars",
"/home/hadoop/extrajars/*",
"s3://<your_bucket>/internal/intermediate-data/script_store/<commit_hash>/materialization_entrypoint.py",
"--materialization-params",
"s3://<your_bucket>/internal/intermediate-data/materialization_params/<your_run_name>",
"--materialization-step",
"1",
"--spark-session-name",
"<your_feature_view_name>__<your_feature_view_id>"
]
}
}
If you would like to specify any additional library dependencies via --jars, --packages, or --py-files, see the Attaching custom libraries to EMR materialization jobs section below.
- A Tecton Feature Store Writer step:
{
"Name": "Tecton Feature Store Writer",
"ActionOnFailure": "TERMINATE_CLUSTER",
"HadoopJarStep": {
"Properties": [],
"Jar": "command-runner.jar",
"Args": [
"spark-submit",
"--packages",
"org.apache.hudi:hudi-spark3.1.2-bundle_2.12:0.10.1,org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1,com.qubole.spark:spark-sql-kinesis_2.12:1.2.0_spark-3.0,xerces:xercesImpl:2.8.0",
"--jars",
"/home/hadoop/extrajars/*",
"--conf",
"spark.driver.maxResultSize=4G",
"--conf",
"spark.sql.hive.caseSensitiveInferenceMode=NEVER_INFER",
"--conf",
"spark.serializer=org.apache.spark.serializer.KryoSerializer",
"--conf",
"spark.delta.logStore.class=io.delta.storage.DynamoDBLogStore",
"--conf",
"spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog",
"--conf",
"spark.delta.DynamoDBLogStore.tableName=delta",
"--conf",
"spark.delta.DynamoDBLogStore.region=us-west-2",
"--conf",
"spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension",
"s3://<your_bucket>/internal/intermediate-data/script_store/<commit_hash>/materialization_entrypoint.py",
"--materialization-params",
"s3://<your_bucket>/internal/intermediate-data/materialization_params/<your_run_name>",
"--materialization-step",
"2",
"--spark-session-name",
"<your_feature_view_name>__<your_feature_view_id>"
]
}
}
Configurations appended by Tecton
Tecton appends Properties for the spark-defaults, spark-metrics, yarn-env, spark-env, and livy-env classifications. Do not specify Properties keys that duplicate those shown below, since duplicate keys are not allowed.
"Configurations": [
...,
{
"Classification": "spark-defaults",
"Configurations": [...],
"Properties": {
...,
"spark.sql.streaming.metricsEnabled": "true",
"spark.metrics.namespace": "${spark.app.name}",
"spark.app.name": "<your_feature_view_name>__<your_feature_view_id>"
}
},
{
"Classification": "yarn-env",
"Configurations": [
...,
{
"Classification": "export",
"Configurations": [...],
"Properties": {
...,
"CLUSTER_REGION": "us-west-2",
"AWS_STS_REGIONAL_ENDPOINTS": "regional",
"TECTON_CLUSTER_NAME": "<your_cluster_name>"
}
}
],
"Properties": {...}
},
{
"Classification": "spark-metrics",
"Configurations": [...],
"Properties": {
...,
"*.sink.statsd.prefix": "spark",
"*.sink.statsd.host": "0.0.0.0",
"*.sink.statsd.period": "5",
"*.sink.statsd.unit": "seconds",
"*.sink.statsd.port": "3031",
"*.source.jvm.class": "org.apache.spark.metrics.source.JvmSource",
"*.sink.statsd.class": "org.apache.spark.metrics.sink.StatsdSink"
}
},
{
"Classification": "spark-env",
"Configurations": [
...,
{
"Classification": "export",
"Configurations": [...],
"Properties": {
...,
"CLUSTER_REGION": "us-west-2",
"AWS_STS_REGIONAL_ENDPOINTS": "regional",
"TECTON_CLUSTER_NAME": "<your_cluster_name>"
}
}
],
"Properties": {...}
},
{
"Classification": "livy-env",
"Configurations": [
...,
{
"Classification": "export",
"Configurations": [...],
"Properties": {
...,
"CLUSTER_REGION": "us-west-2",
"AWS_STS_REGIONAL_ENDPOINTS": "regional",
"TECTON_CLUSTER_NAME": "<your_cluster_name>"
}
}
],
"Properties": {...}
}
]
If you use a Glue or Iceberg catalog as your metastore, you may want to specify hive-site and spark-hive-site settings.
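Since the appended Properties sit at different nesting levels (directly under spark-defaults and spark-metrics, but under a nested export classification for the env classifications), duplicate keys can be hard to spot. The sketch below walks a Configurations list recursively and reports collisions. The names are hypothetical, and the reserved keys are copied from the listing above rather than obtained from Tecton.

```python
ENV_EXPORT_KEYS = {"CLUSTER_REGION", "AWS_STS_REGIONAL_ENDPOINTS", "TECTON_CLUSTER_NAME"}

# Property keys Tecton appends, keyed by classification path, per the listing above.
TECTON_PROPERTIES = {
    ("spark-defaults",): {
        "spark.sql.streaming.metricsEnabled",
        "spark.metrics.namespace",
        "spark.app.name",
    },
    ("spark-metrics",): {
        "*.sink.statsd.prefix",
        "*.sink.statsd.host",
        "*.sink.statsd.period",
        "*.sink.statsd.unit",
        "*.sink.statsd.port",
        "*.source.jvm.class",
        "*.sink.statsd.class",
    },
    ("yarn-env", "export"): ENV_EXPORT_KEYS,
    ("spark-env", "export"): ENV_EXPORT_KEYS,
    ("livy-env", "export"): ENV_EXPORT_KEYS,
}


def find_property_conflicts(configurations, path=()):
    """Return (classification path, key) pairs that collide with Tecton's keys."""
    conflicts = []
    for entry in configurations:
        current = path + (entry.get("Classification"),)
        reserved = TECTON_PROPERTIES.get(current, set())
        for key in entry.get("Properties", {}):
            if key in reserved:
                conflicts.append(("/".join(current), key))
        # Nested classifications (for example "export") are checked recursively.
        conflicts.extend(find_property_conflicts(entry.get("Configurations", []), current))
    return conflicts
```

Passing the Configurations list from your EMR JSON returns an empty list when no user-supplied key overlaps with the ones shown above.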
Tags appended by Tecton
Do not specify tags whose keys match those shown below, since duplicate keys are not allowed for tags.
"Tags": [
...,
{
"Key": "feature_view_id",
"Value": "<your_feature_view_id>"
},
{
"Key": "tecton-owned",
"Value": "true"
},
{
"Key": "tecton-cluster-name",
"Value": "<your_cluster_name>"
},
{
"Key": "tecton_feature_view",
"Value": "<your_feature_view_name>"
},
{
"Key": "tecton_deployment",
"Value": "<your_deployment_name>"
},
{
"Key": "tecton_workspace",
"Value": "<your_workspace_name>"
}
]
Attaching custom libraries to EMR materialization jobs
- Download and install your dependency libraries to your job clusters. We recommend doing so either via a bootstrap action or a step.
- Pass your dependency libraries as arguments to the spark-submit command in the Tecton Materialization step. For example, if you want to attach all custom JARs installed in the /home/hadoop/customjars/* directory, a Python library in an accessible S3 location, and the protobuf Maven package, the Tecton Materialization step config in your JSON could read as follows:
{
"Name": "Tecton Materialization",
"HadoopJarStep": {
"Args": [
"spark-submit",
"--packages",
"com.google.protobuf:protobuf-java:3.22.0",
"--jars",
"/home/hadoop/customjars/*",
"--py-files",
"s3://<path_to_your_python_library>"
]
}
}
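If several Feature Views need different dependency sets, the step config above can be generated rather than written by hand. The helper below is a sketch (materialization_step is a hypothetical name); it emits only the user-supplied spark-submit arguments and relies on the fact that spark-submit takes each of these options as a single comma-separated value.

```python
def materialization_step(packages=(), jars=(), py_files=()):
    """Build a "Tecton Materialization" step holding only the user-supplied
    spark-submit arguments; the remaining arguments are left to Tecton."""
    args = ["spark-submit"]
    options = (("--packages", packages), ("--jars", jars), ("--py-files", py_files))
    for flag, values in options:
        if values:
            # Each option takes one comma-separated value.
            args += [flag, ",".join(values)]
    return {"Name": "Tecton Materialization", "HadoopJarStep": {"Args": args}}


step = materialization_step(
    packages=["com.google.protobuf:protobuf-java:3.22.0"],
    jars=["/home/hadoop/customjars/*"],
    py_files=["s3://<path_to_your_python_library>"],
)
```

The resulting dict matches the example step above and can be placed in the Steps list before the config is serialized to JSON.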
Example EMR cluster configuration
The following example first shows a sample JSON value that is passed to an EMRJsonClusterConfig object. Next, the final JSON value (including JSON settings overridden and appended by Tecton) is shown. In the final JSON value, settings appended by Tecton are highlighted.
JSON value passed to the EMRJsonClusterConfig object
{
"ReleaseLabel": "emr-6.9.0",
"Instances": {
"InstanceGroups": [],
"InstanceFleets": [
{
"InstanceFleetType": "CORE",
"TargetOnDemandCapacity": 0,
"TargetSpotCapacity": 1,
"InstanceTypeConfigs": [
{
"InstanceType": "r4.xlarge",
"BidPriceAsPercentageOfOnDemandPrice": 100,
"Configurations": []
}
]
},
{
"InstanceFleetType": "MASTER",
"TargetOnDemandCapacity": 0,
"TargetSpotCapacity": 1,
"InstanceTypeConfigs": [
{
"InstanceType": "r4.xlarge",
"BidPriceAsPercentageOfOnDemandPrice": 100,
"Configurations": []
}
]
}
],
"Ec2SubnetIds": ["subnet-your_net_a", "subnet-your_net_b"],
"EmrManagedMasterSecurityGroup": "sg-your_group_a",
"EmrManagedSlaveSecurityGroup": "sg-your_group_b",
"ServiceAccessSecurityGroup": "sg-your_group_c",
"AdditionalMasterSecurityGroups": [],
"AdditionalSlaveSecurityGroups": []
},
"Configurations": [
{
"Classification": "spark-defaults",
"Configurations": [],
"Properties": {
"spark.driver.maxResultSize": "4G",
"spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension,org.apache.spark.sql.hudi.HoodieSparkSessionExtension",
"spark.sql.catalogImplementation": "hive",
"spark.hive.metastore.client.factory.class": "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory",
"spark.hadoop.hive.metastore.glue.catalogid": "your_account_id",
"spark.hive.metastore.glue.catalogid": "your_account_id",
"spark.sql.legacy.parquet.datetimeRebaseModeInRead": "CORRECTED",
"spark.sql.legacy.parquet.int96RebaseModeInRead": "CORRECTED",
"spark.sql.legacy.parquet.int96RebaseModeInWrite": "CORRECTED"
}
},
{
"Classification": "hive-site",
"Configurations": [],
"Properties": {
"hive.metastore.client.factory.class": "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory",
"hive.metastore.glue.catalogid": "your_account_id"
}
},
{
"Classification": "spark-hive-site",
"Configurations": [],
"Properties": {
"hive.metastore.client.factory.class": "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory",
"hive.metastore.glue.catalogid": "your_account_id"
}
},
{
"Classification": "yarn-env",
"Configurations": [
{
"Classification": "export",
"Configurations": [],
"Properties": {
"TEST_ENV_VAR": "TEST_VALUE"
}
}
],
"Properties": {}
}
],
"BootstrapActions": [
{
"Name": "install_jars_from_s3",
"ScriptBootstrapAction": {
"Path": "s3://tecton-dev-shared/blair/install_jars_from_s3.sh",
"Args": [
"s3://tecton.ai.public/jars/spark-snowflake_2.12-2.9.1-spark_3.0.jar",
"s3://tecton.ai.public/jars/snowflake-jdbc-3.13.6.jar"
]
}
},
{
"Name": "install_python_libraries_from_pypi",
"ScriptBootstrapAction": {
"Path": "s3://tecton-dev-shared/blair/install_python_libraries_from_pypi.sh",
"Args": ["apache-sedona"]
}
}
],
"Steps": [
{
"Name": "Test Custom Step",
"ActionOnFailure": "CONTINUE",
"HadoopJarStep": {
"Properties": [],
"Jar": "command-runner.jar",
"Args": ["state-pusher-script"]
}
},
{
"Name": "Tecton Materialization",
"ActionOnFailure": "TERMINATE_CLUSTER",
"HadoopJarStep": {
"Properties": [],
"Jar": "command-runner.jar",
"Args": [
"spark-submit",
"--executor-memory",
"4g",
"--packages",
"com.google.protobuf:protobuf-java:3.22.0",
"--jars",
"/home/hadoop/customjars/*",
"--py-files",
"s3://tecton-dev-shared/blair/test.whl"
]
}
}
],
"JobFlowRole": "your-job-flow-role",
"ServiceRole": "your-service-role",
"CustomAmiId": "your-custom-AMI-ID"
}
Final JSON value (with settings appended by Tecton highlighted) that is passed to EMR
{
"Name": "orch-test_bfv-fv_id-BATCH_2022.11.02.00:00:00-1",
"LogUri": "s3://path_to_log_store/your_run_name",
"ReleaseLabel": "emr-6.9.0",
"Instances": {
"InstanceGroups": [],
"InstanceFleets": [
{
"InstanceFleetType": "CORE",
"TargetOnDemandCapacity": 0,
"TargetSpotCapacity": 1,
"InstanceTypeConfigs": [
{
"InstanceType": "r4.xlarge",
"BidPriceAsPercentageOfOnDemandPrice": 100,
"Configurations": []
}
]
},
{
"InstanceFleetType": "MASTER",
"TargetOnDemandCapacity": 0,
"TargetSpotCapacity": 1,
"InstanceTypeConfigs": [
{
"InstanceType": "r4.xlarge",
"BidPriceAsPercentageOfOnDemandPrice": 100,
"Configurations": []
}
]
}
],
"Ec2SubnetIds": ["subnet-your_net_a", "subnet-your_net_b"],
"EmrManagedMasterSecurityGroup": "sg-your_group_a",
"EmrManagedSlaveSecurityGroup": "sg-your_group_b",
"ServiceAccessSecurityGroup": "sg-your_group_c",
"AdditionalMasterSecurityGroups": [],
"AdditionalSlaveSecurityGroups": []
},
"Steps": [
{
"Name": "Enable debugging",
"ActionOnFailure": "TERMINATE_JOB_FLOW",
"HadoopJarStep": {
"Properties": [],
"Jar": "command-runner.jar",
"Args": ["state-pusher-script"]
}
},
{
"Name": "Test Custom Step",
"ActionOnFailure": "CONTINUE",
"HadoopJarStep": {
"Properties": [],
"Jar": "command-runner.jar",
"Args": ["state-pusher-script"]
}
},
{
"Name": "Tecton Materialization",
"ActionOnFailure": "TERMINATE_CLUSTER",
"HadoopJarStep": {
"Properties": [],
"Jar": "command-runner.jar",
"Args": [
"spark-submit",
"--executor-memory",
"4g",
"--packages",
"com.google.protobuf:protobuf-java:3.22.0,org.apache.hudi:hudi-spark3.1.2-bundle_2.12:0.10.1,org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1,com.qubole.spark:spark-sql-kinesis_2.12:1.2.0_spark-3.0,xerces:xercesImpl:2.8.0",
"--jars",
"/home/hadoop/customjars/*,/home/hadoop/extrajars/*",
"--py-files",
"s3://tecton-dev-shared/blair/test.whl",
"s3://path_to_script_store/materialization_entrypoint.py",
"--materialization-params",
"s3://path_to_param_store/your_run_name",
"--materialization-step",
"1",
"--spark-session-name",
"test_bfv__fv_id"
]
}
},
{
"Name": "Tecton Feature Store Writer",
"ActionOnFailure": "TERMINATE_CLUSTER",
"HadoopJarStep": {
"Properties": [],
"Jar": "command-runner.jar",
"Args": [
"spark-submit",
"--packages",
"org.apache.hudi:hudi-spark3.1.2-bundle_2.12:0.10.1,org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1,com.qubole.spark:spark-sql-kinesis_2.12:1.2.0_spark-3.0,xerces:xercesImpl:2.8.0",
"--jars",
"/home/hadoop/extrajars/*",
"--conf",
"spark.driver.maxResultSize=4G",
"--conf",
"spark.sql.hive.caseSensitiveInferenceMode=NEVER_INFER",
"--conf",
"spark.serializer=org.apache.spark.serializer.KryoSerializer",
"--conf",
"spark.sql.sources.partitionOverwriteMode=dynamic",
"--conf",
"spark.delta.logStore.class=io.delta.storage.DynamoDBLogStore",
"--conf",
"spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog",
"--conf",
"spark.delta.DynamoDBLogStore.tableName=delta",
"--conf",
"spark.delta.DynamoDBLogStore.region=us-west-2",
"--conf",
"spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension",
"s3://path_to_script_store/materialization_entrypoint.py",
"--materialization-params",
"s3://path_to_param_store/your_run_name",
"--materialization-step",
"2",
"--spark-session-name",
"test_bfv__fv_id"
]
}
}
],
"BootstrapActions": [
{
"Name": "install_jars_from_s3",
"ScriptBootstrapAction": {
"Path": "s3://tecton-dev-shared/blair/install_jars_from_s3.sh",
"Args": [
"s3://tecton.ai.public/jars/spark-snowflake_2.12-2.9.1-spark_3.0.jar",
"s3://tecton.ai.public/jars/snowflake-jdbc-3.13.6.jar"
]
}
},
{
"Name": "install_python_libraries_from_pypi",
"ScriptBootstrapAction": {
"Path": "s3://tecton-dev-shared/blair/install_python_libraries_from_pypi.sh",
"Args": ["apache-sedona"]
}
},
{
"Name": "s3://path_to_script_store/install_python_libraries_from_s3.sh",
"ScriptBootstrapAction": {
"Path": "s3://path_to_script_store/install_python_libraries_from_s3.sh",
"Args": [
"s3://path_to_script_store/tecton_materialization-latest-py3-none-any.whl"
]
}
},
{
"Name": "s3://path_to_script_store/install_jars_from_s3.sh",
"ScriptBootstrapAction": {
"Path": "s3://path_to_script_store/install_jars_from_s3.sh",
"Args": [
"s3://path_to_script_store/tecton-udfs-spark-3.jar",
"s3://path_to_script_store/tecton-online-store-sink-spark-3.jar"
]
}
},
{
"Name": "s3://path_to_script_store/emr_chronocollector_init_script_v2.sh",
"ScriptBootstrapAction": {
"Path": "s3://path_to_script_store/emr_chronocollector_init_script_v2.sh",
"Args": []
}
}
],
"Applications": [
{
"Name": "Hadoop",
"Args": [],
"AdditionalInfo": {}
},
{
"Name": "Hive",
"Args": [],
"AdditionalInfo": {}
},
{
"Name": "Spark",
"Args": [],
"AdditionalInfo": {}
},
{
"Name": "Livy",
"Args": [],
"AdditionalInfo": {}
}
],
"Configurations": [
{
"Classification": "spark-defaults",
"Configurations": [],
"Properties": {
"spark.sql.legacy.parquet.datetimeRebaseModeInRead": "CORRECTED",
"spark.sql.legacy.parquet.int96RebaseModeInRead": "CORRECTED",
"spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension,org.apache.spark.sql.hudi.HoodieSparkSessionExtension",
"spark.hive.metastore.client.factory.class": "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory",
"spark.hive.metastore.glue.catalogid": "your_account_id",
"spark.sql.catalogImplementation": "hive",
"spark.hadoop.hive.metastore.glue.catalogid": "your_account_id",
"spark.sql.legacy.parquet.int96RebaseModeInWrite": "CORRECTED",
"spark.sql.streaming.metricsEnabled": "true",
"spark.metrics.namespace": "${spark.app.name}",
"spark.app.name": "test_bfv__fv_id"
}
},
{
"Classification": "hive-site",
"Configurations": [],
"Properties": {
"hive.metastore.client.factory.class": "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory",
"hive.metastore.glue.catalogid": "your_account_id"
}
},
{
"Classification": "spark-hive-site",
"Configurations": [],
"Properties": {
"hive.metastore.client.factory.class": "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory",
"hive.metastore.glue.catalogid": "your_account_id"
}
},
{
"Classification": "yarn-env",
"Configurations": [
{
"Classification": "export",
"Configurations": [],
"Properties": {
"TEST_ENV_VAR": "TEST_VALUE",
"CLUSTER_REGION": "us-west-2",
"AWS_STS_REGIONAL_ENDPOINTS": "regional",
"TECTON_CLUSTER_NAME": "dev-blair"
}
}
],
"Properties": {}
},
{
"Classification": "spark-metrics",
"Configurations": [],
"Properties": {
"*.sink.statsd.prefix": "spark",
"*.sink.statsd.host": "0.0.0.0",
"*.sink.statsd.period": "5",
"*.sink.statsd.unit": "seconds",
"*.sink.statsd.port": "3031",
"*.source.jvm.class": "org.apache.spark.metrics.source.JvmSource",
"*.sink.statsd.class": "org.apache.spark.metrics.sink.StatsdSink"
}
},
{
"Classification": "spark-env",
"Configurations": [
{
"Classification": "export",
"Configurations": [],
"Properties": {
"CLUSTER_REGION": "us-west-2",
"AWS_STS_REGIONAL_ENDPOINTS": "regional",
"TECTON_CLUSTER_NAME": "dev-blair"
}
}
],
"Properties": {}
},
{
"Classification": "livy-env",
"Configurations": [
{
"Classification": "export",
"Configurations": [],
"Properties": {
"CLUSTER_REGION": "us-west-2",
"AWS_STS_REGIONAL_ENDPOINTS": "regional",
"TECTON_CLUSTER_NAME": "dev-blair"
}
}
],
"Properties": {}
}
],
"JobFlowRole": "your-job-flow-role",
"ServiceRole": "your-service-role",
"CustomAmiId": "your-custom-AMI-ID",
"Tags": [
{
"Key": "feature_view_id",
"Value": "fv_id"
},
{
"Key": "tecton-owned",
"Value": "true"
},
{
"Key": "tecton-cluster-name",
"Value": "dev-blair"
},
{
"Key": "tecton_feature_view",
"Value": "test_bfv"
},
{
"Key": "tecton_deployment",
"Value": "dev-blair"
},
{
"Key": "tecton_workspace",
"Value": "emr-1"
},
{
"Key": "tecton-accessible:dev-shared",
"Value": "true"
}
]
}
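In the final JSON above, the user-supplied --packages and --jars values appear first, followed by the values Tecton appended, joined with commas. The sketch below reproduces that observed result for illustration only. It is not Tecton's implementation, append_csv_option is a hypothetical name, and it assumes that when the flag is present it is followed by its value.

```python
def append_csv_option(args, flag, extra_values):
    """Return a copy of args where extra_values are appended to the
    comma-separated value of flag (the flag is added if absent)."""
    result = list(args)
    if flag in result:
        index = result.index(flag) + 1
        existing = result[index].split(",")
        result[index] = ",".join(existing + list(extra_values))
    else:
        result += [flag, ",".join(extra_values)]
    return result


user_args = ["spark-submit", "--jars", "/home/hadoop/customjars/*"]
print(append_csv_option(user_args, "--jars", ["/home/hadoop/extrajars/*"]))
# prints ['spark-submit', '--jars', '/home/hadoop/customjars/*,/home/hadoop/extrajars/*']
```

This mirrors the --jars value shown in the Tecton Materialization step of the final JSON.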
Migrating simple cluster config objects to custom cluster config objects
To migrate simple cluster config objects to custom cluster config objects, follow these steps.
- Upgrade your Tecton SDK version to 0.6.
- For each Feature View that uses simple cluster config objects:
  - Replace the simple cluster config objects with new custom cluster config objects.
  - As needed, update batch_compute and/or stream_compute in your Feature View declaration to use the new custom cluster config objects.
- In the Tecton CLI, run tecton apply to apply your changes. The new configuration will be used for any future jobs.