Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 64 additions & 6 deletions backend/helpers/pluginhelper/api/graphql_async_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,14 @@ package api
import (
"context"
"fmt"
"strconv"
"sync"
"time"

"github.com/apache/incubator-devlake/core/errors"
"github.com/apache/incubator-devlake/core/log"
"github.com/apache/incubator-devlake/core/plugin"
"github.com/apache/incubator-devlake/core/utils"
"sync"
"time"

"github.com/merico-ai/graphql"
)
Expand All @@ -47,30 +49,52 @@ type GraphqlAsyncClient struct {
getRateCost func(q interface{}) int
}

// defaultRateLimitConst is the generic fallback rate limit for GraphQL requests.
// It seeds the client's remaining quota when the client is constructed, before
// any provider-reported value is available, and it is the floor applied to the
// reused quota when a periodic rate-limit refresh returns an error.
const defaultRateLimitConst = 1000

// CreateAsyncGraphqlClient creates a new GraphqlAsyncClient
func CreateAsyncGraphqlClient(
taskCtx plugin.TaskContext,
graphqlClient *graphql.Client,
logger log.Logger,
getRateRemaining func(context.Context, *graphql.Client, log.Logger) (rateRemaining int, resetAt *time.Time, err errors.Error),
opts ...func(*GraphqlAsyncClient),
) (*GraphqlAsyncClient, errors.Error) {
ctxWithCancel, cancel := context.WithCancel(taskCtx.GetContext())

graphqlAsyncClient := &GraphqlAsyncClient{
ctx: ctxWithCancel,
cancel: cancel,
client: graphqlClient,
logger: logger,
rateExhaustCond: sync.NewCond(&sync.Mutex{}),
rateRemaining: 0,
rateRemaining: defaultRateLimitConst,
getRateRemaining: getRateRemaining,
}

// apply options
for _, opt := range opts {
opt(graphqlAsyncClient)
}

// Env config wins over everything, only if explicitly set
if rateLimit := resolveRateLimit(taskCtx, logger); rateLimit != -1 {
logger.Info("GRAPHQL_RATE_LIMIT env override applied: %d (was %d)", rateLimit, graphqlAsyncClient.rateRemaining)
graphqlAsyncClient.rateRemaining = rateLimit
}

if getRateRemaining != nil {
rateRemaining, resetAt, err := getRateRemaining(taskCtx.GetContext(), graphqlClient, logger)
if err != nil {
panic(err)
graphqlAsyncClient.logger.Info("failed to fetch initial graphql rate limit, fallback to default: %v", err)
graphqlAsyncClient.updateRateRemaining(graphqlAsyncClient.rateRemaining, nil)
} else {
graphqlAsyncClient.updateRateRemaining(rateRemaining, resetAt)
}
graphqlAsyncClient.updateRateRemaining(rateRemaining, resetAt)
} else {
graphqlAsyncClient.updateRateRemaining(graphqlAsyncClient.rateRemaining, nil)
}

// load retry/timeout from configuration
Expand Down Expand Up @@ -115,6 +139,10 @@ func (apiClient *GraphqlAsyncClient) updateRateRemaining(rateRemaining int, rese
apiClient.rateExhaustCond.Signal()
}
go func() {
if apiClient.getRateRemaining == nil {
return
}

nextDuring := 3 * time.Minute
if resetAt != nil && resetAt.After(time.Now()) {
nextDuring = time.Until(*resetAt)
Expand All @@ -126,7 +154,15 @@ func (apiClient *GraphqlAsyncClient) updateRateRemaining(rateRemaining int, rese
case <-time.After(nextDuring):
newRateRemaining, newResetAt, err := apiClient.getRateRemaining(apiClient.ctx, apiClient.client, apiClient.logger)
if err != nil {
panic(err)
apiClient.logger.Info("failed to update graphql rate limit, will retry next cycle: %v", err)
// Floor the reused value so Signal() always fires; prevents deadlock when
// rateRemaining is 0 and the rate-limit endpoint keeps erroring (e.g. GHE).
fallback := apiClient.rateRemaining
if fallback < defaultRateLimitConst {
fallback = defaultRateLimitConst
}
apiClient.updateRateRemaining(fallback, nil)
return
}
apiClient.updateRateRemaining(newRateRemaining, newResetAt)
}
Expand Down Expand Up @@ -218,3 +254,25 @@ func (apiClient *GraphqlAsyncClient) Wait() {
func (apiClient *GraphqlAsyncClient) Release() {
// Cancel the client's own context, i.e. the cancellable context derived from
// the task context when the client was created.
apiClient.cancel()
}

// WithFallbackRateLimit builds a construction option that seeds the client's
// remaining quota with limit, for use when rate limit information cannot be
// fetched dynamically.
//
// Non-positive limits are ignored, leaving whatever quota the client already
// holds. The seeded value is only a starting point: it may be replaced later by
// the value reported through getRateRemaining.
func WithFallbackRateLimit(limit int) func(*GraphqlAsyncClient) {
	applyFallback := func(client *GraphqlAsyncClient) {
		if limit <= 0 {
			// Nothing usable was supplied; keep the existing quota.
			return
		}
		client.rateRemaining = limit
	}
	return applyFallback
}

// resolveRateLimit reads the GRAPHQL_RATE_LIMIT setting from the task
// configuration and returns it as a positive integer.
//
// It returns -1, meaning "no override", when the setting is empty, is not a
// valid integer, or is not strictly positive. Zero and negative values are
// rejected because the caller stores the result directly as the client's
// remaining quota; this also matches WithFallbackRateLimit, which ignores
// non-positive limits. A warning naming the offending value is logged whenever
// a non-empty setting is rejected.
func resolveRateLimit(taskCtx plugin.TaskContext, logger log.Logger) int {
	raw := taskCtx.GetConfig("GRAPHQL_RATE_LIMIT")
	if raw == "" {
		// Not configured: the caller keeps its current quota.
		return -1
	}

	parsed, err := strconv.Atoi(raw)
	if err != nil || parsed <= 0 {
		logger.Warn(nil, "invalid GRAPHQL_RATE_LIMIT %q, using default", raw)
		return -1
	}
	return parsed
}
82 changes: 82 additions & 0 deletions backend/plugins/circleci/e2e/job_collector_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package e2e

import (
"reflect"
"sort"
"testing"

"github.com/apache/incubator-devlake/helpers/e2ehelper"
"github.com/apache/incubator-devlake/helpers/pluginhelper/api"
"github.com/apache/incubator-devlake/plugins/circleci/impl"
"github.com/apache/incubator-devlake/plugins/circleci/models"
"github.com/apache/incubator-devlake/plugins/circleci/tasks"
"github.com/stretchr/testify/assert"
)

// TestCircleciUnfinishedJobsInputIterator is a regression test for
// https://github.com/apache/devlake/issues/8907. The "collect unfinished job
// details" collector builds its URL from "/v2/workflow/{{ .Input.Id }}/job" while
// scanning rows into a models.CircleciJob. Its input query must therefore expose the
// workflow id in the row's Id field; a bare "DISTINCT workflow_id" left Id empty and
// produced "/v2/workflow//job" (HTTP 500). This test runs the production query
// (tasks.UnfinishedJobsInputClauses) through the real iterator and asserts each
// yielded row's Id is the workflow id, that results are DISTINCT, and that the
// status/connection filters hold.
//
// Setup failures (seeding, opening the cursor, building the iterator, fetching a
// row) stop the test immediately, so a broken fixture is reported as that failure
// rather than as a follow-on panic on an unusable cursor or iterator.
func TestCircleciUnfinishedJobsInputIterator(t *testing.T) {
	var circleci impl.Circleci
	dataflowTester := e2ehelper.NewDataFlowTester(t, "circleci", circleci)

	const projectSlug = "github/test/repo"
	dataflowTester.FlushTabler(&models.CircleciJob{})

	seed := []models.CircleciJob{
		{ConnectionId: 1, WorkflowId: "wf-onhold", Id: "job-1", ProjectSlug: projectSlug, Status: "on_hold"},
		{ConnectionId: 1, WorkflowId: "wf-onhold", Id: "job-2", ProjectSlug: projectSlug, Status: "running"}, // same workflow -> DISTINCT
		{ConnectionId: 1, WorkflowId: "wf-queued", Id: "job-3", ProjectSlug: projectSlug, Status: "queued"},
		{ConnectionId: 1, WorkflowId: "wf-success", Id: "job-4", ProjectSlug: projectSlug, Status: "success"},   // terminal -> excluded
		{ConnectionId: 2, WorkflowId: "wf-otherconn", Id: "job-5", ProjectSlug: projectSlug, Status: "on_hold"}, // other connection -> excluded
	}
	for i := range seed {
		if !assert.Nil(t, dataflowTester.Dal.Create(&seed[i])) {
			t.FailNow()
		}
	}

	cursor, err := dataflowTester.Dal.Cursor(tasks.UnfinishedJobsInputClauses(1, projectSlug)...)
	if !assert.Nil(t, err) {
		t.FailNow()
	}
	iter, err := api.NewDalCursorIterator(dataflowTester.Dal, cursor, reflect.TypeOf(models.CircleciJob{}))
	if !assert.Nil(t, err) {
		t.FailNow()
	}
	defer iter.Close()

	var ids []string
	for iter.HasNext() {
		item, err := iter.Fetch()
		if !assert.Nil(t, err) {
			t.FailNow()
		}
		// Guard the conversion so an unexpected row type fails the test instead of panicking.
		job, ok := item.(*models.CircleciJob)
		if !assert.True(t, ok, "iterator yielded %T, want *models.CircleciJob", item) {
			t.FailNow()
		}
		ids = append(ids, job.Id)
	}
	sort.Strings(ids)

	// Distinct workflow ids for connection 1's non-terminal jobs, with Id populated
	// (the URL template reads .Input.Id). wf-success (terminal) and wf-otherconn
	// (connection 2) are excluded.
	assert.Equal(t, []string{"wf-onhold", "wf-queued"}, ids)
	for _, id := range ids {
		assert.NotEmpty(t, id, "Input.Id must be the workflow id, not empty (#8907)")
	}
}
22 changes: 15 additions & 7 deletions backend/plugins/circleci/tasks/job_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,20 @@ var CollectJobsMeta = plugin.SubTaskMeta{
DomainTypes: []string{plugin.DOMAIN_TYPE_CICD},
}

// UnfinishedJobsInputClauses builds the DAL query used as input for the CollectJobs
// "unfinished details" collector: one row per workflow, for the given connection
// and project, that has at least one job in a non-terminal status and so needs its
// job details recollected.
func UnfinishedJobsInputClauses(connectionId uint64, projectSlug string) []dal.Clause {
	// #8907: the workflow id is aliased to id so that {{ .Input.Id }} resolves when
	// the row is scanned into a CircleciJob; DISTINCT yields each workflow once.
	workflowAsId := dal.Select("DISTINCT workflow_id AS id")
	jobsTable := dal.From(&models.CircleciJob{})
	unfinishedOnly := dal.Where(
		"connection_id = ? AND project_slug = ? AND status IN ('running', 'not_running', 'queued', 'on_hold')",
		connectionId, projectSlug,
	)
	return []dal.Clause{workflowAsId, jobsTable, unfinishedOnly}
}

func CollectJobs(taskCtx plugin.SubTaskContext) errors.Error {
rawDataSubTaskArgs, data := CreateRawDataSubTaskArgs(taskCtx, RAW_JOB_TABLE)
logger := taskCtx.GetLogger()
Expand Down Expand Up @@ -94,14 +108,8 @@ func CollectJobs(taskCtx plugin.SubTaskContext) errors.Error {
AfterResponse: ignoreDeletedBuilds,
},
BuildInputIterator: func() (api.Iterator, errors.Error) {
clauses := []dal.Clause{
dal.Select("DISTINCT workflow_id"), // Only need to recollect jobs for a workflow once
dal.From(&models.CircleciJob{}),
dal.Where("connection_id = ? AND project_slug = ? AND status IN ('running', 'not_running', 'queued', 'on_hold')", data.Options.ConnectionId, data.Options.ProjectSlug),
}

db := taskCtx.GetDal()
cursor, err := db.Cursor(clauses...)
cursor, err := db.Cursor(UnfinishedJobsInputClauses(data.Options.ConnectionId, data.Options.ProjectSlug)...)
if err != nil {
return nil, err
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
connection_id,scope_id,day,enterprise_id,daily_active_users,weekly_active_users,monthly_active_users,monthly_active_chat_users,monthly_active_agent_users,pr_total_reviewed,pr_total_created,pr_total_created_by_copilot,pr_total_reviewed_by_copilot,user_initiated_interaction_count,code_generation_activity_count,code_acceptance_activity_count,loc_suggested_to_add_sum,loc_suggested_to_delete_sum,loc_added_sum,loc_deleted_sum
1,octodemo,2025-09-01T00:00:00.000+00:00,,10,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0
1,octodemo,2025-09-02T00:00:00.000+00:00,,12,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0
connection_id,scope_id,day,enterprise_id,daily_active_users,weekly_active_users,monthly_active_users,monthly_active_chat_users,monthly_active_agent_users,daily_active_cli_users,daily_active_copilot_code_review_users,daily_passive_copilot_code_review_users,weekly_active_copilot_code_review_users,weekly_passive_copilot_code_review_users,monthly_active_copilot_code_review_users,monthly_passive_copilot_code_review_users,chat_panel_agent_mode,chat_panel_ask_mode,chat_panel_custom_mode,chat_panel_edit_mode,chat_panel_plan_mode,chat_panel_unknown_mode,pr_total_reviewed,pr_total_created,pr_total_merged,pr_median_minutes_to_merge,pr_total_suggestions,pr_total_applied_suggestions,pr_total_created_by_copilot,pr_total_reviewed_by_copilot,pr_total_merged_created_by_copilot,pr_total_merged_reviewed_by_copilot,pr_median_min_to_merge_copilot_authored,pr_median_min_to_merge_copilot_reviewed,pr_total_copilot_suggestions,pr_total_copilot_applied_suggestions,user_initiated_interaction_count,code_generation_activity_count,code_acceptance_activity_count,loc_suggested_to_add_sum,loc_suggested_to_delete_sum,loc_added_sum,loc_deleted_sum,cli_session_count,cli_request_count,cli_prompt_count,cli_output_token_sum,cli_prompt_token_sum
1,octodemo,2025-09-01T00:00:00.000+00:00,,10,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0
1,octodemo,2025-09-02T00:00:00.000+00:00,,12,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
connection_id,organization,user_login,user_id,plan_type,created_at,last_activity_at,last_activity_editor,last_authenticated_at,pending_cancellation_date,updated_at
1,octodemo,nathos,4215,enterprise,2023-08-28T23:50:42.000+00:00,2025-11-06T16:12:15.000+00:00,copilot_pr_review,2025-12-04T15:53:22.000+00:00,,2024-02-01T00:00:00.000+00:00
1,octodemo,octocat,1,enterprise,2024-01-10T10:11:12.000+00:00,,vscode/1.0.0/copilot-chat/0.1.0,,,2024-02-02T00:00:00.000+00:00
connection_id,organization,user_login,user_id,user_name,user_email,plan_type,assigning_team_id,assigning_team_name,assigning_team_slug,created_at,last_activity_at,last_activity_editor,last_authenticated_at,pending_cancellation_date,updated_at
1,octodemo,nathos,4215,,,enterprise,0,,,2023-08-28T23:50:42.000+00:00,2025-11-06T16:12:15.000+00:00,copilot_pr_review,2025-12-04T15:53:22.000+00:00,,2024-02-01T00:00:00.000+00:00
1,octodemo,octocat,1,,,enterprise,0,,,2024-01-10T10:11:12.000+00:00,,vscode/1.0.0/copilot-chat/0.1.0,,,2024-02-02T00:00:00.000+00:00
48 changes: 44 additions & 4 deletions backend/plugins/gh-copilot/models/enterprise_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,15 @@ type CopilotCodeMetrics struct {
LocDeletedSum int `json:"locDeletedSum"`
}

// CopilotCliMetrics contains CLI usage breakdown metrics.
// All fields are plain integer counters or sums. Each is serialized to JSON under
// its camelCase name and carries a gorm column comment describing the value.
type CopilotCliMetrics struct {
// Session, request and prompt counters.
CliSessionCount int `json:"cliSessionCount" gorm:"comment:Number of CLI sessions"`
CliRequestCount int `json:"cliRequestCount" gorm:"comment:Number of CLI requests"`
CliPromptCount int `json:"cliPromptCount" gorm:"comment:Number of CLI prompts"`
// Token totals, split into output tokens and prompt tokens.
CliOutputTokenSum int `json:"cliOutputTokenSum" gorm:"comment:Total output tokens from CLI"`
CliPromptTokenSum int `json:"cliPromptTokenSum" gorm:"comment:Total prompt tokens from CLI"`
}

// GhCopilotEnterpriseDailyMetrics captures daily enterprise-level aggregate Copilot metrics.
type GhCopilotEnterpriseDailyMetrics struct {
ConnectionId uint64 `gorm:"primaryKey" json:"connectionId"`
Expand All @@ -57,12 +66,43 @@ type GhCopilotEnterpriseDailyMetrics struct {
MonthlyActiveChatUsers int `json:"monthlyActiveChatUsers"`
MonthlyActiveAgentUsers int `json:"monthlyActiveAgentUsers"`

PRTotalReviewed int `json:"prTotalReviewed" gorm:"comment:Total PRs reviewed"`
PRTotalCreated int `json:"prTotalCreated" gorm:"comment:Total PRs created"`
PRTotalCreatedByCopilot int `json:"prTotalCreatedByCopilot" gorm:"comment:PRs created by Copilot"`
PRTotalReviewedByCopilot int `json:"prTotalReviewedByCopilot" gorm:"comment:PRs reviewed by Copilot"`
// CLI active users
DailyActiveCliUsers int `json:"dailyActiveCliUsers" gorm:"comment:Daily active CLI users"`

// Code review user counts
DailyActiveCopilotCodeReviewUsers int `json:"dailyActiveCopilotCodeReviewUsers"`
DailyPassiveCopilotCodeReviewUsers int `json:"dailyPassiveCopilotCodeReviewUsers"`
WeeklyActiveCopilotCodeReviewUsers int `json:"weeklyActiveCopilotCodeReviewUsers"`
WeeklyPassiveCopilotCodeReviewUsers int `json:"weeklyPassiveCopilotCodeReviewUsers"`
MonthlyActiveCopilotCodeReviewUsers int `json:"monthlyActiveCopilotCodeReviewUsers"`
MonthlyPassiveCopilotCodeReviewUsers int `json:"monthlyPassiveCopilotCodeReviewUsers"`

// Chat panel mode breakdown
ChatPanelAgentMode int `json:"chatPanelAgentMode" gorm:"comment:Chat panel agent mode interactions"`
ChatPanelAskMode int `json:"chatPanelAskMode" gorm:"comment:Chat panel ask mode interactions"`
ChatPanelCustomMode int `json:"chatPanelCustomMode" gorm:"comment:Chat panel custom mode interactions"`
ChatPanelEditMode int `json:"chatPanelEditMode" gorm:"comment:Chat panel edit mode interactions"`
ChatPanelPlanMode int `json:"chatPanelPlanMode" gorm:"comment:Chat panel plan mode interactions"`
ChatPanelUnknownMode int `json:"chatPanelUnknownMode" gorm:"comment:Chat panel unknown mode interactions"`

// Pull request metrics (expanded)
PRTotalReviewed int `json:"prTotalReviewed" gorm:"comment:Total PRs reviewed"`
PRTotalCreated int `json:"prTotalCreated" gorm:"comment:Total PRs created"`
PRTotalMerged int `json:"prTotalMerged" gorm:"comment:Total PRs merged"`
PRMedianMinutesToMerge float64 `json:"prMedianMinutesToMerge" gorm:"comment:Median minutes to merge PRs"`
PRTotalSuggestions int `json:"prTotalSuggestions" gorm:"comment:Total PR review suggestions"`
PRTotalAppliedSuggestions int `json:"prTotalAppliedSuggestions" gorm:"comment:Total applied PR suggestions"`
PRTotalCreatedByCopilot int `json:"prTotalCreatedByCopilot" gorm:"comment:PRs created by Copilot"`
PRTotalReviewedByCopilot int `json:"prTotalReviewedByCopilot" gorm:"comment:PRs reviewed by Copilot"`
PRTotalMergedCreatedByCopilot int `json:"prTotalMergedCreatedByCopilot" gorm:"comment:Merged PRs created by Copilot"`
PRTotalMergedReviewedByCopilot int `json:"prTotalMergedReviewedByCopilot" gorm:"comment:Merged PRs reviewed by Copilot"`
PRMedianMinToMergeCopilotAuthored float64 `json:"prMedianMinToMergeCopilotAuthored" gorm:"comment:Median min to merge Copilot-authored PRs"`
PRMedianMinToMergeCopilotReviewed float64 `json:"prMedianMinToMergeCopilotReviewed" gorm:"comment:Median min to merge Copilot-reviewed PRs"`
PRTotalCopilotSuggestions int `json:"prTotalCopilotSuggestions" gorm:"comment:Total Copilot review suggestions"`
PRTotalCopilotAppliedSuggestions int `json:"prTotalCopilotAppliedSuggestions" gorm:"comment:Total Copilot applied suggestions"`

CopilotActivityMetrics `mapstructure:",squash"`
CopilotCliMetrics `mapstructure:",squash"`
common.NoPKModel
}

Expand Down
Loading