From 139a235bbd20fcee78b654ee567614a32b710550 Mon Sep 17 00:00:00 2001 From: Pantelis Roditis Date: Sat, 28 Feb 2026 19:43:43 +0200 Subject: [PATCH 1/3] plug file-descriptor leaks --- main.go | 128 +++++++++++++++++++++++++++++++++++++------------------- 1 file changed, 84 insertions(+), 44 deletions(-) diff --git a/main.go b/main.go index 2621512..52e90f6 100644 --- a/main.go +++ b/main.go @@ -16,12 +16,11 @@ import ( "syscall" "time" + _ "github.com/go-sql-driver/mysql" "github.com/gorilla/websocket" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" logrus "github.com/sirupsen/logrus" - - _ "github.com/go-sql-driver/mysql" _ "modernc.org/sqlite" ) @@ -29,14 +28,12 @@ import ( // GLOBAL CONFIG // /////////////////// var ( - dbDSN string - dbDriver string - serverAddr string - allowedOrigins []string - - pongWait = 60 * time.Second - pingPeriod = 30 * time.Second - + dbDSN string + dbDriver string + serverAddr string + allowedOrigins []string + pongWait = 60 * time.Second + pingPeriod = 30 * time.Second offlineTTL time.Duration maxQueuedMessagesPerPlayer int maxConnectionsPerPlayer int @@ -101,6 +98,9 @@ var ( // Rate limiting limiters = make(map[string]*limiter) lm sync.Mutex + + // logFileHandle holds the open log file so it can be closed on shutdown. + logFileHandle *os.File ) type limiter struct { @@ -250,7 +250,8 @@ func registerConnection(playerID string, c *websocket.Conn, token string) { flushPendingMessages(playerID, c) } -// unregisterConnection removes a websocket connection for a player and decrements the active connections metric. +// unregisterConnection removes a websocket connection for a player, explicitly closes the underlying +// websocket (releasing the file descriptor), and decrements the active connections metric. // If no connections remain for the player, the player's entry is removed from the players map. func unregisterConnection(playerID string, c *websocket.Conn) { mu.Lock() @@ -259,6 +260,8 @@ func unregisterConnection(playerID string, c *websocket.Conn) { if len(players[playerID]) == 0 { delete(players, playerID) } + + _ = c.Close() connections.Dec() } @@ -317,8 +320,8 @@ func wsHandler(w http.ResponseWriter, r *http.Request) { // check connection limit mu.Lock() current := len(players[playerID]) - mu.Unlock() if current >= maxConnectionsPerPlayer { + mu.Unlock() logrus.WithFields(logrus.Fields{ "player_id": playerID, "current": current, @@ -327,6 +330,7 @@ func wsHandler(w http.ResponseWriter, r *http.Request) { http.Error(w, "too many connections", http.StatusTooManyRequests) return } + mu.Unlock() // upgrade to WebSocket conn, err := upgrader.Upgrade(w, r, nil) @@ -353,9 +357,18 @@ func wsHandler(w http.ResponseWriter, r *http.Request) { ticker := time.NewTicker(pingPeriod) defer ticker.Stop() + + done := make(chan struct{}) + defer close(done) + go func() { - for range ticker.C { - _ = conn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(5*time.Second)) + for { + select { + case <-done: + return + case <-ticker.C: + _ = conn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(5*time.Second)) + } } }() @@ -501,26 +514,33 @@ func initMetrics() { // startTokenRevalidation periodically validates all active websocket tokens. // Invalid tokens cause connections to be closed and removed. -func startTokenRevalidation(interval time.Duration) { +// The provided stopCh can be closed to stop the revalidation loop and its ticker. +func startTokenRevalidation(interval time.Duration, stopCh <-chan struct{}) { ticker := time.NewTicker(interval) go func() { - for range ticker.C { - mu.Lock() - for playerID, conns := range players { - for c, wc := range conns { - _, valid := validateToken(wc.token, false) - if !valid { - logrus.WithFields(logrus.Fields{ - "player_id": playerID, - }).Info("Token invalid, closing connection") - _ = wc.conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.ClosePolicyViolation, "token expired")) - _ = wc.conn.Close() - delete(conns, c) - connections.Dec() + defer ticker.Stop() + for { + select { + case <-stopCh: + return + case <-ticker.C: + mu.Lock() + for playerID, conns := range players { + for c, wc := range conns { + _, valid := validateToken(wc.token, false) + if !valid { + logrus.WithFields(logrus.Fields{ + "player_id": playerID, + }).Info("Token invalid, closing connection") + _ = wc.conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.ClosePolicyViolation, "token expired")) + _ = wc.conn.Close() + delete(conns, c) + connections.Dec() + } } } + mu.Unlock() } - mu.Unlock() } }() } @@ -588,6 +608,7 @@ func daemonizeSelf() error { // setupLogging configures logrus logging for the application. // It sets the output destination and log level based on global flags. // Returns an error if the log file cannot be opened or if the log level is invalid. +// The opened log file handle is stored in logFileHandle so it can be closed on shutdown. func setupLogging() error { logrus.SetFormatter(&logrus.JSONFormatter{}) @@ -596,6 +617,8 @@ func setupLogging() error { if err != nil { return fmt.Errorf("failed to open log file %s: %w", logFile, err) } + + logFileHandle = f logrus.SetOutput(f) } else { logrus.SetOutput(os.Stdout) @@ -657,11 +680,17 @@ func run() error { return fmt.Errorf("failed to setup logging: %w", err) } + if logFileHandle != nil { + defer logFileHandle.Close() + } + // Initialize DB if err := initDB(); err != nil { return fmt.Errorf("failed to init DB: %w", err) } + defer db.Close() + // Daemonize if needed if daemonize { if err := daemonizeSelf(); err != nil { @@ -671,31 +700,40 @@ func run() error { initMetrics() + // stopCh is closed on shutdown to signal background goroutines to exit. + stopCh := make(chan struct{}) + // Start offline message cleanup go func() { ticker := time.NewTicker(30 * time.Second) - for range ticker.C { - now := time.Now() - pendingMu.Lock() - for pid, msgs := range pendingMessages { - filtered := msgs[:0] - for _, pm := range msgs { - if now.Sub(pm.timestamp) <= offlineTTL { - filtered = append(filtered, pm) + defer ticker.Stop() + for { + select { + case <-stopCh: + return + case <-ticker.C: + now := time.Now() + pendingMu.Lock() + for pid, msgs := range pendingMessages { + filtered := msgs[:0] + for _, pm := range msgs { + if now.Sub(pm.timestamp) <= offlineTTL { + filtered = append(filtered, pm) + } + } + if len(filtered) == 0 { + delete(pendingMessages, pid) + } else { + pendingMessages[pid] = filtered } } - if len(filtered) == 0 { - delete(pendingMessages, pid) - } else { - pendingMessages[pid] = filtered - } + pendingMu.Unlock() } - pendingMu.Unlock() } }() // start WS token revalidation - startTokenRevalidation(tokenRevalidationPeriod) + startTokenRevalidation(tokenRevalidationPeriod, stopCh) mux := http.NewServeMux() mux.HandleFunc("/ws", wsHandler) @@ -711,6 +749,8 @@ func run() error { go func() { <-quit logrus.Info("Shutting down server...") + // Signal all background goroutines (cleanup, revalidation) to stop. + close(stopCh) ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() _ = server.Shutdown(ctx) From a05a9db6362f23069802b7a445e9f5d106a1dc72 Mon Sep 17 00:00:00 2001 From: Pantelis Roditis Date: Sat, 28 Feb 2026 20:02:46 +0200 Subject: [PATCH 2/3] reduce complexity --- main.go | 132 +++++++++++++++++++++++++++++++------------------------- 1 file changed, 73 insertions(+), 59 deletions(-) diff --git a/main.go b/main.go index 52e90f6..022c9c1 100644 --- a/main.go +++ b/main.go @@ -512,6 +512,77 @@ func initMetrics() { prometheus.MustRegister(connections, messagesPublished, messagesDelivered) } +// startOfflineMessageCleanup periodically removes expired pending messages from the offline queue. +// It stops when stopCh is closed. +func startOfflineMessageCleanup(stopCh <-chan struct{}) { + go func() { + ticker := time.NewTicker(30 * time.Second) + // FIX 8: Stop the cleanup ticker on shutdown so the goroutine can exit cleanly. + defer ticker.Stop() + + for { + select { + case <-stopCh: + return + case <-ticker.C: + now := time.Now() + pendingMu.Lock() + for pid, msgs := range pendingMessages { + filtered := msgs[:0] + for _, pm := range msgs { + if now.Sub(pm.timestamp) <= offlineTTL { + filtered = append(filtered, pm) + } + } + if len(filtered) == 0 { + delete(pendingMessages, pid) + } else { + pendingMessages[pid] = filtered + } + } + pendingMu.Unlock() + } + } + }() +} + +// buildServer constructs and returns the HTTP server and its ServeMux with all routes registered. +func buildServer() *http.Server { + mux := http.NewServeMux() + mux.HandleFunc("/ws", wsHandler) + mux.HandleFunc("/publish", publishHandler) + mux.HandleFunc("/broadcast", broadcastHandler) + mux.Handle("/metrics", promhttp.Handler()) + + return &http.Server{Addr: serverAddr, Handler: mux} +} + +// runServer starts the HTTP server and blocks until it shuts down. +// It listens for SIGINT/SIGTERM, closes stopCh to signal background goroutines, +// then performs a graceful HTTP shutdown followed by closing all websocket connections. +// Returns an error if the server exits unexpectedly. +func runServer(server *http.Server, stopCh chan struct{}) error { + quit := make(chan os.Signal, 1) + signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM) + + go func() { + <-quit + logrus.Info("Shutting down server...") + // Signal all background goroutines (cleanup, revalidation) to stop. + close(stopCh) + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + _ = server.Shutdown(ctx) + closeAllConnections() + }() + + logrus.Infof("Server listening on %s", serverAddr) + if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed { + return fmt.Errorf("server error: %w", err) + } + return nil +} + // startTokenRevalidation periodically validates all active websocket tokens. // Invalid tokens cause connections to be closed and removed. // The provided stopCh can be closed to stop the revalidation loop and its ticker. @@ -703,65 +774,8 @@ func run() error { // stopCh is closed on shutdown to signal background goroutines to exit. stopCh := make(chan struct{}) - // Start offline message cleanup - go func() { - ticker := time.NewTicker(30 * time.Second) - defer ticker.Stop() - for { - select { - case <-stopCh: - return - case <-ticker.C: - now := time.Now() - pendingMu.Lock() - for pid, msgs := range pendingMessages { - filtered := msgs[:0] - for _, pm := range msgs { - if now.Sub(pm.timestamp) <= offlineTTL { - filtered = append(filtered, pm) - } - } - if len(filtered) == 0 { - delete(pendingMessages, pid) - } else { - pendingMessages[pid] = filtered - } - } - pendingMu.Unlock() - } - } - }() - - // start WS token revalidation + startOfflineMessageCleanup(stopCh) startTokenRevalidation(tokenRevalidationPeriod, stopCh) - mux := http.NewServeMux() - mux.HandleFunc("/ws", wsHandler) - mux.HandleFunc("/publish", publishHandler) - mux.HandleFunc("/broadcast", broadcastHandler) - mux.Handle("/metrics", promhttp.Handler()) - - server := &http.Server{Addr: serverAddr, Handler: mux} - - // Graceful shutdown - quit := make(chan os.Signal, 1) - signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM) - go func() { - <-quit - logrus.Info("Shutting down server...") - // Signal all background goroutines (cleanup, revalidation) to stop. - close(stopCh) - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - _ = server.Shutdown(ctx) - closeAllConnections() - }() - - logrus.Infof("Server listening on %s", serverAddr) - err := server.ListenAndServe() - if err != nil && err != http.ErrServerClosed { - return fmt.Errorf("server error: %w", err) - } - - return nil + return runServer(buildServer(), stopCh) } From 353d37c05cb1a370598a5ea4d49279b9cadf63dc Mon Sep 17 00:00:00 2001 From: Pantelis Roditis Date: Sat, 28 Feb 2026 20:03:19 +0200 Subject: [PATCH 3/3] remove the fix comment --- main.go | 1 - 1 file changed, 1 deletion(-) diff --git a/main.go b/main.go index 022c9c1..64fe733 100644 --- a/main.go +++ b/main.go @@ -517,7 +517,6 @@ func initMetrics() { func startOfflineMessageCleanup(stopCh <-chan struct{}) { go func() { ticker := time.NewTicker(30 * time.Second) - // FIX 8: Stop the cleanup ticker on shutdown so the goroutine can exit cleanly. defer ticker.Stop() for {