pipe

package
v0.1.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 30, 2023 License: MIT Imports: 16 Imported by: 6

Documentation

Overview

Package pipe provides BGP message processing with callbacks.

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrInClosed  = errors.New("input channel closed")
	ErrOutClosed = errors.New("output channel closed")
	ErrStopped   = errors.New("pipe stopped")
)
View Source
var (
	// pipe has finished starting
	EVENT_START = "bgpfix/pipe.START"

	// pipe is about to stop
	EVENT_STOP = "bgpfix/pipe.STOP"

	// could not parse the message before its callback
	EVENT_PARSE = "bgpfix/pipe.PARSE"

	// valid OPEN with a bigger message timestamp (seconds) made it to output
	EVENT_OPEN = "bgpfix/pipe.OPEN"

	// KEEPALIVE with a bigger message timestamp (seconds) made it to output
	EVENT_ALIVE = "bgpfix/pipe.ALIVE"

	// UPDATE with a bigger message timestamp (seconds) made it to output
	EVENT_UPDATE = "bgpfix/pipe.UPDATE"

	// session established (OPEN+KEEPALIVE made it to both sides)
	EVENT_ESTABLISHED = "bgpfix/pipe.ESTABLISHED"
)

a collection of events generated internally by pipe

View Source
var DefaultOptions = Options{
	Logger: &log.Logger,
	Caps:   true,
}

Default BGP pipe options

Functions

This section is empty.

Types

type Action

type Action byte

Action corresponds to m.Action values

const (
	// The default, zero action: keep processing as-is.
	ACTION_OK Action = 0

	// Keep the message for later use, do not re-use its memory.
	//
	// You must use this if you wish to re-inject the message,
	// or keep reference to some value inside the msg.
	//
	// Once set, you must not remove this action from a message
	// unless you know you are the sole owner of this message.
	ACTION_BORROW Action = 1 << iota

	// Drop the message immediately from the pipe.
	//
	// If you want to re-inject the message later, set ACTION_BORROW too,
	// and keep in mind the message will try to re-start after where
	// you dropped it, unless you call Context.Clear on it.
	ACTION_DROP

	// Accept the message immediately and write to pipe output.
	ACTION_ACCEPT
)

func (*Action) Add

func (ac *Action) Add(a Action)

Add adds a to action ac

func (*Action) Clear

func (ac *Action) Clear()

Clear clears all bits except for BORROW

func (Action) Is

func (ac Action) Is(a Action) bool

Is returns true iff a is set in ac

func (Action) Not

func (ac Action) Not(a Action) bool

IsNot returns true iff a is NOT set in ac

type Callback

type Callback struct {
	Id      int          // optional callback id number (zero means none)
	Name    string       // optional name
	Order   int          // the lower the order, the sooner callback is run
	Enabled *atomic.Bool // if non-nil, disables the callback unless true

	Pre  bool // run before non-pre callbacks?
	Raw  bool // if true, do not parse the message (which may already be parsed, but for other reasons)
	Post bool // run after non-post callbacks?

	Dir   msg.Dir      // if non-zero, limits the direction
	Types []msg.Type   // if non-empty, limits message types
	Func  CallbackFunc // the function to call
}

Callback represents a function to call for matching BGP messages

type CallbackFunc

type CallbackFunc func(m *msg.Msg) (add_action Action)

CallbackFunc processes message m. Optionally returns an Action to add to m's pipe.Context.Action.

type Event

type Event struct {
	Pipe *Pipe     `json:"-"`              // parent pipe
	Seq  uint64    `json:"seq,omitempty"`  // event sequence number
	Time time.Time `json:"time,omitempty"` // event timestamp

	Type  string   `json:"type"`  // type, usually "lib/pkg.NAME"
	Dir   msg.Dir  `json:"dir"`   // optional event direction
	Msg   *msg.Msg `json:"-"`     // optional message that caused the event
	Error error    `json:"err"`   // optional error related to the event
	Value any      `json:"value"` // optional value, type-specific
	// contains filtered or unexported fields
}

Event represents an arbitrary event for a BGP pipe. Seq and Time will be set by the handler if non-zero.

func (*Event) String

func (ev *Event) String() string

String returns the event Type, or "(nil)" if ev is nil

func (*Event) Wait

func (ev *Event) Wait()

Wait blocks until the event is handled

type FilterMode

type FilterMode = int
const (
	// callback filter disabled
	FILTER_NONE FilterMode = iota

	// skip if callback id == value
	FILTER_EQ

	// skip if callback id > value
	FILTER_GT

	// skip if callback id < value
	FILTER_LT

	// skip if callback id >= value
	FILTER_GE

	// skip if callback id <= value
	FILTER_LE

	// skip if callback id != value
	FILTER_NE

	// skip all callbacks
	FILTER_ALL
)

type Handler

type Handler struct {
	Id      int          // optional handler id number (zero means none)
	Name    string       // optional name
	Order   int          // the lower the order, the sooner handler is run
	Enabled *atomic.Bool // if non-nil, disables the handler unless true

	Pre  bool // run before non-pre handlers?
	Post bool // run after non-post handlers?

	Dir   msg.Dir     // if non-zero, limits the direction
	Types []string    // if non-empty, limits event types
	Func  HandlerFunc // the function to call
}

