|
@@ -0,0 +1,299 @@
|
|
|
+#!/usr/bin/python
|
|
|
+# Tests p2p_connect
|
|
|
+# Will try to connect to another peer
|
|
|
+# and form a group
|
|
|
+######### MAY NEED TO RUN AS SUDO #############
|
|
|
+
|
|
|
+import dbus
|
|
|
+import sys, os
|
|
|
+import time
|
|
|
+import gobject
|
|
|
+import getopt
|
|
|
+from dbus.mainloop.glib import DBusGMainLoop
|
|
|
+
|
|
|
+
|
|
|
+def usage():
|
|
|
+ print "Usage:"
|
|
|
+ print " %s -i <interface_name> -m <wps_method> \ " \
|
|
|
+ % sys.argv[0]
|
|
|
+ print " -a <addr> [-p <pin>] [-g <go_intent>] \ "
|
|
|
+ print " [-w <wpas_dbus_interface>]"
|
|
|
+ print "Options:"
|
|
|
+ print " -i = interface name"
|
|
|
+ print " -m = wps method"
|
|
|
+ print " -a = peer address"
|
|
|
+ print " -p = pin number (8 digits)"
|
|
|
+ print " -g = group owner intent"
|
|
|
+ print " -w = wpas dbus interface = fi.w1.wpa_supplicant1"
|
|
|
+ print "Example:"
|
|
|
+ print " %s -i wlan0 -a 0015008352c0 -m display -p 12345670" % sys.argv[0]
|
|
|
+
|
|
|
+
|
|
|
+# Required Signals
|
|
|
+def GONegotiationSuccess(status):
|
|
|
+ print "Go Negotiation Success"
|
|
|
+
|
|
|
+def GONegotiationFailure(status):
|
|
|
+ print 'Go Negotiation Failed. Status:'
|
|
|
+ print format(status)
|
|
|
+ os._exit(0)
|
|
|
+
|
|
|
+def GroupStarted(properties):
|
|
|
+ if properties.has_key("group_object"):
|
|
|
+ print 'Group Formation Complete %s' \
|
|
|
+ % properties["group_object"]
|
|
|
+ os._exit(0)
|
|
|
+
|
|
|
+def WpsFailure(status, etc):
|
|
|
+ print "WPS Authentication Failure".format(status)
|
|
|
+ print etc
|
|
|
+ os._exit(0)
|
|
|
+
|
|
|
+class P2P_Connect():
|
|
|
+ # Needed Variables
|
|
|
+ global bus
|
|
|
+ global wpas_object
|
|
|
+ global interface_object
|
|
|
+ global p2p_interface
|
|
|
+ global ifname
|
|
|
+ global wpas
|
|
|
+ global wpas_dbus_interface
|
|
|
+ global timeout
|
|
|
+ global path
|
|
|
+ global wps_method
|
|
|
+ global go_intent
|
|
|
+ global addr
|
|
|
+ global pin
|
|
|
+
|
|
|
+ # Dbus Paths
|
|
|
+ global wpas_dbus_opath
|
|
|
+ global wpas_dbus_interfaces_opath
|
|
|
+ global wpas_dbus_interfaces_interface
|
|
|
+ global wpas_dbus_interfaces_p2pdevice
|
|
|
+
|
|
|
+ # Dictionary of Arguements
|
|
|
+ global p2p_connect_arguements
|
|
|
+
|
|
|
+ # Constructor
|
|
|
+ def __init__(self,ifname,wpas_dbus_interface,addr,
|
|
|
+ pin,wps_method,go_intent):
|
|
|
+ # Initializes variables and threads
|
|
|
+ self.ifname = ifname
|
|
|
+ self.wpas_dbus_interface = wpas_dbus_interface
|
|
|
+ self.wps_method = wps_method
|
|
|
+ self.go_intent = go_intent
|
|
|
+ self.addr = addr
|
|
|
+ self.pin = pin
|
|
|
+
|
|
|
+ # Generating interface/object paths
|
|
|
+ self.wpas_dbus_opath = \
|
|
|
+ "/" + self.wpas_dbus_interface.replace(".","/")
|
|
|
+ self.wpas_wpas_dbus_interfaces_opath = \
|
|
|
+ self.wpas_dbus_opath + "/Interfaces"
|
|
|
+ self.wpas_dbus_interfaces_interface = \
|
|
|
+ self.wpas_dbus_interface + ".Interface"
|
|
|
+ self.wpas_dbus_interfaces_p2pdevice = \
|
|
|
+ self.wpas_dbus_interfaces_interface + ".P2PDevice"
|
|
|
+
|
|
|
+ # Getting interfaces and objects
|
|
|
+ DBusGMainLoop(set_as_default=True)
|
|
|
+ self.bus = dbus.SystemBus()
|
|
|
+ self.wpas_object = self.bus.get_object(
|
|
|
+ self.wpas_dbus_interface,
|
|
|
+ self.wpas_dbus_opath)
|
|
|
+ self.wpas = dbus.Interface(
|
|
|
+ self.wpas_object, self.wpas_dbus_interface)
|
|
|
+
|
|
|
+ # See if wpa_supplicant already knows about this interface
|
|
|
+ self.path = None
|
|
|
+ try:
|
|
|
+ self.path = self.wpas.GetInterface(ifname)
|
|
|
+ except:
|
|
|
+ if not str(exc).startswith(
|
|
|
+ self.wpas_dbus_interface + \
|
|
|
+ ".InterfaceUnknown:"):
|
|
|
+ raise exc
|
|
|
+ try:
|
|
|
+ path = self.wpas.CreateInterface(
|
|
|
+ {'Ifname': ifname, 'Driver': 'test'})
|
|
|
+ time.sleep(1)
|
|
|
+
|
|
|
+ except dbus.DBusException, exc:
|
|
|
+ if not str(exc).startswith(
|
|
|
+ self.wpas_dbus_interface + \
|
|
|
+ ".InterfaceExists:"):
|
|
|
+ raise exc
|
|
|
+
|
|
|
+ # Get Interface and objects
|
|
|
+ self.interface_object = self.bus.get_object(
|
|
|
+ self.wpas_dbus_interface,self.path)
|
|
|
+ self.p2p_interface = dbus.Interface(
|
|
|
+ self.interface_object,
|
|
|
+ self.wpas_dbus_interfaces_p2pdevice)
|
|
|
+
|
|
|
+ # Add signals
|
|
|
+ self.bus.add_signal_receiver(GONegotiationSuccess,
|
|
|
+ dbus_interface=self.wpas_dbus_interfaces_p2pdevice,
|
|
|
+ signal_name="GONegotiationSuccess")
|
|
|
+ self.bus.add_signal_receiver(GONegotiationFailure,
|
|
|
+ dbus_interface=self.wpas_dbus_interfaces_p2pdevice,
|
|
|
+ signal_name="GONegotiationFailure")
|
|
|
+ self.bus.add_signal_receiver(GroupStarted,
|
|
|
+ dbus_interface=self.wpas_dbus_interfaces_p2pdevice,
|
|
|
+ signal_name="GroupStarted")
|
|
|
+ self.bus.add_signal_receiver(WpsFailure,
|
|
|
+ dbus_interface=self.wpas_dbus_interfaces_p2pdevice,
|
|
|
+ signal_name="WpsFailed")
|
|
|
+
|
|
|
+
|
|
|
+ #Constructing all the arguements needed to connect
|
|
|
+ def constructArguements(self):
|
|
|
+ # Adding required arguements
|
|
|
+ self.p2p_connect_arguements = {'wps_method':self.wps_method,
|
|
|
+ 'peer':dbus.ObjectPath(self.path+'/Peers/'+self.addr)}
|
|
|
+
|
|
|
+ # Display requires a pin, and a go intent of 15
|
|
|
+ if (self.wps_method == 'display'):
|
|
|
+ if (self.pin != None):
|
|
|
+ self.p2p_connect_arguements.update({'pin':self.pin})
|
|
|
+ else:
|
|
|
+ print "Error:\n Pin required for wps_method=display"
|
|
|
+ usage()
|
|
|
+ quit()
|
|
|
+
|
|
|
+ if (self.go_intent != None and int(self.go_intent) != 15):
|
|
|
+ print "go_intent overwritten to 15"
|
|
|
+
|
|
|
+ self.go_intent = '15'
|
|
|
+
|
|
|
+ # Keypad requires a pin, and a go intent of less than 15
|
|
|
+ elif (self.wps_method == 'keypad'):
|
|
|
+ if (self.pin != None):
|
|
|
+ self.p2p_connect_arguements.update({'pin':self.pin})
|
|
|
+ else:
|
|
|
+ print "Error:\n Pin required for wps_method=keypad"
|
|
|
+ usage()
|
|
|
+ quit()
|
|
|
+
|
|
|
+ if (self.go_intent != None and int(self.go_intent) == 15):
|
|
|
+ error = "Error :\n Group Owner intent cannot be" + \
|
|
|
+ " 15 for wps_method=keypad"
|
|
|
+ print error
|
|
|
+ usage()
|
|
|
+ quit()
|
|
|
+
|
|
|
+ # Doesn't require pin
|
|
|
+ # for ./wpa_cli, p2p_connect [mac] [pin#], wps_method=keypad
|
|
|
+ elif (self.wps_method == 'pin'):
|
|
|
+ if (self.pin != None):
|
|
|
+ print "pin ignored"
|
|
|
+
|
|
|
+ # No pin is required for pbc so it is ignored
|
|
|
+ elif (self.wps_method == 'pbc'):
|
|
|
+ if (self.pin != None):
|
|
|
+ print "pin ignored"
|
|
|
+
|
|
|
+ else:
|
|
|
+ print "Error:\n wps_method not supported or does not exist"
|
|
|
+ usage()
|
|
|
+ quit()
|
|
|
+
|
|
|
+ # Go_intent is optional for all arguements
|
|
|
+ if (self.go_intent != None):
|
|
|
+ self.p2p_connect_arguements.update(
|
|
|
+ {'go_intent':dbus.Int32(self.go_intent)})
|
|
|
+
|
|
|
+ # Running p2p_connect
|
|
|
+ def run(self):
|
|
|
+ try:
|
|
|
+ result_pin = self.p2p_interface.Connect(
|
|
|
+ self.p2p_connect_arguements)
|
|
|
+
|
|
|
+ except dbus.DBusException, exc:
|
|
|
+ raise exc
|
|
|
+
|
|
|
+ if (self.wps_method == 'pin' and \
|
|
|
+ not self.p2p_connect_arguements.has_key('pin') ):
|
|
|
+ print "Connect return with pin value of %d " % int(result_pin)
|
|
|
+ gobject.MainLoop().run()
|
|
|
+
|
|
|
+if __name__ == "__main__":
|
|
|
+
|
|
|
+ # Required
|
|
|
+ interface_name = None
|
|
|
+ wps_method = None
|
|
|
+ addr = None
|
|
|
+
|
|
|
+ # Conditionally optional
|
|
|
+ pin = None
|
|
|
+
|
|
|
+ # Optional
|
|
|
+ wpas_dbus_interface = 'fi.w1.wpa_supplicant1'
|
|
|
+ go_intent = None
|
|
|
+
|
|
|
+ # Using getopts to handle options
|
|
|
+ try:
|
|
|
+ options, args = getopt.getopt(sys.argv[1:],"hi:m:a:p:g:w:")
|
|
|
+
|
|
|
+ except getopt.GetoptError:
|
|
|
+ usage()
|
|
|
+ quit()
|
|
|
+
|
|
|
+ # If theres a switch, override default option
|
|
|
+ for key, value in options:
|
|
|
+ # Help
|
|
|
+ if (key == "-h"):
|
|
|
+ usage()
|
|
|
+ quit()
|
|
|
+ # Interface Name
|
|
|
+ elif (key == "-i"):
|
|
|
+ interface_name = value
|
|
|
+ # WPS Method
|
|
|
+ elif (key == "-m"):
|
|
|
+ wps_method = value
|
|
|
+ # Address
|
|
|
+ elif (key == "-a"):
|
|
|
+ addr = value
|
|
|
+ # Pin
|
|
|
+ elif (key == "-p"):
|
|
|
+ pin = value
|
|
|
+ # Group Owner Intent
|
|
|
+ elif (key == "-g"):
|
|
|
+ go_intent = value
|
|
|
+ # Dbus interface
|
|
|
+ elif (key == "-w"):
|
|
|
+ wpas_dbus_interface = value
|
|
|
+ else:
|
|
|
+ assert False, "unhandled option"
|
|
|
+
|
|
|
+ # Required Arguements check
|
|
|
+ if (interface_name == None or wps_method == None or addr == None):
|
|
|
+ print "Error:\n Required arguements not specified"
|
|
|
+ usage()
|
|
|
+ quit()
|
|
|
+
|
|
|
+ # Group Owner Intent Check
|
|
|
+ if (go_intent != None and (int(go_intent) > 15 or int(go_intent) < 0) ):
|
|
|
+ print "Error:\n Group Owner Intent must be between 0 and 15 inclusive"
|
|
|
+ usage()
|
|
|
+ quit()
|
|
|
+
|
|
|
+ # Pin Check
|
|
|
+ if (pin != None and len(pin) != 8):
|
|
|
+ print "Error:\n Pin is not 8 digits"
|
|
|
+ usage()
|
|
|
+ quit()
|
|
|
+
|
|
|
+ try:
|
|
|
+ p2p_connect_test = P2P_Connect(interface_name,wpas_dbus_interface,
|
|
|
+ addr,pin,wps_method,go_intent)
|
|
|
+
|
|
|
+ except:
|
|
|
+ print "Error:\n Invalid Arguements"
|
|
|
+ usage()
|
|
|
+ quit()
|
|
|
+
|
|
|
+ p2p_connect_test.constructArguements()
|
|
|
+ p2p_connect_test.run()
|
|
|
+
|
|
|
+ os._exit(0)
|