233 lines
4.9 KiB
Plaintext
233 lines
4.9 KiB
Plaintext
|
#!/usr/bin/python
|
||
|
|
||
|
from __future__ import absolute_import, print_function, unicode_literals
|
||
|
# -*- coding: utf-8 -*-
|
||
|
|
||
|
import sys
|
||
|
import dbus
|
||
|
import dbus.service
|
||
|
from dbus.mainloop.glib import DBusGMainLoop
|
||
|
try:
|
||
|
from gi.repository import GObject
|
||
|
except ImportError:
|
||
|
import gobject as GObject
|
||
|
|
||
|
# Bus name of the BlueZ service, and the object path on which the health
# manager interface is requested further down in this script.
BUS_NAME = 'org.bluez'
PATH = '/org/bluez'
# D-Bus interface names used by this script.
ADAPTER_INTERFACE = 'org.bluez.Adapter1'
HEALTH_MANAGER_INTERFACE = 'org.bluez.HealthManager1'
HEALTH_DEVICE_INTERFACE = 'org.bluez.HealthDevice1'

# Make the GLib main loop the default for dbus-python connections created
# after this point.
DBusGMainLoop(set_as_default=True)
# NOTE(review): this module-level loop is not referenced anywhere else in the
# visible code; enter_mainloop() creates its own MainLoop instance.
loop = GObject.MainLoop()

# System bus connection shared by every proxy object created below.
bus = dbus.SystemBus()
|
||
|
|
||
|
def sig_received(*args, **kwargs):
    """Print a received D-Bus signal: its name, object path and payload.

    Intended as a signal-receiver callback. The signal name and object path
    are read from the ``member`` and ``path`` keyword arguments; when either
    is missing the call is silently ignored.

    Args:
        *args: The signal's arguments. For a "PropertyChanged" signal
            carrying exactly two values, both are printed (name, then
            value). For any other signal only the first argument is
            printed, if there is one.
        **kwargs: Must contain ``member`` (signal name) and ``path``
            (object path) for anything to be printed.
    """
    if "member" not in kwargs or "path" not in kwargs:
        return

    sig_name = kwargs["member"]
    path = kwargs["path"]
    print(sig_name)
    print(path)

    if sig_name == "PropertyChanged" and len(args) == 2:
        key, value = args
        print(key)
        print(value)
    elif args:
        # Signals without a payload used to raise IndexError here; only
        # print the first argument when one is actually present.
        print(args[0])
|
||
|
|
||
|
|
||
|
def enter_mainloop():
    """Listen for health-device signals until the user presses Ctrl+C.

    Registers ``sig_received`` on the shared bus for signals emitted by
    ``BUS_NAME`` on ``HEALTH_DEVICE_INTERFACE``, asking for the object path,
    member name and interface name to be passed as keyword arguments, then
    runs a new GLib main loop. A KeyboardInterrupt ends the loop quietly;
    the farewell message is printed however the loop ends.
    """
    bus.add_signal_receiver(sig_received, bus_name=BUS_NAME,
                            dbus_interface=HEALTH_DEVICE_INTERFACE,
                            path_keyword="path",
                            member_keyword="member",
                            interface_keyword="interface")

    try:
        # Message typo fixed ("lopp" -> "loop", "for finish" -> "to finish").
        print("Entering main loop, push Ctrl+C to finish")

        mainloop = GObject.MainLoop()
        mainloop.run()
    except KeyboardInterrupt:
        pass
    finally:
        print("Exiting, bye")
|
||
|
|
||
|
# Proxy for the health manager interface; used below to create and destroy
# the HDP application.
hdp_manager = dbus.Interface(bus.get_object(BUS_NAME, PATH),
                             HEALTH_MANAGER_INTERFACE)

