Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
4ea7d34
feat(scanner): redesign scanner as long-running gRPC service with tar…
r2dedios May 28, 2026
ec52152
fix(compose): ensure API waits for DB initialization
r2dedios May 29, 2026
01218a7
fix(scanner): add API readiness check, account seeding, and billing o…
r2dedios May 29, 2026
4deaec0
fix(api): use lightweight struct for expense update instances
r2dedios May 29, 2026
e980741
fix(db): handle NULL columns in system events and schedule models
r2dedios May 29, 2026
2126037
feat(events): add Account resource type to audit event system
r2dedios May 29, 2026
2c62af5
feat(db): add cluster and account names to schedule view
r2dedios May 29, 2026
af023e2
feat(console): replace ModalPowerManagement with ModalCreateAction
r2dedios May 29, 2026
8417c60
fix(console): fix resource path resolution and handle empty resourceId
r2dedios May 29, 2026
05af239
feat(console): add ResourceBadge and ResourceLabel with styled action…
r2dedios May 29, 2026
33c5812
feat(console): unify Target column and add resource labels to detail …
r2dedios May 29, 2026
ae821f1
refactor(db): rename triggered_by to requester and add schedule metad…
r2dedios Jun 1, 2026
f77483b
refactor(backend): rename TriggeredBy to Requester and persist schedu…
r2dedios Jun 1, 2026
752006a
test: update tests for requester rename and enriched event models
r2dedios Jun 1, 2026
26b09c4
feat(console): redesign Audit Logs and Scheduler tables
r2dedios Jun 1, 2026
3d7527a
feat(console): add Scan action dropdown to Account Details
r2dedios Jun 1, 2026
73f1d19
fix(console): align Recent Events table with Audit Logs rendering
r2dedios Jun 1, 2026
7762795
fix(console): add error handling to action create API calls
r2dedios Jun 1, 2026
0b385da
fix(lint): align struct field formatting in action models
r2dedios Jun 1, 2026
08dcd09
fix(scanner): extract magic numbers into named constants
r2dedios Jun 1, 2026
426f3c0
refactor(agent): extract dispatchActionLocked to reduce cyclomatic co…
r2dedios Jun 1, 2026
ff1ab17
fix(ci): remove depends_on and healthcheck incompatible with CI podman
r2dedios Jun 1, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ AGENT_IMG_NAME ?= $(PROJECT_NAME)-agent
AGENT_IMAGE ?= $(REGISTRY)/$(REGISTRY_REPO)/$(AGENT_IMG_NAME)
AGENT_CONTAINERFILE ?= ./$(DEPLOYMENTS_DIR)/containerfiles/Containerfile-agent
AGENT_PROTO_PATH ?= ./cmd/agent/proto/agent.proto
SCANNER_PROTO_PATH ?= ./cmd/scanner/proto/scanner.proto
PGSQL_IMG_NAME ?= $(PROJECT_NAME)-pgsql
PGSQL_IMAGE ?= $(REGISTRY)/$(REGISTRY_REPO)/$(PGSQL_IMG_NAME)
PGSQL_CONTAINERFILE ?= ./$(DEPLOYMENTS_DIR)/containerfiles/Containerfile-pgsql
Expand Down Expand Up @@ -81,6 +82,8 @@ local-build-api: generate-converters swagger-doc ## Build the API binary

# Generates the scanner gRPC stubs from $(SCANNER_PROTO_PATH) into
# $(GENERATED_DIR) and then compiles the scanner binary.
# `mkdir -p` is used so that an existing directory is not an error, missing
# parent directories are created, and a genuine mkdir failure stops the build
# instead of being masked.
local-build-scanner: ## Build the scanner binary
	@echo "### [Building Scanner] ###"
	@mkdir -p $(GENERATED_DIR)
	@$(PROTOC) --go_out=$(GENERATED_DIR) --go-grpc_out=$(GENERATED_DIR) $(SCANNER_PROTO_PATH)
	@$(GO) build -o $(BIN_DIR)/scanners/scanner $(LDFLAGS) ./cmd/scanner

