rojsonv2

package module
v2.0.0-...-a6ee939 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 10, 2026 License: Apache-2.0 Imports: 2 Imported by: 0

README

JSON Encoding Plugin

The JSON encoding plugin provides operators for marshaling and unmarshaling JSON data in reactive streams.

Installation

go get github.com/samber/ro/plugins/encoding/json

Operators

Marshal

Converts Go structs, maps, and other types to JSON byte slices.

import (
    "github.com/samber/ro"
    rojson "github.com/samber/ro/plugins/encoding/json"
)

type User struct {
    ID   int    `json:"id"`
    Name string `json:"name"`
    Age  int    `json:"age"`
}

observable := ro.Pipe1(
    ro.Just(
        User{ID: 1, Name: "Alice", Age: 30},
        User{ID: 2, Name: "Bob", Age: 25},
        User{ID: 3, Name: "Charlie", Age: 35},
    ),
    rojson.Marshal[User](),
)

subscription := observable.Subscribe(ro.PrintObserver[[]byte]())
defer subscription.Unsubscribe()

// Output:
// Next: {"id":1,"name":"Alice","age":30}
// Next: {"id":2,"name":"Bob","age":25}
// Next: {"id":3,"name":"Charlie","age":35}
// Completed
Unmarshal

Converts JSON byte slices back to Go structs, maps, and other types.

observable := ro.Pipe1(
    ro.Just(
        []byte(`{"id":1,"name":"Alice","age":30}`),
        []byte(`{"id":2,"name":"Bob","age":25}`),
        []byte(`{"id":3,"name":"Charlie","age":35}`),
    ),
    rojson.Unmarshal[User](),
)

subscription := observable.Subscribe(ro.PrintObserver[User]())
defer subscription.Unsubscribe()

// Output:
// Next: {ID:1 Name:Alice Age:30}
// Next: {ID:2 Name:Bob Age:25}
// Next: {ID:3 Name:Charlie Age:35}
// Completed

Working with Maps

You can also work with map[string]interface{} for dynamic JSON structures:

// Marshal maps to JSON
observable := ro.Pipe1(
    ro.Just(
        map[string]interface{}{"name": "Alice", "age": 30},
        map[string]interface{}{"name": "Bob", "age": 25},
        map[string]interface{}{"name": "Charlie", "age": 35},
    ),
    rojson.Marshal[map[string]interface{}](),
)

// Output:
// Next: {"age":30,"name":"Alice"}
// Next: {"age":25,"name":"Bob"}
// Next: {"age":35,"name":"Charlie"}
// Completed

Roundtrip Example

The following pipeline marshals each value to JSON and then unmarshals it back, demonstrating a round trip:

observable := ro.Pipe2(
    ro.Just(
        User{ID: 1, Name: "Alice", Age: 30},
        User{ID: 2, Name: "Bob", Age: 25},
    ),
    rojson.Marshal[User](),
    rojson.Unmarshal[User](),
)

subscription := observable.Subscribe(ro.PrintObserver[User]())
defer subscription.Unsubscribe()

// Output:
// Next: {ID:1 Name:Alice Age:30}
// Next: {ID:2 Name:Bob Age:25}
// Completed

Error Handling

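Both the Marshal and Unmarshal operators report encoding failures through the observer's error callback; the stream stops at the first error, so later items are not emitted: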

Marshal Errors
type Circular struct {
    Data interface{} `json:"data"`
}

circular := &Circular{}
circular.Data = circular // Create circular reference (requires a pointer; assigning a struct value would only copy it)

observable := ro.Pipe1(
    ro.Just(circular),
    rojson.Marshal[*Circular](),
)

subscription := observable.Subscribe(
    ro.NewObserver(
        func(data []byte) {
            // Handle successful marshaling
        },
        func(err error) {
            // Handle marshaling error (e.g., circular reference)
        },
        func() {
            // Handle completion
        },
    ),
)
defer subscription.Unsubscribe()
Unmarshal Errors
observable := ro.Pipe1(
    ro.Just(
        []byte(`{"id":1,"name":"Alice","age":30}`), // Valid JSON
        []byte(`{"id":2,"name":"Bob",`),             // Invalid JSON (truncated)
        []byte(`{"id":3,"name":"Charlie","age":35}`), // Valid JSON
    ),
    rojson.Unmarshal[User](),
)

subscription := observable.Subscribe(
    ro.NewObserver(
        func(user User) {
            // Handle successful unmarshaling
        },
        func(err error) {
            // Handle unmarshaling error
        },
        func() {
            // Handle completion
        },
    ),
)
defer subscription.Unsubscribe()

Real-world Example

Here's a practical example that processes user data through JSON transformation:

import (
    "github.com/samber/ro"
    rojson "github.com/samber/ro/plugins/encoding/json"
)

type User struct {
    ID   int    `json:"id"`
    Name string `json:"name"`
    Age  int    `json:"age"`
}

type UserResponse struct {
    UserID   int    `json:"user_id"`
    Username string `json:"username"`
    Age      int    `json:"age"`
}

// Process users through JSON transformation pipeline
pipeline := ro.Pipe4(
    // Start with user data
    ro.Just(
        User{ID: 1, Name: "Alice", Age: 30},
        User{ID: 2, Name: "Bob", Age: 25},
        User{ID: 3, Name: "Charlie", Age: 35},
    ),
    // Marshal to JSON
    rojson.Marshal[User](),
    // Transform JSON structure (could be done with string operations)
    ro.Map(func(data []byte) []byte {
        // In a real scenario, you might transform the JSON structure
        // For this example, we'll just pass through
        return data
    }),
    // Unmarshal to different struct
    rojson.Unmarshal[UserResponse](),
)

subscription := pipeline.Subscribe(ro.PrintObserver[UserResponse]())
defer subscription.Unsubscribe()

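// Output (the pass-through Map leaves the keys as "id" and "name", which match
// no UserResponse tag, so only Age is populated; rename the keys to "user_id"
// and "username" in the Map step to fill UserID and Username):
// Next: {UserID:0 Username: Age:30}
// Next: {UserID:0 Username: Age:25}
// Next: {UserID:0 Username: Age:35}
// Completed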

Performance Considerations

  • The Marshal operator uses json.Marshal, which is optimized for performance
  • The Unmarshal operator uses json.Unmarshal with proper error handling
  • Both operators are type-safe and work with Go's generic system
  • Consider using json.RawMessage for intermediate JSON processing if you need to preserve the exact JSON structure

Supported Types

The JSON plugin supports all types that the standard encoding/json package supports:

  • Structs with JSON tags
  • Maps (map[string]interface{}, map[string]string, etc.)
  • Slices and arrays
  • Basic types (int, string, bool, float64, etc.)
  • Pointers to supported types
  • Interfaces (marshaled through their dynamic value; data unmarshaled into interface{} must be type-asserted before use)

Documentation

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

func Marshal

func Marshal[T any]() func(ro.Observable[T]) ro.Observable[[]byte]

Marshal encodes values to JSON format using encoding/json v2. Play: https://go.dev/play/p/4far_YBL4I5

Example
// Marshal structs to JSON
observable := ro.Pipe1(
	ro.Just(
		User{ID: 1, Name: "Alice", Age: 30},
		User{ID: 2, Name: "Bob", Age: 25},
		User{ID: 3, Name: "Charlie", Age: 35},
	),
	Marshal[User](),
)

subscription := observable.Subscribe(
	ro.NewObserver(
		func(data []byte) {
			fmt.Printf("Next: %s\n", string(data))
		},
		func(err error) {
			fmt.Printf("Error: %s\n", err.Error())
		},
		func() {
			fmt.Printf("Completed\n")
		},
	),
)
defer subscription.Unsubscribe()
Output:

Next: {"id":1,"name":"Alice","age":30}
Next: {"id":2,"name":"Bob","age":25}
Next: {"id":3,"name":"Charlie","age":35}
Completed
Example (Map)
// Marshal maps to JSON
observable := ro.Pipe1(
	ro.Just(
		map[string]interface{}{"name": "Alice", "age": 30},
		map[string]interface{}{"name": "Bob", "age": 25},
		map[string]interface{}{"name": "Charlie", "age": 35},
	),
	Marshal[map[string]interface{}](),
)

subscription := observable.Subscribe(
	ro.NewObserver(
		func(data []byte) {
			fmt.Printf("Next: %s\n", string(data))
		},
		func(err error) {
			fmt.Printf("Error: %s\n", err.Error())
		},
		func() {
			fmt.Printf("Completed\n")
		},
	),
)
defer subscription.Unsubscribe()
Output:

Next: {"age":30,"name":"Alice"}
Next: {"age":25,"name":"Bob"}
Next: {"age":35,"name":"Charlie"}
Completed
Example (WithError)
// Marshal with potential errors (e.g., channels cannot be marshaled)
type InvalidStruct struct {
	Data chan int `json:"data"`
}

invalid := InvalidStruct{
	Data: make(chan int),
}

observable := ro.Pipe1(
	ro.Just(invalid),
	Marshal[InvalidStruct](),
)

subscription := observable.Subscribe(
	ro.NewObserver(
		func(data []byte) {
			// Handle successful marshaling
			fmt.Printf("Next: %s\n", string(data))
		},
		func(err error) {
			// Handle marshaling error
			fmt.Printf("Error: %s\n", err.Error())
		},
		func() {
			// Handle completion
			fmt.Printf("Completed\n")
		},
	),
)
defer subscription.Unsubscribe()
Output:

Error: json: unsupported type: chan int

func Unmarshal

func Unmarshal[T any]() func(ro.Observable[[]byte]) ro.Observable[T]

Unmarshal decodes JSON data to values using encoding/json v2. Play: https://go.dev/play/p/4i6ol-5OVDP

Example
// Unmarshal JSON to structs
observable := ro.Pipe1(
	ro.Just(
		[]byte(`{"id":1,"name":"Alice","age":30}`),
		[]byte(`{"id":2,"name":"Bob","age":25}`),
		[]byte(`{"id":3,"name":"Charlie","age":35}`),
	),
	Unmarshal[User](),
)

subscription := observable.Subscribe(
	ro.NewObserver(
		func(user User) {
			fmt.Printf("Next: {ID:%d Name:%s Age:%d}\n", user.ID, user.Name, user.Age)
		},
		func(err error) {
			fmt.Printf("Error: %s\n", err.Error())
		},
		func() {
			fmt.Printf("Completed\n")
		},
	),
)
defer subscription.Unsubscribe()
Output:

Next: {ID:1 Name:Alice Age:30}
Next: {ID:2 Name:Bob Age:25}
Next: {ID:3 Name:Charlie Age:35}
Completed
Example (Map)
// Unmarshal JSON to maps
observable := ro.Pipe1(
	ro.Just(
		[]byte(`{"name":"Alice","age":30}`),
		[]byte(`{"name":"Bob","age":25}`),
		[]byte(`{"name":"Charlie","age":35}`),
	),
	Unmarshal[map[string]interface{}](),
)

subscription := observable.Subscribe(
	ro.NewObserver(
		func(data map[string]interface{}) {
			fmt.Printf("Next: map[age:%v name:%s]\n", data["age"], data["name"])
		},
		func(err error) {
			fmt.Printf("Error: %s\n", err.Error())
		},
		func() {
			fmt.Printf("Completed\n")
		},
	),
)
defer subscription.Unsubscribe()
Output:

Next: map[age:30 name:Alice]
Next: map[age:25 name:Bob]
Next: map[age:35 name:Charlie]
Completed
Example (WithError)
// Unmarshal with invalid JSON
observable := ro.Pipe1(
	ro.Just(
		[]byte(`{"id":1,"name":"Alice","age":30}`),   // Valid JSON
		[]byte(`{"id":2,"name":"Bob",`),              // Invalid JSON (truncated)
		[]byte(`{"id":3,"name":"Charlie","age":35}`), // Valid JSON
	),
	Unmarshal[User](),
)

subscription := observable.Subscribe(
	ro.NewObserver(
		func(user User) {
			// Handle successful unmarshaling
			fmt.Printf("Next: {ID:%d Name:%s Age:%d}\n", user.ID, user.Name, user.Age)
		},
		func(err error) {
			// Handle unmarshaling error
			fmt.Printf("Error: %s\n", err.Error())
		},
		func() {
			// Handle completion
			fmt.Printf("Completed\n")
		},
	),
)
defer subscription.Unsubscribe()
Output:

Next: {ID:1 Name:Alice Age:30}
Error: unexpected end of JSON input

Types

This section is empty.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL