The Fraud Analyst Agent is an AI-powered assistant designed to support Data Scientists and Data Engineers in their daily fraud analysis tasks. It comes equipped with capabilities such as data reconciliation, analysis, and explanation, helping streamline workflows and improve efficiency. With more capabilities coming soon, the Fraud Analyst Agent aims to make fraud detection and investigation easier than ever.
Fraud Analyst Agent using Canso AI Agentic System
The Canso AI Agentic System enables fast and seamless development and deployment of a Fraud Analyst Agent.
Follow the steps below to develop, deploy and interact with a Fraud Analyst Agent.
src/main.py serves a the entrypoint for our application. In this file, we define our AI Agent and wrap it with the CansoLangraphAgent wrapper.
import os
from typing import Annotated, TypedDict
from dotenv import load_dotenv
from io import StringIO
import boto3
from gru.agents.tools.langgraph.python_arguments import PythonArgumentsTool
from gru.agents.tools.langgraph import GitRepoContentRetriever, PythonCodeRunner, PythonRunStatusChecker
from langgraph.graph.message import AnyMessage, add_messages
from langchain_core.runnables import Runnable, RunnableConfig
from langgraph.prebuilt import tools_condition
from langgraph.graph import END, StateGraph, START
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.tools import tool
from langgraph.prebuilt import ToolNode
from gru.agents import CansoLanggraphAgent
import pandas as pd
# This loads the environment variables from .env file.
# It is recommended to keep this as the first statement in main.py and not to be removed.
load_dotenv()
GITHUB_TOKEN = os.getenv("GITHUB_TOKEN")
git_content_retriever = GitRepoContentRetriever(GITHUB_TOKEN)
python_argument_tool = PythonArgumentsTool(GITHUB_TOKEN)
python_code_runner = PythonCodeRunner(GITHUB_TOKEN)
python_run_status_tool = PythonRunStatusChecker()
def data_explainer_report(df):
"""
Generates a data explainer report for a Pandas DataFrame.
Args:
df (pd.DataFrame): The DataFrame to analyze.
Returns:
dict: A dictionary containing the data quality report.
"""
report = {}
report["describe_stats"] = df.describe().to_dict()
report["missing_values"] = df.isnull().sum().to_dict()
report["duplicates"] = df.duplicated().sum()
report["counts"] = {
"rows": len(df),
"columns": len(df.columns)
}
report["basics"] = {
"columns": list(df.columns)
}
return report
@tool
def check_data_quality(bucket: str, file_path: str) -> str:
""" use this for data quality check.
Args:
bucket : bucket in which the data is stored
file_path: path of the file in the bucket
"""
try:
s3_client = boto3.client('s3')
csv_obj = s3_client.get_object(Bucket=bucket, Key=file_path)
df = pd.read_csv(StringIO(csv_obj['Body'].read().decode('utf-8')))
report = data_explainer_report(df)
response = []
response.append(f"Data Quality Report for {file_path}:")
response.append(f"- Total rows: {report['counts']['rows']}")
response.append(f"- Total columns: {report['counts']['columns']}")
response.append(f"- Number of duplicates: {report['duplicates']}")
# Report missing values if any exist
missing = {k: v for k, v in report['missing_values'].items() if v > 0}
if missing:
response.append("\nMissing values found in columns:")
for col, count in missing.items():
response.append(f"- {col}: {count} missing values")
return "\n".join(response)
except Exception as e:
return f"Error checking data quality: {str(e)}"
safe_tools = [python_argument_tool, python_run_status_tool, check_data_quality]
sensitive_tools = [git_content_retriever, python_code_runner]
sensitive_tool_names = {tool.name for tool in sensitive_tools}
class State(TypedDict):
messages: Annotated[list[AnyMessage], add_messages]
class Assistant:
def __init__(self, runnable: Runnable):
self.runnable = runnable
def __call__(self, state: State, config: RunnableConfig):
while True:
result = self.runnable.invoke(state)
# If the LLM happens to return an empty response, we will re-prompt it
# for an actual response.
if not result.tool_calls and (
not result.content
or isinstance(result.content, list)
and not result.content[0].get("text")
):
messages = state["messages"] + [("user", "Respond with a real output.")]
state = {**state, "messages": messages}
else:
break
return {"messages": result}
def route_tools(state: State):
"""Route to different tool nodes based on the tool being called."""
next_node = tools_condition(state)
# If no tools are invoked, return END
if next_node == END:
return END
ai_message = state["messages"][-1]
# Handle the first tool call (assuming single tool calls)
first_tool_call = ai_message.tool_calls[0]
# Route to sensitive tools if the tool name is in sensitive_tool_names
if first_tool_call["name"] in sensitive_tool_names:
return "sensitive_tools"
return "safe_tools"
model = ChatOpenAI(model="gpt-4o", temperature=0)
assistant_prompt = ChatPromptTemplate.from_messages(
[
(
"system",
"You are a helpful assistant capable of performing reconciliation activity using the tools provided to you."
"Always check the data quality and take confirmation from the user before reconciliation."
"Reconciliation is performed by running python code from a repository."
"Ask for necessary inputs from the user whenever necessary."
),
("placeholder", "{messages}"),
]
)
all_tools = safe_tools + sensitive_tools
assistant_runnable = assistant_prompt | model.bind_tools(all_tools)
graph = StateGraph(State)
graph.add_node("assistant", Assistant(assistant_runnable))
graph.add_node("safe_tools", ToolNode(safe_tools))
graph.add_node("sensitive_tools", ToolNode(sensitive_tools))
graph.add_edge(START, "assistant")
graph.add_conditional_edges(
"assistant",
route_tools,
["safe_tools", "sensitive_tools", END]
)
graph.add_edge("safe_tools", "assistant")
graph.add_edge("sensitive_tools", "assistant")
canso_agent = CansoLanggraphAgent(stateGraph=graph, interrupt_before=["sensitive_tools"])
canso_agent.run()
Add the necessary environment variables to the .env file.
Congratulations! The Fraud Analyst Agent is successfully deployed and is ready to work for you!
Interacting with the agent
You now interact with the agent using the gru agent converse <agent-name> command. Here's an example conversation with the Fraud Analyst Agent:
% gru agent converse fraud-analyst
Conversation ID: 3d1d9e67
User: hi
Agent: Hello! How can I assist you today?
User: perform reconciliation process using code from repository
Agent: I'll be calling the tool get_git_repo_contents with following arguments:
repository:
Do you approve of this action? Type 'y' to continue; otherwise, explain your requested changed.
User: y
Agent: To perform the reconciliation process using the `recon.py` script from the repository, I need the following inputs from you:
1. **Source Bucket**: The name of the source bucket.
2. **Partner Report**: The path to the partner report file.
3. **Downloads TPAT**: The path to the downloads TPAT file.
4. **Conversion Attempts**: The path to the conversion attempts file.
5. **Destination Bucket**: The name of the destination bucket.
6. **Result Path**: The path where the result should be stored.
7. **Analysis Path**: The path for the analysis output.
Please provide these details so I can proceed with running the reconciliation process.
User:
Agent: I'll be calling the tool run_python_code with following arguments:
repository:
file_path_to_execute:
arguments:
run_id: reconciliation_run_001
Do you approve of this action? Type 'y' to continue; otherwise, explain your requested changed.
User: y
Agent: The reconciliation process has been initiated. You can check the status of the run using the run ID `reconciliation_run_001`. If you need further assistance or want to check the status now, feel free to ask!
User: what is the status?
Agent: The reconciliation process has been successfully completed. If you need any further assistance or have any questions, feel free to ask!
User: good
Agent: Great! If you have any more questions or need further assistance in the future, don't hesitate to reach out. Have a wonderful day!