plugins

package
v0.0.0-...-47a35ba Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 25, 2026 License: AGPL-3.0 Imports: 22 Imported by: 0

Documentation

Index

Constants

View Source
const MaxKafkaSummaryBytes = 64 * 1024 // 64KB

MaxKafkaSummaryBytes is the maximum byte length of a Kafka response summary.

View Source
const (
	PodLabelStack = "tap.qpoint.io/stack"
)

Variables

This section is empty.

Functions

func BuildKafkaResponseSummary

func BuildKafkaResponseSummary(cmd *KafkaCommand, res *KafkaResult) string

BuildKafkaResponseSummary creates a summary of topics and message samples from a KafkaCommand and optional KafkaResult.

func BuildKafkaStatement

func BuildKafkaStatement(cmd *KafkaCommand) string

BuildKafkaStatement creates a human-readable statement from a KafkaCommand.

Types

type BodyBuffer

type BodyBuffer interface {
	io.ReaderAt
	Length() int
	Slices(iter func(view []byte))
	Copy() []byte
	NewReader() io.Reader
}

type BodyStatus

type BodyStatus int
const (
	BodyStatusContinue               BodyStatus = 0
	BodyStatusStopIterationAndBuffer BodyStatus = 1
	BodyStatusContinueAndBuffer      BodyStatus = 2
)

type Connection

type Connection struct {
	Type ConnectionType
	// contains filtered or unexported fields
}

func NewConnection

func NewConnection(ctx context.Context, logger *zap.Logger, requestID string, bufferSize int, connectionType ConnectionType, stack *StackDeployment, svcs *services.ServiceRegistry) *Connection

func (*Connection) Context

func (c *Connection) Context() *ConnectionContext

func (*Connection) Meta

func (c *Connection) Meta() Meta

func (*Connection) OnHttpRequestBody

func (c *Connection) OnHttpRequestBody(frame []byte, endOfStream bool) error

OnHttpRequestBody is called when the request body is ready.

func (*Connection) OnHttpRequestHeaders

func (c *Connection) OnHttpRequestHeaders(endOfStream bool) error

OnHttpRequestHeaders is called when the request headers are ready.

func (*Connection) OnHttpResponseBody

func (c *Connection) OnHttpResponseBody(frame []byte, endOfStream bool) error

OnHttpResponseBody is called when the response body is ready.

func (*Connection) OnHttpResponseHeaders

func (c *Connection) OnHttpResponseHeaders(endOfStream bool) error

OnHttpResponseHeaders is called when the response headers are ready.

func (*Connection) OnKafkaCommand

func (c *Connection) OnKafkaCommand(cmd *KafkaCommand) error

OnKafkaCommand processes a Kafka command through the Kafka plugin stack

func (*Connection) OnKafkaResult

func (c *Connection) OnKafkaResult(res *KafkaResult) error

OnKafkaResult processes a Kafka result through the Kafka plugin stack

func (*Connection) OnMySQLCommand

func (c *Connection) OnMySQLCommand(cmd *MySQLCommand) error

OnMySQLCommand processes a MySQL command through the MySQL plugin stack

func (*Connection) OnMySQLResult

func (c *Connection) OnMySQLResult(res *MySQLResult) error

OnMySQLResult processes a MySQL result through the MySQL plugin stack

func (*Connection) OnRedisCommand

func (c *Connection) OnRedisCommand(cmd *RedisCommand) error

OnRedisCommand processes a Redis command through the Redis plugin stack

func (*Connection) OnRedisResult

func (c *Connection) OnRedisResult(res *RedisResult) error

OnRedisResult processes a Redis result through the Redis plugin stack

func (*Connection) ProxyOnDone

func (c *Connection) ProxyOnDone() error

ProxyOnDone is called when the session is done.

func (*Connection) SetControlValues

func (c *Connection) SetControlValues(values map[string]any)

func (*Connection) SetRequest

func (c *Connection) SetRequest(req *http.Request)

SetRequest sets the request and request body.

func (*Connection) SetResponse

func (c *Connection) SetResponse(res *http.Response)

SetResponse sets the response and response body.

func (*Connection) Teardown

func (c *Connection) Teardown()

Teardown tears down the connection.

type ConnectionAdapter

type ConnectionAdapter interface {
	SetConnection(*connection.Connection)
}

type ConnectionContext

type ConnectionContext struct {
	// contains filtered or unexported fields
}

func (*ConnectionContext) Context

func (c *ConnectionContext) Context() context.Context

func (*ConnectionContext) GetRequestBodyBuffer

func (c *ConnectionContext) GetRequestBodyBuffer() BodyBuffer

