From cf787fb91ba58fbb628a79bed57e2a6725f5309a Mon Sep 17 00:00:00 2001 From: Animesh Tiwary Date: Mon, 26 May 2025 20:39:01 -0700 Subject: [PATCH 1/8] configs to make premain work, 5/26 --- EdgeLake/docker_makefile/edgelake_operator1.env | 4 ++-- EdgeLake/docker_makefile/edgelake_operator2.env | 4 ++-- EdgeLake/docker_makefile/edgelake_operator3.env | 4 ++-- edgefl/env_files/mnist/mnist-agg.env | 8 ++++---- edgefl/env_files/mnist/mnist1.env | 8 ++++---- edgefl/env_files/mnist/mnist2.env | 8 ++++---- edgefl/env_files/mnist/mnist3.env | 8 ++++---- 7 files changed, 22 insertions(+), 22 deletions(-) diff --git a/EdgeLake/docker_makefile/edgelake_operator1.env b/EdgeLake/docker_makefile/edgelake_operator1.env index eb2ced8..65eaa8e 100644 --- a/EdgeLake/docker_makefile/edgelake_operator1.env +++ b/EdgeLake/docker_makefile/edgelake_operator1.env @@ -28,7 +28,7 @@ DB_USER="demo" # Password correlated to database user DB_PASSWD="passwd" # Database IP address -DB_IP=192.168.56.1 +DB_IP=192.168.108.182 # Database port number DB_PORT=5432 # Whether to set autocommit data @@ -58,7 +58,7 @@ MEMORY=false # TCP connection information for Master Node ## Use your Master node's docker IP ## ## Command docker inspect -f '{{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}' edgelake-master ## -LEDGER_CONN=172.19.0.2:32048 +LEDGER_CONN=172.20.0.2:32048 # How often to sync from blockchain BLOCKCHAIN_SYNC=10 seconds diff --git a/EdgeLake/docker_makefile/edgelake_operator2.env b/EdgeLake/docker_makefile/edgelake_operator2.env index 0c3b89a..faa5793 100644 --- a/EdgeLake/docker_makefile/edgelake_operator2.env +++ b/EdgeLake/docker_makefile/edgelake_operator2.env @@ -28,7 +28,7 @@ DB_USER="demo" # Password correlated to database user DB_PASSWD="passwd" # Database IP address -DB_IP=192.168.56.1 +DB_IP=192.168.108.182 # Database port number DB_PORT=5432 # Whether to set autocommit data @@ -58,7 +58,7 @@ MEMORY=false # TCP connection information for Master Node ## Use your Master node's 
docker IP ## ## Command docker inspect -f '{{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}' edgelake-master ## -LEDGER_CONN=172.19.0.2:32048 +LEDGER_CONN=172.20.0.2:32048 # How often to sync from blockchain BLOCKCHAIN_SYNC=10 seconds diff --git a/EdgeLake/docker_makefile/edgelake_operator3.env b/EdgeLake/docker_makefile/edgelake_operator3.env index 3f82e8a..d47600d 100644 --- a/EdgeLake/docker_makefile/edgelake_operator3.env +++ b/EdgeLake/docker_makefile/edgelake_operator3.env @@ -28,7 +28,7 @@ DB_USER="demo" # Password correlated to database user DB_PASSWD="passwd" # Database IP address -DB_IP=192.168.56.1 +DB_IP=192.168.108.182 # Database port number DB_PORT=5432 # Whether to set autocommit data @@ -58,7 +58,7 @@ MEMORY=false # TCP connection information for Master Node ## Use your Master node's docker IP ## ## Command docker inspect -f '{{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}' edgelake-master ## -LEDGER_CONN=172.19.0.2:32048 +LEDGER_CONN=172.20.0.2:32048 # How often to sync from blockchain BLOCKCHAIN_SYNC=10 seconds diff --git a/edgefl/env_files/mnist/mnist-agg.env b/edgefl/env_files/mnist/mnist-agg.env index a94bb1a..1221bed 100644 --- a/edgefl/env_files/mnist/mnist-agg.env +++ b/edgefl/env_files/mnist/mnist-agg.env @@ -2,15 +2,15 @@ ### Note that paths are for Mac/Linux. If using Windows provide your Windows path ### an confirm that the other paths are correct. 
You may need to make '/' to '\' -GITHUB_DIR=/home/miguel06/PycharmProjects/Anylog-Edgelake-Federated-Learning-Platform +GITHUB_DIR=/mnt/c/Users/tiwar/anylog/Anylog-Edgelake-CSE115D TRAINING_APPLICATION_DIR=edgefl/platform_components/data_handlers MODULE_NAME=MnistDataHandler TMP_DIR=edgefl/tmp_dir/ # External IP Address for CURL commands to Edgelake -EXTERNAL_IP="192.168.1.125:32049" -EXTERNAL_TCP_IP_PORT="192.168.1.125:32048" +EXTERNAL_IP="192.168.108.182:32049" +EXTERNAL_TCP_IP_PORT="192.168.108.182:32048" AGG_NAME=agg @@ -18,7 +18,7 @@ AGG_NAME=agg PSQL_DB_NAME="mnist_fl" PSQL_DB_USER="demo" PSQL_DB_PASSWORD="passwd" -PSQL_HOST="192.168.56.1" +PSQL_HOST="192.168.1.125" PSQL_PORT="5432" FILE_WRITE_DESTINATION="edgefl/file_write" diff --git a/edgefl/env_files/mnist/mnist1.env b/edgefl/env_files/mnist/mnist1.env index 17f1679..b13f142 100644 --- a/edgefl/env_files/mnist/mnist1.env +++ b/edgefl/env_files/mnist/mnist1.env @@ -1,21 +1,21 @@ ### Note that paths are for Mac/Linux. If using Windows provide your Windows path ### an confirm that the other paths are correct. 
You may need to make '/' to '\' -GITHUB_DIR=/home/miguel06/PycharmProjects/Anylog-Edgelake-Federated-Learning-Platform +GITHUB_DIR=/mnt/c/Users/tiwar/anylog/Anylog-Edgelake-CSE115D TRAINING_APPLICATION_DIR=edgefl/platform_components/data_handlers MODULE_NAME=MnistDataHandler TMP_DIR=edgefl/tmp_dir/ # External IP Address for CURL commands to Edgelake -EXTERNAL_IP="192.168.1.125:32149" -EXTERNAL_TCP_IP_PORT="192.168.1.125:32148" +EXTERNAL_IP="192.168.108.182:32149" +EXTERNAL_TCP_IP_PORT="192.168.108.182:32148" # LOCAL PSQL DB NAME PSQL_DB_NAME=mnist_fl PSQL_DB_USER=demo PSQL_DB_PASSWORD=passwd -PSQL_HOST=192.168.56.1 +PSQL_HOST=192.168.108.182 PSQL_PORT=5432 FILE_WRITE_DESTINATION="edgefl/file_write" diff --git a/edgefl/env_files/mnist/mnist2.env b/edgefl/env_files/mnist/mnist2.env index 15ff090..7073039 100644 --- a/edgefl/env_files/mnist/mnist2.env +++ b/edgefl/env_files/mnist/mnist2.env @@ -2,21 +2,21 @@ ### Note that paths are for Mac/Linux. If using Windows provide your Windows path ### an confirm that the other paths are correct. 
You may need to make '/' to '\' -GITHUB_DIR=/home/miguel06/PycharmProjects/Anylog-Edgelake-Federated-Learning-Platform +GITHUB_DIR=/mnt/c/Users/tiwar/anylog/Anylog-Edgelake-CSE115D TRAINING_APPLICATION_DIR=edgefl/platform_components/data_handlers MODULE_NAME=MnistDataHandler TMP_DIR=edgefl/tmp_dir/ # External IP Address for CURL commands to Edgelake -EXTERNAL_IP="192.168.1.125:32249" -EXTERNAL_TCP_IP_PORT="192.168.1.125:32248" +EXTERNAL_IP="192.168.108.182:32249" +EXTERNAL_TCP_IP_PORT="192.168.108.182:32248" # LOCAL PSQL DB NAME PSQL_DB_NAME="mnist_fl" PSQL_DB_USER="demo" PSQL_DB_PASSWORD="passwd" -PSQL_HOST="172.30.176.90" +PSQL_HOST="192.168.108.182" PSQL_PORT="5432" FILE_WRITE_DESTINATION="edgefl/file_write" diff --git a/edgefl/env_files/mnist/mnist3.env b/edgefl/env_files/mnist/mnist3.env index da97652..59734a1 100644 --- a/edgefl/env_files/mnist/mnist3.env +++ b/edgefl/env_files/mnist/mnist3.env @@ -2,21 +2,21 @@ ### Note that paths are for Mac/Linux. If using Windows provide your Windows path ### an confirm that the other paths are correct. 
You may need to make '/' to '\' -GITHUB_DIR=/home/miguel06/PycharmProjects/Anylog-Edgelake-Federated-Learning-Platform +GITHUB_DIR=/mnt/c/Users/tiwar/anylog/Anylog-Edgelake-CSE115D TRAINING_APPLICATION_DIR=edgefl/platform_components/data_handlers MODULE_NAME=MnistDataHandler TMP_DIR=edgefl/tmp_dir # External IP Address for CURL commands to Edgelake -EXTERNAL_IP="192.168.1.125:32349" -EXTERNAL_TCP_IP_PORT="192.168.1.125:32348" +EXTERNAL_IP="192.168.108.182:32349" +EXTERNAL_TCP_IP_PORT="192.168.108.182:32348" # LOCAL PSQL DB NAME PSQL_DB_NAME="mnist_fl" PSQL_DB_USER="demo" PSQL_DB_PASSWORD="passwd" -PSQL_HOST="172.30.176.90" +PSQL_HOST="192.168.108.182" PSQL_PORT="5432" FILE_WRITE_DESTINATION="edgefl/file_write" From a6af41f81c53449be61bc404b49f937ce39fb211 Mon Sep 17 00:00:00 2001 From: Animesh Tiwary Date: Tue, 27 May 2025 02:09:49 -0700 Subject: [PATCH 2/8] hardcoded listener to make DFL work; TODO: config init curl to handle DFL vs CFL header, verify with 4 training nodes --- edgefl/platform_components/node/node.py | 126 ++++++++++++++++++ .../platform_components/node/node_server.py | 59 +++++++- 2 files changed, 182 insertions(+), 3 deletions(-) diff --git a/edgefl/platform_components/node/node.py b/edgefl/platform_components/node/node.py index 02e0397..796ff78 100644 --- a/edgefl/platform_components/node/node.py +++ b/edgefl/platform_components/node/node.py @@ -7,6 +7,7 @@ import logging import os import pickle +import requests from asyncio import sleep # import keras @@ -19,6 +20,7 @@ from platform_components.EdgeLake_functions.mongo_file_store import read_file, write_file, copy_file_from_container from platform_components.lib.logger.logger_config import configure_logging from platform_components.helpers.LoadClassFromFile import load_class_from_file +from platform_components.lib.modules.local_model_update import LocalModelUpdate from dotenv import load_dotenv load_dotenv() @@ -269,6 +271,130 @@ def train_model_params(self, aggregator_model_params_db_link, 
round_number, ip_p return f'{self.docker_file_write_destination}/{index}/{file}' return file_name + + # DFL + def dfl_train_model_params(self, round_number, index): + self.logger.debug(f"[{index}] in train_model_params for round {round_number}") + + weights = self.data_handlers[index].get_weights() + + if round_number == 1: + # Update model with weights + self.data_handlers[index].update_model(weights) + + # Train model + # model_update = self.local_training_handler.train({}) + # print(f"[INFO] [{index}][Round {round_number}] ========== Model training progress ==========") + model_params = self.data_handlers[index].train(round_number) + self.logger.info(f"[{index}][Round {round_number}] Step 2 Complete: Model training done") + + # Save and return new weights + encoded_params = self.encode_model(model_params) + file = f"{round_number}-replica-{self.replica_name}.pkl" + # make sure directory exists + os.makedirs(os.path.dirname(f"{self.file_write_destination}/{index}/"), exist_ok=True) + file_name = f"{self.file_write_destination}/{index}/{file}" + with open(f"{file_name}", "wb") as f: + f.write(encoded_params) + + if self.docker_running: + self.logger.debug(f'[{index}] written to container at {f"{self.docker_file_write_destination}/{index}/{file}"}') + copy_file_to_container(os.path.join(self.tmp_dir, index), self.docker_container_name, file_name, f"{self.docker_file_write_destination}/{index}/{file}") + return f'{self.docker_file_write_destination}/{index}/{file}' + return file_name + + # DFL + async def listen_for_update_dfl(self, min_params, round_number, index): + logger = self.logger + decoded_params = {} + + while True: + try: + headers = { + 'User-Agent': 'AnyLog/1.23', + 'command': f'blockchain get {index}-a{round_number}' + } + response = requests.get(self.edgelake_node_url, headers=headers) + response.raise_for_status() + results = response.json() + + node_params_links = [] + ip_ports = [] + + # for item in results: + # key = f"{index}-a{round_number}" + # 
if key in item and item[key]['node'] != self.replica_name: # skip own model + # node_params_links.append(item[key]['trained_params_local_path']) + # ip_ports.append(item[key]['ip_port']) + for item in results: + key = f"{index}-a{round_number}" + if key in item: + source_node = item[key]['node'] + if source_node != self.replica_name: + link = item[key]['trained_params_local_path'] + node_params_links.append(link) + ip_ports.append(item[key]['ip_port']) + logger.info( + f"[{index}] [Round {round_number}] Found peer model from {source_node} at {link}") + else: + logger.debug(f"[{index}] Skipping own model {source_node}") + + # Fetch and decode + self.fetch_decoded_params(decoded_params, node_params_links, ip_ports, index) + + if len(decoded_params) >= min_params: + return list(decoded_params.values()) + + except Exception as e: + logger.error(f"[{index}] DFL listen error: {str(e)}") + + await sleep(2) + + # DFL + def fetch_decoded_params(self, decoded_params_dict, node_param_download_links, ip_ports, index): + # use the node_param_download_links to get all the file + # in the form of tuples, like ["('blobs_admin', 'node_model_updates', '1-replica-node1.pkl')"] + # node_ref = db.reference('node_model_updates') + + # Loop through each provided download link to retrieve node parameter objects + for i, path in enumerate(node_param_download_links): + # Don't fetch for existing paths + if path in decoded_params_dict: + continue + + try: + # Make sure the directory exists + filename = path.split('/')[-1] + local_path = f'{self.file_write_destination}/{index}/{filename}' + if self.docker_running: + docker_file_path = f'{self.docker_file_write_destination}/{index}/{filename}' + response = read_file(self.edgelake_node_url, path, + docker_file_path, ip_ports[i]) + copy_file_from_container(os.path.join(self.tmp_dir, index), self.docker_container_name, + docker_file_path, + local_path) + else: + response = read_file(self.edgelake_node_url, path, + local_path, ip_ports[i]) + + if 
response.status_code != 200: + raise ValueError( + f"Failed to retrieve node params from link: {filename}. HTTP Status: {response.status_code}" + ) + + # Decode the model weights from the file + sleep(1) + with open(local_path, 'rb') as f: + data = pickle.load(f) + if not data: + raise ValueError(f"Missing model_weights in data from file: {filename}") + + decoded_params_dict[path] = LocalModelUpdate(weights=data) + + except Exception as e: + self.logger.error(f"Error retrieving data from link {filename}: {str(e)}") + continue + def encode_model(self, model_update): serialized_data = pickle.dumps(model_update) return serialized_data diff --git a/edgefl/platform_components/node/node_server.py b/edgefl/platform_components/node/node_server.py index 3a075e9..4e2fe87 100644 --- a/edgefl/platform_components/node/node_server.py +++ b/edgefl/platform_components/node/node_server.py @@ -18,6 +18,7 @@ import argparse import requests import warnings +import asyncio from uvicorn import run from fastapi import FastAPI, HTTPException, status @@ -119,13 +120,23 @@ def init_node(request: InitNodeRequest): # print(f"starting round numbers: {node_instance.round_number}") # print(f"training apps: {node_instance.data_handlers}") + # CFL: # Start event listener for start round + # listener_thread = threading.Thread( + # name=f"{replica_name}--{index}", + # target=listen_for_start_round, + # args=(node_instance, index, lambda: stop_listening_thread) + # ) + # listener_thread.daemon = True # Make thread daemon so it exits when main thread exits + # listener_thread.start() + + # DFL: listener_thread = threading.Thread( name=f"{replica_name}--{index}", - target=listen_for_start_round, - args=(node_instance, index, lambda: stop_listening_thread) + target=run_dfl_training_loop, + args=(node_instance, index), + daemon=True ) - listener_thread.daemon = True # Make thread daemon so it exits when main thread exits listener_thread.start() return { @@ -210,6 +221,48 @@ def 
listen_for_start_round(nodeInstance, index, stop_event): logger.error(f"[{index}] Error in listener thread: {str(e)}") time.sleep(2) +# DFL +def run_dfl_training_loop(node_instance, index, max_rounds=10, min_params=2): + current_round = node_instance.round_number.get(index, 1) + + while current_round <= max_rounds: + try: + node_instance.logger.info(f"[{index}] [Round {current_round}] Starting decentralized training") + + # 1. Get peer models (skip on round 1) then their weights + if current_round > 1: + node_instance.logger.info(f"[{index}] [Round {current_round}] Waiting for {min_params} peer models...") + # asynchronously poll from blockchain to get other training nodes' model weights + peer_params = asyncio.run(node_instance.listen_for_update_dfl(min_params, current_round - 1, index)) + node_instance.logger.info(f"[{index}] [Round {current_round}] Retrieved {len(peer_params)} peer models") + # combine the fetched training nodes' params + agg_weights = node_instance.data_handlers[index].aggregate_model_weights(peer_params) + node_instance.logger.info(f"[{index}] [Round {current_round}] Aggregated weights from peers") + # update the current nodes' model + node_instance.data_handlers[index].update_model(agg_weights) + node_instance.logger.info(f"[{index}] [Round {current_round}] Done updating the model weights!") + + # 2. Local training with the updated model + modelUpdate_metadata = node_instance.dfl_train_model_params( + round_number=current_round, + index=index + ) + node_instance.logger.info(f"[{index}] [Round {current_round}] Local training complete") + + # 3. 
Publish to blockchain so that other nodes can fetch + node_instance.add_node_params(current_round, modelUpdate_metadata, index) + node_instance.logger.info(f"[{index}] [Round {current_round}] Published model to blockchain") + + current_round += 1 + node_instance.round_number[index] = current_round + + time.sleep(2) # Optionally throttle to reduce load + except Exception as e: + node_instance.logger.error(f"[{index}] Error in DFL loop round {current_round}: {str(e)}") + time.sleep(5) + + + # TODO: move to a (helper) file (i.e. 'node_helpers.py'?) # Extracts initParams from the policy 'index-r' at the specified index def get_most_recent_agg_params(index): From 2425769901ed877dc79d419f54bf5273660a8cf8 Mon Sep 17 00:00:00 2001 From: Animesh Tiwary Date: Wed, 28 May 2025 03:08:00 -0700 Subject: [PATCH 3/8] configure so that user can do either CFL or DFL in init-training request --- .../aggregator/aggregator_server.py | 10 ++++-- .../platform_components/node/node_server.py | 35 ++++++++++--------- 2 files changed, 25 insertions(+), 20 deletions(-) diff --git a/edgefl/platform_components/aggregator/aggregator_server.py b/edgefl/platform_components/aggregator/aggregator_server.py index b979b19..4def425 100644 --- a/edgefl/platform_components/aggregator/aggregator_server.py +++ b/edgefl/platform_components/aggregator/aggregator_server.py @@ -55,6 +55,7 @@ class InitRequest(BaseModel): module: str module_file: str db_name: str + training_method: str = "CFL" class TrainingRequest(BaseModel): totalRounds: int @@ -85,6 +86,7 @@ def init(request: InitRequest): node_urls, index = request.nodeUrls, request.index module_name, module_file = request.module, request.module_file db_name = request.db_name + training_method = request.training_method # Verify filepath exists module_path = os.path.join(aggregator.training_app_dir, module_file) @@ -98,7 +100,7 @@ def init(request: InitRequest): if not index in aggregator.round_number: aggregator.round_number[index] = 1 - 
initialize_nodes(node_urls, index, module_name, module_path, db_name) + initialize_nodes(node_urls, index, module_name, module_path, db_name, training_method) aggregator.set_module_at_index(index, module_name, module_path) aggregator.initialize_index_on_blockchain(index, module_name, module_path, db_name) @@ -134,7 +136,7 @@ def is_node_online(node_url: str): except requests.exceptions.RequestException: return False -def initialize_nodes(node_urls: list[str], index, module_name, module_path, db_name): +def initialize_nodes(node_urls: list[str], index, module_name, module_path, db_name, training_method): """Send the deployed contract address to multiple node servers.""" def init_node(node_url: str): try: @@ -170,7 +172,8 @@ def init_node(node_url: str): 'round_number': aggregator.round_number[index], 'module_name': module_name, 'module_path': module_path, - 'db_name': db_name + 'db_name': db_name, + 'training_method': training_method }) with aggregator.lock: @@ -272,6 +275,7 @@ async def init_training(request: TrainingRequest): def start_training(aggregator, initial_params, starting_round, end_round, index): try: + logger.info("[AGGREGATOR_SERVER.py] inside start_training!!") for r in range(starting_round, end_round + 1): aggregator.round_number[index] = r logger.info(f"[{index}] Starting training round {r}") diff --git a/edgefl/platform_components/node/node_server.py b/edgefl/platform_components/node/node_server.py index 4e2fe87..fe8c005 100644 --- a/edgefl/platform_components/node/node_server.py +++ b/edgefl/platform_components/node/node_server.py @@ -74,6 +74,7 @@ class InitNodeRequest(BaseModel): module_name: str module_path: str db_name: str + training_method: str = "CFL" # Default to centralized @app.post('/init-node') @@ -88,6 +89,7 @@ def init_node(request: InitNodeRequest): index = request.replica_index module_name = request.module_name module_path = request.module_path + training_method = request.training_method db_name = request.db_name # testing 
winniio_fl + mnist_fl DBs @@ -120,23 +122,22 @@ def init_node(request: InitNodeRequest): # print(f"starting round numbers: {node_instance.round_number}") # print(f"training apps: {node_instance.data_handlers}") - # CFL: - # Start event listener for start round - # listener_thread = threading.Thread( - # name=f"{replica_name}--{index}", - # target=listen_for_start_round, - # args=(node_instance, index, lambda: stop_listening_thread) - # ) - # listener_thread.daemon = True # Make thread daemon so it exits when main thread exits - # listener_thread.start() - - # DFL: - listener_thread = threading.Thread( - name=f"{replica_name}--{index}", - target=run_dfl_training_loop, - args=(node_instance, index), - daemon=True - ) + if training_method == "DFL": + logger.info(f"[{index}] Node {replica_name} using DFL mode") + listener_thread = threading.Thread( + name=f"{replica_name}--DFL-{index}", + target=run_dfl_training_loop, + args=(node_instance, index), + daemon=True + ) + else: # CFL + logger.info(f"[{index}] Node {replica_name} using CFL mode") + listener_thread = threading.Thread( + name=f"{replica_name}--CFL-{index}", + target=listen_for_start_round, + args=(node_instance, index, lambda: stop_listening_thread), + daemon=True + ) listener_thread.start() return { From 298a1b9b291b442d4aa490da12474e4b2567287a Mon Sep 17 00:00:00 2001 From: Animesh Tiwary Date: Mon, 2 Jun 2025 03:18:02 -0700 Subject: [PATCH 4/8] some clean up to dfl node methods --- edgefl/platform_components/node/node.py | 21 +++++++++---------- .../platform_components/node/node_server.py | 8 ++++++- 2 files changed, 17 insertions(+), 12 deletions(-) diff --git a/edgefl/platform_components/node/node.py b/edgefl/platform_components/node/node.py index 796ff78..5dbdbf6 100644 --- a/edgefl/platform_components/node/node.py +++ b/edgefl/platform_components/node/node.py @@ -271,16 +271,12 @@ def train_model_params(self, aggregator_model_params_db_link, round_number, ip_p return 
f'{self.docker_file_write_destination}/{index}/{file}' return file_name - - # DFL + ''' + dfl_train_model_params() + - + ''' def dfl_train_model_params(self, round_number, index): - self.logger.debug(f"[{index}] in train_model_params for round {round_number}") - - weights = self.data_handlers[index].get_weights() - - if round_number == 1: - # Update model with weights - self.data_handlers[index].update_model(weights) + self.logger.debug(f"[{index}] in dfl_train_model_params for round {round_number}") # Train model # model_update = self.local_training_handler.train({}) @@ -303,7 +299,10 @@ def dfl_train_model_params(self, round_number, index): return f'{self.docker_file_write_destination}/{index}/{file}' return file_name - # DFL + ''' + listen_for_update_dfl(self, min_params, round_number, index): + - helper for a singular training node to asynchronously poll the blockchain for other training nodes' model params/weights + ''' async def listen_for_update_dfl(self, min_params, round_number, index): logger = self.logger decoded_params = {} @@ -350,7 +349,7 @@ async def listen_for_update_dfl(self, min_params, round_number, index): await sleep(2) - # DFL + # same as in aggregator.py def fetch_decoded_params(self, decoded_params_dict, node_param_download_links, ip_ports, index): # use the node_param_download_links to get all the file # in the form of tuples, like ["('blobs_admin', 'node_model_updates', '1-replica-node1.pkl')"] diff --git a/edgefl/platform_components/node/node_server.py b/edgefl/platform_components/node/node_server.py index fe8c005..5dcdc99 100644 --- a/edgefl/platform_components/node/node_server.py +++ b/edgefl/platform_components/node/node_server.py @@ -230,8 +230,14 @@ def run_dfl_training_loop(node_instance, index, max_rounds=10, min_params=2): try: node_instance.logger.info(f"[{index}] [Round {current_round}] Starting decentralized training") + #0. 
for first round, just update with default weights + if current_round == 1: + weights = node_instance.data_handlers[index].get_weights() + # Update model with weights + node_instance.data_handlers[index].update_model(weights) + # 1. Get peer models (skip on round 1) then their weights - if current_round > 1: + elif current_round > 1: node_instance.logger.info(f"[{index}] [Round {current_round}] Waiting for {min_params} peer models...") # asynchronously poll from blockchain to get other training nodes' model weights peer_params = asyncio.run(node_instance.listen_for_update_dfl(min_params, current_round - 1, index)) From bcfced916d138fa0949691a629692e5cfaa8ed1a Mon Sep 17 00:00:00 2001 From: Animesh Tiwary Date: Thu, 5 Jun 2025 03:08:34 -0700 Subject: [PATCH 5/8] fully offload from aggregator server to ensure actual DFl --- .../aggregator/aggregator_server.py | 11 +- .../platform_components/node/node_server.py | 105 +++++++++++++++--- 2 files changed, 90 insertions(+), 26 deletions(-) diff --git a/edgefl/platform_components/aggregator/aggregator_server.py b/edgefl/platform_components/aggregator/aggregator_server.py index d922c0c..1a4c099 100644 --- a/edgefl/platform_components/aggregator/aggregator_server.py +++ b/edgefl/platform_components/aggregator/aggregator_server.py @@ -55,7 +55,6 @@ class InitRequest(BaseModel): module: str module_file: str db_name: str - training_method: str = "CFL" class TrainingRequest(BaseModel): totalRounds: int @@ -86,7 +85,6 @@ def init(request: InitRequest): node_urls, index = request.nodeUrls, request.index module_name, module_file = request.module, request.module_file db_name = request.db_name - training_method = request.training_method # Verify filepath exists module_path = os.path.join(aggregator.training_app_dir, module_file) @@ -100,7 +98,7 @@ def init(request: InitRequest): if not index in aggregator.round_number: aggregator.round_number[index] = 1 - initialize_nodes(node_urls, index, module_name, module_path, db_name, 
training_method) + initialize_nodes(node_urls, index, module_name, module_path, db_name) aggregator.set_module_at_index(index, module_name, module_path) aggregator.initialize_index_on_blockchain(index, module_name, module_path, db_name) @@ -136,7 +134,7 @@ def is_node_online(node_url: str): except requests.exceptions.RequestException: return False -def initialize_nodes(node_urls: list[str], index, module_name, module_path, db_name, training_method): +def initialize_nodes(node_urls: list[str], index, module_name, module_path, db_name): """Send the deployed contract address to multiple node servers.""" def init_node(node_url: str): try: @@ -172,8 +170,7 @@ def init_node(node_url: str): 'round_number': aggregator.round_number[index], 'module_name': module_name, 'module_path': module_path, - 'db_name': db_name, - 'training_method': training_method + 'db_name': db_name }) # init end_round @@ -279,8 +276,6 @@ async def init_training(request: TrainingRequest): def start_training(aggregator, initial_params, starting_round, end_round, index): try: - logger.info("[AGGREGATOR_SERVER.py] inside start_training!!") - aggregator.end_round[index] = end_round # for r in range(starting_round, end_round + 1): while starting_round <= aggregator.end_round[index]: diff --git a/edgefl/platform_components/node/node_server.py b/edgefl/platform_components/node/node_server.py index 5dcdc99..3be0bc6 100644 --- a/edgefl/platform_components/node/node_server.py +++ b/edgefl/platform_components/node/node_server.py @@ -64,6 +64,85 @@ async def lifespan(app: FastAPI): app = FastAPI(lifespan=lifespan) +# self initialization for DFL (no reliance on aggregator server) +class SelfInitRequest(BaseModel): + index: str + replica_name: str + module_name: str + module_path: str + db_name: str + min_params: int = 2 + max_rounds: int = 10 + +@app.post('/self-init') +def self_init(request: SelfInitRequest): + global node_instance, listener_thread, stop_listening_thread + try: + ip = get_local_ip() + + port = 
os.getenv("PORT") + replica_name = request.replica_name + index = request.index + module_name = request.module_name + module_path = request.module_path + min_params = request.min_params + max_rounds = request.max_rounds + + db_name = request.db_name # testing winniio_fl + mnist_fl DBs + + # Connect to DB if it's not in the EdgeLake node + if db_name not in db_list: + connect_to_db(edgelake_node_url, db_name, db_user, db_password, db_host, db_port) + db_list.add(db_name) + + # Fetch and check for existing data + query = f"sql {db_name} SELECT * FROM node_{replica_name} LIMIT 1" + check_data = fetch_data_from_db(edgelake_node_url, query) + if not check_data: + raise ValueError(f"No data found in the database: {db_name}.") + + # Instantiate the Node class + logger.info(f"{replica_name} before initialized") + if not node_instance: + node_instance = Node(replica_name, ip, port, logger) + + if index not in node_instance.databases: + node_instance.databases[index] = db_name + + node_instance.initialize_specific_node_on_index(index, module_name, module_path) + node_instance.round_number[index] = 1 + + logger.info(f"{replica_name} successfully initialized for ({index})") + + logger.info(f"[{index}] Node {replica_name} using DFL mode") + listener_thread = threading.Thread( + name=f"{replica_name}--DFL-{index}", + target=run_dfl_training_loop, + args=(node_instance, index, max_rounds, min_params), + daemon=True + ) + listener_thread.start() + + return { + 'status': 'success', + 'message': 'Node initialized successfully' + } + except ValueError as e: + raise ValueError( + f"No data found in the database: {db_name}" + ) + except HTTPException as e: + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f"/init-node - {str(e)}" + ) + except ConnectionError as e: + raise ConnectionError( + f"Unable to access the database tables: {str(e)}" + ) +# -- end self init --- + + class InitNodeRequest(BaseModel): replica_name: str @@ -74,7 +153,6 @@ class 
InitNodeRequest(BaseModel): module_name: str module_path: str db_name: str - training_method: str = "CFL" # Default to centralized @app.post('/init-node') @@ -89,7 +167,6 @@ def init_node(request: InitNodeRequest): index = request.replica_index module_name = request.module_name module_path = request.module_path - training_method = request.training_method db_name = request.db_name # testing winniio_fl + mnist_fl DBs @@ -122,22 +199,13 @@ def init_node(request: InitNodeRequest): # print(f"starting round numbers: {node_instance.round_number}") # print(f"training apps: {node_instance.data_handlers}") - if training_method == "DFL": - logger.info(f"[{index}] Node {replica_name} using DFL mode") - listener_thread = threading.Thread( - name=f"{replica_name}--DFL-{index}", - target=run_dfl_training_loop, - args=(node_instance, index), - daemon=True - ) - else: # CFL - logger.info(f"[{index}] Node {replica_name} using CFL mode") - listener_thread = threading.Thread( - name=f"{replica_name}--CFL-{index}", - target=listen_for_start_round, - args=(node_instance, index, lambda: stop_listening_thread), - daemon=True - ) + # Start event listener for start round + listener_thread = threading.Thread( + name=f"{replica_name}--{index}", + target=listen_for_start_round, + args=(node_instance, index, lambda: stop_listening_thread) + ) + listener_thread.daemon = True # Make thread daemon so it exits when main thread exits listener_thread.start() return { @@ -158,6 +226,7 @@ def init_node(request: InitNodeRequest): f"Unable to access the database tables: {str(e)}" ) + ''' /receive_data [POST] (data) - Endpoint to receive data block from the simulated data stream From e13de8a29f721c51f5aee7479cec6cf508b39846 Mon Sep 17 00:00:00 2001 From: Animesh Tiwary Date: Thu, 5 Jun 2025 18:16:31 -0700 Subject: [PATCH 6/8] copy over module file logic from agg to fix dockerize apis issue; DFL status --> edge inf working, dockerized DFL working --- edgefl/platform_components/node/node_server.py | 15 
+++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/edgefl/platform_components/node/node_server.py b/edgefl/platform_components/node/node_server.py index 3be0bc6..9cc61f8 100644 --- a/edgefl/platform_components/node/node_server.py +++ b/edgefl/platform_components/node/node_server.py @@ -69,7 +69,7 @@ class SelfInitRequest(BaseModel): index: str replica_name: str module_name: str - module_path: str + module_file: str db_name: str min_params: int = 2 max_rounds: int = 10 @@ -84,12 +84,17 @@ def self_init(request: SelfInitRequest): replica_name = request.replica_name index = request.index module_name = request.module_name - module_path = request.module_path + module_file = request.module_file min_params = request.min_params max_rounds = request.max_rounds db_name = request.db_name # testing winniio_fl + mnist_fl DBs + # Verify filepath exists + module_path = os.path.join(os.getenv('TRAINING_APPLICATION_DIR'), module_file) + if not os.path.exists(os.path.join(os.getenv("GITHUB_DIR"), module_path)): + raise FileNotFoundError(f"Module '{module_file}' does not exist within the given path: '{module_path}'.") + # Connect to DB if it's not in the EdgeLake node if db_name not in db_list: connect_to_db(edgelake_node_url, db_name, db_user, db_password, db_host, db_port) @@ -113,6 +118,12 @@ def self_init(request: SelfInitRequest): node_instance.round_number[index] = 1 logger.info(f"{replica_name} successfully initialized for ({index})") + # logger.info(f"indexes: {node_instance.indexes}") + # logger.info(f"module names: {node_instance.module_names}") + # logger.info(f"module paths: {node_instance.module_paths}") + # logger.info(f"starting round numbers: {node_instance.round_number}") + # logger.info(f"training apps: {node_instance.data_handlers}") + logger.info(f"[{index}] Node {replica_name} using DFL mode") listener_thread = threading.Thread( From 215181f8cffb6c13787e11fe16c687309d35e8cc Mon Sep 17 00:00:00 2001 From: Animesh Tiwary Date: Thu, 5 Jun 
2025 19:05:34 -0700 Subject: [PATCH 7/8] DFL readme addition; other tiny readme fixes --- README.md | 24 ++++++++++++++++++++---- 1 file changed, 20 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index f3a0700..ea11e73 100644 --- a/README.md +++ b/README.md @@ -105,7 +105,8 @@ the listed variables in the following files `mnist1.env`, `mnist2.env`, `mnist3. Note that you do not need to change the ports, they're preconfigured to work. -In addition, update the IP address in for the `EXTERNAL_TCP_IP_PORT` and `EXTERNAL_IP` in the `mnist-agg.env` file. +In addition, update the IP address for the `EXTERNAL_TCP_IP_PORT` and `EXTERNAL_IP` in the `mnist-agg.env` file. +Also double-check that your `GITHUB_DIR` points to the right directory. Now we are ready to start the simulation. @@ -173,7 +174,7 @@ curl -X POST http://localhost:8080/start-training \ ``` `totalRounds` defines how many continuous rounds to train for. `minParams` defines how many parameters -the aggregator should wait for before starting the next round. +the aggregator should wait for before starting the next round. `index` defines the specific model that is being trained (Note: you may run multiple models/indices on a single training node, but make sure to wait for initialization to finish before sending a new "init" so that no data is overwritten). At any point during the training process, you can add additional nodes to the process by calling initialization again on the new nodes (must use the same `index`) and `minParams` will be dynamically adjusted as necessary. @@ -461,14 +462,29 @@ To take down the containers, simply run: docker compose down ``` +## Decentralized Federated Learning +[Decentralized Federated Learning](https://ieeexplore.ieee.org/document/10743046) is a more secure alternative to the basic “centralized” federated learning approach (as seen above): there is no aggregator node anymore; instead, each training node acts as its own aggregator.
Each training node waits for parameters from `minParams` nodes before it performs the aggregation itself (rather than having one global aggregator do it for all of them). With centralized FL, having a single aggregator poses security concerns and reduces reliability, since the data is aggregated on only one server. With DFL, having the training nodes do the aggregation themselves provides fault tolerance in case any of them fails. +Example: With node 1, node 2, and node 3, and assuming `minParams` = 2, all three first publish their initial params to the blockchain. Node 1 will wait for node 2’s and node 3’s initial params; once it has fetched them, it will aggregate them and then publish the resulting model params to the blockchain. Node 2 will do the same with the params of node 1 and node 3, and so forth. +Instead of starting the aggregator server (8080), you only need to initialize the training nodes. Example with training node 1: +``` +curl -X POST http://localhost:8081/self-init -H "Content-Type: application/json" -d '{ + "index": "test-index", + "replica_name": "node1", + "module_name": "MnistDataHandler", + "module_path": "custom_data_handler.py", + "db_name": "mnist_fl", + "min_params": 2, + "max_rounds": 10 +}' +``` +If using three training nodes, you want to run these commands three times, focusing on each node (change the replica name and port). Be sure that across all three curls, `min_params` and `max_rounds` are the same. - - +_Note: DFL is still a work in progress for smaller features.
Currently supports: dockerization of APIs, edge inference, running multiple models simultaneously; Not supported: updating minParams, continued training_ From e411d63ac02c5571dcdbb1326ba634776d5fc72a Mon Sep 17 00:00:00 2001 From: Animesh Tiwary Date: Thu, 5 Jun 2025 19:09:39 -0700 Subject: [PATCH 8/8] fix dfl command header --- README.md | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/README.md b/README.md index ea11e73..0a524ec 100644 --- a/README.md +++ b/README.md @@ -475,12 +475,11 @@ curl -X POST http://localhost:8081/self-init -H "Content-Type: application/json" "index": "test-index", "replica_name": "node1", "module_name": "MnistDataHandler", - "module_path": "custom_data_handler.py", + "module_file": "custom_data_handler.py", "db_name": "mnist_fl", "min_params": 2, "max_rounds": 10 }' - ``` If using three training nodes, you want to run these commands three times, focusing on each node (change the replica name and port). Be sure that across all three curls, `min_params` and `max_rounds` are the same.