diff --git a/e2e-tests/src/main/kotlin/dev/restate/sdktesting/junit/TestSuites.kt b/e2e-tests/src/main/kotlin/dev/restate/sdktesting/junit/TestSuites.kt index 5d6a92b0..ce4ebfb5 100644 --- a/e2e-tests/src/main/kotlin/dev/restate/sdktesting/junit/TestSuites.kt +++ b/e2e-tests/src/main/kotlin/dev/restate/sdktesting/junit/TestSuites.kt @@ -8,22 +8,76 @@ // https://github.com/restatedev/sdk-test-suite/blob/main/LICENSE package dev.restate.sdktesting.junit -object TestSuites { +import dev.restate.sdktesting.tests.AwakeableIngressEndpointTest +import dev.restate.sdktesting.tests.AwakeableLeaderTransferTest +import dev.restate.sdktesting.tests.BackwardCompatibilityTest +import dev.restate.sdktesting.tests.ForwardCompatibilityTest +import dev.restate.sdktesting.tests.IngressTest +import dev.restate.sdktesting.tests.InvokerMemoryTest +import dev.restate.sdktesting.tests.JournalRetentionTest +import dev.restate.sdktesting.tests.KafkaTest +import dev.restate.sdktesting.tests.OpenAPITest +import dev.restate.sdktesting.tests.PauseResumeChangingDeploymentTest +import dev.restate.sdktesting.tests.PauseResumeTest +import dev.restate.sdktesting.tests.RestartAsNewInvocationTest +import dev.restate.sdktesting.tests.StatePatchingTest +import dev.restate.sdktesting.tests.TracingTest +import dev.restate.sdktesting.tests.UpgradeWithInFlightInvocation +import dev.restate.sdktesting.tests.UpgradeWithNewInvocation + +object TestSuites : SuiteProvider { + override val defaultSuite: TestSuite + get() = DEFAULT_SUITE + val DEFAULT_SUITE = - TestSuite("default", emptyMap(), "none() | always-suspending | only-single-node") + TestSuite( + "default", + emptyMap(), + listOf( + clazz(), + clazz(), + clazz(), + clazz(), + clazz(), + clazz(), + clazz(), + clazz(), + clazz(), + clazz(), + clazz(), + clazz(), + clazz(), + )) + val THREE_NODES_SUITE = TestSuite( "threeNodes", mapOf( "RESTATE_DEFAULT_NUM_PARTITIONS" to "4", ), - "(none() | always-suspending | only-multi-node) & !only-single-node", + listOf( + clazz(), + clazz(), + clazz(), + clazz(), + clazz(), + clazz(), + clazz(), + clazz(), + ), 3) + private val ALWAYS_SUSPENDING_SUITE = TestSuite( "alwaysSuspending", mapOf("RESTATE_WORKER__INVOKER__INACTIVITY_TIMEOUT" to "0s"), - "always-suspending | only-always-suspending | only-single-node") + listOf( + clazz(), + clazz(), + clazz(), + clazz(), + )) + private val THREE_NODES_ALWAYS_SUSPENDING_SUITE = TestSuite( "threeNodesAlwaysSuspending", @@ -31,12 +85,16 @@ object TestSuites { "RESTATE_WORKER__INVOKER__INACTIVITY_TIMEOUT" to "0s", "RESTATE_DEFAULT_NUM_PARTITIONS" to "4", ), - "(always-suspending | only-always-suspending | only-multi-node) & !only-single-node", + listOf(clazz()), 3) + private val VERSION_COMPATIBILITY_SUITE = - TestSuite("versionCompat", emptyMap(), "version-compatibility") + TestSuite( + "versionCompat", + emptyMap(), + listOf(clazz(), clazz())) - fun allSuites(): List { + override fun allSuites(): List { return listOf( DEFAULT_SUITE, THREE_NODES_SUITE, @@ -45,7 +103,7 @@ object TestSuites { VERSION_COMPATIBILITY_SUITE) } - fun resolveSuites(suite: String?): List { + override fun resolveSuites(suite: String?): List { return when (suite ?: "all") { "all" -> allSuites() else -> { diff --git a/e2e-tests/src/main/kotlin/dev/restate/sdktesting/main.kt b/e2e-tests/src/main/kotlin/dev/restate/sdktesting/main.kt index ada51ed8..f81d2943 100644 --- a/e2e-tests/src/main/kotlin/dev/restate/sdktesting/main.kt +++ b/e2e-tests/src/main/kotlin/dev/restate/sdktesting/main.kt @@ -8,251 +8,6 @@ // https://github.com/restatedev/sdk-test-suite/blob/main/LICENSE package dev.restate.sdktesting -import com.charleskorn.kaml.Yaml -import com.charleskorn.kaml.decodeFromStream -import com.charleskorn.kaml.encodeToStream -import com.github.ajalt.clikt.core.CliktCommand -import com.github.ajalt.clikt.core.subcommands -import com.github.ajalt.clikt.parameters.arguments.* -import com.github.ajalt.clikt.parameters.groups.OptionGroup -import com.github.ajalt.clikt.parameters.groups.cooccurring -import com.github.ajalt.clikt.parameters.groups.provideDelegate -import com.github.ajalt.clikt.parameters.options.* -import com.github.ajalt.clikt.parameters.types.enum -import com.github.ajalt.clikt.parameters.types.path -import com.github.ajalt.mordant.rendering.TextColors.green -import com.github.ajalt.mordant.rendering.TextColors.red -import com.github.ajalt.mordant.rendering.TextStyles.bold -import com.github.ajalt.mordant.terminal.Terminal -import dev.restate.sdktesting.infra.* -import dev.restate.sdktesting.junit.ExecutionResult import dev.restate.sdktesting.junit.TestSuites -import java.io.File -import java.io.FileInputStream -import java.io.FileOutputStream -import java.nio.file.Path -import java.time.LocalDateTime -import java.time.format.DateTimeFormatter -import kotlin.jvm.optionals.getOrNull -import kotlin.system.exitProcess -import kotlin.time.Duration -import kotlinx.serialization.Serializable -import org.junit.platform.engine.Filter -import org.junit.platform.engine.discovery.ClassNameFilter -import org.junit.platform.engine.support.descriptor.MethodSource -import org.junit.platform.launcher.MethodFilter -@Serializable data class ExclusionsFile(val exclusions: Map>? = emptyMap()) - -class RestateSdkTestSuite : CliktCommand() { - override fun run() { - // Disable log4j2 JMX, this prevents reconfiguration - System.setProperty("log4j2.disable.jmx", "true") - // This is hours of debugging, don't touch it - // tl;dr this makes sure a single log4j2 configuration exists for the whole JVM, - // important to make Configurator.reconfigure work - System.setProperty( - "log4j2.contextSelector", "org.apache.logging.log4j.core.selector.BasicContextSelector") - // The default keep alive time is way too long, and this is a problem when we stop and restart - // containers. - System.setProperty("jdk.httpclient.keepalive.timeout", "5") - // The health check strategy uses the HttpUrlConnection which has no connect timeout by default. - // Could have caused the health check to hang indefinitely. - System.setProperty("sun.net.client.defaultConnectTimeout", "5000") - // Enable Logging of JDK client - // System.setProperty("java.util.logging.manager", "org.apache.logging.log4j.jul.LogManager") - // System.setProperty("jdk.httpclient.HttpClient.log", "all") - } -} - -class TestRunnerOptions : OptionGroup() { - val restateContainerImage by - option(envvar = "RESTATE_CONTAINER_IMAGE").help("Image used to run Restate") - val reportDir by - option(envvar = "TEST_REPORT_DIR").path().help("Base report directory").defaultLazy { - defaultReportDirectory() - } - val imagePullPolicy by - option() - .enum() - .help( - "Pull policy used to pull containers required for testing. In case of ALWAYS, docker won't pull images with repository prefix restate.local or localhost") - .default(PullPolicy.ALWAYS) - - fun applyToDeployerConfig(deployerConfig: RestateDeployerConfig): RestateDeployerConfig { - var newConfig = deployerConfig - if (restateContainerImage != null) { - newConfig = newConfig.copy(restateContainerImage = restateContainerImage!!) - } - newConfig = newConfig.copy(imagePullPolicy = imagePullPolicy) - return newConfig - } - - private fun defaultReportDirectory(): Path { - val formatter = DateTimeFormatter.ofPattern("yyyyMMdd_HHmmss") - return Path.of("test_report/${LocalDateTime.now().format(formatter)}").toAbsolutePath() - } -} - -class FilterOptions : OptionGroup() { - val testSuite by - option() - .required() - .help( - "Test suite to run. Available: ${listOf("all") + TestSuites.allSuites().map { it.name }}") - val testName by option().help("Name of the test to run for the given suite") -} - -abstract class TestRunCommand(help: String) : CliktCommand(help) { - val testRunnerOptions by TestRunnerOptions() -} - -class Run : - TestRunCommand( - """ -Run test suite, executing the service as container. -""" - .trimIndent()) { - val filter by FilterOptions().cooccurring() - val exclusionsFile by - option("--exclusions", "--exclusions-file", envvar = "TEST_EXCLUSIONS_FILE") - .help("File containing the excluded tests") - val parallel by - option(help = "Enable parallel testing") - .help( - "If set, runs tests in parallel. We suggest running tests sequentially when using podman") - .flag("--sequential", default = true) - - override fun run() { - val terminal = Terminal() - - val restateDeployerConfig = - RestateDeployerConfig( - mapOf(), - ) - - // Register global config of the deployer - registerGlobalConfig(testRunnerOptions.applyToDeployerConfig(restateDeployerConfig)) - - // Resolve test configurations - val testSuites = TestSuites.resolveSuites(filter?.testSuite) - - // Load exclusions file - val loadedExclusions: ExclusionsFile = - if (exclusionsFile != null) { - FileInputStream(File(exclusionsFile!!)) - .use { Yaml.default.decodeFromStream(it) } - .also { exclusions -> - println("Using exclusions file $exclusionsFile") - println("Loaded exclusions: ${exclusions.exclusions}") - } - } else { - ExclusionsFile() - } - - val reports = mutableListOf() - val newExclusions = mutableMapOf>() - var newFailures = false - for (testSuite in testSuites) { - val exclusions = loadedExclusions.exclusions?.get(testSuite.name) ?: emptyList() - val exclusionsFilters = - if (exclusions.isNotEmpty()) { - listOf(MethodFilter.excludeMethodNamePatterns(exclusions)) - } else { - listOf() - } - - val cliOptionFilter = - filter?.testName?.let { - listOf(ClassNameFilter.includeClassNamePatterns(testClassNameToFQCN(it))) - } ?: emptyList>() - - val report = - testSuite.runTests( - terminal, - testRunnerOptions.reportDir, - exclusionsFilters + cliOptionFilter, - false, - parallel) - - reports.add(report) - // No need to wait the end of the run for this - report.printFailuresToFiles(testRunnerOptions.reportDir) - val failures = report.failedTests - if (failures.isNotEmpty() || exclusions.isNotEmpty()) { - newExclusions[testSuite.name] = - (failures - .mapNotNull { it.source.getOrNull() } - .mapNotNull { - when (it) { - is MethodSource -> "${it.className}.${it.methodName}" - else -> null - } - } - .distinct() + exclusions) - .sorted() - } - if (failures.isNotEmpty()) { - newFailures = true - } - } - - // Write out the exclusions file - FileOutputStream(testRunnerOptions.reportDir.resolve("exclusions.new.yaml").toFile()).use { - Yaml.default.encodeToStream(ExclusionsFile(newExclusions.toSortedMap()), it) - } - - // Print final report - val succeededTests = reports.sumOf { it.succeededTests } - val executedTests = reports.sumOf { it.executedTests } - val testsStyle = if (succeededTests == executedTests) green else red - val testsInfoLine = testsStyle("""* Succeeded tests: $succeededTests / ${executedTests}""") - - val failedClasses = reports.sumOf { it.executedClasses - it.succeededClasses } - val classesStyle = if (failedClasses != 0) red else green - val classesInfoLine = classesStyle("""* Failed classes initialization: $failedClasses""") - - val totalDuration = reports.fold(Duration.ZERO) { d, res -> d + res.executionDuration } - - println( - """ - ${bold("========================= Final results =========================")} - 🗈 Report directory: ${testRunnerOptions.reportDir} - * Run test suites: ${reports.map { it.testSuite }} - $testsInfoLine - $classesInfoLine - * Execution time: $totalDuration - """ - .trimIndent()) - - for (report in reports) { - report.printFailuresToTerminal(terminal) - } - - exitProcess( - if (newFailures) { - 1 - } else { - 0 - }) - } -} - -fun main(args: Array) { - val args = - if (args.isEmpty()) { - arrayOf("run") - } else { - args - } - - RestateSdkTestSuite().subcommands(Run()).main(args) -} - -private fun testClassNameToFQCN(className: String): String { - if (className.contains('.')) { - // Then it's FQCN - return className - } - return "dev.restate.sdktesting.tests.${className}" -} +fun main(args: Array) = runMain(args, TestSuites) diff --git a/e2e-tests/src/main/kotlin/dev/restate/sdktesting/tests/AwakeableLeaderTransferTest.kt b/e2e-tests/src/main/kotlin/dev/restate/sdktesting/tests/AwakeableLeaderTransferTest.kt index 8a02ea99..dec0362b 100644 --- a/e2e-tests/src/main/kotlin/dev/restate/sdktesting/tests/AwakeableLeaderTransferTest.kt +++ b/e2e-tests/src/main/kotlin/dev/restate/sdktesting/tests/AwakeableLeaderTransferTest.kt @@ -28,7 +28,6 @@ import org.apache.logging.log4j.LogManager import org.assertj.core.api.Assertions.assertThat import org.awaitility.kotlin.await import org.awaitility.kotlin.withAlias -import org.junit.jupiter.api.Tag import org.junit.jupiter.api.Test import org.junit.jupiter.api.Timeout import org.junit.jupiter.api.extension.RegisterExtension @@ -76,7 +75,6 @@ class AwakeableLeaderTransferTest { @Test @Timeout(180) - @Tag("only-multi-node") fun awakeableCompletionsAreNotLostDuringLeaderTransfer( @InjectClient ingressClient: Client, @InjectContainerHandle(hostName = RESTATE_RUNTIME) runtimeHandle: ContainerHandle, diff --git a/e2e-tests/src/main/kotlin/dev/restate/sdktesting/tests/BackwardCompatibilityTest.kt b/e2e-tests/src/main/kotlin/dev/restate/sdktesting/tests/BackwardCompatibilityTest.kt index 1ffc0cdf..3c185cbb 100644 --- a/e2e-tests/src/main/kotlin/dev/restate/sdktesting/tests/BackwardCompatibilityTest.kt +++ b/e2e-tests/src/main/kotlin/dev/restate/sdktesting/tests/BackwardCompatibilityTest.kt @@ -51,7 +51,6 @@ import org.junit.jupiter.api.BeforeAll import org.junit.jupiter.api.ClassOrderer import org.junit.jupiter.api.Nested import org.junit.jupiter.api.Order -import org.junit.jupiter.api.Tag import org.junit.jupiter.api.Test import org.junit.jupiter.api.TestClassOrder import org.junit.jupiter.api.TestInstance @@ -64,7 +63,6 @@ import org.junit.jupiter.api.parallel.Isolated * Tests verifying backward compatibility (newer Restate version can read data written by older * version). */ -@Tag("version-compatibility") @Isolated @Execution(ExecutionMode.SAME_THREAD) @TestClassOrder(ClassOrderer.OrderAnnotation::class) @@ -145,7 +143,6 @@ class BackwardCompatibilityTest { } } - @Tag("version-compatibility") @Nested @Order(1) @Isolated @@ -237,7 +234,6 @@ class BackwardCompatibilityTest { } } - @Tag("version-compatibility") @Nested @Order(2) @Isolated diff --git a/e2e-tests/src/main/kotlin/dev/restate/sdktesting/tests/ForwardCompatibilityTest.kt b/e2e-tests/src/main/kotlin/dev/restate/sdktesting/tests/ForwardCompatibilityTest.kt index 348c3a51..b20699b5 100644 --- a/e2e-tests/src/main/kotlin/dev/restate/sdktesting/tests/ForwardCompatibilityTest.kt +++ b/e2e-tests/src/main/kotlin/dev/restate/sdktesting/tests/ForwardCompatibilityTest.kt @@ -52,7 +52,6 @@ import org.junit.jupiter.api.parallel.ExecutionMode import org.junit.jupiter.api.parallel.Isolated /** Tests to verify forward compatibility (older version can read data written by newer version). */ -@Tag("version-compatibility") @Isolated @Execution(ExecutionMode.SAME_THREAD) @TestClassOrder(ClassOrderer.OrderAnnotation::class) @@ -133,7 +132,6 @@ class ForwardCompatibilityTest { } } - @Tag("version-compatibility") @Nested @Order(1) @Isolated @@ -223,7 +221,6 @@ class ForwardCompatibilityTest { } } - @Tag("version-compatibility") @Nested @Order(2) @Isolated diff --git a/e2e-tests/src/main/kotlin/dev/restate/sdktesting/tests/InvokerMemoryTest.kt b/e2e-tests/src/main/kotlin/dev/restate/sdktesting/tests/InvokerMemoryTest.kt index c481fc29..a62adc50 100644 --- a/e2e-tests/src/main/kotlin/dev/restate/sdktesting/tests/InvokerMemoryTest.kt +++ b/e2e-tests/src/main/kotlin/dev/restate/sdktesting/tests/InvokerMemoryTest.kt @@ -47,7 +47,6 @@ import org.awaitility.kotlin.await import org.awaitility.kotlin.withAlias import org.junit.jupiter.api.AfterEach import org.junit.jupiter.api.DisplayName -import org.junit.jupiter.api.Tag import org.junit.jupiter.api.Test import org.junit.jupiter.api.Timeout import org.junit.jupiter.api.extension.RegisterExtension @@ -69,7 +68,6 @@ import org.junit.jupiter.api.extension.RegisterExtension * The yield tests verify invocations eventually complete correctly. The oversized tests verify the * invocation is paused. */ -@Tag("only-single-node") class InvokerMemoryTest { @Service diff --git a/e2e-tests/src/main/kotlin/dev/restate/sdktesting/tests/Kafka.kt b/e2e-tests/src/main/kotlin/dev/restate/sdktesting/tests/Kafka.kt deleted file mode 100644 index 57879dcb..00000000 --- a/e2e-tests/src/main/kotlin/dev/restate/sdktesting/tests/Kafka.kt +++ /dev/null @@ -1,83 +0,0 @@ -// Copyright (c) 2023 - Restate Software, Inc., Restate GmbH -// -// This file is part of the Restate SDK Test suite tool, -// which is released under the MIT license. -// -// You can find a copy of the license in file LICENSE in the root -// directory of this repository or package, or at -// https://github.com/restatedev/sdk-test-suite/blob/main/LICENSE -package dev.restate.sdktesting.tests - -import dev.restate.admin.api.KafkaClusterApi -import dev.restate.admin.api.SubscriptionApi -import dev.restate.admin.client.ApiClient -import dev.restate.admin.model.CreateKafkaClusterRequest -import dev.restate.admin.model.CreateSubscriptionRequest -import dev.restate.sdktesting.infra.KafkaContainer -import dev.restate.sdktesting.infra.runtimeconfig.IngressOptions -import dev.restate.sdktesting.infra.runtimeconfig.KafkaClusterOptions -import dev.restate.sdktesting.infra.runtimeconfig.RestateConfigSchema -import java.net.URI -import java.util.Properties -import org.apache.kafka.clients.producer.KafkaProducer -import org.apache.kafka.clients.producer.Producer -import org.apache.kafka.clients.producer.ProducerRecord - -object Kafka { - fun produceMessagesToKafka(port: Int, topic: String, values: List>) { - val props = Properties() - props["bootstrap.servers"] = "PLAINTEXT://localhost:$port" - props["key.serializer"] = "org.apache.kafka.common.serialization.StringSerializer" - props["value.serializer"] = "org.apache.kafka.common.serialization.StringSerializer" - - val producer: Producer = KafkaProducer(props) - for (value in values) { - producer.send(ProducerRecord(topic, value.first, value.second)) - } - producer.close() - } - - fun registerKafkaCluster( - adminURI: URI, - ) { - val kafkaClustersClient = - KafkaClusterApi(ApiClient().setHost(adminURI.host).setPort(adminURI.port)) - retryOnServiceUnavailable { - kafkaClustersClient.createKafkaCluster( - CreateKafkaClusterRequest() - .name("my-cluster") - .properties( - mapOf( - "bootstrap.servers" to - "PLAINTEXT://kafka:${KafkaContainer.KAFKA_NETWORK_PORT}"))) - } - } - - fun createKafkaSubscription( - adminURI: URI, - topic: String, - serviceName: String, - handlerName: String - ) { - val subscriptionsClient = - SubscriptionApi(ApiClient().setHost(adminURI.host).setPort(adminURI.port)) - retryOnServiceUnavailable { - subscriptionsClient.createSubscription( - CreateSubscriptionRequest() - .source(URI.create("kafka://my-cluster/$topic")) - .sink(URI.create("service://$serviceName/$handlerName")) - .options(mapOf("auto.offset.reset" to "earliest"))) - } - } - - val configSchema: RestateConfigSchema.() -> Unit = { - this.withIngress( - IngressOptions() - .withKafkaClusters( - listOf( - KafkaClusterOptions() - .withName("my-cluster") - .withBrokers( - listOf("PLAINTEXT://kafka:${KafkaContainer.KAFKA_NETWORK_PORT}"))))) - } -} diff --git a/e2e-tests/src/main/kotlin/dev/restate/sdktesting/tests/KafkaAndWorkflowAPITest.kt b/e2e-tests/src/main/kotlin/dev/restate/sdktesting/tests/KafkaAndWorkflowAPITest.kt deleted file mode 100644 index 988d85f8..00000000 --- a/e2e-tests/src/main/kotlin/dev/restate/sdktesting/tests/KafkaAndWorkflowAPITest.kt +++ /dev/null @@ -1,151 +0,0 @@ -// Copyright (c) 2023 - Restate Software, Inc., Restate GmbH -// -// This file is part of the Restate SDK Test suite tool, -// which is released under the MIT license. -// -// You can find a copy of the license in file LICENSE in the root -// directory of this repository or package, or at -// https://github.com/restatedev/sdk-test-suite/blob/main/LICENSE -package dev.restate.sdktesting.tests - -import dev.restate.client.Client -import dev.restate.client.kotlin.attachSuspend -import dev.restate.client.kotlin.getOutputSuspend -import dev.restate.client.kotlin.response -import dev.restate.client.kotlin.toWorkflow -import dev.restate.client.kotlin.workflowHandle -import dev.restate.common.reflections.ReflectionUtils.extractServiceName -import dev.restate.sdk.annotation.Shared -import dev.restate.sdk.annotation.Workflow -import dev.restate.sdk.endpoint.Endpoint -import dev.restate.sdk.kotlin.durablePromiseKey -import dev.restate.sdk.kotlin.promise -import dev.restate.sdk.kotlin.promiseHandle -import dev.restate.sdktesting.infra.* -import dev.restate.sdktesting.infra.runtimeconfig.RestateConfigSchema -import java.net.URI -import kotlinx.serialization.json.Json -import org.assertj.core.api.Assertions.assertThat -import org.awaitility.kotlin.await -import org.awaitility.kotlin.withAlias -import org.junit.jupiter.api.Test -import org.junit.jupiter.api.extension.RegisterExtension -import org.junit.jupiter.api.parallel.Execution -import org.junit.jupiter.api.parallel.ExecutionMode - -class KafkaAndWorkflowAPITest { - - @Workflow - class MyWorkflow { - - companion object { - val PROMISE = durablePromiseKey("promise") - } - - @Workflow suspend fun run(myTask: String) = "Run $myTask" - - @Shared - suspend fun setPromise(myValue: String) { - promiseHandle(PROMISE).resolve(myValue) - } - - @Shared suspend fun getPromise() = promise(PROMISE).future().await() - } - - companion object { - private const val SHARED_HANDLER_TOPIC = "shared-handler" - private const val WORKFLOW_TOPIC = "workflow" - - @RegisterExtension - val deployerExt: RestateDeployerExtension = RestateDeployerExtension { - withContainer("kafka", KafkaContainer(SHARED_HANDLER_TOPIC, WORKFLOW_TOPIC)) - withConfig(RestateConfigSchema().apply(Kafka.configSchema)) - withEndpoint(Endpoint.bind(MyWorkflow())) - } - } - - @Test - @Execution(ExecutionMode.CONCURRENT) - fun callWorkflowHandler( - @InjectAdminURI adminURI: URI, - @InjectContainerPort(hostName = "kafka", port = KafkaContainer.KAFKA_EXTERNAL_PORT) - kafkaPort: Int, - @InjectClient ingressClient: Client - ) = runTest { - // Create subscription - Kafka.createKafkaSubscription( - adminURI, WORKFLOW_TOPIC, extractServiceName(MyWorkflow::class.java), "run") - - val keyMessages = linkedMapOf("a" to "1", "b" to "2", "c" to "3") - - // Produce message to kafka - Kafka.produceMessagesToKafka( - kafkaPort, WORKFLOW_TOPIC, keyMessages.map { it.key to Json.encodeToString(it.value) }) - - // Now assert that those invocations are stored there, let's do this twice just for the sake of. - for (keyMessage in keyMessages) { - await withAlias - "Workflow invocations from Kafka" untilAsserted - { - assertThat( - ingressClient - .workflowHandle( - extractServiceName(MyWorkflow::class.java), keyMessage.key) - .attachSuspend() - .response) - .isEqualTo("Run ${keyMessage.value}") - } - } - - for (keyMessage in keyMessages) { - await withAlias - "Workflow invocations from Kafka" untilAsserted - { - assertThat( - ingressClient - .workflowHandle( - extractServiceName(MyWorkflow::class.java), keyMessage.key) - .getOutputSuspend() - .response - .value) - .isEqualTo("Run ${keyMessage.value}") - } - } - } - - @Test - @Execution(ExecutionMode.CONCURRENT) - fun callSharedWorkflowHandler( - @InjectAdminURI adminURI: URI, - @InjectContainerPort(hostName = "kafka", port = KafkaContainer.KAFKA_EXTERNAL_PORT) - kafkaPort: Int, - @InjectClient ingressClient: Client - ) = runTest { - // Create subscription - Kafka.createKafkaSubscription( - adminURI, SHARED_HANDLER_TOPIC, extractServiceName(MyWorkflow::class.java), "setPromise") - - val keyMessages = linkedMapOf("a" to "a", "b" to "b", "c" to "c") - - // Produce message to kafka - Kafka.produceMessagesToKafka( - kafkaPort, - SHARED_HANDLER_TOPIC, - keyMessages.map { it.key to Json.encodeToString(it.value) }) - - // Now assert that the promises are fulfilled. - for (keyMessage in keyMessages) { - await withAlias - "Workflow invocations from Kafka" untilAsserted - { - assertThat( - ingressClient - .toWorkflow(keyMessage.key) - .request { getPromise() } - .call() - .response) - .isEqualTo(keyMessage.value) - } - } - } -} diff --git a/e2e-tests/src/main/kotlin/dev/restate/sdktesting/tests/KafkaDynamicSetupTest.kt b/e2e-tests/src/main/kotlin/dev/restate/sdktesting/tests/KafkaDynamicSetupTest.kt deleted file mode 100644 index 086fbda3..00000000 --- a/e2e-tests/src/main/kotlin/dev/restate/sdktesting/tests/KafkaDynamicSetupTest.kt +++ /dev/null @@ -1,155 +0,0 @@ -// Copyright (c) 2023 - Restate Software, Inc., Restate GmbH -// -// This file is part of the Restate SDK Test suite tool, -// which is released under the MIT license. -// -// You can find a copy of the license in file LICENSE in the root -// directory of this repository or package, or at -// https://github.com/restatedev/sdk-test-suite/blob/main/LICENSE -package dev.restate.sdktesting.tests - -import dev.restate.client.Client -import dev.restate.client.kotlin.response -import dev.restate.client.kotlin.toVirtualObject -import dev.restate.sdk.annotation.Handler -import dev.restate.sdk.annotation.Name -import dev.restate.sdk.annotation.Service -import dev.restate.sdk.annotation.VirtualObject -import dev.restate.sdk.common.StateKey -import dev.restate.sdk.endpoint.Endpoint -import dev.restate.sdk.kotlin.state -import dev.restate.sdk.kotlin.stateKey -import dev.restate.sdktesting.infra.* -import dev.restate.sdktesting.tests.Kafka.createKafkaSubscription -import dev.restate.sdktesting.tests.Kafka.produceMessagesToKafka -import dev.restate.sdktesting.tests.Kafka.registerKafkaCluster -import java.net.URI -import java.util.* -import kotlinx.serialization.Serializable -import kotlinx.serialization.json.Json -import org.assertj.core.api.Assertions.assertThat -import org.awaitility.kotlin.await -import org.awaitility.kotlin.withAlias -import org.junit.jupiter.api.BeforeAll -import org.junit.jupiter.api.Tag -import org.junit.jupiter.api.Test -import org.junit.jupiter.api.extension.RegisterExtension -import org.junit.jupiter.api.parallel.Execution -import org.junit.jupiter.api.parallel.ExecutionMode -import org.junit.jupiter.api.parallel.Isolated - -@Tag("only-single-node" /* This test depends on metadata propagation happening immediately */) -@Isolated -class KafkaDynamicSetupTest { - - @VirtualObject - @Name("Counter") - class Counter { - companion object { - private val COUNTER_KEY: StateKey = stateKey("counter") - } - - @Handler - suspend fun add(value: Long): Long { - val current = state().get(COUNTER_KEY) ?: 0L - val newValue = current + value - state().set(COUNTER_KEY, newValue) - return newValue - } - - @Handler - suspend fun get(): Long { - return state().get(COUNTER_KEY) ?: 0L - } - } - - @Service - @Name("EventHandler") - class EventHandler { - @Serializable data class ProxyRequest(val key: String, val value: Long) - - @Handler - suspend fun oneWayCall(request: ProxyRequest) { - dev.restate.sdk.kotlin - .toVirtualObject(request.key) - .request { add(request.value) } - .send() - } - } - - companion object { - private const val COUNTER_TOPIC = "counter" - private const val EVENT_HANDLER_TOPIC = "event-handler" - - @RegisterExtension - val deployerExt: RestateDeployerExtension = RestateDeployerExtension { - withEndpoint(Endpoint.bind(Counter()).bind(EventHandler())) - withContainer("kafka", KafkaContainer(COUNTER_TOPIC, EVENT_HANDLER_TOPIC)) - } - - @JvmStatic - @BeforeAll - fun beforeAll( - @InjectAdminURI adminURI: URI, - ) { - registerKafkaCluster(adminURI) - } - } - - @Test - @Execution(ExecutionMode.CONCURRENT) - fun handleEventInCounterService( - @InjectAdminURI adminURI: URI, - @InjectContainerPort(hostName = "kafka", port = KafkaContainer.KAFKA_EXTERNAL_PORT) - kafkaPort: Int, - @InjectClient ingressClient: Client - ) = runTest { - val counter = UUID.randomUUID().toString() - - // Register subscription - createKafkaSubscription(adminURI, COUNTER_TOPIC, "Counter", "add") - - // Produce message to kafka - produceMessagesToKafka( - kafkaPort, COUNTER_TOPIC, listOf(counter to "1", counter to "2", counter to "3")) - - await withAlias - "Updates from Kafka are visible in the counter" untilAsserted - { - assertThat( - ingressClient.toVirtualObject(counter).request { get() }.call().response) - .isEqualTo(6L) - } - } - - @Test - @Execution(ExecutionMode.CONCURRENT) - fun handleEventInEventHandler( - @InjectAdminURI adminURI: URI, - @InjectContainerPort(hostName = "kafka", port = KafkaContainer.KAFKA_EXTERNAL_PORT) - kafkaPort: Int, - @InjectClient ingressClient: Client - ) = runTest { - val counter = UUID.randomUUID().toString() - - // Register subscription - createKafkaSubscription(adminURI, EVENT_HANDLER_TOPIC, "EventHandler", "oneWayCall") - - // Produce message to kafka - produceMessagesToKafka( - kafkaPort, - EVENT_HANDLER_TOPIC, - listOf( - null to Json.encodeToString(EventHandler.ProxyRequest(counter, 1)), - null to Json.encodeToString(EventHandler.ProxyRequest(counter, 2)), - null to Json.encodeToString(EventHandler.ProxyRequest(counter, 3)))) - - await withAlias - "Updates from Kafka are visible in the counter" untilAsserted - { - assertThat( - ingressClient.toVirtualObject(counter).request { get() }.call().response) - .isEqualTo(6L) - } - } -} diff --git a/e2e-tests/src/main/kotlin/dev/restate/sdktesting/tests/KafkaTest.kt b/e2e-tests/src/main/kotlin/dev/restate/sdktesting/tests/KafkaTest.kt new file mode 100644 index 00000000..22422675 --- /dev/null +++ b/e2e-tests/src/main/kotlin/dev/restate/sdktesting/tests/KafkaTest.kt @@ -0,0 +1,392 @@ +// Copyright (c) 2023 - Restate Software, Inc., Restate GmbH +// +// This file is part of the Restate SDK Test suite tool, +// which is released under the MIT license. +// +// You can find a copy of the license in file LICENSE in the root +// directory of this repository or package, or at +// https://github.com/restatedev/sdk-test-suite/blob/main/LICENSE +package dev.restate.sdktesting.tests + +import dev.restate.admin.api.KafkaClusterApi +import dev.restate.admin.api.SubscriptionApi +import dev.restate.admin.client.ApiClient +import dev.restate.admin.model.CreateKafkaClusterRequest +import dev.restate.admin.model.CreateSubscriptionRequest +import dev.restate.client.Client +import dev.restate.client.kotlin.attachSuspend +import dev.restate.client.kotlin.getOutputSuspend +import dev.restate.client.kotlin.response +import dev.restate.client.kotlin.toVirtualObject +import dev.restate.client.kotlin.toWorkflow +import dev.restate.client.kotlin.workflowHandle +import dev.restate.common.reflections.ReflectionUtils.extractServiceName +import dev.restate.sdk.annotation.Handler +import dev.restate.sdk.annotation.Name +import dev.restate.sdk.annotation.Service +import dev.restate.sdk.annotation.Shared +import dev.restate.sdk.annotation.VirtualObject +import dev.restate.sdk.annotation.Workflow +import dev.restate.sdk.common.StateKey +import dev.restate.sdk.endpoint.Endpoint +import dev.restate.sdk.kotlin.durablePromiseKey +import dev.restate.sdk.kotlin.get +import dev.restate.sdk.kotlin.promise +import dev.restate.sdk.kotlin.promiseHandle +import dev.restate.sdk.kotlin.set +import dev.restate.sdk.kotlin.state +import dev.restate.sdk.kotlin.stateKey +import dev.restate.sdktesting.infra.InjectAdminURI +import dev.restate.sdktesting.infra.InjectClient +import dev.restate.sdktesting.infra.InjectContainerPort +import dev.restate.sdktesting.infra.KafkaContainer +import dev.restate.sdktesting.infra.RestateDeployerExtension +import dev.restate.sdktesting.infra.runtimeconfig.RestateConfigSchema +import dev.restate.sdktesting.tests.Tracing.JAEGER_HOSTNAME +import dev.restate.sdktesting.tests.Tracing.JAEGER_QUERY_PORT +import java.net.URI +import java.util.Properties +import java.util.UUID +import kotlinx.serialization.Serializable +import kotlinx.serialization.json.Json +import org.apache.kafka.clients.producer.KafkaProducer +import org.apache.kafka.clients.producer.Producer +import org.apache.kafka.clients.producer.ProducerRecord +import org.assertj.core.api.Assertions.assertThat +import org.awaitility.kotlin.await +import org.awaitility.kotlin.withAlias +import org.junit.jupiter.api.BeforeAll +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.extension.RegisterExtension +import org.junit.jupiter.api.parallel.Execution +import org.junit.jupiter.api.parallel.ExecutionMode +import org.junit.jupiter.api.parallel.Isolated + +@Isolated +class KafkaTest { + + @Workflow + class MyWorkflow { + companion object { + val PROMISE = durablePromiseKey("promise") + } + + @Workflow suspend fun run(myTask: String) = "Run $myTask" + + @Shared + suspend fun setPromise(myValue: String) { + promiseHandle(PROMISE).resolve(myValue) + } + + @Shared suspend fun getPromise() = promise(PROMISE).future().await() + } + + @VirtualObject + @Name("Counter") + class Counter { + companion object { + private val COUNTER_KEY: StateKey = stateKey("counter") + } + + @Handler + suspend fun add(value: Long): Long { + val current = state().get(COUNTER_KEY) ?: 0L + val newValue = current + value + state().set(COUNTER_KEY, newValue) + return newValue + } + + @Handler suspend fun get(): Long = state().get(COUNTER_KEY) ?: 0L + } + + @Service + @Name("EventHandler") + class EventHandler { + @Serializable data class ProxyRequest(val key: String, val value: Long) + + @Handler + suspend fun oneWayCall(request: ProxyRequest) { + dev.restate.sdk.kotlin + .toVirtualObject(request.key) + .request { add(request.value) } + .send() + } + } + + @VirtualObject + @Name("TracingCounter") + class TracingCounter { + @Handler + suspend fun set(value: String) { + check(state().get("state") == null) + state().set("state", value) + } + + @Shared suspend fun get() = state().get("state") + } + + companion object { + private const val WORKFLOW_TOPIC = "workflow" + private const val SHARED_HANDLER_TOPIC = "shared-handler" + private const val COUNTER_TOPIC = "counter" + private const val EVENT_HANDLER_TOPIC = "event-handler" + private const val TRACING_TOPIC = "tracing" + + @RegisterExtension + val deployerExt: RestateDeployerExtension = RestateDeployerExtension { + withContainer(Tracing.jaegerContainer()) + withContainer( + "kafka", + KafkaContainer( + WORKFLOW_TOPIC, + SHARED_HANDLER_TOPIC, + COUNTER_TOPIC, + EVENT_HANDLER_TOPIC, + TRACING_TOPIC)) + withConfig(RestateConfigSchema().apply(Tracing.configSchema)) + withEndpoint( + Endpoint.bind(MyWorkflow()).bind(Counter()).bind(EventHandler()).bind(TracingCounter())) + } + + fun produceMessagesToKafka(port: Int, topic: String, values: List>) { + val props = Properties() + props["bootstrap.servers"] = "PLAINTEXT://localhost:$port" + props["key.serializer"] = "org.apache.kafka.common.serialization.StringSerializer" + props["value.serializer"] = "org.apache.kafka.common.serialization.StringSerializer" + + val producer: Producer = KafkaProducer(props) + for (value in values) { + producer.send(ProducerRecord(topic, value.first, value.second)) + } + producer.close() + } + + fun registerKafkaCluster( + adminURI: URI, + ) { + val kafkaClustersClient = + KafkaClusterApi(ApiClient().setHost(adminURI.host).setPort(adminURI.port)) + retryOnServiceUnavailable { + kafkaClustersClient.createKafkaCluster( + CreateKafkaClusterRequest() + .name("my-cluster") + .properties( + mapOf( + "bootstrap.servers" to + "PLAINTEXT://kafka:${KafkaContainer.KAFKA_NETWORK_PORT}"))) + } + } + + fun createKafkaSubscription( + adminURI: URI, + topic: String, + serviceName: String, + handlerName: String + ) { + val subscriptionsClient = + SubscriptionApi(ApiClient().setHost(adminURI.host).setPort(adminURI.port)) + retryOnServiceUnavailable { + subscriptionsClient.createSubscription( + CreateSubscriptionRequest() + .source(URI.create("kafka://my-cluster/$topic")) + .sink(URI.create("service://$serviceName/$handlerName")) + .options(mapOf("auto.offset.reset" to "earliest"))) + } + } + + @JvmStatic + @BeforeAll + fun beforeAll( + @InjectAdminURI adminURI: URI, + ) { + registerKafkaCluster(adminURI) + } + } + + @Test + @Execution(ExecutionMode.CONCURRENT) + fun callWorkflowHandler( + @InjectAdminURI adminURI: URI, + @InjectContainerPort(hostName = "kafka", port = KafkaContainer.KAFKA_EXTERNAL_PORT) + kafkaPort: Int, + @InjectClient ingressClient: Client + ) = runTest { + createKafkaSubscription( + adminURI, WORKFLOW_TOPIC, extractServiceName(MyWorkflow::class.java), "run") + + val keyMessages = linkedMapOf("a" to "1", "b" to "2", "c" to "3") + + produceMessagesToKafka( + kafkaPort, WORKFLOW_TOPIC, keyMessages.map { it.key to Json.encodeToString(it.value) }) + + for (keyMessage in keyMessages) { + await withAlias + "Workflow invocations from Kafka" untilAsserted + { + assertThat( + ingressClient + .workflowHandle( + extractServiceName(MyWorkflow::class.java), keyMessage.key) + .attachSuspend() + .response) + .isEqualTo("Run ${keyMessage.value}") + } + } + + for (keyMessage in keyMessages) { + await withAlias + "Workflow invocations from Kafka" untilAsserted + { + assertThat( + ingressClient + .workflowHandle( + extractServiceName(MyWorkflow::class.java), keyMessage.key) + .getOutputSuspend() + .response + .value) + .isEqualTo("Run ${keyMessage.value}") + } + } + } + + @Test + @Execution(ExecutionMode.CONCURRENT) + fun callSharedWorkflowHandler( + @InjectAdminURI adminURI: URI, + @InjectContainerPort(hostName = "kafka", port = KafkaContainer.KAFKA_EXTERNAL_PORT) + kafkaPort: Int, + @InjectClient ingressClient: Client + ) = runTest { + createKafkaSubscription( + adminURI, SHARED_HANDLER_TOPIC, extractServiceName(MyWorkflow::class.java), "setPromise") + + val keyMessages = linkedMapOf("a" to "a", "b" to "b", "c" to "c") + + produceMessagesToKafka( + kafkaPort, + SHARED_HANDLER_TOPIC, + keyMessages.map { it.key to Json.encodeToString(it.value) }) + + for (keyMessage in keyMessages) { + await withAlias + "Workflow invocations from Kafka" untilAsserted + { + assertThat( + ingressClient + .toWorkflow(keyMessage.key) + .request { getPromise() } + .call() + .response) + .isEqualTo(keyMessage.value) + } + } + } + + @Test + @Execution(ExecutionMode.CONCURRENT) + fun callObjectHandler( + @InjectAdminURI adminURI: URI, + @InjectContainerPort(hostName = "kafka", port = KafkaContainer.KAFKA_EXTERNAL_PORT) + kafkaPort: Int, + @InjectClient ingressClient: Client + ) = runTest { + val counter = UUID.randomUUID().toString() + + createKafkaSubscription(adminURI, COUNTER_TOPIC, "Counter", "add") + + produceMessagesToKafka( + kafkaPort, COUNTER_TOPIC, listOf(counter to "1", counter to "2", counter to "3")) + + await withAlias + "Updates from Kafka are visible in the counter" untilAsserted + { + assertThat( + ingressClient.toVirtualObject(counter).request { get() }.call().response) + .isEqualTo(6L) + } + } + + @Test + @Execution(ExecutionMode.CONCURRENT) + fun callServiceHandler( + @InjectAdminURI adminURI: URI, + @InjectContainerPort(hostName = "kafka", port = KafkaContainer.KAFKA_EXTERNAL_PORT) + kafkaPort: Int, + @InjectClient ingressClient: Client + ) = runTest { + val counter = UUID.randomUUID().toString() + + createKafkaSubscription(adminURI, EVENT_HANDLER_TOPIC, "EventHandler", "oneWayCall") + + produceMessagesToKafka( + kafkaPort, + EVENT_HANDLER_TOPIC, + listOf( + null to Json.encodeToString(EventHandler.ProxyRequest(counter, 1)), + null to Json.encodeToString(EventHandler.ProxyRequest(counter, 2)), + null to Json.encodeToString(EventHandler.ProxyRequest(counter, 3)))) + + await withAlias + "Updates from Kafka are visible in the counter" untilAsserted + { + assertThat( + ingressClient.toVirtualObject(counter).request { get() }.call().response) + .isEqualTo(6L) + } + } + + @Test + fun shouldGenerateTraces( + @InjectClient ingressClient: Client, + @InjectAdminURI adminURI: URI, + @InjectContainerPort(hostName = JAEGER_HOSTNAME, port = JAEGER_QUERY_PORT) jaegerPort: Int, + @InjectContainerPort(hostName = "kafka", port = KafkaContainer.KAFKA_EXTERNAL_PORT) + kafkaPort: Int, + ) = runTest { + createKafkaSubscription( + adminURI, TRACING_TOPIC, extractServiceName(TracingCounter::class.java), "set") + + produceMessagesToKafka(kafkaPort, TRACING_TOPIC, listOf("a" to Json.encodeToString("a"))) + + val client = ingressClient.toVirtualObject("a") + await withAlias + "state is updated" untilAsserted + { + assertThat(client.request { get() }.call().response).isEqualTo("a") + } + + await withAlias + "traces are available" untilAsserted + { + val traces = Tracing.getTraces(jaegerPort, "Restate") + + assertThat(traces.result.resourceSpans).isNotEmpty() + + val counterAddSpans = + traces.result.resourceSpans + .flatMap { it.scopeSpans } + .flatMap { it.spans } + .filter { + it.name.contains( + "ingress_kafka ${extractServiceName(TracingCounter::class.java)}/{key}/set") + } + + assertThat(counterAddSpans).isNotEmpty() + + val span = counterAddSpans.first() + val attributes = span.attributes.associate { it.key to it.value.stringValue } + assertThat(attributes) + .containsEntry( + "restate.invocation.target", + "${extractServiceName(TracingCounter::class.java)}/a/set") + .containsEntry("messaging.system", "kafka") + .containsEntry("messaging.source.name", TRACING_TOPIC) + .containsEntry("messaging.operation.type", "process") + .containsKeys( + "restate.invocation.id", + "messaging.consumer.group.name", + "messaging.kafka.offset", + "messaging.source.partition.id") + } + } +} diff --git a/e2e-tests/src/main/kotlin/dev/restate/sdktesting/tests/KafkaTracingTest.kt b/e2e-tests/src/main/kotlin/dev/restate/sdktesting/tests/KafkaTracingTest.kt deleted file mode 100644 index 3d23e1b6..00000000 --- a/e2e-tests/src/main/kotlin/dev/restate/sdktesting/tests/KafkaTracingTest.kt +++ /dev/null @@ -1,124 +0,0 @@ -// Copyright (c) 2023 - Restate Software, Inc., Restate GmbH -// -// This file is part of the Restate SDK Test suite tool, -// which is released under the MIT license. -// -// You can find a copy of the license in file LICENSE in the root -// directory of this repository or package, or at -// https://github.com/restatedev/sdk-test-suite/blob/main/LICENSE -package dev.restate.sdktesting.tests - -import dev.restate.client.Client -import dev.restate.client.kotlin.response -import dev.restate.client.kotlin.toVirtualObject -import dev.restate.common.reflections.ReflectionUtils.extractServiceName -import dev.restate.sdk.annotation.Handler -import dev.restate.sdk.annotation.Name -import dev.restate.sdk.annotation.Shared -import dev.restate.sdk.annotation.VirtualObject -import dev.restate.sdk.endpoint.Endpoint -import dev.restate.sdk.kotlin.get -import dev.restate.sdk.kotlin.set -import dev.restate.sdk.kotlin.state -import dev.restate.sdktesting.infra.InjectAdminURI -import dev.restate.sdktesting.infra.InjectClient -import dev.restate.sdktesting.infra.InjectContainerPort -import dev.restate.sdktesting.infra.KafkaContainer -import dev.restate.sdktesting.infra.RestateDeployerExtension -import dev.restate.sdktesting.infra.runtimeconfig.RestateConfigSchema -import dev.restate.sdktesting.tests.Tracing.JAEGER_HOSTNAME -import dev.restate.sdktesting.tests.Tracing.JAEGER_QUERY_PORT -import java.net.URI -import kotlinx.serialization.json.Json -import org.assertj.core.api.Assertions.assertThat -import org.awaitility.kotlin.await -import org.awaitility.kotlin.withAlias -import org.junit.jupiter.api.Test -import org.junit.jupiter.api.extension.RegisterExtension - -class KafkaTracingTest { - - @VirtualObject - @Name("Counter") - class Counter { - @Handler - suspend fun set(value: String) { - check(state().get("state") == null) - state().set("state", value) - } - - @Shared suspend fun get() = state().get("state") - } - - companion object { - private const val TOPIC = "my-topic" - - @RegisterExtension - val deployerExt: RestateDeployerExtension = RestateDeployerExtension { - // Add Jaeger and Kafka container - withContainer(Tracing.jaegerContainer()) - withContainer("kafka", KafkaContainer(TOPIC)) - - // Configure Restate to send traces to Jaeger - withConfig(RestateConfigSchema().apply(Tracing.configSchema).apply(Kafka.configSchema)) - - withEndpoint(Endpoint.bind(Counter())) - } - } - - @Test - fun shouldGenerateTraces( - @InjectClient ingressClient: Client, - @InjectAdminURI adminURI: URI, - @InjectContainerPort(hostName = JAEGER_HOSTNAME, port = JAEGER_QUERY_PORT) jaegerPort: Int, - @InjectContainerPort(hostName = "kafka", port = KafkaContainer.KAFKA_EXTERNAL_PORT) - kafkaPort: Int, - ) = runTest { - Kafka.createKafkaSubscription(adminURI, TOPIC, extractServiceName(Counter::class.java), "set") - - // Produce message to kafka - Kafka.produceMessagesToKafka(kafkaPort, TOPIC, listOf("a" to Json.encodeToString("a"))) - - // Await that state is updated - val client = ingressClient.toVirtualObject("a") - await withAlias - "state is updated" untilAsserted - { - assertThat(client.request { get() }.call().response).isEqualTo("a") - } - - // Check the traces - await withAlias - "traces are available" untilAsserted - { - val traces = Tracing.getTraces(jaegerPort, "Restate") - - assertThat(traces.result.resourceSpans).isNotEmpty() - - // Find the GreeterService spans - val counterAddSpans = - traces.result.resourceSpans - .flatMap { it.scopeSpans } - .flatMap { it.spans } - .filter { it.name.contains("ingress_kafka Counter/{key}/set") } - - assertThat(counterAddSpans).isNotEmpty() - - // Verify span attributes - val span = counterAddSpans.first() - - // Verify Restate-specific attributes - val attributes = span.attributes.associate { it.key to it.value.stringValue } - assertThat(attributes) - .containsEntry("restate.invocation.target", "Counter/a/set") - .containsEntry("messaging.system", "kafka") - .containsEntry("messaging.source.name", TOPIC) - .containsEntry("messaging.operation.type", "process") - .containsKeys( - "restate.invocation.id", - "messaging.consumer.group.name", - "messaging.kafka.offset", - "messaging.source.partition.id") - } - } -} diff --git a/e2e-tests/src/main/kotlin/dev/restate/sdktesting/tests/PauseResumeChangingDeploymentTest.kt b/e2e-tests/src/main/kotlin/dev/restate/sdktesting/tests/PauseResumeChangingDeploymentTest.kt index 38529e6b..aa134c5f 100644 --- a/e2e-tests/src/main/kotlin/dev/restate/sdktesting/tests/PauseResumeChangingDeploymentTest.kt +++ b/e2e-tests/src/main/kotlin/dev/restate/sdktesting/tests/PauseResumeChangingDeploymentTest.kt @@ -25,11 +25,9 @@ import java.net.URI import org.assertj.core.api.Assertions.assertThat import org.awaitility.kotlin.await import org.awaitility.kotlin.withAlias -import org.junit.jupiter.api.Tag import org.junit.jupiter.api.Test import org.junit.jupiter.api.extension.RegisterExtension -@Tag("only-single-node" /* This test depends on metadata propagation happening immediately */) class PauseResumeChangingDeploymentTest { @Service diff --git a/e2e-tests/src/main/kotlin/dev/restate/sdktesting/tests/UpgradeTest.kt b/e2e-tests/src/main/kotlin/dev/restate/sdktesting/tests/UpgradeTest.kt index 04500990..dbe792b3 100644 --- a/e2e-tests/src/main/kotlin/dev/restate/sdktesting/tests/UpgradeTest.kt +++ b/e2e-tests/src/main/kotlin/dev/restate/sdktesting/tests/UpgradeTest.kt @@ -32,7 +32,6 @@ import kotlinx.serialization.Serializable import org.assertj.core.api.Assertions.assertThat import org.awaitility.kotlin.await import org.awaitility.kotlin.withAlias -import org.junit.jupiter.api.Tag import org.junit.jupiter.api.Test import org.junit.jupiter.api.extension.RegisterExtension @@ -79,8 +78,6 @@ class VersionedService(private val version: String) { } } -@Tag("always-suspending") -@Tag("only-single-node") class UpgradeWithNewInvocation { companion object { @@ -122,8 +119,6 @@ class UpgradeWithNewInvocation { } } -@Tag("always-suspending") -@Tag("only-single-node") class UpgradeWithInFlightInvocation { companion object { diff --git a/infra/build.gradle.kts b/infra/build.gradle.kts index e1dc8318..d2ec0b73 100644 --- a/infra/build.gradle.kts +++ b/infra/build.gradle.kts @@ -27,7 +27,9 @@ dependencies { compileOnly(libs.tomcat.annotations) compileOnly(libs.google.findbugs.jsr305) + implementation(libs.clikt) implementation(libs.mordant) + implementation(libs.dotenv) implementation(libs.restate.sdk.client.kotlin) implementation(libs.restate.sdk.kotlin.http) implementation(libs.vertx) diff --git a/infra/src/main/kotlin/dev/restate/sdktesting/junit/SuiteProvider.kt b/infra/src/main/kotlin/dev/restate/sdktesting/junit/SuiteProvider.kt new file mode 100644 index 00000000..cd6f3784 --- /dev/null +++ b/infra/src/main/kotlin/dev/restate/sdktesting/junit/SuiteProvider.kt @@ -0,0 +1,17 @@ +// Copyright (c) 2023 - Restate Software, Inc., Restate GmbH +// +// This file is part of the Restate SDK Test suite tool, +// which is released under the MIT license. +// +// You can find a copy of the license in file LICENSE in the root +// directory of this repository or package, or at +// https://github.com/restatedev/sdk-test-suite/blob/main/LICENSE +package dev.restate.sdktesting.junit + +interface SuiteProvider { + val defaultSuite: TestSuite + + fun allSuites(): List + + fun resolveSuites(suite: String?): List +} diff --git a/infra/src/main/kotlin/dev/restate/sdktesting/junit/TestSuite.kt b/infra/src/main/kotlin/dev/restate/sdktesting/junit/TestSuite.kt index c62d0a5e..ab2593e6 100644 --- a/infra/src/main/kotlin/dev/restate/sdktesting/junit/TestSuite.kt +++ b/infra/src/main/kotlin/dev/restate/sdktesting/junit/TestSuite.kt @@ -20,18 +20,27 @@ import org.apache.logging.log4j.Level import org.apache.logging.log4j.core.config.Configurator import org.apache.logging.log4j.core.config.builder.api.ConfigurationBuilderFactory import org.apache.logging.log4j.core.config.builder.impl.BuiltConfiguration +import org.junit.platform.engine.DiscoverySelector import org.junit.platform.engine.Filter +import org.junit.platform.engine.discovery.ClassNameFilter import org.junit.platform.engine.discovery.DiscoverySelectors import org.junit.platform.launcher.* import org.junit.platform.launcher.core.LauncherDiscoveryRequestBuilder import org.junit.platform.launcher.core.LauncherFactory import org.junit.platform.reporting.legacy.xml.LegacyXmlReportGeneratingListener +inline fun clazz(): DiscoverySelector = DiscoverySelectors.selectClass(T::class.java) + +inline fun method(name: String): DiscoverySelector = + DiscoverySelectors.selectMethod(T::class.java, name) + +private const val CUSTOM_TEST_CLASS = "dev.restate.sdktesting.tests.Custom" + class TestSuite( val name: String, val additionalEnvs: Map, - val junitIncludeTags: String, - val restateNodes: Int = 1 + val selectors: List, + val restateNodes: Int = 1, ) { fun runTests( terminal: Terminal, @@ -60,9 +69,13 @@ class TestSuite( // Prepare launch request var builder = LauncherDiscoveryRequestBuilder.request() - .selectors(DiscoverySelectors.selectPackage("dev.restate.sdktesting.tests")) - .filters(TagFilter.includeTags(junitIncludeTags)) + .selectors(selectors) .filters(*filters.toTypedArray()) + .apply { + if (restateDeployerConfig.customTestsFile == null) { + filters(ClassNameFilter.excludeClassNamePatterns(CUSTOM_TEST_CLASS)) + } + } // Redirect STDOUT/STDERR .configurationParameter(LauncherConstants.CAPTURE_STDOUT_PROPERTY_NAME, "true") .configurationParameter(LauncherConstants.CAPTURE_STDERR_PROPERTY_NAME, "true") diff --git a/infra/src/main/kotlin/dev/restate/sdktesting/runner.kt b/infra/src/main/kotlin/dev/restate/sdktesting/runner.kt new file mode 100644 index 00000000..24e8308c --- /dev/null +++ b/infra/src/main/kotlin/dev/restate/sdktesting/runner.kt @@ -0,0 +1,317 @@ +// Copyright (c) 2023 - Restate Software, Inc., Restate GmbH +// +// This file is part of the Restate SDK Test suite tool, +// which is released under the MIT license. +// +// You can find a copy of the license in file LICENSE in the root +// directory of this repository or package, or at +// https://github.com/restatedev/sdk-test-suite/blob/main/LICENSE +package dev.restate.sdktesting + +import com.charleskorn.kaml.Yaml +import com.charleskorn.kaml.decodeFromStream +import com.charleskorn.kaml.encodeToStream +import com.github.ajalt.clikt.core.CliktCommand +import com.github.ajalt.clikt.core.subcommands +import com.github.ajalt.clikt.parameters.arguments.argument +import com.github.ajalt.clikt.parameters.arguments.convert +import com.github.ajalt.clikt.parameters.arguments.help +import com.github.ajalt.clikt.parameters.arguments.multiple +import com.github.ajalt.clikt.parameters.groups.OptionGroup +import com.github.ajalt.clikt.parameters.groups.cooccurring +import com.github.ajalt.clikt.parameters.groups.provideDelegate +import com.github.ajalt.clikt.parameters.options.* +import com.github.ajalt.clikt.parameters.types.enum +import com.github.ajalt.clikt.parameters.types.int +import com.github.ajalt.clikt.parameters.types.path +import com.github.ajalt.mordant.rendering.TextColors.green +import com.github.ajalt.mordant.rendering.TextColors.red +import com.github.ajalt.mordant.rendering.TextStyles.bold +import com.github.ajalt.mordant.terminal.Terminal +import dev.restate.sdktesting.infra.ContainerServiceDeploymentConfig +import dev.restate.sdktesting.infra.LocalForwardServiceDeploymentConfig +import dev.restate.sdktesting.infra.PullPolicy +import dev.restate.sdktesting.infra.RestateDeployerConfig +import dev.restate.sdktesting.infra.ServiceSpec +import dev.restate.sdktesting.infra.registerGlobalConfig +import dev.restate.sdktesting.junit.ExecutionResult +import dev.restate.sdktesting.junit.SuiteProvider +import io.github.cdimascio.dotenv.Dotenv +import java.io.File +import java.io.FileInputStream +import java.io.FileOutputStream +import java.nio.file.Path +import java.time.LocalDateTime +import java.time.format.DateTimeFormatter +import kotlin.jvm.optionals.getOrNull +import kotlin.system.exitProcess +import kotlin.time.Duration +import kotlinx.serialization.Serializable +import org.junit.platform.engine.Filter +import org.junit.platform.engine.discovery.ClassNameFilter +import org.junit.platform.engine.support.descriptor.MethodSource +import org.junit.platform.launcher.MethodFilter + +@Serializable data class ExclusionsFile(val exclusions: Map>? = emptyMap()) + +private class RestateE2ETests : CliktCommand() { + override fun run() { + // Disable log4j2 JMX, this prevents reconfiguration + System.setProperty("log4j2.disable.jmx", "true") + // This is hours of debugging, don't touch it + // tl;dr this makes sure a single log4j2 configuration exists for the whole JVM, + // important to make Configurator.reconfigure work + System.setProperty( + "log4j2.contextSelector", "org.apache.logging.log4j.core.selector.BasicContextSelector") + // The default keep alive time is way too long, and this is a problem when we stop and restart + // containers. + System.setProperty("jdk.httpclient.keepalive.timeout", "5") + // The health check strategy uses the HttpUrlConnection which has no connect timeout by default. + // Could have caused the health check to hang indefinitely. + System.setProperty("sun.net.client.defaultConnectTimeout", "5000") + } +} + +private class TestRunnerOptions : OptionGroup() { + val restateContainerImage by + option(envvar = "RESTATE_CONTAINER_IMAGE").help("Image used to run Restate") + val reportDir by + option(envvar = "TEST_REPORT_DIR").path().help("Base report directory").defaultLazy { + defaultReportDirectory() + } + val imagePullPolicy by + option() + .enum() + .help( + "Pull policy for container images. ALWAYS skips pulling images prefixed with restate.local or localhost") + .default(PullPolicy.ALWAYS) + val customTestsFile by + option("--custom-tests", "--custom-tests-file") + .help("File containing the custom tests configurations") + + fun applyToDeployerConfig(deployerConfig: RestateDeployerConfig): RestateDeployerConfig { + var newConfig = deployerConfig + if (restateContainerImage != null) { + newConfig = newConfig.copy(restateContainerImage = restateContainerImage!!) + } + if (customTestsFile != null) { + newConfig = newConfig.copy(customTestsFile = customTestsFile!!) + } + newConfig = newConfig.copy(imagePullPolicy = imagePullPolicy) + return newConfig + } + + private fun defaultReportDirectory(): Path { + val formatter = DateTimeFormatter.ofPattern("yyyyMMdd_HHmmss") + return Path.of("test_report/${LocalDateTime.now().format(formatter)}").toAbsolutePath() + } +} + +private class FilterOptions(suites: SuiteProvider) : OptionGroup() { + val testSuite by + option() + .required() + .help( + "Test suite to run. Available: ${listOf("all") + suites.allSuites().map { it.name }}") + val testName by option().help("Name of the test class to run within the suite") +} + +private abstract class TestRunCommand(help: String) : CliktCommand(help) { + val testRunnerOptions by TestRunnerOptions() +} + +private class Run(private val suites: SuiteProvider) : + TestRunCommand("Run test suite, executing the service as a container.") { + val filter by FilterOptions(suites).cooccurring() + val exclusionsFile by + option("--exclusions", "--exclusions-file", envvar = "TEST_EXCLUSIONS_FILE") + .help("YAML file containing the excluded tests") + val parallel by + option() + .flag("--sequential", default = true) + .help( + "Run tests in parallel (default). Use --sequential with Podman or to reduce resource usage.") + val serviceContainerImage by + option("--service-container-image", envvar = "SERVICE_CONTAINER_IMAGE") + .help( + "Docker image for the service under test. If omitted, no service container is deployed.") + val serviceContainerEnvFile by + option("--service-container-env-file") + .help(".env file whose variables are injected into the service container") + + override fun run() { + val terminal = Terminal() + + val additionalServiceEnvs = + serviceContainerEnvFile + ?.let { envFile -> + Dotenv.configure().filename(envFile).load().entries().associate { it.key to it.value } + } + .orEmpty() + + val serviceDeploymentConfig = + if (serviceContainerImage != null) { + mapOf( + ServiceSpec.DEFAULT_SERVICE_NAME to + ContainerServiceDeploymentConfig(serviceContainerImage!!, additionalServiceEnvs)) + } else { + mapOf() + } + + registerGlobalConfig( + testRunnerOptions.applyToDeployerConfig(RestateDeployerConfig(serviceDeploymentConfig))) + + val testSuites = suites.resolveSuites(filter?.testSuite) + + val loadedExclusions: ExclusionsFile = + if (exclusionsFile != null) { + FileInputStream(File(exclusionsFile!!)) + .use { Yaml.default.decodeFromStream(it) } + .also { exclusions -> + println("Using exclusions file $exclusionsFile") + println("Loaded exclusions: ${exclusions.exclusions}") + } + } else { + ExclusionsFile() + } + + val reports = mutableListOf() + val newExclusions = mutableMapOf>() + var newFailures = false + for (testSuite in testSuites) { + val exclusions = loadedExclusions.exclusions?.get(testSuite.name) ?: emptyList() + val exclusionsFilters = + if (exclusions.isNotEmpty()) listOf(MethodFilter.excludeMethodNamePatterns(exclusions)) + else listOf() + val cliOptionFilter = + filter?.testName?.let { + listOf(ClassNameFilter.includeClassNamePatterns(testClassNameToFQCN(it))) + } ?: emptyList>() + + val report = + testSuite.runTests( + terminal, + testRunnerOptions.reportDir, + exclusionsFilters + cliOptionFilter, + false, + parallel) + + reports.add(report) + report.printFailuresToFiles(testRunnerOptions.reportDir) + + val failures = report.failedTests + if (failures.isNotEmpty() || exclusions.isNotEmpty()) { + newExclusions[testSuite.name] = + (failures + .mapNotNull { it.source.getOrNull() } + .mapNotNull { + when (it) { + is MethodSource -> "${it.className}.${it.methodName}" + else -> null + } + } + .distinct() + exclusions) + .sorted() + } + if (failures.isNotEmpty()) { + newFailures = true + } + } + + FileOutputStream(testRunnerOptions.reportDir.resolve("exclusions.new.yaml").toFile()).use { + Yaml.default.encodeToStream(ExclusionsFile(newExclusions.toSortedMap()), it) + } + + val succeededTests = reports.sumOf { it.succeededTests } + val executedTests = reports.sumOf { it.executedTests } + val testsStyle = if (succeededTests == executedTests) green else red + val testsInfoLine = testsStyle("* Succeeded tests: $succeededTests / $executedTests") + + val failedClasses = reports.sumOf { it.executedClasses - it.succeededClasses } + val classesStyle = if (failedClasses != 0) red else green + val classesInfoLine = classesStyle("* Failed classes initialization: $failedClasses") + + val totalDuration = reports.fold(Duration.ZERO) { d, res -> d + res.executionDuration } + + println( + """ + ${bold("========================= Final results =========================")} + 🗈 Report directory: ${testRunnerOptions.reportDir} + * Run test suites: ${reports.map { it.testSuite }} + $testsInfoLine + $classesInfoLine + * Execution time: $totalDuration + """ + .trimIndent()) + + for (report in reports) { + report.printFailuresToTerminal(terminal) + } + + exitProcess(if (newFailures) 1 else 0) + } +} + +private class Debug(private val suites: SuiteProvider) : + TestRunCommand( + "Run a single test without a service container, forwarding to a local process.") { + val testSuite by + option() + .default(suites.defaultSuite.name) + .help( + "Test suite to use for environment setup. Available: ${suites.allSuites().map { it.name }}") + val testName by option().required().help("Name of the test class to run") + val localContainers by + argument() + .convert { spec -> + if (spec.contains('=')) spec.split('=', limit = 2).let { it[0] to it[1].toInt() } + else ServiceSpec.DEFAULT_SERVICE_NAME to spec.toInt() + } + .multiple(required = true) + .help("Local service ports: '9080' or 'serviceName=9080'. Repeatable.") + val retainAfterEnd by + option() + .flag("--dont-retain-after-end", default = false) + .help("Keep the Docker network alive after the test ends (requires manual cleanup)") + val mountStateDirectory by option().help("Mount a local directory as the Restate data directory") + val localIngressPort by option().int().help("Bind Restate ingress to this host port") + val localAdminPort by option().int().help("Bind Restate admin to this host port") + val localNodePort by option().int().help("Bind Restate node-to-node port to this host port") + + override fun run() { + val terminal = Terminal() + + val restateDeployerConfig = + RestateDeployerConfig( + localContainers.associate { + it.first to LocalForwardServiceDeploymentConfig(it.second) + }, + localAdminPort = this.localAdminPort, + localIngressPort = this.localIngressPort, + localNodePort = this.localNodePort, + stateDirectoryMount = this.mountStateDirectory, + retainAfterEnd = this.retainAfterEnd) + registerGlobalConfig(testRunnerOptions.applyToDeployerConfig(restateDeployerConfig)) + + val suite = suites.resolveSuites(testSuite)[0] + val testFilters = + listOf(ClassNameFilter.includeClassNamePatterns(testClassNameToFQCN(testName))) + + val report = suite.runTests(terminal, testRunnerOptions.reportDir, testFilters, true, false) + + report.printFailuresToTerminal(terminal) + report.printFailuresToFiles(testRunnerOptions.reportDir) + + exitProcess(if (report.failedTests.isNotEmpty()) 1 else 0) + } +} + +fun runMain(args: Array, suites: SuiteProvider) { + val actualArgs = if (args.isEmpty()) arrayOf("run") else args + RestateE2ETests().subcommands(Run(suites), Debug(suites)).main(actualArgs) +} + +private fun testClassNameToFQCN(className: String): String { + if (className.contains('.')) return className + return "dev.restate.sdktesting.tests.$className" +} diff --git a/sdk-tests/action.yml b/sdk-tests/action.yml index e8d379ca..50ac7c80 100644 --- a/sdk-tests/action.yml +++ b/sdk-tests/action.yml @@ -13,7 +13,7 @@ inputs: default: 'ghcr.io/restatedev/restate:main' serviceContainerImage: required: true - description: Container image to use for the service + description: Docker image for the service under test exclusionsFile: required: false description: Exclusions file @@ -59,7 +59,7 @@ runs: env: RESTATE_CONTAINER_IMAGE: ${{ inputs.restateContainerImage }} shell: bash - run: java -jar sdk-tests.jar run ${{ inputs.testSuite != '' && format('--test-suite={0}', inputs.testSuite) || '' }} ${{ inputs.testName != '' && format('--test-name={0}', inputs.testName) || '' }} ${{ inputs.exclusionsFile != '' && format('--exclusions-file={0}', inputs.exclusionsFile) || '' }} ${{ inputs.serviceContainerEnvFile != '' && format('--service-container-env-file={0}', inputs.serviceContainerEnvFile) || '' }} ${{ inputs.customTestsFile != '' && format('--custom-tests-file={0}', inputs.customTestsFile) || '' }} --report-dir=test-report ${{ inputs.serviceContainerImage }} + run: java -jar sdk-tests.jar run ${{ inputs.testSuite != '' && format('--test-suite={0}', inputs.testSuite) || '' }} ${{ inputs.testName != '' && format('--test-name={0}', inputs.testName) || '' }} ${{ inputs.exclusionsFile != '' && format('--exclusions-file={0}', inputs.exclusionsFile) || '' }} ${{ inputs.serviceContainerEnvFile != '' && format('--service-container-env-file={0}', inputs.serviceContainerEnvFile) || '' }} ${{ inputs.customTestsFile != '' && format('--custom-tests-file={0}', inputs.customTestsFile) || '' }} --service-container-image=${{ inputs.serviceContainerImage }} --report-dir=test-report # Upload logs and publish test result - uses: actions/upload-artifact@v4 diff --git a/sdk-tests/build.gradle.kts b/sdk-tests/build.gradle.kts index 30e5a97c..89816a8a 100644 --- a/sdk-tests/build.gradle.kts +++ b/sdk-tests/build.gradle.kts @@ -55,9 +55,7 @@ allOpen { application { mainClass = "dev.restate.sdktesting.MainKt" } -tasks.shadowJar { - archiveClassifier = "" -} +tasks.shadowJar { archiveClassifier = "" } spotless { kotlin { diff --git a/sdk-tests/src/main/kotlin/dev/restate/sdktesting/junit/TestSuites.kt b/sdk-tests/src/main/kotlin/dev/restate/sdktesting/junit/TestSuites.kt index 7f875da2..e08bc3fe 100644 --- a/sdk-tests/src/main/kotlin/dev/restate/sdktesting/junit/TestSuites.kt +++ b/sdk-tests/src/main/kotlin/dev/restate/sdktesting/junit/TestSuites.kt @@ -8,21 +8,95 @@ // https://github.com/restatedev/sdk-test-suite/blob/main/LICENSE package dev.restate.sdktesting.junit -object TestSuites { - val DEFAULT_SUITE = TestSuite("default", emptyMap(), "none() | always-suspending | customTests") +import dev.restate.sdktesting.tests.CallOrdering +import dev.restate.sdktesting.tests.Cancellation +import dev.restate.sdktesting.tests.Combinators +import dev.restate.sdktesting.tests.Custom +import dev.restate.sdktesting.tests.Ingress +import dev.restate.sdktesting.tests.KillInvocation +import dev.restate.sdktesting.tests.KillRuntime +import dev.restate.sdktesting.tests.NonDeterminismErrors +import dev.restate.sdktesting.tests.ProxyRequestSigning +import dev.restate.sdktesting.tests.RunFlush +import dev.restate.sdktesting.tests.RunRetry +import dev.restate.sdktesting.tests.ServiceToServiceCommunication +import dev.restate.sdktesting.tests.Sleep +import dev.restate.sdktesting.tests.SleepWithFailures +import dev.restate.sdktesting.tests.State +import dev.restate.sdktesting.tests.StopRuntime +import dev.restate.sdktesting.tests.UserErrors +import dev.restate.sdktesting.tests.WorkflowAPI + +object TestSuites : SuiteProvider { + override val defaultSuite: TestSuite + get() = DEFAULT_SUITE + + val DEFAULT_SUITE = + TestSuite( + "default", + emptyMap(), + listOf( + clazz(), + clazz(), + clazz(), + clazz(), + clazz(), + clazz(), + clazz(), + clazz(), + clazz(), + clazz(), + clazz(), + clazz(), + clazz(), + clazz(), + clazz(), + clazz(), + )) + val THREE_NODES_SUITE = TestSuite( "threeNodes", mapOf( "RESTATE_DEFAULT_NUM_PARTITIONS" to "4", ), - "(none() | always-suspending) & !only-single-node & !customTests", + listOf( + clazz(), + clazz(), + clazz(), + clazz(), + clazz(), + clazz(), + clazz(), + clazz(), + clazz(), + clazz(), + clazz(), + clazz(), + clazz(), + ), 3) + private val ALWAYS_SUSPENDING_SUITE = TestSuite( "alwaysSuspending", mapOf("RESTATE_WORKER__INVOKER__INACTIVITY_TIMEOUT" to "0s"), - "always-suspending | only-always-suspending") + listOf( + clazz(), + clazz(), + clazz(), + clazz(), + clazz(), + clazz(), + clazz(), + clazz(), + clazz(), + clazz(), + clazz(), + clazz(), + clazz(), + )) + private val THREE_NODES_ALWAYS_SUSPENDING_SUITE = TestSuite( "threeNodesAlwaysSuspending", @@ -30,8 +104,20 @@ object TestSuites { "RESTATE_WORKER__INVOKER__INACTIVITY_TIMEOUT" to "0s", "RESTATE_DEFAULT_NUM_PARTITIONS" to "4", ), - "(always-suspending | only-always-suspending) & !only-single-node", + listOf( + clazz(), + clazz(), + clazz(), + clazz(), + clazz(), + clazz(), + clazz(), + clazz(), + clazz(), + clazz(), + ), 3) + private val SINGLE_THREAD_SINGLE_PARTITION_SUITE = TestSuite( "singleThreadSinglePartition", @@ -39,14 +125,32 @@ object TestSuites { "RESTATE_DEFAULT_NUM_PARTITIONS" to "1", "RESTATE_DEFAULT_THREAD_POOL_SIZE" to "1", ), - "none() | always-suspending | stop-runtime") + listOf( + clazz(), + clazz(), + clazz(), + clazz(), + clazz(), + clazz(), + clazz(), + clazz(), + clazz(), + clazz(), + clazz(), + clazz(), + clazz(), + clazz(), + clazz(), + )) + private val LAZY_STATE_SUITE = TestSuite( "lazyState", mapOf( "RESTATE_WORKER__INVOKER__DISABLE_EAGER_STATE" to "true", ), - "lazy-state") + listOf(clazz())) + private val LAZY_STATE_ALWAYS_SUSPENDING_SUITE = TestSuite( "lazyStateAlwaysSuspending", @@ -54,12 +158,15 @@ object TestSuites { "RESTATE_WORKER__INVOKER__DISABLE_EAGER_STATE" to "true", "RESTATE_WORKER__INVOKER__INACTIVITY_TIMEOUT" to "0s", ), - "lazy-state") + listOf(clazz())) + private val PERSISTED_TIMERS_SUITE = TestSuite( - "persistedTimers", mapOf("RESTATE_WORKER__NUM_TIMERS_IN_MEMORY_LIMIT" to "1"), "timers") + "persistedTimers", + mapOf("RESTATE_WORKER__NUM_TIMERS_IN_MEMORY_LIMIT" to "1"), + listOf(clazz(), method("manySleeps"))) - fun allSuites(): List { + override fun allSuites(): List { return listOf( DEFAULT_SUITE, THREE_NODES_SUITE, @@ -71,7 +178,7 @@ object TestSuites { PERSISTED_TIMERS_SUITE) } - fun resolveSuites(suite: String?): List { + override fun resolveSuites(suite: String?): List { return when (suite ?: "all") { "all" -> allSuites() else -> { diff --git a/sdk-tests/src/main/kotlin/dev/restate/sdktesting/main.kt b/sdk-tests/src/main/kotlin/dev/restate/sdktesting/main.kt index d595ae0d..f81d2943 100644 --- a/sdk-tests/src/main/kotlin/dev/restate/sdktesting/main.kt +++ b/sdk-tests/src/main/kotlin/dev/restate/sdktesting/main.kt @@ -8,323 +8,6 @@ // https://github.com/restatedev/sdk-test-suite/blob/main/LICENSE package dev.restate.sdktesting -import com.charleskorn.kaml.Yaml -import com.charleskorn.kaml.decodeFromStream -import com.charleskorn.kaml.encodeToStream -import com.github.ajalt.clikt.core.CliktCommand -import com.github.ajalt.clikt.core.subcommands -import com.github.ajalt.clikt.parameters.arguments.* -import com.github.ajalt.clikt.parameters.groups.OptionGroup -import com.github.ajalt.clikt.parameters.groups.cooccurring -import com.github.ajalt.clikt.parameters.groups.provideDelegate -import com.github.ajalt.clikt.parameters.options.* -import com.github.ajalt.clikt.parameters.types.enum -import com.github.ajalt.clikt.parameters.types.int -import com.github.ajalt.clikt.parameters.types.path -import com.github.ajalt.mordant.rendering.TextColors.green -import com.github.ajalt.mordant.rendering.TextColors.red -import com.github.ajalt.mordant.rendering.TextStyles.bold -import com.github.ajalt.mordant.terminal.Terminal -import dev.restate.sdktesting.infra.* -import dev.restate.sdktesting.junit.ExecutionResult import dev.restate.sdktesting.junit.TestSuites -import io.github.cdimascio.dotenv.Dotenv -import java.io.File -import java.io.FileInputStream -import java.io.FileOutputStream -import java.nio.file.Path -import java.time.LocalDateTime -import java.time.format.DateTimeFormatter -import kotlin.jvm.optionals.getOrNull -import kotlin.system.exitProcess -import kotlin.time.Duration -import kotlinx.serialization.Serializable -import org.junit.platform.engine.Filter -import org.junit.platform.engine.discovery.ClassNameFilter -import org.junit.platform.engine.support.descriptor.MethodSource -import org.junit.platform.launcher.MethodFilter -@Serializable data class ExclusionsFile(val exclusions: Map> = emptyMap()) - -class RestateSdkTestSuite : CliktCommand() { - override fun run() { - // Disable log4j2 JMX, this prevents reconfiguration - System.setProperty("log4j2.disable.jmx", "true") - // This is hours of debugging, don't touch it - // tl;dr this makes sure a single log4j2 configuration exists for the whole JVM, - // important to make Configurator.reconfigure work - System.setProperty( - "log4j2.contextSelector", "org.apache.logging.log4j.core.selector.BasicContextSelector") - // The default keep alive time is way too long, and this is a problem when we stop and restart - // containers. - System.setProperty("jdk.httpclient.keepalive.timeout", "5") - // The health check strategy uses the HttpUrlConnection which has no connect timeout by default. - // Could have caused the health check to hang indefinitely. - System.setProperty("sun.net.client.defaultConnectTimeout", "5000") - // Enable Logging of JDK client - // System.setProperty("java.util.logging.manager", "org.apache.logging.log4j.jul.LogManager") - // System.setProperty("jdk.httpclient.HttpClient.log", "all") - } -} - -class TestRunnerOptions : OptionGroup() { - val restateContainerImage by - option(envvar = "RESTATE_CONTAINER_IMAGE").help("Image used to run Restate") - val reportDir by - option(envvar = "TEST_REPORT_DIR").path().help("Base report directory").defaultLazy { - defaultReportDirectory() - } - val imagePullPolicy by - option() - .enum() - .help( - "Pull policy used to pull containers required for testing. In case of ALWAYS, docker won't pull images with repository prefix restate.local or localhost") - .default(PullPolicy.ALWAYS) - val customTestsFile by - option("--custom-tests", "--custom-tests-file") - .help("File containing the custom tests configurations") - - fun applyToDeployerConfig(deployerConfig: RestateDeployerConfig): RestateDeployerConfig { - var newConfig = deployerConfig - if (restateContainerImage != null) { - newConfig = newConfig.copy(restateContainerImage = restateContainerImage!!) - } - if (customTestsFile != null) { - newConfig = newConfig.copy(customTestsFile = customTestsFile!!) - } - newConfig = newConfig.copy(imagePullPolicy = imagePullPolicy) - return newConfig - } - - private fun defaultReportDirectory(): Path { - val formatter = DateTimeFormatter.ofPattern("yyyyMMdd_HHmmss") - return Path.of("test_report/${LocalDateTime.now().format(formatter)}").toAbsolutePath() - } -} - -class FilterOptions : OptionGroup() { - val testSuite by - option() - .required() - .help( - "Test suite to run. Available: ${listOf("all") + TestSuites.allSuites().map { it.name }}") - val testName by option().help("Name of the test to run for the given suite") -} - -abstract class TestRunCommand(help: String) : CliktCommand(help) { - val testRunnerOptions by TestRunnerOptions() -} - -class Run : - TestRunCommand( - """ -Run test suite, executing the service as container. -""" - .trimIndent()) { - val filter by FilterOptions().cooccurring() - val exclusionsFile by - option("--exclusions", "--exclusions-file").help("File containing the excluded tests") - val parallel by - option(help = "Enable parallel testing") - .help( - "If set, runs tests in parallel. We suggest running tests sequentially when using podman") - .flag("--sequential", default = true) - val serviceContainerEnvFile by - option("--service-container-env-file").help(".env file to apply to service container") - val imageName by argument() - - override fun run() { - val terminal = Terminal() - - val additionalServiceEnvs = - serviceContainerEnvFile - ?.let { - Dotenv.configure().filename(it).load().entries().associate { it.key to it.value } - } - .orEmpty() - - val restateDeployerConfig = - RestateDeployerConfig( - mapOf( - ServiceSpec.DEFAULT_SERVICE_NAME to - ContainerServiceDeploymentConfig(imageName, additionalServiceEnvs)), - ) - - // Register global config of the deployer - registerGlobalConfig(testRunnerOptions.applyToDeployerConfig(restateDeployerConfig)) - - // Resolve test configurations - val testSuites = TestSuites.resolveSuites(filter?.testSuite) - - // Load exclusions file - val loadedExclusions: ExclusionsFile = - if (exclusionsFile != null) { - FileInputStream(File(exclusionsFile!!)).use { Yaml.default.decodeFromStream(it) } - } else { - ExclusionsFile() - } - - val reports = mutableListOf() - val newExclusions = mutableMapOf>() - var newFailures = false - for (testSuite in testSuites) { - val exclusions = loadedExclusions.exclusions[testSuite.name] ?: emptyList() - val exclusionsFilters = exclusions.map { MethodFilter.excludeMethodNamePatterns(it) } - val cliOptionFilter = - filter?.testName?.let { - listOf(ClassNameFilter.includeClassNamePatterns(testClassNameToFQCN(it))) - } ?: emptyList>() - - val report = - testSuite.runTests( - terminal, - testRunnerOptions.reportDir, - exclusionsFilters + cliOptionFilter, - false, - parallel) - - reports.add(report) - // No need to wait the end of the run for this - report.printFailuresToFiles(testRunnerOptions.reportDir) - val failures = report.failedTests - if (failures.isNotEmpty() || exclusions.isNotEmpty()) { - newExclusions[testSuite.name] = - (failures - .mapNotNull { it.source.getOrNull() } - .mapNotNull { - when (it) { - is MethodSource -> "${it.className}.${it.methodName}" - else -> null - } - } - .distinct() + exclusions) - .sorted() - } - if (failures.isNotEmpty()) { - newFailures = true - } - } - - // Write out the exclusions file - FileOutputStream(testRunnerOptions.reportDir.resolve("exclusions.new.yaml").toFile()).use { - Yaml.default.encodeToStream(ExclusionsFile(newExclusions.toSortedMap()), it) - } - - // Print final report - val succeededTests = reports.sumOf { it.succeededTests } - val executedTests = reports.sumOf { it.executedTests } - val testsStyle = if (succeededTests == executedTests) green else red - val testsInfoLine = testsStyle("""* Succeeded tests: $succeededTests / ${executedTests}""") - - val failedClasses = reports.sumOf { it.executedClasses - it.succeededClasses } - val classesStyle = if (failedClasses != 0) red else green - val classesInfoLine = classesStyle("""* Failed classes initialization: $failedClasses""") - - val totalDuration = reports.fold(Duration.ZERO) { d, res -> d + res.executionDuration } - - println( - """ - ${bold("========================= Final results =========================")} - 🗈 Report directory: ${testRunnerOptions.reportDir} - * Run test suites: ${reports.map { it.testSuite }} - $testsInfoLine - $classesInfoLine - * Execution time: $totalDuration - """ - .trimIndent()) - - for (report in reports) { - report.printFailuresToTerminal(terminal) - } - - if (newFailures) { - // Exit - exitProcess(1) - } - } -} - -class Debug : - TestRunCommand( - """ -Run test suite, without executing the service inside a container. -""" - .trimIndent()) { - val testSuite by - option() - .default(TestSuites.DEFAULT_SUITE.name) - .help("Test suite to run. Available: ${TestSuites.allSuites().map { it.name }}") - val testName by option().required().help("Name of the test to run for the given suite") - val localContainers by - argument() - .convert { localContainerSpec -> - if (localContainerSpec.contains('=')) { - localContainerSpec.split('=', limit = 2).let { it[0] to it[1].toInt() } - } else { - ServiceSpec.DEFAULT_SERVICE_NAME to localContainerSpec.toInt() - } - } - .multiple(required = true) - .help( - "Local containers name=ports. Example: '9080' (for default-service container), 'otherContainer=9081'") - val retainAfterEnd by - option() - .flag("--dont-retain-after-end", default = false) - .help( - "Retain the created docker network after the end of the test. You MUST manually clean it up afterwards!") - val mountStateDirectory by - option() - .help( - "Mount the given state directory as restate data when starting the runtime container") - val localIngressPort by option().int().help("Ingress port to bind the restate container") - val localAdminPort by option().int().help("Admin port to bind the restate container") - val localNodePort by option().int().help("Node port to bind the restate container") - - override fun run() { - val terminal = Terminal() - - // Register global config of the deployer - val restateDeployerConfig = - RestateDeployerConfig( - localContainers.associate { - it.first to LocalForwardServiceDeploymentConfig(it.second) - }, - localAdminPort = this.localAdminPort, - localIngressPort = this.localIngressPort, - localNodePort = this.localNodePort, - stateDirectoryMount = this.mountStateDirectory, - retainAfterEnd = this.retainAfterEnd) - registerGlobalConfig(testRunnerOptions.applyToDeployerConfig(restateDeployerConfig)) - - if (restateDeployerConfig.retainAfterEnd) { - // Disable ryuk, as it will otherwise cleanup the network after the JVM goes away. - // System.getenv().put("TESTCONTAINERS_RYUK_DISABLED", "true") - - } - - // Resolve test configurations - val testSuite = TestSuites.resolveSuites(testSuite)[0] - val testFilters = - listOf(ClassNameFilter.includeClassNamePatterns(testClassNameToFQCN(testName))) - - val report = testSuite.runTests(terminal, testRunnerOptions.reportDir, testFilters, true, false) - - report.printFailuresToTerminal(terminal) - report.printFailuresToFiles(testRunnerOptions.reportDir) - - if (report.failedTests.isNotEmpty()) { - // Exit - exitProcess(1) - } - } -} - -fun main(args: Array) = RestateSdkTestSuite().subcommands(Run(), Debug()).main(args) - -private fun testClassNameToFQCN(className: String): String { - if (className.contains('.')) { - // Then it's FQCN - return className - } - return "dev.restate.sdktesting.tests.${className}" -} +fun main(args: Array) = runMain(args, TestSuites) diff --git a/sdk-tests/src/main/kotlin/dev/restate/sdktesting/tests/Cancellation.kt b/sdk-tests/src/main/kotlin/dev/restate/sdktesting/tests/Cancellation.kt index d9b57c08..0a11fc50 100644 --- a/sdk-tests/src/main/kotlin/dev/restate/sdktesting/tests/Cancellation.kt +++ b/sdk-tests/src/main/kotlin/dev/restate/sdktesting/tests/Cancellation.kt @@ -27,12 +27,10 @@ import kotlinx.serialization.json.Json import org.assertj.core.api.Assertions.assertThat import org.awaitility.kotlin.await import org.awaitility.kotlin.withAlias -import org.junit.jupiter.api.Tag import org.junit.jupiter.api.extension.RegisterExtension import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.EnumSource -@Tag("always-suspending") class Cancellation { companion object { @RegisterExtension diff --git a/sdk-tests/src/main/kotlin/dev/restate/sdktesting/tests/Combinators.kt b/sdk-tests/src/main/kotlin/dev/restate/sdktesting/tests/Combinators.kt index 973e66e4..526c34ed 100644 --- a/sdk-tests/src/main/kotlin/dev/restate/sdktesting/tests/Combinators.kt +++ b/sdk-tests/src/main/kotlin/dev/restate/sdktesting/tests/Combinators.kt @@ -28,13 +28,11 @@ import org.assertj.core.api.Assertions.assertThat import org.awaitility.kotlin.await import org.awaitility.kotlin.withAlias import org.junit.jupiter.api.DisplayName -import org.junit.jupiter.api.Tag import org.junit.jupiter.api.Test import org.junit.jupiter.api.extension.RegisterExtension import org.junit.jupiter.api.parallel.Execution import org.junit.jupiter.api.parallel.ExecutionMode -@Tag("always-suspending") class Combinators { companion object { @RegisterExtension diff --git a/sdk-tests/src/main/kotlin/dev/restate/sdktesting/tests/Custom.kt b/sdk-tests/src/main/kotlin/dev/restate/sdktesting/tests/Custom.kt index d71bcf90..e65e60b1 100644 --- a/sdk-tests/src/main/kotlin/dev/restate/sdktesting/tests/Custom.kt +++ b/sdk-tests/src/main/kotlin/dev/restate/sdktesting/tests/Custom.kt @@ -22,7 +22,6 @@ import java.util.stream.Stream import kotlin.time.Duration.Companion.minutes import kotlinx.serialization.Serializable import org.apache.logging.log4j.LogManager -import org.junit.jupiter.api.Tag import org.junit.jupiter.api.Timeout import org.junit.jupiter.api.extension.RegisterExtension import org.junit.jupiter.api.parallel.Execution @@ -30,7 +29,6 @@ import org.junit.jupiter.api.parallel.ExecutionMode import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.MethodSource -@Tag("customTests") class Custom { @Serializable diff --git a/sdk-tests/src/main/kotlin/dev/restate/sdktesting/tests/KillRuntime.kt b/sdk-tests/src/main/kotlin/dev/restate/sdktesting/tests/KillRuntime.kt index 507e6b6a..29c98a97 100644 --- a/sdk-tests/src/main/kotlin/dev/restate/sdktesting/tests/KillRuntime.kt +++ b/sdk-tests/src/main/kotlin/dev/restate/sdktesting/tests/KillRuntime.kt @@ -24,13 +24,10 @@ import org.assertj.core.api.Assertions.assertThat import org.awaitility.kotlin.atMost import org.awaitility.kotlin.await import org.awaitility.kotlin.withAlias -import org.junit.jupiter.api.Tag import org.junit.jupiter.api.Test import org.junit.jupiter.api.Timeout import org.junit.jupiter.api.extension.RegisterExtension -@Tag("always-suspending") -@Tag("only-single-node") class KillRuntime { companion object { diff --git a/sdk-tests/src/main/kotlin/dev/restate/sdktesting/tests/NonDeterminismErrors.kt b/sdk-tests/src/main/kotlin/dev/restate/sdktesting/tests/NonDeterminismErrors.kt index 074d4721..029a3e25 100644 --- a/sdk-tests/src/main/kotlin/dev/restate/sdktesting/tests/NonDeterminismErrors.kt +++ b/sdk-tests/src/main/kotlin/dev/restate/sdktesting/tests/NonDeterminismErrors.kt @@ -21,7 +21,6 @@ import java.util.UUID import org.assertj.core.api.Assertions.* import org.awaitility.kotlin.await import org.awaitility.kotlin.withAlias -import org.junit.jupiter.api.Tag import org.junit.jupiter.api.extension.RegisterExtension import org.junit.jupiter.api.parallel.Execution import org.junit.jupiter.api.parallel.ExecutionMode @@ -29,8 +28,6 @@ import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.ValueSource /** Test non-determinism/journal mismatch checks in the SDKs. */ -@Tag("only-always-suspending") -@Tag("only-single-node") class NonDeterminismErrors { companion object { @RegisterExtension diff --git a/sdk-tests/src/main/kotlin/dev/restate/sdktesting/tests/RunFlush.kt b/sdk-tests/src/main/kotlin/dev/restate/sdktesting/tests/RunFlush.kt index 14e5636d..b0f83adc 100644 --- a/sdk-tests/src/main/kotlin/dev/restate/sdktesting/tests/RunFlush.kt +++ b/sdk-tests/src/main/kotlin/dev/restate/sdktesting/tests/RunFlush.kt @@ -17,13 +17,11 @@ import dev.restate.sdktesting.infra.RestateDeployerExtension import dev.restate.sdktesting.infra.ServiceSpec import org.assertj.core.api.Assertions.assertThat import org.junit.jupiter.api.DisplayName -import org.junit.jupiter.api.Tag import org.junit.jupiter.api.Test import org.junit.jupiter.api.extension.RegisterExtension import org.junit.jupiter.api.parallel.Execution import org.junit.jupiter.api.parallel.ExecutionMode -@Tag("only-always-suspending") class RunFlush { companion object { @RegisterExtension diff --git a/sdk-tests/src/main/kotlin/dev/restate/sdktesting/tests/RunRetry.kt b/sdk-tests/src/main/kotlin/dev/restate/sdktesting/tests/RunRetry.kt index eaff9585..f24ce66c 100644 --- a/sdk-tests/src/main/kotlin/dev/restate/sdktesting/tests/RunRetry.kt +++ b/sdk-tests/src/main/kotlin/dev/restate/sdktesting/tests/RunRetry.kt @@ -18,13 +18,11 @@ import dev.restate.sdktesting.infra.ServiceSpec import java.util.* import org.assertj.core.api.Assertions.assertThat import org.junit.jupiter.api.DisplayName -import org.junit.jupiter.api.Tag import org.junit.jupiter.api.Test import org.junit.jupiter.api.extension.RegisterExtension import org.junit.jupiter.api.parallel.Execution import org.junit.jupiter.api.parallel.ExecutionMode -@Tag("always-suspending") class RunRetry { companion object { @RegisterExtension diff --git a/sdk-tests/src/main/kotlin/dev/restate/sdktesting/tests/ServiceToServiceCommunication.kt b/sdk-tests/src/main/kotlin/dev/restate/sdktesting/tests/ServiceToServiceCommunication.kt index 904957dd..34d4701c 100644 --- a/sdk-tests/src/main/kotlin/dev/restate/sdktesting/tests/ServiceToServiceCommunication.kt +++ b/sdk-tests/src/main/kotlin/dev/restate/sdktesting/tests/ServiceToServiceCommunication.kt @@ -28,14 +28,12 @@ import kotlin.time.Duration.Companion.seconds import kotlinx.serialization.json.Json import org.assertj.core.api.Assertions.assertThat import org.awaitility.kotlin.await -import org.junit.jupiter.api.Tag import org.junit.jupiter.api.Test import org.junit.jupiter.api.Timeout import org.junit.jupiter.api.extension.RegisterExtension import org.junit.jupiter.api.parallel.Execution import org.junit.jupiter.api.parallel.ExecutionMode -@Tag("always-suspending") class ServiceToServiceCommunication { companion object { @@ -184,7 +182,6 @@ class ServiceToServiceCommunication { @Test @Execution(ExecutionMode.CONCURRENT) @Timeout(value = 30, unit = TimeUnit.SECONDS) - @Tag("timers") fun oneWayCallWithDelay(@InjectClient ingressClient: Client) = runTest(timeout = 30.seconds) { val counterId = UUID.randomUUID().toString() diff --git a/sdk-tests/src/main/kotlin/dev/restate/sdktesting/tests/Sleep.kt b/sdk-tests/src/main/kotlin/dev/restate/sdktesting/tests/Sleep.kt index 4302ed40..1c9d161b 100644 --- a/sdk-tests/src/main/kotlin/dev/restate/sdktesting/tests/Sleep.kt +++ b/sdk-tests/src/main/kotlin/dev/restate/sdktesting/tests/Sleep.kt @@ -24,15 +24,12 @@ import kotlin.time.Duration.Companion.seconds import kotlinx.coroutines.joinAll import kotlinx.coroutines.launch import org.assertj.core.api.Assertions.assertThat -import org.junit.jupiter.api.Tag import org.junit.jupiter.api.Test import org.junit.jupiter.api.Timeout import org.junit.jupiter.api.extension.RegisterExtension import org.junit.jupiter.api.parallel.Execution import org.junit.jupiter.api.parallel.ExecutionMode -@Tag("always-suspending") -@Tag("timers") class Sleep { companion object { diff --git a/sdk-tests/src/main/kotlin/dev/restate/sdktesting/tests/SleepWithFailures.kt b/sdk-tests/src/main/kotlin/dev/restate/sdktesting/tests/SleepWithFailures.kt index d918c1f5..2eab7868 100644 --- a/sdk-tests/src/main/kotlin/dev/restate/sdktesting/tests/SleepWithFailures.kt +++ b/sdk-tests/src/main/kotlin/dev/restate/sdktesting/tests/SleepWithFailures.kt @@ -22,13 +22,11 @@ import kotlin.time.TimeSource import kotlin.time.toJavaDuration import kotlinx.coroutines.* import org.assertj.core.api.Assertions.assertThat -import org.junit.jupiter.api.Tag import org.junit.jupiter.api.Test import org.junit.jupiter.api.Timeout import org.junit.jupiter.api.extension.RegisterExtension // -- Sleep tests with terminations/killings of service endpoint -@Tag("always-suspending") class SleepWithFailures { companion object { diff --git a/sdk-tests/src/main/kotlin/dev/restate/sdktesting/tests/State.kt b/sdk-tests/src/main/kotlin/dev/restate/sdktesting/tests/State.kt index e6aaa8ef..33821410 100644 --- a/sdk-tests/src/main/kotlin/dev/restate/sdktesting/tests/State.kt +++ b/sdk-tests/src/main/kotlin/dev/restate/sdktesting/tests/State.kt @@ -25,14 +25,11 @@ import java.util.function.Function import kotlinx.serialization.json.Json import org.assertj.core.api.Assertions.assertThat import org.awaitility.kotlin.await -import org.junit.jupiter.api.Tag import org.junit.jupiter.api.Test import org.junit.jupiter.api.extension.RegisterExtension import org.junit.jupiter.api.parallel.Execution import org.junit.jupiter.api.parallel.ExecutionMode -@Tag("always-suspending") -@Tag("lazy-state") class State { companion object { diff --git a/sdk-tests/src/main/kotlin/dev/restate/sdktesting/tests/StopRuntime.kt b/sdk-tests/src/main/kotlin/dev/restate/sdktesting/tests/StopRuntime.kt index 9682e087..5606a7b6 100644 --- a/sdk-tests/src/main/kotlin/dev/restate/sdktesting/tests/StopRuntime.kt +++ b/sdk-tests/src/main/kotlin/dev/restate/sdktesting/tests/StopRuntime.kt @@ -25,13 +25,10 @@ import org.assertj.core.api.Assertions.assertThat import org.awaitility.kotlin.atMost import org.awaitility.kotlin.await import org.awaitility.kotlin.withAlias -import org.junit.jupiter.api.Tag import org.junit.jupiter.api.Test import org.junit.jupiter.api.Timeout import org.junit.jupiter.api.extension.RegisterExtension -@Tag("always-suspending") -@Tag("only-single-node") class StopRuntime { companion object { diff --git a/sdk-tests/src/main/kotlin/dev/restate/sdktesting/tests/UserErrors.kt b/sdk-tests/src/main/kotlin/dev/restate/sdktesting/tests/UserErrors.kt index fb3f5349..de172673 100644 --- a/sdk-tests/src/main/kotlin/dev/restate/sdktesting/tests/UserErrors.kt +++ b/sdk-tests/src/main/kotlin/dev/restate/sdktesting/tests/UserErrors.kt @@ -23,13 +23,11 @@ import kotlinx.serialization.Serializable import kotlinx.serialization.json.Json import org.assertj.core.api.Assertions.assertThat import org.junit.jupiter.api.DisplayName -import org.junit.jupiter.api.Tag import org.junit.jupiter.api.Test import org.junit.jupiter.api.extension.RegisterExtension import org.junit.jupiter.api.parallel.Execution import org.junit.jupiter.api.parallel.ExecutionMode -@Tag("always-suspending") class UserErrors { companion object { diff --git a/sdk-tests/src/main/kotlin/dev/restate/sdktesting/tests/WorkflowAPI.kt b/sdk-tests/src/main/kotlin/dev/restate/sdktesting/tests/WorkflowAPI.kt index 85e67c63..c7a70719 100644 --- a/sdk-tests/src/main/kotlin/dev/restate/sdktesting/tests/WorkflowAPI.kt +++ b/sdk-tests/src/main/kotlin/dev/restate/sdktesting/tests/WorkflowAPI.kt @@ -18,13 +18,11 @@ import org.assertj.core.api.Assertions.assertThat import org.awaitility.kotlin.await import org.awaitility.kotlin.withAlias import org.junit.jupiter.api.DisplayName -import org.junit.jupiter.api.Tag import org.junit.jupiter.api.Test import org.junit.jupiter.api.extension.RegisterExtension import org.junit.jupiter.api.parallel.Execution import org.junit.jupiter.api.parallel.ExecutionMode -@Tag("always-suspending") class WorkflowAPI { companion object {