# Raw ML Streaming Feature

## Streaming Feature

Streaming Features enable real-time data processing by reading live data from Kafka topics. These features perform predefined or custom logic on the live data and store the results in specified data sinks.

### Introduction

Streaming features operate on live data, which makes them crucial for real-time machine learning applications. In summary:

* They allow for immediate processing and analysis of data as it arrives, enabling real-time predictions and insights.
* Streaming features are especially important in scenarios where timely data processing is critical, such as fraud detection, recommendation systems, and dynamic pricing models.
* By continuously updating features based on the latest data, streaming features ensure that machine learning models are always working with the most current information.
* In this context, sliding window operations and custom user-defined functions (UDFs) are supported to provide flexible and powerful data processing capabilities.

## Overview of Streaming Aggregations and Transformations

Streaming features enable dynamic real-time data processing by applying a variety of aggregations and transformations on live data streams. These operations are critical for creating powerful, real-time machine learning features.

### Supported Operations

| Operation Type      | Description                                                                             | Use Case                                 | Example                                                 |
| ------------------- | --------------------------------------------------------------------------------------- | ---------------------------------------- | ------------------------------------------------------- |
| Window Aggregations | Compute metrics over defined time intervals using sliding, tumbling, or session windows | Time-series analysis, Event monitoring   | Calculate average transaction value per 5-minute window |
| Filter              | Selectively process events based on specified conditions                                | Data cleaning, Event filtering           | Filter out transactions below threshold value           |
| Map                 | One-to-one transformation of individual events                                          | Data normalization, Format conversion    | Convert temperature from Celsius to Fahrenheit          |
| FlatMap             | One-to-many transformation breaking down events into multiple outputs                   | Event decomposition, Data expansion      | Split compound events into individual components        |
| Count               | Calculate event occurrence frequency                                                    | Event frequency analysis, Usage metrics  | Count user interactions per session                     |
| Repartition         | Redistribute data across specified number of partitions                                 | Performance optimization, Load balancing | Rebalance data across processing nodes                  |
| SelectExpr          | SQL-style column transformations and filtering                                          | Column manipulation, Data projection     | Extract specific fields using SQL expressions           |
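
To make the difference between Map, Filter, and FlatMap concrete, here is a minimal plain-Python sketch of their semantics. It does not use the gru or PySpark APIs; the event fields (`user`, `celsius`, `tags`) and the helper names are hypothetical and chosen only for illustration.

```python
from typing import Iterator

# Hypothetical events; the field names are illustrative only.
events = [
    {"user": "a", "celsius": 20.0, "tags": "x,y"},
    {"user": "b", "celsius": -5.0, "tags": "z"},
    {"user": "c", "celsius": 35.0, "tags": ""},
]


def to_fahrenheit(event: dict) -> dict:
    """Map: exactly one output per input event."""
    return {**event, "fahrenheit": event["celsius"] * 9 / 5 + 32}


def is_above_freezing(event: dict) -> bool:
    """Filter: keep only the events that satisfy the condition."""
    return event["celsius"] > 0


def split_tags(event: dict) -> Iterator[dict]:
    """FlatMap: zero or more outputs per input event."""
    for tag in event["tags"].split(","):
        if tag:  # skip empty tags, so an event may produce no output
            yield {"user": event["user"], "tag": tag}


mapped = [to_fahrenheit(e) for e in events]              # 3 in, 3 out
filtered = [e for e in events if is_above_freezing(e)]   # 3 in, 2 out
flat = [out for e in events for out in split_tags(e)]    # 3 in, 3 out (2 + 1 + 0)
```

In a streaming job the same functions would be applied to each event as it arrives rather than to an in-memory list.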

### Getting Started

1. Choose the appropriate transformation type based on your use case
2. Configure the transformation parameters
3. Define your StreamingFeature with the chosen transformation
4. Register and deploy your feature using the YugenClient

### Raw Feature Types

#### Feature with Predefined Logic

Raw Features with predefined logic use built-in transformations and aggregations for ease of use and consistency. These features are created using common aggregations, such as window or sliding-window aggregations, and functions such as SUM, MIN, and MAX.
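
As a rough illustration of what a sliding-window aggregation computes, the sketch below groups `(timestamp, value)` pairs into overlapping windows in plain Python. It is not the gru or PySpark implementation; the function name `sliding_window_aggregate`, its parameters, and the alignment of window starts to multiples of the slide interval are assumptions made for this example.

```python
from collections import defaultdict


def sliding_window_aggregate(events, window_s, slide_s, agg):
    """Aggregate (timestamp, value) pairs over overlapping windows.

    A window starting at `start` covers [start, start + window_s).
    Window starts are aligned to non-negative multiples of `slide_s`
    (an assumption of this sketch).
    """
    buckets = defaultdict(list)
    for ts, value in events:
        # Latest window start that is <= ts.
        start = (ts // slide_s) * slide_s
        # Walk back over every earlier window that still contains ts.
        while start >= 0 and start > ts - window_s:
            buckets[start].append(value)
            start -= slide_s
    return {start: agg(values) for start, values in sorted(buckets.items())}


# Hypothetical transaction amounts with timestamps in seconds.
transactions = [(0, 10), (120, 30), (400, 50), (650, 20)]

# 10-minute windows that slide every 5 minutes.
window_max = sliding_window_aggregate(transactions, 600, 300, max)
window_sum = sliding_window_aggregate(transactions, 600, 300, sum)
```

Setting `slide_s` equal to `window_s` turns this into a tumbling window, where each event belongs to exactly one window.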

#### Feature with Custom UDF

Raw Features with custom UDFs are created using user-defined functions. They offer more flexibility and can handle complex, use-case-specific transformations not covered by predefined logic.

#### Streaming Feature Attributes

| Attribute                   | Description                                                                     | Example                                     |
| --------------------------- | ------------------------------------------------------------------------------- | ------------------------------------------- |
| `name`                      | Unique name of the Kafka feature                                                | `real_time_click_rate`                      |
| `description`               | Description of what the streaming feature contains                              | `Real-time click rate of users`             |
| `data_type`                 | Data type of the Kafka feature                                                  | `FLOAT`                                     |
| `data_sources`              | List of Kafka topics from which data is read                                    | `['clicks_topic']`                          |
| `staging_sink`              | Data sink for staging the processed data                                        | `real_time_clicks_processed_data_s3_bucket` |
| `owners`                    | List of team members or teams responsible for the feature                       | `['data_team@company.com']`                 |
| `feature_logic`             | Transformation logic applied to the live data                                   | `SlidingWindowAggregation`                  |
| `processing_engine`         | Engine used for processing the feature logic                                    | `pyspark`                                   |
| `processing_engine_configs` | Configuration options for the processing engine                                 | `{'parallelism': 4}`                        |
| `online`                    | Boolean flag indicating if the feature should be available for online retrieval | `True`                                      |
| `offline`                   | Boolean flag indicating if the feature should be available for offline analysis | `True`                                      |
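
The attributes above can be pictured as a simple record. The sketch below is a plain dataclass that mirrors the table using its example values; it is not the gru `StreamingFeature` class, and the class name `StreamingFeatureSpec` and the checks in `validate` are assumptions made for illustration. See the linked examples in this page for the real object definitions.

```python
from dataclasses import dataclass


@dataclass
class StreamingFeatureSpec:
    """Illustrative container only; NOT the gru StreamingFeature class."""

    name: str
    description: str
    data_type: str
    data_sources: list
    staging_sink: str
    owners: list
    feature_logic: str  # placeholder; the real library likely takes an object here
    processing_engine: str
    processing_engine_configs: dict
    online: bool
    offline: bool

    def validate(self) -> None:
        """Hypothetical sanity checks, assumed for this sketch."""
        if not self.name:
            raise ValueError("name must not be empty")
        if not self.data_sources:
            raise ValueError("at least one Kafka topic is required")


spec = StreamingFeatureSpec(
    name="real_time_click_rate",
    description="Real-time click rate of users",
    data_type="FLOAT",
    data_sources=["clicks_topic"],
    staging_sink="real_time_clicks_processed_data_s3_bucket",
    owners=["data_team@company.com"],
    feature_logic="SlidingWindowAggregation",
    processing_engine="pyspark",
    processing_engine_configs={"parallelism": 4},
    online=True,
    offline=True,
)
spec.validate()
```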

#### Special notes on attributes

* `processing_engine` and `processing_engine_configs`: These are the [default set of PySpark streaming configurations](https://github.com/Yugen-ai/gru/blob/main/gru/config/features/default_processing_engine_configs_streaming.yaml#L1-L20) used to run the streaming feature.
* `checkpoint_path`: Users can specify a `checkpoint_path` as part of the processing engine configuration to enable checkpointing for stateful recovery in Spark Streaming jobs. If no `checkpoint_path` is provided, the streaming job will run without checkpointing. Users are encouraged to provide a checkpoint path to ensure fault tolerance and state recovery.
  * For more details, refer to Spark's official documentation on [Checkpointing](https://spark.apache.org/docs/3.5.3/streaming-programming-guide.html#checkpointing).
* `online`: If the online flag is enabled, the data will be ingested into the online sink (i.e., Redis cache).
* `offline`: If the offline flag is enabled, the Raw Feature will ingest the data into the offline sink (i.e., S3 sink).
* Read option configs: Users will be able to provide read configurations at the time of feature registration. This option is not enabled yet; support is planned.
* `staging_sink_write_option_configs` and `online_sink_write_option_configs`: These are optional configurations that control how data is written to sinks. These options are not enabled yet; support is planned.
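
As a small sketch of the `checkpoint_path` note above, the helper below builds a `processing_engine_configs` dictionary with an optional checkpoint path. The helper name `build_engine_configs`, the bucket path, and the exact placement of the `checkpoint_path` key at the top level of the dictionary are assumptions; check the default configuration file linked above for the authoritative layout.

```python
from typing import Optional


def build_engine_configs(parallelism: int, checkpoint_path: Optional[str] = None) -> dict:
    """Hypothetical helper: assemble processing engine configs.

    When checkpoint_path is omitted, the key is left out entirely,
    which corresponds to running the job without checkpointing.
    """
    configs = {"parallelism": parallelism}
    if checkpoint_path:
        configs["checkpoint_path"] = checkpoint_path
    return configs


# Hypothetical bucket and prefix, for illustration only.
with_checkpoint = build_engine_configs(
    4, "s3a://example-bucket/checkpoints/real_time_click_rate"
)
without_checkpoint = build_engine_configs(4)
```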

### Session Windows

Session windows group data based on periods of activity separated by gaps of inactivity. Unlike fixed-time windows, session windows have dynamic lengths that expand as new events arrive and close when no events occur within a specified timeout period.

#### When to Use Session Windows

Session windows are perfect for analyzing continuous sequences of events that should be grouped together until there's a significant gap in activity. Consider these metrics:

* Time spent per active session
* Events processed per session
* Session-based conversion rates

These metrics are valuable in scenarios like user website interactions, where each user action extends the session timeout. If no new actions occur within the timeout period (e.g., 30 minutes), the session closes and a new one begins with the next action.
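
The grouping rule described above can be sketched in a few lines of plain Python. This is a conceptual illustration, not the Spark or gru implementation; the function name `sessionize` is hypothetical, and treating an event that arrives exactly at the timeout as the start of a new session is an assumption of this sketch.

```python
def sessionize(timestamps, gap):
    """Group event timestamps into sessions.

    An event extends the current session if it arrives less than `gap`
    after the previous event; otherwise it starts a new session.
    """
    sessions = []
    for ts in sorted(timestamps):
        if sessions and ts - sessions[-1][-1] < gap:
            sessions[-1].append(ts)
        else:
            sessions.append([ts])
    return sessions


# Hypothetical user actions, in minutes since the first event.
actions = [0, 10, 25, 70, 80, 200]
sessions = sessionize(actions, gap=30)

# Per-session metrics of the kind listed above.
events_per_session = [len(s) for s in sessions]
minutes_per_session = [s[-1] - s[0] for s in sessions]
```

With a 30-minute timeout, the six actions fall into three sessions: the gaps of 45 and 120 minutes each close a session, while the shorter gaps extend the current one.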

For more details on different types of time windows in Spark Structured Streaming, see the [official documentation](https://spark.apache.org/docs/3.5.3/structured-streaming-programming-guide.html#types-of-time-windows).

#### Example Object of Streaming Feature

* [Feature with Predefined Logic](https://github.com/Yugen-ai/gru/blob/main/gru/examples/kafka_feature.py#L13-L43)
* [Feature with Custom UDF using PySpark functional logic](https://github.com/Yugen-ai/gru/blob/main/gru/examples/kafka_custom_feature_spark_function.py#L10-L31)
* [Feature with Custom UDF using PySpark SQL logic](https://github.com/Yugen-ai/gru/blob/main/gru/examples/kafka_custom_feature_spark_sql.py#L10-L31)
* [Real-time Maximum Salary Tracker: StreamingFeature with Window Aggregation](https://github.com/Yugen-ai/gru/blob/main/gru/examples/kafka_feature_window_max_aggregation.py#L12-L42)
* [Real-time Median Salary Tracker: StreamingFeature with Window Aggregation](https://github.com/Yugen-ai/gru/blob/main/gru/examples/kafka_feature_window_median_aggregation.py#L12-L42)
* [Real-time Distinct Count Salary Tracker: StreamingFeature with Window Aggregation](https://github.com/Yugen-ai/gru/blob/main/gru/examples/kafka_feature_window_agg_approx_count_distinct.py#L12-L42)
* [StreamingFeature with Filter Transformation](https://github.com/Yugen-ai/gru/blob/main/gru/examples/kafka_feature_filter_transformation.py#L16-L41)
* [StreamingFeature with Map Transformation](https://github.com/Yugen-ai/gru/blob/main/gru/examples/kafka_feature_map_transformation.py#L16-L39)
* [StreamingFeature with FlatMap Transformation](https://github.com/Yugen-ai/gru/blob/main/gru/examples/kafka_feature_flatmap_transformation.py#L16-L39)
* [StreamingFeature with Session Window Aggregation](https://github.com/Yugen-ai/gru/blob/main/gru/examples/kafka_feature_session_window_aggregation.py#L16-L42)

#### Working with Kafka Features

* Kafka Features are defined and registered similarly to batch features but are continuously updated as new data arrives.
* This real-time processing allows for immediate updates to features, which can then be used for live model predictions or analytics.



