123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299 |
- #!/usr/bin/python
- # Tests p2p_connect
- # Will try to connect to another peer
- # and form a group
- ######### MAY NEED TO RUN AS SUDO #############
- import dbus
- import sys, os
- import time
- import gobject
- import getopt
- from dbus.mainloop.glib import DBusGMainLoop
def usage():
    """Print the command-line help text for this script to standard output.

    The program name shown in the synopsis and in the example line is
    taken from ``sys.argv[0]``.
    """
    # A single parenthesised argument makes each print valid both as a
    # Python 2 print statement and as a Python 3 print() call.
    # "\\ " spells out the literal backslash + space that ends the first two
    # synopsis lines; the bare "\ " form is an invalid escape sequence.
    print("Usage:")
    print(" %s -i <interface_name> -m <wps_method> \\ " % sys.argv[0])
    print(" -a <addr> [-p <pin>] [-g <go_intent>] \\ ")
    print(" [-w <wpas_dbus_interface>]")
    print("Options:")
    print(" -i = interface name")
    print(" -m = wps method")
    print(" -a = peer address")
    print(" -p = pin number (8 digits)")
    print(" -g = group owner intent")
    print(" -w = wpas dbus interface = fi.w1.wpa_supplicant1")
    print("Example:")
    print(" %s -i wlan0 -a 0015008352c0 -m display -p 12345670"
          % sys.argv[0])
- # Required Signals
def GONegotiationSuccess(status):
    """Report a successful group-owner negotiation.

    Intended as the receiver for the ``GONegotiationSuccess`` D-Bus signal.
    Only a message is printed; the process keeps running.

    status: payload delivered with the signal; it is not used.
    """
    # Parenthesised form works as a Python 2 statement and a Python 3 call.
    print("Go Negotiation Success")
def GONegotiationFailure(status):
    """Report a failed group-owner negotiation and terminate the process.

    Intended as the receiver for the ``GONegotiationFailure`` D-Bus signal.

    status: payload delivered with the signal; printed on its own line.
    """
    # Parenthesised single-argument prints are valid on Python 2 and 3.
    print('Go Negotiation Failed. Status:')
    print(format(status))
    # os._exit ends the process immediately, without unwinding the stack
    # of whatever is currently dispatching this callback.
    os._exit(0)
def GroupStarted(properties):
    """Report that the P2P group has started and terminate the process.

    Intended as the receiver for the ``GroupStarted`` D-Bus signal.

    properties: mapping delivered with the signal. When it contains a
        "group_object" entry, that value is included in the message.
    """
    # The `in` operator replaces dict.has_key(), which no longer exists on
    # Python 3 and behaves identically on Python 2.
    if "group_object" in properties:
        print('Group Formation Complete %s' % properties["group_object"])
    # The signal marks the end of the run whether or not the object path
    # was supplied, so exit unconditionally.
    os._exit(0)
def WpsFailure(status, etc):
    """Report a WPS failure and terminate the process.

    Intended as the receiver for the ``WpsFailed`` D-Bus signal.

    status: first signal argument; included in the failure message.
    etc: second signal argument; printed verbatim on its own line.
    """
    # The original format string had no placeholder, so `status` was
    # silently discarded; "{0}" makes it part of the message.
    print("WPS Authentication Failure: {0}".format(status))
    print(etc)
    # Leave immediately from inside the signal callback.
    os._exit(0)
class P2P_Connect(object):
    """Drive one P2P connect attempt through the wpa_supplicant D-Bus API.

    Typical use is: construct, call constructArguements(), then run().
    The module-level functions GONegotiationSuccess, GONegotiationFailure,
    GroupStarted and WpsFailure are registered as signal receivers, and
    usage() is called when the supplied options are inconsistent.

    Instance attributes set by __init__:
        ifname, wpas_dbus_interface, wps_method, go_intent, addr, pin
            -- the constructor arguments, stored unchanged.
        wpas_dbus_opath, wpas_dbus_interfaces_opath,
        wpas_dbus_interfaces_interface, wpas_dbus_interfaces_p2pdevice
            -- object path / interface names derived from
               wpas_dbus_interface.
        bus, wpas_object, wpas, path, interface_object, p2p_interface
            -- D-Bus handles for the supplicant and the chosen interface.
    constructArguements() additionally sets p2p_connect_arguements.
    """

    def __init__(self, ifname, wpas_dbus_interface, addr,
                 pin, wps_method, go_intent):
        """Connect to the system bus and resolve the network interface.

        ifname: network interface name handed to GetInterface /
            CreateInterface.
        wpas_dbus_interface: D-Bus name of the supplicant, e.g.
            'fi.w1.wpa_supplicant1'; the object path is derived from it.
        addr: peer address, appended to '<interface path>/Peers/'.
        pin: WPS pin or None.
        wps_method: one of 'display', 'keypad', 'pin', 'pbc'.
        go_intent: group owner intent (string or None).

        Raises dbus.DBusException when the bus calls fail for any reason
        other than the interface being unknown / already existing.
        """
        self.ifname = ifname
        self.wpas_dbus_interface = wpas_dbus_interface
        self.wps_method = wps_method
        self.go_intent = go_intent
        self.addr = addr
        self.pin = pin

        # Object path and interface names derived from the bus name.
        self.wpas_dbus_opath = \
            "/" + self.wpas_dbus_interface.replace(".", "/")
        self.wpas_dbus_interfaces_opath = \
            self.wpas_dbus_opath + "/Interfaces"
        # Historical misspelled attribute name, kept as an alias so any
        # external reader of it keeps working.
        self.wpas_wpas_dbus_interfaces_opath = \
            self.wpas_dbus_interfaces_opath
        self.wpas_dbus_interfaces_interface = \
            self.wpas_dbus_interface + ".Interface"
        self.wpas_dbus_interfaces_p2pdevice = \
            self.wpas_dbus_interfaces_interface + ".P2PDevice"

        # The GLib main loop must be the default before the bus is opened
        # so that signal receivers are dispatched by it.
        DBusGMainLoop(set_as_default=True)
        self.bus = dbus.SystemBus()
        self.wpas_object = self.bus.get_object(
            self.wpas_dbus_interface, self.wpas_dbus_opath)
        self.wpas = dbus.Interface(
            self.wpas_object, self.wpas_dbus_interface)

        self.path = self._lookup_or_create_interface(ifname)

        self.interface_object = self.bus.get_object(
            self.wpas_dbus_interface, self.path)
        self.p2p_interface = dbus.Interface(
            self.interface_object,
            self.wpas_dbus_interfaces_p2pdevice)

        # Hook the module-level handlers up to the P2PDevice signals.
        receivers = (
            (GONegotiationSuccess, "GONegotiationSuccess"),
            (GONegotiationFailure, "GONegotiationFailure"),
            (GroupStarted, "GroupStarted"),
            (WpsFailure, "WpsFailed"),
        )
        for handler, signal_name in receivers:
            self.bus.add_signal_receiver(
                handler,
                dbus_interface=self.wpas_dbus_interfaces_p2pdevice,
                signal_name=signal_name)

    def _lookup_or_create_interface(self, ifname):
        """Return the supplicant's object path for *ifname*.

        Asks GetInterface first; if the supplicant reports the interface
        as unknown, registers it with CreateInterface. Any other D-Bus
        error is re-raised.
        """
        try:
            return self.wpas.GetInterface(ifname)
        except dbus.DBusException as exc:
            # Only "unknown interface" is recoverable here.
            if not str(exc).startswith(
                    self.wpas_dbus_interface + ".InterfaceUnknown:"):
                raise

        try:
            path = self.wpas.CreateInterface(
                {'Ifname': ifname, 'Driver': 'test'})
        except dbus.DBusException as exc:
            if not str(exc).startswith(
                    self.wpas_dbus_interface + ".InterfaceExists:"):
                raise
            # The interface appeared between the two calls; look it up
            # again instead of continuing without a path.
            return self.wpas.GetInterface(ifname)

        # Short pause after creating the interface, as the original did.
        time.sleep(1)
        return path

    def _exit_with_usage(self, message):
        """Print *message* followed by the usage text, then exit."""
        print(message)
        usage()
        sys.exit()

    def constructArguements(self):
        """Build self.p2p_connect_arguements for the Connect call.

        Always contains 'wps_method' and 'peer'; 'pin' and 'go_intent'
        are added depending on the WPS method and what was supplied.
        Prints an error plus the usage text and exits when the method is
        unsupported or a required pin is missing.
        """
        self.p2p_connect_arguements = {
            'wps_method': self.wps_method,
            'peer': dbus.ObjectPath(self.path + '/Peers/' + self.addr),
        }

        if self.wps_method == 'display':
            # Display requires a pin, and a go intent of 15.
            if self.pin is None:
                self._exit_with_usage(
                    "Error:\n Pin required for wps_method=display")
            self.p2p_connect_arguements['pin'] = self.pin
            if self.go_intent is not None and int(self.go_intent) != 15:
                print("go_intent overwritten to 15")
            self.go_intent = '15'
        elif self.wps_method == 'keypad':
            # Keypad requires a pin, and a go intent of less than 15.
            if self.pin is None:
                self._exit_with_usage(
                    "Error:\n Pin required for wps_method=keypad")
            self.p2p_connect_arguements['pin'] = self.pin
            if self.go_intent is not None and int(self.go_intent) == 15:
                self._exit_with_usage(
                    "Error :\n Group Owner intent cannot be"
                    " 15 for wps_method=keypad")
        elif self.wps_method in ('pin', 'pbc'):
            # Neither method takes a pin from the caller; a supplied one
            # is reported and left out of the request.
            if self.pin is not None:
                print("pin ignored")
        else:
            self._exit_with_usage(
                "Error:\n wps_method not supported or does not exist")

        # go_intent is optional for every method.
        if self.go_intent is not None:
            self.p2p_connect_arguements['go_intent'] = \
                dbus.Int32(self.go_intent)

    def run(self):
        """Issue the Connect call and then block in the GLib main loop.

        For wps_method 'pin' with no 'pin' entry in the request, the value
        returned by Connect is printed as the pin. The main loop is only
        left when one of the signal handlers terminates the process.
        D-Bus errors from Connect propagate to the caller.
        """
        result_pin = self.p2p_interface.Connect(
            self.p2p_connect_arguements)

        if (self.wps_method == 'pin'
                and 'pin' not in self.p2p_connect_arguements):
            print("Connect return with pin value of %d " % int(result_pin))

        gobject.MainLoop().run()
if __name__ == "__main__":
    # Script entry point: parse the command line, validate it, then build
    # a P2P_Connect object and run the connection attempt.

    # Required
    interface_name = None
    wps_method = None
    addr = None

    # Conditionally optional
    pin = None

    # Optional
    wpas_dbus_interface = 'fi.w1.wpa_supplicant1'
    go_intent = None

    try:
        options, args = getopt.getopt(sys.argv[1:], "hi:m:a:p:g:w:")
    except getopt.GetoptError:
        usage()
        sys.exit()

    # Every switch given on the command line overrides its default.
    # getopt only yields the flags named in the option string above, so
    # no catch-all branch is needed.
    for key, value in options:
        if key == "-h":
            usage()
            sys.exit()
        elif key == "-i":
            interface_name = value
        elif key == "-m":
            wps_method = value
        elif key == "-a":
            addr = value
        elif key == "-p":
            pin = value
        elif key == "-g":
            go_intent = value
        elif key == "-w":
            wpas_dbus_interface = value

    # Required arguments check
    if interface_name is None or wps_method is None or addr is None:
        print("Error:\n Required arguements not specified")
        usage()
        sys.exit()

    # Group owner intent check: must parse as an integer in 0..15.
    # A non-numeric value is reported the same way as an out-of-range one
    # instead of escaping as an uncaught ValueError.
    if go_intent is not None:
        try:
            go_intent_ok = 0 <= int(go_intent) <= 15
        except ValueError:
            go_intent_ok = False
        if not go_intent_ok:
            print("Error:\n Group Owner Intent must be between 0 and 15 "
                  "inclusive")
            usage()
            sys.exit()

    # Pin check: exactly eight characters, all of them digits.
    if pin is not None and (len(pin) != 8 or not pin.isdigit()):
        print("Error:\n Pin is not 8 digits")
        usage()
        sys.exit()

    try:
        p2p_connect_test = P2P_Connect(interface_name, wpas_dbus_interface,
                                       addr, pin, wps_method, go_intent)
    except Exception as exc:
        # Catch Exception rather than everything, so Ctrl-C and explicit
        # exits are not swallowed, and show what actually went wrong.
        print("Error:\n Invalid Arguements")
        print(exc)
        usage()
        sys.exit()

    p2p_connect_test.constructArguements()
    p2p_connect_test.run()

    os._exit(0)
|