Documentation
¶
Overview ¶
Package file provides a file input component for reading JSONL/JSON files and publishing to NATS.
Overview ¶
The file input component reads JSON Lines (JSONL) or JSON files and publishes each line/record as a message to NATS subjects. It supports glob patterns for reading multiple files, configurable delays between messages for rate control, and optional continuous looping for replay scenarios. It implements the StreamKit component interfaces for lifecycle management and observability.
Quick Start ¶
Read JSONL files and publish to NATS:
config := file.Config{
Ports: &component.PortConfig{
Outputs: []component.PortDefinition{
{Name: "output", Type: "nats", Subject: "events.ingest", Required: true},
},
},
Path: "/data/events/*.jsonl",
Format: "jsonl",
Interval: "10ms",
Loop: false,
}
rawConfig, _ := json.Marshal(config)
input, err := file.CreateInput(rawConfig, deps)
Configuration ¶
The Config struct controls file reading behavior:
- Path: File path or glob pattern (required)
- Format: File format - "jsonl" or "json" (default: "jsonl")
- Interval: Delay between publishing lines (default: "10ms")
- Loop: Continuously re-read files when complete (default: false)
- Ports: Output port configuration for NATS publishing
File Formats ¶
**JSONL (JSON Lines):**
One JSON object per line, no outer array:
{"id": "1", "type": "event", "data": "..."}
{"id": "2", "type": "event", "data": "..."}
Each line is validated as valid JSON before publishing. Invalid lines are logged and skipped without stopping processing.
**JSON:**
Standard JSON format (array or single object). For arrays, each element is published as a separate message.
Glob Patterns ¶
The Path field supports standard glob patterns:
// Single file Path: "/data/events.jsonl" // All .jsonl files in directory Path: "/data/*.jsonl" // Recursive pattern (one level) Path: "/data/2024-*/*.jsonl"
Files matching the pattern are processed in filesystem order. If the pattern matches no files at Initialize time, an error is returned.
Rate Control ¶
The Interval setting controls publishing rate to prevent overwhelming downstream consumers:
Interval: "10ms" // 100 messages/second max Interval: "1ms" // 1000 messages/second max Interval: "0" // No delay, maximum throughput
Typical use cases:
- Replay scenarios: Match original event timing
- Load testing: Control ingestion rate
- Gentle startup: Avoid thundering herd on restart
Loop Mode ¶
When Loop is true, the component continuously re-reads files after completion:
Loop: true // Process all files // Wait 1 second // Process all files again // Repeat until stopped
Useful for:
- Continuous test data generation
- Simulation scenarios
- Development/debugging
NATS Publishing ¶
The component supports both core NATS and JetStream publishing:
**Core NATS:**
Ports:
Outputs:
- Type: "nats"
Subject: "events.raw"
**JetStream:**
Ports:
Outputs:
- Type: "jetstream"
Subject: "events.raw"
JetStream publishing uses acknowledgments and ensures message durability.
Lifecycle Management ¶
Proper component lifecycle with graceful shutdown:
// Initialize (validate path, check files exist) input.Initialize() // Start reading and publishing input.Start(ctx) // Graceful shutdown input.Stop(5 * time.Second)
During shutdown:
- Signal shutdown via channel
- Wait for current file processing to complete
- Close all resources
Observability ¶
The component implements component.Discoverable for monitoring:
meta := input.Meta()
// Name: file-input-{filename}
// Type: input
// Description: File input reading from {path}
health := input.Health()
// Healthy: true if component running
// ErrorCount: Parse/publish errors
// Uptime: Time since Start()
dataFlow := input.DataFlow()
// MessagesPerSecond: Publishing rate
// BytesPerSecond: Byte throughput
// ErrorRate: Error percentage
Prometheus metrics:
- file_input_lines_read_total: Total lines read from files
- file_input_lines_published_total: Lines successfully published
- file_input_bytes_read_total: Total bytes read
- file_input_parse_errors_total: JSON parse failures
- file_input_files_processed_total: Files completely processed
Performance Characteristics ¶
- Throughput: 10,000+ lines/second (without interval delay)
- Memory: O(1) per file (buffered line-by-line reading)
- Buffer: 1MB initial, 10MB max per line
- Context checks: Every 100 lines (prevents blocking on shutdown)
Error Handling ¶
The component uses streamkit/errors for consistent error classification:
- Invalid config: errs.WrapInvalid (empty path, invalid format)
- Missing files: errs.WrapInvalid (no files match glob)
- Parse errors: Logged and skipped (doesn't stop processing)
- Publish errors: errs.WrapTransient (NATS unavailable)
Individual line errors don't stop file processing. File errors are logged but don't stop glob pattern processing.
Common Use Cases ¶
**Data Replay:**
Path: "/archive/events-2024-01-15.jsonl" Format: "jsonl" Interval: "10ms" Loop: false
**Continuous Test Data:**
Path: "/testdata/sample-events.jsonl" Format: "jsonl" Interval: "100ms" Loop: true
**Batch Import:**
Path: "/import/*.jsonl" Format: "jsonl" Interval: "0" // Max throughput Loop: false
Thread Safety ¶
The component is thread-safe:
- Lifecycle operations protected by mutex
- Metrics use atomic operations
- Start/Stop can be called from any goroutine
Concurrency Patterns ¶
The implementation uses standard Go concurrency patterns:
// Lifecycle mutex (separate from data mutex)
f.lifecycleMu.Lock()
defer f.lifecycleMu.Unlock()
// Graceful shutdown via channel
select {
case <-ctx.Done():
return ctx.Err()
case <-f.shutdown:
return nil
default:
}
// WaitGroup for goroutine tracking
f.wg.Add(1)
go f.readLoop(ctx)
Scanner Buffer Pool ¶
Memory-efficient reading using pooled buffers:
scannerInitialBuffer = 1MB // Initial allocation scannerMaxBuffer = 10MB // Maximum line length
Buffers are pooled and reused across file reads, reducing GC pressure for high-throughput scenarios.
Testing ¶
The package follows standard testing patterns:
go test ./input/file -v go test ./input/file -race // Race detector
Limitations ¶
Current version limitations:
- No compression support (gzip, zstd)
- No offset tracking (always starts from beginning)
- No file watching (new files require restart)
- No parallel file processing
- Single output subject only
Example: Complete Configuration ¶
{
"ports": {
"outputs": [
{"name": "output", "type": "jetstream", "subject": "events.raw", "required": true}
]
},
"path": "/data/events/*.jsonl",
"format": "jsonl",
"interval": "10ms",
"loop": false
}
See Also ¶
Related packages:
- github.com/c360studio/semstreams/component: Component interfaces
- github.com/c360studio/semstreams/natsclient: NATS connection and publishing
- github.com/c360studio/semstreams/input/udp: UDP input component
- github.com/c360studio/semstreams/input/websocket: WebSocket input component
Package file provides a file input component for reading JSONL files and publishing to NATS
Index ¶
- func CreateInput(rawConfig json.RawMessage, deps component.Dependencies) (component.Discoverable, error)
- func Register(registry *component.Registry) error
- type Config
- type Input
- func (f *Input) ConfigSchema() component.ConfigSchema
- func (f *Input) DataFlow() component.FlowMetrics
- func (f *Input) Health() component.HealthStatus
- func (f *Input) Initialize() error
- func (f *Input) InputPorts() []component.Port
- func (f *Input) Meta() component.Metadata
- func (f *Input) OutputPorts() []component.Port
- func (f *Input) Start(ctx context.Context) error
- func (f *Input) Stop(timeout time.Duration) error
- type InputDeps
- type Metrics
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func CreateInput ¶
func CreateInput(rawConfig json.RawMessage, deps component.Dependencies) (component.Discoverable, error)
CreateInput creates a file input component following service pattern
Types ¶
type Config ¶
type Config struct {
Ports *component.PortConfig `json:"ports" schema:"type:ports,description:Port configuration,category:basic"`
Path string `json:"path" schema:"type:string,description:File path or glob pattern,required:true"`
Format string `json:"format" schema:"type:string,description:File format (jsonl or json),default:jsonl"`
Interval string `json:"interval" schema:"type:string,description:Delay between lines,default:10ms"`
Loop bool `json:"loop" schema:"type:boolean,description:Loop file when complete,default:false"`
}
Config holds configuration for file input component
type Input ¶
type Input struct {
// contains filtered or unexported fields
}
Input implements a file reader that publishes lines to NATS
func (*Input) ConfigSchema ¶
func (f *Input) ConfigSchema() component.ConfigSchema
ConfigSchema returns the configuration schema
func (*Input) DataFlow ¶
func (f *Input) DataFlow() component.FlowMetrics
DataFlow returns the current data flow metrics
func (*Input) Health ¶
func (f *Input) Health() component.HealthStatus
Health returns the current health status
func (*Input) Initialize ¶
Initialize prepares the file input component
func (*Input) InputPorts ¶
InputPorts returns the input ports for this component
func (*Input) OutputPorts ¶
OutputPorts returns the output ports for this component
type InputDeps ¶
type InputDeps struct {
Name string
Config Config
NATSClient *natsclient.Client
MetricsRegistry *metric.MetricsRegistry
Logger *slog.Logger
}
InputDeps holds runtime dependencies for file input component