watcher

v3.0.0-...-7f0c83d
Published: Jan 28, 2026 License: MIT Imports: 12 Imported by: 0

README

Casbin Watermill Watcher

This is a Casbin watcher based on Watermill, a Go library for working efficiently with message streams. With Watermill's support for various Pub/Sub backends, this watcher allows for flexible integration with numerous messaging systems.

Installation

go get github.com/origadmin/casbin-watcher/v3

Usage

To use the watcher, create a watcher instance with a connection URL that specifies the driver and its configuration.

package main

import (
	"context"
	"log"

	"github.com/casbin/casbin/v3"
	"github.com/origadmin/casbin-watcher/v3"
	// Import the specific driver you want to use.
	_ "github.com/origadmin/casbin-watcher/v3/drivers/nats"
)

func main() {
	// The connection URL for the desired driver.
	// See the driver's README for configuration details.
	connectionURL := "nats://localhost:4222/casbin_updates"

	// Create a new watcher.
	w, err := watcher.NewWatcher(context.Background(), connectionURL)
	if err != nil {
		log.Fatalf("Failed to create watcher: %v", err)
	}

	// Initialize the enforcer.
	e, err := casbin.NewEnforcer("model.conf", "policy.csv")
	if err != nil {
		log.Fatalf("Failed to create enforcer: %v", err)
	}

	// Set the watcher for the enforcer.
	err = e.SetWatcher(w)
	if err != nil {
		log.Fatalf("Failed to set watcher: %v", err)
	}

	// The watcher will now automatically update other enforcer instances
	// when the policy changes.
	// For example, after e.SavePolicy() or e.AddPolicy(), etc.
}

Supported Drivers

This section lists all Watermill Pub/Sub backends, indicating their implementation status within this casbin-watcher repository.

| Driver Name | Scheme(s) | Underlying Watermill Package | Status |
|---|---|---|---|
| AWS (SQS/SNS) | sqs://, snssqs:// | github.com/ThreeDotsLabs/watermill-aws | Implemented |
| BoltDB | bolt:// | github.com/ThreeDotsLabs/watermill-bolt | Implemented |
| etcd | etcd:// | Custom implementation (using go.etcd.io/etcd/client/v3) | Implemented |
| Firestore | firestore:// | github.com/ThreeDotsLabs/watermill-firestore | Implemented |
| Google Cloud Pub/Sub | gcpv2:// | github.com/ThreeDotsLabs/watermill-googlecloud/v2 | Implemented |
| HTTP | http:// | github.com/ThreeDotsLabs/watermill-http/v2 | Implemented |
| IO (Stdin/Stdout/File) | io:// | github.com/ThreeDotsLabs/watermill-io | Implemented |
| Kafka | kafka:// | github.com/ThreeDotsLabs/watermill-kafka/v3 | Implemented |
| NATS | nats:// | github.com/ThreeDotsLabs/watermill-nats/v2 | Implemented |
| RabbitMQ | rabbitmq:// | github.com/ThreeDotsLabs/watermill-amqp/v2/pkg/amqp | Implemented |
| Redis Streams | redisstream:// | github.com/ThreeDotsLabs/watermill-redisstream/pkg/redisstream | Implemented |
| SQL (PostgreSQL/MySQL) | postgres://, mysql:// | github.com/ThreeDotsLabs/watermill-sql/v4 | Implemented |
| SQLite (modernc) | sqlite:// | github.com/ThreeDotsLabs/watermill-sqlite/wmsqlitemodernc | Implemented |
| AMQP 1.0 | amqp10:// | github.com/kahowell/watermill-amqp10 | Not Implemented |
| Apache Pulsar | pulsar:// | github.com/AlexCuse/watermill-pulsar | Not Implemented |
| Apache RocketMQ | rocketmq:// | github.com/yflau/watermill-rocketmq | Not Implemented |
| CockroachDB | cockroachdb:// | github.com/cockroachdb/watermill-crdb | Not Implemented |
| Ensign | ensign:// | github.com/rotationalio/watermill-ensign | Not Implemented |
| Google Cloud (HTTP Push) | gcp-http-push:// | github.com/dentech-floss/watermill-googlecloud-http | Not Implemented |
| MongoDB | mongodb:// | github.com/cunyat/watermill-mongodb | Not Implemented |
| MQTT | mqtt:// | github.com/perfect13/watermill-mqtt | Not Implemented |
| NSQ | nsq:// | github.com/chennqqi/watermill-nsq | Not Implemented |
| Redis (ZSET) | rediszset:// | github.com/stong1994/watermill-rediszset | Not Implemented |
| SQLite (zombiezen) | sqlite-zombiezen:// | github.com/ThreeDotsLabs/watermill-sqlite/wmsqlitezombiezen | Not Implemented |

WatcherEx

This library also supports WatcherEx for more granular policy update notifications.

// Create a new WatcherEx instance.
w, err := watcher.NewWatcherEx(context.Background(), connectionURL)
if err != nil {
	// ...
}

// Now you can use methods like:
// w.UpdateForAddPolicy(...)
// w.UpdateForRemovePolicy(...)

Documentation

Constants

const (
	UpdateTypePolicyChanged        = "policy-changed"
	UpdateTypeAddPolicy            = "add-policy"
	UpdateTypeRemovePolicy         = "remove-policy"
	UpdateTypeRemoveFilteredPolicy = "remove-filtered-policy"
	UpdateTypeSavePolicy           = "save-policy"
	UpdateTypeAddPolicies          = "add-policies"
	UpdateTypeRemovePolicies       = "remove-policies"
)

Update types for Ex messages.

const DefaultTopic = "casbin-policy-updates"

DefaultTopic is the default topic used for policy update notifications.

Variables

This section is empty.

Functions

func RegisterDriver

func RegisterDriver(scheme string, driver Driver)

RegisterDriver registers a driver for the given scheme (e.g., "redis", "kafka"). This allows casbin-watcher to dynamically find and create PubSub instances.

Types

type Driver

type Driver interface {
	// NewPubSub creates a new PubSub instance from the parsed URL and logger.
	// The driver is responsible for parsing and validating its specific configuration
	// from the URL's host, path, and query parameters.
	NewPubSub(ctx context.Context, parsedURL *url.URL, logger watermill.LoggerAdapter) (PubSub, error)
}

Driver is the interface for creating a PubSub instance for a specific backend. Each driver is responsible for parsing its specific configuration from the URL.

type Ex

type Ex struct {
	// contains filtered or unexported fields
}

Ex implements persist.WatcherEx.

func NewWatcherEx

func NewWatcherEx(ctx context.Context, connectionURL string, opts ...Option) (*Ex, error)

NewWatcherEx creates a new Ex (extended mode).

func (Ex) Close

func (w Ex) Close()

Close stops the watcher and releases its resources. To match the persist.Watcher interface, errors are logged rather than returned.

func (Ex) SetUpdateCallback

func (w Ex) SetUpdateCallback(callback func(string)) error

func (*Ex) Update

func (w *Ex) Update() error

Update calls the update callback of other instances to synchronize their policy. This method is part of the Watcher interface and publishes a generic "policy-changed" message.

func (*Ex) UpdateForAddPolicies

func (w *Ex) UpdateForAddPolicies(sec string, ptype string, rules ...[]string) error

func (*Ex) UpdateForAddPolicy

func (w *Ex) UpdateForAddPolicy(sec, ptype string, params ...string) error

func (*Ex) UpdateForRemoveFilteredPolicy

func (w *Ex) UpdateForRemoveFilteredPolicy(sec, ptype string, fieldIndex int, fieldValues ...string) error

func (*Ex) UpdateForRemovePolicies

func (w *Ex) UpdateForRemovePolicies(sec string, ptype string, rules ...[]string) error

func (*Ex) UpdateForRemovePolicy

func (w *Ex) UpdateForRemovePolicy(sec, ptype string, params ...string) error

func (*Ex) UpdateForSavePolicy

func (w *Ex) UpdateForSavePolicy(_ model.Model) error

type MarshalUnmarshaler

type MarshalUnmarshaler interface {
	Marshal(v interface{}) ([]byte, error)
	Unmarshal(data []byte, v interface{}) error
}

MarshalUnmarshaler is the interface for serializing and deserializing UpdateMessage.

func DefaultCodec

func DefaultCodec() MarshalUnmarshaler

DefaultCodec returns the default gob codec.

func JSONCodec

func JSONCodec() MarshalUnmarshaler

JSONCodec returns a JSON codec.
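To make the codec contract concrete, the sketch below implements the interface twice with the standard library, once with `encoding/gob` and once with `encoding/json`, and round-trips a message through each. `gobCodec`, `jsonCodec`, and `roundTrip` are hypothetical names; the interface and the `UpdateMessage` struct are local copies of the documented declarations, and nothing here is claimed to match the package's internal codecs.

```go
package main

import (
	"bytes"
	"encoding/gob"
	"encoding/json"
	"fmt"
)

// MarshalUnmarshaler is a local copy of the documented interface.
type MarshalUnmarshaler interface {
	Marshal(v interface{}) ([]byte, error)
	Unmarshal(data []byte, v interface{}) error
}

// UpdateMessage is a local copy of the documented payload struct.
type UpdateMessage struct {
	Type   string
	Sec    string
	Ptype  string
	Params []string
	Rules  [][]string
}

// gobCodec is a hypothetical gob-based implementation.
type gobCodec struct{}

func (gobCodec) Marshal(v interface{}) ([]byte, error) {
	var buf bytes.Buffer
	if err := gob.NewEncoder(&buf).Encode(v); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

func (gobCodec) Unmarshal(data []byte, v interface{}) error {
	return gob.NewDecoder(bytes.NewReader(data)).Decode(v)
}

// jsonCodec is a hypothetical JSON-based implementation.
type jsonCodec struct{}

func (jsonCodec) Marshal(v interface{}) ([]byte, error)      { return json.Marshal(v) }
func (jsonCodec) Unmarshal(data []byte, v interface{}) error { return json.Unmarshal(data, v) }

// roundTrip encodes a message and decodes it again with the same codec.
func roundTrip(c MarshalUnmarshaler, in UpdateMessage) (UpdateMessage, error) {
	data, err := c.Marshal(in)
	if err != nil {
		return UpdateMessage{}, err
	}
	var out UpdateMessage
	if err := c.Unmarshal(data, &out); err != nil {
		return UpdateMessage{}, err
	}
	return out, nil
}

func main() {
	msg := UpdateMessage{Type: "add-policy", Sec: "p", Ptype: "p", Params: []string{"alice", "data1", "read"}}
	for _, c := range []MarshalUnmarshaler{gobCodec{}, jsonCodec{}} {
		out, err := roundTrip(c, msg)
		fmt.Println(out.Type, out.Params, err)
	}
}
```

All instances sharing a topic need to use the same codec, since a gob-encoded payload cannot be decoded as JSON and vice versa.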

type Option

type Option func(*options)

Option is a functional option for configuring the Watcher.

func WithCodec

func WithCodec(codec MarshalUnmarshaler) Option

WithCodec sets the custom MarshalUnmarshaler for UpdateMessage.

func WithLogger

func WithLogger(logger watermill.LoggerAdapter) Option

WithLogger sets the Watermill logger adapter to use.

func WithTopic

func WithTopic(topic string) Option

WithTopic sets the Watermill topic to use for updates.
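The `Option` type follows Go's functional options pattern. The sketch below shows the mechanics in isolation; the fields of `options` are unexported and undocumented in the package, so the `topic` field, the `applyOptions` helper, and the fallback to the default topic are assumptions made for illustration.

```go
package main

import "fmt"

// defaultTopic mirrors the documented DefaultTopic constant.
const defaultTopic = "casbin-policy-updates"

// options is a hypothetical settings struct; the real one is unexported.
type options struct {
	topic string
}

// Option mutates the settings, matching the documented signature shape.
type Option func(*options)

// WithTopic returns an Option that overrides the topic.
func WithTopic(topic string) Option {
	return func(o *options) { o.topic = topic }
}

// applyOptions starts from defaults and applies each option in order,
// so later options win over earlier ones.
func applyOptions(opts ...Option) options {
	o := options{topic: defaultTopic}
	for _, opt := range opts {
		opt(&o)
	}
	return o
}

func main() {
	fmt.Println(applyOptions().topic)
	fmt.Println(applyOptions(WithTopic("tenant-a-policy")).topic)
}
```

The pattern keeps constructors such as `NewWatcher` stable: new settings can be added as new option functions without changing the constructor's signature.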

type PubSub

type PubSub interface {
	message.Publisher
	message.Subscriber
}

PubSub is a high-level interface that combines Publisher and Subscriber. This is the standard interface that all drivers must produce.

type UpdateMessage

type UpdateMessage struct {
	Type   string
	Sec    string
	Ptype  string
	Params []string
	Rules  [][]string
}

UpdateMessage represents the payload for Ex updates.
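One way a receiver might interpret such a payload is to switch on `Type`. The sketch below is hypothetical: `describe` is not part of the package, it only summarizes the action a subscriber could take, and it treats every type it does not handle explicitly as a request to reload the full policy, which is an assumption rather than documented behavior.

```go
package main

import (
	"fmt"
	"strings"
)

// UpdateMessage is a local copy of the documented payload struct.
type UpdateMessage struct {
	Type   string
	Sec    string
	Ptype  string
	Params []string
	Rules  [][]string
}

// describe summarizes the action a receiver could take for a message.
// The type strings match the documented UpdateType constants.
func describe(m UpdateMessage) string {
	switch m.Type {
	case "add-policy":
		return fmt.Sprintf("add %s rule [%s]", m.Ptype, strings.Join(m.Params, ", "))
	case "remove-policy":
		return fmt.Sprintf("remove %s rule [%s]", m.Ptype, strings.Join(m.Params, ", "))
	case "add-policies":
		return fmt.Sprintf("add %d %s rules", len(m.Rules), m.Ptype)
	case "remove-policies":
		return fmt.Sprintf("remove %d %s rules", len(m.Rules), m.Ptype)
	default:
		// Assumed fallback: anything else triggers a full reload.
		return "reload full policy"
	}
}

func main() {
	fmt.Println(describe(UpdateMessage{
		Type:   "add-policy",
		Sec:    "p",
		Ptype:  "p",
		Params: []string{"alice", "data1", "read"},
	}))
	fmt.Println(describe(UpdateMessage{Type: "policy-changed"}))
}
```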

type Watcher

type Watcher struct {
	// contains filtered or unexported fields
}

Watcher implements persist.Watcher.

func NewWatcher

func NewWatcher(ctx context.Context, connectionURL string, opts ...Option) (*Watcher, error)

NewWatcher creates a new Watcher (basic mode).

func (Watcher) Close

func (w Watcher) Close()

Close stops the watcher and releases its resources. To match the persist.Watcher interface, errors are logged rather than returned.

func (Watcher) SetUpdateCallback

func (w Watcher) SetUpdateCallback(callback func(string)) error

func (Watcher) Update

func (w Watcher) Update() error

Directories

Path
drivers
	aws
	io
	mem
	sql
