writer

package
v1.5.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 15, 2026 License: MIT Imports: 6 Imported by: 0

Documentation

Overview

Package writer provides asynchronous buffered writing for Go applications.

AsyncWriter buffers data in memory and writes to the underlying writer in the background, improving performance for I/O-bound workloads.

Quick Start

file, _ := os.Create("output.txt")
w := writer.New(file)
defer w.Close()

w.WriteString("Hello, async world!")
w.Flush(context.Background())

Configuration

Configure buffer size, flush intervals, and error handling:

config := writer.Config{
	BufferSize:    64 * 1024,           // 64KB buffer
	FlushInterval: time.Second,         // Auto-flush every second
	BlockOnFull:   true,                // Block when buffer full
	MaxRetries:    3,                   // Retry failed writes
}

w := writer.NewWithConfig(underlyingWriter, config)

Writing Data

// Write bytes
w.Write([]byte("data"))

// Write string
w.WriteString("text")

// Write with context (for cancellation/timeout)
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
w.WriteContext(ctx, []byte("data"))

Backpressure Handling

When the buffer fills up, you can choose to block or drop writes:

config := writer.Config{
	BlockOnFull: true,  // Block until space available
}

// Or

config := writer.Config{
	BlockOnFull: false, // Drop writes when full
	OnBufferFull: func() {
		log.Println("Buffer full, dropping write")
	},
}

Monitoring

Track performance with callbacks:

config := writer.Config{
	OnFlush: func(bytes int, duration time.Duration) {
		log.Printf("Flushed %d bytes in %v", bytes, duration)
	},
	OnError: func(err error) {
		log.Printf("Write error: %v", err)
	},
}

Statistics

Get current buffer stats:

stats := w.Stats()
fmt.Printf("Written: %d bytes, Flushed: %d bytes, Pending: %d bytes\n",
	stats.BytesWritten, stats.BytesFlushed, stats.BytesPending)

Graceful Shutdown

Always close the writer to flush remaining data:

defer w.Close()  // Flushes and waits for completion

// Or with timeout
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
w.FlushAndClose(ctx)

Use Cases

High-Volume Logging:

logWriter := writer.New(logFile)
defer logWriter.Close()

for _, entry := range logEntries {
	logWriter.WriteString(entry + "\n")
}

Network Streaming:

conn, _ := net.Dial("tcp", "server:8080")
w := writer.NewWithConfig(conn, writer.Config{
	BufferSize:    8 * 1024,
	FlushInterval: 100 * time.Millisecond,
})

for data := range dataChannel {
	w.Write(data)
}

File Writing with Error Handling:

file, _ := os.Create("output.dat")
w := writer.NewWithConfig(file, writer.Config{
	MaxRetries: 5,
	OnError: func(err error) {
		log.Printf("Write failed: %v", err)
	},
})

w.Write(largeData)
w.Flush(context.Background())

Thread Safety

AsyncWriter is safe for concurrent use from multiple goroutines.

Performance Notes

- Larger buffers reduce syscalls but increase memory usage - Shorter flush intervals reduce data loss risk but increase overhead - Non-blocking mode improves write latency but may drop data - Retries help with transient errors but can hide persistent issues

See example tests for more usage patterns.

Example

Example demonstrates basic async writer usage.

// Create an underlying writer (could be file, network, etc.)
var buf bytes.Buffer

// Create async writer with default configuration
writer := New(&buf)
defer func() { _ = writer.Close() }()

// Write data asynchronously - returns immediately
_ = writer.WriteString("Hello, ")
_ = writer.WriteString("async ")
_ = writer.WriteString("world!")

// Flush to ensure all data is written
_ = writer.Flush(context.Background())

fmt.Println(buf.String())
Output:

Hello, async world!
Example (AutoFlush)

Example_autoFlush demonstrates automatic flushing.

var buf bytes.Buffer

config := DefaultConfig()
config.FlushInterval = 50 * time.Millisecond // Auto-flush every 50ms

writer := NewWithConfig(&buf, config)

// Write data
_ = writer.WriteString("auto-flush example")

// Wait for auto-flush
time.Sleep(100 * time.Millisecond)

// Ensure all background processing is complete
_ = writer.Flush(context.Background())
_ = writer.Close()

// Data should be automatically written
fmt.Printf("Auto-flushed data: %s\n", buf.String())
Output:

Auto-flushed data: auto-flush example
Example (Callbacks)

Example_callbacks demonstrates using callbacks for monitoring.

var buf bytes.Buffer

config := DefaultConfig()
config.OnFlush = func(bytes int, _ time.Duration) {
	fmt.Printf("Flushed %d bytes\n", bytes)
}
config.OnBufferFull = func() {
	fmt.Println("Buffer is full!")
}
config.OnError = func(err error) {
	fmt.Printf("Write error: %v\n", err)
}

writer := NewWithConfig(&buf, config)
defer func() { _ = writer.Close() }()

// Write some data
_ = writer.WriteString("Hello with callbacks")

// Flush to trigger callback
_ = writer.Flush(context.Background())
Output:

Flushed 20 bytes
Example (Concurrent)

Example_concurrent demonstrates concurrent writes from multiple goroutines.

var buf bytes.Buffer
writer := New(&buf)
defer func() { _ = writer.Close() }()

var wg sync.WaitGroup
numGoroutines := 3
writesPerGoroutine := 3

// Start multiple goroutines writing concurrently
for i := 0; i < numGoroutines; i++ {
	wg.Add(1)
	go func(id int) {
		defer wg.Done()
		for j := 0; j < writesPerGoroutine; j++ {
			_ = writer.WriteString(fmt.Sprintf("G%d-W%d ", id, j))
		}
	}(i)
}

wg.Wait()

// Flush all writes
_ = writer.Flush(context.Background())

result := buf.String()

// Count the writes (should be 9 total)
writeCount := len(strings.Fields(result))
fmt.Printf("Total writes: %d\n", writeCount)

// Check stats
stats := writer.Stats()
fmt.Printf("Recorded writes: %d\n", stats.WriteCount)
Output:

Total writes: 9
Recorded writes: 9
Example (Configuration)

Example_configuration demonstrates various configuration options.

var buf bytes.Buffer

// Custom configuration
config := Config{
	BufferSize:    1024,                   // 1KB buffer
	FlushInterval: 100 * time.Millisecond, // Auto-flush every 100ms
	BlockOnFull:   true,                   // Block when buffer is full
	MaxRetries:    5,                      // Retry failed writes
	RetryDelay:    50 * time.Millisecond,  // Delay between retries
}

writer := NewWithConfig(&buf, config)

// Write data
_ = writer.WriteString("Configured writer example")

// Let auto-flush handle the writing
time.Sleep(150 * time.Millisecond)

// Ensure all background processing is complete
_ = writer.Flush(context.Background())
_ = writer.Close()

fmt.Println(buf.String())
Output:

Configured writer example
Example (ContextCancellation)

Example_contextCancellation demonstrates context-aware operations.

var buf bytes.Buffer
writer := New(&buf)
defer func() { _ = writer.Close() }()

// Write some data
_ = writer.WriteString("context test")

// Create a context that will timeout quickly
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Nanosecond)
defer cancel()

// This flush will likely be canceled
err := writer.Flush(ctx)
if err == context.DeadlineExceeded {
	fmt.Println("Flush was canceled due to timeout")
} else {
	fmt.Println("Flush completed")
}

// Try again with proper context
err = writer.Flush(context.Background())
if err == nil {
	fmt.Println("Second flush succeeded")
	fmt.Printf("Data: %s\n", buf.String())
}
Output:

Flush was canceled due to timeout
Second flush succeeded
Data: context test
Example (ErrorHandling)

Example_errorHandling demonstrates error handling and retries.

// Create a writer that will fail initially
failingWriter := &failingWriter{failCount: 2}

config := DefaultConfig()
config.MaxRetries = 3
config.RetryDelay = 1 * time.Millisecond

writer := NewWithConfig(failingWriter, config)
defer func() { _ = writer.Close() }()

// Write data (will succeed after retries)
_ = writer.WriteString("persistent data")

err := writer.Flush(context.Background())
if err != nil {
	fmt.Printf("Final error: %v\n", err)
} else {
	fmt.Println("Write succeeded after retries")
	fmt.Printf("Final data: %s\n", failingWriter.buf.String())
}
Output:

Write succeeded after retries
Final data: persistent data
Example (FileWriting)

Example_fileWriting demonstrates writing to a file asynchronously.

// Create a temporary file
file, err := os.CreateTemp("", "async_writer_example_*.txt")
if err != nil {
	log.Fatal(err)
}
defer func() { _ = os.Remove(file.Name()) }() // Clean up

// Create async writer for the file
writer := New(file)
defer func() { _ = writer.Close() }()

// Write data asynchronously
for i := 0; i < 3; i++ {
	_ = writer.WriteString(fmt.Sprintf("Line %d\n", i+1))
}

// Flush to ensure data is written
_ = writer.Flush(context.Background())

// Close the file
_ = file.Close()

// Read back and display
content, err := os.ReadFile(file.Name())
if err != nil {
	log.Fatal(err)
}

fmt.Print(string(content))
Output:

Line 1
Line 2
Line 3
Example (NonBlocking)

Example_nonBlocking demonstrates non-blocking behavior.

var buf bytes.Buffer

config := DefaultConfig()
config.BlockOnFull = false // Don't block on full buffer
config.BufferSize = 20     // Small buffer for demo

writer := NewWithConfig(&buf, config)

// Write data that fits in buffer
err := writer.WriteString("fits")
if err != nil {
	fmt.Printf("Error: %v\n", err)
} else {
	fmt.Println("Small write succeeded")
}

// Try to write large data that exceeds buffer
largeData := strings.Repeat("x", 30)
err = writer.WriteString(largeData)
if err == ErrBufferFull {
	fmt.Println("Large write failed: buffer full")
}

// Flush the successful write
_ = writer.Flush(context.Background())
_ = writer.Close()
fmt.Printf("Buffer contains: %s\n", buf.String())
Output:

Small write succeeded
Large write failed: buffer full
Buffer contains: fits
Example (Statistics)

Example_statistics demonstrates monitoring writer performance.

var buf bytes.Buffer
writer := New(&buf)
defer func() { _ = writer.Close() }()

// Perform some writes
for i := 0; i < 5; i++ {
	_ = writer.WriteString(fmt.Sprintf("Write %d\n", i))
}

// Flush and get stats
_ = writer.Flush(context.Background())
stats := writer.Stats()

fmt.Printf("Writes: %d\n", stats.WriteCount)
fmt.Printf("Bytes: %d\n", stats.BytesWritten)
fmt.Printf("Flushes: %d\n", stats.FlushCount)
fmt.Printf("Errors: %d\n", stats.ErrorCount)
fmt.Printf("Buffer utilization: %.1f%%\n", stats.BufferUtilization*100)
Output:

Writes: 5
Bytes: 40
Flushes: 1
Errors: 0
Buffer utilization: 0.0%

Index

Examples

Constants

This section is empty.

Variables

View Source
var ErrBufferFull = errors.New("buffer is full")

ErrBufferFull is returned when the internal buffer is full and cannot accept more data.

View Source
var ErrWriterClosed = errors.New("writer is closed")

ErrWriterClosed is returned when attempting to write to a closed writer.

Functions

This section is empty.

Types

type AsyncWriter

type AsyncWriter interface {
	// Write writes data asynchronously. Returns immediately without blocking.
	// If the buffer is full and blocking is disabled, returns ErrBufferFull.
	Write(data []byte) error

	// WriteString writes a string asynchronously.
	WriteString(s string) error

	// WriteContext writes data with context support for cancellation.
	WriteContext(ctx context.Context, data []byte) error

	// Flush forces all buffered data to be written to the underlying writer.
	// This operation blocks until all data is flushed or context is canceled.
	Flush(ctx context.Context) error

	// Close gracefully shuts down the writer, flushing any remaining data.
	// After Close returns, no more writes are accepted.
	Close() error

	// Stats returns statistics about the writer's performance.
	Stats() Stats

	// IsClosed returns true if the writer is closed.
	IsClosed() bool

	// BufferSize returns the current number of buffered bytes.
	BufferSize() int

	// BufferCapacity returns the maximum buffer capacity.
	BufferCapacity() int
}

AsyncWriter provides asynchronous, buffered writing capabilities. It buffers write operations in memory and flushes them to the underlying writer in a background goroutine, providing non-blocking write operations.

func New

func New(w io.Writer) AsyncWriter

New creates a new AsyncWriter with default configuration.

func NewWithConfig

func NewWithConfig(w io.Writer, config Config) AsyncWriter

NewWithConfig creates a new AsyncWriter with the specified configuration.

type Config

type Config struct {
	// BufferSize is the size of the internal buffer in bytes.
	// Default: 64KB
	BufferSize int

	// FlushInterval is how often to flush the buffer automatically.
	// Set to 0 to disable automatic flushing.
	// Default: 1 second
	FlushInterval time.Duration

	// BlockOnFull determines behavior when buffer is full.
	// If true, Write operations will block until space is available.
	// If false, Write operations will return ErrBufferFull immediately.
	// Default: true
	BlockOnFull bool

	// MaxRetries is the number of times to retry failed write operations.
	// Default: 3
	MaxRetries int

	// RetryDelay is the delay between retries.
	// Default: 100ms
	RetryDelay time.Duration

	// OnError is called when write errors occur.
	OnError func(error)

	// OnFlush is called after each flush operation.
	OnFlush func(bytesWritten int, duration time.Duration)

	// OnBufferFull is called when the buffer becomes full.
	OnBufferFull func()
}

Config holds configuration options for AsyncWriter.

func DefaultConfig

func DefaultConfig() Config

DefaultConfig returns a default configuration.

type Stats

type Stats struct {
	// BytesWritten is the total number of bytes written.
	BytesWritten int64

	// WriteCount is the total number of write operations.
	WriteCount int64

	// FlushCount is the total number of flush operations.
	FlushCount int64

	// ErrorCount is the total number of errors encountered.
	ErrorCount int64

	// BufferOverflows is the number of times the buffer was full.
	BufferOverflows int64

	// AverageWriteTime is the average time per write operation.
	AverageWriteTime time.Duration

	// TotalWriteTime is the total time spent writing.
	TotalWriteTime time.Duration

	// LastWriteTime is the timestamp of the last write operation.
	LastWriteTime time.Time

	// BufferUtilization is the current buffer utilization (0.0 to 1.0).
	BufferUtilization float64
}

Stats holds statistics about async writer performance.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL