Skip to content

Commit f625cf2

Browse files
committed
feature: Update to use WS wrapper with auto reconnect
1 parent 40daeb7 commit f625cf2

13 files changed

Lines changed: 43 additions & 56 deletions

File tree

MODULE.bazel

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,4 +13,4 @@ go_sdk.from_file(go_mod = "//:go.mod")
1313

1414
go_deps = use_extension("@gazelle//:extensions.bzl", "go_deps")
1515
go_deps.from_file(go_mod = "//:go.mod")
16-
use_repo(go_deps, "com_github_gin_gonic_gin", "com_github_google_uuid", "com_github_gorilla_websocket", "com_github_stretchr_testify", "in_gopkg_yaml_v3")
16+
use_repo(go_deps, "com_github_gin_gonic_gin", "com_github_google_uuid", "com_github_recws_org_recws", "com_github_stretchr_testify", "in_gopkg_yaml_v3")

go.mod

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ go 1.24.3
55
require (
66
github.com/gin-gonic/gin v1.10.0
77
github.com/google/uuid v1.6.0
8-
github.com/gorilla/websocket v1.5.3
8+
github.com/recws-org/recws v1.4.0
99
github.com/stretchr/testify v1.10.0
1010
gopkg.in/yaml.v3 v3.0.1
1111
)
@@ -21,6 +21,8 @@ require (
2121
github.com/go-playground/universal-translator v0.18.1 // indirect
2222
github.com/go-playground/validator/v10 v10.26.0 // indirect
2323
github.com/goccy/go-json v0.10.5 // indirect
24+
github.com/gorilla/websocket v1.5.3 // indirect
25+
github.com/jpillora/backoff v1.0.0 // indirect
2426
github.com/json-iterator/go v1.1.12 // indirect
2527
github.com/klauspost/cpuid/v2 v2.2.10 // indirect
2628
github.com/leodido/go-urn v1.4.0 // indirect

go.sum

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@ github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
3232
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
3333
github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg=
3434
github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
35+
github.com/jpillora/backoff v1.0.0 h1:uvFg412JmmHBHw7iwprIxkPMI+sGQ4kzOWsMeHnm2EA=
36+
github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4=
3537
github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
3638
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
3739
github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
@@ -51,6 +53,8 @@ github.com/pelletier/go-toml/v2 v2.2.4 h1:mye9XuhQ6gvn5h28+VilKrrPoQVanw5PMw/TB0
5153
github.com/pelletier/go-toml/v2 v2.2.4/go.mod h1:2gIqNv+qfxSVS7cM2xJQKtLSTLUE9V8t9Stt+h56mCY=
5254
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
5355
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
56+
github.com/recws-org/recws v1.4.0 h1:y9LLddtAicjejikNZXiaY9DQjIwcAQ82acd1XU6n0lU=
57+
github.com/recws-org/recws v1.4.0/go.mod h1:7+NQkTmBdU98VSzkzq9/P7+X0xExioUVBx9OeRKQIkk=
5458
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
5559
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
5660
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=

pkg/connector/binance/BUILD.bazel

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,6 @@ go_library(
1010
visibility = ["//visibility:public"],
1111
deps = [
1212
"//pkg/connector",
13-
"@com_github_gorilla_websocket//:websocket",
13+
"@com_github_recws_org_recws//:recws",
1414
],
1515
)

pkg/connector/binance/connector.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,10 @@ import (
66
"fmt"
77
"log/slog"
88
"strings"
9+
"time"
910

10-
"github.com/gorilla/websocket"
1111
"github.com/nbitslabs/nOracle/pkg/connector"
12+
"github.com/recws-org/recws"
1213
)
1314

1415
func NewConnector(ctx context.Context, wsUrl string, pairs []string) (connector.ExchangeConnector, error) {
@@ -26,10 +27,10 @@ func NewConnector(ctx context.Context, wsUrl string, pairs []string) (connector.
2627

2728
wsUrlWithChannels := fmt.Sprintf("%s/stream?streams=%s", wsUrl, strings.Join(channels, "/"))
2829

29-
ws, _, err := websocket.DefaultDialer.Dial(wsUrlWithChannels, nil)
30-
if err != nil {
31-
return nil, err
30+
ws := &recws.RecConn{
31+
KeepAliveTimeout: 10 * time.Second,
3232
}
33+
ws.Dial(wsUrlWithChannels, nil)
3334

3435
return &Connector{
3536
ctx: ctx,
@@ -43,7 +44,7 @@ func (c *Connector) Close() error {
4344
c.ctx.Done()
4445
}
4546
if c.ws != nil {
46-
return c.ws.Close()
47+
c.ws.Close()
4748
}
4849

4950
return nil

pkg/connector/binance/types.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,14 @@ import (
44
"context"
55
"math/big"
66

7-
"github.com/gorilla/websocket"
7+
"github.com/recws-org/recws"
88
)
99

1010
type Connector struct {
1111
ctx context.Context
1212
pairs []string
1313

14-
ws *websocket.Conn
14+
ws *recws.RecConn
1515
}
1616

1717
const Name = "binance"

pkg/connector/bybit/BUILD.bazel

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,6 @@ go_library(
1111
deps = [
1212
"//pkg/connector",
1313
"@com_github_google_uuid//:uuid",
14-
"@com_github_gorilla_websocket//:websocket",
14+
"@com_github_recws_org_recws//:recws",
1515
],
1616
)

pkg/connector/bybit/connector.go

Lines changed: 8 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -9,16 +9,16 @@ import (
99
"time"
1010

1111
"github.com/google/uuid"
12-
"github.com/gorilla/websocket"
1312
"github.com/nbitslabs/nOracle/pkg/connector"
13+
"github.com/recws-org/recws"
1414
)
1515

1616
const Name = "bybit"
1717

1818
type Connector struct {
1919
ctx context.Context
2020
pairs []string
21-
ws *websocket.Conn
21+
ws *recws.RecConn
2222
}
2323

2424
func NewConnector(ctx context.Context, wsUrl string, pairs []string) (connector.ExchangeConnector, error) {
@@ -31,10 +31,10 @@ func NewConnector(ctx context.Context, wsUrl string, pairs []string) (connector.
3131

3232
wsUrlWithChannels := fmt.Sprintf("%s/v5/public/spot", wsUrl)
3333

34-
ws, _, err := websocket.DefaultDialer.Dial(wsUrlWithChannels, nil)
35-
if err != nil {
36-
return nil, err
34+
ws := &recws.RecConn{
35+
KeepAliveTimeout: 10 * time.Second,
3736
}
37+
ws.Dial(wsUrlWithChannels, nil)
3838

3939
args := make([]string, 0, len(pairs))
4040
for _, pair := range pairs {
@@ -51,23 +51,19 @@ func NewConnector(ctx context.Context, wsUrl string, pairs []string) (connector.
5151
return nil, err
5252
}
5353

54-
connector := &Connector{
54+
return &Connector{
5555
ctx: ctx,
5656
pairs: pairs,
5757
ws: ws,
58-
}
59-
60-
// We send heart beat every 10 seconds
61-
connector.sendHeartbeat()
62-
return connector, nil
58+
}, nil
6359
}
6460

6561
func (c *Connector) Close() error {
6662
if c.ctx != nil {
6763
c.ctx.Done()
6864
}
6965
if c.ws != nil {
70-
return c.ws.Close()
66+
c.ws.Close()
7167
}
7268

7369
return nil
@@ -112,20 +108,3 @@ func (c *Connector) Name() string {
112108
func (c *Connector) Tickers() []string {
113109
return c.pairs
114110
}
115-
116-
func (c *Connector) sendHeartbeat() {
117-
req := SubscriptionMessage{
118-
Op: "ping",
119-
ReqId: "100001",
120-
}
121-
122-
// We send heart beat every 10 seconds
123-
go func() {
124-
for {
125-
time.Sleep(10 * time.Second)
126-
if err := c.ws.WriteJSON(req); err != nil {
127-
slog.Warn("error sending heartbeat", "error", err, "exchange", Name)
128-
}
129-
}
130-
}()
131-
}

pkg/connector/coinbase/BUILD.bazel

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,6 @@ go_library(
1111
deps = [
1212
"//pkg/connector",
1313
"//pkg/utils/ticker",
14-
"@com_github_gorilla_websocket//:websocket",
14+
"@com_github_recws_org_recws//:recws",
1515
],
1616
)

pkg/connector/coinbase/connector.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,18 +5,19 @@ import (
55
"encoding/json"
66
"fmt"
77
"log/slog"
8+
"time"
89

9-
"github.com/gorilla/websocket"
1010
"github.com/nbitslabs/nOracle/pkg/connector"
1111
"github.com/nbitslabs/nOracle/pkg/utils/ticker"
12+
"github.com/recws-org/recws"
1213
)
1314

1415
const Name = "coinbase"
1516

1617
type Connector struct {
1718
ctx context.Context
1819
pairs []string
19-
ws *websocket.Conn
20+
ws *recws.RecConn
2021
}
2122

2223
func NewConnector(ctx context.Context, wsUrl string, pairs []string) (connector.ExchangeConnector, error) {
@@ -32,10 +33,10 @@ func NewConnector(ctx context.Context, wsUrl string, pairs []string) (connector.
3233
Channels: []Channel{{Name: "ticker", ProductIds: pairs}},
3334
}
3435

35-
ws, _, err := websocket.DefaultDialer.Dial(wsUrl, nil)
36-
if err != nil {
37-
return nil, err
36+
ws := &recws.RecConn{
37+
KeepAliveTimeout: 10 * time.Second,
3838
}
39+
ws.Dial(wsUrl, nil)
3940

4041
if err := ws.WriteJSON(req); err != nil {
4142
return nil, err
@@ -53,7 +54,7 @@ func (c *Connector) Close() error {
5354
c.ctx.Done()
5455
}
5556
if c.ws != nil {
56-
return c.ws.Close()
57+
c.ws.Close()
5758
}
5859
return nil
5960
}

0 commit comments

Comments
 (0)