Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
260 changes: 139 additions & 121 deletions api/pkg/api/handler/machine.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package handler

import (
"context"
"database/sql"
"encoding/json"
"errors"
"fmt"
Expand Down Expand Up @@ -1367,148 +1366,167 @@ func (umh UpdateMachineHandler) Handle(c echo.Context) error {
return cutil.NewAPIErrorResponse(c, http.StatusBadRequest, "Machine must have an assigned Instance for online repair", nil)
}

orTx, err := cdb.BeginTx(ctx, umh.dbSession, &sql.TxOptions{})
if err != nil {
logger.Error().Err(err).Msg("unable to start transaction")
return cutil.NewAPIErrorResponse(c, http.StatusInternalServerError, "Error updating machine", nil)
}
orTxCommitted := false
defer common.RollbackTx(ctx, orTx, &orTxCommitted)

iDAO := cdbm.NewInstanceDAO(umh.dbSession)
instances, _, ierr := iDAO.GetAll(ctx, orTx, cdbm.InstanceFilterInput{MachineIDs: []string{machine.ID}}, cdbp.PageInput{Limit: cdb.GetIntPtr(1)}, nil)
if ierr != nil {
logger.Error().Err(ierr).Msg("error retrieving Instance for Machine")
return cutil.NewAPIErrorResponse(c, http.StatusInternalServerError, "Failed to retrieve Instance for Machine", nil)
}
if len(instances) == 0 {
return cutil.NewAPIErrorResponse(c, http.StatusBadRequest, "Machine must have an assigned Instance for online repair", nil)
}
inst := instances[0]
// timeoutResp lets the closure signal a post-rollback handler — the
// TerminateWorkflow call has to run after the closure returns so that
// the DB tx unwinds before we make the second remote call. nil means
// no timeout occurred and the normal flow continues.
var timeoutResp func() error

if apiRequest.OnlineRepairEnabled() {
if inst.Status != cdbm.InstanceStatusReady {
return cutil.NewAPIErrorResponse(c, http.StatusBadRequest, fmt.Sprintf("Instance must be in Ready state to enter online repair (current state: %s)", inst.Status), nil)
err := cdb.WithTx(ctx, umh.dbSession, func(orTx *cdb.Tx) error {
iDAO := cdbm.NewInstanceDAO(umh.dbSession)
instances, _, derr := iDAO.GetAll(ctx, orTx, cdbm.InstanceFilterInput{MachineIDs: []string{machine.ID}}, cdbp.PageInput{Limit: cdb.GetIntPtr(1)}, nil)
if derr != nil {
logger.Error().Err(derr).Msg("error retrieving Instance for Machine")
return cutil.NewAPIError(http.StatusInternalServerError, "Failed to retrieve Instance for Machine", nil)
}

insReq, ierr := apiRequest.ToInsertHealthReportOverrideProto(machine.ID)
if ierr != nil {
logger.Error().Err(ierr).Msg("failed to build online repair health override request")
return cutil.NewAPIErrorResponse(c, http.StatusInternalServerError, "Failed to build online repair request", nil)
if len(instances) == 0 {
return cutil.NewAPIError(http.StatusBadRequest, "Machine must have an assigned Instance for online repair", nil)
}
inst := instances[0]

instanceLabels := maps.Clone(inst.Labels)
if instanceLabels == nil {
instanceLabels = map[string]string{}
}
if apiRequest.OnlineRepairEnabled() {
if inst.Status != cdbm.InstanceStatusReady {
return cutil.NewAPIError(http.StatusBadRequest, fmt.Sprintf("Instance must be in Ready state to enter online repair (current state: %s)", inst.Status), nil)
}

if *apiRequest.OnlineRepair.Policy.AllowAutoInstanceDeletionOnFailure {
instanceLabels[model.InstanceLabelOnlineRepairAllowAutoDeletion] = "true"
} else {
instanceLabels[model.InstanceLabelOnlineRepairAllowAutoDeletion] = "false"
}
insReq, perr := apiRequest.ToInsertHealthReportOverrideProto(machine.ID)
if perr != nil {
logger.Error().Err(perr).Msg("failed to build online repair health override request")
return cutil.NewAPIError(http.StatusInternalServerError, "Failed to build online repair request", nil)
}

_, err = iDAO.Update(ctx, orTx, cdbm.InstanceUpdateInput{
InstanceID: inst.ID,
Status: cdb.GetStrPtr(cdbm.InstanceStatusRepairing),
Labels: instanceLabels,
})
if err != nil {
logger.Error().Err(err).Msg("error updating Instance for online repair in DB")
return cutil.NewAPIErrorResponse(c, http.StatusInternalServerError, "Failed to update Instance for online repair", nil)
}
instanceLabels := maps.Clone(inst.Labels)
if instanceLabels == nil {
instanceLabels = map[string]string{}
}

wfOpts := temporalClient.StartWorkflowOptions{
ID: "site-machine-online-repair-" + machine.ID,
WorkflowExecutionTimeout: cutil.WorkflowExecutionTimeout,
TaskQueue: queue.SiteTaskQueue,
}
if *apiRequest.OnlineRepair.Policy.AllowAutoInstanceDeletionOnFailure {
instanceLabels[model.InstanceLabelOnlineRepairAllowAutoDeletion] = "true"
} else {
instanceLabels[model.InstanceLabelOnlineRepairAllowAutoDeletion] = "false"
}

wfCtx, wfCancel := context.WithTimeout(ctx, cutil.WorkflowContextTimeout)
defer wfCancel()
_, derr = iDAO.Update(ctx, orTx, cdbm.InstanceUpdateInput{
InstanceID: inst.ID,
Status: cdb.GetStrPtr(cdbm.InstanceStatusRepairing),
Labels: instanceLabels,
})
if derr != nil {
logger.Error().Err(derr).Msg("error updating Instance for online repair in DB")
return cutil.NewAPIError(http.StatusInternalServerError, "Failed to update Instance for online repair", nil)
}

we, err := stc.ExecuteWorkflow(wfCtx, wfOpts, "CreateMachineHealthReportOverride", insReq)
if err != nil {
logger.Error().Err(err).Msg("failed to start Temporal workflow for applying online repair health override")
return cutil.NewAPIErrorResponse(c, http.StatusInternalServerError, fmt.Sprintf("Failed to start applying online repair health override workflow on Site: %s", err), nil)
}
wid := we.GetID()
logger.Info().Str("Workflow ID", wid).Msg("executed synchronous applying online repair health override workflow")
err = we.Get(wfCtx, nil)
if err != nil {
var timeoutErr *tp.TimeoutError
if errors.As(err, &timeoutErr) || err == context.DeadlineExceeded || wfCtx.Err() != nil || ctx.Err() != nil {
return common.TerminateWorkflowOnTimeOut(c, logger, stc, wid, err, "Machine", "CreateMachineHealthReportOverride")
wfOpts := temporalClient.StartWorkflowOptions{
ID: "site-machine-online-repair-" + machine.ID,
WorkflowExecutionTimeout: cutil.WorkflowExecutionTimeout,
TaskQueue: queue.SiteTaskQueue,
}
code, werr := common.UnwrapWorkflowError(err)
logger.Error().Err(werr).Msg("applying online repair health override workflow failed")
return cutil.NewAPIErrorResponse(c, code, fmt.Sprintf("Failed to execute applying online repair health override workflow on Site: %s", werr), nil)
}
logger.Info().Str("Workflow ID", wid).Msg("completed synchronous applying online repair health override workflow")
} else {
if inst.Status != cdbm.InstanceStatusRepairing {
return cutil.NewAPIErrorResponse(c, http.StatusBadRequest, fmt.Sprintf("Instance must be in Repairing state to exit online repair (current state: %s)", inst.Status), nil)
}

instanceLabels := maps.Clone(inst.Labels)
if instanceLabels == nil {
instanceLabels = map[string]string{}
}
wfCtx, wfCancel := context.WithTimeout(ctx, cutil.WorkflowContextTimeout)
defer wfCancel()

we, wferr := stc.ExecuteWorkflow(wfCtx, wfOpts, "CreateMachineHealthReportOverride", insReq)
if wferr != nil {
logger.Error().Err(wferr).Msg("failed to start Temporal workflow for applying online repair health override")
return cutil.NewAPIError(http.StatusInternalServerError, fmt.Sprintf("Failed to start applying online repair health override workflow on Site: %s", wferr), nil)
}
wid := we.GetID()
logger.Info().Str("Workflow ID", wid).Msg("executed synchronous applying online repair health override workflow")
wferr = we.Get(wfCtx, nil)
if wferr != nil {
var timeoutErr *tp.TimeoutError
if errors.As(wferr, &timeoutErr) || wferr == context.DeadlineExceeded || wfCtx.Err() != nil || ctx.Err() != nil {
logger.Error().Err(wferr).Msg("failed to apply online repair health override, timeout occurred executing workflow on Site.")
timeoutCause := wferr // explicit capture; defensive against future refactors
timeoutResp = func() error {
return common.TerminateWorkflowOnTimeOut(c, logger, stc, wid, timeoutCause, "Machine", "CreateMachineHealthReportOverride")
}
return cutil.NewAPIError(http.StatusInternalServerError, "Applying online repair health override workflow timed out", nil)
}
code, werr := common.UnwrapWorkflowError(wferr)
logger.Error().Err(werr).Msg("applying online repair health override workflow failed")
return cutil.NewAPIError(code, fmt.Sprintf("Failed to execute applying online repair health override workflow on Site: %s", werr), nil)
}
logger.Info().Str("Workflow ID", wid).Msg("completed synchronous applying online repair health override workflow")
} else {
if inst.Status != cdbm.InstanceStatusRepairing {
return cutil.NewAPIError(http.StatusBadRequest, fmt.Sprintf("Instance must be in Repairing state to exit online repair (current state: %s)", inst.Status), nil)
}

delete(instanceLabels, model.InstanceLabelOnlineRepairAllowAutoDeletion)
instanceLabels := maps.Clone(inst.Labels)
if instanceLabels == nil {
instanceLabels = map[string]string{}
}

_, err = iDAO.Update(ctx, orTx, cdbm.InstanceUpdateInput{
InstanceID: inst.ID,
Status: cdb.GetStrPtr(cdbm.InstanceStatusReady),
Labels: instanceLabels,
})
if err != nil {
logger.Error().Err(err).Msg("error updating Instance after clearing online repair in DB")
return cutil.NewAPIErrorResponse(c, http.StatusInternalServerError, "Failed to update Instance after clearing online repair", nil)
}
delete(instanceLabels, model.InstanceLabelOnlineRepairAllowAutoDeletion)

rmReq, err := apiRequest.ToRemoveHealthReportOverrideProto(machine.ID)
if err != nil {
logger.Error().Err(err).Msg("failed to build remove online repair health override request")
return cutil.NewAPIErrorResponse(c, http.StatusInternalServerError, "Failed to build remove online repair request", nil)
}
_, derr = iDAO.Update(ctx, orTx, cdbm.InstanceUpdateInput{
InstanceID: inst.ID,
Status: cdb.GetStrPtr(cdbm.InstanceStatusReady),
Labels: instanceLabels,
})
if derr != nil {
logger.Error().Err(derr).Msg("error updating Instance after clearing online repair in DB")
return cutil.NewAPIError(http.StatusInternalServerError, "Failed to update Instance after clearing online repair", nil)
}

wfOpts := temporalClient.StartWorkflowOptions{
ID: "site-clear-machine-online-repair-" + machine.ID,
WorkflowExecutionTimeout: cutil.WorkflowExecutionTimeout,
TaskQueue: queue.SiteTaskQueue,
}
rmReq, perr := apiRequest.ToRemoveHealthReportOverrideProto(machine.ID)
if perr != nil {
logger.Error().Err(perr).Msg("failed to build remove online repair health override request")
return cutil.NewAPIError(http.StatusInternalServerError, "Failed to build remove online repair request", nil)
}

wfCtx, wfCancel := context.WithTimeout(ctx, cutil.WorkflowContextTimeout)
defer wfCancel()
wfOpts := temporalClient.StartWorkflowOptions{
ID: "site-clear-machine-online-repair-" + machine.ID,
WorkflowExecutionTimeout: cutil.WorkflowExecutionTimeout,
TaskQueue: queue.SiteTaskQueue,
}

we, err := stc.ExecuteWorkflow(wfCtx, wfOpts, "DeleteMachineHealthReportOverride", rmReq)
if err != nil {
logger.Error().Err(err).Msg("failed to start Temporal workflow to clear online repair health override")
return cutil.NewAPIErrorResponse(c, http.StatusInternalServerError, fmt.Sprintf("Failed to start clear online repair workflow on Site: %s", err), nil)
}
wid := we.GetID()
logger.Info().Str("Workflow ID", wid).Msg("executed synchronous DeleteMachineHealthReportOverride workflow")
err = we.Get(wfCtx, nil)
if err != nil {
var timeoutErr *tp.TimeoutError
if errors.As(err, &timeoutErr) || err == context.DeadlineExceeded || wfCtx.Err() != nil || ctx.Err() != nil {
return common.TerminateWorkflowOnTimeOut(c, logger, stc, wid, err, "Machine", "DeleteMachineHealthReportOverride")
wfCtx, wfCancel := context.WithTimeout(ctx, cutil.WorkflowContextTimeout)
defer wfCancel()

we, wferr := stc.ExecuteWorkflow(wfCtx, wfOpts, "DeleteMachineHealthReportOverride", rmReq)
if wferr != nil {
logger.Error().Err(wferr).Msg("failed to start Temporal workflow to clear online repair health override")
return cutil.NewAPIError(http.StatusInternalServerError, fmt.Sprintf("Failed to start clear online repair workflow on Site: %s", wferr), nil)
}
wid := we.GetID()
logger.Info().Str("Workflow ID", wid).Msg("executed synchronous DeleteMachineHealthReportOverride workflow")
wferr = we.Get(wfCtx, nil)
if wferr != nil {
var timeoutErr *tp.TimeoutError
if errors.As(wferr, &timeoutErr) || wferr == context.DeadlineExceeded || wfCtx.Err() != nil || ctx.Err() != nil {
logger.Error().Err(wferr).Msg("failed to clear online repair health override, timeout occurred executing workflow on Site.")
timeoutCause := wferr // explicit capture; defensive against future refactors
timeoutResp = func() error {
return common.TerminateWorkflowOnTimeOut(c, logger, stc, wid, timeoutCause, "Machine", "DeleteMachineHealthReportOverride")
}
return cutil.NewAPIError(http.StatusInternalServerError, "Clear online repair workflow timed out", nil)
}
code, werr := common.UnwrapWorkflowError(wferr)
logger.Error().Err(werr).Msg("clear online repair health override workflow failed")
return cutil.NewAPIError(code, fmt.Sprintf("Failed to execute clear online repair workflow on Site: %s", werr), nil)
}
code, werr := common.UnwrapWorkflowError(err)
logger.Error().Err(werr).Msg("clear online repair health override workflow failed")
return cutil.NewAPIErrorResponse(c, code, fmt.Sprintf("Failed to execute clear online repair workflow on Site: %s", werr), nil)
logger.Info().Str("Workflow ID", wid).Msg("completed synchronous DeleteMachineHealthReportOverride workflow")
}
logger.Info().Str("Workflow ID", wid).Msg("completed synchronous DeleteMachineHealthReportOverride workflow")
}

err = orTx.Commit()
return nil
})
// Surface real tx-helper errors first so they aren't masked by the
// timeout response (commit/rollback failures wrap into something other
// than the cutil.APIError marker we returned for the timeout case).
if err != nil {
logger.Error().Err(err).Msg("error committing transaction")
return cutil.NewAPIErrorResponse(c, http.StatusInternalServerError, "Failed to update Machine, DB transaction error", nil)
var apiErr *cutil.APIError
if !errors.As(err, &apiErr) {
return common.HandleTxError(c, logger, err, "Failed to update Machine, DB transaction error")
}
}
if timeoutResp != nil {
return timeoutResp()
}
if err != nil {
return common.HandleTxError(c, logger, err, "Failed to update Machine, DB transaction error")
}
orTxCommitted = true
um = machine
}

// Create response
Expand Down
Loading