198 lines
		
	
	
		
			5.5 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
	
	
	
			
		
		
	
	
			198 lines
		
	
	
		
			5.5 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
	
	
	
| #!/usr/bin/python
 | |
| 
 | |
| from __future__ import absolute_import, print_function, unicode_literals
 | |
| 
 | |
| '''
 | |
| Cycling Speed and Cadence test script
 | |
| '''
 | |
| 
 | |
| from optparse import OptionParser, make_option
 | |
| import sys
 | |
| import dbus
 | |
| import dbus.service
 | |
| import dbus.mainloop.glib
 | |
| try:
 | |
|   from gi.repository import GObject
 | |
| except ImportError:
 | |
|   import gobject as GObject
 | |
| import bluezutils
 | |
| 
 | |
| BUS_NAME = 'org.bluez'
 | |
| CYCLINGSPEED_MANAGER_INTERFACE = 'org.bluez.CyclingSpeedManager1'
 | |
| CYCLINGSPEED_WATCHER_INTERFACE = 'org.bluez.CyclingSpeedWatcher1'
 | |
| CYCLINGSPEED_INTERFACE = 'org.bluez.CyclingSpeed1'
 | |
| 
 | |
| class MeasurementQ:
 | |
| 	def __init__(self, wrap_v):
 | |
| 		self._now = [None, None]
 | |
| 		self._prev = [None, None]
 | |
| 		self._wrap_v = wrap_v
 | |
| 
 | |
| 	def can_calc(self):
 | |
| 		return ((self._now[0] is not None)
 | |
| 			and (self._now[1] is not None)
 | |
| 			and (self._prev[0] is not None)
 | |
| 			and (self._prev[1] is not None))
 | |
| 
 | |
| 	def delta_v(self):
 | |
| 		delta = self._now[0] - self._prev[0]
 | |
| 		if (delta < 0) and (self._wrap_v):
 | |
| 			delta = delta + 65536
 | |
| 		return delta
 | |
| 
 | |
| 	def delta_t(self):
 | |
| 		delta = self._now[1] - self._prev[1]
 | |
| 		if delta < 0:
 | |
| 			delta = delta + 65536
 | |
| 		return delta
 | |
| 
 | |
| 	def put(self, data):
 | |
| 		self._prev = self._now
 | |
| 		self._now = data
 | |
| 
 | |
| class Watcher(dbus.service.Object):
 | |
| 	_wheel = MeasurementQ(False)
 | |
| 	_crank = MeasurementQ(True)
 | |
| 	_circumference = None
 | |
| 
 | |
| 	def enable_calc(self, v):
 | |
| 		self._circumference = v
 | |
| 
 | |
| 	@dbus.service.method(CYCLINGSPEED_WATCHER_INTERFACE,
 | |
| 					in_signature="oa{sv}", out_signature="")
 | |
| 	def MeasurementReceived(self, device, measure):
 | |
| 		print("Measurement received from %s" % device)
 | |
| 
 | |
| 		rev = None
 | |
| 		evt = None
 | |
| 		if "WheelRevolutions" in measure:
 | |
| 			rev = measure["WheelRevolutions"]
 | |
| 			print("WheelRevolutions: ", measure["WheelRevolutions"])
 | |
| 		if "LastWheelEventTime" in measure:
 | |
| 			evt = measure["LastWheelEventTime"]
 | |
| 			print("LastWheelEventTime: ", measure["LastWheelEventTime"])
 | |
| 		self._wheel.put( [rev, evt] )
 | |
| 
 | |
| 		rev = None
 | |
| 		evt = None
 | |
| 		if "CrankRevolutions" in measure:
 | |
| 			rev = measure["CrankRevolutions"]
 | |
| 			print("CrankRevolutions: ", measure["CrankRevolutions"])
 | |
| 		if "LastCrankEventTime" in measure:
 | |
| 			evt = measure["LastCrankEventTime"]
 | |
| 			print("LastCrankEventTime: ", measure["LastCrankEventTime"])
 | |
| 		self._crank.put( [rev, evt] )
 | |
| 
 | |
| 		if self._circumference is None:
 | |
| 			return
 | |
| 
 | |
| 		if self._wheel.can_calc():
 | |
| 			delta_v = self._wheel.delta_v()
 | |
| 			delta_t = self._wheel.delta_t()
 | |
| 
 | |
| 			if (delta_v >= 0) and (delta_t > 0):
 | |
| 				speed = delta_v * self._circumference * 1024 / delta_t # mm/s
 | |
| 				speed = speed * 0.0036 # mm/s -> km/h
 | |
| 				print("(calculated) Speed: %.2f km/h" % speed)
 | |
| 
 | |
| 		if self._crank.can_calc():
 | |
| 			delta_v = self._crank.delta_v()
 | |
| 			delta_t = self._crank.delta_t()
 | |
