tecton.EMRClusterConfig

class tecton.EMRClusterConfig(instance_type=None, instance_availability=None, number_of_workers=None, root_volume_size_in_gb=None, extra_pip_dependencies=None, spark_config=None)

Configuration used to specify materialization cluster options.

This class describes the attributes of new clusters created in EMR during materialization jobs. You can configure options of these clusters, such as cluster size and extra pip dependencies.

Parameters
  • instance_type (Optional[str]) – Instance type for the cluster. Must be a valid type as listed in https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-supported-instance-types.html. If not specified, a value determined by the Tecton backend is used.

  • instance_availability (Optional[str]) – Instance availability for the cluster: "spot" or "on_demand". If not specified, defaults to "spot".

  • number_of_workers (Optional[int]) – Number of instances for the materialization job. If not specified, a value determined by the Tecton backend is used.

  • root_volume_size_in_gb (Optional[int]) – Size of the root volume in GB per instance for the materialization job. If not specified, a value determined by the Tecton backend is used.

  • extra_pip_dependencies (Optional[List[str]]) – Extra pip dependencies to be installed on the cluster.

  • spark_config (Optional[Dict[str, str]]) – Map of Spark configuration options and their respective values that will be passed to the FeatureView materialization Spark cluster. Currently, we support only the following options: spark.driver.memory, spark.driver.memoryOverhead, spark.executor.memory, spark.executor.memoryOverhead
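
Because only the four options above are accepted in spark_config, it can be useful to validate a config dict before passing it in. The sketch below uses a hypothetical helper, validate_spark_config, which is not part of the Tecton SDK; it simply checks keys against the supported set listed above.

```python
# Hypothetical helper (not part of the Tecton SDK) that checks a
# spark_config dict against the four options listed as supported.
SUPPORTED_SPARK_OPTIONS = {
    "spark.driver.memory",
    "spark.driver.memoryOverhead",
    "spark.executor.memory",
    "spark.executor.memoryOverhead",
}

def validate_spark_config(spark_config):
    # Reject any key outside the supported set before it reaches Tecton.
    unsupported = set(spark_config) - SUPPORTED_SPARK_OPTIONS
    if unsupported:
        raise ValueError(f"Unsupported Spark options: {sorted(unsupported)}")
    return spark_config

validate_spark_config({"spark.executor.memory": "12g"})  # passes
```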

Note on extra_pip_dependencies: This is a list of pip package names that will be installed during materialization. These libraries will only be available to use inside Spark UDFs. For example, if you set extra_pip_dependencies=["tensorflow"], you can use it in your transformation as shown below.

An example of using EMRClusterConfig:

from tecton import batch_feature_view, Input, EMRClusterConfig

@batch_feature_view(
    inputs={'credit_scores': Input(credit_scores_batch)},
    # Can be an argument instance to a batch feature view decorator
    batch_cluster_config=EMRClusterConfig(
        instance_type='m5.2xlarge',
        number_of_workers=4,
        extra_pip_dependencies=["tensorflow==2.2.0"],
    ),
    # Other named arguments to batch feature view
    ...
)

# Use the tensorflow package in the UDF since tensorflow will be installed
# on the EMR Spark cluster. The import must be inside the UDF body; importing
# it at the top of the file, or in the transformation function outside the
# UDF, won't work.

@transformation(mode='pyspark')
def test_transformation(transformation_input):
    from pyspark.sql import functions as F
    from pyspark.sql.types import IntegerType

    def my_tensorflow(x):
        # Import inside the UDF body so it resolves on the executors.
        import tensorflow as tf
        return int(tf.math.log1p(float(x)).numpy())

    my_tensorflow_udf = F.udf(my_tensorflow, IntegerType())

    # Apply the UDF; 'amount' is an illustrative column name.
    return transformation_input.select(
        my_tensorflow_udf(F.col('amount')).alias('log1p_amount')
    )

Methods

__init__(instance_type=None, instance_availability=None, number_of_workers=None, root_volume_size_in_gb=None, extra_pip_dependencies=None, spark_config=None)

Initialize self. See help(type(self)) for accurate signature.