xgress

package
v1.2.7 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 1, 2025 License: Apache-2.0 Imports: 30 Imported by: 27

Documentation

Index

Constants

View Source
// Numeric identifiers named for message header keys and content types.
const (
	// MinHeaderKey and MaxHeaderKey delimit a window of math.MaxUint8+1 (256)
	// keys, 2000 through 2255. The int32 conversion makes MaxHeaderKey a typed
	// int32 constant, while MinHeaderKey stays untyped.
	// NOTE(review): the uint8-wide window presumably maps map[uint8][]byte
	// payload header keys into this numeric range — confirm.
	MinHeaderKey = 2000
	MaxHeaderKey = MinHeaderKey + int32(math.MaxUint8)

	// Fixed header keys. They start at 2256, immediately above MaxHeaderKey.
	HeaderKeyCircuitId      = 2256
	HeaderKeySequence       = 2257
	HeaderKeyFlags          = 2258
	HeaderKeyRecvBufferSize = 2259
	HeaderKeyRTT            = 2260
	HeaderPayloadRaw        = 2261

	// Content type identifiers. ContentTypeValue maps their names to these values.
	ContentTypePayloadType         = 1100
	ContentTypeAcknowledgementType = 1101
	ContentTypeControlType         = 1102
)
View Source
// Control* constants are consecutive untyped integers from 20 through 25.
// NOTE(review): presumably keys into a control message's channel.Headers — confirm.
const (
	ControlHopCount  = 20
	ControlHopType   = 21
	ControlHopId     = 22
	ControlTimestamp = 23
	ControlUserVal   = 24
	ControlError     = 25
)
View Source
// Bit masks and protocol values, all typed byte.
// NOTE(review): presumably these describe the leading flag byte(s) of a packed
// payload encoding — confirm against the marshalling code.
const (
	// VersionMask covers two bits (bits 1 and 2). Every *FlagMask below it
	// selects exactly one bit (bits 3 through 7). Bit 0 is not covered by any
	// mask in this group.
	VersionMask        byte = 0b00000110
	TerminatorFlagMask byte = 0b00001000
	RttFlagMask        byte = 0b00010000
	ChunkFlagMask      byte = 0b00100000
	HeadersFlagMask    byte = 0b01000000
	HeartbeatFlagMask  byte = 0b10000000

	// CircuitIdSizeMask keeps the low four bits, so it can extract values 0-15.
	CircuitIdSizeMask     byte = 0b00001111
	PayloadProtocolV1     byte = 1
	// PayloadProtocolOffset equals the position of VersionMask's lowest set bit.
	// NOTE(review): presumably the shift applied to the protocol version — confirm.
	PayloadProtocolOffset byte = 1
)
View Source
// DECODER is the untyped string constant "data".
// NOTE(review): the name is not MixedCaps, but renaming it would break
// importers of this exported constant.
const DECODER = "data"
View Source
const (
	// HeaderKeyUUID is 0, so it fits in a uint8.
	// NOTE(review): presumably a key for the map[uint8][]byte payload headers
	// rather than a channel header key — confirm.
	HeaderKeyUUID = 0
)

Variables

View Source
// ContentTypeValue maps content type names to the corresponding ContentType*
// constants (1100-1102) as int32 values.
var ContentTypeValue = map[string]int32{
	"PayloadType":         ContentTypePayloadType,
	"AcknowledgementType": ContentTypeAcknowledgementType,
	"ControlType":         ContentTypeControlType,
}
View Source
// ErrPeerClosed is a sentinel error with the message "peer closed".
// Compare against it with errors.Is.
var ErrPeerClosed = errors.New("peer closed")
View Source
// ErrWriteClosed is a sentinel error with the message "write closed".
// Compare against it with errors.Is.
var ErrWriteClosed = errors.New("write closed")

Functions

func DecodePayload

func DecodePayload(payload *Payload) ([]byte, bool)

func RespondToTraceRequest

func RespondToTraceRequest(headers channel.Headers, hopType, hopId string, response ControlReceiver)

func SetOriginatorFlag

func SetOriginatorFlag(flags uint32, originator Originator) uint32

func UnmarshallPacketPayload

func UnmarshallPacketPayload(buf []byte) (*channel.Message, error)

Types

type AckSender

// AckSender is implemented by anything that can send an Acknowledgement
// to an Address.
type AckSender interface {
	// SendAck sends ack to address. It returns nothing, so failures are not
	// reported to the caller through this interface.
	SendAck(ack *Acknowledgement, address Address)
}

type Acknowledgement

// Acknowledgement is the acknowledgement message type. It is created with
// NewAcknowledgement, decoded from a channel message with
// UnmarshallAcknowledgement, and encoded with Marshall.
type Acknowledgement struct {
	CircuitId      string
	Flags          uint32
	RecvBufferSize uint32
	RTT            uint16
	// Sequence is a slice, so a single Acknowledgement can carry several
	// sequence numbers.
	Sequence       []int32
}

func NewAcknowledgement

func NewAcknowledgement(circuitId string, originator Originator) *Acknowledgement

func UnmarshallAcknowledgement

func UnmarshallAcknowledgement(msg *channel.Message) (*Acknowledgement, error)

func (*Acknowledgement) GetCircuitId

func (ack *Acknowledgement) GetCircuitId() string

func (*Acknowledgement) GetFlags

func (ack *Acknowledgement) GetFlags() uint32

func (*Acknowledgement) GetLoggerFields

func (ack *Acknowledgement) GetLoggerFields() logrus.Fields

func (*Acknowledgement) GetOriginator

func (ack *Acknowledgement) GetOriginator() Originator

func (*Acknowledgement) GetSequence

func (ack *Acknowledgement) GetSequence() []int32

func (*Acknowledgement) Marshall

func (ack *Acknowledgement) Marshall() *channel.Message

type Address

// Address is a string-based type. It appears as a parameter of
// AckSender.SendAck, DataPlaneAdapter.RetransmitPayload,
// DataPlaneAdapter.ForwardAcknowledgement and NewXgress.
type Address string

type BindHandler

// BindHandler is an interface invoked to install the appropriate handlers.
type BindHandler interface {
	// HandleXgressBind is called with the Xgress to operate on.
	// NOTE(review): presumably the place to call x.AddCloseHandler and
	// x.AddPeekHandler — confirm.
	HandleXgressBind(x *Xgress)
}

BindHandler is an interface invoked to install the appropriate handlers.

type CircuitDetail

// CircuitDetail describes one circuit. Every field carries a camelCase JSON
// tag; CircuitsDetail holds a map of these.
type CircuitDetail struct {
	CircuitId  string `json:"circuitId"`
	ConnId     uint32 `json:"connId"`
	Address    string `json:"address"`
	Originator string `json:"originator"`
	IsXgress   bool   `json:"isXgress"`
	CtrlId     string `json:"ctrlId"`
}

type CircuitInspectDetail

type CircuitInspectDetail struct {
	CircuitId       string                    `json:"circuitId"`
	Forwards        map[string]string         `json:"forwards"`
	XgressDetails   map[string]*InspectDetail `json:"xgressDetails"`
	RelatedEntities map[string]map[string]any `json:"relatedEntities"`
	Errors          []string                  `json:"errors"`
	// contains filtered or unexported fields
}

func (*CircuitInspectDetail) AddError

func (self *CircuitInspectDetail) AddError(err error)

func (*CircuitInspectDetail) AddRelatedEntity

func (self *CircuitInspectDetail) AddRelatedEntity(entityType string, id string, detail any)

func (*CircuitInspectDetail) AddXgressDetail

func (self *CircuitInspectDetail) AddXgressDetail(xgressDetail *InspectDetail)

func (*CircuitInspectDetail) IncludeGoroutines

func (self *CircuitInspectDetail) IncludeGoroutines() bool

func (*CircuitInspectDetail) SetIncludeGoroutines

func (self *CircuitInspectDetail) SetIncludeGoroutines(includeGoroutines bool)

type CircuitsDetail

// CircuitsDetail is a JSON-serializable collection of CircuitDetail values.
type CircuitsDetail struct {
	// Circuits is keyed by string.
	// NOTE(review): presumably the circuit id — confirm against the producer.
	Circuits map[string]*CircuitDetail `json:"circuits"`
}

type CloseHandler

// CloseHandler is invoked by an xgress when the connected peer terminates
// the communication. CloseHandlerF adapts a plain function to this interface.
type CloseHandler interface {
	// HandleXgressClose is invoked when the connected peer terminates the communication.
	// x is the Xgress passed by the caller of the handler.
	HandleXgressClose(x *Xgress)
}

CloseHandler is invoked by an xgress when the connected peer terminates the communication.

type CloseHandlerF

// CloseHandlerF is a function type with the same signature as
// CloseHandler.HandleXgressClose. It declares a HandleXgressClose method, so
// a value of this type can be used wherever a CloseHandler is expected.
type CloseHandlerF func(x *Xgress)

CloseHandlerF is the function version of CloseHandler

func (CloseHandlerF) HandleXgressClose

func (self CloseHandlerF) HandleXgressClose(x *Xgress)

type Connection

// Connection is the peer interface handed to NewXgress. It embeds io.Closer,
// so implementations must also provide Close() error.
type Connection interface {
	io.Closer
	// LogContext returns a string.
	// NOTE(review): presumably an identifier used in log output — confirm.
	LogContext() string
	// ReadPayload returns a byte slice, a map[uint8][]byte (the same type as
	// the headers argument of WritePayload), and an error.
	ReadPayload() ([]byte, map[uint8][]byte, error)
	// WritePayload takes a byte slice and a map[uint8][]byte and returns an
	// int and an error.
	// NOTE(review): presumably the number of bytes written — confirm.
	WritePayload([]byte, map[uint8][]byte) (int, error)

	// HandleControlMsg receives a control message type and its headers, plus a
	// ControlReceiver through which a response can be delivered.
	HandleControlMsg(controlType ControlType, headers channel.Headers, responder ControlReceiver) error
}

type Control

// Control is the control message type. It is decoded from a channel message
// with UnmarshallControl and encoded with Marshall.
type Control struct {
	// Type is one of the ControlType* constants.
	Type      ControlType
	CircuitId string
	Headers   channel.Headers
}

func UnmarshallControl

func UnmarshallControl(msg *channel.Message) (*Control, error)

func (*Control) CreateTraceResponse

func (self *Control) CreateTraceResponse(hopType, hopId string) *Control

func (*Control) DecrementAndGetHop

func (self *Control) DecrementAndGetHop() uint32

func (*Control) GetLoggerFields

func (self *Control) GetLoggerFields() logrus.Fields

func (*Control) IsTypeTraceRoute

func (self *Control) IsTypeTraceRoute() bool

func (*Control) IsTypeTraceRouteResponse

func (self *Control) IsTypeTraceRouteResponse() bool

func (*Control) Marshall

func (self *Control) Marshall() *channel.Message

type ControlReceiver

// ControlReceiver accepts control messages. *Xgress declares a
// HandleControlReceive method with this signature.
type ControlReceiver interface {
	// HandleControlReceive is given the control message type and its headers.
	HandleControlReceive(controlType ControlType, headers channel.Headers)
}

type ControlType

// ControlType is a byte-sized identifier for the kind of a control message.
// It is the type of Control.Type and is passed to Connection.HandleControlMsg
// and ControlReceiver.HandleControlReceive.
type ControlType byte
const (
	// The two control types declared here; Control.IsTypeTraceRoute and
	// Control.IsTypeTraceRouteResponse are named after them.
	ControlTypeTraceRoute         ControlType = 1
	ControlTypeTraceRouteResponse ControlType = 2
)

func (ControlType) String

func (self ControlType) String() string

type DataPlaneAdapter

// DataPlaneAdapter is invoked by an xgress whenever messages need to be sent
// to the data plane. It is installed on an Xgress with SetDataPlaneAdapter.
type DataPlaneAdapter interface {
	// ForwardPayload is used to forward data payloads onto the data-plane from an xgress.
	// Note that ctx is the last parameter here, not the first.
	ForwardPayload(payload *Payload, x *Xgress, ctx context.Context)

	// RetransmitPayload is used to retransmit data payloads onto the data-plane from an xgress.
	// It is the only forwarding method here that returns an error.
	RetransmitPayload(srcAddr Address, payload *Payload) error

	// ForwardControlMessage is used to forward control messages onto the data-plane from an xgress
	ForwardControlMessage(control *Control, x *Xgress)

	// ForwardAcknowledgement is used to forward acks onto the data-plane from an xgress
	ForwardAcknowledgement(ack *Acknowledgement, address Address)

	// Env is embedded, so implementations must also provide GetRetransmitter,
	// GetPayloadIngester and GetMetrics.
	Env
}

DataPlaneAdapter is invoked by an xgress whenever messages need to be sent to the data plane. Generally, a DataPlaneAdapter is implemented to connect the xgress to a data-plane transmission system.

type Decoder

// Decoder is a field-less type. Its Decode method takes a *channel.Message
// and returns a byte slice and a bool.
type Decoder struct{}

func (Decoder) Decode

func (d Decoder) Decode(msg *channel.Message) ([]byte, bool)

type Env

// Env provides access to shared components: a Retransmitter, a
// PayloadIngester and a Metrics implementation. It is embedded in
// DataPlaneAdapter.
type Env interface {
	GetRetransmitter() *Retransmitter
	GetPayloadIngester() *PayloadIngester
	GetMetrics() Metrics
}

type Flag

// Flag is a uint32 bit flag. Note that the Flags fields of Payload and
// Acknowledgement are declared as plain uint32, not Flag.
type Flag uint32
const (
	// Each constant is a distinct power of two (bits 0 through 6), so values
	// can be combined with bitwise OR and tested with bitwise AND.
	PayloadFlagCircuitEnd   Flag = 1
	PayloadFlagOriginator   Flag = 2
	PayloadFlagCircuitStart Flag = 4
	PayloadFlagChunk        Flag = 8
	PayloadFlagRetransmit   Flag = 16
	PayloadFlagEOF          Flag = 32
	PayloadFlagWriteFailed  Flag = 64
)

type InspectDetail

// InspectDetail is the JSON-serializable report returned by
// Xgress.GetInspectDetail. Several values that are not naturally strings
// (pointers, flags, elapsed time) are exposed here as strings.
type InspectDetail struct {
	Address               string            `json:"address"`
	Originator            string            `json:"originator"`
	TimeSinceLastLinkRx   string            `json:"timeSinceLastLinkRx"`
	SendBufferDetail      *SendBufferDetail `json:"sendBufferDetail"`
	RecvBufferDetail      *RecvBufferDetail `json:"recvBufferDetail"`
	XgressPointer         string            `json:"xgressPointer"`
	LinkSendBufferPointer string            `json:"linkSendBufferPointer"`
	// NOTE(review): presumably populated only when GetInspectDetail is called
	// with includeGoroutines set — confirm.
	Goroutines            []string          `json:"goroutines"`
	Sequence              uint64            `json:"sequence"`
	Flags                 string            `json:"flags"`
	LastSizeSent          uint32            `json:"lastSizeSent"`
}

type InvalidTerminatorError

// InvalidTerminatorError is an error type that wraps another error. It
// declares Error and Unwrap methods on the value receiver.
type InvalidTerminatorError struct {
	// InnerError is the wrapped error.
	InnerError error
}

func (InvalidTerminatorError) Error

func (e InvalidTerminatorError) Error() string

func (InvalidTerminatorError) Unwrap

func (e InvalidTerminatorError) Unwrap() error

type LinkReceiveBuffer

type LinkReceiveBuffer struct {
	sync.Mutex
	// contains filtered or unexported fields
}

func NewLinkReceiveBuffer

func NewLinkReceiveBuffer(txQueueSize int32) *LinkReceiveBuffer

func (*LinkReceiveBuffer) Inspect

func (buffer *LinkReceiveBuffer) Inspect() *RecvBufferDetail

func (*LinkReceiveBuffer) NextPayload added in v1.2.0

func (buffer *LinkReceiveBuffer) NextPayload(closeNotify <-chan struct{}) *Payload

func (*LinkReceiveBuffer) ReceiveUnordered

func (buffer *LinkReceiveBuffer) ReceiveUnordered(x *Xgress, payload *Payload, maxSize uint32) bool

func (*LinkReceiveBuffer) Size

func (buffer *LinkReceiveBuffer) Size() uint32

type LinkSendBuffer

type LinkSendBuffer struct {
	// contains filtered or unexported fields
}

Note: if altering this struct, be sure to account for 64-bit alignment on 32-bit ARM architectures. See https://pkg.go.dev/sync/atomic#pkg-note-BUG and https://github.com/golang/go/issues/36606.

func NewLinkSendBuffer

func NewLinkSendBuffer(x *Xgress) *LinkSendBuffer

func (*LinkSendBuffer) BufferPayload

func (buffer *LinkSendBuffer) BufferPayload(payload *Payload) (func(), error)

func (*LinkSendBuffer) BufferPayloadWithDeadline added in v1.2.0

func (buffer *LinkSendBuffer) BufferPayloadWithDeadline(payload *Payload, ctx context.Context) (func(), error)

func (*LinkSendBuffer) Close

func (buffer *LinkSendBuffer) Close()

func (*LinkSendBuffer) CloseWhenEmpty

func (buffer *LinkSendBuffer) CloseWhenEmpty() bool

func (*LinkSendBuffer) Inspect

func (buffer *LinkSendBuffer) Inspect() *SendBufferDetail

func (*LinkSendBuffer) IsClosed added in v1.2.0

func (buffer *LinkSendBuffer) IsClosed() bool

func (*LinkSendBuffer) ReceiveAcknowledgement

func (buffer *LinkSendBuffer) ReceiveAcknowledgement(ack *Acknowledgement)

type Metrics

// Metrics is the set of instrumentation hooks. NewMetrics builds an
// implementation from a metrics.Registry, and Env.GetMetrics exposes one.
type Metrics interface {
	// Argument-less event markers.
	MarkAckReceived()
	MarkPayloadDropped()
	MarkDuplicateAck()
	MarkDuplicatePayload()
	// Paired blocked/unblocked notifications for the local and remote windows.
	BufferBlockedByLocalWindow()
	BufferUnblockedByLocalWindow()
	BufferBlockedByRemoteWindow()
	BufferUnblockedByRemoteWindow()

	// Observations that carry a duration.
	PayloadWritten(duration time.Duration)
	BufferUnblocked(duration time.Duration)

	// Observations that carry a payload size as int64.
	SendPayloadBuffered(payloadSize int64)
	SendPayloadDelivered(payloadSize int64)
}

func NewMetrics

func NewMetrics(registry metrics.Registry) Metrics

type MisconfiguredTerminatorError

// MisconfiguredTerminatorError is an error type that wraps another error. It
// has the same shape as InvalidTerminatorError.
type MisconfiguredTerminatorError struct {
	// InnerError is the wrapped error.
	InnerError error
}

func (MisconfiguredTerminatorError) Error

func (MisconfiguredTerminatorError) Unwrap

type Options

// Options contains common Xgress configuration options. DefaultOptions and
// LoadOptions both return *Options, and Xgress exposes it as its Options field.
type Options struct {
	Mtu         int32
	RandomDrops bool
	Drop1InN    int32
	TxQueueSize int32

	// TxPortal* settings: start/max/min sizes plus threshold and scale pairs.
	// NOTE(review): presumably these tune the transmit window — confirm.
	TxPortalStartSize      uint32
	TxPortalMaxSize        uint32
	TxPortalMinSize        uint32
	TxPortalIncreaseThresh uint32
	TxPortalIncreaseScale  float64
	TxPortalRetxThresh     uint32
	TxPortalRetxScale      float64
	TxPortalDupAckThresh   uint32
	TxPortalDupAckScale    float64

	// NOTE(review): the Ms suffix on RetxStartMs and RetxAddMs suggests
	// milliseconds — confirm.
	RxBufferSize uint32
	RetxStartMs  uint32
	RetxScale    float64
	RetxAddMs    uint32

	// Waits and timeouts, expressed as time.Duration.
	MaxCloseWait        time.Duration
	GetCircuitTimeout   time.Duration
	CircuitStartTimeout time.Duration
	ConnectTimeout      time.Duration
}

Options contains common Xgress configuration options

func DefaultOptions

func DefaultOptions() *Options

func LoadOptions

func LoadOptions(data OptionsData) (*Options, error)

func (Options) String

func (options Options) String() string

