Custom User Defined Function

Often, you may want to implement feature logic that is not supported by the Aggregations and Transformations that Canso provides out of the box. For such cases, Canso supports custom UDFs, which offer greater flexibility by letting you apply complex, customized operations to your data for a more tailored and dynamic processing experience.

How It Works

  • Implement your custom feature logic by developing a Python script that follows the specified UDF contract.

  • Build a custom Docker image using the base Docker image provided by the Canso platform. You can include your UDF script and any additional dependencies required at runtime.

  • Supply the alias for the image pull secrets in processing_engine_configs to ensure that your custom Docker image is accessible during feature execution.

  • Use the Python client to define the necessary arguments for your custom UDF, register it, and deploy it.

  • Detailed instructions for each step are provided below.

Currently, custom Docker images can only be pulled from DockerHub repositories so that they can run in the Canso data plane. Image pull secrets are not supported yet. Support for private container registries (such as Amazon ECR, Google Container Registry, and Azure Container Registry) and for image pull secrets will be added in an upcoming release.

Write Custom UDF Script

Base Class for UDFs

To implement a UDF, you must include this base class in your script. The apply method is where you define the custom logic, and custom_args allows for the dynamic passing of parameters required during feature execution.

from abc import ABC, abstractmethod

class CustomOperationHandler(ABC):
    def __init__(self, custom_args):
        # Runtime parameters supplied via custom_args in the Python client
        self.custom_args = custom_args

    @abstractmethod
    def apply(self):
        # Implement the custom feature logic here
        pass

Modes of Operation

The Canso platform supports two modes for processing custom UDFs:

  • PySpark SQL Mode: Allows implementation of custom UDFs using a Spark SQL approach.

  • PySpark Functional Mode: Allows implementation of custom UDFs using a Spark functional programming (DataFrame) approach.

These modes allow you to implement simple or complex UDFs based on your requirements.
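
The practical difference between the two modes is the return value of apply. The sketch below is illustrative only (it is not the platform's internal code) and uses toy handlers to show the contract: in spark_sql mode, apply returns a SQL query string that runs against the registered data source's temporary table, while in spark_function mode, apply returns a PySpark DataFrame built from the DataFrame supplied through custom_args.

from pyspark.sql import DataFrame

# CustomOperationHandler is the base class defined above.

class SqlModeSketch(CustomOperationHandler):
    # spark_sql mode: apply() returns a SQL string; the registered data
    # source (e.g. treasury_data) is available as a temporary table.
    def apply(self):
        return "SELECT id, SUM(salary) AS total_salary FROM treasury_data GROUP BY id"

class FunctionalModeSketch(CustomOperationHandler):
    # spark_function mode: apply() returns a DataFrame; the source
    # DataFrame is retrieved from custom_args by its data source name.
    def apply(self) -> DataFrame:
        df = self.custom_args.get("treasury_data")
        return df.groupBy("id").sum("salary")

# Conceptually, the data plane then does something like:
#   spark.sql(SqlModeSketch(custom_args={}).apply())
#   FunctionalModeSketch(custom_args={"treasury_data": source_df}).apply()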

PySpark SQL Mode

  • This mode is suitable for executing SQL-based transformations directly in Spark SQL.

  • In this mode, the DataFrame created from a registered data source is treated as a temporary table in PySpark. You can refer to the data source name directly in your custom UDF.

Example: Simple Groupby Logic

  • A UDF containing simple logic doesn't need custom_args.

  • For instance, employee_salary_l3m is a feature name, and treasury_data is a registered data source.

class SalaryAggregation(CustomOperationHandler):
    def apply(self):
        sql_query = f"""
        SELECT 
          id,
          timestamp,
          SUM(salary) AS employee_salary_l3m
        FROM 
            treasury_data
        GROUP BY 
            id, 
            timestamp
        """
        return sql_query
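
Because a registered data source is exposed as a temporary table, you can sanity-check the generated query locally before building the image. The snippet below is a local testing sketch, not part of the Canso contract; the SparkSession setup and sample rows are illustrative assumptions.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("udf-local-test").getOrCreate()

# Toy data standing in for the registered treasury_data source
rows = [
    (1, "2024-01-31", 5000.0),
    (1, "2024-02-29", 5200.0),
    (2, "2024-01-31", 7000.0),
]
spark.createDataFrame(rows, ["id", "timestamp", "salary"]) \
    .createOrReplaceTempView("treasury_data")

# Generate the query from the UDF and run it against the temp table
query = SalaryAggregation(custom_args={}).apply()
spark.sql(query).show()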

Example: Complex Set of Operations

In more complex cases, dynamic values can be retrieved from custom_args to customize the query logic.

Example containing a set of SQL operations
class ComplexSalaryAggregation(CustomOperationHandler):
    def apply(self):
        # Retrieve parameters from custom_args
        feature_name = self.custom_args.get("feature_name")
        table_name = self.custom_args.get("data_source")
        time_periods = self.custom_args.get("time_periods")
        weights = self.custom_args.get("weights")
        salary_increase_bonus = self.custom_args.get("salary_increase_bonus")
        salary_decrease_penalty = self.custom_args.get("salary_decrease_penalty")
        top_rank_bonus = self.custom_args.get("top_rank_bonus")
        top_rank_threshold = self.custom_args.get("top_rank_threshold")

        time_period_clauses = [
            f"AVG(salary) OVER (PARTITION BY id ORDER BY timestamp ROWS BETWEEN {period-1} PRECEDING AND CURRENT ROW) AS avg_salary_l{period}m"
            for period in time_periods
        ]
        
        weighted_avg_clauses = [
            f"COALESCE(avg_salary_l{period}m, 0) * {weight}"
            for period, weight in zip(time_periods, weights)
        ]

        sql_query = f"""
        WITH salary_stats AS (
            SELECT 
                id,
                timestamp,
                salary,
                {', '.join(time_period_clauses)},
                LAG(salary) OVER (PARTITION BY id ORDER BY timestamp) AS prev_salary,
                RANK() OVER (PARTITION BY id ORDER BY salary DESC) AS salary_rank
            FROM 
                {table_name}
        ),
        salary_changes AS (
            SELECT 
                *,
                CASE 
                    WHEN salary > prev_salary THEN 1 
                    WHEN salary < prev_salary THEN -1 
                    ELSE 0 
                END AS salary_change_direction,
                (salary - prev_salary) / NULLIF(prev_salary, 0) * 100 AS salary_change_percentage
            FROM 
                salary_stats
        )
        SELECT 
            id,
            timestamp,
            salary AS current_salary,
            {', '.join(f'avg_salary_l{period}m' for period in time_periods)},
            salary_rank,
            salary_change_direction,
            salary_change_percentage,
            (
                {' + '.join(weighted_avg_clauses)} +
                CASE 
                    WHEN salary_change_direction = 1 THEN {salary_increase_bonus}
                    WHEN salary_change_direction = -1 THEN {salary_decrease_penalty}
                    ELSE 0 
                END +
                CASE 
                    WHEN salary_rank = 1 THEN {top_rank_bonus}
                    WHEN salary_rank <= {top_rank_threshold} THEN {top_rank_bonus // 2}
                    ELSE 0 
                END
            ) AS {feature_name}
        FROM 
            salary_changes
        ORDER BY 
            id, timestamp
        """
        
        return sql_query
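
For reference, the custom_args this class reads might be populated as follows. The values are purely illustrative; at runtime they come from the custom_args you supply when defining the feature in the Python client.

# Illustrative values only -- adjust to your own feature definition
custom_args = {
    "feature_name": "employee_salary_score",
    "data_source": "treasury_data",
    "time_periods": [3, 6, 12],          # window sizes for the rolling averages
    "weights": [0.5, 0.3, 0.2],          # one weight per time period
    "salary_increase_bonus": 10,
    "salary_decrease_penalty": -5,
    "top_rank_bonus": 20,
    "top_rank_threshold": 5,
}

query = ComplexSalaryAggregation(custom_args=custom_args).apply()
print(query)  # inspect the generated SQL before registering the feature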

PySpark Functional Mode

  • This mode uses PySpark's DataFrame API for operations like joins, groupBy, and windowing.

  • The DataFrame created from a registered data source is passed in via custom_args, keyed by the data source name, allowing the custom logic to use it dynamically.

Example: Simple Groupby Logic

This simple functional operation groups a DataFrame and sums the salary column. The data source key and groupby columns are hardcoded in the apply method.

from pyspark.sql import functions as F

class SalaryAggregation(CustomOperationHandler):
    def apply(self):
        data_source_df = self.custom_args.get("clicks_data")
        feature_name = self.custom_args.get("feature_name")

        result_df = (data_source_df
            .groupBy("id", "timestamp")
            .agg(F.sum("salary").alias(feature_name))
        )
        return result_df
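
As with SQL mode, you can exercise the class locally by passing a toy DataFrame through custom_args, keyed by the registered data source name. This is an illustrative local test, not platform code; the SparkSession and sample rows are assumptions.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("functional-udf-test").getOrCreate()

# Toy data standing in for the registered clicks_data source
rows = [
    (1, "2024-01-31", 5000.0),
    (1, "2024-02-29", 5200.0),
    (2, "2024-01-31", 7000.0),
]
source_df = spark.createDataFrame(rows, ["id", "timestamp", "salary"])

handler = SalaryAggregation(custom_args={
    "clicks_data": source_df,              # keyed by the data source name
    "feature_name": "employee_salary_l3m",
})
handler.apply().show()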

Example: Complex Set of DataFrame Operations

For more advanced use cases, custom_args can be used to dynamically adjust the operation parameters, and the logic can be made modular by defining helper methods.

Example containing complex functional operations
from pyspark.sql import functions as F
from pyspark.sql.window import Window

class SalaryAggregation(CustomOperationHandler):
    def apply(self):
        self.data_source_df = self.custom_args.get("custom_udf_data_source_v1")
        self.additional_data_df = self.custom_args.get("additional_data_source")
        self.feature_name = self.custom_args.get("feature_name")

        self._create_window_specs()
        joined_df = self._join_data()
        processed_df = self._process_df(joined_df)
        result_df = self._create_result_df(processed_df)

        return result_df

    def _create_window_specs(self):
        self.window_6m = Window.partitionBy("id").orderBy("timestamp").rangeBetween(-180, 0)
        self.window_12m = Window.partitionBy("id").orderBy("timestamp").rangeBetween(-365, 0)

    def _join_data(self):
        return (self.data_source_df
            .join(self.additional_data_df, on="id", how="left")
            .withColumnRenamed("credit_score", "external_credit_score")
        )

    def _process_df(self, joined_df):
        return (joined_df
            .withColumn("salary_6m_avg", F.avg("salary").over(self.window_6m))
            .withColumn("salary_12m_avg", F.avg("salary").over(self.window_12m))
            .withColumn("expenses_6m_sum", F.sum("expenses").over(self.window_6m))
            .withColumn("salary_expense_ratio", F.col("salary") / F.col("expenses"))
            .withColumn("prev_salary", F.lag("salary").over(Window.partitionBy("id").orderBy("timestamp")))
            .withColumn("salary_increase", F.when(F.col("salary") > F.col("prev_salary"), 1).otherwise(0))
            .withColumn("credit_utilization", F.col("credit_used") / F.col("credit_limit"))
            .withColumn("risk_score", 
                F.when(F.col("salary_12m_avg") > 50000, 10)
                .when(F.col("salary_6m_avg") > 40000, 8)
                .when(F.col("credit_utilization") < 0.3, 5)
                .when(F.col("salary_increase") == 1, 3)
                .otherwise(0))
        )

    def _create_result_df(self, processed_df):
        return (processed_df
            .groupBy("id")
            .agg(
                (F.avg("salary_6m_avg") * 0.25 +
                 F.avg("salary_12m_avg") * 0.15 +
                 F.sum("expenses_6m_sum") * -0.1 +
                 F.avg("salary_expense_ratio") * 0.15 +
                 F.sum("salary_increase") * 0.1 +
                 F.avg("credit_utilization") *

Build Custom Docker Image

  • The example Dockerfile below demonstrates how to add custom Python dependencies and UDF scripts to the base image.

  • The base image shaktimaanbot/canso-jobs:v0.0.1-beta, containing a basic set of Python libraries, will be provided.

# Set the base image to use for the Docker image being built
FROM --platform=linux/amd64 shaktimaanbot/canso-jobs:v0.0.1-beta

# Set the working directory inside the container
WORKDIR /opt/spark/work-dir/bob

# Install the additional Python libraries required at runtime
COPY extra_pip_dependencies.txt .
RUN pip3 install --no-cache-dir -r extra_pip_dependencies.txt && \
    rm -rf /root/.cache/pip

# Copy the UDF script from the host into the image's external UDFs directory
COPY /my/local/path/to/custom_udf.py /opt/spark/work-dir/bob/src/v2/external_udfs/features/my/custom/path/

WORKDIR /opt/spark/work-dir/bob/src/v2/jobs

# Make the feature materializer job executable within the container
RUN chmod 777 feature_materializer.py

Define Python Client

  • This is how users register their custom UDF through the Python client.

  • Users need to specify the following fields specific to a custom UDF:

    • custom_class_name: The executable class containing the UDF logic.

    • custom_file_path: The additional path, supplied in the Dockerfile, that contains the UDF script.

    • custom_docker_image: The custom Docker image containing the UDF script and any additional dependencies.

    • custom_args: Feature arguments required by the UDF at runtime.

    • mode: spark_sql to execute SQL logic, or spark_function to execute functional logic.

streaming_feature_obj = StreamingFeature(
    name="employee_salary_l3m",
    description="Total Employee Salary Sum in the Last 3 months",
    data_type=DataType.FLOAT,
    data_sources=["treasury_data"],
    owners=["custom-user@domain.ai"],
    feature_logic=CustomFeatureLogic(
        custom_class_name="SalaryAggregation",
        custom_file_path="/my/custom/path/custom_udf_spark_sql.py",
        custom_docker_image="custom-repo/custom-image:v0.0.1-beta",
        mode="spark_sql",
        custom_args={
            "timestamp_field": "timestamp",
            "groupby_keys": ["id", "timestamp"]
        },
    ),
    processing_engine=ProcessingEngine.PYSPARK_K8S,
    processing_engine_configs=ProcessingEngineConfigs(spark_streaming_flag=True),
    online_sink=["salary_agg_sink"],
    online=True,
    offline=False,
)
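
The dictionary supplied as custom_args here is what your UDF receives as self.custom_args at runtime (as the functional-mode examples above show, registered data source DataFrames are also retrieved from custom_args, keyed by their names). A minimal illustration of that mapping:

# Illustrative only: how the client-side custom_args surface inside the UDF
handler = SalaryAggregation(custom_args={
    "timestamp_field": "timestamp",
    "groupby_keys": ["id", "timestamp"],
})
handler.custom_args.get("groupby_keys")   # -> ["id", "timestamp"]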