event

package
v1.0.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 2, 2026 License: MIT Imports: 25 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
// EventSubscribe is the "event +subscribe" shortcut. It opens a long-running
// Lark/Feishu WebSocket connection using the app's bot credentials, pushes
// every received event through the filter/transform pipeline, and emits the
// result to stdout, --output-dir, or the --route directories until the process
// is interrupted, the context is cancelled, or the WebSocket client returns.
var EventSubscribe = common.Shortcut{
	Service:     "event",
	Command:     "+subscribe",
	Description: "Subscribe to Lark events via WebSocket (NDJSON output)",
	Risk:        "read",
	Scopes:      []string{},
	AuthTypes:   []string{"bot"},
	Flags: []common.Flag{
		// Output destinations.
		{Name: "output-dir", Desc: "write each event as a JSON file in this directory (default: stdout)"},
		{Name: "route", Type: "string_array", Desc: "regex-based event routing (e.g. --route '^im\\.message=dir:./im/' --route '^contact\\.=dir:./contacts/'); unmatched events fall through to --output-dir or stdout"},

		// Output format.
		{Name: "compact", Type: "bool", Desc: "flat key-value output: extract text, strip noise fields"},
		{Name: "json", Type: "bool", Desc: "pretty-print JSON instead of NDJSON"},

		// Event selection.
		{Name: "event-types", Desc: "comma-separated event types to subscribe; only use when you do not need other events (omit for catch-all)"},
		{Name: "filter", Desc: "regex to further filter events by event_type"},

		// Process behavior.
		{Name: "quiet", Type: "bool", Desc: "suppress stderr status messages"},
		{Name: "force", Type: "bool", Desc: "bypass single-instance lock (UNSAFE: server randomly splits events across connections, each instance only receives a subset)"},
	},
	// DryRun describes what Execute would subscribe to without opening a
	// connection. Unset flags are rendered as human-readable placeholders.
	DryRun: func(ctx context.Context, runtime *common.RuntimeContext) *common.DryRunAPI {
		eventTypesDisplay := "(catch-all)"
		if s := runtime.Str("event-types"); s != "" {
			eventTypesDisplay = s
		}
		filterDisplay := "(none)"
		if s := runtime.Str("filter"); s != "" {
			filterDisplay = s
		}
		outputDirDisplay := "(stdout)"
		if s := runtime.Str("output-dir"); s != "" {
			outputDirDisplay = s
		}
		routeDisplay := "(none)"
		if routes := runtime.StrArray("route"); len(routes) > 0 {
			routeDisplay = strings.Join(routes, "; ")
		}
		return common.NewDryRunAPI().
			Desc("Subscribe to Lark events via WebSocket (long-running)").
			Set("command", "event +subscribe").
			Set("app_id", runtime.Config.AppID).
			Set("event_types", eventTypesDisplay).
			Set("filter", filterDisplay).Set("output_dir", outputDirDisplay).
			Set("route", routeDisplay)
	},
	// Execute validates the flags, takes the per-app single-instance lock
	// (unless --force), builds the processing pipeline, starts the WebSocket
	// client in a goroutine, and then blocks until SIGINT/SIGTERM, context
	// cancellation, or the client returning. It returns a validation error for
	// bad flags or a held lock, and a network error when the client fails.
	Execute: func(ctx context.Context, runtime *common.RuntimeContext) error {
		eventTypesStr := runtime.Str("event-types")
		filterStr := runtime.Str("filter")
		jsonFlag := runtime.Bool("json")
		compactFlag := runtime.Bool("compact")
		outputDir := runtime.Str("output-dir")
		quietFlag := runtime.Bool("quiet")
		routeSpecs := runtime.StrArray("route")
		forceFlag := runtime.Bool("force")

		if outputDir != "" {
			safePath, err := validate.SafeOutputPath(outputDir)
			if err != nil {
				return output.ErrValidation("unsafe output path: %s", err)
			}
			outputDir = safePath
		}

		errOut := runtime.IO().ErrOut
		out := runtime.IO().Out

		// info writes a status line to stderr unless --quiet is set.
		info := func(msg string) {
			if !quietFlag {
				fmt.Fprintln(errOut, msg)
			}
		}

		// Single-instance guard: the --force flag description explains that
		// the server splits events across concurrent connections, so a second
		// subscriber for the same app would only see a subset.
		if !forceFlag {
			lock, err := lockfile.ForSubscribe(runtime.Config.AppID)
			if err != nil {
				return fmt.Errorf("failed to create lock: %w", err)
			}
			if err := lock.TryLock(); err != nil {
				return output.ErrValidation(
					"another event +subscribe instance is already running for app %s\n"+
						"  Only one subscriber per app is allowed to prevent competing consumers.\n"+
						"  Use --force to bypass this check.",
					runtime.Config.AppID,
				)
			}
			defer lock.Unlock()
		}

		eventTypeFilter := NewEventTypeFilter(eventTypesStr)
		regexFilter, err := NewRegexFilter(filterStr)
		if err != nil {
			// Include the underlying compile error (as --route does below) so
			// the user can see why the pattern was rejected.
			return output.ErrValidation("invalid --filter regex: %s: %v", filterStr, err)
		}
		// Append only non-nil filters: a typed nil pointer stored in the
		// EventFilter interface would otherwise compare as non-nil.
		var filterList []EventFilter
		if eventTypeFilter != nil {
			filterList = append(filterList, eventTypeFilter)
		}
		if regexFilter != nil {
			filterList = append(filterList, regexFilter)
		}
		filters := NewFilterChain(filterList...)

		router, err := ParseRoutes(routeSpecs)
		if err != nil {
			return output.ErrValidation("invalid --route: %v", err)
		}

		mode := TransformRaw
		if compactFlag {
			mode = TransformCompact
		}
		pipeline := NewEventPipeline(DefaultRegistry(), filters, PipelineConfig{
			Mode:      mode,
			JsonFlag:  jsonFlag,
			OutputDir: outputDir,
			Quiet:     quietFlag,
			Router:    router,
		}, out, errOut)

		if err := pipeline.EnsureDirs(); err != nil {
			return err
		}

		// rawHandler is the SDK callback for every subscribed event type. It
		// always returns nil: a malformed body is reported on stderr and
		// skipped rather than surfaced to the SDK as a handler failure.
		rawHandler := func(ctx context.Context, event *larkevent.EventReq) error {
			if event.Body == nil {
				return nil
			}
			var raw RawEvent
			if err := json.Unmarshal(event.Body, &raw); err != nil {
				output.PrintError(errOut, fmt.Sprintf("failed to parse event: %v", err))
				return nil
			}
			pipeline.Process(ctx, &raw)
			return nil
		}

		sdkLogger := &stderrLogger{w: errOut, quiet: quietFlag}

		eventDispatcher := dispatcher.NewEventDispatcher("", "")
		eventDispatcher.InitConfig(larkevent.WithLogger(sdkLogger))
		// Register the handler for the explicit whitelist when given, otherwise
		// for the built-in list of common event types (catch-all mode).
		if eventTypeFilter != nil {
			for _, et := range eventTypeFilter.Types() {
				eventDispatcher.OnCustomizedEvent(et, rawHandler)
			}
		} else {
			for _, et := range commonEventTypes {
				eventDispatcher.OnCustomizedEvent(et, rawHandler)
			}
		}

		domain := lark.FeishuBaseUrl
		if runtime.Config.Brand == core.BrandLark {
			domain = lark.LarkBaseUrl
		}

		info(fmt.Sprintf("%sConnecting to Lark event WebSocket...%s", output.Cyan, output.Reset))
		if eventTypeFilter != nil {
			info(fmt.Sprintf("Listening for: %s%s%s", output.Green, strings.Join(eventTypeFilter.Types(), ", "), output.Reset))
		} else {
			info(fmt.Sprintf("Listening for %s%d common event types%s (catch-all mode)", output.Green, len(commonEventTypes), output.Reset))
			info(fmt.Sprintf("%sTip:%s use --event-types to listen for specific event types", output.Dim, output.Reset))
		}
		if regexFilter != nil {
			info(fmt.Sprintf("Filter: %s%s%s", output.Yellow, regexFilter.String(), output.Reset))
		}
		if router != nil {
			for _, spec := range routeSpecs {
				info(fmt.Sprintf("  Route: %s%s%s", output.Green, spec, output.Reset))
			}
		}

		cli := larkws.NewClient(runtime.Config.AppID, runtime.Config.AppSecret,
			larkws.WithEventHandler(eventDispatcher),
			larkws.WithDomain(domain),
			larkws.WithLogger(sdkLogger),
		)

		sigCh := make(chan os.Signal, 1)
		signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
		defer signal.Stop(sigCh)

		// Buffered so the goroutine can deliver its result and exit even when
		// this function has already returned through another select branch.
		startErrCh := make(chan error, 1)
		go func() {
			startErrCh <- cli.Start(ctx)
		}()

		// NOTE(review): this line is printed right after the goroutine is
		// launched, without waiting for cli.Start to report anything, so the
		// connection may not be established yet (or may still fail).
		info(fmt.Sprintf("%s%sConnected.%s Waiting for events... (Ctrl+C to stop)", output.Bold, output.Green, output.Reset))

		// Neither channel is ever closed, so every receive carries a real value.
		select {
		case sig := <-sigCh:
			info(fmt.Sprintf("\n%sReceived %s, shutting down...%s (received %s%d%s events)", output.Yellow, sig, output.Reset, output.Bold, pipeline.EventCount(), output.Reset))
			return nil
		case <-ctx.Done():
			// Treat cancellation by the caller like a signal: stop waiting
			// even if cli.Start itself does not return on cancellation.
			info(fmt.Sprintf("\n%sContext cancelled, shutting down...%s (received %s%d%s events)", output.Yellow, output.Reset, output.Bold, pipeline.EventCount(), output.Reset))
			return nil
		case err := <-startErrCh:
			if err != nil {
				return output.ErrNetwork("WebSocket connection failed: %v", err)
			}
			return nil
		}
	},
}

