Skip to content

Commit 9588d2f

Browse files
committed
OCTRL-678 Pass the error message to BKP in case of GO_ERROR
An error message is now propagated to kafka events just before scheduling a GO_ERROR transition.
1 parent d6ef344 commit 9588d2f

File tree

5 files changed

+80
-0
lines changed

5 files changed

+80
-0
lines changed

core/environment/environment.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1172,6 +1172,15 @@ func (env *Environment) QueryRoles(pathSpec string) (rs []workflow.Role) {
11721172
return
11731173
}
11741174

1175+
func (env *Environment) GetId() uid.ID {
1176+
if env == nil {
1177+
return ""
1178+
}
1179+
env.Mu.RLock()
1180+
defer env.Mu.RUnlock()
1181+
return env.id
1182+
}
1183+
11751184
func (env *Environment) GetPath() string {
11761185
return ""
11771186
}
@@ -1218,7 +1227,12 @@ func (env *Environment) subscribeToWfState(taskman *task.Manager) {
12181227
log.WithField("partition", env.id).
12191228
WithField("level", infologger.IL_Ops).
12201229
Error("one of the critical tasks went into ERROR state, transitioning the environment into ERROR")
1230+
1231+
the.EventWriterWithTopic(topic.Environment).WriteEvent(
1232+
NewEnvGoErrorEvent(env, newCriticalTasksErrorMessage(env)),
1233+
)
12211234
err := env.TryTransition(NewGoErrorTransition(taskman))
1235+
12221236
if err != nil {
12231237
if env.Sm.Current() == "ERROR" {
12241238
log.WithField("partition", env.id).
@@ -1451,6 +1465,11 @@ func (env *Environment) scheduleAutoStopTransition() (scheduled bool, expected t
14511465
log.WithField("partition", env.id).
14521466
WithField("run", env.currentRunNumber).
14531467
Errorf("Scheduled auto stop transition failed: %s, Transitioning into ERROR", err.Error())
1468+
1469+
the.EventWriterWithTopic(topic.Environment).WriteEvent(
1470+
NewEnvGoErrorEvent(env, fmt.Sprintf("scheduled auto stop transition failed: %s", err.Error())),
1471+
)
1472+
14541473
err = env.TryTransition(NewGoErrorTransition(ManagerInstance().taskman))
14551474
if err != nil {
14561475
log.WithField("partition", env.id).

core/environment/manager.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -489,6 +489,9 @@ func (envs *Manager) CreateEnvironment(workflowPath string, userVars map[string]
489489
WithError(err).
490490
Warnf("auto-transitioning environment failed %s, cleanup in progress", op)
491491

492+
the.EventWriterWithTopic(topic.Environment).WriteEvent(
493+
NewEnvGoErrorEvent(env, fmt.Sprintf("%s failed: %v", op, err)),
494+
)
492495
err := env.TryTransition(NewGoErrorTransition(
493496
envs.taskman),
494497
)
@@ -593,6 +596,9 @@ func (envs *Manager) CreateEnvironment(workflowPath string, userVars map[string]
593596
WithField("level", infologger.IL_Devel).
594597
Error("environment deployment and configuration error, cleanup in progress")
595598

599+
the.EventWriterWithTopic(topic.Environment).WriteEvent(
600+
NewEnvGoErrorEvent(env, fmt.Sprintf("deployment or configuration failed: %v", err)),
601+
)
596602
errTxErr := env.TryTransition(NewGoErrorTransition(
597603
envs.taskman),
598604
)
@@ -1053,6 +1059,9 @@ func (envs *Manager) handleIntegratedServiceEvent(evt event.IntegratedServiceEve
10531059
}
10541060

10551061
if env.CurrentState() != "ERROR" {
1062+
the.EventWriterWithTopic(topic.Environment).WriteEvent(
1063+
NewEnvGoErrorEvent(env, "ODC partition went to ERROR during RUNNING"),
1064+
)
10561065
err = env.TryTransition(NewGoErrorTransition(envs.taskman))
10571066
if err != nil {
10581067
log.WithPrefix("scheduler").
@@ -1467,6 +1476,9 @@ func (envs *Manager) CreateAutoEnvironment(workflowPath string, userVars map[str
14671476
WithError(err).
14681477
Warnf("auto-transitioning environment failed %s, cleanup in progress", op)
14691478

1479+
the.EventWriterWithTopic(topic.Environment).WriteEvent(
1480+
NewEnvGoErrorEvent(env, fmt.Sprintf("%s failed: %v", op, err)),
1481+
)
14701482
err := env.TryTransition(NewGoErrorTransition(
14711483
envs.taskman),
14721484
)

core/environment/utils.go

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,10 @@ import (
2929
"encoding/json"
3030
"fmt"
3131
"github.com/AliceO2Group/Control/common/logger/infologger"
32+
pb "github.com/AliceO2Group/Control/common/protos"
33+
"github.com/AliceO2Group/Control/core/task"
34+
"github.com/AliceO2Group/Control/core/task/sm"
35+
"github.com/AliceO2Group/Control/core/workflow"
3236
"os"
3337
"sort"
3438

@@ -101,3 +105,37 @@ func sortMapToString(m map[string]string) string {
101105
}
102106
return b.String()
103107
}
108+
109+
func NewEnvGoErrorEvent(env *Environment, err string) *pb.Ev_EnvironmentEvent {
110+
return &pb.Ev_EnvironmentEvent{
111+
EnvironmentId: env.GetId().String(),
112+
State: env.Sm.Current(),
113+
RunNumber: env.GetCurrentRunNumber(),
114+
Error: err,
115+
Message: "a critical error occurred, GO_ERROR transition imminent",
116+
LastRequestUser: env.GetLastRequestUser(),
117+
WorkflowTemplateInfo: env.GetWorkflowInfo(),
118+
}
119+
}
120+
121+
func newCriticalTasksErrorMessage(env *Environment) string {
122+
criticalTasksInError := env.workflow.GetTasks().Filtered(func(t *task.Task) bool {
123+
return t.GetTraits().Critical && t.GetState() == sm.ERROR
124+
})
125+
126+
if len(criticalTasksInError) == 0 {
127+
return "no critical tasks in ERROR"
128+
} else if len(criticalTasksInError) == 1 {
129+
t := criticalTasksInError[0]
130+
name := t.GetName()
131+
132+
// if available, we prefer role name, because it does not have a long hash for JIT-generated DPL tasks
133+
role, ok := t.GetParentRole().(workflow.Role)
134+
if ok {
135+
name = role.GetName()
136+
}
137+
return fmt.Sprintf("critical task '%s' on host '%s' transitioned to ERROR", name, t.GetHostname())
138+
} else {
139+
return fmt.Sprintf("%d critical tasks transitioned to ERROR, could not determine the first one to fail", len(criticalTasksInError))
140+
}
141+
}

core/server.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ package core
2828

2929
import (
3030
"encoding/json"
31+
"fmt"
3132
"maps"
3233
"runtime"
3334
"sort"
@@ -646,6 +647,9 @@ func (m *RpcServer) ControlEnvironment(cxt context.Context, req *pb.ControlEnvir
646647
WithField("level", infologger.IL_Ops).
647648
WithError(err).
648649
Errorf("transition '%s' failed, transitioning into ERROR.", req.GetType().String())
650+
the.EventWriterWithTopic(topic.Environment).WriteEvent(
651+
environment.NewEnvGoErrorEvent(env, fmt.Sprintf("transition %s failed: %v", req.GetType().String(), err)),
652+
)
649653
err = env.TryTransition(environment.NewGoErrorTransition(m.state.taskman))
650654
if err != nil {
651655
log.WithField("partition", env.Id()).Warnf("could not complete requested GO_ERROR transition, forcing move to ERROR: %s", err.Error())

core/task/task.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,13 @@ func (t *Task) SetSafeToStop(done bool) {
151151
t.safeToStop = done
152152
}
153153

154+
func (t *Task) GetState() sm.State {
155+
t.mu.Lock()
156+
defer t.mu.Unlock()
157+
158+
return t.state
159+
}
160+
154161
func (t *Task) GetParentRole() interface{} {
155162
t.mu.RLock()
156163
defer t.mu.RUnlock()

0 commit comments

Comments
 (0)