Documentation
¶
Overview ¶
Package streaming demonstrates server streaming gRPC support in proto-cli.
This example shows how to:
- Define server streaming RPC methods (one request, multiple responses)
- Stream responses with configurable output formats (JSON, YAML, Go)
- Configure message delimiters for NDJSON and other formats
- Use streaming in both local (direct) and remote (gRPC client) modes
- Handle real-time data feeds like watch operations
The example includes:
- ListItems: Streams a list of items matching filter criteria
- WatchItems: Streams item change events in real-time
Streaming features:
- Each message is formatted independently using the selected OutputFormat
- Messages are separated by configurable delimiters (default: newline)
- Works seamlessly with Unix tools via NDJSON (e.g., | jq ., | grep)
- Supports both local execution and remote gRPC server calls
To run the example:
go run ./streamcli streaming-service list-items --category books --format json go run ./streamcli streaming-service watch-items --start-id 1 --format yaml
Generated code in this package should not be edited manually. To regenerate after modifying streaming.proto, run: go generate
Index ¶
- Constants
- Variables
- func RegisterStreamingServiceServer(s grpc.ServiceRegistrar, srv StreamingServiceServer)
- func StreamingServiceCommand(ctx context.Context, implOrFactory interface{}, opts ...protocli.ServiceOption) *protocli.ServiceCLI
- func StreamingServiceCommandsFlat(ctx context.Context, implOrFactory interface{}, opts ...protocli.ServiceOption) []*v3.Command
- type Item
- type ItemEvent
- func (*ItemEvent) Descriptor() ([]byte, []int)deprecated
- func (x *ItemEvent) GetEventType() string
- func (x *ItemEvent) GetItem() *Item
- func (x *ItemEvent) GetTimestamp() int64
- func (*ItemEvent) ProtoMessage()
- func (x *ItemEvent) ProtoReflect() protoreflect.Message
- func (x *ItemEvent) Reset()
- func (x *ItemEvent) String() string
- type ItemResponse
- func (*ItemResponse) Descriptor() ([]byte, []int)deprecated
- func (x *ItemResponse) GetItem() *Item
- func (x *ItemResponse) GetMessage() string
- func (*ItemResponse) ProtoMessage()
- func (x *ItemResponse) ProtoReflect() protoreflect.Message
- func (x *ItemResponse) Reset()
- func (x *ItemResponse) String() string
- type ListItemsRequest
- func (*ListItemsRequest) Descriptor() ([]byte, []int)deprecated
- func (x *ListItemsRequest) GetCategory() string
- func (x *ListItemsRequest) GetIncludeDeleted() bool
- func (x *ListItemsRequest) GetLimit() int32
- func (x *ListItemsRequest) GetOffset() int32
- func (x *ListItemsRequest) GetSortBy() string
- func (*ListItemsRequest) ProtoMessage()
- func (x *ListItemsRequest) ProtoReflect() protoreflect.Message
- func (x *ListItemsRequest) Reset()
- func (x *ListItemsRequest) String() string
- type StreamingService
- type StreamingServiceClient
- type StreamingServiceServer
- type StreamingService_ListItemsClient
- type StreamingService_ListItemsServer
- type StreamingService_WatchItemsClient
- type StreamingService_WatchItemsServer
- type UnimplementedStreamingServiceServer
- type UnsafeStreamingServiceServer
- type WatchRequest
Constants ¶
const ( StreamingService_ListItems_FullMethodName = "/streaming.StreamingService/ListItems" StreamingService_WatchItems_FullMethodName = "/streaming.StreamingService/WatchItems" )
Variables ¶
var File_examples_streaming_streaming_proto protoreflect.FileDescriptor
var StreamingService_ServiceDesc = grpc.ServiceDesc{ ServiceName: "streaming.StreamingService", HandlerType: (*StreamingServiceServer)(nil), Methods: []grpc.MethodDesc{}, Streams: []grpc.StreamDesc{ { StreamName: "ListItems", Handler: _StreamingService_ListItems_Handler, ServerStreams: true, }, { StreamName: "WatchItems", Handler: _StreamingService_WatchItems_Handler, ServerStreams: true, }, }, Metadata: "examples/streaming/streaming.proto", }
StreamingService_ServiceDesc is the grpc.ServiceDesc for StreamingService service. It's only intended for direct use with grpc.RegisterService, and not to be introspected or modified (even as a copy)
Functions ¶
func RegisterStreamingServiceServer ¶
func RegisterStreamingServiceServer(s grpc.ServiceRegistrar, srv StreamingServiceServer)
func StreamingServiceCommand ¶
func StreamingServiceCommand(ctx context.Context, implOrFactory interface{}, opts ...protocli.ServiceOption) *protocli.ServiceCLI
StreamingServiceCommand creates a CLI for StreamingService with options The implOrFactory parameter can be either a direct service implementation or a factory function
func StreamingServiceCommandsFlat ¶
func StreamingServiceCommandsFlat(ctx context.Context, implOrFactory interface{}, opts ...protocli.ServiceOption) []*v3.Command
StreamingServiceCommandsFlat creates a flat command structure for StreamingService (for single-service CLIs) This returns RPC commands directly at the root level instead of nested under a service command. The implOrFactory parameter can be either a direct service implementation or a factory function The returned slice includes all RPC commands plus a daemonize command for starting a gRPC server.
Types ¶
type Item ¶
type Item struct {
Id int64 `protobuf:"varint,1,opt,name=id,proto3" json:"id,omitempty"`
Name string `protobuf:"bytes,2,opt,name=name,proto3" json:"name,omitempty"`
Category string `protobuf:"bytes,3,opt,name=category,proto3" json:"category,omitempty"`
// contains filtered or unexported fields
}
func (*Item) Descriptor
deprecated
func (*Item) GetCategory ¶
func (*Item) ProtoMessage ¶
func (*Item) ProtoMessage()
func (*Item) ProtoReflect ¶
func (x *Item) ProtoReflect() protoreflect.Message
type ItemEvent ¶
type ItemEvent struct {
EventType string `protobuf:"bytes,1,opt,name=event_type,json=eventType,proto3" json:"event_type,omitempty"` // "created", "updated", "deleted"
Item *Item `protobuf:"bytes,2,opt,name=item,proto3" json:"item,omitempty"`
Timestamp int64 `protobuf:"varint,3,opt,name=timestamp,proto3" json:"timestamp,omitempty"`
// contains filtered or unexported fields
}
func (*ItemEvent) Descriptor
deprecated
func (*ItemEvent) GetEventType ¶
func (*ItemEvent) GetTimestamp ¶
func (*ItemEvent) ProtoMessage ¶
func (*ItemEvent) ProtoMessage()
func (*ItemEvent) ProtoReflect ¶
func (x *ItemEvent) ProtoReflect() protoreflect.Message
type ItemResponse ¶
type ItemResponse struct {
Item *Item `protobuf:"bytes,1,opt,name=item,proto3" json:"item,omitempty"`
Message string `protobuf:"bytes,2,opt,name=message,proto3" json:"message,omitempty"`
// contains filtered or unexported fields
}
func (*ItemResponse) Descriptor
deprecated
func (*ItemResponse) Descriptor() ([]byte, []int)
Deprecated: Use ItemResponse.ProtoReflect.Descriptor instead.
func (*ItemResponse) GetItem ¶
func (x *ItemResponse) GetItem() *Item
func (*ItemResponse) GetMessage ¶
func (x *ItemResponse) GetMessage() string
func (*ItemResponse) ProtoMessage ¶
func (*ItemResponse) ProtoMessage()
func (*ItemResponse) ProtoReflect ¶
func (x *ItemResponse) ProtoReflect() protoreflect.Message
func (*ItemResponse) Reset ¶
func (x *ItemResponse) Reset()
func (*ItemResponse) String ¶
func (x *ItemResponse) String() string
type ListItemsRequest ¶
type ListItemsRequest struct {
Category string `protobuf:"bytes,1,opt,name=category,proto3" json:"category,omitempty"`
Limit int32 `protobuf:"varint,2,opt,name=limit,proto3" json:"limit,omitempty"`
// Optional fields demonstrate explicit presence tracking
Offset *int32 `protobuf:"varint,3,opt,name=offset,proto3,oneof" json:"offset,omitempty"`
SortBy *string `protobuf:"bytes,4,opt,name=sort_by,json=sortBy,proto3,oneof" json:"sort_by,omitempty"`
IncludeDeleted *bool `protobuf:"varint,5,opt,name=include_deleted,json=includeDeleted,proto3,oneof" json:"include_deleted,omitempty"`
// contains filtered or unexported fields
}
func (*ListItemsRequest) Descriptor
deprecated
func (*ListItemsRequest) Descriptor() ([]byte, []int)
Deprecated: Use ListItemsRequest.ProtoReflect.Descriptor instead.
func (*ListItemsRequest) GetCategory ¶
func (x *ListItemsRequest) GetCategory() string
func (*ListItemsRequest) GetIncludeDeleted ¶
func (x *ListItemsRequest) GetIncludeDeleted() bool
func (*ListItemsRequest) GetLimit ¶
func (x *ListItemsRequest) GetLimit() int32
func (*ListItemsRequest) GetOffset ¶
func (x *ListItemsRequest) GetOffset() int32
func (*ListItemsRequest) GetSortBy ¶
func (x *ListItemsRequest) GetSortBy() string
func (*ListItemsRequest) ProtoMessage ¶
func (*ListItemsRequest) ProtoMessage()
func (*ListItemsRequest) ProtoReflect ¶
func (x *ListItemsRequest) ProtoReflect() protoreflect.Message
func (*ListItemsRequest) Reset ¶
func (x *ListItemsRequest) Reset()
func (*ListItemsRequest) String ¶
func (x *ListItemsRequest) String() string
type StreamingService ¶
type StreamingService struct {
UnimplementedStreamingServiceServer
}
func NewStreamingService ¶
func NewStreamingService() *StreamingService
func (*StreamingService) ListItems ¶
func (s *StreamingService) ListItems(req *ListItemsRequest, stream grpc.ServerStreamingServer[ItemResponse]) error
func (*StreamingService) WatchItems ¶
func (s *StreamingService) WatchItems(req *WatchRequest, stream grpc.ServerStreamingServer[ItemEvent]) error
type StreamingServiceClient ¶
type StreamingServiceClient interface {
// Server streaming: list items as they're found
ListItems(ctx context.Context, in *ListItemsRequest, opts ...grpc.CallOption) (grpc.ServerStreamingClient[ItemResponse], error)
// Server streaming: watch for changes
WatchItems(ctx context.Context, in *WatchRequest, opts ...grpc.CallOption) (grpc.ServerStreamingClient[ItemEvent], error)
}
StreamingServiceClient is the client API for StreamingService service.
For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.
func NewStreamingServiceClient ¶
func NewStreamingServiceClient(cc grpc.ClientConnInterface) StreamingServiceClient
type StreamingServiceServer ¶
type StreamingServiceServer interface {
// Server streaming: list items as they're found
ListItems(*ListItemsRequest, grpc.ServerStreamingServer[ItemResponse]) error
// Server streaming: watch for changes
WatchItems(*WatchRequest, grpc.ServerStreamingServer[ItemEvent]) error
// contains filtered or unexported methods
}
StreamingServiceServer is the server API for StreamingService service. All implementations must embed UnimplementedStreamingServiceServer for forward compatibility.
type StreamingService_ListItemsClient ¶
type StreamingService_ListItemsClient = grpc.ServerStreamingClient[ItemResponse]
This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name.
type StreamingService_ListItemsServer ¶
type StreamingService_ListItemsServer = grpc.ServerStreamingServer[ItemResponse]
This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name.
type StreamingService_WatchItemsClient ¶
type StreamingService_WatchItemsClient = grpc.ServerStreamingClient[ItemEvent]
This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name.
type StreamingService_WatchItemsServer ¶
type StreamingService_WatchItemsServer = grpc.ServerStreamingServer[ItemEvent]
This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name.
type UnimplementedStreamingServiceServer ¶
type UnimplementedStreamingServiceServer struct{}
UnimplementedStreamingServiceServer must be embedded to have forward compatible implementations.
NOTE: this should be embedded by value instead of pointer to avoid a nil pointer dereference when methods are called.
func (UnimplementedStreamingServiceServer) ListItems ¶
func (UnimplementedStreamingServiceServer) ListItems(*ListItemsRequest, grpc.ServerStreamingServer[ItemResponse]) error
func (UnimplementedStreamingServiceServer) WatchItems ¶
func (UnimplementedStreamingServiceServer) WatchItems(*WatchRequest, grpc.ServerStreamingServer[ItemEvent]) error
type UnsafeStreamingServiceServer ¶
type UnsafeStreamingServiceServer interface {
// contains filtered or unexported methods
}
UnsafeStreamingServiceServer may be embedded to opt out of forward compatibility for this service. Use of this interface is not recommended, as added methods to StreamingServiceServer will result in compilation errors.
type WatchRequest ¶
type WatchRequest struct {
StartId int64 `protobuf:"varint,1,opt,name=start_id,json=startId,proto3" json:"start_id,omitempty"`
// contains filtered or unexported fields
}
func (*WatchRequest) Descriptor
deprecated
func (*WatchRequest) Descriptor() ([]byte, []int)
Deprecated: Use WatchRequest.ProtoReflect.Descriptor instead.
func (*WatchRequest) GetStartId ¶
func (x *WatchRequest) GetStartId() int64
func (*WatchRequest) ProtoMessage ¶
func (*WatchRequest) ProtoMessage()
func (*WatchRequest) ProtoReflect ¶
func (x *WatchRequest) ProtoReflect() protoreflect.Message
func (*WatchRequest) Reset ¶
func (x *WatchRequest) Reset()
func (*WatchRequest) String ¶
func (x *WatchRequest) String() string