# Ask which role the application should take: "source" or "sink".
role = None
while role is None:
    # end="" keeps the cursor on the prompt line; the former ``print(...,)``
    # was a Python 2 leftover that no longer suppressed the newline.
    print("Select 1. source or 2. sink: ", end="")
    sys.stdout.flush()
    try:
        line = sys.stdin.readline()
        if not line:
            # EOF on stdin: without this check int("") fails forever and
            # the loop never terminates.
            sys.exit("End of input, exiting")
        sel = int(line)
        if sel == 1:
            role = "source"
        elif sel == 2:
            role = "sink"
        else:
            raise ValueError
    except (TypeError, ValueError):
        print("Wrong selection, try again: ", end="")
    except KeyboardInterrupt:
        sys.exit()
|
||
|
|
||
|
# Ask for the data type of the application; it is later wrapped in a
# dbus UInt16, so only values in 0..65535 are accepted.
dtype = None
while dtype is None:
    # end="" keeps the cursor on the prompt line; the former ``print(...,)``
    # was a Python 2 leftover that no longer suppressed the newline.
    print("Select a data type: ", end="")
    sys.stdout.flush()
    try:
        line = sys.stdin.readline()
        if not line:
            # EOF on stdin: without this check int("") fails forever and
            # the loop never terminates.
            sys.exit("End of input, exiting")
        sel = int(line)
        if not 0 <= sel <= 65535:
            raise ValueError
        dtype = sel
    except (TypeError, ValueError):
        print("Wrong selection, try again: ", end="")
    except KeyboardInterrupt:
        sys.exit()
|
||
|
|
||
|
# A source additionally chooses its preferred data channel type; a sink
# leaves ``pref`` as None and sends no "ChannelType" entry.
pref = None
if role == "source":
    while pref is None:
        try:
            # One prompt on one line; end="" keeps the cursor after it.
            print("Select a preferred data channel type 1. "
                  "reliable 2. streaming: ", end="")
            sys.stdout.flush()
            line = sys.stdin.readline()
            if not line:
                # EOF on stdin: without this check int("") fails forever
                # and the loop never terminates.
                sys.exit("End of input, exiting")
            sel = int(line)
            if sel == 1:
                pref = "reliable"
            elif sel == 2:
                pref = "streaming"
            else:
                raise ValueError
        except (TypeError, ValueError):
            print("Wrong selection, try again")
        except KeyboardInterrupt:
            sys.exit()

# Build the application description once instead of duplicating the
# CreateApplication call per role.
app_config = {
    "DataType": dbus.types.UInt16(dtype),
    "Role": role,
    "Description": "Test Source" if role == "source" else "Test sink",
}
if role == "source":
    app_config["ChannelType"] = pref

app_path = hdp_manager.CreateApplication(app_config)

print("New application created:", app_path)
|
||
|
|
||
|
# Ask whether to actively connect to a remote device. When the answer is
# "no", the script only listens for signals and then exits.
con = None
while con is None:
    try:
        print("Connect to a remote device (y/n)? ", end="")
        sys.stdout.flush()
        line = sys.stdin.readline()
        if not line:
            # EOF on stdin: an empty read never matches an answer, so the
            # loop would otherwise spin forever.
            sys.exit("End of input, exiting")
        # Normalise so that "Yes", "y " or a "\r\n" line ending are accepted
        # in addition to the exact spellings matched before.
        sel = line.strip().lower()
        if sel in ("y", "yes"):
            con = True
        elif sel in ("n", "no"):
            con = False
        else:
            print("Wrong selection, try again.")
    except KeyboardInterrupt:
        sys.exit()

if not con:
    enter_mainloop()
    sys.exit()
|
||
|
|
||
|
# Enumerate every object exported by the service and keep those that
# implement the adapter interface.
manager = dbus.Interface(bus.get_object(BUS_NAME, "/"),
                         "org.freedesktop.DBus.ObjectManager")

objects = manager.GetManagedObjects()
# items() / ``in`` work on both Python 2 and 3; iteritems() and has_key()
# exist only on Python 2. Sorting gives a stable menu order.
adapters = sorted(path for path, ifaces in objects.items()
                  if ADAPTER_INTERFACE in ifaces)

if not adapters:
    # With an empty list no selection can ever succeed, so stop here
    # instead of prompting forever.
    print("No adapters available")
    sys.exit()

for i, ad in enumerate(adapters, 1):
    print("%d. %s" % (i, ad))

print("Select an adapter: ", end="")
sys.stdout.flush()
select = None
while select is None:
    try:
        line = sys.stdin.readline()
        if not line:
            # EOF on stdin: without this check int("") fails forever.
            sys.exit("End of input, exiting")
        pos = int(line) - 1
        if pos < 0:
            # Reject 0 and negatives, which would otherwise index from the
            # end of the list.
            raise IndexError
        select = adapters[pos]
    except (TypeError, IndexError, ValueError):
        # This message doubles as the retry prompt, so stay on its line.
        print("Wrong selection, try again: ", end="")
        sys.stdout.flush()
    except KeyboardInterrupt:
        sys.exit()

adapter = dbus.Interface(bus.get_object(BUS_NAME, select), ADAPTER_INTERFACE)
|
||
|
|
||
|
# The BlueZ 5 Adapter1 interface has no GetProperties() method and no
# "Devices" property, so the devices are taken from the ObjectManager
# instead: every object implementing Device1 whose "Adapter" property is
# the adapter chosen above.
DEVICE_INTERFACE = 'org.bluez.Device1'
devices = sorted(
    path for path, ifaces in manager.GetManagedObjects().items()
    if ifaces.get(DEVICE_INTERFACE, {}).get("Adapter") == select)

if not devices:
    print("No devices available")
    sys.exit()

for i, dev in enumerate(devices, 1):
    print("%d. %s" % (i, dev))

print("Select a device: ", end="")
sys.stdout.flush()
select = None
while select is None:
    try:
        line = sys.stdin.readline()
        if not line:
            # EOF on stdin: without this check int("") fails forever.
            sys.exit("End of input, exiting")
        pos = int(line) - 1
        if pos < 0:
            # Reject 0 and negatives, which would otherwise index from the
            # end of the list.
            raise IndexError
        select = devices[pos]
    except (TypeError, IndexError, ValueError):
        # This message doubles as the retry prompt, so stay on its line.
        print("Wrong selection, try again: ", end="")
        sys.stdout.flush()
    except KeyboardInterrupt:
        sys.exit()

# Health-device proxy for the selected remote device.
device = dbus.Interface(bus.get_object(BUS_NAME, select),
                        HEALTH_DEVICE_INTERFACE)
|
||
|
|
||
|
# Optionally run an echo test against the selected device before opening
# a data channel; a failed echo ends the script.
echo = None
while echo is None:
    try:
        print("Perform an echo (y/n)? ", end="")
        sys.stdout.flush()
        line = sys.stdin.readline()
        if not line:
            # EOF on stdin: an empty read never matches an answer, so the
            # loop would otherwise spin forever.
            sys.exit("End of input, exiting")
        # Normalise so that "Yes", "y " or a "\r\n" line ending are accepted
        # in addition to the exact spellings matched before.
        sel = line.strip().lower()
        if sel in ("y", "yes"):
            echo = True
        elif sel in ("n", "no"):
            echo = False
        else:
            print("Wrong selection, try again.")
    except KeyboardInterrupt:
        sys.exit()

if echo:
    if device.Echo():
        print("Echo was ok")
    else:
        # Message typo fixed ("war" -> "was").
        print("Echo was wrong, exiting")
        sys.exit()
|
||
|
|
||
|
# Open a data channel to the selected device, listen for signals until the
# user interrupts, then remove the application again.
print("Connecting to device %s" % select)

# A source asks for a "reliable" channel; a sink accepts "any".
channel_config = "reliable" if role == "source" else "any"
chan = device.CreateChannel(app_path, channel_config)

print(chan)

enter_mainloop()

hdp_manager.DestroyApplication(app_path)
|