retrieval

package
v0.40.0
Warning

This package is not in the latest version of its module.

Published: May 26, 2026 License: Apache-2.0, MIT Imports: 9 Imported by: 0

Documentation

Overview

Package retrieval tracks the state of an IPFS content retrieval as it moves through path resolution, provider discovery, connection, and data transfer. State lives on the request context and is updated by boxo's path resolver, provider query manager, and gateway middleware as the retrieval progresses.

Typical consumers:

  • boxo/gateway wraps timeout errors with the State (see WrapWithState) so 504 responses include which phase was active and how many providers were found.

  • CLI tools like Kubo can mirror a daemon's State into a local one via the State.Snapshot / State.Apply / State.Notify pub/sub interface to drive a live progress bar during commands like cat, get, or dag export.

Attach with ContextWithState; read with StateFromContext.


Constants

const ContextKey contextKey = "boxo-retrieval-state"

ContextKey is the key used to store State in a context.Context. This can be used directly with context.WithValue if needed, though the ContextWithState and StateFromContext functions are preferred.

const MaxProvidersSampleSize = 3

MaxProvidersSampleSize limits the number of provider peer IDs (both found and failed) that are kept as a sample for diagnostic purposes. This prevents unbounded memory growth while still providing useful debugging information.

Variables

This section is empty.

Functions

func WrapWithState

func WrapWithState(ctx context.Context, err error) error

WrapWithState wraps an error with retrieval state from the context. It returns an *ErrorWithState that preserves the state for custom handling.

The error is ALWAYS wrapped if retrieval state exists in the context, because even "no providers found" is meaningful diagnostic information. If the error is already an *ErrorWithState, it returns it unchanged to avoid double-wrapping.

Example usage in a gateway or IPFS implementation:

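// blockGetter is any block source with a GetBlock method, such as a
// boxo BlockService.
type blockGetter interface {
    GetBlock(ctx context.Context, c cid.Cid) (blocks.Block, error)
}

func fetchBlock(ctx context.Context, bs blockGetter, c cid.Cid) (blocks.Block, error) {
    block, err := bs.GetBlock(ctx, c)
    if err != nil {
        // Wrap the error with retrieval diagnostics if the context carries a State
        return nil, retrieval.WrapWithState(ctx, err)
    }
    return block, nil
}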

Callers can then extract the state for custom handling:

var errWithState *retrieval.ErrorWithState
if errors.As(err, &errWithState) {
    // State may be nil, so check it before reading counters
    if state := errWithState.State(); state != nil && state.ProvidersFound.Load() == 0 {
        // Handle "content not in network" case specially
    }
}

Types

type ErrorWithState

type ErrorWithState struct {
	// contains filtered or unexported fields
}

ErrorWithState wraps an error with retrieval state information. It preserves the retrieval diagnostics for programmatic access while providing human-readable error messages.

The zero value is not useful; use WrapWithState to create instances.

func (*ErrorWithState) Error

func (e *ErrorWithState) Error() string

Error returns the error message with retrieval diagnostics appended, in the format "original error: retrieval: diagnostic summary".

If the wrapped error is nil, Error returns a generic message. If the state is nil, it returns only the underlying error message.

func (*ErrorWithState) State

func (e *ErrorWithState) State() *State

State returns the retrieval state associated with this error. This allows callers to access detailed diagnostics for custom handling.

func (*ErrorWithState) Unwrap

func (e *ErrorWithState) Unwrap() error

Unwrap returns the wrapped error, allowing errors.Is and errors.As to work with the underlying error.

type RetrievalPhase

type RetrievalPhase int

RetrievalPhase represents the current phase of content retrieval. Phases progress monotonically: they can only move forward, never backward. This helps identify where in the retrieval process a timeout or failure occurred.

const (
	// PhaseInitializing indicates the retrieval process has not yet started.
	PhaseInitializing RetrievalPhase = iota
	// PhasePathResolution indicates the system is resolving an IPFS path to determine
	// what content needs to be fetched (e.g., /ipfs/Qm.../path/to/file).
	PhasePathResolution
	// PhaseProviderDiscovery indicates the system is finding peers that have the content.
	PhaseProviderDiscovery
	// PhaseConnecting indicates the system is establishing connections to providers.
	PhaseConnecting
	// PhaseDataRetrieval indicates the system is transferring data to the client.
	PhaseDataRetrieval
)

func (RetrievalPhase) String

func (p RetrievalPhase) String() string

String returns a human-readable name for the retrieval phase, used in error messages and log output. JSON encoding of phases (in Snapshot) uses the underlying int.

type Snapshot added in v0.40.0

type Snapshot struct {
	Phase              RetrievalPhase
	ProvidersFound     int32
	ProvidersAttempted int32
	ProvidersConnected int32
	FoundProviders     []peer.ID
	FailedProviders    []peer.ID
	RootCID            cid.Cid
	TerminalCID        cid.Cid
}

Snapshot is an immutable copy of a State at a point in time. It is safe to share across goroutines and to serialize as JSON. Receivers (e.g. CLIs reading from a streaming endpoint) reconstitute a local State by calling State.Apply with the snapshot.

JSON encoding uses Go's default field naming (PascalCase, matching the struct fields verbatim). Phase is encoded as the underlying integer of RetrievalPhase (type RetrievalPhase int). Receivers can compare against the PhaseInitializing / PhasePathResolution / etc. constants directly, or call RetrievalPhase.String for a human-readable form.

type State

type State struct {
	// ProvidersFound tracks the number of providers discovered for the content.
	ProvidersFound atomic.Int32
	// ProvidersAttempted tracks the number of providers we tried to connect to.
	ProvidersAttempted atomic.Int32
	// ProvidersConnected tracks the number of providers successfully connected.
	ProvidersConnected atomic.Int32
	// contains filtered or unexported fields
}

State tracks diagnostic information about IPFS content retrieval operations. It is safe for concurrent use and maintains monotonic phase progression. Use ContextWithState to add tracking to a context, and StateFromContext to retrieve the state for updates or inspection.

func ContextWithState

func ContextWithState(ctx context.Context) (context.Context, *State)

ContextWithState ensures a State exists in the context. If the context already contains a State, it returns the existing one. Otherwise, it creates a new State and adds it to the context. This function is idempotent and safe to call multiple times.

Example:

ctx, retrievalState := retrieval.ContextWithState(ctx)
// Use retrievalState to track progress
retrievalState.SetPhase(retrieval.PhaseProviderDiscovery)

func NewState

func NewState() *State

NewState creates a new State initialized to PhaseInitializing. The returned state is safe for concurrent use.

func StateFromContext

func StateFromContext(ctx context.Context) *State

StateFromContext retrieves the State from the context. Returns nil if no State is present in the context. This function is typically used by subsystems to check if retrieval tracking is enabled and to update the state if it is.

Example:

if retrievalState := retrieval.StateFromContext(ctx); retrievalState != nil {
    retrievalState.SetPhase(retrieval.PhaseDataRetrieval)
    retrievalState.ProvidersFound.Add(1)
}

func (*State) AddFailedProvider

func (rs *State) AddFailedProvider(peerID peer.ID)

AddFailedProvider records a provider peer ID that failed to deliver the requested content. This method is safe for concurrent use.

func (*State) AddFoundProvider

func (rs *State) AddFoundProvider(peerID peer.ID)

AddFoundProvider records a provider peer ID that was discovered during provider search. This method is safe for concurrent use.

func (*State) Apply added in v0.40.0

func (rs *State) Apply(s Snapshot)

Apply mirrors a Snapshot onto this State. It is intended for receivers that observe a remote State over a transport (e.g. NDJSON over HTTP) and want to reflect the remote values into a local State that some local UI is observing. Phase progression remains monotonic: a snapshot with an earlier phase will not move the local phase backwards. All writes happen under one critical section, so observers either see the snapshot in full or not at all, and Apply emits exactly one wake-up on State.Notify.

Apply assumes snapshots arrive in causal order from a single producer. Out-of-order delivery (e.g. multiple producers, or a transport that reorders) is unsupported: counters and CID/slice fields are written unconditionally, so a stale snapshot can regress them. The retrieval-state pipeline shipped in kubo (one daemon-side State, one CLI-side subscriber, ordered NDJSON) satisfies this assumption by construction.

func (*State) GetFailedProviders

func (rs *State) GetFailedProviders() []peer.ID

GetFailedProviders returns a sample of failed providers (up to MaxProvidersSampleSize). It does not return every failed provider, only the first few recorded, for diagnostic purposes.

func (*State) GetFoundProviders

func (rs *State) GetFoundProviders() []peer.ID

GetFoundProviders returns a sample of found providers (up to MaxProvidersSampleSize). It does not return every found provider, only the first few recorded, for diagnostic purposes.

func (*State) GetPhase

func (rs *State) GetPhase() RetrievalPhase

GetPhase returns the current retrieval phase. This method is safe for concurrent use.

func (*State) GetRootCID

func (rs *State) GetRootCID() cid.Cid

GetRootCID returns the root CID (first CID in the path). This method is safe for concurrent use.

func (*State) GetTerminalCID

func (rs *State) GetTerminalCID() cid.Cid

GetTerminalCID returns the terminal CID (CID of terminating DAG entity). This method is safe for concurrent use.

func (*State) Notify added in v0.40.0

func (rs *State) Notify() <-chan struct{}

Notify returns a size-1 channel that is signalled when the State changes. Writes are coalescing: if multiple updates happen between successive receives, the receiver wakes once and should call State.Snapshot to observe the latest values.

Lifecycle: the channel never closes. Subscribers should stop receiving by other means, typically a context cancellation in the surrounding select:

for {
    select {
    case <-ctx.Done():
        return
    case <-state.Notify():
        publish(state.Snapshot())
    }
}

Single-subscriber: the channel is shared, not fan-out. If two goroutines receive on it, each wake-up goes to one of them non-deterministically and the other misses it. To support multiple subscribers, fan out via your own goroutine: a single reader on Notify that broadcasts to a slice of per-subscriber channels.

func (*State) SetPhase

func (rs *State) SetPhase(phase RetrievalPhase)

SetPhase updates the current retrieval phase to the given phase. Phase progression is monotonic: phases can only move forward, never backward. If the provided phase is less than or equal to the current phase, this is a no-op. This method is safe for concurrent use.
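One lock-free way to get this forward-only behaviour is a compare-and-swap loop on an atomic integer. The sketch assumes that technique for illustration only and does not claim it is how boxo implements SetPhase; phaseTracker is a hypothetical name:

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// phaseTracker is a hypothetical holder for a forward-only phase value.
type phaseTracker struct{ v atomic.Int32 }

// Set advances the phase to p only if p is greater than the current value;
// otherwise it is a no-op. The loop retries when another goroutine changes
// the value between Load and CompareAndSwap.
func (t *phaseTracker) Set(p int32) {
	for {
		cur := t.v.Load()
		if p <= cur {
			return
		}
		if t.v.CompareAndSwap(cur, p) {
			return
		}
	}
}

func (t *phaseTracker) Get() int32 { return t.v.Load() }

func main() {
	var t phaseTracker
	var wg sync.WaitGroup
	for _, p := range []int32{2, 4, 1, 3} {
		wg.Add(1)
		go func(p int32) {
			defer wg.Done()
			t.Set(p)
		}(p)
	}
	wg.Wait()
	fmt.Println(t.Get()) // 4, regardless of goroutine scheduling
}
```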

func (*State) SetRootCID

func (rs *State) SetRootCID(c cid.Cid)

SetRootCID sets the root CID (first CID in the path). This method is safe for concurrent use.

func (*State) SetTerminalCID

func (rs *State) SetTerminalCID(c cid.Cid)

SetTerminalCID sets the terminal CID (CID of terminating DAG entity). This method is safe for concurrent use.

func (*State) Snapshot added in v0.40.0

func (rs *State) Snapshot() Snapshot

Snapshot returns the current state as an immutable value. Slice fields are cloned, so callers may freely retain or modify the result without affecting the live State.

Consistency: the read takes the State's lock for slices and CIDs, but counter fields (State.ProvidersFound etc.) are atomics that other writers update without the lock. A concurrent writer that mutates an atomic counter while Snapshot is running may produce a snapshot whose counters are slightly newer than its slices (or vice versa). For observability and progress UI use cases this eventual consistency is fine; callers needing a single-instant atomic view across all fields would need writers to also take the lock, which would slow them down.
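The trade-off above can be illustrated by a sketch in which slices are cloned under the mutex while the counter is an atomic that writers update without taking it. liveState and view are hypothetical stand-ins for State and Snapshot; slices.Clone requires Go 1.21 or later:

```go
package main

import (
	"fmt"
	"slices"
	"sync"
	"sync/atomic"
)

// liveState is a hypothetical stand-in for State.
type liveState struct {
	Found atomic.Int32 // updated by writers without taking mu

	mu    sync.Mutex
	peers []string // guarded by mu
}

// view is a hypothetical stand-in for Snapshot.
type view struct {
	Found int32
	Peers []string
}

// snapshot clones the slice under the lock, so the returned copy is
// independent of the live state. The counter is read atomically and may be
// slightly newer or older than the slice if a writer races with it.
func (s *liveState) snapshot() view {
	s.mu.Lock()
	defer s.mu.Unlock()
	return view{
		Found: s.Found.Load(),
		Peers: slices.Clone(s.peers),
	}
}

func main() {
	var s liveState
	s.Found.Add(1)
	s.mu.Lock()
	s.peers = append(s.peers, "peer-a")
	s.mu.Unlock()

	v := s.snapshot()
	v.Peers[0] = "changed" // mutating the copy leaves the live state alone
	fmt.Println(s.snapshot().Peers[0], v.Found) // peer-a 1
}
```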

func (*State) Summary

func (rs *State) Summary() string

Summary generates a human-readable summary of the retrieval state, useful for timeout error messages and diagnostics.
