rohttpclient

package module
v0.0.0-...-a6ee939 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 10, 2026 License: Apache-2.0 Imports: 4 Imported by: 0

README

HTTP Plugin

The HTTP plugin provides operators for making HTTP requests in reactive streams.

Installation

go get github.com/samber/ro/plugins/http

Operators

HTTPRequest

Sends HTTP requests and returns responses as an observable stream.

import (
    "net/http"
    "github.com/samber/ro"
    rohttp "github.com/samber/ro/plugins/http"
)

// Create HTTP request
req, _ := http.NewRequest("GET", "https://api.example.com/users", nil)

// Send HTTP request
observable := rohttp.HTTPRequest(req, nil)

subscription := observable.Subscribe(ro.PrintObserver[*http.Response]())
defer subscription.Unsubscribe()

// Output:
// Next: &http.Response{Status: "200 OK", StatusCode: 200, ...}
// Completed
HTTPRequestJSON

Sends HTTP requests and automatically parses JSON responses into the specified type. This is a convenience operator that combines HTTPRequest with JSON parsing.

import (
    "net/http"
    "github.com/samber/ro"
    rohttp "github.com/samber/ro/plugins/http"
)

type User struct {
    ID   int    `json:"id"`
    Name string `json:"name"`
    Email string `json:"email"`
}

// Create HTTP request
req, _ := http.NewRequest("GET", "https://api.example.com/users/1", nil)

// Send HTTP request and parse JSON response
observable := rohttp.HTTPRequestJSON[User](req, nil)

subscription := observable.Subscribe(ro.PrintObserver[User]())
defer subscription.Unsubscribe()

// Output:
// Next: {ID:1 Name:John Doe Email:john@example.com}
// Completed

Basic Usage

Simple GET Request
import (
    "net/http"
    "github.com/samber/ro"
    rohttp "github.com/samber/ro/plugins/http"
)

// Create a simple GET request
req, _ := http.NewRequest("GET", "https://jsonplaceholder.typicode.com/posts/1", nil)

// Send the request
observable := rohttp.HTTPRequest(req, nil)

subscription := observable.Subscribe(ro.PrintObserver[*http.Response]())
defer subscription.Unsubscribe()
Request with Headers
// Create request with custom headers
req, _ := http.NewRequest("GET", "https://api.example.com/data", nil)
req.Header.Set("User-Agent", "ro-http-client/1.0")
req.Header.Set("Accept", "application/json")
req.Header.Set("Authorization", "Bearer your-token")

observable := rohttp.HTTPRequest(req, nil)

subscription := observable.Subscribe(ro.PrintObserver[*http.Response]())
defer subscription.Unsubscribe()
Custom HTTP Client
import (
    "net/http"
    "time"
    "github.com/samber/ro"
    rohttp "github.com/samber/ro/plugins/http"
)

// Create custom HTTP client with timeout
client := &http.Client{
    Timeout: 10 * time.Second,
}

req, _ := http.NewRequest("GET", "https://api.example.com/data", nil)
observable := rohttp.HTTPRequest(req, client)

subscription := observable.Subscribe(ro.PrintObserver[*http.Response]())
defer subscription.Unsubscribe()

Advanced Usage

Processing Responses
import (
    "fmt"
    "io"
    "net/http"
    "github.com/samber/ro"
    rohttp "github.com/samber/ro/plugins/http"
)

// Send request and process response
observable := ro.Pipe1(
    rohttp.HTTPRequest(req, nil),
    ro.Map(func(resp *http.Response) string {
        // Read response body
        body, _ := io.ReadAll(resp.Body)
        defer resp.Body.Close()
        
        return fmt.Sprintf("Status: %s, Body: %s", resp.Status, string(body))
    }),
)

subscription := observable.Subscribe(ro.PrintObserver[string]())
defer subscription.Unsubscribe()

// Output:
// Next: Status: 200 OK, Body: {"id": 1, "title": "..."}
// Completed
Multiple Concurrent Requests
import (
    "net/http"
    "github.com/samber/ro"
    rohttp "github.com/samber/ro/plugins/http"
)

// Create multiple requests
urls := []string{
    "https://api.example.com/users/1",
    "https://api.example.com/users/2",
    "https://api.example.com/users/3",
}

// Convert URLs to requests
requests := make([]*http.Request, len(urls))
for i, url := range urls {
    req, _ := http.NewRequest("GET", url, nil)
    requests[i] = req
}

// Process multiple requests concurrently
observable := ro.Pipe1(
    ro.FromSlice(requests),
    ro.MergeMap(func(req *http.Request) ro.Observable[*http.Response] {
        return rohttp.HTTPRequest(req, nil)
    }),
)

subscription := observable.Subscribe(ro.PrintObserver[*http.Response]())
defer subscription.Unsubscribe()

// Output:
// Next: &http.Response{Status: "200 OK", StatusCode: 200, ...}
// Next: &http.Response{Status: "200 OK", StatusCode: 200, ...}
// Next: &http.Response{Status: "200 OK", StatusCode: 200, ...}
// Completed
Error Handling
import (
    "net/http"
    "github.com/samber/ro"
    rohttp "github.com/samber/ro/plugins/http"
)

req, _ := http.NewRequest("GET", "https://invalid-url-that-will-fail.com", nil)

observable := rohttp.HTTPRequest(req, nil)

subscription := observable.Subscribe(
    ro.NewObserver(
        func(resp *http.Response) {
            // Handle successful response
            // Note: HTTP status >= 400 is not considered an error by this operator
            if resp.StatusCode >= 400 {
                // Handle HTTP error status manually
            }
        },
        func(err error) {
            // Handle network errors, timeouts, etc.
        },
        func() {
            // Handle completion
        },
    ),
)
defer subscription.Unsubscribe()
Context and Timeout
import (
    "context"
    "net/http"
    "time"
    "github.com/samber/ro"
    rohttp "github.com/samber/ro/plugins/http"
)

// Create request with timeout context
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

req, _ := http.NewRequest("GET", "https://api.example.com/slow-endpoint", nil)
req = req.WithContext(ctx)

observable := rohttp.HTTPRequest(req, nil)

subscription := observable.Subscribe(
    ro.NewObserver(
        func(resp *http.Response) {
            // Handle successful response
        },
        func(err error) {
            // Handle timeout or other errors
        },
        func() {
            // Handle completion
        },
    ),
)
defer subscription.Unsubscribe()

Real-world Example

Here's a practical example that fetches user data from an API and processes it:

import (
    "context"
    "encoding/json"
    "fmt"
    "io"
    "net/http"
    "time"
    "github.com/samber/ro"
    rohttp "github.com/samber/ro/plugins/http"
    rojson "github.com/samber/ro/plugins/encoding/json"
)

type User struct {
    ID   int    `json:"id"`
    Name string `json:"name"`
    Email string `json:"email"`
}

// Create a pipeline that fetches and processes user data
pipeline := ro.Pipe4(
    // Create requests for multiple users
    ro.Just(1, 2, 3),
    ro.Map(func(id int) *http.Request {
        req, _ := http.NewRequest("GET", fmt.Sprintf("https://jsonplaceholder.typicode.com/users/%d", id), nil)
        return req
    }),
    // Send HTTP requests
    ro.MergeMap(func(req *http.Request) ro.Observable[*http.Response] {
        return rohttp.HTTPRequest(req, nil)
    }),
    // Process responses
    ro.Map(func(resp *http.Response) User {
        body, _ := io.ReadAll(resp.Body)
        defer resp.Body.Close()
        
        var user User
        json.Unmarshal(body, &user)
        return user
    }),
)

// Add timeout context
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

subscription := pipeline.SubscribeWithContext(ctx, ro.PrintObserver[User]())
defer subscription.Unsubscribe()

// Output:
// Next: {ID:1 Name:Leanne Graham Email:Sincere@april.biz}
// Next: {ID:2 Name:Ervin Howell Email:Shanna@melissa.tv}
// Next: {ID:3 Name:Clementine Bauch Email:Nathan@yesenia.net}
// Completed

HTTPRequestJSON Usage Examples

Simple JSON String
import (
    "net/http"
    "github.com/samber/ro"
    rohttp "github.com/samber/ro/plugins/http"
)

req, _ := http.NewRequest("GET", "https://api.example.com/message", nil)

observable := rohttp.HTTPRequestJSON[string](req, nil)

subscription := observable.Subscribe(ro.PrintObserver[string]())
defer subscription.Unsubscribe()
JSON Array
req, _ := http.NewRequest("GET", "https://api.example.com/tags", nil)

observable := rohttp.HTTPRequestJSON[[]string](req, nil)

subscription := observable.Subscribe(ro.PrintObserver[[]string]())
defer subscription.Unsubscribe()
Complex JSON with Error Handling
type APIResponse struct {
    Success bool   `json:"success"`
    Data    []User `json:"data"`
    Error   string `json:"error,omitempty"`
}

req, _ := http.NewRequest("GET", "https://api.example.com/users", nil)

observable := rohttp.HTTPRequestJSON[APIResponse](req, nil)

subscription := observable.Subscribe(
    ro.NewObserver(
        func(response APIResponse) {
            if response.Success {
                fmt.Printf("Got %d users\n", len(response.Data))
            } else {
                fmt.Printf("API Error: %s\n", response.Error)
            }
        },
        func(err error) {
            fmt.Printf("Network or JSON error: %s\n", err)
        },
        func() {
            fmt.Println("Request completed")
        },
    ),
)
defer subscription.Unsubscribe()
HTTPRequestJSON with Processing Pipeline
type User struct {
    ID   int    `json:"id"`
    Name string `json:"name"`
}

pipeline := ro.Pipe2(
    // Fetch multiple users concurrently
    ro.FromSlice([]int{1, 2, 3}),
    ro.MergeMap(func(userID int) ro.Observable[User] {
        req, _ := http.NewRequest("GET", fmt.Sprintf("https://jsonplaceholder.typicode.com/users/%d", userID), nil)
        return rohttp.HTTPRequestJSON[User](req, nil)
    }),
)

subscription := pipeline.Subscribe(ro.PrintObserver[User]())
defer subscription.Unsubscribe()

Important Notes

  1. HTTP Status Codes: HTTP status codes >= 400 are not considered errors by these operators. You need to handle HTTP errors manually if needed.

  2. Response Body: For HTTPRequest, remember to call resp.Body.Close() when you're done with the response to avoid resource leaks. HTTPRequestJSON handles this automatically.

  3. JSON Parsing: HTTPRequestJSON automatically closes the response body and will emit an error if the JSON cannot be parsed.

  4. Context: Use request context for timeouts and cancellation.

  5. Concurrency: The operators are designed for concurrent use and are thread-safe.

  6. Error Handling: Network errors, timeouts, and other transport errors will be emitted as error notifications.

Performance Considerations

  • The operators use Go's standard http.Client for requests
  • Requests are executed asynchronously
  • Use MergeMap for concurrent requests
  • For high-throughput scenarios, reuse a single http.Client across requests so its transport can pool and reuse connections
  • Use appropriate timeouts to prevent hanging requests

Documentation

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

func HTTPRequest

func HTTPRequest(req *http.Request, client *http.Client) ro.Observable[*http.Response]

HTTPRequest sends an HTTP request and returns the response. It's a pull-based operator.

An HTTP status code >= 400 is not considered an error.

Don't forget to call resp.Body.Close() when you're done with the response.

Example (Basic)
// Create a test server
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
	w.Header().Set("Content-Type", "application/json")
	w.WriteHeader(http.StatusOK)
	w.Write([]byte(`{"message": "Hello, World!"}`))
}))
defer server.Close()

// Create HTTP request
req, _ := http.NewRequest("GET", server.URL, nil)

// Send HTTP request
observable := HTTPRequest(req, nil)

subscription := observable.Subscribe(customHTTPObserver())
defer subscription.Unsubscribe()

// Wait for async operation to complete
time.Sleep(50 * time.Millisecond)
Output:

Next: &http.Response{Status: "200 OK", StatusCode: 200, ...}
Completed
Example (ErrorHandling)
// Create a test server that returns error status
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
	w.WriteHeader(http.StatusInternalServerError)
	w.Write([]byte("Server error"))
}))
defer server.Close()

// Create HTTP request
req, _ := http.NewRequest("GET", server.URL, nil)

// Send HTTP request
observable := HTTPRequest(req, nil)

subscription := observable.Subscribe(customHTTPObserver())
defer subscription.Unsubscribe()

// Wait for async operation to complete
time.Sleep(50 * time.Millisecond)
Output:

Next: &http.Response{Status: "500 Internal Server Error", StatusCode: 500, ...}
Completed
Example (MultipleRequests)
// Create a test server
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
	w.WriteHeader(http.StatusOK)
	w.Write([]byte("Response"))
}))
defer server.Close()

// Create multiple requests
requests := []*http.Request{
	func() *http.Request { req, _ := http.NewRequest("GET", server.URL, nil); return req }(),
	func() *http.Request { req, _ := http.NewRequest("GET", server.URL, nil); return req }(),
	func() *http.Request { req, _ := http.NewRequest("GET", server.URL, nil); return req }(),
}

// Process multiple requests
observable := ro.Pipe1(
	ro.FromSlice(requests),
	ro.MergeMap(func(req *http.Request) ro.Observable[*http.Response] {
		return HTTPRequest(req, nil)
	}),
)

subscription := observable.Subscribe(customHTTPObserver())
defer subscription.Unsubscribe()

// Wait for async operations to complete
time.Sleep(50 * time.Millisecond)
Output:

Next: &http.Response{Status: "200 OK", StatusCode: 200, ...}
Next: &http.Response{Status: "200 OK", StatusCode: 200, ...}
Next: &http.Response{Status: "200 OK", StatusCode: 200, ...}
Completed
Example (ProcessingResponse)
// Create a test server
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
	w.Header().Set("Content-Type", "application/json")
	w.WriteHeader(http.StatusOK)
	w.Write([]byte(`{"status": "success", "data": "test"}`))
}))
defer server.Close()

// Create HTTP request
req, _ := http.NewRequest("GET", server.URL, nil)

// Send HTTP request and process response
observable := ro.Pipe1(
	HTTPRequest(req, nil),
	ro.Map(func(resp *http.Response) string {
		// Read response body (in real code, you'd handle errors)
		// For this example, we'll just return status
		return fmt.Sprintf("Status: %s", resp.Status)
	}),
)

subscription := observable.Subscribe(ro.PrintObserver[string]())
defer subscription.Unsubscribe()

// Wait for async operation to complete
time.Sleep(50 * time.Millisecond)
Output:

Next: Status: 200 OK
Completed
Example (WithCustomClient)
// Create a test server
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
	w.WriteHeader(http.StatusOK)
	w.Write([]byte("Custom client response"))
}))
defer server.Close()

// Create custom HTTP client with timeout
client := &http.Client{
	Timeout: 10 * time.Second,
}

// Create HTTP request
req, _ := http.NewRequest("GET", server.URL, nil)

// Send HTTP request with custom client
observable := HTTPRequest(req, client)

subscription := observable.Subscribe(customHTTPObserver())
defer subscription.Unsubscribe()

// Wait for async operation to complete
time.Sleep(50 * time.Millisecond)
Output:

Next: &http.Response{Status: "200 OK", StatusCode: 200, ...}
Completed
Example (WithHeaders)
// Create a test server
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
	// Echo back the User-Agent header
	userAgent := r.Header.Get("User-Agent")
	w.Header().Set("Content-Type", "text/plain")
	w.WriteHeader(http.StatusOK)
	w.Write([]byte(fmt.Sprintf("User-Agent: %s", userAgent)))
}))
defer server.Close()

// Create HTTP request with custom headers
req, _ := http.NewRequest("GET", server.URL, nil)
req.Header.Set("User-Agent", "ro-http-client/1.0")
req.Header.Set("Accept", "application/json")

// Send HTTP request
observable := HTTPRequest(req, nil)

subscription := observable.Subscribe(customHTTPObserver())
defer subscription.Unsubscribe()

// Wait for async operation to complete
time.Sleep(50 * time.Millisecond)
Output:

Next: &http.Response{Status: "200 OK", StatusCode: 200, ...}
Completed
Example (WithTimeout)
// Create a test server that delays response
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
	time.Sleep(100 * time.Millisecond) // Simulate slow response
	w.WriteHeader(http.StatusOK)
	w.Write([]byte("Delayed response"))
}))
defer server.Close()

// Create HTTP request with timeout
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
defer cancel()

req, _ := http.NewRequest("GET", server.URL, nil)
req = req.WithContext(ctx)

// Send HTTP request
observable := HTTPRequest(req, nil)

subscription := observable.Subscribe(
	ro.NewObserver(
		func(resp *http.Response) {
			// Handle successful response
		},
		func(err error) {
			// Handle timeout or other errors
			fmt.Printf("Error due to timeout\n")
		},
		func() {
			// Handle completion
		},
	),
)
defer subscription.Unsubscribe()

// Wait for async operation to complete
time.Sleep(50 * time.Millisecond)
Output:

Error due to timeout

func HTTPRequestJSON

func HTTPRequestJSON[T any](req *http.Request, client *http.Client) ro.Observable[T]
Example (BasicString)
// Create a test server that returns JSON string
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
	w.Header().Set("Content-Type", "application/json")
	w.WriteHeader(http.StatusOK)
	w.Write([]byte(`"Hello, World!"`))
}))
defer server.Close()

// Create HTTP request
req, _ := http.NewRequest("GET", server.URL, nil)

// Send HTTP request and parse JSON response
observable := HTTPRequestJSON[string](req, nil)

subscription := observable.Subscribe(customJSONObserver[string]())
defer subscription.Unsubscribe()

// Wait for async operation to complete
time.Sleep(50 * time.Millisecond)
Output:

Next: Hello, World!
Completed
Example (ProcessingPipeline)
// Define response structs
type User struct {
	ID   int    `json:"id"`
	Name string `json:"name"`
}

// Create a test server
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
	w.Header().Set("Content-Type", "application/json")
	w.WriteHeader(http.StatusOK)
	w.Write([]byte(`{"id": 42, "name": "Alice"}`))
}))
defer server.Close()

// Create HTTP request
req, _ := http.NewRequest("GET", server.URL, nil)

// Create processing pipeline
observable := ro.Pipe1(
	HTTPRequestJSON[User](req, nil),
	ro.Map(func(user User) string {
		return fmt.Sprintf("User %s has ID %d", user.Name, user.ID)
	}),
)

subscription := observable.Subscribe(ro.PrintObserver[string]())
defer subscription.Unsubscribe()

// Wait for async operation to complete
time.Sleep(50 * time.Millisecond)
Output:

Next: User Alice has ID 42
Completed
Example (Slice)
// Create a test server that returns JSON array
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
	w.Header().Set("Content-Type", "application/json")
	w.WriteHeader(http.StatusOK)
	w.Write([]byte(`["apple", "banana", "cherry"]`))
}))
defer server.Close()

// Create HTTP request
req, _ := http.NewRequest("GET", server.URL, nil)

// Send HTTP request and parse JSON response into slice
observable := HTTPRequestJSON[[]string](req, nil)

subscription := observable.Subscribe(customJSONObserver[[]string]())
defer subscription.Unsubscribe()

// Wait for async operation to complete
time.Sleep(50 * time.Millisecond)
Output:

Next: [apple banana cherry]
Completed
Example (Struct)
// Define a struct for the JSON response
type User struct {
	ID    int    `json:"id"`
	Name  string `json:"name"`
	Email string `json:"email"`
}

// Create a test server that returns JSON object
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
	w.Header().Set("Content-Type", "application/json")
	w.WriteHeader(http.StatusOK)
	w.Write([]byte(`{"id": 1, "name": "John Doe", "email": "john@example.com"}`))
}))
defer server.Close()

// Create HTTP request
req, _ := http.NewRequest("GET", server.URL, nil)

// Send HTTP request and parse JSON response into struct
observable := HTTPRequestJSON[User](req, nil)

subscription := observable.Subscribe(customJSONObserver[User]())
defer subscription.Unsubscribe()

// Wait for async operation to complete
time.Sleep(50 * time.Millisecond)
Output:

Next: {ID:1 Name:John Doe Email:john@example.com}
Completed
Example (WithCustomClient)
// Define a response struct
type APIResponse struct {
	Status  string `json:"status"`
	Message string `json:"message"`
}

// Create a test server
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
	w.Header().Set("Content-Type", "application/json")
	w.WriteHeader(http.StatusOK)
	w.Write([]byte(`{"status": "success", "message": "Custom client used"}`))
}))
defer server.Close()

// Create custom HTTP client with timeout
client := &http.Client{
	Timeout: 10 * time.Second,
}

// Create HTTP request
req, _ := http.NewRequest("GET", server.URL, nil)

// Send HTTP request with custom client and parse JSON
observable := HTTPRequestJSON[APIResponse](req, client)

subscription := observable.Subscribe(customJSONObserver[APIResponse]())
defer subscription.Unsubscribe()

// Wait for async operation to complete
time.Sleep(50 * time.Millisecond)
Output:

Next: {Status:success Message:Custom client used}
Completed
Example (WithErrorHandling)
// Create a test server that returns invalid JSON
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
	w.Header().Set("Content-Type", "application/json")
	w.WriteHeader(http.StatusOK)
	w.Write([]byte(`invalid json content`))
}))
defer server.Close()

// Create HTTP request
req, _ := http.NewRequest("GET", server.URL, nil)

// Send HTTP request and parse JSON response
observable := HTTPRequestJSON[string](req, nil)

subscription := observable.Subscribe(
	ro.NewObserver(
		func(value string) {
			fmt.Printf("Next: %s\n", value)
		},
		func(err error) {
			fmt.Printf("JSON parsing error occurred\n")
		},
		func() {
			fmt.Printf("Completed\n")
		},
	),
)
defer subscription.Unsubscribe()

// Wait for async operation to complete
time.Sleep(50 * time.Millisecond)
Output:

JSON parsing error occurred

Types

This section is empty.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL