connection

package
v1.14.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 25, 2025 License: Apache-2.0 Imports: 22 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type AbstractConnection

type AbstractConnection struct {
	Logger logger.Logger

	Conn    net.Conn
	Address string
	// contains filtered or unexported fields
}

func (*AbstractConnection) SetEncoder

func (b *AbstractConnection) SetEncoder(encoderInstance encoder.EventEncoder)

func (*AbstractConnection) Stop

func (b *AbstractConnection) Stop()

type AbstractConnectionManager

type AbstractConnectionManager struct {
	Logger logger.Logger

	MinConnectionsNum int
	MaxConnectionsNum int

	RuntimeConfiguration runtime.Configuration
	Configuration        *ManagerConfigration
	// contains filtered or unexported fields
}

func NewAbstractConnectionManager

func NewAbstractConnectionManager(parentLogger logger.Logger, runtimeConfiguration runtime.Configuration, configuration *ManagerConfigration) *AbstractConnectionManager

func (*AbstractConnectionManager) SetStatus

func (bc *AbstractConnectionManager) SetStatus(newStatus status.Status)

func (*AbstractConnectionManager) UpdateStatistics

func (bc *AbstractConnectionManager) UpdateStatistics(durationSec float64)

type AbstractControlMessageConnection

type AbstractControlMessageConnection struct {
	*AbstractConnection
	// contains filtered or unexported fields
}

func (*AbstractControlMessageConnection) GetBroker

func (*AbstractControlMessageConnection) RunHandler

func (bc *AbstractControlMessageConnection) RunHandler()

func (*AbstractControlMessageConnection) SetBroker

type AbstractEventConnection

type AbstractEventConnection struct {
	*AbstractConnection
	// contains filtered or unexported fields
}

func NewAbstractEventConnection

func NewAbstractEventConnection(parentLogger logger.Logger, connectionManager ConnectionManager) *AbstractEventConnection

func (*AbstractEventConnection) ProcessEvent

func (be *AbstractEventConnection) ProcessEvent(item interface{}, functionLogger logger.Logger) (*result.BatchedResults, error)

func (*AbstractEventConnection) RunHandler

func (be *AbstractEventConnection) RunHandler()

func (*AbstractEventConnection) WaitForStart

func (be *AbstractEventConnection) WaitForStart()

type Connection

type Connection struct {
	*AbstractEventConnection
}

func NewConnection

func NewConnection(parentLogger logger.Logger, connection net.Conn, connectionManager ConnectionManager) *Connection

type ConnectionAllocator

type ConnectionAllocator struct {
	*AbstractConnectionManager
	// contains filtered or unexported fields
}

ConnectionAllocator embeds AbstractConnectionManager, implements the ConnectionManager interface, and is responsible for managing connections between the processor and a runtime wrapper.

The connection allocation flow is as follows:

  • Prepare(): Prepares everything needed before the runtime starts.
  • After the runtime process has started, Start() should be called to establish all connections between the processor and the runtime.
  • Allocate() can be called only after Start() has completed.
  • At the end of the flow, before stopping the runtime process, Stop() should be called to close all connections.

func NewConnectionAllocator

func NewConnectionAllocator(abstractConnectionManager *AbstractConnectionManager) *ConnectionAllocator

func (*ConnectionAllocator) Allocate

func (ca *ConnectionAllocator) Allocate() (EventConnection, error)

func (*ConnectionAllocator) GetAddressesForWrapperStart

func (ca *ConnectionAllocator) GetAddressesForWrapperStart() ([]string, string)

func (*ConnectionAllocator) Prepare

func (ca *ConnectionAllocator) Prepare() error

func (*ConnectionAllocator) Start

func (ca *ConnectionAllocator) Start() error

func (*ConnectionAllocator) Stop

func (ca *ConnectionAllocator) Stop() error

type ConnectionManager

type ConnectionManager interface {

	// Prepare initializes resources or configurations necessary for the ConnectionManager
	Prepare() error

	// Start begins the operations required for the ConnectionManager to accept and manage connections
	Start() error

	// Stop halts the operations of the ConnectionManager
	Stop() error

	// Allocate provides an instance of EventConnection for handling events
	Allocate() (EventConnection, error)

	// GetAddressesForWrapperStart returns a list of addresses as required for starting a wrapper
	GetAddressesForWrapperStart() ([]string, string)

	// UpdateStatistics records performance or usage statistics based on the
	// duration of an event or process, specified in seconds
	UpdateStatistics(durationSec float64)

	// SetStatus updates the operational status of the ConnectionManager
	SetStatus(status.Status)
}

func NewConnectionManager

func NewConnectionManager(parentLogger logger.Logger, runtimeConfiguration runtime.Configuration, configuration *ManagerConfigration) (ConnectionManager, error)

NewConnectionManager is a factory function that returns a ConnectionManager based on the configuration.

type ControlMessageSocket

type ControlMessageSocket struct {
	*AbstractControlMessageConnection
	// contains filtered or unexported fields
}

func NewControlMessageSocket

func NewControlMessageSocket(parentLogger logger.Logger, socketConn *socketConnection, broker controlcommunication.ControlMessageBroker) *ControlMessageSocket

type EventConnection

type EventConnection interface {
	// WaitForStart waits for connection and handler to be ready for event processing
	WaitForStart()

	// Stop stops the event connection and performs any necessary cleanup tasks
	Stop()

	// ProcessEvent processes a single event item, using the provided functionLogger for any logging
	ProcessEvent(item interface{}, functionLogger logger.Logger) (*result.BatchedResults, error)

	// RunHandler starts the main event handler loop, managing incoming responses until the connection is stopped
	RunHandler()
}

type EventSocket

type EventSocket struct {
	*AbstractEventConnection
	// contains filtered or unexported fields
}

func NewEventSocket

func NewEventSocket(parentLogger logger.Logger, socketConn *socketConnection, connectionManager ConnectionManager) *EventSocket

type ManagerConfigration

type ManagerConfigration struct {
	Kind                        ManagerKind
	SupportControlCommunication bool
	WaitForStart                bool
	SocketType                  SocketType
	GetEventEncoderFunc         func(writer io.Writer) encoder.EventEncoder
	Statistics                  runtime.Statistics
	// contains filtered or unexported fields
}

func NewManagerConfigration

func NewManagerConfigration(supportControlCommunication bool, waitForStart bool, socketType SocketType, getEventEncoderFunc func(writer io.Writer) encoder.EventEncoder, statistics runtime.Statistics, workerId int, mode functionconfig.TriggerWorkMode) *ManagerConfigration

type ManagerKind

type ManagerKind string
const ConnectionAllocatorManagerKind ManagerKind = "connectionAllocator"
const SocketAllocatorManagerKind ManagerKind = "socketAllocator"

type SocketAllocator

type SocketAllocator struct {
	*AbstractConnectionManager
	// contains filtered or unexported fields
}

func NewSocketAllocator

func NewSocketAllocator(abstractConnectionManager *AbstractConnectionManager) *SocketAllocator

func (*SocketAllocator) Allocate

func (sa *SocketAllocator) Allocate() (EventConnection, error)

func (*SocketAllocator) GetAddressesForWrapperStart

func (sa *SocketAllocator) GetAddressesForWrapperStart() ([]string, string)

func (*SocketAllocator) Prepare

func (sa *SocketAllocator) Prepare() error

Prepare initializes the SocketAllocator by setting up control and event sockets according to the configuration.

If SupportControlCommunication is enabled, a control communication socket is created, wrapped in a ControlMessageSocket, and integrated with the ControlMessageBroker for runtime operations.

It also creates the minimum number of event sockets (MinConnectionsNum).

func (*SocketAllocator) Start

func (sa *SocketAllocator) Start() error

func (*SocketAllocator) Stop

func (sa *SocketAllocator) Stop() error

type SocketType

type SocketType int

SocketType is the type of socket to use.

const (
	UnixSocket SocketType = iota
	TCPSocket
)

RPC socket types

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL