Skip to content

事件总线

VEF Framework 提供了事件总线机制,支持应用内的事件发布和订阅。

基本概念

  • 事件(Event):表示系统中发生的事情
  • 发布者(Publisher):发送事件的组件
  • 订阅者(Subscriber):接收和处理事件的组件

定义事件

go
package events

// UserCreatedEvent is the event published when a new user is created.
// It carries the new user's identifier, username and email address.
type UserCreatedEvent struct {
    UserId, Username, Email string
}

// Name returns the event name, "user.created".
func (UserCreatedEvent) Name() string {
    const name = "user.created"
    return name
}

// UserDeletedEvent is the event published when a user is deleted.
// It carries only the identifier of the deleted user.
type UserDeletedEvent struct {
    UserId string
}

// Name returns the event name, "user.deleted".
func (UserDeletedEvent) Name() string {
    const name = "user.deleted"
    return name
}

发布事件

go
import "github.com/ilxqx/vef-framework-go/eventbus"

// Publish event
eventbus.Publish(events.UserCreatedEvent{
    UserId:   user.Id,
    Username: user.Username,
    Email:    user.Email,
})

订阅事件

基本订阅

go
// Subscribe to event
eventbus.Subscribe("user.created", func(event interface{}) {
    // Use the comma-ok form: a single-value type assertion panics when the
    // dynamic type differs (for example a *events.UserCreatedEvent).
    e, ok := event.(events.UserCreatedEvent)
    if !ok {
        log.Printf("user.created: unexpected event type %T", event)
        return
    }
    // Handle event
    log.Printf("User created: %s", e.Username)
})

使用处理器结构体

go
package handlers

import (
    "log"

    "my-app/internal/events"
    "my-app/internal/services"
)

// UserEventHandler groups the callbacks for user events together with the
// email service they depend on.
type UserEventHandler struct {
    // emailService is used by OnUserCreated to send the welcome email.
    emailService *services.EmailService
}

// NewUserEventHandler returns a UserEventHandler that uses emailService
// for outgoing mail.
func NewUserEventHandler(emailService *services.EmailService) *UserEventHandler {
    h := new(UserEventHandler)
    h.emailService = emailService
    return h
}

// OnUserCreated sends a welcome email to the user described by event.
//
// event is expected to be an events.UserCreatedEvent value. Any other dynamic
// type is logged and ignored, because a single-value type assertion would
// panic inside the handler.
func (h *UserEventHandler) OnUserCreated(event interface{}) {
    e, ok := event.(events.UserCreatedEvent)
    if !ok {
        log.Printf("OnUserCreated: unexpected event type %T", event)
        return
    }

    // Send welcome email
    h.emailService.SendWelcomeEmail(e.Email, e.Username)
}

// OnUserDeleted logs the cleanup for the user identified by event.
//
// event is expected to be an events.UserDeletedEvent value. Any other dynamic
// type is logged and ignored, because a single-value type assertion would
// panic inside the handler.
func (h *UserEventHandler) OnUserDeleted(event interface{}) {
    e, ok := event.(events.UserDeletedEvent)
    if !ok {
        log.Printf("OnUserDeleted: unexpected event type %T", event)
        return
    }

    // Cleanup user data
    log.Printf("Cleaning up data for user: %s", e.UserId)
}

注册处理器

go
// RegisterEventHandlers appends a hook to the fx lifecycle that subscribes
// handler's callbacks to "user.created" and "user.deleted" on start and
// unsubscribes the same callbacks on stop.
func RegisterEventHandlers(
    lc fx.Lifecycle,
    handler *handlers.UserEventHandler,
) {
    // Runs on start: attach both user event callbacks.
    subscribe := func(_ context.Context) error {
        eventbus.Subscribe("user.created", handler.OnUserCreated)
        eventbus.Subscribe("user.deleted", handler.OnUserDeleted)
        return nil
    }

    // Runs on stop: detach the callbacks attached on start.
    unsubscribe := func(_ context.Context) error {
        eventbus.Unsubscribe("user.created", handler.OnUserCreated)
        eventbus.Unsubscribe("user.deleted", handler.OnUserDeleted)
        return nil
    }

    lc.Append(fx.Hook{OnStart: subscribe, OnStop: unsubscribe})
}

异步事件

异步发布

go
// Publish event asynchronously
eventbus.PublishAsync(events.UserCreatedEvent{
    UserId:   user.Id,
    Username: user.Username,
    Email:    user.Email,
})

带超时的发布

go
// Publish with timeout
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

// Always check the returned error so a failed publish is not dropped.
// NOTE(review): presumably a timeout or cancellation of ctx is reported
// here too; confirm against the eventbus documentation.
if err := eventbus.PublishWithContext(ctx, events.UserCreatedEvent{
    UserId: user.Id,
}); err != nil {
    log.Printf("publish user.created: %v", err)
}

事件过滤

条件订阅

go
// Subscribe with filter
eventbus.SubscribeWithFilter("order.created", func(event interface{}) {
    // Comma-ok assertion: never panic inside the handler on a foreign type.
    e, ok := event.(events.OrderCreatedEvent)
    if !ok {
        return
    }
    // Only handle high-value orders
    log.Printf("High-value order created: %s", e.OrderId)
}, func(event interface{}) bool {
    // Reject anything that is not an OrderCreatedEvent instead of panicking,
    // then keep only orders whose amount exceeds 1000.
    e, ok := event.(events.OrderCreatedEvent)
    return ok && e.Amount > 1000
})

在 API 中使用事件

go
// NewUserResource builds the "smp/sys/user" resource. Its create and delete
// APIs each carry a hook that publishes the matching user event.
func NewUserResource() api.Resource {
    // Publish event after user creation
    publishCreated := func(_ fiber.Ctx, user *models.User) error {
        eventbus.Publish(events.UserCreatedEvent{
            UserId:   user.Id,
            Username: user.Username,
            Email:    user.Email,
        })
        return nil
    }

    // Publish event after user deletion
    publishDeleted := func(_ fiber.Ctx, user *models.User) error {
        eventbus.Publish(events.UserDeletedEvent{
            UserId: user.Id,
        })
        return nil
    }

    return &UserResource{
        Resource: api.NewResource("smp/sys/user"),

        CreateApi: apis.NewCreateApi[models.User, payloads.UserParams]().
            WithHook(api.AfterCreate, publishCreated),

        DeleteApi: apis.NewDeleteApi[models.User]().
            WithHook(api.AfterDelete, publishDeleted),
    }
}

事件持久化

存储事件到数据库

go
// EventStore persists published events. It has no fields of its own.
type EventStore struct{}

// NewEventStore returns a pointer to a new, empty EventStore.
func NewEventStore() *EventStore {
    return new(EventStore)
}

// Store writes event to the database as a models.EventRecord holding the
// event's name, its JSON payload and the current time. It returns the Error
// field of the ORM create result.
//
// Store panics if event does not implement eventbus.Event, because the type
// assertion below uses the single-value form.
func (s *EventStore) Store(event interface{}) error {
    named := event.(eventbus.Event)

    var record models.EventRecord
    record.EventName = named.Name()
    record.Payload = toJSON(event)
    record.CreatedAt = time.Now()

    result := orm.DB().Create(&record)
    return result.Error
}

// Register as middleware
eventbus.Use(func(event interface{}, next func()) {
    // Store event before processing. Persistence is best-effort: a failure
    // is logged rather than silently dropped, and the event is still passed
    // on to next.
    if err := eventStore.Store(event); err != nil {
        log.Printf("store event: %v", err)
    }
    next()
})

下一步

基于 Apache License 2.0 许可发布