export

package
v1.37.0 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 15, 2026 License: BSD-3-Clause Imports: 29 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (

	// ErrExportNotFound is returned when no export with the given ID exists.
	ErrExportNotFound = errors.New("export not found")

	// ErrExportAlreadyFinished is returned when trying to cancel an export
	// that has already completed (SUCCESS, FAILED, or CANCELED).
	ErrExportAlreadyFinished = errors.New("export has already finished")

	// ErrExportValidation is returned when the export request fails input
	// validation (invalid ID, unknown backend, non-existent class, etc.).
	ErrExportValidation = errors.New("export validation error")

	// ErrExportAlreadyExists is returned when attempting to create an export
	// with an ID that already has artifacts on the storage backend.
	ErrExportAlreadyExists = errors.New("export already exists")

	// ErrExportAlreadyActive is returned when attempting to start an export
	// while another export is already in progress on a participant node.
	ErrExportAlreadyActive = errors.New("export already active")

	// ErrExportShuttingDown is returned when a new export is rejected because
	// the node is shutting down.
	ErrExportShuttingDown = errors.New("server is shutting down")

	// ErrExportDisabled is returned when the export feature is not enabled.
	ErrExportDisabled = errors.New("export API is disabled; enable it via EXPORT_ENABLED=true or the runtime config")
)

Functions

This section is empty.

Types

type BackendProvider

type BackendProvider interface {
	BackupBackend(backend string, useCase modulecapabilities.BackendUseCase) (modulecapabilities.BackupBackend, error)
}

BackendProvider provides access to storage backends for export.

type ExportClient

type ExportClient interface {
	// Prepare asks a participant to reserve its export slot.
	Prepare(ctx context.Context, host string, req *ExportRequest) error
	// Commit tells a participant to start the export.
	Commit(ctx context.Context, host, exportID string) error
	// Abort tells a participant to release its reservation.
	Abort(ctx context.Context, host, exportID string) error
	// IsRunning checks whether a participant node is still running the given export.
	IsRunning(ctx context.Context, host, exportID string) (bool, error)
}

ExportClient is the HTTP client interface for inter-node export communication.

type ExportMetadata

type ExportMetadata struct {
	ID              string                                     `json:"id"`
	Backend         string                                     `json:"backend"`
	StartedAt       time.Time                                  `json:"startedAt"`
	CompletedAt     time.Time                                  `json:"completedAt"`
	Status          export.Status                              `json:"status"`
	Classes         []string                                   `json:"classes"`
	NodeAssignments map[string]map[string][]string             `json:"nodeAssignments,omitempty"` // node → className → []shardName
	Error           string                                     `json:"error,omitempty"`
	ShardStatus     map[string]map[string]models.ShardProgress `json:"shardStatus,omitempty"`
}

ExportMetadata is written to S3 alongside the parquet files. It is the single source of truth for an export's configuration and status.

type ExportMetrics

type ExportMetrics struct {
	ObjectsTotal         prometheus.Counter
	OperationsTotal      *prometheus.CounterVec
	CoordinationDuration prometheus.Histogram
	Duration             prometheus.Histogram
}

ExportMetrics holds Prometheus metrics for the export subsystem.

func NewExportMetrics

func NewExportMetrics(reg prometheus.Registerer) *ExportMetrics

NewExportMetrics creates and registers export metrics with the given registerer.

type ExportRequest

type ExportRequest struct {
	ID           string              `json:"id"`
	Backend      string              `json:"backend"`
	Classes      []string            `json:"classes"`
	Shards       map[string][]string `json:"shards"` // className → []shardName
	Bucket       string              `json:"bucket"`
	Path         string              `json:"path"`
	NodeName     string              `json:"nodeName"`
	SiblingNodes []string            `json:"siblingNodes,omitempty"` // other node names in the same export

	// Fields below are set for multi-node exports
	StartedAt       time.Time                      `json:"startedAt,omitempty"`
	NodeAssignments map[string]map[string][]string `json:"nodeAssignments,omitempty"` // node → className → []shardName
}

ExportRequest is sent from the coordinator to participant nodes.

type ExportStatusResponse

type ExportStatusResponse struct {
	Running bool `json:"running"`
}

ExportStatusResponse is the JSON payload for GET /exports/status.

type NodeResolver

type NodeResolver interface {
	NodeHostname(nodeName string) (string, bool)
}

NodeResolver resolves node names to hostnames.

type NodeStatus

type NodeStatus struct {
	NodeName      string                               `json:"nodeName"`
	Status        export.Status                        `json:"status"`
	ShardProgress map[string]map[string]*ShardProgress `json:"shardProgress,omitempty"` // className → shardName → progress
	Error         string                               `json:"error,omitempty"`
	CompletedAt   time.Time                            `json:"completedAt,omitempty"`
	Version       string                               `json:"version"`
	// contains filtered or unexported fields
}

NodeStatus is written to S3 by each participant node. The embedded mutex protects the maps and non-atomic fields. ShardProgress.objectsWritten is updated lock-free via atomics.

func (*NodeStatus) AddShardExported

func (ns *NodeStatus) AddShardExported(className, shardName string, delta int64)

AddShardExported atomically increments a shard's written-objects counter. The mutex is held only for the map lookup; the counter itself is lock-free.

func (*NodeStatus) GetShardWritten

func (ns *NodeStatus) GetShardWritten(className, shardName string) int64

GetShardWritten returns the current value of a shard's atomic written-objects counter. The mutex is held only for the map lookup.

func (*NodeStatus) SetFailed

func (ns *NodeStatus) SetFailed(className string, err error)

SetFailed marks the node export as failed in a thread-safe manner.

func (*NodeStatus) SetNodeError

func (ns *NodeStatus) SetNodeError(msg string)

SetNodeError marks the node export as failed with an arbitrary message.

func (*NodeStatus) SetShardProgress

func (ns *NodeStatus) SetShardProgress(className, shardName string, status export.ShardStatus, errMsg, skipReason string)

SetShardProgress updates a shard's export progress in a thread-safe manner. ObjectsExported is not set here; it is synced from the atomic counter by SyncAndSnapshot before each JSON marshal.

func (*NodeStatus) SetSuccess

func (ns *NodeStatus) SetSuccess()

SetSuccess marks the node export as succeeded in a thread-safe manner.

func (*NodeStatus) SyncAndSnapshot

func (ns *NodeStatus) SyncAndSnapshot() *NodeStatus

SyncAndSnapshot syncs live atomic counters into ObjectsExported and returns a deep copy of the NodeStatus suitable for marshaling, all under a single lock acquisition.

type ParquetRow

type ParquetRow struct {
	ID           string `parquet:"id,dict"`
	CreationTime int64  `parquet:"creation_time"`
	UpdateTime   int64  `parquet:"update_time"`
	Vector       []byte `parquet:"vector,optional"`
	NamedVectors []byte `parquet:"named_vectors,optional"`
	MultiVectors []byte `parquet:"multi_vectors,optional"`
	Properties   []byte `parquet:"properties,optional"`
}

ParquetRow represents a single row in the Parquet file.

type ParquetWriter

type ParquetWriter struct {
	// contains filtered or unexported fields
}

ParquetWriter writes Weaviate objects to Parquet format.

func NewParquetWriter

func NewParquetWriter(w io.Writer) (*ParquetWriter, error)

NewParquetWriter creates a new Parquet writer.

func (*ParquetWriter) Close

func (pw *ParquetWriter) Close() error

Close flushes remaining data and closes the writer.

func (*ParquetWriter) Flush

func (pw *ParquetWriter) Flush() error

Flush writes all buffered rows to the Parquet file.

func (*ParquetWriter) SetFileMetadata

func (pw *ParquetWriter) SetFileMetadata(key, value string)

SetFileMetadata sets a key/value pair in the Parquet file metadata.

func (*ParquetWriter) WriteRow

func (pw *ParquetWriter) WriteRow(row ParquetRow) error

WriteRow writes a pre-converted row to the Parquet file (buffered). This is used by the parallel export path where conversion happens in worker goroutines.

type Participant

type Participant struct {
	// contains filtered or unexported fields
}

Participant handles export requests on a single node. It exports its assigned shards directly to S3 and writes status files.

The two-phase commit protocol works as follows:

  1. Prepare: reserves the export slot (atomic CAS) and starts snapshotting all shards in the background. This anchors the point-in-time to the Prepare phase. A background timer auto-aborts after reservationTimeout if Commit is not called.
  2. Commit: waits for the snapshot goroutine to finish, then starts the scan phase that reads the snapshots and uploads parquet files.
  3. Abort: releases the reservation immediately, canceling any in-flight snapshot goroutine.

func NewParticipant

func NewParticipant(
	selector Selector,
	backends BackendProvider,
	logger logrus.FieldLogger,
	client ExportClient,
	nodeResolver NodeResolver,
	localNode string,
	metrics *ExportMetrics,
	exportParallelism *configRuntime.DynamicValue[int],
) *Participant

NewParticipant creates a new export participant. Call StartShutdown to signal in-flight exports to stop, then Shutdown to wait for them to drain. client and nodeResolver enable best-effort sibling aborts on failure.

func (*Participant) Abort

func (p *Participant) Abort(exportID string)

Abort cancels a prepared or running export. If the export is still in the prepared state, the reservation is released. If the export has already been committed, the running export is canceled.

