Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
153 changes: 140 additions & 13 deletions src/queens/drivers/jobscript.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
#
"""Driver to run a jobscript."""


import logging
from collections.abc import Callable
from dataclasses import dataclass
Expand All @@ -27,7 +26,7 @@
from queens.utils.injector import inject, inject_in_template
from queens.utils.io import read_file
from queens.utils.logger_settings import log_init_args
from queens.utils.metadata import SimulationMetadata
from queens.utils.metadata import SimulationMetadata, get_metadata_from_job_dir, get_metadata_path
from queens.utils.path import create_folder_if_not_existent
from queens.utils.run_subprocess import run_subprocess

Expand Down Expand Up @@ -85,7 +84,9 @@ class Jobscript(Driver):
jobscript_options (dict): Dictionary containing jobscript options.
jobscript_file_name (str): Jobscript file name (default: 'jobscript.sh').
raise_error_on_jobscript_failure (bool): Whether to raise an error for a non-zero jobscript
exit code.
exit code.
reuse_existing_jobs (bool, opt): Whether to reuse existing jobs if the input parameters are
the same and the previous jobscript ran successfully.
"""

@log_init_args
Expand All @@ -101,6 +102,7 @@ def __init__(
jobscript_file_name="jobscript.sh",
extra_options=None,
raise_error_on_jobscript_failure=True,
reuse_existing_jobs=True,
):
"""Initialize Jobscript object.

Expand All @@ -118,6 +120,8 @@ def __init__(
extra_options (dict, opt): Extra options to inject into jobscript template.
raise_error_on_jobscript_failure (bool, opt): Whether to raise an error for a non-zero
jobscript exit code.
reuse_existing_jobs (bool, opt): Whether to reuse existing jobs if the input parameters
are the same and the previous jobscript ran successfully.
"""
super().__init__(parameters=parameters, files_to_copy=files_to_copy)
self.input_templates = self.create_input_templates_dict(input_templates)
Expand All @@ -133,6 +137,7 @@ def __init__(
self.jobscript_options["executable"] = executable
self.jobscript_file_name = jobscript_file_name
self.raise_error_on_jobscript_failure = raise_error_on_jobscript_failure
self.reuse_existing_jobs = reuse_existing_jobs

@staticmethod
def create_input_templates_dict(input_templates):
Expand Down Expand Up @@ -202,15 +207,91 @@ def run(
) -> dict:
"""Run the driver.

Either reuse existing results or run the jobscript.

Args:
sample (np.array): Input sample.
job_id (int): Job ID.
num_procs (int): Number of processors.
experiment_dir (Path): Path to QUEENS experiment directory.
experiment_name (str): Name of QUEENS experiment.
sample: Input sample.
job_id: Job ID.
num_procs: Number of processors.
experiment_dir: Path to QUEENS experiment directory.
experiment_name: Name of QUEENS experiment.

Returns:
Results.
"""
job_dir = self.get_job_dir(job_id, experiment_dir)

if self.reuse_existing_jobs and self.metadata_exists(job_dir):
existing_metadata = get_metadata_from_job_dir(job_dir)
if not self.equal_inputs(existing_metadata, sample):
raise RuntimeError("Input parameters differ from existing job metadata. Abort...")

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

😆

Suggested change
raise RuntimeError("Input parameters differ from existing job metadata. Abort...")
raise RuntimeError("Input parameters differ from existing job metadata.")


if self.successful_jobscript_run(existing_metadata):
return self.get_existing_results(job_dir)

return self.run_jobscript(sample, job_id, num_procs, experiment_dir, experiment_name)

@staticmethod
def metadata_exists(job_dir: Path) -> bool:
"""Check if metadata file exists in job directory.

Args:
job_dir: Path to job directory.

Returns:
True if metadata file exists, False otherwise.
"""
metadata_path = get_metadata_path(job_dir)
return metadata_path.is_file()

@staticmethod
def successful_jobscript_run(existing_metadata: dict) -> bool:
"""Check if the jobscript run was successful.

Args:
existing_metadata: Metadata from existing job.

Returns:
True if the jobscript run was successful, False otherwise.
"""
jobscript_status = (
existing_metadata.get("times", {}).get("run_jobscript", {}).get("status", "")
)
return jobscript_status == "successful"
Comment on lines +248 to +260

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would be safer to introduce a top level boolean flag with "job_successful". A user might have more than one timed tracked subjob.


def equal_inputs(self, existing_metadata: dict, new_inputs: np.ndarray) -> bool:
"""Check if the input parameters are equal.

Args:
existing_metadata: Metadata from existing job.
new_inputs: New input parameters.

Returns:
True if the input parameters are equal, False otherwise.
"""
existing_inputs = existing_metadata.get("inputs", {})
new_inputs_dict = self.parameters.sample_as_dict(new_inputs)
return existing_inputs == new_inputs_dict
Comment on lines +272 to +274

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This check might lead to false results. Keep in mind that the types of new_inputs and existing_inputs might be different, as the latter ones were read from a YAML file. Or, for example, in the case of a numpy array, where the comparison returns an array with elementwise comparison. Possible solutions with different consequences:

  • Use pickle files for the metadata
  • Hash the inputs and export the hash to verify they changed
  • Round-trip of the new inputs, mimics them being read from YAML


def run_jobscript(
self,
sample: np.ndarray,
job_id: int,
num_procs: int,
experiment_dir: Path,
experiment_name: str,
) -> dict:
"""Run the jobscript.

Args:
sample: Input sample.
job_id: Job ID.
num_procs: Number of processors.
experiment_dir: Path to QUEENS experiment directory.
experiment_name: Name of QUEENS experiment.

Returns:
Result and potentially the gradient.
Jobscript results.
"""
job_dir, output_dir, output_file, input_files, log_file = self._manage_paths(
job_id, experiment_dir
Expand Down Expand Up @@ -250,12 +331,59 @@ def run(
execute_cmd = f"bash {jobscript_file} >{log_file} 2>&1"
self._run_executable(job_id, execute_cmd)

with metadata.time_code("data_processing"):
with metadata.time_code("process_data"):
results = self._get_results(output_dir)
metadata.outputs = results

return results

def get_existing_results(self, job_dir: Path) -> dict:
"""Get existing results from a previous driver run.

Args:
job_dir: Path to job directory.

Returns:
Results.
"""
metadata = SimulationMetadata.init_from_file(job_dir)
output_dir = self.get_output_dir(job_dir)

with metadata.time_code("process_data_again"):
results = self._get_results(output_dir)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Related to

Note that the data processor is called again, even if the previous jobscript results are reused. Assuming that data processing should typically be fast, this allows the user to adapt the data processor without rerunning the most expensive part of the QUEENS run, which is the simulation executed via the jobscript.

When you call this function, you rerun the data processors. This, depending on the data processor can be:

  • expensive, for example, special integration over the domain
  • not possible for example, since the original data was deleted to reduce storage usage
  • only makes sense for offline analyses

I think the existing outputs should be returned from the metadata file (yes there are some other issues to be fixed) without rerunning anything, i.e. returning the same output. If the goal is to rerun a new dataprocessor, I think this functionality should be done differently to avoid confusion. Btw, I'm not arguing against this use case, I hacked it myself a lot 😅 but it is limited to offline studies. An iterative iterator can not use a different dataprocessor, as this would change the next inputs for the next driver calls and hence require a full rerun.

metadata.outputs = results

return results

@staticmethod
def get_job_dir(job_id: int, experiment_dir: Path) -> Path:
"""Get job directory path.

Args:
job_id: Job ID.
experiment_dir: Path to QUEENS experiment directory.

Returns:
Path to job directory.
"""
job_dir = experiment_dir / str(job_id)
return job_dir

@staticmethod
def get_output_dir(job_dir: Path, output_folder_name="output") -> Path:
"""Get output directory path.

Args:
job_dir: Job directory path.
output_folder_name: Name of output folder.

Returns:
Path to output directory.
"""
output_dir = job_dir / output_folder_name
create_folder_if_not_existent(output_dir)
return output_dir

def _manage_paths(
self, job_id, experiment_dir, output_folder_name="output", output_prefix="output"
):
Expand All @@ -274,9 +402,8 @@ def _manage_paths(
input_files (dict): Dict with name and path of the input file(s).
log_file (Path): Path to log file.
"""
job_dir = experiment_dir / str(job_id)
output_dir = job_dir / output_folder_name
output_dir = create_folder_if_not_existent(output_dir)
job_dir = self.get_job_dir(job_id, experiment_dir)
output_dir = self.get_output_dir(job_dir, output_folder_name)

output_file = output_dir / output_prefix
log_file = output_dir / (output_prefix + ".log")
Expand Down
10 changes: 5 additions & 5 deletions src/queens/models/adjoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,18 +71,18 @@ def grad(self, samples, upstream_gradient):
:math:`\frac{\partial g}{\partial f} \frac{df}{dx}`
"""
num_samples = samples.shape[0]
# get last job_ids
last_job_ids = [self.scheduler.next_job_id - num_samples + i for i in range(num_samples)]
next_job_ids = self.scheduler.get_job_ids(num_samples)
experiment_dir = self.scheduler.experiment_dir

# write adjoint data for each sample to adjoint files in old job directories
for job_id, grad_objective in zip(last_job_ids, upstream_gradient):
# write adjoint data for each sample to adjoint files in new job directories
for job_id, grad_objective in zip(next_job_ids, upstream_gradient):
job_dir = current_job_directory(experiment_dir, job_id)
job_dir.mkdir(exist_ok=True)
adjoint_file_path = job_dir.joinpath(self.adjoint_file)
write_to_csv(adjoint_file_path, grad_objective.reshape(1, -1))

# evaluate the adjoint model
gradient = self.create_result_dict_from_scheduler_output(
self.scheduler.evaluate(samples, self.gradient_driver, job_ids=last_job_ids)
self.scheduler.evaluate(samples, self.gradient_driver, job_ids=next_job_ids)
)["result"]
return gradient
65 changes: 57 additions & 8 deletions src/queens/utils/metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
#
"""Metadata objects."""

from __future__ import annotations

from contextlib import contextmanager
from datetime import datetime
from pathlib import Path
Expand All @@ -35,7 +37,7 @@
class SimulationMetadata:
"""Simulation metadata object.

This objects holds metadata, times code sections and exports them to yaml.
This objects holds metadata, times code sections, and exports them to yaml.

Attributes:
job_id: Id of the job
Expand All @@ -55,16 +57,32 @@ def __init__(self, job_id: int, inputs: dict, job_dir: Path) -> None:
job_dir: Directory in which to write the metadata
"""
self.job_id = job_id
self.timestamp: str | None = None
self.timestamp = self._get_timestamp()
self.inputs = inputs
self.file_path = (Path(job_dir) / METADATA_FILENAME).with_suffix(METADATA_FILETYPE)
self.file_path = get_metadata_path(job_dir)
self.outputs = None
self.times: dict = {}
self._create_timestamp()

def _create_timestamp(self) -> None:
"""Create timestamp in a nice format."""
self.timestamp = datetime.now().strftime("%d-%m-%Y, %H:%M:%S")
@classmethod
def init_from_file(cls, job_dir: Path) -> SimulationMetadata:
"""Initialize a SimulationMetadata object from an existing metadata file.

Args:
job_dir: Job directory in which the metadata file is located.

Returns:
SimulationMetadata object.
"""
simulation_metadata = cls(-1, {}, job_dir)
metadata_dict = yaml.safe_load(simulation_metadata.file_path.read_text(encoding="utf-8"))
for key, value in metadata_dict.items():
setattr(simulation_metadata, key, value)
return simulation_metadata

def _get_timestamp(self) -> str:
"""Get timestamp in a nice format."""
timestamp = datetime.now().strftime("%d-%m-%Y, %H:%M:%S")
return timestamp
Comment on lines +66 to +85

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You were more motivated than myself, I just created a new metadatafile with "_restart" at the end 😅


def to_dict(self) -> dict[str, Any]:
"""Create dictionary from object.
Expand Down Expand Up @@ -117,6 +135,9 @@ def time_code(self, code_section_name: str) -> Iterator[None]:
run_time = perf_counter() - start
# Add the runtime of this code section
self.times[code_section_name]["time"] = run_time
# Add the timestamp when re-using existing job data
if code_section_name == "process_data_again":

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I actually like this idea, why not do it for every measured job and not only for restarts?

self.times[code_section_name]["timestamp"] = self._get_timestamp()
# Export since the job is either finished or failed
self.export()

Expand All @@ -129,6 +150,34 @@ def __str__(self) -> str:
return get_str_table("Simulation Metadata", self.to_dict())


def get_metadata_path(job_dir: str | Path) -> Path:
"""Get metadata file path from a job directory.

Args:
job_dir: Job directory

Returns:
metadata_path: Path to the metadata file
"""
return (Path(job_dir) / METADATA_FILENAME).with_suffix(METADATA_FILETYPE)


def get_metadata_from_job_dir(job_dir: Path) -> dict:
"""Get metadata from a job directory.

Args:
job_dir: Job directory

Returns:
metadata (dict): metadata of a job
"""
metadata_path = get_metadata_path(job_dir)
metadata = yaml.safe_load(metadata_path.read_text())
if metadata is None:
metadata = {}
return metadata


def get_metadata_from_experiment_dir(experiment_dir: Path | str) -> Iterator[Any]:
"""Get metadata from experiment_dir.

Expand All @@ -141,7 +190,7 @@ def get_metadata_from_experiment_dir(experiment_dir: Path | str) -> Iterator[Any
Metadata of a job
"""
for job_dir in job_dirs_in_experiment_dir(experiment_dir):
metadata_path = (job_dir / METADATA_FILENAME).with_suffix(METADATA_FILETYPE)
metadata_path = get_metadata_path(job_dir)
yield yaml.safe_load(metadata_path.read_text())


Expand Down
Loading
Loading