Documentation
¶
Index ¶
- Variables
- func C1ZFileCheckHeader(f io.ReadSeeker) (bool, error)
- func NewC1FileReader(ctx context.Context, dbFilePath string, opts ...C1FOption) (connectorstore.Reader, error)
- func NewC1ZFileDecoder(f io.Reader, opts ...DecoderOption) (io.ReadCloser, error)
- func NewDecoder(f io.Reader, opts ...DecoderOption) (*decoder, error)
- func NewExternalC1FileReader(ctx context.Context, tmpDir string, externalResourceC1ZPath string) (connectorstore.Reader, error)
- func ReadHeader(reader io.Reader) error
- type C1FOption
- type C1File
- func (c *C1File) AttachFile(other *C1File, dbName string) (*C1FileAttached, error)
- func (c *C1File) CheckpointSync(ctx context.Context, syncToken string) error
- func (c *C1File) Cleanup(ctx context.Context) error
- func (c *C1File) Clear(ctx context.Context, opt ...sessions.SessionStoreOption) error
- func (c *C1File) CloneSync(ctx context.Context, outPath string, syncID string) error
- func (c *C1File) Close(ctx context.Context) error
- func (c *C1File) CurrentDBSizeBytes() (int64, error)
- func (c *C1File) CurrentSyncStep(ctx context.Context) (string, error)
- func (c *C1File) Delete(ctx context.Context, key string, opt ...sessions.SessionStoreOption) error
- func (c *C1File) DeleteGrant(ctx context.Context, grantId string) error
- func (c *C1File) DeleteSyncRun(ctx context.Context, syncID string) error
- func (c *C1File) EndSync(ctx context.Context) error
- func (c *C1File) FileOps() FileOps
- func (c *C1File) GenerateSyncDiff(ctx context.Context, baseSyncID string, appliedSyncID string) (string, error)
- func (c *C1File) Get(ctx context.Context, key string, opt ...sessions.SessionStoreOption) ([]byte, bool, error)
- func (c *C1File) GetAll(ctx context.Context, pageToken string, opt ...sessions.SessionStoreOption) (map[string][]byte, string, error)
- func (c *C1File) GetAsset(ctx context.Context, request *v2.AssetServiceGetAssetRequest) (string, io.Reader, error)
- func (c *C1File) GetEntitlement(ctx context.Context, ...) (*reader_v2.EntitlementsReaderServiceGetEntitlementResponse, error)
- func (c *C1File) GetGrant(ctx context.Context, request *reader_v2.GrantsReaderServiceGetGrantRequest) (*reader_v2.GrantsReaderServiceGetGrantResponse, error)
- func (c *C1File) GetLatestFinishedSync(ctx context.Context, ...) (*reader_v2.SyncsReaderServiceGetLatestFinishedSyncResponse, error)
- func (c *C1File) GetMany(ctx context.Context, keys []string, opt ...sessions.SessionStoreOption) (map[string][]byte, []string, error)
- func (c *C1File) GetResource(ctx context.Context, ...) (*reader_v2.ResourcesReaderServiceGetResourceResponse, error)
- func (c *C1File) GetResourceType(ctx context.Context, ...) (*reader_v2.ResourceTypesReaderServiceGetResourceTypeResponse, error)
- func (c *C1File) GetSync(ctx context.Context, request *reader_v2.SyncsReaderServiceGetSyncRequest) (*reader_v2.SyncsReaderServiceGetSyncResponse, error)
- func (c *C1File) GrantStats(ctx context.Context, syncType connectorstore.SyncType, syncId string) (map[string]int64, error)
- func (c *C1File) Grants() GrantStore
- func (c *C1File) InitTables(ctx context.Context) error
- func (c *C1File) LatestFinishedSyncID(ctx context.Context, syncType connectorstore.SyncType) (string, error)
- func (c *C1File) LatestSyncID(ctx context.Context, syncType connectorstore.SyncType) (string, error)
- func (c *C1File) ListEntitlements(ctx context.Context, request *v2.EntitlementsServiceListEntitlementsRequest) (*v2.EntitlementsServiceListEntitlementsResponse, error)
- func (c *C1File) ListGrants(ctx context.Context, request *v2.GrantsServiceListGrantsRequest) (*v2.GrantsServiceListGrantsResponse, error)
- func (c *C1File) ListGrantsForEntitlement(ctx context.Context, ...) (*reader_v2.GrantsReaderServiceListGrantsForEntitlementResponse, error)
- func (c *C1File) ListGrantsForPrincipal(ctx context.Context, ...) (*reader_v2.GrantsReaderServiceListGrantsForEntitlementResponse, error)
- func (c *C1File) ListGrantsForResourceType(ctx context.Context, ...) (*reader_v2.GrantsReaderServiceListGrantsForResourceTypeResponse, error)
- func (c *C1File) ListResourceTypes(ctx context.Context, request *v2.ResourceTypesServiceListResourceTypesRequest) (*v2.ResourceTypesServiceListResourceTypesResponse, error)
- func (c *C1File) ListResources(ctx context.Context, request *v2.ResourcesServiceListResourcesRequest) (*v2.ResourcesServiceListResourcesResponse, error)
- func (c *C1File) ListStaticEntitlements(ctx context.Context, ...) (*v2.EntitlementsServiceListStaticEntitlementsResponse, error)
- func (c *C1File) ListSyncRuns(ctx context.Context, pageToken string, pageSize uint32) ([]*syncRun, string, error)
- func (c *C1File) ListSyncs(ctx context.Context, request *reader_v2.SyncsReaderServiceListSyncsRequest) (*reader_v2.SyncsReaderServiceListSyncsResponse, error)
- func (c *C1File) OutputFilepath() (string, error)
- func (c *C1File) PreviousSyncID(ctx context.Context, syncType connectorstore.SyncType) (string, error)
- func (c *C1File) PutAsset(ctx context.Context, assetRef *v2.AssetRef, contentType string, data []byte) error
- func (c *C1File) PutEntitlements(ctx context.Context, entitlementObjs ...*v2.Entitlement) error
- func (c *C1File) PutEntitlementsIfNewer(ctx context.Context, entitlementObjs ...*v2.Entitlement) error
- func (c *C1File) PutGrants(ctx context.Context, bulkGrants ...*v2.Grant) error
- func (c *C1File) PutGrantsIfNewer(ctx context.Context, bulkGrants ...*v2.Grant) error
- func (c *C1File) PutResourceTypes(ctx context.Context, resourceTypesObjs ...*v2.ResourceType) error
- func (c *C1File) PutResourceTypesIfNewer(ctx context.Context, resourceTypesObjs ...*v2.ResourceType) error
- func (c *C1File) PutResources(ctx context.Context, resourceObjs ...*v2.Resource) error
- func (c *C1File) PutResourcesIfNewer(ctx context.Context, resourceObjs ...*v2.Resource) error
- func (c *C1File) ResumeSync(ctx context.Context, syncType connectorstore.SyncType, syncID string) (string, error)
- func (c *C1File) Set(ctx context.Context, key string, value []byte, ...) error
- func (c *C1File) SetCurrentSync(ctx context.Context, syncID string) error
- func (c *C1File) SetMany(ctx context.Context, values map[string][]byte, ...) error
- func (c *C1File) SetSupportsDiff(ctx context.Context, syncID string) error
- func (c *C1File) SetSyncID(_ context.Context, syncID string) error
- func (c *C1File) StartNewSync(ctx context.Context, syncType connectorstore.SyncType, parentSyncID string) (string, error)
- func (c *C1File) StartOrResumeSync(ctx context.Context, syncType connectorstore.SyncType, syncID string) (string, bool, error)
- func (c *C1File) Stats(ctx context.Context, syncType connectorstore.SyncType, syncId string) (map[string]int64, error)
- func (c *C1File) StoreExpandedGrants(ctx context.Context, grants ...*v2.Grant) error
- func (c *C1File) SyncMeta() SyncMeta
- func (c *C1File) Vacuum(ctx context.Context) error
- func (c *C1File) ViewSync(ctx context.Context, syncID string) error
- type C1FileAttached
- func (c *C1FileAttached) CompactEntitlements(ctx context.Context, baseSyncID string, appliedSyncID string) error
- func (c *C1FileAttached) CompactGrants(ctx context.Context, baseSyncID string, appliedSyncID string) error
- func (c *C1FileAttached) CompactResourceTypes(ctx context.Context, baseSyncID string, appliedSyncID string) error
- func (c *C1FileAttached) CompactResources(ctx context.Context, baseSyncID string, appliedSyncID string) error
- func (c *C1FileAttached) CompactTable(ctx context.Context, baseSyncID string, appliedSyncID string, tableName string) error
- func (c *C1FileAttached) DetachFile(dbName string) (*C1FileAttached, error)
- func (c *C1FileAttached) GenerateSyncDiffFromFile(ctx context.Context, oldSyncID string, newSyncID string) (string, string, error)
- func (c *C1FileAttached) UpdateSync(ctx context.Context, baseSync *reader_v2.SyncRun, ...) error
- type C1ZOption
- func WithDecoderOptions(opts ...DecoderOption) C1ZOption
- func WithEncoderConcurrency(concurrency int) C1ZOption
- func WithPragma(name string, value string) C1ZOption
- func WithReadOnly(readOnly bool) C1ZOption
- func WithSkipCleanup(skip bool) C1ZOption
- func WithSyncLimit(limit int) C1ZOption
- func WithTmpDir(tmpDir string) C1ZOption
- type C1ZStore
- type DecoderOption
- type FileOps
- type GrantAnnotation
- type GrantStore
- type PendingExpansion
- type SessionStore
- type SyncMeta
- type SyncRun
Constants ¶
This section is empty.
Variables ¶
var ( ErrInvalidFile = fmt.Errorf("c1z: invalid file") ErrMaxSizeExceeded = fmt.Errorf("c1z: max decoded size exceeded, increase DecoderMaxDecodedSize using %v environment variable", maxDecodedSizeEnvVar) ErrWindowSizeExceeded = fmt.Errorf("c1z: window size exceeded, increase DecoderMaxMemory using %v environment variable", maxDecoderMemorySizeEnv) )
var C1ZFileHeader = []byte("C1ZF\x00")
var ErrDbNotOpen = errors.New("c1file: database has not been opened")
var ErrReadOnly = errors.New("c1z: read only mode")
Functions ¶
func C1ZFileCheckHeader ¶
func C1ZFileCheckHeader(f io.ReadSeeker) (bool, error)
C1ZFileCheckHeader reads len(C1ZFileHeader) bytes from the given io.ReadSeeker and compares them to C1ZFileHeader. Returns true if the header is valid. Returns any errors from Read() or Seek(). If a nil error is returned, the given io.ReadSeeker will be pointing to the first byte of the stream, and is suitable to be passed to NewC1ZFileDecoder.
func NewC1FileReader ¶
func NewC1FileReader(ctx context.Context, dbFilePath string, opts ...C1FOption) (connectorstore.Reader, error)
NewC1FileReader returns a connectorstore.Reader implementation for the given sqlite db file path.
func NewC1ZFileDecoder ¶
func NewC1ZFileDecoder(f io.Reader, opts ...DecoderOption) (io.ReadCloser, error)
NewC1ZFileDecoder wraps a given .c1z io.Reader that validates the .c1z and decompresses/decodes the underlying file. Defaults: 128MiB max memory and 3GiB max decoded size You must close the resulting io.ReadCloser when you are done, do not forget to close the given io.Reader if necessary.
func NewDecoder ¶ added in v0.0.24
func NewDecoder(f io.Reader, opts ...DecoderOption) (*decoder, error)
NewDecoder wraps a given .c1z file io.Reader and returns an io.Reader for the underlying decoded/uncompressed file.
func NewExternalC1FileReader ¶ added in v0.2.84
func ReadHeader ¶ added in v0.0.24
ReadHeader reads len(C1ZFileHeader) bytes from the given io.Reader and compares them to C1ZFileHeader, returning an error if they don't match. If possible, ReadHeader will Seek() to the start of the stream before checking the header bytes. On return, the reader will be pointing to the first byte after the header.
Types ¶
type C1FOption ¶ added in v0.1.8
type C1FOption func(*C1File)
func WithC1FEncoderConcurrency ¶ added in v0.6.6
func WithC1FPragma ¶ added in v0.1.22
WithC1FPragma sets a sqlite pragma for the c1z file.
func WithC1FReadOnly ¶ added in v0.6.6
func WithC1FSkipCleanup ¶ added in v0.8.22
WithC1FSkipCleanup skips cleanup of old syncs when set to true.
func WithC1FSyncCountLimit ¶ added in v0.7.0
WithC1FSyncCountLimit sets the number of syncs to keep during cleanup. If not set, defaults to 2 (or BATON_KEEP_SYNC_COUNT env var if set).
func WithC1FTmpDir ¶ added in v0.1.8
WithC1FTmpDir sets the temporary directory to use when cloning a sync. If not provided, os.TempDir() will be used.
type C1File ¶ added in v0.0.24
type C1File struct {
// contains filtered or unexported fields
}
func AsSQLiteStore ¶ added in v0.8.29
AsSQLiteStore type-asserts a C1ZStore to the concrete *C1File. It is an escape hatch for callers that legitimately need SQLite-specific primitives (today: the attached compactor in pkg/synccompactor/attached, which uses SQL ATTACH for cross-file merge). Returns (nil, false) when the store is not backed by *C1File OR when the underlying *C1File is nil.
Avoid using this outside pkg/synccompactor. If you find yourself reaching for it, prefer adding a named method to C1ZStore that expresses what you need; sqlite-specific leak-through is a smell. See RFC 0002 §4.4.
func NewC1ZFile ¶ added in v0.0.24
Returns a new C1File instance with its state stored at the provided filename.
func (*C1File) AttachFile ¶ added in v0.3.35
func (c *C1File) AttachFile(other *C1File, dbName string) (*C1FileAttached, error)
func (*C1File) CheckpointSync ¶ added in v0.0.24
func (*C1File) CloneSync ¶ added in v0.0.24
CloneSync uses sqlite hackery to directly copy the pertinent rows into a new database. 1. Create a new empty sqlite database in a temp file 2. Open the c1z that we are cloning to get a db handle 3. Execute an ATTACH query to bring our empty sqlite db into the context of our db connection 4. Select directly from the cloned db and insert directly into the new database. 5. Close and save the new database as a c1z at the configured path.
func (*C1File) Close ¶ added in v0.0.24
Close ensures that the sqlite database is flushed to disk, and if any changes were made we update the original database with our changes. The provided context is used for the WAL checkpoint operation. If the context is already expired, a fresh context with a timeout is used to ensure the checkpoint completes.
func (*C1File) CurrentDBSizeBytes ¶ added in v0.8.27
CurrentDBSizeBytes returns the current total on-disk size of the underlying uncompressed sqlite database, including the write-ahead log if present. Used by operational tooling (e.g. the grant-expansion progress logger) to observe c1z growth during long in-process writes without waiting for saveC1z to land a new frame.
The WAL file holds writes that have not yet been checkpointed into the main database file; with journal_mode=WAL the main file may stay stable for long stretches while the WAL grows into the hundreds of MB. Summing both gives a representative "bytes written so far" figure during active expansion.
This is the *uncompressed* size. The post-saveC1z c1z file size (compressed) is smaller; for that, see the `c1z: saved` log line emitted by saveC1z.
func (*C1File) CurrentSyncStep ¶ added in v0.0.24
func (*C1File) DeleteGrant ¶ added in v0.2.84
func (*C1File) DeleteSyncRun ¶ added in v0.0.24
DeleteSyncRun removes all the objects with a given syncID from the database.
func (*C1File) EndSync ¶ added in v0.0.24
EndSync updates the current sync_run row with the end time, and removes any other objects that don't have the current sync ID.
func (*C1File) GenerateSyncDiff ¶ added in v0.3.2
func (*C1File) Get ¶ added in v0.5.0
func (c *C1File) Get(ctx context.Context, key string, opt ...sessions.SessionStoreOption) ([]byte, bool, error)
Get implements types.SessionCache.
func (*C1File) GetAll ¶ added in v0.5.0
func (c *C1File) GetAll(ctx context.Context, pageToken string, opt ...sessions.SessionStoreOption) (map[string][]byte, string, error)
GetAll implements types.SessionStore.
func (*C1File) GetAsset ¶ added in v0.0.24
func (c *C1File) GetAsset(ctx context.Context, request *v2.AssetServiceGetAssetRequest) (string, io.Reader, error)
GetAsset fetches the specified asset from the database, and returns the content type and an io.Reader for the caller to read the asset from.
func (*C1File) GetEntitlement ¶ added in v0.0.24
func (c *C1File) GetEntitlement(ctx context.Context, request *reader_v2.EntitlementsReaderServiceGetEntitlementRequest) (*reader_v2.EntitlementsReaderServiceGetEntitlementResponse, error)
func (*C1File) GetGrant ¶ added in v0.0.24
func (c *C1File) GetGrant(ctx context.Context, request *reader_v2.GrantsReaderServiceGetGrantRequest) (*reader_v2.GrantsReaderServiceGetGrantResponse, error)
func (*C1File) GetLatestFinishedSync ¶ added in v0.2.83
func (c *C1File) GetLatestFinishedSync( ctx context.Context, request *reader_v2.SyncsReaderServiceGetLatestFinishedSyncRequest, ) (*reader_v2.SyncsReaderServiceGetLatestFinishedSyncResponse, error)
func (*C1File) GetMany ¶ added in v0.5.0
func (c *C1File) GetMany(ctx context.Context, keys []string, opt ...sessions.SessionStoreOption) (map[string][]byte, []string, error)
GetMany implements types.SessionStore.
func (*C1File) GetResource ¶ added in v0.0.24
func (c *C1File) GetResource(ctx context.Context, request *reader_v2.ResourcesReaderServiceGetResourceRequest) (*reader_v2.ResourcesReaderServiceGetResourceResponse, error)
func (*C1File) GetResourceType ¶ added in v0.0.24
func (c *C1File) GetResourceType(ctx context.Context, request *reader_v2.ResourceTypesReaderServiceGetResourceTypeRequest) (*reader_v2.ResourceTypesReaderServiceGetResourceTypeResponse, error)
func (*C1File) GetSync ¶ added in v0.2.83
func (c *C1File) GetSync(ctx context.Context, request *reader_v2.SyncsReaderServiceGetSyncRequest) (*reader_v2.SyncsReaderServiceGetSyncResponse, error)
func (*C1File) GrantStats ¶ added in v0.3.58
func (c *C1File) GrantStats(ctx context.Context, syncType connectorstore.SyncType, syncId string) (map[string]int64, error)
GrantStats introspects the database and returns the count of grants for the given sync run. If syncId is empty, it will use the latest sync run of the given type.
func (*C1File) Grants ¶ added in v0.8.29
func (c *C1File) Grants() GrantStore
Grants returns the grant-store slice of this c1z.
func (*C1File) LatestFinishedSyncID ¶ added in v0.3.56
func (*C1File) LatestSyncID ¶ added in v0.0.24
func (*C1File) ListEntitlements ¶ added in v0.0.24
func (c *C1File) ListEntitlements(ctx context.Context, request *v2.EntitlementsServiceListEntitlementsRequest) (*v2.EntitlementsServiceListEntitlementsResponse, error)
func (*C1File) ListGrants ¶ added in v0.0.24
func (c *C1File) ListGrants(ctx context.Context, request *v2.GrantsServiceListGrantsRequest) (*v2.GrantsServiceListGrantsResponse, error)
func (*C1File) ListGrantsForEntitlement ¶ added in v0.0.24
func (c *C1File) ListGrantsForEntitlement( ctx context.Context, request *reader_v2.GrantsReaderServiceListGrantsForEntitlementRequest, ) (*reader_v2.GrantsReaderServiceListGrantsForEntitlementResponse, error)
func (*C1File) ListGrantsForPrincipal ¶ added in v0.1.6
func (c *C1File) ListGrantsForPrincipal( ctx context.Context, request *reader_v2.GrantsReaderServiceListGrantsForEntitlementRequest, ) (*reader_v2.GrantsReaderServiceListGrantsForEntitlementResponse, error)
func (*C1File) ListGrantsForResourceType ¶ added in v0.0.24
func (c *C1File) ListGrantsForResourceType( ctx context.Context, request *reader_v2.GrantsReaderServiceListGrantsForResourceTypeRequest, ) (*reader_v2.GrantsReaderServiceListGrantsForResourceTypeResponse, error)
func (*C1File) ListResourceTypes ¶ added in v0.0.24
func (c *C1File) ListResourceTypes(ctx context.Context, request *v2.ResourceTypesServiceListResourceTypesRequest) (*v2.ResourceTypesServiceListResourceTypesResponse, error)
func (*C1File) ListResources ¶ added in v0.0.24
func (c *C1File) ListResources(ctx context.Context, request *v2.ResourcesServiceListResourcesRequest) (*v2.ResourcesServiceListResourcesResponse, error)
func (*C1File) ListStaticEntitlements ¶ added in v0.5.21
func (c *C1File) ListStaticEntitlements(ctx context.Context, request *v2.EntitlementsServiceListStaticEntitlementsRequest) (*v2.EntitlementsServiceListStaticEntitlementsResponse, error)
func (*C1File) ListSyncRuns ¶ added in v0.0.24
func (*C1File) ListSyncs ¶ added in v0.2.83
func (c *C1File) ListSyncs(ctx context.Context, request *reader_v2.SyncsReaderServiceListSyncsRequest) (*reader_v2.SyncsReaderServiceListSyncsResponse, error)
func (*C1File) OutputFilepath ¶ added in v0.3.3
func (*C1File) PreviousSyncID ¶ added in v0.0.24
func (*C1File) PutAsset ¶ added in v0.0.24
func (c *C1File) PutAsset(ctx context.Context, assetRef *v2.AssetRef, contentType string, data []byte) error
PutAsset stores the given asset in the database.
func (*C1File) PutEntitlements ¶ added in v0.1.46
func (*C1File) PutEntitlementsIfNewer ¶ added in v0.3.3
func (*C1File) PutGrants ¶ added in v0.1.46
PutGrants is the connector-facing write method on connectorstore.Writer. It replaces any conflicting row and re-extracts expansion metadata from the grant payload.
func (*C1File) PutGrantsIfNewer ¶ added in v0.3.3
PutGrantsIfNewer writes grants only when the provided discovered_at is newer than the stored row's discovered_at. Retained on *C1File because it has targeted test coverage in grants_test.go documenting its semantics. Not on any interface.
func (*C1File) PutResourceTypes ¶ added in v0.1.46
func (*C1File) PutResourceTypesIfNewer ¶ added in v0.3.3
func (*C1File) PutResources ¶ added in v0.1.46
func (*C1File) PutResourcesIfNewer ¶ added in v0.3.3
func (*C1File) ResumeSync ¶ added in v0.3.56
func (*C1File) Set ¶ added in v0.5.0
func (c *C1File) Set(ctx context.Context, key string, value []byte, opt ...sessions.SessionStoreOption) error
Set implements types.SessionStore.
func (*C1File) SetCurrentSync ¶ added in v0.3.8
func (*C1File) SetMany ¶ added in v0.5.0
func (c *C1File) SetMany(ctx context.Context, values map[string][]byte, opt ...sessions.SessionStoreOption) error
SetMany implements types.SessionStore.
func (*C1File) SetSupportsDiff ¶ added in v0.7.22
SetSupportsDiff marks the given sync as supporting diff operations. This indicates the sync has SQL-layer grant metadata (is_expandable) properly populated.
func (*C1File) SetSyncID ¶ added in v0.6.8
SetSyncID sets the current sync ID. This is only intended for testing.
func (*C1File) StartNewSync ¶ added in v0.2.15
func (*C1File) StartOrResumeSync ¶ added in v0.3.52
func (c *C1File) StartOrResumeSync(ctx context.Context, syncType connectorstore.SyncType, syncID string) (string, bool, error)
StartOrResumeSync checks if a sync is already running and resumes it if it is. If no sync is running, it starts a new sync. It returns the sync ID and a boolean indicating if a new sync was started.
func (*C1File) Stats ¶ added in v0.0.24
func (c *C1File) Stats(ctx context.Context, syncType connectorstore.SyncType, syncId string) (map[string]int64, error)
Stats introspects the database and returns the count of objects for the given sync run. If syncId is empty, it will use the latest sync run of the given type.
func (*C1File) StoreExpandedGrants ¶ added in v0.8.29
StoreExpandedGrants persists grants produced by the expander. It strips any residual GrantExpandable annotation from each grant payload and then delegates to the internal UpsertGrants path using PreserveExpansion mode so existing expansion/needs_expansion columns are left untouched.
The strip step is defensive: the expander's upstream callers consume the annotation before writing, but a residual annotation on the payload would disagree with the stored expansion columns. Stripping makes the method total regardless of caller discipline.
This method is exposed on *C1File (not just via GrantStore) because test helpers in pkg/sync/expand construct a *C1File directly and pass it to NewExpander; putting it at the top level keeps those tests free of sub-store wiring.
type C1FileAttached ¶ added in v0.3.35
type C1FileAttached struct {
// contains filtered or unexported fields
}
func (*C1FileAttached) CompactEntitlements ¶ added in v0.3.35
func (*C1FileAttached) CompactGrants ¶ added in v0.3.35
func (*C1FileAttached) CompactResourceTypes ¶ added in v0.3.35
func (*C1FileAttached) CompactResources ¶ added in v0.3.35
func (*C1FileAttached) CompactTable ¶ added in v0.3.35
func (*C1FileAttached) DetachFile ¶ added in v0.3.35
func (c *C1FileAttached) DetachFile(dbName string) (*C1FileAttached, error)
func (*C1FileAttached) GenerateSyncDiffFromFile ¶ added in v0.7.0
func (c *C1FileAttached) GenerateSyncDiffFromFile(ctx context.Context, oldSyncID string, newSyncID string) (string, string, error)
GenerateSyncDiffFromFile compares the old sync (in attached) with the new sync (in main) and generates two new syncs in the main database.
IMPORTANT: This assumes main=NEW/compacted and attached=OLD/base: - diffTableFromAttached: items in attached (OLD) not in main (NEW) = deletions - diffTableFromMain: items in main (NEW) not in attached (OLD) = upserts (additions)
Parameters: - oldSyncID: the sync ID in the attached database (OLD/base state) - newSyncID: the sync ID in the main database (NEW/compacted state)
Returns (upsertsSyncID, deletionsSyncID, error).
func (*C1FileAttached) UpdateSync ¶ added in v0.6.11
type C1ZOption ¶ added in v0.1.8
type C1ZOption func(*c1zOptions)
func WithDecoderOptions ¶ added in v0.3.50
func WithDecoderOptions(opts ...DecoderOption) C1ZOption
func WithEncoderConcurrency ¶ added in v0.6.6
WithEncoderConcurrency sets the number of created encoders. Default is 1, which disables async encoding/concurrency. 0 uses GOMAXPROCS.
func WithPragma ¶ added in v0.1.22
WithPragma sets a sqlite pragma for the c1z file.
func WithReadOnly ¶ added in v0.6.6
WithReadOnly opens the c1z file in read only mode. Modifying the c1z will result in an error on close.
func WithSkipCleanup ¶ added in v0.8.22
WithSkipCleanup skips cleanup of old syncs when set to true.
func WithSyncLimit ¶ added in v0.7.0
WithSyncLimit sets the number of syncs to keep during cleanup. If not set, defaults to 2 (or BATON_KEEP_SYNC_COUNT env var if set).
func WithTmpDir ¶ added in v0.1.8
WithTmpDir sets the temporary directory to extract the c1z file to. If not provided, os.TempDir() will be used.
type C1ZStore ¶ added in v0.8.29
type C1ZStore interface {
connectorstore.Writer
// Grants returns the grant-specific sub-store.
Grants() GrantStore
// SyncMeta returns the sync-run metadata sub-store.
SyncMeta() SyncMeta
// FileOps returns the file-level operations sub-store.
FileOps() FileOps
// Close releases resources, flushing any pending writes.
// Overrides the embedded Reader.Close to document this signature.
Close(ctx context.Context) error
}
C1ZStore is the internal contract used by the sync pipeline, compactor, and related infrastructure to read and write a .c1z file. It is NOT the connector-facing contract — connectors write through connectorstore.Writer, which is embedded here but stays the stable API surface for external code.
C1ZStore adds three kinds of operations that the sync pipeline needs but connectors don't:
- Grant operations with expansion-aware semantics, accessed via Grants().
- Sync-run metadata operations, accessed via SyncMeta().
- File-level operations (clone, diff), accessed via FileOps().
*C1File is the sole implementation.
The Close override replaces connectorstore.Reader.Close with an explicit context parameter (*C1File.Close already takes context on main, so this is exact match, no behavior change).
type DecoderOption ¶ added in v0.0.24
type DecoderOption func(*decoderOptions) error
DecoderOption is an option for creating a decoder.
func WithContext ¶
func WithContext(ctx context.Context) DecoderOption
WithContext sets a context, when cancelled, will cause subequent calls to Read() to return ctx.Error().
func WithDecoderConcurrency ¶ added in v0.3.50
func WithDecoderConcurrency(n int) DecoderOption
WithDecoderConcurrency sets the number of created decoders. Default is 1, which disables async decoding/concurrency. 0 uses GOMAXPROCS. -1 uses GOMAXPROCS or 4, whichever is lower.
func WithDecoderMaxDecodedSize ¶
func WithDecoderMaxDecodedSize(n uint64) DecoderOption
WithDecoderMaxDecodedSize sets the maximum size of the decoded stream. This can be used to cap the resulting decoded stream size. Maximum is 1 << 63 bytes. Default is 1GiB.
func WithDecoderMaxMemory ¶
func WithDecoderMaxMemory(n uint64) DecoderOption
WithDecoderMaxMemory sets the maximum window size for streaming operations. This can be used to control memory usage of potentially hostile content. Maximum is 1 << 63 bytes. Default is 128MiB.
type FileOps ¶ added in v0.8.29
type FileOps interface {
// CloneSync materializes the given sync run's data into a freshly
// created standalone c1z file at outPath. Used by c1's storeCompletedSyncC1Z
// activity to archive a completed sync.
CloneSync(ctx context.Context, outPath string, syncID string) error
// GenerateSyncDiff computes the diff between two existing sync runs
// in this same file and writes the delta as a new SyncTypePartial
// sync. Returns the new sync's id. Used by the local differ CLI.
GenerateSyncDiff(ctx context.Context, baseSyncID, appliedSyncID string) (diffSyncID string, err error)
}
FileOps is the file-level operations sub-store of C1ZStore. It covers operations that span sync runs or produce new sync runs within the same c1z file. Cross-file operations (SQL ATTACH) live in pkg/synccompactor, not here.
type GrantAnnotation ¶ added in v0.8.29
type GrantAnnotation struct {
Grant *v2.Grant
Annotation *v2.GrantExpandable // nil if not expandable
GrantExternalID string
TargetEntitlementID string
PrincipalResourceTypeID string
PrincipalResourceID string
NeedsExpansion bool
}
GrantAnnotation is a row yielded by GrantStore.ListWithAnnotations.
Grant is always populated. Annotation is nil when the grant has no GrantExpandable annotation.
Identity fields (GrantExternalID, TargetEntitlementID, PrincipalResourceTypeID, PrincipalResourceID) are ALWAYS populated from the underlying grant proto, regardless of whether Annotation is nil, so callers can use them without branching.
NeedsExpansion is only meaningful when Annotation is non-nil; it reports the stored needs_expansion column for expandable grants. For non-expandable grants it is false.
type GrantStore ¶ added in v0.8.29
type GrantStore interface {
// StoreExpandedGrants writes grants produced by the expander back to
// storage. The expander has already consumed each grant's
// GrantExpandable annotation and performed expansion; storage must
// persist the grant payload without re-extracting expansion metadata
// (otherwise we'd corrupt expansion state that has already been
// marked "done"). For robustness, any residual GrantExpandable
// annotation on the input is stripped from the persisted payload.
//
// Called by pkg/sync/expand.PutGrantsInChunks.
StoreExpandedGrants(ctx context.Context, grants ...*v2.Grant) error
// PendingExpansionPage returns the next page of grants whose
// expansion metadata still needs processing (needs_expansion=1).
// Grant payloads are NOT materialized — only identity plus the
// parsed expansion annotation — because the caller only needs
// expansion metadata and this keeps the hot path cheap.
//
// Caller drives pagination by passing the prior nextPageToken as
// pageToken; an empty nextPageToken means the page was the last.
// Initial call passes pageToken="".
//
// Called by pkg/sync.syncer.ExpandGrants, which checkpoints page
// tokens in its action state.
PendingExpansionPage(ctx context.Context, pageToken string) (defs []PendingExpansion, nextPageToken string, err error)
// PendingExpansion walks all pages of PendingExpansionPage and
// yields each row. Convenience wrapper for callers that don't
// checkpoint pagination.
//
// ITERATION CONTRACT: on error, the sequence yields a final
// (zero-value PendingExpansion, err) pair and terminates. Callers
// MUST check the second return value on every iteration:
//
// for pe, err := range store.Grants().PendingExpansion(ctx) {
// if err != nil { return err }
// // ... use pe ...
// }
//
// Single-variable ranging (for pe := range seq) silently drops
// errors and is a bug.
PendingExpansion(ctx context.Context) iter.Seq2[PendingExpansion, error]
// ListWithAnnotationsPage returns the next page of grants with
// their expansion annotations inline. Grant payloads are fully
// materialized. Used by the external-principal post-processing
// step which needs full grants plus expansion.
//
// Caller drives pagination by passing the prior nextPageToken as
// pageToken.
ListWithAnnotationsPage(ctx context.Context, pageToken string) (rows []GrantAnnotation, nextPageToken string, err error)
// ListWithAnnotationsForResourcePage filters ListWithAnnotationsPage
// to grants on the given resource. Used by the c1-side
// fileClientWrapper that emulates a connector from a c1z file and
// forwards a ListGrants RPC whose request has a Resource filter.
//
// Page size defaults to the engine's maximum when pageSize == 0.
ListWithAnnotationsForResourcePage(
ctx context.Context,
resource *v2.Resource,
syncID string,
pageToken string,
pageSize uint32,
) (rows []GrantAnnotation, nextPageToken string, err error)
// ListWithAnnotations walks all pages of ListWithAnnotationsPage
// and yields each row. Convenience wrapper for callers that don't
// checkpoint pagination.
//
// ITERATION CONTRACT: same as PendingExpansion. On error, the
// sequence yields (zero-value GrantAnnotation, err) then
// terminates. Callers MUST check err on every iteration.
//
// Called by pkg/sync.syncer.listAllGrantsWithExpansion.
ListWithAnnotations(ctx context.Context) iter.Seq2[GrantAnnotation, error]
}
GrantStore is the grant-specific slice of C1ZStore. Each method maps to a specific caller intent in the sync pipeline — no mode enums, no option structs with behavior switches.
The expansion-reading methods come in two flavors: a page-at-a-time primitive (PendingExpansionPage, ListWithAnnotationsPage) that caller- driven state machines use to checkpoint progress, and an all-pages iterator (PendingExpansion, ListWithAnnotations) that wraps the page primitive for callers who want to walk everything in one pass.
type PendingExpansion ¶ added in v0.8.29
type PendingExpansion struct {
// GrantExternalID is the external id of the grant this expansion
// applies to (matches v2.Grant.Id).
GrantExternalID string
// TargetEntitlementID is the entitlement id granted to the principal.
TargetEntitlementID string
// PrincipalResourceTypeID and PrincipalResourceID identify the principal.
PrincipalResourceTypeID string
PrincipalResourceID string
// Annotation is the grant's expansion annotation. Non-nil by
// construction: a row only appears in PendingExpansion if it has one.
Annotation *v2.GrantExpandable
// NeedsExpansion is the current needs_expansion column value.
// Always true for rows returned by PendingExpansion — included
// for parity with the underlying row shape and future flexibility.
NeedsExpansion bool
}
PendingExpansion is a lightweight row yielded by GrantStore.PendingExpansion. It carries grant identity and the parsed expansion annotation without materializing the full grant payload — this keeps the expansion-worker hot path out of proto.Unmarshal.
The syncer uses GrantExternalID to look up the grant and uses Annotation to decide what expansion work to enqueue.
type SessionStore ¶ added in v0.5.0
type SessionStore interface {
sessions.SessionStore
}
type SyncMeta ¶ added in v0.8.29
type SyncMeta interface {
// MarkSyncSupportsDiff sets the supports_diff flag on the given sync.
// Called by pkg/sync.parallelSyncer after graph construction to signal
// that the sync run has SQL-layer grant metadata populated and diff
// consumers may rely on it.
MarkSyncSupportsDiff(ctx context.Context, syncID string) error
// LatestFullSync returns the most-recently-finished SyncTypeFull sync
// run, or nil if none exists. Used by pkg/sync.syncer to compute the
// "previous sync" reference for etag-based grant reuse.
LatestFullSync(ctx context.Context) (*SyncRun, error)
// LatestFinishedSyncOfAnyType returns the most-recently-finished sync
// of any type (including diff types), or nil if none exists. Used by
// tooling that wants to inspect whatever sync finished last regardless
// of type.
LatestFinishedSyncOfAnyType(ctx context.Context) (*SyncRun, error)
// Stats returns a map of table-name to row-count for the given sync.
// If syncID is empty, the latest sync of the given type is used.
// Mirrors the existing *C1File.Stats signature exactly.
Stats(ctx context.Context, syncType connectorstore.SyncType, syncID string) (map[string]int64, error)
}
SyncMeta is the sync-run-metadata sub-store of C1ZStore. It covers operations that read/write the sync_runs table without going through the gRPC Reader/Writer surfaces.
All methods are callable without an active sync.
type SyncRun ¶ added in v0.8.29
type SyncRun struct {
ID string
StartedAt *time.Time
EndedAt *time.Time
SyncToken string
Type connectorstore.SyncType
ParentSyncID string
LinkedSyncID string
SupportsDiff bool
}
SyncRun is the exported shape of a sync run. It corresponds directly to the internal syncRun struct; the fields match the sync_runs schema.
Callers typically only read ID, Type, and the timestamps; the rest is included for completeness and for use by tooling (e.g. sync-diff pipelines need ParentSyncID and LinkedSyncID).