Documentation
¶
Index ¶
- type AbstractConnection
- type AbstractConnectionManager
- type AbstractControlMessageConnection
- type AbstractEventConnection
- type Connection
- type ConnectionAllocator
- type ConnectionManager
- type ControlMessageSocket
- type EventConnection
- type EventSocket
- type ManagerConfigration
- type ManagerKind
- type SocketAllocator
- type SocketType
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type AbstractConnection ¶
type AbstractConnection struct {
Logger logger.Logger
Conn net.Conn
Address string
// contains filtered or unexported fields
}
func (*AbstractConnection) SetEncoder ¶
func (b *AbstractConnection) SetEncoder(encoderInstance encoder.EventEncoder)
func (*AbstractConnection) Stop ¶
func (b *AbstractConnection) Stop()
type AbstractConnectionManager ¶
type AbstractConnectionManager struct {
Logger logger.Logger
MinConnectionsNum int
MaxConnectionsNum int
RuntimeConfiguration runtime.Configuration
Configuration *ManagerConfigration
// contains filtered or unexported fields
}
func NewAbstractConnectionManager ¶
func NewAbstractConnectionManager(parentLogger logger.Logger, runtimeConfiguration runtime.Configuration, configuration *ManagerConfigration) *AbstractConnectionManager
func (*AbstractConnectionManager) SetStatus ¶
func (bc *AbstractConnectionManager) SetStatus(newStatus status.Status)
func (*AbstractConnectionManager) UpdateStatistics ¶
func (bc *AbstractConnectionManager) UpdateStatistics(durationSec float64)
type AbstractControlMessageConnection ¶
type AbstractControlMessageConnection struct {
*AbstractConnection
// contains filtered or unexported fields
}
func NewAbstractControlMessageConnection ¶
func NewAbstractControlMessageConnection(parentLogger logger.Logger, broker controlcommunication.ControlMessageBroker) *AbstractControlMessageConnection
func (*AbstractControlMessageConnection) GetBroker ¶
func (bc *AbstractControlMessageConnection) GetBroker() controlcommunication.ControlMessageBroker
func (*AbstractControlMessageConnection) RunHandler ¶
func (bc *AbstractControlMessageConnection) RunHandler()
func (*AbstractControlMessageConnection) SetBroker ¶
func (bc *AbstractControlMessageConnection) SetBroker(abstractBroker *controlcommunication.AbstractControlMessageBroker)
type AbstractEventConnection ¶
type AbstractEventConnection struct {
*AbstractConnection
// contains filtered or unexported fields
}
func NewAbstractEventConnection ¶
func NewAbstractEventConnection(parentLogger logger.Logger, connectionManager ConnectionManager) *AbstractEventConnection
func (*AbstractEventConnection) ProcessEvent ¶
func (be *AbstractEventConnection) ProcessEvent(item interface{}, functionLogger logger.Logger) (*result.BatchedResults, error)
func (*AbstractEventConnection) RunHandler ¶
func (be *AbstractEventConnection) RunHandler()
func (*AbstractEventConnection) WaitForStart ¶
func (be *AbstractEventConnection) WaitForStart()
type Connection ¶
type Connection struct {
*AbstractEventConnection
}
func NewConnection ¶
func NewConnection(parentLogger logger.Logger, connection net.Conn, connectionManager ConnectionManager) *Connection
type ConnectionAllocator ¶
type ConnectionAllocator struct {
*AbstractConnectionManager
// contains filtered or unexported fields
}
ConnectionAllocator embeds AbstractConnectionManager, implements the ConnectionManager interface, and is responsible for managing connections between the processor and a runtime wrapper.
The connection allocation flow is as follows:
- Prepare(): Prepares everything needed before the runtime starts.
- After the runtime process has started, Start() should be called to establish all connections between the processor and the runtime.
- Allocate() can be called only after Start() has completed.
- At the end of the flow, before stopping the runtime process, Stop() should be called to close all connections.
func NewConnectionAllocator ¶
func NewConnectionAllocator(abstractConnectionManager *AbstractConnectionManager) *ConnectionAllocator
func (*ConnectionAllocator) Allocate ¶
func (ca *ConnectionAllocator) Allocate() (EventConnection, error)
func (*ConnectionAllocator) GetAddressesForWrapperStart ¶
func (ca *ConnectionAllocator) GetAddressesForWrapperStart() ([]string, string)
func (*ConnectionAllocator) Prepare ¶
func (ca *ConnectionAllocator) Prepare() error
func (*ConnectionAllocator) Start ¶
func (ca *ConnectionAllocator) Start() error
func (*ConnectionAllocator) Stop ¶
func (ca *ConnectionAllocator) Stop() error
type ConnectionManager ¶
type ConnectionManager interface {
// Prepare initializes resources or configurations necessary for the ConnectionManager
Prepare() error
// Start begins the operations required for the ConnectionManager to accept and manage connections
Start() error
// Stop halts the operations of the ConnectionManager
Stop() error
// Allocate provides an instance of EventConnection for handling events
Allocate() (EventConnection, error)
// GetAddressesForWrapperStart returns a list of addresses as required for starting a wrapper
GetAddressesForWrapperStart() ([]string, string)
// UpdateStatistics records performance or usage statistics based on the
// duration of an event or process, specified in seconds
UpdateStatistics(durationSec float64)
// SetStatus updates the operational status of the ConnectionManager
SetStatus(status.Status)
}
func NewConnectionManager ¶
func NewConnectionManager(parentLogger logger.Logger, runtimeConfiguration runtime.Configuration, configuration *ManagerConfigration) (ConnectionManager, error)
NewConnectionManager is a factory function that returns a ConnectionManager based on the configuration.
type ControlMessageSocket ¶
type ControlMessageSocket struct {
*AbstractControlMessageConnection
// contains filtered or unexported fields
}
func NewControlMessageSocket ¶
func NewControlMessageSocket(parentLogger logger.Logger, socketConn *socketConnection, broker controlcommunication.ControlMessageBroker) *ControlMessageSocket
type EventConnection ¶
type EventConnection interface {
// WaitForStart waits for connection and handler to be ready for event processing
WaitForStart()
// Stop stops the event connection and performs any necessary cleanup tasks
Stop()
// ProcessEvent processes a single event item, using the provided functionLogger for any logging
ProcessEvent(item interface{}, functionLogger logger.Logger) (*result.BatchedResults, error)
// RunHandler starts the main event handler loop, managing incoming responses until the connection is stopped
RunHandler()
}
type EventSocket ¶
type EventSocket struct {
*AbstractEventConnection
// contains filtered or unexported fields
}
func NewEventSocket ¶
func NewEventSocket(parentLogger logger.Logger, socketConn *socketConnection, connectionManager ConnectionManager) *EventSocket
type ManagerConfigration ¶
type ManagerConfigration struct {
Kind ManagerKind
SupportControlCommunication bool
WaitForStart bool
SocketType SocketType
GetEventEncoderFunc func(writer io.Writer) encoder.EventEncoder
Statistics runtime.Statistics
// contains filtered or unexported fields
}
func NewManagerConfigration ¶
func NewManagerConfigration(supportControlCommunication bool, waitForStart bool, socketType SocketType, getEventEncoderFunc func(writer io.Writer) encoder.EventEncoder, statistics runtime.Statistics, workerId int, mode functionconfig.TriggerWorkMode) *ManagerConfigration
type ManagerKind ¶
type ManagerKind string
const ConnectionAllocatorManagerKind ManagerKind = "connectionAllocator"
const SocketAllocatorManagerKind ManagerKind = "socketAllocator"
type SocketAllocator ¶
type SocketAllocator struct {
*AbstractConnectionManager
// contains filtered or unexported fields
}
func NewSocketAllocator ¶
func NewSocketAllocator(abstractConnectionManager *AbstractConnectionManager) *SocketAllocator
func (*SocketAllocator) Allocate ¶
func (sa *SocketAllocator) Allocate() (EventConnection, error)
func (*SocketAllocator) GetAddressesForWrapperStart ¶
func (sa *SocketAllocator) GetAddressesForWrapperStart() ([]string, string)
func (*SocketAllocator) Prepare ¶
func (sa *SocketAllocator) Prepare() error
Prepare initializes the SocketAllocator by setting up control and event sockets according to the configuration.
If SupportControlCommunication is enabled, a control communication socket is created, wrapped in a ControlMessageSocket, and integrated with the ControlMessageBroker for runtime operations.
It also creates the minimum number of event sockets (MinConnectionsNum).
func (*SocketAllocator) Start ¶
func (sa *SocketAllocator) Start() error
func (*SocketAllocator) Stop ¶
func (sa *SocketAllocator) Stop() error
type SocketType ¶
type SocketType int
SocketType is the type of socket to use.
const ( UnixSocket SocketType = iota TCPSocket )
RPC socket types