diff --git a/docs/source/user_guide/benchmarks/physicality.rst b/docs/source/user_guide/benchmarks/physicality.rst index 0d5b55d25..d28e04acf 100644 --- a/docs/source/user_guide/benchmarks/physicality.rst +++ b/docs/source/user_guide/benchmarks/physicality.rst @@ -135,3 +135,64 @@ Data availability ----------------- None required; diatomics are generated in ASE. + + +Compression +=========== + +Summary +------- +This benchmark is a many-body analogue to the diatomics benchmark. +Single element crystal structures are generated using common prototypes, namely (sc, bcc, fcc, hcp and diamond). +Further structures are randomly generated, subject to minimum distance constraints, using pyxtal. +The final structures are then isotropically scaled such that min(d_ij/(r_i + r_j)) = 1.0, +where d_ij is the distance between atoms i and j, and r_i and r_j are the covalent radii of the respective atoms. +A linear grid of scale factors containing 100 points is generated from 0.25 to 3.0. +The structures are then scaled (scale factor applied to all cell vectors) and the energy and pressure are calculated. +The resulting energies and projected forces are analysed for unphysical oscillations and "holes". +All structures were generated by running ml_peg/calcs/physicality/compression/calc_compression.py directly. + +Metrics +------- + +1. Pressure sign flips and Big Pressure sign flips + + Analogous to force flips in the diatomics benchmark, this metric counts the number of times the isotropic pressure changes sign. + Pressure sign flips uses a tolerance of 0.001 GPa whilst Big Pressure sign flips uses a tolerance of 1 GPa, to avoid counting noise-induced flips. + Whilst it is plausible that some, very contrived, configurations should show multiple sign flips in the pressure curve almost all materials should show 1. + + +2. Energy minima and Deep Energy minima + + Mean count of distinct minima in the energy-scale factor profile. 
Local minima are + found from the second derivative using tolerance of 1 meV/atom and 100 meV/atom for Energy minima and Deep Energy minima respectively. + + +3. Holes + + A "hole" is defined as a point on the energy-scale factor curve where the energy is less than the energy at the equilibrium scale factor by more than 100 eV/atom. + This is a crude and "cautious" definition, with the idea of flagging catastrophic model failures only. + +4. :math:`\rho(-E, V_{\text{small}})` + + Spearman correlation between scale factor and minus energy for :math:`scale < scale(E_{\min})` (i.e. the compressed side of the curve). + A perfect curve should show a strong positive correlation, so a value of +1, indicating that as the structure is compressed, the energy increases. + As above, this assumes a single minimum in the energy-scale factor curve, which is the case for almost all materials. + This metric will be polluted in models containing holes. + +5. :math:`\rho(E, V_{\text{large}})` + + Spearman correlation between scale factor and energy for :math:`scale > scale(E_{\min})` (i.e. the expanded side of the curve). + A perfect curve should show a strong positive correlation, so a value of +1, indicating that as the structure is expanded, the energy increases. + As above, this assumes a single minimum in the energy-scale factor curve, which is the case for almost all materials. + This metric will be polluted in models containing holes. + +Computational cost +------------------ + +High: Expected to take hours to run on GPU, or around one day for slower MLIPs. + +Data availability +----------------- + +None required; prototype structures are generated in ASE and random structures are generated using pyxtal. 
diff --git a/ml_peg/analysis/physicality/compression/analyse_compression.py b/ml_peg/analysis/physicality/compression/analyse_compression.py new file mode 100644 index 000000000..0f8d26efa --- /dev/null +++ b/ml_peg/analysis/physicality/compression/analyse_compression.py @@ -0,0 +1,686 @@ +"""Analyse compression benchmark.""" + +from __future__ import annotations + +from collections import defaultdict +import json +from pathlib import Path + +from ase.formula import Formula +import numpy as np +import pandas as pd +import plotly.graph_objects as go +import pytest +from scipy.signal import find_peaks + +from ml_peg.analysis.utils.decorators import build_table +from ml_peg.analysis.utils.utils import load_metrics_config +from ml_peg.app import APP_ROOT +from ml_peg.calcs import CALCS_ROOT +from ml_peg.models.get_models import get_model_names +from ml_peg.models.models import current_models + +MODELS = get_model_names(current_models) +CALC_PATH = CALCS_ROOT / "physicality" / "compression" / "outputs" +OUT_PATH = APP_ROOT / "data" / "physicality" / "compression" +CURVE_PATH = OUT_PATH / "curves" +FIGURE_PATH = OUT_PATH / "figures" + +# Palette for overlaid structure curves +_PALETTE = [ + "royalblue", + "firebrick", + "seagreen", + "darkorange", + "mediumpurple", + "deeppink", + "teal", + "goldenrod", + "slategray", + "crimson", +] + +# Symlog scaling parameters +LINEAR_FRAC: float = 0.6 +LINTHRESH_ENERGY: float = 10.0 # eV/atom +SYMLOG_DECADES: int = 5 +DEFAULT_WEIGHTS = { + "Holes": 1.0, + "Energy minima": 0.1, + "Deep Energy minima": 1.0, + "Energy inflections": 0.1, + "Big Pressure sign flips": 1.0, + "Pressure sign flips": 0.1, + "ρ(-E,Vsmall)": 1.0, + "ρ(E,Vlarge)": 0.1, +} + + +def _symlog( + values: list[float] | np.ndarray, + linthresh: float = LINTHRESH_ENERGY, + linear_frac: float = LINEAR_FRAC, + decades: int = SYMLOG_DECADES, +) -> list[float]: + """ + Apply a symmetric-log transform to *values*. 
+ + Linear within ``[-linthresh, linthresh]``, logarithmic outside. + The linear region is scaled so that it occupies *linear_frac* of the + total axis range (assuming *decades* log decades on each side). + + Parameters + ---------- + values + Raw data values. + linthresh + Linear threshold. + linear_frac + Fraction of the full axis height reserved for the linear region. + decades + Number of log decades shown on each side. + + Returns + ------- + list[float] + Transformed values. + """ + lin_half = linear_frac / 2.0 + log_scale = (1.0 - linear_frac) / (2.0 * max(decades, 1)) + + arr = np.asarray(values, dtype=float) + out = np.where( + np.abs(arr) <= linthresh, + lin_half * arr / linthresh, + np.sign(arr) * (lin_half + log_scale * np.log10(np.abs(arr) / linthresh)), + ) + return out.tolist() + + +def _symlog_ticks( + linthresh: float = LINTHRESH_ENERGY, + decades: int = SYMLOG_DECADES, + linear_frac: float = LINEAR_FRAC, +) -> tuple[list[float], list[str]]: + """ + Generate tick positions and labels for a symlog axis. + + Parameters + ---------- + linthresh + Linear threshold matching ``_symlog``. + decades + Number of decades to show on each side of zero. + linear_frac + Must match the value passed to ``_symlog``. + + Returns + ------- + tuple[list[float], list[str]] + Tick values (in transformed space) and their display labels. 
+ """ + ticks_val: list[float] = [] + ticks_txt: list[str] = [] + + kw = {"linthresh": linthresh, "linear_frac": linear_frac, "decades": decades} + + # Negative decades + for d in range(decades, 0, -1): + raw = -(linthresh * 10**d) + ticks_val.append(_symlog([raw], **kw)[0]) + ticks_txt.append(f"{raw:.0e}") + # Linear region + for v in [-linthresh, -linthresh / 2, 0, linthresh / 2, linthresh]: + ticks_val.append(_symlog([v], **kw)[0]) + ticks_txt.append(f"{v:g}") + # Positive decades + for d in range(1, decades + 1): + raw = linthresh * 10**d + ticks_val.append(_symlog([raw], **kw)[0]) + ticks_txt.append(f"{raw:.0e}") + + return ticks_val, ticks_txt + + +METRICS_CONFIG_PATH = Path(__file__).with_name("metrics.yml") +DEFAULT_THRESHOLDS, DEFAULT_TOOLTIPS, _ = load_metrics_config(METRICS_CONFIG_PATH) + + +def load_model_data(model_name: str) -> pd.DataFrame: + """ + Load compression curve data for a model. + + Parameters + ---------- + model_name + Model identifier. + + Returns + ------- + pd.DataFrame + Dataframe containing structure, scale, volume, energy, and pressure columns. + """ + csv_path = CALC_PATH / model_name / "compression.csv" + if not csv_path.exists(): + return pd.DataFrame() + return pd.read_csv(csv_path) + + +def prepare_structure_series( + struct_dataframe: pd.DataFrame, +) -> tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]: + """ + Sort and align energy/pressure series for a crystal structure. + + Parameters + ---------- + struct_dataframe + Structure-specific dataframe. + + Returns + ------- + tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray] + Volume per atom, shifted energy per atom, pressure, and scale factors + sorted by increasing volume. 
+ """ + df_sorted = struct_dataframe.sort_values("volume_per_atom").drop_duplicates( + "volume_per_atom" + ) + if len(df_sorted) < 3: + return np.array([]), np.array([]), np.array([]), np.array([]) + + volumes = df_sorted["volume_per_atom"].to_numpy() + energies = df_sorted["energy_per_atom"].to_numpy() + pressures = df_sorted["pressure"].to_numpy() + scales = df_sorted["scale"].to_numpy() + + # Shift energies so the equilibrium value (scale closest to 1.0) is zero + # eq_idx = int(np.argmin(np.abs(scales - 1.0))) + # shifted_energies = energies - energies[eq_idx] + shifted_energies = ( + energies - energies[-1] + ) # Shift by the last energy value (largest volume) + + return volumes, shifted_energies, pressures, scales + + +def count_sign_changes(array: np.ndarray, tol: float) -> int: + """ + Count sign changes in a sequence while ignoring small magnitudes. + + Parameters + ---------- + array + Input values. + tol + Absolute tolerance below which values are treated as zero. + + Returns + ------- + int + Number of sign changes exceeding the specified tolerance. + """ + if array.size < 3: + return 0 + clipped = array[np.abs(array) > tol] + if clipped.size < 2: + return 0 + signs = np.sign(clipped) + sign_flips = signs[:-1] != signs[1:] + return int(np.sum(sign_flips)) + + +def compute_structure_metrics( + df_struct: pd.DataFrame, +) -> dict[str, float] | None: + """ + Compute diagnostics for a single crystal structure compression curve. + + Parameters + ---------- + df_struct + Structure-specific dataframe. + + Returns + ------- + dict[str, float] | None + Dictionary of metrics, or None if insufficient data. 
+ """ + volumes, shifted_energies, pressures, scales = prepare_structure_series(df_struct) + if volumes.size < 3: + return None + + # Energy minima: find peaks in inverted energy + minima = 0 + if shifted_energies.size >= 3: + minima_indices, _ = find_peaks( + -shifted_energies, + prominence=0.001, + width=1, + ) + minima = len(minima_indices) + + minima_indices, _ = find_peaks( + -shifted_energies, + prominence=0.1, + width=1, + ) + deep_minima = len(minima_indices) + + # Is there a hole present or not + # Relative to the largest volume energy + relative_energies = shifted_energies - shifted_energies[-1] + holes = 0 + # If any energy is 100 eV/atom lower than the largest volume + # energy, we safely consider it a hole + if np.any(relative_energies < -100): + holes = 1 + + # Pressure sign flips + pressure_flips = count_sign_changes(pressures, tol=1e-3) + big_pressure_flips = count_sign_changes(pressures, tol=1) + + # Spearman correlations in compressed and expanded regimes + spearman_compression = np.nan + spearman_expansion = np.nan + + try: + from scipy import stats + + # NOTE: this isn't perfect, minimum will be in the + # wrong place if holes are present. + # BUT for models without holes, this is another way + # of finding spurious minima. 
+ eq_idx = np.argmin(shifted_energies) + + # Compressed regime + if volumes[:eq_idx].size > 2: + spearman_compression = float( + stats.spearmanr( + volumes[: eq_idx + 1], shifted_energies[: eq_idx + 1] + ).statistic + ) + # Expanded regime + if volumes[eq_idx:].size > 2: + spearman_expansion = float( + stats.spearmanr(volumes[eq_idx:], shifted_energies[eq_idx:]).statistic + ) + except Exception: + pass + + return { + "Holes": float(holes), + "Energy minima": float(minima), + "Deep Energy minima": float(deep_minima), + # "Energy inflections": float(inflections), + "Big Pressure sign flips": float(big_pressure_flips), + "Pressure sign flips": float(pressure_flips), + "ρ(-E,Vsmall)": -float(spearman_compression), + "ρ(E,Vlarge)": float(spearman_expansion), + } + + +def aggregate_model_metrics( + model_dataframe: pd.DataFrame, +) -> dict[str, float]: + """ + Aggregate metrics across all crystal structures for a model. + + Parameters + ---------- + model_dataframe + Per-model compression dataset. + + Returns + ------- + dict[str, float] + Aggregated model metrics (averaged across all structures). + """ + if model_dataframe.empty: + return {} + + structure_metrics: list[dict[str, float]] = [] + + for _struct, struct_dataframe in model_dataframe.groupby("structure"): + metrics = compute_structure_metrics(struct_dataframe) + if metrics is None: + continue + structure_metrics.append(metrics) + + if not structure_metrics: + return {} + + return { + key: float(np.nanmean([m.get(key, np.nan) for m in structure_metrics])) + for key in DEFAULT_THRESHOLDS.keys() + } + + +# Helper to load per-model data ----------------------------------------------- + + +def _load_structure_data() -> dict[str, pd.DataFrame]: + """ + Load per-model compression curve dataframes from calculator outputs. + + Returns + ------- + dict[str, pd.DataFrame] + Mapping of model name to curve dataframe. 
+ """ + structure_data: dict[str, pd.DataFrame] = {} + for model_name in MODELS: + model_dataframe = load_model_data(model_name) + if not model_dataframe.empty: + structure_data[model_name] = model_dataframe + return structure_data + + +def _write_curve_payloads( + curve_dir: Path, model_name: str, frame: pd.DataFrame +) -> None: + """ + Serialise per-structure JSON payloads for the Dash app callbacks. + + Parameters + ---------- + curve_dir + Base directory for curve JSON files. + model_name + Name of the model. + frame + Dataframe with all structure data for this model. + """ + model_curve_dir = curve_dir / model_name + model_curve_dir.mkdir(parents=True, exist_ok=True) + + for struct_label, group in frame.groupby("structure"): + ordered = group.sort_values("volume_per_atom").drop_duplicates( + "volume_per_atom" + ) + volumes = ordered["volume_per_atom"].tolist() + energies = ordered["energy_per_atom"].tolist() + pressures = ordered["pressure"].tolist() + scales = ordered["scale"].tolist() + + # Compute dE/dV (numerical derivative of energy per atom w.r.t. volume) + vol_arr = np.array(volumes) + eng_arr = np.array(energies) + if vol_arr.size >= 2: + de_dv = np.gradient(eng_arr, vol_arr).tolist() + else: + de_dv = [0.0] * len(volumes) + + payload = { + "structure": str(struct_label), + "volume_per_atom": volumes, + "energy_per_atom": energies, + "pressure": pressures, + "scale": scales, + "dEdV": de_dv, + } + filepath = model_curve_dir / f"{struct_label}.json" + with filepath.open("w", encoding="utf8") as fh: + json.dump(payload, fh) + + +def _chemical_formula_from_label(label: str) -> str: + """ + Convert a structure label to its reduced chemical formula. + + Parameters + ---------- + label + Structure label such as ``"C2H4_pyxtal_0"`` or ``"C_bcc"``. + + Returns + ------- + str + Reduced formula, e.g. ``"CH2"`` or ``"C"``. 
+ """ + formula_str = label.split("_")[0] + f = Formula(formula_str) + return str(f.reduce()[0]) + + +def _build_formula_figures(model_name: str, frame: pd.DataFrame) -> None: + """ + Build and save an energy-vs-scale scatter figure for each formula. + + All structures that share the same reduced chemical formula are overlaid + on a single figure. Figures are written to + ``FIGURE_PATH / model_name / {formula}.json``. + + Parameters + ---------- + model_name + Model identifier. + frame + Dataframe with columns *structure*, *scale*, *energy_per_atom* + (at minimum) for this model. + """ + model_fig_dir = FIGURE_PATH / model_name + model_fig_dir.mkdir(parents=True, exist_ok=True) + + # Group structure labels by reduced formula + formula_groups: dict[str, list[str]] = defaultdict(list) + for struct_label in frame["structure"].unique(): + formula = _chemical_formula_from_label(str(struct_label)) + formula_groups[formula].append(str(struct_label)) + + # Pre-compute symlog ticks + tick_vals, tick_text = _symlog_ticks( + LINTHRESH_ENERGY, + decades=SYMLOG_DECADES, + linear_frac=LINEAR_FRAC, + ) + + for formula, struct_labels in formula_groups.items(): + fig = go.Figure() + y_plot_range = [0, 0] + for idx, struct_label in enumerate(sorted(struct_labels)): + group = ( + frame[frame["structure"] == struct_label] + .sort_values("volume_per_atom") + .drop_duplicates("volume_per_atom") + ) + scales = group["scale"].tolist() + energies = group["energy_per_atom"].tolist() + if not scales or not energies: + continue + + color = _PALETTE[idx % len(_PALETTE)] + fig.add_trace( + go.Scatter( + x=scales, + y=_symlog(energies), + mode="lines", + name=struct_label, + line={"color": color}, + marker={"size": 5, "color": color}, + customdata=energies, + hovertemplate=( + "scale: %{x:.3f}
E/atom: %{customdata:.4f} eV" + ), + ) + ) + y_plot_range[0] = min(y_plot_range[0], min(_symlog(energies))) + y_plot_range[1] = max(y_plot_range[1], max(_symlog(energies))) + + default_range = [ + max(tick_vals[0], y_plot_range[0]), + min(tick_vals[-1], y_plot_range[1]), + ] + + default_range[0] -= abs(default_range[0]) * 0.1 + default_range[1] += abs(default_range[1]) * 0.1 + + fig.update_layout( + title=f"{model_name} — {formula}", + xaxis_title="Scale factor", + yaxis_title="Energy per atom (eV, symlog)", + yaxis={ + "tickvals": tick_vals, + "ticktext": tick_text, + "range": default_range, + }, + height=500, + showlegend=True, + template="plotly_white", + ) + + fig.write_json(model_fig_dir / f"{formula}.json") + + +def persist_compression_data() -> dict[str, pd.DataFrame]: + """ + Persist curve payloads, build figures, and return per-model dataframes. + + Returns + ------- + dict[str, pd.DataFrame] + Mapping of model name to per-structure curve data. + """ + data = _load_structure_data() + CURVE_PATH.mkdir(parents=True, exist_ok=True) + FIGURE_PATH.mkdir(parents=True, exist_ok=True) + for model_name, frame in data.items(): + if frame is not None and not frame.empty: + _write_curve_payloads(CURVE_PATH, model_name, frame) + _build_formula_figures(model_name, frame) + return data + + +@pytest.fixture +def compression_data_fixture() -> dict[str, pd.DataFrame]: + """ + Load curve data and persist gallery assets for pytest use. + + Returns + ------- + dict[str, pd.DataFrame] + Mapping of model name to per-structure curve data. + """ + return persist_compression_data() + + +def collect_metrics( + structure_data: dict[str, pd.DataFrame] | None = None, +) -> pd.DataFrame: + """ + Gather metrics for all models. + + Metrics are averaged across all crystal structures. + + Parameters + ---------- + structure_data + Optional mapping of model names to curve dataframes. When ``None``, + the data is loaded via ``persist_compression_data``. 
+ + Returns + ------- + pd.DataFrame + Aggregated metrics table (all structures). + """ + metrics_rows: list[dict[str, float | str]] = [] + + OUT_PATH.mkdir(parents=True, exist_ok=True) + + data = structure_data if structure_data is not None else persist_compression_data() + + for model_name, model_dataframe in data.items(): + metrics = aggregate_model_metrics(model_dataframe) + row = {"Model": model_name} | metrics + metrics_rows.append(row) + + columns = ["Model"] + list(DEFAULT_THRESHOLDS.keys()) + + return pd.DataFrame(metrics_rows).reindex(columns=columns) + + +@pytest.fixture +def compression_collection( + compression_data_fixture: dict[str, pd.DataFrame], +) -> pd.DataFrame: + """ + Collect compression metrics across all models. + + Parameters + ---------- + compression_data_fixture + Mapping of model names to curve dataframes generated by the fixture. + + Returns + ------- + pd.DataFrame + Aggregated metrics dataframe. + """ + return collect_metrics(compression_data_fixture) + + +@pytest.fixture +def compression_metrics_dataframe( + compression_collection: pd.DataFrame, +) -> pd.DataFrame: + """ + Provide the aggregated compression metrics dataframe. + + Parameters + ---------- + compression_collection + Metrics dataframe produced by ``collect_metrics``. + + Returns + ------- + pd.DataFrame + Aggregated compression metrics indexed by model. + """ + return compression_collection + + +@pytest.fixture +@build_table( + filename=OUT_PATH / "compression_metrics_table.json", + metric_tooltips=DEFAULT_TOOLTIPS, + thresholds=DEFAULT_THRESHOLDS, + weights=DEFAULT_WEIGHTS, +) +def metrics( + compression_metrics_dataframe: pd.DataFrame, +) -> dict[str, dict]: + """ + Compute compression metrics for all models. + + Parameters + ---------- + compression_metrics_dataframe + Aggregated per-model metrics produced by ``collect_metrics``. + + Returns + ------- + dict[str, dict] + Mapping of metric names to per-model results. 
+ """ + metrics_df = compression_metrics_dataframe + metrics_dict: dict[str, dict[str, float | None]] = {} + for column in metrics_df.columns: + if column == "Model": + continue + values = [ + value if pd.notna(value) else None for value in metrics_df[column].tolist() + ] + metrics_dict[column] = dict(zip(metrics_df["Model"], values, strict=False)) + return metrics_dict + + +def test_compression(metrics: dict[str, dict]) -> None: + """ + Run compression analysis. + + Parameters + ---------- + metrics + Benchmark metrics generated by fixtures. + """ + return diff --git a/ml_peg/analysis/physicality/compression/metrics.yml b/ml_peg/analysis/physicality/compression/metrics.yml new file mode 100644 index 000000000..a7e3fa6d3 --- /dev/null +++ b/ml_peg/analysis/physicality/compression/metrics.yml @@ -0,0 +1,41 @@ +metrics: + Holes: + good: 0.0 + bad: 1.0 + unit: null + tooltip: Presence of holes in the energy-per-atom curve, any curve with an energy value that is 100 eV/atom lower than the largest volume energy contains a hole + Deep Energy minima: + good: 1.0 + bad: 5.0 + unit: null + tooltip: Average number of deep, > 0.1 eV/atom, energy-per-atom minima per structure. Designed to ignore very shallow minma, particularly in the expanded regime. + Energy minima: + good: 1.0 + bad: 5.0 + unit: null + tooltip: Average number of energy-per-atom minima per structure. Tolerance of 1 meV/atom is used. 
+ # Energy inflections: + # good: 1.0 + # bad: 5.0 + # unit: null + # tooltip: Average number of energy curvature sign changes per structure + Pressure sign flips: + good: 1.0 + bad: 5.0 + unit: null + tooltip: Average number of hydrostatic-pressure sign changes per structure + Big Pressure sign flips: + good: 1.0 + bad: 5.0 + unit: null + tooltip: Average number of large, > 1GPa, hydrostatic-pressure sign changes per structure + ρ(-E,Vsmall): + good: 1.0 + bad: -1.0 + unit: null + tooltip: Spearman correlation between energy and volume in the compressed region, defined as V < V(Emin) so this metric is polluted when Holes are present. + ρ(E,Vlarge): + good: 1.0 + bad: -1.0 + unit: null + tooltip: Spearman correlation between energy and volume in the expanded region, defined as V > V(Emin) so this metric is polluted when Holes are present. diff --git a/ml_peg/app/physicality/compression/app_compression.py b/ml_peg/app/physicality/compression/app_compression.py new file mode 100644 index 000000000..8cbbb960d --- /dev/null +++ b/ml_peg/app/physicality/compression/app_compression.py @@ -0,0 +1,175 @@ +"""Run compression benchmark app.""" + +from __future__ import annotations + +from dash import Dash, Input, Output, callback, dcc +from dash.dcc import Loading +from dash.exceptions import PreventUpdate +from dash.html import Div, Label +from plotly.io import read_json + +from ml_peg.app import APP_ROOT +from ml_peg.app.base_app import BaseApp +from ml_peg.models.get_models import get_model_names +from ml_peg.models.models import current_models + +# Get all models +MODELS = get_model_names(current_models) +BENCHMARK_NAME = "Compression" +DATA_PATH = APP_ROOT / "data" / "physicality" / "compression" +FIGURE_PATH = DATA_PATH / "figures" +DOCS_URL = ( + "https://ddmms.github.io/ml-peg/user_guide/benchmarks/physicality.html#compression" +) + + +def _available_formulas(model_name: str) -> list[str]: + """ + List unique formulas available for a given model. 
+ + Parameters + ---------- + model_name + Selected model identifier. + + Returns + ------- + list[str] + Sorted list of unique formulas. + """ + model_dir = FIGURE_PATH / model_name + if not model_dir.exists(): + return [] + return sorted(p.stem for p in model_dir.glob("*.json")) + + +class CompressionApp(BaseApp): + """Compression benchmark app layout and callbacks.""" + + def register_callbacks(self) -> None: + """Register dropdown-driven compression curve callbacks.""" + model_dropdown_id = f"{BENCHMARK_NAME}-model-dropdown" + composition_dropdown_id = f"{BENCHMARK_NAME}-composition-dropdown" + figure_id = f"{BENCHMARK_NAME}-figure" + + @callback( + Output(composition_dropdown_id, "options"), + Output(composition_dropdown_id, "value"), + Input(model_dropdown_id, "value"), + ) + def _update_composition_options(model_name: str): + """ + Update composition dropdown options based on selected model. + + Parameters + ---------- + model_name + Currently selected model identifier. + + Returns + ------- + tuple + Dropdown options list and default value. + """ + if not model_name: + raise PreventUpdate + formulas = _available_formulas(model_name) + options = [{"label": f, "value": f} for f in formulas] + default = formulas[0] if formulas else None + return options, default + + @callback( + Output(figure_id, "figure"), + Input(model_dropdown_id, "value"), + Input(composition_dropdown_id, "value"), + ) + def _update_figure(model_name: str, composition: str | None): + """ + Load pre-built energy-per-atom vs scale factor figure. + + Parameters + ---------- + model_name + Currently selected model identifier. + composition + Reduced chemical formula for the composition group. + + Returns + ------- + Figure + Plotly figure loaded from the pre-built JSON file. 
+ """ + if not model_name or not composition: + raise PreventUpdate + + figure_file = FIGURE_PATH / model_name / f"{composition}.json" + if not figure_file.exists(): + raise PreventUpdate + + return read_json(figure_file) + + +def get_app() -> CompressionApp: + """ + Get compression benchmark app layout and callback registration. + + Returns + ------- + CompressionApp + Benchmark layout and callback registration. + """ + model_options = [{"label": model, "value": model} for model in MODELS] + default_model = model_options[0]["value"] if model_options else None + + extra_components = [ + Div( + [ + Label("Select model:"), + dcc.Dropdown( + id=f"{BENCHMARK_NAME}-model-dropdown", + options=model_options, + value=default_model, + clearable=False, + style={"width": "300px", "marginBottom": "20px"}, + ), + Label("Select composition:"), + dcc.Dropdown( + id=f"{BENCHMARK_NAME}-composition-dropdown", + options=[], + value=None, + clearable=False, + style={"width": "300px"}, + ), + ], + style={"marginBottom": "20px"}, + ), + Loading( + dcc.Graph( + id=f"{BENCHMARK_NAME}-figure", + style={"height": "500px", "width": "100%", "marginTop": "20px"}, + ), + type="circle", + ), + ] + + return CompressionApp( + name=BENCHMARK_NAME, + description=( + "A handful of common prototype (and randomly generated via pyxtal)" + "structures are isotropically scaled across a wide range." + "A scale factor of 1.0 means that a pair of atoms in the" + "structure is separated by the sum of their covalent radii. " + "e.g. min(d_ij/(r_cov_i + r_cov_j)) = 1.0." 
+ ), + docs_url=DOCS_URL, + table_path=DATA_PATH / "compression_metrics_table.json", + extra_components=extra_components, + ) + + +if __name__ == "__main__": + dash_app = Dash(__name__, assets_folder=DATA_PATH.parent.parent) + compression_app = get_app() + dash_app.layout = compression_app.layout + compression_app.register_callbacks() + dash_app.run(port=8056, debug=True) diff --git a/ml_peg/calcs/physicality/compression/calc_compression.py b/ml_peg/calcs/physicality/compression/calc_compression.py new file mode 100644 index 000000000..45bf8b8b9 --- /dev/null +++ b/ml_peg/calcs/physicality/compression/calc_compression.py @@ -0,0 +1,688 @@ +"""Run calculations for uniform crystal compression benchmark.""" + +from __future__ import annotations + +import json +from pathlib import Path + +from ase import Atoms, units +from ase.build import bulk, make_supercell +from ase.data import atomic_numbers, covalent_radii +from ase.io import read as ase_read +from ase.io import write +import numpy as np +import pandas as pd +import pytest +from pyxtal.tolerance import Tol_matrix +import tqdm + +from ml_peg.models.get_models import load_models +from ml_peg.models.models import current_models + +MODELS = load_models(current_models) + +# Local directory for calculator outputs +OUT_PATH = Path(__file__).parent / "outputs" +DATA_PATH = Path(__file__).parent / "data" + +# Benchmark configuration +# Compression/expansion factors applied uniformly to the cell volume. +# A factor of 1.0 corresponds to the equilibrium structure. 
MIN_SCALE = 0.25
MAX_SCALE = 3.0
N_POINTS = 100

# ELEMENTS: list[str] = [symbol for symbol in chemical_symbols if symbol]
ELEMENTS: list[str] = [
    "H",
    "C",
    "N",
    "O",
    "Ti",
    "Cu",
    "Cl",
    "P",
    "W",
    "Rb",
    "Xe",
]  # limit to common elements for testing
PROTOTYPES: list[str] = [
    "sc",
    "bcc",
    "fcc",
    "hcp",
    "diamond",
]  # common crystal prototypes
MAX_ATOMS_PER_CELL = 6  # limit to small cells for testing
# Each spec is a triple:
# [number of elements, number of compositions, repeats per composition]
RANDOM_STRUCTURES: list[list[int]] = [
    [1, len(ELEMENTS), 5],
    [2, 10, 10],
    [3, 10, 10],
]


def _scale_grid(
    min_scale: float,
    max_scale: float,
    n_points: int,
) -> np.ndarray:
    """
    Return a uniformly spaced grid of isotropic scaling factors.

    Each factor multiplies the *linear* cell dimensions so that the
    corresponding volume scales as ``factor**3``.

    Parameters
    ----------
    min_scale, max_scale
        Bounds of the linear scaling grid.
    n_points
        Number of points in the grid.

    Returns
    -------
    np.ndarray
        Monotonic array of scaling factors.
    """
    return np.linspace(min_scale, max_scale, n_points, dtype=float)


def _scale_using_isotropic_guess(atoms: Atoms) -> Atoms:
    """
    Isotropically scale a structure so nearest-neighbour distances match covalent radii.

    For each pair of atoms the target bond length is the sum of their
    covalent radii (from ``ase.data.covalent_radii``). The structure is
    uniformly rescaled so that the shortest interatomic distance equals
    the covalent-radius sum for that closest pair.

    Parameters
    ----------
    atoms
        Input structure (must be periodic with ``pbc=True``).

    Returns
    -------
    Atoms
        A copy of *atoms* with the cell and positions rescaled.

    Raises
    ------
    ValueError
        If no finite interatomic distance exists, or if the shortest
        interatomic distance at the current cell is zero or negative
        (degenerate geometry).
    """
    atoms = atoms.copy()
    # A 2x2x2 supercell ensures the minimum-image distances also capture
    # neighbours across periodic boundaries of very small cells.
    supercell = make_supercell(atoms, 2 * np.eye(3))
    distances = supercell.get_all_distances(mic=True)
    np.fill_diagonal(distances, np.inf)  # ignore self-distances

    if not np.any(np.isfinite(distances)):
        raise ValueError("No finite interatomic distances found")

    # Target bond length per pair: sum of covalent radii.
    radii = np.array(
        [covalent_radii[atomic_numbers[s]] for s in supercell.get_chemical_symbols()]
    )
    target_matrix = radii[:, None] + radii[None, :]

    # Scale so the *closest* pair sits exactly at its covalent-radius sum.
    closest_idx = np.unravel_index(np.argmin(distances), distances.shape)
    min_dist = distances[closest_idx]
    if min_dist <= 0.0:
        raise ValueError("Degenerate geometry: shortest interatomic distance <= 0")

    scale_factor = target_matrix[closest_idx] / min_dist
    atoms.set_cell(atoms.cell * scale_factor, scale_atoms=True)
    return atoms


def _generate_prototype_structures(
    elements: list[str],
    prototypes: list[str],
) -> dict[str, Atoms]:
    """
    Generate crystal structures from element/prototype combinations using ASE.

    For each element and crystal prototype, ``ase.build.bulk`` is called
    with a lattice constant of 1.0. The resulting structure is then
    rescaled via ``_scale_using_isotropic_guess`` so that the shortest
    interatomic distance matches the sum of covalent radii for the
    closest pair. Combinations that ASE cannot build are skipped with
    a message.

    Parameters
    ----------
    elements
        Chemical symbols to build structures for.
    prototypes
        Crystal-structure prototypes recognised by ``ase.build.bulk``
        (e.g. ``"fcc"``, ``"bcc"``, ``"hcp"``, ``"diamond"``, ``"sc"``).

    Returns
    -------
    dict[str, Atoms]
        Mapping of ``"{element}_{prototype}"`` labels to ASE ``Atoms``.
    """
    structures: dict[str, Atoms] = {}
    for element in elements:
        for proto in prototypes:
            label = f"{element}_{proto}"
            try:
                atoms = bulk(element, crystalstructure=proto, a=1.0)
                atoms = _scale_using_isotropic_guess(atoms)

                # Validate with real exceptions (not ``assert``, which is
                # stripped under -O and would bypass the skip logic below).
                if not np.all(atoms.pbc):
                    raise ValueError(
                        f"Generated non-periodic structure for {label}"
                    )
                if atoms.get_volume() <= 0.0:
                    raise ValueError(
                        f"Generated zero-volume structure for {label}"
                    )

                structures[label] = atoms
            except (ValueError, KeyError, RuntimeError) as exc:
                print(f"Skipping {label}: {exc}")
                continue
    return structures


def _gen_random_structure(
    composition: dict[str, int],
    space_group: int | None = None,
    dim: int = 3,
    seed: int | None = None,
    max_attempts: int = 230,
    volume_factor: float = 0.35,
) -> Atoms:
    """
    Generate a random crystal structure using PyXtal.

    Parameters
    ----------
    composition
        Mapping of chemical symbol to number of atoms in the
        conventional cell (e.g. ``{"C": 12}`` or ``{"Ba": 1, "Ti": 1, "O": 3}``).
    space_group
        International space-group number (1-230). When given, that same
        space group is retried up to *max_attempts* times (each attempt
        draws a fresh random state, so the structures differ). When
        ``None`` (default), random space groups are drawn instead.
    dim
        Dimensionality of the crystal (3 for bulk). Default is 3.
    seed
        Random seed for reproducibility. When ``None`` (default), a
        random seed is generated automatically.
    max_attempts
        Maximum number of space-group attempts before giving up.
    volume_factor
        Volume factor used when generating random structures.
        Smaller factors will result in denser structures.

    Returns
    -------
    Atoms
        ASE ``Atoms`` with periodic boundary conditions and a cell
        scaled so that nearest-neighbour distances match the sum of
        covalent radii.

    Raises
    ------
    RuntimeError
        If no valid structure could be generated within *max_attempts*.
    """
    from pyxtal import pyxtal as pyxtal_cls

    rng = np.random.default_rng(seed)

    species = list(composition.keys())
    num_atoms = list(composition.values())

    sg_candidates: list[int] = []
    if space_group is not None:
        # Retry the same space group repeatedly: each attempt uses a new
        # random state, so structure generation can still vary.
        sg_candidates = [space_group] * max_attempts

    # Fill remaining attempts with random space groups (1..230 inclusive)
    while len(sg_candidates) < max_attempts:
        sg_candidates.append(int(rng.integers(1, 231)))

    custom_tol = Tol_matrix(prototype="atomic", factor=1.0)
    for sg in sg_candidates:
        crystal = pyxtal_cls()
        crystal.from_random(
            dim=dim,
            group=sg,
            species=species,
            numIons=num_atoms,
            random_state=int(rng.integers(0, 2**31)),
            tm=custom_tol,
            max_count=1000,
            factor=volume_factor,
        )
        if not crystal.valid:
            continue

        atoms = crystal.to_ase()
        atoms.pbc = True
        atoms = _scale_using_isotropic_guess(atoms)

        return atoms  # noqa: RET504

    raise RuntimeError(
        f"PyXtal failed to generate a valid structure for "
        f"composition={composition} after {max_attempts} attempts"
    )


def _composition_label(composition: dict[str, int]) -> str:
    """
    Build a canonical string label for a composition.

    Elements are sorted alphabetically and counts are appended only when
    greater than one (e.g. ``{"C": 1, "O": 2}`` -> ``"C-O2"``).
    Elements are joined by ``"-"`` to avoid confusion with the
    underscore used for separating the composition from the rest
    of the label (e.g. ``"C-O2_pyxtal_0"``).

    Parameters
    ----------
    composition
        Mapping of element symbol to atom count.

    Returns
    -------
    str
        Compact composition label.
    """
    parts: list[str] = []
    for elem in sorted(composition):
        count = composition[elem]
        parts.append(f"{elem}{count if count > 1 else ''}")
    return "-".join(parts)


def _generate_all_random(
    random_specs: list[list[int]],
    elements: list[str],
    max_atoms: int,
    data_path: Path,
    seed: int | None = None,
) -> dict[str, Atoms]:
    """
    Generate random crystal structures for multiple compositions.

    For each entry ``[n_elements, n_compositions, repeats]`` in
    *random_specs*, *n_compositions* unique compositions are sampled
    (drawing *n_elements* species from *elements* and random atom
    counts respecting *max_atoms*). For every composition,
    ``_gen_random_structure`` is called *repeats* times, producing
    structures labelled ``"{formula}_pyxtal_{i}"``.

    All generated structures are additionally saved to *data_path*
    (grouped by element count) so they can be reloaded later via
    ``_load_structures``.

    Parameters
    ----------
    random_specs
        List of ``[n_elements, n_compositions, repeats]`` triples.
    elements
        Pool of chemical symbols to draw from.
    max_atoms
        Maximum total number of atoms in any generated cell.
    data_path
        Directory where the configs ``.xyz`` files are written.
    seed
        Master random seed for reproducibility.

    Returns
    -------
    dict[str, Atoms]
        All successfully generated structures keyed by label.
    """
    rng = np.random.default_rng(seed)
    structures: dict[str, Atoms] = {}
    data_path.mkdir(parents=True, exist_ok=True)

    # Collect frames grouped by number of elements for saving
    frames_by_n_elements: dict[int, list[Atoms]] = {}

    for n_elements, n_compositions, repeats in random_specs:
        if n_elements > len(elements):
            print(
                f"Requested {n_elements} elements but only {len(elements)} "
                f"available — skipping this spec."
            )
            continue

        # Generate unique compositions
        generated_compositions: list[dict[str, int]] = []
        seen_labels: set[str] = set()
        max_comp_attempts = n_compositions * 50  # safety limit
        attempts = 0

        while (
            len(generated_compositions) < n_compositions
            and attempts < max_comp_attempts
        ):
            attempts += 1
            if n_elements == 1:
                # Pick the element which occurs least frequently
                # in the already generated compositions to
                # increase diversity.
                elem_counts = dict.fromkeys(elements, 0)
                for comp in generated_compositions:
                    for elem in comp:
                        elem_counts[elem] += comp[elem]
                least_common = sorted(elem_counts, key=elem_counts.get)
                chosen_elements = [least_common[0]]
            else:
                chosen_elements = sorted(
                    rng.choice(elements, size=n_elements, replace=False).tolist()
                )
            # Random atom counts that sum to at most max_atoms (each >= 1)
            counts = rng.integers(
                1, max(2, max_atoms - n_elements + 2), size=n_elements
            )
            if len(counts) == 1:
                # Single element: pick a count between max_atoms//2 and
                # max_atoms, using the seeded generator so that the whole
                # run stays reproducible for a given *seed*.
                counts = rng.integers(max_atoms // 2, max_atoms + 1, size=1)

            # Clip total to max_atoms
            total = int(counts.sum())
            if total > max_atoms:
                counts = (counts * max_atoms / total).astype(int)
                counts = np.maximum(counts, 1)

            composition = dict(zip(chosen_elements, counts.tolist(), strict=True))
            label = _composition_label(composition)
            if label not in seen_labels:
                seen_labels.add(label)
                generated_compositions.append(composition)

        print(
            f"Generated {len(generated_compositions)} unique "
            f"compositions with {n_elements} elements: "
            f"{generated_compositions}"
        )
        # Generate structures for each composition
        for composition in tqdm.tqdm(generated_compositions):
            for i in range(repeats):
                # Provisional label for error reporting; the final label is
                # rebuilt from the actual chemical formula after generation.
                struct_label = f"{_composition_label(composition)}_pyxtal_{i}"
                try:
                    atoms = _gen_random_structure(
                        composition,
                        seed=int(rng.integers(0, 2**31)),
                        space_group=1,  # P1 (no symmetry constraints)
                    )

                    struct_label = f"{atoms.get_chemical_formula()}_pyxtal_{i}"
                    atoms.info["label"] = struct_label
                    structures[struct_label] = atoms

                    # Collect for saving grouped by number of elements
                    frames_by_n_elements.setdefault(n_elements, []).append(atoms)
                except Exception as exc:
                    print(f"Failed to generate {struct_label}: {exc}")
                    continue

    # Save structures to data_path as multi-frame xyz, grouped by element count
    for n_elem, atom_list in frames_by_n_elements.items():
        filepath = data_path / f"{n_elem}.xyz"
        write(filepath, atom_list, format="extxyz")
        print(f"Saved {len(atom_list)} structures to {filepath}")

    return structures


def _load_structures(data_path: Path) -> dict[str, Atoms]:
    """
    Load reference crystal structures from the data directory.

    Each ``.xyz`` / ``.extxyz`` file may contain multiple frames. For
    single-frame files the label is the filename stem; for multi-frame
    files each frame is labelled ``"{stem}_{index}"``. A label stored
    in ``atoms.info["label"]`` takes precedence in both cases.

    Parameters
    ----------
    data_path
        Directory containing reference crystal structure files.

    Returns
    -------
    dict[str, Atoms]
        Mapping of structure label to ASE ``Atoms``.
    """
    structures: dict[str, Atoms] = {}
    for ext in ("*.xyz", "*.extxyz", "*.cif"):
        for filepath in sorted(data_path.glob(ext)):
            try:
                frames = ase_read(filepath, index=":")
            except Exception:
                # Best-effort: skip unreadable/corrupt files rather than
                # aborting the whole benchmark.
                continue
            if not isinstance(frames, list):
                frames = [frames]
            stem = filepath.stem
            if len(frames) == 1:
                atoms = frames[0]
                # Prefer the stored label if present
                label = atoms.info.get("label", stem)
                if not np.any(atoms.pbc):
                    atoms.pbc = True
                structures[label] = atoms
            else:
                for idx, atoms in enumerate(frames):
                    label = atoms.info.get("label", f"{stem}_{idx}")
                    if not np.any(atoms.pbc):
                        atoms.pbc = True
                    structures[label] = atoms
    return structures


def _collect_structures(
    elements: list[str],
    prototypes: list[str],
    data_path: Path,
) -> dict[str, Atoms]:
    """
    Build the full set of structures to benchmark.

    Combines prototype-generated structures (one per element/prototype
    combination produced by ``ase.build.bulk``) with any additional
    structures loaded from files in ``data_path``.

    Parameters
    ----------
    elements
        Chemical symbols for prototype generation.
    prototypes
        Crystal prototypes for ``ase.build.bulk``.
    data_path
        Directory containing additional reference structure files.

    Returns
    -------
    dict[str, Atoms]
        Combined mapping of structure labels to ASE ``Atoms``.
        Prototype labels have the form ``"{element}_{prototype}"``;
        file-loaded labels use the filename stem. If a file label
        collides with a prototype label, the file-loaded structure
        takes precedence.
    """
    structures = _generate_prototype_structures(elements, prototypes)
    file_structures = _load_structures(data_path)
    # File-loaded structures override any prototype with the same label
    structures.update(file_structures)
    return structures


def run_compression(model_name: str, model) -> None:
    """
    Evaluate energy-volume compression curves for a single model.

    For every reference crystal structure, the cell is uniformly scaled
    over a range of linear factors. At each scale point the energy,
    volume (per atom), and hydrostatic pressure are recorded; per-point
    snapshots are written as extxyz trajectories and the full table as
    ``compression.csv``, alongside a ``metadata.json`` summary.

    Parameters
    ----------
    model_name
        Name of the model being evaluated.
    model
        Model wrapper providing ``get_calculator``.
    """
    calc = model.get_calculator()
    write_dir = OUT_PATH / model_name
    write_dir.mkdir(parents=True, exist_ok=True)
    traj_dir = write_dir / "compression"
    traj_dir.mkdir(parents=True, exist_ok=True)

    reference_structures = _collect_structures(ELEMENTS, PROTOTYPES, DATA_PATH)
    if not reference_structures:
        print(f"[{model_name}] No structures generated or found in {DATA_PATH}")
        return

    records: list[dict[str, float | str]] = []
    supported_structures: set[str] = set()
    failed_structures: dict[str, str] = {}

    scales = _scale_grid(MIN_SCALE, MAX_SCALE, N_POINTS)

    for struct_label, ref_atoms in tqdm.tqdm(
        reference_structures.items(),
        desc=f"{model_name} compression",
        unit="struct",
    ):
        try:
            ref_cell = ref_atoms.cell.copy()
            ref_positions = ref_atoms.get_scaled_positions()
            n_atoms = len(ref_atoms)
            trajectory: list[Atoms] = []

            for scale in scales:
                atoms = ref_atoms.copy()
                # Uniformly scale the cell (isotropic compression/expansion)
                atoms.set_cell(ref_cell * scale, scale_atoms=True)
                atoms.set_scaled_positions(ref_positions)

                atoms.info.setdefault("charge", 0)
                atoms.info.setdefault("spin", 1)

                atoms.calc = calc

                energy = float(atoms.get_potential_energy())
                volume = float(atoms.get_volume())
                volume_per_atom = volume / n_atoms
                energy_per_atom = energy / n_atoms

                # Hydrostatic pressure in GPa from the full 3x3 stress
                # tensor (voigt=False); p = -tr(sigma)/3. Models without
                # stress support get NaN rather than aborting the scan.
                try:
                    stress_tensor = atoms.get_stress(voigt=False)
                    pressure = -np.trace(stress_tensor) / 3.0 / units.GPa
                except Exception:
                    pressure = np.nan

                # Store trajectory snapshot (detach calculator so the
                # written frames don't drag the model along)
                atoms_copy = atoms.copy()
                atoms_copy.calc = None
                atoms_copy.info.update(
                    {
                        "structure": struct_label,
                        "scale": float(scale),
                        "energy": energy,
                        "energy_per_atom": energy_per_atom,
                        "volume": volume,
                        "volume_per_atom": volume_per_atom,
                        "pressure": pressure,
                        "model": model_name,
                    }
                )
                trajectory.append(atoms_copy)

                records.append(
                    {
                        "structure": struct_label,
                        "scale": float(scale),
                        "energy": energy,
                        "energy_per_atom": energy_per_atom,
                        "volume": volume,
                        "volume_per_atom": volume_per_atom,
                        "pressure": pressure,
                    }
                )

            if trajectory:
                write(
                    traj_dir / f"{struct_label}.xyz",
                    trajectory,
                    format="extxyz",
                )
                supported_structures.add(struct_label)

        except Exception as exc:
            failed_structures[struct_label] = str(exc)
            print(f"[{model_name}] Skipping {struct_label}: {exc}")
            continue

    if records:
        df = pd.DataFrame.from_records(records)
        df.to_csv(write_dir / "compression.csv", index=False)

    metadata = {
        "supported_structures": sorted(supported_structures),
        "failed_structures": failed_structures,
        "config": {
            "min_scale": MIN_SCALE,
            "max_scale": MAX_SCALE,
            "n_points": N_POINTS,
        },
    }
    (write_dir / "metadata.json").write_text(json.dumps(metadata, indent=2))


@pytest.mark.slow
@pytest.mark.parametrize("model_name", MODELS)
def test_compression(model_name: str) -> None:
    """
    Run compression benchmark for each registered model.

    Parameters
    ----------
    model_name
        Name of the model to evaluate.
    """
    run_compression(model_name, MODELS[model_name])


if __name__ == "__main__":
    # generate the random structures and save to data directory
    # this only needs to be done once
    # code included here so that the structures can be easily
    # regenerated in the same way if needed, or more can be
    # created etc.
    _generate_all_random(
        RANDOM_STRUCTURES,
        ELEMENTS,
        MAX_ATOMS_PER_CELL,
        DATA_PATH,
        seed=42,
    )