local-build-agent: ## Build the agent binary
Expand Down
103 changes: 95 additions & 8 deletions cmd/agent/executor_agent_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@ import (
"crypto/tls"
"fmt"
"net/http"
"strconv"
"sync"

"github.com/RHEcosystemAppEng/cluster-iq/internal/actions"
"github.com/RHEcosystemAppEng/cluster-iq/internal/clients"
cexec "github.com/RHEcosystemAppEng/cluster-iq/internal/cloud_executors"
"github.com/RHEcosystemAppEng/cluster-iq/internal/config"
"github.com/RHEcosystemAppEng/cluster-iq/internal/credentials"
Expand All @@ -28,6 +30,8 @@ type ExecutorAgentService struct {
client http.Client // HTTP Client for retrieving the schedule from API
eventService *eventservice.EventService // Service for handling audit logs
actionRepo repositories.ActionRepository
scannerClient *clients.ScannerGRPCClient // gRPC client for the Scanner service
actionRunRepo repositories.ActionRunRepository
}

// NewExecutorAgentService creates and initializes a new AgentCron instance for managing the scheduled actions
Expand Down Expand Up @@ -57,6 +61,13 @@ func NewExecutorAgentService(cfg *config.ExecutorAgentServiceConfig, actionsChan

eventService := eventservice.NewEventService(db, logger)
actionRepo := repositories.NewActionRepository(db)
actionRunRepo := repositories.NewActionRunRepository(db)

scannerClient, err := clients.NewScannerGRPCClient(cfg.ScannerURL, logger)
if err != nil {
logger.Error("Failed to create Scanner gRPC client", zap.Error(err))
return nil
}

eas := ExecutorAgentService{
cfg: cfg,
Expand All @@ -66,9 +77,11 @@ func NewExecutorAgentService(cfg *config.ExecutorAgentServiceConfig, actionsChan
logger: logger,
wg: wg,
},
client: client,
eventService: eventService,
actionRepo: actionRepo,
client: client,
eventService: eventService,
actionRepo: actionRepo,
scannerClient: scannerClient,
actionRunRepo: actionRunRepo,
}

// Reading credentials file and creating executors per account
Expand Down Expand Up @@ -198,25 +211,46 @@ func (e *ExecutorAgentService) processAction(action actions.Action) {
zap.Any("requester", action.GetRequester()),
)

// Initialize event tracker
target := action.GetTarget()

resourceType := inventory.ClusterResourceType
resourceID := target.ClusterID
if action.GetActionOperation() == actions.Scan {
resourceType = inventory.AccountResourceType
if len(target.TargetAccountIDs) > 0 {
resourceID = target.TargetAccountIDs[0]
}
}

var scheduleID *int64
if sid, err := strconv.ParseInt(action.GetID(), 10, 64); err == nil {
scheduleID = &sid
}

tracker := e.eventService.StartTracking(&eventservice.EventOptions{
Action: action.GetActionOperation(),
Description: action.GetDescription(),
ResourceID: action.GetTarget().ClusterID,
ResourceType: inventory.ClusterResourceType,
ResourceID: resourceID,
ResourceType: resourceType,
Result: eventservice.ResultPending,
Severity: eventservice.SeverityInfo,
TriggeredBy: action.GetRequester(),
Requester: action.GetRequester(),
ScheduleID: scheduleID,
})

if action.GetActionOperation() == actions.Scan {
e.processScanAction(action, tracker)
return
}

// Mark as running
if !e.setActionStatus(action, actions.StatusRunning) {
tracker.Failed()
return
}

// Get executor
executor := e.GetExecutor(action.GetTarget().AccountID)
executor := e.GetExecutor(target.AccountID)
if executor == nil {
e.handleMissingExecutor(action, tracker)
return
Expand All @@ -235,6 +269,59 @@ func (e *ExecutorAgentService) processAction(action actions.Action) {
e.resetCronActionStatus(action)
}

// processScanAction dispatches a Scan action to the Scanner gRPC service and
// records the outcome.
//
// Flow: mark the action as running, create an action-run record, call the
// scanner with the run ID and the action target's TargetAccountIDs/SelectAll,
// then persist the terminal state on the action, on the action run
// ("Failed"/"Success") and on the event tracker. After a successful scan
// resetCronActionStatus is invoked for the action.
//
// Parameters:
//   - action: the Scan action to execute.
//   - tracker: event tracker started by the caller; it is finalized here with
//     Failed() or Success().
func (e *ExecutorAgentService) processScanAction(action actions.Action, tracker *eventservice.EventTracker) {
	if !e.setActionStatus(action, actions.StatusRunning) {
		tracker.Failed()
		return
	}

	ctx := context.Background()
	actionID := action.GetID()
	target := action.GetTarget()

	runID, err := e.actionRunRepo.Create(ctx, actionID)
	if err != nil {
		e.logger.Error("Failed to create action run for scan",
			zap.String("action_id", actionID), zap.Error(err))
		e.setActionStatus(action, actions.StatusFailed)
		tracker.Failed()
		return
	}
	runIDStr := strconv.FormatInt(runID, 10)

	// finishRun stores the terminal state of the action run. Persisting it is
	// best-effort (a failure here must not change the scan outcome), but the
	// error is logged so a stale run record can be diagnosed.
	finishRun := func(status, message string) {
		if updateErr := e.actionRunRepo.Update(ctx, runIDStr, status, message); updateErr != nil {
			e.logger.Error("Failed to update action run for scan",
				zap.String("action_id", actionID),
				zap.String("run_id", runIDStr),
				zap.String("status", status),
				zap.Error(updateErr))
		}
	}

	resp, err := e.scannerClient.Scan(
		ctx,
		runID,
		target.TargetAccountIDs,
		target.SelectAll,
	)
	if err != nil {
		// Transport-level failure: the scanner could not be reached or the
		// call itself failed.
		e.logger.Error("Scanner gRPC call failed",
			zap.String("action_id", actionID), zap.Error(err))
		e.setActionStatus(action, actions.StatusFailed)
		finishRun("Failed", err.Error())
		tracker.Failed()
		return
	}

	if resp.Error != 0 {
		// The call succeeded but the scanner reported a non-zero error code.
		e.logger.Error("Scanner returned error",
			zap.String("action_id", actionID),
			zap.String("message", resp.Message))
		e.setActionStatus(action, actions.StatusFailed)
		finishRun("Failed", resp.Message)
		tracker.Failed()
		return
	}

	e.logger.Info("Scan completed successfully",
		zap.String("action_id", actionID),
		zap.Int32("accounts_scanned", resp.AccountsScanned))
	e.setActionStatus(action, actions.StatusSuccess)
	finishRun("Success", "")
	tracker.Success()
	e.resetCronActionStatus(action)
}

// setActionStatus safely updates action status with type assertion.
// Returns false if update failed (caller should abort).
func (e *ExecutorAgentService) setActionStatus(action actions.Action, status actions.ActionStatus) bool {
Expand Down
55 changes: 37 additions & 18 deletions cmd/agent/schedule_agent_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,26 +271,45 @@ func (a *ScheduleAgentService) ScheduleNewActions(newSchedule []actions.Action)

// Checking the entire new schedule to schedule or reschedule actions
for _, action := range newSchedule {
var scheduledFunc func(*actions.ScheduledAction)
var cronFunc func(*actions.CronAction)

if _, exists := a.schedule[action.GetID()]; !exists { // Schedule new actions
scheduledFunc = a.scheduleNewScheduledAction
cronFunc = a.scheduleNewCronAction
} else { // Reschedule actions
scheduledFunc = a.rescheduleScheduledAction
cronFunc = a.rescheduleCronAction
}
a.dispatchActionLocked(action)
}
}

// managing actions based on type
switch t := action.(type) {
case *actions.ScheduledAction:
scheduledFunc(t)
case *actions.CronAction:
cronFunc(t)
default:
a.logger.Error("Unknown action type", zap.String("action_id", action.GetID()))
// dispatchActionLocked routes a single action to the handler that matches its
// concrete type:
//   - CronAction / ScheduledAction: scheduled when not yet present in
//     a.schedule, rescheduled otherwise.
//   - InstantAction: registered in a.schedule and pushed to the actions
//     channel from a goroutine; ignored when it is already registered.
//   - anything else: logged as an unknown action type.
//
// The caller must already hold a.mutex.
func (a *ScheduleAgentService) dispatchActionLocked(action actions.Action) {
	_, known := a.schedule[action.GetID()]

	switch act := action.(type) {
	case *actions.CronAction:
		if known {
			a.rescheduleCronAction(act)
			return
		}
		a.scheduleNewCronAction(act)

	case *actions.ScheduledAction:
		if known {
			a.rescheduleScheduledAction(act)
			return
		}
		a.scheduleNewScheduledAction(act)

	case *actions.InstantAction:
		if known {
			// Already registered; nothing to do.
			return
		}

		id := act.GetID()
		a.logger.Info("Dispatching InstantAction for immediate execution", zap.String("action_id", id))

		// Instant actions have nothing to cancel, so a no-op cancel is stored.
		a.schedule[id] = scheduleItem{
			action: act,
			cancel: func() {},
		}

		// The send runs in its own goroutine so this function does not block
		// on the channel while the caller holds a.mutex; the entry is removed
		// once the action has been handed over.
		go func() {
			a.actionsChannel <- act

			a.mutex.Lock()
			defer a.mutex.Unlock()
			delete(a.schedule, id)
		}()

	default:
		a.logger.Error("Unknown action type", zap.String("action_id", action.GetID()))
	}
}

Expand Down
12 changes: 12 additions & 0 deletions cmd/api/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ type APIHandlers struct {
ExpenseHandler *handlers.ExpenseHandler
EventHandler *handlers.EventHandler
ActionHandler *handlers.ActionHandler
ActionRunHandler *handlers.ActionRunHandler
OverviewHandler *handlers.OverviewHandler
HealthCheckHandler *handlers.HealthCheckHandler
}
Expand All @@ -30,6 +31,7 @@ func Setup(engine *gin.Engine, handlers APIHandlers) {
setupExpenseRoutes(baseGroup, handlers.ExpenseHandler)
setupEventRoutes(baseGroup, handlers.EventHandler)
setupActionRoutes(baseGroup, handlers.ActionHandler)
setupActionRunRoutes(baseGroup, handlers.ActionRunHandler)
setupOverviewRoutes(baseGroup, handlers.OverviewHandler)
}
}
Expand Down Expand Up @@ -110,6 +112,16 @@ func setupActionRoutes(group *gin.RouterGroup, handler *handlers.ActionHandler)
}
}

// setupActionRunRoutes registers the action-run endpoints under the
// "/action-runs" sub-group of group, wiring each route to the matching
// ActionRunHandler method.
func setupActionRunRoutes(group *gin.RouterGroup, handler *handlers.ActionRunHandler) {
	runs := group.Group("/action-runs")

	// Collection routes.
	runs.GET("", handler.List)
	runs.POST("", handler.Create)

	// Routes addressing a single run through the ":id" path parameter.
	runs.GET("/:id", handler.Get)
	runs.PATCH("/:id", handler.Update)
}

func setupOverviewRoutes(group *gin.RouterGroup, handler *handlers.OverviewHandler) {
overview := group.Group("/overview")
{
Expand Down
3 changes: 3 additions & 0 deletions cmd/api/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,7 @@ func main() {
expenseRepo := repositories.NewExpenseRepository(dbClient)
eventRepo := repositories.NewEventRepository(dbClient)
actionRepo := repositories.NewActionRepository(dbClient)
actionRunRepo := repositories.NewActionRunRepository(dbClient)

// Initializing services
inventoryService := services.NewInventoryService(inventoryRepo)
Expand All @@ -212,6 +213,7 @@ func main() {
expenseService := services.NewExpenseService(expenseRepo)
eventService := services.NewEventService(eventRepo)
actionService := services.NewActionService(actionRepo)
actionRunService := services.NewActionRunService(actionRunRepo)
overviewService := services.NewOverviewService(clusterRepo, instanceRepo, accountRepo)

// Initializing handlers
Expand All @@ -223,6 +225,7 @@ func main() {
ExpenseHandler: handlers.NewExpenseHandler(expenseService, logger),
EventHandler: handlers.NewEventHandler(eventService, logger),
ActionHandler: handlers.NewActionHandler(actionService, logger),
ActionRunHandler: handlers.NewActionRunHandler(actionRunService, logger),
OverviewHandler: handlers.NewOverviewHandler(overviewService, logger),
HealthCheckHandler: handlers.NewHealthCheckHandler(dbClient, logger),
}
Expand Down
33 changes: 33 additions & 0 deletions cmd/scanner/proto/scanner.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
syntax = "proto3";

package scanner;

option go_package = "./scanner";

// ScannerService is the gRPC API exposed by the Scanner.
service ScannerService {
// Scan takes a ScanRequest (run ID plus account selection) and returns a
// ScanResponse carrying an error code, a message and the scanned-account count.
rpc Scan (ScanRequest) returns (ScanResponse);
// Health returns a HealthResponse whose `ready` flag reports readiness.
rpc Health (HealthRequest) returns (HealthResponse);
}

// ScanRequest is the input message of ScannerService.Scan.
message ScanRequest {
// Identifier of the run this scan belongs to.
// NOTE(review): presumably the action-run ID created by the agent before
// calling Scan — confirm against the gRPC client wrapper.
int64 run_id = 1;
// IDs of the accounts to scan.
repeated string account_ids = 2;
// NOTE(review): presumably requests a scan of every account regardless of
// account_ids — confirm against the scanner implementation.
bool select_all = 3;
}

// ScanResponse is the output message of ScannerService.Scan.
message ScanResponse {
// Result code; the agent treats any non-zero value as a failed scan.
int32 error = 1;
// Detail text accompanying the result; on failure the agent logs it and
// stores it on the action run.
string message = 2;
// Number of accounts scanned; the agent logs it on success.
int32 accounts_scanned = 3;
}

// HealthRequest is the input message of ScannerService.Health. It carries no
// fields.
message HealthRequest {}

// HealthResponse is the output message of ScannerService.Health.
message HealthResponse {
// Readiness flag.
// NOTE(review): presumably true once the scanner can accept Scan requests —
// confirm against the server implementation.
bool ready = 1;
}
Loading
Loading