Documentation ¶
Overview ¶
Package pipe provides BGP message processing with callbacks.
Index ¶
- Variables
- func ActionAccept(m *msg.Msg) *msg.Msg
- func ActionBorrow(m *msg.Msg) *msg.Msg
- func ActionClear(m *msg.Msg) *msg.Msg
- func ActionDrop(m *msg.Msg) *msg.Msg
- func ActionIsAccept(m *msg.Msg) bool
- func ActionIsBorrow(m *msg.Msg) bool
- func ActionIsDrop(m *msg.Msg) bool
- type Action
- type Callback
- type CallbackFunc
- type Context
- func (mx *Context) ActionAccept() *Context
- func (mx *Context) ActionBorrow() *Context
- func (mx *Context) ActionClear() *Context
- func (mx *Context) ActionDrop() *Context
- func (mx *Context) ActionIsAccept() bool
- func (mx *Context) ActionIsBorrow() bool
- func (mx *Context) ActionIsDrop() bool
- func (mx *Context) FromJSON(src []byte) error
- func (mx *Context) GetTag(tag string) string
- func (mx *Context) HasTag(tag string) bool
- func (mx *Context) Reset()
- func (mx *Context) SetTag(tag string, val ...string)
- func (mx *Context) Tags() map[string]string
- func (mx *Context) ToJSON(dst []byte) []byte
- type Event
- type FilterMode
- type Handler
- type HandlerFunc
- type Line
- type Options
- func (o *Options) AddCallback(cbf CallbackFunc, tpl ...*Callback) *Callback
- func (o *Options) AddHandler(hdf HandlerFunc, tpl ...*Handler) *Handler
- func (o *Options) AddProc(dir msg.Dir, tpl ...*Proc) *Proc
- func (o *Options) OnEstablished(hdf HandlerFunc) *Handler
- func (o *Options) OnEvent(hdf HandlerFunc, types ...string) *Handler
- func (o *Options) OnEventPost(hdf HandlerFunc, types ...string) *Handler
- func (o *Options) OnEventPre(hdf HandlerFunc, types ...string) *Handler
- func (o *Options) OnMsg(cbf CallbackFunc, dir msg.Dir, types ...msg.Type) *Callback
- func (o *Options) OnMsgPost(cbf CallbackFunc, dir msg.Dir, types ...msg.Type) *Callback
- func (o *Options) OnMsgPre(cbf CallbackFunc, dir msg.Dir, types ...msg.Type) *Callback
- func (o *Options) OnParseError(hdf HandlerFunc) *Handler
- func (o *Options) OnStart(hdf HandlerFunc) *Handler
- func (o *Options) OnStop(hdf HandlerFunc) *Handler
- type Pipe
- type Proc
Constants ¶
This section is empty.
Variables ¶
var (
	ErrInClosed  = errors.New("input channel closed")
	ErrOutClosed = errors.New("output channel closed")
	ErrStopped   = errors.New("pipe stopped")
)
var (
	// pipe has finished starting
	EVENT_START = "bgpfix/pipe.START"

	// pipe is about to stop
	EVENT_STOP = "bgpfix/pipe.STOP"

	// could not parse the message before its callback
	EVENT_PARSE = "bgpfix/pipe.PARSE"

	// valid OPEN with a bigger message timestamp (seconds) made it to output
	EVENT_OPEN = "bgpfix/pipe.OPEN"

	// KEEPALIVE with a bigger message timestamp (seconds) made it to output
	EVENT_ALIVE = "bgpfix/pipe.ALIVE"

	// UPDATE with a bigger message timestamp (seconds) made it to output
	EVENT_UPDATE = "bgpfix/pipe.UPDATE"

	// session established (OPEN+KEEPALIVE made it to both sides)
	EVENT_ESTABLISHED = "bgpfix/pipe.ESTABLISHED"
)
A collection of events generated internally by the pipe.
var DefaultOptions = Options{
	Logger: &log.Logger,
	Caps:   true,
}
Default BGP pipe options
Functions ¶
func ActionAccept ¶ added in v0.1.7
ActionAccept adds ACTION_ACCEPT to m and returns it.
func ActionBorrow ¶ added in v0.1.7
ActionBorrow adds ACTION_BORROW to m and returns it.
func ActionClear ¶ added in v0.1.7
ActionClear clears all action flags but ACTION_BORROW in m and returns it.
func ActionDrop ¶ added in v0.1.7
ActionDrop adds ACTION_DROP to m and returns it.
func ActionIsAccept ¶ added in v0.1.7
ActionIsAccept returns true if ACTION_ACCEPT is set in m.
func ActionIsBorrow ¶ added in v0.1.7
ActionIsBorrow returns true if ACTION_BORROW is set in m.
func ActionIsDrop ¶ added in v0.1.7
ActionIsDrop returns true if ACTION_DROP is set in m.
Types ¶
type Action ¶
type Action byte
Action requests special handling of a message in Pipe
const (
	// The default, zero action: keep processing as-is.
	ACTION_OK Action = 0

	// Keep the message for later use, do not re-use its memory.
	//
	// You must use this if you wish to re-inject the message,
	// or keep reference to some value inside the msg.
	//
	// Once set, you must not remove this action from a message
	// unless you know you are the sole owner of this message.
	ACTION_BORROW Action = 1 << iota

	// Drop the message immediately from the pipe.
	//
	// If you want to re-inject the message later, set ACTION_BORROW too,
	// and keep in mind the message will try to re-start after where
	// you dropped it, unless you call Context.Clear on it.
	ACTION_DROP

	// Accept the message immediately and write to pipe output.
	ACTION_ACCEPT
)
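The action values are bit flags, so several can be set on one message at once. The sketch below re-declares the constants locally (it does not import the package) to show the numeric values that the declaration yields: because ACTION_BORROW sits on the second line of the const block, iota is 1 there and the flag is 2, not 1. The clearActions helper is hypothetical and only models the documented ActionClear behaviour of keeping ACTION_BORROW.

```go
package main

import "fmt"

// Action is a local copy of the documented type, declared here so the
// sketch compiles on its own.
type Action byte

const (
	ACTION_OK     Action = 0
	ACTION_BORROW Action = 1 << iota // iota is 1 on this line, so the value is 2
	ACTION_DROP                      // 1 << 2 = 4
	ACTION_ACCEPT                    // 1 << 3 = 8
)

// clearActions is a hypothetical helper modelled on the ActionClear
// description: it drops every flag except ACTION_BORROW.
func clearActions(a Action) Action {
	return a & ACTION_BORROW
}

func main() {
	a := ACTION_BORROW | ACTION_DROP
	fmt.Println(a&ACTION_DROP != 0) // true

	a = clearActions(a)
	fmt.Println(a&ACTION_DROP != 0, a&ACTION_BORROW != 0) // false true

	fmt.Println(ACTION_BORROW, ACTION_DROP, ACTION_ACCEPT) // 2 4 8
}
```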
type Callback ¶
type Callback struct {
Id int // optional callback id number (zero means none)
Name string // optional name
Order int // the lower the order, the sooner callback is run
Enabled *atomic.Bool // if non-nil, disables the callback unless true
Pre bool // run before non-pre callbacks?
Raw bool // if true, do not parse the message (which may already be parsed, but for other reasons)
Post bool // run after non-post callbacks?
Dir msg.Dir // if non-zero, limits the direction
Types []msg.Type // if non-empty, limits message types
Func CallbackFunc // the function to call
}
Callback represents a function to call for matching BGP messages
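The Pre, Post and Order fields together decide when a callback runs. One plausible reading is sketched below with a local callback type: pre callbacks first, then regular ones, then post callbacks, each group sorted by ascending Order. The stage and sortCallbacks helpers are hypothetical, and treating a callback with both Pre and Post set as "pre" is an assumption rather than documented behaviour.

```go
package main

import (
	"fmt"
	"sort"
)

// callback is a trimmed-down local stand-in for the documented Callback struct.
type callback struct {
	Name  string
	Order int
	Pre   bool
	Post  bool
}

// stage maps the Pre/Post flags to a coarse rank: pre (0), regular (1), post (2).
func stage(cb callback) int {
	switch {
	case cb.Pre:
		return 0
	case cb.Post:
		return 2
	default:
		return 1
	}
}

// sortCallbacks orders by stage first and by ascending Order within a stage.
// The sort is stable, so callbacks with equal keys keep their insertion order.
func sortCallbacks(cbs []callback) {
	sort.SliceStable(cbs, func(i, j int) bool {
		si, sj := stage(cbs[i]), stage(cbs[j])
		if si != sj {
			return si < sj
		}
		return cbs[i].Order < cbs[j].Order
	})
}

// names returns the callback names in their current order.
func names(cbs []callback) []string {
	out := make([]string, 0, len(cbs))
	for _, cb := range cbs {
		out = append(out, cb.Name)
	}
	return out
}

func main() {
	cbs := []callback{
		{Name: "log", Post: true},
		{Name: "filter", Order: 10},
		{Name: "rewrite", Order: 5},
		{Name: "stamp", Pre: true, Order: 99},
	}
	sortCallbacks(cbs)
	fmt.Println(names(cbs)) // [stamp rewrite filter log]
}
```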
type Context ¶
type Context struct {
Pipe *Pipe // pipe processing the message
Input *Proc // input processing the message (message source)
Callback *Callback // currently running callback
Action Action // requested message actions
// contains filtered or unexported fields
}
Context tracks message processing in a Pipe, stored in Msg.Value.
func MsgContext ¶ added in v0.1.6
MsgContext returns message Context inside m, creating one if needed.
func (*Context) ActionAccept ¶ added in v0.1.7
ActionAccept adds ACTION_ACCEPT to mx and returns it.
func (*Context) ActionBorrow ¶ added in v0.1.7
ActionBorrow adds ACTION_BORROW to mx and returns it.
func (*Context) ActionClear ¶ added in v0.1.7
ActionClear clears all action flags but ACTION_BORROW in mx and returns it.
func (*Context) ActionDrop ¶ added in v0.1.7
ActionDrop adds ACTION_DROP to mx and returns it.
func (*Context) ActionIsAccept ¶ added in v0.1.7
ActionIsAccept returns true if ACTION_ACCEPT is set in mx.
func (*Context) ActionIsBorrow ¶ added in v0.1.7
ActionIsBorrow returns true if ACTION_BORROW is set in mx.
func (*Context) ActionIsDrop ¶ added in v0.1.7
ActionIsDrop returns true if ACTION_DROP is set in mx.
func (*Context) SetTag ¶ added in v0.1.6
SetTag sets the given tag to the given value, or to "" if no value is provided.
type Event ¶
type Event struct {
Pipe *Pipe `json:"-"` // parent pipe
Seq uint64 `json:"seq,omitempty"` // event sequence number
Time time.Time `json:"time,omitempty"` // event timestamp
Type string `json:"type"` // type, usually "lib/pkg.NAME"
Dir msg.Dir `json:"dir"` // optional event direction
Msg *msg.Msg `json:"-"` // optional message that caused the event
Error error `json:"err"` // optional error related to the event
Value any `json:"value"` // optional value, type-specific
// contains filtered or unexported fields
}
Event represents an arbitrary event for a BGP pipe. Seq and Time will be set by the handler if non-zero.
type FilterMode ¶
type FilterMode = int
const (
	// callback filter disabled
	FILTER_NONE FilterMode = iota

	// skip if callback id == value
	FILTER_EQ

	// skip if callback id > value
	FILTER_GT

	// skip if callback id < value
	FILTER_LT

	// skip if callback id >= value
	FILTER_GE

	// skip if callback id <= value
	FILTER_LE

	// skip if callback id != value
	FILTER_NE

	// skip all callbacks
	FILTER_ALL
)
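Each filter mode compares a callback id against a reference value and decides whether to skip that callback. The sketch below restates the constants locally and evaluates the comparison in a hypothetical skip function. It assumes the reference value is an int; in Proc the FilterValue field is declared as any, so the real comparison may accept other types.

```go
package main

import "fmt"

// FilterMode and its constants are local copies of the documented ones.
type FilterMode = int

const (
	FILTER_NONE FilterMode = iota
	FILTER_EQ
	FILTER_GT
	FILTER_LT
	FILTER_GE
	FILTER_LE
	FILTER_NE
	FILTER_ALL
)

// skip is a hypothetical helper: it reports whether a callback with the
// given id should be skipped under mode, compared against value.
func skip(mode FilterMode, id, value int) bool {
	switch mode {
	case FILTER_EQ:
		return id == value
	case FILTER_GT:
		return id > value
	case FILTER_LT:
		return id < value
	case FILTER_GE:
		return id >= value
	case FILTER_LE:
		return id <= value
	case FILTER_NE:
		return id != value
	case FILTER_ALL:
		return true
	default: // FILTER_NONE and unknown modes never skip
		return false
	}
}

func main() {
	fmt.Println(skip(FILTER_GT, 7, 5), skip(FILTER_GT, 3, 5), skip(FILTER_NONE, 3, 5)) // true false false
}
```

Under this reading, an input with FILTER_LE and a value of 5 would run only callbacks whose id is greater than 5.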
type Handler ¶
type Handler struct {
Id int // optional handler id number (zero means none)
Name string // optional name
Order int // the lower the order, the sooner handler is run
Enabled *atomic.Bool // if non-nil, disables the handler unless true
Pre bool // run before non-pre handlers?
Post bool // run after non-post handlers?
Dir msg.Dir // if non-zero, limits the direction
Types []string // if non-empty, limits event types
Func HandlerFunc // the function to call
}
Handler represents a function to call for matching pipe events
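The Enabled field on both Callback and Handler acts as an optional on/off switch: a nil pointer means the entry always runs, and a non-nil pointer must hold true. A minimal sketch of that check, with hypothetical isEnabled and newFlag helpers, might look like this:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// isEnabled reports whether an entry guarded by flag should run:
// a nil flag means "always on", otherwise the flag must currently be true.
func isEnabled(flag *atomic.Bool) bool {
	return flag == nil || flag.Load()
}

// newFlag returns a switch initialised to v.
func newFlag(v bool) *atomic.Bool {
	f := new(atomic.Bool)
	f.Store(v)
	return f
}

func main() {
	toggle := newFlag(false)

	fmt.Println(isEnabled(nil))    // true
	fmt.Println(isEnabled(toggle)) // false

	toggle.Store(true) // can be flipped at any time, from any goroutine
	fmt.Println(isEnabled(toggle)) // true
}
```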
type HandlerFunc ¶
HandlerFunc handles event ev. If it returns false, the parent Handler is unregistered (for all Types).
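The boolean return value lets a handler remove itself, which is convenient for one-shot handlers. The sketch below models this with local event and handler types and a hypothetical dispatch function; it only illustrates the contract and does not reflect how the pipe actually stores or invokes handlers.

```go
package main

import "fmt"

// event and handler are simplified local stand-ins for Event and Handler.
type event struct{ Type string }

type handlerFunc func(ev *event) bool

type handler struct {
	Name string
	Func handlerFunc
}

// dispatch calls every handler with ev and returns only the handlers
// that returned true, i.e. those that asked to stay registered.
func dispatch(hs []*handler, ev *event) []*handler {
	kept := make([]*handler, 0, len(hs))
	for _, h := range hs {
		if h.Func(ev) {
			kept = append(kept, h)
		}
	}
	return kept
}

func main() {
	once, always := 0, 0
	hs := []*handler{
		{Name: "once", Func: func(ev *event) bool { once++; return false }},
		{Name: "always", Func: func(ev *event) bool { always++; return true }},
	}

	hs = dispatch(hs, &event{Type: "demo"})
	hs = dispatch(hs, &event{Type: "demo"})

	fmt.Println(once, always, len(hs)) // 1 2 1
}
```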
type Line ¶
type Line struct {
Pipe *Pipe // parent pipe
Dir msg.Dir // line direction
// the default input Proc, which processes messages through all callbacks.
Proc
// Out is the Line output, where you can read processed messages from.
Out chan *msg.Msg
// UNIX timestamp (seconds) of the last valid OPEN message
LastOpen atomic.Int64
// UNIX timestamp (seconds) of the last KEEPALIVE message
LastAlive atomic.Int64
// UNIX timestamp (seconds) of the last UPDATE message
LastUpdate atomic.Int64
// the OPEN message that updated LastOpen
Open atomic.Pointer[msg.Open]
// contains filtered or unexported fields
}
Line implements one direction of a Pipe: possibly several input processors run messages through callbacks and write the results to a common output.
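The LastOpen, LastAlive and LastUpdate fields hold timestamps in seconds, and the related events are described as firing when a message with a bigger timestamp reaches the output. One way such a "only move forward" update can be done safely from several goroutines is a compare-and-swap loop, sketched below. The lineStamp type and its bump method are hypothetical; the package may implement this differently.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// lineStamp is a local stand-in holding one of the Line timestamp fields.
type lineStamp struct {
	lastUpdate atomic.Int64 // UNIX timestamp in seconds
}

// bump stores ts if it is strictly greater than the current value and
// reports whether it did so. A true result is where an event such as
// EVENT_UPDATE could be announced.
func (l *lineStamp) bump(ts int64) bool {
	for {
		old := l.lastUpdate.Load()
		if ts <= old {
			return false // same second or older: nothing to do
		}
		if l.lastUpdate.CompareAndSwap(old, ts) {
			return true
		}
		// another goroutine changed the value in between: retry
	}
}

func main() {
	var l lineStamp
	fmt.Println(l.bump(100), l.bump(100), l.bump(99), l.bump(101)) // true false false true
}
```

Because the timestamps have one-second resolution, this scheme would report at most one change per second per message type.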
func (*Line) Close ¶
func (l *Line) Close()
Close safely closes all inputs, which should eventually stop the line
func (*Line) CloseOutput ¶
func (l *Line) CloseOutput()
CloseOutput safely closes the output channel.
type Options ¶
type Options struct {
Logger *zerolog.Logger // if nil logging is disabled
MsgPool *sync.Pool // optional pool for msg.Msg
Caps bool // overwrite pipe.Caps using OPEN messages?
Callbacks []*Callback // message callbacks
Handlers []*Handler // event handlers
Procs []*Proc // input processors
}
BGP pipe options
func (*Options) AddCallback ¶
func (o *Options) AddCallback(cbf CallbackFunc, tpl ...*Callback) *Callback
AddCallback adds a callback function using tpl as its template (if present). It returns the added Callback, which can be further configured.
func (*Options) AddHandler ¶
func (o *Options) AddHandler(hdf HandlerFunc, tpl ...*Handler) *Handler
AddHandler adds a handler function using tpl as its template (if present). It returns the added Handler, which can be further configured.
func (*Options) AddProc ¶ added in v0.1.6
AddProc adds an input processor for the given pipe direction, with optional details in tpl.
func (*Options) OnEstablished ¶
func (o *Options) OnEstablished(hdf HandlerFunc) *Handler
OnEstablished requests hdf to be called when the BGP session is established.
func (*Options) OnEvent ¶
func (o *Options) OnEvent(hdf HandlerFunc, types ...string) *Handler
OnEvent requests hdf to be called for the given event types. If no types are provided, hdf is called on *every* event.
func (*Options) OnEventPost ¶
func (o *Options) OnEventPost(hdf HandlerFunc, types ...string) *Handler
OnEventPost is like OnEvent but requests to run hdf after other handlers
func (*Options) OnEventPre ¶
func (o *Options) OnEventPre(hdf HandlerFunc, types ...string) *Handler
OnEventPre is like OnEvent but requests to run hdf before other handlers
func (*Options) OnMsg ¶
OnMsg adds a callback for all messages of given types (or all types if not specified).
func (*Options) OnParseError ¶
func (o *Options) OnParseError(hdf HandlerFunc) *Handler
OnParseError requests hdf to be called on a BGP message parse error.
func (*Options) OnStart ¶
func (o *Options) OnStart(hdf HandlerFunc) *Handler
OnStart requests hdf to be called after the pipe starts.
func (*Options) OnStop ¶
func (o *Options) OnStop(hdf HandlerFunc) *Handler
OnStop requests hdf to be called when the pipe stops.
type Pipe ¶
type Pipe struct {
*zerolog.Logger
Options // pipe options; modify before Start()
Caps caps.Caps // BGP capability context; always thread-safe
L *Line // line processing messages from R to L
R *Line // line processing messages from L to R
// generic Key-Value store, always thread-safe
KV *xsync.MapOf[string, any]
// contains filtered or unexported fields
}
Pipe processes BGP messages exchanged between two BGP peers, L (for "left" or "local") and R (for "right" or "remote"), allowing for building callback-based pipelines, with an internal event system.
Use NewPipe() to get a new object and modify its Pipe.Options. Then call Pipe.Start() to start the message flow.
func NewPipe ¶
NewPipe returns a new pipe, which can be configured through its Options. To start/stop the pipe, call Start() and Stop().
func (*Pipe) Event ¶
Event announces a new event of type et to the pipe, with optional arguments. The first msg.Dir argument is used as ev.Dir. The first *msg.Msg is used as ev.Msg and borrowed (ACTION_BORROW is added). All error arguments are joined together into a single ev.Error. The remaining arguments are used as ev.Value.
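The argument handling described here is a classic type switch over a variadic any list. The sketch below reproduces the idea with local dir, message and event types and a hypothetical newEvent function. Joining errors with errors.Join (Go 1.20+) and storing a single leftover argument directly while storing several as a slice are assumptions made for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// dir, message and event are simplified local stand-ins for msg.Dir,
// msg.Msg and Event.
type dir int

type message struct{ borrowed bool }

type event struct {
	Type  string
	Dir   dir
	Msg   *message
	Error error
	Value any
}

// Demo errors used in main.
var (
	errA = errors.New("a")
	errB = errors.New("b")
)

// newEvent sorts args by type: first dir, first *message (marked as
// borrowed), all errors joined, everything else collected as the value.
func newEvent(et string, args ...any) *event {
	ev := &event{Type: et}
	var errs []error
	var rest []any
	dirSet := false
	for _, arg := range args {
		switch v := arg.(type) {
		case dir:
			if dirSet {
				rest = append(rest, v)
			} else {
				ev.Dir, dirSet = v, true
			}
		case *message:
			if ev.Msg == nil && v != nil {
				v.borrowed = true // models adding ACTION_BORROW
				ev.Msg = v
			} else {
				rest = append(rest, v)
			}
		case error:
			errs = append(errs, v)
		default:
			rest = append(rest, v)
		}
	}
	ev.Error = errors.Join(errs...) // nil when there are no errors
	switch len(rest) {
	case 0:
	case 1:
		ev.Value = rest[0]
	default:
		ev.Value = rest
	}
	return ev
}

func main() {
	ev := newEvent("example/pkg.DEMO", dir(2), &message{}, errA, errB, "extra")
	fmt.Println(ev.Dir, ev.Msg.borrowed, ev.Value) // 2 true extra
	fmt.Println(ev.Error != nil)                   // true
}
```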
func (*Pipe) LineFor ¶ added in v0.1.6
LineFor returns the line processing messages destined for dst. Returns p.R if dst is bidir (DST_LR).
func (*Pipe) Start ¶
func (p *Pipe) Start()
Start starts given number of r/t message handlers in background, by default r/t = 1/1 (single-threaded, strictly ordered processing).
func (*Pipe) Started ¶
Started returns true iff Start() has already been called, i.e. the pipe is started or is being started.
func (*Pipe) Stop ¶
func (p *Pipe) Stop()
Stop stops all handlers and blocks till handlers finish. Pipe must not be used again past this point. Closes all input channels, which should eventually close all output channels, possibly after this function returns.
type Proc ¶ added in v0.1.6
type Proc struct {
Pipe *Pipe // attached to this Pipe (nil before pipe start)
Line *Line // attached to this Line (nil before pipe start)
Id int // optional id
Name string // optional name
Dir msg.Dir // line direction
// In is the input for incoming messages.
In chan *msg.Msg
// Reverse, when true, runs callbacks in reverse order.
Reverse bool
// CallbackFilter controls which callbacks to skip (disabled by default)
CallbackFilter FilterMode
// FilterValue specifies the value for CallbackFilter
FilterValue any
// statistics
Stats struct {
Parsed uint64
Short uint64
Garbled uint64
}
// contains filtered or unexported fields
}
Proc processes incoming BGP messages through Callbacks and (optionally) writes the output to attached Line.
func (*Proc) Close ¶ added in v0.1.6
func (in *Proc) Close()
Close safely closes the .In channel, which should eventually stop the Input
func (*Proc) Wait ¶ added in v0.1.6
func (in *Proc) Wait()
Wait blocks until the input is done processing the messages
func (*Proc) Write ¶ added in v0.1.6
Write implements io.Writer and reads all BGP messages from src into pi.In. Copies bytes from src. Consumes what it can, buffers the remainder if needed. Returns n equal to len(src). May block if pi.In is full.
In case of a non-nil err, call Write(nil) to re-try using the buffered remainder, until it returns a nil err.
Must not be used concurrently.
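The Write contract above (copy the input, consume whole messages, buffer the remainder, always report len(src)) can be illustrated with a small framing writer. The sketch relies on the BGP header layout from RFC 4271: a 16-byte marker, a 2-byte big-endian total length, and a 1-byte type, 19 bytes in all. The framer type and keepalive helper are hypothetical; the sketch collects messages in a slice instead of sending them to a channel, and, unlike the real method, it discards its buffer on a malformed length rather than supporting a Write(nil) retry.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

const headerLen = 19 // 16-byte marker + 2-byte length + 1-byte type

var errBadLength = errors.New("invalid BGP message length")

// framer is a hypothetical stand-in for an input processor.
type framer struct {
	buf []byte   // bytes of an incomplete message, kept between writes
	out [][]byte // complete raw messages, one slice per message
}

// Write copies src, extracts every complete message and buffers the rest.
// It always reports len(src) as consumed.
func (f *framer) Write(src []byte) (int, error) {
	f.buf = append(f.buf, src...) // append copies the bytes from src
	for len(f.buf) >= headerLen {
		total := int(binary.BigEndian.Uint16(f.buf[16:18]))
		if total < headerLen {
			f.buf = nil // simplification: drop the garbled data
			return len(src), errBadLength
		}
		if len(f.buf) < total {
			break // wait for the remainder of this message
		}
		f.out = append(f.out, append([]byte(nil), f.buf[:total]...))
		f.buf = f.buf[total:]
	}
	return len(src), nil
}

// keepalive builds a minimal KEEPALIVE message (type 4, header only).
func keepalive() []byte {
	m := make([]byte, headerLen)
	for i := 0; i < 16; i++ {
		m[i] = 0xff
	}
	binary.BigEndian.PutUint16(m[16:18], headerLen)
	m[18] = 4
	return m
}

func main() {
	f := &framer{}
	k := keepalive()

	f.Write(k[:10]) // first half of a message: nothing complete yet
	fmt.Println(len(f.out)) // 0

	rest := append([]byte{}, k[10:]...) // second half...
	rest = append(rest, k...)           // ...followed by a whole message
	n, err := f.Write(rest)
	fmt.Println(n, err, len(f.out)) // 28 <nil> 2
}
```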