mirror of
https://github.com/cpacker/MemGPT.git
synced 2025-06-03 04:30:22 +00:00
105 lines
3.9 KiB
Python
105 lines
3.9 KiB
Python
import json
|
|
from typing import Optional, Union
|
|
|
|
from letta.agent import Agent
|
|
from letta.interface import AgentInterface
|
|
from letta.orm.group import Group
|
|
from letta.orm.user import User
|
|
from letta.schemas.agent import AgentState
|
|
from letta.schemas.group import ManagerType
|
|
from letta.schemas.message import Message
|
|
|
|
|
|
def load_multi_agent(
    group: Group,
    agent_state: Optional[AgentState],
    actor: User,
    interface: Union[AgentInterface, None] = None,
) -> Agent:
    """Instantiate the multi-agent manager selected by ``group.manager_type``.

    Args:
        group: Group record supplying the member agent ids, the manager type
            and the manager-specific settings read below.
        agent_state: State of the manager agent; must be truthy.
        actor: User passed to the manager as its ``user`` argument.
        interface: Optional interface forwarded unchanged to the manager.

    Returns:
        The manager instance built for the group's manager type.

    Raises:
        ValueError: If the group has no agent ids, if ``agent_state`` is
            missing, or if the manager type is not one of the handled values.
    """
    if len(group.agent_ids) == 0:
        raise ValueError("Empty group: group must have at least one agent")
    if not agent_state:
        raise ValueError("Empty manager agent state: manager agent state must be provided")

    def shared_kwargs() -> dict:
        # Arguments every manager constructor below receives. Built on demand
        # so the group attributes are only read once a branch has matched.
        return {
            "agent_state": agent_state,
            "interface": interface,
            "user": actor,
            "group_id": group.id,
            "agent_ids": group.agent_ids,
            "description": group.description,
        }

    manager_type = group.manager_type

    # The manager classes are imported inside their branch, as in the
    # original, so only the selected implementation module gets loaded.
    if manager_type == ManagerType.round_robin:
        from letta.groups.round_robin_multi_agent import RoundRobinMultiAgent

        return RoundRobinMultiAgent(
            **shared_kwargs(),
            max_turns=group.max_turns,
        )

    if manager_type == ManagerType.dynamic:
        from letta.groups.dynamic_multi_agent import DynamicMultiAgent

        return DynamicMultiAgent(
            **shared_kwargs(),
            max_turns=group.max_turns,
            termination_token=group.termination_token,
        )

    if manager_type == ManagerType.supervisor:
        from letta.groups.supervisor_multi_agent import SupervisorMultiAgent

        return SupervisorMultiAgent(**shared_kwargs())

    if manager_type == ManagerType.background:
        from letta.groups.background_multi_agent import BackgroundMultiAgent

        return BackgroundMultiAgent(
            **shared_kwargs(),
            background_agents_interval=group.background_agents_interval,
        )

    raise ValueError(f"Type {group.manager_type} is not supported.")
|
|
|
|
|
|
def stringify_message(message: Message) -> str | None:
    """Render a message as ``"<speaker>: <text>"`` transcript text.

    Behaviour by ``message.role``:

    * ``"user"``: the first content part is parsed as JSON. An object whose
      ``"type"`` is ``"user_message"`` is rendered from its ``"message"``
      field; objects of any other type yield ``None``. Text that is not a
      JSON object carrying a ``"type"`` key is rendered verbatim instead of
      raising.
    * ``"assistant"``: one line for the first content part (prefixed with
      ``*thinking*``) and one line for the first tool call, joined with a
      newline. With neither present the result is an empty string.
    * ``"tool"``: the ``"message"`` field of the JSON payload is reported,
      unless it is ``None`` or the string ``"None"``.
    * ``"system"``: always ``None``.
    * any other role: the first content part rendered verbatim.

    Args:
        message: The message to render.

    Returns:
        The rendered text, or ``None`` when the message should be omitted.
    """
    role = message.role

    if role == "user":
        speaker = message.name or 'user'
        raw_text = message.content[0].text
        try:
            content = json.loads(raw_text)
        except json.JSONDecodeError:
            content = None
        # Plain text (or JSON that is not an object with a "type" key) cannot
        # be unpacked; fall back to the raw text rather than crashing.
        if not isinstance(content, dict) or "type" not in content:
            return f"{speaker}: {raw_text}"
        if content["type"] == "user_message":
            return f"{speaker}: {content['message']}"
        return None

    if role == "assistant":
        speaker = message.name or 'assistant'
        lines = []
        if message.content:
            lines.append(f"{speaker}: *thinking* {message.content[0].text}")
        if message.tool_calls:
            # Only the first tool call is rendered.
            function = message.tool_calls[0].function
            if function.name == "send_message":
                lines.append(f"{speaker}: {json.loads(function.arguments)['message']}")
            else:
                lines.append(f"{speaker}: Calling tool {function.name}")
        return "\n".join(lines)

    if role == "tool":
        if message.content:
            content = json.loads(message.content[0].text)
            result = content["message"]
            # Skip both a real null and the literal string "None".
            if result is not None and result != "None":
                return f"{message.name or 'assistant'}: Tool call returned {result}"
        return None

    if role == "system":
        return None

    return f"{message.name or 'user'}: {message.content[0].text}"
|