collectors

package
v1.0.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 10, 2026 License: BSD-2-Clause Imports: 23 Imported by: 0

Documentation

Overview

Package collectors provides standard implementations of the config.Collector interface for loading configuration from various sources.

Collector Types

  • Map — reads configuration from an in-memory map[string]any.
  • Env — reads configuration from environment variables, with configurable prefix, delimiter, and key transformation.
  • Storage — reads multiple configuration documents from a key-value storage (e.g., etcd) under a common prefix, with integrity verification.
  • Directory — reads multiple configuration files from a filesystem directory, with optional recursive scanning.
  • Source / DataSource — abstraction over a single data stream (file, storage key, etc.) used by the generic collector.

Format and Watching

  • Format — interface for parsing raw data (e.g., YAML) into a tree.Node.
  • YamlFormat — YAML implementation of Format.
  • Watcher — interface for reactive change notifications from storage backends.

Builder Pattern

All collectors use a fluent builder pattern with With* methods for optional configuration (WithName, WithSourceType, WithRevision, WithKeepOrder).

Example

m := collectors.NewMap(map[string]any{
    "listen": "127.0.0.1:3301",
    "log":    map[string]any{"level": "info"},
})

cfg, err := config.NewBuilder().
    AddCollector(m.WithName("defaults")).
    Build()
if err != nil {
    log.Fatal(err)
}

Index

Constants

This section is empty.

Variables

View Source
var (
	// ErrNoData indicates that there is no data to process.
	ErrNoData = errors.New("no data to process")
	// ErrUnmarshall indicates that unmarshalling failed.
	ErrUnmarshall = errors.New("failed to unmarshall")
	// ErrFile indicates a file processing error.
	ErrFile = errors.New("file processing error")
	// ErrReader indicates a reader processing error.
	ErrReader = errors.New("reader processing error")
	// ErrFetchStream indicates that fetching the stream failed.
	ErrFetchStream = errors.New("failed to fetch the stream")
	// ErrFormatParse indicates that parsing data with format failed.
	ErrFormatParse = errors.New("failed to parse data with format")
	// ErrStorageFetch indicates that storage fetch failed.
	ErrStorageFetch = errors.New("storage fetch failed")
	// ErrStorageKeyNotFound indicates that a storage key was not found.
	ErrStorageKeyNotFound = errors.New("storage key not found")
	// ErrStorageRange indicates that a storage range query failed.
	ErrStorageRange = errors.New("storage range query failed")
	// ErrStorageValidation indicates that storage integrity validation failed.
	ErrStorageValidation = errors.New("storage integrity validation failed")
	// ErrDirectoryRead indicates that reading a directory failed.
	ErrDirectoryRead = errors.New("directory read failed")
)

Functions

func NewSource

func NewSource(source DataSource, format Format) (config.Collector, error)

NewSource returns new Source object.

Types

type DataSource

type DataSource interface {
	Name() string
	SourceType() config.SourceType
	Revision() config.RevisionType
	FetchStream(ctx context.Context) (io.ReadCloser, error)
}

DataSource represent data source.

type Directory

type Directory struct {
	// contains filtered or unexported fields
}

Directory implements config.Collector and config.MultiCollector for reading multiple configuration files from a filesystem directory. Each file matching the configured extension is parsed according to the given Format and merged independently as a separate sub-collector. File names (without extension) are used for source identification; the file content determines the tree structure.

When recursive mode is enabled, subdirectories are scanned recursively. Symbolic links to files are followed, but symbolic links to directories are skipped to prevent infinite loops and cyclic traversals.

func NewDirectory

func NewDirectory(
	path string,
	extension string,
	format Format,
) *Directory

NewDirectory creates a new Directory collector that reads all files with the given extension from the specified directory path. Each file's content is parsed using the provided Format. The extension should include the leading dot (e.g., ".yaml").

func (*Directory) Collectors

func (d *Directory) Collectors(_ context.Context) ([]config.Collector, error)

Collectors implements config.MultiCollector. It reads the configured directory, filters files by extension, parses each file using the collector's Format, and returns one sub-collector per file. Each sub-collector is merged independently by the Builder with its own MergerContext, source name, and revision. Returns an error if any file cannot be read or parsed. Symbolic links to files are followed, but symbolic links to directories are skipped.

func (*Directory) KeepOrder

func (d *Directory) KeepOrder() bool

KeepOrder returns whether the collector preserves key order.

func (*Directory) Name

func (d *Directory) Name() string

Name returns the collector's name.

func (*Directory) Read

func (d *Directory) Read(ctx context.Context) <-chan config.Value

Read performs a directory scan and emits all values from all files on a single channel. This is a convenience method; the Builder uses Collectors for independent per-file merging.

func (*Directory) Recursive

func (d *Directory) Recursive() bool

Recursive returns whether the collector scans subdirectories recursively.

func (*Directory) Revision

func (d *Directory) Revision() config.RevisionType

Revision returns the collector's current revision.

func (*Directory) Source

func (d *Directory) Source() config.SourceType

Source returns the collector's source type.

func (*Directory) WithKeepOrder

func (d *Directory) WithKeepOrder(keep bool) *Directory

WithKeepOrder sets whether the collector should preserve the order of keys as they appear in each file (default false).

func (*Directory) WithName

func (d *Directory) WithName(name string) *Directory

WithName sets a custom name prefix for the collector (default "directory"). The final SourceInfo.Name for each value will be "<name>:<path>/<filename>", where <filename> is the name of the file from which the value was read. For example, WithName("config") with path "/etc/app" and file "db.yaml" produces SourceInfo.Name "config:/etc/app/db.yaml".

func (*Directory) WithRecursive

func (d *Directory) WithRecursive(recursive bool) *Directory

WithRecursive sets whether to scan subdirectories recursively (default false).

func (*Directory) WithRevision

func (d *Directory) WithRevision(rev config.RevisionType) *Directory

WithRevision sets the revision for the collector.

func (*Directory) WithSourceType

func (d *Directory) WithSourceType(source config.SourceType) *Directory

WithSourceType sets the source type reported by the collector (default config.FileSource).

type Env

type Env struct {
	// contains filtered or unexported fields
}

Env reads configuration data from environment variables.

func NewEnv

func NewEnv() *Env

NewEnv creates an Env with default settings. By default, it uses underscore ('_') as delimiter, no prefix, and converts keys to lowercase, replacing underscores with slashes to form a hierarchical key path.

func (*Env) KeepOrder

func (ec *Env) KeepOrder() bool

KeepOrder implements the Collector interface.

func (*Env) Name

func (ec *Env) Name() string

Name implements the Collector interface.

func (*Env) Read

func (ec *Env) Read(ctx context.Context) <-chan config.Value

Read implements the Collector interface.

func (*Env) Revision

func (ec *Env) Revision() config.RevisionType

Revision implements the Collector interface.

func (*Env) Source

func (ec *Env) Source() config.SourceType

Source implements the Collector interface.

func (*Env) WithDelimiter

func (ec *Env) WithDelimiter(delim string) *Env

WithDelimiter sets the delimiter used by the default transformation to split environment variable names. The default is underscore ('_').

func (*Env) WithKeepOrder

func (ec *Env) WithKeepOrder(keep bool) *Env

WithKeepOrder sets whether the collector preserves key order.

func (*Env) WithName

func (ec *Env) WithName(name string) *Env

WithName sets a custom name for the collector.

func (*Env) WithPrefix

func (ec *Env) WithPrefix(prefix string) *Env

WithPrefix sets a prefix to strip from environment variable names. If set, only variables starting with this prefix are processed, and the prefix is removed.

func (*Env) WithRevision

func (ec *Env) WithRevision(rev config.RevisionType) *Env

WithRevision sets the revision for the collector.

func (*Env) WithSourceType

func (ec *Env) WithSourceType(source config.SourceType) *Env

WithSourceType sets the source type for the collector.

func (*Env) WithTransform

func (ec *Env) WithTransform(fn func(string) config.KeyPath) *Env

WithTransform sets a custom transformation function from environment variable name to KeyPath. If set, prefix is still applied before transformation; delimiter is ignored.

type File

type File struct {
	// contains filtered or unexported fields
}

File implements DataSource with data from file.

func NewFile

func NewFile(file string) File

NewFile returns new File object.

func (File) FetchStream

func (f File) FetchStream(_ context.Context) (io.ReadCloser, error)

FetchStream returns reader.

func (File) Name

func (f File) Name() string

Name returns name of the source.

func (File) Revision

func (f File) Revision() config.RevisionType

Revision returns data revision.

func (File) SourceType

func (f File) SourceType() config.SourceType

SourceType returns source type.

type Format

type Format interface {
	Name() string
	KeepOrder() bool
	From(r io.Reader) Format
	Parse() (*tree.Node, error)
}

Format represents way to convert some data into the tree.Node.

func NewYamlFormat

func NewYamlFormat() Format

NewYamlFormat return new YamlFormat object.

type Map

type Map struct {
	// contains filtered or unexported fields
}

Map reads configuration data from a map.

func NewMap

func NewMap(data map[string]any) *Map

NewMap creates a Map with the given data. The source type defaults to config.UnknownSource.

func (*Map) KeepOrder

func (mc *Map) KeepOrder() bool

KeepOrder implements the Collector interface.

func (*Map) Name

func (mc *Map) Name() string

Name implements the Collector interface.

func (*Map) Read

func (mc *Map) Read(ctx context.Context) <-chan config.Value

Read implements the Collector interface.

func (*Map) Revision

func (mc *Map) Revision() config.RevisionType

Revision implements the Collector interface.

func (*Map) Source

func (mc *Map) Source() config.SourceType

Source implements the Collector interface.

func (*Map) WithKeepOrder

func (mc *Map) WithKeepOrder(keep bool) *Map

WithKeepOrder sets whether the collector preserves key order.

func (*Map) WithName

func (mc *Map) WithName(name string) *Map

WithName sets a custom name for the collector.

func (*Map) WithRevision

func (mc *Map) WithRevision(rev config.RevisionType) *Map

WithRevision sets the revision for the collector.

func (*Map) WithSourceType

func (mc *Map) WithSourceType(source config.SourceType) *Map

WithSourceType sets the source type for the collector.

type Mock

type Mock struct {
	// contains filtered or unexported fields
}

Mock is a testing collector that returns a predefined set of values.

func NewMock

func NewMock() *Mock

NewMock creates a Mock with default settings.

func (*Mock) KeepOrder

func (mc *Mock) KeepOrder() bool

KeepOrder implements the Collector interface.

func (*Mock) Name

func (mc *Mock) Name() string

Name implements the Collector interface.

func (*Mock) Read

func (mc *Mock) Read(ctx context.Context) <-chan config.Value

Read implements the Collector interface.

func (*Mock) Revision

func (mc *Mock) Revision() config.RevisionType

Revision implements the Collector interface.

func (*Mock) Source

func (mc *Mock) Source() config.SourceType

Source implements the Collector interface.

func (*Mock) WithEntries

func (mc *Mock) WithEntries(entries map[string]any) *Mock

WithEntries adds multiple key-value pairs to the collector.

func (*Mock) WithEntry

func (mc *Mock) WithEntry(keyPath config.KeyPath, value any) *Mock

WithEntry adds a key-value pair to the collector.

func (*Mock) WithKeepOrder

func (mc *Mock) WithKeepOrder(keep bool) *Mock

WithKeepOrder sets whether the collector preserves key order.

func (*Mock) WithName

func (mc *Mock) WithName(name string) *Mock

WithName sets a custom name for the collector.

func (*Mock) WithRevision

func (mc *Mock) WithRevision(rev config.RevisionType) *Mock

WithRevision sets the revision for the collector.

func (*Mock) WithSourceType

func (mc *Mock) WithSourceType(source config.SourceType) *Mock

WithSourceType sets the source type for the collector.

type Source

type Source struct {
	// contains filtered or unexported fields
}

Source represent data source with format.

func (*Source) KeepOrder

func (s *Source) KeepOrder() bool

KeepOrder implements Collector interface.

func (*Source) Name

func (s *Source) Name() string

Name implements Collector interface.

func (*Source) Read

func (s *Source) Read(ctx context.Context) <-chan config.Value

Read implements Collector interface.

func (*Source) Revision

func (s *Source) Revision() config.RevisionType

Revision implements Collector interface.

func (*Source) Source

func (s *Source) Source() config.SourceType

Source implements Collector interface.

type Storage

type Storage struct {
	// contains filtered or unexported fields
}

Storage implements config.Collector for reading multiple configuration documents from a key-value storage under a common prefix with integrity verification. Each key's value is parsed according to the given Format and merged into a single config tree. Key names are used only for distinguishing documents; the YAML content determines the tree structure.

func NewStorage

func NewStorage(
	typed *integrity.Typed[[]byte],
	prefix string,
	format Format,
) *Storage

NewStorage creates a new Storage collector that reads all keys under the prefix managed by the given integrity.Typed storage. Each key's value is parsed using the provided Format.

func (*Storage) Collectors

func (s *Storage) Collectors(ctx context.Context) ([]config.Collector, error)

Collectors implements config.MultiCollector. It performs a range query with the configured prefix, validates integrity, parses each key's value using the collector's Format, and returns one sub-collector per storage key. Each sub-collector is merged independently by the Builder with its own MergerContext, source name, and revision. The parent Storage's revision is updated to the maximum ModRevision among the fetched keys. Keys with empty values or parsing errors are skipped.

func (*Storage) KeepOrder

func (s *Storage) KeepOrder() bool

KeepOrder returns whether the collector preserves key order.

func (*Storage) Name

func (s *Storage) Name() string

Name returns the collector's name.

func (*Storage) Read

func (s *Storage) Read(ctx context.Context) <-chan config.Value

Read performs a range query with the configured prefix and emits all values from all documents on a single channel. This is a convenience method; the Builder uses Collectors for independent per-document merging.

func (*Storage) Revision

func (s *Storage) Revision() config.RevisionType

Revision returns the collector's current revision.

func (*Storage) Source

func (s *Storage) Source() config.SourceType

Source returns the collector's source type.

func (*Storage) Watch

func (s *Storage) Watch(ctx context.Context) (<-chan WatchEvent, error)

Watch implements the Watcher interface. It returns a channel that streams change events for the configured prefix in storage.

func (*Storage) WithDelimiter

func (s *Storage) WithDelimiter(delim string) *Storage

WithDelimiter sets the delimiter used to split storage keys into config path segments. The default is "/". If the delimiter differs from "/", it is replaced internally with "/" before constructing the KeyPath.

func (*Storage) WithKeepOrder

func (s *Storage) WithKeepOrder(keep bool) *Storage

WithKeepOrder sets whether the collector should preserve the order of keys as they appear in the storage range (default false).

func (*Storage) WithName

func (s *Storage) WithName(name string) *Storage

WithName sets a custom name prefix for the collector (default "storage"). The final SourceInfo.Name for each value will be "<name>:<prefix><key>", where <key> is the storage key from which the value was read. For example, WithName("etcd") with prefix "/config/" and key "app" produces SourceInfo.Name "etcd:/config/app".

func (*Storage) WithRevision

func (s *Storage) WithRevision(rev config.RevisionType) *Storage

WithRevision sets an initial revision for the collector. If not set, the revision will be derived from the highest ModRevision among the fetched keys after a successful Read.

func (*Storage) WithSourceType

func (s *Storage) WithSourceType(source config.SourceType) *Storage

WithSourceType sets the source type reported by the collector (default config.StorageSource).

type StorageSource

type StorageSource struct {
	// contains filtered or unexported fields
}

StorageSource implements DataSource for reading a single configuration document from a key-value storage with integrity verification. It uses the integrity layer's namer and validator to generate properly structured keys and verify hashes/signatures on the fetched value.

func NewStorageSource

func NewStorageSource(
	strg storage.Storage,
	prefix string,
	name string,
	hashers []hasher.Hasher,
	verifiers []crypto.Verifier,
) *StorageSource

NewStorageSource creates a new StorageSource that will read from the given storage the value identified by the logical name. The prefix determines the key namespace in the storage backend (e.g., "/config/").

func (*StorageSource) FetchStream

func (s *StorageSource) FetchStream(ctx context.Context) (io.ReadCloser, error)

FetchStream performs a transactional Get for the configured name with integrity verification and returns an io.ReadCloser over the raw value bytes. If the key is not found, ErrStorageKeyNotFound is returned. Storage errors are wrapped with ErrStorageFetch. Integrity verification errors are wrapped with ErrStorageValidation. The revision is updated on success.

func (*StorageSource) Name

func (s *StorageSource) Name() string

Name returns the fixed label "storage".

func (*StorageSource) Revision

func (s *StorageSource) Revision() config.RevisionType

Revision returns the modification revision of the last successfully fetched value. The revision is a string representation of the storage's ModRevision.

func (*StorageSource) SourceType

func (s *StorageSource) SourceType() config.SourceType

SourceType returns config.StorageSource.

func (*StorageSource) Watch

func (s *StorageSource) Watch(ctx context.Context) (<-chan WatchEvent, error)

Watch implements the Watcher interface. It returns a channel that streams change events for the configured name in storage.

type WatchEvent

type WatchEvent struct {
	// Prefix indicates the key or prefix that was changed.
	Prefix string
}

WatchEvent represents a change notification from storage.

type Watcher

type Watcher interface {
	// Watch returns a channel that streams change events for the collector's
	// key or prefix. The channel is closed when the context is cancelled.
	Watch(ctx context.Context) (<-chan WatchEvent, error)
}

Watcher provides reactive change notifications from a storage backend. Collectors that support watching for changes implement this interface in addition to the standard Collector interface.

type YamlFormat

type YamlFormat struct {
	// contains filtered or unexported fields
}

YamlFormat implements Format interface.

func (YamlFormat) From

func (y YamlFormat) From(reader io.Reader) Format

From implements the Format interface.

func (YamlFormat) KeepOrder

func (y YamlFormat) KeepOrder() bool

KeepOrder implements the Format interface.

func (YamlFormat) Name

func (y YamlFormat) Name() string

Name implements the Format interface.

func (YamlFormat) Parse

func (y YamlFormat) Parse() (*tree.Node, error)

Parse implements the Format interface.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL