pipe

package
v1.11.3 Latest
Warning

This package is not in the latest version of its module.

Published: Jan 28, 2026 License: Apache-2.0 Imports: 6 Imported by: 0

Documentation


Constants

const (
	// This value is used as the default duration that a send on a
	// pipe will wait before extending the pipe's internal buffer.
	//
	// A negative value disables the pipe extension functionality.
	DefaultExtendAfter time.Duration = -1

	// This value is used as the default value that indicates the maximum
	// number of times that a pipe's internal buffer may be extended.
	//
	// A negative value will allow unlimited extensions.
	DefaultMaxExtensions int = -1
)

Variables

var ErrInvalidSize = errors.New("pipe size must be a power of two")

Functions

This section is empty.

Types

type Pipe

type Pipe[T any] struct {
	// contains filtered or unexported fields
}

Pipe is a struct that implements the TxCloser and Rx interfaces. A Pipe will buffer sent values up to its capacity. All methods on Pipe are concurrency safe.

Once a Pipe's capacity has been reached, subsequent calls to Send on the Pipe will block until Recv or Close is called on the Pipe.

When a Pipe's extension configuration is set, its internal buffer will be extended when a call to Send blocks for longer than the configured threshold. A Pipe's buffer is doubled by each extension and may be extended up to the configured maximum number of times.

When a Pipe is empty -- no values are currently buffered -- subsequent calls to Recv on the Pipe will block until Send or Close is called on the Pipe.

Once Close has been called on a Pipe, subsequent calls to Send and Recv will return a `false` value. Additionally, all iter.Seq values returned by calls to Seq on the Pipe will terminate.
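The blocking and closing rules described above can be modeled with a small, self-contained sketch. This is not the package's implementation: the `boundedQueue` type, its fields, and its use of `sync.Cond` are assumptions chosen for illustration, and buffer extension is left out entirely.

```go
package main

import (
	"fmt"
	"sync"
)

// boundedQueue is a hypothetical stand-in that mimics the documented
// Send/Recv/Close behavior. It is not the package's Pipe type.
type boundedQueue[T any] struct {
	mu     sync.Mutex
	cond   *sync.Cond
	buf    []T
	limit  int
	closed bool
}

func newBoundedQueue[T any](limit int) *boundedQueue[T] {
	q := &boundedQueue[T]{limit: limit}
	q.cond = sync.NewCond(&q.mu)
	return q
}

// Send blocks while the buffer is full and returns false once closed.
func (q *boundedQueue[T]) Send(item T) bool {
	q.mu.Lock()
	defer q.mu.Unlock()
	for len(q.buf) == q.limit && !q.closed {
		q.cond.Wait()
	}
	if q.closed {
		return false
	}
	q.buf = append(q.buf, item)
	q.cond.Broadcast()
	return true
}

// Recv blocks while the buffer is empty and returns false once closed.
func (q *boundedQueue[T]) Recv(t *T) bool {
	q.mu.Lock()
	defer q.mu.Unlock()
	for len(q.buf) == 0 && !q.closed {
		q.cond.Wait()
	}
	if q.closed {
		return false
	}
	*t = q.buf[0]
	q.buf = q.buf[1:]
	q.cond.Broadcast()
	return true
}

// Close wakes every waiter; all later Send and Recv calls return false.
func (q *boundedQueue[T]) Close() error {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.closed = true
	q.cond.Broadcast()
	return nil
}

func main() {
	q := newBoundedQueue[int](2)
	done := make(chan struct{})
	go func() {
		defer close(done)
		var v int
		for q.Recv(&v) {
			fmt.Println("received", v)
		}
	}()
	for i := 0; i < 4; i++ {
		q.Send(i)
	}
	q.Close()
	<-done
}
```

In this model, as in the description above, Recv returns `false` as soon as Close has been called, so values still buffered at that moment may never be received. The number of lines printed by `main` therefore depends on scheduling.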

func Must

func Must[T any](n int) *Pipe[T]

Must is a function that returns a new instance of a Pipe with a size of n, or panics if an error is encountered.

func New

func New[T any](n int) (*Pipe[T], error)

New is a function that instantiates a new Pipe with a size of n. The value of n must be a power of two. Any other value will result in an error.
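A caller can validate a size before passing it to New with the usual single-bit test. The helper name `isPowerOfTwo` is hypothetical, and the package may perform its own check differently. In particular, the documentation does not say whether a size of 1 is accepted.

```go
package main

import "fmt"

// isPowerOfTwo reports whether n is a positive power of two.
// A power of two has exactly one bit set, so n&(n-1) is zero.
func isPowerOfTwo(n int) bool {
	return n > 0 && n&(n-1) == 0
}

func main() {
	for _, n := range []int{0, 1, 2, 3, 8, 12, 1024} {
		fmt.Println(n, isPowerOfTwo(n))
	}
}
```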

func (*Pipe[T]) Capacity

func (p *Pipe[T]) Capacity() int

Capacity is a function used to retrieve the current number of items that the pipe's internal buffer can hold.

func (*Pipe[T]) Close

func (p *Pipe[T]) Close() error

Close is a function that stops the Pipe from sending or receiving values. Once Close has been called on a Pipe, all subsequent calls to Send and Recv on that Pipe will immediately return false. All callers in a waiting state will awaken and return `false`.

func (*Pipe[T]) Grow

func (p *Pipe[T]) Grow(n int) error

Grow is a function that increases the pipe's total capacity, if necessary, to guarantee space for n items. After Grow(n), at least n items can be sent to the pipe without another allocation. If n is not a power of two, Grow will return an ErrInvalidSize error.
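Because Grow rejects sizes that are not powers of two, a caller that knows only an approximate item count may want to round it up first. The following is a minimal sketch of such a helper; `nextPowerOfTwo` is a hypothetical name and is not part of the package.

```go
package main

import (
	"fmt"
	"math/bits"
)

// nextPowerOfTwo returns the smallest power of two that is >= n.
// Values of n below 1 yield 1. Overflow for very large n is not handled.
func nextPowerOfTwo(n int) int {
	if n <= 1 {
		return 1
	}
	// bits.Len(x) is the number of bits needed to represent x.
	return 1 << bits.Len(uint(n-1))
}

func main() {
	for _, n := range []int{1, 2, 3, 5, 8, 100} {
		fmt.Println(n, nextPowerOfTwo(n))
	}
}
```

The rounded value could then be passed to Grow or New in place of the raw count.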

func (*Pipe[T]) Recv

func (p *Pipe[T]) Recv(t *T) bool

Recv is a function that receives values from a Pipe. Recv returns `true` when the value of t was successfully overwritten. Recv returns `false` when the Pipe has been closed. Once Recv has returned a `false` value, all subsequent calls to Recv on the same Pipe will return `false`.

When a Pipe's internal buffer is empty, calls to Recv will block until a call to Send or Close is made on the same Pipe.

func (*Pipe[T]) Send

func (p *Pipe[T]) Send(item T) bool

Send is a function that sends values to the Pipe. Send returns `true` when the value was successfully accepted by the Pipe and stored in its internal buffer. Send returns `false` when the Pipe has been closed. Once Send has returned a `false` value, all subsequent calls to Send on the same Pipe will return `false`.

When a Pipe's internal buffer has filled to capacity, calls to Send will block until a call to Recv or Close is made on the same Pipe.

func (*Pipe[T]) Seq

func (p *Pipe[T]) Seq() iter.Seq[T]

Seq is a function that returns an iter.Seq instance that allows the caller to iterate over the values received from the Pipe. When a caller terminates iteration of a returned iter.Seq, all other iter.Seq values returned from the same Pipe will terminate.

func (*Pipe[T]) SetExtensionConfig

func (p *Pipe[T]) SetExtensionConfig(extendAfter time.Duration, maxExtensions int)

SetExtensionConfig is a function that sets the extension configuration for a pipe's internal buffer.

When sending a value to a pipe takes longer than the extendAfter duration, the pipe's internal buffer will be extended automatically. The buffer will be extended at most maxExtensions times.

The buffer's size doubles after each extension.

A negative extendAfter value will disable dynamic extension. A 0 value may be provided for extendAfter to indicate immediate extension.

A negative maxExtensions value will allow unlimited extensions.
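Since each extension doubles the buffer, a pipe that starts with capacity n holds n·2^k items after k extensions. The sketch below works through that arithmetic. The function `capacityAfter` is hypothetical, and treating a maxExtensions of 0 as "no extensions allowed" is an assumption that the documentation does not state explicitly.

```go
package main

import "fmt"

// capacityAfter returns the capacity reached when a buffer of the given
// initial size is asked to extend the given number of times, with each
// extension doubling the size. A negative maxExtensions means unlimited.
func capacityAfter(initial, extensions, maxExtensions int) int {
	if extensions < 0 {
		extensions = 0
	}
	if maxExtensions >= 0 && extensions > maxExtensions {
		extensions = maxExtensions
	}
	return initial << extensions
}

func main() {
	fmt.Println(capacityAfter(8, 3, -1)) // 64: unlimited, 8 -> 16 -> 32 -> 64
	fmt.Println(capacityAfter(8, 3, 2))  // 32: capped at two extensions
	fmt.Println(capacityAfter(8, 3, 0))  // 8: no extensions under this assumption
}
```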

func (*Pipe[T]) Size

func (p *Pipe[T]) Size() int

Size is a function used to retrieve the current number of items waiting in the pipe's internal buffer to be received.

type Rx

type Rx[T any] interface {
	// Recv is a function that provides a "pull" mechanism for reading
	// values from the Rx. Recv overwrites the value of t with the next
	// value read from the Rx and returns `true` only when a new value
	// has been written to t. The implementation of Rx must make it safe
	// to call Recv concurrently.
	//
	// After a call to Recv has returned `false`, all subsequent calls to
	// Recv must also return `false`. Additionally, any iter.Seq returned
	// by a call to Seq must also terminate at this point.
	Recv(t *T) bool

	// Seq is a function that provides a way to iterate over values in
	// the Rx. Subsequent calls to Seq on the same Rx must provide a unique
	// iterator that contends for values from the Rx. This means that values
	// are not broadcast to each iter.Seq from the same Rx. No guarantees
	// are made about the order in which an iter.Seq will receive a value.
	//
	// The implementation of Rx must make it safe to call Seq concurrently.
	// Once iteration of any produced iter.Seq has stopped, all other iter.Seq
	// instances produced by the same Rx will terminate.
	Seq() iter.Seq[T]
}

Rx is an interface that exposes methods for receiving values. Any implementation of Rx is intended to be concurrency safe.

func StaticRx

func StaticRx[T any](items ...T) Rx[T]

StaticRx is a function that instantiates an Rx value with the given items. Once all of the items have been received from the Rx, subsequent calls to Recv on the same Rx will always return `false`.

type Tx

type Tx[T any] interface {
	// Send is a function that provides a way to send a value through the Tx.
	// When a value has been sent successfully, Send must return a `true` value.
	// When a value has not been sent, for any reason, Send must return a `false`
	// value. The implementation of Tx must make it safe to call Send concurrently.
	//
	// Once Send has returned a `false` value, all subsequent calls to
	// Send must also return a `false` value.
	Send(T) bool
}

Tx is an interface that exposes methods for sending values. Any implementation of Tx is intended to be concurrency safe.
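Because Send must keep returning `false` after its first `false`, a producer can stop at the first rejected value. The sketch below shows that pattern. The `Tx` interface is redeclared locally to keep the example self-contained, and both `limitedTx` and `sendAll` are hypothetical names that are not part of the package.

```go
package main

import (
	"fmt"
	"sync"
)

// Tx mirrors the interface documented above.
type Tx[T any] interface {
	Send(T) bool
}

// limitedTx is a hypothetical Tx that accepts a fixed number of values
// and then rejects every later Send, as the contract requires.
type limitedTx[T any] struct {
	mu        sync.Mutex
	remaining int
	got       []T
}

func (l *limitedTx[T]) Send(v T) bool {
	l.mu.Lock()
	defer l.mu.Unlock()
	if l.remaining == 0 {
		return false
	}
	l.remaining--
	l.got = append(l.got, v)
	return true
}

// sendAll sends items in order and stops at the first rejected Send.
// It returns the number of items that were accepted.
func sendAll[T any](tx Tx[T], items ...T) int {
	for i, item := range items {
		if !tx.Send(item) {
			return i
		}
	}
	return len(items)
}

func main() {
	tx := &limitedTx[string]{remaining: 2}
	n := sendAll[string](tx, "a", "b", "c")
	fmt.Println(n, tx.got) // 2 [a b]
}
```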

type TxCloser

type TxCloser[T any] interface {
	Tx[T]
	io.Closer
}

TxCloser is an interface that exposes methods for sending values and closing the instance. It is a combination of Tx and io.Closer interfaces. Any implementation of TxCloser is intended to be concurrency safe.

Once Close has been called on a TxCloser, all subsequent calls to Send must return a `false` value.
