Skip to content

Commit 645d29b

Browse files
authored
Merge pull request #24 from nightowlnerd/feat/parallel-verify
feat(verify): add parallel verification for >5 servers
2 parents 69a325c + 0673a81 commit 645d29b

1 file changed

Lines changed: 72 additions & 4 deletions

File tree

verify.go

Lines changed: 72 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,12 @@ import (
77
"io"
88
"os/exec"
99
"strings"
10+
"sync"
1011
"time"
1112
)
1213

14+
const VerifyWorkers = 10
15+
1316
type Verifier interface {
1417
Verify(ctx context.Context, ips []string) []string
1518
Name() string
@@ -47,12 +50,23 @@ func (v *SlipstreamVerifier) Verify(ctx context.Context, ips []string) []string
4750
tickCtx, stopTick := context.WithCancel(ctx)
4851
go v.tick(tickCtx, prog)
4952

53+
var verified []string
54+
if len(ips) <= 5 {
55+
verified = v.sequential(ctx, ips, prog)
56+
} else {
57+
verified = v.parallel(ctx, ips, prog)
58+
}
59+
60+
stopTick()
61+
v.summary(prog, len(verified), len(ips))
62+
return verified
63+
}
64+
65+
func (v *SlipstreamVerifier) sequential(ctx context.Context, ips []string, prog *Progress) []string {
5066
var verified []string
5167
for _, ip := range ips {
5268
select {
5369
case <-ctx.Done():
54-
stopTick()
55-
v.summary(prog, len(verified), len(ips))
5670
return verified
5771
default:
5872
}
@@ -62,9 +76,63 @@ func (v *SlipstreamVerifier) Verify(ctx context.Context, ips []string) []string
6276
verified = append(verified, ip)
6377
}
6478
}
79+
return verified
80+
}
6581

66-
stopTick()
67-
v.summary(prog, len(verified), len(ips))
82+
func (v *SlipstreamVerifier) parallel(ctx context.Context, ips []string, prog *Progress) []string {
83+
workers := min(len(ips), VerifyWorkers)
84+
85+
ipChan := make(chan string, len(ips))
86+
resultChan := make(chan string, workers)
87+
88+
go func() {
89+
defer close(ipChan)
90+
for _, ip := range ips {
91+
select {
92+
case ipChan <- ip:
93+
case <-ctx.Done():
94+
return
95+
}
96+
}
97+
}()
98+
99+
var wg sync.WaitGroup
100+
for i := 0; i < workers; i++ {
101+
wg.Add(1)
102+
go func() {
103+
defer wg.Done()
104+
for {
105+
select {
106+
case <-ctx.Done():
107+
return
108+
case ip, ok := <-ipChan:
109+
if !ok {
110+
return
111+
}
112+
passed := v.testIP(ip)
113+
prog.Increment()
114+
if passed {
115+
prog.Success()
116+
select {
117+
case resultChan <- ip:
118+
case <-ctx.Done():
119+
return
120+
}
121+
}
122+
}
123+
}
124+
}()
125+
}
126+
127+
go func() {
128+
wg.Wait()
129+
close(resultChan)
130+
}()
131+
132+
var verified []string
133+
for ip := range resultChan {
134+
verified = append(verified, ip)
135+
}
68136
return verified
69137
}
70138

0 commit comments

Comments
 (0)