diff options
Diffstat (limited to 'Vland/drivers')
-rw-r--r-- | Vland/drivers/CiscoCatalyst.py | 721 | ||||
-rw-r--r-- | Vland/drivers/CiscoSX300.py | 697 | ||||
-rw-r--r-- | Vland/drivers/Dummy.py | 361 | ||||
-rw-r--r-- | Vland/drivers/Mellanox.py | 795 | ||||
-rw-r--r-- | Vland/drivers/NetgearXSM.py | 782 | ||||
-rw-r--r-- | Vland/drivers/TPLinkTLSG2XXX.py | 695 | ||||
-rw-r--r-- | Vland/drivers/__init__.py | 0 | ||||
-rw-r--r-- | Vland/drivers/common.py | 167 |
8 files changed, 4218 insertions, 0 deletions
diff --git a/Vland/drivers/CiscoCatalyst.py b/Vland/drivers/CiscoCatalyst.py new file mode 100644 index 0000000..16f47db --- /dev/null +++ b/Vland/drivers/CiscoCatalyst.py @@ -0,0 +1,721 @@ +#! /usr/bin/python + +# Copyright 2014-2015 Linaro Limited +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, +# MA 02110-1301, USA. + +import logging +import sys +import re +import pexpect + +if __name__ == '__main__': + import os + vlandpath = os.path.abspath(os.path.normpath(os.path.dirname(sys.argv[0]))) + sys.path.insert(0, vlandpath) + sys.path.insert(0, "%s/.." % vlandpath) + +from errors import InputError, PExpectError +from drivers.common import SwitchDriver, SwitchErrors + +class CiscoCatalyst(SwitchDriver): + + connection = None + _username = None + _password = None + _enable_password = None + + _capabilities = [ + 'TrunkWildCardVlans' # Trunk ports are on all VLANs by + # default, so we shouldn't need to + # bugger with them + ] + + # Regexp of expected hardware information - fail if we don't see + # this + _expected_descr_re = re.compile(r'WS-C\S+-\d+P') + + def __init__(self, switch_hostname, switch_telnetport=23, debug = False): + SwitchDriver.__init__(self, switch_hostname, debug) + self._systemdata = [] + self.exec_string = "/usr/bin/telnet %s %d" % (switch_hostname, switch_telnetport) + self.errors = SwitchErrors() + + ################################ + ### Switch-level API functions + ################################ + + # Save the current running config into flash - we want config to + # remain across reboots + def switch_save_running_config(self): + try: + self._cli("copy running-config startup-config") + self.connection.expect("startup-config") + self._cli("startup-config") + self.connection.expect("OK") + except (PExpectError, pexpect.EOF): + # recurse on error + self._switch_connect() + self.switch_save_running_config() + + # Restart the switch - we need to reload config to do a + # roll-back. Do NOT save running-config first if the switch asks - + # we're trying to dump recent changes, not save them. + # + # This will also implicitly cause a connection to be closed + def switch_restart(self): + self._cli("reload") + index = self.connection.expect(['has been modified', 'Proceed']) + if index == 0: + self._cli("n") # No, don't save + self.connection.expect("Proceed") + + # Fall through + self._cli("y") # Yes, continue to reset + self.connection.close(True) + + # List the capabilities of the switch (and driver) - some things + # make no sense to abstract. Returns a dict of strings, each one + # describing an extra feature that that higher levels may care + # about + def switch_get_capabilities(self): + return self._capabilities + + ################################ + ### VLAN API functions + ################################ + + # Create a VLAN with the specified tag + def vlan_create(self, tag): + logging.debug("Creating VLAN %d", tag) + try: + self._configure() + self._cli("vlan %d" % tag) + self._end_configure() + + # Validate it happened + vlans = self.vlan_get_list() + for vlan in vlans: + if vlan == tag: + return + raise IOError("Failed to create VLAN %d" % tag) + + except PExpectError: + # recurse on error + self._switch_connect() + self.vlan_create(tag) + + # Destroy a VLAN with the specified tag + def vlan_destroy(self, tag): + logging.debug("Destroying VLAN %d", tag) + + try: + self._configure() + self._cli("no vlan %d" % tag) + self._end_configure() + + # Validate it happened + vlans = self.vlan_get_list() + for vlan in vlans: + if vlan == tag: + raise IOError("Failed to destroy VLAN %d" % tag) + + except PExpectError: + # recurse on error + self._switch_connect() + self.vlan_destroy(tag) + + # Set the name of a VLAN + def vlan_set_name(self, tag, name): + logging.debug("Setting name of VLAN %d to %s", tag, name) + + try: + self._configure() + self._cli("vlan %d" % tag) + self._cli("name %s" % name) + self._end_configure() + + # Validate it happened + read_name = self.vlan_get_name(tag) + if read_name != name: + raise IOError("Failed to set name for VLAN %d (name found is \"%s\", not \"%s\")" + % (tag, read_name, name)) + except PExpectError: + # recurse on error + self._switch_connect() + self.vlan_set_name(tag, name) + + # Get a list of the VLAN tags currently registered on the switch + def vlan_get_list(self): + logging.debug("Grabbing list of VLANs") + + try: + vlans = [] + + regex = re.compile(r'^ *(\d+).*(active)') + + self._cli("show vlan brief") + for line in self._read_long_output("show vlan brief"): + match = regex.match(line) + if match: + vlans.append(int(match.group(1))) + return vlans + + except PExpectError: + # recurse on error + self._switch_connect() + return self.vlan_get_list() + + # For a given VLAN tag, ask the switch what the associated name is + def vlan_get_name(self, tag): + + try: + logging.debug("Grabbing the name of VLAN %d", tag) + name = None + regex = re.compile(r'^ *\d+\s+(\S+).*(active)') + self._cli("show vlan id %d" % tag) + for line in self._read_long_output("show vlan id"): + match = regex.match(line) + if match: + name = match.group(1) + name.strip() + return name + + except PExpectError: + # recurse on error + self._switch_connect() + return self.vlan_get_name(tag) + + ################################ + ### Port API functions + ################################ + + # Set the mode of a port: access or trunk + def port_set_mode(self, port, mode): + logging.debug("Setting port %s to %s", port, mode) + if not self._is_port_mode_valid(mode): + raise InputError("Port mode %s is not allowed" % mode) + if not self._is_port_name_valid(port): + raise InputError("Port name %s not recognised" % port) + + try: + self._configure() + self._cli("interface %s" % port) + if mode == "trunk": + self._cli("switchport trunk encapsulation dot1q") + self._cli("switchport trunk native vlan 1") + self._cli("switchport mode %s" % mode) + self._end_configure() + + # Validate it happened + read_mode = self._port_get_mode(port) + + if read_mode != mode: + raise IOError("Failed to set mode for port %s" % port) + + # And cache the result + self._port_modes[port] = mode + + except PExpectError: + # recurse on error + self._switch_connect() + self.port_set_mode(port, mode) + + # Set an access port to be in a specified VLAN (tag) + def port_set_access_vlan(self, port, tag): + logging.debug("Setting access port %s to VLAN %d", port, tag) + if not self._is_port_name_valid(port): + raise InputError("Port name %s not recognised" % port) + if not (self.port_get_mode(port) == "access"): + raise InputError("Port %s not in access mode" % port) + + try: + self._configure() + self._cli("interface %s" % port) + self._cli("switchport access vlan %d" % tag) + self._cli("no shutdown") + self._end_configure() + + # Finally, validate things worked + read_vlan = int(self.port_get_access_vlan(port)) + if read_vlan != tag: + raise IOError("Failed to move access port %d to VLAN %d - got VLAN %d instead" + % (port, tag, read_vlan)) + + except PExpectError: + # recurse on error + self._switch_connect() + self.port_set_access_vlan(port, tag) + + # Add a trunk port to a specified VLAN (tag) + def port_add_trunk_to_vlan(self, port, tag): + logging.debug("Adding trunk port %s to VLAN %d", port, tag) + if not self._is_port_name_valid(port): + raise InputError("Port name %s not recognised" % port) + if not (self.port_get_mode(port) == "trunk"): + raise InputError("Port %s not in trunk mode" % port) + + try: + self._configure() + self._cli("interface %s" % port) + self._cli("switchport trunk allowed vlan add %d" % tag) + self._end_configure() + + # Validate it happened + read_vlans = self.port_get_trunk_vlan_list(port) + for vlan in read_vlans: + if vlan == tag or vlan == "ALL": + return + raise IOError("Failed to add trunk port %s to VLAN %d" % (port, tag)) + + except PExpectError: + # recurse on error + self._switch_connect() + self.port_add_trunk_to_vlan(port, tag) + + # Remove a trunk port from a specified VLAN (tag) + def port_remove_trunk_from_vlan(self, port, tag): + logging.debug("Removing trunk port %s from VLAN %d", port, tag) + if not self._is_port_name_valid(port): + raise InputError("Port name %s not recognised" % port) + if not (self.port_get_mode(port) == "trunk"): + raise InputError("Port %s not in trunk mode" % port) + + try: + self._configure() + self._cli("interface %s" % port) + self._cli("switchport trunk allowed vlan remove %d" % tag) + self._end_configure() + + # Validate it happened + read_vlans = self.port_get_trunk_vlan_list(port) + for vlan in read_vlans: + if vlan == tag: + raise IOError("Failed to remove trunk port %s from VLAN %d" % (port, tag)) + + except PExpectError: + # recurse on error + self._switch_connect() + self.port_remove_trunk_from_vlan(port, tag) + + # Get the configured VLAN tag for an access port (tag) + def port_get_access_vlan(self, port): + logging.debug("Getting VLAN for access port %s", port) + vlan = 1 + if not self._is_port_name_valid(port): + raise InputError("Port name %s not recognised" % port) + if not (self.port_get_mode(port) == "access"): + raise InputError("Port %s not in access mode" % port) + regex = re.compile(r'Access Mode VLAN: (\d+)') + + try: + self._cli("show interfaces %s switchport" % port) + for line in self._read_long_output("show interfaces switchport"): + match = regex.match(line) + if match: + vlan = match.group(1) + return int(vlan) + + except PExpectError: + # recurse on error + self._switch_connect() + return self.port_get_access_vlan(port) + + # Get the list of configured VLAN tags for a trunk port + def port_get_trunk_vlan_list(self, port): + logging.debug("Getting VLANs for trunk port %s", port) + vlans = [ ] + if not self._is_port_name_valid(port): + raise InputError("Port name %s not recognised" % port) + if not (self.port_get_mode(port) == "trunk"): + raise InputError("Port %s not in trunk mode" % port) + regex_start = re.compile('Trunking VLANs Enabled: (.*)') + regex_continue = re.compile(r'\s*(\d.*)') + + try: + self._cli("show interfaces %s switchport" % port) + + # Horrible parsing work - VLAN list may extend over several lines + in_match = False + vlan_text = '' + + for line in self._read_long_output("show interfaces switchport"): + if in_match: + match = regex_continue.match(line) + if match: + vlan_text += match.group(1) + else: + in_match = False + else: + match = regex_start.match(line) + if match: + vlan_text += match.group(1) + in_match = True + + vlans = self._parse_vlan_list(vlan_text) + return vlans + + except PExpectError: + # recurse on error + self._switch_connect() + return self.port_get_trunk_vlan_list(port) + + ################################ + ### Internal functions + ################################ + + # Connect to the switch and log in + def _switch_connect(self): + + if not self.connection is None: + self.connection.close(True) + self.connection = None + + logging.debug("Connecting to Switch with: %s", self.exec_string) + self.connection = pexpect.spawn(self.exec_string, logfile=self.logger) + self._login() + + # Avoid paged output + self._cli("terminal length 0") + + # And grab details about the switch. in case we need it + self._get_systemdata() + + # And also validate them - make sure we're driving a switch of + # the correct model! Also store the serial number + descr_regex = re.compile(r'^cisco\s+(\S+)') + sn_regex = re.compile(r'System serial number\s+:\s+(\S+)') + descr = "" + + for line in self._systemdata: + match = descr_regex.match(line) + if match: + descr = match.group(1) + match = sn_regex.match(line) + if match: + self.serial_number = match.group(1) + + logging.debug("serial number is %s", self.serial_number) + logging.debug("system description is %s", descr) + + if not self._expected_descr_re.match(descr): + raise IOError("Switch %s not recognised by this driver: abort" % descr) + + # Now build a list of our ports, for later sanity checking + self._ports = self._get_port_names() + if len(self._ports) < 4: + raise IOError("Not enough ports detected - problem!") + + def _login(self): + logging.debug("attempting login with username %s, password %s", self._username, self._password) + self.connection.expect('User Access Verification') + if self._username is not None: + self.connection.expect("User Name:") + self._cli("%s" % self._username) + if self._password is not None: + self.connection.expect("Password:") + self._cli("%s" % self._password, False) + while True: + index = self.connection.expect(['User Name:', 'Password:', 'Bad passwords', 'authentication failed', r'(.*)(#|>)']) + if index != 4: # Any other means: failed to log in! + logging.error("Login failure: index %d\n", index) + logging.error("Login failure: %s\n", self.connection.match.before) + raise IOError + + # else + self._prompt_name = re.escape(self.connection.match.group(1).strip()) + if self.connection.match.group(2) == ">": + # Need to enter "enable" mode too + self._cli("enable") + if self._enable_password is not None: + self.connection.expect("Password:") + self._cli("%s" % self._enable_password, False) + index = self.connection.expect(['Password:', 'Bad passwords', 'authentication failed', r'(.*)(#|>)']) + if index != 3: # Any other means: failed to log in! + logging.error("Enable password failure: %s\n", self.connection.match) + raise IOError + return 0 + + def _logout(self): + logging.debug("Logging out") + self._cli("exit", False) + self.connection.close(True) + + def _configure(self): + self._cli("configure terminal") + + def _end_configure(self): + self._cli("end") + + def _read_long_output(self, text): + prompt = self._prompt_name + '#' + try: + self.connection.expect(prompt) + except (pexpect.EOF, pexpect.TIMEOUT): + # Something went wrong; logout, log in and try again! + logging.error("PEXPECT FAILURE, RECONNECT") + self.errors.log_error_in(text) + raise PExpectError("_read_long_output failed") + except: + logging.error("prompt is \"%s\"", prompt) + raise + + longbuf = [] + for line in self.connection.before.split('\r\n'): + longbuf.append(line.strip()) + return longbuf + + def _get_port_names(self): + logging.debug("Grabbing list of ports") + interfaces = [] + + # Use "connect" to only identify lines in the output that + # match interfaces - it will match lines with "connected" or + # "notconnect". + regex = re.compile(r'^\s*([a-zA-Z0-9_/]*).*(connect)(.*)') + # Deliberately drop things marked as "routed", i.e. the + # management port + regex2 = re.compile('.*routed.*') + + try: + self._cli("show interfaces status") + for line in self._read_long_output("show interfaces status"): + match = regex.match(line) + if match: + interface = match.group(1) + junk = match.group(3) + match2 = regex2.match(junk) + if not match2: + interfaces.append(interface) + self._port_numbers[interface] = len(interfaces) + logging.debug(" found %d ports on the switch", len(interfaces)) + return interfaces + + except PExpectError: + # recurse on error + self._switch_connect() + return self._get_port_names() + + # Get the mode of a port: access or trunk + def _port_get_mode(self, port): + logging.debug("Getting mode of port %s", port) + mode = '' + if not self._is_port_name_valid(port): + raise InputError("Port name %s not recognised" % port) + regex = re.compile('Administrative Mode: (.*)') + + try: + self._cli("show interfaces %s switchport" % port) + for line in self._read_long_output("show interfaces switchport"): + match = regex.match(line) + if match: + mode = match.group(1) + if mode == 'static access': + return 'access' + if mode == 'trunk': + return 'trunk' + # Needs special handling - it's the default port + # mode on these switches, and it doesn't + # interoperate with some other vendors. Sigh. + if mode == 'dynamic auto': + return 'dynamic' + return mode + + except PExpectError: + # recurse on error + self._switch_connect() + return self.port_get_mode(port) + + def _show_config(self): + logging.debug("Grabbing config") + try: + self._cli("show running-config") + return self._read_long_output("show running-config") + except PExpectError: + # recurse on error + self._switch_connect() + return self._show_config() + + def _show_clock(self): + logging.debug("Grabbing time") + try: + self._cli("show clock") + return self._read_long_output("show clock") + except PExpectError: + # recurse on error + self._switch_connect() + return self._show_clock() + + def _get_systemdata(self): + logging.debug("Grabbing system sw and hw versions") + + try: + self._systemdata = [] + self._cli("show version") + for line in self._read_long_output("show version"): + self._systemdata.append(line) + + except PExpectError: + # recurse on error + self._switch_connect() + return self._get_systemdata() + + def _parse_vlan_list(self, inputdata): + vlans = [] + + if inputdata == "ALL": + return ["ALL"] + elif inputdata == "NONE": + return [] + else: + # Parse the complex list + groups = inputdata.split(',') + for group in groups: + subgroups = group.split('-') + if len(subgroups) == 1: + vlans.append(int(subgroups[0])) + elif len(subgroups) == 2: + for i in range (int(subgroups[0]), int(subgroups[1]) + 1): + vlans.append(i) + else: + logging.debug("Can't parse group \"" + group + "\"") + + return vlans + + # Wrapper around connection.send - by default, expect() the same + # text we've sent, to remove it from the output from the + # switch. For the few cases where we don't need that, override + # this using echo=False. + # Horrible, but seems to work. + def _cli(self, text, echo=True): + self.connection.send(text + '\r') + if echo: + try: + self.connection.expect(text) + except (pexpect.EOF, pexpect.TIMEOUT): + # Something went wrong; logout, log in and try again! + logging.error("PEXPECT FAILURE, RECONNECT") + self.errors.log_error_out(text) + raise PExpectError("_cli failed on %s" % text) + except: + logging.error("Unexpected error: %s", sys.exc_info()[0]) + raise + +if __name__ == "__main__": + + # Simple test harness - exercise the main working functions above to verify + # they work. This does *NOT* test really disruptive things like "save + # running-config" and "reload" - test those by hand. + + import optparse + + switch = 'vlandswitch01' + parser = optparse.OptionParser() + parser.add_option("--switch", + dest = "switch", + action = "store", + nargs = 1, + type = "string", + help = "specify switch to connect to for testing", + metavar = "<switch>") + (opts, args) = parser.parse_args() + if opts.switch: + switch = opts.switch + + logging.basicConfig(level = logging.DEBUG, + format = '%(asctime)s %(levelname)-8s %(message)s') + p = CiscoCatalyst(switch, 23, debug=True) + p.switch_connect(None, 'lngvirtual', 'lngenable') + + print "VLANs are:" + buf = p.vlan_get_list() + p.dump_list(buf) + + buf = p.vlan_get_name(2) + print "VLAN 2 is named \"%s\"" % buf + + print "Create VLAN 3" + p.vlan_create(3) + + buf = p.vlan_get_name(3) + print "VLAN 3 is named \"%s\"" % buf + + print "Set name of VLAN 3 to test333" + p.vlan_set_name(3, "test333") + + buf = p.vlan_get_name(3) + print "VLAN 3 is named \"%s\"" % buf + + print "VLANs are:" + buf = p.vlan_get_list() + p.dump_list(buf) + + print "Destroy VLAN 3" + p.vlan_destroy(3) + + print "VLANs are:" + buf = p.vlan_get_list() + p.dump_list(buf) + + buf = p.port_get_mode("Gi1/0/10") + print "Port Gi1/0/10 is in %s mode" % buf + + buf = p.port_get_mode("Gi1/0/11") + print "Port Gi1/0/11 is in %s mode" % buf + + # Test access stuff + print "Set Gi1/0/9 to access mode" + p.port_set_mode("Gi1/0/9", "access") + + print "Move Gi1/0/9 to VLAN 4" + p.port_set_access_vlan("Gi1/0/9", 4) + + buf = p.port_get_access_vlan("Gi1/0/9") + print "Read from switch: Gi1/0/9 is on VLAN %s" % buf + + print "Move Gi1/0/9 back to VLAN 1" + p.port_set_access_vlan("Gi1/0/9", 1) + + # Test access stuff + print "Set Gi1/0/9 to trunk mode" + p.port_set_mode("Gi1/0/9", "trunk") + print "Read from switch: which VLANs is Gi1/0/9 on?" + buf = p.port_get_trunk_vlan_list("Gi1/0/9") + p.dump_list(buf) + print "Add Gi1/0/9 to VLAN 2" + p.port_add_trunk_to_vlan("Gi1/0/9", 2) + print "Add Gi1/0/9 to VLAN 3" + p.port_add_trunk_to_vlan("Gi1/0/9", 3) + print "Add Gi1/0/9 to VLAN 4" + p.port_add_trunk_to_vlan("Gi1/0/9", 4) + print "Read from switch: which VLANs is Gi1/0/9 on?" + buf = p.port_get_trunk_vlan_list("Gi1/0/9") + p.dump_list(buf) + + p.port_remove_trunk_from_vlan("Gi1/0/9", 3) + p.port_remove_trunk_from_vlan("Gi1/0/9", 3) + p.port_remove_trunk_from_vlan("Gi1/0/9", 4) + print "Read from switch: which VLANs is Gi1/0/9 on?" + buf = p.port_get_trunk_vlan_list("Gi1/0/9") + p.dump_list(buf) + +# print 'Restarting switch, to explicitly reset config' +# p.switch_restart() + +# p.switch_save_running_config() + +# p.switch_disconnect() +# p._show_config() diff --git a/Vland/drivers/CiscoSX300.py b/Vland/drivers/CiscoSX300.py new file mode 100644 index 0000000..a6f5446 --- /dev/null +++ b/Vland/drivers/CiscoSX300.py @@ -0,0 +1,697 @@ +#! /usr/bin/python + +# Copyright 2014-2015 Linaro Limited +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, +# MA 02110-1301, USA. + +import logging +import sys +import re +import pexpect + +if __name__ == '__main__': + import os + vlandpath = os.path.abspath(os.path.normpath(os.path.dirname(sys.argv[0]))) + sys.path.insert(0, vlandpath) + sys.path.insert(0, "%s/.." % vlandpath) + +from errors import InputError, PExpectError +from drivers.common import SwitchDriver, SwitchErrors + +class CiscoSX300(SwitchDriver): + + connection = None + _username = None + _password = None + _enable_password = None + + # No extra capabilities for this switch/driver yet + _capabilities = [ + ] + + # Regexp of expected hardware information - fail if we don't see + # this + _expected_descr_re = re.compile(r'.*\d+-Port.*Managed Switch.*') + + def __init__(self, switch_hostname, switch_telnetport=23, debug = False): + SwitchDriver.__init__(self, switch_hostname, debug) + self._systemdata = [] + self.exec_string = "/usr/bin/telnet %s %d" % (switch_hostname, switch_telnetport) + self.errors = SwitchErrors() + + ################################ + ### Switch-level API functions + ################################ + + # Save the current running config into flash - we want config to + # remain across reboots + def switch_save_running_config(self): + try: + self._cli("copy running-config startup-config") + self.connection.expect("Y/N") + self._cli("y") + self.connection.expect("succeeded") + except (PExpectError, pexpect.EOF, pexpect.TIMEOUT): + # recurse on error + self._switch_connect() + self.switch_save_running_config() + + # Restart the switch - we need to reload config to do a + # roll-back. Do NOT save running-config first if the switch asks - + # we're trying to dump recent changes, not save them. + # + # This will also implicitly cause a connection to be closed + def switch_restart(self): + self._cli("reload") + index = self.connection.expect(['Are you sure', 'will reset']) + if index == 0: + self._cli("y") # Yes, continue without saving + self.connection.expect("reset the whole") + + # Fall through + self._cli("y") # Yes, continue to reset + self.connection.close(True) + + ################################ + ### VLAN API functions + ################################ + + # Create a VLAN with the specified tag + def vlan_create(self, tag): + logging.debug("Creating VLAN %d", tag) + + try: + self._configure() + self._cli("vlan database") + self._cli("vlan %d" % tag) + self._end_configure() + + # Validate it happened + vlans = self.vlan_get_list() + for vlan in vlans: + if vlan == tag: + return + raise IOError("Failed to create VLAN %d" % tag) + + except PExpectError: + # recurse on error + self._switch_connect() + self.vlan_create(tag) + + # Destroy a VLAN with the specified tag + def vlan_destroy(self, tag): + logging.debug("Destroying VLAN %d", tag) + + try: + self._configure() + self._cli("no vlan %d" % tag) + self._end_configure() + + # Validate it happened + vlans = self.vlan_get_list() + for vlan in vlans: + if vlan == tag: + raise IOError("Failed to destroy VLAN %d" % tag) + + except PExpectError: + # recurse on error + self._switch_connect() + self.vlan_destroy(tag) + + # Set the name of a VLAN + def vlan_set_name(self, tag, name): + logging.debug("Setting name of VLAN %d to %s", tag, name) + + try: + self._configure() + self._cli("vlan %d" % tag) + self._cli("interface vlan %d" % tag) + self._cli("name %s" % name) + self._end_configure() + + # Validate it happened + read_name = self.vlan_get_name(tag) + if read_name != name: + raise IOError("Failed to set name for VLAN %d (name found is \"%s\", not \"%s\")" + % (tag, read_name, name)) + + except PExpectError: + # recurse on error + self._switch_connect() + self.vlan_set_name(tag, name) + + # Get a list of the VLAN tags currently registered on the switch + def vlan_get_list(self): + logging.debug("Grabbing list of VLANs") + + try: + vlans = [] + regex = re.compile(r'^ *(\d+).*(D|S|G|R)') + + self._cli("show vlan") + for line in self._read_long_output("show vlan"): + match = regex.match(line) + if match: + vlans.append(int(match.group(1))) + return vlans + + except PExpectError: + # recurse on error + self._switch_connect() + return self.vlan_get_list() + + # For a given VLAN tag, ask the switch what the associated name is + def vlan_get_name(self, tag): + logging.debug("Grabbing the name of VLAN %d", tag) + + try: + name = None + regex = re.compile(r'^ *\d+\s+(\S+).*(D|S|G|R)') + self._cli("show vlan tag %d" % tag) + for line in self._read_long_output("show vlan tag"): + match = regex.match(line) + if match: + name = match.group(1) + name.strip() + return name + + except PExpectError: + # recurse on error + self._switch_connect() + return self.vlan_get_name(tag) + + ################################ + ### Port API functions + ################################ + + # Set the mode of a port: access or trunk + def port_set_mode(self, port, mode): + logging.debug("Setting port %s to %s", port, mode) + if not self._is_port_mode_valid(mode): + raise InputError("Port mode %s is not allowed" % mode) + if not self._is_port_name_valid(port): + raise InputError("Port name %s not recognised" % port) + + try: + self._configure() + self._cli("interface %s" % port) + self._cli("switchport mode %s" % mode) + self._end_configure() + + # Validate it happened + read_mode = self._port_get_mode(port) + if read_mode != mode: + raise IOError("Failed to set mode for port %s" % port) + + # And cache the result + self._port_modes[port] = mode + + except PExpectError: + # recurse on error + self._switch_connect() + self.port_set_mode(port, mode) + + # Set an access port to be in a specified VLAN (tag) + def port_set_access_vlan(self, port, tag): + logging.debug("Setting access port %s to VLAN %d", port, tag) + if not self._is_port_name_valid(port): + raise InputError("Port name %s not recognised" % port) + if not (self.port_get_mode(port) == "access"): + raise InputError("Port %s not in access mode" % port) + + try: + self._configure() + self._cli("interface %s" % port) + self._cli("switchport access vlan %d" % tag) + self._end_configure() + + # Validate things worked + read_vlan = int(self.port_get_access_vlan(port)) + if read_vlan != tag: + raise IOError("Failed to move access port %s to VLAN %d - got VLAN %d instead" + % (port, tag, read_vlan)) + + except PExpectError: + # recurse on error + self._switch_connect() + self.port_set_access_vlan(port, tag) + + # Add a trunk port to a specified VLAN (tag) + def port_add_trunk_to_vlan(self, port, tag): + logging.debug("Adding trunk port %s to VLAN %d", port, tag) + if not self._is_port_name_valid(port): + raise InputError("Port name %s not recognised" % port) + if not (self.port_get_mode(port) == "trunk"): + raise InputError("Port %s not in trunk mode" % port) + + try: + self._configure() + self._cli("interface %s" % port) + self._cli("switchport trunk allowed vlan add %d" % tag) + self._end_configure() + + # Validate it happened + read_vlans = self.port_get_trunk_vlan_list(port) + for vlan in read_vlans: + if vlan == tag: + return + raise IOError("Failed to add trunk port %s to VLAN %d" % (port, tag)) + + except PExpectError: + # recurse on error + self._switch_connect() + self.port_add_trunk_to_vlan(port, tag) + + # Remove a trunk port from a specified VLAN (tag) + def port_remove_trunk_from_vlan(self, port, tag): + logging.debug("Removing trunk port %s from VLAN %d", port, tag) + if not self._is_port_name_valid(port): + raise InputError("Port name %s not recognised" % port) + if not (self.port_get_mode(port) == "trunk"): + raise InputError("Port %s not in trunk mode" % port) + + try: + self._configure() + self._cli("interface %s" % port) + self._cli("switchport trunk allowed vlan remove %d" % tag) + self._end_configure() + + # Validate it happened + read_vlans = self.port_get_trunk_vlan_list(port) + for vlan in read_vlans: + if vlan == tag: + raise IOError("Failed to remove trunk port %s from VLAN %d" % (port, tag)) + + except PExpectError: + # recurse on error + self._switch_connect() + self.port_remove_trunk_from_vlan(port, tag) + + # Get the configured VLAN tag for an access port (tag) + def port_get_access_vlan(self, port): + logging.debug("Getting VLAN for access port %s", port) + vlan = 1 + if not self._is_port_name_valid(port): + raise InputError("Port name %s not recognised" % port) + if not (self.port_get_mode(port) == "access"): + raise InputError("Port %s not in access mode" % port) + regex = re.compile(r'(\d+)\s+\S+\s+Untagged\s+(D|S|G|R)') + + try: + self._cli("show interfaces switchport %s" % port) + for line in self._read_long_output("show interfaces switchport"): + match = regex.match(line) + if match: + vlan = match.group(1) + return int(vlan) + + except PExpectError: + # recurse on error + self._switch_connect() + return self.port_get_access_vlan(port) + + # Get the list of configured VLAN tags for a trunk port + def port_get_trunk_vlan_list(self, port): + logging.debug("Getting VLANs for trunk port %s", port) + vlans = [ ] + if not self._is_port_name_valid(port): + raise InputError("Port name %s not recognised" % port) + if not (self.port_get_mode(port) == "trunk"): + raise InputError("Port %s not in trunk mode" % port) + regex = re.compile(r'(\d+)\s+\S+\s+(Tagged|Untagged)\s+(D|S|G|R)') + + try: + self._cli("show interfaces switchport %s" % port) + for line in self._read_long_output("show interfaces switchport"): + match = regex.match(line) + if match: + vlans.append (int(match.group(1))) + return vlans + + except PExpectError: + # recurse on error + self._switch_connect() + return self.port_get_trunk_vlan_list(port) + + ################################ + ### Internal functions + ################################ + + # Connect to the switch and log in + def _switch_connect(self): + + if not self.connection is None: + self.connection.close(True) + self.connection = None + + logging.debug("Connecting to Switch with: %s", self.exec_string) + self.connection = pexpect.spawn(self.exec_string, logfile=self.logger) + self._login() + + # Avoid paged output + self._cli("terminal datadump") + + # And grab details about the switch. in case we need it + self._get_systemdata() + + # And also validate them - make sure we're driving a switch of + # the correct model! Also store the serial number + descr_regex = re.compile(r'System Description:.\s+(.*)') + sn_regex = re.compile(r'SN:\s+(\S_)') + descr = "" + + for line in self._systemdata: + match = descr_regex.match(line) + if match: + descr = match.group(1) + match = sn_regex.match(line) + if match: + self.serial_number = match.group(1) + + if not self._expected_descr_re.match(descr): + raise IOError("Switch %s not recognised by this driver: abort" % descr) + + # Now build a list of our ports, for later sanity checking + self._ports = self._get_port_names() + if len(self._ports) < 4: + raise IOError("Not enough ports detected - problem!") + + def _login(self): + logging.debug("attempting login with username %s, password %s", self._username, self._password) + self._cli("") + self.connection.expect("User Name:") + self._cli("%s" % self._username) + self.connection.expect("Password:") + self._cli("%s" % self._password, False) + self.connection.expect(r"\*\*") + while True: + index = self.connection.expect(['User Name:', 'authentication failed', r'([^#]+)#', 'Password:', '.+']) + if index == 0 or index == 1: # Failed to log in! + logging.error("Login failure: %s\n", self.connection.match) + raise IOError + elif index == 2: + self._prompt_name = re.escape(self.connection.match.group(1).strip()) + # Horrible output from the switch at login time may + # confuse our pexpect stuff here. If we've somehow got + # multiple lines of output, clean up and just take the + # *last* line here. Anything before that is going to + # just be noise from the "***" password input, etc. + prompt_lines = self._prompt_name.split('\r\n') + if len(prompt_lines) > 1: + self._prompt_name = prompt_lines[-1] + logging.debug("Got prompt name %s", self._prompt_name) + return 0 + elif index == 3 or index == 4: + self._cli("", False) + + def _logout(self): + logging.debug("Logging out") + self._cli("exit", False) + self.connection.close(True) + + def _configure(self): + self._cli("configure terminal") + + def _end_configure(self): + self._cli("end") + + def _read_long_output(self, text): + prompt = self._prompt_name + '#' + try: + self.connection.expect(prompt) + except (pexpect.EOF, pexpect.TIMEOUT): + # Something went wrong; logout, log in and try again! + logging.error("PEXPECT FAILURE, RECONNECT") + self.errors.log_error_in(text) + raise PExpectError("_read_long_output failed") + except: + logging.error("prompt is \"%s\"", prompt) + raise + + longbuf = [] + for line in self.connection.before.split('\r\n'): + longbuf.append(line.strip()) + return longbuf + + def _get_port_names(self): + logging.debug("Grabbing list of ports") + interfaces = [] + + # Use "Up" or "Down" to only identify lines in the output that + # match interfaces that exist + regex = re.compile(r'^(\w+).*(Up|Down)') + + try: + self._cli("show interfaces status detailed") + for line in self._read_long_output("show interfaces status detailed"): + match = regex.match(line) + if match: + interface = match.group(1) + interfaces.append(interface) + self._port_numbers[interface] = len(interfaces) + logging.debug(" found %d ports on the switch", len(interfaces)) + return interfaces + except PExpectError: + # recurse on error + self._switch_connect() + return self._get_port_names() + + # Get the mode of a port: access or trunk + def _port_get_mode(self, port): + logging.debug("Getting mode of port %s", port) + mode = '' + if not self._is_port_name_valid(port): + raise InputError("Port name %s not recognised" % port) + regex = re.compile(r'Port Mode: (\S+)') + + try: + self._cli("show interfaces switchport %s" % port) + for line in self._read_long_output("show interfaces switchport"): + match = regex.match(line) + if match: + mode = match.group(1) + return mode.lower() + + except PExpectError: + # recurse on error + self._switch_connect() + return self.port_get_mode(port) + + def _show_config(self): + logging.debug("Grabbing config") + try: + self._cli("show running-config") + return self._read_long_output("show running-config") + except PExpectError: + # recurse on error + self._switch_connect() + return self._show_config() + + def _show_clock(self): + logging.debug("Grabbing time") + try: + self._cli("show clock") + return self._read_long_output("show clock") + except PExpectError: + # recurse on error + self._switch_connect() + return self._show_clock() + + def _get_systemdata(self): + logging.debug("Grabbing system data") + + try: + self._systemdata = [] + self._cli("show system") + for line in self._read_long_output("show system"): + self._systemdata.append(line) + + logging.debug("Grabbing system sw and hw versions") + self._cli("show version") + for line in self._read_long_output("show version"): + self._systemdata.append(line) + + except PExpectError: + # recurse on error + self._switch_connect() + return self._get_systemdata() + + ###################################### + # Internal port access helper methods + ###################################### + # N.B. No parameter checking here, for speed reasons - if you're + # calling this internal API then you should already have validated + # things yourself! Equally, no post-set checks in here - do that + # at the higher level. + ###################################### + + # Wrapper around connection.send - by default, expect() the same + # text we've sent, to remove it from the output from the + # switch. For the few cases where we don't need that, override + # this using echo=False. + # Horrible, but seems to work. + def _cli(self, text, echo=True): + self.connection.send(text + '\r') + if echo: + try: + self.connection.expect(text) + except (pexpect.EOF, pexpect.TIMEOUT): + # Something went wrong; logout, log in and try again! + logging.error("PEXPECT FAILURE, RECONNECT") + self.errors.log_error_out(text) + raise PExpectError("_cli failed on %s" % text) + except: + logging.error("Unexpected error: %s", sys.exc_info()[0]) + raise + +if __name__ == "__main__": +# p = CiscoSX300('10.172.2.52', 23) + + # Simple test harness - exercise the main working functions above to verify + # they work. This does *NOT* test really disruptive things like "save + # running-config" and "reload" - test those by hand. + + import optparse + + switch = 'vlandswitch02' + parser = optparse.OptionParser() + parser.add_option("--switch", + dest = "switch", + action = "store", + nargs = 1, + type = "string", + help = "specify switch to connect to for testing", + metavar = "<switch>") + (opts, args) = parser.parse_args() + if opts.switch: + switch = opts.switch + + portname_base = "fa" + # Text to match if we're on a SG-series switch, ports all called gi<number> + sys_descr_re = re.compile('System Description.*Gigabit') + + def _port_name(number): + return "%s%d" % (portname_base, number) + + logging.basicConfig(level = logging.DEBUG, + format = '%(asctime)s %(levelname)-8s %(message)s') + p = CiscoSX300(switch, 23, debug = True) + p.switch_connect('cisco', 'cisco', None) + #buf = p._show_clock() + #print "%s" % buf + #buf = p._show_config() + #p.dump_list(buf) + + print "System data:" + p.dump_list(p._systemdata) + for l in p._systemdata: + m = sys_descr_re.match(l) + if m: + print 'Found an SG switch, using "gi" as port name prefix for testing' + portname_base = "gi" + + if portname_base == "fa": + print 'Found an SF switch, using "fa" as port name prefix for testing' + + print "Creating VLANs for testing:" + for i in [ 2, 3, 4, 5, 20 ]: + p.vlan_create(i) + p.vlan_set_name(i, "test%d" % i) + print " %d (test%d)" % (i, i) + + #print "And dump config\n" + #buf = p._show_config() + #print "%s" % buf + + #print "Destroying VLAN 2\n" + #p.vlan_destroy(2) + + #print "And dump config\n" + #buf = p._show_config() + #print "%s" % buf + + #print "Port names are:" + #buf = p.switch_get_port_names() + #p.dump_list(buf) + + #buf = p.vlan_get_name(25) + #print "VLAN with tag 25 is called \"%s\"" % buf + + #p.vlan_set_name(35, "foo") + #print "VLAN with tag 35 is called \"foo\"" + + #buf = p.port_get_mode(_port_name(12)) + #print "Port %s is in %s mode" % (_port_name(12), buf) + + # Test access stuff + print "Set %s to access mode" % _port_name(6) + p.port_set_mode(_port_name(6), "access") + print "Move %s to VLAN 2" % _port_name(6) + p.port_set_access_vlan(_port_name(6), 2) + buf = p.port_get_access_vlan(_port_name(6)) + print "Read from switch: %s is on VLAN %s" % (_port_name(6), buf) + print "Move %s back to default VLAN 1" % _port_name(6) + p.port_set_access_vlan(_port_name(6), 1) + #print "And move %s back to a trunk port" % _port_name(6) + #p.port_set_mode(_port_name(6), "trunk") + #buf = p.port_get_mode(_port_name(6)) + #print "Port %s is in %s mode" % (_port_name(6), buf) + + # Test trunk stuff + print "Set %s to trunk mode" % _port_name(2) + p.port_set_mode(_port_name(2), "trunk") + print "Add %s to VLAN 2" % _port_name(2) + p.port_add_trunk_to_vlan(_port_name(2), 2) + print "Add %s to VLAN 3" % _port_name(2) + p.port_add_trunk_to_vlan(_port_name(2), 3) + print "Add %s to VLAN 4" % _port_name(2) + p.port_add_trunk_to_vlan(_port_name(2), 4) + print "Read from switch: which VLANs is %s on?" % _port_name(2) + buf = p.port_get_trunk_vlan_list(_port_name(2)) + p.dump_list(buf) + + print "Remove %s from VLANs 3,3,4" % _port_name(2) + p.port_remove_trunk_from_vlan(_port_name(2), 3) + p.port_remove_trunk_from_vlan(_port_name(2), 3) + p.port_remove_trunk_from_vlan(_port_name(2), 4) + print "Read from switch: which VLANs is %s on?" % _port_name(2) + buf = p.port_get_trunk_vlan_list(_port_name(2)) + p.dump_list(buf) + + # print "Adding lots of ports to VLANs" + # p.port_add_trunk_to_vlan(_port_name(1), 2) + # p.port_add_trunk_to_vlan(_port_name(3), 2) + # p.port_add_trunk_to_vlan(_port_name(5), 2) + # p.port_add_trunk_to_vlan(_port_name(7), 2) + # p.port_add_trunk_to_vlan(_port_name(9), 2) + # p.port_add_trunk_to_vlan(_port_name(11), 2) + # p.port_add_trunk_to_vlan(_port_name(13), 2) + # p.port_add_trunk_to_vlan(_port_name(15), 2) + # p.port_add_trunk_to_vlan(_port_name(17), 2) + # p.port_add_trunk_to_vlan(_port_name(19), 2) + # p.port_add_trunk_to_vlan(_port_name(21), 2) + # p.port_add_trunk_to_vlan(_port_name(23), 2) + # p.port_add_trunk_to_vlan(_port_name(4, 2) + + print "VLANs are:" + buf = p.vlan_get_list() + p.dump_list(buf) + +# print 'Restarting switch, to explicitly reset config' +# p.switch_restart() + +# p.switch_save_running_config() +# p._show_config() diff --git a/Vland/drivers/Dummy.py b/Vland/drivers/Dummy.py new file mode 100644 index 0000000..f630184 --- /dev/null +++ b/Vland/drivers/Dummy.py @@ -0,0 +1,361 @@ +#! /usr/bin/python + +# Copyright 2015 Linaro Limited +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, +# MA 02110-1301, USA. + +import logging +import sys +import re +import pickle +import pexpect + +# Dummy switch driver, designed specifically for +# testing/validation. Just remembers what it's been told and gives the +# same data back on demand. +# +# To keep track of data in the dummy switch, this code will simply +# dump out and read back its internal state to/from a Python pickle +# file as needed. On first use, if no such file exists then the Dummy +# driver will simply generate a simple switch model: +# +# * N ports in access mode +# * 1 VLAN (tag 1) labelled DEFAULT +# +# The "hostname" given to the switch in VLANd is important, as it will +# determine both the number of ports allocated in this model and the +# name of the pickle file used for data storage. Call the switch +# "dummy-N" in your vland.cfg file to have N ports. If you want to use +# more than one dummy switch instance, ensure you give them different +# numbers, e.g. "dummy-25", "dummy-48", etc. + +if __name__ == '__main__': + import os + vlandpath = os.path.abspath(os.path.normpath(os.path.dirname(sys.argv[0]))) + sys.path.insert(0, vlandpath) + sys.path.insert(0, "%s/.." % vlandpath) + +from errors import InputError, PExpectError +from drivers.common import SwitchDriver, SwitchErrors + +class Dummy(SwitchDriver): + + connection = None + _username = None + _password = None + _enable_password = None + _dummy_vlans = {} + _dummy_ports = {} + _state_file = None + + _capabilities = [ + ] + + def __init__(self, switch_hostname, switch_telnetport=23, debug = False): + SwitchDriver.__init__(self, switch_hostname, debug) + self._systemdata = [] + self.errors = SwitchErrors() + self._state_file = "%s.pk" % switch_hostname + + ################################ + ### Switch-level API functions + ################################ + + # Save the current running config - we want config to remain + # across reboots + def switch_save_running_config(self): + pass + + # Restart the switch - we need to reload config to do a + # roll-back. Do NOT save running-config first if the switch asks - + # we're trying to dump recent changes, not save them. + def switch_restart(self): + pass + + # List the capabilities of the switch (and driver) - some things + # make no sense to abstract. Returns a dict of strings, each one + # describing an extra feature that that higher levels may care + # about + def switch_get_capabilities(self): + return self._capabilities + + ################################ + ### VLAN API functions + ################################ + + # Create a VLAN with the specified tag + def vlan_create(self, tag): + logging.debug("Creating VLAN %d", tag) + if not tag in self._dummy_vlans: + self._dummy_vlans[tag] = "VLAN%s" % tag + else: + # It's not an error if it already exists, but log anyway + logging.debug("VLAN %d already exists, name %s", + tag, self._dummy_vlans[tag]) + + # Destroy a VLAN with the specified tag + def vlan_destroy(self, tag): + logging.debug("Destroying VLAN %d", tag) + if tag in self._dummy_vlans: + del self._dummy_vlans[tag] + else: + # It's not an error if it doesn't exist, but log anyway + logging.debug("VLAN %d did not exist", tag) + + # Set the name of a VLAN + def vlan_set_name(self, tag, name): + logging.debug("Setting name of VLAN %d to %s", tag, name) + if not tag in self._dummy_vlans: + raise InputError("Tag %d does not exist") + self._dummy_vlans[tag] = "VLAN%s" % tag + + # Get a list of the VLAN tags currently registered on the switch + def vlan_get_list(self): + logging.debug("Grabbing list of VLANs") + return sorted(self._dummy_vlans.keys()) + + # For a given VLAN tag, ask the switch what the associated name is + def vlan_get_name(self, tag): + logging.debug("Grabbing the name of VLAN %d", tag) + if not tag in self._dummy_vlans: + raise InputError("Tag %d does not exist") + return self._dummy_vlans[tag] + + ################################ + ### Port API functions + ################################ + + # Set the mode of a port: access or trunk + def port_set_mode(self, port, mode): + logging.debug("Setting port %s to %s mode", port, mode) + if not port in self._dummy_ports: + raise InputError("Port %s does not exist" % port) + self._dummy_ports[port]['mode'] = mode + + # Get the mode of a port: access or trunk + def port_get_mode(self, port): + logging.debug("Getting mode of port %s", port) + if not port in self._dummy_ports: + raise InputError("Port %s does not exist" % port) + return self._dummy_ports[port]['mode'] + + # Set an access port to be in a specified VLAN (tag) + def port_set_access_vlan(self, port, tag): + logging.debug("Setting access port %s to VLAN %d", port, tag) + if not port in self._dummy_ports: + raise InputError("Port %s does not exist" % port) + if not tag in self._dummy_vlans: + raise InputError("VLAN %d does not exist" % tag) + self._dummy_ports[port]['access_vlan'] = tag + + # Add a trunk port to a specified VLAN (tag) + def port_add_trunk_to_vlan(self, port, tag): + logging.debug("Adding trunk port %s to VLAN %d", port, tag) + if not port in self._dummy_ports: + raise InputError("Port %s does not exist" % port) + if not tag in self._dummy_vlans: + raise InputError("VLAN %d does not exist" % tag) + self._dummy_ports[port]['trunk_vlans'].append(tag) + + # Remove a trunk port from a specified VLAN (tag) + def port_remove_trunk_from_vlan(self, port, tag): + logging.debug("Removing trunk port %s from VLAN %d", port, tag) + if not port in self._dummy_ports: + raise InputError("Port %s does not exist" % port) + if not tag in self._dummy_vlans: + raise InputError("VLAN %d does not exist" % tag) + self._dummy_ports[port]['trunk_vlans'].remove(tag) + + # Get the configured VLAN tag for an access port (tag) + def port_get_access_vlan(self, port): + logging.debug("Getting VLAN for access port %s", port) + if not port in self._dummy_ports: + raise InputError("Port %s does not exist" % port) + return self._dummy_ports[port]['access_vlan'] + + # Get the list of configured VLAN tags for a trunk port + def port_get_trunk_vlan_list(self, port): + logging.debug("Getting VLAN(s) for trunk port %s", port) + if not port in self._dummy_ports: + raise InputError("Port %s does not exist" % port) + return sorted(self._dummy_ports[port]['trunk_vlans']) + + ################################ + ### Internal functions + ################################ + + # Connect to the switch and log in + def _switch_connect(self): + # Open data file if it exists, otherwise initialise + try: + pkl_file = open(self._state_file, 'rb') + self._dummy_vlans = pickle.load(pkl_file) + self._dummy_ports = pickle.load(pkl_file) + pkl_file.close() + except: + # Create data here + self._dummy_vlans = {1: 'DEFAULT'} + match = re.match(r'dummy-(\d+)', self.hostname) + if match: + num_ports = int(match.group(1)) + else: + raise InputError("Unable to determine number of ports from switch name") + for i in range(1, num_ports+1): + port_name = "dm%2.2d" % int(i) + self._dummy_ports[port_name] = {} + self._dummy_ports[port_name]['mode'] = 'access' + self._dummy_ports[port_name]['access_vlan'] = 1 + self._dummy_ports[port_name]['trunk_vlans'] = [] + + # Now build a list of our ports, for later sanity checking + self._ports = self._get_port_names() + if len(self._ports) < 4: + raise IOError("Not enough ports detected - problem!") + + def _logout(self): + pkl_file = open(self._state_file, 'wb') + pickle.dump(self._dummy_vlans, pkl_file) + pickle.dump(self._dummy_ports, pkl_file) + pkl_file.close() + + def _get_port_names(self): + logging.debug("Grabbing list of ports") + interfaces = [] + for interface in sorted(self._dummy_ports.keys()): + interfaces.append(interface) + self._port_numbers[interface] = len(interfaces) + return interfaces + +if __name__ == "__main__": + + # Simple test harness - exercise the main working functions above to verify + # they work. This does *NOT* test really disruptive things like "save + # running-config" and "reload" - test those by hand. + + import optparse + + switch = 'dummy-48' + parser = optparse.OptionParser() + parser.add_option("--switch", + dest = "switch", + action = "store", + nargs = 1, + type = "string", + help = "specify switch to connect to for testing", + metavar = "<switch>") + (opts, args) = parser.parse_args() + if opts.switch: + switch = opts.switch + + logging.basicConfig(level = logging.DEBUG, + format = '%(asctime)s %(levelname)-8s %(message)s') + p = Dummy(switch, 23, debug=True) + p.switch_connect('admin', '', None) + + print "VLANs are:" + buf = p.vlan_get_list() + p.dump_list(buf) + + buf = p.vlan_get_name(1) + print "VLAN 1 is named \"%s\"" % buf + + print "Create VLAN 3" + p.vlan_create(3) + + print "Create VLAN 4" + p.vlan_create(4) + + buf = p.vlan_get_name(3) + print "VLAN 3 is named \"%s\"" % buf + + print "Set name of VLAN 3 to test333" + p.vlan_set_name(3, "test333") + + buf = p.vlan_get_name(3) + print "VLAN 3 is named \"%s\"" % buf + + print "VLANs are:" + buf = p.vlan_get_list() + p.dump_list(buf) + + print "Destroy VLAN 3" + p.vlan_destroy(3) + + print "VLANs are:" + buf = p.vlan_get_list() + p.dump_list(buf) + + buf = p.port_get_mode("dm10") + print "Port dm10 is in %s mode" % buf + + buf = p.port_get_mode("dm11") + print "Port dm11 is in %s mode" % buf + + # Test access stuff + print "Set dm09 to access mode" + p.port_set_mode("dm09", "access") + + print "Move dm9 to VLAN 4" + p.port_set_access_vlan("dm09", 4) + + buf = p.port_get_access_vlan("dm09") + print "Read from switch: dm09 is on VLAN %s" % buf + + print "Move dm09 back to VLAN 1" + p.port_set_access_vlan("dm09", 1) + + print "Create VLAN 2" + p.vlan_create(2) + + print "Create VLAN 3" + p.vlan_create(3) + + print "Create VLAN 4" + p.vlan_create(4) + + # Test access stuff + print "Set dm09 to trunk mode" + p.port_set_mode("dm09", "trunk") + print "Read from switch: which VLANs is dm09 on?" + buf = p.port_get_trunk_vlan_list("dm09") + p.dump_list(buf) + + # The adds below are NOOPs in effect on this switch - no filtering + # for "trunk" ports + print "Add dm09 to VLAN 2" + p.port_add_trunk_to_vlan("dm09", 2) + print "Add dm09 to VLAN 3" + p.port_add_trunk_to_vlan("dm09", 3) + print "Add dm09 to VLAN 4" + p.port_add_trunk_to_vlan("dm09", 4) + print "Read from switch: which VLANs is dm09 on?" + buf = p.port_get_trunk_vlan_list("dm09") + p.dump_list(buf) + + # And the same for removals here + p.port_remove_trunk_from_vlan("dm09", 3) + p.port_remove_trunk_from_vlan("dm09", 2) + p.port_remove_trunk_from_vlan("dm09", 4) + print "Read from switch: which VLANs is dm09 on?" + buf = p.port_get_trunk_vlan_list("dm09") + p.dump_list(buf) + + print 'Restarting switch, to explicitly reset config' + p.switch_restart() + + p.switch_save_running_config() + + p.switch_disconnect() diff --git a/Vland/drivers/Mellanox.py b/Vland/drivers/Mellanox.py new file mode 100644 index 0000000..ea74bf0 --- /dev/null +++ b/Vland/drivers/Mellanox.py @@ -0,0 +1,795 @@ +#! /usr/bin/python + +# Copyright 2018 Linaro Limited +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, +# MA 02110-1301, USA. + +import logging +import sys +import re +import pexpect + +# Mellanox MLNX-OS driver +# Developed and tested against the SN2100 in the Linaro LAVA lab + +if __name__ == '__main__': + import os + vlandpath = os.path.abspath(os.path.normpath(os.path.dirname(sys.argv[0]))) + sys.path.insert(0, vlandpath) + sys.path.insert(0, "%s/.." % vlandpath) + +from errors import InputError, PExpectError +from drivers.common import SwitchDriver, SwitchErrors + +class Mellanox(SwitchDriver): + + connection = None + _username = None + _password = None + _enable_password = None + + _capabilities = [ + 'TrunkWildCardVlans' # Trunk ports are on all VLANs by + # default, so we shouldn't need to + # bugger with them + ] + + # Regexp of expected hardware information - fail if we don't see + # this + _expected_descr_re = re.compile(r'MLNX-OS') + + def __init__(self, switch_hostname, switch_telnetport=23, debug = False): + SwitchDriver.__init__(self, switch_hostname, debug) + self._systemdata = [] + self.exec_string = "/usr/bin/telnet %s %d" % (switch_hostname, switch_telnetport) + self.errors = SwitchErrors() + + ################################ + ### Switch-level API functions + ################################ + + # Save the current running config into flash - we want config to + # remain across reboots + def switch_save_running_config(self): + try: + self._configure() + self._cli("configuration write") + self._end_configure() + except (PExpectError, pexpect.EOF): + # recurse on error + self._switch_connect() + self.switch_save_running_config() + + # Restart the switch - we need to reload config to do a + # roll-back. Do NOT save running-config first if the switch asks - + # we're trying to dump recent changes, not save them. + # + # This will also implicitly cause a connection to be closed + def switch_restart(self): + self._cli("reload noconfirm") + + # List the capabilities of the switch (and driver) - some things + # make no sense to abstract. Returns a dict of strings, each one + # describing an extra feature that that higher levels may care + # about + def switch_get_capabilities(self): + return self._capabilities + + ################################ + ### VLAN API functions + ################################ + + # Create a VLAN with the specified tag + def vlan_create(self, tag): + logging.debug("Creating VLAN %d", tag) + try: + self._configure() + self._cli("vlan %d" % tag) + self._end_vlan() + self._end_configure() + + # Validate it happened + vlans = self.vlan_get_list() + for vlan in vlans: + if vlan == tag: + return + raise IOError("Failed to create VLAN %d" % tag) + + except PExpectError: + # recurse on error + self._switch_connect() + self.vlan_create(tag) + + # Destroy a VLAN with the specified tag + def vlan_destroy(self, tag): + logging.debug("Destroying VLAN %d", tag) + + try: + self._configure() + self._cli("no vlan %d" % tag) + self._end_configure() + + # Validate it happened + vlans = self.vlan_get_list() + for vlan in vlans: + if vlan == tag: + raise IOError("Failed to destroy VLAN %d" % tag) + + except PExpectError: + # recurse on error + self._switch_connect() + self.vlan_destroy(tag) + + # Set the name of a VLAN + def vlan_set_name(self, tag, name): + logging.debug("Setting name of VLAN %d to %s", tag, name) + + try: + self._configure() + self._cli("vlan %d name %s" % (tag, name)) + self._end_configure() + + # This switch *might* have problems if we drive it too quickly? At + # least one instance of set_name()/get_name() not working. This + # might help? + self._delay() + + # And retry around here + retries = 5 + read_name = None + while (retries > 0 and read_name is None): + # Validate it happened + read_name = self.vlan_get_name(tag) + retries -= 1 + + if read_name != name: + raise IOError("Failed to set name for VLAN %d (name found is \"%s\", not \"%s\")" + % (tag, read_name, name)) + except PExpectError: + # recurse on error + self._switch_connect() + self.vlan_set_name(tag, name) + + # Get a list of the VLAN tags currently registered on the switch + def vlan_get_list(self): + logging.debug("Grabbing list of VLANs") + + try: + vlans = [] + + regex = re.compile(r'^ *(\d+)') + + self._cli("show vlan") + for line in self._read_long_output("show vlan"): + match = regex.match(line) + if match: + vlans.append(int(match.group(1))) + return vlans + + except PExpectError: + # recurse on error + self._switch_connect() + return self.vlan_get_list() + + # For a given VLAN tag, ask the switch what the associated name is + def vlan_get_name(self, tag): + + try: + logging.debug("Grabbing the name of VLAN %d", tag) + name = None + + # Ugh, the output here is messy. VLAN names can include spaces, and + # there are no delimiters in the output, e.g.: + # VLAN Name Ports + # ---- ----------- -------------------------------------- + # 1 default Eth1/1/1, Eth1/1/2, Eth1/2, Eth1/3/1, Eth1/3/2, + # Eth1/4, Eth1/5, Eth1/6, Eth1/7, Eth1/8, + # Eth1/10, Eth1/12, Eth1/13, Eth1/14, Eth1/15, + # Eth1/16 + # 102 mdev testing + # 103 vpp 1 performance testing Eth1/1/3, Eth1/9 + # 104 vpp 2 performance testing Eth1/1/4, Eth1/11 + # + # Simplest strategy: + # 1. Match on a leading number and grab all the text after it + # 2. Drop anything starting with "Eth" to EOL + # 3. Strip leading and trailing whitespace + # + # Not perfect, but it'll have to do. Anybody including "Eth" in a + # VLAN name deserves to lose... + + regex = re.compile(r'^ *\d+\s+(.+)') + self._cli("show vlan id %d" % tag) + for line in self._read_long_output("show vlan id"): + match = regex.match(line) + if match: + name = re.sub(r'Eth.*$',"",match.group(1)).strip() + if name is None: + logging.debug("vlan_get_name: did not find a name") + return name + + except PExpectError: + # recurse on error + self._switch_connect() + return self.vlan_get_name(tag) + + + ################################ + ### Port API functions + ################################ + + # Set the mode of a port: access or trunk + def port_set_mode(self, port, mode): + logging.debug("Setting port %s to %s", port, mode) + if not self._is_port_mode_valid(mode): + raise InputError("Port mode %s is not allowed" % mode) + if not self._is_port_name_valid(port): + raise InputError("Port name %s not recognised" % port) + + try: + self._configure() + self._cli("interface ethernet %s" % port) + self._cli("switchport mode %s" % mode) + if mode == "trunk": + # Put the new trunk port on all VLANs + self._cli("switchport trunk allowed-vlan all") + self._end_interface() + self._end_configure() + + # Validate it happened + read_mode = self._port_get_mode(port) + if read_mode != mode: + raise IOError("Failed to set mode for port %s" % port) + + # And cache the result + self._port_modes[port] = mode + + except PExpectError: + # recurse on error + self._switch_connect() + self.port_set_mode(port, mode) + + # Set an access port to be in a specified VLAN (tag) + def port_set_access_vlan(self, port, tag): + logging.debug("Setting access port %s to VLAN %d", port, tag) + if not self._is_port_name_valid(port): + raise InputError("Port name %s not recognised" % port) + if not (self.port_get_mode(port) == "access"): + raise InputError("Port %s not in access mode" % port) + + try: + self._configure() + self._cli("interface ethernet %s" % port) + self._cli("switchport access vlan %d" % tag) + self._end_interface() + self._end_configure() + + # Validate things worked + read_vlan = int(self.port_get_access_vlan(port)) + if read_vlan != tag: + raise IOError("Failed to move access port %s to VLAN %d - got VLAN %d instead" + % (port, tag, read_vlan)) + + except PExpectError: + # recurse on error + self._switch_connect() + self.port_set_access_vlan(port, tag) + + # Add a trunk port to a specified VLAN (tag) + def port_add_trunk_to_vlan(self, port, tag): + logging.debug("Adding trunk port %s to VLAN %d", port, tag) + if not self._is_port_name_valid(port): + raise InputError("Port name %s not recognised" % port) + if not (self.port_get_mode(port) == "trunk"): + raise InputError("Port %s not in trunk mode" % port) + + try: + self._configure() + self._cli("interface ethernet %s" % port) + self._cli("switchport trunk allowed-vlan add %d" % tag) + self._end_interface() + self._end_configure() + + # Validate it happened + read_vlans = self.port_get_trunk_vlan_list(port) + for vlan in read_vlans: + if vlan == tag or vlan == "ALL": + return + raise IOError("Failed to add trunk port %s to VLAN %d" % (port, tag)) + + except PExpectError: + # recurse on error + self._switch_connect() + self.port_add_trunk_to_vlan(port, tag) + + # Remove a trunk port from a specified VLAN (tag) + def port_remove_trunk_from_vlan(self, port, tag): + logging.debug("Removing trunk port %s from VLAN %d", port, tag) + if not self._is_port_name_valid(port): + raise InputError("Port name %s not recognised" % port) + if not (self.port_get_mode(port) == "trunk"): + raise InputError("Port %s not in trunk mode" % port) + + try: + self._configure() + self._cli("interface ethernet %s" % port) + self._cli("switchport trunk allowed-vlan remove %d" % tag) + self._end_interface() + self._end_configure() + + # Validate it happened + read_vlans = self.port_get_trunk_vlan_list(port) + for vlan in read_vlans: + if vlan == tag: + raise IOError("Failed to remove trunk port %s from VLAN %d" % (port, tag)) + + except PExpectError: + # recurse on error + self._switch_connect() + self.port_remove_trunk_from_vlan(port, tag) + + # Get the configured VLAN tag for an access port (tag) + def port_get_access_vlan(self, port): + logging.debug("Getting VLAN for access port %s", port) + if not self._is_port_name_valid(port): + raise InputError("Port name %s not recognised" % port) + if not (self.port_get_mode(port) == "access"): + raise InputError("Port %s not in access mode" % port) + + regex = re.compile(r'^Eth%s\s+access\s+(\d+)' % port) + + try: + self._cli("show interfaces switchport") + for line in self._read_long_output("show interfaces switchport"): + match = regex.match(line) + if match: + vlan = match.group(1) + return int(vlan) + + except PExpectError: + # recurse on error + self._switch_connect() + return self.port_get_access_vlan(port) + + # Get the list of configured VLAN tags for a trunk port + def port_get_trunk_vlan_list(self, port): + logging.debug("Getting VLANs for trunk port %s", port) + vlans = [] + if not self._is_port_name_valid(port): + raise InputError("Port name %s not recognised" % port) + if not (self.port_get_mode(port) == "trunk"): + raise InputError("Port %s not in trunk mode" % port) + regex_start = re.compile(r'^Eth%s\s+trunk\s+N/A\s+(.*)' % port) + regex_continue = re.compile(r'^(\d.*)') + + try: + self._cli("show interfaces switchport") + + # Complex parsing work - VLAN list may extend over several lines, e.g.: + # + # Eth1/16 trunk N/A 1, 102, 103, 104, 1000, 1001, 1002 + # 1003, 1004 + # + in_match = False + vlan_text = '' + + for line in self._read_long_output("show interfaces switchport"): + if in_match: + match = regex_continue.match(line) + if match: + vlan_text += ', ' # Make a consistently-formed list + vlan_text += match.group(1) + else: + in_match = False + if not in_match: + match = regex_start.match(line) + if match: + vlan_text += match.group(1) + in_match = True + + vlans = self._parse_vlan_list(vlan_text) + return vlans + + except PExpectError: + # recurse on error + self._switch_connect() + return self.port_get_trunk_vlan_list(port) + + ################################ + ### Internal functions + ################################ + + # Connect to the switch and log in + def _switch_connect(self): + + if not self.connection is None: + self.connection.close(True) + self.connection = None + + logging.debug("Connecting to Switch with: %s", self.exec_string) + self.connection = pexpect.spawn(self.exec_string, logfile=self.logger) + self._login() + + # Avoid paged output as much as possible + self._cli("terminal length 999") + # Don't do silly things with ANSI codes + self._cli("terminal type dumb") + # and disable auto-logout after delay + self._cli("no cli session auto-logout") + + # And grab details about the switch. in case we need it + self._get_systemdata() + + # And also validate them - make sure we're driving a switch of + # the correct model! Also store the serial number + descr_regex = re.compile(r'Product name:\s*(\S+)') + sn_regex = re.compile(r'System serial num:\s*(\S+)') + descr = "" + + for line in self._systemdata: + match = descr_regex.match(line) + if match: + descr = match.group(1) + match = sn_regex.match(line) + if match: + self.serial_number = match.group(1) + + logging.debug("serial number is %s", self.serial_number) + logging.debug("system description is %s", descr) + + if not self._expected_descr_re.match(descr): + raise IOError("Switch %s not recognised by this driver: abort" % descr) + + # Now build a list of our ports, for later sanity checking + self._ports = self._get_port_names() + if len(self._ports) < 4: + raise IOError("Not enough ports detected - problem!") + + def _login(self): + logging.debug("attempting login with username %s, password %s", self._username, self._password) + if self._username is not None: + self.connection.expect("login:") + self._cli("%s" % self._username) + if self._password is not None: + self.connection.expect("Password:") + self._cli("%s" % self._password, False) + while True: + index = self.connection.expect(['User:', 'Password:', 'Login incorrect', 'authentication failed', r'(.*?)(#|>)']) + if index != 4: # Any other means: failed to log in! + logging.error("Login failure: index %d\n", index) + logging.error("Login failure: %s\n", self.connection.match.before) + raise IOError + + # else + + # Add a couple of newlines to get past the "last login" etc. junk + self._cli("") + self._cli("") + self.connection.expect(r'^(.*?) (#|>)') + self._prompt_name = re.escape(self.connection.match.group(1).strip()) + logging.info("Got outer prompt \"%s\"", self._prompt_name) + if self.connection.match.group(2) == ">": + # Need to enter "enable" mode too + self._cli("enable") + if self._enable_password is not None: + self.connection.expect("Password:") + self._cli("%s" % self._enable_password, False) + index = self.connection.expect(['Password:', 'Login incorrect', 'authentication failed', r'(.*) *(#|>)']) + if index != 3: # Any other means: failed to log in! + logging.error("Enable password failure: %s\n", self.connection.match) + raise IOError + self._cli("") + self._cli("") + self.connection.expect(r'^(.*?) (#|>)') + self._prompt_name = re.escape(self.connection.match.group(1).strip()) + logging.info("Got enable prompt \"%s\"", self._prompt_name) + return 0 + + def _logout(self): + logging.debug("Logging out") + self._cli("quit", False) + try: + self.connection.expect("Would you like to save them now") + self._cli("n") + except (pexpect.EOF): + pass + self.connection.close(True) + + def _configure(self): + self._cli("configure terminal") + + def _end_configure(self): + self._cli("exit") + + def _end_interface(self): + self._cli("exit") + + def _end_vlan(self): + self._cli("exit") + + def _read_long_output(self, text): + longbuf = [] + prompt = self._prompt_name + r'\s*#' + while True: + try: + index = self.connection.expect([r'lines \d+-\d+', prompt]) + if index == 0: # "lines 45-50" + for line in self.connection.before.split('\r\n'): + longbuf.append(line.strip()) + self._cli(' ', False) + elif index == 1: # Back to a prompt, says output is finished + break + except (pexpect.EOF, pexpect.TIMEOUT): + # Something went wrong; logout, log in and try again! + logging.error("PEXPECT FAILURE, RECONNECT") + self.errors.log_error_in(text) + raise PExpectError("_read_long_output failed") + except: + logging.error("prompt is \"%s\"", prompt) + raise + + for line in self.connection.before.split('\r\n'): + longbuf.append(line.strip()) + return longbuf + + def _get_port_names(self): + logging.debug("Grabbing list of ports") + interfaces = [] + + # Look for "Eth1" at the beginning of the output lines to just + # match lines with interfaces - they have names like + # "Eth1/15". We do not care about Link Aggregation Groups (lag) + # here. + regex = re.compile(r'^Eth(\S+)') + + try: + self._cli("show interfaces switchport") + for line in self._read_long_output("show interfaces switchport"): + match = regex.match(line) + if match: + interface = match.group(1) + interfaces.append(interface) + self._port_numbers[interface] = len(interfaces) + logging.debug(" found %d ports on the switch", len(interfaces)) + return interfaces + + except PExpectError: + # recurse on error + self._switch_connect() + return self._get_port_names() + + # Get the mode of a port: access or trunk + def _port_get_mode(self, port): + logging.debug("Getting mode of port %s", port) + mode = '' + if not self._is_port_name_valid(port): + raise InputError("Port name %s not recognised" % port) + regex = re.compile('Switchport mode: (.*)') + + try: + self._cli("show interfaces ethernet %s" % port) + for line in self._read_long_output("show interfaces ethernet"): + match = regex.match(line) + if match: + mode = match.group(1) + if mode == 'access': + return 'access' + if mode == 'trunk': + return 'trunk' + return mode + + except PExpectError: + # recurse on error + self._switch_connect() + return self.port_get_mode(port) + + def _show_config(self): + logging.debug("Grabbing config") + try: + self._cli("show running-config") + return self._read_long_output("show running-config") + except PExpectError: + # recurse on error + self._switch_connect() + return self._show_config() + + def _show_clock(self): + logging.debug("Grabbing time") + try: + self._cli("show clock") + return self._read_long_output("show clock") + except PExpectError: + # recurse on error + self._switch_connect() + return self._show_clock() + + def _get_systemdata(self): + logging.debug("Grabbing system sw and hw versions") + + try: + self._systemdata = [] + self._cli("show version") + for line in self._read_long_output("show version"): + self._systemdata.append(line) + + except PExpectError: + # recurse on error + self._switch_connect() + return self._get_systemdata() + + # Borrowed from the Catalyst driver. Over-complex for our needs here, but + # it's already tested and will do the job. + def _parse_vlan_list(self, inputdata): + vlans = [] + + if inputdata == "ALL": + return ["ALL"] + elif inputdata == "NONE": + return [] + elif inputdata == "": + return [] + else: + # Parse the complex list + groups = inputdata.split(',') + for group in groups: + subgroups = group.split('-') + if len(subgroups) == 1: + vlans.append(int(subgroups[0])) + elif len(subgroups) == 2: + for i in range (int(subgroups[0]), int(subgroups[1]) + 1): + vlans.append(i) + else: + logging.debug("Can't parse group \"" + group + "\"") + + return vlans + + # Wrapper around connection.send - by default, expect() the same + # text we've sent, to remove it from the output from the + # switch. For the few cases where we don't need that, override + # this using echo=False. + # Horrible, but seems to work. + def _cli(self, text, echo=True): + self.connection.send(text + '\r') + if echo: + try: + self.connection.expect(text) + except (pexpect.EOF, pexpect.TIMEOUT): + # Something went wrong; logout, log in and try again! + logging.error("PEXPECT FAILURE, RECONNECT") + self.errors.log_error_out(text) + raise PExpectError("_cli failed on %s" % text) + except: + logging.error("Unexpected error: %s", sys.exc_info()[0]) + raise + +if __name__ == "__main__": + + # Simple test harness - exercise the main working functions above to verify + # they work. This does *NOT* test really disruptive things like "save + # running-config" and "reload" - test those by hand. + + import optparse + + switch = '172.27.16.6' + parser = optparse.OptionParser() + parser.add_option("--switch", + dest = "switch", + action = "store", + nargs = 1, + type = "string", + help = "specify switch to connect to for testing", + metavar = "<switch>") + (opts, args) = parser.parse_args() + if opts.switch: + switch = opts.switch + + logging.basicConfig(level = logging.DEBUG, + format = '%(asctime)s %(levelname)-8s %(message)s') + p = MlnxOS(switch, 23, debug=False) + p.switch_connect('admin', 'admin', None) + + print "VLANs are:" + buf = p.vlan_get_list() + p.dump_list(buf) + + buf = p.vlan_get_name(102) + print "VLAN 102 is named \"%s\"" % buf + + print "Create VLAN 1003" + p.vlan_create(1003) + + buf = p.vlan_get_name(1003) + print "VLAN 1003 is named \"%s\"" % buf + + print "Set name of VLAN 1003 to test333" + p.vlan_set_name(1003, "test333") + + buf = p.vlan_get_name(1003) + print "VLAN 1003 is named \"%s\"" % buf + + print "VLANs are:" + buf = p.vlan_get_list() + p.dump_list(buf) + + print "Destroy VLAN 1003" + p.vlan_destroy(1003) + + print "VLANs are:" + buf = p.vlan_get_list() + p.dump_list(buf) + + buf = p.port_get_mode("1/15") + print "Port 1/15 is in %s mode" % buf + + buf = p.port_get_mode("1/16") + print "Port 1/16 is in %s mode" % buf + + # Test access stuff + print "Set 1/15 to access mode" + p.port_set_mode("1/15", "access") + + print "Move 1/15 to VLAN 4" + p.port_set_access_vlan("1/15", 4) + + buf = p.port_get_access_vlan("1/15") + print "Read from switch: 1/15 is on VLAN %s" % buf + + print "Move 1/15 back to VLAN 1" + p.port_set_access_vlan("1/15", 1) + + print "Create VLAN 1002" + p.vlan_create(1002) + + print "Create VLAN 1003" + p.vlan_create(1003) + + print "Create VLAN 1004" + p.vlan_create(1004) + + # Test access stuff + print "Set 1/15 to trunk mode" + p.port_set_mode("1/15", "trunk") + print "Read from switch: which VLANs is 1/15 on?" + buf = p.port_get_trunk_vlan_list("1/15") + p.dump_list(buf) + + # The adds below are NOOPs in effect on this switch - no filtering + # for "trunk" ports + print "Add 1/15 to VLAN 1002" + p.port_add_trunk_to_vlan("1/15", 1002) + print "Add 1/15 to VLAN 1003" + p.port_add_trunk_to_vlan("1/15", 1003) + print "Add 1/15 to VLAN 1004" + p.port_add_trunk_to_vlan("1/15", 1004) + print "Read from switch: which VLANs is 1/15 on?" + buf = p.port_get_trunk_vlan_list("1/15") + p.dump_list(buf) + + # And the same for removals here + p.port_remove_trunk_from_vlan("1/15", 1003) + p.port_remove_trunk_from_vlan("1/15", 1002) + p.port_remove_trunk_from_vlan("1/15", 1004) + print "Read from switch: which VLANs is 1/15 on?" + buf = p.port_get_trunk_vlan_list("1/15") + p.dump_list(buf) + +# print 'Restarting switch, to explicitly reset config' +# p.switch_restart() + +# p.switch_save_running_config() + +# p.switch_disconnect() +# p._show_config() diff --git a/Vland/drivers/NetgearXSM.py b/Vland/drivers/NetgearXSM.py new file mode 100644 index 0000000..f8ba8a5 --- /dev/null +++ b/Vland/drivers/NetgearXSM.py @@ -0,0 +1,782 @@ +#! /usr/bin/python + +# Copyright 2015-2018 Linaro Limited +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, +# MA 02110-1301, USA. + +import logging +import sys +import re +import pexpect + +# Netgear XSM family driver +# Developed and tested against the XSM7224S in the Linaro LAVA lab + +if __name__ == '__main__': + import os + vlandpath = os.path.abspath(os.path.normpath(os.path.dirname(sys.argv[0]))) + sys.path.insert(0, vlandpath) + sys.path.insert(0, "%s/.." % vlandpath) + +from errors import InputError, PExpectError +from drivers.common import SwitchDriver, SwitchErrors + +class NetgearXSM(SwitchDriver): + + connection = None + _username = None + _password = None + _enable_password = None + + _capabilities = [ + ] + + # Regexps of expected hardware information - fail if we don't see + # this + _expected_manuf = re.compile('^Netgear') + _expected_model = re.compile('^XSM') + + def __init__(self, switch_hostname, switch_telnetport=23, debug = False): + SwitchDriver.__init__(self, switch_hostname, debug) + self._systemdata = [] + self.exec_string = "/usr/bin/telnet %s %d" % (switch_hostname, switch_telnetport) + self.errors = SwitchErrors() + + ################################ + ### Switch-level API functions + ################################ + + # Save the current running config into flash - we want config to + # remain across reboots + def switch_save_running_config(self): + try: + self._cli("save") + self.connection.expect("Are you sure") + self._cli("y") + self.connection.expect("Configuration Saved!") + except (PExpectError, pexpect.EOF): + # recurse on error + self._switch_connect() + self.switch_save_running_config() + + # Restart the switch - we need to reload config to do a + # roll-back. Do NOT save running-config first if the switch asks - + # we're trying to dump recent changes, not save them. + # + # This will also implicitly cause a connection to be closed + def switch_restart(self): + self._cli("reload") + self.connection.expect('Are you sure') + self._cli("y") # Yes, continue to reset + self.connection.close(True) + + # List the capabilities of the switch (and driver) - some things + # make no sense to abstract. Returns a dict of strings, each one + # describing an extra feature that that higher levels may care + # about + def switch_get_capabilities(self): + return self._capabilities + + ################################ + ### VLAN API functions + ################################ + + # Create a VLAN with the specified tag + def vlan_create(self, tag): + logging.debug("Creating VLAN %d", tag) + try: + self._cli("vlan database") + self._cli("vlan %d" % tag) + self._end_configure() + + # Validate it happened + vlans = self.vlan_get_list() + for vlan in vlans: + if vlan == tag: + return + raise IOError("Failed to create VLAN %d" % tag) + + except PExpectError: + # recurse on error + self._switch_connect() + self.vlan_create(tag) + + # Destroy a VLAN with the specified tag + def vlan_destroy(self, tag): + logging.debug("Destroying VLAN %d", tag) + + try: + self._cli("vlan database") + self._cli("no vlan %d" % tag) + self._end_configure() + + # Validate it happened + vlans = self.vlan_get_list() + for vlan in vlans: + if vlan == tag: + raise IOError("Failed to destroy VLAN %d" % tag) + + except PExpectError: + # recurse on error + self._switch_connect() + self.vlan_destroy(tag) + + # Set the name of a VLAN + def vlan_set_name(self, tag, name): + logging.debug("Setting name of VLAN %d to %s", tag, name) + + try: + self._cli("vlan database") + self._cli("vlan name %d %s" % (tag, name)) + self._end_configure() + + # Validate it happened + read_name = self.vlan_get_name(tag) + if read_name != name: + raise IOError("Failed to set name for VLAN %d (name found is \"%s\", not \"%s\")" + % (tag, read_name, name)) + except PExpectError: + # recurse on error + self._switch_connect() + self.vlan_set_name(tag, name) + + # Get a list of the VLAN tags currently registered on the switch + def vlan_get_list(self): + logging.debug("Grabbing list of VLANs") + + try: + vlans = [] + + regex = re.compile(r'^ *(\d+).*(Static)') + + self._cli("show vlan brief") + for line in self._read_long_output("show vlan brief"): + match = regex.match(line) + if match: + vlans.append(int(match.group(1))) + return vlans + + except PExpectError: + # recurse on error + self._switch_connect() + return self.vlan_get_list() + + # For a given VLAN tag, ask the switch what the associated name is + def vlan_get_name(self, tag): + + try: + logging.debug("Grabbing the name of VLAN %d", tag) + name = None + regex = re.compile('VLAN Name: (.*)') + self._cli("show vlan %d" % tag) + for line in self._read_long_output("show vlan"): + match = regex.match(line) + if match: + name = match.group(1) + name.strip() + return name + + except PExpectError: + # recurse on error + self._switch_connect() + return self.vlan_get_name(tag) + + ################################ + ### Port API functions + ################################ + + # Set the mode of a port: access or trunk + def port_set_mode(self, port, mode): + logging.debug("Setting port %s to %s mode", port, mode) + if not self._is_port_mode_valid(mode): + raise InputError("Port mode %s is not allowed" % mode) + if not self._is_port_name_valid(port): + raise InputError("Port name %s not recognised" % port) + + # This switch does not support specific modes, so we can't + # actually change the mode directly. However, we can and + # should deal with the PVID and memberships of existing VLANs + # etc. + + try: + if mode == "trunk": + # We define a trunk port thus: + # * accept all frames on ingress + # * accept packets for all VLANs (no ingress filter) + # * tags frames on transmission (do that later when + # * adding VLANs to the port) + # * PVID should match the default VLAN (1). + self._configure() + self._cli("interface %s" % port) + self._cli("vlan acceptframe all") + self._cli("no vlan ingressfilter") + self._cli("vlan pvid 1") + self._end_interface() + self._end_configure() + + # We define an access port thus: + # * accept only untagged frames on ingress + # * accept packets for only desired VLANs (ingress filter) + # * exists on one VLAN only (1 by default) + # * do not tag frames on transmission (the devices + # we're talking to are expecting untagged frames) + # * PVID should match the VLAN it's on (1 by default, + # but don't do that here) + if mode == "access": + self._configure() + self._cli("interface %s" % port) + self._cli("vlan acceptframe admituntaggedonly") + self._cli("vlan ingressfilter") + self._cli("no vlan tagging 1-1023") + self._cli("no vlan tagging 1024-2047") + self._cli("no vlan tagging 2048-3071") + self._cli("no vlan tagging 3072-4093") + self._end_interface() + self._end_configure() + + # Validate it happened + read_mode = self._port_get_mode(port) + + if read_mode != mode: + raise IOError("Failed to set mode for port %s" % port) + + # And cache the result + self._port_modes[port] = mode + + except PExpectError: + # recurse on error + self._switch_connect() + self.port_set_mode(port, mode) + + # Set an access port to be in a specified VLAN (tag) + def port_set_access_vlan(self, port, tag): + logging.debug("Setting access port %s to VLAN %d", port, tag) + if not self._is_port_name_valid(port): + raise InputError("Port name %s not recognised" % port) + if not (self.port_get_mode(port) == "access"): + raise InputError("Port %s not in access mode" % port) + + try: + current_vlans = self._get_port_vlans(port) + self._configure() + self._cli("interface %s" % port) + self._cli("vlan pvid %s" % tag) + # Find the list of VLANs we're currently on, and drop them + # all. "auto" mode is fine here, we won't be included + # unless we have GVRP configured, and we don't do + # that. + for current_vlan in current_vlans: + self._cli("vlan participation auto %s" % current_vlan) + # Now specifically include the VLAN we want + self._cli("vlan participation include %s" % tag) + self._cli("no shutdown") + self._end_interface() + self._end_configure() + + # Finally, validate things worked + read_vlan = int(self.port_get_access_vlan(port)) + if read_vlan != tag: + raise IOError("Failed to move access port %d to VLAN %d - got VLAN %d instead" + % (port, tag, read_vlan)) + + except PExpectError: + # recurse on error + self._switch_connect() + self.port_set_access_vlan(port, tag) + + # Add a trunk port to a specified VLAN (tag) + def port_add_trunk_to_vlan(self, port, tag): + logging.debug("Adding trunk port %s to VLAN %d", port, tag) + if not self._is_port_name_valid(port): + raise InputError("Port name %s not recognised" % port) + if not (self.port_get_mode(port) == "trunk"): + raise InputError("Port %s not in trunk mode" % port) + + try: + self._configure() + self._cli("interface %s" % port) + self._cli("vlan participation include %d" % tag) + self._cli("vlan tagging %d" % tag) + self._end_interface() + self._end_configure() + + # Validate it happened + read_vlans = self.port_get_trunk_vlan_list(port) + for vlan in read_vlans: + if vlan == tag or vlan == "ALL": + return + raise IOError("Failed to add trunk port %s to VLAN %d" % (port, tag)) + + except PExpectError: + # recurse on error + self._switch_connect() + self.port_add_trunk_to_vlan(port, tag) + + # Remove a trunk port from a specified VLAN (tag) + def port_remove_trunk_from_vlan(self, port, tag): + logging.debug("Removing trunk port %s from VLAN %d", port, tag) + if not self._is_port_name_valid(port): + raise InputError("Port name %s not recognised" % port) + if not (self.port_get_mode(port) == "trunk"): + raise InputError("Port %s not in trunk mode" % port) + + try: + self._configure() + self._cli("interface %s" % port) + self._cli("vlan participation auto %d" % tag) + self._cli("no vlan tagging %d" % tag) + self._end_interface() + self._end_configure() + + # Validate it happened + read_vlans = self.port_get_trunk_vlan_list(port) + for vlan in read_vlans: + if vlan == tag: + raise IOError("Failed to remove trunk port %s from VLAN %d" % (port, tag)) + + except PExpectError: + # recurse on error + self._switch_connect() + self.port_remove_trunk_from_vlan(port, tag) + + # Get the configured VLAN tag for an access port (tag) + def port_get_access_vlan(self, port): + logging.debug("Getting VLAN for access port %s", port) + if not self._is_port_name_valid(port): + raise InputError("Port name %s not recognised" % port) + if not (self.port_get_mode(port) == "access"): + raise InputError("Port %s not in access mode" % port) + vlans = self._get_port_vlans(port) + if (len(vlans) > 1): + raise IOError("More than one VLAN on access port %s" % port) + return vlans[0] + + # Get the list of configured VLAN tags for a trunk port + def port_get_trunk_vlan_list(self, port): + logging.debug("Getting VLAN(s) for trunk port %s", port) + if not self._is_port_name_valid(port): + raise InputError("Port name %s not recognised" % port) + if not (self.port_get_mode(port) == "trunk"): + raise InputError("Port %s not in trunk mode" % port) + return self._get_port_vlans(port) + + ################################ + ### Internal functions + ################################ + + # Connect to the switch and log in + def _switch_connect(self): + + if not self.connection is None: + self.connection.close(True) + self.connection = None + + logging.debug("Connecting to Switch with: %s", self.exec_string) + self.connection = pexpect.spawn(self.exec_string, logfile=self.logger) + self._login() + + # Avoid paged output + self._cli("terminal length 0") + + # And grab details about the switch. in case we need it + self._get_systemdata() + + # And also validate them - make sure we're driving a switch of + # the correct model! Also store the serial number + manuf_regex = re.compile(r'^Manufacturer([\.\s])+(\S+)') + model_regex = re.compile(r'^Machine Model([\.\s])+(\S+)') + sn_regex = re.compile(r'^Serial Number([\.\s])+(\S+)') + + for line in self._systemdata: + match1 = manuf_regex.match(line) + if match1: + manuf = match1.group(2) + + match2 = model_regex.match(line) + if match2: + model = match2.group(2) + + match3 = sn_regex.match(line) + if match3: + self.serial_number = match3.group(2) + + logging.debug("manufacturer is %s", manuf) + logging.debug("model is %s", model) + logging.debug("serial number is %s", self.serial_number) + + if not (self._expected_manuf.match(manuf) and self._expected_model.match(model)): + raise IOError("Switch %s %s not recognised by this driver: abort" % (manuf, model)) + + # Now build a list of our ports, for later sanity checking + self._ports = self._get_port_names() + if len(self._ports) < 4: + raise IOError("Not enough ports detected - problem!") + + def _login(self): + logging.debug("attempting login with username %s, password %s", self._username, self._password) + if self._username is not None: + self.connection.expect("User:") + self._cli("%s" % self._username) + if self._password is not None: + self.connection.expect("Password:") + self._cli("%s" % self._password, False) + while True: + index = self.connection.expect(['User:', 'Password:', 'Bad passwords', 'authentication failed', r'(.*)(#|>)']) + if index != 4: # Any other means: failed to log in! + logging.error("Login failure: index %d\n", index) + logging.error("Login failure: %s\n", self.connection.match.before) + raise IOError + + # else + self._prompt_name = re.escape(self.connection.match.group(1).strip()) + if self.connection.match.group(2) == ">": + # Need to enter "enable" mode too + self._cli("enable") + if self._enable_password is not None: + self.connection.expect("Password:") + self._cli("%s" % self._enable_password, False) + index = self.connection.expect(['Password:', 'Bad passwords', 'authentication failed', r'(.*) *(#|>)']) + if index != 3: # Any other means: failed to log in! + logging.error("Enable password failure: %s\n", self.connection.match) + raise IOError + return 0 + + def _logout(self): + logging.debug("Logging out") + self._cli("quit", False) + try: + self.connection.expect("Would you like to save them now") + self._cli("n") + except (pexpect.EOF): + pass + self.connection.close(True) + + def _configure(self): + self._cli("configure") + + def _end_configure(self): + self._cli("exit") + + def _end_interface(self): + self._end_configure() + + def _read_long_output(self, text): + longbuf = [] + prompt = self._prompt_name + r'\s*#' + while True: + try: + index = self.connection.expect(['--More--', prompt]) + if index == 0: # "--More-- or (q)uit" + for line in self.connection.before.split('\r\n'): + line1 = re.sub('(\x08|\x0D)*', '', line.strip()) + longbuf.append(line1) + self._cli(' ', False) + elif index == 1: # Back to a prompt, says output is finished + break + except (pexpect.EOF, pexpect.TIMEOUT): + # Something went wrong; logout, log in and try again! + logging.error("PEXPECT FAILURE, RECONNECT") + self.errors.log_error_in(text) + raise PExpectError("_read_long_output failed") + except: + logging.error("prompt is \"%s\"", prompt) + raise + + for line in self.connection.before.split('\r\n'): + longbuf.append(line.strip()) + return longbuf + + def _get_port_names(self): + logging.debug("Grabbing list of ports") + interfaces = [] + + # Look for "1" at the beginning of the output lines to just + # match lines with interfaces - they have names like + # "1/0/22". We do not care about Link Aggregation Groups (lag) + # here. + regex = re.compile(r'^(1\S+)') + + try: + self._cli("show port all") + for line in self._read_long_output("show port all"): + match = regex.match(line) + if match: + interface = match.group(1) + interfaces.append(interface) + self._port_numbers[interface] = len(interfaces) + logging.debug(" found %d ports on the switch", len(interfaces)) + return interfaces + + except PExpectError: + # recurse on error + self._switch_connect() + return self._get_port_names() + + # Get the mode of a port: access or trunk + def _port_get_mode(self, port): + logging.debug("Getting mode of port %s", port) + + if not self._is_port_name_valid(port): + raise InputError("Port name %s not recognised" % port) + acceptframe_re = re.compile('vlan acceptframe (.*)') + ingress_re = re.compile('vlan ingressfilter') + + acceptframe = None + ingressfilter = True + + try: + self._cli("show running-config interface %s" % port) + for line in self._read_long_output("show running-config interface"): + + match = acceptframe_re.match(line) + if match: + acceptframe = match.group(1) + + match = ingress_re.match(line) + if match: + ingressfilter = True + + # Simple classifier for now; may need to revisit later... + if (ingressfilter and acceptframe == "admituntaggedonly"): + return "access" + else: + return "trunk" + + except PExpectError: + # recurse on error + self._switch_connect() + return self.port_get_mode(port) + + def _show_config(self): + logging.debug("Grabbing config") + try: + self._cli("show running-config") + return self._read_long_output("show running-config") + except PExpectError: + # recurse on error + self._switch_connect() + return self._show_config() + + def _show_clock(self): + logging.debug("Grabbing time") + try: + self._cli("show clock") + return self._read_long_output("show clock") + except PExpectError: + # recurse on error + self._switch_connect() + return self._show_clock() + + def _get_systemdata(self): + logging.debug("Grabbing system sw and hw versions") + + try: + self._systemdata = [] + self._cli("show version") + for line in self._read_long_output("show version"): + self._systemdata.append(line) + + except PExpectError: + # recurse on error + self._switch_connect() + return self._get_systemdata() + + def _parse_vlan_list(self, inputdata): + vlans = [] + + if inputdata == "ALL": + return ["ALL"] + elif inputdata == "NONE": + return [] + else: + # Parse the complex list + groups = inputdata.split(',') + for group in groups: + subgroups = group.split('-') + if len(subgroups) == 1: + vlans.append(int(subgroups[0])) + elif len(subgroups) == 2: + for i in range (int(subgroups[0]), int(subgroups[1]) + 1): + vlans.append(i) + else: + logging.debug("Can't parse group \"" + group + "\"") + + return vlans + + def _get_port_vlans(self, port): + vlan_text = None + + vlan_part_re = re.compile('vlan participation include (.*)') + + try: + self._cli("show running-config interface %s" % port) + for line in self._read_long_output("show running-config interface"): + match = vlan_part_re.match(line) + if match: + if vlan_text != None: + vlan_text += "," + vlan_text += (match.group(1)) + else: + vlan_text = match.group(1) + + if vlan_text is None: + return [1] + else: + vlans = self._parse_vlan_list(vlan_text) + return vlans + + except PExpectError: + # recurse on error + self._switch_connect() + return self._get_port_vlans(port) + + # Wrapper around connection.send - by default, expect() the same + # text we've sent, to remove it from the output from the + # switch. For the few cases where we don't need that, override + # this using echo=False. + # Horrible, but seems to work. + def _cli(self, text, echo=True): + self.connection.send(text + '\r') + if echo: + try: + self.connection.expect(text) + except (pexpect.EOF, pexpect.TIMEOUT): + # Something went wrong; logout, log in and try again! + logging.error("PEXPECT FAILURE, RECONNECT") + self.errors.log_error_out(text) + raise PExpectError("_cli failed on %s" % text) + except: + logging.error("Unexpected error: %s", sys.exc_info()[0]) + raise + +if __name__ == "__main__": + + # Simple test harness - exercise the main working functions above to verify + # they work. This does *NOT* test really disruptive things like "save + # running-config" and "reload" - test those by hand. + + import optparse + + switch = 'vlandswitch05' + parser = optparse.OptionParser() + parser.add_option("--switch", + dest = "switch", + action = "store", + nargs = 1, + type = "string", + help = "specify switch to connect to for testing", + metavar = "<switch>") + (opts, args) = parser.parse_args() + if opts.switch: + switch = opts.switch + + logging.basicConfig(level = logging.DEBUG, + format = '%(asctime)s %(levelname)-8s %(message)s') + p = NetgearXSM(switch, 23, debug=True) + p.switch_connect('admin', '', None) + + print "VLANs are:" + buf = p.vlan_get_list() + p.dump_list(buf) + + buf = p.vlan_get_name(2) + print "VLAN 2 is named \"%s\"" % buf + + print "Create VLAN 3" + p.vlan_create(3) + + buf = p.vlan_get_name(3) + print "VLAN 3 is named \"%s\"" % buf + + print "Set name of VLAN 3 to test333" + p.vlan_set_name(3, "test333") + + buf = p.vlan_get_name(3) + print "VLAN 3 is named \"%s\"" % buf + + print "VLANs are:" + buf = p.vlan_get_list() + p.dump_list(buf) + + print "Destroy VLAN 3" + p.vlan_destroy(3) + + print "VLANs are:" + buf = p.vlan_get_list() + p.dump_list(buf) + + buf = p.port_get_mode("1/0/10") + print "Port 1/0/10 is in %s mode" % buf + + buf = p.port_get_mode("1/0/11") + print "Port 1/0/11 is in %s mode" % buf + + # Test access stuff + print "Set 1/0/9 to access mode" + p.port_set_mode("1/0/9", "access") + + print "Move 1/0/9 to VLAN 4" + p.port_set_access_vlan("1/0/9", 4) + + buf = p.port_get_access_vlan("1/0/9") + print "Read from switch: 1/0/9 is on VLAN %s" % buf + + print "Move 1/0/9 back to VLAN 1" + p.port_set_access_vlan("1/0/9", 1) + + print "Create VLAN 2" + p.vlan_create(2) + + print "Create VLAN 3" + p.vlan_create(3) + + print "Create VLAN 4" + p.vlan_create(4) + + # Test access stuff + print "Set 1/0/9 to trunk mode" + p.port_set_mode("1/0/9", "trunk") + print "Read from switch: which VLANs is 1/0/9 on?" + buf = p.port_get_trunk_vlan_list("1/0/9") + p.dump_list(buf) + + # The adds below are NOOPs in effect on this switch - no filtering + # for "trunk" ports + print "Add 1/0/9 to VLAN 2" + p.port_add_trunk_to_vlan("1/0/9", 2) + print "Add 1/0/9 to VLAN 3" + p.port_add_trunk_to_vlan("1/0/9", 3) + print "Add 1/0/9 to VLAN 4" + p.port_add_trunk_to_vlan("1/0/9", 4) + print "Read from switch: which VLANs is 1/0/9 on?" + buf = p.port_get_trunk_vlan_list("1/0/9") + p.dump_list(buf) + + # And the same for removals here + p.port_remove_trunk_from_vlan("1/0/9", 3) + p.port_remove_trunk_from_vlan("1/0/9", 2) + p.port_remove_trunk_from_vlan("1/0/9", 4) + print "Read from switch: which VLANs is 1/0/9 on?" + buf = p.port_get_trunk_vlan_list("1/0/9") + p.dump_list(buf) + +# print 'Restarting switch, to explicitly reset config' +# p.switch_restart() + +# p.switch_save_running_config() + +# p.switch_disconnect() +# p._show_config() diff --git a/Vland/drivers/TPLinkTLSG2XXX.py b/Vland/drivers/TPLinkTLSG2XXX.py new file mode 100644 index 0000000..5213d10 --- /dev/null +++ b/Vland/drivers/TPLinkTLSG2XXX.py @@ -0,0 +1,695 @@ +#! /usr/bin/python + +# Copyright 2014-2015 Linaro Limited +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, +# MA 02110-1301, USA. + +import logging +import sys +import re +import pexpect + +if __name__ == '__main__': + import os + vlandpath = os.path.abspath(os.path.normpath(os.path.dirname(sys.argv[0]))) + sys.path.insert(0, vlandpath) + sys.path.insert(0, "%s/.." % vlandpath) + +from errors import InputError, PExpectError +from drivers.common import SwitchDriver, SwitchErrors + +class TPLinkTLSG2XXX(SwitchDriver): + + connection = None + _username = None + _password = None + _enable_password = None + + _capabilities = [ + ] + + # Regexp of expected hardware information - fail if we don't see + # this + _expected_descr_re = re.compile(r'TL-SG2\d\d\d') + + def __init__(self, switch_hostname, switch_telnetport=23, debug=False): + SwitchDriver.__init__(self, switch_hostname, debug) + self._systemdata = [] + self.exec_string = "/usr/bin/telnet %s %d" % (switch_hostname, switch_telnetport) + self.errors = SwitchErrors() + + ################################ + ### Switch-level API functions + ################################ + + # Save the current running config into flash - we want config to + # remain across reboots + def switch_save_running_config(self): + try: + self._cli("copy running-config startup-config") + self.connection.expect("OK") + except (PExpectError, pexpect.EOF): + # recurse on error + self._switch_connect() + self.switch_save_running_config() + + # Restart the switch - we need to reload config to do a + # roll-back. Do NOT save running-config first if the switch asks - + # we're trying to dump recent changes, not save them. + # + # This will also implicitly cause a connection to be closed + def switch_restart(self): + self._cli("reboot") + index = self.connection.expect(['Daving current', 'Continue?']) + if index == 0: + self._cli("n") # No, don't save + self.connection.expect("Continue?") + + # Fall through + self._cli("y") # Yes, continue to reset + self.connection.close(True) + + # List the capabilities of the switch (and driver) - some things + # make no sense to abstract. Returns a dict of strings, each one + # describing an extra feature that that higher levels may care + # about + def switch_get_capabilities(self): + return self._capabilities + + ################################ + ### VLAN API functions + ################################ + + # Create a VLAN with the specified tag + def vlan_create(self, tag): + logging.debug("Creating VLAN %d", tag) + + try: + self._configure() + self._cli("vlan %d" % tag) + self._end_configure() + + # Validate it happened + vlans = self.vlan_get_list() + for vlan in vlans: + if vlan == tag: + return + raise IOError("Failed to create VLAN %d" % tag) + + except PExpectError: + # recurse on error + self._switch_connect() + self.vlan_create(tag) + + # Destroy a VLAN with the specified tag + def vlan_destroy(self, tag): + logging.debug("Destroying VLAN %d", tag) + + try: + self._configure() + self._cli("no vlan %d" % tag) + self._end_configure() + + # Validate it happened + vlans = self.vlan_get_list() + for vlan in vlans: + if vlan == tag: + raise IOError("Failed to destroy VLAN %d" % tag) + + except PExpectError: + # recurse on error + self._switch_connect() + self.vlan_destroy(tag) + + # Set the name of a VLAN + def vlan_set_name(self, tag, name): + logging.debug("Setting name of VLAN %d to %s", tag, name) + + try: + self._configure() + self._cli("vlan %d" % tag) + self._cli("name %s" % name) + self._end_configure() + + # Validate it happened + read_name = self.vlan_get_name(tag) + if read_name != name: + raise IOError("Failed to set name for VLAN %d (name found is \"%s\", not \"%s\")" + % (tag, read_name, name)) + + except (PExpectError, pexpect.EOF): + # recurse on error + self._switch_connect() + self.vlan_set_name(tag, name) + + # Get a list of the VLAN tags currently registered on the switch + def vlan_get_list(self): + logging.debug("Grabbing list of VLANs") + + try: + vlans = [] + regex = re.compile(r'^ *(\d+).*active') + + self._cli("show vlan brief") + for line in self._read_long_output("show vlan brief"): + match = regex.match(line) + if match: + vlans.append(int(match.group(1))) + return vlans + + except PExpectError: + # recurse on error + self._switch_connect() + return self.vlan_get_list() + + # For a given VLAN tag, ask the switch what the associated name is + def vlan_get_name(self, tag): + logging.debug("Grabbing the name of VLAN %d", tag) + + try: + name = None + regex = re.compile(r'^ *\d+\s+(\S+).*(active)') + self._cli("show vlan id %d" % tag) + for line in self._read_long_output("show vlan id"): + match = regex.match(line) + if match: + name = match.group(1) + name.strip() + return name + + except PExpectError: + # recurse on error + self._switch_connect() + return self.vlan_get_name(tag) + + ################################ + ### Port API functions + ################################ + + # Set the mode of a port: access or trunk + def port_set_mode(self, port, mode): + logging.debug("Setting port %s to %s", port, mode) + if not self._is_port_mode_valid(mode): + raise IndexError("Port mode %s is not allowed" % mode) + if not self._is_port_name_valid(port): + raise IndexError("Port name %s not recognised" % port) + # This switch does not support specific modes, so we can't + # actually change the mode directly. However, we can and + # should deal with the PVID and memberships of existing VLANs + + try: + # We define a trunk to be on *all* VLANs on the switch in + # tagged mode, and PVID should match the default VLAN (1). + if mode == "trunk": + # Disconnect all the untagged ports + read_vlans = self._port_get_all_vlans(port, 'Untagged') + for vlan in read_vlans: + self._port_remove_general_vlan(port, vlan) + + # And move to VLAN 1 + self.port_add_trunk_to_vlan(port, 1) + self._set_pvid(port, 1) + + # And an access port should only be on one VLAN. Move to + # VLAN 1, untagged, and set PVID there. + if mode == "access": + # Disconnect all the ports + read_vlans = self._port_get_all_vlans(port, 'Untagged') + for vlan in read_vlans: + self._port_remove_general_vlan(port, vlan) + read_vlans = self._port_get_all_vlans(port, 'Tagged') + for vlan in read_vlans: + self._port_remove_general_vlan(port, vlan) + + # And move to VLAN 1 + self.port_set_access_vlan(port, 1) + self._set_pvid(port, 1) + + # Validate it happened + read_mode = self._port_get_mode(port) + if read_mode != mode: + raise IOError("Failed to set mode for port %s" % port) + + # And cache the result + self._port_modes[port] = mode + + except (PExpectError, pexpect.EOF): + # recurse on error + self._switch_connect() + self.port_set_mode(port, mode) + + # Set an access port to be in a specified VLAN (tag) + def port_set_access_vlan(self, port, tag): + logging.debug("Setting access port %s to VLAN %d", port, tag) + if not self._is_port_name_valid(port): + raise IndexError("Port name %s not recognised" % port) + # Does the VLAN already exist? + vlan_list = self.vlan_get_list() + if not tag in vlan_list: + raise IndexError("VLAN tag %d not recognised" % tag) + + try: + # Add the new VLAN + self._configure() + self._cli("interface %s" % self._long_port_name(port)) + self._cli("switchport general allowed vlan %d untagged" % tag) + self._cli("no shutdown") + self._end_configure() + + self._set_pvid(port, tag) + + # Now drop all the other VLANs + read_vlans = self._port_get_all_vlans(port, 'Untagged') + for vlan in read_vlans: + if vlan != tag: + self._port_remove_general_vlan(port, vlan) + + # Finally, validate things worked + read_vlan = int(self.port_get_access_vlan(port)) + if read_vlan != tag: + raise IOError("Failed to move access port %s to VLAN %d - got VLAN %d instead" + % (port, tag, read_vlan)) + + except PExpectError: + # recurse on error + self._switch_connect() + self.port_set_access_vlan(port, tag) + + # Add a trunk port to a specified VLAN (tag) + def port_add_trunk_to_vlan(self, port, tag): + logging.debug("Adding trunk port %s to VLAN %d", port, tag) + if not self._is_port_name_valid(port): + raise IndexError("Port name %s not recognised" % port) + try: + self._configure() + self._cli("interface %s" % self._long_port_name(port)) + self._cli("switchport general allowed vlan %d tagged" % tag) + self._end_configure() + + # Validate it happened + read_vlans = self.port_get_trunk_vlan_list(port) + for vlan in read_vlans: + if vlan == tag or vlan == "ALL": + return + raise IOError("Failed to add trunk port %s to VLAN %d" % (port, tag)) + + except PExpectError: + # recurse on error + self._switch_connect() + self.port_add_trunk_to_vlan(port, tag) + + # Remove a trunk port from a specified VLAN (tag) + def port_remove_trunk_from_vlan(self, port, tag): + logging.debug("Removing trunk port %s from VLAN %d", port, tag) + if not self._is_port_name_valid(port): + raise IndexError("Port name %s not recognised" % port) + + try: + self._configure() + self._cli("interface %s" % self._long_port_name(port)) + self._cli("no switchport general allowed vlan %d" % tag) + self._end_configure() + + # Validate it happened + read_vlans = self.port_get_trunk_vlan_list(port) + for vlan in read_vlans: + if vlan == tag: + raise IOError("Failed to remove trunk port %s from VLAN %d" % (port, tag)) + + except PExpectError: + # recurse on error + self._switch_connect() + self.port_remove_trunk_from_vlan(port, tag) + + # Get the configured VLAN tag for an access port (i.e. a port not + # configured for tagged egress) + def port_get_access_vlan(self, port): + logging.debug("Getting VLAN for access port %s", port) + vlan = 1 + if not self._is_port_name_valid(port): + raise IndexError("Port name %s not recognised" % port) + regex = re.compile(r'(\d+)\s+.*Untagged') + + try: + self._cli("show interface switchport %s" % self._long_port_name(port)) + for line in self._read_long_output("show interface switchport"): + match = regex.match(line) + if match: + vlan = match.group(1) + return int(vlan) + + except PExpectError: + # recurse on error + self._switch_connect() + return self.port_get_access_vlan(port) + + # Get the list of configured VLAN tags for a trunk port + def port_get_trunk_vlan_list(self, port): + logging.debug("Getting VLANs for trunk port %s", port) + + if not self._is_port_name_valid(port): + raise IndexError("Port name %s not recognised" % port) + + return self._port_get_all_vlans(port, 'Tagged') + + ################################ + ### Internal functions + ################################ + + # Connect to the switch and log in + def _switch_connect(self): + + if not self.connection is None: + self.connection.close(True) + self.connection = None + + logging.debug("Connecting to Switch with: %s", self.exec_string) + self.connection = pexpect.spawn(self.exec_string, logfile=self.logger) + self._login() + + # No way to avoid paged output on this switch AFAICS + + # And grab details about the switch. in case we need it + self._get_systemdata() + + # And also validate them - make sure we're driving a switch of + # the correct model! Also store the serial number + descr_regex = re.compile(r'Hardware Version\s+ - (.*)') + descr = "" + + for line in self._systemdata: + match = descr_regex.match(line) + if match: + descr = match.group(1) + + logging.debug("system description is %s", descr) + + if not self._expected_descr_re.match(descr): + raise IOError("Switch %s not recognised by this driver: abort" % descr) + + # Now build a list of our ports, for later sanity checking + self._ports = self._get_port_names() + if len(self._ports) < 4: + raise IOError("Not enough ports detected - problem!") + + def _login(self): + logging.debug("attempting login with username \"%s\", password \"%s\", enable_password \"%s\"", self._username, self._password, self._enable_password) + self.connection.expect('User Access Login') + if self._username is not None: + self.connection.expect("User:") + self._cli("%s" % self._username) + if self._password is not None: + self.connection.expect("Password:") + self._cli("%s" % self._password, False) + while True: + index = self.connection.expect(['User Name:', 'Password:', 'Bad passwords', 'authentication failed', r'(.*)(#|>)']) + if index != 4: # Any other means: failed to log in! + logging.error("Login failure: index %d\n", index) + logging.error("Login failure: %s\n", self.connection.match.before) + raise IOError + + # else + self._prompt_name = re.escape(self.connection.match.group(1).strip()) + if self.connection.match.group(2) == ">": + # Need to enter "enable" mode too + self._cli("") + self._cli("enable") + if self._enable_password is not None and len(self._enable_password) > 0: + self.connection.expect("Password:") + self._cli("%s" % self._enable_password, False) + index = self.connection.expect(['Password:', 'Bad passwords', 'authentication failed', r'(.*)(#|>)']) + if index != 3: # Any other means: failed to log in! + logging.error("Enable password failure: %s\n", self.connection.match) + raise IOError + return 0 + + def _logout(self): + logging.debug("Logging out") + self._cli("exit", False) + self.connection.close(True) + + def _configure(self): + self._cli("configure") + + def _end_configure(self): + self._cli("end") + + def _read_long_output(self, text): + longbuf = [] + prompt = self._prompt_name + '#' + while True: + try: + index = self.connection.expect(['^Press any key to continue', prompt]) + if index == 0: # "Press any key to continue (Q to quit)" + for line in self.connection.before.split('\r\n'): + line1 = re.sub('(\x08|\x0D)*', '', line.strip()) + longbuf.append(line1) + self._cli(' ', False) + elif index == 1: # Back to a prompt, says output is finished + break + except (pexpect.EOF, pexpect.TIMEOUT): + # Something went wrong; logout, log in and try again! + logging.error("PEXPECT FAILURE, RECONNECT") + self.errors.log_error_in(text) + raise PExpectError("_read_long_output failed") + except: + logging.error("prompt is \"%s\"", prompt) + raise + + for line in self.connection.before.split('\r\n'): + line1 = re.sub('(\x08|\x0D)*', '', line.strip()) + longbuf.append(line1) + + return longbuf + + def _long_port_name(self, port): + return re.sub('Gi', 'gigabitEthernet ', port) + + def _set_pvid(self, port, pvid): + # Set a port's PVID + self._configure() + self._cli("interface %s" % self._long_port_name(port)) + self._cli("switchport pvid %d" % pvid) + self._end_configure() + + def _port_get_all_vlans(self, port, port_type): + vlans = [] + regex = re.compile(r'(\d+)\s+.*' + port_type) + + try: + self._cli("show interface switchport %s" % self._long_port_name(port)) + for line in self._read_long_output("show interface switchport"): + match = regex.match(line) + if match: + vlan = match.group(1) + vlans.append(int(vlan)) + return vlans + + except PExpectError: + # recurse on error + self._switch_connect() + return self._port_get_all_vlans(port, port_type) + + def _port_remove_general_vlan(self, port, tag): + try: + self._configure() + self._cli("interface %s" % self._long_port_name(port)) + self._cli("no switchport general allowed vlan %d" % tag) + self._end_configure() + + except PExpectError: + # recurse on error + self._switch_connect() + return self._port_remove_general_vlan(port, tag) + + def _get_port_names(self): + logging.debug("Grabbing list of ports") + interfaces = [] + + # Use "Link" to only identify lines in the output that match + # interfaces that exist - it'll match "LinkUp" and "LinkDown" + regex = re.compile(r'^\s*([a-zA-Z0-9_/]*).*Link') + + try: + self._cli("show interface status") + for line in self._read_long_output("show interface status"): + match = regex.match(line) + if match: + interface = match.group(1) + interfaces.append(interface) + self._port_numbers[interface] = len(interfaces) + logging.debug(" found %d ports on the switch", len(interfaces)) + return interfaces + + except PExpectError: + # recurse on error + self._switch_connect() + return self._get_port_names() + + # Get the mode of a port: access or trunk + def _port_get_mode(self, port): + logging.debug("Getting mode of port %s", port) + + if not self._is_port_name_valid(port): + raise IndexError("Port name %s not recognised" % port) + + # This switch does not support specific modes, so we have to + # make stuff up here. We define trunk ports to be on (1 or + # many) tagged VLANs, anything not tagged to be access. + read_vlans = self._port_get_all_vlans(port, 'Tagged') + if len(read_vlans) > 0: + return "trunk" + else: + return "access" + + def _show_config(self): + logging.debug("Grabbing config") + self._cli("show running-config") + return self._read_long_output("show running-config") + + def _show_clock(self): + logging.debug("Grabbing time") + self._cli("show system-time") + return self._read_long_output("show system-time") + + def _get_systemdata(self): + logging.debug("Grabbing system sw and hw versions") + + self._cli("show system-info") + self._systemdata = [] + for line in self._read_long_output("show system-info"): + self._systemdata.append(line) + + # Wrapper around connection.send - by default, expect() the same + # text we've sent, to remove it from the output from the + # switch. For the few cases where we don't need that, override + # this using echo=False. + # Horrible, but seems to work. + def _cli(self, text, echo=True): + self.connection.send(text + '\r') + if echo: + self.connection.expect(text) + +if __name__ == "__main__": + + # Simple test harness - exercise the main working functions above to verify + # they work. This does *NOT* test really disruptive things like "save + # running-config" and "reload" - test those by hand. + + import optparse + + switch = '10.172.2.50' + parser = optparse.OptionParser() + parser.add_option("--switch", + dest = "switch", + action = "store", + nargs = 1, + type = "string", + help = "specify switch to connect to for testing", + metavar = "<switch>") + (opts, args) = parser.parse_args() + if opts.switch: + switch = opts.switch + + logging.basicConfig(level = logging.DEBUG, + format = '%(asctime)s %(levelname)-8s %(message)s') + p = TPLinkTLSG2XXX(switch, 23, debug = False) + p.switch_connect('admin', 'admin', None) + + print "Ports are:" + buf = p.switch_get_port_names() + p.dump_list(buf) + + print "VLANs are:" + buf = p.vlan_get_list() + p.dump_list(buf) + + buf = p.vlan_get_name(2) + print "VLAN 2 is named \"%s\"" % buf + + print "Create VLAN 3" + p.vlan_create(3) + + buf = p.vlan_get_name(3) + print "VLAN 3 is named \"%s\"" % buf + + print "Set name of VLAN 3 to test333" + p.vlan_set_name(3, "test333") + + buf = p.vlan_get_name(3) + print "VLAN 3 is named \"%s\"" % buf + + print "VLANs are:" + buf = p.vlan_get_list() + p.dump_list(buf) + + print "Destroy VLAN 3" + p.vlan_destroy(3) + + print "VLANs are:" + buf = p.vlan_get_list() + p.dump_list(buf) + + buf = p.port_get_mode("Gi1/0/10") + print "Port Gi1/0/10 is in %s mode" % buf + + buf = p.port_get_mode("Gi1/0/11") + print "Port Gi1/0/11 is in %s mode" % buf + + # Test access stuff + buf = p.port_get_mode("Gi1/0/9") + print "Port Gi1/0/9 is in %s mode" % buf + + print "Set Gi1/0/9 to access mode" + p.port_set_mode("Gi1/0/9", "access") + + print "Move Gi1/0/9 to VLAN 4" + p.port_set_access_vlan("Gi1/0/9", 4) + + buf = p.port_get_access_vlan("Gi1/0/9") + print "Read from switch: Gi1/0/9 is on VLAN %s" % buf + + print "Move Gi1/0/9 back to VLAN 1" + p.port_set_access_vlan("Gi1/0/9", 1) + + # Test access stuff + print "Set Gi1/0/9 to trunk mode" + p.port_set_mode("Gi1/0/9", "trunk") + print "Read from switch: which VLANs is Gi1/0/9 on?" + buf = p.port_get_trunk_vlan_list("Gi1/0/9") + p.dump_list(buf) + print "Add Gi1/0/9 to VLAN 2" + p.port_add_trunk_to_vlan("Gi1/0/9", 2) + print "Add Gi1/0/9 to VLAN 3" + p.port_add_trunk_to_vlan("Gi1/0/9", 3) + print "Add Gi1/0/9 to VLAN 4" + p.port_add_trunk_to_vlan("Gi1/0/9", 4) + print "Read from switch: which VLANs is Gi1/0/9 on?" + buf = p.port_get_trunk_vlan_list("Gi1/0/9") + p.dump_list(buf) + + p.port_remove_trunk_from_vlan("Gi1/0/9", 3) + p.port_remove_trunk_from_vlan("Gi1/0/9", 3) + p.port_remove_trunk_from_vlan("Gi1/0/9", 4) + print "Read from switch: which VLANs is Gi1/0/9 on?" + buf = p.port_get_trunk_vlan_list("Gi1/0/9") + p.dump_list(buf) + + + p.switch_save_running_config() + + p.switch_disconnect() +# p._show_config() diff --git a/Vland/drivers/__init__.py b/Vland/drivers/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/Vland/drivers/__init__.py diff --git a/Vland/drivers/common.py b/Vland/drivers/common.py new file mode 100644 index 0000000..e564c9e --- /dev/null +++ b/Vland/drivers/common.py @@ -0,0 +1,167 @@ +#! /usr/bin/python + +# Copyright 2014-2015 Linaro Limited +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, +# MA 02110-1301, USA. + +import time +import logging + +from errors import InputError, PExpectError + +class SwitchErrors: + """ Error logging and statistics class """ + + def __init__(self): + self.errors_in = 0 + self.errors_out = 0 + + def __repr__(self): + return "<SwitchErrors: errors_in: %d, errors_out: %d>" % (self.errors_in, self.errors_out) + + # For now, just count the error. Later on we might add stats and + # analysis + def log_error_in(self, text): + self.errors_in += 1 + + # For now, just count the error. Later on we might add stats and + # analysis + def log_error_out(self, text): + self.errors_out += 1 + +class SwitchDriver(object): + + connection = None + hostname = "" + serial_number = '' + + _allowed_port_modes = [ "trunk", "access" ] + _ports = [] + _port_modes = {} + _port_numbers = {} + _prompt_name = '' + _username = '' + _password = '' + _enable_password = '' + _systemdata = [] + + def __init__ (self, switch_hostname, debug): + + if debug: + # Configure logging for pexpect output if we have debug + # enabled + + # get the logger + self.logger = logging.getLogger(switch_hostname) + + # give the logger the methods required by pexpect + self.logger.write = self._log_write + self.logger.flush = self._log_do_nothing + + else: + self.logger = None + + self.hostname = switch_hostname + + # Connect to the switch and log in + def switch_connect(self, username, password, enablepassword): + self._username = username + self._password = password + self._enable_password = enablepassword + self._switch_connect() + + # Log out of the switch and drop the connection and all state + def switch_disconnect(self): + self._logout() + logging.debug("Closing connection to %s", self.hostname) + self._ports = [] + self._port_modes.clear() + self._port_numbers.clear() + self._prompt_name = '' + self._systemdata = [] + del(self) + + def dump_list(self, data): + i = 0 + for line in data: + print "%d: \"%s\"" % (i, line) + i += 1 + + def _delay(self): + time.sleep(0.5) + + # List the capabilities of the switch (and driver) - some things + # make no sense to abstract. Returns a dict of strings, each one + # describing an extra feature that that higher levels may care + # about + def switch_get_capabilities(self): + return self._capabilities + + # List the names of all the ports on the switch + def switch_get_port_names(self): + return self._ports + + def _is_port_name_valid(self, name): + for port in self._ports: + if name == port: + return True + return False + + def _is_port_mode_valid(self, mode): + for allowed in self._allowed_port_modes: + if allowed == mode: + return True + return False + + # Try to look up a port mode in our cache. If not there, go ask + # the switch and cache the result + def port_get_mode(self, port): + if not self._is_port_name_valid(port): + raise InputError("Port name %s not recognised" % port) + if port in self._port_modes: + logging.debug("port_get_mode: returning mode %s from cache for port %s", self._port_modes[port], port) + return self._port_modes[port] + else: + mode = self._port_get_mode(port) + self._port_modes[port] = mode + logging.debug("port_get_mode: found mode %s for port %s, adding to cache", self._port_modes[port], port) + return mode + + def port_map_name_to_number(self, port_name): + if not self._is_port_name_valid(port_name): + raise InputError("Port name %s not recognised" % port_name) + logging.debug("port_map_name_to_number: returning %d for port_name %s", self._port_numbers[port_name], port_name) + return self._port_numbers[port_name] + + # Wrappers to adapt logging for pexpect when we've configured on a + # switch. + # This will be the method called by the pexpect object to write a + # log message + def _log_write(self, *args, **kwargs): + # ignore other parameters, pexpect only uses one arg + content = args[0] + + if content in [' ', '', '\n', '\r', '\r\n']: + return # don't log empty lines + + # Split the output into multiple lines so we get a + # well-formatted logfile + for line in content.split('\r\n'): + logging.info(line) + + # This is the flush method for pexpect + def _log_do_nothing(self): + pass |