Custom User-Defined Functions

Often, you may want to implement feature logic that is not supported by the Aggregations and Transformations that Canso provides out of the box. For such cases, Canso supports custom UDFs, which offer greater flexibility by letting you apply complex, customized operations to your data for a more tailored and dynamic processing experience.

How It Works

  • Implement your custom feature logic by developing a Python script that follows the specified UDF contract.

  • Build a custom Docker image from the base Docker image provided by the Canso platform, including your UDF script and any additional dependencies required at runtime.

  • Supply the alias for the image pull secrets in processing_engine_configs to ensure that your custom Docker image is accessible during feature execution.

  • Use a Python client to define the necessary arguments for your custom UDF, register it, and deploy it.

  • Detailed instructions for each step are provided below.


Write Custom UDF Script

Base Class for UDFs

To implement a UDF, you must include this base class in your script. The apply method is where you define the custom logic, and custom_args allows for the dynamic passing of parameters required during feature execution.

from abc import ABC, abstractmethod

class CustomOperationHandler(ABC):
    """Base contract that every custom UDF script must implement."""

    def __init__(self, custom_args):
        # Parameters passed to the UDF dynamically at feature execution time.
        self.custom_args = custom_args

    @abstractmethod
    def apply(self):
        # Implement your custom feature logic here.
        pass

Modes of Operation

The Canso platform supports two modes for processing custom UDFs:

  • PySpark SQL Mode: Allows implementation of custom UDFs using the Spark SQL approach.

  • PySpark Functional Mode: Allows implementation of custom UDFs using the Spark functional programming approach.

These modes allow you to implement simple or complex UDFs based on your requirements.

PySpark SQL Mode

  • This mode is suitable for executing SQL-based transformations directly in Spark SQL.

  • In this mode, the DataFrame created from a registered data source is treated as a temporary table in PySpark. You can refer to the data source name directly in your custom UDF.

Example: Simple Groupby Logic

  • A UDF containing simple logic doesn't need custom_args.

  • For instance, in the sketch after this list, employee_salary_l3m is a feature name and treasury_data is a registered data source.
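
A minimal sketch of such a SQL-mode UDF, assuming the platform exposes the registered data source as a temporary table and expects apply to return the result DataFrame; the salary_month column and the base-class module name are illustrative assumptions:

from pyspark.sql import SparkSession

from custom_udf_base import CustomOperationHandler  # hypothetical module holding the base class


class EmployeeSalaryL3M(CustomOperationHandler):
    """SQL-mode UDF computing employee_salary_l3m from treasury_data."""

    def apply(self):
        # No custom_args are needed for this simple aggregation.
        spark = SparkSession.builder.getOrCreate()
        # treasury_data is the registered data source, referenced directly
        # as a temporary table in SQL mode.
        return spark.sql(
            """
            SELECT employee_id,
                   AVG(salary) AS employee_salary_l3m
            FROM treasury_data
            WHERE salary_month >= ADD_MONTHS(CURRENT_DATE, -3)
            GROUP BY employee_id
            """
        )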

Example: Complex Set of Operations

In more complex cases, dynamic values can be retrieved from custom_args to customize the query logic.

Example containing a set of SQL operations:
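
A hedged sketch of such a UDF, assuming custom_args is a plain dictionary; the column names, argument keys, and thresholds are illustrative assumptions:

from pyspark.sql import SparkSession

from custom_udf_base import CustomOperationHandler  # hypothetical module holding the base class


class HighValueTransactions(CustomOperationHandler):
    """SQL-mode UDF chaining several SQL operations, parameterized via custom_args."""

    def apply(self):
        spark = SparkSession.builder.getOrCreate()
        # Dynamic values are retrieved from custom_args at runtime.
        min_amount = self.custom_args["min_amount"]
        lookback_months = self.custom_args.get("lookback_months", 3)

        # Register an intermediate filter as its own temp view so later
        # statements can build on it.
        spark.sql(
            f"""
            SELECT * FROM treasury_data
            WHERE amount >= {min_amount}
              AND txn_month >= ADD_MONTHS(CURRENT_DATE, -{lookback_months})
            """
        ).createOrReplaceTempView("recent_high_value")

        return spark.sql(
            """
            SELECT employee_id,
                   COUNT(*)    AS txn_count,
                   SUM(amount) AS txn_total
            FROM recent_high_value
            GROUP BY employee_id
            """
        )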

PySpark Functional Mode

  • This mode uses PySpark's DataFrame API for operations like joins, groupBy, and windowing.

  • The DataFrame created from a registered data source is passed through custom_feature_kwargs, allowing the custom logic to use it dynamically.

Example: Simple Groupby Logic

This simple functional operation joins two DataFrames; the values are hardcoded in the apply method, as in the sketch below.
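
A minimal sketch, assuming the platform injects the source DataFrames into the UDF via custom_args keyed by data-source name (the exact wiring through custom_feature_kwargs may differ); the data-source and column names are illustrative:

from custom_udf_base import CustomOperationHandler  # hypothetical module holding the base class


class EmployeeDepartmentJoin(CustomOperationHandler):
    """Functional-mode UDF joining two source DataFrames with hardcoded keys."""

    def apply(self):
        # Assumption: source DataFrames arrive in custom_args, keyed by
        # registered data-source name.
        employees = self.custom_args["employee_data"]
        departments = self.custom_args["department_data"]

        # The join key and selected columns are hardcoded in this simple case.
        return (
            employees.join(departments, on="department_id", how="inner")
                     .select("employee_id", "department_name", "salary")
        )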

Example: Complex Set of DataFrame Operations

For more advanced use cases, custom_args can be used to dynamically adjust the operation parameters, and the logic can be made modular by defining helper methods.

Example containing complex functional operations:
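
A hedged sketch under the same assumptions as above; the lookback_months and top_n keys, and all column names, are illustrative:

from pyspark.sql import Window
from pyspark.sql import functions as F

from custom_udf_base import CustomOperationHandler  # hypothetical module holding the base class


class RollingSalaryRank(CustomOperationHandler):
    """Functional-mode UDF combining a window function with parameters
    driven by custom_args; helper methods keep the logic modular."""

    def _filter_recent(self, df):
        # The lookback window (in months) comes from custom_args at runtime.
        months = self.custom_args.get("lookback_months", 3)
        return df.filter(
            F.col("salary_month") >= F.add_months(F.current_date(), -months)
        )

    def _rank_within_department(self, df):
        window = Window.partitionBy("department_id").orderBy(F.col("salary").desc())
        return df.withColumn("salary_rank", F.rank().over(window))

    def apply(self):
        # Assumption: the source DataFrame is injected into custom_args.
        df = self.custom_args["treasury_data"]
        top_n = self.custom_args.get("top_n", 5)

        ranked = self._rank_within_department(self._filter_recent(df))
        return ranked.filter(F.col("salary_rank") <= top_n)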

Build Custom Docker Image

  • The example Dockerfile below demonstrates how to add custom Python dependencies and UDF scripts to the base image.

  • The base image shaktimaanbot/canso-jobs:v0.0.1-beta, containing a basic set of Python libraries, is provided.
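
A hedged example of such a Dockerfile; the requirements file and target paths are illustrative assumptions:

# Start from the Canso base image, which ships a basic set of Python libraries.
FROM shaktimaanbot/canso-jobs:v0.0.1-beta

# Install additional runtime dependencies (illustrative requirements file).
COPY requirements.txt /tmp/requirements.txt
RUN pip install --no-cache-dir -r /tmp/requirements.txt

# Copy the custom UDF script(s) into the image; this path is later supplied
# as custom_file_path when registering the feature.
COPY udfs/ /opt/canso/udfs/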

Define Python Client

  • Custom UDFs are registered and deployed through the Python client; a hedged sketch is shown after this list.

  • Users need to specify the following parameters specific to the custom UDF:

    • custom_class_name: Executable class containing the logic.

    • custom_file_path: Path inside the Docker image (as set up in the Dockerfile) to the script containing the UDF.

    • docker_image: Custom Docker image containing the UDF script and any additional dependencies.

    • custom_feature_args: Feature args required by the UDF at runtime.

    • mode: spark_sql to execute SQL logic, or spark_function to execute functional logic.
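
A sketch of what registration might look like; the import path, class name, and register/deploy calls are assumptions, not the documented Canso client API, so consult the client reference for exact signatures:

# Illustrative sketch only: the import path, class, and method names below
# are assumptions, not the documented Canso client API.
from canso.features import CustomUDFFeature  # hypothetical import

feature = CustomUDFFeature(
    name="employee_salary_l3m",
    custom_class_name="EmployeeSalaryL3M",             # executable class containing the logic
    custom_file_path="/opt/canso/udfs/salary_udf.py",  # path baked into the Docker image
    docker_image="myrepo/canso-udfs:0.1.0",            # custom image with the UDF and dependencies
    custom_feature_args={"lookback_months": 3},        # surfaced to the UDF as custom_args
    mode="spark_sql",                                  # or "spark_function" for functional logic
)

feature.register()  # hypothetical registration call
feature.deploy()    # hypothetical deployment call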
