xgress

package
v1.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 30, 2025 License: Apache-2.0 Imports: 26 Imported by: 27

Documentation

Index

Constants

View Source
// Header-key and content-type constants.
const (
	// MinHeaderKey is an untyped constant, 2000.
	MinHeaderKey = 2000
	// MaxHeaderKey is MinHeaderKey plus math.MaxUint8 (2000 + 255 = 2255);
	// the int32 conversion makes it a typed int32 constant.
	MaxHeaderKey = MinHeaderKey + int32(math.MaxUint8)

	// NOTE(review): the values below (2256-2261) start immediately above
	// MaxHeaderKey (2255), so they fall outside MinHeaderKey..MaxHeaderKey.
	// Presumably that range is reserved for the uint8-keyed payload headers
	// and these are the fixed keys placed after it — confirm.
	HeaderKeyCircuitId      = 2256
	HeaderKeySequence       = 2257
	HeaderKeyFlags          = 2258
	HeaderKeyRecvBufferSize = 2259
	HeaderKeyRTT            = 2260
	HeaderPayloadRaw        = 2261

	// Content type identifiers. ContentTypeValue maps the names
	// "PayloadType", "AcknowledgementType" and "ControlType" to these values.
	ContentTypePayloadType         = 1100
	ContentTypeAcknowledgementType = 1101
	ContentTypeControlType         = 1102
)
View Source
// Control* constants: consecutive untyped values 20 through 25.
// NOTE(review): presumably these are keys into Control.Headers
// (channel.Headers) used by the trace-route messages — confirm against the
// implementation.
const (
	ControlHopCount  = 20
	ControlHopType   = 21
	ControlHopId     = 22
	ControlTimestamp = 23
	ControlUserVal   = 24
	ControlError     = 25
)
View Source
// Bit masks and protocol constants, all typed as byte.
const (
	// Flag-byte layout, as established by the literals below:
	//
	//	bit 0    - not covered by any mask in this block
	//	bits 1-2 - VersionMask
	//	bit 3    - TerminatorFlagMask
	//	bit 4    - RttFlagMask
	//	bit 5    - ChunkFlagMask
	//	bit 6    - HeadersFlagMask
	//	bit 7    - HeartbeatFlagMask
	VersionMask        byte = 0b00000110
	TerminatorFlagMask byte = 0b00001000
	RttFlagMask        byte = 0b00010000
	ChunkFlagMask      byte = 0b00100000
	HeadersFlagMask    byte = 0b01000000
	HeartbeatFlagMask  byte = 0b10000000

	// CircuitIdSizeMask selects the low four bits (0-3). It overlaps
	// VersionMask and TerminatorFlagMask, so it presumably applies to a
	// different byte than the flag masks above — NOTE(review): confirm.
	CircuitIdSizeMask     byte = 0b00001111
	// PayloadProtocolV1 has the value 1.
	PayloadProtocolV1     byte = 1
	// PayloadProtocolOffset is 1, which matches VersionMask starting at
	// bit 1 — presumably the shift applied to the version; confirm.
	PayloadProtocolOffset byte = 1
)
View Source
// DECODER is the string constant "data".
// NOTE(review): presumably the name associated with Decoder — confirm against
// where it is used. The all-caps name departs from Go's MixedCaps convention,
// but it is exported, so renaming it would break importers.
const DECODER = "data"
View Source
const (
	// HeaderKeyUUID is header key 0.
	// NOTE(review): 0 fits the uint8 key space of Payload.Headers
	// (map[uint8][]byte), unlike the 2000+ HeaderKey* constants — presumably
	// this is a payload-header key; confirm.
	HeaderKeyUUID = 0
)

Variables

View Source
// ContentTypeValue maps the names "PayloadType", "AcknowledgementType" and
// "ControlType" to the corresponding ContentType* constants, stored as int32.
// It is a package-level variable, so it is mutable by any importer.
var ContentTypeValue = map[string]int32{
	"PayloadType":         ContentTypePayloadType,
	"AcknowledgementType": ContentTypeAcknowledgementType,
	"ControlType":         ContentTypeControlType,
}

