Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions config/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,13 @@ var commonFlags = flagMap{
configType: "string",
defaultValue: defaultConfig.Logging.Path,
},
"log-level": {
flagName: "log-level",
configKey: "logging.level",
description: "The log level to use. (Options: debug, info, warn, error)",
configType: "string",
defaultValue: defaultConfig.Logging.Level,
},
}

var httpFlags = flagMap{
Expand Down Expand Up @@ -222,6 +229,7 @@ var ftpFlags = flagMap{
}

var startFlags = flagMap{
// @todo - Next Major release [Swap this to be enabled]
"http-disabled": {
flagName: "http-disabled",
configKey: "server.disable",
Expand Down
10 changes: 6 additions & 4 deletions core/gossip/handler/broadcast.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@ type (

BroadcastActionHandler struct {
timeoutWatcher *metrics.TimeoutWatcher
logger *zap.Logger
}
)

func NewBroadcastActionHandler(timeoutWatcher *metrics.TimeoutWatcher) *BroadcastActionHandler {
func NewBroadcastActionHandler(timeoutWatcher *metrics.TimeoutWatcher, logger *zap.Logger) *BroadcastActionHandler {
return &BroadcastActionHandler{
timeoutWatcher: timeoutWatcher,
logger: logger,
}
}

Expand All @@ -33,18 +35,18 @@ func (h *BroadcastActionHandler) Handle(action *action.BroadcastAction) {
ip := data[0]
duration, err := strconv.Atoi(data[1])
if err != nil {
zap.L().Sugar().Errorw("Failed to parse duration", "data", action.Data)
h.logger.Sugar().Errorw("Failed to parse duration", "data", action.Data)
return
}

if !h.timeoutWatcher.HasColdCacheTimeout(ip) {
zap.L().Sugar().Infow("ADD_COLD_IP is new to this node, Rebroadcasting.", "ip", ip, "duration", duration)
h.logger.Sugar().Infow("ADD_COLD_IP is new to this node, Rebroadcasting.", "ip", ip, "duration", duration)
h.timeoutWatcher.CommitToColdCacheWithBroadcast(ip, time.Duration(duration))
} else {
h.timeoutWatcher.CommitToColdCache(ip, time.Duration(duration))
}

default:
zap.L().Sugar().Warnw("Received unknown action", "action", action.Action, "data", action.Data)
h.logger.Sugar().Warnw("Received unknown action", "action", action.Action, "data", action.Data)
}
}
14 changes: 9 additions & 5 deletions core/gossip/memberlist.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ type (

// The metadata for the current node
nodeInfo *NodeInfo

// Logger used for memberlist
logger *zap.Logger
}

// Metadata related to the current node
Expand Down Expand Up @@ -107,13 +110,14 @@ func NewMemberList(lf fx.Lifecycle, logger *zap.Logger, config *config.Config, b
connectionAttempts: config.Cluster.ConnectionAttempts,
connectionTimeout: time.Duration(config.Cluster.ConnectionTimeout) * time.Second,
nodeInfo: nodeInfo,
logger: logger,
}

lf.Append(fx.Hook{
OnStart: func(ctx context.Context) error {
go func() {
if err := memberList.Join(); err != nil {
zap.L().Sugar().Error("Failed to join member list", "error", err)
logger.Sugar().Error("Failed to join member list", "error", err)
}
}()
return nil
Expand All @@ -131,7 +135,7 @@ func (m *Memberlist) Join() error {
for i := 0; i < m.connectionAttempts; i++ {
nodes, err := m.client.Join(m.nodeInfo.PeerIpAddresses)
if err != nil && nodes == 0 {
zap.L().Sugar().Warnw("Failed to connect to other nodes", "err", err, "attempt", i)
m.logger.Sugar().Warnw("Failed to connect to other nodes", "err", err, "attempt", i)
time.Sleep(m.connectionTimeout)
continue
}
Expand All @@ -142,7 +146,7 @@ func (m *Memberlist) Join() error {
return errors.New("failed to connect to any other nodes in the cluster")
}

zap.L().Sugar().Infow("Connected to other nodes", "peers", m.client.NumMembers(), "ip", m.GetIpAddress())
m.logger.Sugar().Infow("Connected to other nodes", "peers", m.client.NumMembers(), "ip", m.GetIpAddress())
m.listenForBroadcastActions()

return nil
Expand All @@ -155,7 +159,7 @@ func (m *Memberlist) GetIpAddress() string {

// Broadcasts a message to peer nodes in the cluster
func (m *Memberlist) Dispatch(broadcast *action.BroadcastAction) {
zap.L().Sugar().Infow("Broadcasting action", "action", broadcast.Action, "data", broadcast.Data)
m.logger.Sugar().Infow("Broadcasting action", "action", broadcast.Action, "data", broadcast.Data)
m.delegate.Broadcasts.QueueBroadcast(broadcast)
}

Expand All @@ -167,7 +171,7 @@ func (m *Memberlist) listenForBroadcastActions() {
case msg := <-m.delegate.MessageChan:
action, err := action.ParseBroadcastAction(msg)
if err != nil {
zap.L().Sugar().Warnw("Failed to parse broadcast action", "err", err, "data", string(msg))
m.logger.Sugar().Warnw("Failed to parse broadcast action", "err", err, "data", string(msg))
continue
}
go m.handler.Handle(action)
Expand Down
12 changes: 7 additions & 5 deletions core/metrics/recast.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ type (
telemetry *Telemetry
shutdownChan chan bool
shutdowner fx.Shutdowner
logger *zap.Logger

// Internals
minimumRecastInterval int
Expand All @@ -32,7 +33,7 @@ type (
}
)

func NewRecast(lf fx.Lifecycle, shutdowner fx.Shutdowner, config *config.Config, telemetry *Telemetry) (*Recast, error) {
func NewRecast(lf fx.Lifecycle, shutdowner fx.Shutdowner, config *config.Config, telemetry *Telemetry, logger *zap.Logger) (*Recast, error) {
if !config.Recast.Enabled {
return nil, errors.New("Recast is not enabled")
}
Expand All @@ -45,6 +46,7 @@ func NewRecast(lf fx.Lifecycle, shutdowner fx.Shutdowner, config *config.Config,
shutdownChan: make(chan bool),
telemetry: telemetry,
shutdowner: shutdowner,
logger: logger,

minimumRecastInterval: config.Recast.MinimumRecastIntervalMin,
maximumRecastInterval: config.Recast.MaximumRecastIntervalMin,
Expand Down Expand Up @@ -74,15 +76,15 @@ func (r *Recast) StartChecking() {
for {
// Sleep for a random amount of time between the minimum and maximum recast interval
recastWaitTime := rand.RandomInt(r.minimumRecastInterval, r.minimumRecastInterval)
zap.L().Sugar().Info("Recast Waiting", "timeUntilNextCheck", recastWaitTime)
r.logger.Sugar().Info("Recast Waiting", "timeUntilNextCheck", recastWaitTime)
recastCheckDuration := time.Duration(recastWaitTime) * time.Minute
select {
case <-time.After(recastCheckDuration):
wastedTimeSinceLastCheck := r.telemetry.GetWastedTime() - cumulativeWastedTime
zap.L().Sugar().Infow("Checking if node should recast", "wastedTimeSinceLastCheck", wastedTimeSinceLastCheck, "timeWastedRatio", timeWastedRatio, "recastCheckDuration", recastCheckDuration)
r.logger.Sugar().Infow("Checking if node should recast", "wastedTimeSinceLastCheck", wastedTimeSinceLastCheck, "timeWastedRatio", timeWastedRatio, "recastCheckDuration", recastCheckDuration)

if wastedTimeSinceLastCheck < recastCheckDuration.Seconds()*timeWastedRatio {
zap.L().Sugar().Warnw("Node should recast", "wastedTimeSinceLastCheck", wastedTimeSinceLastCheck, "timeWastedRatio", timeWastedRatio, "recastCheckDuration", recastCheckDuration)
r.logger.Sugar().Warnw("Node should recast", "wastedTimeSinceLastCheck", wastedTimeSinceLastCheck, "timeWastedRatio", timeWastedRatio, "recastCheckDuration", recastCheckDuration)
if err := r.shutdowner.Shutdown(); err != nil {
// Not sure how to handle this error
return
Expand All @@ -92,7 +94,7 @@ func (r *Recast) StartChecking() {

cumulativeWastedTime = r.telemetry.GetWastedTime()
case <-r.shutdownChan:
zap.L().Sugar().Warnw("Shutting down recast checker!")
r.logger.Sugar().Warnw("Shutting down recast checker!")
return
}
}
Expand Down
35 changes: 20 additions & 15 deletions core/metrics/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"time"

"github.com/gofiber/fiber/v2"
"github.com/gofiber/fiber/v2/log"
"github.com/gofiber/fiber/v2/middleware/adaptor"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
Expand Down Expand Up @@ -46,10 +45,13 @@ type (
wastedTimeCounter prometheus.Counter
secretsGeneratedCounter prometheus.Counter
shutdownChan chan bool

// Logger
logger *zap.Logger
}
)

func NewTelemetry(lf fx.Lifecycle, config *config.Config) (*Telemetry, error) {
func NewTelemetry(lf fx.Lifecycle, config *config.Config, logger *zap.Logger) (*Telemetry, error) {
if !config.Telemetry.Enabled {
return nil, nil
}
Expand Down Expand Up @@ -83,6 +85,9 @@ func NewTelemetry(lf fx.Lifecycle, config *config.Config) (*Telemetry, error) {
Help: "Number of secrets generated by this service",
}),
shutdownChan: make(chan bool, 1),

// Logger
logger: logger,
}

lf.Append(fx.Hook{
Expand All @@ -104,7 +109,7 @@ func NewTelemetry(lf fx.Lifecycle, config *config.Config) (*Telemetry, error) {

func (t *Telemetry) StartPushGateway() {
if !t.pushGatewayEnabled {
zap.L().Sugar().Infof("Push Gateway is not enabled")
t.logger.Sugar().Infof("Push Gateway is not enabled")
return
}

Expand All @@ -118,7 +123,7 @@ func (t *Telemetry) StartPushGateway() {
return
case <-pushTicker.C:
if err := t.getAuthedClient().Gatherer(registry).Push(); err != nil {
zap.L().Sugar().Errorw("Failed to push metrics", "error", err)
t.logger.Sugar().Errorw("Failed to push metrics", "error", err)
}
}
}
Expand All @@ -130,30 +135,30 @@ func (t *Telemetry) StartMetricsServer() {
return
}

zap.L().Sugar().Infof("Starting metrics server on port %d", t.serverPort)
t.logger.Sugar().Infof("Starting metrics server on port %d", t.serverPort)
t.app = fiber.New(fiber.Config{
DisableStartupMessage: true,
})
logger := logging.NewServerLogger(zap.L().Named("metrics-server"))
logger := logging.NewServerLogger(t.logger.Named("metrics-server"))
logger.Use(t.app)
t.app.Get(t.serverPath, adaptor.HTTPHandler(promhttp.HandlerFor(t.getPrometheusRegistry(), promhttp.HandlerOpts{})))
go func() {
if err := t.app.Listen(fmt.Sprintf(":%d", t.serverPort)); err != nil {
// Not sure how to handle this error
zap.L().Sugar().Fatalw("Failed to start metrics server", "error", err)
t.logger.Sugar().Fatalw("Failed to start metrics server", "error", err)
}
}()
}

func (t *Telemetry) Stop() error {
zap.L().Sugar().Warnw("Stopping telemetry")
t.logger.Sugar().Warnw("Stopping telemetry")
if err := t.StopPushGateway(); err != nil {
return err
}
if err := t.StopMetricsServer(); err != nil {
return err
}
zap.L().Sugar().Warnw("Stopped telemetry")
t.logger.Sugar().Warnw("Stopped telemetry")
return nil
}

Expand All @@ -163,9 +168,9 @@ func (t *Telemetry) StopPushGateway() error {
}
// Shutdown push gateway
t.shutdownChan <- true
zap.L().Sugar().Infof("Deleting metrics from push gateway")
t.logger.Sugar().Infof("Deleting metrics from push gateway")
if err := t.getAuthedClient().Delete(); err != nil {
zap.L().Sugar().Errorw("Failed to delete metrics", "error", err)
t.logger.Sugar().Errorw("Failed to delete metrics", "error", err)
return err
}

Expand All @@ -177,7 +182,7 @@ func (t *Telemetry) StopMetricsServer() error {
return nil
}

zap.L().Sugar().Infof("Shutting down metrics server")
t.logger.Sugar().Infof("Shutting down metrics server")
return t.app.Shutdown()
}

Expand Down Expand Up @@ -209,17 +214,17 @@ func (t *Telemetry) TrackWastedTime(wastedTime time.Duration) {
}

func (t *Telemetry) GetWastedTime() float64 {
return getCounterValue(t.wastedTimeCounter)
return t.getCounterValue(t.wastedTimeCounter)
}

func (t *Telemetry) TrackGeneratedSecrets(generatedSecrets int) {
t.secretsGeneratedCounter.Add(float64(generatedSecrets))
}

func getCounterValue(counter prometheus.Counter) float64 {
func (t *Telemetry) getCounterValue(counter prometheus.Counter) float64 {
m := &dto.Metric{}
if err := counter.Write(m); err != nil {
log.Warn("Failed to pull metric data", "error", err)
t.logger.Sugar().Errorw("Failed to pull metric data", "error", err)
return 0
}
return m.GetCounter().GetValue()
Expand Down
16 changes: 9 additions & 7 deletions core/metrics/timeout.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type (
coldCachePool *cache.Cache

actionDispatcher action.IBroadcastActionDispatcher
logger *zap.Logger

opts *TimeoutWatcherOptions
}
Expand Down Expand Up @@ -196,7 +197,7 @@ func (t *TimeoutForIp) RecordValidTimeout(timeout time.Duration) {
}
}

func NewTimeoutWatcher(config *config.Config) *TimeoutWatcher {
func NewTimeoutWatcher(config *config.Config, logger *zap.Logger) *TimeoutWatcher {
if !config.TimeoutWatcher.Enabled {
return nil
}
Expand All @@ -207,6 +208,7 @@ func NewTimeoutWatcher(config *config.Config) *TimeoutWatcher {
actionDispatcher: nil,
hotCachePool: cache.New(time.Duration(twConfig.CacheHotPoolTTL)*time.Second, time.Minute),
coldCachePool: cache.New(time.Duration(twConfig.CacheColdPoolTTL)*time.Second, time.Hour),
logger: logger,

// Map options from config to TimeoutWatcherOptions
opts: &TimeoutWatcherOptions{
Expand Down Expand Up @@ -238,7 +240,7 @@ func (tw *TimeoutWatcher) RecordResponse(identifier string, timeout time.Duratio
}

if data, ok = result.(*TimeoutForIp); !ok {
zap.L().Sugar().Warn("Failed to cast timeout data for IP address. Resetting", "ip", identifier)
tw.logger.Sugar().Warn("Failed to cast timeout data for IP address. Resetting", "ip", identifier)
data = NewTimeoutForIp(tw.opts)
}

Expand All @@ -249,7 +251,7 @@ func (tw *TimeoutWatcher) RecordResponse(identifier string, timeout time.Duratio
}

if !successful && timeout > tw.opts.instantCommitThreshold {
zap.L().Sugar().Infow("Timeout recorded higher than instant commit threshold", "ip", identifier, "timeout", timeout)
tw.logger.Sugar().Infow("Timeout recorded higher than instant commit threshold", "ip", identifier, "timeout", timeout)
tw.CommitToColdCacheWithBroadcast(identifier, tw.opts.longestTimeout)
return
}
Expand All @@ -266,14 +268,14 @@ func (tw *TimeoutWatcher) RecordResponse(identifier string, timeout time.Duratio
if sd > tw.opts.sampleDeviation {
return
}
zap.L().Sugar().Infow("Standard deviation is low. We have probably found the timeout! Committing to cold cache", "ip", identifier, "sd", sd)
tw.logger.Sugar().Infow("Standard deviation is low. We have probably found the timeout! Committing to cold cache", "ip", identifier, "sd", sd)
avg := data.GetAverageTimeoutInSample()
timeoutToCommit := avg - (sd * 2)
if timeoutToCommit < tw.opts.lowerTimeoutBound {
timeoutToCommit = tw.opts.lowerTimeoutBound
}

zap.L().Sugar().Infow("Committed to cold cache", "ip", identifier, "timeout", timeoutToCommit)
tw.logger.Sugar().Infow("Committed to cold cache", "ip", identifier, "timeout", timeoutToCommit)
tw.CommitToColdCacheWithBroadcast(identifier, timeoutToCommit)
}

Expand Down Expand Up @@ -310,7 +312,7 @@ func (tw *TimeoutWatcher) GetTimeout(identifier string) time.Duration {
return timeout
}

zap.L().Sugar().Warn("Failed to cast timeout data for IP address. Resetting", "ip", identifier)
tw.logger.Sugar().Warn("Failed to cast timeout data for IP address. Resetting", "ip", identifier)
tw.coldCachePool.Delete(identifier)
}

Expand All @@ -322,7 +324,7 @@ func (tw *TimeoutWatcher) GetTimeout(identifier string) time.Duration {
}

if data, ok = result.(*TimeoutForIp); !ok {
zap.L().Sugar().Warn("Failed to cast timeout data for IP address. Resetting", "ip", identifier)
tw.logger.Sugar().Warn("Failed to cast timeout data for IP address. Resetting", "ip", identifier)
data = NewTimeoutForIp(tw.opts)
}

Expand Down
Loading