def write_VDS_master_file(
    *,
    parameters: CheetahParameters,
) -> None:
    """
    Writes the master HDF5 file.

    This function creates an HDF5 file containing virtual datasets that aggregate the
    data stored in the HDF5 files created by the `HDF5Writer` class. The master file
    is created in the processed data directory, and links all the frames listed in
    the `cleaned.txt` file found in that directory, sorted by timestamp.

    Entries of `cleaned.txt` whose filename is `---`, or whose data file cannot be
    found, are ignored. If `cleaned.txt` does not exist, or if it does not list any
    usable frame, a warning is logged and no master file is written.

    Arguments:

        parameters: A set of OM configuration parameters collected together in a
            parameter group. The `processed_directory`, `processed_filename_prefix`
            and `processed_filename_extension` entries are used.
    """
    processed_directory: pathlib.Path = pathlib.Path(
        parameters.processed_directory
    ).resolve()
    master_filename: pathlib.Path = (
        processed_directory / f"{parameters.processed_filename_prefix}-master."
        f"{parameters.processed_filename_extension}"
    )

    cleaned_filename: pathlib.Path = processed_directory / "cleaned.txt"
    if not cleaned_filename.exists():
        log.warning(
            f'Cannot create the master file: "{cleaned_filename}" file does not exist.'
        )
        return

    # Each data line of the list file stores, comma-separated: timestamp, event id,
    # hit flag, data filename, index in file, number of peaks, average intensity.
    fh: TextIO
    frames: list[FramelistData] = []
    with open(cleaned_filename, "r") as fh:
        for line in fh:
            if line.startswith("#") or line.strip() == "":
                continue
            items: list[str] = line.split(",")
            filename: str = items[3].strip()
            # NOTE(review): a relative filename is resolved against the current
            # working directory, not against the processed directory - confirm that
            # the list file always stores absolute paths.
            if filename == "---" or not pathlib.Path(filename).exists():
                continue
            frames.append(
                FramelistData(
                    timestamp=float(items[0].strip()),
                    event_id=items[1].strip(),
                    frame_is_hit=int(items[2].strip()),
                    filename=filename,
                    index_in_file=int(items[4].strip()),
                    num_peaks=int(items[5].strip()),
                    average_intensity=float(items[6].strip()),
                )
            )

    # Without at least one frame there is no file to take the dataset structure
    # from, and nothing to link: bail out instead of failing on frames[0].
    if not frames:
        log.warning(
            f'Cannot create the master file: "{cleaned_filename}" does not list any '
            "frame stored in an existing data file."
        )
        return

    # Sort by timestamp:
    frames.sort(key=lambda frame: frame.timestamp)
    n_frames: int = len(frames)

    source_files: dict[str, Any] = {}
    try:
        # Open every source file exactly once. The handles are shared by all the
        # datasets and are closed in the `finally` clause, even on failure.
        for frame in frames:
            if frame.filename not in source_files:
                source_files[frame.filename] = h5py.File(str(frame.filename), "r")

        # The file storing the earliest frame is used as a template for the names,
        # shapes and data types of the datasets.
        template_file: Any = source_files[frames[0].filename]
        datasets: list[str] = []

        def _collect_dataset(name: str) -> None:
            # Returning None lets the visit continue through the whole file.
            if isinstance(template_file[name], h5py.Dataset):
                datasets.append(name)

        template_file.visit(_collect_dataset)

        master_file: Any
        with h5py.File(str(master_filename), "w") as master_file:
            for dataset_name in datasets:
                template_dataset: Any = template_file[dataset_name]
                # NOTE(review): assumes that the first axis of every dataset runs
                # over the frames - confirm for datasets not written per frame.
                layout: Any = h5py.VirtualLayout(
                    shape=(n_frames,) + template_dataset.shape[1:],
                    dtype=template_dataset.dtype,
                )
                virtual_sources: dict[str, Any] = {}
                for i, frame in enumerate(frames):
                    if frame.filename not in virtual_sources:
                        virtual_sources[frame.filename] = h5py.VirtualSource(
                            source_files[frame.filename][dataset_name]
                        )
                    layout[i] = virtual_sources[frame.filename][frame.index_in_file]
                master_file.create_virtual_dataset(dataset_name, layout)
    finally:
        for source_file in source_files.values():
            source_file.close()

    log.info(f"Master file created: {master_filename}, containing {n_frames} frames.")
class CheetahNoProcessing(OmProcessingProtocol):
    """
    See documentation for the `__init__` function.
    """

    def __init__(self, *, parameters: MonitorParameters) -> None:
        """
        Cheetah No Processing.

        This Processing class implements the Cheetah software package, but without
        performing any actual processing of the data. It only retrieves the data and
        saves it to HDF5 files.

        Arguments:

            parameters: An object storing OM's configuration parameters. It must
                include the `cheetah` parameter group.
        """
        if parameters.cheetah is None:
            log_error_and_exit(
                "'cheetah' section is not present in the configuration file"
            )
            return  # For the type checker
        self._cheetah_parameters: CheetahParameters = parameters.cheetah
        self._monitor_parameters: MonitorParameters = parameters

        # Processed data directory
        Path(parameters.cheetah.processed_directory).mkdir(exist_ok=True)

    def initialize_processing_node(
        self, *, node_rank: int, node_pool_size: int
    ) -> None:
        """
        Initializes the processing nodes for Cheetah No Processing.

        This function initializes the HDF5 file writer and class sums accumulator.

        Please see the documentation of the base Protocol class for additional
        information about this method.

        Arguments:

            node_rank: The OM rank of the current node, which is an integer that
                unambiguously identifies the current node in the OM node pool.

            node_pool_size: The total number of nodes in the OM pool, including all the
                processing nodes and the collecting node.
        """
        # HDF5 file writer
        self._file_writer: HDF5Writer = HDF5Writer(
            parameters=self._cheetah_parameters,
            node_rank=node_rank,
        )

        # Class sums accumulation (a single class, since frames are not classified)
        self._class_sum_accumulator: CheetahClassSumsAccumulator = (
            CheetahClassSumsAccumulator(
                parameters=self._cheetah_parameters,
                num_classes=1,
            )
        )

        log_info(f"Processing node {node_rank} starting")

    def initialize_collecting_node(
        self, *, node_rank: int, node_pool_size: int
    ) -> None:
        """
        Initializes the collecting node for Cheetah No Processing.

        This function initializes the status file writer, the event counter, the list
        files writer and the class sums collector.

        Please see the documentation of the base Protocol class for additional
        information about this method.

        Arguments:

            node_rank: The OM rank of the current node, which is an integer that
                unambiguously identifies the current node in the OM node pool.

            node_pool_size: The total number of nodes in the OM pool, including all the
                processing nodes and the collecting node.
        """

        # Status file
        self._status_file_writer: CheetahStatusFileWriter = CheetahStatusFileWriter(
            parameters=self._cheetah_parameters
        )
        self._status_file_writer.update_status(status="Not finished")

        # Event counting
        self._event_counter: EventCounter = EventCounter(
            speed_report_interval=(
                self._cheetah_parameters.status_file_update_interval
            ),
            node_pool_size=node_pool_size,
        )

        # list files
        self._list_files_writer: CheetahlistFilesWriter = CheetahlistFilesWriter(
            parameters=self._cheetah_parameters,
        )

        # Class sums collection
        self._class_sum_collector: CheetahClassSumsCollector = (
            CheetahClassSumsCollector(
                parameters=self._cheetah_parameters, num_classes=1
            )
        )

        # Console
        log_info("Starting the monitor...")

    def process_data(  # noqa: C901
        self, *, node_rank: int, node_pool_size: int, data: dict[str, Any]
    ) -> tuple[dict[str, Any], int]:
        """
        Processes a detector data frame.

        This function saves the retrieved data to an HDF5 file, and adds the detector
        frame to the class sum. No peak finding is performed: an empty peak list is
        associated with every frame.

        Please see the documentation of the base Protocol class for additional
        information about this method.

        Arguments:

            node_rank: The OM rank of the current node, which is an integer that
                unambiguously identifies the current node in the OM node pool.

            node_pool_size: The total number of nodes in the OM pool, including all the
                processing nodes and the collecting node.

            data: A dictionary containing the data that OM retrieved for the detector
                data frame being processed.

                * The dictionary keys describe the Data Sources for which OM has
                  retrieved data. The keys must match the source names listed in the
                  `required_data` entry of OM's `om` configuration parameter group.
                  This function requires the `detector_data`, `event_id`,
                  `timestamp`, `beam_energy` and `detector_distance` entries, and
                  additionally saves `optical_laser_active` and `lcls_extra` when
                  they are present.

                * The corresponding dictionary values must store the data that OM
                  retrieved for each of the Data Sources.

        Returns:

            A tuple with two entries. The first entry is a dictionary storing the
            processed data that should be sent to the collecting node. The second
            entry is the OM rank number of the node that processed the information.
        """

        # Empty peak list
        peak_list: PeakList = PeakList(
            num_peaks=0,
            fs=[],
            ss=[],
            intensity=[],
            num_pixels=[],
            max_pixel_intensity=[],
            snr=[],
        )

        # Add data to the class sum
        self._class_sum_accumulator.add_frame(
            class_number=0,
            frame_data=data["detector_data"],
            peak_list=peak_list,
        )

        # Saving data to HDF5 file
        data_to_write: dict[str, Any] = {
            "detector_data": data["detector_data"],
            "event_id": data["event_id"],
            "timestamp": data["timestamp"],
            "beam_energy": data["beam_energy"],
            "detector_distance": data["detector_distance"],
        }
        if "optical_laser_active" in data:
            data_to_write["optical_laser_active"] = int(data["optical_laser_active"])
        if "lcls_extra" in data:
            data_to_write["lcls_extra"] = data["lcls_extra"]
        self._file_writer.write_frame(processed_data=data_to_write)

        # Data to send to the collecting node. The file name and index are queried
        # after the frame has been written, so they refer to the frame just saved.
        processed_data: dict[str, Any] = {
            "timestamp": data["timestamp"],
            "event_id": data["event_id"],
            "class_sums": self._class_sum_accumulator.get_sums_for_sending(),
            "filename": self._file_writer.get_current_filename(),
            "index": self._file_writer.get_num_written_frames(),
            "peak_list": peak_list,
        }

        return (processed_data, node_rank)

    def wait_for_data(
        self,
        *,
        node_rank: int,
        node_pool_size: int,
    ) -> None:
        """
        Receives and handles requests from external programs.

        This function is not used in Cheetah, and therefore does nothing.

        Please see the documentation of the base Protocol class for additional
        information about this method.

        Arguments:

            node_rank: The OM rank of the current node, which is an integer that
                unambiguously identifies the current node in the OM node pool.

            node_pool_size: The total number of nodes in the OM pool, including all the
                processing nodes and the collecting node.

        """

    def collect_data(  # noqa: C901
        self,
        *,
        node_rank: int,
        node_pool_size: int,
        processed_data: tuple[dict[str, Any], int],
    ) -> dict[str, dict[str, Any]] | None:
        """
        Collects frame information and saves it to files.

        This function accumulates the class sums received from the processing nodes
        and adds an entry for each received frame to the list files. Every frame is
        counted and recorded as a hit, with no peaks and an average intensity of
        zero. Additionally, at regular intervals, this function writes the number of
        processed events and hits to a status file and flushes the list files.
        External programs can inspect the status file to determine the advancement
        of the data processing.

        Please see the documentation of the base Protocol class for additional
        information about this method.

        Arguments:

            node_rank: The OM rank of the current node, which is an integer that
                unambiguously identifies the current node in the OM node pool.

            node_pool_size: The total number of nodes in the OM pool, including all the
                processing nodes and the collecting node.

            processed_data (tuple[dict, int]): A tuple whose first entry is a
                dictionary storing the data received from a processing node, and whose
                second entry is the OM rank number of the node that processed the
                information.

        Returns:

            Always None: this class does not return any data to the caller.
        """

        received_data: dict[str, Any] = processed_data[0]

        # Collect class sums
        if received_data["class_sums"] is not None:
            self._class_sum_collector.add_sums(class_sums=received_data["class_sums"])

        # End processing: the final message of a processing node only carries class
        # sums, and no frame information.
        if "end_processing" in received_data:
            return None

        # Event counting
        self._event_counter.add_hit_event()

        # Write frame and peaks data to list files
        frame_data: FramelistData = FramelistData(
            timestamp=received_data["timestamp"],
            event_id=received_data["event_id"],
            frame_is_hit=1,
            filename=received_data["filename"],
            index_in_file=received_data["index"],
            num_peaks=0,
            average_intensity=0,
        )
        self._list_files_writer.add_frame(
            frame_data=frame_data, peak_list=received_data["peak_list"]
        )

        # Update status file
        num_events: int = self._event_counter.get_num_events()
        if num_events % self._cheetah_parameters.status_file_update_interval == 0:
            self._status_file_writer.update_status(
                status="Not finished",
                num_frames=num_events,
                num_hits=self._event_counter.get_num_hits(),
            )
            self._list_files_writer.flush_files()

        self._event_counter.report_speed()

        return None

    def end_processing_on_processing_node(
        self,
        *,
        node_rank: int,
        node_pool_size: int,
    ) -> dict[str, Any] | None:
        """
        Ends processing on the processing nodes for Cheetah No Processing.

        This function prints a message on the console, closes the output HDF5 files
        and ends the processing.

        Please see the documentation of the base Protocol class for additional
        information about this method.

        Arguments:

            node_rank: The OM rank of the current node, which is an integer that
                unambiguously identifies the current node in the OM node pool.

            node_pool_size: The total number of nodes in the OM pool, including all the
                processing nodes and the collecting node.

        Returns:

            A dictionary storing information to be sent to the collecting node: the
            last accumulated class sums and an `end_processing` flag.
        """

        log_info(f"Processing node {node_rank} shutting down.")
        self._file_writer.close()
        # Send last class sums to the collecting node
        return {
            "class_sums": self._class_sum_accumulator.get_sums_for_sending(
                disregard_counter=True
            ),
            "end_processing": True,
        }

    def end_processing_on_collecting_node(
        self, *, node_rank: int, node_pool_size: int
    ) -> None:
        """
        Ends processing on the collecting node for Cheetah No Processing.

        This function saves the final class sums, sorts the frames and closes the list
        files, writes the master file, writes the final information in the status
        file, and prints a message on the console.

        Please see the documentation of the base Protocol class for additional
        information about this method.

        Arguments:

            node_rank: The OM rank of the current node, which is an integer that
                unambiguously identifies the current node in the OM node pool.

            node_pool_size: The total number of nodes in the OM pool, including all the
                processing nodes and the collecting node.
        """
        # Save final accumulated class sums
        self._class_sum_collector.save_sums()

        # Sort frames and write final list files
        self._list_files_writer.sort_frames_and_close_files()

        # Write master file with sorted frames
        write_VDS_master_file(
            parameters=self._cheetah_parameters,
        )

        # Write final status
        self._status_file_writer.update_status(
            status="Finished",
            num_frames=self._event_counter.get_num_events(),
            num_hits=self._event_counter.get_num_hits(),
        )

        log_info(
            "Processing finished. OM has processed "
            f"{self._event_counter.get_num_events()} events in total."
        )