tracing

v1.3.0
Warning

This package is not in the latest version of its module.

Published: Mar 26, 2026 License: MIT Imports: 11 Imported by: 0

README

XMTPD APM Tracing

This package provides Datadog APM distributed tracing for xmtpd, enabling end-to-end visibility into message processing, cross-node replication, and database operations.

Configuration

Tracing is disabled by default and must be explicitly enabled with the XMTPD_TRACING_ENABLE setting. When tracing is enabled, the sampling rate defaults to 100% in dev/test and 10% in production/staging. Override it with the APM_SAMPLE_RATE environment variable (0.0–1.0).
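The defaulting rule above can be sketched as follows. This is a minimal, stdlib-only illustration, not the package's code: the helper name resolveSampleRate, the exact ENV values treated as production-like ("production", "staging"), and the choice to ignore unparsable or out-of-range overrides are all assumptions.

```go
package main

import (
	"fmt"
	"os"
	"strconv"
)

// resolveSampleRate is a hypothetical helper (not part of the tracing package).
// It picks the documented default for the environment name, then lets a valid
// APM_SAMPLE_RATE value in [0.0, 1.0] override it. Ignoring invalid overrides
// is an assumption; the real package may behave differently.
func resolveSampleRate(env, override string) float64 {
	rate := 1.0 // assumed default for dev/test and any unrecognized environment
	switch env {
	case "production", "staging": // assumed spellings of the prod-like envs
		rate = 0.1
	}
	if override != "" {
		if v, err := strconv.ParseFloat(override, 64); err == nil && v >= 0 && v <= 1 {
			rate = v
		}
	}
	return rate
}

func main() {
	env := os.Getenv("ENV")
	if env == "" {
		env = "test" // documented default for ENV
	}
	fmt.Println(resolveSampleRate(env, os.Getenv("APM_SAMPLE_RATE")))
}
```

Keeping the lookup outside the helper makes the rule easy to unit-test without touching the process environment.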

Enabling Tracing

Set the XMTPD_TRACING_ENABLE environment variable (or CLI flag --tracing.enable):

export XMTPD_TRACING_ENABLE=true

In Terraform, set the variable on the xmtpd server module:

tracing_enable = true

Environment Variables

Variable               Default     Description
XMTPD_TRACING_ENABLE   false       Set to true to enable tracing
ENV                    test        Environment name (used as the Datadog env tag)
APM_SAMPLE_RATE        env-based   Sample rate 0.0–1.0 (default: 1.0 in dev/test, 0.1 in prod/staging)
DD_AGENT_HOST          localhost   Datadog agent host (standard DD env var)
DD_TRACE_AGENT_PORT    8126        Datadog agent port (standard DD env var)

Zero Overhead When Disabled

When XMTPD_TRACING_ENABLE is not set (the default), all span creation functions return a shared no-op singleton. There is no allocation, no string formatting, and no network I/O. The DB tracer falls back to Prometheus-only logging.
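The shared no-op singleton idea can be sketched with the standard library alone. The Span interface, noopSpan, recordingSpan, and the enabled flag below are simplified, hypothetical stand-ins for ddtrace.Span and the package's internal gating, shown only to illustrate why the disabled path costs nothing.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// Span is a simplified stand-in for ddtrace.Span (hypothetical).
type Span interface {
	SetTag(key string, value any)
	Finish()
}

// noopSpan discards everything. It has no fields, so one value can be shared.
type noopSpan struct{}

func (noopSpan) SetTag(string, any) {}
func (noopSpan) Finish()            {}

// sharedNoop is the single no-op span handed out whenever tracing is off.
var sharedNoop Span = noopSpan{}

// recordingSpan is a toy "real" span used only when tracing is on.
type recordingSpan struct {
	name string
	tags map[string]any
}

func (s *recordingSpan) SetTag(k string, v any) { s.tags[k] = v }
func (s *recordingSpan) Finish()                {}

// enabled plays the role of the XMTPD_TRACING_ENABLE gate.
var enabled atomic.Bool

// startSpan returns the existing shared no-op span when tracing is disabled,
// so the disabled path creates no span object and never touches the tags.
func startSpan(name string) Span {
	if !enabled.Load() {
		return sharedNoop
	}
	return &recordingSpan{name: name, tags: map[string]any{}}
}

func main() {
	s := startSpan("xmtpd.node.query_envelopes")
	s.SetTag("zero_results", true) // discarded: tracing is off by default
	s.Finish()
	fmt.Printf("%T\n", s) // main.noopSpan
}
```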

Instrumented Paths

Write Path (Message Publish)
node.publish_payer_envelopes
  └── node.stage_transaction
      └── pgx.query [db.role=writer] INSERT INTO staged_originator_envelopes...
          │
          ▼ (async - trace_linked=true/false)
      publish_worker.process
          ├── publish_worker.calculate_fees
          ├── publish_worker.sign_envelope
          ├── publish_worker.insert_gateway
          │   └── pgx.query INSERT INTO gateway_envelopes...
          └── publish_worker.delete_staged
              └── pgx.query DELETE FROM staged_originator_envelopes...

Read Path (Message Query)
node.query_envelopes
  └── pgx.query [db.role=reader] SELECT * FROM gateway_envelopes...

Subscribe Path (Client Streaming)
subscribe_worker.dispatch
  ├── [batch_size, envelopes_parsed, parse_errors]
  └── subscribe_worker.listener_closed [reason=channel_full|context_done]

Cross-Node Replication
sync.connect_to_node [target_node, target_address]
  └── sync.setup_stream
      └── sync.subscribe_envelopes
          └── sync.receive_batch [source_node, num_envelopes]
              └── sync.validate_envelope [sequence_id, topic]
                  └── sync_worker.store_envelope
                      ├── sync_worker.verify_fees
                      └── sync_worker.insert_gateway

Key Tags for Debugging

Identifying Read-Replica Issues
trigger=timer_fallback     # Notification was missed, fell back to polling
notification_miss=true     # DBSubscription poll found nothing after notification
db.role=reader             # Query went to read replica
db.role=writer             # Query went to primary

Tracing Async Processing
trace_linked=true          # Async span successfully linked to parent
trace_linked=false         # Parent context not found (TTL expired or timer fallback)
staged_id=12345            # Envelope ID for correlation

Client Issues
reason=channel_full        # Client not consuming fast enough
reason=context_done        # Client disconnected
dropped_envelopes=5        # Number of envelopes client missed

Example Datadog Queries

Find all timer fallbacks (read-replica bug indicator)
service:xmtpd @trigger:timer_fallback
Find empty query results
service:xmtpd operation_name:xmtpd.node.query_envelopes @zero_results:true
Find slow database queries
service:xmtpd resource_name:pgx.query @duration:>100ms
Find queries that hit the read replica
service:xmtpd resource_name:pgx.query @db.role:reader
Find dropped client connections
service:xmtpd operation_name:xmtpd.subscribe_worker.listener_closed @reason:channel_full
Find cross-node sync errors
service:xmtpd operation_name:xmtpd.sync.* @error:true
Find out-of-order envelopes
service:xmtpd @out_of_order:true

Production Safety Limits

The tracing package includes built-in limits to prevent runaway resource usage:

Limit               Value   Purpose
MaxTagValueLength   1024    String tag values longer than this (in runes) are truncated
MaxStoreSize        10000   Maximum number of entries in TraceContextStore

Troubleshooting

Traces not appearing in Datadog
  1. Verify XMTPD_TRACING_ENABLE=true is set
  2. Verify the agent is running: curl http://localhost:8126/info
  3. Verify ENV is set correctly for your environment
  4. Check DD_AGENT_HOST and DD_TRACE_AGENT_PORT if the agent is not at localhost:8126
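When the agent is not on localhost, the curl check in step 2 has to target the configured host and port. The Go sketch below builds the same /info URL from DD_AGENT_HOST and DD_TRACE_AGENT_PORT, falling back to the documented defaults. The helper name agentInfoURL and the 2-second timeout are illustrative assumptions, not part of the package.

```go
package main

import (
	"fmt"
	"net"
	"net/http"
	"os"
	"time"
)

// agentInfoURL is a hypothetical helper that builds the trace agent's /info URL
// from the standard Datadog variables, using the documented defaults
// (localhost:8126). The lookup function is injected so it can be tested.
func agentInfoURL(getenv func(string) string) string {
	host := getenv("DD_AGENT_HOST")
	if host == "" {
		host = "localhost"
	}
	port := getenv("DD_TRACE_AGENT_PORT")
	if port == "" {
		port = "8126"
	}
	// JoinHostPort also brackets IPv6 literals correctly.
	return "http://" + net.JoinHostPort(host, port) + "/info"
}

func main() {
	url := agentInfoURL(os.Getenv)
	client := &http.Client{Timeout: 2 * time.Second}
	resp, err := client.Get(url)
	if err != nil {
		fmt.Println("agent unreachable:", err)
		return
	}
	defer resp.Body.Close()
	fmt.Println(url, "->", resp.Status)
}
```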

Missing parent spans (trace_linked=false)

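This indicates that async context propagation failed. Possible causes:

  • The TraceContextStore TTL (5 minutes) expired before the worker picked up the envelope
  • Timer fallback was used instead of a notification
  • The envelope was processed by a different worker instance
  • TraceContextStore was at capacity (MaxStoreSize) when the envelope was staged, so the context was dropped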

Architecture Notes

Async Context Propagation

The TraceContextStore bridges async boundaries between staging and worker processing:

  1. When PublishPayerEnvelopes stages an envelope, it stores the current span context keyed by the staged envelope ID
  2. When publish_worker processes the envelope, it retrieves (and removes) the context for that ID
  3. Child spans are created with ChildOf(parentContext) so they join the original trace
  4. Contexts expire after DefaultTraceContextTTL (5 minutes) to prevent memory leaks

Composite Database Tracer

Database tracing uses a composite pattern to preserve existing functionality:

  • tracelog.TraceLog - Prometheus metrics logging (existing, always active)
  • apmQueryTracer - Datadog APM spans (only wired when tracing is enabled)

Documentation

Overview

Package tracing provides span operation name constants for APM instrumentation. Using constants ensures consistency and makes refactoring easier.

Package tracing enables [Datadog APM tracing](https://docs.datadoghq.com/tracing/) capabilities, focusing specifically on [Error Tracking](https://docs.datadoghq.com/tracing/error_tracking/).


Constants

const (
	// DefaultTraceContextTTL is the default time-to-live for stored span contexts.
	// 5 minutes is generous - publish_worker typically processes within seconds.
	DefaultTraceContextTTL = 5 * time.Minute

	// MaxTagValueLength is the maximum length for string tag values.
	// Longer strings are truncated to prevent excessive trace payload sizes.
	// 1KB is generous for most use cases while preventing abuse.
	MaxTagValueLength = 1024

	// MaxStoreSize is the maximum number of entries in TraceContextStore.
	// Prevents unbounded memory growth if publish_worker falls behind.
	// 10K entries at ~100 bytes each = ~1MB max memory.
	MaxStoreSize = 10000
)

Span limits for production safety - prevent runaway memory/payload sizes.

const (
	// Node API spans - incoming request handling
	SpanNodePublishPayerEnvelopes = "xmtpd.node.publish_payer_envelopes"
	SpanNodeQueryEnvelopes        = "xmtpd.node.query_envelopes"
	SpanNodeStageTransaction      = "xmtpd.node.stage_transaction"
	SpanNodeWaitGatewayPublish    = "xmtpd.node.wait_gateway_publish"

	// Publish worker spans - async envelope processing
	SpanPublishWorkerProcess       = "xmtpd.publish_worker.process"
	SpanPublishWorkerCalculateFees = "xmtpd.publish_worker.calculate_fees"
	SpanPublishWorkerSignEnvelope  = "xmtpd.publish_worker.sign_envelope"
	SpanPublishWorkerInsertGateway = "xmtpd.publish_worker.insert_gateway"
	SpanPublishWorkerDeleteStaged  = "xmtpd.publish_worker.delete_staged"

	// Subscribe worker spans - client streaming
	SpanSubscribeWorkerDispatch       = "xmtpd.subscribe_worker.dispatch"
	SpanSubscribeWorkerBroadcast      = "xmtpd.subscribe_worker.broadcast"
	SpanSubscribeWorkerListenerClosed = "xmtpd.subscribe_worker.listener_closed"

	// DB subscription spans - polling mechanism
	SpanDBSubscriptionPoll = "xmtpd.db_subscription.poll"

	// Sync worker spans - cross-node replication (receiving side)
	SpanSyncConnectToNode    = "xmtpd.sync.connect_to_node"
	SpanSyncSetupStream      = "xmtpd.sync.setup_stream"
	SpanSyncSubscribe        = "xmtpd.sync.subscribe_envelopes"
	SpanSyncReceiveBatch     = "xmtpd.sync.receive_batch"
	SpanSyncValidateEnvelope = "xmtpd.sync.validate_envelope"

	// Envelope sink spans - storing synced envelopes
	SpanSyncWorkerStoreEnvelope         = "xmtpd.sync_worker.store_envelope"
	SpanSyncWorkerVerifyFees            = "xmtpd.sync_worker.verify_fees"
	SpanSyncWorkerInsertGateway         = "xmtpd.sync_worker.insert_gateway"
	SpanSyncWorkerStoreReservedEnvelope = "xmtpd.sync_worker.store_reserved_envelope"
	SpanSyncWorkerStorePayerReport      = "xmtpd.sync_worker.store_payer_report"
	SpanSyncWorkerStoreAttestation      = "xmtpd.sync_worker.store_attestation"

	// Database spans
	SpanDBQuery = "xmtpd.db.query"
)

Span operation names follow the pattern xmtpd.{component}.{operation}. This provides a clear hierarchy in Datadog APM service maps.
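Because the convention is purely textual, a unit test can enforce it. The sketch below checks names against a regular expression. The assumption that components and operations are lowercase snake_case holds for every constant listed above but is not stated by the package. invalidSpanNames is a hypothetical helper.

```go
package main

import (
	"fmt"
	"regexp"
)

// spanNamePattern encodes xmtpd.{component}.{operation}, assuming lowercase
// snake_case segments (true of every Span* constant in this package).
var spanNamePattern = regexp.MustCompile(`^xmtpd\.[a-z][a-z_]*\.[a-z][a-z_]*$`)

// invalidSpanNames returns the names that break the convention; a test could
// run it over all Span* constants to catch typos early.
func invalidSpanNames(names ...string) []string {
	var bad []string
	for _, n := range names {
		if !spanNamePattern.MatchString(n) {
			bad = append(bad, n)
		}
	}
	return bad
}

func main() {
	fmt.Println(invalidSpanNames(
		"xmtpd.node.publish_payer_envelopes",
		"xmtpd.db_subscription.poll",
		"xmtpd.db.query",
		"pgx.query",
	)) // [pgx.query]
}
```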

const (
	TagTrigger          = "trigger"
	TagStagedID         = "staged_id"
	TagOriginatorNode   = "originator_node"
	TagSourceNode       = "source_node"
	TagTargetNode       = "target_node"
	TagTopic            = "topic"
	TagSequenceID       = "sequence_id"
	TagNumEnvelopes     = "num_envelopes"
	TagNumResults       = "num_results"
	TagZeroResults      = "zero_results"
	TagNotificationMiss = "notification_miss"
	TagTraceLinked      = "trace_linked"
	TagOutOfOrder       = "out_of_order"

	// Batch processing tags
	TagBatchSize       = "batch_size"
	TagEnvelopesParsed = "envelopes_parsed"
	TagParseErrors     = "parse_errors"
	TagValidCount      = "valid_count"
	TagInvalidCount    = "invalid_count"

	// Subscribe worker tags
	TagReason             = "reason"
	TagDroppedEnvelopes   = "dropped_envelopes"
	TagWrongOriginator    = "wrong_originator"
	TagExpectedSequenceID = "expected_sequence_id"
	TagGapDetected        = "gap_detected"

	// Sync worker tags
	TagMigrationMode     = "migration_mode"
	TagConnectionSuccess = "connection_success"
	TagTargetAddress     = "target_address"

	// Database tags
	TagDBRole         = "db.role"
	TagDBStatement    = "db.statement"
	TagDBRowsAffected = "db.rows_affected"
	TagDBSystem       = "db.system"
	TagDBService      = "db.service"
)

Span tag keys. Use these constants for consistency.

const (
	TriggerNotification  = "notification"
	TriggerTimerFallback = "timer_fallback"
)

Trigger values for the trigger tag

const (
	DBRoleReader = "reader"
	DBRoleWriter = "writer"
)

DB role values

const EnvAPMSampleRate = "APM_SAMPLE_RATE"

EnvAPMSampleRate is the environment variable for configuring APM sample rate.

Variables

var (
	ChildOf         = tracer.ChildOf
	WithError       = tracer.WithError
	ContextWithSpan = tracer.ContextWithSpan
)

Re-export tracer types and options that don't need gating.

Functions

func GoPanicWrap

func GoPanicWrap(
	ctx context.Context,
	wg *sync.WaitGroup,
	name string,
	body func(context.Context),
	labels ...string,
)

GoPanicWrap extends PanicWrap by running the body in a goroutine and synchronizing the goroutine exit with the WaitGroup. The body must respect cancellation of the Context.

func IsEnabled added in v1.3.0

func IsEnabled() bool

IsEnabled returns whether APM tracing is currently enabled. Use this to conditionally skip expensive span creation.

func Link

func Link(span Span, l *zap.Logger) *zap.Logger

Link connects a logger to a particular trace and span, so that Datadog APM can correlate the logger's output with that trace. Returns the logger unchanged when tracing is disabled.

func PanicWrap

func PanicWrap(ctx context.Context, name string, body func(context.Context))

PanicWrap executes body and guards against panics. If a panic occurs, it emits a span with the error attached, which should trigger DD APM's Error Tracking to record the error.

func SetEnabledForTesting added in v1.3.0

func SetEnabledForTesting(enabled bool) func()

SetEnabledForTesting overrides the apmEnabled flag for use in tests. Returns a cleanup function that restores the previous state. This must only be called from test code.
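The override-and-restore contract of SetEnabledForTesting can be sketched as follows. Storing the flag in an atomic.Bool, along with the lowercase helper names, is an assumption made for illustration and is not taken from the package source.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// apmEnabled mirrors the flag named in the docs; using atomic.Bool is an
// assumption that makes reads safe from any goroutine.
var apmEnabled atomic.Bool

// isEnabled lets callers skip expensive span setup when tracing is off.
func isEnabled() bool { return apmEnabled.Load() }

// setEnabledForTesting flips the flag and returns a function that restores
// the previous value, intended to be deferred or passed to t.Cleanup.
func setEnabledForTesting(enabled bool) func() {
	prev := apmEnabled.Swap(enabled)
	return func() { apmEnabled.Store(prev) }
}

func main() {
	restore := setEnabledForTesting(true)
	fmt.Println(isEnabled()) // true
	restore()
	fmt.Println(isEnabled()) // false
}
```

Returning a restore closure, rather than having tests reset the flag by hand, keeps nested overrides correct: each restore puts back exactly the value it replaced.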

func SpanResource

func SpanResource(span Span, resource string)

func SpanTag

func SpanTag(span Span, key string, value any)

SpanTag sets a tag on a span with production safety limits. String values longer than MaxTagValueLength (in runes) are truncated. Uses rune-based truncation to safely handle multi-byte UTF-8 characters.

func SpanType

func SpanType(span Span, typ string)

func Start

func Start(version string, l *zap.Logger)

Start boots the Datadog tracer; call it once, early in the startup sequence. Tracing is gated by XMTPD_TRACING_ENABLE at the application config level, so callers should only invoke Start() when the feature flag is on.

Configuration via environment variables:

  • ENV: Environment name (default: "test")
  • DD_AGENT_HOST: Datadog agent host (standard DD env var, default: "localhost")
  • DD_TRACE_AGENT_PORT: Datadog agent port (standard DD env var, default: "8126")
  • APM_SAMPLE_RATE: Sample rate 0.0-1.0 (default: 1.0 dev/test, 0.1 prod/staging)

func Stop

func Stop()

Stop shuts down the Datadog tracer; defer it immediately after calling Start().

func Wrap

func Wrap(
	ctx context.Context,
	logger *zap.Logger,
	operation string,
	action func(context.Context, *zap.Logger, Span) error,
) error

Wrap executes action in the context of a span. Tags the span with the error if action returns one. When tracing is disabled, the action runs without span overhead.

Types

type Span

type Span = ddtrace.Span

Span is the tracer span type (ddtrace.Span interface).

func StartSpan

func StartSpan(operationName string, opts ...ddtrace.StartSpanOption) Span

StartSpan creates a new root span. Returns a no-op span when tracing is disabled.

func StartSpanFromContext

func StartSpanFromContext(
	ctx context.Context,
	operationName string,
	opts ...ddtrace.StartSpanOption,
) (Span, context.Context)

StartSpanFromContext creates a span as a child of the context's active span. Returns a no-op span and the unchanged context when tracing is disabled.

func StartSpanWithParent added in v1.3.0

func StartSpanWithParent(operationName string, parentCtx ddtrace.SpanContext) Span

StartSpanWithParent creates a new span, optionally linked to a parent context. If parentCtx is nil, creates a new root span. This is useful for async workflows where the parent context may or may not be available. Returns a no-op span when tracing is disabled.
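For the async case, the worker either joins the stored trace or starts a new root, and the README's trace_linked tag records which branch was taken. In the sketch below, SpanContext, ctxImpl, workerSpan, and startSpanWithParent are hypothetical stand-ins. Setting trace_linked inside the helper rather than in the caller is also an assumption.

```go
package main

import "fmt"

// SpanContext stands in for ddtrace.SpanContext; a nil interface means "no parent".
type SpanContext interface{ TraceID() uint64 }

type ctxImpl struct{ trace uint64 }

func (c ctxImpl) TraceID() uint64 { return c.trace }

type workerSpan struct {
	name    string
	traceID uint64
	tags    map[string]any
}

var traceSeq uint64 = 1000 // source of fresh trace IDs for root spans

// startSpanWithParent joins the parent's trace when one is available and
// otherwise starts a new root trace, tagging trace_linked accordingly.
// Pass an untyped nil: a typed nil pointer would compare != nil.
func startSpanWithParent(name string, parent SpanContext) *workerSpan {
	s := &workerSpan{name: name, tags: map[string]any{}}
	if parent != nil {
		s.traceID = parent.TraceID()
		s.tags["trace_linked"] = true
	} else {
		traceSeq++
		s.traceID = traceSeq
		s.tags["trace_linked"] = false
	}
	return s
}

func main() {
	linked := startSpanWithParent("xmtpd.publish_worker.process", ctxImpl{trace: 42})
	orphan := startSpanWithParent("xmtpd.publish_worker.process", nil)
	fmt.Println(linked.traceID, linked.tags["trace_linked"], orphan.tags["trace_linked"]) // 42 true false
}
```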

type TraceContextStore added in v1.3.0

type TraceContextStore struct {
	// contains filtered or unexported fields
}

TraceContextStore provides async context propagation by mapping staged envelope IDs to their originating span contexts. This allows the publish_worker to create child spans linked to the original staging request, enabling end-to-end distributed tracing across async boundaries.

Includes TTL-based cleanup to prevent memory leaks from orphaned entries.

func NewTraceContextStore added in v1.3.0

func NewTraceContextStore() *TraceContextStore

NewTraceContextStore creates a new store for async trace context propagation.

func (*TraceContextStore) Retrieve added in v1.3.0

func (s *TraceContextStore) Retrieve(stagedID int64) ddtrace.SpanContext

Retrieve gets and removes the span context for a staged envelope ID. Returns nil if no context was stored for this ID or if it expired.

func (*TraceContextStore) Size added in v1.3.0

func (s *TraceContextStore) Size() int

Size returns the current number of stored contexts (for monitoring).

func (*TraceContextStore) Store added in v1.3.0

func (s *TraceContextStore) Store(stagedID int64, span Span)

Store saves the span context for a staged envelope ID. Call this after staging an envelope to enable trace linking. Performs lazy cleanup of expired entries to prevent memory leaks. Drops new entries if store is at capacity (production safety). No-ops when tracing is disabled.
