pipe

package
v1.11.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 10, 2026 License: Apache-2.0 Imports: 7 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var ErrInvalidCapacity = errors.New("pipe capacity must be a power of two")

Functions

This section is empty.

Types

type Config added in v1.11.4

type Config struct {
	// Capacity is a value that sets the initial capacity of a Pipe.
	// Capacity must be a power of two and must not be negative.
	Capacity int

	// ExtendAfter is a value that sets the duration of time that a
	// Pipe will wait on a blocked call to Send before increasing its
	// capacity. A negative value will disable capacity extension.
	ExtendAfter time.Duration

	// MaxExtensions is a value that sets the maximum number of times
	// that a Pipe will dynamically increase its capacity. A zero
	// value will disable capacity extension. A negative value enables
	// unlimited extensions.
	MaxExtensions int
}

Config is a struct that provides a Pipe its configuration values.

func DefaultConfig added in v1.11.4

func DefaultConfig() Config

func (*Config) Validate added in v1.11.4

func (config *Config) Validate() error

Validate is a function that validates the values of the config. An ErrInvalidCapacity error is returned if the capacity value is not a valid power of two.

type Pipe

type Pipe[T any] struct {
	// contains filtered or unexported fields
}

Pipe is a struct that implements the TxCloser and Rx interfaces. A Pipe will buffer sent values up to its capacity. All methods on Pipe are concurrency safe.

Once a Pipe's capacity has been reached, subsequent calls to Send on the Pipe will block until Recv or Close is called on the Pipe.

When a Pipe's extension configuration is set, its internal buffer will be extended when a call to Send blocks for longer than the configured threshold. A Pipe's buffer is doubled by each extension and may be extended up to the configured maximum number of times.

When a Pipe is empty -- no values are currently buffered -- subsequent calls to Recv on the Pipe will block until Send or Close is called on the Pipe.

Once Close has been called on a Pipe, subsequent calls to Send and Recv will return a `false` value. Additionally, all iter.Seq values returned by calls to Seq on the Pipe will terminate.

func Must

func Must[T any](config Config) *Pipe[T]

Must is a function that returns a new instance of a Pipe, or panics if an error is encountered.

func New

func New[T any](config Config) (*Pipe[T], error)

New is a function that instantiates a new Pipe with a size of n. The value of n must be a valid power of two. Any other value will result in an error.

func (*Pipe[T]) Capacity

func (p *Pipe[T]) Capacity() int

Capacity is a function used to retrieve the current number of items that the pipe's internal buffer can hold.

func (*Pipe[T]) Close

func (p *Pipe[T]) Close() error

Close is a function that stops the Pipe from sending or receiving values. Once Close has been called on a Pipe, all subsequent calls to Send and Recv on that Pipe will immediately return false. All callers in a waiting state will awaken and return `false`.

func (*Pipe[T]) Grow

func (p *Pipe[T]) Grow(n int) error

Grow is a function that increases the pipe's total capacity, if necessary, to guarantee space up to n items. After Grow(n), at least n items can be sent to the pipe without another allocation. If n is not a power of two, Grow will return an ErrInvalidSize error.

func (*Pipe[T]) Recv

func (p *Pipe[T]) Recv(t *T) bool

Recv is a function that receives values from a Pipe. Recv returns `true` when the value of t was successfully overwritten. Recv returns `false` when the Pipe has been closed. Once Recv has returned a `false` value, all subsequent calls to Recv on the same Pipe will return `false`.

When a Pipe's internal buffer is empty, calls to Recv will block until a call to Send or Close is made on the same Pipe.

func (*Pipe[T]) Send

func (p *Pipe[T]) Send(item T) bool

Send is a function that sends values to the Pipe. Send returns `true` when the value was successfully accepted by the Pipe and stored in its internal buffer. Send returns `false` when the Pipe has been closed. Once Send has returned a `false` value, all subsequent calls to Send on the same Pipe will return `false`.

When a Pipe's internal buffer has filled to capacity, calls to Send will block until a call to Recv or Close is made on the same Pipe.

func (*Pipe[T]) Seq

func (p *Pipe[T]) Seq() iter.Seq[T]

Seq is a function that returns an iter.Seq instance that allows the caller to iterate over the values received from the Pipe. When a caller terminates iteration of a returned iter.Seq, all other iter.Seq values returned from the same Pipe will terminate.

func (*Pipe[T]) Size

func (p *Pipe[T]) Size() int

Size is a function used to retrieve the current number of items waiting in the pipe's internal buffer to be received.

type Rx

type Rx[T any] interface {
	// Recv is a function that provides a "pull" mechanism for reading
	// values from the Rx. Recv overwrites the value of t with the next
	// value read from the Rx and returns `true` only when a new value
	// has been written to t. The implementation of Rx must make it safe
	// to call Recv concurrently.
	//
	// After a call to Recv has returned `false`, all subsequent calls to
	// Recv must also return `false`. Additionally any iter.Seq returned
	// by a call to Seq must also terminate at this point.
	Recv(t *T) bool

	// Seq is a function that provides a way to iterate over values in
	// the Rx. Subsequent calls to Seq on the same Rx must provide a unique
	// iterator that contends for values from the Rx. This means that values
	// are not broadcast to each iter.Seq from the same Rx. No guarantees
	// are made about the order in which an iter.Seq will receive a value.
	//
	// The implementation of Rx must make it safe to call Seq concurrently.
	// Once iteration of any produced iter.Seq has stopped, all other iter.Seq
	// instances produced by the same Rx will terminate.
	Seq() iter.Seq[T]
}

Rx is an interface that exposes methods for receiving values. Any implementation of Rx is intended to be concurrency safe.

type RxCloser added in v1.11.4

type RxCloser[T any] interface {
	Rx[T]
	io.Closer
}

RxCloser is an interface that exposes methods for receiving values and closing the instance. It is a combination of Rx and io.Closer interfaces. Any implementation of RxCloser is intended to be concurrency safe.

Once Close has been called on an RxCloser, subsequent calls to Receive must return a `true` value until the buffer is empty.

func StaticRx

func StaticRx[T any](items ...T) RxCloser[T]

StaticRx is a function that instantiates a Rx value with the given items. Once all of the items have been received from the Rx, subsequent calls to Recv on the same Rx will always return `false`.

type Tx

type Tx[T any] interface {
	// Send is a function that provides a way to send a value through the Tx.
	// When a value has been sent successfully, Send must return a `true` value.
	// When a value has not been sent, for any reason, Send must return a `false`
	// value. The implementation of Tx must make it safe to call Send concurrently.
	//
	// Once Send has returned a `false` value, all subsequent calls to
	// Send must also return a `false` value.
	Send(T) bool
}

Tx is an interface that exposes methods for sending values. Any implementation of Tx is intended to be concurrency safe.

type TxCloser

type TxCloser[T any] interface {
	Tx[T]
	io.Closer
}

TxCloser is an interface that exposes methods for sending values and closing the instance. It is a combination of Tx and io.Closer interfaces. Any implementation of TxCloser is intended to be concurrency safe.

Once Close has been called on a TxCloser, all subsequent calls to Send must return a `false` value.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL