streaming

package
v1.4.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 14, 2026 License: Apache-2.0 Imports: 5 Imported by: 0

Documentation

Overview

Package streaming provides experimental durable streaming APIs for Genkit.

APIs in this package are under active development and may change in any minor version release. Use with caution in production environments.

When these APIs stabilize, they will be moved to their parent packages (e.g., core and genkit) and these exports will be deprecated.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type InMemoryStreamManager

type InMemoryStreamManager struct {
	// contains filtered or unexported fields
}

InMemoryStreamManager is an in-memory implementation of StreamManager. Useful for testing or single-instance deployments where persistence is not required. Call Close to stop the background cleanup goroutine when the manager is no longer needed.

func NewInMemoryStreamManager

func NewInMemoryStreamManager(opts ...StreamManagerOption) *InMemoryStreamManager

NewInMemoryStreamManager creates a new InMemoryStreamManager. A background goroutine is started to periodically clean up expired streams. Call Close to stop the goroutine when the manager is no longer needed.

func (*InMemoryStreamManager) Close

func (m *InMemoryStreamManager) Close()

Close stops the background cleanup goroutine and releases resources. This method blocks until the cleanup goroutine has stopped.

func (*InMemoryStreamManager) Open

func (m *InMemoryStreamManager) Open(ctx context.Context, streamID string) (StreamInput, error)

Open creates a new stream for writing.

func (*InMemoryStreamManager) Subscribe

func (m *InMemoryStreamManager) Subscribe(ctx context.Context, streamID string) (<-chan StreamEvent, func(), error)

Subscribe subscribes to an existing stream.

type StreamEvent

type StreamEvent struct {
	Type   StreamEventType
	Chunk  json.RawMessage // set when Type == StreamEventChunk
	Output json.RawMessage // set when Type == StreamEventDone
	Err    error           // set when Type == StreamEventError
}

StreamEvent represents an event in a durable stream.

type StreamEventType

type StreamEventType int

StreamEventType indicates the type of stream event.

const (
	StreamEventChunk StreamEventType = iota
	StreamEventDone
	StreamEventError
)

type StreamInput

type StreamInput interface {
	// Write sends a chunk to the stream and notifies all subscribers.
	Write(ctx context.Context, chunk json.RawMessage) error
	// Done marks the stream as successfully completed with the given output.
	Done(ctx context.Context, output json.RawMessage) error
	// Error marks the stream as failed with the given error.
	Error(ctx context.Context, err error) error
	// Close releases resources without marking the stream as done or errored.
	Close() error
}

StreamInput provides methods for writing to a durable stream.

type StreamManager

type StreamManager interface {
	// Open creates a new stream for writing.
	// Returns an error if a stream with the given ID already exists.
	Open(ctx context.Context, streamID string) (StreamInput, error)
	// Subscribe subscribes to an existing stream.
	// Returns a channel that receives stream events, an unsubscribe function, and an error.
	// If the stream has already completed, all buffered events are sent before the done/error event.
	// Returns NOT_FOUND error if the stream doesn't exist.
	Subscribe(ctx context.Context, streamID string) (<-chan StreamEvent, func(), error)
}

StreamManager manages durable streams, allowing creation and subscription. Implementations can provide different storage backends (e.g., in-memory, database, cache).

type StreamManagerOption

type StreamManagerOption interface {
	// contains filtered or unexported methods
}

StreamManagerOption configures an InMemoryStreamManager.

func WithTTL

func WithTTL(ttl time.Duration) StreamManagerOption

WithTTL sets the time-to-live for completed streams. Streams that have completed (done or error) will be cleaned up after this duration. Default is 5 minutes.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL