Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 9 additions & 7 deletions src/DIRAC/WorkloadManagementSystem/Agent/JobAgent.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
"""
The Job Agent class instantiates a CE that acts as a client to a
compute resource and also to the WMS.
The Job Agent constructs a classAd based on the local resource description in the CS
and the current resource status that is used for matching.
The Job Agent class instantiates a CE that acts as a client to a
compute resource and also to the WMS.
The Job Agent constructs a classAd based on the local resource description in the CS
and the current resource status that is used for matching.
"""

import os
import re
import sys
Expand Down Expand Up @@ -34,6 +35,7 @@
from DIRAC.WorkloadManagementSystem.Client.JobReport import JobReport
from DIRAC.WorkloadManagementSystem.Client.MatcherClient import MatcherClient
from DIRAC.WorkloadManagementSystem.Client.PilotManagerClient import PilotManagerClient
from DIRAC.WorkloadManagementSystem.JobWrapper.JobWrapper import RESCHEDULED
from DIRAC.WorkloadManagementSystem.Utilities.Utils import createJobWrapper


Expand Down Expand Up @@ -712,9 +714,9 @@ def _checkSubmittedJobs(self):
self._rescheduleFailedJob(jobID, result["Message"])
self.hostFailureCount += 1

# The payload failed (if result["Value"] is not 0)
elif result["Value"]:
# In order to avoid overriding perfectly valid states, the status is updated iff the job was running
# The payload failed (if result["Value"] is not 0 and the job was not rescheduled)
elif result["Value"] and result["Value"] != RESCHEDULED:
# In order to avoid overriding perfectly valid states, the status is updated only if the job was running
res = JobMonitoringClient().getJobsStatus(jobID)
if not res["OK"]:
return res
Expand Down
8 changes: 8 additions & 0 deletions src/DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,14 @@

CHILD_PID_POLL_INTERVALS = list(range(5, 40, 5))

# Status codes produced by the job wrapper; the JobWrapperTemplate hands its final code to sys.exit().
# The negative values are assigned by the template's top-level error handling.
SUBMISSION_FAILED = -1  # an exception was caught and the job report could still be committed
SUBMISSION_REPORT_FAILED = -2  # an exception was caught and committing the job report failed as well
JOBWRAPPER_EXCEPTION = -3  # default exit code the template starts with before running the job
INITIALIZATION_FAILED = 1  # returned by the template's execute() when one of its steps fails
PAYLOAD_FAILED = 2  # NOTE(review): presumably the payload itself failed - confirm against callers
FINALIZATION_FAILED = 3  # NOTE(review): presumably output processing failed - confirm against callers
RESCHEDULED = 4  # the JobAgent excludes this value from its "payload failed" handling


class JobWrapper:
"""The only user of the JobWrapper is the JobWrapperTemplate"""
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
#!/usr/bin/env python
""" This template will become the job wrapper that's actually executed.
"""This template will become the job wrapper that's actually executed.

The JobWrapperTemplate is completed and invoked by the jobAgent and uses functionalities from JobWrapper module.
It has to be an executable.
The JobWrapperTemplate is completed and invoked by the jobAgent and uses functionalities from JobWrapper module.
It has to be an executable.

The JobWrapperTemplate will reschedule the job according to certain criteria:
- the working directory could not be created
- the jobWrapper initialization phase failed
- the inputSandbox download failed
- the resolution of the input data failed
- the JobWrapper ended with the status DErrno.EWMSRESC
The JobWrapperTemplate will reschedule the job according to certain criteria:
- the working directory could not be created
- the jobWrapper initialization phase failed
- the inputSandbox download failed
- the resolution of the input data failed
"""

import json
import os
import sys
Expand All @@ -24,6 +24,8 @@
Script.parseCommandLine()

from DIRAC import gLogger

from DIRAC.WorkloadManagementSystem.JobWrapper import JobWrapper as JW
from DIRAC.WorkloadManagementSystem.Client.JobReport import JobReport
from DIRAC.WorkloadManagementSystem.JobWrapper.JobWrapperUtilities import (
createAndEnterWorkingDirectory,
Expand Down Expand Up @@ -52,7 +54,7 @@ def execute(jobID: int, arguments: dict, jobReport: JobReport):
if "InputSandbox" in arguments["Job"]:
jobReport.commit()
if not transferInputSandbox(job, arguments["Job"]["InputSandbox"]):
return 1
return JW.INITIALIZATION_FAILED
else:
gLogger.verbose("Job has no InputSandbox requirement")

Expand All @@ -61,7 +63,7 @@ def execute(jobID: int, arguments: dict, jobReport: JobReport):
if "InputData" in arguments["Job"]:
if arguments["Job"]["InputData"]:
if not resolveInputData(job):
return 1
return JW.INITIALIZATION_FAILED
else:
gLogger.verbose("Job has a null InputData requirement:")
gLogger.verbose(arguments)
Expand All @@ -71,7 +73,7 @@ def execute(jobID: int, arguments: dict, jobReport: JobReport):
jobReport.commit()

if not executePayload(job):
return 1
return JW.INITIALIZATION_FAILED

if "OutputSandbox" in arguments["Job"] or "OutputData" in arguments["Job"]:
if not processJobOutputs(job):
Expand All @@ -85,7 +87,7 @@ def execute(jobID: int, arguments: dict, jobReport: JobReport):
##########################################################


ret = -3
ret = JW.JOBWRAPPER_EXCEPTION
try:
jsonFileName = os.path.realpath(__file__) + ".json"
with open(jsonFileName) as f:
Expand All @@ -105,9 +107,9 @@ def execute(jobID: int, arguments: dict, jobReport: JobReport):
gLogger.exception("JobWrapperTemplate exception")
try:
jobReport.commit()
ret = -1
ret = JW.SUBMISSION_FAILED
except Exception: # pylint: disable=broad-except
gLogger.exception("Could not commit the job report")
ret = -2
ret = JW.SUBMISSION_REPORT_FAILED

sys.exit(ret)
Loading