| 
 | |
| 			if delta_t > 0:
 | |
| 				cadence = delta_v * 1024 / delta_t
 | |
| 				print("(calculated) Cadence: %d rpm" % cadence)
 | |
| 
 | |
| def properties_changed(interface, changed, invalidated):
 | |
| 	if "Location" in changed:
 | |
| 		print("Sensor location: %s" % changed["Location"])
 | |
| 
 | |
| if __name__ == "__main__":
 | |
| 	dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
 | |
| 
 | |
| 	bus = dbus.SystemBus()
 | |
| 
 | |
| 	option_list = [
 | |
| 		make_option("-i", "--adapter", action="store",
 | |
| 			type="string", dest="adapter"),
 | |
| 		make_option("-b", "--device", action="store",
 | |
| 			type="string", dest="address"),
 | |
| 		make_option("-c", "--circumference", action="store",
 | |
| 			type="int", dest="circumference"),
 | |
| 		]
 | |
| 
 | |
| 	parser = OptionParser(option_list=option_list)
 | |
| 
 | |
| 	(options, args) = parser.parse_args()
 | |
| 
 | |
| 	if not options.address:
 | |
| 		print("Usage: %s [-i <adapter>] -b <bdaddr> [-c <value>] [cmd]" % (sys.argv[0]))
 | |
| 		print("Possible commands:")
 | |
| 		print("\tShowSupportedLocations")
 | |
| 		print("\tSetLocation <location>")
 | |
| 		print("\tSetCumulativeWheelRevolutions <value>")
 | |
| 		sys.exit(1)
 | |
| 
 | |
| 	managed_objects = bluezutils.get_managed_objects()
 | |
| 	adapter = bluezutils.find_adapter_in_objects(managed_objects,
 | |
| 								options.adapter)
 | |
| 	adapter_path = adapter.object_path
 | |
| 
 | |
| 	device = bluezutils.find_device_in_objects(managed_objects,
 | |
| 								options.address,
 | |
| 								options.adapter)
 | |
| 	device_path = device.object_path
 | |
| 
 | |
| 	cscmanager = dbus.Interface(bus.get_object(BUS_NAME, adapter_path),
 | |
| 						CYCLINGSPEED_MANAGER_INTERFACE)
 | |
| 
 | |
| 	watcher_path = "/test/watcher"
 | |
| 	watcher = Watcher(bus, watcher_path)
 | |
| 	if options.circumference:
 | |
| 		watcher.enable_calc(options.circumference)
 | |
| 	cscmanager.RegisterWatcher(watcher_path)
 | |
| 
 | |
| 	csc = dbus.Interface(bus.get_object(BUS_NAME, device_path),
 | |
| 							CYCLINGSPEED_INTERFACE)
 | |
| 
 | |
| 	bus.add_signal_receiver(properties_changed, bus_name=BUS_NAME,
 | |
| 				path=device_path,
 | |
| 				dbus_interface="org.freedesktop.DBus.Properties",
 | |
| 				signal_name="PropertiesChanged")
 | |
| 
 | |
| 	device_prop = dbus.Interface(bus.get_object(BUS_NAME, device_path),
 | |
| 					"org.freedesktop.DBus.Properties")
 | |
| 
 | |
| 	properties = device_prop.GetAll(CYCLINGSPEED_INTERFACE)
 | |
| 
 | |
| 	if "Location" in properties:
 | |
| 		print("Sensor location: %s" % properties["Location"])
 | |
| 	else:
 | |
| 		print("Sensor location is not supported")
 | |
| 
 | |
| 	if len(args) > 0:
 | |
| 		if args[0] == "ShowSupportedLocations":
 | |
| 			if properties["MultipleLocationsSupported"]:
 | |
| 				print("Supported locations: ", properties["SupportedLocations"])
 | |
| 			else:
 | |
| 				print("Multiple sensor locations not supported")
 | |
| 
 | |
| 		elif args[0] == "SetLocation":
 | |
| 			if properties["MultipleLocationsSupported"]:
 | |
| 				device_prop.Set(CYCLINGSPEED_INTERFACE, "Location", args[1])
 | |
| 			else:
 | |
| 				print("Multiple sensor locations not supported")
 | |
| 
 | |
| 		elif args[0] == "SetCumulativeWheelRevolutions":
 | |
| 			if properties["WheelRevolutionDataSupported"]:
 | |
| 				csc.SetCumulativeWheelRevolutions(dbus.UInt32(args[1]))
 | |
| 			else:
 | |
| 				print("Wheel revolution data not supported")
 | |
| 
 | |
| 		else:
 | |
| 			print("Unknown command")
 | |
| 			sys.exit(1)
 | |
| 
 | |
| 	mainloop = GObject.MainLoop()
 | |
| 	mainloop.run()
 | 
