Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
test: build
go test ./... -count=1 --race --timeout=5s
go test ./... -count=1 --race --timeout=30s

proto:
protoc --go_out=. --go-vtproto_out=. --go_opt=paths=source_relative --proto_path=. actor/actor.proto
Expand All @@ -15,6 +15,8 @@ build:
go build -o bin/metrics examples/metrics/main.go
go build -o bin/chatserver examples/chat/server/main.go
go build -o bin/chatclient examples/chat/client/main.go
go build -o bin/natsserver examples/nats/server/main.go
go build -o bin/natsclient examples/nats/client/main.go
go build -o bin/cluster_member_1 examples/cluster/member_1/main.go
go build -o bin/cluster_member_2 examples/cluster/member_2/main.go

Expand All @@ -24,4 +26,10 @@ bench:
bench-profile:
go test -bench='^BenchmarkHollywood$$' -run=NONE -cpuprofile cpu.prof -memprofile mem.prof ./_bench

bench-nats:
go test -bench='^BenchmarkHollywoodNATS$$' -run=NONE ./_bench

bench-local:
go test -bench='^BenchmarkHollywoodLocal$$' -run=NONE ./_bench

.PHONY: proto
106 changes: 97 additions & 9 deletions _bench/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (

"github.com/anthdm/hollywood/actor"
"github.com/anthdm/hollywood/remote"
nserver "github.com/nats-io/nats-server/v2/server"
)

//go:generate protoc --proto_path=. --go_out=. --go_opt=paths=source_relative message.proto
Expand Down Expand Up @@ -67,6 +68,8 @@ type Benchmark struct {
actorsPerEngine int
senders int
engines []*Engine
remotes []actor.Remoter
newRemoter func(int) actor.Remoter
}

func (b *Benchmark) randomEngine() *Engine {
Expand All @@ -88,22 +91,25 @@ func (e *Engine) randomTargetEngine() *Engine {
return e.targetEngines[rand.Intn(len(e.targetEngines))]
}

func newBenchmark(engineCount, actorsPerEngine, senders int) *Benchmark {
func newBenchmark(engineCount, actorsPerEngine, senders int, remoterFactory func(int) actor.Remoter) *Benchmark {
b := &Benchmark{
engineCount: engineCount,
actorsPerEngine: actorsPerEngine,
engines: make([]*Engine, engineCount),
remotes: make([]actor.Remoter, engineCount),
senders: senders,
newRemoter: remoterFactory,
}
return b
}
func (b *Benchmark) spawnEngines() error {
for i := 0; i < b.engineCount; i++ {
r := remote.New(fmt.Sprintf("localhost:%d", 4000+i), remote.NewConfig())
r := b.newRemoter(i)
e, err := actor.NewEngine(actor.NewEngineConfig().WithRemote(r))
if err != nil {
return fmt.Errorf("failed to create engine: %w", err)
}
b.remotes[i] = r
// spawn the monitor
b.engines[i] = &Engine{
engineID: i,
Expand Down Expand Up @@ -136,6 +142,16 @@ func (b *Benchmark) spawnActors() error {
fmt.Printf("spawned %d actors per engine\n", b.actorsPerEngine)
return nil
}

func (b *Benchmark) stopRemotes() {
for i := range b.remotes {
if b.remotes[i] == nil {
continue
}
b.remotes[i].Stop().Wait()
}
}

func (b *Benchmark) sendMessages(d time.Duration) error {
wg := sync.WaitGroup{}
wg.Add(b.senders)
Expand All @@ -146,10 +162,16 @@ func (b *Benchmark) sendMessages(d time.Duration) error {
for time.Now().Before(deadline) {
// pick a random engine to send from
engine := b.randomEngine()
// pick a random target engine:
targetEngine := engine.randomTargetEngine()
// pick a random target actor from the engine
targetActor := targetEngine.randomActor()
var targetActor *actor.PID
if len(engine.targetEngines) > 0 {
// pick a random target engine:
targetEngine := engine.randomTargetEngine()
// pick a random target actor from the target engine
targetActor = targetEngine.randomActor()
} else {
// no remote target engines (local benchmark mode)
targetActor = engine.randomActor()
}
// send the message
engine.engine.Send(targetActor, &Message{})
sendCount.Add(1)
Expand All @@ -165,13 +187,22 @@ func (b *Benchmark) sendMessages(d time.Duration) error {
return nil
}

func benchmark() error {
func resetStats() {
receiveCount.Store(0)
sendCount.Store(0)
deadLetters.Store(0)
}

func runBenchmark(remoterFactory func(int) actor.Remoter) error {
const (
engines = 10
actorsPerEngine = 2000
senders = 20
duration = time.Second * 10
)
engines := 10
if remoterFactory == nil {
engines = 1
}

if runtime.GOMAXPROCS(runtime.NumCPU()) == 1 {
return errors.New("GOMAXPROCS must be greater than 1")
Expand All @@ -180,12 +211,14 @@ func benchmark() error {
Level: slog.LevelError,
}))
slog.SetDefault(lh)
resetStats()

benchmark := newBenchmark(engines, actorsPerEngine, senders)
benchmark := newBenchmark(engines, actorsPerEngine, senders, remoterFactory)
err := benchmark.spawnEngines()
if err != nil {
return fmt.Errorf("failed to spawn engines: %w", err)
}
defer benchmark.stopRemotes()
err = benchmark.spawnActors()
if err != nil {
return fmt.Errorf("failed to spawn actors: %w", err)
Expand Down Expand Up @@ -215,6 +248,61 @@ func benchmark() error {
return nil
}

func benchmark() error {
return runBenchmark(func(i int) actor.Remoter {
return remote.New(fmt.Sprintf("localhost:%d", 4000+i), remote.NewConfig())
})
}

func benchmarkNATS() error {
embedded, err := startEmbeddedNATSServer()
if err != nil {
return err
}
defer embedded.Close()

return runBenchmark(func(i int) actor.Remoter {
return remote.NewNats(
fmt.Sprintf("nats-node-%d", i),
remote.NatsConfig{}.WithURL(embedded.ClientURL()),
)
})
}

func benchmarkLocal() error {
return runBenchmark(nil)
}

type embeddedNATSServer struct {
srv *nserver.Server
}

func startEmbeddedNATSServer() (*embeddedNATSServer, error) {
opts := &nserver.Options{
Host: "127.0.0.1",
Port: -1,
}
srv, err := nserver.NewServer(opts)
if err != nil {
return nil, fmt.Errorf("nserver.NewServer: %w", err)
}
go srv.Start()
if !srv.ReadyForConnections(5 * time.Second) {
srv.Shutdown()
return nil, errors.New("embedded NATS server did not become ready")
}
return &embeddedNATSServer{srv: srv}, nil
}

func (s *embeddedNATSServer) ClientURL() string {
return s.srv.ClientURL()
}

func (s *embeddedNATSServer) Close() {
s.srv.Shutdown()
s.srv.WaitForShutdown()
}

func main() {
err := benchmark()
if err != nil {
Expand Down
14 changes: 14 additions & 0 deletions _bench/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,20 @@ func BenchmarkHollywood(b *testing.B) {
}
}

func BenchmarkHollywoodNATS(b *testing.B) {
err := benchmarkNATS()
if err != nil {
b.Fatal(err)
}
}

func BenchmarkHollywoodLocal(b *testing.B) {
err := benchmarkLocal()
if err != nil {
b.Fatal(err)
}
}

/*
func Benchmark_Latency(b *testing.B) {
slog.SetDefault(slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelError})))
Expand Down
30 changes: 30 additions & 0 deletions examples/nats/client/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package main

import (
"log/slog"
"os"
"time"

"github.com/anthdm/hollywood/actor"
"github.com/anthdm/hollywood/examples/remote/msg"
"github.com/anthdm/hollywood/remote"
)

func main() {
slog.SetDefault(slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug})))

// Uses NATS default URL (nats://127.0.0.1:4222) when no URL is configured.
r := remote.NewNats("nats-client-node", remote.NatsConfig{})
e, err := actor.NewEngine(actor.NewEngineConfig().WithRemote(r))
if err != nil {
panic(err)
}

serverPID := actor.NewPID("nats-server-node", "server/primary")
// The server will be started with id "primary" on node "nats-server-node".
for {
e.Send(serverPID, &msg.Message{Data: "hello over NATS!"})
slog.Debug("sent message", "to", serverPID.String())
time.Sleep(time.Second)
}
}
46 changes: 46 additions & 0 deletions examples/nats/server/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package main

import (
"fmt"
"log/slog"
"os"
"reflect"

"github.com/anthdm/hollywood/actor"
"github.com/anthdm/hollywood/examples/remote/msg"
"github.com/anthdm/hollywood/remote"
)

type server struct{}

func newServer() actor.Receiver {
return &server{}
}

func (s *server) Receive(ctx *actor.Context) {
switch m := ctx.Message().(type) {
case actor.Started:
slog.Info("nats server started")
fmt.Println("nats server has started")
case *actor.PID:
slog.Info("nats server got pid", "pid", m)
case *msg.Message:
slog.Info("nats server got message", "msg", m)
default:
slog.Warn("nats server got unknown message", "msg", m, "type", reflect.TypeOf(m).String())
}
}

func main() {
slog.SetDefault(slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug})))

// Uses NATS default URL (nats://127.0.0.1:4222) when no URL is configured.
r := remote.NewNats("nats-server-node", remote.NatsConfig{})
e, err := actor.NewEngine(actor.NewEngineConfig().WithRemote(r))
if err != nil {
panic(err)
}

e.Spawn(newServer, "server", actor.WithID("primary"))
select {}
}
Loading