-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathserver.py
More file actions
287 lines (225 loc) · 11.4 KB
/
server.py
File metadata and controls
287 lines (225 loc) · 11.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
import asyncio
import heapq
import itertools
import grpc
from concurrent import futures
import time
import ticker_service_pb2
import ticker_service_pb2_grpc
# Static catalogue of tradable instruments. Every entry is a TickerInfo
# message built from a (symbol, name) pair; the list is hard-coded to keep
# the server simple.
TICKERS = [
    ticker_service_pb2.TickerInfo(symbol=ticker_symbol, name=company_name)
    for ticker_symbol, company_name in (
        ("AAPL", "Apple Inc."),
        ("GOOGL", "Alphabet Inc."),
        ("AMZN", "Amazon Inc."),
    )
]
class Order:
    """A single buy or sell order resting in (or executing against) a book.

    Orders sort by price first and by ``order_id`` second, so that two orders
    at the same price are ranked by submission order (smaller id first)
    instead of comparing as indistinguishable.  Equality and hashing follow
    the same key, which keeps instances usable in sets and as dict keys.

    Attributes:
        order_id: Identifier assigned by the owning order book.
        order_type: Side of the order, ``'buy'`` or ``'sell'``.
        price: Limit price, or ``None`` for a market order.
        symbol: Ticker symbol the order belongs to.
        name: Human-readable name of the ticker.
        quantity: Remaining (unfilled) quantity; mutated as the order fills.
    """

    def __init__(self, order_id, order_type, price, symbol, name, quantity):
        self.order_id = order_id
        self.order_type = order_type
        self.price = price
        self.symbol = symbol
        self.name = name
        self.quantity = quantity

    def __repr__(self):
        return f"Order({self.order_id}, {self.order_type}, {self.price}, {self.quantity})"

    def _sort_key(self):
        """Return the (price, order_id) pair used for ordering and equality."""
        return (self.price, self.order_id)

    def __lt__(self, other):
        # Price decides first; the id breaks ties so heap ordering among
        # equally priced orders is deterministic (earlier id sorts first).
        if not isinstance(other, Order):
            return NotImplemented
        return self._sort_key() < other._sort_key()

    def __eq__(self, other):
        if not isinstance(other, Order):
            return NotImplemented
        return self._sort_key() == other._sort_key()

    def __hash__(self):
        # Defining __eq__ alone would set __hash__ to None.  The id is part
        # of the equality key and is never mutated, so equal orders always
        # hash alike.
        return hash(self.order_id)
