Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion cmd/rdsync/abort.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ var abortCmd = &cobra.Command{
fmt.Println(err)
os.Exit(1)
}
os.Exit(app.CliAbort())
code := app.CliAbort()
app.CloseLogger()
os.Exit(code)
},
}

Expand Down
12 changes: 9 additions & 3 deletions cmd/rdsync/hosts.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ var hostListCmd = &cobra.Command{
fmt.Println(err)
os.Exit(1)
}
os.Exit(app.CliHostList())
code := app.CliHostList()
app.CloseLogger()
os.Exit(code)
},
}

Expand All @@ -47,7 +49,9 @@ var hostAddCmd = &cobra.Command{
}
})

os.Exit(app.CliHostAdd(args[0], priorityVal, dryRun, skipValkeyCheck))
code := app.CliHostAdd(args[0], priorityVal, dryRun, skipValkeyCheck)
app.CloseLogger()
os.Exit(code)
},
}

Expand All @@ -61,7 +65,9 @@ var hostRemoveCmd = &cobra.Command{
fmt.Println(err)
os.Exit(1)
}
os.Exit(app.CliHostRemove(args[0]))
code := app.CliHostRemove(args[0])
app.CloseLogger()
os.Exit(code)
},
}

Expand Down
4 changes: 3 additions & 1 deletion cmd/rdsync/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ var infoCmd = &cobra.Command{
fmt.Println(err)
os.Exit(1)
}
os.Exit(app.CliInfo(verbose))
code := app.CliInfo(verbose)
app.CloseLogger()
os.Exit(code)
},
}

Expand Down
12 changes: 9 additions & 3 deletions cmd/rdsync/maintenance.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ var maintCmd = &cobra.Command{
fmt.Println(err)
os.Exit(1)
}
os.Exit(app.CliGetMaintenance())
code := app.CliGetMaintenance()
app.CloseLogger()
os.Exit(code)
},
}

Expand All @@ -36,7 +38,9 @@ var maintOnCmd = &cobra.Command{
fmt.Println(err)
os.Exit(1)
}
os.Exit(app.CliEnableMaintenance(maintWait))
code := app.CliEnableMaintenance(maintWait)
app.CloseLogger()
os.Exit(code)
},
}

Expand All @@ -49,7 +53,9 @@ var maintOffCmd = &cobra.Command{
fmt.Println(err)
os.Exit(1)
}
os.Exit(app.CliDisableMaintenance(maintWait))
code := app.CliDisableMaintenance(maintWait)
app.CloseLogger()
os.Exit(code)
},
}

Expand Down
4 changes: 3 additions & 1 deletion cmd/rdsync/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ var stateCmd = &cobra.Command{
fmt.Println(err)
os.Exit(1)
}
os.Exit(app.CliState(verbose))
code := app.CliState(verbose)
app.CloseLogger()
os.Exit(code)
},
}

Expand Down
4 changes: 3 additions & 1 deletion cmd/rdsync/switch.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ var switchCmd = &cobra.Command{
fmt.Println(err)
os.Exit(1)
}
os.Exit(app.CliSwitch(switchFrom, switchTo, switchWait, switchForce))
code := app.CliSwitch(switchFrom, switchTo, switchWait, switchForce)
app.CloseLogger()
os.Exit(code)
},
}

Expand Down
5 changes: 4 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ require (
github.com/heetch/confita v0.11.0
github.com/moby/moby/api v1.54.2
github.com/moby/moby/client v0.4.1
github.com/rs/zerolog v1.35.1
github.com/spf13/cobra v1.10.2
github.com/spf13/pflag v1.0.10
github.com/stretchr/testify v1.11.1
Expand Down Expand Up @@ -40,6 +41,8 @@ require (
github.com/hashicorp/go-memdb v1.3.4 // indirect
github.com/hashicorp/golang-lru v0.5.4 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/mattn/go-colorable v0.1.14 // indirect
github.com/mattn/go-isatty v0.0.22 // indirect
github.com/moby/docker-image-spec v1.3.1 // indirect
github.com/opencontainers/go-digest v1.0.0 // indirect
github.com/opencontainers/image-spec v1.1.1 // indirect
Expand All @@ -50,6 +53,6 @@ require (
go.opentelemetry.io/otel v1.43.0 // indirect
go.opentelemetry.io/otel/metric v1.43.0 // indirect
go.opentelemetry.io/otel/trace v1.43.0 // indirect
golang.org/x/sys v0.43.0 // indirect
golang.org/x/sys v0.44.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
10 changes: 8 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,10 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/mattn/go-colorable v0.1.14 h1:9A9LHSqF/7dyVVX6g0U9cwm9pG3kP9gSzcuIPHPsaIE=
github.com/mattn/go-colorable v0.1.14/go.mod h1:6LmQG8QLFO4G5z1gPvYEzlUgJ2wF+stgPZH1UqBm1s8=
github.com/mattn/go-isatty v0.0.22 h1:j8l17JJ9i6VGPUFUYoTUKPSgKe/83EYU2zBC7YNKMw4=
github.com/mattn/go-isatty v0.0.22/go.mod h1:ZXfXG4SQHsB/w3ZeOYbR0PrPwLy+n6xiMrJlRFqopa4=
github.com/moby/docker-image-spec v1.3.1 h1:jMKff3w6PgbfSa69GfNg+zN/XLhfXJGnEx3Nl2EsFP0=
github.com/moby/docker-image-spec v1.3.1/go.mod h1:eKmb5VW8vQEh/BAr2yvVNvuiJuY6UIocYsFu/DxxRpo=
github.com/moby/moby/api v1.54.2 h1:wiat9QAhnDQjA7wk1kh/TqHz2I1uUA7M7t9SAl/JNXg=
Expand All @@ -88,6 +92,8 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ=
github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc=
github.com/rs/zerolog v1.35.1 h1:m7xQeoiLIiV0BCEY4Hs+j2NG4Gp2o2KPKmhnnLiazKI=
github.com/rs/zerolog v1.35.1/go.mod h1:EjML9kdfa/RMA7h/6z6pYmq1ykOuA8/mjWaEvGI+jcw=
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/spf13/cobra v1.7.0/go.mod h1:uLxZILRyS/50WlhOIKD7W6V5bgeIt+4sICxh6uRMrb0=
github.com/spf13/cobra v1.10.2 h1:DMTTonx5m65Ic0GOoRY2c16WCbHxOOw6xxezuLaBpcU=
Expand Down Expand Up @@ -126,8 +132,8 @@ go.yaml.in/yaml/v3 v3.0.4 h1:tfq32ie2Jv2UxXFdLJdh3jXuOzWiL1fo0bu/FbuKpbc=
go.yaml.in/yaml/v3 v3.0.4/go.mod h1:DhzuOOF2ATzADvBadXxruRBLzYTpT36CKvDb3+aBEFg=
golang.org/x/net v0.48.0 h1:zyQRTTrjc33Lhh0fBgT/H3oZq9WuvRR5gPC70xpDiQU=
golang.org/x/net v0.48.0/go.mod h1:+ndRgGjkh8FGtu1w1FGbEC31if4VrNVMuKTgcAAnQRY=
golang.org/x/sys v0.43.0 h1:Rlag2XtaFTxp19wS8MXlJwTvoh8ArU6ezoyFsMyCTNI=
golang.org/x/sys v0.43.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw=
golang.org/x/sys v0.44.0 h1:ildZl3J4uzeKP07r2F++Op7E9B29JRUy+a27EibtBTQ=
golang.org/x/sys v0.44.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw=
golang.org/x/text v0.32.0 h1:ZD01bjUt1FQ9WJ0ClOL5vxgxOI/sVCNgX1YtKwcY0mU=
golang.org/x/text v0.32.0/go.mod h1:o/rUWzghvpD5TXrTIBuJU77MTaN0ljMWE47kxGJQ7jY=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
Expand Down
21 changes: 10 additions & 11 deletions internal/app/active_nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package app

import (
"fmt"
"log/slog"
"slices"
"sort"
"strings"
Expand Down Expand Up @@ -48,13 +47,13 @@ func (app *App) actualizeQuorumReplicas(master string, activeNodes []string) err
}

if currentValue != expectedValue {
app.logger.Debug(fmt.Sprintf("Setting quorum replicas to %s on %s", expectedValue, master))
app.logger.Debug().Msgf("Setting quorum replicas to %s on %s", expectedValue, master)
err, rewriteErr := node.SetQuorumReplicas(app.ctx, expectedValue)
if err != nil {
return err
}
if rewriteErr != nil {
app.logger.Error("Unable to rewrite config", slog.String("fqdn", master), slog.Any("error", rewriteErr))
app.logger.Error().Str("fqdn", master).Err(rewriteErr).Msg("Unable to rewrite config")
}
}

Expand All @@ -76,21 +75,21 @@ func (app *App) updateActiveNodes(state, stateDcs map[string]*HostState, oldActi
if removingNodes {
err := app.dcs.Set(pathActiveNodes, activeNodes)
if err != nil {
app.logger.Error("Update active nodes: failed to update active nodes in dcs", slog.Any("error", err))
app.logger.Error().Err(err).Msg("Update active nodes: failed to update active nodes in dcs")
return err
}
}

err := app.actualizeQuorumReplicas(master, activeNodes)
if err != nil {
app.logger.Error("Update active nodes: failed to actualize quorum replicas", slog.Any("error", err))
app.logger.Error().Err(err).Msg("Update active nodes: failed to actualize quorum replicas")
return err
}

if !removingNodes {
err := app.dcs.Set(pathActiveNodes, activeNodes)
if err != nil {
app.logger.Error("Update active nodes: failed to update active nodes in dcs", slog.Any("error", err))
app.logger.Error().Err(err).Msg("Update active nodes: failed to update active nodes in dcs")
return err
}
}
Expand All @@ -117,7 +116,7 @@ func (app *App) calcActiveNodes(state, stateDcs map[string]*HostState, oldActive
if !node.PingOk {
if stateDcs[host].PingOk {
if slices.Contains(oldActiveNodes, host) {
app.logger.Warn(fmt.Sprintf("Calc active nodes: %s keeps health lock in dcs, keeping active...", host))
app.logger.Warn().Msgf("Calc active nodes: %s keeps health lock in dcs, keeping active...", host)
activeNodes = append(activeNodes, host)
}
continue
Expand All @@ -128,23 +127,23 @@ func (app *App) calcActiveNodes(state, stateDcs map[string]*HostState, oldActive
failTime := time.Since(app.nodeFailTime[host])
if failTime < app.config.InactivationDelay {
if slices.Contains(oldActiveNodes, host) {
app.logger.Warn(fmt.Sprintf("Calc active nodes: %s is failing, remaining %v", host, app.config.InactivationDelay-failTime))
app.logger.Warn().Msgf("Calc active nodes: %s is failing, remaining %v", host, app.config.InactivationDelay-failTime)
activeNodes = append(activeNodes, host)
}
continue
}
app.logger.Error(fmt.Sprintf("Calc active nodes: %s is down, deleting from active...", host))
app.logger.Error().Msgf("Calc active nodes: %s is down, deleting from active...", host)
continue
} else if !stateDcs[host].IsOffline {
delete(app.nodeFailTime, host)
}
replicaState := node.ReplicaState
if replicaState == nil {
app.logger.Warn(fmt.Sprintf("Calc active nodes: lost master %s", host))
app.logger.Warn().Msgf("Calc active nodes: lost master %s", host)
continue
}
if (masterState.PingOk && masterState.PingStable) && !replicates(&masterState, replicaState, host, masterNode, false) {
app.logger.Error(fmt.Sprintf("Calc active nodes: %s is not replicating from alive master, deleting from active...", host))
app.logger.Error().Msgf("Calc active nodes: %s is not replicating from alive master, deleting from active...", host)
continue
}
activeNodes = append(activeNodes, host)
Expand Down
46 changes: 21 additions & 25 deletions internal/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,15 @@ package app
import (
"context"
"fmt"
"log/slog"
"io"
"os"
"os/signal"
"sync/atomic"
"syscall"
"time"

"github.com/gofrs/flock"
"github.com/rs/zerolog"

"github.com/yandex/rdsync/internal/config"
"github.com/yandex/rdsync/internal/dcs"
Expand All @@ -27,7 +28,8 @@ type App struct {
dcs dcs.DCS
config *config.Config
splitTime map[string]time.Time
logger *slog.Logger
logger *zerolog.Logger
loggerCloser io.Closer
nodeFailTime map[string]time.Time
shard *valkey.Shard
cache *valkey.SentiCacheNode
Expand All @@ -49,20 +51,6 @@ func baseContext() context.Context {
return ctx
}

func parseLevel(level string) (slog.Level, error) {
switch level {
case "Debug":
return slog.LevelDebug, nil
case "Info":
return slog.LevelInfo, nil
case "Warn":
return slog.LevelWarn, nil
case "Error":
return slog.LevelError, nil
}
return slog.LevelInfo, fmt.Errorf("unknown error level: %s", level)
}

// NewApp is an App constructor
func NewApp(configFile, logLevel string) (*App, error) {
conf, err := config.ReadFromFile(configFile)
Expand All @@ -77,7 +65,7 @@ func NewApp(configFile, logLevel string) (*App, error) {
if err != nil {
return nil, err
}
logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: logLevelN}))
logger, loggerCloser := newMainLogger(logLevelN, conf.LogBufferSize, conf.LogPollInterval)
mode, err := parseMode(conf.Mode)
if err != nil {
return nil, err
Expand All @@ -94,6 +82,7 @@ func NewApp(configFile, logLevel string) (*App, error) {
splitTime: make(map[string]time.Time),
state: stateInit,
logger: logger,
loggerCloser: loggerCloser,
config: conf,
}
app.critical.Store(false)
Expand All @@ -110,51 +99,58 @@ func (app *App) connectDCS() error {
}

func (app *App) reconnectDCS() error {
app.logger.Info("Attempting DCS reconnection after prolonged Lost state")
app.logger.Info().Msg("Attempting DCS reconnection after prolonged Lost state")
oldDCS := app.dcs
err := app.connectDCS()
if err != nil {
app.logger.Error("DCS reconnection failed", slog.Any("error", err))
app.logger.Error().Err(err).Msg("DCS reconnection failed")
app.dcs = oldDCS
return err
}
app.dcs.SetDisconnectCallback(func() error { return app.handleCritical() })
app.shard.SetDCS(app.dcs)
oldDCS.Close()
app.logger.Info("DCS reconnection successful")
app.logger.Info().Msg("DCS reconnection successful")
return nil
}

// CloseLogger drains the logger's queue
func (app *App) CloseLogger() {
app.loggerCloser.Close()
}

func (app *App) lockDaemonFile() {
app.daemonLock = flock.New(app.config.DaemonLockFile)
if locked, err := app.daemonLock.TryLock(); !locked {
msg := "another instance is running."
if err != nil {
msg = err.Error()
}
app.logger.Error(fmt.Sprintf("Unable to acquire daemon lock on %s", app.config.DaemonLockFile), slog.Any("error", msg))
app.logger.Error().Str("error", msg).Msgf("Unable to acquire daemon lock on %s", app.config.DaemonLockFile)
app.CloseLogger()
os.Exit(1)
}
}

func (app *App) unlockDaemonFile() {
err := app.daemonLock.Unlock()
if err != nil {
app.logger.Error(fmt.Sprintf("Unable to unlock daemon lock %s", app.config.DaemonLockFile), slog.Any("error", err))
app.logger.Error().Err(err).Msgf("Unable to unlock daemon lock %s", app.config.DaemonLockFile)
}
}

// Run enters the main application loop
func (app *App) Run() int {
app.lockDaemonFile()
defer app.unlockDaemonFile()
defer app.loggerCloser.Close()

app.timings = newTimingReporter(app.config, app.logger)
defer app.timings.Close()

err := app.connectDCS()
if err != nil {
app.logger.Error("Unable to connect to dcs", slog.Any("error", err))
app.logger.Error().Err(err).Msg("Unable to connect to dcs")
return 1
}
defer app.dcs.Close()
Expand All @@ -165,7 +161,7 @@ func (app *App) Run() int {
if app.mode == modeSentinel {
app.cache, err = valkey.NewSentiCacheNode(app.config, app.logger)
if err != nil {
app.logger.Error("Unable to init senticache node", slog.Any("error", err))
app.logger.Error().Err(err).Msg("Unable to init senticache node")
return 1
}
defer app.cache.Close()
Expand All @@ -186,7 +182,7 @@ func (app *App) Run() int {
app.timings.Reopen()
case <-ticker.C:
for {
app.logger.Info(fmt.Sprintf("Rdsync state: %s", app.state))
app.logger.Info().Msgf("Rdsync state: %s", app.state)
stateHandler := map[appState](func() appState){
stateInit: app.stateInit,
stateManager: app.stateManager,
Expand Down
Loading
Loading