下面是一个示例的Golang代码,演示如何使用MongoDB的Change Stream来监听user集合中name和desc字段的变化,并在pipeline中设置$project。
package main
import (
"context"
"fmt"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
)
func main() {
client, err := mongo.Connect(context.Background(), options.Client().ApplyURI("mongodb://localhost:27017"))
if err != nil {
fmt.Println("Failed to connect to MongoDB:", err)
return
}
defer client.Disconnect(context.Background())
collection := client.Database("your_database").Collection("user")
pipeline := []interface{}{
bson.M{
"$match": bson.M{
"operationType": "update",
"ns.coll": "user",
},
},
bson.M{
"$project": bson.M{
"_id": 0,
"fullDoc": 0,
"updateDescription.removedFields": 0,
},
},
}
options := options.ChangeStream()
options.SetFullDocument(options.UpdateLookup)
options.SetMaxAwaitTime(10000)
stream, err := collection.Watch(context.Background(), pipeline, options)
if err != nil {
fmt.Println("Failed to open change stream:", err)
return
}
defer stream.Close(context.Background())
for stream.Next(context.Background()) {
var changeStreamDoc bson.M
if err := stream.Decode(&changeStreamDoc); err != nil {
fmt.Println("Failed to decode change document:", err)
continue
}
// 处理变化文档
fmt.Println(changeStreamDoc)
}
if stream.Err() != nil {
fmt.Println("Change stream error:", stream.Err())
}
}
请注意,您需要替换示例代码中的”mongodb://localhost:27017”为实际的MongoDB连接字符串,以及”your_database”为要监听的数据库名称。另外,确保已导入正确的包和依赖项(如”go.mongodb.org/mongo-driver”)。
在pipeline中,$match
阶段用于筛选操作类型为update且集合名称为”user”的变化事件。然后使用$project
阶段来排除不需要的字段,并设置了不包含”_id”、”fullDoc”和”updateDescription.removedFields”字段。
这个示例提供了一个基本的框架,你可以根据自己的需求进行调整和扩展。
内容由零声教学AI助手提供,问题来源于学员提问