telemetry

package
v0.16.2
Published: Dec 5, 2025 License: MIT Imports: 28 Imported by: 3

README

/pkg/telemetry

Note: asyncmachine-go is a batteries-included graph control flow library (AOP, actor model, state-machine).

/pkg/telemetry provides several telemetry exporters and is accompanied by generated Grafana dashboards.

dbg

dbg is a simple telemetry exporter used by the am-dbg TUI debugger. It delivers DbgMsg and DbgMsgStruct messages via the standard net/rpc package, so the stream can also be consumed by a custom client.
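As a rough illustration of what a custom net/rpc consumer could look like, the sketch below registers a receiver and pushes one message through an in-memory pipe. The names TxMsg, Receiver and Push are hypothetical; the service and method names used by am-dbg, and the full DbgMsgTx payload, are not shown here and would need to be taken from the package source.

```go
package main

import (
	"fmt"
	"net"
	"net/rpc"
)

// TxMsg is a hypothetical, trimmed-down message. The real payloads are
// DbgMsgTx and DbgMsgStruct from this package.
type TxMsg struct {
	MachineID string
	Accepted  bool
}

// Receiver is a hypothetical RPC service that collects incoming messages.
type Receiver struct {
	Got chan TxMsg
}

// Push follows the method shape required by net/rpc: exported, two
// arguments (the second a pointer), and an error return.
func (r *Receiver) Push(msg TxMsg, ack *bool) error {
	r.Got <- msg
	*ack = true
	return nil
}

// roundTrip connects a client and a server over an in-memory pipe, sends
// one message, and returns what the receiver saw.
func roundTrip(msg TxMsg) (TxMsg, error) {
	recv := &Receiver{Got: make(chan TxMsg, 1)}
	srv := rpc.NewServer()
	if err := srv.Register(recv); err != nil {
		return TxMsg{}, err
	}

	clientConn, serverConn := net.Pipe()
	go srv.ServeConn(serverConn)

	client := rpc.NewClient(clientConn)
	defer client.Close()

	var ack bool
	if err := client.Call("Receiver.Push", msg, &ack); err != nil {
		return TxMsg{}, err
	}
	return <-recv.Got, nil
}

func main() {
	got, err := roundTrip(TxMsg{MachineID: "mach1", Accepted: true})
	if err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", got)
}
```

A real consumer would listen on a TCP address instead of a pipe and accept connections in a loop.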

OpenTelemetry Traces

The OpenTelemetry traces integration exposes a machine's states and transitions as Otel traces, compatible with Jaeger. Tracers are inherited from parent machines and form a tree, with each machine getting a position index as a prefix, e.g. 0:mach:MyMachId. Transitions are linked to states, and logged arguments are added as trace tags. Machine tags are added as well.
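The prefix scheme can be pictured with a small sketch. It assumes that indexes are handed out sequentially as machines register with the tracer, which matches the NextIndex and Index fields in the API but is not confirmed here. The indexer type and its methods are hypothetical.

```go
package main

import "fmt"

// indexer is a hypothetical stand-in for the tracer's bookkeeping: it hands
// out a position index to every machine it sees, in registration order.
type indexer struct {
	next  int
	names map[string]string
}

func newIndexer() *indexer {
	return &indexer{names: map[string]string{}}
}

// name returns the prefixed span name for a machine ID, assigning the next
// free index on first use and reusing it afterwards.
func (ix *indexer) name(machID string) string {
	if n, ok := ix.names[machID]; ok {
		return n
	}
	n := fmt.Sprintf("%d:mach:%s", ix.next, machID)
	ix.names[machID] = n
	ix.next++
	return n
}

func main() {
	ix := newIndexer()
	fmt.Println(ix.name("MyMachId")) // 0:mach:MyMachId
	fmt.Println(ix.name("Worker"))   // 1:mach:Worker
	fmt.Println(ix.name("MyMachId")) // 0:mach:MyMachId
}
```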

Tree Structure
- mach:ID
  - states
    - Foo
      - Foo (trace)
      - Foo (trace)
      - ...
    - ...
  - transitions
    - [add] Foo (trace)
    - ...
  - submachines
    - mach:ID2
      - ...
    - ...

Otel Tracing Setup

Automatic Otel Tracing

See /docs/env-configs.md for the required environment variables.

import amtele "github.com/pancsta/asyncmachine-go/pkg/telemetry"

// ...

var mach *am.Machine

// open telemetry traces
if err := amtele.MachBindOtelEnv(mach); err != nil {
    mach.AddErr(err, nil)
}

Manual Otel Tracing

import (
    am "github.com/pancsta/asyncmachine-go/pkg/machine"
    amtele "github.com/pancsta/asyncmachine-go/pkg/telemetry"
    "go.opentelemetry.io/otel/trace"
)

// ...

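var mach *am.Machine
var tracer trace.Tracer
var rootSpan trace.Span

// arguments follow the NewOtelMachTracer signature from the API reference
machTracer := amtele.NewOtelMachTracer(
    mach, rootSpan, tracer, &amtele.OtelMachTracerOpts{
        SkipTransitions: false,
    },
)
mach.BindTracer(machTracer)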

OpenTelemetry Logger

The OpenTelemetry logger integration exposes a machine's logs (at any level) in the structured OTLP format. It can be very handy for stdout logging.

import (
    "go.opentelemetry.io/otel/sdk/log"
    am "github.com/pancsta/asyncmachine-go/pkg/machine"
    amtele "github.com/pancsta/asyncmachine-go/pkg/telemetry"
)

// ...

var mach *am.Machine
var logExporter log.Exporter

// activate logs
mach.SetLogLevel(am.LogLevelOps)

// create a log provider
logProvider := amtele.NewOtelLoggerProvider(logExporter)
// bind provider to a machine
amtele.BindOtelLogger(mach, logProvider, "myserviceid")

Prometheus Metrics

pkg/telemetry/prometheus binds to a machine's transitions, collects values within a defined interval, and exposes averaged metrics. Use it with the provided Grafana dashboard. Tracers are inherited from parent machines.
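The idea of collecting values within an interval and exposing their average can be sketched as follows. This is a minimal illustration of the technique, not the package's implementation. The window type is hypothetical and not safe for concurrent use.

```go
package main

import "fmt"

// window accumulates samples between two flushes.
type window struct {
	sum   float64
	count int
}

// observe records one sample, e.g. the number of steps in a transition.
func (w *window) observe(v float64) {
	w.sum += v
	w.count++
}

// flush returns the average of the samples seen since the last flush and
// resets the window. An empty window averages to 0.
func (w *window) flush() float64 {
	if w.count == 0 {
		return 0
	}
	avg := w.sum / float64(w.count)
	w.sum, w.count = 0, 0
	return avg
}

func main() {
	var w window
	for _, steps := range []float64{2, 4, 6} {
		w.observe(steps)
	}
	fmt.Println(w.flush()) // 4
	fmt.Println(w.flush()) // 0
}
```

In a real exporter, flush would typically run on a ticker and write the result into a gauge.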

Automatic Prometheus Setup

See /docs/env-configs.md for the required environment variables.

import amprom "github.com/pancsta/asyncmachine-go/pkg/telemetry/prometheus"

// ...

var mach *am.Machine

// export metrics to prometheus
metrics := amprom.MachMetricsEnv(mach)

Manual Prometheus Setup

import (
    am "github.com/pancsta/asyncmachine-go/pkg/machine"
    amprom "github.com/pancsta/asyncmachine-go/pkg/telemetry/prometheus"
    "github.com/prometheus/client_golang/prometheus"
    "github.com/prometheus/client_golang/prometheus/push"
)

// ...

var mach *am.Machine
var promRegistry *prometheus.Registry
var promPusher *push.Pusher

// bind transition to metrics
metrics := amprom.BindMach(mach)

// bind metrics to either a registry or a pusher
amprom.BindToRegistry(promRegistry)
amprom.BindToPusher(promPusher)

Loki Logger

Loki is the easiest way to persist distributed logs from asyncmachine. You'll need a promtail client.

Automatic Loki Setup

See /docs/env-configs.md for the required environment variables.

import amtele "github.com/pancsta/asyncmachine-go/pkg/telemetry"

// ...

var mach *am.Machine

// export logs to Loki logger
if err := amtele.BindLokiEnv(mach); err != nil {
    mach.AddErr(err, nil)
}

Manual Loki Setup

import (
    "github.com/ic2hrmk/promtail"
    am "github.com/pancsta/asyncmachine-go/pkg/machine"
    amtele "github.com/pancsta/asyncmachine-go/pkg/telemetry"
)

// ...

var mach *am.Machine
var service string

// init promtail and bind AM logger
identifiers := map[string]string{
    "service_name": service,
}
promtailClient, err := promtail.NewJSONv1Client("localhost:3100", identifiers)
if err != nil {
    panic(err)
}
defer promtailClient.Close()
amtele.BindLokiLogger(mach, promtailClient)

Grafana Dashboard

More info about Grafana dashboards can be found in /tools/cmd/am-gen.

Automatic Grafana Setup

See /docs/env-configs.md for the required environment variables.

import amgen "github.com/pancsta/asyncmachine-go/tools/generator"

// ...

var mach *am.Machine

// create a dedicated dashboard for [mach] and submachines
amgen.MachDashboardEnv(mach)

Manual Grafana Setup

import (
    amgen "github.com/pancsta/asyncmachine-go/tools/generator"
    amgencli "github.com/pancsta/asyncmachine-go/tools/generator/cli"
)

// ...

var mach *am.Machine
var service string
var url string
var token string

p := amgencli.GrafanaParams{
    Ids:        mach.Id(),
    Name:       mach.Id(),
    Folder:     "asyncmachine",
    GrafanaUrl: url,
    Token:      token,
    Source:     service,
}
t := &amgen.SyncTracer{p: p}

mach.BindTracer(t)

Inheritance

Most of the telemetry exporters are automatically inherited from parent machines, so the results come in automatically. To define a submachine-parent relationship, use am.Opts.Parent while initializing a machine. Alternatively, tracers can be copied using OptsWithParentTracers, or manually via Machine.Tracers.

Status

Testing, not semantically versioned.

Documentation

Overview

Package telemetry provides telemetry exporters for asyncmachine: am-dbg, Prometheus, and OpenTelemetry.

Constants

const (
	// DbgAddr is the default address of the am-dbg server.
	DbgAddr = "localhost:6831"
	// EnvAmDbgAddr is the address of a running am-dbg instance.
	// "1" expands to "localhost:6831"
	EnvAmDbgAddr = "AM_DBG_ADDR"
)
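The comment on EnvAmDbgAddr says that "1" expands to the default address. One way such an expansion could work is sketched below. The resolveDbgAddr helper is hypothetical, and treating an empty value as "disabled" is an assumption, not documented behavior.

```go
package main

import (
	"fmt"
	"os"
)

// defaultDbgAddr repeats the value of the DbgAddr constant.
const defaultDbgAddr = "localhost:6831"

// resolveDbgAddr is a hypothetical helper. An empty value is treated as
// "disabled" (an assumption), "1" selects the default address, and any
// other value is used as the address itself.
func resolveDbgAddr(envValue string) (addr string, enabled bool) {
	switch envValue {
	case "":
		return "", false
	case "1":
		return defaultDbgAddr, true
	default:
		return envValue, true
	}
}

func main() {
	addr, enabled := resolveDbgAddr(os.Getenv("AM_DBG_ADDR"))
	if !enabled {
		fmt.Println("am-dbg telemetry disabled")
		return
	}
	fmt.Println("am-dbg address:", addr)
}
```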
const (
	EnvService         = "AM_SERVICE"
	EnvLokiAddr        = "AM_LOKI_ADDR"
	EnvOtelTrace       = "AM_OTEL_TRACE"
	EnvOtelTraceTxs    = "AM_OTEL_TRACE_TXS"
	EnvOtelTraceArgs   = "AM_OTEL_TRACE_ARGS"
	EnvOtelTraceNoauto = "AM_OTEL_TRACE_NOAUTO"
)

Variables

This section is empty.

Functions

func BindLokiEnv added in v0.11.0

func BindLokiEnv(mach am.Api) error

BindLokiEnv binds a Loki logger to [mach], based on environment variables:
- AM_SERVICE (required)
- AM_LOKI_ADDR (required)

This tracer is NOT inherited by submachines.

func BindLokiLogger added in v0.8.0

func BindLokiLogger(mach am.Api, client promtail.Client)

func BindOtelLogger added in v0.8.0

func BindOtelLogger(
	mach am.Api, provider *ologsdk.LoggerProvider, service string,
)

BindOtelLogger binds an OpenTelemetry logger to a machine.

func MachBindOtelEnv added in v0.10.3

func MachBindOtelEnv(mach am.Api) error

MachBindOtelEnv binds an OpenTelemetry tracer to [mach], based on environment variables:
- AM_SERVICE (required)
- AM_OTEL_TRACE (required)
- AM_OTEL_TRACE_TXS
- OTEL_EXPORTER_OTLP_ENDPOINT
- OTEL_EXPORTER_OTLP_TRACES_ENDPOINT

This tracer is inherited by submachines, and this function applies only to top-level machines.
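Because the function depends on required environment variables, it can help to check them up front and report a readable message. The sketch below uses only the standard library. The missingEnv helper is hypothetical, and how MachBindOtelEnv itself reacts to missing variables is not covered here.

```go
package main

import (
	"fmt"
	"os"
)

// missingEnv returns the names from required that are unset or empty.
func missingEnv(required ...string) []string {
	var missing []string
	for _, name := range required {
		if os.Getenv(name) == "" {
			missing = append(missing, name)
		}
	}
	return missing
}

func main() {
	missing := missingEnv("AM_SERVICE", "AM_OTEL_TRACE")
	if len(missing) > 0 {
		fmt.Println("tracing not configured, missing:", missing)
		return
	}
	fmt.Println("environment looks complete")
	// amtele.MachBindOtelEnv(mach) would be called at this point.
}
```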

func NewOtelLoggerProvider added in v0.8.0

func NewOtelLoggerProvider(exporter ologsdk.Exporter) *ologsdk.LoggerProvider

NewOtelLoggerProvider creates a new OpenTelemetry logger provider bound to the given exporter.

func NewOtelProvider added in v0.10.3

func NewOtelProvider(
	source string, ctx context.Context,
) (trace.Tracer, *sdktrace.TracerProvider, error)

func NormalizeId added in v0.8.0

func NormalizeId(id string) string

func TransitionsToDbg added in v0.7.0

func TransitionsToDbg(mach am.Api, addr string) error

TransitionsToDbg sends transitions to the am-dbg server.

Types

type DbgMsg added in v0.5.0

type DbgMsg interface {
	// Clock returns the state's clock, using the passed index
	Clock(statesIndex am.S, state string) uint64
	// Is returns true if the state is active, using the passed index
	Is(statesIndex am.S, states am.S) bool
}

DbgMsg is the interface for the messages to be sent to the am-dbg server.
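Both methods take a states index because messages carry per-state values by position rather than by name. The sketch below shows how such a lookup could work with local stand-ins for am.S and am.Time. It assumes that an odd clock value means the state is active, and the helper names are hypothetical.

```go
package main

import "fmt"

// S and Time are local stand-ins for am.S and am.Time.
type S []string
type Time []uint64

// clockOf returns the tick of state, or 0 when the state is not in the
// index or has no matching clock entry.
func clockOf(index S, clocks Time, state string) uint64 {
	for i, name := range index {
		if name == state && i < len(clocks) {
			return clocks[i]
		}
	}
	return 0
}

// isActive reports whether all the given states are active.
// Assumption: an odd tick means active, an even tick means inactive.
func isActive(index S, clocks Time, states S) bool {
	for _, s := range states {
		if clockOf(index, clocks, s)%2 == 0 {
			return false
		}
	}
	return true
}

func main() {
	index := S{"Start", "Ready", "Exception"}
	clocks := Time{1, 2, 0}

	fmt.Println(clockOf(index, clocks, "Ready"))     // 2
	fmt.Println(isActive(index, clocks, S{"Start"})) // true
	fmt.Println(isActive(index, clocks, S{"Ready"})) // false
}
```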

type DbgMsgStruct added in v0.5.0

type DbgMsgStruct struct {

	// Machine ID
	ID string
	// state names defining the indexes for diffs
	StatesIndex am.S
	// all the states with relations
	// TODO refac: Schema
	States am.Schema
	// list of group names and state indexes
	Groups map[string][]int
	// order of groups
	GroupsOrder []string
	// parent machine ID
	Parent string
	// machine tags
	Tags []string
}

DbgMsgStruct contains the state and relations data.

func (*DbgMsgStruct) Clock added in v0.5.0

func (d *DbgMsgStruct) Clock(_ am.S, _ string) uint64

func (*DbgMsgStruct) Is added in v0.5.0

func (d *DbgMsgStruct) Is(_ am.S, _ am.S) bool

type DbgMsgTx added in v0.5.0

type DbgMsgTx struct {
	MachineID string
	// Transition ID
	// TODO refac: Id
	ID string
	// Clocks represents the machine time [am.Time] after the current
	// transition.
	// TODO refac to TimeAfter, re-gen all the assets
	Clocks am.Time
	// QueueTick is the current queue tick in the machine.
	QueueTick uint64
	// TODO QueueDebug with all string entries for comparison
	// MutQueueToken is the token of a prepended mutation, can be scheduled or
	// executed, depending on IsQueued.
	MutQueueToken uint64
	// MutQueueTick is the assigned queue tick when the tx will be executed.
	// Only for IsQueued.
	MutQueueTick uint64
	// mutation type
	Type am.MutationType
	// called states
	// TODO remove. Deprecated: use CalledStateNames(index)
	CalledStates []string
	// TODO rename to CalledStates, re-gen all assets
	CalledStatesIdxs []int
	// all the transition steps
	Steps []*am.Step
	// log entries created during the transition
	LogEntries []*am.LogEntry
	// log entries before the transition, which happened after the prev one
	PreLogEntries []*am.LogEntry
	// queue length at the start of the transition
	// TODO rename to QueueLen
	// TODO change to int32
	Queue int
	// Time is human time. Don't send this over the wire.
	// TODO remove or skip in msgpack
	// TODO rename to HTime
	Time *time.Time
	// transition was triggered by an auto state
	IsAuto bool
	// result of the transition
	// TODO rename to IsAccepted
	Accepted bool
	// is this a check (Can*) tx or mutation?
	IsCheck bool
	// is this a queued mutation?
	IsQueued  bool
	Args      map[string]string
	QueueDump []string
}

DbgMsgTx contains transition data.

func (*DbgMsgTx) ActiveStates added in v0.8.0

func (m *DbgMsgTx) ActiveStates(statesIndex am.S) am.S

func (*DbgMsgTx) CalledStateNames added in v0.8.0

func (m *DbgMsgTx) CalledStateNames(statesIndex am.S) am.S

func (*DbgMsgTx) Clock added in v0.5.0

func (m *DbgMsgTx) Clock(statesIndex am.S, state string) uint64

func (*DbgMsgTx) Index added in v0.8.0

func (m *DbgMsgTx) Index(statesIndex am.S, state string) int

func (*DbgMsgTx) Is added in v0.5.0

func (m *DbgMsgTx) Is(statesIndex am.S, states am.S) bool

func (*DbgMsgTx) Is1 added in v0.5.0

func (m *DbgMsgTx) Is1(statesIndex am.S, state string) bool

func (*DbgMsgTx) MutString added in v0.15.0

func (m *DbgMsgTx) MutString(statesIndex am.S) string

TODO unify with Mut String

func (*DbgMsgTx) TimeSum added in v0.5.0

func (m *DbgMsgTx) TimeSum() uint64

TODO Sum() and TimeSum(idxs []int)

func (*DbgMsgTx) TxString added in v0.15.0

func (m *DbgMsgTx) TxString(statesIndex am.S) string

TODO unify with Tx String

type DbgTracer added in v0.7.0

type DbgTracer struct {
	*am.NoOpTracer

	Addr string
	Mach am.Api
	// contains filtered or unexported fields
}

func NewDbgTracer added in v0.8.0

func NewDbgTracer(mach am.Api, addr string) *DbgTracer

func (*DbgTracer) MachineDispose added in v0.8.0

func (t *DbgTracer) MachineDispose(id string)

func (*DbgTracer) MachineInit added in v0.8.0

func (t *DbgTracer) MachineInit(mach am.Api) context.Context

func (*DbgTracer) MutationQueued added in v0.15.0

func (t *DbgTracer) MutationQueued(mach am.Api, mut *am.Mutation)

func (*DbgTracer) SchemaChange added in v0.11.0

func (t *DbgTracer) SchemaChange(mach am.Api, _ am.Schema)

func (*DbgTracer) TransitionEnd added in v0.7.0

func (t *DbgTracer) TransitionEnd(tx *am.Transition)

type OtelMachTracer added in v0.5.0

type OtelMachTracer struct {
	*am.NoOpTracer

	Tracer        trace.Tracer
	Machines      map[string]*OtelMachineData
	MachinesMx    sync.Mutex
	MachinesOrder []string
	RootSpan      trace.Span

	// TODO bind to env var
	Logf func(format string, args ...any)

	NextIndex int
	// contains filtered or unexported fields
}

OtelMachTracer implements machine.Tracer for OpenTelemetry. Supports tracing of multiple state machines, resulting in a single trace. This tracer is automatically bound to new sub-machines.

func NewOtelMachTracer added in v0.5.0

func NewOtelMachTracer(
	rootMach am.Api, rootSpan trace.Span, otelTracer trace.Tracer,
	opts *OtelMachTracerOpts,
) *OtelMachTracer

NewOtelMachTracer creates a new machine tracer from an OpenTelemetry tracer. Requires OtelMachTracer.Dispose to be called at the end.

func (*OtelMachTracer) End added in v0.5.0

func (mt *OtelMachTracer) End()

func (*OtelMachTracer) HandlerEnd added in v0.5.0

func (mt *OtelMachTracer) HandlerEnd(
	tx *am.Transition, emitter string, handler string,
)

func (*OtelMachTracer) MachineDispose added in v0.5.0

func (mt *OtelMachTracer) MachineDispose(id string)

func (*OtelMachTracer) MachineInit added in v0.5.0

func (mt *OtelMachTracer) MachineInit(mach am.Api) context.Context

func (*OtelMachTracer) NewSubmachine added in v0.5.0

func (mt *OtelMachTracer) NewSubmachine(parent, mach am.Api)

NewSubmachine links 2 machines with a parent-child relationship.

func (*OtelMachTracer) QueueEnd added in v0.6.0

func (mt *OtelMachTracer) QueueEnd(mach am.Api)

func (*OtelMachTracer) TransitionEnd added in v0.5.0

func (mt *OtelMachTracer) TransitionEnd(tx *am.Transition)

func (*OtelMachTracer) TransitionInit added in v0.5.0

func (mt *OtelMachTracer) TransitionInit(tx *am.Transition)

type OtelMachTracerOpts added in v0.5.0

type OtelMachTracerOpts struct {
	// if true, only state changes will be traced
	SkipTransitions bool
	// if true, Healthcheck and Heartbeat won't be skipped
	IncludeHealth bool
	// if true, transition traces won't include [am.Machine.GetLogArgs]
	SkipLogArgs bool
	// if true, auto transitions won't be traced
	SkipAuto bool
	// TODO
	WhitelistStates am.S
	// TODO
	BlacklistStates am.S

	Logf func(format string, args ...any)
}

type OtelMachineData added in v0.5.0

type OtelMachineData struct {
	ID string
	// Index is a unique number for this machine within the Otel tracer.
	Index int
	Ended bool
	// contains filtered or unexported fields
}

type OtelTxHist added in v0.11.0

type OtelTxHist struct {
	Id  string
	Ctx context.Context
}

Directories

Path: prometheus
Synopsis: Package prometheus provides Prometheus metrics for asyncmachine.