Functions

func Shortcuts

func Shortcuts() []common.Shortcut

Shortcuts returns all event shortcuts.

Types

type EventFilter

// EventFilter decides whether an event should be processed.
type EventFilter interface {
	// Allow reports whether an event with the given event type passes this
	// filter.
	Allow(eventType string) bool
}

EventFilter decides whether an event should be processed.

type EventPipeline

type EventPipeline struct {
	// contains filtered or unexported fields
}

EventPipeline chains filter → dedup → transform → emit.

func NewEventPipeline

func NewEventPipeline(
	registry *ProcessorRegistry,
	filters *FilterChain,
	config PipelineConfig,
	out, errOut io.Writer,
) *EventPipeline

NewEventPipeline builds an event processing pipeline.

func (*EventPipeline) EnsureDirs

func (p *EventPipeline) EnsureDirs() error

EnsureDirs creates all configured output directories once at startup.

func (*EventPipeline) EventCount

func (p *EventPipeline) EventCount() int64

EventCount returns the number of processed events.

func (*EventPipeline) Process

func (p *EventPipeline) Process(ctx context.Context, raw *RawEvent)

Process is the pipeline entry point, called by the WebSocket callback.

type EventProcessor

// EventProcessor defines the processing strategy for a single event type:
// which type it handles, how the raw event is transformed for output, and how
// duplicates and windows are keyed.
type EventProcessor interface {
	// EventType returns the event type handled, e.g. "im.message.receive_v1".
	// The fallback processor returns an empty string.
	EventType() string

	// Transform converts raw event data to the target format.
	// The returned value is serialized directly to JSON by the pipeline.
	Transform(ctx context.Context, raw *RawEvent, mode TransformMode) interface{}

	// DeduplicateKey returns a deduplication key. Empty string means no dedup.
	DeduplicateKey(raw *RawEvent) string

	// WindowStrategy returns window configuration. Zero value means disabled.
	WindowStrategy() WindowConfig
}

EventProcessor defines the processing strategy for each event type.

Each processor implements its own Transform logic supporting Raw/Compact modes. The framework decides which mode to pass based on CLI flags; the processor decides the output format for that mode.

Raw mode: return raw (the complete *RawEvent) to preserve the full original event. Compact mode: return a flat map[string]interface{} ready for JSON serialization, including semantic fields like "type", "id", "from", "to" plus domain-specific fields.

type EventRouter

type EventRouter struct {
	// contains filtered or unexported fields
}

EventRouter dispatches events to output directories by regex matching on event_type.

func ParseRoutes

func ParseRoutes(specs []string) (*EventRouter, error)

ParseRoutes parses route flag values into an EventRouter. Each value has the format "regex=dir:./path/to/dir". Returns nil, nil when the input is empty.

func (*EventRouter) Match

func (r *EventRouter) Match(eventType string) []string

Match returns all target directories for the given event type. Returns nil if no routes match (caller should fall through to default output).

type EventTypeFilter

type EventTypeFilter struct {
	// contains filtered or unexported fields
}

EventTypeFilter filters by an event type whitelist.

func NewEventTypeFilter

func NewEventTypeFilter(commaSeparated string) *EventTypeFilter

NewEventTypeFilter creates a whitelist filter from a comma-separated string. Returns nil for empty input (meaning no filtering).

func (*EventTypeFilter) Allow

func (f *EventTypeFilter) Allow(eventType string) bool

func (*EventTypeFilter) Types

func (f *EventTypeFilter) Types() []string

Types returns the whitelisted event types.

type FilterChain

type FilterChain struct {
	// contains filtered or unexported fields
}

FilterChain combines multiple filters with AND logic.

func NewFilterChain

func NewFilterChain(filters ...EventFilter) *FilterChain

NewFilterChain creates a filter chain. Nil filters are skipped.

func (*FilterChain) Allow

func (c *FilterChain) Allow(eventType string) bool

Allow returns true when all filters pass. An empty chain allows all events.

type GenericProcessor

type GenericProcessor struct{}

GenericProcessor is the fallback for unregistered event types. Compact mode parses the event payload as a map; Raw mode passes through raw.Event.

func (*GenericProcessor) DeduplicateKey

func (p *GenericProcessor) DeduplicateKey(raw *RawEvent) string

func (*GenericProcessor) EventType

func (p *GenericProcessor) EventType() string

func (*GenericProcessor) Transform

func (p *GenericProcessor) Transform(_ context.Context, raw *RawEvent, mode TransformMode) interface{}

func (*GenericProcessor) WindowStrategy

func (p *GenericProcessor) WindowStrategy() WindowConfig

type ImChatBotProcessor

type ImChatBotProcessor struct {
	// contains filtered or unexported fields
}

ImChatBotProcessor handles im.chat.member.bot.added_v1 and deleted_v1. A single struct serves both event types; the concrete type is set via constructor.

Compact output fields:

  • type, event_id, timestamp (from compactBase)
  • action: "added" or "removed"
  • chat_id: the group chat where the bot was added/removed
  • operator_id: open_id of the user who performed the action
  • external: whether this is an external (cross-tenant) chat

func NewImChatBotAddedProcessor

func NewImChatBotAddedProcessor() *ImChatBotProcessor

NewImChatBotAddedProcessor creates a processor for im.chat.member.bot.added_v1.

func NewImChatBotDeletedProcessor

func NewImChatBotDeletedProcessor() *ImChatBotProcessor

NewImChatBotDeletedProcessor creates a processor for im.chat.member.bot.deleted_v1.

func (*ImChatBotProcessor) DeduplicateKey

func (p *ImChatBotProcessor) DeduplicateKey(raw *RawEvent) string

func (*ImChatBotProcessor) EventType

func (p *ImChatBotProcessor) EventType() string

func (*ImChatBotProcessor) Transform

func (p *ImChatBotProcessor) Transform(_ context.Context, raw *RawEvent, mode TransformMode) interface{}

func (*ImChatBotProcessor) WindowStrategy

func (p *ImChatBotProcessor) WindowStrategy() WindowConfig

type ImChatDisbandedProcessor

type ImChatDisbandedProcessor struct{}

ImChatDisbandedProcessor handles im.chat.disbanded_v1 events.

Compact output fields:

  • type, event_id, timestamp (from compactBase)
  • chat_id: the group chat that was disbanded
  • operator_id: open_id of the user who disbanded the chat
  • external: whether this is an external (cross-tenant) chat

func (*ImChatDisbandedProcessor) DeduplicateKey

func (p *ImChatDisbandedProcessor) DeduplicateKey(raw *RawEvent) string

func (*ImChatDisbandedProcessor) EventType

func (p *ImChatDisbandedProcessor) EventType() string

func (*ImChatDisbandedProcessor) Transform

func (p *ImChatDisbandedProcessor) Transform(_ context.Context, raw *RawEvent, mode TransformMode) interface{}

func (*ImChatDisbandedProcessor) WindowStrategy

func (p *ImChatDisbandedProcessor) WindowStrategy() WindowConfig

type ImChatMemberUserProcessor

type ImChatMemberUserProcessor struct {
	// contains filtered or unexported fields
}

ImChatMemberUserProcessor handles im.chat.member.user.{added,withdrawn,deleted}_v1. A single struct serves all three event types; the concrete type is set via constructor.

Compact output fields:

  • type, event_id, timestamp (from compactBase)
  • action: "added", "withdrawn" (user left), or "removed" (kicked by admin)
  • chat_id: the group chat affected
  • operator_id: open_id of the user who performed the action
  • user_ids: list of open_ids of the affected users
  • external: whether this is an external (cross-tenant) chat

func NewImChatMemberUserAddedProcessor

func NewImChatMemberUserAddedProcessor() *ImChatMemberUserProcessor

NewImChatMemberUserAddedProcessor creates a processor for im.chat.member.user.added_v1.

func NewImChatMemberUserDeletedProcessor

func NewImChatMemberUserDeletedProcessor() *ImChatMemberUserProcessor

NewImChatMemberUserDeletedProcessor creates a processor for im.chat.member.user.deleted_v1.

func NewImChatMemberUserWithdrawnProcessor

func NewImChatMemberUserWithdrawnProcessor() *ImChatMemberUserProcessor

NewImChatMemberUserWithdrawnProcessor creates a processor for im.chat.member.user.withdrawn_v1.

func (*ImChatMemberUserProcessor) DeduplicateKey

func (p *ImChatMemberUserProcessor) DeduplicateKey(raw *RawEvent) string

func (*ImChatMemberUserProcessor) EventType

func (p *ImChatMemberUserProcessor) EventType() string

func (*ImChatMemberUserProcessor) Transform

func (p *ImChatMemberUserProcessor) Transform(_ context.Context, raw *RawEvent, mode TransformMode) interface{}

func (*ImChatMemberUserProcessor) WindowStrategy

func (p *ImChatMemberUserProcessor) WindowStrategy() WindowConfig

type ImChatUpdatedProcessor

type ImChatUpdatedProcessor struct{}

ImChatUpdatedProcessor handles im.chat.updated_v1 events.

Compact output fields:

  • type, event_id, timestamp (from compactBase)
  • chat_id: the group chat that was updated
  • operator_id: open_id of the user who made the change
  • external: whether this is an external (cross-tenant) chat
  • before_change: chat properties before the update (e.g. name, description)
  • after_change: chat properties after the update

func (*ImChatUpdatedProcessor) DeduplicateKey

func (p *ImChatUpdatedProcessor) DeduplicateKey(raw *RawEvent) string

func (*ImChatUpdatedProcessor) EventType

func (p *ImChatUpdatedProcessor) EventType() string

func (*ImChatUpdatedProcessor) Transform

func (p *ImChatUpdatedProcessor) Transform(_ context.Context, raw *RawEvent, mode TransformMode) interface{}

func (*ImChatUpdatedProcessor) WindowStrategy

func (p *ImChatUpdatedProcessor) WindowStrategy() WindowConfig

type ImMessageProcessor

type ImMessageProcessor struct{}

ImMessageProcessor handles im.message.receive_v1 events.

Compact output fields:

  • type, id, message_id, create_time, timestamp
  • chat_id, chat_type, message_type, sender_id
  • content: human-readable text converted via convertlib (supports text, post, image, file, card, etc.)

func (*ImMessageProcessor) DeduplicateKey

func (p *ImMessageProcessor) DeduplicateKey(raw *RawEvent) string

func (*ImMessageProcessor) EventType

func (p *ImMessageProcessor) EventType() string

func (*ImMessageProcessor) Transform

func (p *ImMessageProcessor) Transform(_ context.Context, raw *RawEvent, mode TransformMode) interface{}

func (*ImMessageProcessor) WindowStrategy

func (p *ImMessageProcessor) WindowStrategy() WindowConfig

type ImMessageReactionProcessor

type ImMessageReactionProcessor struct {
	// contains filtered or unexported fields
}

ImMessageReactionProcessor handles im.message.reaction.created_v1 and deleted_v1. A single struct serves both event types; the concrete type is set via constructor.

Compact output fields:

  • type, event_id, timestamp (from compactBase)
  • action: "added" (created) or "removed" (deleted)
  • message_id: the message that was reacted to
  • emoji_type: the emoji used (e.g. "THUMBSUP")
  • operator_id: open_id of the user who added/removed the reaction
  • action_time: Unix timestamp of the action

func NewImReactionCreatedProcessor

func NewImReactionCreatedProcessor() *ImMessageReactionProcessor

NewImReactionCreatedProcessor creates a processor for im.message.reaction.created_v1.

func NewImReactionDeletedProcessor

func NewImReactionDeletedProcessor() *ImMessageReactionProcessor

NewImReactionDeletedProcessor creates a processor for im.message.reaction.deleted_v1.

func (*ImMessageReactionProcessor) DeduplicateKey

func (p *ImMessageReactionProcessor) DeduplicateKey(raw *RawEvent) string

func (*ImMessageReactionProcessor) EventType

func (p *ImMessageReactionProcessor) EventType() string

func (*ImMessageReactionProcessor) Transform

func (p *ImMessageReactionProcessor) Transform(_ context.Context, raw *RawEvent, mode TransformMode) interface{}

func (*ImMessageReactionProcessor) WindowStrategy

func (p *ImMessageReactionProcessor) WindowStrategy() WindowConfig

type ImMessageReadProcessor

type ImMessageReadProcessor struct{}

ImMessageReadProcessor handles im.message.message_read_v1 events.

Compact output fields:

  • type, event_id, timestamp (from compactBase)
  • reader_id: the open_id of the user who read the message
  • read_time: Unix timestamp of the read action
  • message_ids: list of message IDs that were read

func (*ImMessageReadProcessor) DeduplicateKey

func (p *ImMessageReadProcessor) DeduplicateKey(raw *RawEvent) string

func (*ImMessageReadProcessor) EventType

func (p *ImMessageReadProcessor) EventType() string

func (*ImMessageReadProcessor) Transform

func (p *ImMessageReadProcessor) Transform(_ context.Context, raw *RawEvent, mode TransformMode) interface{}

func (*ImMessageReadProcessor) WindowStrategy

func (p *ImMessageReadProcessor) WindowStrategy() WindowConfig

type PipelineConfig

// PipelineConfig configures the event processing pipeline. Each field mirrors
// one command-line flag of the event +subscribe shortcut.
type PipelineConfig struct {
	Mode      TransformMode // determined by --compact flag
	JsonFlag  bool          // --json: pretty JSON instead of NDJSON
	OutputDir string        // --output-dir: write events to files
	Quiet     bool          // --quiet: suppress stderr status messages
	Router    *EventRouter  // --route: regex-based output routing
}

PipelineConfig configures the event processing pipeline.

type ProcessorRegistry

type ProcessorRegistry struct {
	// contains filtered or unexported fields
}

ProcessorRegistry manages event_type → EventProcessor mappings.

func DefaultRegistry

func DefaultRegistry() *ProcessorRegistry

DefaultRegistry builds the standard processor registry. To add a new processor, just add r.Register(...) here.

func NewProcessorRegistry

func NewProcessorRegistry(fallback EventProcessor) *ProcessorRegistry

NewProcessorRegistry creates a registry with a fallback for unregistered event types.

func (*ProcessorRegistry) Lookup

func (r *ProcessorRegistry) Lookup(eventType string) EventProcessor

Lookup finds a processor by event type. Returns fallback if not registered. Never returns nil.

func (*ProcessorRegistry) Register

func (r *ProcessorRegistry) Register(p EventProcessor) error

Register adds a processor. Returns an error on duplicate event type registration.

type RawEvent

// RawEvent is the strongly-typed V2 event envelope, parsed directly from the
// JSON bytes of an incoming event body.
type RawEvent struct {
	// Schema is decoded from the "schema" key.
	// NOTE(review): presumably the envelope schema version — confirm.
	Schema string                `json:"schema"`
	// Header is the SDK's event header, decoded from the "header" key.
	Header larkevent.EventHeader `json:"header"`
	// Event keeps the "event" payload as undecoded JSON so that decoding can
	// be deferred to whoever consumes it.
	Event  json.RawMessage       `json:"event"`
}

RawEvent is the strongly-typed V2 event envelope. Parsed directly from event.Body JSON bytes.

type RegexFilter

type RegexFilter struct {
	// contains filtered or unexported fields
}

RegexFilter filters event types by a regular expression.

func NewRegexFilter

func NewRegexFilter(pattern string) (*RegexFilter, error)

NewRegexFilter compiles a regex and creates a filter. Returns nil, nil for empty input.

func (*RegexFilter) Allow

func (f *RegexFilter) Allow(eventType string) bool

func (*RegexFilter) String

func (f *RegexFilter) String() string

type Route

type Route struct {
	// contains filtered or unexported fields
}

Route holds a compiled regex pattern and its target output directory.

type TransformMode

type TransformMode int

TransformMode defines the event transformation mode.

// Supported transformation modes. TransformRaw is the zero value, so it is the
// mode in effect when none is chosen explicitly.
const (
	// TransformRaw passes through with minimal processing.
	TransformRaw TransformMode = iota
	// TransformCompact extracts core fields, suitable for AI agent consumption.
	TransformCompact
)

type WindowConfig

// WindowConfig configures the event windowing strategy (not implemented yet).
// The zero value means windowing is disabled.
type WindowConfig struct {
	// Duration is presumably the length of one window — TODO confirm once
	// windowing is implemented.
	Duration time.Duration
	// GroupBy is presumably the key events are grouped by within a window —
	// TODO confirm once windowing is implemented.
	GroupBy  string
}

WindowConfig configures event windowing strategy (not implemented yet). Zero value means disabled.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL