autosync

package
v1.14.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 26, 2026 License: MIT Imports: 13 Imported by: 0

Documentation

Overview

Package autosync implements a lease-guarded background sync manager for Engram's local-first cloud replication.

The manager runs in long-lived local processes (serve, mcp) and:

  • Acquires a SQLite-backed lease to prevent duplicate workers.
  • Pushes pending local mutations to the cloud server.
  • Pulls remote mutations by cursor and applies them locally.
  • Supports debounced wake on dirty state and periodic freshness checks.
  • Uses exponential backoff with jitter on failures, bounded by max retries.
  • Tracks degraded state (phase, last error, backoff timing).
  • Shuts down gracefully via context cancellation.

Index

Constants

View Source
const (
	PhaseIdle       = "idle"
	PhasePushing    = "pushing"
	PhasePulling    = "pulling"
	PhasePushFailed = "push_failed"
	PhasePullFailed = "pull_failed"
	PhaseBackoff    = "backoff"
	PhaseHealthy    = "healthy"
	PhaseDisabled   = "disabled"
)

Variables

This section is empty.

Functions

This section is empty.

Types

type CloudTransport

type CloudTransport interface {
	PushMutations(mutations []MutationEntry) (*PushMutationsResult, error)
	PullMutations(sinceSeq int64, limit int) (*PullMutationsResponse, error)
}

CloudTransport is the subset of remote.MutationTransport methods the manager needs.

type Config

type Config struct {
	TargetKey              string        // sync_state target key (default: "cloud")
	LeaseOwner             string        // unique owner identity for lease
	LeaseInterval          time.Duration // how long to hold the lease each cycle
	DebounceDuration       time.Duration // debounce window for dirty notifications
	PollInterval           time.Duration // periodic freshness check while idle
	PushBatchSize          int           // max mutations per push request
	PullBatchSize          int           // max mutations per pull request
	MaxConsecutiveFailures int           // stop retrying after this many consecutive failures
	BaseBackoff            time.Duration // base duration for exponential backoff
	MaxBackoff             time.Duration // ceiling for backoff duration
}

Config holds tuning parameters for the background sync manager.

func DefaultConfig

func DefaultConfig() Config

DefaultConfig returns sensible production defaults.

type LocalStore

type LocalStore interface {
	GetSyncState(targetKey string) (*store.SyncState, error)
	ListPendingSyncMutations(targetKey string, limit int) ([]store.SyncMutation, error)
	AckSyncMutations(targetKey string, lastAckedSeq int64) error
	AckSyncMutationSeqs(targetKey string, seqs []int64) error
	SkipAckNonEnrolledMutations(targetKey string) (int64, error)
	AcquireSyncLease(targetKey, owner string, ttl time.Duration, now time.Time) (bool, error)
	ReleaseSyncLease(targetKey, owner string) error
	ApplyPulledMutation(targetKey string, mutation store.SyncMutation) error
	MarkSyncFailure(targetKey, message string, backoffUntil time.Time) error
	MarkSyncHealthy(targetKey string) error
}

LocalStore is the subset of store.Store methods the manager needs.

type Manager

type Manager struct {
	// contains filtered or unexported fields
}

Manager coordinates background push/pull sync between local SQLite and the cloud server. It is safe for concurrent use.

func New

func New(localStore LocalStore, transport CloudTransport, cfg Config) *Manager

New creates a new background sync manager.

func (*Manager) NotifyDirty

func (m *Manager) NotifyDirty()

NotifyDirty signals the manager that local state has changed. Non-blocking; coalesces multiple calls via a buffered channel.

func (*Manager) ResumeAfterUpgrade added in v1.13.0

func (m *Manager) ResumeAfterUpgrade(project string) error

ResumeAfterUpgrade clears the disabled flag and sets phase to PhaseIdle, re-arming the run loop without requiring a full Manager restart. If the Manager was not disabled, this is a no-op.

func (*Manager) Run

func (m *Manager) Run(ctx context.Context)

Run is the main loop. It blocks until the context is cancelled or Stop() is called. On shutdown it releases the lease and returns. The run body is wrapped in recover() — a panic inside cycle() sets PhaseBackoff with reason_code=internal_error and logs the stack trace. BW4: Re-entry guard — a second concurrent Run call returns immediately.

func (*Manager) Status

func (m *Manager) Status() Status

Status returns the current degraded-state snapshot. Thread-safe.

func (*Manager) Stop added in v1.13.0

func (m *Manager) Stop()

Stop cancels the internal context and waits for all goroutines to exit. Safe to call before Run — returns immediately in that case.

func (*Manager) StopForUpgrade added in v1.13.0

func (m *Manager) StopForUpgrade(project string) error

StopForUpgrade sets PhaseDisabled and prevents further cycles. The sync lease is NOT released so no other worker picks it up during upgrade.

type MutationEntry added in v1.13.0

type MutationEntry struct {
	Project   string          `json:"project"`
	Entity    string          `json:"entity"`
	EntityKey string          `json:"entity_key"`
	Op        string          `json:"op"`
	Payload   json.RawMessage `json:"payload"`
}

type PullMutationsResponse added in v1.13.0

type PullMutationsResponse struct {
	Mutations []PulledMutation `json:"mutations"`
	HasMore   bool             `json:"has_more"`
	LatestSeq int64            `json:"latest_seq"`
}

type PulledMutation added in v1.13.0

type PulledMutation struct {
	Seq        int64           `json:"seq"`
	Entity     string          `json:"entity"`
	EntityKey  string          `json:"entity_key"`
	Op         string          `json:"op"`
	Payload    json.RawMessage `json:"payload"`
	OccurredAt string          `json:"occurred_at"`
}

type PushMutationsResult added in v1.13.0

type PushMutationsResult struct {
	AcceptedSeqs []int64 `json:"accepted_seqs"`
}

type Status

type Status struct {
	Phase               string     `json:"phase"`
	LastError           string     `json:"last_error,omitempty"`
	ConsecutiveFailures int        `json:"consecutive_failures"`
	BackoffUntil        *time.Time `json:"backoff_until,omitempty"`
	LastSyncAt          *time.Time `json:"last_sync_at,omitempty"`
	ReasonCode          string     `json:"reason_code,omitempty"`
	ReasonMessage       string     `json:"reason_message,omitempty"`
}

Status represents the current degraded-state snapshot of the manager.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL