diff --git a/plugins/inputs/prometheus_http/prometheus_http.go b/plugins/inputs/prometheus_http/prometheus_http.go index 5393bada2c205..82a0647f314a1 100644 --- a/plugins/inputs/prometheus_http/prometheus_http.go +++ b/plugins/inputs/prometheus_http/prometheus_http.go @@ -10,6 +10,7 @@ import ( "math" "net" "net/http" + "io" "os" "path/filepath" "sort" @@ -574,6 +575,50 @@ func (p *PrometheusHttp) setMetrics(w *sync.WaitGroup, pm *PrometheusHttpMetric, if ds != nil { period := p.getMetricPeriod(pm) callback(ds.GetData(pm.Query, period, push)) + dsHttpV1, ok := ds.(*PrometheusHttpV1) + if ok { + dsHttpV1.processRetryQueue(w, push, callback) + } + } +} + +func (p *PrometheusHttpV1) processRetryQueue(wg *sync.WaitGroup, push func(time.Time, map[string]string, time.Time, float64), callback func(*PrometheusHttpDatasourceResponse, error) ) { // Best effort processing, will not report errors or stop processing new requests if they arise + defer wg.Done() + requestBuffer := make([]http.Request, 15) + for req := range p.retryQueue { + requestBuffer = append(requestBuffer, req) + if len(requestBuffer) == 15 { + requestBufferCopy := make([]http.Request, len(requestBuffer)) + copy(requestBufferCopy, requestBuffer) + + for _, v := range requestBufferCopy { + resp, _ := p.client.Do(&v) + data, _ := io.ReadAll(resp.Body) + + var res PrometheusHttpV1Response + err := json.Unmarshal(data, &res) + if err != nil || res.Status != "success" || res.Data == nil { + continue + } + + dr := &PrometheusHttpDatasourceResponse{} + switch res.Data.ResultType { + case "matrix": + p.processMatrix(&res, time.Now(), push) + case "vector": + p.processVector(&res, time.Now(), push) + default: + dr.process = time.Since(time.Now()) + } + + callback(dr, nil) + + time.Sleep(20*time.Millisecond) + resp.Body.Close() + } + time.Sleep(2 * time.Second) // Exponential backoff may be good to have + requestBuffer = make([]http.Request, 15) + } } } diff --git a/plugins/inputs/prometheus_http/prometheus_http_v1.go b/plugins/inputs/prometheus_http/prometheus_http_v1.go index ccb3d486d376b..60fcbf6fb4e4f 100644 --- a/plugins/inputs/prometheus_http/prometheus_http_v1.go +++ b/plugins/inputs/prometheus_http/prometheus_http_v1.go @@ -37,6 +37,7 @@ type PrometheusHttpV1 struct { log telegraf.Logger ctx context.Context client *http.Client + retryQueue chan http.Request name string url string user string @@ -74,6 +75,9 @@ func (p *PrometheusHttpV1) httpDoRequest(method, query string, params url.Values } resp, err := p.client.Do(req) + if resp.StatusCode == 429 { + p.retryQueue <- *req.Clone(ctx) + } if err != nil { return nil, 0, err } @@ -261,6 +265,7 @@ func NewPrometheusHttpV1(client *http.Client, name string, log telegraf.Logger, log: log, ctx: ctx, client: client, + retryQueue: make(chan http.Request, 100), url: url, user: user, password: password,