consolestream

package module
v0.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 21, 2025 License: Apache-2.0 Imports: 13 Imported by: 0

README

Console Stream

Problem: You need to execute external processes and monitor their complete lifecycle in real-time without blocking, buffering issues, or complex event management.

Solution: A Go library that streams process events using Go 1.23+ iterators with automatic buffering, heartbeat monitoring, and comprehensive lifecycle tracking.

What You Get

  • Event-driven architecture: Process lifecycle events, output data, and heartbeat monitoring
  • Real-time streaming: Output delivered at configurable intervals (default 1 second) or buffer limits (default 10MB)
  • Keep-alive detection: Heartbeat events when processes are running but silent
  • Rich process lifecycle: Start/end events with PIDs, duration, exit codes
  • Clean iteration: Standard Go for range loops over all process events
  • Smart buffering: Automatic flushing prevents memory issues and ensures responsiveness
  • Robust cancellation: Context-aware with pluggable termination strategies
  • Production ready: Comprehensive error handling and resource cleanup

Functional Options API

Console Stream uses a functional options pattern for clean, extensible configuration:

// Default configuration (uses built-in 5-second timeout cancellor)
process := consolestream.NewPipeProcess("echo", []string{"hello"})

// Available functional options:
cancellor := consolestream.NewLocalCancellor(10 * time.Second)
process := consolestream.NewPipeProcess("echo", []string{"hello"},
    // Process control
    consolestream.WithCancellor(cancellor),                        // Custom cancellation

    // Environment variables
    consolestream.WithEnvVar("MY_VAR", "value"),                   // Single variable
    consolestream.WithEnv([]string{"PATH=/usr/bin", "HOME=/tmp"}), // Variable slice
    consolestream.WithEnvMap(map[string]string{                    // Variable map
        "API_KEY": "secret",
        "DEBUG":   "true",
    }),

    // Buffer configuration
    consolestream.WithFlushInterval(500*time.Millisecond), // How often to flush
    consolestream.WithMaxBufferSize(5*1024*1024))         // 5MB buffer limit

// PTY-specific options
size := pty.Winsize{Rows: 24, Cols: 80}
ptyProcess := consolestream.NewPTYProcess("bash", []string{"-l"},
    consolestream.WithCancellor(cancellor),
    consolestream.WithPTYSize(size),                // Terminal dimensions
    consolestream.WithEnvVar("TERM", "xterm-256color"))

Quick Start

package main

import (
    "context"
    "fmt"
    "time"

    consolestream "github.com/wolfeidau/console-stream"
)

func main() {
    // Create process with functional options for configuration
    cancellor := consolestream.NewLocalCancellor(5 * time.Second)
    process := consolestream.NewPipeProcess("ping", []string{"-c", "5", "google.com"},
        consolestream.WithCancellor(cancellor))

    // Stream events in real-time
    ctx := context.Background()
    for part, err := range process.ExecuteAndStream(ctx) {
        if err != nil {
            fmt.Printf("Stream error: %v\n", err)
            break
        }

        switch event := part.Event.(type) {
        case *consolestream.PipeOutputData:
            fmt.Printf("[%s] %s", event.Stream.String(), string(event.Data))
        case *consolestream.ProcessStart:
            fmt.Printf("Process started (PID: %d)\n", event.PID)
        case *consolestream.ProcessEnd:
            fmt.Printf("Process completed (Exit: %d, Duration: %v)\n", event.ExitCode, event.Duration)
        case *consolestream.HeartbeatEvent:
            fmt.Printf("Process running... (Elapsed: %v)\n", event.ElapsedTime)
        }
    }
}

PTY Support for Interactive Programs

When to Use PTY vs Regular Processes

Use PTY (NewPTYProcess) when:

  • Running interactive terminal applications (editors, shells)
  • Capturing programs with progress bars, colors, or ANSI escape sequences
  • Working with CLI tools that detect TTY presence and change behavior
  • Need to preserve terminal formatting and control sequences

Use Regular Process (NewPipeProcess) when:

  • Simple data pipeline between processes
  • Performance is critical (PTY has slight overhead)
  • You need separate stdout/stderr streams
PTY Examples
// Basic PTY usage - preserves colors and formatting
ptyProcess := consolestream.NewPTYProcess("npm", []string{"install"})
for part, err := range ptyProcess.ExecuteAndStream(ctx) {
    switch event := part.Event.(type) {
    case *consolestream.PTYOutputData:
        // Raw terminal output with ANSI escape sequences preserved
        handleTerminalOutput(event.Data, part.Timestamp)
    case *consolestream.ProcessEnd:
        logInstallCompletion(event.ExitCode, event.Duration)
    }
}

// PTY with specific terminal size and environment variables
size := pty.Winsize{Rows: 24, Cols: 80}
ptyProcess := consolestream.NewPTYProcess("top", []string{"-n", "1"},
    consolestream.WithPTYSize(size),
    consolestream.WithEnvVar("TERM", "xterm-256color"))

Use Cases

Build System Integration

Monitor compiler output, test results, or deployment logs with comprehensive lifecycle tracking:

process := consolestream.NewPipeProcess("go", []string{"test", "-v", "./..."},
    consolestream.WithEnvVar("CGO_ENABLED", "0"),
    consolestream.WithFlushInterval(500*time.Millisecond)) // Faster feedback for tests

for part, err := range process.ExecuteAndStream(ctx) {
    switch event := part.Event.(type) {
    case *consolestream.PipeOutputData:
        logTestProgress(event.Data, part.Timestamp)
    case *consolestream.ProcessEnd:
        logBuildCompletion(event.ExitCode, event.Duration)
    case *consolestream.HeartbeatEvent:
        updateProgressIndicator(event.ElapsedTime)
    }
}
Long-Running Process Monitoring

Monitor services, databases, or data processing jobs with keep-alive detection:

ctx, cancel := context.WithTimeout(context.Background(), 30*time.Minute)
defer cancel()

process := consolestream.NewPipeProcess("docker", []string{"logs", "-f", "my-service"})
lastActivity := time.Now()

for part, err := range process.ExecuteAndStream(ctx) {
    switch event := part.Event.(type) {
    case *consolestream.PipeOutputData:
        lastActivity = part.Timestamp
        if containsError(event.Data) {
            alertOnError(event.Data)
        }
    case *consolestream.HeartbeatEvent:
        // Process is alive but silent - check for stalls
        if time.Since(lastActivity) > 5*time.Minute {
            alertOnSilentProcess(event.ElapsedTime)
        }
    case *consolestream.ProcessEnd:
        if !event.Success {
            alertOnProcessFailure(event.ExitCode)
        }
    }
}
Interactive Terminal Applications

Monitor CLI tools with progress bars, colors, and interactive elements:

// Run interactive installer with PTY to capture progress bars and colors
ptyProcess := consolestream.NewPTYProcess("npm", []string{"install", "--progress"})
for part, err := range ptyProcess.ExecuteAndStream(ctx) {
    switch event := part.Event.(type) {
    case *consolestream.PTYOutputData:
        // Output contains ANSI escape sequences for colors and progress bars
        logInteractiveOutput(event.Data)
        if containsProgressBar(event.Data) {
            updateProgressDisplay(event.Data)
        }
    case *consolestream.ProcessEnd:
        if event.Success {
            logInstallSuccess(event.Duration)
        } else {
            handleInstallFailure(event.ExitCode)
        }
    case *consolestream.HeartbeatEvent:
        showInstallProgress(event.ElapsedTime)
    }
}
CI/CD Pipeline Steps

Execute deployment commands with comprehensive monitoring and real-time feedback:

process := consolestream.NewPipeProcess("kubectl", []string{"apply", "-f", "deployment.yaml"})
for part, err := range process.ExecuteAndStream(ctx) {
    switch event := part.Event.(type) {
    case *consolestream.PipeOutputData:
        updateDeploymentStatus(event.Data)
    case *consolestream.ProcessStart:
        logDeploymentStart(event.PID)
    case *consolestream.ProcessEnd:
        if event.Success {
            logDeploymentSuccess(event.Duration)
        } else {
            handleDeploymentFailure(event.ExitCode)
        }
    case *consolestream.HeartbeatEvent:
        showDeploymentProgress(event.ElapsedTime)
    }
}

How It Works

  1. Process Execution: Starts your command with separate stdout/stderr pipes
  2. Event Generation: Emits ProcessStart event with PID and command details
  3. Concurrent Reading: Background goroutines read from both streams into buffers
  4. Smart Flushing: Buffers flush at configurable intervals (default 1 second) OR when they reach configurable size (default 10MB) as PipeOutputData events
  5. Heartbeat Monitoring: Emits HeartbeatEvent at flush intervals when no output occurs
  6. Lifecycle Tracking: Emits ProcessEnd event with exit code, duration, and success status
  7. Iterator Protocol: Uses Go 1.23+ iter.Seq2[Event, error] for clean event consumption
  8. Resource Management: Automatic cleanup of pipes, processes, and goroutines

What Can Go Wrong?

Process Failures
  • Non-zero exit codes: Wrapped in ProcessFailedError with exit code
  • Killed processes: Wrapped in ProcessKilledError with signal info
  • Startup failures: Wrapped in ProcessStartError with underlying error
Resource Management
  • Context cancellation: Uses configured Cancellor for graceful shutdown
  • Memory limits: Configurable buffer limits (default 10MB) prevent runaway memory usage
  • Goroutine leaks: Automatic cleanup when context is cancelled or process exits
Example Error Handling
for part, err := range process.ExecuteAndStream(ctx) {
    if err != nil {
        switch e := err.(type) {
        case consolestream.ProcessStartError:
            log.Printf("Failed to start: %v", e.Err)
        }
        break
    }

    switch event := part.Event.(type) {
    case *consolestream.PipeOutputData:
        // Handle normal output
        processOutput(event.Data, event.Stream)
    case *consolestream.ProcessEnd:
        if !event.Success {
            log.Printf("Process failed with exit code %d", event.ExitCode)
        }
        return // Process completed
    case *consolestream.ProcessError:
        log.Printf("Process error: %s", event.Message)
        return
    case *consolestream.HeartbeatEvent:
        // Monitor for stuck processes
        if event.ElapsedTime > 10*time.Minute {
            log.Printf("Process may be stuck (running for %v)", event.ElapsedTime)
        }
    }
}

Extending the Library

Custom Cancellation

Implement the Cancellor interface for different execution environments:

type DockerCancellor struct {
    containerID string
}

func (d *DockerCancellor) Cancel(ctx context.Context, pid int) error {
    return exec.CommandContext(ctx, "docker", "stop", d.containerID).Run()
}

// Use with processes
process := consolestream.NewPipeProcess("docker", []string{"run", "..."},
    consolestream.WithCancellor(&DockerCancellor{"my-container"}))
Event Processing

Add middleware for filtering, transforming, or analyzing events:

func FilterPipeOutputEvents(stream iter.Seq2[Event, error]) iter.Seq2[Event, error] {
    return func(yield func(Event, error) bool) {
        for part, err := range stream {
            if err != nil {
                yield(part, err)
                return
            }

            // Only yield PipeOutputData events
            if _, ok := part.Event.(*consolestream.PipeOutputData); ok {
                if !yield(part, err) {
                    return
                }
            }
        }
    }
}

func MonitorHeartbeats(stream iter.Seq2[Event, error], threshold time.Duration) iter.Seq2[Event, error] {
    return func(yield func(Event, error) bool) {
        for part, err := range stream {
            if !yield(part, err) {
                return
            }

            // Alert on long-running processes
            if hb, ok := part.Event.(*consolestream.HeartbeatEvent); ok {
                if hb.ElapsedTime > threshold {
                    // Could emit custom alert events here
                    log.Printf("Process running for %v", hb.ElapsedTime)
                }
            }
        }
    }
}

// Usage
filtered := FilterPipeOutputEvents(process.ExecuteAndStream(ctx))
monitored := MonitorHeartbeats(filtered, 5*time.Minute)
for part, err := range monitored {
    // Only output events with heartbeat monitoring
}

Requirements

  • Go 1.23+ (for iterator support)
  • Unix-like system (for signal handling and PTY support)
  • github.com/creack/pty (for PTY functionality - automatically included)

Installation

go get github.com/wolfeidau/console-stream

Acknowledgments

This library was developed with the assistance of Claude (Anthropic's AI assistant) through responsible AI collaboration. The design, implementation, and documentation reflect a partnership between human engineering judgment and AI-powered development acceleration.

Human Contributions:

  • Project requirements and architectural decisions
  • Code review and validation of AI-generated implementations
  • Testing strategy and quality assurance
  • Production readiness considerations and operational concerns

AI Contributions:

  • Code generation following specified patterns and requirements
  • Documentation creation adhering to Amazon engineering principles
  • Test program development and comprehensive examples
  • Code refactoring and optimization suggestions

This approach demonstrates responsible AI usage in software development - leveraging AI capabilities while maintaining human oversight, validation, and decision-making authority. The resulting code has been thoroughly tested and reviewed to ensure production quality and maintainability.

License

This project is copyright Mark Wolfe and licensed under the Apache License, Version 2.0.

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func IsHeartbeatEvent

func IsHeartbeatEvent(event StreamEvent) bool

func IsPTYOutputEvent

func IsPTYOutputEvent(event StreamEvent) bool

func IsPipeOutputEvent

func IsPipeOutputEvent(event StreamEvent) bool

func IsProcessEndEvent

func IsProcessEndEvent(event StreamEvent) bool

func IsProcessErrorEvent

func IsProcessErrorEvent(event StreamEvent) bool

func IsProcessLifecycleEvent

func IsProcessLifecycleEvent(event StreamEvent) bool

func IsProcessStartEvent

func IsProcessStartEvent(event StreamEvent) bool

func IsTerminalResizeEvent

func IsTerminalResizeEvent(event StreamEvent) bool

func ToAscicastV3

func ToAscicastV3(events iter.Seq2[Event, error], metadata AscicastV3Metadata) iter.Seq2[AscicastV3Line, error]

ToAscicastV3 transforms a stream of console-stream events into asciicast v3 format

func WriteAscicastV3

func WriteAscicastV3(events iter.Seq2[Event, error], metadata AscicastV3Metadata, writeFunc func([]byte) error) error

WriteAscicastV3 is a convenience function to write asciicast v3 to any writer

Types

type AscicastV3Event

type AscicastV3Event [3]any

AscicastV3Event represents an event line in asciicast v3 format: [interval, code, data]

func NewExitEvent

func NewExitEvent(interval float64, exitCode int) AscicastV3Event

NewExitEvent creates an asciicast exit event

func NewOutputEvent

func NewOutputEvent(interval float64, data []byte) AscicastV3Event

NewOutputEvent creates an asciicast output event

func NewResizeEvent

func NewResizeEvent(interval float64, cols, rows uint16) AscicastV3Event

NewResizeEvent creates an asciicast resize event

func (AscicastV3Event) MarshalJSON

func (e AscicastV3Event) MarshalJSON() ([]byte, error)

MarshalJSON implements AscicastV3Line interface for events

type AscicastV3Header

type AscicastV3Header struct {
	Version int `json:"version"`
	AscicastV3Metadata
}

AscicastV3Header represents the first line of an asciicast v3 file

func (AscicastV3Header) MarshalJSON

func (h AscicastV3Header) MarshalJSON() ([]byte, error)

MarshalJSON implements AscicastV3Line interface for headers

type AscicastV3Line

type AscicastV3Line interface {
	MarshalJSON() ([]byte, error)
}

AscicastV3Line represents either a header or event line in asciicast v3 format

type AscicastV3Metadata

type AscicastV3Metadata struct {
	Term      TermInfo          `json:"term"`
	Timestamp *time.Time        `json:"timestamp,omitempty"`
	Command   string            `json:"command,omitempty"`
	Title     string            `json:"title,omitempty"`
	Env       map[string]string `json:"env,omitempty"`
	Tags      []string          `json:"tags,omitempty"`
}

AscicastV3Metadata contains the metadata for the asciicast header

type Cancellor

type Cancellor interface {
	Cancel(ctx context.Context, pid int) error
}

Cancellor defines the interface for cancelling processes

type Event

type Event struct {
	Timestamp time.Time
	Event     StreamEvent
}

func (Event) EventType

func (e Event) EventType() EventType

EventType returns the type of the event contained in this Event

func (Event) String

func (e Event) String() string

String returns a human-readable representation of the Event

type EventType

type EventType int

EventType defines the different kinds of events

const (
	ProcessStartEvent EventType = iota
	ProcessEndEvent
	ProcessErrorEvent
	HeartbeatEventType
	PipeOutputEvent
	PTYOutputEvent
	TerminalResizeEventType
)

func (EventType) String

func (e EventType) String() string

type HeartbeatEvent

type HeartbeatEvent struct {
	ProcessAlive bool
	ElapsedTime  time.Duration
}

HeartbeatEvent represents a keep-alive signal when no output is generated

func (*HeartbeatEvent) String

func (h *HeartbeatEvent) String() string

func (*HeartbeatEvent) Type

func (h *HeartbeatEvent) Type() EventType

type LocalCancellor

type LocalCancellor struct {
	// contains filtered or unexported fields
}

LocalCancellor implements process cancellation for local processes

func NewLocalCancellor

func NewLocalCancellor(terminateTimeout time.Duration) *LocalCancellor

func (*LocalCancellor) Cancel

func (lc *LocalCancellor) Cancel(ctx context.Context, pid int) error

type PTYOutputData

type PTYOutputData struct {
	Data []byte
}

PTYOutputData represents terminal output from a PTY process with ANSI sequences preserved

func (*PTYOutputData) String

func (p *PTYOutputData) String() string

func (*PTYOutputData) Type

func (p *PTYOutputData) Type() EventType

type PTYProcess

type PTYProcess struct {
	// contains filtered or unexported fields
}

PTYProcess represents a process running in a pseudo-terminal

func NewPTYProcess

func NewPTYProcess(cmd string, args []string, opts ...ProcessOption) *PTYProcess

NewPTYProcess creates a new PTY process with functional options

func (*PTYProcess) ExecuteAndStream

func (p *PTYProcess) ExecuteAndStream(ctx context.Context) iter.Seq2[Event, error]

ExecuteAndStream starts a PTY process and returns an iterator over Event objects

type PipeOutputData

type PipeOutputData struct {
	Data   []byte
	Stream StreamType
}

PipeOutputData represents stdout/stderr data from the process

func (*PipeOutputData) String

func (o *PipeOutputData) String() string

func (*PipeOutputData) Type

func (o *PipeOutputData) Type() EventType

type PipeProcess

type PipeProcess struct {
	// contains filtered or unexported fields
}

func NewPipeProcess

func NewPipeProcess(cmd string, args []string, opts ...ProcessOption) *PipeProcess

func (*PipeProcess) ExecuteAndStream

func (p *PipeProcess) ExecuteAndStream(ctx context.Context) iter.Seq2[Event, error]

ExecuteAndStream starts a process and returns an iterator over Event objects

type PipeProcessFailedError

type PipeProcessFailedError struct {
	Cmd      string
	ExitCode int
}

func (PipeProcessFailedError) Error

func (e PipeProcessFailedError) Error() string

type PipeProcessKilledError

type PipeProcessKilledError struct {
	Cmd    string
	Signal string
}

func (PipeProcessKilledError) Error

func (e PipeProcessKilledError) Error() string

type ProcessEnd

type ProcessEnd struct {
	ExitCode int
	Duration time.Duration
	Success  bool
}

ProcessEnd represents the completion of process execution

func (*ProcessEnd) String

func (p *ProcessEnd) String() string

func (*ProcessEnd) Type

func (p *ProcessEnd) Type() EventType

type ProcessError

type ProcessError struct {
	Error   error
	Message string
}

ProcessError represents an error during process execution

func (*ProcessError) String

func (p *ProcessError) String() string

func (*ProcessError) Type

func (p *ProcessError) Type() EventType

type ProcessOption

type ProcessOption func(*processConfig)

ProcessOption represents a functional option for configuring processes

func WithCancellor

func WithCancellor(cancellor Cancellor) ProcessOption

WithCancellor sets a custom cancellor for the process

func WithEnv

func WithEnv(env []string) ProcessOption

WithEnv sets environment variables as a slice of "KEY=value" strings

func WithEnvMap

func WithEnvMap(envMap map[string]string) ProcessOption

WithEnvMap sets environment variables from a map for convenience

func WithEnvVar

func WithEnvVar(key, value string) ProcessOption

WithEnvVar sets a single environment variable

func WithFlushInterval

func WithFlushInterval(interval time.Duration) ProcessOption

WithFlushInterval sets how often to flush output buffers

func WithMaxBufferSize

func WithMaxBufferSize(size int) ProcessOption

WithMaxBufferSize sets the maximum buffer size before forcing a flush

func WithPTYSize

func WithPTYSize(size pty.Winsize) ProcessOption

WithPTYSize sets a custom terminal size for the PTY process

type ProcessStart

type ProcessStart struct {
	PID     int
	Command string
	Args    []string
}

ProcessStart represents the beginning of process execution

func (*ProcessStart) String

func (p *ProcessStart) String() string

func (*ProcessStart) Type

func (p *ProcessStart) Type() EventType

type ProcessStartError

type ProcessStartError struct {
	Cmd string
	Err error
}

Error types for different process failure scenarios

func (ProcessStartError) Error

func (e ProcessStartError) Error() string

type StreamEvent

type StreamEvent interface {
	Type() EventType
	String() string
}

StreamEvent represents different types of events that can occur during process execution

type StreamType

type StreamType int
const (
	Stdout StreamType = iota
	Stderr
)

func (StreamType) String

func (s StreamType) String() string

type TermInfo

type TermInfo struct {
	Cols int `json:"cols"`
	Rows int `json:"rows"`
}

TermInfo represents terminal dimensions for asciicast header

type TerminalResizeEvent

type TerminalResizeEvent struct {
	Rows uint16
	Cols uint16
	X    uint16 // Width in pixels
	Y    uint16 // Height in pixels
}

TerminalResizeEvent represents a terminal window size change

func (*TerminalResizeEvent) String

func (t *TerminalResizeEvent) String() string

func (*TerminalResizeEvent) Type

func (t *TerminalResizeEvent) Type() EventType

Directories

Path Synopsis
cmd
tester command
example
asciicast command
buffering command
burst command
logging command
pty command
simple command
stream command

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL