From bbfd24bbf6c653fd5428fd083813e4a39a83e42f Mon Sep 17 00:00:00 2001 From: Ryan Lee Date: Tue, 4 Feb 2025 20:22:29 +0000 Subject: [PATCH] feat(31): Refactor logging to use DI container --- config/flags.go | 8 ++++++ core/gossip/handler/broadcast.go | 10 ++++--- core/gossip/memberlist.go | 14 ++++++---- core/metrics/recast.go | 12 ++++---- core/metrics/telemetry.go | 35 ++++++++++++++---------- core/metrics/timeout.go | 16 ++++++----- core/stall/stall_pool.go | 21 +++++++------- core/stall/staller_collection.go | 3 -- generator/config_collection.go | 9 +++--- protocol/ftp/driver/crypto.go | 2 -- protocol/ftp/driver/file_info.go | 7 ----- protocol/ftp/stall/file_stall_factory.go | 5 +++- protocol/ftp/throttle/ftp_throttle.go | 6 ++-- protocol/http/stall/stall.go | 2 +- 14 files changed, 82 insertions(+), 68 deletions(-) diff --git a/config/flags.go b/config/flags.go index 5325c30..d6c72ae 100644 --- a/config/flags.go +++ b/config/flags.go @@ -86,6 +86,13 @@ var commonFlags = flagMap{ configType: "string", defaultValue: defaultConfig.Logging.Path, }, + "log-level": { + flagName: "log-level", + configKey: "logging.level", + description: "The log level to use. (Options: debug, info, warn, error)", + configType: "string", + defaultValue: defaultConfig.Logging.Level, + }, } var httpFlags = flagMap{ @@ -222,6 +229,7 @@ var ftpFlags = flagMap{ } var startFlags = flagMap{ + // @todo - Next Major release [Swap this to be enabled] "http-disabled": { flagName: "http-disabled", configKey: "server.disable", diff --git a/core/gossip/handler/broadcast.go b/core/gossip/handler/broadcast.go index 222bba0..e6491e6 100644 --- a/core/gossip/handler/broadcast.go +++ b/core/gossip/handler/broadcast.go @@ -17,12 +17,14 @@ type ( BroadcastActionHandler struct { timeoutWatcher *metrics.TimeoutWatcher + logger *zap.Logger } ) -func NewBroadcastActionHandler(timeoutWatcher *metrics.TimeoutWatcher) *BroadcastActionHandler { +func NewBroadcastActionHandler(timeoutWatcher *metrics.TimeoutWatcher, logger *zap.Logger) *BroadcastActionHandler { return &BroadcastActionHandler{ timeoutWatcher: timeoutWatcher, + logger: logger, } } @@ -33,18 +35,18 @@ func (h *BroadcastActionHandler) Handle(action *action.BroadcastAction) { ip := data[0] duration, err := strconv.Atoi(data[1]) if err != nil { - zap.L().Sugar().Errorw("Failed to parse duration", "data", action.Data) + h.logger.Sugar().Errorw("Failed to parse duration", "data", action.Data) return } if !h.timeoutWatcher.HasColdCacheTimeout(ip) { - zap.L().Sugar().Infow("ADD_COLD_IP is new to this node, Rebroadcasting.", "ip", ip, "duration", duration) + h.logger.Sugar().Infow("ADD_COLD_IP is new to this node, Rebroadcasting.", "ip", ip, "duration", duration) h.timeoutWatcher.CommitToColdCacheWithBroadcast(ip, time.Duration(duration)) } else { h.timeoutWatcher.CommitToColdCache(ip, time.Duration(duration)) } default: - zap.L().Sugar().Warnw("Received unknown action", "action", action.Action, "data", action.Data) + h.logger.Sugar().Warnw("Received unknown action", "action", action.Action, "data", action.Data) } } diff --git a/core/gossip/memberlist.go b/core/gossip/memberlist.go index 8dfde36..a3823a3 100644 --- a/core/gossip/memberlist.go +++ b/core/gossip/memberlist.go @@ -43,6 +43,9 @@ type ( // The metadata for the current node nodeInfo *NodeInfo + + // Logger used for memberlist + logger *zap.Logger } // Metadata related to the current node @@ -107,13 +110,14 @@ func NewMemberList(lf fx.Lifecycle, logger *zap.Logger, config *config.Config, b connectionAttempts: config.Cluster.ConnectionAttempts, connectionTimeout: time.Duration(config.Cluster.ConnectionTimeout) * time.Second, nodeInfo: nodeInfo, + logger: logger, } lf.Append(fx.Hook{ OnStart: func(ctx context.Context) error { go func() { if err := memberList.Join(); err != nil { - zap.L().Sugar().Error("Failed to join member list", "error", err) + logger.Sugar().Error("Failed to join member list", "error", err) } }() return nil @@ -131,7 +135,7 @@ func (m *Memberlist) Join() error { for i := 0; i < m.connectionAttempts; i++ { nodes, err := m.client.Join(m.nodeInfo.PeerIpAddresses) if err != nil && nodes == 0 { - zap.L().Sugar().Warnw("Failed to connect to other nodes", "err", err, "attempt", i) + m.logger.Sugar().Warnw("Failed to connect to other nodes", "err", err, "attempt", i) time.Sleep(m.connectionTimeout) continue } @@ -142,7 +146,7 @@ func (m *Memberlist) Join() error { return errors.New("failed to connect to any other nodes in the cluster") } - zap.L().Sugar().Infow("Connected to other nodes", "peers", m.client.NumMembers(), "ip", m.GetIpAddress()) + m.logger.Sugar().Infow("Connected to other nodes", "peers", m.client.NumMembers(), "ip", m.GetIpAddress()) m.listenForBroadcastActions() return nil @@ -155,7 +159,7 @@ func (m *Memberlist) GetIpAddress() string { // Broadcasts a message to peer nodes in the cluster func (m *Memberlist) Dispatch(broadcast *action.BroadcastAction) { - zap.L().Sugar().Infow("Broadcasting action", "action", broadcast.Action, "data", broadcast.Data) + m.logger.Sugar().Infow("Broadcasting action", "action", broadcast.Action, "data", broadcast.Data) m.delegate.Broadcasts.QueueBroadcast(broadcast) } @@ -167,7 +171,7 @@ func (m *Memberlist) listenForBroadcastActions() { case msg := <-m.delegate.MessageChan: action, err := action.ParseBroadcastAction(msg) if err != nil { - zap.L().Sugar().Warnw("Failed to parse broadcast action", "err", err, "data", string(msg)) + m.logger.Sugar().Warnw("Failed to parse broadcast action", "err", err, "data", string(msg)) continue } go m.handler.Handle(action) diff --git a/core/metrics/recast.go b/core/metrics/recast.go index 65ee5e2..0d62e5b 100644 --- a/core/metrics/recast.go +++ b/core/metrics/recast.go @@ -24,6 +24,7 @@ type ( telemetry *Telemetry shutdownChan chan bool shutdowner fx.Shutdowner + logger *zap.Logger // Internals minimumRecastInterval int @@ -32,7 +33,7 @@ type ( } ) -func NewRecast(lf fx.Lifecycle, shutdowner fx.Shutdowner, config *config.Config, telemetry *Telemetry) (*Recast, error) { +func NewRecast(lf fx.Lifecycle, shutdowner fx.Shutdowner, config *config.Config, telemetry *Telemetry, logger *zap.Logger) (*Recast, error) { if !config.Recast.Enabled { return nil, errors.New("Recast is not enabled") } @@ -45,6 +46,7 @@ func NewRecast(lf fx.Lifecycle, shutdowner fx.Shutdowner, config *config.Config, shutdownChan: make(chan bool), telemetry: telemetry, shutdowner: shutdowner, + logger: logger, minimumRecastInterval: config.Recast.MinimumRecastIntervalMin, maximumRecastInterval: config.Recast.MaximumRecastIntervalMin, @@ -74,15 +76,15 @@ func (r *Recast) StartChecking() { for { // Sleep for a random amount of time between the minimum and maximum recast interval recastWaitTime := rand.RandomInt(r.minimumRecastInterval, r.minimumRecastInterval) - zap.L().Sugar().Info("Recast Waiting", "timeUntilNextCheck", recastWaitTime) + r.logger.Sugar().Info("Recast Waiting", "timeUntilNextCheck", recastWaitTime) recastCheckDuration := time.Duration(recastWaitTime) * time.Minute select { case <-time.After(recastCheckDuration): wastedTimeSinceLastCheck := r.telemetry.GetWastedTime() - cumulativeWastedTime - zap.L().Sugar().Infow("Checking if node should recast", "wastedTimeSinceLastCheck", wastedTimeSinceLastCheck, "timeWastedRatio", timeWastedRatio, "recastCheckDuration", recastCheckDuration) + r.logger.Sugar().Infow("Checking if node should recast", "wastedTimeSinceLastCheck", wastedTimeSinceLastCheck, "timeWastedRatio", timeWastedRatio, "recastCheckDuration", recastCheckDuration) if wastedTimeSinceLastCheck < recastCheckDuration.Seconds()*timeWastedRatio { - zap.L().Sugar().Warnw("Node should recast", "wastedTimeSinceLastCheck", wastedTimeSinceLastCheck, "timeWastedRatio", timeWastedRatio, "recastCheckDuration", recastCheckDuration) + r.logger.Sugar().Warnw("Node should recast", "wastedTimeSinceLastCheck", wastedTimeSinceLastCheck, "timeWastedRatio", timeWastedRatio, "recastCheckDuration", recastCheckDuration) if err := r.shutdowner.Shutdown(); err != nil { // Not sure how to handle this error return @@ -92,7 +94,7 @@ func (r *Recast) StartChecking() { cumulativeWastedTime = r.telemetry.GetWastedTime() case <-r.shutdownChan: - zap.L().Sugar().Warnw("Shutting down recast checker!") + r.logger.Sugar().Warnw("Shutting down recast checker!") return } } diff --git a/core/metrics/telemetry.go b/core/metrics/telemetry.go index 10127b1..a39461d 100644 --- a/core/metrics/telemetry.go +++ b/core/metrics/telemetry.go @@ -6,7 +6,6 @@ import ( "time" "github.com/gofiber/fiber/v2" - "github.com/gofiber/fiber/v2/log" "github.com/gofiber/fiber/v2/middleware/adaptor" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" @@ -46,10 +45,13 @@ type ( wastedTimeCounter prometheus.Counter secretsGeneratedCounter prometheus.Counter shutdownChan chan bool + + // Logger + logger *zap.Logger } ) -func NewTelemetry(lf fx.Lifecycle, config *config.Config) (*Telemetry, error) { +func NewTelemetry(lf fx.Lifecycle, config *config.Config, logger *zap.Logger) (*Telemetry, error) { if !config.Telemetry.Enabled { return nil, nil } @@ -83,6 +85,9 @@ func NewTelemetry(lf fx.Lifecycle, config *config.Config) (*Telemetry, error) { Help: "Number of secrets generated by this service", }), shutdownChan: make(chan bool, 1), + + // Logger + logger: logger, } lf.Append(fx.Hook{ @@ -104,7 +109,7 @@ func NewTelemetry(lf fx.Lifecycle, config *config.Config) (*Telemetry, error) { func (t *Telemetry) StartPushGateway() { if !t.pushGatewayEnabled { - zap.L().Sugar().Infof("Push Gateway is not enabled") + t.logger.Sugar().Infof("Push Gateway is not enabled") return } @@ -118,7 +123,7 @@ func (t *Telemetry) StartPushGateway() { return case <-pushTicker.C: if err := t.getAuthedClient().Gatherer(registry).Push(); err != nil { - zap.L().Sugar().Errorw("Failed to push metrics", "error", err) + t.logger.Sugar().Errorw("Failed to push metrics", "error", err) } } } @@ -130,30 +135,30 @@ func (t *Telemetry) StartMetricsServer() { return } - zap.L().Sugar().Infof("Starting metrics server on port %d", t.serverPort) + t.logger.Sugar().Infof("Starting metrics server on port %d", t.serverPort) t.app = fiber.New(fiber.Config{ DisableStartupMessage: true, }) - logger := logging.NewServerLogger(zap.L().Named("metrics-server")) + logger := logging.NewServerLogger(t.logger.Named("metrics-server")) logger.Use(t.app) t.app.Get(t.serverPath, adaptor.HTTPHandler(promhttp.HandlerFor(t.getPrometheusRegistry(), promhttp.HandlerOpts{}))) go func() { if err := t.app.Listen(fmt.Sprintf(":%d", t.serverPort)); err != nil { // Not sure how to handle this error - zap.L().Sugar().Fatalw("Failed to start metrics server", "error", err) + t.logger.Sugar().Fatalw("Failed to start metrics server", "error", err) } }() } func (t *Telemetry) Stop() error { - zap.L().Sugar().Warnw("Stopping telemetry") + t.logger.Sugar().Warnw("Stopping telemetry") if err := t.StopPushGateway(); err != nil { return err } if err := t.StopMetricsServer(); err != nil { return err } - zap.L().Sugar().Warnw("Stopped telemetry") + t.logger.Sugar().Warnw("Stopped telemetry") return nil } @@ -163,9 +168,9 @@ func (t *Telemetry) StopPushGateway() error { } // Shutdown push gateway t.shutdownChan <- true - zap.L().Sugar().Infof("Deleting metrics from push gateway") + t.logger.Sugar().Infof("Deleting metrics from push gateway") if err := t.getAuthedClient().Delete(); err != nil { - zap.L().Sugar().Errorw("Failed to delete metrics", "error", err) + t.logger.Sugar().Errorw("Failed to delete metrics", "error", err) return err } @@ -177,7 +182,7 @@ func (t *Telemetry) StopMetricsServer() error { return nil } - zap.L().Sugar().Infof("Shutting down metrics server") + t.logger.Sugar().Infof("Shutting down metrics server") return t.app.Shutdown() } @@ -209,17 +214,17 @@ func (t *Telemetry) TrackWastedTime(wastedTime time.Duration) { } func (t *Telemetry) GetWastedTime() float64 { - return getCounterValue(t.wastedTimeCounter) + return t.getCounterValue(t.wastedTimeCounter) } func (t *Telemetry) TrackGeneratedSecrets(generatedSecrets int) { t.secretsGeneratedCounter.Add(float64(generatedSecrets)) } -func getCounterValue(counter prometheus.Counter) float64 { +func (t *Telemetry) getCounterValue(counter prometheus.Counter) float64 { m := &dto.Metric{} if err := counter.Write(m); err != nil { - log.Warn("Failed to pull metric data", "error", err) + t.logger.Sugar().Errorw("Failed to pull metric data", "error", err) return 0 } return m.GetCounter().GetValue() diff --git a/core/metrics/timeout.go b/core/metrics/timeout.go index 2c7838e..159f4e7 100644 --- a/core/metrics/timeout.go +++ b/core/metrics/timeout.go @@ -30,6 +30,7 @@ type ( coldCachePool *cache.Cache actionDispatcher action.IBroadcastActionDispatcher + logger *zap.Logger opts *TimeoutWatcherOptions } @@ -196,7 +197,7 @@ func (t *TimeoutForIp) RecordValidTimeout(timeout time.Duration) { } } -func NewTimeoutWatcher(config *config.Config) *TimeoutWatcher { +func NewTimeoutWatcher(config *config.Config, logger *zap.Logger) *TimeoutWatcher { if !config.TimeoutWatcher.Enabled { return nil } @@ -207,6 +208,7 @@ func NewTimeoutWatcher(config *config.Config) *TimeoutWatcher { actionDispatcher: nil, hotCachePool: cache.New(time.Duration(twConfig.CacheHotPoolTTL)*time.Second, time.Minute), coldCachePool: cache.New(time.Duration(twConfig.CacheColdPoolTTL)*time.Second, time.Hour), + logger: logger, // Map options from config to TimeoutWatcherOptions opts: &TimeoutWatcherOptions{ @@ -238,7 +240,7 @@ func (tw *TimeoutWatcher) RecordResponse(identifier string, timeout time.Duratio } if data, ok = result.(*TimeoutForIp); !ok { - zap.L().Sugar().Warn("Failed to cast timeout data for IP address. Resetting", "ip", identifier) + tw.logger.Sugar().Warn("Failed to cast timeout data for IP address. Resetting", "ip", identifier) data = NewTimeoutForIp(tw.opts) } @@ -249,7 +251,7 @@ func (tw *TimeoutWatcher) RecordResponse(identifier string, timeout time.Duratio } if !successful && timeout > tw.opts.instantCommitThreshold { - zap.L().Sugar().Infow("Timeout recorded higher than instant commit threshold", "ip", identifier, "timeout", timeout) + tw.logger.Sugar().Infow("Timeout recorded higher than instant commit threshold", "ip", identifier, "timeout", timeout) tw.CommitToColdCacheWithBroadcast(identifier, tw.opts.longestTimeout) return } @@ -266,14 +268,14 @@ func (tw *TimeoutWatcher) RecordResponse(identifier string, timeout time.Duratio if sd > tw.opts.sampleDeviation { return } - zap.L().Sugar().Infow("Standard deviation is low. We have probably found the timeout! Committing to cold cache", "ip", identifier, "sd", sd) + tw.logger.Sugar().Infow("Standard deviation is low. We have probably found the timeout! Committing to cold cache", "ip", identifier, "sd", sd) avg := data.GetAverageTimeoutInSample() timeoutToCommit := avg - (sd * 2) if timeoutToCommit < tw.opts.lowerTimeoutBound { timeoutToCommit = tw.opts.lowerTimeoutBound } - zap.L().Sugar().Infow("Committed to cold cache", "ip", identifier, "timeout", timeoutToCommit) + tw.logger.Sugar().Infow("Committed to cold cache", "ip", identifier, "timeout", timeoutToCommit) tw.CommitToColdCacheWithBroadcast(identifier, timeoutToCommit) } @@ -310,7 +312,7 @@ func (tw *TimeoutWatcher) GetTimeout(identifier string) time.Duration { return timeout } - zap.L().Sugar().Warn("Failed to cast timeout data for IP address. Resetting", "ip", identifier) + tw.logger.Sugar().Warn("Failed to cast timeout data for IP address. Resetting", "ip", identifier) tw.coldCachePool.Delete(identifier) } @@ -322,7 +324,7 @@ func (tw *TimeoutWatcher) GetTimeout(identifier string) time.Duration { } if data, ok = result.(*TimeoutForIp); !ok { - zap.L().Sugar().Warn("Failed to cast timeout data for IP address. Resetting", "ip", identifier) + tw.logger.Sugar().Warn("Failed to cast timeout data for IP address. Resetting", "ip", identifier) data = NewTimeoutForIp(tw.opts) } diff --git a/core/stall/stall_pool.go b/core/stall/stall_pool.go index b6e220e..dd9c37b 100644 --- a/core/stall/stall_pool.go +++ b/core/stall/stall_pool.go @@ -13,8 +13,8 @@ import ( type ( StallerPool struct { - // Map of connection ID's to map to HttpStaller stallers *StallerCollection + logger *zap.Logger deregisterChan chan Staller stopChan chan bool maximumConnections int @@ -26,8 +26,9 @@ type ( } ) -func NewStallerPool(lifecycle fx.Lifecycle, config *config.Config) *StallerPool { +func NewStallerPool(lifecycle fx.Lifecycle, config *config.Config, logger *zap.Logger) *StallerPool { pool := &StallerPool{ + logger: logger, deregisterChan: make(chan Staller, config.Staller.MaximumConnections), stopChan: make(chan bool), stallers: NewStallerCollection(config.Staller.GroupLimit), @@ -40,7 +41,8 @@ func NewStallerPool(lifecycle fx.Lifecycle, config *config.Config) *StallerPool return nil }, OnStop: func(context.Context) error { - pool.Stop() + // Top priority is to stop the pool while connections are still active + // so is called at top level return nil }, }) @@ -50,7 +52,7 @@ func NewStallerPool(lifecycle fx.Lifecycle, config *config.Config) *StallerPool func (s *StallerPool) Register(staller Staller) error { if s.stallers.Len() >= s.maximumConnections { - zap.L().Sugar().Warnw("maximum connections reached, cannot register staller") + s.logger.Sugar().Warnw("maximum connections reached, cannot register staller") return fmt.Errorf("maximum connections reached, cannot register staller") } @@ -64,7 +66,7 @@ func (s *StallerPool) Register(staller Staller) error { // If we fail to add a staller close it and close it and abort the registation the registration if err := s.stallers.Add(staller); err != nil { staller.Close() - zap.L().Error("failed to add staller", zap.Error(err)) + s.logger.Error("failed to add staller", zap.Error(err)) return err } @@ -99,19 +101,17 @@ func (s *StallerPool) Start() { } func (s *StallerPool) Stop() { - zap.L().Sugar().Warnw("Stopping staller pool") + s.logger.Sugar().Warnw("Stopping staller pool") for _, ipMap := range s.stallers.stallers { for _, staller := range ipMap { - zap.L().Sugar().Warnw("Closing staller", "group", staller.GetGroupIdentifier(), "id", staller.GetGroupIdentifier()) + s.logger.Sugar().Warnw("Closing staller", "group", staller.GetGroupIdentifier(), "id", staller.GetIdentifier()) staller.Close() } } - // Sometimes the deregister channel hangs - // @todo: Fix this go (func() { s.stopChan <- true })() - zap.L().Sugar().Warnw("Stopped staller pool") + s.logger.Sugar().Warnw("Stopped staller pool") } func (s *StallerPool) StopByIdentifier(id string) { @@ -124,7 +124,6 @@ func (s *StallerPool) Prune() { if length > target { diff := length - target - fmt.Println(diff) s.stallers.PruneNByIdentifier(diff) } } diff --git a/core/stall/staller_collection.go b/core/stall/staller_collection.go index 1ce2e5a..b418e44 100644 --- a/core/stall/staller_collection.go +++ b/core/stall/staller_collection.go @@ -3,8 +3,6 @@ package stall import ( "fmt" "sync" - - "go.uber.org/zap" ) // Structured map for stallers mapped by identifierAddress and Connection ID @@ -39,7 +37,6 @@ func (c *StallerCollection) Add(staller Staller) error { } func (c *StallerCollection) Delete(staller Staller) { - zap.L().Sugar().Debugw("Deleting staller", "groupId", staller.GetIdentifier(), "id", staller.GetIdentifier()) c.lock.Lock() defer c.lock.Unlock() diff --git a/generator/config_collection.go b/generator/config_collection.go index 96df5d5..1a6ec6d 100644 --- a/generator/config_collection.go +++ b/generator/config_collection.go @@ -24,26 +24,25 @@ type ( } ) -func NewConfigGeneratorCollection() (*ConfigGeneratorCollection, error) { +func NewConfigGeneratorCollection(logger *zap.Logger) (*ConfigGeneratorCollection, error) { schemaDir, err := schemaFiles.ReadDir(schemaDir) if err != nil { return nil, err } - logger := zap.L().Sugar() generators := make(map[string]*chaff.RootGenerator) for _, dirEntry := range schemaDir { - logger.Debug("Parsing Schema File", "filename", dirEntry.Name()) + logger.Sugar().Debugw("Parsing Schema File", "filename", dirEntry.Name()) generator, err := parseSchemaFile(dirEntry) if err != nil { - logger.Warnw("Failed to parse schema file", "filename", dirEntry.Name(), "error", err) + logger.Sugar().Warnw("Failed to parse schema file", "filename", dirEntry.Name(), "error", err) continue } for path, err := range generator.Metadata.Errors { - logger.Debug("Issue when parsing schema file", "filename", dirEntry.Name(), "path", path, "error", err) + logger.Sugar().Debugw("Issue when parsing schema file", "filename", dirEntry.Name(), "path", path, "error", err) } generators[dirEntry.Name()] = generator diff --git a/protocol/ftp/driver/crypto.go b/protocol/ftp/driver/crypto.go index 3d46b0b..2232512 100644 --- a/protocol/ftp/driver/crypto.go +++ b/protocol/ftp/driver/crypto.go @@ -7,7 +7,6 @@ import ( "crypto/x509" "crypto/x509/pkix" "encoding/pem" - "fmt" "math/big" "time" @@ -18,7 +17,6 @@ import ( func getSelfSignedCert(c *config.Config) (tls.Certificate, error) { key, err := rsa.GenerateKey(rand.Reader, 2048) if err != nil { - fmt.Println("Failed to generate key") return tls.Certificate{}, err } diff --git a/protocol/ftp/driver/file_info.go b/protocol/ftp/driver/file_info.go index 3489336..216ecea 100644 --- a/protocol/ftp/driver/file_info.go +++ b/protocol/ftp/driver/file_info.go @@ -7,7 +7,6 @@ import ( "time" "github.com/ryanolee/go-pot/generator/filesystem" - "go.uber.org/zap" ) var isDirRegexp = regexp.MustCompile(fmt.Sprintf(`(%s\/?|\/|\/\-a)$`, filesystem.DirSuffix)) @@ -28,31 +27,25 @@ func NewFtpFileInfo(path string, fileSize int) *FtpFileInfo { } func (f *FtpFileInfo) Name() string { - zap.L().Sugar().Debug("__STUB__ Name") return f.path } func (f *FtpFileInfo) Size() int64 { - zap.L().Sugar().Debug("__STUB__ Size") return int64(f.fileSize) } func (f *FtpFileInfo) Mode() os.FileMode { - zap.L().Sugar().Debug("__STUB__ Mode") return 0 } func (f *FtpFileInfo) ModTime() time.Time { - zap.L().Sugar().Debug("__STUB__ ModTime") return time.Now() } func (f *FtpFileInfo) IsDir() bool { - zap.L().Sugar().Debug("__STUB__ IsDir") return isDirRegexp.MatchString(f.path) } func (f *FtpFileInfo) Sys() any { - zap.L().Sugar().Debug("__STUB__ Sys") return nil } diff --git a/protocol/ftp/stall/file_stall_factory.go b/protocol/ftp/stall/file_stall_factory.go index 584923e..804b352 100644 --- a/protocol/ftp/stall/file_stall_factory.go +++ b/protocol/ftp/stall/file_stall_factory.go @@ -21,6 +21,7 @@ type ( stallerPool *stall.StallerPool configGenerators *generator.ConfigGeneratorCollection secretGenerators *secrets.SecretGeneratorCollection + logger *zap.Logger } ) @@ -29,12 +30,14 @@ func NewFtpFileStallerFactory( stallerPool *stall.StallerPool, configGenerators *generator.ConfigGeneratorCollection, secretGenerators *secrets.SecretGeneratorCollection, + logger *zap.Logger, ) *FtpFileStallerFactory { return &FtpFileStallerFactory{ config: config, stallerPool: stallerPool, configGenerators: configGenerators, secretGenerators: secretGenerators, + logger: logger, } } @@ -54,7 +57,7 @@ func (f *FtpFileStallerFactory) FromName(ctx ftpserver.ClientContext, name strin }) if err := f.stallerPool.Register(staller); err != nil { - zap.L().Warn("Failed to register staller", zap.Error(err)) + f.logger.Warn("Failed to register staller", zap.Error(err)) staller.Close() return nil } diff --git a/protocol/ftp/throttle/ftp_throttle.go b/protocol/ftp/throttle/ftp_throttle.go index 9d6d11a..bbb48eb 100644 --- a/protocol/ftp/throttle/ftp_throttle.go +++ b/protocol/ftp/throttle/ftp_throttle.go @@ -25,9 +25,10 @@ type FtpThrottle struct { waitTime time.Duration closeChannel chan bool + logger *zap.Logger } -func NewFtpThrottle(lf fx.Lifecycle, cfg *config.Config) *FtpThrottle { +func NewFtpThrottle(lf fx.Lifecycle, cfg *config.Config, logger *zap.Logger) *FtpThrottle { if !cfg.FtpServer.Enabled { return nil } @@ -38,6 +39,7 @@ func NewFtpThrottle(lf fx.Lifecycle, cfg *config.Config) *FtpThrottle { waitChannels: make(map[int64][]chan bool), maxPendingOperations: cfg.FtpServer.Throttle.MaxPendingOperations, waitTime: time.Millisecond * time.Duration(cfg.FtpServer.Throttle.WaitTime), + logger: logger, } lf.Append(fx.Hook{ @@ -98,7 +100,7 @@ func (t *FtpThrottle) ReleasePendingProcess(id int64) { waitChannel <- true close(waitChannel) - zap.L().Sugar().Debug("Released pending operation for conn", "id", id) + t.logger.Sugar().Debug("Released pending operation for conn", "id", id) // Remove the released operation from the list t.waitChannels[id] = t.waitChannels[id][1:] diff --git a/protocol/http/stall/stall.go b/protocol/http/stall/stall.go index 39bd7ad..9080050 100644 --- a/protocol/http/stall/stall.go +++ b/protocol/http/stall/stall.go @@ -180,7 +180,7 @@ func (s *HttpStaller) PushDataToClient(ctx context.Context, w *bufio.Writer, dat case <-ctx.Done(): // Flush the rest of the data to the client in the case we are closing if _, err := w.Write(data[i:]); err != nil { - zap.L().Sugar().Warn("Failed rest if data", "connId", s.id, "err", err) + zap.L().Sugar().Warn("Failed to write rest of data", "connId", s.id, "err", err) } w.Flush()