diff --git a/README.md b/README.md index 7bb25ee..205c6cc 100644 --- a/README.md +++ b/README.md @@ -33,7 +33,7 @@ curl -u rpchub: 'http://localhost:8012/config' ``` To update the configuration, the following command can be run: ```shell -curl -u rpchub: -X POST 'http://localhost:8012/config' --header 'Content-Type: application/json' --data-raw '{"password":"123456","request_timeout":30,"max_retries":3,"nodes":{"arbitrum":[{"name":"blockpi-public-arbitrum","endpoint":"https://arbitrum.blockpi.network/v1/rpc/public","weight":90,"read_only":false,"disabled":false},{"name":"arbitrum-official","endpoint":"https://arb1.arbitrum.io/rpc","weight":10,"read_only":false,"disabled":false}],"bsc":[{"name":"blockpi-public-bsc","endpoint":"https://bsc.blockpi.network/v1/rpc/public","weight":100,"read_only":false,"disabled":false}]},"phishing_db":["https://cfg.rpchub.io/agg/scam-addresses.json"],"phishing_db_update_interval":3600}' +curl -u rpchub: -X POST 'http://localhost:8012/config' --header 'Content-Type: application/json' --data-raw '{"password":"123456","request_timeout":30,"max_retries":3,"nodes":{"arbitrum":[{"name":"blockpi-public-arbitrum","endpoint":"https://arbitrum.blockpi.network/v1/rpc/public","weight":90,"read_only":false,"disabled":false,"jsonrpc_error_code_failover":[429,-32003],"headers":{"Authorization":"Bearer "}},{"name":"arbitrum-official","endpoint":"https://arb1.arbitrum.io/rpc","weight":10,"read_only":false,"disabled":false}],"bsc":[{"name":"blockpi-public-bsc","endpoint":"https://bsc.blockpi.network/v1/rpc/public","weight":100,"read_only":false,"disabled":false,"jsonrpc_error_code_failover":[429],"headers":{"X-API-KEY":""}}]},"phishing_db":["https://cfg.rpchub.io/agg/scam-addresses.json"],"phishing_db_update_interval":3600}' ``` ## Reset configuration diff --git a/aggregator/node.go b/aggregator/node.go index 0bfed53..7336372 100644 --- a/aggregator/node.go +++ b/aggregator/node.go @@ -8,6 +8,8 @@ type Node struct { Weight int64 `json:"weight"` ReadOnly bool `json:"read_only"` Disabled bool `json:"disabled"` + JsonRpcErrorCodeFailover []int `json:"jsonrpc_error_code_failover,omitempty"` + Headers map[string]string `json:"headers,omitempty"` } func (node *Node) Host() string { diff --git a/loadbalance/selectors.go b/loadbalance/selectors.go index 9640873..5ee73ec 100644 --- a/loadbalance/selectors.go +++ b/loadbalance/selectors.go @@ -5,6 +5,7 @@ import ( "github.com/BlockPILabs/aggregator/config" "github.com/BlockPILabs/aggregator/log" "sync" + "time" ) var ( @@ -34,6 +35,15 @@ func NextNode(chain string) *aggregator.Node { return nil } +func TimeoutNode(chain string, nodeName string, d time.Duration) { + _mutex.Lock() + defer _mutex.Unlock() + selector := _selectors[chain] + if selector != nil { + selector.TimeoutNode(nodeName, d) + } +} + func LoadFromConfig() { for chain, nodes := range config.Default().Nodes { logger.Info("New load balancer", "chain", chain, "nodes", len(nodes)) diff --git a/loadbalance/wr.go b/loadbalance/wr.go index 5d8d78d..2c1d95f 100644 --- a/loadbalance/wr.go +++ b/loadbalance/wr.go @@ -5,12 +5,14 @@ import ( "github.com/BlockPILabs/aggregator/notify" "math/rand" "sync" + "time" ) // WrSelector weighted-random selector type WrSelector struct { nodes []aggregator.Node sumWeight int64 + timeouts map[string]time.Time mutex sync.Mutex } @@ -18,6 +20,9 @@ type WrSelector struct { func (s *WrSelector) SetNodes(nodes []aggregator.Node) { s.mutex.Lock() defer s.mutex.Unlock() + if s.timeouts == nil { + s.timeouts = map[string]time.Time{} + } var nodesSelected []aggregator.Node var sumWeight int64 = 0 @@ -37,6 +42,15 @@ func (s *WrSelector) SetNodes(nodes []aggregator.Node) { s.sumWeight = sumWeight } +func (s *WrSelector) TimeoutNode(name string, d time.Duration) { + s.mutex.Lock() + defer s.mutex.Unlock() + if s.timeouts == nil { + s.timeouts = map[string]time.Time{} + } + s.timeouts[name] = time.Now().Add(d) +} + func (s *WrSelector) NextNode() *aggregator.Node { s.mutex.Lock() defer s.mutex.Unlock() @@ -44,13 +58,19 @@ func (s *WrSelector) NextNode() *aggregator.Node { if s.sumWeight > 0 { w := rand.Int63n(s.sumWeight) var weight int64 = 0 - for _, node := range s.nodes { - //if !node.Disabled { - weight += node.Weight + now := time.Now() + for i := 0; i < len(s.nodes); i++ { + name := s.nodes[i].Name + if until, ok := s.timeouts[name]; ok { + if now.Before(until) { + continue + } + delete(s.timeouts, name) + } + weight += s.nodes[i].Weight if weight >= w { - return &node + return &s.nodes[i] } - //} } } return nil diff --git a/middleware/plugins/http_proxy.go b/middleware/plugins/http_proxy.go index b0ae468..304eca0 100644 --- a/middleware/plugins/http_proxy.go +++ b/middleware/plugins/http_proxy.go @@ -1,7 +1,10 @@ package plugins import ( + "encoding/json" + "github.com/BlockPILabs/aggregator/aggregator" "github.com/BlockPILabs/aggregator/client" + "github.com/BlockPILabs/aggregator/loadbalance" "github.com/BlockPILabs/aggregator/log" "github.com/BlockPILabs/aggregator/middleware" "github.com/BlockPILabs/aggregator/rpc" @@ -19,6 +22,15 @@ type HttpProxyMiddleware struct { mu sync.Mutex } +func containsInt(values []int, v int) bool { + for i := 0; i < len(values); i++ { + if values[i] == v { + return true + } + } + return false +} + func NewHttpProxyMiddleware() *HttpProxyMiddleware { return &HttpProxyMiddleware{ enabled: true, @@ -71,6 +83,22 @@ func (m *HttpProxyMiddleware) OnProcess(session *rpc.Session) error { shouldDisableEndpoint = true } + if err == nil { + body, _ := ctx.Response.BodyUncompressed() + if len(body) > 0 { + resp := &rpc.JsonRpcResponse{} + if json.Unmarshal(body, resp) == nil { + if resp.Error != nil { + if session.Node != nil && containsInt(session.Node.JsonRpcErrorCodeFailover, resp.Error.Code) { + loadbalance.TimeoutNode(session.Chain, session.Node.Name, time.Second*time.Duration(session.Cfg.RequestTimeout)) + return aggregator.NewError(resp.Error.Code, resp.Error.Message) + } + return nil + } + } + } + } + if shouldDisableEndpoint { //todo disable endpoint } diff --git a/middleware/plugins/load_balance.go b/middleware/plugins/load_balance.go index aef24bb..196ff51 100644 --- a/middleware/plugins/load_balance.go +++ b/middleware/plugins/load_balance.go @@ -39,8 +39,23 @@ func (m *LoadBalanceMiddleware) OnRequest(session *rpc.Session) error { return aggregator.ErrServerError } session.NodeName = node.Name + session.Node = node //logger.Debug("load balance", "sid", session.SId(), "node", node.Name) if ctx, ok := session.RequestCtx.(*fasthttp.RequestCtx); ok { + if session.AppliedNodeHeaders != nil { + for k := range session.AppliedNodeHeaders { + ctx.Request.Header.Del(k) + } + session.AppliedNodeHeaders = nil + } + if node.Headers != nil { + for k, v := range node.Headers { + if len(k) > 0 { + ctx.Request.Header.Set(k, v) + } + } + session.AppliedNodeHeaders = node.Headers + } ctx.Request.SetRequestURI(node.Endpoint) } return nil diff --git a/rpc/session.go b/rpc/session.go index c23411a..de25a2a 100644 --- a/rpc/session.go +++ b/rpc/session.go @@ -25,6 +25,8 @@ type Session struct { Tries int NodeName string + Node *aggregator.Node + AppliedNodeHeaders map[string]string IsWriteRpcMethod bool //Tx *types.Transaction