
# Custom User Defined Function

Often, you may want to implement feature logic that is not supported by the aggregations and transformations Canso provides out of the box. In such cases, Canso supports custom UDFs, which offer greater flexibility by letting you apply complex, customized operations to your data.

## How It Works

* Implement your [custom feature logic](#write-custom-udf-script) by developing a Python script that follows the specified UDF contract.
* Build a [custom Docker image](#build-custom-docker-image) from the base Docker image provided by the Canso platform, including your UDF script and any additional dependencies required at runtime.
* Supply the alias for the image pull secrets in [processing\_engine\_configs](https://github.com/Yugen-ai/gru/blob/125ecd57355fc05237106c267386c1716d6edddf/gru/config/features/default_processing_engine_configs_streaming.yaml) to ensure that your custom Docker image is accessible during feature execution.
* Use the [Python client](#define-python-client) to define the arguments for your custom UDF, register it, and deploy it.

Detailed instructions for each step are provided below.

{% hint style="warning" %}
Currently, custom Docker images can only be pulled from DockerHub repositories; these images run in the Canso data plane. Image pull secrets are not supported yet. Support for private container registries (such as Amazon ECR, Google Container Registry, and Azure Container Registry) and for image pull secrets will be added in an upcoming release.
{% endhint %}

## Write Custom UDF Script

### Base Class for UDFs

To implement a UDF, you must include this base class in your script. The `apply` method is where you define the custom logic, and `custom_args` allows for the dynamic passing of parameters required during feature execution.

```python
from abc import ABC, abstractmethod

class CustomOperationHandler(ABC):
    def __init__(self, custom_args):
        self.custom_args = custom_args

    @abstractmethod
    def apply(self):
        pass
```
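
As an illustration of the contract, the sketch below subclasses the base class with a hypothetical `RowCountFeature` handler that builds its query from `custom_args`. The class name and the `data_source` and `feature_name` keys are assumptions for this example only; which keys a UDF reads is up to you, since they come from the `custom_args` you pass when registering the feature.

```python
from abc import ABC, abstractmethod


class CustomOperationHandler(ABC):
    """Base class from the UDF contract: stores custom_args and requires apply()."""

    def __init__(self, custom_args):
        self.custom_args = custom_args

    @abstractmethod
    def apply(self):
        pass


class RowCountFeature(CustomOperationHandler):
    """Hypothetical UDF: counts rows per id in the table named in custom_args."""

    def apply(self):
        table = self.custom_args["data_source"]
        feature = self.custom_args["feature_name"]
        return f"SELECT id, COUNT(*) AS {feature} FROM {table} GROUP BY id"


# The base class cannot be instantiated directly because apply() is abstract.
try:
    CustomOperationHandler({})
except TypeError:
    print("apply() must be implemented by a subclass")

handler = RowCountFeature({"data_source": "treasury_data", "feature_name": "txn_count"})
print(handler.apply())  # SELECT id, COUNT(*) AS txn_count FROM treasury_data GROUP BY id
```

Because `apply` is declared abstract, forgetting to implement it fails as soon as the handler is constructed rather than later during feature execution.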

### Modes of Operation

The Canso platform supports two modes for processing custom UDFs:

* **PySpark SQL Mode**: Implement custom UDFs as Spark SQL queries.
* **PySpark Functional Mode**: Implement custom UDFs with PySpark's functional DataFrame API.

These modes allow you to implement simple or complex UDFs based on your requirements.

### PySpark SQL Mode

* This mode is suitable for SQL-based transformations executed directly in Spark SQL; `apply` returns the SQL query as a string.
* In this mode, the DataFrame created from a registered data source is treated as a temporary table in PySpark, so you can refer to the data source by name directly in your query.

### Example: Simple Groupby Logic

* UDFs containing simple logic don't need `custom_args`.
* In this example, `employee_salary_l3m` is the feature name and `treasury_data` is a registered data source.

```python
class SalaryAggregation(CustomOperationHandler):
    def apply(self):
        sql_query = """
        SELECT 
          id,
          timestamp,
          SUM(salary) AS employee_salary_l3m
        FROM 
            treasury_data
        GROUP BY 
            id, 
            timestamp
        """
        return sql_query
```
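
Because `apply` only returns a string in this mode, you can sanity-check the query before building an image. The sketch below runs it against an in-memory SQLite table named `treasury_data` filled with a few made-up rows. SQLite is used purely as a stand-in here and is not Spark SQL, so dialect-specific functions may behave differently; treat this as a quick smoke test, not a substitute for testing on Spark.

```python
import sqlite3
from abc import ABC, abstractmethod


class CustomOperationHandler(ABC):
    def __init__(self, custom_args):
        self.custom_args = custom_args

    @abstractmethod
    def apply(self):
        pass


class SalaryAggregation(CustomOperationHandler):
    def apply(self):
        return """
        SELECT
          id,
          timestamp,
          SUM(salary) AS employee_salary_l3m
        FROM
            treasury_data
        GROUP BY
            id,
            timestamp
        """


# Hypothetical sample rows standing in for the registered data source.
rows = [
    (1, "2024-01-31", 1000.0),
    (1, "2024-01-31", 250.0),
    (2, "2024-01-31", 900.0),
]

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE treasury_data (id INTEGER, timestamp TEXT, salary REAL)")
conn.executemany("INSERT INTO treasury_data VALUES (?, ?, ?)", rows)

# Simple UDFs ignore custom_args, but the constructor still expects a value.
query = SalaryAggregation(custom_args={}).apply()
# ORDER BY is appended only to make the printed result deterministic.
result = conn.execute(query + " ORDER BY id").fetchall()
print(result)  # [(1, '2024-01-31', 1250.0), (2, '2024-01-31', 900.0)]
```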

### Example: Complex Set of Operations

In more complex cases, dynamic values can be retrieved from `custom_args` to customize the query logic.

<details>

<summary>Example containing a set of SQL operations</summary>

```python
class ComplexSalaryAggregation(CustomOperationHandler):
    def apply(self):
        # Retrieve parameters from custom_args
        feature_name = self.custom_args.get("feature_name")
        table_name = self.custom_args.get("data_source")
        time_periods = self.custom_args.get("time_periods")
        weights = self.custom_args.get("weights")
        salary_increase_bonus = self.custom_args.get("salary_increase_bonus")
        salary_decrease_penalty = self.custom_args.get("salary_decrease_penalty")
        top_rank_bonus = self.custom_args.get("top_rank_bonus")
        top_rank_threshold = self.custom_args.get("top_rank_threshold")

        time_period_clauses = [
            f"AVG(salary) OVER (PARTITION BY id ORDER BY timestamp ROWS BETWEEN {period-1} PRECEDING AND CURRENT ROW) AS avg_salary_l{period}m"
            for period in time_periods
        ]
        
        weighted_avg_clauses = [
            f"COALESCE(avg_salary_l{period}m, 0) * {weight}"
            for period, weight in zip(time_periods, weights)
        ]

        sql_query = f"""
        WITH salary_stats AS (
            SELECT 
                id,
                timestamp,
                salary,
                {', '.join(time_period_clauses)},
                LAG(salary) OVER (PARTITION BY id ORDER BY timestamp) AS prev_salary,
                RANK() OVER (PARTITION BY id ORDER BY salary DESC) AS salary_rank
            FROM 
                {table_name}
        ),
        salary_changes AS (
            SELECT 
                *,
                CASE 
                    WHEN salary > prev_salary THEN 1 
                    WHEN salary < prev_salary THEN -1 
                    ELSE 0 
                END AS salary_change_direction,
                (salary - prev_salary) / NULLIF(prev_salary, 0) * 100 AS salary_change_percentage
            FROM 
                salary_stats
        )
        SELECT 
            id,
            timestamp,
            salary AS current_salary,
            {', '.join(f'avg_salary_l{period}m' for period in time_periods)},
            salary_rank,
            salary_change_direction,
            salary_change_percentage,
            (
                {' + '.join(weighted_avg_clauses)} +
                CASE 
                    WHEN salary_change_direction = 1 THEN {salary_increase_bonus}
                    WHEN salary_change_direction = -1 THEN {salary_decrease_penalty}
                    ELSE 0 
                END +
                CASE 
                    WHEN salary_rank = 1 THEN {top_rank_bonus}
                    WHEN salary_rank <= {top_rank_threshold} THEN {top_rank_bonus // 2}
                    ELSE 0 
                END
            ) AS {feature_name}
        FROM 
            salary_changes
        ORDER BY 
            id, timestamp
        """
        
        return sql_query
```

</details>
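
The complex example relies on `custom_args` carrying every key it reads: `.get` returns `None` for a missing key, and `zip` silently drops extra entries when `time_periods` and `weights` differ in length. The sketch below shows a hypothetical `custom_args` dictionary for `ComplexSalaryAggregation` (all values are made up) and a hypothetical `validate_salary_args` helper that fails fast in those cases, then renders the same window and weighted-average fragments the class builds.

```python
# Hypothetical custom_args for ComplexSalaryAggregation; values are illustrative only.
custom_args = {
    "feature_name": "salary_score",
    "data_source": "treasury_data",
    "time_periods": [3, 6],
    "weights": [0.6, 0.4],
    "salary_increase_bonus": 5,
    "salary_decrease_penalty": -5,
    "top_rank_bonus": 10,
    "top_rank_threshold": 3,
}

REQUIRED_KEYS = (
    "feature_name", "data_source", "time_periods", "weights",
    "salary_increase_bonus", "salary_decrease_penalty",
    "top_rank_bonus", "top_rank_threshold",
)


def validate_salary_args(args):
    """Hypothetical pre-flight check: every key present, one weight per period."""
    missing = [key for key in REQUIRED_KEYS if args.get(key) is None]
    if missing:
        raise ValueError(f"missing custom_args: {missing}")
    if len(args["time_periods"]) != len(args["weights"]):
        raise ValueError("time_periods and weights must have the same length")


validate_salary_args(custom_args)

# The same fragments that ComplexSalaryAggregation.apply() builds from these args.
time_period_clauses = [
    f"AVG(salary) OVER (PARTITION BY id ORDER BY timestamp "
    f"ROWS BETWEEN {period - 1} PRECEDING AND CURRENT ROW) AS avg_salary_l{period}m"
    for period in custom_args["time_periods"]
]
weighted_avg_clauses = [
    f"COALESCE(avg_salary_l{period}m, 0) * {weight}"
    for period, weight in zip(custom_args["time_periods"], custom_args["weights"])
]
print(time_period_clauses[0])
print(" + ".join(weighted_avg_clauses))
# COALESCE(avg_salary_l3m, 0) * 0.6 + COALESCE(avg_salary_l6m, 0) * 0.4
```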

### PySpark Functional Mode

* This mode uses PySpark's DataFrame API for operations such as joins, `groupBy`, and windowing; `apply` returns the resulting DataFrame.
* The DataFrame created from each registered data source is passed in through `custom_args`, keyed by the data source name, so the custom logic can retrieve it dynamically.

### Example: Simple Groupby Logic

This simple functional operation groups the data source DataFrame by `id` and `timestamp` and sums `salary`. Both the DataFrame and the output feature name are read from `custom_args`.

```python
from pyspark.sql import functions as F

class SalaryAggregation(CustomOperationHandler):
    def apply(self):
        data_source_df = self.custom_args.get("clicks_data")
        feature_name = self.custom_args.get("feature_name")

        result_df = (data_source_df
            .groupBy("id", "timestamp")
            .agg(F.sum("salary").alias(feature_name))
        )
        return result_df
```
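
In functional mode, a misspelled data source key surfaces late: `self.custom_args.get(...)` returns `None`, and the mistake only shows up as an `AttributeError` when `.groupBy` is called on it. One option, sketched below with a hypothetical `require` helper that is not part of the Canso contract, is to look keys up explicitly and raise an error that lists the available keys. A placeholder object stands in for the DataFrame so the snippet runs without Spark.

```python
def require(custom_args, key):
    """Hypothetical helper: return custom_args[key] or fail with a clear message."""
    if custom_args.get(key) is None:
        available = ", ".join(sorted(custom_args)) or "<none>"
        raise KeyError(f"custom_args has no value for {key!r}; available keys: {available}")
    return custom_args[key]


# Placeholder standing in for the DataFrame of the "clicks_data" source.
args = {"clicks_data": object(), "feature_name": "employee_salary_sum"}

print(require(args, "feature_name"))  # employee_salary_sum
try:
    require(args, "click_data")  # typo in the data source name
except KeyError as err:
    print(err)
```

Inside a handler, this would read as `data_source_df = require(self.custom_args, "clicks_data")`.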

### Example: Complex Set of DataFrame Operations

For more advanced use cases, `custom_args` can be used to adjust the operation parameters dynamically, and the logic can be kept modular by splitting it into helper methods.

<details>

<summary>Example containing complex functional operations</summary>

```python
from pyspark.sql import functions as F
from pyspark.sql.window import Window

class SalaryAggregation(CustomOperationHandler):
    def apply(self):
        self.data_source_df = self.custom_args.get("custom_udf_data_source_v1")
        self.additional_data_df = self.custom_args.get("additional_data_source")
        self.feature_name = self.custom_args.get("feature_name")

        self._create_window_specs()
        joined_df = self._join_data()
        processed_df = self._process_df(joined_df)
        result_df = self._create_result_df(processed_df)

        return result_df

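    def _create_window_specs(self):
        # rangeBetween() offsets are in the units of the ORDER BY column. Assuming
        # `timestamp` is a TimestampType column, order by it cast to epoch seconds
        # and express the 180/365-day look-back in seconds.
        seconds_per_day = 86400
        ts_seconds = F.col("timestamp").cast("long")
        self.window_6m = (Window.partitionBy("id").orderBy(ts_seconds)
                          .rangeBetween(-180 * seconds_per_day, 0))
        self.window_12m = (Window.partitionBy("id").orderBy(ts_seconds)
                           .rangeBetween(-365 * seconds_per_day, 0))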

    def _join_data(self):
        return (self.data_source_df
            .join(self.additional_data_df, on="id", how="left")
            .withColumnRenamed("credit_score", "external_credit_score")
        )

    def _process_df(self, joined_df):
        return (joined_df
            .withColumn("salary_6m_avg", F.avg("salary").over(self.window_6m))
            .withColumn("salary_12m_avg", F.avg("salary").over(self.window_12m))
            .withColumn("expenses_6m_sum", F.sum("expenses").over(self.window_6m))
            .withColumn("salary_expense_ratio", F.col("salary") / F.col("expenses"))
            .withColumn("prev_salary", F.lag("salary").over(Window.partitionBy("id").orderBy("timestamp")))
            .withColumn("salary_increase", F.when(F.col("salary") > F.col("prev_salary"), 1).otherwise(0))
            .withColumn("credit_utilization", F.col("credit_used") / F.col("credit_limit"))
            .withColumn("risk_score", 
                F.when(F.col("salary_12m_avg") > 50000, 10)
                .when(F.col("salary_6m_avg") > 40000, 8)
                .when(F.col("credit_utilization") < 0.3, 5)
                .when(F.col("salary_increase") == 1, 3)
                .otherwise(0))
        )

    def _create_result_df(self, processed_df):
        return (processed_df
            .groupBy("id")
            .agg(
                (F.avg("salary_6m_avg") * 0.25 +
                 F.avg("salary_12m_avg") * 0.15 +
                 F.sum("expenses_6m_sum") * -0.1 +
                 F.avg("salary_expense_ratio") * 0.15 +
                 F.sum("salary_increase") * 0.1 +
                 F.avg("credit_utilization") *
```

</details>

## Build Custom Docker Image

* The example Dockerfile below demonstrates how to add custom Python dependencies and UDF scripts to the base image.
* The base image `shaktimaanbot/canso-jobs:v0.0.1-beta`, which contains a basic set of Python libraries, is provided by the Canso platform.

```dockerfile
# Set the base image to use for the Docker image being built
FROM --platform=linux/amd64 shaktimaanbot/canso-jobs:v0.0.1-beta

# Set the working directory inside container
WORKDIR /opt/spark/work-dir/bob

# Install additional set of required Python libraries
COPY extra_pip_dependencies.txt .
RUN pip3 install --no-cache-dir -r extra_pip_dependencies.txt && \
    rm -rf /root/.cache/pip

# Copy specified set of scripts and modules from the host to current working directory
COPY /my/local/path/to/custom_udf_spark_sql.py /opt/spark/work-dir/bob/src/v2/external_udfs/features/my/custom/path/

WORKDIR /opt/spark/work-dir/bob/src/v2/jobs

# Change the permission of streaming jobs to make them executable within the container
RUN chmod 777 feature_materializer.py
```
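
The Dockerfile's `COPY` destination and the `custom_file_path` you register later need to agree. Assuming the platform resolves `custom_file_path` under `/opt/spark/work-dir/bob/src/v2/external_udfs/features` (which is what the paths on this page suggest, not a documented guarantee), a hypothetical check like the one below can catch a mismatch before you push the image.

```python
from pathlib import PurePosixPath

# Assumption: custom_file_path is resolved under this directory inside the image.
UDF_ROOT = PurePosixPath("/opt/spark/work-dir/bob/src/v2/external_udfs/features")


def in_container_path(custom_file_path):
    """Hypothetical helper: map a registered custom_file_path to its in-image location."""
    path = PurePosixPath(custom_file_path)
    # Treat a leading "/" as relative to UDF_ROOT rather than the filesystem root.
    relative = path.relative_to("/") if path.is_absolute() else path
    return UDF_ROOT / relative


# Directory used as the COPY destination in the Dockerfile above.
copy_destination_dir = PurePosixPath(
    "/opt/spark/work-dir/bob/src/v2/external_udfs/features/my/custom/path/"
)
resolved = in_container_path("/my/custom/path/custom_udf_spark_sql.py")
print(resolved)
print(resolved.parent == copy_destination_dir)  # True
```

Checking the directory is only half of it; the file name passed to `COPY` must also match the last component of `custom_file_path`.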

## Define Python Client

* This is how you register a custom UDF through the Python client.
* You need to specify the following arguments specific to custom UDFs:
  * `custom_class_name`: The class containing the UDF logic.
  * `custom_file_path`: The additional path, as specified in the Dockerfile, to the file containing the UDF.
  * `custom_docker_image`: The custom Docker image containing the UDF script and any additional dependencies.
  * `custom_args`: Arguments required by the UDF at run time.
  * `mode`: `spark_sql` to execute SQL logic, or `spark_function` to execute functional logic.

```python
streaming_feature_obj = StreamingFeature(
    name="employee_salary_l3m",
    description="Total Employee Salary Sum in the Last 3 months",
    data_type=DataType.FLOAT,
    data_sources=["treasury_data"],
    owners=["custom-user@domain.ai"],
    feature_logic=CustomFeatureLogic(
        custom_class_name="SalaryAggregation",
        custom_file_path="/my/custom/path/custom_udf_spark_sql.py",
        custom_docker_image="custom-repo/custom-image:v0.0.1-beta",
        mode="spark_sql",
        custom_args={
            "timestamp_field": "timestamp",
            "groupby_keys": ["id", "timestamp"]
        },
    ),
    processing_engine=ProcessingEngine.PYSPARK_K8S,
    processing_engine_configs=ProcessingEngineConfigs(spark_streaming_flag=True),
    online_sink=["salary_agg_sink"],
    online=True,
    offline=False,
)
```
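
Before registering, it can help to check the custom-UDF arguments locally, since a typo in `mode` or in `custom_class_name` only fails once the job runs. The sketch below is a hypothetical pre-flight check, not part of the Canso client: it accepts only the two modes listed above and uses Python's `ast` module to confirm that `custom_class_name` is defined in a local copy of the UDF file.

```python
import ast

VALID_MODES = {"spark_sql", "spark_function"}


def check_custom_udf(mode, custom_class_name, udf_source):
    """Hypothetical pre-flight check for CustomFeatureLogic arguments.

    udf_source is the text of your local UDF file (the one copied into the image).
    Returns a list of problems; an empty list means nothing obvious is wrong.
    """
    problems = []
    if mode not in VALID_MODES:
        problems.append(f"mode must be one of {sorted(VALID_MODES)}, got {mode!r}")
    # Parse without executing, so the base class does not need to be importable.
    class_names = {
        node.name
        for node in ast.walk(ast.parse(udf_source))
        if isinstance(node, ast.ClassDef)
    }
    if custom_class_name not in class_names:
        problems.append(f"class {custom_class_name!r} not found in UDF file")
    return problems


udf_source = '''
class SalaryAggregation(CustomOperationHandler):
    def apply(self):
        return "SELECT 1"
'''
print(check_custom_udf("spark_sql", "SalaryAggregation", udf_source))  # []
print(check_custom_udf("sql", "SalaryAgg", udf_source))
```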
