Documentation
¶
Overview ¶
Package patches provides streaming patch application for snapshots.
This package implements the PatchApplier interface for applying patches to event streams without materializing full documents in memory.
Current implementation (InMemoryApplier) is temporary and materializes the full document. Future implementation (StreamingApplier) will apply patches incrementally to streaming events, only materializing small subtrees at patch target paths.
See docs/patch_design_reference.md for the full streaming design (Piece 2).
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type CollectedSubtree ¶ added in v0.0.20
CollectedSubtree represents a collected subtree with its path and IR node.
type EventReadCloser ¶
type EventReadCloser interface {
stream.EventReader
io.Closer
}
EventReadCloser extends stream.EventReader with Close for managing resources. This is specific to storage layer needs, not part of the general stream package.
func NewEmptyEventReader ¶
func NewEmptyEventReader() EventReadCloser
NewEmptyEventReader creates an empty event reader. Returns an EventReadCloser with a no-op Close method.
func NewSnapshotEventReader ¶
func NewSnapshotEventReader(r io.ReadCloser) EventReadCloser
NewSnapshotEventReader creates an event reader from a closable reader positioned at snapshot events. Takes ownership of the closer and will close it when Close() is called.
type EventWriteCloser ¶
type EventWriteCloser interface {
stream.EventWriter
io.Closer
}
EventWriteCloser extends stream.EventSink with Close for managing resources. This is specific to storage layer needs, not part of the general stream package.
func NewBufferEventSink ¶
func NewBufferEventSink(buf *bytes.Buffer) EventWriteCloser
NewBufferEventSink creates an event sink that writes to a byte buffer. Returns an EventWriteCloser with a no-op Close since buffers don't need closing.
func NewFileEventSink ¶
func NewFileEventSink(w io.WriteCloser) EventWriteCloser
NewFileEventSink creates an event sink that writes to a file or other closable writer. Takes ownership of the closer and will close it when Close() is called.
type InMemoryApplier ¶
type InMemoryApplier struct{}
InMemoryApplier is a temporary implementation that materializes the full document. This violates the streaming principle but provides a working implementation until the streaming processor (Piece 2 from patch_design_reference.md) is complete.
func NewInMemoryApplier ¶
func NewInMemoryApplier() *InMemoryApplier
NewInMemoryApplier creates an in-memory patch applier.
func (*InMemoryApplier) ApplyPatches ¶
func (a *InMemoryApplier) ApplyPatches(baseEvents stream.EventReader, patches []*ir.Node, sink stream.EventWriter) error
ApplyPatches materializes the full document, applies patches, converts back to events. TODO: Replace with streaming implementation that never materializes full document.
type PatchApplier ¶
type PatchApplier interface {
// ApplyPatches applies patches to base events, writes result to sink.
// Patches are applied in order.
ApplyPatches(baseEvents stream.EventReader, patches []*ir.Node, sink stream.EventWriter) error
}
PatchApplier applies patches to streaming events. Implementations may materialize subtrees or stream fully depending on design.
type PatchIndex ¶ added in v0.0.20
type PatchIndex struct {
// contains filtered or unexported fields
}
PatchIndex maps kinded paths to the dlog entries that affect them. Used by StreamingProcessor to identify which subtrees need patching.
func BuildPatchIndex ¶ added in v0.0.20
func BuildPatchIndex(entries []*dlog.Entry) *PatchIndex
BuildPatchIndex walks dlog entries to find nodes tagged with PatchRootTag. Returns an index mapping paths to the entries that affect them.
func NewPatchIndex ¶ added in v0.0.20
func NewPatchIndex() *PatchIndex
NewPatchIndex creates an empty PatchIndex.
func (*PatchIndex) HasPatches ¶ added in v0.0.20
func (pi *PatchIndex) HasPatches(path string) bool
HasPatches returns true if any patches affect the given path.
func (*PatchIndex) Lookup ¶ added in v0.0.20
func (pi *PatchIndex) Lookup(path string) []*dlog.Entry
Lookup returns the entries that affect the given path.
func (*PatchIndex) Paths ¶ added in v0.0.20
func (pi *PatchIndex) Paths() []string
Paths returns all paths that have patches.
type StreamingProcessor ¶ added in v0.0.20
type StreamingProcessor struct{}
StreamingProcessor applies patches to streaming events without materializing the full document. Only subtrees that need patching are materialized.
func NewStreamingProcessor ¶ added in v0.0.20
func NewStreamingProcessor() *StreamingProcessor
NewStreamingProcessor creates a new streaming patch processor.
func (*StreamingProcessor) ApplyPatches ¶ added in v0.0.20
func (sp *StreamingProcessor) ApplyPatches(baseEvents stream.EventReader, patches []*ir.Node, sink stream.EventWriter) error
ApplyPatches applies patches to base events, writing results to sink. Patches are applied in order for each patched path.
type SubtreeCollector ¶ added in v0.0.20
type SubtreeCollector struct {
// contains filtered or unexported fields
}
SubtreeCollector collects events for subtrees that need patching. Uses path tracking to identify patched subtrees and collect their events.
Path timing behavior (from stream.State): - EventKey("foo"): currentPath becomes "foo" immediately - Value event after key: currentPath is STILL "foo" (unchanged until next key) - EventBeginArray, then EventString: currentPath becomes "[0]" on the string
Collection algorithm (following path_finder.go pattern): 1. When path matches a patch path:
- EventKey/EventIntKey: Set collecting=true (don't collect the key itself)
- EventBegin*: Set collecting=true, depth=1, append event
- Scalar: Append single event, done
2. While collecting (depth > 0 or waiting for value after key):
- Begin*: depth++, append
- End*: append, depth--, done if depth=0
- Others: append
func NewSubtreeCollector ¶ added in v0.0.20
func NewSubtreeCollector(index *PatchIndex) *SubtreeCollector
NewSubtreeCollector creates a new SubtreeCollector with the given patch index.
func (*SubtreeCollector) IsCollecting ¶ added in v0.0.20
func (sc *SubtreeCollector) IsCollecting() bool
IsCollecting returns true if currently collecting a subtree.
func (*SubtreeCollector) ProcessEvent ¶ added in v0.0.20
func (sc *SubtreeCollector) ProcessEvent(event *stream.Event) (*CollectedSubtree, error)
ProcessEvent processes an event and returns a collected subtree if one is complete. Returns nil if no subtree is complete yet.
func (*SubtreeCollector) Reset ¶ added in v0.0.20
func (sc *SubtreeCollector) Reset()
Reset clears the collector state.