11use std:: collections:: HashMap ;
22
3+ use chrono:: { DateTime , Utc } ;
4+ use k8s_openapi:: apimachinery:: pkg:: apis:: meta:: v1:: Time ;
35use kube:: {
46 Api , Client , Discovery ,
57 api:: { Patch , PatchParams } ,
@@ -16,6 +18,7 @@ use trino_lb_core::{
1618use super :: ScalerTrait ;
1719
1820const K8S_FIELD_MANAGER : & str = "trino-lb" ;
21+ const MIN_READY_SECONDS_SINCE_LAST_TRANSITION : i64 = 5 ;
1922
2023#[ derive( Snafu , Debug ) ]
2124pub enum Error {
@@ -123,6 +126,16 @@ pub enum Error {
123126 cluster : TrinoClusterName ,
124127 namespace : String ,
125128 } ,
129+
130+ #[ snafu( display(
131+ "Could not parse the lastTransitionTime {last_transition_time:?} for the Trino cluster {cluster:?} in namespace {namespace:?}"
132+ ) ) ]
133+ ParseLastTransitionTime {
134+ source : serde_json:: Error ,
135+ last_transition_time : Value ,
136+ cluster : TrinoClusterName ,
137+ namespace : String ,
138+ } ,
126139}
127140
128141pub struct StackableScaler {
@@ -160,7 +173,7 @@ impl StackableScaler {
160173 } ;
161174 let ( trino_resource, _) = discovery
162175 . resolve_gvk ( & trino_gvk)
163- . context ( ResolveGvkSnafu { gvk : trino_gvk } ) ?;
176+ . with_context ( || ResolveGvkSnafu { gvk : trino_gvk } ) ?;
164177
165178 for cluster in trino_cluster_groups
166179 . values ( )
@@ -185,13 +198,13 @@ impl StackableScaler {
185198 let api: Api < DynamicObject > =
186199 Api :: namespaced_with ( client. clone ( ) , & cluster. namespace , & trino_resource) ;
187200
188- let trino = api
189- . get_opt ( & cluster. name )
190- . await
191- . context ( ReadTrinoClusterSnafu {
192- cluster : & cluster. name ,
193- namespace : & cluster. namespace ,
194- } ) ?;
201+ let trino =
202+ api . get_opt ( & cluster. name )
203+ . await
204+ . with_context ( |_| ReadTrinoClusterSnafu {
205+ cluster : & cluster. name ,
206+ namespace : & cluster. namespace ,
207+ } ) ?;
195208
196209 if trino. is_none ( ) {
197210 TrinoClusterNotFoundSnafu {
@@ -219,7 +232,7 @@ impl StackableScaler {
219232 let cluster = self
220233 . clusters
221234 . get ( cluster)
222- . context ( ClusterNotFoundSnafu { cluster } ) ?;
235+ . with_context ( || ClusterNotFoundSnafu { cluster } ) ?;
223236
224237 let patch = serde_json:: json!( {
225238 "apiVersion" : "trino.stackable.tech/v1alpha1" ,
@@ -241,7 +254,7 @@ impl StackableScaler {
241254 . patch ( & cluster. name , & params, & patch)
242255 . instrument ( debug_span ! ( "Patching Trino cluster" ) )
243256 . await
244- . context ( PatchTrinoClusterSnafu {
257+ . with_context ( |_| PatchTrinoClusterSnafu {
245258 cluster : & cluster. name ,
246259 namespace : & cluster. namespace ,
247260 } ) ?;
@@ -266,14 +279,14 @@ impl ScalerTrait for StackableScaler {
266279 let cluster = self
267280 . clusters
268281 . get ( cluster)
269- . context ( ClusterNotFoundSnafu { cluster } ) ?;
282+ . with_context ( || ClusterNotFoundSnafu { cluster } ) ?;
270283
271284 let status = cluster
272285 . api
273286 . get_status ( & cluster. name )
274287 . instrument ( debug_span ! ( "Get Trino cluster status" ) )
275288 . await
276- . context ( GetTrinoClusterStatusSnafu {
289+ . with_context ( |_| GetTrinoClusterStatusSnafu {
277290 cluster : & cluster. name ,
278291 namespace : & cluster. namespace ,
279292 } ) ?;
@@ -286,47 +299,90 @@ impl ScalerTrait for StackableScaler {
286299 let conditions = status
287300 . data
288301 . get ( "status" )
289- . context ( StatusFieldMissingInTrinoClusterSnafu {
302+ . with_context ( || StatusFieldMissingInTrinoClusterSnafu {
290303 cluster : & cluster. name ,
291304 namespace : & cluster. namespace ,
292305 } ) ?
293306 . get ( "conditions" )
294- . context ( StatusConditionsFieldMissingInTrinoClusterSnafu {
307+ . with_context ( || StatusConditionsFieldMissingInTrinoClusterSnafu {
295308 cluster : & cluster. name ,
296309 namespace : & cluster. namespace ,
297310 } ) ?
298311 . as_array ( )
299- . context ( StatusConditionsFieldIsNotArraySnafu {
312+ . with_context ( || StatusConditionsFieldIsNotArraySnafu {
300313 cluster : & cluster. name ,
301314 namespace : & cluster. namespace ,
302315 } ) ?;
303316
304317 let available = conditions
305318 . iter ( )
306319 . find ( |c| c. get ( "type" ) == Some ( & Value :: String ( "Available" . to_string ( ) ) ) )
307- . context ( NoAvailableEntryInStatusConditionsListSnafu {
320+ . with_context ( || NoAvailableEntryInStatusConditionsListSnafu {
308321 cluster : & cluster. name ,
309322 namespace : & cluster. namespace ,
310323 } ) ?;
311324
312- let available = available. get ( "status" ) . context (
325+ let status = available. get ( "status" ) . with_context ( || {
313326 NoStatusInAvailableEntryInStatusConditionsListSnafu {
314327 cluster : & cluster. name ,
315328 namespace : & cluster. namespace ,
316- } ,
317- ) ?;
329+ }
330+ } ) ?;
318331
319- let available = match available {
320- Value :: String ( available ) if available == "True" => true ,
321- Value :: String ( available ) if available == "False" => false ,
332+ let is_available = match status {
333+ Value :: String ( status ) if status == "True" => true ,
334+ Value :: String ( status ) if status == "False" => false ,
322335 _ => StatusNotParsableInAvailableEntryInStatusConditionsListSnafu {
323336 cluster : & cluster. name ,
324337 namespace : & cluster. namespace ,
325338 }
326339 . fail ( ) ?,
327340 } ;
328341
329- Ok ( available)
342+ // Return early if the cluster is not available
343+ if !is_available {
344+ return Ok ( false ) ;
345+ }
346+
347+ // Careful investigation has shown that trino-lb can quickly react to TrinoClusters coming
348+ // available. When trying to immediately send queries to Trino we encountered errors such as:
349+ //
350+ // WARN trino_lb::http_server::v1::statement: Error while processing request
351+ // error=SendQueryToTrino { source: ContactTrinoPostQuery { source: reqwest::Error { kind: Request,
352+ // url: "https://trino-m-1-coordinator-default.default.svc.cluster.local:8443/v1/statement",
353+ // source: hyper_util::client::legacy::Error(Connect, ConnectError("dns error", Custom { kind: Uncategorized,
354+ // error: "failed to lookup address information: Name or service not known" })) } } }
355+ //
356+ // This is because the coordinator is ready but it might take some additional time for the
357+ // DNS record of the Service to propagate.
358+ // To prevent that we only consider TrinoClusters healthy if a minimum amount of seconds
359+ // passed after it was marked as healthy.
360+
361+ // It's valid for the lastTransitionTime to be not set, we assume the cluster is old in this
362+ // case
363+ if let Some ( last_transition_time) = available. get ( "lastTransitionTime" ) {
364+ let last_transition_time: Time = serde_json:: from_value ( last_transition_time. clone ( ) )
365+ . with_context ( |_| ParseLastTransitionTimeSnafu {
366+ last_transition_time : last_transition_time. clone ( ) ,
367+ cluster : & cluster. name ,
368+ namespace : & cluster. namespace ,
369+ } ) ?;
370+
371+ let seconds_since_last_transition = elapsed_seconds_since ( last_transition_time. 0 ) ;
372+ if seconds_since_last_transition < MIN_READY_SECONDS_SINCE_LAST_TRANSITION {
373+ tracing:: debug!(
374+ seconds_since_last_transition,
375+ min_ready_seconds_since_last_transition =
376+ MIN_READY_SECONDS_SINCE_LAST_TRANSITION ,
377+ "The trino cluster recently turned ready, not marking as ready yet"
378+ ) ;
379+
380+ return Ok ( false ) ;
381+ }
382+ }
383+
384+ // All checks succeeded, TrinoCluster is ready
385+ Ok ( true )
330386 }
331387
332388 #[ instrument( name = "StackableScaler::is_activated" , skip( self ) ) ]
@@ -358,3 +414,8 @@ impl ScalerTrait for StackableScaler {
358414 } ) ?)
359415 }
360416}
417+
418+ fn elapsed_seconds_since ( datetime : DateTime < Utc > ) -> i64 {
419+ let now = Utc :: now ( ) ;
420+ ( now - datetime) . num_seconds ( )
421+ }
0 commit comments