-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathswitch.py
More file actions
105 lines (80 loc) · 3.03 KB
/
switch.py
File metadata and controls
105 lines (80 loc) · 3.03 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
from myhdl import block, always_comb, Signal, always, intbv
@block
def manchester_decoder(clk, encoded, decoded, trigger_out):
# Register to hold the last Manchester-encoded bit
last_bit = Signal(bool(0))
i = Signal(intbv(0, min=0))
index = Signal(intbv(0, min=0))
# Manchester decoding logic
@always(clk.posedge)
def manchester_decode():
if trigger_out:
return
if i % 2:
if last_bit and not encoded:
decoded.next[index.val] = 0
elif not last_bit and encoded:
decoded.next[index.val] = 1
elif not last_bit and not encoded:
trigger_out.next = 1
elif last_bit and encoded:
raise ValueError('Invalid Manchester code 11')
index.next = index + 1
else:
last_bit.next = encoded
i.next = i + 1
return manchester_decode
@block
def ip_parser(decoded, source_port, dest_port, data, trigger_in):
LENGTH = len(decoded)
# Constants for the UDP header fields
SRC_PORT_OFFSET = LENGTH - 34 * 8
DST_PORT_OFFSET = LENGTH - 36 * 8
DATA_OFFSET = LENGTH - 42 * 8
# Constants for the IP header fields
PROTOCOL_OFFSET = LENGTH - 23 * 8
# Constants for the Ethernet header fields
ETHERTYPE_OFFSET = LENGTH - 12 * 8
# Constants for the UDP protocol
UDP_PROTOCOL = 17
# Constants for the Ethernet II frame type
ETHERTYPE_IPV4 = 0x0800
# Define input and output signals
@always_comb
def logic():
if not trigger_in:
return
# Extract Ethernet header fields
ethertype = int(decoded[ETHERTYPE_OFFSET:ETHERTYPE_OFFSET - 16])
protocol = int(decoded[PROTOCOL_OFFSET: PROTOCOL_OFFSET - 8])
# Only parse IPv4 UDP packets
if ethertype == ETHERTYPE_IPV4 and protocol == UDP_PROTOCOL:
# Extract UDP header fields
source_port.next = int(decoded[SRC_PORT_OFFSET:SRC_PORT_OFFSET - 16])
dest_port.next = int(decoded[DST_PORT_OFFSET:DST_PORT_OFFSET - 16])
# Copy UDP data
data.next = decoded[DATA_OFFSET: 32]
# Return the logic process
return logic
# @block
# def manchester_encoder(clk, data, encoded):
# @always_seq(clk.posedge, reset=None)
# def encode_process():
# for i in range(int(len(data) / 2)):
# if data[i * 2] and data[i * 2 + 1]:
# encoded[i] = 1
# elif not data[i * 2] and data[i * 2 + 1]:
# encoded[i] = 0
# else:
# # Invalid Manchester code
# pass
#
# return encode_process
@block
def top(clk, encoded, decoded, source_port, dest_port, data):
# Instantiate decoder, parser, and encoder blocks
trigger = Signal(bool(0))
decode_inst = manchester_decoder(clk, encoded, decoded, trigger)
parse_inst = ip_parser(decoded, source_port, dest_port, data, trigger)
# encode_inst = manchester_encoder(clk, data, encoded)
return decode_inst, parse_inst # , encode_inst