Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions comp/logs/agent/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func BuildEndpointsWithConfig(coreConfig pkgconfigmodel.Reader, logsConfig *Logs
if logsDDURL, defined := logsConfig.logsDDURL(); defined {
haveHTTPProxy = strings.HasPrefix(logsDDURL, "http://") || strings.HasPrefix(logsDDURL, "https://")
}
if logsConfig.isForceHTTPUse() || haveHTTPProxy || logsConfig.obsPipelineWorkerEnabled() || (bool(httpConnectivity) && !(logsConfig.isForceTCPUse() || logsConfig.isSocks5ProxySet() || logsConfig.hasAdditionalEndpoints())) {
if logsConfig.isGRPCUse() || logsConfig.isForceHTTPUse() || haveHTTPProxy || logsConfig.obsPipelineWorkerEnabled() || (bool(httpConnectivity) && !(logsConfig.isForceTCPUse() || logsConfig.isSocks5ProxySet() || logsConfig.hasAdditionalEndpoints())) {
return BuildHTTPEndpointsWithConfig(coreConfig, logsConfig, endpointPrefix, intakeTrackType, intakeProtocol, intakeOrigin)
}
log.Warnf("You are currently sending Logs to Datadog through TCP (either because %s or %s is set or the HTTP connectivity test has failed) "+
Expand Down Expand Up @@ -373,7 +373,7 @@ func buildHTTPEndpoints(coreConfig pkgconfigmodel.Reader, logsConfig *LogsConfig
batchMaxContentSize := logsConfig.batchMaxContentSize()
inputChanSize := logsConfig.inputChanSize()

return NewEndpointsWithBatchSettings(main, additionals, false, true, batchWait, batchMaxConcurrentSend, batchMaxSize, batchMaxContentSize, inputChanSize), nil
return NewEndpointsWithBatchSettings(main, additionals, false, true, logsConfig.isGRPCUse(), batchWait, batchMaxConcurrentSend, batchMaxSize, batchMaxContentSize, inputChanSize), nil
}

type defaultParseAddressFunc func(string) (host string, port int, err error)
Expand Down Expand Up @@ -447,6 +447,11 @@ func TaggerWarmupDuration(coreConfig pkgconfigmodel.Reader) time.Duration {
return defaultLogsConfigKeys(coreConfig).taggerWarmupDuration()
}

// StreamLifetime returns the duration for gRPC stream lifetime before rotation.
func StreamLifetime(coreConfig pkgconfigmodel.Reader) time.Duration {
return defaultLogsConfigKeys(coreConfig).streamLifetime()
}

// AggregationTimeout is used when performing aggregation operations
func AggregationTimeout(coreConfig pkgconfigmodel.Reader) time.Duration {
return defaultLogsConfigKeys(coreConfig).aggregationTimeout()
Expand Down
14 changes: 14 additions & 0 deletions comp/logs/agent/config/config_keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,10 @@ func (l *LogsConfigKeys) isForceHTTPUse() bool {
l.getConfig().GetBool(l.getConfigKey("force_use_http"))
}

func (l *LogsConfigKeys) isGRPCUse() bool {
return l.getConfig().GetBool(l.getConfigKey("use_grpc"))
}

func (l *LogsConfigKeys) logsNoSSL() bool {
return l.getConfig().GetBool(l.getConfigKey("logs_no_ssl"))
}
Expand Down Expand Up @@ -292,6 +296,16 @@ func (l *LogsConfigKeys) senderRecoveryReset() bool {
return l.getConfig().GetBool(l.getConfigKey("sender_recovery_reset"))
}

func (l *LogsConfigKeys) streamLifetime() time.Duration {
key := l.getConfigKey("stream_lifetime")
streamLifetime := l.getConfig().GetInt(key)
if streamLifetime <= 0 {
log.Warnf("Invalid %s: %v should be > 0, fallback on %v", key, streamLifetime, pkgconfigsetup.DefaultLogsStreamLifetime)
return time.Duration(pkgconfigsetup.DefaultLogsStreamLifetime) * time.Second
}
return time.Duration(streamLifetime) * time.Second
}

// AggregationTimeout is used when performing aggregation operations
func (l *LogsConfigKeys) aggregationTimeout() time.Duration {
return l.getConfig().GetDuration(l.getConfigKey("aggregation_timeout")) * time.Millisecond
Expand Down
6 changes: 3 additions & 3 deletions comp/logs/agent/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ func (suite *ConfigTestSuite) TestMultipleHttpEndpointsEnvVar() {
isReliable: true,
}

expectedEndpoints := NewEndpointsWithBatchSettings(expectedMainEndpoint, []Endpoint{expectedAdditionalEndpoint1, expectedAdditionalEndpoint2}, false, true, 1*time.Second, pkgconfigsetup.DefaultBatchMaxConcurrentSend, pkgconfigsetup.DefaultBatchMaxSize, pkgconfigsetup.DefaultBatchMaxContentSize, pkgconfigsetup.DefaultInputChanSize)
expectedEndpoints := NewEndpointsWithBatchSettings(expectedMainEndpoint, []Endpoint{expectedAdditionalEndpoint1, expectedAdditionalEndpoint2}, false, true, false, 1*time.Second, pkgconfigsetup.DefaultBatchMaxConcurrentSend, pkgconfigsetup.DefaultBatchMaxSize, pkgconfigsetup.DefaultBatchMaxContentSize, pkgconfigsetup.DefaultInputChanSize)
endpoints, err := BuildHTTPEndpoints(suite.config, "test-track", "test-proto", "test-source")

suite.Nil(err)
Expand Down Expand Up @@ -414,7 +414,7 @@ func (suite *ConfigTestSuite) TestMultipleHttpEndpointsInConfig() {
isReliable: true,
}

expectedEndpoints := NewEndpointsWithBatchSettings(expectedMainEndpoint, []Endpoint{expectedAdditionalEndpoint1, expectedAdditionalEndpoint2}, false, true, 1*time.Second, pkgconfigsetup.DefaultBatchMaxConcurrentSend, pkgconfigsetup.DefaultBatchMaxSize, pkgconfigsetup.DefaultBatchMaxContentSize, pkgconfigsetup.DefaultInputChanSize)
expectedEndpoints := NewEndpointsWithBatchSettings(expectedMainEndpoint, []Endpoint{expectedAdditionalEndpoint1, expectedAdditionalEndpoint2}, false, true, false, 1*time.Second, pkgconfigsetup.DefaultBatchMaxConcurrentSend, pkgconfigsetup.DefaultBatchMaxSize, pkgconfigsetup.DefaultBatchMaxContentSize, pkgconfigsetup.DefaultInputChanSize)
endpoints, err := BuildHTTPEndpoints(suite.config, "test-track", "test-proto", "test-source")

suite.Nil(err)
Expand Down Expand Up @@ -504,7 +504,7 @@ func (suite *ConfigTestSuite) TestMultipleHttpEndpointsInConfig2() {
isReliable: true,
}

expectedEndpoints := NewEndpointsWithBatchSettings(expectedMainEndpoint, []Endpoint{expectedAdditionalEndpoint1, expectedAdditionalEndpoint2}, false, true, 1*time.Second, pkgconfigsetup.DefaultBatchMaxConcurrentSend, pkgconfigsetup.DefaultBatchMaxSize, pkgconfigsetup.DefaultBatchMaxContentSize, pkgconfigsetup.DefaultInputChanSize)
expectedEndpoints := NewEndpointsWithBatchSettings(expectedMainEndpoint, []Endpoint{expectedAdditionalEndpoint1, expectedAdditionalEndpoint2}, false, true, false, 1*time.Second, pkgconfigsetup.DefaultBatchMaxConcurrentSend, pkgconfigsetup.DefaultBatchMaxSize, pkgconfigsetup.DefaultBatchMaxContentSize, pkgconfigsetup.DefaultInputChanSize)
endpoints, err := BuildHTTPEndpoints(suite.config, "test-track", "test-proto", "test-source")

suite.Nil(err)
Expand Down
21 changes: 20 additions & 1 deletion comp/logs/agent/config/endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,7 @@ type Endpoints struct {
Endpoints []Endpoint
UseProto bool
UseHTTP bool
UseGRPC bool
BatchWait time.Duration
BatchMaxConcurrentSend int
BatchMaxSize int
Expand All @@ -369,6 +370,23 @@ func NewEndpoints(main Endpoint, additionalEndpoints []Endpoint, useProto bool,
additionalEndpoints,
useProto,
useHTTP,
false, // useGRPC defaults to false for backward compatibility
pkgconfigsetup.DefaultBatchWait,
pkgconfigsetup.DefaultBatchMaxConcurrentSend,
pkgconfigsetup.DefaultBatchMaxSize,
pkgconfigsetup.DefaultBatchMaxContentSize,
pkgconfigsetup.DefaultInputChanSize,
)
}

// NewEndpointsWithGRPC returns a new endpoints composite with gRPC support
func NewEndpointsWithGRPC(main Endpoint, additionalEndpoints []Endpoint, useProto bool, useHTTP bool, useGRPC bool) *Endpoints {
return NewEndpointsWithBatchSettings(
main,
additionalEndpoints,
useProto,
useHTTP,
useGRPC,
pkgconfigsetup.DefaultBatchWait,
pkgconfigsetup.DefaultBatchMaxConcurrentSend,
pkgconfigsetup.DefaultBatchMaxSize,
Expand All @@ -378,12 +396,13 @@ func NewEndpoints(main Endpoint, additionalEndpoints []Endpoint, useProto bool,
}

// NewEndpointsWithBatchSettings returns a new endpoints composite with non-default batching settings specified
func NewEndpointsWithBatchSettings(main Endpoint, additionalEndpoints []Endpoint, useProto bool, useHTTP bool, batchWait time.Duration, batchMaxConcurrentSend int, batchMaxSize int, batchMaxContentSize int, inputChanSize int) *Endpoints {
func NewEndpointsWithBatchSettings(main Endpoint, additionalEndpoints []Endpoint, useProto bool, useHTTP bool, useGRPC bool, batchWait time.Duration, batchMaxConcurrentSend int, batchMaxSize int, batchMaxContentSize int, inputChanSize int) *Endpoints {
return &Endpoints{
Main: main,
Endpoints: append([]Endpoint{main}, additionalEndpoints...),
UseProto: useProto,
UseHTTP: useHTTP,
UseGRPC: useGRPC,
BatchWait: batchWait,
BatchMaxConcurrentSend: batchMaxConcurrentSend,
BatchMaxSize: batchMaxSize,
Expand Down
32 changes: 32 additions & 0 deletions comp/logs/agent/config/endpoints_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,24 @@ func (suite *EndpointsTestSuite) TestBuildEndpointsShouldSucceedWithValidHTTPCon
suite.Equal("agent-http-intake.logs.datadoghq.com.", endpoint.Host)
}

func (suite *EndpointsTestSuite) TestBuildEndpointsShouldSucceedWithValidGRPCConfig() {
var endpoints *Endpoints
var endpoint Endpoint
var err error

suite.config.SetWithoutSource("logs_config.use_grpc", true)

endpoints, err = BuildEndpoints(suite.config, HTTPConnectivityFailure, "test-track", "test-proto", "test-source")
suite.Nil(err)
suite.True(endpoints.UseGRPC)
suite.False(endpoints.UseHTTP)
suite.Equal(endpoints.BatchWait, 5*time.Second)

endpoint = endpoints.Main
suite.True(endpoint.UseSSL())
suite.Equal("agent-http-intake.logs.datadoghq.com.", endpoint.Host)
}

func (suite *EndpointsTestSuite) TestBuildEndpointsShouldSucceedWithValidHTTPConfigAndCompression() {
var endpoints *Endpoints
var endpoint Endpoint
Expand Down Expand Up @@ -259,6 +277,7 @@ func (suite *EndpointsTestSuite) TestBuildEndpointsShouldTakeIntoAccountHTTPConn
suite.config.SetWithoutSource("logs_config.force_use_tcp", "false")
suite.config.SetWithoutSource("logs_config.use_http", "false")
suite.config.SetWithoutSource("logs_config.force_use_http", "false")
suite.config.SetWithoutSource("logs_config.use_grpc", "false")
suite.config.SetWithoutSource("logs_config.socks5_proxy_address", "")
suite.config.SetWithoutSource("logs_config.additional_endpoints", []map[string]interface{}{})
}
Expand Down Expand Up @@ -329,6 +348,19 @@ func (suite *EndpointsTestSuite) TestBuildEndpointsShouldTakeIntoAccountHTTPConn
suite.config.SetWithoutSource("logs_config.socks5_proxy_address", "")
})

suite.Run("When use_grpc is true always create gRPC endpoints", func() {
defer resetHTTPConfigValuesToFalse()
suite.config.SetWithoutSource("logs_config.use_grpc", "true")
endpoints, err := BuildEndpoints(suite.config, HTTPConnectivitySuccess, "test-track", "test-proto", "test-source")
suite.Nil(err)
suite.True(endpoints.UseGRPC)
suite.False(endpoints.UseHTTP)
endpoints, err = BuildEndpoints(suite.config, HTTPConnectivityFailure, "test-track", "test-proto", "test-source")
suite.Nil(err)
suite.True(endpoints.UseGRPC)
suite.False(endpoints.UseHTTP)
})

suite.Run("When additional_endpoints is not empty always create TCP endpoints", func() {
defer resetHTTPConfigValuesToFalse()
suite.config.SetWithoutSource("logs_config.additional_endpoints", []map[string]interface{}{
Expand Down
4 changes: 4 additions & 0 deletions pkg/config/setup/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,9 @@ const (
// DefaultLogsSenderBackoffRecoveryInterval is the default logs sender backoff recovery interval
DefaultLogsSenderBackoffRecoveryInterval = 2

// DefaultLogsStreamLifetime is the default gRPC stream lifetime in seconds (15 minutes)
DefaultLogsStreamLifetime = 900

// maxExternalMetricsProviderChunkSize ensures batch queries are limited in size.
maxExternalMetricsProviderChunkSize = 35

Expand Down Expand Up @@ -2728,6 +2731,7 @@ func bindEnvAndSetLogsConfigKeys(config pkgconfigmodel.Setup, prefix string) {
config.BindEnvAndSetDefault(prefix+"sender_backoff_max", DefaultLogsSenderBackoffMax)
config.BindEnvAndSetDefault(prefix+"sender_recovery_interval", DefaultForwarderRecoveryInterval)
config.BindEnvAndSetDefault(prefix+"sender_recovery_reset", false)
config.BindEnvAndSetDefault(prefix+"stream_lifetime", DefaultLogsStreamLifetime)
config.BindEnvAndSetDefault(prefix+"use_v2_api", true)
config.SetKnown(prefix + "dev_mode_no_ssl") //nolint:forbidigo // TODO: replace by 'SetDefaultAndBindEnv'
}
Expand Down
12 changes: 11 additions & 1 deletion pkg/logs/message/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"time"

"github.com/DataDog/datadog-agent/pkg/logs/sources"
"github.com/DataDog/datadog-agent/pkg/proto/pbgo/statefulpb"
"github.com/DataDog/datadog-agent/pkg/util/log"
)

Expand All @@ -38,6 +39,8 @@ type Payload struct {
Encoding string
// The size of the unencoded payload
UnencodedSize int
// Extra information for Stateful gRPC streaming (batch-level state changes)
StatefulExtra any
}

// NewPayload creates a new payload with the given message metadata, encoded content, encoding type and unencoded size
Expand Down Expand Up @@ -70,6 +73,13 @@ type Message struct {
MessageMetadata
}

// StatefulMessage represents a log message for gRPC stateful streaming
// It contains a Datum (from stateful_encoding.proto) and associated metadata
type StatefulMessage struct {
Datum *statefulpb.Datum
Metadata *MessageMetadata
}

// MessageMetadata contains metadata information about a log message
//
//nolint:revive // exported: ignore package name struct conflict
Expand Down Expand Up @@ -125,7 +135,7 @@ type MessageContent struct { //nolint:revive
content []byte
// structured content
structuredContent StructuredContent
State MessageContentState
State MessageContentState
}

// MessageContentState is used to represent the MessageContent state.
Expand Down
Loading
Loading