Documentation
¶
Overview ¶
Package tocpb provides protobuf bindings and conversion between proto wire types and the canonical Go types in toc/core.
Conversion functions bridge the proto presence model (optional fields) and the Go presence model (ObservationMask bitmask, HasIdleRatio/HasBlockedRatio bools).
FromProto functions return (T, error) and validate invariants. ToProto functions are infallible — the Go types are assumed valid.
Index ¶
- Variables
- func DiagnosisFromProto(pb *Diagnosis) (core.Diagnosis, error)
- func ObservationFromProto(pb *StageObservation) (core.StageObservation, error)
- type DecodedBatch
- type Diagnosis
- func (*Diagnosis) Descriptor() ([]byte, []int)deprecated
- func (x *Diagnosis) GetConfidence() float64
- func (x *Diagnosis) GetConstraint() string
- func (x *Diagnosis) GetStages() []*StageDiagnosis
- func (x *Diagnosis) GetStarvationCount() int64
- func (*Diagnosis) ProtoMessage()
- func (x *Diagnosis) ProtoReflect() protoreflect.Message
- func (x *Diagnosis) Reset()
- func (x *Diagnosis) String() string
- type ObservationBatch
- func (*ObservationBatch) Descriptor() ([]byte, []int)deprecated
- func (x *ObservationBatch) GetObservations() []*StageObservation
- func (x *ObservationBatch) GetPipelineId() string
- func (x *ObservationBatch) GetTimestampUnixNano() int64
- func (x *ObservationBatch) GetWindowDurationNano() int64
- func (*ObservationBatch) ProtoMessage()
- func (x *ObservationBatch) ProtoReflect() protoreflect.Message
- func (x *ObservationBatch) Reset()
- func (x *ObservationBatch) String() string
- type StageDiagnosis
- func (*StageDiagnosis) Descriptor() ([]byte, []int)deprecated
- func (x *StageDiagnosis) GetArrivals() int64
- func (x *StageDiagnosis) GetBlockedRatio() float64
- func (x *StageDiagnosis) GetCompletions() int64
- func (x *StageDiagnosis) GetErrorRate() float64
- func (x *StageDiagnosis) GetFailures() int64
- func (x *StageDiagnosis) GetIdleRatio() float64
- func (x *StageDiagnosis) GetQueueGrowth() int64
- func (x *StageDiagnosis) GetStage() string
- func (x *StageDiagnosis) GetState() StageState
- func (x *StageDiagnosis) GetUtilization() float64
- func (*StageDiagnosis) ProtoMessage()
- func (x *StageDiagnosis) ProtoReflect() protoreflect.Message
- func (x *StageDiagnosis) Reset()
- func (x *StageDiagnosis) String() string
- type StageObservation
- func (*StageObservation) Descriptor() ([]byte, []int)deprecated
- func (x *StageObservation) GetArrivals() int64
- func (x *StageObservation) GetBlockedWork() uint64
- func (x *StageObservation) GetBusyWork() uint64
- func (x *StageObservation) GetCapacityWork() uint64
- func (x *StageObservation) GetCompletions() int64
- func (x *StageObservation) GetFailures() int64
- func (x *StageObservation) GetIdleWork() uint64
- func (x *StageObservation) GetQueueDepth() int64
- func (x *StageObservation) GetStage() string
- func (x *StageObservation) GetWorkers() int32
- func (*StageObservation) ProtoMessage()
- func (x *StageObservation) ProtoReflect() protoreflect.Message
- func (x *StageObservation) Reset()
- func (x *StageObservation) String() string
- type StageState
Constants ¶
This section is empty.
Variables ¶
var ( StageState_name = map[int32]string{ 0: "STAGE_STATE_UNSPECIFIED", 1: "STAGE_STATE_HEALTHY", 2: "STAGE_STATE_STARVED", 3: "STAGE_STATE_BLOCKED", 4: "STAGE_STATE_SATURATED", 5: "STAGE_STATE_BROKEN", } StageState_value = map[string]int32{ "STAGE_STATE_UNSPECIFIED": 0, "STAGE_STATE_HEALTHY": 1, "STAGE_STATE_STARVED": 2, "STAGE_STATE_BLOCKED": 3, "STAGE_STATE_SATURATED": 4, "STAGE_STATE_BROKEN": 5, } )
Enum value maps for StageState.
var File_toc_v1_toc_proto protoreflect.FileDescriptor
Functions ¶
func DiagnosisFromProto ¶
func DiagnosisFromProto(pb *Diagnosis) (core.Diagnosis, error)
DiagnosisFromProto converts a proto Diagnosis to core form. Returns the zero value and an error if the message violates invariants. Validates unique stage names and constraint membership.
func ObservationFromProto ¶
func ObservationFromProto(pb *StageObservation) (core.StageObservation, error)
ObservationFromProto converts a proto StageObservation to core form. Returns the zero value and an error if the message violates invariants (e.g., an empty stage name or a negative count).
Types ¶
type DecodedBatch ¶
type DecodedBatch struct {
PipelineID string
TimestampUnixNano int64
WindowDurationNano int64
Observations []core.StageObservation
}
DecodedBatch holds the validated contents of an ObservationBatch.
func BatchFromProto ¶
func BatchFromProto(pb *ObservationBatch) (DecodedBatch, error)
BatchFromProto converts a proto ObservationBatch to a DecodedBatch. Validates the batch-level invariants (non-empty pipeline_id, positive window_duration_nano, and unique stage names) and then validates each observation individually.
type Diagnosis ¶
type Diagnosis struct {
Constraint string `protobuf:"bytes,1,opt,name=constraint,proto3" json:"constraint,omitempty"`
Confidence float64 `protobuf:"fixed64,2,opt,name=confidence,proto3" json:"confidence,omitempty"`
Stages []*StageDiagnosis `protobuf:"bytes,3,rep,name=stages,proto3" json:"stages,omitempty"`
StarvationCount int64 `protobuf:"varint,4,opt,name=starvation_count,json=starvationCount,proto3" json:"starvation_count,omitempty"`
// contains filtered or unexported fields
}
Diagnosis is the output of one analyzer step.
func DiagnosisToProto ¶
DiagnosisToProto converts a core.Diagnosis to its proto form.
func (*Diagnosis) Descriptor
deprecated
func (*Diagnosis) GetConfidence ¶
func (*Diagnosis) GetConstraint ¶
func (*Diagnosis) GetStages ¶
func (x *Diagnosis) GetStages() []*StageDiagnosis
func (*Diagnosis) GetStarvationCount ¶
func (*Diagnosis) ProtoMessage ¶
func (*Diagnosis) ProtoMessage()
func (*Diagnosis) ProtoReflect ¶
func (x *Diagnosis) ProtoReflect() protoreflect.Message
type ObservationBatch ¶
type ObservationBatch struct {
PipelineId string `protobuf:"bytes,1,opt,name=pipeline_id,json=pipelineId,proto3" json:"pipeline_id,omitempty"`
TimestampUnixNano int64 `protobuf:"varint,2,opt,name=timestamp_unix_nano,json=timestampUnixNano,proto3" json:"timestamp_unix_nano,omitempty"` // window end, UTC nanoseconds
WindowDurationNano int64 `protobuf:"varint,3,opt,name=window_duration_nano,json=windowDurationNano,proto3" json:"window_duration_nano,omitempty"` // must be > 0
Observations []*StageObservation `protobuf:"bytes,4,rep,name=observations,proto3" json:"observations,omitempty"`
// contains filtered or unexported fields
}
ObservationBatch wraps a set of observations for one analysis window. This is the wire message for NATS transport.
All observations in a batch MUST refer to the same analysis window. The batch is produced by a single aggregator — there is no cross-host assembly. Stage names MUST be unique within a batch.
timestamp_unix_nano is the window end time (when the aggregator sampled). window_duration_nano is the length of the analysis window. Together they define the half-open interval [end - duration, end).
func BatchToProto ¶
func BatchToProto(pipelineID string, timestampUnixNano, windowDurationNano int64, observations []core.StageObservation) *ObservationBatch
BatchToProto converts a slice of core.StageObservation values to a proto ObservationBatch with the given metadata.
func (*ObservationBatch) Descriptor
deprecated
func (*ObservationBatch) Descriptor() ([]byte, []int)
Deprecated: Use ObservationBatch.ProtoReflect.Descriptor instead.
func (*ObservationBatch) GetObservations ¶
func (x *ObservationBatch) GetObservations() []*StageObservation
func (*ObservationBatch) GetPipelineId ¶
func (x *ObservationBatch) GetPipelineId() string
func (*ObservationBatch) GetTimestampUnixNano ¶
func (x *ObservationBatch) GetTimestampUnixNano() int64
func (*ObservationBatch) GetWindowDurationNano ¶
func (x *ObservationBatch) GetWindowDurationNano() int64
func (*ObservationBatch) ProtoMessage ¶
func (*ObservationBatch) ProtoMessage()
func (*ObservationBatch) ProtoReflect ¶
func (x *ObservationBatch) ProtoReflect() protoreflect.Message
func (*ObservationBatch) Reset ¶
func (x *ObservationBatch) Reset()
func (*ObservationBatch) String ¶
func (x *ObservationBatch) String() string
type StageDiagnosis ¶
type StageDiagnosis struct {
Stage string `protobuf:"bytes,1,opt,name=stage,proto3" json:"stage,omitempty"`
State StageState `protobuf:"varint,2,opt,name=state,proto3,enum=toc.v1.StageState" json:"state,omitempty"`
Utilization float64 `protobuf:"fixed64,3,opt,name=utilization,proto3" json:"utilization,omitempty"`
// Optional ratios — absent when the source observation lacked
// the corresponding work data.
IdleRatio *float64 `protobuf:"fixed64,4,opt,name=idle_ratio,json=idleRatio,proto3,oneof" json:"idle_ratio,omitempty"`
BlockedRatio *float64 `protobuf:"fixed64,5,opt,name=blocked_ratio,json=blockedRatio,proto3,oneof" json:"blocked_ratio,omitempty"`
ErrorRate float64 `protobuf:"fixed64,6,opt,name=error_rate,json=errorRate,proto3" json:"error_rate,omitempty"`
QueueGrowth int64 `protobuf:"varint,7,opt,name=queue_growth,json=queueGrowth,proto3" json:"queue_growth,omitempty"`
// Passthrough counts for consumer rate computation.
Completions int64 `protobuf:"varint,8,opt,name=completions,proto3" json:"completions,omitempty"`
Failures int64 `protobuf:"varint,9,opt,name=failures,proto3" json:"failures,omitempty"`
Arrivals int64 `protobuf:"varint,10,opt,name=arrivals,proto3" json:"arrivals,omitempty"`
// contains filtered or unexported fields
}
StageDiagnosis holds the classification for one stage. Contains ratios and counts, NOT rates. The caller converts to rates using their own time model (wall-clock or ticks).
func (*StageDiagnosis) Descriptor
deprecated
func (*StageDiagnosis) Descriptor() ([]byte, []int)
Deprecated: Use StageDiagnosis.ProtoReflect.Descriptor instead.
func (*StageDiagnosis) GetArrivals ¶
func (x *StageDiagnosis) GetArrivals() int64
func (*StageDiagnosis) GetBlockedRatio ¶
func (x *StageDiagnosis) GetBlockedRatio() float64
func (*StageDiagnosis) GetCompletions ¶
func (x *StageDiagnosis) GetCompletions() int64
func (*StageDiagnosis) GetErrorRate ¶
func (x *StageDiagnosis) GetErrorRate() float64
func (*StageDiagnosis) GetFailures ¶
func (x *StageDiagnosis) GetFailures() int64
func (*StageDiagnosis) GetIdleRatio ¶
func (x *StageDiagnosis) GetIdleRatio() float64
func (*StageDiagnosis) GetQueueGrowth ¶
func (x *StageDiagnosis) GetQueueGrowth() int64
func (*StageDiagnosis) GetStage ¶
func (x *StageDiagnosis) GetStage() string
func (*StageDiagnosis) GetState ¶
func (x *StageDiagnosis) GetState() StageState
func (*StageDiagnosis) GetUtilization ¶
func (x *StageDiagnosis) GetUtilization() float64
func (*StageDiagnosis) ProtoMessage ¶
func (*StageDiagnosis) ProtoMessage()
func (*StageDiagnosis) ProtoReflect ¶
func (x *StageDiagnosis) ProtoReflect() protoreflect.Message
func (*StageDiagnosis) Reset ¶
func (x *StageDiagnosis) Reset()
func (*StageDiagnosis) String ¶
func (x *StageDiagnosis) String() string
type StageObservation ¶
type StageObservation struct {
Stage string `protobuf:"bytes,1,opt,name=stage,proto3" json:"stage,omitempty"`
// Work accounting (abstract units — nanoseconds or ticks).
BusyWork uint64 `protobuf:"varint,2,opt,name=busy_work,json=busyWork,proto3" json:"busy_work,omitempty"`
CapacityWork uint64 `protobuf:"varint,3,opt,name=capacity_work,json=capacityWork,proto3" json:"capacity_work,omitempty"`
// Optional work accounting — absent means not observed.
IdleWork *uint64 `protobuf:"varint,4,opt,name=idle_work,json=idleWork,proto3,oneof" json:"idle_work,omitempty"`
BlockedWork *uint64 `protobuf:"varint,5,opt,name=blocked_work,json=blockedWork,proto3,oneof" json:"blocked_work,omitempty"`
// Item counters over the window.
Arrivals int64 `protobuf:"varint,6,opt,name=arrivals,proto3" json:"arrivals,omitempty"`
Completions *int64 `protobuf:"varint,7,opt,name=completions,proto3,oneof" json:"completions,omitempty"`
Failures *int64 `protobuf:"varint,8,opt,name=failures,proto3,oneof" json:"failures,omitempty"`
// Point-in-time gauges (end of window).
QueueDepth *int64 `protobuf:"varint,9,opt,name=queue_depth,json=queueDepth,proto3,oneof" json:"queue_depth,omitempty"`
Workers int32 `protobuf:"varint,10,opt,name=workers,proto3" json:"workers,omitempty"`
// contains filtered or unexported fields
}
StageObservation is the canonical input to the analyzer. One per stage per analysis window.
Invariants: busy_work + idle_work + blocked_work <= capacity_work, and failures <= completions (failures are a subset of completions). Adapters should aim for equality in the work invariant; the analyzer tolerates a sum that falls short of capacity_work.
Presence: idle_work, blocked_work, completions, failures, and queue_depth are optional — not all adapters can observe them. Non-optional scalars (busy_work, capacity_work, arrivals, workers) use proto3 default zero when absent. Converters validate semantics (e.g., stage name non-empty, counts non-negative).
func ObservationToProto ¶
func ObservationToProto(o core.StageObservation) *StageObservation
ObservationToProto converts a core.StageObservation to its proto form. Optional fields are set only when the corresponding mask bit is present.
func (*StageObservation) Descriptor
deprecated
func (*StageObservation) Descriptor() ([]byte, []int)
Deprecated: Use StageObservation.ProtoReflect.Descriptor instead.
func (*StageObservation) GetArrivals ¶
func (x *StageObservation) GetArrivals() int64
func (*StageObservation) GetBlockedWork ¶
func (x *StageObservation) GetBlockedWork() uint64
func (*StageObservation) GetBusyWork ¶
func (x *StageObservation) GetBusyWork() uint64
func (*StageObservation) GetCapacityWork ¶
func (x *StageObservation) GetCapacityWork() uint64
func (*StageObservation) GetCompletions ¶
func (x *StageObservation) GetCompletions() int64
func (*StageObservation) GetFailures ¶
func (x *StageObservation) GetFailures() int64
func (*StageObservation) GetIdleWork ¶
func (x *StageObservation) GetIdleWork() uint64
func (*StageObservation) GetQueueDepth ¶
func (x *StageObservation) GetQueueDepth() int64
func (*StageObservation) GetStage ¶
func (x *StageObservation) GetStage() string
func (*StageObservation) GetWorkers ¶
func (x *StageObservation) GetWorkers() int32
func (*StageObservation) ProtoMessage ¶
func (*StageObservation) ProtoMessage()
func (*StageObservation) ProtoReflect ¶
func (x *StageObservation) ProtoReflect() protoreflect.Message
func (*StageObservation) Reset ¶
func (x *StageObservation) Reset()
func (*StageObservation) String ¶
func (x *StageObservation) String() string
type StageState ¶
type StageState int32
StageState classifies a stage's operational state.
const ( StageState_STAGE_STATE_UNSPECIFIED StageState = 0 StageState_STAGE_STATE_HEALTHY StageState = 1 StageState_STAGE_STATE_STARVED StageState = 2 StageState_STAGE_STATE_BLOCKED StageState = 3 StageState_STAGE_STATE_SATURATED StageState = 4 StageState_STAGE_STATE_BROKEN StageState = 5 )
func (StageState) Descriptor ¶
func (StageState) Descriptor() protoreflect.EnumDescriptor
func (StageState) Enum ¶
func (x StageState) Enum() *StageState
func (StageState) EnumDescriptor
deprecated
func (StageState) EnumDescriptor() ([]byte, []int)
Deprecated: Use StageState.Descriptor instead.
func (StageState) Number ¶
func (x StageState) Number() protoreflect.EnumNumber
func (StageState) String ¶
func (x StageState) String() string
func (StageState) Type ¶
func (StageState) Type() protoreflect.EnumType