Custom User-Defined Functions
Often, you may want to implement feature logic that is not covered by the aggregations and transformations Canso supports out of the box. For such cases, Canso supports custom UDFs, which offer greater flexibility by letting you apply complex, customized operations to your data for a more tailored and dynamic processing experience.
How It Works
1. Implement your custom feature logic by developing a Python script that follows the specified UDF contract.
2. Build a custom Docker image on top of the base Docker image provided by the Canso platform, including your UDF script and any additional dependencies required at runtime.
3. Supply the alias for the image pull secrets in processing_engine_configs to ensure that your custom Docker image is accessible during feature execution.
4. Use the Python client to define the necessary arguments for your custom UDF, register it, and deploy it.
Detailed instructions for each step are provided below.
Currently, custom Docker images can only be pulled from DockerHub repositories; these images run in the Canso data plane. Image pull secrets are not supported yet. Support for private container registries (such as Amazon ECR, Google Container Registry, and Azure Container Registry) and image pull secrets will be added in an upcoming release.
Write Custom UDF Script
Base Class for UDFs
To implement a UDF, you must include this base class in your script. The apply method is where you define the custom logic, and custom_args allows for the dynamic passing of parameters required during feature execution.
from abc import ABC, abstractmethod


class CustomOperationHandler(ABC):
    def __init__(self, custom_args):
        self.custom_args = custom_args

    @abstractmethod
    def apply(self):
        pass
Modes of Operation
The Canso platform supports two modes for processing custom UDFs:

- PySpark SQL Mode: implement custom UDFs using the Spark SQL approach.
- PySpark Functional Mode: implement custom UDFs using the Spark functional programming (DataFrame API) approach.
These modes allow you to implement simple or complex UDFs based on your requirements.
PySpark SQL Mode
This mode is suitable for executing SQL-based transformations directly in Spark SQL.
In this mode, the DataFrame created from a registered data source is treated as a temporary table in PySpark. You can refer to the data source name directly in your custom UDF.
Example: Simple Groupby Logic
A UDF containing simple logic doesn't need custom_args. In the example below, employee_salary_l3m is the feature name and treasury_data is a registered data source.
class SalaryAggregation(CustomOperationHandler):
    def apply(self):
        sql_query = """
            SELECT
                id,
                timestamp,
                SUM(salary) AS employee_salary_l3m
            FROM
                treasury_data
            GROUP BY
                id,
                timestamp
        """
        return sql_query
Example: Complex Set of Operations
In more complex cases, dynamic values can be retrieved from custom_args to customize the query logic.
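A minimal sketch of this pattern is shown below. The class name and the custom_args keys (groupby_keys, timestamp_field, feature_name) are illustrative assumptions that mirror the custom_args passed in the client example further down this page; they are not a fixed part of the UDF contract.

# Illustrative sketch: a SQL-mode UDF whose grouping keys, timestamp field,
# and output feature name are read from custom_args at runtime rather than
# hardcoded. The argument keys below are assumptions for this example.
class SalaryAggregationDynamic(CustomOperationHandler):
    def apply(self):
        groupby_keys = ", ".join(self.custom_args.get("groupby_keys", ["id", "timestamp"]))
        timestamp_field = self.custom_args.get("timestamp_field", "timestamp")
        feature_name = self.custom_args.get("feature_name", "employee_salary_l3m")
        sql_query = f"""
            SELECT
                {groupby_keys},
                SUM(salary) AS {feature_name}
            FROM
                treasury_data
            WHERE
                {timestamp_field} IS NOT NULL
            GROUP BY
                {groupby_keys}
        """
        return sql_query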
PySpark Functional Mode
This mode uses PySpark's DataFrame API for operations like joins, groupBy, and windowing.
The DataFrame created from a registered data source is passed through custom_feature_kwargs, allowing the custom logic to use it dynamically.
Example: Simple Groupby Logic
This simple functional operation groups a DataFrame and aggregates the salary column. The grouping keys and the aggregated column are hardcoded in the apply method.
from pyspark.sql import functions as F


class SalaryAggregation(CustomOperationHandler):
    def apply(self):
        data_source_df = self.custom_args.get("clicks_data")
        feature_name = self.custom_args.get("feature_name")
        result_df = (
            data_source_df
            .groupBy("id", "timestamp")
            .agg(F.sum("salary").alias(feature_name))
        )
        return result_df
Example: Complex Set of DataFrame Operations
For more advanced use cases, custom_args can be used to dynamically adjust the operation parameters, and the logic can be kept modular by defining helper methods.
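The sketch below illustrates this approach under the same assumptions as the simple example above; the data source key, column names, and the helper method _filter_valid_rows are illustrative, not part of the Canso contract.

from pyspark.sql import functions as F


# Illustrative sketch: a functional-mode UDF that reads its parameters from
# custom_args and keeps the logic modular via a helper method. The argument
# keys and column names are assumptions for this example.
class SalaryAggregationAdvanced(CustomOperationHandler):
    def _filter_valid_rows(self, df, timestamp_field):
        # Helper method: drop rows without a timestamp before aggregating.
        return df.filter(F.col(timestamp_field).isNotNull())

    def apply(self):
        data_source_df = self.custom_args.get("treasury_data")
        groupby_keys = self.custom_args.get("groupby_keys", ["id", "timestamp"])
        timestamp_field = self.custom_args.get("timestamp_field", "timestamp")
        feature_name = self.custom_args.get("feature_name")

        filtered_df = self._filter_valid_rows(data_source_df, timestamp_field)
        result_df = (
            filtered_df
            .groupBy(*groupby_keys)
            .agg(F.sum("salary").alias(feature_name))
        )
        return result_df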
Build Custom Docker Image
The example Dockerfile below demonstrates how to add custom Python dependencies and UDF scripts to the base image.
The base image shaktimaanbot/canso-jobs:v0.0.1-beta, containing a basic set of Python libraries, is provided by the Canso platform.
# Set the base image to use for the Docker image being built
FROM --platform=linux/amd64 shaktimaanbot/canso-jobs:v0.0.1-beta

# Set the working directory inside the container
WORKDIR /opt/spark/work-dir/bob

# Install the additional set of required Python libraries
COPY extra_pip_dependencies.txt .
RUN pip3 install --no-cache-dir -r extra_pip_dependencies.txt && \
    rm -rf /root/.cache/pip

# Copy the specified scripts and modules from the host to the current working directory
COPY /my/local/path/to/custom_udf.py /opt/spark/work-dir/bob/src/v2/external_udfs/features/my/custom/path/

WORKDIR /opt/spark/work-dir/bob/src/v2/jobs

# Change the permissions of the streaming job script to make it executable within the container
RUN chmod 777 feature_materializer.py
Define Python Client
This is how users register their custom UDF through the Python client. You need to specify the following fields specific to a custom UDF:
- custom_class_name: the executable class containing the UDF logic.
- custom_file_path: the additional path supplied in the Dockerfile that contains the UDF script.
- docker_image: the custom Docker image containing the UDF script and its additional dependencies.
- custom_feature_args: feature arguments required by the UDF at runtime.
- mode: spark_sql to execute SQL logic, or spark_function to execute functional logic.
streaming_feature_obj = StreamingFeature(
    name="employee_salary_l3m",
    description="Total Employee Salary Sum in the Last 3 months",
    data_type=DataType.FLOAT,
    data_sources=["treasury_data"],
    owners=["[email protected]"],
    feature_logic=CustomFeatureLogic(
        custom_class_name="SalaryAggregation",
        custom_file_path="/my/custom/path/custom_udf_spark_sql.py",
        custom_docker_image="custom-repo/custom-image:v0.0.1-beta",
        mode="spark_sql",
        custom_args={
            "timestamp_field": "timestamp",
            "groupby_keys": ["id", "timestamp"],
        },
    ),
    processing_engine=ProcessingEngine.PYSPARK_K8S,
    processing_engine_configs=ProcessingEngineConfigs(spark_streaming_flag=True),
    online_sink=["salary_agg_sink"],
    online=True,
    offline=False,
)
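Once the feature object is defined, it is registered and deployed through the Python client as outlined in the steps above. The client variable and method names in this sketch (canso_client, register, deploy) are hypothetical placeholders, since the exact client calls are not shown in this document.

# Hypothetical sketch only: canso_client and its register/deploy methods are
# placeholder names, not the documented Canso client API.
canso_client.register(streaming_feature_obj)
canso_client.deploy(streaming_feature_obj)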