txn

package
v0.1.0 Latest
Published: Jun 2, 2026 License: MIT Imports: 13 Imported by: 0

Documentation

Overview

Package txn provides the transactional surface (Begin / Commit / Rollback) layered over an lpg.Graph and a wal.Writer.

A transaction buffers mutations in a per-Tx slice. Commit appends each mutation as a WAL frame, then a single OpCommit marker frame, fsyncs the WAL once, and only then applies the mutations to the in-memory graph — so a process crash between Commit's WAL sync and the in-memory apply is recoverable by replaying the WAL into a fresh graph.

Atomicity

Every store writes each op as a v3 (OpRecordV3) frame carrying a per-Store transaction sequence, followed by an OpCommit marker frame for the same sequence; recovery buffers a transaction's ops and applies them only on reading the durable marker. A crash that tears the batch at any point therefore recovers all of the transaction or none of it — never a partial node or edge. This is the Atomicity guarantee (see docs/acid-audit.md, gap F1).
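The buffer-then-apply rule can be sketched with the standard library alone. The frame type and replay function below are hypothetical stand-ins for illustration, not the package's recovery code; they only show how ops are held back until the commit marker for their sequence is read.

```go
package main

import "fmt"

// frame is a hypothetical, already-decoded WAL frame with just enough
// fields to show the replay state machine. It is not the package's type.
type frame struct {
	txnSeq uint64
	commit bool   // true for the commit marker, false for an op frame
	op     string // opaque description of the mutation
}

// replay buffers ops per transaction sequence and releases a transaction's
// ops only when its commit marker is seen. Ops whose marker never arrives
// (a torn tail) are dropped.
func replay(frames []frame) []string {
	pending := make(map[uint64][]string)
	var applied []string
	for _, f := range frames {
		if !f.commit {
			pending[f.txnSeq] = append(pending[f.txnSeq], f.op)
			continue
		}
		applied = append(applied, pending[f.txnSeq]...)
		delete(pending, f.txnSeq)
	}
	return applied
}

func main() {
	frames := []frame{
		{txnSeq: 1, op: "AddNode(alice)"},
		{txnSeq: 1, op: "AddEdge(alice,bob)"},
		{txnSeq: 1, commit: true},
		{txnSeq: 2, op: "AddEdge(alice,carol)"}, // crash before the marker
	}
	fmt.Println(replay(frames))
}
```

Running the sketch prints `[AddNode(alice) AddEdge(alice,bob)]`: transaction 2 has no marker, so none of its ops are applied.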

The legacy v1 (untagged, fmt.Sprintf-based) frame format is no longer produced; the v1 constructor has been removed and recovery rejects any v1 frame found on disk (see store/recovery.ErrUnsupportedRecordVersion).

Single-writer is enforced by a per-store mutex acquired in Begin and released in Commit or Rollback; reads on the underlying graph remain lock-free in the lpg / adjlist contracts.

Constructor matrix

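The package exposes two constructors that differ only in whether edge weights are made durable:

NewStoreWithCodec: endpoint Codec only. The store has no WeightCodec, so Tx.AddEdge accepts only the zero weight and returns ErrNoWeightCodec for any other value.
NewStoreWithOptions: endpoint Codec plus WeightCodec. Non-zero edge weights are written to the WAL and survive recovery.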

Constants

const (
	// OpRecordV1 is the reserved logical version of the legacy untagged
	// record format. This format is no longer produced — the v1 store
	// constructor and its encoder were removed — and any v1 frame found
	// on disk is rejected on read by [store/recovery.Decode] with
	// [store/recovery.ErrUnsupportedRecordVersion]. The constant is
	// retained (value 0) as a RESERVED sentinel so the rejection path and
	// its tests can name the version they refuse; it is never written to
	// disk and must not be reused for a new record version.
	OpRecordV1 uint8 = 0
	// OpRecordV2 is the magic byte that marks the start of a v2-tagged
	// op record. See the package doc above for the rationale.
	OpRecordV2 uint8 = 0xFE
	// OpRecordV3 is the magic byte that marks the start of a v3-tagged
	// op record. A v3 payload is laid out as:
	//
	//	uint8  version (OpRecordV3 = 0xFD)
	//	uint8  kind    (an [OpKind], or [OpCommit] for the commit marker)
	//	uint64 txnSeq  little-endian per-Store transaction sequence
	//	...    the same body bytes a v2 record of this kind carries...
	//
	// v3 adds the txnSeq word and the [OpCommit] marker so a multi-op
	// transaction is recovered atomically: recovery buffers a v3
	// transaction's ops and applies them only on reading the matching
	// OpCommit. The body after the txnSeq word is byte-identical to the
	// v2 body for the same kind, so the recovery decoder reuses the v2
	// body walk. 0xFD is the value reserved for OpRecordV3 in the
	// disambiguation scheme documented above; a v1/v2/v3 reader peeks the
	// first byte to select the decoder.
	OpRecordV3 uint8 = 0xFD
)

Op-record version markers. The marker is a single byte written at offset zero of every v2 WAL payload. v1 records have no marker — their first byte is the OpKind value (always 1..3 today, with room to grow into the low region of the byte space). We pick a v2 marker far outside the OpKind range so a v1-vs-v2 reader can disambiguate by peeking the first byte: any payload that starts with OpRecordV2 is necessarily a v2 frame because no legitimate OpKind value reaches 0xFE.

0xFE is chosen specifically because it leaves 0x00..0x0F free for future OpKind growth, is not a printable ASCII character (so hex-dumped logs are visually unambiguous), and is one less than the universally-recognised "all bits set" sentinel 0xFF — leaving room for at least one further version bump (e.g. OpRecordV3 = 0xFD) in the same disambiguation scheme.
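As a rough illustration of the disambiguation scheme, the sketch below writes the fixed v3 header (version byte, kind byte, little-endian txnSeq) and classifies a payload by peeking its first byte. The marker values are copied from the constants above; the function names are hypothetical and this is not the package's decoder.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// Marker values copied from the constants documented above.
const (
	opRecordV2 uint8 = 0xFE
	opRecordV3 uint8 = 0xFD
)

// appendV3Header appends the version byte, the kind byte and the
// little-endian transaction sequence (10 bytes in total).
func appendV3Header(buf []byte, kind uint8, txnSeq uint64) []byte {
	buf = append(buf, opRecordV3, kind)
	return binary.LittleEndian.AppendUint64(buf, txnSeq)
}

// recordVersion peeks the first payload byte to select a decoder.
func recordVersion(payload []byte) (int, error) {
	if len(payload) == 0 {
		return 0, errors.New("empty payload")
	}
	switch payload[0] {
	case opRecordV3:
		return 3, nil
	case opRecordV2:
		return 2, nil
	default:
		return 1, nil // no marker: the first byte is a legacy v1 OpKind
	}
}

func main() {
	hdr := appendV3Header(nil, 5, 42)
	v, _ := recordVersion(hdr)
	fmt.Println(v, len(hdr), binary.LittleEndian.Uint64(hdr[2:]))
}
```

The sketch prints `3 10 42`. A real reader would go on to reject the v1 case, as described in the Atomicity section.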

Variables

var ErrCodecDecode = errors.New("txn/codec: malformed payload")

ErrCodecDecode is the sentinel returned by built-in codecs whenever the input buffer is too short, malformed, or violates a length invariant (e.g. negative varint, oversize length prefix).

var ErrCommittedNotApplied = errors.New("txn: transaction committed durably but in-memory apply failed; recovery will reconcile")

ErrCommittedNotApplied is returned by Tx.Commit when the transaction was made durable (its op frames and OpCommit marker were written and fsynced) but a later in-memory apply step failed — today only reachable as [adjlist.ErrShardFull] when the store's graph was built with a [adjlist.Config.MaxShardCapacity] cap.

The transaction IS durably committed: it carries a complete commit marker, so recovery — which rebuilds the graph without a shard-capacity cap — replays it in full and atomically. Callers must therefore treat this as "committed; the in-memory view is temporarily behind and will be consistent after the next recovery", NOT as a rollback: retrying the transaction would commit it a second time. The underlying apply error is wrapped and recoverable with errors.Is/errors.Unwrap. This sentinel exists so a durable commit is never reported as a plain, ambiguous failure (audit gap F5, see docs/acid-audit.md).
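A caller-side sketch of that rule follows. It uses a local stand-in for the sentinel so it compiles without the module, and the way the error is wrapped here is an assumption made for the demo; the point is only that errors.Is separates "durably committed, do not retry" from an ordinary failure.

```go
package main

import (
	"errors"
	"fmt"
)

// errCommittedNotApplied stands in for txn.ErrCommittedNotApplied so the
// sketch is self-contained. In real code, compare against the package's
// exported sentinel instead.
var errCommittedNotApplied = errors.New("txn: transaction committed durably but in-memory apply failed; recovery will reconcile")

// classify maps a Commit error onto the action a caller should take.
func classify(err error) string {
	switch {
	case err == nil:
		return "committed"
	case errors.Is(err, errCommittedNotApplied):
		// Durable on disk: retrying would commit the work a second time.
		return "committed, in-memory view stale: do not retry"
	default:
		return "failed"
	}
}

func main() {
	// Hypothetical wrapping, for illustration only.
	wrapped := fmt.Errorf("%w: shard full", errCommittedNotApplied)
	fmt.Println(classify(nil))
	fmt.Println(classify(wrapped))
	fmt.Println(classify(errors.New("disk unplugged")))
}
```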

var ErrNoWeightCodec = errors.New("txn: store has no WeightCodec; cannot persist non-zero edge weight")

ErrNoWeightCodec is returned by Tx.AddEdge when the caller passes a non-zero weight to a store that was constructed without a typed WeightCodec (i.e. via [NewStore] or NewStoreWithCodec). Zero-weight calls remain accepted on those constructors and buffer an OpAddEdge (unweighted) record.

var ErrTxFinished = errors.New("txn: transaction already finished")

ErrTxFinished is returned by operations on a transaction that has already been committed or rolled back.

Functions

This section is empty.

Types

type Codec

type Codec[N comparable] interface {
	// Encode appends the wire form of v to buf and returns the
	// extended slice. The returned slice may alias buf.
	Encode(buf []byte, v N) ([]byte, error)
	// Decode reads a value from the head of buf, returning the
	// decoded value, the remaining unread tail, and any error. On
	// error, value and tail are unspecified.
	Decode(buf []byte) (value N, rest []byte, err error)
}

Codec encodes and decodes node-identifier values of type N onto the transactional op log. Implementations append the encoded form to the caller-supplied buffer and reverse the process from the head of a buffer, returning the decoded value plus the unread tail. The append-style API keeps the common path zero-alloc when callers reuse a scratch buffer across ops.

Concurrency: a Codec value is expected to be cheap to copy and safe for concurrent use; the built-in codecs in this package are stateless and therefore inherently safe.
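To make the append-style contract concrete, here is a minimal sketch of a user-defined codec for uint32 keys. The interface is restated locally so the block compiles on its own; uint32Codec and its fixed 4-byte little-endian wire form are illustrative choices, not something the package ships.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// Codec restates the interface documented above so the sketch is
// self-contained.
type Codec[N comparable] interface {
	Encode(buf []byte, v N) ([]byte, error)
	Decode(buf []byte) (value N, rest []byte, err error)
}

var errShort = errors.New("uint32 codec: short buffer")

// uint32Codec is a hypothetical stateless codec: fixed 4 bytes, little-endian.
type uint32Codec struct{}

func (uint32Codec) Encode(buf []byte, v uint32) ([]byte, error) {
	return binary.LittleEndian.AppendUint32(buf, v), nil
}

func (uint32Codec) Decode(buf []byte) (uint32, []byte, error) {
	if len(buf) < 4 {
		return 0, nil, errShort
	}
	return binary.LittleEndian.Uint32(buf), buf[4:], nil
}

// Compile-time check that the sketch satisfies the interface.
var _ Codec[uint32] = uint32Codec{}

func main() {
	var c Codec[uint32] = uint32Codec{}
	scratch := make([]byte, 0, 16) // reused buffer: no allocation per op
	scratch, _ = c.Encode(scratch, 7)
	scratch, _ = c.Encode(scratch, 9)

	a, rest, _ := c.Decode(scratch)
	b, rest, _ := c.Decode(rest)
	fmt.Println(a, b, len(rest))
}
```

The sketch prints `7 9 0`. A codec meant for this package would more plausibly report decode failures through ErrCodecDecode, as the built-in codecs do.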

func NewBinaryMarshalerCodec

func NewBinaryMarshalerCodec[N comparable, P interface {
	*N
	encoding.BinaryMarshaler
	encoding.BinaryUnmarshaler
}]() Codec[N]

NewBinaryMarshalerCodec returns a Codec[N] that delegates encoding and decoding to the encoding.BinaryMarshaler and encoding.BinaryUnmarshaler methods on *N. The wire form is a uint32 little-endian length prefix followed by the marshaler's opaque payload.

The type parameter P pins *N as the receiver for the marshaler methods, which is the standard Go pattern for "value type plus pointer-receiver methods". A typical instantiation:

codec := txn.NewBinaryMarshalerCodec[MyKey, *MyKey]()

where MyKey is a value type with MarshalBinary / UnmarshalBinary methods declared on *MyKey.
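The constraint pattern and the length-prefixed wire form can be tried out in isolation. In the sketch below, MyKey's fields, the marshalerPtr constraint name, and the encodeFramed / decodeFramed helpers are all hypothetical; they imitate the documented behaviour (uint32 little-endian length prefix, then the marshaler's payload) rather than reproduce the package's implementation.

```go
package main

import (
	"encoding"
	"encoding/binary"
	"errors"
	"fmt"
)

// MyKey is a value type whose marshaler methods sit on the pointer receiver.
type MyKey struct{ Region, ID uint16 }

func (k *MyKey) MarshalBinary() ([]byte, error) {
	b := make([]byte, 4)
	binary.BigEndian.PutUint16(b[0:], k.Region)
	binary.BigEndian.PutUint16(b[2:], k.ID)
	return b, nil
}

func (k *MyKey) UnmarshalBinary(b []byte) error {
	if len(b) != 4 {
		return errors.New("MyKey: want 4 bytes")
	}
	k.Region = binary.BigEndian.Uint16(b[0:])
	k.ID = binary.BigEndian.Uint16(b[2:])
	return nil
}

// marshalerPtr has the same shape as the P constraint shown above.
type marshalerPtr[N any] interface {
	*N
	encoding.BinaryMarshaler
	encoding.BinaryUnmarshaler
}

// encodeFramed writes a uint32 little-endian length, then the payload.
func encodeFramed[N any, P marshalerPtr[N]](buf []byte, v N) ([]byte, error) {
	payload, err := P(&v).MarshalBinary()
	if err != nil {
		return nil, err
	}
	buf = binary.LittleEndian.AppendUint32(buf, uint32(len(payload)))
	return append(buf, payload...), nil
}

// decodeFramed reverses encodeFramed and returns the unread tail.
func decodeFramed[N any, P marshalerPtr[N]](buf []byte) (N, []byte, error) {
	var v N
	if len(buf) < 4 {
		return v, nil, errors.New("short length prefix")
	}
	n := int(binary.LittleEndian.Uint32(buf))
	if n < 0 || n > len(buf)-4 {
		return v, nil, errors.New("length prefix exceeds buffer")
	}
	if err := P(&v).UnmarshalBinary(buf[4 : 4+n]); err != nil {
		return v, nil, err
	}
	return v, buf[4+n:], nil
}

func main() {
	buf, _ := encodeFramed[MyKey, *MyKey](nil, MyKey{Region: 1, ID: 2})
	got, rest, _ := decodeFramed[MyKey, *MyKey](buf)
	fmt.Println(len(buf), got, len(rest))
}
```

The sketch prints `8 {1 2} 0`: four bytes of length prefix plus four bytes of payload.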

func NewInt32Codec

func NewInt32Codec() Codec[int32]

NewInt32Codec returns the canonical Codec[int32]. The wire form is a signed varint.

func NewInt64Codec

func NewInt64Codec() Codec[int64]

NewInt64Codec returns the canonical Codec[int64]. The wire form is a signed varint.

func NewIntCodec

func NewIntCodec() Codec[int]

NewIntCodec returns the canonical Codec[int]. The wire form is a signed varint (encoding/binary's PutVarint).
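The signed-varint wire form shared by the int, int32 and int64 codecs can be explored with encoding/binary directly. The helper names below are hypothetical; the sketch shows the size behaviour of the encoding, not the package's own codec.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

var errMalformed = errors.New("malformed varint")

// encodeInt64 appends v as a signed (zig-zag) varint.
func encodeInt64(buf []byte, v int64) []byte {
	return binary.AppendVarint(buf, v)
}

// decodeInt64 reads one signed varint from the head of buf.
func decodeInt64(buf []byte) (int64, []byte, error) {
	v, n := binary.Varint(buf)
	if n <= 0 { // 0: buffer too small, <0: value overflows 64 bits
		return 0, nil, errMalformed
	}
	return v, buf[n:], nil
}

func main() {
	buf := encodeInt64(nil, -3) // small magnitudes take one byte
	buf = encodeInt64(buf, 300) // larger values grow as needed
	fmt.Println(len(buf))

	a, rest, _ := decodeInt64(buf)
	b, rest, _ := decodeInt64(rest)
	fmt.Println(a, b, len(rest))
}
```

The sketch prints `3` and then `-3 300 0`: one byte for -3 and two for 300.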

func NewStringCodec

func NewStringCodec() Codec[string]

NewStringCodec returns the canonical Codec[string] used for typed op records keyed by string node identifiers. The wire form is a uint32 little-endian length prefix followed by the UTF-8 bytes.

func NewUUIDCodec

func NewUUIDCodec() Codec[[16]byte]

NewUUIDCodec returns the canonical Codec[[16]byte]. The wire form is a fixed 16-byte big-endian copy of the value — no length prefix because the size is constant.

func NewUint64Codec

func NewUint64Codec() Codec[uint64]

NewUint64Codec returns the canonical Codec[uint64]. The wire form is an unsigned varint.

type Op

type Op[N comparable, W any] struct {
	Kind     OpKind
	Src, Dst N
	Weight   W
	Label    string
	// Key is the property key for SetNodeProperty, DelNodeProperty,
	// SetEdgeProperty, and DelEdgeProperty ops.
	Key string
	// Value is the typed property value for SetNodeProperty and SetEdgeProperty
	// ops. It is the zero PropertyValue for all other op kinds.
	Value lpg.PropertyValue
	// Handle is the stable per-edge handle carried by the Stage-2
	// handle-bearing op kinds ([OpAddEdgeH], [OpSetEdgeLabelByHandle],
	// [OpSetEdgePropertyByHandle], [OpRemoveEdgeInstanceByHandle]). It is 0
	// for every other op kind.
	Handle uint64
}

Op is a single buffered mutation.

The type carries the endpoint identifiers (Src, Dst), the edge weight (Weight), a string Label used by label ops, Key / Value used by property ops, and the per-edge Handle used by the handle-bearing op kinds. Fields are zero-valued for op kinds that do not require them.

type OpKind

type OpKind uint8

OpKind enumerates the mutation kinds supported by a transaction.

const (
	// OpAddEdge buffers an AddEdge(src, dst, _) mutation. The applied
	// weight on the in-memory graph is the zero value of W. This kind
	// is emitted by stores constructed without a weight codec (see
	// [NewStore] and [NewStoreWithCodec]) and by [NewStoreWithOptions]
	// stores when the caller passes the zero W value.
	OpAddEdge OpKind = iota + 1
	// OpSetNodeLabel buffers a SetNodeLabel(node, label) mutation.
	OpSetNodeLabel
	// OpSetEdgeLabel buffers a SetEdgeLabel(src, dst, label) mutation.
	OpSetEdgeLabel
	// OpAddEdgeWeighted buffers an AddEdge(src, dst, w) mutation with
	// a typed weight payload. Only emitted by stores constructed via
	// [NewStoreWithOptions] (which carries a [WeightCodec]). Recovery
	// readers that do not know about [OpAddEdgeWeighted] surface the
	// frame as an unknown kind; readers that do know it walk the
	// weight payload via the registered [WeightCodec] before reading
	// the trailing label.
	OpAddEdgeWeighted

	// OpAddNode buffers an AddNode(key) mutation.
	OpAddNode
	// OpRemoveNode buffers a logical node removal (strips labels and
	// properties; the mapper entry is permanent).
	OpRemoveNode
	// OpRemoveNodeLabel buffers a RemoveNodeLabel(node, label) mutation.
	// The label is carried in the Label field of the Op.
	OpRemoveNodeLabel
	// OpSetNodeProperty buffers a SetNodeProperty(node, key, value) mutation.
	// Key is the property key; Value is the typed property value.
	OpSetNodeProperty
	// OpDelNodeProperty buffers a DelNodeProperty(node, key) mutation.
	// Key is the property key.
	OpDelNodeProperty
	// OpRemoveEdge buffers a RemoveEdge(src, dst) mutation.
	OpRemoveEdge
	// OpSetEdgeProperty buffers a SetEdgeProperty(src, dst, key, value) mutation.
	// Key is the property key; Value is the typed property value.
	OpSetEdgeProperty
	// OpDelEdgeProperty buffers a DelEdgeProperty(src, dst, key) mutation.
	// Key is the property key.
	OpDelEdgeProperty

	// OpCommit is a control record, not a graph mutation. It marks the
	// durable end of a transaction batch in the v3 ([OpRecordV3]) WAL
	// envelope: a commit writes one OpCommit frame, carrying the
	// transaction's sequence number, after all of the transaction's op
	// frames and immediately before the single fsync. Recovery treats it
	// as a no-op against the graph; its sole effect is on the replay state
	// machine, which applies a buffered transaction's ops only when it
	// reads the matching OpCommit. A torn write that loses the OpCommit
	// (or any preceding op frame) causes recovery to discard the whole
	// transaction, giving all-or-nothing atomicity (audit gap F1, see
	// docs/acid-audit.md). OpCommit never appears in a v1/v2 frame.
	OpCommit

	// OpAddEdgeH buffers an AddEdge(src, dst, w) mutation carrying a durable
	// stable per-edge handle (see graph/lpg/edge_handle.go). It is the
	// handle-bearing successor of [OpAddEdgeWeighted]: the body is the
	// weighted-edge body followed by the 8-byte handle. The handle is
	// allocated from the graph's monotone handle counter when the op is
	// buffered (so the value is stable in the WAL frame) and is replayed via
	// [lpg.Graph.AddEdgeHIfAbsent] so a snapshot + full-WAL recovery does
	// not double the edge. Emitted by [Tx.AddEdge] on a weight-codec store.
	OpAddEdgeH
	// OpSetEdgeLabelByHandle buffers a SetEdgeLabelByHandle(src, dst,
	// handle, label) mutation, persisting one parallel edge's per-CREATE
	// type so it survives recovery keyed to the stable handle rather than
	// collapsing into the per-pair union. The body is the edge-with-label
	// body followed by the 8-byte handle.
	OpSetEdgeLabelByHandle
	// OpSetEdgePropertyByHandle buffers a SetEdgePropertyByHandle(src, dst,
	// handle, key, value) mutation, persisting one parallel edge's
	// per-CREATE property. The body is the edge-property body followed by
	// the 8-byte handle.
	OpSetEdgePropertyByHandle
	// OpRemoveEdgeInstanceByHandle buffers a RemoveEdgeInstanceByHandle(src,
	// dst, handle) mutation, dropping one logical edge's per-handle metadata
	// on DELETE while leaving sibling handles untouched. The body is the
	// edge-no-tail body followed by the 8-byte handle.
	OpRemoveEdgeInstanceByHandle
)

Mutation kinds supported by a transaction. The values are stable wire identifiers: legacy unweighted commits stay on OpAddEdge so pre-T8 readers continue to walk them, and new weighted commits use OpAddEdgeWeighted so the weight payload sits between the codec-encoded endpoints and the trailing label.

type Options

type Options[N comparable, W any] struct {
	// Codec serialises endpoint identifiers. Must not be nil.
	Codec Codec[N]
	// WeightCodec serialises edge weights. Must not be nil.
	WeightCodec WeightCodec[W]
}

Options carries the codecs used by NewStoreWithOptions. Both fields are required: Codec serialises endpoint identifiers and WeightCodec serialises edge weights for OpAddEdgeWeighted records.

A nil WeightCodec is rejected by NewStoreWithOptions; callers that do not need durable weights should use NewStoreWithCodec instead.

type Store

type Store[N comparable, W any] struct {
	// contains filtered or unexported fields
}

Store bundles an lpg.Graph with a wal.Writer and the single-writer lock that serialises transactions.

Concurrency: any number of goroutines may call Begin/BeginCtx; transactions serialise on the store mutex, so only one Tx is active at any moment. Reads on the underlying lpg.Graph remain concurrent and lock-free per the lpg/adjlist contracts.

Example

ExampleStore runs one atomic transaction end-to-end: open a store over a graph and a WAL, begin a transaction, buffer a few mutations, commit them, and read the committed state back. A second transaction is rolled back to show the all-or-nothing nature: rolled-back work leaves no trace in the graph.

package main

import (
	"fmt"
	"os"
	"path/filepath"

	"github.com/FlavioCFOliveira/GoGraph/graph/adjlist"
	"github.com/FlavioCFOliveira/GoGraph/graph/lpg"
	"github.com/FlavioCFOliveira/GoGraph/store/txn"
	"github.com/FlavioCFOliveira/GoGraph/store/wal"
)

func main() {
	dir, err := os.MkdirTemp("", "txn-example")
	if err != nil {
		panic(err)
	}
	defer func() { _ = os.RemoveAll(dir) }()

	w, err := wal.Open(filepath.Join(dir, "wal"))
	if err != nil {
		panic(err)
	}
	defer func() { _ = w.Close() }()

	g := lpg.New[string, int64](adjlist.Config{Directed: true})
	s := txn.NewStoreWithOptions[string, int64](g, w, txn.Options[string, int64]{
		Codec:       txn.NewStringCodec(),
		WeightCodec: txn.NewInt64WeightCodec(),
	})

	// A committed transaction: a labelled node and a weighted edge become
	// visible together when Commit returns.
	tx := s.Begin()
	if err := tx.AddNode("alice"); err != nil {
		panic(err)
	}
	if err := tx.SetNodeLabel("alice", "Person"); err != nil {
		panic(err)
	}
	if err := tx.AddEdge("alice", "bob", 7); err != nil {
		panic(err)
	}
	if err := tx.Commit(); err != nil {
		panic(err)
	}

	committed := s.Graph()
	fmt.Printf("after commit: edge=%t label=%t\n",
		committed.AdjList().HasEdge("alice", "bob"),
		committed.HasNodeLabel("alice", "Person"))

	// A rolled-back transaction: the buffered op never reaches the graph.
	tx2 := s.Begin()
	if err := tx2.AddEdge("alice", "carol", 1); err != nil {
		panic(err)
	}
	if err := tx2.Rollback(); err != nil {
		panic(err)
	}
	fmt.Printf("after rollback: edge=%t\n", committed.AdjList().HasEdge("alice", "carol"))

}
Output:
after commit: edge=true label=true
after rollback: edge=false

Example (Recover)

ExampleStore_recover shows the durability half of the contract: Commit writes to the WAL before applying in memory, so after the store is closed (a simulated restart) recovery replays the WAL and the committed state is fully rebuilt.

package main

import (
	"fmt"
	"os"
	"path/filepath"

	"github.com/FlavioCFOliveira/GoGraph/graph/adjlist"
	"github.com/FlavioCFOliveira/GoGraph/graph/lpg"
	"github.com/FlavioCFOliveira/GoGraph/store/recovery"
	"github.com/FlavioCFOliveira/GoGraph/store/txn"
	"github.com/FlavioCFOliveira/GoGraph/store/wal"
)

func main() {
	dir, err := os.MkdirTemp("", "txn-recover-example")
	if err != nil {
		panic(err)
	}
	defer func() { _ = os.RemoveAll(dir) }()

	// Open, commit, close — the lifetime of one process.
	w, err := wal.Open(filepath.Join(dir, "wal"))
	if err != nil {
		panic(err)
	}
	g := lpg.New[string, int64](adjlist.Config{Directed: true})
	s := txn.NewStoreWithOptions[string, int64](g, w, txn.Options[string, int64]{
		Codec:       txn.NewStringCodec(),
		WeightCodec: txn.NewInt64WeightCodec(),
	})
	tx := s.Begin()
	if err := tx.AddEdge("alice", "bob", 7); err != nil {
		panic(err)
	}
	if err := tx.SetEdgeLabel("alice", "bob", "KNOWS"); err != nil {
		panic(err)
	}
	if err := tx.Commit(); err != nil {
		panic(err)
	}
	if err := w.Close(); err != nil {
		panic(err)
	}

	// A fresh process reopens the directory and replays the WAL.
	res, err := recovery.Open[string, int64](dir, recovery.Options[string, int64]{
		Codec:       txn.NewStringCodec(),
		WeightCodec: txn.NewInt64WeightCodec(),
	})
	if err != nil {
		panic(err)
	}
	rg := res.Graph
	fmt.Printf("recovered: edge=%t label=%t\n",
		rg.AdjList().HasEdge("alice", "bob"),
		rg.HasEdgeLabel("alice", "bob", "KNOWS"))

}
Output:
recovered: edge=true label=true

func NewStoreWithCodec

func NewStoreWithCodec[N comparable, W any](g *lpg.Graph[N, W], wlog *wal.Writer, codec Codec[N]) *Store[N, W]

NewStoreWithCodec returns a Store wrapping g and wlog that encodes node identifiers via the supplied typed Codec. Each transaction is emitted as v3-tagged frames, one per op: a one-byte version tag (OpRecordV3), the OpKind, the per-transaction sequence, the codec-encoded src and dst values inline, then a uint16 little-endian label length and the label bytes. The op frames are followed by an OpCommit marker so recovery applies the transaction atomically. The body is the dual of the v3 branch in store/recovery.Decode, which detects the version tag and walks the body back through the same codec.

codec must not be nil.

The returned store has no WeightCodec; Tx.AddEdge called with a non-zero weight returns ErrNoWeightCodec. Callers that need durable weighted edges should use NewStoreWithOptions.

func NewStoreWithOptions

func NewStoreWithOptions[N comparable, W any](g *lpg.Graph[N, W], wlog *wal.Writer, opts Options[N, W]) *Store[N, W]

NewStoreWithOptions returns a Store wrapping g and wlog that encodes node identifiers via opts.Codec and edge weights via opts.WeightCodec. Each WAL payload is emitted in the v2 format. Weighted Tx.AddEdge calls produce OpAddEdgeWeighted frames whose body is laid out as:

uint8  version  ([OpRecordV2])
uint8  kind     ([OpAddEdgeWeighted])
codec  src
codec  dst
wcodec w
uint16 labelLen (always 0 for AddEdge)

Calls to Tx.AddEdge that pass the zero value of W still buffer an OpAddEdge record (without a weight payload), which preserves backwards-compatible replay under readers that predate OpAddEdgeWeighted.
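The layout listed above can be traced byte by byte with a small sketch. It assumes string endpoints and int64 weights, using the wire forms documented for NewStringCodec (uint32 little-endian length prefix) and NewInt64WeightCodec (signed varint); the kind value 4 follows from OpAddEdgeWeighted's position in the const block. The helper names are hypothetical and the sketch is not the package's encoder.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

const (
	opRecordV2        uint8 = 0xFE // version marker from the constants above
	opAddEdgeWeighted uint8 = 4    // fourth OpKind in the iota+1 sequence
)

// appendString mirrors the documented string wire form:
// uint32 little-endian length, then the bytes.
func appendString(buf []byte, s string) []byte {
	buf = binary.LittleEndian.AppendUint32(buf, uint32(len(s)))
	return append(buf, s...)
}

// appendWeightedEdge lays out version, kind, src, dst, weight and a zero
// label length, in that order.
func appendWeightedEdge(buf []byte, src, dst string, w int64) []byte {
	buf = append(buf, opRecordV2, opAddEdgeWeighted)
	buf = appendString(buf, src)
	buf = appendString(buf, dst)
	buf = binary.AppendVarint(buf, w)              // weight payload
	return binary.LittleEndian.AppendUint16(buf, 0) // labelLen is 0 for AddEdge
}

func main() {
	frame := appendWeightedEdge(nil, "alice", "bob", 7)
	fmt.Println(len(frame))
}
```

For ("alice", "bob", 7) the sketch prints `21`: 2 header bytes, 9 for src, 7 for dst, 1 for the weight and 2 for the label length.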

opts.Codec must not be nil; opts.WeightCodec must not be nil. Passing the legacy fmt codec via opts.Codec is undefined behaviour.

func (*Store[N, W]) Begin

func (s *Store[N, W]) Begin() *Tx[N, W]

Begin opens a new transaction. The returned Tx holds the store's single-writer mutex until Commit or Rollback runs.

func (*Store[N, W]) BeginCtx

func (s *Store[N, W]) BeginCtx(ctx context.Context) (*Tx[N, W], error)

BeginCtx is the context-aware variant of Store.Begin. It checks ctx.Err() before acquiring the store mutex; if the context is already cancelled or expired it returns (nil, err), where err wraps ctx.Err(). Once the lock is held the transaction proceeds; any further ctx checks are left to the caller.
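The check-then-lock order, and the reason every transaction must end in Commit or Rollback, can be sketched with a plain sync.Mutex. The store and tx types below are hypothetical miniatures, not the package's types; they only model the locking behaviour described for Begin and BeginCtx.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

var errTxFinished = errors.New("transaction already finished")

// store and tx are hypothetical miniatures of Store and Tx.
type store struct{ mu sync.Mutex }

type tx struct {
	s    *store
	done bool
}

// beginCtx checks the context first, then takes the single-writer lock.
func (s *store) beginCtx(ctx context.Context) (*tx, error) {
	if err := ctx.Err(); err != nil {
		return nil, fmt.Errorf("begin: %w", err)
	}
	s.mu.Lock()
	return &tx{s: s}, nil
}

// rollback releases the lock exactly once.
func (t *tx) rollback() error {
	if t.done {
		return errTxFinished
	}
	t.done = true
	t.s.mu.Unlock()
	return nil
}

// run exercises the three behaviours of interest.
func run() (rejected, reusable, doubleFinish bool) {
	s := &store{}
	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	_, err := s.beginCtx(ctx)
	rejected = errors.Is(err, context.Canceled)

	t1, err := s.beginCtx(context.Background())
	if err != nil {
		return
	}
	_ = t1.rollback()
	// This second begin would block forever if rollback had not unlocked.
	t2, err := s.beginCtx(context.Background())
	reusable = err == nil
	if t2 != nil {
		_ = t2.rollback()
	}
	doubleFinish = errors.Is(t1.rollback(), errTxFinished)
	return
}

func main() {
	fmt.Println(run())
}
```

The sketch prints `true true true`. In caller code the practical consequence is that a Tx abandoned without Commit or Rollback keeps the store locked for every other writer.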

func (*Store[N, W]) Codec

func (s *Store[N, W]) Codec() Codec[N]

Codec returns the Codec installed on the Store. The returned value is the same one passed to NewStoreWithCodec or NewStoreWithOptions. Callers should treat the return as read-only.

func (*Store[N, W]) Graph

func (s *Store[N, W]) Graph() *lpg.Graph[N, W]

Graph returns the underlying graph.

func (*Store[N, W]) WeightCodec

func (s *Store[N, W]) WeightCodec() WeightCodec[W]

WeightCodec returns the WeightCodec installed on the Store, or nil if the store was constructed without one. Callers should treat the return as read-only.

type Tx

type Tx[N comparable, W any] struct {
	// contains filtered or unexported fields
}

Tx is an in-progress transaction.

func (*Tx[N, W]) AddEdge

func (t *Tx[N, W]) AddEdge(src, dst N, w W) error

AddEdge buffers an AddEdge(src, dst, w) operation on the graph.

If the store was constructed with a WeightCodec (via NewStoreWithOptions), the operation is recorded as an OpAddEdgeWeighted frame carrying w on commit. If the store has no weight codec, AddEdge accepts a zero-value w (which buffers an OpAddEdge frame, producing a zero-weight edge on replay) and returns ErrNoWeightCodec for any non-zero w. Callers needing durable weighted edges must use NewStoreWithOptions.

func (*Tx[N, W]) AddEdgeWithHandle

func (t *Tx[N, W]) AddEdgeWithHandle(src, dst N, w W, handle uint64) error

AddEdgeWithHandle buffers an OpAddEdgeH operation: an AddEdge(src, dst, w) carrying the supplied durable stable per-edge handle. The handle must be a value the caller minted from the graph's lpg.Graph.NextEdgeHandle counter (or replayed from a durable record); it is written verbatim into the WAL frame and re-inserted on recovery via lpg.Graph.AddEdgeHIfAbsent. Used by the Cypher write path (walMutatorAdapter) so a parallel CREATE's handle is durable; the direct Tx.AddEdge path mints its own handle. Requires a weight codec.

func (*Tx[N, W]) AddNode

func (t *Tx[N, W]) AddNode(key N) error

AddNode buffers an AddNode(key) operation that interns key into the graph.

func (*Tx[N, W]) Commit

func (t *Tx[N, W]) Commit() error

Commit durably appends every buffered op to the WAL and only then applies it to the in-memory graph.

For a typed store the whole batch is committed atomically: every op is written as a v3 frame carrying one transaction sequence, followed by an OpCommit marker frame, then a single fsync. Recovery applies the transaction only on reading the durable marker, so a crash that tears the batch at any point recovers all of the transaction or none of it (audit gap F1, see docs/acid-audit.md).

func (*Tx[N, W]) CommitWALOnly

func (t *Tx[N, W]) CommitWALOnly() error

CommitWALOnly durably appends every buffered op to the WAL but does NOT apply the ops to the in-memory graph. Use this when the caller has already applied mutations eagerly (e.g. [walMutatorAdapter]) and only needs WAL durability without a second in-memory pass. It uses the same atomic v3 framing as Tx.Commit for typed stores.

func (*Tx[N, W]) DelEdgeProperty

func (t *Tx[N, W]) DelEdgeProperty(src, dst N, propKey string) error

DelEdgeProperty buffers a DelEdgeProperty(src, dst, propKey) operation.

func (*Tx[N, W]) DelNodeProperty

func (t *Tx[N, W]) DelNodeProperty(node N, propKey string) error

DelNodeProperty buffers a DelNodeProperty(node, propKey) operation.

func (*Tx[N, W]) RemoveEdge

func (t *Tx[N, W]) RemoveEdge(src, dst N) error

RemoveEdge buffers a RemoveEdge(src, dst) operation.

func (*Tx[N, W]) RemoveEdgeInstanceByHandle

func (t *Tx[N, W]) RemoveEdgeInstanceByHandle(src, dst N, handle uint64) error

RemoveEdgeInstanceByHandle buffers an OpRemoveEdgeInstanceByHandle operation, dropping the per-handle label and property metadata for one logical edge on DELETE while leaving sibling handles untouched.

func (*Tx[N, W]) RemoveNode

func (t *Tx[N, W]) RemoveNode(key N) error

RemoveNode buffers a logical node removal: strips all labels and properties from key. The mapper entry is permanent; this op records the intent so WAL replay can reproduce the stripped state.

func (*Tx[N, W]) RemoveNodeLabel

func (t *Tx[N, W]) RemoveNodeLabel(node N, label string) error

RemoveNodeLabel buffers a RemoveNodeLabel(node, label) operation.

func (*Tx[N, W]) Rollback

func (t *Tx[N, W]) Rollback() error

Rollback discards buffered ops without touching the WAL or graph.

func (*Tx[N, W]) SetEdgeLabel

func (t *Tx[N, W]) SetEdgeLabel(src, dst N, label string) error

SetEdgeLabel buffers a SetEdgeLabel(src, dst, label) operation. The underlying edge must exist at apply time; otherwise the underlying SetEdgeLabel call is a documented no-op.

func (*Tx[N, W]) SetEdgeLabelByHandle

func (t *Tx[N, W]) SetEdgeLabelByHandle(src, dst N, handle uint64, label string) error

SetEdgeLabelByHandle buffers an OpSetEdgeLabelByHandle operation, persisting `label` against one parallel edge's stable `handle` on the (src, dst) pair so the per-CREATE type survives recovery.

func (*Tx[N, W]) SetEdgeProperty

func (t *Tx[N, W]) SetEdgeProperty(src, dst N, propKey string, value lpg.PropertyValue) error

SetEdgeProperty buffers a SetEdgeProperty(src, dst, propKey, value) operation.

func (*Tx[N, W]) SetEdgePropertyByHandle

func (t *Tx[N, W]) SetEdgePropertyByHandle(src, dst N, handle uint64, propKey string, value lpg.PropertyValue) error

SetEdgePropertyByHandle buffers an OpSetEdgePropertyByHandle operation, persisting key=value against one parallel edge's stable `handle` on the (src, dst) pair.

func (*Tx[N, W]) SetNodeLabel

func (t *Tx[N, W]) SetNodeLabel(node N, label string) error

SetNodeLabel buffers a SetNodeLabel(node, label) operation.

func (*Tx[N, W]) SetNodeProperty

func (t *Tx[N, W]) SetNodeProperty(node N, propKey string, value lpg.PropertyValue) error

SetNodeProperty buffers a SetNodeProperty(node, propKey, value) operation.

type WeightCodec

type WeightCodec[W any] interface {
	// Encode appends the wire form of w to buf and returns the
	// extended slice. The returned slice may alias buf.
	Encode(buf []byte, w W) ([]byte, error)
	// Decode reads a value from the head of buf, returning the
	// decoded value, the remaining unread tail, and any error. On
	// error, value and tail are unspecified.
	Decode(buf []byte) (value W, rest []byte, err error)
}

WeightCodec encodes and decodes edge-weight values of type W onto the transactional op log. It is the W-side dual of Codec, used by the v2 OpAddEdgeWeighted frame layout to persist a typed weight between the codec-encoded endpoints and the trailing label.

The contract mirrors Codec: Encode appends the wire form of w to buf and returns the extended slice; Decode reads a value from the head of buf, returning the decoded value, the unread tail, and any error. The append-style API keeps the Commit fast path zero-alloc when callers reuse a scratch buffer across ops.

Concurrency: a WeightCodec value is expected to be cheap to copy and safe for concurrent use; the built-in codecs in this package are stateless and therefore inherently safe.

func NewBinaryMarshalerWeightCodec

func NewBinaryMarshalerWeightCodec[W any, P interface {
	*W
	encoding.BinaryMarshaler
	encoding.BinaryUnmarshaler
}]() WeightCodec[W]

NewBinaryMarshalerWeightCodec returns a WeightCodec[W] that delegates encoding and decoding to the encoding.BinaryMarshaler and encoding.BinaryUnmarshaler methods on *W. The wire form is a uint32 little-endian length prefix followed by the marshaler's opaque payload.

The type parameter P pins *W as the receiver for the marshaler methods, mirroring NewBinaryMarshalerCodec for endpoints. A typical instantiation:

codec := txn.NewBinaryMarshalerWeightCodec[MyWeight, *MyWeight]()

where MyWeight is a value type with MarshalBinary / UnmarshalBinary methods declared on *MyWeight.

func NewFloat64WeightCodec

func NewFloat64WeightCodec() WeightCodec[float64]

NewFloat64WeightCodec returns the canonical WeightCodec[float64]. The wire form is a fixed 8-byte little-endian IEEE 754 layout produced by math.Float64bits and reconstructed by math.Float64frombits. The codec round-trips bits losslessly, including +0.0 vs -0.0, ±Inf and every NaN payload — be aware that NaN comparison rules apply on read (NaN != NaN), so callers that store NaN weights must compare via bit pattern (or math.IsNaN) rather than the equality operator.
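The bit-level round trip and the NaN caveat can be checked with the standard library. The helper names below are hypothetical; the sketch reproduces the documented wire form (8 bytes, little-endian, via math.Float64bits) rather than calling the package.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
	"math"
)

var errShort = errors.New("float64 weight: need 8 bytes")

// encodeFloat64 appends the IEEE 754 bit pattern, little-endian.
func encodeFloat64(buf []byte, f float64) []byte {
	return binary.LittleEndian.AppendUint64(buf, math.Float64bits(f))
}

// decodeFloat64 rebuilds the value from its bit pattern.
func decodeFloat64(buf []byte) (float64, []byte, error) {
	if len(buf) < 8 {
		return 0, nil, errShort
	}
	return math.Float64frombits(binary.LittleEndian.Uint64(buf)), buf[8:], nil
}

func main() {
	nan := math.NaN()
	got, _, _ := decodeFloat64(encodeFloat64(nil, nan))
	// Equality fails for NaN; IsNaN and the bit pattern both succeed.
	fmt.Println(got == nan, math.IsNaN(got), math.Float64bits(got) == math.Float64bits(nan))

	// Note: the literal -0.0 is positive zero in Go, hence Copysign.
	negZero := math.Copysign(0, -1)
	z, _, _ := decodeFloat64(encodeFloat64(nil, negZero))
	fmt.Println(z == 0, math.Signbit(z))
}
```

The sketch prints `false true true` and then `true true`: the NaN survives but never compares equal, and negative zero keeps its sign bit.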

func NewInt64WeightCodec

func NewInt64WeightCodec() WeightCodec[int64]

NewInt64WeightCodec returns the canonical WeightCodec[int64]. The wire form is a signed varint, matching the Codec[int64] used for endpoint encoding so that the on-disk shape is symmetrical for the common signed-integer instantiation.
