tocpb

package
v0.1.0
Published: Mar 25, 2026 License: MIT Imports: 9 Imported by: 0

Documentation

Overview

Package tocpb provides protobuf bindings and conversion between proto wire types and the canonical Go types in toc/core.

Conversion functions bridge the proto presence model (optional fields) and the Go presence model (ObservationMask bitmask, HasIdleRatio/HasBlockedRatio bools).

FromProto functions return (T, error) and validate invariants. ToProto functions are infallible: they return no error because the Go types are assumed valid.
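
As a rough illustration of the FromProto direction, the sketch below maps pointer-style optional fields to a presence bitmask. It does not use the real package: `wireObs`, `coreObs`, `mask`, `hasIdle`, `hasBlocked`, and `fromWire` are hypothetical stand-ins, and the actual layout of core.ObservationMask is not shown on this page.

```go
package main

import (
	"errors"
	"fmt"
)

// mask is a hypothetical stand-in for core.ObservationMask.
type mask uint8

const (
	hasIdle mask = 1 << iota
	hasBlocked
)

// wireObs mimics proto3 optional fields: a nil pointer means "absent".
type wireObs struct {
	Stage       string
	IdleWork    *uint64
	BlockedWork *uint64
}

// coreObs mimics a Go type that records presence in a bitmask.
type coreObs struct {
	Stage       string
	IdleWork    uint64
	BlockedWork uint64
	Mask        mask
}

// fromWire validates the message, then copies each optional field
// that is present and sets the matching mask bit.
func fromWire(w *wireObs) (coreObs, error) {
	if w == nil {
		return coreObs{}, errors.New("nil observation")
	}
	if w.Stage == "" {
		return coreObs{}, errors.New("empty stage name")
	}
	out := coreObs{Stage: w.Stage}
	if w.IdleWork != nil {
		out.IdleWork = *w.IdleWork
		out.Mask |= hasIdle
	}
	if w.BlockedWork != nil {
		out.BlockedWork = *w.BlockedWork
		out.Mask |= hasBlocked
	}
	return out, nil
}

func main() {
	idle := uint64(0) // an observed zero is still "present"
	o, err := fromWire(&wireObs{Stage: "parse", IdleWork: &idle})
	fmt.Println(o.Mask&hasIdle != 0, o.Mask&hasBlocked != 0, err)
}
```

The point of the mask is that an observed zero and an unobserved field stay distinguishable after conversion.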

Constants

This section is empty.

Variables

var (
	StageState_name = map[int32]string{
		0: "STAGE_STATE_UNSPECIFIED",
		1: "STAGE_STATE_HEALTHY",
		2: "STAGE_STATE_STARVED",
		3: "STAGE_STATE_BLOCKED",
		4: "STAGE_STATE_SATURATED",
		5: "STAGE_STATE_BROKEN",
	}
	StageState_value = map[string]int32{
		"STAGE_STATE_UNSPECIFIED": 0,
		"STAGE_STATE_HEALTHY":     1,
		"STAGE_STATE_STARVED":     2,
		"STAGE_STATE_BLOCKED":     3,
		"STAGE_STATE_SATURATED":   4,
		"STAGE_STATE_BROKEN":      5,
	}
)

Enum value maps for StageState.
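
A minimal sketch of how such name/value maps are typically used, for example when parsing a state name from a config file or log line. The maps are copied locally under the hypothetical names `stageStateName` and `stageStateValue` so the example runs without the package; `parseState` is likewise an illustrative helper, not part of tocpb.

```go
package main

import "fmt"

// Local copies of the two maps listed above.
var stageStateName = map[int32]string{
	0: "STAGE_STATE_UNSPECIFIED",
	1: "STAGE_STATE_HEALTHY",
	2: "STAGE_STATE_STARVED",
	3: "STAGE_STATE_BLOCKED",
	4: "STAGE_STATE_SATURATED",
	5: "STAGE_STATE_BROKEN",
}

var stageStateValue = map[string]int32{
	"STAGE_STATE_UNSPECIFIED": 0,
	"STAGE_STATE_HEALTHY":     1,
	"STAGE_STATE_STARVED":     2,
	"STAGE_STATE_BLOCKED":     3,
	"STAGE_STATE_SATURATED":   4,
	"STAGE_STATE_BROKEN":      5,
}

// parseState looks up a state by name and reports whether it is known.
func parseState(name string) (int32, bool) {
	v, ok := stageStateValue[name]
	return v, ok
}

func main() {
	v, ok := parseState("STAGE_STATE_BLOCKED")
	fmt.Println(v, ok, stageStateName[v])

	_, ok = parseState("STAGE_STATE_ON_FIRE")
	fmt.Println(ok) // unknown names are reported, not silently mapped to 0
}
```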

var File_toc_v1_toc_proto protoreflect.FileDescriptor

Functions

func DiagnosisFromProto

func DiagnosisFromProto(pb *Diagnosis) (core.Diagnosis, error)

DiagnosisFromProto converts a proto Diagnosis to core form. It returns the zero value and an error if the message violates an invariant. It validates that stage names are unique and checks constraint membership.
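
The two checks can be sketched as follows. This is one plausible reading, not the package's code: it assumes "constraint membership" means the constraint must name one of the listed stages, and it assumes an empty constraint means "none identified" and is allowed. `validateDiagnosis` is a hypothetical helper that works on plain strings.

```go
package main

import "fmt"

// validateDiagnosis checks that stage names are unique and that a
// non-empty constraint names one of them.
func validateDiagnosis(constraint string, stages []string) error {
	seen := make(map[string]struct{}, len(stages))
	for _, s := range stages {
		if _, dup := seen[s]; dup {
			return fmt.Errorf("duplicate stage name %q", s)
		}
		seen[s] = struct{}{}
	}
	// Assumption: an empty constraint means no constraint was identified.
	if constraint != "" {
		if _, ok := seen[constraint]; !ok {
			return fmt.Errorf("constraint %q is not a listed stage", constraint)
		}
	}
	return nil
}

func main() {
	fmt.Println(validateDiagnosis("encode", []string{"decode", "encode"}))
	fmt.Println(validateDiagnosis("upload", []string{"decode", "encode"}))
	fmt.Println(validateDiagnosis("", []string{"decode", "decode"}))
}
```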

func ObservationFromProto

func ObservationFromProto(pb *StageObservation) (core.StageObservation, error)

ObservationFromProto converts a proto StageObservation to core form. It returns the zero value and an error if the message violates an invariant.

Types

type DecodedBatch

type DecodedBatch struct {
	PipelineID         string
	TimestampUnixNano  int64
	WindowDurationNano int64
	Observations       []core.StageObservation
}

DecodedBatch holds the validated contents of an ObservationBatch.

func BatchFromProto

func BatchFromProto(pb *ObservationBatch) (DecodedBatch, error)

BatchFromProto converts a proto ObservationBatch to a DecodedBatch. It validates the batch-level invariants (non-empty pipeline_id, positive window_duration_nano, unique stage names) and also validates each observation individually.
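
The listed checks might look roughly like the sketch below. The types `obs` and `batch` and the functions `validateObs` and `validateBatch` are hypothetical, trimmed-down stand-ins; the order of checks and the error wording in the real package are not documented here.

```go
package main

import (
	"errors"
	"fmt"
)

// obs and batch are simplified stand-ins for the proto messages.
type obs struct {
	Stage    string
	Arrivals int64
}

type batch struct {
	PipelineID         string
	WindowDurationNano int64
	Observations       []obs
}

// validateObs checks a single observation on its own.
func validateObs(o obs) error {
	if o.Stage == "" {
		return errors.New("stage name is empty")
	}
	if o.Arrivals < 0 {
		return errors.New("arrivals is negative")
	}
	return nil
}

// validateBatch applies the batch-level checks and the
// per-observation check to every entry.
func validateBatch(b batch) error {
	if b.PipelineID == "" {
		return errors.New("pipeline_id is empty")
	}
	if b.WindowDurationNano <= 0 {
		return errors.New("window_duration_nano must be > 0")
	}
	seen := make(map[string]bool, len(b.Observations))
	for i, o := range b.Observations {
		if err := validateObs(o); err != nil {
			return fmt.Errorf("observation %d: %w", i, err)
		}
		if seen[o.Stage] {
			return fmt.Errorf("duplicate stage %q", o.Stage)
		}
		seen[o.Stage] = true
	}
	return nil
}

func main() {
	good := batch{
		PipelineID:         "p1",
		WindowDurationNano: 1_000_000_000,
		Observations:       []obs{{Stage: "read", Arrivals: 4}, {Stage: "write", Arrivals: 4}},
	}
	dup := good
	dup.Observations = []obs{{Stage: "read", Arrivals: 4}, {Stage: "read", Arrivals: 2}}

	fmt.Println(validateBatch(good))
	fmt.Println(validateBatch(dup))
}
```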

type Diagnosis

type Diagnosis struct {
	Constraint      string            `protobuf:"bytes,1,opt,name=constraint,proto3" json:"constraint,omitempty"`
	Confidence      float64           `protobuf:"fixed64,2,opt,name=confidence,proto3" json:"confidence,omitempty"`
	Stages          []*StageDiagnosis `protobuf:"bytes,3,rep,name=stages,proto3" json:"stages,omitempty"`
	StarvationCount int64             `protobuf:"varint,4,opt,name=starvation_count,json=starvationCount,proto3" json:"starvation_count,omitempty"`
	// contains filtered or unexported fields
}

Diagnosis is the output of one analyzer step.

func DiagnosisToProto

func DiagnosisToProto(d core.Diagnosis) *Diagnosis

DiagnosisToProto converts a core.Diagnosis to its proto form.

func (*Diagnosis) Descriptor deprecated

func (*Diagnosis) Descriptor() ([]byte, []int)

Deprecated: Use Diagnosis.ProtoReflect.Descriptor instead.

func (*Diagnosis) GetConfidence

func (x *Diagnosis) GetConfidence() float64

func (*Diagnosis) GetConstraint

func (x *Diagnosis) GetConstraint() string

func (*Diagnosis) GetStages

func (x *Diagnosis) GetStages() []*StageDiagnosis

func (*Diagnosis) GetStarvationCount

func (x *Diagnosis) GetStarvationCount() int64

func (*Diagnosis) ProtoMessage

func (*Diagnosis) ProtoMessage()

func (*Diagnosis) ProtoReflect

func (x *Diagnosis) ProtoReflect() protoreflect.Message

func (*Diagnosis) Reset

func (x *Diagnosis) Reset()

func (*Diagnosis) String

func (x *Diagnosis) String() string

type ObservationBatch

type ObservationBatch struct {
	PipelineId         string              `protobuf:"bytes,1,opt,name=pipeline_id,json=pipelineId,proto3" json:"pipeline_id,omitempty"`
	TimestampUnixNano  int64               `protobuf:"varint,2,opt,name=timestamp_unix_nano,json=timestampUnixNano,proto3" json:"timestamp_unix_nano,omitempty"`    // window end, UTC nanoseconds
	WindowDurationNano int64               `protobuf:"varint,3,opt,name=window_duration_nano,json=windowDurationNano,proto3" json:"window_duration_nano,omitempty"` // must be > 0
	Observations       []*StageObservation `protobuf:"bytes,4,rep,name=observations,proto3" json:"observations,omitempty"`
	// contains filtered or unexported fields
}

ObservationBatch wraps a set of observations for one analysis window. This is the wire message for NATS transport.

All observations in a batch MUST refer to the same analysis window. The batch is produced by a single aggregator — there is no cross-host assembly. Stage names MUST be unique within a batch.

timestamp_unix_nano is the window end time (when the aggregator sampled). window_duration_nano is the length of the analysis window. Together they define [end - duration, end).
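
To make the half-open interval concrete, here is a small sketch that derives the window bounds from the two fields using the standard `time` package. The helpers `window` and `contains` are illustrative names, not part of tocpb.

```go
package main

import (
	"fmt"
	"time"
)

// window returns the [start, end) bounds of an analysis window given
// its end timestamp and its duration, both in nanoseconds.
func window(endUnixNano, durationNano int64) (start, end time.Time) {
	end = time.Unix(0, endUnixNano).UTC()
	start = end.Add(-time.Duration(durationNano))
	return start, end
}

// contains reports whether t falls in the half-open interval [start, end).
func contains(start, end, t time.Time) bool {
	return !t.Before(start) && t.Before(end)
}

func main() {
	start, end := window(1_700_000_010_000_000_000, 10_000_000_000)
	fmt.Println(end.Sub(start))                                         // 10s
	fmt.Println(contains(start, end, start), contains(start, end, end)) // true false
}
```

The start instant belongs to the window and the end instant does not, so back-to-back windows do not overlap.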

func BatchToProto

func BatchToProto(pipelineID string, timestampUnixNano, windowDurationNano int64, observations []core.StageObservation) *ObservationBatch

BatchToProto converts a slice of core.StageObservation values to a proto ObservationBatch with the given metadata.

func (*ObservationBatch) Descriptor deprecated

func (*ObservationBatch) Descriptor() ([]byte, []int)

Deprecated: Use ObservationBatch.ProtoReflect.Descriptor instead.

func (*ObservationBatch) GetObservations

func (x *ObservationBatch) GetObservations() []*StageObservation

func (*ObservationBatch) GetPipelineId

func (x *ObservationBatch) GetPipelineId() string

func (*ObservationBatch) GetTimestampUnixNano

func (x *ObservationBatch) GetTimestampUnixNano() int64

func (*ObservationBatch) GetWindowDurationNano

func (x *ObservationBatch) GetWindowDurationNano() int64

func (*ObservationBatch) ProtoMessage

func (*ObservationBatch) ProtoMessage()

func (*ObservationBatch) ProtoReflect

func (x *ObservationBatch) ProtoReflect() protoreflect.Message

func (*ObservationBatch) Reset

func (x *ObservationBatch) Reset()

func (*ObservationBatch) String

func (x *ObservationBatch) String() string

type StageDiagnosis

type StageDiagnosis struct {
	Stage       string     `protobuf:"bytes,1,opt,name=stage,proto3" json:"stage,omitempty"`
	State       StageState `protobuf:"varint,2,opt,name=state,proto3,enum=toc.v1.StageState" json:"state,omitempty"`
	Utilization float64    `protobuf:"fixed64,3,opt,name=utilization,proto3" json:"utilization,omitempty"`
	// Optional ratios — absent when the source observation lacked
	// the corresponding work data.
	IdleRatio    *float64 `protobuf:"fixed64,4,opt,name=idle_ratio,json=idleRatio,proto3,oneof" json:"idle_ratio,omitempty"`
	BlockedRatio *float64 `protobuf:"fixed64,5,opt,name=blocked_ratio,json=blockedRatio,proto3,oneof" json:"blocked_ratio,omitempty"`
	ErrorRate    float64  `protobuf:"fixed64,6,opt,name=error_rate,json=errorRate,proto3" json:"error_rate,omitempty"`
	QueueGrowth  int64    `protobuf:"varint,7,opt,name=queue_growth,json=queueGrowth,proto3" json:"queue_growth,omitempty"`
	// Passthrough counts for consumer rate computation.
	Completions int64 `protobuf:"varint,8,opt,name=completions,proto3" json:"completions,omitempty"`
	Failures    int64 `protobuf:"varint,9,opt,name=failures,proto3" json:"failures,omitempty"`
	Arrivals    int64 `protobuf:"varint,10,opt,name=arrivals,proto3" json:"arrivals,omitempty"`
	// contains filtered or unexported fields
}

StageDiagnosis holds the classification for one stage. It contains ratios and counts, NOT rates; callers convert to rates using their own time model (wall-clock or ticks).
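
For a caller using a wall-clock time model, the conversion from a passthrough count to a rate could be sketched as below. `ratePerSecond` is a hypothetical helper, and taking the window length from the batch's window_duration_nano is an assumption about how a consumer would wire this up.

```go
package main

import (
	"fmt"
	"time"
)

// ratePerSecond converts a count observed over a window into a
// per-second rate. A non-positive window yields 0 rather than Inf or NaN.
func ratePerSecond(count int64, window time.Duration) float64 {
	if window <= 0 {
		return 0
	}
	return float64(count) / window.Seconds()
}

func main() {
	window := 10 * time.Second
	fmt.Println(ratePerSecond(500, window)) // 50
	fmt.Println(ratePerSecond(5, window))   // 0.5
}
```

A tick-based caller would divide by its own tick count instead of seconds.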

func (*StageDiagnosis) Descriptor deprecated

func (*StageDiagnosis) Descriptor() ([]byte, []int)

Deprecated: Use StageDiagnosis.ProtoReflect.Descriptor instead.

func (*StageDiagnosis) GetArrivals

func (x *StageDiagnosis) GetArrivals() int64

func (*StageDiagnosis) GetBlockedRatio

func (x *StageDiagnosis) GetBlockedRatio() float64

func (*StageDiagnosis) GetCompletions

func (x *StageDiagnosis) GetCompletions() int64

func (*StageDiagnosis) GetErrorRate

func (x *StageDiagnosis) GetErrorRate() float64

func (*StageDiagnosis) GetFailures

func (x *StageDiagnosis) GetFailures() int64

func (*StageDiagnosis) GetIdleRatio

func (x *StageDiagnosis) GetIdleRatio() float64
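
GetIdleRatio and GetBlockedRatio return a plain float64, so an absent ratio and an observed 0 look the same through the getter. Getters generated by protoc-gen-go normally return the zero value when the field is unset or the receiver is nil; to tell the cases apart, check the pointer field itself. The sketch below imitates that shape with a hypothetical local type `stageDiagnosis` and helper `idleRatio` rather than importing the package.

```go
package main

import "fmt"

// stageDiagnosis is a stand-in for the generated message,
// reduced to one optional field.
type stageDiagnosis struct {
	IdleRatio *float64
}

// getIdleRatio follows the usual shape of a generated getter:
// nil-safe, and zero when the field is unset.
func (x *stageDiagnosis) getIdleRatio() float64 {
	if x != nil && x.IdleRatio != nil {
		return *x.IdleRatio
	}
	return 0
}

// idleRatio returns the value together with its presence.
func idleRatio(x *stageDiagnosis) (float64, bool) {
	if x == nil || x.IdleRatio == nil {
		return 0, false
	}
	return *x.IdleRatio, true
}

func main() {
	zero := 0.0
	observed := &stageDiagnosis{IdleRatio: &zero}
	missing := &stageDiagnosis{}

	// The getter cannot distinguish the two messages.
	fmt.Println(observed.getIdleRatio(), missing.getIdleRatio())

	// The pointer check can.
	_, okObserved := idleRatio(observed)
	_, okMissing := idleRatio(missing)
	fmt.Println(okObserved, okMissing)
}
```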

func (*StageDiagnosis) GetQueueGrowth

func (x *StageDiagnosis) GetQueueGrowth() int64

func (*StageDiagnosis) GetStage

func (x *StageDiagnosis) GetStage() string

func (*StageDiagnosis) GetState

func (x *StageDiagnosis) GetState() StageState

func (*StageDiagnosis) GetUtilization

func (x *StageDiagnosis) GetUtilization() float64

func (*StageDiagnosis) ProtoMessage

func (*StageDiagnosis) ProtoMessage()

func (*StageDiagnosis) ProtoReflect

func (x *StageDiagnosis) ProtoReflect() protoreflect.Message

func (*StageDiagnosis) Reset

func (x *StageDiagnosis) Reset()

func (*StageDiagnosis) String

func (x *StageDiagnosis) String() string

type StageObservation

type StageObservation struct {
	Stage string `protobuf:"bytes,1,opt,name=stage,proto3" json:"stage,omitempty"`
	// Work accounting (abstract units — nanoseconds or ticks).
	BusyWork     uint64 `protobuf:"varint,2,opt,name=busy_work,json=busyWork,proto3" json:"busy_work,omitempty"`
	CapacityWork uint64 `protobuf:"varint,3,opt,name=capacity_work,json=capacityWork,proto3" json:"capacity_work,omitempty"`
	// Optional work accounting — absent means not observed.
	IdleWork    *uint64 `protobuf:"varint,4,opt,name=idle_work,json=idleWork,proto3,oneof" json:"idle_work,omitempty"`
	BlockedWork *uint64 `protobuf:"varint,5,opt,name=blocked_work,json=blockedWork,proto3,oneof" json:"blocked_work,omitempty"`
	// Item counters over the window.
	Arrivals    int64  `protobuf:"varint,6,opt,name=arrivals,proto3" json:"arrivals,omitempty"`
	Completions *int64 `protobuf:"varint,7,opt,name=completions,proto3,oneof" json:"completions,omitempty"`
	Failures    *int64 `protobuf:"varint,8,opt,name=failures,proto3,oneof" json:"failures,omitempty"`
	// Point-in-time gauges (end of window).
	QueueDepth *int64 `protobuf:"varint,9,opt,name=queue_depth,json=queueDepth,proto3,oneof" json:"queue_depth,omitempty"`
	Workers    int32  `protobuf:"varint,10,opt,name=workers,proto3" json:"workers,omitempty"`
	// contains filtered or unexported fields
}

StageObservation is the canonical input to the analyzer. One per stage per analysis window.

Invariant: busy_work + idle_work + blocked_work <= capacity_work. Failures are a subset of completions, so failures <= completions. Adapters should aim for equality in the work invariant; the analyzer tolerates inequality.

Presence: idle_work, blocked_work, completions, failures, and queue_depth are optional — not all adapters can observe them. Non-optional scalars (busy_work, capacity_work, arrivals, workers) use proto3 default zero when absent. Converters validate semantics (e.g., stage name non-empty, counts non-negative).
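
A sketch of how the two invariants could be checked when some fields are absent. It assumes absent work fields contribute nothing to the sum and that the failures check applies only when both counters are present; the page does not say how the real converters handle these cases. `stageObs` and `checkInvariants` are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
)

// stageObs is a simplified stand-in for the proto message;
// nil pointers mean "not observed".
type stageObs struct {
	BusyWork     uint64
	CapacityWork uint64
	IdleWork     *uint64
	BlockedWork  *uint64
	Completions  *int64
	Failures     *int64
}

// checkInvariants verifies busy + idle + blocked <= capacity and
// failures <= completions. Subtracting from the remaining capacity
// avoids uint64 overflow that a naive sum could hit.
func checkInvariants(o stageObs) error {
	parts := []uint64{o.BusyWork}
	if o.IdleWork != nil {
		parts = append(parts, *o.IdleWork)
	}
	if o.BlockedWork != nil {
		parts = append(parts, *o.BlockedWork)
	}
	remaining := o.CapacityWork
	for _, p := range parts {
		if p > remaining {
			return errors.New("busy + idle + blocked exceeds capacity_work")
		}
		remaining -= p
	}
	if o.Completions != nil && o.Failures != nil && *o.Failures > *o.Completions {
		return errors.New("failures exceeds completions")
	}
	return nil
}

func main() {
	idle := uint64(3)
	fmt.Println(checkInvariants(stageObs{BusyWork: 5, CapacityWork: 8, IdleWork: &idle}))
	fmt.Println(checkInvariants(stageObs{BusyWork: 6, CapacityWork: 8, IdleWork: &idle}))
}
```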

func ObservationToProto

func ObservationToProto(o core.StageObservation) *StageObservation

ObservationToProto converts a core.StageObservation to its proto form. Optional fields are set only when the corresponding mask bit is present.
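
The mask-to-optional direction can be sketched like this. As before, `mask`, `hasIdle`, `hasBlocked`, `coreObs`, `wireObs`, and `toWire` are hypothetical stand-ins; the real bit names in core.ObservationMask are not listed on this page.

```go
package main

import "fmt"

// mask is a stand-in for core.ObservationMask.
type mask uint8

const (
	hasIdle mask = 1 << iota
	hasBlocked
)

// coreObs records presence in a bitmask.
type coreObs struct {
	Stage       string
	IdleWork    uint64
	BlockedWork uint64
	Mask        mask
}

// wireObs mimics proto3 optional fields as pointers.
type wireObs struct {
	Stage       string
	IdleWork    *uint64
	BlockedWork *uint64
}

// toWire has no error return: it only copies values, and it sets an
// optional pointer only when the corresponding mask bit is set.
func toWire(o coreObs) *wireObs {
	w := &wireObs{Stage: o.Stage}
	if o.Mask&hasIdle != 0 {
		v := o.IdleWork
		w.IdleWork = &v
	}
	if o.Mask&hasBlocked != 0 {
		v := o.BlockedWork
		w.BlockedWork = &v
	}
	return w
}

func main() {
	w := toWire(coreObs{Stage: "parse", IdleWork: 0, Mask: hasIdle})
	fmt.Println(w.IdleWork != nil, *w.IdleWork, w.BlockedWork == nil)
}
```

Here an observed idle value of 0 is still emitted on the wire, while the unobserved blocked value is left unset.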

func (*StageObservation) Descriptor deprecated

func (*StageObservation) Descriptor() ([]byte, []int)

Deprecated: Use StageObservation.ProtoReflect.Descriptor instead.

func (*StageObservation) GetArrivals

func (x *StageObservation) GetArrivals() int64

func (*StageObservation) GetBlockedWork

func (x *StageObservation) GetBlockedWork() uint64

func (*StageObservation) GetBusyWork

func (x *StageObservation) GetBusyWork() uint64

func (*StageObservation) GetCapacityWork

func (x *StageObservation) GetCapacityWork() uint64

func (*StageObservation) GetCompletions

func (x *StageObservation) GetCompletions() int64

func (*StageObservation) GetFailures

func (x *StageObservation) GetFailures() int64

func (*StageObservation) GetIdleWork

func (x *StageObservation) GetIdleWork() uint64

func (*StageObservation) GetQueueDepth

func (x *StageObservation) GetQueueDepth() int64

func (*StageObservation) GetStage

func (x *StageObservation) GetStage() string

func (*StageObservation) GetWorkers

func (x *StageObservation) GetWorkers() int32

func (*StageObservation) ProtoMessage

func (*StageObservation) ProtoMessage()

func (*StageObservation) ProtoReflect

func (x *StageObservation) ProtoReflect() protoreflect.Message

func (*StageObservation) Reset

func (x *StageObservation) Reset()

func (*StageObservation) String

func (x *StageObservation) String() string

type StageState

type StageState int32

StageState classifies a stage's operational state.

const (
	StageState_STAGE_STATE_UNSPECIFIED StageState = 0
	StageState_STAGE_STATE_HEALTHY     StageState = 1
	StageState_STAGE_STATE_STARVED     StageState = 2
	StageState_STAGE_STATE_BLOCKED     StageState = 3
	StageState_STAGE_STATE_SATURATED   StageState = 4
	StageState_STAGE_STATE_BROKEN      StageState = 5
)

func (StageState) Descriptor

func (StageState) Descriptor() protoreflect.EnumDescriptor

func (StageState) Enum

func (x StageState) Enum() *StageState

func (StageState) EnumDescriptor deprecated

func (StageState) EnumDescriptor() ([]byte, []int)

Deprecated: Use StageState.Descriptor instead.

func (StageState) Number

func (x StageState) Number() protoreflect.EnumNumber

func (StageState) String

func (x StageState) String() string

func (StageState) Type

func (StageState) Type() protoreflect.EnumType
