-
Notifications
You must be signed in to change notification settings - Fork 305
Simplify MCP catalog loading: single fetch per run with ETag caching #2124
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -3,7 +3,6 @@ package gateway | |
| import ( | ||
| "context" | ||
| "encoding/json" | ||
| "errors" | ||
| "fmt" | ||
| "log/slog" | ||
| "net/http" | ||
|
|
@@ -12,14 +11,19 @@ import ( | |
| "strings" | ||
| "sync" | ||
| "time" | ||
|
|
||
| "github.com/docker/docker-agent/pkg/paths" | ||
| ) | ||
|
|
||
| const ( | ||
| DockerCatalogURL = "https://desktop.docker.com/mcp/catalog/v3/catalog.yaml" | ||
| catalogCacheFileName = "mcp_catalog.json" | ||
| catalogCacheDuration = 24 * time.Hour | ||
| fetchTimeout = 15 * time.Second | ||
| ) | ||
|
|
||
| // catalogJSON is the URL we actually fetch (JSON is ~3x faster to parse than YAML). | ||
| var catalogJSON = strings.Replace(DockerCatalogURL, ".yaml", ".json", 1) | ||
|
|
||
| func RequiredEnvVars(ctx context.Context, serverName string) ([]Secret, error) { | ||
| server, err := ServerSpec(ctx, serverName) | ||
| if err != nil { | ||
|
|
@@ -36,211 +40,158 @@ func RequiredEnvVars(ctx context.Context, serverName string) ([]Secret, error) { | |
| return server.Secrets, nil | ||
| } | ||
|
|
||
| func ServerSpec(_ context.Context, serverName string) (Server, error) { | ||
| server, ok := getCatalogServer(serverName) | ||
| func ServerSpec(ctx context.Context, serverName string) (Server, error) { | ||
| catalog, err := loadCatalog(ctx) | ||
| if err != nil { | ||
| return Server{}, err | ||
| } | ||
|
|
||
| server, ok := catalog[serverName] | ||
| if !ok { | ||
| return Server{}, fmt.Errorf("MCP server %q not found in MCP catalog", serverName) | ||
| } | ||
|
|
||
| return server, nil | ||
| } | ||
|
|
||
| // cachedCatalog is the on-disk cache format. | ||
| type cachedCatalog struct { | ||
| Catalog Catalog `json:"catalog"` | ||
| CachedAt time.Time `json:"cached_at"` | ||
| Catalog Catalog `json:"catalog"` | ||
| ETag string `json:"etag,omitempty"` | ||
| } | ||
|
|
||
| var ( | ||
| catalogMu sync.RWMutex | ||
| catalogData Catalog | ||
| catalogLoaded bool | ||
| catalogStale bool | ||
| refreshOnce sync.Once | ||
| ) | ||
|
|
||
| // getCatalogServer returns a server from the catalog, refreshing if needed. | ||
| // If server is not found in cache, it will try to fetch fresh data from network | ||
| // in case it's a newly added server. | ||
| func getCatalogServer(serverName string) (Server, bool) { | ||
| // First, ensure catalog is loaded | ||
| ensureCatalogLoaded() | ||
|
|
||
| catalogMu.RLock() | ||
| server, ok := catalogData[serverName] | ||
| stale := catalogStale | ||
| catalogMu.RUnlock() | ||
|
|
||
| if ok { | ||
| // Found in cache. If stale, trigger background refresh for next time. | ||
| if stale { | ||
| triggerBackgroundRefresh() | ||
| } | ||
| return server, true | ||
| } | ||
|
|
||
| // Server not found in cache. Try fetching fresh data in case it's a new server. | ||
| if refreshCatalogFromNetwork() { | ||
| catalogMu.RLock() | ||
| server, ok = catalogData[serverName] | ||
| catalogMu.RUnlock() | ||
| return server, ok | ||
| } | ||
|
|
||
| return Server{}, false | ||
| // catalogOnce guards one-shot catalog loading. | ||
| // We use sync.OnceValues so that: | ||
| // - the catalog is fetched at most once per process, and | ||
| // - we detach from the caller's context to avoid permanently | ||
| // caching a context-cancellation error. | ||
| var catalogOnce = sync.OnceValues(func() (Catalog, error) { | ||
| return fetchAndCache(context.Background()) | ||
| }) | ||
|
|
||
| // loadCatalog returns the catalog, fetching it at most once per process run. | ||
| // On network failure it falls back to the disk cache. | ||
| func loadCatalog(_ context.Context) (Catalog, error) { | ||
| return catalogOnce() | ||
| } | ||
|
|
||
| // ensureCatalogLoaded loads the catalog from cache or network on first access. | ||
| func ensureCatalogLoaded() { | ||
| catalogMu.RLock() | ||
| loaded := catalogLoaded | ||
| catalogMu.RUnlock() | ||
| // fetchAndCache tries to fetch the catalog from the network (using ETag for | ||
| // conditional requests) and falls back to the disk cache on any failure. | ||
| func fetchAndCache(ctx context.Context) (Catalog, error) { | ||
| cacheFile := cacheFilePath() | ||
| cached := loadFromDisk(cacheFile) | ||
|
|
||
| if loaded { | ||
| return | ||
| } | ||
|
|
||
| catalogMu.Lock() | ||
| defer catalogMu.Unlock() | ||
|
|
||
| // Double-check after acquiring write lock | ||
| if catalogLoaded { | ||
| return | ||
| } | ||
|
|
||
| cacheFile := getCacheFilePath() | ||
|
|
||
| // Try loading from local cache first | ||
| if cached, cacheAge, err := loadCatalogFromCache(cacheFile); err == nil { | ||
| slog.Debug("Loaded MCP catalog from cache", "file", cacheFile, "age", cacheAge.Round(time.Second)) | ||
| catalogData = cached | ||
| catalogLoaded = true | ||
| catalogStale = cacheAge > catalogCacheDuration | ||
| return | ||
| catalog, newETag, err := fetchFromNetwork(ctx, cached.ETag) | ||
| if err != nil { | ||
| slog.Debug("Failed to fetch MCP catalog from network, using cache", "error", err) | ||
| if cached.Catalog != nil { | ||
| return cached.Catalog, nil | ||
| } | ||
| return nil, fmt.Errorf("fetching MCP catalog: %w (no cached copy available)", err) | ||
| } | ||
|
|
||
| // Cache miss or invalid, fetch from network | ||
| catalog, err := fetchCatalogFromNetwork() | ||
| if err != nil { | ||
| slog.Error("Failed to fetch MCP catalog", "error", err) | ||
| return | ||
| // A nil catalog means 304 Not Modified — the cached copy is still valid. | ||
| if catalog == nil { | ||
| slog.Debug("MCP catalog not modified (ETag match)") | ||
| return cached.Catalog, nil | ||
| } | ||
|
|
||
| catalogData = catalog | ||
| catalogLoaded = true | ||
| catalogStale = false | ||
| slog.Debug("MCP catalog fetched from network") | ||
| saveToDisk(cacheFile, catalog, newETag) | ||
|
|
||
| // Save to cache (best effort) | ||
| if err := saveCatalogToCache(cacheFile, catalog); err != nil { | ||
| slog.Warn("Failed to save MCP catalog to cache", "error", err) | ||
| } | ||
| return catalog, nil | ||
| } | ||
|
|
||
| // triggerBackgroundRefresh starts a background goroutine to refresh the catalog. | ||
| // Only one background refresh will run at a time. | ||
| func triggerBackgroundRefresh() { | ||
| refreshOnce.Do(func() { | ||
| go func() { | ||
| refreshCatalogFromNetwork() | ||
| // Reset refreshOnce so future stale reads can trigger another refresh | ||
| refreshOnce = sync.Once{} | ||
| }() | ||
| }) | ||
| func cacheFilePath() string { | ||
| return filepath.Join(paths.GetCacheDir(), catalogCacheFileName) | ||
| } | ||
|
|
||
| // refreshCatalogFromNetwork fetches fresh catalog data and updates the cache. | ||
| // Returns true if refresh was successful. | ||
| func refreshCatalogFromNetwork() bool { | ||
| catalog, err := fetchCatalogFromNetwork() | ||
| func loadFromDisk(path string) cachedCatalog { | ||
| data, err := os.ReadFile(path) | ||
| if err != nil { | ||
| slog.Debug("Background catalog refresh failed", "error", err) | ||
| return false | ||
| return cachedCatalog{} | ||
| } | ||
|
|
||
| catalogMu.Lock() | ||
| catalogData = catalog | ||
| catalogStale = false | ||
| catalogMu.Unlock() | ||
|
|
||
| // Save to cache (best effort) | ||
| if err := saveCatalogToCache(getCacheFilePath(), catalog); err != nil { | ||
| slog.Warn("Failed to save refreshed MCP catalog to cache", "error", err) | ||
| var cached cachedCatalog | ||
| if err := json.Unmarshal(data, &cached); err != nil { | ||
| return cachedCatalog{} | ||
| } | ||
|
|
||
| slog.Debug("MCP catalog refreshed from network") | ||
| return true | ||
| return cached | ||
| } | ||
|
|
||
| func getCacheFilePath() string { | ||
| homeDir, err := os.UserHomeDir() | ||
| func saveToDisk(path string, catalog Catalog, etag string) { | ||
| data, err := json.Marshal(cachedCatalog{Catalog: catalog, ETag: etag}) | ||
| if err != nil { | ||
| return "" | ||
| slog.Warn("Failed to marshal MCP catalog cache", "error", err) | ||
| return | ||
| } | ||
| return filepath.Join(homeDir, ".cagent", catalogCacheFileName) | ||
| } | ||
|
|
||
| func loadCatalogFromCache(cacheFile string) (Catalog, time.Duration, error) { | ||
| if cacheFile == "" { | ||
| return nil, 0, errors.New("no cache file path") | ||
| dir := filepath.Dir(path) | ||
| if err := os.MkdirAll(dir, 0o755); err != nil { | ||
| slog.Warn("Failed to create MCP catalog cache directory", "error", err) | ||
| return | ||
| } | ||
|
|
||
| data, err := os.ReadFile(cacheFile) | ||
| // Write to a temp file and rename so readers never see a partial file. | ||
| tmp, err := os.CreateTemp(dir, ".mcp_catalog_*.tmp") | ||
| if err != nil { | ||
| return nil, 0, fmt.Errorf("failed to read cache file: %w", err) | ||
| slog.Warn("Failed to create MCP catalog temp file", "error", err) | ||
| return | ||
| } | ||
| tmpName := tmp.Name() | ||
|
|
||
| var cached cachedCatalog | ||
| if err := json.Unmarshal(data, &cached); err != nil { | ||
| return nil, 0, fmt.Errorf("failed to unmarshal cached data: %w", err) | ||
| if _, err := tmp.Write(data); err != nil { | ||
| tmp.Close() | ||
| os.Remove(tmpName) | ||
| slog.Warn("Failed to write MCP catalog temp file", "error", err) | ||
| return | ||
| } | ||
|
|
||
| cacheAge := time.Since(cached.CachedAt) | ||
| return cached.Catalog, cacheAge, nil | ||
| } | ||
|
|
||
| func saveCatalogToCache(cacheFile string, catalog Catalog) error { | ||
| if cacheFile == "" { | ||
| return nil | ||
| if err := tmp.Close(); err != nil { | ||
| os.Remove(tmpName) | ||
| slog.Warn("Failed to close MCP catalog temp file", "error", err) | ||
| return | ||
| } | ||
|
|
||
| // Ensure directory exists | ||
| if err := os.MkdirAll(filepath.Dir(cacheFile), 0o755); err != nil { | ||
| return fmt.Errorf("failed to create cache directory: %w", err) | ||
| if err := os.Rename(tmpName, path); err != nil { | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🟡 MEDIUM: Temp file leak when `os.Rename` fails. When `os.Rename(tmpName, path)` fails, the temp file may be left behind on disk (and the failure of the cleanup itself is never surfaced). Over time with repeated failures, this will accumulate stale `.mcp_catalog_*.tmp` files in the cache directory. Suggested fix: if err := os.Rename(tmpName, path); err != nil {
if rmErr := os.Remove(tmpName); rmErr != nil {
slog.Warn("Failed to remove temp file after rename failure", "temp_file", tmpName, "rename_error", err, "remove_error", rmErr)
} else {
slog.Warn("Failed to rename MCP catalog cache file", "error", err)
}
} |
||
| os.Remove(tmpName) | ||
| slog.Warn("Failed to rename MCP catalog cache file", "error", err) | ||
| } | ||
| } | ||
|
|
||
| cached := cachedCatalog{ | ||
| Catalog: catalog, | ||
| CachedAt: time.Now(), | ||
| } | ||
| // fetchFromNetwork fetches the catalog, using the ETag for conditional requests. | ||
| // It returns (nil, "", nil) when the server responds with 304 Not Modified. | ||
| func fetchFromNetwork(ctx context.Context, etag string) (Catalog, string, error) { | ||
| ctx, cancel := context.WithTimeout(ctx, fetchTimeout) | ||
| defer cancel() | ||
|
|
||
| data, err := json.Marshal(cached) | ||
| req, err := http.NewRequestWithContext(ctx, http.MethodGet, catalogJSON, http.NoBody) | ||
| if err != nil { | ||
| return fmt.Errorf("failed to marshal cached data: %w", err) | ||
| return nil, "", err | ||
| } | ||
|
|
||
| if err := os.WriteFile(cacheFile, data, 0o644); err != nil { | ||
| return fmt.Errorf("failed to write cache file: %w", err) | ||
| if etag != "" { | ||
| req.Header.Set("If-None-Match", etag) | ||
| } | ||
|
|
||
| return nil | ||
| } | ||
|
|
||
| func fetchCatalogFromNetwork() (Catalog, error) { | ||
| // Use the JSON version because it's 3x time faster to parse than YAML. | ||
| url := strings.Replace(DockerCatalogURL, ".yaml", ".json", 1) | ||
|
|
||
| resp, err := http.Get(url) | ||
| resp, err := http.DefaultClient.Do(req) | ||
| if err != nil { | ||
| return nil, err | ||
| return nil, "", err | ||
| } | ||
| defer resp.Body.Close() | ||
|
|
||
| if resp.StatusCode == http.StatusNotModified { | ||
| return nil, "", nil | ||
| } | ||
|
|
||
| if resp.StatusCode != http.StatusOK { | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🟡 MEDIUM: HTTP response body not drained on error status. When the HTTP status is not 200 or 304, the function returns an error without reading the response body. According to Go's `net/http` documentation, the underlying connection can only be reused for keep-alive if the body is read to completion before being closed. Since this code uses `http.DefaultClient` (a shared, pooled client), returning without draining the body prevents connection reuse on error responses. Suggested fix: if resp.StatusCode != http.StatusOK {
io.Copy(io.Discard, resp.Body) // Drain body to enable connection reuse
return nil, "", fmt.Errorf("unexpected status fetching MCP catalog: %s", resp.Status)
} Don't forget to add "io" to the import list. |
||
| return nil, fmt.Errorf("failed to fetch URL: %s, status: %s", url, resp.Status) | ||
| return nil, "", fmt.Errorf("unexpected status fetching MCP catalog: %s", resp.Status) | ||
| } | ||
|
|
||
| var topLevel topLevel | ||
| if err := json.NewDecoder(resp.Body).Decode(&topLevel); err != nil { | ||
| return nil, err | ||
| var top topLevel | ||
| if err := json.NewDecoder(resp.Body).Decode(&top); err != nil { | ||
| return nil, "", err | ||
| } | ||
|
|
||
| return topLevel.Catalog, nil | ||
| return top.Catalog, resp.Header.Get("ETag"), nil | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🔴 HIGH: Context timeout/cancellation ignored
The `sync.OnceValues` pattern uses `context.Background()`, which completely ignores any timeout or cancellation from the caller's context. This means every caller can block for the full fetch duration (up to `fetchTimeout`) waiting for the catalog fetch to complete. While the comment explains this prevents caching a context-cancellation error, it creates a production issue where different components with different timeout requirements cannot enforce their own deadlines.
Suggested fix: Consider one of these approaches: