Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
142 changes: 98 additions & 44 deletions rp2paths/enumerate.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,22 +49,29 @@ def build_reaction_dicts(stoich_mat, reactions, logger = logging.getLogger(__nam
reaction2products[rxn] = products
return substrates2reactions, reaction2products

def check_cycle(rxn, current_path, consumed_cmpds, products, logger = logging.getLogger(__name__)):
def check_cycle(rxn, current_path, consumed_cmpds, products, logger=logging.getLogger(__name__)):
"""
Check if the current reaction leads to a cycle.
A cycle is detected if the reaction is already in the current path,
or if any of the products are already consumed in the current path.
Check if the current reaction leads to a cycle. A cycle is considered as
such when the following conditions are met:
1. If the reaction is already in the current path.
2. If all of the products are already described in the current path.
"""
if rxn in current_path:
logger.debug(f"Cycle detected: reaction {rxn} is already in the current path {current_path}.")
return True
for product in products:
if product in consumed_cmpds:
logger.debug(f"Cycle detected: product {product} is already consumed in the current path {consumed_cmpds}.")
return True
return False
if rxn == "TRS_0_2_1":
logger.debug("Debug point for TRS_0_2_1")
logger.debug(f"Current path: {current_path}")
logger.debug(f"Consumed compounds: {consumed_cmpds}")
logger.debug(f"Products: {products}")
logger.debug(f"Checking product: {product}")
if product not in consumed_cmpds:
return False
logger.debug(f"Cycle detected: all products {products} are already in the current path {current_path}.")
return True

def check_limits(nb_paths, max_paths, depth, max_depth, logger = logging.getLogger(__name__)):
def check_limits(nb_paths, max_paths, depth, max_depth, logger=logging.getLogger(__name__)):
"""
Check if the current limits for paths and depth are reached.
"""
Expand All @@ -79,6 +86,34 @@ def check_limits(nb_paths, max_paths, depth, max_depth, logger = logging.getLogg
limit_reached = True
return limit_reached, return_value

def check_max_depth(depth, max_depth, logger=logging.getLogger(__name__)):
"""
Check if the current depth reached the maximum allowed depth.
"""
if max_depth > 0 and depth > max_depth:
logger.debug(f"Current depth {depth} reached maximum depth {max_depth}.")
return True
return False

def check_max_paths(nb_paths, max_paths, logger=logging.getLogger(__name__)):
"""
Check if the current number of paths reached the maximum allowed paths.
"""
if max_paths > 0 and nb_paths >= max_paths:
logger.info(f"Current number of paths {nb_paths} reached maximum paths {max_paths}.")
return True
return False

def append_path_if_not_exists(current_path, all_paths, start_cmpd, logger=logging.getLogger(__name__)):
"""
Append the current path to all_paths if it is not already present.
"""
if current_path not in all_paths:
all_paths.append(current_path)
logger.debug(f"Added path {len(all_paths)}: {start_cmpd}, " + " -> ".join([f"({step})" for step in current_path]))
else:
logger.debug(f"Path {current_path} already exists in all_paths. Skipping.")

def find_max_shortest_path(
start_cmpd,
substrates2reactions,
Expand All @@ -90,23 +125,37 @@ def find_max_shortest_path(
start_cmpd to each compound.
"""
shortest_lengths = {}
sub = start_cmpd
shortest_lengths[sub] = 0
for iter in range(1000): # arbitrary large number to avoid infinite loops
visited = set()
next_subs = [start_cmpd]
shortest_lengths[start_cmpd] = 0

for _ in range(1_000_000): # arbitrary large number to avoid infinite loops
if not next_subs:
# No more compounds to explore
break

# Get the next compound to explore
sub = next_subs.pop(0)
visited.add(sub)

# Explore reactions from this compound as substrate, if any
if sub not in substrates2reactions:
continue

# Update shortest lengths for products of reactions using this substrate
for rxn in substrates2reactions[sub]:
products = reaction2products[rxn]
for product in products:
current_length = shortest_lengths.get(product, float('inf'))
new_length = shortest_lengths.get(sub, 0) + 1
if new_length < current_length:
shortest_lengths[product] = new_length
# Update sub to the next compound to explore
next_subs = [p for rxn in substrates2reactions.get(sub, []) for p in reaction2products[rxn]]
if not next_subs:
break
sub = next_subs[0] # just pick one to continue

# Update next_subs for future exploration
for rxn in substrates2reactions.get(sub, []):
for p in reaction2products[rxn]:
if p not in visited and p not in next_subs:
next_subs.append(p)

# Finally extract the maximum shortest path lengths
max_shortest_length = max(shortest_lengths.values(), default=0)
Expand Down Expand Up @@ -147,35 +196,40 @@ def dfs(current_cmpd, current_path, depth, consumed_cmpds):
# Log the current state of the search by adding {depth} times a tab character
logger.debug(f"{'-> ' * depth}Exploring paths from compound {current_cmpd} with path: {current_path}")
# Basis case of recursion:
# If the current compound is not a substrate for a reaction, we reached a dead end
if current_cmpd not in substrates2reactions:
# If the current compound is not a substrate for a reaction, we reached a dead end
logger.debug(f"Dead end reached at compound {current_cmpd} with path: {current_path}")
# If the current path is not already in all_paths, add it
if current_path not in all_paths:
all_paths.append(current_path)
logger.debug(f"Added path {len(all_paths)}: {start_cmpd}, " + " -> ".join([f"({step})" for step in current_path]))
else:
logger.debug(f"Path {current_path} already exists in all_paths. Skipping.")
else:
for rxn in substrates2reactions[current_cmpd]:
# Check if the maximum number of paths or depth is reached
limit_reached, return_value = check_limits(
len(all_paths), max_paths, depth, max_depth, logger
)
if limit_reached:
return return_value
products = reaction2products[rxn]
# Check if the current reaction leads to a cycle
cycle_detected = check_cycle(rxn, current_path, consumed_cmpds, products, logger)
if not cycle_detected:
for product in products:
max_paths_reached = dfs(product, current_path + [rxn], depth + 1, consumed_cmpds + [current_cmpd])
logger.debug(f"{'-> ' * depth}Returning from a recursive call with product {product} and path: {current_path + [rxn]}")
# If maximum paths reached, stop further exploration,
# otherwise it means only depth limit was reached,
# and we can continue exploring other parts of the graph
if max_paths_reached:
return max_paths_reached
append_path_if_not_exists(current_path, all_paths, start_cmpd, logger)
return False

if check_max_depth(depth, max_depth, logger):
# If maximum depth is reached, we stop exploring further
logger.debug(f"Maximum depth reached at compound {current_cmpd} with path: {current_path}")
append_path_if_not_exists(current_path, all_paths, start_cmpd, logger)
return False

if check_max_paths(len(all_paths), max_paths, logger):
# If maximum number of paths is reached, we stop exploring further
logger.info(f"Maximum number of paths ({max_paths}) reached. Stopping enumeration.")
return True # Indicate that max paths reached

# Recursive case: explore each reaction from the current compound
for rxn in substrates2reactions[current_cmpd]:
products = reaction2products[rxn]
for product in products:
if check_cycle(rxn, current_path, consumed_cmpds, products, logger):
# If a cycle is detected, we stop exploring further
logger.debug(f"Cycle detected at compound {current_cmpd} with path: {current_path}")
append_path_if_not_exists(current_path, all_paths, start_cmpd, logger)
return False

max_paths_reached = dfs(product, current_path + [rxn], depth + 1, consumed_cmpds + [current_cmpd])
logger.debug(f"{'-> ' * depth}Returning from a recursive call with product {product} and path: {current_path + [rxn]}")
# If maximum paths reached, stop further exploration,
# otherwise it means only depth limit was reached,
# and we can continue exploring other parts of the graph
if max_paths_reached:
return max_paths_reached

dfs(start_cmpd, [], 0, [start_cmpd])
return all_paths
Expand Down
Loading