protobuf

package
v1.2.6
Published: Aug 11, 2025 License: MIT Imports: 8 Imported by: 0

README

Protocol Buffer Package

This package provides comprehensive Protocol Buffer (protobuf) message support for the go-rabbitmq library, including automatic serialization, type-safe message routing, and message multiplexing capabilities.

 

Features

  • Automatic Serialization: Seamlessly convert protobuf messages to RabbitMQ messages
  • Type-Safe Routing: Register handlers for specific protobuf message types with compile-time safety
  • Message Multiplexing: Route different protobuf message types to appropriate handlers
  • Fallback Handling: Handle non-protobuf messages and unknown protobuf types
  • Header Management: Automatic setting of content type and message type headers

Installation

go get github.com/cloudresty/go-rabbitmq/protobuf
go get google.golang.org/protobuf

Basic Usage

Creating Protobuf Messages
import (
    "log"

    "github.com/cloudresty/go-rabbitmq/protobuf"
    pb "your-project/protos" // Your generated protobuf code
)

// Create a protobuf message
userEvent := &pb.UserCreated{
    UserId: "12345",
    Email:  "user@example.com",
    Name:   "John Doe",
}

// Convert to RabbitMQ message (automatically sets headers and content type)
message, err := protobuf.NewProtobufMessage(userEvent)
if err != nil {
    log.Fatal(err)
}

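// Publish the message (publisher is assumed to be created elsewhere)
if err := publisher.Publish(message, "user.events", "user.created"); err != nil {
    log.Printf("Failed to publish message: %v", err)
}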

Type-Safe Message Handling
import (
    "context"
    "log"

    "github.com/cloudresty/go-rabbitmq/protobuf"
    pb "your-project/protos"
)

// Create a message multiplexer
mux := protobuf.NewMessageMux()

// Register type-safe handlers for specific protobuf message types
protobuf.RegisterHandler[*pb.UserCreated](mux, func(ctx context.Context, msg *pb.UserCreated) error {
    log.Printf("User created: %s (%s)", msg.Name, msg.Email)
    // Handle user creation logic
    return nil
})

protobuf.RegisterHandler[*pb.UserUpdated](mux, func(ctx context.Context, msg *pb.UserUpdated) error {
    log.Printf("User updated: %s", msg.UserId)
    // Handle user update logic
    return nil
})

// Set a default handler for non-protobuf messages
mux.SetDefaultHandler(func(ctx context.Context, delivery *protobuf.Delivery) error {
    log.Printf("Received non-protobuf message: %s", string(delivery.Body))
    return nil
})

// Set a handler for unknown protobuf types
mux.SetUnknownProtobufHandler(func(ctx context.Context, messageType string, data []byte) error {
    log.Printf("Unknown protobuf message type: %s", messageType)
    return nil
})

// Use the multiplexer as a message handler
consumer.StartConsuming(mux.Handle, "user.events", []string{"user.*"})
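
The registration pattern above can be illustrated with a standard-library-only sketch of a type-keyed multiplexer. This is not the package's implementation: typedMux, register, delivery, and userCreated are hypothetical names, and encoding/json stands in for proto.Unmarshal so the example runs without generated code. It shows one way generics can hide decoding behind a typed handler, with fallbacks loosely analogous to SetDefaultHandler and SetUnknownProtobufHandler.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// contentTypeTyped marks payloads the sketch knows how to decode.
// JSON is used here only as a stand-in for protobuf.
const contentTypeTyped = "application/json"

// delivery is a hypothetical stand-in for a broker delivery.
type delivery struct {
	ContentType string
	MessageType string // stand-in for a message type header
	Body        []byte
}

// typedMux routes deliveries to handlers keyed by message type name.
type typedMux struct {
	handlers map[string]func(body []byte) error
	unknown  func(messageType string, body []byte) error
	fallback func(d delivery) error
}

func newTypedMux() *typedMux {
	return &typedMux{handlers: make(map[string]func([]byte) error)}
}

// register wraps a typed handler in a closure that decodes the body into T
// before calling it, so handler code never touches raw bytes.
func register[T any](m *typedMux, messageType string, h func(msg T) error) {
	m.handlers[messageType] = func(body []byte) error {
		var msg T
		if err := json.Unmarshal(body, &msg); err != nil {
			return fmt.Errorf("decode %s: %w", messageType, err)
		}
		return h(msg)
	}
}

// handle tries the typed handler first, then the unknown-type handler.
// Deliveries without a recognized content type go to the fallback handler.
func (m *typedMux) handle(d delivery) error {
	if d.ContentType != contentTypeTyped || d.MessageType == "" {
		if m.fallback == nil {
			return fmt.Errorf("no fallback handler for content type %q", d.ContentType)
		}
		return m.fallback(d)
	}
	if h, ok := m.handlers[d.MessageType]; ok {
		return h(d.Body)
	}
	if m.unknown != nil {
		return m.unknown(d.MessageType, d.Body)
	}
	return fmt.Errorf("no handler registered for type %q", d.MessageType)
}

// userCreated is a hypothetical payload type.
type userCreated struct {
	UserID string `json:"user_id"`
	Name   string `json:"name"`
}

func main() {
	mux := newTypedMux()
	register(mux, "example.UserCreated", func(msg userCreated) error {
		fmt.Printf("user created: %s (%s)\n", msg.Name, msg.UserID)
		return nil
	})

	err := mux.handle(delivery{
		ContentType: contentTypeTyped,
		MessageType: "example.UserCreated",
		Body:        []byte(`{"user_id":"12345","name":"John Doe"}`),
	})
	if err != nil {
		fmt.Println("handle failed:", err)
	}
}
```

Keying the map by type name keeps dispatch to a single lookup, and the closure created in register is the only place that knows the concrete type T.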

Message Inspection
// Check if a delivery contains a protobuf message
if protobuf.IsProtobufMessage(delivery) {
    messageType, ok := protobuf.GetProtobufMessageType(delivery)
    if ok {
        log.Printf("Received protobuf message of type: %s", messageType)
    }
}

// Get all registered message types
types := mux.GetRegisteredTypes()
log.Printf("Registered types: %v", types)
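
For consumers that only have raw AMQP properties at hand, the same inspection can be sketched with the standard library. The content type value "application/protobuf" and the "x-message-type" header name are taken from this package's documentation; whether IsProtobufMessage and GetProtobufMessageType perform exactly these checks internally is an assumption, and looksLikeProtobuf and messageTypeOf are hypothetical helper names.

```go
package main

import "fmt"

const (
	contentTypeProtobuf = "application/protobuf" // value documented for ContentTypeProtobuf
	messageTypeHeader   = "x-message-type"       // header documented for NewProtobufMessage
)

// looksLikeProtobuf reports whether the content type marks a protobuf payload.
func looksLikeProtobuf(contentType string) bool {
	return contentType == contentTypeProtobuf
}

// messageTypeOf reads the message type name from the headers.
// It returns false when the header is absent, empty, or not a string.
func messageTypeOf(headers map[string]any) (string, bool) {
	name, ok := headers[messageTypeHeader].(string)
	return name, ok && name != ""
}

func main() {
	headers := map[string]any{messageTypeHeader: "users.v1.UserCreated"}
	if looksLikeProtobuf("application/protobuf") {
		if name, ok := messageTypeOf(headers); ok {
			fmt.Println("protobuf message of type:", name)
		}
	}
}
```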

Advanced Usage

Custom Message Types

The package works with any protobuf message that implements the proto.Message interface. Make sure your protobuf files are properly generated:

protoc --go_out=. --go_opt=paths=source_relative your_proto_file.proto

Error Handling

The package provides detailed error information for common scenarios:

message, err := protobuf.NewProtobufMessage(userEvent)
if err != nil {
    // Handle serialization errors
    log.Printf("Failed to create protobuf message: %v", err)
}

Integration with Main Package

When using this package with the main rabbitmq package, you can bridge the two message types:

import (
    "fmt"

    rabbitmq "github.com/cloudresty/go-rabbitmq"
    "github.com/cloudresty/go-rabbitmq/protobuf"
)

// Convert protobuf message to main package message
protobufMsg, err := protobuf.NewProtobufMessage(userEvent)
if err != nil {
    return err
}

// NewProtobufMessage returns the Message interface, so assert the concrete
// type once, with a check, rather than risking a panic on each access.
basicMsg, ok := protobufMsg.(*protobuf.BasicMessage)
if !ok {
    return fmt.Errorf("unexpected message type %T", protobufMsg)
}

// Create main package message with the same properties
mainMsg := rabbitmq.NewMessage(basicMsg.Body).
    WithContentType(protobuf.ContentTypeProtobuf)

for key, value := range basicMsg.Headers {
    mainMsg = mainMsg.WithHeader(key, value)
}

Best Practices

1. Message Type Consistency

Ensure protobuf message types are consistent across publishers and consumers:

// Good: Use consistent message types
protobuf.RegisterHandler[*pb.UserCreated](mux, handleUserCreated)

// Avoid: Mixing different versions of the same message type

2. Handler Organization

Group related handlers together and use clear naming:

// User-related handlers
protobuf.RegisterHandler[*pb.UserCreated](mux, handleUserCreated)
protobuf.RegisterHandler[*pb.UserUpdated](mux, handleUserUpdated)
protobuf.RegisterHandler[*pb.UserDeleted](mux, handleUserDeleted)

// Order-related handlers
protobuf.RegisterHandler[*pb.OrderCreated](mux, handleOrderCreated)
protobuf.RegisterHandler[*pb.OrderShipped](mux, handleOrderShipped)

3. Error Handling

Always handle serialization and deserialization errors, and validate decoded messages inside handlers before processing them:

protobuf.RegisterHandler[*pb.UserCreated](mux, func(ctx context.Context, msg *pb.UserCreated) error {
    if err := validateUser(msg); err != nil {
        return fmt.Errorf("invalid user data: %w", err)
    }

    return processUser(msg)
})

4. Schema Evolution

Design your protobuf schemas with forward and backward compatibility in mind:

syntax = "proto3";

message UserCreated {
    string user_id = 1;
    string email = 2;
    string name = 3;

    // Add new fields with new, previously unused field numbers;
    // never renumber or reuse existing ones
    string phone = 4;     // Added in v2
    string company = 5;   // Added in v3
}
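
Why appending fields is safe can be seen at the wire level: each field is written as a key, (field_number << 3) | wire_type, followed by its value, so a reader can skip field numbers it does not recognize. The sketch below hand-encodes string fields with the standard library only. It is a simplified illustration that handles just length-delimited fields, not a replacement for the protobuf runtime; appendString and decodeKnown are hypothetical helpers.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// wireBytes is the protobuf wire type for length-delimited values
// such as strings and bytes.
const wireBytes = 2

// appendString appends one string field: varint key, varint length, raw bytes.
func appendString(buf []byte, field int, s string) []byte {
	buf = binary.AppendUvarint(buf, uint64(field)<<3|wireBytes)
	buf = binary.AppendUvarint(buf, uint64(len(s)))
	return append(buf, s...)
}

// decodeKnown reads string fields and keeps only those whose field number
// is in known. Unknown field numbers are skipped, as an older reader would.
func decodeKnown(buf []byte, known map[int]bool) (map[int]string, error) {
	out := make(map[int]string)
	for len(buf) > 0 {
		key, n := binary.Uvarint(buf)
		if n <= 0 {
			return nil, errors.New("bad field key")
		}
		buf = buf[n:]
		if key&7 != wireBytes {
			return nil, fmt.Errorf("unsupported wire type %d", key&7)
		}
		size, n := binary.Uvarint(buf)
		if n <= 0 || size > uint64(len(buf)-n) {
			return nil, errors.New("bad field length")
		}
		value := buf[n : n+int(size)]
		buf = buf[n+int(size):]
		if field := int(key >> 3); known[field] {
			out[field] = string(value)
		}
	}
	return out, nil
}

func main() {
	// A newer writer emits fields 1 to 4.
	var buf []byte
	buf = appendString(buf, 1, "12345")
	buf = appendString(buf, 2, "user@example.com")
	buf = appendString(buf, 3, "John Doe")
	buf = appendString(buf, 4, "+1-555-0100")

	// An older reader only knows fields 1 to 3 and ignores the rest.
	fields, err := decodeKnown(buf, map[int]bool{1: true, 2: true, 3: true})
	if err != nil {
		fmt.Println("decode failed:", err)
		return
	}
	fmt.Println(fields[1], fields[2], fields[3])
}
```

Reusing a field number for a different meaning would break this property, because old readers would interpret the new value as the old field.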

Migration from Root Package

If you're migrating from protobuf functionality in the root package:

  1. Update imports:
// Old
import "github.com/cloudresty/go-rabbitmq"

// New
import "github.com/cloudresty/go-rabbitmq/protobuf"

  2. Update function calls:
// Old
message, err := rabbitmq.NewProtobufMessage(userEvent)

// New
message, err := protobuf.NewProtobufMessage(userEvent)

  3. Update handler registration:
// Old
rabbitmq.RegisterHandler[*pb.UserCreated](mux, handler)

// New
protobuf.RegisterHandler[*pb.UserCreated](mux, handler)

Dependencies

  • google.golang.org/protobuf/proto - Protocol Buffer runtime library
  • github.com/rabbitmq/amqp091-go - RabbitMQ client library

Performance Considerations

  • Message Size: Protobuf's binary encoding is typically smaller than the equivalent JSON, which reduces bandwidth use
  • Serialization Speed: Protobuf serialization is typically faster than JSON serialization
  • Type Safety: Compile-time type checking reduces runtime errors
  • Memory Usage: The multiplexer uses reflection for type handling, which has minimal overhead

Troubleshooting

Common Issues
  1. Message Type Not Found
Error: No handler registered for protobuf type 'your.package.MessageType'

Solution: Ensure the handler is registered before starting consumption.
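
One way to catch this at startup is to compare the multiplexer's registered types against the types the service expects to consume. The sketch below is standard-library only; missingTypes is a hypothetical helper, and it assumes that the names returned by GetRegisteredTypes use the same fully qualified format as the names in the expected list.

```go
package main

import (
	"fmt"
	"sort"
)

// missingTypes returns the expected type names that are absent from
// registered, in sorted order.
func missingTypes(registered, expected []string) []string {
	have := make(map[string]struct{}, len(registered))
	for _, name := range registered {
		have[name] = struct{}{}
	}
	var missing []string
	for _, name := range expected {
		if _, ok := have[name]; !ok {
			missing = append(missing, name)
		}
	}
	sort.Strings(missing)
	return missing
}

func main() {
	// In a real service this slice would come from mux.GetRegisteredTypes().
	registered := []string{"users.v1.UserCreated"}
	expected := []string{"users.v1.UserCreated", "users.v1.UserUpdated"}

	if missing := missingTypes(registered, expected); len(missing) > 0 {
		fmt.Printf("handlers missing for: %v\n", missing)
	}
}
```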

  2. Serialization Errors
Error: failed to marshal protobuf message

Solution: Verify the protobuf message is properly initialized and valid.

  3. Type Mismatch
Error: message type mismatch: expected *pb.UserCreated, got *pb.UserUpdated

Solution: Check message type headers and handler registration.

For more examples, see the examples/ directory in the main repository.

An open source project brought to you by the Cloudresty team.

Documentation

Overview

Package protobuf provides support for Protocol Buffer message serialization and routing for the RabbitMQ library. It includes automatic marshaling/unmarshaling of protobuf messages and type-safe message routing capabilities.

Constants

const ContentTypeProtobuf = "application/protobuf"

Protobuf content type constant

Variables

This section is empty.

Functions

func GetProtobufMessageType

func GetProtobufMessageType(delivery *Delivery) (string, bool)

GetProtobufMessageType extracts the protobuf message type from a delivery

func IsProtobufMessage

func IsProtobufMessage(delivery *Delivery) bool

IsProtobufMessage checks if a delivery contains a protobuf message

func RegisterHandler

func RegisterHandler[T proto.Message](mux *MessageMux, handlerFunc func(ctx context.Context, msg T) error)

RegisterHandler registers a type-safe handler for a specific protobuf message type. The handler function receives the fully unmarshaled protobuf message.

Usage: RegisterHandler[*pb.UserCreated](mux, func(ctx context.Context, msg *pb.UserCreated) error {...})

Types

type BasicMessage

type BasicMessage struct {
	Body        []byte
	ContentType string
	Headers     map[string]any
}

BasicMessage provides a basic implementation of the Message interface

func NewMessage

func NewMessage(body []byte) *BasicMessage

NewMessage creates a new basic message with the given body

func (*BasicMessage) WithContentType

func (m *BasicMessage) WithContentType(contentType string) Message

WithContentType sets the content type for the message

func (*BasicMessage) WithHeader

func (m *BasicMessage) WithHeader(key string, value any) Message

WithHeader adds a header to the message

type Delivery

type Delivery struct {
	amqp.Delivery
	ReceivedAt time.Time
}

Delivery represents a RabbitMQ delivery (minimal interface for the protobuf package)

type Message

type Message interface {
	WithContentType(contentType string) Message
	WithHeader(key string, value any) Message
}

Message represents a RabbitMQ message (minimal interface for the protobuf package)

func NewProtobufMessage

func NewProtobufMessage(pb proto.Message) (Message, error)

NewProtobufMessage creates a new Message from a Protocol Buffers message. This function automatically:

  1. Serializes the protobuf message using proto.Marshal()
  2. Sets the content type to "application/protobuf"
  3. Sets the "x-message-type" header with the fully qualified message type name

type MessageMux

type MessageMux struct {
	// contains filtered or unexported fields
}

MessageMux provides automatic routing of protobuf messages to type-safe handlers

func NewMessageMux

func NewMessageMux() *MessageMux

NewMessageMux creates a new message multiplexer for routing protobuf messages

func (*MessageMux) GetRegisteredTypes

func (m *MessageMux) GetRegisteredTypes() []string

GetRegisteredTypes returns a list of all registered protobuf message types

func (*MessageMux) Handle

func (m *MessageMux) Handle(ctx context.Context, delivery *Delivery) error

Handle is the main routing method that implements the MessageHandler interface. It automatically routes messages based on their type and content.

func (*MessageMux) SetDefaultHandler

func (m *MessageMux) SetDefaultHandler(handlerFunc func(ctx context.Context, delivery *Delivery) error)

SetDefaultHandler sets a fallback handler for non-protobuf messages or messages without registered handlers. This handler receives the raw delivery.

func (*MessageMux) SetUnknownProtobufHandler

func (m *MessageMux) SetUnknownProtobufHandler(handlerFunc func(ctx context.Context, messageType string, data []byte) error)

SetUnknownProtobufHandler sets a handler for protobuf messages that don't have a registered handler. This handler receives the message type name and raw data.

type ProtobufHandler

type ProtobufHandler struct {
	// contains filtered or unexported fields
}

ProtobufHandler represents a type-safe handler for a specific protobuf message type

type Serializer

type Serializer struct{}

Serializer implements the rabbitmq.MessageSerializer interface for Protocol Buffers

func NewSerializer

func NewSerializer() *Serializer

NewSerializer creates a new protobuf serializer

func (*Serializer) ContentType

func (s *Serializer) ContentType() string

ContentType returns the content type for protobuf messages

func (*Serializer) Deserialize

func (s *Serializer) Deserialize(data []byte, target any) error

Deserialize unmarshals bytes to a protobuf message

func (*Serializer) Serialize

func (s *Serializer) Serialize(msg any) ([]byte, error)

Serialize marshals a protobuf message to bytes
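
Because Serialize and Deserialize accept any, the check that a value is actually a supported message type has to happen at runtime. The sketch below mirrors the three method signatures listed above in a local serializer interface and implements them with the standard library, using encoding.BinaryMarshaler and encoding.BinaryUnmarshaler as stand-ins for proto.Message. The interface definition, binarySerializer, and note are hypothetical; the actual rabbitmq.MessageSerializer interface may differ.

```go
package main

import (
	"encoding"
	"fmt"
)

// serializer mirrors the method set documented for Serializer.
// It is an assumption about the shape of the main package's interface.
type serializer interface {
	ContentType() string
	Serialize(msg any) ([]byte, error)
	Deserialize(data []byte, target any) error
}

// binarySerializer handles any value implementing the encoding interfaces.
type binarySerializer struct{}

func (binarySerializer) ContentType() string { return "application/octet-stream" }

// Serialize rejects values that cannot be marshaled instead of panicking.
func (binarySerializer) Serialize(msg any) ([]byte, error) {
	m, ok := msg.(encoding.BinaryMarshaler)
	if !ok {
		return nil, fmt.Errorf("serialize: %T does not implement encoding.BinaryMarshaler", msg)
	}
	return m.MarshalBinary()
}

// Deserialize requires a target that can unmarshal itself, typically a pointer.
func (binarySerializer) Deserialize(data []byte, target any) error {
	u, ok := target.(encoding.BinaryUnmarshaler)
	if !ok {
		return fmt.Errorf("deserialize: %T does not implement encoding.BinaryUnmarshaler", target)
	}
	return u.UnmarshalBinary(data)
}

// Compile-time check that binarySerializer satisfies the interface.
var _ serializer = binarySerializer{}

// note is a tiny hypothetical message type.
type note struct{ Text string }

func (n *note) MarshalBinary() ([]byte, error)    { return []byte(n.Text), nil }
func (n *note) UnmarshalBinary(data []byte) error { n.Text = string(data); return nil }

func main() {
	var s serializer = binarySerializer{}

	data, err := s.Serialize(&note{Text: "hello"})
	if err != nil {
		fmt.Println("serialize failed:", err)
		return
	}

	var out note
	if err := s.Deserialize(data, &out); err != nil {
		fmt.Println("deserialize failed:", err)
		return
	}
	fmt.Println(out.Text)

	// A value of the wrong kind yields an error rather than a panic.
	if _, err := s.Serialize("plain string"); err != nil {
		fmt.Println(err)
	}
}
```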
