diff --git a/crypto/crypto.go b/crypto/crypto.go index ba150185..13a6d633 100644 --- a/crypto/crypto.go +++ b/crypto/crypto.go @@ -138,6 +138,7 @@ func toECDSA(d []byte, strict bool) (*ecdsa.PrivateKey, error) { return nil, fmt.Errorf("invalid private key, zero or negative") } + //lint:ignore SA1019 secp256k1 uses custom curve ops; crypto/ecdh does not support this curve priv.PublicKey.X, priv.PublicKey.Y = priv.PublicKey.Curve.ScalarBaseMult(d) if priv.PublicKey.X == nil { return nil, errors.New("invalid private key") diff --git a/crypto/ecies/ecies.go b/crypto/ecies/ecies.go index 9fc154ba..bb622023 100644 --- a/crypto/ecies/ecies.go +++ b/crypto/ecies/ecies.go @@ -127,6 +127,7 @@ func (prv *PrivateKey) GenerateShared(pub *PublicKey, skLen, macLen int) (sk []b return nil, ErrSharedKeyTooBig } + //lint:ignore SA1019 secp256k1 ECDH still relies on curve ScalarMult in this implementation x, _ := pub.Curve.ScalarMult(pub.X, pub.Y, prv.D.Bytes()) if x == nil { return nil, ErrSharedKeyIsPointAtInfinity @@ -337,6 +338,7 @@ func (prv *PrivateKey) Decrypt(c, s1, s2 []byte) (m []byte, err error) { err = ErrInvalidPublicKey return } + //lint:ignore SA1019 secp256k1 curve validation uses IsOnCurve and cannot use crypto/ecdh directly if !R.Curve.IsOnCurve(R.X, R.Y) { err = ErrInvalidCurve return diff --git a/go.mod b/go.mod index 9a06ac54..86a16c72 100644 --- a/go.mod +++ b/go.mod @@ -58,7 +58,7 @@ require ( github.com/mholt/acmez v1.2.0 // indirect github.com/miekg/dns v1.1.57 // indirect github.com/oxtoacart/bpool v0.0.0-20190530202638-03653db5a59c // indirect - github.com/petermattis/goid v0.0.0-20231126143041-f558c26febf5 // indirect + github.com/petermattis/goid v0.0.0-20260113132338-7c7de50cc741 // indirect github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/rivo/uniseg v0.4.4 // indirect diff --git a/go.sum b/go.sum index 898128b7..f6c06381 100644 --- a/go.sum +++ b/go.sum @@ -138,8 +138,8 @@ github.com/onsi/gomega v1.5.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1Cpa github.com/oxtoacart/bpool v0.0.0-20190530202638-03653db5a59c h1:rp5dCmg/yLR3mgFuSOe4oEnDDmGLROTvMragMUXpTQw= github.com/oxtoacart/bpool v0.0.0-20190530202638-03653db5a59c/go.mod h1:X07ZCGwUbLaax7L0S3Tw4hpejzu63ZrrQiUe6W0hcy0= github.com/petermattis/goid v0.0.0-20180202154549-b0b1615b78e5/go.mod h1:jvVRKCrJTQWu0XVbaOlby/2lO20uSCHEMzzplHXte1o= -github.com/petermattis/goid v0.0.0-20231126143041-f558c26febf5 h1:+qIP3OMrT7SN5kLnTcVEISPOMB/97RyAKTg1UWA738E= -github.com/petermattis/goid v0.0.0-20231126143041-f558c26febf5/go.mod h1:pxMtw7cyUw6B2bRH0ZBANSPg+AoSud1I1iyJHI69jH4= +github.com/petermattis/goid v0.0.0-20260113132338-7c7de50cc741 h1:KPpdlQLZcHfTMQRi6bFQ7ogNO0ltFT4PmtwTLW4W+14= +github.com/petermattis/goid v0.0.0-20260113132338-7c7de50cc741/go.mod h1:pxMtw7cyUw6B2bRH0ZBANSPg+AoSud1I1iyJHI69jH4= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= diff --git a/rpc/bridge.go b/rpc/bridge.go index 9f9669c3..b60dba8b 100644 --- a/rpc/bridge.go +++ b/rpc/bridge.go @@ -390,8 +390,10 @@ func validateRLPValue(k rlp.Kind, content []byte) error { return nil } -// recvMessageLoop infinite loop to read message from server -func (client *Client) recvMessageLoop() { +// recvMessageLoop infinite loop to read message from server. +// It uses the connection captured at start so Close() can safely +// close the connection without racing on client.s pointer updates. +func (client *Client) recvMessageLoop(ssl *SSL) { msgBuffer := make(chan edge.Message, 20) defer close(msgBuffer) @@ -402,16 +404,16 @@ func (client *Client) recvMessageLoop() { }() for { - if client.s == nil { - if !client.isClosed { + if ssl == nil { + if !client.isClientClosed() { client.Log().Info("Client connection closed prematurely.") client.Close() } return } - msg, err := client.s.readMessage() + msg, err := ssl.readMessage() if err != nil { - if !client.isClosed { + if !client.isClientClosed() { client.Log().Info("Client connection closed unexpectedly: %v", err) client.Close() } diff --git a/rpc/client.go b/rpc/client.go index e6fe35ef..87baa97c 100644 --- a/rpc/client.go +++ b/rpc/client.go @@ -72,11 +72,10 @@ type Client struct { // close event OnClose func() - isClosed bool - closing uint32 // set before Close() callback runs; allows GetNearestClient to exclude client earlier - srv *genserver.GenServer - timer *Timer - + closed uint32 + closing uint32 // set before Close() callback runs; allows GetNearestClient to exclude client earlier + srv *genserver.GenServer + timer *Timer portOpen2Handler atomic.Value } @@ -120,12 +119,25 @@ func NewClient(host string, clientMan *ClientManager, cfg *config.Config, pool * } func (client *Client) averageLatency() int64 { - return client.latencySum / client.latencyCount + latencyCount := atomic.LoadInt64(&client.latencyCount) + if latencyCount <= 0 { + return 0 + } + latencySum := atomic.LoadInt64(&client.latencySum) + return latencySum / latencyCount } func (client *Client) addLatencyMeasurement(latency time.Duration) { - client.latencySum += latency.Milliseconds() - client.latencyCount++ + atomic.AddInt64(&client.latencySum, latency.Milliseconds()) + atomic.AddInt64(&client.latencyCount, 1) +} + +func (client *Client) isClientClosed() bool { + return atomic.LoadUint32(&client.closed) == 1 +} + +func (client *Client) setClientClosed() { + atomic.StoreUint32(&client.closed, 1) } func (client *Client) doConnect() (err error) { @@ -200,28 +212,46 @@ func (client *Client) GetDeviceKey(ref string) string { } func (client *Client) waitResponse(call *Call) (res interface{}, err error) { - defer call.Clean(CLOSED) - defer client.srv.Cast(func() { client.cm.RemoveCallByID(call.id) }) - resp, ok := <-call.response - if !ok { - host, _ := client.Host() - err = CancelledError{host} - if call.sender != nil { - call.sender.remoteErr = io.EOF - call.sender.Close() + // Remove the call synchronously on exit. This avoids a race where a timed out + // call channel is closed before the call is removed from the manager. + defer client.cm.RemoveCallByID(call.id) + timeout := client.config.RemoteRPCTimeout + if timeout <= 0 { + timeout = client.localTimeout + } + timer := time.NewTimer(timeout) + defer timer.Stop() + + select { + case resp, ok := <-call.response: + if !ok { + host, _ := client.Host() + err = CancelledError{host} + if call.sender != nil { + call.sender.remoteErr = io.EOF + call.sender.Close() + } + return } - return - } - if rpcError, ok := resp.(edge.Error); ok { - err = RPCError{rpcError} + if rpcError, ok := resp.(edge.Error); ok { + err = RPCError{rpcError} + if call.sender != nil { + call.sender.remoteErr = RPCError{rpcError} + call.sender.Close() + } + return + } + res = resp + return res, nil + case <-timer.C: + err = TimeoutError{Timeout: timeout} + client.Log().Debug("RPC timeout after %s (method=%s id=%d)", timeout, call.method, call.id) if call.sender != nil { - call.sender.remoteErr = RPCError{rpcError} + call.sender.remoteErr = err call.sender.Close() } - return + return nil, err } - res = resp - return res, nil } // RespondContext sends a message (a response) without expecting a response @@ -244,9 +274,13 @@ func (client *Client) RespondContext(requestID uint64, responseType string, meth func (client *Client) callTimeout(fun func()) error { // Long enough timeout to at least survive initial reconnect attempts if client == nil { - return fmt.Errorf("Client disconnected") + return errClientClosed + } + err := client.srv.CallTimeout(fun, 30*time.Second) + if err != nil && strings.Contains(err.Error(), "dead genserver") { + return errClientClosed } - return client.srv.CallTimeout(fun, 30*time.Second) + return err } // CastContext returns a response future after calling the rpc @@ -272,7 +306,7 @@ func (client *Client) CastContext(sender *ConnectedPort, method string, args ... func (client *Client) insertCall(call *Call) (err error) { timeout := client.callTimeout(func() { - if client.isClosed { + if client.isClientClosed() { err = errClientClosed return } @@ -578,10 +612,11 @@ func (client *Client) greet() error { // In case the server does not request a ticket, we will submit one after 10 seconds go func() { time.Sleep(10 * time.Second) - s := client.s - if s != nil && client.lastTicket == nil { - client.SubmitTicketForUsage(big.NewInt(int64(s.TotalBytes()))) - } + client.srv.Cast(func() { + if client.s != nil && client.lastTicket == nil { + client.SubmitTicketForUsage(big.NewInt(int64(client.s.TotalBytes()))) + } + }) }() return nil } @@ -589,7 +624,7 @@ func (client *Client) greet() error { // SubmitTicketForUsage submits a ticket covering at least minBytes. func (client *Client) SubmitTicketForUsage(minBytes *big.Int) (err error) { client.srv.Cast(func() { - if client.bq == nil || client.isClosed || client.s == nil { + if client.bq == nil || client.isClientClosed() || client.s == nil { return } if minBytes == nil { @@ -610,8 +645,7 @@ func (client *Client) SubmitTicketForUsage(minBytes *big.Int) (err error) { client.s.setTotalBytes(minUsage) } - var ticket *edge.DeviceTicket - ticket, err = client.newTicket() + ticket, err := client.newTicket() if err != nil { return } @@ -623,7 +657,7 @@ func (client *Client) SubmitTicketForUsage(minBytes *big.Int) (err error) { client.lastTicket = ticket } }) - return + return nil } // HandleTicketRequest responds to a server ticket request. @@ -665,12 +699,16 @@ func (client *Client) SignTransaction(tx *edge.Transaction) (err error) { // NewTicket returns ticket func (client *Client) newTicket() (*edge.DeviceTicket, error) { - serverID, err := client.s.GetServerID() + ssl := client.s + if ssl == nil || ssl.Closed() { + return nil, errClientClosed + } + serverID, err := ssl.GetServerID() if err != nil { return nil, err } - total := client.s.TotalBytes() - client.s.UpdateCounter(total) + total := ssl.TotalBytes() + ssl.UpdateCounter(total) lvbn, lvbh := client.LastValid() ticket := &edge.DeviceTicket{ @@ -679,7 +717,7 @@ func (client *Client) newTicket() (*edge.DeviceTicket, error) { BlockNumber: lvbn, BlockHash: lvbh[:], FleetAddr: client.config.FleetAddr, - TotalConnections: big.NewInt(int64(client.s.TotalConnections())), + TotalConnections: big.NewInt(int64(ssl.TotalConnections())), TotalBytes: big.NewInt(int64(total)), LocalAddr: []byte{}, } @@ -699,7 +737,7 @@ func (client *Client) newTicket() (*edge.DeviceTicket, error) { if err := ticket.ValidateValues(); err != nil { return nil, err } - privKey, err := client.s.GetClientPrivateKey() + privKey, err := ssl.GetClientPrivateKey() if err != nil { return nil, err } @@ -731,6 +769,9 @@ func (client *Client) submitTicket(ticket *edge.DeviceTicket) error { if lastTicket, ok := resp.(edge.DeviceTicket); ok { if lastTicket.Err == edge.ErrTicketTooLow { ssl := client.s + if ssl == nil || ssl.Closed() { + return + } sid, _ := ssl.GetServerID() lastTicket.ServerID = sid lastTicket.FleetAddr = client.config.FleetAddr @@ -740,10 +781,11 @@ func (client *Client) submitTicket(ticket *edge.DeviceTicket) error { client.Log().Warn("received fake ticket.. last_ticket=%v", lastTicket) } else { client.Log().Debug("insufficient ticket, resending... last_ticket=%v", lastTicket) - ssl.totalConnections = lastTicket.TotalConnections.Uint64() + 1 - err = client.SubmitTicketForUsage(lastTicket.TotalBytes) - if err != nil { - client.Log().Error("failed to re-submit ticket: %v", err) + ssl.setTotalBytes(lastTicket.TotalBytes.Uint64() + 1024) + ssl.setTotalConnections(lastTicket.TotalConnections.Uint64() + 1) + retryErr := client.SubmitTicketForUsage(lastTicket.TotalBytes) + if retryErr != nil { + client.Log().Error("failed to re-submit ticket: %v", retryErr) } } } else if lastTicket.Err == edge.ErrTicketTooOld { @@ -1382,13 +1424,13 @@ func (client *Client) IsDeviceAllowlisted(fleetAddr Address, clientAddr Address) // Closed returns whether client had closed func (client *Client) Closed() bool { - return client.isClosed + return client.isClientClosed() } // Closing returns true if the client is closed or in the process of closing (flag set before Close callback runs). // Used by ClientManager to avoid returning a client that is about to be closed. func (client *Client) Closing() bool { - return atomic.LoadUint32(&client.closing) == 1 || client.isClosed + return atomic.LoadUint32(&client.closing) == 1 || client.isClientClosed() } // Close rpc client @@ -1399,11 +1441,11 @@ func (client *Client) Close() { } doCleanup := true timeout := client.callTimeout(func() { - if client.isClosed { + if client.isClientClosed() { doCleanup = false return } - client.isClosed = true + client.setClientClosed() // remove existing calls client.cm.RemoveCalls() if client.OnClose != nil { @@ -1411,13 +1453,24 @@ func (client *Client) Close() { } if client.s != nil { client.s.Close() - client.s = nil } }) if timeout == nil && !doCleanup { client.srv.Shutdown(0) return } + if timeout == errClientClosed && doCleanup { + // Genserver is already down; finish best-effort cleanup so callers + // don't block waiting for this client to terminate. + client.setClientClosed() + client.cm.RemoveCalls() + if client.OnClose != nil { + client.OnClose() + } + if client.s != nil { + client.s.Close() + } + } // remove open ports (best-effort even if callTimeout failed) if client.pool != nil { client.pool.ClosePorts(client) @@ -1429,28 +1482,30 @@ func (client *Client) Close() { func (client *Client) Start() { client.srv.Cast(func() { if err := client.doStart(); err != nil { - if !client.isClosed { + if !client.isClientClosed() { client.Log().Warn("Client connect failed: %v", err) } client.srv.Shutdown(0) + return } - }) - go func() { - if err := client.initialize(); err != nil { - if !client.isClosed { - client.Log().Warn("Client start failed: %v", err) - client.Close() + go func() { + if err := client.initialize(); err != nil { + if !client.isClientClosed() { + client.Log().Warn("Client start failed: %v", err) + client.Close() + } } - } - }() + }() + }) } func (client *Client) doStart() (err error) { if err = client.doConnect(); err != nil { return } - go client.recvMessageLoop() + ssl := client.s + go client.recvMessageLoop(ssl) client.cm.SendCallPtr = client.sendCall return } diff --git a/rpc/client_waitresponse_test.go b/rpc/client_waitresponse_test.go new file mode 100644 index 00000000..c47d5354 --- /dev/null +++ b/rpc/client_waitresponse_test.go @@ -0,0 +1,79 @@ +package rpc + +import ( + "testing" + "time" + + "github.com/diodechain/diode_client/config" +) + +func testClientConfig(t *testing.T, timeout time.Duration) *config.Config { + t.Helper() + + cfg := &config.Config{ + RemoteRPCTimeout: timeout, + } + logger, err := config.NewLogger(cfg) + if err != nil { + t.Fatalf("failed to create logger: %v", err) + } + cfg.Logger = &logger + return cfg +} + +func TestWaitResponseTimeoutWithLateClaimedResponseDoesNotPanic(t *testing.T) { + timeout := 10 * time.Millisecond + cfg := testClientConfig(t, timeout) + client := &Client{ + cm: NewCallManager(4), + config: cfg, + localTimeout: timeout, + } + + call := &Call{ + id: 42, + method: "late_response", + state: STARTED, + response: make(chan interface{}, 1), + } + client.cm.calls[call.id] = call + + releaseSend := make(chan struct{}) + panicCh := make(chan interface{}, 1) + errCh := make(chan error, 1) + + go func() { + c := client.cm.CallByID(call.id) + if c == nil { + errCh <- nil + return + } + + <-releaseSend + + defer func() { + if r := recover(); r != nil { + panicCh <- r + } + }() + errCh <- c.enqueueResponse("late") + }() + + _, err := client.waitResponse(call) + if _, ok := err.(TimeoutError); !ok { + t.Fatalf("expected TimeoutError, got %T (%v)", err, err) + } + + close(releaseSend) + + select { + case p := <-panicCh: + t.Fatalf("enqueueResponse panicked after timeout: %v", p) + case enqueueErr := <-errCh: + if enqueueErr != nil { + t.Fatalf("unexpected enqueue error: %v", enqueueErr) + } + case <-time.After(2 * time.Second): + t.Fatalf("timeout waiting for late response goroutine") + } +} diff --git a/rpc/datapool.go b/rpc/datapool.go index 8553a01d..58b14487 100644 --- a/rpc/datapool.go +++ b/rpc/datapool.go @@ -130,11 +130,8 @@ func (p *DataPool) GetCacheOrResolveBNS(deviceName string, client *Client) ([]Ad bns, cached := p.getCacheBNS(bnsKey) if cached { //check if cache is expired if so call updateCacheResolveBNS async. Expire duration is config.AppConfig.ResolveCacheTime - if !p.bnsCacheUpdatingFlag[bnsKey] && config.AppConfig.ResolveCacheTime > 0 { - if expireTime, ok := p.bnsCacheExpireItem[bnsKey]; ok && time.Now().After(expireTime) { - p.bnsCacheUpdatingFlag[bnsKey] = true - go p.updateCacheResolveBNS(deviceName, client) - } + if p.markCacheRefreshIfNeeded(bnsKey) { + go p.updateCacheResolveBNS(deviceName, client) } return bns, nil } @@ -154,11 +151,8 @@ func (p *DataPool) GetCacheOrResolvePeers(deviceName string, client *Client) ([] peers, cached := p.getCacheBNS(peerKey) if cached { //check if cache is expired if so call updateCacheResolvePeers async. Expire duration is config.AppConfig.ResolveCacheTime - if !p.bnsCacheUpdatingFlag[peerKey] && config.AppConfig.ResolveCacheTime > 0 { - if expireTime, ok := p.bnsCacheExpireItem[peerKey]; ok && time.Now().After(expireTime) { - p.bnsCacheUpdatingFlag[peerKey] = true - go p.updateCacheResolvePeers(deviceName, client) - } + if p.markCacheRefreshIfNeeded(peerKey) { + go p.updateCacheResolvePeers(deviceName, client) } return peers, nil } @@ -180,11 +174,8 @@ func (p *DataPool) GetCacheOrResolveAllPeersOfAddrs(addr Address, client *Client addrs := []Address{addr} if cached { //check if cache is expired if so call updateCacheResolveAllPeersOfAddrs async. Expire duration is config.AppConfig.ResolveCacheTime - if !p.bnsCacheUpdatingFlag[peerKey] && config.AppConfig.ResolveCacheTime > 0 { - if expireTime, ok := p.bnsCacheExpireItem[peerKey]; ok && time.Now().After(expireTime) { - p.bnsCacheUpdatingFlag[peerKey] = true - go p.updateCacheResolveAllPeersOfAddrs(addrs, client) - } + if p.markCacheRefreshIfNeeded(peerKey) { + go p.updateCacheResolveAllPeersOfAddrs(addrs, client) } return peers, nil @@ -203,10 +194,7 @@ func (p *DataPool) GetCacheOrResolveAllPeersOfAddrs(addr Address, client *Client func (p *DataPool) updateCacheResolveAllPeersOfAddrs(members []Address, client *Client) { peerKey := fmt.Sprintf("peers:%s", members[0].HexString()) peers := resolveAllPeersOfAddrs(members, client) - - p.SetCacheBNS(peerKey, peers) - p.bnsCacheExpireItem[peerKey] = time.Now().Add(config.AppConfig.ResolveCacheTime) - p.bnsCacheUpdatingFlag[peerKey] = false + p.storeCacheBNS(peerKey, peers) } func (p *DataPool) updateCacheResolvePeers(deviceName string, client *Client) { @@ -214,24 +202,22 @@ func (p *DataPool) updateCacheResolvePeers(deviceName string, client *Client) { var addr []Address bnsResult, err := p.GetCacheOrResolveBNS(deviceName, client) if err != nil { + p.setCacheUpdating(peerKey, false) return } addr = resolveAllPeersOfAddrs(bnsResult, client) - - p.SetCacheBNS(peerKey, addr) - p.bnsCacheExpireItem[peerKey] = time.Now().Add(config.AppConfig.ResolveCacheTime) - p.bnsCacheUpdatingFlag[peerKey] = false + p.storeCacheBNS(peerKey, addr) } func (p *DataPool) updateCacheResolveBNS(deviceName string, client *Client) { bnsKey := fmt.Sprintf("bns:%s", deviceName) bns, err := client.ResolveBNS(deviceName) if err == nil { - p.SetCacheBNS(bnsKey, bns) - p.bnsCacheExpireItem[bnsKey] = time.Now().Add(config.AppConfig.ResolveCacheTime) + p.storeCacheBNS(bnsKey, bns) + return } - p.bnsCacheUpdatingFlag[bnsKey] = false + p.setCacheUpdating(bnsKey, false) } func resolveAllPeersOfAddrs(members []Address, client *Client) (peers []Address) { @@ -305,16 +291,48 @@ func (p *DataPool) ClosePorts(client *Client) { } func (p *DataPool) SetCacheBNS(key string, bns []Address) { - p.srv.Cast(func() { + p.srv.Call(func() { p.bnsCache.Set(key, bns, cache.DefaultExpiration) }) } + func (p *DataPool) DeleteCacheBNS(key string) { - p.srv.Cast(func() { + p.srv.Call(func() { p.bnsCache.Delete(key) }) } +func (p *DataPool) markCacheRefreshIfNeeded(key string) (shouldRefresh bool) { + p.srv.Call(func() { + if config.AppConfig.ResolveCacheTime <= 0 || p.bnsCacheUpdatingFlag[key] { + return + } + if expireTime, ok := p.bnsCacheExpireItem[key]; ok && time.Now().After(expireTime) { + p.bnsCacheUpdatingFlag[key] = true + shouldRefresh = true + } + }) + return +} + +func (p *DataPool) setCacheUpdating(key string, updating bool) { + p.srv.Call(func() { + p.bnsCacheUpdatingFlag[key] = updating + }) +} + +func (p *DataPool) storeCacheBNS(key string, bns []Address) { + p.srv.Call(func() { + p.bnsCache.Set(key, bns, cache.DefaultExpiration) + if config.AppConfig.ResolveCacheTime > 0 { + p.bnsCacheExpireItem[key] = time.Now().Add(config.AppConfig.ResolveCacheTime) + } else { + delete(p.bnsCacheExpireItem, key) + } + p.bnsCacheUpdatingFlag[key] = false + }) +} + func (p *DataPool) GetCacheDevice(key Address) (deviceCache *DeviceCache) { return p.GetCache(string(key[:])) } diff --git a/rpc/ssl.go b/rpc/ssl.go index 5e2f392c..22da40ed 100644 --- a/rpc/ssl.go +++ b/rpc/ssl.go @@ -231,6 +231,12 @@ func (s *SSL) setTotalBytes(n uint64) { s.totalBytes = n } +func (s *SSL) setTotalConnections(n uint64) { + s.rm.Lock() + defer s.rm.Unlock() + s.totalConnections = n +} + func (s *SSL) incrementTotalBytes(n int) { s.rm.Lock() defer s.rm.Unlock()