channel

package
v0.2.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 14, 2026 License: MIT Imports: 8 Imported by: 0

Documentation

Overview

Package channel provides a Go channel-based backend for omnistorage.

The channel backend is useful for:

  • Pipeline processing between goroutines
  • In-process message passing and streaming
  • Test fixtures for streaming data
  • Building data processing pipelines

Each path corresponds to a separate channel. Writers send data to channels, and readers receive data from channels.

Example usage:

backend := channel.New()

// Producer goroutine
go func() {
    w, _ := backend.NewWriter(ctx, "events")
    w.Write([]byte("event1"))
    w.Write([]byte("event2"))
    w.Close() // Signals end of stream
}()

// Consumer goroutine
r, _ := backend.NewReader(ctx, "events")
for {
    buf := make([]byte, 1024)
    n, err := r.Read(buf)
    if err == io.EOF { break }
    process(buf[:n])
}

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewFromConfig

func NewFromConfig(config map[string]string) (omnistorage.Backend, error)

NewFromConfig creates a new channel backend from a config map. Supported options:

  • buffer_size: Channel buffer size (default: 100)
  • persistent: Buffer data for late readers (default: false)

Types

type Backend

type Backend struct {
	// contains filtered or unexported fields
}

Backend implements omnistorage.Backend using Go channels.

func New

func New(opts ...Option) *Backend

New creates a new channel backend with optional configuration.

func (*Backend) Broadcast

func (b *Backend) Broadcast(ctx context.Context, prefix string, data []byte) error

Broadcast sends data to all channels matching a prefix.

func (*Backend) ChannelCount

func (b *Backend) ChannelCount() int

ChannelCount returns the number of active channels.

func (*Backend) Close

func (b *Backend) Close() error

Close closes all channels and releases resources.

func (*Backend) Delete

func (b *Backend) Delete(ctx context.Context, path string) error

Delete removes a channel for the given path.

func (*Backend) Exists

func (b *Backend) Exists(ctx context.Context, path string) (bool, error)

Exists checks if a channel exists for the given path.

func (*Backend) List

func (b *Backend) List(ctx context.Context, prefix string) ([]string, error)

List returns all channel paths with the given prefix.

func (*Backend) NewReader

func (b *Backend) NewReader(ctx context.Context, path string, opts ...omnistorage.ReaderOption) (io.ReadCloser, error)

NewReader creates a reader for the given path. It receives data from any writers on the same path.

func (*Backend) NewWriter

func (b *Backend) NewWriter(ctx context.Context, path string, opts ...omnistorage.WriterOption) (io.WriteCloser, error)

NewWriter creates a writer for the given path. Data written will be sent to any readers on the same path.

type Message

type Message struct {
	Data []byte
	Path string
}

Message represents a message sent through a channel.

type Option

type Option func(*Backend)

Option configures a channel backend.

func WithBufferSize

func WithBufferSize(size int) Option

WithBufferSize sets the channel buffer size. Default is 100 messages.

func WithPersistence

func WithPersistence(enabled bool) Option

WithPersistence enables buffering data for readers that connect after writers. When enabled, data written to a channel is stored and replayed to new readers. This is useful for testing but uses more memory.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL