-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathmemory_agent.py
More file actions
303 lines (258 loc) · 12 KB
/
memory_agent.py
File metadata and controls
303 lines (258 loc) · 12 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
import asyncio
import contextvars
import json
import regex
from typing import (
Any,
Dict,
List,
Optional,
Tuple,
Union,
)
from pydantic import BaseModel
from autogen.agentchat.conversable_agent import ConversableAgent
from autogen.agentchat.assistant_agent import AssistantAgent
from autogen.agentchat.agent import Agent
from autogen.oai.client import OpenAIWrapper
from autogen.io.base import IOStream
from autogen.formatting_utils import colored
class MemoryAgent(AssistantAgent):
    """AssistantAgent that maintains a structured JSON "memory" of the chat.

    On every reply the agent (1) answers the latest instruction with the
    accumulated memory embedded in the prompt, then (2) asks a second,
    structured-output LLM client to refresh ``self.memory_json`` from the
    conversation history, so salient facts persist across turns.
    """

    DEFAULT_MEMORY_UPDATE_PROMPT = """Use the entire history of the groupchat (presented before this message) to populate and update the current memory json with factual information.
*** OUTPUT SHOULD ONLY BE VALID JSON.
Be very careful to not include anything that renders the output not directly loadable with json.loads(). ***
For context, current memory: {memory}.
Newest response by yourself: {new_response}
Updated memory in JSON format: """

    DEFAULT_MEMORY_REPLY_PROMPT = """Using the above instruction, group chat history, and memory json with important information from the chat history, to solve the conversation delegation agent's task.
Respond according to the ***last instruction from Conversation delegation agent***.
For context, current memory from chat based on your own outputs: {memory}.
Newest instruction: {instruction}.
Output: """

    def __init__(
        self,
        structured_output: BaseModel,
        memory_update_prompt: Optional[str] = None,
        memory_reply_prompt: Optional[str] = None,
        *args,
        **kwargs,
    ):
        """Create a MemoryAgent.

        Args:
            structured_output: Pydantic model class describing the memory
                schema; a default instance seeds ``self.memory_json``.
            memory_update_prompt: Optional override for
                ``DEFAULT_MEMORY_UPDATE_PROMPT``.
            memory_reply_prompt: Optional override for
                ``DEFAULT_MEMORY_REPLY_PROMPT``.
            *args: Forwarded to ``AssistantAgent.__init__``.
            **kwargs: Forwarded to ``AssistantAgent.__init__``.
        """
        super().__init__(*args, **kwargs)
        self.structured_output = structured_output
        # Keep the original (misspelled) attribute name for backward
        # compatibility with existing callers; prefer ``structured_output``.
        self.strucutred_output = structured_output
        self.memory_json = structured_output().model_dump(mode="json")
        self.memory_update_prompt = (
            memory_update_prompt
            if memory_update_prompt is not None
            else self.DEFAULT_MEMORY_UPDATE_PROMPT
        )
        self.memory_reply_prompt = (
            memory_reply_prompt
            if memory_reply_prompt is not None
            else self.DEFAULT_MEMORY_REPLY_PROMPT
        )
        self.replace_reply_func(
            ConversableAgent.generate_oai_reply, MemoryAgent.generate_oai_reply
        )
        # Copy the config_list entries before injecting ``response_format``:
        # a shallow ``.copy()`` of llm_config followed by an in-place
        # ``update`` would mutate the shared config entry and force
        # structured output onto the main client as well.
        structured_output_config = self.llm_config.copy()
        structured_output_config["config_list"] = [
            dict(entry) for entry in self.llm_config["config_list"]
        ]
        structured_output_config["config_list"][0]["response_format"] = (
            structured_output
        )
        self.client_memory = OpenAIWrapper(**structured_output_config)

    def memory_to_structured_output(self) -> BaseModel:
        """Build a pydantic model class mirroring the current memory dict.

        NOTE(review): ``dict_model`` accepts only tuple ``(type, default)``
        field specs or nested dicts, while ``model_dump`` produces plain
        values — confirm the expected shape of ``self.memory`` before
        relying on this helper.

        Raises:
            ValueError: if a memory value is neither a tuple nor a dict.
        """
        from pydantic import create_model

        def dict_model(name: str, dict_def: dict):
            # Recursively translate nested dicts into nested models;
            # tuples pass through unchanged as (type, default) field specs.
            fields = {}
            for field_name, value in dict_def.items():
                if isinstance(value, tuple):
                    fields[field_name] = value
                elif isinstance(value, dict):
                    fields[field_name] = (
                        dict_model(f"{name}_{field_name}", value),
                        ...,
                    )
                else:
                    raise ValueError(
                        f"Field {field_name}:{value} has invalid syntax"
                    )
            return create_model(name, **fields)

        return dict_model("memory", self.memory)

    @property
    def memory(self) -> dict:
        """Return the current memory as a plain dict."""
        return self.memory_json

    @property
    def memory_prompt(self) -> str:
        """Return the prompt template used when replying with memory context.

        Fixed: the original returned ``self.memory_prompt`` itself, which
        recursed infinitely on any access.
        """
        return self.memory_reply_prompt

    def generate_oai_reply(
        self,
        messages: Optional[List[Dict]] = None,
        sender: Optional[Agent] = None,
        config: Optional[OpenAIWrapper] = None,
    ) -> Tuple[bool, Union[str, Dict, None]]:
        """Generate a reply using autogen.oai, then refresh the JSON memory.

        Returns:
            ``(True, reply)`` on success, ``(False, None)`` when there is no
            client or the client produced no reply.
        """
        client = self.client if config is None else config
        if client is None:
            return False, None
        if messages is None:
            messages = self._oai_messages[sender]
        # Replace the latest message with one that embeds the current memory
        # alongside the instruction being answered.
        memory_instruction = self.memory_reply_prompt.format(
            memory=self.memory_json, instruction=messages[-1]["content"]
        )
        memory_messages = [{"content": memory_instruction, "role": "user"}]
        extracted_response = self._generate_oai_reply_from_client(
            client,
            self._oai_system_message + messages[:-1] + memory_messages,
            self.client_cache,
        )
        # Ask the structured-output client to fold the new response into the
        # memory JSON.
        update_instruction = [
            {
                "content": self.memory_update_prompt.format(
                    memory=self.memory_json, new_response=extracted_response
                ),
                "name": self.name,
                "role": "user",
            }
        ]
        memory_response = self._generate_oai_reply_from_client(
            self.client_memory, messages + update_instruction, self.client_cache
        )
        iostream = IOStream.get_default()
        iostream.print(colored("***** raw Memory *****", "green"), flush=True)
        iostream.print(memory_response, flush=True)
        try:
            self.memory_json = json.loads(memory_response["content"])
        except (TypeError, KeyError, json.JSONDecodeError):
            # Malformed / missing JSON: keep the previous memory rather than
            # crashing the reply path.
            iostream.print(colored("***** loaded Memory *****", "green"), flush=True)
            iostream.print("illegal format", flush=True)
        else:
            iostream.print(colored("***** loaded Memory *****", "green"), flush=True)
            iostream.print(self.memory_json, flush=True)
        return (
            (False, None) if extracted_response is None else (True, extracted_response)
        )

    async def a_generate_oai_reply(
        self,
        messages: Optional[List[Dict]] = None,
        sender: Optional[Agent] = None,
        config: Optional[Any] = None,
    ) -> Tuple[bool, Union[str, Dict, None]]:
        """Generate a reply asynchronously by running the sync path in an executor.

        Fixed: the original concatenated the message list with a prompt
        *string* (``TypeError``) after formatting the reply prompt without
        its required ``{instruction}`` key (``KeyError``). The synchronous
        ``generate_oai_reply`` already injects the memory prompt, so the
        messages are forwarded unchanged here.
        """
        iostream = IOStream.get_default()
        parent_context = contextvars.copy_context()

        def _generate_oai_reply(
            self, iostream: IOStream, *args: Any, **kwargs: Any
        ) -> Tuple[bool, Union[str, Dict, None]]:
            # Re-install the caller's IOStream inside the worker thread.
            with IOStream.set_default(iostream):
                return self.generate_oai_reply(*args, **kwargs)

        return await asyncio.get_event_loop().run_in_executor(
            None,
            lambda: parent_context.run(
                _generate_oai_reply,
                self=self,
                iostream=iostream,
                messages=messages,
                sender=sender,
                config=config,
            ),
        )