synchro

package module
v0.1.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 15, 2026 License: MIT Imports: 13 Imported by: 0

README

Synchro

Release Go Go Reference npm Maven Central SPM License Docs

Offline-first sync between PostgreSQL and native client SDKs for Swift, Kotlin, and React Native. Go server you can embed or deploy standalone. Your tables. Minimal changes.

What You Need

Component What Changes
Your tables Add deleted_at TIMESTAMPTZ NULL column
PostgreSQL wal_level = logical
Your server Register tables + wire 6 HTTP endpoints
Client app client.query() / client.execute() -- standard SQL

Why Synchro

  • Full bidirectional sync with built-in conflict resolution -- not read-only, not BYOB writes
  • Embed in your app or deploy standalone -- scale without rewriting
  • RLS-enforced authorization at the database layer -- Postgres guards your data, not application code
  • Native SDKs: Swift, Kotlin, React Native -- local SQLite, automatic change tracking, background sync

Install

go get github.com/trainstar/synchro

Server

registry := synchro.NewRegistry()
registry.Register(&synchro.TableConfig{
    TableName:   "workouts",
    OwnerColumn: "user_id",
})

engine, _ := synchro.NewEngine(synchro.Config{
    DB:       db,
    Registry: registry,
})

h := handler.New(engine)
http.HandleFunc("POST /sync/register",  h.ServeRegister)
http.HandleFunc("POST /sync/pull",      h.ServePull)
http.HandleFunc("POST /sync/push",      h.ServePush)
http.HandleFunc("POST /sync/snapshot",  h.ServeSnapshot)
http.HandleFunc("GET /sync/tables",     h.ServeTableMeta)
http.HandleFunc("GET /sync/schema",     h.ServeSchema)

Clients

Swift

let client = try SynchroClient(config: SynchroConfig(
    dbPath: dbPath, serverURL: url, authProvider: { token }, clientID: "device-1", appVersion: "1.0.0"
))
try await client.start()
let rows = try client.query("SELECT * FROM workouts")

Kotlin

val client = SynchroClient(SynchroConfig(
    dbPath = dbPath, serverURL = url, authProvider = { token }, clientID = "device-1", appVersion = "1.0.0"
), context)
client.start()
val rows = client.query("SELECT * FROM workouts")

React Native

const client = new SynchroClient({
    dbPath, serverURL: url, authProvider: () => getToken(), clientID: 'device-1', appVersion: '1.0.0',
});
await client.initialize();
await client.start();
const rows = await client.query('SELECT * FROM workouts');

Documentation

Index

Constants

View Source
const (
	PushStatusApplied           = "applied"
	PushStatusConflict          = "conflict"
	PushStatusRejectedTerminal  = "rejected_terminal"
	PushStatusRejectedRetryable = "rejected_retryable"
)

Push status constants.

View Source
const (
	SnapshotReasonInitialSyncRequired   = "initial_sync_required"
	SnapshotReasonCheckpointBeforeLimit = "checkpoint_before_retention"
	SnapshotReasonHistoryUnavailable    = "history_unavailable"
)

Pull snapshot reason constants.

View Source
const (
	DefaultKindNone       = "none"
	DefaultKindPortable   = "portable"
	DefaultKindServerOnly = "server_only"
)

Schema default kind constants.

View Source
const DefaultPullLimit = 100

DefaultPullLimit is the default number of records per pull.

View Source
const DefaultSnapshotLimit = 100

DefaultSnapshotLimit is the default number of records per snapshot page.

View Source
const MaxPullLimit = 1000

MaxPullLimit is the maximum records per pull.

View Source
const MaxSnapshotLimit = 1000

MaxSnapshotLimit is the maximum records per snapshot page.

Variables

View Source
var (
	// ErrUpgradeRequired indicates the client version is too old.
	ErrUpgradeRequired = errors.New("synchro: client upgrade required")

	// ErrOwnershipViolation indicates the user does not own the record.
	ErrOwnershipViolation = errors.New("synchro: ownership violation")

	// ErrRecordNotFound indicates the record does not exist.
	ErrRecordNotFound = errors.New("synchro: record not found")

	// ErrConflict indicates a conflict during push.
	ErrConflict = errors.New("synchro: conflict")

	// ErrStaleClient indicates the client's checkpoint is too far behind.
	ErrStaleClient = errors.New("synchro: stale client")

	// ErrSchemaMismatch indicates client and server schema contracts differ.
	ErrSchemaMismatch = errors.New("synchro: schema mismatch")

	// ErrUnsupportedSchemaFeature indicates the schema contains unsupported constructs.
	ErrUnsupportedSchemaFeature = errors.New("synchro: unsupported schema feature")

	// ErrTableNotRegistered indicates the table is not in the registry.
	ErrTableNotRegistered = errors.New("synchro: table not registered")

	// ErrTableReadOnly indicates the table does not accept push operations.
	ErrTableReadOnly = errors.New("synchro: table is read-only")

	// ErrInvalidOperation indicates an unknown push operation type.
	ErrInvalidOperation = errors.New("synchro: invalid operation")

	// ErrClientNotRegistered indicates the client has not been registered.
	ErrClientNotRegistered = errors.New("synchro: client not registered")

	// ErrUnregisteredParent indicates a table references a parent not in the registry.
	ErrUnregisteredParent = errors.New("synchro: unregistered parent table")

	// ErrCycleDetected indicates a cycle exists in the parent chain.
	ErrCycleDetected = errors.New("synchro: cycle detected in parent chain")

	// ErrOrphanedChain indicates the parent chain root has no OwnerColumn.
	ErrOrphanedChain = errors.New("synchro: orphaned parent chain")

	// ErrMissingOwnership indicates a pushable table has no ownership path.
	ErrMissingOwnership = errors.New("synchro: pushable table has no ownership path")

	// ErrMissingParentFKCol indicates ParentTable is set without ParentFKCol.
	ErrMissingParentFKCol = errors.New("synchro: ParentTable set without ParentFKCol")

	// ErrRedundantProtected indicates a ProtectedColumns entry is redundant.
	ErrRedundantProtected = errors.New("synchro: redundant protected column")

	// ErrInvalidPushPolicy indicates an unsupported push policy value.
	ErrInvalidPushPolicy = errors.New("synchro: invalid push policy")

	// ErrInvalidBucketConfig indicates contradictory bucket configuration.
	ErrInvalidBucketConfig = errors.New("synchro: invalid bucket configuration")

	// ErrSnapshotRequired indicates the client must rebuild from a full snapshot.
	ErrSnapshotRequired = errors.New("synchro: snapshot required")
)

Functions

func CheckVersion

func CheckVersion(clientVersion, minVersion string) error

CheckVersion returns ErrUpgradeRequired if clientVersion < minVersion.

func GenerateRLSPolicies

func GenerateRLSPolicies(registry *Registry) []string

GenerateRLSPolicies generates SQL statements to enable RLS and create policies for all registered tables based on registry configuration.

Ownership comparisons use text equality instead of hard-coded UUID casts so integrations can use non-UUID user identifiers.

func SetAuthContext

func SetAuthContext(ctx context.Context, tx DB, userID string) error

SetAuthContext sets the RLS auth context for push operations. Uses set_config() because SET LOCAL does not accept parameterized values.

Types

type AcceptedRecord

type AcceptedRecord struct {
	ID        string
	TableName string
	Operation Operation
}

AcceptedRecord contains information about a successfully pushed record.

type BucketUpdate

type BucketUpdate struct {
	Added   []string `json:"added,omitempty"`
	Removed []string `json:"removed,omitempty"`
}

BucketUpdate describes changes to a client's bucket subscriptions.

type ChangelogEntry

type ChangelogEntry struct {
	Seq       int64     `json:"seq"`
	BucketID  string    `json:"bucket_id"`
	TableName string    `json:"table_name"`
	RecordID  string    `json:"record_id"`
	Operation Operation `json:"operation"`
	CreatedAt time.Time `json:"created_at"`
}

ChangelogEntry represents a single entry in the sync changelog.

type Client

type Client struct {
	ID          string     `json:"id"`
	UserID      string     `json:"user_id"`
	ClientID    string     `json:"client_id"`
	ClientName  *string    `json:"client_name,omitempty"`
	Platform    string     `json:"platform"`
	AppVersion  string     `json:"app_version"`
	BucketSubs  []string   `json:"bucket_subs,omitempty"`
	LastSyncAt  *time.Time `json:"last_sync_at,omitempty"`
	LastPullAt  *time.Time `json:"last_pull_at,omitempty"`
	LastPushAt  *time.Time `json:"last_push_at,omitempty"`
	LastPullSeq *int64     `json:"last_pull_seq,omitempty"`
	IsActive    bool       `json:"is_active"`
	CreatedAt   time.Time  `json:"created_at"`
	UpdatedAt   time.Time  `json:"updated_at"`
}

Client represents a registered sync client device.

type CompactResult

type CompactResult struct {
	DeactivatedClients int64
	SafeSeq            int64
	DeletedEntries     int64
}

CompactResult reports what a compaction run accomplished.

type Compactor

type Compactor struct {
	// contains filtered or unexported fields
}

Compactor runs changelog compaction.

func NewCompactor

func NewCompactor(cfg *CompactorConfig) *Compactor

NewCompactor creates a Compactor from the given config.

func (*Compactor) Compact

func (c *Compactor) Compact(ctx context.Context, db DB, safeSeq int64) (int64, error)

Compact deletes changelog entries with seq <= safeSeq in batches.

func (*Compactor) DeactivateStaleClients

func (c *Compactor) DeactivateStaleClients(ctx context.Context, db DB) (int64, error)

DeactivateStaleClients marks clients as inactive if they haven't synced within the threshold.

func (*Compactor) RunCompaction

func (c *Compactor) RunCompaction(ctx context.Context, db DB) (CompactResult, error)

RunCompaction orchestrates a full compaction: deactivate stale clients, compute the safe boundary, then delete old changelog entries.

func (*Compactor) SafeSeq

func (c *Compactor) SafeSeq(ctx context.Context, db DB) (int64, error)

SafeSeq returns the minimum last_pull_seq across all active clients. Returns 0 if there are no active clients (nothing safe to compact).

type CompactorConfig

type CompactorConfig struct {
	// StaleThreshold is how long since last sync before a client is deactivated.
	// Default: 7 days.
	StaleThreshold time.Duration

	// BatchSize is the maximum rows deleted per batch. Default: 10000.
	BatchSize int

	// Logger for compaction operations. Falls back to slog.Default().
	Logger *slog.Logger
}

CompactorConfig configures changelog compaction.

type Config

type Config struct {
	// DB is the database connection pool.
	DB *sql.DB

	// Registry holds table configurations.
	Registry *Registry

	// Hooks for lifecycle callbacks.
	Hooks Hooks

	// ConflictResolver resolves push conflicts. Defaults to LWW.
	ConflictResolver ConflictResolver

	// Ownership resolves record ownership for bucketing. Defaults to JoinResolver.
	Ownership OwnershipResolver

	// MinClientVersion is the minimum supported client version (semver).
	MinClientVersion string

	// ClockSkewTolerance is added to client timestamps during LWW comparison.
	ClockSkewTolerance time.Duration

	// Compactor configures changelog compaction. Optional.
	Compactor *CompactorConfig

	// Logger for sync operations. Defaults to slog.Default().
	Logger *slog.Logger
}

Config configures the sync engine.

type Conflict

type Conflict struct {
	Table       string
	RecordID    string
	ClientID    string
	UserID      string
	ClientData  json.RawMessage
	ServerData  json.RawMessage
	ClientTime  time.Time
	ServerTime  time.Time
	BaseVersion *time.Time
}

Conflict provides full context for conflict resolution.

type ConflictResolver

type ConflictResolver interface {
	Resolve(ctx context.Context, conflict Conflict) (Resolution, error)
}

ConflictResolver resolves conflicts between client and server data.

type DB

type DB interface {
	ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error)
	QueryRowContext(ctx context.Context, query string, args ...any) *sql.Row
	QueryContext(ctx context.Context, query string, args ...any) (*sql.Rows, error)
}

DB is a minimal database interface using database/sql stdlib types only. Both *sql.DB and *sql.Tx satisfy this interface, as do sqlx equivalents.

type DeleteEntry

type DeleteEntry struct {
	ID        string `json:"id"`
	TableName string `json:"table_name"`
}

DeleteEntry represents a deleted record in a pull response.

type Engine

type Engine struct {
	// contains filtered or unexported fields
}

Engine is the top-level sync orchestrator.

func NewEngine

func NewEngine(cfg Config) (*Engine, error)

NewEngine creates a new sync engine from the given configuration.

func (*Engine) CurrentSchemaManifest

func (e *Engine) CurrentSchemaManifest(ctx context.Context) (int64, string, error)

CurrentSchemaManifest returns current server schema version/hash.

func (*Engine) Pull

func (e *Engine) Pull(ctx context.Context, userID string, req *PullRequest) (*PullResponse, error)

Pull retrieves changes for a client since their checkpoint.

func (*Engine) Push

func (e *Engine) Push(ctx context.Context, userID string, req *PushRequest) (*PushResponse, error)

Push processes client changes within a transaction.

func (*Engine) RegisterClient

func (e *Engine) RegisterClient(ctx context.Context, userID string, req *RegisterRequest) (*RegisterResponse, error)

RegisterClient registers or updates a sync client.

func (*Engine) Registry

func (e *Engine) Registry() *Registry

Registry returns the engine's table registry.

func (*Engine) RunCompaction

func (e *Engine) RunCompaction(ctx context.Context) (CompactResult, error)

RunCompaction runs a single compaction cycle. Returns an error if no Compactor was configured.

func (*Engine) Schema

func (e *Engine) Schema(ctx context.Context) (*SchemaResponse, error)

Schema returns the full server schema contract.

func (*Engine) SchemaManifestHistory

func (e *Engine) SchemaManifestHistory(ctx context.Context, limit int) ([]SchemaManifestEntry, error)

SchemaManifestHistory returns persisted manifest rows ordered by newest version first.

func (*Engine) Snapshot

func (e *Engine) Snapshot(ctx context.Context, userID string, req *SnapshotRequest) (*SnapshotResponse, error)

Snapshot performs a full snapshot for a client. Returns paginated results using a stateless cursor.

func (*Engine) StartCompaction

func (e *Engine) StartCompaction(ctx context.Context, interval time.Duration)

StartCompaction runs compaction on a recurring interval in a background goroutine. The goroutine stops when ctx is cancelled. No-op if no Compactor was configured.

func (*Engine) TableMetadata

func (e *Engine) TableMetadata(ctx context.Context) (*TableMetaResponse, error)

TableMetadata returns sync metadata for all registered tables.

type Hooks

type Hooks struct {
	// OnPushAccepted is called within the push transaction after records are applied.
	// Use this for side effects like rebuilding search indexes.
	OnPushAccepted func(ctx context.Context, tx *sql.Tx, accepted []AcceptedRecord) error

	// OnConflict is called when a conflict is detected and resolved.
	// This is informational — it cannot change the resolution.
	OnConflict func(ctx context.Context, conflict Conflict, resolution Resolution)

	// OnPullComplete is called after a successful pull operation.
	OnPullComplete func(ctx context.Context, clientID string, checkpoint int64, count int)

	// OnSchemaIncompatible is called when a client's version is below minimum.
	OnSchemaIncompatible func(ctx context.Context, clientID string, clientVer string, minVer string)

	// OnStaleClient is called when a client hasn't synced recently.
	// Return true to allow the sync, false to reject with ErrStaleClient.
	OnStaleClient func(ctx context.Context, clientID string, lastSync time.Time) bool

	// OnCompaction is called after a successful compaction run.
	OnCompaction func(ctx context.Context, result CompactResult)

	// OnSnapshotRequired is called when a client must rebuild from a full snapshot.
	OnSnapshotRequired func(ctx context.Context, clientID string, checkpoint int64, minSeq int64, reason string)
}

Hooks defines lifecycle callbacks for sync operations. All hooks are optional — nil hooks are skipped.

type JoinResolver

type JoinResolver struct {
	// contains filtered or unexported fields
}

JoinResolver is the default OwnershipResolver that uses registry metadata. It also implements wal.BucketAssigner when constructed with NewJoinResolverWithDB.

func NewJoinResolver

func NewJoinResolver(registry *Registry) *JoinResolver

NewJoinResolver creates a JoinResolver from a registry. The returned resolver can be used as an OwnershipResolver but not as a wal.BucketAssigner (no DB handle). Use NewJoinResolverWithDB for WAL consumer usage.

func NewJoinResolverWithDB

func NewJoinResolverWithDB(registry *Registry, db DB) *JoinResolver

NewJoinResolverWithDB creates a JoinResolver that also satisfies wal.BucketAssigner. The db handle is used by AssignBuckets to resolve parent-chain ownership.

func (*JoinResolver) AssignBuckets

func (r *JoinResolver) AssignBuckets(ctx context.Context, table string, recordID string, _ Operation, data map[string]any) ([]string, error)

AssignBuckets implements wal.BucketAssigner by delegating to ResolveOwner.

func (*JoinResolver) ResolveOwner

func (r *JoinResolver) ResolveOwner(ctx context.Context, db DB, table string, recordID string, data map[string]any) ([]string, error)

ResolveOwner determines bucket IDs for a record.

type LWWResolver

type LWWResolver struct {
	ClockSkewTolerance time.Duration
}

LWWResolver implements Last-Write-Wins conflict resolution. The client timestamp is adjusted by ClockSkewTolerance before comparison.

func (*LWWResolver) Resolve

func (r *LWWResolver) Resolve(_ context.Context, c Conflict) (Resolution, error)

Resolve implements ConflictResolver using LWW semantics.

type Operation

type Operation int

Operation represents the type of change in a sync operation.

const (
	OpInsert Operation = 1
	OpUpdate Operation = 2
	OpDelete Operation = 3
)

func ParseOperation

func ParseOperation(s string) (Operation, bool)

ParseOperation converts a string to an Operation.

func (Operation) String

func (o Operation) String() string

String returns the string representation of an Operation.

type OwnershipResolver

type OwnershipResolver interface {
	ResolveOwner(ctx context.Context, db DB, table string, recordID string, data map[string]any) ([]string, error)
}

OwnershipResolver determines which buckets a record belongs to.

type PullRequest

type PullRequest struct {
	ClientID      string   `json:"client_id"`
	Checkpoint    int64    `json:"checkpoint"`
	Tables        []string `json:"tables,omitempty"`
	Limit         int      `json:"limit,omitempty"`
	KnownBuckets  []string `json:"known_buckets,omitempty"`
	SchemaVersion int64    `json:"schema_version,omitempty"`
	SchemaHash    string   `json:"schema_hash,omitempty"`
}

PullRequest is the request for pulling changes.

type PullResponse

type PullResponse struct {
	Changes          []Record      `json:"changes"`
	Deletes          []DeleteEntry `json:"deletes"`
	Checkpoint       int64         `json:"checkpoint"`
	HasMore          bool          `json:"has_more"`
	SnapshotRequired bool          `json:"snapshot_required,omitempty"`
	SnapshotReason   string        `json:"snapshot_reason,omitempty"`
	BucketUpdates    *BucketUpdate `json:"bucket_updates,omitempty"`
	SchemaVersion    int64         `json:"schema_version"`
	SchemaHash       string        `json:"schema_hash"`
}

PullResponse is the response for pulling changes.

type PushPolicy

type PushPolicy string

PushPolicy defines whether a table accepts client writes.

const (
	// PushPolicyDisabled means the table is read-only from clients.
	PushPolicyDisabled PushPolicy = "disabled"
	// PushPolicyOwnerOnly means pushes are allowed when ownership can be resolved.
	PushPolicyOwnerOnly PushPolicy = "owner_only"
)

type PushRecord

type PushRecord struct {
	ID              string          `json:"id"`
	TableName       string          `json:"table_name"`
	Operation       string          `json:"operation"`
	Data            json.RawMessage `json:"data,omitempty"`
	ClientUpdatedAt time.Time       `json:"client_updated_at"`
	BaseUpdatedAt   *time.Time      `json:"base_updated_at,omitempty"`
}

PushRecord represents a single record being pushed from the client.

type PushRequest

type PushRequest struct {
	ClientID      string       `json:"client_id"`
	Changes       []PushRecord `json:"changes"`
	SchemaVersion int64        `json:"schema_version,omitempty"`
	SchemaHash    string       `json:"schema_hash,omitempty"`
}

PushRequest is the request body for pushing changes.

type PushResponse

type PushResponse struct {
	Accepted      []PushResult `json:"accepted"`
	Rejected      []PushResult `json:"rejected"`
	Checkpoint    int64        `json:"checkpoint"`
	ServerTime    time.Time    `json:"server_time"`
	SchemaVersion int64        `json:"schema_version"`
	SchemaHash    string       `json:"schema_hash"`
}

