package file

Version: v1.0.0-alpha.20
Warning: This package is not in the latest version of its module.
Published: Mar 9, 2026 | License: MIT | Imports: 17 | Imported by: 0

README

File Input Component

File input component for reading JSONL/JSON files and publishing to NATS subjects.

Purpose

The file input component reads JSON Lines or JSON files from the filesystem and publishes each line or record as a message to NATS. It supports glob patterns for batch processing, rate-limited publishing, and continuous replay loops for test data generation and development scenarios.

Configuration

name: event-ingest
type: input
protocol: file
config:
  ports:
    outputs:
      - name: output
        type: jetstream
        subject: events.raw
        required: true
  path: /data/events/*.jsonl
  format: jsonl
  interval: 10ms
  loop: false
Configuration Fields
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| path | string | Yes | - | File path or glob pattern (e.g., /data/*.jsonl) |
| format | string | No | jsonl | File format: jsonl or json |
| interval | string | No | 10ms | Delay between publishing lines (rate control) |
| loop | boolean | No | false | Continuously re-read files after completion |
| ports.outputs | array | Yes | - | Output port configuration for NATS publishing |

Input/Output Ports

Input Ports
| Port | Type | Description |
|---|---|---|
| file_source | file | File source reading from the specified path pattern |

Output Ports

| Port | Type | Description |
|---|---|---|
| nats_output | nats/jetstream | NATS subject for publishing file lines |

Supported File Formats

JSONL (JSON Lines)

One JSON object per line, no outer array. Each line is validated before publishing.

{"id": "1", "type": "event", "timestamp": "2024-01-15T10:00:00Z"}
{"id": "2", "type": "event", "timestamp": "2024-01-15T10:00:01Z"}
{"id": "3", "type": "event", "timestamp": "2024-01-15T10:00:02Z"}

Invalid lines are logged and skipped without stopping file processing.

JSON

Standard JSON format, either a single object or an array. For arrays, each element is published as a separate message.

[
  {"id": "1", "type": "event"},
  {"id": "2", "type": "event"}
]
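The JSON format can be sketched the same way. This assumes the whole file is read into memory and that a document which is not an array is published as one message; `splitJSON` is a hypothetical helper built on `json.RawMessage`, not the package's API.

```go
package main

import (
	"bytes"
	"encoding/json"
	"errors"
	"fmt"
)

// splitJSON is a hypothetical helper: it returns each element of a top-level
// array as its own message, or the document itself when it is a single value.
func splitJSON(doc []byte) ([]json.RawMessage, error) {
	trimmed := bytes.TrimSpace(doc)
	if len(trimmed) > 0 && trimmed[0] == '[' {
		var elems []json.RawMessage
		if err := json.Unmarshal(trimmed, &elems); err != nil {
			return nil, fmt.Errorf("parse JSON array: %w", err)
		}
		return elems, nil
	}
	if !json.Valid(trimmed) {
		return nil, errors.New("invalid JSON document")
	}
	return []json.RawMessage{json.RawMessage(trimmed)}, nil
}

func main() {
	doc := []byte(`[{"id": "1", "type": "event"}, {"id": "2", "type": "event"}]`)
	msgs, err := splitJSON(doc)
	if err != nil {
		panic(err)
	}
	for _, m := range msgs {
		fmt.Println(string(m)) // one message per array element
	}
}
```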

Example Use Cases

Data Replay

Replay archived event data for testing or recovery scenarios.

config:
  path: /archive/events-2024-01-15.jsonl
  format: jsonl
  interval: 10ms
  loop: false
Continuous Test Data

Generate continuous test data for development and load testing.

config:
  path: /testdata/sample-events.jsonl
  format: jsonl
  interval: 100ms
  loop: true
Batch Import

Maximum throughput import of large datasets.

config:
  path: /import/batch-*.jsonl
  format: jsonl
  interval: "0"
  loop: false
Multi-File Processing

Process multiple files matching a glob pattern.

config:
  path: /data/2024-01/*/*.jsonl
  format: jsonl
  interval: 5ms
  loop: false

Rate Control

The interval setting controls publishing rate to prevent overwhelming downstream consumers.

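| Interval | Max Throughput | Use Case |
|---|---|---|
| 0 | Unlimited | Batch imports, maximum throughput |
| 1ms | ~1000 msg/s | High-volume ingestion |
| 10ms | ~100 msg/s | Moderate load testing |
| 100ms | ~10 msg/s | Gentle replay, development |

These figures are upper bounds; publish latency can lower the actual rate.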

Observability

Prometheus Metrics
  • semstreams_file_input_lines_read_total - Total lines read from files
  • semstreams_file_input_lines_published_total - Lines successfully published to NATS
  • semstreams_file_input_bytes_read_total - Total bytes read
  • semstreams_file_input_parse_errors_total - JSON parse failures
  • semstreams_file_input_files_processed_total - Files completely processed
Health Status
health := input.Health()
// Healthy: true if component running
// ErrorCount: Parse/publish errors
// Uptime: Time since Start()
Data Flow Metrics
dataFlow := input.DataFlow()
// MessagesPerSecond: Publishing rate
// BytesPerSecond: Byte throughput
// ErrorRate: Error percentage

Lifecycle Management

// Initialize (validate path, check files exist)
if err := input.Initialize(); err != nil {
    log.Fatal(err)
}

// Start reading and publishing
if err := input.Start(ctx); err != nil {
    log.Fatal(err)
}

// Graceful shutdown with timeout
if err := input.Stop(5 * time.Second); err != nil {
    log.Printf("file input stop: %v", err)
}

Performance Characteristics

  • Throughput: 10,000+ lines/second (without interval delay)
  • Memory: O(1) per file (buffered line-by-line reading)
  • Buffer: 1MB initial, 10MB max per line
  • Context checks: Every 100 lines (responsive shutdown)

Error Handling

  • Invalid configuration: Returns error during initialization
  • No matching files: Returns error during initialization
  • Parse errors: Logged and skipped, processing continues
  • Publish errors: Logged, processing continues

Individual line errors do not stop file processing. File-level errors are logged but do not stop glob pattern processing.

Limitations

  • No compression support (gzip, zstd)
  • No offset tracking (always starts from beginning)
  • No file watching (new files require restart)
  • No parallel file processing
  • Maximum line length: 10MB

Documentation

Overview

Package file provides a file input component for reading JSONL/JSON files and publishing to NATS.

Overview

The file input component reads JSON Lines (JSONL) or JSON files and publishes each line/record as a message to NATS subjects. It supports glob patterns for reading multiple files, configurable delays between messages for rate control, and optional continuous looping for replay scenarios. It implements the StreamKit component interfaces for lifecycle management and observability.

Quick Start

Read JSONL files and publish to NATS:

config := file.Config{
    Ports: &component.PortConfig{
        Outputs: []component.PortDefinition{
            {Name: "output", Type: "nats", Subject: "events.ingest", Required: true},
        },
    },
    Path:     "/data/events/*.jsonl",
    Format:   "jsonl",
    Interval: "10ms",
    Loop:     false,
}

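rawConfig, err := json.Marshal(config)
if err != nil {
    return err
}

// deps (component.Dependencies) must be supplied by the caller
input, err := file.CreateInput(rawConfig, deps)
if err != nil {
    return err
}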

Configuration

The Config struct controls file reading behavior:

  • Path: File path or glob pattern (required)
  • Format: File format - "jsonl" or "json" (default: "jsonl")
  • Interval: Delay between publishing lines (default: "10ms")
  • Loop: Continuously re-read files when complete (default: false)
  • Ports: Output port configuration for NATS publishing

File Formats

**JSONL (JSON Lines):**

One JSON object per line, no outer array:

{"id": "1", "type": "event", "data": "..."}
{"id": "2", "type": "event", "data": "..."}

Each line is checked for valid JSON before publishing. Invalid lines are logged and skipped without stopping processing.

**JSON:**

Standard JSON format (array or single object). For arrays, each element is published as a separate message.

Glob Patterns

The Path field supports standard glob patterns:

// Single file
Path: "/data/events.jsonl"

// All .jsonl files in directory
Path: "/data/*.jsonl"

// Files one directory level down (not a recursive match)
Path: "/data/2024-*/*.jsonl"

Files matching the pattern are processed in filesystem order. If the pattern matches no files at Initialize time, an error is returned.

Rate Control

The Interval setting controls publishing rate to prevent overwhelming downstream consumers:

Interval: "10ms"   // 100 messages/second max
Interval: "1ms"    // 1000 messages/second max
Interval: "0"      // No delay, maximum throughput

Typical use cases:

  • Replay scenarios: Approximate the original event rate (a fixed interval cannot reproduce irregular timing)
  • Load testing: Control ingestion rate
  • Gentle startup: Avoid thundering herd on restart

Loop Mode

When Loop is true, the component continuously re-reads files after completion:

Loop: true
// Process all files
// Wait 1 second
// Process all files again
// Repeat until stopped

Useful for:

  • Continuous test data generation
  • Simulation scenarios
  • Development/debugging

NATS Publishing

The component supports both core NATS and JetStream publishing:

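**Core NATS:**

ports:
  outputs:
    - type: "nats"
      subject: "events.raw"

**JetStream:**

ports:
  outputs:
    - type: "jetstream"
      subject: "events.raw"

JetStream publishing waits for a server acknowledgment, so each message is persisted by the stream that captures the subject (such a stream must already exist). Core NATS publishing is fire-and-forget.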

Lifecycle Management

Proper component lifecycle with graceful shutdown:

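// Initialize (validate path, check files exist)
if err := input.Initialize(); err != nil {
    return err
}

// Start reading and publishing
if err := input.Start(ctx); err != nil {
    return err
}

// Graceful shutdown, waiting up to 5 seconds
if err := input.Stop(5 * time.Second); err != nil {
    log.Printf("file input stop: %v", err)
}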

During shutdown:

  1. Signal shutdown via channel
  2. Wait for current file processing to complete
  3. Close all resources

Observability

The component implements component.Discoverable for monitoring:

meta := input.Meta()
// Name: file-input-{filename}
// Type: input
// Description: File input reading from {path}

health := input.Health()
// Healthy: true if component running
// ErrorCount: Parse/publish errors
// Uptime: Time since Start()

dataFlow := input.DataFlow()
// MessagesPerSecond: Publishing rate
// BytesPerSecond: Byte throughput
// ErrorRate: Error percentage

Prometheus metrics:

  • file_input_lines_read_total: Total lines read from files
  • file_input_lines_published_total: Lines successfully published
  • file_input_bytes_read_total: Total bytes read
  • file_input_parse_errors_total: JSON parse failures
  • file_input_files_processed_total: Files completely processed

Performance Characteristics

  • Throughput: 10,000+ lines/second (without interval delay)
  • Memory: O(1) per file (buffered line-by-line reading)
  • Buffer: 1MB initial, 10MB max per line
  • Context checks: Every 100 lines (prevents blocking on shutdown)

Error Handling

The component uses streamkit/errors for consistent error classification:

  • Invalid config: errs.WrapInvalid (empty path, invalid format)
  • Missing files: errs.WrapInvalid (no files match glob)
  • Parse errors: Logged and skipped (doesn't stop processing)
  • Publish errors: errs.WrapTransient (NATS unavailable)

Individual line errors don't stop file processing. File errors are logged but don't stop glob pattern processing.

Common Use Cases

**Data Replay:**

Path: "/archive/events-2024-01-15.jsonl"
Format: "jsonl"
Interval: "10ms"
Loop: false

**Continuous Test Data:**

Path: "/testdata/sample-events.jsonl"
Format: "jsonl"
Interval: "100ms"
Loop: true

**Batch Import:**

Path: "/import/*.jsonl"
Format: "jsonl"
Interval: "0"  // Max throughput
Loop: false

Thread Safety

The component is thread-safe:

  • Lifecycle operations protected by mutex
  • Metrics use atomic operations
  • Start/Stop can be called from any goroutine

Concurrency Patterns

The implementation uses standard Go concurrency patterns:

// Lifecycle mutex (separate from data mutex)
f.lifecycleMu.Lock()
defer f.lifecycleMu.Unlock()

// Graceful shutdown via channel
select {
case <-ctx.Done():
    return ctx.Err()
case <-f.shutdown:
    return nil
default:
}

// WaitGroup for goroutine tracking
f.wg.Add(1)
go f.readLoop(ctx)

Scanner Buffer Pool

Memory-efficient reading using pooled buffers:

scannerInitialBuffer = 1MB   // Initial allocation
scannerMaxBuffer     = 10MB  // Maximum line length

Buffers are pooled and reused across file reads, reducing GC pressure for high-throughput scenarios.

Testing

The package follows standard testing patterns:

go test -v ./input/file
go test -race ./input/file   # race detector

Limitations

Current version limitations:

  • No compression support (gzip, zstd)
  • No offset tracking (always starts from beginning)
  • No file watching (new files require restart)
  • No parallel file processing
  • Single output subject only

Example: Complete Configuration

{
  "ports": {
    "outputs": [
      {"name": "output", "type": "jetstream", "subject": "events.raw", "required": true}
    ]
  },
  "path": "/data/events/*.jsonl",
  "format": "jsonl",
  "interval": "10ms",
  "loop": false
}

See Also

Related packages:

Package file provides a file input component for reading JSONL files and publishing to NATS

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func CreateInput

func CreateInput(rawConfig json.RawMessage, deps component.Dependencies) (component.Discoverable, error)

CreateInput creates a file input component from a raw JSON configuration, following the service pattern.

func Register

func Register(registry *component.Registry) error

Register registers the file input component with the given registry

Types

type Config

type Config struct {
	Ports    *component.PortConfig `json:"ports" schema:"type:ports,description:Port configuration,category:basic"`
	Path     string                `json:"path" schema:"type:string,description:File path or glob pattern,required:true"`
	Format   string                `json:"format" schema:"type:string,description:File format (jsonl or json),default:jsonl"`
	Interval string                `json:"interval" schema:"type:string,description:Delay between lines,default:10ms"`
	Loop     bool                  `json:"loop" schema:"type:boolean,description:Loop file when complete,default:false"`
}

Config holds configuration for the file input component.

func DefaultConfig

func DefaultConfig() Config

DefaultConfig returns the default configuration.

func (*Config) Validate

func (c *Config) Validate() error

Validate implements the component.Validatable interface.
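A sketch of the checks Validate might perform, based on the error list in this document (empty path, invalid format). `fileConfig` and its methods are hypothetical mirrors of `Config`, `DefaultConfig`, and `Validate`; rejecting unparseable or negative intervals is an added assumption.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// fileConfig is a hypothetical mirror of Config without the Ports field.
type fileConfig struct {
	Path     string
	Format   string
	Interval string
	Loop     bool
}

// defaultFileConfig applies the documented defaults (jsonl, 10ms, no loop).
func defaultFileConfig() fileConfig {
	return fileConfig{Format: "jsonl", Interval: "10ms"}
}

// validate rejects an empty path, an unknown format, or a bad interval.
func (c *fileConfig) validate() error {
	if c.Path == "" {
		return errors.New("path is required")
	}
	switch c.Format {
	case "jsonl", "json":
	default:
		return fmt.Errorf("unsupported format %q (want jsonl or json)", c.Format)
	}
	d, err := time.ParseDuration(c.Interval) // assumed Go duration syntax
	if err != nil {
		return fmt.Errorf("invalid interval %q: %w", c.Interval, err)
	}
	if d < 0 {
		return fmt.Errorf("interval %q must not be negative", c.Interval)
	}
	return nil
}

func main() {
	cfg := defaultFileConfig()
	fmt.Println(cfg.validate())
	cfg.Path = "/data/events/*.jsonl"
	fmt.Println(cfg.validate())
	cfg.Format = "csv"
	fmt.Println(cfg.validate())
}
```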

type Input

type Input struct {
	// contains filtered or unexported fields
}

Input implements a file reader that publishes lines to NATS

func NewInput

func NewInput(deps InputDeps) *Input

NewInput creates a new file input component

func (*Input) ConfigSchema

func (f *Input) ConfigSchema() component.ConfigSchema

ConfigSchema returns the configuration schema

func (*Input) DataFlow

func (f *Input) DataFlow() component.FlowMetrics

DataFlow returns the current data flow metrics

func (*Input) Health

func (f *Input) Health() component.HealthStatus

Health returns the current health status

func (*Input) Initialize

func (f *Input) Initialize() error

Initialize prepares the file input component

func (*Input) InputPorts

func (f *Input) InputPorts() []component.Port

InputPorts returns the input ports for this component

func (*Input) Meta

func (f *Input) Meta() component.Metadata

Meta returns the component metadata

func (*Input) OutputPorts

func (f *Input) OutputPorts() []component.Port

OutputPorts returns the output ports for this component

func (*Input) Start

func (f *Input) Start(ctx context.Context) error

Start begins reading files and publishing to NATS

func (*Input) Stop

func (f *Input) Stop(timeout time.Duration) error

Stop gracefully shuts down the file input

type InputDeps

type InputDeps struct {
	Name            string
	Config          Config
	NATSClient      *natsclient.Client
	MetricsRegistry *metric.MetricsRegistry
	Logger          *slog.Logger
}

InputDeps holds runtime dependencies for the file input component.

type Metrics

type Metrics struct {
	// contains filtered or unexported fields
}

Metrics holds Prometheus metrics for the file input component.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL