From 0e3c228718ddbe565d9d83402a7a35eb3fd3b3c5 Mon Sep 17 00:00:00 2001 From: Mohcine Chraibi Date: Tue, 24 Mar 2026 11:19:35 +0100 Subject: [PATCH 1/4] Add test and fix for orientation issue --- fdsreader/bndf/obstruction.py | 602 +++++++++++++++++++--------- tests/acceptance_tests/test_bndf.py | 26 +- 2 files changed, 433 insertions(+), 195 deletions(-) diff --git a/fdsreader/bndf/obstruction.py b/fdsreader/bndf/obstruction.py index d9ba6a4..a325091 100644 --- a/fdsreader/bndf/obstruction.py +++ b/fdsreader/bndf/obstruction.py @@ -27,8 +27,18 @@ class Patch: :ivar _n_t: Total number of time steps for which output data has been written. """ - def __init__(self, file_path: str, dimension: Dimension, extent: Extent, orientation: int, cell_centered: bool, - patch_offset: int, initial_offset: int, n_t: int, mesh: 'Mesh'): + def __init__( + self, + file_path: str, + dimension: Dimension, + extent: Extent, + orientation: int, + cell_centered: bool, + patch_offset: int, + initial_offset: int, + n_t: int, + mesh: "Mesh", + ): self.file_path = file_path self.dimension = dimension self.extent = extent @@ -39,7 +49,7 @@ def __init__(self, file_path: str, dimension: Dimension, extent: Extent, orienta self._time_offset = -1 self._n_t = n_t self.mesh = mesh - self._boundary_parent: 'Boundary' = None + self._boundary_parent: "Boundary" = None def n_t(self, count_duplicates=True) -> int: """Get the number of timesteps for which data was output. @@ -53,31 +63,30 @@ def n_t(self, count_duplicates=True) -> int: @property def shape(self) -> Tuple: - """Convenience function to calculate the shape of the array containing data for this patch. - """ + """Convenience function to calculate the shape of the array containing data for this patch.""" return self.dimension.shape(self.cell_centered) @property def size(self) -> int: - """Convenience function to calculate the number of data points in the array for this patch. - """ + """Convenience function to calculate the number of data points in the array for this patch.""" return self.dimension.size(self.cell_centered) def _post_init(self, time_offset: int): - """Fully initialize the patch as soon as the number of timesteps is known. - """ + """Fully initialize the patch as soon as the number of timesteps is known.""" self._time_offset = time_offset - def get_coordinates(self, ignore_cell_centered: bool = False) -> Dict[Literal['x', 'y', 'z'], np.ndarray]: + def get_coordinates( + self, ignore_cell_centered: bool = False + ) -> Dict[Literal["x", "y", "z"], np.ndarray]: """Returns a dictionary containing a numpy ndarray with coordinates for each dimension. - For cell-centered boundary data, the coordinates can be adjusted to represent cell-centered coordinates. + For cell-centered boundary data, the coordinates can be adjusted to represent cell-centered coordinates. - :param ignore_cell_centered: Whether to shift the coordinates when the bndf is cell_centered or not. + :param ignore_cell_centered: Whether to shift the coordinates when the bndf is cell_centered or not. """ # orientation = ('x', 'y', 'z')[self.orientation - 1] if self.orientation != 0 else '' # coords = {'x': set(), 'y': set(), 'z': set()} - coords: Dict[Literal['x', 'y', 'z'], np.ndarray] = {} - for dim in ('x', 'y', 'z'): + coords: Dict[Literal["x", "y", "z"], np.ndarray] = {} + for dim in ("x", "y", "z"): co = self.mesh.coordinates[dim].copy() # In case the slice is cell-centered, we will shift the coordinates by half a cell # and remove the last coordinate @@ -85,39 +94,52 @@ def get_coordinates(self, ignore_cell_centered: bool = False) -> Dict[Literal['x co = co[:-1] co += abs(co[1] - co[0]) / 2 - coords[dim] = co[np.where(np.logical_and(co >= self.extent[dim][0], co <= self.extent[dim][1]))] + coords[dim] = co[ + np.where( + np.logical_and(co >= self.extent[dim][0], co <= self.extent[dim][1]) + ) + ] if coords[dim].size == 0: - coords[dim] = np.array([co[np.argmin(np.abs(co - self.extent[dim][0]))]]) + coords[dim] = np.array( + [co[np.argmin(np.abs(co - self.extent[dim][0]))]] + ) return coords @property def data(self): - """Method to load the quantity data for a single patch. - """ + """Method to load the quantity data for a single patch.""" if not hasattr(self, "_data"): - dtype_data = fdtype.new((('f', self.dimension.size(cell_centered=False)),)) + dtype_data = fdtype.new((("f", self.dimension.size(cell_centered=False)),)) if self._n_t == -1: - self._n_t = (os.stat(self.file_path).st_size - self._initial_offset) // self._time_offset + self._n_t = ( + os.stat(self.file_path).st_size - self._initial_offset + ) // self._time_offset self._data = np.empty((self.n_t(count_duplicates=True),) + self.shape) - with open(self.file_path, 'rb') as infile: + with open(self.file_path, "rb") as infile: for t in range(self.n_t(count_duplicates=True)): - infile.seek(self._initial_offset + self._patch_offset + t * self._time_offset) + infile.seek( + self._initial_offset + + self._patch_offset + + t * self._time_offset + ) data = np.fromfile(infile, dtype_data, 1)[0][1].reshape( - self.dimension.shape(cell_centered=False), order='F') + self.dimension.shape(cell_centered=False), order="F" + ) if self.cell_centered: self._data[t, :] = data[:-1, :-1] else: self._data[t, :] = data - unique_times_indices = np.unique(self._boundary_parent.times, return_index=True)[1] + unique_times_indices = np.unique( + self._boundary_parent.times, return_index=True + )[1] self._data = self._data[unique_times_indices] return self._data def clear_cache(self): - """Remove all data from the internal cache that has been loaded so far to free memory. - """ + """Remove all data from the internal cache that has been loaded so far to free memory.""" if hasattr(self, "_data"): del self._data @@ -137,8 +159,15 @@ class Boundary: :ivar n_t: Total number of time steps for which output data has been written. """ - def __init__(self, quantity: Quantity, cell_centered: bool, times: Sequence[float], patches: List[Patch], - lower_bounds: np.ndarray, upper_bounds: np.ndarray): + def __init__( + self, + quantity: Quantity, + cell_centered: bool, + times: Sequence[float], + patches: List[Patch], + lower_bounds: np.ndarray, + upper_bounds: np.ndarray, + ): self.quantity = quantity self.cell_centered = cell_centered self._patches = patches @@ -146,7 +175,6 @@ def __init__(self, quantity: Quantity, cell_centered: bool, times: Sequence[floa self.lower_bounds = lower_bounds self.upper_bounds = upper_bounds - def n_t(self, count_duplicates=True) -> int: """Get the number of timesteps for which data was output. :param count_duplicates: If true, return the total number of data points, even if there is @@ -157,24 +185,23 @@ def n_t(self, count_duplicates=True) -> int: @property def orientations(self): - """Return all orientations for which there is data available. - """ + """Return all orientations for which there is data available.""" return [p.orientation for p in self._patches] def get_nearest_timestep(self, time: float) -> int: - """Calculates the nearest timestep for which data has been output for this obstruction. - """ + """Calculates the nearest timestep for which data has been output for this obstruction.""" idx = np.searchsorted(self.times, time, side="left") - if time > 0 and (idx == len(self.times) or math.fabs( - time - self.times[idx - 1]) < math.fabs(time - self.times[idx])): + if time > 0 and ( + idx == len(self.times) + or math.fabs(time - self.times[idx - 1]) < math.fabs(time - self.times[idx]) + ): return idx - 1 else: return idx @property def data(self) -> Dict[int, Patch]: - """The :class:`Patch` in each direction (-3=-z, -2=-y, -1=-x, 1=x, 2=y, 3=y). - """ + """The :class:`Patch` in each direction (-3=-z, -2=-y, -1=-x, 1=x, 2=y, 3=y).""" return {p.orientation: p for p in self._patches} def vmin(self, orientation: Literal[-3, -2, -1, 0, 1, 2, 3] = 0) -> float: @@ -204,8 +231,7 @@ def vmax(self, orientation: Literal[-3, -2, -1, 0, 1, 2, 3] = 0) -> float: return float(np.max(self.data[orientation].data)) def clear_cache(self): - """Remove all data from the internal cache that has been loaded so far to free memory. - """ + """Remove all data from the internal cache that has been loaded so far to free memory.""" for p in self._patches: p.clear_cache() @@ -223,13 +249,20 @@ class SubObstruction: :ivar show_times: List with points in time from when on the SubObstruction will be shown. """ - def __init__(self, side_surfaces: Tuple, bound_indices: Tuple[int, int, int, int, int, int], - extent: Extent, mesh: 'Mesh'): + def __init__( + self, + side_surfaces: Tuple, + bound_indices: Tuple[int, int, int, int, int, int], + extent: Extent, + mesh: "Mesh", + ): self.extent = extent self.side_surfaces = side_surfaces - self.bound_indices = {'x': (bound_indices[0], bound_indices[1]), - 'y': (bound_indices[2], bound_indices[3]), - 'z': (bound_indices[4], bound_indices[5])} + self.bound_indices = { + "x": (bound_indices[0], bound_indices[1]), + "y": (bound_indices[2], bound_indices[3]), + "z": (bound_indices[4], bound_indices[5]), + } self.mesh = mesh self._boundary_data: Dict[int, Boundary] = dict() @@ -237,12 +270,27 @@ def __init__(self, side_surfaces: Tuple, bound_indices: Tuple[int, int, int, int self.hide_times = list() self.show_times = list() - def _add_patches(self, bid: int, cell_centered: bool, quantity: str, short_name: str, unit: str, - patches: List[Patch], times: Sequence[float], lower_bounds: np.ndarray, - upper_bounds: np.ndarray): + def _add_patches( + self, + bid: int, + cell_centered: bool, + quantity: str, + short_name: str, + unit: str, + patches: List[Patch], + times: Sequence[float], + lower_bounds: np.ndarray, + upper_bounds: np.ndarray, + ): if bid not in self._boundary_data: - self._boundary_data[bid] = Boundary(Quantity(quantity, short_name, unit), cell_centered, times, - patches, lower_bounds, upper_bounds) + self._boundary_data[bid] = Boundary( + Quantity(quantity, short_name, unit), + cell_centered, + times, + patches, + lower_bounds, + upper_bounds, + ) # Add reference to parent boundary class in patches for patch in patches: patch._boundary_parent = self._boundary_data[bid] @@ -252,32 +300,39 @@ def _add_patches(self, bid: int, cell_centered: bool, quantity: str, short_name: @property def orientations(self): - """Return all orientations for which there is data available. - """ + """Return all orientations for which there is data available.""" if self.has_boundary_data: return next(iter(self._boundary_data.values())).orientations return [] - def get_coordinates(self, ignore_cell_centered: bool = False) -> Dict[ - int, Dict[Literal['x', 'y', 'z'], np.ndarray]]: + def get_coordinates( + self, ignore_cell_centered: bool = False + ) -> Dict[int, Dict[Literal["x", "y", "z"], np.ndarray]]: """Returns a dictionary containing a numpy ndarray with coordinates for each dimension. - For cell-centered boundary data, the coordinates can be adjusted to represent cell-centered coordinates. + For cell-centered boundary data, the coordinates can be adjusted to represent cell-centered coordinates. - :param ignore_cell_centered: Whether to shift the coordinates when the bndf is cell_centered or not. + :param ignore_cell_centered: Whether to shift the coordinates when the bndf is cell_centered or not. """ if self.has_boundary_data: - return {orientation: patch.get_coordinates(ignore_cell_centered) for orientation, patch in - next(iter(self._boundary_data.values())).data.items()} + return { + orientation: patch.get_coordinates(ignore_cell_centered) + for orientation, patch in next( + iter(self._boundary_data.values()) + ).data.items() + } return {} - def get_nearest_index(self, dimension: Literal['x', 'y', 'z'], orientation: int, value: float) -> int: - """Get the nearest mesh coordinate index in a specific dimension for a specific orientation. - """ + def get_nearest_index( + self, dimension: Literal["x", "y", "z"], orientation: int, value: float + ) -> int: + """Get the nearest mesh coordinate index in a specific dimension for a specific orientation.""" if self.has_boundary_data: coords = self.get_coordinates()[orientation][dimension] idx = np.searchsorted(coords, value, side="left") - if idx > 0 and (idx == coords.size or math.fabs(value - coords[idx - 1]) < math.fabs( - value - coords[idx])): + if idx > 0 and ( + idx == coords.size + or math.fabs(value - coords[idx - 1]) < math.fabs(value - coords[idx]) + ): return idx - 1 else: return idx @@ -290,8 +345,12 @@ def has_boundary_data(self): def get_data(self, quantity: Union[str, Quantity]): if type(quantity) == Quantity: quantity = quantity.name - return next(b for b in self._boundary_data.values() if - b.quantity.name.lower() == quantity.lower() or b.quantity.short_name.lower() == quantity.lower()) + return next( + b + for b in self._boundary_data.values() + if b.quantity.name.lower() == quantity.lower() + or b.quantity.short_name.lower() == quantity.lower() + ) def __getitem__(self, item): if type(item) == int: @@ -313,20 +372,20 @@ def n_t(self, count_duplicates=True) -> int: simulation in between the simulation run. """ if self.has_boundary_data: - return next(iter(self._boundary_data.values())).n_t(count_duplicates=count_duplicates) + return next(iter(self._boundary_data.values())).n_t( + count_duplicates=count_duplicates + ) return 0 @property def times(self): - """Return all timesteps for which boundary data is available, if any. - """ + """Return all timesteps for which boundary data is available, if any.""" if self.has_boundary_data: return next(iter(self._boundary_data.values())).times return np.array([]) def get_visible_times(self, times: Sequence[float]) -> np.ndarray: - """Returns a ndarray filtering all time steps when the SubObstruction is visible/not hidden. - """ + """Returns a ndarray filtering all time steps when the SubObstruction is visible/not hidden.""" ret = list() hidden = False for time in times: @@ -338,7 +397,11 @@ def get_visible_times(self, times: Sequence[float]) -> np.ndarray: ret.append(time) return np.array(ret) - def vmin(self, quantity: Union[str, Quantity], orientation: Literal[-3, -2, -1, 0, 1, 2, 3] = 0) -> float: + def vmin( + self, + quantity: Union[str, Quantity], + orientation: Literal[-3, -2, -1, 0, 1, 2, 3] = 0, + ) -> float: """Minimum value of all patches at any time for a specific quantity. :param orientation: Optionally filter by patches with a specific orientation. @@ -347,7 +410,11 @@ def vmin(self, quantity: Union[str, Quantity], orientation: Literal[-3, -2, -1, return self.get_data(quantity).vmin(orientation) return np.nan - def vmax(self, quantity: Union[str, Quantity], orientation: Literal[-3, -2, -1, 0, 1, 2, 3] = 0) -> float: + def vmax( + self, + quantity: Union[str, Quantity], + orientation: Literal[-3, -2, -1, 0, 1, 2, 3] = 0, + ) -> float: """Maximum value of all patches at any time for a specific quantity. :param orientation: Optionally filter by patches with a specific orientation. @@ -357,8 +424,7 @@ def vmax(self, quantity: Union[str, Quantity], orientation: Literal[-3, -2, -1, return np.nan def clear_cache(self): - """Remove all data from the internal cache that has been loaded so far to free memory. - """ + """Remove all data from the internal cache that has been loaded so far to free memory.""" for bndf in self._boundary_data.values(): bndf.clear_cache() @@ -386,8 +452,14 @@ class Obstruction: (ranging from 0.0 to 1.0). """ - def __init__(self, oid: str, color_index: int, block_type: int, texture_origin: Tuple[float, float, float], - rgba: Union[Tuple[()], Tuple[float, float, float, float]] = ()): + def __init__( + self, + oid: str, + color_index: int, + block_type: int, + texture_origin: Tuple[float, float, float], + rgba: Union[Tuple[()], Tuple[float, float, float, float]] = (), + ): self.id = oid self.color_index = color_index self.block_type = block_type @@ -399,18 +471,21 @@ def __init__(self, oid: str, color_index: int, block_type: int, texture_origin: @property def bounding_box(self) -> Extent: - """:class:`Extent` object representing the bounding box around the Obstruction. - """ + """:class:`Extent` object representing the bounding box around the Obstruction.""" extents = [sub.extent for sub in self._subobstructions.values()] - return Extent(min(extents, key=lambda e: e.x_start).x_start, max(extents, key=lambda e: e.x_end).x_end, - min(extents, key=lambda e: e.y_start).y_start, max(extents, key=lambda e: e.y_end).y_end, - min(extents, key=lambda e: e.z_start).z_start, max(extents, key=lambda e: e.z_end).z_end) + return Extent( + min(extents, key=lambda e: e.x_start).x_start, + max(extents, key=lambda e: e.x_end).x_end, + min(extents, key=lambda e: e.y_start).y_start, + max(extents, key=lambda e: e.y_end).y_end, + min(extents, key=lambda e: e.z_start).z_start, + max(extents, key=lambda e: e.z_end).z_end, + ) @property def orientations(self): - """Return all orientations for which there is data available. - """ + """Return all orientations for which there is data available.""" if self.has_boundary_data: orientations = set() for subobst in self._subobstructions.values(): @@ -431,36 +506,37 @@ def n_t(self, count_duplicates=True) -> int: @property def times(self): - """Return all timesteps for which boundary data is available, if any. - """ + """Return all timesteps for which boundary data is available, if any.""" for subobst in self._subobstructions.values(): if subobst.has_boundary_data: return subobst.times return np.array([]) def get_visible_times(self, times: Sequence[float]): - """Returns an ndarray filtering all time steps when theSubObstruction is visible/not hidden. - """ + """Returns an ndarray filtering all time steps when theSubObstruction is visible/not hidden.""" for subobst in self._subobstructions.values(): return subobst.get_visible_times(times) return np.array([]) - def get_coordinates(self, ignore_cell_centered: bool = False) -> Dict[ - int, Dict[Literal['x', 'y', 'z'], np.ndarray]]: + def get_coordinates( + self, ignore_cell_centered: bool = False + ) -> Dict[int, Dict[Literal["x", "y", "z"], np.ndarray]]: """Returns a dictionary containing a numpy ndarray with coordinates for each dimension. - For cell-centered boundary data, the coordinates can be adjusted to represent cell-centered coordinates. + For cell-centered boundary data, the coordinates can be adjusted to represent cell-centered coordinates. - :param ignore_cell_centered: Whether to shift the coordinates when the bndf is cell_centered or not. + :param ignore_cell_centered: Whether to shift the coordinates when the bndf is cell_centered or not. """ if self.has_boundary_data: all_coords = dict() for orientation_int in self.orientations: - orientation = ('x', 'y', 'z')[abs(orientation_int) - 1] - coords = {'x': set(), 'y': set(), 'z': set()} - for dim in ('x', 'y', 'z'): + orientation = ("x", "y", "z")[abs(orientation_int) - 1] + coords = {"x": set(), "y": set(), "z": set()} + for dim in ("x", "y", "z"): if orientation == dim: bounding_box_index = 0 if orientation_int < 0 else 1 - coords[dim] = np.array([self.bounding_box[dim][bounding_box_index]]) + coords[dim] = np.array( + [self.bounding_box[dim][bounding_box_index]] + ) continue min_delta = math.inf for subobst in self._subobstructions.values(): @@ -485,12 +561,19 @@ def get_coordinates(self, ignore_cell_centered: bool = False) -> Dict[ for subobst in self._subobstructions.values(): mesh = subobst.mesh mesh_coords = mesh.coordinates[dim] - idx = np.searchsorted(mesh_coords, single_coordinate, side="left") - if idx > 0 and (idx == mesh_coords.size or math.fabs( - single_coordinate - mesh_coords[idx - 1]) < math.fabs( - single_coordinate - mesh_coords[idx])): + idx = np.searchsorted( + mesh_coords, single_coordinate, side="left" + ) + if idx > 0 and ( + idx == mesh_coords.size + or math.fabs(single_coordinate - mesh_coords[idx - 1]) + < math.fabs(single_coordinate - mesh_coords[idx]) + ): idx = idx + 1 - if mesh_coords[idx] - single_coordinate < nearest_coordinate - single_coordinate: + if ( + mesh_coords[idx] - single_coordinate + < nearest_coordinate - single_coordinate + ): nearest_coordinate = mesh_coords[idx] coords[dim] = np.array([nearest_coordinate]) all_coords[orientation_int] = coords @@ -498,14 +581,17 @@ def get_coordinates(self, ignore_cell_centered: bool = False) -> Dict[ return all_coords return dict() - def get_nearest_index(self, dimension: Literal['x', 'y', 'z'], orientation: int, value: float) -> int: - """Get the nearest mesh coordinate index in a specific dimension for a specific orientation. - """ + def get_nearest_index( + self, dimension: Literal["x", "y", "z"], orientation: int, value: float + ) -> int: + """Get the nearest mesh coordinate index in a specific dimension for a specific orientation.""" if self.has_boundary_data: coords = self.get_coordinates()[orientation][dimension] idx = np.searchsorted(coords, value, side="left") - if idx > 0 and (idx == coords.size or math.fabs(value - coords[idx - 1]) < math.fabs( - value - coords[idx])): + if idx > 0 and ( + idx == coords.size + or math.fabs(value - coords[idx - 1]) < math.fabs(value - coords[idx]) + ): return idx - 1 else: return idx @@ -513,8 +599,7 @@ def get_nearest_index(self, dimension: Literal['x', 'y', 'z'], orientation: int, @property def quantities(self) -> List[Quantity]: - """Get a list of all quantities for which boundary data exists. - """ + """Get a list of all quantities for which boundary data exists.""" if self.has_boundary_data: qs = set() for subobst in self._subobstructions.values(): @@ -524,22 +609,29 @@ def quantities(self) -> List[Quantity]: return [] @property - def meshes(self) -> List['Mesh']: - """Returns a list of all meshes this slice cuts through. - """ + def meshes(self) -> List["Mesh"]: + """Returns a list of all meshes this slice cuts through.""" return [subobst.mesh for subobst in self._subobstructions.values()] - def filter_by_orientation(self, orientation: Literal[-3, -2, -1, 0, 1, 2, 3] = 0) -> List[SubObstruction]: + def filter_by_orientation( + self, orientation: Literal[-3, -2, -1, 0, 1, 2, 3] = 0 + ) -> List[SubObstruction]: """Filter all SubObstructions by a specific orientation. All returned SubObstructions will contain boundary data - in the specified orientation. + in the specified orientation. """ if self.has_boundary_data: - return [subobst for subobst in self._subobstructions.values() if - orientation in subobst.orientations] + return [ + subobst + for subobst in self._subobstructions.values() + if orientation in subobst.orientations + ] return [] - def get_boundary_data(self, quantity: Union[Quantity, str], - orientation: Literal[-3, -2, -1, 0, 1, 2, 3] = 0) -> Dict[str, Boundary]: + def get_boundary_data( + self, + quantity: Union[Quantity, str], + orientation: Literal[-3, -2, -1, 0, 1, 2, 3] = 0, + ) -> Dict[str, Boundary]: """Gets the boundary data for a specific quantity of all SubObstructions. :param quantity: The quantity to filter by. @@ -549,20 +641,26 @@ def get_boundary_data(self, quantity: Union[Quantity, str], if type(quantity) == Quantity: quantity = quantity.name - ret = {subobst.mesh.id: subobst.get_data(quantity) for subobst in self._subobstructions.values() if - subobst.has_boundary_data} + ret = { + subobst.mesh.id: subobst.get_data(quantity) + for subobst in self._subobstructions.values() + if subobst.has_boundary_data + } if orientation == 0: return ret - return {mesh: bndf for mesh, bndf in ret.items() if orientation in bndf.data.keys()} + return { + mesh: bndf for mesh, bndf in ret.items() if orientation in bndf.data.keys() + } def get_nearest_timestep(self, time: float, visible_only: bool = False) -> int: - """Calculates the nearest timestep for which data has been output for this obstruction. - """ + """Calculates the nearest timestep for which data has been output for this obstruction.""" if self.has_boundary_data: times = self.get_visible_times(self.times) if visible_only else self.times idx = np.searchsorted(times, time, side="left") - if time > 0 and (idx == len(times) or math.fabs( - time - times[idx - 1]) < math.fabs(time - times[idx])): + if time > 0 and ( + idx == len(times) + or math.fabs(time - times[idx - 1]) < math.fabs(time - times[idx]) + ): return idx - 1 else: return idx @@ -570,28 +668,47 @@ def get_nearest_timestep(self, time: float, visible_only: bool = False) -> int: def get_nearest_patch(self, x: float = None, y: float = None, z: float = None): """Gets the patch of the :class:`SubObstruction` that has the least distance to the given point. - If there are multiple patches with the same distance, a random one will be selected. + If there are multiple patches with the same distance, a random one will be selected. """ if self.has_boundary_data: d_min = np.finfo(float).max - patches_min = list() + patches_with_dist = list() for subobst in self._subobstructions.values(): for patch in subobst.get_data(self.quantities[0])._patches: - dx = max(patch.extent.x_start - x, 0, x - patch.extent.x_end) if x is not None else 0 - dy = max(patch.extent.y_start - y, 0, y - patch.extent.y_end) if y is not None else 0 - dz = max(patch.extent.z_start - z, 0, z - patch.extent.z_end) if z is not None else 0 + dx = ( + max(patch.extent.x_start - x, 0, x - patch.extent.x_end) + if x is not None + else 0 + ) + dy = ( + max(patch.extent.y_start - y, 0, y - patch.extent.y_end) + if y is not None + else 0 + ) + dz = ( + max(patch.extent.z_start - z, 0, z - patch.extent.z_end) + if z is not None + else 0 + ) d = np.sqrt(dx * dx + dy * dy + dz * dz) - if d <= d_min: + if d < d_min: d_min = d - patches_min.append(patch) - - if x is not None: - patches_min.sort(key=lambda patch: (patch.extent.x_end - patch.extent.x_start)) - if y is not None: - patches_min.sort(key=lambda patch: (patch.extent.y_end - patch.extent.y_start)) - if z is not None: - patches_min.sort(key=lambda patch: (patch.extent.z_end - patch.extent.z_start)) + patches_with_dist = [(patch, dx, dy, dz)] + elif d == d_min: + patches_with_dist.append((patch, dx, dy, dz)) + + patches_min = [p[0] for p in patches_with_dist] + + if x is not None and len(patches_with_dist) > 1: + patches_with_dist.sort(key=lambda p: p[1]) + patches_min = [p[0] for p in patches_with_dist] + elif y is not None and len(patches_with_dist) > 1: + patches_with_dist.sort(key=lambda p: p[2]) + patches_min = [p[0] for p in patches_with_dist] + elif z is not None and len(patches_with_dist) > 1: + patches_with_dist.sort(key=lambda p: p[3]) + patches_min = [p[0] for p in patches_with_dist] if len(patches_min) > 0: return patches_min[0] @@ -599,29 +716,38 @@ def get_nearest_patch(self, x: float = None, y: float = None, z: float = None): @property def has_boundary_data(self): - """Whether boundary data has been output in the simulation. - """ - return any(subobst.has_boundary_data for subobst in self._subobstructions.values()) - - def get_global_boundary_data_arrays(self, quantity: Union[str, Quantity]) -> Dict[ - int, Union[np.ndarray, Tuple[np.ndarray, np.ndarray]]]: + """Whether boundary data has been output in the simulation.""" + return any( + subobst.has_boundary_data for subobst in self._subobstructions.values() + ) + + def get_global_boundary_data_arrays( + self, quantity: Union[str, Quantity] + ) -> Dict[int, Union[np.ndarray, Tuple[np.ndarray, np.ndarray]]]: """Creates a global numpy ndarray from all subobstruction's boundary data for each orientation. - :param quantity: The quantity's name or short name for which to load the global arrays. + :param quantity: The quantity's name or short name for which to load the global arrays. """ if not self.has_boundary_data: return dict() for subobst in self._subobstructions.values(): if subobst.has_boundary_data: - cell_centered = next(iter(subobst._boundary_data.values())).cell_centered + cell_centered = next( + iter(subobst._boundary_data.values()) + ).cell_centered result = dict() for orientation_int in self.orientations: subobst_sets = [list(), list()] - dim = ['x', 'y', 'z'][abs(orientation_int) - 1] + dim = ["x", "y", "z"][abs(orientation_int) - 1] random_subobst = next( - subobst for subobst in self._subobstructions.values() if orientation_int in subobst.get_coordinates()) - base_coord = random_subobst.get_coordinates(ignore_cell_centered=False)[orientation_int][dim][0] + subobst + for subobst in self._subobstructions.values() + if orientation_int in subobst.get_coordinates() + ) + base_coord = random_subobst.get_coordinates(ignore_cell_centered=False)[ + orientation_int + ][dim][0] for subobst in self._subobstructions.values(): if not subobst.has_boundary_data: @@ -638,28 +764,43 @@ def get_global_boundary_data_arrays(self, quantity: Union[str, Quantity]) -> Dic first_result_grid = None for subobsts in subobst_sets: - coord_min = {'x': math.inf, 'y': math.inf, 'z': math.inf} - coord_max = {'x': -math.inf, 'y': -math.inf, 'z': -math.inf} - for dim in ('x', 'y', 'z'): + coord_min = {"x": math.inf, "y": math.inf, "z": math.inf} + coord_max = {"x": -math.inf, "y": -math.inf, "z": -math.inf} + for dim in ("x", "y", "z"): for subobst in subobsts: - patch_extent = subobst.get_data(quantity).data[orientation_int].extent - co = subobst.get_coordinates(ignore_cell_centered=True)[orientation_int][dim] - co = co[np.where(np.logical_and(co >= patch_extent[dim][0], co <= patch_extent[dim][1]))] + patch_extent = ( + subobst.get_data(quantity).data[orientation_int].extent + ) + co = subobst.get_coordinates(ignore_cell_centered=True)[ + orientation_int + ][dim] + co = co[ + np.where( + np.logical_and( + co >= patch_extent[dim][0], + co <= patch_extent[dim][1], + ) + ) + ] coord_min[dim] = min(co[0], coord_min[dim]) coord_max[dim] = max(co[-1], coord_max[dim]) # The global grid will use the finest mesh as base and duplicate values of the coarser meshes # Therefore we first find the finest mesh and calculate the step size in each dimension - step_sizes_min = {'x': coord_max['x'] - coord_min['x'], - 'y': coord_max['y'] - coord_min['y'], - 'z': coord_max['z'] - coord_min['z']} - step_sizes_max = {'x': 0, 'y': 0, 'z': 0} + step_sizes_min = { + "x": coord_max["x"] - coord_min["x"], + "y": coord_max["y"] - coord_min["y"], + "z": coord_max["z"] - coord_min["z"], + } + step_sizes_max = {"x": 0, "y": 0, "z": 0} steps = dict() - global_max = {'x': -math.inf, 'y': -math.inf, 'z': -math.inf} + global_max = {"x": -math.inf, "y": -math.inf, "z": -math.inf} - for dim in ('x', 'y', 'z'): + for dim in ("x", "y", "z"): for subobst in subobsts: - subobst_coords = subobst.get_coordinates(ignore_cell_centered=True)[orientation_int] + subobst_coords = subobst.get_coordinates( + ignore_cell_centered=True + )[orientation_int] if len(subobst_coords[dim]) <= 1: step_size = 0 else: @@ -668,36 +809,74 @@ def get_global_boundary_data_arrays(self, quantity: Union[str, Quantity]) -> Dic step_sizes_max[dim] = max(step_size, step_sizes_max[dim]) global_max[dim] = max(subobst_coords[dim][-1], global_max[dim]) - for dim in ('x', 'y', 'z'): + for dim in ("x", "y", "z"): if step_sizes_min[dim] == 0: step_sizes_min[dim] = math.inf steps[dim] = 1 else: - steps[dim] = max(int(round((coord_max[dim] - coord_min[dim]) / step_sizes_min[dim])), 1) + ( - 0 if cell_centered else 1) - - grid = np.full((self.n_t(count_duplicates=False), steps['x'], steps['y'], steps['z']), np.nan) - - start_coordinates = {'x': coord_min['x'], 'y': coord_min['y'], 'z': coord_min['z']} + steps[dim] = max( + int( + round( + (coord_max[dim] - coord_min[dim]) + / step_sizes_min[dim] + ) + ), + 1, + ) + (0 if cell_centered else 1) + + grid = np.full( + ( + self.n_t(count_duplicates=False), + steps["x"], + steps["y"], + steps["z"], + ), + np.nan, + ) + + start_coordinates = { + "x": coord_min["x"], + "y": coord_min["y"], + "z": coord_min["z"], + } start_idx = dict() end_idx = dict() for subobst in subobsts: - patch_data = np.expand_dims(subobst.get_data(quantity).data[orientation_int].data, - axis=abs(orientation_int)) - subobst_coords = subobst.get_coordinates(ignore_cell_centered=True)[orientation_int] + patch_data = np.expand_dims( + subobst.get_data(quantity).data[orientation_int].data, + axis=abs(orientation_int), + ) + subobst_coords = subobst.get_coordinates(ignore_cell_centered=True)[ + orientation_int + ] for axis in (0, 1, 2): - dim = ('x', 'y', 'z')[axis] + dim = ("x", "y", "z")[axis] if axis == abs(orientation_int) - 1: start_idx[dim] = 0 end_idx[dim] = 1 continue n_repeat = max( - int(round((subobst_coords[dim][1] - subobst_coords[dim][0]) / step_sizes_min[dim])), 1) + int( + round( + (subobst_coords[dim][1] - subobst_coords[dim][0]) + / step_sizes_min[dim] + ) + ), + 1, + ) start_idx[dim] = int( - round((subobst_coords[dim][0] - start_coordinates[dim]) / step_sizes_min[dim])) + round( + (subobst_coords[dim][0] - start_coordinates[dim]) + / step_sizes_min[dim] + ) + ) end_idx[dim] = int( - round((subobst_coords[dim][-1] - start_coordinates[dim]) / step_sizes_min[dim])) + round( + (subobst_coords[dim][-1] - start_coordinates[dim]) + / step_sizes_min[dim] + ) + ) # We ignore border points unless they are actually on the border of the simulation space as all # other border points actually appear twice, as the subobstructions overlap. This only @@ -706,27 +885,45 @@ def get_global_boundary_data_arrays(self, quantity: Union[str, Quantity]) -> Dic if axis != abs(orientation_int) - 1: reduced_shape = list(patch_data.shape) reduced_shape[axis + 1] -= 1 - reduced_data_slices = tuple(slice(s) for s in reduced_shape) + reduced_data_slices = tuple( + slice(s) for s in reduced_shape + ) patch_data = patch_data[reduced_data_slices] # Temporarily save border points to add them back to the array again later if subobst_coords[dim][-1] == global_max[dim]: end_idx[dim] += 1 temp_data_slices = [slice(s) for s in patch_data.shape] - temp_data_slices[axis + 1] = slice(patch_data.shape[axis + 1] - 1, None) + temp_data_slices[axis + 1] = slice( + patch_data.shape[axis + 1] - 1, None + ) temp_data = patch_data[tuple(temp_data_slices)] if n_repeat > 1: patch_data = np.repeat(patch_data, n_repeat, axis=axis + 1) # Add border points back again if needed - if not cell_centered and subobst_coords[dim][-1] == global_max[dim]: - patch_data = np.concatenate((patch_data, temp_data), axis=axis + 1) - - grid[:, start_idx['x']: end_idx['x'], start_idx['y']: end_idx['y'], - start_idx['z']: end_idx['z']] = patch_data.reshape( - (self.n_t(count_duplicates=False), end_idx['x'] - start_idx['x'], end_idx['y'] - start_idx['y'], - end_idx['z'] - start_idx['z'])) + if ( + not cell_centered + and subobst_coords[dim][-1] == global_max[dim] + ): + patch_data = np.concatenate( + (patch_data, temp_data), axis=axis + 1 + ) + + grid[ + :, + start_idx["x"] : end_idx["x"], + start_idx["y"] : end_idx["y"], + start_idx["z"] : end_idx["z"], + ] = patch_data.reshape( + ( + self.n_t(count_duplicates=False), + end_idx["x"] - start_idx["x"], + end_idx["y"] - start_idx["y"], + end_idx["z"] - start_idx["z"], + ) + ) # Remove empty dimensions, but make sure to note remove the time dimension if there is only a single timestep grid = np.squeeze(grid) @@ -739,33 +936,43 @@ def get_global_boundary_data_arrays(self, quantity: Union[str, Quantity]) -> Dic return result def clear_cache(self): - """Remove all data from the internal cache that has been loaded so far to free memory. - """ + """Remove all data from the internal cache that has been loaded so far to free memory.""" for s in self._subobstructions.values(): s.clear_cache() - def vmin(self, quantity: Union[str, Quantity], orientation: Literal[-3, -2, -1, 0, 1, 2, 3] = 0) -> float: + def vmin( + self, + quantity: Union[str, Quantity], + orientation: Literal[-3, -2, -1, 0, 1, 2, 3] = 0, + ) -> float: """Minimum value of all patches at any time for a specific quantity. :param orientation: Optionally filter by patches with a specific orientation. """ if self.has_boundary_data: - return min(s.vmin(quantity, orientation) for s in self._subobstructions.values()) + return min( + s.vmin(quantity, orientation) for s in self._subobstructions.values() + ) else: return np.nan - def vmax(self, quantity: Union[str, Quantity], orientation: Literal[-3, -2, -1, 0, 1, 2, 3] = 0) -> float: + def vmax( + self, + quantity: Union[str, Quantity], + orientation: Literal[-3, -2, -1, 0, 1, 2, 3] = 0, + ) -> float: """Maximum value of all patches at any time for a specific quantity. :param orientation: Optionally filter by patches with a specific orientation. """ if self.has_boundary_data: - return max(s.vmax(quantity, orientation) for s in self._subobstructions.values()) + return max( + s.vmax(quantity, orientation) for s in self._subobstructions.values() + ) return np.nan - def __getitem__(self, key: Union[int, str, 'Mesh']): - """Gets either the nth :class:`SubObstruction` or the one with the given mesh-id. - """ + def __getitem__(self, key: Union[int, str, "Mesh"]): + """Gets either the nth :class:`SubObstruction` or the one with the given mesh-id.""" if type(key) == int: return tuple(self._subobstructions.values())[key] @@ -777,5 +984,12 @@ def __eq__(self, other): return self.id == other.id def __repr__(self, *args, **kwargs): - return f"Obstruction(id={self.id}, Bounding-Box={self.bounding_box}, SubObstructions={len(self._subobstructions)}" + ( - f", Quantities={[q.short_name for q in self.quantities]}" if self.has_boundary_data else "") + ")" + return ( + f"Obstruction(id={self.id}, Bounding-Box={self.bounding_box}, SubObstructions={len(self._subobstructions)}" + + ( + f", Quantities={[q.short_name for q in self.quantities]}" + if self.has_boundary_data + else "" + ) + + ")" + ) diff --git a/tests/acceptance_tests/test_bndf.py b/tests/acceptance_tests/test_bndf.py index 1513649..8e9e1a9 100644 --- a/tests/acceptance_tests/test_bndf.py +++ b/tests/acceptance_tests/test_bndf.py @@ -1,9 +1,33 @@ +import os from fdsreader import Simulation +TEST_DIR = os.path.dirname(os.path.abspath(__file__)) + def test_bndf(): - sim = Simulation("./bndf_data") + sim = Simulation(os.path.join(TEST_DIR, "../cases/bndf_data")) obst = sim.obstructions.get_nearest(-0.8, 1, 1) face = obst.get_global_boundary_data_arrays("Wall Temperature")[1] assert len(face[-1]) == 68 + + +def test_get_nearest_patch(): + """Test that get_nearest_patch returns correct face based on distance.""" + sim = Simulation(os.path.join(TEST_DIR, "../cases/bndf_data")) + obst = sim.obstructions[0] + + # Get patches for different points around the obstruction + # Bounding box: x=[-1, 2], y=[-1.2, 2.4], z=[-0.1, 0] + patch_x_plus = obst.get_nearest_patch(3, 0.5, 0) + patch_x_minus = obst.get_nearest_patch(-2, 0.5, 0) + patch_y_plus = obst.get_nearest_patch(0.5, 3, 0) + patch_y_minus = obst.get_nearest_patch(0.5, -2, 0) + patch_z_plus = obst.get_nearest_patch(0.5, 0.5, 1) + + # Orientations: 1=X+, 2=Y+, 3=Z+, -1=X-, -2=Y-, -3=Z- + assert patch_x_plus.orientation == 1 + assert patch_x_minus.orientation == -1 + assert patch_y_plus.orientation == 2 + assert patch_y_minus.orientation == -2 + assert patch_z_plus.orientation == 3 From 6b329054e46918004cefca47f5c65bbea7685cb1 Mon Sep 17 00:00:00 2001 From: Mohcine Chraibi Date: Tue, 24 Mar 2026 11:28:27 +0100 Subject: [PATCH 2/4] remove formating. Formating is an issue for another PR --- fdsreader/bndf/obstruction.py | 575 +++++++++------------------- tests/acceptance_tests/test_bndf.py | 2 + 2 files changed, 186 insertions(+), 391 deletions(-) diff --git a/fdsreader/bndf/obstruction.py b/fdsreader/bndf/obstruction.py index a325091..716382b 100644 --- a/fdsreader/bndf/obstruction.py +++ b/fdsreader/bndf/obstruction.py @@ -27,18 +27,8 @@ class Patch: :ivar _n_t: Total number of time steps for which output data has been written. """ - def __init__( - self, - file_path: str, - dimension: Dimension, - extent: Extent, - orientation: int, - cell_centered: bool, - patch_offset: int, - initial_offset: int, - n_t: int, - mesh: "Mesh", - ): + def __init__(self, file_path: str, dimension: Dimension, extent: Extent, orientation: int, cell_centered: bool, + patch_offset: int, initial_offset: int, n_t: int, mesh: 'Mesh'): self.file_path = file_path self.dimension = dimension self.extent = extent @@ -49,7 +39,7 @@ def __init__( self._time_offset = -1 self._n_t = n_t self.mesh = mesh - self._boundary_parent: "Boundary" = None + self._boundary_parent: 'Boundary' = None def n_t(self, count_duplicates=True) -> int: """Get the number of timesteps for which data was output. @@ -63,30 +53,31 @@ def n_t(self, count_duplicates=True) -> int: @property def shape(self) -> Tuple: - """Convenience function to calculate the shape of the array containing data for this patch.""" + """Convenience function to calculate the shape of the array containing data for this patch. + """ return self.dimension.shape(self.cell_centered) @property def size(self) -> int: - """Convenience function to calculate the number of data points in the array for this patch.""" + """Convenience function to calculate the number of data points in the array for this patch. + """ return self.dimension.size(self.cell_centered) def _post_init(self, time_offset: int): - """Fully initialize the patch as soon as the number of timesteps is known.""" + """Fully initialize the patch as soon as the number of timesteps is known. + """ self._time_offset = time_offset - def get_coordinates( - self, ignore_cell_centered: bool = False - ) -> Dict[Literal["x", "y", "z"], np.ndarray]: + def get_coordinates(self, ignore_cell_centered: bool = False) -> Dict[Literal['x', 'y', 'z'], np.ndarray]: """Returns a dictionary containing a numpy ndarray with coordinates for each dimension. - For cell-centered boundary data, the coordinates can be adjusted to represent cell-centered coordinates. + For cell-centered boundary data, the coordinates can be adjusted to represent cell-centered coordinates. - :param ignore_cell_centered: Whether to shift the coordinates when the bndf is cell_centered or not. + :param ignore_cell_centered: Whether to shift the coordinates when the bndf is cell_centered or not. """ # orientation = ('x', 'y', 'z')[self.orientation - 1] if self.orientation != 0 else '' # coords = {'x': set(), 'y': set(), 'z': set()} - coords: Dict[Literal["x", "y", "z"], np.ndarray] = {} - for dim in ("x", "y", "z"): + coords: Dict[Literal['x', 'y', 'z'], np.ndarray] = {} + for dim in ('x', 'y', 'z'): co = self.mesh.coordinates[dim].copy() # In case the slice is cell-centered, we will shift the coordinates by half a cell # and remove the last coordinate @@ -94,52 +85,39 @@ def get_coordinates( co = co[:-1] co += abs(co[1] - co[0]) / 2 - coords[dim] = co[ - np.where( - np.logical_and(co >= self.extent[dim][0], co <= self.extent[dim][1]) - ) - ] + coords[dim] = co[np.where(np.logical_and(co >= self.extent[dim][0], co <= self.extent[dim][1]))] if coords[dim].size == 0: - coords[dim] = np.array( - [co[np.argmin(np.abs(co - self.extent[dim][0]))]] - ) + coords[dim] = np.array([co[np.argmin(np.abs(co - self.extent[dim][0]))]]) return coords @property def data(self): - """Method to load the quantity data for a single patch.""" + """Method to load the quantity data for a single patch. + """ if not hasattr(self, "_data"): - dtype_data = fdtype.new((("f", self.dimension.size(cell_centered=False)),)) + dtype_data = fdtype.new((('f', self.dimension.size(cell_centered=False)),)) if self._n_t == -1: - self._n_t = ( - os.stat(self.file_path).st_size - self._initial_offset - ) // self._time_offset + self._n_t = (os.stat(self.file_path).st_size - self._initial_offset) // self._time_offset self._data = np.empty((self.n_t(count_duplicates=True),) + self.shape) - with open(self.file_path, "rb") as infile: + with open(self.file_path, 'rb') as infile: for t in range(self.n_t(count_duplicates=True)): - infile.seek( - self._initial_offset - + self._patch_offset - + t * self._time_offset - ) + infile.seek(self._initial_offset + self._patch_offset + t * self._time_offset) data = np.fromfile(infile, dtype_data, 1)[0][1].reshape( - self.dimension.shape(cell_centered=False), order="F" - ) + self.dimension.shape(cell_centered=False), order='F') if self.cell_centered: self._data[t, :] = data[:-1, :-1] else: self._data[t, :] = data - unique_times_indices = np.unique( - self._boundary_parent.times, return_index=True - )[1] + unique_times_indices = np.unique(self._boundary_parent.times, return_index=True)[1] self._data = self._data[unique_times_indices] return self._data def clear_cache(self): - """Remove all data from the internal cache that has been loaded so far to free memory.""" + """Remove all data from the internal cache that has been loaded so far to free memory. + """ if hasattr(self, "_data"): del self._data @@ -159,15 +137,8 @@ class Boundary: :ivar n_t: Total number of time steps for which output data has been written. """ - def __init__( - self, - quantity: Quantity, - cell_centered: bool, - times: Sequence[float], - patches: List[Patch], - lower_bounds: np.ndarray, - upper_bounds: np.ndarray, - ): + def __init__(self, quantity: Quantity, cell_centered: bool, times: Sequence[float], patches: List[Patch], + lower_bounds: np.ndarray, upper_bounds: np.ndarray): self.quantity = quantity self.cell_centered = cell_centered self._patches = patches @@ -175,6 +146,7 @@ def __init__( self.lower_bounds = lower_bounds self.upper_bounds = upper_bounds + def n_t(self, count_duplicates=True) -> int: """Get the number of timesteps for which data was output. :param count_duplicates: If true, return the total number of data points, even if there is @@ -185,23 +157,24 @@ def n_t(self, count_duplicates=True) -> int: @property def orientations(self): - """Return all orientations for which there is data available.""" + """Return all orientations for which there is data available. + """ return [p.orientation for p in self._patches] def get_nearest_timestep(self, time: float) -> int: - """Calculates the nearest timestep for which data has been output for this obstruction.""" + """Calculates the nearest timestep for which data has been output for this obstruction. + """ idx = np.searchsorted(self.times, time, side="left") - if time > 0 and ( - idx == len(self.times) - or math.fabs(time - self.times[idx - 1]) < math.fabs(time - self.times[idx]) - ): + if time > 0 and (idx == len(self.times) or math.fabs( + time - self.times[idx - 1]) < math.fabs(time - self.times[idx])): return idx - 1 else: return idx @property def data(self) -> Dict[int, Patch]: - """The :class:`Patch` in each direction (-3=-z, -2=-y, -1=-x, 1=x, 2=y, 3=y).""" + """The :class:`Patch` in each direction (-3=-z, -2=-y, -1=-x, 1=x, 2=y, 3=y). + """ return {p.orientation: p for p in self._patches} def vmin(self, orientation: Literal[-3, -2, -1, 0, 1, 2, 3] = 0) -> float: @@ -231,7 +204,8 @@ def vmax(self, orientation: Literal[-3, -2, -1, 0, 1, 2, 3] = 0) -> float: return float(np.max(self.data[orientation].data)) def clear_cache(self): - """Remove all data from the internal cache that has been loaded so far to free memory.""" + """Remove all data from the internal cache that has been loaded so far to free memory. + """ for p in self._patches: p.clear_cache() @@ -249,20 +223,13 @@ class SubObstruction: :ivar show_times: List with points in time from when on the SubObstruction will be shown. """ - def __init__( - self, - side_surfaces: Tuple, - bound_indices: Tuple[int, int, int, int, int, int], - extent: Extent, - mesh: "Mesh", - ): + def __init__(self, side_surfaces: Tuple, bound_indices: Tuple[int, int, int, int, int, int], + extent: Extent, mesh: 'Mesh'): self.extent = extent self.side_surfaces = side_surfaces - self.bound_indices = { - "x": (bound_indices[0], bound_indices[1]), - "y": (bound_indices[2], bound_indices[3]), - "z": (bound_indices[4], bound_indices[5]), - } + self.bound_indices = {'x': (bound_indices[0], bound_indices[1]), + 'y': (bound_indices[2], bound_indices[3]), + 'z': (bound_indices[4], bound_indices[5])} self.mesh = mesh self._boundary_data: Dict[int, Boundary] = dict() @@ -270,27 +237,12 @@ def __init__( self.hide_times = list() self.show_times = list() - def _add_patches( - self, - bid: int, - cell_centered: bool, - quantity: str, - short_name: str, - unit: str, - patches: List[Patch], - times: Sequence[float], - lower_bounds: np.ndarray, - upper_bounds: np.ndarray, - ): + def _add_patches(self, bid: int, cell_centered: bool, quantity: str, short_name: str, unit: str, + patches: List[Patch], times: Sequence[float], lower_bounds: np.ndarray, + upper_bounds: np.ndarray): if bid not in self._boundary_data: - self._boundary_data[bid] = Boundary( - Quantity(quantity, short_name, unit), - cell_centered, - times, - patches, - lower_bounds, - upper_bounds, - ) + self._boundary_data[bid] = Boundary(Quantity(quantity, short_name, unit), cell_centered, times, + patches, lower_bounds, upper_bounds) # Add reference to parent boundary class in patches for patch in patches: patch._boundary_parent = self._boundary_data[bid] @@ -300,39 +252,32 @@ def _add_patches( @property def orientations(self): - """Return all orientations for which there is data available.""" + """Return all orientations for which there is data available. + """ if self.has_boundary_data: return next(iter(self._boundary_data.values())).orientations return [] - def get_coordinates( - self, ignore_cell_centered: bool = False - ) -> Dict[int, Dict[Literal["x", "y", "z"], np.ndarray]]: + def get_coordinates(self, ignore_cell_centered: bool = False) -> Dict[ + int, Dict[Literal['x', 'y', 'z'], np.ndarray]]: """Returns a dictionary containing a numpy ndarray with coordinates for each dimension. - For cell-centered boundary data, the coordinates can be adjusted to represent cell-centered coordinates. + For cell-centered boundary data, the coordinates can be adjusted to represent cell-centered coordinates. - :param ignore_cell_centered: Whether to shift the coordinates when the bndf is cell_centered or not. + :param ignore_cell_centered: Whether to shift the coordinates when the bndf is cell_centered or not. """ if self.has_boundary_data: - return { - orientation: patch.get_coordinates(ignore_cell_centered) - for orientation, patch in next( - iter(self._boundary_data.values()) - ).data.items() - } + return {orientation: patch.get_coordinates(ignore_cell_centered) for orientation, patch in + next(iter(self._boundary_data.values())).data.items()} return {} - def get_nearest_index( - self, dimension: Literal["x", "y", "z"], orientation: int, value: float - ) -> int: - """Get the nearest mesh coordinate index in a specific dimension for a specific orientation.""" + def get_nearest_index(self, dimension: Literal['x', 'y', 'z'], orientation: int, value: float) -> int: + """Get the nearest mesh coordinate index in a specific dimension for a specific orientation. + """ if self.has_boundary_data: coords = self.get_coordinates()[orientation][dimension] idx = np.searchsorted(coords, value, side="left") - if idx > 0 and ( - idx == coords.size - or math.fabs(value - coords[idx - 1]) < math.fabs(value - coords[idx]) - ): + if idx > 0 and (idx == coords.size or math.fabs(value - coords[idx - 1]) < math.fabs( + value - coords[idx])): return idx - 1 else: return idx @@ -345,12 +290,8 @@ def has_boundary_data(self): def get_data(self, quantity: Union[str, Quantity]): if type(quantity) == Quantity: quantity = quantity.name - return next( - b - for b in self._boundary_data.values() - if b.quantity.name.lower() == quantity.lower() - or b.quantity.short_name.lower() == quantity.lower() - ) + return next(b for b in self._boundary_data.values() if + b.quantity.name.lower() == quantity.lower() or b.quantity.short_name.lower() == quantity.lower()) def __getitem__(self, item): if type(item) == int: @@ -372,20 +313,20 @@ def n_t(self, count_duplicates=True) -> int: simulation in between the simulation run. """ if self.has_boundary_data: - return next(iter(self._boundary_data.values())).n_t( - count_duplicates=count_duplicates - ) + return next(iter(self._boundary_data.values())).n_t(count_duplicates=count_duplicates) return 0 @property def times(self): - """Return all timesteps for which boundary data is available, if any.""" + """Return all timesteps for which boundary data is available, if any. + """ if self.has_boundary_data: return next(iter(self._boundary_data.values())).times return np.array([]) def get_visible_times(self, times: Sequence[float]) -> np.ndarray: - """Returns a ndarray filtering all time steps when the SubObstruction is visible/not hidden.""" + """Returns a ndarray filtering all time steps when the SubObstruction is visible/not hidden. + """ ret = list() hidden = False for time in times: @@ -397,11 +338,7 @@ def get_visible_times(self, times: Sequence[float]) -> np.ndarray: ret.append(time) return np.array(ret) - def vmin( - self, - quantity: Union[str, Quantity], - orientation: Literal[-3, -2, -1, 0, 1, 2, 3] = 0, - ) -> float: + def vmin(self, quantity: Union[str, Quantity], orientation: Literal[-3, -2, -1, 0, 1, 2, 3] = 0) -> float: """Minimum value of all patches at any time for a specific quantity. :param orientation: Optionally filter by patches with a specific orientation. @@ -410,11 +347,7 @@ def vmin( return self.get_data(quantity).vmin(orientation) return np.nan - def vmax( - self, - quantity: Union[str, Quantity], - orientation: Literal[-3, -2, -1, 0, 1, 2, 3] = 0, - ) -> float: + def vmax(self, quantity: Union[str, Quantity], orientation: Literal[-3, -2, -1, 0, 1, 2, 3] = 0) -> float: """Maximum value of all patches at any time for a specific quantity. :param orientation: Optionally filter by patches with a specific orientation. @@ -424,7 +357,8 @@ def vmax( return np.nan def clear_cache(self): - """Remove all data from the internal cache that has been loaded so far to free memory.""" + """Remove all data from the internal cache that has been loaded so far to free memory. + """ for bndf in self._boundary_data.values(): bndf.clear_cache() @@ -452,14 +386,8 @@ class Obstruction: (ranging from 0.0 to 1.0). """ - def __init__( - self, - oid: str, - color_index: int, - block_type: int, - texture_origin: Tuple[float, float, float], - rgba: Union[Tuple[()], Tuple[float, float, float, float]] = (), - ): + def __init__(self, oid: str, color_index: int, block_type: int, texture_origin: Tuple[float, float, float], + rgba: Union[Tuple[()], Tuple[float, float, float, float]] = ()): self.id = oid self.color_index = color_index self.block_type = block_type @@ -471,21 +399,18 @@ def __init__( @property def bounding_box(self) -> Extent: - """:class:`Extent` object representing the bounding box around the Obstruction.""" + """:class:`Extent` object representing the bounding box around the Obstruction. + """ extents = [sub.extent for sub in self._subobstructions.values()] - return Extent( - min(extents, key=lambda e: e.x_start).x_start, - max(extents, key=lambda e: e.x_end).x_end, - min(extents, key=lambda e: e.y_start).y_start, - max(extents, key=lambda e: e.y_end).y_end, - min(extents, key=lambda e: e.z_start).z_start, - max(extents, key=lambda e: e.z_end).z_end, - ) + return Extent(min(extents, key=lambda e: e.x_start).x_start, max(extents, key=lambda e: e.x_end).x_end, + min(extents, key=lambda e: e.y_start).y_start, max(extents, key=lambda e: e.y_end).y_end, + min(extents, key=lambda e: e.z_start).z_start, max(extents, key=lambda e: e.z_end).z_end) @property def orientations(self): - """Return all orientations for which there is data available.""" + """Return all orientations for which there is data available. + """ if self.has_boundary_data: orientations = set() for subobst in self._subobstructions.values(): @@ -506,37 +431,36 @@ def n_t(self, count_duplicates=True) -> int: @property def times(self): - """Return all timesteps for which boundary data is available, if any.""" + """Return all timesteps for which boundary data is available, if any. + """ for subobst in self._subobstructions.values(): if subobst.has_boundary_data: return subobst.times return np.array([]) def get_visible_times(self, times: Sequence[float]): - """Returns an ndarray filtering all time steps when theSubObstruction is visible/not hidden.""" + """Returns an ndarray filtering all time steps when theSubObstruction is visible/not hidden. + """ for subobst in self._subobstructions.values(): return subobst.get_visible_times(times) return np.array([]) - def get_coordinates( - self, ignore_cell_centered: bool = False - ) -> Dict[int, Dict[Literal["x", "y", "z"], np.ndarray]]: + def get_coordinates(self, ignore_cell_centered: bool = False) -> Dict[ + int, Dict[Literal['x', 'y', 'z'], np.ndarray]]: """Returns a dictionary containing a numpy ndarray with coordinates for each dimension. - For cell-centered boundary data, the coordinates can be adjusted to represent cell-centered coordinates. + For cell-centered boundary data, the coordinates can be adjusted to represent cell-centered coordinates. - :param ignore_cell_centered: Whether to shift the coordinates when the bndf is cell_centered or not. + :param ignore_cell_centered: Whether to shift the coordinates when the bndf is cell_centered or not. """ if self.has_boundary_data: all_coords = dict() for orientation_int in self.orientations: - orientation = ("x", "y", "z")[abs(orientation_int) - 1] - coords = {"x": set(), "y": set(), "z": set()} - for dim in ("x", "y", "z"): + orientation = ('x', 'y', 'z')[abs(orientation_int) - 1] + coords = {'x': set(), 'y': set(), 'z': set()} + for dim in ('x', 'y', 'z'): if orientation == dim: bounding_box_index = 0 if orientation_int < 0 else 1 - coords[dim] = np.array( - [self.bounding_box[dim][bounding_box_index]] - ) + coords[dim] = np.array([self.bounding_box[dim][bounding_box_index]]) continue min_delta = math.inf for subobst in self._subobstructions.values(): @@ -561,19 +485,12 @@ def get_coordinates( for subobst in self._subobstructions.values(): mesh = subobst.mesh mesh_coords = mesh.coordinates[dim] - idx = np.searchsorted( - mesh_coords, single_coordinate, side="left" - ) - if idx > 0 and ( - idx == mesh_coords.size - or math.fabs(single_coordinate - mesh_coords[idx - 1]) - < math.fabs(single_coordinate - mesh_coords[idx]) - ): + idx = np.searchsorted(mesh_coords, single_coordinate, side="left") + if idx > 0 and (idx == mesh_coords.size or math.fabs( + single_coordinate - mesh_coords[idx - 1]) < math.fabs( + single_coordinate - mesh_coords[idx])): idx = idx + 1 - if ( - mesh_coords[idx] - single_coordinate - < nearest_coordinate - single_coordinate - ): + if mesh_coords[idx] - single_coordinate < nearest_coordinate - single_coordinate: nearest_coordinate = mesh_coords[idx] coords[dim] = np.array([nearest_coordinate]) all_coords[orientation_int] = coords @@ -581,17 +498,14 @@ def get_coordinates( return all_coords return dict() - def get_nearest_index( - self, dimension: Literal["x", "y", "z"], orientation: int, value: float - ) -> int: - """Get the nearest mesh coordinate index in a specific dimension for a specific orientation.""" + def get_nearest_index(self, dimension: Literal['x', 'y', 'z'], orientation: int, value: float) -> int: + """Get the nearest mesh coordinate index in a specific dimension for a specific orientation. + """ if self.has_boundary_data: coords = self.get_coordinates()[orientation][dimension] idx = np.searchsorted(coords, value, side="left") - if idx > 0 and ( - idx == coords.size - or math.fabs(value - coords[idx - 1]) < math.fabs(value - coords[idx]) - ): + if idx > 0 and (idx == coords.size or math.fabs(value - coords[idx - 1]) < math.fabs( + value - coords[idx])): return idx - 1 else: return idx @@ -599,7 +513,8 @@ def get_nearest_index( @property def quantities(self) -> List[Quantity]: - """Get a list of all quantities for which boundary data exists.""" + """Get a list of all quantities for which boundary data exists. + """ if self.has_boundary_data: qs = set() for subobst in self._subobstructions.values(): @@ -609,29 +524,22 @@ def quantities(self) -> List[Quantity]: return [] @property - def meshes(self) -> List["Mesh"]: - """Returns a list of all meshes this slice cuts through.""" + def meshes(self) -> List['Mesh']: + """Returns a list of all meshes this slice cuts through. + """ return [subobst.mesh for subobst in self._subobstructions.values()] - def filter_by_orientation( - self, orientation: Literal[-3, -2, -1, 0, 1, 2, 3] = 0 - ) -> List[SubObstruction]: + def filter_by_orientation(self, orientation: Literal[-3, -2, -1, 0, 1, 2, 3] = 0) -> List[SubObstruction]: """Filter all SubObstructions by a specific orientation. All returned SubObstructions will contain boundary data - in the specified orientation. + in the specified orientation. """ if self.has_boundary_data: - return [ - subobst - for subobst in self._subobstructions.values() - if orientation in subobst.orientations - ] + return [subobst for subobst in self._subobstructions.values() if + orientation in subobst.orientations] return [] - def get_boundary_data( - self, - quantity: Union[Quantity, str], - orientation: Literal[-3, -2, -1, 0, 1, 2, 3] = 0, - ) -> Dict[str, Boundary]: + def get_boundary_data(self, quantity: Union[Quantity, str], + orientation: Literal[-3, -2, -1, 0, 1, 2, 3] = 0) -> Dict[str, Boundary]: """Gets the boundary data for a specific quantity of all SubObstructions. :param quantity: The quantity to filter by. @@ -641,26 +549,20 @@ def get_boundary_data( if type(quantity) == Quantity: quantity = quantity.name - ret = { - subobst.mesh.id: subobst.get_data(quantity) - for subobst in self._subobstructions.values() - if subobst.has_boundary_data - } + ret = {subobst.mesh.id: subobst.get_data(quantity) for subobst in self._subobstructions.values() if + subobst.has_boundary_data} if orientation == 0: return ret - return { - mesh: bndf for mesh, bndf in ret.items() if orientation in bndf.data.keys() - } + return {mesh: bndf for mesh, bndf in ret.items() if orientation in bndf.data.keys()} def get_nearest_timestep(self, time: float, visible_only: bool = False) -> int: - """Calculates the nearest timestep for which data has been output for this obstruction.""" + """Calculates the nearest timestep for which data has been output for this obstruction. + """ if self.has_boundary_data: times = self.get_visible_times(self.times) if visible_only else self.times idx = np.searchsorted(times, time, side="left") - if time > 0 and ( - idx == len(times) - or math.fabs(time - times[idx - 1]) < math.fabs(time - times[idx]) - ): + if time > 0 and (idx == len(times) or math.fabs( + time - times[idx - 1]) < math.fabs(time - times[idx])): return idx - 1 else: return idx @@ -668,7 +570,7 @@ def get_nearest_timestep(self, time: float, visible_only: bool = False) -> int: def get_nearest_patch(self, x: float = None, y: float = None, z: float = None): """Gets the patch of the :class:`SubObstruction` that has the least distance to the given point. - If there are multiple patches with the same distance, a random one will be selected. + If there are multiple patches with the same distance, a random one will be selected. """ if self.has_boundary_data: d_min = np.finfo(float).max @@ -676,21 +578,9 @@ def get_nearest_patch(self, x: float = None, y: float = None, z: float = None): for subobst in self._subobstructions.values(): for patch in subobst.get_data(self.quantities[0])._patches: - dx = ( - max(patch.extent.x_start - x, 0, x - patch.extent.x_end) - if x is not None - else 0 - ) - dy = ( - max(patch.extent.y_start - y, 0, y - patch.extent.y_end) - if y is not None - else 0 - ) - dz = ( - max(patch.extent.z_start - z, 0, z - patch.extent.z_end) - if z is not None - else 0 - ) + dx = max(patch.extent.x_start - x, 0, x - patch.extent.x_end) if x is not None else 0 + dy = max(patch.extent.y_start - y, 0, y - patch.extent.y_end) if y is not None else 0 + dz = max(patch.extent.z_start - z, 0, z - patch.extent.z_end) if z is not None else 0 d = np.sqrt(dx * dx + dy * dy + dz * dz) if d < d_min: d_min = d @@ -716,38 +606,29 @@ def get_nearest_patch(self, x: float = None, y: float = None, z: float = None): @property def has_boundary_data(self): - """Whether boundary data has been output in the simulation.""" - return any( - subobst.has_boundary_data for subobst in self._subobstructions.values() - ) - - def get_global_boundary_data_arrays( - self, quantity: Union[str, Quantity] - ) -> Dict[int, Union[np.ndarray, Tuple[np.ndarray, np.ndarray]]]: + """Whether boundary data has been output in the simulation. + """ + return any(subobst.has_boundary_data for subobst in self._subobstructions.values()) + + def get_global_boundary_data_arrays(self, quantity: Union[str, Quantity]) -> Dict[ + int, Union[np.ndarray, Tuple[np.ndarray, np.ndarray]]]: """Creates a global numpy ndarray from all subobstruction's boundary data for each orientation. - :param quantity: The quantity's name or short name for which to load the global arrays. + :param quantity: The quantity's name or short name for which to load the global arrays. """ if not self.has_boundary_data: return dict() for subobst in self._subobstructions.values(): if subobst.has_boundary_data: - cell_centered = next( - iter(subobst._boundary_data.values()) - ).cell_centered + cell_centered = next(iter(subobst._boundary_data.values())).cell_centered result = dict() for orientation_int in self.orientations: subobst_sets = [list(), list()] - dim = ["x", "y", "z"][abs(orientation_int) - 1] + dim = ['x', 'y', 'z'][abs(orientation_int) - 1] random_subobst = next( - subobst - for subobst in self._subobstructions.values() - if orientation_int in subobst.get_coordinates() - ) - base_coord = random_subobst.get_coordinates(ignore_cell_centered=False)[ - orientation_int - ][dim][0] + subobst for subobst in self._subobstructions.values() if orientation_int in subobst.get_coordinates()) + base_coord = random_subobst.get_coordinates(ignore_cell_centered=False)[orientation_int][dim][0] for subobst in self._subobstructions.values(): if not subobst.has_boundary_data: @@ -764,43 +645,28 @@ def get_global_boundary_data_arrays( first_result_grid = None for subobsts in subobst_sets: - coord_min = {"x": math.inf, "y": math.inf, "z": math.inf} - coord_max = {"x": -math.inf, "y": -math.inf, "z": -math.inf} - for dim in ("x", "y", "z"): + coord_min = {'x': math.inf, 'y': math.inf, 'z': math.inf} + coord_max = {'x': -math.inf, 'y': -math.inf, 'z': -math.inf} + for dim in ('x', 'y', 'z'): for subobst in subobsts: - patch_extent = ( - subobst.get_data(quantity).data[orientation_int].extent - ) - co = subobst.get_coordinates(ignore_cell_centered=True)[ - orientation_int - ][dim] - co = co[ - np.where( - np.logical_and( - co >= patch_extent[dim][0], - co <= patch_extent[dim][1], - ) - ) - ] + patch_extent = subobst.get_data(quantity).data[orientation_int].extent + co = subobst.get_coordinates(ignore_cell_centered=True)[orientation_int][dim] + co = co[np.where(np.logical_and(co >= patch_extent[dim][0], co <= patch_extent[dim][1]))] coord_min[dim] = min(co[0], coord_min[dim]) coord_max[dim] = max(co[-1], coord_max[dim]) # The global grid will use the finest mesh as base and duplicate values of the coarser meshes # Therefore we first find the finest mesh and calculate the step size in each dimension - step_sizes_min = { - "x": coord_max["x"] - coord_min["x"], - "y": coord_max["y"] - coord_min["y"], - "z": coord_max["z"] - coord_min["z"], - } - step_sizes_max = {"x": 0, "y": 0, "z": 0} + step_sizes_min = {'x': coord_max['x'] - coord_min['x'], + 'y': coord_max['y'] - coord_min['y'], + 'z': coord_max['z'] - coord_min['z']} + step_sizes_max = {'x': 0, 'y': 0, 'z': 0} steps = dict() - global_max = {"x": -math.inf, "y": -math.inf, "z": -math.inf} + global_max = {'x': -math.inf, 'y': -math.inf, 'z': -math.inf} - for dim in ("x", "y", "z"): + for dim in ('x', 'y', 'z'): for subobst in subobsts: - subobst_coords = subobst.get_coordinates( - ignore_cell_centered=True - )[orientation_int] + subobst_coords = subobst.get_coordinates(ignore_cell_centered=True)[orientation_int] if len(subobst_coords[dim]) <= 1: step_size = 0 else: @@ -809,74 +675,36 @@ def get_global_boundary_data_arrays( step_sizes_max[dim] = max(step_size, step_sizes_max[dim]) global_max[dim] = max(subobst_coords[dim][-1], global_max[dim]) - for dim in ("x", "y", "z"): + for dim in ('x', 'y', 'z'): if step_sizes_min[dim] == 0: step_sizes_min[dim] = math.inf steps[dim] = 1 else: - steps[dim] = max( - int( - round( - (coord_max[dim] - coord_min[dim]) - / step_sizes_min[dim] - ) - ), - 1, - ) + (0 if cell_centered else 1) - - grid = np.full( - ( - self.n_t(count_duplicates=False), - steps["x"], - steps["y"], - steps["z"], - ), - np.nan, - ) - - start_coordinates = { - "x": coord_min["x"], - "y": coord_min["y"], - "z": coord_min["z"], - } + steps[dim] = max(int(round((coord_max[dim] - coord_min[dim]) / step_sizes_min[dim])), 1) + ( + 0 if cell_centered else 1) + + grid = np.full((self.n_t(count_duplicates=False), steps['x'], steps['y'], steps['z']), np.nan) + + start_coordinates = {'x': coord_min['x'], 'y': coord_min['y'], 'z': coord_min['z']} start_idx = dict() end_idx = dict() for subobst in subobsts: - patch_data = np.expand_dims( - subobst.get_data(quantity).data[orientation_int].data, - axis=abs(orientation_int), - ) - subobst_coords = subobst.get_coordinates(ignore_cell_centered=True)[ - orientation_int - ] + patch_data = np.expand_dims(subobst.get_data(quantity).data[orientation_int].data, + axis=abs(orientation_int)) + subobst_coords = subobst.get_coordinates(ignore_cell_centered=True)[orientation_int] for axis in (0, 1, 2): - dim = ("x", "y", "z")[axis] + dim = ('x', 'y', 'z')[axis] if axis == abs(orientation_int) - 1: start_idx[dim] = 0 end_idx[dim] = 1 continue n_repeat = max( - int( - round( - (subobst_coords[dim][1] - subobst_coords[dim][0]) - / step_sizes_min[dim] - ) - ), - 1, - ) + int(round((subobst_coords[dim][1] - subobst_coords[dim][0]) / step_sizes_min[dim])), 1) start_idx[dim] = int( - round( - (subobst_coords[dim][0] - start_coordinates[dim]) - / step_sizes_min[dim] - ) - ) + round((subobst_coords[dim][0] - start_coordinates[dim]) / step_sizes_min[dim])) end_idx[dim] = int( - round( - (subobst_coords[dim][-1] - start_coordinates[dim]) - / step_sizes_min[dim] - ) - ) + round((subobst_coords[dim][-1] - start_coordinates[dim]) / step_sizes_min[dim])) # We ignore border points unless they are actually on the border of the simulation space as all # other border points actually appear twice, as the subobstructions overlap. This only @@ -885,45 +713,27 @@ def get_global_boundary_data_arrays( if axis != abs(orientation_int) - 1: reduced_shape = list(patch_data.shape) reduced_shape[axis + 1] -= 1 - reduced_data_slices = tuple( - slice(s) for s in reduced_shape - ) + reduced_data_slices = tuple(slice(s) for s in reduced_shape) patch_data = patch_data[reduced_data_slices] # Temporarily save border points to add them back to the array again later if subobst_coords[dim][-1] == global_max[dim]: end_idx[dim] += 1 temp_data_slices = [slice(s) for s in patch_data.shape] - temp_data_slices[axis + 1] = slice( - patch_data.shape[axis + 1] - 1, None - ) + temp_data_slices[axis + 1] = slice(patch_data.shape[axis + 1] - 1, None) temp_data = patch_data[tuple(temp_data_slices)] if n_repeat > 1: patch_data = np.repeat(patch_data, n_repeat, axis=axis + 1) # Add border points back again if needed - if ( - not cell_centered - and subobst_coords[dim][-1] == global_max[dim] - ): - patch_data = np.concatenate( - (patch_data, temp_data), axis=axis + 1 - ) - - grid[ - :, - start_idx["x"] : end_idx["x"], - start_idx["y"] : end_idx["y"], - start_idx["z"] : end_idx["z"], - ] = patch_data.reshape( - ( - self.n_t(count_duplicates=False), - end_idx["x"] - start_idx["x"], - end_idx["y"] - start_idx["y"], - end_idx["z"] - start_idx["z"], - ) - ) + if not cell_centered and subobst_coords[dim][-1] == global_max[dim]: + patch_data = np.concatenate((patch_data, temp_data), axis=axis + 1) + + grid[:, start_idx['x']: end_idx['x'], start_idx['y']: end_idx['y'], + start_idx['z']: end_idx['z']] = patch_data.reshape( + (self.n_t(count_duplicates=False), end_idx['x'] - start_idx['x'], end_idx['y'] - start_idx['y'], + end_idx['z'] - start_idx['z'])) # Remove empty dimensions, but make sure to note remove the time dimension if there is only a single timestep grid = np.squeeze(grid) @@ -936,43 +746,33 @@ def get_global_boundary_data_arrays( return result def clear_cache(self): - """Remove all data from the internal cache that has been loaded so far to free memory.""" + """Remove all data from the internal cache that has been loaded so far to free memory. + """ for s in self._subobstructions.values(): s.clear_cache() - def vmin( - self, - quantity: Union[str, Quantity], - orientation: Literal[-3, -2, -1, 0, 1, 2, 3] = 0, - ) -> float: + def vmin(self, quantity: Union[str, Quantity], orientation: Literal[-3, -2, -1, 0, 1, 2, 3] = 0) -> float: """Minimum value of all patches at any time for a specific quantity. :param orientation: Optionally filter by patches with a specific orientation. """ if self.has_boundary_data: - return min( - s.vmin(quantity, orientation) for s in self._subobstructions.values() - ) + return min(s.vmin(quantity, orientation) for s in self._subobstructions.values()) else: return np.nan - def vmax( - self, - quantity: Union[str, Quantity], - orientation: Literal[-3, -2, -1, 0, 1, 2, 3] = 0, - ) -> float: + def vmax(self, quantity: Union[str, Quantity], orientation: Literal[-3, -2, -1, 0, 1, 2, 3] = 0) -> float: """Maximum value of all patches at any time for a specific quantity. :param orientation: Optionally filter by patches with a specific orientation. """ if self.has_boundary_data: - return max( - s.vmax(quantity, orientation) for s in self._subobstructions.values() - ) + return max(s.vmax(quantity, orientation) for s in self._subobstructions.values()) return np.nan - def __getitem__(self, key: Union[int, str, "Mesh"]): - """Gets either the nth :class:`SubObstruction` or the one with the given mesh-id.""" + def __getitem__(self, key: Union[int, str, 'Mesh']): + """Gets either the nth :class:`SubObstruction` or the one with the given mesh-id. + """ if type(key) == int: return tuple(self._subobstructions.values())[key] @@ -984,12 +784,5 @@ def __eq__(self, other): return self.id == other.id def __repr__(self, *args, **kwargs): - return ( - f"Obstruction(id={self.id}, Bounding-Box={self.bounding_box}, SubObstructions={len(self._subobstructions)}" - + ( - f", Quantities={[q.short_name for q in self.quantities]}" - if self.has_boundary_data - else "" - ) - + ")" - ) + return f"Obstruction(id={self.id}, Bounding-Box={self.bounding_box}, SubObstructions={len(self._subobstructions)}" + ( + f", Quantities={[q.short_name for q in self.quantities]}" if self.has_boundary_data else "") + ")" diff --git a/tests/acceptance_tests/test_bndf.py b/tests/acceptance_tests/test_bndf.py index 8e9e1a9..3efa406 100644 --- a/tests/acceptance_tests/test_bndf.py +++ b/tests/acceptance_tests/test_bndf.py @@ -31,3 +31,5 @@ def test_get_nearest_patch(): assert patch_y_plus.orientation == 2 assert patch_y_minus.orientation == -2 assert patch_z_plus.orientation == 3 + + # Note: no -3 (z-minus) patch exists in this test data, so -z is not tested From 884bd7430fc27a3d67f7ab86a321c19d4be17a2d Mon Sep 17 00:00:00 2001 From: Mohcine Chraibi Date: Tue, 24 Mar 2026 13:06:33 +0100 Subject: [PATCH 3/4] Update fdsreader/bndf/obstruction.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- fdsreader/bndf/obstruction.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fdsreader/bndf/obstruction.py b/fdsreader/bndf/obstruction.py index 716382b..1082df8 100644 --- a/fdsreader/bndf/obstruction.py +++ b/fdsreader/bndf/obstruction.py @@ -585,7 +585,7 @@ def get_nearest_patch(self, x: float = None, y: float = None, z: float = None): if d < d_min: d_min = d patches_with_dist = [(patch, dx, dy, dz)] - elif d == d_min: + elif math.isclose(d, d_min, rel_tol=1e-9, abs_tol=0.0): patches_with_dist.append((patch, dx, dy, dz)) patches_min = [p[0] for p in patches_with_dist] From 6797f89dcd23b92adb81f1ca96ffac94c0b12d4d Mon Sep 17 00:00:00 2001 From: Mohcine Chraibi Date: Tue, 24 Mar 2026 13:07:12 +0100 Subject: [PATCH 4/4] Update tests/acceptance_tests/test_bndf.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- tests/acceptance_tests/test_bndf.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/tests/acceptance_tests/test_bndf.py b/tests/acceptance_tests/test_bndf.py index 3efa406..1dabfdb 100644 --- a/tests/acceptance_tests/test_bndf.py +++ b/tests/acceptance_tests/test_bndf.py @@ -19,10 +19,11 @@ def test_get_nearest_patch(): # Get patches for different points around the obstruction # Bounding box: x=[-1, 2], y=[-1.2, 2.4], z=[-0.1, 0] - patch_x_plus = obst.get_nearest_patch(3, 0.5, 0) - patch_x_minus = obst.get_nearest_patch(-2, 0.5, 0) - patch_y_plus = obst.get_nearest_patch(0.5, 3, 0) - patch_y_minus = obst.get_nearest_patch(0.5, -2, 0) + # Use z slightly inside the obstruction for side faces to avoid edge/face ties. + patch_x_plus = obst.get_nearest_patch(3, 0.5, -0.05) + patch_x_minus = obst.get_nearest_patch(-2, 0.5, -0.05) + patch_y_plus = obst.get_nearest_patch(0.5, 3, -0.05) + patch_y_minus = obst.get_nearest_patch(0.5, -2, -0.05) patch_z_plus = obst.get_nearest_patch(0.5, 0.5, 1) # Orientations: 1=X+, 2=Y+, 3=Z+, -1=X-, -2=Y-, -3=Z-