type OptionsData

// OptionsData is the untyped key/value configuration map accepted by
// LoadOptions. It is spelled with any to match the rest of the package;
// any is an alias for interface{}, so the type is identical for callers.
type OptionsData map[any]any

type Originator

// Originator is an int32-based type with two declared values. It declares
// Invert and String methods and is passed to NewXgress, NewAcknowledgement
// and SetOriginatorFlag.
type Originator int32
const (
	// Initiator is the zero value of Originator.
	Initiator  Originator = 0
	Terminator Originator = 1
)

func (Originator) Invert

func (o Originator) Invert() Originator

func (Originator) String

func (o Originator) String() string

type Payload

type Payload struct {
	CircuitId string
	Flags     uint32
	RTT       uint16
	Sequence  int32
	Headers   map[uint8][]byte
	Data      []byte
	// contains filtered or unexported fields
}

func UnmarshallPayload

func UnmarshallPayload(msg *channel.Message) (*Payload, error)

func (*Payload) GetCircuitId

func (payload *Payload) GetCircuitId() string

func (*Payload) GetFlags

func (payload *Payload) GetFlags() uint32

func (*Payload) GetLoggerFields

func (payload *Payload) GetLoggerFields() logrus.Fields

func (*Payload) GetOriginator

func (payload *Payload) GetOriginator() Originator

func (*Payload) GetSequence

func (payload *Payload) GetSequence() int32

func (*Payload) IsCircuitEndFlagSet

func (payload *Payload) IsCircuitEndFlagSet() bool

func (*Payload) IsCircuitStartFlagSet

func (payload *Payload) IsCircuitStartFlagSet() bool

func (*Payload) IsFlagEOFSet added in v1.2.0

func (payload *Payload) IsFlagEOFSet() bool

func (*Payload) IsFlagWriteFailedSet added in v1.2.0

func (payload *Payload) IsFlagWriteFailedSet() bool

func (*Payload) IsRetransmitFlagSet

func (payload *Payload) IsRetransmitFlagSet() bool

func (*Payload) MarkAsRetransmit

func (payload *Payload) MarkAsRetransmit()

func (*Payload) Marshall

func (payload *Payload) Marshall() *channel.Message

type PayloadIngester

type PayloadIngester struct {
	// contains filtered or unexported fields
}

func NewPayloadIngester

func NewPayloadIngester(closeNotify <-chan struct{}) *PayloadIngester

func NewPayloadIngesterWithConfig added in v1.2.0

func NewPayloadIngesterWithConfig(maxWorkers uint32, closeNotify <-chan struct{}) *PayloadIngester

type PayloadTransformer

// PayloadTransformer is a field-less type. Its Rx and Tx methods each take a
// *channel.Message and a channel.Channel.
type PayloadTransformer struct {
}

func (PayloadTransformer) Rx

func (self PayloadTransformer) Rx(*channel.Message, channel.Channel)

func (PayloadTransformer) Tx

func (self PayloadTransformer) Tx(m *channel.Message, ch channel.Channel)

type PayloadType

// PayloadType is a byte-sized identifier with three declared values.
// Xgress.SendPayload accepts a PayloadType but binds it to the blank
// identifier, so that method does not use the value.
type PayloadType byte
const (
	PayloadTypeXg  PayloadType = 1
	PayloadTypeRtx PayloadType = 2
	PayloadTypeFwd PayloadType = 3
)

type PeekHandler

// PeekHandler lets a watcher observe payloads on an Xgress. It is registered
// with Xgress.AddPeekHandler.
type PeekHandler interface {
	// Rx and Tx each receive the Xgress and a payload.
	// NOTE(review): presumably for received and transmitted payloads
	// respectively — confirm which direction each refers to.
	Rx(x *Xgress, payload *Payload)
	Tx(x *Xgress, payload *Payload)
	// Close receives the Xgress only.
	Close(x *Xgress)
}

PeekHandler allows registering a watcher to react to data flowing through an xgress instance.

type RecvBufferDetail

// RecvBufferDetail is the JSON-serializable report returned by
// LinkReceiveBuffer.Inspect and embedded in InspectDetail.
type RecvBufferDetail struct {
	Size           uint32 `json:"size"`
	PayloadCount   uint32 `json:"payloadCount"`
	Sequence       int32  `json:"sequence"`
	MaxSequence    int32  `json:"maxSequence"`
	NextPayload    string `json:"nextPayload"`
	// NOTE(review): presumably reports whether the snapshot was taken while
	// holding the buffer's lock — confirm.
	AcquiredSafely bool   `json:"acquiredSafely"`
}

type Retransmitter

type Retransmitter struct {
	// contains filtered or unexported fields
}

func NewRetransmitter

func NewRetransmitter(faultReporter RetransmitterFaultReporter, metrics metrics.Registry, closeNotify <-chan struct{}) *Retransmitter

type RetransmitterFaultReporter

// RetransmitterFaultReporter is the fault sink passed to NewRetransmitter.
type RetransmitterFaultReporter interface {
	// ReportForwardingFault is given a circuit id and a controller id, both
	// as strings.
	ReportForwardingFault(circuitId string, ctrlId string)
}

type SendBufferDetail

// SendBufferDetail is the JSON-serializable report returned by
// LinkSendBuffer.Inspect and embedded in InspectDetail.
type SendBufferDetail struct {
	WindowSize            uint32  `json:"windowSize"`
	QueuedPayloadCount    int     `json:"queuedPayloadCount"`
	LinkSendBufferSize    uint32  `json:"linkSendBufferSize"`
	LinkRecvBufferSize    uint32  `json:"linkRecvBufferSize"`
	Accumulator           uint32  `json:"accumulator"`
	SuccessfulAcks        uint32  `json:"successfulAcks"`
	DuplicateAcks         uint32  `json:"duplicateAcks"`
	Retransmits           uint32  `json:"retransmits"`
	Closed                bool    `json:"closed"`
	BlockedByLocalWindow  bool    `json:"blockedByLocalWindow"`
	BlockedByRemoteWindow bool    `json:"blockedByRemoteWindow"`
	RetxScale             float64 `json:"retxScale"`
	RetxThreshold         uint32  `json:"retxThreshold"`
	// TimeSinceLastRetx is reported as a string, not a time.Duration.
	TimeSinceLastRetx     string  `json:"timeSinceLastRetx"`
	CloseWhenEmpty        bool    `json:"closeWhenEmpty"`
	// NOTE(review): presumably reports whether the snapshot was taken while
	// holding the buffer's lock — confirm.
	AcquiredSafely        bool    `json:"acquiredSafely"`
}

type SignalConnection added in v1.2.0

// SignalConnection is a Connection that additionally accepts a
// FlowFromFabricToXgressClosed notification.
type SignalConnection interface {
	Connection
	// FlowFromFabricToXgressClosed takes no arguments and returns nothing.
	// NOTE(review): the name suggests it signals that the fabric-to-xgress
	// direction has closed — confirm when it is invoked.
	FlowFromFabricToXgressClosed()
}

type WriteAdapter added in v1.2.0

type WriteAdapter struct {
	// contains filtered or unexported fields
}

func NewWriteAdapter added in v1.2.0

func NewWriteAdapter(x *Xgress) *WriteAdapter

func (*WriteAdapter) Deadline added in v1.2.0

func (self *WriteAdapter) Deadline() (deadline time.Time, ok bool)

func (*WriteAdapter) Done added in v1.2.0

func (self *WriteAdapter) Done() <-chan struct{}

func (*WriteAdapter) Err added in v1.2.0

func (self *WriteAdapter) Err() error

func (*WriteAdapter) SetWriteDeadline added in v1.2.0

func (self *WriteAdapter) SetWriteDeadline(t time.Time) error

func (*WriteAdapter) Value added in v1.2.0

func (self *WriteAdapter) Value(any) any

func (*WriteAdapter) Write added in v1.2.0

func (self *WriteAdapter) Write(b []byte) (n int, err error)

func (*WriteAdapter) WriteToXgress added in v1.2.0

func (self *WriteAdapter) WriteToXgress(b []byte, header map[uint8][]byte) (n int, err error)

type Xgress

type Xgress struct {
	Options *Options
	// contains filtered or unexported fields
}

func NewXgress

