consolestream

package module
v0.3.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 23, 2025 License: Apache-2.0 Imports: 16 Imported by: 0

README

Console Stream

A Go library for executing processes and streaming their output in real-time using Go 1.23+ iterators.

Features

  • Real-time streaming: Process output delivered at configurable intervals or buffer limits
  • Unified API: Single process type supports both pipe and PTY modes
  • Event-driven: Process lifecycle events, output data, and heartbeat monitoring
  • Production ready: Context cancellation, error handling, and resource cleanup

Quick Start

package main

import (
    "context"
    "fmt"

    consolestream "github.com/wolfeidau/console-stream"
)

func main() {
    // Create and execute process
    process := consolestream.NewProcess("echo", []string{"Hello, World!"})

    ctx := context.Background()
    for event, err := range process.ExecuteAndStream(ctx) {
        if err != nil {
            fmt.Printf("Error: %v\n", err)
            break
        }

        switch e := event.Event.(type) {
        case *consolestream.OutputData:
            fmt.Printf("Output: %s", string(e.Data))
        case *consolestream.ProcessEnd:
            fmt.Printf("Process completed (Exit: %d)\n", e.ExitCode)
            return
        }
    }
}

API

Process Creation
// Default (pipe mode)
process := consolestream.NewProcess("command", []string{"arg1", "arg2"})

// PTY mode for interactive programs
process := consolestream.NewProcess("bash", []string{"-l"},
    consolestream.WithPTYMode())

// With configuration options
process := consolestream.NewProcess("npm", []string{"install"},
    consolestream.WithPTYMode(),
    consolestream.WithPTYSize(pty.Winsize{Rows: 24, Cols: 80}),
    consolestream.WithEnvVar("NODE_ENV", "production"),
    consolestream.WithFlushInterval(500*time.Millisecond))
Process Options
Option Description
WithPipeMode() Use standard pipes (default)
WithPTYMode() Use pseudo-terminal for interactive programs
WithPTYSize(size) Set terminal dimensions for PTY
WithCancellor(c) Custom process cancellation
WithEnvVar(k, v) Set environment variable
WithFlushInterval(d) Output flush frequency
WithMaxBufferSize(s) Buffer size limit
Event Types
Event Description
ProcessStart Process started with PID
OutputData Process output (combined stdout/stderr)
ProcessEnd Process completed with exit code
ProcessError Process execution error
HeartbeatEvent Keep-alive when process is silent
TerminalResizeEvent Terminal size change (PTY only)

Use Cases

Pipe Mode - Use for:

  • Build tools, tests, data processing
  • Simple command execution
  • When you need performance

PTY Mode - Use for:

  • Interactive applications (shells, editors)
  • Programs with progress bars or colors
  • Terminal session recording

Asciicast Recording

Record terminal sessions in asciicast v3 format:

// Record PTY session
process := consolestream.NewProcess("bash", []string{"-l"},
    consolestream.WithPTYMode())

metadata := consolestream.AscicastV3Metadata{
    Term: consolestream.TermInfo{Cols: 80, Rows: 24},
    Command: "bash -l",
}

events := process.ExecuteAndStream(ctx)
asciicast := consolestream.ToAscicastV3(events, metadata)

// Write to file
file, _ := os.Create("session.cast")
defer file.Close()

for line, err := range asciicast {
    if err != nil { break }
    data, _ := json.Marshal(line)
    file.Write(append(data, '\n'))
}

Command Line Tool

Build and use the runner tool:

# Build
go build -o runner ./cmd/runner

# Execute with YAML config
./runner exec --config task.yaml --format json --output results.json

Example config:

command: "docker"
args: ["build", "-t", "myapp", "."]
process_type: "pty"
timeout: "10m"
env:
  DOCKER_BUILDKIT: "1"

Installation

go get github.com/wolfeidau/console-stream

Requirements: Go 1.25+, Unix-like system

License

Apache License, Version 2.0 - Copyright Mark Wolfe

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func IsHeartbeatEvent

func IsHeartbeatEvent(event StreamEvent) bool

func IsOutputEvent added in v0.3.0

func IsOutputEvent(event StreamEvent) bool

func IsProcessEndEvent

func IsProcessEndEvent(event StreamEvent) bool

func IsProcessErrorEvent

func IsProcessErrorEvent(event StreamEvent) bool

func IsProcessLifecycleEvent

func IsProcessLifecycleEvent(event StreamEvent) bool

func IsProcessStartEvent

func IsProcessStartEvent(event StreamEvent) bool

func IsTerminalResizeEvent

func IsTerminalResizeEvent(event StreamEvent) bool

func ToAscicastV3

func ToAscicastV3(events iter.Seq2[Event, error], metadata AscicastV3Metadata) iter.Seq2[AscicastV3Line, error]

ToAscicastV3 transforms a stream of console-stream events into asciicast v3 format

func WriteAscicastV3

func WriteAscicastV3(events iter.Seq2[Event, error], metadata AscicastV3Metadata, writeFunc func([]byte) error) error

WriteAscicastV3 is a convenience function to write asciicast v3 to any writer

Types

type AscicastV3Event

type AscicastV3Event [3]any

AscicastV3Event represents an event line in asciicast v3 format: [interval, code, data]

func NewExitEvent

func NewExitEvent(interval float64, exitCode int) AscicastV3Event

NewExitEvent creates an asciicast exit event

func NewOutputEvent

func NewOutputEvent(interval float64, data string) AscicastV3Event

NewOutputEvent creates an asciicast output event

func NewResizeEvent

func NewResizeEvent(interval float64, cols, rows uint16) AscicastV3Event

NewResizeEvent creates an asciicast resize event

func (AscicastV3Event) MarshalJSON

func (e AscicastV3Event) MarshalJSON() ([]byte, error)

MarshalJSON implements AscicastV3Line interface for events

type AscicastV3Header

type AscicastV3Header struct {
	Version int `json:"version"`
	AscicastV3Metadata
}

AscicastV3Header represents the first line of an asciicast v3 file

func (AscicastV3Header) MarshalJSON

func (h AscicastV3Header) MarshalJSON() ([]byte, error)

MarshalJSON implements AscicastV3Line interface for headers

type AscicastV3Line

type AscicastV3Line interface {
	MarshalJSON() ([]byte, error)
}

AscicastV3Line represents either a header or event line in asciicast v3 format

type AscicastV3Metadata

type AscicastV3Metadata struct {
	Term      TermInfo          `json:"term"`
	Timestamp *int64            `json:"timestamp,omitempty"`
	Command   string            `json:"command,omitempty"`
	Title     string            `json:"title,omitempty"`
	Env       map[string]string `json:"env,omitempty"`
	Tags      []string          `json:"tags,omitempty"`
}

AscicastV3Metadata contains the metadata for the asciicast header

type BufferWriter added in v0.2.0

type BufferWriter struct {
	// contains filtered or unexported fields
}

BufferWriter wraps buffer operations and implements io.WriteCloser This can be used by both PipeProcess and PTYProcess for consistent buffer management

func NewBufferWriter added in v0.2.0

func NewBufferWriter(flushChan chan<- struct{}, maxBufferSize int) *BufferWriter

func (*BufferWriter) Close added in v0.2.0

func (w *BufferWriter) Close() error

func (*BufferWriter) FlushAndClear added in v0.2.0

func (w *BufferWriter) FlushAndClear() []byte

FlushAndClear returns the current buffer contents and clears it

func (*BufferWriter) Len added in v0.2.0

func (w *BufferWriter) Len() int

Len returns the current buffer length (thread-safe)

func (*BufferWriter) Write added in v0.2.0

func (w *BufferWriter) Write(data []byte) (int, error)

type Cancellor

type Cancellor interface {
	Cancel(ctx context.Context, pid int) error
}

Cancellor defines the interface for cancelling processes

type Event

type Event struct {
	Timestamp time.Time
	Event     StreamEvent
}

func (Event) EventType

func (e Event) EventType() EventType

EventType returns the type of the event contained in this Event

func (Event) String

func (e Event) String() string

String returns a human-readable representation of the Event

type EventType

type EventType int

EventType defines the different kinds of events

const (
	ProcessStartEvent EventType = iota
	ProcessEndEvent
	ProcessErrorEvent
	HeartbeatEventType
	OutputEvent
	TerminalResizeEventType
)

func (EventType) String

func (e EventType) String() string

type HeartbeatEvent

type HeartbeatEvent struct {
	ProcessAlive bool
	ElapsedTime  time.Duration
}

HeartbeatEvent represents a keep-alive signal when no output is generated

func (*HeartbeatEvent) String

func (h *HeartbeatEvent) String() string

func (*HeartbeatEvent) Type

func (h *HeartbeatEvent) Type() EventType

type LocalCancellor

type LocalCancellor struct {
	// contains filtered or unexported fields
}

LocalCancellor implements process cancellation for local processes

func NewLocalCancellor

func NewLocalCancellor(terminateTimeout time.Duration) *LocalCancellor

func (*LocalCancellor) Cancel

func (lc *LocalCancellor) Cancel(ctx context.Context, pid int) error

type OutputData added in v0.3.0

type OutputData struct {
	Data []byte
}

OutputData represents output data from the process (combined stdout/stderr)

func (*OutputData) String added in v0.3.0

func (o *OutputData) String() string

func (*OutputData) Type added in v0.3.0

func (o *OutputData) Type() EventType

type PipeProcessFailedError

type PipeProcessFailedError struct {
	Cmd      string
	ExitCode int
}

func (PipeProcessFailedError) Error

func (e PipeProcessFailedError) Error() string

type PipeProcessKilledError

type PipeProcessKilledError struct {
	Cmd    string
	Signal string
}

func (PipeProcessKilledError) Error

func (e PipeProcessKilledError) Error() string

type Process added in v0.3.0

type Process struct {
	// contains filtered or unexported fields
}

Process represents a unified process that can run in either PTY or pipe mode

func NewProcess added in v0.3.0

func NewProcess(cmd string, args []string, opts ...ProcessOption) *Process

NewProcess creates a new unified process with functional options

func (*Process) ExecuteAndStream added in v0.3.0

func (p *Process) ExecuteAndStream(ctx context.Context) iter.Seq2[Event, error]

ExecuteAndStream starts a process and returns an iterator over Event objects

func (*Process) GetStats added in v0.3.0

func (p *Process) GetStats() ProcessStatsSnapshot

GetStats returns a copy of the current process statistics

type ProcessEnd

type ProcessEnd struct {
	ExitCode int
	Duration time.Duration
	Success  bool
}

ProcessEnd represents the completion of process execution

func (*ProcessEnd) String

func (p *ProcessEnd) String() string

func (*ProcessEnd) Type

func (p *ProcessEnd) Type() EventType

type ProcessError

type ProcessError struct {
	Error   error
	Message string
}

ProcessError represents an error during process execution

func (*ProcessError) String

func (p *ProcessError) String() string

func (*ProcessError) Type

func (p *ProcessError) Type() EventType

type ProcessInfo added in v0.2.1

type ProcessInfo struct {
	Command string
	Args    []string
	PID     int
}

ProcessInfo contains process information for metrics attributes

type ProcessMode added in v0.3.0

type ProcessMode int

processConfig holds configuration options for processes ProcessMode defines whether to use PTY or pipe mode for process execution

const (
	PipeMode ProcessMode = iota
	PTYMode
)

func (ProcessMode) String added in v0.3.0

func (m ProcessMode) String() string

type ProcessOption

type ProcessOption func(*processConfig)

ProcessOption represents a functional option for configuring processes

func WithCancellor

func WithCancellor(cancellor Cancellor) ProcessOption

WithCancellor sets a custom cancellor for the process

func WithEnv

func WithEnv(env []string) ProcessOption

WithEnv sets environment variables as a slice of "KEY=value" strings

func WithEnvMap

func WithEnvMap(envMap map[string]string) ProcessOption

WithEnvMap sets environment variables from a map for convenience

func WithEnvVar

func WithEnvVar(key, value string) ProcessOption

WithEnvVar sets a single environment variable

func WithFlushInterval

func WithFlushInterval(interval time.Duration) ProcessOption

WithFlushInterval sets how often to flush output buffers

func WithMaxBufferSize

func WithMaxBufferSize(size int) ProcessOption

WithMaxBufferSize sets the maximum buffer size before forcing a flush

func WithMeter added in v0.2.1

func WithMeter(meter any) ProcessOption

WithMeter sets an OpenTelemetry meter for metrics collection

func WithMetricsInterval added in v0.2.1

func WithMetricsInterval(interval time.Duration) ProcessOption

WithMetricsInterval sets how often to aggregate metrics into buckets

func WithPTYMode added in v0.3.0

func WithPTYMode() ProcessOption

WithPTYMode sets the process to use a pseudo-terminal

func WithPTYSize

func WithPTYSize(size any) ProcessOption

WithPTYSize sets a custom terminal size for PTY processes

func WithPipeMode added in v0.3.0

func WithPipeMode() ProcessOption

WithPipeMode sets the process to use standard pipes (stdout/stderr)

type ProcessStart

type ProcessStart struct {
	PID     int
	Command string
	Args    []string
}

ProcessStart represents the beginning of process execution

func (*ProcessStart) String

func (p *ProcessStart) String() string

func (*ProcessStart) Type

func (p *ProcessStart) Type() EventType

type ProcessStartError

type ProcessStartError struct {
	Cmd string
	Err error
}

Error types for different process failure scenarios

func (ProcessStartError) Error

func (e ProcessStartError) Error() string

type ProcessStats added in v0.3.0

type ProcessStats struct {
	// contains filtered or unexported fields
}

ProcessStats manages comprehensive metrics during process execution

func NewProcessStats added in v0.3.0

func NewProcessStats(meter metric.Meter, interval time.Duration, processInfo ProcessInfo) (*ProcessStats, error)

NewProcessStats creates a new ProcessStats instance with OpenTelemetry metrics

func (*ProcessStats) GetSnapshot added in v0.3.0

func (s *ProcessStats) GetSnapshot() ProcessStatsSnapshot

GetSnapshot returns a thread-safe snapshot of the current statistics

func (*ProcessStats) RecordError added in v0.3.0

func (s *ProcessStats) RecordError(ctx context.Context, eventType string)

RecordError increments the error count

func (*ProcessStats) RecordEvent added in v0.3.0

func (s *ProcessStats) RecordEvent(ctx context.Context, eventType string)

RecordEvent records a general event

func (*ProcessStats) RecordOutput added in v0.3.0

func (s *ProcessStats) RecordOutput(ctx context.Context, dataSize int64)

RecordOutput records output data and metrics

func (*ProcessStats) SetExitCode added in v0.3.0

func (s *ProcessStats) SetExitCode(code int)

SetExitCode sets the process exit code

func (*ProcessStats) Start added in v0.3.0

func (s *ProcessStats) Start(ctx context.Context)

Start initializes the stats and records process start

func (*ProcessStats) Stop added in v0.3.0

func (s *ProcessStats) Stop(ctx context.Context)

Stop finalizes the stats and records process stop

type ProcessStatsSnapshot added in v0.3.0

type ProcessStatsSnapshot struct {
	// Basic execution stats
	StartTime  time.Time
	EventCount int
	ErrorCount int
	ExitCode   *int
	Duration   time.Duration
	// contains filtered or unexported fields
}

ProcessStatsSnapshot represents a point-in-time view of process statistics

type StreamEvent

type StreamEvent interface {
	Type() EventType
	String() string
}

StreamEvent represents different types of events that can occur during process execution

type TermInfo

type TermInfo struct {
	Cols int `json:"cols"`
	Rows int `json:"rows"`
}

TermInfo represents terminal dimensions for asciicast header

type TerminalResizeEvent

type TerminalResizeEvent struct {
	Rows uint16
	Cols uint16
	X    uint16 // Width in pixels
	Y    uint16 // Height in pixels
}

TerminalResizeEvent represents a terminal window size change

func (*TerminalResizeEvent) String

func (t *TerminalResizeEvent) String() string

func (*TerminalResizeEvent) Type

func (t *TerminalResizeEvent) Type() EventType

Directories

Path Synopsis
cmd
runner command
Package main provides a pretty slog handler for human-readable colored terminal output.
Package main provides a pretty slog handler for human-readable colored terminal output.
tester command
example
asciicast command
buffering command
burst command
logging command
pty command
simple command
stream command

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL