M7350/external/bluetooth/bluez/test/map-client
2024-09-09 08:57:42 +00:00

233 lines
6.8 KiB
Python
Executable File

#!/usr/bin/python
from __future__ import absolute_import, print_function, unicode_literals
from optparse import OptionParser
import os
from pprint import pformat
import sys
import dbus
import dbus.mainloop.glib
try:
from gi.repository import GObject
except ImportError:
import gobject as GObject
BUS_NAME='org.bluez.obex'
PATH = '/org/bluez/obex'
CLIENT_INTERFACE = 'org.bluez.obex.Client1'
SESSION_INTERFACE = 'org.bluez.obex.Session1'
MESSAGE_ACCESS_INTERFACE = 'org.bluez.obex.MessageAccess1'
MESSAGE_INTERFACE = 'org.bluez.obex.Message1'
TRANSFER_INTERFACE = 'org.bluez.obex.Transfer1'
def unwrap(x):
"""Hack to unwrap D-Bus values, so that they're easier to read when
printed. Taken from d-feet """
if isinstance(x, list):
return map(unwrap, x)
if isinstance(x, tuple):
return tuple(map(unwrap, x))
if isinstance(x, dict):
return dict([(unwrap(k), unwrap(v)) for k, v in x.iteritems()])
for t in [unicode, str, long, int, float, bool]:
if isinstance(x, t):
return t(x)
return x
def parse_options():
parser.add_option("-d", "--device", dest="device",
help="Device to connect", metavar="DEVICE")
parser.add_option("-c", "--chdir", dest="new_dir",
help="Change current directory to DIR", metavar="DIR")
parser.add_option("-l", "--lsdir", action="store_true", dest="ls_dir",
help="List folders in current directory")
parser.add_option("-v", "--verbose", action="store_true", dest="verbose")
parser.add_option("-L", "--lsmsg", action="store", dest="ls_msg",
help="List messages in supplied CWD subdir")
parser.add_option("-g", "--get", action="store", dest="get_msg",
help="Get message contents")
parser.add_option("-p", "--push", action="store", dest="push_msg",
help="Push message")
parser.add_option("--get-properties", action="store", dest="get_msg_properties",
help="Get message properties")
parser.add_option("--mark-read", action="store", dest="mark_msg_read",
help="Marks the messages as read")
parser.add_option("--mark-unread", action="store", dest="mark_msg_unread",
help="Marks the messages as unread")
parser.add_option("--mark-deleted", action="store", dest="mark_msg_deleted",
help="Deletes the message from the folder")
parser.add_option("--mark-undeleted", action="store", dest="mark_msg_undeleted",
help="Undeletes the message")
parser.add_option("-u", "--update-inbox", action="store_true", dest="update_inbox",
help="Checks for new mails")
return parser.parse_args()
def set_folder(session, new_dir):
session.SetFolder(new_dir)
class MapClient:
def __init__(self, session_path, verbose=False):
self.progress = 0
self.transfer_path = None
self.props = dict()
self.verbose = verbose
self.path = session_path
bus = dbus.SessionBus()
obj = bus.get_object(BUS_NAME, session_path)
self.session = dbus.Interface(obj, SESSION_INTERFACE)
self.map = dbus.Interface(obj, MESSAGE_ACCESS_INTERFACE)
bus.add_signal_receiver(self.properties_changed,
dbus_interface="org.freedesktop.DBus.Properties",
signal_name="PropertiesChanged",
path_keyword="path")
def create_transfer_reply(self, path, properties):
self.transfer_path = path
self.props[path] = properties
if self.verbose:
print("Transfer created: %s (file %s)" % (path,
properties["Filename"]))
def generic_reply(self):
if self.verbose:
print("Operation succeeded")
def error(self, err):
print(err)
mainloop.quit()
def transfer_complete(self, path):
if self.verbose:
print("Transfer finished")
properties = self.props.get(path)
if properties == None:
return
f = open(properties["Filename"], "r")
os.remove(properties["Filename"])
print(f.readlines())
def transfer_error(self, path):
print("Transfer %s error" % path)
mainloop.quit()
def properties_changed(self, interface, properties, invalidated, path):
req = self.props.get(path)
if req == None:
return
if properties['Status'] == 'complete':
self.transfer_complete(path)
return
if properties['Status'] == 'error':
self.transfer_error(path)
return
def set_folder(self, new_dir):
self.map.SetFolder(new_dir)
def list_folders(self):
for i in self.map.ListFolders(dict()):
print("%s/" % (i["Name"]))
def list_messages(self, folder):
ret = self.map.ListMessages(folder, dict())
print(pformat(unwrap(ret)))
def get_message(self, handle):
self.map.ListMessages("", dict())
path = self.path + "/message" + handle
obj = bus.get_object(BUS_NAME, path)
msg = dbus.Interface(obj, MESSAGE_INTERFACE)
msg.Get("", True, reply_handler=self.create_transfer_reply,
error_handler=self.error)
def push_message(self, filename):
self.map.PushMessage(filename, "telecom/msg/outbox", dict(),
reply_handler=self.create_transfer_reply,
error_handler=self.error)
def get_message_properties(self, handle):
self.map.ListMessages("", dict())
path = self.path + "/message" + handle
obj = bus.get_object(BUS_NAME, path)
msg = dbus.Interface(obj, "org.freedesktop.DBus.Properties")
ret = msg.GetAll(MESSAGE_INTERFACE)
print(pformat(unwrap(ret)))
def set_message_property(self, handle, prop, flag):
self.map.ListMessages("", dict())
path = self.path + "/message" + handle
obj = bus.get_object(BUS_NAME, path)
msg = dbus.Interface(obj, MESSAGE_INTERFACE)
msg.SetProperty (prop, flag);
def update_inbox(self):
self.map.UpdateInbox()
if __name__ == '__main__':
dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
parser = OptionParser()
(options, args) = parse_options()
if not options.device:
parser.print_help()
exit(0)
bus = dbus.SessionBus()
mainloop = GObject.MainLoop()
client = dbus.Interface(bus.get_object(BUS_NAME, PATH),
CLIENT_INTERFACE)
print("Creating Session")
path = client.CreateSession(options.device, { "Target": "map" })
map_client = MapClient(path, options.verbose)
if options.new_dir:
map_client.set_folder(options.new_dir)
if options.ls_dir:
map_client.list_folders()
if options.ls_msg is not None:
map_client.list_messages(options.ls_msg)
if options.get_msg is not None:
map_client.get_message(options.get_msg)
if options.push_msg is not None:
map_client.push_message(options.push_msg)
if options.get_msg_properties is not None:
map_client.get_message_properties(options.get_msg_properties)
if options.mark_msg_read is not None:
map_client.set_message_property(options.mark_msg_read, "Read", True)
if options.mark_msg_unread is not None:
map_client.set_message_property(options.mark_msg_unread, "Read", False)
if options.mark_msg_deleted is not None:
map_client.set_message_property(options.mark_msg_deleted, "Deleted", True)
if options.mark_msg_undeleted is not None:
map_client.set_message_property(options.mark_msg_undeleted, "Deleted", False)
if options.update_inbox:
map_client.update_inbox()
mainloop.run()