diff options
Diffstat (limited to 'DBus.py')
-rw-r--r-- | DBus.py | 66 |
1 files changed, 66 insertions, 0 deletions
@@ -0,0 +1,66 @@ +from gi.repository import GLib, Gio + +from ctypes import CDLL, POINTER, Structure, CFUNCTYPE, pointer +from ctypes import c_void_p, c_char_p, c_bool +from ctypes.util import find_library + +from functools import partial + + +dbus_xml = ''' +<node> + <interface name="net.universe_factory.Pylock"> + <method name="Lock"/> + </interface> +</node> +''' + +gio = CDLL(find_library('gio-2.0')) + +nodeInfo = Gio.DBusNodeInfo.new_for_xml(dbus_xml) +interfaceInfo = nodeInfo.lookup_interface('net.universe_factory.Pylock') + +METHOD_CALL_FUNC = CFUNCTYPE(None, c_void_p, c_char_p, c_char_p, c_char_p, c_char_p, c_void_p, c_void_p, c_void_p) +GET_PROPERTY_FUNC = CFUNCTYPE(c_void_p, c_void_p, c_char_p, c_char_p, c_char_p, c_char_p, c_void_p, c_void_p) +SET_PROPERTY_FUNC = CFUNCTYPE(c_bool, c_void_p, c_char_p, c_char_p, c_char_p, c_char_p, c_void_p, c_void_p, c_void_p) + +def method_call(locker, connection, sender, object_path, interface_name, method_name, parameters, invocation, user_data): + assert str(method_name, 'ascii') == 'Lock' + + locker.lock() + gio.g_dbus_method_invocation_return_value(invocation, hash(GLib.Variant.new_tuple())) + +@GET_PROPERTY_FUNC +def get_property(connection, sender, object_path, interface_name, property_name, error, user_data): + assert False + +@SET_PROPERTY_FUNC +def set_property(connection, sender, object_path, interface_name, property_name, value, error, user_data): + assert False + +class VTable(Structure): + _fields_ = [ + ("method_call", METHOD_CALL_FUNC), + ("get_property", GET_PROPERTY_FUNC), + ("set_property", SET_PROPERTY_FUNC) + ] + + def __init__(self, locker): + Structure.__init__(self) + self.method_call = METHOD_CALL_FUNC(partial(method_call, locker)) + self.get_property = get_property + self.set_property = set_property + +gio.g_dbus_connection_register_object.argtypes = [c_void_p, c_char_p, c_void_p, POINTER(VTable), c_void_p, c_void_p, c_void_p] +gio.g_dbus_method_invocation_return_value.argtypes = [c_void_p, c_void_p] + + +class DBus(object): + def __init__(self, locker): + self.vtable = VTable(locker) + + Gio.bus_own_name(Gio.BusType.SESSION, 'net.universe_factory.Pylock', Gio.BusNameOwnerFlags.NONE, self.bus_aquired, None, None) + + def bus_aquired(self, connection, name): + b = lambda s: bytes(s, 'ascii') + gio.g_dbus_connection_register_object(c_void_p(hash(connection)), b('/net/universe_factory/Pylock'), c_void_p(hash(interfaceInfo)), pointer(self.vtable), None, None, None) |