Skip to content
11 changes: 11 additions & 0 deletions examples/negotiation/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from rich import print

from examples.negotiation.agents import BuyerAgent, SellerAgent
from mesa_llm.parallel_stepping import enable_automatic_parallel_stepping
from mesa_llm.reasoning.reasoning import Reasoning


Expand Down Expand Up @@ -38,6 +39,16 @@ def __init__(
self.parallel_stepping = parallel_stepping
self.grid = MultiGrid(self.height, self.width, torus=False)

# Enable optimized parallel stepping if parallel_stepping is enabled
if self.parallel_stepping:
enable_automatic_parallel_stepping(
mode="asyncio",
max_concurrent=min(
20, initial_buyers + 2
), # Adjust based on agent count
request_timeout=30.0,
)

# ---------------------Create the buyer agents---------------------
buyer_system_prompt = "You are a buyer in a negotiation game. You are interested in buying a product from a seller. You are also interested in negotiating with the seller. Prefer speaking over changing location as long as you have a seller in sight. If no seller is in sight, move around randomly until yous see one"
buyer_internal_state = ""
Expand Down
3 changes: 3 additions & 0 deletions mesa_llm/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

from .parallel_stepping import (
enable_automatic_parallel_stepping,
step_agents_multithreaded,
step_agents_parallel,
step_agents_parallel_sync,
)
Expand All @@ -16,10 +17,12 @@

__all__ = [
"Observation",
"PerformanceBenchmark",
"Plan",
"ToolManager",
"enable_automatic_parallel_stepping",
"record_model",
"step_agents_multithreaded",
"step_agents_parallel",
"step_agents_parallel_sync",
]
Expand Down
210 changes: 210 additions & 0 deletions mesa_llm/benchmark.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,210 @@
"""
Performance benchmark framework for mesa-llm
"""

import csv
import os
import statistics
import time

# Import test models at module level to avoid conditional imports
try:
from tests.test_models import PerformanceTestModel
except ImportError:
PerformanceTestModel = None


class PerformanceBenchmark:
"""Performance testing and analysis framework"""

def __init__(self):
self.results: list[dict] = []

def run_single_test(
self, n_agents: int, runs: int = 3, test_model_class=None
) -> dict:
"""Run performance test for specific agent count"""
print(f"\n🔬 Testing {n_agents} agents...")

# Use test model class (imported at module level)
if test_model_class is None:
test_model_class = PerformanceTestModel

sequential_times = []
parallel_times = []

for run in range(runs):
print(f" Run {run + 1}/{runs}...")

# Test sequential execution
model_seq = test_model_class(n_agents=n_agents, enable_parallel=False)
step_start = time.time()
model_seq.step_sequential()
step_time = time.time() - step_start
sequential_times.append(step_time)

# Test parallel execution
model_par = test_model_class(n_agents=n_agents, enable_parallel=True)
step_start = time.time()
model_par.step_parallel()
step_time = time.time() - step_start
parallel_times.append(step_time)

print(
f" Sequential: {sequential_times[-1]:.2f}s, Parallel: {parallel_times[-1]:.2f}s"
)

# Calculate statistics
avg_seq = statistics.mean(sequential_times)
avg_par = statistics.mean(parallel_times)
speedup = avg_seq / avg_par if avg_par > 0 else float("inf")

result = {
"n_agents": n_agents,
"sequential_time": avg_seq,
"parallel_time": avg_par,
"speedup": speedup,
"per_agent_seq": avg_seq / n_agents,
"per_agent_par": avg_par / n_agents,
}

print(
f" 📊 Results: Sequential {avg_seq:.2f}s, Parallel {avg_par:.2f}s, Speedup {speedup:.2f}x"
)
return result

def run_benchmark(
self, agent_counts: list[int] | None = None, test_model_class=None
) -> list[dict]:
"""Run comprehensive performance benchmark"""
if agent_counts is None:
agent_counts = [5, 10, 15, 20, 25, 30, 40, 50]

self.results = []

print("🚀 Mesa-LLM Performance Benchmark")
print("=" * 50)
print("📋 Testing parallel vs sequential execution")
print("⚠️ Using 10ms simulated LLM work per agent")
print("")

for n_agents in agent_counts:
result = self.run_single_test(
n_agents, runs=3, test_model_class=test_model_class
)
self.results.append(result)

return self.results

def print_summary(self):
"""Print comprehensive performance analysis"""
print("\n📈 PERFORMANCE BENCHMARK RESULTS")
print("=" * 80)

print(
f"{'Agents':<8} {'Sequential':<12} {'Parallel':<12} {'Speedup':<10} {'Efficiency':<12}"
)
print("-" * 80)

for result in self.results:
n_agents = result["n_agents"]
seq_time = result["sequential_time"]
par_time = result["parallel_time"]
speedup = result["speedup"]
efficiency = speedup / n_agents if speedup != float("inf") else 0

print(
f"{n_agents:<8} {seq_time:<12.2f} {par_time:<12.2f} "
f"{speedup:<10.2f}x {efficiency:<12.4f}"
)

print("\n🔍 Performance Analysis:")

# Check scaling characteristics
if len(self.results) >= 3:
first_result = self.results[0]
last_result = self.results[-1]

seq_scaling = last_result["per_agent_seq"] / first_result["per_agent_seq"]
par_scaling = last_result["per_agent_par"] / first_result["per_agent_par"]

print(f"Sequential scaling factor: {seq_scaling:.2f}x (1.0 = ideal)")
print(f"Parallel scaling factor: {par_scaling:.2f}x (1.0 = ideal)")

# Evaluate sequential scaling
if seq_scaling > 2.0:
print("⚠️ SEQUENTIAL: Exponential scaling detected!")
elif seq_scaling > 1.5:
print("⚠️ SEQUENTIAL: Sub-linear scaling")
else:
print("✅ SEQUENTIAL: Perfect linear scaling")

# Evaluate parallel scaling
if par_scaling > 2.0:
print("⚠️ PARALLEL: Exponential scaling detected!")
elif par_scaling > 1.5:
print("⚠️ PARALLEL: Sub-linear scaling")
else:
print("✅ PARALLEL: Good linear scaling")

# Evaluate speedup
valid_speedups = [
r["speedup"] for r in self.results if r["speedup"] != float("inf")
]
if valid_speedups:
avg_speedup = statistics.mean(valid_speedups)
print(f"Average speedup: {avg_speedup:.2f}x")

if avg_speedup > 5.0:
print("🎉 EXCELLENT: Parallel provides outstanding speedup!")
elif avg_speedup > 3.0:
print("🎉 EXCELLENT: Parallel provides significant speedup!")
elif avg_speedup > 2.0:
print("✅ GOOD: Parallel provides moderate speedup")
elif avg_speedup > 1.5:
print("⚠️ MINIMAL: Parallel provides small speedup")
else:
print("❌ POOR: Parallel provides no speedup")

print("\n💡 Key Insights:")
print(" • Each agent simulates 10ms LLM API response time")
print(" • Parallel execution processes agents concurrently")
print(" • Speedup demonstrates effectiveness of optimizations")
print(" • Linear scaling confirms no performance bottlenecks")

print("\n📝 Notes:")
print(" • This benchmark tests parallel stepping infrastructure")
print(" • Real-world performance depends on actual API response times")
print(" • Results demonstrate performance optimizations work correctly")

def save_results(self, filename: str = "benchmark_results.csv"):
"""Save benchmark results to CSV file"""
if not self.results:
print("No results to save!")
return

# Save to results directory
results_dir = os.path.join(
os.path.dirname(os.path.dirname(__file__)), "results"
)
filepath = os.path.join(results_dir, filename)

# Ensure results directory exists
os.makedirs(results_dir, exist_ok=True)

with open(filepath, "w", newline="") as csvfile:
fieldnames = [
"n_agents",
"sequential_time",
"parallel_time",
"speedup",
"per_agent_seq",
"per_agent_par",
]
writer = csv.DictWriter(csvfile, fieldnames=fieldnames)

writer.writeheader()
for result in self.results:
writer.writerow(result)

print(f"💾 Results saved to {filepath}")
Loading