Functions

func DecodePayload

func DecodePayload(payload *Payload) ([]byte, bool)

func RespondToTraceRequest

func RespondToTraceRequest(headers channel.Headers, hopType, hopId string, response ControlReceiver)

func SetOriginatorFlag

func SetOriginatorFlag(flags uint32, originator Originator) uint32

func UnmarshallPacketPayload

func UnmarshallPacketPayload(buf []byte) (*channel.Message, error)

Types

type AckSender

// AckSender is a single-method interface for handing off an Acknowledgement
// together with an Address.
type AckSender interface {
	// SendAck takes the acknowledgement and an address and returns nothing,
	// so any failure is not reported to the caller.
	// NOTE(review): address is presumably the destination of the ack — confirm.
	SendAck(ack *Acknowledgement, address Address)
}

type Acknowledgement

// Acknowledgement is the ack message for a circuit. It is created with
// NewAcknowledgement, decoded from a *channel.Message with
// UnmarshallAcknowledgement, and encoded back with Marshall.
type Acknowledgement struct {
	CircuitId      string
	// NOTE(review): Flags presumably holds OR-ed Flag values (GetOriginator
	// and SetOriginatorFlag both work on a uint32 flag word) — confirm.
	Flags          uint32
	RecvBufferSize uint32
	RTT            uint16
	// Sequence is a slice, so one acknowledgement can carry several sequence
	// numbers; the element type matches Payload.Sequence (int32).
	Sequence       []int32
}

func NewAcknowledgement

func NewAcknowledgement(circuitId string, originator Originator) *Acknowledgement

func UnmarshallAcknowledgement

func UnmarshallAcknowledgement(msg *channel.Message) (*Acknowledgement, error)

func (*Acknowledgement) GetCircuitId

func (ack *Acknowledgement) GetCircuitId() string

func (*Acknowledgement) GetFlags

func (ack *Acknowledgement) GetFlags() uint32

func (*Acknowledgement) GetLoggerFields

func (ack *Acknowledgement) GetLoggerFields() logrus.Fields

func (*Acknowledgement) GetOriginator

func (ack *Acknowledgement) GetOriginator() Originator

func (*Acknowledgement) GetSequence

func (ack *Acknowledgement) GetSequence() []int32

func (*Acknowledgement) Marshall

func (ack *Acknowledgement) Marshall() *channel.Message

type Address

// Address is a string-based type. Within this package it is accepted by
// NewXgress, AckSender.SendAck, DataPlaneAdapter.RetransmitPayload and
// DataPlaneAdapter.ForwardAcknowledgement, and returned by Xgress.Address.
type Address string

type BindHandler

type BindHandler interface {
	HandleXgressBind(x *Xgress)
}

A BindHandler is invoked with an Xgress to install the appropriate handlers.

type CircuitDetail

// CircuitDetail is a JSON-serializable description of a single circuit;
// CircuitsDetail holds a map of these.
// Address and Originator are plain strings here rather than the package's
// Address and Originator types.
type CircuitDetail struct {
	CircuitId  string `json:"circuitId"`
	ConnId     uint32 `json:"connId"`
	Address    string `json:"address"`
	Originator string `json:"originator"`
	IsXgress   bool   `json:"isXgress"`
}

type CircuitInspectDetail

// CircuitInspectDetail is a JSON-serializable inspection result for one
// circuit. Xgress.InspectCircuit takes a *CircuitInspectDetail, and the
// AddError, AddRelatedEntity and AddXgressDetail methods add to it.
type CircuitInspectDetail struct {
	CircuitId       string                    `json:"circuitId"`
	Forwards        map[string]string         `json:"forwards"`
	// XgressDetails holds per-xgress inspection data.
	// NOTE(review): the map key is presumably the xgress address — confirm.
	XgressDetails   map[string]*InspectDetail `json:"xgressDetails"`
	// RelatedEntities is a two-level map.
	// NOTE(review): presumably entity type -> id -> detail, mirroring the
	// AddRelatedEntity(entityType, id, detail) parameters — confirm.
	RelatedEntities map[string]map[string]any `json:"relatedEntities"`
	// Errors holds strings, while AddError accepts an error value.
	Errors          []string                  `json:"errors"`
	// contains filtered or unexported fields
}

func (*CircuitInspectDetail) AddError

func (self *CircuitInspectDetail) AddError(err error)

func (*CircuitInspectDetail) AddRelatedEntity

func (self *CircuitInspectDetail) AddRelatedEntity(entityType string, id string, detail any)

func (*CircuitInspectDetail) AddXgressDetail

func (self *CircuitInspectDetail) AddXgressDetail(xgressDetail *InspectDetail)

func (*CircuitInspectDetail) IncludeGoroutines

func (self *CircuitInspectDetail) IncludeGoroutines() bool

func (*CircuitInspectDetail) SetIncludeGoroutines

func (self *CircuitInspectDetail) SetIncludeGoroutines(includeGoroutines bool)

type CircuitsDetail

// CircuitsDetail is a JSON-serializable collection of CircuitDetail values.
type CircuitsDetail struct {
	// NOTE(review): keys are presumably circuit ids — confirm.
	Circuits map[string]*CircuitDetail `json:"circuits"`
}

type CloseHandler

type CloseHandler interface {
	// HandleXgressClose is invoked when the connected peer terminates the communication.
	HandleXgressClose(x *Xgress)
}

CloseHandler is invoked by an xgress when the connected peer terminates the communication.

type CloseHandlerF

type CloseHandlerF func(x *Xgress)

CloseHandlerF is the function version of CloseHandler: it lets a plain func(x *Xgress) be used wherever a CloseHandler is required.

func (CloseHandlerF) HandleXgressClose

func (self CloseHandlerF) HandleXgressClose(x *Xgress)

type Connection

// Connection is the interface NewXgress accepts as its peer argument.
// It embeds io.Closer, so implementations must also provide Close.
type Connection interface {
	io.Closer
	LogContext() string
	// ReadPayload returns a byte slice, a uint8-keyed header map and an error.
	ReadPayload() ([]byte, map[uint8][]byte, error)
	// WritePayload takes the same (data, headers) pair that ReadPayload returns.
	// NOTE(review): the int result is presumably the number of bytes written — confirm.
	WritePayload([]byte, map[uint8][]byte) (int, error)
	// HandleControlMsg receives a control message's type and headers plus a
	// ControlReceiver.
	// NOTE(review): responder is presumably where replies are delivered — confirm.
	HandleControlMsg(controlType ControlType, headers channel.Headers, responder ControlReceiver) error
}

type Control

// Control is a control message: a ControlType, the id of the circuit it
// belongs to and a set of channel.Headers. UnmarshallControl builds one from
// a *channel.Message and Marshall converts it back.
type Control struct {
	Type      ControlType
	CircuitId string
	Headers   channel.Headers
}

func UnmarshallControl

func UnmarshallControl(msg *channel.Message) (*Control, error)

func (*Control) CreateTraceResponse

func (self *Control) CreateTraceResponse(hopType, hopId string) *Control

func (*Control) DecrementAndGetHop

func (self *Control) DecrementAndGetHop() uint32

func (*Control) GetLoggerFields

func (self *Control) GetLoggerFields() logrus.Fields

func (*Control) IsTypeTraceRoute

func (self *Control) IsTypeTraceRoute() bool

func (*Control) IsTypeTraceRouteResponse

func (self *Control) IsTypeTraceRouteResponse() bool

func (*Control) Marshall

func (self *Control) Marshall() *channel.Message

type ControlReceiver

// ControlReceiver accepts a control message as its type and headers.
// *Xgress has a HandleControlReceive method with this signature, and
// RespondToTraceRequest and Connection.HandleControlMsg take a
// ControlReceiver argument.
type ControlReceiver interface {
	HandleControlReceive(controlType ControlType, headers channel.Headers)
}

type ControlType

// ControlType is the byte-sized discriminator stored in Control.Type.
// It has a String method.
type ControlType byte
const (
	// The two control types defined here.
	// NOTE(review): Control.IsTypeTraceRoute and
	// Control.IsTypeTraceRouteResponse presumably test for these — confirm.
	ControlTypeTraceRoute         ControlType = 1
	ControlTypeTraceRouteResponse ControlType = 2
)

func (ControlType) String

func (self ControlType) String() string

type DataPlaneAdapter

type DataPlaneAdapter interface {
	// ForwardPayload is used to forward data payloads onto the data-plane from an xgress
	ForwardPayload(payload *Payload, x *Xgress)

	// RetransmitPayload is used to retransmit data payloads onto the data-plane from an xgress.
	// Unlike the Forward* methods, it returns an error.
	RetransmitPayload(srcAddr Address, payload *Payload) error

	// ForwardControlMessage is used to forward control messages onto the data-plane from an xgress
	ForwardControlMessage(control *Control, x *Xgress)

	// ForwardAcknowledgement is used to forward acks onto the data-plane from an xgress
	ForwardAcknowledgement(ack *Acknowledgement, address Address)

	// Env is embedded, so an adapter must also provide GetRetransmitter,
	// GetPayloadIngester and GetMetrics.
	Env
}

DataPlaneAdapter is invoked by an xgress whenever messages need to be sent to the data plane. Generally, a DataPlaneAdapter is implemented to connect the xgress to the data plane's transmission system.

type Decoder

// Decoder is an empty struct, so it carries no state. Its Decode method takes
// a *channel.Message and returns ([]byte, bool).
// NOTE(review): the bool presumably reports whether the message was
// recognized — confirm.
type Decoder struct{}

func (Decoder) Decode

func (d Decoder) Decode(msg *channel.Message) ([]byte, bool)

type Env

// Env gives access to the Retransmitter, the PayloadIngester and the Metrics
// implementation. It is embedded in DataPlaneAdapter.
// NOTE(review): these are presumably shared across xgress instances — confirm.
type Env interface {
	GetRetransmitter() *Retransmitter
	GetPayloadIngester() *PayloadIngester
	GetMetrics() Metrics
}

type Flag

// Flag is a uint32-based flag type.
type Flag uint32
const (
	// Each constant is a distinct single bit (1, 2, 4, 8, 16).
	// NOTE(review): Payload.Flags and Acknowledgement.Flags are plain uint32
	// and presumably hold OR-ed combinations of these — confirm.
	PayloadFlagCircuitEnd   Flag = 1
	PayloadFlagOriginator   Flag = 2
	PayloadFlagCircuitStart Flag = 4
	PayloadFlagChunk        Flag = 8
	PayloadFlagRetransmit   Flag = 16
)

type InspectDetail

// InspectDetail is the JSON-serializable inspection result for one xgress.
// It is returned by Xgress.GetInspectDetail and stored in
// CircuitInspectDetail.XgressDetails.
type InspectDetail struct {
	Address               string            `json:"address"`
	Originator            string            `json:"originator"`
	TimeSinceLastLinkRx   string            `json:"timeSinceLastLinkRx"`
	// SendBufferDetail and RecvBufferDetail have the types returned by
	// LinkSendBuffer.Inspect and LinkReceiveBuffer.Inspect respectively.
	SendBufferDetail      *SendBufferDetail `json:"sendBufferDetail"`
	RecvBufferDetail      *RecvBufferDetail `json:"recvBufferDetail"`
	XgressPointer         string            `json:"xgressPointer"`
	LinkSendBufferPointer string            `json:"linkSendBufferPointer"`
	// NOTE(review): presumably populated only when GetInspectDetail is called
	// with includeGoroutines set — confirm.
	Goroutines            []string          `json:"goroutines"`
	Sequence              uint64            `json:"sequence"`
	Flags                 string            `json:"flags"`
}

type InvalidTerminatorError

// InvalidTerminatorError is an error type that wraps another error. Its Error
// and Unwrap methods use value receivers.
// NOTE(review): Unwrap presumably returns InnerError, which would make it
// usable with errors.Is and errors.As — confirm.
type InvalidTerminatorError struct {
	InnerError error
}

func (InvalidTerminatorError) Error

func (e InvalidTerminatorError) Error() string

func (InvalidTerminatorError) Unwrap

func (e InvalidTerminatorError) Unwrap() error

type LinkReceiveBuffer

// LinkReceiveBuffer has no exported fields; create one with
// NewLinkReceiveBuffer and use it through its methods (Inspect, PeekHead,
// ReceiveUnordered, Remove, Size).
type LinkReceiveBuffer struct {
	// contains filtered or unexported fields
}

func NewLinkReceiveBuffer

func NewLinkReceiveBuffer() *LinkReceiveBuffer

func (*LinkReceiveBuffer) Inspect

func (buffer *LinkReceiveBuffer) Inspect(x *Xgress) *RecvBufferDetail

func (*LinkReceiveBuffer) PeekHead

func (buffer *LinkReceiveBuffer) PeekHead() *Payload

func (*LinkReceiveBuffer) ReceiveUnordered

func (buffer *LinkReceiveBuffer) ReceiveUnordered(x *Xgress, payload *Payload, maxSize uint32) bool

func (*LinkReceiveBuffer) Remove

func (buffer *LinkReceiveBuffer) Remove(payload *Payload)

func (*LinkReceiveBuffer) Size

func (buffer *LinkReceiveBuffer) Size() uint32

type LinkSendBuffer

type LinkSendBuffer struct {
	// contains filtered or unexported fields
}

Note: if altering this struct, be sure to account for 64-bit alignment on the 32-bit ARM architecture. See https://pkg.go.dev/sync/atomic#pkg-note-BUG and https://github.com/golang/go/issues/36606.

func NewLinkSendBuffer

func NewLinkSendBuffer(x *Xgress) *LinkSendBuffer

func (*LinkSendBuffer) BufferPayload

func (buffer *LinkSendBuffer) BufferPayload(payload *Payload) (func(), error)

func (*LinkSendBuffer) Close

func (buffer *LinkSendBuffer) Close()

func (*LinkSendBuffer) CloseWhenEmpty

func (buffer *LinkSendBuffer) CloseWhenEmpty() bool

func (*LinkSendBuffer) Inspect

func (buffer *LinkSendBuffer) Inspect() *SendBufferDetail

func (*LinkSendBuffer) ReceiveAcknowledgement

func (buffer *LinkSendBuffer) ReceiveAcknowledgement(ack *Acknowledgement)

type Metrics

// Metrics is the set of metric hooks used by this package. NewMetrics builds
// an implementation from a metrics.Registry, and Env.GetMetrics returns one.
// None of the methods return a value.
type Metrics interface {
	// Event markers taking no arguments.
	MarkAckReceived()
	MarkPayloadDropped()
	MarkDuplicateAck()
	MarkDuplicatePayload()
	// Paired blocked/unblocked notifications for the local and remote windows.
	BufferBlockedByLocalWindow()
	BufferUnblockedByLocalWindow()
	BufferBlockedByRemoteWindow()
	BufferUnblockedByRemoteWindow()

	// Duration-valued observations.
	PayloadWritten(duration time.Duration)
	BufferUnblocked(duration time.Duration)

	// Size-valued observations.
	// NOTE(review): payloadSize is presumably in bytes — confirm.
	SendPayloadBuffered(payloadSize int64)
	SendPayloadDelivered(payloadSize int64)
}

func NewMetrics

func NewMetrics(registry metrics.Registry) Metrics

type MisconfiguredTerminatorError

// MisconfiguredTerminatorError is an error type that wraps another error. It
// has the same shape as InvalidTerminatorError and also has Error and Unwrap
// methods.
type MisconfiguredTerminatorError struct {
	InnerError error
}

func (MisconfiguredTerminatorError) Error

func (MisconfiguredTerminatorError) Unwrap

type Options

type Options struct {
	Mtu         int32
	// NOTE(review): RandomDrops and Drop1InN look like fault-injection
	// settings (drop one payload in N) — confirm before enabling.
	RandomDrops bool
	Drop1InN    int32
	TxQueueSize int32

	// TxPortal* fields: start/max/min sizes plus threshold/scale pairs.
	// NOTE(review): presumably these tune the transmit window; units are not
	// shown here — confirm.
	TxPortalStartSize      uint32
	TxPortalMaxSize        uint32
	TxPortalMinSize        uint32
	TxPortalIncreaseThresh uint32
	TxPortalIncreaseScale  float64
	TxPortalRetxThresh     uint32
	TxPortalRetxScale      float64
	TxPortalDupAckThresh   uint32
	TxPortalDupAckScale    float64

	RxBufferSize uint32
	// NOTE(review): the Ms suffix suggests RetxStartMs and RetxAddMs are in
	// milliseconds — confirm.
	RetxStartMs  uint32
	RetxScale    float64
	RetxAddMs    uint32

	// Durations, expressed as time.Duration rather than raw integers.
	MaxCloseWait        time.Duration
	GetCircuitTimeout   time.Duration
	CircuitStartTimeout time.Duration
	ConnectTimeout      time.Duration
}

Options contains common Xgress configuration options

func DefaultOptions

func DefaultOptions() *Options

func LoadOptions

func LoadOptions(data OptionsData) (*Options, error)

func (Options) String

func (options Options) String() string

type OptionsData

// OptionsData is the untyped map that LoadOptions accepts as input. Both keys
// and values are of type any. This is the same type as
// map[interface{}]interface{}, so existing callers are unaffected.
type OptionsData map[any]any

type Originator

// Originator is an int32-based type with two values, Initiator and
// Terminator. It has Invert and String methods, and SetOriginatorFlag takes
// an Originator together with a uint32 flag word.
type Originator int32
const (
	// NOTE(review): Invert presumably maps Initiator to Terminator and vice
	// versa — confirm.
	Initiator  Originator = 0
	Terminator Originator = 1
)

func (Originator) Invert

func (o Originator) Invert() Originator

func (Originator) String

func (o Originator) String() string

type Payload

// Payload is the data message for a circuit. UnmarshallPayload builds one
// from a *channel.Message and Marshall converts it back.
type Payload struct {
	CircuitId string
	// NOTE(review): Flags presumably holds OR-ed Flag values; the
	// IsCircuitEndFlagSet, IsCircuitStartFlagSet and IsRetransmitFlagSet
	// methods suggest so — confirm.
	Flags     uint32
	RTT       uint16
	Sequence  int32
	// Headers uses the same uint8-keyed shape that Connection.ReadPayload
	// returns and Connection.WritePayload accepts.
	Headers   map[uint8][]byte
	Data      []byte
	// contains filtered or unexported fields
}

func UnmarshallPayload

func UnmarshallPayload(msg *channel.Message) (*Payload, error)

func (*Payload) GetCircuitId

func (payload *Payload) GetCircuitId() string

func (*Payload) GetFlags

func (payload *Payload) GetFlags() uint32

func (*Payload) GetLoggerFields

func (payload *Payload) GetLoggerFields() logrus.Fields

func (*Payload) GetOriginator

func (payload *Payload) GetOriginator() Originator

func (*Payload) GetSequence

func (payload *Payload) GetSequence() int32

func (*Payload) IsCircuitEndFlagSet

func (payload *Payload) IsCircuitEndFlagSet() bool

func (*Payload) IsCircuitStartFlagSet

func (payload *Payload) IsCircuitStartFlagSet() bool

func (*Payload) IsRetransmitFlagSet

func (payload *Payload) IsRetransmitFlagSet() bool

func (*Payload) MarkAsRetransmit

func (payload *Payload) MarkAsRetransmit()

func (*Payload) Marshall

func (payload *Payload) Marshall() *channel.Message

type PayloadIngester

// PayloadIngester has no exported fields; create one with NewPayloadIngester,
// which takes a receive-only closeNotify channel. Env.GetPayloadIngester
// returns one.
type PayloadIngester struct {
	// contains filtered or unexported fields
}

func NewPayloadIngester

func NewPayloadIngester(closeNotify <-chan struct{}) *PayloadIngester

type PayloadTransformer

// PayloadTransformer is an empty struct with Rx and Tx methods, both taking
// (*channel.Message, channel.Channel). The Rx signature leaves its parameters
// unnamed.
// NOTE(review): presumably Rx is a no-op and only Tx rewrites messages — confirm.
type PayloadTransformer struct {
}

func (PayloadTransformer) Rx

func (self PayloadTransformer) Rx(*channel.Message, channel.Channel)

func (PayloadTransformer) Tx

func (self PayloadTransformer) Tx(m *channel.Message, ch channel.Channel)

type PayloadType

// PayloadType is a byte-based type with three defined values.
// Xgress.SendPayload accepts one as its third parameter, but that parameter
// is declared as the blank identifier, so SendPayload does not use it.
type PayloadType byte
const (
	PayloadTypeXg  PayloadType = 1
	PayloadTypeRtx PayloadType = 2
	PayloadTypeFwd PayloadType = 3
)

type PeekHandler

type PeekHandler interface {
	Rx(x *Xgress, payload *Payload)
	Tx(x *Xgress, payload *Payload)
	Close(x *Xgress)
}

PeekHandler allows registering a watcher to react to data flowing through an xgress instance.

type RecvBufferDetail

// RecvBufferDetail is the JSON-serializable result of
// LinkReceiveBuffer.Inspect; it also appears as InspectDetail.RecvBufferDetail.
type RecvBufferDetail struct {
	Size           uint32 `json:"size"`
	PayloadCount   uint32 `json:"payloadCount"`
	LastSizeSent   uint32 `json:"lastSizeSent"`
	Sequence       int32  `json:"sequence"`
	MaxSequence    int32  `json:"maxSequence"`
	NextPayload    string `json:"nextPayload"`
	// NOTE(review): presumably reports whether the values were read under
	// proper synchronization — confirm.
	AcquiredSafely bool   `json:"acquiredSafely"`
}

type Retransmitter

// Retransmitter has no exported fields; create one with NewRetransmitter,
// which takes a RetransmitterFaultReporter, a metrics.Registry and a
// receive-only closeNotify channel. Env.GetRetransmitter returns one.
type Retransmitter struct {
	// contains filtered or unexported fields
}

func NewRetransmitter

func NewRetransmitter(faultReporter RetransmitterFaultReporter, metrics metrics.Registry, closeNotify <-chan struct{}) *Retransmitter

type RetransmitterFaultReporter

// RetransmitterFaultReporter is the fault callback passed to NewRetransmitter.
type RetransmitterFaultReporter interface {
	// ReportForwardingFault takes the circuit id and a controller id.
	// NOTE(review): presumably called when a retransmission cannot be
	// forwarded — confirm.
	ReportForwardingFault(circuitId string, ctrlId string)
}

type SendBufferDetail

// SendBufferDetail is the JSON-serializable result of LinkSendBuffer.Inspect;
// it also appears as InspectDetail.SendBufferDetail.
type SendBufferDetail struct {
	WindowSize            uint32  `json:"windowSize"`
	LinkSendBufferSize    uint32  `json:"linkSendBufferSize"`
	LinkRecvBufferSize    uint32  `json:"linkRecvBufferSize"`
	Accumulator           uint32  `json:"accumulator"`
	SuccessfulAcks        uint32  `json:"successfulAcks"`
	DuplicateAcks         uint32  `json:"duplicateAcks"`
	Retransmits           uint32  `json:"retransmits"`
	Closed                bool    `json:"closed"`
	BlockedByLocalWindow  bool    `json:"blockedByLocalWindow"`
	BlockedByRemoteWindow bool    `json:"blockedByRemoteWindow"`
	RetxScale             float64 `json:"retxScale"`
	RetxThreshold         uint32  `json:"retxThreshold"`
	// A string rather than a time.Duration.
	TimeSinceLastRetx     string  `json:"timeSinceLastRetx"`
	CloseWhenEmpty        bool    `json:"closeWhenEmpty"`
	// NOTE(review): presumably reports whether the values were read under
	// proper synchronization — confirm.
	AcquiredSafely        bool    `json:"acquiredSafely"`
}

