rofsnotify

package module
v0.0.0-...-a6ee939 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 10, 2026 License: Apache-2.0 Imports: 3 Imported by: 0

README

File System Notify Plugin

The file system notify plugin provides operators for monitoring file system events using the fsnotify package.

Installation

go get github.com/samber/ro/plugins/fsnotify

Operators

NewFSListener

Creates an observable that monitors file system events for specified paths.

import (
    "os"
    "github.com/fsnotify/fsnotify"
    "github.com/samber/ro"
    rofsnotify "github.com/samber/ro/plugins/fsnotify"
)

// Monitor a single directory
tempDir := os.TempDir()
observable := rofsnotify.NewFSListener(tempDir)

subscription := observable.Subscribe(
    ro.NewObserver(
        func(event fsnotify.Event) {
            // Handle file system event
            switch event.Op {
            case fsnotify.Create:
                // File was created
            case fsnotify.Write:
                // File was written to
            case fsnotify.Remove:
                // File was removed
            case fsnotify.Rename:
                // File was renamed
            case fsnotify.Chmod:
                // File permissions changed
            }
        },
        func(err error) {
            // Handle error
        },
        func() {
            // Handle completion
        },
    ),
)
defer subscription.Unsubscribe()

Event Types

The plugin monitors the following file system events:

  • Create: A new file or directory was created
  • Write: A file was written to
  • Remove: A file or directory was removed
  • Rename: A file or directory was renamed
  • Chmod: File permissions were changed

Multiple Path Monitoring

You can monitor multiple directories simultaneously:

paths := []string{
    "/path/to/dir1",
    "/path/to/dir2",
    "/path/to/dir3",
}

observable := rofsnotify.NewFSListener(paths...)

subscription := observable.Subscribe(
    ro.NewObserver(
        func(event fsnotify.Event) {
            // Handle events from any of the monitored paths
        },
        func(err error) {
            // Handle error
        },
        func() {
            // Handle completion
        },
    ),
)
defer subscription.Unsubscribe()

Event Filtering

You can filter events based on file extensions or event types:

import (
    "path/filepath"
    "github.com/fsnotify/fsnotify"
    "github.com/samber/ro"
    rofsnotify "github.com/samber/ro/plugins/fsnotify"
)

// Filter by file extension
observable := ro.Pipe1(
    rofsnotify.NewFSListener("/path/to/monitor"),
    ro.Filter(func(event fsnotify.Event) bool {
        // Only process .txt files
        return filepath.Ext(event.Name) == ".txt"
    }),
)

// Filter by event type
observable := ro.Pipe1(
    rofsnotify.NewFSListener("/path/to/monitor"),
    ro.Filter(func(event fsnotify.Event) bool {
        // Only process create and write events
        return event.Op&(fsnotify.Create|fsnotify.Write) != 0
    }),
)

Event Throttling

To avoid processing too many rapid successive events, you can throttle the events:

import (
    "time"
    "github.com/samber/ro"
    rofsnotify "github.com/samber/ro/plugins/fsnotify"
)

observable := ro.Pipe1(
    rofsnotify.NewFSListener("/path/to/monitor"),
    ro.ThrottleTime[fsnotify.Event](100 * time.Millisecond),
)

subscription := observable.Subscribe(
    ro.NewObserver(
        func(event fsnotify.Event) {
            // Handle throttled file system event
        },
        func(err error) {
            // Handle error
        },
        func() {
            // Handle completion
        },
    ),
)
defer subscription.Unsubscribe()

Error Handling

The plugin handles various file system errors gracefully:

observable := rofsnotify.NewFSListener("/path/to/monitor")

subscription := observable.Subscribe(
    ro.NewObserver(
        func(event fsnotify.Event) {
            // Handle successful file system event
        },
        func(err error) {
            // Handle file system monitoring error
            // This could be due to:
            // - Insufficient permissions
            // - Directory not existing
            // - File system limitations
            // - Other file system issues
        },
        func() {
            // Handle completion (when monitoring stops)
        },
    ),
)
defer subscription.Unsubscribe()

Context Support

You can use context for cancellation and timeout:

import (
    "context"
    "time"
    "github.com/samber/ro"
    rofsnotify "github.com/samber/ro/plugins/fsnotify"
)

ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

observable := rofsnotify.NewFSListener("/path/to/monitor")

subscription := observable.SubscribeWithContext(
    ctx,
    ro.NewObserverWithContext(
        func(ctx context.Context, event fsnotify.Event) {
            // Handle file system event with context
        },
        func(ctx context.Context, err error) {
            // Handle error with context
        },
        func(ctx context.Context) {
            // Handle completion with context
        },
    ),
)
defer subscription.Unsubscribe()

Event Transformation

You can transform file system events into other formats:

observable := ro.Pipe1(
    rofsnotify.NewFSListener("/path/to/monitor"),
    ro.Map(func(event fsnotify.Event) string {
        // Transform event to string representation
        return event.Name + " - " + event.Op.String()
    }),
)

subscription := observable.Subscribe(ro.PrintObserver[string]())
defer subscription.Unsubscribe()

Real-world Example

Here's a practical example that monitors a log directory and processes new log files:

import (
    "path/filepath"
    "strings"
    "github.com/fsnotify/fsnotify"
    "github.com/samber/ro"
    rofsnotify "github.com/samber/ro/plugins/fsnotify"
)

// Monitor log directory for new log files
pipeline := ro.Pipe3(
    // Monitor the logs directory
    rofsnotify.NewFSListener("/var/log"),
    // Filter for create events on .log files
    ro.Filter(func(event fsnotify.Event) bool {
        return event.Op == fsnotify.Create && 
               filepath.Ext(event.Name) == ".log"
    }),
    // Transform to log file path
    ro.Map(func(event fsnotify.Event) string {
        return event.Name
    }),
)

subscription := pipeline.Subscribe(
    ro.NewObserver(
        func(logFile string) {
            // Process new log file
            // e.g., read and analyze the log file
        },
        func(err error) {
            // Handle monitoring error
        },
        func() {
            // Handle completion
        },
    ),
)
defer subscription.Unsubscribe()

Performance Considerations

  • The plugin uses the fsnotify package for efficient file system monitoring
  • Events are emitted asynchronously to avoid blocking
  • Monitor only necessary directories to reduce system load
  • Use filtering to process only relevant events
  • Consider throttling for high-frequency event sources
  • The plugin automatically handles file system limitations and errors
  • Context cancellation properly cleans up monitoring resources

Documentation

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewFSListener

func NewFSListener(paths ...string) ro.Observable[fsnotify.Event]

NewFSListener creates a file system watcher that emits file system events.

Example
// Monitor file system events for a directory
tempDir, err := os.MkdirTemp("", "fsnotify-example")
if err != nil {
	return
}
defer os.RemoveAll(tempDir)

observable := NewFSListener(tempDir)

// Create a test file to trigger events
testFile := filepath.Join(tempDir, "test.txt")

// Set up subscription first
subscription := observable.Subscribe(
	ro.NewObserver(
		func(event fsnotify.Event) {
			// Handle file system event
			switch event.Op {
			case fsnotify.Create:
				fmt.Println("File was created")
			case fsnotify.Write:
				fmt.Println("File was written to")
			case fsnotify.Remove:
				fmt.Println("File was removed")
			case fsnotify.Rename:
				fmt.Println("File was renamed")
			case fsnotify.Chmod:
				fmt.Println("File permissions changed")
			}
		},
		func(err error) {
			// Handle error
		},
		func() {
			// Handle completion
		},
	),
)
defer subscription.Unsubscribe()

// Wait for watcher to be set up
time.Sleep(100 * time.Millisecond)

// Trigger a file creation event
file, _ := os.Create(testFile)
file.Close()
time.Sleep(100 * time.Millisecond)
Output:

File was created
Example (WithContext)
// Monitor file system events with context for cancellation
tempDir, err := os.MkdirTemp("", "fsnotify-example")
if err != nil {
	return
}
defer os.RemoveAll(tempDir)

observable := NewFSListener(tempDir)

ctx, cancel := context.WithTimeout(context.Background(), 300*time.Millisecond)
defer cancel()

// Set up subscription first
subscription := observable.SubscribeWithContext(
	ctx,
	ro.NewObserverWithContext(
		func(ctx context.Context, event fsnotify.Event) {
			// Handle file system event with context
			fmt.Println("Context event:", event.Op.String())
		},
		func(ctx context.Context, err error) {
			// Handle error with context
			fmt.Println("Context error:", err.Error())
		},
		func(ctx context.Context) {
			// Handle completion with context
			fmt.Println("Context completed")
		},
	),
)
defer subscription.Unsubscribe()

// Wait for watcher to be set up
time.Sleep(100 * time.Millisecond)

// Trigger an event
file, _ := os.Create(filepath.Join(tempDir, "test.txt"))
file.Close()
time.Sleep(100 * time.Millisecond)
Output:

Context event: CREATE
Example (WithErrorHandling)
// Monitor file system events with comprehensive error handling
tempDir, err := os.MkdirTemp("", "fsnotify-example")
if err != nil {
	return
}
defer os.RemoveAll(tempDir)

observable := NewFSListener(tempDir)

// Set up subscription first
subscription := observable.Subscribe(
	ro.NewObserver(
		func(event fsnotify.Event) {
			// Handle successful file system event
			fmt.Println("Event received:", event.Op.String())
		},
		func(err error) {
			// Handle file system monitoring error
			fmt.Println("Error occurred:", err.Error())
		},
		func() {
			// Handle completion (when monitoring stops)
			fmt.Println("Monitoring completed")
		},
	),
)
defer subscription.Unsubscribe()

// Wait for watcher to be set up
time.Sleep(100 * time.Millisecond)

// Trigger an event
file, _ := os.Create(filepath.Join(tempDir, "test.txt"))
file.Close()
time.Sleep(100 * time.Millisecond)
Output:

Event received: CREATE
Example (WithEventTypeFiltering)
// Monitor specific types of file system events
tempDir, err := os.MkdirTemp("", "fsnotify-example")
if err != nil {
	return
}
defer os.RemoveAll(tempDir)

observable := ro.Pipe1(
	NewFSListener(tempDir),
	ro.Filter(func(event fsnotify.Event) bool {
		// Only process create and write events
		return event.Op&(fsnotify.Create) != 0
	}),
)

// Set up subscription first
subscription := observable.Subscribe(
	ro.NewObserver(
		func(event fsnotify.Event) {
			// Handle create and write events only
			fmt.Println("Event type:", event.Op.String())
		},
		func(err error) {
			// Handle error
		},
		func() {
			// Handle completion
		},
	),
)
defer subscription.Unsubscribe()

// Wait for watcher to be set up
time.Sleep(100 * time.Millisecond)

// Create and write to a file
file, _ := os.Create(filepath.Join(tempDir, "test.txt"))
file.WriteString("hello")
file.Sync()                       // Force sync to ensure write event
time.Sleep(50 * time.Millisecond) // Wait for write event
file.Close()
time.Sleep(100 * time.Millisecond)
Output:

Event type: CREATE
Example (WithFiltering)
// Monitor file system events with filtering
tempDir, err := os.MkdirTemp("", "fsnotify-example")
if err != nil {
	return
}
defer os.RemoveAll(tempDir)

observable := ro.Pipe1(
	NewFSListener(tempDir),
	ro.Filter(func(event fsnotify.Event) bool {
		// Only process .txt files
		return filepath.Ext(event.Name) == ".txt"
	}),
)

// Set up subscription first
subscription := observable.Subscribe(
	ro.NewObserver(
		func(event fsnotify.Event) {
			// Handle filtered file system event
			fmt.Println("Filtered event for:", filepath.Base(event.Name))
		},
		func(err error) {
			// Handle error
		},
		func() {
			// Handle completion
		},
	),
)
defer subscription.Unsubscribe()

// Wait for watcher to be set up
time.Sleep(100 * time.Millisecond)

// Create files with different extensions
file1, _ := os.Create(filepath.Join(tempDir, "test.txt"))
file1.Close()
file2, _ := os.Create(filepath.Join(tempDir, "test.log"))
file2.Close()
time.Sleep(100 * time.Millisecond)
Output:

Filtered event for: test.txt
Example (WithMultiplePaths)
// Monitor multiple directories
tempDir, err := os.MkdirTemp("", "fsnotify-example")
if err != nil {
	return
}
defer os.RemoveAll(tempDir)

subDir := filepath.Join(tempDir, "subdir")
os.MkdirAll(subDir, 0755)

paths := []string{
	tempDir,
	subDir,
}

observable := NewFSListener(paths...)

// Set up subscription first
subscription := observable.Subscribe(
	ro.NewObserver(
		func(event fsnotify.Event) {
			// Handle file system event
			if filepath.Base(filepath.Dir(event.Name)) == "subdir" {
				fmt.Println("Event from: subdir")
			} else {
				fmt.Println("Event from: main directory")
			}
		},
		func(err error) {
			// Handle error
		},
		func() {
			// Handle completion
		},
	),
)
defer subscription.Unsubscribe()

// Wait for watcher to be set up
time.Sleep(100 * time.Millisecond)

// Trigger events in both directories
file1, _ := os.Create(filepath.Join(tempDir, "file1.txt"))
file1.Close()
file2, _ := os.Create(filepath.Join(subDir, "file2.txt"))
file2.Close()
time.Sleep(100 * time.Millisecond)
Output:

Event from: main directory
Event from: subdir
Example (WithThrottling)
// Monitor file system events with throttling to avoid rapid successive events
tempDir, err := os.MkdirTemp("", "fsnotify-example")
if err != nil {
	return
}
defer os.RemoveAll(tempDir)

observable := ro.Pipe1(
	NewFSListener(tempDir),
	ro.ThrottleTime[fsnotify.Event](100*time.Millisecond),
)

// Set up subscription first
subscription := observable.Subscribe(
	ro.NewObserver(
		func(event fsnotify.Event) {
			// Handle throttled file system event
			fmt.Println("Throttled event:", filepath.Base(event.Name))
		},
		func(err error) {
			// Handle error
		},
		func() {
			// Handle completion
		},
	),
)
defer subscription.Unsubscribe()

// Wait for watcher to be set up
time.Sleep(100 * time.Millisecond)

// Create multiple files rapidly
for i := 0; i < 5; i++ {
	file, _ := os.Create(filepath.Join(tempDir, fmt.Sprintf("file%d.txt", i)))
	file.Close()
}
time.Sleep(200 * time.Millisecond)
Output:

Throttled event: file0.txt
Example (WithTransformation)
// Monitor file system events and transform them
tempDir, err := os.MkdirTemp("", "fsnotify-example")
if err != nil {
	return
}
defer os.RemoveAll(tempDir)

observable := ro.Pipe1(
	NewFSListener(tempDir),
	ro.Map(func(event fsnotify.Event) string {
		// Transform event to string representation
		return filepath.Base(event.Name) + " - " + event.Op.String()
	}),
)

// Set up subscription first
subscription := observable.Subscribe(
	ro.NewObserver(
		func(eventStr string) {
			fmt.Println("Transformed:", eventStr)
		},
		func(err error) {
			// Handle error
		},
		func() {
			// Handle completion
		},
	),
)
defer subscription.Unsubscribe()

// Wait for watcher to be set up
time.Sleep(100 * time.Millisecond)

// Trigger an event
file, _ := os.Create(filepath.Join(tempDir, "test.txt"))
file.Close()
time.Sleep(100 * time.Millisecond)
Output:

Transformed: test.txt - CREATE

Types

This section is empty.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL