From 6ec52682b60abf1b5edbefa0f125cab0cb7a6aaa Mon Sep 17 00:00:00 2001 From: Tuhalf <37873266+tuhalf@users.noreply.github.com> Date: Fri, 13 Feb 2026 12:25:53 +0100 Subject: [PATCH 1/6] rpc: harden client lifecycle and cache synchronization --- rpc/bridge.go | 14 +- rpc/client.go | 348 +++++++++++++++++++++++++++++++++++------------- rpc/datapool.go | 74 ++++++---- rpc/ssl.go | 6 + rpc/type.go | 4 +- 5 files changed, 314 insertions(+), 132 deletions(-) diff --git a/rpc/bridge.go b/rpc/bridge.go index 9f9669c3..b60dba8b 100644 --- a/rpc/bridge.go +++ b/rpc/bridge.go @@ -390,8 +390,10 @@ func validateRLPValue(k rlp.Kind, content []byte) error { return nil } -// recvMessageLoop infinite loop to read message from server -func (client *Client) recvMessageLoop() { +// recvMessageLoop infinite loop to read message from server. +// It uses the connection captured at start so Close() can safely +// close the connection without racing on client.s pointer updates. +func (client *Client) recvMessageLoop(ssl *SSL) { msgBuffer := make(chan edge.Message, 20) defer close(msgBuffer) @@ -402,16 +404,16 @@ func (client *Client) recvMessageLoop() { }() for { - if client.s == nil { - if !client.isClosed { + if ssl == nil { + if !client.isClientClosed() { client.Log().Info("Client connection closed prematurely.") client.Close() } return } - msg, err := client.s.readMessage() + msg, err := ssl.readMessage() if err != nil { - if !client.isClosed { + if !client.isClientClosed() { client.Log().Info("Client connection closed unexpectedly: %v", err) client.Close() } diff --git a/rpc/client.go b/rpc/client.go index e6fe35ef..2fb1f2e2 100644 --- a/rpc/client.go +++ b/rpc/client.go @@ -36,6 +36,7 @@ const ( packetLimit = 65000 ticketBound = 4194304 callQueueSize = 1024 + blockHeaderFetchWorkerCount = 8 blockquickDowngradeThreshold = 5 blockquickValidationError = "couldn't validate any new blocks" ) @@ -72,11 +73,11 @@ type Client struct { // close event OnClose func() - isClosed bool - closing uint32 // set before Close() callback runs; allows GetNearestClient to exclude client earlier - srv *genserver.GenServer - timer *Timer - + closed uint32 + closing uint32 // set before Close() callback runs; allows GetNearestClient to exclude client earlier + stateMu sync.RWMutex + srv *genserver.GenServer + timer *Timer portOpen2Handler atomic.Value } @@ -120,12 +121,67 @@ func NewClient(host string, clientMan *ClientManager, cfg *config.Config, pool * } func (client *Client) averageLatency() int64 { - return client.latencySum / client.latencyCount + latencyCount := atomic.LoadInt64(&client.latencyCount) + if latencyCount <= 0 { + return 0 + } + latencySum := atomic.LoadInt64(&client.latencySum) + return latencySum / latencyCount } func (client *Client) addLatencyMeasurement(latency time.Duration) { - client.latencySum += latency.Milliseconds() - client.latencyCount++ + atomic.AddInt64(&client.latencySum, latency.Milliseconds()) + atomic.AddInt64(&client.latencyCount, 1) +} + +func (client *Client) isClientClosed() bool { + return atomic.LoadUint32(&client.closed) == 1 +} + +func (client *Client) setClientClosed() { + atomic.StoreUint32(&client.closed, 1) +} + +func (client *Client) getBlockquickWindow() *blockquick.Window { + client.stateMu.RLock() + defer client.stateMu.RUnlock() + return client.bq +} + +func (client *Client) setBlockquickWindow(win *blockquick.Window) { + client.stateMu.Lock() + client.bq = win + client.stateMu.Unlock() +} + +func (client *Client) getLastTicket() *edge.DeviceTicket { + client.stateMu.RLock() + defer client.stateMu.RUnlock() + return client.lastTicket +} + +func (client *Client) setLastTicket(ticket *edge.DeviceTicket) { + client.stateMu.Lock() + client.lastTicket = ticket + client.stateMu.Unlock() +} + +func (client *Client) clearStateTicketsAndWindow() { + client.stateMu.Lock() + client.bq = nil + client.lastTicket = nil + client.stateMu.Unlock() +} + +func isClientClosedError(err error) bool { + if err == nil { + return false + } + if err == errClientClosed { + return true + } + msg := err.Error() + return strings.Contains(msg, "dead genserver") || strings.Contains(msg, errClientClosed.Error()) } func (client *Client) doConnect() (err error) { @@ -202,26 +258,43 @@ func (client *Client) GetDeviceKey(ref string) string { func (client *Client) waitResponse(call *Call) (res interface{}, err error) { defer call.Clean(CLOSED) defer client.srv.Cast(func() { client.cm.RemoveCallByID(call.id) }) - resp, ok := <-call.response - if !ok { - host, _ := client.Host() - err = CancelledError{host} - if call.sender != nil { - call.sender.remoteErr = io.EOF - call.sender.Close() + timeout := client.config.RemoteRPCTimeout + if timeout <= 0 { + timeout = client.localTimeout + } + timer := time.NewTimer(timeout) + defer timer.Stop() + + select { + case resp, ok := <-call.response: + if !ok { + host, _ := client.Host() + err = CancelledError{host} + if call.sender != nil { + call.sender.remoteErr = io.EOF + call.sender.Close() + } + return } - return - } - if rpcError, ok := resp.(edge.Error); ok { - err = RPCError{rpcError} + if rpcError, ok := resp.(edge.Error); ok { + err = RPCError{rpcError} + if call.sender != nil { + call.sender.remoteErr = RPCError{rpcError} + call.sender.Close() + } + return + } + res = resp + return res, nil + case <-timer.C: + err = TimeoutError{Timeout: timeout} + client.Log().Debug("RPC timeout after %s (method=%s id=%d)", timeout, call.method, call.id) if call.sender != nil { - call.sender.remoteErr = RPCError{rpcError} + call.sender.remoteErr = err call.sender.Close() } - return + return nil, err } - res = resp - return res, nil } // RespondContext sends a message (a response) without expecting a response @@ -244,9 +317,13 @@ func (client *Client) RespondContext(requestID uint64, responseType string, meth func (client *Client) callTimeout(fun func()) error { // Long enough timeout to at least survive initial reconnect attempts if client == nil { - return fmt.Errorf("Client disconnected") + return errClientClosed + } + err := client.srv.CallTimeout(fun, 30*time.Second) + if err != nil && strings.Contains(err.Error(), "dead genserver") { + return errClientClosed } - return client.srv.CallTimeout(fun, 30*time.Second) + return err } // CastContext returns a response future after calling the rpc @@ -271,17 +348,10 @@ func (client *Client) CastContext(sender *ConnectedPort, method string, args ... } func (client *Client) insertCall(call *Call) (err error) { - timeout := client.callTimeout(func() { - if client.isClosed { - err = errClientClosed - return - } - err = client.cm.Insert(call) - }) - if err == nil { - err = timeout + if client.isClientClosed() { + return errClientClosed } - return + return client.cm.Insert(call) } // CallContext returns the response after calling the rpc @@ -309,6 +379,33 @@ func (client *Client) CallContext(method string, args ...interface{}) (res inter return } +// CheckTicket should client send traffic ticket to server +func (client *Client) CheckTicket() { + defer client.timer.profile(time.Now(), "CheckTicket") + + client.srv.Cast(func() { + if client.s == nil || client.getBlockquickWindow() == nil { + return + } + + lastTicket := client.getLastTicket() + if client.s.TotalBytes() < client.s.Counter()+ticketBound && + lastTicket != nil && client.isRecentTicket(lastTicket) { + return + } + + ticket, err := client.newTicket() + if err != nil { + return + } + + err = client.submitTicket(ticket) + if err == nil { + client.setLastTicket(ticket) + } + }) +} + func (client *Client) isRecentTicket(tck *edge.DeviceTicket) bool { lvbn, _ := client.LastValid() @@ -341,7 +438,11 @@ func (client *Client) validateNetwork() error { // Fetching at least window size blocks -- this should be cached on disk instead. blockHeaders, err := client.GetBlockHeadersUnsafe(blockNumMin, lvbn) if err != nil { - client.Log().Error("Cannot fetch blocks %v-%v error: %v", blockNumMin, lvbn, err) + if isClientClosedError(err) { + client.Log().Debug("Cannot fetch blocks %v-%v: client is closed", blockNumMin, lvbn) + } else { + client.Log().Error("Cannot fetch blocks %v-%v error: %v", blockNumMin, lvbn, err) + } return err } if len(blockHeaders) != windowSize { @@ -414,9 +515,7 @@ func (client *Client) validateNetwork() error { } } - if err = client.callTimeout(func() { client.bq = win }); err != nil { - return err - } + client.setBlockquickWindow(win) client.storeLastValid() return nil } @@ -467,37 +566,67 @@ func (client *Client) GetBlockHeaderUnsafe(blockNum uint64) (bh blockquick.Block // TODO: use copy instead reference of BlockHeader func (client *Client) GetBlockHeadersUnsafe2(blockNumbers []uint64) ([]blockquick.BlockHeader, error) { count := len(blockNumbers) - headersCount := 0 + if count == 0 { + return []blockquick.BlockHeader{}, nil + } + + workerCount := blockHeaderFetchWorkerCount + if workerCount > count { + workerCount = count + } + responses := make(map[uint64]blockquick.BlockHeader, count) + errorMessages := []string{} + var stop uint32 + jobs := make(chan uint64, count) mx := sync.Mutex{} wg := sync.WaitGroup{} - wg.Add(count) - errorMessages := []string{} - for _, i := range blockNumbers { - go func(bn uint64) { + for i := 0; i < workerCount; i++ { + wg.Add(1) + go func() { defer wg.Done() - header, err := client.GetBlockHeaderUnsafe(bn) - if err != nil { + for bn := range jobs { + if atomic.LoadUint32(&stop) == 1 { + return + } + + header, err := client.GetBlockHeaderUnsafe(bn) + if err != nil { + mx.Lock() + errorMessages = append(errorMessages, fmt.Sprintf("block %v: %v", bn, err)) + mx.Unlock() + if isClientClosedError(err) { + atomic.StoreUint32(&stop, 1) + } + continue + } + mx.Lock() - errorMessages = append(errorMessages, fmt.Sprintf("block %v: %v", bn, err.Error())) + responses[bn] = header mx.Unlock() - return } - mx.Lock() - headersCount++ - responses[bn] = header - mx.Unlock() - }(i) + }() } + + for _, blockNumber := range blockNumbers { + if atomic.LoadUint32(&stop) == 1 { + break + } + jobs <- blockNumber + } + close(jobs) wg.Wait() - if headersCount != count { + if len(responses) != count { + if len(errorMessages) == 0 { + return []blockquick.BlockHeader{}, fmt.Errorf("failed fetching some blocks: missing block headers") + } return []blockquick.BlockHeader{}, fmt.Errorf("failed fetching some blocks: %v", strings.Join(errorMessages, ", ")) } // copy responses to headers - headers := make([]blockquick.BlockHeader, headersCount) + headers := make([]blockquick.BlockHeader, count) for i, bn := range blockNumbers { if bh, ok := responses[bn]; ok { headers[i] = bh @@ -509,8 +638,7 @@ func (client *Client) GetBlockHeadersUnsafe2(blockNumbers []uint64) ([]blockquic // GetBlockHeaderValid returns a validated recent block header // (only available for the last windowsSize blocks) func (client *Client) GetBlockHeaderValid(blockNum uint64) blockquick.BlockHeader { - var bq *blockquick.Window - client.callTimeout(func() { bq = client.bq }) + bq := client.getBlockquickWindow() if bq != nil { return bq.GetBlockHeader(blockNum) } @@ -579,7 +707,7 @@ func (client *Client) greet() error { go func() { time.Sleep(10 * time.Second) s := client.s - if s != nil && client.lastTicket == nil { + if s != nil && client.getLastTicket() == nil { client.SubmitTicketForUsage(big.NewInt(int64(s.TotalBytes()))) } }() @@ -589,7 +717,7 @@ func (client *Client) greet() error { // SubmitTicketForUsage submits a ticket covering at least minBytes. func (client *Client) SubmitTicketForUsage(minBytes *big.Int) (err error) { client.srv.Cast(func() { - if client.bq == nil || client.isClosed || client.s == nil { + if client.getBlockquickWindow() == nil || client.isClientClosed() || client.s == nil { return } if minBytes == nil { @@ -610,8 +738,7 @@ func (client *Client) SubmitTicketForUsage(minBytes *big.Int) (err error) { client.s.setTotalBytes(minUsage) } - var ticket *edge.DeviceTicket - ticket, err = client.newTicket() + ticket, err := client.newTicket() if err != nil { return } @@ -620,10 +747,18 @@ func (client *Client) SubmitTicketForUsage(minBytes *big.Int) (err error) { } err = client.submitTicket(ticket) if err == nil { - client.lastTicket = ticket + client.setLastTicket(ticket) } }) - return + return nil +} + +func (client *Client) SubmitNewTicket() error { + current := uint64(0) + if client.s != nil { + current = client.s.TotalBytes() + } + return client.SubmitTicketForUsage(new(big.Int).SetUint64(current)) } // HandleTicketRequest responds to a server ticket request. @@ -665,12 +800,16 @@ func (client *Client) SignTransaction(tx *edge.Transaction) (err error) { // NewTicket returns ticket func (client *Client) newTicket() (*edge.DeviceTicket, error) { - serverID, err := client.s.GetServerID() + ssl := client.s + if ssl == nil || ssl.Closed() { + return nil, errClientClosed + } + serverID, err := ssl.GetServerID() if err != nil { return nil, err } - total := client.s.TotalBytes() - client.s.UpdateCounter(total) + total := ssl.TotalBytes() + ssl.UpdateCounter(total) lvbn, lvbh := client.LastValid() ticket := &edge.DeviceTicket{ @@ -679,7 +818,7 @@ func (client *Client) newTicket() (*edge.DeviceTicket, error) { BlockNumber: lvbn, BlockHash: lvbh[:], FleetAddr: client.config.FleetAddr, - TotalConnections: big.NewInt(int64(client.s.TotalConnections())), + TotalConnections: big.NewInt(int64(ssl.TotalConnections())), TotalBytes: big.NewInt(int64(total)), LocalAddr: []byte{}, } @@ -699,7 +838,7 @@ func (client *Client) newTicket() (*edge.DeviceTicket, error) { if err := ticket.ValidateValues(); err != nil { return nil, err } - privKey, err := client.s.GetClientPrivateKey() + privKey, err := ssl.GetClientPrivateKey() if err != nil { return nil, err } @@ -731,6 +870,9 @@ func (client *Client) submitTicket(ticket *edge.DeviceTicket) error { if lastTicket, ok := resp.(edge.DeviceTicket); ok { if lastTicket.Err == edge.ErrTicketTooLow { ssl := client.s + if ssl == nil || ssl.Closed() { + return + } sid, _ := ssl.GetServerID() lastTicket.ServerID = sid lastTicket.FleetAddr = client.config.FleetAddr @@ -740,10 +882,11 @@ func (client *Client) submitTicket(ticket *edge.DeviceTicket) error { client.Log().Warn("received fake ticket.. last_ticket=%v", lastTicket) } else { client.Log().Debug("insufficient ticket, resending... last_ticket=%v", lastTicket) - ssl.totalConnections = lastTicket.TotalConnections.Uint64() + 1 - err = client.SubmitTicketForUsage(lastTicket.TotalBytes) - if err != nil { - client.Log().Error("failed to re-submit ticket: %v", err) + ssl.setTotalBytes(lastTicket.TotalBytes.Uint64() + 1024) + ssl.setTotalConnections(lastTicket.TotalConnections.Uint64() + 1) + retryErr := client.SubmitTicketForUsage(lastTicket.TotalBytes) + if retryErr != nil { + client.Log().Error("failed to re-submit ticket: %v", retryErr) } } } else if lastTicket.Err == edge.ErrTicketTooOld { @@ -1218,8 +1361,7 @@ func (client *Client) ResolveBlockHash(blockNumber uint64) (blockHash []byte, er if blockNumber == 0 { return } - var bq *blockquick.Window - client.callTimeout(func() { bq = client.bq }) + bq := client.getBlockquickWindow() var blockHeader blockquick.BlockHeader if bq != nil { blockHeader = bq.GetBlockHeader(blockNumber) @@ -1382,13 +1524,13 @@ func (client *Client) IsDeviceAllowlisted(fleetAddr Address, clientAddr Address) // Closed returns whether client had closed func (client *Client) Closed() bool { - return client.isClosed + return client.isClientClosed() } // Closing returns true if the client is closed or in the process of closing (flag set before Close callback runs). // Used by ClientManager to avoid returning a client that is about to be closed. func (client *Client) Closing() bool { - return atomic.LoadUint32(&client.closing) == 1 || client.isClosed + return atomic.LoadUint32(&client.closing) == 1 || client.isClientClosed() } // Close rpc client @@ -1399,11 +1541,11 @@ func (client *Client) Close() { } doCleanup := true timeout := client.callTimeout(func() { - if client.isClosed { + if client.isClientClosed() { doCleanup = false return } - client.isClosed = true + client.setClientClosed() // remove existing calls client.cm.RemoveCalls() if client.OnClose != nil { @@ -1411,13 +1553,24 @@ func (client *Client) Close() { } if client.s != nil { client.s.Close() - client.s = nil } }) if timeout == nil && !doCleanup { client.srv.Shutdown(0) return } + if timeout == errClientClosed && doCleanup { + // Genserver is already down; finish best-effort cleanup so callers + // don't block waiting for this client to terminate. + client.setClientClosed() + client.cm.RemoveCalls() + if client.OnClose != nil { + client.OnClose() + } + if client.s != nil { + client.s.Close() + } + } // remove open ports (best-effort even if callTimeout failed) if client.pool != nil { client.pool.ClosePorts(client) @@ -1429,28 +1582,30 @@ func (client *Client) Close() { func (client *Client) Start() { client.srv.Cast(func() { if err := client.doStart(); err != nil { - if !client.isClosed { + if !client.isClientClosed() { client.Log().Warn("Client connect failed: %v", err) } client.srv.Shutdown(0) + return } - }) - go func() { - if err := client.initialize(); err != nil { - if !client.isClosed { - client.Log().Warn("Client start failed: %v", err) - client.Close() + go func() { + if err := client.initialize(); err != nil { + if !client.isClientClosed() { + client.Log().Warn("Client start failed: %v", err) + client.Close() + } } - } - }() + }() + }) } func (client *Client) doStart() (err error) { if err = client.doConnect(); err != nil { return } - go client.recvMessageLoop() + ssl := client.s + go client.recvMessageLoop(ssl) client.cm.SendCallPtr = client.sendCall return } @@ -1468,8 +1623,7 @@ func (client *Client) doWatchLatestBlock() { if client.isRebuildingBlockquick() { return } - var bq *blockquick.Window - client.callTimeout(func() { bq = client.bq }) + bq := client.getBlockquickWindow() if bq == nil { return } @@ -1481,6 +1635,9 @@ func (client *Client) doWatchLatestBlock() { client.srv.Cast(func() { client.addLatencyMeasurement(elapsed) }) if err != nil { + if isClientClosedError(err) { + return + } client.Log().Error("Couldn't getblockpeak: %v", err) return } @@ -1494,6 +1651,9 @@ func (client *Client) doWatchLatestBlock() { for num := lastblock + 1; num <= blockPeak; num++ { blockHeader, err := client.GetBlockHeaderUnsafe(uint64(num)) if err != nil { + if isClientClosedError(err) { + return + } client.Log().Error("Couldn't download block header %v", err) return } @@ -1514,10 +1674,8 @@ func (client *Client) doWatchLatestBlock() { } func (client *Client) clearBlockquickWindow() error { - return client.callTimeout(func() { - client.bq = nil - client.lastTicket = nil - }) + client.clearStateTicketsAndWindow() + return nil } func (client *Client) forceResetBlockquickState() { diff --git a/rpc/datapool.go b/rpc/datapool.go index 8553a01d..58b14487 100644 --- a/rpc/datapool.go +++ b/rpc/datapool.go @@ -130,11 +130,8 @@ func (p *DataPool) GetCacheOrResolveBNS(deviceName string, client *Client) ([]Ad bns, cached := p.getCacheBNS(bnsKey) if cached { //check if cache is expired if so call updateCacheResolveBNS async. Expire duration is config.AppConfig.ResolveCacheTime - if !p.bnsCacheUpdatingFlag[bnsKey] && config.AppConfig.ResolveCacheTime > 0 { - if expireTime, ok := p.bnsCacheExpireItem[bnsKey]; ok && time.Now().After(expireTime) { - p.bnsCacheUpdatingFlag[bnsKey] = true - go p.updateCacheResolveBNS(deviceName, client) - } + if p.markCacheRefreshIfNeeded(bnsKey) { + go p.updateCacheResolveBNS(deviceName, client) } return bns, nil } @@ -154,11 +151,8 @@ func (p *DataPool) GetCacheOrResolvePeers(deviceName string, client *Client) ([] peers, cached := p.getCacheBNS(peerKey) if cached { //check if cache is expired if so call updateCacheResolvePeers async. Expire duration is config.AppConfig.ResolveCacheTime - if !p.bnsCacheUpdatingFlag[peerKey] && config.AppConfig.ResolveCacheTime > 0 { - if expireTime, ok := p.bnsCacheExpireItem[peerKey]; ok && time.Now().After(expireTime) { - p.bnsCacheUpdatingFlag[peerKey] = true - go p.updateCacheResolvePeers(deviceName, client) - } + if p.markCacheRefreshIfNeeded(peerKey) { + go p.updateCacheResolvePeers(deviceName, client) } return peers, nil } @@ -180,11 +174,8 @@ func (p *DataPool) GetCacheOrResolveAllPeersOfAddrs(addr Address, client *Client addrs := []Address{addr} if cached { //check if cache is expired if so call updateCacheResolveAllPeersOfAddrs async. Expire duration is config.AppConfig.ResolveCacheTime - if !p.bnsCacheUpdatingFlag[peerKey] && config.AppConfig.ResolveCacheTime > 0 { - if expireTime, ok := p.bnsCacheExpireItem[peerKey]; ok && time.Now().After(expireTime) { - p.bnsCacheUpdatingFlag[peerKey] = true - go p.updateCacheResolveAllPeersOfAddrs(addrs, client) - } + if p.markCacheRefreshIfNeeded(peerKey) { + go p.updateCacheResolveAllPeersOfAddrs(addrs, client) } return peers, nil @@ -203,10 +194,7 @@ func (p *DataPool) GetCacheOrResolveAllPeersOfAddrs(addr Address, client *Client func (p *DataPool) updateCacheResolveAllPeersOfAddrs(members []Address, client *Client) { peerKey := fmt.Sprintf("peers:%s", members[0].HexString()) peers := resolveAllPeersOfAddrs(members, client) - - p.SetCacheBNS(peerKey, peers) - p.bnsCacheExpireItem[peerKey] = time.Now().Add(config.AppConfig.ResolveCacheTime) - p.bnsCacheUpdatingFlag[peerKey] = false + p.storeCacheBNS(peerKey, peers) } func (p *DataPool) updateCacheResolvePeers(deviceName string, client *Client) { @@ -214,24 +202,22 @@ func (p *DataPool) updateCacheResolvePeers(deviceName string, client *Client) { var addr []Address bnsResult, err := p.GetCacheOrResolveBNS(deviceName, client) if err != nil { + p.setCacheUpdating(peerKey, false) return } addr = resolveAllPeersOfAddrs(bnsResult, client) - - p.SetCacheBNS(peerKey, addr) - p.bnsCacheExpireItem[peerKey] = time.Now().Add(config.AppConfig.ResolveCacheTime) - p.bnsCacheUpdatingFlag[peerKey] = false + p.storeCacheBNS(peerKey, addr) } func (p *DataPool) updateCacheResolveBNS(deviceName string, client *Client) { bnsKey := fmt.Sprintf("bns:%s", deviceName) bns, err := client.ResolveBNS(deviceName) if err == nil { - p.SetCacheBNS(bnsKey, bns) - p.bnsCacheExpireItem[bnsKey] = time.Now().Add(config.AppConfig.ResolveCacheTime) + p.storeCacheBNS(bnsKey, bns) + return } - p.bnsCacheUpdatingFlag[bnsKey] = false + p.setCacheUpdating(bnsKey, false) } func resolveAllPeersOfAddrs(members []Address, client *Client) (peers []Address) { @@ -305,16 +291,48 @@ func (p *DataPool) ClosePorts(client *Client) { } func (p *DataPool) SetCacheBNS(key string, bns []Address) { - p.srv.Cast(func() { + p.srv.Call(func() { p.bnsCache.Set(key, bns, cache.DefaultExpiration) }) } + func (p *DataPool) DeleteCacheBNS(key string) { - p.srv.Cast(func() { + p.srv.Call(func() { p.bnsCache.Delete(key) }) } +func (p *DataPool) markCacheRefreshIfNeeded(key string) (shouldRefresh bool) { + p.srv.Call(func() { + if config.AppConfig.ResolveCacheTime <= 0 || p.bnsCacheUpdatingFlag[key] { + return + } + if expireTime, ok := p.bnsCacheExpireItem[key]; ok && time.Now().After(expireTime) { + p.bnsCacheUpdatingFlag[key] = true + shouldRefresh = true + } + }) + return +} + +func (p *DataPool) setCacheUpdating(key string, updating bool) { + p.srv.Call(func() { + p.bnsCacheUpdatingFlag[key] = updating + }) +} + +func (p *DataPool) storeCacheBNS(key string, bns []Address) { + p.srv.Call(func() { + p.bnsCache.Set(key, bns, cache.DefaultExpiration) + if config.AppConfig.ResolveCacheTime > 0 { + p.bnsCacheExpireItem[key] = time.Now().Add(config.AppConfig.ResolveCacheTime) + } else { + delete(p.bnsCacheExpireItem, key) + } + p.bnsCacheUpdatingFlag[key] = false + }) +} + func (p *DataPool) GetCacheDevice(key Address) (deviceCache *DeviceCache) { return p.GetCache(string(key[:])) } diff --git a/rpc/ssl.go b/rpc/ssl.go index 5e2f392c..22da40ed 100644 --- a/rpc/ssl.go +++ b/rpc/ssl.go @@ -231,6 +231,12 @@ func (s *SSL) setTotalBytes(n uint64) { s.totalBytes = n } +func (s *SSL) setTotalConnections(n uint64) { + s.rm.Lock() + defer s.rm.Unlock() + s.totalConnections = n +} + func (s *SSL) incrementTotalBytes(n int) { s.rm.Lock() defer s.rm.Unlock() diff --git a/rpc/type.go b/rpc/type.go index 92689100..3783c761 100644 --- a/rpc/type.go +++ b/rpc/type.go @@ -10,7 +10,6 @@ import ( "sync" "time" - "github.com/diodechain/diode_client/blockquick" "github.com/diodechain/diode_client/crypto" "github.com/diodechain/diode_client/db" "github.com/diodechain/diode_client/edge" @@ -112,8 +111,7 @@ func WindowSize() int { // LastValid returns the last valid block number and block header func (client *Client) LastValid() (uint64, crypto.Sha3) { - var bq *blockquick.Window - client.callTimeout(func() { bq = client.bq }) + bq := client.getBlockquickWindow() if bq == nil { return restoreLastValid() } From 5727a2d34e7a9a0f01b721bc673da123f481ee3b Mon Sep 17 00:00:00 2001 From: Tuhalf <37873266+tuhalf@users.noreply.github.com> Date: Fri, 13 Feb 2026 13:02:40 +0100 Subject: [PATCH 2/6] rpc: fix late-response panic race and serialize ticket access - make waitResponse cleanup synchronous via RemoveCallByID to avoid timeout/late-response send-on-closed-channel panics - move greet() delayed ticket fallback access to client.s into srv.Cast - move SubmitNewTicket() client.s reads into srv.Cast - add regression test: TestWaitResponseTimeoutWithLateClaimedResponseDoesNotPanic --- rpc/client.go | 27 ++++++----- rpc/client_waitresponse_test.go | 79 +++++++++++++++++++++++++++++++++ 2 files changed, 95 insertions(+), 11 deletions(-) create mode 100644 rpc/client_waitresponse_test.go diff --git a/rpc/client.go b/rpc/client.go index 2fb1f2e2..7950b568 100644 --- a/rpc/client.go +++ b/rpc/client.go @@ -256,8 +256,9 @@ func (client *Client) GetDeviceKey(ref string) string { } func (client *Client) waitResponse(call *Call) (res interface{}, err error) { - defer call.Clean(CLOSED) - defer client.srv.Cast(func() { client.cm.RemoveCallByID(call.id) }) + // Remove the call synchronously on exit. This avoids a race where a timed out + // call channel is closed before the call is removed from the manager. + defer client.cm.RemoveCallByID(call.id) timeout := client.config.RemoteRPCTimeout if timeout <= 0 { timeout = client.localTimeout @@ -706,10 +707,11 @@ func (client *Client) greet() error { // In case the server does not request a ticket, we will submit one after 10 seconds go func() { time.Sleep(10 * time.Second) - s := client.s - if s != nil && client.getLastTicket() == nil { - client.SubmitTicketForUsage(big.NewInt(int64(s.TotalBytes()))) - } + client.srv.Cast(func() { + if client.s != nil && client.getLastTicket() == nil { + client.SubmitTicketForUsage(big.NewInt(int64(client.s.TotalBytes()))) + } + }) }() return nil } @@ -754,11 +756,14 @@ func (client *Client) SubmitTicketForUsage(minBytes *big.Int) (err error) { } func (client *Client) SubmitNewTicket() error { - current := uint64(0) - if client.s != nil { - current = client.s.TotalBytes() - } - return client.SubmitTicketForUsage(new(big.Int).SetUint64(current)) + client.srv.Cast(func() { + current := uint64(0) + if client.s != nil { + current = client.s.TotalBytes() + } + _ = client.SubmitTicketForUsage(new(big.Int).SetUint64(current)) + }) + return nil } // HandleTicketRequest responds to a server ticket request. diff --git a/rpc/client_waitresponse_test.go b/rpc/client_waitresponse_test.go new file mode 100644 index 00000000..c47d5354 --- /dev/null +++ b/rpc/client_waitresponse_test.go @@ -0,0 +1,79 @@ +package rpc + +import ( + "testing" + "time" + + "github.com/diodechain/diode_client/config" +) + +func testClientConfig(t *testing.T, timeout time.Duration) *config.Config { + t.Helper() + + cfg := &config.Config{ + RemoteRPCTimeout: timeout, + } + logger, err := config.NewLogger(cfg) + if err != nil { + t.Fatalf("failed to create logger: %v", err) + } + cfg.Logger = &logger + return cfg +} + +func TestWaitResponseTimeoutWithLateClaimedResponseDoesNotPanic(t *testing.T) { + timeout := 10 * time.Millisecond + cfg := testClientConfig(t, timeout) + client := &Client{ + cm: NewCallManager(4), + config: cfg, + localTimeout: timeout, + } + + call := &Call{ + id: 42, + method: "late_response", + state: STARTED, + response: make(chan interface{}, 1), + } + client.cm.calls[call.id] = call + + releaseSend := make(chan struct{}) + panicCh := make(chan interface{}, 1) + errCh := make(chan error, 1) + + go func() { + c := client.cm.CallByID(call.id) + if c == nil { + errCh <- nil + return + } + + <-releaseSend + + defer func() { + if r := recover(); r != nil { + panicCh <- r + } + }() + errCh <- c.enqueueResponse("late") + }() + + _, err := client.waitResponse(call) + if _, ok := err.(TimeoutError); !ok { + t.Fatalf("expected TimeoutError, got %T (%v)", err, err) + } + + close(releaseSend) + + select { + case p := <-panicCh: + t.Fatalf("enqueueResponse panicked after timeout: %v", p) + case enqueueErr := <-errCh: + if enqueueErr != nil { + t.Fatalf("unexpected enqueue error: %v", enqueueErr) + } + case <-time.After(2 * time.Second): + t.Fatalf("timeout waiting for late response goroutine") + } +} From 315ebbd4e641ebf6aea902e4bc4615cbce288118 Mon Sep 17 00:00:00 2001 From: Dominic Letz Date: Wed, 18 Feb 2026 11:26:37 +0100 Subject: [PATCH 3/6] Fix genserver races in go1.25 --- go.mod | 2 +- go.sum | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index 9a06ac54..86a16c72 100644 --- a/go.mod +++ b/go.mod @@ -58,7 +58,7 @@ require ( github.com/mholt/acmez v1.2.0 // indirect github.com/miekg/dns v1.1.57 // indirect github.com/oxtoacart/bpool v0.0.0-20190530202638-03653db5a59c // indirect - github.com/petermattis/goid v0.0.0-20231126143041-f558c26febf5 // indirect + github.com/petermattis/goid v0.0.0-20260113132338-7c7de50cc741 // indirect github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/rivo/uniseg v0.4.4 // indirect diff --git a/go.sum b/go.sum index 898128b7..f6c06381 100644 --- a/go.sum +++ b/go.sum @@ -138,8 +138,8 @@ github.com/onsi/gomega v1.5.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1Cpa github.com/oxtoacart/bpool v0.0.0-20190530202638-03653db5a59c h1:rp5dCmg/yLR3mgFuSOe4oEnDDmGLROTvMragMUXpTQw= github.com/oxtoacart/bpool v0.0.0-20190530202638-03653db5a59c/go.mod h1:X07ZCGwUbLaax7L0S3Tw4hpejzu63ZrrQiUe6W0hcy0= github.com/petermattis/goid v0.0.0-20180202154549-b0b1615b78e5/go.mod h1:jvVRKCrJTQWu0XVbaOlby/2lO20uSCHEMzzplHXte1o= -github.com/petermattis/goid v0.0.0-20231126143041-f558c26febf5 h1:+qIP3OMrT7SN5kLnTcVEISPOMB/97RyAKTg1UWA738E= -github.com/petermattis/goid v0.0.0-20231126143041-f558c26febf5/go.mod h1:pxMtw7cyUw6B2bRH0ZBANSPg+AoSud1I1iyJHI69jH4= +github.com/petermattis/goid v0.0.0-20260113132338-7c7de50cc741 h1:KPpdlQLZcHfTMQRi6bFQ7ogNO0ltFT4PmtwTLW4W+14= +github.com/petermattis/goid v0.0.0-20260113132338-7c7de50cc741/go.mod h1:pxMtw7cyUw6B2bRH0ZBANSPg+AoSud1I1iyJHI69jH4= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= From 2092f837064713d359647f9f0cfae2ba57cc644f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C3=96mer=20AKG=C3=9CL?= <37873266+tuhalf@users.noreply.github.com> Date: Wed, 18 Feb 2026 16:18:05 +0100 Subject: [PATCH 4/6] rpc: simplify client state sync after goid update The goid upgrade resolved most genserver race behavior, so remove extra workaround locking layered on top of actor-based access. - drop Client.stateMu and bq/lastTicket accessor helpers - route bq/lastTicket reads and writes back through callTimeout/srv context - restore insertCall serialization via callTimeout - remove unused CheckTicket helper Keep the essential race/panic fixes (lifecycle hardening, late-response cleanup, ticket-path serialization) while reducing complexity and keeping the codebase maintainable. --- rpc/client.go | 97 ++++++++++++++------------------------------------- rpc/type.go | 4 ++- 2 files changed, 29 insertions(+), 72 deletions(-) diff --git a/rpc/client.go b/rpc/client.go index 7950b568..7c7a7796 100644 --- a/rpc/client.go +++ b/rpc/client.go @@ -75,7 +75,6 @@ type Client struct { closed uint32 closing uint32 // set before Close() callback runs; allows GetNearestClient to exclude client earlier - stateMu sync.RWMutex srv *genserver.GenServer timer *Timer portOpen2Handler atomic.Value @@ -142,37 +141,6 @@ func (client *Client) setClientClosed() { atomic.StoreUint32(&client.closed, 1) } -func (client *Client) getBlockquickWindow() *blockquick.Window { - client.stateMu.RLock() - defer client.stateMu.RUnlock() - return client.bq -} - -func (client *Client) setBlockquickWindow(win *blockquick.Window) { - client.stateMu.Lock() - client.bq = win - client.stateMu.Unlock() -} - -func (client *Client) getLastTicket() *edge.DeviceTicket { - client.stateMu.RLock() - defer client.stateMu.RUnlock() - return client.lastTicket -} - -func (client *Client) setLastTicket(ticket *edge.DeviceTicket) { - client.stateMu.Lock() - client.lastTicket = ticket - client.stateMu.Unlock() -} - -func (client *Client) clearStateTicketsAndWindow() { - client.stateMu.Lock() - client.bq = nil - client.lastTicket = nil - client.stateMu.Unlock() -} - func isClientClosedError(err error) bool { if err == nil { return false @@ -349,10 +317,17 @@ func (client *Client) CastContext(sender *ConnectedPort, method string, args ... } func (client *Client) insertCall(call *Call) (err error) { - if client.isClientClosed() { - return errClientClosed + timeout := client.callTimeout(func() { + if client.isClientClosed() { + err = errClientClosed + return + } + err = client.cm.Insert(call) + }) + if err == nil { + err = timeout } - return client.cm.Insert(call) + return } // CallContext returns the response after calling the rpc @@ -380,33 +355,6 @@ func (client *Client) CallContext(method string, args ...interface{}) (res inter return } -// CheckTicket should client send traffic ticket to server -func (client *Client) CheckTicket() { - defer client.timer.profile(time.Now(), "CheckTicket") - - client.srv.Cast(func() { - if client.s == nil || client.getBlockquickWindow() == nil { - return - } - - lastTicket := client.getLastTicket() - if client.s.TotalBytes() < client.s.Counter()+ticketBound && - lastTicket != nil && client.isRecentTicket(lastTicket) { - return - } - - ticket, err := client.newTicket() - if err != nil { - return - } - - err = client.submitTicket(ticket) - if err == nil { - client.setLastTicket(ticket) - } - }) -} - func (client *Client) isRecentTicket(tck *edge.DeviceTicket) bool { lvbn, _ := client.LastValid() @@ -516,7 +464,9 @@ func (client *Client) validateNetwork() error { } } - client.setBlockquickWindow(win) + if err = client.callTimeout(func() { client.bq = win }); err != nil { + return err + } client.storeLastValid() return nil } @@ -639,7 +589,8 @@ func (client *Client) GetBlockHeadersUnsafe2(blockNumbers []uint64) ([]blockquic // GetBlockHeaderValid returns a validated recent block header // (only available for the last windowsSize blocks) func (client *Client) GetBlockHeaderValid(blockNum uint64) blockquick.BlockHeader { - bq := client.getBlockquickWindow() + var bq *blockquick.Window + client.callTimeout(func() { bq = client.bq }) if bq != nil { return bq.GetBlockHeader(blockNum) } @@ -708,7 +659,7 @@ func (client *Client) greet() error { go func() { time.Sleep(10 * time.Second) client.srv.Cast(func() { - if client.s != nil && client.getLastTicket() == nil { + if client.s != nil && client.lastTicket == nil { client.SubmitTicketForUsage(big.NewInt(int64(client.s.TotalBytes()))) } }) @@ -719,7 +670,7 @@ func (client *Client) greet() error { // SubmitTicketForUsage submits a ticket covering at least minBytes. func (client *Client) SubmitTicketForUsage(minBytes *big.Int) (err error) { client.srv.Cast(func() { - if client.getBlockquickWindow() == nil || client.isClientClosed() || client.s == nil { + if client.bq == nil || client.isClientClosed() || client.s == nil { return } if minBytes == nil { @@ -749,7 +700,7 @@ func (client *Client) SubmitTicketForUsage(minBytes *big.Int) (err error) { } err = client.submitTicket(ticket) if err == nil { - client.setLastTicket(ticket) + client.lastTicket = ticket } }) return nil @@ -1366,7 +1317,8 @@ func (client *Client) ResolveBlockHash(blockNumber uint64) (blockHash []byte, er if blockNumber == 0 { return } - bq := client.getBlockquickWindow() + var bq *blockquick.Window + client.callTimeout(func() { bq = client.bq }) var blockHeader blockquick.BlockHeader if bq != nil { blockHeader = bq.GetBlockHeader(blockNumber) @@ -1628,7 +1580,8 @@ func (client *Client) doWatchLatestBlock() { if client.isRebuildingBlockquick() { return } - bq := client.getBlockquickWindow() + var bq *blockquick.Window + client.callTimeout(func() { bq = client.bq }) if bq == nil { return } @@ -1679,8 +1632,10 @@ func (client *Client) doWatchLatestBlock() { } func (client *Client) clearBlockquickWindow() error { - client.clearStateTicketsAndWindow() - return nil + return client.callTimeout(func() { + client.bq = nil + client.lastTicket = nil + }) } func (client *Client) forceResetBlockquickState() { diff --git a/rpc/type.go b/rpc/type.go index 3783c761..92689100 100644 --- a/rpc/type.go +++ b/rpc/type.go @@ -10,6 +10,7 @@ import ( "sync" "time" + "github.com/diodechain/diode_client/blockquick" "github.com/diodechain/diode_client/crypto" "github.com/diodechain/diode_client/db" "github.com/diodechain/diode_client/edge" @@ -111,7 +112,8 @@ func WindowSize() int { // LastValid returns the last valid block number and block header func (client *Client) LastValid() (uint64, crypto.Sha3) { - bq := client.getBlockquickWindow() + var bq *blockquick.Window + client.callTimeout(func() { bq = client.bq }) if bq == nil { return restoreLastValid() } From c5d79c49e3b5fdf484911a97f50376bca95e7dea Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C3=96mer=20AKG=C3=9CL?= <37873266+tuhalf@users.noreply.github.com> Date: Wed, 18 Feb 2026 21:56:04 +0100 Subject: [PATCH 5/6] Fix lint --- crypto/crypto.go | 1 + crypto/ecies/ecies.go | 2 ++ 2 files changed, 3 insertions(+) diff --git a/crypto/crypto.go b/crypto/crypto.go index ba150185..13a6d633 100644 --- a/crypto/crypto.go +++ b/crypto/crypto.go @@ -138,6 +138,7 @@ func toECDSA(d []byte, strict bool) (*ecdsa.PrivateKey, error) { return nil, fmt.Errorf("invalid private key, zero or negative") } + //lint:ignore SA1019 secp256k1 uses custom curve ops; crypto/ecdh does not support this curve priv.PublicKey.X, priv.PublicKey.Y = priv.PublicKey.Curve.ScalarBaseMult(d) if priv.PublicKey.X == nil { return nil, errors.New("invalid private key") diff --git a/crypto/ecies/ecies.go b/crypto/ecies/ecies.go index 9fc154ba..bb622023 100644 --- a/crypto/ecies/ecies.go +++ b/crypto/ecies/ecies.go @@ -127,6 +127,7 @@ func (prv *PrivateKey) GenerateShared(pub *PublicKey, skLen, macLen int) (sk []b return nil, ErrSharedKeyTooBig } + //lint:ignore SA1019 secp256k1 ECDH still relies on curve ScalarMult in this implementation x, _ := pub.Curve.ScalarMult(pub.X, pub.Y, prv.D.Bytes()) if x == nil { return nil, ErrSharedKeyIsPointAtInfinity @@ -337,6 +338,7 @@ func (prv *PrivateKey) Decrypt(c, s1, s2 []byte) (m []byte, err error) { err = ErrInvalidPublicKey return } + //lint:ignore SA1019 secp256k1 curve validation uses IsOnCurve and cannot use crypto/ecdh directly if !R.Curve.IsOnCurve(R.X, R.Y) { err = ErrInvalidCurve return From 025d89a844b4e8b281b439cce61f61977debd0b0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C3=96mer=20AKG=C3=9CL?= <37873266+tuhalf@users.noreply.github.com> Date: Wed, 18 Feb 2026 23:32:30 +0100 Subject: [PATCH 6/6] trim non-essential client race-condition changes --- rpc/client.go | 97 +++++++++------------------------------------------ 1 file changed, 17 insertions(+), 80 deletions(-) diff --git a/rpc/client.go b/rpc/client.go index 7c7a7796..87baa97c 100644 --- a/rpc/client.go +++ b/rpc/client.go @@ -36,7 +36,6 @@ const ( packetLimit = 65000 ticketBound = 4194304 callQueueSize = 1024 - blockHeaderFetchWorkerCount = 8 blockquickDowngradeThreshold = 5 blockquickValidationError = "couldn't validate any new blocks" ) @@ -141,17 +140,6 @@ func (client *Client) setClientClosed() { atomic.StoreUint32(&client.closed, 1) } -func isClientClosedError(err error) bool { - if err == nil { - return false - } - if err == errClientClosed { - return true - } - msg := err.Error() - return strings.Contains(msg, "dead genserver") || strings.Contains(msg, errClientClosed.Error()) -} - func (client *Client) doConnect() (err error) { err = client.doDial() if err != nil { @@ -387,11 +375,7 @@ func (client *Client) validateNetwork() error { // Fetching at least window size blocks -- this should be cached on disk instead. blockHeaders, err := client.GetBlockHeadersUnsafe(blockNumMin, lvbn) if err != nil { - if isClientClosedError(err) { - client.Log().Debug("Cannot fetch blocks %v-%v: client is closed", blockNumMin, lvbn) - } else { - client.Log().Error("Cannot fetch blocks %v-%v error: %v", blockNumMin, lvbn, err) - } + client.Log().Error("Cannot fetch blocks %v-%v error: %v", blockNumMin, lvbn, err) return err } if len(blockHeaders) != windowSize { @@ -517,67 +501,37 @@ func (client *Client) GetBlockHeaderUnsafe(blockNum uint64) (bh blockquick.Block // TODO: use copy instead reference of BlockHeader func (client *Client) GetBlockHeadersUnsafe2(blockNumbers []uint64) ([]blockquick.BlockHeader, error) { count := len(blockNumbers) - if count == 0 { - return []blockquick.BlockHeader{}, nil - } - - workerCount := blockHeaderFetchWorkerCount - if workerCount > count { - workerCount = count - } - + headersCount := 0 responses := make(map[uint64]blockquick.BlockHeader, count) - errorMessages := []string{} - var stop uint32 - jobs := make(chan uint64, count) mx := sync.Mutex{} wg := sync.WaitGroup{} + wg.Add(count) + errorMessages := []string{} - for i := 0; i < workerCount; i++ { - wg.Add(1) - go func() { + for _, i := range blockNumbers { + go func(bn uint64) { defer wg.Done() - for bn := range jobs { - if atomic.LoadUint32(&stop) == 1 { - return - } - - header, err := client.GetBlockHeaderUnsafe(bn) - if err != nil { - mx.Lock() - errorMessages = append(errorMessages, fmt.Sprintf("block %v: %v", bn, err)) - mx.Unlock() - if isClientClosedError(err) { - atomic.StoreUint32(&stop, 1) - } - continue - } - + header, err := client.GetBlockHeaderUnsafe(bn) + if err != nil { mx.Lock() - responses[bn] = header + errorMessages = append(errorMessages, fmt.Sprintf("block %v: %v", bn, err.Error())) mx.Unlock() + return } - }() - } - - for _, blockNumber := range blockNumbers { - if atomic.LoadUint32(&stop) == 1 { - break - } - jobs <- blockNumber + mx.Lock() + headersCount++ + responses[bn] = header + mx.Unlock() + }(i) } - close(jobs) wg.Wait() - if len(responses) != count { - if len(errorMessages) == 0 { - return []blockquick.BlockHeader{}, fmt.Errorf("failed fetching some blocks: missing block headers") - } + if headersCount != count { return []blockquick.BlockHeader{}, fmt.Errorf("failed fetching some blocks: %v", strings.Join(errorMessages, ", ")) } // copy responses to headers - headers := make([]blockquick.BlockHeader, count) + headers := make([]blockquick.BlockHeader, headersCount) for i, bn := range blockNumbers { if bh, ok := responses[bn]; ok { headers[i] = bh @@ -706,17 +660,6 @@ func (client *Client) SubmitTicketForUsage(minBytes *big.Int) (err error) { return nil } -func (client *Client) SubmitNewTicket() error { - client.srv.Cast(func() { - current := uint64(0) - if client.s != nil { - current = client.s.TotalBytes() - } - _ = client.SubmitTicketForUsage(new(big.Int).SetUint64(current)) - }) - return nil -} - // HandleTicketRequest responds to a server ticket request. func (client *Client) HandleTicketRequest(req *edge.TicketRequest) { if req == nil { @@ -1593,9 +1536,6 @@ func (client *Client) doWatchLatestBlock() { client.srv.Cast(func() { client.addLatencyMeasurement(elapsed) }) if err != nil { - if isClientClosedError(err) { - return - } client.Log().Error("Couldn't getblockpeak: %v", err) return } @@ -1609,9 +1549,6 @@ func (client *Client) doWatchLatestBlock() { for num := lastblock + 1; num <= blockPeak; num++ { blockHeader, err := client.GetBlockHeaderUnsafe(uint64(num)) if err != nil { - if isClientClosedError(err) { - return - } client.Log().Error("Couldn't download block header %v", err) return }