Mirror of https://github.com/cpacker/MemGPT.git (synced 2025-06-03 04:30:22 +00:00)
fix: Improve test tool schema gen runtime (#1719)
This commit is contained in:
parent 6a461017f8
commit 0437273211
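The runtime improvement in this commit comes from fanning the schema-generation test cases out to a `multiprocessing` pool instead of running them one after another, then collecting the `AsyncResult` handles with a timeout. Below is a minimal, self-contained sketch of that pattern, kept separate from the diff; the `check_schema` worker and the case list are illustrative placeholders, not code from the repository.

import multiprocessing as mp


def check_schema(case_name: str) -> tuple[str, bool]:
    """Illustrative stand-in for a per-case schema test; returns (name, success)."""
    # ... derive the schema for this case and compare it against the expected output ...
    return (case_name, True)


if __name__ == "__main__":
    cases = ["pydantic_as_single_arg_example", "simple_d20"]
    pool = mp.Pool(processes=min(mp.cpu_count(), len(cases)))

    # Submit every case without blocking, keeping the AsyncResult handles.
    async_results = [(name, pool.apply_async(check_schema, args=(name,))) for name in cases]

    # Collect results with a timeout so a hung case fails the run instead of stalling it.
    for name, res in async_results:
        case_name, ok = res.get(timeout=60)
        assert ok, f"Test for {name} failed"

    pool.close()
    pool.join()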
@@ -1,7 +1,10 @@
import importlib.util
import inspect
import json
import multiprocessing as mp
import os
import time
from functools import partial

import pytest
from pydantic import BaseModel
@@ -79,107 +82,198 @@ def _run_schema_test(schema_name: str, desired_function_name: str, expect_struct
    _compare_schemas(structured_output, expected_structured_output, strip_heartbeat=False)

    return (schema_name, True)  # Return success status


def test_derive_openai_json_schema():
    """Test that the schema generator works across a variety of example source code inputs."""

    print("==== TESTING basic example where the arg is a pydantic model ====")
    _run_schema_test("pydantic_as_single_arg_example", "create_step")
    # Define test cases
    test_cases = [
        ("pydantic_as_single_arg_example", "create_step", False),
        ("list_of_pydantic_example", "create_task_plan", False),
        ("nested_pydantic_as_arg_example", "create_task_plan", False),
        ("simple_d20", "roll_d20", False),
        ("all_python_complex", "check_order_status", True),
        ("all_python_complex_nodict", "check_order_status", False),
    ]

    print("==== TESTING basic example where the arg is a list of pydantic models ====")
    _run_schema_test("list_of_pydantic_example", "create_task_plan")
    # Create a multiprocessing pool
    pool = mp.Pool(processes=min(mp.cpu_count(), len(test_cases)))

    print("==== TESTING more complex example where the arg is a nested pydantic model ====")
    _run_schema_test("nested_pydantic_as_arg_example", "create_task_plan")
    # Run tests in parallel
    results = []
    for schema_name, function_name, expect_fail in test_cases:
        print(f"==== TESTING {schema_name} ====")
        # Use apply_async for non-blocking parallel execution
        result = pool.apply_async(_run_schema_test, args=(schema_name, function_name, expect_fail))
        results.append((schema_name, result))

    print("==== TESTING simple function with no args ====")
    _run_schema_test("simple_d20", "roll_d20")
    # Collect results and check for failures
    for schema_name, result in results:
        try:
            schema_name_result, success = result.get(timeout=60)  # Wait for the result with timeout
            assert success, f"Test for {schema_name} failed"
            print(f"Test for {schema_name} passed")
        except Exception as e:
            print(f"Test for {schema_name} failed with error: {str(e)}")
            raise

    print("==== TESTING complex function with many args ====")
    _run_schema_test("all_python_complex", "check_order_status", expect_structured_output_fail=True)

    print("==== TESTING complex function with many args and no dict ====")
    # TODO we should properly cast Optionals into union nulls
    # Currently, we just disregard all Optional types on the conversion path
    _run_schema_test("all_python_complex_nodict", "check_order_status")
    # Close the pool
    pool.close()
    pool.join()

def _openai_payload(model: str, schema: dict, structured_output: bool):
def _openai_payload(test_config):
    """Create an OpenAI payload with a tool call.

    Raw version of openai_chat_completions_request w/o pydantic models
    Args:
        test_config: A tuple containing (filename, model, structured_output)

    Returns:
        A tuple of (filename, model, structured_output, success, error_message)
    """

    if structured_output:
        tool_schema = convert_to_structured_output(schema)
    else:
        tool_schema = schema

    api_key = os.getenv("OPENAI_API_KEY")
    assert api_key is not None, "OPENAI_API_KEY must be set"

    # Simple system prompt to encourage the LLM to jump directly to a tool call
    system_prompt = "You job is to test the tool that you've been provided. Don't ask for any clarification on the args, just come up with some dummy data and try executing the tool."

    url = "https://api.openai.com/v1/chat/completions"
    headers = {"Content-Type": "application/json", "Authorization": f"Bearer {api_key}"}
    data = {
        "model": model,
        "messages": [
            {"role": "system", "content": system_prompt},
        ],
        "tools": [
            {
                "type": "function",
                "function": tool_schema,
            }
        ],
        "tool_choice": "auto",  # TODO force the tool call on the one we want
        # NOTE: disabled for simplicity
        "parallel_tool_calls": False,
    }

    print("Request:\n", json.dumps(data, indent=2), "\n\n")
    filename, model, structured_output = test_config
    success = False
    error_message = None

    try:
        # Load schema
        with open(os.path.join(os.path.dirname(__file__), f"test_tool_schema_parsing_files/{filename}.py"), "r") as file:
            source_code = file.read()

        schema = derive_openai_json_schema(source_code)

        # Check if we expect the conversion to fail
        if filename == "all_python_complex" and structured_output:
            try:
                convert_to_structured_output(schema)
                error_message = "Expected ValueError for all_python_complex with structured_output=True"
                return (filename, model, structured_output, False, error_message)
            except ValueError:
                # This is expected
                success = True
                return (filename, model, structured_output, success, error_message)

        # Generate tool schema
        if structured_output:
            tool_schema = convert_to_structured_output(schema)
        else:
            tool_schema = schema

        api_key = os.getenv("OPENAI_API_KEY")
        assert api_key is not None, "OPENAI_API_KEY must be set"

        # Simple system prompt to encourage the LLM to jump directly to a tool call
        system_prompt = "You job is to test the tool that you've been provided. Don't ask for any clarification on the args, just come up with some dummy data and try executing the tool."

        url = "https://api.openai.com/v1/chat/completions"
        headers = {"Content-Type": "application/json", "Authorization": f"Bearer {api_key}"}
        data = {
            "model": model,
            "messages": [
                {"role": "system", "content": system_prompt},
            ],
            "tools": [
                {
                    "type": "function",
                    "function": tool_schema,
                }
            ],
            "tool_choice": "auto",
            "parallel_tool_calls": False,
        }

        make_post_request(url, headers, data)
        success = True

    except Exception as e:
        print(f"Request failed, tool_schema=\n{json.dumps(tool_schema, indent=2)}")
        print(f"Error: {e}")
        raise e
        error_message = str(e)

    return (filename, model, structured_output, success, error_message)

def _load_schema_from_source_filename(filename: str) -> dict:
    with open(os.path.join(os.path.dirname(__file__), f"test_tool_schema_parsing_files/{filename}.py"), "r") as file:
        source_code = file.read()

    return derive_openai_json_schema(source_code)

# @pytest.mark.parametrize("openai_model", ["gpt-4o-mini"])
# @pytest.mark.parametrize("structured_output", [True])
@pytest.mark.parametrize("openai_model", ["gpt-4", "gpt-4o"])
@pytest.mark.parametrize("openai_model", ["gpt-4o"])
@pytest.mark.parametrize("structured_output", [True, False])
def test_valid_schemas_via_openai(openai_model: str, structured_output: bool):
    """Test that we can send the schemas to OpenAI and get a tool call back."""

    for filename in [
    start_time = time.time()

    # Define all test configurations
    filenames = [
        "pydantic_as_single_arg_example",
        "list_of_pydantic_example",
        "nested_pydantic_as_arg_example",
        "simple_d20",
        "all_python_complex",
        "all_python_complex_nodict",
    ]:
        print(f"==== TESTING OPENAI PAYLOAD FOR {openai_model} + {filename} ====")
        schema = _load_schema_from_source_filename(filename)
    ]

        # We should expect the all_python_complex one to fail when structured_output=True
        if filename == "all_python_complex" and structured_output:
            with pytest.raises(ValueError):
                _openai_payload(openai_model, schema, structured_output)
    test_configs = []
    for filename in filenames:
        test_configs.append((filename, openai_model, structured_output))

    # Run tests in parallel using a process pool (more efficient for API calls)
    pool = mp.Pool(processes=min(mp.cpu_count(), len(test_configs)))
    results = pool.map(_openai_payload, test_configs)

    # Check results and handle failures
    for filename, model, structured, success, error_message in results:
        print(f"Test for {filename}, {model}, structured_output={structured}: {'SUCCESS' if success else 'FAILED'}")

        if not success:
            if filename == "all_python_complex" and structured and "Expected ValueError" in error_message:
                pytest.fail(f"Failed for {filename} with {model}, structured_output={structured}: {error_message}")
            elif not (filename == "all_python_complex" and structured):
                pytest.fail(f"Failed for {filename} with {model}, structured_output={structured}: {error_message}")

    pool.close()
    pool.join()

    end_time = time.time()
    print(f"Total execution time: {end_time - start_time:.2f} seconds")

# Parallel implementation for Composio test
def _run_composio_test(action_name, openai_model, structured_output):
    """Run a single Composio test case in parallel"""
    try:
        tool_create = ToolCreate.from_composio(action_name=action_name)
        assert tool_create.json_schema
        schema = tool_create.json_schema

        if structured_output:
            tool_schema = convert_to_structured_output(schema)
        else:
            _openai_payload(openai_model, schema, structured_output)
            tool_schema = schema

        api_key = os.getenv("OPENAI_API_KEY")
        assert api_key is not None, "OPENAI_API_KEY must be set"

        system_prompt = "You job is to test the tool that you've been provided. Don't ask for any clarification on the args, just come up with some dummy data and try executing the tool."

        url = "https://api.openai.com/v1/chat/completions"
        headers = {"Content-Type": "application/json", "Authorization": f"Bearer {api_key}"}
        data = {
            "model": openai_model,
            "messages": [
                {"role": "system", "content": system_prompt},
            ],
            "tools": [
                {
                    "type": "function",
                    "function": tool_schema,
                }
            ],
            "tool_choice": "auto",
            "parallel_tool_calls": False,
        }

        make_post_request(url, headers, data)
        return (action_name, True, None)  # Success
    except Exception as e:
        return (action_name, False, str(e))  # Failure with error message


@pytest.mark.parametrize("openai_model", ["gpt-4o-mini"])
@@ -190,24 +284,32 @@ def test_composio_tool_schema_generation(openai_model: str, structured_output: b
    if not os.getenv("COMPOSIO_API_KEY"):
        pytest.skip("COMPOSIO_API_KEY not set")

    for action_name in [
    start_time = time.time()

    action_names = [
        "GITHUB_STAR_A_REPOSITORY_FOR_THE_AUTHENTICATED_USER",  # Simple
        "CAL_GET_AVAILABLE_SLOTS_INFO",  # has an array arg, needs to be converted properly
        "SALESFORCE_RETRIEVE_LEAD_DETAILS_BY_ID_WITH_CONDITIONAL_SUPPORT",  # has an array arg, needs to be converted properly
    ]:
        tool_create = ToolCreate.from_composio(action_name=action_name)
        "SALESFORCE_RETRIEVE_LEAD_DETAILS_BY_ID_WITH_CONDITIONAL_SUPPORT",
        # has an array arg, needs to be converted properly
    ]

        assert tool_create.json_schema
        schema = tool_create.json_schema
        print(f"The schema for {action_name}: {json.dumps(schema, indent=4)}\n\n")
    # Create a pool of processes
    pool = mp.Pool(processes=min(mp.cpu_count(), len(action_names)))

        try:
            _openai_payload(openai_model, schema, structured_output)
            print(f"Successfully called OpenAI using schema {schema} generated from {action_name}\n\n")
        except:
            print(f"Failed to call OpenAI using schema {schema} generated from {action_name}\n\n")
    # Map the work to the pool
    func = partial(_run_composio_test, openai_model=openai_model, structured_output=structured_output)
    results = pool.map(func, action_names)

            raise
    # Check results
    for action_name, success, error_message in results:
        print(f"Test for {action_name}: {'SUCCESS' if success else 'FAILED - ' + error_message}")
        assert success, f"Test for {action_name} failed: {error_message}"

    pool.close()
    pool.join()

    end_time = time.time()
    print(f"Total execution time: {end_time - start_time:.2f} seconds")


@pytest.mark.parametrize("openai_model", ["gpt-4o-mini"])
@@ -230,27 +332,44 @@ def test_langchain_tool_schema_generation(openai_model: str, structured_output:
    print(f"The schema for {langchain_tool.name}: {json.dumps(schema, indent=4)}\n\n")

    try:
        _openai_payload(openai_model, schema, structured_output)
        print(f"Successfully called OpenAI using schema {schema} generated from {langchain_tool.name}\n\n")
    except:
        print(f"Failed to call OpenAI using schema {schema} generated from {langchain_tool.name}\n\n")
        if structured_output:
            tool_schema = convert_to_structured_output(schema)
        else:
            tool_schema = schema

        api_key = os.getenv("OPENAI_API_KEY")
        assert api_key is not None, "OPENAI_API_KEY must be set"

        system_prompt = "You job is to test the tool that you've been provided. Don't ask for any clarification on the args, just come up with some dummy data and try executing the tool."

        url = "https://api.openai.com/v1/chat/completions"
        headers = {"Content-Type": "application/json", "Authorization": f"Bearer {api_key}"}
        data = {
            "model": openai_model,
            "messages": [
                {"role": "system", "content": system_prompt},
            ],
            "tools": [
                {
                    "type": "function",
                    "function": tool_schema,
                }
            ],
            "tool_choice": "auto",
            "parallel_tool_calls": False,
        }

        make_post_request(url, headers, data)
        print(f"Successfully called OpenAI using schema generated from {langchain_tool.name}\n\n")
    except Exception:
        print(f"Failed to call OpenAI using schema generated from {langchain_tool.name}\n\n")
        raise


@pytest.mark.parametrize("openai_model", ["gpt-4", "gpt-4o"])
@pytest.mark.parametrize("structured_output", [True, False])
def test_valid_schemas_with_pydantic_args_schema(openai_model: str, structured_output: bool):
    """Test that we can send the schemas to OpenAI and get a tool call back."""

    for filename in [
        "pydantic_as_single_arg_example",
        "list_of_pydantic_example",
        "nested_pydantic_as_arg_example",
        "simple_d20",
        "all_python_complex",
        "all_python_complex_nodict",
    ]:
# Helper function for pydantic args schema test
def _run_pydantic_args_test(filename, openai_model, structured_output):
    """Run a single pydantic args schema test case"""
    try:
        # Import the module dynamically
        file_path = os.path.join(os.path.dirname(__file__), f"test_tool_schema_parsing_files/{filename}.py")
        spec = importlib.util.spec_from_file_location(filename, file_path)
@@ -278,11 +397,83 @@ def test_valid_schemas_with_pydantic_args_schema(openai_model: str, structured_o
        )
        schema = tool.json_schema

        print(f"==== TESTING OPENAI PAYLOAD FOR {openai_model} + {filename} ====")

        # We should expect the all_python_complex one to fail when structured_output=True
        # We expect this to fail for all_python_complex with structured_output=True
        if filename == "all_python_complex" and structured_output:
            with pytest.raises(ValueError):
                _openai_payload(openai_model, schema, structured_output)
            try:
                convert_to_structured_output(schema)
                return (filename, False, "Expected ValueError but conversion succeeded")
            except ValueError:
                return (filename, True, None)  # This is expected

        # Make the API call
        if structured_output:
            tool_schema = convert_to_structured_output(schema)
        else:
            _openai_payload(openai_model, schema, structured_output)
            tool_schema = schema

        api_key = os.getenv("OPENAI_API_KEY")
        assert api_key is not None, "OPENAI_API_KEY must be set"

        system_prompt = "You job is to test the tool that you've been provided. Don't ask for any clarification on the args, just come up with some dummy data and try executing the tool."

        url = "https://api.openai.com/v1/chat/completions"
        headers = {"Content-Type": "application/json", "Authorization": f"Bearer {api_key}"}
        data = {
            "model": openai_model,
            "messages": [
                {"role": "system", "content": system_prompt},
            ],
            "tools": [
                {
                    "type": "function",
                    "function": tool_schema,
                }
            ],
            "tool_choice": "auto",
            "parallel_tool_calls": False,
        }

        make_post_request(url, headers, data)
        return (filename, True, None)  # Success
    except Exception as e:
        return (filename, False, str(e))  # Failure with error message


@pytest.mark.parametrize("openai_model", ["gpt-4o"])
@pytest.mark.parametrize("structured_output", [True, False])
def test_valid_schemas_with_pydantic_args_schema(openai_model: str, structured_output: bool):
    """Test that we can send the schemas to OpenAI and get a tool call back."""

    start_time = time.time()

    filenames = [
        "pydantic_as_single_arg_example",
        "list_of_pydantic_example",
        "nested_pydantic_as_arg_example",
        "simple_d20",
        "all_python_complex",
        "all_python_complex_nodict",
    ]

    # Create a pool of processes
    pool = mp.Pool(processes=min(mp.cpu_count(), len(filenames)))

    # Map the work to the pool
    func = partial(_run_pydantic_args_test, openai_model=openai_model, structured_output=structured_output)
    results = pool.map(func, filenames)

    # Check results
    for filename, success, error_message in results:
        print(f"Test for {filename}: {'SUCCESS' if success else 'FAILED - ' + error_message}")

        # Special handling for expected failure
        if filename == "all_python_complex" and structured_output:
            assert success, f"Expected failure handling for {filename} didn't work: {error_message}"
        else:
            assert success, f"Test for {filename} failed: {error_message}"

    pool.close()
    pool.join()

    end_time = time.time()
    print(f"Total execution time: {end_time - start_time:.2f} seconds")