rostdio

package module
v0.0.0-...-a6ee939 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 10, 2026 License: Apache-2.0 Imports: 5 Imported by: 0

README

stdio Plugin

The stdio plugin provides operators for reading from and writing to various input/output streams.

Installation

go get github.com/samber/ro/plugins/stdio

Operators

NewIOReader

Creates an observable that reads data from an io.Reader.

import (
    "strings"
    "github.com/samber/ro"
    rostdio "github.com/samber/ro/plugins/stdio"
)

// Read data from a string reader
data := "Hello, World! This is a test."
reader := strings.NewReader(data)
observable := rostdio.NewIOReader(reader)

subscription := observable.Subscribe(ro.PrintObserver[[]byte]())
defer subscription.Unsubscribe()

// Output:
// Next: [72 101 108 108 111 44 32 87 111 114 108 100 33 32 84 104 105 115 32 105 115 32 97 32 116 101 115 116 46]
// Completed
NewIOReaderLine

Creates an observable that reads lines from an io.Reader.

// Read lines from a string reader
data := "Line 1\nLine 2\nLine 3\n"
reader := strings.NewReader(data)
observable := rostdio.NewIOReaderLine(reader)

subscription := observable.Subscribe(ro.PrintObserver[[]byte]())
defer subscription.Unsubscribe()

// Output:
// Next: [76 105 110 101 32 49]
// Next: [76 105 110 101 32 50]
// Next: [76 105 110 101 32 51]
// Completed
NewIOWriter

Creates an operator that writes data to an io.Writer and returns the number of bytes written.

import (
    "bytes"
    "github.com/samber/ro"
    rostdio "github.com/samber/ro/plugins/stdio"
)

// Write data to a buffer
var buf bytes.Buffer
writer := &buf

data := ro.Just(
    []byte("Hello, "),
    []byte("World!"),
    []byte(" This is a test."),
)

observable := ro.Pipe1(
    data,
    rostdio.NewIOWriter(writer),
)

subscription := observable.Subscribe(ro.PrintObserver[int]())
defer subscription.Unsubscribe()

// Output:
// Next: 32
// Completed
NewStdReader

Creates an observable that reads from standard input.

observable := rostdio.NewStdReader()

subscription := observable.Subscribe(
    ro.NewObserver(
        func(value []byte) {
            // Handle data from stdin
        },
        func(err error) {
            // Handle error
        },
        func() {
            // Handle completion
        },
    ),
)
defer subscription.Unsubscribe()
NewStdReaderLine

Creates an observable that reads lines from standard input.

observable := rostdio.NewStdReaderLine()

subscription := observable.Subscribe(
    ro.NewObserver(
        func(value []byte) {
            // Handle line from stdin
        },
        func(err error) {
            // Handle error
        },
        func() {
            // Handle completion
        },
    ),
)
defer subscription.Unsubscribe()
NewPrompt

Creates an observable that prompts the user for input and reads their response.

observable := rostdio.NewPrompt("Enter your name: ")

subscription := observable.Subscribe(
    ro.NewObserver(
        func(value []byte) {
            // Handle user input
            fmt.Printf("You entered: %s\n", string(value))
        },
        func(err error) {
            // Handle error
        },
        func() {
            // Handle completion
        },
    ),
)
defer subscription.Unsubscribe()
NewStdWriter

Creates an operator that writes data to standard output.

data := ro.Just(
    []byte("Hello, "),
    []byte("World!"),
    []byte(" This is a test."),
)

observable := ro.Pipe1(
    data,
    rostdio.NewStdWriter(),
)

subscription := observable.Subscribe(ro.PrintObserver[int]())
defer subscription.Unsubscribe()

// Output:
// Next: 32
// Completed

Supported Reader Types

The plugin supports various io.Reader implementations:

String Reader
import "strings"

reader := strings.NewReader("Hello, World!")
observable := rostdio.NewIOReader(reader)
File Reader
import "os"

file, err := os.Open("data.txt")
if err != nil {
    // Handle error
}
defer file.Close()

observable := rostdio.NewIOReader(file)
Buffer Reader
import "bytes"

data := []byte("Hello, World!")
reader := bytes.NewReader(data)
observable := rostdio.NewIOReader(reader)

Supported Writer Types

The plugin supports various io.Writer implementations:

Buffer Writer
import "bytes"

var buf bytes.Buffer
writer := &buf

data := ro.Just([]byte("Hello, World!"))
observable := ro.Pipe1(
    data,
    rostdio.NewIOWriter(writer),
)
File Writer
import "os"

file, err := os.Create("output.txt")
if err != nil {
    // Handle error
}
defer file.Close()

data := ro.Just([]byte("Hello, World!"))
observable := ro.Pipe1(
    data,
    rostdio.NewIOWriter(file),
)

Error Handling

The plugin handles IO errors gracefully:

Reading Errors
reader := strings.NewReader("Hello, World!")
observable := rostdio.NewIOReader(reader)

subscription := observable.Subscribe(
    ro.NewObserver(
        func(value []byte) {
            // Handle successful read
        },
        func(err error) {
            // Handle read error
            // This could be due to:
            // - Network errors
            // - File system errors
            // - Permission errors
        },
        func() {
            // Handle completion
        },
    ),
)
defer subscription.Unsubscribe()
Writing Errors
var buf bytes.Buffer
writer := &buf

data := ro.Just([]byte("Hello, World!"))
observable := ro.Pipe1(
    data,
    rostdio.NewIOWriter(writer),
)

subscription := observable.Subscribe(
    ro.NewObserver(
        func(value int) {
            // Handle successful write count
        },
        func(err error) {
            // Handle write error
            // This could be due to:
            // - Disk full
            // - Permission errors
            // - Network errors
        },
        func() {
            // Handle completion
        },
    ),
)
defer subscription.Unsubscribe()

Real-world Example

Here's a practical example that processes a file line by line:

import (
    "bytes"
    "os"
    "strings"
    "github.com/samber/ro"
    rostdio "github.com/samber/ro/plugins/stdio"
)

// Process a file line by line
pipeline := ro.Pipe2(
    // Read lines from file
    rostdio.NewIOReaderLine(strings.NewReader(`Line 1: Hello
Line 2: World
Line 3: Test`)),
    // Transform lines
    ro.Map(func(line []byte) []byte {
        // Convert to uppercase
        return bytes.ToUpper(line)
    }),
)

subscription := pipeline.Subscribe(
    ro.NewObserver(
        func(line []byte) {
            // Process transformed line
        },
        func(err error) {
            // Handle error
        },
        func() {
            // Handle completion
        },
    ),
)
defer subscription.Unsubscribe()

Performance Considerations

  • The plugin uses Go's standard io package for all operations
  • Reading is done in chunks of 1024 bytes by default
  • Line reading uses buffered I/O for efficiency
  • The plugin automatically handles resource cleanup
  • Consider the size of your data when reading/writing large files
  • Use appropriate buffer sizes for your use case
  • The plugin supports streaming for large files
  • Context cancellation properly stops IO operations

Documentation

Index

Examples

Constants

View Source
const IOReaderBufferSize = 1024

Variables

This section is empty.

Functions

func NewIOReader

func NewIOReader(reader io.Reader) ro.Observable[[]byte]

NewIOReader creates an observable that reads bytes from an io.Reader. Play: https://go.dev/play/p/b75Poy3EVYn

Example
// Read data from a string reader
data := "Hello, World! This is a test."
reader := strings.NewReader(data)
observable := NewIOReader(reader)

subscription := observable.Subscribe(ro.PrintObserver[[]byte]())
defer subscription.Unsubscribe()
Output:

Next: [72 101 108 108 111 44 32 87 111 114 108 100 33 32 84 104 105 115 32 105 115 32 97 32 116 101 115 116 46]
Completed
Example (WithError)
// Read data with potential errors
reader := strings.NewReader("Hello, World!")
observable := NewIOReader(reader)

subscription := observable.Subscribe(ro.PrintObserver[[]byte]())
defer subscription.Unsubscribe()
Output:

Next: [72 101 108 108 111 44 32 87 111 114 108 100 33]
Completed

func NewIOReaderLine

func NewIOReaderLine(reader io.Reader) ro.Observable[[]byte]

NewIOReaderLine creates an observable that reads lines from an io.Reader. Play: https://go.dev/play/p/oMv2jYVSLqd

Example
// Read lines from a string reader
data := "Line 1\nLine 2\nLine 3\n"
reader := strings.NewReader(data)
observable := NewIOReaderLine(reader)

subscription := observable.Subscribe(ro.PrintObserver[[]byte]())
defer subscription.Unsubscribe()
Output:

Next: [76 105 110 101 32 49]
Next: [76 105 110 101 32 50]
Next: [76 105 110 101 32 51]
Completed
Example (WithLargeFile)
// Read lines from a large text
data := `Line 1: This is the first line
Line 2: This is the second line
Line 3: This is the third line
Line 4: This is the fourth line
Line 5: This is the fifth line`

reader := strings.NewReader(data)
observable := NewIOReaderLine(reader)

subscription := observable.Subscribe(ro.PrintObserver[[]byte]())
defer subscription.Unsubscribe()
Output:

Next: [76 105 110 101 32 49 58 32 84 104 105 115 32 105 115 32 116 104 101 32 102 105 114 115 116 32 108 105 110 101]
Next: [76 105 110 101 32 50 58 32 84 104 105 115 32 105 115 32 116 104 101 32 115 101 99 111 110 100 32 108 105 110 101]
Next: [76 105 110 101 32 51 58 32 84 104 105 115 32 105 115 32 116 104 101 32 116 104 105 114 100 32 108 105 110 101]
Next: [76 105 110 101 32 52 58 32 84 104 105 115 32 105 115 32 116 104 101 32 102 111 117 114 116 104 32 108 105 110 101]
Next: [76 105 110 101 32 53 58 32 84 104 105 115 32 105 115 32 116 104 101 32 102 105 102 116 104 32 108 105 110 101]
Completed

func NewIOWriter

func NewIOWriter(writer io.Writer) func(ro.Observable[[]byte]) ro.Observable[int]

NewIOWriter creates a sink that writes byte slices to an io.Writer and emits the total bytes written. Play: https://go.dev/play/p/XoLdEcsmKxU

Example
// Write data to a buffer
var buf bytes.Buffer
writer := &buf

data := ro.Just(
	[]byte("Hello, "),
	[]byte("World!"),
)

observable := ro.Pipe1(
	data,
	NewIOWriter(writer),
)

subscription := observable.Subscribe(ro.PrintObserver[int]())
defer subscription.Unsubscribe()
Output:

Next: 13
Completed
Example (WithError)
// Write data with potential errors
var buf bytes.Buffer
writer := &buf

data := ro.Just(
	[]byte("Hello, "),
	[]byte("World!"),
)

observable := ro.Pipe1(
	data,
	NewIOWriter(writer),
)

subscription := observable.Subscribe(ro.PrintObserver[int]())
defer subscription.Unsubscribe()
Output:

Next: 13
Completed

func NewPrompt

func NewPrompt(prompt string) ro.Observable[[]byte]

NewPrompt creates an observable that reads user input after displaying a prompt.

func NewStdReader

func NewStdReader() ro.Observable[[]byte]

NewStdReader creates an observable that reads bytes from standard input.

Example
// Read from standard input
// Simulate stdin input by temporarily redirecting stdin
originalStdin := os.Stdin
defer func() { os.Stdin = originalStdin }()

// Create a pipe to simulate stdin
r, w, _ := os.Pipe()
os.Stdin = r

// Write test data to stdin
go func() {
	w.WriteString("Hello from stdin!")
	w.Close()
}()

observable := NewStdReader()

subscription := observable.Subscribe(ro.PrintObserver[[]byte]())
defer subscription.Unsubscribe()
Output:

Next: [72 101 108 108 111 32 102 114 111 109 32 115 116 100 105 110 33]
Completed

func NewStdReaderLine

func NewStdReaderLine() ro.Observable[[]byte]

NewStdReaderLine creates an observable that reads lines from standard input.

Example
// Read lines from standard input
// Simulate stdin input by temporarily redirecting stdin
originalStdin := os.Stdin
defer func() { os.Stdin = originalStdin }()

// Create a pipe to simulate stdin
r, w, _ := os.Pipe()
os.Stdin = r

// Write test data to stdin
go func() {
	w.WriteString("Line 1\nLine 2\nLine 3\n")
	w.Close()
}()

observable := NewStdReaderLine()

subscription := observable.Subscribe(ro.PrintObserver[[]byte]())
defer subscription.Unsubscribe()
Output:

Next: [76 105 110 101 32 49]
Next: [76 105 110 101 32 50]
Next: [76 105 110 101 32 51]
Completed

func NewStdWriter

func NewStdWriter() func(ro.Observable[[]byte]) ro.Observable[int]

NewStdWriter creates a sink that writes byte slices to standard output and emits the total bytes written. Play: https://go.dev/play/p/9GjhDJIAs7z

Example
// Write data to standard output
// For this example, we'll use a buffer to simulate stdout
var buf bytes.Buffer

data := ro.Just(
	[]byte("Hello, "),
	[]byte("World!"),
)

observable := ro.Pipe1(
	data,
	NewIOWriter(&buf),
)

subscription := observable.Subscribe(ro.PrintObserver[int]())
defer subscription.Unsubscribe()
Output:

Next: 13
Completed

Types

This section is empty.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL