autoscale

package
v0.1.0
Warning

This package is not in the latest version of its module.
Published: May 12, 2026 License: Apache-2.0 Imports: 8 Imported by: 0

Documentation

Overview

Package autoscale emits scaling signals from a Murmur worker so an upstream autoscaler (ECS Fargate target tracking, Kubernetes HPA via the CloudWatch adapter, etc.) can adjust replica count to load.

Why this exists: ECS Fargate doesn't natively autoscale on Kafka consumer lag or Kinesis shard count — only on CPU and memory. Per `doc/architecture.md` open question #2, the canonical fix is to publish a custom CloudWatch metric and configure target tracking on it. This package is the in-process emitter side of that pattern.

Scope

  • Signal — a periodically-sampled measurement (consumer lag, shard-iterator age, events-per-second, queue depth). User-supplied because the right signal depends on the source.
  • Emitter — publishes the measurement somewhere. CloudWatch is the reference implementation in pkg/observability/autoscale/cwemitter; Prometheus / Datadog / a Noop are the user's to wire.
  • Run — periodic loop that ties them together. Worker spawns it once at startup; runs until ctx cancellation.

What this isn't

Not a metrics framework. The streaming runtime's `metrics.Recorder` already records per-pipeline events / errors / latencies — wire that to your observability stack via its own Recorder implementation. This package is specifically for the SCALING-SIGNAL surface, where the autoscaler reads ONE clearly-labeled metric at a target value to decide replica count. Conflating the two leads to autoscaler thrash.

Constants

This section is empty.

Variables

This section is empty.

Functions

func Run

func Run(
	ctx context.Context,
	name string,
	period time.Duration,
	signal Signal,
	emitter Emitter,
	opts ...RunOption,
) error

Run samples `signal` every `period` and publishes each measurement via `emitter` under the metric name `name`. It blocks until ctx is canceled, and returns a non-nil error only if the input is invalid (nil signal, nil emitter, or non-positive period).

Sample failures and emit failures both fire callbacks (see WithOnSignalError / WithOnEmitError) but do not stop the loop — autoscaling signals are inherently best-effort, and a stuck emitter shouldn't take the worker down.

Types

type CWConfig

type CWConfig struct {
	// Client is an SDK v2 CloudWatch client. The Emitter does not own
	// the client's lifecycle.
	Client *cloudwatch.Client

	// Namespace is the CloudWatch namespace for the published metric.
	// Convention: "Murmur" or "Murmur/<deployment>".
	Namespace string

	// Unit, when set, attaches a CloudWatch StandardUnit to each emit.
	// Common: cwtypes.StandardUnitCount (lag, queue depth),
	// cwtypes.StandardUnitMilliseconds (iterator age),
	// cwtypes.StandardUnitCountSecond (events/sec).
	// Defaults to None.
	Unit cwtypes.StandardUnit

	// ExtraDimensions are added to every emit. Use for static labels
	// (environment, region, cluster). Per-call dimensions from the
	// Signal are merged on top.
	ExtraDimensions map[string]string
}

CWConfig configures a CloudWatchEmitter.
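
The ExtraDimensions comment says per-call dimensions are "merged on top". One plausible reading, sketched below, is that the static map is copied first and per-call values win on key collisions. `mergeDimensions` is a hypothetical helper written for this illustration, not a function exported by the package.

```go
package main

import (
	"fmt"
	"sort"
)

// mergeDimensions copies the static dimensions, then overlays the
// per-call ones, so per-call values win when a key appears in both.
// Assumption: this is what "merged on top" means for ExtraDimensions.
func mergeDimensions(static, perCall map[string]string) map[string]string {
	out := make(map[string]string, len(static)+len(perCall))
	for k, v := range static {
		out[k] = v
	}
	for k, v := range perCall {
		out[k] = v
	}
	return out
}

func main() {
	static := map[string]string{"Environment": "prod", "Pipeline": "default"}
	perCall := map[string]string{"Pipeline": "page_views"}

	merged := mergeDimensions(static, perCall)

	// Sort keys so the output order is deterministic.
	keys := make([]string, 0, len(merged))
	for k := range merged {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	for _, k := range keys {
		fmt.Printf("%s=%s\n", k, merged[k])
	}
	// Prints:
	// Environment=prod
	// Pipeline=page_views
}
```

Because the autoscaling policy matches on an exact dimension set, it is worth keeping the merged set small and stable; a dimension that changes between emits creates a different CloudWatch metric.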

type CloudWatchEmitter

type CloudWatchEmitter struct {
	// contains filtered or unexported fields
}

CloudWatchEmitter publishes scaling-signal measurements to CloudWatch custom metrics. It is the reference Emitter implementation; production AWS users will typically wire it in directly.

Each Emit issues one PutMetricData call. For high-frequency emits (sub-second), batch via your own buffer in front. Default usage is once-per-second-or-slower, well under the API's 150 req/s default quota.

Configure ECS Fargate target tracking against the namespace+metric you publish here:

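resource "aws_appautoscaling_policy" "lag" {
  name        = "murmur-consumer-lag"
  policy_type = "TargetTrackingScaling"

  # aws_appautoscaling_target.worker is a placeholder name for the
  # scalable target registered for your ECS service.
  resource_id        = aws_appautoscaling_target.worker.resource_id
  scalable_dimension = aws_appautoscaling_target.worker.scalable_dimension
  service_namespace  = aws_appautoscaling_target.worker.service_namespace

  target_tracking_scaling_policy_configuration {
    target_value = 1000 # target 1000 events of lag per replica
    customized_metric_specification {
      metric_name = "ConsumerLag"
      namespace   = "Murmur"
      statistic   = "Average"
      dimensions {
        name  = "Pipeline"
        value = "page_views"
      }
    }
  }
}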

func NewCloudWatchEmitter

func NewCloudWatchEmitter(cfg CWConfig) (*CloudWatchEmitter, error)

NewCloudWatchEmitter constructs the emitter. Validates required fields.

func (*CloudWatchEmitter) Close

func (e *CloudWatchEmitter) Close() error

Close is a no-op; the underlying SDK client is owned by the caller.

func (*CloudWatchEmitter) Emit

func (e *CloudWatchEmitter) Emit(ctx context.Context, name string, value float64, dims map[string]string) error

Emit publishes a single point to CloudWatch.

type Emitter

type Emitter interface {
	// Emit publishes a single (name, value, dimensions) point. It
	// returns an error only on backend failure; a successful publish
	// is silent (the caller records metrics about emitter success
	// from the outside).
	Emit(ctx context.Context, name string, value float64, dimensions map[string]string) error

	// Close releases any underlying resources (HTTP clients, batchers).
	Close() error
}

Emitter publishes a metric value. The Emitter contract is intentionally minimal so adding a Datadog / Prometheus / generic-HTTP backend is a few lines.

type RunOption

type RunOption func(*runConfig)

RunOption configures Run.

func WithInitialDelay

func WithInitialDelay(d time.Duration) RunOption

WithInitialDelay sets a stagger before the first emit. Useful when many workers come up simultaneously — without a delay, the first emit storm hits the metrics backend at the same moment.

func WithOnEmitError

func WithOnEmitError(fn func(name string, err error)) RunOption

WithOnEmitError installs a callback for metric-publish failures. The loop continues regardless; this is the seam for visibility.

func WithOnSignalError

func WithOnSignalError(fn func(name string, err error)) RunOption

WithOnSignalError installs a callback for Signal sample failures (typically a transient connectivity issue with the source the signal reads from). Same continue-the-loop semantics.

type Signal

type Signal func(ctx context.Context) (value float64, dimensions map[string]string, err error)

Signal is the periodically-sampled measurement an autoscaler reads. Examples:

  • Kafka consumer-group lag (sum of per-partition lag for the group).
  • Kinesis IteratorAgeMilliseconds (max across shards).
  • SQS visible-message count (size of the queue waiting to be processed).
  • Events-per-second from the metrics recorder.

The Signal returns the current value plus optional dimensions (CloudWatch labels) that should accompany the metric. Returning an error skips the emit but doesn't stop the Run loop — transient signal-source errors are normal and shouldn't take the worker down.

func EventsPerSecond

func EventsPerSecond(getter func(ctx context.Context) (uint64, error)) Signal

EventsPerSecond returns a Signal that derives the events-per-second rate for a named pipeline from the metrics.Recorder's events counter. Useful when the natural autoscaling signal isn't source-specific lag but worker throughput — when EPS rises above target, scale out.

`getter` returns the cumulative events-processed count. Typically `func(ctx context.Context) (uint64, error) { return rec.SnapshotOne(name).EventsProcessed, nil }`.

The signal computes the delta between consecutive samples and divides by the elapsed wall time, so the first sample always returns 0 (there is no previous sample to diff against). Each later sample reports the average rate over the interval since the previous sample.