PushResponse is the response for pushing changes.

type PushResult

type PushResult struct {
	ID              string     `json:"id"`
	TableName       string     `json:"table_name"`
	Operation       string     `json:"operation"`
	Status          string     `json:"status"`
	ReasonCode      string     `json:"reason_code,omitempty"`
	Message         string     `json:"message,omitempty"`
	ServerVersion   *Record    `json:"server_version,omitempty"`
	ServerUpdatedAt *time.Time `json:"server_updated_at,omitempty"`
	ServerDeletedAt *time.Time `json:"server_deleted_at,omitempty"`
}

PushResult represents the result of processing a single push record.

type Record

type Record struct {
	ID        string          `json:"id"`
	TableName string          `json:"table_name"`
	Data      json.RawMessage `json:"data"`
	UpdatedAt time.Time       `json:"updated_at"`
	DeletedAt *time.Time      `json:"deleted_at,omitempty"`
}

Record represents a single synced record.

type RegisterRequest

type RegisterRequest struct {
	ClientID      string  `json:"client_id"`
	ClientName    *string `json:"client_name,omitempty"`
	Platform      string  `json:"platform"`
	AppVersion    string  `json:"app_version"`
	SchemaVersion int64   `json:"schema_version,omitempty"`
	SchemaHash    string  `json:"schema_hash,omitempty"`
}

RegisterRequest is the request body for client registration.

type RegisterResponse

type RegisterResponse struct {
	ID            string     `json:"id"`
	ServerTime    time.Time  `json:"server_time"`
	LastSyncAt    *time.Time `json:"last_sync_at,omitempty"`
	Checkpoint    int64      `json:"checkpoint"`
	SchemaVersion int64      `json:"schema_version"`
	SchemaHash    string     `json:"schema_hash"`
}

RegisterResponse is the response for client registration.

type Registry

type Registry struct {
	// contains filtered or unexported fields
}

Registry holds all syncable table configurations.

func NewRegistry

func NewRegistry() *Registry

NewRegistry creates a new empty Registry.

func (*Registry) All

func (r *Registry) All() []*TableConfig

All returns all registered table configurations in registration order.

func (*Registry) Get

func (r *Registry) Get(name string) *TableConfig

Get returns the configuration for a table, or nil if not registered.

func (*Registry) IsPushable

func (r *Registry) IsPushable(tableName string) bool

IsPushable returns true if the table accepts push operations.

func (*Registry) IsRegistered

func (r *Registry) IsRegistered(tableName string) bool

IsRegistered returns true if the table is registered.

func (*Registry) Register

func (r *Registry) Register(cfg *TableConfig)

Register adds a table configuration to the registry.

func (*Registry) TableNames

func (r *Registry) TableNames() []string

TableNames returns the names of all registered tables.

func (*Registry) Validate

func (r *Registry) Validate() error

Validate checks all registered tables for configuration errors.

type Resolution

type Resolution struct {
	// Winner is "client" or "server".
	Winner string
	// Reason explains the resolution.
	Reason string
}

Resolution describes how a conflict was resolved.

type SQLBucketResolver

type SQLBucketResolver struct {
	// contains filtered or unexported fields
}

SQLBucketResolver resolves row buckets via SQL rules first, then optional Go fallback.

func NewSQLBucketResolver

func NewSQLBucketResolver(cfg SQLBucketResolverConfig) (*SQLBucketResolver, error)

NewSQLBucketResolver creates a SQL-first resolver.

func (*SQLBucketResolver) AssignBuckets

func (r *SQLBucketResolver) AssignBuckets(
	ctx context.Context,
	table string,
	recordID string,
	operation Operation,
	data map[string]any,
) ([]string, error)

AssignBuckets resolves buckets for a row change. This method matches wal.BucketAssigner.

type SQLBucketResolverConfig

type SQLBucketResolverConfig struct {
	// DB is used for SQL rule execution and optional fallback lookups.
	DB DB

	// Registry provides table config and default ownership metadata.
	Registry *Registry

	// GlobalFunction is the SQL resolver function used when a table does not
	// define TableConfig.BucketFunction.
	// Signature: (table_name text, operation text, new_row jsonb, old_row jsonb) -> setof text.
	GlobalFunction string

	// EnableGoFallback allows fallback to OwnershipResolver when SQL rule execution fails.
	// Default false for strict SQL-only operation.
	EnableGoFallback bool

	// GoFallback is used only when EnableGoFallback is true.
	// Defaults to JoinResolver.
	GoFallback OwnershipResolver

	// MaxBucketsPerEvent bounds fanout for a single row change. 0 means unlimited.
	MaxBucketsPerEvent int

	// Logger receives fallback warnings.
	Logger *slog.Logger
}

SQLBucketResolverConfig configures SQL-first bucket resolution.

type SchemaColumn

type SchemaColumn struct {
	Name             string `json:"name"`
	DBType           string `json:"db_type"`
	LogicalType      string `json:"logical_type"`
	Nullable         bool   `json:"nullable"`
	DefaultSQL       string `json:"default_sql,omitempty"`
	DefaultKind      string `json:"default_kind"`
	SQLiteDefaultSQL string `json:"sqlite_default_sql,omitempty"`
	IsPrimaryKey     bool   `json:"is_primary_key"`
}

SchemaColumn describes a table column for client-side table creation.

type SchemaManifestEntry

type SchemaManifestEntry struct {
	SchemaVersion int64     `json:"schema_version"`
	SchemaHash    string    `json:"schema_hash"`
	CreatedAt     time.Time `json:"created_at"`
}

SchemaManifestEntry represents one persisted schema manifest version.

type SchemaResponse

type SchemaResponse struct {
	SchemaVersion int64         `json:"schema_version"`
	SchemaHash    string        `json:"schema_hash"`
	ServerTime    time.Time     `json:"server_time"`
	Tables        []SchemaTable `json:"tables"`
}

SchemaResponse returns the full sync schema contract.

type SchemaTable

type SchemaTable struct {
	TableName            string         `json:"table_name"`
	PushPolicy           string         `json:"push_policy"`
	ParentTable          string         `json:"parent_table,omitempty"`
	ParentFKCol          string         `json:"parent_fk_col,omitempty"`
	Dependencies         []string       `json:"dependencies,omitempty"`
	UpdatedAtColumn      string         `json:"updated_at_column"`
	DeletedAtColumn      string         `json:"deleted_at_column"`
	PrimaryKey           []string       `json:"primary_key"`
	BucketByColumn       string         `json:"bucket_by_column,omitempty"`
	BucketPrefix         string         `json:"bucket_prefix,omitempty"`
	GlobalWhenBucketNull bool           `json:"global_when_bucket_null,omitempty"`
	AllowGlobalRead      bool           `json:"allow_global_read,omitempty"`
	BucketFunction       string         `json:"bucket_function,omitempty"`
	Columns              []SchemaColumn `json:"columns"`
}

SchemaTable describes table metadata and column definitions.

type ServerWinsResolver

type ServerWinsResolver struct{}

ServerWinsResolver always resolves in favor of the server.

func (*ServerWinsResolver) Resolve

Resolve implements ConflictResolver by always choosing the server.

type SnapshotCursor

type SnapshotCursor struct {
	Checkpoint int64  `json:"checkpoint"`
	TableIndex int    `json:"table_idx"`
	AfterID    string `json:"after_id"`
}

SnapshotCursor tracks pagination state across snapshot pages.

type SnapshotRequest

type SnapshotRequest struct {
	ClientID      string          `json:"client_id"`
	Cursor        *SnapshotCursor `json:"cursor,omitempty"`
	Limit         int             `json:"limit,omitempty"`
	SchemaVersion int64           `json:"schema_version,omitempty"`
	SchemaHash    string          `json:"schema_hash,omitempty"`
}

SnapshotRequest is the request for a full snapshot.

type SnapshotResponse

type SnapshotResponse struct {
	Records       []Record        `json:"records"`
	Cursor        *SnapshotCursor `json:"cursor,omitempty"`
	Checkpoint    int64           `json:"checkpoint"`
	HasMore       bool            `json:"has_more"`
	SchemaVersion int64           `json:"schema_version"`
	SchemaHash    string          `json:"schema_hash"`
}

SnapshotResponse is the response for a full snapshot page.

type TableConfig

type TableConfig struct {
	// TableName is the database table name.
	TableName string

	// PushPolicy controls whether pushes are accepted for this table.
	// Default: owner_only when OwnerColumn/ParentTable is set, otherwise disabled.
	PushPolicy PushPolicy

	// OwnerColumn is the column name for user ownership.
	OwnerColumn string

	// ParentTable is the parent table name for child tables.
	ParentTable string

	// ParentFKCol is the foreign key column pointing to the parent table.
	ParentFKCol string

	// SyncColumns specifies which columns to include in pull responses (nil = all columns).
	SyncColumns []string

	// Dependencies lists tables that must sync first (for push ordering).
	Dependencies []string

	// IDColumn is the primary key column name (defaults to "id").
	IDColumn string

	// UpdatedAtColumn is the timestamp column for change tracking (defaults to "updated_at").
	UpdatedAtColumn string

	// DeletedAtColumn is the soft delete column (defaults to "deleted_at").
	DeletedAtColumn string

	// ProtectedColumns are additional columns (beyond defaults) that clients may not write.
	// Default protected: id, created_at, updated_at, deleted_at, OwnerColumn.
	ProtectedColumns []string

	// BucketByColumn enables fast-path bucketing without SQL function execution.
	// Bucket ID is formed as BucketPrefix + value(column).
	BucketByColumn string

	// BucketPrefix prefixes fast-path bucket values. Default "user:".
	BucketPrefix string

	// GlobalWhenBucketNull emits "global" when BucketByColumn value is NULL/empty.
	GlobalWhenBucketNull bool

	// AllowGlobalRead enables NULL-owner rows to be readable by all users via RLS.
	// Default false.
	AllowGlobalRead bool

	// BucketFunction optionally overrides the global SQL bucket resolver function
	// for this table. Signature:
	//   (table_name text, operation text, new_row jsonb, old_row jsonb) -> setof text.
	BucketFunction string
	// contains filtered or unexported fields
}

TableConfig defines sync behavior for a table.

func (*TableConfig) AllowedInsertColumns

func (c *TableConfig) AllowedInsertColumns(dataCols []string) []string

AllowedInsertColumns returns the set of columns allowed in an INSERT. This includes all non-protected columns plus id, owner col, and parent FK col (which are set once on creation).

func (*TableConfig) AllowedUpdateColumns

func (c *TableConfig) AllowedUpdateColumns(dataCols []string) []string

AllowedUpdateColumns returns columns from dataCols that are allowed in an UPDATE. Only non-protected columns — no PKs, ownership FKs, or timestamps.

func (*TableConfig) IsProtected

func (c *TableConfig) IsProtected(col string) bool

IsProtected returns true if the column is protected from client writes.

type TableMeta

type TableMeta struct {
	TableName            string   `json:"table_name"`
	PushPolicy           string   `json:"push_policy"`
	Dependencies         []string `json:"dependencies"`
	ParentTable          string   `json:"parent_table,omitempty"`
	ParentFKCol          string   `json:"parent_fk_col,omitempty"`
	UpdatedAtColumn      string   `json:"updated_at_column,omitempty"`
	DeletedAtColumn      string   `json:"deleted_at_column,omitempty"`
	BucketByColumn       string   `json:"bucket_by_column,omitempty"`
	BucketPrefix         string   `json:"bucket_prefix,omitempty"`
	GlobalWhenBucketNull bool     `json:"global_when_bucket_null,omitempty"`
	AllowGlobalRead      bool     `json:"allow_global_read,omitempty"`
	BucketFunction       string   `json:"bucket_function,omitempty"`
}

TableMeta describes a single table's sync configuration.

type TableMetaResponse

type TableMetaResponse struct {
	Tables        []TableMeta `json:"tables"`
	ServerTime    time.Time   `json:"server_time"`
	SchemaVersion int64       `json:"schema_version"`
	SchemaHash    string      `json:"schema_hash"`
}

TableMetaResponse is the response for the table metadata endpoint.

type TxBeginner

type TxBeginner interface {
	BeginTx(ctx context.Context, opts *sql.TxOptions) (*sql.Tx, error)
}

TxBeginner can start a transaction. *sql.DB satisfies this.

Directories

Path Synopsis
cmd
synchrod command
synchrotestdb command

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL