Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions comp/logs/agent/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func BuildEndpointsWithConfig(coreConfig pkgconfigmodel.Reader, logsConfig *Logs
if logsDDURL, defined := logsConfig.logsDDURL(); defined {
haveHTTPProxy = strings.HasPrefix(logsDDURL, "http://") || strings.HasPrefix(logsDDURL, "https://")
}
if logsConfig.isForceHTTPUse() || haveHTTPProxy || logsConfig.obsPipelineWorkerEnabled() || (bool(httpConnectivity) && !(logsConfig.isForceTCPUse() || logsConfig.isSocks5ProxySet() || logsConfig.hasAdditionalEndpoints())) {
if logsConfig.isGRPCUse() || logsConfig.isForceHTTPUse() || haveHTTPProxy || logsConfig.obsPipelineWorkerEnabled() || (bool(httpConnectivity) && !(logsConfig.isForceTCPUse() || logsConfig.isSocks5ProxySet() || logsConfig.hasAdditionalEndpoints())) {
return BuildHTTPEndpointsWithConfig(coreConfig, logsConfig, endpointPrefix, intakeTrackType, intakeProtocol, intakeOrigin)
}
log.Warnf("You are currently sending Logs to Datadog through TCP (either because %s or %s is set or the HTTP connectivity test has failed) "+
Expand Down Expand Up @@ -373,7 +373,7 @@ func buildHTTPEndpoints(coreConfig pkgconfigmodel.Reader, logsConfig *LogsConfig
batchMaxContentSize := logsConfig.batchMaxContentSize()
inputChanSize := logsConfig.inputChanSize()

return NewEndpointsWithBatchSettings(main, additionals, false, true, batchWait, batchMaxConcurrentSend, batchMaxSize, batchMaxContentSize, inputChanSize), nil
return NewEndpointsWithBatchSettings(main, additionals, false, true, logsConfig.isGRPCUse(), batchWait, batchMaxConcurrentSend, batchMaxSize, batchMaxContentSize, inputChanSize), nil
}

type defaultParseAddressFunc func(string) (host string, port int, err error)
Expand Down Expand Up @@ -447,6 +447,11 @@ func TaggerWarmupDuration(coreConfig pkgconfigmodel.Reader) time.Duration {
return defaultLogsConfigKeys(coreConfig).taggerWarmupDuration()
}

// StreamLifetime returns the duration for gRPC stream lifetime before rotation.
func StreamLifetime(coreConfig pkgconfigmodel.Reader) time.Duration {
return defaultLogsConfigKeys(coreConfig).streamLifetime()
}

// AggregationTimeout is used when performing aggregation operations
func AggregationTimeout(coreConfig pkgconfigmodel.Reader) time.Duration {
return defaultLogsConfigKeys(coreConfig).aggregationTimeout()
Expand Down
14 changes: 14 additions & 0 deletions comp/logs/agent/config/config_keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,10 @@ func (l *LogsConfigKeys) isForceHTTPUse() bool {
l.getConfig().GetBool(l.getConfigKey("force_use_http"))
}

func (l *LogsConfigKeys) isGRPCUse() bool {
return l.getConfig().GetBool(l.getConfigKey("use_grpc"))
}

func (l *LogsConfigKeys) logsNoSSL() bool {
return l.getConfig().GetBool(l.getConfigKey("logs_no_ssl"))
}
Expand Down Expand Up @@ -292,6 +296,16 @@ func (l *LogsConfigKeys) senderRecoveryReset() bool {
return l.getConfig().GetBool(l.getConfigKey("sender_recovery_reset"))
}

func (l *LogsConfigKeys) streamLifetime() time.Duration {
key := l.getConfigKey("stream_lifetime")
streamLifetime := l.getConfig().GetInt(key)
if streamLifetime <= 0 {
log.Warnf("Invalid %s: %v should be > 0, fallback on %v", key, streamLifetime, pkgconfigsetup.DefaultLogsStreamLifetime)
return time.Duration(pkgconfigsetup.DefaultLogsStreamLifetime) * time.Second
}
return time.Duration(streamLifetime) * time.Second
}

// AggregationTimeout is used when performing aggregation operations
func (l *LogsConfigKeys) aggregationTimeout() time.Duration {
return l.getConfig().GetDuration(l.getConfigKey("aggregation_timeout")) * time.Millisecond
Expand Down
6 changes: 3 additions & 3 deletions comp/logs/agent/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ func (suite *ConfigTestSuite) TestMultipleHttpEndpointsEnvVar() {
isReliable: true,
}

expectedEndpoints := NewEndpointsWithBatchSettings(expectedMainEndpoint, []Endpoint{expectedAdditionalEndpoint1, expectedAdditionalEndpoint2}, false, true, 1*time.Second, pkgconfigsetup.DefaultBatchMaxConcurrentSend, pkgconfigsetup.DefaultBatchMaxSize, pkgconfigsetup.DefaultBatchMaxContentSize, pkgconfigsetup.DefaultInputChanSize)
expectedEndpoints := NewEndpointsWithBatchSettings(expectedMainEndpoint, []Endpoint{expectedAdditionalEndpoint1, expectedAdditionalEndpoint2}, false, true, false, 1*time.Second, pkgconfigsetup.DefaultBatchMaxConcurrentSend, pkgconfigsetup.DefaultBatchMaxSize, pkgconfigsetup.DefaultBatchMaxContentSize, pkgconfigsetup.DefaultInputChanSize)
endpoints, err := BuildHTTPEndpoints(suite.config, "test-track", "test-proto", "test-source")

suite.Nil(err)
Expand Down Expand Up @@ -414,7 +414,7 @@ func (suite *ConfigTestSuite) TestMultipleHttpEndpointsInConfig() {
isReliable: true,
}

expectedEndpoints := NewEndpointsWithBatchSettings(expectedMainEndpoint, []Endpoint{expectedAdditionalEndpoint1, expectedAdditionalEndpoint2}, false, true, 1*time.Second, pkgconfigsetup.DefaultBatchMaxConcurrentSend, pkgconfigsetup.DefaultBatchMaxSize, pkgconfigsetup.DefaultBatchMaxContentSize, pkgconfigsetup.DefaultInputChanSize)
expectedEndpoints := NewEndpointsWithBatchSettings(expectedMainEndpoint, []Endpoint{expectedAdditionalEndpoint1, expectedAdditionalEndpoint2}, false, true, false, 1*time.Second, pkgconfigsetup.DefaultBatchMaxConcurrentSend, pkgconfigsetup.DefaultBatchMaxSize, pkgconfigsetup.DefaultBatchMaxContentSize, pkgconfigsetup.DefaultInputChanSize)
endpoints, err := BuildHTTPEndpoints(suite.config, "test-track", "test-proto", "test-source")

suite.Nil(err)
Expand Down Expand Up @@ -504,7 +504,7 @@ func (suite *ConfigTestSuite) TestMultipleHttpEndpointsInConfig2() {
isReliable: true,
}

expectedEndpoints := NewEndpointsWithBatchSettings(expectedMainEndpoint, []Endpoint{expectedAdditionalEndpoint1, expectedAdditionalEndpoint2}, false, true, 1*time.Second, pkgconfigsetup.DefaultBatchMaxConcurrentSend, pkgconfigsetup.DefaultBatchMaxSize, pkgconfigsetup.DefaultBatchMaxContentSize, pkgconfigsetup.DefaultInputChanSize)
expectedEndpoints := NewEndpointsWithBatchSettings(expectedMainEndpoint, []Endpoint{expectedAdditionalEndpoint1, expectedAdditionalEndpoint2}, false, true, false, 1*time.Second, pkgconfigsetup.DefaultBatchMaxConcurrentSend, pkgconfigsetup.DefaultBatchMaxSize, pkgconfigsetup.DefaultBatchMaxContentSize, pkgconfigsetup.DefaultInputChanSize)
endpoints, err := BuildHTTPEndpoints(suite.config, "test-track", "test-proto", "test-source")

suite.Nil(err)
Expand Down
21 changes: 20 additions & 1 deletion comp/logs/agent/config/endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,7 @@ type Endpoints struct {
Endpoints []Endpoint
UseProto bool
UseHTTP bool
UseGRPC bool
BatchWait time.Duration
BatchMaxConcurrentSend int
BatchMaxSize int
Expand All @@ -369,6 +370,23 @@ func NewEndpoints(main Endpoint, additionalEndpoints []Endpoint, useProto bool,
additionalEndpoints,
useProto,
useHTTP,
false, // useGRPC defaults to false for backward compatibility
pkgconfigsetup.DefaultBatchWait,
pkgconfigsetup.DefaultBatchMaxConcurrentSend,
pkgconfigsetup.DefaultBatchMaxSize,
pkgconfigsetup.DefaultBatchMaxContentSize,
pkgconfigsetup.DefaultInputChanSize,
)
}

// NewEndpointsWithGRPC returns a new endpoints composite with gRPC support
func NewEndpointsWithGRPC(main Endpoint, additionalEndpoints []Endpoint, useProto bool, useHTTP bool, useGRPC bool) *Endpoints {
return NewEndpointsWithBatchSettings(
main,
additionalEndpoints,
useProto,
useHTTP,
useGRPC,
pkgconfigsetup.DefaultBatchWait,
pkgconfigsetup.DefaultBatchMaxConcurrentSend,
pkgconfigsetup.DefaultBatchMaxSize,
Expand All @@ -378,12 +396,13 @@ func NewEndpoints(main Endpoint, additionalEndpoints []Endpoint, useProto bool,
}

// NewEndpointsWithBatchSettings returns a new endpoints composite with non-default batching settings specified
func NewEndpointsWithBatchSettings(main Endpoint, additionalEndpoints []Endpoint, useProto bool, useHTTP bool, batchWait time.Duration, batchMaxConcurrentSend int, batchMaxSize int, batchMaxContentSize int, inputChanSize int) *Endpoints {
func NewEndpointsWithBatchSettings(main Endpoint, additionalEndpoints []Endpoint, useProto bool, useHTTP bool, useGRPC bool, batchWait time.Duration, batchMaxConcurrentSend int, batchMaxSize int, batchMaxContentSize int, inputChanSize int) *Endpoints {
return &Endpoints{
Main: main,
Endpoints: append([]Endpoint{main}, additionalEndpoints...),
UseProto: useProto,
UseHTTP: useHTTP,
UseGRPC: useGRPC,
BatchWait: batchWait,
BatchMaxConcurrentSend: batchMaxConcurrentSend,
BatchMaxSize: batchMaxSize,
Expand Down
32 changes: 32 additions & 0 deletions comp/logs/agent/config/endpoints_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,24 @@ func (suite *EndpointsTestSuite) TestBuildEndpointsShouldSucceedWithValidHTTPCon
suite.Equal("agent-http-intake.logs.datadoghq.com.", endpoint.Host)
}

func (suite *EndpointsTestSuite) TestBuildEndpointsShouldSucceedWithValidGRPCConfig() {
var endpoints *Endpoints
var endpoint Endpoint
var err error

suite.config.SetWithoutSource("logs_config.use_grpc", true)

endpoints, err = BuildEndpoints(suite.config, HTTPConnectivityFailure, "test-track", "test-proto", "test-source")
suite.Nil(err)
suite.True(endpoints.UseGRPC)
suite.False(endpoints.UseHTTP)
suite.Equal(endpoints.BatchWait, 5*time.Second)

endpoint = endpoints.Main
suite.True(endpoint.UseSSL())
suite.Equal("agent-http-intake.logs.datadoghq.com.", endpoint.Host)
}

func (suite *EndpointsTestSuite) TestBuildEndpointsShouldSucceedWithValidHTTPConfigAndCompression() {
var endpoints *Endpoints
var endpoint Endpoint
Expand Down Expand Up @@ -259,6 +277,7 @@ func (suite *EndpointsTestSuite) TestBuildEndpointsShouldTakeIntoAccountHTTPConn
suite.config.SetWithoutSource("logs_config.force_use_tcp", "false")
suite.config.SetWithoutSource("logs_config.use_http", "false")
suite.config.SetWithoutSource("logs_config.force_use_http", "false")
suite.config.SetWithoutSource("logs_config.use_grpc", "false")
suite.config.SetWithoutSource("logs_config.socks5_proxy_address", "")
suite.config.SetWithoutSource("logs_config.additional_endpoints", []map[string]interface{}{})
}
Expand Down Expand Up @@ -329,6 +348,19 @@ func (suite *EndpointsTestSuite) TestBuildEndpointsShouldTakeIntoAccountHTTPConn
suite.config.SetWithoutSource("logs_config.socks5_proxy_address", "")
})

suite.Run("When use_grpc is true always create gRPC endpoints", func() {
defer resetHTTPConfigValuesToFalse()
suite.config.SetWithoutSource("logs_config.use_grpc", "true")
endpoints, err := BuildEndpoints(suite.config, HTTPConnectivitySuccess, "test-track", "test-proto", "test-source")
suite.Nil(err)
suite.True(endpoints.UseGRPC)
suite.False(endpoints.UseHTTP)
endpoints, err = BuildEndpoints(suite.config, HTTPConnectivityFailure, "test-track", "test-proto", "test-source")
suite.Nil(err)
suite.True(endpoints.UseGRPC)
suite.False(endpoints.UseHTTP)
})

suite.Run("When additional_endpoints is not empty always create TCP endpoints", func() {
defer resetHTTPConfigValuesToFalse()
suite.config.SetWithoutSource("logs_config.additional_endpoints", []map[string]interface{}{
Expand Down
5 changes: 5 additions & 0 deletions pkg/config/setup/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,9 @@ const (
// DefaultLogsSenderBackoffRecoveryInterval is the default logs sender backoff recovery interval
DefaultLogsSenderBackoffRecoveryInterval = 2

// DefaultLogsStreamLifetime is the default gRPC stream lifetime in seconds (15 minutes)
DefaultLogsStreamLifetime = 900

// maxExternalMetricsProviderChunkSize ensures batch queries are limited in size.
maxExternalMetricsProviderChunkSize = 35

Expand Down Expand Up @@ -2729,6 +2732,8 @@ func bindEnvAndSetLogsConfigKeys(config pkgconfigmodel.Setup, prefix string) {
config.BindEnvAndSetDefault(prefix+"sender_recovery_interval", DefaultForwarderRecoveryInterval)
config.BindEnvAndSetDefault(prefix+"sender_recovery_reset", false)
config.BindEnvAndSetDefault(prefix+"use_v2_api", true)
config.BindEnvAndSetDefault(prefix+"use_grpc", false)
config.BindEnvAndSetDefault(prefix+"stream_lifetime", DefaultLogsStreamLifetime)
config.SetKnown(prefix + "dev_mode_no_ssl") //nolint:forbidigo // TODO: replace by 'SetDefaultAndBindEnv'
}

Expand Down
10 changes: 10 additions & 0 deletions pkg/logs/message/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"time"

"github.com/DataDog/datadog-agent/pkg/logs/sources"
"github.com/DataDog/datadog-agent/pkg/proto/pbgo/statefulpb"
"github.com/DataDog/datadog-agent/pkg/util/log"
)

Expand All @@ -38,6 +39,8 @@ type Payload struct {
Encoding string
// The size of the unencoded payload
UnencodedSize int
// Extra information for Stateful gRPC streaming (batch-level state changes)
StatefulExtra any
}

// NewPayload creates a new payload with the given message metadata, encoded content, encoding type and unencoded size
Expand Down Expand Up @@ -70,6 +73,13 @@ type Message struct {
MessageMetadata
}

// StatefulMessage represents a log message for gRPC stateful streaming
// It contains a Datum (from stateful_encoding.proto) and associated metadata
type StatefulMessage struct {
Datum *statefulpb.Datum
Metadata *MessageMetadata
}

// MessageMetadata contains metadata information about a log message
//
//nolint:revive // exported: ignore package name struct conflict
Expand Down
Loading
Loading