-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathws.go
More file actions
136 lines (120 loc) · 3.25 KB
/
ws.go
File metadata and controls
136 lines (120 loc) · 3.25 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
package chadango
import (
"context"
"time"
"golang.org/x/net/websocket"
)
// WebSocket represents a WebSocket connection.
// It uses `golang.org/x/net/websocket` under the hood and wraps it into a channel,
// allowing it to be select-able along with other channels.
//
// NOTE(review): no locking is visible in this file around these fields, although the
// goroutines started by Sustain read and write them — confirm callers do not rely on
// concurrent access being safe.
type WebSocket struct {
Connected bool // Connected is set to true by a successful Connect and back to false by Close.
Events chan string // Events carries the received messages; it is created by Connect and closed when the event pump stops.
OnError func(error) // OnError, when non-nil, is called with the error that stopped the event pump.
url string // url is the WebSocket server URL passed to the most recent Connect call.
client *websocket.Conn // client is the underlying WebSocket connection returned by websocket.Dial.
context context.Context // context is the cancelable lifecycle context derived in Sustain.
cancelCtx context.CancelFunc // cancelCtx cancels context; it is nil until Sustain has been called.
}
// Connect dials the WebSocket server at the given URL and prepares the Events channel.
// Calling it while the connection is already marked as connected is a no-op.
//
// Args:
//   - url: The WebSocket server URL.
//
// Returns:
//   - error: The dial error if the connection could not be established, nil otherwise.
func (w *WebSocket) Connect(url string) (err error) {
	// Already connected: leave the existing connection untouched.
	if w.Connected {
		return nil
	}

	w.url = url
	if w.client, err = websocket.Dial(url, "", WEBSOCKET_ORIGIN); err != nil {
		return err
	}

	// A fresh channel is created for every successful connection.
	w.Connected = true
	w.Events = make(chan string, EVENT_BUFFER_SIZE)
	return nil
}
// Close marks the connection as disconnected, cancels the lifecycle context
// (if Sustain has created one) and closes the underlying connection.
// It does nothing when the connection is not marked as connected.
func (w *WebSocket) Close() {
	if !w.Connected {
		return
	}
	w.Connected = false

	if cancel := w.cancelCtx; cancel != nil {
		cancel()
	}

	// Close has no error return, so the result of closing the socket is discarded.
	_ = w.client.Close()
}
// Sustain derives a cancelable lifecycle context from ctx and launches two goroutines:
// one that pumps received messages into Events and one that sends keep-alive pings.
//
// Args:
//   - ctx: The parent context used for managing the WebSocket connection's lifecycle.
func (w *WebSocket) Sustain(ctx context.Context) {
	lifecycle, cancel := context.WithCancel(ctx)
	w.context = lifecycle
	w.cancelCtx = cancel

	go w.pumpEvent()
	go w.keepAlive()
}
// pumpEvent forwards every message received from the connection to the Events channel.
//
// It runs until Recv fails or the lifecycle context is canceled. A Recv error is
// reported through OnError when that callback is set. On exit the connection is
// closed and the Events channel is closed so that receivers observe the shutdown.
func (w *WebSocket) pumpEvent() {
	defer func() {
		w.Close()
		// Closing a nil channel panics; Events is only allocated by Connect.
		if w.Events != nil {
			close(w.Events)
		}
	}()

	for {
		msg, err := w.Recv()
		if err != nil {
			if w.OnError != nil {
				w.OnError(err)
			}
			return
		}

		// Do not block forever on a full buffer when nobody drains Events anymore:
		// give up as soon as the lifecycle context is canceled.
		select {
		case w.Events <- msg:
		case <-w.context.Done():
			return
		}
	}
}
// keepAlive writes a "\r\n" ping to the connection every PING_INTERVAL.
// It stops when a ping fails to send or when the lifecycle context is done,
// and closes the connection on its way out.
func (w *WebSocket) keepAlive() {
	pinger := time.NewTicker(PING_INTERVAL)
	defer pinger.Stop()

	// Covers the case where the parent context is canceled without anyone calling `w.Close()`.
	defer w.Close()

	for {
		select {
		case <-w.context.Done():
			return
		case <-pinger.C:
			if err := w.Send("\r\n"); err != nil {
				return
			}
		}
	}
}
// Send writes a text message to the WebSocket connection.
//
// Args:
//   - msg: The message to send.
//
// Returns:
//   - error: ErrNotConnected when the connection is not marked as connected,
//     otherwise the result of the underlying send.
func (w *WebSocket) Send(msg string) (err error) {
	if !w.Connected {
		return ErrNotConnected
	}
	return websocket.Message.Send(w.client, msg)
}
// Recv reads the next message from the WebSocket connection.
//
// Returns:
//   - string: The received message.
//   - error: ErrNotConnected when the connection is not marked as connected,
//     otherwise the result of the underlying receive.
func (w *WebSocket) Recv() (msg string, err error) {
	if !w.Connected {
		return msg, ErrNotConnected
	}
	err = websocket.Message.Receive(w.client, &msg)
	return msg, err
}