Documentation ¶
Overview ¶
Package bulk implements the bulk-loading path that bypasses the transactional WAL stack and writes a Tier 2 csrfile directly from a stream of edges.
Bulk loading is the high-throughput equivalent of running many txn.Commit calls back-to-back. The v1 implementation pipes edges into an in-memory adjacency list and then writes the resulting CSR through csrfile.WriteToFile; a future revision will introduce an external k-way merge sort for graphs that exceed memory.
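The adjacency-list-to-CSR step described above can be sketched in a few lines. This is a minimal illustration using only the standard library, not the package's actual code: the edge type, the dense integer vertex IDs, and the buildCSR helper are all hypothetical, and the real loader's string-keyed vertices, weights, and on-disk layout are not modelled.

```go
package main

import "fmt"

// edge is a hypothetical (src, dst) pair using dense integer vertex IDs.
type edge struct{ src, dst int }

// buildCSR converts an edge list over n vertices into CSR form:
// offsets has n+1 entries, and the neighbours of vertex v are
// targets[offsets[v]:offsets[v+1]].
func buildCSR(n int, edges []edge) (offsets, targets []int) {
	offsets = make([]int, n+1)
	// Count out-degrees, shifted by one slot so that the prefix sum
	// below turns the counts into row start offsets.
	for _, e := range edges {
		offsets[e.src+1]++
	}
	for v := 0; v < n; v++ {
		offsets[v+1] += offsets[v]
	}
	// Place each target at the next free slot of its source row.
	targets = make([]int, len(edges))
	next := make([]int, n)
	copy(next, offsets[:n])
	for _, e := range edges {
		targets[next[e.src]] = e.dst
		next[e.src]++
	}
	return offsets, targets
}

func main() {
	// Same shape as the package example: a -> b, b -> c, c -> a,
	// with a, b, c mapped to 0, 1, 2.
	offsets, targets := buildCSR(3, []edge{{0, 1}, {1, 2}, {2, 0}})
	fmt.Println(offsets, targets) // prints "[0 1 2 3] [1 2 0]"
}
```

Because the whole edge list and both CSR arrays live in memory at once, this approach is bounded by available RAM, which is the limitation the planned external merge sort is meant to lift.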
Example ¶
Example bulk-loads a small graph: edges are streamed through a Loader that bypasses the transactional WAL stack and writes a Tier 2 csrfile directly. Finalise returns the row count and the in-memory CSR; the file is then reopened to confirm it landed on disk.
package main

import (
	"fmt"
	"os"
	"path/filepath"

	"github.com/FlavioCFOliveira/GoGraph/store/bulk"
	"github.com/FlavioCFOliveira/GoGraph/store/csrfile"
)

func main() {
	dir, err := os.MkdirTemp("", "bulk-example")
	if err != nil {
		panic(err)
	}
	defer func() { _ = os.RemoveAll(dir) }()
	out := filepath.Join(dir, "graph.csr")

	l := bulk.New(bulk.Options{OutputPath: out, Directed: true})

	// Feed edges one at a time and in a batch; both paths funnel into
	// the same in-memory adjacency list.
	if err := l.Add(bulk.Edge{Src: "a", Dst: "b", Weight: 1}); err != nil {
		panic(err)
	}
	if err := l.AddBatch([]bulk.Edge{
		{Src: "b", Dst: "c", Weight: 2},
		{Src: "c", Dst: "a", Weight: 3},
	}); err != nil {
		panic(err)
	}

	// Finalise flushes the accumulated edges to the csrfile and returns
	// the row count plus the resulting CSR snapshot.
	rows, c, err := l.Finalise()
	if err != nil {
		panic(err)
	}
	fmt.Printf("rows=%d csr-order=%d csr-size=%d\n", rows, c.Order(), c.Size())

	// Reopen the written file to confirm the bulk load is durable.
	r, err := csrfile.Open(out)
	if err != nil {
		panic(err)
	}
	defer func() { _ = r.Close() }()
	fmt.Printf("on-disk edges=%d\n", r.Header().NEdges)
}
Output:
rows=3 csr-order=3 csr-size=3
on-disk edges=3
Constants ¶
This section is empty.
Variables ¶
var ErrTooManyRows = errors.New("bulk: row cap exceeded")
ErrTooManyRows is returned by Loader.Add, Loader.AddBatch, and Loader.Drain when the configured Options.MaxRows cap is exceeded.
Functions ¶
This section is empty.
Types ¶
type Loader ¶
type Loader struct {
// contains filtered or unexported fields
}
Loader streams edges through an in-memory adjacency list and writes the resulting Tier 2 csrfile when Loader.Finalise runs.
Loader is not safe for concurrent use. Callers that wish to parallelise ingestion should partition the edge stream upstream, feed each partition to its own Loader, and then merge the results. The v1 expectation, however, is a single ingest goroutine.
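One way to partition an edge stream upstream is to shard by source vertex, so that each shard can be handed to its own loader on its own goroutine. The sketch below uses only the standard library; the edge type and the partition helper are hypothetical, and the subsequent merge of the per-shard outputs is left out because the package documentation does not describe one.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// edge is a hypothetical stand-in for an edge record.
type edge struct{ src, dst string }

// partition assigns each edge to one of n shards by hashing its source
// vertex, so all out-edges of a vertex land in the same shard.
func partition(edges []edge, n int) [][]edge {
	shards := make([][]edge, n)
	for _, e := range edges {
		h := fnv.New32a()
		_, _ = h.Write([]byte(e.src)) // hash.Hash.Write never returns an error
		i := int(h.Sum32() % uint32(n))
		shards[i] = append(shards[i], e)
	}
	return shards
}

func main() {
	shards := partition([]edge{
		{"a", "b"}, {"b", "c"}, {"c", "a"}, {"a", "c"},
	}, 2)
	total := 0
	for i, s := range shards {
		fmt.Printf("shard %d: %d edges\n", i, len(s))
		total += len(s)
	}
	fmt.Println("total:", total)
}
```

Sharding by source keeps each vertex's adjacency row in a single shard, which is one plausible way to make a later merge simpler; other keys would work if the merge step handles split rows.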
func (*Loader) Add ¶
Add ingests one edge. Returns ErrTooManyRows when the row cap is exceeded.
func (*Loader) AddBatch ¶
AddBatch ingests a contiguous batch of edges. Returns ErrTooManyRows on the first edge that would cross the cap; edges accepted before that point remain ingested.
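The partial-acceptance behaviour described for AddBatch can be illustrated with a toy loader. This is a sketch of the documented semantics only, assuming a cap that counts accepted edges; cappedLoader, errTooManyRows, and the edge type are hypothetical stand-ins and not the package's types.

```go
package main

import (
	"errors"
	"fmt"
)

// errTooManyRows is a hypothetical stand-in for bulk.ErrTooManyRows.
var errTooManyRows = errors.New("row cap exceeded")

// edge is a hypothetical stand-in for an edge record.
type edge struct{ src, dst string }

// cappedLoader is a toy loader that only models the row cap.
type cappedLoader struct {
	maxRows int // 0 means unbounded
	rows    []edge
}

// add accepts one edge unless doing so would cross the cap.
func (l *cappedLoader) add(e edge) error {
	if l.maxRows > 0 && len(l.rows) >= l.maxRows {
		return errTooManyRows
	}
	l.rows = append(l.rows, e)
	return nil
}

// addBatch stops at the first rejected edge; edges accepted before
// that point stay in l.rows.
func (l *cappedLoader) addBatch(batch []edge) error {
	for _, e := range batch {
		if err := l.add(e); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	l := &cappedLoader{maxRows: 2}
	err := l.addBatch([]edge{{"a", "b"}, {"b", "c"}, {"c", "a"}})
	fmt.Println(errors.Is(err, errTooManyRows), len(l.rows)) // prints "true 2"
}
```

A caller that needs all-or-nothing batches would therefore have to check the remaining capacity before calling, or discard the loader on error.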
func (*Loader) Drain ¶
Drain consumes edges from ch until the channel is closed or ctx is cancelled. It returns the number of edges drained and any error encountered while ingesting them (ErrTooManyRows when the row cap is exceeded).
type Options ¶
type Options struct {
// OutputPath is the destination csrfile.
OutputPath string
// Directed selects the adjacency-list configuration.
Directed bool
// Multigraph allows parallel edges in the loaded graph.
Multigraph bool
// MaxRows, when > 0, caps the number of edge records the loader
// will ingest. Add / AddBatch / Drain return [ErrTooManyRows]
// on the row that crosses the cap. Default (0) is unbounded.
MaxRows int
}
Options configures the Loader.