Documentation
¶
Index ¶
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func Marshal ¶
func Marshal[T any]() func(ro.Observable[T]) ro.Observable[[]byte]
Marshal encodes values to JSON format using encoding/json v2. Play: https://go.dev/play/p/4far_YBL4I5
Example ¶
// Marshal structs to JSON
observable := ro.Pipe1(
ro.Just(
User{ID: 1, Name: "Alice", Age: 30},
User{ID: 2, Name: "Bob", Age: 25},
User{ID: 3, Name: "Charlie", Age: 35},
),
Marshal[User](),
)
subscription := observable.Subscribe(
ro.NewObserver(
func(data []byte) {
fmt.Printf("Next: %s\n", string(data))
},
func(err error) {
fmt.Printf("Error: %s\n", err.Error())
},
func() {
fmt.Printf("Completed\n")
},
),
)
defer subscription.Unsubscribe()
Output: Next: {"id":1,"name":"Alice","age":30} Next: {"id":2,"name":"Bob","age":25} Next: {"id":3,"name":"Charlie","age":35} Completed
Example (Map) ¶
// Marshal maps to JSON
observable := ro.Pipe1(
ro.Just(
map[string]interface{}{"name": "Alice", "age": 30},
map[string]interface{}{"name": "Bob", "age": 25},
map[string]interface{}{"name": "Charlie", "age": 35},
),
Marshal[map[string]interface{}](),
)
subscription := observable.Subscribe(
ro.NewObserver(
func(data []byte) {
fmt.Printf("Next: %s\n", string(data))
},
func(err error) {
fmt.Printf("Error: %s\n", err.Error())
},
func() {
fmt.Printf("Completed\n")
},
),
)
defer subscription.Unsubscribe()
Output: Next: {"age":30,"name":"Alice"} Next: {"age":25,"name":"Bob"} Next: {"age":35,"name":"Charlie"} Completed
Example (WithError) ¶
// Marshal with potential errors (e.g., channels cannot be marshaled)
type InvalidStruct struct {
Data chan int `json:"data"`
}
invalid := InvalidStruct{
Data: make(chan int),
}
observable := ro.Pipe1(
ro.Just(invalid),
Marshal[InvalidStruct](),
)
subscription := observable.Subscribe(
ro.NewObserver(
func(data []byte) {
// Handle successful marshaling
fmt.Printf("Next: %s\n", string(data))
},
func(err error) {
// Handle marshaling error
fmt.Printf("Error: %s\n", err.Error())
},
func() {
// Handle completion
fmt.Printf("Completed\n")
},
),
)
defer subscription.Unsubscribe()
Output: Error: json: unsupported type: chan int
func Unmarshal ¶
func Unmarshal[T any]() func(ro.Observable[[]byte]) ro.Observable[T]
Unmarshal decodes JSON data to values using encoding/json v2. Play: https://go.dev/play/p/4i6ol-5OVDP
Example ¶
// Unmarshal JSON to structs
observable := ro.Pipe1(
ro.Just(
[]byte(`{"id":1,"name":"Alice","age":30}`),
[]byte(`{"id":2,"name":"Bob","age":25}`),
[]byte(`{"id":3,"name":"Charlie","age":35}`),
),
Unmarshal[User](),
)
subscription := observable.Subscribe(
ro.NewObserver(
func(user User) {
fmt.Printf("Next: {ID:%d Name:%s Age:%d}\n", user.ID, user.Name, user.Age)
},
func(err error) {
fmt.Printf("Error: %s\n", err.Error())
},
func() {
fmt.Printf("Completed\n")
},
),
)
defer subscription.Unsubscribe()
Output: Next: {ID:1 Name:Alice Age:30} Next: {ID:2 Name:Bob Age:25} Next: {ID:3 Name:Charlie Age:35} Completed
Example (Map) ¶
// Unmarshal JSON to maps
observable := ro.Pipe1(
ro.Just(
[]byte(`{"name":"Alice","age":30}`),
[]byte(`{"name":"Bob","age":25}`),
[]byte(`{"name":"Charlie","age":35}`),
),
Unmarshal[map[string]interface{}](),
)
subscription := observable.Subscribe(
ro.NewObserver(
func(data map[string]interface{}) {
fmt.Printf("Next: map[age:%v name:%s]\n", data["age"], data["name"])
},
func(err error) {
fmt.Printf("Error: %s\n", err.Error())
},
func() {
fmt.Printf("Completed\n")
},
),
)
defer subscription.Unsubscribe()
Output: Next: map[age:30 name:Alice] Next: map[age:25 name:Bob] Next: map[age:35 name:Charlie] Completed
Example (WithError) ¶
// Unmarshal with invalid JSON
observable := ro.Pipe1(
ro.Just(
[]byte(`{"id":1,"name":"Alice","age":30}`), // Valid JSON
[]byte(`{"id":2,"name":"Bob",`), // Invalid JSON (truncated)
[]byte(`{"id":3,"name":"Charlie","age":35}`), // Valid JSON
),
Unmarshal[User](),
)
subscription := observable.Subscribe(
ro.NewObserver(
func(user User) {
// Handle successful unmarshaling
fmt.Printf("Next: {ID:%d Name:%s Age:%d}\n", user.ID, user.Name, user.Age)
},
func(err error) {
// Handle unmarshaling error
fmt.Printf("Error: %s\n", err.Error())
},
func() {
// Handle completion
fmt.Printf("Completed\n")
},
),
)
defer subscription.Unsubscribe()
Output: Next: {ID:1 Name:Alice Age:30} Error: unexpected end of JSON input
Types ¶
This section is empty.
Click to show internal directories.
Click to hide internal directories.