Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ The following emojis are used to highlight certain changes:

- 🛠 `pinning/pinner`: added `Pinner.Close() error`. Close cancels every in-flight operation's context, including streaming goroutines from `RecursiveKeys`, `DirectKeys`, and `InternalPins`, and waits for them to return. A scalar method that observes the cancellation may return `context.Canceled`; a stream interrupted by Close may surface `ErrClosed` on the channel before it closes. After Close returns, every other method returns the new `ErrClosed` sentinel; streaming methods deliver it as `StreamedPin.Err` on a single entry, then close the channel. Close is idempotent and goroutine-safe. **Action required:** downstream `Pinner` implementations must add `Close`. [#1150](https://github.com/ipfs/boxo/pull/1150)
- `pinning/pinner/dspinner`: implements `Close`. Close cancels the contexts of in-flight operations, so snapshot iteration in `RecursiveKeys`/`DirectKeys` and DAG fetches in `Pin` bail out promptly instead of draining to completion. Close returns as soon as those operations honor their ctx. Hosts owning the datastore should call `Close` on the pinner before closing the datastore to avoid the use-after-close panic path in stores such as pebble. [#1150](https://github.com/ipfs/boxo/pull/1150)
- `routing/providerquerymanager`: new `WithFindPeerFallback` option. When set, a one-shot `FindPeer` fallback runs if the first dial to a provider fails; the manager retries the dial with the fresh `AddrInfo`, but only if `FindPeer` surfaced at least one address that wasn't already in the routing-record set just tried. Rescues providers whose routing-record snapshot is thin or stale but whose actual addresses are still reachable, without wasting a retry on a duplicate address set. Disabled by default; pass `WithFindPeerFallback(myDHT)` to enable.

### Changed
- upgrade to `go-libp2p-kad-dht` [v0.39.2](https://github.com/libp2p/go-libp2p-kad-dht/releases/tag/v0.39.2)
Expand Down
61 changes: 61 additions & 0 deletions routing/providerquerymanager/providerquerymanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
peer "github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/routing"
swarm "github.com/libp2p/go-libp2p/p2p/net/swarm"
ma "github.com/multiformats/go-multiaddr"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
Expand Down Expand Up @@ -50,6 +51,16 @@ type ProviderQueryDialer interface {
Connect(context.Context, peer.AddrInfo) error
}

// ProviderQueryPeerRouter looks up the current addresses of a peer. It is
// the single method of routing.PeerRouting that the FindPeer fallback
// (enabled through WithFindPeerFallback) relies on.
//
// When a dial to a provider fails, the manager calls FindPeer for that
// provider and redials once, provided the result contains at least one
// address that was not part of the routing-record AddrInfo it just tried.
// A kademlia DHT client typically satisfies this interface.
type ProviderQueryPeerRouter interface {
	// FindPeer returns the AddrInfo for the peer identified by id, or an
	// error if the lookup fails.
	FindPeer(ctx context.Context, id peer.ID) (peer.AddrInfo, error)
}

type providerQueryMessage interface {
debugMessage()
handle(pqm *ProviderQueryManager)
Expand Down Expand Up @@ -90,6 +101,7 @@ type ProviderQueryManager struct {
closing chan struct{}
dialer ProviderQueryDialer
router routing.ContentDiscovery
peerRouting ProviderQueryPeerRouter
providerQueryMessages chan providerQueryMessage
providerRequestsProcessing *chanqueue.ChanQueue[*findProviderRequest]

Expand Down Expand Up @@ -145,6 +157,23 @@ func WithIgnoreProviders(peers ...peer.ID) Option {
}
}

// WithFindPeerFallback returns an Option that installs pr as the peer router
// used for a one-shot FindPeer fallback.
//
// With the fallback installed, a failed first dial to a provider makes the
// manager ask pr for fresh addresses and redial once. The redial only
// happens when FindPeer returned at least one address that was not in the
// routing-record AddrInfo just attempted. This helps with providers whose
// routing-record snapshot is thin or stale while their real addresses are
// still reachable, and avoids spending a retry on an identical address set.
//
// Passing nil, or not using the option at all, leaves the fallback disabled.
// Note that the manager tests the stored interface value against nil, so a
// typed nil pointer wrapped in the interface is not treated as "disabled".
//
// The usual argument is a DHT client, e.g. WithFindPeerFallback(myDHT).
func WithFindPeerFallback(pr ProviderQueryPeerRouter) Option {
	install := func(m *ProviderQueryManager) error {
		m.peerRouting = pr
		return nil
	}
	return install
}

// New initializes a new ProviderQueryManager for a given context and a given
// network provider.
func New(dialer ProviderQueryDialer, router routing.ContentDiscovery, opts ...Option) (*ProviderQueryManager, error) {
Expand Down Expand Up @@ -392,6 +421,19 @@ func (pqm *ProviderQueryManager) findProviderWorker() {
}

err := pqm.dialer.Connect(findProviderCtx, p)
if err != nil && err != swarm.ErrDialToSelf && pqm.peerRouting != nil {
// One-shot rescue: ask the peer router for a fresh
// AddrInfo and retry once, but only if FindPeer
// surfaced at least one address that wasn't in the
// routing-record set we just tried. Retrying with
// the same addresses would just burn another dial
// round for nothing.
refreshed, ferr := pqm.peerRouting.FindPeer(findProviderCtx, p.ID)
if ferr == nil && hasNewAddrs(refreshed.Addrs, p.Addrs) {
span.AddEvent("RetryAfterFindPeer", trace.WithAttributes(attribute.Stringer("peer", p.ID)))
err = pqm.dialer.Connect(findProviderCtx, refreshed)
}
}
if err != nil && err != swarm.ErrDialToSelf {
span.RecordError(err, trace.WithAttributes(attribute.Stringer("peer", p.ID)))
log.Debugf("failed to connect to provider %s: %s", p.ID, err)
Expand Down Expand Up @@ -437,6 +479,25 @@ func (pqm *ProviderQueryManager) cleanupInProcessRequests() {
}
}

// hasNewAddrs tells whether candidate holds at least one multiaddr that is
// absent from existing. Addresses are compared by their string form.
//
// It is the gate for the FindPeer fallback: a dial retry is only worthwhile
// when the lookup produced something that has not been tried yet. An empty
// candidate list therefore always yields false.
func hasNewAddrs(candidate, existing []ma.Multiaddr) bool {
	if len(candidate) == 0 {
		return false
	}

	// Index the already-tried addresses for constant-time membership tests.
	known := make(map[string]struct{}, len(existing))
	for i := range existing {
		known[existing[i].String()] = struct{}{}
	}

	for i := range candidate {
		if _, dup := known[candidate[i].String()]; dup {
			continue
		}
		// First unseen address is enough; no need to look further.
		return true
	}
	return false
}

func (pqm *ProviderQueryManager) run() {
defer pqm.cleanupInProcessRequests()

Expand Down
231 changes: 231 additions & 0 deletions routing/providerquerymanager/providerquerymanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
cid "github.com/ipfs/go-cid"
"github.com/ipfs/go-test/random"
"github.com/libp2p/go-libp2p/core/peer"
ma "github.com/multiformats/go-multiaddr"
)

type fakeProviderDialer struct {
Expand Down Expand Up @@ -459,3 +460,233 @@ func TestIgnorePeers(t *testing.T) {
t.Fatal("did not ignore 4 of the peers")
}
}

// callCountingDialer is a test dialer that records how many times Connect
// was invoked per peer. The first Connect for a given peer ID returns an
// error; every later Connect for that peer succeeds. This is what lets the
// tests observe whether the WithFindPeerFallback retry took place.
//
// The zero value is ready to use; the counter map is created lazily.
type callCountingDialer struct {
	mu        sync.Mutex
	callsByID map[peer.ID]int
}

// Connect bumps the call counter for p.ID and fails only on the first
// attempt for that peer. The context is ignored.
func (d *callCountingDialer) Connect(_ context.Context, p peer.AddrInfo) error {
	d.mu.Lock()
	defer d.mu.Unlock()

	if d.callsByID == nil {
		d.callsByID = make(map[peer.ID]int)
	}

	attempt := d.callsByID[p.ID] + 1
	d.callsByID[p.ID] = attempt
	if attempt > 1 {
		return nil
	}
	return errors.New("first attempt fails")
}

// callCount returns the number of Connect calls seen so far for id.
// Reading from the still-nil map is safe and yields zero.
func (d *callCountingDialer) callCount(id peer.ID) int {
	d.mu.Lock()
	n := d.callsByID[id]
	d.mu.Unlock()
	return n
}

// fakePeerRouter is a canned ProviderQueryPeerRouter for tests. Every
// FindPeer call is counted. When findErr is set, FindPeer returns it together
// with an empty AddrInfo and findResult is not consulted; otherwise FindPeer
// returns findResult. Setting findResult.Addrs to nil models a router that
// knows the peer but has no addresses for it.
type fakePeerRouter struct {
	mu         sync.Mutex
	findResult peer.AddrInfo
	findErr    error
	calls      int
}

// FindPeer records the call and returns the configured error or result.
// Both the context and the requested peer ID are ignored.
func (r *fakePeerRouter) FindPeer(_ context.Context, _ peer.ID) (peer.AddrInfo, error) {
	r.mu.Lock()
	defer r.mu.Unlock()

	r.calls++
	if r.findErr == nil {
		return r.findResult, nil
	}
	return peer.AddrInfo{}, r.findErr
}

// callCount returns how many times FindPeer has been called.
func (r *fakePeerRouter) callCount() int {
	r.mu.Lock()
	n := r.calls
	r.mu.Unlock()
	return n
}

// fakeProviderDiscoveryWithAddrs is a content-discovery fake that reports
// each peer in peersFound as a provider carrying a copy of addrs. Tests use
// it to decide which AddrInfo the manager passes to the dialer on the first
// attempt, which in turn drives the has-new-addresses check of
// WithFindPeerFallback.
type fakeProviderDiscoveryWithAddrs struct {
	peersFound []peer.ID
	addrs      []ma.Multiaddr
}

// FindProvidersAsync streams one AddrInfo per configured peer on an
// unbuffered channel and closes the channel when all peers were sent or ctx
// is done, whichever comes first. The CID and the count are ignored.
func (f *fakeProviderDiscoveryWithAddrs) FindProvidersAsync(ctx context.Context, _ cid.Cid, _ int) <-chan peer.AddrInfo {
	results := make(chan peer.AddrInfo)
	go func() {
		defer close(results)
		for _, id := range f.peersFound {
			// Each provider gets its own copy of the address slice so
			// receivers cannot alias the fixture's backing array.
			info := peer.AddrInfo{ID: id, Addrs: append([]ma.Multiaddr(nil), f.addrs...)}
			select {
			case results <- info:
			case <-ctx.Done():
				return
			}
		}
	}()
	return results
}

// collect drains ch until it is closed and returns everything received, in
// arrival order. The result is nil when the channel closes without yielding
// any value.
func collect(ch <-chan peer.AddrInfo) []peer.AddrInfo {
	var got []peer.AddrInfo
	for {
		info, open := <-ch
		if !open {
			return got
		}
		got = append(got, info)
	}
}

// TestFindPeerFallbackRescuesFailedDial covers the successful rescue path:
// the first dial fails, FindPeer returns an address, the manager redials and
// the provider is delivered. It expects exactly two Connect calls and one
// FindPeer call.
func TestFindPeerFallbackRescuesFailedDial(t *testing.T) {
	provs := random.Peers(1)
	target := provs[0]

	dialer := &callCountingDialer{}
	router := &fakePeerRouter{
		findResult: peer.AddrInfo{
			ID:    target,
			Addrs: []ma.Multiaddr{ma.StringCast("/ip4/198.51.100.7/tcp/4001")},
		},
	}

	pqm := mustNotErr(New(dialer, &fakeProviderDiscovery{peersFound: provs}, WithFindPeerFallback(router)))
	defer pqm.Close()

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	got := collect(pqm.FindProvidersAsync(ctx, random.Cids(1)[0], 0))
	if n := len(got); n != 1 {
		t.Fatalf("expected 1 provider after retry, got %d", n)
	}
	if n := dialer.callCount(target); n != 2 {
		t.Errorf("expected 2 Connect calls for %s (initial + retry), got %d", target, n)
	}
	if n := router.callCount(); n != 1 {
		t.Errorf("expected exactly 1 FindPeer call, got %d", n)
	}
}

// TestFindPeerFallbackSkippedWhenNoAddrs checks that a FindPeer result
// without any addresses does not trigger a redial: the router is consulted
// once, the dialer is called once, and no provider is delivered.
func TestFindPeerFallbackSkippedWhenNoAddrs(t *testing.T) {
	provs := random.Peers(1)
	target := provs[0]

	dialer := &callCountingDialer{}
	// The router knows the peer but has nothing to dial.
	router := &fakePeerRouter{findResult: peer.AddrInfo{ID: target, Addrs: nil}}

	pqm := mustNotErr(New(dialer, &fakeProviderDiscovery{peersFound: provs}, WithFindPeerFallback(router)))
	defer pqm.Close()

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	got := collect(pqm.FindProvidersAsync(ctx, random.Cids(1)[0], 0))
	if n := len(got); n != 0 {
		t.Fatalf("expected 0 providers, got %d", n)
	}
	if n := dialer.callCount(target); n != 1 {
		t.Errorf("expected exactly 1 Connect call (no retry), got %d", n)
	}
	if n := router.callCount(); n != 1 {
		t.Errorf("expected exactly 1 FindPeer call, got %d", n)
	}
}

// TestFindPeerFallbackSkippedWhenErrors checks that an error from FindPeer
// suppresses the redial: no provider is delivered and the dialer is called
// exactly once.
//
// It also asserts that FindPeer was actually invoked. Without that check the
// test would pass even if the fallback were never wired in, because
// callCountingDialer always fails the first attempt and a manager without a
// fallback would produce the same "0 providers, 1 Connect" outcome.
func TestFindPeerFallbackSkippedWhenErrors(t *testing.T) {
	peers := random.Peers(1)
	p := peers[0]

	dialer := &callCountingDialer{}
	discovery := &fakeProviderDiscovery{peersFound: peers}
	router := &fakePeerRouter{findErr: errors.New("routing: not found")}

	pqm := mustNotErr(New(dialer, discovery, WithFindPeerFallback(router)))
	defer pqm.Close()

	sessionCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	received := collect(pqm.FindProvidersAsync(sessionCtx, random.Cids(1)[0], 0))

	if len(received) != 0 {
		t.Fatalf("expected 0 providers, got %d", len(received))
	}
	if c := dialer.callCount(p); c != 1 {
		t.Errorf("expected exactly 1 Connect call (no retry), got %d", c)
	}
	// Proves the fallback path ran and it was the FindPeer error, not a
	// missing fallback, that prevented the retry.
	if c := router.callCount(); c != 1 {
		t.Errorf("expected exactly 1 FindPeer call, got %d", c)
	}
}

// TestFindPeerFallbackSkippedWhenNoNewAddrs exercises the hasNewAddrs gate.
// The provider record and the FindPeer result carry the very same address,
// so a retry would only redial the set that just failed. The test expects
// one FindPeer call, one Connect call and no delivered provider.
func TestFindPeerFallbackSkippedWhenNoNewAddrs(t *testing.T) {
	provs := random.Peers(1)
	target := provs[0]
	sameAddr := ma.StringCast("/ip4/198.51.100.7/tcp/4001")

	dialer := &callCountingDialer{}
	// The routing record already advertises sameAddr ...
	discovery := &fakeProviderDiscoveryWithAddrs{
		peersFound: provs,
		addrs:      []ma.Multiaddr{sameAddr},
	}
	// ... and FindPeer has nothing beyond it, so the retry must be skipped.
	router := &fakePeerRouter{
		findResult: peer.AddrInfo{ID: target, Addrs: []ma.Multiaddr{sameAddr}},
	}

	pqm := mustNotErr(New(dialer, discovery, WithFindPeerFallback(router)))
	defer pqm.Close()

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	got := collect(pqm.FindProvidersAsync(ctx, random.Cids(1)[0], 0))
	if n := len(got); n != 0 {
		t.Fatalf("expected 0 providers (retry skipped, dial stays failed), got %d", n)
	}
	if n := dialer.callCount(target); n != 1 {
		t.Errorf("expected exactly 1 Connect call (no retry on duplicate addrs), got %d", n)
	}
	if n := router.callCount(); n != 1 {
		t.Errorf("expected exactly 1 FindPeer call, got %d", n)
	}
}

// TestHasNewAddrs is a table-driven check of hasNewAddrs over empty inputs,
// subsets, identical sets and candidates that add one or more addresses.
func TestHasNewAddrs(t *testing.T) {
	var (
		addr1 = ma.StringCast("/ip4/198.51.100.1/tcp/4001")
		addr2 = ma.StringCast("/ip4/198.51.100.2/tcp/4001")
		addr3 = ma.StringCast("/ip4/198.51.100.3/tcp/4001")
	)

	type testCase struct {
		name      string
		candidate []ma.Multiaddr
		existing  []ma.Multiaddr
		want      bool
	}
	table := []testCase{
		{name: "empty candidate", candidate: nil, existing: []ma.Multiaddr{addr1}, want: false},
		{name: "empty existing", candidate: []ma.Multiaddr{addr1}, existing: nil, want: true},
		{name: "candidate is subset", candidate: []ma.Multiaddr{addr1}, existing: []ma.Multiaddr{addr1, addr2}, want: false},
		{name: "identical sets", candidate: []ma.Multiaddr{addr1, addr2}, existing: []ma.Multiaddr{addr1, addr2}, want: false},
		{name: "one new addr", candidate: []ma.Multiaddr{addr1, addr3}, existing: []ma.Multiaddr{addr1, addr2}, want: true},
		{name: "all new addrs", candidate: []ma.Multiaddr{addr3}, existing: []ma.Multiaddr{addr1, addr2}, want: true},
	}

	for i := range table {
		tc := table[i]
		t.Run(tc.name, func(t *testing.T) {
			got := hasNewAddrs(tc.candidate, tc.existing)
			if got != tc.want {
				t.Errorf("hasNewAddrs() = %v, want %v", got, tc.want)
			}
		})
	}
}
Loading