consolestream

package module
v0.4.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 31, 2025 License: Apache-2.0 Imports: 24 Imported by: 0

README

Console Stream

A Go library for executing processes and streaming their output in real-time using Go 1.23+ iterators.

Features

  • Real-time streaming: Process output delivered at configurable intervals or buffer limits
  • Unified API: Single process type supports both pipe and PTY modes
  • Container execution: Run processes inside Docker/Podman containers with volume mounts
  • Event-driven: Process lifecycle events, output data, and heartbeat monitoring
  • Production ready: Context cancellation, error handling, and resource cleanup

Quick Start

package main

import (
    "context"
    "fmt"

    consolestream "github.com/wolfeidau/console-stream"
)

func main() {
    // Create and execute process
    process := consolestream.NewProcess("echo", []string{"Hello, World!"})

    ctx := context.Background()
    for event, err := range process.ExecuteAndStream(ctx) {
        if err != nil {
            fmt.Printf("Error: %v\n", err)
            break
        }

        switch e := event.Event.(type) {
        case *consolestream.OutputData:
            fmt.Printf("Output: %s", string(e.Data))
        case *consolestream.ProcessEnd:
            fmt.Printf("Process completed (Exit: %d)\n", e.ExitCode)
            return
        }
    }
}

API

Process Creation
// Default (pipe mode)
process := consolestream.NewProcess("command", []string{"arg1", "arg2"})

// PTY mode for interactive programs
process := consolestream.NewProcess("bash", []string{"-l"},
    consolestream.WithPTYMode())

// With configuration options
process := consolestream.NewProcess("npm", []string{"install"},
    consolestream.WithPTYMode(),
    consolestream.WithPTYSize(pty.Winsize{Rows: 24, Cols: 80}),
    consolestream.WithEnvVar("NODE_ENV", "production"),
    consolestream.WithFlushInterval(500*time.Millisecond))
Process Options
Option Description
WithPipeMode() Use standard pipes (default)
WithPTYMode() Use pseudo-terminal for interactive programs
WithPTYSize(size) Set terminal dimensions for PTY
WithCancellor(c) Custom process cancellation
WithEnvVar(k, v) Set environment variable
WithFlushInterval(d) Output flush frequency
WithMaxBufferSize(s) Buffer size limit
Container Execution

Execute processes inside Docker/Podman containers with volume mounts and environment variables.

Automatic Image Pulling: Container images are automatically pulled if not available locally, with progress events emitted every 2 seconds.

// Simple container execution
process := consolestream.NewContainerProcess("go", []string{"version"},
    consolestream.WithContainerImage("golang:1.25"))

// With volume mounts
process := consolestream.NewContainerProcess("make", []string{"test"},
    consolestream.WithContainerImage("golang:1.25"),
    consolestream.WithContainerMount("/path/to/code", "/workspace", false),
    consolestream.WithContainerWorkingDir("/workspace"),
    consolestream.WithContainerEnvMap(map[string]string{
        "CI": "true",
        "GOPATH": "/go",
    }))

// Same event streaming model
for event, err := range process.ExecuteAndStream(ctx) {
    if err != nil {
        break
    }

    switch e := event.Event.(type) {
    case *consolestream.ImagePullStart:
        fmt.Printf("Pulling image: %s\n", e.Image)
    case *consolestream.ImagePullProgress:
        fmt.Printf("Pull progress: %d%% (%d/%d bytes)\n",
            e.PercentComplete, e.BytesDownloaded, e.BytesTotal)
    case *consolestream.ImagePullComplete:
        fmt.Printf("Pull complete: %s\n", e.Digest)
    case *consolestream.ContainerCreate:
        fmt.Printf("Container: %s\n", e.ContainerID)
    case *consolestream.OutputData:
        fmt.Print(string(e.Data))
    case *consolestream.ProcessEnd:
        fmt.Printf("Exit: %d\n", e.ExitCode)
    }
}
Container Options
Option Description
WithContainerImage(img) Container image (required)
WithContainerRuntime(r) Runtime: "docker" (default) or "podman"
WithContainerMount(src, dst, ro) Volume mount (source, target, read-only)
WithContainerWorkingDir(d) Working directory in container
WithContainerEnvMap(m) Environment variables map
WithContainerFlushInterval(d) Output flush frequency
WithContainerMaxBufferSize(s) Buffer size limit
Event Types
Event Description
ProcessStart Process started with PID
OutputData Process output (combined stdout/stderr)
ProcessEnd Process completed with exit code
ProcessError Process execution error
HeartbeatEvent Keep-alive when process is silent
TerminalResizeEvent Terminal size change (PTY only)
ContainerCreate Container created with ID and image
ContainerRemove Container cleanup completed
ImagePullStart Container image pull started
ImagePullProgress Container image pull progress (every 2s)
ImagePullComplete Container image pull completed