type Xgress

// Xgress is created with NewXgress. Options is its only exported field; all
// other state is unexported and reached through methods.
type Xgress struct {
	Options *Options
	// contains filtered or unexported fields
}

func NewXgress

func NewXgress(circuitId string, ctrlId string, address Address, peer Connection, originator Originator, options *Options, tags map[string]string) *Xgress

func (*Xgress) AddCloseHandler

func (self *Xgress) AddCloseHandler(closeHandler CloseHandler)

func (*Xgress) AddPeekHandler

func (self *Xgress) AddPeekHandler(peekHandler PeekHandler)

func (*Xgress) Address

func (self *Xgress) Address() Address

func (*Xgress) CircuitId

func (self *Xgress) CircuitId() string

func (*Xgress) Close

func (self *Xgress) Close()

Things which can trigger close:

1. Read fails
2. Write fails
3. End of Circuit received
4. Unroute received

func (*Xgress) CloseSendBuffer

func (self *Xgress) CloseSendBuffer()

func (*Xgress) CloseTimeout

func (self *Xgress) CloseTimeout(duration time.Duration)

func (*Xgress) Closed

func (self *Xgress) Closed() bool

func (*Xgress) CtrlId

func (self *Xgress) CtrlId() string

func (*Xgress) ForwardEndOfCircuit

func (self *Xgress) ForwardEndOfCircuit(sendF func(payload *Payload) bool)

func (*Xgress) GetEndCircuit

func (self *Xgress) GetEndCircuit() *Payload

func (*Xgress) GetInspectDetail

func (self *Xgress) GetInspectDetail(includeGoroutines bool) *InspectDetail

func (*Xgress) GetIntervalId

func (self *Xgress) GetIntervalId() string

func (*Xgress) GetSequence

func (self *Xgress) GetSequence() uint64

func (*Xgress) GetStartCircuit

func (self *Xgress) GetStartCircuit() *Payload

func (*Xgress) GetTags

func (self *Xgress) GetTags() map[string]string

func (*Xgress) GetTimeOfLastRxFromLink

func (self *Xgress) GetTimeOfLastRxFromLink() int64

func (*Xgress) HandleControlReceive

func (self *Xgress) HandleControlReceive(controlType ControlType, headers channel.Headers)

func (*Xgress) InspectCircuit

func (self *Xgress) InspectCircuit(detail *CircuitInspectDetail)

func (*Xgress) IsCircuitStarted

func (self *Xgress) IsCircuitStarted() bool

func (*Xgress) IsEndOfCircuitReceived

func (self *Xgress) IsEndOfCircuitReceived() bool

func (*Xgress) IsEndOfCircuitSent

func (self *Xgress) IsEndOfCircuitSent() bool

func (*Xgress) IsTerminator

func (self *Xgress) IsTerminator() bool

func (*Xgress) Label

func (self *Xgress) Label() string

func (*Xgress) Originator

func (self *Xgress) Originator() Originator

func (*Xgress) PayloadReceived

func (self *Xgress) PayloadReceived(payload *Payload)

func (*Xgress) SendAcknowledgement

func (self *Xgress) SendAcknowledgement(acknowledgement *Acknowledgement) error

func (*Xgress) SendControl

func (self *Xgress) SendControl(control *Control) error

func (*Xgress) SendEmptyAck

func (self *Xgress) SendEmptyAck()

func (*Xgress) SendPayload

func (self *Xgress) SendPayload(payload *Payload, _ time.Duration, _ PayloadType) error

func (*Xgress) SetDataPlaneAdapter

func (self *Xgress) SetDataPlaneAdapter(dataPlaneAdapter DataPlaneAdapter)

func (*Xgress) Start

func (self *Xgress) Start()

func (*Xgress) Unrouted

func (self *Xgress) Unrouted()

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL