forked from eden-network/tx-explain
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathexplain.py
More file actions
146 lines (126 loc) · 5.94 KB
/
explain.py
File metadata and controls
146 lines (126 loc) · 5.94 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
import os
import json
import asyncio
import argparse
from datetime import datetime
from dotenv import load_dotenv
from anthropic import AsyncAnthropic
from google.cloud import storage
load_dotenv() # Load environment variables from .env file
# Name of the GCS bucket holding simulations and explanations (from env;
# None if GCS_BUCKET_NAME is unset — bucket operations would then fail).
BUCKET_NAME = os.getenv('GCS_BUCKET_NAME')
# Module-level GCS client and bucket handle shared by every helper below.
storage_client = storage.Client()
bucket = storage_client.bucket(BUCKET_NAME)
async def extract_json(string):
    """Return the substring of *string* spanning the outermost JSON object.

    Locates the first ``{`` and the last ``}`` and returns everything
    between them, inclusive. If either brace is missing — or they are
    inverted (e.g. ``'}{'``), which previously produced an empty string —
    the input is returned unchanged so the caller can deal with the
    malformed text.
    """
    start_index = string.find('{')
    end_index = string.rfind('}')
    # Guard inverted braces as well as missing ones: slicing with
    # end_index < start_index would silently yield ''.
    if start_index == -1 or end_index == -1 or end_index < start_index:
        return string
    return string[start_index:end_index + 1]
async def read_json_files(network):
    """Collect trimmed simulation JSON files for *network* that still lack explanations.

    Lists blobs under ``{network}/transactions/simulations/trimmed/``,
    skips any whose corresponding explanation blob already exists, and
    skips transactions whose first call's function name appears in the
    module-global ``SKIP_FUNCTION_CALLS``.

    Returns:
        list[tuple[str, dict]]: ``(blob_path, parsed_json)`` pairs.
    """
    json_data = []
    trimmed_prefix = f'{network}/transactions/simulations/trimmed/'
    explanations_prefix = f'{network}/transactions/explanations/'
    # SKIP_FUNCTION_CALLS is set by main() from the --skip CLI flag and is
    # None when the flag is absent; normalize so `x in skip_calls` never
    # raises TypeError on None.
    skip_calls = SKIP_FUNCTION_CALLS or ()
    for blob in bucket.list_blobs(prefix=trimmed_prefix):
        if not blob.name.endswith('.json'):
            continue
        file_path = blob.name
        results_file_path = file_path.replace(trimmed_prefix, explanations_prefix)
        # Already explained — nothing to do for this transaction.
        if storage.Blob(results_file_path, bucket).exists():
            continue
        data = json.loads(blob.download_as_string())
        # data['m'][0]['f'] is presumably the first traced call's function
        # name (trimmed-simulation schema) — guarded in case 'm' is absent
        # or empty so one malformed file doesn't abort the whole run.
        calls = data.get('m') or []
        if calls and calls[0].get('f') in skip_calls:
            continue
        json_data.append((file_path, data))
    return json_data
async def get_cached_explanation(tx_hash, network):
    """Fetch a previously stored explanation for *tx_hash*, or None if absent."""
    path = f'{network}/transactions/explanations/{tx_hash}.json'
    cached = bucket.blob(path)
    if not cached.exists():
        return None
    return json.loads(cached.download_as_string())
async def explain_transaction(client, payload, network='ethereum', system_prompt=None, model="claude-3-haiku-20240307", max_tokens=2000, temperature=0):
    """Stream an explanation of a transaction payload from the Anthropic API.

    Yields text chunks as they arrive from the streaming API. After the
    stream ends, the accumulated text is persisted to the GCS bucket
    (best-effort: upload errors are printed, not raised).

    Args:
        client: AsyncAnthropic client used for ``messages.stream``.
        payload: Transaction dict; its optional ``'hash'`` key names the
            cached explanation file.
        network: Bucket path prefix for the explanation file.
        system_prompt: Optional system prompt forwarded to the API.
        model: Anthropic model identifier.
        max_tokens: Response token cap.
        temperature: Sampling temperature.
    """
    request_params = {
        'model': model,
        'max_tokens': max_tokens,
        'temperature': temperature,
        'messages': [
            {
                "role": "user",
                "content": [
                    {
                        "type": "text",
                        "text": json.dumps(payload)
                    }
                ]
            }
        ]
    }
    if system_prompt:
        request_params['system'] = system_prompt
    explanation = ""
    try:
        async with client.messages.stream(**request_params) as stream:
            async for word in stream.text_stream:
                yield word
                explanation += word
    except Exception as e:
        # Best-effort: whatever partial text was streamed is still cached below.
        print(f"Error streaming explanation: {str(e)}")
    # Use .get so a payload without a 'hash' key skips caching instead of
    # raising KeyError after the stream has already been consumed.
    tx_hash = payload.get('hash')
    if explanation and tx_hash:
        try:
            await write_explanation_to_bucket(network, tx_hash, explanation, model)
        except Exception as e:
            print(f'Error uploading explanation for {tx_hash}: {str(e)}')
async def write_explanation_to_bucket(network, tx_hash, explanation, model):
    """Persist an explanation document for *tx_hash* to the GCS bucket."""
    document = {
        'result': explanation,
        'model': model,
        'updated_at': datetime.now().isoformat(),
    }
    destination = bucket.blob(f'{network}/transactions/explanations/{tx_hash}.json')
    destination.upload_from_string(json.dumps(document))
async def process_json_file(anthropic_client, file_path, data, network, semaphore, delay_time, system_prompt, model):
    """Explain a single transaction file under the concurrency semaphore.

    Collects the streamed explanation, writes it to the bucket when
    non-empty, then waits *delay_time* seconds to pace API requests.
    """
    async with semaphore:
        print(f'Analyzing: {file_path}...')
        chunks = []
        async for chunk in explain_transaction(anthropic_client, data, network=network, system_prompt=system_prompt, model=model):
            chunks.append(chunk)
        explanation = ''.join(chunks)
        if explanation:
            await write_explanation_to_bucket(network, data['hash'], explanation, model)
        else:
            print(f'Error processing {file_path}')
        # Throttle between requests even on failure.
        await asyncio.sleep(delay_time)
async def main(network, delay_time, max_concurrent_connections, skip_function_calls, system_prompt_file, model):
    """Fan out explanation tasks for every pending transaction on *network*."""
    # read_json_files consults this module-level global when filtering.
    global SKIP_FUNCTION_CALLS
    SKIP_FUNCTION_CALLS = skip_function_calls

    system_prompt = None
    if system_prompt_file:
        with open(system_prompt_file, 'r') as file:
            system_prompt = file.read()

    pending = await read_json_files(network)
    anthropic_client = AsyncAnthropic(api_key=os.getenv('ANTHROPIC_API_KEY'))
    semaphore = asyncio.Semaphore(max_concurrent_connections)
    tasks = [
        asyncio.create_task(
            process_json_file(anthropic_client, file_path, data, network, semaphore, delay_time, system_prompt, model)
        )
        for file_path, data in pending
    ]
    await asyncio.gather(*tasks)
if __name__ == '__main__':
    # CLI entry point: parse options, then run the async pipeline.
    parser = argparse.ArgumentParser(description='Blockchain Transaction Analyzer')
    parser.add_argument('-n', '--network', type=str, default='ethereum',
                        choices=['ethereum', 'arbitrum', 'avalanche', 'optimism'],
                        help='Blockchain network to analyze transactions for (default: ethereum)')
    parser.add_argument('-d', '--delay', type=float, default=1.2,
                        help='Delay time between API requests in seconds (default: 1.2)')
    parser.add_argument('-c', '--concurrency', type=int, default=1,
                        help='Maximum number of concurrent connections to the API (default: 1)')
    parser.add_argument('-s', '--skip', type=str, nargs='+', default=None,
                        help='List of function calls to skip (default: None, suggested: transfer approve transferFrom)')
    parser.add_argument('-p', '--prompt', type=str, default=None,
                        help='Path to the file containing the system prompt (default: None)')
    parser.add_argument('-m', '--model', type=str, default='claude-3-haiku-20240307',
                        help='Model to use for generating explanations (default: claude-3-haiku-20240307)')
    args = parser.parse_args()
    asyncio.run(main(args.network, args.delay, args.concurrency,
                     args.skip, args.prompt, args.model))