diff --git a/.gitignore b/.gitignore index c7461ca..1ba0ce6 100644 --- a/.gitignore +++ b/.gitignore @@ -111,3 +111,6 @@ glucosepane/ logs/ homosapiens* homo_sapiens*/ +.DS_Store +*.o +contact_map diff --git a/pyproject.toml b/pyproject.toml index 2cff7ca..73481b7 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -22,8 +22,8 @@ classifiers = [ "Programming Language :: Python :: 3.11", ] dependencies = [ - "numpy==1.25", - "vermouth==0.9.1", + "numpy>=1.26", + "vermouth==0.9.6", "pandas==2.2.2", "biopython==1.84", "scikit-learn", diff --git a/src/colbuilder/colbuilder.py b/src/colbuilder/colbuilder.py index 21d03ff..6a8b478 100644 --- a/src/colbuilder/colbuilder.py +++ b/src/colbuilder/colbuilder.py @@ -89,11 +89,6 @@ def print_version(ctx: click.Context, param: click.Parameter, value: bool) -> No ctx.exit() -from colbuilder.core.sequence.main_sequence import build_sequence -from colbuilder.core.geometry.main_geometry import build_geometry_anywhere -from colbuilder.core.topology.main_topology import build_topology - - def configure_loggers(): """Configure external loggers to prevent duplicated output.""" problematic_loggers = [ @@ -203,6 +198,7 @@ async def run_sequence_generation(config: ColbuilderConfig) -> Tuple[Optional[Pa """ try: LOG.subsection("Generating Sequence") + from colbuilder.core.sequence.main_sequence import build_sequence return await build_sequence(config) except Exception as e: LOG.error(f"Sequence generation failed: {str(e)}") @@ -234,6 +230,7 @@ async def run_geometry_generation( """ try: LOG.subsection("Building Geometry or Mixing") + from colbuilder.core.geometry.main_geometry import build_geometry_anywhere current_file_manager = file_manager or FileManager(config) @@ -324,6 +321,8 @@ async def run_topology_generation( TopologyGenerationError: If topology generation fails """ try: + from colbuilder.core.topology.main_topology import build_topology + if file_manager is None: file_manager = FileManager(config) @@ -1157,4 +1156,4 @@ def main(**kwargs: Any) -> int: if __name__ == "__main__": - sys.exit(main()) \ No newline at end of file + sys.exit(main()) diff --git a/src/colbuilder/core/topology/amber.py b/src/colbuilder/core/topology/amber.py index 7abd107..ad710cb 100644 --- a/src/colbuilder/core/topology/amber.py +++ b/src/colbuilder/core/topology/amber.py @@ -7,6 +7,7 @@ """ import os +import re import shutil from pathlib import Path import asyncio @@ -902,6 +903,12 @@ def write_itp( output_file = itp_file.parent / str(itp_file.name).replace("top", "itp") + # pdb2gmx -merge all writes a single moleculetype. Older GROMACS + # versions name it "Protein_chain_A"; newer versions (>=2023) write + # just "Protein". Match either so the topology body is copied + # regardless of GROMACS version. + molname_re = re.compile(r'^\s*Protein(?:_chain_\w+)?\s+\d+\s*$') + with open(output_file, 'w') as f: write = False for line in itp_model: @@ -909,7 +916,7 @@ def write_itp( break if write: f.write(line) - elif 'Protein_chain_A' in line: + elif molname_re.match(line): f.write('[ moleculetype ]\n') f.write(f'{molecule_name} 3\n') write = True diff --git a/src/colbuilder/core/topology/martini.py b/src/colbuilder/core/topology/martini.py index d119e50..2e4990e 100644 --- a/src/colbuilder/core/topology/martini.py +++ b/src/colbuilder/core/topology/martini.py @@ -13,7 +13,6 @@ The module requires the Martinize2 tool and custom contact map utilities. """ -import cmd import os import subprocess import shutil @@ -79,64 +78,32 @@ def __init__(self, system: Any = None, ff: Optional[str] = None): def merge_pdbs(self, model_id: Optional[int] = None, cnt_model: Optional[int] = None) -> Optional[str]: """ Merge multiple PDB files based on system connectivity. - - Primary path: use System connectivity (model.connect). - Fallback: if no connections are present in the System, infer connect IDs - from files on disk matching '{model_id}.[0-9]*.CG.pdb'. + + Merges all CG PDB files created for a model's connections into a single file. + This includes both self-connections and cross-connections to ensure proper + crosslink detection. """ + model = self.system.get_model(model_id=model_id) + if model is None or model.connect is None: + LOG.error(f"No model or connections for model_id {model_id}") + return None + if cnt_model is None: LOG.error("cnt_model is None in merge_pdbs") return None - + output_file = f"{int(cnt_model)}.merge.pdb" merged_count = 0 - # Try to read connections from the System - sys_connect_ids: Optional[list] = None - if model_id is not None and self.system is not None: - try: - model = self.system.get_model(model_id=model_id) - if model is not None and getattr(model, "connect", None): - sys_connect_ids = list(model.connect) - except Exception as e: - LOG.debug(f"Could not read connect list from System for model {model_id}: {e}") - - # Fallback: infer from files - if not sys_connect_ids: - inferred_files = sorted(Path().glob(f"{int(model_id)}.[0-9]*.CG.pdb")) if model_id is not None else [] - if not inferred_files: - LOG.error(f"No model or connections for model_id {model_id} and no CG files found on disk") - if os.path.exists(output_file): - os.remove(output_file) - return None - LOG.debug(f"Model {model_id}: no connections in System; merging from disk ({len(inferred_files)} CG files).") - - try: - with open(output_file, "w") as f: - for cg_path in inferred_files: - with open(cg_path, "r") as infile: - lines_written = 0 - for line in infile: - if line[0:6] in self.is_line: - f.write(line) - lines_written += 1 - LOG.debug(f"Merged {lines_written} lines from {cg_path.name}") - merged_count += 1 - f.write("END\n") - LOG.debug(f"Successfully merged {merged_count} files into {output_file}") - return output_file - except Exception as e: - LOG.error(f"Error merging PDBs (fallback): {e}") - return None - - # Normal path using System connect IDs try: with open(output_file, "w") as f: - for connect_id in sys_connect_ids: + for connect_id in model.connect: input_file = f"{int(model_id)}.{int(connect_id)}.CG.pdb" + if not os.path.exists(input_file): LOG.warning(f"CG PDB file not found: {input_file}") continue + with open(input_file, "r") as infile: lines_written = 0 for line in infile: @@ -145,20 +112,23 @@ def merge_pdbs(self, model_id: Optional[int] = None, cnt_model: Optional[int] = lines_written += 1 LOG.debug(f"Merged {lines_written} lines from {input_file}") merged_count += 1 - f.write("END\n") + f.write("END\n") + if merged_count == 0: LOG.error(f"No CG files were merged for model {model_id}") if os.path.exists(output_file): os.remove(output_file) return None - + LOG.debug(f"Successfully merged {merged_count} files into {output_file}") - if sys_connect_ids and len(sys_connect_ids) > 1: - LOG.debug(f"Model {model_id} merge contains {merged_count} structures from connections: {sys_connect_ids}") + + if len(model.connect) > 1: + LOG.debug(f"Model {model_id} merge file contains {merged_count} structures from connections: {model.connect}") + return output_file except Exception as e: - LOG.error(f"Error merging PDBs: {e}") + LOG.error(f"Error merging PDBs: {str(e)}") return None def read_pdb(self, pdb_id: Optional[int] = None) -> List[str]: @@ -280,68 +250,72 @@ def set_pdb(self, pdb: Optional[List[str]] = None) -> Tuple[List[str], List[str] def cap_pdb(self, pdb: Optional[List[str]] = None) -> Tuple[List[str], str, str]: """ - Decide martinize2 -nter/-cter flags robustly. - 1) Rename terminal ALA -> CLA in the PDB *before* flag decisions. - 2) If any chain starts/ends on a special block, use 'none' for that side. - Special blocks include ACE/CLA and crosslink blocks (LY2/LY3/L4Y/L5Y/LYX), - and also NME (since it's already an explicit cap). + Cap PDBs according to connect_id in system by adding terminal residues. + + Parameters + ---------- + pdb : Optional[List[str]] + List of PDB file lines + + Returns + ------- + Tuple[List[str], str, str] + Tuple containing (modified_pdb, cter, nter) with capped PDB and terminal types """ if not pdb: LOG.warning("Empty PDB provided to cap_pdb") - return [], "none", "none" + return [], "NME", "ACE" try: - # 1) Rename terminal ALA -> CLA - chain_length = self.get_chain_length(pdb) # e.g. {"A": " 178", "B": "...", "C": "..."} - for i in range(len(pdb)): - line = pdb[i] - if not line.startswith(("ATOM ", "HETATM")): - continue - if line[17:20] == "ALA": - tag = line[21:26] # e.g. "A 178" - if tag in {f"A{chain_length['A']}", f"B{chain_length['B']}", f"C{chain_length['C']}"}: - pdb[i] = line[:17] + "CLA " + line[21:] - - # 2) Inspect first/last residue per chain *after* the rename above - first_res = {} # chain -> resname - last_res = {} # chain -> (resid, resname) + cter, nter = "NME", "ACE" + chain_length = self.get_chain_length(pdb) + + first_residues = {} for line in pdb: - if not line.startswith(("ATOM ", "HETATM")): - continue - ch = line[21:22] - if ch not in self.is_chain: - continue - resname = line[17:20].strip() - try: - resid = int(line[22:26]) - except ValueError: - continue - if ch not in first_res: - first_res[ch] = resname - if ch not in last_res or resid >= last_res[ch][0]: - last_res[ch] = (resid, resname) - - firsts = set(first_res.values()) if first_res else set() - lasts = {name for _, name in last_res.values()} if last_res else set() - - # Anything here means "don't ask martinize2 to apply a terminal mod": - special_first = {"ACE", "CLA", "LY2", "LY3", "L4Y", "L5Y", "LYX"} - special_last = {"ACE", "CLA", "LY2", "LY3", "L4Y", "L5Y", "LYX", "NME"} - - # N-terminus: conservative (ACE often causes issues on GLN etc. in some FF) - nter_flag = "none" if (not firsts or (firsts & special_first)) else "none" - - # C-terminus: allow NME only if all chains end on standard residues (not special, not NME) - cter_flag = "NME" - if not lasts or (lasts & special_last): - cter_flag = "none" - - # LOG.debug(f"cap_pdb decided: -nter {nter_flag}, -cter {cter_flag} (first={firsts}, last={lasts})") - return pdb, cter_flag, nter_flag + if line[0:6] in ("ATOM ", "HETATM"): + chain_id = line[21:22] + if chain_id in self.is_chain and chain_id not in first_residues: + first_residues[chain_id] = line[17:20].strip() + + for line_it in range(len(pdb)): + if pdb[line_it][17:20] == "ALA": + if pdb[line_it][21:26] == "A" + chain_length["A"]: + pdb[line_it] = pdb[line_it][0:17] + "CLA " + pdb[line_it][21:] + elif pdb[line_it][21:26] == "B" + chain_length["B"]: + pdb[line_it] = pdb[line_it][0:17] + "CLA " + pdb[line_it][21:] + elif pdb[line_it][21:26] == "C" + chain_length["C"]: + pdb[line_it] = pdb[line_it][0:17] + "CLA " + pdb[line_it][21:] + + has_ace = any(res == "ACE" for res in first_residues.values()) + all_ace = all(res == "ACE" for res in first_residues.values()) + all_gln = all(res == "GLN" for res in first_residues.values()) + + if all_ace: + nter = "none" + elif all_gln: + nter = "N-ter" + elif has_ace: + nter = "none" + else: + nter = "ACE" + last_residues: Dict[str, str] = {} + for line in pdb: + if line[0:6] in ("ATOM ", "HETATM"): + cid = line[21:22] + if cid in self.is_chain: + last_residues[cid] = line[17:20].strip() + + all_nme = len(last_residues) >= 1 and all(r == "NME" for r in last_residues.values()) + if all_nme: + cter = "none" + elif len(pdb) > 2 and pdb[-2][17:20] == "CLA": + cter = "CLA" + + return pdb, cter, nter except Exception as e: LOG.error(f"Error in cap_pdb: {str(e)}") - return pdb or [], "none", "none" + return pdb, "NME", "ACE" def get_chain_length(self, pdb: Optional[List[str]] = None) -> Dict[str, str]: """ @@ -447,7 +421,10 @@ def get_system_pdb(self, size: Optional[int] = None) -> List[str]: return [] def write_system_topology( - self, topology_file: str = "system.top", size: Optional[int] = None + self, + topology_file: str = "system.top", + size: Optional[int] = None, + processed_ids: Optional[List[int]] = None, ) -> None: """ Write final topology file for the system. @@ -470,9 +447,12 @@ def write_system_topology( f.write('#include "martini_v3.0.0.itp"\n') f.write('#include "go-sites.itp"\n\n') - for m in range(size): - f.write(f'#include "col_{m}.itp"\n') - f.write(f'#include "col_{m}_go-excl.itp"\n') + ids_for_includes = ( + processed_ids if processed_ids is not None else list(range(size)) + ) + for m in ids_for_includes: + f.write(f'#include "col_{int(m)}.itp"\n') + f.write(f'#include "col_{int(m)}_go-excl.itp"\n') f.write('\n#include "martini_v3.0.0_solvents_v1.itp"\n') f.write('#include "martini_v3.0.0_ions_v1.itp"\n') @@ -481,8 +461,8 @@ def write_system_topology( f.write("Collagen, Martini 3 and Go-Potentials \n") f.write("\n[ molecules ]\n") - for t in range(size): - f.write(f"col_{t} 1\n") + for t in ids_for_includes: + f.write(f"col_{int(t)} 1\n") self.write_go_topology(name_type="sites.itp") @@ -521,29 +501,32 @@ def write_go_topology(self, name_type: Optional[str] = None) -> None: LOG.error(f"Error writing GO topology: {str(e)}") def translate_pdb(self, pdb: Optional[List[str]] = None) -> List[str]: + """ + Translate PDB for Martinize2 by formatting coordinates. + + Ensures all coordinates are formatted with exactly 3 decimal places, + which is required for proper processing by Martinize2. + """ if not pdb: LOG.warning("Empty PDB provided to translate_pdb") return [] - + translated = [] for line in pdb: if line[0:6] in self.is_line: try: - # Columns: x[30:38], y[38:46], z[46:54] x = float(line[30:38]) y = float(line[38:46]) z = float(line[46:54]) new_line = f"{line[:30]}{x:8.3f}{y:8.3f}{z:8.3f}{line[54:]}" translated.append(new_line) except ValueError: - # If parse fails, keep original to avoid data loss but log it LOG.warning(f"Bad coord formatting, keeping original: {line.rstrip()}") translated.append(line) else: translated.append(line) return translated - def write_gro( self, system: Optional[Any] = None, @@ -577,15 +560,35 @@ def check_output_files(topology_dir: Path, expected_models: int): return len(merge_files) + @timeit async def build_martini3( system: System, config: ColbuilderConfig, file_manager: Optional[FileManager] = None ) -> Martini: """ Build a Martini 3 topology for the given molecular system. + + Parameters + ---------- + system : System + The molecular system to process + config : ColbuilderConfig + Configuration object containing settings + file_manager : Optional[FileManager] + File manager for consistent file handling + + Returns + ------- + Martini + A Martini object representing the processed system + + Raises + ------ + TopologyGenerationError + If topology generation fails """ ff = f"{config.force_field}" - go_epsilon = getattr(config, "go_epsilon", 9.414) + go_epsilon = config.go_epsilon if hasattr(config, "go_epsilon") else 9.414 if file_manager is None: file_manager = FileManager(config) @@ -642,12 +645,14 @@ async def build_martini3( else: for src_file in source_contactmap_dir.glob("*.c"): shutil.copy2(src_file, local_contactmap_dir / src_file.name) + for header_file in source_contactmap_dir.glob("*.h"): shutil.copy2(header_file, local_contactmap_dir / header_file.name) source_makefile = source_contactmap_dir / "makefile" if source_makefile.exists(): shutil.copy2(source_makefile, local_contactmap_dir / "makefile") + make_process = await asyncio.create_subprocess_shell( "make", cwd=str(local_contactmap_dir), @@ -655,6 +660,7 @@ async def build_martini3( stderr=asyncio.subprocess.PIPE, ) stdout, stderr = await make_process.communicate() + if make_process.returncode != 0: LOG.error(f"Failed to build contact map tool: {stderr.decode()}") else: @@ -663,12 +669,14 @@ async def build_martini3( source_makefile = source_ff_dir / "makefile" if source_makefile.exists(): shutil.copy2(source_makefile, Path("makefile")) + make_process = await asyncio.create_subprocess_shell( "make", stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) stdout, stderr = await make_process.communicate() + if make_process.returncode != 0: LOG.error(f"Failed to build contact map tool: {stderr.decode()}") else: @@ -689,12 +697,17 @@ async def build_martini3( ) LOG.info(f"Step 3/{steps} Processing models with Martinize2") + connect_size = system.get_connect_size() processed_models = [] try: martinize2_command = getattr(config, "martinize2_command", None) + martinize2_env = getattr(config, "martinize2_env", None) + use_conda_run = getattr(config, "use_conda_run", False) + if not martinize2_command: from shutil import which + martinize2_cmd = which("martinize2") if martinize2_cmd: config.martinize2_command = "martinize2" @@ -704,8 +717,16 @@ async def build_martini3( error_code="TOP_MART_005", context={"force_field": ff}, ) + except Exception as e: LOG.error(f"Error checking for Martinize2 command: {str(e)}") + raise TopologyGenerationError( + message="Failed to configure Martinize2 command", + original_error=e, + error_code="TOP_MART_005", + context={"force_field": ff}, + ) + LOG.info(f"{Fore.BLUE}Building coarse-grained topology:{Style.RESET_ALL}") if len(list(system.get_models())) > 0: @@ -715,32 +736,29 @@ async def build_martini3( type_dir.mkdir(exist_ok=True, parents=True) models_list = [model_id for model_id in system.get_models()] + model_status = {} - failed_martinize, failed_contact, failed_go, failed_itp = [], [], [], [] - processed_in_topology: set = set() - + failed_martinize = [] + failed_contact = [] + failed_go = [] + failed_itp = [] + + processed_in_topology = set() + for model_id in tqdm(models_list, desc="Building topology", unit="%"): model = system.get_model(model_id=model_id) - if model is None: - LOG.warning(f"Skipping model {model_id}: model not found in System") - model_status[model_id] = "no_model" + if model is None or model.connect is None: + LOG.warning(f"Skipping model {model_id}: No connections found") + model_status[model_id] = "no_connections" continue - # build connect_ids fallback - sys_connect = getattr(model, "connect", None) - if sys_connect: - connect_ids = list(sys_connect) - else: - connect_ids = [model_id] # self-connection - LOG.debug(f"Model {model_id}: no connections in System; processing as self-connection") - - if not connect_ids: - LOG.warning(f"Skipping model {model_id}: no connections and no fallback") - model_status[model_id] = "no_connections" + if model_id in processed_in_topology: + LOG.debug(f"Skipping model {model_id}: Already included in another model's topology file") + model_status[model_id] = "already_processed" continue try: - for connect_id in connect_ids: + for connect_id in model.connect: try: pdb = martini.read_pdb(pdb_id=connect_id) if not pdb: @@ -748,34 +766,31 @@ async def build_martini3( continue pdb = martini.translate_pdb(pdb=pdb) - pdb_capped, cter, nter = martini.cap_pdb(pdb=pdb) - order, map_pdb = martini.set_pdb(pdb=pdb_capped) + cap_pdb, cter, nter = martini.cap_pdb(pdb=pdb) + order, map_pdb = martini.set_pdb(pdb=cap_pdb) martini.write_pdb(pdb=order, file="tmp.pdb") martini.write_pdb(pdb=map_pdb, file="map.pdb") - def _martinize_cmd(nter_flag: str, cter_flag: str) -> str: - args = ( - f"-f tmp.pdb -sep -merge A,B,C " - f"-collagen -from amber99 -o topol.top -bonds-fudge 1.4 -p backbone " - f"-ff {martini.ff}00C -x {int(model_id)}.{int(connect_id)}.CG.pdb " - f"-nter {nter_flag} -cter {cter_flag} " - f"-govs-include -govs-moltype col_{int(model_id)}.{int(connect_id)} -maxwarn 4" - ) - return get_conda_command_with_path("martinize2", args) + martinize_args = ( + f"-f tmp.pdb -sep -merge A,B,C " + f"-collagen -from amber99 -o topol.top -bonds-fudge 1.4 -p backbone " + f"-ff {martini.ff}00C -x {int(model_id)}.{int(connect_id)}.CG.pdb " + f"-nter {nter} -cter {cter} " + f"-govs-include -govs-moltype col_{int(model_id)}.{int(connect_id)} -maxwarn 4" + ) + + cmd = get_conda_command_with_path("martinize2", martinize_args) - cmd = _martinize_cmd(nter, cter) process = await asyncio.create_subprocess_shell( - cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE + cmd, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, ) stdout, stderr = await process.communicate() - stderr_txt = stderr.decode(errors="ignore") - stdout_txt = stdout.decode(errors="ignore") if process.returncode != 0: LOG.error(f"Martinize2 failed for model {model_id} -> connection {connect_id}") - LOG.error(f"martinize2 stderr:\n{stderr_txt}") - LOG.debug(f"martinize2 stdout:\n{stdout_txt}") failed_martinize.append((model_id, connect_id)) continue @@ -793,6 +808,7 @@ def _martinize_cmd(nter_flag: str, cter_flag: str) -> str: stderr=asyncio.subprocess.PIPE, ) contact_stdout, contact_stderr = await contact_process.communicate() + if contact_process.returncode != 0: LOG.error(f"Contact map failed for model {model_id} -> connection {connect_id}") failed_contact.append((model_id, connect_id)) @@ -802,12 +818,14 @@ def _martinize_cmd(nter_flag: str, cter_flag: str) -> str: f"python create_goVirt.py -s {int(model_id)}.{int(connect_id)}.CG.pdb " f"-f map.out --moltype col_{int(model_id)}.{int(connect_id)} --go_eps {go_epsilon}" ) + go_process = await asyncio.create_subprocess_shell( go_cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) go_stdout, go_stderr = await go_process.communicate() + if go_process.returncode != 0: LOG.error(f"GO virtual sites creation failed for model {model_id} -> connection {connect_id}") failed_go.append((model_id, connect_id)) @@ -824,13 +842,12 @@ def _martinize_cmd(nter_flag: str, cter_flag: str) -> str: itp_ = Itp(system=system, model_id=int(model_id)) itp_.read_model(model_id=int(model_id)) itp_.go_to_pairs(model_id=int(model_id)) - itp_.make_topology(model_id=int(model_id), cnt_model=cnt_model) - processed_models.append(model_id) - - # Track what we processed - for connect_id in connect_ids: + itp_.make_topology(model_id=int(model_id), cnt_model=int(cnt_model)) + processed_models.append(cnt_model) + + for connect_id in model.connect: processed_in_topology.add(connect_id) - + except Exception as e: LOG.error(f"Error processing ITP for model {model_id}: {str(e)}") failed_itp.append(model_id) @@ -845,7 +862,7 @@ def _martinize_cmd(nter_flag: str, cter_flag: str) -> str: model_status[model_id] = f"error: {str(e)}" LOG.info(f"Successfully processed: {len(processed_models)} models") - + if failed_martinize: LOG.warning(f"Failed at Martinize2: {failed_martinize}") if failed_contact: @@ -854,7 +871,7 @@ def _martinize_cmd(nter_flag: str, cter_flag: str) -> str: LOG.warning(f"Failed at GO creation: {failed_go}") if failed_itp: LOG.warning(f"Failed at ITP processing: {failed_itp}") - + check_output_files(Path.cwd(), cnt_model) if not processed_models: @@ -871,8 +888,9 @@ def _martinize_cmd(nter_flag: str, cter_flag: str) -> str: system_pdb = martini.get_system_pdb(size=cnt_model) pdb_file_path = Path(f"collagen_fibril_CG_{config.species}.pdb") martini.write_pdb(pdb=system_pdb, file=str(pdb_file_path)) + if pdb_file_path.exists(): - file_manager.copy_to_directory(pdb_file_path, dest_dir=output_topology_dir) + dest_path = file_manager.copy_to_directory(pdb_file_path, dest_dir=output_topology_dir) except Exception as e: raise TopologyGenerationError( message="Failed to create system PDB file", @@ -883,24 +901,29 @@ def _martinize_cmd(nter_flag: str, cter_flag: str) -> str: try: final_topology_file = f"collagen_fibril_{config.species}.top" - martini.write_system_topology(topology_file=final_topology_file, size=cnt_model) + processed_int_ids = sorted({int(m) for m in processed_models}) + martini.write_system_topology( + topology_file=final_topology_file, + size=cnt_model, + processed_ids=processed_int_ids, + ) topology_file_path = Path(final_topology_file) if topology_file_path.exists(): - file_manager.copy_to_directory(topology_file_path, dest_dir=output_topology_dir) + dest_path = file_manager.copy_to_directory(topology_file_path, dest_dir=output_topology_dir) for itp_file in Path().glob("col_[0-9]*.itp"): - file_manager.copy_to_directory(itp_file, dest_dir=output_topology_dir) + dest_path = file_manager.copy_to_directory(itp_file, dest_dir=output_topology_dir) for excl_file in Path().glob("col_[0-9]*_go-excl.itp"): - file_manager.copy_to_directory(excl_file, dest_dir=output_topology_dir) + dest_path = file_manager.copy_to_directory(excl_file, dest_dir=output_topology_dir) for go_site_file in Path().glob("*go-sites.itp"): - file_manager.copy_to_directory(go_site_file, dest_dir=output_topology_dir) + dest_path = file_manager.copy_to_directory(go_site_file, dest_dir=output_topology_dir) source_martini_files = list(source_ff_dir.glob("martini_v3.0.0*")) for source_file in source_martini_files: - file_manager.copy_to_directory(source_file, dest_dir=output_topology_dir) + dest_path = file_manager.copy_to_directory(source_file, dest_dir=output_topology_dir) if not source_martini_files: LOG.warning("No Martini force field files found in source directory") @@ -922,6 +945,7 @@ def _martinize_cmd(nter_flag: str, cter_flag: str) -> str: LOG.info(f"{Fore.BLUE}Martini topology generated successfully for {len(processed_models)} models.{Style.RESET_ALL}") os.chdir(original_dir) + return martini except TopologyGenerationError: @@ -938,4 +962,4 @@ def _martinize_cmd(nter_flag: str, cter_flag: str) -> str: "error_details": str(e), "location": "build_martini3", }, - ) + ) \ No newline at end of file