pipelaner

package module
v0.0.3 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 6, 2024 License: Apache-2.0 Imports: 26 Imported by: 0

README

Go Lint Go Test

pipelaner

Documentation

Index

Constants

This section is empty.

Variables

View Source
// Package-level error values created with errors.New. Each message is the
// same text as the variable's identifier.
// NOTE(review): the conditions that return these errors are not visible in
// this listing; the per-variable notes below are inferred from the names.
var (
	// ErrNameNotBeEmptyString presumably reports an empty name — TODO confirm.
	ErrNameNotBeEmptyString = errors.New("ErrNameNotBeEmptyString")
	// ErrEnvNotFound presumably reports a missing environment value — TODO confirm.
	ErrEnvNotFound          = errors.New("ErrEnvNotFound")
)
View Source
// Package-level error values created with errors.New. Each message is the
// same text as the variable's identifier.
// NOTE(review): judging by the names these relate to configuration and lane
// lookup failures; the code that returns them is not visible here — confirm.
var (
	ErrInputsNotFound       = errors.New("ErrInputsNotFound")
	ErrUnknownItem          = errors.New("ErrUnknownItem")
	ErrLaneNameMustBeUnique = errors.New("ErrLaneNameMustBeUnique")
	ErrInvalidConfig        = errors.New("ErrInvalidConfig")
	ErrUnknownGenerator     = errors.New("ErrUnknownGenerator")
	ErrUnknownMap           = errors.New("ErrUnknownMap")
	ErrUnknownSink          = errors.New("ErrUnknownSink")
)
View Source
// ErrLaneIsStopped is a package-level error value created with errors.New;
// its message is the same text as its identifier.
// NOTE(review): presumably returned when a stopped lane is used — confirm
// against the implementation, which is not visible here.
var (
	ErrLaneIsStopped = errors.New("ErrLaneIsStopped")
)

Functions

func ErrLaneWithoutSink

func ErrLaneWithoutSink(s string) error

func NewLogger

func NewLogger() zerolog.Logger

func ReadToml

func ReadToml(file string) (map[string]any, error)

func RegisterGenerator

func RegisterGenerator(name string, generators Generator)

func RegisterMap

func RegisterMap(name string, maps Map)

func RegisterSink

func RegisterSink(name string, sink Sink)

Types

type Agent

type Agent struct {
	// contains filtered or unexported fields
}

func NewAgent

func NewAgent(
	file string,
) (*Agent, error)

func (*Agent) Serve

func (a *Agent) Serve()

func (*Agent) Stop

func (a *Agent) Stop()

type BaseLaneConfig

// BaseLaneConfig holds the settings of a single lane. Every named field
// carries a `pipelane` struct tag; the tag value is presumably the key the
// field is read from in the configuration — TODO confirm against the decoder.
// The Internal struct is embedded, so its exported fields (Name, LaneType,
// Extended) are promoted onto BaseLaneConfig.
type BaseLaneConfig struct {
	// OutputBufferSize is tagged "output_buffer".
	OutputBufferSize           int64    `pipelane:"output_buffer"`
	// Threads is tagged "threads".
	Threads                    int64    `pipelane:"threads"`
	// StartGCAfterMessageProcess is tagged "start_gc_after_message_process".
	StartGCAfterMessageProcess bool     `pipelane:"start_gc_after_message_process"`
	// SourceName is tagged "source_name".
	SourceName                 string   `pipelane:"source_name"`
	// Inputs is tagged "inputs".
	// NOTE(review): presumably the names of the lanes this lane reads from — confirm.
	Inputs                     []string `pipelane:"inputs"`
	Internal
}

func NewBaseConfig

func NewBaseConfig(val map[string]any) (*BaseLaneConfig, error)

func NewBaseConfigWithTypeAndExtended

func NewBaseConfigWithTypeAndExtended(
	itemType LaneTypes,
	name string,
	extended map[string]any,
) (*BaseLaneConfig, error)

func (*BaseLaneConfig) ParseExtended

func (c *BaseLaneConfig) ParseExtended(v any) error

type Config

// Config groups the top-level configuration into three untyped maps: Input,
// Map and Sink. The tag values match the LaneTypes constants "input", "map"
// and "sink".
//
// NOTE(review): these fields use the struct tag key `pipeline`, while
// BaseLaneConfig and Internal use `pipelane`. Whether the difference is
// intentional cannot be determined from this listing — confirm which tag key
// the decoder actually reads.
type Config struct {
	Input map[string]any `pipeline:"input"`
	Map   map[string]any `pipeline:"map"`
	Sink  map[string]any `pipeline:"sink"`
	// contains filtered or unexported fields
}

func NewConfig

func NewConfig(c map[string]any) (*Config, error)

func NewConfigFromFile

func NewConfigFromFile(file string) (*Config, error)

type Context

type Context struct {
	// contains filtered or unexported fields
}

func NewContext

func NewContext(ctx context.Context, laneItem *LaneItem) *Context

func (*Context) Context

func (c *Context) Context() context.Context

func (*Context) LaneItem

func (c *Context) LaneItem() *LaneItem

func (*Context) LaneName

func (c *Context) LaneName() string

func (*Context) LaneType

func (c *Context) LaneType() LaneTypes

func (*Context) Logger

func (c *Context) Logger() zerolog.Logger

func (*Context) ReturnValue

func (c *Context) ReturnValue(value any) error

func (*Context) SourceName

func (c *Context) SourceName() string

func (*Context) Value

func (c *Context) Value() any

type Generator

// Generator is the interface registered through RegisterGenerator. It embeds
// Init, so an implementation must also provide Init(ctx *Context) error.
type Generator interface {
	Init
	// Generate receives the lane Context and a send-only channel.
	// NOTE(review): presumably implementations send produced values on
	// input — confirm against the caller.
	Generate(ctx *Context, input chan<- any)
}

type Generators

// Generators maps a string key to a Generator.
// NOTE(review): the key is presumably the name passed to RegisterGenerator — confirm.
type Generators map[string]Generator

type HealthCheck

type HealthCheck struct {
	// contains filtered or unexported fields
}

func NewHealthCheck

func NewHealthCheck(conf Config) (*HealthCheck, error)

func (*HealthCheck) Serve

func (p *HealthCheck) Serve()

type Init

// Init is the initialization interface embedded by Generator, Map and Sink.
type Init interface {
	// Init receives the lane Context and returns an error.
	// NOTE(review): when it is called relative to Generate/Map/Sink is not
	// visible in this listing — confirm.
	Init(ctx *Context) error
}

type Internal

// Internal is embedded in BaseLaneConfig. All of its exported fields are
// tagged `pipelane:"-"`.
// NOTE(review): by the usual struct-tag convention "-" excludes a field from
// decoding, which would mean these are set programmatically — confirm.
type Internal struct {
	Name     string    `pipelane:"-"`
	LaneType LaneTypes `pipelane:"-"`
	Extended any       `pipelane:"-"`
	// contains filtered or unexported fields
}

type LaneItem

type LaneItem struct {
	// contains filtered or unexported fields
}

func NewLaneItem

func NewLaneItem(
	config *BaseLaneConfig,
	metrics bool,
) *LaneItem

func (*LaneItem) Config

func (l *LaneItem) Config() *BaseLaneConfig

func (*LaneItem) Subscribe

func (l *LaneItem) Subscribe(output *LaneItem)

type LaneTypes

// LaneTypes is a string type identifying the kind of a lane.
type LaneTypes string
// The three declared lane kinds. Their values match the tag values on the
// Config fields Input, Map and Sink.
const (
	InputType LaneTypes = "input"
	MapType   LaneTypes = "map"
	SinkType  LaneTypes = "sink"
)

type LogFormat

// LogFormat is a string type naming a log output format.
type LogFormat string
// The two declared log formats.
// NOTE(review): where a LogFormat value is consumed is not visible in this
// listing — confirm.
const (
	LogFormatPlain LogFormat = "plain"
	LogFormatJSON  LogFormat = "json"
)

type Map

// Map is the interface registered through RegisterMap. It embeds Init, so an
// implementation must also provide Init(ctx *Context) error.
type Map interface {
	Init
	// Map receives the lane Context and a value, and returns a value.
	// NOTE(review): presumably the returned value is forwarded downstream;
	// how a nil result is treated is not visible here — confirm.
	Map(ctx *Context, val any) any
}

type Maps

// Maps maps a string key to a Map.
// NOTE(review): the key is presumably the name passed to RegisterMap — confirm.
type Maps map[string]Map

type MethodGenerator

// MethodGenerator is a function type with the same signature as
// Generator.Generate.
type MethodGenerator func(ctx *Context, input chan<- any)

type MethodMap

// MethodMap is a function type with the same signature as Map.Map.
type MethodMap func(ctx *Context, val any) any

type MethodSink

// MethodSink is a function type with the same signature as Sink.Sink.
type MethodSink func(ctx *Context, val any)

type MetricsServer

type MetricsServer struct {
	// contains filtered or unexported fields
}

func NewMetricsServer

func NewMetricsServer(cfg Config) (*MetricsServer, error)

func (*MetricsServer) Serve

func (m *MetricsServer) Serve() error

type Sink

// Sink is the interface registered through RegisterSink. It embeds Init, so
// an implementation must also provide Init(ctx *Context) error.
type Sink interface {
	Init
	// Sink receives the lane Context and a value; it returns nothing.
	Sink(ctx *Context, val any)
}

type Sinks

// Sinks maps a string key to a Sink.
// NOTE(review): the key is presumably the name passed to RegisterSink — confirm.
type Sinks map[string]Sink

type TreeLanes

// TreeLanes is the value returned by NewTreeFrom and NewTreeFromConfig.
type TreeLanes struct {
	// Items maps a string key to a *LaneItem.
	// NOTE(review): presumably keyed by lane name — confirm.
	Items map[string]*LaneItem
}

func NewTreeFrom

func NewTreeFrom(
	ctx context.Context,
	file string,
) (*TreeLanes, error)

func NewTreeFromConfig

func NewTreeFromConfig(
	ctx context.Context,
	cfg *Config,
) (*TreeLanes, error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL