streaming

package
v0.5.1
Published: Jun 8, 2026 License: Apache-2.0 Imports: 12 Imported by: 0

Documentation

Overview

Package streaming provides a reusable, generic Canton ledger streaming client.

It wraps UpdateService.GetUpdates with automatic reconnection, exponential backoff, and auth-token invalidation on 401/403.

Types

type Batch

type Batch[T any] struct {
	Offset   int64
	UpdateID string
	Items    []T
}

Batch carries decoded items from one LedgerTransaction, preserving the transaction boundary for atomic offset writes.
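
The transaction boundary matters because a consumer will usually want to commit a batch's items and its offset in one step. The following is a minimal sketch of that idea, not the package's own code: Batch is redeclared locally so the example compiles on its own, and memStore with its commit method is a hypothetical in-memory stand-in for a database. Rejecting a non-increasing offset is an assumption of the sketch.

```go
package main

import (
	"errors"
	"fmt"
)

// Batch mirrors the documented streaming.Batch shape so this sketch is self-contained.
type Batch[T any] struct {
	Offset   int64
	UpdateID string
	Items    []T
}

// memStore is a hypothetical stand-in for a database table plus an offset row.
type memStore[T any] struct {
	items  []T
	offset int64
}

// commit validates the batch first, then records its items and offset together,
// so the stored offset never runs ahead of or behind the stored items.
func (s *memStore[T]) commit(b *Batch[T]) error {
	if b.Offset <= s.offset {
		return errors.New("stale batch: offset already committed")
	}
	s.items = append(s.items, b.Items...)
	s.offset = b.Offset
	return nil
}

func main() {
	store := &memStore[string]{}
	_ = store.commit(&Batch[string]{Offset: 10, UpdateID: "u1", Items: []string{"a", "b"}})
	// A batch with no items still advances the offset.
	_ = store.commit(&Batch[string]{Offset: 11, UpdateID: "u2"})
	// Replaying the same batch is rejected and leaves the store unchanged.
	err := store.commit(&Batch[string]{Offset: 11, UpdateID: "u2"})
	fmt.Println(len(store.items), store.offset, err != nil) // 2 11 true
}
```

In a real consumer the two writes in commit would sit inside a single database transaction.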

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client wraps UpdateService.GetUpdates with automatic reconnection and auth handling. It mirrors the streaming pattern established in pkg/cantonsdk/bridge/client.go.

func New

func New(l ledger.Ledger, opts ...Option) (*Client, error)

New creates a new streaming Client for the given ledger.

By default (no WithParty option) the client uses FiltersForAnyParty, subscribing to all contracts on the participant regardless of stakeholder membership. This requires the Canton auth token to carry CanReadAsAnyParty rights.

Pass WithParty to scope the stream to a specific party's stakeholder view. WithParty must not be called with an empty string.
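
The default and the WithParty override follow the functional-options pattern. The sketch below shows how such options could resolve to a filter mode. The settings fields, resolve, and filterMode are hypothetical names introduced for illustration, and WithParty is redeclared locally; the source does not say how the real New reports an empty party, so returning an error here is an assumption.

```go
package main

import (
	"errors"
	"fmt"
)

// settings is hypothetical; the real struct is unexported and its fields are not documented.
type settings struct {
	party    string
	partySet bool
}

// Option has the documented shape: a function that mutates settings.
type Option func(*settings)

// WithParty is a local stand-in for the documented option.
func WithParty(party string) Option {
	return func(s *settings) {
		s.party = party
		s.partySet = true
	}
}

// resolve applies the options in order and rejects an explicitly empty party.
func resolve(opts ...Option) (settings, error) {
	var s settings
	for _, opt := range opts {
		opt(&s)
	}
	if s.partySet && s.party == "" {
		return settings{}, errors.New("WithParty: party must not be empty")
	}
	return s, nil
}

// filterMode names the filter the documentation associates with each configuration.
func filterMode(s settings) string {
	if s.partySet {
		return "FiltersByParty"
	}
	return "FiltersForAnyParty"
}

func main() {
	for _, opts := range [][]Option{nil, {WithParty("alice")}, {WithParty("")}} {
		s, err := resolve(opts...)
		if err != nil {
			fmt.Println("error:", err)
			continue
		}
		fmt.Println(filterMode(s))
	}
}
```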

func (*Client) Subscribe

func (c *Client) Subscribe(
	ctx context.Context,
	req SubscribeRequest,
	lastOffset *int64,
) <-chan *LedgerTransaction

Subscribe opens a live stream against the Canton ledger and returns a read-only channel of decoded transactions. It reconnects automatically with exponential backoff (5s → 60s) on transient errors, and invalidates the auth token on 401/403.

lastOffset is updated atomically after each received transaction so that reconnects resume from the last safely received point. The caller is responsible for persisting lastOffset to the database (the processor does this atomically with event writes).

The returned channel is closed when ctx is canceled or the stream ends with a terminal error such as io.EOF.

type FieldValue

type FieldValue struct {
	// contains filtered or unexported fields
}

FieldValue is an opaque DAML value used to construct LedgerEvents. Use the Make* functions below to create values of each DAML type. This keeps callers free of any direct lapiv2 dependency.

func MakeNoneField

func MakeNoneField() FieldValue

MakeNoneField returns a DAML Optional None value.

func MakeNumericField

func MakeNumericField(s string) FieldValue

MakeNumericField wraps a decimal string as a DAML Numeric value.

func MakePartyField

func MakePartyField(s string) FieldValue

MakePartyField wraps a party ID string as a DAML Party value.

func MakeRecordField

func MakeRecordField(fields map[string]FieldValue) FieldValue

MakeRecordField builds a DAML Record value from a map of sub-fields.

func MakeSomePartyField

func MakeSomePartyField(party string) FieldValue

MakeSomePartyField returns a DAML Optional(Party) Some value.

func MakeSomeRecordField

func MakeSomeRecordField(fields map[string]FieldValue) FieldValue

MakeSomeRecordField wraps a record in a DAML Optional(Record) Some value.

func MakeTextField

func MakeTextField(s string) FieldValue

MakeTextField wraps a Go string as a DAML Text value.

func MakeTextMapField

func MakeTextMapField(entries map[string]string) FieldValue

MakeTextMapField builds a DAML TextMap value from a Go string map.

func MakeTimestampField

func MakeTimestampField(t time.Time) FieldValue

MakeTimestampField wraps a time.Time as a DAML Timestamp value.

type LedgerEvent

type LedgerEvent struct {
	ContractID   string
	PackageID    string
	ModuleName   string
	TemplateName string

	// IsCreated is true for contract create events, false for archive events.
	IsCreated bool
	// contains filtered or unexported fields
}

LedgerEvent is a single created or archived contract event within a transaction. All DAML field access goes through typed accessor methods — no lapiv2 types are exposed.

func NewLedgerEvent

func NewLedgerEvent(contractID, packageID, moduleName, templateName string, isCreated bool, fields map[string]FieldValue) *LedgerEvent

NewLedgerEvent constructs a LedgerEvent with pre-decoded fields. Used by tests that need to build events without going through the proto decode path. Accepts FieldValue values produced by the Make* constructor functions so that callers have no direct dependency on lapiv2.

func (*LedgerEvent) DoublyNestedPartyField

func (e *LedgerEvent) DoublyNestedPartyField(outer, middle, field string) string

DoublyNestedPartyField accesses a Party field two records deep.

Example: event.DoublyNestedPartyField("transfer", "instrumentId", "admin")

Returns "" when any of the path segments is absent or not a Record.

func (*LedgerEvent) DoublyNestedTextField

func (e *LedgerEvent) DoublyNestedTextField(outer, middle, field string) string

DoublyNestedTextField accesses a Text field two records deep.

Example: event.DoublyNestedTextField("transfer", "instrumentId", "id")

Returns "" when any of the path segments is absent or not a Record.

func (*LedgerEvent) IsNone

func (e *LedgerEvent) IsNone(name string) bool

IsNone returns true if the named DAML Optional field holds None.

func (*LedgerEvent) NestedNumericField

func (e *LedgerEvent) NestedNumericField(record, field string) string

NestedNumericField accesses a Numeric sub-field inside a named DAML Record field.

Example: event.NestedNumericField("transfer", "amount")

Returns "0" when the outer field is absent, the inner field is missing, or the inner field is not a Numeric.

func (*LedgerEvent) NestedPartyField

func (e *LedgerEvent) NestedPartyField(record, field string) string

NestedPartyField accesses a Party sub-field inside a named DAML Record field.

Example: event.NestedPartyField("instrumentId", "admin")

Returns "" when the outer field is absent or not a Record.

func (*LedgerEvent) NestedTextField

func (e *LedgerEvent) NestedTextField(record, field string) string

NestedTextField accesses a Text sub-field inside a named DAML Record field.

Example: event.NestedTextField("instrumentId", "id")

Returns "" when the outer field is absent or not a Record.

func (*LedgerEvent) NumericField

func (e *LedgerEvent) NumericField(name string) string

NumericField returns the named DAML Numeric field as a decimal string (e.g. "1.5"). Returns "0" when the field is absent or not of type Numeric.

func (*LedgerEvent) OptionalMetaLookup

func (e *LedgerEvent) OptionalMetaLookup(metaField, key string) string

OptionalMetaLookup looks up a string key within an Optional Metadata field. Metadata is encoded as Optional(Record{values: Map Text Text}). Returns "" when the Optional is None, the key is absent, or the field is absent.

func (*LedgerEvent) OptionalPartyField

func (e *LedgerEvent) OptionalPartyField(name string) string

OptionalPartyField returns the inner Party value of a DAML Optional Party field. Returns "" for None or when the field is absent.

func (*LedgerEvent) OptionalTextField

func (e *LedgerEvent) OptionalTextField(name string) string

OptionalTextField returns the inner Text value of a DAML Optional Text field. Returns "" for None or when the field is absent.

func (*LedgerEvent) PartyField

func (e *LedgerEvent) PartyField(name string) string

PartyField returns the named DAML Party field as a string. Returns "" when the field is absent or not of type Party.

func (*LedgerEvent) TextField

func (e *LedgerEvent) TextField(name string) string

TextField returns the named DAML Text field as a Go string. Returns "" when the field is absent or not of type Text.

func (*LedgerEvent) TimestampField

func (e *LedgerEvent) TimestampField(name string) time.Time

TimestampField returns the named DAML Timestamp field as a Go time.Time. Returns the zero time.Time when the field is absent or not of type Timestamp.

type LedgerTransaction

type LedgerTransaction struct {
	UpdateID      string
	Offset        int64
	EffectiveTime time.Time
	Events        []*LedgerEvent
}

LedgerTransaction is a decoded transaction received from the Canton GetUpdates stream.

type Option

type Option func(*settings)

Option configures a streaming Client.

func WithLogger

func WithLogger(l *zap.Logger) Option

WithLogger sets a custom logger on the streaming Client.

func WithParty

func WithParty(party string) Option

WithParty configures the client to use FiltersByParty, scoping the stream to contracts where the given party is a stakeholder. When omitted the client defaults to FiltersForAnyParty (requires CanReadAsAnyParty rights).

type Stream

type Stream[T any] struct {
	// contains filtered or unexported fields
}

Stream[T] wraps a Streamer and applies a per-event decode function. Use when subscribing to a single homogeneous template.

func NewStream

func NewStream[T any](streamer Streamer, decode func(*LedgerTransaction, *LedgerEvent) (T, bool)) *Stream[T]

NewStream creates a Stream[T] that decodes events using the provided function.

func (*Stream[T]) Subscribe

func (s *Stream[T]) Subscribe(ctx context.Context, req SubscribeRequest, lastOffset *int64) <-chan *Batch[T]

Subscribe passes lastOffset to streamer.Subscribe, runs each transaction's events through decode, and emits one *Batch[T] per transaction. Items may be empty: a batch is still emitted for transactions that yield no items so that the offset can advance.
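
The per-transaction step can be sketched as follows. This is an illustration rather than the package's code: the types are simplified local mirrors of the documented ones, toBatch and createdTransferID are hypothetical names, and the sketch assumes the bool returned by decode means "keep this item", which the source does not state.

```go
package main

import "fmt"

// Simplified local mirrors of the documented types, with only the fields this sketch needs.
type LedgerEvent struct {
	ContractID   string
	TemplateName string
	IsCreated    bool
}

type LedgerTransaction struct {
	UpdateID string
	Offset   int64
	Events   []*LedgerEvent
}

type Batch[T any] struct {
	Offset   int64
	UpdateID string
	Items    []T
}

// toBatch runs every event of tx through decode and keeps the items for which
// decode reports true. A batch is returned even when nothing matches.
func toBatch[T any](tx *LedgerTransaction, decode func(*LedgerTransaction, *LedgerEvent) (T, bool)) *Batch[T] {
	b := &Batch[T]{Offset: tx.Offset, UpdateID: tx.UpdateID}
	for _, ev := range tx.Events {
		if item, ok := decode(tx, ev); ok {
			b.Items = append(b.Items, item)
		}
	}
	return b
}

// createdTransferID is a hypothetical decode function that keeps the contract ID
// of created events whose template is named "Transfer".
func createdTransferID(_ *LedgerTransaction, ev *LedgerEvent) (string, bool) {
	if ev.IsCreated && ev.TemplateName == "Transfer" {
		return ev.ContractID, true
	}
	return "", false
}

func main() {
	tx := &LedgerTransaction{UpdateID: "u1", Offset: 7, Events: []*LedgerEvent{
		{ContractID: "c1", TemplateName: "Transfer", IsCreated: true},
		{ContractID: "c2", TemplateName: "Other", IsCreated: true},
	}}
	b := toBatch(tx, createdTransferID)
	fmt.Println(b.Offset, b.Items) // 7 [c1]

	empty := toBatch(&LedgerTransaction{UpdateID: "u2", Offset: 8}, createdTransferID)
	fmt.Println(empty.Offset, len(empty.Items)) // 8 0
}
```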

type Streamer

type Streamer interface {
	Subscribe(ctx context.Context, req SubscribeRequest, lastOffset *int64) <-chan *LedgerTransaction
}

Streamer is the interface for opening a live Canton ledger stream. *Client satisfies this interface.

type SubscribeRequest

type SubscribeRequest struct {
	// FromOffset is the exclusive start offset. Use 0 to start from the beginning.
	FromOffset int64

	// TemplateIDs lists the DAML templates to subscribe to.
	TemplateIDs []TemplateID
}

SubscribeRequest configures which templates to stream and from which ledger offset.

type TemplateID

type TemplateID struct {
	PackageID  string
	ModuleName string
	EntityName string
}

TemplateID identifies a DAML template by its package, module, and entity name. It is the streaming package's own type — callers do not import lapiv2 directly.
