Библиотека выбора лидера в кластере.
Все ноды при старте имеют статус follower.
При старте все ноды проверяют статусы всех нод (инициализация нод),
если нет лидера запускается механизм голосования.
Нода leader выбирается только большинством нод.
Нода может иметь статус leader, candidate или follower.
Если у лидера осталось менее половины доступных нод, то он переходит в статус follower
и запускается механизм голосования.
certs:
ca: certs/ca-cert.pem
server:
cert: certs/server-cert.pem
key: certs/server-key.pem
nodes:
1:
addr: 127.0.0.1
port: 50001
2:
addr: 127.0.0.1
port: 50002
3:
addr: 127.0.0.1
port: 50003package main
import (
"context"
"fmt"
"log"
"log/slog"
"os"
"os/signal"
"strconv"
"syscall"
"time"
"github.com/NovikovRoman/leadelect/node"
"gopkg.in/yaml.v3"
)
type config struct {
Certs struct {
Ca string `yaml:"ca"`
Server struct {
Cert string `yaml:"cert"`
Key string `yaml:"key"`
} `yaml:"server"`
}
Nodes map[string]struct {
Addr string `yaml:"addr"`
Port string `yaml:"port"`
} `yaml:"nodes"`
}
func main() {
b, err := os.ReadFile("cfg.yaml")
if err != nil {
slog.Error(fmt.Sprintf("failed to read configuration file: %v", err))
os.Exit(1)
}
var cfg config
if err = yaml.Unmarshal(b, &cfg); err != nil {
slog.Error(fmt.Sprintf("failed to parse configuration file: %v", err))
os.Exit(1)
}
// Ensure node ID is provided as an argument
if len(os.Args) < 2 {
slog.Error("node ID argument missing")
os.Exit(1)
}
currentNodeID := os.Args[1]
cfgCurrNode, ok := cfg.Nodes[os.Args[1]]
if !ok {
slog.Error(fmt.Sprintf("node %s not found in configuration", currentNodeID))
os.Exit(1)
}
opts := []node.NodeOpt{
node.ClientTimeout(time.Second * 10),
node.HeartbeatTimeout(time.Second * 3),
node.CheckElectionTimeout(time.Second * 10),
node.WithLogger(nil), // default slog
}
// with custom slog:
// var buf bytes.Buffer
// w := bufio.NewWriter(&buf)
// customSlog := slog.NewTextHandler(w, nil)
// opts = append(opts, node.WithLogger(node.NewLogger(customSlog)))
port, _ := strconv.ParseInt(cfgCurrNode.Port, 10, 64)
currNode := node.New(currentNodeID, cfgCurrNode.Addr, int(port), opts...)
// Setup TLS if specified in configuration
if cfg.Certs.Ca != "" {
if err = currNode.ClientTLS(cfg.Certs.Ca, cfgCurrNode.Addr); err != nil {
slog.Error(fmt.Sprintf("failed to set up client TLS %v", err))
os.Exit(1)
}
}
if cfg.Certs.Server.Cert != "" {
if err = currNode.ServerTLS(cfg.Certs.Server.Cert, cfg.Certs.Server.Key); err != nil {
slog.Error(fmt.Sprintf("failed to set up server TLS %v", err))
os.Exit(1)
}
}
for id, v := range cfg.Nodes {
if id == currentNodeID {
continue
}
p, err := strconv.Atoi(v.Port)
if err != nil {
slog.Warn(fmt.Sprintf("invalid port %d for node %s, skipping", p, id))
continue
}
currNode.AddNode(node.New(id, v.Addr, p))
}
ctx, cancel := context.WithCancel(context.Background())
// Start the node
go currNode.Run(ctx)
go func() {
for {
// …
// your code
// …
// this code is an example
time.Sleep(time.Second * 10)
fmt.Println("Node status", currNode.Status())
}
}()
// Handle graceful shutdown
shutdown := make(chan bool)
defer close(shutdown)
interrupt := make(chan os.Signal, 1)
defer close(interrupt)
signal.Notify(interrupt, os.Interrupt, syscall.SIGTERM)
go func() {
<-interrupt
cancel()
log.Println("Shutting down...")
// time to complete the process
time.Sleep(time.Second * 3)
shutdown <- true
}()
<-shutdown
log.Println("Completed")
}Сборка:
go build -o appСгенерировать сертификаты:
./gen_cert.shЗапуск в разных консолях:
./app 1
./app 2
./app 3protoc --go-grpc_out=. --go_out=. */*.protogo test ./node