From 464f07c023ab51e52ddeb30ad78051222496dc68 Mon Sep 17 00:00:00 2001 From: neabbas Date: Thu, 5 Dec 2024 11:41:57 -0800 Subject: [PATCH 1/4] works with custom model and datahandler, need to test to ensure it still works with mnist --- blockchain/.env | 18 ++-- blockchain/aggregator.py | 14 +-- blockchain/aggregator_server.py | 144 +++++++++++++++++++++++++--- blockchain/node.py | 161 ++++++++++++++++++++++++++------ blockchain/node_server.py | 20 ++-- 5 files changed, 293 insertions(+), 64 deletions(-) diff --git a/blockchain/.env b/blockchain/.env index bea9c6c..39a8b0e 100644 --- a/blockchain/.env +++ b/blockchain/.env @@ -1,11 +1,11 @@ # Firebase credentials file path -FIREBASE_CREDENTIALS=/Users/camillegandotra/Desktop/Anylog-Edgelake-CSE115D/blockchain/credentials/anylog-e398d-firebase-adminsdk-k9n6z-db9a2fff3e.json +FIREBASE_CREDENTIALS=C:/Users/nehab/anylog-edgelake-firebase-adminsdk-2v5t0-f8a3b0b914.json # Firebase database URL -DATABASE_URL=https://anylog-e398d-default-rtdb.firebaseio.com +DATABASE_URL=https://anylog-edgelake-default-rtdb.firebaseio.com/ # Ethereum provider URL -PROVIDER_URL=https://optimism-sepolia.infura.io/v3/524787abec0740b9a443cb825966c31e +PROVIDER_URL=https://optimism-sepolia.infura.io/v3/5c24484e6f08479ba04aa2d1359b0ef7 # Ethereum private key PRIVATE_KEY=f155acda1fc73fa6f50456545e3487b78fd517411708ffa1f67358c1d3d54977 @@ -14,10 +14,16 @@ PRIVATE_KEY=f155acda1fc73fa6f50456545e3487b78fd517411708ffa1f67358c1d3d54977 CONTRACT_ADDRESS=0xF21E95f39Ac900986c4D47Bb17De767d80451e3B # Node model definition path (PyTorch FL model) -MODEL_DEFINITION=/Users/camillegandotra/Desktop/Anylog-Edgelake-CSE115D/blockchain/configs/node/pytorch/pytorch_sequence.pt +MODEL_DEFINITION=C:/Users/nehab/Anylog-Edgelake-CSE115D/blockchain/configs/node/pytorch/pytorch_sequence.pt # Node dataset path -DATASET_PATH=/Users/camillegandotra/Desktop/Anylog-Edgelake-CSE115D/blockchain/data/mnist/data_party0.npz +DATASET_PATH=C:/Users/nehab/Anylog-Edgelake-CSE115D/blockchain/data/mnist/data_party0.npz # Contract ABI file path -CONTRACT_ABI_PATH=/Users/camillegandotra/Desktop/Anylog-Edgelake-CSE115D/blockchain/smart_contract/ModelParametersABI.json +CONTRACT_ABI_PATH=C:/Users/nehab/Anylog-Edgelake-CSE115D/blockchain/smart_contract/ModelParametersABI.json + +# same as master.env +EXTERNAL_IP=192.168.56.1 + +# command +# python -m edge_lake.edgelake "process C:\Users\nehab\Downloads\CSE-115D-start-script.al" \ No newline at end of file diff --git a/blockchain/aggregator.py b/blockchain/aggregator.py index 44d2671..192e5f9 100644 --- a/blockchain/aggregator.py +++ b/blockchain/aggregator.py @@ -11,8 +11,6 @@ from firebase_admin import credentials, db from ibmfl.model.model_update import ModelUpdate -CONTRACT_ADDRESS = "0x4ae311B85B017bf7EAa7a96D3109f58795F5F4BF" - load_dotenv() @@ -43,6 +41,7 @@ def start_round(self, initParamsLink, roundNumber): external_ip = os.getenv("EXTERNAL_IP") url = f'http://{external_ip}:32049' + # in khaled's: for node_num in range(1, int(minParams) + 1) headers = { 'User-Agent': 'AnyLog/1.23', 'Content-Type': 'text/plain', @@ -50,14 +49,13 @@ def start_round(self, initParamsLink, roundNumber): } # Format data exactly like the example curl command but with your values - # NOTE: ask why are we adding the node num from agg - data = f'''''' - print(f"Training initialized with {roundNumber} rounds") - response = requests.post(url, headers=headers, data=data) + print(response.status_code) if response.status_code == 200: return { 'status': 'success', @@ -96,12 +94,16 @@ def aggregate_model_params(self, node_param_download_links): raise ValueError(f"Error retrieving data from link {link}: {str(e)}") # do aggregation function here (doesn't return anything) + print("params to aggregate: ", decoded_params) self.fusion_model.update_weights(decoded_params) aggregate_params_weights = self.fusion_model.current_model_weights aggregate_model_update = ModelUpdate(weights=aggregate_params_weights) + if hasattr(aggregate_model_update, '__dict__'): + print("Attributes of aggregate_model_update:", vars(aggregate_model_update)) # Prints attributes in a dictionary form + # encode params back to string encoded_params = self.encode_params(aggregate_model_update) diff --git a/blockchain/aggregator_server.py b/blockchain/aggregator_server.py index ea74b1d..1ce1ffb 100644 --- a/blockchain/aggregator_server.py +++ b/blockchain/aggregator_server.py @@ -8,6 +8,15 @@ import requests import os +import torch + +import firebase_admin +from firebase_admin import credentials, db + +import base64 + +from ibmfl.model.pytorch_fl_model import PytorchFLModel + app = Flask(__name__) load_dotenv() @@ -25,10 +34,15 @@ -H "Content-Type: application/json" \ -d '{ "nodeUrls": [ - "http://localhost:8081", - "http://localhost:8082" + "http://localhost:8081" ], - "model_def": 1 + "model_path": "C:\\Users\\nehab\\cse115d\\testmodel.py", + "model_init_params": { "module__input_dim": 14 }, + "model_name": "custom_test", + "model_weights_path": "C:\\Users\\nehab\\cse115d\\model_weights.pt", + "data_handler_path": "C:\\Users\\nehab\\cse115d_anylog_edgelake\\custom_data_handler.py", + "data_config": {"data": "C:\\Users\\nehab\\cse115d_anylog_edgelake\\heart_data\\party_data\\party_0.csv"} + }' ''' @@ -39,17 +53,115 @@ def deploy_contract(): try: data = request.json node_urls = data.get('nodeUrls', []) - model_def = data.get('model_def', 1) + + model_path = data.get('model_path', os.getenv('MODEL_PYTHON')) + model_init_params = data.get('model_init_params', None) + model_name = data.get('model_name', 'custom_test') + model_weights_path = data.get('model_weights_path') + + # upload model to firebase + firebase_model_path = f"/models/{model_name}" + pytorch_upload(model_path, model_init_params, model_name, firebase_model_path, model_weights_path) + + + data_handler_path = data.get('data_handler_path') + data_config = data.get('data_config') + + # upload datahandler to firebase + firebase_datahandler_path = f"/datahandlers/datahandler" + datahandler_upload(data_handler_path, data_config, firebase_datahandler_path) # Initialize the nodes and send the contract address - initialize_nodes(model_def, node_urls) + initialize_nodes(firebase_model_path, firebase_datahandler_path, node_urls) except Exception as e: return jsonify({'status': 'error', 'message': str(e)}), 500 - return f"Initialized nodes with model definition: {model_def}", 200 - - -def initialize_nodes(model_def, node_urls): + return f"Initialized nodes with model definition: {model_path}", 200 + +# creates and uploads a model to firebase for the nodes to download +def pytorch_upload(model_file_path, model_init_params, model_name, firebase_model_path, model_weights_path): + + # Read the model class source code + with open(model_file_path, "r") as f: + model_source_code = f.read() + + # Dynamically load the model class + namespace = {} + exec(model_source_code, namespace) + + # Identify the model class dynamically-- this searches for all classes that are a subset of nn.Module + model_class = None + for obj_name, obj in namespace.items(): + if isinstance(obj, type) and issubclass(obj, torch.nn.Module) and obj != torch.nn.Module: + model_class = obj + break + + if model_class is None: + raise ValueError("No PyTorch model class found in the specified file.") + else: + print("Identified model class:", model_class) + + # initialize model + fl_model = PytorchFLModel( + model_name=model_name, + pytorch_module=model_class, + module_init_params=model_init_params, + ) + + # save current model weights (empty for now) + fl_model.save_model(filename=model_weights_path) + + # Encode the source code and model weights + encoded_source_code = encode_to_base64(model_source_code) + with open(model_weights_path, "rb") as f: + encoded_weights = encode_to_base64(f.read()) + + # define model info to upload + model_data = { + "source_code": encoded_source_code, + "weights": encoded_weights, + "init_params": model_init_params, + } + + firebase_model_ref = db.reference(firebase_model_path) + firebase_model_ref.set(model_data) + print(f"PytorchFLModel uploaded to Firebase Realtime Database at {firebase_model_path}.") + +# creates and uploads a datahandler for nodes to download +def datahandler_upload(datahandler_file_path, data_config, firebase_datahandler_path): + + # read source code + with open(datahandler_file_path, "r") as f: + datahandler_source_code = f.read() + + # Encode the source code and configuration + encoded_source_code = encode_to_base64(datahandler_source_code) + encoded_data_config = encode_to_base64(str(data_config)) + + # Prepare the data to upload + datahandler_data = { + "source_code": encoded_source_code, + "data_config": encoded_data_config, + } + + # upload to firebase + datahandler_firebase_ref = db.reference(firebase_datahandler_path) + datahandler_firebase_ref.set(datahandler_data) + print(f"DataHandler uploaded to Firebase Realtime Database at {firebase_datahandler_path}") + + +def encode_to_base64(data): + """Encode binary or text data to Base64.""" + if isinstance(data, bytes): + return base64.b64encode(data).decode("utf-8") + return base64.b64encode(data.encode("utf-8")).decode("utf-8") + +def decode_from_base64(data): + """Decode Base64 data to binary or text.""" + return base64.b64decode(data) + + +def initialize_nodes(firebase_model_path, firebase_datahandler_path, node_urls): """Send the deployed contract address to multiple node servers.""" for urlCount in range(len(node_urls)): try: @@ -58,7 +170,8 @@ def initialize_nodes(model_def, node_urls): response = requests.post(f'{url}/init-node', json={ 'replica_name': f"node{urlCount}", - 'model_def': model_def + 'firebase_model_path': firebase_model_path, + 'firebase_datahandler_path': firebase_datahandler_path }) # TODO: figure out how to handle response @@ -88,6 +201,7 @@ def initialize_nodes(model_def, node_urls): @app.route('/start-training', methods=['POST']) async def init_training(): """Start the training process by setting the number of rounds.""" + print('entered start_training endpoint') try: data = request.json num_rounds = data.get('totalRounds', 1) @@ -124,10 +238,10 @@ async def listen_for_update_agg(min_params, roundNumber): while True: try: - # Check parameter count + # Check parameter count for node responses count_response = requests.get(url, headers={ 'User-Agent': 'AnyLog/1.23', - "command": f"blockchain get a{roundNumber} count" + "command": f"blockchain get r{roundNumber} count" }) if count_response.status_code == 200: @@ -138,7 +252,7 @@ async def listen_for_update_agg(min_params, roundNumber): if count >= min_params: params_response = requests.get(url, headers={ 'User-Agent': 'AnyLog/1.23', - "command": f"blockchain get a{roundNumber}" + "command": f"blockchain get r{roundNumber}" }) if params_response.status_code == 200: @@ -146,9 +260,9 @@ async def listen_for_update_agg(min_params, roundNumber): if result and len(result) > 0: # Extract all trained_params into a list node_params_links = [ - item[f'a{roundNumber}']['trained_params'] + item[f'r{roundNumber}']['trained_params'] for item in result - if f'a{roundNumber}' in item + if f'r{roundNumber}' in item ] print(f"Collected trained_params links: {node_params_links}") # Debugging line diff --git a/blockchain/node.py b/blockchain/node.py index 65935c4..7aab9de 100644 --- a/blockchain/node.py +++ b/blockchain/node.py @@ -10,15 +10,21 @@ from ibmfl.util.data_handlers.mnist_pytorch_data_handler import MnistPytorchDataHandler from ibmfl.model.pytorch_fl_model import PytorchFLModel import requests +import torch +import time # import pathlib from dotenv import load_dotenv load_dotenv() +def decode_from_base64(data): + """Decode Base64 data to binary or text.""" + return base64.b64decode(data) + class Node: - def __init__(self, model_def, replica_name): + def __init__(self, firebase_model_path, firebase_datahandler_path, replica_name): print("Node initializing") @@ -38,31 +44,124 @@ def __init__(self, model_def, replica_name): self.currentRound = 1 - current_dir = os.path.dirname(os.path.abspath(__file__)) + # download model from firebase + fl_model = self.load_firebase_model(firebase_model_path); + # download datahandler from firebase + data_handler = self.load_firebase_datahandler(firebase_datahandler_path); + + # create the local training handler + self.local_training_handler = LocalTrainingHandler(fl_model=fl_model, data_handler=data_handler) - # USE MNIST DATASET FOR TESTING THIS FUNCTIONALITY - data_path = os.path.join(current_dir, "data", "mnist", "data_party0.npz") - data_config = { - "npz_file": str(data_path) - } # model_def == 1: PytorchFLModel - if model_def == 1: - model_path = os.path.join(current_dir, "configs", "node", "pytorch", "pytorch_sequence.pt") - - model_spec = { - "loss_criterion": "nn.NLLLoss", - "model_definition": str(model_path), - "model_name": "pytorch-nn", - "optimizer": "optim.Adadelta" - } - - fl_model = PytorchFLModel(model_name="pytorch-nn", model_spec=model_spec) - data_handler = MnistPytorchDataHandler(data_config=data_config) - self.local_training_handler = LocalTrainingHandler(fl_model=fl_model, data_handler=data_handler) + # if model_def == 1: + # model_path = os.path.join(current_dir, "configs", "node", "pytorch", "pytorch_sequence.pt") + + # model_spec = { + # "loss_criterion": "nn.NLLLoss", + # "model_definition": str(model_path), + # "model_name": "pytorch-nn", + # "optimizer": "optim.Adadelta" + # } + + # fl_model = PytorchFLModel(model_name="pytorch-nn", model_spec=model_spec) + # data_handler = MnistPytorchDataHandler(data_config=data_config) + # self.local_training_handler = LocalTrainingHandler(fl_model=fl_model, data_handler=data_handler) # add more model defs in elifs below # model_def == 2: Sklearn and so on + def load_firebase_model(self, firebase_model_path): + + # get model_data from firebase + firebase_model_ref = db.reference(firebase_model_path) + model_data = firebase_model_ref.get() + if model_data is None: + print("Error: No model data found in Firebase.") + return + + print('downloaded model_data') + + # derive fields, then code source code and weights + model_source_code = decode_from_base64(model_data["source_code"]).decode("utf-8") + model_weights = decode_from_base64(model_data["weights"]) + init_params = model_data["init_params"] + + print('derived fields') + + # Save the downloaded weights to a file-- necessary to load the model later + downloaded_weights_path = "downloaded_model_weights.pt" + with open(downloaded_weights_path, "wb") as f: + f.write(model_weights) + + print('saved weights to file') + + # Dynamically recreate the model class + downloaded_namespace = {} + exec(model_source_code, downloaded_namespace) + + # Identify the model class in the downloaded namespace + downloaded_model_class = None + for obj_name, obj in downloaded_namespace.items(): + if isinstance(obj, type) and issubclass(obj, torch.nn.Module) and obj != torch.nn.Module: + downloaded_model_class = obj + break + + if downloaded_model_class is None: + raise ValueError("No PyTorch model class found in the downloaded source code.") + else: + print("Recreated model class:", downloaded_model_class) + + # recreate the PytorchFLModel and load weights + fl_model = PytorchFLModel( + model_name="Pytorch_NN", + pytorch_module=downloaded_model_class, + module_init_params=init_params, + ) + fl_model.load_model( + pytorch_module=downloaded_model_class, + model_filename=downloaded_weights_path, + module_init_params=init_params, + ) + print("PytorchFLModel successfully reconstructed and loaded.") + return fl_model + + + def load_firebase_datahandler(self, firebase_datahandler_path): + + # get datahandler data from firebase + firebase_datahandler_ref = db.reference(firebase_datahandler_path) + datahandler_data = firebase_datahandler_ref.get() + if datahandler_data is None: + print("Error: No DataHandler data found in Firebase.") + return + + print('got datahandler data from firebase') + + # decode the source code and configuration + datahandler_source_code = decode_from_base64(datahandler_data["source_code"]).decode("utf-8") + data_config = eval(decode_from_base64(datahandler_data["data_config"]).decode("utf-8")) + + # dynamically recreate the DataHandler class + namespace = {} + exec(datahandler_source_code, namespace) + + # find the datahandler class, which must be a subclass of DataHandler + datahandler_class = None + for obj_name, obj in namespace.items(): + if isinstance(obj, type) and issubclass(obj, namespace.get('DataHandler', object)) and obj != namespace['DataHandler']: + datahandler_class = obj + break + + if datahandler_class is None: + raise ValueError("No DataHandler subclass found in the downloaded source code.") + else: + print("Recreated DataHandler class:", datahandler_class) + + # initialize the datahandler with the configuration + data_handler = datahandler_class(data_config=data_config) + print("DataHandler successfully reconstructed and initialized.") + return data_handler + ''' add_data_batch(data) - Adds passed in data to local storage @@ -79,8 +178,6 @@ def add_data_batch(self, data): ''' def add_node_params(self, round_number, newly_trained_params_db_link): - print("in add_node_params") - try: external_ip = os.getenv("EXTERNAL_IP") url = f'http://{external_ip}:32049' @@ -91,14 +188,14 @@ def add_node_params(self, round_number, newly_trained_params_db_link): 'command': 'blockchain insert where policy = !my_policy and local = true and blockchain = optimism' } - data = f'''''' + }} }}>''' - # print(f"Submitting results for round {round_number}") response = requests.post(url, headers=headers, data=data) - print(f"Results submitted for round {round_number} to {self.replicaName}") + print(f"{self.replicaName} has submitted results for round {round_number}") return { 'status': 'success', @@ -118,13 +215,18 @@ def add_node_params(self, round_number, newly_trained_params_db_link): ''' def train_model_params(self, aggregator_model_params_db_link, round_number): - print(f"in train_model_params for round {round_number}") + print(f"Training for round {round_number}") + + weights = '' # First round initialization if round_number == 1: + print('initializing weights, round1') weights = self.local_training_handler.fl_model.get_model_update() + print("round 1 weights", weights) else: try: + print('round1+, getting weights from aggregator') # Extract the key from the URL model_updates_key = aggregator_model_params_db_link.split('/')[-1].replace('.json', '') @@ -145,7 +247,8 @@ def train_model_params(self, aggregator_model_params_db_link, round_number): self.local_training_handler.update_model(weights) # Train model - self.local_training_handler.data_handler.load_dataset(nb_points=50) + # self.local_training_handler.data_handler.load_dataset(nb_points=50) + self.local_training_handler.data_handler.load_dataset() model_update = self.local_training_handler.train({}) # Save and return new weights @@ -155,6 +258,8 @@ def train_model_params(self, aggregator_model_params_db_link, round_number): 'model_update': encoded_params }) + print('pushed weights to db') + return f"{self.database_url}/node_model_updates/{data_pushed.key}.json" def encode_model(self, model_update): diff --git a/blockchain/node_server.py b/blockchain/node_server.py index 0a6eb21..72baa7e 100644 --- a/blockchain/node_server.py +++ b/blockchain/node_server.py @@ -65,11 +65,10 @@ def init_node(): """Receive the contract address from the aggregator server.""" global node_instance, listener_thread, stop_listening_thread try: - model_def = request.json.get('model_def', 1) replica_name = request.json.get('replica_name') + firebase_model_path = request.json.get('firebase_model_path') + firebase_datahandler_path = request.json.get('firebase_datahandler_path') - if not model_def: - return jsonify({'status': 'error', 'message': 'No config provided'}), 400 if listener_thread and listener_thread.is_alive(): stop_listening_thread = True listener_thread.join(timeout=1) @@ -78,7 +77,7 @@ def init_node(): stop_listening_thread = False # Instantiate the Node class - node_instance = Node(model_def, replica_name) + node_instance = Node(firebase_model_path, firebase_datahandler_path, replica_name) node_instance.currentRound = 1 print(f"{replica_name} successfully initialized") @@ -119,14 +118,15 @@ def listen_for_start_round(nodeInstance, stop_event): url = f'{external_ip}:32049' # next_round = nodeInstance.currentRound + 1 - print(f"listening for start round {nodeInstance.currentRound}") + print(f"Listening for start of round {nodeInstance.currentRound}") headers = { 'User-Agent': 'AnyLog/1.23', - 'command': f'blockchain get r{nodeInstance.currentRound}' + 'command': f'blockchain get a{nodeInstance.currentRound}' } response = requests.get(f'http://{url}', headers=headers) + # check if aggregator's params have been posted if response.status_code == 200: data = response.json() print(f"Response Data: {data}") # Debugging line to inspect the structure @@ -134,18 +134,20 @@ def listen_for_start_round(nodeInstance, stop_event): round_data = None for item in data: # Check if the key exists in the current dictionary - if f'r{nodeInstance.currentRound}' in item: - round_data = item[f'r{nodeInstance.currentRound}'] + if f'a{nodeInstance.currentRound}' in item: + round_data = item[f'a{nodeInstance.currentRound}'] break # Stop searching once the current round's data is found if round_data: print(f"Round Data: {round_data}") # Debugging line paramsLink = round_data.get('initParams', '') + # error here at modelUpdate = line modelUpdate = nodeInstance.train_model_params(paramsLink, nodeInstance.currentRound) + print(modelUpdate); nodeInstance.add_node_params(nodeInstance.currentRound, modelUpdate) nodeInstance.currentRound += 1 else: - print(f"No data found for round r{nodeInstance.currentRound}") + print(f"No aggregator parameters found for round {nodeInstance.currentRound}") time.sleep(2) # Poll every 2 seconds From 5e19734b3855eec099e189d7b259d9470c6523b1 Mon Sep 17 00:00:00 2001 From: neabbas Date: Thu, 5 Dec 2024 23:48:12 -0800 Subject: [PATCH 2/4] multi node custom model working, still working on mnist mods --- blockchain/aggregator.py | 54 ++++++++++++++------ blockchain/aggregator_server.py | 90 +++++++++++++++++++++++++++++++-- blockchain/node.py | 74 ++++++++++++++++++++------- blockchain/node_server.py | 33 ++++++++++-- 4 files changed, 209 insertions(+), 42 deletions(-) diff --git a/blockchain/aggregator.py b/blockchain/aggregator.py index 192e5f9..6cc0bf5 100644 --- a/blockchain/aggregator.py +++ b/blockchain/aggregator.py @@ -11,6 +11,8 @@ from firebase_admin import credentials, db from ibmfl.model.model_update import ModelUpdate +import time + load_dotenv() @@ -54,18 +56,38 @@ def start_round(self, initParamsLink, roundNumber): "initParams": "{initParamsLink}" }} }}>''' - response = requests.post(url, headers=headers, data=data) - print(response.status_code) - if response.status_code == 200: - return { - 'status': 'success', - 'message': 'initTraining called successfully' - } - else: - return { - 'status': 'error', - 'message': f'Request failed with status code: {response.status_code}' - } + retries = 0; + max_retries = 5; + while retries < max_retries: + response = requests.post(url, headers=headers, data=data) + if response.status_code == 200: + print(f"Aggregator has submitted parameters for round {roundNumber} to the blockchain.") + return { + 'status': 'success', + 'message': 'Aggregator model parameters added successfully' + } + else: + print(f"Failed to add aggregator params to blockchain. Response: {response}. Retrying ({retries + 1}/{max_retries})...") + retries += 1; + time.sleep(15); + + return { + 'status': 'error', + 'message': 'aggregator was unable to add to blockchain' + } + + # response = requests.post(url, headers=headers, data=data) + # print(response.status_code) + # if response.status_code == 200: + # return { + # 'status': 'success', + # 'message': 'initTraining called successfully' + # } + # else: + # return { + # 'status': 'error', + # 'message': f'Request failed with status code: {response.status_code}' + # } except Exception as e: return { @@ -94,16 +116,12 @@ def aggregate_model_params(self, node_param_download_links): raise ValueError(f"Error retrieving data from link {link}: {str(e)}") # do aggregation function here (doesn't return anything) - print("params to aggregate: ", decoded_params) self.fusion_model.update_weights(decoded_params) aggregate_params_weights = self.fusion_model.current_model_weights aggregate_model_update = ModelUpdate(weights=aggregate_params_weights) - if hasattr(aggregate_model_update, '__dict__'): - print("Attributes of aggregate_model_update:", vars(aggregate_model_update)) # Prints attributes in a dictionary form - # encode params back to string encoded_params = self.encode_params(aggregate_model_update) @@ -138,3 +156,7 @@ def decode_params(self, encoded_model_update): serialized_data = zlib.decompress(compressed_data) model_weights = pickle.loads(serialized_data) return model_weights + + def inference(self, model, data): + results = model.evaluate(data) + return results \ No newline at end of file diff --git a/blockchain/aggregator_server.py b/blockchain/aggregator_server.py index 1ce1ffb..b4660ff 100644 --- a/blockchain/aggregator_server.py +++ b/blockchain/aggregator_server.py @@ -28,7 +28,7 @@ aggregator = Aggregator(PROVIDER_URL, PRIVATE_KEY) ''' -CURL REQUEST FOR DEPLOYING CONTRACT +CURL REQUEST FOR DEPLOYING CONTRACT-- custom dataset and model, 1 node curl -X POST http://localhost:8080/init \ -H "Content-Type: application/json" \ @@ -41,10 +41,68 @@ "model_name": "custom_test", "model_weights_path": "C:\\Users\\nehab\\cse115d\\model_weights.pt", "data_handler_path": "C:\\Users\\nehab\\cse115d_anylog_edgelake\\custom_data_handler.py", + "data_config": {"data": ["C:\\Users\\nehab\\cse115d_anylog_edgelake\\heart_data\\party_data\\party_0.csv"]} + +}' +''' + +''' +CURL REQUEST FOR DEPLOYING CONTRACT-- custom dataset and model, 2 nodes + +curl -X POST http://localhost:8080/init \ +-H "Content-Type: application/json" \ +-d '{ + "nodeUrls": [ + "http://localhost:8081", + "http://localhost:8082" + ], + "model_path": "C:\\Users\\nehab\\cse115d\\testmodel.py", + "model_init_params": { "module__input_dim": 14 }, + "model_name": "custom_test", + "model_weights_path": "C:\\Users\\nehab\\cse115d\\model_weights.pt", + "data_handler_path": "C:\\Users\\nehab\\cse115d_anylog_edgelake\\custom_data_handler.py", + "data_config": {"data": ["C:\\Users\\nehab\\cse115d_anylog_edgelake\\heart_data\\party_data\\party_0.csv", + "C:\\Users\\nehab\\cse115d_anylog_edgelake\\heart_data\\party_data\\party_1.csv"]} +}' +''' + +''' +CURL REQUEST FOR DEPLOYING CONTRACT-- mnist built in dataset and model, 2 nodes + +curl -X POST http://localhost:8080/init \ +-H "Content-Type: application/json" \ +-d '{ + "nodeUrls": [ + "http://localhost:8081", + "http://localhost:8082" + ], + "model_path": "C:\Users\\nehab\\cse115d\\Anylog-Edgelake-CSE115D\\federated-learning-lib-main\\examples\\iter_avg\\model_pytorch.py", + "model_init_params": { "module__input_dim": 14 }, + "model_name": "mnist_test", + "model_weights_path": "C:\\Users\\nehab\\cse115d\\model_weights.pt", + "data_handler_path": "C:\\Users\\nehab\\cse115d\\Anylog-Edgelake-CSE115D\\venv38\\Lib\\site-packages\\ibmfl\\util\\data_handlers\\mnist_pytorch_data_handler.py", + "data_config": {"npz_file": ["C:\\Users\\nehab\\cse115d\\Anylog-Edgelake-CSE115D\\blockchain\\data\\mnist\\data_party0.npz", + "C:\\Users\\nehab\\cse115d\\Anylog-Edgelake-CSE115D\\blockchain\\data\\mnist\\data_party1.npz"]} +}' +''' + + +''' +CURL REQUEST FOR DEPLOYING CONTRACT-- MNIST built-in dataset and model + +curl -X POST http://localhost:8080/init \ +-H "Content-Type: application/json" \ +-d '{ + "nodeUrls": [ + "http://localhost:8081" + ], + "model_weights_path": "C:\\Users\\nehab\\cse115d\\Anylog-Edgelake-CSE115D\\blockchain\\configs\\node\\pytorch\\pytorch_sequence.pt", + "data_handler_path": "C:\\Users\\nehab\\cse115d_anylog_edgelake\\custom_data_handler.py", "data_config": {"data": "C:\\Users\\nehab\\cse115d_anylog_edgelake\\heart_data\\party_data\\party_0.csv"} }' ''' +# with MNIST, we already have a pytorch @app.route('/init', methods=['POST']) @@ -56,7 +114,7 @@ def deploy_contract(): model_path = data.get('model_path', os.getenv('MODEL_PYTHON')) model_init_params = data.get('model_init_params', None) - model_name = data.get('model_name', 'custom_test') + model_name = data.get('model_name', 'model') model_weights_path = data.get('model_weights_path') # upload model to firebase @@ -193,7 +251,7 @@ def initialize_nodes(firebase_model_path, firebase_datahandler_path, node_urls): -H "Content-Type: application/json" \ -d '{ "totalRounds": 5, - "minParams": 1 + "minParams": 2 }' ''' @@ -217,7 +275,7 @@ async def init_training(): for r in range(1, num_rounds + 1): print(f"Starting round {r}") aggregator.start_round(initialParams, r) - print("Sent initial parameters to nodes") + print("Finished start_round function") # Listen for updates from nodes newAggregatorParams = await listen_for_update_agg(min_params, r) print("Received aggregated parameters") @@ -257,6 +315,8 @@ async def listen_for_update_agg(min_params, roundNumber): if params_response.status_code == 200: result = params_response.json() + # print(f"blockchain get r{roundNumber} returns {result}") + if result and len(result) > 0: # Extract all trained_params into a list node_params_links = [ @@ -278,6 +338,28 @@ async def listen_for_update_agg(min_params, roundNumber): await asyncio.sleep(2) +# added inference endpoint +@app.route('/inference', methods=['POST']) +def inference(): + """Inference on current model w/ data passed in.""" + try: + data = request.json + test_data = data.get('data', {}) + + results = aggregator.inference(aggregator.fusion_model.fl_model, test_data) + + response = { + 'status': 'success', + 'message': 'Inference completed successfully', + 'model_accuracy': results['acc'] * 100, + 'classification_report': results['classificatio_report'] + } + + return jsonify(response) + + except Exception as e: + return jsonify({'status': 'error', 'message': str(e)}), 500 + if __name__ == '__main__': # Add argument parsing to make the port configurable diff --git a/blockchain/node.py b/blockchain/node.py index 7aab9de..1d5b45f 100644 --- a/blockchain/node.py +++ b/blockchain/node.py @@ -45,12 +45,12 @@ def __init__(self, firebase_model_path, firebase_datahandler_path, replica_name) self.currentRound = 1 # download model from firebase - fl_model = self.load_firebase_model(firebase_model_path); + self.fl_model = self.load_firebase_model(firebase_model_path); # download datahandler from firebase - data_handler = self.load_firebase_datahandler(firebase_datahandler_path); + self.data_handler = self.load_firebase_datahandler(firebase_datahandler_path); # create the local training handler - self.local_training_handler = LocalTrainingHandler(fl_model=fl_model, data_handler=data_handler) + self.local_training_handler = LocalTrainingHandler(fl_model=self.fl_model, data_handler=self.data_handler) # model_def == 1: PytorchFLModel @@ -79,21 +79,19 @@ def load_firebase_model(self, firebase_model_path): print("Error: No model data found in Firebase.") return - print('downloaded model_data') + print('Downloaded model_data from Firebase') # derive fields, then code source code and weights model_source_code = decode_from_base64(model_data["source_code"]).decode("utf-8") model_weights = decode_from_base64(model_data["weights"]) init_params = model_data["init_params"] - print('derived fields') - # Save the downloaded weights to a file-- necessary to load the model later downloaded_weights_path = "downloaded_model_weights.pt" with open(downloaded_weights_path, "wb") as f: f.write(model_weights) - print('saved weights to file') + print('Saved weights to local file') # Dynamically recreate the model class downloaded_namespace = {} @@ -135,7 +133,7 @@ def load_firebase_datahandler(self, firebase_datahandler_path): print("Error: No DataHandler data found in Firebase.") return - print('got datahandler data from firebase') + print('Datahandler data acquired from firebase') # decode the source code and configuration datahandler_source_code = decode_from_base64(datahandler_data["source_code"]).decode("utf-8") @@ -157,8 +155,15 @@ def load_firebase_datahandler(self, firebase_datahandler_path): else: print("Recreated DataHandler class:", datahandler_class) + # data config contains the data paths for all nodes + # usually, i think it'd make more sense for the nodes to set this in their local .env files + # but for now, data config contains all paths-- we can access this specific node's from the replica name + replicaNumber = int(self.replicaName[-1]) + personal_data_config = {next(iter(data_config)): data_config["data"][replicaNumber]} + + # initialize the datahandler with the configuration - data_handler = datahandler_class(data_config=data_config) + data_handler = datahandler_class(data_config=personal_data_config) print("DataHandler successfully reconstructed and initialized.") return data_handler @@ -194,13 +199,35 @@ def add_node_params(self, round_number, newly_trained_params_db_link): "trained_params": "{newly_trained_params_db_link}" }} }}>''' - response = requests.post(url, headers=headers, data=data) - print(f"{self.replicaName} has submitted results for round {round_number}") - + retries = 0; + max_retries = 5; + while retries < max_retries: + response = requests.post(url, headers=headers, data=data) + if response.status_code == 200: + print(f"{self.replicaName} has submitted results for round {round_number}") + return { + 'status': 'success', + 'message': 'node model parameters added successfully' + } + else: + print(f"Failed to add node {self.replicaName} params to blockchain. Response: {response}. Retrying ({retries + 1}/{max_retries})...") + retries += 1; + time.sleep(15); + return { - 'status': 'success', - 'message': 'node model parameters added successfully' - } + 'status': 'error', + 'message': 'node was unable to add to blockchain' + } + + # response = requests.post(url, headers=headers, data=data) + # print("response after addding node data to blockchain ", response); + + # print(f"{self.replicaName} has submitted results for round {round_number}") + + # return { + # 'status': 'success', + # 'message': 'node model parameters added successfully' + # } except Exception as e: return { @@ -221,12 +248,12 @@ def train_model_params(self, aggregator_model_params_db_link, round_number): # First round initialization if round_number == 1: - print('initializing weights, round1') + #print('initializing weights, round1') weights = self.local_training_handler.fl_model.get_model_update() - print("round 1 weights", weights) + #print("round 1 weights", weights) else: try: - print('round1+, getting weights from aggregator') + #print('round1+, getting weights from aggregator') # Extract the key from the URL model_updates_key = aggregator_model_params_db_link.split('/')[-1].replace('.json', '') @@ -258,7 +285,7 @@ def train_model_params(self, aggregator_model_params_db_link, round_number): 'model_update': encoded_params }) - print('pushed weights to db') + print('Pushed weights to Firebase') return f"{self.database_url}/node_model_updates/{data_pushed.key}.json" @@ -273,3 +300,12 @@ def decode_params(self, encoded_model_update): serialized_data = zlib.decompress(compressed_data) model_weights = pickle.loads(serialized_data) return model_weights + + # modified to get test data from datahandler + def inference(self, data): + data1 = self.data_handler.get_data() + print("got data from inference handler") + data_test = data1[1]; + results = self.fl_model.evaluate(data_test) + print("results ", results); + return results diff --git a/blockchain/node_server.py b/blockchain/node_server.py index 72baa7e..250321d 100644 --- a/blockchain/node_server.py +++ b/blockchain/node_server.py @@ -129,7 +129,7 @@ def listen_for_start_round(nodeInstance, stop_event): # check if aggregator's params have been posted if response.status_code == 200: data = response.json() - print(f"Response Data: {data}") # Debugging line to inspect the structure + # print(f"Response Data: {data}") # Debugging line to inspect the structure round_data = None for item in data: @@ -141,9 +141,8 @@ def listen_for_start_round(nodeInstance, stop_event): if round_data: print(f"Round Data: {round_data}") # Debugging line paramsLink = round_data.get('initParams', '') - # error here at modelUpdate = line modelUpdate = nodeInstance.train_model_params(paramsLink, nodeInstance.currentRound) - print(modelUpdate); + # print(modelUpdate); nodeInstance.add_node_params(nodeInstance.currentRound, modelUpdate) nodeInstance.currentRound += 1 else: @@ -155,6 +154,34 @@ def listen_for_start_round(nodeInstance, stop_event): print(f"Error in listener thread: {str(e)}") time.sleep(2) +#inference untested +''' +curl -X POST http://localhost:8082/inference \ +-H "Content-Type: application/json" \ +-d '{ +}' +''' +@app.route('/inference', methods=['POST']) +def inference(): + """Inference on current model w/ data passed in.""" + try: + data = request.json + test_data = data.get('data', {}) + + results = node_instance.inference(test_data) + + print(results) + + response = { + 'status': 'success', + 'message': 'Inference completed successfully', + 'model_accuracy': results['accuracy_score'] * 100, + } + + return jsonify(response) + + except Exception as e: + return jsonify({'status': 'error', 'message': str(e)}), 500 if __name__ == '__main__': # Set up argument parser From 646c7f8f22403e38a9f41b8fafbf7c9420d419fc Mon Sep 17 00:00:00 2001 From: neabbas Date: Fri, 6 Dec 2024 01:52:34 -0800 Subject: [PATCH 3/4] works with both custom and mnist datasets, but has some blockchain bugs-- still runs fully without issue somehow --- blockchain/aggregator_server.py | 105 +++++++++++++++++++++----------- blockchain/node.py | 41 ++++++++----- 2 files changed, 98 insertions(+), 48 deletions(-) diff --git a/blockchain/aggregator_server.py b/blockchain/aggregator_server.py index b4660ff..d98e265 100644 --- a/blockchain/aggregator_server.py +++ b/blockchain/aggregator_server.py @@ -66,9 +66,26 @@ }' ''' -''' -CURL REQUEST FOR DEPLOYING CONTRACT-- mnist built in dataset and model, 2 nodes +# ''' +# CURL REQUEST FOR DEPLOYING CONTRACT-- mnist built in dataset and model, 2 nodes + +# curl -X POST http://localhost:8080/init \ +# -H "Content-Type: application/json" \ +# -d '{ +# "nodeUrls": [ +# "http://localhost:8081", +# "http://localhost:8082" +# ], +# "model_path": "C:\Users\\nehab\\cse115d\\Anylog-Edgelake-CSE115D\\federated-learning-lib-main\\examples\\iter_avg\\model_pytorch.py", +# "model_name": "mnist_test", +# "model_weights_path": "C:\\Users\\nehab\\cse115d\\model_weights.pt", +# "data_handler_path": "C:\\Users\\nehab\\cse115d\\Anylog-Edgelake-CSE115D\\venv38\\Lib\\site-packages\\ibmfl\\util\\data_handlers\\mnist_pytorch_data_handler.py", +# "data_config": {"npz_file": ["C:\\Users\\nehab\\cse115d\\Anylog-Edgelake-CSE115D\\blockchain\\data\\mnist\\data_party0.npz", +# "C:\\Users\\nehab\\cse115d\\Anylog-Edgelake-CSE115D\\blockchain\\data\\mnist\\data_party1.npz"]} +# }' +# ''' +''' curl -X POST http://localhost:8080/init \ -H "Content-Type: application/json" \ -d '{ @@ -76,34 +93,18 @@ "http://localhost:8081", "http://localhost:8082" ], - "model_path": "C:\Users\\nehab\\cse115d\\Anylog-Edgelake-CSE115D\\federated-learning-lib-main\\examples\\iter_avg\\model_pytorch.py", - "model_init_params": { "module__input_dim": 14 }, + "model_path": "C:\\Users\\nehab\\cse115d\\Anylog-Edgelake-CSE115D\\federated-learning-lib-main\\examples\\iter_avg\\model_pytorch.py", "model_name": "mnist_test", "model_weights_path": "C:\\Users\\nehab\\cse115d\\model_weights.pt", "data_handler_path": "C:\\Users\\nehab\\cse115d\\Anylog-Edgelake-CSE115D\\venv38\\Lib\\site-packages\\ibmfl\\util\\data_handlers\\mnist_pytorch_data_handler.py", - "data_config": {"npz_file": ["C:\\Users\\nehab\\cse115d\\Anylog-Edgelake-CSE115D\\blockchain\\data\\mnist\\data_party0.npz", - "C:\\Users\\nehab\\cse115d\\Anylog-Edgelake-CSE115D\\blockchain\\data\\mnist\\data_party1.npz"]} -}' -''' - - -''' -CURL REQUEST FOR DEPLOYING CONTRACT-- MNIST built-in dataset and model - -curl -X POST http://localhost:8080/init \ --H "Content-Type: application/json" \ --d '{ - "nodeUrls": [ - "http://localhost:8081" - ], - "model_weights_path": "C:\\Users\\nehab\\cse115d\\Anylog-Edgelake-CSE115D\\blockchain\\configs\\node\\pytorch\\pytorch_sequence.pt", - "data_handler_path": "C:\\Users\\nehab\\cse115d_anylog_edgelake\\custom_data_handler.py", - "data_config": {"data": "C:\\Users\\nehab\\cse115d_anylog_edgelake\\heart_data\\party_data\\party_0.csv"} - + "data_config": { + "npz_file": [ + "C:\\Users\\nehab\\cse115d\\Anylog-Edgelake-CSE115D\\blockchain\\data\\mnist\\data_party0.npz", + "C:\\Users\\nehab\\cse115d\\Anylog-Edgelake-CSE115D\\blockchain\\data\\mnist\\data_party1.npz" + ] + } }' ''' -# with MNIST, we already have a pytorch - @app.route('/init', methods=['POST']) def deploy_contract(): @@ -155,19 +156,54 @@ def pytorch_upload(model_file_path, model_init_params, model_name, firebase_mode break if model_class is None: - raise ValueError("No PyTorch model class found in the specified file.") + print("No PyTorch model class found in the specified file.") + + print("Searching for nn.Sequential object...") + + # set up fields necessary for get_model_config + + # temporary folder for model serialization + folder_configs = os.path.join(os.getcwd(), "model"); + model_weights_path = os.path.join(folder_configs, "pytorch_sequence.pt"); + dataset = None; # this isn't even used in the model so let's skip for now + is_agg = False; # this is being uploaded for nodes to use, we want an actual model + party_id = None; # also isn't even used + + get_model_config = namespace['get_model_config'] + model_config = get_model_config(folder_configs, dataset, is_agg, party_id) + + if model_config is None or "spec" not in model_config: + raise ValueError("Failed to retrieve a valid model configuration.") + + #print("Model Config: ", model_config) + + # build model_specs + model_specs = model_config['spec'] + print("model specs: ", model_specs) + + # Initialize PytorchFLModel + fl_model = PytorchFLModel( + model_name="Pytorch_NN", + model_spec=model_specs + ) + + else: print("Identified model class:", model_class) - # initialize model - fl_model = PytorchFLModel( - model_name=model_name, - pytorch_module=model_class, - module_init_params=model_init_params, - ) + # Initialize PytorchFLModel + fl_model = PytorchFLModel( + model_name="Pytorch_NN", + pytorch_module=model_class, + module_init_params=model_init_params, + ) + + # Save model weights + model_weights_path = os.path.join(os.getcwd(), "model\\pytorch_sequence.pt"); + fl_model.save_model(filename=model_weights_path) + + model_specs = None - # save current model weights (empty for now) - fl_model.save_model(filename=model_weights_path) # Encode the source code and model weights encoded_source_code = encode_to_base64(model_source_code) @@ -179,6 +215,7 @@ def pytorch_upload(model_file_path, model_init_params, model_name, firebase_mode "source_code": encoded_source_code, "weights": encoded_weights, "init_params": model_init_params, + "model_spec": model_specs } firebase_model_ref = db.reference(firebase_model_path) diff --git a/blockchain/node.py b/blockchain/node.py index 1d5b45f..aeeefad 100644 --- a/blockchain/node.py +++ b/blockchain/node.py @@ -84,7 +84,8 @@ def load_firebase_model(self, firebase_model_path): # derive fields, then code source code and weights model_source_code = decode_from_base64(model_data["source_code"]).decode("utf-8") model_weights = decode_from_base64(model_data["weights"]) - init_params = model_data["init_params"] + init_params = model_data.get("init_params", None) + model_specs = model_data.get("model_spec", None) # Save the downloaded weights to a file-- necessary to load the model later downloaded_weights_path = "downloaded_model_weights.pt" @@ -105,21 +106,32 @@ def load_firebase_model(self, firebase_model_path): break if downloaded_model_class is None: - raise ValueError("No PyTorch model class found in the downloaded source code.") + print("No PyTorch model class found in the downloaded source code.") + + print("Creating nn.Sequential version...") + + # set up fields necessary for get_model_config + + fl_model = PytorchFLModel( + model_name="Pytorch_NN", + model_spec=model_specs + ) else: print("Recreated model class:", downloaded_model_class) - # recreate the PytorchFLModel and load weights - fl_model = PytorchFLModel( - model_name="Pytorch_NN", - pytorch_module=downloaded_model_class, - module_init_params=init_params, - ) - fl_model.load_model( - pytorch_module=downloaded_model_class, - model_filename=downloaded_weights_path, - module_init_params=init_params, - ) + # Reinitialize the PytorchFLModel and load the weights + fl_model = PytorchFLModel( + model_name="Pytorch_NN", + pytorch_module=downloaded_model_class, + module_init_params=init_params, + ) + + fl_model.load_model( + pytorch_module=downloaded_model_class, + model_filename=downloaded_weights_path, + module_init_params=init_params, + ) + print("PytorchFLModel successfully reconstructed and loaded.") return fl_model @@ -159,7 +171,8 @@ def load_firebase_datahandler(self, firebase_datahandler_path): # usually, i think it'd make more sense for the nodes to set this in their local .env files # but for now, data config contains all paths-- we can access this specific node's from the replica name replicaNumber = int(self.replicaName[-1]) - personal_data_config = {next(iter(data_config)): data_config["data"][replicaNumber]} + key = next(iter(data_config)) + personal_data_config = {key: data_config[key][replicaNumber]} # initialize the datahandler with the configuration From d5a81f19076a13e036c71f60027658e899769416 Mon Sep 17 00:00:00 2001 From: neabbas Date: Fri, 6 Dec 2024 15:28:54 -0800 Subject: [PATCH 4/4] updated readme working code --- README.md | 215 +++++++++++++++++++++++++++++++- blockchain/aggregator.py | 60 ++++----- blockchain/aggregator_server.py | 44 ++++--- blockchain/node.py | 52 ++++---- 4 files changed, 295 insertions(+), 76 deletions(-) diff --git a/README.md b/README.md index b32a6f3..a197ac2 100644 --- a/README.md +++ b/README.md @@ -1 +1,214 @@ -# Anylog-Edgelake-CSE115D \ No newline at end of file +# Federated Learning -- AnyLog EdgeLake + + + +## Why EdgeLake: + + **Efficiency:** Reduces data transfer by sharing model parameters instead of sharing entire datasets. + +- **Performance:** Our distributed system enables high-computation training to be split across multiple nodes, significantly enhancing speed and scalability compared to single-node processing. . + +- **Privacy:** Keeps data on nodes, minimizing exposure risks and enhancing security with privacy-preserving technologies such as blockchain. + + +## Prerequisites + +- pip install -r requirements.txt +### Software Requirements +- Python 3.7+ installed. + +### Repository Access +Clone the EdgeLake repository: +```bash +git clone https://github.com/EdgeLake/EdgeLake +``` + +### Files to Prepare + +- **master.env**: Ensure this file is available for configuration. +- **start-script.al**: This script is necessary for proper configuration. +- **Example CURL commands file**: Keep this handy for testing and reference. +- **/blockchain/.env**: Used to specify system variables, file paths, etc. Update with values corresponding to your system. + +--- + +## Setup Instructions + +### Step 1: Open the Project +- Open PyCharm and load the cloned EdgeLake repository. + +### Step 2: Run Initial Setup +- Locate the file `edgelake.py` within the `edge_lake` directory in PyCharm. +- Click on the file and press **Run**. Allow the file to execute and then stop the run. + +### Step 3: Edit Configuration +- Navigate to **Run > Edit Configurations** in PyCharm. +- Perform the following edits: + + #### 1. Paths to .env files: + - Add the path to the `master.env` file in the appropriate section. + - Open `master.env` and modify the `IP address` and `RPC provider` to match your environment. + + #### 2. Script Parameters: + - Add the command below to the `Script parameters` section: + ```bash + process [path_to]/CSE-115D-start-script.al + ``` + Replace `[path_to]` with the actual file path to `start-script.al`. + +### Step 4: Run EdgeLake + + +### Step 5: Testing with CURL Commands + + +''' +CURL REQUEST FOR DEPLOYING CONTRACT-- General Form + +curl -X POST http://localhost:[AGGREGATOR_PORT]/init \ +-H "Content-Type: application/json" \ +-d '{ + "nodeUrls": [ + "http://localhost:[NODE_0_PORT]", + "http://localhost:[NODE_1_PORT]" + ], + "model_path": "[FILE_PATH_TO_PYTORCH_MODEL_SOURCE_CODE]", + "model_init_params": [OPTIONAL, IF YOUR PYTORCH MODEL HAS ANY], + "model_name": "[MODEL_NAME]", + "model_weights_path": "[WHERE YOU WANT MODEL WEIGHTS SAVED]", + "data_handler_path": "[FILE_PATH_TO_DATA_HANDLER_SOURCE_CODE]", + "data_config": {"data": ["[DATA_FILE_FOR_NODE_0]", + "[DATA_FILE_FOR_NODE_1]]} +}' +''' + + +''' +CURL REQUEST FOR DEPLOYING CONTRACT-- custom dataset and model, 1 node + +curl -X POST http://localhost:8080/init \ +-H "Content-Type: application/json" \ +-d '{ + "nodeUrls": [ + "http://localhost:8081" + ], + "model_path": "C:\\Users\\nehab\\cse115d\\testmodel.py", + "model_init_params": { "module__input_dim": 14 }, + "model_name": "custom_test", + "model_weights_path": "C:\\Users\\nehab\\cse115d\\model_weights.pt", + "data_handler_path": "C:\\Users\\nehab\\cse115d_anylog_edgelake\\custom_data_handler.py", + "data_config": {"data": ["C:\\Users\\nehab\\cse115d_anylog_edgelake\\heart_data\\party_data\\party_0.csv"]} + +}' +''' + +''' +CURL REQUEST FOR DEPLOYING CONTRACT-- custom dataset and model, 2 nodes + +curl -X POST http://localhost:8080/init \ +-H "Content-Type: application/json" \ +-d '{ + "nodeUrls": [ + "http://localhost:8081", + "http://localhost:8082" + ], + "model_path": "C:\\Users\\nehab\\cse115d\\testmodel.py", + "model_init_params": { "module__input_dim": 14 }, + "model_name": "custom_test", + "model_weights_path": "C:\\Users\\nehab\\cse115d\\model_weights.pt", + "data_handler_path": "C:\\Users\\nehab\\cse115d_anylog_edgelake\\custom_data_handler.py", + "data_config": {"data": ["C:\\Users\\nehab\\cse115d_anylog_edgelake\\heart_data\\party_data\\party_0.csv", + "C:\\Users\\nehab\\cse115d_anylog_edgelake\\heart_data\\party_data\\party_1.csv"]} +}' +''' + +''' +CURL REQUEST FOR DEPLOYING CONTRACT-- mnist built in dataset and model, 2 nodes + +curl -X POST http://localhost:8080/init \ +-H "Content-Type: application/json" \ +-d '{ + "nodeUrls": [ + "http://localhost:8081", + "http://localhost:8082" + ], + "model_path": "C:\\Users\\nehab\\cse115d\\Anylog-Edgelake-CSE115D\\federated-learning-lib-main\\examples\\iter_avg\\model_pytorch.py", + "model_name": "mnist_test", + "model_weights_path": "C:\\Users\\nehab\\cse115d\\model_weights.pt", + "data_handler_path": "C:\\Users\\nehab\\cse115d\\Anylog-Edgelake-CSE115D\\venv38\\Lib\\site-packages\\ibmfl\\util\\data_handlers\\mnist_pytorch_data_handler.py", + "data_config": { + "npz_file": [ + "C:\\Users\\nehab\\cse115d\\Anylog-Edgelake-CSE115D\\blockchain\\data\\mnist\\data_party0.npz", + "C:\\Users\\nehab\\cse115d\\Anylog-Edgelake-CSE115D\\blockchain\\data\\mnist\\data_party1.npz" + ] + } +}' +''' + +''' +CURL REQUEST FOR DEPLOYING CONTRACT-- mnist dataset and sql model, 2 nodes + +curl -X POST http://localhost:8080/init \ +-H "Content-Type: application/json" \ +-d '{ + "nodeUrls": [ + "http://localhost:8081", + "http://localhost:8082" + ], + "model_path": "C:\\Users\\nehab\\cse115d\\Anylog-Edgelake-CSE115D\\federated-learning-lib-main\\examples\\iter_avg\\model_pytorch.py", + "model_name": "mnist_test", + "model_weights_path": "C:\\Users\\nehab\\cse115d\\model_weights.pt", + "data_handler_path": "C:\\Users\\nehab\\cse115d\\Anylog-Edgelake-CSE115D\\blockchain\\custom_sql_datahandler.py", + "data_config": { + "npz_file": [ + "C:\\Users\\nehab\\cse115d\\Anylog-Edgelake-CSE115D\\blockchain\\data\\mnist\\data_party0.npz", + "C:\\Users\\nehab\\cse115d\\Anylog-Edgelake-CSE115D\\blockchain\\data\\mnist\\data_party1.npz" + ] + } +}' +''' + +--- + +## Automating CURL Commands + + +```python +import subprocess + +# Define CURL commands +curl_commands = [ + "curl -X POST -H 'Content-Type: application/json' -d '{\"key\": \"value\"}' http://127.0.0.1:5000/api", + "curl -X GET http://127.0.0.1:5000/api/status", + # Add more commands here... +] + +# Execute each command +for cmd in curl_commands: + try: + result = subprocess.run(cmd, shell=True, check=True, text=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + print(f"Command: {cmd}\nOutput:\n{result.stdout}") + except subprocess.CalledProcessError as e: + print(f"Error executing command: {cmd}\nError:\n{e.stderr}") +``` + + + + +### CURL Issues +- If CURL commands fail: + - Verify the EdgeLake service is running. + - Check network connectivity and server status. + +--- diff --git a/blockchain/aggregator.py b/blockchain/aggregator.py index 6cc0bf5..f45b73d 100644 --- a/blockchain/aggregator.py +++ b/blockchain/aggregator.py @@ -56,38 +56,38 @@ def start_round(self, initParamsLink, roundNumber): "initParams": "{initParamsLink}" }} }}>''' - retries = 0; - max_retries = 5; - while retries < max_retries: - response = requests.post(url, headers=headers, data=data) - if response.status_code == 200: - print(f"Aggregator has submitted parameters for round {roundNumber} to the blockchain.") - return { - 'status': 'success', - 'message': 'Aggregator model parameters added successfully' - } - else: - print(f"Failed to add aggregator params to blockchain. Response: {response}. Retrying ({retries + 1}/{max_retries})...") - retries += 1; - time.sleep(15); + # retries = 0; + # max_retries = 5; + # while retries < max_retries: + # response = requests.post(url, headers=headers, data=data) + # if response.status_code == 200: + # print(f"Aggregator has submitted parameters for round {roundNumber} to the blockchain.") + # return { + # 'status': 'success', + # 'message': 'Aggregator model parameters added successfully' + # } + # else: + # print(f"Failed to add aggregator params to blockchain. Response: {response}. Retrying ({retries + 1}/{max_retries})...") + # retries += 1; + # time.sleep(15); - return { - 'status': 'error', - 'message': 'aggregator was unable to add to blockchain' - } + # return { + # 'status': 'error', + # 'message': 'aggregator was unable to add to blockchain' + # } - # response = requests.post(url, headers=headers, data=data) - # print(response.status_code) - # if response.status_code == 200: - # return { - # 'status': 'success', - # 'message': 'initTraining called successfully' - # } - # else: - # return { - # 'status': 'error', - # 'message': f'Request failed with status code: {response.status_code}' - # } + response = requests.post(url, headers=headers, data=data) + print(response.status_code) + if response.status_code == 200: + return { + 'status': 'success', + 'message': 'initTraining called successfully' + } + else: + return { + 'status': 'error', + 'message': f'Request failed with status code: {response.status_code}' + } except Exception as e: return { diff --git a/blockchain/aggregator_server.py b/blockchain/aggregator_server.py index d98e265..0adf018 100644 --- a/blockchain/aggregator_server.py +++ b/blockchain/aggregator_server.py @@ -66,26 +66,9 @@ }' ''' -# ''' -# CURL REQUEST FOR DEPLOYING CONTRACT-- mnist built in dataset and model, 2 nodes - -# curl -X POST http://localhost:8080/init \ -# -H "Content-Type: application/json" \ -# -d '{ -# "nodeUrls": [ -# "http://localhost:8081", -# "http://localhost:8082" -# ], -# "model_path": "C:\Users\\nehab\\cse115d\\Anylog-Edgelake-CSE115D\\federated-learning-lib-main\\examples\\iter_avg\\model_pytorch.py", -# "model_name": "mnist_test", -# "model_weights_path": "C:\\Users\\nehab\\cse115d\\model_weights.pt", -# "data_handler_path": "C:\\Users\\nehab\\cse115d\\Anylog-Edgelake-CSE115D\\venv38\\Lib\\site-packages\\ibmfl\\util\\data_handlers\\mnist_pytorch_data_handler.py", -# "data_config": {"npz_file": ["C:\\Users\\nehab\\cse115d\\Anylog-Edgelake-CSE115D\\blockchain\\data\\mnist\\data_party0.npz", -# "C:\\Users\\nehab\\cse115d\\Anylog-Edgelake-CSE115D\\blockchain\\data\\mnist\\data_party1.npz"]} -# }' -# ''' - ''' +CURL REQUEST FOR DEPLOYING CONTRACT-- mnist built in dataset and model, 2 nodes + curl -X POST http://localhost:8080/init \ -H "Content-Type: application/json" \ -d '{ @@ -106,6 +89,29 @@ }' ''' +''' +CURL REQUEST FOR DEPLOYING CONTRACT-- mnist dataset and sql model, 2 nodes + +curl -X POST http://localhost:8080/init \ +-H "Content-Type: application/json" \ +-d '{ + "nodeUrls": [ + "http://localhost:8081", + "http://localhost:8082" + ], + "model_path": "C:\\Users\\nehab\\cse115d\\Anylog-Edgelake-CSE115D\\federated-learning-lib-main\\examples\\iter_avg\\model_pytorch.py", + "model_name": "mnist_test", + "model_weights_path": "C:\\Users\\nehab\\cse115d\\model_weights.pt", + "data_handler_path": "C:\\Users\\nehab\\cse115d\\Anylog-Edgelake-CSE115D\\blockchain\\custom_sql_datahandler.py", + "data_config": { + "npz_file": [ + "C:\\Users\\nehab\\cse115d\\Anylog-Edgelake-CSE115D\\blockchain\\data\\mnist\\data_party0.npz", + "C:\\Users\\nehab\\cse115d\\Anylog-Edgelake-CSE115D\\blockchain\\data\\mnist\\data_party1.npz" + ] + } +}' +''' + @app.route('/init', methods=['POST']) def deploy_contract(): """Deploy the smart contract with predefined nodes.""" diff --git a/blockchain/node.py b/blockchain/node.py index aeeefad..474280f 100644 --- a/blockchain/node.py +++ b/blockchain/node.py @@ -81,7 +81,7 @@ def load_firebase_model(self, firebase_model_path): print('Downloaded model_data from Firebase') - # derive fields, then code source code and weights + # derive fields, then decode source code and weights model_source_code = decode_from_base64(model_data["source_code"]).decode("utf-8") model_weights = decode_from_base64(model_data["weights"]) init_params = model_data.get("init_params", None) @@ -212,35 +212,35 @@ def add_node_params(self, round_number, newly_trained_params_db_link): "trained_params": "{newly_trained_params_db_link}" }} }}>''' - retries = 0; - max_retries = 5; - while retries < max_retries: - response = requests.post(url, headers=headers, data=data) - if response.status_code == 200: - print(f"{self.replicaName} has submitted results for round {round_number}") - return { - 'status': 'success', - 'message': 'node model parameters added successfully' - } - else: - print(f"Failed to add node {self.replicaName} params to blockchain. Response: {response}. Retrying ({retries + 1}/{max_retries})...") - retries += 1; - time.sleep(15); + # retries = 0; + # max_retries = 5; + # while retries < max_retries: + # response = requests.post(url, headers=headers, data=data) + # if response.status_code == 200: + # print(f"{self.replicaName} has submitted results for round {round_number}") + # return { + # 'status': 'success', + # 'message': 'node model parameters added successfully' + # } + # else: + # print(f"Failed to add node {self.replicaName} params to blockchain. Response: {response}. Retrying ({retries + 1}/{max_retries})...") + # retries += 1; + # time.sleep(15); - return { - 'status': 'error', - 'message': 'node was unable to add to blockchain' - } + # return { + # 'status': 'error', + # 'message': 'node was unable to add to blockchain' + # } - # response = requests.post(url, headers=headers, data=data) - # print("response after addding node data to blockchain ", response); + response = requests.post(url, headers=headers, data=data) + print("response after addding node data to blockchain ", response); - # print(f"{self.replicaName} has submitted results for round {round_number}") + print(f"{self.replicaName} has submitted results for round {round_number}") - # return { - # 'status': 'success', - # 'message': 'node model parameters added successfully' - # } + return { + 'status': 'success', + 'message': 'node model parameters added successfully' + } except Exception as e: return {