下面是一个示例的 Go 代码,用于实现监听 MongoDB 集合 changestream 的功能:
package mongodb
import (
"context"
"errors"
"log"
"sync"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
)
// ChangeStreamListener 定义一个 Change Stream 监听器结构体
type ChangeStreamListener struct {
client *mongo.Client
collection *mongo.Collection
stream *mongo.ChangeStream
stop chan struct{}
lock sync.Mutex
}
// NewChangeStreamListener 创建一个 Change Stream 监听器实例
func NewChangeStreamListener(ctx context.Context, uri string, dbName string, collectionName string) (*ChangeStreamListener, error) {
clientOptions := options.Client().ApplyURI(uri)
client, err := mongo.Connect(ctx, clientOptions)
if err != nil {
return nil, err
}
db := client.Database(dbName)
collection := db.Collection(collectionName)
return &ChangeStreamListener{
client: client,
collection: collection,
stop: make(chan struct{}),
}, nil
}
// Start 开始监听指定集合的 changestream 事件
func (listener *ChangeStreamListener) Start() error {
listener.lock.Lock()
defer listener.lock.Unlock()
if listener.stream != nil {
return errors.New("already listening to a collection")
}
changeStreamOpts := options.ChangeStream().SetFullDocument(options.UpdateLookup)
stream, err := listener.collection.Watch(context.Background(), mongo.Pipeline{}, changeStreamOpts)
if err != nil {
return err
}
listener.stream = stream
go func() {
defer func() {
listener.stop <- struct{}{}
close(listener.stop)
err := listener.client.Disconnect(context.Background())
if err != nil {
log.Printf("Error disconnecting from MongoDB: %s\n", err.Error())
}
}()
for {
select {
case <-listener.stop:
return
default:
if listener.stream.TryNext(context.Background()) {
// 处理变更事件的逻辑
changeEvent := &mongo.ChangeDocument{}
err := listener.stream.Decode(changeEvent)
if err != nil {
log.Printf("Error decoding change event: %s\n", err.Error())
continue
}
// 解析变更字段,进行相应处理
log.Printf("Received change event: %+v\n", changeEvent)
}
}
}
}()
return nil
}
// Stop 停止监听 changestream 事件
func (listener *ChangeStreamListener) Stop() error {
listener.lock.Lock()
defer listener.lock.Unlock()
if listener.stream == nil {
return errors.New("not currently listening to any collection")
}
err := listener.stream.Close(context.Background())
listener.stream = nil
return err
}
调用示例:
package main
import (
"context"
mongodb "your-package-name/mongodb" // 导入你的库包名
)
func main() {
ctx := context.Background()
listener, err := mongodb.NewChangeStreamListener(ctx, "mongodb://localhost:27017", "your-db-name", "your-collection-name")
if err != nil {
panic(err)
}
err = listener.Start()
if err != nil {
panic(err)
}
// 持续获取 changestream 的 demo 示例代码(此处为示意代码,具体根据需求实现)
select {}
// 停止监听 changestream 的 demo 示例代码
err = listener.Stop()
if err != nil {
panic(err)
}
}
以上示例代码是一个简单的实现,涵盖了监听 MongoDB 集合 changestream、断点恢复和分布式锁处理的基本逻辑。你可以根据自己的需求进行修改和扩展。注意在实际使用中,还需要根据具体情况配置 MongoDB 的连接 URI、数据库名称以及集合名称等信息。
内容由零声教学AI助手提供,问题来源于学员提问




