mirror of
https://github.com/cpacker/MemGPT.git
synced 2025-06-03 04:30:22 +00:00
303 lines
13 KiB
Python
303 lines
13 KiB
Python
import json
|
|
|
|
import pytest
|
|
|
|
from letta import LocalClient, create_client
|
|
from letta.functions.functions import derive_openai_json_schema, parse_source_code
|
|
from letta.schemas.embedding_config import EmbeddingConfig
|
|
from letta.schemas.letta_message import SystemMessage, ToolReturnMessage
|
|
from letta.schemas.llm_config import LLMConfig
|
|
from letta.schemas.memory import ChatMemory
|
|
from letta.schemas.tool import Tool
|
|
from letta.services.agent_manager import AgentManager
|
|
from tests.helpers.utils import retry_until_success
|
|
from tests.utils import wait_for_incoming_message
|
|
|
|
|
|
@pytest.fixture(scope="function")
def client():
    """Yield a client from ``create_client()`` with default configs applied.

    The default LLM config is ``LLMConfig.default_config("gpt-4o")`` and the
    default embedding config is the ``openai`` provider default.
    """
    letta_client = create_client()

    llm_defaults = LLMConfig.default_config("gpt-4o")
    letta_client.set_default_llm_config(llm_defaults)

    embedding_defaults = EmbeddingConfig.default_config(provider="openai")
    letta_client.set_default_embedding_config(embedding_defaults)

    yield letta_client
|
|
|
|
|
|
@pytest.fixture(autouse=True)
def remove_stale_agents(client):
    """Delete the agents listed for the client's user (autouse setup step).

    At most 300 agents are listed per invocation (the ``limit`` passed to
    ``AgentManager.list_agents``); each listed agent is deleted via the client.
    """
    for leftover in AgentManager().list_agents(actor=client.user, limit=300):
        client.delete_agent(agent_id=leftover.id)
|
|
|
|
|
|
@pytest.fixture(scope="function")
def agent_obj(client: LocalClient):
    """Yield a loaded agent that has the `send_message_to_agent_and_wait_for_reply` tool attached."""
    reply_tool_id = client.get_tool_id(name="send_message_to_agent_and_wait_for_reply")
    created_state = client.create_agent(tool_ids=[reply_tool_id])

    loaded_agent = client.server.load_agent(agent_id=created_state.id, actor=client.user)
    yield loaded_agent

    # No teardown here: the `client.delete_agent(...)` call for this agent is disabled.
|
|
|
|
|
|
@pytest.fixture(scope="function")
def other_agent_obj(client: LocalClient):
    """Yield a second loaded agent, created without multi-agent tools, and delete it afterwards."""
    created_state = client.create_agent(include_multi_agent_tools=False)
    loaded_agent = client.server.load_agent(agent_id=created_state.id, actor=client.user)

    yield loaded_agent

    # Teardown: remove the agent once the test using it has finished.
    client.delete_agent(loaded_agent.agent_state.id)
|
|
|
|
|
|
@pytest.fixture
def roll_dice_tool(client):
    """Create (or update) a ``roll_dice`` tool on the server and yield the stored tool.

    The tool's JSON schema and name are derived from the source code of the
    inner ``roll_dice`` function, which always returns the same string.
    """

    # NOTE: the source text of this function (docstring included) is what gets
    # parsed into the tool's source code and schema, so it is kept verbatim.
    def roll_dice():
        """
        Rolls a 6 sided die.

        Returns:
            str: The roll result.
        """
        return "Rolled a 5!"

    dice_source = parse_source_code(roll_dice)
    dice_tool = Tool(
        description="test_description",
        tags=["test"],
        source_code=dice_source,
        source_type="python",
    )

    # The tool has no name yet; the schema derivation supplies one.
    schema = derive_openai_json_schema(source_code=dice_tool.source_code, name=dice_tool.name)
    schema_name = schema["name"]
    dice_tool.json_schema = schema
    dice_tool.name = schema_name

    stored_tool = client.server.tool_manager.create_or_update_tool(dice_tool, actor=client.user)

    yield stored_tool
|
|
|
|
|
|
@retry_until_success(max_attempts=5, sleep_time_seconds=2)
def test_send_message_to_agent(client, agent_obj, other_agent_obj):
    """Check that one agent can message another agent and sees the reply in its context.

    Steps:
      1. Ask the sender (``agent_obj``) to share a secret word with the
         receiver (``other_agent_obj``) using its tool.
      2. Assert the receiver's most recent ``SystemMessage`` contains the secret word.
      3. Require that one of the sender's in-context messages contains
         ``"<receiver id> said:"``.
      4. Send the sender one more user message and print the response.

    Raises:
        Exception: if no in-context message of the sender contains the target snippet.
    """
    secret_word = "banana"
    sender_id = agent_obj.agent_state.id
    receiver_id = other_agent_obj.agent_state.id

    # Encourage the agent to send a message to the other agent_obj with the secret string
    client.send_message(
        agent_id=sender_id,
        role="user",
        message=f"Use your tool to send a message to another agent with id {receiver_id} to share the secret word: {secret_word}!",
    )

    # Inspect the receiving agent's messages, newest first
    messages = client.get_messages(receiver_id)
    for m in reversed(messages):
        print(f"\n\n {receiver_id} -> {m.model_dump_json(indent=4)}")
        # Only the most recent system message is checked
        if isinstance(m, SystemMessage):
            assert secret_word in m.content
            break

    # Search the sender agent for the response from another agent
    in_context_messages = agent_obj.agent_manager.get_in_context_messages(agent_id=sender_id, actor=agent_obj.user)
    target_snippet = f"{receiver_id} said:"
    found = any(target_snippet in m.content[0].text for m in in_context_messages)

    # The join is done outside the f-string: reusing the enclosing quote character
    # (or a backslash) inside a replacement field is a SyntaxError before Python 3.12.
    in_context_text = "\n".join([m.content[0].text for m in in_context_messages[1:]])
    print(f"In context messages of the sender agent (without system):\n\n{in_context_text}")
    if not found:
        raise Exception(f"Was not able to find an instance of the target snippet: {target_snippet}")

    # Test that the agent can still receive messages fine
    response = client.send_message(agent_id=sender_id, role="user", message="So what did the other agent say?")
    print(response.messages)
|
|
|
|
|
|
@retry_until_success(max_attempts=5, sleep_time_seconds=2)
def test_send_message_to_agents_with_tags_simple(client):
    """Check that a tag-targeted broadcast reaches matching agents only.

    A manager agent with the `send_message_to_agents_matching_tags` tool is
    asked to send a secret word to agents tagged ``["worker", "user-456"]``.
    The test asserts that:
      * the first tool return lists one response per matching worker,
      * each matching worker's most recent ``SystemMessage`` contains the secret word,
      * no ``SystemMessage`` of the ``["worker", "user-123"]`` agents contains it.
    All agents created by the test are deleted at the end.
    """
    import ast  # local import: only needed to parse the tool return below

    worker_tags_123 = ["worker", "user-123"]
    worker_tags_456 = ["worker", "user-456"]

    # Clean up first from possibly failed tests (presumably including earlier
    # attempts re-run by the retry decorator). The filter must match ANY of the
    # tags: no worker carries every tag in the union, so requiring all of them
    # would match nothing and leftover workers would skew the count assertion.
    prev_worker_agents = client.server.agent_manager.list_agents(
        client.user, tags=list(set(worker_tags_123 + worker_tags_456)), match_all_tags=False
    )
    for agent in prev_worker_agents:
        client.delete_agent(agent.id)

    secret_word = "banana"

    # Create "manager" agent
    send_message_to_agents_matching_tags_tool_id = client.get_tool_id(name="send_message_to_agents_matching_tags")
    manager_agent_state = client.create_agent(name="manager_agent", tool_ids=[send_message_to_agents_matching_tags_tool_id])
    manager_agent = client.server.load_agent(agent_id=manager_agent_state.id, actor=client.user)

    # Create 2 non-matching worker agents (These should NOT get the message)
    worker_agents_123 = []
    for idx in range(2):
        worker_agent_state = client.create_agent(name=f"not_worker_{idx}", include_multi_agent_tools=False, tags=worker_tags_123)
        worker_agent = client.server.load_agent(agent_id=worker_agent_state.id, actor=client.user)
        worker_agents_123.append(worker_agent)

    # Create 2 worker agents that should get the message
    worker_agents_456 = []
    for idx in range(2):
        worker_agent_state = client.create_agent(name=f"worker_{idx}", include_multi_agent_tools=False, tags=worker_tags_456)
        worker_agent = client.server.load_agent(agent_id=worker_agent_state.id, actor=client.user)
        worker_agents_456.append(worker_agent)

    # Encourage the manager to broadcast the secret word to the matching workers
    response = client.send_message(
        agent_id=manager_agent.agent_state.id,
        role="user",
        message=f"Send a message to all agents with tags {worker_tags_456} informing them of the secret word: {secret_word}!",
    )

    for m in response.messages:
        if isinstance(m, ToolReturnMessage):
            # SECURITY NOTE: this used to be eval() on tool-return text, which would
            # execute arbitrary expressions. literal_eval only accepts Python literals.
            tool_response = ast.literal_eval(json.loads(m.tool_return)["message"])
            print(f"\n\nManager agent tool response: \n{tool_response}\n\n")
            assert len(tool_response) == len(worker_agents_456)

            # We can break after this, the ToolReturnMessage after is not related
            break

    # Inspect the matching worker agents' messages, newest first
    for agent in worker_agents_456:
        messages = client.get_messages(agent.agent_state.id)
        # Only the most recent system message is checked
        for m in reversed(messages):
            print(f"\n\n {agent.agent_state.id} -> {m.model_dump_json(indent=4)}")
            if isinstance(m, SystemMessage):
                assert secret_word in m.content
                break

    # Ensure it's NOT in the non matching worker agents
    for agent in worker_agents_123:
        messages = client.get_messages(agent.agent_state.id)
        # Every system message is checked here (no early break)
        for m in reversed(messages):
            print(f"\n\n {agent.agent_state.id} -> {m.model_dump_json(indent=4)}")
            if isinstance(m, SystemMessage):
                assert secret_word not in m.content

    # Test that the agent can still receive messages fine
    response = client.send_message(agent_id=manager_agent.agent_state.id, role="user", message="So what did the other agents say?")
    print("Manager agent followup message: \n\n" + "\n".join([str(m) for m in response.messages]))

    # Clean up agents
    client.delete_agent(manager_agent_state.id)
    for agent in worker_agents_456 + worker_agents_123:
        client.delete_agent(agent.agent_state.id)
