inputs

package
v0.44.0 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 17, 2026 License: Apache-2.0 Imports: 9 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
// InputTypes lists the input type names: "nats", "kafka" and "jetstream".
var InputTypes = []string{"nats", "kafka", "jetstream"}
View Source
// Inputs maps a name to an Initializer. It is created empty and non-nil.
// NOTE(review): presumably Register is what populates it — confirm against
// the Register implementation, which is not visible here.
var Inputs = make(map[string]Initializer)

Functions

func Register

func Register(name string, initFn Initializer)

func UpdateProcessorInSlice added in v0.43.0

func UpdateProcessorInSlice(
	logger *log.Logger,
	storeObj store.Store[any],
	eventProcessors []string,
	currentEvps []formatters.EventProcessor,
	processorName string,
	pcfg map[string]any,
) ([]formatters.EventProcessor, bool, error)

Types

type BaseInput added in v0.43.0

// BaseInput is an empty struct. The methods declared on *BaseInput (Start,
// Validate, Update, UpdateProcessor and Close) have the same signatures as
// the methods of the Input interface.
// NOTE(review): presumably intended to be embedded by concrete inputs as a
// default implementation — confirm against the method bodies.
type BaseInput struct{}

func (*BaseInput) Close added in v0.43.0

func (b *BaseInput) Close() error

func (*BaseInput) Start added in v0.43.0

func (b *BaseInput) Start(context.Context, string, map[string]any, ...Option) error

func (*BaseInput) Update added in v0.43.0

func (b *BaseInput) Update(map[string]any) error

func (*BaseInput) UpdateProcessor added in v0.43.0

func (b *BaseInput) UpdateProcessor(string, map[string]any) error

func (*BaseInput) Validate added in v0.43.0

func (b *BaseInput) Validate(map[string]any) error

type Initializer

// Initializer is a function that takes no arguments and returns an Input.
// Values of this type are stored in the Inputs map and accepted by Register.
type Initializer func() Input

type Input

// Input is the interface an input implements: it can be started, have its
// configuration validated and updated, have a named processor updated, and
// be closed.
// NOTE(review): the string argument of Start and UpdateProcessor is presumably
// a name, and the map[string]any arguments presumably carry configuration —
// confirm against an implementation.
type Input interface {
	// Start initializes the input and starts it.
	Start(context.Context, string, map[string]any, ...Option) error
	// Validate validates the input configuration.
	Validate(map[string]any) error
	// Update updates the input configuration in place for
	// a running input.
	Update(map[string]any) error
	// UpdateProcessor updates the named processor configuration
	// for a running input.
	// If the processor is not used by the Input, it will be ignored.
	UpdateProcessor(string, map[string]any) error
	// Close stops the input.
	Close() error
}

type InputOptions added in v0.42.1

// InputOptions groups the optional settings of an input. Option functions
// receive a pointer to it.
// NOTE(review): the fields appear to correspond to WithLogger, WithOutputs,
// WithName, WithConfigStore and WithPipeline respectively — confirm against
// the implementations of those functions.
type InputOptions struct {
	Logger   *log.Logger
	// Outputs is keyed by string; presumably the output name — confirm.
	Outputs  map[string]outputs.Output
	Name     string
	Store    store.Store[any]
	// Pipeline is a channel of *pipeline.Msg values.
	Pipeline chan *pipeline.Msg
}

type Option

// Option is a function that receives a pointer to an InputOptions and
// returns an error. Input.Start accepts a variadic list of Options.
type Option func(*InputOptions) error

func WithConfigStore added in v0.43.0

func WithConfigStore(st store.Store[any]) Option

func WithLogger

func WithLogger(logger *log.Logger) Option

func WithName

func WithName(name string) Option

func WithOutputs

func WithOutputs(outs map[string]outputs.Output) Option

func WithPipeline added in v0.43.0

func WithPipeline(pipeline chan *pipeline.Msg) Option

type PipeMessage added in v0.43.0

// PipeMessage is an interface made of four accessor methods: Proto, Meta,
// Events and Outputs.
// NOTE(review): presumably the message type exchanged over an input's
// pipeline — confirm against the pipeline package.
type PipeMessage interface {
	// Proto returns a proto.Message.
	Proto() proto.Message
	// Meta returns an outputs.Meta value.
	Meta() outputs.Meta
	// Events returns a slice of *formatters.EventMsg.
	Events() []*formatters.EventMsg
	// Outputs returns a set of strings; presumably output names — confirm.
	Outputs() map[string]struct{}
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL