当然,下面是一个使用 Go 监听 MongoDB Change Stream 并获取 operationType 的简单示例:
package main
import (
"context"
"fmt"
"log"
"time"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
)
// main runs the change-stream watcher and exits with a non-zero status if it
// fails. All work happens in watchOperationTypes so that its deferred cleanup
// runs before the process exits; log.Fatal calls os.Exit, which skips defers.
func main() {
	if err := watchOperationTypes(context.Background()); err != nil {
		log.Fatal(err)
	}
}

// watchOperationTypes connects to the MongoDB server at localhost:27017, opens
// a change stream on test.mycollection filtered to insert, update and delete
// events, and prints the operationType of every event it receives.
//
// It blocks until the stream ends or ctx is cancelled. It returns the first
// error encountered while connecting, watching, decoding or cleaning up.
func watchOperationTypes(ctx context.Context) (err error) {
	// Configure the MongoDB connection.
	clientOptions := options.Client().ApplyURI("mongodb://localhost:27017")

	// Connect to MongoDB.
	client, err := mongo.Connect(ctx, clientOptions)
	if err != nil {
		return fmt.Errorf("connecting to MongoDB: %w", err)
	}
	defer func() {
		// Report a disconnect failure only when nothing else went wrong, so
		// the original error is never masked.
		if dErr := client.Disconnect(context.Background()); dErr != nil && err == nil {
			err = fmt.Errorf("disconnecting from MongoDB: %w", dErr)
		}
	}()

	// Connect does not guarantee the server is reachable; ping it so the
	// message below is only printed once a round trip has succeeded.
	if err = client.Ping(ctx, nil); err != nil {
		return fmt.Errorf("pinging MongoDB: %w", err)
	}
	fmt.Println("Connected to MongoDB!")

	collection := client.Database("test").Collection("mycollection")

	// Only deliver the event types this program reports on.
	pipeline := []bson.M{
		{"$match": bson.M{"operationType": bson.M{"$in": []string{"insert", "update", "delete"}}}},
	}
	changeStreamOpts := options.ChangeStream().
		SetFullDocument(options.UpdateLookup).
		SetMaxAwaitTime(2 * time.Second)

	stream, err := collection.Watch(ctx, pipeline, changeStreamOpts)
	if err != nil {
		return fmt.Errorf("opening change stream: %w", err)
	}
	defer func() {
		if cErr := stream.Close(context.Background()); cErr != nil && err == nil {
			err = fmt.Errorf("closing change stream: %w", cErr)
		}
	}()

	for stream.Next(ctx) {
		var changeDoc bson.M
		if err = stream.Decode(&changeDoc); err != nil {
			return fmt.Errorf("decoding change event: %w", err)
		}
		fmt.Printf("Operation Type: %v\n", changeDoc["operationType"])
	}
	// Next returns false both on normal termination and on failure; Err
	// distinguishes the two.
	if err = stream.Err(); err != nil {
		return fmt.Errorf("reading change stream: %w", err)
	}
	return nil
}
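请确保你已经安装了 go.mongodb.org/mongo-driver 包(例如执行 go get go.mongodb.org/mongo-driver/mongo)。该示例将监听 MongoDB 的 Change Stream,并打印出每个 insert、update、delete 操作的 operationType。注意:Change Stream 只能在副本集(replica set)或分片集群上使用,连接单机(standalone)实例时 Watch 会返回错误。你需要根据你自己的 MongoDB 配置(连接地址、数据库名和集合名)进行适当修改。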
希望对你有所帮助!
内容由零声教学AI助手提供,问题来源于学员提问




