diff --git a/cmd/rdsync/abort.go b/cmd/rdsync/abort.go index 11c6af5..190684a 100644 --- a/cmd/rdsync/abort.go +++ b/cmd/rdsync/abort.go @@ -19,7 +19,9 @@ var abortCmd = &cobra.Command{ fmt.Println(err) os.Exit(1) } - os.Exit(app.CliAbort()) + code := app.CliAbort() + app.CloseLogger() + os.Exit(code) }, } diff --git a/cmd/rdsync/hosts.go b/cmd/rdsync/hosts.go index 5a8e135..f4a88ba 100644 --- a/cmd/rdsync/hosts.go +++ b/cmd/rdsync/hosts.go @@ -24,7 +24,9 @@ var hostListCmd = &cobra.Command{ fmt.Println(err) os.Exit(1) } - os.Exit(app.CliHostList()) + code := app.CliHostList() + app.CloseLogger() + os.Exit(code) }, } @@ -47,7 +49,9 @@ var hostAddCmd = &cobra.Command{ } }) - os.Exit(app.CliHostAdd(args[0], priorityVal, dryRun, skipValkeyCheck)) + code := app.CliHostAdd(args[0], priorityVal, dryRun, skipValkeyCheck) + app.CloseLogger() + os.Exit(code) }, } @@ -61,7 +65,9 @@ var hostRemoveCmd = &cobra.Command{ fmt.Println(err) os.Exit(1) } - os.Exit(app.CliHostRemove(args[0])) + code := app.CliHostRemove(args[0]) + app.CloseLogger() + os.Exit(code) }, } diff --git a/cmd/rdsync/info.go b/cmd/rdsync/info.go index 589426d..e376f1c 100644 --- a/cmd/rdsync/info.go +++ b/cmd/rdsync/info.go @@ -18,7 +18,9 @@ var infoCmd = &cobra.Command{ fmt.Println(err) os.Exit(1) } - os.Exit(app.CliInfo(verbose)) + code := app.CliInfo(verbose) + app.CloseLogger() + os.Exit(code) }, } diff --git a/cmd/rdsync/maintenance.go b/cmd/rdsync/maintenance.go index 247e17d..d3f0b2b 100644 --- a/cmd/rdsync/maintenance.go +++ b/cmd/rdsync/maintenance.go @@ -23,7 +23,9 @@ var maintCmd = &cobra.Command{ fmt.Println(err) os.Exit(1) } - os.Exit(app.CliGetMaintenance()) + code := app.CliGetMaintenance() + app.CloseLogger() + os.Exit(code) }, } @@ -36,7 +38,9 @@ var maintOnCmd = &cobra.Command{ fmt.Println(err) os.Exit(1) } - os.Exit(app.CliEnableMaintenance(maintWait)) + code := app.CliEnableMaintenance(maintWait) + app.CloseLogger() + os.Exit(code) }, } @@ -49,7 +53,9 @@ var maintOffCmd = &cobra.Command{ fmt.Println(err) os.Exit(1) } - os.Exit(app.CliDisableMaintenance(maintWait)) + code := app.CliDisableMaintenance(maintWait) + app.CloseLogger() + os.Exit(code) }, } diff --git a/cmd/rdsync/state.go b/cmd/rdsync/state.go index 81ee38c..f288dd5 100644 --- a/cmd/rdsync/state.go +++ b/cmd/rdsync/state.go @@ -18,7 +18,9 @@ var stateCmd = &cobra.Command{ fmt.Println(err) os.Exit(1) } - os.Exit(app.CliState(verbose)) + code := app.CliState(verbose) + app.CloseLogger() + os.Exit(code) }, } diff --git a/cmd/rdsync/switch.go b/cmd/rdsync/switch.go index 180cb53..969d664 100644 --- a/cmd/rdsync/switch.go +++ b/cmd/rdsync/switch.go @@ -25,7 +25,9 @@ var switchCmd = &cobra.Command{ fmt.Println(err) os.Exit(1) } - os.Exit(app.CliSwitch(switchFrom, switchTo, switchWait, switchForce)) + code := app.CliSwitch(switchFrom, switchTo, switchWait, switchForce) + app.CloseLogger() + os.Exit(code) }, } diff --git a/go.mod b/go.mod index 1979f12..dc9aef8 100644 --- a/go.mod +++ b/go.mod @@ -12,6 +12,7 @@ require ( github.com/heetch/confita v0.11.0 github.com/moby/moby/api v1.54.2 github.com/moby/moby/client v0.4.1 + github.com/rs/zerolog v1.35.1 github.com/spf13/cobra v1.10.2 github.com/spf13/pflag v1.0.10 github.com/stretchr/testify v1.11.1 @@ -40,6 +41,8 @@ require ( github.com/hashicorp/go-memdb v1.3.4 // indirect github.com/hashicorp/golang-lru v0.5.4 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect + github.com/mattn/go-colorable v0.1.14 // indirect + github.com/mattn/go-isatty v0.0.22 // indirect github.com/moby/docker-image-spec v1.3.1 // indirect github.com/opencontainers/go-digest v1.0.0 // indirect github.com/opencontainers/image-spec v1.1.1 // indirect @@ -50,6 +53,6 @@ require ( go.opentelemetry.io/otel v1.43.0 // indirect go.opentelemetry.io/otel/metric v1.43.0 // indirect go.opentelemetry.io/otel/trace v1.43.0 // indirect - golang.org/x/sys v0.43.0 // indirect + golang.org/x/sys v0.44.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index d9314fe..cc38d41 100644 --- a/go.sum +++ b/go.sum @@ -70,6 +70,10 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/mattn/go-colorable v0.1.14 h1:9A9LHSqF/7dyVVX6g0U9cwm9pG3kP9gSzcuIPHPsaIE= +github.com/mattn/go-colorable v0.1.14/go.mod h1:6LmQG8QLFO4G5z1gPvYEzlUgJ2wF+stgPZH1UqBm1s8= +github.com/mattn/go-isatty v0.0.22 h1:j8l17JJ9i6VGPUFUYoTUKPSgKe/83EYU2zBC7YNKMw4= +github.com/mattn/go-isatty v0.0.22/go.mod h1:ZXfXG4SQHsB/w3ZeOYbR0PrPwLy+n6xiMrJlRFqopa4= github.com/moby/docker-image-spec v1.3.1 h1:jMKff3w6PgbfSa69GfNg+zN/XLhfXJGnEx3Nl2EsFP0= github.com/moby/docker-image-spec v1.3.1/go.mod h1:eKmb5VW8vQEh/BAr2yvVNvuiJuY6UIocYsFu/DxxRpo= github.com/moby/moby/api v1.54.2 h1:wiat9QAhnDQjA7wk1kh/TqHz2I1uUA7M7t9SAl/JNXg= @@ -88,6 +92,8 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ= github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc= +github.com/rs/zerolog v1.35.1 h1:m7xQeoiLIiV0BCEY4Hs+j2NG4Gp2o2KPKmhnnLiazKI= +github.com/rs/zerolog v1.35.1/go.mod h1:EjML9kdfa/RMA7h/6z6pYmq1ykOuA8/mjWaEvGI+jcw= github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/spf13/cobra v1.7.0/go.mod h1:uLxZILRyS/50WlhOIKD7W6V5bgeIt+4sICxh6uRMrb0= github.com/spf13/cobra v1.10.2 h1:DMTTonx5m65Ic0GOoRY2c16WCbHxOOw6xxezuLaBpcU= @@ -126,8 +132,8 @@ go.yaml.in/yaml/v3 v3.0.4 h1:tfq32ie2Jv2UxXFdLJdh3jXuOzWiL1fo0bu/FbuKpbc= go.yaml.in/yaml/v3 v3.0.4/go.mod h1:DhzuOOF2ATzADvBadXxruRBLzYTpT36CKvDb3+aBEFg= golang.org/x/net v0.48.0 h1:zyQRTTrjc33Lhh0fBgT/H3oZq9WuvRR5gPC70xpDiQU= golang.org/x/net v0.48.0/go.mod h1:+ndRgGjkh8FGtu1w1FGbEC31if4VrNVMuKTgcAAnQRY= -golang.org/x/sys v0.43.0 h1:Rlag2XtaFTxp19wS8MXlJwTvoh8ArU6ezoyFsMyCTNI= -golang.org/x/sys v0.43.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw= +golang.org/x/sys v0.44.0 h1:ildZl3J4uzeKP07r2F++Op7E9B29JRUy+a27EibtBTQ= +golang.org/x/sys v0.44.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw= golang.org/x/text v0.32.0 h1:ZD01bjUt1FQ9WJ0ClOL5vxgxOI/sVCNgX1YtKwcY0mU= golang.org/x/text v0.32.0/go.mod h1:o/rUWzghvpD5TXrTIBuJU77MTaN0ljMWE47kxGJQ7jY= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/internal/app/active_nodes.go b/internal/app/active_nodes.go index 34b52e8..2d52d2d 100644 --- a/internal/app/active_nodes.go +++ b/internal/app/active_nodes.go @@ -2,7 +2,6 @@ package app import ( "fmt" - "log/slog" "slices" "sort" "strings" @@ -48,13 +47,13 @@ func (app *App) actualizeQuorumReplicas(master string, activeNodes []string) err } if currentValue != expectedValue { - app.logger.Debug(fmt.Sprintf("Setting quorum replicas to %s on %s", expectedValue, master)) + app.logger.Debug().Msgf("Setting quorum replicas to %s on %s", expectedValue, master) err, rewriteErr := node.SetQuorumReplicas(app.ctx, expectedValue) if err != nil { return err } if rewriteErr != nil { - app.logger.Error("Unable to rewrite config", slog.String("fqdn", master), slog.Any("error", rewriteErr)) + app.logger.Error().Str("fqdn", master).Err(rewriteErr).Msg("Unable to rewrite config") } } @@ -76,21 +75,21 @@ func (app *App) updateActiveNodes(state, stateDcs map[string]*HostState, oldActi if removingNodes { err := app.dcs.Set(pathActiveNodes, activeNodes) if err != nil { - app.logger.Error("Update active nodes: failed to update active nodes in dcs", slog.Any("error", err)) + app.logger.Error().Err(err).Msg("Update active nodes: failed to update active nodes in dcs") return err } } err := app.actualizeQuorumReplicas(master, activeNodes) if err != nil { - app.logger.Error("Update active nodes: failed to actualize quorum replicas", slog.Any("error", err)) + app.logger.Error().Err(err).Msg("Update active nodes: failed to actualize quorum replicas") return err } if !removingNodes { err := app.dcs.Set(pathActiveNodes, activeNodes) if err != nil { - app.logger.Error("Update active nodes: failed to update active nodes in dcs", slog.Any("error", err)) + app.logger.Error().Err(err).Msg("Update active nodes: failed to update active nodes in dcs") return err } } @@ -117,7 +116,7 @@ func (app *App) calcActiveNodes(state, stateDcs map[string]*HostState, oldActive if !node.PingOk { if stateDcs[host].PingOk { if slices.Contains(oldActiveNodes, host) { - app.logger.Warn(fmt.Sprintf("Calc active nodes: %s keeps health lock in dcs, keeping active...", host)) + app.logger.Warn().Msgf("Calc active nodes: %s keeps health lock in dcs, keeping active...", host) activeNodes = append(activeNodes, host) } continue @@ -128,23 +127,23 @@ func (app *App) calcActiveNodes(state, stateDcs map[string]*HostState, oldActive failTime := time.Since(app.nodeFailTime[host]) if failTime < app.config.InactivationDelay { if slices.Contains(oldActiveNodes, host) { - app.logger.Warn(fmt.Sprintf("Calc active nodes: %s is failing, remaining %v", host, app.config.InactivationDelay-failTime)) + app.logger.Warn().Msgf("Calc active nodes: %s is failing, remaining %v", host, app.config.InactivationDelay-failTime) activeNodes = append(activeNodes, host) } continue } - app.logger.Error(fmt.Sprintf("Calc active nodes: %s is down, deleting from active...", host)) + app.logger.Error().Msgf("Calc active nodes: %s is down, deleting from active...", host) continue } else if !stateDcs[host].IsOffline { delete(app.nodeFailTime, host) } replicaState := node.ReplicaState if replicaState == nil { - app.logger.Warn(fmt.Sprintf("Calc active nodes: lost master %s", host)) + app.logger.Warn().Msgf("Calc active nodes: lost master %s", host) continue } if (masterState.PingOk && masterState.PingStable) && !replicates(&masterState, replicaState, host, masterNode, false) { - app.logger.Error(fmt.Sprintf("Calc active nodes: %s is not replicating from alive master, deleting from active...", host)) + app.logger.Error().Msgf("Calc active nodes: %s is not replicating from alive master, deleting from active...", host) continue } activeNodes = append(activeNodes, host) diff --git a/internal/app/app.go b/internal/app/app.go index 0c9f532..6d203ee 100644 --- a/internal/app/app.go +++ b/internal/app/app.go @@ -3,7 +3,7 @@ package app import ( "context" "fmt" - "log/slog" + "io" "os" "os/signal" "sync/atomic" @@ -11,6 +11,7 @@ import ( "time" "github.com/gofrs/flock" + "github.com/rs/zerolog" "github.com/yandex/rdsync/internal/config" "github.com/yandex/rdsync/internal/dcs" @@ -27,7 +28,8 @@ type App struct { dcs dcs.DCS config *config.Config splitTime map[string]time.Time - logger *slog.Logger + logger *zerolog.Logger + loggerCloser io.Closer nodeFailTime map[string]time.Time shard *valkey.Shard cache *valkey.SentiCacheNode @@ -49,20 +51,6 @@ func baseContext() context.Context { return ctx } -func parseLevel(level string) (slog.Level, error) { - switch level { - case "Debug": - return slog.LevelDebug, nil - case "Info": - return slog.LevelInfo, nil - case "Warn": - return slog.LevelWarn, nil - case "Error": - return slog.LevelError, nil - } - return slog.LevelInfo, fmt.Errorf("unknown error level: %s", level) -} - // NewApp is an App constructor func NewApp(configFile, logLevel string) (*App, error) { conf, err := config.ReadFromFile(configFile) @@ -77,7 +65,7 @@ func NewApp(configFile, logLevel string) (*App, error) { if err != nil { return nil, err } - logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: logLevelN})) + logger, loggerCloser := newMainLogger(logLevelN, conf.LogBufferSize, conf.LogPollInterval) mode, err := parseMode(conf.Mode) if err != nil { return nil, err @@ -94,6 +82,7 @@ func NewApp(configFile, logLevel string) (*App, error) { splitTime: make(map[string]time.Time), state: stateInit, logger: logger, + loggerCloser: loggerCloser, config: conf, } app.critical.Store(false) @@ -110,21 +99,26 @@ func (app *App) connectDCS() error { } func (app *App) reconnectDCS() error { - app.logger.Info("Attempting DCS reconnection after prolonged Lost state") + app.logger.Info().Msg("Attempting DCS reconnection after prolonged Lost state") oldDCS := app.dcs err := app.connectDCS() if err != nil { - app.logger.Error("DCS reconnection failed", slog.Any("error", err)) + app.logger.Error().Err(err).Msg("DCS reconnection failed") app.dcs = oldDCS return err } app.dcs.SetDisconnectCallback(func() error { return app.handleCritical() }) app.shard.SetDCS(app.dcs) oldDCS.Close() - app.logger.Info("DCS reconnection successful") + app.logger.Info().Msg("DCS reconnection successful") return nil } +// CloseLogger drains the logger's queue +func (app *App) CloseLogger() { + app.loggerCloser.Close() +} + func (app *App) lockDaemonFile() { app.daemonLock = flock.New(app.config.DaemonLockFile) if locked, err := app.daemonLock.TryLock(); !locked { @@ -132,7 +126,8 @@ func (app *App) lockDaemonFile() { if err != nil { msg = err.Error() } - app.logger.Error(fmt.Sprintf("Unable to acquire daemon lock on %s", app.config.DaemonLockFile), slog.Any("error", msg)) + app.logger.Error().Str("error", msg).Msgf("Unable to acquire daemon lock on %s", app.config.DaemonLockFile) + app.CloseLogger() os.Exit(1) } } @@ -140,7 +135,7 @@ func (app *App) lockDaemonFile() { func (app *App) unlockDaemonFile() { err := app.daemonLock.Unlock() if err != nil { - app.logger.Error(fmt.Sprintf("Unable to unlock daemon lock %s", app.config.DaemonLockFile), slog.Any("error", err)) + app.logger.Error().Err(err).Msgf("Unable to unlock daemon lock %s", app.config.DaemonLockFile) } } @@ -148,13 +143,14 @@ func (app *App) unlockDaemonFile() { func (app *App) Run() int { app.lockDaemonFile() defer app.unlockDaemonFile() + defer app.loggerCloser.Close() app.timings = newTimingReporter(app.config, app.logger) defer app.timings.Close() err := app.connectDCS() if err != nil { - app.logger.Error("Unable to connect to dcs", slog.Any("error", err)) + app.logger.Error().Err(err).Msg("Unable to connect to dcs") return 1 } defer app.dcs.Close() @@ -165,7 +161,7 @@ func (app *App) Run() int { if app.mode == modeSentinel { app.cache, err = valkey.NewSentiCacheNode(app.config, app.logger) if err != nil { - app.logger.Error("Unable to init senticache node", slog.Any("error", err)) + app.logger.Error().Err(err).Msg("Unable to init senticache node") return 1 } defer app.cache.Close() @@ -186,7 +182,7 @@ func (app *App) Run() int { app.timings.Reopen() case <-ticker.C: for { - app.logger.Info(fmt.Sprintf("Rdsync state: %s", app.state)) + app.logger.Info().Msgf("Rdsync state: %s", app.state) stateHandler := map[appState](func() appState){ stateInit: app.stateInit, stateManager: app.stateManager, diff --git a/internal/app/cache.go b/internal/app/cache.go index 37ee49d..40a568e 100644 --- a/internal/app/cache.go +++ b/internal/app/cache.go @@ -2,7 +2,6 @@ package app import ( "fmt" - "log/slog" "time" "github.com/yandex/rdsync/internal/valkey" @@ -96,7 +95,7 @@ func (app *App) cacheUpdater() { err = app.updateCache(dcsState, app.cache) } if err != nil { - app.logger.Error("CacheUpdater: failed to update cache", slog.Any("error", err)) + app.logger.Error().Err(err).Msg("CacheUpdater: failed to update cache") } case <-app.ctx.Done(): diff --git a/internal/app/candidate.go b/internal/app/candidate.go index 7c96cfa..ebd65da 100644 --- a/internal/app/candidate.go +++ b/internal/app/candidate.go @@ -1,9 +1,6 @@ package app import ( - "fmt" - "log/slog" - "github.com/yandex/rdsync/internal/dcs" ) @@ -13,18 +10,18 @@ func (app *App) stateCandidate() appState { } err := app.shard.UpdateHostsInfo() if err != nil { - app.logger.Error("Candidate: failed to update host info from DCS", slog.Any("error", err)) + app.logger.Error().Err(err).Msg("Candidate: failed to update host info from DCS") return stateCandidate } shardState, err := app.getShardStateFromDB() if err != nil { - app.logger.Error("Failed to get shard state from DB", slog.Any("error", err)) + app.logger.Error().Err(err).Msg("Failed to get shard state from DB") } else { - app.logger.Info(fmt.Sprintf("Shard state: %v", shardState)) + app.logger.Info().Msgf("Shard state: %v", shardState) } maintenance, err := app.GetMaintenance() if err != nil && err != dcs.ErrNotFound { - app.logger.Error("Candidate: failed to get maintenance from DCS", slog.Any("error", err)) + app.logger.Error().Err(err).Msg("Candidate: failed to get maintenance from DCS") return stateCandidate } if maintenance != nil && maintenance.RdSyncPaused { @@ -33,13 +30,13 @@ func (app *App) stateCandidate() appState { poisonPill, err := app.getPoisonPill() if err != nil && err != dcs.ErrNotFound { - app.logger.Error("Candidate: failed to get poison pill from DCS", slog.Any("error", err)) + app.logger.Error().Err(err).Msg("Candidate: failed to get poison pill from DCS") return stateCandidate } if poisonPill != nil { err = app.applyPoisonPill(poisonPill) if err != nil { - app.logger.Error("Candidate: failed to apply poison pill", slog.Any("error", err)) + app.logger.Error().Err(err).Msg("Candidate: failed to apply poison pill") return stateCandidate } if poisonPill.TargetHost == app.config.Hostname { @@ -50,7 +47,7 @@ func (app *App) stateCandidate() appState { var master string err = app.dcs.Get(pathMasterNode, &master) if err != nil && err != dcs.ErrNotFound { - app.logger.Error("Candidate: failed to get current master from DCS", slog.Any("error", err)) + app.logger.Error().Err(err).Msg("Candidate: failed to get current master from DCS") return stateCandidate } app.repairLocalNode(master) diff --git a/internal/app/checks.go b/internal/app/checks.go index a3367a2..0765d34 100644 --- a/internal/app/checks.go +++ b/internal/app/checks.go @@ -1,26 +1,21 @@ package app -import ( - "fmt" - "log/slog" -) - func (app *App) checkHAReplicasRunning() bool { hosts := len(app.shard.Hosts()) if hosts == 1 { - app.logger.Info("Check HA replicas ok: single node mode") + app.logger.Info().Msg("Check HA replicas ok: single node mode") return true } state, err := app.getShardStateFromDB() if err != nil { - app.logger.Error("Check HA replicas failed", slog.Any("error", err)) + app.logger.Error().Err(err).Msg("Check HA replicas failed") return false } local := app.shard.Local() localState, ok := state[local.FQDN()] if !ok { - app.logger.Error("Unable to find local node in state", slog.String("fqdn", local.FQDN())) + app.logger.Error().Str("fqdn", local.FQDN()).Msg("Unable to find local node in state") return false } @@ -30,7 +25,7 @@ func (app *App) checkHAReplicasRunning() bool { availableReplicas := 0 for host, hostState := range state { if getOffset(hostState) > baseOffset { - app.logger.Warn("Host is ahead in replication history", slog.String("fqdn", host)) + app.logger.Warn().Str("fqdn", host).Msg("Host is ahead in replication history") aheadHosts++ } if hostState.PingOk && !hostState.IsMaster { @@ -41,13 +36,13 @@ func (app *App) checkHAReplicasRunning() bool { } if aheadHosts > 0 { - app.logger.Error(fmt.Sprintf("Not making local node online: %d nodes are ahead in replication history", aheadHosts)) + app.logger.Error().Msgf("Not making local node online: %d nodes are ahead in replication history", aheadHosts) } if availableReplicas >= hosts/2 { - app.logger.Info(fmt.Sprintf("Check HA replicas ok: %d replicas available", availableReplicas)) + app.logger.Info().Msgf("Check HA replicas ok: %d replicas available", availableReplicas) return true } - app.logger.Error(fmt.Sprintf("Check HA replicas failed: %d replicas available", availableReplicas)) + app.logger.Error().Msgf("Check HA replicas failed: %d replicas available", availableReplicas) return false } diff --git a/internal/app/cli.go b/internal/app/cli.go index 6c883e2..7ed1d3b 100644 --- a/internal/app/cli.go +++ b/internal/app/cli.go @@ -4,7 +4,7 @@ import ( "bufio" "context" "fmt" - "log/slog" + "os" "slices" "sort" @@ -21,7 +21,7 @@ import ( func (app *App) CliInfo(verbose bool) int { err := app.connectDCS() if err != nil { - app.logger.Error("Unable to connect to dcs", slog.Any("error", err)) + app.logger.Error().Err(err).Msg("Unable to connect to dcs") return 1 } app.dcs.Initialize() @@ -30,7 +30,7 @@ func (app *App) CliInfo(verbose bool) int { app.shard = valkey.NewShard(app.config, app.logger, app.dcs) defer app.shard.Close() if err := app.shard.UpdateHostsInfo(); err != nil { - app.logger.Error("Unable to update hosts info", slog.Any("error", err)) + app.logger.Error().Err(err).Msg("Unable to update hosts info") return 1 } @@ -40,14 +40,14 @@ func (app *App) CliInfo(verbose bool) int { haNodes, err := app.shard.GetShardHostsFromDcs() if err != nil { - app.logger.Error("Failed to get hosts", slog.Any("error", err)) + app.logger.Error().Err(err).Msg("Failed to get hosts") return 1 } data[pathHANodes] = haNodes activeNodes, err := app.GetActiveNodes() if err != nil { - app.logger.Error("Failed to get active nodes", slog.Any("error", err)) + app.logger.Error().Err(err).Msg("Failed to get active nodes") return 1 } sort.Strings(activeNodes) @@ -55,7 +55,7 @@ func (app *App) CliInfo(verbose bool) int { shardState, err := app.getShardStateFromDcs() if err != nil { - app.logger.Error("Failed to get shard state", slog.Any("error", err)) + app.logger.Error().Err(err).Msg("Failed to get shard state") return 1 } health := make(map[string]any) @@ -70,7 +70,7 @@ func (app *App) CliInfo(verbose bool) int { if err == nil { data[path] = switchover.String() } else if err != dcs.ErrNotFound { - app.logger.Error(fmt.Sprintf("Failed to get %s", path), slog.Any("error", err)) + app.logger.Error().Err(err).Msgf("Failed to get %s", path) return 1 } } @@ -80,7 +80,7 @@ func (app *App) CliInfo(verbose bool) int { if err == nil { data[pathMaintenance] = maintenance.String() } else if err != dcs.ErrNotFound { - app.logger.Error(fmt.Sprintf("Failed to get %s", pathMaintenance), slog.Any("error", err)) + app.logger.Error().Err(err).Msgf("Failed to get %s", pathMaintenance) return 1 } @@ -89,14 +89,14 @@ func (app *App) CliInfo(verbose bool) int { if err == nil { data[pathPoisonPill] = poisonPill.String() } else if err != dcs.ErrNotFound { - app.logger.Error(fmt.Sprintf("Failed to get %s", pathPoisonPill), slog.Any("error", err)) + app.logger.Error().Err(err).Msgf("Failed to get %s", pathPoisonPill) return 1 } var manager dcs.LockOwner err = app.dcs.Get(pathManagerLock, &manager) if err != nil && err != dcs.ErrNotFound { - app.logger.Error(fmt.Sprintf("Failed to get %s", pathManagerLock), slog.Any("error", err)) + app.logger.Error().Err(err).Msgf("Failed to get %s", pathManagerLock) return 1 } data[pathManagerLock] = manager.Hostname @@ -104,7 +104,7 @@ func (app *App) CliInfo(verbose bool) int { var master string err = app.dcs.Get(pathMasterNode, &master) if err != nil && err != dcs.ErrNotFound { - app.logger.Error(fmt.Sprintf("Failed to get %s", pathMasterNode), slog.Any("error", err)) + app.logger.Error().Err(err).Msgf("Failed to get %s", pathMasterNode) return 1 } data[pathMasterNode] = master @@ -112,13 +112,13 @@ func (app *App) CliInfo(verbose bool) int { } else { tree, err = app.dcs.GetTree("") if err != nil { - app.logger.Error("Failed to get tree", slog.Any("error", err)) + app.logger.Error().Err(err).Msg("Failed to get tree") return 1 } } data, err := yaml.Marshal(tree) if err != nil { - app.logger.Error("failed to marshal yaml", slog.Any("error", err)) + app.logger.Error().Err(err).Msg("failed to marshal yaml") return 1 } fmt.Print(string(data)) @@ -129,7 +129,7 @@ func (app *App) CliInfo(verbose bool) int { func (app *App) CliState(verbose bool) int { err := app.connectDCS() if err != nil { - app.logger.Error("Unable to connect to dcs", slog.Any("error", err)) + app.logger.Error().Err(err).Msg("Unable to connect to dcs") return 1 } defer app.dcs.Close() @@ -138,13 +138,13 @@ func (app *App) CliState(verbose bool) int { defer app.shard.Close() if err := app.shard.UpdateHostsInfo(); err != nil { - app.logger.Error("Unable to update hosts info", slog.Any("error", err)) + app.logger.Error().Err(err).Msg("Unable to update hosts info") return 1 } shardState, err := app.getShardStateFromDB() if err != nil { - app.logger.Error("Failed to get state", slog.Any("error", err)) + app.logger.Error().Err(err).Msg("Failed to get state") return 1 } var tree any @@ -159,7 +159,7 @@ func (app *App) CliState(verbose bool) int { } data, err := yaml.Marshal(tree) if err != nil { - app.logger.Error("Failed to marshal yaml", slog.Any("error", err)) + app.logger.Error().Err(err).Msg("Failed to marshal yaml") return 1 } fmt.Print(string(data)) @@ -179,20 +179,20 @@ func matchPrefix(hosts []string, prefix string) []string { // CliSwitch performs manual switch-over of the master node func (app *App) CliSwitch(switchFrom, switchTo string, waitTimeout time.Duration, switchForce bool) int { if switchFrom == "" && switchTo == "" { - app.logger.Error("Either --from or --to should be set") + app.logger.Error().Msg("Either --from or --to should be set") return 1 } if switchFrom != "" && switchTo != "" { - app.logger.Error("Option --from and --to can't be used at the same time") + app.logger.Error().Msg("Option --from and --to can't be used at the same time") return 1 } if switchFrom != "" && switchForce { - app.logger.Error("Option --from and --force can't be used at the same time") + app.logger.Error().Msg("Option --from and --force can't be used at the same time") return 1 } err := app.connectDCS() if err != nil { - app.logger.Error("Unable to connect to dcs", slog.Any("error", err)) + app.logger.Error().Err(err).Msg("Unable to connect to dcs") return 1 } defer app.dcs.Close() @@ -201,12 +201,12 @@ func (app *App) CliSwitch(switchFrom, switchTo string, waitTimeout time.Duration defer app.shard.Close() if err := app.shard.UpdateHostsInfo(); err != nil { - app.logger.Error("Unable to update hosts info", slog.Any("error", err)) + app.logger.Error().Err(err).Msg("Unable to update hosts info") return 1 } if len(app.shard.Hosts()) == 1 { - app.logger.Info("switchover makes no sense on single node shard") + app.logger.Info().Msg("switchover makes no sense on single node shard") fmt.Println("switchover done") return 0 } @@ -215,43 +215,43 @@ func (app *App) CliSwitch(switchFrom, switchTo string, waitTimeout time.Duration var currentMaster string if err := app.dcs.Get(pathMasterNode, ¤tMaster); err != nil { - app.logger.Error("Failed to get current master", slog.Any("error", err)) + app.logger.Error().Err(err).Msg("Failed to get current master") return 1 } activeNodes, err := app.GetActiveNodes() if err != nil { - app.logger.Error("Unable to get active nodes", slog.Any("error", err)) + app.logger.Error().Err(err).Msg("Unable to get active nodes") return 1 } if switchTo != "" { desired := matchPrefix(app.shard.Hosts(), switchTo) if len(desired) == 0 { - app.logger.Error(fmt.Sprintf("No nodes match '%s'", switchTo)) + app.logger.Error().Msgf("No nodes match '%s'", switchTo) return 1 } if len(desired) > 1 { - app.logger.Error(fmt.Sprintf("More than one node matches '%s': %s", switchTo, desired)) + app.logger.Error().Msgf("More than one node matches '%s': %s", switchTo, desired) return 1 } toHost = desired[0] if toHost == currentMaster { - app.logger.Info(fmt.Sprintf("Master is already on %s, skipping...", toHost)) + app.logger.Info().Msgf("Master is already on %s, skipping...", toHost) fmt.Println("switchover done") return 0 } if !slices.Contains(activeNodes, toHost) { - app.logger.Error(fmt.Sprintf("%s is not active, can't switch to it", toHost)) + app.logger.Error().Msgf("%s is not active, can't switch to it", toHost) return 1 } } else { notDesired := matchPrefix(app.shard.Hosts(), switchFrom) if len(notDesired) == 0 { - app.logger.Error(fmt.Sprintf("No HA-nodes matches '%s'", switchFrom)) + app.logger.Error().Msgf("No HA-nodes matches '%s'", switchFrom) return 1 } if !slices.Contains(notDesired, currentMaster) { - app.logger.Info(fmt.Sprintf("Master is already not on %s, skipping...", notDesired)) + app.logger.Info().Msgf("Master is already not on %s, skipping...", notDesired) fmt.Println("switchover done") return 0 } @@ -262,7 +262,7 @@ func (app *App) CliSwitch(switchFrom, switchTo string, waitTimeout time.Duration } } if len(candidates) == 0 { - app.logger.Error(fmt.Sprintf("There are no active nodes, not matching '%s'", switchFrom)) + app.logger.Error().Msgf("There are no active nodes, not matching '%s'", switchFrom) return 1 } if len(notDesired) == 1 { @@ -270,12 +270,12 @@ func (app *App) CliSwitch(switchFrom, switchTo string, waitTimeout time.Duration } else { states, err := app.getShardStateFromDB() if err != nil { - app.logger.Error("No actual shard state", slog.Any("error", err)) + app.logger.Error().Err(err).Msg("No actual shard state") return 1 } toHost, err = app.getMostDesirableNode(states, switchFrom) if err != nil { - app.logger.Error("No desirable node", slog.Any("error", err)) + app.logger.Error().Err(err).Msg("No desirable node") return 1 } } @@ -284,11 +284,11 @@ func (app *App) CliSwitch(switchFrom, switchTo string, waitTimeout time.Duration var switchover Switchover err = app.dcs.Get(pathCurrentSwitch, &switchover) if err == nil { - app.logger.Error(fmt.Sprintf("Another switchover in progress %v", switchover)) + app.logger.Error().Msgf("Another switchover in progress %v", switchover) return 2 } if err != dcs.ErrNotFound { - app.logger.Error("Unable to get current switchover status", slog.Any("error", err)) + app.logger.Error().Err(err).Msg("Unable to get current switchover status") return 2 } @@ -301,18 +301,18 @@ func (app *App) CliSwitch(switchFrom, switchTo string, waitTimeout time.Duration switchover.RunCount = 1 err = app.dcs.Set(pathActiveNodes, []string{toHost}) if err != nil { - app.logger.Error("Unable to update active nodes") + app.logger.Error().Msg("Unable to update active nodes") return 1 } } err = app.dcs.Create(pathCurrentSwitch, switchover) if err == dcs.ErrExists { - app.logger.Error("Another switchover in progress") + app.logger.Error().Msg("Another switchover in progress") return 2 } if err != nil { - app.logger.Error("Unable to create switchover in dcs", slog.Any("error", err)) + app.logger.Error().Err(err).Msg("Unable to create switchover in dcs") return 1 } // wait for switchover to complete @@ -336,10 +336,10 @@ func (app *App) CliSwitch(switchFrom, switchTo string, waitTimeout time.Duration } } if lastSwitchover.Result == nil { - app.logger.Error("Switchover did not finish until deadline") + app.logger.Error().Msg("Switchover did not finish until deadline") return 1 } else if !lastSwitchover.Result.Ok { - app.logger.Error("Could not wait for switchover to complete because of errors") + app.logger.Error().Msg("Could not wait for switchover to complete because of errors") return 1 } fmt.Println("switchover done") @@ -353,7 +353,7 @@ func (app *App) CliSwitch(switchFrom, switchTo string, waitTimeout time.Duration func (app *App) CliEnableMaintenance(waitTimeout time.Duration) int { err := app.connectDCS() if err != nil { - app.logger.Error("Unable to connect to dcs", slog.Any("error", err)) + app.logger.Error().Err(err).Msg("Unable to connect to dcs") return 1 } defer app.dcs.Close() @@ -365,7 +365,7 @@ func (app *App) CliEnableMaintenance(waitTimeout time.Duration) int { } err = app.dcs.Create(pathMaintenance, maintenance) if err != nil && err != dcs.ErrExists { - app.logger.Error("Unable to create maintenance path in dcs", slog.Any("error", err)) + app.logger.Error().Err(err).Msg("Unable to create maintenance path in dcs") return 1 } if waitTimeout > 0 { @@ -378,7 +378,7 @@ func (app *App) CliEnableMaintenance(waitTimeout time.Duration) int { case <-ticker.C: err = app.dcs.Get(pathMaintenance, maintenance) if err != nil { - app.logger.Error("Unable to get maintenance status from dcs", slog.Any("error", err)) + app.logger.Error().Err(err).Msg("Unable to get maintenance status from dcs") } if maintenance.RdSyncPaused { break Out @@ -388,7 +388,7 @@ func (app *App) CliEnableMaintenance(waitTimeout time.Duration) int { } } if !maintenance.RdSyncPaused { - app.logger.Error("Rdsync did not enter maintenance within timeout") + app.logger.Error().Msg("Rdsync did not enter maintenance within timeout") return 1 } fmt.Println("maintenance enabled") @@ -402,7 +402,7 @@ func (app *App) CliEnableMaintenance(waitTimeout time.Duration) int { func (app *App) CliDisableMaintenance(waitTimeout time.Duration) int { err := app.connectDCS() if err != nil { - app.logger.Error("Unable to connect to dcs", slog.Any("error", err)) + app.logger.Error().Err(err).Msg("Unable to connect to dcs") return 1 } defer app.dcs.Close() @@ -414,13 +414,13 @@ func (app *App) CliDisableMaintenance(waitTimeout time.Duration) int { fmt.Println("maintenance disabled") return 0 } else if err != nil { - app.logger.Error("Unable to get maintenance status from dcs", slog.Any("error", err)) + app.logger.Error().Err(err).Msg("Unable to get maintenance status from dcs") return 1 } maintenance.ShouldLeave = true err = app.dcs.Set(pathMaintenance, maintenance) if err != nil { - app.logger.Error("Unable to update maintenance in dcs", slog.Any("error", err)) + app.logger.Error().Err(err).Msg("Unable to update maintenance in dcs") return 1 } if waitTimeout > 0 { @@ -437,14 +437,14 @@ func (app *App) CliDisableMaintenance(waitTimeout time.Duration) int { break Out } if err != nil { - app.logger.Error("Unable to get maintenance status from dcs", slog.Any("error", err)) + app.logger.Error().Err(err).Msg("Unable to get maintenance status from dcs") } case <-waitCtx.Done(): break Out } } if maintenance != nil { - app.logger.Error("Rdsync did not leave maintenance within timeout") + app.logger.Error().Msg("Rdsync did not leave maintenance within timeout") return 1 } fmt.Println("maintenance disabled") @@ -458,7 +458,7 @@ func (app *App) CliDisableMaintenance(waitTimeout time.Duration) int { func (app *App) CliGetMaintenance() int { err := app.connectDCS() if err != nil { - app.logger.Error("Unable to connect to dcs", slog.Any("error", err)) + app.logger.Error().Err(err).Msg("Unable to connect to dcs") return 1 } defer app.dcs.Close() @@ -478,7 +478,7 @@ func (app *App) CliGetMaintenance() int { fmt.Println("off") return 0 default: - app.logger.Error("Unable to get maintenance status", slog.Any("error", err)) + app.logger.Error().Err(err).Msg("Unable to get maintenance status") return 1 } } @@ -487,7 +487,7 @@ func (app *App) CliGetMaintenance() int { func (app *App) CliAbort() int { err := app.connectDCS() if err != nil { - app.logger.Error("Unable to connect to dcs", slog.Any("error", err)) + app.logger.Error().Err(err).Msg("Unable to connect to dcs") return 1 } defer app.dcs.Close() @@ -499,7 +499,7 @@ func (app *App) CliAbort() int { return 0 } if err != nil { - app.logger.Error("Unable to get switchover status", slog.Any("error", err)) + app.logger.Error().Err(err).Msg("Unable to get switchover status") return 1 } @@ -508,7 +508,7 @@ func (app *App) CliAbort() int { reader := bufio.NewReader(os.Stdin) response, err := reader.ReadString('\n') if err != nil { - app.logger.Error("Unable to parse response", slog.Any("error", err)) + app.logger.Error().Err(err).Msg("Unable to parse response") return 1 } if strings.TrimSpace(response) != phrase { @@ -518,7 +518,7 @@ func (app *App) CliAbort() int { err = app.dcs.Delete(pathCurrentSwitch) if err != nil { - app.logger.Error("Unable to remove switchover path from dcs", slog.Any("error", err)) + app.logger.Error().Err(err).Msg("Unable to remove switchover path from dcs") return 1 } @@ -530,7 +530,7 @@ func (app *App) CliAbort() int { func (app *App) CliHostList() int { err := app.connectDCS() if err != nil { - app.logger.Error("Unable to connect to dcs", slog.Any("error", err)) + app.logger.Error().Err(err).Msg("Unable to connect to dcs") return 1 } app.dcs.Initialize() @@ -543,7 +543,7 @@ func (app *App) CliHostList() int { hosts, err := app.shard.GetShardHostsFromDcs() if err != nil { - app.logger.Error("Failed to get hosts", slog.Any("error", err)) + app.logger.Error().Err(err).Msg("Failed to get hosts") return 1 } sort.Strings(hosts) @@ -551,7 +551,7 @@ func (app *App) CliHostList() int { out, err := yaml.Marshal(data) if err != nil { - app.logger.Error("Failed to marshal yaml", slog.Any("error", err)) + app.logger.Error().Err(err).Msg("Failed to marshal yaml") return 1 } fmt.Print(string(out)) @@ -561,13 +561,13 @@ func (app *App) CliHostList() int { // CliHostAdd add hosts to the list of hosts in dcs func (app *App) CliHostAdd(host string, priority *int, dryRun bool, skipValkeyCheck bool) int { if priority != nil && *priority < 0 { - app.logger.Error(fmt.Sprintf("Priority must be >= 0. Got: %d", *priority)) + app.logger.Error().Msgf("Priority must be >= 0. Got: %d", *priority) return 1 } err := app.connectDCS() if err != nil { - app.logger.Error("Unable to connect to dcs", slog.Any("error", err)) + app.logger.Error().Err(err).Msg("Unable to connect to dcs") return 1 } defer app.dcs.Close() @@ -585,13 +585,13 @@ func (app *App) CliHostAdd(host string, priority *int, dryRun bool, skipValkeyCh if !skipValkeyCheck { node, err := valkey.NewNode(app.config, app.logger, host) if err != nil { - app.logger.Error(fmt.Sprintf("Failed to check connection to %s, can't tell if it's alive", host), slog.Any("error", err)) + app.logger.Error().Err(err).Msgf("Failed to check connection to %s, can't tell if it's alive", host) return 1 } defer node.Close() _, _, _, _, _, err = node.GetState(app.ctx) if err != nil { - app.logger.Error(fmt.Sprintf("Node %s is dead", host), slog.Any("error", err)) + app.logger.Error().Err(err).Msgf("Node %s is dead", host) return 1 } } @@ -599,7 +599,7 @@ func (app *App) CliHostAdd(host string, priority *int, dryRun bool, skipValkeyCh if !dryRun && priority == nil { err = app.dcs.Set(dcs.JoinPath(pathHANodes, host), *valkey.DefaultNodeConfiguration()) if err != nil && err != dcs.ErrExists { - app.logger.Error(fmt.Sprintf("Unable to create dcs path for %s", host), slog.Any("error", err)) + app.logger.Error().Err(err).Msgf("Unable to create dcs path for %s", host) return 1 } } @@ -625,7 +625,7 @@ func (app *App) CliHostAdd(host string, priority *int, dryRun bool, skipValkeyCh func (app *App) CliHostRemove(host string) int { err := app.connectDCS() if err != nil { - app.logger.Error("Unable to connect to dcs", slog.Any("error", err)) + app.logger.Error().Err(err).Msg("Unable to connect to dcs") return 1 } defer app.dcs.Close() @@ -633,7 +633,7 @@ func (app *App) CliHostRemove(host string) int { err = app.dcs.Delete(dcs.JoinPath(pathHANodes, host)) if err != nil && err != dcs.ErrNotFound { - app.logger.Error(fmt.Sprintf("Unable to delete dcs path for %s", host), slog.Any("error", err)) + app.logger.Error().Err(err).Msgf("Unable to delete dcs path for %s", host) return 1 } fmt.Println("host has been removed") diff --git a/internal/app/critical.go b/internal/app/critical.go index 000cb7e..123f1ab 100644 --- a/internal/app/critical.go +++ b/internal/app/critical.go @@ -6,10 +6,10 @@ import ( func (app *App) handleCritical() error { if app.critical.Load().(bool) { - app.logger.Error("Lost dcs connection in critical section") + app.logger.Error().Msg("Lost dcs connection in critical section") os.Exit(1) } else { - app.logger.Info("Lost dcs connection in non-critical section") + app.logger.Info().Msg("Lost dcs connection in non-critical section") } return nil } diff --git a/internal/app/event_reporter.go b/internal/app/event_reporter.go index d09ec90..42221ea 100644 --- a/internal/app/event_reporter.go +++ b/internal/app/event_reporter.go @@ -1,48 +1,56 @@ package app import ( - "log/slog" + "io" "os" "sync" "time" + "github.com/rs/zerolog" + "github.com/yandex/rdsync/internal/config" ) // TimingReporter handles reporting event durations to a separate log file type TimingReporter struct { - logger *slog.Logger - appLogger *slog.Logger - file *os.File - path string - mu sync.Mutex + logger *zerolog.Logger + appLogger *zerolog.Logger + loggerCloser io.Closer + file *os.File + path string + bufSize int + pollInterval time.Duration + mu sync.Mutex } -func openTimingLog(path string) (*os.File, *slog.Logger, error) { +func openTimingLog(path string, bufSize int, pollInterval time.Duration) (*os.File, *zerolog.Logger, io.Closer, error) { f, err := os.OpenFile(path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) if err != nil { - return nil, nil, err + return nil, nil, nil, err } - logger := slog.New(slog.NewTextHandler(f, &slog.HandlerOptions{Level: slog.LevelInfo})) - return f, logger, nil + logger, closer := newEventLogger(f, bufSize, pollInterval) + return f, logger, closer, nil } -func newTimingReporter(conf *config.Config, appLogger *slog.Logger) *TimingReporter { +func newTimingReporter(conf *config.Config, appLogger *zerolog.Logger) *TimingReporter { if conf.EventTimingLogFile == "" { return nil } - f, logger, err := openTimingLog(conf.EventTimingLogFile) + f, logger, closer, err := openTimingLog(conf.EventTimingLogFile, conf.LogBufferSize, conf.LogPollInterval) if err != nil { - appLogger.Error("Failed to open event timing log file", slog.String("path", conf.EventTimingLogFile), slog.Any("error", err)) + appLogger.Error().Err(err).Str("path", conf.EventTimingLogFile).Msg("Failed to open event timing log file") return nil } return &TimingReporter{ - logger: logger, - appLogger: appLogger, - file: f, - path: conf.EventTimingLogFile, + logger: logger, + appLogger: appLogger, + loggerCloser: closer, + file: f, + path: conf.EventTimingLogFile, + bufSize: conf.LogBufferSize, + pollInterval: conf.LogPollInterval, } } @@ -56,7 +64,7 @@ func (r *TimingReporter) reportTiming(eventType string, duration time.Duration) r.mu.Lock() defer r.mu.Unlock() - r.logger.Info("event_timing", slog.String("event", eventType), slog.Int64("duration_ms", duration.Milliseconds())) + r.logger.Info().Str("event", eventType).Int64("duration_ms", duration.Milliseconds()).Msg("event_timing") } // Reopen closes the current log file and opens it again at the same path. @@ -68,25 +76,33 @@ func (r *TimingReporter) Reopen() { return } - r.appLogger.Info("Reopening timing log file") + r.appLogger.Info().Msg("Reopening timing log file") r.mu.Lock() defer r.mu.Unlock() + // Flush and close the old diode before closing the file. + if r.loggerCloser != nil { + r.loggerCloser.Close() + r.loggerCloser = nil + } if r.file != nil { r.file.Close() + r.file = nil } - f, logger, err := openTimingLog(r.path) + f, logger, closer, err := openTimingLog(r.path, r.bufSize, r.pollInterval) if err != nil { - r.appLogger.Error("Failed to reopen event timing log file", slog.String("path", r.path), slog.Any("error", err)) - r.file = nil - r.logger = slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelInfo})) + r.appLogger.Error().Err(err).Str("path", r.path).Msg("Failed to reopen event timing log file") + // Fall back to a no-op logger so subsequent reportTiming calls don't panic. + nop := zerolog.Nop() + r.logger = &nop return } r.file = f r.logger = logger + r.loggerCloser = closer } // Close shuts down the reporter and closes the log file @@ -98,6 +114,10 @@ func (r *TimingReporter) Close() { r.mu.Lock() defer r.mu.Unlock() + if r.loggerCloser != nil { + r.loggerCloser.Close() + r.loggerCloser = nil + } if r.file != nil { r.file.Close() r.file = nil diff --git a/internal/app/event_reporter_test.go b/internal/app/event_reporter_test.go index b774802..09af4ec 100644 --- a/internal/app/event_reporter_test.go +++ b/internal/app/event_reporter_test.go @@ -1,20 +1,25 @@ package app import ( - "log/slog" "os" "testing" "time" + "github.com/rs/zerolog" "github.com/stretchr/testify/require" "github.com/yandex/rdsync/internal/config" ) +func testLogger() *zerolog.Logger { + l := zerolog.Nop() + return &l +} + func TestNewTimingReporterNil(t *testing.T) { conf := &config.Config{ EventTimingLogFile: "", // empty path } - logger := slog.Default() + logger := testLogger() reporter := newTimingReporter(conf, logger) require.Nil(t, reporter, "newTimingReporter should return nil when EventTimingLogFile is empty") @@ -43,8 +48,10 @@ func TestReportTimingWritesToFile(t *testing.T) { conf := &config.Config{ EventTimingLogFile: tmpPath, + LogBufferSize: 1000, + LogPollInterval: 10 * time.Millisecond, } - logger := slog.Default() + logger := testLogger() reporter := newTimingReporter(conf, logger) require.NotNil(t, reporter) @@ -67,8 +74,10 @@ func TestReportTimingWritesToFile(t *testing.T) { func TestNewTimingReporterInvalidPath(t *testing.T) { conf := &config.Config{ EventTimingLogFile: "/nonexistent/directory/timing.log", + LogBufferSize: 1000, + LogPollInterval: 10 * time.Millisecond, } - logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) + logger := testLogger() reporter := newTimingReporter(conf, logger) require.Nil(t, reporter, "newTimingReporter should return nil when log file cannot be opened") @@ -83,8 +92,10 @@ func TestReportTimingReopen(t *testing.T) { conf := &config.Config{ EventTimingLogFile: logPath, + LogBufferSize: 1000, + LogPollInterval: 10 * time.Millisecond, } - logger := slog.Default() + logger := testLogger() reporter := newTimingReporter(conf, logger) require.NotNil(t, reporter) @@ -104,6 +115,9 @@ func TestReportTimingReopen(t *testing.T) { // Write second event reporter.reportTiming("failover_complete", 2000*time.Millisecond) + // Flush and close to ensure all events are written. + reporter.Close() + // Verify rotated file contains only the first event rotatedContent, err := os.ReadFile(rotatedPath) require.NoError(t, err) diff --git a/internal/app/failover.go b/internal/app/failover.go index 0ff6df2..0c28e4d 100644 --- a/internal/app/failover.go +++ b/internal/app/failover.go @@ -47,7 +47,7 @@ func (app *App) approveFailover(shardState map[string]*HostState, activeNodes [] return fmt.Errorf("all replicas are alive and running replication, seems dcs problems") } - app.logger.Info(fmt.Sprintf("Approve failover: active nodes are %v", activeNodes)) + app.logger.Info().Msgf("Approve failover: active nodes are %v", activeNodes) permissibleReplicas := countAliveHAReplicasWithinNodes(activeNodes, shardState) failoverQuorum := app.getFailoverQuorum(activeNodes) if permissibleReplicas < failoverQuorum { diff --git a/internal/app/info_file.go b/internal/app/info_file.go index b2f6c36..ab0ffee 100644 --- a/internal/app/info_file.go +++ b/internal/app/info_file.go @@ -2,7 +2,6 @@ package app import ( json "encoding/json/v2" - "log/slog" "os" "time" ) @@ -14,19 +13,19 @@ func (app *App) stateFileHandler() { case <-ticker.C: tree, err := app.dcs.GetTree("") if err != nil { - app.logger.Error("StateFileHandler: failed to get current zk tree", slog.Any("error", err)) + app.logger.Error().Err(err).Msg("StateFileHandler: failed to get current zk tree") _ = os.Remove(app.config.InfoFile) continue } data, err := json.Marshal(tree) if err != nil { - app.logger.Error("StateFileHandler: failed to marshal zk node data", slog.Any("error", err)) + app.logger.Error().Err(err).Msg("StateFileHandler: failed to marshal zk node data") _ = os.Remove(app.config.InfoFile) continue } err = os.WriteFile(app.config.InfoFile, data, 0o640) if err != nil { - app.logger.Error("StateFileHandler: failed to write info file", slog.Any("error", err)) + app.logger.Error().Err(err).Msg("StateFileHandler: failed to write info file") _ = os.Remove(app.config.InfoFile) continue } diff --git a/internal/app/lag.go b/internal/app/lag.go index 58008a7..258f3f2 100644 --- a/internal/app/lag.go +++ b/internal/app/lag.go @@ -73,7 +73,7 @@ func (app *App) getMostDesirableNode(shardState map[string]*HostState, switchove return "", fmt.Errorf("no hosts with psync possible from most recent one: %s", recent) } - app.logger.Info(fmt.Sprintf("Selecting most desirable within %s", recentNodes)) + app.logger.Info().Msgf("Selecting most desirable within %s", recentNodes) var priorityHost string var maxPriority int diff --git a/internal/app/local.go b/internal/app/local.go index 5394ca0..3786add 100644 --- a/internal/app/local.go +++ b/internal/app/local.go @@ -1,8 +1,6 @@ package app import ( - "fmt" - "log/slog" "time" "github.com/yandex/rdsync/internal/dcs" @@ -24,21 +22,21 @@ func (app *App) healthChecker() { select { case <-ticker.C: hc := app.getLocalState() - app.logger.Info(fmt.Sprintf("healthcheck: %v", hc)) + app.logger.Info().Msgf("healthcheck: %v", hc) if hc != nil { hcCheckTime = hc.CheckAt err := app.dcs.SetEphemeral(path, hc) if err != nil { - app.logger.Error("Failed to set healthcheck status to dcs", slog.Any("error", err)) + app.logger.Error().Err(err).Msg("Failed to set healthcheck status to dcs") } } else if !hcCheckTime.IsZero() { if time.Since(hcCheckTime) < 5*app.config.HealthCheckInterval { - app.logger.Warn("Unable to get local node state, leaving health node in dcs intact") + app.logger.Warn().Msg("Unable to get local node state, leaving health node in dcs intact") } else { - app.logger.Warn("Unable to get local node state, dropping health node from dcs") + app.logger.Warn().Msg("Unable to get local node state, dropping health node from dcs") err := app.dcs.Delete(path) if err != nil { - app.logger.Error("Failed to drop healthcheck status from dcs on dead local node", slog.Any("error", err)) + app.logger.Error().Err(err).Msg("Failed to drop healthcheck status from dcs on dead local node") } hcCheckTime = time.Time{} } diff --git a/internal/app/logger.go b/internal/app/logger.go new file mode 100644 index 0000000..2c3217f --- /dev/null +++ b/internal/app/logger.go @@ -0,0 +1,72 @@ +package app + +import ( + "fmt" + "io" + "os" + "strings" + "time" + + "github.com/rs/zerolog" + "github.com/rs/zerolog/diode" +) + +func parseLevel(level string) (zerolog.Level, error) { + switch level { + case "Debug": + return zerolog.DebugLevel, nil + case "Info": + return zerolog.InfoLevel, nil + case "Warn": + return zerolog.WarnLevel, nil + case "Error": + return zerolog.ErrorLevel, nil + } + return zerolog.InfoLevel, fmt.Errorf("unknown log level: %s", level) +} + +func levelToUpper(i interface{}) string { + if i == nil { + return "" + } + return strings.ToUpper(fmt.Sprintf("%-5s", i)) +} + +func newMainLogger(level zerolog.Level, bufSize int, poll time.Duration) (*zerolog.Logger, io.Closer) { + cw := zerolog.ConsoleWriter{ + Out: os.Stderr, + NoColor: true, + TimeFormat: time.RFC3339, + FormatLevel: levelToUpper, + } + dw := diode.NewWriter(cw, bufSize, poll, nil) + l := zerolog.New(dw).Level(level).With().Timestamp().Logger() + return &l, dw +} + +func newEventLogger(f *os.File, bufSize int, poll time.Duration) (*zerolog.Logger, io.Closer) { + cw := zerolog.ConsoleWriter{ + Out: f, + NoColor: true, + TimeFormat: time.RFC3339, + FieldsOrder: []string{"event", "duration_ms"}, + FormatTimestamp: func(i interface{}) string { + return fmt.Sprintf("time=%v", i) + }, + FormatLevel: func(i interface{}) string { + return fmt.Sprintf("level=%s", strings.ToUpper(fmt.Sprintf("%s", i))) + }, + FormatMessage: func(i interface{}) string { + return fmt.Sprintf("msg=%v", i) + }, + FormatFieldName: func(i interface{}) string { + return fmt.Sprintf("%s=", i) + }, + FormatFieldValue: func(i interface{}) string { + return fmt.Sprintf("%v", i) + }, + } + dw := diode.NewWriter(cw, bufSize, poll, nil) + l := zerolog.New(dw).Level(zerolog.InfoLevel).With().Timestamp().Logger() + return &l, dw +} diff --git a/internal/app/lost.go b/internal/app/lost.go index 2b3b957..830115d 100644 --- a/internal/app/lost.go +++ b/internal/app/lost.go @@ -1,8 +1,6 @@ package app import ( - "fmt" - "log/slog" "time" ) @@ -11,11 +9,11 @@ func (app *App) stateLost() appState { return stateCandidate } if !app.lostSince.IsZero() && time.Since(app.lostSince) >= app.config.DcsReconnectTimeout { - app.logger.Warn(fmt.Sprintf("Lost state persisted for %s, attempting DCS reconnection", time.Since(app.lostSince).Truncate(time.Second))) + app.logger.Warn().Msgf("Lost state persisted for %s, attempting DCS reconnection", time.Since(app.lostSince).Truncate(time.Second)) err := app.reconnectDCS() app.lostSince = time.Now() if err != nil { - app.logger.Error("DCS reconnection attempt failed, will retry later", slog.Any("error", err)) + app.logger.Error().Err(err).Msg("DCS reconnection attempt failed, will retry later") } return stateLost } @@ -29,48 +27,48 @@ func (app *App) stateLost() appState { if app.checkHAReplicasRunning() { offline, err := node.IsOffline(app.ctx) if err != nil { - app.logger.Error("Failed to get node offline state", slog.String("fqdn", node.FQDN()), slog.Any("error", err)) + app.logger.Error().Str("fqdn", node.FQDN()).Err(err).Msg("Failed to get node offline state") return stateLost } if offline { - app.logger.Info("Rdsync have lost connection to ZK. However HA cluster is alive. Setting local node online") + app.logger.Info().Msg("Rdsync have lost connection to ZK. However HA cluster is alive. Setting local node online") err = node.SetOnline(app.ctx) if err != nil { - app.logger.Error("Unable to set local node online", slog.Any("error", err)) + app.logger.Error().Err(err).Msg("Unable to set local node online") } return stateLost } - app.logger.Info("Rdsync have lost connection to ZK. However HA cluster is alive. Do nothing") + app.logger.Info().Msg("Rdsync have lost connection to ZK. However HA cluster is alive. Do nothing") return stateLost } } else { shardState, err := app.getShardStateFromDB() if err != nil { - app.logger.Error("Failed to get shard state from DB", slog.Any("error", err)) + app.logger.Error().Err(err).Msg("Failed to get shard state from DB") return stateLost } - app.logger.Info(fmt.Sprintf("Shard state: %v", shardState)) + app.logger.Info().Msgf("Shard state: %v", shardState) master, err := app.getMasterHost(shardState) if err != nil || master == "" { - app.logger.Error("Failed to get master from shard state", slog.Any("error", err)) + app.logger.Error().Err(err).Msg("Failed to get master from shard state") } else { local := app.shard.Local() offline, err := local.IsOffline(app.ctx) if err != nil { - app.logger.Error("Failed to get node offline state", slog.String("fqdn", local.FQDN()), slog.Any("error", err)) + app.logger.Error().Str("fqdn", local.FQDN()).Err(err).Msg("Failed to get node offline state") return stateLost } if shardState[master].PingOk && shardState[master].PingStable && replicates(shardState[master], shardState[local.FQDN()].ReplicaState, local.FQDN(), app.shard.Get(master), false) && !app.isReplicaStale(shardState[local.FQDN()].ReplicaState, false) { if offline { - app.logger.Info("Rdsync have lost connection to ZK. However our replication connection is alive. Setting local node online") + app.logger.Info().Msg("Rdsync have lost connection to ZK. However our replication connection is alive. Setting local node online") err = node.SetOnline(app.ctx) if err != nil { - app.logger.Error("Unable to set local node online", slog.Any("error", err)) + app.logger.Error().Err(err).Msg("Unable to set local node online") } return stateLost } - app.logger.Info("Rdsync have lost connection to ZK. However our replication connection is alive. Do nothing") + app.logger.Info().Msg("Rdsync have lost connection to ZK. However our replication connection is alive. Do nothing") return stateLost } } @@ -78,16 +76,16 @@ func (app *App) stateLost() appState { offline, err := node.IsOffline(app.ctx) if err != nil { - app.logger.Error("Failed to get node offline state", slog.String("fqdn", node.FQDN()), slog.Any("error", err)) + app.logger.Error().Str("fqdn", node.FQDN()).Err(err).Msg("Failed to get node offline state") return stateLost } if offline { return stateLost } if err := node.SetOffline(app.ctx); err != nil { - app.logger.Error("Failed to set node offline", slog.String("fqdn", node.FQDN()), slog.Any("error", err)) + app.logger.Error().Str("fqdn", node.FQDN()).Err(err).Msg("Failed to set node offline") return stateLost } - app.logger.Info("Rdsync have lost connection to ZK. Node is now offline", slog.String("fqdn", node.FQDN())) + app.logger.Info().Str("fqdn", node.FQDN()).Msg("Rdsync have lost connection to ZK. Node is now offline") return stateLost } diff --git a/internal/app/maintenance.go b/internal/app/maintenance.go index fc738ff..63d4609 100644 --- a/internal/app/maintenance.go +++ b/internal/app/maintenance.go @@ -2,7 +2,6 @@ package app import ( "fmt" - "log/slog" "os" "github.com/yandex/rdsync/internal/dcs" @@ -64,7 +63,7 @@ func (app *App) leaveMaintenance() error { func (app *App) createMaintenanceFile() { err := os.WriteFile(app.config.MaintenanceFile, []byte(""), 0o640) if err != nil { - app.logger.Error("Failed to write maintenance file", slog.Any("error", err)) + app.logger.Error().Err(err).Msg("Failed to write maintenance file") } } @@ -76,7 +75,7 @@ func (app *App) doesMaintenanceFileExist() bool { func (app *App) removeMaintenanceFile() { err := os.Remove(app.config.MaintenanceFile) if err != nil && !os.IsNotExist(err) { - app.logger.Error("Failed to remove maintenance file", slog.Any("error", err)) + app.logger.Error().Err(err).Msg("Failed to remove maintenance file") } } @@ -100,10 +99,10 @@ func (app *App) stateMaintenance() appState { } if err == dcs.ErrNotFound || maintenance.ShouldLeave { if app.dcs.AcquireLock(pathManagerLock) { - app.logger.Info("Leaving maintenance") + app.logger.Info().Msg("Leaving maintenance") err := app.leaveMaintenance() if err != nil { - app.logger.Error("Failed to leave maintenance", slog.Any("error", err)) + app.logger.Error().Err(err).Msg("Failed to leave maintenance") return stateMaintenance } app.removeMaintenanceFile() diff --git a/internal/app/manager.go b/internal/app/manager.go index 365eb04..e948f4e 100644 --- a/internal/app/manager.go +++ b/internal/app/manager.go @@ -3,7 +3,6 @@ package app import ( "context" "fmt" - "log/slog" "time" "github.com/yandex/rdsync/internal/dcs" @@ -19,48 +18,48 @@ func (app *App) stateManager() appState { err := app.shard.UpdateHostsInfo() if err != nil { - app.logger.Error("Updating hosts info failed", slog.Any("error", err)) + app.logger.Error().Err(err).Msg("Updating hosts info failed") } shardState, err := app.getShardStateFromDB() if err != nil { - app.logger.Error("Failed to get shard state from DB", slog.Any("error", err)) + app.logger.Error().Err(err).Msg("Failed to get shard state from DB") return stateManager } shardStateDcs, err := app.getShardStateFromDcs() if err != nil { - app.logger.Error("Failed to get shard state from DCS", slog.Any("error", err)) + app.logger.Error().Err(err).Msg("Failed to get shard state from DCS") return stateManager } master, err := app.getCurrentMaster(shardState) if err != nil { - app.logger.Error("Failed to get or identify master", slog.Any("error", err)) + app.logger.Error().Err(err).Msg("Failed to get or identify master") return stateManager } activeNodes, err := app.GetActiveNodes() if err != nil { - app.logger.Error("Failed to get active nodes", slog.Any("error", err)) + app.logger.Error().Err(err).Msg("Failed to get active nodes") return stateManager } - app.logger.Info(fmt.Sprintf("Active nodes: %v", activeNodes)) - app.logger.Info(fmt.Sprintf("Master: %s", master)) - app.logger.Info(fmt.Sprintf("Shard state: %v", shardState)) - app.logger.Info(fmt.Sprintf("DCS shard state: %v", shardStateDcs)) + app.logger.Info().Msgf("Active nodes: %v", activeNodes) + app.logger.Info().Msgf("Master: %s", master) + app.logger.Info().Msgf("Shard state: %v", shardState) + app.logger.Info().Msgf("DCS shard state: %v", shardStateDcs) maintenance, err := app.GetMaintenance() if err != nil && err != dcs.ErrNotFound { - app.logger.Error("Failed to get maintenance from dcs", slog.Any("error", err)) + app.logger.Error().Err(err).Msg("Failed to get maintenance from dcs") return stateManager } if maintenance != nil { if !maintenance.RdSyncPaused { - app.logger.Info("Entering maintenance") + app.logger.Info().Msg("Entering maintenance") err := app.enterMaintenance(maintenance, master) if err != nil { - app.logger.Error("Unable to enter maintenance", slog.Any("error", err)) + app.logger.Error().Err(err).Msg("Unable to enter maintenance") return stateManager } } @@ -72,58 +71,58 @@ func (app *App) stateManager() appState { var switchover Switchover if err := app.dcs.Get(pathCurrentSwitch, &switchover); err == nil { if !switchover.InitiatedAt.IsZero() && time.Since(switchover.InitiatedAt) > app.config.Valkey.SwitchoverTimeout { - app.logger.Error(fmt.Sprintf("Switchover: %s => %s timed out after %s", switchover.From, switchover.To, time.Since(switchover.InitiatedAt))) + app.logger.Error().Msgf("Switchover: %s => %s timed out after %s", switchover.From, switchover.To, time.Since(switchover.InitiatedAt)) err = app.failSwitchover(&switchover, fmt.Errorf("switchover timed out after %s", time.Since(switchover.InitiatedAt))) if err != nil { - app.logger.Error("Failed to report switchover timeout", slog.Any("error", err)) + app.logger.Error().Err(err).Msg("Failed to report switchover timeout") } return stateManager } err = app.approveSwitchover(&switchover, activeNodes, shardState) if err != nil { - app.logger.Error("Unable to perform switchover", slog.Any("error", err)) + app.logger.Error().Err(err).Msg("Unable to perform switchover") err = app.finishSwitchover(&switchover, err) if err != nil { - app.logger.Error("Failed to reject switchover", slog.Any("error", err)) + app.logger.Error().Err(err).Msg("Failed to reject switchover") } return stateManager } err = app.startSwitchover(&switchover) if err != nil { - app.logger.Error("Unable to start switchover", slog.Any("error", err)) + app.logger.Error().Err(err).Msg("Unable to start switchover") return stateManager } err = app.performSwitchover(shardState, activeNodes, &switchover, master) if app.dcs.Get(pathCurrentSwitch, new(Switchover)) == dcs.ErrNotFound { - app.logger.Error("Switchover was aborted") + app.logger.Error().Msg("Switchover was aborted") } else { if err != nil { err = app.failSwitchover(&switchover, err) if err != nil { - app.logger.Error("Failed to report switchover failure", slog.Any("error", err)) + app.logger.Error().Err(err).Msg("Failed to report switchover failure") } } else { err = app.finishSwitchover(&switchover, nil) if err != nil { - app.logger.Error("Failed to report switchover finish", slog.Any("error", err)) + app.logger.Error().Err(err).Msg("Failed to report switchover finish") } } } return stateManager } else if err != dcs.ErrNotFound { - app.logger.Error("Getting current switchover failed", slog.Any("error", err)) + app.logger.Error().Err(err).Msg("Getting current switchover failed") return stateManager } poisonPill, err := app.getPoisonPill() if err != nil && err != dcs.ErrNotFound { - app.logger.Error("Manager: failed to get poison pill from DCS", slog.Any("error", err)) + app.logger.Error().Err(err).Msg("Manager: failed to get poison pill from DCS") return stateManager } if poisonPill != nil { err = app.clearPoisonPill() if err != nil { - app.logger.Error("Manager: failed to remove poison pill from DCS", slog.Any("error", err)) + app.logger.Error().Err(err).Msg("Manager: failed to remove poison pill from DCS") return stateManager } } @@ -135,11 +134,11 @@ func (app *App) stateManager() appState { if state.PingOk { availableReplicas++ } else { - app.logger.Warn("Host seems down", slog.String("fqdn", host)) + app.logger.Warn().Str("fqdn", host).Msg("Host seems down") } } if availableReplicas > hosts/2 { - app.logger.Error("We see that majority of shard is still alive, but master is not. So it probably failed.") + app.logger.Error().Msg("We see that majority of shard is still alive, but master is not. So it probably failed.") masterFailed = true } } @@ -147,13 +146,13 @@ func (app *App) stateManager() appState { masterFailed = true } if masterFailed { - app.logger.Error(fmt.Sprintf("Master %s failure", master)) + app.logger.Error().Msgf("Master %s failure", master) if app.nodeFailTime[master].IsZero() { app.nodeFailTime[master] = time.Now() } err = app.approveFailover(shardState, activeNodes, master) if err == nil { - app.logger.Info("Failover approved") + app.logger.Info().Msg("Failover approved") // Report master unavailability duration before performing failover if failTime, ok := app.nodeFailTime[master]; ok { dur := time.Since(failTime) @@ -161,10 +160,10 @@ func (app *App) stateManager() appState { } err = app.performFailover(master) if err != nil { - app.logger.Error("Unable to perform failover", slog.Any("error", err)) + app.logger.Error().Err(err).Msg("Unable to perform failover") } } else { - app.logger.Error("Failover was not approved", slog.Any("error", err)) + app.logger.Error().Err(err).Msg("Failover was not approved") } return stateManager } @@ -176,14 +175,14 @@ func (app *App) stateManager() appState { if state.PingOk { availableReplicas++ } else { - app.logger.Warn("Host seems down", slog.String("fqdn", host)) + app.logger.Warn().Str("fqdn", host).Msg("Host seems down") } } for host, state := range shardStateDcs { if state.PingOk { availableReplicasDcs++ } else { - app.logger.Warn("Host seems down in DCS", slog.String("fqdn", host)) + app.logger.Warn().Str("fqdn", host).Msg("Host seems down in DCS") } } if availableReplicas <= hosts/2 && availableReplicasDcs > hosts/2 { @@ -193,20 +192,20 @@ func (app *App) stateManager() appState { if app.config.Valkey.FailoverTimeout > 0 { failedTime := time.Since(app.splitTime[master]) if failedTime < app.config.Valkey.FailoverTimeout { - app.logger.Error( - fmt.Sprintf("According to DCS majority of shard is still alive, but we don't see that from here, will wait for %v before giving up on manager role", - app.config.Valkey.FailoverTimeout-failedTime)) + app.logger.Error().Msgf( + "According to DCS majority of shard is still alive, but we don't see that from here, will wait for %v before giving up on manager role", + app.config.Valkey.FailoverTimeout-failedTime) return stateManager } } needGiveUp = true } } else if master != app.config.Hostname && !shardState[master].PingOk { - app.logger.Error(fmt.Sprintf("Master %s probably failed, do not perform any kind of repair", master)) + app.logger.Error().Msgf("Master %s probably failed, do not perform any kind of repair", master) return stateManager } if needGiveUp { - app.logger.Error("According to DCS majority of shard is still alive, but we don't see that from here. Giving up on manager role") + app.logger.Error().Msg("According to DCS majority of shard is still alive, but we don't see that from here. Giving up on manager role") delete(app.splitTime, master) app.dcs.ReleaseLock(pathManagerLock) waitCtx, cancel := context.WithTimeout(app.ctx, app.config.Valkey.FailoverTimeout) @@ -219,13 +218,13 @@ func (app *App) stateManager() appState { case <-ticker.C: err = app.dcs.Get(pathManagerLock, &manager) if err != nil { - app.logger.Error(fmt.Sprintf("Failed to get %s", pathManagerLock), slog.Any("error", err)) + app.logger.Error().Err(err).Msgf("Failed to get %s", pathManagerLock) } else if manager.Hostname != app.config.Hostname { - app.logger.Info(fmt.Sprintf("New manager: %s", manager.Hostname)) + app.logger.Info().Msgf("New manager: %s", manager.Hostname) break Out } case <-waitCtx.Done(): - app.logger.Error("No node took manager lock for failover timeout") + app.logger.Error().Msg("No node took manager lock for failover timeout") break Out } } @@ -243,7 +242,7 @@ func (app *App) stateManager() appState { if updateActive { err = app.updateActiveNodes(shardState, shardStateDcs, activeNodes, master) if err != nil { - app.logger.Error("Failed to update active nodes in dcs", slog.Any("error", err)) + app.logger.Error().Err(err).Msg("Failed to update active nodes in dcs") } } diff --git a/internal/app/master.go b/internal/app/master.go index c1e9ca9..dd1afe6 100644 --- a/internal/app/master.go +++ b/internal/app/master.go @@ -2,7 +2,6 @@ package app import ( "fmt" - "log/slog" "time" "github.com/yandex/rdsync/internal/dcs" @@ -21,16 +20,16 @@ func (app *App) getCurrentMaster(shardState map[string]*HostState) (string, erro if master != "" { stateMaster, err := app.getMasterHost(shardState) if err != nil { - app.logger.Warn("Have master in DCS but unable to validate", slog.Any("error", err)) + app.logger.Warn().Err(err).Msg("Have master in DCS but unable to validate") return master, nil } if stateMaster != "" && stateMaster != master { - app.logger.Warn(fmt.Sprintf("DCS and valkey master state diverged: %s and %s", master, stateMaster)) + app.logger.Warn().Msgf("DCS and valkey master state diverged: %s and %s", master, stateMaster) allStable := true for host, state := range shardState { if !state.PingStable || state.IsOffline { allStable = false - app.logger.Warn(fmt.Sprintf("%s is dead skipping divergence fix", host)) + app.logger.Warn().Msgf("%s is dead skipping divergence fix", host) break } } @@ -120,13 +119,13 @@ func (app *App) changeMaster(host, master string) error { if !masterState.PingOk { return fmt.Errorf("changeMaster: %s died while waiting to start replication to %s", master, host) } - app.logger.Info(fmt.Sprintf("ChangeMaster: waiting for %s to start replication from %s", host, master)) + app.logger.Info().Msgf("ChangeMaster: waiting for %s to start replication from %s", host, master) app.repairReplica(node, masterState, state, master, host) time.Sleep(time.Second) } rs := state.ReplicaState if rs != nil && replicates(masterState, rs, host, masterNode, false) { - app.logger.Info(fmt.Sprintf("ChangeMaster: %s started replication from %s", host, master)) + app.logger.Info().Msgf("ChangeMaster: %s started replication from %s", host, master) } else { return fmt.Errorf("%s was unable to start replication from %s", host, master) } @@ -149,7 +148,7 @@ func (app *App) waitForCatchup(host, master string) error { return fmt.Errorf("waitForCatchup: replica %s died while waiting for catchup from %s", host, master) } if state.ReplicaState == nil { - app.logger.Warn(fmt.Sprintf("WaitForCatchup: %s has invalid replica state", host)) + app.logger.Warn().Msgf("WaitForCatchup: %s has invalid replica state", host) time.Sleep(time.Second) continue } @@ -157,7 +156,7 @@ func (app *App) waitForCatchup(host, master string) error { if masterState.IsMaster { masterOffset = masterState.MasterReplicationOffset } else if masterState.ReplicaState == nil { - app.logger.Warn(fmt.Sprintf("WaitForCatchup: %s has invalid replica state", master)) + app.logger.Warn().Msgf("WaitForCatchup: %s has invalid replica state", master) time.Sleep(time.Second) continue } else { @@ -166,7 +165,7 @@ func (app *App) waitForCatchup(host, master string) error { if masterOffset <= state.ReplicaState.ReplicationOffset { return nil } - app.logger.Info(fmt.Sprintf("WaitForCatchup: waiting for %s (offset=%d) to catchup with %s (offset=%d)", host, state.ReplicaState.ReplicationOffset, master, masterOffset)) + app.logger.Info().Msgf("WaitForCatchup: waiting for %s (offset=%d) to catchup with %s (offset=%d)", host, state.ReplicaState.ReplicationOffset, master, masterOffset) time.Sleep(time.Second) } @@ -177,7 +176,7 @@ func (app *App) promote(master, oldMaster string, shardState map[string]*HostSta node := app.shard.Get(master) if shardState[master].IsMaster { - app.logger.Info(fmt.Sprintf("%s is already master", master)) + app.logger.Info().Msgf("%s is already master", master) return nil } @@ -187,20 +186,20 @@ func (app *App) promote(master, oldMaster string, shardState map[string]*HostSta case modeCluster: if shardState[oldMaster].PingOk { if time.Now().Before(forceDeadline) { - app.logger.Info("Old master alive. Using FORCE to promote") + app.logger.Info().Msg("Old master alive. Using FORCE to promote") return node.ClusterPromoteForce(app.ctx) } } majorityAlive, err := node.IsClusterMajorityAlive(app.ctx) if err != nil { - app.logger.Error("New master is not able to check cluster majority state. Assuming that majority is alive.", slog.Any("error", err)) + app.logger.Error().Err(err).Msg("New master is not able to check cluster majority state. Assuming that majority is alive.") majorityAlive = true } if majorityAlive { - app.logger.Info("Majority of master nodes in cluster alive. Using FORCE to promote") + app.logger.Info().Msg("Majority of master nodes in cluster alive. Using FORCE to promote") return node.ClusterPromoteForce(app.ctx) } - app.logger.Info("Old master is dead and majority of master nodes in cluster dead. Using TAKEOVER to promote") + app.logger.Info().Msg("Old master is dead and majority of master nodes in cluster dead. Using TAKEOVER to promote") return node.ClusterPromoteTakeover(app.ctx) } diff --git a/internal/app/poison_pill.go b/internal/app/poison_pill.go index 6716fe8..3ce2445 100644 --- a/internal/app/poison_pill.go +++ b/internal/app/poison_pill.go @@ -2,8 +2,6 @@ package app import ( "context" - "fmt" - "log/slog" "time" ) @@ -28,17 +26,17 @@ func (app *App) issuePoisonPill(targetHost string) error { func (app *App) applyPoisonPill(poisonPill *PoisonPill) error { if poisonPill.TargetHost != app.config.Hostname { - app.logger.Info(fmt.Sprintf("Poison pill issued for %s: not local host", poisonPill.TargetHost)) + app.logger.Info().Msgf("Poison pill issued for %s: not local host", poisonPill.TargetHost) return nil } local := app.shard.Local() isOffline, err := local.IsOffline(app.ctx) if err != nil { - app.logger.Error("Unable to check offline status for poison pill apply", slog.Any("error", err)) + app.logger.Error().Err(err).Msg("Unable to check offline status for poison pill apply") return local.Restart(app.ctx) } if !isOffline { - app.logger.Info(fmt.Sprintf("Applying poison pill issued by %s: Going offline", poisonPill.InitiatedBy)) + app.logger.Info().Msgf("Applying poison pill issued by %s: Going offline", poisonPill.InitiatedBy) err = local.SetOffline(app.ctx) if err != nil { return err @@ -63,11 +61,11 @@ Out: case <-ticker.C: err := app.dcs.Get(pathPoisonPill, &poisonPill) if err != nil { - app.logger.Error("Wait for poison pill apply", slog.Any("error", err)) + app.logger.Error().Err(err).Msg("Wait for poison pill apply") } err = app.applyPoisonPill(&poisonPill) if err != nil { - app.logger.Error("Poison pill apply", slog.Any("error", err)) + app.logger.Error().Err(err).Msg("Poison pill apply") } if poisonPill.Applied { break Out @@ -77,6 +75,6 @@ Out: } } if !poisonPill.Applied { - app.logger.Error(fmt.Sprintf("Poison pill for %s was not applied within timeout", poisonPill.TargetHost)) + app.logger.Error().Msgf("Poison pill for %s was not applied within timeout", poisonPill.TargetHost) } } diff --git a/internal/app/pprof.go b/internal/app/pprof.go index 61dec21..092ec79 100644 --- a/internal/app/pprof.go +++ b/internal/app/pprof.go @@ -1,7 +1,6 @@ package app import ( - "log/slog" "net/http" "net/http/pprof" "os" @@ -22,7 +21,7 @@ func (app *App) pprofHandler() { err := http.ListenAndServe(app.config.PprofAddr, serverMux) if err != nil { - app.logger.Error("Unable to init pprof handler", slog.Any("error", err)) + app.logger.Error().Err(err).Msg("Unable to init pprof handler") os.Exit(1) } } diff --git a/internal/app/repair.go b/internal/app/repair.go index 00228b5..880b391 100644 --- a/internal/app/repair.go +++ b/internal/app/repair.go @@ -2,7 +2,6 @@ package app import ( "fmt" - "log/slog" "os/exec" "strconv" "strings" @@ -36,10 +35,10 @@ func (app *App) repairShard(shardState map[string]*HostState, activeNodes []stri if !state.IsReadOnly { err, rewriteErr := node.SetReadOnly(app.ctx, false) if err != nil { - app.logger.Error("Unable to make replica read-only", slog.String("fqdn", node.FQDN()), slog.Any("error", err)) + app.logger.Error().Str("fqdn", node.FQDN()).Err(err).Msg("Unable to make replica read-only") } if rewriteErr != nil { - app.logger.Error("Unable to rewrite config after making replica read-only", slog.String("fqdn", node.FQDN()), slog.Any("error", rewriteErr)) + app.logger.Error().Str("fqdn", node.FQDN()).Err(rewriteErr).Msg("Unable to rewrite config after making replica read-only") } } rs := state.ReplicaState @@ -48,7 +47,7 @@ func (app *App) repairShard(shardState map[string]*HostState, activeNodes []stri app.repairReplica(node, masterState, state, master, host) syncing++ } else { - app.logger.Error(fmt.Sprintf("Leaving replica %s broken: currently syncing %d/%d", host, syncing, app.config.Valkey.MaxParallelSyncs)) + app.logger.Error().Msgf("Leaving replica %s broken: currently syncing %d/%d", host, syncing, app.config.Valkey.MaxParallelSyncs) } } } @@ -58,32 +57,32 @@ func (app *App) repairMaster(node *valkey.Node, activeNodes []string, state *Hos if state.IsReadOnly || state.MinReplicasToWrite != 0 { err, rewriteErr := node.SetReadWrite(app.ctx) if err != nil { - app.logger.Error("Unable to set master read-write", slog.String("fqdn", node.FQDN()), slog.Any("error", err)) + app.logger.Error().Str("fqdn", node.FQDN()).Err(err).Msg("Unable to set master read-write") } if rewriteErr != nil { - app.logger.Error("Unable to rewrite config on master", slog.String("fqdn", node.FQDN()), slog.Any("error", rewriteErr)) + app.logger.Error().Str("fqdn", node.FQDN()).Err(rewriteErr).Msg("Unable to rewrite config on master") } } expectedNumReplicas := app.getNumReplicasToWrite(activeNodes) actualNumReplicas, err := node.GetNumQuorumReplicas(app.ctx) if err != nil { - app.logger.Error("Unable to get actual num quorum replicas on master", slog.String("fqdn", node.FQDN()), slog.Any("error", err)) + app.logger.Error().Str("fqdn", node.FQDN()).Err(err).Msg("Unable to get actual num quorum replicas on master") return } if expectedNumReplicas > actualNumReplicas { - app.logger.Info(fmt.Sprintf("Changing num quorum replicas from %d to %d on master", actualNumReplicas, expectedNumReplicas), slog.String("fqdn", node.FQDN())) + app.logger.Info().Str("fqdn", node.FQDN()).Msgf("Changing num quorum replicas from %d to %d on master", actualNumReplicas, expectedNumReplicas) err, rewriteErr := node.SetNumQuorumReplicas(app.ctx, expectedNumReplicas) if err != nil { - app.logger.Error("Unable to set num quorum replicas on master", slog.String("fqdn", node.FQDN()), slog.Any("error", err)) + app.logger.Error().Str("fqdn", node.FQDN()).Err(err).Msg("Unable to set num quorum replicas on master") } if rewriteErr != nil { - app.logger.Error("Unable to rewrite config on master", slog.String("fqdn", node.FQDN()), slog.Any("error", rewriteErr)) + app.logger.Error().Str("fqdn", node.FQDN()).Err(rewriteErr).Msg("Unable to rewrite config on master") } } if state.IsReplPaused { err := node.ResumeReplication(app.ctx) if err != nil { - app.logger.Error("Unable to make resume replication on master", slog.String("fqdn", node.FQDN()), slog.Any("error", err)) + app.logger.Error().Str("fqdn", node.FQDN()).Err(err).Msg("Unable to make resume replication on master") } } } @@ -96,59 +95,59 @@ func (app *App) repairReplica(node *valkey.Node, masterState, state *HostState, app.replFailTime = time.Now() } if time.Since(app.replFailTime) > app.config.Valkey.DestructiveReplicationRepairTimeout && app.config.Valkey.DestructiveReplicationRepairCommand != "" { - app.logger.Error(fmt.Sprintf("Replication is broken for too long: %s. Using destructive repair: %s", - time.Since(app.replFailTime), app.config.Valkey.DestructiveReplicationRepairCommand)) + app.logger.Error().Msgf("Replication is broken for too long: %s. Using destructive repair: %s", + time.Since(app.replFailTime), app.config.Valkey.DestructiveReplicationRepairCommand) split := strings.Fields(app.config.Valkey.DestructiveReplicationRepairCommand) cmd := exec.CommandContext(app.ctx, split[0], split[1:]...) err := cmd.Run() if err != nil { - app.logger.Error("Unable to run destructive replication repair on local node", slog.Any("error", err)) + app.logger.Error().Err(err).Msg("Unable to run destructive replication repair on local node") } else { app.replFailTime = time.Now() } } } if !replicates(masterState, rs, replicaFQDN, masterNode, true) { - app.logger.Info("Initiating replica repair", slog.String("fqdn", replicaFQDN)) + app.logger.Info().Str("fqdn", replicaFQDN).Msg("Initiating replica repair") switch app.mode { case modeSentinel: err := node.SentinelMakeReplica(app.ctx, master) if err != nil { - app.logger.Error(fmt.Sprintf("Unable to make %s replica of %s", node.FQDN(), master), slog.Any("error", err)) + app.logger.Error().Err(err).Msgf("Unable to make %s replica of %s", node.FQDN(), master) } case modeCluster: alone, err := node.IsClusterNodeAlone(app.ctx) if err != nil { - app.logger.Error(fmt.Sprintf("Unable to check if %s is alone", node.FQDN()), slog.Any("error", err)) + app.logger.Error().Err(err).Msgf("Unable to check if %s is alone", node.FQDN()) return } if alone { masterIP, err := masterNode.GetIP() if err != nil { - app.logger.Error(fmt.Sprintf("Unable to make %s replica of %s", node.FQDN(), master), slog.Any("error", err)) + app.logger.Error().Err(err).Msgf("Unable to make %s replica of %s", node.FQDN(), master) return } err = node.ClusterMeet(app.ctx, masterIP, app.config.Valkey.Port, app.config.Valkey.ClusterBusPort) if err != nil { - app.logger.Error(fmt.Sprintf("Unable to make %s meet with master %s at %s:%d:%d", node.FQDN(), master, masterIP, app.config.Valkey.Port, app.config.Valkey.ClusterBusPort), slog.Any("error", err)) + app.logger.Error().Err(err).Msgf("Unable to make %s meet with master %s at %s:%d:%d", node.FQDN(), master, masterIP, app.config.Valkey.Port, app.config.Valkey.ClusterBusPort) return } } masterID, err := masterNode.ClusterGetID(app.ctx) if err != nil { - app.logger.Error(fmt.Sprintf("Unable to get cluster id of %s", master), slog.Any("error", err)) + app.logger.Error().Err(err).Msgf("Unable to get cluster id of %s", master) return } err = node.ClusterMakeReplica(app.ctx, masterID) if err != nil { - app.logger.Error(fmt.Sprintf("Unable to make %s replica of %s (%s)", node.FQDN(), master, masterID), slog.Any("error", err)) + app.logger.Error().Err(err).Msgf("Unable to make %s replica of %s (%s)", node.FQDN(), master, masterID) } } } if state.IsReplPaused { err := node.ResumeReplication(app.ctx) if err != nil { - app.logger.Error("Unable to resume replication", slog.String("fqdn", node.FQDN()), slog.Any("error", err)) + app.logger.Error().Str("fqdn", node.FQDN()).Err(err).Msg("Unable to resume replication") } } } @@ -180,7 +179,7 @@ func (app *App) reservedConnectionsWatchdog(info map[string]string) error { } freeConns := parsedMaxClients - parsedClusterConns - parsedConnectedClients if freeConns < int64(app.config.Valkey.ReservedConnections) { - app.logger.Warn(fmt.Sprintf("Local node has %d free connections left. Killing all client connections.", freeConns)) + app.logger.Warn().Msgf("Local node has %d free connections left. Killing all client connections.", freeConns) node := app.shard.Local() err = node.DisconnectClients(app.ctx, "normal") if err != nil { @@ -196,7 +195,7 @@ func (app *App) repairLocalNode(master string) bool { info, _, _, offline, replPaused, err := local.GetState(app.ctx) if err != nil { - app.logger.Error("Unable to get local node offline state", slog.Any("error", err)) + app.logger.Error().Err(err).Msg("Unable to get local node offline state") if app.nodeFailTime[local.FQDN()].IsZero() { app.nodeFailTime[local.FQDN()] = time.Now() } @@ -204,7 +203,7 @@ func (app *App) repairLocalNode(master string) bool { if failedTime > app.config.Valkey.BusyTimeout && strings.HasPrefix(err.Error(), "BUSY ") { err = local.ScriptKill(app.ctx) if err != nil { - app.logger.Error("Local node is busy running a script. But SCRIPT KILL failed", slog.Any("error", err)) + app.logger.Error().Err(err).Msg("Local node is busy running a script. But SCRIPT KILL failed") } } if strings.HasPrefix(err.Error(), "LOADING ") { @@ -213,7 +212,7 @@ func (app *App) repairLocalNode(master string) bool { app.nodeFailTime[local.FQDN()] = time.Now() err = local.Restart(app.ctx) if err != nil { - app.logger.Error("Unable to restart local node", slog.Any("error", err)) + app.logger.Error().Err(err).Msg("Unable to restart local node") } } } else if !offline { @@ -228,33 +227,33 @@ func (app *App) repairLocalNode(master string) bool { if !offline { err = app.adjustAofMode(master) if err != nil { - app.logger.Error("Unable to adjust aof config on local node", slog.Any("error", err)) + app.logger.Error().Err(err).Msg("Unable to adjust aof config on local node") } err = app.closeStaleReplica(master) if err != nil { - app.logger.Error("Unable to close local node on staleness", slog.Any("error", err)) + app.logger.Error().Err(err).Msg("Unable to close local node on staleness") } err = app.reservedConnectionsWatchdog(info) if err != nil { - app.logger.Error("Unable to run reserved connections watchdog", slog.Any("error", err)) + app.logger.Error().Err(err).Msg("Unable to run reserved connections watchdog") } return true } shardState, err := app.getShardStateFromDB() if err != nil { - app.logger.Error("Local repair: unable to get actual shard state", slog.Any("error", err)) + app.logger.Error().Err(err).Msg("Local repair: unable to get actual shard state") return false } state, ok := shardState[local.FQDN()] if !ok { - app.logger.Error("Local repair: unable to find local node in shard state") + app.logger.Error().Msg("Local repair: unable to find local node in shard state") return true } if master == local.FQDN() && len(shardState) != 1 { activeNodes, err := app.GetActiveNodes() if err != nil { - app.logger.Error("Unable to get active nodes for local node repair", slog.Any("error", err)) + app.logger.Error().Err(err).Msg("Unable to get active nodes for local node repair") return true } activeSet := make(map[string]struct{}, len(activeNodes)) @@ -269,27 +268,27 @@ func (app *App) repairLocalNode(master string) bool { } if baseOffset < getOffset(hostState) { if _, ok := activeSet[host]; ok { - app.logger.Warn(fmt.Sprintf("Host %s is ahead in replication history", host)) + app.logger.Warn().Msgf("Host %s is ahead in replication history", host) aheadHosts++ } } if aheadHosts != 0 { - app.logger.Error(fmt.Sprintf("Not making local node online: %d nodes are ahead in replication history", aheadHosts)) + app.logger.Error().Msgf("Not making local node online: %d nodes are ahead in replication history", aheadHosts) return false } } } else if master == local.FQDN() { if !state.IsMaster { - app.logger.Error("Local node is alone in shard and is replica. Promoting") + app.logger.Error().Msg("Local node is alone in shard and is replica. Promoting") if err := app.promote(master, master, shardState, time.Now().Add(app.config.Valkey.WaitPromoteForceTimeout)); err != nil { - app.logger.Error("Unable to promote lone node in shard", slog.Any("error", err)) + app.logger.Error().Err(err).Msg("Unable to promote lone node in shard") return false } } } else if app.isReplicaStale(state.ReplicaState, true) { shardState, err := app.getShardStateFromDcs() if err != nil { - app.logger.Error("Unable to get shard state from dcs on slate replica open", slog.Any("error", err)) + app.logger.Error().Err(err).Msg("Unable to get shard state from dcs on slate replica open") } syncing := 0 for host, hostState := range shardState { @@ -306,14 +305,14 @@ func (app *App) repairLocalNode(master string) bool { if replPaused || !replicates(shardState[master], state.ReplicaState, local.FQDN(), nil, true) { if syncing < app.config.Valkey.MaxParallelSyncs { - app.logger.Info("Repairing local replica as it is offline and not replicates from primary") + app.logger.Info().Msg("Repairing local replica as it is offline and not replicates from primary") app.repairReplica(local, shardState[master], state, master, local.FQDN()) } else { - app.logger.Error(fmt.Sprintf("Leaving local offline replica broken: currently syncing %d/%d", syncing, app.config.Valkey.MaxParallelSyncs)) + app.logger.Error().Msgf("Leaving local offline replica broken: currently syncing %d/%d", syncing, app.config.Valkey.MaxParallelSyncs) } } if shardState[master].PingOk && shardState[master].PingStable && time.Since(shardState[master].CheckAt) < 3*app.config.HealthCheckInterval { - app.logger.Error("Not making local node online: considered stale") + app.logger.Error().Msg("Not making local node online: considered stale") return false } } @@ -325,11 +324,11 @@ func (app *App) repairLocalNode(master string) bool { } err = local.SetOnline(app.ctx) if err != nil { - app.logger.Error("Unable to set local node online", slog.Any("error", err)) + app.logger.Error().Err(err).Msg("Unable to set local node online") return false } if !app.dcsDivergeTime.IsZero() { - app.logger.Info("Clearing DCS divergence time state") + app.logger.Info().Msg("Clearing DCS divergence time state") app.dcsDivergeTime = time.Time{} } return true diff --git a/internal/app/replication.go b/internal/app/replication.go index d929e14..f578c51 100644 --- a/internal/app/replication.go +++ b/internal/app/replication.go @@ -30,9 +30,9 @@ func (app *App) isReplicaStale(replicaState *ReplicaState, checkOpenLag bool) bo } result := time.Since(app.dcsDivergeTime) > targetLag if !result { - app.logger.Info(fmt.Sprintf("Local node is primary and we got a dcs info divergence at %v. Waiting for %v to make decision.", app.dcsDivergeTime, time.Since(app.dcsDivergeTime)-targetLag)) + app.logger.Info().Msgf("Local node is primary and we got a dcs info divergence at %v. Waiting for %v to make decision.", app.dcsDivergeTime, time.Since(app.dcsDivergeTime)-targetLag) } else { - app.logger.Info(fmt.Sprintf("Local node is primary and we got a dcs info divergence at %v. Marking local node as stale.", app.dcsDivergeTime)) + app.logger.Info().Msgf("Local node is primary and we got a dcs info divergence at %v. Marking local node as stale.", app.dcsDivergeTime) } return result } @@ -50,7 +50,7 @@ func (app *App) closeStaleReplica(master string) error { local := app.shard.Local() if local.FQDN() == master { if !app.dcsDivergeTime.IsZero() { - app.logger.Info("Clearing DCS divergence time state") + app.logger.Info().Msg("Clearing DCS divergence time state") app.dcsDivergeTime = time.Time{} } return nil @@ -73,11 +73,11 @@ func (app *App) closeStaleReplica(master string) error { } localState := app.getHostState(local.FQDN()) if app.isReplicaStale(localState.ReplicaState, false) { - app.logger.Debug("Local node seems stale. Checking if we could close.") + app.logger.Debug().Msg("Local node seems stale. Checking if we could close.") var switchover Switchover err := app.dcs.Get(pathCurrentSwitch, &switchover) if err == nil { - app.logger.Debug(fmt.Sprintf("Skipping staleness close due to switchover in progress: %v.", switchover)) + app.logger.Debug().Msgf("Skipping staleness close due to switchover in progress: %v.", switchover) return nil } if err != dcs.ErrNotFound { @@ -111,12 +111,12 @@ func (app *App) closeStaleReplica(master string) error { if offline { return nil } - app.logger.Error(fmt.Sprintf("Local node is stale. Alive replicas: %d, stale replicas: %d. Making local node offline.", okReplicas, staleReplicas)) + app.logger.Error().Msgf("Local node is stale. Alive replicas: %d, stale replicas: %d. Making local node offline.", okReplicas, staleReplicas) return local.SetOffline(app.ctx) } } } else if !app.replFailTime.IsZero() { - app.logger.Debug("Clearing local node replication fail time") + app.logger.Debug().Msg("Clearing local node replication fail time") app.replFailTime = time.Time{} } return nil diff --git a/internal/app/state.go b/internal/app/state.go index c9c5306..ac0d076 100644 --- a/internal/app/state.go +++ b/internal/app/state.go @@ -2,7 +2,6 @@ package app import ( "fmt" - "log/slog" "strconv" "strings" "time" @@ -11,7 +10,7 @@ import ( ) func (app *App) setStateError(state *HostState, fqdn, message string) { - app.logger.Error("GetHostState error", slog.String("fqdn", fqdn), slog.String("error", message)) + app.logger.Error().Str("fqdn", fqdn).Str("error", message).Msg("GetHostState error") state.Error = message } @@ -114,7 +113,7 @@ func (app *App) getHostState(fqdn string) *HostState { replicaID := fmt.Sprintf("slave%d", i) replicaValue, ok := info[replicaID] if !ok { - app.logger.Warn(fmt.Sprintf("Master has no %s in info but connected_slaves is %d", replicaID, numReplicas), slog.String("fqdn", fqdn)) + app.logger.Warn().Str("fqdn", fqdn).Msgf("Master has no %s in info but connected_slaves is %d", replicaID, numReplicas) i++ continue } diff --git a/internal/app/switchover.go b/internal/app/switchover.go index a4e15a0..62ea6c6 100644 --- a/internal/app/switchover.go +++ b/internal/app/switchover.go @@ -2,7 +2,6 @@ package app import ( "fmt" - "log/slog" "slices" "time" @@ -29,11 +28,11 @@ func (app *App) getLastSwitchover() Switchover { var lastSwitch, lastRejectedSwitch Switchover err := app.dcs.Get(pathLastSwitch, &lastSwitch) if err != nil && err != dcs.ErrNotFound { - app.logger.Error(pathLastSwitch, slog.Any("error", err)) + app.logger.Error().Err(err).Msg(pathLastSwitch) } errRejected := app.dcs.Get(pathLastRejectedSwitch, &lastRejectedSwitch) if errRejected != nil && errRejected != dcs.ErrNotFound { - app.logger.Error(pathLastRejectedSwitch, slog.Any("error", errRejected)) + app.logger.Error().Err(errRejected).Msg(pathLastRejectedSwitch) } if lastRejectedSwitch.InitiatedAt.After(lastSwitch.InitiatedAt) { @@ -56,14 +55,14 @@ func (app *App) approveSwitchover(switchover *Switchover, activeNodes []string, } func (app *App) startSwitchover(switchover *Switchover) error { - app.logger.Info(fmt.Sprintf("Switchover: %s => %s starting", switchover.From, switchover.To)) + app.logger.Info().Msgf("Switchover: %s => %s starting", switchover.From, switchover.To) switchover.StartedAt = time.Now() switchover.StartedBy = app.config.Hostname return app.dcs.Set(pathCurrentSwitch, switchover) } func (app *App) failSwitchover(switchover *Switchover, err error) error { - app.logger.Error(fmt.Sprintf("Switchover: %s => %s failed", switchover.From, switchover.To), slog.Any("error", err)) + app.logger.Error().Err(err).Msgf("Switchover: %s => %s failed", switchover.From, switchover.To) switchover.RunCount++ switchover.Progress = nil switchover.Result = new(SwitchoverResult) @@ -100,7 +99,7 @@ func (app *App) finishSwitchover(switchover *Switchover, switchErr error) error path = pathLastRejectedSwitch } - app.logger.Info(fmt.Sprintf("Switchover: %s => %s %s", switchover.From, switchover.To, action)) + app.logger.Info().Msgf("Switchover: %s => %s %s", switchover.From, switchover.To, action) switchover.Progress = nil switchover.Result = new(SwitchoverResult) switchover.Result.Ok = result @@ -162,24 +161,24 @@ func (app *App) performSwitchover(shardState map[string]*HostState, activeNodes activeNodes = filterOut(activeNodes, []string{oldMaster}) } - app.logger.Info("Switchover: phase 1: make all shard nodes read-only") + app.logger.Info().Msg("Switchover: phase 1: make all shard nodes read-only") errsRO := runParallel(func(host string) error { if !shardState[host].PingOk { err := fmt.Errorf("host %s is not healthy", host) - app.logger.Error("Setting read-only", slog.Any("error", err)) + app.logger.Error().Err(err).Msg("Setting read-only") return err } node := app.shard.Get(host) err, rewriteErr := node.SetReadOnly(app.ctx, host == oldMaster) if err != nil { - app.logger.Error(fmt.Sprintf("Setting %s read-only", host), slog.Any("error", err)) + app.logger.Error().Err(err).Msgf("Setting %s read-only", host) return err } if rewriteErr != nil { - app.logger.Warn(fmt.Sprintf("Unable to rewrite config after making %s read-only", host), slog.Any("error", rewriteErr)) + app.logger.Warn().Err(rewriteErr).Msgf("Unable to rewrite config after making %s read-only", host) } - app.logger.Info(fmt.Sprintf("Switchover: host %s is read-only", host)) + app.logger.Info().Msgf("Switchover: host %s is read-only", host) return nil }, activeNodes) @@ -215,26 +214,26 @@ func (app *App) performSwitchover(shardState map[string]*HostState, activeNodes } } - app.logger.Info("Switchover: phase 2: stop replication") + app.logger.Info().Msg("Switchover: phase 2: stop replication") errsPause := runParallel(func(host string) error { if !shardState[host].PingOk { err := fmt.Errorf("host %s is not healthy", host) - app.logger.Error("Pausing replication", slog.Any("error", err)) + app.logger.Error().Err(err).Msg("Pausing replication") return err } rs := shardState[host].ReplicaState if (rs == nil || !rs.MasterLinkState) && !app.config.Valkey.TurnBeforeSwitchover { - app.logger.Info(fmt.Sprintf("Switchover: skipping replication pause on %s", host)) + app.logger.Info().Msgf("Switchover: skipping replication pause on %s", host) return nil } node := app.shard.Get(host) err := node.PauseReplication(app.ctx) if err != nil { - app.logger.Error(fmt.Sprintf("Pausing replication on %s", host), slog.Any("error", err)) + app.logger.Error().Err(err).Msgf("Pausing replication on %s", host) return err } - app.logger.Info(fmt.Sprintf("Switchover: replication on %s is now paused", host)) + app.logger.Info().Msgf("Switchover: replication on %s is now paused", host) return nil }, activeNodes) var aliveActiveNodes []string @@ -250,7 +249,7 @@ func (app *App) performSwitchover(shardState map[string]*HostState, activeNodes len(aliveActiveNodes), failoverQuorum) } - app.logger.Info("Switchover: phase 3: find most up-to-date host") + app.logger.Info().Msg("Switchover: phase 3: find most up-to-date host") states, err := app.getShardStateFromDB() if err != nil { @@ -272,21 +271,21 @@ func (app *App) performSwitchover(shardState map[string]*HostState, activeNodes errsResume := runParallel(func(host string) error { if !shardState[host].PingOk { err := fmt.Errorf("host %s is not healthy", host) - app.logger.Error("Resume replication", slog.Any("error", err)) + app.logger.Error().Err(err).Msg("Resume replication") return err } node := app.shard.Get(host) err := node.ResumeReplication(app.ctx) if err != nil { - app.logger.Error(fmt.Sprintf("Resume replication on %s", host), slog.Any("error", err)) + app.logger.Error().Err(err).Msgf("Resume replication on %s", host) return err } - app.logger.Info(fmt.Sprintf("Switchover: replication on %s is now resumed", host)) + app.logger.Info().Msgf("Switchover: replication on %s is now resumed", host) return nil }, activeNodes) combined := combineErrors(errsResume) if combined != nil { - app.logger.Error("Resuming replication on desirable host get fail", slog.Any("error", combined)) + app.logger.Error().Err(combined).Msg("Resuming replication on desirable host get fail") } return fmt.Errorf("get desirable node for switchover: %s", err.Error()) } @@ -303,7 +302,7 @@ func (app *App) performSwitchover(shardState map[string]*HostState, activeNodes } if switchover.Progress.Phase < 5 { - app.logger.Info("Switchover: phase 4: catch up") + app.logger.Info().Msg("Switchover: phase 4: catch up") if newMaster != mostRecent && getOffset(states[newMaster]) != getOffset(states[mostRecent]) { recentNode := app.shard.Get(mostRecent) @@ -332,7 +331,7 @@ func (app *App) performSwitchover(shardState map[string]*HostState, activeNodes return fmt.Errorf("new master %s suddenly became not available during switchover", newMaster) } - app.logger.Info("Switchover: phase 5: promote selected host") + app.logger.Info().Msg("Switchover: phase 5: promote selected host") if switchover.Progress.Phase != 6 { switchover.Progress.Phase = 5 @@ -397,7 +396,7 @@ func (app *App) performSwitchover(shardState map[string]*HostState, activeNodes continue } if !shardState[newMaster].IsReplPaused { - app.logger.Warn(fmt.Sprintf("Unable to psync %s before promote: replication on new master is not paused", host)) + app.logger.Warn().Msgf("Unable to psync %s before promote: replication on new master is not paused", host) continue } if isPartialSyncPossible(shardState[host], shardState[newMaster]) { @@ -418,7 +417,7 @@ func (app *App) performSwitchover(shardState map[string]*HostState, activeNodes err := combineErrors(errs) if err != nil { - app.logger.Warn("Unable to psync some replicas before promote", slog.Any("error", err)) + app.logger.Warn().Err(err).Msg("Unable to psync some replicas before promote") } } deadline := time.Now().Add(app.config.Valkey.WaitPromoteTimeout) @@ -438,7 +437,7 @@ func (app *App) performSwitchover(shardState map[string]*HostState, activeNodes promoted = true break } - app.logger.Warn(fmt.Sprintf("Switchover: phase 5: %s is still replica, trying again", newMaster)) + app.logger.Warn().Msgf("Switchover: phase 5: %s is still replica, trying again", newMaster) } if !promoted { return fmt.Errorf("promote new master %s failed: deadline reached", newMaster) @@ -451,7 +450,7 @@ func (app *App) performSwitchover(shardState map[string]*HostState, activeNodes return fmt.Errorf("setting switchover progress on phase 6: %s", err.Error()) } - app.logger.Info("Switchover: phase 6: turn replicas") + app.logger.Info().Msg("Switchover: phase 6: turn replicas") var psyncNodes []string for _, host := range aliveActiveNodes { @@ -488,10 +487,10 @@ func (app *App) performSwitchover(shardState map[string]*HostState, activeNodes }, activeNodes) combined := combineErrors(sentiCacheUpdateErrs) if combined != nil { - app.logger.Warn("Unable to notify all senticache nodes on new master", slog.Any("error", combined)) + app.logger.Warn().Err(combined).Msg("Unable to notify all senticache nodes on new master") } } else { - app.logger.Warn("Unable to get state for senticache nodes notify on new master", slog.Any("error", err)) + app.logger.Warn().Err(err).Msg("Unable to get state for senticache nodes notify on new master") } } diff --git a/internal/config/config.go b/internal/config/config.go index 280796a..99951db 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -63,7 +63,6 @@ type SentinelModeConfig struct { // Config contains rdsync application configuration type Config struct { - Mode string `yaml:"mode"` InfoFile string `yaml:"info_file"` Hostname string `yaml:"hostname"` LogLevel string `yaml:"loglevel"` @@ -72,9 +71,12 @@ type Config struct { DaemonLockFile string `yaml:"daemon_lock_file"` PprofAddr string `yaml:"pprof_addr"` EventTimingLogFile string `yaml:"event_timing_log_file"` + Mode string `yaml:"mode"` SentinelMode SentinelModeConfig `yaml:"sentinel_mode"` Zookeeper dcs.ZookeeperConfig `yaml:"zookeeper"` Valkey ValkeyConfig `yaml:"valkey"` + LogPollInterval time.Duration `yaml:"log_poll_interval"` + LogBufferSize int `yaml:"log_buffer_size"` HealthCheckInterval time.Duration `yaml:"healthcheck_interval"` InfoFileHandlerInterval time.Duration `yaml:"info_file_handler_interval"` InactivationDelay time.Duration `yaml:"inactivation_delay"` @@ -171,6 +173,8 @@ func DefaultConfig() (Config, error) { DaemonLockFile: "/var/run/rdsync/rdsync.lock", MaintenanceFile: "/var/run/rdsync/rdsync.maintenance", EventTimingLogFile: "", + LogBufferSize: 10000, + LogPollInterval: 50 * time.Millisecond, PingStable: 3, TickInterval: 5 * time.Second, InactivationDelay: 30 * time.Second, diff --git a/internal/dcs/zk.go b/internal/dcs/zk.go index 608ba7d..b478577 100644 --- a/internal/dcs/zk.go +++ b/internal/dcs/zk.go @@ -3,8 +3,8 @@ package dcs import ( "context" json "encoding/json/v2" + "errors" "fmt" - "log/slog" "net" "os" "slices" @@ -14,10 +14,11 @@ import ( "github.com/cenkalti/backoff/v4" "github.com/go-zookeeper/zk" + "github.com/rs/zerolog" ) type zkDCS struct { - logger *slog.Logger + logger *zerolog.Logger config *ZookeeperConfig conn *zk.Conn eventsChan <-chan zk.Event @@ -30,14 +31,14 @@ type zkDCS struct { isConnected bool } -type zkLoggerProxy struct{ *slog.Logger } +type zkLoggerProxy struct{ *zerolog.Logger } const ( PathHANodesPrefix = "ha_nodes" ) func (zklp zkLoggerProxy) Printf(fmtString string, args ...any) { - zklp.Debug(fmt.Sprintf(fmtString, args...)) + zklp.Debug().Msgf(fmtString, args...) } func retry(config *ZookeeperConfig, operation func() error) error { @@ -53,7 +54,7 @@ func retry(config *ZookeeperConfig, operation func() error) error { } // NewZookeeper returns Zookeeper based DCS storage -func NewZookeeper(ctx context.Context, config *ZookeeperConfig, logger *slog.Logger) (DCS, error) { +func NewZookeeper(ctx context.Context, config *ZookeeperConfig, logger *zerolog.Logger) (DCS, error) { if len(config.Hosts) == 0 { return nil, fmt.Errorf("zookeeper not configured, fill zookeeper/hosts in config") } @@ -71,7 +72,8 @@ func NewZookeeper(ctx context.Context, config *ZookeeperConfig, logger *slog.Log var ec <-chan zk.Event var err error - proxyLogger := logger.With(slog.String("module", "dcs")) + pl := logger.With().Str("module", "dcs").Logger() + proxyLogger := &pl var operation func() error @@ -194,7 +196,7 @@ func (z *zkDCS) makePath(path string) error { func (z *zkDCS) handleEvents() { for ev := range z.eventsChan { - z.logger.Debug("Got ZK event", slog.Any("event", ev)) + z.logger.Debug().Interface("event", ev).Msg("Got ZK event") if ev.Type == zk.EventSession { z.handleSessionEvent(ev) } @@ -209,7 +211,7 @@ func (z *zkDCS) handleSessionEvent(ev zk.Event) { z.closeTimer = nil } if !z.isConnected { - defer z.logger.Info("Session established") + defer z.logger.Info().Msg("Session established") z.isConnected = true for _, c := range z.connectedChans { close(c) @@ -223,11 +225,11 @@ func (z *zkDCS) handleSessionEvent(ev zk.Event) { z.closeTimer = time.AfterFunc(z.config.SessionTimeout, func() { z.connectedLock.Lock() if z.isConnected && z.closeTimer != nil { - defer z.logger.Info("Session lost") + defer z.logger.Info().Msg("Session lost") z.isConnected = false err := z.disconnectCallback() if err != nil { - z.logger.Error("Disconnect callback failure", slog.Any("error", err)) + z.logger.Error().Err(err).Msg("Disconnect callback failure") } } z.connectedLock.Unlock() @@ -262,7 +264,7 @@ func (z *zkDCS) WaitConnected(timeout time.Duration) bool { case <-c: return true case <-t.C: - z.logger.Error(fmt.Sprintf("Failed to connect to DCS within %s", timeout)) + z.logger.Error().Msgf("Failed to connect to DCS within %s", timeout) return false } } @@ -270,7 +272,7 @@ func (z *zkDCS) WaitConnected(timeout time.Duration) bool { func (z *zkDCS) Initialize() { err := z.makePath(z.config.Namespace) if err != nil { - z.logger.Error(fmt.Sprintf("Failed create root path %s", z.config.Namespace), slog.Any("error", err)) + z.logger.Error().Err(err).Msgf("Failed create root path %s", z.config.Namespace) } } @@ -292,8 +294,8 @@ func (z *zkDCS) retryRequest(code func() error) { err := retry(z.config, operation) - if err != nil { - z.logger.Error("Request retry failed", slog.Any("error", err)) + if err != nil && !errors.Is(err, zk.ErrNoNode) { + z.logger.Error().Err(err).Msg("Request retry failed") } } @@ -352,7 +354,7 @@ func (z *zkDCS) AcquireLock(path string) bool { self := z.getSelfLockOwner() data, _, err := z.retryGet(fullPath) if err != nil && err != zk.ErrNoNode { - z.logger.Error(fmt.Sprintf("Failed to get lock info %s", fullPath), slog.Any("error", err)) + z.logger.Error().Err(err).Msgf("Failed to get lock info %s", fullPath) return false } if err == zk.ErrNoNode { @@ -363,7 +365,7 @@ func (z *zkDCS) AcquireLock(path string) bool { _, err = z.retryCreate(fullPath, data, zk.FlagEphemeral, z.acl) if err != nil { if err != zk.ErrNodeExists { - z.logger.Error(fmt.Sprintf("Failed to acquire lock %s", fullPath), slog.Any("error", err)) + z.logger.Error().Err(err).Msgf("Failed to acquire lock %s", fullPath) } return false } @@ -372,7 +374,7 @@ func (z *zkDCS) AcquireLock(path string) bool { } owner := LockOwner{} if err = json.Unmarshal(data, &owner); err != nil { - z.logger.Error(fmt.Sprintf("Malformed lock data %s (%s)", fullPath, data), slog.Any("error", err)) + z.logger.Error().Err(err).Msgf("Malformed lock data %s (%s)", fullPath, data) return false } if owner == self { @@ -385,7 +387,7 @@ func (z *zkDCS) AcquireLock(path string) bool { func (z *zkDCS) ReleaseLock(path string) { err := z.ReleaseLockOrError(path) if err != nil { - z.logger.Error(fmt.Sprintf("Release lock %s failed", path), slog.Any("error", err)) + z.logger.Error().Err(err).Msgf("Release lock %s failed", path) } } @@ -421,7 +423,7 @@ func (z *zkDCS) create(path string, val any, flags int32) error { if err == zk.ErrNodeExists { return ErrExists } - z.logger.Error(fmt.Sprintf("Failed to create node %s with %+v", fullPath, val), slog.Any("error", err)) + z.logger.Error().Err(err).Msgf("Failed to create node %s with %+v", fullPath, val) } return err } @@ -442,7 +444,7 @@ func (z *zkDCS) set(path string, val any, flags int32) error { } _, stat, err := z.retryGet(fullPath) if err != nil && err != zk.ErrNoNode { - z.logger.Error(fmt.Sprintf("Failed to get node %s", fullPath), slog.Any("error", err)) + z.logger.Error().Err(err).Msgf("Failed to get node %s", fullPath) return err } if err == zk.ErrNoNode { @@ -453,7 +455,7 @@ func (z *zkDCS) set(path string, val any, flags int32) error { } _, err = z.retryCreate(fullPath, data, flags, z.acl) if err != nil { - z.logger.Error(fmt.Sprintf("Failed to create node %s with %v", fullPath, val), slog.Any("error", err)) + z.logger.Error().Err(err).Msgf("Failed to create node %s with %v", fullPath, val) } return err } @@ -462,7 +464,7 @@ func (z *zkDCS) set(path string, val any, flags int32) error { } _, err = z.retrySet(fullPath, data, stat.Version) if err != nil { - z.logger.Error(fmt.Sprintf("Failed to set node %s to %+v", fullPath, val), slog.Any("error", err)) + z.logger.Error().Err(err).Msgf("Failed to set node %s to %+v", fullPath, val) } return err } @@ -482,12 +484,12 @@ func (z *zkDCS) Delete(path string) error { return nil } if err != nil { - z.logger.Error(fmt.Sprintf("Failed to get node %s", fullPath), slog.Any("error", err)) + z.logger.Error().Err(err).Msgf("Failed to get node %s", fullPath) return err } err = z.retryDelete(fullPath, stat.Version) if err != nil { - z.logger.Error(fmt.Sprintf("Failed to delete node %s", fullPath), slog.Any("error", err)) + z.logger.Error().Err(err).Msgf("Failed to delete node %s", fullPath) } return err } @@ -499,11 +501,11 @@ func (z *zkDCS) Get(path string, dest any) error { return ErrNotFound } if err != nil { - z.logger.Error(fmt.Sprintf("Failed to get node %s", fullPath), slog.Any("error", err)) + z.logger.Error().Err(err).Msgf("Failed to get node %s", fullPath) return err } if err = json.Unmarshal(data, dest); err != nil { - z.logger.Error(fmt.Sprintf("Malformed node data %s (%s)", fullPath, data), slog.Any("error", err)) + z.logger.Error().Err(err).Msgf("Malformed node data %s (%s)", fullPath, data) return ErrMalformed } return nil @@ -513,14 +515,14 @@ func (z *zkDCS) GetTree(path string) (any, error) { fullPath := z.buildFullPath(path) children, err := z.retryChildren(fullPath) if err != nil { - z.logger.Error(fmt.Sprintf("Failed to get children of %s", fullPath), slog.Any("error", err)) + z.logger.Error().Err(err).Msgf("Failed to get children of %s", fullPath) return nil, err } if len(children) == 0 { var data []byte data, _, err = z.retryGet(fullPath) if err != nil { - z.logger.Error(fmt.Sprintf("Failed to get data of %s", fullPath), slog.Any("error", err)) + z.logger.Error().Err(err).Msgf("Failed to get data of %s", fullPath) return nil, err } if len(data) == 0 { @@ -529,7 +531,7 @@ func (z *zkDCS) GetTree(path string) (any, error) { var ret any err = json.Unmarshal(data, &ret) if err != nil { - z.logger.Error(fmt.Sprintf("Malformed node data %s (%s)", fullPath, data), slog.Any("error", err)) + z.logger.Error().Err(err).Msgf("Malformed node data %s (%s)", fullPath, data) return nil, err } return ret, nil @@ -551,7 +553,7 @@ func (z *zkDCS) GetChildren(path string) ([]string, error) { return nil, ErrNotFound } if err != nil { - z.logger.Error(fmt.Sprintf("Failed to get children of %s", fullPath), slog.Any("error", err)) + z.logger.Error().Err(err).Msgf("Failed to get children of %s", fullPath) return nil, err } return children, nil diff --git a/internal/dcs/zk_host_provider.go b/internal/dcs/zk_host_provider.go index 759ff28..2ff6289 100644 --- a/internal/dcs/zk_host_provider.go +++ b/internal/dcs/zk_host_provider.go @@ -3,11 +3,12 @@ package dcs import ( "context" "fmt" - "log/slog" "math/rand" "net" "sync" "time" + + "github.com/rs/zerolog" ) type zkhost struct { @@ -17,7 +18,7 @@ type zkhost struct { type RandomHostProvider struct { ctx context.Context - logger *slog.Logger + logger *zerolog.Logger resolver *net.Resolver tried map[string]struct{} hosts sync.Map @@ -31,7 +32,7 @@ type RandomHostProvider struct { isRetry bool } -func NewRandomHostProvider(ctx context.Context, config *RandomHostProviderConfig, useAddrs bool, logger *slog.Logger) *RandomHostProvider { +func NewRandomHostProvider(ctx context.Context, config *RandomHostProviderConfig, useAddrs bool, logger *zerolog.Logger) *RandomHostProvider { return &RandomHostProvider{ ctx: ctx, lookupTTL: config.LookupTTL, @@ -53,7 +54,7 @@ func (rhp *RandomHostProvider) Init(servers []string) error { for _, host := range servers { resolved, err := rhp.resolveHost(host) if err != nil { - rhp.logger.Error(fmt.Sprintf("host definition %s is invalid", host), slog.Any("error", err)) + rhp.logger.Error().Err(err).Msgf("host definition %s is invalid", host) continue } allResolvedServers = append(allResolvedServers, resolved...) @@ -86,10 +87,10 @@ func (rhp *RandomHostProvider) checkZKConnectivity(servers []string) error { conn, err := net.DialTimeout("tcp", server, rhp.connectivityCheckTimeout) if err == nil { conn.Close() - rhp.logger.Info("zk connectivity check succeeded", slog.String("server", server)) + rhp.logger.Info().Str("server", server).Msg("zk connectivity check succeeded") return nil } - rhp.logger.Error("connectivity check failed", slog.String("server", server), slog.Any("error", err)) + rhp.logger.Error().Str("server", server).Err(err).Msg("connectivity check failed") } return fmt.Errorf("failed to connect to any zk server: all attempts timed out or refused") @@ -107,7 +108,7 @@ func (rhp *RandomHostProvider) resolveHosts() { if len(zhost.resolved) == 0 || time.Since(zhost.lastLookup) > rhp.lookupTTL { resolved, err := rhp.resolveHost(pair) if err != nil || len(resolved) == 0 { - rhp.logger.Error(fmt.Sprintf("background resolve for %s failed", pair), slog.Any("error", err)) + rhp.logger.Error().Err(err).Msgf("background resolve for %s failed", pair) continue } rhp.hosts.Store(pair, zkhost{ @@ -132,7 +133,7 @@ func (rhp *RandomHostProvider) resolveHost(pair string) ([]string, error) { defer cancel() addrs, err := rhp.resolver.LookupHost(ctx, host) if err != nil { - rhp.logger.Error(fmt.Sprintf("unable to resolve %s", host), slog.Any("error", err)) + rhp.logger.Error().Err(err).Msgf("unable to resolve %s", host) } for _, addr := range addrs { res = append(res, net.JoinHostPort(addr, port)) @@ -148,7 +149,7 @@ func (rhp *RandomHostProvider) Len() int { func (rhp *RandomHostProvider) Next() (server string, retryStart bool) { if rhp.isRetry { v := time.Duration(rand.Float64() * float64(rhp.retryJitter)) - rhp.logger.Info("Triggering connection retry jitter", slog.Duration("duration", v)) + rhp.logger.Info().Dur("duration", v).Msg("Triggering connection retry jitter") time.Sleep(v) rhp.isRetry = false } diff --git a/internal/valkey/node.go b/internal/valkey/node.go index cae58cf..8acb558 100644 --- a/internal/valkey/node.go +++ b/internal/valkey/node.go @@ -3,7 +3,6 @@ package valkey import ( "context" "fmt" - "log/slog" "net" "os/exec" "slices" @@ -12,6 +11,7 @@ import ( "strings" "time" + "github.com/rs/zerolog" client "github.com/valkey-io/valkey-go" "github.com/yandex/rdsync/internal/config" @@ -27,7 +27,7 @@ type Node struct { ipsTime time.Time conn client.Client config *config.Config - logger *slog.Logger + logger *zerolog.Logger cachedInfo map[string]string fqdn string clusterID string @@ -54,7 +54,7 @@ func uniqLookup(host string) ([]net.IP, error) { } // NewNode is a Node constructor -func NewNode(config *config.Config, logger *slog.Logger, fqdn string) (*Node, error) { +func NewNode(config *config.Config, logger *zerolog.Logger, fqdn string) (*Node, error) { var host string if fqdn == config.Hostname { // Offline mode forbids connections on non-lo interfaces @@ -62,11 +62,12 @@ func NewNode(config *config.Config, logger *slog.Logger, fqdn string) (*Node, er } else { host = fqdn } - nodeLogger := logger.With(slog.String("module", "node"), slog.String("fqdn", host)) + nl := logger.With().Str("module", "node").Str("fqdn", host).Logger() + nodeLogger := &nl now := time.Now() ips, err := uniqLookup(fqdn) if err != nil { - nodeLogger.Warn("Dns lookup failed", slog.Any("error", err)) + nodeLogger.Warn().Err(err).Msg("Dns lookup failed") ips = []net.IP{} now = time.Time{} } @@ -92,7 +93,7 @@ func NewNode(config *config.Config, logger *slog.Logger, fqdn string) (*Node, er } conn, err := client.NewClient(opts) if err != nil { - logger.Warn("Unable to establish initial connection", slog.String("fqdn", host), slog.Any("error", err)) + logger.Warn().Str("fqdn", host).Err(err).Msg("Unable to establish initial connection") conn = nil } node := Node{ @@ -155,14 +156,14 @@ func (n *Node) MatchHost(host string) bool { // RefreshAddrs updates internal ip address list if ttl exceeded func (n *Node) RefreshAddrs() error { if time.Since(n.ipsTime) < n.config.Valkey.DNSTTL { - n.logger.Debug("Not updating ips cache due to ttl") + n.logger.Debug().Msg("Not updating ips cache due to ttl") return nil } - n.logger.Debug("Updating ips cache") + n.logger.Debug().Msg("Updating ips cache") now := time.Now() ips, err := uniqLookup(n.fqdn) if err != nil { - n.logger.Error("Updating ips cache failed", slog.Any("error", err)) + n.logger.Error().Err(err).Msg("Updating ips cache failed") return err } n.ips = ips @@ -280,7 +281,7 @@ func (n *Node) SetOffline(ctx context.Context) error { return err } if rewriteErr != nil { - n.logger.Error("Config rewrite after setting node offline failed", slog.Any("error", rewriteErr)) + n.logger.Error().Err(rewriteErr).Msg("Config rewrite after setting node offline failed") } return nil } @@ -358,7 +359,7 @@ func (n *Node) EmptyQuorumReplicas(ctx context.Context) error { return err } if rewriteErr != nil { - n.logger.Error("Rewrite config failed", slog.Any("error", rewriteErr)) + n.logger.Error().Err(rewriteErr).Msg("Rewrite config failed") } } return nil @@ -441,7 +442,7 @@ func (n *Node) Restart(ctx context.Context) error { if !n.IsLocal() { return fmt.Errorf("restarting %s is not possible - not local", n.fqdn) } - n.logger.Warn(fmt.Sprintf("Restarting with %s", n.config.Valkey.RestartCommand)) + n.logger.Warn().Msgf("Restarting with %s", n.config.Valkey.RestartCommand) split := strings.Fields(n.config.Valkey.RestartCommand) cmd := exec.CommandContext(ctx, split[0], split[1:]...) return cmd.Run() @@ -626,7 +627,7 @@ func (n *Node) IsClusterMajorityAlive(ctx context.Context) (bool, error) { } } res := failedMasters < totalMasters/2+1 - n.logger.Debug(fmt.Sprintf("Cluster majority alive check: %d total, %d failed -> %t", totalMasters, failedMasters, res)) + n.logger.Debug().Msgf("Cluster majority alive check: %d total, %d failed -> %t", totalMasters, failedMasters, res) return res, nil } diff --git a/internal/valkey/senticache.go b/internal/valkey/senticache.go index 89f3e2b..f60cc4c 100644 --- a/internal/valkey/senticache.go +++ b/internal/valkey/senticache.go @@ -3,13 +3,13 @@ package valkey import ( "context" "fmt" - "log/slog" "net" "os/exec" "strconv" "strings" "time" + "github.com/rs/zerolog" client "github.com/valkey-io/valkey-go" "github.com/yandex/rdsync/internal/config" @@ -58,14 +58,14 @@ type SentiCacheState struct { // SentiCacheNode represents API to query/manipulate a single Valkey SentiCache node type SentiCacheNode struct { config *config.Config - logger *slog.Logger + logger *zerolog.Logger conn client.Client opts client.ClientOption broken bool } // NewRemoteSentiCacheNode is a remote SentiCacheNode constructor -func NewRemoteSentiCacheNode(config *config.Config, host string, logger *slog.Logger) (*SentiCacheNode, error) { +func NewRemoteSentiCacheNode(config *config.Config, host string, logger *zerolog.Logger) (*SentiCacheNode, error) { addr := net.JoinHostPort(host, strconv.Itoa(config.SentinelMode.CachePort)) opts := client.ClientOption{ InitAddress: []string{addr}, @@ -88,21 +88,22 @@ func NewRemoteSentiCacheNode(config *config.Config, host string, logger *slog.Lo } conn, err := client.NewClient(opts) if err != nil { - logger.Warn("Unable to establish initial connection", slog.String("fqdn", host), slog.Any("error", err)) + logger.Warn().Str("fqdn", host).Err(err).Msg("Unable to establish initial connection") conn = nil } + sl := logger.With().Str("module", "senticache").Logger() node := SentiCacheNode{ config: config, conn: conn, opts: opts, - logger: logger.With(slog.String("module", "senticache")), + logger: &sl, broken: false, } return &node, nil } // NewSentiCacheNode is a local SentiCacheNode constructor -func NewSentiCacheNode(config *config.Config, logger *slog.Logger) (*SentiCacheNode, error) { +func NewSentiCacheNode(config *config.Config, logger *zerolog.Logger) (*SentiCacheNode, error) { return NewRemoteSentiCacheNode(config, localhost, logger) } @@ -125,7 +126,7 @@ func (s *SentiCacheNode) ensureConn() error { } func (s *SentiCacheNode) restart(ctx context.Context) error { - s.logger.Error("Restarting broken senticache") + s.logger.Error().Msg("Restarting broken senticache") split := strings.Fields(s.config.SentinelMode.CacheRestartCommand) cmd := exec.CommandContext(ctx, split[0], split[1:]...) return cmd.Run() @@ -331,7 +332,7 @@ func (s *SentiCacheNode) Update(ctx context.Context, state *SentiCacheState) err return err } } - s.logger.Debug(fmt.Sprintf("Previous state: master: %v, replicas: %v, sentinels: %v", master, replicas, sentinels)) + s.logger.Debug().Msgf("Previous state: master: %v, replicas: %v, sentinels: %v", master, replicas, sentinels) var command = []string{ "SENTINEL", "CACHE-UPDATE", s.config.SentinelMode.CacheUpdateSecret, "master-name:", state.Master.Name + ",", @@ -381,7 +382,7 @@ func (s *SentiCacheNode) Update(ctx context.Context, state *SentiCacheState) err fmt.Sprintf("%d", replica.MasterPort), fmt.Sprintf("%d", replica.SlaveMasterLinkStatus), fmt.Sprintf("%d,", replica.SlaveReplOffset), ) } - s.logger.Debug(fmt.Sprintf("Updating senticache state with %v", command)) + s.logger.Debug().Msgf("Updating senticache state with %v", command) err = s.conn.Do(ctx, s.conn.B().Arbitrary(command...).Build()).Error() if err != nil { s.broken = true diff --git a/internal/valkey/shard.go b/internal/valkey/shard.go index 274b746..c7c77d2 100644 --- a/internal/valkey/shard.go +++ b/internal/valkey/shard.go @@ -2,10 +2,11 @@ package valkey import ( "fmt" - "log/slog" "sort" "sync" + "github.com/rs/zerolog" + "github.com/yandex/rdsync/internal/config" "github.com/yandex/rdsync/internal/dcs" ) @@ -14,7 +15,7 @@ import ( type Shard struct { dcs dcs.DCS config *config.Config - logger *slog.Logger + logger *zerolog.Logger nodes map[string]*Node local *Node sync.Mutex @@ -27,10 +28,11 @@ type NodeConfiguration struct { } // NewShard is a Shard constructor -func NewShard(config *config.Config, logger *slog.Logger, dcs dcs.DCS) *Shard { +func NewShard(config *config.Config, logger *zerolog.Logger, dcs dcs.DCS) *Shard { + sl := logger.With().Str("module", "shard").Logger() s := &Shard{ config: config, - logger: logger.With(slog.String("module", "shard")), + logger: &sl, nodes: make(map[string]*Node), local: nil, dcs: dcs, @@ -67,7 +69,7 @@ func (s *Shard) UpdateHostsInfo() error { if err != nil { return err } - s.logger.Info(fmt.Sprintf("Nodes from DCS: %s", hosts)) + s.logger.Info().Msgf("Nodes from DCS: %s", hosts) set := make(map[string]int, len(hosts)) for _, host := range hosts { set[host]++