pipe

package
v0.15.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 17, 2026 License: MIT Imports: 24 Imported by: 6

Documentation

Overview

Package pipe provides BGP message processing with callbacks.

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrInClosed  = errors.New("input channel closed")
	ErrOutClosed = errors.New("output channel closed")
	ErrStarted   = errors.New("pipe started")
	ErrStopped   = errors.New("pipe stopped")
	ErrFilter    = errors.New("filter error")
)
View Source
var (
	// pipe has finished starting
	EVENT_START = "bgpfix/pipe.START"

	// pipe is about to stop
	EVENT_STOP = "bgpfix/pipe.STOP"

	// could not parse the message before its callback
	EVENT_PARSE = "bgpfix/pipe.PARSE"

	// valid OPEN with a bigger message timestamp (seconds) made it to output
	EVENT_OPEN = "bgpfix/pipe.OPEN"

	// KEEPALIVE with a bigger message timestamp (seconds) made it to output
	EVENT_ALIVE = "bgpfix/pipe.ALIVE"

	// UPDATE with a bigger message timestamp (seconds) made it to output
	EVENT_UPDATE = "bgpfix/pipe.UPDATE"

	// session established (OPEN+KEEPALIVE made it to both sides)
	EVENT_ESTABLISHED = "bgpfix/pipe.ESTABLISHED"

	// End-of-RIB for new AF made it to output in given direction
	EVENT_EOR_AF = "bgpfix/pipe.EOR_AF"

	// End-of-RIB for all AFs in Caps made it to output in given direction
	EVENT_EOR = "bgpfix/pipe.EOR"
)

a collection of events generated internally by pipe

View Source
var DefaultOptions = Options{
	Logger: &log.Logger,
	Caps:   true,
}

Default BGP pipe options

Functions

func ActionAccept added in v0.1.7

func ActionAccept(m *msg.Msg) *msg.Msg

ActionAccept adds ACTION_ACCEPT to m and returns it.

func ActionBorrow added in v0.1.7

func ActionBorrow(m *msg.Msg) *msg.Msg

ActionBorrow adds ACTION_BORROW to m and returns it.

func ActionClear added in v0.1.7

func ActionClear(m *msg.Msg) *msg.Msg

ActionClear clears all action flags but ACTION_BORROW in m and returns it.

func ActionDrop added in v0.1.7

func ActionDrop(m *msg.Msg) *msg.Msg

ActionDrop adds ACTION_DROP to m and returns it.

func ActionHasAccept added in v0.10.0

func ActionHasAccept(m *msg.Msg) bool

ActionHasAccept returns true if ACTION_ACCEPT is set in m.

func ActionHasBorrow added in v0.10.0

func ActionHasBorrow(m *msg.Msg) bool

ActionHasBorrow returns true if ACTION_BORROW is set in m.

func ActionHasDrop added in v0.10.0

func ActionHasDrop(m *msg.Msg) bool

ActionHasDrop returns true if ACTION_DROP is set in m.

func GetTags added in v0.10.0

func GetTags(m *msg.Msg) map[string]string

GetTags returns message tags inside m, iff they exist (or nil).

func UseTags added in v0.10.0

func UseTags(m *msg.Msg) map[string]string

UseTags returns message tags inside m, creating them first if needed.

Types

type Action

type Action byte

Action requests special handling of a message or event in a Pipe

const (
	// Keep the message for later use, do not re-use its memory.
	//
	// You must use this if you wish to re-inject the message,
	// or keep reference to some value inside the msg.
	//
	// Once set, you must not remove this action from a message
	// unless you know you are the sole owner of this message.
	ACTION_BORROW Action = 1 << iota

	// Drop the message/event immediately (skip other calls, drop from output).
	//
	// If you want to re-inject the message later, set ACTION_BORROW too.
	// When re-injecting, clear the Action first, and remember the message will re-start
	// processing from the next callback, unless you clear its Context.
	ACTION_DROP

	// Accept the message/event immediately (skip other calls, proceed to output)
	ACTION_ACCEPT

	// Mask is logical OR of all defined actions
	ACTION_MASK Action = 1<<iota - 1
)
const ACTION_OK Action = 0

The default, zero action: keep processing as-is.

func (*Action) Accept added in v0.2.0

func (ac *Action) Accept()

Accept adds ACTION_ACCEPT

func (*Action) Add

func (ac *Action) Add(a Action)

Add adds a to action ac

func (*Action) Borrow added in v0.2.0

func (ac *Action) Borrow()

Borrow adds ACTION_BORROW

func (*Action) Clear

func (ac *Action) Clear()

Clear clears all bits except for ACTION_BORROW

func (*Action) Drop added in v0.2.0

func (ac *Action) Drop()

Drop adds ACTION_DROP

func (*Action) FromJSON added in v0.2.0

func (ac *Action) FromJSON(src []byte) error

FromJSON parses JSON representation in src

func (Action) Has added in v0.10.0

func (ac Action) Has(a Action) bool

Has returns true iff a is set in ac

func (Action) HasAccept added in v0.10.0

func (ac Action) HasAccept() bool

HasAccept returns true iff ACTION_ACCEPT is set in ac

func (Action) HasBorrow added in v0.10.0

func (ac Action) HasBorrow() bool

HasBorrow returns true iff ACTION_BORROW is set in ac

func (Action) HasDrop added in v0.10.0

func (ac Action) HasDrop() bool

HasDrop returns true iff ACTION_DROP is set in ac

func (Action) HasNot added in v0.10.0

func (ac Action) HasNot(a Action) bool

HasNot returns true iff a is NOT set in ac

func (Action) ToJSON added in v0.2.0

func (ac Action) ToJSON(dst []byte) []byte

ToJSON appends JSON representation to dst

type Callback

type Callback struct {
	Id      int          // optional callback id number (zero means none)
	Name    string       // optional name
	Order   int          // the lower the order, the sooner callback is run
	Enabled *atomic.Bool // if non-nil, disables the callback unless true

	Pre  bool // run before non-pre callbacks?
	Raw  bool // if true, do not parse the message (which may already be parsed, but for other reasons)
	Post bool // run after non-post callbacks?

	Dir       dir.Dir          // if non-zero, limits the direction
	Types     []msg.Type       // if non-empty, limits message types
	Filter    []*filter.Filter // skip messages not matching all filters
	LimitRate *rate.Limiter    // if non-nil, limits the rate of callback invocations
	LimitSkip bool             // if true, skips the callback when over the rate limit (else delays)

	Func CallbackFunc // the function to call
	// contains filtered or unexported fields
}

Callback represents a function to call for matching BGP messages. In general, Callbacks can be executed concurrently for different messages, but are always executed sequentially for the same message, in given order.

func (*Callback) Drop added in v0.2.0

func (cb *Callback) Drop()

Drop marks the callback as dropped permanently.

func (*Callback) String added in v0.2.0

func (cb *Callback) String() string

String returns callback name and id as string

type CallbackFunc

type CallbackFunc func(m *msg.Msg) (keep_message bool)

CallbackFunc processes message m. Return false to drop the message.

type CbFilterMode added in v0.5.1

type CbFilterMode = int
const (
	// callback filter disabled
	CBFILTER_NONE CbFilterMode = iota

	// skip if callback id == value
	CBFILTER_EQ

	// skip if callback id > value
	CBFILTER_GT

	// skip if callback id < value
	CBFILTER_LT

	// skip if callback id >= value
	CBFILTER_GE

	// skip if callback id <= value
	CBFILTER_LE

	// skip if callback id != value
	CBFILTER_NE

	// skip all callbacks
	CBFILTER_ALL
)

type Context

type Context struct {
	Pipe     *Pipe     // pipe processing the message
	Input    *Input    // input processing the message (message source)
	Callback *Callback // currently running callback

	Action Action // requested message actions
	// contains filtered or unexported fields
}

Context tracks message processing in a Pipe, stored in Msg.Value.

func GetContext added in v0.4.0

func GetContext(m *msg.Msg) *Context

GetContext returns message Context inside m, iff it exists (or nil).

func UseContext added in v0.5.1

func UseContext(m *msg.Msg) *Context

UseContext returns message Context inside m, creating one if needed.

func (*Context) DropTag added in v0.5.1

func (mx *Context) DropTag(tag string) bool

DropTag deletes given tag, returning true if it existed

func (*Context) DropTags added in v0.2.0

func (mx *Context) DropTags() bool

DropTags drops all message tags, returning true if any existed

func (*Context) FromJSON added in v0.1.6

func (mx *Context) FromJSON(src []byte) error

FromJSON unmarshals Context from JSON

func (*Context) GetTag added in v0.1.6

func (mx *Context) GetTag(tag string) string

GetTag returns given Tag value, or "" if not set

func (*Context) GetTags added in v0.5.1

func (mx *Context) GetTags() map[string]string

GetTags returns message tags inside mx, iff they exist (or nil).

func (*Context) HasTag added in v0.1.6

func (mx *Context) HasTag(tag string) bool

HasTag returns true iff the context has a particular Tag set

func (*Context) HasTags added in v0.2.0

func (mx *Context) HasTags() bool

HasTags returns true iff the context has any Tags set

func (*Context) Reset added in v0.1.6

func (mx *Context) Reset()

Reset resets pc to empty state

func (*Context) SetTag added in v0.1.6

func (mx *Context) SetTag(tag string, val string)

SetTag set given Tag to given value.

func (*Context) ToJSON added in v0.1.6

func (mx *Context) ToJSON(dst []byte) []byte

ToJSON marshals Context to JSON

func (*Context) UseTags added in v0.5.1

func (mx *Context) UseTags() map[string]string

UseTags returns message tags inside mx, creating them first if needed

type Event

type Event struct {
	Pipe *Pipe     `json:"-"`              // parent pipe
	Seq  uint64    `json:"seq,omitempty"`  // event sequence number
	Time time.Time `json:"time,omitempty"` // event timestamp

	Type  string  `json:"type"`  // type, usually "lib/pkg.NAME"
	Dir   dir.Dir `json:"dir"`   // optional event direction
	Msg   string  `json:"msg"`   // optional BGP message in JSON
	Error error   `json:"err"`   // optional error related to the event
	Value any     `json:"value"` // optional value, type-specific

	Handler *Handler // currently running handler (may be nil)
	Action  Action   // optional event action (zero means none)
	// contains filtered or unexported fields
}

Event represents an arbitrary event for a BGP pipe. Seq and Time will be set by the handler if non-zero.

func (*Event) String

func (ev *Event) String() string

String returns event type and seq number as string

func (*Event) Wait

func (ev *Event) Wait() bool

Wait blocks until the event is handled (returns true), or aborts if the Pipe context is cancelled (returns false).

type Handler

type Handler struct {
	Id      int          // optional handler id number (zero means none)
	Name    string       // optional name
	Order   int          // the lower the order, the sooner handler is run
	Enabled *atomic.Bool // if non-nil, disables the handler unless true

	Pre  bool // run before non-pre handlers?
	Post bool // run after non-post handlers?

	Dir   dir.Dir  // if non-zero, limits the direction
	Types []string // if non-empty, limits event types

	Func HandlerFunc // the function to call
	// contains filtered or unexported fields
}

Handler represents a function to call for matching pipe events. In general, Handlers can be executed concurrently for different events, but are always executed sequentially for the same event, in given order.

func (*Handler) Drop added in v0.2.0

func (h *Handler) Drop()

Drop marks the handler as dropped permanently.

func (*Handler) String added in v0.2.0

func (h *Handler) String() string

String returns handler name and id as string

type HandlerFunc

type HandlerFunc func(ev *Event) (keep_handler bool)

HandlerFunc handles event ev. Return false to unregister the handler (all types).

type Input

type Input struct {
	Pipe *Pipe // attached to this Pipe (nil before pipe start)
	Line *Line // attached to this Line (nil before pipe start)

	Id   int     // optional input id
	Name string  // optional input name
	Dir  dir.Dir // input direction

	In            chan *msg.Msg    // input channel for incoming messages
	Reverse       bool             // if true, run callbacks in reverse order
	CbFilter      CbFilterMode     // which callbacks to skip? (disabled by default)
	CbFilterValue any              // optionally specifies the value for CbFilter
	Filter        []*filter.Filter // drop messages not matching all filters
	LimitRate     *rate.Limiter    // optional input rate limit (nil = no limit)
	LimitSkip     bool             // if true, drop messages over the LimitRate (instead delay)
	// contains filtered or unexported fields
}

Input processes incoming BGP messages (in given direction) through many Callbacks and optionally writes the result to attached Line.

func (*Input) Close

func (in *Input) Close()

Close safely closes the .In channel, which should eventually stop the Input

func (*Input) Wait

func (in *Input) Wait() bool

Wait blocks until the input is done processing the messages (returns true), or aborts if the Pipe context is cancelled (returns false).

func (*Input) Write

func (in *Input) Write(src []byte) (int, error)

Write implements io.Writer and reads all BGP messages from src into in.In. Copies bytes from src. Consumes what it can, buffers the remainder if needed. Returns n equal to len(src). May block if pi.In is full.

In case of a non-nil err, call Write(nil) to re-try using the buffered remainder, until it returns a nil err.

Must not be used concurrently.

func (*Input) WriteFunc added in v0.2.0

func (in *Input) WriteFunc(src []byte, cb CallbackFunc) (int, error)

WriteFunc is the same as Input.Write(), but takes an optional callback function to be called just before the message is accepted for processing. If the callback returns false, the message is silently dropped instead.

func (*Input) WriteMsg

func (in *Input) WriteMsg(m *msg.Msg) (write_error error)

WriteMsg safely sends m to in.In, avoiding a panic if it is closed. It assigns a sequence number and timestamp before writing to the channel.

type Line

type Line struct {
	Pipe *Pipe   // parent pipe
	Dir  dir.Dir // line direction

	// the default Input, which processes messages through all callbacks.
	Input

	// Out is the Line output, where you can read processed messages from.
	Out chan *msg.Msg

	// UNIX timestamp (seconds) of the last valid OPEN message
	LastOpen atomic.Int64

	// UNIX timestamp (seconds) of the last KEEPALIVE message
	LastAlive atomic.Int64

	// UNIX timestamp (seconds) of the last UPDATE message
	LastUpdate atomic.Int64

	// the OPEN message that updated LastOpen
	Open atomic.Pointer[msg.Open]

	// UNIX timestamp (seconds) of the first EoR for given AF
	EoR *xsync.Map[afi.AS, int64]
	// contains filtered or unexported fields
}

Line implements one direction of a Pipe: possibly several input processors run messages through callbacks and write the results to a common output.

func (*Line) Close

func (l *Line) Close()

Close safely closes all inputs, which should eventually stop the line

func (*Line) CloseOutput

func (l *Line) CloseOutput()

CloseOutput safely closes the output channel.

func (*Line) Read

func (l *Line) Read(dst []byte) (int, error)

Read reads l.Out and writes raw BGP data to dst. Must not be used concurrently.

func (*Line) Wait

func (l *Line) Wait() bool

Wait blocks until all processing is done (returns true), or aborts if the Pipe context is cancelled (returns false).

func (*Line) WriteOutput added in v0.2.0

func (l *Line) WriteOutput(m *msg.Msg) (write_error error)

WriteOutput safely sends m to l.Out, avoiding a panic if closed.

func (*Line) WriteTo

func (l *Line) WriteTo(w io.Writer) (int64, error)

WriteTo implements io.WriterTo interface, writing raw BGP data to w

type Options

type Options struct {
	Logger  *zerolog.Logger // if nil logging is disabled
	MsgPool *sync.Pool      // optional pool for msg.Msg

	Caps bool // overwrite pipe.Caps with the capabilities negotiated in OPEN messages?

	Callbacks []*Callback // message callbacks
	Handlers  []*Handler  // event handlers
	Inputs    []*Input    // input processors
}

BGP pipe options

func (*Options) AddCallback

func (o *Options) AddCallback(cbf CallbackFunc, tpl ...*Callback) *Callback

AddCallbacks adds a callback function using tpl as its template (if present). It returns the added Callback, which can be further configured.

func (*Options) AddHandler

func (o *Options) AddHandler(hdf HandlerFunc, tpl ...*Handler) *Handler

AddHandler adds a handler function using tpl as its template (if present). It returns the added Handler, which can be further configured.

func (*Options) AddInput

func (o *Options) AddInput(dst dir.Dir, tpl ...*Input) *Input

AddInput adds input processor for given pipe direction, with optional details in tpl.

func (*Options) OnEstablished

func (o *Options) OnEstablished(hdf HandlerFunc) *Handler

OnEstablished request hdf to be called when the BGP session is established.

func (*Options) OnEvent

func (o *Options) OnEvent(hdf HandlerFunc, types ...string) *Handler

OnEvent request hdf to be called for given event types. If no types provided, it requests to call hdf on *every* event.

func (*Options) OnEventPost