class OrderBook:
def __init__(self, symbol, name):
self.symbol = symbol
self.name = name
self.buy_orders = [] # Max heap for buy orders (store negative prices for max-heap behavior)
self.sell_orders = [] # Min heap for sell orders
self.best_avg_price = None # Store average price between best buy and sell orders
self.matched_trades = [] # Store latest matched trades
self.order_id_counter = itertools.count(1) # Automatic order ID generator
self.lock = asyncio.Lock() # Lock for synchronization
async def get_top_orders(self, n=3):
"""
Get the top N buy and sell orders for display.
Returns:
tuple: (top_buy_orders, top_sell_orders)
"""
async with self.lock:
# Get the top N buy orders (remember to negate the price for correct max-heap behavior)
top_buy_orders = [(abs(price), order) for price, order in heapq.nsmallest(n, self.buy_orders, key=lambda x: x[0])]
# Get the top N sell orders (min-heap)
top_sell_orders = [(price, order) for price, order in heapq.nsmallest(n, self.sell_orders, key=lambda x: x[0])]
return top_buy_orders, top_sell_orders
async def add_limit_order(self, order_type, price, quantity):
async with self.lock:
if order_type not in ['buy', 'sell']:
raise ValueError(f"Unknown order type: {order_type}")
order_id = next(self.order_id_counter)
order = Order(order_id, order_type, price, self.symbol, self.name, quantity)
if order_type == 'buy':
heapq.heappush(self.buy_orders, (-order.price, order)) # Max heap (negative prices)
else:
heapq.heappush(self.sell_orders, (order.price, order)) # Min heap
self.update_best_avg_price()
return order_id
async def add_market_order(self, order_type, quantity):
async with self.lock:
if order_type not in ['buy', 'sell']:
raise ValueError(f"Unknown order type: {order_type}")
order_id = next(self.order_id_counter)
order = Order(order_id, order_type, None, self.symbol, self.name, quantity)
if order_type == 'buy':
await self._execute_market_buy_order(order)
else:
await self._execute_market_sell_order(order)
return order_id
async def _execute_market_buy_order(self, order):
while self.sell_orders and order.quantity > 0:
best_sell = self.sell_orders[0][1] # Get the best sell order (lowest price)
if best_sell.quantity <= order.quantity:
traded_quantity = best_sell.quantity
heapq.heappop(self.sell_orders) # Remove the sell order
order.quantity -= traded_quantity
else:
traded_quantity = order.quantity
best_sell.quantity -= traded_quantity
order.quantity = 0
self.matched_trades.append((traded_quantity, best_sell.price))
if len(self.matched_trades) > 5:
self.matched_trades.pop(0)
async def _execute_market_sell_order(self, order):
while self.buy_orders and order.quantity > 0:
best_buy = self.buy_orders[0][1] # Get the best buy order (highest price)
if best_buy.quantity <= order.quantity:
traded_quantity = best_buy.quantity
heapq.heappop(self.buy_orders) # Remove the buy order
order.quantity -= traded_quantity
else:
traded_quantity = order.quantity
best_buy.quantity -= traded_quantity
order.quantity = 0
self.matched_trades.append((traded_quantity, best_buy.price))
if len(self.matched_trades) > 5:
self.matched_trades.pop(0)
def update_best_avg_price(self):
if self.buy_orders and self.sell_orders:
best_buy = -self.buy_orders[0][0] # Remember the price is negated in the buy heap
best_sell = self.sell_orders[0][0]
self.best_avg_price = (best_buy + best_sell) / 2
else:
self.best_avg_price = None
async def match_orders(self):
async with self.lock:
while self.buy_orders and self.sell_orders:
highest_buy = heapq.heappop(self.buy_orders)[1]
lowest_sell = heapq.heappop(self.sell_orders)[1]
if highest_buy.price >= lowest_sell.price:
traded_quantity = min(highest_buy.quantity, lowest_sell.quantity)
highest_buy.quantity -= traded_quantity
lowest_sell.quantity -= traded_quantity
self.matched_trades.append((traded_quantity, lowest_sell.price))
if len(self.matched_trades) > 5:
self.matched_trades.pop(0)
if highest_buy.quantity > 0:
heapq.heappush(self.buy_orders, (-highest_buy.price, highest_buy))
if lowest_sell.quantity > 0:
heapq.heappush(self.sell_orders, (lowest_sell.price, lowest_sell))
else:
heapq.heappush(self.buy_orders, (-highest_buy.price, highest_buy))
heapq.heappush(self.sell_orders, (lowest_sell.price, lowest_sell))
break
self.update_best_avg_price()
class TickerServiceServicer(ticker_service_pb2_grpc.TickerServiceServicer):
    """Async gRPC servicer exposing ticker lookup, order entry and market data.

    Attributes:
        order_books: Maps each ticker symbol in ``TICKERS`` to its OrderBook.
        clients: ``(queue, ticker_symbol)`` tuples, one per open market-data
            stream.
        submission_locks: Maps a peer string to the monotonic timestamp of
            its last order submission (used for rate limiting).
        rate_limit_duration: Minimum number of seconds between two order
            submissions from the same peer.
    """

    def __init__(self):
        self.order_books = {
            ticker.symbol: OrderBook(ticker.symbol, ticker.name) for ticker in TICKERS
        }
        self.clients = []
        self.submission_locks = {}
        self.rate_limit_duration = 0.5

    async def GetTickers(self, request, context):
        """Return all tickers, or only the one matching ``request.ticker_symbol``."""
        response = ticker_service_pb2.TickerResponse()
        if request.ticker_symbol:
            response.tickers.extend(
                ticker for ticker in TICKERS if ticker.symbol == request.ticker_symbol
            )
        else:
            response.tickers.extend(TICKERS)
        return response

    def _drop_client(self, subscription):
        """Remove a subscription, tolerating one that was already removed."""
        try:
            self.clients.remove(subscription)
        except ValueError:
            pass  # already dropped by another code path

    async def ConnectToMarketData(self, request, context):
        """Stream market-data updates for ``request.ticker_symbol``.

        Registers a per-client queue and yields every message that
        ``broadcast_market_data`` puts on it until the stream ends.
        """
        ticker_symbol = request.ticker_symbol
        print(f"Subscriber connected for ticker: {ticker_symbol}")
        queue = asyncio.Queue()
        subscription = (queue, ticker_symbol)
        self.clients.append(subscription)
        try:
            while True:
                yield await queue.get()
        finally:
            # Runs on cancellation and on any other way the generator is
            # closed, so a finished stream never stays registered.
            self._drop_client(subscription)

    async def broadcast_market_data(self, ticker_symbol):
        """Send the current best bid/ask to subscribers of ``ticker_symbol``.

        Nothing is sent while either side of the book is empty.
        """
        order_book = self.order_books[ticker_symbol]
        bid_orders, ask_orders = await order_book.get_top_orders(1)
        if not bid_orders or not ask_orders:
            return
        best_bid = bid_orders[0][1]
        best_ask = ask_orders[0][1]
        market_data = ticker_service_pb2.MarketData(
            ticker_symbol=ticker_symbol,
            best_bid_price=best_bid.price,
            best_ask_price=best_ask.price,
            best_bid_quantity=best_bid.quantity,
            best_ask_quantity=best_ask.quantity,
        )
        # Iterate over a snapshot: a subscription may be dropped inside the
        # loop, and removing from a list while iterating it skips entries.
        for subscription in list(self.clients):
            client_queue, subscribed_ticker_symbol = subscription
            if subscribed_ticker_symbol != ticker_symbol:
                continue
            try:
                # put_nowait is the call that raises QueueFull; an awaited
                # put() would block instead of raising.
                client_queue.put_nowait(market_data)
            except asyncio.QueueFull:
                # Client cannot keep up with updates; stop feeding it.
                self._drop_client(subscription)

    async def _enforce_rate_limit(self, context):
        """Abort with RESOURCE_EXHAUSTED if this peer submitted too recently."""
        client_id = str(context.peer())
        # Monotonic clock: unaffected by wall-clock adjustments.
        current_time = time.monotonic()
        last_submission = self.submission_locks.get(client_id)
        if (
            last_submission is not None
            and current_time - last_submission < self.rate_limit_duration
        ):
            # On the asyncio server abort() is a coroutine: it must be
            # awaited, otherwise the request is never actually rejected.
            await context.abort(grpc.StatusCode.RESOURCE_EXHAUSTED, "Rate limit exceeded")
        self.submission_locks[client_id] = current_time

    async def _admit_submission(self, request, context):
        """Apply rate limiting and resolve the order book for a submission.

        Aborts with NOT_FOUND when the ticker symbol has no order book.
        """
        await self._enforce_rate_limit(context)
        order_book = self.order_books.get(request.ticker_symbol)
        if order_book is None:
            await context.abort(
                grpc.StatusCode.NOT_FOUND,
                f"Unknown ticker symbol: {request.ticker_symbol}",
            )
        return order_book

    async def _finish_submission(self, order_book, ticker_symbol, order_id):
        """Match crossing orders, broadcast the new top of book, build the reply."""
        await order_book.match_orders()
        # Trigger market data broadcast upon a new order
        await self.broadcast_market_data(ticker_symbol)
        return ticker_service_pb2.OrderResponse(order_id=str(order_id))

    async def SubmitLimitOrder(self, request, context):
        """Queue a limit order, run matching, and return the new order id.

        Aborts with RESOURCE_EXHAUSTED when rate limited, NOT_FOUND for an
        unknown ticker, and INVALID_ARGUMENT when the book rejects the order.
        """
        order_book = await self._admit_submission(request, context)
        try:
            order_id = await order_book.add_limit_order(
                request.side, request.price, request.quantity
            )
        except ValueError as exc:
            await context.abort(grpc.StatusCode.INVALID_ARGUMENT, str(exc))
        return await self._finish_submission(order_book, request.ticker_symbol, order_id)

    async def SubmitMarketOrder(self, request, context):
        """Execute a market order and return its order id.

        Aborts with RESOURCE_EXHAUSTED when rate limited, NOT_FOUND for an
        unknown ticker, and INVALID_ARGUMENT when the book rejects the order.
        """
        order_book = await self._admit_submission(request, context)
        try:
            order_id = await order_book.add_market_order(request.side, request.quantity)
        except ValueError as exc:
            await context.abort(grpc.StatusCode.INVALID_ARGUMENT, str(exc))
        return await self._finish_submission(order_book, request.ticker_symbol, order_id)
async def serve(port=50051):
    """Start the asyncio gRPC server and block until it terminates.

    Args:
        port: TCP port to listen on, bound on all interfaces (``[::]``)
            without transport security.  Defaults to 50051.
    """
    server = grpc.aio.server()
    ticker_service = TickerServiceServicer()
    ticker_service_pb2_grpc.add_TickerServiceServicer_to_server(ticker_service, server)
    server.add_insecure_port(f'[::]:{port}')
    await server.start()
    print(f"Server started on port {port}")
    await server.wait_for_termination()
# Script entry point: when this file is executed directly (not imported),
# run the serve() coroutine to completion via asyncio.run.
if __name__ == '__main__':
    asyncio.run(serve())