Handler represents a function to call for matching pipe events

type HandlerFunc

type HandlerFunc func(ev *Event) (keep_event bool)

HandlerFunc handles event ev. If returns false, unregisters the parent Handler for all Types.

type Input

type Input struct {
	Pipe *Pipe // attached pipe (nil before pipe start)
	Line *Line // attached line (nil before pipe start)

	Id   int     // optional id
	Name string  // optional name
	Dir  msg.Dir // line direction

	// In is the input, where to read incoming messages from.
	In chan *msg.Msg

	// Reverse, when true, runs callbacks in reverse order.
	Reverse bool

	// CallbackFilter controls which callbacks to skip (disabled by default)
	CallbackFilter FilterMode

	// FilterValue specifies the value for CallbackFilter
	FilterValue any

	// statistics
	Stats struct {
		Parsed  uint64
		Short   uint64
		Garbled uint64
	}
	// contains filtered or unexported fields
}

Input processes incoming BGP messages through Callbacks.

func (*Input) Close

func (in *Input) Close()

Close safely closes the .In channel, which should eventually stop the Input

func (*Input) Wait

func (in *Input) Wait()

Wait blocks until the input is done processing the messages

func (*Input) Write

func (in *Input) Write(src []byte) (n int, err error)

Write implements io.Writer and reads all BGP messages from src into pi.In. Copies bytes from src. Consumes what it can, buffers the remainder if needed. Returns n equal to len(src). May block if pi.In is full.

In case of a non-nil err, call Write(nil) to re-try using the buffered remainder, until it returns a nil err.

Must not be used concurrently.

func (*Input) WriteMsg

func (in *Input) WriteMsg(m *msg.Msg) (write_error error)

WriteMsg safely sends m to pi.In, avoiding a panic if pi.In is closed. It assigns a sequence number and timestamp before writing to the channel.

type Line

type Line struct {
	Pipe *Pipe   // parent pipe
	Dir  msg.Dir // line direction

	// the default input
	*Input

	// In is the default input, where you write incoming messages to.
	In chan *msg.Msg

	// Out is the output, where you read processed messages from.
	Out chan *msg.Msg

	// UNIX timestamp (seconds) of the last valid OPEN message
	LastOpen atomic.Int64

	// UNIX timestamp (seconds) of the last KEEPALIVE message
	LastAlive atomic.Int64

	// UNIX timestamp (seconds) of the last UPDATE message
	LastUpdate atomic.Int64

	// the OPEN message that updated LastOpen
	Open atomic.Pointer[msg.Open]
	// contains filtered or unexported fields
}

Line represents one direction of a Pipe: inputs with a common output

func (*Line) Close

func (l *Line) Close()

Close safely closes all inputs, which should eventually stop the line

func (*Line) CloseOutput

func (l *Line) CloseOutput()

CloseOutput safely closes the output channel.

func (*Line) Read

func (l *Line) Read(dst []byte) (int, error)

Read reads l.Out and writes raw BGP data to dst. Must not be used concurrently.

func (*Line) Wait

func (l *Line) Wait()

Wait blocks until all processing is done

func (*Line) WriteOut

func (l *Line) WriteOut(m *msg.Msg) (write_error error)

WriteOut safely sends m to l.Out, avoiding a panic if closed.

func (*Line) WriteTo

func (l *Line) WriteTo(w io.Writer) (int64, error)

WriteTo implements io.WriterTo interface, writing raw BGP data to w

type Options

type Options struct {
	Logger  *zerolog.Logger // if nil logging is disabled
	MsgPool *sync.Pool      // optional pool for msg.Msg

	Caps bool // overwrite pipe.Caps using OPEN messages?

	Callbacks []*Callback // message callbacks
	Handlers  []*Handler  // event handlers
	Inputs    []*Input    // pipe inputs (message processors)
}

BGP pipe options

func (*Options) AddCallback

func (o *Options) AddCallback(cbf CallbackFunc, tpl ...*Callback) *Callback

AddCallbacks adds a callback function using tpl as its template (if present). It returns the added Callback, which can be further configured.

func (*Options) AddHandler

func (o *Options) AddHandler(hdf HandlerFunc, tpl ...*Handler) *Handler

AddHandler adds a handler function using tpl as its template (if present). It returns the added Handler, which can be further configured.

func (*Options) AddInput

func (o *Options) AddInput(dir msg.Dir, tpl ...*Input) *Input

AddInput adds pipe Input for given destination, with optional details in tpl.

func (*Options) OnEstablished

func (o *Options) OnEstablished(hdf HandlerFunc) *Handler

OnEstablished request hdf to be called when the BGP session is established.

func (*Options) OnEvent

func (o *Options) OnEvent(hdf HandlerFunc, types ...string) *Handler

OnEvent request hdf to be called for given event types. If no types provided, it requests to call hdf on *every* event.

func (*Options) OnEventPost

func (o *Options) OnEventPost(hdf HandlerFunc, types ...string) *Handler

OnEventPost is like OnEvent but requests to run hdf after other handlers

func (*Options) OnEventPre

func (o *Options) OnEventPre(hdf HandlerFunc, types ...string) *Handler

OnEventPre is like OnEvent but requests to run hdf before other handlers

func (*Options) OnMsg

func (o *Options) OnMsg(cbf CallbackFunc, dir msg.Dir, types ...msg.Type) *Callback

OnMsg adds a callback for all messages of given types (or all types if not specified).

func (*Options) OnMsgPost

func (o *Options) OnMsgPost(cbf CallbackFunc, dir msg.Dir, types ...msg.Type) *Callback

OnMsgPost is like OnMsg but requests to run cb after other callbacks

func (*Options) OnMsgPre

func (o *Options) OnMsgPre(cbf CallbackFunc, dir msg.Dir, types ...msg.Type) *Callback

OnMsgPre is like OnMsg but requests to run cb before other callbacks

func (*Options) OnParseError

func (o *Options) OnParseError(hdf HandlerFunc) *Handler

OnParseError request hdf to be called on BGP message parse error.

func (*Options) OnStart

func (o *Options) OnStart(hdf HandlerFunc) *Handler

OnStart request hdf to be called after the pipe starts.

func (*Options) OnStop

func (o *Options) OnStop(hdf HandlerFunc) *Handler

OnStop request hdf to be called when the pipe stops.

type Pipe

type Pipe struct {
	*zerolog.Logger

	Options           // pipe options; modify before Start()
	Caps    caps.Caps // BGP capability context; always thread-safe
	L       *Line     // pipeline processing messages from R to L
	R       *Line     // pipeline processing messages from L to R

	// generic Key-Value store, always thread-safe
	KV *xsync.MapOf[string, any]
	// contains filtered or unexported fields
}

Pipe processes BGP messages exchanged between two BGP peers, L (for "left" or "local") and R (for "right" or "remote"), allowing for building callback-based pipelines, with an internal event system.

Use NewPipe() to get a new object and modify its Pipe.Options. Then call Pipe.Start() to start the message flow.

func NewPipe

func NewPipe(ctx context.Context) *Pipe

NewPipe returns a new pipe, which can be configured through its Options. To start/stop the pipe, call Start() and Stop().

func (*Pipe) Event

func (p *Pipe) Event(et string, args ...any) *Event

Event announces a new event type et to the pipe, with optional arguments. The first msg.Dir argument is used as ev.Dir. The first *msg.Msg is used as ev.Msg and borrowed (add ACTION_BORROW). All error arguments are joined together into single ev.Error. The remaining arguments are used as ev.Val.

func (*Pipe) Get

func (p *Pipe) Get() (m *msg.Msg)

Get returns empty msg from pool, or a new msg object

func (*Pipe) LineFrom

func (p *Pipe) LineFrom(src msg.Dir) *Line

LineFrom returns the line processing messages coming from src

func (*Pipe) LineTo

func (p *Pipe) LineTo(dst msg.Dir) *Line

LineTo returns the line processing messages destined to dst

func (*Pipe) Put

func (p *Pipe) Put(m *msg.Msg)

Put resets msg and returns it to pool, which might free it

func (*Pipe) Start

func (p *Pipe) Start()

Start starts given number of r/t message handlers in background, by default r/t = 1/1 (single-threaded, strictly ordered processing).

func (*Pipe) Started

func (p *Pipe) Started() bool

Started returns true iff Start() has already been called = pipe is (being) started.

func (*Pipe) Stop

func (p *Pipe) Stop()

Stop stops all handlers and blocks till handlers finish. Pipe must not be used again past this point. Closes all input channels, which should eventually close all output channels, possibly after this function returns.

func (*Pipe) Stopped

func (p *Pipe) Stopped() bool

Stopped returns true iff Stop() has already been called = pipe is (being) stopped.

func (*Pipe) Wait

func (p *Pipe) Wait()

Wait blocks until the pipe starts and stops completely.

type PipeContext

type PipeContext struct {
	Pipe  *Pipe  // pipe processing the message
	Input *Input // input processing the message (message source)

	Callback *Callback // currently run callback
	Action   Action    // requested message actions
	// contains filtered or unexported fields
}

PipeContext tracks message processing progress in a pipe

func Context

func Context(m *msg.Msg) *PipeContext

Context returns pipe Context inside message m, updating m.Value if needed.

func (*PipeContext) Clear

func (pc *PipeContext) Clear()

Clear resets pc, but preserves ACTION_BORROW if set.

func (*PipeContext) FromJSON

func (pc *PipeContext) FromJSON(src []byte) error

TODO

func (*PipeContext) HasKV

func (pc *PipeContext) HasKV() bool

HasKV returns true iff the context already has a Key-Value store.

func (*PipeContext) KV

func (pc *PipeContext) KV() map[string]any

KV returns a generic Key-Value store, creating it first if needed.

func (*PipeContext) Reset

func (pc *PipeContext) Reset()

Reset resets pc to empty state

func (*PipeContext) ToJSON

func (pc *PipeContext) ToJSON(dst []byte) []byte

TODO

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL