Documentation ¶
Overview ¶
Package channel provides a Go channel-based backend for omnistorage.
The channel backend is useful for:
- Pipeline processing between goroutines
- In-process message passing and streaming
- Test fixtures for streaming data
- Building data processing pipelines
Each path maps to its own channel: writers send data to the channel for their path, and readers receive data from it.
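The path-to-channel mapping can be pictured with a small standalone sketch. This is not the package's implementation: the `registry` type, the `channelFor` method, and the lazy creation of channels are all assumptions made for illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// registry is a hypothetical stand-in for the backend's internal state:
// one buffered channel per path, created lazily on first use.
type registry struct {
	mu       sync.Mutex
	channels map[string]chan []byte
	bufSize  int
}

func newRegistry(bufSize int) *registry {
	return &registry{channels: make(map[string]chan []byte), bufSize: bufSize}
}

// channelFor returns the channel for path, creating it if it does not exist yet.
func (r *registry) channelFor(path string) chan []byte {
	r.mu.Lock()
	defer r.mu.Unlock()
	ch, ok := r.channels[path]
	if !ok {
		ch = make(chan []byte, r.bufSize)
		r.channels[path] = ch
	}
	return ch
}

func main() {
	reg := newRegistry(100)

	// A writer and a reader that use the same path share one channel.
	reg.channelFor("events") <- []byte("event1")
	fmt.Println(string(<-reg.channelFor("events")))

	// A different path gets its own, independent channel.
	fmt.Println(reg.channelFor("metrics") == reg.channelFor("events"))
}
```

The mutex guards the map only; sends and receives on the channels themselves need no extra locking.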
Example usage:
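	ctx := context.Background()
	backend := channel.New()

	// Producer goroutine
	go func() {
		w, err := backend.NewWriter(ctx, "events")
		if err != nil {
			log.Printf("new writer: %v", err)
			return
		}
		defer w.Close() // Signals end of stream
		w.Write([]byte("event1"))
		w.Write([]byte("event2"))
	}()

	// Consumer (runs in the calling goroutine)
	r, err := backend.NewReader(ctx, "events")
	if err != nil {
		log.Fatal(err)
	}
	defer r.Close()
	for {
		buf := make([]byte, 1024)
		n, err := r.Read(buf)
		if n > 0 {
			process(buf[:n]) // handle data before inspecting err
		}
		if err != nil {
			break // io.EOF once the writer closes the stream
		}
	}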
Index ¶
- func NewFromConfig(config map[string]string) (omnistorage.Backend, error)
- type Backend
- func (b *Backend) Broadcast(ctx context.Context, prefix string, data []byte) error
- func (b *Backend) ChannelCount() int
- func (b *Backend) Close() error
- func (b *Backend) Delete(ctx context.Context, path string) error
- func (b *Backend) Exists(ctx context.Context, path string) (bool, error)
- func (b *Backend) List(ctx context.Context, prefix string) ([]string, error)
- func (b *Backend) NewReader(ctx context.Context, path string, opts ...omnistorage.ReaderOption) (io.ReadCloser, error)
- func (b *Backend) NewWriter(ctx context.Context, path string, opts ...omnistorage.WriterOption) (io.WriteCloser, error)
- type Message
- type Option
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func NewFromConfig ¶
func NewFromConfig(config map[string]string) (omnistorage.Backend, error)
NewFromConfig creates a new channel backend from a config map. Supported options:
- buffer_size: Channel buffer size (default: 100)
- persistent: Buffer data for late readers (default: false)
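As a rough illustration of how a string map with these two keys could be turned into typed settings, here is a standalone sketch. The `settings` type and `parseConfig` function are hypothetical, and the value formats (a decimal integer for `buffer_size`, a `strconv.ParseBool`-compatible string for `persistent`) are assumptions; the documentation above only states the key names and defaults.

```go
package main

import (
	"fmt"
	"strconv"
)

// settings is a hypothetical holder for the two documented options.
type settings struct {
	bufferSize int
	persistent bool
}

// parseConfig starts from the documented defaults (100, false) and overrides
// them with any values present in the map. Rejecting malformed values with an
// error is an assumption, not documented behavior.
func parseConfig(config map[string]string) (settings, error) {
	s := settings{bufferSize: 100, persistent: false}
	if v, ok := config["buffer_size"]; ok {
		n, err := strconv.Atoi(v)
		if err != nil {
			return settings{}, fmt.Errorf("invalid buffer_size %q: %w", v, err)
		}
		s.bufferSize = n
	}
	if v, ok := config["persistent"]; ok {
		p, err := strconv.ParseBool(v)
		if err != nil {
			return settings{}, fmt.Errorf("invalid persistent %q: %w", v, err)
		}
		s.persistent = p
	}
	return s, nil
}

func main() {
	s, err := parseConfig(map[string]string{"buffer_size": "16", "persistent": "true"})
	if err != nil {
		fmt.Println("config error:", err)
		return
	}
	fmt.Println(s.bufferSize, s.persistent)
}
```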
Types ¶
type Backend ¶
type Backend struct {
// contains filtered or unexported fields
}
Backend implements omnistorage.Backend using Go channels.
func (*Backend) ChannelCount ¶
func (b *Backend) ChannelCount() int
ChannelCount returns the number of active channels.
func (*Backend) NewReader ¶
func (b *Backend) NewReader(ctx context.Context, path string, opts ...omnistorage.ReaderOption) (io.ReadCloser, error)
NewReader creates a reader for the given path. The reader receives data sent by any writer on the same path.
func (*Backend) NewWriter ¶
func (b *Backend) NewWriter(ctx context.Context, path string, opts ...omnistorage.WriterOption) (io.WriteCloser, error)
NewWriter creates a writer for the given path. Written data is sent to any readers on the same path.
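To make the writer and reader relationship concrete, the following standalone sketch wraps a plain Go channel in `io.WriteCloser` and `io.ReadCloser` values. The `chanWriter` and `chanReader` types are hypothetical and only illustrate the general idea; the real backend may frame, buffer, or deliver data differently.

```go
package main

import (
	"fmt"
	"io"
)

// chanWriter (hypothetical) sends each Write as one message on ch.
type chanWriter struct{ ch chan<- []byte }

func (w *chanWriter) Write(p []byte) (int, error) {
	msg := make([]byte, len(p)) // copy: the caller may reuse p after Write returns
	copy(msg, p)
	w.ch <- msg
	return len(p), nil
}

// Close closes the channel, which the reader observes as io.EOF.
func (w *chanWriter) Close() error {
	close(w.ch)
	return nil
}

// chanReader (hypothetical) receives messages from ch and serves them via Read.
type chanReader struct {
	ch      <-chan []byte
	pending []byte // unread remainder of the current message
}

func (r *chanReader) Read(p []byte) (int, error) {
	for len(r.pending) == 0 {
		msg, ok := <-r.ch
		if !ok {
			return 0, io.EOF // channel closed and drained
		}
		r.pending = msg
	}
	n := copy(p, r.pending)
	r.pending = r.pending[n:]
	return n, nil
}

func (r *chanReader) Close() error { return nil }

func main() {
	ch := make(chan []byte, 100)
	var w io.WriteCloser = &chanWriter{ch: ch}
	var r io.ReadCloser = &chanReader{ch: ch}

	go func() {
		w.Write([]byte("event1"))
		w.Write([]byte("event2"))
		w.Close()
	}()

	data, err := io.ReadAll(r)
	fmt.Println(string(data), err)
}
```

Keeping a `pending` remainder lets the reader honor the `io.Reader` contract when the caller's buffer is smaller than a message.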
type Option ¶
type Option func(*Backend)
Option configures a channel backend.
func WithBufferSize ¶
WithBufferSize sets the channel buffer size. Default is 100 messages.
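The `Option func(*Backend)` shape is the common functional-options pattern. The sketch below shows the pattern with lowercase stand-ins; the `bufferSize` field, the `int` parameter of `withBufferSize`, and a constructor that accepts options are assumptions, since the signatures are not shown here.

```go
package main

import "fmt"

// backend and option mirror the documented Backend and Option types in
// miniature. The field name is hypothetical.
type backend struct{ bufferSize int }

type option func(*backend)

// withBufferSize sketches what an option like WithBufferSize might do.
// The int parameter is an assumption.
func withBufferSize(n int) option {
	return func(b *backend) { b.bufferSize = n }
}

// newBackend starts from the documented default of 100 messages and applies
// the given options in order.
func newBackend(opts ...option) *backend {
	b := &backend{bufferSize: 100}
	for _, opt := range opts {
		opt(b)
	}
	return b
}

func main() {
	fmt.Println(newBackend().bufferSize)
	fmt.Println(newBackend(withBufferSize(8)).bufferSize)
}
```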
func WithPersistence ¶
WithPersistence enables buffering data for readers that connect after writers. When enabled, data written to a channel is stored and replayed to new readers. This is useful for testing but uses more memory.
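One way to picture store-and-replay is a per-path log that keeps every message and hands a copy of the history to each new reader. The `replayLog` type below is a hypothetical sketch of that idea, not the backend's actual mechanism.

```go
package main

import (
	"fmt"
	"sync"
)

// replayLog (hypothetical) keeps every message written to one path so that a
// reader attaching later can start from the beginning.
type replayLog struct {
	mu       sync.Mutex
	messages [][]byte
}

// add stores a copy of p, since the caller may reuse its buffer.
func (l *replayLog) add(p []byte) {
	msg := make([]byte, len(p))
	copy(msg, p)
	l.mu.Lock()
	l.messages = append(l.messages, msg)
	l.mu.Unlock()
}

// snapshot returns the messages stored so far, for replay to a new reader.
func (l *replayLog) snapshot() [][]byte {
	l.mu.Lock()
	defer l.mu.Unlock()
	return append([][]byte(nil), l.messages...)
}

func main() {
	rl := &replayLog{}
	rl.add([]byte("event1"))
	rl.add([]byte("event2"))

	// A reader that connects after both writes still sees them.
	for _, msg := range rl.snapshot() {
		fmt.Println(string(msg))
	}
}
```

Because the log in this sketch is never trimmed, it grows with every write, which reflects the memory cost noted above.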