broker

package
v1.5.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 30, 2026 License: Apache-2.0 Imports: 17 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func ContextWithConnInfo added in v1.5.0

func ContextWithConnInfo(ctx context.Context, info *ConnContext) context.Context

ContextWithConnInfo attaches connection info to a context.

Types

type ConnContext added in v1.5.0

type ConnContext struct {
	Principal  string
	RemoteAddr string
	ProxyAddr  string
}

ConnContext carries connection-scoped identity data for auth decisions.

func ConnInfoFromContext added in v1.5.0

func ConnInfoFromContext(ctx context.Context) *ConnContext

ConnInfoFromContext returns connection info if present.

type ConnContextFunc added in v1.5.0

type ConnContextFunc func(conn net.Conn) (net.Conn, *ConnContext, error)

ConnContextFunc can wrap a connection and attach connection-scoped context data.

type CoordinatorConfig

type CoordinatorConfig struct {
	CleanupInterval time.Duration
}

type GroupCoordinator

type GroupCoordinator struct {
	// contains filtered or unexported fields
}

func NewGroupCoordinator

func NewGroupCoordinator(store metadata.Store, broker protocol.MetadataBroker, cfg *CoordinatorConfig) *GroupCoordinator

func (*GroupCoordinator) DeleteGroups

func (*GroupCoordinator) DescribeGroups

func (*GroupCoordinator) FindCoordinatorResponse

func (c *GroupCoordinator) FindCoordinatorResponse(correlationID int32, errorCode int16) *protocol.FindCoordinatorResponse

func (*GroupCoordinator) Heartbeat

func (c *GroupCoordinator) Heartbeat(ctx context.Context, req *protocol.HeartbeatRequest, correlationID int32) *protocol.HeartbeatResponse

func (*GroupCoordinator) JoinGroup

func (c *GroupCoordinator) JoinGroup(ctx context.Context, req *protocol.JoinGroupRequest, correlationID int32) (*protocol.JoinGroupResponse, error)

func (*GroupCoordinator) LeaveGroup

func (*GroupCoordinator) ListGroups

func (c *GroupCoordinator) ListGroups(ctx context.Context, req *protocol.ListGroupsRequest, correlationID int32) (*protocol.ListGroupsResponse, error)

func (*GroupCoordinator) OffsetCommit

func (*GroupCoordinator) OffsetFetch

func (c *GroupCoordinator) OffsetFetch(ctx context.Context, req *protocol.OffsetFetchRequest, correlationID int32) (*protocol.OffsetFetchResponse, error)

func (*GroupCoordinator) Stop

func (c *GroupCoordinator) Stop()

Stop terminates background cleanup routines.

func (*GroupCoordinator) SyncGroup

func (c *GroupCoordinator) SyncGroup(ctx context.Context, req *protocol.SyncGroupRequest, correlationID int32) (*protocol.SyncGroupResponse, error)

type Handler

type Handler interface {
	Handle(ctx context.Context, header *protocol.RequestHeader, req protocol.Request) ([]byte, error)
}

Handler processes parsed Kafka protocol requests and returns the response payload.

type ProxyInfo added in v1.5.0

type ProxyInfo struct {
	SourceAddr string
	DestAddr   string
	SourceIP   string
	DestIP     string
	SourcePort int
	DestPort   int
	Local      bool
}

ProxyInfo captures parsed proxy protocol metadata.

func ReadProxyProtocol added in v1.5.0

func ReadProxyProtocol(conn net.Conn) (net.Conn, *ProxyInfo, error)

ReadProxyProtocol consumes a PROXY protocol header (v1 or v2) if present. It returns a wrapped connection that preserves buffered bytes.

type S3HealthConfig

type S3HealthConfig struct {
	Window      time.Duration
	LatencyWarn time.Duration
	LatencyCrit time.Duration
	ErrorWarn   float64
	ErrorCrit   float64
	MaxSamples  int
}

S3HealthConfig defines thresholds for transitioning between states.

type S3HealthMonitor

type S3HealthMonitor struct {
	// contains filtered or unexported fields
}

S3HealthMonitor aggregates recent S3 requests to determine a health state.

func NewS3HealthMonitor

func NewS3HealthMonitor(cfg S3HealthConfig) *S3HealthMonitor

NewS3HealthMonitor builds a health monitor with sane defaults.

func (*S3HealthMonitor) RecordOperation

func (m *S3HealthMonitor) RecordOperation(op string, latency time.Duration, err error)

RecordOperation records an arbitrary S3 operation outcome.

func (*S3HealthMonitor) RecordUpload

func (m *S3HealthMonitor) RecordUpload(latency time.Duration, err error)

RecordUpload remains for backward compatibility.

func (*S3HealthMonitor) Snapshot

func (m *S3HealthMonitor) Snapshot() S3HealthSnapshot

Snapshot returns the current state and key aggregates.

func (*S3HealthMonitor) State

func (m *S3HealthMonitor) State() S3HealthState

State returns just the current health state.

type S3HealthSnapshot

type S3HealthSnapshot struct {
	State      S3HealthState
	Since      time.Time
	AvgLatency time.Duration
	ErrorRate  float64
}

S3HealthSnapshot captures the monitor's public metrics.

type S3HealthState

type S3HealthState string

S3HealthState models the broker's view of S3 availability.

const (
	S3StateHealthy     S3HealthState = "healthy"
	S3StateDegraded    S3HealthState = "degraded"
	S3StateUnavailable S3HealthState = "unavailable"
)

type Server

type Server struct {
	Addr            string
	Handler         Handler
	ConnContextFunc ConnContextFunc
	// contains filtered or unexported fields
}

Server implements minimal Kafka TCP handling for milestone 1.

func (*Server) ListenAddress

func (s *Server) ListenAddress() string

ListenAddress returns the actual listener address if the server has started.

func (*Server) ListenAndServe

func (s *Server) ListenAndServe(ctx context.Context) error

ListenAndServe starts accepting Kafka protocol connections.

func (*Server) Wait

func (s *Server) Wait()

Wait blocks until all connection goroutines exit.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL