stages

package
v0.1.7-alpha Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 1, 2023 License: MIT Imports: 7 Imported by: 0

Documentation

Index

Constants

View Source
const SleepTransportUnavailable = 3 * time.Second

SleepTransportUnavailable is how long to sleep between delivery attempts.

Variables

View Source
var ErrUnknownMessageFormat = errors.New("unknown message log format")

Functions

func StageInit

func StageInit(stage Stage, parallelism int)

StageInit initializes parallelism stage workers, then waits for the stage to close.

Types

type MetricsCollector

type MetricsCollector interface {
	IncrementHTTPRequestCount(podName, method, service, path string, status int)
	IncrementHTTPRequestsTotalCount(service string)
	ObserveHTTPRequestTime(podName, method, service, path string, value float64)
	ObserveHTTPUpstreamResponseTimeTotal(podName, method, service, path string, value float64)
}

MetricsCollector is the metrics collector interface used by the SLI parsing stage.

type ParserFunction

type ParserFunction func([]byte) (common.EntryMap, error)

type ParserFunctionDefault

type ParserFunctionDefault func([]byte) common.EntryMap

type ParserSLI

type ParserSLI interface {
	Parse(entryMap common.EntryMap)
}

ParserSLI may perform additional operations on a parsed entry.

type Stage

type Stage interface {
	InitWorker()
	Close()
}

Stage is the interface a stage implements so that it can be run by StageInit.

type StageFiltering

type StageFiltering struct {
	// contains filtered or unexported fields
}

StageFiltering filters messages from input

func NewStageFiltering

func NewStageFiltering(input <-chan common.EntryMap, userLogField string, logger logging.Logger) *StageFiltering

NewStageFiltering is a StageFiltering constructor

func (*StageFiltering) Close

func (s *StageFiltering) Close()

Close closes the stage output after its workers finish

func (*StageFiltering) InitWorker

func (s *StageFiltering) InitWorker()

InitWorker starts stage worker

func (*StageFiltering) Out

func (s *StageFiltering) Out() <-chan common.EntryMap

Out is stage output accessor

type StageJSONMarshalling

type StageJSONMarshalling struct {
	// contains filtered or unexported fields
}

StageJSONMarshalling marshals messages from input to JSON and sends them to output

func NewStageJSONMarshalling

func NewStageJSONMarshalling(input <-chan common.EntryMap, logger logging.Logger) *StageJSONMarshalling

NewStageJSONMarshalling is a constructor for StageJSONMarshalling

func (*StageJSONMarshalling) Close

func (s *StageJSONMarshalling) Close()

Close closes the stage output after its workers finish

func (*StageJSONMarshalling) InitWorker

func (s *StageJSONMarshalling) InitWorker()

InitWorker starts stage worker

func (*StageJSONMarshalling) Out

func (s *StageJSONMarshalling) Out() <-chan string

Out is stage output accessor

type StageParsingEntry

type StageParsingEntry struct {
	// contains filtered or unexported fields
}

StageParsingEntry parses lines using given parser, extends resulting map with metadata and sends it downstream

func NewStageParsingEntry

func NewStageParsingEntry(input <-chan *common.Entry, parseDocker, parseContainerD ParserFunction,
	parserDefault ParserFunctionDefault, extendsField string, logger logging.Logger) *StageParsingEntry

NewStageParsingEntry is a StageParsingEntry constructor

func (*StageParsingEntry) Close

func (s *StageParsingEntry) Close()

Close closes the stage output after its workers finish

func (*StageParsingEntry) InitWorker

func (s *StageParsingEntry) InitWorker()

InitWorker starts stage worker

func (*StageParsingEntry) Out

func (s *StageParsingEntry) Out() <-chan common.EntryMap

Out is stage output accessor

type StageParsingSLI

type StageParsingSLI struct {
	// contains filtered or unexported fields
}

StageParsingSLI checks whether a message from its input is an SLI message, modifies metrics, then sends the message downstream.

func NewStageParsingSLI

func NewStageParsingSLI(input <-chan common.EntryMap, userLogField string, parser ParserSLI, logger logging.Logger) *StageParsingSLI

NewStageParsingSLI is a StageParsingSLI constructor

func (*StageParsingSLI) Close

func (s *StageParsingSLI) Close()

Close closes the stage output after its workers finish

func (*StageParsingSLI) InitWorker

func (s *StageParsingSLI) InitWorker()

InitWorker starts stage worker

func (*StageParsingSLI) Out

func (s *StageParsingSLI) Out() <-chan common.EntryMap

Out is stage output accessor

type StageTransport

type StageTransport struct {
	// contains filtered or unexported fields
}

StageTransport stacks messages from input into batches and tries to send them to transport

func NewStageTransport

func NewStageTransport(input <-chan string, transportClient TransportClient,
	bufferSizeMax int, flushInterval time.Duration, logger logging.Logger) *StageTransport

NewStageTransport is a StageTransport constructor

func (*StageTransport) Close

func (s *StageTransport) Close()

Close waits for its workers to finish

func (*StageTransport) InitWorker

func (s *StageTransport) InitWorker()

InitWorker starts stage worker

type TransportClient

type TransportClient interface {
	DeliverMessages(messages []string) error
}

TransportClient is a transport interface for transport stage

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL