diff options
Diffstat (limited to 'Vland/drivers/NetgearXSM.py')
-rw-r--r-- | Vland/drivers/NetgearXSM.py | 782 |
1 files changed, 782 insertions, 0 deletions
diff --git a/Vland/drivers/NetgearXSM.py b/Vland/drivers/NetgearXSM.py new file mode 100644 index 0000000..f8ba8a5 --- /dev/null +++ b/Vland/drivers/NetgearXSM.py @@ -0,0 +1,782 @@ +#! /usr/bin/python + +# Copyright 2015-2018 Linaro Limited +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, +# MA 02110-1301, USA. + +import logging +import sys +import re +import pexpect + +# Netgear XSM family driver +# Developed and tested against the XSM7224S in the Linaro LAVA lab + +if __name__ == '__main__': + import os + vlandpath = os.path.abspath(os.path.normpath(os.path.dirname(sys.argv[0]))) + sys.path.insert(0, vlandpath) + sys.path.insert(0, "%s/.." % vlandpath) + +from errors import InputError, PExpectError +from drivers.common import SwitchDriver, SwitchErrors + +class NetgearXSM(SwitchDriver): + + connection = None + _username = None + _password = None + _enable_password = None + + _capabilities = [ + ] + + # Regexps of expected hardware information - fail if we don't see + # this + _expected_manuf = re.compile('^Netgear') + _expected_model = re.compile('^XSM') + + def __init__(self, switch_hostname, switch_telnetport=23, debug = False): + SwitchDriver.__init__(self, switch_hostname, debug) + self._systemdata = [] + self.exec_string = "/usr/bin/telnet %s %d" % (switch_hostname, switch_telnetport) + self.errors = SwitchErrors() + + ################################ + ### Switch-level API functions + ################################ + + # Save the current running config into flash - we want config to + # remain across reboots + def switch_save_running_config(self): + try: + self._cli("save") + self.connection.expect("Are you sure") + self._cli("y") + self.connection.expect("Configuration Saved!") + except (PExpectError, pexpect.EOF): + # recurse on error + self._switch_connect() + self.switch_save_running_config() + + # Restart the switch - we need to reload config to do a + # roll-back. Do NOT save running-config first if the switch asks - + # we're trying to dump recent changes, not save them. + # + # This will also implicitly cause a connection to be closed + def switch_restart(self): + self._cli("reload") + self.connection.expect('Are you sure') + self._cli("y") # Yes, continue to reset + self.connection.close(True) + + # List the capabilities of the switch (and driver) - some things + # make no sense to abstract. Returns a dict of strings, each one + # describing an extra feature that that higher levels may care + # about + def switch_get_capabilities(self): + return self._capabilities + + ################################ + ### VLAN API functions + ################################ + + # Create a VLAN with the specified tag + def vlan_create(self, tag): + logging.debug("Creating VLAN %d", tag) + try: + self._cli("vlan database") + self._cli("vlan %d" % tag) + self._end_configure() + + # Validate it happened + vlans = self.vlan_get_list() + for vlan in vlans: + if vlan == tag: + return + raise IOError("Failed to create VLAN %d" % tag) + + except PExpectError: + # recurse on error + self._switch_connect() + self.vlan_create(tag) + + # Destroy a VLAN with the specified tag + def vlan_destroy(self, tag): + logging.debug("Destroying VLAN %d", tag) + + try: + self._cli("vlan database") + self._cli("no vlan %d" % tag) + self._end_configure() + + # Validate it happened + vlans = self.vlan_get_list() + for vlan in vlans: + if vlan == tag: + raise IOError("Failed to destroy VLAN %d" % tag) + + except PExpectError: + # recurse on error + self._switch_connect() + self.vlan_destroy(tag) + + # Set the name of a VLAN + def vlan_set_name(self, tag, name): + logging.debug("Setting name of VLAN %d to %s", tag, name) + + try: + self._cli("vlan database") + self._cli("vlan name %d %s" % (tag, name)) + self._end_configure() + + # Validate it happened + read_name = self.vlan_get_name(tag) + if read_name != name: + raise IOError("Failed to set name for VLAN %d (name found is \"%s\", not \"%s\")" + % (tag, read_name, name)) + except PExpectError: + # recurse on error + self._switch_connect() + self.vlan_set_name(tag, name) + + # Get a list of the VLAN tags currently registered on the switch + def vlan_get_list(self): + logging.debug("Grabbing list of VLANs") + + try: + vlans = [] + + regex = re.compile(r'^ *(\d+).*(Static)') + + self._cli("show vlan brief") + for line in self._read_long_output("show vlan brief"): + match = regex.match(line) + if match: + vlans.append(int(match.group(1))) + return vlans + + except PExpectError: + # recurse on error + self._switch_connect() + return self.vlan_get_list() + + # For a given VLAN tag, ask the switch what the associated name is + def vlan_get_name(self, tag): + + try: + logging.debug("Grabbing the name of VLAN %d", tag) + name = None + regex = re.compile('VLAN Name: (.*)') + self._cli("show vlan %d" % tag) + for line in self._read_long_output("show vlan"): + match = regex.match(line) + if match: + name = match.group(1) + name.strip() + return name + + except PExpectError: + # recurse on error + self._switch_connect() + return self.vlan_get_name(tag) + + ################################ + ### Port API functions + ################################ + + # Set the mode of a port: access or trunk + def port_set_mode(self, port, mode): + logging.debug("Setting port %s to %s mode", port, mode) + if not self._is_port_mode_valid(mode): + raise InputError("Port mode %s is not allowed" % mode) + if not self._is_port_name_valid(port): + raise InputError("Port name %s not recognised" % port) + + # This switch does not support specific modes, so we can't + # actually change the mode directly. However, we can and + # should deal with the PVID and memberships of existing VLANs + # etc. + + try: + if mode == "trunk": + # We define a trunk port thus: + # * accept all frames on ingress + # * accept packets for all VLANs (no ingress filter) + # * tags frames on transmission (do that later when + # * adding VLANs to the port) + # * PVID should match the default VLAN (1). + self._configure() + self._cli("interface %s" % port) + self._cli("vlan acceptframe all") + self._cli("no vlan ingressfilter") + self._cli("vlan pvid 1") + self._end_interface() + self._end_configure() + + # We define an access port thus: + # * accept only untagged frames on ingress + # * accept packets for only desired VLANs (ingress filter) + # * exists on one VLAN only (1 by default) + # * do not tag frames on transmission (the devices + # we're talking to are expecting untagged frames) + # * PVID should match the VLAN it's on (1 by default, + # but don't do that here) + if mode == "access": + self._configure() + self._cli("interface %s" % port) + self._cli("vlan acceptframe admituntaggedonly") + self._cli("vlan ingressfilter") + self._cli("no vlan tagging 1-1023") + self._cli("no vlan tagging 1024-2047") + self._cli("no vlan tagging 2048-3071") + self._cli("no vlan tagging 3072-4093") + self._end_interface() + self._end_configure() + + # Validate it happened + read_mode = self._port_get_mode(port) + + if read_mode != mode: + raise IOError("Failed to set mode for port %s" % port) + + # And cache the result + self._port_modes[port] = mode + + except PExpectError: + # recurse on error + self._switch_connect() + self.port_set_mode(port, mode) + + # Set an access port to be in a specified VLAN (tag) + def port_set_access_vlan(self, port, tag): + logging.debug("Setting access port %s to VLAN %d", port, tag) + if not self._is_port_name_valid(port): + raise InputError("Port name %s not recognised" % port) + if not (self.port_get_mode(port) == "access"): + raise InputError("Port %s not in access mode" % port) + + try: + current_vlans = self._get_port_vlans(port) + self._configure() + self._cli("interface %s" % port) + self._cli("vlan pvid %s" % tag) + # Find the list of VLANs we're currently on, and drop them + # all. "auto" mode is fine here, we won't be included + # unless we have GVRP configured, and we don't do + # that. + for current_vlan in current_vlans: + self._cli("vlan participation auto %s" % current_vlan) + # Now specifically include the VLAN we want + self._cli("vlan participation include %s" % tag) + self._cli("no shutdown") + self._end_interface() + self._end_configure() + + # Finally, validate things worked + read_vlan = int(self.port_get_access_vlan(port)) + if read_vlan != tag: + raise IOError("Failed to move access port %d to VLAN %d - got VLAN %d instead" + % (port, tag, read_vlan)) + + except PExpectError: + # recurse on error + self._switch_connect() + self.port_set_access_vlan(port, tag) + + # Add a trunk port to a specified VLAN (tag) + def port_add_trunk_to_vlan(self, port, tag): + logging.debug("Adding trunk port %s to VLAN %d", port, tag) + if not self._is_port_name_valid(port): + raise InputError("Port name %s not recognised" % port) + if not (self.port_get_mode(port) == "trunk"): + raise InputError("Port %s not in trunk mode" % port) + + try: + self._configure() + self._cli("interface %s" % port) + self._cli("vlan participation include %d" % tag) + self._cli("vlan tagging %d" % tag) + self._end_interface() + self._end_configure() + + # Validate it happened + read_vlans = self.port_get_trunk_vlan_list(port) + for vlan in read_vlans: + if vlan == tag or vlan == "ALL": + return + raise IOError("Failed to add trunk port %s to VLAN %d" % (port, tag)) + + except PExpectError: + # recurse on error + self._switch_connect() + self.port_add_trunk_to_vlan(port, tag) + + # Remove a trunk port from a specified VLAN (tag) + def port_remove_trunk_from_vlan(self, port, tag): + logging.debug("Removing trunk port %s from VLAN %d", port, tag) + if not self._is_port_name_valid(port): + raise InputError("Port name %s not recognised" % port) + if not (self.port_get_mode(port) == "trunk"): + raise InputError("Port %s not in trunk mode" % port) + + try: + self._configure() + self._cli("interface %s" % port) + self._cli("vlan participation auto %d" % tag) + self._cli("no vlan tagging %d" % tag) + self._end_interface() + self._end_configure() + + # Validate it happened + read_vlans = self.port_get_trunk_vlan_list(port) + for vlan in read_vlans: + if vlan == tag: + raise IOError("Failed to remove trunk port %s from VLAN %d" % (port, tag)) + + except PExpectError: + # recurse on error + self._switch_connect() + self.port_remove_trunk_from_vlan(port, tag) + + # Get the configured VLAN tag for an access port (tag) + def port_get_access_vlan(self, port): + logging.debug("Getting VLAN for access port %s", port) + if not self._is_port_name_valid(port): + raise InputError("Port name %s not recognised" % port) + if not (self.port_get_mode(port) == "access"): + raise InputError("Port %s not in access mode" % port) + vlans = self._get_port_vlans(port) + if (len(vlans) > 1): + raise IOError("More than one VLAN on access port %s" % port) + return vlans[0] + + # Get the list of configured VLAN tags for a trunk port + def port_get_trunk_vlan_list(self, port): + logging.debug("Getting VLAN(s) for trunk port %s", port) + if not self._is_port_name_valid(port): + raise InputError("Port name %s not recognised" % port) + if not (self.port_get_mode(port) == "trunk"): + raise InputError("Port %s not in trunk mode" % port) + return self._get_port_vlans(port) + + ################################ + ### Internal functions + ################################ + + # Connect to the switch and log in + def _switch_connect(self): + + if not self.connection is None: + self.connection.close(True) + self.connection = None + + logging.debug("Connecting to Switch with: %s", self.exec_string) + self.connection = pexpect.spawn(self.exec_string, logfile=self.logger) + self._login() + + # Avoid paged output + self._cli("terminal length 0") + + # And grab details about the switch. in case we need it + self._get_systemdata() + + # And also validate them - make sure we're driving a switch of + # the correct model! Also store the serial number + manuf_regex = re.compile(r'^Manufacturer([\.\s])+(\S+)') + model_regex = re.compile(r'^Machine Model([\.\s])+(\S+)') + sn_regex = re.compile(r'^Serial Number([\.\s])+(\S+)') + + for line in self._systemdata: + match1 = manuf_regex.match(line) + if match1: + manuf = match1.group(2) + + match2 = model_regex.match(line) + if match2: + model = match2.group(2) + + match3 = sn_regex.match(line) + if match3: + self.serial_number = match3.group(2) + + logging.debug("manufacturer is %s", manuf) + logging.debug("model is %s", model) + logging.debug("serial number is %s", self.serial_number) + + if not (self._expected_manuf.match(manuf) and self._expected_model.match(model)): + raise IOError("Switch %s %s not recognised by this driver: abort" % (manuf, model)) + + # Now build a list of our ports, for later sanity checking + self._ports = self._get_port_names() + if len(self._ports) < 4: + raise IOError("Not enough ports detected - problem!") + + def _login(self): + logging.debug("attempting login with username %s, password %s", self._username, self._password) + if self._username is not None: + self.connection.expect("User:") + self._cli("%s" % self._username) + if self._password is not None: + self.connection.expect("Password:") + self._cli("%s" % self._password, False) + while True: + index = self.connection.expect(['User:', 'Password:', 'Bad passwords', 'authentication failed', r'(.*)(#|>)']) + if index != 4: # Any other means: failed to log in! + logging.error("Login failure: index %d\n", index) + logging.error("Login failure: %s\n", self.connection.match.before) + raise IOError + + # else + self._prompt_name = re.escape(self.connection.match.group(1).strip()) + if self.connection.match.group(2) == ">": + # Need to enter "enable" mode too + self._cli("enable") + if self._enable_password is not None: + self.connection.expect("Password:") + self._cli("%s" % self._enable_password, False) + index = self.connection.expect(['Password:', 'Bad passwords', 'authentication failed', r'(.*) *(#|>)']) + if index != 3: # Any other means: failed to log in! + logging.error("Enable password failure: %s\n", self.connection.match) + raise IOError + return 0 + + def _logout(self): + logging.debug("Logging out") + self._cli("quit", False) + try: + self.connection.expect("Would you like to save them now") + self._cli("n") + except (pexpect.EOF): + pass + self.connection.close(True) + + def _configure(self): + self._cli("configure") + + def _end_configure(self): + self._cli("exit") + + def _end_interface(self): + self._end_configure() + + def _read_long_output(self, text): + longbuf = [] + prompt = self._prompt_name + r'\s*#' + while True: + try: + index = self.connection.expect(['--More--', prompt]) + if index == 0: # "--More-- or (q)uit" + for line in self.connection.before.split('\r\n'): + line1 = re.sub('(\x08|\x0D)*', '', line.strip()) + longbuf.append(line1) + self._cli(' ', False) + elif index == 1: # Back to a prompt, says output is finished + break + except (pexpect.EOF, pexpect.TIMEOUT): + # Something went wrong; logout, log in and try again! + logging.error("PEXPECT FAILURE, RECONNECT") + self.errors.log_error_in(text) + raise PExpectError("_read_long_output failed") + except: + logging.error("prompt is \"%s\"", prompt) + raise + + for line in self.connection.before.split('\r\n'): + longbuf.append(line.strip()) + return longbuf + + def _get_port_names(self): + logging.debug("Grabbing list of ports") + interfaces = [] + + # Look for "1" at the beginning of the output lines to just + # match lines with interfaces - they have names like + # "1/0/22". We do not care about Link Aggregation Groups (lag) + # here. + regex = re.compile(r'^(1\S+)') + + try: + self._cli("show port all") + for line in self._read_long_output("show port all"): + match = regex.match(line) + if match: + interface = match.group(1) + interfaces.append(interface) + self._port_numbers[interface] = len(interfaces) + logging.debug(" found %d ports on the switch", len(interfaces)) + return interfaces + + except PExpectError: + # recurse on error + self._switch_connect() + return self._get_port_names() + + # Get the mode of a port: access or trunk + def _port_get_mode(self, port): + logging.debug("Getting mode of port %s", port) + + if not self._is_port_name_valid(port): + raise InputError("Port name %s not recognised" % port) + acceptframe_re = re.compile('vlan acceptframe (.*)') + ingress_re = re.compile('vlan ingressfilter') + + acceptframe = None + ingressfilter = True + + try: + self._cli("show running-config interface %s" % port) + for line in self._read_long_output("show running-config interface"): + + match = acceptframe_re.match(line) + if match: + acceptframe = match.group(1) + + match = ingress_re.match(line) + if match: + ingressfilter = True + + # Simple classifier for now; may need to revisit later... + if (ingressfilter and acceptframe == "admituntaggedonly"): + return "access" + else: + return "trunk" + + except PExpectError: + # recurse on error + self._switch_connect() + return self.port_get_mode(port) + + def _show_config(self): + logging.debug("Grabbing config") + try: + self._cli("show running-config") + return self._read_long_output("show running-config") + except PExpectError: + # recurse on error + self._switch_connect() + return self._show_config() + + def _show_clock(self): + logging.debug("Grabbing time") + try: + self._cli("show clock") + return self._read_long_output("show clock") + except PExpectError: + # recurse on error + self._switch_connect() + return self._show_clock() + + def _get_systemdata(self): + logging.debug("Grabbing system sw and hw versions") + + try: + self._systemdata = [] + self._cli("show version") + for line in self._read_long_output("show version"): + self._systemdata.append(line) + + except PExpectError: + # recurse on error + self._switch_connect() + return self._get_systemdata() + + def _parse_vlan_list(self, inputdata): + vlans = [] + + if inputdata == "ALL": + return ["ALL"] + elif inputdata == "NONE": + return [] + else: + # Parse the complex list + groups = inputdata.split(',') + for group in groups: + subgroups = group.split('-') + if len(subgroups) == 1: + vlans.append(int(subgroups[0])) + elif len(subgroups) == 2: + for i in range (int(subgroups[0]), int(subgroups[1]) + 1): + vlans.append(i) + else: + logging.debug("Can't parse group \"" + group + "\"") + + return vlans + + def _get_port_vlans(self, port): + vlan_text = None + + vlan_part_re = re.compile('vlan participation include (.*)') + + try: + self._cli("show running-config interface %s" % port) + for line in self._read_long_output("show running-config interface"): + match = vlan_part_re.match(line) + if match: + if vlan_text != None: + vlan_text += "," + vlan_text += (match.group(1)) + else: + vlan_text = match.group(1) + + if vlan_text is None: + return [1] + else: + vlans = self._parse_vlan_list(vlan_text) + return vlans + + except PExpectError: + # recurse on error + self._switch_connect() + return self._get_port_vlans(port) + + # Wrapper around connection.send - by default, expect() the same + # text we've sent, to remove it from the output from the + # switch. For the few cases where we don't need that, override + # this using echo=False. + # Horrible, but seems to work. + def _cli(self, text, echo=True): + self.connection.send(text + '\r') + if echo: + try: + self.connection.expect(text) + except (pexpect.EOF, pexpect.TIMEOUT): + # Something went wrong; logout, log in and try again! + logging.error("PEXPECT FAILURE, RECONNECT") + self.errors.log_error_out(text) + raise PExpectError("_cli failed on %s" % text) + except: + logging.error("Unexpected error: %s", sys.exc_info()[0]) + raise + +if __name__ == "__main__": + + # Simple test harness - exercise the main working functions above to verify + # they work. This does *NOT* test really disruptive things like "save + # running-config" and "reload" - test those by hand. + + import optparse + + switch = 'vlandswitch05' + parser = optparse.OptionParser() + parser.add_option("--switch", + dest = "switch", + action = "store", + nargs = 1, + type = "string", + help = "specify switch to connect to for testing", + metavar = "<switch>") + (opts, args) = parser.parse_args() + if opts.switch: + switch = opts.switch + + logging.basicConfig(level = logging.DEBUG, + format = '%(asctime)s %(levelname)-8s %(message)s') + p = NetgearXSM(switch, 23, debug=True) + p.switch_connect('admin', '', None) + + print "VLANs are:" + buf = p.vlan_get_list() + p.dump_list(buf) + + buf = p.vlan_get_name(2) + print "VLAN 2 is named \"%s\"" % buf + + print "Create VLAN 3" + p.vlan_create(3) + + buf = p.vlan_get_name(3) + print "VLAN 3 is named \"%s\"" % buf + + print "Set name of VLAN 3 to test333" + p.vlan_set_name(3, "test333") + + buf = p.vlan_get_name(3) + print "VLAN 3 is named \"%s\"" % buf + + print "VLANs are:" + buf = p.vlan_get_list() + p.dump_list(buf) + + print "Destroy VLAN 3" + p.vlan_destroy(3) + + print "VLANs are:" + buf = p.vlan_get_list() + p.dump_list(buf) + + buf = p.port_get_mode("1/0/10") + print "Port 1/0/10 is in %s mode" % buf + + buf = p.port_get_mode("1/0/11") + print "Port 1/0/11 is in %s mode" % buf + + # Test access stuff + print "Set 1/0/9 to access mode" + p.port_set_mode("1/0/9", "access") + + print "Move 1/0/9 to VLAN 4" + p.port_set_access_vlan("1/0/9", 4) + + buf = p.port_get_access_vlan("1/0/9") + print "Read from switch: 1/0/9 is on VLAN %s" % buf + + print "Move 1/0/9 back to VLAN 1" + p.port_set_access_vlan("1/0/9", 1) + + print "Create VLAN 2" + p.vlan_create(2) + + print "Create VLAN 3" + p.vlan_create(3) + + print "Create VLAN 4" + p.vlan_create(4) + + # Test access stuff + print "Set 1/0/9 to trunk mode" + p.port_set_mode("1/0/9", "trunk") + print "Read from switch: which VLANs is 1/0/9 on?" + buf = p.port_get_trunk_vlan_list("1/0/9") + p.dump_list(buf) + + # The adds below are NOOPs in effect on this switch - no filtering + # for "trunk" ports + print "Add 1/0/9 to VLAN 2" + p.port_add_trunk_to_vlan("1/0/9", 2) + print "Add 1/0/9 to VLAN 3" + p.port_add_trunk_to_vlan("1/0/9", 3) + print "Add 1/0/9 to VLAN 4" + p.port_add_trunk_to_vlan("1/0/9", 4) + print "Read from switch: which VLANs is 1/0/9 on?" + buf = p.port_get_trunk_vlan_list("1/0/9") + p.dump_list(buf) + + # And the same for removals here + p.port_remove_trunk_from_vlan("1/0/9", 3) + p.port_remove_trunk_from_vlan("1/0/9", 2) + p.port_remove_trunk_from_vlan("1/0/9", 4) + print "Read from switch: which VLANs is 1/0/9 on?" + buf = p.port_get_trunk_vlan_list("1/0/9") + p.dump_list(buf) + +# print 'Restarting switch, to explicitly reset config' +# p.switch_restart() + +# p.switch_save_running_config() + +# p.switch_disconnect() +# p._show_config() |