From 5c643b68c436199f60e10e9e5cc33b718aa4ddf5 Mon Sep 17 00:00:00 2001 From: ljoaquim Date: Mon, 16 Mar 2026 08:56:15 -0300 Subject: [PATCH 1/3] feat: added new configuration parameter 'jsonrpc_error_code_failover', an int array that if any of the values match the jsonrpc error code, make the request failover --- aggregator/node.go | 1 + loadbalance/selectors.go | 10 ++++++++++ loadbalance/wr.go | 30 +++++++++++++++++++++++++----- middleware/plugins/http_proxy.go | 28 ++++++++++++++++++++++++++++ middleware/plugins/load_balance.go | 1 + rpc/session.go | 1 + 6 files changed, 66 insertions(+), 5 deletions(-) diff --git a/aggregator/node.go b/aggregator/node.go index 0bfed53..27163bb 100644 --- a/aggregator/node.go +++ b/aggregator/node.go @@ -8,6 +8,7 @@ type Node struct { Weight int64 `json:"weight"` ReadOnly bool `json:"read_only"` Disabled bool `json:"disabled"` + JsonRpcErrorCodeFailover []int `json:"jsonrpc_error_code_failover,omitempty"` } func (node *Node) Host() string { diff --git a/loadbalance/selectors.go b/loadbalance/selectors.go index 9640873..5ee73ec 100644 --- a/loadbalance/selectors.go +++ b/loadbalance/selectors.go @@ -5,6 +5,7 @@ import ( "github.com/BlockPILabs/aggregator/config" "github.com/BlockPILabs/aggregator/log" "sync" + "time" ) var ( @@ -34,6 +35,15 @@ func NextNode(chain string) *aggregator.Node { return nil } +func TimeoutNode(chain string, nodeName string, d time.Duration) { + _mutex.Lock() + defer _mutex.Unlock() + selector := _selectors[chain] + if selector != nil { + selector.TimeoutNode(nodeName, d) + } +} + func LoadFromConfig() { for chain, nodes := range config.Default().Nodes { logger.Info("New load balancer", "chain", chain, "nodes", len(nodes)) diff --git a/loadbalance/wr.go b/loadbalance/wr.go index 5d8d78d..2c1d95f 100644 --- a/loadbalance/wr.go +++ b/loadbalance/wr.go @@ -5,12 +5,14 @@ import ( "github.com/BlockPILabs/aggregator/notify" "math/rand" "sync" + "time" ) // WrSelector weighted-random selector type WrSelector struct { nodes []aggregator.Node sumWeight int64 + timeouts map[string]time.Time mutex sync.Mutex } @@ -18,6 +20,9 @@ type WrSelector struct { func (s *WrSelector) SetNodes(nodes []aggregator.Node) { s.mutex.Lock() defer s.mutex.Unlock() + if s.timeouts == nil { + s.timeouts = map[string]time.Time{} + } var nodesSelected []aggregator.Node var sumWeight int64 = 0 @@ -37,6 +42,15 @@ func (s *WrSelector) SetNodes(nodes []aggregator.Node) { s.sumWeight = sumWeight } +func (s *WrSelector) TimeoutNode(name string, d time.Duration) { + s.mutex.Lock() + defer s.mutex.Unlock() + if s.timeouts == nil { + s.timeouts = map[string]time.Time{} + } + s.timeouts[name] = time.Now().Add(d) +} + func (s *WrSelector) NextNode() *aggregator.Node { s.mutex.Lock() defer s.mutex.Unlock() @@ -44,13 +58,19 @@ func (s *WrSelector) NextNode() *aggregator.Node { if s.sumWeight > 0 { w := rand.Int63n(s.sumWeight) var weight int64 = 0 - for _, node := range s.nodes { - //if !node.Disabled { - weight += node.Weight + now := time.Now() + for i := 0; i < len(s.nodes); i++ { + name := s.nodes[i].Name + if until, ok := s.timeouts[name]; ok { + if now.Before(until) { + continue + } + delete(s.timeouts, name) + } + weight += s.nodes[i].Weight if weight >= w { - return &node + return &s.nodes[i] } - //} } } return nil diff --git a/middleware/plugins/http_proxy.go b/middleware/plugins/http_proxy.go index b0ae468..304eca0 100644 --- a/middleware/plugins/http_proxy.go +++ b/middleware/plugins/http_proxy.go @@ -1,7 +1,10 @@ package plugins import ( + "encoding/json" + "github.com/BlockPILabs/aggregator/aggregator" "github.com/BlockPILabs/aggregator/client" + "github.com/BlockPILabs/aggregator/loadbalance" "github.com/BlockPILabs/aggregator/log" "github.com/BlockPILabs/aggregator/middleware" "github.com/BlockPILabs/aggregator/rpc" @@ -19,6 +22,15 @@ type HttpProxyMiddleware struct { mu sync.Mutex } +func containsInt(values []int, v int) bool { + for i := 0; i < len(values); i++ { + if values[i] == v { + return true + } + } + return false +} + func NewHttpProxyMiddleware() *HttpProxyMiddleware { return &HttpProxyMiddleware{ enabled: true, @@ -71,6 +83,22 @@ func (m *HttpProxyMiddleware) OnProcess(session *rpc.Session) error { shouldDisableEndpoint = true } + if err == nil { + body, _ := ctx.Response.BodyUncompressed() + if len(body) > 0 { + resp := &rpc.JsonRpcResponse{} + if json.Unmarshal(body, resp) == nil { + if resp.Error != nil { + if session.Node != nil && containsInt(session.Node.JsonRpcErrorCodeFailover, resp.Error.Code) { + loadbalance.TimeoutNode(session.Chain, session.Node.Name, time.Second*time.Duration(session.Cfg.RequestTimeout)) + return aggregator.NewError(resp.Error.Code, resp.Error.Message) + } + return nil + } + } + } + } + if shouldDisableEndpoint { //todo disable endpoint } diff --git a/middleware/plugins/load_balance.go b/middleware/plugins/load_balance.go index aef24bb..931acfb 100644 --- a/middleware/plugins/load_balance.go +++ b/middleware/plugins/load_balance.go @@ -39,6 +39,7 @@ func (m *LoadBalanceMiddleware) OnRequest(session *rpc.Session) error { return aggregator.ErrServerError } session.NodeName = node.Name + session.Node = node //logger.Debug("load balance", "sid", session.SId(), "node", node.Name) if ctx, ok := session.RequestCtx.(*fasthttp.RequestCtx); ok { ctx.Request.SetRequestURI(node.Endpoint) diff --git a/rpc/session.go b/rpc/session.go index c23411a..ec02c8e 100644 --- a/rpc/session.go +++ b/rpc/session.go @@ -25,6 +25,7 @@ type Session struct { Tries int NodeName string + Node *aggregator.Node IsWriteRpcMethod bool //Tx *types.Transaction From ba36c523f52f7211eae3985330cc31f5f809b560 Mon Sep 17 00:00:00 2001 From: ljoaquim Date: Mon, 16 Mar 2026 09:04:44 -0300 Subject: [PATCH 2/3] feat: add per-node request headers config to inject custom headers into upstream RPC calls --- aggregator/node.go | 1 + middleware/plugins/load_balance.go | 14 ++++++++++++++ rpc/session.go | 1 + 3 files changed, 16 insertions(+) diff --git a/aggregator/node.go b/aggregator/node.go index 27163bb..7336372 100644 --- a/aggregator/node.go +++ b/aggregator/node.go @@ -9,6 +9,7 @@ type Node struct { ReadOnly bool `json:"read_only"` Disabled bool `json:"disabled"` JsonRpcErrorCodeFailover []int `json:"jsonrpc_error_code_failover,omitempty"` + Headers map[string]string `json:"headers,omitempty"` } func (node *Node) Host() string { diff --git a/middleware/plugins/load_balance.go b/middleware/plugins/load_balance.go index 931acfb..196ff51 100644 --- a/middleware/plugins/load_balance.go +++ b/middleware/plugins/load_balance.go @@ -42,6 +42,20 @@ func (m *LoadBalanceMiddleware) OnRequest(session *rpc.Session) error { session.Node = node //logger.Debug("load balance", "sid", session.SId(), "node", node.Name) if ctx, ok := session.RequestCtx.(*fasthttp.RequestCtx); ok { + if session.AppliedNodeHeaders != nil { + for k := range session.AppliedNodeHeaders { + ctx.Request.Header.Del(k) + } + session.AppliedNodeHeaders = nil + } + if node.Headers != nil { + for k, v := range node.Headers { + if len(k) > 0 { + ctx.Request.Header.Set(k, v) + } + } + session.AppliedNodeHeaders = node.Headers + } ctx.Request.SetRequestURI(node.Endpoint) } return nil diff --git a/rpc/session.go b/rpc/session.go index ec02c8e..de25a2a 100644 --- a/rpc/session.go +++ b/rpc/session.go @@ -26,6 +26,7 @@ type Session struct { Tries int NodeName string Node *aggregator.Node + AppliedNodeHeaders map[string]string IsWriteRpcMethod bool //Tx *types.Transaction From b99cca295aeca83564082fc7ab1f3630adc06d44 Mon Sep 17 00:00:00 2001 From: ljoaquim Date: Mon, 16 Mar 2026 09:09:59 -0300 Subject: [PATCH 3/3] updated README.md config example to include new jsonrpc_error_code_failover and headers fields --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 7bb25ee..205c6cc 100644 --- a/README.md +++ b/README.md @@ -33,7 +33,7 @@ curl -u rpchub: 'http://localhost:8012/config' ``` To update the configuration, the following command can be run: ```shell -curl -u rpchub: -X POST 'http://localhost:8012/config' --header 'Content-Type: application/json' --data-raw '{"password":"123456","request_timeout":30,"max_retries":3,"nodes":{"arbitrum":[{"name":"blockpi-public-arbitrum","endpoint":"https://arbitrum.blockpi.network/v1/rpc/public","weight":90,"read_only":false,"disabled":false},{"name":"arbitrum-official","endpoint":"https://arb1.arbitrum.io/rpc","weight":10,"read_only":false,"disabled":false}],"bsc":[{"name":"blockpi-public-bsc","endpoint":"https://bsc.blockpi.network/v1/rpc/public","weight":100,"read_only":false,"disabled":false}]},"phishing_db":["https://cfg.rpchub.io/agg/scam-addresses.json"],"phishing_db_update_interval":3600}' +curl -u rpchub: -X POST 'http://localhost:8012/config' --header 'Content-Type: application/json' --data-raw '{"password":"123456","request_timeout":30,"max_retries":3,"nodes":{"arbitrum":[{"name":"blockpi-public-arbitrum","endpoint":"https://arbitrum.blockpi.network/v1/rpc/public","weight":90,"read_only":false,"disabled":false,"jsonrpc_error_code_failover":[429,-32003],"headers":{"Authorization":"Bearer "}},{"name":"arbitrum-official","endpoint":"https://arb1.arbitrum.io/rpc","weight":10,"read_only":false,"disabled":false}],"bsc":[{"name":"blockpi-public-bsc","endpoint":"https://bsc.blockpi.network/v1/rpc/public","weight":100,"read_only":false,"disabled":false,"jsonrpc_error_code_failover":[429],"headers":{"X-API-KEY":""}}]},"phishing_db":["https://cfg.rpchub.io/agg/scam-addresses.json"],"phishing_db_update_interval":3600}' ``` ## Reset configuration