From 667237e3c12d44b91a7dbfeac1afa95fffad4f83 Mon Sep 17 00:00:00 2001 From: Chet Nichols III Date: Tue, 19 May 2026 17:46:44 -0700 Subject: [PATCH] refactor: Migrate VPC Peering API handlers to WithTx Continues the `WithTx` migration, covering both `BeginTx` sites in `vpcpeering.go` (`Create` and `Delete`). Both handlers trigger workflows and wait on `we.Get`, so they're also adopting our little `timeoutResp` pattern that has evolved over a few of these PRs. Callouts are: - `Create` and `Delete` both use `WithTx` + `timeoutResp`. `Create`'s single `vpcPeering` output would fit `WithTxResult` shape-wise, but mixing that with the outer-scope `timeoutResp` closure isn't a great fit. - Validation reads (site, VPCs, tenant accounts, tenant sites, existing peerings) were already outside the legacy tx and stay there. - Errors switched to `NewAPIError`. Signed-off-by: Chet Nichols III --- api/pkg/api/handler/vpcpeering.go | 359 ++++++++++++++++-------------- 1 file changed, 191 insertions(+), 168 deletions(-) diff --git a/api/pkg/api/handler/vpcpeering.go b/api/pkg/api/handler/vpcpeering.go index 0f01e7672..4dae2dc23 100644 --- a/api/pkg/api/handler/vpcpeering.go +++ b/api/pkg/api/handler/vpcpeering.go @@ -19,7 +19,6 @@ package handler import ( "context" - "database/sql" "encoding/json" "errors" "fmt" @@ -275,128 +274,143 @@ func (cvph CreateVpcPeeringHandler) Handle(c echo.Context) error { tenantID = &tenant.ID } - // Start a db tx - tx, err := cdb.BeginTx(ctx, cvph.dbSession, &sql.TxOptions{}) - if err != nil { - logger.Error().Err(err).Msg("unable to start transaction") - return cutil.NewAPIErrorResponse(c, http.StatusInternalServerError, "Failed to create VPC Peering, DB transaction error", nil) - } - - // If false, a rollback will be triggered on any early return. - // If all goes well, we'll set it to true later on. - txCommitted := false - defer common.RollbackTx(ctx, tx, &txCommitted) - - // Create the VPC Peering in db - vpcPeering, err := vpcPeeringDAO.Create( - ctx, - tx, - cdbm.VpcPeeringCreateInput{ - Vpc1ID: vpc1ID, - Vpc2ID: vpc2ID, - SiteID: site.ID, - IsMultiTenant: isMultiTenant, - InfrastructureProviderID: infrastructureProviderID, - TenantID: tenantID, - CreatedByID: dbUser.ID, - }, - ) - if err != nil { - logger.Error().Err(err).Msg("error creating VPC Peering record in DB") - return cutil.NewAPIErrorResponse(c, http.StatusInternalServerError, "Failed to create VPC Peering, DB error", nil) - } - - // Create a status detail record for the VPC Peering sdDAO := cdbm.NewStatusDetailDAO(cvph.dbSession) - statusDetail, serr := sdDAO.CreateFromParams(ctx, tx, vpcPeering.ID.String(), - *cdb.GetStrPtr(cdbm.VpcPeeringStatusPending), - cdb.GetStrPtr("Received VPC Peering creation request, pending processing")) - if serr != nil { - logger.Error().Err(serr).Msg("error creating status detail for VPC Peering") - return cutil.NewAPIErrorResponse(c, http.StatusInternalServerError, "Failed to create Status Detail for VPC Peering", nil) - } - if statusDetail == nil { - logger.Error().Msg("Status Detail DB entry not returned from CreateFromParams") - return cutil.NewAPIErrorResponse(c, http.StatusInternalServerError, "Failed to get new Status Detail for VPC Peering", nil) - } - // Create the peering directly in NICo via site agent - err = vpcPeeringDAO.UpdateStatusByID(ctx, tx, vpcPeering.ID, cdbm.VpcPeeringStatusConfiguring) - if err != nil { - logger.Error().Err(err).Msg("error updating VPC Peering status to Configuring") - return cutil.NewAPIErrorResponse(c, http.StatusInternalServerError, "Failed to update VPC Peering status to Configuring", nil) - } + // vpcPeering is populated inside the closure and consumed by the + // best-effort post-commit status update and the response payload. + var vpcPeering *cdbm.VpcPeering + + // timeoutResp lets the closure signal a post-rollback handler — the + // TerminateWorkflow call has to run after the closure returns so that + // the DB tx unwinds before we make the second remote call. nil means + // no timeout occurred and the normal flow continues. + var timeoutResp func() error + err = cdb.WithTx(ctx, cvph.dbSession, func(tx *cdb.Tx) error { + // Create the VPC Peering in db + createdVpcPeering, derr := vpcPeeringDAO.Create( + ctx, + tx, + cdbm.VpcPeeringCreateInput{ + Vpc1ID: vpc1ID, + Vpc2ID: vpc2ID, + SiteID: site.ID, + IsMultiTenant: isMultiTenant, + InfrastructureProviderID: infrastructureProviderID, + TenantID: tenantID, + CreatedByID: dbUser.ID, + }, + ) + if derr != nil { + logger.Error().Err(derr).Msg("error creating VPC Peering record in DB") + return cutil.NewAPIError(http.StatusInternalServerError, "Failed to create VPC Peering, DB error", nil) + } + vpcPeering = createdVpcPeering + + // Create a status detail record for the VPC Peering + statusDetail, derr := sdDAO.CreateFromParams(ctx, tx, vpcPeering.ID.String(), + *cdb.GetStrPtr(cdbm.VpcPeeringStatusPending), + cdb.GetStrPtr("Received VPC Peering creation request, pending processing")) + if derr != nil { + logger.Error().Err(derr).Msg("error creating status detail for VPC Peering") + return cutil.NewAPIError(http.StatusInternalServerError, "Failed to create Status Detail for VPC Peering", nil) + } + if statusDetail == nil { + logger.Error().Msg("Status Detail DB entry not returned from CreateFromParams") + return cutil.NewAPIError(http.StatusInternalServerError, "Failed to get new Status Detail for VPC Peering", nil) + } - // Create the VPC Peering creation request - createVpcPeeringRequest := &cwssaws.VpcPeeringCreationRequest{ - VpcId: &cwssaws.VpcId{Value: vpcPeering.Vpc1ID.String()}, - PeerVpcId: &cwssaws.VpcId{Value: vpcPeering.Vpc2ID.String()}, - Id: &cwssaws.VpcPeeringId{Value: vpcPeering.ID.String()}, - } + // Create the peering directly in NICo via site agent + derr = vpcPeeringDAO.UpdateStatusByID(ctx, tx, vpcPeering.ID, cdbm.VpcPeeringStatusConfiguring) + if derr != nil { + logger.Error().Err(derr).Msg("error updating VPC Peering status to Configuring") + return cutil.NewAPIError(http.StatusInternalServerError, "Failed to update VPC Peering status to Configuring", nil) + } - logger.Info().Msg("triggering VPC Peering create workflow on Site") + // Create the VPC Peering creation request + createVpcPeeringRequest := &cwssaws.VpcPeeringCreationRequest{ + VpcId: &cwssaws.VpcId{Value: vpcPeering.Vpc1ID.String()}, + PeerVpcId: &cwssaws.VpcId{Value: vpcPeering.Vpc2ID.String()}, + Id: &cwssaws.VpcPeeringId{Value: vpcPeering.ID.String()}, + } - // Create workflow options - workflowOptions := tclient.StartWorkflowOptions{ - ID: "vpcpeering-create-" + vpcPeering.ID.String(), - WorkflowExecutionTimeout: cutil.WorkflowExecutionTimeout, - TaskQueue: queue.SiteTaskQueue, - } + logger.Info().Msg("triggering VPC Peering create workflow on Site") - // Get the temporal client for the site we are working with - stc, err := cvph.scp.GetClientByID(vpcPeering.SiteID) - if err != nil { - logger.Error().Err(err).Msg("failed to retrieve Temporal client for Site") - return cutil.NewAPIErrorResponse(c, http.StatusInternalServerError, "Failed to retrieve Temporal client for Site", nil) - } + // Create workflow options + workflowOptions := tclient.StartWorkflowOptions{ + ID: "vpcpeering-create-" + vpcPeering.ID.String(), + WorkflowExecutionTimeout: cutil.WorkflowExecutionTimeout, + TaskQueue: queue.SiteTaskQueue, + } - // Add context deadline - workflowCtx, cancel := context.WithTimeout(ctx, cutil.WorkflowContextTimeout) - defer cancel() + // Get the temporal client for the site we are working with + stc, derr := cvph.scp.GetClientByID(vpcPeering.SiteID) + if derr != nil { + logger.Error().Err(derr).Msg("failed to retrieve Temporal client for Site") + return cutil.NewAPIError(http.StatusInternalServerError, "Failed to retrieve Temporal client for Site", nil) + } - // Trigger Site workflow - workflowRun, err := stc.ExecuteWorkflow(workflowCtx, workflowOptions, "CreateVpcPeering", createVpcPeeringRequest) - if err != nil { - logger.Error().Err(err).Msg("failed to start VPC Peering creation workflow") - return cutil.NewAPIErrorResponse(c, http.StatusInternalServerError, "Failed to start VPC Peering creation workflow", nil) - } + // Add context deadline + workflowCtx, cancel := context.WithTimeout(ctx, cutil.WorkflowContextTimeout) + defer cancel() - workflowId := workflowRun.GetID() + // Trigger Site workflow + workflowRun, derr := stc.ExecuteWorkflow(workflowCtx, workflowOptions, "CreateVpcPeering", createVpcPeeringRequest) + if derr != nil { + logger.Error().Err(derr).Msg("failed to start VPC Peering creation workflow") + return cutil.NewAPIError(http.StatusInternalServerError, "Failed to start VPC Peering creation workflow", nil) + } - logger.Info().Str("Workflow ID", workflowId).Msg("started VPC Peering creation workflow") + workflowId := workflowRun.GetID() - // Wait for workflow completion synchronously - err = workflowRun.Get(workflowCtx, nil) - if err != nil { - var applicationErr *tp.ApplicationError - if errors.As(err, &applicationErr) && slices.Contains(swe.UnimplementedOrDeniedErrTypes(), applicationErr.Type()) { - logger.Error().Msg("feature not yet implemented on target Site") - return cutil.NewAPIErrorResponse(c, http.StatusNotImplemented, fmt.Sprintf("Feature not yet implemented on target Site: %s", err), nil) - } + logger.Info().Str("Workflow ID", workflowId).Msg("started VPC Peering creation workflow") - var timeoutErr *tp.TimeoutError - if errors.As(err, &timeoutErr) || err == context.DeadlineExceeded || ctx.Err() != nil { - return common.TerminateWorkflowOnTimeOut(c, logger, stc, workflowId, err, "VpcPeering", "CreateVpcPeering") + // Wait for workflow completion synchronously + wferr := workflowRun.Get(workflowCtx, nil) + if wferr != nil { + var applicationErr *tp.ApplicationError + if errors.As(wferr, &applicationErr) && slices.Contains(swe.UnimplementedOrDeniedErrTypes(), applicationErr.Type()) { + logger.Error().Msg("feature not yet implemented on target Site") + return cutil.NewAPIError(http.StatusNotImplemented, fmt.Sprintf("Feature not yet implemented on target Site: %s", wferr), nil) + } + + var timeoutErr *tp.TimeoutError + if errors.As(wferr, &timeoutErr) || wferr == context.DeadlineExceeded || workflowCtx.Err() != nil { + logger.Error().Err(wferr).Msg("failed to create VPC Peering, timeout occurred executing workflow on Site.") + timeoutCause := wferr + timeoutResp = func() error { + return common.TerminateWorkflowOnTimeOut(c, logger, stc, workflowId, timeoutCause, "VpcPeering", "CreateVpcPeering") + } + return cutil.NewAPIError(http.StatusInternalServerError, "VPC Peering create workflow timed out", nil) + } + + logger.Error().Err(wferr).Msg("failed to synchronously execute Temporal workflow to update CreateVpcPeering") + return cutil.NewAPIError(http.StatusInternalServerError, fmt.Sprintf("Failed to execute sync workflow to create VPC Peering on Site: %s", wferr), nil) } - logger.Error().Err(err).Msg("failed to synchronously execute Temporal workflow to update CreateVpcPeering") - return cutil.NewAPIErrorResponse(c, http.StatusInternalServerError, fmt.Sprintf("Failed to execute sync workflow to create VPC Peering on Site: %s", err), nil) + return nil + }) + // Surface real tx-helper errors first so they aren't masked by the + // timeout response (commit/rollback failures wrap into something other + // than the cutil.APIError marker we returned for the timeout case). + if err != nil { + var apiErr *cutil.APIError + if !errors.As(err, &apiErr) { + return common.HandleTxError(c, logger, err, "Failed to create VPC Peering, DB transaction error") + } + } + if timeoutResp != nil { + return timeoutResp() } - - // Commit the transaction - err = tx.Commit() if err != nil { - logger.Error().Err(err).Msg("error committing transaction") - return cutil.NewAPIErrorResponse(c, http.StatusInternalServerError, "Failed to create VPC Peering", nil) + return common.HandleTxError(c, logger, err, "Failed to create VPC Peering, DB transaction error") } - txCommitted = true // Best effort post-commit update: workflow completed, so mark peering as Ready. // This is intentionally outside of the transaction so create does not fail if this update fails. status := cdbm.VpcPeeringStatusConfiguring - err = vpcPeeringDAO.UpdateStatusByID(ctx, nil, vpcPeering.ID, cdbm.VpcPeeringStatusReady) - if err != nil { - logger.Warn().Err(err).Msg("best-effort update to Ready status failed after workflow completion") + uerr := vpcPeeringDAO.UpdateStatusByID(ctx, nil, vpcPeering.ID, cdbm.VpcPeeringStatusReady) + if uerr != nil { + logger.Warn().Err(uerr).Msg("best-effort update to Ready status failed after workflow completion") } else { status = cdbm.VpcPeeringStatusReady } @@ -855,91 +869,100 @@ func (dvph DeleteVpcPeeringHandler) Handle(c echo.Context) error { return cutil.NewAPIErrorResponse(c, http.StatusForbidden, "User does not have access to delete the VPC Peering", nil) } - // Start a db tx for the deletion workflow - tx, err := cdb.BeginTx(ctx, dvph.dbSession, &sql.TxOptions{}) - if err != nil { - logger.Error().Err(err).Msg("unable to start transaction") - return cutil.NewAPIErrorResponse(c, http.StatusInternalServerError, "Failed to delete VPC Peering", nil) - } + // timeoutResp lets the closure signal a post-rollback handler — the + // TerminateWorkflow call has to run after the closure returns so that + // the DB tx unwinds before we make the second remote call. nil means + // no timeout occurred and the normal flow continues. + var timeoutResp func() error + err = cdb.WithTx(ctx, dvph.dbSession, func(tx *cdb.Tx) error { + // Update status to Deleting first + derr := vpcPeeringDAO.UpdateStatusByID(ctx, tx, vpcPeering.ID, cdbm.VpcPeeringStatusDeleting) + if derr != nil { + logger.Error().Err(derr).Msg("error updating VPC Peering status to Deleting") + return cutil.NewAPIError(http.StatusInternalServerError, "Failed to update VPC Peering status to Deleting", nil) + } - // If false, a rollback will be trigger on any early return. - // If all goes well, we'll set it to true later on. - txCommitted := false - defer common.RollbackTx(ctx, tx, &txCommitted) + // Create the VPC Peering deletion request + deleteVpcPeeringRequest := &cwssaws.VpcPeeringDeletionRequest{ + Id: &cwssaws.VpcPeeringId{Value: vpcPeering.ID.String()}, + } - // Update status to Deleting first - err = vpcPeeringDAO.UpdateStatusByID(ctx, tx, vpcPeering.ID, cdbm.VpcPeeringStatusDeleting) - if err != nil { - logger.Error().Err(err).Msg("error updating VPC Peering status to Deleting") - return cutil.NewAPIErrorResponse(c, http.StatusInternalServerError, "Failed to update VPC Peering status to Deleting", nil) - } + // Get the site temporal client + stc, derr := dvph.scp.GetClientByID(vpcPeering.SiteID) + if derr != nil { + logger.Error().Err(derr).Msg("failed to retrieve Temporal client for Site") + return cutil.NewAPIError(http.StatusInternalServerError, "Failed to retrieve Temporal client for Site", nil) + } - // Create the VPC Peering deletion request - deleteVpcPeeringRequest := &cwssaws.VpcPeeringDeletionRequest{ - Id: &cwssaws.VpcPeeringId{Value: vpcPeering.ID.String()}, - } + // Setup workflow options + workflowOptions := tclient.StartWorkflowOptions{ + ID: "vpcpeering-delete-" + vpcPeering.ID.String(), + WorkflowExecutionTimeout: cutil.WorkflowExecutionTimeout, + TaskQueue: queue.SiteTaskQueue, + } - // Get the site temporal client - stc, err := dvph.scp.GetClientByID(vpcPeering.SiteID) - if err != nil { - logger.Error().Err(err).Msg("failed to retrieve Temporal client for Site") - return cutil.NewAPIErrorResponse(c, http.StatusInternalServerError, "Failed to retrieve Temporal client for Site", nil) - } + logger.Info().Msg("triggering VPC Peering delete workflow") - // Setup workflow options - workflowOptions := tclient.StartWorkflowOptions{ - ID: "vpcpeering-delete-" + vpcPeering.ID.String(), - WorkflowExecutionTimeout: cutil.WorkflowExecutionTimeout, - TaskQueue: queue.SiteTaskQueue, - } + // Add context deadline + workflowCtx, cancel := context.WithTimeout(ctx, cutil.WorkflowContextTimeout) + defer cancel() - logger.Info().Msg("triggering VPC Peering delete workflow") + // Trigger site workflow to delete VPC Peering + we, derr := stc.ExecuteWorkflow(workflowCtx, workflowOptions, "DeleteVpcPeering", deleteVpcPeeringRequest) + if derr != nil { + logger.Error().Err(derr).Msg("failed to start VPC Peering deletion workflow") + return cutil.NewAPIError(http.StatusInternalServerError, "Failed to start VPC Peering deletion workflow", nil) + } - // Add context deadline - workflowCtx, cancel := context.WithTimeout(ctx, cutil.WorkflowContextTimeout) - defer cancel() + wid := we.GetID() + logger.Info().Str("Workflow ID", wid).Msg("started VPC Peering deletion workflow") - // Trigger site workflow to delete VPC Peering - we, err := stc.ExecuteWorkflow(workflowCtx, workflowOptions, "DeleteVpcPeering", deleteVpcPeeringRequest) - if err != nil { - logger.Error().Err(err).Msg("failed to start VPC Peering deletion workflow") - return cutil.NewAPIErrorResponse(c, http.StatusInternalServerError, "Failed to start VPC Peering deletion workflow", nil) - } + // Wait for workflow completion synchronously + wferr := we.Get(workflowCtx, nil) + if wferr != nil { + var applicationErr *tp.ApplicationError + if errors.As(wferr, &applicationErr) && slices.Contains(swe.UnimplementedOrDeniedErrTypes(), applicationErr.Type()) { + logger.Error().Msg("feature not yet implemented on target Site") + return cutil.NewAPIError(http.StatusNotImplemented, fmt.Sprintf("Feature not yet implemented on target Site: %s", wferr), nil) + } - wid := we.GetID() - logger.Info().Str("Workflow ID", wid).Msg("started VPC Peering deletion workflow") + var timeoutErr *tp.TimeoutError + if errors.As(wferr, &timeoutErr) || wferr == context.DeadlineExceeded || workflowCtx.Err() != nil { + logger.Error().Err(wferr).Msg("failed to delete VPC Peering, timeout occurred executing workflow on Site.") + timeoutCause := wferr + timeoutResp = func() error { + return common.TerminateWorkflowOnTimeOut(c, logger, stc, wid, timeoutCause, "VpcPeering", "DeleteVpcPeering") + } + return cutil.NewAPIError(http.StatusInternalServerError, "VPC Peering delete workflow timed out", nil) + } - // Wait for workflow completion synchronously - err = we.Get(workflowCtx, nil) - if err != nil { - var applicationErr *tp.ApplicationError - if errors.As(err, &applicationErr) && slices.Contains(swe.UnimplementedOrDeniedErrTypes(), applicationErr.Type()) { - logger.Error().Msg("feature not yet implemented on target Site") - return cutil.NewAPIErrorResponse(c, http.StatusNotImplemented, fmt.Sprintf("Feature not yet implemented on target Site: %s", err), nil) + logger.Error().Err(wferr).Msg("failed to synchronously execute Temporal workflow to delete VPC Peering") + return cutil.NewAPIError(http.StatusInternalServerError, fmt.Sprintf("Failed to execute sync workflow to delete VPC Peering on Site: %s", wferr), nil) } - var timeoutErr *tp.TimeoutError - if errors.As(err, &timeoutErr) || err == context.DeadlineExceeded || ctx.Err() != nil { - return common.TerminateWorkflowOnTimeOut(c, logger, stc, wid, err, "VpcPeering", "DeleteVpcPeering") + return nil + }) + // Surface real tx-helper errors first so they aren't masked by the + // timeout response (commit/rollback failures wrap into something other + // than the cutil.APIError marker we returned for the timeout case). + if err != nil { + var apiErr *cutil.APIError + if !errors.As(err, &apiErr) { + return common.HandleTxError(c, logger, err, "Failed to delete VPC Peering, DB transaction error") } - - logger.Error().Err(err).Msg("failed to synchronously execute Temporal workflow to delete VPC Peering") - return cutil.NewAPIErrorResponse(c, http.StatusInternalServerError, fmt.Sprintf("Failed to execute sync workflow to delete VPC Peering on Site: %s", err), nil) } - - // Commit the transaction - err = tx.Commit() + if timeoutResp != nil { + return timeoutResp() + } if err != nil { - logger.Error().Err(err).Msg("error committing transaction") - return cutil.NewAPIErrorResponse(c, http.StatusInternalServerError, "Failed to delete VPC Peering", nil) + return common.HandleTxError(c, logger, err, "Failed to delete VPC Peering, DB transaction error") } - txCommitted = true // Best effort post-commit cleanup: remove VPC Peering from DB. // This is intentionally outside of the transaction so delete does not fail if this cleanup fails. - err = vpcPeeringDAO.Delete(ctx, nil, vpcPeering.ID) - if err != nil { - logger.Warn().Err(err).Msg("best-effort delete of VPC Peering from DB failed after workflow completion") + derr := vpcPeeringDAO.Delete(ctx, nil, vpcPeering.ID) + if derr != nil { + logger.Warn().Err(derr).Msg("best-effort delete of VPC Peering from DB failed after workflow completion") } logger.Info().Msg("finishing API handler")