broker

package
v1.4.2 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 19, 2026 License: Apache-2.0 Imports: 15 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type CoordinatorConfig

type CoordinatorConfig struct {
	CleanupInterval time.Duration
}

type GroupCoordinator

type GroupCoordinator struct {
	// contains filtered or unexported fields
}

func NewGroupCoordinator

func NewGroupCoordinator(store metadata.Store, broker protocol.MetadataBroker, cfg *CoordinatorConfig) *GroupCoordinator

func (*GroupCoordinator) DeleteGroups

func (*GroupCoordinator) DescribeGroups

func (*GroupCoordinator) FindCoordinatorResponse

func (c *GroupCoordinator) FindCoordinatorResponse(correlationID int32, errorCode int16) *protocol.FindCoordinatorResponse

func (*GroupCoordinator) Heartbeat

func (c *GroupCoordinator) Heartbeat(ctx context.Context, req *protocol.HeartbeatRequest, correlationID int32) *protocol.HeartbeatResponse

func (*GroupCoordinator) JoinGroup

func (c *GroupCoordinator) JoinGroup(ctx context.Context, req *protocol.JoinGroupRequest, correlationID int32) (*protocol.JoinGroupResponse, error)

func (*GroupCoordinator) LeaveGroup

func (*GroupCoordinator) ListGroups

func (c *GroupCoordinator) ListGroups(ctx context.Context, req *protocol.ListGroupsRequest, correlationID int32) (*protocol.ListGroupsResponse, error)

func (*GroupCoordinator) OffsetCommit

func (*GroupCoordinator) OffsetFetch

func (c *GroupCoordinator) OffsetFetch(ctx context.Context, req *protocol.OffsetFetchRequest, correlationID int32) (*protocol.OffsetFetchResponse, error)

func (*GroupCoordinator) Stop

func (c *GroupCoordinator) Stop()

Stop terminates background cleanup routines.

func (*GroupCoordinator) SyncGroup

func (c *GroupCoordinator) SyncGroup(ctx context.Context, req *protocol.SyncGroupRequest, correlationID int32) (*protocol.SyncGroupResponse, error)

type Handler

type Handler interface {
	Handle(ctx context.Context, header *protocol.RequestHeader, req protocol.Request) ([]byte, error)
}

Handler processes parsed Kafka protocol requests and returns the response payload.

type S3HealthConfig

type S3HealthConfig struct {
	Window      time.Duration
	LatencyWarn time.Duration
	LatencyCrit time.Duration
	ErrorWarn   float64
	ErrorCrit   float64
	MaxSamples  int
}

S3HealthConfig defines the sampling window, sample cap, and the latency and error-rate thresholds used for transitioning between S3 health states.

type S3HealthMonitor

type S3HealthMonitor struct {
	// contains filtered or unexported fields
}

S3HealthMonitor aggregates recent S3 requests to determine a health state.

func NewS3HealthMonitor

func NewS3HealthMonitor(cfg S3HealthConfig) *S3HealthMonitor

NewS3HealthMonitor builds a health monitor with sane defaults.

func (*S3HealthMonitor) RecordOperation

func (m *S3HealthMonitor) RecordOperation(op string, latency time.Duration, err error)

RecordOperation records an arbitrary S3 operation outcome.

func (*S3HealthMonitor) RecordUpload

func (m *S3HealthMonitor) RecordUpload(latency time.Duration, err error)

RecordUpload records the outcome of an S3 upload; it remains for backward compatibility.

func (*S3HealthMonitor) Snapshot

func (m *S3HealthMonitor) Snapshot() S3HealthSnapshot

Snapshot returns the current state and key aggregates.

func (*S3HealthMonitor) State

func (m *S3HealthMonitor) State() S3HealthState

State returns just the current health state.

type S3HealthSnapshot

type S3HealthSnapshot struct {
	State      S3HealthState
	Since      time.Time
	AvgLatency time.Duration
	ErrorRate  float64
}

S3HealthSnapshot captures the monitor's public metrics.

type S3HealthState

type S3HealthState string

S3HealthState models the broker's view of S3 availability.

const (
	S3StateHealthy     S3HealthState = "healthy"
	S3StateDegraded    S3HealthState = "degraded"
	S3StateUnavailable S3HealthState = "unavailable"
)

type Server

type Server struct {
	Addr    string
	Handler Handler
	// contains filtered or unexported fields
}

Server implements minimal Kafka TCP handling for milestone 1.

func (*Server) ListenAddress

func (s *Server) ListenAddress() string

ListenAddress returns the actual listener address if the server has started.

func (*Server) ListenAndServe

func (s *Server) ListenAndServe(ctx context.Context) error

ListenAndServe starts accepting Kafka protocol connections.

func (*Server) Wait

func (s *Server) Wait()

Wait blocks until all connection goroutines exit.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL