Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
7728b2b
init dev-testing
ebootehsaz Mar 31, 2025
3957519
conf. load tester works as expected; stores metrics generated by ab c…
Apr 1, 2025
31cb1f8
added venv to gitignore
Apr 1, 2025
ea2c235
work on metrics and load tester
Apr 8, 2025
222e5ce
testing and visualization
ebootehsaz Apr 13, 2025
7320738
update controlled granularity
Apr 13, 2025
8143cb0
created json files
ebootehsaz Apr 18, 2025
e3cbc67
Merge branch 'dev-testing' of https://github.com/CSCI555-Spring25/Sca…
ebootehsaz Apr 18, 2025
c50b3d5
update
Apr 18, 2025
0ce5cb6
update
Apr 18, 2025
4fa922a
update
Apr 18, 2025
2463401
timezone hardcode
Apr 18, 2025
b7fbc55
req
Apr 18, 2025
8688461
bugfix
Apr 18, 2025
d4f4faf
update timezone to utc
Apr 18, 2025
d78364c
update timezone to utc
Apr 18, 2025
11599cc
fixed parser
ebootehsaz Apr 19, 2025
857c158
Merge branch 'dev-testing' of https://github.com/CSCI555-Spring25/Sca…
ebootehsaz Apr 19, 2025
998ea7f
archiving old data. small fixes
ebootehsaz Apr 19, 2025
801f6fc
update lookahead to just 3mins
Apr 20, 2025
6d9b361
test1 nearly done
Apr 21, 2025
5e5da1c
test1 nearly done
Apr 21, 2025
c061d80
Merge branch 'dev-testing' of https://github.com/CSCI555-Spring25/Sca…
Apr 21, 2025
8d2a322
new traffic pattern; new params for wrk2
Apr 21, 2025
2ca0d50
jsons made
Apr 21, 2025
1420df7
update timezone to pst
Apr 22, 2025
a546006
making it slightly conservative
Apr 22, 2025
6e808b1
update threshold
Apr 23, 2025
18f1559
update lookforward to 4 mins
Apr 23, 2025
606c265
jsons
Apr 26, 2025
3489676
base server change
Apr 26, 2025
f60351e
Merge branch 'dev-testing' of https://github.com/CSCI555-Spring25/Sca…
Apr 26, 2025
6476038
running tests
Apr 26, 2025
1e5383b
new visualization file
Apr 28, 2025
e50ef0e
logs plot
Apr 28, 2025
07a2e0d
work on visualizing data
Apr 28, 2025
8bf46f7
Merge branch 'dev-testing' of https://github.com/CSCI555-Spring25/Sca…
Apr 28, 2025
4f90f0a
reorganizing plotted dirs. working on visualization cache
Apr 28, 2025
2a12e7f
saving state; most visuals work; cache works
Apr 28, 2025
89fb5fd
lookforwards 4->2
Apr 29, 2025
4b3edc9
save state 13.127
May 1, 2025
97cac90
update look ahead to 15
May 1, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
minikube-linux-amd64
venv
221 changes: 176 additions & 45 deletions controller/controller.py

Large diffs are not rendered by default.

323 changes: 323 additions & 0 deletions controller/controller.py.bak
Original file line number Diff line number Diff line change
@@ -0,0 +1,323 @@
#!/usr/bin/env python3
import kopf
import kubernetes
import os
import json
import datetime
import time
import threading
import logging

# K8s config
# Load the in-cluster service-account configuration; this controller is
# expected to run inside a Kubernetes pod (this call raises outside a cluster).
kubernetes.config.load_incluster_config()

# Clients
api_client = kubernetes.client.ApiClient()
apps_api = kubernetes.client.AppsV1Api(api_client)  # Deployments (replica counts)
autoscaling_api = kubernetes.client.AutoscalingV2Api(api_client)  # HPAs
custom_api = kubernetes.client.CustomObjectsApi(api_client)  # PredictiveAutoscaler CRs

# Constants
DATA_DIR = "/data"  # directory for per-CR JSON history files (presumably a mounted volume — confirm)
GROUP = "scaler.cs.usc.edu"  # CRD API group
VERSION = "v1"  # CRD API version
PLURAL = "predictiveautoscalers"  # CRD plural resource name

# Module logger, plus a distinct logger used from background timer threads.
logger = logging.getLogger(__name__)
thread_logger = logging.getLogger(f"{__name__}.thread")

def ensure_data_dir():
    """Make sure the on-disk history directory exists (idempotent)."""
    if not os.path.isdir(DATA_DIR):
        os.makedirs(DATA_DIR, exist_ok=True)

def get_history_file_path(namespace=None, name=None):
    """Return the history-file path for (namespace, name).

    Falls back to the shared default traffic profile when neither
    identifier is provided.
    """
    filename = (
        f"{namespace}_{name}_history.json"
        if (namespace or name)
        else "realistic-traffic.json"
    )
    return os.path.join(DATA_DIR, filename)

def load_historical_data(namespace=None, name=None):
    """Load the stored pod-count history for a CR.

    Returns the parsed JSON document, or an empty ``{"data": []}`` skeleton
    when the file does not exist yet or contains invalid JSON.
    """
    file_path = get_history_file_path(namespace, name)
    # EAFP: open directly instead of the previous os.path.exists() pre-check,
    # which raced with the open (file deleted/rotated in between). The `with`
    # block also guarantees the handle is closed.
    try:
        with open(file_path, 'r') as f:
            return json.load(f)
    except (FileNotFoundError, json.JSONDecodeError):
        return {"data": []}

def save_historical_data(data, namespace=None, name=None):
    """Persist *data* as pretty-printed JSON to the CR's history file."""
    ensure_data_dir()
    target = get_history_file_path(namespace, name)
    serialized = json.dumps(data, indent=2)
    with open(target, 'w') as f:
        f.write(serialized)

def get_current_pod_count(namespace, deployment_name):
    """Return the deployment's current replica count.

    Yields 0 when the deployment is missing (404) or has no replicas
    reported yet; any other API error propagates.
    """
    try:
        deployment = apps_api.read_namespaced_deployment(deployment_name, namespace)
    except kubernetes.client.exceptions.ApiException as e:
        if e.status != 404:
            raise
        logger.info(f"Deployment {deployment_name} not found in namespace {namespace}")
        return 0
    return deployment.status.replicas or 0

def get_historical_pod_count_at_time(data, time_offset_minutes=0):
    """Look up the historical pod count at (now - time_offset_minutes) UTC.

    Entries in ``data["data"]`` carry "HH:MM" timestamps on a 5-minute grid
    describing a repeating daily pattern; a negative offset therefore looks
    *ahead* of the current time. Falls back to the closest earlier entry —
    wrapping across midnight, since the pattern is cyclic — and finally to a
    default pod count of 1 when the history is empty.
    """
    # Same logger as the module-level one (identical getLogger(__name__) call).
    log = logging.getLogger(__name__)

    now = datetime.datetime.now(datetime.timezone.utc)
    target_time = now - datetime.timedelta(minutes=time_offset_minutes)

    target_hour = target_time.hour
    target_minute = (target_time.minute // 5) * 5  # floor onto the 5-minute grid

    # Target timestamp in HH:MM format
    target_timestamp = f"{target_hour:02d}:{target_minute:02d}"
    log.info(f"Looking for historical data at {target_timestamp}")

    # Exact match check
    for entry in data["data"]:
        if entry["timestamp"] == target_timestamp:
            log.info(f"Found exact match for {target_timestamp} with pod count {entry['podCount']}")
            return entry["podCount"]

    # If exact match not found, find the closest earlier timestamp. Distance
    # is computed modulo 24h so that e.g. a 23:55 entry still serves a 00:02
    # target; the previous "entry_minutes <= target_minutes" scan returned the
    # default right after midnight even when late-evening data was present.
    target_minutes = target_hour * 60 + target_minute

    closest_entry = None
    closest_diff = float('inf')

    for entry in data["data"]:
        h, m = map(int, entry["timestamp"].split(':'))
        entry_minutes = h * 60 + m
        diff = (target_minutes - entry_minutes) % (24 * 60)  # cyclic "minutes ago"
        if diff < closest_diff:
            closest_diff = diff
            closest_entry = entry

    if closest_entry:
        log.info(f"Found closest earlier timestamp {closest_entry['timestamp']} with pod count {closest_entry['podCount']}")
        return closest_entry["podCount"]

    log.info(f"No historical data found for {target_timestamp}, using default pod count 1")
    return 1

def update_historical_data(data, current_pods, historical_weight=0.7, current_weight=0.3):
    """Blend the observed replica count into the daily history.

    The entry for the current 5-minute slot, if present, becomes a weighted
    average of its stored count and *current_pods*; otherwise a fresh entry
    is appended. The series is kept sorted by time-of-day.
    """
    now = datetime.datetime.now(datetime.timezone.utc)
    timestamp = f"{now.hour:02d}:{(now.minute // 5) * 5:02d}"

    existing = next((e for e in data["data"] if e["timestamp"] == timestamp), None)
    if existing is not None:
        previous_count = existing["podCount"]
        existing["podCount"] = int(historical_weight * previous_count + current_weight * current_pods)
        logger.info(f"Updated historical data for timestamp {timestamp} to {existing['podCount']}")
    else:
        data["data"].append({"timestamp": timestamp, "podCount": current_pods})
        logger.info(f"Created new historical data entry for timestamp {timestamp} with pod count {current_pods}")

    def _slot_minutes(ts):
        # "HH:MM" -> absolute minute of the day, for chronological ordering.
        h, m = map(int, ts.split(':'))
        return 60 * h + m

    data["data"].sort(key=lambda e: _slot_minutes(e["timestamp"]))
    return data

def prune_old_data(data, retention_days=7):
    # Prune data older than retention_days
    # NOTE(review): currently a stub — entries only carry "HH:MM" time-of-day
    # timestamps (no date), so there is nothing to age out yet and the
    # retention_days parameter is accepted but unused. Only logs the size.
    logger.info(f"Historical data has {len(data['data'])} entries")
    return data

def calculate_required_pods(current_pods, historical_data, max_replicas, prediction_window_minutes=10):
    """Predict the pod count needed *prediction_window_minutes* from now.

    Scales the historical count for the future slot by the ratio of the
    observed current count to the historical count for the current slot,
    then clamps the result to [1, max_replicas].
    """
    historical_pods_now = get_historical_pod_count_at_time(historical_data, 0)
    if historical_pods_now == 0:
        historical_pods_now = 1  # avoid division by zero below

    # A negative offset asks the lookup for a slot *ahead* of the current time.
    historical_pods_ahead = get_historical_pod_count_at_time(historical_data, -prediction_window_minutes)
    thread_logger.info(f"Historical data retrieved - historical pods now: {historical_pods_now}, historical pods ahead: {historical_pods_ahead}")

    ratio = current_pods / historical_pods_now
    thread_logger.info(f"Ratio calculated: {ratio} (current pods: {current_pods}, historical pods now: {historical_pods_now})")

    clamped = max(1, min(int(ratio * historical_pods_ahead), max_replicas))
    thread_logger.info(f"Required pods calculated: {clamped}")
    return clamped

def update_hpa(namespace, hpa_name, min_replicas):
    """Patch the target HPA's spec.minReplicas; return True on success.

    The previous version first fetched the HPA and assigned
    ``hpa.spec.min_replicas`` on the local copy — dead code, since only the
    dict body below is ever sent to the API server. The patch itself fails
    with an ApiException (caught here, returning False) if the HPA is
    missing, so the extra GET round-trip added nothing.
    """
    try:
        autoscaling_api.patch_namespaced_horizontal_pod_autoscaler(
            name=hpa_name,
            namespace=namespace,
            body={"spec": {"minReplicas": min_replicas}}
        )
        thread_logger.info(f"Updated HPA {hpa_name} minReplicas to {min_replicas}")
        return True
    except kubernetes.client.exceptions.ApiException as e:
        thread_logger.exception(f"Failed to update HPA {hpa_name}: {e}")
        return False

def update_status(namespace, name, status_data):
    """Patch the CR's status subresource; API failures are logged, not raised."""
    # Callers set "lastUpdated" to an ISO timestamp; it is rewritten here
    # into a human-readable "YYYY-MM-DD HH:MM:SS UTC" form before patching.
    if "lastUpdated" in status_data:
        now_utc = datetime.datetime.now(datetime.timezone.utc)
        status_data["lastUpdated"] = now_utc.strftime("%Y-%m-%d %H:%M:%S UTC")

    try:
        custom_api.patch_namespaced_custom_object_status(
            GROUP, VERSION, namespace, PLURAL, name,
            {"status": status_data}
        )
    except kubernetes.client.exceptions.ApiException as e:
        thread_logger.exception(f"Failed to update status: {e}")

# Active background threading.Timer objects, keyed by "<namespace>_<name>".
timers = {}

@kopf.on.create('scaler.cs.usc.edu', 'v1', 'predictiveautoscalers')
def create_fn(spec, name, namespace, logger, **kwargs):
    """Handle PredictiveAutoscaler creation: seed the history file and start
    this CR's recurring prediction loop on a daemon timer thread."""
    logger.info(f"Creating PredictiveAutoscaler {name} in namespace {namespace}")

    # Configuration
    target_deployment = spec['targetDeployment']
    target_hpa = spec['targetHPA']
    max_replicas = spec['maxReplicas']
    update_interval = spec.get('updateInterval', 5)  # default 5 minutes

    historical_data = load_historical_data(namespace, name)
    current_pods = get_current_pod_count(namespace, target_deployment)

    # Seed the history with the current observation (floored to the 5-minute
    # grid) so the first prediction cycle has at least one data point.
    if current_pods > 0:
        now = datetime.datetime.now(datetime.timezone.utc)
        timestamp = f"{now.hour:02d}:{(now.minute // 5) * 5:02d}"
        historical_data["data"].append({"timestamp": timestamp, "podCount": current_pods})
        save_historical_data(historical_data, namespace, name)
        logger.info(f"Initialized historical data with timestamp {timestamp} and pod count {current_pods}")

    def recurring_update():
        # One prediction cycle; re-arms itself via threading.Timer at the end.
        # Stop (and drop the registered timer) once the CR has been deleted.
        if not check_if_cr_exists(namespace, name):
            thread_logger.info(f"PredictiveAutoscaler {name} no longer exists, stopping timer")
            if f"{namespace}_{name}" in timers:
                timers[f"{namespace}_{name}"].cancel()
                del timers[f"{namespace}_{name}"]
            return

        try:
            # Re-read the CR every cycle so spec changes take effect without
            # restarting the loop. These assignments create locals that
            # shadow the enclosing create_fn values of the same names.
            cr = custom_api.get_namespaced_custom_object(
                GROUP, VERSION, namespace, PLURAL, name
            )
            spec = cr.get('spec', {})

            target_deployment = spec['targetDeployment']
            target_hpa = spec['targetHPA']
            max_replicas = spec['maxReplicas']
            historical_weight = spec.get('historicalWeight', 0.7)
            current_weight = spec.get('currentWeight', 0.3)
            history_retention_days = spec.get('historyRetentionDays', 7)
            prediction_window_minutes = spec.get('predictionWindowMinutes', 10)
            update_interval = spec.get('updateInterval', 5)

            current_pods = get_current_pod_count(namespace, target_deployment)
            thread_logger.info(f"Current pod count for {target_deployment}: {current_pods}")

            historical_data = load_historical_data(namespace, name)

            if current_pods > 0:
                # Predict demand for the lookahead window and clamp it.
                required_pods = calculate_required_pods(
                    current_pods, historical_data, max_replicas, prediction_window_minutes)
                thread_logger.info(f"Required pods for next {prediction_window_minutes} minutes: {required_pods}")

                # Update HPA (raise its floor to the predicted demand).
                success = update_hpa(namespace, target_hpa, required_pods)

                # Update historical data
                updated_data = update_historical_data(
                    historical_data, current_pods, historical_weight, current_weight)
                updated_data = prune_old_data(updated_data, history_retention_days)
                save_historical_data(updated_data, namespace, name)
                thread_logger.info(f"Updated historical data to {updated_data}")

                status_data = {
                    "lastUpdated": datetime.datetime.now(datetime.timezone.utc).isoformat(),
                    "currentPrediction": required_pods
                }
                update_status(namespace, name, status_data)
            else:
                thread_logger.warning(f"Deployment {target_deployment} has 0 pods, skipping update")
                status_data = {
                    "lastUpdated": datetime.datetime.now(datetime.timezone.utc).isoformat(),
                    "lastError": "Deployment has 0 pods"
                }
                update_status(namespace, name, status_data)

            # Re-arm the timer for the next cycle (success path).
            timer = threading.Timer(update_interval * 60, recurring_update)
            timer.daemon = True
            timers[f"{namespace}_{name}"] = timer
            timer.start()

        except Exception as e:
            thread_logger.exception(f"Error in recurring update: {e}")
            status_data = {
                "lastUpdated": datetime.datetime.now(datetime.timezone.utc).isoformat(),
                "lastError": str(e)
            }
            update_status(namespace, name, status_data)

            # Keep the loop alive even after a failed cycle.
            # NOTE(review): update_interval is a local assigned inside the try;
            # if the exception fired before that assignment, this line raises
            # UnboundLocalError and the loop silently dies — consider a
            # module-level default fallback.
            timer = threading.Timer(update_interval * 60, recurring_update)
            timer.daemon = True
            timers[f"{namespace}_{name}"] = timer
            timer.start()

    # Schedule the first cycle one interval from now.
    timer = threading.Timer(update_interval * 60, recurring_update)
    timer.daemon = True
    timers[f"{namespace}_{name}"] = timer
    timer.start()

    return {'autoscalerStarted': True}

def check_if_cr_exists(namespace, name):
    """Return True iff the PredictiveAutoscaler CR still exists."""
    try:
        custom_api.get_namespaced_custom_object(
            GROUP, VERSION, namespace, PLURAL, name
        )
    except kubernetes.client.exceptions.ApiException as e:
        if e.status == 404:
            return False
        raise
    return True

@kopf.on.delete('scaler.cs.usc.edu', 'v1', 'predictiveautoscalers')
def delete_fn(spec, name, namespace, logger, **kwargs):
    """Handle CR deletion: cancel and forget this CR's recurring timer."""
    logger.info(f"Deleting PredictiveAutoscaler {name} in namespace {namespace}")

    pending = timers.pop(f"{namespace}_{name}", None)
    if pending is not None:
        pending.cancel()

    return {'autoscalerStopped': True}

@kopf.on.update('scaler.cs.usc.edu', 'v1', 'predictiveautoscalers')
def update_fn(spec, old, name, namespace, logger, **kwargs):
    """Handle CR spec updates: cancel the running timer and restart the
    autoscaling loop with the new spec by re-running the create handler."""
    logger.info(f"Updating PredictiveAutoscaler {name} in namespace {namespace}")
    if f"{namespace}_{name}" in timers:
        old_timer = timers[f"{namespace}_{name}"]
        old_timer.cancel()

    def immediate_update():
        # Cancel whatever timer is currently registered (it may be this very
        # zero-delay timer) before re-entering the create handler.
        if f"{namespace}_{name}" in timers:
            timers[f"{namespace}_{name}"].cancel()
        # NOTE(review): assumes the kopf decorator exposes the undecorated
        # handler via __wrapped__ — confirm against the kopf version in use.
        create_fn.__wrapped__(spec=spec, name=name, namespace=namespace, logger=logger)

    # Run the restart asynchronously on a zero-delay daemon timer thread.
    timer = threading.Timer(0, immediate_update)
    timer.daemon = True
    timers[f"{namespace}_{name}"] = timer
    timer.start()

    return {'autoscalerUpdated': True}

if __name__ == "__main__":
    # Allow running the operator directly (it is normally launched via
    # `kopf run`, which drives the decorated handlers above).
    kopf.run()
3 changes: 2 additions & 1 deletion controller/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
kopf==1.36.0
kubernetes==26.1.0
kubernetes==26.1.0
pytz==2023.3
7 changes: 6 additions & 1 deletion crd/predictive-autoscaler-crd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,12 @@ spec:
predictionWindowMinutes:
type: integer
minimum: 1
default: 10
default: 15
predictionWindowSeconds:
type: integer
minimum: 1
default: 900
description: "Number of seconds to look ahead for predictions"
historicalWeight:
type: number
minimum: 0
Expand Down
5 changes: 3 additions & 2 deletions deploy/predictive-autoscaler-instance.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@ spec:
targetHPA: simpleweb-hpa
maxReplicas: 10
historyRetentionDays: 7
updateInterval: 5 # minutes
predictionWindowMinutes: 10
updateInterval: 1 # Reduced to 1 minute for testing with 1-minute interval data
predictionWindowMinutes: 15 # Deprecated
predictionWindowSeconds: 900 # Using 60 seconds (1 minute) for traffic_1_interval.json
historicalWeight: 0.7
currentWeight: 0.3
---
2 changes: 1 addition & 1 deletion deploy/simpleweb-hpa.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,4 @@ spec:
name: cpu
target:
type: Utilization
averageUtilization: 50
averageUtilization: 95
Loading