bulk

package
v0.1.0
Warning

This package is not in the latest version of its module.

Published: Jun 2, 2026 | License: MIT | Imports: 7 | Imported by: 0

Documentation

Overview

Package bulk implements the bulk-loading path that bypasses the transactional WAL stack and writes a Tier 2 csrfile directly from a stream of edges.

Bulk loading is the high-throughput equivalent of running many txn.Commit calls back-to-back. The v1 implementation accumulates edges in an in-memory adjacency list and then writes the resulting CSR through csrfile.WriteToFile; a future revision will add an external k-way merge sort for graphs too large to fit in memory.
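The package does not expose its internal adjacency-list or CSR construction, so the following is only a minimal, standalone sketch of the general edge-list-to-CSR technique, not GoGraph's implementation. The names buildCSR and csr, and the string-ID interning, are hypothetical. It handles directed edges only: it counts out-degrees, turns them into prefix-sum offsets, and then scatters each edge into its source's slot range.

```go
package main

import "fmt"

// Edge mirrors the shape of bulk.Edge for this illustration only.
type Edge struct {
	Src, Dst string
	Weight   int64
}

// csr is a hypothetical compressed sparse row layout: the out-edges of
// vertex v occupy targets[offsets[v]:offsets[v+1]].
type csr struct {
	ids     []string // vertex index -> original string ID
	offsets []int    // len(ids)+1 prefix sums of out-degrees
	targets []int    // destination vertex indices
	weights []int64  // edge weights, parallel to targets
}

// buildCSR interns string IDs in first-seen order, counts out-degrees,
// and scatters edges into place. Directed graphs only.
func buildCSR(edges []Edge) csr {
	index := map[string]int{}
	var ids []string
	intern := func(s string) {
		if _, ok := index[s]; !ok {
			index[s] = len(ids)
			ids = append(ids, s)
		}
	}
	for _, e := range edges {
		intern(e.Src)
		intern(e.Dst)
	}

	// Pass 1: out-degree counts, then running prefix sums.
	offsets := make([]int, len(ids)+1)
	for _, e := range edges {
		offsets[index[e.Src]+1]++
	}
	for v := 1; v < len(offsets); v++ {
		offsets[v] += offsets[v-1]
	}

	// Pass 2: place each edge at the next free slot of its source row.
	next := append([]int(nil), offsets[:len(ids)]...)
	targets := make([]int, len(edges))
	weights := make([]int64, len(edges))
	for _, e := range edges {
		s := index[e.Src]
		targets[next[s]] = index[e.Dst]
		weights[next[s]] = e.Weight
		next[s]++
	}
	return csr{ids, offsets, targets, weights}
}

func main() {
	g := buildCSR([]Edge{{"a", "b", 1}, {"b", "c", 2}, {"c", "a", 3}})
	fmt.Println("order:", len(g.ids), "size:", len(g.targets)) // order: 3 size: 3
	fmt.Println("offsets:", g.offsets, "targets:", g.targets)  // offsets: [0 1 2 3] targets: [1 2 0]
}
```

For the three edges used in the package example, this sketch yields 3 vertices and 3 edges, matching the csr-order and csr-size the example reports.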

Example

Example bulk-loads a small graph: edges are streamed through a Loader that bypasses the transactional WAL stack and writes a Tier 2 csrfile directly. Finalise returns the row count and the in-memory CSR; the example then reopens the file to confirm that it was written to disk.

package main

import (
	"fmt"
	"os"
	"path/filepath"

	"github.com/FlavioCFOliveira/GoGraph/store/bulk"
	"github.com/FlavioCFOliveira/GoGraph/store/csrfile"
)

func main() {
	dir, err := os.MkdirTemp("", "bulk-example")
	if err != nil {
		panic(err)
	}
	defer func() { _ = os.RemoveAll(dir) }()

	out := filepath.Join(dir, "graph.csr")
	l := bulk.New(bulk.Options{OutputPath: out, Directed: true})

	// Feed edges one at a time and in a batch; both paths funnel into
	// the same in-memory adjacency list.
	if err := l.Add(bulk.Edge{Src: "a", Dst: "b", Weight: 1}); err != nil {
		panic(err)
	}
	if err := l.AddBatch([]bulk.Edge{
		{Src: "b", Dst: "c", Weight: 2},
		{Src: "c", Dst: "a", Weight: 3},
	}); err != nil {
		panic(err)
	}

	// Finalise flushes the accumulated edges to the csrfile and returns
	// the row count plus the resulting CSR snapshot.
	rows, c, err := l.Finalise()
	if err != nil {
		panic(err)
	}
	fmt.Printf("rows=%d csr-order=%d csr-size=%d\n", rows, c.Order(), c.Size())

	// Reopen the written file to confirm the csrfile exists and is readable.
	r, err := csrfile.Open(out)
	if err != nil {
		panic(err)
	}
	defer func() { _ = r.Close() }()
	fmt.Printf("on-disk edges=%d\n", r.Header().NEdges)

}
Output:
rows=3 csr-order=3 csr-size=3
on-disk edges=3

Constants

This section is empty.

Variables

var ErrTooManyRows = errors.New("bulk: row cap exceeded")

ErrTooManyRows is returned by Loader.Add, Loader.AddBatch, and Loader.Drain when the configured Options.MaxRows cap is exceeded.
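Because ErrTooManyRows is a sentinel created with errors.New, callers that wrap loader errors with extra context should test for it with errors.Is rather than ==. The sketch below is illustrative only: errTooManyRows stands in for bulk.ErrTooManyRows, and the ingest helper is hypothetical, modelling the documented rule that a MaxRows of 0 means unbounded.

```go
package main

import (
	"errors"
	"fmt"
)

// errTooManyRows is a local stand-in for bulk.ErrTooManyRows.
var errTooManyRows = errors.New("bulk: row cap exceeded")

// ingest is a hypothetical caller-side helper that wraps the cap error
// with context using %w, as application code often does.
func ingest(rows, maxRows int) error {
	if maxRows > 0 && rows > maxRows {
		return fmt.Errorf("ingest shard: %w", errTooManyRows)
	}
	return nil
}

// isCapError reports whether err, possibly wrapped, is the row-cap sentinel.
func isCapError(err error) bool {
	return errors.Is(err, errTooManyRows)
}

func main() {
	err := ingest(5, 3)
	fmt.Println(err)                      // ingest shard: bulk: row cap exceeded
	fmt.Println(isCapError(err))          // true
	fmt.Println(isCapError(ingest(2, 3))) // false
}
```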

Functions

This section is empty.

Types

type Edge

type Edge struct {
	Src    string
	Dst    string
	Weight int64
}

Edge is one record the bulk loader consumes.

type Loader

type Loader struct {
	// contains filtered or unexported fields
}

Loader streams edges through an in-memory adjacency list and writes the resulting Tier 2 csrfile when Loader.Finalise runs.

Loader is not safe for concurrent use. Callers that wish to parallelise ingestion should partition the edge stream upstream, feed each partition to its own Loader, and merge the results afterwards; the v1 design, however, expects a single ingest goroutine.
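One way to partition the stream upstream is to hash each edge's source vertex, so that all out-edges of a vertex land in the same partition. This is a sketch under assumptions: the partition helper is hypothetical, FNV-1a is an arbitrary hash choice, routing by Src is a design choice rather than a documented requirement, and the merge step the documentation mentions is not shown because this page documents no merge API.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// Edge mirrors the shape of bulk.Edge for this illustration only.
type Edge struct {
	Src, Dst string
	Weight   int64
}

// partition routes each edge to one of n buckets (n > 0) by hashing Src,
// so every out-edge of a given vertex ends up in the same bucket.
func partition(edges []Edge, n int) [][]Edge {
	buckets := make([][]Edge, n)
	for _, e := range edges {
		h := fnv.New32a()
		_, _ = h.Write([]byte(e.Src)) // hash.Hash writes never return an error
		i := int(h.Sum32() % uint32(n))
		buckets[i] = append(buckets[i], e)
	}
	return buckets
}

func main() {
	edges := []Edge{{"a", "b", 1}, {"a", "c", 2}, {"b", "c", 3}, {"c", "a", 4}}
	for i, p := range partition(edges, 2) {
		// In a real pipeline each bucket would feed its own Loader,
		// typically from its own goroutine with its own OutputPath.
		fmt.Printf("partition %d: %d edges\n", i, len(p))
	}
}
```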

func New

func New(opts Options) *Loader

New returns a fresh Loader.

func (*Loader) Add

func (l *Loader) Add(e Edge) error

Add ingests one edge. Returns ErrTooManyRows when the row cap is exceeded.

func (*Loader) AddBatch

func (l *Loader) AddBatch(es []Edge) error

AddBatch ingests a contiguous batch of edges. Returns ErrTooManyRows on the first edge that would cross the cap; edges accepted before that point remain ingested.

func (*Loader) Drain

func (l *Loader) Drain(ctx context.Context, ch <-chan Edge) (int, error)

Drain consumes edges from ch until ch is closed or ctx is cancelled. It returns the number of edges drained and any error encountered while ingesting them (for example, ErrTooManyRows when the row cap is exceeded).

func (*Loader) Finalise

func (l *Loader) Finalise() (int, *csr.CSR[int64], error)

Finalise builds the CSR from the accumulated edges and writes it to Options.OutputPath as a csrfile. It returns the row count, the resulting CSR (for chaining into search/extern), and any error.

func (*Loader) Rows

func (l *Loader) Rows() int

Rows returns the number of edges ingested so far.

type Options

type Options struct {
	// OutputPath is the destination csrfile.
	OutputPath string
	// Directed selects the adjacency-list configuration.
	Directed bool
	// Multigraph allows parallel edges in the loaded graph.
	Multigraph bool
	// MaxRows, when > 0, caps the number of edge records the loader
	// will ingest. Add / AddBatch / Drain return [ErrTooManyRows]
	// on the row that crosses the cap. Default (0) is unbounded.
	MaxRows int
}

Options configures the Loader.
