diff --git a/CHANGELOG.md b/CHANGELOG.md index 3206f006e..f55e94a2b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,6 +18,7 @@ The following emojis are used to highlight certain changes: - 🛠 `pinning/pinner`: added `Pinner.Close() error`. Close cancels every in-flight operation's context, including streaming goroutines from `RecursiveKeys`, `DirectKeys`, and `InternalPins`, and waits for them to return. A scalar method that observes the cancellation may return `context.Canceled`; a stream interrupted by Close may surface `ErrClosed` on the channel before it closes. After Close returns, every other method returns the new `ErrClosed` sentinel; streaming methods deliver it as `StreamedPin.Err` on a single entry, then close the channel. Close is idempotent and goroutine-safe. **Action required:** downstream `Pinner` implementations must add `Close`. [#1150](https://github.com/ipfs/boxo/pull/1150) - `pinning/pinner/dspinner`: implements `Close`. Close cancels the contexts of in-flight operations, so snapshot iteration in `RecursiveKeys`/`DirectKeys` and DAG fetches in `Pin` bail out promptly instead of draining to completion. Close returns as soon as those operations honor their ctx. Hosts owning the datastore should call `Close` on the pinner before closing the datastore to avoid the use-after-close panic path in stores such as pebble. [#1150](https://github.com/ipfs/boxo/pull/1150) +- `routing/providerquerymanager`: new `WithFindPeerFallback` option. When set, a one-shot `FindPeer` fallback runs if the first dial to a provider fails; the manager retries the dial with the fresh `AddrInfo`, but only if `FindPeer` surfaced at least one address that wasn't already in the routing-record set just tried. Rescues providers whose routing-record snapshot is thin or stale but whose actual addresses are still reachable, without wasting a retry on a duplicate address set. Disabled by default; pass `WithFindPeerFallback(myDHT)` to enable. ### Changed - upgrade to `go-libp2p-kad-dht` [v0.39.2](https://github.com/libp2p/go-libp2p-kad-dht/releases/tag/v0.39.2) diff --git a/routing/providerquerymanager/providerquerymanager.go b/routing/providerquerymanager/providerquerymanager.go index cd8ed8786..5a82ac5d7 100644 --- a/routing/providerquerymanager/providerquerymanager.go +++ b/routing/providerquerymanager/providerquerymanager.go @@ -13,6 +13,7 @@ import ( peer "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/routing" swarm "github.com/libp2p/go-libp2p/p2p/net/swarm" + ma "github.com/multiformats/go-multiaddr" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" @@ -50,6 +51,16 @@ type ProviderQueryDialer interface { Connect(context.Context, peer.AddrInfo) error } +// ProviderQueryPeerRouter is the subset of routing.PeerRouting used by the +// FindPeer fallback configured with WithFindPeerFallback: when a dial to a +// provider fails, the manager asks for fresh addresses for that peer and +// retries the dial once, but only if FindPeer returned at least one +// address that wasn't already in the routing-record AddrInfo. Typically +// satisfied by a kademlia DHT client. +type ProviderQueryPeerRouter interface { + FindPeer(context.Context, peer.ID) (peer.AddrInfo, error) +} + type providerQueryMessage interface { debugMessage() handle(pqm *ProviderQueryManager) @@ -90,6 +101,7 @@ type ProviderQueryManager struct { closing chan struct{} dialer ProviderQueryDialer router routing.ContentDiscovery + peerRouting ProviderQueryPeerRouter providerQueryMessages chan providerQueryMessage providerRequestsProcessing *chanqueue.ChanQueue[*findProviderRequest] @@ -145,6 +157,23 @@ func WithIgnoreProviders(peers ...peer.ID) Option { } } +// WithFindPeerFallback enables a one-shot FindPeer fallback: when the first +// dial to a provider fails, the manager asks the supplied peer router for +// fresh addresses and retries the dial once, but only if FindPeer returned +// at least one address that wasn't already in the routing-record AddrInfo +// just attempted. This rescues providers whose routing-record snapshot is +// thin or stale but whose actual addresses can still be reached, without +// wasting a retry when the peer router would only hand back the same set. +// +// Pass nil (or omit the option) to disable. Typically wired with a DHT +// client, e.g. WithFindPeerFallback(myDHT). +func WithFindPeerFallback(pr ProviderQueryPeerRouter) Option { + return func(mgr *ProviderQueryManager) error { + mgr.peerRouting = pr + return nil + } +} + // New initializes a new ProviderQueryManager for a given context and a given // network provider. func New(dialer ProviderQueryDialer, router routing.ContentDiscovery, opts ...Option) (*ProviderQueryManager, error) { @@ -392,6 +421,19 @@ func (pqm *ProviderQueryManager) findProviderWorker() { } err := pqm.dialer.Connect(findProviderCtx, p) + if err != nil && err != swarm.ErrDialToSelf && pqm.peerRouting != nil { + // One-shot rescue: ask the peer router for a fresh + // AddrInfo and retry once, but only if FindPeer + // surfaced at least one address that wasn't in the + // routing-record set we just tried. Retrying with + // the same addresses would just burn another dial + // round for nothing. + refreshed, ferr := pqm.peerRouting.FindPeer(findProviderCtx, p.ID) + if ferr == nil && hasNewAddrs(refreshed.Addrs, p.Addrs) { + span.AddEvent("RetryAfterFindPeer", trace.WithAttributes(attribute.Stringer("peer", p.ID))) + err = pqm.dialer.Connect(findProviderCtx, refreshed) + } + } if err != nil && err != swarm.ErrDialToSelf { span.RecordError(err, trace.WithAttributes(attribute.Stringer("peer", p.ID))) log.Debugf("failed to connect to provider %s: %s", p.ID, err) @@ -437,6 +479,25 @@ func (pqm *ProviderQueryManager) cleanupInProcessRequests() { } } +// hasNewAddrs reports whether candidate contains at least one multiaddr +// that isn't already in existing. Used to decide whether a FindPeer +// fallback returned anything worth retrying a dial with. +func hasNewAddrs(candidate, existing []ma.Multiaddr) bool { + if len(candidate) == 0 { + return false + } + seen := make(map[string]struct{}, len(existing)) + for _, a := range existing { + seen[a.String()] = struct{}{} + } + for _, a := range candidate { + if _, ok := seen[a.String()]; !ok { + return true + } + } + return false +} + func (pqm *ProviderQueryManager) run() { defer pqm.cleanupInProcessRequests() diff --git a/routing/providerquerymanager/providerquerymanager_test.go b/routing/providerquerymanager/providerquerymanager_test.go index c2fd17f41..8987998e0 100644 --- a/routing/providerquerymanager/providerquerymanager_test.go +++ b/routing/providerquerymanager/providerquerymanager_test.go @@ -11,6 +11,7 @@ import ( cid "github.com/ipfs/go-cid" "github.com/ipfs/go-test/random" "github.com/libp2p/go-libp2p/core/peer" + ma "github.com/multiformats/go-multiaddr" ) type fakeProviderDialer struct { @@ -459,3 +460,233 @@ func TestIgnorePeers(t *testing.T) { t.Fatal("did not ignore 4 of the peers") } } + +// callCountingDialer fails the first Connect call for each target peer and +// succeeds on subsequent ones. Used to exercise the WithFindPeerFallback retry. +type callCountingDialer struct { + mu sync.Mutex + callsByID map[peer.ID]int +} + +func (d *callCountingDialer) Connect(_ context.Context, p peer.AddrInfo) error { + d.mu.Lock() + defer d.mu.Unlock() + if d.callsByID == nil { + d.callsByID = map[peer.ID]int{} + } + d.callsByID[p.ID]++ + if d.callsByID[p.ID] == 1 { + return errors.New("first attempt fails") + } + return nil +} + +func (d *callCountingDialer) callCount(id peer.ID) int { + d.mu.Lock() + defer d.mu.Unlock() + return d.callsByID[id] +} + +// fakePeerRouter satisfies ProviderQueryPeerRouter for tests. If findErr is +// non-nil it is returned and findResult is ignored; otherwise findResult is +// returned (use Addrs: nil to simulate "router knows the peer but has no +// addresses"). +type fakePeerRouter struct { + mu sync.Mutex + findResult peer.AddrInfo + findErr error + calls int +} + +func (r *fakePeerRouter) FindPeer(_ context.Context, _ peer.ID) (peer.AddrInfo, error) { + r.mu.Lock() + defer r.mu.Unlock() + r.calls++ + if r.findErr != nil { + return peer.AddrInfo{}, r.findErr + } + return r.findResult, nil +} + +func (r *fakePeerRouter) callCount() int { + r.mu.Lock() + defer r.mu.Unlock() + return r.calls +} + +// fakeProviderDiscoveryWithAddrs surfaces every peer in peersFound with the +// given addrs attached. Lets tests control what AddrInfo the manager hands +// the dialer on the initial attempt, which matters for the +// has-new-addresses check in WithFindPeerFallback. +type fakeProviderDiscoveryWithAddrs struct { + peersFound []peer.ID + addrs []ma.Multiaddr +} + +func (f *fakeProviderDiscoveryWithAddrs) FindProvidersAsync(ctx context.Context, _ cid.Cid, _ int) <-chan peer.AddrInfo { + out := make(chan peer.AddrInfo) + go func() { + defer close(out) + for _, p := range f.peersFound { + select { + case <-ctx.Done(): + return + case out <- peer.AddrInfo{ID: p, Addrs: append([]ma.Multiaddr(nil), f.addrs...)}: + } + } + }() + return out +} + +func collect(ch <-chan peer.AddrInfo) []peer.AddrInfo { + var out []peer.AddrInfo + for ai := range ch { + out = append(out, ai) + } + return out +} + +func TestFindPeerFallbackRescuesFailedDial(t *testing.T) { + peers := random.Peers(1) + p := peers[0] + freshAddr := ma.StringCast("/ip4/198.51.100.7/tcp/4001") + + dialer := &callCountingDialer{} + discovery := &fakeProviderDiscovery{peersFound: peers} + router := &fakePeerRouter{ + findResult: peer.AddrInfo{ID: p, Addrs: []ma.Multiaddr{freshAddr}}, + } + + pqm := mustNotErr(New(dialer, discovery, WithFindPeerFallback(router))) + defer pqm.Close() + + sessionCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + received := collect(pqm.FindProvidersAsync(sessionCtx, random.Cids(1)[0], 0)) + + if len(received) != 1 { + t.Fatalf("expected 1 provider after retry, got %d", len(received)) + } + if c := dialer.callCount(p); c != 2 { + t.Errorf("expected 2 Connect calls for %s (initial + retry), got %d", p, c) + } + if c := router.callCount(); c != 1 { + t.Errorf("expected exactly 1 FindPeer call, got %d", c) + } +} + +func TestFindPeerFallbackSkippedWhenNoAddrs(t *testing.T) { + peers := random.Peers(1) + p := peers[0] + + dialer := &callCountingDialer{} + discovery := &fakeProviderDiscovery{peersFound: peers} + router := &fakePeerRouter{ + findResult: peer.AddrInfo{ID: p, Addrs: nil}, + } + + pqm := mustNotErr(New(dialer, discovery, WithFindPeerFallback(router))) + defer pqm.Close() + + sessionCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + received := collect(pqm.FindProvidersAsync(sessionCtx, random.Cids(1)[0], 0)) + + if len(received) != 0 { + t.Fatalf("expected 0 providers, got %d", len(received)) + } + if c := dialer.callCount(p); c != 1 { + t.Errorf("expected exactly 1 Connect call (no retry), got %d", c) + } + if c := router.callCount(); c != 1 { + t.Errorf("expected exactly 1 FindPeer call, got %d", c) + } +} + +func TestFindPeerFallbackSkippedWhenErrors(t *testing.T) { + peers := random.Peers(1) + p := peers[0] + + dialer := &callCountingDialer{} + discovery := &fakeProviderDiscovery{peersFound: peers} + router := &fakePeerRouter{findErr: errors.New("routing: not found")} + + pqm := mustNotErr(New(dialer, discovery, WithFindPeerFallback(router))) + defer pqm.Close() + + sessionCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + received := collect(pqm.FindProvidersAsync(sessionCtx, random.Cids(1)[0], 0)) + + if len(received) != 0 { + t.Fatalf("expected 0 providers, got %d", len(received)) + } + if c := dialer.callCount(p); c != 1 { + t.Errorf("expected exactly 1 Connect call (no retry), got %d", c) + } +} + +// TestFindPeerFallbackSkippedWhenNoNewAddrs verifies the +// hasNewAddrs gate: if FindPeer returns only addresses that were already in +// the routing-record AddrInfo just tried, we don't retry (it would just +// dial the same broken set again). +func TestFindPeerFallbackSkippedWhenNoNewAddrs(t *testing.T) { + peers := random.Peers(1) + p := peers[0] + knownAddr := ma.StringCast("/ip4/198.51.100.7/tcp/4001") + + dialer := &callCountingDialer{} + // Provider record already carries knownAddr. + discovery := &fakeProviderDiscoveryWithAddrs{ + peersFound: peers, + addrs: []ma.Multiaddr{knownAddr}, + } + // FindPeer returns the same addr (plus nothing new). Retry must be skipped. + router := &fakePeerRouter{ + findResult: peer.AddrInfo{ID: p, Addrs: []ma.Multiaddr{knownAddr}}, + } + + pqm := mustNotErr(New(dialer, discovery, WithFindPeerFallback(router))) + defer pqm.Close() + + sessionCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + received := collect(pqm.FindProvidersAsync(sessionCtx, random.Cids(1)[0], 0)) + + if len(received) != 0 { + t.Fatalf("expected 0 providers (retry skipped, dial stays failed), got %d", len(received)) + } + if c := dialer.callCount(p); c != 1 { + t.Errorf("expected exactly 1 Connect call (no retry on duplicate addrs), got %d", c) + } + if c := router.callCount(); c != 1 { + t.Errorf("expected exactly 1 FindPeer call, got %d", c) + } +} + +func TestHasNewAddrs(t *testing.T) { + a := ma.StringCast("/ip4/198.51.100.1/tcp/4001") + b := ma.StringCast("/ip4/198.51.100.2/tcp/4001") + c := ma.StringCast("/ip4/198.51.100.3/tcp/4001") + + cases := []struct { + name string + candidate []ma.Multiaddr + existing []ma.Multiaddr + want bool + }{ + {name: "empty candidate", candidate: nil, existing: []ma.Multiaddr{a}, want: false}, + {name: "empty existing", candidate: []ma.Multiaddr{a}, existing: nil, want: true}, + {name: "candidate is subset", candidate: []ma.Multiaddr{a}, existing: []ma.Multiaddr{a, b}, want: false}, + {name: "identical sets", candidate: []ma.Multiaddr{a, b}, existing: []ma.Multiaddr{a, b}, want: false}, + {name: "one new addr", candidate: []ma.Multiaddr{a, c}, existing: []ma.Multiaddr{a, b}, want: true}, + {name: "all new addrs", candidate: []ma.Multiaddr{c}, existing: []ma.Multiaddr{a, b}, want: true}, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + if got := hasNewAddrs(tc.candidate, tc.existing); got != tc.want { + t.Errorf("hasNewAddrs() = %v, want %v", got, tc.want) + } + }) + } +}