Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@ import community.flock.aigentic.core.message.ToolCall
import community.flock.aigentic.core.message.asJson
import community.flock.aigentic.core.message.mapToTextMessages
import community.flock.aigentic.core.model.ModelResponse
import community.flock.aigentic.core.platform.EvaluationSubmitResult
import community.flock.aigentic.core.platform.RunSentResult
import community.flock.aigentic.core.platform.addToEvaluationSet
import community.flock.aigentic.core.platform.getRuns
import community.flock.aigentic.core.platform.sendRun
import community.flock.aigentic.core.tool.Parameter
Expand All @@ -51,6 +53,7 @@ suspend inline fun <reified I : Any, reified O : Any> Agent<I, O>.start(vararg a
suspend inline fun <reified I : Any, reified O : Any> Agent<I, O>.start(
input: I? = null,
vararg attachments: Attachment,
expected: Expected<O>? = null,
): AgentRun<O> =
coroutineScope {
val agent = this@start
Expand All @@ -59,13 +62,13 @@ suspend inline fun <reified I : Any, reified O : Any> Agent<I, O>.start(
val logging = async { state.getStatus().map { it.text }.collect(::println) }
try {
val run = executeAction(Initialize(state, agent, input, attachments.toList())).toRun()
publishRun(agent, run, state)
run
val platformRunId = publishRun(agent, run, state, expected)
run.copy(platformRunId = platformRunId)
} catch (e: AigenticException) {
state.events.emit(AgentStatus.Fatal(e.message))
val run = (state to Outcome.Fatal(e.message)).toRun<O>()
publishRun(agent, run, state)
run
val platformRunId = publishRun(agent, run, state, expected)
run.copy(platformRunId = platformRunId)
} finally {
delay(10) // Allow some time for the logging to finish
logging.cancelAndJoin()
Expand All @@ -77,22 +80,45 @@ internal suspend inline fun <reified I : Any, reified O : Any> publishRun(
agent: Agent<I, O>,
run: AgentRun<O>,
state: State,
) {
if (agent.platform != null) {
runCatching {
agent.platform.sendRun(run, agent)
}.onSuccess { result ->
expected: Expected<O>?,
): RunId? {
if (agent.platform == null) return null
return runCatching {
agent.platform.sendRun(run, agent, expected)
}.fold(
onSuccess = { result ->
when (result) {
RunSentResult.Success -> state.events.emit(AgentStatus.PublishedRunSuccess)
RunSentResult.Unauthorized -> state.events.emit(AgentStatus.PublishedRunUnauthorized)
is RunSentResult.Error -> state.events.emit(AgentStatus.PublishedRunError(result.message))
is RunSentResult.Success -> {
state.events.emit(AgentStatus.PublishedRunSuccess)
result.runId
}

RunSentResult.Unauthorized -> {
state.events.emit(AgentStatus.PublishedRunUnauthorized)
null
}

is RunSentResult.Error -> {
state.events.emit(AgentStatus.PublishedRunError(result.message))
null
}
}
}.onFailure { exception ->
},
onFailure = { exception ->
state.events.emit(AgentStatus.PublishedRunError(exception.message ?: "Unknown error"))
}
}
null
},
)
}

suspend inline fun <reified I : Any, reified O : Any> Agent<I, O>.addToEvaluationSet(
runId: String,
evaluationSet: String,
expected: O,
): EvaluationSubmitResult =
platform?.addToEvaluationSet(RunId(runId), evaluationSet, expected)
?: aigenticException("Platform must be configured to add a run to an evaluation set")

suspend inline fun <reified I : Any, reified O : Any> executeAction(action: Action<I, O>): Pair<State, Outcome<O>> {
var currentAction = action
while (true) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package community.flock.aigentic.core.agent

data class Expected<O : Any>(
val evaluationSet: String,
val output: O,
)
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ data class AgentRun<O : Any>(
override val modelRequests: List<ModelRequestInfo>,
val systemPromptMessage: Message.SystemPrompt,
val exampleRunIds: List<RunId> = emptyList(),
val platformRunId: RunId? = null,
) : Run<O>()

data class WorkflowRun<O : Any>(
Expand Down Expand Up @@ -120,5 +121,6 @@ internal inline fun <reified O : Any> AgentRun<String>.decode(): AgentRun<O> {
modelRequests = modelRequests,
exampleRunIds = exampleRunIds,
systemPromptMessage = systemPromptMessage,
platformRunId = platformRunId,
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package community.flock.aigentic.core.platform

import community.flock.aigentic.core.agent.Agent
import community.flock.aigentic.core.agent.AgentRun
import community.flock.aigentic.core.agent.Expected
import community.flock.aigentic.core.agent.RunId
import community.flock.aigentic.core.agent.RunTag
import community.flock.aigentic.core.agent.decode
Expand Down Expand Up @@ -30,8 +31,16 @@ interface PlatformClient {
run: AgentRun<O>,
agent: Agent<I, O>,
outputSerializer: KSerializer<O>,
expected: Expected<O>?,
): RunSentResult

suspend fun <O : Any> addToEvaluationSet(
runId: RunId,
evaluationSet: String,
expected: O,
outputSerializer: KSerializer<O>,
): EvaluationSubmitResult

suspend fun getRuns(tags: List<RunTag>): List<Pair<RunId, AgentRun<String>>>
}

Expand All @@ -44,7 +53,14 @@ interface Platform {
suspend inline fun <reified I : Any, reified O : Any> Platform.sendRun(
run: AgentRun<O>,
agent: Agent<I, O>,
): RunSentResult = client.sendRun(run, agent, serializer<O>())
expected: Expected<O>? = null,
): RunSentResult = client.sendRun(run, agent, serializer<O>(), expected)

suspend inline fun <reified O : Any> Platform.addToEvaluationSet(
runId: RunId,
evaluationSet: String,
expected: O,
): EvaluationSubmitResult = client.addToEvaluationSet(runId, evaluationSet, expected, serializer<O>())

suspend inline fun <reified O : Any> Platform.getRuns(tags: List<RunTag>): List<Pair<RunId, AgentRun<O>>> =
client
Expand All @@ -54,11 +70,25 @@ suspend inline fun <reified O : Any> Platform.getRuns(tags: List<RunTag>): List<
}

sealed interface RunSentResult {
data object Success : RunSentResult
data class Success(
val runId: RunId,
) : RunSentResult

data object Unauthorized : RunSentResult

data class Error(
val message: String,
) : RunSentResult
}

sealed interface EvaluationSubmitResult {
data object Success : EvaluationSubmitResult

data object Unauthorized : EvaluationSubmitResult

data object NotFound : EvaluationSubmitResult

data class Error(
val message: String,
) : EvaluationSubmitResult
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,20 @@ package community.flock.aigentic.platform.client

import community.flock.aigentic.core.agent.Agent
import community.flock.aigentic.core.agent.AgentRun
import community.flock.aigentic.core.agent.Expected
import community.flock.aigentic.core.agent.RunId
import community.flock.aigentic.core.agent.RunTag
import community.flock.aigentic.core.exception.aigenticException
import community.flock.aigentic.core.platform.Authentication
import community.flock.aigentic.core.platform.EvaluationSubmitResult
import community.flock.aigentic.core.platform.PlatformApiUrl
import community.flock.aigentic.core.platform.PlatformClient
import community.flock.aigentic.core.platform.RunSentResult
import community.flock.aigentic.gateway.wirespec.endpoint.AddRunAnnotations
import community.flock.aigentic.gateway.wirespec.endpoint.Gateway
import community.flock.aigentic.gateway.wirespec.endpoint.GetRuns
import community.flock.aigentic.gateway.wirespec.model.RunCreatedDto
import community.flock.aigentic.gateway.wirespec.model.RunEvaluationDto
import community.flock.aigentic.platform.mapper.toDto
import community.flock.aigentic.platform.mapper.toRun
import community.flock.wirespec.kotlin.Wirespec
Expand Down Expand Up @@ -45,7 +50,8 @@ import kotlin.reflect.KType

interface PlatformEndpoints :
Gateway.Handler,
GetRuns.Handler
GetRuns.Handler,
AddRunAnnotations.Handler

const val defaultPlatformApiUrl = "https://aigentic-backend-kib53ypjwq-ez.a.run.app/"

Expand All @@ -58,12 +64,16 @@ class AigenticPlatformClient(
run: AgentRun<O>,
agent: Agent<I, O>,
outputSerializer: KSerializer<O>,
expected: Expected<O>?,
): RunSentResult {
val runDto = run.toDto(agent, outputSerializer)
val runDto = run.toDto(agent, outputSerializer, expected)
val request = Gateway.Request(body = runDto)
return when (val response = endpoints.gateway(request)) {
is Gateway.Response201 -> {
RunSentResult.Success
response.body.runId
.takeIf { it.isNotBlank() }
?.let { RunSentResult.Success(RunId(it)) }
?: RunSentResult.Error("Gateway accepted the run but returned no run id")
}

is Gateway.Response401 -> {
Expand All @@ -82,6 +92,44 @@ class AigenticPlatformClient(
}
}

override suspend fun <O : Any> addToEvaluationSet(
runId: RunId,
evaluationSet: String,
expected: O,
outputSerializer: KSerializer<O>,
): EvaluationSubmitResult {
val request =
AddRunAnnotations.Request(
runId = runId.value,
body =
RunEvaluationDto(
evaluationSet = evaluationSet,
expectedResponse = Json.encodeToString(outputSerializer, expected),
),
)
return when (val response = endpoints.addRunAnnotations(request)) {
is AddRunAnnotations.Response200 -> {
EvaluationSubmitResult.Success
}

is AddRunAnnotations.Response401 -> {
EvaluationSubmitResult.Unauthorized
}

is AddRunAnnotations.Response404 -> {
EvaluationSubmitResult.NotFound
}

is AddRunAnnotations.Response400 -> {
EvaluationSubmitResult.Error(response.body.message)
}

is AddRunAnnotations.Response500 -> {
EvaluationSubmitResult.Error("${response.body.name} - ${response.body.description}")
}
}
}

override suspend fun getRuns(tags: List<RunTag>): List<Pair<RunId, AgentRun<String>>> =
when (val response = endpoints.getRuns(GetRuns.Request(tags = tags.joinToString(",") { it.value }))) {
is GetRuns.Response200 -> response.body
Expand Down Expand Up @@ -166,6 +214,11 @@ class AigenticPlatformEndpoints(
val edge = Gateway.Handler.client(serialization)
val rawRequest = edge.to(request)
val rawResponse = executeRequest(rawRequest)
// Backward compatibility: older gateways answer 201 with an empty body (the previous `201 -> Unit`).
// Surface it as a 201 with a blank runId so it doesn't throw; sendRun maps the blank id to an Error.
if (rawResponse.statusCode == 201 && rawResponse.body?.isEmpty() != false) {
return Gateway.Response201(RunCreatedDto(runId = ""))
}
return edge.from(rawResponse)
}

Expand All @@ -176,6 +229,13 @@ class AigenticPlatformEndpoints(
return edge.from(rawResponse)
}

override suspend fun addRunAnnotations(request: AddRunAnnotations.Request): AddRunAnnotations.Response<*> {
val edge = AddRunAnnotations.Handler.client(serialization)
val rawRequest = edge.to(request)
val rawResponse = executeRequest(rawRequest)
return edge.from(rawResponse)
}

private suspend fun executeRequest(rawRequest: Wirespec.RawRequest): Wirespec.RawResponse {
val response =
httpClient.request {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package community.flock.aigentic.platform.mapper

import community.flock.aigentic.core.agent.Agent
import community.flock.aigentic.core.agent.AgentRun
import community.flock.aigentic.core.agent.Expected
import community.flock.aigentic.core.agent.state.ModelRequestInfo
import community.flock.aigentic.core.agent.tool.Outcome
import community.flock.aigentic.core.message.Message
Expand Down Expand Up @@ -34,6 +35,7 @@ import community.flock.aigentic.gateway.wirespec.model.PrimitiveValueNumberDto
import community.flock.aigentic.gateway.wirespec.model.PrimitiveValueStringDto
import community.flock.aigentic.gateway.wirespec.model.PrimitiveValueTypeDto
import community.flock.aigentic.gateway.wirespec.model.RunDto
import community.flock.aigentic.gateway.wirespec.model.RunEvaluationDto
import community.flock.aigentic.gateway.wirespec.model.SenderDto
import community.flock.aigentic.gateway.wirespec.model.StructuredOutputMessageDto
import community.flock.aigentic.gateway.wirespec.model.StuckResultDto
Expand Down Expand Up @@ -65,6 +67,7 @@ private fun Parameter.toJsonSchemaString(): String =
fun <I : Any, O : Any> AgentRun<O>.toDto(
agent: Agent<I, O>,
outputSerializer: KSerializer<O>,
expected: Expected<O>? = null,
): RunDto =
RunDto(
startedAt = startedAt.toString(),
Expand Down Expand Up @@ -99,6 +102,13 @@ fun <I : Any, O : Any> AgentRun<O>.toDto(
messages = messages.mapNotNull { it.toDto() },
modelRequests = modelRequests.map { it.toDto() },
result = outcome.toDto(outputSerializer),
evaluation =
expected?.let {
RunEvaluationDto(
evaluationSet = it.evaluationSet,
expectedResponse = Json.encodeToString(outputSerializer, it.output),
)
},
)

private fun Parameter.toDto(): ParameterDto =
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package community.flock.aigentic.platform.client

import community.flock.aigentic.core.agent.RunId
import community.flock.aigentic.core.platform.Authentication
import community.flock.aigentic.core.platform.PlatformApiUrl
import community.flock.aigentic.core.platform.RunSentResult
import community.flock.aigentic.gateway.wirespec.endpoint.Gateway
import community.flock.aigentic.gateway.wirespec.model.GatewayClientErrorDto
import community.flock.aigentic.gateway.wirespec.model.RunCreatedDto
import community.flock.aigentic.gateway.wirespec.model.ServerErrorDto
import community.flock.aigentic.platform.util.createAgent
import community.flock.aigentic.platform.util.createAgentRun
Expand All @@ -20,7 +22,11 @@ class AigenticPlatformClientTest :

withData(
nameFn = { "Should map ${it.wirespecResponse} to ${it.runSentResult}" },
TestCase(Gateway.Response201(body = Unit), RunSentResult.Success),
TestCase(Gateway.Response201(body = RunCreatedDto("run-123")), RunSentResult.Success(RunId("run-123"))),
TestCase(
Gateway.Response201(body = RunCreatedDto("")),
RunSentResult.Error("Gateway accepted the run but returned no run id"),
),
TestCase(Gateway.Response401(body = Unit), RunSentResult.Unauthorized),
TestCase(
Gateway.Response400(body = GatewayClientErrorDto("invalid request")),
Expand Down Expand Up @@ -50,7 +56,7 @@ class AigenticPlatformClientTest :
platformEndpoints,
)

val result = client.sendRun(run, agent, serializer<String>())
val result = client.sendRun(run, agent, serializer<String>(), null)

result shouldBe it.runSentResult
}
Expand Down
Loading
Loading