From eebed22f35d961568efe0a528958c8ba68fb4e3d Mon Sep 17 00:00:00 2001 From: Tirth Date: Thu, 19 Sep 2024 22:38:54 +0530 Subject: [PATCH 1/3] initial commit --- internal/api/api.go | 6 ++++++ internal/api/api_test.go | 8 ++++---- internal/cli/common/manager.go | 3 ++- internal/cli/common/service.go | 17 +++++++++++------ internal/cli/studio/pull_runner.go | 6 +++--- internal/config/reader.go | 9 ++++++++- internal/config/reader_test.go | 8 ++++---- internal/config/resource_reader.go | 7 ++++++- internal/config/resource_reader_test.go | 4 ++-- internal/config/stream_reader.go | 7 ++++++- internal/config/watcher.go | 11 +++++++---- internal/config/watcher_test.go | 19 +++++++++++-------- internal/impl/io/input_http_server_test.go | 2 +- public/service/stream_builder.go | 2 +- 14 files changed, 72 insertions(+), 37 deletions(-) diff --git a/internal/api/api.go b/internal/api/api.go index 1b651ef27..8f0136276 100644 --- a/internal/api/api.go +++ b/internal/api/api.go @@ -62,6 +62,7 @@ func New( wholeConf any, log log.Modular, stats metrics.Type, + count *int, opts ...OptFunc, ) (*Type, error) { gMux := mux.NewRouter() @@ -146,6 +147,10 @@ func New( } } + handleConfigAcknowledgement := func(w http.ResponseWriter, r *http.Request) { + fmt.Fprintf(w, "{\"success_reload_count\":\"%v\"}", *count) + } + if t.conf.DebugEndpoints { t.RegisterEndpoint( "/debug/config/json", "DEBUG: Returns the loaded config as JSON.", @@ -200,6 +205,7 @@ func New( t.RegisterEndpoint("/ping", "Ping me.", handlePing) t.RegisterEndpoint("/version", "Returns the service version.", handleVersion) + t.RegisterEndpoint("/config/ack", "Returns the count of success watcher", handleConfigAcknowledgement) t.RegisterEndpoint("/endpoints", "Returns this map of endpoints.", handleEndpoints) // If we want to expose a stats endpoint we register the endpoints. diff --git a/internal/api/api_test.go b/internal/api/api_test.go index 97a4cc40d..a0a745dcf 100644 --- a/internal/api/api_test.go +++ b/internal/api/api_test.go @@ -20,7 +20,7 @@ func TestAPIEnableCORS(t *testing.T) { conf.CORS.Enabled = true conf.CORS.AllowedOrigins = []string{"*"} - s, err := api.New("", "", conf, nil, log.Noop(), metrics.Noop()) + s, err := api.New("", "", conf, nil, log.Noop(), metrics.Noop(), nil) require.NoError(t, err) handler := s.Handler() @@ -41,7 +41,7 @@ func TestAPIEnableCORSOrigins(t *testing.T) { conf.CORS.Enabled = true conf.CORS.AllowedOrigins = []string{"foo", "bar"} - s, err := api.New("", "", conf, nil, log.Noop(), metrics.Noop()) + s, err := api.New("", "", conf, nil, log.Noop(), metrics.Noop(), nil) require.NoError(t, err) handler := s.Handler() @@ -81,7 +81,7 @@ func TestAPIEnableCORSNoHeaders(t *testing.T) { conf := api.NewConfig() conf.CORS.Enabled = true - _, err := api.New("", "", conf, nil, log.Noop(), metrics.Noop()) + _, err := api.New("", "", conf, nil, log.Noop(), metrics.Noop(), nil) require.Error(t, err) assert.Contains(t, err.Error(), "must specify at least one allowed origin") } @@ -164,7 +164,7 @@ func TestAPIBasicAuth(t *testing.T) { conf.BasicAuth.PasswordHash = tc.correctPass conf.BasicAuth.Salt = "EzrwNJYw2wkErVVV1P36FQ==" - s, err := api.New("", "", conf, nil, log.Noop(), metrics.Noop()) + s, err := api.New("", "", conf, nil, log.Noop(), metrics.Noop(), nil) if ok := tc.expectedErr(t, err); !(ok && err == nil) { return } diff --git a/internal/cli/common/manager.go b/internal/cli/common/manager.go index be51a3921..0bc86a4c9 100644 --- a/internal/cli/common/manager.go +++ b/internal/cli/common/manager.go @@ -30,6 +30,7 @@ func CreateManager( logger log.Modular, streamsMode bool, conf config.Type, + count *int, mgrOpts ...manager.OptFunc, ) (stoppableMgr *StoppableManager, err error) { var stats *metrics.Namespaced @@ -88,7 +89,7 @@ func CreateManager( } var httpServer *api.Type - if httpServer, err = api.New(cliOpts.Version, cliOpts.DateBuilt, conf.HTTP, sanitNode, logger, stats); err != nil { + if httpServer, err = api.New(cliOpts.Version, cliOpts.DateBuilt, conf.HTTP, sanitNode, logger, stats, count); err != nil { err = fmt.Errorf("failed to initialise API: %w", err) return } diff --git a/internal/cli/common/service.go b/internal/cli/common/service.go index f0f91595d..93c3f6003 100644 --- a/internal/cli/common/service.go +++ b/internal/cli/common/service.go @@ -54,6 +54,7 @@ func RunService(c *cli.Context, cliOpts *CLIOpts, streamsMode bool) error { } verLogger := logger.With("benthos_version", cliOpts.Version) + if mainPath == "" { verLogger.Info("Running without a main config file") } else if inferredMainPath { @@ -73,8 +74,9 @@ func RunService(c *cli.Context, cliOpts *CLIOpts, streamsMode bool) error { if strict && len(lints) > 0 { return errors.New(cliOpts.ExecTemplate("shutting down due to linter errors, to prevent shutdown run {{.ProductName}} with --chilled")) } - - stoppableManager, err := CreateManager(c, cliOpts, logger, streamsMode, conf) + //Success Watcher Count Is Used to for to get count of the config which was updated with the watcher flag. + success_reload_count := 0 + stoppableManager, err := CreateManager(c, cliOpts, logger, streamsMode, conf, &success_reload_count) if err != nil { return err } @@ -90,9 +92,10 @@ func RunService(c *cli.Context, cliOpts *CLIOpts, streamsMode bool) error { watching := cliOpts.RootFlags.GetWatcher(c) if streamsMode { enableStreamsAPI := !c.Bool("no-api") - stoppableStream, err = initStreamsMode(cliOpts, strict, watching, enableStreamsAPI, confReader, stoppableManager.Manager()) + stoppableStream, err = initStreamsMode(cliOpts, strict, watching, enableStreamsAPI, confReader, stoppableManager.Manager(), &success_reload_count) } else { - stoppableStream, dataStreamClosedChan, err = initNormalMode(cliOpts, conf, strict, watching, confReader, stoppableManager.Manager()) + logger.Info("InitMode Get Initiated... strict:%v", strict) + stoppableStream, dataStreamClosedChan, err = initNormalMode(cliOpts, conf, strict, watching, confReader, stoppableManager.Manager(), &success_reload_count) } if err != nil { return err @@ -133,6 +136,7 @@ func initStreamsMode( strict, watching, enableAPI bool, confReader *config.Reader, mgr *manager.Type, + success_reload_count *int, ) (RunningStream, error) { logger := mgr.Logger() streamMgr := strmmgr.New(mgr, strmmgr.OptAPIEnabled(enableAPI)) @@ -181,7 +185,7 @@ func initStreamsMode( } if watching { - if err := confReader.BeginFileWatching(mgr, strict); err != nil { + if err := confReader.BeginFileWatching(mgr, strict, success_reload_count); err != nil { return nil, fmt.Errorf("failed to create stream config watcher: %w", err) } } @@ -194,6 +198,7 @@ func initNormalMode( strict, watching bool, confReader *config.Reader, mgr *manager.Type, + success_reload_count *int, ) (newStream RunningStream, stoppedChan chan struct{}, err error) { logger := mgr.Logger() @@ -231,7 +236,7 @@ func initNormalMode( } if watching { - if err := confReader.BeginFileWatching(mgr, strict); err != nil { + if err := confReader.BeginFileWatching(mgr, strict, success_reload_count); err != nil { return nil, nil, fmt.Errorf("failed to create config file watcher: %w", err) } } diff --git a/internal/cli/studio/pull_runner.go b/internal/cli/studio/pull_runner.go index a773ec8a3..c705046e6 100644 --- a/internal/cli/studio/pull_runner.go +++ b/internal/cli/studio/pull_runner.go @@ -297,7 +297,7 @@ func (r *PullRunner) bootstrapConfigReader(ctx context.Context) (bootstrapErr er tmpTracingSummary.SetEnabled(false) stopMgrTmp, err := common.CreateManager( - r.cliContext, r.cliOpts, r.logger, false, conf, + r.cliContext, r.cliOpts, r.logger, false, conf, nil, manager.OptSetEnvironment(tmpEnv), manager.OptSetBloblangEnvironment(bloblEnv), manager.OptSetFS(sessFS)) @@ -413,13 +413,13 @@ func (r *PullRunner) Sync(ctx context.Context) { } } for _, res := range diff.AddResources { - if err := r.confReader.TriggerResourceUpdate(r.mgr, r.strictMode, res.Name); err != nil { + if err := r.confReader.TriggerResourceUpdate(r.mgr, r.strictMode, res.Name, nil); err != nil { r.logger.Error("Failed to reflect resource file '%v' update: %v", res.Name, err) runErr = err } } if diff.MainConfig != nil { - if err := r.confReader.TriggerMainUpdate(r.mgr, r.strictMode, diff.MainConfig.Name); err != nil { + if err := r.confReader.TriggerMainUpdate(r.mgr, r.strictMode, diff.MainConfig.Name, nil); err != nil { r.logger.Error("Failed to reflect main config file '%v' update: %v", diff.MainConfig.Name, err) runErr = err } diff --git a/internal/config/reader.go b/internal/config/reader.go index 98e7d014c..095717277 100644 --- a/internal/config/reader.go +++ b/internal/config/reader.go @@ -357,7 +357,7 @@ func (r *Reader) readMain(mainPath string) (conf Type, pConf *docs.ParsedConfig, // TriggerMainUpdate attempts to re-read the main configuration file, trigger // the provided main update func, and apply changes to resources to the provided // manager as appropriate. -func (r *Reader) TriggerMainUpdate(mgr bundle.NewManagement, strict bool, newPath string) error { +func (r *Reader) TriggerMainUpdate(mgr bundle.NewManagement, strict bool, newPath string, success_reload_count *int) error { conf, _, lints, err := r.readMain(newPath) if errors.Is(err, fs.ErrNotExist) { if r.mainPath != newPath { @@ -416,6 +416,13 @@ func (r *Reader) TriggerMainUpdate(mgr bundle.NewManagement, strict bool, newPat mgr.Logger().Error("Failed to apply updated config: %v", err) return err } + + // Success Watcher Count denotes if the configuration in the benthos gets updated with the watcher + // flag then success watcher count gets increased + if success_reload_count != nil { + *success_reload_count = *success_reload_count + 1 + mgr.Logger().Info("Success Reload Count: %v, For Normal Config", *success_reload_count) + } mgr.Logger().Info("Updated main config") } return nil diff --git a/internal/config/reader_test.go b/internal/config/reader_test.go index 96589cd78..230d1631b 100644 --- a/internal/config/reader_test.go +++ b/internal/config/reader_test.go @@ -161,7 +161,7 @@ processor_resources: assert.False(t, testMgr.ProbeProcessor("c")) assert.False(t, testMgr.ProbeProcessor("d")) - require.NoError(t, rdr.TriggerMainUpdate(testMgr, true, "bar_main.yaml")) + require.NoError(t, rdr.TriggerMainUpdate(testMgr, true, "bar_main.yaml", nil)) // Wait for the config watcher to reload the config select { @@ -226,10 +226,10 @@ processor_resources: return nil })) - require.NoError(t, rdr.TriggerResourceUpdate(testMgr, true, "a.yaml")) - require.NoError(t, rdr.TriggerResourceUpdate(testMgr, true, "b.yaml")) + require.NoError(t, rdr.TriggerResourceUpdate(testMgr, true, "a.yaml", nil)) + require.NoError(t, rdr.TriggerResourceUpdate(testMgr, true, "b.yaml", nil)) - require.NoError(t, rdr.TriggerMainUpdate(testMgr, true, "foo_main.yaml")) + require.NoError(t, rdr.TriggerMainUpdate(testMgr, true, "foo_main.yaml", nil)) assert.Equal(t, "fooin", conf.Input.Label) assert.Equal(t, "fooout", conf.Output.Label) diff --git a/internal/config/resource_reader.go b/internal/config/resource_reader.go index 26323caa4..dab995c63 100644 --- a/internal/config/resource_reader.go +++ b/internal/config/resource_reader.go @@ -240,7 +240,7 @@ func (r *Reader) readResource(path string) (conf manager.ResourceConfig, lints [ // TriggerResourceUpdate attempts to re-read a resource configuration file and // apply changes to the provided manager as appropriate. -func (r *Reader) TriggerResourceUpdate(mgr bundle.NewManagement, strict bool, path string) error { +func (r *Reader) TriggerResourceUpdate(mgr bundle.NewManagement, strict bool, path string, success_reload_count *int) error { newResConf, lints, err := r.readResource(path) if errors.Is(err, fs.ErrNotExist) { return r.TriggerResourceDelete(mgr, path) @@ -273,6 +273,11 @@ func (r *Reader) TriggerResourceUpdate(mgr bundle.NewManagement, strict bool, pa } r.resourceFileInfo[path] = newInfo + + if success_reload_count != nil { + *success_reload_count = *success_reload_count + 1 + mgr.Logger().Info("Success Reload Count: %v, For Stream Config", *success_reload_count) + } return nil } diff --git a/internal/config/resource_reader_test.go b/internal/config/resource_reader_test.go index 9c1ea280a..f0764335c 100644 --- a/internal/config/resource_reader_test.go +++ b/internal/config/resource_reader_test.go @@ -59,7 +59,7 @@ processor_resources: // Watch for configuration changes. testMgr, err := manager.New(conf.ResourceConfig) require.NoError(t, err) - require.NoError(t, rdr.BeginFileWatching(testMgr, true)) + require.NoError(t, rdr.BeginFileWatching(testMgr, true, nil)) tCtx, done := context.WithTimeout(context.Background(), time.Second*30) defer done() @@ -175,7 +175,7 @@ processor_resources: // Watch for configuration changes. testMgr, err := manager.New(conf.ResourceConfig) require.NoError(t, err) - require.NoError(t, rdr.BeginFileWatching(testMgr, true)) + require.NoError(t, rdr.BeginFileWatching(testMgr, true, nil)) tCtx, done := context.WithTimeout(context.Background(), time.Second*30) defer done() diff --git a/internal/config/stream_reader.go b/internal/config/stream_reader.go index 673a0583f..a2a7ed6f6 100644 --- a/internal/config/stream_reader.go +++ b/internal/config/stream_reader.go @@ -184,7 +184,7 @@ func (r *Reader) findStreamPathWalkedDir(streamPath string) (dir string) { // TriggerStreamUpdate attempts to re-read a stream configuration file, and // trigger the provided stream update func. -func (r *Reader) TriggerStreamUpdate(mgr bundle.NewManagement, strict bool, path string) error { +func (r *Reader) TriggerStreamUpdate(mgr bundle.NewManagement, strict bool, path string, success_reload_count *int) error { if r.streamUpdateFn == nil { return nil } @@ -236,5 +236,10 @@ func (r *Reader) TriggerStreamUpdate(mgr bundle.NewManagement, strict bool, path return err } mgr.Logger().Info("Updated stream %v config from file.", info.id) + + if success_reload_count != nil { + *success_reload_count = *success_reload_count + 1 + mgr.Logger().Info("Success Reload Count: %v, For Stream Config", *success_reload_count) + } return nil } diff --git a/internal/config/watcher.go b/internal/config/watcher.go index d0be14862..26b2d390c 100644 --- a/internal/config/watcher.go +++ b/internal/config/watcher.go @@ -68,7 +68,7 @@ func (r *Reader) modifiedSinceLastRead(name string) bool { // WARNING: Either SubscribeConfigChanges or SubscribeStreamChanges must be // called before this, as otherwise it is unsafe to register them during // watching. -func (r *Reader) BeginFileWatching(mgr bundle.NewManagement, strict bool) error { +func (r *Reader) BeginFileWatching(mgr bundle.NewManagement, strict bool, success_reload_count *int) error { if r.watcher != nil { return errors.New("a file watcher has already been started") } @@ -102,9 +102,11 @@ func (r *Reader) BeginFileWatching(mgr bundle.NewManagement, strict bool) error } refreshFiles := func() error { + mgr.Logger().Info("Inside the Refresh Files") if !r.streamsMode && r.mainPath != "" { if _, err := r.fs.Stat(r.mainPath); err == nil { if err := addNotWatching([]string{r.mainPath}); err != nil { + mgr.Logger().Error("addNotWatching Error") return err } } @@ -173,13 +175,14 @@ func (r *Reader) BeginFileWatching(mgr bundle.NewManagement, strict bool) error } var succeeded bool if nameClean == r.mainPath { - succeeded = !ShouldReread(r.TriggerMainUpdate(mgr, strict, r.mainPath)) + succeeded = !ShouldReread(r.TriggerMainUpdate(mgr, strict, r.mainPath, success_reload_count)) } else if _, exists := r.streamFileInfo[nameClean]; exists { - succeeded = !ShouldReread(r.TriggerStreamUpdate(mgr, strict, nameClean)) + succeeded = !ShouldReread(r.TriggerStreamUpdate(mgr, strict, nameClean, success_reload_count)) } else { - succeeded = !ShouldReread(r.TriggerResourceUpdate(mgr, strict, nameClean)) + succeeded = !ShouldReread(r.TriggerResourceUpdate(mgr, strict, nameClean, success_reload_count)) } if succeeded { + mgr.Logger().Info("This is the collaps changes %v", collapsedChanges) delete(collapsedChanges, nameClean) } else { change.at = time.Now() diff --git a/internal/config/watcher_test.go b/internal/config/watcher_test.go index 11ae49455..cc135ba6d 100644 --- a/internal/config/watcher_test.go +++ b/internal/config/watcher_test.go @@ -48,7 +48,7 @@ output: // Watch for configuration changes testMgr, err := manager.New(manager.ResourceConfig{}) require.NoError(t, err) - require.NoError(t, rdr.BeginFileWatching(testMgr, true)) + require.NoError(t, rdr.BeginFileWatching(testMgr, true, nil)) // Overwrite original config require.NoError(t, os.WriteFile(confFilePath, dummyConfig, 0o644)) @@ -91,16 +91,19 @@ output: changeChan := make(chan struct{}) var updatedConf stream.Config + var once sync.Once require.NoError(t, rdr.SubscribeConfigChanges(func(conf *Type) error { updatedConf = conf.Config - close(changeChan) + once.Do(func() { + close(changeChan) + }) return nil })) // Watch for configuration changes testMgr, err := manager.New(manager.ResourceConfig{}) require.NoError(t, err) - require.NoError(t, rdr.BeginFileWatching(testMgr, true)) + require.NoError(t, rdr.BeginFileWatching(testMgr, true, nil)) // Create a new config folder and place in it a new copy of the config file newConfDir := filepath.Join(rootDir, "config_new") @@ -184,7 +187,7 @@ func TestReaderStreamDirectWatching(t *testing.T) { // Watch for configuration changes testMgr, err := manager.New(manager.ResourceConfig{}) require.NoError(t, err) - require.NoError(t, rdr.BeginFileWatching(testMgr, true)) + require.NoError(t, rdr.BeginFileWatching(testMgr, true, nil)) require.NoError(t, os.WriteFile(confAPath, []byte(`output: { label: a2, drop: {} }`), 0o644)) require.NoError(t, os.WriteFile(confBPath, []byte(`output: { label: b2, drop: {} }`), 0o644)) @@ -268,7 +271,7 @@ func TestReaderStreamWildcardWatching(t *testing.T) { // Watch for configuration changes testMgr, err := manager.New(manager.ResourceConfig{}) require.NoError(t, err) - require.NoError(t, rdr.BeginFileWatching(testMgr, true)) + require.NoError(t, rdr.BeginFileWatching(testMgr, true, nil)) require.NoError(t, os.WriteFile(confAPath, []byte(`output: { label: a2, drop: {} }`), 0o644)) require.NoError(t, os.WriteFile(confBPath, []byte(`output: { label: b2, drop: {} }`), 0o644)) @@ -352,7 +355,7 @@ func TestReaderStreamDirWatching(t *testing.T) { // Watch for configuration changes testMgr, err := manager.New(manager.ResourceConfig{}) require.NoError(t, err) - require.NoError(t, rdr.BeginFileWatching(testMgr, true)) + require.NoError(t, rdr.BeginFileWatching(testMgr, true, nil)) require.NoError(t, os.WriteFile(confAPath, []byte(`output: { label: a2, drop: {} }`), 0o644)) require.NoError(t, os.WriteFile(confBPath, []byte(`output: { label: b2, drop: {} }`), 0o644)) @@ -443,7 +446,7 @@ func TestReaderWatcherRace(t *testing.T) { // Watch for configuration changes testMgr, err := manager.New(manager.ResourceConfig{}) require.NoError(t, err) - require.NoError(t, rdr.BeginFileWatching(testMgr, true)) + require.NoError(t, rdr.BeginFileWatching(testMgr, true, nil)) for i := 0; i < 2; i++ { // Wait for the config watcher to reload each config @@ -523,7 +526,7 @@ processor_resources: // Watch for configuration changes testMgr, err := manager.New(manager.ResourceConfig{}) require.NoError(t, err) - require.NoError(t, rdr.BeginFileWatching(testMgr, true)) + require.NoError(t, rdr.BeginFileWatching(testMgr, true, nil)) require.NoError(t, os.WriteFile(confAPath, procConfig("a", "a2"), 0o644)) require.NoError(t, os.WriteFile(confBPath, procConfig("b", "b2"), 0o644)) diff --git a/internal/impl/io/input_http_server_test.go b/internal/impl/io/input_http_server_test.go index bc98acd11..bd11d7066 100644 --- a/internal/impl/io/input_http_server_test.go +++ b/internal/impl/io/input_http_server_test.go @@ -222,7 +222,7 @@ func TestHTTPServerLifecycle(t *testing.T) { testURL := fmt.Sprintf("http://localhost:%v/foo/bar", freePort) - apiImpl, err := api.New("", "", apiConf, nil, log.Noop(), metrics.Noop()) + apiImpl, err := api.New("", "", apiConf, nil, log.Noop(), metrics.Noop(), nil) require.NoError(t, err) go func() { diff --git a/public/service/stream_builder.go b/public/service/stream_builder.go index b272eb41e..42bba91e8 100644 --- a/public/service/stream_builder.go +++ b/public/service/stream_builder.go @@ -880,7 +880,7 @@ func (s *StreamBuilder) buildWithEnv(env *bundle.Environment) (*Stream, error) { sanitConf.DocsProvider = env _ = s.configSpec.SanitiseYAML(&sanitNode, sanitConf) } - if apiType, err = api.New("", "", s.http, sanitNode, logger, stats); err != nil { + if apiType, err = api.New("", "", s.http, sanitNode, logger, stats, nil); err != nil { return nil, fmt.Errorf("unable to create stream HTTP server due to: %w. Tip: you can disable the server with `http.enabled` set to `false`, or override the configured server with SetHTTPMux", err) } apiMut = apiType From 3d5ef683f2b8fd5ce852653bf155b810c96d44f3 Mon Sep 17 00:00:00 2001 From: Tirth Date: Fri, 20 Sep 2024 12:09:30 +0530 Subject: [PATCH 2/3] fix-lint --- internal/cli/common/service.go | 16 ++++++++-------- internal/config/reader.go | 8 ++++---- internal/config/resource_reader.go | 8 ++++---- internal/config/stream_reader.go | 8 ++++---- internal/config/watcher.go | 8 ++++---- 5 files changed, 24 insertions(+), 24 deletions(-) diff --git a/internal/cli/common/service.go b/internal/cli/common/service.go index 93c3f6003..35d0e8b28 100644 --- a/internal/cli/common/service.go +++ b/internal/cli/common/service.go @@ -75,8 +75,8 @@ func RunService(c *cli.Context, cliOpts *CLIOpts, streamsMode bool) error { return errors.New(cliOpts.ExecTemplate("shutting down due to linter errors, to prevent shutdown run {{.ProductName}} with --chilled")) } //Success Watcher Count Is Used to for to get count of the config which was updated with the watcher flag. - success_reload_count := 0 - stoppableManager, err := CreateManager(c, cliOpts, logger, streamsMode, conf, &success_reload_count) + successReloadCount := 0 + stoppableManager, err := CreateManager(c, cliOpts, logger, streamsMode, conf, &successReloadCount) if err != nil { return err } @@ -92,10 +92,10 @@ func RunService(c *cli.Context, cliOpts *CLIOpts, streamsMode bool) error { watching := cliOpts.RootFlags.GetWatcher(c) if streamsMode { enableStreamsAPI := !c.Bool("no-api") - stoppableStream, err = initStreamsMode(cliOpts, strict, watching, enableStreamsAPI, confReader, stoppableManager.Manager(), &success_reload_count) + stoppableStream, err = initStreamsMode(cliOpts, strict, watching, enableStreamsAPI, confReader, stoppableManager.Manager(), &successReloadCount) } else { logger.Info("InitMode Get Initiated... strict:%v", strict) - stoppableStream, dataStreamClosedChan, err = initNormalMode(cliOpts, conf, strict, watching, confReader, stoppableManager.Manager(), &success_reload_count) + stoppableStream, dataStreamClosedChan, err = initNormalMode(cliOpts, conf, strict, watching, confReader, stoppableManager.Manager(), &successReloadCount) } if err != nil { return err @@ -136,7 +136,7 @@ func initStreamsMode( strict, watching, enableAPI bool, confReader *config.Reader, mgr *manager.Type, - success_reload_count *int, + successReloadCount *int, ) (RunningStream, error) { logger := mgr.Logger() streamMgr := strmmgr.New(mgr, strmmgr.OptAPIEnabled(enableAPI)) @@ -185,7 +185,7 @@ func initStreamsMode( } if watching { - if err := confReader.BeginFileWatching(mgr, strict, success_reload_count); err != nil { + if err := confReader.BeginFileWatching(mgr, strict, successReloadCount); err != nil { return nil, fmt.Errorf("failed to create stream config watcher: %w", err) } } @@ -198,7 +198,7 @@ func initNormalMode( strict, watching bool, confReader *config.Reader, mgr *manager.Type, - success_reload_count *int, + successReloadCount *int, ) (newStream RunningStream, stoppedChan chan struct{}, err error) { logger := mgr.Logger() @@ -236,7 +236,7 @@ func initNormalMode( } if watching { - if err := confReader.BeginFileWatching(mgr, strict, success_reload_count); err != nil { + if err := confReader.BeginFileWatching(mgr, strict, successReloadCount); err != nil { return nil, nil, fmt.Errorf("failed to create config file watcher: %w", err) } } diff --git a/internal/config/reader.go b/internal/config/reader.go index 095717277..db7edf75e 100644 --- a/internal/config/reader.go +++ b/internal/config/reader.go @@ -357,7 +357,7 @@ func (r *Reader) readMain(mainPath string) (conf Type, pConf *docs.ParsedConfig, // TriggerMainUpdate attempts to re-read the main configuration file, trigger // the provided main update func, and apply changes to resources to the provided // manager as appropriate. -func (r *Reader) TriggerMainUpdate(mgr bundle.NewManagement, strict bool, newPath string, success_reload_count *int) error { +func (r *Reader) TriggerMainUpdate(mgr bundle.NewManagement, strict bool, newPath string, successReloadCount *int) error { conf, _, lints, err := r.readMain(newPath) if errors.Is(err, fs.ErrNotExist) { if r.mainPath != newPath { @@ -419,9 +419,9 @@ func (r *Reader) TriggerMainUpdate(mgr bundle.NewManagement, strict bool, newPat // Success Watcher Count denotes if the configuration in the benthos gets updated with the watcher // flag then success watcher count gets increased - if success_reload_count != nil { - *success_reload_count = *success_reload_count + 1 - mgr.Logger().Info("Success Reload Count: %v, For Normal Config", *success_reload_count) + if successReloadCount != nil { + *successReloadCount = *successReloadCount + 1 + mgr.Logger().Info("Success Reload Count: %v, For Normal Config", *successReloadCount) } mgr.Logger().Info("Updated main config") } diff --git a/internal/config/resource_reader.go b/internal/config/resource_reader.go index dab995c63..d9e502ada 100644 --- a/internal/config/resource_reader.go +++ b/internal/config/resource_reader.go @@ -240,7 +240,7 @@ func (r *Reader) readResource(path string) (conf manager.ResourceConfig, lints [ // TriggerResourceUpdate attempts to re-read a resource configuration file and // apply changes to the provided manager as appropriate. -func (r *Reader) TriggerResourceUpdate(mgr bundle.NewManagement, strict bool, path string, success_reload_count *int) error { +func (r *Reader) TriggerResourceUpdate(mgr bundle.NewManagement, strict bool, path string, successReloadCount *int) error { newResConf, lints, err := r.readResource(path) if errors.Is(err, fs.ErrNotExist) { return r.TriggerResourceDelete(mgr, path) @@ -274,9 +274,9 @@ func (r *Reader) TriggerResourceUpdate(mgr bundle.NewManagement, strict bool, pa r.resourceFileInfo[path] = newInfo - if success_reload_count != nil { - *success_reload_count = *success_reload_count + 1 - mgr.Logger().Info("Success Reload Count: %v, For Stream Config", *success_reload_count) + if successReloadCount != nil { + *successReloadCount = *successReloadCount + 1 + mgr.Logger().Info("Success Reload Count: %v, For Stream Config", *successReloadCount) } return nil } diff --git a/internal/config/stream_reader.go b/internal/config/stream_reader.go index a2a7ed6f6..603b4ca18 100644 --- a/internal/config/stream_reader.go +++ b/internal/config/stream_reader.go @@ -184,7 +184,7 @@ func (r *Reader) findStreamPathWalkedDir(streamPath string) (dir string) { // TriggerStreamUpdate attempts to re-read a stream configuration file, and // trigger the provided stream update func. -func (r *Reader) TriggerStreamUpdate(mgr bundle.NewManagement, strict bool, path string, success_reload_count *int) error { +func (r *Reader) TriggerStreamUpdate(mgr bundle.NewManagement, strict bool, path string, successReloadCount *int) error { if r.streamUpdateFn == nil { return nil } @@ -237,9 +237,9 @@ func (r *Reader) TriggerStreamUpdate(mgr bundle.NewManagement, strict bool, path } mgr.Logger().Info("Updated stream %v config from file.", info.id) - if success_reload_count != nil { - *success_reload_count = *success_reload_count + 1 - mgr.Logger().Info("Success Reload Count: %v, For Stream Config", *success_reload_count) + if successReloadCount != nil { + *successReloadCount = *successReloadCount + 1 + mgr.Logger().Info("Success Reload Count: %v, For Stream Config", *successReloadCount) } return nil } diff --git a/internal/config/watcher.go b/internal/config/watcher.go index 26b2d390c..1f2482b98 100644 --- a/internal/config/watcher.go +++ b/internal/config/watcher.go @@ -68,7 +68,7 @@ func (r *Reader) modifiedSinceLastRead(name string) bool { // WARNING: Either SubscribeConfigChanges or SubscribeStreamChanges must be // called before this, as otherwise it is unsafe to register them during // watching. -func (r *Reader) BeginFileWatching(mgr bundle.NewManagement, strict bool, success_reload_count *int) error { +func (r *Reader) BeginFileWatching(mgr bundle.NewManagement, strict bool, successReloadCount *int) error { if r.watcher != nil { return errors.New("a file watcher has already been started") } @@ -175,11 +175,11 @@ func (r *Reader) BeginFileWatching(mgr bundle.NewManagement, strict bool, succes } var succeeded bool if nameClean == r.mainPath { - succeeded = !ShouldReread(r.TriggerMainUpdate(mgr, strict, r.mainPath, success_reload_count)) + succeeded = !ShouldReread(r.TriggerMainUpdate(mgr, strict, r.mainPath, successReloadCount)) } else if _, exists := r.streamFileInfo[nameClean]; exists { - succeeded = !ShouldReread(r.TriggerStreamUpdate(mgr, strict, nameClean, success_reload_count)) + succeeded = !ShouldReread(r.TriggerStreamUpdate(mgr, strict, nameClean, successReloadCount)) } else { - succeeded = !ShouldReread(r.TriggerResourceUpdate(mgr, strict, nameClean, success_reload_count)) + succeeded = !ShouldReread(r.TriggerResourceUpdate(mgr, strict, nameClean, successReloadCount)) } if succeeded { mgr.Logger().Info("This is the collaps changes %v", collapsedChanges) From b3472b059f02d299f2f9710b6aaf200bd96e2dca Mon Sep 17 00:00:00 2001 From: Tirth Date: Sat, 28 Sep 2024 02:06:20 +0530 Subject: [PATCH 3/3] abstracted wholeConf and successReloadCount var into common metadata --- internal/api/api.go | 18 ++++++++++----- internal/api/api_test.go | 26 +++++++++++++++++----- internal/cli/common/manager.go | 13 ++++++++--- internal/cli/common/service.go | 13 +++++++++-- internal/impl/io/input_http_server_test.go | 7 ++++-- public/service/stream_builder.go | 6 ++++- 6 files changed, 65 insertions(+), 18 deletions(-) diff --git a/internal/api/api.go b/internal/api/api.go index 8f0136276..e3098cf16 100644 --- a/internal/api/api.go +++ b/internal/api/api.go @@ -54,17 +54,23 @@ type Type struct { server *http.Server } +// ConfigMetadata manges the config related data. +type ConfigMetadata struct { + WholeConf any + SuccessReloadCount *int +} + // New creates a new Benthos HTTP API. func New( version string, dateBuilt string, conf Config, - wholeConf any, + ConfigMetadata ConfigMetadata, log log.Modular, stats metrics.Type, - count *int, opts ...OptFunc, ) (*Type, error) { + log.Info("HTTP SERVER HAS BEEN EXECUTED After Create Manager") gMux := mux.NewRouter() server := &http.Server{Addr: conf.Address} @@ -104,15 +110,17 @@ func New( } handlePrintJSONConfig := func(w http.ResponseWriter, r *http.Request) { + t.log.Info("handlePrintJson executed") var g any var err error - if node, ok := wholeConf.(yaml.Node); ok { + if node, ok := ConfigMetadata.WholeConf.(yaml.Node); ok { err = node.Decode(&g) } else { g = node } var resBytes []byte if err == nil { + t.log.Info("API INOFOOOOOOOO CHEKC\n %+v \n", g) resBytes, err = json.Marshal(g) } if err != nil { @@ -123,7 +131,7 @@ func New( } handlePrintYAMLConfig := func(w http.ResponseWriter, r *http.Request) { - resBytes, err := yaml.Marshal(wholeConf) + resBytes, err := yaml.Marshal(ConfigMetadata.WholeConf) if err != nil { w.WriteHeader(http.StatusBadGateway) return @@ -148,7 +156,7 @@ func New( } handleConfigAcknowledgement := func(w http.ResponseWriter, r *http.Request) { - fmt.Fprintf(w, "{\"success_reload_count\":\"%v\"}", *count) + fmt.Fprintf(w, "{\"success_reload_count\":\"%v\"}", *ConfigMetadata.SuccessReloadCount) } if t.conf.DebugEndpoints { diff --git a/internal/api/api_test.go b/internal/api/api_test.go index a0a745dcf..53e824c66 100644 --- a/internal/api/api_test.go +++ b/internal/api/api_test.go @@ -19,8 +19,11 @@ func TestAPIEnableCORS(t *testing.T) { conf := api.NewConfig() conf.CORS.Enabled = true conf.CORS.AllowedOrigins = []string{"*"} - - s, err := api.New("", "", conf, nil, log.Noop(), metrics.Noop(), nil) + configMetadata := api.ConfigMetadata{ + WholeConf: nil, + SuccessReloadCount: nil, + } + s, err := api.New("", "", conf, configMetadata, log.Noop(), metrics.Noop()) require.NoError(t, err) handler := s.Handler() @@ -41,7 +44,11 @@ func TestAPIEnableCORSOrigins(t *testing.T) { conf.CORS.Enabled = true conf.CORS.AllowedOrigins = []string{"foo", "bar"} - s, err := api.New("", "", conf, nil, log.Noop(), metrics.Noop(), nil) + configMetadata := api.ConfigMetadata{ + WholeConf: nil, + SuccessReloadCount: nil, + } + s, err := api.New("", "", conf, configMetadata, log.Noop(), metrics.Noop()) require.NoError(t, err) handler := s.Handler() @@ -80,8 +87,12 @@ func TestAPIEnableCORSOrigins(t *testing.T) { func TestAPIEnableCORSNoHeaders(t *testing.T) { conf := api.NewConfig() conf.CORS.Enabled = true + configMetadata := api.ConfigMetadata{ + WholeConf: nil, + SuccessReloadCount: nil, + } + _, err := api.New("", "", conf, configMetadata, log.Noop(), metrics.Noop()) - _, err := api.New("", "", conf, nil, log.Noop(), metrics.Noop(), nil) require.Error(t, err) assert.Contains(t, err.Error(), "must specify at least one allowed origin") } @@ -164,7 +175,12 @@ func TestAPIBasicAuth(t *testing.T) { conf.BasicAuth.PasswordHash = tc.correctPass conf.BasicAuth.Salt = "EzrwNJYw2wkErVVV1P36FQ==" - s, err := api.New("", "", conf, nil, log.Noop(), metrics.Noop(), nil) + configMetadata := api.ConfigMetadata{ + WholeConf: nil, + SuccessReloadCount: nil, + } + s, err := api.New("", "", conf, configMetadata, log.Noop(), metrics.Noop()) + if ok := tc.expectedErr(t, err); !(ok && err == nil) { return } diff --git a/internal/cli/common/manager.go b/internal/cli/common/manager.go index 0bc86a4c9..81d86ba86 100644 --- a/internal/cli/common/manager.go +++ b/internal/cli/common/manager.go @@ -30,9 +30,10 @@ func CreateManager( logger log.Modular, streamsMode bool, conf config.Type, - count *int, + successReloadCount *int, mgrOpts ...manager.OptFunc, ) (stoppableMgr *StoppableManager, err error) { + logger.Info("Create Manager Is Executed") var stats *metrics.Namespaced var trac trace.TracerProvider defer func() { @@ -89,7 +90,13 @@ func CreateManager( } var httpServer *api.Type - if httpServer, err = api.New(cliOpts.Version, cliOpts.DateBuilt, conf.HTTP, sanitNode, logger, stats, count); err != nil { + logger.Info("New Method is calling which is having /debug endpoint") + logger.Info("This is Saint NOde %+v", sanitNode) + configMetadata := api.ConfigMetadata{ + WholeConf: sanitNode, + SuccessReloadCount: successReloadCount, + } + if httpServer, err = api.New(cliOpts.Version, cliOpts.DateBuilt, conf.HTTP, configMetadata, logger, stats); err != nil { err = fmt.Errorf("failed to initialise API: %w", err) return } @@ -286,7 +293,7 @@ func (s *StoppableManager) Stop(ctx context.Context) error { return err } if err := s.mgr.CloseObservability(ctx); err != nil { - s.mgr.Logger().Error("Failed to cleanly close observability components: %s", err) + s.mgr.Logger().Error("Failed to cleanly close observability components: %w", err) } return nil } diff --git a/internal/cli/common/service.go b/internal/cli/common/service.go index 35d0e8b28..f6a05e4b1 100644 --- a/internal/cli/common/service.go +++ b/internal/cli/common/service.go @@ -38,6 +38,7 @@ func (e *ErrExitCode) Unwrap() error { // RunService runs a service command (either the default or the streams // subcommand). func RunService(c *cli.Context, cliOpts *CLIOpts, streamsMode bool) error { + mainPath, inferredMainPath, confReader := ReadConfig(c, cliOpts, streamsMode) conf, pConf, lints, err := confReader.Read() @@ -49,12 +50,13 @@ func RunService(c *cli.Context, cliOpts *CLIOpts, streamsMode bool) error { }() logger, err := CreateLogger(c, cliOpts, conf, streamsMode) + logger.Info("RUN SERVICE CALLED Here is the intial conf %+v", conf) + if err != nil { return fmt.Errorf("failed to create logger: %w", err) } verLogger := logger.With("benthos_version", cliOpts.Version) - if mainPath == "" { verLogger.Info("Running without a main config file") } else if inferredMainPath { @@ -64,6 +66,7 @@ func RunService(c *cli.Context, cliOpts *CLIOpts, streamsMode bool) error { } strict := !cliOpts.RootFlags.GetChilled(c) + logger.Info("This is the Stict Value %v ", strict) for _, lint := range lints { if strict { logger.With("lint", lint).Error("Config lint error") @@ -201,7 +204,7 @@ func initNormalMode( successReloadCount *int, ) (newStream RunningStream, stoppedChan chan struct{}, err error) { logger := mgr.Logger() - + logger.Info("Init Mode Has been started...") stoppedChan = make(chan struct{}) var closeOnce sync.Once streamInit := func() (RunningStream, error) { @@ -224,11 +227,16 @@ func initNormalMode( logger.Info(opts.ExecTemplate("Launching a {{.ProductName}} instance, use CTRL+C to close")) if err := confReader.SubscribeConfigChanges(func(newStreamConf *config.Type) error { + mgr.Logger().Info("SbscribeConfigChanges is called") ctx, done := context.WithTimeout(context.Background(), 30*time.Second) defer done() // NOTE: We're ignoring observability field changes for now. return stoppableStream.Replace(ctx, func() (RunningStream, error) { + conf.Config = newStreamConf.Config + mgr.Logger().Info("Here is the Final Updated Config") + mgr.Logger().Info("%+v", conf.Config) + // TODO HERE Starte WITH SanitNode Logic to Encode return streamInit() }) }); err != nil { @@ -236,6 +244,7 @@ func initNormalMode( } if watching { + logger.Info("Inside the Watching Before Begin FileWatching.....") if err := confReader.BeginFileWatching(mgr, strict, successReloadCount); err != nil { return nil, nil, fmt.Errorf("failed to create config file watcher: %w", err) } diff --git a/internal/impl/io/input_http_server_test.go b/internal/impl/io/input_http_server_test.go index bd11d7066..f7c8ffc59 100644 --- a/internal/impl/io/input_http_server_test.go +++ b/internal/impl/io/input_http_server_test.go @@ -221,8 +221,11 @@ func TestHTTPServerLifecycle(t *testing.T) { apiConf.Enabled = true testURL := fmt.Sprintf("http://localhost:%v/foo/bar", freePort) - - apiImpl, err := api.New("", "", apiConf, nil, log.Noop(), metrics.Noop(), nil) + configMetadata := api.ConfigMetadata{ + WholeConf: nil, + SuccessReloadCount: nil, + } + apiImpl, err := api.New("", "", apiConf, configMetadata, log.Noop(), metrics.Noop()) require.NoError(t, err) go func() { diff --git a/public/service/stream_builder.go b/public/service/stream_builder.go index 42bba91e8..0fcb886d9 100644 --- a/public/service/stream_builder.go +++ b/public/service/stream_builder.go @@ -880,7 +880,11 @@ func (s *StreamBuilder) buildWithEnv(env *bundle.Environment) (*Stream, error) { sanitConf.DocsProvider = env _ = s.configSpec.SanitiseYAML(&sanitNode, sanitConf) } - if apiType, err = api.New("", "", s.http, sanitNode, logger, stats, nil); err != nil { + configMetadata := api.ConfigMetadata{ + WholeConf: sanitNode, + SuccessReloadCount: nil, + } + if apiType, err = api.New("", "", s.http, configMetadata, logger, stats); err != nil { return nil, fmt.Errorf("unable to create stream HTTP server due to: %w. Tip: you can disable the server with `http.enabled` set to `false`, or override the configured server with SetHTTPMux", err) } apiMut = apiType