From 40d19e0ba7cf954d22ba3a562a092cdc31ae1104 Mon Sep 17 00:00:00 2001 From: Joshua Smith Date: Mon, 22 Jun 2026 13:13:45 -0600 Subject: [PATCH] fix(circleci): skip HTTP 500 on workflow/job collectors and filter by time range * Skip 500 (corrupt CircleCI Server records) alongside 404 in AfterResponse hook so a single bad pipeline does not abort the entire subtask * Apply SyncPolicy.TimeAfter to workflow and job DB iterators on full sync to avoid calling the API for every historical tool-layer row Signed-off-by: Joshua Smith --- .../plugins/circleci/tasks/job_collector.go | 10 +-- backend/plugins/circleci/tasks/shared.go | 13 ++-- backend/plugins/circleci/tasks/shared_test.go | 62 +++++++++++++++++++ .../circleci/tasks/workflow_collector.go | 10 +-- 4 files changed, 83 insertions(+), 12 deletions(-) create mode 100644 backend/plugins/circleci/tasks/shared_test.go diff --git a/backend/plugins/circleci/tasks/job_collector.go b/backend/plugins/circleci/tasks/job_collector.go index 00fd234524f..ca158c1b934 100644 --- a/backend/plugins/circleci/tasks/job_collector.go +++ b/backend/plugins/circleci/tasks/job_collector.go @@ -73,8 +73,10 @@ func CollectJobs(taskCtx plugin.SubTaskContext) errors.Error { dal.Where("connection_id = ? and project_slug = ?", data.Options.ConnectionId, data.Options.ProjectSlug), } - if isIncremental { - clauses = append(clauses, dal.Where("created_date > ?", createdAfter)) + // Incremental: workflows newer than last successful collectJobs. + // Full sync: workflows within SyncPolicy.TimeAfter window. + if createdAfter != nil { + clauses = append(clauses, dal.Where("created_date >= ?", createdAfter)) } db := taskCtx.GetDal() @@ -88,7 +90,7 @@ func CollectJobs(taskCtx plugin.SubTaskContext) errors.Error { UrlTemplate: "/v2/workflow/{{ .Input.Id }}/job", Query: BuildQueryParamsWithPageToken, ResponseParser: ParseCircleciPageTokenResp, - AfterResponse: ignoreDeletedBuilds, // Ignore the 404 response if a workflow has been deleted + AfterResponse: ignoreDeletedOrBrokenBuilds, }, GetCreated: func(item json.RawMessage) (time.Time, errors.Error) { var job struct { // Individual job response lacks created_at field, so have to use started_at @@ -105,7 +107,7 @@ func CollectJobs(taskCtx plugin.SubTaskContext) errors.Error { UrlTemplate: "/v2/workflow/{{ .Input.Id }}/job", // The individual job endpoint has different fields so need to recollect all jobs for a workflow Query: BuildQueryParamsWithPageToken, ResponseParser: ParseCircleciPageTokenResp, - AfterResponse: ignoreDeletedBuilds, + AfterResponse: ignoreDeletedOrBrokenBuilds, }, BuildInputIterator: func() (api.Iterator, errors.Error) { db := taskCtx.GetDal() diff --git a/backend/plugins/circleci/tasks/shared.go b/backend/plugins/circleci/tasks/shared.go index 6e235ecd6b0..d6bbee8120f 100644 --- a/backend/plugins/circleci/tasks/shared.go +++ b/backend/plugins/circleci/tasks/shared.go @@ -123,10 +123,15 @@ func ParseCircleciPageTokenResp(res *http.Response) ([]json.RawMessage, errors.E return data.Items, err } -func ignoreDeletedBuilds(res *http.Response) errors.Error { - // CircleCI API will return a 404 response for a workflow/job that has been deleted - // due to their data retention policy. We should ignore these errors. - if res.StatusCode == http.StatusNotFound { +// ignoreDeletedOrBrokenBuilds skips per-item API failures that should not +// abort an entire collector subtask. 404 means the resource was deleted +// (retention); 500 means the CircleCI Server record is corrupt/stuck +// (e.g. pipeline exists but its /workflow endpoint errors). +func ignoreDeletedOrBrokenBuilds(res *http.Response) errors.Error { + switch res.StatusCode { + case http.StatusNotFound: + return api.ErrIgnoreAndContinue + case http.StatusInternalServerError: return api.ErrIgnoreAndContinue } return nil diff --git a/backend/plugins/circleci/tasks/shared_test.go b/backend/plugins/circleci/tasks/shared_test.go new file mode 100644 index 00000000000..a6c55e9c9c7 --- /dev/null +++ b/backend/plugins/circleci/tasks/shared_test.go @@ -0,0 +1,62 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package tasks + +import ( + "bytes" + "io" + "net/http" + "testing" + + "github.com/apache/incubator-devlake/helpers/pluginhelper/api" + "github.com/stretchr/testify/assert" +) + +func makeResponse(statusCode int) *http.Response { + return &http.Response{ + StatusCode: statusCode, + Body: io.NopCloser(bytes.NewBufferString("")), + Request: &http.Request{}, + } +} + +func TestIgnoreDeletedOrBrokenBuilds(t *testing.T) { + tests := []struct { + name string + statusCode int + want error + }{ + {"404 returns ErrIgnoreAndContinue", http.StatusNotFound, api.ErrIgnoreAndContinue}, + {"500 returns ErrIgnoreAndContinue", http.StatusInternalServerError, api.ErrIgnoreAndContinue}, + {"200 returns nil", http.StatusOK, nil}, + {"403 returns nil", http.StatusForbidden, nil}, + {"502 returns nil", http.StatusBadGateway, nil}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + res := makeResponse(tt.statusCode) + got := ignoreDeletedOrBrokenBuilds(res) + if tt.want == nil { + assert.Nil(t, got) + } else { + assert.Equal(t, tt.want, got) + } + }) + } +} + diff --git a/backend/plugins/circleci/tasks/workflow_collector.go b/backend/plugins/circleci/tasks/workflow_collector.go index f0f3aebe5c9..25c5c727d36 100644 --- a/backend/plugins/circleci/tasks/workflow_collector.go +++ b/backend/plugins/circleci/tasks/workflow_collector.go @@ -60,8 +60,10 @@ func CollectWorkflows(taskCtx plugin.SubTaskContext) errors.Error { dal.Where("connection_id = ? AND project_slug = ?", data.Options.ConnectionId, data.Options.ProjectSlug), } - if isIncremental { - clauses = append(clauses, dal.Where("created_date > ?", createdAfter)) + // Incremental: pipelines newer than last successful collectWorkflows. + // Full sync: pipelines within SyncPolicy.TimeAfter window. + if createdAfter != nil { + clauses = append(clauses, dal.Where("created_date >= ?", createdAfter)) } db := taskCtx.GetDal() @@ -75,7 +77,7 @@ func CollectWorkflows(taskCtx plugin.SubTaskContext) errors.Error { UrlTemplate: "/v2/pipeline/{{ .Input.Id }}/workflow", Query: BuildQueryParamsWithPageToken, ResponseParser: ParseCircleciPageTokenResp, - AfterResponse: ignoreDeletedBuilds, // Ignore the 404 response if a workflow has been deleted + AfterResponse: ignoreDeletedOrBrokenBuilds, }, GetCreated: extractCreatedAt, }, @@ -88,7 +90,7 @@ func CollectWorkflows(taskCtx plugin.SubTaskContext) errors.Error { err := api.UnmarshalResponse(res, &data) return []json.RawMessage{data}, err }, - AfterResponse: ignoreDeletedBuilds, + AfterResponse: ignoreDeletedOrBrokenBuilds, }, BuildInputIterator: func() (api.Iterator, errors.Error) { clauses := []dal.Clause{