import json from .utils import get_local_time from .constants import INITIAL_BOOT_MESSAGE, INITIAL_BOOT_MESSAGE_SEND_MESSAGE_THOUGHT, INITIAL_BOOT_MESSAGE_SEND_MESSAGE_FIRST_MSG, MESSAGE_SUMMARY_WARNING_STR def get_initial_boot_messages(version='startup'): if version == 'startup': initial_boot_message = INITIAL_BOOT_MESSAGE messages = [ {"role": "assistant", "content": initial_boot_message}, ] elif version == 'startup_with_send_message': messages = [ # first message includes both inner monologue and function call to send_message { "role": "assistant", "content": INITIAL_BOOT_MESSAGE_SEND_MESSAGE_THOUGHT, "function_call": { "name": "send_message", "arguments": "{\n \"message\": \"" + f"{INITIAL_BOOT_MESSAGE_SEND_MESSAGE_FIRST_MSG}" + "\"\n}" } }, # obligatory function return message { "role": "function", "name": "send_message", "content": package_function_response(True, None) } ] elif version == 'startup_with_send_message_gpt35': messages = [ # first message includes both inner monologue and function call to send_message # { # "role": "assistant", # "content": INITIAL_BOOT_MESSAGE_SEND_MESSAGE_THOUGHT, # "function_call": { # "name": "send_message", # "arguments": "{\n \"message\": \"" + f"{INITIAL_BOOT_MESSAGE_SEND_MESSAGE_FIRST_MSG}" + "\"\n}" # } # }, # # obligatory function return message # { # "role": "function", # "name": "send_message", # "content": package_function_response(True, None) # }, # first message includes both inner monologue and function call to send_message { "role": "assistant", "content": "*inner thoughts* Still waiting on the user. Sending a message with function.", "function_call": { "name": "send_message", "arguments": "{\n \"message\": \"" + f"Hi, is anyone there?" + "\"\n}" } }, # obligatory function return message { "role": "function", "name": "send_message", "content": package_function_response(True, None) } ] else: raise ValueError(version) return messages def get_heartbeat(reason='Automated timer', include_location=False, location_name='San Francisco, CA, USA'): # Package the message with time and location formatted_time = get_local_time() packaged_message = { "type": 'heartbeat', "reason": reason, "time": formatted_time, } if include_location: packaged_message["location"] = location_name return json.dumps(packaged_message) def get_login_event(last_login='Never (first login)', include_location=False, location_name='San Francisco, CA, USA'): # Package the message with time and location formatted_time = get_local_time() packaged_message = { "type": 'login', "last_login": last_login, "time": formatted_time, } if include_location: packaged_message["location"] = location_name return json.dumps(packaged_message) def package_user_message(user_message, time=None, include_location=False, location_name='San Francisco, CA, USA'): # Package the message with time and location formatted_time = time if time else get_local_time() packaged_message = { "type": 'user_message', "message": user_message, "time": formatted_time, } if include_location: packaged_message["location"] = location_name return json.dumps(packaged_message) def package_function_response(was_success, response_string, timestamp=None): formatted_time = get_local_time() if timestamp is None else timestamp packaged_message = { "status": 'OK' if was_success else 'Failed', "message": response_string, "time": formatted_time, } return json.dumps(packaged_message) def package_summarize_message(summary, summary_length, hidden_message_count, total_message_count, timestamp=None): context_message = \ f"Note: prior messages ({hidden_message_count} of {total_message_count} total messages) have been hidden from view due to conversation memory constraints.\n" \ + f"The following is a summary of the previous {summary_length} messages:\n {summary}" formatted_time = get_local_time() if timestamp is None else timestamp packaged_message = { "type": 'system_alert', "message": context_message, "time": formatted_time, } return json.dumps(packaged_message) def package_summarize_message_no_summary(hidden_message_count, timestamp=None, message=None): """Add useful metadata to the summary message""" # Package the message with time and location formatted_time = get_local_time() if timestamp is None else timestamp context_message = message if message else \ f"Note: {hidden_message_count} prior messages with the user have been hidden from view due to conversation memory constraints. Older messages are stored in Recall Memory and can be viewed using functions." packaged_message = { "type": 'system_alert', "message": context_message, "time": formatted_time, } return json.dumps(packaged_message) def get_token_limit_warning(): formatted_time = get_local_time() packaged_message = { "type": 'system_alert', "message": MESSAGE_SUMMARY_WARNING_STR, "time": formatted_time, } return json.dumps(packaged_message)