diff --git a/internal/api/api.go b/internal/api/api.go index 1b651ef27..e3098cf16 100644 --- a/internal/api/api.go +++ b/internal/api/api.go @@ -54,16 +54,23 @@ type Type struct { server *http.Server } +// ConfigMetadata manges the config related data. +type ConfigMetadata struct { + WholeConf any + SuccessReloadCount *int +} + // New creates a new Benthos HTTP API. func New( version string, dateBuilt string, conf Config, - wholeConf any, + ConfigMetadata ConfigMetadata, log log.Modular, stats metrics.Type, opts ...OptFunc, ) (*Type, error) { + log.Info("HTTP SERVER HAS BEEN EXECUTED After Create Manager") gMux := mux.NewRouter() server := &http.Server{Addr: conf.Address} @@ -103,15 +110,17 @@ func New( } handlePrintJSONConfig := func(w http.ResponseWriter, r *http.Request) { + t.log.Info("handlePrintJson executed") var g any var err error - if node, ok := wholeConf.(yaml.Node); ok { + if node, ok := ConfigMetadata.WholeConf.(yaml.Node); ok { err = node.Decode(&g) } else { g = node } var resBytes []byte if err == nil { + t.log.Info("API INOFOOOOOOOO CHEKC\n %+v \n", g) resBytes, err = json.Marshal(g) } if err != nil { @@ -122,7 +131,7 @@ func New( } handlePrintYAMLConfig := func(w http.ResponseWriter, r *http.Request) { - resBytes, err := yaml.Marshal(wholeConf) + resBytes, err := yaml.Marshal(ConfigMetadata.WholeConf) if err != nil { w.WriteHeader(http.StatusBadGateway) return @@ -146,6 +155,10 @@ func New( } } + handleConfigAcknowledgement := func(w http.ResponseWriter, r *http.Request) { + fmt.Fprintf(w, "{\"success_reload_count\":\"%v\"}", *ConfigMetadata.SuccessReloadCount) + } + if t.conf.DebugEndpoints { t.RegisterEndpoint( "/debug/config/json", "DEBUG: Returns the loaded config as JSON.", @@ -200,6 +213,7 @@ func New( t.RegisterEndpoint("/ping", "Ping me.", handlePing) t.RegisterEndpoint("/version", "Returns the service version.", handleVersion) + t.RegisterEndpoint("/config/ack", "Returns the count of success watcher", handleConfigAcknowledgement) t.RegisterEndpoint("/endpoints", "Returns this map of endpoints.", handleEndpoints) // If we want to expose a stats endpoint we register the endpoints. diff --git a/internal/api/api_test.go b/internal/api/api_test.go index 97a4cc40d..53e824c66 100644 --- a/internal/api/api_test.go +++ b/internal/api/api_test.go @@ -19,8 +19,11 @@ func TestAPIEnableCORS(t *testing.T) { conf := api.NewConfig() conf.CORS.Enabled = true conf.CORS.AllowedOrigins = []string{"*"} - - s, err := api.New("", "", conf, nil, log.Noop(), metrics.Noop()) + configMetadata := api.ConfigMetadata{ + WholeConf: nil, + SuccessReloadCount: nil, + } + s, err := api.New("", "", conf, configMetadata, log.Noop(), metrics.Noop()) require.NoError(t, err) handler := s.Handler() @@ -41,7 +44,11 @@ func TestAPIEnableCORSOrigins(t *testing.T) { conf.CORS.Enabled = true conf.CORS.AllowedOrigins = []string{"foo", "bar"} - s, err := api.New("", "", conf, nil, log.Noop(), metrics.Noop()) + configMetadata := api.ConfigMetadata{ + WholeConf: nil, + SuccessReloadCount: nil, + } + s, err := api.New("", "", conf, configMetadata, log.Noop(), metrics.Noop()) require.NoError(t, err) handler := s.Handler() @@ -80,8 +87,12 @@ func TestAPIEnableCORSOrigins(t *testing.T) { func TestAPIEnableCORSNoHeaders(t *testing.T) { conf := api.NewConfig() conf.CORS.Enabled = true + configMetadata := api.ConfigMetadata{ + WholeConf: nil, + SuccessReloadCount: nil, + } + _, err := api.New("", "", conf, configMetadata, log.Noop(), metrics.Noop()) - _, err := api.New("", "", conf, nil, log.Noop(), metrics.Noop()) require.Error(t, err) assert.Contains(t, err.Error(), "must specify at least one allowed origin") } @@ -164,7 +175,12 @@ func TestAPIBasicAuth(t *testing.T) { conf.BasicAuth.PasswordHash = tc.correctPass conf.BasicAuth.Salt = "EzrwNJYw2wkErVVV1P36FQ==" - s, err := api.New("", "", conf, nil, log.Noop(), metrics.Noop()) + configMetadata := api.ConfigMetadata{ + WholeConf: nil, + SuccessReloadCount: nil, + } + s, err := api.New("", "", conf, configMetadata, log.Noop(), metrics.Noop()) + if ok := tc.expectedErr(t, err); !(ok && err == nil) { return } diff --git a/internal/cli/common/manager.go b/internal/cli/common/manager.go index be51a3921..81d86ba86 100644 --- a/internal/cli/common/manager.go +++ b/internal/cli/common/manager.go @@ -30,8 +30,10 @@ func CreateManager( logger log.Modular, streamsMode bool, conf config.Type, + successReloadCount *int, mgrOpts ...manager.OptFunc, ) (stoppableMgr *StoppableManager, err error) { + logger.Info("Create Manager Is Executed") var stats *metrics.Namespaced var trac trace.TracerProvider defer func() { @@ -88,7 +90,13 @@ func CreateManager( } var httpServer *api.Type - if httpServer, err = api.New(cliOpts.Version, cliOpts.DateBuilt, conf.HTTP, sanitNode, logger, stats); err != nil { + logger.Info("New Method is calling which is having /debug endpoint") + logger.Info("This is Saint NOde %+v", sanitNode) + configMetadata := api.ConfigMetadata{ + WholeConf: sanitNode, + SuccessReloadCount: successReloadCount, + } + if httpServer, err = api.New(cliOpts.Version, cliOpts.DateBuilt, conf.HTTP, configMetadata, logger, stats); err != nil { err = fmt.Errorf("failed to initialise API: %w", err) return } @@ -285,7 +293,7 @@ func (s *StoppableManager) Stop(ctx context.Context) error { return err } if err := s.mgr.CloseObservability(ctx); err != nil { - s.mgr.Logger().Error("Failed to cleanly close observability components: %s", err) + s.mgr.Logger().Error("Failed to cleanly close observability components: %w", err) } return nil } diff --git a/internal/cli/common/service.go b/internal/cli/common/service.go index f0f91595d..f6a05e4b1 100644 --- a/internal/cli/common/service.go +++ b/internal/cli/common/service.go @@ -38,6 +38,7 @@ func (e *ErrExitCode) Unwrap() error { // RunService runs a service command (either the default or the streams // subcommand). func RunService(c *cli.Context, cliOpts *CLIOpts, streamsMode bool) error { + mainPath, inferredMainPath, confReader := ReadConfig(c, cliOpts, streamsMode) conf, pConf, lints, err := confReader.Read() @@ -49,6 +50,8 @@ func RunService(c *cli.Context, cliOpts *CLIOpts, streamsMode bool) error { }() logger, err := CreateLogger(c, cliOpts, conf, streamsMode) + logger.Info("RUN SERVICE CALLED Here is the intial conf %+v", conf) + if err != nil { return fmt.Errorf("failed to create logger: %w", err) } @@ -63,6 +66,7 @@ func RunService(c *cli.Context, cliOpts *CLIOpts, streamsMode bool) error { } strict := !cliOpts.RootFlags.GetChilled(c) + logger.Info("This is the Stict Value %v ", strict) for _, lint := range lints { if strict { logger.With("lint", lint).Error("Config lint error") @@ -73,8 +77,9 @@ func RunService(c *cli.Context, cliOpts *CLIOpts, streamsMode bool) error { if strict && len(lints) > 0 { return errors.New(cliOpts.ExecTemplate("shutting down due to linter errors, to prevent shutdown run {{.ProductName}} with --chilled")) } - - stoppableManager, err := CreateManager(c, cliOpts, logger, streamsMode, conf) + //Success Watcher Count Is Used to for to get count of the config which was updated with the watcher flag. + successReloadCount := 0 + stoppableManager, err := CreateManager(c, cliOpts, logger, streamsMode, conf, &successReloadCount) if err != nil { return err } @@ -90,9 +95,10 @@ func RunService(c *cli.Context, cliOpts *CLIOpts, streamsMode bool) error { watching := cliOpts.RootFlags.GetWatcher(c) if streamsMode { enableStreamsAPI := !c.Bool("no-api") - stoppableStream, err = initStreamsMode(cliOpts, strict, watching, enableStreamsAPI, confReader, stoppableManager.Manager()) + stoppableStream, err = initStreamsMode(cliOpts, strict, watching, enableStreamsAPI, confReader, stoppableManager.Manager(), &successReloadCount) } else { - stoppableStream, dataStreamClosedChan, err = initNormalMode(cliOpts, conf, strict, watching, confReader, stoppableManager.Manager()) + logger.Info("InitMode Get Initiated... strict:%v", strict) + stoppableStream, dataStreamClosedChan, err = initNormalMode(cliOpts, conf, strict, watching, confReader, stoppableManager.Manager(), &successReloadCount) } if err != nil { return err @@ -133,6 +139,7 @@ func initStreamsMode( strict, watching, enableAPI bool, confReader *config.Reader, mgr *manager.Type, + successReloadCount *int, ) (RunningStream, error) { logger := mgr.Logger() streamMgr := strmmgr.New(mgr, strmmgr.OptAPIEnabled(enableAPI)) @@ -181,7 +188,7 @@ func initStreamsMode( } if watching { - if err := confReader.BeginFileWatching(mgr, strict); err != nil { + if err := confReader.BeginFileWatching(mgr, strict, successReloadCount); err != nil { return nil, fmt.Errorf("failed to create stream config watcher: %w", err) } } @@ -194,9 +201,10 @@ func initNormalMode( strict, watching bool, confReader *config.Reader, mgr *manager.Type, + successReloadCount *int, ) (newStream RunningStream, stoppedChan chan struct{}, err error) { logger := mgr.Logger() - + logger.Info("Init Mode Has been started...") stoppedChan = make(chan struct{}) var closeOnce sync.Once streamInit := func() (RunningStream, error) { @@ -219,11 +227,16 @@ func initNormalMode( logger.Info(opts.ExecTemplate("Launching a {{.ProductName}} instance, use CTRL+C to close")) if err := confReader.SubscribeConfigChanges(func(newStreamConf *config.Type) error { + mgr.Logger().Info("SbscribeConfigChanges is called") ctx, done := context.WithTimeout(context.Background(), 30*time.Second) defer done() // NOTE: We're ignoring observability field changes for now. return stoppableStream.Replace(ctx, func() (RunningStream, error) { + conf.Config = newStreamConf.Config + mgr.Logger().Info("Here is the Final Updated Config") + mgr.Logger().Info("%+v", conf.Config) + // TODO HERE Starte WITH SanitNode Logic to Encode return streamInit() }) }); err != nil { @@ -231,7 +244,8 @@ func initNormalMode( } if watching { - if err := confReader.BeginFileWatching(mgr, strict); err != nil { + logger.Info("Inside the Watching Before Begin FileWatching.....") + if err := confReader.BeginFileWatching(mgr, strict, successReloadCount); err != nil { return nil, nil, fmt.Errorf("failed to create config file watcher: %w", err) } } diff --git a/internal/cli/studio/pull_runner.go b/internal/cli/studio/pull_runner.go index a773ec8a3..c705046e6 100644 --- a/internal/cli/studio/pull_runner.go +++ b/internal/cli/studio/pull_runner.go @@ -297,7 +297,7 @@ func (r *PullRunner) bootstrapConfigReader(ctx context.Context) (bootstrapErr er tmpTracingSummary.SetEnabled(false) stopMgrTmp, err := common.CreateManager( - r.cliContext, r.cliOpts, r.logger, false, conf, + r.cliContext, r.cliOpts, r.logger, false, conf, nil, manager.OptSetEnvironment(tmpEnv), manager.OptSetBloblangEnvironment(bloblEnv), manager.OptSetFS(sessFS)) @@ -413,13 +413,13 @@ func (r *PullRunner) Sync(ctx context.Context) { } } for _, res := range diff.AddResources { - if err := r.confReader.TriggerResourceUpdate(r.mgr, r.strictMode, res.Name); err != nil { + if err := r.confReader.TriggerResourceUpdate(r.mgr, r.strictMode, res.Name, nil); err != nil { r.logger.Error("Failed to reflect resource file '%v' update: %v", res.Name, err) runErr = err } } if diff.MainConfig != nil { - if err := r.confReader.TriggerMainUpdate(r.mgr, r.strictMode, diff.MainConfig.Name); err != nil { + if err := r.confReader.TriggerMainUpdate(r.mgr, r.strictMode, diff.MainConfig.Name, nil); err != nil { r.logger.Error("Failed to reflect main config file '%v' update: %v", diff.MainConfig.Name, err) runErr = err } diff --git a/internal/config/reader.go b/internal/config/reader.go index 98e7d014c..db7edf75e 100644 --- a/internal/config/reader.go +++ b/internal/config/reader.go @@ -357,7 +357,7 @@ func (r *Reader) readMain(mainPath string) (conf Type, pConf *docs.ParsedConfig, // TriggerMainUpdate attempts to re-read the main configuration file, trigger // the provided main update func, and apply changes to resources to the provided // manager as appropriate. -func (r *Reader) TriggerMainUpdate(mgr bundle.NewManagement, strict bool, newPath string) error { +func (r *Reader) TriggerMainUpdate(mgr bundle.NewManagement, strict bool, newPath string, successReloadCount *int) error { conf, _, lints, err := r.readMain(newPath) if errors.Is(err, fs.ErrNotExist) { if r.mainPath != newPath { @@ -416,6 +416,13 @@ func (r *Reader) TriggerMainUpdate(mgr bundle.NewManagement, strict bool, newPat mgr.Logger().Error("Failed to apply updated config: %v", err) return err } + + // Success Watcher Count denotes if the configuration in the benthos gets updated with the watcher + // flag then success watcher count gets increased + if successReloadCount != nil { + *successReloadCount = *successReloadCount + 1 + mgr.Logger().Info("Success Reload Count: %v, For Normal Config", *successReloadCount) + } mgr.Logger().Info("Updated main config") } return nil diff --git a/internal/config/reader_test.go b/internal/config/reader_test.go index 96589cd78..230d1631b 100644 --- a/internal/config/reader_test.go +++ b/internal/config/reader_test.go @@ -161,7 +161,7 @@ processor_resources: assert.False(t, testMgr.ProbeProcessor("c")) assert.False(t, testMgr.ProbeProcessor("d")) - require.NoError(t, rdr.TriggerMainUpdate(testMgr, true, "bar_main.yaml")) + require.NoError(t, rdr.TriggerMainUpdate(testMgr, true, "bar_main.yaml", nil)) // Wait for the config watcher to reload the config select { @@ -226,10 +226,10 @@ processor_resources: return nil })) - require.NoError(t, rdr.TriggerResourceUpdate(testMgr, true, "a.yaml")) - require.NoError(t, rdr.TriggerResourceUpdate(testMgr, true, "b.yaml")) + require.NoError(t, rdr.TriggerResourceUpdate(testMgr, true, "a.yaml", nil)) + require.NoError(t, rdr.TriggerResourceUpdate(testMgr, true, "b.yaml", nil)) - require.NoError(t, rdr.TriggerMainUpdate(testMgr, true, "foo_main.yaml")) + require.NoError(t, rdr.TriggerMainUpdate(testMgr, true, "foo_main.yaml", nil)) assert.Equal(t, "fooin", conf.Input.Label) assert.Equal(t, "fooout", conf.Output.Label) diff --git a/internal/config/resource_reader.go b/internal/config/resource_reader.go index 26323caa4..d9e502ada 100644 --- a/internal/config/resource_reader.go +++ b/internal/config/resource_reader.go @@ -240,7 +240,7 @@ func (r *Reader) readResource(path string) (conf manager.ResourceConfig, lints [ // TriggerResourceUpdate attempts to re-read a resource configuration file and // apply changes to the provided manager as appropriate. -func (r *Reader) TriggerResourceUpdate(mgr bundle.NewManagement, strict bool, path string) error { +func (r *Reader) TriggerResourceUpdate(mgr bundle.NewManagement, strict bool, path string, successReloadCount *int) error { newResConf, lints, err := r.readResource(path) if errors.Is(err, fs.ErrNotExist) { return r.TriggerResourceDelete(mgr, path) @@ -273,6 +273,11 @@ func (r *Reader) TriggerResourceUpdate(mgr bundle.NewManagement, strict bool, pa } r.resourceFileInfo[path] = newInfo + + if successReloadCount != nil { + *successReloadCount = *successReloadCount + 1 + mgr.Logger().Info("Success Reload Count: %v, For Stream Config", *successReloadCount) + } return nil } diff --git a/internal/config/resource_reader_test.go b/internal/config/resource_reader_test.go index 9c1ea280a..f0764335c 100644 --- a/internal/config/resource_reader_test.go +++ b/internal/config/resource_reader_test.go @@ -59,7 +59,7 @@ processor_resources: // Watch for configuration changes. testMgr, err := manager.New(conf.ResourceConfig) require.NoError(t, err) - require.NoError(t, rdr.BeginFileWatching(testMgr, true)) + require.NoError(t, rdr.BeginFileWatching(testMgr, true, nil)) tCtx, done := context.WithTimeout(context.Background(), time.Second*30) defer done() @@ -175,7 +175,7 @@ processor_resources: // Watch for configuration changes. testMgr, err := manager.New(conf.ResourceConfig) require.NoError(t, err) - require.NoError(t, rdr.BeginFileWatching(testMgr, true)) + require.NoError(t, rdr.BeginFileWatching(testMgr, true, nil)) tCtx, done := context.WithTimeout(context.Background(), time.Second*30) defer done() diff --git a/internal/config/stream_reader.go b/internal/config/stream_reader.go index 673a0583f..603b4ca18 100644 --- a/internal/config/stream_reader.go +++ b/internal/config/stream_reader.go @@ -184,7 +184,7 @@ func (r *Reader) findStreamPathWalkedDir(streamPath string) (dir string) { // TriggerStreamUpdate attempts to re-read a stream configuration file, and // trigger the provided stream update func. -func (r *Reader) TriggerStreamUpdate(mgr bundle.NewManagement, strict bool, path string) error { +func (r *Reader) TriggerStreamUpdate(mgr bundle.NewManagement, strict bool, path string, successReloadCount *int) error { if r.streamUpdateFn == nil { return nil } @@ -236,5 +236,10 @@ func (r *Reader) TriggerStreamUpdate(mgr bundle.NewManagement, strict bool, path return err } mgr.Logger().Info("Updated stream %v config from file.", info.id) + + if successReloadCount != nil { + *successReloadCount = *successReloadCount + 1 + mgr.Logger().Info("Success Reload Count: %v, For Stream Config", *successReloadCount) + } return nil } diff --git a/internal/config/watcher.go b/internal/config/watcher.go index d0be14862..1f2482b98 100644 --- a/internal/config/watcher.go +++ b/internal/config/watcher.go @@ -68,7 +68,7 @@ func (r *Reader) modifiedSinceLastRead(name string) bool { // WARNING: Either SubscribeConfigChanges or SubscribeStreamChanges must be // called before this, as otherwise it is unsafe to register them during // watching. -func (r *Reader) BeginFileWatching(mgr bundle.NewManagement, strict bool) error { +func (r *Reader) BeginFileWatching(mgr bundle.NewManagement, strict bool, successReloadCount *int) error { if r.watcher != nil { return errors.New("a file watcher has already been started") } @@ -102,9 +102,11 @@ func (r *Reader) BeginFileWatching(mgr bundle.NewManagement, strict bool) error } refreshFiles := func() error { + mgr.Logger().Info("Inside the Refresh Files") if !r.streamsMode && r.mainPath != "" { if _, err := r.fs.Stat(r.mainPath); err == nil { if err := addNotWatching([]string{r.mainPath}); err != nil { + mgr.Logger().Error("addNotWatching Error") return err } } @@ -173,13 +175,14 @@ func (r *Reader) BeginFileWatching(mgr bundle.NewManagement, strict bool) error } var succeeded bool if nameClean == r.mainPath { - succeeded = !ShouldReread(r.TriggerMainUpdate(mgr, strict, r.mainPath)) + succeeded = !ShouldReread(r.TriggerMainUpdate(mgr, strict, r.mainPath, successReloadCount)) } else if _, exists := r.streamFileInfo[nameClean]; exists { - succeeded = !ShouldReread(r.TriggerStreamUpdate(mgr, strict, nameClean)) + succeeded = !ShouldReread(r.TriggerStreamUpdate(mgr, strict, nameClean, successReloadCount)) } else { - succeeded = !ShouldReread(r.TriggerResourceUpdate(mgr, strict, nameClean)) + succeeded = !ShouldReread(r.TriggerResourceUpdate(mgr, strict, nameClean, successReloadCount)) } if succeeded { + mgr.Logger().Info("This is the collaps changes %v", collapsedChanges) delete(collapsedChanges, nameClean) } else { change.at = time.Now() diff --git a/internal/config/watcher_test.go b/internal/config/watcher_test.go index 11ae49455..cc135ba6d 100644 --- a/internal/config/watcher_test.go +++ b/internal/config/watcher_test.go @@ -48,7 +48,7 @@ output: // Watch for configuration changes testMgr, err := manager.New(manager.ResourceConfig{}) require.NoError(t, err) - require.NoError(t, rdr.BeginFileWatching(testMgr, true)) + require.NoError(t, rdr.BeginFileWatching(testMgr, true, nil)) // Overwrite original config require.NoError(t, os.WriteFile(confFilePath, dummyConfig, 0o644)) @@ -91,16 +91,19 @@ output: changeChan := make(chan struct{}) var updatedConf stream.Config + var once sync.Once require.NoError(t, rdr.SubscribeConfigChanges(func(conf *Type) error { updatedConf = conf.Config - close(changeChan) + once.Do(func() { + close(changeChan) + }) return nil })) // Watch for configuration changes testMgr, err := manager.New(manager.ResourceConfig{}) require.NoError(t, err) - require.NoError(t, rdr.BeginFileWatching(testMgr, true)) + require.NoError(t, rdr.BeginFileWatching(testMgr, true, nil)) // Create a new config folder and place in it a new copy of the config file newConfDir := filepath.Join(rootDir, "config_new") @@ -184,7 +187,7 @@ func TestReaderStreamDirectWatching(t *testing.T) { // Watch for configuration changes testMgr, err := manager.New(manager.ResourceConfig{}) require.NoError(t, err) - require.NoError(t, rdr.BeginFileWatching(testMgr, true)) + require.NoError(t, rdr.BeginFileWatching(testMgr, true, nil)) require.NoError(t, os.WriteFile(confAPath, []byte(`output: { label: a2, drop: {} }`), 0o644)) require.NoError(t, os.WriteFile(confBPath, []byte(`output: { label: b2, drop: {} }`), 0o644)) @@ -268,7 +271,7 @@ func TestReaderStreamWildcardWatching(t *testing.T) { // Watch for configuration changes testMgr, err := manager.New(manager.ResourceConfig{}) require.NoError(t, err) - require.NoError(t, rdr.BeginFileWatching(testMgr, true)) + require.NoError(t, rdr.BeginFileWatching(testMgr, true, nil)) require.NoError(t, os.WriteFile(confAPath, []byte(`output: { label: a2, drop: {} }`), 0o644)) require.NoError(t, os.WriteFile(confBPath, []byte(`output: { label: b2, drop: {} }`), 0o644)) @@ -352,7 +355,7 @@ func TestReaderStreamDirWatching(t *testing.T) { // Watch for configuration changes testMgr, err := manager.New(manager.ResourceConfig{}) require.NoError(t, err) - require.NoError(t, rdr.BeginFileWatching(testMgr, true)) + require.NoError(t, rdr.BeginFileWatching(testMgr, true, nil)) require.NoError(t, os.WriteFile(confAPath, []byte(`output: { label: a2, drop: {} }`), 0o644)) require.NoError(t, os.WriteFile(confBPath, []byte(`output: { label: b2, drop: {} }`), 0o644)) @@ -443,7 +446,7 @@ func TestReaderWatcherRace(t *testing.T) { // Watch for configuration changes testMgr, err := manager.New(manager.ResourceConfig{}) require.NoError(t, err) - require.NoError(t, rdr.BeginFileWatching(testMgr, true)) + require.NoError(t, rdr.BeginFileWatching(testMgr, true, nil)) for i := 0; i < 2; i++ { // Wait for the config watcher to reload each config @@ -523,7 +526,7 @@ processor_resources: // Watch for configuration changes testMgr, err := manager.New(manager.ResourceConfig{}) require.NoError(t, err) - require.NoError(t, rdr.BeginFileWatching(testMgr, true)) + require.NoError(t, rdr.BeginFileWatching(testMgr, true, nil)) require.NoError(t, os.WriteFile(confAPath, procConfig("a", "a2"), 0o644)) require.NoError(t, os.WriteFile(confBPath, procConfig("b", "b2"), 0o644)) diff --git a/internal/impl/io/input_http_server_test.go b/internal/impl/io/input_http_server_test.go index bc98acd11..f7c8ffc59 100644 --- a/internal/impl/io/input_http_server_test.go +++ b/internal/impl/io/input_http_server_test.go @@ -221,8 +221,11 @@ func TestHTTPServerLifecycle(t *testing.T) { apiConf.Enabled = true testURL := fmt.Sprintf("http://localhost:%v/foo/bar", freePort) - - apiImpl, err := api.New("", "", apiConf, nil, log.Noop(), metrics.Noop()) + configMetadata := api.ConfigMetadata{ + WholeConf: nil, + SuccessReloadCount: nil, + } + apiImpl, err := api.New("", "", apiConf, configMetadata, log.Noop(), metrics.Noop()) require.NoError(t, err) go func() { diff --git a/public/service/stream_builder.go b/public/service/stream_builder.go index b272eb41e..0fcb886d9 100644 --- a/public/service/stream_builder.go +++ b/public/service/stream_builder.go @@ -880,7 +880,11 @@ func (s *StreamBuilder) buildWithEnv(env *bundle.Environment) (*Stream, error) { sanitConf.DocsProvider = env _ = s.configSpec.SanitiseYAML(&sanitNode, sanitConf) } - if apiType, err = api.New("", "", s.http, sanitNode, logger, stats); err != nil { + configMetadata := api.ConfigMetadata{ + WholeConf: sanitNode, + SuccessReloadCount: nil, + } + if apiType, err = api.New("", "", s.http, configMetadata, logger, stats); err != nil { return nil, fmt.Errorf("unable to create stream HTTP server due to: %w. Tip: you can disable the server with `http.enabled` set to `false`, or override the configured server with SetHTTPMux", err) } apiMut = apiType