Documentation
¶
Overview ¶
Package pipe provides BGP message processing with callbacks.
Index ¶
- Variables
- func ActionAccept(m *msg.Msg) *msg.Msg
- func ActionBorrow(m *msg.Msg) *msg.Msg
- func ActionClear(m *msg.Msg) *msg.Msg
- func ActionDrop(m *msg.Msg) *msg.Msg
- func ActionHasAccept(m *msg.Msg) bool
- func ActionHasBorrow(m *msg.Msg) bool
- func ActionHasDrop(m *msg.Msg) bool
- func GetTags(m *msg.Msg) map[string]string
- func UseTags(m *msg.Msg) map[string]string
- type Action
- func (ac *Action) Accept()
- func (ac *Action) Add(a Action)
- func (ac *Action) Borrow()
- func (ac *Action) Clear()
- func (ac *Action) Drop()
- func (ac *Action) FromJSON(src []byte) error
- func (ac Action) Has(a Action) bool
- func (ac Action) HasAccept() bool
- func (ac Action) HasBorrow() bool
- func (ac Action) HasDrop() bool
- func (ac Action) HasNot(a Action) bool
- func (ac Action) ToJSON(dst []byte) []byte
- type Callback
- type CallbackFunc
- type CbFilterMode
- type Context
- func (mx *Context) DropTag(tag string) bool
- func (mx *Context) DropTags() bool
- func (mx *Context) FromJSON(src []byte) error
- func (mx *Context) GetTag(tag string) string
- func (mx *Context) GetTags() map[string]string
- func (mx *Context) HasTag(tag string) bool
- func (mx *Context) HasTags() bool
- func (mx *Context) Reset()
- func (mx *Context) SetTag(tag string, val string)
- func (mx *Context) ToJSON(dst []byte) []byte
- func (mx *Context) UseTags() map[string]string
- type Event
- type Handler
- type HandlerFunc
- type Input
- type Line
- type Options
- func (o *Options) AddCallback(cbf CallbackFunc, tpl ...*Callback) *Callback
- func (o *Options) AddHandler(hdf HandlerFunc, tpl ...*Handler) *Handler
- func (o *Options) AddInput(dst dir.Dir, tpl ...*Input) *Input
- func (o *Options) OnEstablished(hdf HandlerFunc) *Handler
- func (o *Options) OnEvent(hdf HandlerFunc, types ...string) *Handler
- func (o *Options) OnEventPost(hdf HandlerFunc, types ...string) *Handler
- func (o *Options) OnEventPre(hdf HandlerFunc, types ...string) *Handler
- func (o *Options) OnMsg(cbf CallbackFunc, dir dir.Dir, types ...msg.Type) *Callback
- func (o *Options) OnMsgPost(cbf CallbackFunc, dir dir.Dir, types ...msg.Type) *Callback
- func (o *Options) OnMsgPre(cbf CallbackFunc, dir dir.Dir, types ...msg.Type) *Callback
- func (o *Options) OnParseError(hdf HandlerFunc) *Handler
- func (o *Options) OnStart(hdf HandlerFunc) *Handler
- func (o *Options) OnStop(hdf HandlerFunc) *Handler
- type Pipe
- func (p *Pipe) Event(et string, args ...any) *Event
- func (p *Pipe) GetMsg() (m *msg.Msg)
- func (p *Pipe) LineFor(dst dir.Dir) *Line
- func (p *Pipe) ParseMsg(m *msg.Msg) error
- func (p *Pipe) PutMsg(m *msg.Msg)
- func (p *Pipe) Start() error
- func (p *Pipe) Started() bool
- func (p *Pipe) Stop()
- func (p *Pipe) Stopped() bool
- func (p *Pipe) Wait()
Constants ¶
This section is empty.
Variables ¶
var ( ErrInClosed = errors.New("input channel closed") ErrOutClosed = errors.New("output channel closed") ErrStarted = errors.New("pipe started") ErrStopped = errors.New("pipe stopped") ErrFilter = errors.New("filter error") )
var ( // pipe has finished starting EVENT_START = "bgpfix/pipe.START" // pipe is about to stop EVENT_STOP = "bgpfix/pipe.STOP" // could not parse the message before its callback EVENT_PARSE = "bgpfix/pipe.PARSE" // valid OPEN with a bigger message timestamp (seconds) made it to output EVENT_OPEN = "bgpfix/pipe.OPEN" // KEEPALIVE with a bigger message timestamp (seconds) made it to output EVENT_ALIVE = "bgpfix/pipe.ALIVE" // UPDATE with a bigger message timestamp (seconds) made it to output EVENT_UPDATE = "bgpfix/pipe.UPDATE" // session established (OPEN+KEEPALIVE made it to both sides) EVENT_ESTABLISHED = "bgpfix/pipe.ESTABLISHED" // End-of-RIB for new AF made it to output in given direction EVENT_EOR_AF = "bgpfix/pipe.EOR_AF" // End-of-RIB for all AFs in Caps made it to output in given direction EVENT_EOR = "bgpfix/pipe.EOR" )
a collection of events generated internally by pipe
var DefaultOptions = Options{ Logger: &log.Logger, Caps: true, }
Default BGP pipe options
Functions ¶
func ActionAccept ¶ added in v0.1.7
ActionAccept adds ACTION_ACCEPT to m and returns it.
func ActionBorrow ¶ added in v0.1.7
ActionBorrow adds ACTION_BORROW to m and returns it.
func ActionClear ¶ added in v0.1.7
ActionClear clears all action flags but ACTION_BORROW in m and returns it.
func ActionDrop ¶ added in v0.1.7
ActionDrop adds ACTION_DROP to m and returns it.
func ActionHasAccept ¶ added in v0.10.0
ActionHasAccept returns true if ACTION_ACCEPT is set in m.
func ActionHasBorrow ¶ added in v0.10.0
ActionHasBorrow returns true if ACTION_BORROW is set in m.
func ActionHasDrop ¶ added in v0.10.0
ActionHasDrop returns true if ACTION_DROP is set in m.
Types ¶
type Action ¶
type Action byte
Action requests special handling of a message or event in a Pipe
const ( // Keep the message for later use, do not re-use its memory. // // You must use this if you wish to re-inject the message, // or keep reference to some value inside the msg. // // Once set, you must not remove this action from a message // unless you know you are the sole owner of this message. ACTION_BORROW Action = 1 << iota // Drop the message/event immediately (skip other calls, drop from output). // // If you want to re-inject the message later, set ACTION_BORROW too. // When re-injecting, clear the Action first, and remember the message will re-start // processing from the next callback, unless you clear its Context. ACTION_DROP // Accept the message/event immediately (skip other calls, proceed to output) ACTION_ACCEPT // Mask is logical OR of all defined actions ACTION_MASK Action = 1<<iota - 1 )
const ACTION_OK Action = 0
The default, zero action: keep processing as-is.
type Callback ¶
type Callback struct {
Id int // optional callback id number (zero means none)
Name string // optional name
Order int // the lower the order, the sooner callback is run
Enabled *atomic.Bool // if non-nil, disables the callback unless true
Pre bool // run before non-pre callbacks?
Raw bool // if true, do not parse the message (which may already be parsed, but for other reasons)
Post bool // run after non-post callbacks?
Dir dir.Dir // if non-zero, limits the direction
Types []msg.Type // if non-empty, limits message types
Filter []*filter.Filter // skip messages not matching all filters
LimitRate *rate.Limiter // if non-nil, limits the rate of callback invocations
LimitSkip bool // if true, skips the callback when over the rate limit (else delays)
Func CallbackFunc // the function to call
// contains filtered or unexported fields
}
Callback represents a function to call for matching BGP messages. In general, Callbacks can be executed concurrently for different messages, but are always executed sequentially for the same message, in given order.
type CallbackFunc ¶
CallbackFunc processes message m. Return false to drop the message.
type CbFilterMode ¶ added in v0.5.1
type CbFilterMode = int
const ( // callback filter disabled CBFILTER_NONE CbFilterMode = iota // skip if callback id == value CBFILTER_EQ // skip if callback id > value CBFILTER_GT // skip if callback id < value CBFILTER_LT // skip if callback id >= value CBFILTER_GE // skip if callback id <= value CBFILTER_LE // skip if callback id != value CBFILTER_NE // skip all callbacks CBFILTER_ALL )
type Context ¶
type Context struct {
Pipe *Pipe // pipe processing the message
Input *Input // input processing the message (message source)
Callback *Callback // currently running callback
Action Action // requested message actions
// contains filtered or unexported fields
}
Context tracks message processing in a Pipe, stored in Msg.Value.
func GetContext ¶ added in v0.4.0
GetContext returns message Context inside m, iff it exists (or nil).
func UseContext ¶ added in v0.5.1
UseContext returns message Context inside m, creating one if needed.
func (*Context) DropTags ¶ added in v0.2.0
DropTags drops all message tags, returning true if any existed
func (*Context) GetTags ¶ added in v0.5.1
GetTags returns message tags inside mx, iff they exist (or nil).
func (*Context) HasTag ¶ added in v0.1.6
HasTag returns true iff the context has a particular Tag set
type Event ¶
type Event struct {
Pipe *Pipe `json:"-"` // parent pipe
Seq uint64 `json:"seq,omitempty"` // event sequence number
Time time.Time `json:"time,omitempty"` // event timestamp
Type string `json:"type"` // type, usually "lib/pkg.NAME"
Dir dir.Dir `json:"dir"` // optional event direction
Msg string `json:"msg"` // optional BGP message in JSON
Error error `json:"err"` // optional error related to the event
Value any `json:"value"` // optional value, type-specific
Handler *Handler // currently running handler (may be nil)
Action Action // optional event action (zero means none)
// contains filtered or unexported fields
}
Event represents an arbitrary event for a BGP pipe. Seq and Time will be set by the handler if non-zero.
type Handler ¶
type Handler struct {
Id int // optional handler id number (zero means none)
Name string // optional name
Order int // the lower the order, the sooner handler is run
Enabled *atomic.Bool // if non-nil, disables the handler unless true
Pre bool // run before non-pre handlers?
Post bool // run after non-post handlers?
Dir dir.Dir // if non-zero, limits the direction
Types []string // if non-empty, limits event types
Func HandlerFunc // the function to call
// contains filtered or unexported fields
}
Handler represents a function to call for matching pipe events. In general, Handlers can be executed concurrently for different events, but are always executed sequentially for the same event, in given order.
type HandlerFunc ¶
HandlerFunc handles event ev. Return false to unregister the handler (all types).
type Input ¶
type Input struct {
Pipe *Pipe // attached to this Pipe (nil before pipe start)
Line *Line // attached to this Line (nil before pipe start)
Id int // optional input id
Name string // optional input name
Dir dir.Dir // input direction
In chan *msg.Msg // input channel for incoming messages
Reverse bool // if true, run callbacks in reverse order
CbFilter CbFilterMode // which callbacks to skip? (disabled by default)
CbFilterValue any // optionally specifies the value for CbFilter
Filter []*filter.Filter // drop messages not matching all filters
LimitRate *rate.Limiter // optional input rate limit (nil = no limit)
LimitSkip bool // if true, drop messages over the LimitRate (instead delay)
// contains filtered or unexported fields
}
Input processes incoming BGP messages (in given direction) through many Callbacks and optionally writes the result to attached Line.
func (*Input) Close ¶
func (in *Input) Close()
Close safely closes the .In channel, which should eventually stop the Input
func (*Input) Wait ¶
Wait blocks until the input is done processing the messages (returns true), or aborts if the Pipe context is cancelled (returns false).
func (*Input) Write ¶
Write implements io.Writer and reads all BGP messages from src into in.In. Copies bytes from src. Consumes what it can, buffers the remainder if needed. Returns n equal to len(src). May block if pi.In is full.
In case of a non-nil err, call Write(nil) to re-try using the buffered remainder, until it returns a nil err.
Must not be used concurrently.
func (*Input) WriteFunc ¶ added in v0.2.0
func (in *Input) WriteFunc(src []byte, cb CallbackFunc) (int, error)
WriteFunc is the same as Input.Write(), but takes an optional callback function to be called just before the message is accepted for processing. If the callback returns false, the message is silently dropped instead.
type Line ¶
type Line struct {
Pipe *Pipe // parent pipe
Dir dir.Dir // line direction
// the default Input, which processes messages through all callbacks.
Input
// Out is the Line output, where you can read processed messages from.
Out chan *msg.Msg
// UNIX timestamp (seconds) of the last valid OPEN message
LastOpen atomic.Int64
// UNIX timestamp (seconds) of the last KEEPALIVE message
LastAlive atomic.Int64
// UNIX timestamp (seconds) of the last UPDATE message
LastUpdate atomic.Int64
// the OPEN message that updated LastOpen
Open atomic.Pointer[msg.Open]
// UNIX timestamp (seconds) of the first EoR for given AF
EoR *xsync.Map[afi.AS, int64]
// contains filtered or unexported fields
}
Line implements one direction of a Pipe: possibly several input processors run messages through callbacks and write the results to a common output.
func (*Line) Close ¶
func (l *Line) Close()
Close safely closes all inputs, which should eventually stop the line
func (*Line) CloseOutput ¶
func (l *Line) CloseOutput()
CloseOutput safely closes the output channel.
func (*Line) Wait ¶
Wait blocks until all processing is done (returns true), or aborts if the Pipe context is cancelled (returns false).
func (*Line) WriteOutput ¶ added in v0.2.0
WriteOutput safely sends m to l.Out, avoiding a panic if closed.
type Options ¶
type Options struct {
Logger *zerolog.Logger // if nil logging is disabled
MsgPool *sync.Pool // optional pool for msg.Msg
Caps bool // overwrite pipe.Caps with the capabilities negotiated in OPEN messages?
Callbacks []*Callback // message callbacks
Handlers []*Handler // event handlers
Inputs []*Input // input processors
}
BGP pipe options
func (*Options) AddCallback ¶
func (o *Options) AddCallback(cbf CallbackFunc, tpl ...*Callback) *Callback
AddCallbacks adds a callback function using tpl as its template (if present). It returns the added Callback, which can be further configured.
func (*Options) AddHandler ¶
func (o *Options) AddHandler(hdf HandlerFunc, tpl ...*Handler) *Handler
AddHandler adds a handler function using tpl as its template (if present). It returns the added Handler, which can be further configured.
func (*Options) AddInput ¶
AddInput adds input processor for given pipe direction, with optional details in tpl.
func (*Options) OnEstablished ¶
func (o *Options) OnEstablished(hdf HandlerFunc) *Handler
OnEstablished request hdf to be called when the BGP session is established.
func (*Options) OnEvent ¶
func (o *Options) OnEvent(hdf HandlerFunc, types ...string) *Handler
OnEvent request hdf to be called for given event types. If no types provided, it requests to call hdf on *every* event.
func (*Options) OnEventPost ¶
func (o *Options) OnEventPost(hdf HandlerFunc, types ...string) *Handler
OnEventPost is like OnEvent but requests to run hdf after other handlers
func (*Options) OnEventPre ¶
func (o *Options) OnEventPre(hdf HandlerFunc, types ...string) *Handler
OnEventPre is like OnEvent but requests to run hdf before other handlers
func (*Options) OnMsg ¶
OnMsg adds a callback for all messages of given types (or all types if not specified).
func (*Options) OnParseError ¶
func (o *Options) OnParseError(hdf HandlerFunc) *Handler
OnParseError request hdf to be called on BGP message parse error.
func (*Options) OnStart ¶
func (o *Options) OnStart(hdf HandlerFunc) *Handler
OnStart request hdf to be called after the pipe starts.
func (*Options) OnStop ¶
func (o *Options) OnStop(hdf HandlerFunc) *Handler
OnStop request hdf to be called when the pipe stops.
type Pipe ¶
type Pipe struct {
*zerolog.Logger
Options // pipe options; modify before Start()
Caps caps.Caps // BGP capability context; always thread-safe
L *Line // line processing messages from R to L
R *Line // line processing messages from L to R
// generic Key-Value store, always thread-safe
KV *xsync.Map[string, any]
Ctx context.Context // context for all children
Cancel context.CancelCauseFunc // cancels ctx
// contains filtered or unexported fields
}
Pipe processes BGP messages exchanged between two BGP peers, L (for "left" or "local") and R (for "right" or "remote"), allowing for building callback-based pipelines, with an internal event system.
Use NewPipe() to get a new object and modify its Pipe.Options. Then call Pipe.Start() to start the message flow.
func NewPipe ¶
NewPipe returns a new pipe, which can be configured through its Options. To start/stop the pipe, call Start() and Stop().
func (*Pipe) Event ¶
Event announces a new event type et to the pipe, with optional arguments. The first dir.Dir argument is used as ev.Dir. The first *msg.Msg is used as ev.Msg and borrowed (add ACTION_BORROW). All error arguments are joined together into a single ev.Error. The remaining arguments are used as ev.Val.
func (*Pipe) LineFor ¶ added in v0.1.6
LineFor returns the line processing messages destined for dst. Returns p.R if dst is bidir (DST_LR).
func (*Pipe) ParseMsg ¶ added in v0.4.0
ParseMsg parses given message m (if needed), in the context of this Pipe. In case of error, it emits EVENT_PARSE before returning.
func (*Pipe) Started ¶
Started returns true iff Start() has already been called = pipe is (being) started.
func (*Pipe) Stop ¶
func (p *Pipe) Stop()
Stop stops all inputs and blocks till they finish processing. Pipe must not be used again past this point. Closes all inputs, which should eventually close all outputs, possibly after this function returns.