事件总线
VEF Framework 提供了事件总线机制，支持应用内的事件发布和订阅。
基本概念
- 事件(Event):表示系统中发生的事情
- 发布者(Publisher):发送事件的组件
- 订阅者(Subscriber):接收和处理事件的组件
定义事件
go
package events
// UserCreatedEvent is the event published whenever a new user is created.
type UserCreatedEvent struct {
	// UserId, Username and Email describe the user the event is about.
	UserId, Username, Email string
}

// Name reports the name this event is published and subscribed under:
// "user.created".
func (UserCreatedEvent) Name() string {
	const eventName = "user.created"
	return eventName
}
// UserDeletedEvent is the event published when a user is deleted.
type UserDeletedEvent struct {
// UserId is the identifier of the user that was deleted.
UserId string
}
func (e UserDeletedEvent) Name() string {
return "user.deleted"
}

发布事件
go
import "github.com/ilxqx/vef-framework-go/eventbus"
// Publish event
eventbus.Publish(events.UserCreatedEvent{
UserId: user.Id,
Username: user.Username,
Email: user.Email,
})

订阅事件
基本订阅
go
// Subscribe to event
eventbus.Subscribe("user.created", func(event interface{}) {
e := event.(events.UserCreatedEvent)
// Handle event
log.Printf("User created: %s", e.Username)
})

使用处理器结构体
go
package handlers
import (
	"log"

	"my-app/internal/events"
	"my-app/internal/services"
)
// UserEventHandler groups the subscriber callbacks that react to user events.
type UserEventHandler struct {
// emailService is used to send the welcome email when a user is created.
emailService *services.EmailService
}
// NewUserEventHandler builds a UserEventHandler that sends its emails through
// the given emailService.
func NewUserEventHandler(emailService *services.EmailService) *UserEventHandler {
	handler := new(UserEventHandler)
	handler.emailService = emailService
	return handler
}
// OnUserCreated handles a user-created event by sending the new user a
// welcome email.
//
// event is expected to hold an events.UserCreatedEvent value. Any other
// payload (including a *events.UserCreatedEvent pointer) is logged and
// skipped instead of panicking inside the subscriber callback.
func (h *UserEventHandler) OnUserCreated(event interface{}) {
	e, ok := event.(events.UserCreatedEvent)
	if !ok {
		log.Printf("OnUserCreated: unexpected event type %T", event)
		return
	}

	// Send welcome email
	h.emailService.SendWelcomeEmail(e.Email, e.Username)
}
func (h *UserEventHandler) OnUserDeleted(event interface{}) {
e := event.(events.UserDeletedEvent)
// Cleanup user data
log.Printf("Cleaning up data for user: %s", e.UserId)
}

注册处理器
go
func RegisterEventHandlers(
lc fx.Lifecycle,
handler *handlers.UserEventHandler,
) {
lc.Append(fx.Hook{
OnStart: func(ctx context.Context) error {
eventbus.Subscribe("user.created", handler.OnUserCreated)
eventbus.Subscribe("user.deleted", handler.OnUserDeleted)
return nil
},
OnStop: func(ctx context.Context) error {
eventbus.Unsubscribe("user.created", handler.OnUserCreated)
eventbus.Unsubscribe("user.deleted", handler.OnUserDeleted)
return nil
},
})
}

异步事件
异步发布
go
// Publish event asynchronously
eventbus.PublishAsync(events.UserCreatedEvent{
UserId: user.Id,
Username: user.Username,
Email: user.Email,
})

带超时的发布
go
// Publish with timeout
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
err := eventbus.PublishWithContext(ctx, events.UserCreatedEvent{
UserId: user.Id,
})

事件过滤
条件订阅
go
// Subscribe with filter
eventbus.SubscribeWithFilter("order.created", func(event interface{}) {
e := event.(events.OrderCreatedEvent)
// Only handle high-value orders
log.Printf("High-value order created: %s", e.OrderId)
}, func(event interface{}) bool {
e := event.(events.OrderCreatedEvent)
return e.Amount > 1000
})

在 API 中使用事件
go
func NewUserResource() api.Resource {
return &UserResource{
Resource: api.NewResource("smp/sys/user"),
CreateApi: apis.NewCreateApi[models.User, payloads.UserParams]().
WithHook(api.AfterCreate, func(ctx fiber.Ctx, user *models.User) error {
// Publish event after user creation
eventbus.Publish(events.UserCreatedEvent{
UserId: user.Id,
Username: user.Username,
Email: user.Email,
})
return nil
}),
DeleteApi: apis.NewDeleteApi[models.User]().
WithHook(api.AfterDelete, func(ctx fiber.Ctx, user *models.User) error {
// Publish event after user deletion
eventbus.Publish(events.UserDeletedEvent{
UserId: user.Id,
})
return nil
}),
}
}

事件持久化
存储事件到数据库
go
// EventStore persists events; it holds no state of its own.
type EventStore struct{}

// NewEventStore returns a ready-to-use EventStore.
func NewEventStore() *EventStore {
	return new(EventStore)
}
// Store saves event as a models.EventRecord holding the event's name, its
// toJSON-encoded payload and the current time, and returns the error reported
// by the ORM create call.
//
// event must implement eventbus.Event: the type assertion is unchecked, so
// any other value makes Store panic.
func (s *EventStore) Store(event interface{}) error {
	named := event.(eventbus.Event)

	record := new(models.EventRecord)
	record.EventName = named.Name()
	record.Payload = toJSON(event)
	record.CreatedAt = time.Now()

	return orm.DB().Create(record).Error
}
// Register as middleware: every event is handed to the event store before the
// rest of the chain runs.
eventbus.Use(func(event interface{}, next func()) {
	// Persisting is best-effort: a storage failure is reported instead of
	// being silently dropped, and the event still continues down the chain.
	if err := eventStore.Store(event); err != nil {
		log.Printf("event store: failed to persist event: %v", err)
	}
	next()
})