Skip to content

Commit 54d98b8

Browse files
arkqjlatusek
authored andcommitted
Simple script to proxy messages between D-Bus buses
1 parent 3195025 commit 54d98b8

File tree

1 file changed

+230
-0
lines changed

1 file changed

+230
-0
lines changed

scripts/tools/dbus-proxy-bluez.py

+230
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,230 @@
1+
#!/usr/bin/env python3
2+
#
3+
# Copyright (c) 2024 Project CHIP Authors
4+
#
5+
# Licensed under the Apache License, Version 2.0 (the "License");
6+
# you may not use this file except in compliance with the License.
7+
# You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
17+
18+
import logging
19+
import os.path
20+
from argparse import ArgumentParser
21+
from collections import namedtuple
22+
23+
from gi.repository import Gio, GLib
24+
25+
26+
def bus_get_connection(address: str):
27+
"""Get a connection object for a given D-Bus bus."""
28+
if address == "session":
29+
address = Gio.dbus_address_get_for_bus_sync(Gio.BusType.SESSION)
30+
elif address == "system":
31+
address = Gio.dbus_address_get_for_bus_sync(Gio.BusType.SYSTEM)
32+
logging.info("Connecting to: %s", address)
33+
conn = Gio.DBusConnection.new_for_address_sync(
34+
address,
35+
Gio.DBusConnectionFlags.AUTHENTICATION_CLIENT |
36+
Gio.DBusConnectionFlags.MESSAGE_BUS_CONNECTION)
37+
logging.info("Assigned unique name: %s", conn.get_unique_name())
38+
return conn
39+
40+
41+
def bus_get_name_owner(conn, name: str):
42+
"""Get the unique name of a well known name on a D-Bus bus."""
43+
params = GLib.Variant("(s)", (name,))
44+
reply = conn.call_sync("org.freedesktop.DBus", "/org/freedesktop/DBus",
45+
"org.freedesktop.DBus", "GetNameOwner",
46+
params, None, Gio.DBusCallFlags.NONE, -1)
47+
return reply.get_child_value(0).get_string()
48+
49+
50+
def bus_introspect_path(conn, client: str, path: str):
51+
"""Introspect a D-Bus object path and return its node info."""
52+
reply = conn.call_sync(client, path,
53+
"org.freedesktop.DBus.Introspectable", "Introspect",
54+
None, None, Gio.DBusCallFlags.NONE, -1)
55+
xml = reply.get_child_value(0).get_string()
56+
return Gio.DBusNodeInfo.new_for_xml(xml)
57+
58+
59+
class DBusServiceProxy:
60+
61+
MappingKey = namedtuple("MappingKey", ["path", "iface"])
62+
63+
objects: dict[MappingKey, int] = {}
64+
subscriptions: set[str] = set()
65+
clients = {}
66+
67+
def __init__(self, source: str, proxy: str, service: str):
68+
self.source = bus_get_connection(source)
69+
self.proxy = bus_get_connection(proxy)
70+
self.service = service
71+
Gio.bus_own_name_on_connection(self.proxy, self.service,
72+
Gio.BusNameOwnerFlags.DO_NOT_QUEUE,
73+
self.on_bus_name_acquired,
74+
self.on_bus_name_lost)
75+
76+
def on_bus_name_acquired(self, conn, name):
77+
logging.info("Acquired name on proxy bus: %s", name)
78+
self.mirror_source_on_proxy(self.service, "/")
79+
80+
def on_bus_name_lost(self, conn, name):
81+
logging.debug("Lost name on proxy bus: %s", name)
82+
83+
def proxy_client_save(self, path, client):
84+
self.clients[path] = client
85+
86+
def proxy_client_load(self, path):
87+
return self.clients[path]
88+
89+
def register_object(self, conn, path, iface):
90+
key = DBusServiceProxy.MappingKey(path, iface.name)
91+
if key not in self.objects:
92+
logging.debug("Registering: %s { %s }", path, iface.name)
93+
id = conn.register_object(path, iface, self.on_method_call)
94+
self.objects[key] = id
95+
96+
def unregister_object(self, conn, path, iface_name):
97+
key = DBusServiceProxy.MappingKey(path, iface_name)
98+
if key in self.objects:
99+
logging.debug("Removing: %s { %s }", path, iface_name)
100+
conn.unregister_object(self.objects.pop(key))
101+
102+
def signal_subscribe(self, conn, client):
103+
"""Subscribe for signals from a D-Bus client."""
104+
if client not in self.subscriptions:
105+
conn.signal_subscribe(client, None, None, None, None,
106+
Gio.DBusSignalFlags.NONE,
107+
self.on_signal_received)
108+
self.subscriptions.add(client)
109+
110+
def mirror_path(self, conn_src, conn_dest, client, path, save=False):
111+
"""Mirror all interfaces and nodes of a D-Bus client object path.
112+
113+
Parameters:
114+
conn_src -- source D-Bus connection
115+
conn_dest -- proxy D-Bus connection
116+
client -- name of the client on the source bus
117+
path -- object path to mirror recursively
118+
save -- save the client name for the path
119+
120+
"""
121+
info = bus_introspect_path(conn_src, client, path)
122+
for iface in info.interfaces:
123+
if save:
124+
self.proxy_client_save(path, client)
125+
self.register_object(conn_dest, path, iface)
126+
for node in info.nodes:
127+
self.mirror_path(conn_src, conn_dest, client,
128+
os.path.join(path, node.path), save)
129+
130+
def mirror_source_on_proxy(self, client, path):
131+
"""Mirror source bus objects on the proxy bus."""
132+
self.signal_subscribe(self.source, client)
133+
self.mirror_path(self.source, self.proxy, client, path)
134+
135+
def mirror_proxy_on_source(self, client, path):
136+
"""Mirror proxy bus objects on the source bus."""
137+
self.signal_subscribe(self.proxy, client)
138+
self.mirror_path(self.proxy, self.source, client, path, True)
139+
140+
def on_method_call(self, conn, sender, *args, **kwargs):
141+
if conn == self.source:
142+
return self.on_method_call_from_source(sender, *args, **kwargs)
143+
return self.on_method_call_from_proxy(sender, *args, **kwargs)
144+
145+
def on_signal_received(self, conn, sender, *args, **kwargs):
146+
if conn == self.source:
147+
return self.on_signal_from_source(sender, *args, **kwargs)
148+
return self.on_signal_from_proxy(sender, *args, **kwargs)
149+
150+
def on_method_call_from_source(self, sender, path, iface, method,
151+
params, invocation):
152+
logging.debug("Call from source: %s %s.%s()", path, iface, method)
153+
self.proxy.call(self.proxy_client_load(path), path, iface, method,
154+
params, None, Gio.DBusCallFlags.NONE, -1, None,
155+
self.on_method_return, invocation)
156+
157+
def on_method_call_from_proxy(self, sender, path, iface, method,
158+
params, invocation):
159+
logging.debug("Call from proxy: %s %s.%s()", path, iface, method)
160+
self.source.call(self.service, path, iface, method,
161+
params, None, Gio.DBusCallFlags.NONE, -1, None,
162+
self.on_method_return, invocation)
163+
164+
def on_method_return(self, conn, result, invocation):
165+
try:
166+
logging.debug("Finishing call: %s %s.%s()",
167+
invocation.get_object_path(),
168+
invocation.get_interface_name(),
169+
invocation.get_method_name())
170+
reply = conn.call_with_unix_fd_list_finish(result)
171+
invocation.return_value_with_unix_fd_list(
172+
reply[0], reply.out_fd_list)
173+
except GLib.Error as e:
174+
invocation.return_gerror(e)
175+
176+
def on_signal_from_source(self, sender, path, iface, signal, params):
177+
logging.debug("Signal from source: %s %s.%s", path, iface, signal)
178+
if iface == "org.freedesktop.DBus.ObjectManager":
179+
if signal == "InterfacesAdded":
180+
dest_path = params.get_child_value(0).get_string()
181+
self.mirror_source_on_proxy(self.service, dest_path)
182+
if signal == "InterfacesRemoved":
183+
dest_path = params.get_child_value(0).get_string()
184+
for dest_iface in params.get_child_value(1).get_strv():
185+
self.unregister_object(self.proxy, dest_path, dest_iface)
186+
self.proxy.emit_signal(None, path, iface, signal, params)
187+
188+
def on_signal_from_proxy(self, sender, path, iface, signal, params):
189+
logging.debug("Signal from proxy: %s %s.%s", path, iface, signal)
190+
self.source.emit_signal(None, path, iface, signal, params)
191+
192+
193+
class BluezProxy(DBusServiceProxy):
194+
195+
def on_method_call_from_proxy(self, sender, path, iface, method,
196+
params, invocation):
197+
198+
if (iface == "org.bluez.GattManager1" and
199+
method == "RegisterApplication"):
200+
app_path = params.get_child_value(0).get_string()
201+
logging.info("Mirroring GATT application: %s %s", sender, app_path)
202+
self.mirror_proxy_on_source(sender, app_path)
203+
204+
if iface == "org.bluez.LEAdvertisingManager1":
205+
if method == "RegisterAdvertisement":
206+
app_path = params.get_child_value(0).get_string()
207+
logging.info("Mirroring advertiser: %s %s", sender, app_path)
208+
self.mirror_proxy_on_source(sender, app_path)
209+
210+
super().on_method_call_from_proxy(sender, path, iface, method,
211+
params, invocation)
212+
213+
214+
parser = ArgumentParser(description="BlueZ D-Bus proxy")
215+
parser.add_argument(
216+
"-v", "--verbose", action="store_true",
217+
help="enable debug output")
218+
parser.add_argument(
219+
"--bus-source", metavar="ADDRESS", default="system",
220+
help="""address of the source D-Bus bus; it can be a bus address string or
221+
'session' or 'system' keywords; default is '%(default)s'""")
222+
parser.add_argument(
223+
"--bus-proxy", metavar="ADDRESS", required=True,
224+
help="""address of the proxy D-Bus bus""")
225+
226+
args = parser.parse_args()
227+
logging.basicConfig(level=logging.DEBUG if args.verbose else logging.INFO)
228+
229+
BluezProxy(args.bus_source, args.bus_proxy, "org.bluez")
230+
GLib.MainLoop().run()

0 commit comments

Comments
 (0)