imports

package
v0.8.4 Latest
Published: May 19, 2026 License: MIT Imports: 18 Imported by: 0

Documentation

Overview

Package imports manages tabular-source imports with a TTL-tracked on-disk pool. It is the home of the pulse_import / pulse_drop tool semantics: convert a CSV / TSV / NDJSON / JSON-array / Parquet / Arrow / Excel file into a .pulse file in $PULSE_DATA_DIR/imports/, write a sidecar with an expiry, and provide hooks for sliding-window TTL renewal on every subsequent operation that touches the handle.

Pulse-native sources (.pulse files) are short-circuited: they pass through with no copy and no sidecar, so user-curated files stay outside the managed area and are never touched by its sweeper.

Constants

const DefaultImportsDir = "imports"

DefaultImportsDir is the relative directory inside the Pulse fs where managed imports live. Overridable via PULSE_IMPORTS_DIR. The directory is created lazily on first import.

const DefaultTTL = 7 * 24 * time.Hour

DefaultTTL is the lifetime applied to a managed import when the caller does not pass one explicitly. Sliding window: every subsequent operation against the handle bumps the expiry forward by TTL.

const SidecarSuffix = ".meta.json"

SidecarSuffix is appended to a managed .pulse handle's path to form the sidecar metadata filename. "data.pulse" + SidecarSuffix = "data.pulse.meta.json".

Variables

This section is empty.

Functions

func FormatTTL

func FormatTTL(d time.Duration) string

FormatTTL renders a Duration back into the shortest unambiguous representation. Used in CLI list output and sidecar diagnostics. Negative durations render as "pin"; zero renders as "0s", a compact form that both humans and parsers can read.
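The rendering rules can be sketched as follows. This is a minimal illustration, not the package's implementation: only the "pin" and "0s" outputs come from the description above, while the hypothetical formatTTLSketch name, the whole-day collapse to "Nd", and the fallback to time.Duration.String are assumptions.

```go
package main

import (
	"fmt"
	"time"
)

// formatTTLSketch is a hypothetical stand-in for FormatTTL.
func formatTTLSketch(d time.Duration) string {
	const day = 24 * time.Hour
	switch {
	case d < 0:
		return "pin" // documented: negative durations render as "pin"
	case d == 0:
		return "0s" // documented: zero renders as "0s"
	case d%day == 0:
		// Assumption: whole days collapse to the day suffix ParseTTL accepts.
		return fmt.Sprintf("%dd", int64(d/day))
	default:
		// Assumption: everything else uses the standard Go duration form.
		return d.String()
	}
}

func main() {
	for _, d := range []time.Duration{-1, 0, 7 * 24 * time.Hour, 90 * time.Minute} {
		fmt.Println(formatTTLSketch(d))
	}
}
```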

func ParseTTL

func ParseTTL(s string) (time.Duration, error)

ParseTTL converts a human-friendly TTL string into a time.Duration. Accepted forms:

  • Go duration strings: "1h", "30m", "3600s", "1h30m", "500ms"
  • Day suffix: "7d" => 7*24h, "1d" => 24h. Combining a day suffix with Go duration units is not supported ("1d6h" is rejected), which keeps the parsing surface narrow; callers wanting hybrid units should use the Go form (e.g. "30h").
  • "0" or "" => 0 (caller decides what zero means: in Open it falls back to DefaultTTL; the empty-string case is rejected by callers that require an explicit TTL).
  • "pin", "none", "-1": parse to negative => pinned (no expiry).

Returns an error wrapping the original input on any parse failure; the error is intentionally bare (no error code) — callers that want a CodedError wrap this at their boundary.
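The accepted forms above can be approximated with the standard library alone. The sketch below is an illustrative re-implementation under assumptions, not the package's code: the name parseTTLSketch, the error text, and the use of -1 as the pinned value are all hypothetical.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
	"time"
)

// parseTTLSketch mirrors the documented forms of ParseTTL.
func parseTTLSketch(s string) (time.Duration, error) {
	switch s {
	case "", "0":
		return 0, nil // the caller decides what zero means
	case "pin", "none", "-1":
		return -1, nil // assumption: any negative value signals "pinned"
	}
	// Day suffix: an integer followed by a single trailing "d", e.g. "7d".
	// No Go duration unit ends in "d", so this cannot shadow a Go form.
	if strings.HasSuffix(s, "d") {
		n, err := strconv.Atoi(strings.TrimSuffix(s, "d"))
		if err != nil || n < 0 {
			return 0, fmt.Errorf("invalid TTL %q", s)
		}
		return time.Duration(n) * 24 * time.Hour, nil
	}
	// Everything else must be a Go duration string. Hybrids such as
	// "1d6h" fail here because "d" is not a Go duration unit.
	d, err := time.ParseDuration(s)
	if err != nil {
		return 0, fmt.Errorf("invalid TTL %q: %w", s, err)
	}
	return d, nil
}

func main() {
	for _, in := range []string{"7d", "1h30m", "pin", "", "1d6h"} {
		d, err := parseTTLSketch(in)
		fmt.Printf("%q -> %v (err: %v)\n", in, d, err)
	}
}
```

Keeping the day suffix as a separate branch, rather than extending the Go duration grammar, is what makes "1d6h" fall through to time.ParseDuration and fail.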

Types

type Entry

type Entry struct {
	Handle    string    `json:"handle"`
	Path      string    `json:"path"`
	Sidecar   Sidecar   `json:"sidecar"`
	ExpiresIn string    `json:"expires_in,omitempty"`
	Pinned    bool      `json:"pinned"`
	Expired   bool      `json:"expired"`
	Now       time.Time `json:"-"`
}

Entry is one row of Manager.List output — the snapshot returned to the CLI's `imports list` leaf and the MCP introspection paths.

type Manager

type Manager struct {
	// contains filtered or unexported fields
}

Manager owns the managed-import pool: lazy mkdir of the imports directory, sidecar I/O, sliding-window touches, and sweeps. One Manager per Pulse instance; the root facade constructs and holds it.

Two filesystems:

  • afs: the rooted Pulse fs (typically BasePathFs(OsFs, PULSE_DATA_DIR)). Used for the managed pool (target .pulse files + sidecars) and for reading relative-path sources.
  • srcFs: the source-reading fs. Used when SourcePath is absolute so callers can import files from anywhere within the jail without first copying them under PULSE_DATA_DIR. Defaults to afero.NewOsFs() in production; MemMapFs in hermetic tests.

jailRoot is the cleaned absolute root under which absolute source paths must resolve. Empty string means "no jail" (callers who explicitly pass their own SourceFS opt in to managing access themselves). Default in production is os.Getwd() at New time so an MCP server invocation can only reach files under the directory it was launched from.
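A containment check of the kind described for jailRoot might look like the sketch below. It is a minimal illustration assuming Unix-style paths; the insideJail helper is hypothetical, and the sketch does not resolve symlinks, which a real jail would also need to consider.

```go
package main

import (
	"fmt"
	"path/filepath"
	"strings"
)

// insideJail reports whether the absolute path p resolves under root.
// An empty root means "no jail", matching the convention described above.
func insideJail(root, p string) bool {
	if root == "" {
		return true
	}
	rel, err := filepath.Rel(filepath.Clean(root), filepath.Clean(p))
	if err != nil {
		return false
	}
	// A relative path that climbs out of root starts with a ".." element.
	return rel != ".." && !strings.HasPrefix(rel, ".."+string(filepath.Separator))
}

func main() {
	root := "/srv/work"
	for _, p := range []string{
		"/srv/work/data/a.csv",
		"/srv/work/../etc/passwd",
		"/srv/workspace/a.csv",
	} {
		fmt.Println(p, insideJail(root, p))
	}
}
```

Comparing cleaned paths through filepath.Rel, rather than with a plain string prefix test, avoids treating /srv/workspace as if it were inside /srv/work.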

func New

func New(afs afero.Fs, opts Options) (*Manager, error)

New constructs a Manager rooted at the given afero.Fs. The Manager reads PULSE_IMPORTS_DIR and PULSE_IMPORT_TTL once at construction; later changes to those env vars do not take effect until a new Manager is created.

func (*Manager) DefaultTTL

func (m *Manager) DefaultTTL() time.Duration

DefaultTTL returns the resolved default TTL. Stable across the Manager's lifetime.

func (*Manager) Drop

func (m *Manager) Drop(_ context.Context, handle string) error

Drop removes a managed handle (and its sidecar) from the pool. Returns PULSE_IMPORT_SOURCE_MISSING when the handle is unknown. Drop is a no-op for non-managed paths.

func (*Manager) ImportsDir

func (m *Manager) ImportsDir() string

ImportsDir returns the resolved imports directory (relative to the Pulse fs root). Stable across the Manager's lifetime.

func (*Manager) IsManagedPath

func (m *Manager) IsManagedPath(p string) bool

IsManagedPath reports whether the given path lives inside the imports directory. Used by callers that want to gate behaviour on "is this a managed handle?" without round-tripping through Resolve.

func (*Manager) JailRoot

func (m *Manager) JailRoot() string

JailRoot returns the cleaned absolute root the Manager confines absolute source paths to, or the empty string when no jail is enforced (i.e., the caller passed an explicit Options.SourceFS).

func (*Manager) List

func (m *Manager) List(_ context.Context) ([]Entry, error)

List returns the current state of the managed-imports pool. Sweep is not invoked; expired entries are flagged via Entry.Expired so callers can render them. Results are sorted by handle name.

func (*Manager) Open

func (m *Manager) Open(ctx context.Context, spec Spec) (*Result, error)

Open imports the source described by spec and returns a Result. Pulse-native sources short-circuit: no copy, no sidecar, Managed=false. All other formats produce a managed handle in the imports pool with a sidecar. Each successful Open opportunistically sweeps expired handles before returning, so the pool self-cleans without a daemon.

func (*Manager) Resolve

func (m *Manager) Resolve(_ context.Context, handle string) (string, error)

Resolve returns the managed .pulse path for the given handle, or PULSE_IMPORT_SOURCE_MISSING when no such handle exists. Used by the CLI when callers want to address a handle by name rather than by path.

func (*Manager) Sweep

func (m *Manager) Sweep(_ context.Context) ([]string, error)

Sweep deletes every managed handle whose sidecar has expired against the Manager's clock. Returns the list of handles that were swept (in alphabetical order) and the first error encountered, if any. An fs.ErrNotExist on the imports dir is not an error; an empty pool returns ([], nil).
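The sweep semantics can be sketched against an in-memory pool. Everything here is an assumption made for illustration: the local sidecar type carries only the two fields the check needs, the map stands in for the on-disk pool, and treating an expiry exactly equal to the clock as expired is a guess about the boundary.

```go
package main

import (
	"fmt"
	"sort"
	"time"
)

// sidecar is a trimmed, hypothetical stand-in for the package's Sidecar.
type sidecar struct {
	ExpiresAt  time.Time
	TTLSeconds int64
}

// pinned follows the documented signal: TTLSeconds <= 0.
func (s sidecar) pinned() bool { return s.TTLSeconds <= 0 }

// sweep removes expired, non-pinned handles and returns them sorted.
func sweep(pool map[string]sidecar, now time.Time) []string {
	swept := []string{}
	for handle, sc := range pool {
		if sc.pinned() || sc.ExpiresAt.After(now) {
			continue // pinned or still live: keep
		}
		delete(pool, handle) // deleting during range is safe in Go
		swept = append(swept, handle)
	}
	sort.Strings(swept)
	return swept
}

// demoSweep builds a small pool around a fixed clock and sweeps it.
func demoSweep() (swept []string, remaining int) {
	now := time.Date(2026, time.May, 19, 12, 0, 0, 0, time.UTC)
	pool := map[string]sidecar{
		"orders":   {ExpiresAt: now.Add(-time.Hour), TTLSeconds: 3600},
		"accounts": {ExpiresAt: now.Add(-time.Minute), TTLSeconds: 60},
		"events":   {ExpiresAt: now.Add(time.Hour), TTLSeconds: 3600},
		"lookup":   {}, // pinned: zero TTLSeconds, zero ExpiresAt
	}
	swept = sweep(pool, now)
	return swept, len(pool)
}

func main() {
	swept, remaining := demoSweep()
	fmt.Println(swept, remaining)
}
```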

func (*Manager) Touch

func (m *Manager) Touch(_ context.Context, p string) error

Touch bumps the sliding-window expiry for the managed handle whose .pulse file sits at the given path. No-op when the path is outside the imports directory, when no sidecar exists, or when the import is pinned. Safe to call from any operation that reads a managed handle (Inspect / Predict / Process / Sample / Facet / Ask).
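The sliding-window renewal can be sketched as below. This is an illustration under assumptions rather than the package's logic: the local sidecar type and helper names are hypothetical, and the sketch reads "bumps the expiry forward" as resetting the expiry to the touch time plus the handle's TTL.

```go
package main

import (
	"fmt"
	"time"
)

// sidecar is a trimmed, hypothetical stand-in for the package's Sidecar.
type sidecar struct {
	ExpiresAt  time.Time
	TTLSeconds int64
}

// touch renews the expiry window. It is a no-op for a missing sidecar
// or a pinned import (TTLSeconds <= 0), as described above.
func touch(sc *sidecar, now time.Time) {
	if sc == nil || sc.TTLSeconds <= 0 {
		return
	}
	// Assumption: the new expiry is measured from the touch time.
	sc.ExpiresAt = now.Add(time.Duration(sc.TTLSeconds) * time.Second)
}

// remainingAfterTouch imports at a fixed instant with the given TTL,
// touches the handle once `age` has elapsed, and returns the time left
// until expiry as measured at the moment of the touch.
func remainingAfterTouch(ttl, age time.Duration) time.Duration {
	t0 := time.Date(2026, time.May, 19, 0, 0, 0, 0, time.UTC)
	sc := &sidecar{ExpiresAt: t0.Add(ttl), TTLSeconds: int64(ttl / time.Second)}
	now := t0.Add(age)
	touch(sc, now)
	return sc.ExpiresAt.Sub(now)
}

func main() {
	week := 7 * 24 * time.Hour
	// Touched on day six, the handle gets a full week again.
	fmt.Println(remainingAfterTouch(week, 6*24*time.Hour))
}
```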

type Options

type Options struct {
	// ImportsDir overrides the default relative directory. Honoured
	// before the PULSE_IMPORTS_DIR env var.
	ImportsDir string

	// DefaultTTL overrides the default TTL. Honoured before the
	// PULSE_IMPORT_TTL env var. Zero falls back to env, then to
	// the package DefaultTTL.
	DefaultTTL time.Duration

	// SourceFS is the filesystem used when SourcePath is absolute. When
	// nil, the Manager defaults to afero.NewOsFs() jailed to
	// SourceJailRoot. Set explicitly when callers want to manage
	// access themselves (e.g., a pre-chrooted MemMapFs); doing so
	// disables the jail check — the explicit fs IS the boundary.
	SourceFS afero.Fs

	// SourceJailRoot constrains the default SourceFS to a directory.
	// Empty string defaults to os.Getwd() at construction so an MCP
	// server invocation reaches only files under the directory it was
	// launched from. Ignored when SourceFS is set explicitly.
	SourceJailRoot string

	// Now is the clock; injected for testing. Defaults to time.Now.
	Now func() time.Time
}

Options configure Manager construction. Zero values are valid: the Manager uses the package defaults and reads env vars at New time.
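The precedence described for DefaultTTL (Options first, then the env var, then the package constant) can be sketched as a first-non-zero selection. The resolveDefaultTTL helper is hypothetical, and the sketch assumes the PULSE_IMPORT_TTL value has already been parsed, with zero standing for "unset"; how New handles an unparsable env value is not covered here.

```go
package main

import (
	"fmt"
	"time"
)

// packageDefaultTTL matches the documented DefaultTTL constant.
const packageDefaultTTL = 7 * 24 * time.Hour

// resolveDefaultTTL returns the first non-zero candidate in the
// documented order: Options value, env value, package default.
func resolveDefaultTTL(fromOptions, fromEnv time.Duration) time.Duration {
	if fromOptions != 0 {
		return fromOptions
	}
	if fromEnv != 0 {
		return fromEnv
	}
	return packageDefaultTTL
}

func main() {
	fmt.Println(resolveDefaultTTL(0, 0))                    // package default
	fmt.Println(resolveDefaultTTL(0, 48*time.Hour))         // env value
	fmt.Println(resolveDefaultTTL(time.Hour, 48*time.Hour)) // Options value
}
```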

type Result

type Result struct {
	Handle       string           `json:"handle"`
	Path         string           `json:"path"`
	Format       string           `json:"format"`
	Managed      bool             `json:"managed"`
	RowsImported int              `json:"rows_imported,omitempty"`
	ImportedAt   time.Time        `json:"imported_at,omitzero"`
	ExpiresAt    *time.Time       `json:"expires_at,omitempty"`
	TTLSeconds   int64            `json:"ttl_seconds,omitempty"`
	Schema       *encoding.Schema `json:"-"`
}

Result describes the outcome of a managed-import call. Managed=false indicates pulse-passthrough (no sidecar, no TTL); the handle's Path is the source path verbatim. Otherwise Path points inside the managed area, and ExpiresAt is populated alongside the sidecar written next to the .pulse file.

type Sidecar

type Sidecar struct {
	Handle       string    `json:"handle"`
	Format       string    `json:"format"`
	SourcePath   string    `json:"source_path"`
	SourceFormat string    `json:"source_format"`
	ImportedAt   time.Time `json:"imported_at"`
	ExpiresAt    time.Time `json:"expires_at,omitzero"`
	TTLSeconds   int64     `json:"ttl_seconds"`
	RowsImported int       `json:"rows_imported"`
}

Sidecar is the JSON payload written next to a managed .pulse file. One sidecar per managed handle. Pinned handles set TTLSeconds=0 and leave ExpiresAt as the zero time (omitted from JSON via omitzero).

func (Sidecar) Pinned

func (s Sidecar) Pinned() bool

Pinned reports whether the sidecar represents an expiry-free import. Pinned imports survive sweeps. TTLSeconds <= 0 is the canonical signal; ExpiresAt is also zero in that case.

type Spec

type Spec struct {
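	// SourcePath is the filesystem path of the source file. Relative
	// paths resolve against the Pulse fs root; absolute paths are read
	// through the source fs and must resolve under the jail root.
	// Required when InlineBytes is unset.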
	SourcePath string

	// Format overrides extension-based detection. Use the identifiers
	// from io/format (csv, tsv, ndjson, jsonarray, parquet, arrow,
	// excel, pulse). When empty, FromExt(SourcePath) is used.
	Format string

	// Handle is the desired managed name (without extension). Defaults
	// to the source basename with the original extension stripped.
	// Collisions error unless Overwrite is true.
	Handle string

	// TTL governs how long the import survives in the managed pool.
	// Zero falls back to DefaultTTL. Negative values pin the import
	// (never sweep, never expire).
	TTL time.Duration

	// Sheet is honoured only for Excel sources; ignored otherwise.
	Sheet string

	// Overwrite replaces an existing managed handle of the same name.
	// Defaults to false (collision → PULSE_IMPORT_HANDLE_EXISTS).
	Overwrite bool

	// InlineBytes carries the raw source bytes for in-memory imports.
	// Format must be set explicitly when this is non-nil.
	InlineBytes []byte
}

Spec describes one import request: the input to Manager.Open. Either SourcePath is set (the common path: import from a file on the Pulse filesystem) or InlineBytes is set (rare: import from an in-memory blob, supplying Format explicitly). Setting both is an error.
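The field rules for Spec can be sketched as a small validator plus the default-handle derivation. This is a minimal illustration under assumptions: the local spec type keeps only the relevant fields, the validate and defaultHandle helpers are hypothetical, and the error messages are invented (the real package reports coded errors that are not reproduced here).

```go
package main

import (
	"errors"
	"fmt"
	"path/filepath"
	"strings"
)

// spec is a trimmed, hypothetical stand-in for the package's Spec.
type spec struct {
	SourcePath  string
	Format      string
	Handle      string
	InlineBytes []byte
}

// validate enforces the documented rules: exactly one of SourcePath or
// InlineBytes, and an explicit Format for inline imports.
func validate(s spec) error {
	hasPath, hasInline := s.SourcePath != "", s.InlineBytes != nil
	switch {
	case hasPath && hasInline:
		return errors.New("set either SourcePath or InlineBytes, not both")
	case !hasPath && !hasInline:
		return errors.New("one of SourcePath or InlineBytes is required")
	case hasInline && s.Format == "":
		return errors.New("inline imports require an explicit Format")
	}
	return nil
}

// defaultHandle returns the explicit Handle, or the source basename
// with its extension stripped. File-based specs only.
func defaultHandle(s spec) string {
	if s.Handle != "" {
		return s.Handle
	}
	base := filepath.Base(s.SourcePath)
	return strings.TrimSuffix(base, filepath.Ext(base))
}

func main() {
	s := spec{SourcePath: "data/sales.csv"}
	fmt.Println(validate(s), defaultHandle(s))
	fmt.Println(validate(spec{SourcePath: "a.csv", InlineBytes: []byte("x")}))
}
```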
