Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,36 +25,54 @@ import org.apache.texera.amber.engine.architecture.rpc.controlreturns.WorkflowAg
import java.nio.file.{Files, Path, Paths}
import java.util.concurrent.locks.Lock
import scala.annotation.tailrec
import scala.jdk.CollectionConverters._
import scala.util.Using

object Utils extends LazyLogging {

/**
* Gets the real path of the amber home directory by:
* 1): check if the current directory is texera/amber
* 1): checking whether the current directory is `texera/amber`
* if it's not then:
* 2): search the siblings and children to find the texera home path
* 2): searching siblings and children for an amber home path, preferring matches under the
* current working directory before falling back to the first discovered match
*
* @return the real absolute path to amber home directory
*/
lazy val amberHomePath: Path = {
val currentWorkingDirectory = Paths.get(".").toRealPath()
// check if the current directory is the amber home path
if (isAmberHomePath(currentWorkingDirectory)) {
currentWorkingDirectory
resolveAmberHomePath(Paths.get(".").toRealPath())
}

private[common] def resolveAmberHomePath(currentWorkingDirectory: Path): Path = {
val realCurrentWorkingDirectory = currentWorkingDirectory.toRealPath()

if (isAmberHomePath(realCurrentWorkingDirectory)) {
realCurrentWorkingDirectory
} else {
// from current path's parent directory, search its children to find amber home path
// current max depth is set to 2 (current path's siblings and direct children)
val searchChildren = Files
.walk(currentWorkingDirectory.getParent, 2)
.filter((path: Path) => isAmberHomePath(path))
.findAny
if (searchChildren.isPresent) {
searchChildren.get
} else {
val parent = Option(realCurrentWorkingDirectory.getParent).getOrElse {
throw new RuntimeException(
"Finding texera home path failed. Current working directory is " + currentWorkingDirectory
s"Cannot search for amber home from filesystem root: $realCurrentWorkingDirectory"
)
}

val amberCandidates =
Using.resource(Files.walk(parent, 2)) { stream =>
stream.iterator().asScala.flatMap(normalizeAmberHomePath).toVector
}

// Sort candidates to avoid dependence on Files.walk traversal order.
val amberCandidatesSorted = amberCandidates.sortBy(_.toString)

// Preserve the current behavior by preferring an amber directory discovered under the CWD.
amberCandidatesSorted
.filter(_.startsWith(realCurrentWorkingDirectory))
.maxByOption(_.getNameCount)
.orElse(amberCandidatesSorted.headOption)
.getOrElse {
throw new RuntimeException(
s"Finding amber home path failed. Current working directory is $realCurrentWorkingDirectory"
)
}
}
}
val AMBER_HOME_FOLDER_NAME = "amber";
Expand Down Expand Up @@ -87,7 +105,12 @@ object Utils extends LazyLogging {
}

private def isAmberHomePath(path: Path): Boolean = {
path.toRealPath().endsWith(AMBER_HOME_FOLDER_NAME)
normalizeAmberHomePath(path).nonEmpty
}

private def normalizeAmberHomePath(path: Path): Option[Path] = {
val realPath = path.toRealPath()
Option.when(realPath.endsWith(AMBER_HOME_FOLDER_NAME))(realPath)
}

def aggregatedStateToString(state: WorkflowAggregatedState): String = {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.texera.amber.engine.common

import org.scalatest.flatspec.AnyFlatSpec

import java.nio.file.attribute.BasicFileAttributes
import java.nio.file.{FileVisitResult, FileVisitor, FileSystems, Files, Path}

class UtilsSpec extends AnyFlatSpec {

"resolveAmberHomePath" should "prefer an amber directory under the current working directory" in
withTempDirectory { tempDirectory =>
val preferredRepo = Files.createDirectory(tempDirectory.resolve("preferred-repo"))
val siblingRepo = Files.createDirectory(tempDirectory.resolve("sibling-repo"))
val preferredAmber = Files.createDirectories(preferredRepo.resolve("amber"))
Files.createDirectories(siblingRepo.resolve("amber"))

Comment on lines +32 to +35
assert(Utils.resolveAmberHomePath(preferredRepo) == preferredAmber.toRealPath())
}

it should "fall back to a sibling amber directory when the current working directory has none" in
withTempDirectory { tempDirectory =>
val repoRoot = Files.createDirectory(tempDirectory.resolve("repo-root"))
val moduleDirectory = Files.createDirectory(repoRoot.resolve("module"))
val amberDirectory = Files.createDirectories(repoRoot.resolve("amber"))

assert(Utils.resolveAmberHomePath(moduleDirectory) == amberDirectory.toRealPath())
}

it should "use amber-specific wording when searching from a filesystem root" in {
val filesystemRoot = FileSystems.getDefault.getRootDirectories.iterator().next().toRealPath()
val exception = intercept[RuntimeException] {
Utils.resolveAmberHomePath(filesystemRoot)
}

assert(exception.getMessage.contains("amber home"))
}

private def withTempDirectory(test: Path => Any): Unit = {
val tempDirectory = Files.createTempDirectory("utils-spec-")
try {
test(tempDirectory.toRealPath())
} finally {
deleteRecursively(tempDirectory)
}
}

private def deleteRecursively(path: Path): Unit = {
if (!Files.exists(path)) {
return
}

Files.walkFileTree(
path,
new FileVisitor[Path] {
override def preVisitDirectory(
directory: Path,
attributes: BasicFileAttributes
): FileVisitResult = FileVisitResult.CONTINUE

override def visitFile(file: Path, attributes: BasicFileAttributes): FileVisitResult = {
Files.delete(file)
FileVisitResult.CONTINUE
}

override def visitFileFailed(file: Path, exception: java.io.IOException): FileVisitResult =
throw exception

override def postVisitDirectory(
directory: Path,
exception: java.io.IOException
): FileVisitResult = {
Option(exception).foreach(throw _)
Files.delete(directory)
FileVisitResult.CONTINUE
}
}
)
}
}
Loading