下面是一个示例的 Go 代码,用于监听 MongoDB 指定集合的 changestream,并提供给外部调用者解析变更的字段。该方法基于官方 MongoDB Go 驱动程序进行开发。
package mongochange
import (
"context"
"errors"
"fmt"
"sync"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
)
type ChangeStreamHandler func(changeDocument bson.Raw)
type MongoChangeStream struct {
client *mongo.Client
collection *mongo.Collection
stream *mongo.ChangeStream
mutex sync.Mutex
isWatching bool
}
func NewMongoChangeStream(client *mongo.Client) (*MongoChangeStream, error) {
return &MongoChangeStream{
client: client,
}, nil
}
func (m *MongoChangeStream) Watch(collectionName string, handler ChangeStreamHandler) error {
m.mutex.Lock()
defer m.mutex.Unlock()
if m.isWatching {
return errors.New("Already watching a collection")
}
collection := m.client.Database("your_database_name").Collection(collectionName)
pipeline := bson.A{}
changeOptions := options.ChangeStream().SetFullDocument(options.UpdateLookup)
ctx := context.TODO()
stream, err := collection.Watch(ctx, pipeline, changeOptions)
if err != nil {
return fmt.Errorf("Failed to create change stream: %v", err)
}
m.collection = collection
m.stream = stream
m.isWatching = true
go func() {
defer func() {
m.Stop()
handler(nil) // Signal the end of change stream to the caller.
}()
for m.stream.Next(ctx) {
var changeDoc bson.Raw
if err := m.stream.Decode(&changeDoc); err != nil {
fmt.Printf("Failed to decode change document: %v\n", err)
continue
}
handler(changeDoc)
}
if m.stream.Err() != nil {
fmt.Printf("Change stream error: %v\n", m.stream.Err())
}
}()
return nil
}
func (m *MongoChangeStream) Stop() error {
m.mutex.Lock()
defer m.mutex.Unlock()
if !m.isWatching {
return errors.New("Not watching any collection")
}
err := m.stream.Close(context.TODO())
if err != nil {
return fmt.Errorf("Failed to close change stream: %v", err)
}
m.collection = nil
m.stream = nil
m.isWatching = false
return nil
}
在使用时,你需要创建一个 MongoDB 的客户端连接,并将其作为参数传递给 NewMongoChangeStream 函数,然后可以调用 Watch 方法来监听指定集合的 changestream。提供的 handler 函数会接收到每个变更文档,并进行相应的处理。
请注意,在上面的代码中,我使用了一把互斥锁来确保同一个实例只能监听一个集合。这是为了避免多个 goroutine 同时访问 Watch 方法导致的并发问题。另外,你需要替换代码中的 “your_database_name” 为你自己的数据库名字。
这只是一个简单示例,根据你的具体需求可能还需要进行一些修改和优化。希望能帮助到你!
内容由零声教学AI助手提供,问题来源于学员提问




