Often, you may want to implement feature logic that is not supported by the aggregations and transformations Canso provides out of the box. For such cases, Canso supports custom UDFs, which offer greater flexibility by letting you apply complex, customized operations to your data for a more tailored and dynamic processing experience.
How It Works
Implement your custom feature logic by developing a Python script that follows the specified UDF contract.
Build a custom Docker image on top of the base image provided by the Canso platform, including your UDF script and any additional dependencies required at runtime.
Supply the alias for the image pull secrets in processing_engine_configs to ensure that your custom Docker image is accessible during feature execution.
Use a Python client to define the necessary arguments for your custom UDF, register it, and deploy it.
Detailed instructions for each step are provided below.
Currently, custom Docker images can only be pulled from DockerHub repositories; these are the images that run in the Canso data plane. Image pull secrets are not supported yet. Support for private container registries (such as Amazon ECR, Google Container Registry, and Azure Container Registry) and for image pull secrets will be added in an upcoming release.
Write Custom UDF Script
Base Class for UDFs
To implement a UDF, you must include this base class in your script. The apply method is where you define the custom logic, and custom_args allows for the dynamic passing of parameters required during feature execution.
from abc import ABC, abstractmethod


class CustomOperationHandler(ABC):
    def __init__(self, custom_args):
        self.custom_args = custom_args

    @abstractmethod
    def apply(self):
        pass
Modes of Operation
The Canso platform supports two modes for processing custom UDFs:
PySpark SQL Mode: implement custom UDFs as Spark SQL queries.
PySpark Functional Mode: implement custom UDFs with the PySpark DataFrame (functional) API.
These modes allow you to implement simple or complex UDFs based on your requirements.
PySpark SQL Mode
This mode is suitable for executing SQL-based transformations directly in Spark SQL.
In this mode, the DataFrame created from a registered data source is treated as a temporary table in PySpark. You can refer to the data source name directly in your custom UDF.
Example: Simple Groupby Logic
A UDF with simple logic doesn't need custom_args.
In the example below, employee_salary_l3m is the feature name and treasury_data is a registered data source.
class SalaryAggregation(CustomOperationHandler):
    def apply(self):
        sql_query = f"""
            SELECT
                id,
                timestamp,
                SUM(salary) AS employee_salary_l3m
            FROM
                treasury_data
            GROUP BY
                id,
                timestamp
        """
        return sql_query
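Before packaging the script, you can sanity-check the UDF against a local SparkSession. This is a minimal sketch, assuming pyspark is installed locally and the CustomOperationHandler and SalaryAggregation classes above are in scope; the sample rows are illustrative. On the platform the temporary view is created for you from the registered data source, so here we create it manually under the same name.

from pyspark.sql import SparkSession

# Illustrative local check; assumes the base class and the SalaryAggregation
# UDF above are defined in (or importable from) the same script.
spark = SparkSession.builder.master("local[1]").appName("udf-check").getOrCreate()

rows = [
    (1, "2024-01-31", 1000.0),
    (1, "2024-02-29", 1100.0),
    (2, "2024-01-31", 900.0),
]
# Register the sample data under the data source name referenced in the SQL.
spark.createDataFrame(rows, ["id", "timestamp", "salary"]).createOrReplaceTempView("treasury_data")

udf = SalaryAggregation(custom_args={})  # simple UDFs don't need custom_args
spark.sql(udf.apply()).show()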
Example: Complex Set of Operations
In more complex cases, dynamic values can be retrieved from custom_args to customize the query logic.
The example below chains a set of SQL operations.
class ComplexSalaryAggregation(CustomOperationHandler):
    def apply(self):
        # Retrieve parameters from custom_args
        feature_name = self.custom_args.get("feature_name")
        table_name = self.custom_args.get("data_source")
        time_periods = self.custom_args.get("time_periods")
        weights = self.custom_args.get("weights")
        salary_increase_bonus = self.custom_args.get("salary_increase_bonus")
        salary_decrease_penalty = self.custom_args.get("salary_decrease_penalty")
        top_rank_bonus = self.custom_args.get("top_rank_bonus")
        top_rank_threshold = self.custom_args.get("top_rank_threshold")

        time_period_clauses = [
            f"AVG(salary) OVER (PARTITION BY id ORDER BY timestamp ROWS BETWEEN {period-1} PRECEDING AND CURRENT ROW) AS avg_salary_l{period}m"
            for period in time_periods
        ]
        weighted_avg_clauses = [
            f"COALESCE(avg_salary_l{period}m, 0) * {weight}"
            for period, weight in zip(time_periods, weights)
        ]

        sql_query = f"""
            WITH salary_stats AS (
                SELECT
                    id,
                    timestamp,
                    salary,
                    {', '.join(time_period_clauses)},
                    LAG(salary) OVER (PARTITION BY id ORDER BY timestamp) AS prev_salary,
                    RANK() OVER (PARTITION BY id ORDER BY salary DESC) AS salary_rank
                FROM
                    {table_name}
            ),
            salary_changes AS (
                SELECT
                    *,
                    CASE
                        WHEN salary > prev_salary THEN 1
                        WHEN salary < prev_salary THEN -1
                        ELSE 0
                    END AS salary_change_direction,
                    (salary - prev_salary) / NULLIF(prev_salary, 0) * 100 AS salary_change_percentage
                FROM
                    salary_stats
            )
            SELECT
                id,
                timestamp,
                salary AS current_salary,
                {', '.join(f'avg_salary_l{period}m' for period in time_periods)},
                salary_rank,
                salary_change_direction,
                salary_change_percentage,
                (
                    {' + '.join(weighted_avg_clauses)} +
                    CASE
                        WHEN salary_change_direction = 1 THEN {salary_increase_bonus}
                        WHEN salary_change_direction = -1 THEN {salary_decrease_penalty}
                        ELSE 0
                    END +
                    CASE
                        WHEN salary_rank = 1 THEN {top_rank_bonus}
                        WHEN salary_rank <= {top_rank_threshold} THEN {top_rank_bonus // 2}
                        ELSE 0
                    END
                ) AS {feature_name}
            FROM
                salary_changes
            ORDER BY
                id, timestamp
        """
        return sql_query
PySpark Functional Mode
This mode uses PySpark's DataFrame API for operations like joins, groupBy, and windowing.
The DataFrame created from a registered data source is passed in through custom_args, allowing the custom logic to use it dynamically.
Example: Simple Groupby Logic
This simple functional operation groups the source DataFrame by id and timestamp and sums salaries. The DataFrame and the feature name are looked up from custom_args with keys hardcoded in the apply method.
from pyspark.sql import functions as F


class SalaryAggregation(CustomOperationHandler):
    def apply(self):
        data_source_df = self.custom_args.get("clicks_data")
        feature_name = self.custom_args.get("feature_name")
        result_df = (
            data_source_df
            .groupBy("id", "timestamp")
            .agg(F.sum("salary").alias(feature_name))
        )
        return result_df
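The same kind of local sanity check works in functional mode, since the DataFrame is just a value in custom_args. The sketch below assumes the functional SalaryAggregation class above is in scope and that the platform keys the DataFrame by the registered data source name (clicks_data in the example); the rows are illustrative.

from pyspark.sql import SparkSession

# Illustrative local check for the functional-mode UDF above.
spark = SparkSession.builder.master("local[1]").appName("udf-check").getOrCreate()
df = spark.createDataFrame(
    [(1, "2024-01-31", 1000.0), (1, "2024-02-29", 1100.0)],
    ["id", "timestamp", "salary"],
)

# Hand the DataFrame to the UDF under the key it expects.
udf = SalaryAggregation(custom_args={"clicks_data": df, "feature_name": "employee_salary_l3m"})
udf.apply().show()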
Example: Complex Set of DataFrame Operations
For more advanced use cases, custom_args can be used to adjust the operation parameters dynamically, and the logic can be kept modular by defining helper methods, as in the sketch below.
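The following is a sketch of what such a UDF could look like; the class name, helper method, and custom_args keys are illustrative assumptions, not a prescribed contract, and the CustomOperationHandler base class is assumed to be in scope. It computes several rolling salary averages and combines them into a single feature column.

from pyspark.sql import functions as F
from pyspark.sql.window import Window


class RollingSalaryAggregation(CustomOperationHandler):
    def _rolling_avg(self, df, period):
        # Rolling average over the last `period` rows per id, ordered by timestamp.
        window = (
            Window.partitionBy("id")
            .orderBy("timestamp")
            .rowsBetween(-(period - 1), Window.currentRow)
        )
        return df.withColumn(f"avg_salary_l{period}m", F.avg("salary").over(window))

    def apply(self):
        # The key under which the platform exposes the DataFrame is assumed here;
        # in the simple example above it is the registered data source name.
        df = self.custom_args.get("data_source")
        feature_name = self.custom_args.get("feature_name")
        time_periods = self.custom_args.get("time_periods", [3, 6])

        for period in time_periods:
            df = self._rolling_avg(df, period)

        # Average the rolling averages into a single feature column.
        avg_cols = [F.col(f"avg_salary_l{p}m") for p in time_periods]
        combined = sum(avg_cols[1:], avg_cols[0]) / len(time_periods)
        return df.withColumn(feature_name, combined)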
Build Custom Docker Image
The example Dockerfile below demonstrates how to add custom Python dependencies and UDF scripts to the base image.
The base image shaktimaanbot/canso-jobs:v0.0.1-beta, which contains a basic set of Python libraries, is provided by the Canso platform.
# Set the base image to use for the Docker image being built
FROM --platform=linux/amd64 shaktimaanbot/canso-jobs:v0.0.1-beta
# Set the working directory inside the container
WORKDIR /opt/spark/work-dir/bob
# Install additional set of required Python libraries
COPY extra_pip_dependencies.txt .
RUN pip3 install --no-cache-dir -r extra_pip_dependencies.txt && \
rm -rf /root/.cache/pip
# Copy the UDF script and any supporting modules from the build context into the image
COPY /my/local/path/to/custom_udf.py /opt/spark/work-dir/bob/src/v2/external_udfs/features/my/custom/path/
WORKDIR /opt/spark/work-dir/bob/src/v2/jobs
# Make the streaming feature materializer job executable within the container
RUN chmod 777 feature_materializer.py
Define Python Client
This is how users register their custom UDF through the Python client.
You need to specify the following fields, which are specific to custom UDFs:
custom_class_name: The executable class containing the UDF logic.
custom_file_path: The additional path under which the UDF script was copied in the Dockerfile, including the script's filename.
custom_docker_image: The custom Docker image containing the UDF script and its additional dependencies.
custom_args: Feature arguments required by the UDF at run time.
mode: spark_sql to execute SQL logic, spark_function to execute functional logic.
streaming_feature_obj = StreamingFeature(
    name="employee_salary_l3m",
    description="Total Employee Salary Sum in the Last 3 months",
    data_type=DataType.FLOAT,
    data_sources=["treasury_data"],
    owners=["custom-user@domain.ai"],
    feature_logic=CustomFeatureLogic(
        custom_class_name="SalaryAggregation",
        custom_file_path="/my/custom/path/custom_udf_spark_sql.py",
        custom_docker_image="custom-repo/custom-image:v0.0.1-beta",
        mode="spark_sql",
        custom_args={
            "timestamp_field": "timestamp",
            "groupby_keys": ["id", "timestamp"]
        },
    ),
    processing_engine=ProcessingEngine.PYSPARK_K8S,
    processing_engine_configs=ProcessingEngineConfigs(spark_streaming_flag=True),
    online_sink=["salary_agg_sink"],
    online=True,
    offline=False,
)