# Data Sources

A Data Source in Canso is a reference to raw data that is either generated and owned by users or defined by users and owned by Canso's Pre-processing pipelines. Currently, Canso supports tabular data.

## Introduction

Data Sources provide an abstraction over datasets owned by users, enabling:

* A standardized way of declaring raw data for feature and pre-processing table calculations.
* A uniform user experience when defining features, allowing Data Scientists to focus on feature logic without worrying about underlying data, DB connections, or access.
* Reusability in defining and materializing features and pre-processing jobs.
* Improved understanding of raw data.

Data Sources fall into at least two types:

1. Batch Data Sources
2. Streaming Data Sources

## Data Source Types

### Batch Data Sources

Batch Data Sources are typically Data Warehouses (BigQuery, Redshift, etc.) or Object Storage services (S3, GCS, etc.). Canso currently supports only S3 as a Batch Data Source.

### Batch Data Source Attributes (S3 & GCS)

S3 and GCS Batch Data Sources can be described by the following attributes. These attributes define how data is stored and accessed in object storage, allowing for standardized and reusable data processing.

| Attribute                   | Description                                                      | Example                                    |
| --------------------------- | ---------------------------------------------------------------- | ------------------------------------------ |
| `data_source_name`          | Unique Name of the datasource                                    | `raw_us_orders`                            |
| `bucket`                    | Bucket Name                                                      | `mycompany_data`                           |
| `base_key`                  | Fixed Key Component                                              | `raw_txns/orders/us`                       |
| `varying_key_suffix_format` | Varying Time-based Key Component suffixed to Fixed Key Component | `%Y-%m-%d/%H`, `%Y-%m-%d/%H-%M`            |
| `varying_key_suffix_freq`   | Frequency of `varying_key_suffix_format`                         | `30min`, `3H`, `12H`, `1D`                 |
| `time_offset`               | Optional time offset in seconds                                  | `3600`                                     |
| `file_type`                 | File Type                                                        | `CSV`, `PARQUET`                           |
| `description`               | Description of what the data source contains                     | Daily Raw orders placed by users in the US |
| `owner`                     | Team that owns the data source                                   | `['data_engg@yugen.ai', 'sales@yugen.ai']` |
| `schema_path`               | Path where PySpark and BQ Schemas of the data are persisted      | -                                          |
| `event_timestamp_field`     | Column indicating the event time                                 | `ordered_at`                               |
| `event_timestamp_format`    | Format of the `event_timestamp_field`                            | PySpark-supported date/time formats        |
| `created_at`                | Time when the Data Source was created                            | `datetime(2021, 1, 1, 0, 0, 0)`            |
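
To make the table concrete, the attributes can be gathered into a single object. The sketch below uses a hypothetical `S3DataSourceSpec` dataclass that only mirrors the table; it is not Canso's client API (see the linked example objects for that). The `event_timestamp_format` value is an assumed PySpark pattern, and the `file_type` check only covers the two values shown in the table.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import List, Optional

# File types shown in the attribute table.
SUPPORTED_FILE_TYPES = {"CSV", "PARQUET"}


@dataclass
class S3DataSourceSpec:
    """Hypothetical holder for the attributes in the table (not Canso's API)."""

    data_source_name: str
    bucket: str
    base_key: str
    varying_key_suffix_format: str
    varying_key_suffix_freq: str
    file_type: str
    owner: List[str]
    event_timestamp_field: str
    event_timestamp_format: str
    created_at: datetime
    description: str = ""
    schema_path: Optional[str] = None
    time_offset: int = 0  # optional offset, in seconds

    def __post_init__(self) -> None:
        # Reject file types outside the values listed in the table.
        if self.file_type not in SUPPORTED_FILE_TYPES:
            raise ValueError(f"unsupported file_type: {self.file_type!r}")


raw_us_orders = S3DataSourceSpec(
    data_source_name="raw_us_orders",
    bucket="mycompany_data",
    base_key="raw_txns/orders/us",
    varying_key_suffix_format="%Y-%m-%d/%H",
    varying_key_suffix_freq="1D",
    file_type="PARQUET",
    owner=["data_engg@yugen.ai", "sales@yugen.ai"],
    event_timestamp_field="ordered_at",
    event_timestamp_format="yyyy-MM-dd HH:mm:ss",  # assumed PySpark pattern
    created_at=datetime(2021, 1, 1, 0, 0, 0),
    description="Daily Raw orders placed by users in the US",
    time_offset=3600,
)
```

Keeping the optional attributes (`description`, `schema_path`, `time_offset`) as defaulted fields means a minimal declaration only has to state where the data lives and how its event time is encoded.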

The path (key) to the data files of a data source is derived from the following attributes:

* `bucket`
* `base_key`
* `varying_key_suffix_format`
* `varying_key_suffix_freq`
* `time_offset`

Canso uses the concept of Data Spans to support the various ways in which a user's data is stored in Object Storage. To learn more about how Data Spans are used, see [Data Spans](/feature-store/data-sources/dataspans.md).
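
As a rough illustration of how the path attributes listed above could combine into object keys, the sketch below walks a time window in steps of `varying_key_suffix_freq` and formats each step with `varying_key_suffix_format`. The helpers `parse_freq` and `list_keys` are hypothetical, the `s3://` prefix is added for readability, and the way `time_offset` is applied (added to each partition timestamp before formatting) is an assumption; Canso's actual resolution logic is described under Data Spans.

```python
import re
from datetime import datetime, timedelta
from typing import List

# Seconds per unit for the frequency examples in the table (30min, 3H, 12H, 1D).
_UNIT_SECONDS = {"min": 60, "H": 3600, "D": 86400}


def parse_freq(freq: str) -> timedelta:
    """Convert a frequency string such as '30min' or '12H' to a timedelta."""
    match = re.fullmatch(r"(\d+)(min|H|D)", freq)
    if match is None:
        raise ValueError(f"unsupported frequency: {freq!r}")
    count, unit = match.groups()
    return timedelta(seconds=int(count) * _UNIT_SECONDS[unit])


def list_keys(
    bucket: str,
    base_key: str,
    suffix_format: str,
    suffix_freq: str,
    start: datetime,
    end: datetime,
    time_offset: int = 0,
) -> List[str]:
    """Return one path per partition in the half-open window [start, end)."""
    step = parse_freq(suffix_freq)
    shift = timedelta(seconds=time_offset)  # assumption: offset is added
    keys = []
    current = start
    while current < end:
        suffix = (current + shift).strftime(suffix_format)
        keys.append(f"s3://{bucket}/{base_key}/{suffix}")
        current += step
    return keys


keys = list_keys(
    bucket="mycompany_data",
    base_key="raw_txns/orders/us",
    suffix_format="%Y-%m-%d/%H",
    suffix_freq="12H",
    start=datetime(2021, 1, 1),
    end=datetime(2021, 1, 2),
)
# keys[0] == "s3://mycompany_data/raw_txns/orders/us/2021-01-01/00"
# keys[1] == "s3://mycompany_data/raw_txns/orders/us/2021-01-01/12"
```

Under the same assumption, passing `time_offset=3600` would shift the two suffixes to `2021-01-01/01` and `2021-01-01/13`.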

### Streaming Data Sources

Canso supports Kafka as a data source. This can be used for real-time data processing and feature generation. Kafka data sources enable:

* Real-time data ingestion.
* Stream processing for immediate feature extraction.
* Continuous updates to machine learning models based on live data streams.

### Streaming Data Source Attributes (Kafka)

| Attribute          | Description                                            | Example                                                                                                                                                                                                                                    |
| ------------------ | ------------------------------------------------------ | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ |
| `name`             | Unique Name of the streaming datasource                | `user_activity_stream`                                                                                                                                                                                                                     |
| `description`      | Description of what the streaming data source contains | Real-time user activity data from the app                                                                                                                                                                                                  |
| `owners`           | Tenants that owns the data source                      | `['data_engg@yugen.ai', 'app_analytics@yugen.ai']]`                                                                                                                                                                                        |
| `topic`            | Kafka topic from which the data is read                | `user_activity_topic`                                                                                                                                                                                                                      |
| `schema`           | Schema definition of the streaming data                | `{"user_id": "STRING", "activity_type": "STRING", "timestamp": "TIMESTAMP"}`                                                                                                                                                               |
| `timestamp_field`  | Field indicating the event time                        | `timestamp`                                                                                                                                                                                                                                |
| `timestamp_format` | Format of the `timestamp_field`                        | `yyyy-MM-dd HH:mm:ssXXX`                                                                                                                                                                                                                   |
| `bootstrap_server` | Kafka bootstrap server                                 | 12.345.678.910:9092                                                                                                                                                                                                                        |
| `read_configs`     | Configuration settings for reading from Kafka          | `{"datasource": "streaming_data_source", "watermark_delay_threshold": "10 seconds", "starting_timestamp": None, "starting_offsets_by_timestamp": {}, "starting_offsets": "earliest", "fail_on_data_loss": False, "include_headers": True}` |
| `cloud_provider`   | Cloud provider where Kafka is hosted                   | `AWS`                                                                                                                                                                                                                                      |
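
The sketch below collects the example values from the table into a plain Python dictionary and checks one sample record against them. It is illustrative only: the dictionary is not Canso's client object, `missing_fields` and `event_time` are hypothetical helpers, the sample record is invented, and `PYTHON_TIMESTAMP_FORMAT` is the Python `strptime` counterpart of the Spark-style pattern `yyyy-MM-dd HH:mm:ssXXX`, written out by hand.

```python
from datetime import datetime

kafka_source = {
    "name": "user_activity_stream",
    "description": "Real-time user activity data from the app",
    "owners": ["data_engg@yugen.ai", "app_analytics@yugen.ai"],
    "topic": "user_activity_topic",
    "schema": {
        "user_id": "STRING",
        "activity_type": "STRING",
        "timestamp": "TIMESTAMP",
    },
    "timestamp_field": "timestamp",
    "timestamp_format": "yyyy-MM-dd HH:mm:ssXXX",
    "bootstrap_server": "12.345.678.910:9092",  # placeholder from the table
    "read_configs": {
        "datasource": "streaming_data_source",
        "watermark_delay_threshold": "10 seconds",
        "starting_timestamp": None,
        "starting_offsets_by_timestamp": {},
        "starting_offsets": "earliest",
        "fail_on_data_loss": False,
        "include_headers": True,
    },
    "cloud_provider": "AWS",
}

# Hand-written Python equivalent of "yyyy-MM-dd HH:mm:ssXXX" (assumption).
PYTHON_TIMESTAMP_FORMAT = "%Y-%m-%d %H:%M:%S%z"


def missing_fields(record: dict, schema: dict) -> list:
    """Return the schema fields that are absent from a record, sorted."""
    return sorted(set(schema) - set(record))


def event_time(record: dict, source: dict) -> datetime:
    """Parse the event time of a record using the source's timestamp field."""
    raw = record[source["timestamp_field"]]
    return datetime.strptime(raw, PYTHON_TIMESTAMP_FORMAT)


# Invented sample message, shaped like the schema above.
record = {
    "user_id": "u-123",
    "activity_type": "click",
    "timestamp": "2021-01-01 10:00:00+05:30",
}
parsed = event_time(record, kafka_source)
```

Here `parsed` is a timezone-aware `datetime`, which is the kind of value a watermark such as `watermark_delay_threshold` would be compared against.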

### Special notes on attributes

* `read_configs`: The standard set of read configurations supported when registering a data source. Users will also be able to provide some of these configurations at the time of feature registration; this option is not enabled yet, but support is planned.

## Working with Data Sources

Once a Data Source has been defined, it can be:

1. Registered for re-usability across different teams
2. Referenced to create ML features

### Example objects of Data Sources

* [S3 Data Source](https://github.com/Yugen-ai/gru/blob/c01d1f124605d927bc45312cf86fc3c232fc680a/gru/examples/s3_data_source.py#L32-L48)
* [Kafka Data Source](https://github.com/Yugen-ai/gru/blob/c01d1f124605d927bc45312cf86fc3c232fc680a/gru/examples/kafka_source.py#L5-L39)


