mirror of
https://github.com/cpacker/MemGPT.git
synced 2025-06-03 04:30:22 +00:00
feat: add sleeptime tools to executor (#1840)
This commit is contained in:
parent
a23cb5601c
commit
88af5ef905
@ -12,7 +12,6 @@ from letta.services.tool_executor.tool_executor import (
|
||||
ExternalComposioToolExecutor,
|
||||
ExternalMCPToolExecutor,
|
||||
LettaCoreToolExecutor,
|
||||
LettaMemoryToolExecutor,
|
||||
LettaMultiAgentToolExecutor,
|
||||
SandboxToolExecutor,
|
||||
ToolExecutor,
|
||||
@ -26,8 +25,9 @@ class ToolExecutorFactory:
|
||||
|
||||
_executor_map: Dict[ToolType, Type[ToolExecutor]] = {
|
||||
ToolType.LETTA_CORE: LettaCoreToolExecutor,
|
||||
ToolType.LETTA_MEMORY_CORE: LettaCoreToolExecutor,
|
||||
ToolType.LETTA_SLEEPTIME_CORE: LettaCoreToolExecutor,
|
||||
ToolType.LETTA_MULTI_AGENT_CORE: LettaMultiAgentToolExecutor,
|
||||
ToolType.LETTA_MEMORY_CORE: LettaMemoryToolExecutor,
|
||||
ToolType.EXTERNAL_COMPOSIO: ExternalComposioToolExecutor,
|
||||
ToolType.EXTERNAL_MCP: ExternalMCPToolExecutor,
|
||||
}
|
||||
@ -35,13 +35,8 @@ class ToolExecutorFactory:
|
||||
@classmethod
|
||||
def get_executor(cls, tool_type: ToolType) -> ToolExecutor:
|
||||
"""Get the appropriate executor for the given tool type."""
|
||||
executor_class = cls._executor_map.get(tool_type)
|
||||
|
||||
if executor_class:
|
||||
return executor_class()
|
||||
|
||||
# Default to sandbox executor for unknown types
|
||||
return SandboxToolExecutor()
|
||||
executor_class = cls._executor_map.get(tool_type, SandboxToolExecutor)
|
||||
return executor_class()
|
||||
|
||||
|
||||
class ToolExecutionManager:
|
||||
|
@ -3,7 +3,7 @@ import traceback
|
||||
from abc import ABC, abstractmethod
|
||||
from typing import Any, Dict, Optional
|
||||
|
||||
from letta.constants import COMPOSIO_ENTITY_ENV_VAR_KEY, RETRIEVAL_QUERY_DEFAULT_PAGE_SIZE
|
||||
from letta.constants import COMPOSIO_ENTITY_ENV_VAR_KEY, CORE_MEMORY_LINE_NUMBER_WARNING, RETRIEVAL_QUERY_DEFAULT_PAGE_SIZE
|
||||
from letta.functions.ast_parsers import coerce_dict_args_by_annotations, get_function_annotations_from_source
|
||||
from letta.functions.helpers import execute_composio_action, generate_composio_action_from_func_name
|
||||
from letta.helpers.composio_helpers import get_composio_api_key
|
||||
@ -58,6 +58,12 @@ class LettaCoreToolExecutor(ToolExecutor):
|
||||
"conversation_search": self.conversation_search,
|
||||
"archival_memory_search": self.archival_memory_search,
|
||||
"archival_memory_insert": self.archival_memory_insert,
|
||||
"core_memory_append": self.core_memory_append,
|
||||
"core_memory_replace": self.core_memory_replace,
|
||||
"memory_replace": self.memory_replace,
|
||||
"memory_insert": self.memory_insert,
|
||||
"memory_rethink": self.memory_rethink,
|
||||
"memory_finish_edits": self.memory_finish_edits,
|
||||
}
|
||||
|
||||
if function_name not in function_map:
|
||||
@ -186,53 +192,7 @@ class LettaCoreToolExecutor(ToolExecutor):
|
||||
AgentManager().rebuild_system_prompt(agent_id=agent_state.id, actor=actor, force=True)
|
||||
return None
|
||||
|
||||
|
||||
class LettaMultiAgentToolExecutor(ToolExecutor):
|
||||
"""Executor for LETTA multi-agent core tools."""
|
||||
|
||||
# TODO: Implement
|
||||
# def execute(self, function_name: str, function_args: dict, agent: "Agent", tool: Tool) -> ToolExecutionResult:
|
||||
# callable_func = get_function_from_module(LETTA_MULTI_AGENT_TOOL_MODULE_NAME, function_name)
|
||||
# function_args["self"] = agent # need to attach self to arg since it's dynamically linked
|
||||
# function_response = callable_func(**function_args)
|
||||
# return ToolExecutionResult(func_return=function_response)
|
||||
|
||||
|
||||
class LettaMemoryToolExecutor(ToolExecutor):
|
||||
"""Executor for LETTA memory core tools with direct implementation."""
|
||||
|
||||
def execute(
|
||||
self,
|
||||
function_name: str,
|
||||
function_args: dict,
|
||||
agent_state: AgentState,
|
||||
tool: Tool,
|
||||
actor: User,
|
||||
sandbox_config: Optional[SandboxConfig] = None,
|
||||
sandbox_env_vars: Optional[Dict[str, Any]] = None,
|
||||
) -> ToolExecutionResult:
|
||||
# Map function names to method calls
|
||||
function_map = {
|
||||
"core_memory_append": self.core_memory_append,
|
||||
"core_memory_replace": self.core_memory_replace,
|
||||
}
|
||||
|
||||
if function_name not in function_map:
|
||||
raise ValueError(f"Unknown function: {function_name}")
|
||||
|
||||
# Execute the appropriate function with the copied state
|
||||
function_args_copy = function_args.copy() # Make a copy to avoid modifying the original
|
||||
function_response = function_map[function_name](agent_state, **function_args_copy)
|
||||
|
||||
# Update memory if changed
|
||||
AgentManager().update_memory_if_changed(agent_id=agent_state.id, new_memory=agent_state.memory, actor=actor)
|
||||
|
||||
return ToolExecutionResult(
|
||||
status="success",
|
||||
func_return=function_response,
|
||||
)
|
||||
|
||||
def core_memory_append(self, agent_state: "AgentState", label: str, content: str) -> Optional[str]:
|
||||
def core_memory_append(self, agent_state: "AgentState", actor: User, label: str, content: str) -> Optional[str]:
|
||||
"""
|
||||
Append to the contents of core memory.
|
||||
|
||||
@ -246,9 +206,17 @@ class LettaMemoryToolExecutor(ToolExecutor):
|
||||
current_value = str(agent_state.memory.get_block(label).value)
|
||||
new_value = current_value + "\n" + str(content)
|
||||
agent_state.memory.update_block_value(label=label, value=new_value)
|
||||
AgentManager().update_memory_if_changed(agent_id=agent_state.id, new_memory=agent_state.memory, actor=actor)
|
||||
return None
|
||||
|
||||
def core_memory_replace(self, agent_state: "AgentState", label: str, old_content: str, new_content: str) -> Optional[str]:
|
||||
def core_memory_replace(
|
||||
self,
|
||||
agent_state: "AgentState",
|
||||
actor: User,
|
||||
label: str,
|
||||
old_content: str,
|
||||
new_content: str,
|
||||
) -> Optional[str]:
|
||||
"""
|
||||
Replace the contents of core memory. To delete memories, use an empty string for new_content.
|
||||
|
||||
@ -265,8 +233,253 @@ class LettaMemoryToolExecutor(ToolExecutor):
|
||||
raise ValueError(f"Old content '{old_content}' not found in memory block '{label}'")
|
||||
new_value = current_value.replace(str(old_content), str(new_content))
|
||||
agent_state.memory.update_block_value(label=label, value=new_value)
|
||||
AgentManager().update_memory_if_changed(agent_id=agent_state.id, new_memory=agent_state.memory, actor=actor)
|
||||
return None
|
||||
|
||||
def memory_replace(
|
||||
agent_state: "AgentState",
|
||||
actor: User,
|
||||
label: str,
|
||||
old_str: str,
|
||||
new_str: Optional[str] = None,
|
||||
) -> str:
|
||||
"""
|
||||
The memory_replace command allows you to replace a specific string in a memory
|
||||
block with a new string. This is used for making precise edits.
|
||||
|
||||
Args:
|
||||
label (str): Section of the memory to be edited, identified by its label.
|
||||
old_str (str): The text to replace (must match exactly, including whitespace
|
||||
and indentation).
|
||||
new_str (Optional[str]): The new text to insert in place of the old text.
|
||||
Omit this argument to delete the old_str.
|
||||
|
||||
Returns:
|
||||
str: The success message
|
||||
"""
|
||||
import re
|
||||
|
||||
if bool(re.search(r"\nLine \d+: ", old_str)):
|
||||
raise ValueError(
|
||||
"old_str contains a line number prefix, which is not allowed. "
|
||||
"Do not include line numbers when calling memory tools (line "
|
||||
"numbers are for display purposes only)."
|
||||
)
|
||||
if CORE_MEMORY_LINE_NUMBER_WARNING in old_str:
|
||||
raise ValueError(
|
||||
"old_str contains a line number warning, which is not allowed. "
|
||||
"Do not include line number information when calling memory tools "
|
||||
"(line numbers are for display purposes only)."
|
||||
)
|
||||
if bool(re.search(r"\nLine \d+: ", new_str)):
|
||||
raise ValueError(
|
||||
"new_str contains a line number prefix, which is not allowed. "
|
||||
"Do not include line numbers when calling memory tools (line "
|
||||
"numbers are for display purposes only)."
|
||||
)
|
||||
|
||||
old_str = str(old_str).expandtabs()
|
||||
new_str = str(new_str).expandtabs()
|
||||
current_value = str(agent_state.memory.get_block(label).value).expandtabs()
|
||||
|
||||
# Check if old_str is unique in the block
|
||||
occurences = current_value.count(old_str)
|
||||
if occurences == 0:
|
||||
raise ValueError(
|
||||
f"No replacement was performed, old_str `{old_str}` did not appear " f"verbatim in memory block with label `{label}`."
|
||||
)
|
||||
elif occurences > 1:
|
||||
content_value_lines = current_value.split("\n")
|
||||
lines = [idx + 1 for idx, line in enumerate(content_value_lines) if old_str in line]
|
||||
raise ValueError(
|
||||
f"No replacement was performed. Multiple occurrences of "
|
||||
f"old_str `{old_str}` in lines {lines}. Please ensure it is unique."
|
||||
)
|
||||
|
||||
# Replace old_str with new_str
|
||||
new_value = current_value.replace(str(old_str), str(new_str))
|
||||
|
||||
# Write the new content to the block
|
||||
agent_state.memory.update_block_value(label=label, value=new_value)
|
||||
|
||||
AgentManager().update_memory_if_changed(agent_id=agent_state.id, new_memory=agent_state.memory, actor=actor)
|
||||
|
||||
# Create a snippet of the edited section
|
||||
SNIPPET_LINES = 3
|
||||
replacement_line = current_value.split(old_str)[0].count("\n")
|
||||
start_line = max(0, replacement_line - SNIPPET_LINES)
|
||||
end_line = replacement_line + SNIPPET_LINES + new_str.count("\n")
|
||||
snippet = "\n".join(new_value.split("\n")[start_line : end_line + 1])
|
||||
|
||||
# Prepare the success message
|
||||
success_msg = f"The core memory block with label `{label}` has been edited. "
|
||||
# success_msg += self._make_output(
|
||||
# snippet, f"a snippet of {path}", start_line + 1
|
||||
# )
|
||||
# success_msg += f"A snippet of core memory block `{label}`:\n{snippet}\n"
|
||||
success_msg += (
|
||||
"Review the changes and make sure they are as expected (correct indentation, "
|
||||
"no duplicate lines, etc). Edit the memory block again if necessary."
|
||||
)
|
||||
|
||||
# return None
|
||||
return success_msg
|
||||
|
||||
def memory_insert(
|
||||
agent_state: "AgentState",
|
||||
actor: User,
|
||||
label: str,
|
||||
new_str: str,
|
||||
insert_line: int = -1,
|
||||
) -> str:
|
||||
"""
|
||||
The memory_insert command allows you to insert text at a specific location
|
||||
in a memory block.
|
||||
|
||||
Args:
|
||||
label (str): Section of the memory to be edited, identified by its label.
|
||||
new_str (str): The text to insert.
|
||||
insert_line (int): The line number after which to insert the text (0 for
|
||||
beginning of file). Defaults to -1 (end of the file).
|
||||
|
||||
Returns:
|
||||
str: The success message
|
||||
"""
|
||||
import re
|
||||
|
||||
if bool(re.search(r"\nLine \d+: ", new_str)):
|
||||
raise ValueError(
|
||||
"new_str contains a line number prefix, which is not allowed. Do not "
|
||||
"include line numbers when calling memory tools (line numbers are for "
|
||||
"display purposes only)."
|
||||
)
|
||||
if CORE_MEMORY_LINE_NUMBER_WARNING in new_str:
|
||||
raise ValueError(
|
||||
"new_str contains a line number warning, which is not allowed. Do not "
|
||||
"include line number information when calling memory tools (line numbers "
|
||||
"are for display purposes only)."
|
||||
)
|
||||
|
||||
current_value = str(agent_state.memory.get_block(label).value).expandtabs()
|
||||
new_str = str(new_str).expandtabs()
|
||||
current_value_lines = current_value.split("\n")
|
||||
n_lines = len(current_value_lines)
|
||||
|
||||
# Check if we're in range, from 0 (pre-line), to 1 (first line), to n_lines (last line)
|
||||
if insert_line < 0 or insert_line > n_lines:
|
||||
raise ValueError(
|
||||
f"Invalid `insert_line` parameter: {insert_line}. It should be within "
|
||||
f"the range of lines of the memory block: {[0, n_lines]}, or -1 to "
|
||||
f"append to the end of the memory block."
|
||||
)
|
||||
|
||||
# Insert the new string as a line
|
||||
SNIPPET_LINES = 3
|
||||
new_str_lines = new_str.split("\n")
|
||||
new_value_lines = current_value_lines[:insert_line] + new_str_lines + current_value_lines[insert_line:]
|
||||
snippet_lines = (
|
||||
current_value_lines[max(0, insert_line - SNIPPET_LINES) : insert_line]
|
||||
+ new_str_lines
|
||||
+ current_value_lines[insert_line : insert_line + SNIPPET_LINES]
|
||||
)
|
||||
|
||||
# Collate into the new value to update
|
||||
new_value = "\n".join(new_value_lines)
|
||||
snippet = "\n".join(snippet_lines)
|
||||
|
||||
# Write into the block
|
||||
agent_state.memory.update_block_value(label=label, value=new_value)
|
||||
|
||||
AgentManager().update_memory_if_changed(agent_id=agent_state.id, new_memory=agent_state.memory, actor=actor)
|
||||
|
||||
# Prepare the success message
|
||||
success_msg = f"The core memory block with label `{label}` has been edited. "
|
||||
# success_msg += self._make_output(
|
||||
# snippet,
|
||||
# "a snippet of the edited file",
|
||||
# max(1, insert_line - SNIPPET_LINES + 1),
|
||||
# )
|
||||
# success_msg += f"A snippet of core memory block `{label}`:\n{snippet}\n"
|
||||
success_msg += (
|
||||
"Review the changes and make sure they are as expected (correct indentation, "
|
||||
"no duplicate lines, etc). Edit the memory block again if necessary."
|
||||
)
|
||||
|
||||
return success_msg
|
||||
|
||||
def memory_rethink(agent_state: "AgentState", actor: User, label: str, new_memory: str) -> str:
|
||||
"""
|
||||
The memory_rethink command allows you to completely rewrite the contents of a
|
||||
memory block. Use this tool to make large sweeping changes (e.g. when you want
|
||||
to condense or reorganize the memory blocks), do NOT use this tool to make small
|
||||
precise edits (e.g. add or remove a line, replace a specific string, etc).
|
||||
|
||||
Args:
|
||||
label (str): The memory block to be rewritten, identified by its label.
|
||||
new_memory (str): The new memory contents with information integrated from
|
||||
existing memory blocks and the conversation context.
|
||||
|
||||
Returns:
|
||||
str: The success message
|
||||
"""
|
||||
import re
|
||||
|
||||
if bool(re.search(r"\nLine \d+: ", new_memory)):
|
||||
raise ValueError(
|
||||
"new_memory contains a line number prefix, which is not allowed. Do not "
|
||||
"include line numbers when calling memory tools (line numbers are for "
|
||||
"display purposes only)."
|
||||
)
|
||||
if CORE_MEMORY_LINE_NUMBER_WARNING in new_memory:
|
||||
raise ValueError(
|
||||
"new_memory contains a line number warning, which is not allowed. Do not "
|
||||
"include line number information when calling memory tools (line numbers "
|
||||
"are for display purposes only)."
|
||||
)
|
||||
|
||||
if agent_state.memory.get_block(label) is None:
|
||||
agent_state.memory.create_block(label=label, value=new_memory)
|
||||
|
||||
agent_state.memory.update_block_value(label=label, value=new_memory)
|
||||
|
||||
AgentManager().update_memory_if_changed(agent_id=agent_state.id, new_memory=agent_state.memory, actor=actor)
|
||||
|
||||
# Prepare the success message
|
||||
success_msg = f"The core memory block with label `{label}` has been edited. "
|
||||
# success_msg += self._make_output(
|
||||
# snippet, f"a snippet of {path}", start_line + 1
|
||||
# )
|
||||
# success_msg += f"A snippet of core memory block `{label}`:\n{snippet}\n"
|
||||
success_msg += (
|
||||
"Review the changes and make sure they are as expected (correct indentation, "
|
||||
"no duplicate lines, etc). Edit the memory block again if necessary."
|
||||
)
|
||||
|
||||
# return None
|
||||
return success_msg
|
||||
|
||||
def memory_finish_edits(agent_state: "AgentState") -> None:
|
||||
"""
|
||||
Call the memory_finish_edits command when you are finished making edits
|
||||
(integrating all new information) into the memory blocks. This function
|
||||
is called when the agent is done rethinking the memory.
|
||||
|
||||
Returns:
|
||||
Optional[str]: None is always returned as this function does not produce a response.
|
||||
"""
|
||||
return None
|
||||
|
||||
|
||||
class LettaMultiAgentToolExecutor(ToolExecutor):
|
||||
"""Executor for LETTA multi-agent core tools."""
|
||||
|
||||
# TODO: Implement
|
||||
# def execute(self, function_name: str, function_args: dict, agent: "Agent", tool: Tool) -> ToolExecutionResult:
|
||||
# callable_func = get_function_from_module(LETTA_MULTI_AGENT_TOOL_MODULE_NAME, function_name)
|
||||
# function_args["self"] = agent # need to attach self to arg since it's dynamically linked
|
||||
# function_response = callable_func(**function_args)
|
||||
# return ToolExecutionResult(func_return=function_response)
|
||||
|
||||
|
||||
class ExternalComposioToolExecutor(ToolExecutor):
|
||||
"""Executor for external Composio tools."""
|
||||
|
Loading…
Reference in New Issue
Block a user