Fraud Analyst Agent

The Fraud Analyst Agent is an AI-powered assistant designed to support Data Scientists and Data Engineers in their daily fraud analysis tasks. It comes equipped with capabilities such as data reconciliation, analysis, and explanation, helping streamline workflows and improve efficiency. With more capabilities coming soon, the Fraud Analyst Agent aims to make fraud detection and investigation easier than ever.

Fraud Analyst Agent using Canso AI Agentic System

The Canso AI Agentic System enables fast and seamless development and deployment of a Fraud Analyst Agent.

Follow the steps below to develop, deploy and interact with a Fraud Analyst Agent.

Prerequisites

Before proceeding, please ensure you have:

  1. A Canso compatible Kubernetes cluster set up.

  2. Canso Helm charts installed on your cluster.

  3. Canso AI Agent Components - Broker, Checkpoint DB and Task Server - set up on your cluster.

To get started, install Gru by following the instructions here.
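A minimal install sketch, assuming Gru ships as the gru Python package pinned later in requirements.txt:

pip install gru==0.0.1rc22.dev16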

Creating the project bootstrap

Set up the scaffold folder for our fraud-analyst project by executing the command:

gru agent create_bootstrap

This command prompts for a set of configuration values for deploying the AI Agent. Provide inputs as specified below:

  [1/7] agent_name (Agent Name): fraud-analyst
  [2/7] agent_framework (Langgraph):
  [3/7] version (0.0.1):
  [4/7] task_server_name: <your-task-server-name>
  [5/7] checkpoint_db_name: <your-checkpoint-db-name>
  [6/7] replicas (1):
  [7/7] iam_role_arn: <agent-iam-role>
Agent bootstrap project created successfully!

Once done, we get a folder fraud-analyst with the following structure:

fraud-analyst
β”œβ”€β”€ Dockerfile
β”œβ”€β”€ README.md
β”œβ”€β”€ config.yaml
β”œβ”€β”€ .env
β”œβ”€β”€ requirements.txt
└── src
    └── main.py

Developing the agent

The requirements.txt file lists the Python dependencies of the project. Let us update it with the required dependencies:

gru==0.0.1rc22.dev16
python-dotenv==1.0.1
langchain-openai==0.2.14
pandas==2.2.3
boto3==1.35.95
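If you want to iterate on the agent locally before building the Docker image, you can install these dependencies into a virtual environment (a standard Python workflow, not a Canso-specific step):

python -m venv .venv && source .venv/bin/activate
pip install -r requirements.txt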

src/main.py serves as the entrypoint for our application. In this file, we define our AI Agent and wrap it with the CansoLanggraphAgent wrapper.

import os
from typing import Annotated, TypedDict
from dotenv import load_dotenv
from io import StringIO

import boto3
from gru.agents.tools.langgraph.python_arguments import PythonArgumentsTool
from gru.agents.tools.langgraph import GitRepoContentRetriever, PythonCodeRunner, PythonRunStatusChecker
from langgraph.graph.message import AnyMessage, add_messages
from langchain_core.runnables import Runnable, RunnableConfig
from langgraph.prebuilt import tools_condition
from langgraph.graph import END, StateGraph, START
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.tools import tool
from langgraph.prebuilt import ToolNode
from gru.agents import CansoLanggraphAgent
import pandas as pd

# This loads the environment variables from the .env file.
# Keep this as the first statement in main.py and do not remove it.
load_dotenv()

GITHUB_TOKEN = os.getenv("GITHUB_TOKEN")

git_content_retriever = GitRepoContentRetriever(GITHUB_TOKEN)

python_argument_tool = PythonArgumentsTool(GITHUB_TOKEN)

python_code_runner = PythonCodeRunner(GITHUB_TOKEN)

python_run_status_tool = PythonRunStatusChecker()
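
# The toolkit tools above drive the reconciliation flow: GitRepoContentRetriever
# fetches code from a Git repository, PythonArgumentsTool determines the arguments
# a script expects, PythonCodeRunner executes the script, and
# PythonRunStatusChecker reports the status of a submitted run.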

def data_explainer_report(df):
    """
    Generates a data explainer report for a Pandas DataFrame.
    Args:
        df (pd.DataFrame): The DataFrame to analyze.
    Returns:
        dict: A dictionary containing the data quality report.
    """
    report = {}
    report["describe_stats"] = df.describe().to_dict()
    report["missing_values"] = df.isnull().sum().to_dict()
    report["duplicates"] = df.duplicated().sum()
    report["counts"] = {
        "rows": len(df),
        "columns": len(df.columns)
    }
    report["basics"] = {
        "columns": list(df.columns)
    }
    return report
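
# For example, on a hypothetical toy frame:
#   data_explainer_report(pd.DataFrame({"amount": [10, 20, 20]}))
# returns something like:
#   {'describe_stats': {'amount': {...}}, 'missing_values': {'amount': 0},
#    'duplicates': 1, 'counts': {'rows': 3, 'columns': 1},
#    'basics': {'columns': ['amount']}}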

@tool
def check_data_quality(bucket: str, file_path: str) -> str:
    """ use this for data quality check. 
    Args:
        bucket : bucket in which the data is stored
        file_path: path of the file in the bucket
    """
    try:
        s3_client = boto3.client('s3')
        csv_obj = s3_client.get_object(Bucket=bucket, Key=file_path)
        df = pd.read_csv(StringIO(csv_obj['Body'].read().decode('utf-8')))

        report = data_explainer_report(df)

        response = []
        response.append(f"Data Quality Report for {file_path}:")
        response.append(f"- Total rows: {report['counts']['rows']}")
        response.append(f"- Total columns: {report['counts']['columns']}")
        response.append(f"- Number of duplicates: {report['duplicates']}")

        # Report missing values if any exist
        missing = {k: v for k, v in report['missing_values'].items() if v > 0}
        if missing:
            response.append("\nMissing values found in columns:")
            for col, count in missing.items():
                response.append(f"- {col}: {count} missing values")

        return "\n".join(response)

    except Exception as e:
        return f"Error checking data quality: {str(e)}"

safe_tools = [python_argument_tool, python_run_status_tool, check_data_quality]
sensitive_tools = [git_content_retriever, python_code_runner]

sensitive_tool_names = {tool.name for tool in sensitive_tools}

class State(TypedDict):
    messages: Annotated[list[AnyMessage], add_messages]

class Assistant:
    def __init__(self, runnable: Runnable):
        self.runnable = runnable

    def __call__(self, state: State, config: RunnableConfig):
        while True:
            result = self.runnable.invoke(state)
            # If the LLM happens to return an empty response, we will re-prompt it
            # for an actual response.
            if not result.tool_calls and (
                not result.content
                or isinstance(result.content, list)
                and not result.content[0].get("text")
            ):
                messages = state["messages"] + [("user", "Respond with a real output.")]
                state = {**state, "messages": messages}
            else:
                break
        return {"messages": result}

def route_tools(state: State):
    """Route to different tool nodes based on the tool being called."""
    next_node = tools_condition(state)
    # If no tools are invoked, return END
    if next_node == END:
        return END

    ai_message = state["messages"][-1]
    # Handle the first tool call (assuming single tool calls)
    first_tool_call = ai_message.tool_calls[0]

    # Route to sensitive tools if the tool name is in sensitive_tool_names
    if first_tool_call["name"] in sensitive_tool_names:
        return "sensitive_tools"
    return "safe_tools"

model = ChatOpenAI(model="gpt-4o", temperature=0)

assistant_prompt = ChatPromptTemplate.from_messages(
    [
        (
            "system",
            "You are a helpful assistant capable of performing reconciliation activity using the tools provided to you."
            "Always check the data quality and take confirmation from the user before reconciliation."
            "Reconciliation is performed by running python code from a repository."
            "Ask for necessary inputs from the user whenever necessary."
        ),
        ("placeholder", "{messages}"),
    ]
)

all_tools = safe_tools + sensitive_tools
assistant_runnable = assistant_prompt | model.bind_tools(all_tools)

graph = StateGraph(State)

graph.add_node("assistant", Assistant(assistant_runnable))
graph.add_node("safe_tools", ToolNode(safe_tools))
graph.add_node("sensitive_tools", ToolNode(sensitive_tools))

graph.add_edge(START, "assistant")
graph.add_conditional_edges(
    "assistant",
    route_tools,
    ["safe_tools", "sensitive_tools", END]
)
graph.add_edge("safe_tools", "assistant")
graph.add_edge("sensitive_tools", "assistant")

canso_agent = CansoLanggraphAgent(stateGraph=graph, interrupt_before=["sensitive_tools"])
canso_agent.run()
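The wiring above runs START to the assistant, routes tool calls to either the safe or the sensitive tool node (pausing for human approval before sensitive ones), and loops tool results back to the assistant until no further tools are called. As an optional local sanity check, assuming a recent langgraph version, you can render this wiring with LangGraph's built-in graph helpers; the Canso wrapper manages the actual runtime graph:

# Optional, for local inspection only; not part of the deployed agent.
compiled = graph.compile()
print(compiled.get_graph().draw_mermaid())  # prints a Mermaid diagram of nodes and edges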

Add the necessary environment variables to the .env file; the load_dotenv() call at the top of src/main.py loads them at startup.

GITHUB_TOKEN=<your-github-token>
OPENAI_API_KEY=<your-open-ai-api-key>

Now we build the Docker image for our Agent using the provided Dockerfile and push it to your container registry.

docker build -t <your-account>/fraud-analyst:0.0.1 .
docker push <your-account>/fraud-analyst:0.0.1
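If the registry is private, authenticate with the standard Docker CLI first (the registry host below is a placeholder); the image pull secret used during registration lets the cluster pull the same image:

docker login <your-registry>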

Registering and Deploying the agent

Run the following commands to register and deploy the agent in your cluster.

# Register agent
gru agent register . --cluster-name <name_of_your_cluster> --image <your-account>/fraud-analyst:0.0.1 --image_pull_secret <image_pull_secret>

# Deploy agent
gru agent deploy fraud-analyst

Congratulations! The Fraud Analyst Agent is successfully deployed and is ready to work for you!

Interacting with the agent

You can now interact with the agent using the gru agent converse <agent-name> command. Here's an example conversation with the Fraud Analyst Agent:

% gru agent converse fraud-analyst
Conversation ID: 3d1d9e67
User: hi
Agent: Hello! How can I assist you today?
User: perform reconciliation process using code from repository 
Agent: I'll be calling the tool get_git_repo_contents with the following arguments:
repository: 
Do you approve of this action? Type 'y' to continue; otherwise, explain your requested changes.
User: y
Agent: To perform the reconciliation process using the `recon.py` script from the repository, I need the following inputs from you:

1. **Source Bucket**: The name of the source bucket.
2. **Partner Report**: The path to the partner report file.
3. **Downloads TPAT**: The path to the downloads TPAT file.
4. **Conversion Attempts**: The path to the conversion attempts file.
5. **Destination Bucket**: The name of the destination bucket.
6. **Result Path**: The path where the result should be stored.
7. **Analysis Path**: The path for the analysis output.

Please provide these details so I can proceed with running the reconciliation process.
User: 
Agent: I'll be calling the tool run_python_code with the following arguments:
repository: 
file_path_to_execute: 
arguments: 
run_id: reconciliation_run_001
Do you approve of this action? Type 'y' to continue; otherwise, explain your requested changes.
User: y
Agent: The reconciliation process has been initiated. You can check the status of the run using the run ID `reconciliation_run_001`. If you need further assistance or want to check the status now, feel free to ask!
User: what is the status?
Agent: The reconciliation process has been successfully completed. If you need any further assistance or have any questions, feel free to ask!
User: good
Agent: Great! If you have any more questions or need further assistance in the future, don't hesitate to reach out. Have a wonderful day!