Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ curl -u rpchub:<password> 'http://localhost:8012/config'
```
To update the configuration, the following command can be run:
```shell
curl -u rpchub:<password> -X POST 'http://localhost:8012/config' --header 'Content-Type: application/json' --data-raw '{"password":"123456","request_timeout":30,"max_retries":3,"nodes":{"arbitrum":[{"name":"blockpi-public-arbitrum","endpoint":"https://arbitrum.blockpi.network/v1/rpc/public","weight":90,"read_only":false,"disabled":false},{"name":"arbitrum-official","endpoint":"https://arb1.arbitrum.io/rpc","weight":10,"read_only":false,"disabled":false}],"bsc":[{"name":"blockpi-public-bsc","endpoint":"https://bsc.blockpi.network/v1/rpc/public","weight":100,"read_only":false,"disabled":false}]},"phishing_db":["https://cfg.rpchub.io/agg/scam-addresses.json"],"phishing_db_update_interval":3600}'
curl -u rpchub:<password> -X POST 'http://localhost:8012/config' --header 'Content-Type: application/json' --data-raw '{"password":"123456","request_timeout":30,"max_retries":3,"nodes":{"arbitrum":[{"name":"blockpi-public-arbitrum","endpoint":"https://arbitrum.blockpi.network/v1/rpc/public","weight":90,"read_only":false,"disabled":false,"jsonrpc_error_code_failover":[429,-32003],"headers":{"Authorization":"Bearer <token>"}},{"name":"arbitrum-official","endpoint":"https://arb1.arbitrum.io/rpc","weight":10,"read_only":false,"disabled":false}],"bsc":[{"name":"blockpi-public-bsc","endpoint":"https://bsc.blockpi.network/v1/rpc/public","weight":100,"read_only":false,"disabled":false,"jsonrpc_error_code_failover":[429],"headers":{"X-API-KEY":"<api-key>"}}]},"phishing_db":["https://cfg.rpchub.io/agg/scam-addresses.json"],"phishing_db_update_interval":3600}'
```

## Reset configuration
Expand Down
2 changes: 2 additions & 0 deletions aggregator/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ type Node struct {
Weight int64 `json:"weight"`
ReadOnly bool `json:"read_only"`
Disabled bool `json:"disabled"`
JsonRpcErrorCodeFailover []int `json:"jsonrpc_error_code_failover,omitempty"`
Headers map[string]string `json:"headers,omitempty"`
}

func (node *Node) Host() string {
Expand Down
10 changes: 10 additions & 0 deletions loadbalance/selectors.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"github.com/BlockPILabs/aggregator/config"
"github.com/BlockPILabs/aggregator/log"
"sync"
"time"
)

var (
Expand Down Expand Up @@ -34,6 +35,15 @@ func NextNode(chain string) *aggregator.Node {
return nil
}

func TimeoutNode(chain string, nodeName string, d time.Duration) {
_mutex.Lock()
defer _mutex.Unlock()
selector := _selectors[chain]
if selector != nil {
selector.TimeoutNode(nodeName, d)
}
}

func LoadFromConfig() {
for chain, nodes := range config.Default().Nodes {
logger.Info("New load balancer", "chain", chain, "nodes", len(nodes))
Expand Down
30 changes: 25 additions & 5 deletions loadbalance/wr.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,24 @@ import (
"github.com/BlockPILabs/aggregator/notify"
"math/rand"
"sync"
"time"
)

// WrSelector weighted-random selector
type WrSelector struct {
nodes []aggregator.Node
sumWeight int64
timeouts map[string]time.Time

mutex sync.Mutex
}

func (s *WrSelector) SetNodes(nodes []aggregator.Node) {
s.mutex.Lock()
defer s.mutex.Unlock()
if s.timeouts == nil {
s.timeouts = map[string]time.Time{}
}

var nodesSelected []aggregator.Node
var sumWeight int64 = 0
Expand All @@ -37,20 +42,35 @@ func (s *WrSelector) SetNodes(nodes []aggregator.Node) {
s.sumWeight = sumWeight
}

func (s *WrSelector) TimeoutNode(name string, d time.Duration) {
s.mutex.Lock()
defer s.mutex.Unlock()
if s.timeouts == nil {
s.timeouts = map[string]time.Time{}
}
s.timeouts[name] = time.Now().Add(d)
}

func (s *WrSelector) NextNode() *aggregator.Node {
s.mutex.Lock()
defer s.mutex.Unlock()

if s.sumWeight > 0 {
w := rand.Int63n(s.sumWeight)
var weight int64 = 0
for _, node := range s.nodes {
//if !node.Disabled {
weight += node.Weight
now := time.Now()
for i := 0; i < len(s.nodes); i++ {
name := s.nodes[i].Name
if until, ok := s.timeouts[name]; ok {
if now.Before(until) {
continue
}
delete(s.timeouts, name)
}
weight += s.nodes[i].Weight
if weight >= w {
return &node
return &s.nodes[i]
}
//}
}
}
return nil
Expand Down
28 changes: 28 additions & 0 deletions middleware/plugins/http_proxy.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
package plugins

import (
"encoding/json"
"github.com/BlockPILabs/aggregator/aggregator"
"github.com/BlockPILabs/aggregator/client"
"github.com/BlockPILabs/aggregator/loadbalance"
"github.com/BlockPILabs/aggregator/log"
"github.com/BlockPILabs/aggregator/middleware"
"github.com/BlockPILabs/aggregator/rpc"
Expand All @@ -19,6 +22,15 @@ type HttpProxyMiddleware struct {
mu sync.Mutex
}

func containsInt(values []int, v int) bool {
for i := 0; i < len(values); i++ {
if values[i] == v {
return true
}
}
return false
}

func NewHttpProxyMiddleware() *HttpProxyMiddleware {
return &HttpProxyMiddleware{
enabled: true,
Expand Down Expand Up @@ -71,6 +83,22 @@ func (m *HttpProxyMiddleware) OnProcess(session *rpc.Session) error {
shouldDisableEndpoint = true
}

if err == nil {
body, _ := ctx.Response.BodyUncompressed()
if len(body) > 0 {
resp := &rpc.JsonRpcResponse{}
if json.Unmarshal(body, resp) == nil {
if resp.Error != nil {
if session.Node != nil && containsInt(session.Node.JsonRpcErrorCodeFailover, resp.Error.Code) {
loadbalance.TimeoutNode(session.Chain, session.Node.Name, time.Second*time.Duration(session.Cfg.RequestTimeout))
return aggregator.NewError(resp.Error.Code, resp.Error.Message)
}
return nil
}
}
}
}

if shouldDisableEndpoint {
//todo disable endpoint
}
Expand Down
15 changes: 15 additions & 0 deletions middleware/plugins/load_balance.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,23 @@ func (m *LoadBalanceMiddleware) OnRequest(session *rpc.Session) error {
return aggregator.ErrServerError
}
session.NodeName = node.Name
session.Node = node
//logger.Debug("load balance", "sid", session.SId(), "node", node.Name)
if ctx, ok := session.RequestCtx.(*fasthttp.RequestCtx); ok {
if session.AppliedNodeHeaders != nil {
for k := range session.AppliedNodeHeaders {
ctx.Request.Header.Del(k)
}
session.AppliedNodeHeaders = nil
}
if node.Headers != nil {
for k, v := range node.Headers {
if len(k) > 0 {
ctx.Request.Header.Set(k, v)
}
}
session.AppliedNodeHeaders = node.Headers
}
ctx.Request.SetRequestURI(node.Endpoint)
}
return nil
Expand Down
2 changes: 2 additions & 0 deletions rpc/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ type Session struct {

Tries int
NodeName string
Node *aggregator.Node
AppliedNodeHeaders map[string]string
IsWriteRpcMethod bool

//Tx *types.Transaction
Expand Down