Documentation
¶
Index ¶
- Constants
- func BuildKafkaResponseSummary(cmd *KafkaCommand, res *KafkaResult) string
- func BuildKafkaStatement(cmd *KafkaCommand) string
- type BodyBuffer
- type BodyStatus
- type Connection
- func (c *Connection) Context() *ConnectionContext
- func (c *Connection) Meta() Meta
- func (c *Connection) OnHttpRequestBody(frame []byte, endOfStream bool) error
- func (c *Connection) OnHttpRequestHeaders(endOfStream bool) error
- func (c *Connection) OnHttpResponseBody(frame []byte, endOfStream bool) error
- func (c *Connection) OnHttpResponseHeaders(endOfStream bool) error
- func (c *Connection) OnKafkaCommand(cmd *KafkaCommand) error
- func (c *Connection) OnKafkaResult(res *KafkaResult) error
- func (c *Connection) OnMySQLCommand(cmd *MySQLCommand) error
- func (c *Connection) OnMySQLResult(res *MySQLResult) error
- func (c *Connection) OnRedisCommand(cmd *RedisCommand) error
- func (c *Connection) OnRedisResult(res *RedisResult) error
- func (c *Connection) ProxyOnDone() error
- func (c *Connection) SetControlValues(values map[string]any)
- func (c *Connection) SetRequest(req *http.Request)
- func (c *Connection) SetResponse(res *http.Response)
- func (c *Connection) Teardown()
- type ConnectionAdapter
- type ConnectionContext
- type ConnectionType
- type GrpcPlugin
- type GrpcPluginInstance
- type Header
- type HeaderValue
- type Headers
- type HeadersStatus
- type HttpHeaderMap
- func (h *HttpHeaderMap) All() map[string]string
- func (h *HttpHeaderMap) Get(key string) (HeaderValue, bool)
- func (h *HttpHeaderMap) Remove(key string)
- func (h *HttpHeaderMap) Set(key, value string)
- func (h *HttpHeaderMap) StdlibHeader() http.Header
- func (h *HttpHeaderMap) Values(key string, iter func(value HeaderValue))
- type HttpPlugin
- type HttpPluginInstance
- type KafkaCommand
- type KafkaMessage
- type KafkaPlugin
- type KafkaPluginInstance
- type KafkaResult
- type KafkaStatus
- type Manager
- type ManagerOpt
- type Meta
- type MySQLCommand
- type MySQLPlugin
- type MySQLPluginInstance
- type MySQLResult
- type MySQLStatus
- type Plugin
- type PluginAccessor
- type PluginContext
- type PluginInstance
- type PluginRegistry
- type PluginType
- type RedisCommand
- type RedisPlugin
- type RedisPluginInstance
- type RedisResult
- type RedisStatus
- type Stack
- type StackDeployment
Constants ¶
const MaxKafkaSummaryBytes = 64 * 1024 // 64KB
MaxKafkaSummaryBytes is the maximum byte length of a Kafka response summary.
const (
PodLabelStack = "tap.qpoint.io/stack"
)
Variables ¶
This section is empty.
Functions ¶
func BuildKafkaResponseSummary ¶
func BuildKafkaResponseSummary(cmd *KafkaCommand, res *KafkaResult) string
BuildKafkaResponseSummary creates a summary of topics and message samples from a KafkaCommand and optional KafkaResult.
func BuildKafkaStatement ¶
func BuildKafkaStatement(cmd *KafkaCommand) string
BuildKafkaStatement creates a human-readable statement from a KafkaCommand.
Types ¶
type BodyBuffer ¶
type BodyStatus ¶
type BodyStatus int
const ( BodyStatusContinue BodyStatus = 0 BodyStatusStopIterationAndBuffer BodyStatus = 1 BodyStatusContinueAndBuffer BodyStatus = 2 )
type Connection ¶
type Connection struct {
Type ConnectionType
// contains filtered or unexported fields
}
func NewConnection ¶
func NewConnection(ctx context.Context, logger *zap.Logger, requestID string, bufferSize int, connectionType ConnectionType, stack *StackDeployment, svcs *services.ServiceRegistry) *Connection
func (*Connection) Context ¶
func (c *Connection) Context() *ConnectionContext
func (*Connection) Meta ¶
func (c *Connection) Meta() Meta
func (*Connection) OnHttpRequestBody ¶
func (c *Connection) OnHttpRequestBody(frame []byte, endOfStream bool) error
request body is ready
func (*Connection) OnHttpRequestHeaders ¶
func (c *Connection) OnHttpRequestHeaders(endOfStream bool) error
request headers are ready
func (*Connection) OnHttpResponseBody ¶
func (c *Connection) OnHttpResponseBody(frame []byte, endOfStream bool) error
response body is ready
func (*Connection) OnHttpResponseHeaders ¶
func (c *Connection) OnHttpResponseHeaders(endOfStream bool) error
response headers are ready
func (*Connection) OnKafkaCommand ¶
func (c *Connection) OnKafkaCommand(cmd *KafkaCommand) error
OnKafkaCommand processes a Kafka command through the Kafka plugin stack
func (*Connection) OnKafkaResult ¶
func (c *Connection) OnKafkaResult(res *KafkaResult) error
OnKafkaResult processes a Kafka result through the Kafka plugin stack
func (*Connection) OnMySQLCommand ¶
func (c *Connection) OnMySQLCommand(cmd *MySQLCommand) error
OnMySQLCommand processes a MySQL command through the MySQL plugin stack
func (*Connection) OnMySQLResult ¶
func (c *Connection) OnMySQLResult(res *MySQLResult) error
OnMySQLResult processes a MySQL result through the MySQL plugin stack
func (*Connection) OnRedisCommand ¶
func (c *Connection) OnRedisCommand(cmd *RedisCommand) error
OnRedisCommand processes a Redis command through the Redis plugin stack
func (*Connection) OnRedisResult ¶
func (c *Connection) OnRedisResult(res *RedisResult) error
OnRedisResult processes a Redis result through the Redis plugin stack
func (*Connection) SetControlValues ¶
func (c *Connection) SetControlValues(values map[string]any)
func (*Connection) SetRequest ¶
func (c *Connection) SetRequest(req *http.Request)
set the request and request body
func (*Connection) SetResponse ¶
func (c *Connection) SetResponse(res *http.Response)
set the response and response body
type ConnectionAdapter ¶
type ConnectionAdapter interface {
SetConnection(*connection.Connection)
}
type ConnectionContext ¶
type ConnectionContext struct {
// contains filtered or unexported fields
}
func (*ConnectionContext) Context ¶
func (c *ConnectionContext) Context() context.Context
func (*ConnectionContext) GetRequestBodyBuffer ¶
func (c *ConnectionContext) GetRequestBodyBuffer() BodyBuffer
HttpPluginInstance interface implementation this is the client side of the connection that filters can use to interact with the connection
func (*ConnectionContext) GetResponseBodyBuffer ¶
func (c *ConnectionContext) GetResponseBodyBuffer() BodyBuffer
func (*ConnectionContext) Meta ¶
func (c *ConnectionContext) Meta() Meta
type ConnectionType ¶
type ConnectionType string
context type enum (http, grpc, redis, etc)
const ( ConnectionType_UNKNOWN ConnectionType = "unknown" ConnectionType_HTTP ConnectionType = "http" ConnectionType_GRPC ConnectionType = "grpc" ConnectionType_REDIS ConnectionType = "redis" ConnectionType_MYSQL ConnectionType = "mysql" ConnectionType_KAFKA ConnectionType = "kafka" )
type GrpcPlugin ¶
type GrpcPlugin interface {
Plugin // Embeds base
NewGrpcInstance(PluginContext, *services.ServiceRegistry) GrpcPluginInstance
}
GrpcPlugin is the capability interface for plugins that handle gRPC traffic. Plugins that do not implement GrpcPlugin will fall back to HttpPlugin for gRPC connections.
type GrpcPluginInstance ¶
type GrpcPluginInstance interface {
PluginInstance
RequestHeaders(requestHeaders Headers, endOfStream bool) HeadersStatus
RequestBody(frame BodyBuffer, endOfStream bool) BodyStatus
ResponseHeaders(responseHeaders Headers, endOfStream bool) HeadersStatus
ResponseBody(frame BodyBuffer, endOfStream bool) BodyStatus
}
GrpcPluginInstance handles gRPC traffic for a single connection. gRPC is transported over HTTP/2, so the method signatures mirror HttpPluginInstance, but implementations receive gRPC-enriched headers (Grpc-Status, Grpc-Status-Name, Grpc-Message).
type HeaderValue ¶
type Headers ¶
type Headers interface {
Get(key string) (HeaderValue, bool)
Values(key string, iter func(value HeaderValue))
Set(key, value string)
Remove(key string)
All() map[string]string
}
type HeadersStatus ¶
type HeadersStatus int
const ( HeadersStatusContinue HeadersStatus = 0 HeadersStatusStopIteration HeadersStatus = 1 )
type HttpHeaderMap ¶
type HttpHeaderMap struct {
// contains filtered or unexported fields
}
func NewHeaders ¶
func NewHeaders(header http.Header) *HttpHeaderMap
func (*HttpHeaderMap) All ¶
func (h *HttpHeaderMap) All() map[string]string
func (*HttpHeaderMap) Get ¶
func (h *HttpHeaderMap) Get(key string) (HeaderValue, bool)
func (*HttpHeaderMap) Remove ¶
func (h *HttpHeaderMap) Remove(key string)
func (*HttpHeaderMap) Set ¶
func (h *HttpHeaderMap) Set(key, value string)
func (*HttpHeaderMap) StdlibHeader ¶
func (h *HttpHeaderMap) StdlibHeader() http.Header
func (*HttpHeaderMap) Values ¶
func (h *HttpHeaderMap) Values(key string, iter func(value HeaderValue))
type HttpPlugin ¶
type HttpPlugin interface {
Plugin // Embeds base
NewHttpInstance(PluginContext, *services.ServiceRegistry) HttpPluginInstance
}
HttpPlugin is the capability interface for plugins that handle HTTP traffic
type HttpPluginInstance ¶
type HttpPluginInstance interface {
PluginInstance
RequestHeaders(requestHeaders Headers, endOfStream bool) HeadersStatus
RequestBody(frame BodyBuffer, endOfStream bool) BodyStatus
ResponseHeaders(responseHeaders Headers, endOfStream bool) HeadersStatus
ResponseBody(frame BodyBuffer, endOfStream bool) BodyStatus
}
HttpPluginInstance handles HTTP traffic for a single connection
type KafkaCommand ¶
type KafkaCommand struct {
ApiKey int16 // e.g., 0=Produce, 1=Fetch, 3=Metadata, 18=ApiVersions
ApiVersion int16 // Protocol version
CorrelationID int32 // Correlation ID for request/response matching
ClientID string // Client identifier
Operation string // Human-readable operation name (e.g., "Produce", "Fetch")
Topics []string // Topic names (from Produce/Fetch requests)
GroupID string // Consumer group ID (from JoinGroup/SyncGroup/OffsetCommit)
Messages []KafkaMessage // Sample messages from Produce requests
Timestamp time.Time
}
KafkaCommand represents a Kafka request received from the client
type KafkaMessage ¶
type KafkaMessage struct {
Topic string // Topic name
Partition int32 // Partition index
Key string // Message key (may be empty)
Value string // Message value (truncated at MaxValueSize)
Truncated bool // Value was truncated
}
KafkaMessage represents a sampled message from Produce/Fetch
type KafkaPlugin ¶
type KafkaPlugin interface {
Plugin // Embeds base
NewKafkaInstance(PluginContext, *services.ServiceRegistry) KafkaPluginInstance
}
KafkaPlugin is the capability interface for plugins that handle Kafka traffic
type KafkaPluginInstance ¶
type KafkaPluginInstance interface {
PluginInstance
OnKafkaCommand(cmd *KafkaCommand) KafkaStatus
OnKafkaResult(res *KafkaResult) KafkaStatus
}
KafkaPluginInstance handles Kafka traffic for a single connection
type KafkaResult ¶
type KafkaResult struct {
CorrelationID int32 // Correlation ID for request/response matching
ErrorCode int16 // Kafka error code (0 = no error)
ErrorMessage string // Human-readable error message
IsError bool // Whether this response contains an error
Messages []KafkaMessage // Sample messages from Fetch responses
Latency time.Duration // Round-trip latency from request parse to response parse
}
KafkaResult represents a Kafka response received from the server
type KafkaStatus ¶
type KafkaStatus int
KafkaStatus indicates how plugin iteration should proceed
const ( KafkaStatusContinue KafkaStatus = 0 KafkaStatusStopIteration KafkaStatus = 1 )
type Manager ¶
type Manager struct {
// contains filtered or unexported fields
}
func NewPluginManager ¶
func NewPluginManager(logger *zap.Logger, opts ...ManagerOpt) *Manager
func (*Manager) GetDomainStack ¶
GetDomainStack returns the stackID for a domain and protocol
func (*Manager) NewConnection ¶
func (m *Manager) NewConnection(ctx context.Context, connectionType ConnectionType, conn *connection.Connection, requestID string) (*Connection, error)
type ManagerOpt ¶
type ManagerOpt func(*Manager)
func AddPersistentPlugins ¶
func AddPersistentPlugins(plugins ...config.Plugin) ManagerOpt
func SetBufferSize ¶
func SetBufferSize(bufferSize int) ManagerOpt
func SetPluginRegistry ¶
func SetPluginRegistry(registry *PluginRegistry) ManagerOpt
type Meta ¶
type Meta interface {
connmeta.Service
RequestID() string
ReadBytes() int64
WriteBytes() int64
SetReadBytes(int64)
SetWriteBytes(int64)
}
Meta provides top-level-connection metadata extended with PluginContext-level metadata
type MySQLCommand ¶
type MySQLCommand struct {
Type byte // Command type (e.g., 0x03 for COM_QUERY)
Query string // SQL query text (for COM_QUERY, COM_STMT_PREPARE)
Timestamp time.Time
}
MySQLCommand represents a MySQL command received from the client
type MySQLPlugin ¶
type MySQLPlugin interface {
Plugin // Embeds base
NewMySQLInstance(PluginContext, *services.ServiceRegistry) MySQLPluginInstance
}
MySQLPlugin is the capability interface for plugins that handle MySQL traffic
type MySQLPluginInstance ¶
type MySQLPluginInstance interface {
PluginInstance
OnMySQLCommand(cmd *MySQLCommand) MySQLStatus
OnMySQLResult(res *MySQLResult) MySQLStatus
}
MySQLPluginInstance handles MySQL traffic for a single connection
type MySQLResult ¶
type MySQLResult struct {
Type string // "OK", "Error", "ResultSet", "EOF"
AffectedRows uint64
LastInsertID uint64
ErrorCode uint16
ErrorMessage string
Columns []string // Column names
Rows [][]string // Row values (string representation)
RowCount int // Total rows (may exceed len(Rows) if truncated)
Truncated bool // True if rows were capped at MaxResultSetRows
}
MySQLResult represents a MySQL result received from the server
type MySQLStatus ¶
type MySQLStatus int
MySQLStatus indicates how plugin iteration should proceed
const ( MySQLStatusContinue MySQLStatus = 0 MySQLStatusStopIteration MySQLStatus = 1 )
type Plugin ¶
type Plugin interface {
Init(logger *zap.Logger, config yaml.Node)
Destroy()
PluginType() PluginType
}
Plugin is the base interface all plugins must implement for lifecycle & config
type PluginAccessor ¶
type PluginAccessor interface {
// Get retrieves a plugin by type
Get(pluginType PluginType) Plugin
}
PluginAccessor is a type that can access the plugin registry
type PluginContext ¶
type PluginContext interface {
GetRequestBodyBuffer() BodyBuffer
GetResponseBodyBuffer() BodyBuffer
Context() context.Context
Meta() Meta
}
type PluginInstance ¶
type PluginInstance interface {
Destroy()
}
PluginInstance is the base interface for all protocol-specific plugin instances
type PluginRegistry ¶
type PluginRegistry struct {
// contains filtered or unexported fields
}
PluginRegistry holds references to active plugin instances
func NewRegistry ¶
func NewRegistry(plugins ...Plugin) *PluginRegistry
NewRegistry creates a new service registry
func (*PluginRegistry) Close ¶
func (sr *PluginRegistry) Close() error
Close closes all registered services that implement CloseableService
func (*PluginRegistry) Get ¶
func (sr *PluginRegistry) Get(pluginType PluginType) Plugin
Get retrieves a plugin by type
func (*PluginRegistry) Register ¶
func (sr *PluginRegistry) Register(svc Plugin)
Register adds or replaces a service in the registry
type PluginType ¶
type PluginType string
func (PluginType) String ¶
func (p PluginType) String() string
type RedisCommand ¶
type RedisCommand struct {
Name string // e.g., "GET", "SET"
Args []string // e.g., ["mykey"]
Raw []byte // The raw wire bytes
Timestamp time.Time
}
RedisCommand represents a Redis command received from the client
type RedisPlugin ¶
type RedisPlugin interface {
Plugin // Embeds base
NewRedisInstance(PluginContext, *services.ServiceRegistry) RedisPluginInstance
}
RedisPlugin is the capability interface for plugins that handle Redis traffic
type RedisPluginInstance ¶
type RedisPluginInstance interface {
PluginInstance
OnRedisCommand(cmd *RedisCommand) RedisStatus
OnRedisResult(res *RedisResult) RedisStatus
}
RedisPluginInstance handles Redis traffic for a single connection
type RedisResult ¶
RedisResult represents a Redis result received from the server
type RedisStatus ¶
type RedisStatus int
RedisStatus indicates how plugin iteration should proceed
const ( RedisStatusContinue RedisStatus = 0 RedisStatusStopIteration RedisStatus = 1 )
type Stack ¶
type Stack struct {
// contains filtered or unexported fields
}
Stack manages the lifecycle of a StackDeployment which contains a list of plugins.
func NewStack ¶
func NewStack(name string, logger *zap.Logger, pluginAccessor PluginAccessor) *Stack
func (*Stack) GetActiveDeployment ¶
func (s *Stack) GetActiveDeployment() *StackDeployment
func (*Stack) SetPersistentPlugins ¶
type StackDeployment ¶
type StackDeployment struct {
// contains filtered or unexported fields
}
StackDeployment manages the lifecycle of a collection of plugins and plugin instances. This is a one off deployment and does not support configuration changes.
func NewStackDeployment ¶
func NewStackDeployment(logger *zap.Logger, name string, pluginAccessor PluginAccessor) *StackDeployment
func (*StackDeployment) NewStack ¶
func (d *StackDeployment) NewStack(connType ConnectionType, ctx PluginContext, svcs *services.ServiceRegistry) []PluginInstance
NewStack creates plugin instances for the given connection type
func (*StackDeployment) SetPersistentPlugins ¶
func (d *StackDeployment) SetPersistentPlugins(plugins []config.Plugin)
func (*StackDeployment) Teardown ¶
func (d *StackDeployment) Teardown()