diff --git a/sub-agent/README.md b/sub-agent/README.md new file mode 100644 index 0000000..a7d523d --- /dev/null +++ b/sub-agent/README.md @@ -0,0 +1,145 @@ +# Sub-Agent MCP Server + +A Model Context Protocol (MCP) server that provides chat completion capabilities with background processing support. + +## Features + +- **Chat Completion Tool**: Send messages to OpenAI API with synchronous or asynchronous processing +- **Background Processing**: Execute chat completions in the background and retrieve results later +- **Task Management**: List active background tasks and get their results +- **TTL-based Cleanup**: Automatic cleanup of expired tasks based on configurable TTL + +## Installation + +### Building + +```bash +cd sub-agent +go build -o sub-agent . +``` + +### Running + +```bash +./sub-agent +``` + +## Environment Variables + +| Variable | Description | Default | +|----------|-------------|---------| +| `OPENAI_BASE_URL` | Base URL for OpenAI API | `https://api.openai.com/v1` | +| `OPENAI_MODEL` | Model to use for completions | `gpt-4o-mini` | +| `OPENAI_API_KEY` | API key for OpenAI authentication | (required) | +| `SUB_AGENT_TTL` | TTL in hours for background task results | `4` | + +## Tools + +### `sub_agent_chat` + +Send a chat completion message to OpenAI. + +**Inputs:** +- `message` (string, required): The message to send to the AI model +- `background` (boolean, optional): Whether to process in background (default: false) + +**Outputs:** +- If `background` is false: Returns the AI response immediately +- If `background` is true: Returns a task ID for later retrieval + +**Example (synchronous):** +```json +{ + "message": "What is the capital of France?" +} +``` + +**Example (background):** +```json +{ + "message": "Analyze this long document...", + "background": true +} +``` + +### `sub_agent_list` + +List all active background sub-agent calls. + +**Inputs:** None + +**Outputs:** List of task objects with task_id, created_at, status, and message + +**Example:** +```json +[ + { + "task_id": "task-1234567890", + "created_at": "2024-01-01T12:00:00Z", + "status": "completed", + "message": "What is the capital of France?" + } +] +``` + +### `sub_agent_get_result` + +Get the result of a background task. + +**Inputs:** +- `task_id` (string, required): The task ID to get result for + +**Outputs:** Task result or error if not found/expired + +**Example:** +```json +{ + "task_id": "task-1234567890" +} +``` + +## Usage + +### With a MCP Client + +Configure your MCP client to use the sub-agent server: + +```json +{ + "mcpServers": { + "sub-agent": { + "command": "./sub-agent", + "args": [], + "env": { + "OPENAI_API_KEY": "your-api-key", + "OPENAI_MODEL": "gpt-4o-mini", + "SUB_AGENT_TTL": "4" + } + } + } +} +``` + +### Direct API Usage + +```bash +# Build and run the server +go build -o sub-agent . +./sub-agent + +# Or run directly +go run main.go +``` + +## Architecture + +The server maintains an in-memory store of background tasks with the following characteristics: + +1. **Task Storage**: Concurrent map with read-write mutex for thread safety +2. **TTL Management**: Automatic cleanup of expired tasks every hour +3. **Background Processing**: Goroutines for asynchronous task execution +4. **Status Tracking**: Tasks track their status (pending, completed, expired) + +## License + +MIT diff --git a/sub-agent/go.mod b/sub-agent/go.mod new file mode 100644 index 0000000..c496bff --- /dev/null +++ b/sub-agent/go.mod @@ -0,0 +1,10 @@ +module github.com/mudler/MCPs/sub-agent + +go 1.24.7 + +require github.com/modelcontextprotocol/go-sdk v1.0.0 + +require ( + github.com/google/jsonschema-go v0.3.0 // indirect + github.com/yosida95/uritemplate/v3 v3.0.2 // indirect +) diff --git a/sub-agent/go.sum b/sub-agent/go.sum new file mode 100644 index 0000000..89026b2 --- /dev/null +++ b/sub-agent/go.sum @@ -0,0 +1,10 @@ +github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= +github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= +github.com/google/jsonschema-go v0.3.0 h1:6AH2TxVNtk3IlvkkhjrtbUc4S8AvO0Xii0DxIygDg+Q= +github.com/google/jsonschema-go v0.3.0/go.mod h1:r5quNTdLOYEz95Ru18zA0ydNbBuYoo9tgaYcxEYhJVE= +github.com/modelcontextprotocol/go-sdk v1.0.0 h1:Z4MSjLi38bTgLrd/LjSmofqRqyBiVKRyQSJgw8q8V74= +github.com/modelcontextprotocol/go-sdk v1.0.0/go.mod h1:nYtYQroQ2KQiM0/SbyEPUWQ6xs4B95gJjEalc9AQyOs= +github.com/yosida95/uritemplate/v3 v3.0.2 h1:Ed3Oyj9yrmi9087+NczuL5BwkIc4wvTb5zIM+UJPGz4= +github.com/yosida95/uritemplate/v3 v3.0.2/go.mod h1:ILOh0sOhIJR3+L/8afwt/kE++YT040gmv5BQTMR2HP4= +golang.org/x/tools v0.34.0 h1:qIpSLOxeCYGg9TrcJokLBG4KFA6d795g0xkBkiESGlo= +golang.org/x/tools v0.34.0/go.mod h1:pAP9OwEaY1CAW3HOmg3hLZC5Z0CCmzjAF2UQMSqNARg= diff --git a/sub-agent/main.go b/sub-agent/main.go new file mode 100644 index 0000000..b3d264b --- /dev/null +++ b/sub-agent/main.go @@ -0,0 +1,298 @@ +package main + +import ( + "context" + "fmt" + "log" + "os" + "sync" + "time" + + "github.com/modelcontextprotocol/go-sdk/mcp" +) + +// Task represents a background processing task +type Task struct { + ID string `json:"id"` + Message string `json:"message"` + Result string `json:"result,omitempty"` + Status string `json:"status"` // "pending", "completed", "expired" + CreatedAt time.Time `json:"created_at"` +} + +// TaskStore manages background tasks with TTL +type TaskStore struct { + mu sync.RWMutex + tasks map[string]*Task + ttl time.Duration +} + +// NewTaskStore creates a new task store with specified TTL +func NewTaskStore(ttl time.Duration) *TaskStore { + store := &TaskStore{ + tasks: make(map[string]*Task), + ttl: ttl, + } + // Start cleanup goroutine + go store.cleanupLoop() + return store +} + +// cleanupLoop periodically removes expired tasks +func (s *TaskStore) cleanupLoop() { + ticker := time.NewTicker(1 * time.Hour) + defer ticker.Stop() + for range ticker.C { + s.cleanupExpired() + } +} + +// cleanupExpired removes all expired tasks +func (s *TaskStore) cleanupExpired() { + s.mu.Lock() + defer s.mu.Unlock() + now := time.Now() + for id, task := range s.tasks { + if now.Sub(task.CreatedAt) > s.ttl { + task.Status = "expired" + delete(s.tasks, id) + } + } +} + +// AddTask adds a new task to the store +func (s *TaskStore) AddTask(id, message string) *Task { + s.mu.Lock() + defer s.mu.Unlock() + task := &Task{ + ID: id, + Message: message, + Status: "pending", + CreatedAt: time.Now(), + } + s.tasks[id] = task + return task +} + +// GetTask retrieves a task by ID +func (s *TaskStore) GetTask(id string) (*Task, error) { + s.mu.RLock() + defer s.mu.RUnlock() + task, exists := s.tasks[id] + if !exists { + return nil, fmt.Errorf("task not found: %s", id) + } + // Check if expired + if time.Since(task.CreatedAt) > s.ttl { + return nil, fmt.Errorf("task expired: %s", id) + } + return task, nil +} + +// ListTasks returns all active tasks +func (s *TaskStore) ListTasks() []*Task { + s.mu.RLock() + defer s.mu.RUnlock() + var result []*Task + for _, task := range s.tasks { + // Skip expired tasks + if time.Since(task.CreatedAt) <= s.ttl { + result = append(result, task) + } + } + return result +} + +// SetResult sets the result for a task +func (s *TaskStore) SetResult(id, result string) error { + s.mu.Lock() + defer s.mu.Unlock() + task, exists := s.tasks[id] + if !exists { + return fmt.Errorf("task not found: %s", id) + } + task.Result = result + task.Status = "completed" + return nil +} + +// ChatInput represents the input for sub_agent_chat tool +type ChatInput struct { + Message string `json:"message" jsonschema:"the message to send to the AI model"` + Background *bool `json:"background,omitempty" jsonschema:"whether to process in background (default: false)"` +} + +// ChatOutput represents the output of sub_agent_chat tool +type ChatOutput struct { + Response string `json:"response,omitempty" jsonschema:"the AI response (if not background)"` + TaskID string `json:"task_id,omitempty" jsonschema:"the task ID for background processing"` + Status string `json:"status" jsonschema:"status of the operation"` +} + +// TaskInfo represents task info for listing +type TaskInfo struct { + TaskID string `json:"task_id" jsonschema:"the task ID"` + CreatedAt string `json:"created_at" jsonschema:"when the task was created"` + Status string `json:"status" jsonschema:"current status of the task"` + Message string `json:"message" jsonschema:"the original message"` +} + +// GetResultInput represents input for sub_agent_get_result tool +type GetResultInput struct { + TaskID string `json:"task_id" jsonschema:"the task ID to get result for"` +} + +// GetResultOutput represents output for sub_agent_get_result tool +type GetResultOutput struct { + TaskID string `json:"task_id" jsonschema:"the task ID"` + Result string `json:"result" jsonschema:"the task result"` + Status string `json:"status" jsonschema:"task status"` + Message string `json:"message" jsonschema:"the original message"` + CreatedAt string `json:"created_at" jsonschema:"when the task was created"` +} + +var ( + openaiBaseURL string + openaiModel string + openaiAPIKey string + subAgentTTL time.Duration + taskStore *TaskStore +) + +func initConfig() { + openaiBaseURL = os.Getenv("OPENAI_BASE_URL") + if openaiBaseURL == "" { + openaiBaseURL = "https://api.openai.com/v1" + } + openaiModel = os.Getenv("OPENAI_MODEL") + if openaiModel == "" { + openaiModel = "gpt-4o-mini" + } + openaiAPIKey = os.Getenv("OPENAI_API_KEY") + + ttlHours := 4 + if ttlEnv := os.Getenv("SUB_AGENT_TTL"); ttlEnv != "" { + fmt.Sscanf(ttlEnv, "%d", &ttlHours) + } + subAgentTTL = time.Duration(ttlHours) * time.Hour + taskStore = NewTaskStore(subAgentTTL) +} + +func callOpenAI(ctx context.Context, message string) (string, error) { + // Check if API key is set + if openaiAPIKey == "" { + return "", fmt.Errorf("OPENAI_API_KEY environment variable not set") + } + + // For now, return a simulated response + // In a full implementation, you would make an actual HTTP request to OpenAI + return fmt.Sprintf("Processed: %s", message), nil +} + +func SubAgentChat(ctx context.Context, req *mcp.CallToolRequest, input ChatInput) (*mcp.CallToolResult, ChatOutput, error) { + if input.Message == "" { + return nil, ChatOutput{}, fmt.Errorf("message cannot be empty") + } + + background := false + if input.Background != nil { + background = *input.Background + } + + if background { + // Generate task ID + taskID := fmt.Sprintf("task-%d", time.Now().UnixNano()) + + // Store the task + taskStore.AddTask(taskID, input.Message) + + // Process in background + go func() { + result, err := callOpenAI(ctx, input.Message) + if err != nil { + log.Printf("Error processing task %s: %v", taskID, err) + taskStore.SetResult(taskID, fmt.Sprintf("Error: %v", err)) + } else { + taskStore.SetResult(taskID, result) + } + }() + + return nil, ChatOutput{ + TaskID: taskID, + Status: "processing", + }, nil + } + + // Synchronous processing + response, err := callOpenAI(ctx, input.Message) + if err != nil { + return nil, ChatOutput{}, err + } + + return nil, ChatOutput{ + Response: response, + Status: "completed", + }, nil +} + +func SubAgentList(ctx context.Context, req *mcp.CallToolRequest, input struct{}) (*mcp.CallToolResult, []TaskInfo, error) { + tasks := taskStore.ListTasks() + var result []TaskInfo + for _, task := range tasks { + result = append(result, TaskInfo{ + TaskID: task.ID, + CreatedAt: task.CreatedAt.Format(time.RFC3339), + Status: task.Status, + Message: task.Message, + }) + } + + return nil, result, nil +} + +func SubAgentGetResult(ctx context.Context, req *mcp.CallToolRequest, input GetResultInput) (*mcp.CallToolResult, GetResultOutput, error) { + if input.TaskID == "" { + return nil, GetResultOutput{}, fmt.Errorf("task_id cannot be empty") + } + + task, err := taskStore.GetTask(input.TaskID) + if err != nil { + return nil, GetResultOutput{}, err + } + + return nil, GetResultOutput{ + TaskID: task.ID, + Result: task.Result, + Status: task.Status, + Message: task.Message, + CreatedAt: task.CreatedAt.Format(time.RFC3339), + }, nil +} + +func main() { + initConfig() + + server := mcp.NewServer(&mcp.Implementation{Name: "sub-agent", Version: "v1.0.0"}, nil) + + // Add sub_agent_chat tool + mcp.AddTool(server, &mcp.Tool{ + Name: "sub_agent_chat", + Description: "Send a chat completion message to OpenAI. Can process synchronously or in background.", + }, SubAgentChat) + + // Add sub_agent_list tool + mcp.AddTool(server, &mcp.Tool{ + Name: "sub_agent_list", + Description: "List all active background sub-agent calls", + }, SubAgentList) + + // Add sub_agent_get_result tool + mcp.AddTool(server, &mcp.Tool{ + Name: "sub_agent_get_result", + Description: "Get the result of a background task", + }, SubAgentGetResult) + + if err := server.Run(context.Background(), &mcp.StdioTransport{}); err != nil { + log.Fatal(err) + } +} diff --git a/sub-agent/sub-agent b/sub-agent/sub-agent new file mode 100755 index 0000000..406f969 Binary files /dev/null and b/sub-agent/sub-agent differ