sas_ingester

package v0.1.0
Published: Mar 1, 2026 License: MIT Imports: 30 Imported by: 0
README

sas_ingester — file ingestion pipeline

sas_ingester provides a complete file ingestion system with resumable uploads (tus protocol), security scanning, metadata extraction, and webhook routing.

Upload (tus)
    │
    ▼
  Receive & Chunk ──► SHA-256 dedup check
    │
    ▼
  Metadata extraction (MIME, entropy, magic bytes, trailer)
    │
    ▼
  Security scan (zip bomb, polyglot, macro, ClamAV)
    │
    ▼
  Prompt injection scan (pattern-based)
    │
    ▼
  Webhook routing (opaque or jwt_passthru)

Quick start

cfg, _ := sas_ingester.LoadConfig("config.yaml")

ing, _ := sas_ingester.NewIngester(cfg,
    sas_ingester.WithIDGenerator(idgen.Prefixed("dos_", idgen.Default)),
    sas_ingester.WithAudit(auditLogger),
)
result, _ := ing.Ingest(fileReader, dossierID, ownerSub)

Resumable uploads (tus)

tus := sas_ingester.NewTusHandler(store, cfg, idgen.Default)
upload, _ := tus.Create(dossierID, ownerSub, totalSize)
offset, _ := tus.Patch(upload.UploadID, 0, chunkReader)
result, _ := tus.Complete(upload.UploadID)

Security scanning

Check             Detection
Zip bomb          > 10 PK headers in < 1 MiB
Polyglot          Multiple magic numbers in one file
Macro             OLE2/VBA signatures in Office files
ClamAV            INSTREAM protocol via Unix socket
Prompt injection  Pattern-based (jailbreak, delimiter, override)

Webhook routing

Two auth modes for downstream consumers:

Mode          Identity                Use case
opaque_only   Dossier ID only         Privacy-preserving
jwt_passthru  Original JWT forwarded  Authenticated pipelines

Webhooks are signed with HMAC-SHA256 (X-Signature-256). Failed deliveries retry with exponential backoff (max 5 attempts).
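A downstream consumer can verify the signature by recomputing the HMAC over the raw request body. A minimal sketch; the `sha256=` value prefix is an assumption modeled on common webhook conventions, so check the exact X-Signature-256 format the router emits:

```go
package main

import (
	"crypto/hmac"
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// signBody computes the HMAC-SHA256 of a webhook body using the shared
// per-webhook secret. The "sha256=" prefix is an assumption.
func signBody(secret, body []byte) string {
	mac := hmac.New(sha256.New, secret)
	mac.Write(body)
	return "sha256=" + hex.EncodeToString(mac.Sum(nil))
}

// verifySignature compares the received X-Signature-256 header against a
// locally computed value; hmac.Equal is constant-time.
func verifySignature(secret, body []byte, header string) bool {
	return hmac.Equal([]byte(signBody(secret, body)), []byte(header))
}

func main() {
	body := []byte(`{"event":"piece.ready","dossier_id":"dos_123"}`)
	secret := []byte("webhook-secret")
	sig := signBody(secret, body)
	fmt.Println(verifySignature(secret, body, sig)) // true
}
```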

Schema

5 tables: dossiers, pieces, chunks, routes_pending, tus_uploads.

Exported API

Symbol                    Description
Ingester                  Full pipeline orchestrator
NewIngester(cfg, opts)    Create ingester
TusHandler                Resumable upload manager
Store                     SQLite persistence layer
Router                    Webhook fan-out + retry
Config, LoadConfig(path)  YAML configuration
ParseJWT(token, secret)   JWT validation

Documentation

Overview

  • Buffer writer: writes markdown files with YAML frontmatter into the HORAG buffer directory for vectorization. Depends on docpipe (via the MarkdownConverter callback). Exports BufferWriter, NewBufferWriter, WithBufferWriter.

  • Connectivity: registers sas_ingester service handlers on a connectivity.Router. Depends on connectivity, ingester, store. Exports RegisterConnectivity.

  • Markdown conversion: defines the MarkdownConverter and KeyResolver callback types, plus the convertToMarkdown pipeline step (5.5). Depends on sas_chunker, store. Exports MarkdownConverter, KeyResolver.

  • MCP: registers sas_ingester MCP tools via kit.RegisterMCPTool (six tools for LLM access). Depends on kit, mcp, ingester, store. Exports RegisterMCP.

Index

Constants

const MaxBase64Bytes = 10 * 1024 * 1024

MaxBase64Bytes is the maximum decoded file size accepted via base64 upload (10 MiB). For larger files, use tus resumable upload over HTTP.

Variables

This section is empty.

Functions

func ExtractDossierID

func ExtractDossierID(claims *JWTClaims) string

ExtractDossierID resolves the dossier_id from the JWT claim. Returns an empty string if the JWT does not carry a dossier_id — the caller must then generate an opaque server-side ID (never derive from Sub).
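The caller-side fallback rule can be sketched as follows; the `claims` struct here locally mirrors the relevant JWTClaims fields, and the `dos_` prefix and random-hex format are illustrative assumptions:

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"fmt"
)

// claims mirrors the JWTClaims fields relevant to dossier resolution.
type claims struct {
	Sub       string
	DossierID string
}

// resolveDossierID applies the documented contract: use the JWT's
// dossier_id when present, otherwise mint an opaque server-side ID.
// The ID must never be derived from Sub.
func resolveDossierID(c *claims) string {
	if c.DossierID != "" {
		return c.DossierID
	}
	b := make([]byte, 16)
	if _, err := rand.Read(b); err != nil {
		panic(err)
	}
	return "dos_" + hex.EncodeToString(b) // opaque, unrelated to c.Sub
}

func main() {
	fmt.Println(resolveDossierID(&claims{Sub: "user-1", DossierID: "dos_abc"})) // dos_abc
}
```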

func HasHomoglyphMixing

func HasHomoglyphMixing(text string) bool

HasHomoglyphMixing detects mixed Latin/Cyrillic in single words (visual obfuscation). Delegates to injection.HasHomoglyphMixing.

func MetadataJSON

func MetadataJSON(m *FileMetadata) string

MetadataJSON returns metadata as a JSON string for storage.

func ParseUploadLength

func ParseUploadLength(s string) (int64, error)

ParseUploadLength parses the Upload-Length header value.

func ParseUploadOffset

func ParseUploadOffset(s string) (int64, error)

ParseUploadOffset parses the Upload-Offset header value.

func RegisterConnectivity

func RegisterConnectivity(router *connectivity.Router, ing *Ingester)

RegisterConnectivity registers sas_ingester service handlers on a connectivity Router.

All handlers require authentication: either owner_sub (pre-authenticated service-to-service) or horoskey (API key, resolved via KeyResolver).

Registered services:

sas_create_context  — create a dossier for piece ingestion (requires auth)
sas_upload_piece    — upload a file as base64 (≤10 MB), run full pipeline
sas_query_piece     — get piece metadata + markdown availability
sas_list_pieces     — list pieces in a dossier (optional state filter)
sas_get_markdown    — retrieve markdown text for a piece
sas_retry_routes    — retry failed webhook deliveries for a piece

func RegisterMCP

func RegisterMCP(srv *mcp.Server, ing *Ingester)

RegisterMCP registers sas_ingester tools on an MCP server.

All tools require a horoskey (API key) for authentication. The key is resolved via Ingester.KeyResolver to identify the owner and authorize the operation.

Tools: sas_create_context, sas_upload_piece, sas_query_piece, sas_list_pieces, sas_get_markdown, sas_retry_routes.

func StripZeroWidthChars

func StripZeroWidthChars(text string) string

StripZeroWidthChars removes zero-width Unicode characters used for steganographic injection. Delegates to injection.StripInvisible which covers a broader set of invisible characters.

Types

type BufferWriter

type BufferWriter struct {
	// contains filtered or unexported fields
}

BufferWriter writes .md files with YAML frontmatter into the HORAG buffer directory. HORAG's watcher picks them up for chunking and embedding.

Nil-safe: calling Write on a nil *BufferWriter is a no-op.

func NewBufferWriter

func NewBufferWriter(bufferDir string) *BufferWriter

NewBufferWriter creates a BufferWriter that writes to the given directory. Returns nil if bufferDir is empty (feature disabled).

func (*BufferWriter) Write

func (bw *BufferWriter) Write(_ context.Context, dossierID, sha256, title, markdown string) error

Write creates a .md file with YAML frontmatter in the buffer directory. Uses atomic write (tmp → rename) to prevent HORAG from reading partial files.

Frontmatter fields match what HORAG watcher expects:

  • id: sha256 hash (unique per document)
  • dossier_id: tenant routing
  • content_hash: dedup key ("sha256:<hash>")
  • source_type: "document" (distinguishes from veille sources)
  • title: original filename or extracted title
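Assuming the fields listed above, a generated buffer file might look like this (values illustrative, not actual output):

```yaml
---
id: "3a7bd3e2360a3d29eea436fcfb7e44c735d117c42d1c1835420b6b9942dd4f1b"
dossier_id: "dos_123"
content_hash: "sha256:3a7bd3e2360a3d29eea436fcfb7e44c735d117c42d1c1835420b6b9942dd4f1b"
source_type: "document"
title: "quarterly-report.pdf"
---

Extracted markdown body follows the frontmatter.
```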

type ClamAVConfig

type ClamAVConfig struct {
	Enabled    bool   `yaml:"enabled"`
	SocketPath string `yaml:"socket_path"`
}

ClamAVConfig configures the ClamAV scanner.

type Config

type Config struct {
	Listen      string          `yaml:"listen"`
	DBPath      string          `yaml:"db_path"`
	ChunksDir   string          `yaml:"chunks_dir"`
	MaxFileMB   int             `yaml:"max_file_mb"`
	ChunkSizeMB int             `yaml:"chunk_size_mb"`
	ClamAV      ClamAVConfig    `yaml:"clamav"`
	Webhooks    []WebhookTarget `yaml:"webhooks"`
	JWTSecret   string          `yaml:"jwt_secret"`
	BufferDir   string          `yaml:"buffer_dir"` // HORAG buffer dir for .md files (empty = disabled)
}

Config holds the full sas_ingester configuration.

func DefaultConfig

func DefaultConfig() *Config

DefaultConfig returns sane defaults.

func LoadConfig

func LoadConfig(path string) (*Config, error)

LoadConfig reads and parses a YAML config file. Returns DefaultConfig merged with the file.
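A sketch of a config.yaml using the keys from the Config struct below; all values are illustrative, not documented defaults:

```yaml
listen: ":8443"
db_path: "/var/lib/sas_ingester/state.db"
chunks_dir: "/var/lib/sas_ingester/chunks"
max_file_mb: 512
chunk_size_mb: 8
clamav:
  enabled: true
  socket_path: "/run/clamav/clamd.sock"
webhooks:
  - name: "indexer"
    url: "https://indexer.internal/hook"
    auth_mode: "opaque_only"
    secret: "change-me"
    require_review: false
jwt_secret: "change-me-too"
buffer_dir: ""   # empty disables HORAG buffer writes
```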

func (*Config) ChunkSizeBytes

func (c *Config) ChunkSizeBytes() int64

ChunkSizeBytes returns chunk size in bytes.

func (*Config) MaxFileBytes

func (c *Config) MaxFileBytes() int64

MaxFileBytes returns max file size in bytes.

func (*Config) Validate

func (c *Config) Validate() error

Validate checks that required fields are present and values are sane.

type Dossier

type Dossier struct {
	ID          string `json:"id"`
	OwnerJWTSub string `json:"-"`
	Name        string `json:"name,omitempty"`
	Routes      string `json:"routes,omitempty"` // JSON array of DossierRoute
	CreatedAt   string `json:"created_at"`
}

Dossier represents a dossier row.

func (*Dossier) ParsedRoutes

func (d *Dossier) ParsedRoutes() []DossierRoute

ParsedRoutes returns the per-dossier routes parsed from JSON. Returns nil if the column is empty or invalid.

type DossierRoute

type DossierRoute struct {
	URL           string `json:"url"`
	AuthMode      string `json:"auth_mode"`        // opaque_only | jwt_passthru
	Secret        string `json:"secret,omitempty"` // HMAC signing key
	RequireReview bool   `json:"require_review,omitempty"`
}

DossierRoute defines a per-dossier webhook destination stored as JSON in the dossiers.routes column. When present, these override the global webhook config for that dossier.
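With those fields, a dossiers.routes column might hold JSON like this (URLs and secrets illustrative):

```json
[
  {"url": "https://indexer.internal/hook", "auth_mode": "opaque_only", "secret": "k1"},
  {"url": "https://audit.internal/hook", "auth_mode": "jwt_passthru", "secret": "k2", "require_review": true}
]
```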

type FileMetadata

type FileMetadata struct {
	MIME        string       `json:"mime"`
	Extension   string       `json:"extension,omitempty"`
	Entropy     float64      `json:"entropy"`
	IsText      bool         `json:"is_text"`
	IsBinary    bool         `json:"is_binary"`
	MagicHeader string       `json:"magic_header,omitempty"`
	Trailer     *TrailerInfo `json:"trailer,omitempty"`
}

FileMetadata holds extracted metadata for a piece.

func ExtractFullMetadata

func ExtractFullMetadata(chunkDir string, chunkCount int) (*FileMetadata, error)

ExtractFullMetadata reads the first chunk for header metadata and the last chunk for trailer analysis (PDF %%EOF/startxref, ZIP EOCD). If chunkDir has only one chunk, it is used for both header and trailer. Entropy is computed across all chunks for a more accurate measurement.
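The entropy measurement can be sketched with a plain byte-frequency Shannon estimate; the package's exact method may differ:

```go
package main

import (
	"fmt"
	"math"
)

// shannonEntropy returns bits of entropy per byte (0.0 to 8.0). High values
// suggest compressed or encrypted content.
func shannonEntropy(data []byte) float64 {
	if len(data) == 0 {
		return 0
	}
	var counts [256]int
	for _, b := range data {
		counts[b]++
	}
	var h float64
	n := float64(len(data))
	for _, c := range counts {
		if c == 0 {
			continue
		}
		p := float64(c) / n
		h -= p * math.Log2(p)
	}
	return h
}

func main() {
	// One of each byte value: maximal entropy, ~8 bits per byte.
	uniform := make([]byte, 256)
	for i := range uniform {
		uniform[i] = byte(i)
	}
	fmt.Printf("%.2f\n", shannonEntropy(uniform))
}
```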

func ExtractMetadata

func ExtractMetadata(filePath string) (*FileMetadata, error)

ExtractMetadata reads the first bytes of a file and extracts structural metadata.

type IngestResult

type IngestResult struct {
	SHA256       string           `json:"sha256"`
	SizeBytes    int64            `json:"size_bytes"`
	DossierID    string           `json:"dossier_id"`
	State        string           `json:"state"`
	MIME         string           `json:"mime,omitempty"`
	Deduplicated bool             `json:"deduplicated,omitempty"`
	Scan         *ScanResult      `json:"scan,omitempty"`
	Injection    *InjectionResult `json:"injection,omitempty"`
	MarkdownText string           `json:"markdown_text,omitempty"`
}

IngestResult is the result of a full ingestion pipeline run.

type Ingester

type Ingester struct {
	Store             *Store
	Config            *Config
	Router            *Router
	Audit             *observability.AuditLogger
	Metrics           *observability.MetricsManager
	Events            *observability.EventLogger
	NewID             idgen.Generator
	MarkdownConverter MarkdownConverter // optional, set via WithMarkdownConverter
	BufferWriter      *BufferWriter     // optional, set via WithBufferWriter — writes .md to HORAG buffer
	KeyResolver       KeyResolver       // required for MCP/connectivity auth, set via WithKeyResolver
	ShardCatalog      ShardCatalog      // optional, set via WithShardCatalog — registers dossiers in usertenant
}

Ingester is the main pipeline orchestrator.

func NewIngester

func NewIngester(cfg *Config, opts ...IngesterOption) (*Ingester, error)

NewIngester creates a fully wired ingester.

func (*Ingester) Close

func (ing *Ingester) Close() error

Close releases resources.

func (*Ingester) Ingest

func (ing *Ingester) Ingest(r io.Reader, dossierID, ownerSub string) (*IngestResult, error)

Ingest runs the full pipeline for a single upload:

  1. Receive file → chunk + hash + dedup
  2. Extract metadata
  3. Security scan (ClamAV, zip bomb, polyglot, macro)
  4. Prompt injection scan
  5. Update piece state
  6. Enqueue webhook routes

Identity cutoff: ownerSub is used only for EnsureDossier and the pre-cutoff audit log. After that, it is erased and never passed downstream.

func (*Ingester) IngestFromUpload

func (ing *Ingester) IngestFromUpload(upload *UploadResult, dossierID, ownerSub string) (*IngestResult, error)

IngestFromUpload runs pipeline steps 2-6 for an upload that was already received (e.g. via tus resumable upload). The UploadResult must have SHA256, SizeBytes, and ChunkCount set; the piece must already exist in the DB.

func (*Ingester) IngestFromUploadWithToken

func (ing *Ingester) IngestFromUploadWithToken(upload *UploadResult, dossierID, ownerSub, originalToken string) (*IngestResult, error)

IngestFromUploadWithToken is like IngestFromUpload but captures the original JWT token for jwt_passthru routes.

func (*Ingester) IngestWithToken

func (ing *Ingester) IngestWithToken(r io.Reader, dossierID, ownerSub, originalToken string) (*IngestResult, error)

IngestWithToken is like Ingest but also captures the original JWT token for jwt_passthru routes. The token is cleared after enqueuing routes.

func (*Ingester) RecoverStalePieces

func (ing *Ingester) RecoverStalePieces()

RecoverStalePieces finds pieces stuck in intermediate states (received, scanned) from a previous crash and marks them for re-processing. Call this once at boot before accepting new uploads.

type IngesterOption

type IngesterOption func(*Ingester)

IngesterOption configures an Ingester.

func WithAudit

WithAudit sets the audit logger.

func WithBufferWriter

func WithBufferWriter(bw *BufferWriter) IngesterOption

WithBufferWriter sets the BufferWriter for writing markdown to the HORAG buffer. The writer is called after step 5.5 (markdown conversion) for ready pieces. If nil (default), the buffer step is silently skipped.

func WithEvents

WithEvents sets the event logger.

func WithIDGenerator

func WithIDGenerator(g idgen.Generator) IngesterOption

WithIDGenerator sets the ID generator for dossier IDs.

func WithKeyResolver

func WithKeyResolver(fn KeyResolver) IngesterOption

WithKeyResolver sets the callback that resolves a horoskey (API key) to an owner identity. Required for MCP and connectivity handlers to authenticate callers. Without it, all authenticated handlers reject with an error.

func WithMarkdownConverter

func WithMarkdownConverter(fn MarkdownConverter) IngesterOption

WithMarkdownConverter sets the callback that converts ingested files to markdown. The converter is called in step 5.5 of the pipeline, between metadata update and route enqueuing. If nil, the markdown step is silently skipped.

func WithMetrics

WithMetrics sets the metrics manager.

func WithShardCatalog

func WithShardCatalog(sc ShardCatalog) IngesterOption

WithShardCatalog sets the shard catalog for registering dossiers in usertenant. If nil, catalog registration is skipped (best-effort).

type InjectionResult

type InjectionResult struct {
	Risk    string   `json:"risk"`
	Matches []string `json:"matches,omitempty"`
}

InjectionResult holds the outcome of prompt injection scanning.

func ScanChunksInjection

func ScanChunksInjection(chunksDir string, chunkCount int) *InjectionResult

ScanChunksInjection scans all chunk files for a piece and returns the worst risk.

func ScanInjection

func ScanInjection(text string) *InjectionResult

ScanInjection scans text content for prompt injection patterns. Delegates to the injection package for normalize + intent matching.

type JWTClaims

type JWTClaims struct {
	Sub       string `json:"sub"`
	DossierID string `json:"dossier_id,omitempty"`
	Exp       int64  `json:"exp,omitempty"`
	Iat       int64  `json:"iat,omitempty"`
}

JWTClaims holds the decoded JWT payload relevant to identity.

func ParseJWT

func ParseJWT(tokenStr, secret string) (*JWTClaims, error)

ParseJWT decodes a HS256 JWT token and validates the signature + expiry. Returns the claims on success.

type KeyResolver

type KeyResolver func(ctx context.Context, horoskey string) (ownerSub string, err error)

KeyResolver resolves a horoskey (API key delivered to LLMs) to the owner identity (JWT sub) for billing and authorization. Returns the ownerSub string, or error if the key is invalid/expired/revoked.

This is a callback: sas_ingester defines the interface, the binary or the auth layer provides the implementation (e.g. lookup in horos_ID).

type MarkdownConverter

type MarkdownConverter func(ctx context.Context, filePath string, mime string) (string, error)

MarkdownConverter converts a file at the given path to markdown. The MIME type is provided for format hints. Returns markdown text (with frontmatter) or error.

This is a callback: sas_ingester defines the interface, the binary provides the implementation (e.g. docpipe). This avoids a circular import since docpipe lives in chrc/ which imports hazyhaar_pkg.

type OpaquePayload

type OpaquePayload struct {
	Event     string        `json:"event"`
	DossierID string        `json:"dossier_id"`
	SHA256    string        `json:"sha256"`
	SizeBytes int64         `json:"size_bytes"`
	State     string        `json:"state"`
	MIME      string        `json:"mime,omitempty"`
	Metadata  *FileMetadata `json:"metadata,omitempty"`
	Timestamp string        `json:"timestamp"`
}

OpaquePayload is the JSON body sent to opaque_only webhook targets. It never contains user identity information — only the opaque dossier_id.

type PassthruPayload

type PassthruPayload struct {
	Event     string        `json:"event"`
	DossierID string        `json:"dossier_id"`
	SHA256    string        `json:"sha256"`
	SizeBytes int64         `json:"size_bytes"`
	State     string        `json:"state"`
	MIME      string        `json:"mime,omitempty"`
	Metadata  *FileMetadata `json:"metadata,omitempty"`
	Timestamp string        `json:"timestamp"`
}

PassthruPayload is the JSON body sent to jwt_passthru webhook targets. It includes the opaque dossier_id and the piece details.

type Piece

type Piece struct {
	SHA256        string `json:"sha256"`
	DossierID     string `json:"dossier_id"`
	State         string `json:"state"`
	MIME          string `json:"mime,omitempty"`
	SizeBytes     int64  `json:"size_bytes,omitempty"`
	Metadata      string `json:"metadata,omitempty"`
	InjectionRisk string `json:"injection_risk"`
	ClamAVStatus  string `json:"clamav_status"`
	CreatedAt     string `json:"created_at"`
	UpdatedAt     string `json:"updated_at"`
}

Piece represents a piece row.

type PieceMarkdown

type PieceMarkdown struct {
	SHA256    string `json:"sha256"`
	DossierID string `json:"dossier_id"`
	Markdown  string `json:"markdown"`
	CreatedAt string `json:"created_at"`
}

PieceMarkdown represents a row in pieces_markdown.

type RoutePending

type RoutePending struct {
	PieceSHA256   string `json:"piece_sha256"`
	DossierID     string `json:"dossier_id"`
	Target        string `json:"target"`
	AuthMode      string `json:"auth_mode"` // opaque_only | jwt_passthru
	RequireReview bool   `json:"require_review"`
	Reviewed      bool   `json:"reviewed"`
	Attempts      int    `json:"attempts"`
	LastError     string `json:"last_error,omitempty"`
	NextRetryAt   string `json:"next_retry_at,omitempty"`
	OriginalToken string `json:"-"` // JWT for jwt_passthru; never serialized
}

RoutePending represents a pending route delivery.

type Router

type Router struct {
	// contains filtered or unexported fields
}

Router manages webhook fan-out and retries.

func NewRouter

func NewRouter(store *Store, cfg *Config) *Router

NewRouter creates a new webhook router.

func (*Router) Deliver

func (rt *Router) Deliver(route *RoutePending, piece *Piece) bool

Deliver attempts to deliver a single route. Returns true if successful.

func (*Router) EnqueueRoutes

func (rt *Router) EnqueueRoutes(piece *Piece) error

EnqueueRoutes creates pending routes for a piece (without JWT passthru).

func (*Router) EnqueueRoutesWithToken

func (rt *Router) EnqueueRoutesWithToken(piece *Piece, originalToken string) error

EnqueueRoutesWithToken creates pending routes for a piece. If the dossier has per-dossier routes (dossiers.routes JSON), those are used exclusively. Otherwise, the global webhook config is used as a fallback. originalToken is stored for jwt_passthru routes; empty for opaque_only.

func (*Router) ProcessRetries

func (rt *Router) ProcessRetries()

ProcessRetries processes all routes due for retry.

type ScanResult

type ScanResult struct {
	ClamAV   string   `json:"clamav"`
	Warnings []string `json:"warnings,omitempty"`
	Blocked  bool     `json:"blocked"`
}

ScanResult holds the outcome of security scanning a file.

func ScanChunks

func ScanChunks(chunkDir string, chunkCount int, cfg *Config) (*ScanResult, error)

ScanChunks runs security checks across multiple chunk files. It scans the first chunk for magic bytes, polyglot, zip bomb, and macro detection. The last chunk is also scanned for polyglot and macro (trailing payloads). ClamAV is invoked on each chunk.

func ScanFile

func ScanFile(filePath string, cfg *Config) (*ScanResult, error)

ScanFile runs all security checks on a single file without loading it entirely into memory. Structural checks use an 8 KiB header + file size; ClamAV reads the file itself via its socket protocol.

type ShardCatalog

type ShardCatalog interface {
	EnsureShard(ctx context.Context, dossierID, ownerID, name string) error
}

ShardCatalog registers dossiers in the usertenant catalog. Implemented by *tenant.Pool. Optional: if nil, catalog registration is skipped.

type Store

type Store struct {
	// contains filtered or unexported fields
}

Store wraps an SQLite database for the Sas Ingester state machine.

func OpenStore

func OpenStore(path string) (*Store, error)

OpenStore opens (or creates) the SQLite database at path and runs migrations.

func (*Store) Close

func (s *Store) Close() error

Close closes the underlying database connection.

func (*Store) CompleteTusUpload

func (s *Store) CompleteTusUpload(uploadID string) error

CompleteTusUpload marks a tus upload as completed.

func (*Store) CreateDossier

func (s *Store) CreateDossier(d *Dossier) error

CreateDossier inserts a new dossier.

func (*Store) CreateTusUpload

func (s *Store) CreateTusUpload(u *TusUpload) error

CreateTusUpload inserts a new tus upload record.

func (*Store) DB

func (s *Store) DB() *sql.DB

DB returns the underlying *sql.DB for sharing with audit/trace layers.

func (*Store) DeleteDossier

func (s *Store) DeleteDossier(id string) error

DeleteDossier deletes a dossier by ID. CASCADE on pieces → chunks + routes_pending handles cleanup.

func (*Store) DeleteRoute

func (s *Store) DeleteRoute(pieceSHA256, dossierID, target string) error

DeleteRoute removes a completed route.

func (*Store) DeleteTusUpload

func (s *Store) DeleteTusUpload(uploadID string) error

DeleteTusUpload removes a tus upload record.

func (*Store) EnsureDossier

func (s *Store) EnsureDossier(id, ownerSub string) error

EnsureDossier creates the dossier if it doesn't exist, or verifies owner match.

func (*Store) GetDossier

func (s *Store) GetDossier(id string) (*Dossier, error)

GetDossier returns a dossier by ID. Returns nil, nil if not found.

func (*Store) GetMarkdown

func (s *Store) GetMarkdown(sha256, dossierID string) (string, error)

GetMarkdown returns the markdown text for a piece. Returns "", nil if not found.

func (*Store) GetPiece

func (s *Store) GetPiece(sha256, dossierID string) (*Piece, error)

GetPiece returns a piece by SHA256 and dossier_id. Returns nil, nil if not found.

func (*Store) GetTusUpload

func (s *Store) GetTusUpload(uploadID string) (*TusUpload, error)

GetTusUpload returns a tus upload by ID. Returns nil, nil if not found.

func (*Store) HasMarkdown

func (s *Store) HasMarkdown(sha256, dossierID string) (bool, error)

HasMarkdown checks if a markdown conversion exists for a piece.

func (*Store) InsertChunk

func (s *Store) InsertChunk(pieceSHA256, dossierID string, idx int, chunkSHA256 string, received bool) error

InsertChunk inserts a chunk tracking row.

func (*Store) InsertPiece

func (s *Store) InsertPiece(p *Piece) error

InsertPiece inserts a new piece row.

func (*Store) InsertRoute

func (s *Store) InsertRoute(r *RoutePending) error

InsertRoute inserts a pending route.

func (*Store) ListMarkdownByDossier

func (s *Store) ListMarkdownByDossier(dossierID string) ([]*PieceMarkdown, error)

ListMarkdownByDossier returns all markdown entries for a dossier.

func (*Store) ListPieces

func (s *Store) ListPieces(dossierID string) ([]*Piece, error)

ListPieces returns all pieces for a dossier.

func (*Store) ListPiecesByState

func (s *Store) ListPiecesByState(state string) ([]*Piece, error)

ListPiecesByState returns pieces in the given state (for crash recovery).

func (*Store) ListRetryableRoutes

func (s *Store) ListRetryableRoutes(now string) ([]*RoutePending, error)

ListRetryableRoutes returns routes due for retry.

func (*Store) ListRoutes

func (s *Store) ListRoutes(pieceSHA256, dossierID string) ([]*RoutePending, error)

ListRoutes returns pending routes for a piece.

func (*Store) MarkChunkReceived

func (s *Store) MarkChunkReceived(pieceSHA256, dossierID string, idx int) error

MarkChunkReceived marks a chunk as received.

func (*Store) MarkRouteReviewed

func (s *Store) MarkRouteReviewed(pieceSHA256, dossierID, target string) error

MarkRouteReviewed marks a route as reviewed (approved for delivery).

func (*Store) PiecesCount

func (s *Store) PiecesCount(state string) (int, error)

PiecesCount returns the number of pieces in a given state, or all if state is empty.

func (*Store) SetDossierRoutes

func (s *Store) SetDossierRoutes(id string, routes []DossierRoute) error

SetDossierRoutes updates the per-dossier routes JSON. Pass nil to clear.

func (*Store) StoreMarkdown

func (s *Store) StoreMarkdown(sha256, dossierID, markdown string) error

StoreMarkdown inserts or replaces a markdown conversion for a piece.

func (*Store) UpdatePieceMetadata

func (s *Store) UpdatePieceMetadata(sha256, dossierID, mime, metadata, injectionRisk, clamavStatus, state string) error

UpdatePieceMetadata updates metadata-related fields of a piece.

func (*Store) UpdatePieceState

func (s *Store) UpdatePieceState(sha256, dossierID, state string) error

UpdatePieceState updates the state (and updated_at) of a piece.

func (*Store) UpdateRouteAttempt

func (s *Store) UpdateRouteAttempt(pieceSHA256, dossierID, target string, attempts int, lastError, nextRetryAt string) error

UpdateRouteAttempt updates retry state for a route.

func (*Store) UpdateTusOffset

func (s *Store) UpdateTusOffset(uploadID string, offset int64) error

UpdateTusOffset updates the offset and updated_at for a tus upload.

type TrailerInfo

type TrailerInfo struct {
	HasPDFEOF      bool    `json:"has_pdf_eof,omitempty"`
	PDFStartXRef   string  `json:"pdf_startxref,omitempty"`
	HasZIPEOCD     bool    `json:"has_zip_eocd,omitempty"`
	ZIPComment     string  `json:"zip_comment,omitempty"`
	TrailerEntropy float64 `json:"trailer_entropy"`
}

TrailerInfo holds structural information extracted from the end of a file.

type TusHandler

type TusHandler struct {
	// contains filtered or unexported fields
}

TusHandler manages tus resumable upload operations.

func NewTusHandler

func NewTusHandler(store *Store, cfg *Config, newID func() string) *TusHandler

NewTusHandler creates a new tus handler.

func (*TusHandler) Complete

func (h *TusHandler) Complete(uploadID string) (*UploadResult, error)

Complete finalises a tus upload: hashes the partial file, chunks it via SplitReader, and returns the result ready for the ingestion pipeline. The caller should then proceed with Ingest-like steps (scan, metadata, etc).

func (*TusHandler) Create

func (h *TusHandler) Create(dossierID, ownerSub string, totalSize int64) (*TusUpload, error)

Create initialises a new tus upload. Returns the upload ID.

func (*TusHandler) GetOffset

func (h *TusHandler) GetOffset(uploadID string) (*TusUpload, error)

GetOffset returns the current offset for a tus upload.

func (*TusHandler) Patch

func (h *TusHandler) Patch(uploadID string, clientOffset int64, body io.Reader) (int64, error)

Patch appends data to an existing tus upload. The caller must pass the Upload-Offset header value for consistency check. Returns the new offset after appending.

type TusUpload

type TusUpload struct {
	UploadID    string `json:"upload_id"`
	DossierID   string `json:"dossier_id"`
	OwnerJWTSub string `json:"-"`
	TotalSize   int64  `json:"total_size"`
	OffsetBytes int64  `json:"offset_bytes"`
	ChunkDir    string `json:"chunk_dir"`
	CreatedAt   string `json:"created_at"`
	UpdatedAt   string `json:"updated_at"`
	Completed   bool   `json:"completed"`
}

TusUpload represents a resumable upload in progress.

type UploadResult

type UploadResult struct {
	SHA256       string `json:"sha256"`
	SizeBytes    int64  `json:"size_bytes"`
	ChunkCount   int    `json:"chunk_count"`
	State        string `json:"state,omitempty"`
	Deduplicated bool   `json:"deduplicated"`
}

UploadResult holds the result of receiving a file upload.

func ReceiveFile

func ReceiveFile(r io.Reader, dossierID string, cfg *Config, store *Store) (*UploadResult, error)

ReceiveFile reads from r, streams directly to chunks via sas_chunker.SplitReader (no intermediate temp file), checks dedup, and records the piece in the DB. Manifest.json, Verify() and Assemble() all work out of the box.

type WebhookTarget

type WebhookTarget struct {
	Name          string `yaml:"name"`
	URL           string `yaml:"url"`
	AuthMode      string `yaml:"auth_mode"` // opaque_only | jwt_passthru
	Secret        string `yaml:"secret"`    // per-webhook secret (HMAC signing key)
	RequireReview bool   `yaml:"require_review"`
}

WebhookTarget configures a downstream webhook.
