diff --git a/aaanalysis/_utils/plotting.py b/aaanalysis/_utils/plotting.py index 68140079b..42028aee1 100644 --- a/aaanalysis/_utils/plotting.py +++ b/aaanalysis/_utils/plotting.py @@ -137,10 +137,10 @@ def _check_marker_size(marker_size: Union[int, float, List[Union[int, float]]] = if isinstance(marker_size, (int, float)): check_number_range(name='marker_size', val=marker_size, min_val=0, accept_none=True, just_int=False) elif isinstance(marker_size, list): + if len(marker_size) != len(list_cat): + raise ValueError(f"Length must match of 'marker_size' (marker_size) and categories ({list_cat}).") for i in marker_size: check_number_range(name='marker_size', val=i, min_val=0, accept_none=True, just_int=False) - elif isinstance(marker_size, list) and len(marker_size) != len(list_cat): - raise ValueError(f"Length must match of 'marker_size' (marker_size) and categories ({list_cat}).") else: raise ValueError(f"'marker_size' has wrong data type: {type(marker_size)}") # Create marker_size list diff --git a/aaanalysis/config.py b/aaanalysis/config.py index 45bdfa667..7407c511d 100644 --- a/aaanalysis/config.py +++ b/aaanalysis/config.py @@ -27,6 +27,12 @@ } +# Tracks whether allow_multiprocessing=False set the loky CPU cap, and the user's prior +# LOKY_MAX_CPU_COUNT value, so re-enabling multiprocessing restores it (never loses it). +_loky_capped_by_options = False +_loky_prev_value = None + + # Check system level (option) parameters or depending on parameters def check_verbose(verbose=None): """Check if general verbosity is on or off. Adjusted based on options setting and value provided to object""" @@ -67,12 +73,28 @@ def check_n_jobs(n_jobs=None): global_n_jobs = options["n_jobs"] if global_n_jobs != "off": n_jobs = global_n_jobs + global _loky_capped_by_options, _loky_prev_value allow_multiprocessing = options["allow_multiprocessing"] check_bool(name="allow_multiprocessing (options)", val=allow_multiprocessing) - # Disable multiprocessing + # Cap loky when multiprocessing is disabled, remembering the user's prior value so it is + # restored (not lost) once multiprocessing is re-enabled on the next parallel-capable call. if not allow_multiprocessing: n_jobs = 1 + if not _loky_capped_by_options: + _loky_prev_value = os.environ.get('LOKY_MAX_CPU_COUNT') + _loky_capped_by_options = True os.environ['LOKY_MAX_CPU_COUNT'] = "1" + elif _loky_capped_by_options: + # Only undo our own cap if it is still in place. If the user set their own + # LOKY_MAX_CPU_COUNT (e.g. for another loky/joblib library) while multiprocessing + # was disabled, the value is no longer "1" -> leave it untouched. + if os.environ.get('LOKY_MAX_CPU_COUNT') == "1": + if _loky_prev_value is None: + os.environ.pop('LOKY_MAX_CPU_COUNT', None) + else: + os.environ['LOKY_MAX_CPU_COUNT'] = _loky_prev_value + _loky_capped_by_options = False + _loky_prev_value = None # Set n_jobs to maximum number of CPUs if n_jobs == -1: n_jobs = os.cpu_count() @@ -130,10 +152,13 @@ def _check_option(name_option="", option=None): """Check if option is valid""" if name_option == "verbose": if option != "off": - check_verbose(verbose=option) + # Validate the incoming candidate directly (check_verbose resolves against + # the current global and would skip validating a new value once one is set). + check_bool(name=name_option, val=option) if name_option == "random_state": if option != "off": - check_random_state(random_state=option) + check_number_range(name=name_option, val=option, min_val=0, + accept_none=True, just_int=True) if name_option == "n_jobs": if option != "off": # Concrete override: -1 (all cores) or a positive int. None is not a @@ -144,12 +169,12 @@ def _check_option(name_option="", option=None): accept_none=False, just_int=True) if name_option == "allow_multiprocessing": check_bool(name=name_option, val=option) - if "jmd" in name_option: - if "len" in name_option: - check_number_range(name=name_option, val=option, - min_val=0, accept_none=True, just_int=True) - if "name" in name_option: - check_str(name=name_option, val=option, accept_none=False) + if "jmd" in name_option and "len" in name_option: + check_number_range(name=name_option, val=option, + min_val=0, accept_none=True, just_int=True) + if "name" in name_option: + # Covers name_tmd, name_jmd_n, name_jmd_c + check_str(name=name_option, val=option, accept_none=False) if name_option == "ext_len": check_number_range(name=name_option, val=option, min_val=0, accept_none=False, just_int=True) if "df" in name_option: diff --git a/aaanalysis/data_handling/_backend/parse_fasta.py b/aaanalysis/data_handling/_backend/parse_fasta.py index e8ea46f68..b49cc4284 100644 --- a/aaanalysis/data_handling/_backend/parse_fasta.py +++ b/aaanalysis/data_handling/_backend/parse_fasta.py @@ -24,7 +24,10 @@ def get_entries_from_fasta(file_path=None, col_id="entry", col_seq="sequence", c if len(list_info) > 1: for i in range(1, len(list_info[1:])+1): dict_current_entry[f'info{i}'] = list_info[i] - else: + elif line: + if not dict_current_entry: + raise ValueError(f"'file_path' ('{file_path}') is not a valid FASTA file: " + f"sequence data appears before the first '>' header.") dict_current_entry[col_seq] += line if dict_current_entry: list_entries.append(dict_current_entry) diff --git a/aaanalysis/data_handling/_load_dataset.py b/aaanalysis/data_handling/_load_dataset.py index abc57d965..eb06a4609 100644 --- a/aaanalysis/data_handling/_load_dataset.py +++ b/aaanalysis/data_handling/_load_dataset.py @@ -62,7 +62,7 @@ def post_check_df_seq(df_seq=None, n=None, name=None) -> None: f"\nThis maximum value depends on the filtering settings used." # Validation of sequence and domain datasets if n is not None and len(df_seq) != n*2: - warnings.warn(warning_message) + warnings.warn(warning_message, UserWarning) # Helper functions @@ -256,7 +256,9 @@ def load_dataset(name: str = "Overview", # Load overview table if name == "Overview": return ut.read_csv_cached(FOLDER_BENCHMARKS + f"Overview.{ut.STR_FILE_TYPE}").copy() - df = ut.read_csv_cached(FOLDER_BENCHMARKS + name + f".{ut.STR_FILE_TYPE}") + # Copy the cached frame (like the Overview branch) so downstream filtering / + # non-canonical-AA substitution can't mutate the shared cache in place + df = ut.read_csv_cached(FOLDER_BENCHMARKS + name + f".{ut.STR_FILE_TYPE}").copy() # Filter data if min_len is not None: n_before = len(df) diff --git a/aaanalysis/data_handling/_load_features.py b/aaanalysis/data_handling/_load_features.py index eebcc3c52..0ebc69954 100644 --- a/aaanalysis/data_handling/_load_features.py +++ b/aaanalysis/data_handling/_load_features.py @@ -51,4 +51,5 @@ def load_features(name: Literal["DOM_GSEC"] = "DOM_GSEC") -> pd.DataFrame: check_name(name=name) # Load features df_feat = ut.read_csv_cached(FOLDER_FEATURES + f"FEATURES_{name}.{ut.STR_FILE_TYPE}") - return df_feat + # Copy the cached frame so a caller's in-place edit can't corrupt the shared cache + return df_feat.copy() diff --git a/aaanalysis/data_handling/_load_scales.py b/aaanalysis/data_handling/_load_scales.py index 52981c433..486522eaa 100644 --- a/aaanalysis/data_handling/_load_scales.py +++ b/aaanalysis/data_handling/_load_scales.py @@ -245,7 +245,7 @@ def load_scales(name: Literal["scales", "scales_raw", "scales_cat", "scales_pc", # Check input check_name_of_scale(name=name) ut.check_bool(name="just_aaindex", val=just_aaindex) - ut.check_bool(name="unclassified_in", val=unclassified_out) + ut.check_bool(name="unclassified_out", val=unclassified_out) top60_n = check_top60_n(name=name, top60_n=top60_n) check_top_explain(name=name, top_explain_n=top_explain_n, top_explain_min_th=top_explain_min_th, top60_n=top60_n) diff --git a/aaanalysis/data_handling/_read_fasta.py b/aaanalysis/data_handling/_read_fasta.py index 2a9c07da9..c3920f66f 100644 --- a/aaanalysis/data_handling/_read_fasta.py +++ b/aaanalysis/data_handling/_read_fasta.py @@ -18,7 +18,7 @@ def post_check_unique_entries(list_entries=None, col_id=None) -> None: if len(list_duplicates) > 0: str_warning = (f"Entries from '{col_id}' should be unique. " f"\nFollowing entries are duplicated: {list_duplicates}") - warnings.warn(str_warning) + warnings.warn(str_warning, UserWarning) def post_check_col_db(df_seq=None, col_db=None, sep="|") -> None: @@ -26,7 +26,7 @@ def post_check_col_db(df_seq=None, col_db=None, sep="|") -> None: columns = list(df_seq) if col_db is not None and col_db not in columns: str_warning = f"'col_db' ('{col_db}') not in 'df_seq'. Check if 'sep' ('{sep}') is matching." - warnings.warn(str_warning) + warnings.warn(str_warning, UserWarning) def _adjust_columns(df_seq=None, col_seq=None, col_id=None, cols_info=None, col_db=None): diff --git a/aaanalysis/feature_engineering/_aaclust.py b/aaanalysis/feature_engineering/_aaclust.py index 2e20e0081..a1933e2f8 100644 --- a/aaanalysis/feature_engineering/_aaclust.py +++ b/aaanalysis/feature_engineering/_aaclust.py @@ -42,7 +42,7 @@ def check_match_X_n_clusters(X=None, n_clusters=None, accept_none=True) -> None: if n_samples < n_clusters: raise ValueError(f"n_samples={n_samples} (in 'X') should be >= 'n_clusters' ({n_clusters})") if n_unique_samples < n_clusters: - raise ValueError(f"'n_clusters' ({n_clusters}) should be >= n_unique_samples={n_unique_samples} (in 'X').") + raise ValueError(f"'n_clusters' ({n_clusters}) should be <= n_unique_samples={n_unique_samples} (in 'X').") def check_match_df_seq_X(df_seq=None, X=None) -> None: diff --git a/aaanalysis/feature_engineering/_backend/check_aaclust.py b/aaanalysis/feature_engineering/_backend/check_aaclust.py index f357d3e4e..d8c577bc8 100644 --- a/aaanalysis/feature_engineering/_backend/check_aaclust.py +++ b/aaanalysis/feature_engineering/_backend/check_aaclust.py @@ -11,7 +11,7 @@ def check_metric(metric=None): """""" if metric not in ut.LIST_METRICS: - error = f"'metric' should be None or one of following: {ut.LIST_METRICS}" + error = f"'metric' should be one of following: {ut.LIST_METRICS}" raise ValueError(error) diff --git a/aaanalysis/feature_engineering/_backend/check_feature.py b/aaanalysis/feature_engineering/_backend/check_feature.py index e2f8f3146..b0b95232c 100644 --- a/aaanalysis/feature_engineering/_backend/check_feature.py +++ b/aaanalysis/feature_engineering/_backend/check_feature.py @@ -460,7 +460,8 @@ def check_match_df_parts_df_scales(df_parts=None, df_scales=None, accept_gaps=Fa char_scales.append(ut.STR_AA_GAP) missing_char = [x for x in char_parts if x not in char_scales] # Replace gaps by default amino acid gap - if accept_gaps: + if accept_gaps and missing_char: + df_parts = df_parts.copy() # copy so we don't mutate the caller's df_parts in place for col in list(df_parts): for mc in missing_char: df_parts[col] = df_parts[col].str.replace(mc, ut.STR_AA_GAP) diff --git a/aaanalysis/feature_engineering/_backend/cpp/cpp_eval.py b/aaanalysis/feature_engineering/_backend/cpp/cpp_eval.py index f333eebf0..8c0c4dbb2 100644 --- a/aaanalysis/feature_engineering/_backend/cpp/cpp_eval.py +++ b/aaanalysis/feature_engineering/_backend/cpp/cpp_eval.py @@ -36,7 +36,8 @@ def get_min_cor(X, labels=None): def get_best_n_clusters(X=None, min_th=0.3, random_state=None): """Obtain the best number of clusters based on internal cluster minimum correlation.""" n_features, _ = X.shape - max_n = min([100, n_features-1]) + # Clamp to >= 1 so a single-feature set does not fall through to KMeans(n_clusters=0) + max_n = max(1, min(100, n_features - 1)) best_n_clusters = max_n for n_clusters in range(2, max_n): kmeans = KMeans(n_clusters=n_clusters, n_init="auto", random_state=random_state) diff --git a/aaanalysis/plotting/_plot_legend.py b/aaanalysis/plotting/_plot_legend.py index 0ccfe8d87..f64354f52 100644 --- a/aaanalysis/plotting/_plot_legend.py +++ b/aaanalysis/plotting/_plot_legend.py @@ -153,10 +153,10 @@ def plot_legend(ax: Optional[Axes] = None, ut.check_number_val(name="lw", val=lw, accept_none=True, just_int=False) args_non_neg = {"labelspacing": labelspacing, "columnspacing": columnspacing, "handletextpad": handletextpad, "handlelength": handlelength, - "fontsize": fontsize, "fontsize_legend": fontsize_title} + "fontsize": fontsize, "fontsize_title": fontsize_title} for key in args_non_neg: ut.check_number_range(name=key, val=args_non_neg[key], min_val=0, accept_none=True, just_int=False) - ut.check_bool(name="add_legend", val=keep_legend, accept_none=False) + ut.check_bool(name="keep_legend", val=keep_legend, accept_none=False) # Create new legend ax = ut.plot_legend_(ax=ax, dict_color=dict_color, list_cat=list_cat, labels=labels, loc=loc, loc_out=loc_out, y=y, x=x, n_cols=n_cols, diff --git a/aaanalysis/protein_engineering/_backend/seqmut/seqmut.py b/aaanalysis/protein_engineering/_backend/seqmut/seqmut.py index c036e95da..c2e03341a 100644 --- a/aaanalysis/protein_engineering/_backend/seqmut/seqmut.py +++ b/aaanalysis/protein_engineering/_backend/seqmut/seqmut.py @@ -191,7 +191,7 @@ def comp_scan_scores(dX=None, mean_dif=None, weight_vec=None): def build_scan_output(df_plan=None, delta_cpp=None, shift_score=None, - delta_pred=None, wt_pred=None, wt_pred_std=None): + delta_pred=None, wt_pred=None, wt_pred_std=None, sort=True): """Assemble the tidy scan output DataFrame, sorted by descending |ΔCPP|. When ``delta_pred`` is given (a model is bound), the model prediction-shift columns @@ -209,7 +209,11 @@ def build_scan_output(df_plan=None, delta_cpp=None, shift_score=None, df_out[ut.COL_WT_PRED_STD] = wt_pred_std cols = cols + [ut.COL_DELTA_PRED, ut.COL_WT_PRED, ut.COL_WT_PRED_STD] df_out = df_out[cols] - df_out = df_out.sort_values(ut.COL_DELTA_CPP, ascending=False).reset_index(drop=True) + if sort: + df_out = df_out.sort_values(ut.COL_DELTA_CPP, ascending=False).reset_index(drop=True) + else: + # Keep df_plan row order so callers can align results positionally (no re-join). + df_out = df_out.reset_index(drop=True) return df_out diff --git a/aaanalysis/protein_engineering/_seqmut.py b/aaanalysis/protein_engineering/_seqmut.py index 7579aa800..cb02dac62 100644 --- a/aaanalysis/protein_engineering/_seqmut.py +++ b/aaanalysis/protein_engineering/_seqmut.py @@ -186,7 +186,7 @@ def __init__(self, # Helper methods def _delta_table(self, df_plan=None, df_seq=None, df_feat=None, jmd_n_len=10, jmd_c_len=10, - weight=None): + weight=None, sort=True): """Run the ΔCPP (+ model ΔP) engine for a mutation plan and return the scored output.""" features = list(df_feat[ut.COL_FEATURE]) mean_dif = df_feat[ut.COL_MEAN_DIF].to_numpy(dtype=float) @@ -201,8 +201,9 @@ def _delta_table(self, df_plan=None, df_seq=None, df_feat=None, jmd_n_len=10, jm X_wt=X_wt, X_mut=X_mut, wt_rows=wt_rows, model=self._model, target_class=self._target_class) return build_scan_output(df_plan=df_plan, delta_cpp=delta_cpp, shift_score=shift_score, - delta_pred=delta_pred, wt_pred=wt_pred, wt_pred_std=wt_pred_std) - return build_scan_output(df_plan=df_plan, delta_cpp=delta_cpp, shift_score=shift_score) + delta_pred=delta_pred, wt_pred=wt_pred, wt_pred_std=wt_pred_std, + sort=sort) + return build_scan_output(df_plan=df_plan, delta_cpp=delta_cpp, shift_score=shift_score, sort=sort) # Main methods def mutate(self, @@ -279,14 +280,14 @@ def mutate(self, for p, ts, te in zip(df_plan[ut.COL_POS], df_plan[ut.COL_TMD_START], df_plan[ut.COL_TMD_STOP])] + # Score in mutation order (sort=False) so results align row-for-row with df_mut; + # no label re-join, so duplicate mutation rows can never desync or crash. df_scored = self._delta_table(df_plan=df_plan, df_seq=df_seq, df_feat=df_feat, - jmd_n_len=jmd_n_len, jmd_c_len=jmd_c_len) - df_scored = df_scored.set_index(ut.COL_MUTATION) - df_mut[ut.COL_DELTA_CPP] = df_scored.loc[df_mut[ut.COL_MUTATION], ut.COL_DELTA_CPP].to_numpy() - df_mut[ut.COL_SHIFT_SCORE] = df_scored.loc[df_mut[ut.COL_MUTATION], ut.COL_SHIFT_SCORE].to_numpy() + jmd_n_len=jmd_n_len, jmd_c_len=jmd_c_len, sort=False) + df_mut[ut.COL_DELTA_CPP] = df_scored[ut.COL_DELTA_CPP].to_numpy() + df_mut[ut.COL_SHIFT_SCORE] = df_scored[ut.COL_SHIFT_SCORE].to_numpy() if self._model is not None: - df_mut[ut.COL_DELTA_PRED] = df_scored.loc[df_mut[ut.COL_MUTATION], - ut.COL_DELTA_PRED].to_numpy() + df_mut[ut.COL_DELTA_PRED] = df_scored[ut.COL_DELTA_PRED].to_numpy() return df_mut def scan(self, diff --git a/aaanalysis/seq_analysis_pro/_backend/comp_seq_sim.py b/aaanalysis/seq_analysis_pro/_backend/comp_seq_sim.py index 81545a8dc..868817065 100644 --- a/aaanalysis/seq_analysis_pro/_backend/comp_seq_sim.py +++ b/aaanalysis/seq_analysis_pro/_backend/comp_seq_sim.py @@ -37,9 +37,9 @@ def comp_pw_seq_sim_(df_seq=None): sim_score = comp_seq_sim_(seq1=seq1, seq2=seq2) df_pw_sim.at[id1, id2] = sim_score df_pw_sim.at[id2, id1] = sim_score - # Fill diagonal with 1s for self-similarity + # Fill diagonal with 100 for self-similarity (matches the [0, 100] scale of off-diagonal cells) arr = df_pw_sim.to_numpy(copy=True) - np.fill_diagonal(arr, 1) + np.fill_diagonal(arr, 100) df_pw_sim.iloc[:, :] = arr df_pw_sim = df_pw_sim.round(4) return df_pw_sim diff --git a/aaanalysis/seq_analysis_pro/_comp_seq_sim.py b/aaanalysis/seq_analysis_pro/_comp_seq_sim.py index 271022983..070f2f86b 100644 --- a/aaanalysis/seq_analysis_pro/_comp_seq_sim.py +++ b/aaanalysis/seq_analysis_pro/_comp_seq_sim.py @@ -63,7 +63,7 @@ def comp_seq_sim(seq1: Optional[str] = None, ut.check_df(name="df_seq", df=df_seq, accept_none=False, accept_nan=False, cols_required=[ut.COL_SEQ, ut.COL_ENTRY]) for entry, seq in zip(df_seq[ut.COL_ENTRY], df_seq[ut.COL_SEQ]): - ut.check_str(name=f"sequence ({entry}", val=seq, accept_none=False) + ut.check_str(name=f"sequence ({entry})", val=seq, accept_none=False) if df_seq is None: # Compute similarity seq_sim = comp_seq_sim_(seq1=seq1, seq2=seq2) diff --git a/aaanalysis/show_html/_display_df.py b/aaanalysis/show_html/_display_df.py index 13e137741..3a478c134 100644 --- a/aaanalysis/show_html/_display_df.py +++ b/aaanalysis/show_html/_display_df.py @@ -39,7 +39,7 @@ def _check_show(name="row_to_show", val=None, df=None): if val not in rows_or_columns: raise ValueError(f"'{name}' ('{val}') should be one of: {rows_or_columns}") elif isinstance(val, int): - ut.check_number_range(name=name, val=val, accept_none=True, min_val=0, max_val=n, just_int=True) + ut.check_number_range(name=name, val=val, accept_none=True, min_val=0, max_val=n - 1, just_int=True) else: raise ValueError(f"'{name}' ('{val}') should be int (<{n}) or one of following {str_row_or_column} names: {rows_or_columns}") diff --git a/tests/unit/aaclust_tests/test_aac_branch.py b/tests/unit/aaclust_tests/test_aac_branch.py index 6a38a72a5..df3ece4b2 100644 --- a/tests/unit/aaclust_tests/test_aac_branch.py +++ b/tests/unit/aaclust_tests/test_aac_branch.py @@ -39,7 +39,7 @@ def test_n_clusters_exceeds_n_unique_samples(self): X = np.array([[1.0, 2.0], [1.0, 2.0], [3.0, 4.0], [5.0, 6.0], [7.0, 8.0]]) aac = aa.AAclust(verbose=False, random_state=42) - with pytest.raises(ValueError, match="should be >= n_unique_samples"): + with pytest.raises(ValueError, match="should be <= n_unique_samples"): aac.fit(X, n_clusters=5) @given(n_clusters=some.integers(min_value=2, max_value=6)) diff --git a/tests/unit/protein_engineering_tests/test_seqmut_mutate.py b/tests/unit/protein_engineering_tests/test_seqmut_mutate.py index 595325799..2ee73b148 100644 --- a/tests/unit/protein_engineering_tests/test_seqmut_mutate.py +++ b/tests/unit/protein_engineering_tests/test_seqmut_mutate.py @@ -45,6 +45,14 @@ def test_without_df_feat_no_delta(self, df_seq_pos): df = aa.SeqMut().mutate(df_seq=df_seq_pos, mutations=_muts()) assert ut.COL_DELTA_CPP not in df.columns + def test_with_df_feat_duplicate_rows_no_crash(self, df_seq_pos, df_feat): + # Two identical (entry, pos, to_aa) rows share the same mutation label; the scored + # results must still align row-for-row (no non-unique re-join crash). + muts = _muts(("P1", "P1"), (12, 12), ("K", "K")) + df = aa.SeqMut().mutate(df_seq=df_seq_pos, mutations=muts, df_feat=df_feat) + assert len(df) == 2 + assert df[ut.COL_DELTA_CPP].iloc[0] == pytest.approx(df[ut.COL_DELTA_CPP].iloc[1], abs=1e-12) + def test_jmd_n_len(self, df_seq_pos, df_feat): df = aa.SeqMut().mutate(df_seq=df_seq_pos, mutations=_muts(("P1",), (12,), ("K",)), df_feat=df_feat, jmd_n_len=8) diff --git a/tests/unit/test_correctness_batch_342.py b/tests/unit/test_correctness_batch_342.py new file mode 100644 index 000000000..1c6cda97c --- /dev/null +++ b/tests/unit/test_correctness_batch_342.py @@ -0,0 +1,168 @@ +"""Regression tests pinning a batch of low-risk correctness fixes so each defect +cannot silently return.""" +import numpy as np +import pandas as pd +import pytest + +import aaanalysis as aa +import aaanalysis.utils as ut + + +# load_dataset(non_canonical_aa="gap") must not corrupt the shared cache +def test_load_dataset_gap_does_not_corrupt_cache(): + name = "SEQ_CAPSID" # contains non-canonical amino acids (B, U, X) + df_gap = aa.load_dataset(name=name, non_canonical_aa="gap") + assert df_gap[ut.COL_SEQ].str.contains(ut.STR_AA_GAP, regex=False).any(), \ + "test dataset must contain non-canonical AAs so the gap path is exercised" + df_keep = aa.load_dataset(name=name, non_canonical_aa="keep") + assert not df_keep[ut.COL_SEQ].str.contains(ut.STR_AA_GAP, regex=False).any(), \ + "'keep' returned gapped sequences -> the earlier 'gap' call corrupted the cache" + + +# load_features must return a fresh copy, not the shared cached object +def test_load_features_returns_independent_copy(): + d1 = aa.load_features(name="DOM_GSEC") + d2 = aa.load_features(name="DOM_GSEC") + assert d1 is not d2 + d1.iloc[0, 0] = "__SENTINEL__" + d3 = aa.load_features(name="DOM_GSEC") + assert d3.iloc[0, 0] != "__SENTINEL__" + + +# read_fasta -> clear ValueError on pre-header text; leading blank is skipped +def test_read_fasta_preheader_text_raises_valueerror(tmp_path): + bad = tmp_path / "bad.fasta" + bad.write_text("junk before header\n>A\nMKV\n") + with pytest.raises(ValueError): + aa.read_fasta(file_path=str(bad)) + + +def test_read_fasta_leading_blank_line_ok(tmp_path): + ok = tmp_path / "ok.fasta" + ok.write_text("\n>A\nMKV\n>B\nAAA\n") + df = aa.read_fasta(file_path=str(ok)) + assert len(df) == 2 + + +# comp_seq_sim self-similarity diagonal on the [0, 100] scale +def test_comp_seq_sim_diagonal_is_100(): + pytest.importorskip("Bio") + df_seq = pd.DataFrame({ut.COL_ENTRY: ["P1", "P2"], + ut.COL_SEQ: ["ACDEFGHIKL", "ACDEFGHIKM"]}) + res = aa.comp_seq_sim(df_seq=df_seq) + diag = np.diag(np.asarray(res, dtype=float)) + assert np.allclose(diag, 100.0) + + +# get_best_n_clusters must not return 0 for a single-feature set (KMeans(0)) +def test_get_best_n_clusters_single_feature(): + from aaanalysis.feature_engineering._backend.cpp.cpp_eval import get_best_n_clusters + X = np.array([[0.1, 0.2, 0.3, 0.4]]) # one feature (row) + assert get_best_n_clusters(X=X, min_th=0.3, random_state=0) >= 1 + + +# wrong-length marker_size list -> ValueError, not a later IndexError +def test_check_marker_size_wrong_length_raises_valueerror(): + from aaanalysis._utils.plotting import _check_marker_size + with pytest.raises(ValueError): + _check_marker_size(marker_size=[10, 12], list_cat=["a", "b", "c"]) + assert _check_marker_size(marker_size=[10, 12, 14], list_cat=["a", "b", "c"]) == [10, 12, 14] + + +# display_df row/col selector is 0..n-1 (off-by-one) +def test_display_df_out_of_bounds_selector_raises(): + df = pd.DataFrame({"x": [1, 2, 3]}) + with pytest.raises(ValueError): + aa.display_df(df=df, row_to_show=3) # valid rows are 0..2 + + +# check_match_X_n_clusters states the correct inequality +def test_aaclust_n_clusters_message_uses_leq(): + from aaanalysis.feature_engineering._aaclust import check_match_X_n_clusters + X = np.array([[1.0, 2.0], [1.0, 2.0], [1.0, 2.0]]) # 3 samples, 1 unique + with pytest.raises(ValueError) as e: + check_match_X_n_clusters(X=X, n_clusters=2) # n_samples>=n_clusters, n_unique "1" + os.environ["LOKY_MAX_CPU_COUNT"] = "32" # user sets their own value while disabled + aa.options["allow_multiprocessing"] = True + _cfg.check_n_jobs(n_jobs=1) + assert os.environ.get("LOKY_MAX_CPU_COUNT") == "32" # user's value not clobbered + finally: + aa.options["allow_multiprocessing"] = True + _cfg.check_n_jobs(n_jobs=1) + if prev is None: + os.environ.pop("LOKY_MAX_CPU_COUNT", None) + else: + os.environ["LOKY_MAX_CPU_COUNT"] = prev