Canso - ML Platform
Agent with Memory

This guide will walk you through setting up and deploying an AI agent with memory capabilities using the Canso AI Agentic System. We'll create a memory-agent that can store and retrieve information from a vector database, enhancing its ability to maintain context across interactions.
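
To make "store and retrieve information from a vector database" concrete, here is a toy sketch of similarity-based retrieval in plain Python. It is not how Milvus or CansoMemory is implemented. The three-dimensional vectors are hand-written stand-ins for real embeddings, and the names `cosine_similarity` and `retrieve` are ours, chosen for illustration only.

```python
import math

def cosine_similarity(a, b):
    """Cosine of the angle between two equal-length vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        return 0.0  # treat a zero vector as similar to nothing
    return dot / (norm_a * norm_b)

def retrieve(query_vector, memory, top_k=1):
    """Return the top_k stored texts, most similar to the query first."""
    ranked = sorted(
        memory,
        key=lambda item: cosine_similarity(query_vector, item["vector"]),
        reverse=True,
    )
    return [item["text"] for item in ranked[:top_k]]

# Hypothetical embeddings: a real system would get these from an embedding model.
memory = [
    {"text": "support policy", "vector": [0.9, 0.1, 0.0]},
    {"text": "refund policy", "vector": [0.1, 0.9, 0.0]},
]
best_match = retrieve([0.8, 0.2, 0.0], memory)
```

The query vector points in nearly the same direction as the first stored vector, so that entry ranks first. A production vector database applies the same idea with approximate indexes over much larger vectors.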

Prerequisites

Before proceeding, ensure you have:

  1. A Canso compatible Kubernetes cluster set up

  2. Canso Helm charts installed on your cluster

  3. Gru CLI installed (follow the instructions here)

Setting Up Components

Our memory-agent requires several components:

  • A Vector Database for memory storage

  • A Broker for task orchestration

  • A Checkpoint DB for saving agent state

  • A Task Server for executing long-running tasks

First, create a file named config.yaml with the following configuration:

vector_db:
  type: milvus
  name: my-vector-db
  size: 4Gi
  image_pull_secret: docker-secret-cred-agents

broker:
  type: redis
  name: my-redis

checkpoint_db:
  type: postgres
  name: my-postgres
  size: 4Gi

task_server:
  type: celery
  name: my-task-server
  replicas: 1
  concurrency_per_replica: 1
  broker_resource_name: my-redis
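
In this configuration, `broker_resource_name` under `task_server` carries the same value as `name` under `broker`, so it appears to be a reference to the broker by name. The sketch below checks that the reference resolves before you run the setup command. It mirrors the file as a Python dict to stay within the standard library, and the helper `check_broker_reference` is hypothetical, not part of the Gru CLI.

```python
# Mirror of config.yaml as a plain dict (parsing YAML would need a third-party library).
config = {
    "vector_db": {"type": "milvus", "name": "my-vector-db", "size": "4Gi",
                  "image_pull_secret": "docker-secret-cred-agents"},
    "broker": {"type": "redis", "name": "my-redis"},
    "checkpoint_db": {"type": "postgres", "name": "my-postgres", "size": "4Gi"},
    "task_server": {"type": "celery", "name": "my-task-server", "replicas": 1,
                    "concurrency_per_replica": 1, "broker_resource_name": "my-redis"},
}

def check_broker_reference(cfg):
    """Return a list of problems; an empty list means the reference resolves."""
    problems = []
    wanted = cfg["task_server"]["broker_resource_name"]
    actual = cfg["broker"]["name"]
    if wanted != actual:
        problems.append(
            f"task_server expects broker {wanted!r}, but the broker is named {actual!r}"
        )
    return problems

problems = check_broker_reference(config)
```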

Next, run the following command to set up all components:

gru component setup --cluster-name <name_of_your_cluster> --config-file config.yaml

Creating the Project Bootstrap

Generate the scaffold for our project:

gru agent create_bootstrap

When prompted, provide the following inputs:

agent_name (Agent Name): memory-agent
agent_framework (Langgraph): Langgraph
task_server_name: my-task-server
checkpoint_db_name: my-postgres
replicas (1): 1
vector_db_name: my-vector-db
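
The component names entered at these prompts match the names declared in the components config.yaml. A typo here would presumably only surface at deploy time, so a quick comparison can help. The sketch below is a hypothetical pre-flight check and not something the Gru CLI provides. It assumes each prompt is named after its component key plus a `_name` suffix, as the prompts above suggest.

```python
# Component names as declared in the components config.yaml.
component_names = {
    "task_server": "my-task-server",
    "checkpoint_db": "my-postgres",
    "vector_db": "my-vector-db",
}

# Answers typed at the bootstrap prompts.
bootstrap_answers = {
    "task_server_name": "my-task-server",
    "checkpoint_db_name": "my-postgres",
    "vector_db_name": "my-vector-db",
}

def find_mismatches(components, answers):
    """Return (prompt, expected, given) tuples for answers that differ from the config."""
    mismatches = []
    for component, expected in components.items():
        prompt = f"{component}_name"
        given = answers.get(prompt)
        if given != expected:
            mismatches.append((prompt, expected, given))
    return mismatches

mismatches = find_mismatches(component_names, bootstrap_answers)
```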

This creates a folder named memory-agent with the following structure:

memory-agent/
├── .dockerignore
├── .env
├── Dockerfile
├── README.md
├── config.yaml
├── requirements.txt
└── src/
    └── main.py

Updating Dependencies

Update requirements.txt with all necessary dependencies:

gru==0.0.1rc22.dev16
python-dotenv==1.0.1
langchain-openai==0.2.14
openai==1.14.1
boto3==1.35.95

Developing the Memory-Enabled Agent

Replace the content of src/main.py with the following code:

import os
from typing import Annotated, TypedDict
from dotenv import load_dotenv
from gru.agents.tools.langgraph.python_arguments import PythonArgumentsTool
from gru.agents.tools.langgraph import GitRepoContentRetriever, PythonCodeRunner, PythonRunStatusChecker, DataQualityTool, MemoryRetrievalTool
from langgraph.graph.message import AnyMessage, add_messages
from langchain_core.runnables import Runnable, RunnableConfig
from langgraph.prebuilt import tools_condition
from langgraph.graph import END, StateGraph, START
from langchain_openai import ChatOpenAI
from openai import OpenAI
from langchain_core.prompts import ChatPromptTemplate
from langgraph.prebuilt import ToolNode
from gru.agents import CansoLanggraphAgent
from gru.agents import CansoMemory
from gru.agents.tools.core.vector_db.vectordb_factory import VectorDBType
from gru.agents.tools.core.embeddings.embedding_factory import EmbeddingType

# Load the environment variables from the .env file.
# Keep this as the first statement in main.py and do not remove it.
load_dotenv()

GITHUB_TOKEN = os.getenv("GITHUB_TOKEN")
OPEN_AI_KEY = os.getenv("OPENAI_API_KEY", "")

# Initialize OpenAI client for embeddings
embedding_client = OpenAI()

# Initialize CansoMemory with embedding client
memory = CansoMemory(
    client=embedding_client, 
    embedding_type=EmbeddingType.OPENAI, 
    vector_db_type=VectorDBType.MILVUS
)

# Initialize the LLM
model = ChatOpenAI(model="gpt-4o", temperature=0)

# Initialize tools
git_content_retriever = GitRepoContentRetriever(GITHUB_TOKEN)
python_argument_tool = PythonArgumentsTool(GITHUB_TOKEN)
python_code_runner = PythonCodeRunner(GITHUB_TOKEN)
python_run_status_tool = PythonRunStatusChecker()
check_data_quality = DataQualityTool()
memory_retrieval_tool = MemoryRetrievalTool(memory=memory)

# Group tools by sensitivity
safe_tools = [python_argument_tool, python_run_status_tool, check_data_quality, memory_retrieval_tool]
sensitive_tools = [git_content_retriever, python_code_runner]

sensitive_tool_names = {tool.name for tool in sensitive_tools}

class State(TypedDict):
    messages: Annotated[list[AnyMessage], add_messages]

class Assistant:
    def __init__(self, runnable: Runnable):
        self.runnable = runnable

    def __call__(self, state: State, config: RunnableConfig):
        while True:
            result = self.runnable.invoke(state)
            # If the LLM happens to return an empty response, we will re-prompt it
            # for an actual response.
            if not result.tool_calls and (
                not result.content
                or isinstance(result.content, list)
                and not result.content[0].get("text")
            ):
                messages = state["messages"] + [("user", "Respond with a real output.")]
                state = {**state, "messages": messages}
            else:
                break
        return {"messages": result}

def route_tools(state: State):
    """Route to different tool nodes based on the tool being called."""
    next_node = tools_condition(state)
    # If no tools are invoked, return END
    if next_node == END:
        return END

    ai_message = state["messages"][-1]
    # Handle the first tool call (assuming single tool calls)
    first_tool_call = ai_message.tool_calls[0]

    # Route to sensitive tools if the tool name is in sensitive_tool_names
    if first_tool_call["name"] in sensitive_tool_names:
        return "sensitive_tools"
    return "safe_tools"

# Define the system prompt
assistant_prompt = ChatPromptTemplate.from_messages(
    [
        (
            "system",
            "You are a helpful assistant with memory capabilities. You can store and retrieve information "
            "from your memory to maintain context across conversations. "
            "Use the query_memory tool to search for relevant information stored in your memory. "
            "You can analyze data quality, run code from repositories, and perform other advanced tasks "
            "using the provided tools. Always ask for confirmation before running sensitive operations."
        ),
        ("placeholder", "{messages}"),
    ]
)

# Combine all tools and create the runnable
all_tools = safe_tools + sensitive_tools
assistant_runnable = assistant_prompt | model.bind_tools(all_tools)

# Create the graph
graph = StateGraph(State)

graph.add_node("assistant", Assistant(assistant_runnable))
graph.add_node("safe_tools", ToolNode(safe_tools))
graph.add_node("sensitive_tools", ToolNode(sensitive_tools))

graph.add_edge(START, "assistant")
graph.add_conditional_edges(
    "assistant",
    route_tools,
    ["safe_tools", "sensitive_tools", END]
)
graph.add_edge("safe_tools", "assistant")
graph.add_edge("sensitive_tools", "assistant")

# Initialize the Canso agent with memory
canso_agent = CansoLanggraphAgent(
    stateGraph=graph, 
    interrupt_before=["sensitive_tools"], 
    memory=memory
)
canso_agent.run()
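
The routing decision in route_tools can be hard to follow inside the full graph, so here is a standalone sketch of the same logic without LangGraph. The sensitive tool names below are hypothetical stand-ins for the `name` attribute of each tool object, `query_memory` is the tool name mentioned in the system prompt, and the string "END" stands in for LangGraph's END constant.

```python
# Hypothetical names; in main.py these come from `tool.name` on each sensitive tool.
SENSITIVE_TOOL_NAMES = {"git_repo_content_retriever", "python_code_runner"}

def route(tool_calls):
    """Pick the next node from the tool calls on the assistant's last message."""
    if not tool_calls:
        return "END"  # no tool requested: the turn is finished
    first_tool_call = tool_calls[0]
    if first_tool_call["name"] in SENSITIVE_TOOL_NAMES:
        return "sensitive_tools"
    return "safe_tools"

next_for_memory_lookup = route([{"name": "query_memory"}])
next_for_code_run = route([{"name": "python_code_runner"}])
```

Because only the first tool call is inspected, a message that requests a safe tool followed by a sensitive one would be routed to safe_tools in this sketch. The comment in main.py states the same single-tool-call assumption.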

Setting Up Environment Variables

Create or update the .env file with the following content:

OPENAI_API_KEY=<your_openai_api_key>
GITHUB_TOKEN=<your_github_token>
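
Since main.py reads both variables at startup, a missing value is easier to catch before building the image. The following sketch is a hypothetical helper, not part of the generated project, that reports which of the two variables are unset or empty. It uses only the standard library and accepts any mapping, so it can be checked without touching the real environment.

```python
import os

# The two variables this guide places in .env.
REQUIRED_VARS = ("OPENAI_API_KEY", "GITHUB_TOKEN")

def missing_env_vars(env=os.environ, required=REQUIRED_VARS):
    """Return the names of required variables that are unset or empty."""
    return [name for name in required if not env.get(name)]

# Example with an explicit mapping instead of the process environment.
example_missing = missing_env_vars({"OPENAI_API_KEY": "sk-example"})
```

Calling `missing_env_vars()` with no arguments after `load_dotenv()` checks the actual process environment.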

Building and Pushing the Docker Image

Build the Docker image and push it to your container registry:

docker build -t <your-account>/memory-agent:0.0.1 .
docker push <your-account>/memory-agent:0.0.1

Registering and Deploying the Agent

Register and deploy your agent with:

# Register agent
gru agent register . --cluster-name <name_of_your_cluster> --image <your-account>/memory-agent:0.0.1 --image_pull_secret <image_pull_secret>

# Deploy agent
gru agent deploy memory-agent

Storing Initial Memory Data

Before interacting with your agent, let's store some initial data in its memory.

Create a JSON file named customer_data.json:

{
  "collection_name": "canso_domain_knowledge",
  "text": "Customer support policy: All premium customers receive priority support with a response time of maximum 4 hours. Standard customers receive support within 24 hours. For technical issues, we offer screen sharing sessions for premium customers. Refunds are processed within 7 business days.",
  "data": {
    "domain": "customer_support",
    "tags": ["support", "policy", "premium", "standard"],
    "metadata": {
      "version": "1.2",
      "last_updated": "2025-02-10",
      "created_by": "policy-team",
      "criticality": "high"
    }
  }
}
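
If you generate memory files from existing records rather than writing them by hand, a small script can produce the same structure. The sketch below renders a payload as JSON text and refuses payloads that lack the top-level keys seen in the example. Treating those keys as required is our assumption, and `render_memory_file` is a hypothetical helper. The policy text is shortened here for brevity.

```python
import json

# Top-level keys seen in the example file; treating them as required is an assumption.
EXPECTED_KEYS = ("collection_name", "text", "data")

def render_memory_file(payload):
    """Return the payload as indented JSON, or raise if an expected key is absent."""
    missing = [key for key in EXPECTED_KEYS if key not in payload]
    if missing:
        raise ValueError(f"payload is missing keys: {missing}")
    return json.dumps(payload, indent=2)

payload = {
    "collection_name": "canso_domain_knowledge",
    "text": "Customer support policy: All premium customers receive priority "
            "support with a response time of maximum 4 hours.",
    "data": {
        "domain": "customer_support",
        "tags": ["support", "policy", "premium", "standard"],
        "metadata": {"version": "1.2", "last_updated": "2025-02-10",
                     "created_by": "policy-team", "criticality": "high"},
    },
}
file_contents = render_memory_file(payload)
```

Writing `file_contents` to customer_data.json gives a file with the same shape as the one above.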

Insert this data into the agent's memory:

gru agent memory insert --agent-name memory-agent --file customer_data.json

This command will return the name of the memory where the data was inserted, which you can use for update and delete operations.

Memory registration request queued successfully with memory name = 8a0a3e62-de36-403f-a094-606c15f5a54f
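
When scripting these steps, you may want to capture the memory name from that output instead of copying it by hand. The sketch below pulls it out with a regular expression. It assumes the output line keeps the wording shown above and that the name is a 36-character UUID-style string. `extract_memory_name` is a hypothetical helper, not a Gru CLI feature.

```python
import re

# Matches the UUID-style name that follows "memory name = " in the CLI output.
MEMORY_NAME_PATTERN = re.compile(r"memory name = ([0-9a-fA-F-]{36})")

def extract_memory_name(cli_output):
    """Return the memory name found in the output, or None if there is none."""
    match = MEMORY_NAME_PATTERN.search(cli_output)
    return match.group(1) if match else None

output = (
    "Memory registration request queued successfully with memory name = "
    "8a0a3e62-de36-403f-a094-606c15f5a54f"
)
memory_name = extract_memory_name(output)
```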

Updating Existing Data in Memory

You can update data you previously stored by referencing the memory name returned by the insert command.

Create a JSON file named customer_update_data.json:

{
  "collection_name": "canso_domain_knowledge",
  "memory_name": "8a0a3e62-de36-403f-a094-606c15f5a54f",
  "text": "Customer support policy: All premium customers receive priority support with a response time of maximum 3 hours. Standard customers receive support within 48 hours. For technical issues, we offer screen sharing sessions and dedicated agents for premium customers. Refunds are processed within 5 business days.",
  "data": {
    "domain": "customer_success",
    "tags": ["support", "policy", "premium", "standard", "refund"],
    "metadata": {
      "version": "1.3",
      "last_updated": "2025-03-15",
      "created_by": "success-team",
      "criticality": "critical"
    }
  }
}

Update this data in the agent's memory:

gru agent memory update --agent-name memory-agent --file customer_update_data.json
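
The update file has the same shape as the insert file plus a `memory_name` field. If you keep the insert payload around, you can derive the update payload from it rather than maintaining two files by hand. The following is a minimal sketch with a hypothetical helper, `build_update_payload`. It copies the original so the insert payload is left untouched.

```python
import copy

def build_update_payload(insert_payload, memory_name, text=None, data=None):
    """Copy the insert payload, attach the memory name, and apply any overrides."""
    payload = copy.deepcopy(insert_payload)
    payload["memory_name"] = memory_name
    if text is not None:
        payload["text"] = text
    if data is not None:
        payload["data"] = data
    return payload

insert_payload = {
    "collection_name": "canso_domain_knowledge",
    "text": "Refunds are processed within 7 business days.",
    "data": {"domain": "customer_support"},
}
update_payload = build_update_payload(
    insert_payload,
    memory_name="8a0a3e62-de36-403f-a094-606c15f5a54f",
    text="Refunds are processed within 5 business days.",
)
```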

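Deleting Existing Data from Memory

You can also delete previously stored entries from memory. The command below reads the entry to delete from a JSON file, here named customer_delete_data.json. Its contents are not shown in this guide; the Memory Examples page under Additional Resources is the place to check for the expected format.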

gru agent memory delete --agent-name memory-agent --file customer_delete_data.json

Interacting with the Memory-Enabled Agent

Start a conversation with your agent:

gru agent converse memory-agent

Example interaction:

Conversation ID: abcd1234
User: What do you know about our customer support policies?

Agent: I'll check my memory for information about customer support policies.

I've found some relevant information in my knowledge base about our customer support policies:

We have different service levels based on customer type:
- Premium customers receive priority support with a maximum response time of 4 hours
- Standard customers receive support within 24 hours
- Premium customers are eligible for screen sharing sessions for technical issues
- All refunds are processed within 7 business days

This information was last updated on February 10, 2025 (version 1.2) and is considered high-criticality business information.

Is there anything specific about these policies you'd like me to elaborate on?

User: Can you also run a data quality check on our customer database?

Agent: I'll be calling the tool check_data_quality with following arguments:
bucket: customer-data-bucket
file_path: customers.csv
Do you approve of this action? Type 'y' to continue; otherwise, explain your requested changed.

User: y

Agent: [Results of data quality check would appear here]

Additional Resources

  • Canso compatible Kubernetes cluster

  • Canso Helm charts

  • Gru CLI installation instructions

  • Memory API Reference

  • Memory Examples

  • SQL Runner Tool