From def1c36954a0f3221397d508fc1db1e82654875c Mon Sep 17 00:00:00 2001 From: slinkydeveloper Date: Fri, 15 May 2026 09:47:00 +0200 Subject: [PATCH 1/4] Unify the main --- .../restate/sdktesting/junit/TestSuites.kt | 8 +- .../kotlin/dev/restate/sdktesting/main.kt | 247 +------------- infra/build.gradle.kts | 2 + .../restate/sdktesting/junit/SuiteProvider.kt | 15 + .../kotlin/dev/restate/sdktesting/runner.kt | 313 +++++++++++++++++ sdk-tests/action.yml | 4 +- .../restate/sdktesting/junit/TestSuites.kt | 8 +- .../kotlin/dev/restate/sdktesting/main.kt | 319 +----------------- 8 files changed, 344 insertions(+), 572 deletions(-) create mode 100644 infra/src/main/kotlin/dev/restate/sdktesting/junit/SuiteProvider.kt create mode 100644 infra/src/main/kotlin/dev/restate/sdktesting/runner.kt diff --git a/e2e-tests/src/main/kotlin/dev/restate/sdktesting/junit/TestSuites.kt b/e2e-tests/src/main/kotlin/dev/restate/sdktesting/junit/TestSuites.kt index 5d6a92b0..8ce1913e 100644 --- a/e2e-tests/src/main/kotlin/dev/restate/sdktesting/junit/TestSuites.kt +++ b/e2e-tests/src/main/kotlin/dev/restate/sdktesting/junit/TestSuites.kt @@ -8,7 +8,9 @@ // https://github.com/restatedev/sdk-test-suite/blob/main/LICENSE package dev.restate.sdktesting.junit -object TestSuites { +object TestSuites : SuiteProvider { + override val defaultSuite: TestSuite get() = DEFAULT_SUITE + val DEFAULT_SUITE = TestSuite("default", emptyMap(), "none() | always-suspending | only-single-node") val THREE_NODES_SUITE = @@ -36,7 +38,7 @@ object TestSuites { private val VERSION_COMPATIBILITY_SUITE = TestSuite("versionCompat", emptyMap(), "version-compatibility") - fun allSuites(): List { + override fun allSuites(): List { return listOf( DEFAULT_SUITE, THREE_NODES_SUITE, @@ -45,7 +47,7 @@ object TestSuites { VERSION_COMPATIBILITY_SUITE) } - fun resolveSuites(suite: String?): List { + override fun resolveSuites(suite: String?): List { return when (suite ?: "all") { "all" -> allSuites() else -> { diff --git a/e2e-tests/src/main/kotlin/dev/restate/sdktesting/main.kt b/e2e-tests/src/main/kotlin/dev/restate/sdktesting/main.kt index ada51ed8..f81d2943 100644 --- a/e2e-tests/src/main/kotlin/dev/restate/sdktesting/main.kt +++ b/e2e-tests/src/main/kotlin/dev/restate/sdktesting/main.kt @@ -8,251 +8,6 @@ // https://github.com/restatedev/sdk-test-suite/blob/main/LICENSE package dev.restate.sdktesting -import com.charleskorn.kaml.Yaml -import com.charleskorn.kaml.decodeFromStream -import com.charleskorn.kaml.encodeToStream -import com.github.ajalt.clikt.core.CliktCommand -import com.github.ajalt.clikt.core.subcommands -import com.github.ajalt.clikt.parameters.arguments.* -import com.github.ajalt.clikt.parameters.groups.OptionGroup -import com.github.ajalt.clikt.parameters.groups.cooccurring -import com.github.ajalt.clikt.parameters.groups.provideDelegate -import com.github.ajalt.clikt.parameters.options.* -import com.github.ajalt.clikt.parameters.types.enum -import com.github.ajalt.clikt.parameters.types.path -import com.github.ajalt.mordant.rendering.TextColors.green -import com.github.ajalt.mordant.rendering.TextColors.red -import com.github.ajalt.mordant.rendering.TextStyles.bold -import com.github.ajalt.mordant.terminal.Terminal -import dev.restate.sdktesting.infra.* -import dev.restate.sdktesting.junit.ExecutionResult import dev.restate.sdktesting.junit.TestSuites -import java.io.File -import java.io.FileInputStream -import java.io.FileOutputStream -import java.nio.file.Path -import java.time.LocalDateTime -import java.time.format.DateTimeFormatter -import kotlin.jvm.optionals.getOrNull -import kotlin.system.exitProcess -import kotlin.time.Duration -import kotlinx.serialization.Serializable -import org.junit.platform.engine.Filter -import org.junit.platform.engine.discovery.ClassNameFilter -import org.junit.platform.engine.support.descriptor.MethodSource -import org.junit.platform.launcher.MethodFilter -@Serializable data class ExclusionsFile(val exclusions: Map>? = emptyMap()) - -class RestateSdkTestSuite : CliktCommand() { - override fun run() { - // Disable log4j2 JMX, this prevents reconfiguration - System.setProperty("log4j2.disable.jmx", "true") - // This is hours of debugging, don't touch it - // tl;dr this makes sure a single log4j2 configuration exists for the whole JVM, - // important to make Configurator.reconfigure work - System.setProperty( - "log4j2.contextSelector", "org.apache.logging.log4j.core.selector.BasicContextSelector") - // The default keep alive time is way too long, and this is a problem when we stop and restart - // containers. - System.setProperty("jdk.httpclient.keepalive.timeout", "5") - // The health check strategy uses the HttpUrlConnection which has no connect timeout by default. - // Could have caused the health check to hang indefinitely. - System.setProperty("sun.net.client.defaultConnectTimeout", "5000") - // Enable Logging of JDK client - // System.setProperty("java.util.logging.manager", "org.apache.logging.log4j.jul.LogManager") - // System.setProperty("jdk.httpclient.HttpClient.log", "all") - } -} - -class TestRunnerOptions : OptionGroup() { - val restateContainerImage by - option(envvar = "RESTATE_CONTAINER_IMAGE").help("Image used to run Restate") - val reportDir by - option(envvar = "TEST_REPORT_DIR").path().help("Base report directory").defaultLazy { - defaultReportDirectory() - } - val imagePullPolicy by - option() - .enum() - .help( - "Pull policy used to pull containers required for testing. In case of ALWAYS, docker won't pull images with repository prefix restate.local or localhost") - .default(PullPolicy.ALWAYS) - - fun applyToDeployerConfig(deployerConfig: RestateDeployerConfig): RestateDeployerConfig { - var newConfig = deployerConfig - if (restateContainerImage != null) { - newConfig = newConfig.copy(restateContainerImage = restateContainerImage!!) - } - newConfig = newConfig.copy(imagePullPolicy = imagePullPolicy) - return newConfig - } - - private fun defaultReportDirectory(): Path { - val formatter = DateTimeFormatter.ofPattern("yyyyMMdd_HHmmss") - return Path.of("test_report/${LocalDateTime.now().format(formatter)}").toAbsolutePath() - } -} - -class FilterOptions : OptionGroup() { - val testSuite by - option() - .required() - .help( - "Test suite to run. Available: ${listOf("all") + TestSuites.allSuites().map { it.name }}") - val testName by option().help("Name of the test to run for the given suite") -} - -abstract class TestRunCommand(help: String) : CliktCommand(help) { - val testRunnerOptions by TestRunnerOptions() -} - -class Run : - TestRunCommand( - """ -Run test suite, executing the service as container. -""" - .trimIndent()) { - val filter by FilterOptions().cooccurring() - val exclusionsFile by - option("--exclusions", "--exclusions-file", envvar = "TEST_EXCLUSIONS_FILE") - .help("File containing the excluded tests") - val parallel by - option(help = "Enable parallel testing") - .help( - "If set, runs tests in parallel. We suggest running tests sequentially when using podman") - .flag("--sequential", default = true) - - override fun run() { - val terminal = Terminal() - - val restateDeployerConfig = - RestateDeployerConfig( - mapOf(), - ) - - // Register global config of the deployer - registerGlobalConfig(testRunnerOptions.applyToDeployerConfig(restateDeployerConfig)) - - // Resolve test configurations - val testSuites = TestSuites.resolveSuites(filter?.testSuite) - - // Load exclusions file - val loadedExclusions: ExclusionsFile = - if (exclusionsFile != null) { - FileInputStream(File(exclusionsFile!!)) - .use { Yaml.default.decodeFromStream(it) } - .also { exclusions -> - println("Using exclusions file $exclusionsFile") - println("Loaded exclusions: ${exclusions.exclusions}") - } - } else { - ExclusionsFile() - } - - val reports = mutableListOf() - val newExclusions = mutableMapOf>() - var newFailures = false - for (testSuite in testSuites) { - val exclusions = loadedExclusions.exclusions?.get(testSuite.name) ?: emptyList() - val exclusionsFilters = - if (exclusions.isNotEmpty()) { - listOf(MethodFilter.excludeMethodNamePatterns(exclusions)) - } else { - listOf() - } - - val cliOptionFilter = - filter?.testName?.let { - listOf(ClassNameFilter.includeClassNamePatterns(testClassNameToFQCN(it))) - } ?: emptyList>() - - val report = - testSuite.runTests( - terminal, - testRunnerOptions.reportDir, - exclusionsFilters + cliOptionFilter, - false, - parallel) - - reports.add(report) - // No need to wait the end of the run for this - report.printFailuresToFiles(testRunnerOptions.reportDir) - val failures = report.failedTests - if (failures.isNotEmpty() || exclusions.isNotEmpty()) { - newExclusions[testSuite.name] = - (failures - .mapNotNull { it.source.getOrNull() } - .mapNotNull { - when (it) { - is MethodSource -> "${it.className}.${it.methodName}" - else -> null - } - } - .distinct() + exclusions) - .sorted() - } - if (failures.isNotEmpty()) { - newFailures = true - } - } - - // Write out the exclusions file - FileOutputStream(testRunnerOptions.reportDir.resolve("exclusions.new.yaml").toFile()).use { - Yaml.default.encodeToStream(ExclusionsFile(newExclusions.toSortedMap()), it) - } - - // Print final report - val succeededTests = reports.sumOf { it.succeededTests } - val executedTests = reports.sumOf { it.executedTests } - val testsStyle = if (succeededTests == executedTests) green else red - val testsInfoLine = testsStyle("""* Succeeded tests: $succeededTests / ${executedTests}""") - - val failedClasses = reports.sumOf { it.executedClasses - it.succeededClasses } - val classesStyle = if (failedClasses != 0) red else green - val classesInfoLine = classesStyle("""* Failed classes initialization: $failedClasses""") - - val totalDuration = reports.fold(Duration.ZERO) { d, res -> d + res.executionDuration } - - println( - """ - ${bold("========================= Final results =========================")} - 🗈 Report directory: ${testRunnerOptions.reportDir} - * Run test suites: ${reports.map { it.testSuite }} - $testsInfoLine - $classesInfoLine - * Execution time: $totalDuration - """ - .trimIndent()) - - for (report in reports) { - report.printFailuresToTerminal(terminal) - } - - exitProcess( - if (newFailures) { - 1 - } else { - 0 - }) - } -} - -fun main(args: Array) { - val args = - if (args.isEmpty()) { - arrayOf("run") - } else { - args - } - - RestateSdkTestSuite().subcommands(Run()).main(args) -} - -private fun testClassNameToFQCN(className: String): String { - if (className.contains('.')) { - // Then it's FQCN - return className - } - return "dev.restate.sdktesting.tests.${className}" -} +fun main(args: Array) = runMain(args, TestSuites) diff --git a/infra/build.gradle.kts b/infra/build.gradle.kts index e1dc8318..d2ec0b73 100644 --- a/infra/build.gradle.kts +++ b/infra/build.gradle.kts @@ -27,7 +27,9 @@ dependencies { compileOnly(libs.tomcat.annotations) compileOnly(libs.google.findbugs.jsr305) + implementation(libs.clikt) implementation(libs.mordant) + implementation(libs.dotenv) implementation(libs.restate.sdk.client.kotlin) implementation(libs.restate.sdk.kotlin.http) implementation(libs.vertx) diff --git a/infra/src/main/kotlin/dev/restate/sdktesting/junit/SuiteProvider.kt b/infra/src/main/kotlin/dev/restate/sdktesting/junit/SuiteProvider.kt new file mode 100644 index 00000000..d940f4aa --- /dev/null +++ b/infra/src/main/kotlin/dev/restate/sdktesting/junit/SuiteProvider.kt @@ -0,0 +1,15 @@ +// Copyright (c) 2023 - Restate Software, Inc., Restate GmbH +// +// This file is part of the Restate SDK Test suite tool, +// which is released under the MIT license. +// +// You can find a copy of the license in file LICENSE in the root +// directory of this repository or package, or at +// https://github.com/restatedev/sdk-test-suite/blob/main/LICENSE +package dev.restate.sdktesting.junit + +interface SuiteProvider { + val defaultSuite: TestSuite + fun allSuites(): List + fun resolveSuites(suite: String?): List +} diff --git a/infra/src/main/kotlin/dev/restate/sdktesting/runner.kt b/infra/src/main/kotlin/dev/restate/sdktesting/runner.kt new file mode 100644 index 00000000..21478d07 --- /dev/null +++ b/infra/src/main/kotlin/dev/restate/sdktesting/runner.kt @@ -0,0 +1,313 @@ +// Copyright (c) 2023 - Restate Software, Inc., Restate GmbH +// +// This file is part of the Restate SDK Test suite tool, +// which is released under the MIT license. +// +// You can find a copy of the license in file LICENSE in the root +// directory of this repository or package, or at +// https://github.com/restatedev/sdk-test-suite/blob/main/LICENSE +package dev.restate.sdktesting + +import com.charleskorn.kaml.Yaml +import com.charleskorn.kaml.decodeFromStream +import com.charleskorn.kaml.encodeToStream +import com.github.ajalt.clikt.core.CliktCommand +import com.github.ajalt.clikt.core.subcommands +import com.github.ajalt.clikt.parameters.arguments.argument +import com.github.ajalt.clikt.parameters.arguments.convert +import com.github.ajalt.clikt.parameters.arguments.help +import com.github.ajalt.clikt.parameters.arguments.multiple +import com.github.ajalt.clikt.parameters.groups.OptionGroup +import com.github.ajalt.clikt.parameters.groups.cooccurring +import com.github.ajalt.clikt.parameters.groups.provideDelegate +import com.github.ajalt.clikt.parameters.options.* +import com.github.ajalt.clikt.parameters.types.enum +import com.github.ajalt.clikt.parameters.types.int +import com.github.ajalt.clikt.parameters.types.path +import com.github.ajalt.mordant.rendering.TextColors.green +import com.github.ajalt.mordant.rendering.TextColors.red +import com.github.ajalt.mordant.rendering.TextStyles.bold +import com.github.ajalt.mordant.terminal.Terminal +import dev.restate.sdktesting.infra.ContainerServiceDeploymentConfig +import dev.restate.sdktesting.infra.LocalForwardServiceDeploymentConfig +import dev.restate.sdktesting.infra.PullPolicy +import dev.restate.sdktesting.infra.RestateDeployerConfig +import dev.restate.sdktesting.infra.ServiceSpec +import dev.restate.sdktesting.infra.registerGlobalConfig +import dev.restate.sdktesting.junit.ExecutionResult +import dev.restate.sdktesting.junit.SuiteProvider +import io.github.cdimascio.dotenv.Dotenv +import java.io.File +import java.io.FileInputStream +import java.io.FileOutputStream +import java.nio.file.Path +import java.time.LocalDateTime +import java.time.format.DateTimeFormatter +import kotlin.jvm.optionals.getOrNull +import kotlin.system.exitProcess +import kotlin.time.Duration +import kotlinx.serialization.Serializable +import org.junit.platform.engine.Filter +import org.junit.platform.engine.discovery.ClassNameFilter +import org.junit.platform.engine.support.descriptor.MethodSource +import org.junit.platform.launcher.MethodFilter + +@Serializable data class ExclusionsFile(val exclusions: Map>? = emptyMap()) + +private class RestateE2ETests : CliktCommand() { + override fun run() { + // Disable log4j2 JMX, this prevents reconfiguration + System.setProperty("log4j2.disable.jmx", "true") + // This is hours of debugging, don't touch it + // tl;dr this makes sure a single log4j2 configuration exists for the whole JVM, + // important to make Configurator.reconfigure work + System.setProperty( + "log4j2.contextSelector", "org.apache.logging.log4j.core.selector.BasicContextSelector") + // The default keep alive time is way too long, and this is a problem when we stop and restart + // containers. + System.setProperty("jdk.httpclient.keepalive.timeout", "5") + // The health check strategy uses the HttpUrlConnection which has no connect timeout by default. + // Could have caused the health check to hang indefinitely. + System.setProperty("sun.net.client.defaultConnectTimeout", "5000") + } +} + +private class TestRunnerOptions : OptionGroup() { + val restateContainerImage by + option(envvar = "RESTATE_CONTAINER_IMAGE").help("Image used to run Restate") + val reportDir by + option(envvar = "TEST_REPORT_DIR").path().help("Base report directory").defaultLazy { + defaultReportDirectory() + } + val imagePullPolicy by + option() + .enum() + .help( + "Pull policy for container images. ALWAYS skips pulling images prefixed with restate.local or localhost") + .default(PullPolicy.ALWAYS) + val customTestsFile by + option("--custom-tests", "--custom-tests-file") + .help("File containing the custom tests configurations") + + fun applyToDeployerConfig(deployerConfig: RestateDeployerConfig): RestateDeployerConfig { + var newConfig = deployerConfig + if (restateContainerImage != null) { + newConfig = newConfig.copy(restateContainerImage = restateContainerImage!!) + } + if (customTestsFile != null) { + newConfig = newConfig.copy(customTestsFile = customTestsFile!!) + } + newConfig = newConfig.copy(imagePullPolicy = imagePullPolicy) + return newConfig + } + + private fun defaultReportDirectory(): Path { + val formatter = DateTimeFormatter.ofPattern("yyyyMMdd_HHmmss") + return Path.of("test_report/${LocalDateTime.now().format(formatter)}").toAbsolutePath() + } +} + +private class FilterOptions(suites: SuiteProvider) : OptionGroup() { + val testSuite by + option() + .required() + .help( + "Test suite to run. Available: ${listOf("all") + suites.allSuites().map { it.name }}") + val testName by option().help("Name of the test class to run within the suite") +} + +private abstract class TestRunCommand(help: String) : CliktCommand(help) { + val testRunnerOptions by TestRunnerOptions() +} + +private class Run(private val suites: SuiteProvider) : + TestRunCommand("Run test suite, executing the service as a container.") { + val filter by FilterOptions(suites).cooccurring() + val exclusionsFile by + option("--exclusions", "--exclusions-file", envvar = "TEST_EXCLUSIONS_FILE") + .help("YAML file containing the excluded tests") + val parallel by + option() + .flag("--sequential", default = true) + .help( + "Run tests in parallel (default). Use --sequential with Podman or to reduce resource usage.") + val serviceContainerImage by + option("--service-container-image", envvar = "SERVICE_CONTAINER_IMAGE") + .help( + "Docker image for the service under test. If omitted, no service container is deployed.") + val serviceContainerEnvFile by + option("--service-container-env-file") + .help(".env file whose variables are injected into the service container") + + override fun run() { + val terminal = Terminal() + + val additionalServiceEnvs = + serviceContainerEnvFile + ?.let { envFile -> + Dotenv.configure().filename(envFile).load().entries().associate { it.key to it.value } + } + .orEmpty() + + val serviceDeploymentConfig = + if (serviceContainerImage != null) { + mapOf( + ServiceSpec.DEFAULT_SERVICE_NAME to + ContainerServiceDeploymentConfig(serviceContainerImage!!, additionalServiceEnvs)) + } else { + mapOf() + } + + registerGlobalConfig( + testRunnerOptions.applyToDeployerConfig(RestateDeployerConfig(serviceDeploymentConfig))) + + val testSuites = suites.resolveSuites(filter?.testSuite) + + val loadedExclusions: ExclusionsFile = + if (exclusionsFile != null) { + FileInputStream(File(exclusionsFile!!)) + .use { Yaml.default.decodeFromStream(it) } + .also { exclusions -> + println("Using exclusions file $exclusionsFile") + println("Loaded exclusions: ${exclusions.exclusions}") + } + } else { + ExclusionsFile() + } + + val reports = mutableListOf() + val newExclusions = mutableMapOf>() + var newFailures = false + for (testSuite in testSuites) { + val exclusions = loadedExclusions.exclusions?.get(testSuite.name) ?: emptyList() + val exclusionsFilters = + if (exclusions.isNotEmpty()) listOf(MethodFilter.excludeMethodNamePatterns(exclusions)) + else listOf() + val cliOptionFilter = + filter?.testName?.let { + listOf(ClassNameFilter.includeClassNamePatterns(testClassNameToFQCN(it))) + } ?: emptyList>() + + val report = + testSuite.runTests( + terminal, + testRunnerOptions.reportDir, + exclusionsFilters + cliOptionFilter, + false, + parallel) + + reports.add(report) + report.printFailuresToFiles(testRunnerOptions.reportDir) + + val failures = report.failedTests + if (failures.isNotEmpty() || exclusions.isNotEmpty()) { + newExclusions[testSuite.name] = + (failures + .mapNotNull { it.source.getOrNull() } + .mapNotNull { + when (it) { + is MethodSource -> "${it.className}.${it.methodName}" + else -> null + } + } + .distinct() + exclusions) + .sorted() + } + if (failures.isNotEmpty()) { + newFailures = true + } + } + + FileOutputStream(testRunnerOptions.reportDir.resolve("exclusions.new.yaml").toFile()).use { + Yaml.default.encodeToStream(ExclusionsFile(newExclusions.toSortedMap()), it) + } + + val succeededTests = reports.sumOf { it.succeededTests } + val executedTests = reports.sumOf { it.executedTests } + val testsStyle = if (succeededTests == executedTests) green else red + val testsInfoLine = testsStyle("* Succeeded tests: $succeededTests / $executedTests") + + val failedClasses = reports.sumOf { it.executedClasses - it.succeededClasses } + val classesStyle = if (failedClasses != 0) red else green + val classesInfoLine = classesStyle("* Failed classes initialization: $failedClasses") + + val totalDuration = reports.fold(Duration.ZERO) { d, res -> d + res.executionDuration } + + println( + """ + ${bold("========================= Final results =========================")} + 🗈 Report directory: ${testRunnerOptions.reportDir} + * Run test suites: ${reports.map { it.testSuite }} + $testsInfoLine + $classesInfoLine + * Execution time: $totalDuration + """ + .trimIndent()) + + for (report in reports) { + report.printFailuresToTerminal(terminal) + } + + exitProcess(if (newFailures) 1 else 0) + } +} + +private class Debug(private val suites: SuiteProvider) : + TestRunCommand("Run a single test without a service container, forwarding to a local process.") { + val testSuite by + option() + .default(suites.defaultSuite.name) + .help("Test suite to use for environment setup. Available: ${suites.allSuites().map { it.name }}") + val testName by option().required().help("Name of the test class to run") + val localContainers by + argument() + .convert { spec -> + if (spec.contains('=')) spec.split('=', limit = 2).let { it[0] to it[1].toInt() } + else ServiceSpec.DEFAULT_SERVICE_NAME to spec.toInt() + } + .multiple(required = true) + .help("Local service ports: '9080' or 'serviceName=9080'. Repeatable.") + val retainAfterEnd by + option() + .flag("--dont-retain-after-end", default = false) + .help("Keep the Docker network alive after the test ends (requires manual cleanup)") + val mountStateDirectory by + option().help("Mount a local directory as the Restate data directory") + val localIngressPort by option().int().help("Bind Restate ingress to this host port") + val localAdminPort by option().int().help("Bind Restate admin to this host port") + val localNodePort by option().int().help("Bind Restate node-to-node port to this host port") + + override fun run() { + val terminal = Terminal() + + val restateDeployerConfig = + RestateDeployerConfig( + localContainers.associate { it.first to LocalForwardServiceDeploymentConfig(it.second) }, + localAdminPort = this.localAdminPort, + localIngressPort = this.localIngressPort, + localNodePort = this.localNodePort, + stateDirectoryMount = this.mountStateDirectory, + retainAfterEnd = this.retainAfterEnd) + registerGlobalConfig(testRunnerOptions.applyToDeployerConfig(restateDeployerConfig)) + + val suite = suites.resolveSuites(testSuite)[0] + val testFilters = listOf(ClassNameFilter.includeClassNamePatterns(testClassNameToFQCN(testName))) + + val report = suite.runTests(terminal, testRunnerOptions.reportDir, testFilters, true, false) + + report.printFailuresToTerminal(terminal) + report.printFailuresToFiles(testRunnerOptions.reportDir) + + exitProcess(if (report.failedTests.isNotEmpty()) 1 else 0) + } +} + +fun runMain(args: Array, suites: SuiteProvider) { + val actualArgs = if (args.isEmpty()) arrayOf("run") else args + RestateE2ETests().subcommands(Run(suites), Debug(suites)).main(actualArgs) +} + +private fun testClassNameToFQCN(className: String): String { + if (className.contains('.')) return className + return "dev.restate.sdktesting.tests.$className" +} diff --git a/sdk-tests/action.yml b/sdk-tests/action.yml index e8d379ca..50ac7c80 100644 --- a/sdk-tests/action.yml +++ b/sdk-tests/action.yml @@ -13,7 +13,7 @@ inputs: default: 'ghcr.io/restatedev/restate:main' serviceContainerImage: required: true - description: Container image to use for the service + description: Docker image for the service under test exclusionsFile: required: false description: Exclusions file @@ -59,7 +59,7 @@ runs: env: RESTATE_CONTAINER_IMAGE: ${{ inputs.restateContainerImage }} shell: bash - run: java -jar sdk-tests.jar run ${{ inputs.testSuite != '' && format('--test-suite={0}', inputs.testSuite) || '' }} ${{ inputs.testName != '' && format('--test-name={0}', inputs.testName) || '' }} ${{ inputs.exclusionsFile != '' && format('--exclusions-file={0}', inputs.exclusionsFile) || '' }} ${{ inputs.serviceContainerEnvFile != '' && format('--service-container-env-file={0}', inputs.serviceContainerEnvFile) || '' }} ${{ inputs.customTestsFile != '' && format('--custom-tests-file={0}', inputs.customTestsFile) || '' }} --report-dir=test-report ${{ inputs.serviceContainerImage }} + run: java -jar sdk-tests.jar run ${{ inputs.testSuite != '' && format('--test-suite={0}', inputs.testSuite) || '' }} ${{ inputs.testName != '' && format('--test-name={0}', inputs.testName) || '' }} ${{ inputs.exclusionsFile != '' && format('--exclusions-file={0}', inputs.exclusionsFile) || '' }} ${{ inputs.serviceContainerEnvFile != '' && format('--service-container-env-file={0}', inputs.serviceContainerEnvFile) || '' }} ${{ inputs.customTestsFile != '' && format('--custom-tests-file={0}', inputs.customTestsFile) || '' }} --service-container-image=${{ inputs.serviceContainerImage }} --report-dir=test-report # Upload logs and publish test result - uses: actions/upload-artifact@v4 diff --git a/sdk-tests/src/main/kotlin/dev/restate/sdktesting/junit/TestSuites.kt b/sdk-tests/src/main/kotlin/dev/restate/sdktesting/junit/TestSuites.kt index 7f875da2..53bea059 100644 --- a/sdk-tests/src/main/kotlin/dev/restate/sdktesting/junit/TestSuites.kt +++ b/sdk-tests/src/main/kotlin/dev/restate/sdktesting/junit/TestSuites.kt @@ -8,7 +8,9 @@ // https://github.com/restatedev/sdk-test-suite/blob/main/LICENSE package dev.restate.sdktesting.junit -object TestSuites { +object TestSuites : SuiteProvider { + override val defaultSuite: TestSuite get() = DEFAULT_SUITE + val DEFAULT_SUITE = TestSuite("default", emptyMap(), "none() | always-suspending | customTests") val THREE_NODES_SUITE = TestSuite( @@ -59,7 +61,7 @@ object TestSuites { TestSuite( "persistedTimers", mapOf("RESTATE_WORKER__NUM_TIMERS_IN_MEMORY_LIMIT" to "1"), "timers") - fun allSuites(): List { + override fun allSuites(): List { return listOf( DEFAULT_SUITE, THREE_NODES_SUITE, @@ -71,7 +73,7 @@ object TestSuites { PERSISTED_TIMERS_SUITE) } - fun resolveSuites(suite: String?): List { + override fun resolveSuites(suite: String?): List { return when (suite ?: "all") { "all" -> allSuites() else -> { diff --git a/sdk-tests/src/main/kotlin/dev/restate/sdktesting/main.kt b/sdk-tests/src/main/kotlin/dev/restate/sdktesting/main.kt index d595ae0d..f81d2943 100644 --- a/sdk-tests/src/main/kotlin/dev/restate/sdktesting/main.kt +++ b/sdk-tests/src/main/kotlin/dev/restate/sdktesting/main.kt @@ -8,323 +8,6 @@ // https://github.com/restatedev/sdk-test-suite/blob/main/LICENSE package dev.restate.sdktesting -import com.charleskorn.kaml.Yaml -import com.charleskorn.kaml.decodeFromStream -import com.charleskorn.kaml.encodeToStream -import com.github.ajalt.clikt.core.CliktCommand -import com.github.ajalt.clikt.core.subcommands -import com.github.ajalt.clikt.parameters.arguments.* -import com.github.ajalt.clikt.parameters.groups.OptionGroup -import com.github.ajalt.clikt.parameters.groups.cooccurring -import com.github.ajalt.clikt.parameters.groups.provideDelegate -import com.github.ajalt.clikt.parameters.options.* -import com.github.ajalt.clikt.parameters.types.enum -import com.github.ajalt.clikt.parameters.types.int -import com.github.ajalt.clikt.parameters.types.path -import com.github.ajalt.mordant.rendering.TextColors.green -import com.github.ajalt.mordant.rendering.TextColors.red -import com.github.ajalt.mordant.rendering.TextStyles.bold -import com.github.ajalt.mordant.terminal.Terminal -import dev.restate.sdktesting.infra.* -import dev.restate.sdktesting.junit.ExecutionResult import dev.restate.sdktesting.junit.TestSuites -import io.github.cdimascio.dotenv.Dotenv -import java.io.File -import java.io.FileInputStream -import java.io.FileOutputStream -import java.nio.file.Path -import java.time.LocalDateTime -import java.time.format.DateTimeFormatter -import kotlin.jvm.optionals.getOrNull -import kotlin.system.exitProcess -import kotlin.time.Duration -import kotlinx.serialization.Serializable -import org.junit.platform.engine.Filter -import org.junit.platform.engine.discovery.ClassNameFilter -import org.junit.platform.engine.support.descriptor.MethodSource -import org.junit.platform.launcher.MethodFilter -@Serializable data class ExclusionsFile(val exclusions: Map> = emptyMap()) - -class RestateSdkTestSuite : CliktCommand() { - override fun run() { - // Disable log4j2 JMX, this prevents reconfiguration - System.setProperty("log4j2.disable.jmx", "true") - // This is hours of debugging, don't touch it - // tl;dr this makes sure a single log4j2 configuration exists for the whole JVM, - // important to make Configurator.reconfigure work - System.setProperty( - "log4j2.contextSelector", "org.apache.logging.log4j.core.selector.BasicContextSelector") - // The default keep alive time is way too long, and this is a problem when we stop and restart - // containers. - System.setProperty("jdk.httpclient.keepalive.timeout", "5") - // The health check strategy uses the HttpUrlConnection which has no connect timeout by default. - // Could have caused the health check to hang indefinitely. - System.setProperty("sun.net.client.defaultConnectTimeout", "5000") - // Enable Logging of JDK client - // System.setProperty("java.util.logging.manager", "org.apache.logging.log4j.jul.LogManager") - // System.setProperty("jdk.httpclient.HttpClient.log", "all") - } -} - -class TestRunnerOptions : OptionGroup() { - val restateContainerImage by - option(envvar = "RESTATE_CONTAINER_IMAGE").help("Image used to run Restate") - val reportDir by - option(envvar = "TEST_REPORT_DIR").path().help("Base report directory").defaultLazy { - defaultReportDirectory() - } - val imagePullPolicy by - option() - .enum() - .help( - "Pull policy used to pull containers required for testing. In case of ALWAYS, docker won't pull images with repository prefix restate.local or localhost") - .default(PullPolicy.ALWAYS) - val customTestsFile by - option("--custom-tests", "--custom-tests-file") - .help("File containing the custom tests configurations") - - fun applyToDeployerConfig(deployerConfig: RestateDeployerConfig): RestateDeployerConfig { - var newConfig = deployerConfig - if (restateContainerImage != null) { - newConfig = newConfig.copy(restateContainerImage = restateContainerImage!!) - } - if (customTestsFile != null) { - newConfig = newConfig.copy(customTestsFile = customTestsFile!!) - } - newConfig = newConfig.copy(imagePullPolicy = imagePullPolicy) - return newConfig - } - - private fun defaultReportDirectory(): Path { - val formatter = DateTimeFormatter.ofPattern("yyyyMMdd_HHmmss") - return Path.of("test_report/${LocalDateTime.now().format(formatter)}").toAbsolutePath() - } -} - -class FilterOptions : OptionGroup() { - val testSuite by - option() - .required() - .help( - "Test suite to run. Available: ${listOf("all") + TestSuites.allSuites().map { it.name }}") - val testName by option().help("Name of the test to run for the given suite") -} - -abstract class TestRunCommand(help: String) : CliktCommand(help) { - val testRunnerOptions by TestRunnerOptions() -} - -class Run : - TestRunCommand( - """ -Run test suite, executing the service as container. -""" - .trimIndent()) { - val filter by FilterOptions().cooccurring() - val exclusionsFile by - option("--exclusions", "--exclusions-file").help("File containing the excluded tests") - val parallel by - option(help = "Enable parallel testing") - .help( - "If set, runs tests in parallel. We suggest running tests sequentially when using podman") - .flag("--sequential", default = true) - val serviceContainerEnvFile by - option("--service-container-env-file").help(".env file to apply to service container") - val imageName by argument() - - override fun run() { - val terminal = Terminal() - - val additionalServiceEnvs = - serviceContainerEnvFile - ?.let { - Dotenv.configure().filename(it).load().entries().associate { it.key to it.value } - } - .orEmpty() - - val restateDeployerConfig = - RestateDeployerConfig( - mapOf( - ServiceSpec.DEFAULT_SERVICE_NAME to - ContainerServiceDeploymentConfig(imageName, additionalServiceEnvs)), - ) - - // Register global config of the deployer - registerGlobalConfig(testRunnerOptions.applyToDeployerConfig(restateDeployerConfig)) - - // Resolve test configurations - val testSuites = TestSuites.resolveSuites(filter?.testSuite) - - // Load exclusions file - val loadedExclusions: ExclusionsFile = - if (exclusionsFile != null) { - FileInputStream(File(exclusionsFile!!)).use { Yaml.default.decodeFromStream(it) } - } else { - ExclusionsFile() - } - - val reports = mutableListOf() - val newExclusions = mutableMapOf>() - var newFailures = false - for (testSuite in testSuites) { - val exclusions = loadedExclusions.exclusions[testSuite.name] ?: emptyList() - val exclusionsFilters = exclusions.map { MethodFilter.excludeMethodNamePatterns(it) } - val cliOptionFilter = - filter?.testName?.let { - listOf(ClassNameFilter.includeClassNamePatterns(testClassNameToFQCN(it))) - } ?: emptyList>() - - val report = - testSuite.runTests( - terminal, - testRunnerOptions.reportDir, - exclusionsFilters + cliOptionFilter, - false, - parallel) - - reports.add(report) - // No need to wait the end of the run for this - report.printFailuresToFiles(testRunnerOptions.reportDir) - val failures = report.failedTests - if (failures.isNotEmpty() || exclusions.isNotEmpty()) { - newExclusions[testSuite.name] = - (failures - .mapNotNull { it.source.getOrNull() } - .mapNotNull { - when (it) { - is MethodSource -> "${it.className}.${it.methodName}" - else -> null - } - } - .distinct() + exclusions) - .sorted() - } - if (failures.isNotEmpty()) { - newFailures = true - } - } - - // Write out the exclusions file - FileOutputStream(testRunnerOptions.reportDir.resolve("exclusions.new.yaml").toFile()).use { - Yaml.default.encodeToStream(ExclusionsFile(newExclusions.toSortedMap()), it) - } - - // Print final report - val succeededTests = reports.sumOf { it.succeededTests } - val executedTests = reports.sumOf { it.executedTests } - val testsStyle = if (succeededTests == executedTests) green else red - val testsInfoLine = testsStyle("""* Succeeded tests: $succeededTests / ${executedTests}""") - - val failedClasses = reports.sumOf { it.executedClasses - it.succeededClasses } - val classesStyle = if (failedClasses != 0) red else green - val classesInfoLine = classesStyle("""* Failed classes initialization: $failedClasses""") - - val totalDuration = reports.fold(Duration.ZERO) { d, res -> d + res.executionDuration } - - println( - """ - ${bold("========================= Final results =========================")} - 🗈 Report directory: ${testRunnerOptions.reportDir} - * Run test suites: ${reports.map { it.testSuite }} - $testsInfoLine - $classesInfoLine - * Execution time: $totalDuration - """ - .trimIndent()) - - for (report in reports) { - report.printFailuresToTerminal(terminal) - } - - if (newFailures) { - // Exit - exitProcess(1) - } - } -} - -class Debug : - TestRunCommand( - """ -Run test suite, without executing the service inside a container. -""" - .trimIndent()) { - val testSuite by - option() - .default(TestSuites.DEFAULT_SUITE.name) - .help("Test suite to run. Available: ${TestSuites.allSuites().map { it.name }}") - val testName by option().required().help("Name of the test to run for the given suite") - val localContainers by - argument() - .convert { localContainerSpec -> - if (localContainerSpec.contains('=')) { - localContainerSpec.split('=', limit = 2).let { it[0] to it[1].toInt() } - } else { - ServiceSpec.DEFAULT_SERVICE_NAME to localContainerSpec.toInt() - } - } - .multiple(required = true) - .help( - "Local containers name=ports. Example: '9080' (for default-service container), 'otherContainer=9081'") - val retainAfterEnd by - option() - .flag("--dont-retain-after-end", default = false) - .help( - "Retain the created docker network after the end of the test. You MUST manually clean it up afterwards!") - val mountStateDirectory by - option() - .help( - "Mount the given state directory as restate data when starting the runtime container") - val localIngressPort by option().int().help("Ingress port to bind the restate container") - val localAdminPort by option().int().help("Admin port to bind the restate container") - val localNodePort by option().int().help("Node port to bind the restate container") - - override fun run() { - val terminal = Terminal() - - // Register global config of the deployer - val restateDeployerConfig = - RestateDeployerConfig( - localContainers.associate { - it.first to LocalForwardServiceDeploymentConfig(it.second) - }, - localAdminPort = this.localAdminPort, - localIngressPort = this.localIngressPort, - localNodePort = this.localNodePort, - stateDirectoryMount = this.mountStateDirectory, - retainAfterEnd = this.retainAfterEnd) - registerGlobalConfig(testRunnerOptions.applyToDeployerConfig(restateDeployerConfig)) - - if (restateDeployerConfig.retainAfterEnd) { - // Disable ryuk, as it will otherwise cleanup the network after the JVM goes away. - // System.getenv().put("TESTCONTAINERS_RYUK_DISABLED", "true") - - } - - // Resolve test configurations - val testSuite = TestSuites.resolveSuites(testSuite)[0] - val testFilters = - listOf(ClassNameFilter.includeClassNamePatterns(testClassNameToFQCN(testName))) - - val report = testSuite.runTests(terminal, testRunnerOptions.reportDir, testFilters, true, false) - - report.printFailuresToTerminal(terminal) - report.printFailuresToFiles(testRunnerOptions.reportDir) - - if (report.failedTests.isNotEmpty()) { - // Exit - exitProcess(1) - } - } -} - -fun main(args: Array) = RestateSdkTestSuite().subcommands(Run(), Debug()).main(args) - -private fun testClassNameToFQCN(className: String): String { - if (className.contains('.')) { - // Then it's FQCN - return className - } - return "dev.restate.sdktesting.tests.${className}" -} +fun main(args: Array) = runMain(args, TestSuites) From e7480d4d744aa2656538b44c513bacd6bd2f854a Mon Sep 17 00:00:00 2001 From: slinkydeveloper Date: Fri, 15 May 2026 10:32:34 +0200 Subject: [PATCH 2/4] Exclude custom tests from the test suite if they're not configured --- .../main/kotlin/dev/restate/sdktesting/junit/TestSuite.kt | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/infra/src/main/kotlin/dev/restate/sdktesting/junit/TestSuite.kt b/infra/src/main/kotlin/dev/restate/sdktesting/junit/TestSuite.kt index c62d0a5e..bdf0b9e6 100644 --- a/infra/src/main/kotlin/dev/restate/sdktesting/junit/TestSuite.kt +++ b/infra/src/main/kotlin/dev/restate/sdktesting/junit/TestSuite.kt @@ -63,6 +63,11 @@ class TestSuite( .selectors(DiscoverySelectors.selectPackage("dev.restate.sdktesting.tests")) .filters(TagFilter.includeTags(junitIncludeTags)) .filters(*filters.toTypedArray()) + .apply { + if (restateDeployerConfig.customTestsFile == null) { + filters(TagFilter.excludeTags("customTests")) + } + } // Redirect STDOUT/STDERR .configurationParameter(LauncherConstants.CAPTURE_STDOUT_PROPERTY_NAME, "true") .configurationParameter(LauncherConstants.CAPTURE_STDERR_PROPERTY_NAME, "true") From b247054fe2e63c7d2c602c3a7f8f15aa64e7f68f Mon Sep 17 00:00:00 2001 From: slinkydeveloper Date: Fri, 15 May 2026 10:33:00 +0200 Subject: [PATCH 3/4] Spotless apply --- .../dev/restate/sdktesting/junit/TestSuites.kt | 3 ++- .../restate/sdktesting/junit/SuiteProvider.kt | 2 ++ .../main/kotlin/dev/restate/sdktesting/runner.kt | 16 ++++++++++------ sdk-tests/build.gradle.kts | 4 +--- .../dev/restate/sdktesting/junit/TestSuites.kt | 3 ++- 5 files changed, 17 insertions(+), 11 deletions(-) diff --git a/e2e-tests/src/main/kotlin/dev/restate/sdktesting/junit/TestSuites.kt b/e2e-tests/src/main/kotlin/dev/restate/sdktesting/junit/TestSuites.kt index 8ce1913e..1fe60262 100644 --- a/e2e-tests/src/main/kotlin/dev/restate/sdktesting/junit/TestSuites.kt +++ b/e2e-tests/src/main/kotlin/dev/restate/sdktesting/junit/TestSuites.kt @@ -9,7 +9,8 @@ package dev.restate.sdktesting.junit object TestSuites : SuiteProvider { - override val defaultSuite: TestSuite get() = DEFAULT_SUITE + override val defaultSuite: TestSuite + get() = DEFAULT_SUITE val DEFAULT_SUITE = TestSuite("default", emptyMap(), "none() | always-suspending | only-single-node") diff --git a/infra/src/main/kotlin/dev/restate/sdktesting/junit/SuiteProvider.kt b/infra/src/main/kotlin/dev/restate/sdktesting/junit/SuiteProvider.kt index d940f4aa..cd6f3784 100644 --- a/infra/src/main/kotlin/dev/restate/sdktesting/junit/SuiteProvider.kt +++ b/infra/src/main/kotlin/dev/restate/sdktesting/junit/SuiteProvider.kt @@ -10,6 +10,8 @@ package dev.restate.sdktesting.junit interface SuiteProvider { val defaultSuite: TestSuite + fun allSuites(): List + fun resolveSuites(suite: String?): List } diff --git a/infra/src/main/kotlin/dev/restate/sdktesting/runner.kt b/infra/src/main/kotlin/dev/restate/sdktesting/runner.kt index 21478d07..24e8308c 100644 --- a/infra/src/main/kotlin/dev/restate/sdktesting/runner.kt +++ b/infra/src/main/kotlin/dev/restate/sdktesting/runner.kt @@ -253,11 +253,13 @@ private class Run(private val suites: SuiteProvider) : } private class Debug(private val suites: SuiteProvider) : - TestRunCommand("Run a single test without a service container, forwarding to a local process.") { + TestRunCommand( + "Run a single test without a service container, forwarding to a local process.") { val testSuite by option() .default(suites.defaultSuite.name) - .help("Test suite to use for environment setup. Available: ${suites.allSuites().map { it.name }}") + .help( + "Test suite to use for environment setup. Available: ${suites.allSuites().map { it.name }}") val testName by option().required().help("Name of the test class to run") val localContainers by argument() @@ -271,8 +273,7 @@ private class Debug(private val suites: SuiteProvider) : option() .flag("--dont-retain-after-end", default = false) .help("Keep the Docker network alive after the test ends (requires manual cleanup)") - val mountStateDirectory by - option().help("Mount a local directory as the Restate data directory") + val mountStateDirectory by option().help("Mount a local directory as the Restate data directory") val localIngressPort by option().int().help("Bind Restate ingress to this host port") val localAdminPort by option().int().help("Bind Restate admin to this host port") val localNodePort by option().int().help("Bind Restate node-to-node port to this host port") @@ -282,7 +283,9 @@ private class Debug(private val suites: SuiteProvider) : val restateDeployerConfig = RestateDeployerConfig( - localContainers.associate { it.first to LocalForwardServiceDeploymentConfig(it.second) }, + localContainers.associate { + it.first to LocalForwardServiceDeploymentConfig(it.second) + }, localAdminPort = this.localAdminPort, localIngressPort = this.localIngressPort, localNodePort = this.localNodePort, @@ -291,7 +294,8 @@ private class Debug(private val suites: SuiteProvider) : registerGlobalConfig(testRunnerOptions.applyToDeployerConfig(restateDeployerConfig)) val suite = suites.resolveSuites(testSuite)[0] - val testFilters = listOf(ClassNameFilter.includeClassNamePatterns(testClassNameToFQCN(testName))) + val testFilters = + listOf(ClassNameFilter.includeClassNamePatterns(testClassNameToFQCN(testName))) val report = suite.runTests(terminal, testRunnerOptions.reportDir, testFilters, true, false) diff --git a/sdk-tests/build.gradle.kts b/sdk-tests/build.gradle.kts index 30e5a97c..89816a8a 100644 --- a/sdk-tests/build.gradle.kts +++ b/sdk-tests/build.gradle.kts @@ -55,9 +55,7 @@ allOpen { application { mainClass = "dev.restate.sdktesting.MainKt" } -tasks.shadowJar { - archiveClassifier = "" -} +tasks.shadowJar { archiveClassifier = "" } spotless { kotlin { diff --git a/sdk-tests/src/main/kotlin/dev/restate/sdktesting/junit/TestSuites.kt b/sdk-tests/src/main/kotlin/dev/restate/sdktesting/junit/TestSuites.kt index 53bea059..e90f5688 100644 --- a/sdk-tests/src/main/kotlin/dev/restate/sdktesting/junit/TestSuites.kt +++ b/sdk-tests/src/main/kotlin/dev/restate/sdktesting/junit/TestSuites.kt @@ -9,7 +9,8 @@ package dev.restate.sdktesting.junit object TestSuites : SuiteProvider { - override val defaultSuite: TestSuite get() = DEFAULT_SUITE + override val defaultSuite: TestSuite + get() = DEFAULT_SUITE val DEFAULT_SUITE = TestSuite("default", emptyMap(), "none() | always-suspending | customTests") val THREE_NODES_SUITE = From ccb9f8edcca037587dc5bee582ee6dae09693264 Mon Sep 17 00:00:00 2001 From: slinkydeveloper Date: Fri, 15 May 2026 11:25:35 +0200 Subject: [PATCH 4/4] More unification: all the kafka tests are now together, makes the containers setup lighter Also remove the tag based filtering, was cumbersome and hard to track. now one simply must add the selectors and that's it --- .../restate/sdktesting/junit/TestSuites.kt | 65 ++- .../tests/AwakeableLeaderTransferTest.kt | 2 - .../tests/BackwardCompatibilityTest.kt | 4 - .../tests/ForwardCompatibilityTest.kt | 3 - .../sdktesting/tests/InvokerMemoryTest.kt | 2 - .../dev/restate/sdktesting/tests/Kafka.kt | 83 ---- .../tests/KafkaAndWorkflowAPITest.kt | 151 ------- .../sdktesting/tests/KafkaDynamicSetupTest.kt | 155 ------- .../dev/restate/sdktesting/tests/KafkaTest.kt | 392 ++++++++++++++++++ .../sdktesting/tests/KafkaTracingTest.kt | 124 ------ .../PauseResumeChangingDeploymentTest.kt | 2 - .../restate/sdktesting/tests/UpgradeTest.kt | 5 - .../dev/restate/sdktesting/junit/TestSuite.kt | 18 +- .../restate/sdktesting/junit/TestSuites.kt | 120 +++++- .../restate/sdktesting/tests/Cancellation.kt | 2 - .../restate/sdktesting/tests/Combinators.kt | 2 - .../dev/restate/sdktesting/tests/Custom.kt | 2 - .../restate/sdktesting/tests/KillRuntime.kt | 3 - .../sdktesting/tests/NonDeterminismErrors.kt | 3 - .../dev/restate/sdktesting/tests/RunFlush.kt | 2 - .../dev/restate/sdktesting/tests/RunRetry.kt | 2 - .../tests/ServiceToServiceCommunication.kt | 3 - .../dev/restate/sdktesting/tests/Sleep.kt | 3 - .../sdktesting/tests/SleepWithFailures.kt | 2 - .../dev/restate/sdktesting/tests/State.kt | 3 - .../restate/sdktesting/tests/StopRuntime.kt | 3 - .../restate/sdktesting/tests/UserErrors.kt | 2 - .../restate/sdktesting/tests/WorkflowAPI.kt | 2 - 28 files changed, 577 insertions(+), 583 deletions(-) delete mode 100644 e2e-tests/src/main/kotlin/dev/restate/sdktesting/tests/Kafka.kt delete mode 100644 e2e-tests/src/main/kotlin/dev/restate/sdktesting/tests/KafkaAndWorkflowAPITest.kt delete mode 100644 e2e-tests/src/main/kotlin/dev/restate/sdktesting/tests/KafkaDynamicSetupTest.kt create mode 100644 e2e-tests/src/main/kotlin/dev/restate/sdktesting/tests/KafkaTest.kt delete mode 100644 e2e-tests/src/main/kotlin/dev/restate/sdktesting/tests/KafkaTracingTest.kt diff --git a/e2e-tests/src/main/kotlin/dev/restate/sdktesting/junit/TestSuites.kt b/e2e-tests/src/main/kotlin/dev/restate/sdktesting/junit/TestSuites.kt index 1fe60262..ce4ebfb5 100644 --- a/e2e-tests/src/main/kotlin/dev/restate/sdktesting/junit/TestSuites.kt +++ b/e2e-tests/src/main/kotlin/dev/restate/sdktesting/junit/TestSuites.kt @@ -8,25 +8,76 @@ // https://github.com/restatedev/sdk-test-suite/blob/main/LICENSE package dev.restate.sdktesting.junit +import dev.restate.sdktesting.tests.AwakeableIngressEndpointTest +import dev.restate.sdktesting.tests.AwakeableLeaderTransferTest +import dev.restate.sdktesting.tests.BackwardCompatibilityTest +import dev.restate.sdktesting.tests.ForwardCompatibilityTest +import dev.restate.sdktesting.tests.IngressTest +import dev.restate.sdktesting.tests.InvokerMemoryTest +import dev.restate.sdktesting.tests.JournalRetentionTest +import dev.restate.sdktesting.tests.KafkaTest +import dev.restate.sdktesting.tests.OpenAPITest +import dev.restate.sdktesting.tests.PauseResumeChangingDeploymentTest +import dev.restate.sdktesting.tests.PauseResumeTest +import dev.restate.sdktesting.tests.RestartAsNewInvocationTest +import dev.restate.sdktesting.tests.StatePatchingTest +import dev.restate.sdktesting.tests.TracingTest +import dev.restate.sdktesting.tests.UpgradeWithInFlightInvocation +import dev.restate.sdktesting.tests.UpgradeWithNewInvocation + object TestSuites : SuiteProvider { override val defaultSuite: TestSuite get() = DEFAULT_SUITE val DEFAULT_SUITE = - TestSuite("default", emptyMap(), "none() | always-suspending | only-single-node") + TestSuite( + "default", + emptyMap(), + listOf( + clazz(), + clazz(), + clazz(), + clazz(), + clazz(), + clazz(), + clazz(), + clazz(), + clazz(), + clazz(), + clazz(), + clazz(), + clazz(), + )) + val THREE_NODES_SUITE = TestSuite( "threeNodes", mapOf( "RESTATE_DEFAULT_NUM_PARTITIONS" to "4", ), - "(none() | always-suspending | only-multi-node) & !only-single-node", + listOf( + clazz(), + clazz(), + clazz(), + clazz(), + clazz(), + clazz(), + clazz(), + clazz(), + ), 3) + private val ALWAYS_SUSPENDING_SUITE = TestSuite( "alwaysSuspending", mapOf("RESTATE_WORKER__INVOKER__INACTIVITY_TIMEOUT" to "0s"), - "always-suspending | only-always-suspending | only-single-node") + listOf( + clazz(), + clazz(), + clazz(), + clazz(), + )) + private val THREE_NODES_ALWAYS_SUSPENDING_SUITE = TestSuite( "threeNodesAlwaysSuspending", @@ -34,10 +85,14 @@ object TestSuites : SuiteProvider { "RESTATE_WORKER__INVOKER__INACTIVITY_TIMEOUT" to "0s", "RESTATE_DEFAULT_NUM_PARTITIONS" to "4", ), - "(always-suspending | only-always-suspending | only-multi-node) & !only-single-node", + listOf(clazz()), 3) + private val VERSION_COMPATIBILITY_SUITE = - TestSuite("versionCompat", emptyMap(), "version-compatibility") + TestSuite( + "versionCompat", + emptyMap(), + listOf(clazz(), clazz())) override fun allSuites(): List { return listOf( diff --git a/e2e-tests/src/main/kotlin/dev/restate/sdktesting/tests/AwakeableLeaderTransferTest.kt b/e2e-tests/src/main/kotlin/dev/restate/sdktesting/tests/AwakeableLeaderTransferTest.kt index 8a02ea99..dec0362b 100644 --- a/e2e-tests/src/main/kotlin/dev/restate/sdktesting/tests/AwakeableLeaderTransferTest.kt +++ b/e2e-tests/src/main/kotlin/dev/restate/sdktesting/tests/AwakeableLeaderTransferTest.kt @@ -28,7 +28,6 @@ import org.apache.logging.log4j.LogManager import org.assertj.core.api.Assertions.assertThat import org.awaitility.kotlin.await import org.awaitility.kotlin.withAlias -import org.junit.jupiter.api.Tag import org.junit.jupiter.api.Test import org.junit.jupiter.api.Timeout import org.junit.jupiter.api.extension.RegisterExtension @@ -76,7 +75,6 @@ class AwakeableLeaderTransferTest { @Test @Timeout(180) - @Tag("only-multi-node") fun awakeableCompletionsAreNotLostDuringLeaderTransfer( @InjectClient ingressClient: Client, @InjectContainerHandle(hostName = RESTATE_RUNTIME) runtimeHandle: ContainerHandle, diff --git a/e2e-tests/src/main/kotlin/dev/restate/sdktesting/tests/BackwardCompatibilityTest.kt b/e2e-tests/src/main/kotlin/dev/restate/sdktesting/tests/BackwardCompatibilityTest.kt index 1ffc0cdf..3c185cbb 100644 --- a/e2e-tests/src/main/kotlin/dev/restate/sdktesting/tests/BackwardCompatibilityTest.kt +++ b/e2e-tests/src/main/kotlin/dev/restate/sdktesting/tests/BackwardCompatibilityTest.kt @@ -51,7 +51,6 @@ import org.junit.jupiter.api.BeforeAll import org.junit.jupiter.api.ClassOrderer import org.junit.jupiter.api.Nested import org.junit.jupiter.api.Order -import org.junit.jupiter.api.Tag import org.junit.jupiter.api.Test import org.junit.jupiter.api.TestClassOrder import org.junit.jupiter.api.TestInstance @@ -64,7 +63,6 @@ import org.junit.jupiter.api.parallel.Isolated * Tests verifying backward compatibility (newer Restate version can read data written by older * version). */ -@Tag("version-compatibility") @Isolated @Execution(ExecutionMode.SAME_THREAD) @TestClassOrder(ClassOrderer.OrderAnnotation::class) @@ -145,7 +143,6 @@ class BackwardCompatibilityTest { } } - @Tag("version-compatibility") @Nested @Order(1) @Isolated @@ -237,7 +234,6 @@ class BackwardCompatibilityTest { } } - @Tag("version-compatibility") @Nested @Order(2) @Isolated diff --git a/e2e-tests/src/main/kotlin/dev/restate/sdktesting/tests/ForwardCompatibilityTest.kt b/e2e-tests/src/main/kotlin/dev/restate/sdktesting/tests/ForwardCompatibilityTest.kt index 348c3a51..b20699b5 100644 --- a/e2e-tests/src/main/kotlin/dev/restate/sdktesting/tests/ForwardCompatibilityTest.kt +++ b/e2e-tests/src/main/kotlin/dev/restate/sdktesting/tests/ForwardCompatibilityTest.kt @@ -52,7 +52,6 @@ import org.junit.jupiter.api.parallel.ExecutionMode import org.junit.jupiter.api.parallel.Isolated /** Tests to verify forward compatibility (older version can read data written by newer version). */ -@Tag("version-compatibility") @Isolated @Execution(ExecutionMode.SAME_THREAD) @TestClassOrder(ClassOrderer.OrderAnnotation::class) @@ -133,7 +132,6 @@ class ForwardCompatibilityTest { } } - @Tag("version-compatibility") @Nested @Order(1) @Isolated @@ -223,7 +221,6 @@ class ForwardCompatibilityTest { } } - @Tag("version-compatibility") @Nested @Order(2) @Isolated diff --git a/e2e-tests/src/main/kotlin/dev/restate/sdktesting/tests/InvokerMemoryTest.kt b/e2e-tests/src/main/kotlin/dev/restate/sdktesting/tests/InvokerMemoryTest.kt index c481fc29..a62adc50 100644 --- a/e2e-tests/src/main/kotlin/dev/restate/sdktesting/tests/InvokerMemoryTest.kt +++ b/e2e-tests/src/main/kotlin/dev/restate/sdktesting/tests/InvokerMemoryTest.kt @@ -47,7 +47,6 @@ import org.awaitility.kotlin.await import org.awaitility.kotlin.withAlias import org.junit.jupiter.api.AfterEach import org.junit.jupiter.api.DisplayName -import org.junit.jupiter.api.Tag import org.junit.jupiter.api.Test import org.junit.jupiter.api.Timeout import org.junit.jupiter.api.extension.RegisterExtension @@ -69,7 +68,6 @@ import org.junit.jupiter.api.extension.RegisterExtension * The yield tests verify invocations eventually complete correctly. The oversized tests verify the * invocation is paused. */ -@Tag("only-single-node") class InvokerMemoryTest { @Service diff --git a/e2e-tests/src/main/kotlin/dev/restate/sdktesting/tests/Kafka.kt b/e2e-tests/src/main/kotlin/dev/restate/sdktesting/tests/Kafka.kt deleted file mode 100644 index 57879dcb..00000000 --- a/e2e-tests/src/main/kotlin/dev/restate/sdktesting/tests/Kafka.kt +++ /dev/null @@ -1,83 +0,0 @@ -// Copyright (c) 2023 - Restate Software, Inc., Restate GmbH -// -// This file is part of the Restate SDK Test suite tool, -// which is released under the MIT license. -// -// You can find a copy of the license in file LICENSE in the root -// directory of this repository or package, or at -// https://github.com/restatedev/sdk-test-suite/blob/main/LICENSE -package dev.restate.sdktesting.tests - -import dev.restate.admin.api.KafkaClusterApi -import dev.restate.admin.api.SubscriptionApi -import dev.restate.admin.client.ApiClient -import dev.restate.admin.model.CreateKafkaClusterRequest -import dev.restate.admin.model.CreateSubscriptionRequest -import dev.restate.sdktesting.infra.KafkaContainer -import dev.restate.sdktesting.infra.runtimeconfig.IngressOptions -import dev.restate.sdktesting.infra.runtimeconfig.KafkaClusterOptions -import dev.restate.sdktesting.infra.runtimeconfig.RestateConfigSchema -import java.net.URI -import java.util.Properties -import org.apache.kafka.clients.producer.KafkaProducer -import org.apache.kafka.clients.producer.Producer -import org.apache.kafka.clients.producer.ProducerRecord - -object Kafka { - fun produceMessagesToKafka(port: Int, topic: String, values: List>) { - val props = Properties() - props["bootstrap.servers"] = "PLAINTEXT://localhost:$port" - props["key.serializer"] = "org.apache.kafka.common.serialization.StringSerializer" - props["value.serializer"] = "org.apache.kafka.common.serialization.StringSerializer" - - val producer: Producer = KafkaProducer(props) - for (value in values) { - producer.send(ProducerRecord(topic, value.first, value.second)) - } - producer.close() - } - - fun registerKafkaCluster( - adminURI: URI, - ) { - val kafkaClustersClient = - KafkaClusterApi(ApiClient().setHost(adminURI.host).setPort(adminURI.port)) - retryOnServiceUnavailable { - kafkaClustersClient.createKafkaCluster( - CreateKafkaClusterRequest() - .name("my-cluster") - .properties( - mapOf( - "bootstrap.servers" to - "PLAINTEXT://kafka:${KafkaContainer.KAFKA_NETWORK_PORT}"))) - } - } - - fun createKafkaSubscription( - adminURI: URI, - topic: String, - serviceName: String, - handlerName: String - ) { - val subscriptionsClient = - SubscriptionApi(ApiClient().setHost(adminURI.host).setPort(adminURI.port)) - retryOnServiceUnavailable { - subscriptionsClient.createSubscription( - CreateSubscriptionRequest() - .source(URI.create("kafka://my-cluster/$topic")) - .sink(URI.create("service://$serviceName/$handlerName")) - .options(mapOf("auto.offset.reset" to "earliest"))) - } - } - - val configSchema: RestateConfigSchema.() -> Unit = { - this.withIngress( - IngressOptions() - .withKafkaClusters( - listOf( - KafkaClusterOptions() - .withName("my-cluster") - .withBrokers( - listOf("PLAINTEXT://kafka:${KafkaContainer.KAFKA_NETWORK_PORT}"))))) - } -} diff --git a/e2e-tests/src/main/kotlin/dev/restate/sdktesting/tests/KafkaAndWorkflowAPITest.kt b/e2e-tests/src/main/kotlin/dev/restate/sdktesting/tests/KafkaAndWorkflowAPITest.kt deleted file mode 100644 index 988d85f8..00000000 --- a/e2e-tests/src/main/kotlin/dev/restate/sdktesting/tests/KafkaAndWorkflowAPITest.kt +++ /dev/null @@ -1,151 +0,0 @@ -// Copyright (c) 2023 - Restate Software, Inc., Restate GmbH -// -// This file is part of the Restate SDK Test suite tool, -// which is released under the MIT license. -// -// You can find a copy of the license in file LICENSE in the root -// directory of this repository or package, or at -// https://github.com/restatedev/sdk-test-suite/blob/main/LICENSE -package dev.restate.sdktesting.tests - -import dev.restate.client.Client -import dev.restate.client.kotlin.attachSuspend -import dev.restate.client.kotlin.getOutputSuspend -import dev.restate.client.kotlin.response -import dev.restate.client.kotlin.toWorkflow -import dev.restate.client.kotlin.workflowHandle -import dev.restate.common.reflections.ReflectionUtils.extractServiceName -import dev.restate.sdk.annotation.Shared -import dev.restate.sdk.annotation.Workflow -import dev.restate.sdk.endpoint.Endpoint -import dev.restate.sdk.kotlin.durablePromiseKey -import dev.restate.sdk.kotlin.promise -import dev.restate.sdk.kotlin.promiseHandle -import dev.restate.sdktesting.infra.* -import dev.restate.sdktesting.infra.runtimeconfig.RestateConfigSchema -import java.net.URI -import kotlinx.serialization.json.Json -import org.assertj.core.api.Assertions.assertThat -import org.awaitility.kotlin.await -import org.awaitility.kotlin.withAlias -import org.junit.jupiter.api.Test -import org.junit.jupiter.api.extension.RegisterExtension -import org.junit.jupiter.api.parallel.Execution -import org.junit.jupiter.api.parallel.ExecutionMode - -class KafkaAndWorkflowAPITest { - - @Workflow - class MyWorkflow { - - companion object { - val PROMISE = durablePromiseKey("promise") - } - - @Workflow suspend fun run(myTask: String) = "Run $myTask" - - @Shared - suspend fun setPromise(myValue: String) { - promiseHandle(PROMISE).resolve(myValue) - } - - @Shared suspend fun getPromise() = promise(PROMISE).future().await() - } - - companion object { - private const val SHARED_HANDLER_TOPIC = "shared-handler" - private const val WORKFLOW_TOPIC = "workflow" - - @RegisterExtension - val deployerExt: RestateDeployerExtension = RestateDeployerExtension { - withContainer("kafka", KafkaContainer(SHARED_HANDLER_TOPIC, WORKFLOW_TOPIC)) - withConfig(RestateConfigSchema().apply(Kafka.configSchema)) - withEndpoint(Endpoint.bind(MyWorkflow())) - } - } - - @Test - @Execution(ExecutionMode.CONCURRENT) - fun callWorkflowHandler( - @InjectAdminURI adminURI: URI, - @InjectContainerPort(hostName = "kafka", port = KafkaContainer.KAFKA_EXTERNAL_PORT) - kafkaPort: Int, - @InjectClient ingressClient: Client - ) = runTest { - // Create subscription - Kafka.createKafkaSubscription( - adminURI, WORKFLOW_TOPIC, extractServiceName(MyWorkflow::class.java), "run") - - val keyMessages = linkedMapOf("a" to "1", "b" to "2", "c" to "3") - - // Produce message to kafka - Kafka.produceMessagesToKafka( - kafkaPort, WORKFLOW_TOPIC, keyMessages.map { it.key to Json.encodeToString(it.value) }) - - // Now assert that those invocations are stored there, let's do this twice just for the sake of. - for (keyMessage in keyMessages) { - await withAlias - "Workflow invocations from Kafka" untilAsserted - { - assertThat( - ingressClient - .workflowHandle( - extractServiceName(MyWorkflow::class.java), keyMessage.key) - .attachSuspend() - .response) - .isEqualTo("Run ${keyMessage.value}") - } - } - - for (keyMessage in keyMessages) { - await withAlias - "Workflow invocations from Kafka" untilAsserted - { - assertThat( - ingressClient - .workflowHandle( - extractServiceName(MyWorkflow::class.java), keyMessage.key) - .getOutputSuspend() - .response - .value) - .isEqualTo("Run ${keyMessage.value}") - } - } - } - - @Test - @Execution(ExecutionMode.CONCURRENT) - fun callSharedWorkflowHandler( - @InjectAdminURI adminURI: URI, - @InjectContainerPort(hostName = "kafka", port = KafkaContainer.KAFKA_EXTERNAL_PORT) - kafkaPort: Int, - @InjectClient ingressClient: Client - ) = runTest { - // Create subscription - Kafka.createKafkaSubscription( - adminURI, SHARED_HANDLER_TOPIC, extractServiceName(MyWorkflow::class.java), "setPromise") - - val keyMessages = linkedMapOf("a" to "a", "b" to "b", "c" to "c") - - // Produce message to kafka - Kafka.produceMessagesToKafka( - kafkaPort, - SHARED_HANDLER_TOPIC, - keyMessages.map { it.key to Json.encodeToString(it.value) }) - - // Now assert that the promises are fulfilled. - for (keyMessage in keyMessages) { - await withAlias - "Workflow invocations from Kafka" untilAsserted - { - assertThat( - ingressClient - .toWorkflow(keyMessage.key) - .request { getPromise() } - .call() - .response) - .isEqualTo(keyMessage.value) - } - } - } -} diff --git a/e2e-tests/src/main/kotlin/dev/restate/sdktesting/tests/KafkaDynamicSetupTest.kt b/e2e-tests/src/main/kotlin/dev/restate/sdktesting/tests/KafkaDynamicSetupTest.kt deleted file mode 100644 index 086fbda3..00000000 --- a/e2e-tests/src/main/kotlin/dev/restate/sdktesting/tests/KafkaDynamicSetupTest.kt +++ /dev/null @@ -1,155 +0,0 @@ -// Copyright (c) 2023 - Restate Software, Inc., Restate GmbH -// -// This file is part of the Restate SDK Test suite tool, -// which is released under the MIT license. -// -// You can find a copy of the license in file LICENSE in the root -// directory of this repository or package, or at -// https://github.com/restatedev/sdk-test-suite/blob/main/LICENSE -package dev.restate.sdktesting.tests - -import dev.restate.client.Client -import dev.restate.client.kotlin.response -import dev.restate.client.kotlin.toVirtualObject -import dev.restate.sdk.annotation.Handler -import dev.restate.sdk.annotation.Name -import dev.restate.sdk.annotation.Service -import dev.restate.sdk.annotation.VirtualObject -import dev.restate.sdk.common.StateKey -import dev.restate.sdk.endpoint.Endpoint -import dev.restate.sdk.kotlin.state -import dev.restate.sdk.kotlin.stateKey -import dev.restate.sdktesting.infra.* -import dev.restate.sdktesting.tests.Kafka.createKafkaSubscription -import dev.restate.sdktesting.tests.Kafka.produceMessagesToKafka -import dev.restate.sdktesting.tests.Kafka.registerKafkaCluster -import java.net.URI -import java.util.* -import kotlinx.serialization.Serializable -import kotlinx.serialization.json.Json -import org.assertj.core.api.Assertions.assertThat -import org.awaitility.kotlin.await -import org.awaitility.kotlin.withAlias -import org.junit.jupiter.api.BeforeAll -import org.junit.jupiter.api.Tag -import org.junit.jupiter.api.Test -import org.junit.jupiter.api.extension.RegisterExtension -import org.junit.jupiter.api.parallel.Execution -import org.junit.jupiter.api.parallel.ExecutionMode -import org.junit.jupiter.api.parallel.Isolated - -@Tag("only-single-node" /* This test depends on metadata propagation happening immediately */) -@Isolated -class KafkaDynamicSetupTest { - - @VirtualObject - @Name("Counter") - class Counter { - companion object { - private val COUNTER_KEY: StateKey = stateKey("counter") - } - - @Handler - suspend fun add(value: Long): Long { - val current = state().get(COUNTER_KEY) ?: 0L - val newValue = current + value - state().set(COUNTER_KEY, newValue) - return newValue - } - - @Handler - suspend fun get(): Long { - return state().get(COUNTER_KEY) ?: 0L - } - } - - @Service - @Name("EventHandler") - class EventHandler { - @Serializable data class ProxyRequest(val key: String, val value: Long) - - @Handler - suspend fun oneWayCall(request: ProxyRequest) { - dev.restate.sdk.kotlin - .toVirtualObject(request.key) - .request { add(request.value) } - .send() - } - } - - companion object { - private const val COUNTER_TOPIC = "counter" - private const val EVENT_HANDLER_TOPIC = "event-handler" - - @RegisterExtension - val deployerExt: RestateDeployerExtension = RestateDeployerExtension { - withEndpoint(Endpoint.bind(Counter()).bind(EventHandler())) - withContainer("kafka", KafkaContainer(COUNTER_TOPIC, EVENT_HANDLER_TOPIC)) - } - - @JvmStatic - @BeforeAll - fun beforeAll( - @InjectAdminURI adminURI: URI, - ) { - registerKafkaCluster(adminURI) - } - } - - @Test - @Execution(ExecutionMode.CONCURRENT) - fun handleEventInCounterService( - @InjectAdminURI adminURI: URI, - @InjectContainerPort(hostName = "kafka", port = KafkaContainer.KAFKA_EXTERNAL_PORT) - kafkaPort: Int, - @InjectClient ingressClient: Client - ) = runTest { - val counter = UUID.randomUUID().toString() - - // Register subscription - createKafkaSubscription(adminURI, COUNTER_TOPIC, "Counter", "add") - - // Produce message to kafka - produceMessagesToKafka( - kafkaPort, COUNTER_TOPIC, listOf(counter to "1", counter to "2", counter to "3")) - - await withAlias - "Updates from Kafka are visible in the counter" untilAsserted - { - assertThat( - ingressClient.toVirtualObject(counter).request { get() }.call().response) - .isEqualTo(6L) - } - } - - @Test - @Execution(ExecutionMode.CONCURRENT) - fun handleEventInEventHandler( - @InjectAdminURI adminURI: URI, - @InjectContainerPort(hostName = "kafka", port = KafkaContainer.KAFKA_EXTERNAL_PORT) - kafkaPort: Int, - @InjectClient ingressClient: Client - ) = runTest { - val counter = UUID.randomUUID().toString() - - // Register subscription - createKafkaSubscription(adminURI, EVENT_HANDLER_TOPIC, "EventHandler", "oneWayCall") - - // Produce message to kafka - produceMessagesToKafka( - kafkaPort, - EVENT_HANDLER_TOPIC, - listOf( - null to Json.encodeToString(EventHandler.ProxyRequest(counter, 1)), - null to Json.encodeToString(EventHandler.ProxyRequest(counter, 2)), - null to Json.encodeToString(EventHandler.ProxyRequest(counter, 3)))) - - await withAlias - "Updates from Kafka are visible in the counter" untilAsserted - { - assertThat( - ingressClient.toVirtualObject(counter).request { get() }.call().response) - .isEqualTo(6L) - } - } -} diff --git a/e2e-tests/src/main/kotlin/dev/restate/sdktesting/tests/KafkaTest.kt b/e2e-tests/src/main/kotlin/dev/restate/sdktesting/tests/KafkaTest.kt new file mode 100644 index 00000000..22422675 --- /dev/null +++ b/e2e-tests/src/main/kotlin/dev/restate/sdktesting/tests/KafkaTest.kt @@ -0,0 +1,392 @@ +// Copyright (c) 2023 - Restate Software, Inc., Restate GmbH +// +// This file is part of the Restate SDK Test suite tool, +// which is released under the MIT license. +// +// You can find a copy of the license in file LICENSE in the root +// directory of this repository or package, or at +// https://github.com/restatedev/sdk-test-suite/blob/main/LICENSE +package dev.restate.sdktesting.tests + +import dev.restate.admin.api.KafkaClusterApi +import dev.restate.admin.api.SubscriptionApi +import dev.restate.admin.client.ApiClient +import dev.restate.admin.model.CreateKafkaClusterRequest +import dev.restate.admin.model.CreateSubscriptionRequest +import dev.restate.client.Client +import dev.restate.client.kotlin.attachSuspend +import dev.restate.client.kotlin.getOutputSuspend +import dev.restate.client.kotlin.response +import dev.restate.client.kotlin.toVirtualObject +import dev.restate.client.kotlin.toWorkflow +import dev.restate.client.kotlin.workflowHandle +import dev.restate.common.reflections.ReflectionUtils.extractServiceName +import dev.restate.sdk.annotation.Handler +import dev.restate.sdk.annotation.Name +import dev.restate.sdk.annotation.Service +import dev.restate.sdk.annotation.Shared +import dev.restate.sdk.annotation.VirtualObject +import dev.restate.sdk.annotation.Workflow +import dev.restate.sdk.common.StateKey +import dev.restate.sdk.endpoint.Endpoint +import dev.restate.sdk.kotlin.durablePromiseKey +import dev.restate.sdk.kotlin.get +import dev.restate.sdk.kotlin.promise +import dev.restate.sdk.kotlin.promiseHandle +import dev.restate.sdk.kotlin.set +import dev.restate.sdk.kotlin.state +import dev.restate.sdk.kotlin.stateKey +import dev.restate.sdktesting.infra.InjectAdminURI +import dev.restate.sdktesting.infra.InjectClient +import dev.restate.sdktesting.infra.InjectContainerPort +import dev.restate.sdktesting.infra.KafkaContainer +import dev.restate.sdktesting.infra.RestateDeployerExtension +import dev.restate.sdktesting.infra.runtimeconfig.RestateConfigSchema +import dev.restate.sdktesting.tests.Tracing.JAEGER_HOSTNAME +import dev.restate.sdktesting.tests.Tracing.JAEGER_QUERY_PORT +import java.net.URI +import java.util.Properties +import java.util.UUID +import kotlinx.serialization.Serializable +import kotlinx.serialization.json.Json +import org.apache.kafka.clients.producer.KafkaProducer +import org.apache.kafka.clients.producer.Producer +import org.apache.kafka.clients.producer.ProducerRecord +import org.assertj.core.api.Assertions.assertThat +import org.awaitility.kotlin.await +import org.awaitility.kotlin.withAlias +import org.junit.jupiter.api.BeforeAll +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.extension.RegisterExtension +import org.junit.jupiter.api.parallel.Execution +import org.junit.jupiter.api.parallel.ExecutionMode +import org.junit.jupiter.api.parallel.Isolated + +@Isolated +class KafkaTest { + + @Workflow + class MyWorkflow { + companion object { + val PROMISE = durablePromiseKey("promise") + } + + @Workflow suspend fun run(myTask: String) = "Run $myTask" + + @Shared + suspend fun setPromise(myValue: String) { + promiseHandle(PROMISE).resolve(myValue) + } + + @Shared suspend fun getPromise() = promise(PROMISE).future().await() + } + + @VirtualObject + @Name("Counter") + class Counter { + companion object { + private val COUNTER_KEY: StateKey = stateKey("counter") + } + + @Handler + suspend fun add(value: Long): Long { + val current = state().get(COUNTER_KEY) ?: 0L + val newValue = current + value + state().set(COUNTER_KEY, newValue) + return newValue + } + + @Handler suspend fun get(): Long = state().get(COUNTER_KEY) ?: 0L + } + + @Service + @Name("EventHandler") + class EventHandler { + @Serializable data class ProxyRequest(val key: String, val value: Long) + + @Handler + suspend fun oneWayCall(request: ProxyRequest) { + dev.restate.sdk.kotlin + .toVirtualObject(request.key) + .request { add(request.value) } + .send() + } + } + + @VirtualObject + @Name("TracingCounter") + class TracingCounter { + @Handler + suspend fun set(value: String) { + check(state().get("state") == null) + state().set("state", value) + } + + @Shared suspend fun get() = state().get("state") + } + + companion object { + private const val WORKFLOW_TOPIC = "workflow" + private const val SHARED_HANDLER_TOPIC = "shared-handler" + private const val COUNTER_TOPIC = "counter" + private const val EVENT_HANDLER_TOPIC = "event-handler" + private const val TRACING_TOPIC = "tracing" + + @RegisterExtension + val deployerExt: RestateDeployerExtension = RestateDeployerExtension { + withContainer(Tracing.jaegerContainer()) + withContainer( + "kafka", + KafkaContainer( + WORKFLOW_TOPIC, + SHARED_HANDLER_TOPIC, + COUNTER_TOPIC, + EVENT_HANDLER_TOPIC, + TRACING_TOPIC)) + withConfig(RestateConfigSchema().apply(Tracing.configSchema)) + withEndpoint( + Endpoint.bind(MyWorkflow()).bind(Counter()).bind(EventHandler()).bind(TracingCounter())) + } + + fun produceMessagesToKafka(port: Int, topic: String, values: List>) { + val props = Properties() + props["bootstrap.servers"] = "PLAINTEXT://localhost:$port" + props["key.serializer"] = "org.apache.kafka.common.serialization.StringSerializer" + props["value.serializer"] = "org.apache.kafka.common.serialization.StringSerializer" + + val producer: Producer = KafkaProducer(props) + for (value in values) { + producer.send(ProducerRecord(topic, value.first, value.second)) + } + producer.close() + } + + fun registerKafkaCluster( + adminURI: URI, + ) { + val kafkaClustersClient = + KafkaClusterApi(ApiClient().setHost(adminURI.host).setPort(adminURI.port)) + retryOnServiceUnavailable { + kafkaClustersClient.createKafkaCluster( + CreateKafkaClusterRequest() + .name("my-cluster") + .properties( + mapOf( + "bootstrap.servers" to + "PLAINTEXT://kafka:${KafkaContainer.KAFKA_NETWORK_PORT}"))) + } + } + + fun createKafkaSubscription( + adminURI: URI, + topic: String, + serviceName: String, + handlerName: String + ) { + val subscriptionsClient = + SubscriptionApi(ApiClient().setHost(adminURI.host).setPort(adminURI.port)) + retryOnServiceUnavailable { + subscriptionsClient.createSubscription( + CreateSubscriptionRequest() + .source(URI.create("kafka://my-cluster/$topic")) + .sink(URI.create("service://$serviceName/$handlerName")) + .options(mapOf("auto.offset.reset" to "earliest"))) + } + } + + @JvmStatic + @BeforeAll + fun beforeAll( + @InjectAdminURI adminURI: URI, + ) { + registerKafkaCluster(adminURI) + } + } + + @Test + @Execution(ExecutionMode.CONCURRENT) + fun callWorkflowHandler( + @InjectAdminURI adminURI: URI, + @InjectContainerPort(hostName = "kafka", port = KafkaContainer.KAFKA_EXTERNAL_PORT) + kafkaPort: Int, + @InjectClient ingressClient: Client + ) = runTest { + createKafkaSubscription( + adminURI, WORKFLOW_TOPIC, extractServiceName(MyWorkflow::class.java), "run") + + val keyMessages = linkedMapOf("a" to "1", "b" to "2", "c" to "3") + + produceMessagesToKafka( + kafkaPort, WORKFLOW_TOPIC, keyMessages.map { it.key to Json.encodeToString(it.value) }) + + for (keyMessage in keyMessages) { + await withAlias + "Workflow invocations from Kafka" untilAsserted + { + assertThat( + ingressClient + .workflowHandle( + extractServiceName(MyWorkflow::class.java), keyMessage.key) + .attachSuspend() + .response) + .isEqualTo("Run ${keyMessage.value}") + } + } + + for (keyMessage in keyMessages) { + await withAlias + "Workflow invocations from Kafka" untilAsserted + { + assertThat( + ingressClient + .workflowHandle( + extractServiceName(MyWorkflow::class.java), keyMessage.key) + .getOutputSuspend() + .response + .value) + .isEqualTo("Run ${keyMessage.value}") + } + } + } + + @Test + @Execution(ExecutionMode.CONCURRENT) + fun callSharedWorkflowHandler( + @InjectAdminURI adminURI: URI, + @InjectContainerPort(hostName = "kafka", port = KafkaContainer.KAFKA_EXTERNAL_PORT) + kafkaPort: Int, + @InjectClient ingressClient: Client + ) = runTest { + createKafkaSubscription( + adminURI, SHARED_HANDLER_TOPIC, extractServiceName(MyWorkflow::class.java), "setPromise") + + val keyMessages = linkedMapOf("a" to "a", "b" to "b", "c" to "c") + + produceMessagesToKafka( + kafkaPort, + SHARED_HANDLER_TOPIC, + keyMessages.map { it.key to Json.encodeToString(it.value) }) + + for (keyMessage in keyMessages) { + await withAlias + "Workflow invocations from Kafka" untilAsserted + { + assertThat( + ingressClient + .toWorkflow(keyMessage.key) + .request { getPromise() } + .call() + .response) + .isEqualTo(keyMessage.value) + } + } + } + + @Test + @Execution(ExecutionMode.CONCURRENT) + fun callObjectHandler( + @InjectAdminURI adminURI: URI, + @InjectContainerPort(hostName = "kafka", port = KafkaContainer.KAFKA_EXTERNAL_PORT) + kafkaPort: Int, + @InjectClient ingressClient: Client + ) = runTest { + val counter = UUID.randomUUID().toString() + + createKafkaSubscription(adminURI, COUNTER_TOPIC, "Counter", "add") + + produceMessagesToKafka( + kafkaPort, COUNTER_TOPIC, listOf(counter to "1", counter to "2", counter to "3")) + + await withAlias + "Updates from Kafka are visible in the counter" untilAsserted + { + assertThat( + ingressClient.toVirtualObject(counter).request { get() }.call().response) + .isEqualTo(6L) + } + } + + @Test + @Execution(ExecutionMode.CONCURRENT) + fun callServiceHandler( + @InjectAdminURI adminURI: URI, + @InjectContainerPort(hostName = "kafka", port = KafkaContainer.KAFKA_EXTERNAL_PORT) + kafkaPort: Int, + @InjectClient ingressClient: Client + ) = runTest { + val counter = UUID.randomUUID().toString() + + createKafkaSubscription(adminURI, EVENT_HANDLER_TOPIC, "EventHandler", "oneWayCall") + + produceMessagesToKafka( + kafkaPort, + EVENT_HANDLER_TOPIC, + listOf( + null to Json.encodeToString(EventHandler.ProxyRequest(counter, 1)), + null to Json.encodeToString(EventHandler.ProxyRequest(counter, 2)), + null to Json.encodeToString(EventHandler.ProxyRequest(counter, 3)))) + + await withAlias + "Updates from Kafka are visible in the counter" untilAsserted + { + assertThat( + ingressClient.toVirtualObject(counter).request { get() }.call().response) + .isEqualTo(6L) + } + } + + @Test + fun shouldGenerateTraces( + @InjectClient ingressClient: Client, + @InjectAdminURI adminURI: URI, + @InjectContainerPort(hostName = JAEGER_HOSTNAME, port = JAEGER_QUERY_PORT) jaegerPort: Int, + @InjectContainerPort(hostName = "kafka", port = KafkaContainer.KAFKA_EXTERNAL_PORT) + kafkaPort: Int, + ) = runTest { + createKafkaSubscription( + adminURI, TRACING_TOPIC, extractServiceName(TracingCounter::class.java), "set") + + produceMessagesToKafka(kafkaPort, TRACING_TOPIC, listOf("a" to Json.encodeToString("a"))) + + val client = ingressClient.toVirtualObject("a") + await withAlias + "state is updated" untilAsserted + { + assertThat(client.request { get() }.call().response).isEqualTo("a") + } + + await withAlias + "traces are available" untilAsserted + { + val traces = Tracing.getTraces(jaegerPort, "Restate") + + assertThat(traces.result.resourceSpans).isNotEmpty() + + val counterAddSpans = + traces.result.resourceSpans + .flatMap { it.scopeSpans } + .flatMap { it.spans } + .filter { + it.name.contains( + "ingress_kafka ${extractServiceName(TracingCounter::class.java)}/{key}/set") + } + + assertThat(counterAddSpans).isNotEmpty() + + val span = counterAddSpans.first() + val attributes = span.attributes.associate { it.key to it.value.stringValue } + assertThat(attributes) + .containsEntry( + "restate.invocation.target", + "${extractServiceName(TracingCounter::class.java)}/a/set") + .containsEntry("messaging.system", "kafka") + .containsEntry("messaging.source.name", TRACING_TOPIC) + .containsEntry("messaging.operation.type", "process") + .containsKeys( + "restate.invocation.id", + "messaging.consumer.group.name", + "messaging.kafka.offset", + "messaging.source.partition.id") + } + } +} diff --git a/e2e-tests/src/main/kotlin/dev/restate/sdktesting/tests/KafkaTracingTest.kt b/e2e-tests/src/main/kotlin/dev/restate/sdktesting/tests/KafkaTracingTest.kt deleted file mode 100644 index 3d23e1b6..00000000 --- a/e2e-tests/src/main/kotlin/dev/restate/sdktesting/tests/KafkaTracingTest.kt +++ /dev/null @@ -1,124 +0,0 @@ -// Copyright (c) 2023 - Restate Software, Inc., Restate GmbH -// -// This file is part of the Restate SDK Test suite tool, -// which is released under the MIT license. -// -// You can find a copy of the license in file LICENSE in the root -// directory of this repository or package, or at -// https://github.com/restatedev/sdk-test-suite/blob/main/LICENSE -package dev.restate.sdktesting.tests - -import dev.restate.client.Client -import dev.restate.client.kotlin.response -import dev.restate.client.kotlin.toVirtualObject -import dev.restate.common.reflections.ReflectionUtils.extractServiceName -import dev.restate.sdk.annotation.Handler -import dev.restate.sdk.annotation.Name -import dev.restate.sdk.annotation.Shared -import dev.restate.sdk.annotation.VirtualObject -import dev.restate.sdk.endpoint.Endpoint -import dev.restate.sdk.kotlin.get -import dev.restate.sdk.kotlin.set -import dev.restate.sdk.kotlin.state -import dev.restate.sdktesting.infra.InjectAdminURI -import dev.restate.sdktesting.infra.InjectClient -import dev.restate.sdktesting.infra.InjectContainerPort -import dev.restate.sdktesting.infra.KafkaContainer -import dev.restate.sdktesting.infra.RestateDeployerExtension -import dev.restate.sdktesting.infra.runtimeconfig.RestateConfigSchema -import dev.restate.sdktesting.tests.Tracing.JAEGER_HOSTNAME -import dev.restate.sdktesting.tests.Tracing.JAEGER_QUERY_PORT -import java.net.URI -import kotlinx.serialization.json.Json -import org.assertj.core.api.Assertions.assertThat -import org.awaitility.kotlin.await -import org.awaitility.kotlin.withAlias -import org.junit.jupiter.api.Test -import org.junit.jupiter.api.extension.RegisterExtension - -class KafkaTracingTest { - - @VirtualObject - @Name("Counter") - class Counter { - @Handler - suspend fun set(value: String) { - check(state().get("state") == null) - state().set("state", value) - } - - @Shared suspend fun get() = state().get("state") - } - - companion object { - private const val TOPIC = "my-topic" - - @RegisterExtension - val deployerExt: RestateDeployerExtension = RestateDeployerExtension { - // Add Jaeger and Kafka container - withContainer(Tracing.jaegerContainer()) - withContainer("kafka", KafkaContainer(TOPIC)) - - // Configure Restate to send traces to Jaeger - withConfig(RestateConfigSchema().apply(Tracing.configSchema).apply(Kafka.configSchema)) - - withEndpoint(Endpoint.bind(Counter())) - } - } - - @Test - fun shouldGenerateTraces( - @InjectClient ingressClient: Client, - @InjectAdminURI adminURI: URI, - @InjectContainerPort(hostName = JAEGER_HOSTNAME, port = JAEGER_QUERY_PORT) jaegerPort: Int, - @InjectContainerPort(hostName = "kafka", port = KafkaContainer.KAFKA_EXTERNAL_PORT) - kafkaPort: Int, - ) = runTest { - Kafka.createKafkaSubscription(adminURI, TOPIC, extractServiceName(Counter::class.java), "set") - - // Produce message to kafka - Kafka.produceMessagesToKafka(kafkaPort, TOPIC, listOf("a" to Json.encodeToString("a"))) - - // Await that state is updated - val client = ingressClient.toVirtualObject("a") - await withAlias - "state is updated" untilAsserted - { - assertThat(client.request { get() }.call().response).isEqualTo("a") - } - - // Check the traces - await withAlias - "traces are available" untilAsserted - { - val traces = Tracing.getTraces(jaegerPort, "Restate") - - assertThat(traces.result.resourceSpans).isNotEmpty() - - // Find the GreeterService spans - val counterAddSpans = - traces.result.resourceSpans - .flatMap { it.scopeSpans } - .flatMap { it.spans } - .filter { it.name.contains("ingress_kafka Counter/{key}/set") } - - assertThat(counterAddSpans).isNotEmpty() - - // Verify span attributes - val span = counterAddSpans.first() - - // Verify Restate-specific attributes - val attributes = span.attributes.associate { it.key to it.value.stringValue } - assertThat(attributes) - .containsEntry("restate.invocation.target", "Counter/a/set") - .containsEntry("messaging.system", "kafka") - .containsEntry("messaging.source.name", TOPIC) - .containsEntry("messaging.operation.type", "process") - .containsKeys( - "restate.invocation.id", - "messaging.consumer.group.name", - "messaging.kafka.offset", - "messaging.source.partition.id") - } - } -} diff --git a/e2e-tests/src/main/kotlin/dev/restate/sdktesting/tests/PauseResumeChangingDeploymentTest.kt b/e2e-tests/src/main/kotlin/dev/restate/sdktesting/tests/PauseResumeChangingDeploymentTest.kt index 38529e6b..aa134c5f 100644 --- a/e2e-tests/src/main/kotlin/dev/restate/sdktesting/tests/PauseResumeChangingDeploymentTest.kt +++ b/e2e-tests/src/main/kotlin/dev/restate/sdktesting/tests/PauseResumeChangingDeploymentTest.kt @@ -25,11 +25,9 @@ import java.net.URI import org.assertj.core.api.Assertions.assertThat import org.awaitility.kotlin.await import org.awaitility.kotlin.withAlias -import org.junit.jupiter.api.Tag import org.junit.jupiter.api.Test import org.junit.jupiter.api.extension.RegisterExtension -@Tag("only-single-node" /* This test depends on metadata propagation happening immediately */) class PauseResumeChangingDeploymentTest { @Service diff --git a/e2e-tests/src/main/kotlin/dev/restate/sdktesting/tests/UpgradeTest.kt b/e2e-tests/src/main/kotlin/dev/restate/sdktesting/tests/UpgradeTest.kt index 04500990..dbe792b3 100644 --- a/e2e-tests/src/main/kotlin/dev/restate/sdktesting/tests/UpgradeTest.kt +++ b/e2e-tests/src/main/kotlin/dev/restate/sdktesting/tests/UpgradeTest.kt @@ -32,7 +32,6 @@ import kotlinx.serialization.Serializable import org.assertj.core.api.Assertions.assertThat import org.awaitility.kotlin.await import org.awaitility.kotlin.withAlias -import org.junit.jupiter.api.Tag import org.junit.jupiter.api.Test import org.junit.jupiter.api.extension.RegisterExtension @@ -79,8 +78,6 @@ class VersionedService(private val version: String) { } } -@Tag("always-suspending") -@Tag("only-single-node") class UpgradeWithNewInvocation { companion object { @@ -122,8 +119,6 @@ class UpgradeWithNewInvocation { } } -@Tag("always-suspending") -@Tag("only-single-node") class UpgradeWithInFlightInvocation { companion object { diff --git a/infra/src/main/kotlin/dev/restate/sdktesting/junit/TestSuite.kt b/infra/src/main/kotlin/dev/restate/sdktesting/junit/TestSuite.kt index bdf0b9e6..ab2593e6 100644 --- a/infra/src/main/kotlin/dev/restate/sdktesting/junit/TestSuite.kt +++ b/infra/src/main/kotlin/dev/restate/sdktesting/junit/TestSuite.kt @@ -20,18 +20,27 @@ import org.apache.logging.log4j.Level import org.apache.logging.log4j.core.config.Configurator import org.apache.logging.log4j.core.config.builder.api.ConfigurationBuilderFactory import org.apache.logging.log4j.core.config.builder.impl.BuiltConfiguration +import org.junit.platform.engine.DiscoverySelector import org.junit.platform.engine.Filter +import org.junit.platform.engine.discovery.ClassNameFilter import org.junit.platform.engine.discovery.DiscoverySelectors import org.junit.platform.launcher.* import org.junit.platform.launcher.core.LauncherDiscoveryRequestBuilder import org.junit.platform.launcher.core.LauncherFactory import org.junit.platform.reporting.legacy.xml.LegacyXmlReportGeneratingListener +inline fun clazz(): DiscoverySelector = DiscoverySelectors.selectClass(T::class.java) + +inline fun method(name: String): DiscoverySelector = + DiscoverySelectors.selectMethod(T::class.java, name) + +private const val CUSTOM_TEST_CLASS = "dev.restate.sdktesting.tests.Custom" + class TestSuite( val name: String, val additionalEnvs: Map, - val junitIncludeTags: String, - val restateNodes: Int = 1 + val selectors: List, + val restateNodes: Int = 1, ) { fun runTests( terminal: Terminal, @@ -60,12 +69,11 @@ class TestSuite( // Prepare launch request var builder = LauncherDiscoveryRequestBuilder.request() - .selectors(DiscoverySelectors.selectPackage("dev.restate.sdktesting.tests")) - .filters(TagFilter.includeTags(junitIncludeTags)) + .selectors(selectors) .filters(*filters.toTypedArray()) .apply { if (restateDeployerConfig.customTestsFile == null) { - filters(TagFilter.excludeTags("customTests")) + filters(ClassNameFilter.excludeClassNamePatterns(CUSTOM_TEST_CLASS)) } } // Redirect STDOUT/STDERR diff --git a/sdk-tests/src/main/kotlin/dev/restate/sdktesting/junit/TestSuites.kt b/sdk-tests/src/main/kotlin/dev/restate/sdktesting/junit/TestSuites.kt index e90f5688..e08bc3fe 100644 --- a/sdk-tests/src/main/kotlin/dev/restate/sdktesting/junit/TestSuites.kt +++ b/sdk-tests/src/main/kotlin/dev/restate/sdktesting/junit/TestSuites.kt @@ -8,24 +8,95 @@ // https://github.com/restatedev/sdk-test-suite/blob/main/LICENSE package dev.restate.sdktesting.junit +import dev.restate.sdktesting.tests.CallOrdering +import dev.restate.sdktesting.tests.Cancellation +import dev.restate.sdktesting.tests.Combinators +import dev.restate.sdktesting.tests.Custom +import dev.restate.sdktesting.tests.Ingress +import dev.restate.sdktesting.tests.KillInvocation +import dev.restate.sdktesting.tests.KillRuntime +import dev.restate.sdktesting.tests.NonDeterminismErrors +import dev.restate.sdktesting.tests.ProxyRequestSigning +import dev.restate.sdktesting.tests.RunFlush +import dev.restate.sdktesting.tests.RunRetry +import dev.restate.sdktesting.tests.ServiceToServiceCommunication +import dev.restate.sdktesting.tests.Sleep +import dev.restate.sdktesting.tests.SleepWithFailures +import dev.restate.sdktesting.tests.State +import dev.restate.sdktesting.tests.StopRuntime +import dev.restate.sdktesting.tests.UserErrors +import dev.restate.sdktesting.tests.WorkflowAPI + object TestSuites : SuiteProvider { override val defaultSuite: TestSuite get() = DEFAULT_SUITE - val DEFAULT_SUITE = TestSuite("default", emptyMap(), "none() | always-suspending | customTests") + val DEFAULT_SUITE = + TestSuite( + "default", + emptyMap(), + listOf( + clazz(), + clazz(), + clazz(), + clazz(), + clazz(), + clazz(), + clazz(), + clazz(), + clazz(), + clazz(), + clazz(), + clazz(), + clazz(), + clazz(), + clazz(), + clazz(), + )) + val THREE_NODES_SUITE = TestSuite( "threeNodes", mapOf( "RESTATE_DEFAULT_NUM_PARTITIONS" to "4", ), - "(none() | always-suspending) & !only-single-node & !customTests", + listOf( + clazz(), + clazz(), + clazz(), + clazz(), + clazz(), + clazz(), + clazz(), + clazz(), + clazz(), + clazz(), + clazz(), + clazz(), + clazz(), + ), 3) + private val ALWAYS_SUSPENDING_SUITE = TestSuite( "alwaysSuspending", mapOf("RESTATE_WORKER__INVOKER__INACTIVITY_TIMEOUT" to "0s"), - "always-suspending | only-always-suspending") + listOf( + clazz(), + clazz(), + clazz(), + clazz(), + clazz(), + clazz(), + clazz(), + clazz(), + clazz(), + clazz(), + clazz(), + clazz(), + clazz(), + )) + private val THREE_NODES_ALWAYS_SUSPENDING_SUITE = TestSuite( "threeNodesAlwaysSuspending", @@ -33,8 +104,20 @@ object TestSuites : SuiteProvider { "RESTATE_WORKER__INVOKER__INACTIVITY_TIMEOUT" to "0s", "RESTATE_DEFAULT_NUM_PARTITIONS" to "4", ), - "(always-suspending | only-always-suspending) & !only-single-node", + listOf( + clazz(), + clazz(), + clazz(), + clazz(), + clazz(), + clazz(), + clazz(), + clazz(), + clazz(), + clazz(), + ), 3) + private val SINGLE_THREAD_SINGLE_PARTITION_SUITE = TestSuite( "singleThreadSinglePartition", @@ -42,14 +125,32 @@ object TestSuites : SuiteProvider { "RESTATE_DEFAULT_NUM_PARTITIONS" to "1", "RESTATE_DEFAULT_THREAD_POOL_SIZE" to "1", ), - "none() | always-suspending | stop-runtime") + listOf( + clazz(), + clazz(), + clazz(), + clazz(), + clazz(), + clazz(), + clazz(), + clazz(), + clazz(), + clazz(), + clazz(), + clazz(), + clazz(), + clazz(), + clazz(), + )) + private val LAZY_STATE_SUITE = TestSuite( "lazyState", mapOf( "RESTATE_WORKER__INVOKER__DISABLE_EAGER_STATE" to "true", ), - "lazy-state") + listOf(clazz())) + private val LAZY_STATE_ALWAYS_SUSPENDING_SUITE = TestSuite( "lazyStateAlwaysSuspending", @@ -57,10 +158,13 @@ object TestSuites : SuiteProvider { "RESTATE_WORKER__INVOKER__DISABLE_EAGER_STATE" to "true", "RESTATE_WORKER__INVOKER__INACTIVITY_TIMEOUT" to "0s", ), - "lazy-state") + listOf(clazz())) + private val PERSISTED_TIMERS_SUITE = TestSuite( - "persistedTimers", mapOf("RESTATE_WORKER__NUM_TIMERS_IN_MEMORY_LIMIT" to "1"), "timers") + "persistedTimers", + mapOf("RESTATE_WORKER__NUM_TIMERS_IN_MEMORY_LIMIT" to "1"), + listOf(clazz(), method("manySleeps"))) override fun allSuites(): List { return listOf( diff --git a/sdk-tests/src/main/kotlin/dev/restate/sdktesting/tests/Cancellation.kt b/sdk-tests/src/main/kotlin/dev/restate/sdktesting/tests/Cancellation.kt index d9b57c08..0a11fc50 100644 --- a/sdk-tests/src/main/kotlin/dev/restate/sdktesting/tests/Cancellation.kt +++ b/sdk-tests/src/main/kotlin/dev/restate/sdktesting/tests/Cancellation.kt @@ -27,12 +27,10 @@ import kotlinx.serialization.json.Json import org.assertj.core.api.Assertions.assertThat import org.awaitility.kotlin.await import org.awaitility.kotlin.withAlias -import org.junit.jupiter.api.Tag import org.junit.jupiter.api.extension.RegisterExtension import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.EnumSource -@Tag("always-suspending") class Cancellation { companion object { @RegisterExtension diff --git a/sdk-tests/src/main/kotlin/dev/restate/sdktesting/tests/Combinators.kt b/sdk-tests/src/main/kotlin/dev/restate/sdktesting/tests/Combinators.kt index 973e66e4..526c34ed 100644 --- a/sdk-tests/src/main/kotlin/dev/restate/sdktesting/tests/Combinators.kt +++ b/sdk-tests/src/main/kotlin/dev/restate/sdktesting/tests/Combinators.kt @@ -28,13 +28,11 @@ import org.assertj.core.api.Assertions.assertThat import org.awaitility.kotlin.await import org.awaitility.kotlin.withAlias import org.junit.jupiter.api.DisplayName -import org.junit.jupiter.api.Tag import org.junit.jupiter.api.Test import org.junit.jupiter.api.extension.RegisterExtension import org.junit.jupiter.api.parallel.Execution import org.junit.jupiter.api.parallel.ExecutionMode -@Tag("always-suspending") class Combinators { companion object { @RegisterExtension diff --git a/sdk-tests/src/main/kotlin/dev/restate/sdktesting/tests/Custom.kt b/sdk-tests/src/main/kotlin/dev/restate/sdktesting/tests/Custom.kt index d71bcf90..e65e60b1 100644 --- a/sdk-tests/src/main/kotlin/dev/restate/sdktesting/tests/Custom.kt +++ b/sdk-tests/src/main/kotlin/dev/restate/sdktesting/tests/Custom.kt @@ -22,7 +22,6 @@ import java.util.stream.Stream import kotlin.time.Duration.Companion.minutes import kotlinx.serialization.Serializable import org.apache.logging.log4j.LogManager -import org.junit.jupiter.api.Tag import org.junit.jupiter.api.Timeout import org.junit.jupiter.api.extension.RegisterExtension import org.junit.jupiter.api.parallel.Execution @@ -30,7 +29,6 @@ import org.junit.jupiter.api.parallel.ExecutionMode import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.MethodSource -@Tag("customTests") class Custom { @Serializable diff --git a/sdk-tests/src/main/kotlin/dev/restate/sdktesting/tests/KillRuntime.kt b/sdk-tests/src/main/kotlin/dev/restate/sdktesting/tests/KillRuntime.kt index 507e6b6a..29c98a97 100644 --- a/sdk-tests/src/main/kotlin/dev/restate/sdktesting/tests/KillRuntime.kt +++ b/sdk-tests/src/main/kotlin/dev/restate/sdktesting/tests/KillRuntime.kt @@ -24,13 +24,10 @@ import org.assertj.core.api.Assertions.assertThat import org.awaitility.kotlin.atMost import org.awaitility.kotlin.await import org.awaitility.kotlin.withAlias -import org.junit.jupiter.api.Tag import org.junit.jupiter.api.Test import org.junit.jupiter.api.Timeout import org.junit.jupiter.api.extension.RegisterExtension -@Tag("always-suspending") -@Tag("only-single-node") class KillRuntime { companion object { diff --git a/sdk-tests/src/main/kotlin/dev/restate/sdktesting/tests/NonDeterminismErrors.kt b/sdk-tests/src/main/kotlin/dev/restate/sdktesting/tests/NonDeterminismErrors.kt index 074d4721..029a3e25 100644 --- a/sdk-tests/src/main/kotlin/dev/restate/sdktesting/tests/NonDeterminismErrors.kt +++ b/sdk-tests/src/main/kotlin/dev/restate/sdktesting/tests/NonDeterminismErrors.kt @@ -21,7 +21,6 @@ import java.util.UUID import org.assertj.core.api.Assertions.* import org.awaitility.kotlin.await import org.awaitility.kotlin.withAlias -import org.junit.jupiter.api.Tag import org.junit.jupiter.api.extension.RegisterExtension import org.junit.jupiter.api.parallel.Execution import org.junit.jupiter.api.parallel.ExecutionMode @@ -29,8 +28,6 @@ import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.ValueSource /** Test non-determinism/journal mismatch checks in the SDKs. */ -@Tag("only-always-suspending") -@Tag("only-single-node") class NonDeterminismErrors { companion object { @RegisterExtension diff --git a/sdk-tests/src/main/kotlin/dev/restate/sdktesting/tests/RunFlush.kt b/sdk-tests/src/main/kotlin/dev/restate/sdktesting/tests/RunFlush.kt index 14e5636d..b0f83adc 100644 --- a/sdk-tests/src/main/kotlin/dev/restate/sdktesting/tests/RunFlush.kt +++ b/sdk-tests/src/main/kotlin/dev/restate/sdktesting/tests/RunFlush.kt @@ -17,13 +17,11 @@ import dev.restate.sdktesting.infra.RestateDeployerExtension import dev.restate.sdktesting.infra.ServiceSpec import org.assertj.core.api.Assertions.assertThat import org.junit.jupiter.api.DisplayName -import org.junit.jupiter.api.Tag import org.junit.jupiter.api.Test import org.junit.jupiter.api.extension.RegisterExtension import org.junit.jupiter.api.parallel.Execution import org.junit.jupiter.api.parallel.ExecutionMode -@Tag("only-always-suspending") class RunFlush { companion object { @RegisterExtension diff --git a/sdk-tests/src/main/kotlin/dev/restate/sdktesting/tests/RunRetry.kt b/sdk-tests/src/main/kotlin/dev/restate/sdktesting/tests/RunRetry.kt index eaff9585..f24ce66c 100644 --- a/sdk-tests/src/main/kotlin/dev/restate/sdktesting/tests/RunRetry.kt +++ b/sdk-tests/src/main/kotlin/dev/restate/sdktesting/tests/RunRetry.kt @@ -18,13 +18,11 @@ import dev.restate.sdktesting.infra.ServiceSpec import java.util.* import org.assertj.core.api.Assertions.assertThat import org.junit.jupiter.api.DisplayName -import org.junit.jupiter.api.Tag import org.junit.jupiter.api.Test import org.junit.jupiter.api.extension.RegisterExtension import org.junit.jupiter.api.parallel.Execution import org.junit.jupiter.api.parallel.ExecutionMode -@Tag("always-suspending") class RunRetry { companion object { @RegisterExtension diff --git a/sdk-tests/src/main/kotlin/dev/restate/sdktesting/tests/ServiceToServiceCommunication.kt b/sdk-tests/src/main/kotlin/dev/restate/sdktesting/tests/ServiceToServiceCommunication.kt index 904957dd..34d4701c 100644 --- a/sdk-tests/src/main/kotlin/dev/restate/sdktesting/tests/ServiceToServiceCommunication.kt +++ b/sdk-tests/src/main/kotlin/dev/restate/sdktesting/tests/ServiceToServiceCommunication.kt @@ -28,14 +28,12 @@ import kotlin.time.Duration.Companion.seconds import kotlinx.serialization.json.Json import org.assertj.core.api.Assertions.assertThat import org.awaitility.kotlin.await -import org.junit.jupiter.api.Tag import org.junit.jupiter.api.Test import org.junit.jupiter.api.Timeout import org.junit.jupiter.api.extension.RegisterExtension import org.junit.jupiter.api.parallel.Execution import org.junit.jupiter.api.parallel.ExecutionMode -@Tag("always-suspending") class ServiceToServiceCommunication { companion object { @@ -184,7 +182,6 @@ class ServiceToServiceCommunication { @Test @Execution(ExecutionMode.CONCURRENT) @Timeout(value = 30, unit = TimeUnit.SECONDS) - @Tag("timers") fun oneWayCallWithDelay(@InjectClient ingressClient: Client) = runTest(timeout = 30.seconds) { val counterId = UUID.randomUUID().toString() diff --git a/sdk-tests/src/main/kotlin/dev/restate/sdktesting/tests/Sleep.kt b/sdk-tests/src/main/kotlin/dev/restate/sdktesting/tests/Sleep.kt index 4302ed40..1c9d161b 100644 --- a/sdk-tests/src/main/kotlin/dev/restate/sdktesting/tests/Sleep.kt +++ b/sdk-tests/src/main/kotlin/dev/restate/sdktesting/tests/Sleep.kt @@ -24,15 +24,12 @@ import kotlin.time.Duration.Companion.seconds import kotlinx.coroutines.joinAll import kotlinx.coroutines.launch import org.assertj.core.api.Assertions.assertThat -import org.junit.jupiter.api.Tag import org.junit.jupiter.api.Test import org.junit.jupiter.api.Timeout import org.junit.jupiter.api.extension.RegisterExtension import org.junit.jupiter.api.parallel.Execution import org.junit.jupiter.api.parallel.ExecutionMode -@Tag("always-suspending") -@Tag("timers") class Sleep { companion object { diff --git a/sdk-tests/src/main/kotlin/dev/restate/sdktesting/tests/SleepWithFailures.kt b/sdk-tests/src/main/kotlin/dev/restate/sdktesting/tests/SleepWithFailures.kt index d918c1f5..2eab7868 100644 --- a/sdk-tests/src/main/kotlin/dev/restate/sdktesting/tests/SleepWithFailures.kt +++ b/sdk-tests/src/main/kotlin/dev/restate/sdktesting/tests/SleepWithFailures.kt @@ -22,13 +22,11 @@ import kotlin.time.TimeSource import kotlin.time.toJavaDuration import kotlinx.coroutines.* import org.assertj.core.api.Assertions.assertThat -import org.junit.jupiter.api.Tag import org.junit.jupiter.api.Test import org.junit.jupiter.api.Timeout import org.junit.jupiter.api.extension.RegisterExtension // -- Sleep tests with terminations/killings of service endpoint -@Tag("always-suspending") class SleepWithFailures { companion object { diff --git a/sdk-tests/src/main/kotlin/dev/restate/sdktesting/tests/State.kt b/sdk-tests/src/main/kotlin/dev/restate/sdktesting/tests/State.kt index e6aaa8ef..33821410 100644 --- a/sdk-tests/src/main/kotlin/dev/restate/sdktesting/tests/State.kt +++ b/sdk-tests/src/main/kotlin/dev/restate/sdktesting/tests/State.kt @@ -25,14 +25,11 @@ import java.util.function.Function import kotlinx.serialization.json.Json import org.assertj.core.api.Assertions.assertThat import org.awaitility.kotlin.await -import org.junit.jupiter.api.Tag import org.junit.jupiter.api.Test import org.junit.jupiter.api.extension.RegisterExtension import org.junit.jupiter.api.parallel.Execution import org.junit.jupiter.api.parallel.ExecutionMode -@Tag("always-suspending") -@Tag("lazy-state") class State { companion object { diff --git a/sdk-tests/src/main/kotlin/dev/restate/sdktesting/tests/StopRuntime.kt b/sdk-tests/src/main/kotlin/dev/restate/sdktesting/tests/StopRuntime.kt index 9682e087..5606a7b6 100644 --- a/sdk-tests/src/main/kotlin/dev/restate/sdktesting/tests/StopRuntime.kt +++ b/sdk-tests/src/main/kotlin/dev/restate/sdktesting/tests/StopRuntime.kt @@ -25,13 +25,10 @@ import org.assertj.core.api.Assertions.assertThat import org.awaitility.kotlin.atMost import org.awaitility.kotlin.await import org.awaitility.kotlin.withAlias -import org.junit.jupiter.api.Tag import org.junit.jupiter.api.Test import org.junit.jupiter.api.Timeout import org.junit.jupiter.api.extension.RegisterExtension -@Tag("always-suspending") -@Tag("only-single-node") class StopRuntime { companion object { diff --git a/sdk-tests/src/main/kotlin/dev/restate/sdktesting/tests/UserErrors.kt b/sdk-tests/src/main/kotlin/dev/restate/sdktesting/tests/UserErrors.kt index fb3f5349..de172673 100644 --- a/sdk-tests/src/main/kotlin/dev/restate/sdktesting/tests/UserErrors.kt +++ b/sdk-tests/src/main/kotlin/dev/restate/sdktesting/tests/UserErrors.kt @@ -23,13 +23,11 @@ import kotlinx.serialization.Serializable import kotlinx.serialization.json.Json import org.assertj.core.api.Assertions.assertThat import org.junit.jupiter.api.DisplayName -import org.junit.jupiter.api.Tag import org.junit.jupiter.api.Test import org.junit.jupiter.api.extension.RegisterExtension import org.junit.jupiter.api.parallel.Execution import org.junit.jupiter.api.parallel.ExecutionMode -@Tag("always-suspending") class UserErrors { companion object { diff --git a/sdk-tests/src/main/kotlin/dev/restate/sdktesting/tests/WorkflowAPI.kt b/sdk-tests/src/main/kotlin/dev/restate/sdktesting/tests/WorkflowAPI.kt index 85e67c63..c7a70719 100644 --- a/sdk-tests/src/main/kotlin/dev/restate/sdktesting/tests/WorkflowAPI.kt +++ b/sdk-tests/src/main/kotlin/dev/restate/sdktesting/tests/WorkflowAPI.kt @@ -18,13 +18,11 @@ import org.assertj.core.api.Assertions.assertThat import org.awaitility.kotlin.await import org.awaitility.kotlin.withAlias import org.junit.jupiter.api.DisplayName -import org.junit.jupiter.api.Tag import org.junit.jupiter.api.Test import org.junit.jupiter.api.extension.RegisterExtension import org.junit.jupiter.api.parallel.Execution import org.junit.jupiter.api.parallel.ExecutionMode -@Tag("always-suspending") class WorkflowAPI { companion object {