Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ The following emojis are used to highlight certain changes:

### Fixed

- Fix bitswap being unable to retrieve content from providers whose advertised addresses are stale or unreachable but whose real listen addresses the DHT host has already learned. With `--dht-shared-host=false` (the default), addresses from the DHT host's peerstore are now merged into the bitswap host's peerstore on each `Connect`, matching kubo and ipfs-check (both of which run DHT and bitswap on the same host). ([#372](https://github.com/ipfs/rainbow/pull/372))

### Removed

### Security
Expand Down
18 changes: 13 additions & 5 deletions setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/metrics"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"
"github.com/libp2p/go-libp2p/core/routing"
"github.com/libp2p/go-libp2p/gologshim"
"github.com/libp2p/go-libp2p/p2p/host/observedaddrs"
Expand Down Expand Up @@ -325,13 +326,14 @@ func SetupWithLibp2p(ctx context.Context, cfg Config, key crypto.PrivKey, dnsCac
}

var (
vs routing.ValueStore
cr routing.ContentRouting
pr routing.PeerRouting
vs routing.ValueStore
cr routing.ContentRouting
pr routing.PeerRouting
dhtHost host.Host
)

opts = append(opts, libp2p.Routing(func(h host.Host) (routing.PeerRouting, error) {
cr, pr, vs, err = setupRouting(ctx, cfg, h, ds, dhtRcMgr, bwc, dnsCache)
cr, pr, vs, dhtHost, err = setupRouting(ctx, cfg, h, ds, dhtRcMgr, bwc, dnsCache)
return pr, err
}))
h, err := libp2p.New(opts...)
Expand Down Expand Up @@ -361,7 +363,13 @@ func SetupWithLibp2p(ctx context.Context, cfg Config, key crypto.PrivKey, dnsCac
blkst = blockstore.NewIdStore(blkst)
n.blockstore = blkst

bsrv = blockservice.New(blkst, setupBitswapExchange(ctx, cfg, h, cr, blkst),
// Bridge the DHT host's peerstore into bitswap's view only when the
// hosts are split; otherwise the peerstore is already shared.
var dhtAddrs peerstore.AddrBook
if dhtHost != nil && dhtHost != h {
dhtAddrs = dhtHost.Peerstore()
}
bsrv = blockservice.New(blkst, setupBitswapExchange(ctx, cfg, h, dhtAddrs, cr, blkst),
// if we are doing things right, our bitswap wantlists should
// not have blocks that we already have (see
// https://github.com/ipfs/boxo/blob/e0d4b3e9b91e9904066a10278e366c9a6d9645c7/blockservice/blockservice.go#L272). Thus
Expand Down
73 changes: 71 additions & 2 deletions setup_bitswap.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package main

import (
"context"
"slices"
"time"

"github.com/ipfs/boxo/routing/providerquerymanager"
Expand All @@ -19,15 +20,83 @@ import (
metri "github.com/ipfs/go-metrics-interface"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"
"github.com/libp2p/go-libp2p/core/routing"
ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr/net"
)

func setupBitswapExchange(ctx context.Context, cfg Config, h host.Host, cr routing.ContentRouting, bstore blockstore.Blockstore) exchange.Interface {
// peerstoreMergingHost wraps the bitswap host so Connect sees addresses
// the DHT host has learned for a peer.
//
// BasicHost.Connect runs AddAddrs(pi.Addrs, TempAddrTTL) and then dials
// every address in the host's peerstore for that peer. Sharing a libp2p
// host (kubo, ipfs-check) means identify exchanges, DHT response messages,
// and DCUtR coordination all enrich the same peerstore that bitswap reads
// at dial time. Rainbow's --dht-shared-host=false default runs the DHT on
// a separate libp2p host, so that enrichment lands on a peerstore the
// bitswap host never reads. The wrapper bridges that gap.
//
// Only addresses that pass isUndialableMergedAddr are forwarded. IP- and
// DNS-rooted addresses are filtered down to public ones: the DHT host's
// peerstore can hold loopback or RFC1918 entries that misconfigured peers
// stored in their routing tables, and forwarding those would just waste
// resource-manager budget on dials that can never connect. Addresses
// rooted in anything else (relay-only /p2p-circuit hops, unknown
// transports) cannot be classified locally, so they pass through and let
// the swarm decide.
type peerstoreMergingHost struct {
// Host is the wrapped bitswap host; every method except Connect is
// promoted from it unchanged.
host.Host
// dhtAddrs is the address book Connect reads DHT-learned addresses from
// (the DHT host's peerstore in the split-host setup).
dhtAddrs peerstore.AddrBook
}

// Connect seeds the wrapped host's peerstore with whatever the DHT address
// book knows about pi.ID and then hands the dial to the embedded host.
//
// Addresses rejected by isUndialableMergedAddr are discarded first. The
// survivors are stored with peerstore.TempAddrTTL, the same TTL
// BasicHost.Connect applies to the AddrInfo it is given; identify on the
// resulting connection is expected to refresh the durable set on its own.
// When nothing survives the filter the peerstore is left untouched.
func (h *peerstoreMergingHost) Connect(ctx context.Context, pi peer.AddrInfo) error {
	learned := h.dhtAddrs.Addrs(pi.ID)
	learned = slices.DeleteFunc(learned, isUndialableMergedAddr)
	if len(learned) != 0 {
		target := h.Host.Peerstore()
		target.AddAddrs(pi.ID, learned, peerstore.TempAddrTTL)
	}
	return h.Host.Connect(ctx, pi)
}

// isUndialableMergedAddr is the drop predicate applied to DHT-learned
// addresses before they are forwarded into the bitswap host's peerstore.
// It has the shape slices.DeleteFunc wants: true means "discard".
//
// The verdict depends on the address's first component:
//   - an empty multiaddr is discarded;
//   - IP-rooted (ip4, ip6, ip6zone) and DNS-rooted (dns, dns4, dns6,
//     dnsaddr) addresses are discarded unless manet.IsPublicAddr accepts
//     them, which rules out RFC1918, loopback, link-local, and special-use
//     DNS names like .local, .localhost, .invalid, .test;
//   - anything else (relay-only /p2p-circuit hops, unknown transports)
//     cannot be classified locally and is kept so the swarm can attempt it.
//
// This mirrors the hasIPOrDNSComponent + IsPublicAddr guard go-libp2p uses
// in BasicHost.filterPublicAddrs (p2p/host/basic/addrs_manager.go).
func isUndialableMergedAddr(a ma.Multiaddr) bool {
	if len(a) == 0 {
		return true
	}

	head := a[0].Protocol().Code
	classifiable := head == ma.P_IP4 ||
		head == ma.P_IP6 ||
		head == ma.P_IP6ZONE ||
		head == ma.P_DNS ||
		head == ma.P_DNS4 ||
		head == ma.P_DNS6 ||
		head == ma.P_DNSADDR
	if !classifiable {
		// Not IP- or DNS-rooted: keep it and let the swarm decide.
		return false
	}
	return !manet.IsPublicAddr(a)
}

// setupBitswapExchange wires bitswap onto h, the main libp2p host. In the
// split-host setup (dhtAddrs non-nil), h is wrapped so each bitswap Connect
// copies DHT-known public addresses into the peerstore before dialing.
func setupBitswapExchange(ctx context.Context, cfg Config, h host.Host, dhtAddrs peerstore.AddrBook, cr routing.ContentRouting, bstore blockstore.Blockstore) exchange.Interface {
bsctx := metri.CtxScope(ctx, "ipfs_bitswap")

connEvtMgr := network.NewConnectEventManager()

bitswapHost := h
if dhtAddrs != nil {
bitswapHost = &peerstoreMergingHost{Host: h, dhtAddrs: dhtAddrs}
}

var exnet network.BitSwapNetwork
bn := bsnet.NewFromIpfsHost(h, bsnet.WithConnectEventManager(connEvtMgr))
bn := bsnet.NewFromIpfsHost(bitswapHost, bsnet.WithConnectEventManager(connEvtMgr))

if cfg.HTTPRetrievalEnable {

Expand Down
66 changes: 66 additions & 0 deletions setup_bitswap_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package main

import (
"testing"

ma "github.com/multiformats/go-multiaddr"
"github.com/stretchr/testify/require"
)

// TestIsUndialableMergedAddr pins the drop/keep verdict of
// isUndialableMergedAddr for each address family the peerstore bridge can
// encounter. IP- and DNS-rooted addresses must be dropped when they are not
// public, while addresses with any other head must be kept even though
// manet.IsPublicAddr returns false for them.
func TestIsUndialableMergedAddr(t *testing.T) {
	type row struct {
		input    string
		wantDrop bool
	}

	table := []row{
		// Public IPv4: kept.
		{input: "/ip4/1.2.3.4/tcp/4001", wantDrop: false},
		{input: "/ip4/89.124.92.77/udp/4001/quic-v1", wantDrop: false},
		// Loopback, RFC1918 and link-local IPv4: dropped.
		{input: "/ip4/127.0.0.1/tcp/4001", wantDrop: true},
		{input: "/ip4/10.0.0.1/tcp/4001", wantDrop: true},
		{input: "/ip4/192.168.1.1/tcp/4001", wantDrop: true},
		{input: "/ip4/172.16.0.1/tcp/4001", wantDrop: true},
		{input: "/ip4/169.254.1.1/tcp/4001", wantDrop: true},
		// IPv6, public first and then loopback / unique-local.
		{input: "/ip6/2606:4700::1/tcp/4001", wantDrop: false},
		{input: "/ip6/::1/tcp/4001", wantDrop: true},
		{input: "/ip6/fc00::1/tcp/4001", wantDrop: true},
		// A zone-id prefix (link-local with zone) must not bypass the check.
		{input: "/ip6zone/eth0/ip6/fe80::1/tcp/4001", wantDrop: true},
		// Relay hop with no IP head. manet.IsPublicAddr says false, yet the
		// address must survive: the swarm resolves the relay separately.
		{input: "/p2p/12D3KooWPZgaSPM84PKCb78vEugeJTA7X2k6WYcKV1TGv1M9FJGq/p2p-circuit/p2p/12D3KooWPZgaSPM84PKCb78vEugeJTA7X2k6WYcKV1TGv1M9FJGq", wantDrop: false},
		// Ordinary DNS names: kept, BasicHost resolves them on dial.
		{input: "/dns4/example.com/tcp/4001", wantDrop: false},
		{input: "/dns6/example.com/tcp/4001", wantDrop: false},
		{input: "/dnsaddr/bootstrap.libp2p.io/p2p/QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN", wantDrop: false},
		// Special-use DNS names can never lead to a useful public
		// destination, so they are dropped like RFC1918/loopback IPs.
		{input: "/dns4/foo.local/tcp/4001", wantDrop: true},
		{input: "/dns/printer.home.arpa/tcp/4001", wantDrop: true},
		{input: "/dns4/myhost.localhost/tcp/4001", wantDrop: true},
		{input: "/dns/bogus.invalid/tcp/4001", wantDrop: true},
		{input: "/dns4/widget.test/tcp/4001", wantDrop: true},
		// Relay address behind a public IP head: still public.
		{input: "/ip4/147.75.83.83/tcp/4001/p2p/12D3KooWPZgaSPM84PKCb78vEugeJTA7X2k6WYcKV1TGv1M9FJGq/p2p-circuit", wantDrop: false},
		// Relay address behind a private IP head: not public.
		{input: "/ip4/192.168.1.1/tcp/4001/p2p/12D3KooWPZgaSPM84PKCb78vEugeJTA7X2k6WYcKV1TGv1M9FJGq/p2p-circuit", wantDrop: true},
	}

	for i := range table {
		tc := table[i]
		t.Run(tc.input, func(t *testing.T) {
			parsed, err := ma.NewMultiaddr(tc.input)
			require.NoError(t, err)

			got := isUndialableMergedAddr(parsed)
			require.Equal(t, tc.wantDrop, got)
		})
	}

	// The zero-length multiaddr has no head component to classify.
	t.Run("empty multiaddr", func(t *testing.T) {
		var empty ma.Multiaddr = ma.Multiaddr{}
		require.True(t, isUndialableMergedAddr(empty))
	})
}
36 changes: 18 additions & 18 deletions setup_routing.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,21 +138,21 @@ func parseBootstrapPeers(peers []string, warnOnAuto bool) ([]peer.AddrInfo, erro
return bootstrapPeers, nil
}

func setupDHTRouting(ctx context.Context, cfg Config, h host.Host, ds datastore.Batching, dhtRcMgr network.ResourceManager, bwc metrics.Reporter) (routing.Routing, error) {
func setupDHTRouting(ctx context.Context, cfg Config, h host.Host, ds datastore.Batching, dhtRcMgr network.ResourceManager, bwc metrics.Reporter) (routing.Routing, host.Host, error) {
if cfg.DHTRouting == DHTOff {
return nil, nil
return nil, nil, nil
}

// Parse bootstrap peers
bootstrapPeers, err := parseBootstrapPeers(cfg.Bootstrap, true)
if err != nil {
return nil, err
return nil, nil, err
}

// If no bootstrap peers provided, use defaults for seed peering or error otherwise
if len(bootstrapPeers) == 0 {
if !cfg.SeedPeering {
return nil, fmt.Errorf("no valid bootstrap peers configured - provide bootstrap peers or enable autoconf")
return nil, nil, fmt.Errorf("no valid bootstrap peers configured - provide bootstrap peers or enable autoconf")
}
// Use default bootstrap peers for seed peering
bootstrapPeers = dht.GetDefaultBootstrapPeerAddrInfos()
Expand All @@ -171,7 +171,7 @@ func setupDHTRouting(ctx context.Context, cfg Config, h host.Host, ds datastore.
libp2p.ResourceManager(dhtRcMgr),
)
if err != nil {
return nil, err
return nil, nil, err
}
}

Expand All @@ -181,11 +181,11 @@ func setupDHTRouting(ctx context.Context, cfg Config, h host.Host, ds datastore.
dht.Mode(dht.ModeClient),
)
if err != nil {
return nil, err
return nil, nil, err
}

if cfg.DHTRouting == DHTStandard {
return standardClient, nil
return standardClient, dhtHost, nil
}

if cfg.DHTRouting == DHTAccelerated {
Expand All @@ -200,15 +200,15 @@ func setupDHTRouting(ctx context.Context, cfg Config, h host.Host, ds datastore.
dht.BucketSize(20),
))
if err != nil {
return nil, err
return nil, nil, err
}
return &bundledDHT{
standard: standardClient,
fullRT: fullRTClient,
}, nil
}, dhtHost, nil
}

return nil, fmt.Errorf("unknown DHTRouting option: %q", cfg.DHTRouting)
return nil, nil, fmt.Errorf("unknown DHTRouting option: %q", cfg.DHTRouting)
}

func setupCompositeRouting(delegatedRouters []routing.Routing, dht routing.Routing, cfg Config) routing.Routing {
Expand Down Expand Up @@ -253,15 +253,15 @@ func setupCompositeRouting(delegatedRouters []routing.Routing, dht routing.Routi
return router
}

func setupRouting(ctx context.Context, cfg Config, h host.Host, ds datastore.Batching, dhtRcMgr network.ResourceManager, bwc metrics.Reporter, dnsCache *cachedDNS) (routing.ContentRouting, routing.PeerRouting, routing.ValueStore, error) {
func setupRouting(ctx context.Context, cfg Config, h host.Host, ds datastore.Batching, dhtRcMgr network.ResourceManager, bwc metrics.Reporter, dnsCache *cachedDNS) (routing.ContentRouting, routing.PeerRouting, routing.ValueStore, host.Host, error) {
delegatedRouters, err := setupDelegatedRouting(cfg, dnsCache)
if err != nil {
return nil, nil, nil, err
return nil, nil, nil, nil, err
}

dhtRouter, err := setupDHTRouting(ctx, cfg, h, ds, dhtRcMgr, bwc)
dhtRouter, dhtHost, err := setupDHTRouting(ctx, cfg, h, ds, dhtRcMgr, bwc)
if err != nil {
return nil, nil, nil, err
return nil, nil, nil, nil, err
}

router := setupCompositeRouting(delegatedRouters, dhtRouter, cfg)
Expand All @@ -277,7 +277,7 @@ func setupRouting(ctx context.Context, cfg Config, h host.Host, ds datastore.Bat
if len(cfg.RemoteBackends) > 0 && cfg.RemoteBackendsIPNS {
remoteValueStore, err := gateway.NewRemoteValueStore(cfg.RemoteBackends, nil)
if err != nil {
return nil, nil, nil, err
return nil, nil, nil, nil, err
}
vs = setupCompositeRouting(append(delegatedRouters, &routinghelpers.Compose{
ValueStore: remoteValueStore,
Expand All @@ -291,7 +291,7 @@ func setupRouting(ctx context.Context, cfg Config, h host.Host, ds datastore.Bat
// Parse bootstrap peers for seed peering DHT (don't warn on auto since it's expected)
seedBootstrapPeers, err := parseBootstrapPeers(cfg.Bootstrap, false)
if err != nil {
return nil, nil, nil, err
return nil, nil, nil, nil, err
}

// Use provided bootstrap peers or fall back to defaults
Expand All @@ -308,11 +308,11 @@ func setupRouting(ctx context.Context, cfg Config, h host.Host, ds datastore.Bat

pr, err = dht.New(ctx, h, dhtOpts...)
if err != nil {
return nil, nil, nil, err
return nil, nil, nil, nil, err
}
}

return cr, pr, vs, nil
return cr, pr, vs, dhtHost, nil
}

func setupRoutingNoLibp2p(cfg Config, dnsCache *cachedDNS) (routing.ValueStore, error) {
Expand Down
Loading