forked from fulfilio/mcp-utils
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathflask_app.py
More file actions
141 lines (112 loc) · 3.52 KB
/
flask_app.py
File metadata and controls
141 lines (112 loc) · 3.52 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
"""
Example Flask application implementing an MCP server.
`pip install mcp-utils flask gunicorn redis`
* Needs Redis to be running
* Needs Gunicorn to be installed
In addition to the requirements for the simple example, this example also:
* Demonstrates logging setup
* Demonstrates session management
* Demonstrates SSE stream handling
"""
import logging
import sys
import redis
from flask import Flask, Response, request, url_for
from gunicorn.app.base import BaseApplication
from mcp_utils.core import MCPServer
from mcp_utils.queue import RedisResponseQueue
from mcp_utils.schema import (
CallToolResult,
CompletionValues,
GetPromptResult,
Message,
TextContent,
)
redis_client = redis.Redis(
host="localhost",
port=6379,
db=0,
)
app = Flask(__name__)
mcp = MCPServer("weather", "1.0", response_queue=RedisResponseQueue(redis_client))
logger = logging.getLogger("mcp_utils")
logger.setLevel(logging.DEBUG)
@mcp.tool()
def get_weather(city: str) -> CallToolResult:
return "sunny"
@mcp.prompt()
def get_forecast() -> GetPromptResult:
return GetPromptResult(
description="Weather forecast prompt",
messages=[
Message(
role="user",
content=TextContent(
text="What is the weather forecast like?",
),
)
],
)
@mcp.prompt()
def get_weather_prompt(city: str) -> GetPromptResult:
return GetPromptResult(
description="Weather prompt",
messages=[
Message(
role="user",
content=TextContent(
text=f"What is the weather like in {city}?",
),
)
],
)
@get_weather_prompt.completion("city")
def get_cities(city_name: str) -> CompletionValues:
all_cities = ["New York", "London", "Tokyo", "Sydney", "Beijing"]
# Filter cities that start with the given city
return [city for city in all_cities if city.lower().startswith(city_name.lower())]
@app.route("/sse")
def sse():
session_id = mcp.generate_session_id()
messages_endpoint = url_for("message", session_id=session_id)
logger.info(f"SSE endpoint: {messages_endpoint}")
logger.info(f"Session ID: {session_id}")
return Response(
mcp.sse_stream(session_id, messages_endpoint), mimetype="text/event-stream"
)
@app.route("/message", methods=["POST"])
def message():
"""
Messages from the client are received as a POST
request with a JSON body.
"""
logger.debug(f"Received message: {request.get_json()}")
mcp.handle_message(request.args["session_id"], request.get_json())
return "", 202
class FlaskApplication(BaseApplication):
def __init__(self, app, options=None):
self.options = options or {}
self.application = app
super().__init__()
def load_config(self):
config = {
key: value
for key, value in self.options.items()
if key in self.cfg.settings
}
for key, value in config.items():
self.cfg.set(key.lower(), value)
def load(self):
return self.application
if __name__ == "__main__":
handler = logging.StreamHandler(sys.stdout)
formatter = logging.Formatter("[%(asctime)s] [%(levelname)s] %(name)s: %(message)s")
handler.setFormatter(formatter)
logger.addHandler(handler)
options = {
"bind": "0.0.0.0:9000",
"workers": 1,
"worker_class": "gevent",
"loglevel": "debug",
}
FlaskApplication(app, options).run()