Documentation
¶
Index ¶
- type CoordinatorConfig
- type GroupCoordinator
- func (c *GroupCoordinator) DeleteGroups(ctx context.Context, req *protocol.DeleteGroupsRequest, correlationID int32) (*protocol.DeleteGroupsResponse, error)
- func (c *GroupCoordinator) DescribeGroups(ctx context.Context, req *protocol.DescribeGroupsRequest, correlationID int32) (*protocol.DescribeGroupsResponse, error)
- func (c *GroupCoordinator) FindCoordinatorResponse(correlationID int32, errorCode int16) *protocol.FindCoordinatorResponse
- func (c *GroupCoordinator) Heartbeat(ctx context.Context, req *protocol.HeartbeatRequest, correlationID int32) *protocol.HeartbeatResponse
- func (c *GroupCoordinator) JoinGroup(ctx context.Context, req *protocol.JoinGroupRequest, correlationID int32) (*protocol.JoinGroupResponse, error)
- func (c *GroupCoordinator) LeaveGroup(ctx context.Context, req *protocol.LeaveGroupRequest, correlationID int32) *protocol.LeaveGroupResponse
- func (c *GroupCoordinator) ListGroups(ctx context.Context, req *protocol.ListGroupsRequest, correlationID int32) (*protocol.ListGroupsResponse, error)
- func (c *GroupCoordinator) OffsetCommit(ctx context.Context, req *protocol.OffsetCommitRequest, correlationID int32) (*protocol.OffsetCommitResponse, error)
- func (c *GroupCoordinator) OffsetFetch(ctx context.Context, req *protocol.OffsetFetchRequest, correlationID int32) (*protocol.OffsetFetchResponse, error)
- func (c *GroupCoordinator) Stop()
- func (c *GroupCoordinator) SyncGroup(ctx context.Context, req *protocol.SyncGroupRequest, correlationID int32) (*protocol.SyncGroupResponse, error)
- type Handler
- type S3HealthConfig
- type S3HealthMonitor
- type S3HealthSnapshot
- type S3HealthState
- type Server
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type CoordinatorConfig ¶
type GroupCoordinator ¶
type GroupCoordinator struct {
// contains filtered or unexported fields
}
func NewGroupCoordinator ¶
func NewGroupCoordinator(store metadata.Store, broker protocol.MetadataBroker, cfg *CoordinatorConfig) *GroupCoordinator
func (*GroupCoordinator) DeleteGroups ¶
func (c *GroupCoordinator) DeleteGroups(ctx context.Context, req *protocol.DeleteGroupsRequest, correlationID int32) (*protocol.DeleteGroupsResponse, error)
func (*GroupCoordinator) DescribeGroups ¶
func (c *GroupCoordinator) DescribeGroups(ctx context.Context, req *protocol.DescribeGroupsRequest, correlationID int32) (*protocol.DescribeGroupsResponse, error)
func (*GroupCoordinator) FindCoordinatorResponse ¶
func (c *GroupCoordinator) FindCoordinatorResponse(correlationID int32, errorCode int16) *protocol.FindCoordinatorResponse
func (*GroupCoordinator) Heartbeat ¶
func (c *GroupCoordinator) Heartbeat(ctx context.Context, req *protocol.HeartbeatRequest, correlationID int32) *protocol.HeartbeatResponse
func (*GroupCoordinator) JoinGroup ¶
func (c *GroupCoordinator) JoinGroup(ctx context.Context, req *protocol.JoinGroupRequest, correlationID int32) (*protocol.JoinGroupResponse, error)
func (*GroupCoordinator) LeaveGroup ¶
func (c *GroupCoordinator) LeaveGroup(ctx context.Context, req *protocol.LeaveGroupRequest, correlationID int32) *protocol.LeaveGroupResponse
func (*GroupCoordinator) ListGroups ¶
func (c *GroupCoordinator) ListGroups(ctx context.Context, req *protocol.ListGroupsRequest, correlationID int32) (*protocol.ListGroupsResponse, error)
func (*GroupCoordinator) OffsetCommit ¶
func (c *GroupCoordinator) OffsetCommit(ctx context.Context, req *protocol.OffsetCommitRequest, correlationID int32) (*protocol.OffsetCommitResponse, error)
func (*GroupCoordinator) OffsetFetch ¶
func (c *GroupCoordinator) OffsetFetch(ctx context.Context, req *protocol.OffsetFetchRequest, correlationID int32) (*protocol.OffsetFetchResponse, error)
func (*GroupCoordinator) Stop ¶
func (c *GroupCoordinator) Stop()
Stop terminates background cleanup routines.
func (*GroupCoordinator) SyncGroup ¶
func (c *GroupCoordinator) SyncGroup(ctx context.Context, req *protocol.SyncGroupRequest, correlationID int32) (*protocol.SyncGroupResponse, error)
type Handler ¶
type Handler interface {
Handle(ctx context.Context, header *protocol.RequestHeader, req protocol.Request) ([]byte, error)
}
Handler processes parsed Kafka protocol requests and returns the response payload.
type S3HealthConfig ¶
type S3HealthConfig struct {
Window time.Duration
LatencyWarn time.Duration
LatencyCrit time.Duration
ErrorWarn float64
ErrorCrit float64
MaxSamples int
}
S3HealthConfig defines thresholds for transitioning between states.
type S3HealthMonitor ¶
type S3HealthMonitor struct {
// contains filtered or unexported fields
}
S3HealthMonitor aggregates recent S3 requests to determine a health state.
func NewS3HealthMonitor ¶
func NewS3HealthMonitor(cfg S3HealthConfig) *S3HealthMonitor
NewS3HealthMonitor builds a health monitor with sane defaults.
func (*S3HealthMonitor) RecordOperation ¶
func (m *S3HealthMonitor) RecordOperation(op string, latency time.Duration, err error)
RecordOperation records an arbitrary S3 operation outcome.
func (*S3HealthMonitor) RecordUpload ¶
func (m *S3HealthMonitor) RecordUpload(latency time.Duration, err error)
RecordUpload remains for backward compatibility.
func (*S3HealthMonitor) Snapshot ¶
func (m *S3HealthMonitor) Snapshot() S3HealthSnapshot
Snapshot returns the current state and key aggregates.
func (*S3HealthMonitor) State ¶
func (m *S3HealthMonitor) State() S3HealthState
State returns just the current health state.
type S3HealthSnapshot ¶
type S3HealthSnapshot struct {
State S3HealthState
Since time.Time
AvgLatency time.Duration
ErrorRate float64
}
S3HealthSnapshot captures the monitor's public metrics.
type S3HealthState ¶
type S3HealthState string
S3HealthState models the broker's view of S3 availability.
const ( S3StateHealthy S3HealthState = "healthy" S3StateDegraded S3HealthState = "degraded" )
type Server ¶
Server implements minimal Kafka TCP handling for milestone 1.
func (*Server) ListenAddress ¶
ListenAddress returns the actual listener address if the server has started.
func (*Server) ListenAndServe ¶
ListenAndServe starts accepting Kafka protocol connections.