diff --git a/core/src/main/scala/com/microsoft/azure/synapse/ml/core/env/PackageUtils.scala b/core/src/main/scala/com/microsoft/azure/synapse/ml/core/env/PackageUtils.scala index 8749561300e..2b10ecd1847 100644 --- a/core/src/main/scala/com/microsoft/azure/synapse/ml/core/env/PackageUtils.scala +++ b/core/src/main/scala/com/microsoft/azure/synapse/ml/core/env/PackageUtils.scala @@ -19,7 +19,7 @@ object PackageUtils { val PackageName = s"synapseml_$ScalaVersionSuffix" val PackageMavenCoordinate = s"$PackageGroup:$PackageName:${BuildInfo.version}" // Use a fixed version for local testing - // val PackageMavenCoordinate = s"$PackageGroup:$PackageName:1.0.9" + // val PackageMavenCoordinate = s"$PackageGroup:$PackageName:1.0.10" private val AvroCoordinate = "org.apache.spark:spark-avro_2.12:3.4.1" val PackageRepository: String = SparkMLRepository diff --git a/core/src/test/scala/com/microsoft/azure/synapse/ml/core/test/base/TestBase.scala b/core/src/test/scala/com/microsoft/azure/synapse/ml/core/test/base/TestBase.scala index a90f1ea81eb..7464287d6bd 100644 --- a/core/src/test/scala/com/microsoft/azure/synapse/ml/core/test/base/TestBase.scala +++ b/core/src/test/scala/com/microsoft/azure/synapse/ml/core/test/base/TestBase.scala @@ -203,14 +203,25 @@ abstract class TestBase extends AnyFunSuite with BeforeAndAfterEachTestData with try { return block() //scalastyle:ignore return } catch { - case e: Exception if (i + 1) < times.length => + case e: Exception => println(s"RETRYING after $t ms: Caught error: $e ") blocking { Thread.sleep(t.toLong) } } } - throw new RuntimeException("This error should not occur, bug has been introduced in tryWithRetries") + block() + } + + def retryUntil(completed: () => Boolean, retryDelaySchedule: Seq[Int] = Seq(0, 100, 500, 1000, 3000, 5000)): Boolean = { + val schedule = 0 +: retryDelaySchedule // Try with no delay before delays + (schedule).find { delay => + if (delay > 0) blocking(Thread.sleep(delay.toLong)) + completed() || { print("."); false } + } match { + case Some(_) => println("Succeeded"); true + case _ => println("Failed"); false + } } def withoutLogging[T](e: => T): T = { diff --git a/core/src/test/scala/com/microsoft/azure/synapse/ml/nbtest/DatabricksRapidsTests.scala b/core/src/test/scala/com/microsoft/azure/synapse/ml/nbtest/DatabricksRapidsTests.scala index b549a153cf7..fc5405ff213 100644 --- a/core/src/test/scala/com/microsoft/azure/synapse/ml/nbtest/DatabricksRapidsTests.scala +++ b/core/src/test/scala/com/microsoft/azure/synapse/ml/nbtest/DatabricksRapidsTests.scala @@ -3,8 +3,6 @@ package com.microsoft.azure.synapse.ml.nbtest -import com.microsoft.azure.synapse.ml.nbtest.DatabricksUtilities._ - import com.microsoft.azure.synapse.ml.build.BuildInfo import com.microsoft.azure.synapse.ml.core.env.FileUtilities import com.microsoft.azure.synapse.ml.nbtest.DatabricksUtilities._ diff --git a/core/src/test/scala/com/microsoft/azure/synapse/ml/nbtest/DatabricksUtilities.scala b/core/src/test/scala/com/microsoft/azure/synapse/ml/nbtest/DatabricksUtilities.scala index 650de52407f..113ebc43465 100644 --- a/core/src/test/scala/com/microsoft/azure/synapse/ml/nbtest/DatabricksUtilities.scala +++ b/core/src/test/scala/com/microsoft/azure/synapse/ml/nbtest/DatabricksUtilities.scala @@ -226,7 +226,9 @@ object DatabricksUtilities { | "init_scripts": $initScripts |} """.stripMargin - databricksPost("clusters/create", body).select[String]("cluster_id") + val cluster_id = databricksPost("clusters/create", body).select[String]("cluster_id") + println(s"Created cluster with Id $cluster_id, name $clusterName") + cluster_id } def installLibraries(clusterId: String, libraries: String): Unit = { @@ -272,7 +274,7 @@ object DatabricksUtilities { val body = s""" |{ - | "run_name": "test1", + | "run_name": "${clusterId}-${notebookPath.split("/").last.replace(" ", "_").replace(".ipynb", "")}", | "existing_cluster_id": "$clusterId", | "timeout_seconds": ${TimeoutInMillis / 1000}, | "notebook_task": { @@ -436,18 +438,16 @@ abstract class DatabricksTestHelper extends TestBase { notebooks: Seq[File], maxConcurrency: Int = 8): Unit = { + implicit val retryDelaySchedule = Seq.fill(60 * 20)(1000).toArray + println("Checking if cluster is active") - tryWithRetries(Seq.fill(60 * 20)(1000).toArray) { () => - assert(isClusterActive(clusterId)) - } + assert(retryUntil(() => isClusterActive(clusterId))) Thread.sleep(1000) // Ensure cluster is not overwhelmed + println("Installing libraries") installLibraries(clusterId, libraries) - tryWithRetries(Seq.fill(60 * 6)(1000).toArray) { () => - assert(areLibrariesInstalled(clusterId)) - } - + assert(retryUntil(() => areLibrariesInstalled(clusterId))) assert(notebooks.nonEmpty) val executorService = Executors.newFixedThreadPool(maxConcurrency)