Use Cases

Pipe Mode - Use for:

  • Build tools, tests, data processing
  • Simple command execution
  • When you need performance

PTY Mode - Use for:

  • Interactive applications (shells, editors)
  • Programs with progress bars or colors
  • Terminal session recording

Container Mode - Use for:

  • CI/CD pipelines with isolated builds
  • Running processes in reproducible environments
  • Sandboxed execution with resource limits
  • Testing with specific runtime versions

Asciicast Recording

Record terminal sessions in asciicast v3 format:

// Record PTY session
process := consolestream.NewProcess("bash", []string{"-l"},
    consolestream.WithPTYMode())

metadata := consolestream.AscicastV3Metadata{
    Term: consolestream.TermInfo{Cols: 80, Rows: 24},
    Command: "bash -l",
}

events := process.ExecuteAndStream(ctx)
asciicast := consolestream.ToAscicastV3(events, metadata)

// Write to file
file, _ := os.Create("session.cast")
defer file.Close()

for line, err := range asciicast {
    if err != nil { break }
    data, _ := json.Marshal(line)
    file.Write(append(data, '\n'))
}

Command Line Tool

Build and use the runner tool:

# Build
go build -o runner ./cmd/runner

# Execute with YAML config
./runner exec --config task.yaml --format json --output results.json

Example config:

command: "docker"
args: ["build", "-t", "myapp", "."]
process_type: "pty"
timeout: "10m"
env:
  DOCKER_BUILDKIT: "1"

Installation

go get github.com/wolfeidau/console-stream

Requirements: Go 1.25+, Unix-like system

License

Apache License, Version 2.0 - Copyright Mark Wolfe

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func IsContainerCreateEvent added in v0.4.0

func IsContainerCreateEvent(event StreamEvent) bool

func IsContainerRemoveEvent added in v0.4.0

func IsContainerRemoveEvent(event StreamEvent) bool

func IsHeartbeatEvent

func IsHeartbeatEvent(event StreamEvent) bool

func IsOutputEvent added in v0.3.0

func IsOutputEvent(event StreamEvent) bool

func IsProcessEndEvent

func IsProcessEndEvent(event StreamEvent) bool

func IsProcessErrorEvent

func IsProcessErrorEvent(event StreamEvent) bool

func IsProcessLifecycleEvent

func IsProcessLifecycleEvent(event StreamEvent) bool

func IsProcessStartEvent

func IsProcessStartEvent(event StreamEvent) bool

func IsTerminalResizeEvent

func IsTerminalResizeEvent(event StreamEvent) bool

func ToAscicastV3

func ToAscicastV3(events iter.Seq2[Event, error], metadata AscicastV3Metadata) iter.Seq2[AscicastV3Line, error]

ToAscicastV3 transforms a stream of console-stream events into asciicast v3 format

func WriteAscicastV3

func WriteAscicastV3(events iter.Seq2[Event, error], metadata AscicastV3Metadata, writeFunc func([]byte) error) error

WriteAscicastV3 is a convenience function to write asciicast v3 to any writer

Types

type AscicastV3Event

type AscicastV3Event [3]any

AscicastV3Event represents an event line in asciicast v3 format: [interval, code, data]

func NewExitEvent

func NewExitEvent(interval float64, exitCode int) AscicastV3Event

NewExitEvent creates an asciicast exit event

func NewOutputEvent

func NewOutputEvent(interval float64, data string) AscicastV3Event

NewOutputEvent creates an asciicast output event

func NewResizeEvent

func NewResizeEvent(interval float64, cols, rows uint16) AscicastV3Event

NewResizeEvent creates an asciicast resize event

func (AscicastV3Event) MarshalJSON

func (e AscicastV3Event) MarshalJSON() ([]byte, error)

MarshalJSON implements AscicastV3Line interface for events

type AscicastV3Header

type AscicastV3Header struct {
	Version int `json:"version"`
	AscicastV3Metadata
}

AscicastV3Header represents the first line of an asciicast v3 file

func (AscicastV3Header) MarshalJSON

func (h AscicastV3Header) MarshalJSON() ([]byte, error)

MarshalJSON implements AscicastV3Line interface for headers

type AscicastV3Line

type AscicastV3Line interface {
	MarshalJSON() ([]byte, error)
}

AscicastV3Line represents either a header or event line in asciicast v3 format

type AscicastV3Metadata

type AscicastV3Metadata struct {
	Term      TermInfo          `json:"term"`
	Timestamp *int64            `json:"timestamp,omitempty"`
	Command   string            `json:"command,omitempty"`
	Title     string            `json:"title,omitempty"`
	Env       map[string]string `json:"env,omitempty"`
	Tags      []string          `json:"tags,omitempty"`
}

AscicastV3Metadata contains the metadata for the asciicast header

type BufferWriter added in v0.2.0

type BufferWriter struct {
	// contains filtered or unexported fields
}

BufferWriter wraps buffer operations and implements io.WriteCloser This can be used by both PipeProcess and PTYProcess for consistent buffer management

func NewBufferWriter added in v0.2.0

func NewBufferWriter(flushChan chan<- struct{}, maxBufferSize int) *BufferWriter

func (*BufferWriter) Close added in v0.2.0

func (w *BufferWriter) Close() error

func (*BufferWriter) FlushAndClear added in v0.2.0

func (w *BufferWriter) FlushAndClear() []byte

FlushAndClear returns the current buffer contents and clears it

func (*BufferWriter) Len added in v0.2.0

func (w *BufferWriter) Len() int

Len returns the current buffer length (thread-safe)

func (*BufferWriter) Write added in v0.2.0

func (w *BufferWriter) Write(data []byte) (int, error)

type Cancellor

type Cancellor interface {
	Cancel(ctx context.Context, pid int) error
}

Cancellor defines the interface for cancelling processes

type ContainerCreate added in v0.4.0

type ContainerCreate struct {
	ContainerID string
	Image       string
}

ContainerCreate represents container creation

func (*ContainerCreate) String added in v0.4.0

func (c *ContainerCreate) String() string

func (*ContainerCreate) Type added in v0.4.0

func (c *ContainerCreate) Type() EventType

type ContainerMount added in v0.4.0

type ContainerMount struct {
	Source   string
	Target   string
	ReadOnly bool
}

ContainerMount represents a volume mount for container processes

type ContainerProcess added in v0.4.0

type ContainerProcess struct {
	// contains filtered or unexported fields
}

ContainerProcess represents a process running inside a container

func NewContainerProcess added in v0.4.0

func NewContainerProcess(cmd string, args []string, opts ...ContainerProcessOption) *ContainerProcess

NewContainerProcess creates a new container process with functional options

func (*ContainerProcess) ExecuteAndStream added in v0.4.0

func (cp *ContainerProcess) ExecuteAndStream(ctx context.Context) iter.Seq2[Event, error]

ExecuteAndStream starts a container and returns an iterator over Event objects

type ContainerProcessOption added in v0.4.0

type ContainerProcessOption func(*containerProcessConfig)

ContainerProcessOption represents a functional option for configuring container processes

func WithContainerCancellor added in v0.4.0

func WithContainerCancellor(cancellor Cancellor) ContainerProcessOption

WithContainerCancellor sets a custom cancellor for the container process

func WithContainerEnv added in v0.4.0

func WithContainerEnv(env []string) ContainerProcessOption

WithContainerEnv sets environment variables as a slice of "KEY=value" strings

func WithContainerEnvMap added in v0.4.0

func WithContainerEnvMap(envMap map[string]string) ContainerProcessOption

WithContainerEnvMap sets environment variables from a map for convenience

func WithContainerFlushInterval added in v0.4.0

func WithContainerFlushInterval(interval time.Duration) ContainerProcessOption

WithContainerFlushInterval sets how often to flush output buffers for container processes

func WithContainerImage added in v0.4.0

func WithContainerImage(image string) ContainerProcessOption

WithContainerImage sets the container image to use (REQUIRED for container processes)

func WithContainerMaxBufferSize added in v0.4.0

func WithContainerMaxBufferSize(size int) ContainerProcessOption

WithContainerMaxBufferSize sets the maximum buffer size before forcing a flush for container processes

func WithContainerMeter added in v0.4.0

func WithContainerMeter(meter any) ContainerProcessOption

WithContainerMeter sets an OpenTelemetry meter for metrics collection for container processes

func WithContainerMetricsInterval added in v0.4.0

func WithContainerMetricsInterval(interval time.Duration) ContainerProcessOption

WithContainerMetricsInterval sets how often to aggregate metrics into buckets for container processes

func WithContainerMount added in v0.4.0

func WithContainerMount(source, target string, readOnly bool) ContainerProcessOption

WithContainerMount adds a volume mount to the container

func WithContainerRuntime added in v0.4.0

func WithContainerRuntime(runtime string) ContainerProcessOption

WithContainerRuntime sets the container runtime ("docker" or "podman")

func WithContainerWorkingDir added in v0.4.0

func WithContainerWorkingDir(dir string) ContainerProcessOption

WithContainerWorkingDir sets the working directory for the container process

type ContainerRemove added in v0.4.0

type ContainerRemove struct {
	ContainerID string
}

ContainerRemove represents container cleanup

func (*ContainerRemove) String added in v0.4.0

func (c *ContainerRemove) String() string

func (*ContainerRemove) Type added in v0.4.0

func (c *ContainerRemove) Type() EventType

type Event

type Event struct {
	Timestamp time.Time
	Event     StreamEvent
}

func (Event) EventType

func (e Event) EventType() EventType

EventType returns the type of the event contained in this Event

func (Event) String

func (e Event) String() string

String returns a human-readable representation of the Event

type EventType

type EventType int

EventType defines the different kinds of events

const (
	ProcessStartEvent EventType = iota
	ProcessEndEvent
	ProcessErrorEvent
	HeartbeatEventType
	OutputEvent
	TerminalResizeEventType
	ContainerCreateEvent
	ContainerRemoveEvent
	ImagePullStartEvent
	ImagePullProgressEvent
	ImagePullCompleteEvent
)

func (EventType) String

func (e EventType) String() string

type HeartbeatEvent

type HeartbeatEvent struct {
	ProcessAlive bool
	ElapsedTime  time.Duration
}

HeartbeatEvent represents a keep-alive signal when no output is generated

func (*HeartbeatEvent) String

func (h *HeartbeatEvent) String() string

func (*HeartbeatEvent) Type

func (h *HeartbeatEvent) Type() EventType

type ImagePullComplete added in v0.4.0

type ImagePullComplete struct {
	Image  string
	Digest string
}

ImagePullComplete represents successful completion of an image pull

func (*ImagePullComplete) String added in v0.4.0

func (i *ImagePullComplete) String() string

func (*ImagePullComplete) Type added in v0.4.0

func (i *ImagePullComplete) Type() EventType

type ImagePullProgress added in v0.4.0

type ImagePullProgress struct {
	Image           string
	Status          string
	PercentComplete int
	BytesDownloaded int64
	BytesTotal      int64
}

ImagePullProgress represents progress during an image pull

func (*ImagePullProgress) String added in v0.4.0

func (i *ImagePullProgress) String() string

func (*ImagePullProgress) Type added in v0.4.0

func (i *ImagePullProgress) Type() EventType

type ImagePullStart added in v0.4.0

type ImagePullStart struct {
	Image string
}

ImagePullStart represents the beginning of an image pull operation

func (*ImagePullStart) String added in v0.4.0

func (i *ImagePullStart) String() string

func (*ImagePullStart) Type added in v0.4.0

func (i *ImagePullStart) Type() EventType

type LocalCancellor

type LocalCancellor struct {
	// contains filtered or unexported fields
}

LocalCancellor implements process cancellation for local processes

func NewLocalCancellor

func NewLocalCancellor(terminateTimeout time.Duration) *LocalCancellor

func (*LocalCancellor) Cancel

func (lc *LocalCancellor) Cancel(ctx context.Context, pid int) error

type OutputData added in v0.3.0

type OutputData struct {
	Data []byte
}

OutputData represents output data from the process (combined stdout/stderr)

func (*OutputData) String added in v0.3.0

func (o *OutputData) String() string

func (*OutputData) Type added in v0.3.0

func (o *OutputData) Type() EventType

type PipeProcessFailedError

type PipeProcessFailedError struct {
	Cmd      string
	ExitCode int
}

func (PipeProcessFailedError) Error

func (e PipeProcessFailedError) Error() string

type PipeProcessKilledError

type PipeProcessKilledError struct {
	Cmd    string
	Signal string
}

func (PipeProcessKilledError) Error

func (e PipeProcessKilledError) Error() string

type Process added in v0.3.0

type Process struct {
	// contains filtered or unexported fields
}

Process represents a unified process that can run in either PTY or pipe mode

func NewProcess added in v0.3.0

func NewProcess(cmd string, args []string, opts ...ProcessOption) *Process

NewProcess creates a new unified process with functional options

func (*Process) ExecuteAndStream added in v0.3.0

func (p *Process) ExecuteAndStream(ctx context.Context) iter.Seq2[Event, error]

ExecuteAndStream starts a process and returns an iterator over Event objects

func (*Process) GetStats added in v0.3.0

func (p *Process) GetStats() ProcessStatsSnapshot

GetStats returns a copy of the current process statistics

type ProcessEnd

type ProcessEnd struct {
	ExitCode int
	Duration time.Duration
	Success  bool
}

ProcessEnd represents the completion of process execution

func (*ProcessEnd) String

func (p *ProcessEnd) String() string

func (*ProcessEnd) Type

func (p *ProcessEnd) Type() EventType

type ProcessError

type ProcessError struct {
	Error   error
	Message string
}

ProcessError represents an error during process execution

func (*ProcessError) String

func (p *ProcessError) String() string

func (*ProcessError) Type

func (p *ProcessError) Type() EventType

type ProcessInfo added in v0.2.1

type ProcessInfo struct {
	Command string
	Args    []string
	PID     int
}

ProcessInfo contains process information for metrics attributes

type ProcessMode added in v0.3.0

type ProcessMode int

processConfig holds configuration options for processes ProcessMode defines whether to use PTY or pipe mode for process execution

const (
	PipeMode ProcessMode = iota
	PTYMode
)

func (ProcessMode) String added in v0.3.0

func (m ProcessMode) String() string

type ProcessOption

type ProcessOption func(*processConfig)

ProcessOption represents a functional option for configuring processes

func WithCancellor

func WithCancellor(cancellor Cancellor) ProcessOption

WithCancellor sets a custom cancellor for the process

func WithEnv

func WithEnv(env []string) ProcessOption

WithEnv sets environment variables as a slice of "KEY=value" strings

func WithEnvMap

func WithEnvMap(envMap map[string]string) ProcessOption

WithEnvMap sets environment variables from a map for convenience

func WithEnvVar

func WithEnvVar(key, value string) ProcessOption

WithEnvVar sets a single environment variable

func WithFlushInterval

func WithFlushInterval(interval time.Duration) ProcessOption

WithFlushInterval sets how often to flush output buffers

func WithMaxBufferSize

func WithMaxBufferSize(size int) ProcessOption

WithMaxBufferSize sets the maximum buffer size before forcing a flush

func WithMeter added in v0.2.1

func WithMeter(meter any) ProcessOption

WithMeter sets an OpenTelemetry meter for metrics collection

func WithMetricsInterval added in v0.2.1

func WithMetricsInterval(interval time.Duration) ProcessOption

WithMetricsInterval sets how often to aggregate metrics into buckets

func WithPTYMode added in v0.3.0

func WithPTYMode() ProcessOption

WithPTYMode sets the process to use a pseudo-terminal

func WithPTYSize

func WithPTYSize(size any) ProcessOption

WithPTYSize sets a custom terminal size for PTY processes

func WithPipeMode added in v0.3.0

func WithPipeMode() ProcessOption

WithPipeMode sets the process to use standard pipes (stdout/stderr)

func WithWorkingDir added in v0.3.2

func WithWorkingDir(dir string) ProcessOption

WithWorkingDir sets the working directory for the process

type ProcessStart

type ProcessStart struct {
	PID     int
	Command string
	Args    []string
}

ProcessStart represents the beginning of process execution

func (*ProcessStart) String

func (p *ProcessStart) String() string

func (*ProcessStart) Type

func (p *ProcessStart) Type() EventType

type ProcessStartError

type ProcessStartError struct {
	Cmd string
	Err error
}

Error types for different process failure scenarios

func (ProcessStartError) Error

func (e ProcessStartError) Error() string

type ProcessStats added in v0.3.0

type ProcessStats struct {
	// contains filtered or unexported fields
}

ProcessStats manages comprehensive metrics during process execution

func NewProcessStats added in v0.3.0

func NewProcessStats(meter metric.Meter, interval time.Duration, processInfo ProcessInfo) (*ProcessStats, error)

NewProcessStats creates a new ProcessStats instance with OpenTelemetry metrics

func (*ProcessStats) GetSnapshot added in v0.3.0

func (s *ProcessStats) GetSnapshot() ProcessStatsSnapshot

GetSnapshot returns a thread-safe snapshot of the current statistics

func (*ProcessStats) RecordError added in v0.3.0

func (s *ProcessStats) RecordError(ctx context.Context, eventType string)

RecordError increments the error count

func (*ProcessStats) RecordEvent added in v0.3.0

func (s *ProcessStats) RecordEvent(ctx context.Context, eventType string)

RecordEvent records a general event

func (*ProcessStats) RecordOutput added in v0.3.0

func (s *ProcessStats) RecordOutput(ctx context.Context, dataSize int64)

RecordOutput records output data and metrics

func (*ProcessStats) SetExitCode added in v0.3.0

func (s *ProcessStats) SetExitCode(code int)

SetExitCode sets the process exit code

func (*ProcessStats) Start added in v0.3.0

func (s *ProcessStats) Start(ctx context.Context)

Start initializes the stats and records process start

func (*ProcessStats) Stop added in v0.3.0

func (s *ProcessStats) Stop(ctx context.Context)

Stop finalizes the stats and records process stop

type ProcessStatsSnapshot added in v0.3.0

type ProcessStatsSnapshot struct {
	// Basic execution stats
	StartTime  time.Time
	EventCount int
	ErrorCount int
	ExitCode   *int
	Duration   time.Duration
	// contains filtered or unexported fields
}

ProcessStatsSnapshot represents a point-in-time view of process statistics

type StreamEvent

type StreamEvent interface {
	Type() EventType
	String() string
}

StreamEvent represents different types of events that can occur during process execution

type TermInfo

type TermInfo struct {
	Cols int `json:"cols"`
	Rows int `json:"rows"`
}

TermInfo represents terminal dimensions for asciicast header

type TerminalResizeEvent

type TerminalResizeEvent struct {
	Rows uint16
	Cols uint16
	X    uint16 // Width in pixels
	Y    uint16 // Height in pixels
}

TerminalResizeEvent represents a terminal window size change

func (*TerminalResizeEvent) String

func (t *TerminalResizeEvent) String() string

func (*TerminalResizeEvent) Type

func (t *TerminalResizeEvent) Type() EventType

Directories

Path Synopsis
cmd
runner command
Package main provides a pretty slog handler for human-readable colored terminal output.
Package main provides a pretty slog handler for human-readable colored terminal output.
tester command
example
asciicast command
buffering command
burst command
container command
logging command
pty command
simple command
stream command

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL