file

package
Version: v1.0.0-alpha.9
Published: Mar 6, 2026 License: MIT Imports: 16 Imported by: 0

README

File Output Component

Writes NATS messages to disk files in multiple formats with buffered I/O and automatic flushing.

Purpose

The file output component persists NATS message streams to disk files in JSON, JSONL, or raw formats. It provides buffered writes with configurable batch sizes for performance, supporting both NATS core and JetStream subscriptions. Files can be opened in append or overwrite mode for logging, archiving, and data export workflows.

Configuration

component:
  type: file
  config:
    ports:
      inputs:
        - name: nats_input
          type: nats
          subject: "events.>"
          required: true
    directory: "/var/log/semstreams"
    file_prefix: "events"
    format: "jsonl"
    append: true
    buffer_size: 100

Configuration Fields

Field          Type    Default         Description
ports.inputs   array   (required)      NATS or JetStream input port definitions
directory      string  /tmp/streamkit  Output directory path (created if missing)
file_prefix    string  output          Filename prefix before the extension
format         enum    jsonl           Output format: json, jsonl, or raw
append         bool    true            Append to an existing file instead of overwriting it
buffer_size    int     100             Number of messages to buffer before flushing
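
The defaults in the table can be pictured as a starting value that a partial configuration is overlaid on. The following is a minimal sketch only: fileConfig, defaults, and parseConfig are hypothetical local names, not the package's own Config, DefaultConfig, or Validate, and the validation rules shown (known format, positive buffer size) are assumptions.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// fileConfig is a hypothetical local mirror of the documented fields.
// It is not the package's own Config type.
type fileConfig struct {
	Directory  string `json:"directory"`
	FilePrefix string `json:"file_prefix"`
	Format     string `json:"format"`
	Append     bool   `json:"append"`
	BufferSize int    `json:"buffer_size"`
}

// defaults returns the default values listed in the field table.
func defaults() fileConfig {
	return fileConfig{
		Directory:  "/tmp/streamkit",
		FilePrefix: "output",
		Format:     "jsonl",
		Append:     true,
		BufferSize: 100,
	}
}

// parseConfig starts from the defaults and overlays any fields present in raw.
// json.Unmarshal leaves fields that are absent from the input untouched.
func parseConfig(raw []byte) (fileConfig, error) {
	cfg := defaults()
	if err := json.Unmarshal(raw, &cfg); err != nil {
		return fileConfig{}, fmt.Errorf("parse config: %w", err)
	}
	// Assumed validation rules, for illustration only.
	switch cfg.Format {
	case "json", "jsonl", "raw":
	default:
		return fileConfig{}, fmt.Errorf("unsupported format %q", cfg.Format)
	}
	if cfg.BufferSize <= 0 {
		return fileConfig{}, fmt.Errorf("buffer_size must be positive, got %d", cfg.BufferSize)
	}
	return cfg, nil
}

func main() {
	cfg, err := parseConfig([]byte(`{"directory": "/var/log/semstreams", "buffer_size": 200}`))
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Printf("%+v\n", cfg)
}
```

Fields omitted from the input keep their defaults, so a configuration only needs to name the values it changes.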

Input/Output Ports

Input Ports

The component accepts both NATS core and JetStream subscriptions:

NATS Core Subscription:

inputs:
  - name: events_input
    type: nats
    subject: "app.events.>"
    required: true

JetStream Consumer:

inputs:
  - name: stream_input
    type: jetstream
    subject: "orders.*"
    stream_name: "ORDERS"
    required: true

Multiple input ports can be configured. All inputs write to the same output file.

Output Ports

File output components have no NATS output ports. Data flows terminate at the filesystem.

File Formats

JSON Lines (jsonl)

One JSON object per line, optimized for streaming and log processing:

{"timestamp": "2026-02-11T10:00:00Z", "level": "info", "message": "user login"}
{"timestamp": "2026-02-11T10:00:01Z", "level": "warn", "message": "rate limit"}

Best for: Log aggregation, event streams, line-oriented tools (grep, awk)

Pretty JSON (json)

Indented JSON with newlines, human-readable:

{
  "timestamp": "2026-02-11T10:00:00Z",
  "level": "info",
  "message": "user login"
}
{
  "timestamp": "2026-02-11T10:00:01Z",
  "level": "warn",
  "message": "rate limit"
}

Best for: Manual inspection, debugging, configuration export

Raw (raw)

Binary message data written directly without formatting:

<raw bytes><raw bytes><raw bytes>

Best for: Binary protocols, compact storage, non-JSON data

File Naming and Rotation

File Naming

Files are named using the pattern: {file_prefix}.{format}

Examples:

  • directory: /var/log, file_prefix: events, format: jsonl → /var/log/events.jsonl
  • directory: /data/export, file_prefix: sensors, format: json → /data/export/sensors.json

Rotation Strategy

The component does not include built-in file rotation. Use external tools:

Using logrotate:

/var/log/semstreams/*.jsonl {
    daily
    rotate 7
    compress
    delaycompress
    missingok
    notifempty
    copytruncate
}

Manual rotation: Stop the component, move the file, then restart the component. Because the original path no longer exists, the component creates a new file on restart, even with append: true.

Buffering and Flushing

The component buffers messages in memory before writing to disk:

  1. Messages accumulate in buffer (size: buffer_size)
  2. Flush triggers when buffer is full or every 1 second (automatic)
  3. Graceful shutdown flushes remaining buffer before closing file

Performance considerations:

  • Larger buffer_size improves throughput but increases memory usage
  • Smaller buffer_size reduces latency but increases disk I/O
  • Automatic 1-second flush prevents unbounded latency for low-volume streams

Example Use Cases

Application Event Logging

component:
  type: file
  config:
    ports:
      inputs:
        - name: app_logs
          type: nats
          subject: "app.logs.>"
    directory: "/var/log/app"
    file_prefix: "events"
    format: "jsonl"
    append: true
    buffer_size: 200

Captures all application log events to a JSON Lines file with 200-message buffering.

Sensor Data Export

component:
  type: file
  config:
    ports:
      inputs:
        - name: sensors
          type: jetstream
          subject: "sensors.temperature.*"
          stream_name: "SENSORS"
    directory: "/data/export"
    file_prefix: "temperature-readings"
    format: "json"
    append: false
    buffer_size: 50

Exports temperature sensor data to human-readable JSON, overwriting file on each run.

Binary Protocol Archive

component:
  type: file
  config:
    ports:
      inputs:
        - name: protocol_stream
          type: nats
          subject: "protocol.frames"
    directory: "/archive"
    file_prefix: "protocol-capture"
    format: "raw"
    append: true
    buffer_size: 500

Archives raw binary protocol frames to disk for later analysis or replay.

Multi-Subject Log Aggregation

component:
  type: file
  config:
    ports:
      inputs:
        - name: errors
          type: nats
          subject: "logs.error"
        - name: warnings
          type: nats
          subject: "logs.warn"
        - name: info
          type: nats
          subject: "logs.info"
    directory: "/var/log/aggregated"
    file_prefix: "all-logs"
    format: "jsonl"
    append: true
    buffer_size: 300

Aggregates multiple log levels into a single file, with messages written in the order they arrive.

Documentation

Overview

Package file provides a file output component for writing messages to files.

The file output component writes incoming NATS messages to files on disk, with support for multiple file formats and automatic flushing. File rotation is not built in and must be handled externally. The component implements the StreamKit component interfaces for lifecycle management and observability.

Quick Start

Write messages to a JSON lines file:

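config := file.Config{
    Ports: &component.PortConfig{
        Inputs: []component.PortDefinition{
            {Name: "input", Type: "nats", Subject: "data.>", Required: true},
        },
    },
    Directory:  "/var/log/streamkit", // output directory
    FilePrefix: "messages",           // written as messages.jsonl
    Format:     "jsonl",
    Append:     true,
    BufferSize: 100,
}

// NewOutput takes the configuration as raw JSON.
rawConfig, err := json.Marshal(config)
if err != nil {
    log.Fatalf("marshal config: %v", err)
}
// deps is the component.Dependencies value supplied by the host application.
output, err := file.NewOutput(rawConfig, deps)
if err != nil {
    log.Fatalf("create file output: %v", err)
}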

Configuration

The Config struct controls file writing behavior:

  • Directory: Output directory (default: /tmp/streamkit, created if missing)
  • FilePrefix: Filename prefix before the extension (default: output)
  • Format: Output format ("json", "jsonl", "raw"; default: "jsonl")
  • Append: Append to existing file vs overwrite (default: true)
  • BufferSize: Messages to buffer before flushing (default: 100)

File Formats

**JSON Lines** (recommended for structured data):

Format: "jsonl"

// Each message written as single JSON line
{"timestamp": "2024-01-01T00:00:00Z", "value": 42}
{"timestamp": "2024-01-01T00:00:01Z", "value": 43}

**Raw** (binary or text data):

Format: "raw"

// Message bytes written directly to file
// One message per line

**JSON** (indented, human-readable):

Format: "json"

// Each message written as an indented JSON object
{
  "timestamp": "2024-01-01T00:00:00Z",
  "value": 42
}

Buffering and Flushing

The component uses buffered writes, flushing when the buffer fills or on a fixed timer:

BufferSize: 100  // Flush after 100 buffered messages

Flushing behavior:

  1. Flush when BufferSize messages have accumulated
  2. Automatic flush every 1 second
  3. Flush on Stop() for graceful shutdown
  4. OS-level buffering may add additional delay

Message Flow

NATS Subject → Message Handler → File Buffer → Periodic Flush → Disk

Lifecycle Management

Proper file handle management with graceful shutdown:

// Start writing
output.Start(ctx)

// Graceful shutdown with flush
output.Stop(5 * time.Second)

During shutdown:

  1. Stop accepting new messages
  2. Flush buffered data to disk
  3. Close file handle
  4. Wait for flush goroutine to complete

Observability

The component implements component.Discoverable for monitoring:

meta := output.Meta()
// Name: file-output
// Type: output
// Description: File writer output

health := output.Health()
// Healthy: true if file writable
// ErrorCount: Write errors
// Uptime: Time since Start()

dataFlow := output.DataFlow()
// MessagesPerSecond: Write rate
// BytesPerSecond: Byte throughput
// ErrorRate: Error percentage

Performance Characteristics

  • Throughput: 10,000+ messages/second (buffered)
  • Memory: O(buffer size) + per-message allocations
  • Latency: up to the 1-second automatic flush interval
  • Disk I/O: Batched via OS buffer

Error Handling

The component uses streamkit/errors for consistent error classification:

  • Invalid config: errs.WrapInvalid (bad configuration)
  • File errors: errs.WrapTransient (disk full, permissions)
  • NATS errors: errs.WrapTransient (connection issues)

Write errors are logged and counted but don't stop the component.

Common Use Cases

**Application Logging:**

Directory: "/var/log/app"
FilePrefix: "events"
Format: "jsonl"
BufferSize: 200

**Data Export:**

Directory: "/data/export"
FilePrefix: "sensors"
Format: "json"
Append: false

**Archive Stream:**

Directory: "/archive"
FilePrefix: "raw-stream"
Format: "raw"
Append: true

Thread Safety

The component is fully thread-safe:

  • File writes protected by sync.Mutex
  • Start/Stop can be called from any goroutine
  • Metrics updates use atomic operations

File Rotation

Current version does not include built-in file rotation. Recommended approaches:

  • Use logrotate or similar system tool
  • Stop component, rotate file, restart component
  • External rotation with Append: true works safely

Testing

The package includes test coverage:

  • Unit tests: Config validation, format handling
  • File I/O tests: Real filesystem writes

Run tests:

go test ./output/file -v

Limitations

Current version limitations:

  • No built-in file rotation
  • No compression (gzip, etc.)
  • Output formats limited to json, jsonl, and raw
  • Single file per component instance

Example: Complete Configuration

{
  "ports": {
    "inputs": [
      {"name": "input", "type": "nats", "subject": "logs.>", "required": true}
    ]
  },
  "directory": "/var/log/streamkit",
  "file_prefix": "messages",
  "format": "jsonl",
  "append": true,
  "buffer_size": 100
}

Index

Functions

func NewOutput

func NewOutput(rawConfig json.RawMessage, deps component.Dependencies) (component.Discoverable, error)

NewOutput creates a new file output from configuration

func Register

func Register(registry *component.Registry) error

Register registers the file output component with the given registry

Types

type Config

type Config struct {
	Ports      *component.PortConfig `json:"ports"       schema:"type:ports,description:Port configuration,category:basic"`
	Directory  string                `json:"directory"   schema:"type:string,description:Output directory,category:basic"`
	FilePrefix string                `json:"file_prefix" schema:"type:string,description:Prefix,category:basic"`
	Format     string                `json:"format"      schema:"type:enum,enum:json|jsonl|raw,category:basic"`
	Append     bool                  `json:"append"      schema:"type:bool,description:Append mode,category:advanced"`
	BufferSize int                   `json:"buffer_size" schema:"type:int,description:Buffer size,category:advanced"`
}

Config holds configuration for file output component

func DefaultConfig

func DefaultConfig() Config

DefaultConfig returns default configuration for file output

func (*Config) Validate

func (c *Config) Validate() error

Validate checks the configuration for errors

type Output

type Output struct {
	// contains filtered or unexported fields
}

Output implements file writing for NATS messages

func (*Output) ConfigSchema

func (f *Output) ConfigSchema() component.ConfigSchema

ConfigSchema returns the configuration schema

func (*Output) DataFlow

func (f *Output) DataFlow() component.FlowMetrics

DataFlow returns current data flow metrics

func (*Output) Health

func (f *Output) Health() component.HealthStatus

Health returns the current health status

func (*Output) Initialize

func (f *Output) Initialize() error

Initialize prepares the output (creates directory)

func (*Output) InputPorts

func (f *Output) InputPorts() []component.Port

InputPorts returns configured input port definitions

func (*Output) Meta

func (f *Output) Meta() component.Metadata

Meta returns component metadata

func (*Output) OutputPorts

func (f *Output) OutputPorts() []component.Port

OutputPorts returns configured output port definitions (none for file output)

func (*Output) Start

func (f *Output) Start(ctx context.Context) error

Start begins writing messages to files

func (*Output) Stop

func (f *Output) Stop(timeout time.Duration) error

Stop gracefully stops the output
