From dcadf20a8b7a0d6982fc833f3ed02356d5845b36 Mon Sep 17 00:00:00 2001 From: slinkydeveloper Date: Wed, 20 May 2026 17:49:27 +0200 Subject: [PATCH 1/5] Tests for scope and concurrency limit --- .../restate/sdktesting/junit/TestSuites.kt | 9 +- .../sdktesting/tests/ConcurrencyLimitTest.kt | 172 +++ .../sdktesting/tests/ScopeIsolationTest.kt | 88 ++ .../restate/sdktesting/infra/RuleBookUtils.kt | 38 + .../restate/sdktesting/infra/ScopedIngress.kt | 61 + infra/src/main/openapi/admin.json | 1335 +++++++++++------ 6 files changed, 1207 insertions(+), 496 deletions(-) create mode 100644 e2e-tests/src/main/kotlin/dev/restate/sdktesting/tests/ConcurrencyLimitTest.kt create mode 100644 e2e-tests/src/main/kotlin/dev/restate/sdktesting/tests/ScopeIsolationTest.kt create mode 100644 infra/src/main/kotlin/dev/restate/sdktesting/infra/RuleBookUtils.kt create mode 100644 infra/src/main/kotlin/dev/restate/sdktesting/infra/ScopedIngress.kt diff --git a/e2e-tests/src/main/kotlin/dev/restate/sdktesting/junit/TestSuites.kt b/e2e-tests/src/main/kotlin/dev/restate/sdktesting/junit/TestSuites.kt index ce4ebfb5..0cf0b69e 100644 --- a/e2e-tests/src/main/kotlin/dev/restate/sdktesting/junit/TestSuites.kt +++ b/e2e-tests/src/main/kotlin/dev/restate/sdktesting/junit/TestSuites.kt @@ -11,6 +11,7 @@ package dev.restate.sdktesting.junit import dev.restate.sdktesting.tests.AwakeableIngressEndpointTest import dev.restate.sdktesting.tests.AwakeableLeaderTransferTest import dev.restate.sdktesting.tests.BackwardCompatibilityTest +import dev.restate.sdktesting.tests.ConcurrencyLimitTest import dev.restate.sdktesting.tests.ForwardCompatibilityTest import dev.restate.sdktesting.tests.IngressTest import dev.restate.sdktesting.tests.InvokerMemoryTest @@ -20,6 +21,7 @@ import dev.restate.sdktesting.tests.OpenAPITest import dev.restate.sdktesting.tests.PauseResumeChangingDeploymentTest import dev.restate.sdktesting.tests.PauseResumeTest import dev.restate.sdktesting.tests.RestartAsNewInvocationTest +import dev.restate.sdktesting.tests.ScopeIsolationTest import dev.restate.sdktesting.tests.StatePatchingTest import dev.restate.sdktesting.tests.TracingTest import dev.restate.sdktesting.tests.UpgradeWithInFlightInvocation @@ -35,6 +37,7 @@ object TestSuites : SuiteProvider { emptyMap(), listOf( clazz(), + clazz(), clazz(), clazz(), clazz(), @@ -43,6 +46,7 @@ object TestSuites : SuiteProvider { clazz(), clazz(), clazz(), + clazz(), clazz(), clazz(), clazz(), @@ -56,6 +60,8 @@ object TestSuites : SuiteProvider { "RESTATE_DEFAULT_NUM_PARTITIONS" to "4", ), listOf( + clazz(), + clazz(), clazz(), clazz(), clazz(), @@ -72,6 +78,7 @@ object TestSuites : SuiteProvider { "alwaysSuspending", mapOf("RESTATE_WORKER__INVOKER__INACTIVITY_TIMEOUT" to "0s"), listOf( + clazz(), clazz(), clazz(), clazz(), @@ -85,7 +92,7 @@ object TestSuites : SuiteProvider { "RESTATE_WORKER__INVOKER__INACTIVITY_TIMEOUT" to "0s", "RESTATE_DEFAULT_NUM_PARTITIONS" to "4", ), - listOf(clazz()), + listOf(clazz(), clazz()), 3) private val VERSION_COMPATIBILITY_SUITE = diff --git a/e2e-tests/src/main/kotlin/dev/restate/sdktesting/tests/ConcurrencyLimitTest.kt b/e2e-tests/src/main/kotlin/dev/restate/sdktesting/tests/ConcurrencyLimitTest.kt new file mode 100644 index 00000000..705a7b30 --- /dev/null +++ b/e2e-tests/src/main/kotlin/dev/restate/sdktesting/tests/ConcurrencyLimitTest.kt @@ -0,0 +1,172 @@ +// Copyright (c) 2023 - Restate Software, Inc., Restate GmbH +// +// This file is part of the Restate SDK Test suite tool, +// which is released under the MIT license. +// +// You can find a copy of the license in file LICENSE in the root +// directory of this repository or package, or at +// https://github.com/restatedev/sdk-test-suite/blob/main/LICENSE +package dev.restate.sdktesting.tests + +import dev.restate.client.Client +import dev.restate.client.kotlin.attachSuspend +import dev.restate.client.kotlin.response +import dev.restate.client.kotlin.toVirtualObject +import dev.restate.sdk.annotation.Handler +import dev.restate.sdk.annotation.Name +import dev.restate.sdk.annotation.Service +import dev.restate.sdk.annotation.Shared +import dev.restate.sdk.annotation.VirtualObject +import dev.restate.sdk.common.TerminalException +import dev.restate.sdk.endpoint.Endpoint +import dev.restate.sdk.kotlin.awakeable +import dev.restate.sdk.kotlin.awakeableHandle +import dev.restate.sdk.kotlin.call +import dev.restate.sdk.kotlin.get +import dev.restate.sdk.kotlin.resolve +import dev.restate.sdk.kotlin.set +import dev.restate.sdk.kotlin.state +import dev.restate.sdk.kotlin.toVirtualObject +import dev.restate.sdktesting.infra.* +import dev.restate.serde.TypeTag +import java.net.URI +import java.util.UUID +import kotlin.time.Duration.Companion.seconds +import kotlinx.coroutines.delay +import kotlinx.serialization.json.Json +import org.assertj.core.api.Assertions.assertThat +import org.awaitility.kotlin.await +import org.awaitility.kotlin.withAlias +import org.junit.jupiter.api.DisplayName +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.extension.RegisterExtension + +/** + * Verifies the runtime enforces the rule-book action-concurrency limit on a scope. Scoped + * invocations are limited to N in flight, and the held excess progresses as the running ones + * complete. + */ +class ConcurrencyLimitTest { + + @Service + @Name("BlockingProxy") + class BlockingProxy { + @Handler + suspend fun block(key: String): String = + toVirtualObject(key).request { run() }.call().await() + } + + @VirtualObject + @Name("Blocker") + class Blocker { + @Handler + suspend fun run(): String { + val awk = awakeable() + state().set("awk", awk.id) + return awk.await() + } + + @Shared suspend fun getAwakeable(): String = state().get("awk") ?: "" + + @Shared + suspend fun resolveAwakeable(value: String) { + val id = state().get("awk") ?: "" + if (id.isEmpty()) throw TerminalException("Awakeable not registered yet") + awakeableHandle(id).resolve(value) + } + } + + companion object { + @RegisterExtension + @JvmField + val deployerExt: RestateDeployerExtension = RestateDeployerExtension { + withEndpoint(Endpoint.bind(BlockingProxy()).bind(Blocker())) + // Scoped invocations and rule-book require the vqueues experimental feature. + // TODO: drop this once the minimum supported Restate version is v1.8, where vqueues is on by + // default. + withEnv("RESTATE_EXPERIMENTAL_ENABLE_VQUEUES", "true") + // Reduce rule-book activation latency so the test isn't gated on the default 30s poll. + withEnv("RESTATE_WORKER__RULE_BOOK_POLL_INTERVAL", "1s") + } + } + + @Test + @DisplayName( + "Action concurrency limit on a scope holds excess invocations and releases on completion") + fun actionConcurrencyLimitIsRespected( + @InjectIngressURI ingressURI: URI, + @InjectAdminURI adminURI: URI, + @InjectClient ingressClient: Client, + ) = + runTest(timeout = 120.seconds) { + val runId = UUID.randomUUID().toString().take(8) + val scope = "myscope-$runId" + val limit = 2 + val invocationCount = 4 + val blockerKeys = (0 until invocationCount).map { "block-key-$runId-$it" } + + upsertActionConcurrencyRule(adminURI, pattern = scope, actionConcurrency = limit) + + val outerIds = + blockerKeys.map { key -> + sendInvocationWithScope( + ingressURI, scope, "BlockingProxy", "block", Json.encodeToString(key)) + } + + val blockerTargetFilter = "target LIKE 'Blocker/%/run'" + + // Wait until exactly `limit` Blocker invocations exist; the rest are held by the rule. + await withAlias + "exactly $limit Blocker invocations are in flight" untilAsserted + { + assertThat(getAllInvocations(adminURI, blockerTargetFilter)).hasSize(limit) + } + // Briefly confirm the count stays at `limit`, defending against a slow rule activation + // where all 4 would have started before our first check. + delay(1.seconds) + assertThat(getAllInvocations(adminURI, blockerTargetFilter)).hasSize(limit) + + // Resolve one awakeable at a time. After each resolve, a held outer becomes running and + // spawns its Blocker. + val unresolvedKeys = blockerKeys.toMutableSet() + repeat(invocationCount) { + var activeKey: String? = null + await withAlias + "find a Blocker (among unresolved keys) that has registered its awakeable" untilAsserted + { + val found = + unresolvedKeys.firstNotNullOfOrNull { key -> + val awkId = + ingressClient + .toVirtualObject(key) + .request { getAwakeable() } + .options(idempotentCallOptions) + .call() + .response + if (awkId.isNotEmpty()) key else null + } + assertThat(found).isNotNull + activeKey = found + } + + ingressClient + .toVirtualObject(activeKey!!) + .request { resolveAwakeable("done") } + .options(idempotentCallOptions) + .call() + unresolvedKeys.remove(activeKey!!) + } + + // All four outer BlockingProxy invocations must complete successfully. + outerIds.forEach { outerId -> + val response = + ingressClient + .invocationHandle(outerId, TypeTag.of(String::class.java)) + .attachSuspend() + .response + assertThat(response).isEqualTo("done") + } + + bulkDeleteRules(adminURI, listOf(scope)) + } +} diff --git a/e2e-tests/src/main/kotlin/dev/restate/sdktesting/tests/ScopeIsolationTest.kt b/e2e-tests/src/main/kotlin/dev/restate/sdktesting/tests/ScopeIsolationTest.kt new file mode 100644 index 00000000..00940013 --- /dev/null +++ b/e2e-tests/src/main/kotlin/dev/restate/sdktesting/tests/ScopeIsolationTest.kt @@ -0,0 +1,88 @@ +// Copyright (c) 2023 - Restate Software, Inc., Restate GmbH +// +// This file is part of the Restate SDK Test suite tool, +// which is released under the MIT license. +// +// You can find a copy of the license in file LICENSE in the root +// directory of this repository or package, or at +// https://github.com/restatedev/sdk-test-suite/blob/main/LICENSE +package dev.restate.sdktesting.tests + +import dev.restate.client.Client +import dev.restate.client.kotlin.attachSuspend +import dev.restate.client.kotlin.response +import dev.restate.sdk.annotation.Handler +import dev.restate.sdk.annotation.Name +import dev.restate.sdk.annotation.Service +import dev.restate.sdk.endpoint.Endpoint +import dev.restate.sdk.kotlin.runBlock +import dev.restate.sdktesting.infra.* +import dev.restate.serde.TypeTag +import java.net.URI +import java.util.UUID +import kotlinx.serialization.json.Json +import org.assertj.core.api.Assertions.assertThat +import org.junit.jupiter.api.DisplayName +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.extension.RegisterExtension + +/** + * Verifies that scope is part of an invocation's identity: the same idempotency key sent in two + * different scopes produces two distinct invocations rather than colliding on dedup. This is a + * runtime guarantee independent of any rule-book rule. + */ +class ScopeIsolationTest { + + @Service + @Name("ScopedEcho") + class ScopedEcho { + /** + * Returns a fresh random value via `runBlock` per invocation. Used to prove the two scoped + * calls actually executed independently rather than the second one being deduplicated to the + * first one's stored result. + */ + @Handler suspend fun echo(input: String): String = runBlock { UUID.randomUUID().toString() } + } + + companion object { + @RegisterExtension + @JvmField + val deployerExt: RestateDeployerExtension = RestateDeployerExtension { + withEndpoint(Endpoint.bind(ScopedEcho())) + // Scoped invocations require the vqueues experimental feature. + // TODO: drop this once the minimum supported Restate version is v1.8, where vqueues is on by + // default. + withEnv("RESTATE_EXPERIMENTAL_ENABLE_VQUEUES", "true") + } + } + + @Test + @DisplayName("Same idempotency key in two different scopes produces two distinct invocations") + fun sameIdempotencyKeyAcrossScopesIsolates( + @InjectIngressURI ingressURI: URI, + @InjectClient ingressClient: Client, + ) = runTest { + val sharedIdempotencyKey = "shared-idempotency" + val scopeA = UUID.randomUUID().toString() + val scopeB = UUID.randomUUID().toString() + val body = Json.encodeToString("foo") + + val idA = + sendInvocationWithScope( + ingressURI, scopeA, "ScopedEcho", "echo", body, idempotencyKey = sharedIdempotencyKey) + val idB = + sendInvocationWithScope( + ingressURI, scopeB, "ScopedEcho", "echo", body, idempotencyKey = sharedIdempotencyKey) + + assertThat(idA).isNotEqualTo(idB) + + // Each invocation generates its own random value via `runBlock`. If the runtime had + // collapsed the two calls under shared idempotency, both responses would be identical; + // distinct values prove the scope isolated them. + val respA = + ingressClient.invocationHandle(idA, TypeTag.of(String::class.java)).attachSuspend().response + val respB = + ingressClient.invocationHandle(idB, TypeTag.of(String::class.java)).attachSuspend().response + assertThat(respA).isNotEqualTo(respB) + } +} diff --git a/infra/src/main/kotlin/dev/restate/sdktesting/infra/RuleBookUtils.kt b/infra/src/main/kotlin/dev/restate/sdktesting/infra/RuleBookUtils.kt new file mode 100644 index 00000000..0054ef2b --- /dev/null +++ b/infra/src/main/kotlin/dev/restate/sdktesting/infra/RuleBookUtils.kt @@ -0,0 +1,38 @@ +// Copyright (c) 2023 - Restate Software, Inc., Restate GmbH +// +// This file is part of the Restate SDK Test suite tool, +// which is released under the MIT license. +// +// You can find a copy of the license in file LICENSE in the root +// directory of this repository or package, or at +// https://github.com/restatedev/sdk-test-suite/blob/main/LICENSE +package dev.restate.sdktesting.infra + +import dev.restate.admin.api.RuleApi +import dev.restate.admin.client.ApiClient +import dev.restate.admin.model.DeleteRuleRequest +import dev.restate.admin.model.RuleResponse +import dev.restate.admin.model.UpsertRuleRequest +import dev.restate.admin.model.UserLimits +import java.net.URI + +private fun ruleApi(adminURI: URI): RuleApi = + RuleApi(ApiClient().setHost(adminURI.host).setPort(adminURI.port)) + +/** Upsert a single rule that caps action concurrency for a given pattern. */ +fun upsertActionConcurrencyRule( + adminURI: URI, + pattern: String, + actionConcurrency: Int +): RuleResponse { + val req = + UpsertRuleRequest().pattern(pattern).limits(UserLimits().actionConcurrency(actionConcurrency)) + return ruleApi(adminURI).upsertRules(listOf(req)).single() +} + +/** Bulk-delete rules by pattern. Returns the patterns that were actually removed. */ +fun bulkDeleteRules(adminURI: URI, patterns: List): List { + if (patterns.isEmpty()) return emptyList() + val reqs = patterns.map { DeleteRuleRequest().pattern(it) } + return ruleApi(adminURI).bulkDeleteRules(reqs) +} diff --git a/infra/src/main/kotlin/dev/restate/sdktesting/infra/ScopedIngress.kt b/infra/src/main/kotlin/dev/restate/sdktesting/infra/ScopedIngress.kt new file mode 100644 index 00000000..523910f1 --- /dev/null +++ b/infra/src/main/kotlin/dev/restate/sdktesting/infra/ScopedIngress.kt @@ -0,0 +1,61 @@ +// Copyright (c) 2023 - Restate Software, Inc., Restate GmbH +// +// This file is part of the Restate SDK Test suite tool, +// which is released under the MIT license. +// +// You can find a copy of the license in file LICENSE in the root +// directory of this repository or package, or at +// https://github.com/restatedev/sdk-test-suite/blob/main/LICENSE +package dev.restate.sdktesting.infra + +import java.net.URI +import java.net.http.HttpClient +import java.net.http.HttpRequest +import java.net.http.HttpResponse +import java.util.UUID +import kotlinx.coroutines.future.await +import kotlinx.serialization.SerialName +import kotlinx.serialization.Serializable +import kotlinx.serialization.json.Json +import org.assertj.core.api.Assertions.assertThat + +@Serializable +data class IngressSendResponse( + @SerialName("invocationId") val invocationId: String, + val status: String? = null, +) + +private val ingressJson = Json { ignoreUnknownKeys = true } + +/** + * Send a scoped invocation to a `@Service` via the new `/restate/scope/{scope}/send/...` ingress + * path. Returns the assigned invocation id. + * + * TODO: replace with the ingress client API once it supports scope. The scoped feature is currently + * exposed for `@Service` only, so this helper does not accept a virtual object key. + */ +suspend fun sendInvocationWithScope( + ingressURI: URI, + scope: String, + service: String, + handler: String, + body: String, + idempotencyKey: String = UUID.randomUUID().toString(), +): String { + val request = + HttpRequest.newBuilder() + .uri(ingressURI.resolve("restate/scope/$scope/send/$service/$handler")) + .header("Content-Type", "application/json") + .header("idempotency-key", idempotencyKey) + .POST(HttpRequest.BodyPublishers.ofString(body)) + .build() + + val response = + HttpClient.newHttpClient().sendAsync(request, HttpResponse.BodyHandlers.ofString()).await() + assertThat(response.statusCode()) + .withFailMessage { + "Unexpected ingress response: ${response.statusCode()} ${response.body()}" + } + .isBetween(200, 299) + return ingressJson.decodeFromString(response.body()).invocationId +} diff --git a/infra/src/main/openapi/admin.json b/infra/src/main/openapi/admin.json index fc072a39..60fd4b63 100644 --- a/infra/src/main/openapi/admin.json +++ b/infra/src/main/openapi/admin.json @@ -10,7 +10,7 @@ "name": "MIT", "url": "https://opensource.org/license/mit" }, - "version": "1.6.3-dev" + "version": "1.7.0-dev" }, "paths": { "/cluster-health": { @@ -327,6 +327,69 @@ } } }, + "/invocations/{invocation_id}": { + "delete": { + "tags": [ + "invocation" + ], + "summary": "Delete an invocation", + "description": "Use kill_invocation/cancel_invocation/purge_invocation instead.", + "operationId": "delete_invocation", + "parameters": [ + { + "name": "invocation_id", + "in": "path", + "description": "Invocation identifier.", + "required": true, + "schema": { + "type": "string" + } + }, + { + "name": "mode", + "in": "query", + "description": "If cancel, it will gracefully terminate the invocation.\nIf kill, it will terminate the invocation with a hard stop.\nIf purge, it will only cleanup the response for completed invocations,\nand leave unaffected an in-flight invocation.", + "required": false, + "schema": { + "oneOf": [ + { + "type": "null" + }, + { + "type": "string", + "enum": [ + "Cancel", + "Kill", + "Purge" + ] + } + ] + } + } + ], + "responses": { + "202": { + "description": "Accepted" + }, + "400": { + "$ref": "#/components/responses/BadRequest" + }, + "404": { + "$ref": "#/components/responses/NotFound" + }, + "405": { + "$ref": "#/components/responses/MethodNotAllowed" + }, + "409": { + "$ref": "#/components/responses/Conflict" + }, + "500": { + "$ref": "#/components/responses/InternalServerError" + } + }, + "deprecated": true + } + }, "/invocations/{invocation_id}/cancel": { "patch": { "tags": [ @@ -1642,6 +1705,100 @@ } } }, + "/limits/rules": { + "put": { + "tags": [ + "rule" + ], + "summary": "Upsert a batch of rules.", + "description": "Each entry carries an optional [`Precondition`]. Setting it to\n`DoesNotExist` makes the entry a strict insert; `Matches(v)` rejects\nthe batch unless the rule's current version is `v`; omitting it\n(`None`) is unconditional. The whole batch is atomic: any failed\nprecondition or cap-exceeded condition rolls back the rest.", + "operationId": "upsert_rules", + "requestBody": { + "content": { + "application/json": { + "schema": { + "type": "array", + "items": { + "$ref": "#/components/schemas/UpsertRuleRequest" + } + } + } + }, + "required": true + }, + "responses": { + "200": { + "description": "Rules upserted", + "content": { + "application/json": { + "schema": { + "type": "array", + "items": { + "$ref": "#/components/schemas/RuleResponse" + } + } + } + } + }, + "409": { + "$ref": "#/components/responses/Conflict" + }, + "422": { + "$ref": "#/components/responses/UnprocessableEntity" + }, + "500": { + "$ref": "#/components/responses/InternalServerError" + } + } + } + }, + "/limits/rules/bulk-delete": { + "post": { + "tags": [ + "rule" + ], + "summary": "Delete a batch of rules by pattern.", + "description": "Each entry may carry an `expected_version`; if present the rule\nmust exist at that exact version, otherwise the batch fails.\nWithout `expected_version` the delete is unconditional and\nidempotent: a pattern that's already absent is silently skipped.\nReturns the patterns that this batch actually removed.", + "operationId": "bulk_delete_rules", + "requestBody": { + "content": { + "application/json": { + "schema": { + "type": "array", + "items": { + "$ref": "#/components/schemas/DeleteRuleRequest" + } + } + } + }, + "required": true + }, + "responses": { + "200": { + "description": "Patterns that were actually removed", + "content": { + "application/json": { + "schema": { + "type": "array", + "items": { + "type": "string" + } + } + } + } + }, + "409": { + "$ref": "#/components/responses/Conflict" + }, + "422": { + "$ref": "#/components/responses/UnprocessableEntity" + }, + "500": { + "$ref": "#/components/responses/InternalServerError" + } + } + } + }, "/query": { "post": { "tags": [ @@ -2340,506 +2497,490 @@ } } }, - "DeploymentId": { - "type": "string" - }, - "DeploymentResponse": { - "anyOf": [ - { - "$ref": "#/components/schemas/HttpDeploymentResponse" - }, - { - "$ref": "#/components/schemas/LambdaDeploymentResponse" - } - ] - }, - "DetailedDeploymentResponse": { - "anyOf": [ - { - "$ref": "#/components/schemas/HttpDetailedDeploymentResponse" - }, - { - "$ref": "#/components/schemas/LambdaDetailedDeploymentResponse" - } - ] - }, - "EmbeddedMetadataClusterHealth": { - "type": "object", - "required": [ - "members" - ], - "properties": { - "members": { - "type": "array", - "items": { - "$ref": "#/components/schemas/PlainNodeId" - }, - "description": "Current members of the embedded metadata cluster" - } - } - }, - "EndpointLambdaCompression": { - "type": "string", - "description": "Lambda compression", - "enum": [ - "Zstd" - ] - }, - "ErrorDescriptionResponse": { + "DeleteRuleRequest": { "type": "object", - "description": "# Error description response\n\nError details of the response", + "description": "One entry in the body of `POST /limits/rules/bulk-delete`.", "required": [ - "message" + "pattern" ], "properties": { - "message": { - "type": "string" - }, - "restate_code": { + "expected_version": { "type": [ - "string", + "integer", "null" ], - "description": "# Restate code\n\nRestate error code describing this error" + "format": "int32", + "description": "Optimistic-concurrency match. Absent → unconditional delete\n(idempotent: deleting an already-absent rule succeeds as a\nno-op). Present → reject unless the rule's current version is\nthe supplied value.", + "minimum": 0 + }, + "pattern": { + "type": "string" } } }, - "HandlerMetadata": { - "type": "object", - "description": "Handler metadata", - "required": [ - "name", - "input_description", - "output_description" - ], - "properties": { - "abort_timeout": { - "type": [ - "string", - "null" - ], - "description": "# Abort timeout\n\nThis timer guards against stalled service/handler invocations that are supposed to\nterminate. The abort timeout is started after the 'inactivity timeout' has expired\nand the service/handler invocation has been asked to gracefully terminate. Once the\ntimer expires, it will abort the service/handler invocation.\n\nThis timer potentially **interrupts** user code. If the user code needs longer to\ngracefully terminate, then this value needs to be set accordingly.\n\nCan be configured using the [`jiff::fmt::friendly`](https://docs.rs/jiff/latest/jiff/fmt/friendly/index.html) format or ISO8601, for example `5 hours`.\n\nIf set, it overrides the value set in the service." - }, - "documentation": { - "type": [ - "string", - "null" - ], - "description": "# Documentation\n\nDocumentation of the handler, as propagated by the SDKs." - }, - "enable_lazy_state": { - "type": [ - "boolean", - "null" + "DeploymentId": { + "type": "string" + }, + "DeploymentResponse": { + "oneOf": [ + { + "type": "object", + "title": "HttpDeploymentResponse", + "description": "Deployment response for HTTP deployments", + "required": [ + "id", + "uri", + "protocol_type", + "http_version", + "created_at", + "min_protocol_version", + "max_protocol_version", + "services" ], - "description": "# Enable lazy state\n\nIf true, lazy state will be enabled for all invocations to this service.\nThis is relevant only for Workflows and Virtual Objects.\n\nIf set, it overrides the value set in the service." + "properties": { + "additional_headers": { + "$ref": "#/components/schemas/SerdeableHeaderHashMap", + "description": "# Additional headers\n\nAdditional headers used to invoke this service deployment." + }, + "created_at": { + "type": "string" + }, + "http_version": { + "type": "string", + "description": "# HTTP Version\n\nHTTP Version used to invoke this service deployment." + }, + "id": { + "$ref": "#/components/schemas/DeploymentId", + "description": "# Deployment ID" + }, + "info": { + "type": "array", + "items": { + "$ref": "#/components/schemas/SchemaInfo" + }, + "description": "# Info\n\nList of configuration/deprecation information related to this deployment." + }, + "max_protocol_version": { + "type": "integer", + "format": "int32", + "description": "# Maximum Service Protocol version\n\nDuring registration, the SDKs declare a range from minimum (included) to maximum (included) Service Protocol supported version." + }, + "metadata": { + "type": "object", + "description": "# Metadata\n\nDeployment metadata.", + "additionalProperties": { + "type": "string" + }, + "propertyNames": { + "type": "string" + } + }, + "min_protocol_version": { + "type": "integer", + "format": "int32", + "description": "# Minimum Service Protocol version\n\nDuring registration, the SDKs declare a range from minimum (included) to maximum (included) Service Protocol supported version." + }, + "protocol_type": { + "$ref": "#/components/schemas/ProtocolType", + "description": "# Protocol Type\n\nProtocol type used to invoke this service deployment." + }, + "sdk_version": { + "type": [ + "string", + "null" + ], + "description": "# SDK version\n\nSDK library and version declared during registration." + }, + "services": { + "type": "array", + "items": { + "$ref": "#/components/schemas/ServiceNameRevPair" + }, + "description": "# Services\n\nList of services exposed by this deployment." + }, + "uri": { + "type": "string", + "format": "uri", + "description": "# Deployment URI\n\nURI used to invoke this service deployment." + } + } }, - "idempotency_retention": { - "type": [ - "string", - "null" + { + "type": "object", + "title": "LambdaDeploymentResponse", + "description": "Deployment response for Lambda deployments", + "required": [ + "id", + "arn", + "created_at", + "min_protocol_version", + "max_protocol_version", + "services" ], - "description": "# Idempotency retention\n\nThe retention duration of idempotent requests for this handler. If set, it overrides the value set in the service.\n\nCan be configured using the [`jiff::fmt::friendly`](https://docs.rs/jiff/latest/jiff/fmt/friendly/index.html) format or ISO8601, for example `5 hours`." - }, - "inactivity_timeout": { - "type": [ - "string", - "null" - ], - "description": "# Inactivity timeout\n\nThis timer guards against stalled service/handler invocations. Once it expires,\nRestate triggers a graceful termination by asking the service invocation to\nsuspend (which preserves intermediate progress).\n\nThe 'abort timeout' is used to abort the invocation, in case it doesn't react to\nthe request to suspend.\n\nCan be configured using the [`jiff::fmt::friendly`](https://docs.rs/jiff/latest/jiff/fmt/friendly/index.html) format or ISO8601, for example `5 hours`.\n\nIf set, it overrides the value set in the service." - }, - "info": { - "type": "array", - "items": { - "$ref": "#/components/schemas/SchemaInfo" - }, - "description": "# Info\n\nList of configuration/deprecation information related to this handler." - }, - "input_description": { - "type": "string", - "description": "# Human readable input description\n\nIf empty, no schema was provided by the user at discovery time." - }, - "input_json_schema": { - "description": "# Input JSON Schema\n\nJSON Schema of the handler input" - }, - "journal_retention": { - "type": [ - "string", - "null" - ], - "description": "# Journal retention\n\nThe journal retention. When set, this applies to all requests to this handler.\n\nIn case the invocation has an idempotency key, the `idempotency_retention` caps the maximum `journal_retention` time.\nIn case this handler is a workflow handler, the `workflow_completion_retention` caps the maximum `journal_retention` time.\n\nCan be configured using the [`jiff::fmt::friendly`](https://docs.rs/jiff/latest/jiff/fmt/friendly/index.html) format or ISO8601, for example `5 hours`.\n\nIf set, it overrides the value set in the service." - }, - "metadata": { - "type": "object", - "description": "# Metadata\n\nAdditional handler metadata, as propagated by the SDKs.", - "additionalProperties": { - "type": "string" - }, - "propertyNames": { - "type": "string" - } - }, - "name": { - "type": "string", - "description": "# Name\n\nThe handler name." - }, - "output_description": { - "type": "string", - "description": "# Human readable output description\n\nIf empty, no schema was provided by the user at discovery time." - }, - "output_json_schema": { - "description": "# Output JSON Schema\n\nJSON Schema of the handler output" - }, - "public": { - "type": "boolean", - "description": "# Public\n\nIf true, this handler can be invoked through the ingress.\nIf false, this handler can be invoked only from another Restate service." - }, - "retry_policy": { - "$ref": "#/components/schemas/HandlerRetryPolicyMetadata", - "description": "# Retry policy\n\nRetry policy overrides applied for this handler." - }, - "ty": { - "oneOf": [ - { - "type": "null" + "properties": { + "additional_headers": { + "$ref": "#/components/schemas/SerdeableHeaderHashMap", + "description": "# Additional headers\n\nAdditional headers used to invoke this service deployment." }, - { - "$ref": "#/components/schemas/HandlerMetadataType", - "description": "# Type\n\nThe handler type." + "arn": { + "$ref": "#/components/schemas/LambdaARN", + "description": "# Lambda ARN\n\nLambda ARN used to invoke this service deployment." + }, + "assume_role_arn": { + "type": [ + "string", + "null" + ], + "description": "# Assume role ARN\n\nAssume role ARN used to invoke this deployment. Check https://docs.restate.dev/category/aws-lambda for more details." + }, + "compression": { + "oneOf": [ + { + "type": "null" + }, + { + "$ref": "#/components/schemas/EndpointLambdaCompression", + "description": "# Compression\n\nCompression algorithm used for invoking Lambda." + } + ] + }, + "created_at": { + "type": "string" + }, + "id": { + "$ref": "#/components/schemas/DeploymentId", + "description": "# Deployment ID" + }, + "info": { + "type": "array", + "items": { + "$ref": "#/components/schemas/SchemaInfo" + }, + "description": "# Info\n\nList of configuration/deprecation information related to this deployment." + }, + "max_protocol_version": { + "type": "integer", + "format": "int32", + "description": "# Maximum Service Protocol version\n\nDuring registration, the SDKs declare a range from minimum (included) to maximum (included) Service Protocol supported version." + }, + "metadata": { + "type": "object", + "description": "# Metadata\n\nDeployment metadata.", + "additionalProperties": { + "type": "string" + }, + "propertyNames": { + "type": "string" + } + }, + "min_protocol_version": { + "type": "integer", + "format": "int32", + "description": "# Minimum Service Protocol version\n\nDuring registration, the SDKs declare a range from minimum (included) to maximum (included) Service Protocol supported version." + }, + "sdk_version": { + "type": [ + "string", + "null" + ], + "description": "# SDK version\n\nSDK library and version declared during registration." + }, + "services": { + "type": "array", + "items": { + "$ref": "#/components/schemas/ServiceNameRevPair" + }, + "description": "# Services\n\nList of services exposed by this deployment." } - ] + } } - } - }, - "HandlerMetadataType": { - "type": "string", - "enum": [ - "Exclusive", - "Shared", - "Workflow" ] }, - "HandlerRetryPolicyMetadata": { - "type": "object", - "description": "# Handler retry policy overrides", - "properties": { - "exponentiation_factor": { - "type": [ - "number", - "null" - ], - "format": "float", - "description": "# Factor\n\nThe factor to use to compute the next retry attempt." - }, - "initial_interval": { - "type": [ - "string", - "null" - ], - "description": "# Initial Interval\n\nInitial interval for the first retry attempt.\n\nCan be configured using the [`jiff::fmt::friendly`](https://docs.rs/jiff/latest/jiff/fmt/friendly/index.html) format or ISO8601, for example `5 hours`." - }, - "max_attempts": { - "type": [ - "integer", - "null" + "DetailedDeploymentResponse": { + "oneOf": [ + { + "type": "object", + "title": "HttpDetailedDeploymentResponse", + "description": "Detailed deployment response for HTTP deployments", + "required": [ + "id", + "uri", + "protocol_type", + "http_version", + "created_at", + "min_protocol_version", + "max_protocol_version", + "services" ], - "description": "# Max attempts\n\nNumber of maximum attempts (including the initial) before giving up. Infinite retries if unset. No retries if set to 1.", - "minimum": 1 + "properties": { + "additional_headers": { + "$ref": "#/components/schemas/SerdeableHeaderHashMap", + "description": "# Additional headers\n\nAdditional headers used to invoke this service deployment." + }, + "created_at": { + "type": "string" + }, + "http_version": { + "type": "string", + "description": "# HTTP Version\n\nHTTP Version used to invoke this service deployment." + }, + "id": { + "$ref": "#/components/schemas/DeploymentId", + "description": "# Deployment ID" + }, + "info": { + "type": "array", + "items": { + "$ref": "#/components/schemas/SchemaInfo" + }, + "description": "# Info\n\nList of configuration/deprecation information related to this deployment." + }, + "max_protocol_version": { + "type": "integer", + "format": "int32", + "description": "# Maximum Service Protocol version\n\nDuring registration, the SDKs declare a range from minimum (included) to maximum (included) Service Protocol supported version." + }, + "metadata": { + "type": "object", + "description": "# Metadata\n\nDeployment metadata.", + "additionalProperties": { + "type": "string" + }, + "propertyNames": { + "type": "string" + } + }, + "min_protocol_version": { + "type": "integer", + "format": "int32", + "description": "# Minimum Service Protocol version\n\nDuring registration, the SDKs declare a range from minimum (included) to maximum (included) Service Protocol supported version." + }, + "protocol_type": { + "$ref": "#/components/schemas/ProtocolType", + "description": "# Protocol Type\n\nProtocol type used to invoke this service deployment." + }, + "sdk_version": { + "type": [ + "string", + "null" + ], + "description": "# SDK version\n\nSDK library and version declared during registration." + }, + "services": { + "type": "array", + "items": { + "$ref": "#/components/schemas/ServiceMetadata" + }, + "description": "# Services\n\nList of services exposed by this deployment." + }, + "uri": { + "type": "string", + "format": "uri", + "description": "# Deployment URI\n\nURI used to invoke this service deployment." + } + } }, - "max_interval": { - "type": [ - "string", - "null" + { + "type": "object", + "title": "LambdaDetailedDeploymentResponse", + "description": "Detailed deployment response for Lambda deployments", + "required": [ + "id", + "arn", + "created_at", + "min_protocol_version", + "max_protocol_version", + "services" ], - "description": "# Max interval\n\nMaximum interval between retries.\n\nCan be configured using the [`jiff::fmt::friendly`](https://docs.rs/jiff/latest/jiff/fmt/friendly/index.html) format or ISO8601, for example `5 hours`." - }, - "on_max_attempts": { - "oneOf": [ - { - "type": "null" + "properties": { + "additional_headers": { + "$ref": "#/components/schemas/SerdeableHeaderHashMap", + "description": "# Additional headers\n\nAdditional headers used to invoke this service deployment." }, - { - "$ref": "#/components/schemas/OnMaxAttempts", - "description": "# On max attempts\n\nBehavior when max attempts are reached." + "arn": { + "$ref": "#/components/schemas/LambdaARN", + "description": "# Lambda ARN\n\nLambda ARN used to invoke this service deployment." + }, + "assume_role_arn": { + "type": [ + "string", + "null" + ], + "description": "# Assume role ARN\n\nAssume role ARN used to invoke this deployment. Check https://docs.restate.dev/category/aws-lambda for more details." + }, + "compression": { + "oneOf": [ + { + "type": "null" + }, + { + "$ref": "#/components/schemas/EndpointLambdaCompression", + "description": "# Compression\n\nCompression algorithm used for invoking Lambda." + } + ] + }, + "created_at": { + "type": "string" + }, + "id": { + "$ref": "#/components/schemas/DeploymentId", + "description": "# Deployment ID" + }, + "info": { + "type": "array", + "items": { + "$ref": "#/components/schemas/SchemaInfo" + }, + "description": "# Info\n\nList of configuration/deprecation information related to this deployment." + }, + "max_protocol_version": { + "type": "integer", + "format": "int32", + "description": "# Maximum Service Protocol version\n\nDuring registration, the SDKs declare a range from minimum (included) to maximum (included) Service Protocol supported version." + }, + "metadata": { + "type": "object", + "description": "# Metadata\n\nDeployment metadata.", + "additionalProperties": { + "type": "string" + }, + "propertyNames": { + "type": "string" + } + }, + "min_protocol_version": { + "type": "integer", + "format": "int32", + "description": "# Minimum Service Protocol version\n\nDuring registration, the SDKs declare a range from minimum (included) to maximum (included) Service Protocol supported version." + }, + "sdk_version": { + "type": [ + "string", + "null" + ], + "description": "# SDK version\n\nSDK library and version declared during registration." + }, + "services": { + "type": "array", + "items": { + "$ref": "#/components/schemas/ServiceMetadata" + }, + "description": "# Services\n\nList of services exposed by this deployment." } - ] + } } - } + ], + "description": "Detailed information about Restate deployments" }, - "HttpDeploymentResponse": { + "EmbeddedMetadataClusterHealth": { "type": "object", - "description": "Deployment response for HTTP deployments", "required": [ - "id", - "uri", - "protocol_type", - "http_version", - "created_at", - "min_protocol_version", - "max_protocol_version", - "services" + "members" ], "properties": { - "additional_headers": { - "$ref": "#/components/schemas/SerdeableHeaderHashMap", - "description": "# Additional headers\n\nAdditional headers used to invoke this service deployment." - }, - "created_at": { - "type": "string" - }, - "http_version": { - "type": "string", - "description": "# HTTP Version\n\nHTTP Version used to invoke this service deployment." - }, - "id": { - "$ref": "#/components/schemas/DeploymentId", - "description": "# Deployment ID" - }, - "info": { - "type": "array", - "items": { - "$ref": "#/components/schemas/SchemaInfo" - }, - "description": "# Info\n\nList of configuration/deprecation information related to this deployment." - }, - "max_protocol_version": { - "type": "integer", - "format": "int32", - "description": "# Maximum Service Protocol version\n\nDuring registration, the SDKs declare a range from minimum (included) to maximum (included) Service Protocol supported version." - }, - "metadata": { - "type": "object", - "description": "# Metadata\n\nDeployment metadata.", - "additionalProperties": { - "type": "string" - }, - "propertyNames": { - "type": "string" - } - }, - "min_protocol_version": { - "type": "integer", - "format": "int32", - "description": "# Minimum Service Protocol version\n\nDuring registration, the SDKs declare a range from minimum (included) to maximum (included) Service Protocol supported version." - }, - "protocol_type": { - "$ref": "#/components/schemas/ProtocolType", - "description": "# Protocol Type\n\nProtocol type used to invoke this service deployment." - }, - "sdk_version": { - "type": [ - "string", - "null" - ], - "description": "# SDK version\n\nSDK library and version declared during registration." - }, - "services": { + "members": { "type": "array", "items": { - "$ref": "#/components/schemas/ServiceNameRevPair" + "$ref": "#/components/schemas/PlainNodeId" }, - "description": "# Services\n\nList of services exposed by this deployment." - }, - "uri": { - "type": "string", - "format": "uri", - "description": "# Deployment URI\n\nURI used to invoke this service deployment." + "description": "Current members of the embedded metadata cluster" } } }, - "HttpDetailedDeploymentResponse": { + "EndpointLambdaCompression": { + "type": "string", + "description": "Lambda compression", + "enum": [ + "Zstd" + ] + }, + "ErrorDescriptionResponse": { "type": "object", - "description": "Detailed deployment response for HTTP deployments", + "description": "# Error description response\n\nError details of the response", "required": [ - "id", - "uri", - "protocol_type", - "http_version", - "created_at", - "min_protocol_version", - "max_protocol_version", - "services" + "message" ], "properties": { - "additional_headers": { - "$ref": "#/components/schemas/SerdeableHeaderHashMap", - "description": "# Additional headers\n\nAdditional headers used to invoke this service deployment." - }, - "created_at": { + "message": { "type": "string" }, - "http_version": { - "type": "string", - "description": "# HTTP Version\n\nHTTP Version used to invoke this service deployment." - }, - "id": { - "$ref": "#/components/schemas/DeploymentId", - "description": "# Deployment ID" - }, - "info": { - "type": "array", - "items": { - "$ref": "#/components/schemas/SchemaInfo" - }, - "description": "# Info\n\nList of configuration/deprecation information related to this deployment." - }, - "max_protocol_version": { - "type": "integer", - "format": "int32", - "description": "# Maximum Service Protocol version\n\nDuring registration, the SDKs declare a range from minimum (included) to maximum (included) Service Protocol supported version." - }, - "metadata": { - "type": "object", - "description": "# Metadata\n\nDeployment metadata.", - "additionalProperties": { - "type": "string" - }, - "propertyNames": { - "type": "string" - } - }, - "min_protocol_version": { - "type": "integer", - "format": "int32", - "description": "# Minimum Service Protocol version\n\nDuring registration, the SDKs declare a range from minimum (included) to maximum (included) Service Protocol supported version." - }, - "protocol_type": { - "$ref": "#/components/schemas/ProtocolType", - "description": "# Protocol Type\n\nProtocol type used to invoke this service deployment." - }, - "sdk_version": { + "restate_code": { "type": [ "string", "null" ], - "description": "# SDK version\n\nSDK library and version declared during registration." - }, - "services": { - "type": "array", - "items": { - "$ref": "#/components/schemas/ServiceMetadata" - }, - "description": "# Services\n\nList of services exposed by this deployment." - }, - "uri": { - "type": "string", - "format": "uri", - "description": "# Deployment URI\n\nURI used to invoke this service deployment." + "description": "# Restate code\n\nRestate error code describing this error" } } }, - "KafkaClusterName": { - "type": "string", - "format": "hostname", - "description": "# Kafka cluster name\n\nValid name to use as a kafka cluster identifier. MUST conform a valid hostname format." - }, - "KafkaClusterResponse": { + "HandlerMetadata": { "type": "object", - "description": "Kafka cluster details with subscriptions.", + "description": "Handler metadata", "required": [ "name", - "properties", - "created_at", - "subscriptions" - ], - "properties": { - "created_at": { - "type": "string", - "description": "# Created at\n\nWhen the Kafka cluster configuration was created." - }, - "info": { - "type": "array", - "items": { - "$ref": "#/components/schemas/SchemaInfo" - }, - "description": "# Info\n\nList of configuration/deprecation information related to this deployment." - }, - "name": { - "$ref": "#/components/schemas/KafkaClusterName", - "description": "# Cluster Name\n\nName for the Kafka cluster, used to identify this Kafka cluster configuration in subscriptions. Must be a valid hostname format." - }, - "properties": { - "type": "object", - "description": "# Properties\n\nProperties for connecting to the kafka cluster.\n\nFor a full list of configuration properties, check the [librdkafka documentation](https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md).", - "additionalProperties": { - "type": "string" - }, - "propertyNames": { - "type": "string" - } - }, - "subscriptions": { - "type": "array", - "items": { - "$ref": "#/components/schemas/SubscriptionResponse" - }, - "description": "# Subscriptions\n\nSubscriptions to this Kafka cluster, returned only when `include_subscriptions` is enabled." - } - } - }, - "LambdaARN": { - "type": "string" - }, - "LambdaDeploymentResponse": { - "type": "object", - "description": "Deployment response for Lambda deployments", - "required": [ - "id", - "arn", - "created_at", - "min_protocol_version", - "max_protocol_version", - "services" + "input_description", + "output_description" ], "properties": { - "additional_headers": { - "$ref": "#/components/schemas/SerdeableHeaderHashMap", - "description": "# Additional headers\n\nAdditional headers used to invoke this service deployment." - }, - "arn": { - "$ref": "#/components/schemas/LambdaARN", - "description": "# Lambda ARN\n\nLambda ARN used to invoke this service deployment." + "abort_timeout": { + "type": [ + "string", + "null" + ], + "description": "# Abort timeout\n\nThis timer guards against stalled service/handler invocations that are supposed to\nterminate. The abort timeout is started after the 'inactivity timeout' has expired\nand the service/handler invocation has been asked to gracefully terminate. Once the\ntimer expires, it will abort the service/handler invocation.\n\nThis timer potentially **interrupts** user code. If the user code needs longer to\ngracefully terminate, then this value needs to be set accordingly.\n\nCan be configured using the [`jiff::fmt::friendly`](https://docs.rs/jiff/latest/jiff/fmt/friendly/index.html) format or ISO8601, for example `5 hours`.\n\nIf set, it overrides the value set in the service." }, - "assume_role_arn": { + "documentation": { "type": [ "string", "null" ], - "description": "# Assume role ARN\n\nAssume role ARN used to invoke this deployment. Check https://docs.restate.dev/category/aws-lambda for more details." + "description": "# Documentation\n\nDocumentation of the handler, as propagated by the SDKs." }, - "compression": { - "oneOf": [ - { - "type": "null" - }, - { - "$ref": "#/components/schemas/EndpointLambdaCompression", - "description": "# Compression\n\nCompression algorithm used for invoking Lambda." - } - ] + "enable_lazy_state": { + "type": [ + "boolean", + "null" + ], + "description": "# Enable lazy state\n\nIf true, lazy state will be enabled for all invocations to this service.\nThis is relevant only for Workflows and Virtual Objects.\n\nIf set, it overrides the value set in the service." }, - "created_at": { - "type": "string" + "idempotency_retention": { + "type": [ + "string", + "null" + ], + "description": "# Idempotency retention\n\nThe retention duration of idempotent requests for this handler. If set, it overrides the value set in the service.\n\nCan be configured using the [`jiff::fmt::friendly`](https://docs.rs/jiff/latest/jiff/fmt/friendly/index.html) format or ISO8601, for example `5 hours`." }, - "id": { - "$ref": "#/components/schemas/DeploymentId", - "description": "# Deployment ID" + "inactivity_timeout": { + "type": [ + "string", + "null" + ], + "description": "# Inactivity timeout\n\nThis timer guards against stalled service/handler invocations. Once it expires,\nRestate triggers a graceful termination by asking the service invocation to\nsuspend (which preserves intermediate progress).\n\nThe 'abort timeout' is used to abort the invocation, in case it doesn't react to\nthe request to suspend.\n\nCan be configured using the [`jiff::fmt::friendly`](https://docs.rs/jiff/latest/jiff/fmt/friendly/index.html) format or ISO8601, for example `5 hours`.\n\nIf set, it overrides the value set in the service." }, "info": { "type": "array", "items": { "$ref": "#/components/schemas/SchemaInfo" }, - "description": "# Info\n\nList of configuration/deprecation information related to this deployment." + "description": "# Info\n\nList of configuration/deprecation information related to this handler." }, - "max_protocol_version": { - "type": "integer", - "format": "int32", - "description": "# Maximum Service Protocol version\n\nDuring registration, the SDKs declare a range from minimum (included) to maximum (included) Service Protocol supported version." + "input_description": { + "type": "string", + "description": "# Human readable input description\n\nIf empty, no schema was provided by the user at discovery time." + }, + "input_json_schema": { + "description": "# Input JSON Schema\n\nJSON Schema of the handler input" + }, + "journal_retention": { + "type": [ + "string", + "null" + ], + "description": "# Journal retention\n\nThe journal retention. When set, this applies to all requests to this handler.\n\nIn case the invocation has an idempotency key, the `idempotency_retention` caps the maximum `journal_retention` time.\nIn case this handler is a workflow handler, the `workflow_completion_retention` caps the maximum `journal_retention` time.\n\nCan be configured using the [`jiff::fmt::friendly`](https://docs.rs/jiff/latest/jiff/fmt/friendly/index.html) format or ISO8601, for example `5 hours`.\n\nIf set, it overrides the value set in the service." }, "metadata": { "type": "object", - "description": "# Metadata\n\nDeployment metadata.", + "description": "# Metadata\n\nAdditional handler metadata, as propagated by the SDKs.", "additionalProperties": { "type": "string" }, @@ -2847,71 +2988,111 @@ "type": "string" } }, - "min_protocol_version": { - "type": "integer", - "format": "int32", - "description": "# Minimum Service Protocol version\n\nDuring registration, the SDKs declare a range from minimum (included) to maximum (included) Service Protocol supported version." + "name": { + "type": "string", + "description": "# Name\n\nThe handler name." }, - "sdk_version": { - "type": [ - "string", - "null" - ], - "description": "# SDK version\n\nSDK library and version declared during registration." + "output_description": { + "type": "string", + "description": "# Human readable output description\n\nIf empty, no schema was provided by the user at discovery time." }, - "services": { - "type": "array", - "items": { - "$ref": "#/components/schemas/ServiceNameRevPair" - }, - "description": "# Services\n\nList of services exposed by this deployment." + "output_json_schema": { + "description": "# Output JSON Schema\n\nJSON Schema of the handler output" + }, + "public": { + "type": "boolean", + "description": "# Public\n\nIf true, this handler can be invoked through the ingress.\nIf false, this handler can be invoked only from another Restate service." + }, + "retry_policy": { + "$ref": "#/components/schemas/HandlerRetryPolicyMetadata", + "description": "# Retry policy\n\nRetry policy overrides applied for this handler." + }, + "ty": { + "oneOf": [ + { + "type": "null" + }, + { + "$ref": "#/components/schemas/HandlerMetadataType", + "description": "# Type\n\nThe handler type." + } + ] } } }, - "LambdaDetailedDeploymentResponse": { + "HandlerMetadataType": { + "type": "string", + "enum": [ + "Exclusive", + "Shared", + "Workflow" + ] + }, + "HandlerRetryPolicyMetadata": { "type": "object", - "description": "Detailed deployment response for Lambda deployments", - "required": [ - "id", - "arn", - "created_at", - "min_protocol_version", - "max_protocol_version", - "services" - ], + "description": "# Handler retry policy overrides", "properties": { - "additional_headers": { - "$ref": "#/components/schemas/SerdeableHeaderHashMap", - "description": "# Additional headers\n\nAdditional headers used to invoke this service deployment." + "exponentiation_factor": { + "type": [ + "number", + "null" + ], + "format": "float", + "description": "# Factor\n\nThe factor to use to compute the next retry attempt." + }, + "initial_interval": { + "type": [ + "string", + "null" + ], + "description": "# Initial Interval\n\nInitial interval for the first retry attempt.\n\nCan be configured using the [`jiff::fmt::friendly`](https://docs.rs/jiff/latest/jiff/fmt/friendly/index.html) format or ISO8601, for example `5 hours`." }, - "arn": { - "$ref": "#/components/schemas/LambdaARN", - "description": "# Lambda ARN\n\nLambda ARN used to invoke this service deployment." + "max_attempts": { + "type": [ + "integer", + "null" + ], + "description": "# Max attempts\n\nNumber of maximum attempts (including the initial) before giving up. Infinite retries if unset. No retries if set to 1.", + "minimum": 1 }, - "assume_role_arn": { + "max_interval": { "type": [ "string", "null" ], - "description": "# Assume role ARN\n\nAssume role ARN used to invoke this deployment. Check https://docs.restate.dev/category/aws-lambda for more details." + "description": "# Max interval\n\nMaximum interval between retries.\n\nCan be configured using the [`jiff::fmt::friendly`](https://docs.rs/jiff/latest/jiff/fmt/friendly/index.html) format or ISO8601, for example `5 hours`." }, - "compression": { + "on_max_attempts": { "oneOf": [ { "type": "null" }, { - "$ref": "#/components/schemas/EndpointLambdaCompression", - "description": "# Compression\n\nCompression algorithm used for invoking Lambda." + "$ref": "#/components/schemas/OnMaxAttempts", + "description": "# On max attempts\n\nBehavior when max attempts are reached." } ] - }, + } + } + }, + "KafkaClusterName": { + "type": "string", + "format": "hostname", + "description": "# Kafka cluster name\n\nValid name to use as a kafka cluster identifier. MUST conform a valid hostname format." + }, + "KafkaClusterResponse": { + "type": "object", + "description": "Kafka cluster details with subscriptions.", + "required": [ + "name", + "properties", + "created_at", + "subscriptions" + ], + "properties": { "created_at": { - "type": "string" - }, - "id": { - "$ref": "#/components/schemas/DeploymentId", - "description": "# Deployment ID" + "type": "string", + "description": "# Created at\n\nWhen the Kafka cluster configuration was created." }, "info": { "type": "array", @@ -2920,14 +3101,13 @@ }, "description": "# Info\n\nList of configuration/deprecation information related to this deployment." }, - "max_protocol_version": { - "type": "integer", - "format": "int32", - "description": "# Maximum Service Protocol version\n\nDuring registration, the SDKs declare a range from minimum (included) to maximum (included) Service Protocol supported version." + "name": { + "$ref": "#/components/schemas/KafkaClusterName", + "description": "# Cluster Name\n\nName for the Kafka cluster, used to identify this Kafka cluster configuration in subscriptions. Must be a valid hostname format." }, - "metadata": { + "properties": { "type": "object", - "description": "# Metadata\n\nDeployment metadata.", + "description": "# Properties\n\nProperties for connecting to the kafka cluster.\n\nFor a full list of configuration properties, check the [librdkafka documentation](https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md).", "additionalProperties": { "type": "string" }, @@ -2935,27 +3115,18 @@ "type": "string" } }, - "min_protocol_version": { - "type": "integer", - "format": "int32", - "description": "# Minimum Service Protocol version\n\nDuring registration, the SDKs declare a range from minimum (included) to maximum (included) Service Protocol supported version." - }, - "sdk_version": { - "type": [ - "string", - "null" - ], - "description": "# SDK version\n\nSDK library and version declared during registration." - }, - "services": { + "subscriptions": { "type": "array", "items": { - "$ref": "#/components/schemas/ServiceMetadata" + "$ref": "#/components/schemas/SubscriptionResponse" }, - "description": "# Services\n\nList of services exposed by this deployment." + "description": "# Subscriptions\n\nSubscriptions to this Kafka cluster, returned only when `include_subscriptions` is enabled." } } }, + "LambdaARN": { + "type": "string" + }, "ListDeploymentsResponse": { "type": "object", "description": "List of all registered deployments", @@ -3104,6 +3275,13 @@ "type": "string", "description": "# Service key\n\nTo what virtual object key to apply this change" }, + "scope": { + "type": [ + "string", + "null" + ], + "description": "# Scope\n\nOptional scope for the virtual object instance. When set, targets the scoped\ninstance instead of the unscoped one. Since v1.7.0." + }, "version": { "type": [ "string", @@ -3125,6 +3303,57 @@ "format": "int32", "minimum": 0 }, + "Precondition": { + "oneOf": [ + { + "type": "object", + "required": [ + "type" + ], + "properties": { + "type": { + "type": "string", + "enum": [ + "none" + ] + } + } + }, + { + "type": "object", + "required": [ + "version", + "type" + ], + "properties": { + "type": { + "type": "string", + "enum": [ + "matches" + ] + }, + "version": { + "$ref": "#/components/schemas/Version" + } + } + }, + { + "type": "object", + "required": [ + "type" + ], + "properties": { + "type": { + "type": "string", + "enum": [ + "does_not_exist" + ] + } + } + } + ], + "description": "Optimistic-concurrency guard for a [`RuleChange`].\n\n- [`Precondition::None`] applies the change unconditionally.\n- [`Precondition::Matches`] requires the rule to be present at the\n given version; otherwise the change is rejected with\n [`RuleBookError::PreconditionFailed`].\n- [`Precondition::DoesNotExist`] requires the rule to be absent.\n Combined with `Upsert` this is a pure insert." + }, "ProtocolType": { "type": "string", "enum": [ @@ -3329,6 +3558,45 @@ } } }, + "RuleResponse": { + "type": "object", + "required": [ + "pattern", + "limits", + "disabled", + "version", + "last_modified_millis_since_epoch" + ], + "properties": { + "description": { + "type": [ + "string", + "null" + ] + }, + "disabled": { + "type": "boolean" + }, + "last_modified_millis_since_epoch": { + "type": "integer", + "format": "int64", + "description": "Millis since UNIX epoch.", + "minimum": 0 + }, + "limits": { + "$ref": "#/components/schemas/UserLimits" + }, + "pattern": { + "type": "string" + }, + "version": { + "type": "integer", + "format": "int32", + "description": "Per-rule version: bumped on runtime-relevant changes.", + "minimum": 0 + } + } + }, "SchemaInfo": { "type": "object", "required": [ @@ -3687,15 +3955,78 @@ } } }, + "UpsertRuleRequest": { + "type": "object", + "description": "One entry in the body of `PUT /limits/rules`.\n\nEach entry carries a fully-specified rule body plus an optional\n[`Precondition`]. Omitting the `precondition` field defaults to\n`Precondition::None` (unconditional upsert).", + "required": [ + "pattern" + ], + "properties": { + "description": { + "type": [ + "string", + "null" + ], + "description": "Free-form description shown in the rule book; not consulted at\nruntime." + }, + "disabled": { + "type": "boolean", + "description": "Soft-tombstone toggle. `true` parks the rule (the runtime treats\nit as absent) without removing it." + }, + "limits": { + "$ref": "#/components/schemas/UserLimits" + }, + "pattern": { + "type": "string", + "description": "The pattern that selects which scope/limit-key combinations the\nrule applies to. Examples: `\"*\"`, `\"scope1/*\"`, `\"scope1/foo/bar\"`." + }, + "precondition": { + "$ref": "#/components/schemas/Precondition", + "description": "Optimistic-concurrency guard. `{ \"type\": \"matches\", \"version\": v }`\nrequires the rule's current version to be `v`;\n`{ \"type\": \"does_not_exist\" }` requires the rule to be absent\n(strict insert); `{ \"type\": \"none\" }` (or omitted) is\nunconditional." + } + } + }, + "UserLimits": { + "type": "object", + "description": "Per-rule effective limits.\n\n`None` on a field means \"unlimited\" (no rule constrains this dimension).\nUnder the `bilrost` feature this type is also the wire shape persisted\ninside [`crate::PersistedRule`]; under `serde` it's the JSON wire shape\nfor the admin REST model — adding a new limit kind here means allocating\na fresh `bilrost(tag(...))` next to the new field.", + "properties": { + "action_concurrency": { + "type": [ + "integer", + "null" + ], + "format": "int32", + "description": "Maximum concurrent invocations. `None` means unlimited.", + "minimum": 1 + } + } + }, + "Version": { + "type": "integer", + "format": "int32", + "description": "A type used for versioned metadata.", + "minimum": 0 + }, "VersionInformation": { "type": "object", "description": "Admin API version information", "required": [ "version", "min_admin_api_version", - "max_admin_api_version" + "max_admin_api_version", + "features" ], "properties": { + "features": { + "type": "object", + "description": "# Restate experimental features\n\nList experimental features with their\nenabled state.", + "additionalProperties": { + "type": "boolean" + }, + "propertyNames": { + "type": "string" + } + }, "ingress_endpoint": { "oneOf": [ { @@ -3781,6 +4112,16 @@ } } } + }, + "UnprocessableEntity": { + "description": "Unprocessable entity", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ErrorDescriptionResponse" + } + } + } } } }, @@ -3832,6 +4173,10 @@ { "name": "introspection", "description": "System introspection" + }, + { + "name": "rule", + "description": "Limiter rule book management" } ], "externalDocs": { From 2ea0e612ea22b0b611c458bf28844b56b6229772 Mon Sep 17 00:00:00 2001 From: slinkydeveloper Date: Fri, 22 May 2026 10:56:58 +0200 Subject: [PATCH 2/5] Await rule book --- .../sdktesting/tests/ConcurrencyLimitTest.kt | 10 ++- .../sdktesting/tests/ScopeIsolationTest.kt | 31 ++++---- .../dev/restate/sdktesting/tests/utils.kt | 72 +++++++++++++++++++ 3 files changed, 94 insertions(+), 19 deletions(-) diff --git a/e2e-tests/src/main/kotlin/dev/restate/sdktesting/tests/ConcurrencyLimitTest.kt b/e2e-tests/src/main/kotlin/dev/restate/sdktesting/tests/ConcurrencyLimitTest.kt index 705a7b30..bf447706 100644 --- a/e2e-tests/src/main/kotlin/dev/restate/sdktesting/tests/ConcurrencyLimitTest.kt +++ b/e2e-tests/src/main/kotlin/dev/restate/sdktesting/tests/ConcurrencyLimitTest.kt @@ -32,7 +32,6 @@ import dev.restate.serde.TypeTag import java.net.URI import java.util.UUID import kotlin.time.Duration.Companion.seconds -import kotlinx.coroutines.delay import kotlinx.serialization.json.Json import org.assertj.core.api.Assertions.assertThat import org.awaitility.kotlin.await @@ -105,7 +104,10 @@ class ConcurrencyLimitTest { val invocationCount = 4 val blockerKeys = (0 until invocationCount).map { "block-key-$runId-$it" } - upsertActionConcurrencyRule(adminURI, pattern = scope, actionConcurrency = limit) + val ruleVersion = + upsertActionConcurrencyRule(adminURI, pattern = scope, actionConcurrency = limit) + .version + awaitRuleBookApplied(adminURI, ruleVersion) val outerIds = blockerKeys.map { key -> @@ -121,10 +123,6 @@ class ConcurrencyLimitTest { { assertThat(getAllInvocations(adminURI, blockerTargetFilter)).hasSize(limit) } - // Briefly confirm the count stays at `limit`, defending against a slow rule activation - // where all 4 would have started before our first check. - delay(1.seconds) - assertThat(getAllInvocations(adminURI, blockerTargetFilter)).hasSize(limit) // Resolve one awakeable at a time. After each resolve, a held outer becomes running and // spawns its Blocker. diff --git a/e2e-tests/src/main/kotlin/dev/restate/sdktesting/tests/ScopeIsolationTest.kt b/e2e-tests/src/main/kotlin/dev/restate/sdktesting/tests/ScopeIsolationTest.kt index 00940013..1eb56392 100644 --- a/e2e-tests/src/main/kotlin/dev/restate/sdktesting/tests/ScopeIsolationTest.kt +++ b/e2e-tests/src/main/kotlin/dev/restate/sdktesting/tests/ScopeIsolationTest.kt @@ -34,24 +34,19 @@ import org.junit.jupiter.api.extension.RegisterExtension class ScopeIsolationTest { @Service - @Name("ScopedEcho") - class ScopedEcho { - /** - * Returns a fresh random value via `runBlock` per invocation. Used to prove the two scoped - * calls actually executed independently rather than the second one being deduplicated to the - * first one's stored result. - */ - @Handler suspend fun echo(input: String): String = runBlock { UUID.randomUUID().toString() } + @Name("Random") + class Random { + @Handler suspend fun genRandomUUID(input: String) = runBlock { UUID.randomUUID().toString() } } companion object { @RegisterExtension @JvmField val deployerExt: RestateDeployerExtension = RestateDeployerExtension { - withEndpoint(Endpoint.bind(ScopedEcho())) + withEndpoint(Endpoint.bind(Random())) // Scoped invocations require the vqueues experimental feature. - // TODO: drop this once the minimum supported Restate version is v1.8, where vqueues is on by - // default. + // TODO: drop this once the minimum supported Restate version is v1.8, + // where vqueues are enabled by default. withEnv("RESTATE_EXPERIMENTAL_ENABLE_VQUEUES", "true") } } @@ -69,10 +64,20 @@ class ScopeIsolationTest { val idA = sendInvocationWithScope( - ingressURI, scopeA, "ScopedEcho", "echo", body, idempotencyKey = sharedIdempotencyKey) + ingressURI, + scopeA, + "Random", + "genRandomUUID", + body, + idempotencyKey = sharedIdempotencyKey) val idB = sendInvocationWithScope( - ingressURI, scopeB, "ScopedEcho", "echo", body, idempotencyKey = sharedIdempotencyKey) + ingressURI, + scopeB, + "Random", + "genRandomUUID", + body, + idempotencyKey = sharedIdempotencyKey) assertThat(idA).isNotEqualTo(idB) diff --git a/e2e-tests/src/main/kotlin/dev/restate/sdktesting/tests/utils.kt b/e2e-tests/src/main/kotlin/dev/restate/sdktesting/tests/utils.kt index 71d47a79..d44ffae0 100644 --- a/e2e-tests/src/main/kotlin/dev/restate/sdktesting/tests/utils.kt +++ b/e2e-tests/src/main/kotlin/dev/restate/sdktesting/tests/utils.kt @@ -41,6 +41,8 @@ import org.apache.logging.log4j.kotlin.additionalLoggingContext import org.assertj.core.api.Assertions.assertThat import org.awaitility.Awaitility import org.awaitility.core.ConditionFactory +import org.awaitility.kotlin.await +import org.awaitility.kotlin.withAlias import org.testcontainers.Testcontainers private val LOG = LogManager.getLogger("dev.restate.sdktesting.tests") @@ -99,6 +101,17 @@ data class SysJournalEntry(val index: Int, @SerialName("entry_type") val entryTy @Serializable data class SysInvocationEntry(val id: String, val status: String) +/** Data classes for partition_state query result */ +@Serializable +data class PartitionStateQueryResult(val rows: List = emptyList()) + +@Serializable +data class PartitionStateEntry( + @SerialName("partition_id") val partitionId: Long, + @SerialName("plain_node_id") val plainNodeId: String, + @SerialName("applied_rule_book_version") val appliedRuleBookVersion: Long? = null, +) + /** JSON parser with configuration for sys_journal and sys_invocation query results */ private val sysQueryJson = Json { ignoreUnknownKeys = true @@ -187,6 +200,65 @@ suspend fun getAllInvocations(adminURI: URI, filter: String? = null): List(response.body()).rows } +/** + * Queries the partition_state table and returns one row per partition processor. + * + * `applied_rule_book_version` is the version of the rule book currently applied by the partition + * processor (see Restate PR #4783); `null` until the first `UpsertRuleBook` has been observed. + */ +suspend fun getAllPartitionStates(adminURI: URI): List { + val request = + HttpRequest.newBuilder() + .uri(URI.create("http://${adminURI.host}:${adminURI.port}/query")) + .header("accept", "application/json") + .header("content-type", "application/json") + .POST( + HttpRequest.BodyPublishers.ofString( + """{"query": "SELECT partition_id, plain_node_id, applied_rule_book_version FROM partition_state"}""")) + .build() + + val response = + HttpClient.newHttpClient().sendAsync(request, HttpResponse.BodyHandlers.ofString()).await() + + return sysQueryJson.decodeFromString(response.body()).rows +} + +/** + * Block until every partition processor reports `applied_rule_book_version >= expectedVersion`. + * + * After an admin upsert succeeds, the new rule book still has to propagate from the metadata store + * to each partition processor's state machine; without this wait, scoped invocations issued + * immediately after the upsert may race the rule and run unthrottled. + * + * Pass the `version` from the [dev.restate.admin.model.RuleResponse] returned by upsert: for a + * freshly-created rule the per-rule version equals the post-bump rule-book version, so it's a safe + * lower bound to wait for. + */ +suspend fun awaitRuleBookApplied( + adminURI: URI, + expectedVersion: Int, + timeout: Duration = 30.seconds +) { + await withAlias + "partition_state.applied_rule_book_version >= $expectedVersion on all partitions" withTimeout + timeout untilAsserted + { + val states = getAllPartitionStates(adminURI) + assertThat(states).isNotEmpty + assertThat(states).allSatisfy { row -> + assertThat(row.appliedRuleBookVersion) + .withFailMessage( + "partition %d on node %s has not applied rule-book version %d yet (currently %s)", + row.partitionId, + row.plainNodeId, + expectedVersion, + row.appliedRuleBookVersion) + .isNotNull + .isGreaterThanOrEqualTo(expectedVersion.toLong()) + } + } +} + /** * Starts a local Restate HTTP server for a given Endpoint and exposes the port to Testcontainers. * Returns an AutoCloseable handle that contains the URI and closes the server on close(). From 19fdacfeb94f7022cb702affc1534409b2c17442 Mon Sep 17 00:00:00 2001 From: slinkydeveloper Date: Fri, 22 May 2026 13:02:44 +0200 Subject: [PATCH 3/5] fix --- .../sdktesting/tests/ConcurrencyLimitTest.kt | 3 +- .../dev/restate/sdktesting/tests/utils.kt | 56 +++++++++++-------- 2 files changed, 34 insertions(+), 25 deletions(-) diff --git a/e2e-tests/src/main/kotlin/dev/restate/sdktesting/tests/ConcurrencyLimitTest.kt b/e2e-tests/src/main/kotlin/dev/restate/sdktesting/tests/ConcurrencyLimitTest.kt index bf447706..fe5369bb 100644 --- a/e2e-tests/src/main/kotlin/dev/restate/sdktesting/tests/ConcurrencyLimitTest.kt +++ b/e2e-tests/src/main/kotlin/dev/restate/sdktesting/tests/ConcurrencyLimitTest.kt @@ -95,6 +95,7 @@ class ConcurrencyLimitTest { fun actionConcurrencyLimitIsRespected( @InjectIngressURI ingressURI: URI, @InjectAdminURI adminURI: URI, + @InjectContainerHandle(hostName = RESTATE_RUNTIME) runtimeHandle: ContainerHandle, @InjectClient ingressClient: Client, ) = runTest(timeout = 120.seconds) { @@ -107,7 +108,7 @@ class ConcurrencyLimitTest { val ruleVersion = upsertActionConcurrencyRule(adminURI, pattern = scope, actionConcurrency = limit) .version - awaitRuleBookApplied(adminURI, ruleVersion) + awaitRuleBookApplied(runtimeHandle, ruleVersion) val outerIds = blockerKeys.map { key -> diff --git a/e2e-tests/src/main/kotlin/dev/restate/sdktesting/tests/utils.kt b/e2e-tests/src/main/kotlin/dev/restate/sdktesting/tests/utils.kt index d44ffae0..350957c5 100644 --- a/e2e-tests/src/main/kotlin/dev/restate/sdktesting/tests/utils.kt +++ b/e2e-tests/src/main/kotlin/dev/restate/sdktesting/tests/utils.kt @@ -16,6 +16,7 @@ import dev.restate.admin.model.RegisterHttpDeploymentRequest import dev.restate.common.InvocationOptions import dev.restate.sdk.endpoint.Endpoint import dev.restate.sdk.http.vertx.RestateHttpServer +import dev.restate.sdktesting.infra.ContainerHandle import io.vertx.core.http.HttpServer import java.net.URI import java.net.http.HttpClient @@ -101,10 +102,13 @@ data class SysJournalEntry(val index: Int, @SerialName("entry_type") val entryTy @Serializable data class SysInvocationEntry(val id: String, val status: String) -/** Data classes for partition_state query result */ -@Serializable -data class PartitionStateQueryResult(val rows: List = emptyList()) - +/** + * One row of `partition_state` as returned by `restatectl sql --json`. + * + * `partition_state` is an internal cluster-ctrl table not exposed on the admin `/query` port — see + * https://github.com/restatedev/restate/pull/4783 — so we have to shell out to `restatectl` + * inside the runtime container instead. + */ @Serializable data class PartitionStateEntry( @SerialName("partition_id") val partitionId: Long, @@ -201,26 +205,30 @@ suspend fun getAllInvocations(adminURI: URI, filter: String? = null): List { - val request = - HttpRequest.newBuilder() - .uri(URI.create("http://${adminURI.host}:${adminURI.port}/query")) - .header("accept", "application/json") - .header("content-type", "application/json") - .POST( - HttpRequest.BodyPublishers.ofString( - """{"query": "SELECT partition_id, plain_node_id, applied_rule_book_version FROM partition_state"}""")) - .build() - - val response = - HttpClient.newHttpClient().sendAsync(request, HttpResponse.BodyHandlers.ofString()).await() - - return sysQueryJson.decodeFromString(response.body()).rows +suspend fun getAllPartitionStates(runtimeHandle: ContainerHandle): List { + val result = + withContext(Dispatchers.IO) { + runtimeHandle.container.execInContainer( + "restatectl", + "sql", + "--json", + "SELECT partition_id, plain_node_id, applied_rule_book_version FROM partition_state") + } + check(result.exitCode == 0) { + "restatectl sql exited with ${result.exitCode}: stdout=${result.stdout}, stderr=${result.stderr}" + } + // `restatectl sql --json` only writes the row count + timing to stderr; stdout is a single + // arrow JSON array. An empty result set is "" (no rows ever printed), so guard for that. + val stdout = result.stdout.trim() + if (stdout.isEmpty()) return emptyList() + return sysQueryJson.decodeFromString(stdout) } /** @@ -235,7 +243,7 @@ suspend fun getAllPartitionStates(adminURI: URI): List { * lower bound to wait for. */ suspend fun awaitRuleBookApplied( - adminURI: URI, + runtimeHandle: ContainerHandle, expectedVersion: Int, timeout: Duration = 30.seconds ) { @@ -243,7 +251,7 @@ suspend fun awaitRuleBookApplied( "partition_state.applied_rule_book_version >= $expectedVersion on all partitions" withTimeout timeout untilAsserted { - val states = getAllPartitionStates(adminURI) + val states = getAllPartitionStates(runtimeHandle) assertThat(states).isNotEmpty assertThat(states).allSatisfy { row -> assertThat(row.appliedRuleBookVersion) From fafe44580df93040f791bf7774d7fd37cb5cdbdc Mon Sep 17 00:00:00 2001 From: slinkydeveloper Date: Fri, 22 May 2026 17:54:21 +0200 Subject: [PATCH 4/5] These are not supposed to work with suspensions! --- .../main/kotlin/dev/restate/sdktesting/junit/TestSuites.kt | 3 +-- .../src/main/kotlin/dev/restate/sdktesting/tests/utils.kt | 4 ++-- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/e2e-tests/src/main/kotlin/dev/restate/sdktesting/junit/TestSuites.kt b/e2e-tests/src/main/kotlin/dev/restate/sdktesting/junit/TestSuites.kt index 0cf0b69e..9e4b2c34 100644 --- a/e2e-tests/src/main/kotlin/dev/restate/sdktesting/junit/TestSuites.kt +++ b/e2e-tests/src/main/kotlin/dev/restate/sdktesting/junit/TestSuites.kt @@ -78,7 +78,6 @@ object TestSuites : SuiteProvider { "alwaysSuspending", mapOf("RESTATE_WORKER__INVOKER__INACTIVITY_TIMEOUT" to "0s"), listOf( - clazz(), clazz(), clazz(), clazz(), @@ -92,7 +91,7 @@ object TestSuites : SuiteProvider { "RESTATE_WORKER__INVOKER__INACTIVITY_TIMEOUT" to "0s", "RESTATE_DEFAULT_NUM_PARTITIONS" to "4", ), - listOf(clazz(), clazz()), + listOf(clazz()), 3) private val VERSION_COMPATIBILITY_SUITE = diff --git a/e2e-tests/src/main/kotlin/dev/restate/sdktesting/tests/utils.kt b/e2e-tests/src/main/kotlin/dev/restate/sdktesting/tests/utils.kt index 350957c5..e1db067c 100644 --- a/e2e-tests/src/main/kotlin/dev/restate/sdktesting/tests/utils.kt +++ b/e2e-tests/src/main/kotlin/dev/restate/sdktesting/tests/utils.kt @@ -106,8 +106,8 @@ data class SysJournalEntry(val index: Int, @SerialName("entry_type") val entryTy * One row of `partition_state` as returned by `restatectl sql --json`. * * `partition_state` is an internal cluster-ctrl table not exposed on the admin `/query` port — see - * https://github.com/restatedev/restate/pull/4783 — so we have to shell out to `restatectl` - * inside the runtime container instead. + * https://github.com/restatedev/restate/pull/4783 — so we have to shell out to `restatectl` inside + * the runtime container instead. */ @Serializable data class PartitionStateEntry( From d9551d57552ceb3731da13270d55a7c2aa2da1ca Mon Sep 17 00:00:00 2001 From: slinkydeveloper Date: Fri, 5 Jun 2026 14:32:44 +0200 Subject: [PATCH 5/5] Fix openapi --- infra/src/main/openapi/admin.json | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/infra/src/main/openapi/admin.json b/infra/src/main/openapi/admin.json index 60fd4b63..8c481074 100644 --- a/infra/src/main/openapi/admin.json +++ b/infra/src/main/openapi/admin.json @@ -2522,7 +2522,7 @@ "type": "string" }, "DeploymentResponse": { - "oneOf": [ + "anyOf": [ { "type": "object", "title": "HttpDeploymentResponse", @@ -2697,7 +2697,7 @@ ] }, "DetailedDeploymentResponse": { - "oneOf": [ + "anyOf": [ { "type": "object", "title": "HttpDetailedDeploymentResponse",