Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions changelog/fragments/1779225000-heartbeat-api-monitor-type.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
kind: feature

summary: Add new `api` monitor type for synthetics multi-step API journeys.

description: |
Heartbeat now recognizes `monitor.type: api`, a new synthetics-driven
monitor type that runs multi-step API checks via Playwright's
`APIRequestContext` without launching Chromium. API monitors share the
synthexec runtime with browser monitors but skip browser-only CLI
flags (`--sandbox`, `--screenshots`, `--throttling`) and emit network
events to a dedicated `synthetics.api.network` dataset. Requires
`@elastic/synthetics >= <TBD>`, which introduces the `apiJourney(...)`
DSL.

component: heartbeat

# pr: https://github.com/elastic/beats/pull/<TBD>
# issue: https://github.com/elastic/synthetics/pull/997
4 changes: 3 additions & 1 deletion heartbeat/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,16 +65,18 @@ type Scheduler struct {
func DefaultConfig() *Config {
limits := map[string]*JobLimit{
"browser": {Limit: 2},
"api": {Limit: 2},
}

// Read the env key SYNTHETICS_LIMIT_{TYPE} for each type of monitor to set scaling limits
// hard coded list of types to avoid cycles in current plugin system.
// TODO: refactor plugin system to DRY this up
for _, t := range []string{"http", "tcp", "icmp", "browser"} {
for _, t := range []string{"http", "tcp", "icmp", "browser", "api"} {
envKey := fmt.Sprintf("SYNTHETICS_LIMIT_%s", strings.ToUpper(t))
if limitStr := os.Getenv(envKey); limitStr != "" {
tLimitVal, err := strconv.ParseInt(limitStr, 10, 64)
if err != nil {
//nolint:forbidigo // DefaultConfig is called before a beat-scoped logger is available; matches the pre-existing pattern used elsewhere in this package.
logp.L().Warnf("Could not parse job limit env var %s with value '%s' as integer", envKey, limitStr)
continue
}
Expand Down
19 changes: 12 additions & 7 deletions heartbeat/monitors/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,13 +94,16 @@ type FactoryParams struct {

// NewFactory takes a scheduler and creates a RunnerFactory that can create cfgfile.Runner(Monitor) objects.
func NewFactory(fp FactoryParams) *RunnerFactory {
// RunnerFactory predates per-beat loggers being threaded through FactoryParams.
// Fall back to the global logger here; matches the pre-existing pattern in this package.
logger := logp.L() //nolint:forbidigo // see comment above
return &RunnerFactory{
info: fp.BeatInfo,
addTask: fp.AddTask,
byId: map[string]*Monitor{},
mtx: &sync.Mutex{},
pluginsReg: fp.PluginsReg,
logger: logp.L(),
logger: logger,
pipelineClientFactory: fp.PipelineClientFactory,
beatLocation: fp.BeatRunFrom,
stateLoader: fp.StateLoader,
Expand Down Expand Up @@ -318,6 +321,7 @@ func preProcessors(info beat.Info, location *config.LocationWithID, settings pub
geoM, err := util.GeoConfigToMap(location.Geo)
if err != nil {
geoErrOnce.Do(func() {
//nolint:forbidigo // preProcessors is called from configuration reload paths without a contextual logger; matches the pre-existing pattern.
logp.L().Warnf("could not add heartbeat geo info: %v", err)
})
}
Expand All @@ -332,12 +336,12 @@ func preProcessors(info beat.Info, location *config.LocationWithID, settings pub
procs.AddProcessor(addfields.NewAddFields(obsFields, true, true))
}

// always use synthetics data streams for browser monitors, there is no good reason not to
// the default `heartbeat` data stream won't split out network and screenshot data.
// at some point we should make all monitors use the `synthetics` datastreams and retire
// the heartbeat one, but browser is the only beta one, and it would be a breaking change
// to do so otherwise.
if monitorType == "browser" && settings.DataStream == nil {
// always use synthetics data streams for synthetics-driven monitors (browser, api),
// there is no good reason not to. The default `heartbeat` data stream won't split
// out network / screenshot / API-network sub-streams. At some point we should make
// all monitors use the `synthetics` datastreams and retire the heartbeat one, but
// that would be a breaking change for the lightweight types.
if stdfields.IsSyntheticsType(monitorType) && settings.DataStream == nil {
settings.DataStream = &add_data_stream.DataStream{}
}

Expand All @@ -354,6 +358,7 @@ func preProcessors(info beat.Info, location *config.LocationWithID, settings pub
}

if !settings.Index.IsEmpty() {
//nolint:forbidigo // preProcessors is called from configuration reload paths without a contextual logger; matches the pre-existing pattern.
logp.L().Warn("Deprecated use of 'index' setting in heartbeat monitor, use 'data_stream' instead!")
proc, err := indexProcessor(&settings.Index, info)
if err != nil {
Expand Down
49 changes: 35 additions & 14 deletions heartbeat/monitors/logger/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"sync"
"time"

"github.com/elastic/beats/v7/heartbeat/monitors/stdfields"
"github.com/elastic/beats/v7/heartbeat/monitors/wrappers/summarizer/jobsummary"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/elastic-agent-libs/logp"
Expand Down Expand Up @@ -73,30 +74,33 @@ func getLogger() *logp.Logger {
defer mtx.Unlock()

if eventLogger == nil {
return SetLogger(logp.L())
// Fall back to the global logger if SetLogger was never called.
// Tests and production paths call SetLogger during initialization;
// this branch only protects against early calls or misconfiguration.
return SetLogger(logp.L()) //nolint:forbidigo // intentional fallback before SetLogger is called
}

return eventLogger
}

func extractRunInfo(event *beat.Event) (*MonitorRunInfo, error) {
errors := []error{}
monitorID, err := event.GetValue("monitor.id")
monitorIDIface, err := event.GetValue("monitor.id")
if err != nil {
errors = append(errors, fmt.Errorf("could not extract monitor.id: %w", err))
}

durationUs, err := event.GetValue("monitor.duration.us")
durationUsIface, err := event.GetValue("monitor.duration.us")
if err != nil {
durationUs = int64(0)
durationUsIface = int64(0)
}

monType, err := event.GetValue("monitor.type")
monTypeIface, err := event.GetValue("monitor.type")
if err != nil {
errors = append(errors, fmt.Errorf("could not extract monitor.type: %w", err))
}

status, err := event.GetValue("monitor.status")
statusIface, err := event.GetValue("monitor.status")
if err != nil {
errors = append(errors, fmt.Errorf("could not extract monitor.status: %w", err))
}
Expand All @@ -112,18 +116,34 @@ func extractRunInfo(event *beat.Event) (*MonitorRunInfo, error) {
}
}

monitorID, ok := monitorIDIface.(string)
if !ok {
errors = append(errors, fmt.Errorf("monitor.id is not a string, got %T", monitorIDIface))
}
monType, ok := monTypeIface.(string)
if !ok {
errors = append(errors, fmt.Errorf("monitor.type is not a string, got %T", monTypeIface))
}
durationUs, ok := durationUsIface.(int64)
if !ok {
errors = append(errors, fmt.Errorf("monitor.duration.us is not an int64, got %T", durationUsIface))
}
status, ok := statusIface.(string)
if !ok {
errors = append(errors, fmt.Errorf("monitor.status is not a string, got %T", statusIface))
}

if len(errors) > 0 {
return nil, fmt.Errorf("logErrors: %+v", errors)
}

networkInfo := extractNetworkInfo(event, monType.(string))
monitor := MonitorRunInfo{
MonitorID: monitorID.(string),
Type: monType.(string),
Duration: durationUs.(int64),
Status: status.(string),
MonitorID: monitorID,
Type: monType,
Duration: durationUs,
Status: status,
Attempt: attempt,
NetworkInfo: networkInfo,
NetworkInfo: extractNetworkInfo(event, monType),
}

sc, _ := event.Meta.GetValue(META_STEP_COUNT)
Expand All @@ -136,8 +156,9 @@ func extractRunInfo(event *beat.Event) (*MonitorRunInfo, error) {
}

func extractNetworkInfo(event *beat.Event, monitorType string) NetworkInfo {
// Only relevant for lightweight monitors
if monitorType == "browser" {
// Only relevant for lightweight monitors. Synthetics-driven monitors
// (browser, api) emit their own network_info events via synthexec.
if stdfields.IsSyntheticsType(monitorType) {
return nil
}

Expand Down
13 changes: 13 additions & 0 deletions heartbeat/monitors/stdfields/stdfields.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,19 @@ type StdMonitorFields struct {
BadConfig bool
}

// IsSyntheticsType reports whether the monitor type is one that runs via the
// embedded synthetics Node.js agent (multi-step journeys) rather than via a
// Go-side lightweight check. Today this is `browser` and `api`.
func (s StdMonitorFields) IsSyntheticsType() bool {
return IsSyntheticsType(s.Type)
}

// IsSyntheticsType is the package-level variant of (StdMonitorFields).IsSyntheticsType
// for callers that only have the raw monitor type string.
func IsSyntheticsType(monitorType string) bool {
return monitorType == "browser" || monitorType == "api"
}

func ConfigToStdMonitorFields(conf *config.C) (StdMonitorFields, error) {
sFields := StdMonitorFields{Enabled: true, MaxAttempts: 1}

Expand Down
20 changes: 20 additions & 0 deletions heartbeat/monitors/stdfields/stdfields_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,3 +65,23 @@ func TestLegacyServiceNameConfig(t *testing.T) {
}

}

func TestIsSyntheticsType(t *testing.T) {
cases := []struct {
monitorType string
want bool
}{
{"browser", true},
{"api", true},
{"http", false},
{"tcp", false},
{"icmp", false},
{"", false},
}
for _, c := range cases {
t.Run(c.monitorType, func(t *testing.T) {
require.Equal(t, c.want, IsSyntheticsType(c.monitorType))
require.Equal(t, c.want, StdMonitorFields{Type: c.monitorType}.IsSyntheticsType())
})
}
}
6 changes: 3 additions & 3 deletions heartbeat/monitors/wrappers/summarizer/summarizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import (
// this summary.
type Summarizer struct {
rootJob jobs.Job
contsRemaining uint16
contsRemaining int
mtx *sync.Mutex
sf stdfields.StdMonitorFields
mst *monitorstate.Tracker
Expand Down Expand Up @@ -93,7 +93,7 @@ func NewSummarizer(rootJob jobs.Job, sf stdfields.StdMonitorFields, mst *monitor
func (s *Summarizer) setupPlugins() {
// ssp must appear before Err plugin since
// it intercepts errors
if s.sf.Type == "browser" {
if s.sf.IsSyntheticsType() {
s.plugins = []SummarizerPlugin{
DropBrowserExtraEvents{},
&BrowserDurationPlugin{},
Expand Down Expand Up @@ -126,7 +126,7 @@ func (s *Summarizer) Wrap(j jobs.Job) jobs.Job {

s.contsRemaining-- // we just ran one cont, discount it
// these many still need to be processed
s.contsRemaining += uint16(len(conts))
s.contsRemaining += len(conts)

for _, plugin := range s.plugins {
actions := plugin.EachEvent(event, eventErr)
Expand Down
2 changes: 1 addition & 1 deletion heartbeat/monitors/wrappers/wrappers.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ import (
func WrapCommon(js []jobs.Job, stdMonFields stdfields.StdMonitorFields, stateLoader monitorstate.StateLoader) []jobs.Job {
mst := monitorstate.NewTracker(stateLoader, false)
var wrapped []jobs.Job
if stdMonFields.Type != "browser" || stdMonFields.BadConfig {
if !stdMonFields.IsSyntheticsType() || stdMonFields.BadConfig {
wrapped = WrapLightweight(js, stdMonFields, mst)
} else {
wrapped = WrapBrowser(js, stdMonFields, mst)
Expand Down
1 change: 1 addition & 0 deletions x-pack/heartbeat/cmd/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,6 @@ package cmd
// Imports cmd directly and skips main, import all required plugins
// here to have them bundled together
import (
_ "github.com/elastic/beats/v7/x-pack/heartbeat/monitors/api"
_ "github.com/elastic/beats/v7/x-pack/heartbeat/monitors/browser"
)
1 change: 1 addition & 0 deletions x-pack/heartbeat/include/list.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

44 changes: 44 additions & 0 deletions x-pack/heartbeat/monitors/api/api.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.
//go:build linux || darwin || synthetics

// Package api registers the `api` monitor type. API monitors run the
// embedded synthetics Node.js agent — same pipeline as `browser` — but
// never launch Chromium. They drive multi-step API checks via
// Playwright's `APIRequestContext`. The bulk of the implementation lives
// in the sibling `browser` package; this file is a thin registration
// shim plus an `api`-specific environment gate.
package api

import (
"fmt"
"syscall"

"github.com/elastic/elastic-agent-libs/config"

"github.com/elastic/beats/v7/heartbeat/monitors/plugin"
"github.com/elastic/beats/v7/heartbeat/security"
"github.com/elastic/beats/v7/x-pack/heartbeat/monitors/browser"
)

func init() {
plugin.Register("api", create, "synthetics/api")
}

func create(name string, cfg *config.C) (p plugin.Plugin, err error) {
// API journeys still run a Node.js child process, so the setuid
// constraint that applies to browser monitors applies here too.
// They do NOT require GUI libraries, so we deliberately skip the
// `ELASTIC_SYNTHETICS_CAPABLE` env gate from the browser plugin.
if syscall.Geteuid() == 0 && security.NodeChildProcCred == nil {
return plugin.Plugin{}, fmt.Errorf("api monitors cannot be run as root")
}

sj, err := browser.NewSourceJob(cfg)
if err != nil {
return plugin.Plugin{}, err
}

return sj.Plugin(), nil
}
12 changes: 12 additions & 0 deletions x-pack/heartbeat/monitors/browser/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ type Config struct {
Params map[string]interface{} `config:"params"`
RawConfig *config.C
Source *source.Source `config:"source"`
// Type carries the monitor type ("browser" or "api") so the sourcejob
// can shape the CLI invocation accordingly. Populated from the monitor
// config's top-level `type` field.
Type string `config:"type"`
// Name is optional for lightweight checks but required for browsers
Name string `config:"name"`
// Id is optional for lightweight checks but required for browsers
Expand All @@ -44,6 +48,14 @@ type Config struct {
Timeout time.Duration `config:"timeout"`
}

// IsAPI reports whether this config is for the `api` monitor type. API
// journeys reuse the same project/inline source pipeline as browser
// journeys but never launch Chromium, so a handful of browser-only CLI
// flags are filtered out before invoking the synthetics agent.
func (c *Config) IsAPI() bool {
return c.Type == "api"
}

var ErrNameRequired = fmt.Errorf("config 'name' must be specified for this monitor")
var ErrIdRequired = fmt.Errorf("config 'id' must be specified for this monitor")
var ErrSourceRequired = fmt.Errorf("config 'source' must be specified for this monitor, if upgrading from a previous experimental version please see our new config docs")
Expand Down
Loading
Loading