|
@@ -20,6 +20,7 @@ except ImportError:
|
|
|
import hostapd
|
|
|
from wpasupplicant import WpaSupplicant
|
|
|
from utils import HwsimSkip, alloc_fail, fail_test
|
|
|
+from p2p_utils import *
|
|
|
from test_ap_tdls import connect_2sta_open
|
|
|
|
|
|
WPAS_DBUS_SERVICE = "fi.w1.wpa_supplicant1"
|
|
@@ -3643,6 +3644,55 @@ def test_dbus_p2p_join(dev, apdev):
|
|
|
|
|
|
dev[2].p2p_stop_find()
|
|
|
|
|
|
+def test_dbus_p2p_invitation_received(dev, apdev):
|
|
|
+ """D-Bus P2P and InvitationReceived"""
|
|
|
+ (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
|
|
|
+ p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
|
|
|
+
|
|
|
+ form(dev[0], dev[1])
|
|
|
+ addr0 = dev[0].p2p_dev_addr()
|
|
|
+ dev[0].p2p_listen()
|
|
|
+ dev[0].global_request("SET persistent_reconnect 0")
|
|
|
+
|
|
|
+ if not dev[1].discover_peer(addr0, social=True):
|
|
|
+ raise Exception("Peer " + addr0 + " not found")
|
|
|
+ peer = dev[1].get_peer(addr0)
|
|
|
+
|
|
|
+ class TestDbusP2p(TestDbus):
|
|
|
+ def __init__(self, bus):
|
|
|
+ TestDbus.__init__(self, bus)
|
|
|
+ self.done = False
|
|
|
+
|
|
|
+ def __enter__(self):
|
|
|
+ gobject.timeout_add(1, self.run_test)
|
|
|
+ gobject.timeout_add(15000, self.timeout)
|
|
|
+ self.add_signal(self.invitationReceived, WPAS_DBUS_IFACE_P2PDEVICE,
|
|
|
+ "InvitationReceived")
|
|
|
+ self.loop.run()
|
|
|
+ return self
|
|
|
+
|
|
|
+ def invitationReceived(self, result):
|
|
|
+ logger.debug("invitationReceived: " + str(result))
|
|
|
+ self.done = True
|
|
|
+ self.loop.quit()
|
|
|
+
|
|
|
+ def run_test(self, *args):
|
|
|
+ logger.debug("run_test")
|
|
|
+ dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
|
|
|
+ cmd = "P2P_INVITE persistent=" + peer['persistent'] + " peer=" + addr0
|
|
|
+ dev1.global_request(cmd)
|
|
|
+ return False
|
|
|
+
|
|
|
+ def success(self):
|
|
|
+ return self.done
|
|
|
+
|
|
|
+ with TestDbusP2p(bus) as t:
|
|
|
+ if not t.success():
|
|
|
+ raise Exception("Expected signals not seen")
|
|
|
+
|
|
|
+ dev[0].p2p_stop_find()
|
|
|
+ dev[1].p2p_stop_find()
|
|
|
+
|
|
|
def test_dbus_p2p_config(dev, apdev):
|
|
|
"""D-Bus Get/Set P2PDeviceConfig"""
|
|
|
try:
|