Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
202 changes: 127 additions & 75 deletions cmd/apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,10 @@ func runApply(stackName, composeFile string, opts *ApplyOptions) error {
stackDeployer := swarm.NewStackDeployer(cli, stackName, 3)

// Create snapshot before deployment
snap := snapshot.CreateSnapshot(ctx, stackDeployer)
snap, err := snapshot.CreateSnapshot(ctx, stackDeployer)
if err != nil {
return fmt.Errorf("deployment blocked: %w", err)
}

// Track deployment state
deploymentComplete := make(chan bool, 1)
Expand All @@ -148,7 +151,10 @@ func runApply(stackName, composeFile string, opts *ApplyOptions) error {
os.Exit(0)
default:
log.Println("Deployment interrupted, initiating rollback...")
snapshot.Rollback(context.Background(), stackDeployer, snap)
// Create context with timeout for rollback to prevent hanging
rollbackCtx, rollbackCancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer rollbackCancel()
snapshot.Rollback(rollbackCtx, stackDeployer, snap)
os.Exit(130)
}
}()
Expand Down Expand Up @@ -348,12 +354,12 @@ func monitorServiceTasks(ctx context.Context, cli *client.Client, svc swarm.Serv
}

// waitForAllTasksHealthy waits for all tasks of updated services to become healthy
// Optimized to use batch API calls instead of per-service calls
func waitForAllTasksHealthy(ctx context.Context, cli *client.Client, stackName string, updatedServices []swarm.ServiceUpdateResult, deployID string) error {
ticker := time.NewTicker(2 * time.Second)
defer ticker.Stop()

startTime := time.Now()
serviceHealthyCount := make(map[string]int)

for {
select {
Expand All @@ -364,62 +370,74 @@ func waitForAllTasksHealthy(ctx context.Context, cli *client.Client, stackName s
case <-ticker.C:
allHealthy := true
unhealthyTasks := []string{}
serviceHealthyCount := make(map[string]int)

// OPTIMIZATION 1: Batch fetch all services in stack with one API call
serviceFilter := filters.NewArgs()
serviceFilter.Add("label", "com.docker.stack.namespace="+stackName)
allServices, err := cli.ServiceList(ctx, types.ServiceListOptions{
Filters: serviceFilter,
})
if err != nil {
log.Printf("[HealthCheck] Failed to list services: %v", err)
allHealthy = false
continue
}

for _, svc := range updatedServices {
// Get current service by name to get updated service ID
// During updates, service ID remains the same but this ensures we have the latest service state
serviceFilter := filters.NewArgs()
serviceFilter.Add("name", svc.ServiceName)
services, err := cli.ServiceList(ctx, types.ServiceListOptions{
Filters: serviceFilter,
})
if err != nil {
log.Printf("[HealthCheck] Failed to get service %s: %v", svc.ServiceName, err)
allHealthy = false
continue
}
if len(services) == 0 {
log.Printf("[HealthCheck] Service %s not found", svc.ServiceName)
allHealthy = false
continue
}
// Create service name -> service map for quick lookup
serviceMap := make(map[string]dockerswarm.Service)
for _, svc := range allServices {
serviceMap[svc.Spec.Name] = svc
}

// OPTIMIZATION 2: Batch fetch all tasks in stack with one API call
taskFilter := filters.NewArgs()
taskFilter.Add("label", "com.docker.stack.namespace="+stackName)
allStackTasks, err := cli.TaskList(ctx, types.TaskListOptions{
Filters: taskFilter,
})
if err != nil {
log.Printf("[HealthCheck] Failed to list tasks: %v", err)
allHealthy = false
continue
}

currentServiceID := services[0].ID
// Group tasks by service name and filter by deployID
tasksByService := make(map[string][]dockerswarm.Task)
for _, task := range allStackTasks {
// Filter by deployID
if task.Spec.ContainerSpec != nil && task.Spec.ContainerSpec.Labels != nil {
if taskDeployID, ok := task.Spec.ContainerSpec.Labels["com.stackman.deploy.id"]; ok && taskDeployID == deployID {
serviceName := task.Spec.ContainerSpec.Labels["com.docker.swarm.service.name"]
tasksByService[serviceName] = append(tasksByService[serviceName], task)
}
}
}

// Get ALL tasks for this service
// Note: Docker API does not support label filtering for tasks, only for containers
// So we get all tasks and filter manually
filter := filters.NewArgs()
filter.Add("service", currentServiceID)
// OPTIMIZATION 3: Collect containers that need inspection
type containerTask struct {
containerID string
task dockerswarm.Task
serviceName string
}
containersToInspect := []containerTask{}

allTasks, err := cli.TaskList(ctx, types.TaskListOptions{
Filters: filter,
})
if err != nil {
log.Printf("[HealthCheck] Failed to list tasks for service %s: %v", svc.ServiceName, err)
// Process each updated service
for _, svc := range updatedServices {
_, exists := serviceMap[svc.ServiceName]
if !exists {
log.Printf("[HealthCheck] Service %s not found", svc.ServiceName)
allHealthy = false
continue
}

// Filter tasks by deployID from ContainerSpec labels
tasks := []dockerswarm.Task{}
for _, t := range allTasks {
if t.Spec.ContainerSpec != nil && t.Spec.ContainerSpec.Labels != nil {
if taskDeployID, ok := t.Spec.ContainerSpec.Labels["com.stackman.deploy.id"]; ok && taskDeployID == deployID {
tasks = append(tasks, t)
}
}
}

log.Printf("[HealthCheck] Service %s: found %d tasks with deployID %s (total tasks: %d)",
svc.ServiceName, len(tasks), deployID, len(allTasks))
tasks := tasksByService[svc.ServiceName]
log.Printf("[HealthCheck] Service %s: found %d tasks with deployID %s",
svc.ServiceName, len(tasks), deployID)

healthyTaskCount := 0
hasRunningTask := false

for _, t := range tasks {
// deployID label guarantees correct tasks - no version check needed

// Log failed/shutdown tasks but don't fail immediately (Docker Swarm may restart)
if t.Status.State == dockerswarm.TaskStateFailed ||
t.Status.State == dockerswarm.TaskStateShutdown ||
Expand All @@ -444,47 +462,81 @@ func waitForAllTasksHealthy(ctx context.Context, cli *client.Client, stackName s
continue
}

// Check container health if healthcheck is defined
// Mark container for inspection if it exists
if t.Status.ContainerStatus != nil && t.Status.ContainerStatus.ContainerID != "" {
containerInfo, err := cli.ContainerInspect(ctx, t.Status.ContainerStatus.ContainerID)
if err != nil {
log.Printf("[HealthCheck] Failed to inspect container %s for task %s (%s): %v",
t.Status.ContainerStatus.ContainerID[:12], t.ID[:12], svc.ServiceName, err)
allHealthy = false
unhealthyTasks = append(unhealthyTasks, fmt.Sprintf("%s/%s (inspect failed)", svc.ServiceName, t.ID[:12]))
continue
}

// If container has health check, wait for healthy status
if containerInfo.State.Health != nil {
if containerInfo.State.Health.Status != container.Healthy {
allHealthy = false
unhealthyTasks = append(unhealthyTasks, fmt.Sprintf("%s/%s (health: %s)", svc.ServiceName, t.ID[:12], containerInfo.State.Health.Status))
log.Printf("[HealthCheck] ⏳ Task %s (%s) is %s", t.ID[:12], svc.ServiceName, containerInfo.State.Health.Status)
} else {
log.Printf("[HealthCheck] ✅ Task %s (%s) is healthy", t.ID[:12], svc.ServiceName)
healthyTaskCount++
}
} else {
// No healthcheck defined, just check if running
log.Printf("[HealthCheck] ✅ Task %s (%s) is running (no healthcheck)", t.ID[:12], svc.ServiceName)
healthyTaskCount++
}
containersToInspect = append(containersToInspect, containerTask{
containerID: t.Status.ContainerStatus.ContainerID,
task: t,
serviceName: svc.ServiceName,
})
} else {
// No container status yet
allHealthy = false
unhealthyTasks = append(unhealthyTasks, fmt.Sprintf("%s/%s (no container)", svc.ServiceName, t.ID[:12]))
log.Printf("[HealthCheck] ⏳ Task %s (%s) has no container yet", t.ID[:12], svc.ServiceName)
}
}

// Track if service has running tasks
if !hasRunningTask {
log.Printf("[HealthCheck] ⏳ Service %s has no running tasks yet (may be restarting)", svc.ServiceName)
allHealthy = false
}
}

// OPTIMIZATION 4: Parallel container inspections with goroutines
type inspectResult struct {
ct containerTask
info types.ContainerJSON
err error
}

resultChan := make(chan inspectResult, len(containersToInspect))
var wg sync.WaitGroup

for _, ct := range containersToInspect {
wg.Add(1)
go func(c containerTask) {
defer wg.Done()
info, err := cli.ContainerInspect(ctx, c.containerID)
resultChan <- inspectResult{ct: c, info: info, err: err}
}(ct)
}

serviceHealthyCount[svc.ServiceName] = healthyTaskCount
go func() {
wg.Wait()
close(resultChan)
}()

// Process inspection results
for result := range resultChan {
ct := result.ct
taskID := ct.task.ID
serviceName := ct.serviceName

if result.err != nil {
log.Printf("[HealthCheck] Failed to inspect container %s for task %s (%s): %v",
ct.containerID[:12], taskID[:12], serviceName, result.err)
allHealthy = false
unhealthyTasks = append(unhealthyTasks, fmt.Sprintf("%s/%s (inspect failed)", serviceName, taskID[:12]))
continue
}

// Check health status
if result.info.State.Health != nil {
if result.info.State.Health.Status != container.Healthy {
allHealthy = false
unhealthyTasks = append(unhealthyTasks, fmt.Sprintf("%s/%s (health: %s)",
serviceName, taskID[:12], result.info.State.Health.Status))
log.Printf("[HealthCheck] ⏳ Task %s (%s) is %s",
taskID[:12], serviceName, result.info.State.Health.Status)
} else {
log.Printf("[HealthCheck] ✅ Task %s (%s) is healthy", taskID[:12], serviceName)
serviceHealthyCount[serviceName]++
}
} else {
// No healthcheck defined, just check if running
log.Printf("[HealthCheck] ✅ Task %s (%s) is running (no healthcheck)", taskID[:12], serviceName)
serviceHealthyCount[serviceName]++
}
}

// Check that all services have at least one healthy task
Expand Down
13 changes: 6 additions & 7 deletions internal/health/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,6 @@ type Monitor struct {
doneChan chan struct{} // signals monitor has stopped

// Lifecycle management
ctx context.Context
cancel context.CancelFunc
shutdownOnce sync.Once
stopped bool

Expand All @@ -57,8 +55,6 @@ func NewMonitor(client client.APIClient, taskID string, serviceID string, servic

// NewMonitorWithLogs creates a new task monitor with optional log streaming
func NewMonitorWithLogs(client client.APIClient, taskID string, serviceID string, serviceName string, showLogs bool) *Monitor {
ctx, cancel := context.WithCancel(context.Background())

return &Monitor{
client: client,
taskID: taskID,
Expand All @@ -68,8 +64,6 @@ func NewMonitorWithLogs(client client.APIClient, taskID string, serviceID string
eventChan: make(chan Event, 10),
stopChan: make(chan struct{}),
doneChan: make(chan struct{}),
ctx: ctx,
cancel: cancel,
healthStatus: "unknown",
lastSeen: time.Now(),
}
Expand Down Expand Up @@ -134,7 +128,6 @@ func (m *Monitor) Stop() {
m.stopped = true
m.mu.Unlock()
close(m.stopChan)
m.cancel()
})
}

Expand Down Expand Up @@ -276,6 +269,12 @@ func (m *Monitor) checkHealth(ctx context.Context) {

// streamLogs streams container logs for this task
func (m *Monitor) streamLogs(ctx context.Context) {
// If Docker client is not initialized, skip log streaming
if m.client == nil {
log.Printf("[TaskLogs] Docker client is nil, skipping log streaming for task %s", m.shortTaskID())
return
}

log.Printf("[TaskLogs] Waiting for container ID for task %s...", m.shortTaskID())

// Wait for container ID to be available AND task to be running
Expand Down
8 changes: 0 additions & 8 deletions internal/health/monitor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,14 +196,6 @@ func TestMonitor_Stop(t *testing.T) {
t.Error("stopChan should be closed")
}

// Context should be cancelled
select {
case <-monitor.ctx.Done():
// Expected
case <-time.After(100 * time.Millisecond):
t.Error("context should be cancelled")
}

// Multiple calls to Stop should be safe
monitor.Stop()
monitor.Stop()
Expand Down
Loading