func (*Participant) Commit

func (p *Participant) Commit(ctx context.Context, exportID string) error

Commit starts the actual export. Must be called after a successful Prepare.

func (*Participant) IsRunning

func (p *Participant) IsRunning(id string) bool

IsRunning reports whether the given export is currently running on this node.

func (*Participant) Prepare

func (p *Participant) Prepare(_ context.Context, req *ExportRequest) error

Prepare reserves the export slot for the given request. If no Commit arrives within reservationTimeout the reservation is automatically released.

func (*Participant) Shutdown

func (p *Participant) Shutdown(ctx context.Context) error

Shutdown waits for any in-flight export goroutine to finish its cleanup (final status flush, sibling abort, metadata promotion). The caller should call StartShutdown first to signal exports to stop, then call Shutdown to wait for them to drain. The provided context bounds how long we wait.

func (*Participant) StartShutdown

func (p *Participant) StartShutdown()

StartShutdown signals in-flight exports to stop. It returns immediately; call Shutdown to wait for exports to finish their cleanup.

type Scheduler

type Scheduler struct {
	// contains filtered or unexported fields
}

Scheduler manages export operations. The node that receives the request acts as coordinator and uses a two-phase commit protocol (prepare/commit/abort) across all participant nodes.

func NewScheduler

func NewScheduler(
	authorizer authorization.Authorizer,
	rbacConfig rbacconf.Config,
	exportConfig config.Export,
	selector Selector,
	backends BackendProvider,
	logger logrus.FieldLogger,
	client ExportClient,
	nodeResolver NodeResolver,
	localNode string,
	participant *Participant,
	metrics *ExportMetrics,
) *Scheduler

NewScheduler creates a new export scheduler.

func (*Scheduler) Cancel

func (s *Scheduler) Cancel(ctx context.Context, principal *models.Principal, backend, id string) error

Cancel cancels a running export. Returns ErrExportNotFound if the export does not exist, or ErrExportAlreadyFinished if it has already completed.

Note: Cancel does not remove artifacts (Parquet files, status files, metadata) already written to the backend. This is intentional — partial data is kept so operators can inspect what was exported before the cancellation and to avoid the complexity of distributed garbage collection across storage backends. The same applies to failed exports.

func (*Scheduler) Export

func (s *Scheduler) Export(ctx context.Context, principal *models.Principal, id, backend string, include, exclude []string) (*models.ExportCreateResponse, error)

Export starts a new export operation.

func (*Scheduler) StartShutdown

func (s *Scheduler) StartShutdown()

StartShutdown marks the scheduler as shutting down so new export requests are rejected. Already-running exports are not affected.

func (*Scheduler) Status

func (s *Scheduler) Status(ctx context.Context, principal *models.Principal, backend, id string) (*models.ExportStatusResponse, error)

Status retrieves the status of an export. Assembles status from metadata's NodeAssignments + per-node status files.

type Selector

type Selector interface {
	ListClasses(ctx context.Context) []string
	ShardOwnership(ctx context.Context, className string) (map[string][]string, error)
	IsMultiTenant(ctx context.Context, className string) bool
	IsAsyncReplicationEnabled(ctx context.Context, className string) bool

	// SnapshotShards creates point-in-time snapshots of the objects buckets
	// for the given shards of a class. For active (loaded) shards it flushes
	// and hard-links from the live bucket. For inactive (unloaded) shards it
	// hard-links directly from the on-disk bucket directory without loading
	// the shard. Skipped shards (e.g. offloaded tenants) are included in the
	// result with a skip reason and nil snapshot.
	SnapshotShards(ctx context.Context, className string, shardNames []string, exportID string) ([]ShardSnapshotResult, error)
}

Selector provides access to shards and classes for export operations.

type ShardProgress

type ShardProgress struct {
	Status          export.ShardStatus `json:"status"`
	ObjectsExported int64              `json:"objectsExported"`
	Error           string             `json:"error,omitempty"`
	SkipReason      string             `json:"skipReason,omitempty"`
	// contains filtered or unexported fields
}

ShardProgress tracks the progress of exporting a single shard. Used internally and in the S3 NodeStatus format.

objectsWritten is an atomic counter that workers increment lock-free. It is copied into ObjectsExported by SyncAndSnapshot before each JSON marshal.

type ShardSnapshotResult

type ShardSnapshotResult struct {
	ShardName   string
	SnapshotDir string // path to the snapshot directory; empty if skipped
	Strategy    string // bucket strategy (e.g. StrategyReplace)
	SkipReason  string // non-empty if the shard was skipped
}

ShardSnapshotResult holds the outcome of snapshotting a single shard. If SkipReason is non-empty the shard was skipped and SnapshotDir is empty.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL