176 lines
4.2 KiB
Plaintext
176 lines
4.2 KiB
Plaintext
|
#!/usr/bin/python
|
||
|
|
||
|
from __future__ import absolute_import, print_function, unicode_literals
|
||
|
|
||
|
import os
|
||
|
import sys
|
||
|
import dbus
|
||
|
import dbus.service
|
||
|
import dbus.mainloop.glib
|
||
|
try:
|
||
|
from gi.repository import GObject
|
||
|
except ImportError:
|
||
|
import gobject as GObject
|
||
|
|
||
|
# D-Bus name and root object path used to reach the org.bluez.obex service.
BUS_NAME = 'org.bluez.obex'
PATH = '/org/bluez/obex'

# D-Bus interface names; all of them live under the BUS_NAME namespace.
CLIENT_INTERFACE = BUS_NAME + '.Client1'
SESSION_INTERFACE = BUS_NAME + '.Session1'
PHONEBOOK_ACCESS_INTERFACE = BUS_NAME + '.PhonebookAccess1'
TRANSFER_INTERFACE = BUS_NAME + '.Transfer1'
|
||
|
|
||
|
class Transfer:
    """Bookkeeping record for a single requested transfer.

    Holds the callable to invoke with the transfer's result together with
    the transfer's object path and file name.
    """

    def __init__(self, callback_func):
        """Create a record whose path and filename are not known yet."""
        # Both start out unset; they are assigned later from outside.
        self.filename = self.path = None
        self.callback_func = callback_func
|
||
|
|
||
|
class PbapClient:
    """Issue PBAP pull requests on one OBEX session and track the transfers.

    Every pull()/pull_all() request is counted in ``self.transfers``; once
    the daemon replies, the transfer is recorded in ``self.props`` keyed by
    its object path. When a transfer reports completion, the downloaded
    file is read, deleted, and its lines are passed to the request's
    callback.
    """

    def __init__(self, session_path):
        """Bind to the session object at *session_path* on the session bus.

        Also subscribes properties_changed() to every
        org.freedesktop.DBus.Properties.PropertiesChanged signal on the bus.
        """
        # Requests issued through pull()/pull_all() that have not completed.
        self.transfers = 0
        # Transfer object path -> request object registered for it.
        self.props = dict()
        # Callable parked by flush_transfers() until all transfers finish.
        self.flush_func = None
        bus = dbus.SessionBus()
        obj = bus.get_object(BUS_NAME, session_path)
        self.session = dbus.Interface(obj, SESSION_INTERFACE)
        self.pbap = dbus.Interface(obj, PHONEBOOK_ACCESS_INTERFACE)
        bus.add_signal_receiver(self.properties_changed,
                                dbus_interface="org.freedesktop.DBus.Properties",
                                signal_name="PropertiesChanged",
                                path_keyword="path")

    def register(self, path, properties, transfer):
        """Record *transfer* under *path* once the daemon has created it.

        *properties* must contain a "Filename" entry naming the file the
        transfer is written to.
        """
        transfer.path = path
        transfer.filename = properties["Filename"]
        self.props[path] = transfer
        print("Transfer created: %s (file %s)" % (path,
                                                  transfer.filename))

    def error(self, err):
        """Report a failed request and stop the global main loop."""
        print(err)
        # 'mainloop' is the module-level loop created by the script entry.
        mainloop.quit()

    def _read_and_remove(self, filename):
        """Return the lines of *filename* and delete it; None if unreadable."""
        try:
            with open(filename, "r") as f:
                lines = f.readlines()
        except (IOError, OSError, UnicodeError) as err:
            print("Cannot read %s: %s" % (filename, err))
            return None
        try:
            os.remove(filename)
        except OSError as err:
            # The data was read successfully; a leftover file is not fatal.
            print("Cannot remove %s: %s" % (filename, err))
        return lines

    def transfer_complete(self, path):
        """Deliver the result of the finished transfer at *path*.

        Unknown paths are ignored. The transfer is always dropped from the
        bookkeeping, even when its file cannot be read, so that a pending
        flush callback is not blocked forever by one bad transfer.
        """
        req = self.props.pop(path, None)
        if req is None:
            return
        self.transfers -= 1
        print("Transfer %s complete" % path)

        lines = self._read_and_remove(req.filename)
        if lines is not None:
            try:
                req.callback_func(lines)
            except Exception as err:
                # Best effort: a failing callback must not stall the
                # remaining transfers, but it should not be silent either.
                print("Transfer %s callback failed: %s" % (path, err))

        if not self.props and self.transfers == 0:
            if self.flush_func is not None:
                # Clear before calling: the callback may park a new one.
                f = self.flush_func
                self.flush_func = None
                f()

    def transfer_error(self, path):
        """Report a failed transfer and stop the global main loop."""
        print("Transfer %s error" % path)
        mainloop.quit()

    def properties_changed(self, interface, properties, invalidated, path):
        """Handle a PropertiesChanged signal for a tracked transfer.

        Signals for unknown paths, and signals that do not carry a
        "Status" entry, are ignored.
        """
        if self.props.get(path) is None:
            return

        # Not every change notification includes "Status"; indexing the
        # dict directly would raise KeyError for those.
        status = properties.get('Status')
        if status == 'complete':
            self.transfer_complete(path)
        elif status == 'error':
            self.transfer_error(path)

    def pull(self, vcard, params, func):
        """Asynchronously pull one *vcard*; *func* receives its lines."""
        req = Transfer(func)
        self.pbap.Pull(vcard, "", params,
                       reply_handler=lambda o, p: self.register(o, p, req),
                       error_handler=self.error)
        self.transfers += 1

    def pull_all(self, params, func):
        """Asynchronously pull the whole phonebook; *func* receives its lines."""
        req = Transfer(func)
        self.pbap.PullAll("", params,
                          reply_handler=lambda o, p: self.register(o, p, req),
                          error_handler=self.error)
        self.transfers += 1

    def flush_transfers(self, func):
        """Arrange for *func* to run once all outstanding transfers finish.

        If nothing is outstanding at the time of the call, *func* is
        neither stored nor called.
        """
        if not self.props and self.transfers == 0:
            return
        self.flush_func = func

    def interface(self):
        """Return the PhonebookAccess1 interface proxy of the session."""
        return self.pbap
|
||
|
|
||
|
if __name__ == '__main__':
    # Script entry: create a PBAP session to the device given on the command
    # line, then for each phonebook select it, print its size, list its
    # vCards, pull every listed vCard and finally pull the whole phonebook.

    # Validate arguments before doing any D-Bus work.
    if len(sys.argv) < 2:
        print("Usage: %s <device>" % (sys.argv[0]))
        sys.exit(1)

    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)

    bus = dbus.SessionBus()
    # Kept at module level on purpose: PbapClient calls mainloop.quit().
    mainloop = GObject.MainLoop()

    client = dbus.Interface(bus.get_object(BUS_NAME, PATH),
                            CLIENT_INTERFACE)

    print("Creating Session")
    session_path = client.CreateSession(sys.argv[1], {"Target": "PBAP"})

    pbap_client = PbapClient(session_path)

    def process_result(lines, header):
        """Print *header* (if given) followed by *lines* verbatim."""
        if header is not None:
            print(header)
        for line in lines:
            # Lines come from readlines() and already end in a newline.
            print(line, end='')
        print()

    def test_paths(paths):
        """Exercise the first phonebook in *paths*, then chain to the rest.

        Quits the main loop once *paths* is exhausted.
        """
        if not paths:
            print()
            print("FINISHED")
            mainloop.quit()
            return

        path = paths[0]

        print("\n--- Select Phonebook %s ---\n" % (path))
        pbap_client.interface().Select("int", path)

        print("\n--- GetSize ---\n")
        ret = pbap_client.interface().GetSize()
        print("Size = %d\n" % (ret))

        print("\n--- List vCard ---\n")
        try:
            ret = pbap_client.interface().List(dbus.Dictionary())
        except dbus.DBusException:
            # Listing is best effort; fall through to PullAll with no items.
            ret = []

        params = dbus.Dictionary({"Format": "vcard30",
                                  "Fields": ["PHOTO"]})
        for item in ret:
            print("%s : %s" % (item[0], item[1]))
            pbap_client.pull(item[0], params,
                             lambda x: process_result(x, None))

        pbap_client.pull_all(params, lambda x: process_result(x,
                                                "\n--- PullAll ---\n"))

        # Move on to the next phonebook only after every transfer finished.
        pbap_client.flush_transfers(lambda: test_paths(paths[1:]))

    test_paths(["PB", "ICH", "OCH", "MCH", "CCH"])

    mainloop.run()
|