Documentation ¶
Overview ¶
Package streaming provides a reusable, generic Canton ledger streaming client.
It wraps UpdateService.GetUpdates with automatic reconnection, exponential backoff, and auth-token invalidation on 401/403 responses.
Index ¶
- type Batch
- type Client
- type FieldValue
- func MakeNoneField() FieldValue
- func MakeNumericField(s string) FieldValue
- func MakePartyField(s string) FieldValue
- func MakeRecordField(fields map[string]FieldValue) FieldValue
- func MakeSomePartyField(party string) FieldValue
- func MakeSomeRecordField(fields map[string]FieldValue) FieldValue
- func MakeTextField(s string) FieldValue
- func MakeTextMapField(entries map[string]string) FieldValue
- func MakeTimestampField(t time.Time) FieldValue
- type LedgerEvent
- func (e *LedgerEvent) DoublyNestedPartyField(outer, middle, field string) string
- func (e *LedgerEvent) DoublyNestedTextField(outer, middle, field string) string
- func (e *LedgerEvent) IsNone(name string) bool
- func (e *LedgerEvent) NestedNumericField(record, field string) string
- func (e *LedgerEvent) NestedPartyField(record, field string) string
- func (e *LedgerEvent) NestedTextField(record, field string) string
- func (e *LedgerEvent) NumericField(name string) string
- func (e *LedgerEvent) OptionalMetaLookup(metaField, key string) string
- func (e *LedgerEvent) OptionalPartyField(name string) string
- func (e *LedgerEvent) OptionalTextField(name string) string
- func (e *LedgerEvent) PartyField(name string) string
- func (e *LedgerEvent) TextField(name string) string
- func (e *LedgerEvent) TimestampField(name string) time.Time
- type LedgerTransaction
- type Option
- type Stream
- type Streamer
- type SubscribeRequest
- type TemplateID
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Batch ¶
Batch carries decoded items from one LedgerTransaction, preserving the transaction boundary for atomic offset writes.
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client wraps UpdateService.GetUpdates with automatic reconnection and auth handling. It mirrors the streaming pattern established in pkg/cantonsdk/bridge/client.go.
func New ¶
New creates a new streaming Client for the given ledger.
By default (no WithParty option) the client uses FiltersForAnyParty, subscribing to all contracts on the participant regardless of stakeholder membership. This requires the Canton auth token to carry CanReadAsAnyParty rights.
Pass WithParty to scope the stream to a specific party's stakeholder view. WithParty must not be called with an empty string.
func (*Client) Subscribe ¶
func (c *Client) Subscribe(ctx context.Context, req SubscribeRequest, lastOffset *int64) <-chan *LedgerTransaction
Subscribe opens a live stream against the Canton ledger and returns a read-only channel of decoded transactions. It reconnects automatically with exponential backoff (5s → 60s) on transient errors, and invalidates the auth token on 401/403.
lastOffset is updated atomically after each received transaction so that reconnects resume from the last safely received point. The caller is responsible for persisting lastOffset to the database (the processor does this atomically with event writes).
The returned channel is closed when ctx is canceled or when the stream ends with a terminal error such as io.EOF.
type FieldValue ¶
type FieldValue struct {
// contains filtered or unexported fields
}
FieldValue is an opaque DAML value used to construct LedgerEvents. Use the Make* functions below to create values of each DAML type. This keeps callers free of any direct lapiv2 dependency.
func MakeNoneField ¶
func MakeNoneField() FieldValue
MakeNoneField returns a DAML Optional None value.
func MakeNumericField ¶
func MakeNumericField(s string) FieldValue
MakeNumericField wraps a decimal string as a DAML Numeric value.
func MakePartyField ¶
func MakePartyField(s string) FieldValue
MakePartyField wraps a party ID string as a DAML Party value.
func MakeRecordField ¶
func MakeRecordField(fields map[string]FieldValue) FieldValue
MakeRecordField builds a DAML Record value from a map of sub-fields.
func MakeSomePartyField ¶
func MakeSomePartyField(party string) FieldValue
MakeSomePartyField returns a DAML Optional(Party) Some value.
func MakeSomeRecordField ¶
func MakeSomeRecordField(fields map[string]FieldValue) FieldValue
MakeSomeRecordField wraps a record in a DAML Optional(Record) Some value.
func MakeTextField ¶
func MakeTextField(s string) FieldValue
MakeTextField wraps a Go string as a DAML Text value.
func MakeTextMapField ¶
func MakeTextMapField(entries map[string]string) FieldValue
MakeTextMapField builds a DAML TextMap value from a Go string map.
func MakeTimestampField ¶
func MakeTimestampField(t time.Time) FieldValue
MakeTimestampField wraps a time.Time as a DAML Timestamp value.
type LedgerEvent ¶
type LedgerEvent struct {
ContractID string
PackageID string
ModuleName string
TemplateName string
// IsCreated is true for contract create events, false for archive events.
IsCreated bool
// contains filtered or unexported fields
}
LedgerEvent is a single created or archived contract event within a transaction. All DAML field access goes through typed accessor methods — no lapiv2 types are exposed.
func NewLedgerEvent ¶
func NewLedgerEvent(contractID, packageID, moduleName, templateName string, isCreated bool, fields map[string]FieldValue) *LedgerEvent
NewLedgerEvent constructs a LedgerEvent with pre-decoded fields. Used by tests that need to build events without going through the proto decode path. Accepts FieldValue values produced by the Make* constructor functions so that callers have no direct dependency on lapiv2.
func (*LedgerEvent) DoublyNestedPartyField ¶
func (e *LedgerEvent) DoublyNestedPartyField(outer, middle, field string) string
DoublyNestedPartyField accesses a Party field two records deep.
Example: event.DoublyNestedPartyField("transfer", "instrumentId", "admin")
Returns "" when any of the path segments is absent or not a Record.
func (*LedgerEvent) DoublyNestedTextField ¶
func (e *LedgerEvent) DoublyNestedTextField(outer, middle, field string) string
DoublyNestedTextField accesses a Text field two records deep.
Example: event.DoublyNestedTextField("transfer", "instrumentId", "id")
Returns "" when any of the path segments is absent or not a Record.
func (*LedgerEvent) IsNone ¶
func (e *LedgerEvent) IsNone(name string) bool
IsNone returns true if the named DAML Optional field holds None.
func (*LedgerEvent) NestedNumericField ¶
func (e *LedgerEvent) NestedNumericField(record, field string) string
NestedNumericField accesses a Numeric sub-field inside a named DAML Record field.
Example: event.NestedNumericField("transfer", "amount")
Returns "0" when the outer field is absent, the inner field is missing, or the inner field is not a Numeric.
func (*LedgerEvent) NestedPartyField ¶
func (e *LedgerEvent) NestedPartyField(record, field string) string
NestedPartyField accesses a Party sub-field inside a named DAML Record field.
Example: event.NestedPartyField("instrumentId", "admin")
Returns "" when the outer field is absent or not a Record.
func (*LedgerEvent) NestedTextField ¶
func (e *LedgerEvent) NestedTextField(record, field string) string
NestedTextField accesses a Text sub-field inside a named DAML Record field.
Example: event.NestedTextField("instrumentId", "id")
Returns "" when the outer field is absent or not a Record.
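The zero-value fallback shared by the nested accessors can be illustrated with a small sketch. The value type and the lookupText helper below are hypothetical stand-ins written for this example; they are not the package's FieldValue or its accessor implementation, and the "USD" sample is made up.

```go
package main

import "fmt"

// value is a hypothetical stand-in for a decoded DAML value. It is not the
// package's FieldValue, whose fields are unexported.
type value struct {
	text   string
	isText bool
	record map[string]value // non-nil only for Record values
}

// lookupText walks the record path and returns the Text leaf. It returns ""
// when any segment is absent, an intermediate segment is not a Record, or
// the leaf is not a Text value.
func lookupText(fields map[string]value, path ...string) string {
	if len(path) == 0 {
		return ""
	}
	current := fields
	for _, segment := range path[:len(path)-1] {
		next, ok := current[segment]
		if !ok || next.record == nil {
			return ""
		}
		current = next.record
	}
	leaf, ok := current[path[len(path)-1]]
	if !ok || !leaf.isText {
		return ""
	}
	return leaf.text
}

func main() {
	fields := map[string]value{
		"transfer": {record: map[string]value{
			"instrumentId": {record: map[string]value{
				"id": {text: "USD", isText: true},
			}},
		}},
	}
	fmt.Println(lookupText(fields, "transfer", "instrumentId", "id"))
	fmt.Println(lookupText(fields, "transfer", "missing", "id") == "")
}
```

Because a missing path and an empty Text value both yield "", callers that need to tell them apart cannot do so through these accessors alone.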
func (*LedgerEvent) NumericField ¶
func (e *LedgerEvent) NumericField(name string) string
NumericField returns the named DAML Numeric field as a decimal string (e.g. "1.5"). Returns "0" when the field is absent or not of type Numeric.
func (*LedgerEvent) OptionalMetaLookup ¶
func (e *LedgerEvent) OptionalMetaLookup(metaField, key string) string
OptionalMetaLookup looks up a string key within an Optional Metadata field. Metadata is encoded as Optional(Record{values: Map Text Text}). Returns "" when the Optional is None, the key is absent, or the field is absent.
func (*LedgerEvent) OptionalPartyField ¶
func (e *LedgerEvent) OptionalPartyField(name string) string
OptionalPartyField returns the inner Party value of a DAML Optional Party field. Returns "" for None or when the field is absent.
func (*LedgerEvent) OptionalTextField ¶
func (e *LedgerEvent) OptionalTextField(name string) string
OptionalTextField returns the inner Text value of a DAML Optional Text field. Returns "" for None or when the field is absent.
func (*LedgerEvent) PartyField ¶
func (e *LedgerEvent) PartyField(name string) string
PartyField returns the named DAML Party field as a string. Returns "" when the field is absent or not of type Party.
func (*LedgerEvent) TextField ¶
func (e *LedgerEvent) TextField(name string) string
TextField returns the named DAML Text field as a Go string. Returns "" when the field is absent or not of type Text.
func (*LedgerEvent) TimestampField ¶
func (e *LedgerEvent) TimestampField(name string) time.Time
TimestampField returns the named DAML Timestamp field as a Go time.Time. Returns the zero time when the field is absent or not of type Timestamp.
type LedgerTransaction ¶
type LedgerTransaction struct {
UpdateID string
Offset int64
EffectiveTime time.Time
Events []*LedgerEvent
}
LedgerTransaction is a decoded transaction received from the Canton GetUpdates stream.
type Option ¶
type Option func(*settings)
Option configures a streaming Client.
func WithLogger ¶
WithLogger sets a custom logger on the streaming Client.
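Option follows the functional-options pattern. The sketch below shows how such options are usually applied; the settings fields, withParty, and applyOptions are hypothetical stand-ins, since the real settings struct is unexported and its fields are not documented here.

```go
package main

import "fmt"

// settings is a hypothetical stand-in for the package's unexported struct.
type settings struct {
	party string // empty means "any party" in this sketch
}

// Option has the same shape as the documented type: func(*settings).
type Option func(*settings)

// withParty is a local illustration of an option that scopes the stream to
// one party. It is not the package's WithParty.
func withParty(party string) Option {
	return func(s *settings) { s.party = party }
}

// applyOptions starts from the zero-value defaults and applies each option
// in order, so later options override earlier ones.
func applyOptions(opts ...Option) settings {
	var s settings
	for _, opt := range opts {
		opt(&s)
	}
	return s
}

func main() {
	fmt.Printf("%+v\n", applyOptions())
	fmt.Printf("%+v\n", applyOptions(withParty("alice")))
}
```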
type Stream ¶
type Stream[T any] struct {
// contains filtered or unexported fields
}
Stream[T] wraps a Streamer and applies a per-event decode function. Use when subscribing to a single homogeneous template.
func NewStream ¶
func NewStream[T any](streamer Streamer, decode func(*LedgerTransaction, *LedgerEvent) (T, bool)) *Stream[T]
NewStream creates a Stream[T] that decodes events using the provided function.
func (*Stream[T]) Subscribe ¶
func (s *Stream[T]) Subscribe(ctx context.Context, req SubscribeRequest, lastOffset *int64) <-chan *Batch[T]
Subscribe passes lastOffset to streamer.Subscribe, runs each transaction's events through decode, and emits one *Batch[T] per transaction. Items may be empty: a batch is still emitted for a transaction with no matching events so that the offset keeps advancing.
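The per-transaction decode step can be sketched as below. The types are local stand-ins that copy only a few documented fields, and the Batch fields (Offset, Items) are assumptions because the real Batch definition is not shown; decodeTx and createdName are hypothetical names.

```go
package main

import "fmt"

// Local stand-ins mirroring a subset of the documented fields.
type LedgerEvent struct {
	TemplateName string
	IsCreated    bool
}

type LedgerTransaction struct {
	Offset int64
	Events []*LedgerEvent
}

// Batch is hypothetical: the documented Batch type's fields are not shown.
type Batch[T any] struct {
	Offset int64
	Items  []T
}

// decodeTx runs decode over every event and keeps the items it accepts.
// A batch is returned even when nothing matches, so the offset can advance.
func decodeTx[T any](tx *LedgerTransaction, decode func(*LedgerTransaction, *LedgerEvent) (T, bool)) *Batch[T] {
	batch := &Batch[T]{Offset: tx.Offset}
	for _, ev := range tx.Events {
		if item, ok := decode(tx, ev); ok {
			batch.Items = append(batch.Items, item)
		}
	}
	return batch
}

// createdName is a sample decode function that keeps only create events.
func createdName(_ *LedgerTransaction, ev *LedgerEvent) (string, bool) {
	return ev.TemplateName, ev.IsCreated
}

func main() {
	tx := &LedgerTransaction{Offset: 42, Events: []*LedgerEvent{
		{TemplateName: "Transfer", IsCreated: true},
		{TemplateName: "Transfer", IsCreated: false},
	}}
	batch := decodeTx(tx, createdName)
	fmt.Println(batch.Offset, len(batch.Items))
}
```

Returning false from decode is how an event is skipped without dropping the surrounding transaction boundary.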
type Streamer ¶
type Streamer interface {
Subscribe(ctx context.Context, req SubscribeRequest, lastOffset *int64) <-chan *LedgerTransaction
}
Streamer is the interface for opening a live Canton ledger stream. *Client satisfies this interface.
type SubscribeRequest ¶
type SubscribeRequest struct {
// FromOffset is the exclusive start offset. Use 0 to start from the beginning.
FromOffset int64
// TemplateIDs lists the DAML templates to subscribe to.
TemplateIDs []TemplateID
}
SubscribeRequest configures which templates to stream and from which ledger offset.
type TemplateID ¶
TemplateID identifies a DAML template by its package, module, and entity name. It is the streaming package's own type — callers do not import lapiv2 directly.