diff --git a/CHANGELOG.md b/CHANGELOG.md index 9518c1a..f888653 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,8 @@ The following emojis are used to highlight certain changes: ### Fixed +- Fix bitswap unable to retrieve content from providers whose advertised addresses are stale or unreachable but whose real listen addresses the DHT host has already learned. With `--dht-shared-host=false` (the default), addresses from the DHT host's peerstore are now merged into the bitswap host's peerstore on each `Connect`, matching kubo and ipfs-check (both of which run DHT and bitswap on the same host). ([#372](https://github.com/ipfs/rainbow/pull/372)) + ### Removed ### Security diff --git a/setup.go b/setup.go index 4ac8165..e0c1359 100644 --- a/setup.go +++ b/setup.go @@ -39,6 +39,7 @@ import ( "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/metrics" "github.com/libp2p/go-libp2p/core/peer" + "github.com/libp2p/go-libp2p/core/peerstore" "github.com/libp2p/go-libp2p/core/routing" "github.com/libp2p/go-libp2p/gologshim" "github.com/libp2p/go-libp2p/p2p/host/observedaddrs" @@ -325,13 +326,14 @@ func SetupWithLibp2p(ctx context.Context, cfg Config, key crypto.PrivKey, dnsCac } var ( - vs routing.ValueStore - cr routing.ContentRouting - pr routing.PeerRouting + vs routing.ValueStore + cr routing.ContentRouting + pr routing.PeerRouting + dhtHost host.Host ) opts = append(opts, libp2p.Routing(func(h host.Host) (routing.PeerRouting, error) { - cr, pr, vs, err = setupRouting(ctx, cfg, h, ds, dhtRcMgr, bwc, dnsCache) + cr, pr, vs, dhtHost, err = setupRouting(ctx, cfg, h, ds, dhtRcMgr, bwc, dnsCache) return pr, err })) h, err := libp2p.New(opts...) @@ -361,7 +363,13 @@ func SetupWithLibp2p(ctx context.Context, cfg Config, key crypto.PrivKey, dnsCac blkst = blockstore.NewIdStore(blkst) n.blockstore = blkst - bsrv = blockservice.New(blkst, setupBitswapExchange(ctx, cfg, h, cr, blkst), + // Bridge the DHT host's peerstore into bitswap's view only when the + // hosts are split; otherwise the peerstore is already shared. + var dhtAddrs peerstore.AddrBook + if dhtHost != nil && dhtHost != h { + dhtAddrs = dhtHost.Peerstore() + } + bsrv = blockservice.New(blkst, setupBitswapExchange(ctx, cfg, h, dhtAddrs, cr, blkst), // if we are doing things right, our bitswap wantlists should // not have blocks that we already have (see // https://github.com/ipfs/boxo/blob/e0d4b3e9b91e9904066a10278e366c9a6d9645c7/blockservice/blockservice.go#L272). Thus diff --git a/setup_bitswap.go b/setup_bitswap.go index 132d1e4..2e78bd4 100644 --- a/setup_bitswap.go +++ b/setup_bitswap.go @@ -2,6 +2,7 @@ package main import ( "context" + "slices" "time" "github.com/ipfs/boxo/routing/providerquerymanager" @@ -19,15 +20,83 @@ import ( metri "github.com/ipfs/go-metrics-interface" "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/peer" + "github.com/libp2p/go-libp2p/core/peerstore" "github.com/libp2p/go-libp2p/core/routing" + ma "github.com/multiformats/go-multiaddr" + manet "github.com/multiformats/go-multiaddr/net" ) -func setupBitswapExchange(ctx context.Context, cfg Config, h host.Host, cr routing.ContentRouting, bstore blockstore.Blockstore) exchange.Interface { +// peerstoreMergingHost wraps the bitswap host so Connect sees addresses +// the DHT host has learned for a peer. +// +// BasicHost.Connect runs AddAddrs(pi.Addrs, TempAddrTTL) and then dials +// every address in the host's peerstore for that peer. Sharing a libp2p +// host (kubo, ipfs-check) means identify exchanges, DHT response messages, +// and DCUtR coordination all enrich the same peerstore that bitswap reads +// at dial time. Rainbow's --dht-shared-host=false default runs the DHT on +// a separate libp2p host, so that enrichment lands on a peerstore the +// bitswap host never reads. The wrapper bridges that gap. +// +// IP-based addresses are filtered down to publicly routable ones. The DHT +// host's peerstore can hold loopback or RFC1918 entries that misconfigured +// peers stored in their routing tables; forwarding those would just waste +// resource-manager budget on dials that can never connect. Non-IP addresses +// (relay-only /p2p-circuit hops, DNS-based transports) cannot be classified +// without resolving them, so they pass through and let the swarm decide. +type peerstoreMergingHost struct { + host.Host + dhtAddrs peerstore.AddrBook +} + +// Connect copies dialable DHT-known addresses for pi.ID into the main host's +// peerstore at TempAddrTTL (the same TTL BasicHost.Connect uses for the +// AddrInfo it receives), then delegates to the embedded host. Identify on +// the resulting connection refreshes the durable set on its own. +func (h *peerstoreMergingHost) Connect(ctx context.Context, pi peer.AddrInfo) error { + dialable := slices.DeleteFunc(h.dhtAddrs.Addrs(pi.ID), isUndialableMergedAddr) + if len(dialable) > 0 { + h.Host.Peerstore().AddAddrs(pi.ID, dialable, peerstore.TempAddrTTL) + } + return h.Host.Connect(ctx, pi) +} + +// isUndialableMergedAddr reports whether a DHT-learned address should be +// dropped before forwarding into the bitswap host's peerstore. IP- and +// DNS-rooted addresses are filtered through manet.IsPublicAddr (which +// rejects RFC1918, loopback, link-local, and special-use DNS names like +// .local, .localhost, .invalid, .test). Everything else (relay-only +// /p2p-circuit hops, unknown transports) cannot be classified locally +// and is kept so the swarm can attempt it. +// +// Mirrors the hasIPOrDNSComponent + IsPublicAddr guard go-libp2p uses +// in BasicHost.filterPublicAddrs (p2p/host/basic/addrs_manager.go). +// Phrased as the predicate slices.DeleteFunc wants (true means drop). +func isUndialableMergedAddr(a ma.Multiaddr) bool { + if len(a) == 0 { + return true + } + switch a[0].Protocol().Code { + case ma.P_IP4, ma.P_IP6, ma.P_IP6ZONE, ma.P_DNS, ma.P_DNS4, ma.P_DNS6, ma.P_DNSADDR: + return !manet.IsPublicAddr(a) + } + return false +} + +// setupBitswapExchange wires bitswap onto h, the main libp2p host. In the +// split-host setup (dhtAddrs non-nil), h is wrapped so each bitswap Connect +// copies DHT-known public addresses into the peerstore before dialing. +func setupBitswapExchange(ctx context.Context, cfg Config, h host.Host, dhtAddrs peerstore.AddrBook, cr routing.ContentRouting, bstore blockstore.Blockstore) exchange.Interface { bsctx := metri.CtxScope(ctx, "ipfs_bitswap") connEvtMgr := network.NewConnectEventManager() + + bitswapHost := h + if dhtAddrs != nil { + bitswapHost = &peerstoreMergingHost{Host: h, dhtAddrs: dhtAddrs} + } + var exnet network.BitSwapNetwork - bn := bsnet.NewFromIpfsHost(h, bsnet.WithConnectEventManager(connEvtMgr)) + bn := bsnet.NewFromIpfsHost(bitswapHost, bsnet.WithConnectEventManager(connEvtMgr)) if cfg.HTTPRetrievalEnable { diff --git a/setup_bitswap_test.go b/setup_bitswap_test.go new file mode 100644 index 0000000..f249062 --- /dev/null +++ b/setup_bitswap_test.go @@ -0,0 +1,66 @@ +package main + +import ( + "testing" + + ma "github.com/multiformats/go-multiaddr" + "github.com/stretchr/testify/require" +) + +func TestIsUndialableMergedAddr(t *testing.T) { + // The bridge from dhtHost.Peerstore() into the bitswap host must drop + // IP-rooted addresses that aren't publicly routable, but it must not + // reject addresses whose first component isn't an IP just because + // manet.IsPublicAddr returns false for them. + cases := []struct { + addr string + drop bool + }{ + // IPv4 public. + {"/ip4/1.2.3.4/tcp/4001", false}, + {"/ip4/89.124.92.77/udp/4001/quic-v1", false}, + // IPv4 non-public. + {"/ip4/127.0.0.1/tcp/4001", true}, + {"/ip4/10.0.0.1/tcp/4001", true}, + {"/ip4/192.168.1.1/tcp/4001", true}, + {"/ip4/172.16.0.1/tcp/4001", true}, + {"/ip4/169.254.1.1/tcp/4001", true}, + // IPv6 public and non-public. + {"/ip6/2606:4700::1/tcp/4001", false}, + {"/ip6/::1/tcp/4001", true}, + {"/ip6/fc00::1/tcp/4001", true}, + // IP6ZONE-prefixed (link-local with zone id) must still be evaluated. + {"/ip6zone/eth0/ip6/fe80::1/tcp/4001", true}, + // Pure relay hop without an IP head. manet.IsPublicAddr returns false + // here, but we must keep it: the swarm resolves the relay separately. + {"/p2p/12D3KooWPZgaSPM84PKCb78vEugeJTA7X2k6WYcKV1TGv1M9FJGq/p2p-circuit/p2p/12D3KooWPZgaSPM84PKCb78vEugeJTA7X2k6WYcKV1TGv1M9FJGq", false}, + // DNS-rooted transports with ordinary names: kept; BasicHost resolves + // them on dial. + {"/dns4/example.com/tcp/4001", false}, + {"/dns6/example.com/tcp/4001", false}, + {"/dnsaddr/bootstrap.libp2p.io/p2p/QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN", false}, + // Special-use DNS names that can never resolve to a useful public + // destination: dropped. Same reasoning as RFC1918/loopback IPs. + {"/dns4/foo.local/tcp/4001", true}, + {"/dns/printer.home.arpa/tcp/4001", true}, + {"/dns4/myhost.localhost/tcp/4001", true}, + {"/dns/bogus.invalid/tcp/4001", true}, + {"/dns4/widget.test/tcp/4001", true}, + // Relay address with a public IP head is still public. + {"/ip4/147.75.83.83/tcp/4001/p2p/12D3KooWPZgaSPM84PKCb78vEugeJTA7X2k6WYcKV1TGv1M9FJGq/p2p-circuit", false}, + // Relay address with a private IP head is not. + {"/ip4/192.168.1.1/tcp/4001/p2p/12D3KooWPZgaSPM84PKCb78vEugeJTA7X2k6WYcKV1TGv1M9FJGq/p2p-circuit", true}, + } + + for _, tc := range cases { + t.Run(tc.addr, func(t *testing.T) { + addr, err := ma.NewMultiaddr(tc.addr) + require.NoError(t, err) + require.Equal(t, tc.drop, isUndialableMergedAddr(addr)) + }) + } + + t.Run("empty multiaddr", func(t *testing.T) { + require.True(t, isUndialableMergedAddr(ma.Multiaddr{})) + }) +} diff --git a/setup_routing.go b/setup_routing.go index 78a1424..a27566a 100644 --- a/setup_routing.go +++ b/setup_routing.go @@ -138,21 +138,21 @@ func parseBootstrapPeers(peers []string, warnOnAuto bool) ([]peer.AddrInfo, erro return bootstrapPeers, nil } -func setupDHTRouting(ctx context.Context, cfg Config, h host.Host, ds datastore.Batching, dhtRcMgr network.ResourceManager, bwc metrics.Reporter) (routing.Routing, error) { +func setupDHTRouting(ctx context.Context, cfg Config, h host.Host, ds datastore.Batching, dhtRcMgr network.ResourceManager, bwc metrics.Reporter) (routing.Routing, host.Host, error) { if cfg.DHTRouting == DHTOff { - return nil, nil + return nil, nil, nil } // Parse bootstrap peers bootstrapPeers, err := parseBootstrapPeers(cfg.Bootstrap, true) if err != nil { - return nil, err + return nil, nil, err } // If no bootstrap peers provided, use defaults for seed peering or error otherwise if len(bootstrapPeers) == 0 { if !cfg.SeedPeering { - return nil, fmt.Errorf("no valid bootstrap peers configured - provide bootstrap peers or enable autoconf") + return nil, nil, fmt.Errorf("no valid bootstrap peers configured - provide bootstrap peers or enable autoconf") } // Use default bootstrap peers for seed peering bootstrapPeers = dht.GetDefaultBootstrapPeerAddrInfos() @@ -171,7 +171,7 @@ func setupDHTRouting(ctx context.Context, cfg Config, h host.Host, ds datastore. libp2p.ResourceManager(dhtRcMgr), ) if err != nil { - return nil, err + return nil, nil, err } } @@ -181,11 +181,11 @@ func setupDHTRouting(ctx context.Context, cfg Config, h host.Host, ds datastore. dht.Mode(dht.ModeClient), ) if err != nil { - return nil, err + return nil, nil, err } if cfg.DHTRouting == DHTStandard { - return standardClient, nil + return standardClient, dhtHost, nil } if cfg.DHTRouting == DHTAccelerated { @@ -200,15 +200,15 @@ func setupDHTRouting(ctx context.Context, cfg Config, h host.Host, ds datastore. dht.BucketSize(20), )) if err != nil { - return nil, err + return nil, nil, err } return &bundledDHT{ standard: standardClient, fullRT: fullRTClient, - }, nil + }, dhtHost, nil } - return nil, fmt.Errorf("unknown DHTRouting option: %q", cfg.DHTRouting) + return nil, nil, fmt.Errorf("unknown DHTRouting option: %q", cfg.DHTRouting) } func setupCompositeRouting(delegatedRouters []routing.Routing, dht routing.Routing, cfg Config) routing.Routing { @@ -253,15 +253,15 @@ func setupCompositeRouting(delegatedRouters []routing.Routing, dht routing.Routi return router } -func setupRouting(ctx context.Context, cfg Config, h host.Host, ds datastore.Batching, dhtRcMgr network.ResourceManager, bwc metrics.Reporter, dnsCache *cachedDNS) (routing.ContentRouting, routing.PeerRouting, routing.ValueStore, error) { +func setupRouting(ctx context.Context, cfg Config, h host.Host, ds datastore.Batching, dhtRcMgr network.ResourceManager, bwc metrics.Reporter, dnsCache *cachedDNS) (routing.ContentRouting, routing.PeerRouting, routing.ValueStore, host.Host, error) { delegatedRouters, err := setupDelegatedRouting(cfg, dnsCache) if err != nil { - return nil, nil, nil, err + return nil, nil, nil, nil, err } - dhtRouter, err := setupDHTRouting(ctx, cfg, h, ds, dhtRcMgr, bwc) + dhtRouter, dhtHost, err := setupDHTRouting(ctx, cfg, h, ds, dhtRcMgr, bwc) if err != nil { - return nil, nil, nil, err + return nil, nil, nil, nil, err } router := setupCompositeRouting(delegatedRouters, dhtRouter, cfg) @@ -277,7 +277,7 @@ func setupRouting(ctx context.Context, cfg Config, h host.Host, ds datastore.Bat if len(cfg.RemoteBackends) > 0 && cfg.RemoteBackendsIPNS { remoteValueStore, err := gateway.NewRemoteValueStore(cfg.RemoteBackends, nil) if err != nil { - return nil, nil, nil, err + return nil, nil, nil, nil, err } vs = setupCompositeRouting(append(delegatedRouters, &routinghelpers.Compose{ ValueStore: remoteValueStore, @@ -291,7 +291,7 @@ func setupRouting(ctx context.Context, cfg Config, h host.Host, ds datastore.Bat // Parse bootstrap peers for seed peering DHT (don't warn on auto since it's expected) seedBootstrapPeers, err := parseBootstrapPeers(cfg.Bootstrap, false) if err != nil { - return nil, nil, nil, err + return nil, nil, nil, nil, err } // Use provided bootstrap peers or fall back to defaults @@ -308,11 +308,11 @@ func setupRouting(ctx context.Context, cfg Config, h host.Host, ds datastore.Bat pr, err = dht.New(ctx, h, dhtOpts...) if err != nil { - return nil, nil, nil, err + return nil, nil, nil, nil, err } } - return cr, pr, vs, nil + return cr, pr, vs, dhtHost, nil } func setupRoutingNoLibp2p(cfg Config, dnsCache *cachedDNS) (routing.ValueStore, error) {