import difflib import json import os from datetime import datetime, timezone from io import BytesIO from typing import Any, Dict, List, Mapping import pytest from fastapi.testclient import TestClient from rich.console import Console from rich.syntax import Syntax from letta import create_client from letta.config import LettaConfig from letta.orm import Base from letta.orm.enums import ToolType from letta.schemas.agent import AgentState, CreateAgent from letta.schemas.block import Block, CreateBlock from letta.schemas.embedding_config import EmbeddingConfig from letta.schemas.enums import MessageRole from letta.schemas.llm_config import LLMConfig from letta.schemas.message import MessageCreate from letta.schemas.organization import Organization from letta.schemas.user import User from letta.serialize_schemas.pydantic_agent_schema import AgentSchema from letta.server.rest_api.app import app from letta.server.server import SyncServer console = Console() def _clear_tables(): from letta.server.db import db_context with db_context() as session: for table in reversed(Base.metadata.sorted_tables): # Reverse to avoid FK issues session.execute(table.delete()) # Truncate table session.commit() @pytest.fixture def fastapi_client(): """Fixture to create a FastAPI test client.""" return TestClient(app) @pytest.fixture(autouse=True) def clear_tables(): _clear_tables() @pytest.fixture(scope="module") def local_client(): client = create_client() client.set_default_llm_config(LLMConfig.default_config("gpt-4o-mini")) client.set_default_embedding_config(EmbeddingConfig.default_config(provider="openai")) yield client @pytest.fixture(scope="module") def server(): config = LettaConfig.load() config.save() server = SyncServer(init_with_default_org_and_user=False) return server @pytest.fixture def default_organization(server: SyncServer): """Fixture to create and return the default organization.""" org = server.organization_manager.create_default_organization() yield org @pytest.fixture def default_user(server: SyncServer, default_organization): """Fixture to create and return the default user within the default organization.""" user = server.user_manager.create_default_user(org_id=default_organization.id) yield user @pytest.fixture def other_organization(server: SyncServer): """Fixture to create and return the default organization.""" org = server.organization_manager.create_organization(pydantic_org=Organization(name="letta")) yield org @pytest.fixture def other_user(server: SyncServer, other_organization): """Fixture to create and return the default user within the default organization.""" user = server.user_manager.create_user(pydantic_user=User(organization_id=other_organization.id, name="sarah")) yield user @pytest.fixture def weather_tool(local_client, weather_tool_func): weather_tool = local_client.create_or_update_tool(func=weather_tool_func) yield weather_tool @pytest.fixture def print_tool(local_client, print_tool_func): print_tool = local_client.create_or_update_tool(func=print_tool_func) yield print_tool @pytest.fixture def default_block(server: SyncServer, default_user): """Fixture to create and return a default block.""" block_data = Block( label="default_label", value="Default Block Content", description="A default test block", limit=1000, metadata={"type": "test"}, ) block = server.block_manager.create_or_update_block(block_data, actor=default_user) yield block @pytest.fixture def serialize_test_agent(server: SyncServer, default_user, default_organization, default_block, weather_tool): """Fixture to create and return a sample agent within the default organization.""" timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") f"serialize_test_agent_{timestamp}" server.tool_manager.upsert_base_tools(actor=default_user) memory_blocks = [CreateBlock(label="human", value="BananaBoy"), CreateBlock(label="persona", value="I am a helpful assistant")] create_agent_request = CreateAgent( system="test system", memory_blocks=memory_blocks, llm_config=LLMConfig.default_config("gpt-4o-mini"), embedding_config=EmbeddingConfig.default_config(provider="openai"), block_ids=[default_block.id], tool_ids=[weather_tool.id], tags=["a", "b"], description="test_description", metadata={"test_key": "test_value"}, initial_message_sequence=[MessageCreate(role=MessageRole.user, content="hello world")], tool_exec_environment_variables={"test_env_var_key_a": "test_env_var_value_a", "test_env_var_key_b": "test_env_var_value_b"}, message_buffer_autoclear=True, ) agent_state = server.agent_manager.create_agent( agent_create=create_agent_request, actor=default_user, ) yield agent_state # Helper functions below def dict_to_pretty_json(d: Dict[str, Any]) -> str: """Convert a dictionary to a pretty JSON string with sorted keys, handling datetime objects.""" return json.dumps(d, indent=2, sort_keys=True, default=_json_serializable) def _json_serializable(obj: Any) -> Any: """Convert non-serializable objects (like datetime) to a JSON-friendly format.""" if isinstance(obj, datetime): return obj.isoformat() # Convert datetime to ISO 8601 format raise TypeError(f"Object of type {obj.__class__.__name__} is not JSON serializable") def print_dict_diff(dict1: Dict[str, Any], dict2: Dict[str, Any]) -> None: """Prints a detailed colorized diff between two dictionaries.""" json1 = dict_to_pretty_json(dict1).splitlines() json2 = dict_to_pretty_json(dict2).splitlines() diff = list(difflib.unified_diff(json1, json2, fromfile="Expected", tofile="Actual", lineterm="")) if diff: console.print("\nšŸ” [bold red]Dictionary Diff:[/bold red]") diff_text = "\n".join(diff) syntax = Syntax(diff_text, "diff", theme="monokai", line_numbers=False) console.print(syntax) else: console.print("\nāœ… [bold green]No differences found in dictionaries.[/bold green]") def has_same_prefix(value1: Any, value2: Any) -> bool: """Check if two string values have the same major prefix (before the second hyphen).""" if not isinstance(value1, str) or not isinstance(value2, str): return False prefix1 = value1.split("-")[0] prefix2 = value2.split("-")[0] return prefix1 == prefix2 def compare_lists(list1: List[Any], list2: List[Any]) -> bool: """Compare lists while handling unordered dictionaries inside.""" if len(list1) != len(list2): return False if all(isinstance(item, Mapping) for item in list1) and all(isinstance(item, Mapping) for item in list2): return all(any(_compare_agent_state_model_dump(i1, i2, log=False) for i2 in list2) for i1 in list1) return sorted(list1) == sorted(list2) def strip_datetime_fields(d: Dict[str, Any]) -> Dict[str, Any]: """Remove datetime fields from a dictionary before comparison.""" return {k: v for k, v in d.items() if not isinstance(v, datetime)} def _log_mismatch(key: str, expected: Any, actual: Any, log: bool) -> None: """Log detailed information about a mismatch.""" if log: print(f"\nšŸ”“ Mismatch Found in Key: '{key}'") print(f"Expected: {expected}") print(f"Actual: {actual}") if isinstance(expected, str) and isinstance(actual, str): print("\nšŸ” String Diff:") diff = difflib.ndiff(expected.splitlines(), actual.splitlines()) print("\n".join(diff)) def _compare_agent_state_model_dump(d1: Dict[str, Any], d2: Dict[str, Any], log: bool = True) -> bool: """ Compare two dictionaries with special handling: - Keys in `ignore_prefix_fields` should match only by prefix. - 'message_ids' lists should match in length only. - Datetime fields are ignored. - Order-independent comparison for lists of dicts. """ ignore_prefix_fields = {"id", "last_updated_by_id", "organization_id", "created_by_id", "agent_id", "project_id"} # Remove datetime fields upfront d1 = strip_datetime_fields(d1) d2 = strip_datetime_fields(d2) if d1.keys() != d2.keys(): _log_mismatch("dict_keys", set(d1.keys()), set(d2.keys())) return False for key, v1 in d1.items(): v2 = d2[key] if key in ignore_prefix_fields: if v1 and v2 and not has_same_prefix(v1, v2): _log_mismatch(key, v1, v2, log) return False elif key == "message_ids": if not isinstance(v1, list) or not isinstance(v2, list) or len(v1) != len(v2): _log_mismatch(key, v1, v2, log) return False elif key == "tool_exec_environment_variables": if not isinstance(v1, list) or not isinstance(v2, list) or len(v1) != len(v2): _log_mismatch(key, v1, v2, log) return False elif isinstance(v1, Dict) and isinstance(v2, Dict): if not _compare_agent_state_model_dump(v1, v2): _log_mismatch(key, v1, v2, log) return False elif isinstance(v1, list) and isinstance(v2, list): if not compare_lists(v1, v2): _log_mismatch(key, v1, v2, log) return False elif v1 != v2: _log_mismatch(key, v1, v2, log) return False return True def compare_agent_state(server, original: AgentState, copy: AgentState, append_copy_suffix: bool, og_user: User, copy_user: User) -> bool: """Wrapper function that provides a default set of ignored prefix fields.""" if not append_copy_suffix: assert original.name == copy.name compare_in_context_message_id_remapping(server, original, copy, og_user, copy_user) return _compare_agent_state_model_dump(original.model_dump(exclude="name"), copy.model_dump(exclude="name")) def compare_in_context_message_id_remapping(server, og_agent: AgentState, copy_agent: AgentState, og_user, copy_user): """ Test deserializing JSON into an Agent instance results in messages with remapped IDs but identical relevant content and order. """ # Serialize the original agent state result = server.agent_manager.serialize(agent_id=og_agent.id, actor=og_user) # Retrieve the in-context messages for both the original and the copy # Corrected typo: agent_id instead of agent_id in_context_messages_og = server.agent_manager.get_in_context_messages(agent_id=og_agent.id, actor=og_user) in_context_messages_copy = server.agent_manager.get_in_context_messages(agent_id=copy_agent.id, actor=copy_user) # 1. Check if the number of messages is the same assert len(in_context_messages_og) == len( in_context_messages_copy ), f"Original message count ({len(in_context_messages_og)}) differs from copy ({len(in_context_messages_copy)})" # 2. Iterate and compare messages by order, checking content equality and ID difference if not in_context_messages_og: # If there are no messages, the test passes trivially for message comparison. # Depending on the test case, you might want to assert that messages *should* exist. # pytest.fail("Expected messages to exist for comparison, but none were found.") pass # Or skip if empty lists are valid outcomes for i, (msg_og, msg_copy) in enumerate(zip(in_context_messages_og, in_context_messages_copy)): # --- Assert ID Remapping --- assert msg_og.id != msg_copy.id, f"Message ID at index {i} was not remapped: {msg_og.id}" # --- Assert Content Equivalence (excluding fields expected to change) --- # Fields defining the core message content/intent: assert msg_og.role == msg_copy.role, f"Mismatch in 'role' at index {i}" assert msg_og.content == msg_copy.content, f"Mismatch in 'content' at index {i}" assert msg_og.model == msg_copy.model, f"Mismatch in 'model' at index {i}" assert msg_og.name == msg_copy.name, f"Mismatch in 'name' at index {i}" # Name might be role-based assert msg_og.tool_calls == msg_copy.tool_calls, f"Mismatch in 'tool_calls' at index {i}" assert msg_og.tool_returns == msg_copy.tool_returns, f"Mismatch in 'tool_returns' at index {i}" # Add other fields here if they should be identical across copies # --- Assert Context/Ownership Fields (verify they point to the *new* context) --- assert msg_copy.agent_id == copy_agent.id, f"Copied message at index {i} has wrong agent_id: {msg_copy.agent_id} != {copy_agent.id}" # Assuming organization_id should belong to the 'other_user' context if applicable # assert msg_copy.organization_id == other_user.organization_id # If relevant/expected # --- Optionally Assert Original Context Fields (verify they point to the *old* context) --- assert msg_og.agent_id == og_agent.id, f"Original message at index {i} has wrong agent_id: {msg_og.agent_id} != {og_agent.id}" # Sanity tests for our agent model_dump verifier helpers def test_sanity_identical_dicts(): d1 = {"name": "Alice", "age": 30, "details": {"city": "New York"}} d2 = {"name": "Alice", "age": 30, "details": {"city": "New York"}} assert _compare_agent_state_model_dump(d1, d2) def test_sanity_different_dicts(): d1 = {"name": "Alice", "age": 30} d2 = {"name": "Bob", "age": 30} assert not _compare_agent_state_model_dump(d1, d2) def test_sanity_ignored_id_fields(): d1 = {"id": "user-abc123", "name": "Alice"} d2 = {"id": "user-xyz789", "name": "Alice"} # Different ID, same prefix assert _compare_agent_state_model_dump(d1, d2) def test_sanity_different_id_prefix_fails(): d1 = {"id": "user-abc123"} d2 = {"id": "admin-xyz789"} # Different prefix assert not _compare_agent_state_model_dump(d1, d2) def test_sanity_nested_dicts(): d1 = {"user": {"id": "user-123", "name": "Alice"}} d2 = {"user": {"id": "user-456", "name": "Alice"}} # ID changes, but prefix matches assert _compare_agent_state_model_dump(d1, d2) def test_sanity_list_handling(): d1 = {"items": [1, 2, 3]} d2 = {"items": [1, 2, 3]} assert _compare_agent_state_model_dump(d1, d2) def test_sanity_list_mismatch(): d1 = {"items": [1, 2, 3]} d2 = {"items": [1, 2, 4]} assert not _compare_agent_state_model_dump(d1, d2) def test_sanity_message_ids_length_check(): d1 = {"message_ids": ["msg-123", "msg-456", "msg-789"]} d2 = {"message_ids": ["msg-abc", "msg-def", "msg-ghi"]} # Same length, different values assert _compare_agent_state_model_dump(d1, d2) def test_sanity_message_ids_different_length(): d1 = {"message_ids": ["msg-123", "msg-456"]} d2 = {"message_ids": ["msg-123"]} assert not _compare_agent_state_model_dump(d1, d2) def test_sanity_datetime_fields(): d1 = {"created_at": datetime(2025, 3, 4, 18, 25, 37, tzinfo=timezone.utc)} d2 = {"created_at": datetime(2025, 3, 4, 18, 25, 37, tzinfo=timezone.utc)} assert _compare_agent_state_model_dump(d1, d2) def test_sanity_datetime_mismatch(): d1 = {"created_at": datetime(2025, 3, 4, 18, 25, 37, tzinfo=timezone.utc)} d2 = {"created_at": datetime(2025, 3, 4, 18, 25, 38, tzinfo=timezone.utc)} # One second difference assert _compare_agent_state_model_dump(d1, d2) # Should ignore # Agent serialize/deserialize tests def test_deserialize_simple(local_client, server, serialize_test_agent, default_user, other_user): """Test deserializing JSON into an Agent instance.""" append_copy_suffix = False result = server.agent_manager.serialize(agent_id=serialize_test_agent.id, actor=default_user) # Deserialize the agent agent_copy = server.agent_manager.deserialize(serialized_agent=result, actor=other_user, append_copy_suffix=append_copy_suffix) # Compare serialized representations to check for exact match print_dict_diff(json.loads(serialize_test_agent.model_dump_json()), json.loads(agent_copy.model_dump_json())) assert compare_agent_state(server, serialize_test_agent, agent_copy, append_copy_suffix, default_user, other_user) @pytest.mark.parametrize("override_existing_tools", [True, False]) def test_deserialize_override_existing_tools( local_client, server, serialize_test_agent, default_user, weather_tool, print_tool, override_existing_tools ): """ Test deserializing an agent with tools and ensure correct behavior for overriding existing tools. """ append_copy_suffix = False result = server.agent_manager.serialize(agent_id=serialize_test_agent.id, actor=default_user) # Extract tools before upload tool_data_list = result.tools tool_names = {tool.name: tool for tool in tool_data_list} # Rewrite all the tool source code to the print_tool source code for tool in result.tools: tool.source_code = print_tool.source_code # Deserialize the agent with different override settings server.agent_manager.deserialize( serialized_agent=result, actor=default_user, append_copy_suffix=append_copy_suffix, override_existing_tools=override_existing_tools ) # Verify tool behavior for tool_name, expected_tool_data in tool_names.items(): existing_tool = server.tool_manager.get_tool_by_name(tool_name, actor=default_user) if existing_tool.tool_type in {ToolType.LETTA_CORE, ToolType.LETTA_MULTI_AGENT_CORE, ToolType.LETTA_MEMORY_CORE}: assert existing_tool.source_code != print_tool.source_code elif override_existing_tools: if existing_tool.name == weather_tool.name: assert existing_tool.source_code == print_tool.source_code, f"Tool {tool_name} should be overridden" else: assert existing_tool.source_code == weather_tool.source_code, f"Tool {tool_name} should NOT be overridden" def test_agent_serialize_with_user_messages(local_client, server, serialize_test_agent, default_user, other_user): """Test deserializing JSON into an Agent instance.""" append_copy_suffix = False server.send_messages( actor=default_user, agent_id=serialize_test_agent.id, input_messages=[MessageCreate(role=MessageRole.user, content="hello")] ) result = server.agent_manager.serialize(agent_id=serialize_test_agent.id, actor=default_user) # Deserialize the agent agent_copy = server.agent_manager.deserialize(serialized_agent=result, actor=other_user, append_copy_suffix=append_copy_suffix) # Get most recent original agent instance serialize_test_agent = server.agent_manager.get_agent_by_id(agent_id=serialize_test_agent.id, actor=default_user) # Compare serialized representations to check for exact match print_dict_diff(json.loads(serialize_test_agent.model_dump_json()), json.loads(agent_copy.model_dump_json())) assert compare_agent_state(server, serialize_test_agent, agent_copy, append_copy_suffix, default_user, other_user) # Make sure both agents can receive messages after server.send_messages( actor=default_user, agent_id=serialize_test_agent.id, input_messages=[MessageCreate(role=MessageRole.user, content="and hello again")], ) server.send_messages( actor=other_user, agent_id=agent_copy.id, input_messages=[MessageCreate(role=MessageRole.user, content="and hello again")] ) def test_agent_serialize_tool_calls(disable_e2b_api_key, local_client, server, serialize_test_agent, default_user, other_user): """Test deserializing JSON into an Agent instance.""" append_copy_suffix = False server.send_messages( actor=default_user, agent_id=serialize_test_agent.id, input_messages=[MessageCreate(role=MessageRole.user, content="What's the weather like in San Francisco?")], ) result = server.agent_manager.serialize(agent_id=serialize_test_agent.id, actor=default_user) # Deserialize the agent agent_copy = server.agent_manager.deserialize(serialized_agent=result, actor=other_user, append_copy_suffix=append_copy_suffix) # Get most recent original agent instance serialize_test_agent = server.agent_manager.get_agent_by_id(agent_id=serialize_test_agent.id, actor=default_user) # Compare serialized representations to check for exact match print_dict_diff(json.loads(serialize_test_agent.model_dump_json()), json.loads(agent_copy.model_dump_json())) assert compare_agent_state(server, serialize_test_agent, agent_copy, append_copy_suffix, default_user, other_user) # Make sure both agents can receive messages after original_agent_response = server.send_messages( actor=default_user, agent_id=serialize_test_agent.id, input_messages=[MessageCreate(role=MessageRole.user, content="What's the weather like in Seattle?")], ) copy_agent_response = server.send_messages( actor=other_user, agent_id=agent_copy.id, input_messages=[MessageCreate(role=MessageRole.user, content="What's the weather like in Seattle?")], ) assert original_agent_response.completion_tokens > 0 and original_agent_response.step_count > 0 assert copy_agent_response.completion_tokens > 0 and copy_agent_response.step_count > 0 def test_agent_serialize_update_blocks(disable_e2b_api_key, local_client, server, serialize_test_agent, default_user, other_user): """Test deserializing JSON into an Agent instance.""" append_copy_suffix = False server.send_messages( actor=default_user, agent_id=serialize_test_agent.id, input_messages=[MessageCreate(role=MessageRole.user, content="Append 'banana' to core_memory.")], ) server.send_messages( actor=default_user, agent_id=serialize_test_agent.id, input_messages=[MessageCreate(role=MessageRole.user, content="What do you think about that?")], ) result = server.agent_manager.serialize(agent_id=serialize_test_agent.id, actor=default_user) # Deserialize the agent agent_copy = server.agent_manager.deserialize(serialized_agent=result, actor=other_user, append_copy_suffix=append_copy_suffix) # Get most recent original agent instance serialize_test_agent = server.agent_manager.get_agent_by_id(agent_id=serialize_test_agent.id, actor=default_user) # Compare serialized representations to check for exact match print_dict_diff(json.loads(serialize_test_agent.model_dump_json()), json.loads(agent_copy.model_dump_json())) assert compare_agent_state(server, serialize_test_agent, agent_copy, append_copy_suffix, default_user, other_user) # Make sure both agents can receive messages after original_agent_response = server.send_messages( actor=default_user, agent_id=serialize_test_agent.id, input_messages=[MessageCreate(role=MessageRole.user, content="Hi")], ) copy_agent_response = server.send_messages( actor=other_user, agent_id=agent_copy.id, input_messages=[MessageCreate(role=MessageRole.user, content="Hi")], ) assert original_agent_response.completion_tokens > 0 and original_agent_response.step_count > 0 assert copy_agent_response.completion_tokens > 0 and copy_agent_response.step_count > 0 # FastAPI endpoint tests @pytest.mark.parametrize("append_copy_suffix", [True, False]) @pytest.mark.parametrize("project_id", ["project-12345", None]) def test_agent_download_upload_flow(fastapi_client, server, serialize_test_agent, default_user, other_user, append_copy_suffix, project_id): """ Test the full E2E serialization and deserialization flow using FastAPI endpoints. """ agent_id = serialize_test_agent.id # Step 1: Download the serialized agent response = fastapi_client.get(f"/v1/agents/{agent_id}/export", headers={"user_id": default_user.id}) assert response.status_code == 200, f"Download failed: {response.text}" # Ensure response matches expected schema response_json = response.json() agent_schema = AgentSchema.model_validate(response_json) # Validate as Pydantic model agent_json = agent_schema.model_dump(mode="json") # Convert back to serializable JSON # Step 2: Upload the serialized agent as a copy agent_bytes = BytesIO(json.dumps(agent_json).encode("utf-8")) files = {"file": ("agent.json", agent_bytes, "application/json")} upload_response = fastapi_client.post( "/v1/agents/import", headers={"user_id": other_user.id}, params={"append_copy_suffix": append_copy_suffix, "override_existing_tools": False, "project_id": project_id}, files=files, ) assert upload_response.status_code == 200, f"Upload failed: {upload_response.text}" # Sanity checks copied_agent = upload_response.json() copied_agent_id = copied_agent["id"] assert copied_agent_id != agent_id, "Copied agent should have a different ID" if append_copy_suffix: assert copied_agent["name"] == serialize_test_agent.name + "_copy", "Copied agent name should have '_copy' suffix" # Step 3: Retrieve the copied agent serialize_test_agent = server.agent_manager.get_agent_by_id(agent_id=serialize_test_agent.id, actor=default_user) agent_copy = server.agent_manager.get_agent_by_id(agent_id=copied_agent_id, actor=other_user) print_dict_diff(json.loads(serialize_test_agent.model_dump_json()), json.loads(agent_copy.model_dump_json())) assert compare_agent_state(server, serialize_test_agent, agent_copy, append_copy_suffix, default_user, other_user) @pytest.mark.parametrize( "filename", [ "composio_github_star_agent.af", "outreach_workflow_agent.af", "customer_service.af", "deep_research_agent.af", "memgpt_agent_with_convo.af", ], ) def test_upload_agentfile_from_disk(server, disable_e2b_api_key, fastapi_client, other_user, filename): """ Test uploading each .af file from the test_agent_files directory via FastAPI. """ file_path = os.path.join(os.path.dirname(__file__), "test_agent_files", filename) with open(file_path, "rb") as f: files = {"file": (filename, f, "application/json")} response = fastapi_client.post( "/v1/agents/import", headers={"user_id": other_user.id}, params={"append_copy_suffix": True, "override_existing_tools": False}, files=files, ) assert response.status_code == 200, f"Failed to upload {filename}: {response.text}" json_response = response.json() assert "id" in json_response and json_response["id"].startswith("agent-"), "Uploaded agent response is malformed" copied_agent_id = json_response["id"] server.send_messages( actor=other_user, agent_id=copied_agent_id, input_messages=[MessageCreate(role=MessageRole.user, content="Hello there!")], )