From 837ded233b195685dc0a094d3304f0402ce99de3 Mon Sep 17 00:00:00 2001 From: Shahzad Date: Tue, 19 May 2026 23:36:42 +0200 Subject: [PATCH 1/3] feat(heartbeat): add `api` monitor type for synthetics API journeys MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Heartbeat now recognises `monitor.type: api`, a new synthetics-driven monitor type that runs multi-step API checks via Playwright's `APIRequestContext` without launching Chromium. API monitors reuse the existing synthexec runtime that browser monitors use (same project/inline source pipeline, same SynthEvent stream, same wrappers/summarizer plugin set, same `synthetics` data stream). The only differences are: * A new `x-pack/heartbeat/monitors/api` package registers the `api` plugin and bridges it to `browser.NewSourceJob`. The `ELASTIC_SYNTHETICS_CAPABLE` env gate (which exists to keep browser monitors from running on machines without GUI libs) is intentionally skipped — API journeys never launch Chromium. * `browser/sourcejob.extraArgs` now filters browser-only CLI flags (`--sandbox`, `--screenshots`, `--throttling`/`--no-throttling`) when the monitor type is `api`. `--playwright-options` and `--ignore-https-errors` are still forwarded since both apply to `APIRequestContext`. * `journey/network_info` events from API journeys are routed to a new `synthetics.api.network` dataset (vs. `browser.network`), so ingest pipelines and Kibana dashboards can branch cleanly without inspecting `journey.type` on every doc. * `SynthEvent.Journey.Type` (new) is propagated through `ToMap()` so downstream consumers see `synthetics.journey.type: api`. Older synthetics agents that don't emit the field continue to be treated as browser for back-compat. Central plumbing change: a new `stdfields.IsSyntheticsType(t)` helper returns true for `browser` and `api`. The four call sites that used to special-case `"browser"` (wrappers, summarizer, factory data-stream auto-config, logger network-info extraction) now use the helper. `heartbeat.config` adds a default scaling limit + `SYNTHETICS_LIMIT_API` env var for the new type. Requires `@elastic/synthetics` >= on the host (the release that introduces the `apiJourney(...)` DSL, elastic/synthetics#997). Co-authored-by: Cursor --- ...1779225000-heartbeat-api-monitor-type.yaml | 18 ++++++ heartbeat/config/config.go | 3 +- heartbeat/monitors/factory.go | 12 ++-- heartbeat/monitors/logger/logger.go | 6 +- heartbeat/monitors/stdfields/stdfields.go | 13 ++++ .../monitors/stdfields/stdfields_test.go | 20 ++++++ .../wrappers/summarizer/summarizer.go | 2 +- heartbeat/monitors/wrappers/wrappers.go | 2 +- x-pack/heartbeat/cmd/import.go | 1 + x-pack/heartbeat/include/list.go | 1 + x-pack/heartbeat/monitors/api/api.go | 44 +++++++++++++ x-pack/heartbeat/monitors/browser/config.go | 12 ++++ .../heartbeat/monitors/browser/sourcejob.go | 56 ++++++++++------ .../monitors/browser/sourcejob_test.go | 64 +++++++++++++++++++ .../monitors/browser/synthexec/enrich.go | 11 +++- .../monitors/browser/synthexec/enrich_test.go | 44 +++++++++++++ .../monitors/browser/synthexec/synthtypes.go | 27 +++++--- .../browser/synthexec/synthtypes_test.go | 24 +++++++ 18 files changed, 320 insertions(+), 40 deletions(-) create mode 100644 changelog/fragments/1779225000-heartbeat-api-monitor-type.yaml create mode 100644 x-pack/heartbeat/monitors/api/api.go diff --git a/changelog/fragments/1779225000-heartbeat-api-monitor-type.yaml b/changelog/fragments/1779225000-heartbeat-api-monitor-type.yaml new file mode 100644 index 000000000000..abda00e597c1 --- /dev/null +++ b/changelog/fragments/1779225000-heartbeat-api-monitor-type.yaml @@ -0,0 +1,18 @@ +kind: feature + +summary: Add new `api` monitor type for synthetics multi-step API journeys. + +description: | + Heartbeat now recognizes `monitor.type: api`, a new synthetics-driven + monitor type that runs multi-step API checks via Playwright's + `APIRequestContext` without launching Chromium. API monitors share the + synthexec runtime with browser monitors but skip browser-only CLI + flags (`--sandbox`, `--screenshots`, `--throttling`) and emit network + events to a dedicated `synthetics.api.network` dataset. Requires + `@elastic/synthetics >= `, which introduces the `apiJourney(...)` + DSL. + +component: heartbeat + +# pr: https://github.com/elastic/beats/pull/ +# issue: https://github.com/elastic/synthetics/pull/997 diff --git a/heartbeat/config/config.go b/heartbeat/config/config.go index 8a396d46fe8e..234b0409ec0f 100644 --- a/heartbeat/config/config.go +++ b/heartbeat/config/config.go @@ -65,12 +65,13 @@ type Scheduler struct { func DefaultConfig() *Config { limits := map[string]*JobLimit{ "browser": {Limit: 2}, + "api": {Limit: 2}, } // Read the env key SYNTHETICS_LIMIT_{TYPE} for each type of monitor to set scaling limits // hard coded list of types to avoid cycles in current plugin system. // TODO: refactor plugin system to DRY this up - for _, t := range []string{"http", "tcp", "icmp", "browser"} { + for _, t := range []string{"http", "tcp", "icmp", "browser", "api"} { envKey := fmt.Sprintf("SYNTHETICS_LIMIT_%s", strings.ToUpper(t)) if limitStr := os.Getenv(envKey); limitStr != "" { tLimitVal, err := strconv.ParseInt(limitStr, 10, 64) diff --git a/heartbeat/monitors/factory.go b/heartbeat/monitors/factory.go index 8503da2dfacb..d41ec879bb55 100644 --- a/heartbeat/monitors/factory.go +++ b/heartbeat/monitors/factory.go @@ -332,12 +332,12 @@ func preProcessors(info beat.Info, location *config.LocationWithID, settings pub procs.AddProcessor(addfields.NewAddFields(obsFields, true, true)) } - // always use synthetics data streams for browser monitors, there is no good reason not to - // the default `heartbeat` data stream won't split out network and screenshot data. - // at some point we should make all monitors use the `synthetics` datastreams and retire - // the heartbeat one, but browser is the only beta one, and it would be a breaking change - // to do so otherwise. - if monitorType == "browser" && settings.DataStream == nil { + // always use synthetics data streams for synthetics-driven monitors (browser, api), + // there is no good reason not to. The default `heartbeat` data stream won't split + // out network / screenshot / API-network sub-streams. At some point we should make + // all monitors use the `synthetics` datastreams and retire the heartbeat one, but + // that would be a breaking change for the lightweight types. + if stdfields.IsSyntheticsType(monitorType) && settings.DataStream == nil { settings.DataStream = &add_data_stream.DataStream{} } diff --git a/heartbeat/monitors/logger/logger.go b/heartbeat/monitors/logger/logger.go index 826490ae0845..03c50ac66b1e 100644 --- a/heartbeat/monitors/logger/logger.go +++ b/heartbeat/monitors/logger/logger.go @@ -23,6 +23,7 @@ import ( "sync" "time" + "github.com/elastic/beats/v7/heartbeat/monitors/stdfields" "github.com/elastic/beats/v7/heartbeat/monitors/wrappers/summarizer/jobsummary" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/elastic-agent-libs/logp" @@ -136,8 +137,9 @@ func extractRunInfo(event *beat.Event) (*MonitorRunInfo, error) { } func extractNetworkInfo(event *beat.Event, monitorType string) NetworkInfo { - // Only relevant for lightweight monitors - if monitorType == "browser" { + // Only relevant for lightweight monitors. Synthetics-driven monitors + // (browser, api) emit their own network_info events via synthexec. + if stdfields.IsSyntheticsType(monitorType) { return nil } diff --git a/heartbeat/monitors/stdfields/stdfields.go b/heartbeat/monitors/stdfields/stdfields.go index e83e23a7f0d5..d6520070c0a9 100644 --- a/heartbeat/monitors/stdfields/stdfields.go +++ b/heartbeat/monitors/stdfields/stdfields.go @@ -60,6 +60,19 @@ type StdMonitorFields struct { BadConfig bool } +// IsSyntheticsType reports whether the monitor type is one that runs via the +// embedded synthetics Node.js agent (multi-step journeys) rather than via a +// Go-side lightweight check. Today this is `browser` and `api`. +func (s StdMonitorFields) IsSyntheticsType() bool { + return IsSyntheticsType(s.Type) +} + +// IsSyntheticsType is the package-level variant of (StdMonitorFields).IsSyntheticsType +// for callers that only have the raw monitor type string. +func IsSyntheticsType(monitorType string) bool { + return monitorType == "browser" || monitorType == "api" +} + func ConfigToStdMonitorFields(conf *config.C) (StdMonitorFields, error) { sFields := StdMonitorFields{Enabled: true, MaxAttempts: 1} diff --git a/heartbeat/monitors/stdfields/stdfields_test.go b/heartbeat/monitors/stdfields/stdfields_test.go index 8439407a9330..6654488da4e0 100644 --- a/heartbeat/monitors/stdfields/stdfields_test.go +++ b/heartbeat/monitors/stdfields/stdfields_test.go @@ -65,3 +65,23 @@ func TestLegacyServiceNameConfig(t *testing.T) { } } + +func TestIsSyntheticsType(t *testing.T) { + cases := []struct { + monitorType string + want bool + }{ + {"browser", true}, + {"api", true}, + {"http", false}, + {"tcp", false}, + {"icmp", false}, + {"", false}, + } + for _, c := range cases { + t.Run(c.monitorType, func(t *testing.T) { + require.Equal(t, c.want, IsSyntheticsType(c.monitorType)) + require.Equal(t, c.want, StdMonitorFields{Type: c.monitorType}.IsSyntheticsType()) + }) + } +} diff --git a/heartbeat/monitors/wrappers/summarizer/summarizer.go b/heartbeat/monitors/wrappers/summarizer/summarizer.go index ad0902d45af7..9c7ad0e0cafa 100644 --- a/heartbeat/monitors/wrappers/summarizer/summarizer.go +++ b/heartbeat/monitors/wrappers/summarizer/summarizer.go @@ -93,7 +93,7 @@ func NewSummarizer(rootJob jobs.Job, sf stdfields.StdMonitorFields, mst *monitor func (s *Summarizer) setupPlugins() { // ssp must appear before Err plugin since // it intercepts errors - if s.sf.Type == "browser" { + if s.sf.IsSyntheticsType() { s.plugins = []SummarizerPlugin{ DropBrowserExtraEvents{}, &BrowserDurationPlugin{}, diff --git a/heartbeat/monitors/wrappers/wrappers.go b/heartbeat/monitors/wrappers/wrappers.go index 0b5727717186..87bade4591db 100644 --- a/heartbeat/monitors/wrappers/wrappers.go +++ b/heartbeat/monitors/wrappers/wrappers.go @@ -39,7 +39,7 @@ import ( func WrapCommon(js []jobs.Job, stdMonFields stdfields.StdMonitorFields, stateLoader monitorstate.StateLoader) []jobs.Job { mst := monitorstate.NewTracker(stateLoader, false) var wrapped []jobs.Job - if stdMonFields.Type != "browser" || stdMonFields.BadConfig { + if !stdMonFields.IsSyntheticsType() || stdMonFields.BadConfig { wrapped = WrapLightweight(js, stdMonFields, mst) } else { wrapped = WrapBrowser(js, stdMonFields, mst) diff --git a/x-pack/heartbeat/cmd/import.go b/x-pack/heartbeat/cmd/import.go index bb93fd909570..158630817554 100644 --- a/x-pack/heartbeat/cmd/import.go +++ b/x-pack/heartbeat/cmd/import.go @@ -9,5 +9,6 @@ package cmd // Imports cmd directly and skips main, import all required plugins // here to have them bundled together import ( + _ "github.com/elastic/beats/v7/x-pack/heartbeat/monitors/api" _ "github.com/elastic/beats/v7/x-pack/heartbeat/monitors/browser" ) diff --git a/x-pack/heartbeat/include/list.go b/x-pack/heartbeat/include/list.go index d2fac57099d9..fc08deb70fbe 100644 --- a/x-pack/heartbeat/include/list.go +++ b/x-pack/heartbeat/include/list.go @@ -10,5 +10,6 @@ package include import ( // Import packages that perform 'func init()'. + _ "github.com/elastic/beats/v7/x-pack/heartbeat/monitors/api" _ "github.com/elastic/beats/v7/x-pack/heartbeat/monitors/browser" ) diff --git a/x-pack/heartbeat/monitors/api/api.go b/x-pack/heartbeat/monitors/api/api.go new file mode 100644 index 000000000000..5283bfdbdef7 --- /dev/null +++ b/x-pack/heartbeat/monitors/api/api.go @@ -0,0 +1,44 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. +//go:build linux || darwin || synthetics + +// Package api registers the `api` monitor type. API monitors run the +// embedded synthetics Node.js agent — same pipeline as `browser` — but +// never launch Chromium. They drive multi-step API checks via +// Playwright's `APIRequestContext`. The bulk of the implementation lives +// in the sibling `browser` package; this file is a thin registration +// shim plus an `api`-specific environment gate. +package api + +import ( + "fmt" + "syscall" + + "github.com/elastic/elastic-agent-libs/config" + + "github.com/elastic/beats/v7/heartbeat/monitors/plugin" + "github.com/elastic/beats/v7/heartbeat/security" + "github.com/elastic/beats/v7/x-pack/heartbeat/monitors/browser" +) + +func init() { + plugin.Register("api", create, "synthetics/api") +} + +func create(name string, cfg *config.C) (p plugin.Plugin, err error) { + // API journeys still run a Node.js child process, so the setuid + // constraint that applies to browser monitors applies here too. + // They do NOT require GUI libraries, so we deliberately skip the + // `ELASTIC_SYNTHETICS_CAPABLE` env gate from the browser plugin. + if syscall.Geteuid() == 0 && security.NodeChildProcCred == nil { + return plugin.Plugin{}, fmt.Errorf("api monitors cannot be run as root") + } + + sj, err := browser.NewSourceJob(cfg) + if err != nil { + return plugin.Plugin{}, err + } + + return sj.Plugin(), nil +} diff --git a/x-pack/heartbeat/monitors/browser/config.go b/x-pack/heartbeat/monitors/browser/config.go index 68aaab84c38f..554453e27ae1 100644 --- a/x-pack/heartbeat/monitors/browser/config.go +++ b/x-pack/heartbeat/monitors/browser/config.go @@ -30,6 +30,10 @@ type Config struct { Params map[string]interface{} `config:"params"` RawConfig *config.C Source *source.Source `config:"source"` + // Type carries the monitor type ("browser" or "api") so the sourcejob + // can shape the CLI invocation accordingly. Populated from the monitor + // config's top-level `type` field. + Type string `config:"type"` // Name is optional for lightweight checks but required for browsers Name string `config:"name"` // Id is optional for lightweight checks but required for browsers @@ -44,6 +48,14 @@ type Config struct { Timeout time.Duration `config:"timeout"` } +// IsAPI reports whether this config is for the `api` monitor type. API +// journeys reuse the same project/inline source pipeline as browser +// journeys but never launch Chromium, so a handful of browser-only CLI +// flags are filtered out before invoking the synthetics agent. +func (c *Config) IsAPI() bool { + return c.Type == "api" +} + var ErrNameRequired = fmt.Errorf("config 'name' must be specified for this monitor") var ErrIdRequired = fmt.Errorf("config 'id' must be specified for this monitor") var ErrSourceRequired = fmt.Errorf("config 'source' must be specified for this monitor, if upgrading from a previous experimental version please see our new config docs") diff --git a/x-pack/heartbeat/monitors/browser/sourcejob.go b/x-pack/heartbeat/monitors/browser/sourcejob.go index 27cdc3dafc05..d2e826d22d14 100644 --- a/x-pack/heartbeat/monitors/browser/sourcejob.go +++ b/x-pack/heartbeat/monitors/browser/sourcejob.go @@ -147,6 +147,8 @@ func (sj *SourceJob) extraArgs(uiOrigin bool) []string { extraArgs = append(extraArgs, sj.browserCfg.SyntheticsArgs...) } + // `--playwright-options` is honored by both browser and API journeys + // (the API journey applies the options to `APIRequestContext.newContext()`). if len(sj.browserCfg.PlaywrightOpts) > 0 { s, err := json.Marshal(sj.browserCfg.PlaywrightOpts) if err != nil { @@ -156,29 +158,36 @@ func (sj *SourceJob) extraArgs(uiOrigin bool) []string { extraArgs = append(extraArgs, "--playwright-options", string(s)) } } + // `--ignore-https-errors` is meaningful for both browser (page-level) + // and API (APIRequestContext-level) journeys. if sj.browserCfg.IgnoreHTTPSErrors { extraArgs = append(extraArgs, "--ignore-https-errors") } - if sj.browserCfg.Sandbox { - extraArgs = append(extraArgs, "--sandbox") - } - if sj.browserCfg.Screenshots != "" { - extraArgs = append(extraArgs, "--screenshots", sj.browserCfg.Screenshots) - } - if sj.browserCfg.Throttling != nil { - switch t := sj.browserCfg.Throttling.(type) { - case bool: - if !t { - extraArgs = append(extraArgs, "--no-throttling") - } - case string: - extraArgs = append(extraArgs, "--throttling", fmt.Sprintf("%v", sj.browserCfg.Throttling)) - case map[string]interface{}: - j, err := json.Marshal(t) - if err != nil { - logp.L().Warnf("could not serialize throttling config to JSON: %s", err) - } else { - extraArgs = append(extraArgs, "--throttling", string(j)) + // The remaining flags are browser-only — they all require Chromium. + // Skip them for API journeys so we don't confuse newer synthetics + // agents and to keep `ps` output / logs clean. + if !sj.browserCfg.IsAPI() { + if sj.browserCfg.Sandbox { + extraArgs = append(extraArgs, "--sandbox") + } + if sj.browserCfg.Screenshots != "" { + extraArgs = append(extraArgs, "--screenshots", sj.browserCfg.Screenshots) + } + if sj.browserCfg.Throttling != nil { + switch t := sj.browserCfg.Throttling.(type) { + case bool: + if !t { + extraArgs = append(extraArgs, "--no-throttling") + } + case string: + extraArgs = append(extraArgs, "--throttling", fmt.Sprintf("%v", sj.browserCfg.Throttling)) + case map[string]interface{}: + j, err := json.Marshal(t) + if err != nil { + logp.L().Warnf("could not serialize throttling config to JSON: %s", err) + } else { + extraArgs = append(extraArgs, "--throttling", string(j)) + } } } } @@ -214,6 +223,13 @@ func (sj *SourceJob) jobs() []jobs.Job { } func (sj *SourceJob) plugin() plugin.Plugin { + return sj.Plugin() +} + +// Plugin returns the SourceJob wrapped as a monitor plugin. Exported so +// the sibling `api` package can register its own monitor type while +// reusing the same source/synthexec pipeline. +func (sj *SourceJob) Plugin() plugin.Plugin { return plugin.Plugin{ Jobs: sj.jobs(), DoClose: sj.Close, diff --git a/x-pack/heartbeat/monitors/browser/sourcejob_test.go b/x-pack/heartbeat/monitors/browser/sourcejob_test.go index 7a2e5da1e532..b252d0830436 100644 --- a/x-pack/heartbeat/monitors/browser/sourcejob_test.go +++ b/x-pack/heartbeat/monitors/browser/sourcejob_test.go @@ -448,3 +448,67 @@ func TestUpdateParams(t *testing.T) { e = s.Close() require.NoError(t, e) } + +// extraArgs must elide browser-only CLI flags when the monitor type is +// `api`. Forwarding them to a chromium-less journey would either be +// ignored (best case) or rejected by stricter agent versions, so we +// pin the contract here. +func TestExtraArgsForAPIMonitor(t *testing.T) { + cfg := conf.MustNewConfigFrom(mapstr.M{ + "type": "api", + "name": "My API monitor", + "id": "myApiId", + "schedule": "@every 1m", + "source": mapstr.M{ + "inline": mapstr.M{ + "script": "// api journey", + }, + }, + // Browser-only — should NOT make it into the CLI invocation. + "sandbox": true, + "screenshots": "on", + "throttling": false, + // Honored for both types. + "ignore_https_errors": true, + "playwright_options": mapstr.M{"ignoreHTTPSErrors": true}, + }) + + sj, err := NewSourceJob(cfg) + require.NoError(t, err) + args := sj.extraArgs(false) + + require.NotContains(t, args, "--sandbox", "api journeys must not receive --sandbox") + require.NotContains(t, args, "--screenshots", "api journeys must not receive --screenshots") + require.NotContains(t, args, "--no-throttling", "api journeys must not receive --no-throttling") + require.NotContains(t, args, "--throttling", "api journeys must not receive --throttling") + + require.Contains(t, args, "--ignore-https-errors", "api journeys must still honor --ignore-https-errors") + require.Contains(t, args, "--playwright-options", "api journeys must still receive --playwright-options") +} + +// Browser monitors must keep receiving all the existing flags — guards +// against accidental over-eager filtering in extraArgs. +func TestExtraArgsForBrowserMonitorUnchanged(t *testing.T) { + cfg := conf.MustNewConfigFrom(mapstr.M{ + "type": "browser", + "name": "My Browser monitor", + "id": "myBrowserId", + "schedule": "@every 1m", + "sandbox": true, + "screenshots": "on", + "throttling": false, + "source": mapstr.M{ + "inline": mapstr.M{ + "script": "// browser journey", + }, + }, + }) + + sj, err := NewSourceJob(cfg) + require.NoError(t, err) + args := sj.extraArgs(false) + + require.Contains(t, args, "--sandbox") + require.Contains(t, args, "--screenshots") + require.Contains(t, args, "--no-throttling") +} diff --git a/x-pack/heartbeat/monitors/browser/synthexec/enrich.go b/x-pack/heartbeat/monitors/browser/synthexec/enrich.go index 9201c11d1e58..37672a8f1053 100644 --- a/x-pack/heartbeat/monitors/browser/synthexec/enrich.go +++ b/x-pack/heartbeat/monitors/browser/synthexec/enrich.go @@ -108,7 +108,16 @@ func (je *journeyEnricher) enrichSynthEvent(event *beat.Event, se *SynthEvent) e case StepScreenshot, StepScreenshotRef, ScreenshotBlock: add_data_stream.SetEventDataset(event, "browser.screenshot") case JourneyNetworkInfo: - add_data_stream.SetEventDataset(event, "browser.network") + // Route API journey network events to their own dataset so + // ingest pipelines / Kibana dashboards can branch cleanly + // without inspecting journey.type on every doc. Fallback to + // browser.network when the journey context is unknown so we + // don't silently drop dataset routing for older agents. + if je.journey.IsAPI() { + add_data_stream.SetEventDataset(event, "synthetics.api.network") + } else { + add_data_stream.SetEventDataset(event, "browser.network") + } } if se.Id != "" { diff --git a/x-pack/heartbeat/monitors/browser/synthexec/enrich_test.go b/x-pack/heartbeat/monitors/browser/synthexec/enrich_test.go index 607c14256968..c1c913e27f94 100644 --- a/x-pack/heartbeat/monitors/browser/synthexec/enrich_test.go +++ b/x-pack/heartbeat/monitors/browser/synthexec/enrich_test.go @@ -279,6 +279,50 @@ func TestEnrichSynthEvent(t *testing.T) { } } +// API journeys reuse the same enrichment pipeline as browser journeys +// but route their network events to a separate dataset. Pinning this +// here keeps the dataset contract observable in a single place and +// guards against accidentally collapsing api → browser.network. +func TestEnrichAPIJourneyDatasetRouting(t *testing.T) { + se := newStreamEnricher(stdfields.StdMonitorFields{Type: "api"}) + // Prime the journey context the way the agent does — journey/start + // first, then the network_info events that should inherit the type. + startEvt := &beat.Event{} + require.NoError(t, se.enrich(startEvt, &SynthEvent{ + Type: JourneyStart, + Journey: &Journey{ID: "j1", Name: "API journey", Type: "api"}, + })) + + netEvt := &beat.Event{} + require.NoError(t, se.enrich(netEvt, &SynthEvent{Type: JourneyNetworkInfo})) + + require.Equal(t, + "synthetics.api.network", + netEvt.Meta[add_data_stream.FieldMetaCustomDataset], + "API journey/network_info must land in synthetics.api.network, not browser.network", + ) +} + +// Older synthetics agents (pre-`apiJourney`) don't emit `journey.type`. +// We must keep treating those as browser to avoid silently dropping +// dataset routing during a mixed-version rollout. +func TestEnrichLegacyJourneyDefaultsToBrowser(t *testing.T) { + se := newStreamEnricher(stdfields.StdMonitorFields{Type: "browser"}) + startEvt := &beat.Event{} + require.NoError(t, se.enrich(startEvt, &SynthEvent{ + Type: JourneyStart, + Journey: &Journey{ID: "j1", Name: "legacy"}, // no Type + })) + + netEvt := &beat.Event{} + require.NoError(t, se.enrich(netEvt, &SynthEvent{Type: JourneyNetworkInfo})) + + require.Equal(t, + "browser.network", + netEvt.Meta[add_data_stream.FieldMetaCustomDataset], + ) +} + func makeTestJourneyEnricher(sFields stdfields.StdMonitorFields) *journeyEnricher { return &journeyEnricher{ streamEnricher: newStreamEnricher(sFields), diff --git a/x-pack/heartbeat/monitors/browser/synthexec/synthtypes.go b/x-pack/heartbeat/monitors/browser/synthexec/synthtypes.go index ddd928b216d6..c279c2dc7a67 100644 --- a/x-pack/heartbeat/monitors/browser/synthexec/synthtypes.go +++ b/x-pack/heartbeat/monitors/browser/synthexec/synthtypes.go @@ -213,18 +213,29 @@ type Journey struct { Name string `json:"name"` ID string `json:"id"` Tags []string `json:"tags"` + // Type surfaces the journey kind ("browser" or "api"). The + // synthetics agent emits this on journey/start and journey/end + // events from v1.x onward; older agents simply leave it empty and + // downstream consumers should treat absence as "browser" for back-compat. + Type string `json:"type"` +} + +// IsAPI returns true when this journey came from an `apiJourney(...)` +// declaration in the synthetics agent. +func (j *Journey) IsAPI() bool { + return j != nil && j.Type == "api" } func (j Journey) ToMap() mapstr.M { - if len(j.Tags) > 0 { - return mapstr.M{ - "name": j.Name, - "id": j.ID, - "tags": j.Tags, - } - } - return mapstr.M{ + m := mapstr.M{ "name": j.Name, "id": j.ID, } + if len(j.Tags) > 0 { + m["tags"] = j.Tags + } + if j.Type != "" { + m["type"] = j.Type + } + return m } diff --git a/x-pack/heartbeat/monitors/browser/synthexec/synthtypes_test.go b/x-pack/heartbeat/monitors/browser/synthexec/synthtypes_test.go index 38daabab1e1b..f2e73f90b63f 100644 --- a/x-pack/heartbeat/monitors/browser/synthexec/synthtypes_test.go +++ b/x-pack/heartbeat/monitors/browser/synthexec/synthtypes_test.go @@ -195,3 +195,27 @@ func TestSynthErrConversion(t *testing.T) { require.Equal(t, stack, se.Stack) }) } + +func TestJourneyTypePropagation(t *testing.T) { + t.Run("API journey carries type through ToMap", func(t *testing.T) { + j := Journey{ID: "j1", Name: "API", Type: "api"} + m := j.ToMap() + require.Equal(t, "api", m["type"]) + require.True(t, j.IsAPI()) + }) + + t.Run("legacy journey omits type from ToMap", func(t *testing.T) { + j := Journey{ID: "j1", Name: "legacy"} + m := j.ToMap() + _, hasType := m["type"] + require.False(t, hasType, "Type must be omitted when empty so older docs aren't reshaped") + require.False(t, j.IsAPI()) + }) + + t.Run("Journey unmarshals type from agent JSON", func(t *testing.T) { + raw := []byte(`{"name":"x","id":"x","type":"api"}`) + var j Journey + require.NoError(t, json.Unmarshal(raw, &j)) + require.Equal(t, "api", j.Type) + }) +} From 3e9deb49dafb3f2436e6b42ed8dbe31626aa3e97 Mon Sep 17 00:00:00 2001 From: Shahzad Date: Wed, 20 May 2026 09:11:58 +0200 Subject: [PATCH 2/3] fix(heartbeat): satisfy golangci-lint on files touched by api monitor PR The api-monitor PR enabled CI's `--whole-files` golangci-lint mode on the files it modified, surfacing seven pre-existing violations (plus several more masked by `max-same-issues: 3`) that had been ignored while those files were untouched on main: * heartbeat/monitors/logger/logger.go: `extractRunInfo` was doing five unchecked type assertions on `interface{}` values pulled out of the event, e.g. `monType.(string)`, which trip `errcheck`'s `check-type-assertions: true` and would panic on a malformed event. Switched to the `, ok` form and append a typed error to the existing aggregated-error path so callers see "monitor.type is not a string, got " instead of a runtime panic. * heartbeat/monitors/wrappers/summarizer/summarizer.go: changed `Summarizer.contsRemaining` from `uint16` to `int` so that `contsRemaining += len(conts)` no longer needs the `uint16(len(conts))` narrowing conversion that gosec G115 flags. The field is only used as an internal "still to process" counter compared against 0; widening to int is semantically identical. * heartbeat/monitors/factory.go, heartbeat/config/config.go, heartbeat/monitors/logger/logger.go, x-pack/heartbeat/monitors/browser/sourcejob.go, x-pack/heartbeat/monitors/browser/synthexec/synthtypes.go: annotated the remaining `logp.L()` fallbacks with targeted `//nolint:forbidigo` directives that explain why the call site doesn't have a contextual `*logp.Logger` (factory predates per-beat loggers in FactoryParams; preProcessors is invoked from reload paths; ToMap/extraArgs/StdFields are pure mapping helpers with no logger handle; getLogger() is the documented pre- SetLogger fallback; DefaultConfig runs before the beat-scoped logger is constructed). These match the dozens of other `logp.L()` callers across `heartbeat/` that pre-date the "accept *logp.Logger as a parameter" rule and a real refactor is tracked separately. * In `NewFactory`, hoisted `logger := logp.L()` to a local so the struct literal stays `goimports`-aligned with the `//nolint` directive sitting next to the assignment instead of inside the field block. Verified locally with the same filter CI uses (`--new-from-patch ... --new=false --whole-files`) against golangci-lint v2.5.0 on `./heartbeat/...` and `./x-pack/heartbeat/...`: 0 issues. All affected unit tests still pass. Assisted-By: Cursor Co-authored-by: Cursor --- heartbeat/config/config.go | 1 + heartbeat/monitors/factory.go | 7 ++- heartbeat/monitors/logger/logger.go | 43 +++++++++++++------ .../wrappers/summarizer/summarizer.go | 4 +- .../heartbeat/monitors/browser/sourcejob.go | 3 ++ .../monitors/browser/synthexec/synthtypes.go | 1 + 6 files changed, 44 insertions(+), 15 deletions(-) diff --git a/heartbeat/config/config.go b/heartbeat/config/config.go index 234b0409ec0f..949580336fca 100644 --- a/heartbeat/config/config.go +++ b/heartbeat/config/config.go @@ -76,6 +76,7 @@ func DefaultConfig() *Config { if limitStr := os.Getenv(envKey); limitStr != "" { tLimitVal, err := strconv.ParseInt(limitStr, 10, 64) if err != nil { + //nolint:forbidigo // DefaultConfig is called before a beat-scoped logger is available; matches the pre-existing pattern used elsewhere in this package. logp.L().Warnf("Could not parse job limit env var %s with value '%s' as integer", envKey, limitStr) continue } diff --git a/heartbeat/monitors/factory.go b/heartbeat/monitors/factory.go index d41ec879bb55..5b7f7d25834a 100644 --- a/heartbeat/monitors/factory.go +++ b/heartbeat/monitors/factory.go @@ -94,13 +94,16 @@ type FactoryParams struct { // NewFactory takes a scheduler and creates a RunnerFactory that can create cfgfile.Runner(Monitor) objects. func NewFactory(fp FactoryParams) *RunnerFactory { + // RunnerFactory predates per-beat loggers being threaded through FactoryParams. + // Fall back to the global logger here; matches the pre-existing pattern in this package. + logger := logp.L() //nolint:forbidigo // see comment above return &RunnerFactory{ info: fp.BeatInfo, addTask: fp.AddTask, byId: map[string]*Monitor{}, mtx: &sync.Mutex{}, pluginsReg: fp.PluginsReg, - logger: logp.L(), + logger: logger, pipelineClientFactory: fp.PipelineClientFactory, beatLocation: fp.BeatRunFrom, stateLoader: fp.StateLoader, @@ -318,6 +321,7 @@ func preProcessors(info beat.Info, location *config.LocationWithID, settings pub geoM, err := util.GeoConfigToMap(location.Geo) if err != nil { geoErrOnce.Do(func() { + //nolint:forbidigo // preProcessors is called from configuration reload paths without a contextual logger; matches the pre-existing pattern. logp.L().Warnf("could not add heartbeat geo info: %v", err) }) } @@ -354,6 +358,7 @@ func preProcessors(info beat.Info, location *config.LocationWithID, settings pub } if !settings.Index.IsEmpty() { + //nolint:forbidigo // preProcessors is called from configuration reload paths without a contextual logger; matches the pre-existing pattern. logp.L().Warn("Deprecated use of 'index' setting in heartbeat monitor, use 'data_stream' instead!") proc, err := indexProcessor(&settings.Index, info) if err != nil { diff --git a/heartbeat/monitors/logger/logger.go b/heartbeat/monitors/logger/logger.go index 03c50ac66b1e..646620224ef2 100644 --- a/heartbeat/monitors/logger/logger.go +++ b/heartbeat/monitors/logger/logger.go @@ -74,7 +74,10 @@ func getLogger() *logp.Logger { defer mtx.Unlock() if eventLogger == nil { - return SetLogger(logp.L()) + // Fall back to the global logger if SetLogger was never called. + // Tests and production paths call SetLogger during initialization; + // this branch only protects against early calls or misconfiguration. + return SetLogger(logp.L()) //nolint:forbidigo // intentional fallback before SetLogger is called } return eventLogger @@ -82,22 +85,22 @@ func getLogger() *logp.Logger { func extractRunInfo(event *beat.Event) (*MonitorRunInfo, error) { errors := []error{} - monitorID, err := event.GetValue("monitor.id") + monitorIDIface, err := event.GetValue("monitor.id") if err != nil { errors = append(errors, fmt.Errorf("could not extract monitor.id: %w", err)) } - durationUs, err := event.GetValue("monitor.duration.us") + durationUsIface, err := event.GetValue("monitor.duration.us") if err != nil { - durationUs = int64(0) + durationUsIface = int64(0) } - monType, err := event.GetValue("monitor.type") + monTypeIface, err := event.GetValue("monitor.type") if err != nil { errors = append(errors, fmt.Errorf("could not extract monitor.type: %w", err)) } - status, err := event.GetValue("monitor.status") + statusIface, err := event.GetValue("monitor.status") if err != nil { errors = append(errors, fmt.Errorf("could not extract monitor.status: %w", err)) } @@ -113,18 +116,34 @@ func extractRunInfo(event *beat.Event) (*MonitorRunInfo, error) { } } + monitorID, ok := monitorIDIface.(string) + if !ok { + errors = append(errors, fmt.Errorf("monitor.id is not a string, got %T", monitorIDIface)) + } + monType, ok := monTypeIface.(string) + if !ok { + errors = append(errors, fmt.Errorf("monitor.type is not a string, got %T", monTypeIface)) + } + durationUs, ok := durationUsIface.(int64) + if !ok { + errors = append(errors, fmt.Errorf("monitor.duration.us is not an int64, got %T", durationUsIface)) + } + status, ok := statusIface.(string) + if !ok { + errors = append(errors, fmt.Errorf("monitor.status is not a string, got %T", statusIface)) + } + if len(errors) > 0 { return nil, fmt.Errorf("logErrors: %+v", errors) } - networkInfo := extractNetworkInfo(event, monType.(string)) monitor := MonitorRunInfo{ - MonitorID: monitorID.(string), - Type: monType.(string), - Duration: durationUs.(int64), - Status: status.(string), + MonitorID: monitorID, + Type: monType, + Duration: durationUs, + Status: status, Attempt: attempt, - NetworkInfo: networkInfo, + NetworkInfo: extractNetworkInfo(event, monType), } sc, _ := event.Meta.GetValue(META_STEP_COUNT) diff --git a/heartbeat/monitors/wrappers/summarizer/summarizer.go b/heartbeat/monitors/wrappers/summarizer/summarizer.go index 9c7ad0e0cafa..a1220bce75b6 100644 --- a/heartbeat/monitors/wrappers/summarizer/summarizer.go +++ b/heartbeat/monitors/wrappers/summarizer/summarizer.go @@ -33,7 +33,7 @@ import ( // this summary. type Summarizer struct { rootJob jobs.Job - contsRemaining uint16 + contsRemaining int mtx *sync.Mutex sf stdfields.StdMonitorFields mst *monitorstate.Tracker @@ -126,7 +126,7 @@ func (s *Summarizer) Wrap(j jobs.Job) jobs.Job { s.contsRemaining-- // we just ran one cont, discount it // these many still need to be processed - s.contsRemaining += uint16(len(conts)) + s.contsRemaining += len(conts) for _, plugin := range s.plugins { actions := plugin.EachEvent(event, eventErr) diff --git a/x-pack/heartbeat/monitors/browser/sourcejob.go b/x-pack/heartbeat/monitors/browser/sourcejob.go index d2e826d22d14..cb81245834b9 100644 --- a/x-pack/heartbeat/monitors/browser/sourcejob.go +++ b/x-pack/heartbeat/monitors/browser/sourcejob.go @@ -87,6 +87,7 @@ func (sj *SourceJob) StdFields() stdfields.StdMonitorFields { // Should be impossible since outer monitor.go should run this same code elsewhere // TODO: Just pass stdfields in to remove second deserialize if err != nil { + //nolint:forbidigo // StdFields() has no logger handle and the error is "should never happen"; matches the pre-existing pattern. logp.L().Warnf("Could not deserialize monitor fields for browser, this should never happen: %s", err) } return sFields @@ -153,6 +154,7 @@ func (sj *SourceJob) extraArgs(uiOrigin bool) []string { s, err := json.Marshal(sj.browserCfg.PlaywrightOpts) if err != nil { // This should never happen, if it was parsed as a config it should be serializable + //nolint:forbidigo // extraArgs has no logger handle and the error is "should never happen"; matches the pre-existing pattern. logp.L().Warnf("could not serialize playwright options '%v': %v", sj.browserCfg.PlaywrightOpts, err) } else { extraArgs = append(extraArgs, "--playwright-options", string(s)) @@ -184,6 +186,7 @@ func (sj *SourceJob) extraArgs(uiOrigin bool) []string { case map[string]interface{}: j, err := json.Marshal(t) if err != nil { + //nolint:forbidigo // extraArgs has no logger handle and the error is "should never happen"; matches the pre-existing pattern. logp.L().Warnf("could not serialize throttling config to JSON: %s", err) } else { extraArgs = append(extraArgs, "--throttling", string(j)) diff --git a/x-pack/heartbeat/monitors/browser/synthexec/synthtypes.go b/x-pack/heartbeat/monitors/browser/synthexec/synthtypes.go index c279c2dc7a67..b0c7749d2ba1 100644 --- a/x-pack/heartbeat/monitors/browser/synthexec/synthtypes.go +++ b/x-pack/heartbeat/monitors/browser/synthexec/synthtypes.go @@ -96,6 +96,7 @@ func (se SynthEvent) ToMap() (m mapstr.M) { u, e := url.Parse(se.URL) if e != nil { _, _ = m.Put("url", mapstr.M{"full": se.URL}) + //nolint:forbidigo // ToMap has no logger handle; matches the pre-existing pattern in this package. logp.L().Warnf("Could not parse synthetics URL '%s': %s", se.URL, e.Error()) } else { _, _ = m.Put("url", wraputil.URLFields(u)) From 5413db5b55537b71e772103278684f90728cbf0d Mon Sep 17 00:00:00 2001 From: Shahzad Date: Sun, 24 May 2026 07:04:06 +0200 Subject: [PATCH 3/3] fix(heartbeat): align api journey network dataset with integration package MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit `journey/network_info` events emitted by API journeys were being routed to `synthetics.api.network`, but the Fleet `synthetics` integration package defines the companion stream as `data_stream/api_network/manifest.yml` with `dataset: api.network`. That mismatch caused Heartbeat to write to an auto-created `synthetics-synthetics.api.network-default` data stream that the Fleet-generated agent API key was not scoped to, and Elasticsearch returned `security_exception` / 403 — every API journey network event was silently dropped. Use `api.network` so the routed dataset matches the package, the agent API key includes the right `create_doc` privilege, and the documents land in `synthetics-api.network-default` like the rest of the synthetics integration. The companion-side fix — Fleet must actually enable the `api_network` stream for API monitors — lives in the linked Kibana PR (`format_synthetics_policy.ts`). Without both halves the API key is still missing the privilege. Co-authored-by: Cursor --- .../heartbeat/monitors/browser/synthexec/enrich.go | 14 ++++++++++---- .../monitors/browser/synthexec/enrich_test.go | 4 ++-- 2 files changed, 12 insertions(+), 6 deletions(-) diff --git a/x-pack/heartbeat/monitors/browser/synthexec/enrich.go b/x-pack/heartbeat/monitors/browser/synthexec/enrich.go index 37672a8f1053..2ca0f9a65e67 100644 --- a/x-pack/heartbeat/monitors/browser/synthexec/enrich.go +++ b/x-pack/heartbeat/monitors/browser/synthexec/enrich.go @@ -110,11 +110,17 @@ func (je *journeyEnricher) enrichSynthEvent(event *beat.Event, se *SynthEvent) e case JourneyNetworkInfo: // Route API journey network events to their own dataset so // ingest pipelines / Kibana dashboards can branch cleanly - // without inspecting journey.type on every doc. Fallback to - // browser.network when the journey context is unknown so we - // don't silently drop dataset routing for older agents. + // without inspecting journey.type on every doc. The dataset + // name must match the Fleet integration package's + // `data_stream/api_network/manifest.yml` (`dataset: api.network`), + // otherwise the agent API key — which Fleet scopes to + // `synthetics-api.network-default` — will reject writes with + // a 403 and the data stream will silently stay empty. + // Fallback to browser.network when the journey context is + // unknown so we don't silently drop dataset routing for older + // agents. if je.journey.IsAPI() { - add_data_stream.SetEventDataset(event, "synthetics.api.network") + add_data_stream.SetEventDataset(event, "api.network") } else { add_data_stream.SetEventDataset(event, "browser.network") } diff --git a/x-pack/heartbeat/monitors/browser/synthexec/enrich_test.go b/x-pack/heartbeat/monitors/browser/synthexec/enrich_test.go index c1c913e27f94..0b9a20cf2c42 100644 --- a/x-pack/heartbeat/monitors/browser/synthexec/enrich_test.go +++ b/x-pack/heartbeat/monitors/browser/synthexec/enrich_test.go @@ -297,9 +297,9 @@ func TestEnrichAPIJourneyDatasetRouting(t *testing.T) { require.NoError(t, se.enrich(netEvt, &SynthEvent{Type: JourneyNetworkInfo})) require.Equal(t, - "synthetics.api.network", + "api.network", netEvt.Meta[add_data_stream.FieldMetaCustomDataset], - "API journey/network_info must land in synthetics.api.network, not browser.network", + "API journey/network_info must land in api.network (matching the Fleet integration's `data_stream/api_network` package), not browser.network", ) }