diff --git a/helm/java-generator-chart/templates/deployment.yaml b/helm/java-generator-chart/templates/deployment.yaml index 26f69f9..0412653 100644 --- a/helm/java-generator-chart/templates/deployment.yaml +++ b/helm/java-generator-chart/templates/deployment.yaml @@ -57,17 +57,11 @@ spec: - name: QUARKUS_OTEL_RESOURCE_ATTRIBUTES value: "service.name=sbomer-java-generator" - # Generator Logic - - name: SBOMER_GENERATOR_MAX_CONCURRENT - value: {{ .Values.config.maxConcurrent | quote }} - - name: SBOMER_GENERATOR_OOM_RETRIES - value: {{ .Values.config.oomRetries | quote }} - - name: SBOMER_GENERATOR_MEMORY_MULTIPLIER - value: {{ .Values.config.memoryMultiplier | quote }} - - name: SBOMER_GENERATOR_MAVEN_DEFAULT_MEMORY - value: {{ .Values.config.defaultMemory.maven | quote }} - - name: SBOMER_GENERATOR_DOMINO_DEFAULT_MEMORY - value: {{ .Values.config.defaultMemory.domino | quote }} + # Kueue Configuration + {{- if .Values.kueue.includeKueue }} + - name: SBOMER_GENERATOR_KUEUE_QUEUE_NAME + value: {{ .Values.kueue.localQueue.name | quote }} + {{- end }} # Tekton Task Identity diff --git a/helm/java-generator-chart/templates/kueue/clusterqueue.yaml b/helm/java-generator-chart/templates/kueue/clusterqueue.yaml new file mode 100644 index 0000000..646dc24 --- /dev/null +++ b/helm/java-generator-chart/templates/kueue/clusterqueue.yaml @@ -0,0 +1,25 @@ +{{- if .Values.kueue.includeKueue -}} +apiVersion: kueue.x-k8s.io/v1beta1 +kind: ClusterQueue +metadata: + name: {{ .Values.kueue.clusterQueue.name }} + labels: + {{- include "java-generator-chart.labels" . | nindent 4 }} +spec: + namespaceSelector: {} # Allow all namespaces + resourceGroups: + - coveredResources: ["cpu", "memory"] + flavors: + - name: {{ .Values.kueue.resourceFlavor.name }} + resources: + # CPU quota for all concurrent tasks + # Note: Kueue uses resource requests (not limits) for admission control + - name: "cpu" + nominalQuota: {{ .Values.kueue.clusterQueue.quotas.cpu | quote }} + # Memory quota for all concurrent tasks + # Note: Kueue uses resource requests (not limits) for admission control + - name: "memory" + nominalQuota: {{ .Values.kueue.clusterQueue.quotas.memory | quote }} + # Admission control configuration + queueingStrategy: {{ .Values.kueue.clusterQueue.queueingStrategy }} +{{- end }} diff --git a/helm/java-generator-chart/templates/kueue/localqueue.yaml b/helm/java-generator-chart/templates/kueue/localqueue.yaml new file mode 100644 index 0000000..da83adf --- /dev/null +++ b/helm/java-generator-chart/templates/kueue/localqueue.yaml @@ -0,0 +1,11 @@ +{{- if .Values.kueue.includeKueue -}} +apiVersion: kueue.x-k8s.io/v1beta1 +kind: LocalQueue +metadata: + name: {{ .Values.kueue.localQueue.name }} + namespace: {{ .Release.Namespace }} + labels: + {{- include "java-generator-chart.labels" . | nindent 4 }} +spec: + clusterQueue: {{ .Values.kueue.clusterQueue.name }} +{{- end }} diff --git a/helm/java-generator-chart/templates/kueue/resourceflavor.yaml b/helm/java-generator-chart/templates/kueue/resourceflavor.yaml new file mode 100644 index 0000000..a4317bb --- /dev/null +++ b/helm/java-generator-chart/templates/kueue/resourceflavor.yaml @@ -0,0 +1,9 @@ +{{- if .Values.kueue.includeKueue -}} +apiVersion: kueue.x-k8s.io/v1beta1 +kind: ResourceFlavor +metadata: + name: {{ .Values.kueue.resourceFlavor.name }} + labels: + {{- include "java-generator-chart.labels" . | nindent 4 }} +spec: {} +{{- end }} \ No newline at end of file diff --git a/helm/java-generator-chart/templates/tekton/cdx-maven-plugin-generation-task.yaml b/helm/java-generator-chart/templates/tekton/cdx-maven-plugin-generation-task.yaml index 3d7d0b6..d9faf8e 100644 --- a/helm/java-generator-chart/templates/tekton/cdx-maven-plugin-generation-task.yaml +++ b/helm/java-generator-chart/templates/tekton/cdx-maven-plugin-generation-task.yaml @@ -40,11 +40,11 @@ spec: imagePullPolicy: {{ .Values.task.agent.pullPolicy }} resources: requests: - cpu: 500m - memory: 1000Mi + cpu: {{ .Values.task.resources.cdxMavenPlugin.requests.cpu }} + memory: {{ .Values.task.resources.cdxMavenPlugin.requests.memory }} limits: - cpu: 800m - memory: 2000Mi + cpu: {{ .Values.task.resources.cdxMavenPlugin.limits.cpu }} + memory: {{ .Values.task.resources.cdxMavenPlugin.limits.memory }} env: - name: TRACEPARENT value: "$(params.trace-parent)" diff --git a/helm/java-generator-chart/templates/tekton/domino-generation-task.yaml b/helm/java-generator-chart/templates/tekton/domino-generation-task.yaml index 41fdf6d..78914e6 100644 --- a/helm/java-generator-chart/templates/tekton/domino-generation-task.yaml +++ b/helm/java-generator-chart/templates/tekton/domino-generation-task.yaml @@ -34,11 +34,11 @@ spec: imagePullPolicy: {{ .Values.task.agent.pullPolicy }} resources: requests: - cpu: 500m - memory: 1000Mi + cpu: {{ .Values.task.resources.domino.requests.cpu }} + memory: {{ .Values.task.resources.domino.requests.memory }} limits: - cpu: 800m - memory: 2000Mi + cpu: {{ .Values.task.resources.domino.limits.cpu }} + memory: {{ .Values.task.resources.domino.limits.memory }} env: - name: TRACEPARENT value: "$(params.trace-parent)" diff --git a/helm/java-generator-chart/values.yaml b/helm/java-generator-chart/values.yaml index 8da086c..f94c542 100644 --- a/helm/java-generator-chart/values.yaml +++ b/helm/java-generator-chart/values.yaml @@ -43,6 +43,10 @@ rbac: - apiGroups: ["tekton.dev"] resources: ["taskruns", "pipelineruns", "tasks"] verbs: ["create", "get", "list", "watch", "update", "patch", "delete", "deletecollection"] + # Include Kueue for queue management + - apiGroups: ["kueue.x-k8s.io"] + resources: ["localqueues", "clusterqueues", "resourceflavors", "workloads"] + verbs: ["get", "list", "watch"] # --- APPLICATION CONFIG --- config: @@ -50,17 +54,9 @@ config: otel: protocol: grpc endpoint: "" # placeholder - maxConcurrent: 20 - oomRetries: 3 # Default internal cluster URL for storage storageUrl: "http://manifest-storage-service:8085" - # OOM Memory scaling configurations - memoryMultiplier: "1.5" - defaultMemory: - maven: "1Gi" - domino: "2Gi" - kafka: bootstrapServers: "kafka:9092" schemaRegistryUrl: "http://schema-registry:8080/apis/registry/v2" @@ -69,10 +65,38 @@ config: # Leave empty to let Quarkus auto-discover the K8s API (Standard K8s behavior) # The Super-Chart can override this if needed. kubernetesClientMasterUrl: "" - # Default to false (Secure). + # Default to false (Secure). # Can override this to "true" for local dev/minikube. trustCerts: "false" +# --- KUEUE CONFIG --- +# Kueue is mandatory for queue management +kueue: + includeKueue: true + # LocalQueue configuration (namespace-scoped) + localQueue: + name: "java-generator-queue" + + # ClusterQueue configuration (cluster-scoped) + clusterQueue: + name: "java-generator-cluster-queue" + queueingStrategy: BestEffortFIFO + quotas: + # Total CPU quota across all task types + # Calculated as: maxConcurrent * highest CPU request among task types + # Example: 20 pods * 500m (cdxMavenPlugin/domino request) = 10000m + cpu: "10000m" + # Total memory quota across all task types + # Calculated as: maxConcurrent * highest memory request among task types + # Example: 20 pods * 1000Mi = 20Gi + memory: "20Gi" + # Maximum number of concurrent TaskRuns (across all types) + pods: 20 + + # ResourceFlavor configuration + resourceFlavor: + name: "default-flavor" + # --- TEKTON TASK CONFIG --- task: name: @@ -86,6 +110,26 @@ task: # If tag is empty, the template will default to .Chart.AppVersion (The Git SHA) tag: "" pullPolicy: IfNotPresent + # Resource limits for TaskRun steps + # These values control CPU and memory allocation for generation tasks + # Adjust based on your workload requirements and cluster capacity + resources: + # CycloneDX Maven Plugin task resources + cdxMavenPlugin: + requests: + cpu: 500m + memory: 1000Mi + limits: + cpu: 800m + memory: 2000Mi + # Domino task resources (typically needs more memory) + domino: + requests: + cpu: 500m + memory: 1000Mi + limits: + cpu: 800m + memory: 2000Mi maven: configMapName: java-generator-maven-settings diff --git a/src/main/java/org/jboss/sbomer/java/generator/adapter/in/TaskReconciler.java b/src/main/java/org/jboss/sbomer/java/generator/adapter/in/TaskReconciler.java index 936313c..0c2ad44 100644 --- a/src/main/java/org/jboss/sbomer/java/generator/adapter/in/TaskReconciler.java +++ b/src/main/java/org/jboss/sbomer/java/generator/adapter/in/TaskReconciler.java @@ -8,7 +8,6 @@ import org.jboss.sbomer.java.generator.core.port.api.GenerationOrchestrator; import org.jboss.sbomer.java.generator.core.port.spi.FailureNotifier; import org.jboss.sbomer.java.generator.core.utility.FailureUtility; -import org.jboss.sbomer.java.generator.core.utility.TraceUtility; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; @@ -20,8 +19,7 @@ import io.javaoperatorsdk.operator.api.reconciler.UpdateControl; import io.opentelemetry.api.trace.Span; import io.opentelemetry.api.trace.StatusCode; -import io.opentelemetry.api.trace.Tracer; -import io.opentelemetry.context.Scope; +import io.opentelemetry.instrumentation.annotations.WithSpan; import jakarta.inject.Inject; import lombok.extern.slf4j.Slf4j; @@ -38,39 +36,28 @@ public class TaskReconciler implements Reconciler { @Inject ObjectMapper objectMapper; - @Inject - Tracer tracer; - private static final String REASON_OOM_KILLED = "OOMKilled"; private static final String GENERATION_ID_LABEL = "sbomer.jboss.org/generation-id"; private static final String RESULT_NAME_SBOM_URL = "sbom-url"; - private static final String TRACEPARENT_ANNOTATION = "sbomer.jboss.org/traceparent"; + @WithSpan @Override public UpdateControl reconcile(TaskRun taskRun, Context context) { String taskName = taskRun.getMetadata().getName(); String generationId = taskRun.getMetadata().getLabels().get(GENERATION_ID_LABEL); - // Read trace context from TaskRun annotations - Map annotations = taskRun.getMetadata().getAnnotations(); - String traceParent = annotations != null ? annotations.get(TRACEPARENT_ANNOTATION) : null; - - // Extract status for span attributes and logging - String taskRunStatus = getConditionStatus(taskRun); + // Extract status for logging and tracing String taskRunReason = getConditionReason(taskRun); - // Create a child span under the original trace from the Kafka consumer - Span span = TraceUtility.childSpanBuilder(tracer, "TaskReconciler.reconcile", traceParent, generationId) - .setAttribute("taskrun.name", taskName != null ? taskName : "unknown") - .setAttribute("taskrun.status", taskRunStatus) - .setAttribute("taskrun.reason", taskRunReason) - .startSpan(); - try (Scope ignored = span.makeCurrent()) { - return doReconcile(taskRun, taskName, generationId, taskRunReason); - } finally { - span.end(); + // Add span attributes for observability + Span.current().setAttribute("taskrun.name", taskName != null ? taskName : "unknown"); + Span.current().setAttribute("taskrun.reason", taskRunReason); + if (generationId != null) { + Span.current().setAttribute("generation.id", generationId); } + + return doReconcile(taskRun, taskName, generationId, taskRunReason); } private UpdateControl doReconcile(TaskRun taskRun, String taskName, String generationId, String statusReason) { diff --git a/src/main/java/org/jboss/sbomer/java/generator/adapter/out/TektonGenerationExecutor.java b/src/main/java/org/jboss/sbomer/java/generator/adapter/out/TektonGenerationExecutor.java index b1eadf4..a4d03a5 100644 --- a/src/main/java/org/jboss/sbomer/java/generator/adapter/out/TektonGenerationExecutor.java +++ b/src/main/java/org/jboss/sbomer/java/generator/adapter/out/TektonGenerationExecutor.java @@ -10,7 +10,6 @@ import io.fabric8.kubernetes.client.KubernetesClient; import io.fabric8.tekton.v1beta1.TaskRun; -import io.opentelemetry.instrumentation.annotations.SpanAttribute; import io.opentelemetry.instrumentation.annotations.WithSpan; import jakarta.enterprise.context.ApplicationScoped; import jakarta.inject.Inject; @@ -66,54 +65,8 @@ public void scheduleGeneration(GenerationTask generationTask) { } // Execute against the cluster + // Kueue manages the TaskRun lifecycle, including cleanup after completion kubernetesClient.resources(TaskRun.class).inNamespace(namespace).resource(taskRun).create(); } - @WithSpan - @Override - public void abortGeneration(@SpanAttribute("generation.id") String generationId) { - log.info("Aborting generation: {}", generationId); - kubernetesClient.resources(TaskRun.class) - .inNamespace(namespace) - .withLabel(LABEL_GENERATION_ID, generationId) - .delete(); - } - - // In this specific implementation, basically same logic as abortGeneration - @WithSpan - @Override - public void cleanupGeneration(@SpanAttribute("generation.id") String generationId) { - log.info("Cleaning up generation: {}", generationId); - kubernetesClient.resources(TaskRun.class) - .inNamespace(namespace) - .withLabel(LABEL_GENERATION_ID, generationId) - .delete(); - } - - @Override - public int countActiveExecutions() { - // Count TaskRuns for THIS generator that are NOT finished. - // This is the input for the Throttling logic. - return (int) kubernetesClient.resources(TaskRun.class).inNamespace(namespace) - .withLabel(LABEL_GENERATOR_TYPE, LABEL_GENERATOR_VALUE) - .list() - .getItems() - .stream() - .filter(tr -> !isFinished(tr)) - .count(); - } - - /** - * Helper to check Tekton Status Conditions - */ - private boolean isFinished(TaskRun taskRun) { - if (taskRun.getStatus() == null || taskRun.getStatus().getConditions() == null) { - return false; // No status means it's initializing/running - } - - // Check for "Succeeded" condition with Status "True" or "False" (False means failed, but it is still 'finished') - return taskRun.getStatus().getConditions().stream() - .anyMatch(c -> "Succeeded".equals(c.getType()) && - ("True".equals(c.getStatus()) || "False".equals(c.getStatus()))); - } } diff --git a/src/main/java/org/jboss/sbomer/java/generator/core/domain/model/GenerationTask.java b/src/main/java/org/jboss/sbomer/java/generator/core/domain/model/GenerationTask.java index ca8e090..d263369 100644 --- a/src/main/java/org/jboss/sbomer/java/generator/core/domain/model/GenerationTask.java +++ b/src/main/java/org/jboss/sbomer/java/generator/core/domain/model/GenerationTask.java @@ -5,19 +5,14 @@ import org.jboss.sbomer.events.common.GenerationRequestSpec; /** - * Internal domain model representing a unit of work waiting in the queue. + * Internal domain model representing a generation task. * It decouples the internal scheduling logic from the external Kafka event structure. */ public record GenerationTask( String generationId, GenerationRequestSpec spec, - int retryCount, // NOT max retries, the number of it retries it's currently on - String memoryOverride, // i.e. 2Gi Map generatorOptions, Map handlerProvidedOptions, String traceParent // W3C traceparent header (00---) ) { - public GenerationTask(String generationId, GenerationRequestSpec spec, Map generatorOptions, Map handlerProvidedOptions, String traceParent) { - this(generationId, spec, 0, null, generatorOptions, handlerProvidedOptions, traceParent); - } } diff --git a/src/main/java/org/jboss/sbomer/java/generator/core/port/spi/GenerationExecutor.java b/src/main/java/org/jboss/sbomer/java/generator/core/port/spi/GenerationExecutor.java index f521a26..57b7ad3 100644 --- a/src/main/java/org/jboss/sbomer/java/generator/core/port/spi/GenerationExecutor.java +++ b/src/main/java/org/jboss/sbomer/java/generator/core/port/spi/GenerationExecutor.java @@ -15,39 +15,10 @@ public interface GenerationExecutor { * Schedules the generation payload for execution. *

* In a Kubernetes/Tekton implementation, this creates the TaskRun resource. + * Kueue manages the TaskRun lifecycle, including cleanup after completion. *

* * @param generationTask The object carrying information about a generation task */ void scheduleGeneration(GenerationTask generationTask); - - /** - * Aborts resources associated with a specific generation. - *

- * Used for manual cancellation. - *

- * - * @param generationId The unique ID to identify the resources. - */ - void abortGeneration(String generationId); - - /** - * Cleans up resources associated with a specific generation. - *

- * Used for cleaning up the environment after the generation has ended. - *

- * - * @param generationId The unique ID to identify the resources. - */ - void cleanupGeneration(String generationId); - - /** - * Returns the number of currently active/running executions managed by this generator. - *

- * This is critical for the Core Domain's "Throttling" logic. - *

- * - * @return count of active jobs. - */ - int countActiveExecutions(); } diff --git a/src/main/java/org/jboss/sbomer/java/generator/core/service/GeneratorService.java b/src/main/java/org/jboss/sbomer/java/generator/core/service/GeneratorService.java index b7a3dd4..1502080 100644 --- a/src/main/java/org/jboss/sbomer/java/generator/core/service/GeneratorService.java +++ b/src/main/java/org/jboss/sbomer/java/generator/core/service/GeneratorService.java @@ -2,11 +2,7 @@ import java.util.List; import java.util.Map; -import java.util.Queue; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentLinkedQueue; -import org.eclipse.microprofile.config.inject.ConfigProperty; import org.jboss.sbomer.events.common.GenerationRequestSpec; import org.jboss.sbomer.java.generator.core.domain.GenerationStatus; import org.jboss.sbomer.java.generator.core.domain.model.GenerationTask; @@ -15,15 +11,9 @@ import org.jboss.sbomer.java.generator.core.port.spi.GenerationExecutor; import org.jboss.sbomer.java.generator.core.port.spi.StatusNotifier; import org.jboss.sbomer.java.generator.core.utility.FailureUtility; -import org.jboss.sbomer.java.generator.core.utility.TraceUtility; -import io.opentelemetry.api.trace.Span; -import io.opentelemetry.api.trace.StatusCode; -import io.opentelemetry.api.trace.Tracer; -import io.opentelemetry.context.Scope; import io.opentelemetry.instrumentation.annotations.SpanAttribute; import io.opentelemetry.instrumentation.annotations.WithSpan; -import io.quarkus.scheduler.Scheduled; import jakarta.enterprise.context.ApplicationScoped; import jakarta.inject.Inject; import lombok.extern.slf4j.Slf4j; @@ -41,48 +31,30 @@ public class GeneratorService implements GenerationOrchestrator { @Inject FailureNotifier failureNotifier; - @Inject - Tracer tracer; - - @ConfigProperty(name = "sbomer.generator.max-concurrent", defaultValue = "20") - int maxConcurrent; - - // Config: How many times to retry OOM? - @ConfigProperty(name = "sbomer.generator.oom-retries", defaultValue = "3") - int maxOomRetries; - - // Config: Multiplier (e.g. 2.0 = double memory each time) - @ConfigProperty(name = "sbomer.generator.memory-multiplier", defaultValue = "1.5") - double memoryMultiplier; - - // Default memory to start multiplying from (if not defined in original request) - @ConfigProperty(name = "sbomer.generator.maven.default-memory", defaultValue = "1Gi") - String defaultMavenMemory; - - @ConfigProperty(name = "sbomer.generator.domino.default-memory", defaultValue = "2Gi") - String defaultDominoMemory; - - // In-memory buffer (FOR NOW - SHOULD LATER BE PERSISTENT) - private final Queue pendingQueue = new ConcurrentLinkedQueue<>(); - private final Map activeTasks = new ConcurrentHashMap<>(); - + @WithSpan @Override - public void acceptRequest(String generationId, GenerationRequestSpec request, Map generatorOptions, Map handlerProvidedOptions, String traceParent) { + public void acceptRequest(@SpanAttribute("generation.id") String generationId, GenerationRequestSpec request, Map generatorOptions, Map handlerProvidedOptions, String traceParent) { log.info("Accepted request for generation: {}", generationId); - // Determine the tool type from the options (default to maven if not specified) - String toolType = "maven"; - if (handlerProvidedOptions != null && handlerProvidedOptions.containsKey("type")) { - toolType = handlerProvidedOptions.get("type"); - } else if (generatorOptions != null && generatorOptions.containsKey("type")) { - toolType = generatorOptions.get("type"); - } + GenerationTask task = new GenerationTask(generationId, request, generatorOptions, handlerProvidedOptions, traceParent); - // Assign the correct starting memory baseline - String initialMemory = "domino".equalsIgnoreCase(toolType) ? defaultDominoMemory : defaultMavenMemory; + try { + // Delegate directly to executor - Kueue handles queuing and admission control + executor.scheduleGeneration(task); + + // Notify that the task has been queued + notifier.notifyStatus( + generationId, + GenerationStatus.GENERATING, + "Queued for execution", + null + ); - // Use the full constructor to set retryCount to 0 and inject the initialMemory - pendingQueue.add(new GenerationTask(generationId, request, 0, initialMemory, generatorOptions, handlerProvidedOptions, traceParent)); + } catch (Exception e) { + log.error("Failed to schedule generation {}", generationId, e); + notifier.notifyStatus(generationId, GenerationStatus.FAILED, e.getMessage(), null); + failureNotifier.notify(FailureUtility.buildFailureSpecFromException(e), generationId, null); + } } @WithSpan @@ -90,142 +62,10 @@ public void acceptRequest(String generationId, GenerationRequestSpec request, Ma public void handleUpdate(@SpanAttribute("generation.id") String generationId, GenerationStatus status, String reason, List resultUrls) { log.info("Handling update for generation {}: {}", generationId, status); - // If we hit OOM, we retry with more resources - if (status == GenerationStatus.FAILED && "OOMKilled".equals(reason)) { - executor.cleanupGeneration(generationId); - handleOomRetry(generationId); - return; // Stop here. Method will do its own notification if needed - } - // Notify the status (sbom-service will listen to this) notifier.notifyStatus(generationId, status, reason, resultUrls); - // If it was a running job that finished, trigger a cleanup - // via the executor (e.g. delete the TaskRun) - doCleanupIfFinished(generationId, status); - } - - @Scheduled(every = "{sbomer.generator.poll-interval:10s}") - public void processQueue() { - if (pendingQueue.isEmpty()) { - return; - } - - int activeCount = executor.countActiveExecutions(); - int slots = maxConcurrent - activeCount; - - if (slots <= 0) { - log.debug("Cluster at capacity ({}/{})", activeCount, maxConcurrent); - return; - } - - log.info("Cluster has capacity. Scheduling {} tasks...", slots); - - for (int i = 0; i < slots; i++) { - GenerationTask task = pendingQueue.poll(); - if (task == null) { - break; - } - - // Create child span under original Kafka consumer trace so outgoing - // Kafka messages (notifyStatus) carry trace context - Span span = TraceUtility.childSpanBuilder(tracer,"GeneratorService.processQueue", task.traceParent(), task.generationId()) - .setAttribute("target.image", task.spec().getTarget().getIdentifier()) - .setAttribute("retry.count", task.retryCount()) - .setAttribute("memory.override", task.memoryOverride()) - .startSpan(); - try (Scope ignored = span.makeCurrent()) { - try { - // Put into active tasks - activeTasks.put(task.generationId(), task); - - executor.scheduleGeneration(task); - - // Send an event out to declare it has started generating - notifier.notifyStatus( - task.generationId(), - GenerationStatus.GENERATING, - "Scheduled in execution environment", - null - ); - - } catch (Exception e) { - log.error("Failed to schedule generation {}", task.generationId(), e); - span.recordException(e); - span.setStatus(StatusCode.ERROR, e.getMessage()); - notifier.notifyStatus(task.generationId(), GenerationStatus.FAILED, e.getMessage(), null); - failureNotifier.notify(FailureUtility.buildFailureSpecFromException(e), task.generationId(), null); - doCleanupIfFinished(task.generationId(), GenerationStatus.FAILED); - } - } finally { - span.end(); - } - } - } - - private void handleOomRetry(String generationId) { - GenerationTask task = activeTasks.get(generationId); - if (task == null) { - log.warn("Cannot retry OOM for {}, task state lost.", generationId); - notifier.notifyStatus(generationId, GenerationStatus.FAILED, "OOMKilled (Retry failed - state lost)", null); - doCleanupIfFinished(generationId, GenerationStatus.FAILED); - return; - } - - if (task.retryCount() >= maxOomRetries) { - log.warn("Max OOM retries reached for {}. Giving up.", generationId); - // We fail the task, notify it failed, and do cleanup - GenerationStatus newStatus = GenerationStatus.FAILED; - notifier.notifyStatus(generationId, newStatus, "OOMKilled (Max retries exceeded)", null); - doCleanupIfFinished(generationId, newStatus); - return; - } - - // Calculate new memory - String currentMemory = task.memoryOverride(); - String newMemory = calculateNewMemory(currentMemory); - - log.info("Retrying {} due to OOM. Attempt {}/{}. Increasing memory: {} -> {}", - generationId, task.retryCount() + 1, maxOomRetries, currentMemory, newMemory); - - // Create new task with incremented count and new memory, preserving trace field - GenerationTask retryTask = new GenerationTask( - task.generationId(), - task.spec(), - task.retryCount() + 1, - newMemory, - task.generatorOptions(), - task.handlerProvidedOptions(), - task.traceParent() - ); - - // Update state and re-queue - activeTasks.put(generationId, retryTask); - pendingQueue.add(retryTask); - } - - private String calculateNewMemory(String current) { - // Simple parser assuming "Gi" or "Mi" - // For robust parsing, use Fabric8 Quantity class or regex - // Logic: 1Gi -> 2Gi - try { - // Quick hack for PoC: assume Gi - double val = Double.parseDouble(current.replace("Gi", "").replace("Mi", "")); - // If Mi, convert to Gi for simplicity or just multiply - if (current.endsWith("Mi")) val = val / 1024.0; - - double newVal = val * memoryMultiplier; - return (int)Math.ceil(newVal) + "Gi"; - } catch (Exception e) { - return "2Gi"; // Fallback - } - } - - private void doCleanupIfFinished(String generationId, GenerationStatus status) { - if (status == GenerationStatus.FINISHED || status == GenerationStatus.FAILED) { - activeTasks.remove(generationId); - executor.cleanupGeneration(generationId); - } + // Note: Kueue manages TaskRun lifecycle, including cleanup after completion } } diff --git a/src/main/java/org/jboss/sbomer/java/generator/core/service/TaskRunFactory.java b/src/main/java/org/jboss/sbomer/java/generator/core/service/TaskRunFactory.java index 4e34944..2cab934 100644 --- a/src/main/java/org/jboss/sbomer/java/generator/core/service/TaskRunFactory.java +++ b/src/main/java/org/jboss/sbomer/java/generator/core/service/TaskRunFactory.java @@ -13,7 +13,6 @@ import io.fabric8.kubernetes.api.model.ConfigMapVolumeSourceBuilder; import io.fabric8.kubernetes.api.model.EmptyDirVolumeSource; -import io.fabric8.kubernetes.api.model.Quantity; import io.fabric8.tekton.v1beta1.*; import jakarta.enterprise.context.ApplicationScoped; @@ -35,7 +34,9 @@ public class TaskRunFactory { @ConfigProperty(name = "sbomer.generator.java.maven-settings-configmap", defaultValue = "java-generator-maven-settings") String mavenSettingsConfigMapName; - private static final String ANNOTATION_RETRY_COUNT = "sbomer.jboss.org/retry-count"; + @ConfigProperty(name = "sbomer.generator.kueue.queue-name", defaultValue = "java-generator-queue") + String kueueQueueName; + private static final String ANNOTATION_TRACEPARENT = "sbomer.jboss.org/traceparent"; public TaskRun createCdxMavenPluginTaskRun(GenerationTask generationTask) { @@ -99,26 +100,14 @@ private TaskRun createBaseTaskRun(GenerationTask generationTask, String taskName .build() )); - // 5. Memory Overrides - if (generationTask.memoryOverride() != null) { - specBuilder.addToStepOverrides(new TaskRunStepOverrideBuilder() - .withName("generate") - .withNewResources() - .withRequests(Map.of("memory", new Quantity(generationTask.memoryOverride()))) - .withLimits(Map.of("memory", new Quantity(generationTask.memoryOverride()))) - .endResources() - .build()); - } - - // 6. Build Final TaskRun - Map labels = Map.of( - LABEL_GENERATION_ID, generationId, - LABEL_GENERATOR_TYPE, LABEL_GENERATOR_VALUE, - "app.kubernetes.io/managed-by", "sbomer-java-generator" - ); + // 5. Build Final TaskRun + Map labels = new HashMap<>(); + labels.put(LABEL_GENERATION_ID, generationId); + labels.put(LABEL_GENERATOR_TYPE, LABEL_GENERATOR_VALUE); + labels.put("app.kubernetes.io/managed-by", "sbomer-java-generator"); + labels.put("kueue.x-k8s.io/queue-name", kueueQueueName); Map annotations = new HashMap<>(); - annotations.put(ANNOTATION_RETRY_COUNT, String.valueOf(generationTask.retryCount())); if (generationTask.traceParent() != null) { annotations.put(ANNOTATION_TRACEPARENT, generationTask.traceParent()); } diff --git a/src/main/java/org/jboss/sbomer/java/generator/core/utility/TraceUtility.java b/src/main/java/org/jboss/sbomer/java/generator/core/utility/TraceUtility.java deleted file mode 100644 index 5d05776..0000000 --- a/src/main/java/org/jboss/sbomer/java/generator/core/utility/TraceUtility.java +++ /dev/null @@ -1,67 +0,0 @@ -package org.jboss.sbomer.java.generator.core.utility; - -import io.opentelemetry.api.trace.Span; -import io.opentelemetry.api.trace.SpanBuilder; -import io.opentelemetry.api.trace.SpanContext; -import io.opentelemetry.api.trace.SpanKind; -import io.opentelemetry.api.trace.TraceFlags; -import io.opentelemetry.api.trace.TraceState; -import io.opentelemetry.api.trace.Tracer; -import io.opentelemetry.context.Context; -import lombok.extern.slf4j.Slf4j; - -@Slf4j -public class TraceUtility { - - private TraceUtility() {} - - /** - * Returns a SpanBuilder configured as the child of given traceparent. - * Callers can add their own attributes before calling startSpan(). - * - * @param tracer the OTel tracer. - * @param spanName the span operation name. - * @param traceParent the W3C traceparent header. - * @param generationId the generation ID to set as a span attribute. - * @return a configured {@link SpanBuilder}. - */ - public static SpanBuilder childSpanBuilder(Tracer tracer, String spanName, String traceParent, String generationId) { - SpanBuilder spanBuilder = tracer.spanBuilder(spanName) - .setSpanKind(SpanKind.INTERNAL) - .setAttribute("generation.id", generationId != null ? generationId : "unknown"); - SpanContext parentContext = parseTraceParent(traceParent); - if (parentContext != null && parentContext.isValid()) { - spanBuilder.setParent(Context.root().with(Span.wrap(parentContext))); - } - return spanBuilder; - } - - /** - * Parses W3C traceparent header into SpanContext. - * Expected format: 00--- - * - * @param traceParent the traceparent header value. - * @return the parsed SpanContext, or null if input is absent or malformed. - */ - public static SpanContext parseTraceParent(String traceParent) { - if (traceParent == null || traceParent.isEmpty()) { - return null; - } - String[] parts = traceParent.split("-"); - if (parts.length != 4) { - log.warn("Invalid traceparent header format: {}", traceParent); - return null; - } - try { - return SpanContext.createFromRemoteParent( - parts[1], // traceId - parts[2], // spanId - TraceFlags.fromHex(parts[3], 0), // traceFlags - TraceState.getDefault() - ); - } catch (Exception e) { - log.warn("Failed to parse traceparent header '{}': {}", traceParent, e.getMessage()); - return null; - } - } -} \ No newline at end of file diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index 21389c7..37a4aea 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -25,12 +25,6 @@ quarkus.operator-sdk.controllers.java-task-reconciler.selector=sbomer.jboss.org/ sbomer.generator.java.domino.task-name=generator-domino sbomer.generator.java.cdx-maven-plugin.task-name=generator-cdx-maven-plugin -sbomer.generator.max-concurrent=20 -sbomer.generator.oom-retries=3 -sbomer.generator.memory-multiplier=1.5 -sbomer.generator.maven.default-memory=1Gi -sbomer.generator.domino.default-memory=2Gi - sbomer.generator.java.maven-settings-configmap=java-generator-maven-settings sbomer.generator.java.gradle-init-configmap=java-generator-gradle-init @@ -38,6 +32,9 @@ sbomer.generator.java.gradle-init-configmap=java-generator-gradle-init # Default value (Overridden by env variable in Podman/Prod) sbomer.storage.url=http://localhost:8085 +# Kueue queue name for TaskRun scheduling +sbomer.generator.kueue.queue-name=java-generator-queue + #======================================= # KAFKA - GLOBAL PRODUCER CONFIG #======================================= diff --git a/src/test/java/org/jboss/sbomer/java/generator/adapter/out/TektonGenerationExecutorTest.java b/src/test/java/org/jboss/sbomer/java/generator/adapter/out/TektonGenerationExecutorTest.java index 721bf5a..9602a0f 100644 --- a/src/test/java/org/jboss/sbomer/java/generator/adapter/out/TektonGenerationExecutorTest.java +++ b/src/test/java/org/jboss/sbomer/java/generator/adapter/out/TektonGenerationExecutorTest.java @@ -5,7 +5,6 @@ import static org.mockito.ArgumentMatchers.*; import static org.mockito.Mockito.*; -import java.util.List; import java.util.Map; import org.jboss.sbomer.events.common.GenerationRequestSpec; @@ -27,7 +26,6 @@ import io.fabric8.tekton.v1beta1.TaskRun; import io.fabric8.tekton.v1beta1.TaskRunBuilder; import io.fabric8.tekton.v1beta1.TaskRunList; -import io.fabric8.tekton.v1beta1.TaskRunListBuilder; @ExtendWith(MockitoExtension.class) class TektonGenerationExecutorTest { @@ -62,7 +60,7 @@ void setUp() { @Test void testScheduleGeneration_WithDominoType() { - GenerationTask task = new GenerationTask("gen-1", mockRequest, 0, null, Map.of("type", DOMINO_GENERATOR_SUBTYPE), null, null); + GenerationTask task = new GenerationTask("gen-1", mockRequest, Map.of("type", DOMINO_GENERATOR_SUBTYPE), null, null); when(taskRunFactory.createDominoTaskRun(task)).thenReturn(dummyTaskRun); executor.scheduleGeneration(task); @@ -73,7 +71,7 @@ void testScheduleGeneration_WithDominoType() { @Test void testScheduleGeneration_WithMavenType() { - GenerationTask task = new GenerationTask("gen-2", mockRequest, 0, null, null, Map.of("type", CDX_MAVEN_PLUGIN_GENERATOR_SUBTYPE), null); + GenerationTask task = new GenerationTask("gen-2", mockRequest, null, Map.of("type", CDX_MAVEN_PLUGIN_GENERATOR_SUBTYPE), null); when(taskRunFactory.createCdxMavenPluginTaskRun(task)).thenReturn(dummyTaskRun); executor.scheduleGeneration(task); @@ -84,7 +82,7 @@ void testScheduleGeneration_WithMavenType() { @Test void testScheduleGeneration_DefaultsToMavenWhenEmpty() { - GenerationTask task = new GenerationTask("gen-3", mockRequest, 0, null, null, null, null); + GenerationTask task = new GenerationTask("gen-3", mockRequest, null, null, null); when(taskRunFactory.createCdxMavenPluginTaskRun(task)).thenReturn(dummyTaskRun); executor.scheduleGeneration(task); @@ -95,7 +93,7 @@ void testScheduleGeneration_DefaultsToMavenWhenEmpty() { @Test void testScheduleGeneration_ThrowsOnUnknownType() { - GenerationTask task = new GenerationTask("gen-invalid", mockRequest, 0, null, Map.of("type", "magic-generator"), null, null); + GenerationTask task = new GenerationTask("gen-invalid", mockRequest, Map.of("type", "magic-generator"), null, null); GenerationValidationException ex = assertThrows(GenerationValidationException.class, () -> { executor.scheduleGeneration(task); @@ -105,33 +103,4 @@ void testScheduleGeneration_ThrowsOnUnknownType() { verifyNoInteractions(taskRunFactory); } - @Test - void testCleanupGeneration_DeletesFromKubernetes() { - executor.cleanupGeneration("gen-cleanup"); - - // Verify the end of the chain was called - verify(labelClient).delete(); - } - - @Test - void testCountActiveExecutions_FiltersFinishedTasks() { - TaskRun runningTask = new TaskRunBuilder().withNewStatus().endStatus().build(); - String finishedJson = """ - { - "status": { - "conditions": [ { "type": "Succeeded", "status": "True" } ] - } - } - """; - TaskRun finishedTask = io.fabric8.kubernetes.client.utils.Serialization.unmarshal(finishedJson, TaskRun.class); - - var taskList = new TaskRunListBuilder().withItems(List.of(runningTask, finishedTask)).build(); - - // Wire the specific list call - when(labelClient.list()).thenReturn(taskList); - - int activeCount = executor.countActiveExecutions(); - - assertEquals(1, activeCount); - } -} \ No newline at end of file +} diff --git a/src/test/java/org/jboss/sbomer/java/generator/core/service/GeneratorServiceTest.java b/src/test/java/org/jboss/sbomer/java/generator/core/service/GeneratorServiceTest.java index c423869..0152b15 100644 --- a/src/test/java/org/jboss/sbomer/java/generator/core/service/GeneratorServiceTest.java +++ b/src/test/java/org/jboss/sbomer/java/generator/core/service/GeneratorServiceTest.java @@ -1,45 +1,26 @@ package org.jboss.sbomer.java.generator.core.service; -import static org.junit.jupiter.api.Assertions.*; import static org.mockito.ArgumentMatchers.*; import static org.mockito.Mockito.*; -import java.util.List; -import java.util.Map; - import org.jboss.sbomer.events.common.GenerationRequestSpec; import org.jboss.sbomer.events.common.Target; import org.jboss.sbomer.java.generator.core.domain.GenerationStatus; -import org.jboss.sbomer.java.generator.core.domain.model.GenerationTask; import org.jboss.sbomer.java.generator.core.port.spi.FailureNotifier; import org.jboss.sbomer.java.generator.core.port.spi.GenerationExecutor; import org.jboss.sbomer.java.generator.core.port.spi.StatusNotifier; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.Answers; -import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; -import io.opentelemetry.api.trace.Span; -import io.opentelemetry.api.trace.SpanBuilder; -import io.opentelemetry.api.trace.Tracer; -import io.opentelemetry.context.Scope; - @ExtendWith(MockitoExtension.class) class GeneratorServiceTest { @Mock GenerationExecutor executor; @Mock StatusNotifier notifier; @Mock FailureNotifier failureNotifier; - @Mock Tracer tracer; - - @Mock(answer = Answers.RETURNS_SELF) - SpanBuilder spanBuilder; - - @Mock Span span; - @Mock Scope scope; GeneratorService service; GenerationRequestSpec mockRequest; @@ -50,19 +31,6 @@ void setUp() { service.executor = executor; service.notifier = notifier; service.failureNotifier = failureNotifier; - service.tracer = tracer; - - // Set default config properties - service.maxConcurrent = 2; - service.maxOomRetries = 3; - service.memoryMultiplier = 1.5; - service.defaultMavenMemory = "1Gi"; - service.defaultDominoMemory = "2Gi"; - - // OTel Mocks - No need to mock setAttribute, setParent, or setSpanKind anymore! - lenient().when(tracer.spanBuilder(anyString())).thenReturn(spanBuilder); - lenient().when(spanBuilder.startSpan()).thenReturn(span); - lenient().when(span.makeCurrent()).thenReturn(scope); // Dummy request spec mockRequest = new GenerationRequestSpec(); @@ -73,85 +41,51 @@ void setUp() { } @Test - void testAcceptRequest_AssignsCorrectDefaultMemory() { - // Accept Maven request + void testAcceptRequest_SchedulesImmediately() { + // Accept request service.acceptRequest("gen-1", mockRequest, null, null, "trace-1"); - // Accept Domino request - service.acceptRequest("gen-2", mockRequest, Map.of("type", "domino"), null, "trace-2"); - - // Schedule them to verify their state - when(executor.countActiveExecutions()).thenReturn(0); - service.processQueue(); - - ArgumentCaptor taskCaptor = ArgumentCaptor.forClass(GenerationTask.class); - verify(executor, times(2)).scheduleGeneration(taskCaptor.capture()); - - List tasks = taskCaptor.getAllValues(); - assertEquals("gen-1", tasks.get(0).generationId()); - assertEquals("1Gi", tasks.get(0).memoryOverride(), "Maven should default to 1Gi"); - - assertEquals("gen-2", tasks.get(1).generationId()); - assertEquals("2Gi", tasks.get(1).memoryOverride(), "Domino should default to 2Gi"); - } - - @Test - void testHandleUpdate_OomKilled_TriggersRetry() { - // Put task in active state - service.acceptRequest("gen-oom", mockRequest, null, null, null); - service.processQueue(); - - // Trigger OOM Update - service.handleUpdate("gen-oom", GenerationStatus.FAILED, "OOMKilled", null); - - // Verify old task is cleaned up - verify(executor).cleanupGeneration("gen-oom"); - - // Ensure status was NOT notified as failed yet - verify(notifier, never()).notifyStatus(eq("gen-oom"), eq(GenerationStatus.FAILED), anyString(), any()); - - // Process queue to run the retry - when(executor.countActiveExecutions()).thenReturn(1); // One is still running from the retry - service.processQueue(); - - ArgumentCaptor taskCaptor = ArgumentCaptor.forClass(GenerationTask.class); - verify(executor, times(2)).scheduleGeneration(taskCaptor.capture()); - - GenerationTask retryTask = taskCaptor.getAllValues().get(1); - assertEquals(1, retryTask.retryCount()); - assertEquals("2Gi", retryTask.memoryOverride(), "Memory should have scaled from 1Gi to 2Gi (ceil of 1.5)"); + // Verify executor was called immediately + verify(executor, times(1)).scheduleGeneration(any()); + + // Verify status notification was sent + verify(notifier, times(1)).notifyStatus( + eq("gen-1"), + eq(GenerationStatus.GENERATING), + eq("Queued for execution"), + isNull() + ); } @Test - void testHandleUpdate_OomKilled_MaxRetriesReached() { - // Setup task with max retries already hit - GenerationTask exhaustedTask = new GenerationTask("gen-max", mockRequest, 3, "4Gi", null, null, null); - service.acceptRequest("gen-max", mockRequest, null, null, null); - service.processQueue(); - - // Force the active task state - service.handleUpdate("gen-max", GenerationStatus.FAILED, "OOMKilled", null); // attempt 1 - service.handleUpdate("gen-max", GenerationStatus.FAILED, "OOMKilled", null); // attempt 2 - service.handleUpdate("gen-max", GenerationStatus.FAILED, "OOMKilled", null); // attempt 3 (Max) - - // The 4th OOM should give up - service.handleUpdate("gen-max", GenerationStatus.FAILED, "OOMKilled", null); - - verify(notifier).notifyStatus(eq("gen-max"), eq(GenerationStatus.FAILED), contains("Max retries exceeded"), any()); + void testAcceptRequest_HandlesExecutorFailure() { + // Make executor throw exception + doThrow(new RuntimeException("Executor failed")).when(executor).scheduleGeneration(any()); + + // Accept request + service.acceptRequest("gen-fail", mockRequest, null, null, null); + + // Verify failure was notified + verify(notifier, times(1)).notifyStatus( + eq("gen-fail"), + eq(GenerationStatus.FAILED), + eq("Executor failed"), + isNull() + ); + verify(failureNotifier, times(1)).notify(any(), eq("gen-fail"), isNull()); } @Test - void testProcessQueue_RespectsMaxConcurrent() { - service.acceptRequest("gen-1", mockRequest, null, null, null); - service.acceptRequest("gen-2", mockRequest, null, null, null); - service.acceptRequest("gen-3", mockRequest, null, null, null); - - // Tell service there is only 1 slot left (max is 2) - when(executor.countActiveExecutions()).thenReturn(1); - - service.processQueue(); - - // Should only schedule 1 task - verify(executor, times(1)).scheduleGeneration(any()); + void testHandleUpdate_ForwardsStatusNotification() { + // Test that handleUpdate simply forwards the status notification + // Kueue manages TaskRun lifecycle, so no cleanup or conditional logic needed + service.handleUpdate("gen-123", GenerationStatus.FINISHED, "Completed", null); + + verify(notifier, times(1)).notifyStatus( + eq("gen-123"), + eq(GenerationStatus.FINISHED), + eq("Completed"), + isNull() + ); } } \ No newline at end of file diff --git a/src/test/java/org/jboss/sbomer/java/generator/core/service/TaskRunFactoryTest.java b/src/test/java/org/jboss/sbomer/java/generator/core/service/TaskRunFactoryTest.java index 43ed03d..eb2d6b5 100644 --- a/src/test/java/org/jboss/sbomer/java/generator/core/service/TaskRunFactoryTest.java +++ b/src/test/java/org/jboss/sbomer/java/generator/core/service/TaskRunFactoryTest.java @@ -35,22 +35,17 @@ void setUp() { @Test void testCreateDominoTaskRun_Basic() { - GenerationTask task = new GenerationTask("gen-123456789", mockRequest, 0, "4Gi", null, null, "trace-abc"); + GenerationTask task = new GenerationTask("gen-123456789", mockRequest, null, null, "trace-abc"); TaskRun run = factory.createDominoTaskRun(task); // Assert Metadata assertTrue(run.getMetadata().getGenerateName().startsWith("java-domino-gen-gen-1234")); assertEquals("gen-123456789", run.getMetadata().getLabels().get("sbomer.jboss.org/generation-id")); - assertEquals("0", run.getMetadata().getAnnotations().get("sbomer.jboss.org/retry-count")); // Assert Spec Ref assertEquals("generator-domino", run.getSpec().getTaskRef().getName()); - // Assert Memory Overrides using Fabric8's Quantity object - io.fabric8.kubernetes.api.model.Quantity expectedMemory = new io.fabric8.kubernetes.api.model.Quantity("4Gi"); - assertEquals(expectedMemory, run.getSpec().getStepOverrides().get(0).getResources().getLimits().get("memory")); - // Assert Params Map params = extractParams(run); assertEquals("gen-123456789", params.get("generation-id")); @@ -62,7 +57,7 @@ void testCreateTaskRun_OptionMergingAndGitRev() { Map generatorOptions = Map.of("args", "--gen-arg", "java-version", "11"); Map handlerOptions = Map.of("args", "--handler-arg", "git-rev", "v1.0.0"); - GenerationTask task = new GenerationTask("gen-opts", mockRequest, 0, null, generatorOptions, handlerOptions, null); + GenerationTask task = new GenerationTask("gen-opts", mockRequest, generatorOptions, handlerOptions, null); TaskRun run = factory.createCdxMavenPluginTaskRun(task); Map params = extractParams(run); @@ -83,7 +78,7 @@ void testArchiveUrl_DoesNotAppendGitRev() { target.setIdentifier("https://example.com/source.zip"); mockRequest.setTarget(target); - GenerationTask task = new GenerationTask("gen-arc", mockRequest, 0, null, null, Map.of("git-rev", "main"), null); + GenerationTask task = new GenerationTask("gen-arc", mockRequest, null, Map.of("git-rev", "main"), null); TaskRun run = factory.createDominoTaskRun(task); Map params = extractParams(run);