pipe

package
v0.1.7

Published: Mar 22, 2024 License: MIT Imports: 16 Imported by: 6

Documentation

Overview

Package pipe provides BGP message processing with callbacks.

Constants

This section is empty.

Variables

var (
	ErrInClosed  = errors.New("input channel closed")
	ErrOutClosed = errors.New("output channel closed")
	ErrStopped   = errors.New("pipe stopped")
)
var (
	// pipe has finished starting
	EVENT_START = "bgpfix/pipe.START"

	// pipe is about to stop
	EVENT_STOP = "bgpfix/pipe.STOP"

	// could not parse the message before its callback
	EVENT_PARSE = "bgpfix/pipe.PARSE"

	// valid OPEN with a bigger message timestamp (seconds) made it to output
	EVENT_OPEN = "bgpfix/pipe.OPEN"

	// KEEPALIVE with a bigger message timestamp (seconds) made it to output
	EVENT_ALIVE = "bgpfix/pipe.ALIVE"

	// UPDATE with a bigger message timestamp (seconds) made it to output
	EVENT_UPDATE = "bgpfix/pipe.UPDATE"

	// session established (OPEN+KEEPALIVE made it to both sides)
	EVENT_ESTABLISHED = "bgpfix/pipe.ESTABLISHED"
)

A collection of events generated internally by the pipe.
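The OPEN, ALIVE and UPDATE events above fire only when a message carries a bigger timestamp (in seconds) than the one seen before. A minimal sketch of that "only if bigger" check, assuming a compare-and-swap loop on an atomic counter; the `tracker` type and its `bump` method are hypothetical and not part of this package:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// tracker is a hypothetical stand-in for a field such as Line.LastAlive.
type tracker struct {
	last atomic.Int64 // UNIX timestamp (seconds) of the last accepted message
}

// bump stores ts only if it is strictly bigger than the stored value,
// and reports whether it did (i.e. whether an event would be emitted).
func (t *tracker) bump(ts int64) bool {
	for {
		old := t.last.Load()
		if ts <= old {
			return false // same second or older: no event
		}
		if t.last.CompareAndSwap(old, ts) {
			return true
		}
		// another goroutine changed the value, so re-check
	}
}

func main() {
	var alive tracker
	for _, ts := range []int64{100, 100, 101, 99} {
		if alive.bump(ts) {
			fmt.Println("would emit EVENT_ALIVE at", ts)
		}
	}
}
```

Under this reading, several KEEPALIVE messages within the same second produce at most one event.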

var DefaultOptions = Options{
	Logger: &log.Logger,
	Caps:   true,
}

Default BGP pipe options

Functions

func ActionAccept added in v0.1.7

func ActionAccept(m *msg.Msg) *msg.Msg

ActionAccept adds ACTION_ACCEPT to m and returns it.

func ActionBorrow added in v0.1.7

func ActionBorrow(m *msg.Msg) *msg.Msg

ActionBorrow adds ACTION_BORROW to m and returns it.

func ActionClear added in v0.1.7

func ActionClear(m *msg.Msg) *msg.Msg

ActionClear clears all action flags but ACTION_BORROW in m and returns it.

func ActionDrop added in v0.1.7

func ActionDrop(m *msg.Msg) *msg.Msg

ActionDrop adds ACTION_DROP to m and returns it.

func ActionIsAccept added in v0.1.7

func ActionIsAccept(m *msg.Msg) bool

ActionIsAccept returns true if ACTION_ACCEPT is set in m.

func ActionIsBorrow added in v0.1.7

func ActionIsBorrow(m *msg.Msg) bool

ActionIsBorrow returns true if ACTION_BORROW is set in m.

func ActionIsDrop added in v0.1.7

func ActionIsDrop(m *msg.Msg) bool

ActionIsDrop returns true if ACTION_DROP is set in m.

Types

type Action

type Action byte

Action requests special handling of a message in Pipe

const (
	// The default, zero action: keep processing as-is.
	ACTION_OK Action = 0

	// Keep the message for later use, do not re-use its memory.
	//
	// You must use this if you wish to re-inject the message,
	// or keep reference to some value inside the msg.
	//
	// Once set, you must not remove this action from a message
	// unless you know you are the sole owner of this message.
	ACTION_BORROW Action = 1 << iota

	// Drop the message immediately from the pipe.
	//
	// If you want to re-inject the message later, set ACTION_BORROW too,
	// and keep in mind the message will try to re-start after where
	// you dropped it, unless you call Context.Clear on it.
	ACTION_DROP

	// Accept the message immediately and write to pipe output.
	ACTION_ACCEPT
)

func (*Action) Add

func (ac *Action) Add(a Action)

Add adds a to action ac

func (*Action) Clear

func (ac *Action) Clear()

Clear clears all bits except for ACTION_BORROW

func (Action) Is

func (ac Action) Is(a Action) bool

Is returns true iff a is set in ac

func (Action) Not

func (ac Action) Not(a Action) bool

Not returns true iff a is NOT set in ac
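Because ACTION_OK takes the iota value 0, the remaining constants start at 1 << 1. The sketch below copies the constant block into a local type and implements the four methods as plain bit operations, following their descriptions above; the method bodies are an assumption, not the package source:

```go
package main

import "fmt"

// Action is a local copy of the documented flag type, for illustration only.
type Action byte

const (
	ACTION_OK     Action = 0         // iota == 0
	ACTION_BORROW Action = 1 << iota // iota == 1, value 2
	ACTION_DROP                      // 1 << 2, value 4
	ACTION_ACCEPT                    // 1 << 3, value 8
)

// Add sets the bits of a in ac.
func (ac *Action) Add(a Action) { *ac |= a }

// Clear drops every bit except ACTION_BORROW.
func (ac *Action) Clear() { *ac &= ACTION_BORROW }

// Is reports whether a is set in ac.
func (ac Action) Is(a Action) bool { return ac&a != 0 }

// Not reports whether a is not set in ac.
func (ac Action) Not(a Action) bool { return ac&a == 0 }

func main() {
	var ac Action
	ac.Add(ACTION_BORROW)
	ac.Add(ACTION_DROP)
	fmt.Println(ac.Is(ACTION_DROP), ac.Is(ACTION_BORROW))

	ac.Clear() // ACTION_DROP goes away, ACTION_BORROW survives
	fmt.Println(ac.Is(ACTION_DROP), ac.Is(ACTION_BORROW))
}
```

Keeping ACTION_BORROW across Clear matches the rule that the flag must not be removed unless you are the sole owner of the message.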

type Callback

type Callback struct {
	Id      int          // optional callback id number (zero means none)
	Name    string       // optional name
	Order   int          // the lower the order, the sooner callback is run
	Enabled *atomic.Bool // if non-nil, disables the callback unless true

	Pre  bool // run before non-pre callbacks?
	Raw  bool // if true, do not parse the message (which may already be parsed, but for other reasons)
	Post bool // run after non-post callbacks?

	Dir   msg.Dir      // if non-zero, limits the direction
	Types []msg.Type   // if non-empty, limits message types
	Func  CallbackFunc // the function to call
}

Callback represents a function to call for matching BGP messages
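The Pre, Order, Post and Enabled fields together decide when a callback runs. One plausible reading, sketched below with a hypothetical local `callback` type: Pre callbacks come first, Post callbacks last, Order breaks ties within each group, and a non-nil Enabled that is false skips the callback. The actual sorting in the package may differ:

```go
package main

import (
	"fmt"
	"sort"
	"sync/atomic"
)

// callback is a hypothetical, trimmed-down copy of the documented struct.
type callback struct {
	Name    string
	Order   int
	Pre     bool
	Post    bool
	Enabled *atomic.Bool
}

// stage maps the Pre/Post flags to a coarse position (assumption: Pre wins).
func stage(cb *callback) int {
	switch {
	case cb.Pre:
		return 0
	case cb.Post:
		return 2
	default:
		return 1
	}
}

// runOrder returns the names of enabled callbacks in the order they would run.
func runOrder(cbs []*callback) []string {
	sorted := append([]*callback(nil), cbs...) // do not reorder the caller's slice
	sort.SliceStable(sorted, func(i, j int) bool {
		si, sj := stage(sorted[i]), stage(sorted[j])
		if si != sj {
			return si < sj
		}
		return sorted[i].Order < sorted[j].Order
	})
	var names []string
	for _, cb := range sorted {
		if cb.Enabled != nil && !cb.Enabled.Load() {
			continue // disabled unless the flag is true
		}
		names = append(names, cb.Name)
	}
	return names
}

func main() {
	off := new(atomic.Bool) // zero value is false
	fmt.Println(runOrder([]*callback{
		{Name: "log", Post: true},
		{Name: "filter", Order: 10},
		{Name: "decode", Pre: true},
		{Name: "rewrite", Order: 5},
		{Name: "disabled", Enabled: off},
	}))
}
```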

type CallbackFunc

type CallbackFunc func(m *msg.Msg)

CallbackFunc processes message m.

type Context

type Context struct {
	Pipe     *Pipe     // pipe processing the message
	Input    *Proc     // input processing the message (message source)
	Callback *Callback // currently running callback

	Action Action // requested message actions
	// contains filtered or unexported fields
}

Context tracks message processing in a Pipe, stored in Msg.Value.

func MsgContext added in v0.1.6

func MsgContext(m *msg.Msg) *Context

MsgContext returns message Context inside m, creating one if needed.

func (*Context) ActionAccept added in v0.1.7

func (mx *Context) ActionAccept() *Context

ActionAccept adds ACTION_ACCEPT to mx and returns it.

func (*Context) ActionBorrow added in v0.1.7

func (mx *Context) ActionBorrow() *Context

ActionBorrow adds ACTION_BORROW to mx and returns it.

func (*Context) ActionClear added in v0.1.7

func (mx *Context) ActionClear() *Context

ActionClear clears all action flags but ACTION_BORROW in mx and returns it.

func (*Context) ActionDrop added in v0.1.7

func (mx *Context) ActionDrop() *Context

ActionDrop adds ACTION_DROP to mx and returns it.

func (*Context) ActionIsAccept added in v0.1.7

func (mx *Context) ActionIsAccept() bool

ActionIsAccept returns true if ACTION_ACCEPT is set in mx.

func (*Context) ActionIsBorrow added in v0.1.7

func (mx *Context) ActionIsBorrow() bool

ActionIsBorrow returns true if ACTION_BORROW is set in mx.

func (*Context) ActionIsDrop added in v0.1.7

func (mx *Context) ActionIsDrop() bool

ActionIsDrop returns true if ACTION_DROP is set in mx.

func (*Context) FromJSON added in v0.1.6

func (mx *Context) FromJSON(src []byte) error

TODO

func (*Context) GetTag added in v0.1.6

func (mx *Context) GetTag(tag string) string

GetTag returns given Tag value, or "" if not set

func (*Context) HasTag added in v0.1.6

func (mx *Context) HasTag(tag string) bool

HasTag returns true iff the context has a Tag set

func (*Context) Reset added in v0.1.6

func (mx *Context) Reset()

Reset resets mx to empty state

func (*Context) SetTag added in v0.1.6

func (mx *Context) SetTag(tag string, val ...string)

SetTag sets given Tag to given value, or to a value of "" if not provided

func (*Context) Tags added in v0.1.6

func (mx *Context) Tags() map[string]string

Tags returns a generic string Tag-Value store, creating it first if needed.
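The tag methods describe a lazily created string map. A minimal sketch of those semantics on a hypothetical local `tagStore` type (the real Context has other fields and may store tags differently); note that HasTag and GetTag differ for a tag set without a value:

```go
package main

import "fmt"

// tagStore is a hypothetical stand-in for the tag part of Context.
type tagStore struct {
	tags map[string]string
}

// Tags returns the tag map, creating it first if needed.
func (mx *tagStore) Tags() map[string]string {
	if mx.tags == nil {
		mx.tags = make(map[string]string)
	}
	return mx.tags
}

// SetTag sets tag to the first value in val, or to "" if none is given.
func (mx *tagStore) SetTag(tag string, val ...string) {
	v := ""
	if len(val) > 0 {
		v = val[0]
	}
	mx.Tags()[tag] = v
}

// HasTag reports whether tag is set, even if its value is "".
func (mx *tagStore) HasTag(tag string) bool {
	_, ok := mx.tags[tag] // reading a nil map is safe
	return ok
}

// GetTag returns the value of tag, or "" if it is not set.
func (mx *tagStore) GetTag(tag string) string {
	return mx.tags[tag]
}

func main() {
	var mx tagStore
	mx.SetTag("seen")
	mx.SetTag("peer", "192.0.2.1")
	fmt.Println(mx.HasTag("seen"), mx.GetTag("seen") == "")
	fmt.Println(mx.HasTag("peer"), mx.GetTag("peer"))
	fmt.Println(mx.HasTag("missing"), mx.GetTag("missing") == "")
}
```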

func (*Context) ToJSON added in v0.1.6

func (mx *Context) ToJSON(dst []byte) []byte

TODO

type Event

type Event struct {
	Pipe *Pipe     `json:"-"`              // parent pipe
	Seq  uint64    `json:"seq,omitempty"`  // event sequence number
	Time time.Time `json:"time,omitempty"` // event timestamp

	Type  string   `json:"type"`  // type, usually "lib/pkg.NAME"
	Dir   msg.Dir  `json:"dir"`   // optional event direction
	Msg   *msg.Msg `json:"-"`     // optional message that caused the event
	Error error    `json:"err"`   // optional error related to the event
	Value any      `json:"value"` // optional value, type-specific
	// contains filtered or unexported fields
}

Event represents an arbitrary event for a BGP pipe. Seq and Time will be set by the handler if non-zero.

func (*Event) String

func (ev *Event) String() string

String returns the event Type, or "(nil)" if ev is nil

func (*Event) Wait

func (ev *Event) Wait()

Wait blocks until the event is handled

type FilterMode

type FilterMode = int
const (
	// callback filter disabled
	FILTER_NONE FilterMode = iota

	// skip if callback id == value
	FILTER_EQ

	// skip if callback id > value
	FILTER_GT

	// skip if callback id < value
	FILTER_LT

	// skip if callback id >= value
	FILTER_GE

	// skip if callback id <= value
	FILTER_LE

	// skip if callback id != value
	FILTER_NE

	// skip all callbacks
	FILTER_ALL
)
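The filter modes compare a callback id against a value and skip the callback when the comparison holds. A sketch of that decision as a single function, assuming the value is an int (Proc.FilterValue is declared as `any`, so this is a simplification); `skipCallback` is a hypothetical helper:

```go
package main

import "fmt"

// Local copy of the documented constants, for illustration only.
type FilterMode = int

const (
	FILTER_NONE FilterMode = iota
	FILTER_EQ
	FILTER_GT
	FILTER_LT
	FILTER_GE
	FILTER_LE
	FILTER_NE
	FILTER_ALL
)

// skipCallback reports whether a callback with the given id should be skipped.
func skipCallback(mode FilterMode, id, value int) bool {
	switch mode {
	case FILTER_EQ:
		return id == value
	case FILTER_GT:
		return id > value
	case FILTER_LT:
		return id < value
	case FILTER_GE:
		return id >= value
	case FILTER_LE:
		return id <= value
	case FILTER_NE:
		return id != value
	case FILTER_ALL:
		return true
	default: // FILTER_NONE or unknown: filter disabled
		return false
	}
}

func main() {
	// With FILTER_LE and value 2, callbacks 1 and 2 are skipped and 3 runs.
	for _, id := range []int{1, 2, 3} {
		fmt.Println(id, skipCallback(FILTER_LE, id, 2))
	}
}
```

A mode such as FILTER_LE is one way to resume a re-injected message after the callback that dropped it.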

type Handler

type Handler struct {
	Id      int          // optional handler id number (zero means none)
	Name    string       // optional name
	Order   int          // the lower the order, the sooner handler is run
	Enabled *atomic.Bool // if non-nil, disables the handler unless true

	Pre  bool // run before non-pre handlers?
	Post bool // run after non-post handlers?

	Dir   msg.Dir     // if non-zero, limits the direction
	Types []string    // if non-empty, limits event types
	Func  HandlerFunc // the function to call
}

Handler represents a function to call for matching pipe events

type HandlerFunc

type HandlerFunc func(ev *Event) (keep_event bool)

HandlerFunc handles event ev. If it returns false, the parent Handler is unregistered (for all Types).
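The boolean return value makes one-shot handlers easy to write. The sketch below shows the idea with a hypothetical `dispatcher` that drops a handler as soon as its function returns false; all types are local stand-ins and the package's own event loop is not shown in this documentation:

```go
package main

import "fmt"

type event struct{ Type string }

type handlerFunc func(ev *event) (keepEvent bool)

type handler struct {
	Types []string // if non-empty, limits event types
	Func  handlerFunc
}

// matches reports whether h wants events of type et.
func (h *handler) matches(et string) bool {
	if len(h.Types) == 0 {
		return true
	}
	for _, t := range h.Types {
		if t == et {
			return true
		}
	}
	return false
}

type dispatcher struct{ handlers []*handler }

// dispatch calls matching handlers and unregisters those returning false.
func (d *dispatcher) dispatch(ev *event) {
	kept := d.handlers[:0] // filter in place
	for _, h := range d.handlers {
		if !h.matches(ev.Type) || h.Func(ev) {
			kept = append(kept, h)
		}
	}
	d.handlers = kept
}

func main() {
	calls := 0
	d := &dispatcher{}
	d.handlers = append(d.handlers, &handler{
		Types: []string{"bgpfix/pipe.ESTABLISHED"},
		Func:  func(ev *event) bool { calls++; return false }, // run once
	})
	d.dispatch(&event{Type: "bgpfix/pipe.START"})
	d.dispatch(&event{Type: "bgpfix/pipe.ESTABLISHED"})
	d.dispatch(&event{Type: "bgpfix/pipe.ESTABLISHED"})
	fmt.Println(calls, len(d.handlers))
}
```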

type Line

type Line struct {
	Pipe *Pipe   // parent pipe
	Dir  msg.Dir // line direction

	// the default input Proc, which processes messages through all callbacks.
	Proc

	// Out is the Line output, where you can read processed messages from.
	Out chan *msg.Msg

	// UNIX timestamp (seconds) of the last valid OPEN message
	LastOpen atomic.Int64

	// UNIX timestamp (seconds) of the last KEEPALIVE message
	LastAlive atomic.Int64

	// UNIX timestamp (seconds) of the last UPDATE message
	LastUpdate atomic.Int64

	// the OPEN message that updated LastOpen
	Open atomic.Pointer[msg.Open]
	// contains filtered or unexported fields
}

Line implements one direction of a Pipe: possibly several input processors run messages through callbacks and write the results to a common output.

func (*Line) Close

func (l *Line) Close()

Close safely closes all inputs, which should eventually stop the line

func (*Line) CloseOutput

func (l *Line) CloseOutput()

CloseOutput safely closes the output channel.

func (*Line) Read

func (l *Line) Read(dst []byte) (int, error)

Read reads l.Out and writes raw BGP data to dst. Must not be used concurrently.

func (*Line) Wait

func (l *Line) Wait()

Wait blocks until all processing is done

func (*Line) WriteOut

func (l *Line) WriteOut(m *msg.Msg) (write_error error)

WriteOut safely sends m to l.Out, avoiding a panic if closed.
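Sending on a closed channel panics in Go, which is what WriteOut protects against. One common way to get that behaviour is to recover from the panic and return an error instead; the sketch below assumes this approach and uses an int channel and a local error value in place of the package's types:

```go
package main

import (
	"errors"
	"fmt"
)

// errOutClosed mirrors the text of ErrOutClosed; it is a local value.
var errOutClosed = errors.New("output channel closed")

// writeOut sends v to out and converts a send-on-closed-channel panic
// into an error. Hypothetical helper, not the package implementation.
func writeOut(out chan int, v int) (err error) {
	defer func() {
		if recover() != nil {
			err = errOutClosed
		}
	}()
	out <- v
	return nil
}

func main() {
	out := make(chan int, 1)
	fmt.Println(writeOut(out, 1)) // channel open: nil error
	close(out)
	fmt.Println(writeOut(out, 2)) // channel closed: error, no panic
}
```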

func (*Line) WriteTo

func (l *Line) WriteTo(w io.Writer) (int64, error)

WriteTo implements io.WriterTo interface, writing raw BGP data to w

type Options

type Options struct {
	Logger  *zerolog.Logger // if nil logging is disabled
	MsgPool *sync.Pool      // optional pool for msg.Msg

	Caps bool // overwrite pipe.Caps using OPEN messages?

	Callbacks []*Callback // message callbacks
	Handlers  []*Handler  // event handlers
	Procs     []*Proc     // input processors
}

BGP pipe options

func (*Options) AddCallback

func (o *Options) AddCallback(cbf CallbackFunc, tpl ...*Callback) *Callback

AddCallback adds a callback function using tpl as its template (if present). It returns the added Callback, which can be further configured.

func (*Options) AddHandler

func (o *Options) AddHandler(hdf HandlerFunc, tpl ...*Handler) *Handler

AddHandler adds a handler function using tpl as its template (if present). It returns the added Handler, which can be further configured.

func (*Options) AddProc added in v0.1.6

func (o *Options) AddProc(dir msg.Dir, tpl ...*Proc) *Proc

AddProc adds an input processor for the given pipe direction, with optional details in tpl.

func (*Options) OnEstablished

func (o *Options) OnEstablished(hdf HandlerFunc) *Handler

OnEstablished requests hdf to be called when the BGP session is established.

func (*Options) OnEvent

func (o *Options) OnEvent(hdf HandlerFunc, types ...string) *Handler

OnEvent requests hdf to be called for given event types. If no types are provided, it requests to call hdf on *every* event.

func (*Options) OnEventPost

func (o *Options) OnEventPost(hdf HandlerFunc, types ...string) *Handler

OnEventPost is like OnEvent but requests to run hdf after other handlers

func (*Options) OnEventPre

func (o *Options) OnEventPre(hdf HandlerFunc, types ...string) *Handler

OnEventPre is like OnEvent but requests to run hdf before other handlers

func (*Options) OnMsg

func (o *Options) OnMsg(cbf CallbackFunc, dir msg.Dir, types ...msg.Type) *Callback

OnMsg adds a callback for all messages of given types (or all types if not specified).

func (*Options) OnMsgPost

func (o *Options) OnMsgPost(cbf CallbackFunc, dir msg.Dir, types ...msg.Type) *Callback

OnMsgPost is like OnMsg but requests to run cbf after other callbacks

func (*Options) OnMsgPre

func (o *Options) OnMsgPre(cbf CallbackFunc, dir msg.Dir, types ...msg.Type) *Callback

OnMsgPre is like OnMsg but requests to run cbf before other callbacks

func (*Options) OnParseError

func (o *Options) OnParseError(hdf HandlerFunc) *Handler

OnParseError requests hdf to be called on BGP message parse error.

func (*Options) OnStart

func (o *Options) OnStart(hdf HandlerFunc) *Handler

OnStart requests hdf to be called after the pipe starts.

func (*Options) OnStop

func (o *Options) OnStop(hdf HandlerFunc) *Handler

OnStop requests hdf to be called when the pipe stops.

type Pipe

type Pipe struct {
	*zerolog.Logger

	Options           // pipe options; modify before Start()
	Caps    caps.Caps // BGP capability context; always thread-safe
	L       *Line     // line processing messages from R to L
	R       *Line     // line processing messages from L to R

	// generic Key-Value store, always thread-safe
	KV *xsync.MapOf[string, any]
	// contains filtered or unexported fields
}

Pipe processes BGP messages exchanged between two BGP peers, L (for "left" or "local") and R (for "right" or "remote"), allowing for building callback-based pipelines, with an internal event system.

Use NewPipe() to get a new object and modify its Pipe.Options. Then call Pipe.Start() to start the message flow.

func NewPipe

func NewPipe(ctx context.Context) *Pipe

NewPipe returns a new pipe, which can be configured through its Options. To start/stop the pipe, call Start() and Stop().

func (*Pipe) Event

func (p *Pipe) Event(et string, args ...any) *Event

Event announces a new event of type et to the pipe, with optional arguments. The first msg.Dir argument is used as ev.Dir. The first *msg.Msg is used as ev.Msg and borrowed (ACTION_BORROW is added). All error arguments are joined together into a single ev.Error. The remaining arguments are used as ev.Value.
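The argument handling described for Event can be pictured as a type switch over the variadic arguments. The sketch below uses hypothetical local types (`dir`, `message`, `event`) and the standard `errors.Join`; how the remaining arguments are packed into the value (a single value as-is, several as a slice) is an assumption:

```go
package main

import (
	"errors"
	"fmt"
)

type dir int

type message struct{ borrowed bool }

type event struct {
	Type  string
	Dir   dir
	Msg   *message
	Error error
	Value any
}

var errDemo = errors.New("demo error")

// newEvent sorts args into the event fields by their dynamic type.
func newEvent(et string, args ...any) *event {
	ev := &event{Type: et}
	var (
		errs   []error
		rest   []any
		dirSet bool
	)
	for _, arg := range args {
		switch v := arg.(type) {
		case dir:
			if dirSet {
				rest = append(rest, v)
			} else {
				ev.Dir, dirSet = v, true // first direction wins
			}
		case *message:
			if ev.Msg == nil && v != nil {
				v.borrowed = true // stands in for adding ACTION_BORROW
				ev.Msg = v
			} else {
				rest = append(rest, v)
			}
		case error:
			errs = append(errs, v)
		default:
			rest = append(rest, v)
		}
	}
	ev.Error = errors.Join(errs...) // nil if there were no errors
	switch len(rest) {
	case 0:
	case 1:
		ev.Value = rest[0]
	default:
		ev.Value = rest
	}
	return ev
}

func main() {
	m := &message{}
	ev := newEvent("example/pkg.DEMO", dir(1), m, errDemo, "extra")
	fmt.Println(ev.Dir, ev.Msg == m, m.borrowed, ev.Error != nil, ev.Value)
}
```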

func (*Pipe) GetMsg added in v0.1.7

func (p *Pipe) GetMsg() (m *msg.Msg)

GetMsg returns an empty msg from the pool, or a new msg object

func (*Pipe) LineFor added in v0.1.6

func (p *Pipe) LineFor(dst msg.Dir) *Line

LineFor returns the line processing messages destined for dst. Returns p.R if dst is bidir (DST_LR).

func (*Pipe) PutMsg added in v0.1.7

func (p *Pipe) PutMsg(m *msg.Msg)

PutMsg resets msg and returns it to the pool, which might free it
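GetMsg and PutMsg follow the usual sync.Pool pattern, with Options.MsgPool being optional. A minimal sketch of that pattern, assuming a fallback to a fresh allocation when no pool is configured or the pool is empty; the `message` and `pool` types are hypothetical:

```go
package main

import (
	"fmt"
	"sync"
)

// message is a hypothetical stand-in for msg.Msg.
type message struct{ Data []byte }

// Reset clears the message but keeps its buffer for re-use.
func (m *message) Reset() { m.Data = m.Data[:0] }

// pool wraps an optional sync.Pool, like Options.MsgPool.
type pool struct {
	msgs *sync.Pool // may be nil
}

// getMsg returns an empty message from the pool, or a new one.
func (p *pool) getMsg() *message {
	if p.msgs != nil {
		if m, ok := p.msgs.Get().(*message); ok {
			return m
		}
	}
	return new(message)
}

// putMsg resets m and hands it back to the pool, if there is one.
func (p *pool) putMsg(m *message) {
	m.Reset()
	if p.msgs != nil {
		p.msgs.Put(m)
	}
}

func main() {
	p := &pool{msgs: new(sync.Pool)}
	m := p.getMsg()
	m.Data = append(m.Data, 0xff)
	p.putMsg(m) // m must not be used after this point
	fmt.Println(len(p.getMsg().Data))
}
```

This is also why ACTION_BORROW matters: a message that went back to the pool can be handed out again, so any reference kept without borrowing may see its memory re-used.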

func (*Pipe) Start

func (p *Pipe) Start()

Start starts given number of r/t message handlers in background, by default r/t = 1/1 (single-threaded, strictly ordered processing).

func (*Pipe) Started

func (p *Pipe) Started() bool

Started returns true iff Start() has already been called, i.e. the pipe is (being) started.

func (*Pipe) Stop

func (p *Pipe) Stop()

Stop stops all handlers and blocks till handlers finish. Pipe must not be used again past this point. Closes all input channels, which should eventually close all output channels, possibly after this function returns.

func (*Pipe) Stopped

func (p *Pipe) Stopped() bool

Stopped returns true iff Stop() has already been called, i.e. the pipe is (being) stopped.

func (*Pipe) Wait

func (p *Pipe) Wait()

Wait blocks until the pipe starts and stops completely.

type Proc added in v0.1.6

type Proc struct {
	Pipe *Pipe // attached to this Pipe (nil before pipe start)
	Line *Line // attached to this Line (nil before pipe start)

	Id   int     // optional id
	Name string  // optional name
	Dir  msg.Dir // line direction

	// In is the input for incoming messages.
	In chan *msg.Msg

	// Reverse, when true, runs callbacks in reverse order.
	Reverse bool

	// CallbackFilter controls which callbacks to skip (disabled by default)
	CallbackFilter FilterMode

	// FilterValue specifies the value for CallbackFilter
	FilterValue any

	// statistics
	Stats struct {
		Parsed  uint64
		Short   uint64
		Garbled uint64
	}
	// contains filtered or unexported fields
}

Proc processes incoming BGP messages through Callbacks and (optionally) writes the output to attached Line.

func (*Proc) Close added in v0.1.6

func (in *Proc) Close()

Close safely closes the .In channel, which should eventually stop the Proc

func (*Proc) Wait added in v0.1.6

func (in *Proc) Wait()

Wait blocks until the input is done processing the messages

func (*Proc) Write added in v0.1.6

func (in *Proc) Write(src []byte) (n int, err error)

Write implements io.Writer and reads all BGP messages from src into in.In. Copies bytes from src. Consumes what it can, buffers the remainder if needed. Returns n equal to len(src). May block if in.In is full.

In case of a non-nil err, call Write(nil) to re-try using the buffered remainder, until it returns a nil err.

Must not be used concurrently.
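The "consume what it can, buffer the remainder" behaviour of Write can be illustrated with a small stream splitter. The sketch relies only on the BGP header layout from RFC 4271 (16-byte marker, 2-byte big-endian total length, 1-byte type); the `splitter` type is hypothetical, collects messages in a slice instead of a channel, and simply discards its buffer on a bad length:

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

const headerLen = 19 // marker (16) + length (2) + type (1)

var errBadLength = errors.New("invalid BGP message length")

// splitter cuts a byte stream into complete BGP messages.
type splitter struct {
	buf []byte   // bytes of a not yet complete message
	out [][]byte // complete messages, in arrival order
}

// Write copies src, extracts every complete message and keeps the rest.
// It always reports len(src) as consumed.
func (s *splitter) Write(src []byte) (int, error) {
	s.buf = append(s.buf, src...)
	for len(s.buf) >= headerLen {
		n := int(binary.BigEndian.Uint16(s.buf[16:18]))
		if n < headerLen {
			s.buf = nil // sketch only: give up on the garbled stream
			return len(src), errBadLength
		}
		if len(s.buf) < n {
			break // wait for more data
		}
		s.out = append(s.out, append([]byte(nil), s.buf[:n]...))
		s.buf = s.buf[n:]
	}
	return len(src), nil
}

// keepalive builds a raw KEEPALIVE message (header only, type 4).
func keepalive() []byte {
	m := make([]byte, headerLen)
	for i := 0; i < 16; i++ {
		m[i] = 0xff
	}
	binary.BigEndian.PutUint16(m[16:18], headerLen)
	m[18] = 4
	return m
}

func main() {
	stream := append(keepalive(), keepalive()...)
	var s splitter
	s.Write(stream[:10]) // less than a header: everything is buffered
	fmt.Println(len(s.out), len(s.buf))
	s.Write(stream[10:]) // the rest completes both messages
	fmt.Println(len(s.out), len(s.buf))
}
```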

func (*Proc) WriteMsg added in v0.1.6

func (in *Proc) WriteMsg(m *msg.Msg) (write_error error)

WriteMsg safely sends m to in.In, avoiding a panic if in.In is closed. It assigns a sequence number and timestamp before writing to the channel.
