下面是一个基于 Go 语言操作 MongoDB 的通用方法库示例，可满足你所描述的需求：
package mongodb
import (
"context"
"fmt"
"log"
"sync"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
)
// ChangeStreamHandler is the callback contract for change stream consumers.
// ListenForChanges invokes OnChange once for every change event it decodes.
type ChangeStreamHandler interface {
	// OnChange receives one decoded change event document.
	OnChange(data bson.M)
}
// MongoDBClient bundles a driver client with the collection it watches and
// the handler that receives change events.
//
// It contains a sync.Mutex and therefore must not be copied; use it through
// the pointer returned by NewMongoDBClient.
type MongoDBClient struct {
	// client is the underlying MongoDB driver client.
	client *mongo.Client
	// collectionName is the collection whose changes are watched.
	collectionName string
	// handler is called for each change event; ListenForChanges rejects nil.
	handler ChangeStreamHandler
	// mutex is held by ListenForChanges for as long as it runs.
	mutex sync.Mutex
}
// NewMongoDBClient connects to the MongoDB deployment addressed by uri,
// verifies the connection with a ping, and returns a client bound to
// collectionName.
//
// handler is stored for later use by ListenForChanges. It is not validated
// here; ListenForChanges returns an error if it is nil.
//
// An error is returned when the connection cannot be established or the ping
// fails. The underlying driver error is wrapped, so callers can inspect it
// with errors.Is / errors.As. When the ping fails, the already-created driver
// client is disconnected before returning so that it is not leaked.
func NewMongoDBClient(uri, collectionName string, handler ChangeStreamHandler) (*MongoDBClient, error) {
	ctx := context.Background()

	client, err := mongo.Connect(ctx, options.Client().ApplyURI(uri))
	if err != nil {
		return nil, fmt.Errorf("failed to connect to MongoDB: %w", err)
	}

	if err := client.Ping(ctx, nil); err != nil {
		// Best-effort cleanup: the caller never receives this client, so it
		// must be released here. The ping error is the one worth reporting,
		// hence the Disconnect error is deliberately dropped.
		_ = client.Disconnect(ctx)
		return nil, fmt.Errorf("failed to ping MongoDB: %w", err)
	}

	return &MongoDBClient{
		client:         client,
		collectionName: collectionName,
		handler:        handler,
	}, nil
}
// ListenForChanges opens a change stream on the client's collection and
// passes every decoded change event to the registered handler. It blocks
// until the stream stops producing events.
//
// The client's mutex is held for the whole call, so concurrent calls on the
// same client run one after another rather than in parallel.
//
// It returns an error when no handler is registered, when the change stream
// cannot be opened, or when iteration ends with a stream error (driver errors
// are wrapped). Events that fail to decode are logged and skipped. It returns
// nil when the stream ends without an error.
func (mc *MongoDBClient) ListenForChanges() error {
	mc.mutex.Lock()
	defer mc.mutex.Unlock()

	if mc.handler == nil {
		return fmt.Errorf("no change stream handler specified")
	}

	ctx := context.Background()

	// NOTE(review): the database name is a hard-coded placeholder; replace it
	// with the real database before using this code.
	collection := mc.client.Database("your_database_name").Collection(mc.collectionName)

	// No extra aggregation stages: watch every change on the collection.
	// A pipeline containing an empty stage document ({}) is not a valid
	// aggregation stage, so the pipeline itself must be empty instead.
	pipeline := mongo.Pipeline{}

	// UpdateLookup asks the server to attach the current full document to
	// update events. No resume token is configured, so the stream starts from
	// the present moment.
	streamOpts := options.ChangeStream().SetFullDocument(options.UpdateLookup)

	stream, err := collection.Watch(ctx, pipeline, streamOpts)
	if err != nil {
		return fmt.Errorf("failed to create change stream: %w", err)
	}
	defer stream.Close(ctx)

	for stream.Next(ctx) {
		var event bson.M
		if err := stream.Decode(&event); err != nil {
			// One undecodable event must not terminate the listener.
			log.Printf("failed to decode change document: %v", err)
			continue
		}
		mc.handler.OnChange(event)
	}

	if err := stream.Err(); err != nil {
		return fmt.Errorf("change stream error: %w", err)
	}
	return nil
}
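请注意，这只是一个示例，不能直接运行。你需要根据实际情况进行适当修改，例如替换 ListenForChanges 中写死的数据库名称 "your_database_name" 以及连接 URI 等。另外，Change Stream 仅在副本集或分片集群上可用，单机（standalone）部署无法使用。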
使用示例:
package main
import (
	"log"

	"go.mongodb.org/mongo-driver/bson"

	mongodb "your-package/mongodb"
)
// MyChangeStreamHandler is a stateless example handler that logs every change
// event it is given.
type MyChangeStreamHandler struct{}

// OnChange writes the received change document to the standard logger.
func (h *MyChangeStreamHandler) OnChange(data bson.M) {
	const format = "Received change: %+v\n"
	log.Printf(format, data)
}
func main() {
handler := &MyChangeStreamHandler{}
client, err := mongodb.NewMongoDBClient("mongodb://localhost:27017", "your_collection_name", handler)
if err != nil {
log.Fatalf("Failed to create MongoDB client: %v", err)
}
err = client.ListenForChanges()
if err != nil {
log.Fatalf("Error while listening for changes: %v", err)
}
}
上述代码中，MyChangeStreamHandler 实现了 OnChange 方法，用于处理变更数据。你可以在该方法中添加自定义逻辑来解析和处理发生变更的字段。
希望这个示例能对你有所帮助!
内容由零声教学AI助手提供,问题来源于学员提问




