Documentation
¶
Index ¶
- Constants
- func ContextWithPathway(ctx context.Context, p Pathway) context.Context
- type Backlog
- type CommitOffset
- type Pathway
- type Processor
- func (p *Processor) Flush()
- func (p *Processor) SetCheckpoint(ctx context.Context, edgeTags ...string) context.Context
- func (p *Processor) SetCheckpointWithParams(ctx context.Context, params options.CheckpointParams, edgeTags ...string) context.Context
- func (p *Processor) Start()
- func (p *Processor) Stop()
- func (p *Processor) TrackKafkaCommitOffset(group string, topic string, partition int32, offset int64)
- func (p *Processor) TrackKafkaHighWatermarkOffset(_ string, topic string, partition int32, offset int64)
- func (p *Processor) TrackKafkaProduceOffset(topic string, partition int32, offset int64)
- func (p *Processor) TrackTransaction(transactionID, checkpointName string)
- type ProduceOffset
- type StatsBucket
- type StatsPayload
- type StatsPoint
- type TimestampType
Constants ¶
const (
// PropagationKeyBase64 is the key to use to propagate the pathway between services.
PropagationKeyBase64 = "dd-pathway-ctx-base64"
)
Variables ¶
This section is empty.
Functions ¶
Types ¶
type Backlog ¶
type Backlog struct {
// Tags that identify the backlog
Tags []string
// Value of the backlog
Value int64
}
Backlog represents the size of a queue that hasn't been yet read by the consumer.
type CommitOffset ¶
func (*CommitOffset) DecodeMsg ¶
func (z *CommitOffset) DecodeMsg(dc *msgp.Reader) (err error)
DecodeMsg implements msgp.Decodable
func (*CommitOffset) EncodeMsg ¶
func (z *CommitOffset) EncodeMsg(en *msgp.Writer) (err error)
EncodeMsg implements msgp.Encodable
func (*CommitOffset) Msgsize ¶
func (z *CommitOffset) Msgsize() (s int)
Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
type Pathway ¶
type Pathway struct {
// contains filtered or unexported fields
}
Pathway is used to monitor how payloads are sent across different services. An example Pathway would be: service A -- edge 1 --> service B -- edge 2 --> service C So it's a branch of services (we also call them "nodes") connected via edges. As the payload is sent around, we save the start time (start of service A), and the start time of the previous service. This allows us to measure the latency of each edge, as well as the latency from origin of any service.
func DecodeBase64 ¶
DecodeBase64 decodes a pathway context from a string using base64 encoding.
func Merge ¶
Merge merges multiple pathways into one. The current implementation samples one resulting Pathway. A future implementation could be more clever and actually merge the Pathways.
func PathwayFromContext ¶
PathwayFromContext returns the pathway contained in the given context, and whether a pathway is found in ctx.
func (Pathway) EncodeBase64 ¶
EncodeBase64 encodes a pathway context into a string using base64 encoding.
func (Pathway) PathwayStart ¶
PathwayStart returns the start timestamp of the pathway
type Processor ¶
type Processor struct {
// contains filtered or unexported fields
}
func NewProcessor ¶
func (*Processor) Flush ¶
func (p *Processor) Flush()
Flush triggers a flush and waits for it to complete.
func (*Processor) SetCheckpoint ¶
func (*Processor) SetCheckpointWithParams ¶
func (*Processor) TrackKafkaCommitOffset ¶
func (*Processor) TrackKafkaHighWatermarkOffset ¶
func (p *Processor) TrackKafkaHighWatermarkOffset(_ string, topic string, partition int32, offset int64)
TrackKafkaHighWatermarkOffset should be used in the consumer, to track the high watermark offsets of each partition. The first argument is the Kafka cluster ID, and will be used later.
func (*Processor) TrackKafkaProduceOffset ¶
func (*Processor) TrackTransaction ¶ added in v2.7.0
TrackTransaction records a manual transaction checkpoint observation. Use this to track when a specific transaction ID is seen at a named checkpoint in a data pipeline. transactionID identifies the transaction (e.g. a message ID or correlation ID). checkpointName is a stable label for the processing stage (e.g. "ingested", "processed").
type ProduceOffset ¶
func (*ProduceOffset) DecodeMsg ¶
func (z *ProduceOffset) DecodeMsg(dc *msgp.Reader) (err error)
DecodeMsg implements msgp.Decodable
func (ProduceOffset) EncodeMsg ¶
func (z ProduceOffset) EncodeMsg(en *msgp.Writer) (err error)
EncodeMsg implements msgp.Encodable
func (ProduceOffset) Msgsize ¶
func (z ProduceOffset) Msgsize() (s int)
Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
type StatsBucket ¶
type StatsBucket struct {
// Start specifies the beginning of this bucket in unix nanoseconds.
Start uint64
// Duration specifies the duration of this bucket in nanoseconds.
Duration uint64
// Stats contains a set of statistics computed for the duration of this bucket.
Stats []StatsPoint
// Backlogs store information used to compute queue backlog
Backlogs []Backlog
// Transactions is a packed binary blob of transaction records for this bucket.
// Each record is: [checkpointId uint8][timestamp int64 big-endian][idLen uint8][id bytes].
Transactions []byte
// TransactionCheckpointIds is a packed binary blob mapping checkpoint IDs to names.
// Each entry is: [id uint8][nameLen uint8][name bytes].
// This custom binary encoding (rather than a msgpack array of structs) matches the
// Java tracer's wire format; the backend expects this exact layout.
// The name uses Ids (not IDs) to match the msgpack wire key expected by the backend.
TransactionCheckpointIds []byte //nolint:revive
}
StatsBucket specifies a set of stats computed over a duration.
func (*StatsBucket) DecodeMsg ¶
func (z *StatsBucket) DecodeMsg(dc *msgp.Reader) (err error)
DecodeMsg implements msgp.Decodable
func (*StatsBucket) EncodeMsg ¶
func (z *StatsBucket) EncodeMsg(en *msgp.Writer) (err error)
EncodeMsg implements msgp.Encodable
func (*StatsBucket) Msgsize ¶
func (z *StatsBucket) Msgsize() (s int)
Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
type StatsPayload ¶
type StatsPayload struct {
// Env specifies the env. of the application, as defined by the user.
Env string
// Service is the service of the application
Service string
// Stats holds all stats buckets computed within this payload.
Stats []StatsBucket
// TracerVersion is the version of the tracer
TracerVersion string
// Lang is the language of the tracer
Lang string
// Version is the version of the service
Version string
// ProcessTags contains the process level tags.
ProcessTags []string
// ProductMask is a bitmask of active Datadog products. Bit 0 (1) = APM, Bit 1 (2) = DSM.
ProductMask uint64
}
StatsPayload stores client computed stats.
func (*StatsPayload) DecodeMsg ¶
func (z *StatsPayload) DecodeMsg(dc *msgp.Reader) (err error)
DecodeMsg implements msgp.Decodable
func (*StatsPayload) EncodeMsg ¶
func (z *StatsPayload) EncodeMsg(en *msgp.Writer) (err error)
EncodeMsg implements msgp.Encodable
func (*StatsPayload) Msgsize ¶
func (z *StatsPayload) Msgsize() (s int)
Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
type StatsPoint ¶
type StatsPoint struct {
// These fields indicate the properties under which the stats were aggregated.
EdgeTags []string
Hash uint64
ParentHash uint64
// These fields specify the stats for the above aggregation.
// those are distributions of latency in seconds.
PathwayLatency []byte
EdgeLatency []byte
PayloadSize []byte
TimestampType TimestampType
}
StatsPoint contains a set of statistics grouped under various aggregation keys.
func (*StatsPoint) DecodeMsg ¶
func (z *StatsPoint) DecodeMsg(dc *msgp.Reader) (err error)
DecodeMsg implements msgp.Decodable
func (*StatsPoint) EncodeMsg ¶
func (z *StatsPoint) EncodeMsg(en *msgp.Writer) (err error)
EncodeMsg implements msgp.Encodable
func (*StatsPoint) Msgsize ¶
func (z *StatsPoint) Msgsize() (s int)
Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
type TimestampType ¶
type TimestampType string
TimestampType can be either current or origin.
const ( // TimestampTypeCurrent is for when the recorded timestamp is based on the // timestamp of the current StatsPoint. TimestampTypeCurrent TimestampType = "current" // TimestampTypeOrigin is for when the recorded timestamp is based on the // time that the first StatsPoint in the pathway is sent out. TimestampTypeOrigin TimestampType = "origin" )
func (*TimestampType) DecodeMsg ¶
func (z *TimestampType) DecodeMsg(dc *msgp.Reader) (err error)
DecodeMsg implements msgp.Decodable
func (TimestampType) EncodeMsg ¶
func (z TimestampType) EncodeMsg(en *msgp.Writer) (err error)
EncodeMsg implements msgp.Encodable
func (TimestampType) Msgsize ¶
func (z TimestampType) Msgsize() (s int)
Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message