Documentation
¶
Index ¶
- Variables
- type BackendProvider
- type ExportClient
- type ExportMetadata
- type ExportMetrics
- type ExportRequest
- type ExportStatusResponse
- type NodeResolver
- type NodeStatus
- func (ns *NodeStatus) AddShardExported(className, shardName string, delta int64)
- func (ns *NodeStatus) GetShardWritten(className, shardName string) int64
- func (ns *NodeStatus) SetFailed(className string, err error)
- func (ns *NodeStatus) SetNodeError(msg string)
- func (ns *NodeStatus) SetShardProgress(className, shardName string, status export.ShardStatus, ...)
- func (ns *NodeStatus) SetSuccess()
- func (ns *NodeStatus) SyncAndSnapshot() *NodeStatus
- type ParquetRow
- type ParquetWriter
- type Participant
- func (p *Participant) Abort(exportID string)
- func (p *Participant) Commit(ctx context.Context, exportID string) error
- func (p *Participant) IsRunning(id string) bool
- func (p *Participant) Prepare(_ context.Context, req *ExportRequest) error
- func (p *Participant) Shutdown(ctx context.Context) error
- func (p *Participant) StartShutdown()
- type Scheduler
- func (s *Scheduler) Cancel(ctx context.Context, principal *models.Principal, backend, id string) error
- func (s *Scheduler) Export(ctx context.Context, principal *models.Principal, id, backend string, ...) (*models.ExportCreateResponse, error)
- func (s *Scheduler) StartShutdown()
- func (s *Scheduler) Status(ctx context.Context, principal *models.Principal, backend, id string) (*models.ExportStatusResponse, error)
- type Selector
- type ShardProgress
- type ShardSnapshotResult
Constants ¶
This section is empty.
Variables ¶
var (
	// ErrExportNotFound is returned when no export with the given ID exists.
	ErrExportNotFound = errors.New("export not found")

	// ErrExportAlreadyFinished is returned when trying to cancel an export
	// that has already completed (SUCCESS, FAILED, or CANCELED).
	ErrExportAlreadyFinished = errors.New("export has already finished")

	// ErrExportValidation is returned when the export request fails input
	// validation (invalid ID, unknown backend, non-existent class, etc.).
	ErrExportValidation = errors.New("export validation error")

	// ErrExportAlreadyExists is returned when attempting to create an export
	// with an ID that already has artifacts on the storage backend.
	ErrExportAlreadyExists = errors.New("export already exists")

	// ErrExportAlreadyActive is returned when attempting to start an export
	// while another export is already in progress on a participant node.
	ErrExportAlreadyActive = errors.New("export already active")

	// ErrExportShuttingDown is returned when a new export is rejected because
	// the node is shutting down.
	ErrExportShuttingDown = errors.New("server is shutting down")

	// ErrExportDisabled is returned when the export feature is not enabled.
	ErrExportDisabled = errors.New("export API is disabled; enable it via EXPORT_ENABLED=true or the runtime config")
)
Functions ¶
This section is empty.
Types ¶
type BackendProvider ¶
type BackendProvider interface {
BackupBackend(backend string, useCase modulecapabilities.BackendUseCase) (modulecapabilities.BackupBackend, error)
}
BackendProvider provides access to storage backends for export.
type ExportClient ¶
type ExportClient interface {
// Prepare asks a participant to reserve its export slot.
Prepare(ctx context.Context, host string, req *ExportRequest) error
// Commit tells a participant to start the export.
Commit(ctx context.Context, host, exportID string) error
// Abort tells a participant to release its reservation.
Abort(ctx context.Context, host, exportID string) error
// IsRunning checks whether a participant node is still running the given export.
IsRunning(ctx context.Context, host, exportID string) (bool, error)
}
ExportClient is the HTTP client interface for inter-node export communication.
type ExportMetadata ¶
type ExportMetadata struct {
ID string `json:"id"`
Backend string `json:"backend"`
StartedAt time.Time `json:"startedAt"`
CompletedAt time.Time `json:"completedAt"`
Status export.Status `json:"status"`
Classes []string `json:"classes"`
NodeAssignments map[string]map[string][]string `json:"nodeAssignments,omitempty"` // node → className → []shardName
Error string `json:"error,omitempty"`
ShardStatus map[string]map[string]models.ShardProgress `json:"shardStatus,omitempty"`
}
ExportMetadata is written to S3 alongside the parquet files. It is the single source of truth for an export's configuration and status.
type ExportMetrics ¶
type ExportMetrics struct {
ObjectsTotal prometheus.Counter
OperationsTotal *prometheus.CounterVec
CoordinationDuration prometheus.Histogram
Duration prometheus.Histogram
}
ExportMetrics holds Prometheus metrics for the export subsystem.
func NewExportMetrics ¶
func NewExportMetrics(reg prometheus.Registerer) *ExportMetrics
NewExportMetrics creates and registers export metrics with the given registerer.
type ExportRequest ¶
type ExportRequest struct {
ID string `json:"id"`
Backend string `json:"backend"`
Classes []string `json:"classes"`
Shards map[string][]string `json:"shards"` // className → []shardName
Bucket string `json:"bucket"`
Path string `json:"path"`
NodeName string `json:"nodeName"`
SiblingNodes []string `json:"siblingNodes,omitempty"` // other node names in the same export
// Fields below are set for multi-node exports
StartedAt time.Time `json:"startedAt,omitempty"`
NodeAssignments map[string]map[string][]string `json:"nodeAssignments,omitempty"` // node → className → []shardName
}
ExportRequest is sent from the coordinator to participant nodes.
type ExportStatusResponse ¶
type ExportStatusResponse struct {
Running bool `json:"running"`
}
ExportStatusResponse is the JSON payload for GET /exports/status.
type NodeResolver ¶
NodeResolver resolves node names to hostnames.
type NodeStatus ¶
type NodeStatus struct {
NodeName string `json:"nodeName"`
Status export.Status `json:"status"`
ShardProgress map[string]map[string]*ShardProgress `json:"shardProgress,omitempty"` // className → shardName → progress
Error string `json:"error,omitempty"`
CompletedAt time.Time `json:"completedAt,omitempty"`
Version string `json:"version"`
// contains filtered or unexported fields
}
NodeStatus is written to S3 by each participant node. The embedded mutex protects the maps and non-atomic fields. ShardProgress.objectsWritten is updated lock-free via atomics.
func (*NodeStatus) AddShardExported ¶
func (ns *NodeStatus) AddShardExported(className, shardName string, delta int64)
AddShardExported atomically increments a shard's written-objects counter. The mutex is held only for the map lookup; the counter itself is lock-free.
func (*NodeStatus) GetShardWritten ¶
func (ns *NodeStatus) GetShardWritten(className, shardName string) int64
GetShardWritten returns the current value of a shard's atomic written-objects counter. The mutex is held only for the map lookup.
func (*NodeStatus) SetFailed ¶
func (ns *NodeStatus) SetFailed(className string, err error)
SetFailed marks the node export as failed in a thread-safe manner.
func (*NodeStatus) SetNodeError ¶
func (ns *NodeStatus) SetNodeError(msg string)
SetNodeError marks the node export as failed with an arbitrary message.
func (*NodeStatus) SetShardProgress ¶
func (ns *NodeStatus) SetShardProgress(className, shardName string, status export.ShardStatus, errMsg, skipReason string)
SetShardProgress updates a shard's export progress in a thread-safe manner. ObjectsExported is not set here; it is synced from the atomic counter by SyncAndSnapshot before each JSON marshal.
func (*NodeStatus) SetSuccess ¶
func (ns *NodeStatus) SetSuccess()
SetSuccess marks the node export as succeeded in a thread-safe manner.
func (*NodeStatus) SyncAndSnapshot ¶
func (ns *NodeStatus) SyncAndSnapshot() *NodeStatus
SyncAndSnapshot syncs live atomic counters into ObjectsExported and returns a deep copy of the NodeStatus suitable for marshaling, all under a single lock acquisition.
type ParquetRow ¶
type ParquetRow struct {
ID string `parquet:"id,dict"`
CreationTime int64 `parquet:"creation_time"`
UpdateTime int64 `parquet:"update_time"`
Vector []byte `parquet:"vector,optional"`
NamedVectors []byte `parquet:"named_vectors,optional"`
MultiVectors []byte `parquet:"multi_vectors,optional"`
Properties []byte `parquet:"properties,optional"`
}
ParquetRow represents a single row in the Parquet file
type ParquetWriter ¶
type ParquetWriter struct {
// contains filtered or unexported fields
}
ParquetWriter writes Weaviate objects to Parquet format
func NewParquetWriter ¶
func NewParquetWriter(w io.Writer) (*ParquetWriter, error)
NewParquetWriter creates a new Parquet writer
func (*ParquetWriter) Close ¶
func (pw *ParquetWriter) Close() error
Close flushes remaining data and closes the writer
func (*ParquetWriter) Flush ¶
func (pw *ParquetWriter) Flush() error
Flush writes all buffered rows to the Parquet file
func (*ParquetWriter) SetFileMetadata ¶
func (pw *ParquetWriter) SetFileMetadata(key, value string)
SetFileMetadata sets a key/value pair in the Parquet file metadata.
func (*ParquetWriter) WriteRow ¶
func (pw *ParquetWriter) WriteRow(row ParquetRow) error
WriteRow writes a pre-converted row to the Parquet file (buffered). This is used by the parallel export path where conversion happens in worker goroutines.
type Participant ¶
type Participant struct {
// contains filtered or unexported fields
}
Participant handles export requests on a single node. It exports its assigned shards directly to S3 and writes status files.
The two-phase commit protocol works as follows:
- Prepare: reserves the export slot (atomic CAS) and starts snapshotting all shards in the background. This anchors the point-in-time to the Prepare phase. A background timer auto-aborts after reservationTimeout if Commit is not called.
- Commit: waits for the snapshot goroutine to finish, then starts the scan phase that reads the snapshots and uploads parquet files.
- Abort: releases the reservation immediately, canceling any in-flight snapshot goroutine.
func NewParticipant ¶
func NewParticipant(
	selector Selector,
	backends BackendProvider,
	logger logrus.FieldLogger,
	client ExportClient,
	nodeResolver NodeResolver,
	localNode string,
	metrics *ExportMetrics,
	exportParallelism *configRuntime.DynamicValue[int],
) *Participant
NewParticipant creates a new export participant. Call StartShutdown to signal in-flight exports to stop, then Shutdown to wait for them to drain. client and nodeResolver enable best-effort sibling aborts on failure.
func (*Participant) Abort ¶
func (p *Participant) Abort(exportID string)
Abort cancels a prepared or running export. If the export is still in the prepared state, the reservation is released. If the export has already been committed, the running export is canceled.
func (*Participant) Commit ¶
func (p *Participant) Commit(ctx context.Context, exportID string) error
Commit starts the actual export. Must be called after a successful Prepare.
func (*Participant) IsRunning ¶
func (p *Participant) IsRunning(id string) bool
IsRunning reports whether the given export is currently running on this node.
func (*Participant) Prepare ¶
func (p *Participant) Prepare(_ context.Context, req *ExportRequest) error
Prepare reserves the export slot for the given request. If no Commit arrives within reservationTimeout the reservation is automatically released.
func (*Participant) Shutdown ¶
func (p *Participant) Shutdown(ctx context.Context) error
Shutdown waits for any in-flight export goroutine to finish its cleanup (final status flush, sibling abort, metadata promotion). The caller should call StartShutdown first to signal exports to stop, then call Shutdown to wait for them to drain. The provided context bounds how long we wait.
func (*Participant) StartShutdown ¶
func (p *Participant) StartShutdown()
StartShutdown signals in-flight exports to stop. It returns immediately; call Shutdown to wait for exports to finish their cleanup.
type Scheduler ¶
type Scheduler struct {
// contains filtered or unexported fields
}
Scheduler manages export operations. The node that receives the request acts as coordinator and uses a two-phase commit protocol (prepare/commit/abort) across all participant nodes.
func NewScheduler ¶
func NewScheduler(
	authorizer authorization.Authorizer,
	rbacConfig rbacconf.Config,
	exportConfig config.Export,
	selector Selector,
	backends BackendProvider,
	logger logrus.FieldLogger,
	client ExportClient,
	nodeResolver NodeResolver,
	localNode string,
	participant *Participant,
	metrics *ExportMetrics,
) *Scheduler
NewScheduler creates a new export scheduler.
func (*Scheduler) Cancel ¶
func (s *Scheduler) Cancel(ctx context.Context, principal *models.Principal, backend, id string) error
Cancel cancels a running export. Returns ErrExportNotFound if the export does not exist, or ErrExportAlreadyFinished if it has already completed.
Note: Cancel does not remove artifacts (Parquet files, status files, metadata) already written to the backend. This is intentional — partial data is kept so operators can inspect what was exported before the cancellation and to avoid the complexity of distributed garbage collection across storage backends. The same applies to failed exports.
func (*Scheduler) Export ¶
func (s *Scheduler) Export(ctx context.Context, principal *models.Principal, id, backend string, include, exclude []string) (*models.ExportCreateResponse, error)
Export starts a new export operation.
func (*Scheduler) StartShutdown ¶
func (s *Scheduler) StartShutdown()
StartShutdown marks the scheduler as shutting down so new export requests are rejected. Already-running exports are not affected.
type Selector ¶
type Selector interface {
ListClasses(ctx context.Context) []string
ShardOwnership(ctx context.Context, className string) (map[string][]string, error)
IsMultiTenant(ctx context.Context, className string) bool
IsAsyncReplicationEnabled(ctx context.Context, className string) bool
// SnapshotShards creates point-in-time snapshots of the objects buckets
// for the given shards of a class. For active (loaded) shards it flushes
// and hard-links from the live bucket. For inactive (unloaded) shards it
// hard-links directly from the on-disk bucket directory without loading
// the shard. Skipped shards (e.g. offloaded tenants) are included in the
// result with a skip reason and nil snapshot.
SnapshotShards(ctx context.Context, className string, shardNames []string, exportID string) ([]ShardSnapshotResult, error)
}
Selector provides access to shards and classes for export operations.
type ShardProgress ¶
type ShardProgress struct {
Status export.ShardStatus `json:"status"`
ObjectsExported int64 `json:"objectsExported"`
Error string `json:"error,omitempty"`
SkipReason string `json:"skipReason,omitempty"`
// contains filtered or unexported fields
}
ShardProgress tracks the progress of exporting a single shard. Used internally and in the S3 NodeStatus format.
objectsWritten is an atomic counter that workers increment lock-free. It is copied into ObjectsExported by SyncAndSnapshot before each JSON marshal.
type ShardSnapshotResult ¶
type ShardSnapshotResult struct {
ShardName string
SnapshotDir string // path to the snapshot directory; empty if skipped
Strategy string // bucket strategy (e.g. StrategyReplace)
SkipReason string // non-empty if the shard was skipped
}
ShardSnapshotResult holds the outcome of snapshotting a single shard. If SkipReason is non-empty the shard was skipped and SnapshotDir is empty.