-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.go
More file actions
113 lines (94 loc) · 2.86 KB
/
main.go
File metadata and controls
113 lines (94 loc) · 2.86 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
package main
import (
"fmt"
"os"
"path/filepath"
"sync"
"github.com/fDero/keva/cluster"
"github.com/fDero/keva/core"
"github.com/fDero/keva/history"
"github.com/fDero/keva/misc"
"github.com/urfave/cli/v2"
)
// App is the keva command-line application: it wires the CLI flags that
// describe where the cluster config and persistent state live, plus this
// node's identity, into executeCommand.
var App = &cli.App{
	Name:   "keva",
	Usage:  "A distributed fault-tolerant key-value storage system",
	Flags:  appFlags(),
	Action: executeCommand,
}

// appFlags returns the flag set accepted by the keva CLI.
// Flag names, usage strings, defaults and required-ness are part of the
// user-facing interface and must stay stable.
func appFlags() []cli.Flag {
	return []cli.Flag{
		&cli.StringFlag{
			Name:     "config-file",
			Usage:    "TOML encoded file with cluster configuration",
			Required: true,
		},
		&cli.StringFlag{
			Name:     "working-directory",
			Usage:    "Directory where internal system state can be stored for persistence",
			Required: true,
		},
		&cli.StringFlag{
			Name: "node-identity",
			Usage: "Name that uniquely identifies this node in the cluster",
			// Defaults to the machine hostname; FailOnError panics during
			// package init if the hostname cannot be determined.
			Value:    misc.FailOnError(os.Hostname()),
			Required: false,
		},
	}
}
// executeCommand boots a keva cluster node: it acquires the working-directory
// lock, loads the cluster configuration, replays the persisted event history
// into the in-memory storage, then starts the Raft event loop, the internal
// cluster server, and the user-facing API server. It blocks forever on
// success and returns an error only when initialization fails.
func executeCommand(c *cli.Context) error {
	configFile := c.String("config-file")
	workdir := c.String("working-directory")
	selfIdentity := c.String("node-identity")

	// Lock-file bookkeeping and config loading are all attempted before
	// reporting, so the first real failure is surfaced.
	lockfile := filepath.Join(workdir, "lock.json")
	var errs []error
	errs = append(errs, misc.PurgeUnusedLockFiles(lockfile))
	errs = append(errs, misc.CreateLockFile(lockfile))
	clusterNodes, loadErr := cluster.LoadClusterConfig(configFile)
	errs = append(errs, loadErr)
	// Check these errors BEFORE interpreting the (possibly empty) node list:
	// otherwise a config-load failure would be masked by a misleading
	// "node identity not found" message below.
	if err := misc.FirstOfManyErrorsOrNone(errs); err != nil {
		return fmt.Errorf("failed to initialize cluster node: %w", err)
	}

	selfConfigPtr, otherNodes := cluster.SplitClusterNodes(selfIdentity, clusterNodes)
	if selfConfigPtr == nil {
		return fmt.Errorf("node identity '%s' not found in cluster configuration", selfIdentity)
	}
	selfConfig := *selfConfigPtr

	// Open (or create) the on-disk history file used for crash recovery.
	pm := misc.NewDiskPersistenceHandler(workdir, "history.dat")
	defaultFallbackHeader := history.GetDefaultHistoryFileHeader(0, 0)
	historyFile, err := history.NewHistoryFile(pm, defaultFallbackHeader)
	if err != nil {
		return fmt.Errorf("failed to initialize cluster node: %w", err)
	}

	// Rebuild the in-memory storage state by replaying every persisted event.
	globalMutex := &sync.Mutex{}
	storage := core.NewStorageSettings()
	for historyLog := range historyFile.Iterate() {
		event := core.DecodeEvent(historyLog)
		storage.ProcessEvent(event)
	}

	raftSettings := cluster.NewRaftSettings(
		otherNodes,
		selfConfig,
		globalMutex,
		historyFile.GetLastEventID(),
		// Every committed event is first persisted, then applied to storage.
		misc.ProcessingPipeline(
			historyFile.AppendEvent,
			core.DecodeAndForward(storage.ProcessEvent),
		),
		historyFile.GetEventByID,
	)
	serverSettings := core.NewServerSettings(
		globalMutex,
		raftSettings.GetLeaderDescriptor,
		storage.FetchRecord,
	)

	go raftSettings.StartClusterEventLoop()
	go raftSettings.StartClusterInternalServer(selfConfig.KevaPort)
	go serverSettings.StartUserAPIServer(selfConfig.UserPort)
	// Block forever: the node runs until the process is killed.
	select {}
}
// main runs the CLI application and exits non-zero on any error.
func main() {
	if err := App.Run(os.Args); err != nil {
		fmt.Println(err)
		os.Exit(1)
	}
}