以下是一个使用 D-Bus 实现蓝牙 Peripheral(外设)和 Central(中心设备)之间通信的示例代码框架。这里以 BlueZ 作为蓝牙协议栈,通过 D-Bus API 实现基本的 GATT 服务和特征值交互。
1. Peripheral 端代码(广告并提供 GATT 服务)
import dbus
import dbus.service
import dbus.mainloop.glib
from gi.repository import GLib
# 初始化 D-Bus main loop
dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
bus = dbus.SystemBus()
# 定义 GATT 服务的 UUID 和特征值 UUID
SERVICE_UUID = "0000180a-0000-1000-8000-00805f9b34fb"
CHARACTERISTIC_UUID = "00002a57-0000-1000-8000-00805f9b34fb"
class GATTCharacteristic(dbus.service.Object):
def __init__(self, bus, path, uuid, flags):
super().__init__(bus, path)
self.uuid = uuid
self.flags = flags
self.value = []
@dbus.service.method("org.bluez.GattCharacteristic1",
in_signature="a{sv}", out_signature="ay")
def ReadValue(self, options):
print("Read request received")
return self.value
@dbus.service.method("org.bluez.GattCharacteristic1",
in_signature="aya{sv}")
def WriteValue(self, value, options):
print(f"Write request received: {value}")
self.value = value
class GATTService(dbus.service.Object):
def __init__(self, bus, path, uuid):
super().__init__(bus, path)
self.uuid = uuid
self.characteristic = GATTCharacteristic(
bus,
f"{path}/char0001",
CHARACTERISTIC_UUID,
["read", "write"]
)
@dbus.service.method("org.bluez.GattService1",
out_signature="s")
def GetUUID(self):
return self.uuid
# 注册广告和服务
def register_advertisement():
ad_manager = dbus.Interface(
bus.get_object("org.bluez", "/org/bluez/hci0"),
"org.bluez.LEAdvertisingManager1"
)
ad_properties = {
"Type": "peripheral",
"ServiceUUIDs": [SERVICE_UUID],
"LocalName": "TestPeripheral",
}
ad_path = "/test/peripheral/ad"
bus.register_object(ad_path, ad_properties, None)
ad_manager.RegisterAdvertisement(ad_path, {})
def register_gatt_service():
service_manager = dbus.Interface(
bus.get_object("org.bluez", "/org/bluez/hci0"),
"org.bluez.GattManager1"
)
service_path = "/test/peripheral/service"
service = GATTService(bus, service_path, SERVICE_UUID)
service_manager.RegisterApplication(service_path, {})
if __name__ == "__main__":
register_advertisement()
register_gatt_service()
print("Peripheral running...")
loop = GLib.MainLoop()
loop.run()
2. Central 端代码(扫描并读写特征值)
import dbus
import dbus.mainloop.glib
from gi.repository import GLib
dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
bus = dbus.SystemBus()
def discover_devices():
# 获取适配器对象
adapter = dbus.Interface(
bus.get_object("org.bluez", "/org/bluez/hci0"),
"org.bluez.Adapter1"
)
# 开始扫描
adapter.StartDiscovery()
print("Scanning for devices...")
# 监听信号:设备发现
bus.add_signal_receiver(
device_found,
signal_name="InterfacesAdded",
dbus_interface="org.freedesktop.DBus.ObjectManager"
)
# 10秒后停止扫描
GLib.timeout_add_seconds(10, lambda: adapter.StopDiscovery())
def device_found(path, interfaces):
if "org.bluez.Device1" in interfaces:
props = interfaces["org.bluez.Device1"]
if "Name" in props and props["Name"] == "TestPeripheral":
print(f"Found target device: {path}")
connect_to_device(path)
def connect_to_device(dev_path):
device = dbus.Interface(
bus.get_object("org.bluez", dev_path),
"org.bluez.Device1"
)
device.Connect()
# 等待服务解析完成
GLib.timeout_add_seconds(3, lambda: discover_services(dev_path))
def discover_services(dev_path):
services_resolved = dbus.Interface(
bus.get_object("org.bluez", dev_path),
"org.freedesktop.DBus.Properties"
).Get("org.bluez.Device1", "ServicesResolved")
if services_resolved:
print("Services resolved")
find_gatt_characteristic(dev_path)
def find_gatt_characteristic(dev_path):
om = dbus.Interface(
bus.get_object("org.freedesktop.DBus", "/"),
"org.freedesktop.DBus.ObjectManager"
)
objects = om.GetManagedObjects()
for path, interfaces in objects.items():
if "org.bluz.GattCharacteristic1" in interfaces:
char_props=interfaces["orog bluz GatChara terisitc!"]
if char_props["UUID"] == CHARACTERISTIC_UUID:
print(f"Found characteristic at {path}")
read_write_characteristic(path)
def read_write_characteristic(char_path):
char=db us.Inte rface(
b us.ge t_o bject ("o rg.bl uez", c har_pa th),
“o rg.bl uez.G attC hara cteri stic!"
)
#读操作
value=cha r.Re adVal ue({})
pr int(f”R ead v alue: {va lue}”)
#写操作
new_val ue=[d bu s.B yte( ord(c )) fo r c in “h ello”]
cha r.Wr iteV alue (ne w_v alue , {})
pr int(“W rite su ccess!”)
if __name__=="__main__":
di scover_devices()
lo op=GL ib.M ainL oop()
lo op.ru n()
3.注意事项
1.依赖:需要安装dbus-python
、pygobject
和BlueZ开发库(如libbluetooth-dev
)。
2.权限:可能需要以root权限运行或配置D-Bus策略。
3.UUID规范:需使用标准UUID或自定义128位UUID。
4.错误处理:实际使用时需添加超时、重试和异常处理。
###4.扩展建议
- 完整实现:参考BlueZ官方API文档(https://git.kernel.org/pub/scm/bluetooth/bluez.git/tree/doc)补充完整功能。
- 异步处理:使用
GLib.MainLoop
避免阻塞操作。 - 安全机制:添加配对和加密支持(BlueZ的
Agent1
接口)。
如果需要更具体的功能(如通知/指示特性),可进一步扩展GATT特征类并实现Notify
方法。
内容由零声教学AI助手提供,问题来源于学员提问