-
Notifications
You must be signed in to change notification settings - Fork 333
Open
Description
While working with Akka Streams and Kamon, where I propagate context, I realized that when using a Source from Akka Streams it loses the tags and token. Here is the PoC code:
package repro
import akka.actor.ActorSystem
import akka.stream._
import akka.stream.scaladsl._
import kamon.Kamon
import org.slf4j.LoggerFactory
import scala.concurrent.Future
/** Reproduction program: logs `Kamon.currentContext()` from inside Akka Streams
  * stages and from a `Future`, so the tags visible at each point can be compared.
  *
  * Two variants of the same two-stage stream are run:
  *  - `stream`  calls `runWith` as part of its own `val` initialiser;
  *  - `stream2` calls `runWith` inside `withFakeSecurityContext`.
  */
object Main extends App {
// Kamon is initialised first, before the actor system is created.
Kamon.init()
private val log = LoggerFactory.getLogger(getClass)
implicit val system: ActorSystem = ActorSystem("Repro")
implicit val mat: Materializer = Materializer(system)
// The actor system's dispatcher is the implicit execution context used by
// `Future { ... }` and `onComplete` below.
implicit val ec = system.dispatcher
/** Evaluates `body` with the current Kamon context extended by three tags
  * (`user.login`, `tenant`, `auth.token`).
  *
  * `body` is a by-name parameter, so it is handed unevaluated to
  * `Kamon.runWithContext`.
  *
  * @param body the code to evaluate under the tagged context
  * @return whatever `body` evaluates to
  */
def withFakeSecurityContext[T](body: => T): T = {
val ctx = Kamon
.currentContext()
.withTag("user.login", "john.doe@gmail.com")
.withTag("tenant", "qwerty")
.withTag("auth.token", "fake-jwt-signature")
Kamon.runWithContext(ctx)(body)
}
// Variant 1: `runWith(Sink.ignore)` is part of this val's initialiser, so the
// stream is run right here -- outside any `withFakeSecurityContext` call.
val stream =
Source(1 to 1)
.map { i =>
log.info(
s"[map-1] ctx ${Kamon.currentContext()}"
)
i
}
.async // <-- explicit async boundary
.map { i =>
log.info(
s"[map-2] ctx ${Kamon.currentContext()}"
)
i
}
.runWith(Sink.ignore)
// This block only references the already-initialised `stream` value; the
// `runWith` call above is not repeated here.
// NOTE(review): presumably this is why [map-1]/[map-2] log an empty tag set --
// confirm against Kamon's Akka Streams instrumentation.
withFakeSecurityContext {
log.info(s"[before] ctx ${Kamon.currentContext()}")
stream
}
// Control case: a Future created inside the tagged context. [Future1] is
// printed with println, the other two labels go through the logger.
withFakeSecurityContext {
println(s"[Future1] ctx ${Kamon.currentContext()}")
Future {
log.info(s"[Future2] ctx ${Kamon.currentContext()}")
}
log.info(s"[Future3] ctx ${Kamon.currentContext()}")
}
// Variant 2: the same two stages with an async boundary, but without a
// `runWith` call here.
val graph =
Source(1 to 1)
.map(i => { log.info(s"[map-1-v2] ctx ${Kamon.currentContext()}"); i })
.async
.map(i => { log.info(s"[map-2-v2] ctx ${Kamon.currentContext()}"); i })
// `runWith` for variant 2 is called inside `withFakeSecurityContext`.
val stream2 =
withFakeSecurityContext {
log.info(s"[before2-v2] ctx ${Kamon.currentContext()}")
graph.runWith(Sink.ignore) // ← materialise here, tags & span present
}
// Shutdown is triggered by completion of `stream` only; `stream2` is not
// awaited before the actor system and Kamon are stopped.
stream.onComplete(_ => {
system.terminate()
Kamon.stop()
})
}
build.sbt
// Scala version applied to every project in this build.
ThisBuild / scalaVersion := "2.13.15"

// Single root project located in the build's base directory.
lazy val root = project
  .in(file("."))
  .settings(
    name := "kamon-context-loss-repro",
    // Akka actor + streams, the Kamon bundle, and Logback as the SLF4J backend.
    libraryDependencies ++= Seq(
      "com.typesafe.akka" %% "akka-actor"      % "2.6.21",
      "com.typesafe.akka" %% "akka-stream"     % "2.6.21",
      "io.kamon"          %% "kamon-bundle"    % "2.7.7",
      "ch.qos.logback"     % "logback-classic" % "1.3.15"
    )
  )
and log output:
Initializing Kamon Telemetry v2.7.7 / Kanela v1.0.18
13:19:57.186 [main] INFO repro.Main$ -- [before] ctx Context{Entries{span=Span.Empty},Tags{tenant=qwerty,user.login=john.doe@gmail.com,auth.token=fake-jwt-signature}}
[Future1] ctx Context{Entries{span=Span.Empty},Tags{tenant=qwerty,user.login=john.doe@gmail.com,auth.token=fake-jwt-signature}}
13:19:57.190 [main] INFO repro.Main$ -- [Future3] ctx Context{Entries{span=Span.Empty},Tags{tenant=qwerty,user.login=john.doe@gmail.com,auth.token=fake-jwt-signature}}
13:19:57.191 [Repro-akka.actor.default-dispatcher-5] INFO repro.Main$ -- [Future2] ctx Context{Entries{span=Span.Empty},Tags{tenant=qwerty,user.login=john.doe@gmail.com,auth.token=fake-jwt-signature}}
13:19:57.191 [main] INFO repro.Main$ -- [before2-v2] ctx Context{Entries{span=Span.Empty},Tags{tenant=qwerty,user.login=john.doe@gmail.com,auth.token=fake-jwt-signature}}
13:19:57.192 [Repro-akka.actor.default-dispatcher-7] INFO repro.Main$ -- [map-1] ctx Context{Entries{span=Span.Empty},Tags{}}
13:19:57.192 [Repro-akka.actor.default-dispatcher-8] INFO repro.Main$ -- [map-1-v2] ctx Context{Entries{span=Span.Empty},Tags{tenant=qwerty,user.login=john.doe@gmail.com,auth.token=fake-jwt-signature}}
13:19:57.192 [Repro-akka.actor.default-dispatcher-6] INFO repro.Main$ -- [map-2] ctx Context{Entries{span=Span.Empty},Tags{}}
13:19:57.192 [Repro-akka.actor.default-dispatcher-5] INFO repro.Main$ -- [map-2-v2] ctx Context{Entries{span=Span.Empty},Tags{tenant=qwerty,user.login=john.doe@gmail.com,auth.token=fake-jwt-signature}}
Metadata
Metadata
Assignees
Labels
No labels