From 9af23e149d7b65d113b7ba0575e0e13f19945b16 Mon Sep 17 00:00:00 2001 From: slinkydeveloper Date: Fri, 15 May 2026 15:52:11 +0200 Subject: [PATCH 1/4] Add more combinators tests, add tests for signals --- .../sdktesting/contracts/TestUtilsService.kt | 22 ++ .../VirtualObjectCommandInterpreter.kt | 32 ++ .../restate/sdktesting/junit/TestSuites.kt | 11 +- .../restate/sdktesting/tests/Combinators.kt | 277 +++++++++++++++++- .../dev/restate/sdktesting/tests/Signals.kt | 92 ++++++ 5 files changed, 428 insertions(+), 6 deletions(-) create mode 100644 sdk-tests/src/main/kotlin/dev/restate/sdktesting/tests/Signals.kt diff --git a/sdk-tests/src/main/kotlin/dev/restate/sdktesting/contracts/TestUtilsService.kt b/sdk-tests/src/main/kotlin/dev/restate/sdktesting/contracts/TestUtilsService.kt index 3d6efa8f..0403cef5 100644 --- a/sdk-tests/src/main/kotlin/dev/restate/sdktesting/contracts/TestUtilsService.kt +++ b/sdk-tests/src/main/kotlin/dev/restate/sdktesting/contracts/TestUtilsService.kt @@ -9,11 +9,27 @@ package dev.restate.sdktesting.contracts import dev.restate.sdk.annotation.* +import kotlinx.serialization.Serializable /** Collection of various utilities/corner cases scenarios used by tests */ @Service @Name("TestUtilsService") interface TestUtilsService { + + @Serializable + data class ResolveSignalRequest( + val invocationId: String, + val signalName: String, + val value: String + ) + + @Serializable + data class RejectSignalRequest( + val invocationId: String, + val signalName: String, + val reason: String + ) + /** Just echo */ @Handler suspend fun echo(input: String): String @@ -40,4 +56,10 @@ interface TestUtilsService { /** Cancel invocation using the context. */ @Handler suspend fun cancelInvocation(invocationId: String) + + /** Resolve a named signal on the given invocation. */ + @Handler suspend fun resolveSignal(req: ResolveSignalRequest) + + /** Reject a named signal on the given invocation. */ + @Handler suspend fun rejectSignal(req: RejectSignalRequest) } diff --git a/sdk-tests/src/main/kotlin/dev/restate/sdktesting/contracts/VirtualObjectCommandInterpreter.kt b/sdk-tests/src/main/kotlin/dev/restate/sdktesting/contracts/VirtualObjectCommandInterpreter.kt index a3b21adf..ae41fa2e 100644 --- a/sdk-tests/src/main/kotlin/dev/restate/sdktesting/contracts/VirtualObjectCommandInterpreter.kt +++ b/sdk-tests/src/main/kotlin/dev/restate/sdktesting/contracts/VirtualObjectCommandInterpreter.kt @@ -26,11 +26,23 @@ interface VirtualObjectCommandInterpreter { // This is serialized as `{"type": "sleep", ...}` @Serializable @SerialName("sleep") data class Sleep(val timeoutMillis: Long) : AwaitableCommand + // This is serialized as `{"type": "runReturns", ...}` + // When implementing this, make sure that the run executes some actual work, especially in async + // world (e.g. in ts, something as simple as setTimeout(1) is enough...) + @Serializable + @SerialName("runReturns") + data class RunReturns(val value: String) : AwaitableCommand + // This is serialized as `{"type": "runThrowTerminalException", ...}` @Serializable @SerialName("runThrowTerminalException") data class RunThrowTerminalException(val reason: String) : AwaitableCommand + // This is serialized as `{"type": "createSignal", ...}` + @Serializable + @SerialName("createSignal") + data class CreateSignal(val signalName: String) : AwaitableCommand + @Serializable sealed interface Command // Returns the index of the one that completed first successfully @@ -54,6 +66,26 @@ interface VirtualObjectCommandInterpreter { @SerialName("awaitAwakeableOrTimeout") data class AwaitAwakeableOrTimeout(val awakeableKey: String, val timeoutMillis: Long) : Command + // Awaits all commands; returns first succeeded or throws if all failed + @Serializable + @SerialName("awaitFirstSucceededOrAllFailed") + data class AwaitFirstSucceededOrAllFailed(val commands: List) : Command + + // Awaits all commands; returns first completed (throwing if it failed) + @Serializable + @SerialName("awaitFirstCompleted") + data class AwaitFirstCompleted(val commands: List) : Command + + // Awaits all commands; returns pipe-joined values if all succeed, throws on first failure + @Serializable + @SerialName("awaitAllSucceededOrFirstFailed") + data class AwaitAllSucceededOrFirstFailed(val commands: List) : Command + + // Awaits all commands; returns pipe-joined "ok:" or "err:" for each result + @Serializable + @SerialName("awaitAllCompleted") + data class AwaitAllCompleted(val commands: List) : Command + @Serializable data class InterpretRequest(val commands: List) @Serializable diff --git a/sdk-tests/src/main/kotlin/dev/restate/sdktesting/junit/TestSuites.kt b/sdk-tests/src/main/kotlin/dev/restate/sdktesting/junit/TestSuites.kt index d0f9ed46..eefbaddb 100644 --- a/sdk-tests/src/main/kotlin/dev/restate/sdktesting/junit/TestSuites.kt +++ b/sdk-tests/src/main/kotlin/dev/restate/sdktesting/junit/TestSuites.kt @@ -20,6 +20,7 @@ import dev.restate.sdktesting.tests.ProxyRequestSigning import dev.restate.sdktesting.tests.RunFlush import dev.restate.sdktesting.tests.RunRetry import dev.restate.sdktesting.tests.ServiceToServiceCommunication +import dev.restate.sdktesting.tests.Signals import dev.restate.sdktesting.tests.Sleep import dev.restate.sdktesting.tests.SleepWithFailures import dev.restate.sdktesting.tests.State @@ -52,7 +53,7 @@ object TestSuites : SuiteProvider { clazz(), clazz(), clazz(), - )) + clazz())) val THREE_NODES_SUITE = TestSuite( @@ -74,7 +75,7 @@ object TestSuites : SuiteProvider { clazz(), clazz(), clazz(), - ), + clazz()), 3) private val ALWAYS_SUSPENDING_SUITE = @@ -95,7 +96,7 @@ object TestSuites : SuiteProvider { clazz(), clazz(), clazz(), - )) + clazz())) private val THREE_NODES_ALWAYS_SUSPENDING_SUITE = TestSuite( @@ -115,7 +116,7 @@ object TestSuites : SuiteProvider { clazz(), clazz(), clazz(), - ), + clazz()), 3) private val SINGLE_THREAD_SINGLE_PARTITION_SUITE = @@ -141,7 +142,7 @@ object TestSuites : SuiteProvider { clazz(), clazz(), clazz(), - )) + clazz())) private val LAZY_STATE_SUITE = TestSuite( diff --git a/sdk-tests/src/main/kotlin/dev/restate/sdktesting/tests/Combinators.kt b/sdk-tests/src/main/kotlin/dev/restate/sdktesting/tests/Combinators.kt index 526c34ed..bbc7f141 100644 --- a/sdk-tests/src/main/kotlin/dev/restate/sdktesting/tests/Combinators.kt +++ b/sdk-tests/src/main/kotlin/dev/restate/sdktesting/tests/Combinators.kt @@ -10,14 +10,23 @@ package dev.restate.sdktesting.tests import dev.restate.client.Client import dev.restate.client.kotlin.* +import dev.restate.sdktesting.contracts.TestUtilsService +import dev.restate.sdktesting.contracts.TestUtilsService.RejectSignalRequest +import dev.restate.sdktesting.contracts.TestUtilsService.ResolveSignalRequest import dev.restate.sdktesting.contracts.VirtualObjectCommandInterpreter +import dev.restate.sdktesting.contracts.VirtualObjectCommandInterpreter.AwaitAllCompleted +import dev.restate.sdktesting.contracts.VirtualObjectCommandInterpreter.AwaitAllSucceededOrFirstFailed import dev.restate.sdktesting.contracts.VirtualObjectCommandInterpreter.AwaitAny import dev.restate.sdktesting.contracts.VirtualObjectCommandInterpreter.AwaitAnySuccessful import dev.restate.sdktesting.contracts.VirtualObjectCommandInterpreter.AwaitAwakeableOrTimeout +import dev.restate.sdktesting.contracts.VirtualObjectCommandInterpreter.AwaitFirstCompleted +import dev.restate.sdktesting.contracts.VirtualObjectCommandInterpreter.AwaitFirstSucceededOrAllFailed import dev.restate.sdktesting.contracts.VirtualObjectCommandInterpreter.CreateAwakeable +import dev.restate.sdktesting.contracts.VirtualObjectCommandInterpreter.CreateSignal import dev.restate.sdktesting.contracts.VirtualObjectCommandInterpreter.InterpretRequest import dev.restate.sdktesting.contracts.VirtualObjectCommandInterpreter.RejectAwakeable import dev.restate.sdktesting.contracts.VirtualObjectCommandInterpreter.ResolveAwakeable +import dev.restate.sdktesting.contracts.VirtualObjectCommandInterpreter.RunReturns import dev.restate.sdktesting.contracts.VirtualObjectCommandInterpreter.RunThrowTerminalException import dev.restate.sdktesting.contracts.VirtualObjectCommandInterpreter.Sleep import dev.restate.sdktesting.infra.* @@ -38,7 +47,8 @@ class Combinators { @RegisterExtension val deployerExt: RestateDeployerExtension = RestateDeployerExtension { withServiceSpec( - ServiceSpec.defaultBuilder().withServices(VirtualObjectCommandInterpreter::class)) + ServiceSpec.defaultBuilder() + .withServices(VirtualObjectCommandInterpreter::class, TestUtilsService::class)) } } @@ -161,4 +171,269 @@ class Combinators { assertThat(result.await()).isEqualTo("awk1-result") } + + @Test + @DisplayName("Signals: await first successful or all failed") + @Execution(ExecutionMode.CONCURRENT) + fun signalFirstSuccessfulOrAllFailed(@InjectClient ingressClient: Client) = runTest { + val testId = UUID.randomUUID().toString() + val interpreterClient = ingressClient.toVirtualObject(testId) + val utilsClient = ingressClient.toService() + + val sendResponse = + interpreterClient + .request { + interpretCommands( + InterpretRequest( + listOf( + AwaitAnySuccessful( + listOf( + CreateSignal("sig0"), + CreateSignal("sig1"), + CreateSignal("sig2")))))) + } + .options(idempotentCallOptions) + .send() + + val invocationId = sendResponse.invocationId() + utilsClient + .request { rejectSignal(RejectSignalRequest(invocationId, "sig0", "fail0")) } + .options(idempotentCallOptions) + .call() + utilsClient + .request { rejectSignal(RejectSignalRequest(invocationId, "sig2", "fail2")) } + .options(idempotentCallOptions) + .call() + utilsClient + .request { resolveSignal(ResolveSignalRequest(invocationId, "sig1", "success1")) } + .options(idempotentCallOptions) + .call() + + assertThat(sendResponse.attachSuspend().response).isEqualTo("success1") + } + + @Test + @DisplayName("Signals: await all successful or first failed") + @Execution(ExecutionMode.CONCURRENT) + fun signalAllSuccessfulOrFirstFailed(@InjectClient ingressClient: Client) = runTest { + val testId = UUID.randomUUID().toString() + val interpreterClient = ingressClient.toVirtualObject(testId) + val utilsClient = ingressClient.toService() + + val sendResponse = + interpreterClient + .request { + interpretCommands( + InterpretRequest( + listOf( + AwaitAllSucceededOrFirstFailed( + listOf( + CreateSignal("sig0"), + CreateSignal("sig1"), + CreateSignal("sig2")))))) + } + .options(idempotentCallOptions) + .send() + + val invocationId = sendResponse.invocationId() + utilsClient + .request { resolveSignal(ResolveSignalRequest(invocationId, "sig0", "val0")) } + .options(idempotentCallOptions) + .call() + utilsClient + .request { resolveSignal(ResolveSignalRequest(invocationId, "sig1", "val1")) } + .options(idempotentCallOptions) + .call() + utilsClient + .request { resolveSignal(ResolveSignalRequest(invocationId, "sig2", "val2")) } + .options(idempotentCallOptions) + .call() + + assertThat(sendResponse.attachSuspend().response).isEqualTo("val0|val1|val2") + } + + @Test + @DisplayName("Signals: await all completed") + @Execution(ExecutionMode.CONCURRENT) + fun signalAllCompleted(@InjectClient ingressClient: Client) = runTest { + val testId = UUID.randomUUID().toString() + val interpreterClient = ingressClient.toVirtualObject(testId) + val utilsClient = ingressClient.toService() + + val sendResponse = + interpreterClient + .request { + interpretCommands( + InterpretRequest( + listOf( + AwaitAllCompleted( + listOf( + CreateSignal("sig0"), + CreateSignal("sig1"), + CreateSignal("sig2")))))) + } + .options(idempotentCallOptions) + .send() + + val invocationId = sendResponse.invocationId() + utilsClient + .request { resolveSignal(ResolveSignalRequest(invocationId, "sig0", "val0")) } + .options(idempotentCallOptions) + .call() + utilsClient + .request { rejectSignal(RejectSignalRequest(invocationId, "sig1", "err1")) } + .options(idempotentCallOptions) + .call() + utilsClient + .request { resolveSignal(ResolveSignalRequest(invocationId, "sig2", "val2")) } + .options(idempotentCallOptions) + .call() + + assertThat(sendResponse.attachSuspend().response).isEqualTo("ok:val0|err:err1|ok:val2") + } + + @Test + @DisplayName("Signals: await first completed") + @Execution(ExecutionMode.CONCURRENT) + fun signalFirstCompleted(@InjectClient ingressClient: Client) = runTest { + val testId = UUID.randomUUID().toString() + val interpreterClient = ingressClient.toVirtualObject(testId) + val utilsClient = ingressClient.toService() + + val sendResponse = + interpreterClient + .request { + interpretCommands( + InterpretRequest( + listOf( + AwaitAny( + listOf( + CreateSignal("sig0"), + CreateSignal("sig1"), + CreateSignal("sig2")))))) + } + .options(idempotentCallOptions) + .send() + + val invocationId = sendResponse.invocationId() + utilsClient + .request { resolveSignal(ResolveSignalRequest(invocationId, "sig1", "first")) } + .options(idempotentCallOptions) + .call() + + assertThat(sendResponse.attachSuspend().response).isEqualTo("first") + } + + @Test + @DisplayName("Mixed: run wins the race against an unresolved signal") + @Execution(ExecutionMode.CONCURRENT) + fun mixedRunWinsRaceAgainstSignal(@InjectClient ingressClient: Client) = runTest { + val testId = UUID.randomUUID().toString() + + // RunReturns completes immediately; the signal is never resolved so it can't win. + assertThat( + ingressClient + .toVirtualObject(testId) + .request { + interpretCommands( + InterpretRequest( + listOf( + AwaitFirstCompleted( + listOf(CreateSignal("sig"), RunReturns("runval")))))) + } + .options(idempotentCallOptions) + .call() + .response) + .isEqualTo("runval") + } + + @Test + @DisplayName("Mixed: failed run is skipped, signal succeeds in AwaitFirstSucceededOrAllFailed") + @Execution(ExecutionMode.CONCURRENT) + fun mixedSignalWinsAfterRunFails(@InjectClient ingressClient: Client) = runTest { + val testId = UUID.randomUUID().toString() + val interpreterClient = ingressClient.toVirtualObject(testId) + val utilsClient = ingressClient.toService() + + val sendResponse = + interpreterClient + .request { + interpretCommands( + InterpretRequest( + listOf( + AwaitFirstSucceededOrAllFailed( + listOf(RunThrowTerminalException("fail"), CreateSignal("sig")))))) + } + .options(idempotentCallOptions) + .send() + + val invocationId = sendResponse.invocationId() + utilsClient + .request { resolveSignal(ResolveSignalRequest(invocationId, "sig", "sigval")) } + .options(idempotentCallOptions) + .call() + + assertThat(sendResponse.attachSuspend().response).isEqualTo("sigval") + } + + @Test + @DisplayName("Mixed: AwaitAllSucceededOrFirstFailed with run and signal both succeed") + @Execution(ExecutionMode.CONCURRENT) + fun mixedAllSucceededRunAndSignal(@InjectClient ingressClient: Client) = runTest { + val testId = UUID.randomUUID().toString() + val interpreterClient = ingressClient.toVirtualObject(testId) + val utilsClient = ingressClient.toService() + + val sendResponse = + interpreterClient + .request { + interpretCommands( + InterpretRequest( + listOf( + AwaitAllSucceededOrFirstFailed( + listOf(RunReturns("runval"), CreateSignal("sig")))))) + } + .options(idempotentCallOptions) + .send() + + val invocationId = sendResponse.invocationId() + utilsClient + .request { resolveSignal(ResolveSignalRequest(invocationId, "sig", "sigval")) } + .options(idempotentCallOptions) + .call() + + assertThat(sendResponse.attachSuspend().response).isEqualTo("runval|sigval") + } + + @Test + @DisplayName("Mixed: AwaitAllCompleted with run (ok), signal (ok), and failed run (err)") + @Execution(ExecutionMode.CONCURRENT) + fun mixedAllCompletedRunSignalAndFailedRun(@InjectClient ingressClient: Client) = runTest { + val testId = UUID.randomUUID().toString() + val interpreterClient = ingressClient.toVirtualObject(testId) + val utilsClient = ingressClient.toService() + + val sendResponse = + interpreterClient + .request { + interpretCommands( + InterpretRequest( + listOf( + AwaitAllCompleted( + listOf( + RunReturns("runval"), + CreateSignal("sig"), + RunThrowTerminalException("fail")))))) + } + .options(idempotentCallOptions) + .send() + + val invocationId = sendResponse.invocationId() + utilsClient + .request { resolveSignal(ResolveSignalRequest(invocationId, "sig", "sigval")) } + .options(idempotentCallOptions) + .call() + + assertThat(sendResponse.attachSuspend().response).isEqualTo("ok:runval|ok:sigval|err:fail") + } } diff --git a/sdk-tests/src/main/kotlin/dev/restate/sdktesting/tests/Signals.kt b/sdk-tests/src/main/kotlin/dev/restate/sdktesting/tests/Signals.kt new file mode 100644 index 00000000..fd3881f5 --- /dev/null +++ b/sdk-tests/src/main/kotlin/dev/restate/sdktesting/tests/Signals.kt @@ -0,0 +1,92 @@ +// Copyright (c) 2023 - Restate Software, Inc., Restate GmbH +// +// This file is part of the Restate SDK Test suite tool, +// which is released under the MIT license. +// +// You can find a copy of the license in file LICENSE in the root +// directory of this repository or package, or at +// https://github.com/restatedev/sdk-test-suite/blob/main/LICENSE +package dev.restate.sdktesting.tests + +import dev.restate.client.Client +import dev.restate.client.kotlin.* +import dev.restate.sdktesting.contracts.TestUtilsService +import dev.restate.sdktesting.contracts.TestUtilsService.RejectSignalRequest +import dev.restate.sdktesting.contracts.TestUtilsService.ResolveSignalRequest +import dev.restate.sdktesting.contracts.VirtualObjectCommandInterpreter +import dev.restate.sdktesting.contracts.VirtualObjectCommandInterpreter.AwaitOne +import dev.restate.sdktesting.contracts.VirtualObjectCommandInterpreter.CreateSignal +import dev.restate.sdktesting.contracts.VirtualObjectCommandInterpreter.InterpretRequest +import dev.restate.sdktesting.infra.* +import java.util.UUID +import org.assertj.core.api.Assertions.assertThat +import org.junit.jupiter.api.DisplayName +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.extension.RegisterExtension +import org.junit.jupiter.api.parallel.Execution +import org.junit.jupiter.api.parallel.ExecutionMode + +class Signals { + companion object { + @RegisterExtension + val deployerExt: RestateDeployerExtension = RestateDeployerExtension { + withServiceSpec( + ServiceSpec.defaultBuilder() + .withServices(VirtualObjectCommandInterpreter::class, TestUtilsService::class)) + } + } + + @Test + @DisplayName("Await one signal with successful resolution") + @Execution(ExecutionMode.CONCURRENT) + fun resolve(@InjectClient ingressClient: Client) = runTest { + val testId = UUID.randomUUID().toString() + val interpreterClient = ingressClient.toVirtualObject(testId) + + val sendResponse = + interpreterClient + .request { + interpretCommands(InterpretRequest(listOf(AwaitOne(CreateSignal("mysignal"))))) + } + .options(idempotentCallOptions) + .send() + + ingressClient + .toService() + .request { + resolveSignal(ResolveSignalRequest(sendResponse.invocationId(), "mysignal", "hello")) + } + .options(idempotentCallOptions) + .call() + + assertThat(sendResponse.attachSuspend().response).isEqualTo("hello") + } + + @Test + @DisplayName("Await one signal with rejection propagates as terminal error") + @Execution(ExecutionMode.CONCURRENT) + fun reject(@InjectClient ingressClient: Client) = runTest { + val testId = UUID.randomUUID().toString() + val interpreterClient = ingressClient.toVirtualObject(testId) + + val sendResponse = + interpreterClient + .request { + interpretCommands(InterpretRequest(listOf(AwaitOne(CreateSignal("mysignal"))))) + } + .options(idempotentCallOptions) + .send() + + ingressClient + .toService() + .request { + rejectSignal(RejectSignalRequest(sendResponse.invocationId(), "mysignal", "boom")) + } + .options(idempotentCallOptions) + .call() + + assertThat(runCatching { sendResponse.attachSuspend().response }.exceptionOrNull()) + .message() + .contains("boom") + } +} From 7f4e3f0902ee79adbeec657c2fcbfeaab9558829 Mon Sep 17 00:00:00 2001 From: slinkydeveloper Date: Fri, 15 May 2026 16:16:11 +0200 Subject: [PATCH 2/4] Added logging client thing --- .../infra/BaseRestateDeployerExtension.kt | 2 +- .../restate/sdktesting/infra/LoggingClient.kt | 110 ++++++++++++++++++ .../dev/restate/sdktesting/junit/TestSuite.kt | 2 +- 3 files changed, 112 insertions(+), 2 deletions(-) create mode 100644 infra/src/main/kotlin/dev/restate/sdktesting/infra/LoggingClient.kt diff --git a/infra/src/main/kotlin/dev/restate/sdktesting/infra/BaseRestateDeployerExtension.kt b/infra/src/main/kotlin/dev/restate/sdktesting/infra/BaseRestateDeployerExtension.kt index 59f6b9b0..a0cb7284 100644 --- a/infra/src/main/kotlin/dev/restate/sdktesting/infra/BaseRestateDeployerExtension.kt +++ b/infra/src/main/kotlin/dev/restate/sdktesting/infra/BaseRestateDeployerExtension.kt @@ -63,7 +63,7 @@ abstract class BaseRestateDeployerExtension : ParameterResolver { } private fun resolveIngressClient(extensionContext: ExtensionContext): Client { - return Client.connect(resolveIngressURI(extensionContext).toString()) + return LoggingClient(Client.connect(resolveIngressURI(extensionContext).toString())) } private fun resolveContainerAddress( diff --git a/infra/src/main/kotlin/dev/restate/sdktesting/infra/LoggingClient.kt b/infra/src/main/kotlin/dev/restate/sdktesting/infra/LoggingClient.kt new file mode 100644 index 00000000..eed51d47 --- /dev/null +++ b/infra/src/main/kotlin/dev/restate/sdktesting/infra/LoggingClient.kt @@ -0,0 +1,110 @@ +// Copyright (c) 2023 - Restate Software, Inc., Restate GmbH +// +// This file is part of the Restate SDK Test suite tool, +// which is released under the MIT license. +// +// You can find a copy of the license in file LICENSE in the root +// directory of this repository or package, or at +// https://github.com/restatedev/sdk-test-suite/blob/main/LICENSE +package dev.restate.sdktesting.infra + +import dev.restate.client.Client +import dev.restate.client.Response +import dev.restate.client.SendResponse +import dev.restate.common.Request +import dev.restate.serde.TypeTag +import java.time.Duration +import java.util.concurrent.CompletableFuture +import kotlinx.serialization.ExperimentalSerializationApi +import kotlinx.serialization.KSerializer +import kotlinx.serialization.json.Json +import kotlinx.serialization.serializerOrNull +import org.apache.logging.log4j.LogManager + +internal class LoggingClient(private val delegate: Client) : Client { + + companion object { + private val LOG = LogManager.getLogger(LoggingClient::class.java) + private val JSON = Json { prettyPrint = true } + } + + override fun callAsync(request: Request): CompletableFuture> { + LOG.info("→ CALL {}", formatRequest(request)) + return delegate.callAsync(request).whenComplete { response, ex -> + if (ex != null) { + LOG.info("← CALL {} error: {}", request.target, ex.message) + } else { + LOG.info( + "← CALL {} status={} headers={} response={}", + request.target, + response.statusCode(), + response.headers().toLowercaseMap(), + formatBody(response.response())) + } + } + } + + override fun sendAsync( + request: Request, + delay: Duration? + ): CompletableFuture> { + LOG.info("→ SEND {}", formatRequest(request)) + return delegate.sendAsync(request, delay).whenComplete { response, ex -> + if (ex != null) { + LOG.info("← SEND {} error: {}", request.target, ex.message) + } else { + LOG.info( + "← SEND {} status={} invocationId={} sendStatus={}", + request.target, + response.statusCode(), + response.invocationId(), + response.sendStatus()) + } + } + } + + // Delegate remaining abstract methods unchanged. + + override fun awakeableHandle(id: String): Client.AwakeableHandle = delegate.awakeableHandle(id) + + override fun invocationHandle( + invocationId: String, + resTypeTag: TypeTag + ): Client.InvocationHandle = delegate.invocationHandle(invocationId, resTypeTag) + + override fun idempotentInvocationHandle( + target: dev.restate.common.Target, + idempotencyKey: String, + resTypeTag: TypeTag + ): Client.IdempotentInvocationHandle = + delegate.idempotentInvocationHandle(target, idempotencyKey, resTypeTag) + + override fun workflowHandle( + workflowName: String, + workflowId: String, + resTypeTag: TypeTag + ): Client.WorkflowHandle = delegate.workflowHandle(workflowName, workflowId, resTypeTag) + + private fun formatRequest(request: Request<*, *>): String = buildString { + append(request.target) + if (request.idempotencyKey != null) append(" idempotency-key=${request.idempotencyKey}") + val headers = request.headers + if (!headers.isNullOrEmpty()) append(" headers=$headers") + append("\n payload: ") + append(formatBody(request.request)) + } + + @OptIn(ExperimentalSerializationApi::class) + @Suppress("UNCHECKED_CAST") + private fun formatBody(value: Any?): String { + if (value == null || value == Unit) return "(empty)" + if (value is ByteArray) return "[${value.size} bytes]" + return try { + val serializer = + serializerOrNull(value.javaClass) as? KSerializer ?: return value.toString() + JSON.encodeToString(serializer, value) + } catch (_: Exception) { + value.toString() + } + } +} diff --git a/infra/src/main/kotlin/dev/restate/sdktesting/junit/TestSuite.kt b/infra/src/main/kotlin/dev/restate/sdktesting/junit/TestSuite.kt index ab2593e6..c96808ee 100644 --- a/infra/src/main/kotlin/dev/restate/sdktesting/junit/TestSuite.kt +++ b/infra/src/main/kotlin/dev/restate/sdktesting/junit/TestSuite.kt @@ -187,7 +187,7 @@ class TestSuite( val testContainersLogger = builder - .newLogger("org.testcontainers", Level.TRACE) + .newLogger("org.testcontainers", Level.DEBUG) .add(builder.newAppenderRef("testRunnerLog")) .add(builder.newAppenderRef("routingAppender")) .addAttribute("additivity", false) From ea463482c20dbbfb33dabf3fe024ad8784fb8b6f Mon Sep 17 00:00:00 2001 From: slinkydeveloper Date: Fri, 15 May 2026 16:23:07 +0200 Subject: [PATCH 3/4] Remove sleepConcurrently --- .../sdktesting/contracts/TestUtilsService.kt | 3 -- .../dev/restate/sdktesting/tests/Sleep.kt | 42 ++++++++++++------- .../sdktesting/tests/SleepWithFailures.kt | 19 ++++++--- 3 files changed, 41 insertions(+), 23 deletions(-) diff --git a/sdk-tests/src/main/kotlin/dev/restate/sdktesting/contracts/TestUtilsService.kt b/sdk-tests/src/main/kotlin/dev/restate/sdktesting/contracts/TestUtilsService.kt index 0403cef5..363ca4b9 100644 --- a/sdk-tests/src/main/kotlin/dev/restate/sdktesting/contracts/TestUtilsService.kt +++ b/sdk-tests/src/main/kotlin/dev/restate/sdktesting/contracts/TestUtilsService.kt @@ -42,9 +42,6 @@ interface TestUtilsService { /** Just echo */ @Handler @Raw suspend fun rawEcho(@Raw input: ByteArray): ByteArray - /** Create timers and await them all. Durations in milliseconds */ - @Handler suspend fun sleepConcurrently(millisDuration: List) - /** * Invoke `ctx.run` incrementing a local variable counter (not a restate state key!). * diff --git a/sdk-tests/src/main/kotlin/dev/restate/sdktesting/tests/Sleep.kt b/sdk-tests/src/main/kotlin/dev/restate/sdktesting/tests/Sleep.kt index 1c9d161b..12226d3a 100644 --- a/sdk-tests/src/main/kotlin/dev/restate/sdktesting/tests/Sleep.kt +++ b/sdk-tests/src/main/kotlin/dev/restate/sdktesting/tests/Sleep.kt @@ -9,11 +9,16 @@ package dev.restate.sdktesting.tests import dev.restate.client.Client -import dev.restate.client.kotlin.toService -import dev.restate.sdktesting.contracts.TestUtilsService +import dev.restate.client.kotlin.toVirtualObject +import dev.restate.sdktesting.contracts.VirtualObjectCommandInterpreter +import dev.restate.sdktesting.contracts.VirtualObjectCommandInterpreter.AwaitAllCompleted +import dev.restate.sdktesting.contracts.VirtualObjectCommandInterpreter.AwaitOne +import dev.restate.sdktesting.contracts.VirtualObjectCommandInterpreter.InterpretRequest +import dev.restate.sdktesting.contracts.VirtualObjectCommandInterpreter.Sleep import dev.restate.sdktesting.infra.InjectClient import dev.restate.sdktesting.infra.RestateDeployerExtension import dev.restate.sdktesting.infra.ServiceSpec +import java.util.UUID import java.util.concurrent.TimeUnit import kotlin.random.Random import kotlin.random.nextLong @@ -35,7 +40,8 @@ class Sleep { companion object { @RegisterExtension val deployerExt: RestateDeployerExtension = RestateDeployerExtension { - withServiceSpec(ServiceSpec.defaultBuilder().withServices(TestUtilsService::class)) + withServiceSpec( + ServiceSpec.defaultBuilder().withServices(VirtualObjectCommandInterpreter::class)) } } @@ -46,8 +52,11 @@ class Sleep { val elapsed = measureNanoTime { ingressClient - .toService() - .request { sleepConcurrently(listOf(sleepDuration.inWholeMilliseconds)) } + .toVirtualObject(UUID.randomUUID().toString()) + .request { + interpretCommands( + InterpretRequest(listOf(AwaitOne(Sleep(sleepDuration.inWholeMilliseconds))))) + } .options(idempotentCallOptions) .call() } @@ -65,20 +74,23 @@ class Sleep { val sleepsPerInvocation = 20 val concurrentSleepInvocations = 50 - val coordinatorClient = ingressClient.toService() - - // Range is inclusive (1..concurrentSleepInvocations) .map { launch { - coordinatorClient + ingressClient + .toVirtualObject(UUID.randomUUID().toString()) .request { - sleepConcurrently( - (1..sleepsPerInvocation).map { - Random.nextLong( - minSleepDuration.inWholeMilliseconds..maxSleepDuration - .inWholeMilliseconds) - }) + interpretCommands( + InterpretRequest( + listOf( + AwaitAllCompleted( + (1..sleepsPerInvocation).map { + Sleep( + Random.nextLong( + minSleepDuration + .inWholeMilliseconds..maxSleepDuration + .inWholeMilliseconds)) + })))) } .options(idempotentCallOptions) .call() diff --git a/sdk-tests/src/main/kotlin/dev/restate/sdktesting/tests/SleepWithFailures.kt b/sdk-tests/src/main/kotlin/dev/restate/sdktesting/tests/SleepWithFailures.kt index 2eab7868..2630dba4 100644 --- a/sdk-tests/src/main/kotlin/dev/restate/sdktesting/tests/SleepWithFailures.kt +++ b/sdk-tests/src/main/kotlin/dev/restate/sdktesting/tests/SleepWithFailures.kt @@ -9,9 +9,13 @@ package dev.restate.sdktesting.tests import dev.restate.client.Client -import dev.restate.client.kotlin.toService -import dev.restate.sdktesting.contracts.TestUtilsService +import dev.restate.client.kotlin.toVirtualObject +import dev.restate.sdktesting.contracts.VirtualObjectCommandInterpreter +import dev.restate.sdktesting.contracts.VirtualObjectCommandInterpreter.AwaitOne +import dev.restate.sdktesting.contracts.VirtualObjectCommandInterpreter.InterpretRequest +import dev.restate.sdktesting.contracts.VirtualObjectCommandInterpreter.Sleep import dev.restate.sdktesting.infra.* +import java.util.UUID import java.util.concurrent.TimeUnit import kotlin.random.Random import kotlin.random.nextLong @@ -32,7 +36,8 @@ class SleepWithFailures { companion object { @RegisterExtension val deployerExt: RestateDeployerExtension = RestateDeployerExtension { - withServiceSpec(ServiceSpec.defaultBuilder().withServices(TestUtilsService::class)) + withServiceSpec( + ServiceSpec.defaultBuilder().withServices(VirtualObjectCommandInterpreter::class)) } private val DEFAULT_SLEEP_DURATION = 4.seconds @@ -43,12 +48,16 @@ class SleepWithFailures { sleepDuration: Duration = DEFAULT_SLEEP_DURATION, action: suspend () -> Unit ) { + val testId = UUID.randomUUID().toString() val start = TimeSource.Monotonic.markNow() val job = coroutineScope { launch { ingressClient - .toService() - .request { sleepConcurrently(listOf(sleepDuration.inWholeMilliseconds)) } + .toVirtualObject(testId) + .request { + interpretCommands( + InterpretRequest(listOf(AwaitOne(Sleep(sleepDuration.inWholeMilliseconds))))) + } .options(idempotentCallOptions) .call() } From 784e68c6552d7433f915c8074ed95492f64b645c Mon Sep 17 00:00:00 2001 From: slinkydeveloper Date: Fri, 15 May 2026 17:00:23 +0200 Subject: [PATCH 4/4] Add skill to add sdk conformance tests --- .../skills/add-sdk-conformance-test/SKILL.md | 135 ++++++++++++++++++ .gitignore | 2 +- 2 files changed, 136 insertions(+), 1 deletion(-) create mode 100644 .claude/skills/add-sdk-conformance-test/SKILL.md diff --git a/.claude/skills/add-sdk-conformance-test/SKILL.md b/.claude/skills/add-sdk-conformance-test/SKILL.md new file mode 100644 index 00000000..2680a17f --- /dev/null +++ b/.claude/skills/add-sdk-conformance-test/SKILL.md @@ -0,0 +1,135 @@ +--- +name: add-sdk-conformance-test +description: Add a new test to the SDK conformance test suite. Use when the user wants to add a new sdk test, conformance test, or test a new SDK feature across all SDK implementations. +user-invocable: true +--- + +# Adding a New SDK Conformance Test + +The `sdk-tests` module is a **conformance tool** — it defines contracts that SDK implementations must satisfy and test runners that verify them. It contains NO implementation code. + +## Architecture + +- **Contracts** (`sdk-tests/src/main/kotlin/dev/restate/sdktesting/contracts/`) — Kotlin interfaces (`@Service`, `@VirtualObject`, `@Workflow`) that each SDK implements. These define the wire API (service name, handler names, JSON field names). +- **Tests** (`sdk-tests/src/main/kotlin/dev/restate/sdktesting/tests/`) — JUnit 5 test classes that drive contracts through the Restate ingress client. + +**Never add implementation code to sdk-tests.** Only interfaces in contracts, only test logic in tests. + +## Step 1: Expand the contracts (if needed) + +Edit the relevant contract interface only if strictly needed. The main ones: + +- `VirtualObjectCommandInterpreter` — interpreter for combinator/signal/awakeable tests; the workhorse for most feature tests +- `TestUtilsService` — utility handlers (cancel, signal resolve/reject, etc.) + +**Contract rules:** +- Data classes → `@Serializable`; sealed hierarchies → `@SerialName("camelCase")` discriminator +- Handler inputs must be a single type — wrap multiple fields in a `@Serializable` data class +- `@Handler` for exclusive handlers, `@Shared` for shared handlers + +### VirtualObjectCommandInterpreter — key types + +**AwaitableCommand** (sub-operations that can be composed): +- `CreateAwakeable(awakeableKey)`, `CreateSignal(signalName)`, `Sleep(timeoutMillis)`, `RunReturns(value)`, `RunThrowTerminalException(reason)` + +**Command** (top-level interpreter steps): +- `AwaitOne(command)` — await a single sub-operation +- `AwaitAny(commands)` — first to complete (race); throws if winner failed +- `AwaitAnySuccessful(commands)` — first successful or all failed (legacy) +- `AwaitFirstCompleted(commands)` — first to complete (race) +- `AwaitFirstSucceededOrAllFailed(commands)` — first success, or throws if all fail +- `AwaitAllSucceededOrFirstFailed(commands)` — all succeed → pipe-joined `"v0|v1"`; throws on first fail +- `AwaitAllCompleted(commands)` — all settle → pipe-joined `"ok:v0|err:reason|ok:v2"` + +**TestUtilsService:** +- `resolveSignal(ResolveSignalRequest(invocationId, signalName, value))` +- `rejectSignal(RejectSignalRequest(invocationId, signalName, reason))` + +## Step 2: Write the test + +### Test class boilerplate + +```kotlin +class MyFeature { + companion object { + @RegisterExtension + val deployerExt: RestateDeployerExtension = RestateDeployerExtension { + withServiceSpec( + ServiceSpec.defaultBuilder() + .withServices(VirtualObjectCommandInterpreter::class, TestUtilsService::class)) + } + } + + @Test + @DisplayName("Human-readable description") + @Execution(ExecutionMode.CONCURRENT) + fun myTest(@InjectClient ingressClient: Client) = runTest { + // ... + } +} +``` + +### Client patterns + +```kotlin +// Build clients +val voClient = ingressClient.toVirtualObject(UUID.randomUUID().toString()) +val svcClient = ingressClient.toService() + +// Call and get result immediately +val result = voClient.request { myHandler(req) }.options(idempotentCallOptions).call().response + +// Send (non-blocking) + attach later — REQUIRED for signals +val sendResponse = voClient.request { myHandler(req) }.options(idempotentCallOptions).send() +val invocationId = sendResponse.invocationId() +// ... resolve/reject signals ... +val result = sendResponse.attachSuspend().response + +// Expect a terminal error +assertThat(runCatching { sendResponse.attachSuspend().response }.exceptionOrNull()) + .message().contains("expected substring") + +// Poll until condition (awakeables only — not needed for signals) +await withAlias "description" untilAsserted { + assertThat(voClient.request { hasAwakeable("key") }.call().response).isTrue() +} +``` + +### Awakeable vs Signal patterns + +**Awakeables** — identified by a unique runtime ID stored in VirtualObject state: +- Send the `interpretCommands` call and poll `hasAwakeable(key)` before resolving +- Resolve/reject via `interpreterClient.request { resolveAwakeable(ResolveAwakeable(key, value)) }` + +**Signals** — identified by invocation ID + name; no pre-registration needed: +- `.send()` → get `invocationId()` → resolve/reject via `TestUtilsService.resolveSignal/rejectSignal` → `attachSuspend()` +- No polling required — signals can be sent before or after the handler starts waiting + +## Step 3: Verify it compiles + +```bash +./gradlew :sdk-tests:compileKotlin +``` + +## Step 4: Run against a local SDK image + +Build the SDK Docker image (example for TypeScript SDK): + +```bash +# From the sdk-typescript repo root +podman build -t e2e-ts:local -f packages/tests/restate-e2e-services/Dockerfile . +``` + +Run just the new test class: + +```bash +./gradlew :sdk-tests:run --args='run --sequential --image-pull-policy=CACHED --test-suite=default --test-name=MyFeature --service-container-image=localhost/e2e-ts:local' +``` + +## Step 5: Update SDK implementations + +After adding a new contract or command type, you must update each SDK's test service implementation. The TypeScript SDK (the reference implementation) lives in `sdk-typescript`: +- Main: `packages/tests/restate-e2e-services/src/virtual_object_command_interpreter.ts` and `test_utils.ts` +- Gen: `packages/libs/restate-sdk-gen/test-services/src/vo-command-interpreter.ts` and `test-utils.ts` + +Use the `update-sdk-test-contracts` skill in the sdk-typescript repo for guidance on the implementation patterns. diff --git a/.gitignore b/.gitignore index a2737689..73dbf6af 100644 --- a/.gitignore +++ b/.gitignore @@ -21,4 +21,4 @@ services/node-services/restatedev-restate-sdk-* test_report .kotlin -.claude/ +.claude/settings.local.json