Documentation
¶
Index ¶
- Constants
- Variables
- func DecodePayload(payload *Payload) ([]byte, bool)
- func RespondToTraceRequest(headers channel.Headers, hopType, hopId string, response ControlReceiver)
- func SetOriginatorFlag(flags uint32, originator Originator) uint32
- func UnmarshallPacketPayload(buf []byte) (*channel.Message, error)
- type AckSender
- type Acknowledgement
- func (ack *Acknowledgement) GetCircuitId() string
- func (ack *Acknowledgement) GetFlags() uint32
- func (ack *Acknowledgement) GetLoggerFields() logrus.Fields
- func (ack *Acknowledgement) GetOriginator() Originator
- func (ack *Acknowledgement) GetSequence() []int32
- func (ack *Acknowledgement) Marshall() *channel.Message
- type Address
- type BindHandler
- type CircuitDetail
- type CircuitInspectDetail
- func (self *CircuitInspectDetail) AddError(err error)
- func (self *CircuitInspectDetail) AddRelatedEntity(entityType string, id string, detail any)
- func (self *CircuitInspectDetail) AddXgressDetail(xgressDetail *InspectDetail)
- func (self *CircuitInspectDetail) IncludeGoroutines() bool
- func (self *CircuitInspectDetail) SetIncludeGoroutines(includeGoroutines bool)
- type CircuitsDetail
- type CloseHandler
- type CloseHandlerF
- type Connection
- type Control
- func (self *Control) CreateTraceResponse(hopType, hopId string) *Control
- func (self *Control) DecrementAndGetHop() uint32
- func (self *Control) GetLoggerFields() logrus.Fields
- func (self *Control) IsTypeTraceRoute() bool
- func (self *Control) IsTypeTraceRouteResponse() bool
- func (self *Control) Marshall() *channel.Message
- type ControlReceiver
- type ControlType
- type DataPlaneAdapter
- type Decoder
- type Env
- type Flag
- type InspectDetail
- type InvalidTerminatorError
- type LinkReceiveBuffer
- func (buffer *LinkReceiveBuffer) Inspect(x *Xgress) *RecvBufferDetail
- func (buffer *LinkReceiveBuffer) PeekHead() *Payload
- func (buffer *LinkReceiveBuffer) ReceiveUnordered(x *Xgress, payload *Payload, maxSize uint32) bool
- func (buffer *LinkReceiveBuffer) Remove(payload *Payload)
- func (buffer *LinkReceiveBuffer) Size() uint32
- type LinkSendBuffer
- func (buffer *LinkSendBuffer) BufferPayload(payload *Payload) (func(), error)
- func (buffer *LinkSendBuffer) Close()
- func (buffer *LinkSendBuffer) CloseWhenEmpty() bool
- func (buffer *LinkSendBuffer) Inspect() *SendBufferDetail
- func (buffer *LinkSendBuffer) ReceiveAcknowledgement(ack *Acknowledgement)
- type Metrics
- type MisconfiguredTerminatorError
- type Options
- type OptionsData
- type Originator
- type Payload
- func (payload *Payload) GetCircuitId() string
- func (payload *Payload) GetFlags() uint32
- func (payload *Payload) GetLoggerFields() logrus.Fields
- func (payload *Payload) GetOriginator() Originator
- func (payload *Payload) GetSequence() int32
- func (payload *Payload) IsCircuitEndFlagSet() bool
- func (payload *Payload) IsCircuitStartFlagSet() bool
- func (payload *Payload) IsRetransmitFlagSet() bool
- func (payload *Payload) MarkAsRetransmit()
- func (payload *Payload) Marshall() *channel.Message
- type PayloadIngester
- type PayloadTransformer
- type PayloadType
- type PeekHandler
- type RecvBufferDetail
- type Retransmitter
- type RetransmitterFaultReporter
- type SendBufferDetail
- type Xgress
- func (self *Xgress) AddCloseHandler(closeHandler CloseHandler)
- func (self *Xgress) AddPeekHandler(peekHandler PeekHandler)
- func (self *Xgress) Address() Address
- func (self *Xgress) CircuitId() string
- func (self *Xgress) Close()
- func (self *Xgress) CloseSendBuffer()
- func (self *Xgress) CloseTimeout(duration time.Duration)
- func (self *Xgress) Closed() bool
- func (self *Xgress) CtrlId() string
- func (self *Xgress) ForwardEndOfCircuit(sendF func(payload *Payload) bool)
- func (self *Xgress) GetEndCircuit() *Payload
- func (self *Xgress) GetInspectDetail(includeGoroutines bool) *InspectDetail
- func (self *Xgress) GetIntervalId() string
- func (self *Xgress) GetSequence() uint64
- func (self *Xgress) GetStartCircuit() *Payload
- func (self *Xgress) GetTags() map[string]string
- func (self *Xgress) GetTimeOfLastRxFromLink() int64
- func (self *Xgress) HandleControlReceive(controlType ControlType, headers channel.Headers)
- func (self *Xgress) InspectCircuit(detail *CircuitInspectDetail)
- func (self *Xgress) IsCircuitStarted() bool
- func (self *Xgress) IsEndOfCircuitReceived() bool
- func (self *Xgress) IsEndOfCircuitSent() bool
- func (self *Xgress) IsTerminator() bool
- func (self *Xgress) Label() string
- func (self *Xgress) Originator() Originator
- func (self *Xgress) PayloadReceived(payload *Payload)
- func (self *Xgress) SendAcknowledgement(acknowledgement *Acknowledgement) error
- func (self *Xgress) SendControl(control *Control) error
- func (self *Xgress) SendEmptyAck()
- func (self *Xgress) SendPayload(payload *Payload, _ time.Duration, _ PayloadType) error
- func (self *Xgress) SetDataPlaneAdapter(dataPlaneAdapter DataPlaneAdapter)
- func (self *Xgress) Start()
- func (self *Xgress) Unrouted()
Constants ¶
const ( MinHeaderKey = 2000 MaxHeaderKey = MinHeaderKey + int32(math.MaxUint8) HeaderKeyCircuitId = 2256 HeaderKeySequence = 2257 HeaderKeyFlags = 2258 HeaderKeyRecvBufferSize = 2259 HeaderKeyRTT = 2260 HeaderPayloadRaw = 2261 ContentTypePayloadType = 1100 ContentTypeAcknowledgementType = 1101 ContentTypeControlType = 1102 )
const ( ControlHopCount = 20 ControlHopType = 21 ControlHopId = 22 ControlTimestamp = 23 ControlUserVal = 24 ControlError = 25 )
const ( VersionMask byte = 0b00000110 TerminatorFlagMask byte = 0b00001000 RttFlagMask byte = 0b00010000 ChunkFlagMask byte = 0b00100000 HeadersFlagMask byte = 0b01000000 HeartbeatFlagMask byte = 0b10000000 CircuitIdSizeMask byte = 0b00001111 PayloadProtocolV1 byte = 1 PayloadProtocolOffset byte = 1 )
const DECODER = "data"
const (
HeaderKeyUUID = 0
)
Variables ¶
var ContentTypeValue = map[string]int32{ "PayloadType": ContentTypePayloadType, "AcknowledgementType": ContentTypeAcknowledgementType, "ControlType": ContentTypeControlType, }
Functions ¶
func DecodePayload ¶
func RespondToTraceRequest ¶
func RespondToTraceRequest(headers channel.Headers, hopType, hopId string, response ControlReceiver)
func SetOriginatorFlag ¶
func SetOriginatorFlag(flags uint32, originator Originator) uint32
func UnmarshallPacketPayload ¶
Types ¶
type AckSender ¶
type AckSender interface {
SendAck(ack *Acknowledgement, address Address)
}
type Acknowledgement ¶
type Acknowledgement struct {
CircuitId string
Flags uint32
RecvBufferSize uint32
RTT uint16
Sequence []int32
}
func NewAcknowledgement ¶
func NewAcknowledgement(circuitId string, originator Originator) *Acknowledgement
func UnmarshallAcknowledgement ¶
func UnmarshallAcknowledgement(msg *channel.Message) (*Acknowledgement, error)
func (*Acknowledgement) GetCircuitId ¶
func (ack *Acknowledgement) GetCircuitId() string
func (*Acknowledgement) GetFlags ¶
func (ack *Acknowledgement) GetFlags() uint32
func (*Acknowledgement) GetLoggerFields ¶
func (ack *Acknowledgement) GetLoggerFields() logrus.Fields
func (*Acknowledgement) GetOriginator ¶
func (ack *Acknowledgement) GetOriginator() Originator
func (*Acknowledgement) GetSequence ¶
func (ack *Acknowledgement) GetSequence() []int32
func (*Acknowledgement) Marshall ¶
func (ack *Acknowledgement) Marshall() *channel.Message
type BindHandler ¶
type BindHandler interface {
HandleXgressBind(x *Xgress)
}
The BindHandlers are invoked to install the appropriate handlers.
type CircuitDetail ¶
type CircuitInspectDetail ¶
type CircuitInspectDetail struct {
CircuitId string `json:"circuitId"`
Forwards map[string]string `json:"forwards"`
XgressDetails map[string]*InspectDetail `json:"xgressDetails"`
RelatedEntities map[string]map[string]any `json:"relatedEntities"`
Errors []string `json:"errors"`
// contains filtered or unexported fields
}
func (*CircuitInspectDetail) AddError ¶
func (self *CircuitInspectDetail) AddError(err error)
func (*CircuitInspectDetail) AddRelatedEntity ¶
func (self *CircuitInspectDetail) AddRelatedEntity(entityType string, id string, detail any)
func (*CircuitInspectDetail) AddXgressDetail ¶
func (self *CircuitInspectDetail) AddXgressDetail(xgressDetail *InspectDetail)
func (*CircuitInspectDetail) IncludeGoroutines ¶
func (self *CircuitInspectDetail) IncludeGoroutines() bool
func (*CircuitInspectDetail) SetIncludeGoroutines ¶
func (self *CircuitInspectDetail) SetIncludeGoroutines(includeGoroutines bool)
type CircuitsDetail ¶
type CircuitsDetail struct {
Circuits map[string]*CircuitDetail `json:"circuits"`
}
type CloseHandler ¶
type CloseHandler interface {
// HandleXgressClose is invoked when the connected peer terminates the communication.
//
HandleXgressClose(x *Xgress)
}
CloseHandler is invoked by an xgress when the connected peer terminates the communication.
type CloseHandlerF ¶
type CloseHandlerF func(x *Xgress)
CloseHandlerF is the function version of CloseHandler
func (CloseHandlerF) HandleXgressClose ¶
func (self CloseHandlerF) HandleXgressClose(x *Xgress)
type Connection ¶
type Control ¶
type Control struct {
Type ControlType
CircuitId string
Headers channel.Headers
}
func UnmarshallControl ¶
func (*Control) CreateTraceResponse ¶
func (*Control) DecrementAndGetHop ¶
func (*Control) GetLoggerFields ¶
func (*Control) IsTypeTraceRoute ¶
func (*Control) IsTypeTraceRouteResponse ¶
type ControlReceiver ¶
type ControlReceiver interface {
HandleControlReceive(controlType ControlType, headers channel.Headers)
}
type ControlType ¶
type ControlType byte
const ( ControlTypeTraceRoute ControlType = 1 ControlTypeTraceRouteResponse ControlType = 2 )
func (ControlType) String ¶
func (self ControlType) String() string
type DataPlaneAdapter ¶
type DataPlaneAdapter interface {
// ForwardPayload is used to forward data payloads onto the data-plane from an xgress
ForwardPayload(payload *Payload, x *Xgress)
// RetransmitPayload is used to retransmit data payloads onto the data-plane from an xgress
RetransmitPayload(srcAddr Address, payload *Payload) error
// ForwardControlMessage is used to forward control messages onto the data-plane from an xgress
ForwardControlMessage(control *Control, x *Xgress)
// ForwardAcknowledgement is used to forward acks onto the data-plane from an xgress
ForwardAcknowledgement(ack *Acknowledgement, address Address)
Env
}
DataPlaneAdapter is invoked by an xgress whenever messages need to be sent to the data plane. Generally, a DataPlaneAdapter is implemented to connect the xgress to a data-plane data transmission system.
type Env ¶
type Env interface {
GetRetransmitter() *Retransmitter
GetPayloadIngester() *PayloadIngester
GetMetrics() Metrics
}
type InspectDetail ¶
type InspectDetail struct {
Address string `json:"address"`
Originator string `json:"originator"`
TimeSinceLastLinkRx string `json:"timeSinceLastLinkRx"`
SendBufferDetail *SendBufferDetail `json:"sendBufferDetail"`
RecvBufferDetail *RecvBufferDetail `json:"recvBufferDetail"`
XgressPointer string `json:"xgressPointer"`
LinkSendBufferPointer string `json:"linkSendBufferPointer"`
Goroutines []string `json:"goroutines"`
Sequence uint64 `json:"sequence"`
Flags string `json:"flags"`
}
type InvalidTerminatorError ¶
type InvalidTerminatorError struct {
InnerError error
}
func (InvalidTerminatorError) Error ¶
func (e InvalidTerminatorError) Error() string
func (InvalidTerminatorError) Unwrap ¶
func (e InvalidTerminatorError) Unwrap() error
type LinkReceiveBuffer ¶
type LinkReceiveBuffer struct {
// contains filtered or unexported fields
}
func NewLinkReceiveBuffer ¶
func NewLinkReceiveBuffer() *LinkReceiveBuffer
func (*LinkReceiveBuffer) Inspect ¶
func (buffer *LinkReceiveBuffer) Inspect(x *Xgress) *RecvBufferDetail
func (*LinkReceiveBuffer) PeekHead ¶
func (buffer *LinkReceiveBuffer) PeekHead() *Payload
func (*LinkReceiveBuffer) ReceiveUnordered ¶
func (buffer *LinkReceiveBuffer) ReceiveUnordered(x *Xgress, payload *Payload, maxSize uint32) bool
func (*LinkReceiveBuffer) Remove ¶
func (buffer *LinkReceiveBuffer) Remove(payload *Payload)
func (*LinkReceiveBuffer) Size ¶
func (buffer *LinkReceiveBuffer) Size() uint32
type LinkSendBuffer ¶
type LinkSendBuffer struct {
// contains filtered or unexported fields
}
Note: if altering this struct, be sure to account for 64-bit alignment on 32-bit ARM architectures. See https://pkg.go.dev/sync/atomic#pkg-note-BUG and https://github.com/golang/go/issues/36606 for details.
func NewLinkSendBuffer ¶
func NewLinkSendBuffer(x *Xgress) *LinkSendBuffer
func (*LinkSendBuffer) BufferPayload ¶
func (buffer *LinkSendBuffer) BufferPayload(payload *Payload) (func(), error)
func (*LinkSendBuffer) Close ¶
func (buffer *LinkSendBuffer) Close()
func (*LinkSendBuffer) CloseWhenEmpty ¶
func (buffer *LinkSendBuffer) CloseWhenEmpty() bool
func (*LinkSendBuffer) Inspect ¶
func (buffer *LinkSendBuffer) Inspect() *SendBufferDetail
func (*LinkSendBuffer) ReceiveAcknowledgement ¶
func (buffer *LinkSendBuffer) ReceiveAcknowledgement(ack *Acknowledgement)
type Metrics ¶
type Metrics interface {
MarkAckReceived()
MarkPayloadDropped()
MarkDuplicateAck()
MarkDuplicatePayload()
BufferBlockedByLocalWindow()
BufferUnblockedByLocalWindow()
BufferBlockedByRemoteWindow()
BufferUnblockedByRemoteWindow()
PayloadWritten(duration time.Duration)
BufferUnblocked(duration time.Duration)
SendPayloadBuffered(payloadSize int64)
SendPayloadDelivered(payloadSize int64)
}
func NewMetrics ¶
type MisconfiguredTerminatorError ¶
type MisconfiguredTerminatorError struct {
InnerError error
}
func (MisconfiguredTerminatorError) Error ¶
func (e MisconfiguredTerminatorError) Error() string
func (MisconfiguredTerminatorError) Unwrap ¶
func (e MisconfiguredTerminatorError) Unwrap() error
type Options ¶
type Options struct {
Mtu int32
RandomDrops bool
Drop1InN int32
TxQueueSize int32
TxPortalStartSize uint32
TxPortalMaxSize uint32
TxPortalMinSize uint32
TxPortalIncreaseThresh uint32
TxPortalIncreaseScale float64
TxPortalRetxThresh uint32
TxPortalRetxScale float64
TxPortalDupAckThresh uint32
TxPortalDupAckScale float64
RxBufferSize uint32
RetxStartMs uint32
RetxScale float64
RetxAddMs uint32
MaxCloseWait time.Duration
GetCircuitTimeout time.Duration
CircuitStartTimeout time.Duration
ConnectTimeout time.Duration
}
Options contains common Xgress configuration options
func DefaultOptions ¶
func DefaultOptions() *Options
func LoadOptions ¶
func LoadOptions(data OptionsData) (*Options, error)
type OptionsData ¶
type OptionsData map[interface{}]interface{}
type Originator ¶
type Originator int32
const ( Initiator Originator = 0 Terminator Originator = 1 )
func (Originator) Invert ¶
func (o Originator) Invert() Originator
func (Originator) String ¶
func (o Originator) String() string
type Payload ¶
type Payload struct {
CircuitId string
Flags uint32
RTT uint16
Sequence int32
Headers map[uint8][]byte
Data []byte
// contains filtered or unexported fields
}
func UnmarshallPayload ¶
func (*Payload) GetCircuitId ¶
func (*Payload) GetLoggerFields ¶
func (*Payload) GetOriginator ¶
func (payload *Payload) GetOriginator() Originator
func (*Payload) GetSequence ¶
func (*Payload) IsCircuitEndFlagSet ¶
func (*Payload) IsCircuitStartFlagSet ¶
func (*Payload) IsRetransmitFlagSet ¶
func (*Payload) MarkAsRetransmit ¶
func (payload *Payload) MarkAsRetransmit()
type PayloadIngester ¶
type PayloadIngester struct {
// contains filtered or unexported fields
}
func NewPayloadIngester ¶
func NewPayloadIngester(closeNotify <-chan struct{}) *PayloadIngester
type PayloadTransformer ¶
type PayloadTransformer struct {
}
func (PayloadTransformer) Rx ¶
func (self PayloadTransformer) Rx(*channel.Message, channel.Channel)
func (PayloadTransformer) Tx ¶
func (self PayloadTransformer) Tx(m *channel.Message, ch channel.Channel)
type PayloadType ¶
type PayloadType byte
const ( PayloadTypeXg PayloadType = 1 PayloadTypeRtx PayloadType = 2 PayloadTypeFwd PayloadType = 3 )
type PeekHandler ¶
type PeekHandler interface {
Rx(x *Xgress, payload *Payload)
Tx(x *Xgress, payload *Payload)
Close(x *Xgress)
}
PeekHandler allows registering a watcher to react to data flowing through an xgress instance.
type RecvBufferDetail ¶
type Retransmitter ¶
type Retransmitter struct {
// contains filtered or unexported fields
}
func NewRetransmitter ¶
func NewRetransmitter(faultReporter RetransmitterFaultReporter, metrics metrics.Registry, closeNotify <-chan struct{}) *Retransmitter
type SendBufferDetail ¶
type SendBufferDetail struct {
WindowSize uint32 `json:"windowSize"`
LinkSendBufferSize uint32 `json:"linkSendBufferSize"`
LinkRecvBufferSize uint32 `json:"linkRecvBufferSize"`
Accumulator uint32 `json:"accumulator"`
SuccessfulAcks uint32 `json:"successfulAcks"`
DuplicateAcks uint32 `json:"duplicateAcks"`
Retransmits uint32 `json:"retransmits"`
Closed bool `json:"closed"`
BlockedByLocalWindow bool `json:"blockedByLocalWindow"`
BlockedByRemoteWindow bool `json:"blockedByRemoteWindow"`
RetxScale float64 `json:"retxScale"`
RetxThreshold uint32 `json:"retxThreshold"`
TimeSinceLastRetx string `json:"timeSinceLastRetx"`
CloseWhenEmpty bool `json:"closeWhenEmpty"`
AcquiredSafely bool `json:"acquiredSafely"`
}
type Xgress ¶
type Xgress struct {
Options *Options
// contains filtered or unexported fields
}
func NewXgress ¶
func NewXgress(circuitId string, ctrlId string, address Address, peer Connection, originator Originator, options *Options, tags map[string]string) *Xgress
func (*Xgress) AddCloseHandler ¶
func (self *Xgress) AddCloseHandler(closeHandler CloseHandler)
func (*Xgress) AddPeekHandler ¶
func (self *Xgress) AddPeekHandler(peekHandler PeekHandler)
func (*Xgress) Close ¶
func (self *Xgress) Close()
Things which can trigger close:
1. Read fails
2. Write fails
3. End of Circuit received
4. Unroute received
func (*Xgress) CloseSendBuffer ¶
func (self *Xgress) CloseSendBuffer()
func (*Xgress) CloseTimeout ¶
func (*Xgress) ForwardEndOfCircuit ¶
func (*Xgress) GetEndCircuit ¶
func (*Xgress) GetInspectDetail ¶
func (self *Xgress) GetInspectDetail(includeGoroutines bool) *InspectDetail
func (*Xgress) GetIntervalId ¶
func (*Xgress) GetSequence ¶
func (*Xgress) GetStartCircuit ¶
func (*Xgress) GetTimeOfLastRxFromLink ¶
func (*Xgress) HandleControlReceive ¶
func (self *Xgress) HandleControlReceive(controlType ControlType, headers channel.Headers)
func (*Xgress) InspectCircuit ¶
func (self *Xgress) InspectCircuit(detail *CircuitInspectDetail)
func (*Xgress) IsCircuitStarted ¶
func (*Xgress) IsEndOfCircuitReceived ¶
func (*Xgress) IsEndOfCircuitSent ¶
func (*Xgress) IsTerminator ¶
func (*Xgress) Originator ¶
func (self *Xgress) Originator() Originator
func (*Xgress) PayloadReceived ¶
func (*Xgress) SendAcknowledgement ¶
func (self *Xgress) SendAcknowledgement(acknowledgement *Acknowledgement) error
func (*Xgress) SendControl ¶
func (*Xgress) SendEmptyAck ¶
func (self *Xgress) SendEmptyAck()
func (*Xgress) SendPayload ¶
func (*Xgress) SetDataPlaneAdapter ¶
func (self *Xgress) SetDataPlaneAdapter(dataPlaneAdapter DataPlaneAdapter)