diff --git a/EdgeLake/docker_makefile/edgelake_operator1.env b/EdgeLake/docker_makefile/edgelake_operator1.env index e12e6ad..f388c30 100644 --- a/EdgeLake/docker_makefile/edgelake_operator1.env +++ b/EdgeLake/docker_makefile/edgelake_operator1.env @@ -28,7 +28,7 @@ DB_USER="demo" # Password correlated to database user DB_PASSWD="passwd" # Database IP address -DB_IP=192.168.1.125 +DB_IP=192.168.108.182 # Database port number DB_PORT=5432 # Whether to set autocommit data diff --git a/EdgeLake/docker_makefile/edgelake_operator2.env b/EdgeLake/docker_makefile/edgelake_operator2.env index 3079e9c..dfb5422 100644 --- a/EdgeLake/docker_makefile/edgelake_operator2.env +++ b/EdgeLake/docker_makefile/edgelake_operator2.env @@ -28,7 +28,7 @@ DB_USER="demo" # Password correlated to database user DB_PASSWD="passwd" # Database IP address -DB_IP=192.168.1.125 +DB_IP=192.168.108.182 # Database port number DB_PORT=5432 # Whether to set autocommit data diff --git a/EdgeLake/docker_makefile/edgelake_operator3.env b/EdgeLake/docker_makefile/edgelake_operator3.env index 858bad4..3ea211e 100644 --- a/EdgeLake/docker_makefile/edgelake_operator3.env +++ b/EdgeLake/docker_makefile/edgelake_operator3.env @@ -28,7 +28,7 @@ DB_USER="demo" # Password correlated to database user DB_PASSWD="passwd" # Database IP address -DB_IP=192.168.1.125 +DB_IP=192.168.108.182 # Database port number DB_PORT=5432 # Whether to set autocommit data diff --git a/README.md b/README.md index f3a0700..0a524ec 100644 --- a/README.md +++ b/README.md @@ -105,7 +105,8 @@ the listed variables in the following files `mnist1.env`, `mnist2.env`, `mnist3. Note that you do not need to change the ports, they're preconfigured to work. -In addition, update the IP address in for the `EXTERNAL_TCP_IP_PORT` and `EXTERNAL_IP` in the `mnist-agg.env` file. +In addition, update the IP address in for the `EXTERNAL_TCP_IP_PORT` and `EXTERNAL_IP` in the `mnist-agg.env` file. +Double check also to make sure that your `GITHUB_DIR` is pointing the right directory. Now we are ready to start the simulation. @@ -173,7 +174,7 @@ curl -X POST http://localhost:8080/start-training \ ``` `totalRounds` defines how many continuous rounds to train for. `minParams` defines how many parameters -the aggregator should wait for before starting the next round. +the aggregator should wait for before starting the next round. `index` defines the specific model that is being trained (Note: you may run multiple models/indices on a singular training node, but make sure to wait for initialization before sending a new "init" to ensure no data overwrites). At any point during the training process, you can add additional nodes to the process by calling initialization again on the new nodes (must use the same `index`) and `minParams` will be dynamically adjusted as necessary. @@ -461,14 +462,28 @@ To take down the containers, simply run: docker compose down ``` +## Decentralized Federated Learning +[Decentralized Federated Learning](https://ieeexplore.ieee.org/document/10743046) is a more secure approach to the basic “centralized” federated learning approach (as seen above), where there is no aggregator node anymore, but rather, each training node acts as their OWN aggregator. The way it works is that each training node waits for a `minParams` number of nodes before it performs the aggregation itself (rather than having one global aggregator do it for all of them). With centralized FL, having an aggregator poses security concerns and lack of reliance since the data is aggregated only with one server. With DFL, having training nodes do aggregation themselves allows for fault-tolerance in case any of them fail. +Example: With node 1, node 2, and node 3, all three first publish their initial params to the blockchain, assuming `minParams` = 2. Node 1 will wait for node 2 and node 3’s initial params, and once fetched, it will aggregate, and node 1 will then publish those model params to the blockchain. Node 2 will do the same, but for node 1 and node 3, and so forth. +Instead of starting the aggregator server (8080), you only need to initialize the training nodes. Example with training node 1: +``` +curl -X POST http://localhost:8081/self-init -H "Content-Type: application/json" -d '{ + "index": "test-index", + "replica_name": "node1", + "module_name": "MnistDataHandler", + "module_file": "custom_data_handler.py", + "db_name": "mnist_fl", + "min_params": 2, + "max_rounds": 10 +}' +``` +If using three training nodes, you want to run these commands three times, focusing on each node (change the replica name and port). Be sure that across all three curls, `min_params` and `max_rounds` are the same. - - - +_Note: DFL is still a work in progress for smaller features. Currently supports: dockerization of APIs, edge inference, running multiple models simultaneously; Not supported: updating minParams, continued training_ diff --git a/edgefl/env_files/mnist/mnist-agg.env b/edgefl/env_files/mnist/mnist-agg.env index ca93a8a..1221bed 100644 --- a/edgefl/env_files/mnist/mnist-agg.env +++ b/edgefl/env_files/mnist/mnist-agg.env @@ -2,15 +2,15 @@ ### Note that paths are for Mac/Linux. If using Windows provide your Windows path ### an confirm that the other paths are correct. You may need to make '/' to '\' -GITHUB_DIR=/Users/roy/Github-Repos/Anylog-Edgelake-CSE115D +GITHUB_DIR=/mnt/c/Users/tiwar/anylog/Anylog-Edgelake-CSE115D TRAINING_APPLICATION_DIR=edgefl/platform_components/data_handlers MODULE_NAME=MnistDataHandler TMP_DIR=edgefl/tmp_dir/ # External IP Address for CURL commands to Edgelake -EXTERNAL_IP="192.168.1.125:32049" -EXTERNAL_TCP_IP_PORT="192.168.1.125:32048" +EXTERNAL_IP="192.168.108.182:32049" +EXTERNAL_TCP_IP_PORT="192.168.108.182:32048" AGG_NAME=agg @@ -18,7 +18,7 @@ AGG_NAME=agg PSQL_DB_NAME="mnist_fl" PSQL_DB_USER="demo" PSQL_DB_PASSWORD="passwd" -PSQL_HOST="192.168.56.1" +PSQL_HOST="192.168.1.125" PSQL_PORT="5432" FILE_WRITE_DESTINATION="edgefl/file_write" diff --git a/edgefl/env_files/mnist/mnist1.env b/edgefl/env_files/mnist/mnist1.env index 5c0e084..b13f142 100644 --- a/edgefl/env_files/mnist/mnist1.env +++ b/edgefl/env_files/mnist/mnist1.env @@ -1,21 +1,21 @@ ### Note that paths are for Mac/Linux. If using Windows provide your Windows path ### an confirm that the other paths are correct. You may need to make '/' to '\' -GITHUB_DIR=/Users/roy/Github-Repos/Anylog-Edgelake-CSE115D +GITHUB_DIR=/mnt/c/Users/tiwar/anylog/Anylog-Edgelake-CSE115D TRAINING_APPLICATION_DIR=edgefl/platform_components/data_handlers MODULE_NAME=MnistDataHandler TMP_DIR=edgefl/tmp_dir/ # External IP Address for CURL commands to Edgelake -EXTERNAL_IP="192.168.1.125:32149" -EXTERNAL_TCP_IP_PORT="192.168.1.125:32148" +EXTERNAL_IP="192.168.108.182:32149" +EXTERNAL_TCP_IP_PORT="192.168.108.182:32148" # LOCAL PSQL DB NAME PSQL_DB_NAME=mnist_fl PSQL_DB_USER=demo PSQL_DB_PASSWORD=passwd -PSQL_HOST=192.168.56.1 +PSQL_HOST=192.168.108.182 PSQL_PORT=5432 FILE_WRITE_DESTINATION="edgefl/file_write" diff --git a/edgefl/env_files/mnist/mnist2.env b/edgefl/env_files/mnist/mnist2.env index 025fbf8..7073039 100644 --- a/edgefl/env_files/mnist/mnist2.env +++ b/edgefl/env_files/mnist/mnist2.env @@ -2,21 +2,21 @@ ### Note that paths are for Mac/Linux. If using Windows provide your Windows path ### an confirm that the other paths are correct. You may need to make '/' to '\' -GITHUB_DIR=/Users/roy/Github-Repos/Anylog-Edgelake-CSE115D +GITHUB_DIR=/mnt/c/Users/tiwar/anylog/Anylog-Edgelake-CSE115D TRAINING_APPLICATION_DIR=edgefl/platform_components/data_handlers MODULE_NAME=MnistDataHandler TMP_DIR=edgefl/tmp_dir/ # External IP Address for CURL commands to Edgelake -EXTERNAL_IP="192.168.1.125:32249" -EXTERNAL_TCP_IP_PORT="192.168.1.125:32248" +EXTERNAL_IP="192.168.108.182:32249" +EXTERNAL_TCP_IP_PORT="192.168.108.182:32248" # LOCAL PSQL DB NAME PSQL_DB_NAME="mnist_fl" PSQL_DB_USER="demo" PSQL_DB_PASSWORD="passwd" -PSQL_HOST="172.30.176.90" +PSQL_HOST="192.168.108.182" PSQL_PORT="5432" FILE_WRITE_DESTINATION="edgefl/file_write" diff --git a/edgefl/env_files/mnist/mnist3.env b/edgefl/env_files/mnist/mnist3.env index 3f0df9e..59734a1 100644 --- a/edgefl/env_files/mnist/mnist3.env +++ b/edgefl/env_files/mnist/mnist3.env @@ -2,21 +2,21 @@ ### Note that paths are for Mac/Linux. If using Windows provide your Windows path ### an confirm that the other paths are correct. You may need to make '/' to '\' -GITHUB_DIR=/Users/roy/Github-Repos/Anylog-Edgelake-CSE115D +GITHUB_DIR=/mnt/c/Users/tiwar/anylog/Anylog-Edgelake-CSE115D TRAINING_APPLICATION_DIR=edgefl/platform_components/data_handlers MODULE_NAME=MnistDataHandler TMP_DIR=edgefl/tmp_dir # External IP Address for CURL commands to Edgelake -EXTERNAL_IP="192.168.1.125:32349" -EXTERNAL_TCP_IP_PORT="192.168.1.125:32348" +EXTERNAL_IP="192.168.108.182:32349" +EXTERNAL_TCP_IP_PORT="192.168.108.182:32348" # LOCAL PSQL DB NAME PSQL_DB_NAME="mnist_fl" PSQL_DB_USER="demo" PSQL_DB_PASSWORD="passwd" -PSQL_HOST="172.30.176.90" +PSQL_HOST="192.168.108.182" PSQL_PORT="5432" FILE_WRITE_DESTINATION="edgefl/file_write" diff --git a/edgefl/platform_components/node/node.py b/edgefl/platform_components/node/node.py index 02e0397..5dbdbf6 100644 --- a/edgefl/platform_components/node/node.py +++ b/edgefl/platform_components/node/node.py @@ -7,6 +7,7 @@ import logging import os import pickle +import requests from asyncio import sleep # import keras @@ -19,6 +20,7 @@ from platform_components.EdgeLake_functions.mongo_file_store import read_file, write_file, copy_file_from_container from platform_components.lib.logger.logger_config import configure_logging from platform_components.helpers.LoadClassFromFile import load_class_from_file +from platform_components.lib.modules.local_model_update import LocalModelUpdate from dotenv import load_dotenv load_dotenv() @@ -269,6 +271,129 @@ def train_model_params(self, aggregator_model_params_db_link, round_number, ip_p return f'{self.docker_file_write_destination}/{index}/{file}' return file_name + ''' + dfl_train_model_params() + - + ''' + def dfl_train_model_params(self, round_number, index): + self.logger.debug(f"[{index}] in dfl_train_model_params for round {round_number}") + + # Train model + # model_update = self.local_training_handler.train({}) + # print(f"[INFO] [{index}][Round {round_number}] ========== Model training progress ==========") + model_params = self.data_handlers[index].train(round_number) + self.logger.info(f"[{index}][Round {round_number}] Step 2 Complete: Model training done") + + # Save and return new weights + encoded_params = self.encode_model(model_params) + file = f"{round_number}-replica-{self.replica_name}.pkl" + # make sure directory exists + os.makedirs(os.path.dirname(f"{self.file_write_destination}/{index}/"), exist_ok=True) + file_name = f"{self.file_write_destination}/{index}/{file}" + with open(f"{file_name}", "wb") as f: + f.write(encoded_params) + + if self.docker_running: + self.logger.debug(f'[{index}] written to container at {f"{self.docker_file_write_destination}/{index}/{file}"}') + copy_file_to_container(os.path.join(self.tmp_dir, index), self.docker_container_name, file_name, f"{self.docker_file_write_destination}/{index}/{file}") + return f'{self.docker_file_write_destination}/{index}/{file}' + return file_name + + ''' + listen_for_update_dfl(self, min_params, round_number, index): + - helper for a singular training node to asynchronously poll the blockchain for other training nodes' model params/weights + ''' + async def listen_for_update_dfl(self, min_params, round_number, index): + logger = self.logger + decoded_params = {} + + while True: + try: + headers = { + 'User-Agent': 'AnyLog/1.23', + 'command': f'blockchain get {index}-a{round_number}' + } + response = requests.get(self.edgelake_node_url, headers=headers) + response.raise_for_status() + results = response.json() + + node_params_links = [] + ip_ports = [] + + # for item in results: + # key = f"{index}-a{round_number}" + # if key in item and item[key]['node'] != self.replica_name: # skip own model + # node_params_links.append(item[key]['trained_params_local_path']) + # ip_ports.append(item[key]['ip_port']) + for item in results: + key = f"{index}-a{round_number}" + if key in item: + source_node = item[key]['node'] + if source_node != self.replica_name: + link = item[key]['trained_params_local_path'] + node_params_links.append(link) + ip_ports.append(item[key]['ip_port']) + logger.info( + f"[{index}] [Round {round_number}] Found peer model from {source_node} at {link}") + else: + logger.debug(f"[{index}] Skipping own model {source_node}") + + # Fetch and decode + self.fetch_decoded_params(decoded_params, node_params_links, ip_ports, index) + + if len(decoded_params) >= min_params: + return list(decoded_params.values()) + + except Exception as e: + logger.error(f"[{index}] DFL listen error: {str(e)}") + + await sleep(2) + + # same as in aggregator.py + def fetch_decoded_params(self, decoded_params_dict, node_param_download_links, ip_ports, index): + # use the node_param_download_links to get all the file + # in the form of tuples, like ["('blobs_admin', 'node_model_updates', '1-replica-node1.pkl')"] + # node_ref = db.reference('node_model_updates') + + # Loop through each provided download link to retrieve node parameter objects + for i, path in enumerate(node_param_download_links): + # Don't fetch for existing paths + if path in decoded_params_dict: + continue + + try: + # Make sure the directory exists + filename = path.split('/')[-1] + local_path = f'{self.file_write_destination}/{index}/{filename}' + if self.docker_running: + docker_file_path = f'{self.docker_file_write_destination}/{index}/{filename}' + response = read_file(self.edgelake_node_url, path, + docker_file_path, ip_ports[i]) + copy_file_from_container(os.path.join(self.tmp_dir, index), self.docker_container_name, + docker_file_path, + local_path) + else: + response = read_file(self.edgelake_node_url, path, + local_path, ip_ports[i]) + + if response.status_code != 200: + raise ValueError( + f"Failed to retrieve node params from link: {filename}. HTTP Status: {response.status_code}" + ) + + # Decode the model weights from the file + sleep(1) + with open(local_path, 'rb') as f: + data = pickle.load(f) + if not data: + raise ValueError(f"Missing model_weights in data from file: {filename}") + + decoded_params_dict[path] = LocalModelUpdate(weights=data) + + except Exception as e: + self.logger.error(f"Error retrieving data from link {filename}: {str(e)}") + continue + def encode_model(self, model_update): serialized_data = pickle.dumps(model_update) return serialized_data diff --git a/edgefl/platform_components/node/node_server.py b/edgefl/platform_components/node/node_server.py index 3a075e9..9cc61f8 100644 --- a/edgefl/platform_components/node/node_server.py +++ b/edgefl/platform_components/node/node_server.py @@ -18,6 +18,7 @@ import argparse import requests import warnings +import asyncio from uvicorn import run from fastapi import FastAPI, HTTPException, status @@ -63,6 +64,96 @@ async def lifespan(app: FastAPI): app = FastAPI(lifespan=lifespan) +# self initialization for DFL (no reliance on aggregator server) +class SelfInitRequest(BaseModel): + index: str + replica_name: str + module_name: str + module_file: str + db_name: str + min_params: int = 2 + max_rounds: int = 10 + +@app.post('/self-init') +def self_init(request: SelfInitRequest): + global node_instance, listener_thread, stop_listening_thread + try: + ip = get_local_ip() + + port = os.getenv("PORT") + replica_name = request.replica_name + index = request.index + module_name = request.module_name + module_file = request.module_file + min_params = request.min_params + max_rounds = request.max_rounds + + db_name = request.db_name # testing winniio_fl + mnist_fl DBs + + # Verify filepath exists + module_path = os.path.join(os.getenv('TRAINING_APPLICATION_DIR'), module_file) + if not os.path.exists(os.path.join(os.getenv("GITHUB_DIR"), module_path)): + raise FileNotFoundError(f"Module '{module_file}' does not exist within the given path: '{module_path}'.") + + # Connect to DB if it's not in the EdgeLake node + if db_name not in db_list: + connect_to_db(edgelake_node_url, db_name, db_user, db_password, db_host, db_port) + db_list.add(db_name) + + # Fetch and check for existing data + query = f"sql {db_name} SELECT * FROM node_{replica_name} LIMIT 1" + check_data = fetch_data_from_db(edgelake_node_url, query) + if not check_data: + raise ValueError(f"No data found in the database: {db_name}.") + + # Instantiate the Node class + logger.info(f"{replica_name} before initialized") + if not node_instance: + node_instance = Node(replica_name, ip, port, logger) + + if index not in node_instance.databases: + node_instance.databases[index] = db_name + + node_instance.initialize_specific_node_on_index(index, module_name, module_path) + node_instance.round_number[index] = 1 + + logger.info(f"{replica_name} successfully initialized for ({index})") + # logger.info(f"indexes: {node_instance.indexes}") + # logger.info(f"module names: {node_instance.module_names}") + # logger.info(f"module paths: {node_instance.module_paths}") + # logger.info(f"starting round numbers: {node_instance.round_number}") + # logger.info(f"training apps: {node_instance.data_handlers}") + + + logger.info(f"[{index}] Node {replica_name} using DFL mode") + listener_thread = threading.Thread( + name=f"{replica_name}--DFL-{index}", + target=run_dfl_training_loop, + args=(node_instance, index, max_rounds, min_params), + daemon=True + ) + listener_thread.start() + + return { + 'status': 'success', + 'message': 'Node initialized successfully' + } + except ValueError as e: + raise ValueError( + f"No data found in the database: {db_name}" + ) + except HTTPException as e: + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f"/init-node - {str(e)}" + ) + except ConnectionError as e: + raise ConnectionError( + f"Unable to access the database tables: {str(e)}" + ) +# -- end self init --- + + class InitNodeRequest(BaseModel): replica_name: str @@ -146,6 +237,7 @@ def init_node(request: InitNodeRequest): f"Unable to access the database tables: {str(e)}" ) + ''' /receive_data [POST] (data) - Endpoint to receive data block from the simulated data stream @@ -210,6 +302,54 @@ def listen_for_start_round(nodeInstance, index, stop_event): logger.error(f"[{index}] Error in listener thread: {str(e)}") time.sleep(2) +# DFL +def run_dfl_training_loop(node_instance, index, max_rounds=10, min_params=2): + current_round = node_instance.round_number.get(index, 1) + + while current_round <= max_rounds: + try: + node_instance.logger.info(f"[{index}] [Round {current_round}] Starting decentralized training") + + #0. for first round, just update with default weights + if current_round == 1: + weights = node_instance.data_handlers[index].get_weights() + # Update model with weights + node_instance.data_handlers[index].update_model(weights) + + # 1. Get peer models (skip on round 1) then their weights + elif current_round > 1: + node_instance.logger.info(f"[{index}] [Round {current_round}] Waiting for {min_params} peer models...") + # asynchronously poll from blockchain to get other training nodes' model weights + peer_params = asyncio.run(node_instance.listen_for_update_dfl(min_params, current_round - 1, index)) + node_instance.logger.info(f"[{index}] [Round {current_round}] Retrieved {len(peer_params)} peer models") + # combine the fetched training nodes' params + agg_weights = node_instance.data_handlers[index].aggregate_model_weights(peer_params) + node_instance.logger.info(f"[{index}] [Round {current_round}] Aggregated weights from peers") + # update the current nodes' model + node_instance.data_handlers[index].update_model(agg_weights) + node_instance.logger.info(f"[{index}] [Round {current_round}] Done updating the model weights!") + + # 2. Local training with the updated model + modelUpdate_metadata = node_instance.dfl_train_model_params( + round_number=current_round, + index=index + ) + node_instance.logger.info(f"[{index}] [Round {current_round}] Local training complete") + + # 3. Publish to blockchain so that other nodes can fetch + node_instance.add_node_params(current_round, modelUpdate_metadata, index) + node_instance.logger.info(f"[{index}] [Round {current_round}] Published model to blockchain") + + current_round += 1 + node_instance.round_number[index] = current_round + + time.sleep(2) # Optionally throttle to reduce load + except Exception as e: + node_instance.logger.error(f"[{index}] Error in DFL loop round {current_round}: {str(e)}") + time.sleep(5) + + + # TODO: move to a (helper) file (i.e. 'node_helpers.py'?) # Extracts initParams from the policy 'index-r' at the specified index def get_most_recent_agg_params(index):