|
|
|
|
|
|
@retry_until_success(max_attempts=5, sleep_time_seconds=2)
def test_send_message_to_agents_with_tags_complex_tool_use(client, roll_dice_tool):
    """Check a tag-targeted broadcast where the receiving agents have their own tool.

    A manager agent with the `send_message_to_agents_matching_tags` tool asks
    all agents tagged ``["dice-rollers"]`` (each created with the
    ``roll_dice_tool`` attached) to roll a dice. The test asserts that the
    first tool return lists one response per worker, then sends the manager a
    follow-up message and deletes all agents it created.
    """
    import ast  # local import: only needed to parse the tool return below

    worker_tags = ["dice-rollers"]

    # Clean up first from possibly failed tests
    prev_worker_agents = client.server.agent_manager.list_agents(client.user, tags=worker_tags, match_all_tags=True)
    for agent in prev_worker_agents:
        client.delete_agent(agent.id)

    # Create "manager" agent
    send_message_to_agents_matching_tags_tool_id = client.get_tool_id(name="send_message_to_agents_matching_tags")
    manager_agent_state = client.create_agent(tool_ids=[send_message_to_agents_matching_tags_tool_id])
    manager_agent = client.server.load_agent(agent_id=manager_agent_state.id, actor=client.user)

    # Create 2 worker agents, each with the dice tool attached
    worker_agents = []
    for _ in range(2):
        worker_agent_state = client.create_agent(include_multi_agent_tools=False, tags=worker_tags, tool_ids=[roll_dice_tool.id])
        worker_agent = client.server.load_agent(agent_id=worker_agent_state.id, actor=client.user)
        worker_agents.append(worker_agent)

    # Encourage the manager to ask every tagged worker to roll a dice
    broadcast_message = f"Send a message to all agents with tags {worker_tags} asking them to roll a dice for you!"
    response = client.send_message(
        agent_id=manager_agent.agent_state.id,
        role="user",
        message=broadcast_message,
    )

    for m in response.messages:
        if isinstance(m, ToolReturnMessage):
            # SECURITY NOTE: this used to be eval() on tool-return text, which would
            # execute arbitrary expressions. literal_eval only accepts Python literals.
            tool_response = ast.literal_eval(json.loads(m.tool_return)["message"])
            print(f"\n\nManager agent tool response: \n{tool_response}\n\n")
            assert len(tool_response) == len(worker_agents)

            # We can break after this, the ToolReturnMessage after is not related
            break

    # Test that the agent can still receive messages fine
    response = client.send_message(agent_id=manager_agent.agent_state.id, role="user", message="So what did the other agents say?")
    print("Manager agent followup message: \n\n" + "\n".join([str(m) for m in response.messages]))

    # Clean up agents
    client.delete_agent(manager_agent_state.id)
    for agent in worker_agents:
        client.delete_agent(agent.agent_state.id)
|
|
|
|
|
|
@retry_until_success(max_attempts=5, sleep_time_seconds=2)
def test_agents_async_simple(client):
    """
    Test two agents exchanging messages through `send_message_to_agent_async`.

    Two agents ("charles" and "sarah") are created with the async messaging tool.
    Charles is prompted to ask Sarah for a poem idea; the test then uses
    `wait_for_incoming_message` (up to 10 seconds per agent) and asserts that it
    reports the "[Incoming message from agent with ID" marker for both agents.
    """
    # Start from a clean slate: drop every agent listed for this user
    for leftover in client.server.agent_manager.list_agents(client.user):
        client.delete_agent(leftover.id)

    async_tool_id = client.get_tool_id(name="send_message_to_agent_async")

    # First agent: talks to the human and relays to its buddy
    charles_memory = ChatMemory(
        human="Chad - I'm interested in hearing poem.",
        persona="You are an AI agent that can communicate with your agent buddy using `send_message_to_agent_async`, who has some great poem ideas (so I've heard).",
    )
    charles_state = client.create_agent(name="charles", memory=charles_memory, tool_ids=[async_tool_id])
    charles = client.server.load_agent(agent_id=charles_state.id, actor=client.user)

    # Second agent: only talks to the other agent
    sarah_memory = ChatMemory(
        human="No human - you are to only communicate with the other AI agent.",
        persona="You are an AI agent that can communicate with your agent buddy using `send_message_to_agent_async`, who is interested in great poem ideas.",
    )
    sarah_state = client.create_agent(name="sarah", memory=sarah_memory, tool_ids=[async_tool_id])

    # Kick off the exchange by prompting the first agent
    initial_prompt = f"I want you to talk to the other agent with ID {sarah_state.id} using `send_message_to_agent_async`. Specifically, I want you to ask him for a poem idea, and then craft a poem for me."
    client.send_message(agent_id=charles.agent_state.id, role="user", message=initial_prompt)

    # Each agent, in turn, must end up with an incoming message from the other one
    incoming_marker = "[Incoming message from agent with ID"
    expectations = (
        (charles_state.id, "Charles never received the system message from Sarah (timed out)."),
        (sarah_state.id, "Sarah never received the system message from Charles (timed out)."),
    )
    for waiting_agent_id, timeout_message in expectations:
        received = wait_for_incoming_message(
            client=client,
            agent_id=waiting_agent_id,
            substring=incoming_marker,
            max_wait_seconds=10,
            sleep_interval=0.5,
        )
        assert received, timeout_message
|