PluginContext interface implementation. This is the client side of the connection that filters (plugin instances such as HttpPluginInstance) can use to interact with the connection.

func (*ConnectionContext) GetResponseBodyBuffer

func (c *ConnectionContext) GetResponseBodyBuffer() BodyBuffer

func (*ConnectionContext) Meta

func (c *ConnectionContext) Meta() Meta

type ConnectionType

type ConnectionType string

ConnectionType is the connection type enum (http, grpc, redis, etc.).

const (
	ConnectionType_UNKNOWN ConnectionType = "unknown"
	ConnectionType_HTTP    ConnectionType = "http"
	ConnectionType_GRPC    ConnectionType = "grpc"
	ConnectionType_REDIS   ConnectionType = "redis"
	ConnectionType_MYSQL   ConnectionType = "mysql"
	ConnectionType_KAFKA   ConnectionType = "kafka"
)

type GrpcPlugin

type GrpcPlugin interface {
	Plugin // Embeds base
	NewGrpcInstance(PluginContext, *services.ServiceRegistry) GrpcPluginInstance
}

GrpcPlugin is the capability interface for plugins that handle gRPC traffic. Plugins that do not implement GrpcPlugin will fall back to HttpPlugin for gRPC connections.

type GrpcPluginInstance

type GrpcPluginInstance interface {
	PluginInstance
	RequestHeaders(requestHeaders Headers, endOfStream bool) HeadersStatus
	RequestBody(frame BodyBuffer, endOfStream bool) BodyStatus
	ResponseHeaders(responseHeaders Headers, endOfStream bool) HeadersStatus
	ResponseBody(frame BodyBuffer, endOfStream bool) BodyStatus
}

GrpcPluginInstance handles gRPC traffic for a single connection. gRPC is transported over HTTP/2, so the method signatures mirror HttpPluginInstance, but implementations receive gRPC-enriched headers (Grpc-Status, Grpc-Status-Name, Grpc-Message).

type Header []byte

func NewHeaderValue

func NewHeaderValue(str string) Header

func (Header) Bytes

func (h Header) Bytes() []byte

Bytes implements HeaderValue.

func (Header) Equal

func (h Header) Equal(str string) bool

Equal implements HeaderValue.

func (Header) String

func (h Header) String() string

String implements HeaderValue.

type HeaderValue

type HeaderValue interface {
	String() string
	Bytes() []byte
	Equal(str string) bool
}

type Headers

type Headers interface {
	Get(key string) (HeaderValue, bool)
	Values(key string, iter func(value HeaderValue))
	Set(key, value string)
	Remove(key string)
	All() map[string]string
}

type HeadersStatus

type HeadersStatus int
const (
	HeadersStatusContinue      HeadersStatus = 0
	HeadersStatusStopIteration HeadersStatus = 1
)

type HttpHeaderMap

type HttpHeaderMap struct {
	// contains filtered or unexported fields
}

func NewHeaders

func NewHeaders(header http.Header) *HttpHeaderMap

func (*HttpHeaderMap) All

func (h *HttpHeaderMap) All() map[string]string

func (*HttpHeaderMap) Get

func (h *HttpHeaderMap) Get(key string) (HeaderValue, bool)

func (*HttpHeaderMap) Remove

func (h *HttpHeaderMap) Remove(key string)

func (*HttpHeaderMap) Set

func (h *HttpHeaderMap) Set(key, value string)

func (*HttpHeaderMap) StdlibHeader

func (h *HttpHeaderMap) StdlibHeader() http.Header

func (*HttpHeaderMap) Values

func (h *HttpHeaderMap) Values(key string, iter func(value HeaderValue))

type HttpPlugin

type HttpPlugin interface {
	Plugin // Embeds base
	NewHttpInstance(PluginContext, *services.ServiceRegistry) HttpPluginInstance
}

HttpPlugin is the capability interface for plugins that handle HTTP traffic

type HttpPluginInstance

type HttpPluginInstance interface {
	PluginInstance
	RequestHeaders(requestHeaders Headers, endOfStream bool) HeadersStatus
	RequestBody(frame BodyBuffer, endOfStream bool) BodyStatus
	ResponseHeaders(responseHeaders Headers, endOfStream bool) HeadersStatus
	ResponseBody(frame BodyBuffer, endOfStream bool) BodyStatus
}

HttpPluginInstance handles HTTP traffic for a single connection

type KafkaCommand

type KafkaCommand struct {
	ApiKey        int16          // e.g., 0=Produce, 1=Fetch, 3=Metadata, 18=ApiVersions
	ApiVersion    int16          // Protocol version
	CorrelationID int32          // Correlation ID for request/response matching
	ClientID      string         // Client identifier
	Operation     string         // Human-readable operation name (e.g., "Produce", "Fetch")
	Topics        []string       // Topic names (from Produce/Fetch requests)
	GroupID       string         // Consumer group ID (from JoinGroup/SyncGroup/OffsetCommit)
	Messages      []KafkaMessage // Sample messages from Produce requests
	Timestamp     time.Time
}

KafkaCommand represents a Kafka request received from the client

type KafkaMessage

type KafkaMessage struct {
	Topic     string // Topic name
	Partition int32  // Partition index
	Key       string // Message key (may be empty)
	Value     string // Message value (truncated at MaxValueSize)
	Truncated bool   // Value was truncated
}

KafkaMessage represents a sampled message from Produce/Fetch

type KafkaPlugin

type KafkaPlugin interface {
	Plugin // Embeds base
	NewKafkaInstance(PluginContext, *services.ServiceRegistry) KafkaPluginInstance
}

KafkaPlugin is the capability interface for plugins that handle Kafka traffic

type KafkaPluginInstance

type KafkaPluginInstance interface {
	PluginInstance
	OnKafkaCommand(cmd *KafkaCommand) KafkaStatus
	OnKafkaResult(res *KafkaResult) KafkaStatus
}

KafkaPluginInstance handles Kafka traffic for a single connection

type KafkaResult

type KafkaResult struct {
	CorrelationID int32          // Correlation ID for request/response matching
	ErrorCode     int16          // Kafka error code (0 = no error)
	ErrorMessage  string         // Human-readable error message
	IsError       bool           // Whether this response contains an error
	Messages      []KafkaMessage // Sample messages from Fetch responses
	Latency       time.Duration  // Round-trip latency from request parse to response parse
}

KafkaResult represents a Kafka response received from the server

type KafkaStatus

type KafkaStatus int

KafkaStatus indicates how plugin iteration should proceed

const (
	KafkaStatusContinue      KafkaStatus = 0
	KafkaStatusStopIteration KafkaStatus = 1
)

type Manager

type Manager struct {
	// contains filtered or unexported fields
}

func NewPluginManager

func NewPluginManager(logger *zap.Logger, opts ...ManagerOpt) *Manager

func (*Manager) GetDomainStack

func (m *Manager) GetDomainStack(domain, protocol string) (string, bool)

GetDomainStack returns the stackID for a domain and protocol

func (*Manager) NewConnection

func (m *Manager) NewConnection(ctx context.Context, connectionType ConnectionType, conn *connection.Connection, requestID string) (*Connection, error)

func (*Manager) SetConfig

func (m *Manager) SetConfig(conf *config.Config)

func (*Manager) Start

func (m *Manager) Start() error

func (*Manager) Stop

func (m *Manager) Stop()

type ManagerOpt

type ManagerOpt func(*Manager)

func AddPersistentPlugins

func AddPersistentPlugins(plugins ...config.Plugin) ManagerOpt

func SetBufferSize

func SetBufferSize(bufferSize int) ManagerOpt

func SetPluginRegistry

func SetPluginRegistry(registry *PluginRegistry) ManagerOpt

type Meta

type Meta interface {
	connmeta.Service
	RequestID() string

	ReadBytes() int64
	WriteBytes() int64
	SetReadBytes(int64)
	SetWriteBytes(int64)
}

Meta provides top-level-connection metadata extended with PluginContext-level metadata

type MySQLCommand

type MySQLCommand struct {
	Type      byte   // Command type (e.g., 0x03 for COM_QUERY)
	Query     string // SQL query text (for COM_QUERY, COM_STMT_PREPARE)
	Timestamp time.Time
}

MySQLCommand represents a MySQL command received from the client

type MySQLPlugin

type MySQLPlugin interface {
	Plugin // Embeds base
	NewMySQLInstance(PluginContext, *services.ServiceRegistry) MySQLPluginInstance
}

MySQLPlugin is the capability interface for plugins that handle MySQL traffic

type MySQLPluginInstance

type MySQLPluginInstance interface {
	PluginInstance
	OnMySQLCommand(cmd *MySQLCommand) MySQLStatus
	OnMySQLResult(res *MySQLResult) MySQLStatus
}

MySQLPluginInstance handles MySQL traffic for a single connection

type MySQLResult

type MySQLResult struct {
	Type         string // "OK", "Error", "ResultSet", "EOF"
	AffectedRows uint64
	LastInsertID uint64
	ErrorCode    uint16
	ErrorMessage string
	Columns      []string   // Column names
	Rows         [][]string // Row values (string representation)
	RowCount     int        // Total rows (may exceed len(Rows) if truncated)
	Truncated    bool       // True if rows were capped at MaxResultSetRows
}

MySQLResult represents a MySQL result received from the server

type MySQLStatus

type MySQLStatus int

MySQLStatus indicates how plugin iteration should proceed

const (
	MySQLStatusContinue      MySQLStatus = 0
	MySQLStatusStopIteration MySQLStatus = 1
)

type Plugin

type Plugin interface {
	Init(logger *zap.Logger, config yaml.Node)
	Destroy()
	PluginType() PluginType
}

Plugin is the base interface all plugins must implement for lifecycle & config

type PluginAccessor

type PluginAccessor interface {
	// Get retrieves a plugin by type
	Get(pluginType PluginType) Plugin
}

PluginAccessor is a type that can access the plugin registry

type PluginContext

type PluginContext interface {
	GetRequestBodyBuffer() BodyBuffer
	GetResponseBodyBuffer() BodyBuffer

	Context() context.Context
	Meta() Meta
}

type PluginInstance

type PluginInstance interface {
	Destroy()
}

PluginInstance is the base interface for all protocol-specific plugin instances

type PluginRegistry

type PluginRegistry struct {
	// contains filtered or unexported fields
}

PluginRegistry holds references to active plugin instances

func NewRegistry

func NewRegistry(plugins ...Plugin) *PluginRegistry

NewRegistry creates a new plugin registry.

func (*PluginRegistry) Close

func (sr *PluginRegistry) Close() error

Close closes all registered services that implement CloseableService

func (*PluginRegistry) Get

func (sr *PluginRegistry) Get(pluginType PluginType) Plugin

Get retrieves a plugin by type

func (*PluginRegistry) Register

func (sr *PluginRegistry) Register(svc Plugin)

Register adds or replaces a plugin in the registry.

type PluginType

type PluginType string

func (PluginType) String

func (p PluginType) String() string

type RedisCommand

type RedisCommand struct {
	Name      string   // e.g., "GET", "SET"
	Args      []string // e.g., ["mykey"]
	Raw       []byte   // The raw wire bytes
	Timestamp time.Time
}

RedisCommand represents a Redis command received from the client

type RedisPlugin

type RedisPlugin interface {
	Plugin // Embeds base
	NewRedisInstance(PluginContext, *services.ServiceRegistry) RedisPluginInstance
}

RedisPlugin is the capability interface for plugins that handle Redis traffic

type RedisPluginInstance

type RedisPluginInstance interface {
	PluginInstance
	OnRedisCommand(cmd *RedisCommand) RedisStatus
	OnRedisResult(res *RedisResult) RedisStatus
}

RedisPluginInstance handles Redis traffic for a single connection

type RedisResult

type RedisResult struct {
	Type    string // e.g., "Integer", "BulkString"
	Value   any
	IsError bool
}

RedisResult represents a Redis result received from the server

type RedisStatus

type RedisStatus int

RedisStatus indicates how plugin iteration should proceed

const (
	RedisStatusContinue      RedisStatus = 0
	RedisStatusStopIteration RedisStatus = 1
)

type Stack

type Stack struct {
	// contains filtered or unexported fields
}

Stack manages the lifecycle of a StackDeployment which contains a list of plugins.

func NewStack

func NewStack(name string, logger *zap.Logger, pluginAccessor PluginAccessor) *Stack

func (*Stack) GetActiveDeployment

func (s *Stack) GetActiveDeployment() *StackDeployment

func (*Stack) SetConfig

func (s *Stack) SetConfig(conf *config.Stack) error

func (*Stack) SetPersistentPlugins

func (s *Stack) SetPersistentPlugins(plugins []config.Plugin)

func (*Stack) Teardown

func (s *Stack) Teardown()

type StackDeployment

type StackDeployment struct {
	// contains filtered or unexported fields
}

StackDeployment manages the lifecycle of a collection of plugins and plugin instances. This is a one-off deployment, and does not support configuration changes.

func NewStackDeployment

func NewStackDeployment(logger *zap.Logger, name string, pluginAccessor PluginAccessor) *StackDeployment

func (*StackDeployment) NewStack

NewStack creates plugin instances for the given connection type

func (*StackDeployment) SetPersistentPlugins

func (d *StackDeployment) SetPersistentPlugins(plugins []config.Plugin)

func (*StackDeployment) Setup

func (d *StackDeployment) Setup(conf *config.Stack) error

func (*StackDeployment) Teardown

func (d *StackDeployment) Teardown()

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL