Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion EdgeLake/docker_makefile/edgelake_operator1.env
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ DB_USER="demo"
# Password correlated to database user
DB_PASSWD="passwd"
# Database IP address
DB_IP=192.168.1.125
DB_IP=192.168.108.182
# Database port number
DB_PORT=5432
# Whether to set autocommit data
Expand Down
2 changes: 1 addition & 1 deletion EdgeLake/docker_makefile/edgelake_operator2.env
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ DB_USER="demo"
# Password correlated to database user
DB_PASSWD="passwd"
# Database IP address
DB_IP=192.168.1.125
DB_IP=192.168.108.182
# Database port number
DB_PORT=5432
# Whether to set autocommit data
Expand Down
2 changes: 1 addition & 1 deletion EdgeLake/docker_makefile/edgelake_operator3.env
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ DB_USER="demo"
# Password correlated to database user
DB_PASSWD="passwd"
# Database IP address
DB_IP=192.168.1.125
DB_IP=192.168.108.182
# Database port number
DB_PORT=5432
# Whether to set autocommit data
Expand Down
25 changes: 20 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,8 @@ the listed variables in the following files `mnist1.env`, `mnist2.env`, `mnist3.

Note that you do not need to change the ports, they're preconfigured to work.

In addition, update the IP address in for the `EXTERNAL_TCP_IP_PORT` and `EXTERNAL_IP` in the `mnist-agg.env` file.
In addition, update the IP address in for the `EXTERNAL_TCP_IP_PORT` and `EXTERNAL_IP` in the `mnist-agg.env` file.
Double check also to make sure that your `GITHUB_DIR` is pointing the right directory.

Now we are ready to start the simulation.

Expand Down Expand Up @@ -173,7 +174,7 @@ curl -X POST http://localhost:8080/start-training \
```

`totalRounds` defines how many continuous rounds to train for. `minParams` defines how many parameters
the aggregator should wait for before starting the next round.
the aggregator should wait for before starting the next round. `index` defines the specific model that is being trained (Note: you may run multiple models/indices on a singular training node, but make sure to wait for initialization before sending a new "init" to ensure no data overwrites).

At any point during the training process, you can add additional nodes to the process by calling initialization again on the new nodes
(must use the same `index`) and `minParams` will be dynamically adjusted as necessary.
Expand Down Expand Up @@ -461,14 +462,28 @@ To take down the containers, simply run:
docker compose down
```

## Decentralized Federated Learning

[Decentralized Federated Learning](https://ieeexplore.ieee.org/document/10743046) is a more secure approach to the basic “centralized” federated learning approach (as seen above), where there is no aggregator node anymore, but rather, each training node acts as their OWN aggregator. The way it works is that each training node waits for a `minParams` number of nodes before it performs the aggregation itself (rather than having one global aggregator do it for all of them). With centralized FL, having an aggregator poses security concerns and lack of reliance since the data is aggregated only with one server. With DFL, having training nodes do aggregation themselves allows for fault-tolerance in case any of them fail.

Example: With node 1, node 2, and node 3, all three first publish their initial params to the blockchain, assuming `minParams` = 2. Node 1 will wait for node 2 and node 3’s initial params, and once fetched, it will aggregate, and node 1 will then publish those model params to the blockchain. Node 2 will do the same, but for node 1 and node 3, and so forth.

Instead of starting the aggregator server (8080), you only need to initialize the training nodes. Example with training node 1:

```
curl -X POST http://localhost:8081/self-init -H "Content-Type: application/json" -d '{
"index": "test-index",
"replica_name": "node1",
"module_name": "MnistDataHandler",
"module_file": "custom_data_handler.py",
"db_name": "mnist_fl",
"min_params": 2,
"max_rounds": 10
}'
```
If using three training nodes, you want to run these commands three times, focusing on each node (change the replica name and port). Be sure that across all three curls, `min_params` and `max_rounds` are the same.




_Note: DFL is still a work in progress for smaller features. Currently supports: dockerization of APIs, edge inference, running multiple models simultaneously; Not supported: updating minParams, continued training_



Expand Down
8 changes: 4 additions & 4 deletions edgefl/env_files/mnist/mnist-agg.env
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,23 @@
### Note that paths are for Mac/Linux. If using Windows provide your Windows path
### an confirm that the other paths are correct. You may need to make '/' to '\'

GITHUB_DIR=/Users/roy/Github-Repos/Anylog-Edgelake-CSE115D
GITHUB_DIR=/mnt/c/Users/tiwar/anylog/Anylog-Edgelake-CSE115D

TRAINING_APPLICATION_DIR=edgefl/platform_components/data_handlers
MODULE_NAME=MnistDataHandler

TMP_DIR=edgefl/tmp_dir/
# External IP Address for CURL commands to Edgelake
EXTERNAL_IP="192.168.1.125:32049"
EXTERNAL_TCP_IP_PORT="192.168.1.125:32048"
EXTERNAL_IP="192.168.108.182:32049"
EXTERNAL_TCP_IP_PORT="192.168.108.182:32048"

AGG_NAME=agg

# LOCAL PSQL DB NAME
PSQL_DB_NAME="mnist_fl"
PSQL_DB_USER="demo"
PSQL_DB_PASSWORD="passwd"
PSQL_HOST="192.168.56.1"
PSQL_HOST="192.168.1.125"
PSQL_PORT="5432"

FILE_WRITE_DESTINATION="edgefl/file_write"
Expand Down
8 changes: 4 additions & 4 deletions edgefl/env_files/mnist/mnist1.env
Original file line number Diff line number Diff line change
@@ -1,21 +1,21 @@

### Note that paths are for Mac/Linux. If using Windows provide your Windows path
### an confirm that the other paths are correct. You may need to make '/' to '\'
GITHUB_DIR=/Users/roy/Github-Repos/Anylog-Edgelake-CSE115D
GITHUB_DIR=/mnt/c/Users/tiwar/anylog/Anylog-Edgelake-CSE115D

TRAINING_APPLICATION_DIR=edgefl/platform_components/data_handlers
MODULE_NAME=MnistDataHandler

TMP_DIR=edgefl/tmp_dir/
# External IP Address for CURL commands to Edgelake
EXTERNAL_IP="192.168.1.125:32149"
EXTERNAL_TCP_IP_PORT="192.168.1.125:32148"
EXTERNAL_IP="192.168.108.182:32149"
EXTERNAL_TCP_IP_PORT="192.168.108.182:32148"

# LOCAL PSQL DB NAME
PSQL_DB_NAME=mnist_fl
PSQL_DB_USER=demo
PSQL_DB_PASSWORD=passwd
PSQL_HOST=192.168.56.1
PSQL_HOST=192.168.108.182
PSQL_PORT=5432

FILE_WRITE_DESTINATION="edgefl/file_write"
Expand Down
8 changes: 4 additions & 4 deletions edgefl/env_files/mnist/mnist2.env
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,21 @@
### Note that paths are for Mac/Linux. If using Windows provide your Windows path
### an confirm that the other paths are correct. You may need to make '/' to '\'

GITHUB_DIR=/Users/roy/Github-Repos/Anylog-Edgelake-CSE115D
GITHUB_DIR=/mnt/c/Users/tiwar/anylog/Anylog-Edgelake-CSE115D

TRAINING_APPLICATION_DIR=edgefl/platform_components/data_handlers
MODULE_NAME=MnistDataHandler

TMP_DIR=edgefl/tmp_dir/
# External IP Address for CURL commands to Edgelake
EXTERNAL_IP="192.168.1.125:32249"
EXTERNAL_TCP_IP_PORT="192.168.1.125:32248"
EXTERNAL_IP="192.168.108.182:32249"
EXTERNAL_TCP_IP_PORT="192.168.108.182:32248"

# LOCAL PSQL DB NAME
PSQL_DB_NAME="mnist_fl"
PSQL_DB_USER="demo"
PSQL_DB_PASSWORD="passwd"
PSQL_HOST="172.30.176.90"
PSQL_HOST="192.168.108.182"
PSQL_PORT="5432"

FILE_WRITE_DESTINATION="edgefl/file_write"
Expand Down
8 changes: 4 additions & 4 deletions edgefl/env_files/mnist/mnist3.env
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,21 @@
### Note that paths are for Mac/Linux. If using Windows provide your Windows path
### an confirm that the other paths are correct. You may need to make '/' to '\'

GITHUB_DIR=/Users/roy/Github-Repos/Anylog-Edgelake-CSE115D
GITHUB_DIR=/mnt/c/Users/tiwar/anylog/Anylog-Edgelake-CSE115D

TRAINING_APPLICATION_DIR=edgefl/platform_components/data_handlers
MODULE_NAME=MnistDataHandler

TMP_DIR=edgefl/tmp_dir
# External IP Address for CURL commands to Edgelake
EXTERNAL_IP="192.168.1.125:32349"
EXTERNAL_TCP_IP_PORT="192.168.1.125:32348"
EXTERNAL_IP="192.168.108.182:32349"
EXTERNAL_TCP_IP_PORT="192.168.108.182:32348"

# LOCAL PSQL DB NAME
PSQL_DB_NAME="mnist_fl"
PSQL_DB_USER="demo"
PSQL_DB_PASSWORD="passwd"
PSQL_HOST="172.30.176.90"
PSQL_HOST="192.168.108.182"
PSQL_PORT="5432"

FILE_WRITE_DESTINATION="edgefl/file_write"
Expand Down
125 changes: 125 additions & 0 deletions edgefl/platform_components/node/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import logging
import os
import pickle
import requests
from asyncio import sleep

# import keras
Expand All @@ -19,6 +20,7 @@
from platform_components.EdgeLake_functions.mongo_file_store import read_file, write_file, copy_file_from_container
from platform_components.lib.logger.logger_config import configure_logging
from platform_components.helpers.LoadClassFromFile import load_class_from_file
from platform_components.lib.modules.local_model_update import LocalModelUpdate

from dotenv import load_dotenv
load_dotenv()
Expand Down Expand Up @@ -269,6 +271,129 @@ def train_model_params(self, aggregator_model_params_db_link, round_number, ip_p
return f'{self.docker_file_write_destination}/{index}/{file}'
return file_name

'''
dfl_train_model_params()
-
'''
def dfl_train_model_params(self, round_number, index):
self.logger.debug(f"[{index}] in dfl_train_model_params for round {round_number}")

# Train model
# model_update = self.local_training_handler.train({})
# print(f"[INFO] [{index}][Round {round_number}] ========== Model training progress ==========")
model_params = self.data_handlers[index].train(round_number)
self.logger.info(f"[{index}][Round {round_number}] Step 2 Complete: Model training done")

# Save and return new weights
encoded_params = self.encode_model(model_params)
file = f"{round_number}-replica-{self.replica_name}.pkl"
# make sure directory exists
os.makedirs(os.path.dirname(f"{self.file_write_destination}/{index}/"), exist_ok=True)
file_name = f"{self.file_write_destination}/{index}/{file}"
with open(f"{file_name}", "wb") as f:
f.write(encoded_params)

if self.docker_running:
self.logger.debug(f'[{index}] written to container at {f"{self.docker_file_write_destination}/{index}/{file}"}')
copy_file_to_container(os.path.join(self.tmp_dir, index), self.docker_container_name, file_name, f"{self.docker_file_write_destination}/{index}/{file}")
return f'{self.docker_file_write_destination}/{index}/{file}'
return file_name

'''
listen_for_update_dfl(self, min_params, round_number, index):
- helper for a singular training node to asynchronously poll the blockchain for other training nodes' model params/weights
'''
async def listen_for_update_dfl(self, min_params, round_number, index):
logger = self.logger
decoded_params = {}

while True:
try:
headers = {
'User-Agent': 'AnyLog/1.23',
'command': f'blockchain get {index}-a{round_number}'
}
response = requests.get(self.edgelake_node_url, headers=headers)
response.raise_for_status()
results = response.json()

node_params_links = []
ip_ports = []

# for item in results:
# key = f"{index}-a{round_number}"
# if key in item and item[key]['node'] != self.replica_name: # skip own model
# node_params_links.append(item[key]['trained_params_local_path'])
# ip_ports.append(item[key]['ip_port'])
for item in results:
key = f"{index}-a{round_number}"
if key in item:
source_node = item[key]['node']
if source_node != self.replica_name:
link = item[key]['trained_params_local_path']
node_params_links.append(link)
ip_ports.append(item[key]['ip_port'])
logger.info(
f"[{index}] [Round {round_number}] Found peer model from {source_node} at {link}")
else:
logger.debug(f"[{index}] Skipping own model {source_node}")

# Fetch and decode
self.fetch_decoded_params(decoded_params, node_params_links, ip_ports, index)

if len(decoded_params) >= min_params:
return list(decoded_params.values())

except Exception as e:
logger.error(f"[{index}] DFL listen error: {str(e)}")

await sleep(2)

# same as in aggregator.py
def fetch_decoded_params(self, decoded_params_dict, node_param_download_links, ip_ports, index):
# use the node_param_download_links to get all the file
# in the form of tuples, like ["('blobs_admin', 'node_model_updates', '1-replica-node1.pkl')"]
# node_ref = db.reference('node_model_updates')

# Loop through each provided download link to retrieve node parameter objects
for i, path in enumerate(node_param_download_links):
# Don't fetch for existing paths
if path in decoded_params_dict:
continue

try:
# Make sure the directory exists
filename = path.split('/')[-1]
local_path = f'{self.file_write_destination}/{index}/{filename}'
if self.docker_running:
docker_file_path = f'{self.docker_file_write_destination}/{index}/{filename}'
response = read_file(self.edgelake_node_url, path,
docker_file_path, ip_ports[i])
copy_file_from_container(os.path.join(self.tmp_dir, index), self.docker_container_name,
docker_file_path,
local_path)
else:
response = read_file(self.edgelake_node_url, path,
local_path, ip_ports[i])

if response.status_code != 200:
raise ValueError(
f"Failed to retrieve node params from link: {filename}. HTTP Status: {response.status_code}"
)

# Decode the model weights from the file
sleep(1)
with open(local_path, 'rb') as f:
data = pickle.load(f)
if not data:
raise ValueError(f"Missing model_weights in data from file: {filename}")

decoded_params_dict[path] = LocalModelUpdate(weights=data)

except Exception as e:
self.logger.error(f"Error retrieving data from link {filename}: {str(e)}")
continue

def encode_model(self, model_update):
serialized_data = pickle.dumps(model_update)
return serialized_data
Expand Down
Loading