diff --git a/core/services/workflows/v2/engine_execution_concurrency_test.go b/core/services/workflows/v2/engine_execution_concurrency_test.go new file mode 100644 index 00000000000..4c86289b513 --- /dev/null +++ b/core/services/workflows/v2/engine_execution_concurrency_test.go @@ -0,0 +1,116 @@ +package v2_test + +import ( + "sync/atomic" + "testing" + "time" + + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + + "github.com/smartcontractkit/chainlink-common/pkg/capabilities" + "github.com/smartcontractkit/chainlink-common/pkg/settings/cresettings" + modulemocks "github.com/smartcontractkit/chainlink-common/pkg/workflows/wasm/host/mocks" + sdkpb "github.com/smartcontractkit/chainlink-protos/cre/go/sdk" + + regmocks "github.com/smartcontractkit/chainlink-common/pkg/types/core/mocks" + + "github.com/smartcontractkit/chainlink-common/pkg/workflows/wasm/host" + capmocks "github.com/smartcontractkit/chainlink/v2/core/capabilities/mocks" + v2 "github.com/smartcontractkit/chainlink/v2/core/services/workflows/v2" + "github.com/smartcontractkit/chainlink/v2/core/utils/matches" +) + +// TestEngine_ExecutionConcurrencySerializesOverlappingRuns proves that when PerWorkflow +// ExecutionConcurrencyLimit is 1, a second trigger cannot start Module.Execute until the first +// run completes (executionsSemaphore.Wait blocks in handleAllTriggerEvents). +func TestEngine_ExecutionConcurrencySerializesOverlappingRuns(t *testing.T) { + t.Parallel() + + continueFirst := make(chan struct{}) + var execRunCount atomic.Int32 + + module := modulemocks.NewModuleV2(t) + module.EXPECT().Start().Once() + module.EXPECT().Execute(matches.AnyContext, mock.Anything, mock.Anything).Return(newTriggerSubs(1), nil).Once() + module.EXPECT().Execute(matches.AnyContext, mock.Anything, mock.Anything).Run( + func(_ context.Context, _ *sdkpb.ExecuteRequest, _ host.ExecutionHelper) { + n := execRunCount.Add(1) + if n == 1 { + <-continueFirst + } + }).Return(nil, nil).Times(2) + module.EXPECT().Close().Once() + + capreg := regmocks.NewCapabilitiesRegistry(t) + capreg.EXPECT().LocalNode(matches.AnyContext).Return(newNode(t), nil).Once() + + initDoneCh := make(chan error, 1) + subscribedToTriggersCh := make(chan []string, 1) + executionFinishedCh := make(chan string, 2) + + cfg := defaultTestConfig(t, func(cfg *cresettings.Workflows) { + cfg.ExecutionConcurrencyLimit.DefaultValue = 1 + }) + cfg.Module = module + cfg.CapRegistry = capreg + cfg.BillingClient = setupMockBillingClient(t) + cfg.Hooks = v2.LifecycleHooks{ + OnInitialized: func(err error) { + initDoneCh <- err + }, + OnSubscribedToTriggers: func(triggerIDs []string) { + subscribedToTriggersCh <- triggerIDs + }, + OnExecutionFinished: func(executionID string, _ string) { + executionFinishedCh <- executionID + }, + } + + engine, err := v2.NewEngine(cfg) + require.NoError(t, err) + + trigger := capmocks.NewTriggerCapability(t) + capreg.EXPECT().GetTrigger(matches.AnyContext, "id_0").Return(trigger, nil).Once() + eventCh := make(chan capabilities.TriggerResponse) + trigger.EXPECT().RegisterTrigger(matches.AnyContext, mock.Anything).Return(eventCh, nil).Once() + trigger.EXPECT().UnregisterTrigger(matches.AnyContext, mock.Anything).Return(nil).Once() + trigger.EXPECT().AckEvent(matches.AnyContext, mock.Anything, mock.Anything, mock.Anything).Return(nil).Maybe() + + require.NoError(t, engine.Start(t.Context())) + require.NoError(t, <-initDoneCh) + require.Equal(t, []string{"id_0"}, <-subscribedToTriggersCh) + + eventCh <- capabilities.TriggerResponse{ + Event: capabilities.TriggerEvent{ + TriggerType: "basic-trigger@1.0.0", + ID: "event_concurrency_1", + Payload: nil, + }, + } + + require.Eventually(t, func() bool { return execRunCount.Load() == 1 }, 2*time.Second, 5*time.Millisecond, + "first execution should start") + + eventCh <- capabilities.TriggerResponse{ + Event: capabilities.TriggerEvent{ + TriggerType: "basic-trigger@1.0.0", + ID: "event_concurrency_2", + Payload: nil, + }, + } + + time.Sleep(100 * time.Millisecond) + require.Equal(t, int32(1), execRunCount.Load(), + "second execution must not start while the first holds the executions semaphore") + + continueFirst <- struct{}{} + + require.Eventually(t, func() bool { return execRunCount.Load() == 2 }, 2*time.Second, 5*time.Millisecond, + "second execution should start after the first completes") + + <-executionFinishedCh + <-executionFinishedCh + + require.NoError(t, engine.Close()) +}