Documentation
¶
Index ¶
Examples ¶
- HTTPRequest (Basic)
- HTTPRequest (ErrorHandling)
- HTTPRequest (MultipleRequests)
- HTTPRequest (ProcessingResponse)
- HTTPRequest (WithCustomClient)
- HTTPRequest (WithHeaders)
- HTTPRequest (WithTimeout)
- HTTPRequestJSON (BasicString)
- HTTPRequestJSON (ProcessingPipeline)
- HTTPRequestJSON (Slice)
- HTTPRequestJSON (Struct)
- HTTPRequestJSON (WithCustomClient)
- HTTPRequestJSON (WithErrorHandling)
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func HTTPRequest ¶
HTTPRequest sends an HTTP request and returns the response. It's a pull-based operator.
An HTTP status code >= 400 is not considered an error: the response is still emitted as a regular value.
Don't forget to call resp.Body.Close() when you're done with the response.
Example (Basic) ¶
// Create a test server
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
w.Write([]byte(`{"message": "Hello, World!"}`))
}))
defer server.Close()
// Create HTTP request
req, _ := http.NewRequest("GET", server.URL, nil)
// Send HTTP request
observable := HTTPRequest(req, nil)
subscription := observable.Subscribe(customHTTPObserver())
defer subscription.Unsubscribe()
// Wait for async operation to complete
time.Sleep(50 * time.Millisecond)
Output: Next: &http.Response{Status: "200 OK", StatusCode: 200, ...} Completed
Example (ErrorHandling) ¶
// Create a test server that returns error status
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusInternalServerError)
w.Write([]byte("Server error"))
}))
defer server.Close()
// Create HTTP request
req, _ := http.NewRequest("GET", server.URL, nil)
// Send HTTP request
observable := HTTPRequest(req, nil)
subscription := observable.Subscribe(customHTTPObserver())
defer subscription.Unsubscribe()
// Wait for async operation to complete
time.Sleep(50 * time.Millisecond)
Output: Next: &http.Response{Status: "500 Internal Server Error", StatusCode: 500, ...} Completed
Example (MultipleRequests) ¶
// Create a test server
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
w.Write([]byte("Response"))
}))
defer server.Close()
// Create multiple requests
requests := []*http.Request{
func() *http.Request { req, _ := http.NewRequest("GET", server.URL, nil); return req }(),
func() *http.Request { req, _ := http.NewRequest("GET", server.URL, nil); return req }(),
func() *http.Request { req, _ := http.NewRequest("GET", server.URL, nil); return req }(),
}
// Process multiple requests
observable := ro.Pipe1(
ro.FromSlice(requests),
ro.MergeMap(func(req *http.Request) ro.Observable[*http.Response] {
return HTTPRequest(req, nil)
}),
)
subscription := observable.Subscribe(customHTTPObserver())
defer subscription.Unsubscribe()
// Wait for async operations to complete
time.Sleep(50 * time.Millisecond)
Output: Next: &http.Response{Status: "200 OK", StatusCode: 200, ...} Next: &http.Response{Status: "200 OK", StatusCode: 200, ...} Next: &http.Response{Status: "200 OK", StatusCode: 200, ...} Completed
Example (ProcessingResponse) ¶
// Create a test server
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
w.Write([]byte(`{"status": "success", "data": "test"}`))
}))
defer server.Close()
// Create HTTP request
req, _ := http.NewRequest("GET", server.URL, nil)
// Send HTTP request and process response
observable := ro.Pipe1(
HTTPRequest(req, nil),
ro.Map(func(resp *http.Response) string {
// In real code, you would read and close the response body here and handle any errors.
// For this example, we just return the status.
return fmt.Sprintf("Status: %s", resp.Status)
}),
)
subscription := observable.Subscribe(ro.PrintObserver[string]())
defer subscription.Unsubscribe()
// Wait for async operation to complete
time.Sleep(50 * time.Millisecond)
Output: Next: Status: 200 OK Completed
Example (WithCustomClient) ¶
// Create a test server
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
w.Write([]byte("Custom client response"))
}))
defer server.Close()
// Create custom HTTP client with timeout
client := &http.Client{
Timeout: 10 * time.Second,
}
// Create HTTP request
req, _ := http.NewRequest("GET", server.URL, nil)
// Send HTTP request with custom client
observable := HTTPRequest(req, client)
subscription := observable.Subscribe(customHTTPObserver())
defer subscription.Unsubscribe()
// Wait for async operation to complete
time.Sleep(50 * time.Millisecond)
Output: Next: &http.Response{Status: "200 OK", StatusCode: 200, ...} Completed
Example (WithHeaders) ¶
// Create a test server
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// Echo back the User-Agent header
userAgent := r.Header.Get("User-Agent")
w.Header().Set("Content-Type", "text/plain")
w.WriteHeader(http.StatusOK)
w.Write([]byte(fmt.Sprintf("User-Agent: %s", userAgent)))
}))
defer server.Close()
// Create HTTP request with custom headers
req, _ := http.NewRequest("GET", server.URL, nil)
req.Header.Set("User-Agent", "ro-http-client/1.0")
req.Header.Set("Accept", "application/json")
// Send HTTP request
observable := HTTPRequest(req, nil)
subscription := observable.Subscribe(customHTTPObserver())
defer subscription.Unsubscribe()
// Wait for async operation to complete
time.Sleep(50 * time.Millisecond)
Output: Next: &http.Response{Status: "200 OK", StatusCode: 200, ...} Completed
Example (WithTimeout) ¶
// Create a test server that delays response
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
time.Sleep(100 * time.Millisecond) // Simulate slow response
w.WriteHeader(http.StatusOK)
w.Write([]byte("Delayed response"))
}))
defer server.Close()
// Create HTTP request with timeout
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
defer cancel()
req, _ := http.NewRequest("GET", server.URL, nil)
req = req.WithContext(ctx)
// Send HTTP request
observable := HTTPRequest(req, nil)
subscription := observable.Subscribe(
ro.NewObserver(
func(resp *http.Response) {
// Handle successful response
},
func(err error) {
// Handle timeout or other errors
fmt.Printf("Error due to timeout\n")
},
func() {
// Handle completion
},
),
)
defer subscription.Unsubscribe()
// Wait for async operation to complete
time.Sleep(50 * time.Millisecond)
Output: Error due to timeout
func HTTPRequestJSON ¶
Example (BasicString) ¶
// Create a test server that returns JSON string
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
w.Write([]byte(`"Hello, World!"`))
}))
defer server.Close()
// Create HTTP request
req, _ := http.NewRequest("GET", server.URL, nil)
// Send HTTP request and parse JSON response
observable := HTTPRequestJSON[string](req, nil)
subscription := observable.Subscribe(customJSONObserver[string]())
defer subscription.Unsubscribe()
// Wait for async operation to complete
time.Sleep(50 * time.Millisecond)
Output: Next: Hello, World! Completed
Example (ProcessingPipeline) ¶
// Define response structs
type User struct {
ID int `json:"id"`
Name string `json:"name"`
}
// Create a test server
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
w.Write([]byte(`{"id": 42, "name": "Alice"}`))
}))
defer server.Close()
// Create HTTP request
req, _ := http.NewRequest("GET", server.URL, nil)
// Create processing pipeline
observable := ro.Pipe1(
HTTPRequestJSON[User](req, nil),
ro.Map(func(user User) string {
return fmt.Sprintf("User %s has ID %d", user.Name, user.ID)
}),
)
subscription := observable.Subscribe(ro.PrintObserver[string]())
defer subscription.Unsubscribe()
// Wait for async operation to complete
time.Sleep(50 * time.Millisecond)
Output: Next: User Alice has ID 42 Completed
Example (Slice) ¶
// Create a test server that returns JSON array
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
w.Write([]byte(`["apple", "banana", "cherry"]`))
}))
defer server.Close()
// Create HTTP request
req, _ := http.NewRequest("GET", server.URL, nil)
// Send HTTP request and parse JSON response into slice
observable := HTTPRequestJSON[[]string](req, nil)
subscription := observable.Subscribe(customJSONObserver[[]string]())
defer subscription.Unsubscribe()
// Wait for async operation to complete
time.Sleep(50 * time.Millisecond)
Output: Next: [apple banana cherry] Completed
Example (Struct) ¶
// Define a struct for the JSON response
type User struct {
ID int `json:"id"`
Name string `json:"name"`
Email string `json:"email"`
}
// Create a test server that returns JSON object
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
w.Write([]byte(`{"id": 1, "name": "John Doe", "email": "john@example.com"}`))
}))
defer server.Close()
// Create HTTP request
req, _ := http.NewRequest("GET", server.URL, nil)
// Send HTTP request and parse JSON response into struct
observable := HTTPRequestJSON[User](req, nil)
subscription := observable.Subscribe(customJSONObserver[User]())
defer subscription.Unsubscribe()
// Wait for async operation to complete
time.Sleep(50 * time.Millisecond)
Output: Next: {ID:1 Name:John Doe Email:john@example.com} Completed
Example (WithCustomClient) ¶
// Define a response struct
type APIResponse struct {
Status string `json:"status"`
Message string `json:"message"`
}
// Create a test server
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
w.Write([]byte(`{"status": "success", "message": "Custom client used"}`))
}))
defer server.Close()
// Create custom HTTP client with timeout
client := &http.Client{
Timeout: 10 * time.Second,
}
// Create HTTP request
req, _ := http.NewRequest("GET", server.URL, nil)
// Send HTTP request with custom client and parse JSON
observable := HTTPRequestJSON[APIResponse](req, client)
subscription := observable.Subscribe(customJSONObserver[APIResponse]())
defer subscription.Unsubscribe()
// Wait for async operation to complete
time.Sleep(50 * time.Millisecond)
Output: Next: {Status:success Message:Custom client used} Completed
Example (WithErrorHandling) ¶
// Create a test server that returns invalid JSON
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
w.Write([]byte(`invalid json content`))
}))
defer server.Close()
// Create HTTP request
req, _ := http.NewRequest("GET", server.URL, nil)
// Send HTTP request and parse JSON response
observable := HTTPRequestJSON[string](req, nil)
subscription := observable.Subscribe(
ro.NewObserver(
func(value string) {
fmt.Printf("Next: %s\n", value)
},
func(err error) {
fmt.Printf("JSON parsing error occurred\n")
},
func() {
fmt.Printf("Completed\n")
},
),
)
defer subscription.Unsubscribe()
// Wait for async operation to complete
time.Sleep(50 * time.Millisecond)
Output: JSON parsing error occurred
Types ¶
This section is empty.
Click to show internal directories.
Click to hide internal directories.