Documentation
¶
Index ¶
- Variables
- func Shortcuts() []common.Shortcut
- type EventFilter
- type EventPipeline
- type EventProcessor
- type EventRouter
- type EventTypeFilter
- type FilterChain
- type GenericProcessor
- type ImChatBotProcessor
- type ImChatDisbandedProcessor
- func (p *ImChatDisbandedProcessor) DeduplicateKey(raw *RawEvent) string
- func (p *ImChatDisbandedProcessor) EventType() string
- func (p *ImChatDisbandedProcessor) Transform(_ context.Context, raw *RawEvent, mode TransformMode) interface{}
- func (p *ImChatDisbandedProcessor) WindowStrategy() WindowConfig
- type ImChatMemberUserProcessor
- func (p *ImChatMemberUserProcessor) DeduplicateKey(raw *RawEvent) string
- func (p *ImChatMemberUserProcessor) EventType() string
- func (p *ImChatMemberUserProcessor) Transform(_ context.Context, raw *RawEvent, mode TransformMode) interface{}
- func (p *ImChatMemberUserProcessor) WindowStrategy() WindowConfig
- type ImChatUpdatedProcessor
- type ImMessageProcessor
- type ImMessageReactionProcessor
- func (p *ImMessageReactionProcessor) DeduplicateKey(raw *RawEvent) string
- func (p *ImMessageReactionProcessor) EventType() string
- func (p *ImMessageReactionProcessor) Transform(_ context.Context, raw *RawEvent, mode TransformMode) interface{}
- func (p *ImMessageReactionProcessor) WindowStrategy() WindowConfig
- type ImMessageReadProcessor
- type PipelineConfig
- type ProcessorRegistry
- type RawEvent
- type RegexFilter
- type Route
- type TransformMode
- type WindowConfig
Constants ¶
This section is empty.
Variables ¶
var EventSubscribe = common.Shortcut{ Service: "event", Command: "+subscribe", Description: "Subscribe to Lark events via WebSocket (NDJSON output)", Risk: "read", Scopes: []string{}, AuthTypes: []string{"bot"}, Flags: []common.Flag{ {Name: "output-dir", Desc: "write each event as a JSON file in this directory (default: stdout)"}, {Name: "route", Type: "string_array", Desc: "regex-based event routing (e.g. --route '^im\\.message=dir:./im/' --route '^contact\\.=dir:./contacts/'); unmatched events fall through to --output-dir or stdout"}, {Name: "compact", Type: "bool", Desc: "flat key-value output: extract text, strip noise fields"}, {Name: "json", Type: "bool", Desc: "pretty-print JSON instead of NDJSON"}, {Name: "event-types", Desc: "comma-separated event types to subscribe; only use when you do not need other events (omit for catch-all)"}, {Name: "filter", Desc: "regex to further filter events by event_type"}, {Name: "quiet", Type: "bool", Desc: "suppress stderr status messages"}, {Name: "force", Type: "bool", Desc: "bypass single-instance lock (UNSAFE: server randomly splits events across connections, each instance only receives a subset)"}, }, DryRun: func(ctx context.Context, runtime *common.RuntimeContext) *common.DryRunAPI { eventTypesDisplay := "(catch-all)" if s := runtime.Str("event-types"); s != "" { eventTypesDisplay = s } filterDisplay := "(none)" if s := runtime.Str("filter"); s != "" { filterDisplay = s } outputDirDisplay := "(stdout)" if s := runtime.Str("output-dir"); s != "" { outputDirDisplay = s } routeDisplay := "(none)" if routes := runtime.StrArray("route"); len(routes) > 0 { routeDisplay = strings.Join(routes, "; ") } return common.NewDryRunAPI(). Desc("Subscribe to Lark events via WebSocket (long-running)"). Set("command", "event +subscribe"). Set("app_id", runtime.Config.AppID). Set("event_types", eventTypesDisplay). Set("filter", filterDisplay).Set("output_dir", outputDirDisplay). 
Set("route", routeDisplay) }, Execute: func(ctx context.Context, runtime *common.RuntimeContext) error { eventTypesStr := runtime.Str("event-types") filterStr := runtime.Str("filter") jsonFlag := runtime.Bool("json") compactFlag := runtime.Bool("compact") outputDir := runtime.Str("output-dir") quietFlag := runtime.Bool("quiet") routeSpecs := runtime.StrArray("route") forceFlag := runtime.Bool("force") if outputDir != "" { safePath, err := validate.SafeOutputPath(outputDir) if err != nil { return output.ErrValidation("unsafe output path: %s", err) } outputDir = safePath } errOut := runtime.IO().ErrOut out := runtime.IO().Out info := func(msg string) { if !quietFlag { fmt.Fprintln(errOut, msg) } } if !forceFlag { lock, err := lockfile.ForSubscribe(runtime.Config.AppID) if err != nil { return fmt.Errorf("failed to create lock: %w", err) } if err := lock.TryLock(); err != nil { return output.ErrValidation( "another event +subscribe instance is already running for app %s\n"+ " Only one subscriber per app is allowed to prevent competing consumers.\n"+ " Use --force to bypass this check.", runtime.Config.AppID, ) } defer lock.Unlock() } eventTypeFilter := NewEventTypeFilter(eventTypesStr) regexFilter, err := NewRegexFilter(filterStr) if err != nil { return output.ErrValidation("invalid --filter regex: %s", filterStr) } var filterList []EventFilter if eventTypeFilter != nil { filterList = append(filterList, eventTypeFilter) } if regexFilter != nil { filterList = append(filterList, regexFilter) } filters := NewFilterChain(filterList...) 
router, err := ParseRoutes(routeSpecs) if err != nil { return output.ErrValidation("invalid --route: %v", err) } mode := TransformRaw if compactFlag { mode = TransformCompact } pipeline := NewEventPipeline(DefaultRegistry(), filters, PipelineConfig{ Mode: mode, JsonFlag: jsonFlag, OutputDir: outputDir, Quiet: quietFlag, Router: router, }, out, errOut) if err := pipeline.EnsureDirs(); err != nil { return err } rawHandler := func(ctx context.Context, event *larkevent.EventReq) error { if event.Body == nil { return nil } var raw RawEvent if err := json.Unmarshal(event.Body, &raw); err != nil { output.PrintError(errOut, fmt.Sprintf("failed to parse event: %v", err)) return nil } pipeline.Process(ctx, &raw) return nil } sdkLogger := &stderrLogger{w: errOut, quiet: quietFlag} eventDispatcher := dispatcher.NewEventDispatcher("", "") eventDispatcher.InitConfig(larkevent.WithLogger(sdkLogger)) if eventTypeFilter != nil { for _, et := range eventTypeFilter.Types() { eventDispatcher.OnCustomizedEvent(et, rawHandler) } } else { for _, et := range commonEventTypes { eventDispatcher.OnCustomizedEvent(et, rawHandler) } } domain := lark.FeishuBaseUrl if runtime.Config.Brand == core.BrandLark { domain = lark.LarkBaseUrl } info(fmt.Sprintf("%sConnecting to Lark event WebSocket...%s", output.Cyan, output.Reset)) if eventTypeFilter != nil { info(fmt.Sprintf("Listening for: %s%s%s", output.Green, strings.Join(eventTypeFilter.Types(), ", "), output.Reset)) } else { info(fmt.Sprintf("Listening for %s%d common event types%s (catch-all mode)", output.Green, len(commonEventTypes), output.Reset)) info(fmt.Sprintf("%sTip:%s use --event-types to listen for specific event types", output.Dim, output.Reset)) } if regexFilter != nil { info(fmt.Sprintf("Filter: %s%s%s", output.Yellow, regexFilter.String(), output.Reset)) } if router != nil { for _, spec := range routeSpecs { info(fmt.Sprintf(" Route: %s%s%s", output.Green, spec, output.Reset)) } } cli := larkws.NewClient(runtime.Config.AppID, 
runtime.Config.AppSecret, larkws.WithEventHandler(eventDispatcher), larkws.WithDomain(domain), larkws.WithLogger(sdkLogger), ) sigCh := make(chan os.Signal, 1) signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM) defer signal.Stop(sigCh) startErrCh := make(chan error, 1) go func() { startErrCh <- cli.Start(ctx) }() info(fmt.Sprintf("%s%sConnected.%s Waiting for events... (Ctrl+C to stop)", output.Bold, output.Green, output.Reset)) select { case sig, ok := <-sigCh: if ok && sig != nil { info(fmt.Sprintf("\n%sReceived %s, shutting down...%s (received %s%d%s events)", output.Yellow, sig, output.Reset, output.Bold, pipeline.EventCount(), output.Reset)) } return nil case err, ok := <-startErrCh: if !ok { return nil } if err != nil { return output.ErrNetwork("WebSocket connection failed: %v", err) } return nil } }, }
Functions ¶
Types ¶
type EventFilter ¶
EventFilter decides whether an event should be processed.
type EventPipeline ¶
type EventPipeline struct {
// contains filtered or unexported fields
}
EventPipeline chains filter → dedup → transform → emit.
func NewEventPipeline ¶
func NewEventPipeline( registry *ProcessorRegistry, filters *FilterChain, config PipelineConfig, out, errOut io.Writer, ) *EventPipeline
NewEventPipeline builds an event processing pipeline.
func (*EventPipeline) EnsureDirs ¶
func (p *EventPipeline) EnsureDirs() error
EnsureDirs creates all configured output directories once at startup.
func (*EventPipeline) EventCount ¶
func (p *EventPipeline) EventCount() int64
EventCount returns the number of processed events.
type EventProcessor ¶
// EventProcessor defines the processing strategy for one event type.
// Implementations supply the event type they handle, the transformation for
// each TransformMode, a deduplication key and a window configuration.
type EventProcessor interface {
// EventType returns the event type handled, e.g. "im.message.receive_v1".
// The fallback processor returns an empty string.
EventType() string
// Transform converts raw event data to the target format selected by mode
// (TransformRaw or TransformCompact).
// The returned value is serialized directly to JSON by the pipeline.
Transform(ctx context.Context, raw *RawEvent, mode TransformMode) interface{}
// DeduplicateKey returns a deduplication key. Empty string means no dedup.
DeduplicateKey(raw *RawEvent) string
// WindowStrategy returns window configuration. Zero value means disabled.
WindowStrategy() WindowConfig
}
EventProcessor defines the processing strategy for each event type.
Each processor implements its own Transform logic supporting Raw/Compact modes. The framework decides which mode to pass based on CLI flags; the processor decides the output format for that mode.
Raw mode: return raw (the complete *RawEvent) to preserve the full original event. Compact mode: return a flat map[string]interface{} ready for JSON serialization, including semantic fields like "type", "id", "from", "to" plus domain-specific fields.
type EventRouter ¶
type EventRouter struct {
// contains filtered or unexported fields
}
EventRouter dispatches events to output directories by regex matching on event_type.
func ParseRoutes ¶
func ParseRoutes(specs []string) (*EventRouter, error)
ParseRoutes parses route flag values into an EventRouter. Each value has the format "regex=dir:./path/to/dir". Returns (nil, nil) when the input is empty.
func (*EventRouter) Match ¶
func (r *EventRouter) Match(eventType string) []string
Match returns all target directories for the given event type. Returns nil if no routes match (caller should fall through to default output).
type EventTypeFilter ¶
type EventTypeFilter struct {
// contains filtered or unexported fields
}
EventTypeFilter filters by an event type whitelist.
func NewEventTypeFilter ¶
func NewEventTypeFilter(commaSeparated string) *EventTypeFilter
NewEventTypeFilter creates a whitelist filter from a comma-separated string. Returns nil for empty input (meaning no filtering).
func (*EventTypeFilter) Allow ¶
func (f *EventTypeFilter) Allow(eventType string) bool
func (*EventTypeFilter) Types ¶
func (f *EventTypeFilter) Types() []string
Types returns the whitelisted event types.
type FilterChain ¶
type FilterChain struct {
// contains filtered or unexported fields
}
FilterChain combines multiple filters with AND logic.
func NewFilterChain ¶
func NewFilterChain(filters ...EventFilter) *FilterChain
NewFilterChain creates a filter chain. Nil filters are skipped.
func (*FilterChain) Allow ¶
func (c *FilterChain) Allow(eventType string) bool
Allow returns true when all filters pass. An empty chain allows all events.
type GenericProcessor ¶
type GenericProcessor struct{}
GenericProcessor is the fallback for unregistered event types. Compact mode parses the event payload as a map; Raw mode passes through raw.Event.
func (*GenericProcessor) DeduplicateKey ¶
func (p *GenericProcessor) DeduplicateKey(raw *RawEvent) string
func (*GenericProcessor) EventType ¶
func (p *GenericProcessor) EventType() string
func (*GenericProcessor) Transform ¶
func (p *GenericProcessor) Transform(_ context.Context, raw *RawEvent, mode TransformMode) interface{}
func (*GenericProcessor) WindowStrategy ¶
func (p *GenericProcessor) WindowStrategy() WindowConfig
type ImChatBotProcessor ¶
type ImChatBotProcessor struct {
// contains filtered or unexported fields
}
ImChatBotProcessor handles im.chat.member.bot.added_v1 and deleted_v1. A single struct serves both event types; the concrete type is set via constructor.
Compact output fields:
- type, event_id, timestamp (from compactBase)
- action: "added" or "removed"
- chat_id: the group chat where the bot was added/removed
- operator_id: open_id of the user who performed the action
- external: whether this is an external (cross-tenant) chat
func NewImChatBotAddedProcessor ¶
func NewImChatBotAddedProcessor() *ImChatBotProcessor
NewImChatBotAddedProcessor creates a processor for im.chat.member.bot.added_v1.
func NewImChatBotDeletedProcessor ¶
func NewImChatBotDeletedProcessor() *ImChatBotProcessor
NewImChatBotDeletedProcessor creates a processor for im.chat.member.bot.deleted_v1.
func (*ImChatBotProcessor) DeduplicateKey ¶
func (p *ImChatBotProcessor) DeduplicateKey(raw *RawEvent) string
func (*ImChatBotProcessor) EventType ¶
func (p *ImChatBotProcessor) EventType() string
func (*ImChatBotProcessor) Transform ¶
func (p *ImChatBotProcessor) Transform(_ context.Context, raw *RawEvent, mode TransformMode) interface{}
func (*ImChatBotProcessor) WindowStrategy ¶
func (p *ImChatBotProcessor) WindowStrategy() WindowConfig
type ImChatDisbandedProcessor ¶
type ImChatDisbandedProcessor struct{}
ImChatDisbandedProcessor handles im.chat.disbanded_v1 events.
Compact output fields:
- type, event_id, timestamp (from compactBase)
- chat_id: the group chat that was disbanded
- operator_id: open_id of the user who disbanded the chat
- external: whether this is an external (cross-tenant) chat
func (*ImChatDisbandedProcessor) DeduplicateKey ¶
func (p *ImChatDisbandedProcessor) DeduplicateKey(raw *RawEvent) string
func (*ImChatDisbandedProcessor) EventType ¶
func (p *ImChatDisbandedProcessor) EventType() string
func (*ImChatDisbandedProcessor) Transform ¶
func (p *ImChatDisbandedProcessor) Transform(_ context.Context, raw *RawEvent, mode TransformMode) interface{}
func (*ImChatDisbandedProcessor) WindowStrategy ¶
func (p *ImChatDisbandedProcessor) WindowStrategy() WindowConfig
type ImChatMemberUserProcessor ¶
type ImChatMemberUserProcessor struct {
// contains filtered or unexported fields
}
ImChatMemberUserProcessor handles im.chat.member.user.{added,withdrawn,deleted}_v1. A single struct serves all three event types; the concrete type is set via constructor.
Compact output fields:
- type, event_id, timestamp (from compactBase)
- action: "added", "withdrawn" (user left), or "removed" (kicked by admin)
- chat_id: the group chat affected
- operator_id: open_id of the user who performed the action
- user_ids: list of open_ids of the affected users
- external: whether this is an external (cross-tenant) chat
func NewImChatMemberUserAddedProcessor ¶
func NewImChatMemberUserAddedProcessor() *ImChatMemberUserProcessor
NewImChatMemberUserAddedProcessor creates a processor for im.chat.member.user.added_v1.
func NewImChatMemberUserDeletedProcessor ¶
func NewImChatMemberUserDeletedProcessor() *ImChatMemberUserProcessor
NewImChatMemberUserDeletedProcessor creates a processor for im.chat.member.user.deleted_v1.
func NewImChatMemberUserWithdrawnProcessor ¶
func NewImChatMemberUserWithdrawnProcessor() *ImChatMemberUserProcessor
NewImChatMemberUserWithdrawnProcessor creates a processor for im.chat.member.user.withdrawn_v1.
func (*ImChatMemberUserProcessor) DeduplicateKey ¶
func (p *ImChatMemberUserProcessor) DeduplicateKey(raw *RawEvent) string
func (*ImChatMemberUserProcessor) EventType ¶
func (p *ImChatMemberUserProcessor) EventType() string
func (*ImChatMemberUserProcessor) Transform ¶
func (p *ImChatMemberUserProcessor) Transform(_ context.Context, raw *RawEvent, mode TransformMode) interface{}
func (*ImChatMemberUserProcessor) WindowStrategy ¶
func (p *ImChatMemberUserProcessor) WindowStrategy() WindowConfig
type ImChatUpdatedProcessor ¶
type ImChatUpdatedProcessor struct{}
ImChatUpdatedProcessor handles im.chat.updated_v1 events.
Compact output fields:
- type, event_id, timestamp (from compactBase)
- chat_id: the group chat that was updated
- operator_id: open_id of the user who made the change
- external: whether this is an external (cross-tenant) chat
- before_change: chat properties before the update (e.g. name, description)
- after_change: chat properties after the update
func (*ImChatUpdatedProcessor) DeduplicateKey ¶
func (p *ImChatUpdatedProcessor) DeduplicateKey(raw *RawEvent) string
func (*ImChatUpdatedProcessor) EventType ¶
func (p *ImChatUpdatedProcessor) EventType() string
func (*ImChatUpdatedProcessor) Transform ¶
func (p *ImChatUpdatedProcessor) Transform(_ context.Context, raw *RawEvent, mode TransformMode) interface{}
func (*ImChatUpdatedProcessor) WindowStrategy ¶
func (p *ImChatUpdatedProcessor) WindowStrategy() WindowConfig
type ImMessageProcessor ¶
type ImMessageProcessor struct{}
ImMessageProcessor handles im.message.receive_v1 events.
Compact output fields:
- type, id, message_id, create_time, timestamp
- chat_id, chat_type, message_type, sender_id
- content: human-readable text converted via convertlib (supports text, post, image, file, card, etc.)
func (*ImMessageProcessor) DeduplicateKey ¶
func (p *ImMessageProcessor) DeduplicateKey(raw *RawEvent) string
func (*ImMessageProcessor) EventType ¶
func (p *ImMessageProcessor) EventType() string
func (*ImMessageProcessor) Transform ¶
func (p *ImMessageProcessor) Transform(_ context.Context, raw *RawEvent, mode TransformMode) interface{}
func (*ImMessageProcessor) WindowStrategy ¶
func (p *ImMessageProcessor) WindowStrategy() WindowConfig
type ImMessageReactionProcessor ¶
type ImMessageReactionProcessor struct {
// contains filtered or unexported fields
}
ImMessageReactionProcessor handles im.message.reaction.created_v1 and deleted_v1. A single struct serves both event types; the concrete type is set via constructor.
Compact output fields:
- type, event_id, timestamp (from compactBase)
- action: "added" (created) or "removed" (deleted)
- message_id: the message that was reacted to
- emoji_type: the emoji used (e.g. "THUMBSUP")
- operator_id: open_id of the user who added/removed the reaction
- action_time: Unix timestamp of the action
func NewImReactionCreatedProcessor ¶
func NewImReactionCreatedProcessor() *ImMessageReactionProcessor
NewImReactionCreatedProcessor creates a processor for im.message.reaction.created_v1.
func NewImReactionDeletedProcessor ¶
func NewImReactionDeletedProcessor() *ImMessageReactionProcessor
NewImReactionDeletedProcessor creates a processor for im.message.reaction.deleted_v1.
func (*ImMessageReactionProcessor) DeduplicateKey ¶
func (p *ImMessageReactionProcessor) DeduplicateKey(raw *RawEvent) string
func (*ImMessageReactionProcessor) EventType ¶
func (p *ImMessageReactionProcessor) EventType() string
func (*ImMessageReactionProcessor) Transform ¶
func (p *ImMessageReactionProcessor) Transform(_ context.Context, raw *RawEvent, mode TransformMode) interface{}
func (*ImMessageReactionProcessor) WindowStrategy ¶
func (p *ImMessageReactionProcessor) WindowStrategy() WindowConfig
type ImMessageReadProcessor ¶
type ImMessageReadProcessor struct{}
ImMessageReadProcessor handles im.message.message_read_v1 events.
Compact output fields:
- type, event_id, timestamp (from compactBase)
- reader_id: the open_id of the user who read the message
- read_time: Unix timestamp of the read action
- message_ids: list of message IDs that were read
func (*ImMessageReadProcessor) DeduplicateKey ¶
func (p *ImMessageReadProcessor) DeduplicateKey(raw *RawEvent) string
func (*ImMessageReadProcessor) EventType ¶
func (p *ImMessageReadProcessor) EventType() string
func (*ImMessageReadProcessor) Transform ¶
func (p *ImMessageReadProcessor) Transform(_ context.Context, raw *RawEvent, mode TransformMode) interface{}
func (*ImMessageReadProcessor) WindowStrategy ¶
func (p *ImMessageReadProcessor) WindowStrategy() WindowConfig
type PipelineConfig ¶
// PipelineConfig configures the event processing pipeline. Each field mirrors
// one command-line flag of "event +subscribe".
type PipelineConfig struct {
Mode TransformMode // --compact: TransformCompact when set, TransformRaw otherwise
JsonFlag bool // --json: pretty JSON instead of NDJSON
OutputDir string // --output-dir: write events to files; empty means no output directory was given
Quiet bool // --quiet: suppress stderr status messages
Router *EventRouter // --route: regex-based output routing; nil when no routes were given
}
PipelineConfig configures the event processing pipeline.
type ProcessorRegistry ¶
type ProcessorRegistry struct {
// contains filtered or unexported fields
}
ProcessorRegistry manages event_type → EventProcessor mappings.
func DefaultRegistry ¶
func DefaultRegistry() *ProcessorRegistry
DefaultRegistry builds the standard processor registry. To add a new processor, just add r.Register(...) here.
func NewProcessorRegistry ¶
func NewProcessorRegistry(fallback EventProcessor) *ProcessorRegistry
NewProcessorRegistry creates a registry with a fallback for unregistered event types.
func (*ProcessorRegistry) Lookup ¶
func (r *ProcessorRegistry) Lookup(eventType string) EventProcessor
Lookup finds a processor by event type. Returns fallback if not registered. Never returns nil.
func (*ProcessorRegistry) Register ¶
func (r *ProcessorRegistry) Register(p EventProcessor) error
Register adds a processor. Returns an error on duplicate event type registration.
type RawEvent ¶
// RawEvent is the V2 event envelope, decoded directly from the JSON bytes of
// an incoming event body.
type RawEvent struct {
Schema string `json:"schema"` // value of the top-level "schema" key
Header larkevent.EventHeader `json:"header"` // event header, using the SDK's EventHeader type
Event json.RawMessage `json:"event"` // event payload, kept as undecoded JSON
}
RawEvent is the strongly-typed V2 event envelope. Parsed directly from event.Body JSON bytes.
type RegexFilter ¶
type RegexFilter struct {
// contains filtered or unexported fields
}
RegexFilter filters event types by a regular expression.
func NewRegexFilter ¶
func NewRegexFilter(pattern string) (*RegexFilter, error)
NewRegexFilter compiles a regex and creates a filter. Returns nil, nil for empty input.
func (*RegexFilter) Allow ¶
func (f *RegexFilter) Allow(eventType string) bool
func (*RegexFilter) String ¶
func (f *RegexFilter) String() string
type Route ¶
type Route struct {
// contains filtered or unexported fields
}
Route holds a compiled regex pattern and its target output directory.
type TransformMode ¶
type TransformMode int
TransformMode defines the event transformation mode.
// Supported TransformMode values.
const (
	// TransformRaw passes through with minimal processing.
	// It is the zero value of TransformMode.
	TransformRaw TransformMode = iota
	// TransformCompact extracts core fields, suitable for AI agent consumption.
	TransformCompact
)
type WindowConfig ¶
WindowConfig configures event windowing strategy (not implemented yet). Zero value means disabled.