Skip to content

Commit 914cd2d

Browse files
committed
feature: Add Upbit Connector
1 parent f625cf2 commit 914cd2d

9 files changed

Lines changed: 223 additions & 0 deletions

File tree

config.example.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,3 +19,8 @@ exchanges:
1919
symbols:
2020
- BTC-USDT
2121
- ETH-USDT
22+
- name: upbit
23+
url: wss://api.upbit.com/websocket/v1
24+
symbols:
25+
- USDT-BTC
26+
- USDT-ETH

pkg/config/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ go_library(
1414
"//pkg/connector/bybit",
1515
"//pkg/connector/coinbase",
1616
"//pkg/connector/okx",
17+
"//pkg/connector/upbit",
1718
"@in_gopkg_yaml_v3//:yaml_v3",
1819
],
1920
)

pkg/config/exchange.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"github.com/nbitslabs/nOracle/pkg/connector/bybit"
1010
"github.com/nbitslabs/nOracle/pkg/connector/coinbase"
1111
"github.com/nbitslabs/nOracle/pkg/connector/okx"
12+
"github.com/nbitslabs/nOracle/pkg/connector/upbit"
1213
)
1314

1415
func LoadConnector(ctx context.Context, exchange string, url string, symbols []string) (connector.ExchangeConnector, error) {
@@ -21,6 +22,8 @@ func LoadConnector(ctx context.Context, exchange string, url string, symbols []s
2122
return bybit.NewConnector(ctx, url, symbols)
2223
case "coinbase":
2324
return coinbase.NewConnector(ctx, url, symbols)
25+
case "upbit":
26+
return upbit.NewConnector(ctx, url, symbols)
2427
default:
2528
return nil, fmt.Errorf("unknown exchange: %s", exchange)
2629
}

pkg/connector/upbit/BUILD.bazel

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
load("@rules_go//go:def.bzl", "go_library")
2+
3+
go_library(
4+
name = "upbit",
5+
srcs = [
6+
"connector.go",
7+
"types.go",
8+
],
9+
importpath = "github.com/nbitslabs/nOracle/pkg/connector/upbit",
10+
visibility = ["//visibility:public"],
11+
deps = [
12+
"//pkg/connector",
13+
"//pkg/utils/ticker",
14+
"@com_github_google_uuid//:uuid",
15+
"@com_github_recws_org_recws//:recws",
16+
],
17+
)

pkg/connector/upbit/connector.go

Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
package upbit
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"fmt"
7+
"log/slog"
8+
"math/big"
9+
"time"
10+
11+
"github.com/google/uuid"
12+
"github.com/nbitslabs/nOracle/pkg/connector"
13+
"github.com/nbitslabs/nOracle/pkg/utils/ticker"
14+
"github.com/recws-org/recws"
15+
)
16+
17+
func NewConnector(ctx context.Context, wsUrl string, pairs []string) (connector.ExchangeConnector, error) {
18+
if wsUrl == "" {
19+
return nil, fmt.Errorf("wsUrl is required")
20+
}
21+
if len(pairs) == 0 {
22+
return nil, fmt.Errorf("pairs are required")
23+
}
24+
25+
wsUrlWithChannels := fmt.Sprintf("%s/ws/v5/public", wsUrl)
26+
ws := &recws.RecConn{
27+
KeepAliveTimeout: 10 * time.Second,
28+
}
29+
ws.Dial(wsUrlWithChannels, nil)
30+
31+
req := []map[string]interface{}{
32+
{"ticket": uuid.NewString()},
33+
{"type": "ticker", "codes": pairs, "isOnlyRealtime": true},
34+
{"format": "DEFAULT"},
35+
}
36+
37+
if err := ws.WriteJSON(req); err != nil {
38+
return nil, err
39+
}
40+
41+
return &Connector{
42+
ctx: ctx,
43+
pairs: pairs,
44+
ws: ws,
45+
}, nil
46+
}
47+
48+
func (c *Connector) Close() error {
49+
if c.ctx != nil {
50+
c.ctx.Done()
51+
}
52+
if c.ws != nil {
53+
c.ws.Close()
54+
}
55+
56+
return nil
57+
}
58+
59+
func (c *Connector) StreamTickers(ctx context.Context, out chan<- connector.TickerUpdate) error {
60+
go func() {
61+
for {
62+
select {
63+
case <-ctx.Done():
64+
return
65+
default:
66+
_, message, err := c.ws.ReadMessage()
67+
if err != nil {
68+
slog.Warn("error reading message", "error", err, "exchange", Name)
69+
continue
70+
}
71+
72+
var tickerResponse TickerResponse
73+
if err := json.Unmarshal(message, &tickerResponse); err != nil {
74+
fmt.Println(string(message))
75+
slog.Warn("error unmarshalling message", "error", err, "exchange", Name)
76+
continue
77+
}
78+
79+
out <- connector.TickerUpdate{
80+
Exchange: Name,
81+
Symbol: ticker.UpbitToStandardTicker(tickerResponse.Code),
82+
Price: big.NewFloat(tickerResponse.TradePrice),
83+
Volume: big.NewFloat(tickerResponse.TradeVolume),
84+
Timestamp: tickerResponse.TradeTimestamp,
85+
}
86+
}
87+
}
88+
}()
89+
90+
return nil
91+
}
92+
93+
func (c *Connector) Name() string {
94+
return Name
95+
}
96+
97+
func (c *Connector) Tickers() []string {
98+
return c.pairs
99+
}

pkg/connector/upbit/types.go

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
package upbit
2+
3+
import (
4+
"context"
5+
6+
"github.com/recws-org/recws"
7+
)
8+
9+
type Connector struct {
10+
ctx context.Context
11+
pairs []string
12+
13+
ws *recws.RecConn
14+
}
15+
16+
const Name = "upbit"
17+
18+
type TickerResponse struct {
19+
Type string `json:"type"`
20+
Code string `json:"code"`
21+
22+
TradePrice float64 `json:"trade_price"`
23+
TradeVolume float64 `json:"trade_volume"`
24+
TradeTimestamp int64 `json:"trade_timestamp"`
25+
26+
// Optional fields
27+
OpeningPrice float64 `json:"opening_price"`
28+
HighPrice float64 `json:"high_price"`
29+
LowPrice float64 `json:"low_price"`
30+
PrevClosingPrice float64 `json:"prev_closing_price"`
31+
AccTradePrice float64 `json:"acc_trade_price"`
32+
Change string `json:"change"`
33+
ChangePrice float64 `json:"change_price"`
34+
SignedChangePrice float64 `json:"signed_change_price"`
35+
ChangeRate float64 `json:"change_rate"`
36+
SignedChangeRate float64 `json:"signed_change_rate"`
37+
AskBid string `json:"ask_bid"`
38+
AccTradeVolume float64 `json:"acc_trade_volume"`
39+
TradeDate string `json:"trade_date"`
40+
TradeTime string `json:"trade_time"`
41+
AccAskVolume float64 `json:"acc_ask_volume"`
42+
AccBidVolume float64 `json:"acc_bid_volume"`
43+
Highest52WeekPrice float64 `json:"highest_52_week_price"`
44+
Highest52WeekDate string `json:"highest_52_week_date"`
45+
Lowest52WeekPrice float64 `json:"lowest_52_week_price"`
46+
Lowest52WeekDate string `json:"lowest_52_week_date"`
47+
MarketState string `json:"market_state"`
48+
IsTradingSuspended bool `json:"is_trading_suspended"`
49+
DelistingDate any `json:"delisting_date"`
50+
MarketWarning string `json:"market_warning"`
51+
Timestamp int64 `json:"timestamp"`
52+
AccTradePrice24H float64 `json:"acc_trade_price_24h"`
53+
AccTradeVolume24H float64 `json:"acc_trade_volume_24h"`
54+
StreamType string `json:"stream_type"`
55+
}

pkg/utils/ticker/BUILD.bazel

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ go_library(
66
"coinbase.go",
77
"okx.go",
88
"shared.go",
9+
"upbit.go",
910
],
1011
importpath = "github.com/nbitslabs/nOracle/pkg/utils/ticker",
1112
visibility = ["//visibility:public"],
@@ -16,6 +17,7 @@ go_test(
1617
srcs = [
1718
"coinbase_test.go",
1819
"okx_test.go",
20+
"upbit_test.go",
1921
],
2022
embed = [":ticker"],
2123
deps = ["@com_github_stretchr_testify//assert"],

pkg/utils/ticker/upbit.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
package ticker
2+
3+
import (
4+
"fmt"
5+
"strings"
6+
)
7+
8+
func UpbitToStandardTicker(upbitTicker string) string {
9+
// Upbit ticker format is like this: USDT-BTC
10+
// We need to convert it to the standard format: USDTBTC
11+
// We need to remove the dash and convert the uppercase to lowercase
12+
13+
parts := strings.Split(upbitTicker, "-")
14+
if len(parts) != 2 {
15+
// If the ticker is not in the format USDT-BTC, we return the same string
16+
return upbitTicker
17+
}
18+
19+
return fmt.Sprintf("%s%s", strings.ToUpper(parts[1]), strings.ToUpper(parts[0]))
20+
}

pkg/utils/ticker/upbit_test.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
package ticker
2+
3+
import "testing"
4+
5+
func TestUpbitToStandardTicker(t *testing.T) {
6+
tests := []struct {
7+
upbitTicker string
8+
want string
9+
}{
10+
{"USDT-BTC", "BTCUSDT"},
11+
{"USDT-ETH", "ETHUSDT"},
12+
{"", ""},
13+
{"BTCUSD", "BTCUSD"},
14+
}
15+
16+
for _, test := range tests {
17+
if got := UpbitToStandardTicker(test.upbitTicker); got != test.want {
18+
t.Errorf("UpbitToStandardTicker(%s) = %s, want %s", test.upbitTicker, got, test.want)
19+
}
20+
}
21+
}

0 commit comments

Comments
 (0)