func NewXgress(circuitId string, ctrlId string, address Address, peer Connection, originator Originator, options *Options, tags map[string]string) *Xgress

func (*Xgress) AddCloseHandler

func (self *Xgress) AddCloseHandler(closeHandler CloseHandler)

func (*Xgress) AddPeekHandler

func (self *Xgress) AddPeekHandler(peekHandler PeekHandler)

func (*Xgress) Address

func (self *Xgress) Address() Address

func (*Xgress) CircuitId

func (self *Xgress) CircuitId() string

func (*Xgress) Close

func (self *Xgress) Close()

Close should only be called once both sides of the circuit are complete.

func (*Xgress) CloseRxTimeout added in v1.2.0

func (self *Xgress) CloseRxTimeout()

func (*Xgress) CloseSendBuffer

func (self *Xgress) CloseSendBuffer()

func (*Xgress) CloseXgToClient added in v1.2.0

func (self *Xgress) CloseXgToClient()

func (*Xgress) Closed

func (self *Xgress) Closed() bool

func (*Xgress) CtrlId

func (self *Xgress) CtrlId() string

func (*Xgress) ForwardEndOfCircuit

func (self *Xgress) ForwardEndOfCircuit(sendF func(payload *Payload) bool)

func (*Xgress) GetDestinationType added in v1.1.2

func (self *Xgress) GetDestinationType() string

func (*Xgress) GetEndCircuit

func (self *Xgress) GetEndCircuit() *Payload

func (*Xgress) GetInspectDetail

func (self *Xgress) GetInspectDetail(includeGoroutines bool) *InspectDetail

func (*Xgress) GetIntervalId

func (self *Xgress) GetIntervalId() string

func (*Xgress) GetSequence

func (self *Xgress) GetSequence() uint64

func (*Xgress) GetStartCircuit

func (self *Xgress) GetStartCircuit() *Payload

func (*Xgress) GetTags

func (self *Xgress) GetTags() map[string]string

func (*Xgress) GetTimeOfLastRxFromLink

func (self *Xgress) GetTimeOfLastRxFromLink() int64

func (*Xgress) HandleControlReceive

func (self *Xgress) HandleControlReceive(controlType ControlType, headers channel.Headers)

func (*Xgress) InspectCircuit

func (self *Xgress) InspectCircuit(detail *CircuitInspectDetail)

func (*Xgress) IsCircuitStarted

func (self *Xgress) IsCircuitStarted() bool

func (*Xgress) IsClosed added in v1.2.0

func (self *Xgress) IsClosed() bool

func (*Xgress) IsEndOfCircuitReceived

func (self *Xgress) IsEndOfCircuitReceived() bool

func (*Xgress) IsEndOfCircuitSent

func (self *Xgress) IsEndOfCircuitSent() bool

func (*Xgress) IsTerminator

func (self *Xgress) IsTerminator() bool

func (*Xgress) Label

func (self *Xgress) Label() string

func (*Xgress) NewWriteAdapter added in v1.2.0

func (self *Xgress) NewWriteAdapter() *WriteAdapter

func (*Xgress) Originator

func (self *Xgress) Originator() Originator

func (*Xgress) PayloadReceived

func (self *Xgress) PayloadReceived(payload *Payload)

func (*Xgress) PeerClosed added in v1.2.0

func (self *Xgress) PeerClosed()

func (*Xgress) SendAcknowledgement

func (self *Xgress) SendAcknowledgement(acknowledgement *Acknowledgement) error

func (*Xgress) SendControl

func (self *Xgress) SendControl(control *Control) error

func (*Xgress) SendEmptyAck

func (self *Xgress) SendEmptyAck()

func (*Xgress) SendPayload

func (self *Xgress) SendPayload(payload *Payload, _ time.Duration, _ PayloadType) error

func (*Xgress) SetDataPlaneAdapter

func (self *Xgress) SetDataPlaneAdapter(dataPlaneAdapter DataPlaneAdapter)

func (*Xgress) Start

func (self *Xgress) Start()

func (*Xgress) Unrouted

func (self *Xgress) Unrouted()

func (*Xgress) Write added in v1.2.0

func (self *Xgress) Write(buffer []byte, headers map[uint8][]byte, ctx context.Context) error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL