queue

package module
v0.1.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 26, 2024 License: AGPL-3.0 Imports: 6 Imported by: 0

README

GoQueue Library

GoQueue is a Go library that provides a simple and efficient way to handle fan-out messaging, event dispatching, and asynchronous reading. It includes the following main components:

  • FanOut: A structure for broadcasting messages to multiple listeners.
  • EventDispatcher: A structure for publishing and subscribing to events.
  • AsyncReader: A structure for reading from a channel asynchronously.

Installation

To install the GoQueue library, use the following command:

go get github.com/imunhatep/goqueue

Usage

EventDispatcher

The EventDispatcher structure allows you to publish and subscribe to events in-memory. Events are filtered by subject using Go NATS matching wildcards, and subscribers receive events through a channel.

Wildcards
// Will receive any subject begining with "event."
const SubjectAll = "event.>"

// Will receive "event.object.update.test1" but not "event.update.test1"  
const SubjectUpdateWildcard = "*.*.update.>"
Example
package main

import (
	"context"
	"fmt"
	"github.com/imunhatep/goqueue/pubsub"
	"time"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	dispatcher := pubsub.NewEventDispatcher(ctx)

	ch, unsubscribe := dispatcher.Subscribe("example.subject")
	defer unsubscribe()

	go func() {
		for event := range ch {
			fmt.Println("Received event:", event)
		}
	}()

	dispatcher.Publish("example.subject", "Hello, Event!")
	time.Sleep(1 * time.Second)
}
FanOut

Fan-out refers to the practice of starting multiple goroutines to handle incoming tasks. The main idea is to distribute incoming tasks to multiple handlers (goroutines) to ensure that each handler deals with a manageable number of tasks.

Example
package main

import (
	"context"
	"fmt"
	"time"
	"github.com/imunhatep/goqueue"
)

func main() {
	source := make(chan string)
	
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	fanOut := goqueue.NewFanOut[string](ctx, "exampleQueue", source)

	listener1 := fanOut.Subscribe()
	listener2 := fanOut.Subscribe()

	go func() {
		for msg := range listener1 {
			fmt.Println("Listener 1 received:", msg)
		}
	}()

	go func() {
		for msg := range listener2 {
			fmt.Println("Listener 2 received:", msg)
		}
	}()

	source <- "Hello, World!"
	time.Sleep(1 * time.Second)
}
AsyncReader

The AsyncReader structure reads values from channel until channel is closed. This allows multiple readers to share single instance of AsyncReader and get values from a channel.

Example
package main

import (
	"fmt"
	"github.com/imunhatep/goqueue"
)

func main() {
	ch := make(chan int)

	go func() {
		for i := 0; i < 5; i++ {
			ch <- i
		}
		close(ch)
	}()

	reader := goqueue.NewAsyncReader(ch)
	values := reader.Read()

	fmt.Println("Read values:", values)
}

Documentation

Index

Constants

View Source
const (
	QueueMetricsSubsystem = "queue_observer"
)

Variables

This section is empty.

Functions

This section is empty.

Types

type AsyncReader

type AsyncReader[T any] struct {
	// contains filtered or unexported fields
}

func NewAsyncReader

func NewAsyncReader[T any](ch <-chan T) *AsyncReader[T]

func (*AsyncReader[T]) Read

func (cr *AsyncReader[T]) Read() []T

type FanOut

type FanOut[T any] struct {
	// contains filtered or unexported fields
}

func NewFanOut

func NewFanOut[T any](ctx context.Context, name string, source <-chan T) *FanOut[T]

func (*FanOut[T]) Run

func (b *FanOut[T]) Run() *FanOut[T]

func (*FanOut[T]) Subscribe

func (b *FanOut[T]) Subscribe() <-chan T

func (*FanOut[T]) Unsubscribe

func (b *FanOut[T]) Unsubscribe(channel <-chan T)

func (*FanOut[T]) WithMetrics

func (b *FanOut[T]) WithMetrics(metrics *QueueMetrics) *FanOut[T]

func (*FanOut[T]) WithQueueSize

func (b *FanOut[T]) WithQueueSize(size int) *FanOut[T]

type FanOutChan

type FanOutChan[T any] struct {
	// contains filtered or unexported fields
}

func NewChanBroadcaster

func NewChanBroadcaster[T any](ctx context.Context, name string, source <-chan (chan T)) *FanOutChan[T]

func (*FanOutChan[T]) Run

func (b *FanOutChan[T]) Run() *FanOutChan[T]

func (*FanOutChan[T]) Subscribe

func (b *FanOutChan[T]) Subscribe() <-chan (<-chan T)

func (*FanOutChan[T]) Unsubscribe

func (b *FanOutChan[T]) Unsubscribe(channel chan (<-chan T))

func (*FanOutChan[T]) WithMetrics

func (b *FanOutChan[T]) WithMetrics(metrics *QueueMetrics) *FanOutChan[T]

func (*FanOutChan[T]) WithQueueSize

func (b *FanOutChan[T]) WithQueueSize(size int) *FanOutChan[T]

type QueueMetrics

type QueueMetrics struct {
	ObserverSubscribed     *prometheus.GaugeVec
	ObserverReadCount      *prometheus.CounterVec
	ObserverWriteCount     *prometheus.CounterVec
	ObserverWriteFullCount *prometheus.CounterVec
}

func NewQueueMetrics

func NewQueueMetrics() *QueueMetrics

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL