dotc1z

package
v0.10.0 Latest
Warning

This package is not in the latest version of its module.

Published: May 29, 2026 License: Apache-2.0 Imports: 48 Imported by: 2

Documentation

Index

Constants

const DefaultFinalizeTimeout = 1 * time.Hour

DefaultFinalizeTimeout bounds the detached context used for c1z finalization (WAL checkpoint, save-to-disk, upload). Large tenants take 5-15 min to upload a 4 GB compressed c1z; one hour gives plenty of headroom while still being a hard ceiling so a wedged upload cannot hold a worker indefinitely.

Variables

var (
	ErrInvalidFile        = fmt.Errorf("c1z: invalid file")
	ErrMaxSizeExceeded    = fmt.Errorf("c1z: max decoded size exceeded, increase DecoderMaxDecodedSize using %v environment variable", maxDecodedSizeEnvVar)
	ErrWindowSizeExceeded = fmt.Errorf("c1z: window size exceeded, increase DecoderMaxMemory using %v  environment variable", maxDecoderMemorySizeEnv)
)

var C1Z3FileHeader = []byte("C1Z3\x00")

C1Z3FileHeader is the magic byte sequence for v3 files.

var C1ZFileHeader = []byte("C1ZF\x00")

var ErrDbNotOpen = errors.New("c1file: database has not been opened")

var ErrEngineNotAvailable = fmt.Errorf("dotc1z: engine not available")

ErrEngineNotAvailable is returned when a caller requests an engine that the binary does not support.

var ErrReadOnly = errors.New("c1z: read only mode")

Functions

func C1ZFileCheckHeader

func C1ZFileCheckHeader(f io.ReadSeeker) (bool, error)

C1ZFileCheckHeader reads len(C1ZFileHeader) bytes from the given io.ReadSeeker and compares them to C1ZFileHeader. Returns true if the header is valid. Returns any errors from Read() or Seek(). If a nil error is returned, the given io.ReadSeeker will be pointing to the first byte of the stream, and is suitable to be passed to NewC1ZFileDecoder.
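The header check described above can be approximated with the standard library alone. This is a minimal sketch, not the package's implementation: checkHeader and hasValidHeader are hypothetical names, and rewinding after the read (rather than before) is an assumption made for the example.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"strings"
)

// magic mirrors the documented C1ZFileHeader value.
var magic = []byte("C1ZF\x00")

// checkHeader is a hypothetical stand-in for C1ZFileCheckHeader: it reads
// len(magic) bytes, rewinds the stream, and reports whether the bytes match.
func checkHeader(f io.ReadSeeker) (bool, error) {
	buf := make([]byte, len(magic))
	if _, err := io.ReadFull(f, buf); err != nil {
		return false, err
	}
	// Rewind so the same stream can be handed to a decoder afterwards.
	if _, err := f.Seek(0, io.SeekStart); err != nil {
		return false, err
	}
	return bytes.Equal(buf, magic), nil
}

// hasValidHeader runs checkHeader over in-memory data and folds read
// errors (for example, a stream shorter than the magic) into false.
func hasValidHeader(data string) bool {
	ok, err := checkHeader(strings.NewReader(data))
	return err == nil && ok
}

func main() {
	fmt.Println(hasValidHeader("C1ZF\x00payload"))
	fmt.Println(hasValidHeader("not a c1z"))
}
```

Because the stream is rewound on success, the same reader can be passed on without the caller tracking the offset.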

func CleanupSkippedByEnv added in v0.10.0

func CleanupSkippedByEnv() bool

CleanupSkippedByEnv reports whether BATON_SKIP_CLEANUP is set to a truthy value. Callers should also honor any explicit caller-supplied "skip cleanup" option in addition to this env fallback.
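One way a caller might combine an explicit option with the environment fallback is sketched below. The helper skipCleanup is hypothetical, and treating "truthy" as whatever strconv.ParseBool accepts is an assumption; the package may define truthiness differently.

```go
package main

import (
	"fmt"
	"os"
	"strconv"
)

// skipCleanup is a hypothetical helper: cleanup is skipped when the caller
// asked for it explicitly OR when the env value parses as true.
// Assumption: truthiness follows strconv.ParseBool ("1", "t", "true", ...).
func skipCleanup(callerSkip bool, envValue string) bool {
	envSkip, err := strconv.ParseBool(envValue)
	return callerSkip || (err == nil && envSkip)
}

func main() {
	fmt.Println(skipCleanup(false, os.Getenv("BATON_SKIP_CLEANUP")))
}
```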

func FinalizeTimeout added in v0.10.0

func FinalizeTimeout() time.Duration

FinalizeTimeout returns the bound for the detached context that wraps c1z finalize-and-upload tails.

func NewC1FileReader

func NewC1FileReader(ctx context.Context, dbFilePath string, opts ...C1FOption) (connectorstore.Reader, error)

NewC1FileReader returns a connectorstore.Reader implementation for the given sqlite db file path.

func NewC1ZFileDecoder

func NewC1ZFileDecoder(f io.Reader, opts ...DecoderOption) (io.ReadCloser, error)

NewC1ZFileDecoder wraps a given .c1z io.Reader, validating the .c1z and decompressing/decoding the underlying file. Defaults: 128MiB max memory and 3GiB max decoded size. You must close the resulting io.ReadCloser when you are done; do not forget to close the given io.Reader if necessary.

func NewDecoder added in v0.0.24

func NewDecoder(f io.Reader, opts ...DecoderOption) (*decoder, error)

NewDecoder wraps a given .c1z file io.Reader and returns an io.Reader for the underlying decoded/uncompressed file.

func NewExternalC1FileReader added in v0.2.84

func NewExternalC1FileReader(ctx context.Context, tmpDir string, externalResourceC1ZPath string) (connectorstore.Reader, error)

func NewStore added in v0.10.0

func NewStore(ctx context.Context, outputFilePath string, opts ...C1ZOption) (connectorstore.Writer, error)

NewStore opens outputFilePath through the registered engine registry. It is the engine-neutral constructor for callers that may opt into non-default engines. NewC1ZFile remains the concrete SQLite constructor for legacy callers that need *C1File.

func ReadHeader added in v0.0.24

func ReadHeader(reader io.Reader) error

ReadHeader reads len(C1ZFileHeader) bytes from the given io.Reader and compares them to C1ZFileHeader, returning an error if they don't match. If possible, ReadHeader will Seek() to the start of the stream before checking the header bytes. On return, the reader will be pointing to the first byte after the header.

func RegisterEngine added in v0.10.0

func RegisterEngine(driver EngineDriver) error

RegisterEngine registers a storage engine driver with the process-global dotc1z engine registry.

func ResolveCleanupSyncLimit added in v0.10.0

func ResolveCleanupSyncLimit(callerLimit int, currentSyncOpen bool) int

func SelectSyncsToDelete added in v0.10.0

func SelectSyncsToDelete(candidates []SyncRun, currentSyncID string, syncLimit int) []string

SelectSyncsToDelete applies the SDK retention policy to a snapshot of sync runs and returns the IDs whose data should be deleted.

Policy (matches (*C1File).Cleanup, the SQLite implementation that has shipped since the file format's introduction):

  • Candidates with no EndedAt are skipped (unfinished syncs are considered "in flight" and must never be pruned).
  • currentSyncID is skipped when non-empty (the actively-open sync is also off-limits).
  • Candidates are bucketed by Type into fullSyncs, partials, and diff syncs. SyncTypeFull and any unrecognized type go into fullSyncs (matches the SQLite default branch).
  • syncLimit is the number of *additional* full syncs to retain beyond the current one. The caller has already decremented for a running sync (see resolveSyncLimit), so this function trusts the value verbatim. When fullSyncs > syncLimit, the oldest overflow is selected for deletion.
  • Once the earliest-kept full sync is established, partials that ended before that sync started are selected for deletion.
  • When more than two diff syncs (partial_upserts / partial_deletions) exist, only the most recent diff sync and its linked pair are retained; everything else is selected.

Order matters: callers must pass candidates in oldest-first order so "drop the oldest overflow" trims the right end. SQLite supplies them ordered by autoincrement id; Pebble's IterateAllSyncRuns walks by KSUID (timestamp-sortable) which produces the same ordering for syncs created on this machine.
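The first three rules of the policy (skip unfinished syncs, skip the current sync, drop the oldest overflow of full syncs) can be sketched as follows. This is a deliberately reduced illustration: the run type and overflowFullSyncs are hypothetical, every candidate is assumed to be a full sync, and the partial and diff-sync rules are left out.

```go
package main

import "fmt"

// run is a hypothetical, trimmed-down stand-in for SyncRun.
type run struct {
	ID    string
	Ended bool // false models a missing EndedAt
}

// overflowFullSyncs assumes all candidates are full syncs and that the
// slice is ordered oldest-first, as the policy requires.
func overflowFullSyncs(candidates []run, currentSyncID string, syncLimit int) []string {
	var eligible []string
	for _, c := range candidates {
		if !c.Ended {
			continue // unfinished syncs are never pruned
		}
		if currentSyncID != "" && c.ID == currentSyncID {
			continue // the actively-open sync is off-limits
		}
		eligible = append(eligible, c.ID)
	}
	if syncLimit < 0 {
		syncLimit = 0 // defensive clamp; an assumption of this sketch
	}
	if len(eligible) <= syncLimit {
		return nil
	}
	// Oldest-first ordering means the overflow sits at the front.
	return eligible[:len(eligible)-syncLimit]
}

func main() {
	runs := []run{{"a", true}, {"b", true}, {"c", true}, {"d", true}, {"e", false}}
	fmt.Println(overflowFullSyncs(runs, "d", 1))
}
```

If the candidates were passed newest-first instead, the same slice expression would delete the most recent syncs, which is why the ordering requirement matters.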

Types

type C1FOption added in v0.1.8

type C1FOption func(*C1File)

func WithC1FEncoderConcurrency added in v0.6.6

func WithC1FEncoderConcurrency(concurrency int) C1FOption

WithC1FEncoderConcurrency sets the number of created encoders. Default is 1, which disables async encoding/concurrency. 0 uses GOMAXPROCS.

func WithC1FEngine added in v0.10.0

func WithC1FEngine(engine Engine) C1FOption

WithC1FEngine selects the storage engine for new .c1z files. The default is EngineSQLite, which keeps the legacy v1 file format and behavior. EnginePebble selects the v3 engine introduced by the storage-engine-v4 RFC.

Engine selection only affects newly created files. Existing files dispatch on their magic byte; readers handle both v1 and v3 regardless of this option.

func WithC1FPayloadEncoding added in v0.10.0

func WithC1FPayloadEncoding(enc PayloadEncoding) C1FOption

WithC1FPayloadEncoding selects the v3 envelope payload encoding (TAR_ZSTD default, TAR uncompressed). No-op for SQLite engines.

func WithC1FPragma added in v0.1.22

func WithC1FPragma(name string, value string) C1FOption

WithC1FPragma sets a sqlite pragma for the c1z file.

func WithC1FReadOnly added in v0.6.6

func WithC1FReadOnly(readOnly bool) C1FOption

WithC1FReadOnly opens the c1file in read only mode. Write operations will return an error. Read only mode is faster, as it disables journaling and synchronous writes.

func WithC1FSkipCleanup added in v0.8.22

func WithC1FSkipCleanup(skip bool) C1FOption

WithC1FSkipCleanup skips cleanup of old syncs when set to true.

func WithC1FSyncCountLimit added in v0.7.0

func WithC1FSyncCountLimit(limit int) C1FOption

WithC1FSyncCountLimit sets the number of syncs to keep during cleanup. If not set, defaults to 2 (or BATON_KEEP_SYNC_COUNT env var if set).

func WithC1FTmpDir added in v0.1.8

func WithC1FTmpDir(tempDir string) C1FOption

WithC1FTmpDir sets the temporary directory to use when cloning a sync. If not provided, os.TempDir() will be used.

func WithC1FV2GrantsWriter added in v0.9.9

func WithC1FV2GrantsWriter(enabled bool) C1FOption

WithC1FV2GrantsWriter strips Grant.Entitlement and Grant.Principal from the serialized data blob at write time. Readers rebuild them as identity-only stubs (Id + nested Resource.Id) from the grants row's columns. Stubs carry no DisplayName, Annotations, Purpose, Slug, or traits; callers that need those must fetch the Entitlement or Resource directly. Readers accept both shapes regardless of this flag, so old and new rows coexist. Default false.

Per-grant escape hatch: see unsafeForSlim. Grants carrying InsertResourceGrants or any ExternalResourceMatch* annotation stay full-blob — those code paths read non-identity fields off the embedded Resource / Principal.

type C1File added in v0.0.24

type C1File struct {
	// contains filtered or unexported fields
}

func AsSQLiteStore added in v0.8.29

func AsSQLiteStore(s C1ZStore) (*C1File, bool)

AsSQLiteStore type-asserts a C1ZStore to the concrete *C1File. It is an escape hatch for callers that legitimately need SQLite-specific primitives (today: the attached compactor in pkg/synccompactor/attached, which uses SQL ATTACH for cross-file merge). Returns (nil, false) when the store is not backed by *C1File OR when the underlying *C1File is nil.

Avoid using this outside pkg/synccompactor. If you find yourself reaching for it, prefer adding a named method to C1ZStore that expresses what you need; sqlite-specific leak-through is a smell. See RFC 0002 §4.4.

func NewC1File added in v0.0.24

func NewC1File(ctx context.Context, dbFilePath string, opts ...C1FOption) (*C1File, error)

NewC1File returns a C1File instance for the given db file path.

func NewC1ZFile added in v0.0.24

func NewC1ZFile(ctx context.Context, outputFilePath string, opts ...C1ZOption) (*C1File, error)

NewC1ZFile returns a new C1File instance with its state stored at the provided filename.

func (*C1File) AttachFile added in v0.3.35

func (c *C1File) AttachFile(other *C1File, dbName string) (*C1FileAttached, error)

func (*C1File) CheckpointSync added in v0.0.24

func (c *C1File) CheckpointSync(ctx context.Context, syncToken string) error

func (*C1File) Cleanup added in v0.0.24

func (c *C1File) Cleanup(ctx context.Context) error

func (*C1File) Clear added in v0.5.0

func (c *C1File) Clear(ctx context.Context, opt ...sessions.SessionStoreOption) error

Clear implements types.SessionStore.

func (*C1File) CloneSync added in v0.0.24

func (c *C1File) CloneSync(ctx context.Context, outPath string, syncID string) error

CloneSync uses sqlite hackery to directly copy the pertinent rows into a new database:

  1. Create a new empty sqlite database in a temp file.
  2. Open the c1z that we are cloning to get a db handle.
  3. Execute an ATTACH query to bring our empty sqlite db into the context of our db connection.
  4. Select directly from the cloned db and insert directly into the new database.
  5. Close and save the new database as a c1z at the configured path.

func (*C1File) Close added in v0.0.24

func (c *C1File) Close(ctx context.Context) (retErr error)

Close ensures that the sqlite database is flushed to disk, and if any changes were made we update the original database with our changes.

When there is real finalize work to do (rawDb open, dbUpdated, not read-only), the WAL checkpoint, raw-db close, and saveC1z all run on a detached context bounded by FinalizeTimeout(). Caller cancellation cannot truncate the c1z mid-finalize; the new-root finalize span links back to the caller's trace for navigability without bloating the parent trace.
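A detached-but-bounded context of the kind described here can be built from the standard library. The sketch below assumes Go 1.21+ for context.WithoutCancel; finalize and survivesCallerCancel are hypothetical names, and whether the package builds its context this way is not confirmed by this page.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// finalize is a hypothetical helper: it runs work on a context that ignores
// the caller's cancellation but still expires after timeout.
func finalize(ctx context.Context, timeout time.Duration, work func(context.Context) error) error {
	detached, cancel := context.WithTimeout(context.WithoutCancel(ctx), timeout)
	defer cancel()
	return work(detached)
}

// survivesCallerCancel reports whether the work still sees a live context
// after the caller's context has already been cancelled.
func survivesCallerCancel() bool {
	ctx, cancel := context.WithCancel(context.Background())
	cancel() // caller gives up before finalization starts
	err := finalize(ctx, time.Minute, func(c context.Context) error {
		return c.Err()
	})
	return err == nil
}

func main() {
	fmt.Println(survivesCallerCancel())
}
```

Values stored on the caller's context remain visible through the detached one, so tracing metadata can still be read during finalization.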

func (*C1File) CurrentDBSizeBytes added in v0.8.27

func (c *C1File) CurrentDBSizeBytes() (int64, error)

CurrentDBSizeBytes returns the current total on-disk size of the underlying uncompressed sqlite database, including the write-ahead log if present. Used by operational tooling (e.g. the grant-expansion progress logger) to observe c1z growth during long in-process writes without waiting for saveC1z to land a new frame.

The WAL file holds writes that have not yet been checkpointed into the main database file; with journal_mode=WAL the main file may stay stable for long stretches while the WAL grows into the hundreds of MB. Summing both gives a representative "bytes written so far" figure during active expansion.

This is the *uncompressed* size. The post-saveC1z c1z file size (compressed) is smaller; for that, see the `c1z: saved` log line emitted by saveC1z.

func (*C1File) CurrentSyncStep added in v0.0.24

func (c *C1File) CurrentSyncStep(ctx context.Context) (string, error)

func (*C1File) Delete added in v0.5.0

func (c *C1File) Delete(ctx context.Context, key string, opt ...sessions.SessionStoreOption) error

Delete implements types.SessionStore.

func (*C1File) DeleteGrant added in v0.2.84

func (c *C1File) DeleteGrant(ctx context.Context, grantId string) error

func (*C1File) DeleteSyncRun added in v0.0.24

func (c *C1File) DeleteSyncRun(ctx context.Context, syncID string) error

DeleteSyncRun removes all the objects with a given syncID from the database.

func (*C1File) EndSync added in v0.0.24

func (c *C1File) EndSync(ctx context.Context) error

EndSync updates the current sync_run row with the end time, and removes any other objects that don't have the current sync ID.

func (*C1File) FileOps added in v0.8.29

func (c *C1File) FileOps() FileOps

FileOps returns the file-operations slice of this c1z.

func (*C1File) GenerateSyncDiff added in v0.3.2

func (c *C1File) GenerateSyncDiff(ctx context.Context, baseSyncID string, appliedSyncID string) (string, error)

func (*C1File) Get added in v0.5.0

func (c *C1File) Get(ctx context.Context, key string, opt ...sessions.SessionStoreOption) ([]byte, bool, error)

Get implements types.SessionCache.

func (*C1File) GetAll added in v0.5.0

func (c *C1File) GetAll(ctx context.Context, pageToken string, opt ...sessions.SessionStoreOption) (map[string][]byte, string, error)

GetAll implements types.SessionStore.

func (*C1File) GetAsset added in v0.0.24

func (c *C1File) GetAsset(ctx context.Context, request *v2.AssetServiceGetAssetRequest) (string, io.Reader, error)

GetAsset fetches the specified asset from the database, and returns the content type and an io.Reader for the caller to read the asset from.

func (*C1File) GetMany added in v0.5.0

func (c *C1File) GetMany(ctx context.Context, keys []string, opt ...sessions.SessionStoreOption) (map[string][]byte, []string, error)

GetMany implements types.SessionStore.

func (*C1File) GetSync added in v0.2.83

func (*C1File) Grants added in v0.8.29

func (c *C1File) Grants() GrantStore

Grants returns the grant-store slice of this c1z.

func (*C1File) InitTables added in v0.6.5

func (c *C1File) InitTables(ctx context.Context) (bool, error)

InitTables initializes the tables in the database. Returns true if any migrations were run, false otherwise.

func (*C1File) LatestFinishedSyncID added in v0.3.56

func (c *C1File) LatestFinishedSyncID(ctx context.Context, syncType connectorstore.SyncType) (string, error)

func (*C1File) LatestSyncID added in v0.0.24

func (c *C1File) LatestSyncID(ctx context.Context, syncType connectorstore.SyncType) (string, error)

func (*C1File) ListEntitlements added in v0.0.24

func (*C1File) ListEntitlementsByIds added in v0.10.0

ListEntitlementsByIds returns the entitlements for the requested ids in any order. Missing rows are silently omitted; callers detect partial misses by length / id comparison.
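Since missing rows are omitted silently, a caller that needs to know which ids were not found has to compare the request against the response. A minimal sketch, using plain string ids and a hypothetical missingIDs helper rather than the package's types:

```go
package main

import "fmt"

// missingIDs returns the requested ids that do not appear in returned.
// The result follows the order of requested; returned may be in any order.
func missingIDs(requested, returned []string) []string {
	seen := make(map[string]struct{}, len(returned))
	for _, id := range returned {
		seen[id] = struct{}{}
	}
	var missing []string
	for _, id := range requested {
		if _, ok := seen[id]; !ok {
			missing = append(missing, id)
		}
	}
	return missing
}

func main() {
	fmt.Println(missingIDs([]string{"a", "b", "c"}, []string{"c", "a"}))
}
```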

func (*C1File) ListGrants added in v0.0.24

func (*C1File) ListGrantsForEntitlements added in v0.10.0

ListGrantsForEntitlements batches K-entitlement grant fetches (RFC §A4). Single SQL query per page with entitlement_id IN (?, ?, ...) — uses the (entitlement_id) index on the grants table.

Cursor encoding mirrors the Pebble adapter:

varint(entitlement_index) || varint(intra_cursor_len) ||
intra_cursor_bytes || crc32(sorted-ent-IDs)

where intra_cursor here is the row-id of the last returned grant (encoded as a decimal string for compatibility with the row-id page-token used elsewhere in this package).
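The cursor layout above can be sketched with encoding/binary and hash/crc32. Several details are assumptions made only for illustration and are not stated on this page: unsigned varints, the IEEE CRC-32 polynomial, a NUL byte as the separator between sorted ids, and a big-endian 4-byte checksum. encodeCursor is a hypothetical name.

```go
package main

import (
	"encoding/binary"
	"fmt"
	"hash/crc32"
	"sort"
	"strings"
)

// encodeCursor lays out:
//   varint(entIndex) || varint(len(intraCursor)) || intraCursor || crc32(sorted ids)
// Assumptions: uvarint encoding, IEEE CRC-32, NUL-joined ids, big-endian sum.
func encodeCursor(entIndex uint64, intraCursor string, entIDs []string) []byte {
	// Sort a copy so the checksum does not depend on request order.
	sorted := append([]string(nil), entIDs...)
	sort.Strings(sorted)
	sum := crc32.ChecksumIEEE([]byte(strings.Join(sorted, "\x00")))

	buf := binary.AppendUvarint(nil, entIndex)
	buf = binary.AppendUvarint(buf, uint64(len(intraCursor)))
	buf = append(buf, intraCursor...)
	return binary.BigEndian.AppendUint32(buf, sum)
}

func main() {
	// "42" stands in for the decimal row-id of the last returned grant.
	fmt.Printf("%x\n", encodeCursor(1, "42", []string{"ent-b", "ent-a"}))
}
```

The trailing checksum lets a decoder reject a cursor that is replayed against a different set of entitlement ids.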

func (*C1File) ListResources added in v0.0.24

func (*C1File) ListResourcesByIds added in v0.10.0

ListResourcesByIds returns the resources for the supplied ResourceId pairs (resource_type:resource composite externals). Missing rows are silently omitted.

func (*C1File) ListSyncRuns added in v0.0.24

func (c *C1File) ListSyncRuns(ctx context.Context, pageToken string, pageSize uint32) ([]*SyncRun, string, error)

func (*C1File) ListSyncs added in v0.2.83

func (*C1File) Metadata added in v0.10.0

func (c *C1File) Metadata() connectorstore.StoreMetadata

Metadata describes the storage backing this c1z. C1File is the SQLite-engine implementation; the v1 file format is implied by the engine. No I/O is performed.

func (*C1File) OutputFilepath added in v0.3.3

func (c *C1File) OutputFilepath() (string, error)

func (*C1File) PreviousSyncID added in v0.0.24

func (c *C1File) PreviousSyncID(ctx context.Context, syncType connectorstore.SyncType) (string, error)

func (*C1File) PutAsset added in v0.0.24

func (c *C1File) PutAsset(ctx context.Context, assetRef *v2.AssetRef, contentType string, data []byte) error

PutAsset stores the given asset in the database.

func (*C1File) PutEntitlements added in v0.1.46

func (c *C1File) PutEntitlements(ctx context.Context, entitlementObjs ...*v2.Entitlement) error

func (*C1File) PutGrants added in v0.1.46

func (c *C1File) PutGrants(ctx context.Context, bulkGrants ...*v2.Grant) error

PutGrants is the connector-facing write method on connectorstore.Writer. It replaces any conflicting row and re-extracts expansion metadata from the grant payload.

func (*C1File) PutResourceTypes added in v0.1.46

func (c *C1File) PutResourceTypes(ctx context.Context, resourceTypesObjs ...*v2.ResourceType) error

func (*C1File) PutResources added in v0.1.46

func (c *C1File) PutResources(ctx context.Context, resourceObjs ...*v2.Resource) error

func (*C1File) ResumeSync added in v0.3.56

func (c *C1File) ResumeSync(ctx context.Context, syncType connectorstore.SyncType, syncID string) (string, error)

func (*C1File) Set added in v0.5.0

func (c *C1File) Set(ctx context.Context, key string, value []byte, opt ...sessions.SessionStoreOption) error

Set implements types.SessionStore.

func (*C1File) SetCurrentSync added in v0.3.8

func (c *C1File) SetCurrentSync(ctx context.Context, syncID string) error

func (*C1File) SetMany added in v0.5.0

func (c *C1File) SetMany(ctx context.Context, values map[string][]byte, opt ...sessions.SessionStoreOption) error

SetMany implements types.SessionStore.

func (*C1File) SetSupportsDiff added in v0.7.22

func (c *C1File) SetSupportsDiff(ctx context.Context, syncID string) error

SetSupportsDiff marks the given sync as supporting diff operations. This indicates the sync has SQL-layer grant metadata (is_expandable) properly populated.

func (*C1File) SetSyncID added in v0.6.8

func (c *C1File) SetSyncID(_ context.Context, syncID string) error

SetSyncID sets the current sync ID. This is only intended for testing.

func (*C1File) StartNewSync added in v0.2.15

func (c *C1File) StartNewSync(ctx context.Context, syncType connectorstore.SyncType, parentSyncID string) (string, error)

func (*C1File) StartOrResumeSync added in v0.3.52

func (c *C1File) StartOrResumeSync(ctx context.Context, syncType connectorstore.SyncType, syncID string) (string, bool, error)

StartOrResumeSync checks if a sync is already running and resumes it if it is. If no sync is running, it starts a new sync. It returns the sync ID and a boolean indicating if a new sync was started.

func (*C1File) Stats added in v0.0.24

func (c *C1File) Stats(ctx context.Context, syncType connectorstore.SyncType, syncId string) (map[string]int64, error)

Stats introspects the database and returns the count of objects for the given sync run. If syncId is empty, it will use the latest sync run of the given type.

func (*C1File) StoreExpandedGrants added in v0.8.29

func (c *C1File) StoreExpandedGrants(ctx context.Context, grants ...*v2.Grant) error

StoreExpandedGrants persists grants produced by the expander. It strips any residual GrantExpandable annotation from each grant payload and then delegates to the internal UpsertGrants path using PreserveExpansion mode so existing expansion/needs_expansion columns are left untouched.

The strip step is defensive: the expander's upstream callers consume the annotation before writing, but a residual annotation on the payload would disagree with the stored expansion columns. Stripping makes the method total regardless of caller discipline.

This method is exposed on *C1File (not just via GrantStore) because test helpers in pkg/sync/expand construct a *C1File directly and pass it to NewExpander; putting it at the top level keeps those tests free of sub-store wiring.

func (*C1File) StreamEntitlements added in v0.10.0

func (c *C1File) StreamEntitlements(
	ctx context.Context,
	syncID string,
) iter.Seq2[*v2.Entitlement, error]

StreamEntitlements yields entitlements for the active (or requested) sync.

func (*C1File) StreamGrants added in v0.10.0

func (c *C1File) StreamGrants(
	ctx context.Context,
	syncID string,
	opts connectorstore.StreamGrantsOptions,
) iter.Seq2[*v2.Grant, error]

StreamGrants yields grants for the active (or requested) sync. Implements connectorstore.StreamingReader. SQLite path uses Rows.Next() as the streaming pump — no client-side pagination buffer.

func (*C1File) StreamResources added in v0.10.0

func (c *C1File) StreamResources(
	ctx context.Context,
	syncID string,
	opts connectorstore.StreamResourcesOptions,
) iter.Seq2[*v2.Resource, error]

StreamResources yields resources, optionally filtered by resource type.

func (*C1File) SyncMeta added in v0.8.29

func (c *C1File) SyncMeta() SyncMeta

SyncMeta returns the sync-metadata slice of this c1z.

func (*C1File) Vacuum added in v0.1.8

func (c *C1File) Vacuum(ctx context.Context) error

Vacuum runs a VACUUM on the database to reclaim space.

func (*C1File) ViewSync added in v0.0.24

func (c *C1File) ViewSync(ctx context.Context, syncID string) error

type C1FileAttached added in v0.3.35

type C1FileAttached struct {
	// contains filtered or unexported fields
}

func (*C1FileAttached) CompactEntitlements added in v0.3.35

func (c *C1FileAttached) CompactEntitlements(ctx context.Context, baseSyncID string, appliedSyncID string) error

func (*C1FileAttached) CompactGrants added in v0.3.35

func (c *C1FileAttached) CompactGrants(ctx context.Context, baseSyncID string, appliedSyncID string) error

func (*C1FileAttached) CompactResourceTypes added in v0.3.35

func (c *C1FileAttached) CompactResourceTypes(ctx context.Context, baseSyncID string, appliedSyncID string) error

func (*C1FileAttached) CompactResources added in v0.3.35

func (c *C1FileAttached) CompactResources(ctx context.Context, baseSyncID string, appliedSyncID string) error

func (*C1FileAttached) CompactTable added in v0.3.35

func (c *C1FileAttached) CompactTable(ctx context.Context, baseSyncID string, appliedSyncID string, tableName string) error

func (*C1FileAttached) DetachFile added in v0.3.35

func (c *C1FileAttached) DetachFile(dbName string) (*C1FileAttached, error)

func (*C1FileAttached) GenerateSyncDiffFromFile added in v0.7.0

func (c *C1FileAttached) GenerateSyncDiffFromFile(ctx context.Context, oldSyncID string, newSyncID string) (string, string, error)

GenerateSyncDiffFromFile compares the old sync (in attached) with the new sync (in main) and generates two new syncs in the main database.

IMPORTANT: This assumes main=NEW/compacted and attached=OLD/base:

  • diffTableFromAttached: items in attached (OLD) not in main (NEW) = deletions
  • diffTableFromMain: items in main (NEW) not in attached (OLD) = upserts (additions)

Parameters:

  • oldSyncID: the sync ID in the attached database (OLD/base state)
  • newSyncID: the sync ID in the main database (NEW/compacted state)

Returns (upsertsSyncID, deletionsSyncID, error).
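The direction of the two diffs is easy to get backwards, so here is a small set-difference sketch over plain ids. It only illustrates the OLD/NEW orientation; diffIDs is a hypothetical helper, and the real method works on database tables and returns sync IDs, not item lists.

```go
package main

import "fmt"

// diffIDs compares an OLD snapshot with a NEW snapshot.
// upserts:   ids present in NEW but not in OLD.
// deletions: ids present in OLD but not in NEW.
func diffIDs(oldIDs, newIDs []string) (upserts, deletions []string) {
	inOld := make(map[string]bool, len(oldIDs))
	for _, id := range oldIDs {
		inOld[id] = true
	}
	inNew := make(map[string]bool, len(newIDs))
	for _, id := range newIDs {
		inNew[id] = true
		if !inOld[id] {
			upserts = append(upserts, id)
		}
	}
	for _, id := range oldIDs {
		if !inNew[id] {
			deletions = append(deletions, id)
		}
	}
	return upserts, deletions
}

func main() {
	up, del := diffIDs([]string{"a", "b"}, []string{"b", "c"})
	fmt.Println(up, del)
}
```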

func (*C1FileAttached) UpdateSync added in v0.6.11

func (c *C1FileAttached) UpdateSync(ctx context.Context, baseSync *reader_v2.SyncRun, appliedSync *reader_v2.SyncRun) error

type C1ZFormat added in v0.10.0

type C1ZFormat int

C1ZFormat identifies the on-disk format of a .c1z file. The format is determined by the 5-byte magic at the start of the file; see ReadHeaderFormat.

const (
	// C1ZFormatUnknown is the zero value. Returned when the header bytes
	// match neither the v1 nor the v3 magic, or when the read failed.
	C1ZFormatUnknown C1ZFormat = iota

	// C1ZFormatV1 is the original .c1z format: 5-byte magic "C1ZF\x00"
	// followed by a zstd-compressed SQLite database.
	C1ZFormatV1

	// C1ZFormatV3 is the v3 format introduced by the storage-engine-v4
	// RFC: 5-byte magic "C1Z3\x00", a length-prefixed proto manifest,
	// and a zstd-tar payload of a Pebble Checkpoint directory.
	C1ZFormatV3
)

func ReadHeaderFormat added in v0.10.0

func ReadHeaderFormat(reader io.Reader) (C1ZFormat, error)

ReadHeaderFormat reads the first 5 bytes of reader and returns the detected format. On return, the reader is positioned immediately after the header bytes. If reader is also an io.Seeker, it is rewound to offset 0 before reading.

Returns:

  • C1ZFormatV1, nil — file starts with "C1ZF\x00".
  • C1ZFormatV3, nil — file starts with "C1Z3\x00".
  • C1ZFormatUnknown, ErrInvalidFile — header matched no known magic.
  • C1ZFormatUnknown, err — underlying read error.
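The detection logic and its four outcomes can be sketched as below. The names format, readHeaderFormat, detect, and errInvalidFile are local stand-ins for this example, not the package's own identifiers.

```go
package main

import (
	"errors"
	"fmt"
	"io"
	"strings"
)

type format int

const (
	formatUnknown format = iota
	formatV1
	formatV3
)

var errInvalidFile = errors.New("c1z: invalid file")

// readHeaderFormat rewinds seekable readers, reads 5 bytes, and maps the
// magic to a format. The reader is left positioned right after the header.
func readHeaderFormat(r io.Reader) (format, error) {
	if s, ok := r.(io.Seeker); ok {
		if _, err := s.Seek(0, io.SeekStart); err != nil {
			return formatUnknown, err
		}
	}
	buf := make([]byte, 5)
	if _, err := io.ReadFull(r, buf); err != nil {
		return formatUnknown, err // underlying read error
	}
	switch string(buf) {
	case "C1ZF\x00":
		return formatV1, nil
	case "C1Z3\x00":
		return formatV3, nil
	}
	return formatUnknown, errInvalidFile
}

// detect is a convenience wrapper that returns a short label.
func detect(data string) string {
	f, err := readHeaderFormat(strings.NewReader(data))
	if err != nil {
		return "unknown"
	}
	switch f {
	case formatV1:
		return "v1"
	case formatV3:
		return "v3"
	}
	return "unknown"
}

func main() {
	fmt.Println(detect("C1ZF\x00..."), detect("C1Z3\x00..."), detect("hello"))
}
```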

func (C1ZFormat) String added in v0.10.0

func (f C1ZFormat) String() string

String returns a stable human-readable name for the format.

type C1ZOption added in v0.1.8

type C1ZOption func(*c1zOptions)

func WithDecoderOptions added in v0.3.50

func WithDecoderOptions(opts ...DecoderOption) C1ZOption

func WithEncoderConcurrency added in v0.6.6

func WithEncoderConcurrency(concurrency int) C1ZOption

WithEncoderConcurrency sets the number of created encoders. Default is 1, which disables async encoding/concurrency. 0 uses GOMAXPROCS.

func WithEngine added in v0.10.0

func WithEngine(engine Engine) C1ZOption

WithEngine selects the storage engine for newly created .c1z files. Default is EngineSQLite (v1 format). EnginePebble enables the v3 engine.

Reading existing files dispatches on the file's magic byte and is independent of this option.

func WithPayloadEncoding added in v0.10.0

func WithPayloadEncoding(enc PayloadEncoding) C1ZOption

WithPayloadEncoding selects the c1z v3 envelope payload encoding for newly created files written by the Pebble engine. Default is PayloadEncodingTarZstd. PayloadEncodingTar skips the outer zstd compression — useful when Pebble's L5/L6 SSTs are already zstd-compressed at the engine layer, or when the storage target (S3 with Content-Encoding negotiation, etc.) compresses in transit.

No-op for SQLite engines; the encoding selector applies only to the v3 envelope written by Pebble.

func WithPragma added in v0.1.22

func WithPragma(name string, value string) C1ZOption

WithPragma sets a sqlite pragma for the c1z file.

func WithReadOnly added in v0.6.6

func WithReadOnly(readOnly bool) C1ZOption

WithReadOnly opens the c1z file in read only mode. Modifying the c1z will result in an error on close.

func WithSkipCleanup added in v0.8.22

func WithSkipCleanup(skip bool) C1ZOption

WithSkipCleanup skips cleanup of old syncs when set to true.

func WithSyncLimit added in v0.7.0

func WithSyncLimit(limit int) C1ZOption

WithSyncLimit sets the number of syncs to keep during cleanup. If not set, defaults to 2 (or BATON_KEEP_SYNC_COUNT env var if set).

func WithTmpDir added in v0.1.8

func WithTmpDir(tmpDir string) C1ZOption

WithTmpDir sets the temporary directory to extract the c1z file to. If not provided, os.TempDir() will be used.

func WithV2GrantsWriter added in v0.9.9

func WithV2GrantsWriter(enabled bool) C1ZOption

WithV2GrantsWriter toggles the slim-blob writer path for grants. See WithC1FV2GrantsWriter for details.

type C1ZStore added in v0.8.29

type C1ZStore interface {
	connectorstore.Writer

	// Grants returns the grant-specific sub-store.
	Grants() GrantStore

	// SyncMeta returns the sync-run metadata sub-store.
	SyncMeta() SyncMeta

	// FileOps returns the file-level operations sub-store.
	FileOps() FileOps

	// Close releases resources, flushing any pending writes.
	// Overrides the embedded Reader.Close to document this signature.
	Close(ctx context.Context) error
}

C1ZStore is the internal contract used by the sync pipeline, compactor, and related infrastructure to read and write a .c1z file. It is NOT the connector-facing contract — connectors write through connectorstore.Writer, which is embedded here but stays the stable API surface for external code.

C1ZStore adds three kinds of operations that the sync pipeline needs but connectors don't:

  • Grant operations with expansion-aware semantics, accessed via Grants().
  • Sync-run metadata operations, accessed via SyncMeta().
  • File-level operations (clone, diff), accessed via FileOps().

Implementations:

  • *C1File — the original SQLite-backed implementation (pkg/dotc1z/c1file.go).
  • *pebble.registeredStore — the Pebble v3 engine implementation opened via NewStore(WithEngine(EnginePebble)) (pkg/dotc1z/engine/pebble/register.go). The compile-time guard there asserts the interface is satisfied.

New implementations are expected; treat the interface methods as load-bearing and the implementation list above as informational.

The Close override replaces connectorstore.Reader.Close with an explicit context parameter (*C1File.Close already takes context on main, so this is an exact match with no behavior change).

type DecoderOption added in v0.0.24

type DecoderOption func(*decoderOptions) error

DecoderOption is an option for creating a decoder.

func WithContext

func WithContext(ctx context.Context) DecoderOption

WithContext sets a context that, when cancelled, causes subsequent calls to Read() to return ctx.Err().
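The effect of a context-aware reader can be illustrated with a small wrapper that checks the context before every read. This is a sketch of the general technique, not the decoder's internals; ctxReader and the helper functions are hypothetical.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"io"
	"strings"
)

// ctxReader fails reads once its context is done.
type ctxReader struct {
	ctx context.Context
	r   io.Reader
}

func (c ctxReader) Read(p []byte) (int, error) {
	if err := c.ctx.Err(); err != nil {
		return 0, err // context.Canceled or context.DeadlineExceeded
	}
	return c.r.Read(p)
}

// readAfterCancel cancels the context and then attempts a read.
func readAfterCancel() error {
	ctx, cancel := context.WithCancel(context.Background())
	r := ctxReader{ctx: ctx, r: strings.NewReader("payload")}
	cancel()
	_, err := r.Read(make([]byte, 4))
	return err
}

// cancelledReadFails reports whether the read surfaced context.Canceled.
func cancelledReadFails() bool {
	return errors.Is(readAfterCancel(), context.Canceled)
}

func main() {
	fmt.Println(readAfterCancel())
}
```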

func WithDecoderConcurrency added in v0.3.50

func WithDecoderConcurrency(n int) DecoderOption

WithDecoderConcurrency sets the number of created decoders. Default is 1, which disables async decoding/concurrency. 0 uses GOMAXPROCS. -1 uses GOMAXPROCS or 4, whichever is lower.
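The special values 0 and -1 can be read as the following resolution rule. resolveConcurrency is a hypothetical helper; taking maxProcs as a parameter (instead of calling runtime.GOMAXPROCS inside) is a choice made here for testability, and the handling of other negative values is not documented, so the sketch passes them through unchanged.

```go
package main

import (
	"fmt"
	"runtime"
)

// resolveConcurrency maps the documented sentinel values to a decoder count.
//   0  -> maxProcs
//   -1 -> the lower of maxProcs and 4
// Any other value is returned as-is (an assumption of this sketch).
func resolveConcurrency(n, maxProcs int) int {
	switch n {
	case 0:
		return maxProcs
	case -1:
		if maxProcs < 4 {
			return maxProcs
		}
		return 4
	default:
		return n
	}
}

func main() {
	fmt.Println(resolveConcurrency(-1, runtime.GOMAXPROCS(0)))
}
```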

func WithDecoderMaxDecodedSize

func WithDecoderMaxDecodedSize(n uint64) DecoderOption

WithDecoderMaxDecodedSize sets the maximum size of the decoded stream. This can be used to cap the resulting decoded stream size. Maximum is 1 << 63 bytes. Default is 1GiB.

func WithDecoderMaxMemory

func WithDecoderMaxMemory(n uint64) DecoderOption

WithDecoderMaxMemory sets the maximum window size for streaming operations. This can be used to control memory usage of potentially hostile content. Maximum is 1 << 63 bytes. Default is 128MiB.

type Engine added in v0.10.0

type Engine string

Engine identifies a storage engine implementation. The engine is chosen by callers via WithEngine(...) on write; on read, the engine is dictated by the file's magic byte and (for v3) the manifest's engine field.

const (
	// EngineSQLite is the default engine: the v1 .c1z format backed by
	// a zstd-compressed SQLite database. Connectors use this; backend
	// infra can opt out.
	EngineSQLite Engine = "sqlite"

	// EnginePebble is the v3 engine: a Pebble LSM wrapped in the v3
	// envelope.
	EnginePebble Engine = "pebble"
)

type EngineDriver added in v0.10.0

type EngineDriver interface {
	Engine() Engine
	Format() C1ZFormat
	OpenStore(ctx context.Context, outputFilePath string, opts StoreOptions) (connectorstore.Writer, error)
}

EngineDriver opens a .c1z file for a specific storage engine. Drivers live outside the dotc1z package when they carry optional dependencies. The default SQLite driver is registered by this package; Pebble registers from pkg/dotc1z/engine/pebble when callers explicitly import and register it.

func EngineDriverFor added in v0.10.0

func EngineDriverFor(engine Engine) (EngineDriver, bool)

EngineDriverFor returns the registered driver for engine.
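The register-then-look-up pattern behind a (driver, ok) lookup can be sketched with a toy registry. Every name below is hypothetical; the package's real registration entry point is not shown on this page, so this only illustrates the shape of the lookup.

```go
package main

import (
	"fmt"
	"sync"
)

// Engine and driver are local stand-ins for the package's types.
type Engine string

type driver interface{ Engine() Engine }

// registry is a toy, concurrency-safe driver table.
type registry struct {
	mu      sync.RWMutex
	drivers map[Engine]driver
}

func (r *registry) register(d driver) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.drivers == nil {
		r.drivers = make(map[Engine]driver)
	}
	r.drivers[d.Engine()] = d
}

// driverFor reports ok=false when no driver was registered for e.
func (r *registry) driverFor(e Engine) (driver, bool) {
	r.mu.RLock()
	defer r.mu.RUnlock()
	d, ok := r.drivers[e]
	return d, ok
}

type sqliteDriver struct{}

func (sqliteDriver) Engine() Engine { return "sqlite" }

func main() {
	var reg registry
	reg.register(sqliteDriver{}) // the default driver is always present

	_, ok := reg.driverFor("sqlite")
	fmt.Println("sqlite registered:", ok)
	_, ok = reg.driverFor("pebble") // never registered in this sketch
	fmt.Println("pebble registered:", ok)
}
```

A caller that gets ok=false would typically surface an error such as ErrEngineNotAvailable rather than fall back silently.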

type FileOps added in v0.8.29

type FileOps interface {
	// CloneSync materializes the given sync run's data into a freshly
	// created standalone c1z file at outPath. Used by c1's storeCompletedSyncC1Z
	// activity to archive a completed sync.
	CloneSync(ctx context.Context, outPath string, syncID string) error

	// GenerateSyncDiff computes the diff between two existing sync runs
	// in this same file and writes the delta as a new SyncTypePartial
	// sync. Returns the new sync's id. Used by the local differ CLI.
	GenerateSyncDiff(ctx context.Context, baseSyncID, appliedSyncID string) (diffSyncID string, err error)
}

FileOps is the file-level operations sub-store of C1ZStore. It covers operations that span sync runs or produce new sync runs within the same c1z file. Cross-file operations (SQL ATTACH) live in pkg/synccompactor, not here.

type GrantAnnotation added in v0.8.29

type GrantAnnotation struct {
	Grant      *v2.Grant
	Annotation *v2.GrantExpandable // nil if not expandable

	GrantExternalID         string
	TargetEntitlementID     string
	PrincipalResourceTypeID string
	PrincipalResourceID     string
	NeedsExpansion          bool
}

GrantAnnotation is a row yielded by GrantStore.ListWithAnnotations.

Grant is always populated. Annotation is nil when the grant has no GrantExpandable annotation.

Identity fields (GrantExternalID, TargetEntitlementID, PrincipalResourceTypeID, PrincipalResourceID) are ALWAYS populated from the underlying grant proto, regardless of whether Annotation is nil, so callers can use them without branching.

NeedsExpansion is only meaningful when Annotation is non-nil; it reports the stored needs_expansion column for expandable grants. For non-expandable grants it is false.
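A consumer of these rows might branch as in the sketch below. The types are trimmed local stand-ins for GrantAnnotation and v2.GrantExpandable, and shouldExpand is a hypothetical helper that encodes the rule that NeedsExpansion only counts when Annotation is non-nil.

```go
package main

import "fmt"

// grantExpandable and grantAnnotation are trimmed stand-ins for
// v2.GrantExpandable and GrantAnnotation.
type grantExpandable struct{}

type grantAnnotation struct {
	Annotation      *grantExpandable // nil if not expandable
	GrantExternalID string
	NeedsExpansion  bool
}

// shouldExpand is a hypothetical helper: a row needs work only when it
// is expandable and its needs_expansion flag is set.
func shouldExpand(row grantAnnotation) bool {
	return row.Annotation != nil && row.NeedsExpansion
}

func main() {
	rows := []grantAnnotation{
		{GrantExternalID: "g1"},
		{GrantExternalID: "g2", Annotation: &grantExpandable{}},
		{GrantExternalID: "g3", Annotation: &grantExpandable{}, NeedsExpansion: true},
	}
	for _, row := range rows {
		// Identity fields are usable without checking Annotation first.
		fmt.Println(row.GrantExternalID, shouldExpand(row))
	}
}
```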

type GrantStore added in v0.8.29

type GrantStore interface {
	// StoreExpandedGrants writes grants produced by the expander back to
	// storage. The expander has already consumed each grant's
	// GrantExpandable annotation and performed expansion; storage must
	// persist the grant payload without re-extracting expansion metadata
	// (otherwise we'd corrupt expansion state that has already been
	// marked "done"). For robustness, any residual GrantExpandable
	// annotation on the input is stripped from the persisted payload.
	//
	// Called by pkg/sync/expand.PutGrantsInChunks.
	StoreExpandedGrants(ctx context.Context, grants ...*v2.Grant) error

	// PendingExpansionPage returns the next page of grants whose
	// expansion metadata still needs processing (needs_expansion=1).
	// Grant payloads are NOT materialized — only identity plus the
	// parsed expansion annotation — because the caller only needs
	// expansion metadata and this keeps the hot path cheap.
	//
	// Caller drives pagination by passing the prior nextPageToken as
	// pageToken; an empty nextPageToken means the page was the last.
	// Initial call passes pageToken="".
	//
	// Called by pkg/sync.syncer.ExpandGrants, which checkpoints page
	// tokens in its action state.
	PendingExpansionPage(ctx context.Context, pageToken string) (defs []PendingExpansion, nextPageToken string, err error)

	// PendingExpansion walks all pages of PendingExpansionPage and
	// yields each row. Convenience wrapper for callers that don't
	// checkpoint pagination.
	//
	// ITERATION CONTRACT: on error, the sequence yields a final
	// (zero-value PendingExpansion, err) pair and terminates. Callers
	// MUST check the second return value on every iteration:
	//
	//     for pe, err := range store.Grants().PendingExpansion(ctx) {
	//         if err != nil { return err }
	//         // ... use pe ...
	//     }
	//
	// Single-variable ranging (for pe := range seq) silently drops
	// errors and is a bug.
	PendingExpansion(ctx context.Context) iter.Seq2[PendingExpansion, error]

	// ListWithAnnotationsPage returns the next page of grants with
	// their expansion annotations inline. Grant payloads are fully
	// materialized. Used by the external-principal post-processing
	// step which needs full grants plus expansion.
	//
	// Caller drives pagination by passing the prior nextPageToken as
	// pageToken.
	ListWithAnnotationsPage(ctx context.Context, pageToken string) (rows []GrantAnnotation, nextPageToken string, err error)

	// ListWithAnnotationsForResourcePage filters ListWithAnnotationsPage
	// to grants on the given resource. Used by the c1-side
	// fileClientWrapper that emulates a connector from a c1z file and
	// forwards a ListGrants RPC whose request has a Resource filter.
	//
	// Page size defaults to the engine's maximum when pageSize == 0.
	ListWithAnnotationsForResourcePage(
		ctx context.Context,
		resource *v2.Resource,
		syncID string,
		pageToken string,
		pageSize uint32,
	) (rows []GrantAnnotation, nextPageToken string, err error)

	// ListWithAnnotations walks all pages of ListWithAnnotationsPage
	// and yields each row. Convenience wrapper for callers that don't
	// checkpoint pagination.
	//
	// ITERATION CONTRACT: same as PendingExpansion. On error, the
	// sequence yields (zero-value GrantAnnotation, err) then
	// terminates. Callers MUST check err on every iteration.
	//
	// Called by pkg/sync.syncer.listAllGrantsWithExpansion.
	ListWithAnnotations(ctx context.Context) iter.Seq2[GrantAnnotation, error]
}

GrantStore is the grant-specific slice of C1ZStore. Each method maps to a specific caller intent in the sync pipeline — no mode enums, no option structs with behavior switches.

The expansion-reading methods come in two flavors: a page-at-a-time primitive (PendingExpansionPage, ListWithAnnotationsPage) that caller-driven state machines use to checkpoint progress, and an all-pages iterator (PendingExpansion, ListWithAnnotations) that wraps the page primitive for callers who want to walk everything in one pass.

type PayloadEncoding added in v0.10.0

type PayloadEncoding int

PayloadEncoding selects the v3 envelope payload framing. Only the Pebble engine consults this; SQLite engines ignore it. The wire numbers match the corresponding proto enum values in c1.c1z.v3.PayloadEncoding.

const (
	// PayloadEncodingUnspecified is the zero value. Means "use the
	// engine's default" — TarZstd for Pebble.
	PayloadEncodingUnspecified PayloadEncoding = 0

	// PayloadEncodingTarZstd is the default Pebble v3 envelope
	// encoding: tar of the Pebble directory, compressed with zstd.
	PayloadEncodingTarZstd PayloadEncoding = 3

	// PayloadEncodingTar is uncompressed tar. Useful when Pebble's
	// L5/L6 SSTs are already zstd-compressed at the engine layer
	// (avoids double-compression CPU), or when the storage target
	// compresses in transit.
	PayloadEncodingTar PayloadEncoding = 4
)
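How the zero value resolves to the engine default can be sketched as below. The constants reuse the wire numbers listed above under local names; resolveForPebble is a hypothetical helper, not a function of this package.

```go
package main

import "fmt"

type payloadEncoding int

// Local copies of the wire numbers listed above.
const (
	encodingUnspecified payloadEncoding = 0
	encodingTarZstd     payloadEncoding = 3
	encodingTar         payloadEncoding = 4
)

// resolveForPebble is a hypothetical helper: the zero value means
// "engine default", which is tar+zstd for Pebble.
func resolveForPebble(e payloadEncoding) payloadEncoding {
	if e == encodingUnspecified {
		return encodingTarZstd
	}
	return e
}

func main() {
	var unset payloadEncoding // zero value, as in an unset StoreOptions field
	fmt.Println(resolveForPebble(unset) == encodingTarZstd)
	fmt.Println(resolveForPebble(encodingTar) == encodingTar)
}
```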

func (PayloadEncoding) String added in v0.10.0

func (e PayloadEncoding) String() string

String returns a stable human-readable name for the encoding.

type PendingExpansion added in v0.8.29

type PendingExpansion struct {
	// GrantExternalID is the external id of the grant this expansion
	// applies to (matches v2.Grant.Id).
	GrantExternalID string

	// TargetEntitlementID is the entitlement id granted to the principal.
	TargetEntitlementID string

	// PrincipalResourceTypeID and PrincipalResourceID identify the principal.
	PrincipalResourceTypeID string
	PrincipalResourceID     string

	// Annotation is the grant's expansion annotation. Non-nil by
	// construction: a row only appears in PendingExpansion if it has one.
	Annotation *v2.GrantExpandable

	// NeedsExpansion is the current needs_expansion column value.
	// Always true for rows returned by PendingExpansion — included
	// for parity with the underlying row shape and future flexibility.
	NeedsExpansion bool
}

PendingExpansion is a lightweight row yielded by GrantStore.PendingExpansion. It carries grant identity and the parsed expansion annotation without materializing the full grant payload — this keeps the expansion-worker hot path out of proto.Unmarshal.

The syncer uses GrantExternalID to look up the grant and uses Annotation to decide what expansion work to enqueue.

type SessionStore added in v0.5.0

type SessionStore interface {
	sessions.SessionStore
}

type StoreOptions added in v0.10.0

type StoreOptions struct {
	TmpDir             string
	Pragmas            []StorePragma
	DecoderOptions     []DecoderOption
	ReadOnly           bool
	EncoderConcurrency int
	SyncLimit          int
	SkipCleanup        bool
	V2GrantsWriter     bool
	Engine             Engine

	// PayloadEncoding selects the v3 envelope payload framing for
	// engines that produce a v3 envelope (currently Pebble). Zero
	// value means "engine default" (PayloadEncodingTarZstd for Pebble).
	PayloadEncoding PayloadEncoding
}

StoreOptions is the engine-neutral form of C1ZOption values passed to a registered engine driver.

type StorePragma added in v0.10.0

type StorePragma struct {
	Name  string
	Value string
}

StorePragma is a SQLite pragma forwarded through StoreOptions. Non-SQLite engines may ignore it.

type SyncMeta added in v0.8.29

type SyncMeta interface {
	// MarkSyncSupportsDiff sets the supports_diff flag on the given sync.
	// Called by pkg/sync.parallelSyncer after graph construction to signal
	// that the sync run has SQL-layer grant metadata populated and diff
	// consumers may rely on it.
	MarkSyncSupportsDiff(ctx context.Context, syncID string) error

	// LatestFullSync returns the most-recently-finished SyncTypeFull sync
	// run, or nil if none exists. Used by pkg/sync.syncer to compute the
	// "previous sync" reference for etag-based grant reuse.
	LatestFullSync(ctx context.Context) (*SyncRun, error)

	// LatestFinishedSyncOfAnyType returns the most-recently-finished sync
	// of any type (including diff types), or nil if none exists. Used by
	// tooling that wants to inspect whatever sync finished last regardless
	// of type.
	LatestFinishedSyncOfAnyType(ctx context.Context) (*SyncRun, error)

	// Stats returns a map of table-name to row-count for the given sync.
	// If syncID is empty, the latest sync of the given type is used.
	// Mirrors the existing *C1File.Stats signature exactly.
	Stats(ctx context.Context, syncType connectorstore.SyncType, syncID string) (map[string]int64, error)
}

SyncMeta is the sync-run-metadata sub-store of C1ZStore. It covers operations that read/write the sync_runs table without going through the gRPC Reader/Writer surfaces.

All methods are callable without an active sync.

type SyncRun added in v0.8.29

type SyncRun struct {
	ID           string
	StartedAt    *time.Time
	EndedAt      *time.Time
	SyncToken    string
	Type         connectorstore.SyncType
	ParentSyncID string
	LinkedSyncID string
	SupportsDiff bool
	Stats        *reader_v2.SyncStats
}

SyncRun is the exported shape of a sync run. It corresponds directly to the internal syncRun struct; the fields match the sync_runs schema.

Callers typically only read ID, Type, and the timestamps; the rest is included for completeness and for use by tooling (e.g. sync-diff pipelines need ParentSyncID and LinkedSyncID).

Directories

Path Synopsis
engine
equivalence
Package equivalence runs the same workload through two storage backends and asserts they produce equivalent results across every reader method.
pebble
Package pebble is the v3 storage engine for baton-sdk.
pebble/codec
Package codec implements the v3 storage engine's record codec layer.
pebble/microtests
Package microtests holds the 5 risk-validation tests that were written to prove the material design choices in RFC 0004 (storage engine v4) actually hold under Pebble's real behavior.
format
v3
Package v3 implements the C1Z3 envelope format: 5-byte magic, a length-prefixed proto manifest, and a payload (typically a zstd-tar of a Pebble directory).
