-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathshard.go
More file actions
90 lines (77 loc) · 1.78 KB
/
shard.go
File metadata and controls
90 lines (77 loc) · 1.78 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
package shard
import (
"context"
"fmt"
"os"
"os/signal"
"strings"
"syscall"
"github.com/prxssh/shard/api"
"github.com/prxssh/shard/internal/master"
"github.com/prxssh/shard/internal/worker"
)
func Run(cfg *Config) error {
if envAddr := os.Getenv("SHARD_MASTER_ADDR"); envAddr != "" {
cfg.MasterAddress = envAddr
}
if cfg.MasterAddress == "" {
return fmt.Errorf("master address is required (set via config SHARD_MASTER_ADDR)")
}
mode := strings.ToLower(os.Getenv("SHARD_MODE"))
if mode == "" {
cfg.Logger.Info("SHARD_MODE not set, defaulting to 'master'", nil)
mode = "master"
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
go func() {
<-sigChan
cfg.Logger.Info("shutting down", nil)
cancel()
}()
cfg.Logger.Info("starting", api.LogFields{
"mode": mode,
"master_addr": cfg.MasterAddress,
"output_dir": cfg.OutputDir,
})
switch mode {
case "master":
master, err := master.NewMaster(
&master.Config{
Address: cfg.MasterAddress,
WorkerMaxTasks: cfg.MaxConcurrency,
SplitSize: cfg.ChunkSize,
InputFiles: cfg.inputFiles,
},
cfg.Logger,
)
if err != nil {
return err
}
return master.Start(ctx)
case "worker":
worker, err := worker.NewWorker(
cfg.Mapper,
cfg.Reducer,
cfg.Partitioner,
cfg.Combiner,
cfg.Filesystem,
&worker.Config{
OutputDir: cfg.OutputDir,
InputPath: cfg.InputPath,
NumReducers: cfg.NumReducers,
MaxConcurrency: cfg.MaxConcurrency,
MasterAddr: cfg.MasterAddress,
},
cfg.Logger,
)
if err != nil {
return err
}
return worker.Start(ctx)
default:
return fmt.Errorf("unkown SHARD_MODE: %s", mode)
}
}