Documentation
¶
Overview ¶
Package writer provides asynchronous buffered writing for Go applications.
AsyncWriter buffers data in memory and writes to the underlying writer in the background, improving performance for I/O-bound workloads.
Quick Start ¶
file, _ := os.Create("output.txt")
w := writer.New(file)
defer w.Close()
w.WriteString("Hello, async world!")
w.Flush(context.Background())
Configuration ¶
Configure buffer size, flush intervals, and error handling:
config := writer.Config{
BufferSize: 64 * 1024, // 64KB buffer
FlushInterval: time.Second, // Auto-flush every second
BlockOnFull: true, // Block when buffer full
MaxRetries: 3, // Retry failed writes
}
w := writer.NewWithConfig(underlyingWriter, config)
Writing Data ¶
// Write bytes
w.Write([]byte("data"))
// Write string
w.WriteString("text")
// Write with context (for cancellation/timeout)
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
w.WriteContext(ctx, []byte("data"))
Backpressure Handling ¶
When the buffer fills up, you can choose to block or drop writes:
config := writer.Config{
BlockOnFull: true, // Block until space available
}
// Or
config := writer.Config{
BlockOnFull: false, // Drop writes when full
OnBufferFull: func() {
log.Println("Buffer full, dropping write")
},
}
Monitoring ¶
Track performance with callbacks:
config := writer.Config{
OnFlush: func(bytes int, duration time.Duration) {
log.Printf("Flushed %d bytes in %v", bytes, duration)
},
OnError: func(err error) {
log.Printf("Write error: %v", err)
},
}
Statistics ¶
Get current buffer stats:
stats := w.Stats()
fmt.Printf("Written: %d bytes, Flushed: %d bytes, Pending: %d bytes\n",
stats.BytesWritten, stats.BytesFlushed, stats.BytesPending)
Graceful Shutdown ¶
Always close the writer to flush remaining data:
defer w.Close() // Flushes and waits for completion // Or with timeout ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() w.FlushAndClose(ctx)
Use Cases ¶
High-Volume Logging:
logWriter := writer.New(logFile)
defer logWriter.Close()
for _, entry := range logEntries {
logWriter.WriteString(entry + "\n")
}
Network Streaming:
conn, _ := net.Dial("tcp", "server:8080")
w := writer.NewWithConfig(conn, writer.Config{
BufferSize: 8 * 1024,
FlushInterval: 100 * time.Millisecond,
})
for data := range dataChannel {
w.Write(data)
}
File Writing with Error Handling:
file, _ := os.Create("output.dat")
w := writer.NewWithConfig(file, writer.Config{
MaxRetries: 5,
OnError: func(err error) {
log.Printf("Write failed: %v", err)
},
})
w.Write(largeData)
w.Flush(context.Background())
Thread Safety ¶
AsyncWriter is safe for concurrent use from multiple goroutines.
Performance Notes ¶
- Larger buffers reduce syscalls but increase memory usage - Shorter flush intervals reduce data loss risk but increase overhead - Non-blocking mode improves write latency but may drop data - Retries help with transient errors but can hide persistent issues
See example tests for more usage patterns.
Example ¶
Example demonstrates basic async writer usage.
// Create an underlying writer (could be file, network, etc.)
var buf bytes.Buffer
// Create async writer with default configuration
writer := New(&buf)
defer func() { _ = writer.Close() }()
// Write data asynchronously - returns immediately
_ = writer.WriteString("Hello, ")
_ = writer.WriteString("async ")
_ = writer.WriteString("world!")
// Flush to ensure all data is written
_ = writer.Flush(context.Background())
fmt.Println(buf.String())
Output: Hello, async world!
Example (AutoFlush) ¶
Example_autoFlush demonstrates automatic flushing.
var buf bytes.Buffer
config := DefaultConfig()
config.FlushInterval = 50 * time.Millisecond // Auto-flush every 50ms
writer := NewWithConfig(&buf, config)
// Write data
_ = writer.WriteString("auto-flush example")
// Wait for auto-flush
time.Sleep(100 * time.Millisecond)
// Ensure all background processing is complete
_ = writer.Flush(context.Background())
_ = writer.Close()
// Data should be automatically written
fmt.Printf("Auto-flushed data: %s\n", buf.String())
Output: Auto-flushed data: auto-flush example
Example (Callbacks) ¶
Example_callbacks demonstrates using callbacks for monitoring.
var buf bytes.Buffer
config := DefaultConfig()
config.OnFlush = func(bytes int, _ time.Duration) {
fmt.Printf("Flushed %d bytes\n", bytes)
}
config.OnBufferFull = func() {
fmt.Println("Buffer is full!")
}
config.OnError = func(err error) {
fmt.Printf("Write error: %v\n", err)
}
writer := NewWithConfig(&buf, config)
defer func() { _ = writer.Close() }()
// Write some data
_ = writer.WriteString("Hello with callbacks")
// Flush to trigger callback
_ = writer.Flush(context.Background())
Output: Flushed 20 bytes
Example (Concurrent) ¶
Example_concurrent demonstrates concurrent writes from multiple goroutines.
var buf bytes.Buffer
writer := New(&buf)
defer func() { _ = writer.Close() }()
var wg sync.WaitGroup
numGoroutines := 3
writesPerGoroutine := 3
// Start multiple goroutines writing concurrently
for i := 0; i < numGoroutines; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
for j := 0; j < writesPerGoroutine; j++ {
_ = writer.WriteString(fmt.Sprintf("G%d-W%d ", id, j))
}
}(i)
}
wg.Wait()
// Flush all writes
_ = writer.Flush(context.Background())
result := buf.String()
// Count the writes (should be 9 total)
writeCount := len(strings.Fields(result))
fmt.Printf("Total writes: %d\n", writeCount)
// Check stats
stats := writer.Stats()
fmt.Printf("Recorded writes: %d\n", stats.WriteCount)
Output: Total writes: 9 Recorded writes: 9
Example (Configuration) ¶
Example_configuration demonstrates various configuration options.
var buf bytes.Buffer
// Custom configuration
config := Config{
BufferSize: 1024, // 1KB buffer
FlushInterval: 100 * time.Millisecond, // Auto-flush every 100ms
BlockOnFull: true, // Block when buffer is full
MaxRetries: 5, // Retry failed writes
RetryDelay: 50 * time.Millisecond, // Delay between retries
}
writer := NewWithConfig(&buf, config)
// Write data
_ = writer.WriteString("Configured writer example")
// Let auto-flush handle the writing
time.Sleep(150 * time.Millisecond)
// Ensure all background processing is complete
_ = writer.Flush(context.Background())
_ = writer.Close()
fmt.Println(buf.String())
Output: Configured writer example
Example (ContextCancellation) ¶
Example_contextCancellation demonstrates context-aware operations.
var buf bytes.Buffer
writer := New(&buf)
defer func() { _ = writer.Close() }()
// Write some data
_ = writer.WriteString("context test")
// Create a context that will timeout quickly
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Nanosecond)
defer cancel()
// This flush will likely be canceled
err := writer.Flush(ctx)
if err == context.DeadlineExceeded {
fmt.Println("Flush was canceled due to timeout")
} else {
fmt.Println("Flush completed")
}
// Try again with proper context
err = writer.Flush(context.Background())
if err == nil {
fmt.Println("Second flush succeeded")
fmt.Printf("Data: %s\n", buf.String())
}
Output: Flush was canceled due to timeout Second flush succeeded Data: context test
Example (ErrorHandling) ¶
Example_errorHandling demonstrates error handling and retries.
// Create a writer that will fail initially
failingWriter := &failingWriter{failCount: 2}
config := DefaultConfig()
config.MaxRetries = 3
config.RetryDelay = 1 * time.Millisecond
writer := NewWithConfig(failingWriter, config)
defer func() { _ = writer.Close() }()
// Write data (will succeed after retries)
_ = writer.WriteString("persistent data")
err := writer.Flush(context.Background())
if err != nil {
fmt.Printf("Final error: %v\n", err)
} else {
fmt.Println("Write succeeded after retries")
fmt.Printf("Final data: %s\n", failingWriter.buf.String())
}
Output: Write succeeded after retries Final data: persistent data
Example (FileWriting) ¶
Example_fileWriting demonstrates writing to a file asynchronously.
// Create a temporary file
file, err := os.CreateTemp("", "async_writer_example_*.txt")
if err != nil {
log.Fatal(err)
}
defer func() { _ = os.Remove(file.Name()) }() // Clean up
// Create async writer for the file
writer := New(file)
defer func() { _ = writer.Close() }()
// Write data asynchronously
for i := 0; i < 3; i++ {
_ = writer.WriteString(fmt.Sprintf("Line %d\n", i+1))
}
// Flush to ensure data is written
_ = writer.Flush(context.Background())
// Close the file
_ = file.Close()
// Read back and display
content, err := os.ReadFile(file.Name())
if err != nil {
log.Fatal(err)
}
fmt.Print(string(content))
Output: Line 1 Line 2 Line 3
Example (NonBlocking) ¶
Example_nonBlocking demonstrates non-blocking behavior.
var buf bytes.Buffer
config := DefaultConfig()
config.BlockOnFull = false // Don't block on full buffer
config.BufferSize = 20 // Small buffer for demo
writer := NewWithConfig(&buf, config)
// Write data that fits in buffer
err := writer.WriteString("fits")
if err != nil {
fmt.Printf("Error: %v\n", err)
} else {
fmt.Println("Small write succeeded")
}
// Try to write large data that exceeds buffer
largeData := strings.Repeat("x", 30)
err = writer.WriteString(largeData)
if err == ErrBufferFull {
fmt.Println("Large write failed: buffer full")
}
// Flush the successful write
_ = writer.Flush(context.Background())
_ = writer.Close()
fmt.Printf("Buffer contains: %s\n", buf.String())
Output: Small write succeeded Large write failed: buffer full Buffer contains: fits
Example (Statistics) ¶
Example_statistics demonstrates monitoring writer performance.
var buf bytes.Buffer
writer := New(&buf)
defer func() { _ = writer.Close() }()
// Perform some writes
for i := 0; i < 5; i++ {
_ = writer.WriteString(fmt.Sprintf("Write %d\n", i))
}
// Flush and get stats
_ = writer.Flush(context.Background())
stats := writer.Stats()
fmt.Printf("Writes: %d\n", stats.WriteCount)
fmt.Printf("Bytes: %d\n", stats.BytesWritten)
fmt.Printf("Flushes: %d\n", stats.FlushCount)
fmt.Printf("Errors: %d\n", stats.ErrorCount)
fmt.Printf("Buffer utilization: %.1f%%\n", stats.BufferUtilization*100)
Output: Writes: 5 Bytes: 40 Flushes: 1 Errors: 0 Buffer utilization: 0.0%
Index ¶
Examples ¶
Constants ¶
This section is empty.
Variables ¶
var ErrBufferFull = errors.New("buffer is full")
ErrBufferFull is returned when the internal buffer is full and cannot accept more data.
var ErrWriterClosed = errors.New("writer is closed")
ErrWriterClosed is returned when attempting to write to a closed writer.
Functions ¶
This section is empty.
Types ¶
type AsyncWriter ¶
type AsyncWriter interface {
// Write writes data asynchronously. Returns immediately without blocking.
// If the buffer is full and blocking is disabled, returns ErrBufferFull.
Write(data []byte) error
// WriteString writes a string asynchronously.
WriteString(s string) error
// WriteContext writes data with context support for cancellation.
WriteContext(ctx context.Context, data []byte) error
// Flush forces all buffered data to be written to the underlying writer.
// This operation blocks until all data is flushed or context is canceled.
Flush(ctx context.Context) error
// Close gracefully shuts down the writer, flushing any remaining data.
// After Close returns, no more writes are accepted.
Close() error
// Stats returns statistics about the writer's performance.
Stats() Stats
// IsClosed returns true if the writer is closed.
IsClosed() bool
// BufferSize returns the current number of buffered bytes.
BufferSize() int
// BufferCapacity returns the maximum buffer capacity.
BufferCapacity() int
}
AsyncWriter provides asynchronous, buffered writing capabilities. It buffers write operations in memory and flushes them to the underlying writer in a background goroutine, providing non-blocking write operations.
func New ¶
func New(w io.Writer) AsyncWriter
New creates a new AsyncWriter with default configuration.
func NewWithConfig ¶
func NewWithConfig(w io.Writer, config Config) AsyncWriter
NewWithConfig creates a new AsyncWriter with the specified configuration.
type Config ¶
type Config struct {
// BufferSize is the size of the internal buffer in bytes.
// Default: 64KB
BufferSize int
// FlushInterval is how often to flush the buffer automatically.
// Set to 0 to disable automatic flushing.
// Default: 1 second
FlushInterval time.Duration
// BlockOnFull determines behavior when buffer is full.
// If true, Write operations will block until space is available.
// If false, Write operations will return ErrBufferFull immediately.
// Default: true
BlockOnFull bool
// MaxRetries is the number of times to retry failed write operations.
// Default: 3
MaxRetries int
// RetryDelay is the delay between retries.
// Default: 100ms
RetryDelay time.Duration
// OnError is called when write errors occur.
OnError func(error)
// OnFlush is called after each flush operation.
OnFlush func(bytesWritten int, duration time.Duration)
// OnBufferFull is called when the buffer becomes full.
OnBufferFull func()
}
Config holds configuration options for AsyncWriter.
type Stats ¶
type Stats struct {
// BytesWritten is the total number of bytes written.
BytesWritten int64
// WriteCount is the total number of write operations.
WriteCount int64
// FlushCount is the total number of flush operations.
FlushCount int64
// ErrorCount is the total number of errors encountered.
ErrorCount int64
// BufferOverflows is the number of times the buffer was full.
BufferOverflows int64
// AverageWriteTime is the average time per write operation.
AverageWriteTime time.Duration
// TotalWriteTime is the total time spent writing.
TotalWriteTime time.Duration
// LastWriteTime is the timestamp of the last write operation.
LastWriteTime time.Time
// BufferUtilization is the current buffer utilization (0.0 to 1.0).
BufferUtilization float64
}
Stats holds statistics about async writer performance.