func (o *Options) OnEventPost(hdf HandlerFunc, types ...string) *Handler

OnEventPost is like OnEvent but requests to run hdf after other handlers

func (*Options) OnEventPre

func (o *Options) OnEventPre(hdf HandlerFunc, types ...string) *Handler

OnEventPre is like OnEvent but requests to run hdf before other handlers

func (*Options) OnMsg

func (o *Options) OnMsg(cbf CallbackFunc, dir dir.Dir, types ...msg.Type) *Callback

OnMsg adds a callback for all messages of given types (or all types if not specified).

func (*Options) OnMsgPost

func (o *Options) OnMsgPost(cbf CallbackFunc, dir dir.Dir, types ...msg.Type) *Callback

OnMsgPost is like OnMsg but requests to run cb after other callbacks

func (*Options) OnMsgPre

func (o *Options) OnMsgPre(cbf CallbackFunc, dir dir.Dir, types ...msg.Type) *Callback

OnMsgPre is like OnMsg but requests to run cb before other callbacks

func (*Options) OnParseError

func (o *Options) OnParseError(hdf HandlerFunc) *Handler

OnParseError request hdf to be called on BGP message parse error.

func (*Options) OnStart

func (o *Options) OnStart(hdf HandlerFunc) *Handler

OnStart request hdf to be called after the pipe starts.

func (*Options) OnStop

func (o *Options) OnStop(hdf HandlerFunc) *Handler

OnStop request hdf to be called when the pipe stops.

type Pipe

type Pipe struct {
	*zerolog.Logger

	Options           // pipe options; modify before Start()
	Caps    caps.Caps // BGP capability context; always thread-safe
	L       *Line     // line processing messages from R to L
	R       *Line     // line processing messages from L to R

	// generic Key-Value store, always thread-safe
	KV *xsync.Map[string, any]

	Ctx    context.Context         // context for all children
	Cancel context.CancelCauseFunc // cancels ctx
	// contains filtered or unexported fields
}

Pipe processes BGP messages exchanged between two BGP peers, L (for "left" or "local") and R (for "right" or "remote"), allowing for building callback-based pipelines, with an internal event system.

Use NewPipe() to get a new object and modify its Pipe.Options. Then call Pipe.Start() to start the message flow.

func NewPipe

func NewPipe(ctx context.Context) *Pipe

NewPipe returns a new pipe, which can be configured through its Options. To start/stop the pipe, call Start() and Stop().

func (*Pipe) Event

func (p *Pipe) Event(et string, args ...any) *Event

Event announces a new event type et to the pipe, with optional arguments. The first dir.Dir argument is used as ev.Dir. The first *msg.Msg is used as ev.Msg and borrowed (add ACTION_BORROW). All error arguments are joined together into a single ev.Error. The remaining arguments are used as ev.Val.

func (*Pipe) GetMsg added in v0.1.7

func (p *Pipe) GetMsg() (m *msg.Msg)

GetMsg returns empty msg from pool, or a new msg object

func (*Pipe) LineFor added in v0.1.6

func (p *Pipe) LineFor(dst dir.Dir) *Line

LineFor returns the line processing messages destined for dst. Returns p.R if dst is bidir (DST_LR).

func (*Pipe) ParseMsg added in v0.4.0

func (p *Pipe) ParseMsg(m *msg.Msg) error

ParseMsg parses given message m (if needed), in the context of this Pipe. In case of error, it emits EVENT_PARSE before returning.

func (*Pipe) PutMsg added in v0.1.7

func (p *Pipe) PutMsg(m *msg.Msg)

PutMsg resets msg and returns it to pool, which might free it

func (*Pipe) Start

func (p *Pipe) Start() error

Start starts the Pipe in background and returns.

func (*Pipe) Started

func (p *Pipe) Started() bool

Started returns true iff Start() has already been called = pipe is (being) started.

func (*Pipe) Stop

func (p *Pipe) Stop()

Stop stops all inputs and blocks till they finish processing. Pipe must not be used again past this point. Closes all inputs, which should eventually close all outputs, possibly after this function returns.

func (*Pipe) Stopped

func (p *Pipe) Stopped() bool

Stopped returns true iff Stop() has already been called = pipe is (being) stopped.

func (*Pipe) Wait

func (p *Pipe) Wait()

Wait blocks until the pipe starts and stops completely.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL