package input
Version: v1.12.3
Published: Dec 18, 2025 License: MIT Imports: 21 Imported by: 1

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

func ConvertToJSONCompatible added in v1.12.0

func ConvertToJSONCompatible(input any) any

ConvertToJSONCompatible recursively converts map[any]any to map[string]any for JSON compatibility
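
The conversion matters because encoding/json refuses to marshal map[any]any. The sketch below is a minimal stand-in for that idea, not the package's source: the name toJSONCompatible, the use of fmt.Sprint for non-string keys, and the handling of []any are all assumptions made for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// toJSONCompatible is a hypothetical helper that walks a decoded config
// tree and rewrites every map[any]any as map[string]any.
// Non-string keys are stringified with fmt.Sprint (an assumption).
func toJSONCompatible(input any) any {
	switch v := input.(type) {
	case map[any]any:
		out := make(map[string]any, len(v))
		for k, val := range v {
			out[fmt.Sprint(k)] = toJSONCompatible(val)
		}
		return out
	case map[string]any:
		out := make(map[string]any, len(v))
		for k, val := range v {
			out[k] = toJSONCompatible(val)
		}
		return out
	case []any:
		out := make([]any, len(v))
		for i, val := range v {
			out[i] = toJSONCompatible(val)
		}
		return out
	default:
		// Scalars and other types are returned unchanged.
		return v
	}
}

func main() {
	cfg := map[any]any{
		"codec": "json",
		"topic": map[any]any{"weblog": 1},
	}
	b, err := json.Marshal(toJSONCompatible(cfg))
	if err != nil {
		fmt.Println("marshal failed:", err)
		return
	}
	fmt.Println(string(b)) // {"codec":"json","topic":{"weblog":1}}
}
```

Passing cfg to json.Marshal without the conversion step fails with an unsupported-type error, which is the situation this kind of helper is meant to avoid.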

func GetInput

func GetInput(inputType string, config map[any]any) topology.Input

GetInput returns a topology.Input from the built-in plugins or from a third-party plugin.

func Register

func Register(inputType string, bf BuildInputFunc)

Register is used by input plugins to register themselves
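
Register and GetInput together suggest a name-to-constructor registry. The following is a self-contained sketch of that pattern, not the package's code: the local Input interface only mirrors the two methods this page attributes to topology.Input, and register, getInput, counterInput, the "Counter" type name, and the nil return for unknown types are all hypothetical.

```go
package main

import "fmt"

// Input mirrors the two methods this page attributes to topology.Input.
type Input interface {
	ReadOneEvent() map[string]any
	Shutdown()
}

// buildInputFunc has the same shape as BuildInputFunc on this page.
type buildInputFunc func(map[any]any) Input

// registry maps an input type name to its constructor (hypothetical).
var registry = map[string]buildInputFunc{}

// register stores a constructor under a type name.
func register(inputType string, bf buildInputFunc) {
	registry[inputType] = bf
}

// getInput builds an input by name. Returning nil for an unknown
// type is an assumption of this sketch.
func getInput(inputType string, config map[any]any) Input {
	bf, ok := registry[inputType]
	if !ok {
		return nil
	}
	return bf(config)
}

// counterInput is a hypothetical plugin that emits limit events, then nil.
type counterInput struct{ n, limit int }

func (c *counterInput) ReadOneEvent() map[string]any {
	if c.n >= c.limit {
		return nil
	}
	c.n++
	return map[string]any{"count": c.n}
}

func (c *counterInput) Shutdown() {}

// Plugins typically register themselves from init.
func init() {
	register("Counter", func(config map[any]any) Input {
		limit, _ := config["limit"].(int)
		return &counterInput{limit: limit}
	})
}

func main() {
	in := getInput("Counter", map[any]any{"limit": 2})
	for ev := in.ReadOneEvent(); ev != nil; ev = in.ReadOneEvent() {
		fmt.Println(ev["count"])
	}
	in.Shutdown()
}
```

Registering from init keeps the lookup table free of any compile-time knowledge of individual plugins; importing a plugin package is enough to make its type name available.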

func SafeDecodeConfig added in v1.12.0

func SafeDecodeConfig(inputType string, config map[any]any, result any)

SafeDecodeConfig safely decodes input configuration using encoding/json. This provides detailed error messages from the standard library.
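
One plausible way to decode a map[any]any config through encoding/json is a marshal and unmarshal round trip into a typed struct. The sketch below illustrates that approach under stated assumptions: decodeConfig, normalize, and stdinConfig are hypothetical names, and unlike the signature above the sketch returns an error, since this page does not say how the real function reports failures.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// stdinConfig copies the shape of StdinConfig from this page.
type stdinConfig struct {
	Codec string `json:"codec"`
}

// normalize rewrites map[any]any as map[string]any so json.Marshal accepts it.
func normalize(v any) any {
	switch t := v.(type) {
	case map[any]any:
		out := make(map[string]any, len(t))
		for k, val := range t {
			out[fmt.Sprint(k)] = normalize(val)
		}
		return out
	case []any:
		out := make([]any, len(t))
		for i, val := range t {
			out[i] = normalize(val)
		}
		return out
	default:
		return v
	}
}

// decodeConfig is a hypothetical round trip: config -> JSON -> result.
// Errors from encoding/json are wrapped with the input type name.
func decodeConfig(inputType string, config map[any]any, result any) error {
	raw, err := json.Marshal(normalize(config))
	if err != nil {
		return fmt.Errorf("%s input: encode config: %w", inputType, err)
	}
	if err := json.Unmarshal(raw, result); err != nil {
		return fmt.Errorf("%s input: decode config: %w", inputType, err)
	}
	return nil
}

func main() {
	var ok stdinConfig
	if err := decodeConfig("Stdin", map[any]any{"codec": "json"}, &ok); err != nil {
		fmt.Println("unexpected error:", err)
		return
	}
	fmt.Println("codec:", ok.Codec)

	// A wrongly typed field surfaces the standard library's type error.
	var bad stdinConfig
	err := decodeConfig("Stdin", map[any]any{"codec": 42}, &bad)
	fmt.Println("error:", err)
}
```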

func ValidateRequiredFields added in v1.12.0

func ValidateRequiredFields(inputType string, fields map[string]any)

ValidateRequiredFields checks that required fields are present in the decoded config
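
This page does not define what counts as "present". As a rough sketch, the check can be modeled as scanning a name-to-value map for zero values. Everything here is an assumption: the names missingFields and validateRequired, the zero-value rule, and the returned error (the signature above returns nothing).

```go
package main

import (
	"fmt"
	"reflect"
	"sort"
	"strings"
)

// missingFields returns the sorted names whose values are nil or the
// zero value of their type. Note that under this rule false and 0
// also count as missing, which may not suit every field.
func missingFields(fields map[string]any) []string {
	var missing []string
	for name, value := range fields {
		if value == nil || reflect.ValueOf(value).IsZero() {
			missing = append(missing, name)
		}
	}
	sort.Strings(missing)
	return missing
}

// validateRequired is a hypothetical wrapper that turns the list into an error.
func validateRequired(inputType string, fields map[string]any) error {
	if missing := missingFields(fields); len(missing) > 0 {
		return fmt.Errorf("%s input: missing required fields: %s",
			inputType, strings.Join(missing, ", "))
	}
	return nil
}

func main() {
	var topic map[string]int // nil map: never set in the config
	err := validateRequired("Kafka", map[string]any{
		"codec": "json",
		"topic": topic,
	})
	fmt.Println(err) // Kafka input: missing required fields: topic
}
```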

Types

type BuildInputFunc

type BuildInputFunc func(map[any]any) topology.Input

type InputBox

type InputBox struct {
	// contains filtered or unexported fields
}

func NewInputBox

func NewInputBox(input topology.Input, inputConfig map[any]any, config map[string]any, exit func()) *InputBox

func (*InputBox) Beat

func (box *InputBox) Beat(worker int)

Beat starts the processors and waits until shutdown.

func (*InputBox) SetShutdownWhenNil

func (box *InputBox) SetShutdownWhenNil(shutdownWhenNil bool)

SetShutdownWhenNil is used for benchmarks. The gohangout main thread would exit when one input box receives a nil message, such as Ctrl-D on the Stdin input.
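
To illustrate the nil-message convention, here is a small sketch of a read loop that treats a nil event as end of input only when asked to. It is not InputBox's implementation: the local Input interface, sliceInput, drain, and the maxReads bound (added only so the demo terminates) are hypothetical.

```go
package main

import "fmt"

// Input lists the two methods this page attributes to topology.Input.
type Input interface {
	ReadOneEvent() map[string]any
	Shutdown()
}

// sliceInput is a hypothetical input that replays a fixed list of events
// and then returns nil, much like stdin reaching end of input.
type sliceInput struct {
	events []map[string]any
	closed bool
}

func (s *sliceInput) ReadOneEvent() map[string]any {
	if len(s.events) == 0 {
		return nil
	}
	ev := s.events[0]
	s.events = s.events[1:]
	return ev
}

func (s *sliceInput) Shutdown() { s.closed = true }

// drain reads at most maxReads times. A nil event ends the loop only when
// shutdownWhenNil is set; otherwise it is skipped and reading continues.
func drain(in Input, shutdownWhenNil bool, maxReads int) (handled int) {
	for i := 0; i < maxReads; i++ {
		ev := in.ReadOneEvent()
		if ev == nil {
			if shutdownWhenNil {
				in.Shutdown()
				return handled
			}
			continue
		}
		handled++
	}
	return handled
}

func main() {
	in := &sliceInput{events: []map[string]any{{"message": "a"}, {"message": "b"}}}
	n := drain(in, true, 10)
	fmt.Println(n, in.closed) // 2 true
}
```

A benchmark run benefits from this kind of switch because the process ends as soon as the input is exhausted, so the elapsed time reflects only the work done on the events.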

func (*InputBox) Shutdown

func (box *InputBox) Shutdown()

Shutdown shuts down the inputs and outputs.

type KafkaInput

type KafkaInput struct {
	// contains filtered or unexported fields
}

func (*KafkaInput) ReadOneEvent

func (p *KafkaInput) ReadOneEvent() map[string]any

ReadOneEvent implements the method in topology.Input. gohangout calls this method to get one event and pass it to a filter or output.

func (*KafkaInput) Shutdown

func (p *KafkaInput) Shutdown()

Shutdown implements the method in topology.Input. It closes all consumers.

type KafkaInputConfig added in v1.12.0

type KafkaInputConfig struct {
	Codec               string           `json:"codec"`
	DecorateEvents      bool             `json:"decorate_events"`
	Topic               map[string]int   `json:"topic"`
	Assign              map[string][]int `json:"assign"`
	MessagesQueueLength int              `json:"messages_queue_length"`
	ConsumerSettings    map[string]any   `json:"consumer_settings"`
}

KafkaInputConfig defines the configuration structure for Kafka input
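
The json tags above determine which configuration keys map to which fields. The sketch below copies the struct as listed and decodes a sample document with encoding/json. The sample values are invented for illustration: the topic name, the partition numbers, and the consumer_settings keys are assumptions, and this page does not state what the integer values in topic mean. parseKafkaConfig is a hypothetical helper.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// KafkaInputConfig is copied from the type definition on this page.
type KafkaInputConfig struct {
	Codec               string           `json:"codec"`
	DecorateEvents      bool             `json:"decorate_events"`
	Topic               map[string]int   `json:"topic"`
	Assign              map[string][]int `json:"assign"`
	MessagesQueueLength int              `json:"messages_queue_length"`
	ConsumerSettings    map[string]any   `json:"consumer_settings"`
}

// sample is an invented configuration used only to exercise the tags.
const sample = `{
  "codec": "json",
  "decorate_events": true,
  "topic": {"weblog": 2},
  "assign": {"weblog": [0, 1]},
  "messages_queue_length": 10,
  "consumer_settings": {"group.id": "demo"}
}`

// parseKafkaConfig decodes raw JSON into the struct.
func parseKafkaConfig(raw []byte) (KafkaInputConfig, error) {
	var cfg KafkaInputConfig
	err := json.Unmarshal(raw, &cfg)
	return cfg, err
}

func main() {
	cfg, err := parseKafkaConfig([]byte(sample))
	if err != nil {
		fmt.Println("decode failed:", err)
		return
	}
	fmt.Println(cfg.Codec, cfg.Topic["weblog"], cfg.Assign["weblog"], cfg.MessagesQueueLength)
}
```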

type RandomInput

type RandomInput struct {
	// contains filtered or unexported fields
}

func (*RandomInput) ReadOneEvent

func (p *RandomInput) ReadOneEvent() map[string]any

func (*RandomInput) Shutdown

func (p *RandomInput) Shutdown()

type StdinConfig added in v1.12.0

type StdinConfig struct {
	Codec string `json:"codec"`
}

StdinConfig defines the configuration structure for Stdin input

type StdinInput

type StdinInput struct {
	// contains filtered or unexported fields
}

func (*StdinInput) ReadOneEvent

func (p *StdinInput) ReadOneEvent() map[string]any

func (*StdinInput) Shutdown

func (p *StdinInput) Shutdown()

type TCPInput

type TCPInput struct {
	// contains filtered or unexported fields
}

func (*TCPInput) ReadOneEvent

func (p *TCPInput) ReadOneEvent() map[string]any

func (*TCPInput) Shutdown

func (p *TCPInput) Shutdown()

type UDPInput

type UDPInput struct {
	// contains filtered or unexported fields
}

func (*UDPInput) ReadOneEvent

func (p *UDPInput) ReadOneEvent() map[string]any

func (*UDPInput) Shutdown

func (p *UDPInput) Shutdown()
