Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion lib/model_context_protocol/server/client_logger.rb
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class Server::ClientLogger
Logger::UNKNOWN => "emergency"
}.freeze

attr_accessor :transport
attr_reader :transport
attr_reader :logger_name

def initialize(logger_name: "server", level: "info")
Expand Down
14 changes: 0 additions & 14 deletions lib/model_context_protocol/server/prompt.rb
Original file line number Diff line number Diff line change
Expand Up @@ -70,20 +70,6 @@ def define(&block)
@defined_arguments.concat(definition_dsl.arguments)
end

def with_argument(&block)
@defined_arguments ||= []

argument_dsl = ArgumentDSL.new
argument_dsl.instance_eval(&block)

@defined_arguments << {
name: argument_dsl.name,
description: argument_dsl.description,
required: argument_dsl.required,
completion: argument_dsl.completion
}
end

def inherited(subclass)
subclass.instance_variable_set(:@name, @name)
subclass.instance_variable_set(:@description, @description)
Expand Down
16 changes: 0 additions & 16 deletions lib/model_context_protocol/server/redis_client_proxy.rb
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,6 @@ def hset(key, *args)
with_connection { |redis| redis.hset(key, *args) }
end

def hgetall(key)
with_connection { |redis| redis.hgetall(key) }
end

def hmget(key, *fields)
with_connection { |redis| redis.hmget(key, *fields) }
end
Expand Down Expand Up @@ -71,10 +67,6 @@ def incr(key)
with_connection { |redis| redis.incr(key) }
end

def decr(key)
with_connection { |redis| redis.decr(key) }
end

def keys(pattern)
with_connection { |redis| redis.keys(pattern) }
end
Expand Down Expand Up @@ -105,14 +97,6 @@ def eval(script, keys: [], argv: [])
with_connection { |redis| redis.eval(script, keys: keys, argv: argv) }
end

def ping
with_connection { |redis| redis.ping }
end

def flushdb
with_connection { |redis| redis.flushdb }
end

private

def with_connection(&block)
Expand Down
4 changes: 0 additions & 4 deletions lib/model_context_protocol/server/redis_config.rb
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,6 @@ def self.stats
instance.stats
end

def self.pool_manager
instance.manager
end

def initialize
reset!
end
Expand Down
12 changes: 1 addition & 11 deletions lib/model_context_protocol/server/redis_pool_manager.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module ModelContextProtocol
class Server::RedisPoolManager
attr_reader :pool, :reaper_thread
attr_reader :pool

def initialize(redis_url:, pool_size: 20, pool_timeout: 5, ssl_params: nil)
@redis_url = redis_url
Expand Down Expand Up @@ -36,16 +36,6 @@ def shutdown
close_pool
end

def healthy?
return false unless @pool

@pool.with do |conn|
conn.ping == "PONG"
end
rescue
false
end

def reap_now
return unless @pool

Expand Down
4 changes: 2 additions & 2 deletions lib/model_context_protocol/server/router.rb
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ def route(message, request_store: nil, session_id: nil, transport: nil, stream_i

result = nil
begin
execute_with_context(handler, message, session_context:) do
execute_with_context do
context = {
jsonrpc_request_id:,
request_store:,
Expand Down Expand Up @@ -322,7 +322,7 @@ def build_capabilities
end

# Execute handler with appropriate context setup
def execute_with_context(handler, message, session_context:, &block)
def execute_with_context(&block)
# Skip ENV manipulation for streamable_http transport because ENV is
# global state and modifying it is thread-unsafe in multi-threaded servers.
# For stdio transport, apply ENV variables as before (single-threaded).
Expand Down
41 changes: 0 additions & 41 deletions lib/model_context_protocol/server/stdio_transport/request_store.rb
Original file line number Diff line number Diff line change
Expand Up @@ -56,47 +56,6 @@ def unregister_request(jsonrpc_request_id)
@requests.delete(jsonrpc_request_id)
end
end

# Get information about a specific request
#
# @param jsonrpc_request_id [String] the unique JSON-RPC request identifier
# @return [Hash, nil] request information or nil if not found
def get_request(jsonrpc_request_id)
@mutex.synchronize do
@requests[jsonrpc_request_id]&.dup
end
end

# Get all active request IDs
#
# @return [Array<String>] list of active request IDs
def active_requests
@mutex.synchronize do
@requests.keys.dup
end
end

# Clean up old requests (useful for preventing memory leaks)
#
# @param max_age_seconds [Integer] maximum age of requests to keep
# @return [Array<String>] list of cleaned up request IDs
def cleanup_old_requests(max_age_seconds = 300)
cutoff_time = Time.now - max_age_seconds
removed_ids = []

@mutex.synchronize do
@requests.delete_if do |jsonrpc_request_id, data|
if data[:started_at] < cutoff_time
removed_ids << jsonrpc_request_id
true
else
false
end
end
end

removed_ids
end
end
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -785,12 +785,6 @@ def cleanup_session(session_id)
@server_request_store.cleanup_session_requests(session_id)
end

# Check if this transport instance has any active local streams
# Used to determine if notifications should be queued or delivered immediately
def has_active_streams?
@stream_registry.has_any_local_streams?
end

# Broadcast notification to all active streams on this transport instance
# Handles connection errors gracefully and removes disconnected streams
def deliver_to_active_streams(notification)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,6 @@ def next_event_id
count = @redis.incr(@counter_key)
"#{@server_instance}-#{count}"
end

def current_count
count = @redis.get(@counter_key)
count ? count.to_i : 0
end

def reset
@redis.set(@counter_key, 0)
end

def set_count(value)
@redis.set(@counter_key, value.to_i)
end
end
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,6 @@ def push(notification)
end
end

def pop
notification_json = @redis.rpop(@queue_key)
return nil unless notification_json

JSON.parse(notification_json)
end

def pop_all
notification_jsons = @redis.multi do |multi|
multi.lrange(@queue_key, 0, -1)
Expand All @@ -41,40 +34,6 @@ def pop_all
JSON.parse(notification_json)
end
end

def peek_all
notification_jsons = @redis.lrange(@queue_key, 0, -1)
return [] if notification_jsons.empty?

notification_jsons.reverse.map do |notification_json|
JSON.parse(notification_json)
end
end

def size
@redis.llen(@queue_key)
end

def empty?
size == 0
end

def clear
@redis.del(@queue_key)
end

def push_bulk(notifications)
return if notifications.empty?

notification_jsons = notifications.map(&:to_json)

@redis.multi do |multi|
notification_jsons.each do |json|
multi.lpush(@queue_key, json)
end
multi.ltrim(@queue_key, 0, @max_size - 1)
end
end
end
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -64,17 +64,6 @@ def cancelled?(jsonrpc_request_id)
@redis.exists("#{CANCELLED_KEY_PREFIX}#{jsonrpc_request_id}") == 1
end

# Get cancellation information for a request
#
# @param jsonrpc_request_id [String] the unique JSON-RPC request identifier
# @return [Hash, nil] cancellation data or nil if not cancelled
def get_cancellation_info(jsonrpc_request_id)
data = @redis.get("#{CANCELLED_KEY_PREFIX}#{jsonrpc_request_id}")
data ? JSON.parse(data) : nil
rescue JSON::ParserError
nil
end

# Unregister a request (typically called when request completes)
#
# @param jsonrpc_request_id [String] the unique JSON-RPC request identifier
Expand All @@ -101,25 +90,6 @@ def unregister_request(jsonrpc_request_id)
@redis.del(*keys_to_delete) unless keys_to_delete.empty?
end

# Get information about a specific request
#
# @param jsonrpc_request_id [String] the unique JSON-RPC request identifier
# @return [Hash, nil] request information or nil if not found
def get_request(jsonrpc_request_id)
data = @redis.get("#{REQUEST_KEY_PREFIX}#{jsonrpc_request_id}")
data ? JSON.parse(data) : nil
rescue JSON::ParserError
nil
end

# Check if a request is currently active
#
# @param jsonrpc_request_id [String] the unique JSON-RPC request identifier
# @return [Boolean] true if the request is active, false otherwise
def active?(jsonrpc_request_id)
@redis.exists("#{REQUEST_KEY_PREFIX}#{jsonrpc_request_id}") == 1
end

# Clean up all requests associated with a session
# This is typically called when a session is terminated
#
Expand All @@ -146,79 +116,6 @@ def cleanup_session_requests(session_id)
@redis.del(*all_keys) unless all_keys.empty?
jsonrpc_request_ids
end

# Get all active request IDs for a specific session
#
# @param session_id [String] the session identifier
# @return [Array<String>] list of active request IDs for the session
def get_session_requests(session_id)
pattern = "#{SESSION_KEY_PREFIX}#{session_id}:*"
request_keys = @redis.keys(pattern)

request_keys.map do |key|
key.sub("#{SESSION_KEY_PREFIX}#{session_id}:", "")
end
end

# Get all active request IDs across all sessions
#
# @return [Array<String>] list of all active request IDs
def get_all_active_requests
pattern = "#{REQUEST_KEY_PREFIX}*"
request_keys = @redis.keys(pattern)

request_keys.map do |key|
key.sub(REQUEST_KEY_PREFIX, "")
end
end

# Clean up expired requests based on TTL
# This method can be called periodically to ensure cleanup
#
# @return [Integer] number of expired requests cleaned up
def cleanup_expired_requests
active_keys = @redis.keys("#{REQUEST_KEY_PREFIX}*")
expired_count = 0
key_exists_without_expiration = -1
key_does_not_exist = -2

active_keys.each do |key|
ttl = @redis.ttl(key)
if ttl == key_exists_without_expiration
@redis.expire(key, @ttl)
elsif ttl == key_does_not_exist
expired_count += 1
end
end

expired_count
end

# Refresh the TTL for an active request
#
# @param jsonrpc_request_id [String] the unique JSON-RPC request identifier
# @return [Boolean] true if TTL was refreshed, false if request doesn't exist
def refresh_request_ttl(jsonrpc_request_id)
request_data = @redis.get("#{REQUEST_KEY_PREFIX}#{jsonrpc_request_id}")
return false unless request_data

@redis.multi do |multi|
multi.expire("#{REQUEST_KEY_PREFIX}#{jsonrpc_request_id}", @ttl)
multi.expire("#{CANCELLED_KEY_PREFIX}#{jsonrpc_request_id}", @ttl)

begin
data = JSON.parse(request_data)
session_id = data["session_id"]
if session_id
multi.expire("#{SESSION_KEY_PREFIX}#{session_id}:#{jsonrpc_request_id}", @ttl)
end
rescue JSON::ParserError
nil
end
end

true
end
end
end
end
Loading