aboutsummaryrefslogtreecommitdiff
path: root/Vland/drivers
diff options
context:
space:
mode:
Diffstat (limited to 'Vland/drivers')
-rw-r--r--Vland/drivers/CiscoCatalyst.py721
-rw-r--r--Vland/drivers/CiscoSX300.py697
-rw-r--r--Vland/drivers/Dummy.py361
-rw-r--r--Vland/drivers/Mellanox.py795
-rw-r--r--Vland/drivers/NetgearXSM.py782
-rw-r--r--Vland/drivers/TPLinkTLSG2XXX.py695
-rw-r--r--Vland/drivers/__init__.py0
-rw-r--r--Vland/drivers/common.py167
8 files changed, 4218 insertions, 0 deletions
diff --git a/Vland/drivers/CiscoCatalyst.py b/Vland/drivers/CiscoCatalyst.py
new file mode 100644
index 0000000..16f47db
--- /dev/null
+++ b/Vland/drivers/CiscoCatalyst.py
@@ -0,0 +1,721 @@
+#! /usr/bin/python
+
+# Copyright 2014-2015 Linaro Limited
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
+# MA 02110-1301, USA.
+
+import logging
+import sys
+import re
+import pexpect
+
+if __name__ == '__main__':
+ import os
+ vlandpath = os.path.abspath(os.path.normpath(os.path.dirname(sys.argv[0])))
+ sys.path.insert(0, vlandpath)
+ sys.path.insert(0, "%s/.." % vlandpath)
+
+from errors import InputError, PExpectError
+from drivers.common import SwitchDriver, SwitchErrors
+
+class CiscoCatalyst(SwitchDriver):
+
+ connection = None
+ _username = None
+ _password = None
+ _enable_password = None
+
+ _capabilities = [
+ 'TrunkWildCardVlans' # Trunk ports are on all VLANs by
+ # default, so we shouldn't need to
+ # bugger with them
+ ]
+
+ # Regexp of expected hardware information - fail if we don't see
+ # this
+ _expected_descr_re = re.compile(r'WS-C\S+-\d+P')
+
+ def __init__(self, switch_hostname, switch_telnetport=23, debug = False):
+ SwitchDriver.__init__(self, switch_hostname, debug)
+ self._systemdata = []
+ self.exec_string = "/usr/bin/telnet %s %d" % (switch_hostname, switch_telnetport)
+ self.errors = SwitchErrors()
+
+ ################################
+ ### Switch-level API functions
+ ################################
+
+ # Save the current running config into flash - we want config to
+ # remain across reboots
+ def switch_save_running_config(self):
+ try:
+ self._cli("copy running-config startup-config")
+ self.connection.expect("startup-config")
+ self._cli("startup-config")
+ self.connection.expect("OK")
+ except (PExpectError, pexpect.EOF):
+ # recurse on error
+ self._switch_connect()
+ self.switch_save_running_config()
+
+ # Restart the switch - we need to reload config to do a
+ # roll-back. Do NOT save running-config first if the switch asks -
+ # we're trying to dump recent changes, not save them.
+ #
+ # This will also implicitly cause a connection to be closed
+ def switch_restart(self):
+ self._cli("reload")
+ index = self.connection.expect(['has been modified', 'Proceed'])
+ if index == 0:
+ self._cli("n") # No, don't save
+ self.connection.expect("Proceed")
+
+ # Fall through
+ self._cli("y") # Yes, continue to reset
+ self.connection.close(True)
+
+ # List the capabilities of the switch (and driver) - some things
+ # make no sense to abstract. Returns a dict of strings, each one
+ # describing an extra feature that that higher levels may care
+ # about
+ def switch_get_capabilities(self):
+ return self._capabilities
+
+ ################################
+ ### VLAN API functions
+ ################################
+
+ # Create a VLAN with the specified tag
+ def vlan_create(self, tag):
+ logging.debug("Creating VLAN %d", tag)
+ try:
+ self._configure()
+ self._cli("vlan %d" % tag)
+ self._end_configure()
+
+ # Validate it happened
+ vlans = self.vlan_get_list()
+ for vlan in vlans:
+ if vlan == tag:
+ return
+ raise IOError("Failed to create VLAN %d" % tag)
+
+ except PExpectError:
+ # recurse on error
+ self._switch_connect()
+ self.vlan_create(tag)
+
+ # Destroy a VLAN with the specified tag
+ def vlan_destroy(self, tag):
+ logging.debug("Destroying VLAN %d", tag)
+
+ try:
+ self._configure()
+ self._cli("no vlan %d" % tag)
+ self._end_configure()
+
+ # Validate it happened
+ vlans = self.vlan_get_list()
+ for vlan in vlans:
+ if vlan == tag:
+ raise IOError("Failed to destroy VLAN %d" % tag)
+
+ except PExpectError:
+ # recurse on error
+ self._switch_connect()
+ self.vlan_destroy(tag)
+
+ # Set the name of a VLAN
+ def vlan_set_name(self, tag, name):
+ logging.debug("Setting name of VLAN %d to %s", tag, name)
+
+ try:
+ self._configure()
+ self._cli("vlan %d" % tag)
+ self._cli("name %s" % name)
+ self._end_configure()
+
+ # Validate it happened
+ read_name = self.vlan_get_name(tag)
+ if read_name != name:
+ raise IOError("Failed to set name for VLAN %d (name found is \"%s\", not \"%s\")"
+ % (tag, read_name, name))
+ except PExpectError:
+ # recurse on error
+ self._switch_connect()
+ self.vlan_set_name(tag, name)
+
+ # Get a list of the VLAN tags currently registered on the switch
+ def vlan_get_list(self):
+ logging.debug("Grabbing list of VLANs")
+
+ try:
+ vlans = []
+
+ regex = re.compile(r'^ *(\d+).*(active)')
+
+ self._cli("show vlan brief")
+ for line in self._read_long_output("show vlan brief"):
+ match = regex.match(line)
+ if match:
+ vlans.append(int(match.group(1)))
+ return vlans
+
+ except PExpectError:
+ # recurse on error
+ self._switch_connect()
+ return self.vlan_get_list()
+
+ # For a given VLAN tag, ask the switch what the associated name is
+ def vlan_get_name(self, tag):
+
+ try:
+ logging.debug("Grabbing the name of VLAN %d", tag)
+ name = None
+ regex = re.compile(r'^ *\d+\s+(\S+).*(active)')
+ self._cli("show vlan id %d" % tag)
+ for line in self._read_long_output("show vlan id"):
+ match = regex.match(line)
+ if match:
+ name = match.group(1)
+ name.strip()
+ return name
+
+ except PExpectError:
+ # recurse on error
+ self._switch_connect()
+ return self.vlan_get_name(tag)
+
+ ################################
+ ### Port API functions
+ ################################
+
+ # Set the mode of a port: access or trunk
+ def port_set_mode(self, port, mode):
+ logging.debug("Setting port %s to %s", port, mode)
+ if not self._is_port_mode_valid(mode):
+ raise InputError("Port mode %s is not allowed" % mode)
+ if not self._is_port_name_valid(port):
+ raise InputError("Port name %s not recognised" % port)
+
+ try:
+ self._configure()
+ self._cli("interface %s" % port)
+ if mode == "trunk":
+ self._cli("switchport trunk encapsulation dot1q")
+ self._cli("switchport trunk native vlan 1")
+ self._cli("switchport mode %s" % mode)
+ self._end_configure()
+
+ # Validate it happened
+ read_mode = self._port_get_mode(port)
+
+ if read_mode != mode:
+ raise IOError("Failed to set mode for port %s" % port)
+
+ # And cache the result
+ self._port_modes[port] = mode
+
+ except PExpectError:
+ # recurse on error
+ self._switch_connect()
+ self.port_set_mode(port, mode)
+
+ # Set an access port to be in a specified VLAN (tag)
+ def port_set_access_vlan(self, port, tag):
+ logging.debug("Setting access port %s to VLAN %d", port, tag)
+ if not self._is_port_name_valid(port):
+ raise InputError("Port name %s not recognised" % port)
+ if not (self.port_get_mode(port) == "access"):
+ raise InputError("Port %s not in access mode" % port)
+
+ try:
+ self._configure()
+ self._cli("interface %s" % port)
+ self._cli("switchport access vlan %d" % tag)
+ self._cli("no shutdown")
+ self._end_configure()
+
+ # Finally, validate things worked
+ read_vlan = int(self.port_get_access_vlan(port))
+ if read_vlan != tag:
+ raise IOError("Failed to move access port %d to VLAN %d - got VLAN %d instead"
+ % (port, tag, read_vlan))
+
+ except PExpectError:
+ # recurse on error
+ self._switch_connect()
+ self.port_set_access_vlan(port, tag)
+
+ # Add a trunk port to a specified VLAN (tag)
+ def port_add_trunk_to_vlan(self, port, tag):
+ logging.debug("Adding trunk port %s to VLAN %d", port, tag)
+ if not self._is_port_name_valid(port):
+ raise InputError("Port name %s not recognised" % port)
+ if not (self.port_get_mode(port) == "trunk"):
+ raise InputError("Port %s not in trunk mode" % port)
+
+ try:
+ self._configure()
+ self._cli("interface %s" % port)
+ self._cli("switchport trunk allowed vlan add %d" % tag)
+ self._end_configure()
+
+ # Validate it happened
+ read_vlans = self.port_get_trunk_vlan_list(port)
+ for vlan in read_vlans:
+ if vlan == tag or vlan == "ALL":
+ return
+ raise IOError("Failed to add trunk port %s to VLAN %d" % (port, tag))
+
+ except PExpectError:
+ # recurse on error
+ self._switch_connect()
+ self.port_add_trunk_to_vlan(port, tag)
+
+ # Remove a trunk port from a specified VLAN (tag)
+ def port_remove_trunk_from_vlan(self, port, tag):
+ logging.debug("Removing trunk port %s from VLAN %d", port, tag)
+ if not self._is_port_name_valid(port):
+ raise InputError("Port name %s not recognised" % port)
+ if not (self.port_get_mode(port) == "trunk"):
+ raise InputError("Port %s not in trunk mode" % port)
+
+ try:
+ self._configure()
+ self._cli("interface %s" % port)
+ self._cli("switchport trunk allowed vlan remove %d" % tag)
+ self._end_configure()
+
+ # Validate it happened
+ read_vlans = self.port_get_trunk_vlan_list(port)
+ for vlan in read_vlans:
+ if vlan == tag:
+ raise IOError("Failed to remove trunk port %s from VLAN %d" % (port, tag))
+
+ except PExpectError:
+ # recurse on error
+ self._switch_connect()
+ self.port_remove_trunk_from_vlan(port, tag)
+
+ # Get the configured VLAN tag for an access port (tag)
+ def port_get_access_vlan(self, port):
+ logging.debug("Getting VLAN for access port %s", port)
+ vlan = 1
+ if not self._is_port_name_valid(port):
+ raise InputError("Port name %s not recognised" % port)
+ if not (self.port_get_mode(port) == "access"):
+ raise InputError("Port %s not in access mode" % port)
+ regex = re.compile(r'Access Mode VLAN: (\d+)')
+
+ try:
+ self._cli("show interfaces %s switchport" % port)
+ for line in self._read_long_output("show interfaces switchport"):
+ match = regex.match(line)
+ if match:
+ vlan = match.group(1)
+ return int(vlan)
+
+ except PExpectError:
+ # recurse on error
+ self._switch_connect()
+ return self.port_get_access_vlan(port)
+
+ # Get the list of configured VLAN tags for a trunk port
+ def port_get_trunk_vlan_list(self, port):
+ logging.debug("Getting VLANs for trunk port %s", port)
+ vlans = [ ]
+ if not self._is_port_name_valid(port):
+ raise InputError("Port name %s not recognised" % port)
+ if not (self.port_get_mode(port) == "trunk"):
+ raise InputError("Port %s not in trunk mode" % port)
+ regex_start = re.compile('Trunking VLANs Enabled: (.*)')
+ regex_continue = re.compile(r'\s*(\d.*)')
+
+ try:
+ self._cli("show interfaces %s switchport" % port)
+
+ # Horrible parsing work - VLAN list may extend over several lines
+ in_match = False
+ vlan_text = ''
+
+ for line in self._read_long_output("show interfaces switchport"):
+ if in_match:
+ match = regex_continue.match(line)
+ if match:
+ vlan_text += match.group(1)
+ else:
+ in_match = False
+ else:
+ match = regex_start.match(line)
+ if match:
+ vlan_text += match.group(1)
+ in_match = True
+
+ vlans = self._parse_vlan_list(vlan_text)
+ return vlans
+
+ except PExpectError:
+ # recurse on error
+ self._switch_connect()
+ return self.port_get_trunk_vlan_list(port)
+
+ ################################
+ ### Internal functions
+ ################################
+
+ # Connect to the switch and log in
+ def _switch_connect(self):
+
+ if not self.connection is None:
+ self.connection.close(True)
+ self.connection = None
+
+ logging.debug("Connecting to Switch with: %s", self.exec_string)
+ self.connection = pexpect.spawn(self.exec_string, logfile=self.logger)
+ self._login()
+
+ # Avoid paged output
+ self._cli("terminal length 0")
+
+ # And grab details about the switch. in case we need it
+ self._get_systemdata()
+
+ # And also validate them - make sure we're driving a switch of
+ # the correct model! Also store the serial number
+ descr_regex = re.compile(r'^cisco\s+(\S+)')
+ sn_regex = re.compile(r'System serial number\s+:\s+(\S+)')
+ descr = ""
+
+ for line in self._systemdata:
+ match = descr_regex.match(line)
+ if match:
+ descr = match.group(1)
+ match = sn_regex.match(line)
+ if match:
+ self.serial_number = match.group(1)
+
+ logging.debug("serial number is %s", self.serial_number)
+ logging.debug("system description is %s", descr)
+
+ if not self._expected_descr_re.match(descr):
+ raise IOError("Switch %s not recognised by this driver: abort" % descr)
+
+ # Now build a list of our ports, for later sanity checking
+ self._ports = self._get_port_names()
+ if len(self._ports) < 4:
+ raise IOError("Not enough ports detected - problem!")
+
+ def _login(self):
+ logging.debug("attempting login with username %s, password %s", self._username, self._password)
+ self.connection.expect('User Access Verification')
+ if self._username is not None:
+ self.connection.expect("User Name:")
+ self._cli("%s" % self._username)
+ if self._password is not None:
+ self.connection.expect("Password:")
+ self._cli("%s" % self._password, False)
+ while True:
+ index = self.connection.expect(['User Name:', 'Password:', 'Bad passwords', 'authentication failed', r'(.*)(#|>)'])
+ if index != 4: # Any other means: failed to log in!
+ logging.error("Login failure: index %d\n", index)
+ logging.error("Login failure: %s\n", self.connection.match.before)
+ raise IOError
+
+ # else
+ self._prompt_name = re.escape(self.connection.match.group(1).strip())
+ if self.connection.match.group(2) == ">":
+ # Need to enter "enable" mode too
+ self._cli("enable")
+ if self._enable_password is not None:
+ self.connection.expect("Password:")
+ self._cli("%s" % self._enable_password, False)
+ index = self.connection.expect(['Password:', 'Bad passwords', 'authentication failed', r'(.*)(#|>)'])
+ if index != 3: # Any other means: failed to log in!
+ logging.error("Enable password failure: %s\n", self.connection.match)
+ raise IOError
+ return 0
+
+ def _logout(self):
+ logging.debug("Logging out")
+ self._cli("exit", False)
+ self.connection.close(True)
+
+ def _configure(self):
+ self._cli("configure terminal")
+
+ def _end_configure(self):
+ self._cli("end")
+
+ def _read_long_output(self, text):
+ prompt = self._prompt_name + '#'
+ try:
+ self.connection.expect(prompt)
+ except (pexpect.EOF, pexpect.TIMEOUT):
+ # Something went wrong; logout, log in and try again!
+ logging.error("PEXPECT FAILURE, RECONNECT")
+ self.errors.log_error_in(text)
+ raise PExpectError("_read_long_output failed")
+ except:
+ logging.error("prompt is \"%s\"", prompt)
+ raise
+
+ longbuf = []
+ for line in self.connection.before.split('\r\n'):
+ longbuf.append(line.strip())
+ return longbuf
+
+ def _get_port_names(self):
+ logging.debug("Grabbing list of ports")
+ interfaces = []
+
+ # Use "connect" to only identify lines in the output that
+ # match interfaces - it will match lines with "connected" or
+ # "notconnect".
+ regex = re.compile(r'^\s*([a-zA-Z0-9_/]*).*(connect)(.*)')
+ # Deliberately drop things marked as "routed", i.e. the
+ # management port
+ regex2 = re.compile('.*routed.*')
+
+ try:
+ self._cli("show interfaces status")
+ for line in self._read_long_output("show interfaces status"):
+ match = regex.match(line)
+ if match:
+ interface = match.group(1)
+ junk = match.group(3)
+ match2 = regex2.match(junk)
+ if not match2:
+ interfaces.append(interface)
+ self._port_numbers[interface] = len(interfaces)
+ logging.debug(" found %d ports on the switch", len(interfaces))
+ return interfaces
+
+ except PExpectError:
+ # recurse on error
+ self._switch_connect()
+ return self._get_port_names()
+
+ # Get the mode of a port: access or trunk
+ def _port_get_mode(self, port):
+ logging.debug("Getting mode of port %s", port)
+ mode = ''
+ if not self._is_port_name_valid(port):
+ raise InputError("Port name %s not recognised" % port)
+ regex = re.compile('Administrative Mode: (.*)')
+
+ try:
+ self._cli("show interfaces %s switchport" % port)
+ for line in self._read_long_output("show interfaces switchport"):
+ match = regex.match(line)
+ if match:
+ mode = match.group(1)
+ if mode == 'static access':
+ return 'access'
+ if mode == 'trunk':
+ return 'trunk'
+ # Needs special handling - it's the default port
+ # mode on these switches, and it doesn't
+ # interoperate with some other vendors. Sigh.
+ if mode == 'dynamic auto':
+ return 'dynamic'
+ return mode
+
+ except PExpectError:
+ # recurse on error
+ self._switch_connect()
+ return self.port_get_mode(port)
+
+ def _show_config(self):
+ logging.debug("Grabbing config")
+ try:
+ self._cli("show running-config")
+ return self._read_long_output("show running-config")
+ except PExpectError:
+ # recurse on error
+ self._switch_connect()
+ return self._show_config()
+
+ def _show_clock(self):
+ logging.debug("Grabbing time")
+ try:
+ self._cli("show clock")
+ return self._read_long_output("show clock")
+ except PExpectError:
+ # recurse on error
+ self._switch_connect()
+ return self._show_clock()
+
+ def _get_systemdata(self):
+ logging.debug("Grabbing system sw and hw versions")
+
+ try:
+ self._systemdata = []
+ self._cli("show version")
+ for line in self._read_long_output("show version"):
+ self._systemdata.append(line)
+
+ except PExpectError:
+ # recurse on error
+ self._switch_connect()
+ return self._get_systemdata()
+
+ def _parse_vlan_list(self, inputdata):
+ vlans = []
+
+ if inputdata == "ALL":
+ return ["ALL"]
+ elif inputdata == "NONE":
+ return []
+ else:
+ # Parse the complex list
+ groups = inputdata.split(',')
+ for group in groups:
+ subgroups = group.split('-')
+ if len(subgroups) == 1:
+ vlans.append(int(subgroups[0]))
+ elif len(subgroups) == 2:
+ for i in range (int(subgroups[0]), int(subgroups[1]) + 1):
+ vlans.append(i)
+ else:
+ logging.debug("Can't parse group \"" + group + "\"")
+
+ return vlans
+
+ # Wrapper around connection.send - by default, expect() the same
+ # text we've sent, to remove it from the output from the
+ # switch. For the few cases where we don't need that, override
+ # this using echo=False.
+ # Horrible, but seems to work.
+ def _cli(self, text, echo=True):
+ self.connection.send(text + '\r')
+ if echo:
+ try:
+ self.connection.expect(text)
+ except (pexpect.EOF, pexpect.TIMEOUT):
+ # Something went wrong; logout, log in and try again!
+ logging.error("PEXPECT FAILURE, RECONNECT")
+ self.errors.log_error_out(text)
+ raise PExpectError("_cli failed on %s" % text)
+ except:
+ logging.error("Unexpected error: %s", sys.exc_info()[0])
+ raise
+
+if __name__ == "__main__":
+
+ # Simple test harness - exercise the main working functions above to verify
+ # they work. This does *NOT* test really disruptive things like "save
+ # running-config" and "reload" - test those by hand.
+
+ import optparse
+
+ switch = 'vlandswitch01'
+ parser = optparse.OptionParser()
+ parser.add_option("--switch",
+ dest = "switch",
+ action = "store",
+ nargs = 1,
+ type = "string",
+ help = "specify switch to connect to for testing",
+ metavar = "<switch>")
+ (opts, args) = parser.parse_args()
+ if opts.switch:
+ switch = opts.switch
+
+ logging.basicConfig(level = logging.DEBUG,
+ format = '%(asctime)s %(levelname)-8s %(message)s')
+ p = CiscoCatalyst(switch, 23, debug=True)
+ p.switch_connect(None, 'lngvirtual', 'lngenable')
+
+ print "VLANs are:"
+ buf = p.vlan_get_list()
+ p.dump_list(buf)
+
+ buf = p.vlan_get_name(2)
+ print "VLAN 2 is named \"%s\"" % buf
+
+ print "Create VLAN 3"
+ p.vlan_create(3)
+
+ buf = p.vlan_get_name(3)
+ print "VLAN 3 is named \"%s\"" % buf
+
+ print "Set name of VLAN 3 to test333"
+ p.vlan_set_name(3, "test333")
+
+ buf = p.vlan_get_name(3)
+ print "VLAN 3 is named \"%s\"" % buf
+
+ print "VLANs are:"
+ buf = p.vlan_get_list()
+ p.dump_list(buf)
+
+ print "Destroy VLAN 3"
+ p.vlan_destroy(3)
+
+ print "VLANs are:"
+ buf = p.vlan_get_list()
+ p.dump_list(buf)
+
+ buf = p.port_get_mode("Gi1/0/10")
+ print "Port Gi1/0/10 is in %s mode" % buf
+
+ buf = p.port_get_mode("Gi1/0/11")
+ print "Port Gi1/0/11 is in %s mode" % buf
+
+ # Test access stuff
+ print "Set Gi1/0/9 to access mode"
+ p.port_set_mode("Gi1/0/9", "access")
+
+ print "Move Gi1/0/9 to VLAN 4"
+ p.port_set_access_vlan("Gi1/0/9", 4)
+
+ buf = p.port_get_access_vlan("Gi1/0/9")
+ print "Read from switch: Gi1/0/9 is on VLAN %s" % buf
+
+ print "Move Gi1/0/9 back to VLAN 1"
+ p.port_set_access_vlan("Gi1/0/9", 1)
+
+ # Test access stuff
+ print "Set Gi1/0/9 to trunk mode"
+ p.port_set_mode("Gi1/0/9", "trunk")
+ print "Read from switch: which VLANs is Gi1/0/9 on?"
+ buf = p.port_get_trunk_vlan_list("Gi1/0/9")
+ p.dump_list(buf)
+ print "Add Gi1/0/9 to VLAN 2"
+ p.port_add_trunk_to_vlan("Gi1/0/9", 2)
+ print "Add Gi1/0/9 to VLAN 3"
+ p.port_add_trunk_to_vlan("Gi1/0/9", 3)
+ print "Add Gi1/0/9 to VLAN 4"
+ p.port_add_trunk_to_vlan("Gi1/0/9", 4)
+ print "Read from switch: which VLANs is Gi1/0/9 on?"
+ buf = p.port_get_trunk_vlan_list("Gi1/0/9")
+ p.dump_list(buf)
+
+ p.port_remove_trunk_from_vlan("Gi1/0/9", 3)
+ p.port_remove_trunk_from_vlan("Gi1/0/9", 3)
+ p.port_remove_trunk_from_vlan("Gi1/0/9", 4)
+ print "Read from switch: which VLANs is Gi1/0/9 on?"
+ buf = p.port_get_trunk_vlan_list("Gi1/0/9")
+ p.dump_list(buf)
+
+# print 'Restarting switch, to explicitly reset config'
+# p.switch_restart()
+
+# p.switch_save_running_config()
+
+# p.switch_disconnect()
+# p._show_config()
diff --git a/Vland/drivers/CiscoSX300.py b/Vland/drivers/CiscoSX300.py
new file mode 100644
index 0000000..a6f5446
--- /dev/null
+++ b/Vland/drivers/CiscoSX300.py
@@ -0,0 +1,697 @@
+#! /usr/bin/python
+
+# Copyright 2014-2015 Linaro Limited
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
+# MA 02110-1301, USA.
+
+import logging
+import sys
+import re
+import pexpect
+
+if __name__ == '__main__':
+ import os
+ vlandpath = os.path.abspath(os.path.normpath(os.path.dirname(sys.argv[0])))
+ sys.path.insert(0, vlandpath)
+ sys.path.insert(0, "%s/.." % vlandpath)
+
+from errors import InputError, PExpectError
+from drivers.common import SwitchDriver, SwitchErrors
+
+class CiscoSX300(SwitchDriver):
+
+ connection = None
+ _username = None
+ _password = None
+ _enable_password = None
+
+ # No extra capabilities for this switch/driver yet
+ _capabilities = [
+ ]
+
+ # Regexp of expected hardware information - fail if we don't see
+ # this
+ _expected_descr_re = re.compile(r'.*\d+-Port.*Managed Switch.*')
+
+ def __init__(self, switch_hostname, switch_telnetport=23, debug = False):
+ SwitchDriver.__init__(self, switch_hostname, debug)
+ self._systemdata = []
+ self.exec_string = "/usr/bin/telnet %s %d" % (switch_hostname, switch_telnetport)
+ self.errors = SwitchErrors()
+
+ ################################
+ ### Switch-level API functions
+ ################################
+
+ # Save the current running config into flash - we want config to
+ # remain across reboots
+ def switch_save_running_config(self):
+ try:
+ self._cli("copy running-config startup-config")
+ self.connection.expect("Y/N")
+ self._cli("y")
+ self.connection.expect("succeeded")
+ except (PExpectError, pexpect.EOF, pexpect.TIMEOUT):
+ # recurse on error
+ self._switch_connect()
+ self.switch_save_running_config()
+
+ # Restart the switch - we need to reload config to do a
+ # roll-back. Do NOT save running-config first if the switch asks -
+ # we're trying to dump recent changes, not save them.
+ #
+ # This will also implicitly cause a connection to be closed
+ def switch_restart(self):
+ self._cli("reload")
+ index = self.connection.expect(['Are you sure', 'will reset'])
+ if index == 0:
+ self._cli("y") # Yes, continue without saving
+ self.connection.expect("reset the whole")
+
+ # Fall through
+ self._cli("y") # Yes, continue to reset
+ self.connection.close(True)
+
+ ################################
+ ### VLAN API functions
+ ################################
+
+ # Create a VLAN with the specified tag
+ def vlan_create(self, tag):
+ logging.debug("Creating VLAN %d", tag)
+
+ try:
+ self._configure()
+ self._cli("vlan database")
+ self._cli("vlan %d" % tag)
+ self._end_configure()
+
+ # Validate it happened
+ vlans = self.vlan_get_list()
+ for vlan in vlans:
+ if vlan == tag:
+ return
+ raise IOError("Failed to create VLAN %d" % tag)
+
+ except PExpectError:
+ # recurse on error
+ self._switch_connect()
+ self.vlan_create(tag)
+
+ # Destroy a VLAN with the specified tag
+ def vlan_destroy(self, tag):
+ logging.debug("Destroying VLAN %d", tag)
+
+ try:
+ self._configure()
+ self._cli("no vlan %d" % tag)
+ self._end_configure()
+
+ # Validate it happened
+ vlans = self.vlan_get_list()
+ for vlan in vlans:
+ if vlan == tag:
+ raise IOError("Failed to destroy VLAN %d" % tag)
+
+ except PExpectError:
+ # recurse on error
+ self._switch_connect()
+ self.vlan_destroy(tag)
+
+ # Set the name of a VLAN
+ def vlan_set_name(self, tag, name):
+ logging.debug("Setting name of VLAN %d to %s", tag, name)
+
+ try:
+ self._configure()
+ self._cli("vlan %d" % tag)
+ self._cli("interface vlan %d" % tag)
+ self._cli("name %s" % name)
+ self._end_configure()
+
+ # Validate it happened
+ read_name = self.vlan_get_name(tag)
+ if read_name != name:
+ raise IOError("Failed to set name for VLAN %d (name found is \"%s\", not \"%s\")"
+ % (tag, read_name, name))
+
+ except PExpectError:
+ # recurse on error
+ self._switch_connect()
+ self.vlan_set_name(tag, name)
+
+ # Get a list of the VLAN tags currently registered on the switch
+ def vlan_get_list(self):
+ logging.debug("Grabbing list of VLANs")
+
+ try:
+ vlans = []
+ regex = re.compile(r'^ *(\d+).*(D|S|G|R)')
+
+ self._cli("show vlan")
+ for line in self._read_long_output("show vlan"):
+ match = regex.match(line)
+ if match:
+ vlans.append(int(match.group(1)))
+ return vlans
+
+ except PExpectError:
+ # recurse on error
+ self._switch_connect()
+ return self.vlan_get_list()
+
+ # For a given VLAN tag, ask the switch what the associated name is
+ def vlan_get_name(self, tag):
+ logging.debug("Grabbing the name of VLAN %d", tag)
+
+ try:
+ name = None
+ regex = re.compile(r'^ *\d+\s+(\S+).*(D|S|G|R)')
+ self._cli("show vlan tag %d" % tag)
+ for line in self._read_long_output("show vlan tag"):
+ match = regex.match(line)
+ if match:
+ name = match.group(1)
+ name.strip()
+ return name
+
+ except PExpectError:
+ # recurse on error
+ self._switch_connect()
+ return self.vlan_get_name(tag)
+
+ ################################
+ ### Port API functions
+ ################################
+
+ # Set the mode of a port: access or trunk
+ def port_set_mode(self, port, mode):
+ logging.debug("Setting port %s to %s", port, mode)
+ if not self._is_port_mode_valid(mode):
+ raise InputError("Port mode %s is not allowed" % mode)
+ if not self._is_port_name_valid(port):
+ raise InputError("Port name %s not recognised" % port)
+
+ try:
+ self._configure()
+ self._cli("interface %s" % port)
+ self._cli("switchport mode %s" % mode)
+ self._end_configure()
+
+ # Validate it happened
+ read_mode = self._port_get_mode(port)
+ if read_mode != mode:
+ raise IOError("Failed to set mode for port %s" % port)
+
+ # And cache the result
+ self._port_modes[port] = mode
+
+ except PExpectError:
+ # recurse on error
+ self._switch_connect()
+ self.port_set_mode(port, mode)
+
+ # Set an access port to be in a specified VLAN (tag)
+ def port_set_access_vlan(self, port, tag):
+ logging.debug("Setting access port %s to VLAN %d", port, tag)
+ if not self._is_port_name_valid(port):
+ raise InputError("Port name %s not recognised" % port)
+ if not (self.port_get_mode(port) == "access"):
+ raise InputError("Port %s not in access mode" % port)
+
+ try:
+ self._configure()
+ self._cli("interface %s" % port)
+ self._cli("switchport access vlan %d" % tag)
+ self._end_configure()
+
+ # Validate things worked
+ read_vlan = int(self.port_get_access_vlan(port))
+ if read_vlan != tag:
+ raise IOError("Failed to move access port %s to VLAN %d - got VLAN %d instead"
+ % (port, tag, read_vlan))
+
+ except PExpectError:
+ # recurse on error
+ self._switch_connect()
+ self.port_set_access_vlan(port, tag)
+
+ # Add a trunk port to a specified VLAN (tag)
+ def port_add_trunk_to_vlan(self, port, tag):
+ logging.debug("Adding trunk port %s to VLAN %d", port, tag)
+ if not self._is_port_name_valid(port):
+ raise InputError("Port name %s not recognised" % port)
+ if not (self.port_get_mode(port) == "trunk"):
+ raise InputError("Port %s not in trunk mode" % port)
+
+ try:
+ self._configure()
+ self._cli("interface %s" % port)
+ self._cli("switchport trunk allowed vlan add %d" % tag)
+ self._end_configure()
+
+ # Validate it happened
+ read_vlans = self.port_get_trunk_vlan_list(port)
+ for vlan in read_vlans:
+ if vlan == tag:
+ return
+ raise IOError("Failed to add trunk port %s to VLAN %d" % (port, tag))
+
+ except PExpectError:
+ # recurse on error
+ self._switch_connect()
+ self.port_add_trunk_to_vlan(port, tag)
+
+ # Remove a trunk port from a specified VLAN (tag)
+ def port_remove_trunk_from_vlan(self, port, tag):
+ logging.debug("Removing trunk port %s from VLAN %d", port, tag)
+ if not self._is_port_name_valid(port):
+ raise InputError("Port name %s not recognised" % port)
+ if not (self.port_get_mode(port) == "trunk"):
+ raise InputError("Port %s not in trunk mode" % port)
+
+ try:
+ self._configure()
+ self._cli("interface %s" % port)
+ self._cli("switchport trunk allowed vlan remove %d" % tag)
+ self._end_configure()
+
+ # Validate it happened
+ read_vlans = self.port_get_trunk_vlan_list(port)
+ for vlan in read_vlans:
+ if vlan == tag:
+ raise IOError("Failed to remove trunk port %s from VLAN %d" % (port, tag))
+
+ except PExpectError:
+ # recurse on error
+ self._switch_connect()
+ self.port_remove_trunk_from_vlan(port, tag)
+
+ # Get the configured VLAN tag for an access port (tag)
+ def port_get_access_vlan(self, port):
+ logging.debug("Getting VLAN for access port %s", port)
+ vlan = 1
+ if not self._is_port_name_valid(port):
+ raise InputError("Port name %s not recognised" % port)
+ if not (self.port_get_mode(port) == "access"):
+ raise InputError("Port %s not in access mode" % port)
+ regex = re.compile(r'(\d+)\s+\S+\s+Untagged\s+(D|S|G|R)')
+
+ try:
+ self._cli("show interfaces switchport %s" % port)
+ for line in self._read_long_output("show interfaces switchport"):
+ match = regex.match(line)
+ if match:
+ vlan = match.group(1)
+ return int(vlan)
+
+ except PExpectError:
+ # recurse on error
+ self._switch_connect()
+ return self.port_get_access_vlan(port)
+
+ # Get the list of configured VLAN tags for a trunk port
+ def port_get_trunk_vlan_list(self, port):
+ logging.debug("Getting VLANs for trunk port %s", port)
+ vlans = [ ]
+ if not self._is_port_name_valid(port):
+ raise InputError("Port name %s not recognised" % port)
+ if not (self.port_get_mode(port) == "trunk"):
+ raise InputError("Port %s not in trunk mode" % port)
+ regex = re.compile(r'(\d+)\s+\S+\s+(Tagged|Untagged)\s+(D|S|G|R)')
+
+ try:
+ self._cli("show interfaces switchport %s" % port)
+ for line in self._read_long_output("show interfaces switchport"):
+ match = regex.match(line)
+ if match:
+ vlans.append (int(match.group(1)))
+ return vlans
+
+ except PExpectError:
+ # recurse on error
+ self._switch_connect()
+ return self.port_get_trunk_vlan_list(port)
+
+ ################################
+ ### Internal functions
+ ################################
+
+ # Connect to the switch and log in
+ def _switch_connect(self):
+
+ if not self.connection is None:
+ self.connection.close(True)
+ self.connection = None
+
+ logging.debug("Connecting to Switch with: %s", self.exec_string)
+ self.connection = pexpect.spawn(self.exec_string, logfile=self.logger)
+ self._login()
+
+ # Avoid paged output
+ self._cli("terminal datadump")
+
+ # And grab details about the switch. in case we need it
+ self._get_systemdata()
+
+ # And also validate them - make sure we're driving a switch of
+ # the correct model! Also store the serial number
+ descr_regex = re.compile(r'System Description:.\s+(.*)')
+ sn_regex = re.compile(r'SN:\s+(\S_)')
+ descr = ""
+
+ for line in self._systemdata:
+ match = descr_regex.match(line)
+ if match:
+ descr = match.group(1)
+ match = sn_regex.match(line)
+ if match:
+ self.serial_number = match.group(1)
+
+ if not self._expected_descr_re.match(descr):
+ raise IOError("Switch %s not recognised by this driver: abort" % descr)
+
+ # Now build a list of our ports, for later sanity checking
+ self._ports = self._get_port_names()
+ if len(self._ports) < 4:
+ raise IOError("Not enough ports detected - problem!")
+
+ def _login(self):
+ logging.debug("attempting login with username %s, password %s", self._username, self._password)
+ self._cli("")
+ self.connection.expect("User Name:")
+ self._cli("%s" % self._username)
+ self.connection.expect("Password:")
+ self._cli("%s" % self._password, False)
+ self.connection.expect(r"\*\*")
+ while True:
+ index = self.connection.expect(['User Name:', 'authentication failed', r'([^#]+)#', 'Password:', '.+'])
+ if index == 0 or index == 1: # Failed to log in!
+ logging.error("Login failure: %s\n", self.connection.match)
+ raise IOError
+ elif index == 2:
+ self._prompt_name = re.escape(self.connection.match.group(1).strip())
+ # Horrible output from the switch at login time may
+ # confuse our pexpect stuff here. If we've somehow got
+ # multiple lines of output, clean up and just take the
+ # *last* line here. Anything before that is going to
+ # just be noise from the "***" password input, etc.
+ prompt_lines = self._prompt_name.split('\r\n')
+ if len(prompt_lines) > 1:
+ self._prompt_name = prompt_lines[-1]
+ logging.debug("Got prompt name %s", self._prompt_name)
+ return 0
+ elif index == 3 or index == 4:
+ self._cli("", False)
+
+ def _logout(self):
+ logging.debug("Logging out")
+ self._cli("exit", False)
+ self.connection.close(True)
+
+ def _configure(self):
+ self._cli("configure terminal")
+
+ def _end_configure(self):
+ self._cli("end")
+
+ def _read_long_output(self, text):
+ prompt = self._prompt_name + '#'
+ try:
+ self.connection.expect(prompt)
+ except (pexpect.EOF, pexpect.TIMEOUT):
+ # Something went wrong; logout, log in and try again!
+ logging.error("PEXPECT FAILURE, RECONNECT")
+ self.errors.log_error_in(text)
+ raise PExpectError("_read_long_output failed")
+ except:
+ logging.error("prompt is \"%s\"", prompt)
+ raise
+
+ longbuf = []
+ for line in self.connection.before.split('\r\n'):
+ longbuf.append(line.strip())
+ return longbuf
+
+ def _get_port_names(self):
+ logging.debug("Grabbing list of ports")
+ interfaces = []
+
+ # Use "Up" or "Down" to only identify lines in the output that
+ # match interfaces that exist
+ regex = re.compile(r'^(\w+).*(Up|Down)')
+
+ try:
+ self._cli("show interfaces status detailed")
+ for line in self._read_long_output("show interfaces status detailed"):
+ match = regex.match(line)
+ if match:
+ interface = match.group(1)
+ interfaces.append(interface)
+ self._port_numbers[interface] = len(interfaces)
+ logging.debug(" found %d ports on the switch", len(interfaces))
+ return interfaces
+ except PExpectError:
+ # recurse on error
+ self._switch_connect()
+ return self._get_port_names()
+
+ # Get the mode of a port: access or trunk
+ def _port_get_mode(self, port):
+ logging.debug("Getting mode of port %s", port)
+ mode = ''
+ if not self._is_port_name_valid(port):
+ raise InputError("Port name %s not recognised" % port)
+ regex = re.compile(r'Port Mode: (\S+)')
+
+ try:
+ self._cli("show interfaces switchport %s" % port)
+ for line in self._read_long_output("show interfaces switchport"):
+ match = regex.match(line)
+ if match:
+ mode = match.group(1)
+ return mode.lower()
+
+ except PExpectError:
+ # recurse on error
+ self._switch_connect()
+ return self.port_get_mode(port)
+
+ def _show_config(self):
+ logging.debug("Grabbing config")
+ try:
+ self._cli("show running-config")
+ return self._read_long_output("show running-config")
+ except PExpectError:
+ # recurse on error
+ self._switch_connect()
+ return self._show_config()
+
+ def _show_clock(self):
+ logging.debug("Grabbing time")
+ try:
+ self._cli("show clock")
+ return self._read_long_output("show clock")
+ except PExpectError:
+ # recurse on error
+ self._switch_connect()
+ return self._show_clock()
+
+ def _get_systemdata(self):
+ logging.debug("Grabbing system data")
+
+ try:
+ self._systemdata = []
+ self._cli("show system")
+ for line in self._read_long_output("show system"):
+ self._systemdata.append(line)
+
+ logging.debug("Grabbing system sw and hw versions")
+ self._cli("show version")
+ for line in self._read_long_output("show version"):
+ self._systemdata.append(line)
+
+ except PExpectError:
+ # recurse on error
+ self._switch_connect()
+ return self._get_systemdata()
+
+ ######################################
+ # Internal port access helper methods
+ ######################################
+ # N.B. No parameter checking here, for speed reasons - if you're
+ # calling this internal API then you should already have validated
+ # things yourself! Equally, no post-set checks in here - do that
+ # at the higher level.
+ ######################################
+
+ # Wrapper around connection.send - by default, expect() the same
+ # text we've sent, to remove it from the output from the
+ # switch. For the few cases where we don't need that, override
+ # this using echo=False.
+ # Horrible, but seems to work.
+ def _cli(self, text, echo=True):
+ self.connection.send(text + '\r')
+ if echo:
+ try:
+ self.connection.expect(text)
+ except (pexpect.EOF, pexpect.TIMEOUT):
+ # Something went wrong; logout, log in and try again!
+ logging.error("PEXPECT FAILURE, RECONNECT")
+ self.errors.log_error_out(text)
+ raise PExpectError("_cli failed on %s" % text)
+ except:
+ logging.error("Unexpected error: %s", sys.exc_info()[0])
+ raise
+
+if __name__ == "__main__":
+# p = CiscoSX300('10.172.2.52', 23)
+
+ # Simple test harness - exercise the main working functions above to verify
+ # they work. This does *NOT* test really disruptive things like "save
+ # running-config" and "reload" - test those by hand.
+
+ import optparse
+
+ switch = 'vlandswitch02'
+ parser = optparse.OptionParser()
+ parser.add_option("--switch",
+ dest = "switch",
+ action = "store",
+ nargs = 1,
+ type = "string",
+ help = "specify switch to connect to for testing",
+ metavar = "<switch>")
+ (opts, args) = parser.parse_args()
+ if opts.switch:
+ switch = opts.switch
+
+ portname_base = "fa"
+ # Text to match if we're on a SG-series switch, ports all called gi<number>
+ sys_descr_re = re.compile('System Description.*Gigabit')
+
+ def _port_name(number):
+ return "%s%d" % (portname_base, number)
+
+ logging.basicConfig(level = logging.DEBUG,
+ format = '%(asctime)s %(levelname)-8s %(message)s')
+ p = CiscoSX300(switch, 23, debug = True)
+ p.switch_connect('cisco', 'cisco', None)
+ #buf = p._show_clock()
+ #print "%s" % buf
+ #buf = p._show_config()
+ #p.dump_list(buf)
+
+ print "System data:"
+ p.dump_list(p._systemdata)
+ for l in p._systemdata:
+ m = sys_descr_re.match(l)
+ if m:
+ print 'Found an SG switch, using "gi" as port name prefix for testing'
+ portname_base = "gi"
+
+ if portname_base == "fa":
+ print 'Found an SF switch, using "fa" as port name prefix for testing'
+
+ print "Creating VLANs for testing:"
+ for i in [ 2, 3, 4, 5, 20 ]:
+ p.vlan_create(i)
+ p.vlan_set_name(i, "test%d" % i)
+ print " %d (test%d)" % (i, i)
+
+ #print "And dump config\n"
+ #buf = p._show_config()
+ #print "%s" % buf
+
+ #print "Destroying VLAN 2\n"
+ #p.vlan_destroy(2)
+
+ #print "And dump config\n"
+ #buf = p._show_config()
+ #print "%s" % buf
+
+ #print "Port names are:"
+ #buf = p.switch_get_port_names()
+ #p.dump_list(buf)
+
+ #buf = p.vlan_get_name(25)
+ #print "VLAN with tag 25 is called \"%s\"" % buf
+
+ #p.vlan_set_name(35, "foo")
+ #print "VLAN with tag 35 is called \"foo\""
+
+ #buf = p.port_get_mode(_port_name(12))
+ #print "Port %s is in %s mode" % (_port_name(12), buf)
+
+ # Test access stuff
+ print "Set %s to access mode" % _port_name(6)
+ p.port_set_mode(_port_name(6), "access")
+ print "Move %s to VLAN 2" % _port_name(6)
+ p.port_set_access_vlan(_port_name(6), 2)
+ buf = p.port_get_access_vlan(_port_name(6))
+ print "Read from switch: %s is on VLAN %s" % (_port_name(6), buf)
+ print "Move %s back to default VLAN 1" % _port_name(6)
+ p.port_set_access_vlan(_port_name(6), 1)
+ #print "And move %s back to a trunk port" % _port_name(6)
+ #p.port_set_mode(_port_name(6), "trunk")
+ #buf = p.port_get_mode(_port_name(6))
+ #print "Port %s is in %s mode" % (_port_name(6), buf)
+
+ # Test trunk stuff
+ print "Set %s to trunk mode" % _port_name(2)
+ p.port_set_mode(_port_name(2), "trunk")
+ print "Add %s to VLAN 2" % _port_name(2)
+ p.port_add_trunk_to_vlan(_port_name(2), 2)
+ print "Add %s to VLAN 3" % _port_name(2)
+ p.port_add_trunk_to_vlan(_port_name(2), 3)
+ print "Add %s to VLAN 4" % _port_name(2)
+ p.port_add_trunk_to_vlan(_port_name(2), 4)
+ print "Read from switch: which VLANs is %s on?" % _port_name(2)
+ buf = p.port_get_trunk_vlan_list(_port_name(2))
+ p.dump_list(buf)
+
+ print "Remove %s from VLANs 3,3,4" % _port_name(2)
+ p.port_remove_trunk_from_vlan(_port_name(2), 3)
+ p.port_remove_trunk_from_vlan(_port_name(2), 3)
+ p.port_remove_trunk_from_vlan(_port_name(2), 4)
+ print "Read from switch: which VLANs is %s on?" % _port_name(2)
+ buf = p.port_get_trunk_vlan_list(_port_name(2))
+ p.dump_list(buf)
+
+ # print "Adding lots of ports to VLANs"
+ # p.port_add_trunk_to_vlan(_port_name(1), 2)
+ # p.port_add_trunk_to_vlan(_port_name(3), 2)
+ # p.port_add_trunk_to_vlan(_port_name(5), 2)
+ # p.port_add_trunk_to_vlan(_port_name(7), 2)
+ # p.port_add_trunk_to_vlan(_port_name(9), 2)
+ # p.port_add_trunk_to_vlan(_port_name(11), 2)
+ # p.port_add_trunk_to_vlan(_port_name(13), 2)
+ # p.port_add_trunk_to_vlan(_port_name(15), 2)
+ # p.port_add_trunk_to_vlan(_port_name(17), 2)
+ # p.port_add_trunk_to_vlan(_port_name(19), 2)
+ # p.port_add_trunk_to_vlan(_port_name(21), 2)
+ # p.port_add_trunk_to_vlan(_port_name(23), 2)
+ # p.port_add_trunk_to_vlan(_port_name(4, 2)
+
+ print "VLANs are:"
+ buf = p.vlan_get_list()
+ p.dump_list(buf)
+
+# print 'Restarting switch, to explicitly reset config'
+# p.switch_restart()
+
+# p.switch_save_running_config()
+# p._show_config()
diff --git a/Vland/drivers/Dummy.py b/Vland/drivers/Dummy.py
new file mode 100644
index 0000000..f630184
--- /dev/null
+++ b/Vland/drivers/Dummy.py
@@ -0,0 +1,361 @@
+#! /usr/bin/python
+
+# Copyright 2015 Linaro Limited
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
+# MA 02110-1301, USA.
+
+import logging
+import sys
+import re
+import pickle
+import pexpect
+
+# Dummy switch driver, designed specifically for
+# testing/validation. Just remembers what it's been told and gives the
+# same data back on demand.
+#
+# To keep track of data in the dummy switch, this code will simply
+# dump out and read back its internal state to/from a Python pickle
+# file as needed. On first use, if no such file exists then the Dummy
+# driver will simply generate a simple switch model:
+#
+# * N ports in access mode
+# * 1 VLAN (tag 1) labelled DEFAULT
+#
+# The "hostname" given to the switch in VLANd is important, as it will
+# determine both the number of ports allocated in this model and the
+# name of the pickle file used for data storage. Call the switch
+# "dummy-N" in your vland.cfg file to have N ports. If you want to use
+# more than one dummy switch instance, ensure you give them different
+# numbers, e.g. "dummy-25", "dummy-48", etc.
+
+if __name__ == '__main__':
+ import os
+ vlandpath = os.path.abspath(os.path.normpath(os.path.dirname(sys.argv[0])))
+ sys.path.insert(0, vlandpath)
+ sys.path.insert(0, "%s/.." % vlandpath)
+
+from errors import InputError, PExpectError
+from drivers.common import SwitchDriver, SwitchErrors
+
+class Dummy(SwitchDriver):
+
+ connection = None
+ _username = None
+ _password = None
+ _enable_password = None
+ _dummy_vlans = {}
+ _dummy_ports = {}
+ _state_file = None
+
+ _capabilities = [
+ ]
+
+ def __init__(self, switch_hostname, switch_telnetport=23, debug = False):
+ SwitchDriver.__init__(self, switch_hostname, debug)
+ self._systemdata = []
+ self.errors = SwitchErrors()
+ self._state_file = "%s.pk" % switch_hostname
+
+ ################################
+ ### Switch-level API functions
+ ################################
+
+ # Save the current running config - we want config to remain
+ # across reboots
+ def switch_save_running_config(self):
+ pass
+
+ # Restart the switch - we need to reload config to do a
+ # roll-back. Do NOT save running-config first if the switch asks -
+ # we're trying to dump recent changes, not save them.
+ def switch_restart(self):
+ pass
+
+ # List the capabilities of the switch (and driver) - some things
+ # make no sense to abstract. Returns a dict of strings, each one
+ # describing an extra feature that that higher levels may care
+ # about
+ def switch_get_capabilities(self):
+ return self._capabilities
+
+ ################################
+ ### VLAN API functions
+ ################################
+
+ # Create a VLAN with the specified tag
+ def vlan_create(self, tag):
+ logging.debug("Creating VLAN %d", tag)
+ if not tag in self._dummy_vlans:
+ self._dummy_vlans[tag] = "VLAN%s" % tag
+ else:
+ # It's not an error if it already exists, but log anyway
+ logging.debug("VLAN %d already exists, name %s",
+ tag, self._dummy_vlans[tag])
+
+ # Destroy a VLAN with the specified tag
+ def vlan_destroy(self, tag):
+ logging.debug("Destroying VLAN %d", tag)
+ if tag in self._dummy_vlans:
+ del self._dummy_vlans[tag]
+ else:
+ # It's not an error if it doesn't exist, but log anyway
+ logging.debug("VLAN %d did not exist", tag)
+
+ # Set the name of a VLAN
+ def vlan_set_name(self, tag, name):
+ logging.debug("Setting name of VLAN %d to %s", tag, name)
+ if not tag in self._dummy_vlans:
+ raise InputError("Tag %d does not exist")
+ self._dummy_vlans[tag] = "VLAN%s" % tag
+
+ # Get a list of the VLAN tags currently registered on the switch
+ def vlan_get_list(self):
+ logging.debug("Grabbing list of VLANs")
+ return sorted(self._dummy_vlans.keys())
+
+ # For a given VLAN tag, ask the switch what the associated name is
+ def vlan_get_name(self, tag):
+ logging.debug("Grabbing the name of VLAN %d", tag)
+ if not tag in self._dummy_vlans:
+ raise InputError("Tag %d does not exist")
+ return self._dummy_vlans[tag]
+
+ ################################
+ ### Port API functions
+ ################################
+
+ # Set the mode of a port: access or trunk
+ def port_set_mode(self, port, mode):
+ logging.debug("Setting port %s to %s mode", port, mode)
+ if not port in self._dummy_ports:
+ raise InputError("Port %s does not exist" % port)
+ self._dummy_ports[port]['mode'] = mode
+
+ # Get the mode of a port: access or trunk
+ def port_get_mode(self, port):
+ logging.debug("Getting mode of port %s", port)
+ if not port in self._dummy_ports:
+ raise InputError("Port %s does not exist" % port)
+ return self._dummy_ports[port]['mode']
+
+ # Set an access port to be in a specified VLAN (tag)
+ def port_set_access_vlan(self, port, tag):
+ logging.debug("Setting access port %s to VLAN %d", port, tag)
+ if not port in self._dummy_ports:
+ raise InputError("Port %s does not exist" % port)
+ if not tag in self._dummy_vlans:
+ raise InputError("VLAN %d does not exist" % tag)
+ self._dummy_ports[port]['access_vlan'] = tag
+
+ # Add a trunk port to a specified VLAN (tag)
+ def port_add_trunk_to_vlan(self, port, tag):
+ logging.debug("Adding trunk port %s to VLAN %d", port, tag)
+ if not port in self._dummy_ports:
+ raise InputError("Port %s does not exist" % port)
+ if not tag in self._dummy_vlans:
+ raise InputError("VLAN %d does not exist" % tag)
+ self._dummy_ports[port]['trunk_vlans'].append(tag)
+
+ # Remove a trunk port from a specified VLAN (tag)
+ def port_remove_trunk_from_vlan(self, port, tag):
+ logging.debug("Removing trunk port %s from VLAN %d", port, tag)
+ if not port in self._dummy_ports:
+ raise InputError("Port %s does not exist" % port)
+ if not tag in self._dummy_vlans:
+ raise InputError("VLAN %d does not exist" % tag)
+ self._dummy_ports[port]['trunk_vlans'].remove(tag)
+
+ # Get the configured VLAN tag for an access port (tag)
+ def port_get_access_vlan(self, port):
+ logging.debug("Getting VLAN for access port %s", port)
+ if not port in self._dummy_ports:
+ raise InputError("Port %s does not exist" % port)
+ return self._dummy_ports[port]['access_vlan']
+
+ # Get the list of configured VLAN tags for a trunk port
+ def port_get_trunk_vlan_list(self, port):
+ logging.debug("Getting VLAN(s) for trunk port %s", port)
+ if not port in self._dummy_ports:
+ raise InputError("Port %s does not exist" % port)
+ return sorted(self._dummy_ports[port]['trunk_vlans'])
+
+ ################################
+ ### Internal functions
+ ################################
+
+ # Connect to the switch and log in
+ def _switch_connect(self):
+ # Open data file if it exists, otherwise initialise
+ try:
+ pkl_file = open(self._state_file, 'rb')
+ self._dummy_vlans = pickle.load(pkl_file)
+ self._dummy_ports = pickle.load(pkl_file)
+ pkl_file.close()
+ except:
+ # Create data here
+ self._dummy_vlans = {1: 'DEFAULT'}
+ match = re.match(r'dummy-(\d+)', self.hostname)
+ if match:
+ num_ports = int(match.group(1))
+ else:
+ raise InputError("Unable to determine number of ports from switch name")
+ for i in range(1, num_ports+1):
+ port_name = "dm%2.2d" % int(i)
+ self._dummy_ports[port_name] = {}
+ self._dummy_ports[port_name]['mode'] = 'access'
+ self._dummy_ports[port_name]['access_vlan'] = 1
+ self._dummy_ports[port_name]['trunk_vlans'] = []
+
+ # Now build a list of our ports, for later sanity checking
+ self._ports = self._get_port_names()
+ if len(self._ports) < 4:
+ raise IOError("Not enough ports detected - problem!")
+
+ def _logout(self):
+ pkl_file = open(self._state_file, 'wb')
+ pickle.dump(self._dummy_vlans, pkl_file)
+ pickle.dump(self._dummy_ports, pkl_file)
+ pkl_file.close()
+
+ def _get_port_names(self):
+ logging.debug("Grabbing list of ports")
+ interfaces = []
+ for interface in sorted(self._dummy_ports.keys()):
+ interfaces.append(interface)
+ self._port_numbers[interface] = len(interfaces)
+ return interfaces
+
+if __name__ == "__main__":
+
+ # Simple test harness - exercise the main working functions above to verify
+ # they work. This does *NOT* test really disruptive things like "save
+ # running-config" and "reload" - test those by hand.
+
+ import optparse
+
+ switch = 'dummy-48'
+ parser = optparse.OptionParser()
+ parser.add_option("--switch",
+ dest = "switch",
+ action = "store",
+ nargs = 1,
+ type = "string",
+ help = "specify switch to connect to for testing",
+ metavar = "<switch>")
+ (opts, args) = parser.parse_args()
+ if opts.switch:
+ switch = opts.switch
+
+ logging.basicConfig(level = logging.DEBUG,
+ format = '%(asctime)s %(levelname)-8s %(message)s')
+ p = Dummy(switch, 23, debug=True)
+ p.switch_connect('admin', '', None)
+
+ print "VLANs are:"
+ buf = p.vlan_get_list()
+ p.dump_list(buf)
+
+ buf = p.vlan_get_name(1)
+ print "VLAN 1 is named \"%s\"" % buf
+
+ print "Create VLAN 3"
+ p.vlan_create(3)
+
+ print "Create VLAN 4"
+ p.vlan_create(4)
+
+ buf = p.vlan_get_name(3)
+ print "VLAN 3 is named \"%s\"" % buf
+
+ print "Set name of VLAN 3 to test333"
+ p.vlan_set_name(3, "test333")
+
+ buf = p.vlan_get_name(3)
+ print "VLAN 3 is named \"%s\"" % buf
+
+ print "VLANs are:"
+ buf = p.vlan_get_list()
+ p.dump_list(buf)
+
+ print "Destroy VLAN 3"
+ p.vlan_destroy(3)
+
+ print "VLANs are:"
+ buf = p.vlan_get_list()
+ p.dump_list(buf)
+
+ buf = p.port_get_mode("dm10")
+ print "Port dm10 is in %s mode" % buf
+
+ buf = p.port_get_mode("dm11")
+ print "Port dm11 is in %s mode" % buf
+
+ # Test access stuff
+ print "Set dm09 to access mode"
+ p.port_set_mode("dm09", "access")
+
+ print "Move dm9 to VLAN 4"
+ p.port_set_access_vlan("dm09", 4)
+
+ buf = p.port_get_access_vlan("dm09")
+ print "Read from switch: dm09 is on VLAN %s" % buf
+
+ print "Move dm09 back to VLAN 1"
+ p.port_set_access_vlan("dm09", 1)
+
+ print "Create VLAN 2"
+ p.vlan_create(2)
+
+ print "Create VLAN 3"
+ p.vlan_create(3)
+
+ print "Create VLAN 4"
+ p.vlan_create(4)
+
+ # Test access stuff
+ print "Set dm09 to trunk mode"
+ p.port_set_mode("dm09", "trunk")
+ print "Read from switch: which VLANs is dm09 on?"
+ buf = p.port_get_trunk_vlan_list("dm09")
+ p.dump_list(buf)
+
+ # The adds below are NOOPs in effect on this switch - no filtering
+ # for "trunk" ports
+ print "Add dm09 to VLAN 2"
+ p.port_add_trunk_to_vlan("dm09", 2)
+ print "Add dm09 to VLAN 3"
+ p.port_add_trunk_to_vlan("dm09", 3)
+ print "Add dm09 to VLAN 4"
+ p.port_add_trunk_to_vlan("dm09", 4)
+ print "Read from switch: which VLANs is dm09 on?"
+ buf = p.port_get_trunk_vlan_list("dm09")
+ p.dump_list(buf)
+
+ # And the same for removals here
+ p.port_remove_trunk_from_vlan("dm09", 3)
+ p.port_remove_trunk_from_vlan("dm09", 2)
+ p.port_remove_trunk_from_vlan("dm09", 4)
+ print "Read from switch: which VLANs is dm09 on?"
+ buf = p.port_get_trunk_vlan_list("dm09")
+ p.dump_list(buf)
+
+ print 'Restarting switch, to explicitly reset config'
+ p.switch_restart()
+
+ p.switch_save_running_config()
+
+ p.switch_disconnect()
diff --git a/Vland/drivers/Mellanox.py b/Vland/drivers/Mellanox.py
new file mode 100644
index 0000000..ea74bf0
--- /dev/null
+++ b/Vland/drivers/Mellanox.py
@@ -0,0 +1,795 @@
+#! /usr/bin/python
+
+# Copyright 2018 Linaro Limited
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
+# MA 02110-1301, USA.
+
+import logging
+import sys
+import re
+import pexpect
+
+# Mellanox MLNX-OS driver
+# Developed and tested against the SN2100 in the Linaro LAVA lab
+
+if __name__ == '__main__':
+ import os
+ vlandpath = os.path.abspath(os.path.normpath(os.path.dirname(sys.argv[0])))
+ sys.path.insert(0, vlandpath)
+ sys.path.insert(0, "%s/.." % vlandpath)
+
+from errors import InputError, PExpectError
+from drivers.common import SwitchDriver, SwitchErrors
+
+class Mellanox(SwitchDriver):
+
+ connection = None
+ _username = None
+ _password = None
+ _enable_password = None
+
+ _capabilities = [
+ 'TrunkWildCardVlans' # Trunk ports are on all VLANs by
+ # default, so we shouldn't need to
+ # bugger with them
+ ]
+
+ # Regexp of expected hardware information - fail if we don't see
+ # this
+ _expected_descr_re = re.compile(r'MLNX-OS')
+
+ def __init__(self, switch_hostname, switch_telnetport=23, debug = False):
+ SwitchDriver.__init__(self, switch_hostname, debug)
+ self._systemdata = []
+ self.exec_string = "/usr/bin/telnet %s %d" % (switch_hostname, switch_telnetport)
+ self.errors = SwitchErrors()
+
+ ################################
+ ### Switch-level API functions
+ ################################
+
+ # Save the current running config into flash - we want config to
+ # remain across reboots
+ def switch_save_running_config(self):
+ try:
+ self._configure()
+ self._cli("configuration write")
+ self._end_configure()
+ except (PExpectError, pexpect.EOF):
+ # recurse on error
+ self._switch_connect()
+ self.switch_save_running_config()
+
+ # Restart the switch - we need to reload config to do a
+ # roll-back. Do NOT save running-config first if the switch asks -
+ # we're trying to dump recent changes, not save them.
+ #
+ # This will also implicitly cause a connection to be closed
+ def switch_restart(self):
+ self._cli("reload noconfirm")
+
+ # List the capabilities of the switch (and driver) - some things
+ # make no sense to abstract. Returns a dict of strings, each one
+ # describing an extra feature that that higher levels may care
+ # about
+ def switch_get_capabilities(self):
+ return self._capabilities
+
+ ################################
+ ### VLAN API functions
+ ################################
+
+ # Create a VLAN with the specified tag
+ def vlan_create(self, tag):
+ logging.debug("Creating VLAN %d", tag)
+ try:
+ self._configure()
+ self._cli("vlan %d" % tag)
+ self._end_vlan()
+ self._end_configure()
+
+ # Validate it happened
+ vlans = self.vlan_get_list()
+ for vlan in vlans:
+ if vlan == tag:
+ return
+ raise IOError("Failed to create VLAN %d" % tag)
+
+ except PExpectError:
+ # recurse on error
+ self._switch_connect()
+ self.vlan_create(tag)
+
+ # Destroy a VLAN with the specified tag
+ def vlan_destroy(self, tag):
+ logging.debug("Destroying VLAN %d", tag)
+
+ try:
+ self._configure()
+ self._cli("no vlan %d" % tag)
+ self._end_configure()
+
+ # Validate it happened
+ vlans = self.vlan_get_list()
+ for vlan in vlans:
+ if vlan == tag:
+ raise IOError("Failed to destroy VLAN %d" % tag)
+
+ except PExpectError:
+ # recurse on error
+ self._switch_connect()
+ self.vlan_destroy(tag)
+
+ # Set the name of a VLAN
+ def vlan_set_name(self, tag, name):
+ logging.debug("Setting name of VLAN %d to %s", tag, name)
+
+ try:
+ self._configure()
+ self._cli("vlan %d name %s" % (tag, name))
+ self._end_configure()
+
+ # This switch *might* have problems if we drive it too quickly? At
+ # least one instance of set_name()/get_name() not working. This
+ # might help?
+ self._delay()
+
+ # And retry around here
+ retries = 5
+ read_name = None
+ while (retries > 0 and read_name is None):
+ # Validate it happened
+ read_name = self.vlan_get_name(tag)
+ retries -= 1
+
+ if read_name != name:
+ raise IOError("Failed to set name for VLAN %d (name found is \"%s\", not \"%s\")"
+ % (tag, read_name, name))
+ except PExpectError:
+ # recurse on error
+ self._switch_connect()
+ self.vlan_set_name(tag, name)
+
+ # Get a list of the VLAN tags currently registered on the switch
+ def vlan_get_list(self):
+ logging.debug("Grabbing list of VLANs")
+
+ try:
+ vlans = []
+
+ regex = re.compile(r'^ *(\d+)')
+
+ self._cli("show vlan")
+ for line in self._read_long_output("show vlan"):
+ match = regex.match(line)
+ if match:
+ vlans.append(int(match.group(1)))
+ return vlans
+
+ except PExpectError:
+ # recurse on error
+ self._switch_connect()
+ return self.vlan_get_list()
+
+ # For a given VLAN tag, ask the switch what the associated name is
+ def vlan_get_name(self, tag):
+
+ try:
+ logging.debug("Grabbing the name of VLAN %d", tag)
+ name = None
+
+ # Ugh, the output here is messy. VLAN names can include spaces, and
+ # there are no delimiters in the output, e.g.:
+ # VLAN Name Ports
+ # ---- ----------- --------------------------------------
+ # 1 default Eth1/1/1, Eth1/1/2, Eth1/2, Eth1/3/1, Eth1/3/2,
+ # Eth1/4, Eth1/5, Eth1/6, Eth1/7, Eth1/8,
+ # Eth1/10, Eth1/12, Eth1/13, Eth1/14, Eth1/15,
+ # Eth1/16
+ # 102 mdev testing
+ # 103 vpp 1 performance testing Eth1/1/3, Eth1/9
+ # 104 vpp 2 performance testing Eth1/1/4, Eth1/11
+ #
+ # Simplest strategy:
+ # 1. Match on a leading number and grab all the text after it
+ # 2. Drop anything starting with "Eth" to EOL
+ # 3. Strip leading and trailing whitespace
+ #
+ # Not perfect, but it'll have to do. Anybody including "Eth" in a
+ # VLAN name deserves to lose...
+
+ regex = re.compile(r'^ *\d+\s+(.+)')
+ self._cli("show vlan id %d" % tag)
+ for line in self._read_long_output("show vlan id"):
+ match = regex.match(line)
+ if match:
+ name = re.sub(r'Eth.*$',"",match.group(1)).strip()
+ if name is None:
+ logging.debug("vlan_get_name: did not find a name")
+ return name
+
+ except PExpectError:
+ # recurse on error
+ self._switch_connect()
+ return self.vlan_get_name(tag)
+
+
+ ################################
+ ### Port API functions
+ ################################
+
+ # Set the mode of a port: access or trunk
+ def port_set_mode(self, port, mode):
+ logging.debug("Setting port %s to %s", port, mode)
+ if not self._is_port_mode_valid(mode):
+ raise InputError("Port mode %s is not allowed" % mode)
+ if not self._is_port_name_valid(port):
+ raise InputError("Port name %s not recognised" % port)
+
+ try:
+ self._configure()
+ self._cli("interface ethernet %s" % port)
+ self._cli("switchport mode %s" % mode)
+ if mode == "trunk":
+ # Put the new trunk port on all VLANs
+ self._cli("switchport trunk allowed-vlan all")
+ self._end_interface()
+ self._end_configure()
+
+ # Validate it happened
+ read_mode = self._port_get_mode(port)
+ if read_mode != mode:
+ raise IOError("Failed to set mode for port %s" % port)
+
+ # And cache the result
+ self._port_modes[port] = mode
+
+ except PExpectError:
+ # recurse on error
+ self._switch_connect()
+ self.port_set_mode(port, mode)
+
+ # Set an access port to be in a specified VLAN (tag)
+ def port_set_access_vlan(self, port, tag):
+ logging.debug("Setting access port %s to VLAN %d", port, tag)
+ if not self._is_port_name_valid(port):
+ raise InputError("Port name %s not recognised" % port)
+ if not (self.port_get_mode(port) == "access"):
+ raise InputError("Port %s not in access mode" % port)
+
+ try:
+ self._configure()
+ self._cli("interface ethernet %s" % port)
+ self._cli("switchport access vlan %d" % tag)
+ self._end_interface()
+ self._end_configure()
+
+ # Validate things worked
+ read_vlan = int(self.port_get_access_vlan(port))
+ if read_vlan != tag:
+ raise IOError("Failed to move access port %s to VLAN %d - got VLAN %d instead"
+ % (port, tag, read_vlan))
+
+ except PExpectError:
+ # recurse on error
+ self._switch_connect()
+ self.port_set_access_vlan(port, tag)
+
+ # Add a trunk port to a specified VLAN (tag)
+ def port_add_trunk_to_vlan(self, port, tag):
+ logging.debug("Adding trunk port %s to VLAN %d", port, tag)
+ if not self._is_port_name_valid(port):
+ raise InputError("Port name %s not recognised" % port)
+ if not (self.port_get_mode(port) == "trunk"):
+ raise InputError("Port %s not in trunk mode" % port)
+
+ try:
+ self._configure()
+ self._cli("interface ethernet %s" % port)
+ self._cli("switchport trunk allowed-vlan add %d" % tag)
+ self._end_interface()
+ self._end_configure()
+
+ # Validate it happened
+ read_vlans = self.port_get_trunk_vlan_list(port)
+ for vlan in read_vlans:
+ if vlan == tag or vlan == "ALL":
+ return
+ raise IOError("Failed to add trunk port %s to VLAN %d" % (port, tag))
+
+ except PExpectError:
+ # recurse on error
+ self._switch_connect()
+ self.port_add_trunk_to_vlan(port, tag)
+
+ # Remove a trunk port from a specified VLAN (tag)
+ def port_remove_trunk_from_vlan(self, port, tag):
+ logging.debug("Removing trunk port %s from VLAN %d", port, tag)
+ if not self._is_port_name_valid(port):
+ raise InputError("Port name %s not recognised" % port)
+ if not (self.port_get_mode(port) == "trunk"):
+ raise InputError("Port %s not in trunk mode" % port)
+
+ try:
+ self._configure()
+ self._cli("interface ethernet %s" % port)
+ self._cli("switchport trunk allowed-vlan remove %d" % tag)
+ self._end_interface()
+ self._end_configure()
+
+ # Validate it happened
+ read_vlans = self.port_get_trunk_vlan_list(port)
+ for vlan in read_vlans:
+ if vlan == tag:
+ raise IOError("Failed to remove trunk port %s from VLAN %d" % (port, tag))
+
+ except PExpectError:
+ # recurse on error
+ self._switch_connect()
+ self.port_remove_trunk_from_vlan(port, tag)
+
+ # Get the configured VLAN tag for an access port (tag)
+ def port_get_access_vlan(self, port):
+ logging.debug("Getting VLAN for access port %s", port)
+ if not self._is_port_name_valid(port):
+ raise InputError("Port name %s not recognised" % port)
+ if not (self.port_get_mode(port) == "access"):
+ raise InputError("Port %s not in access mode" % port)
+
+ regex = re.compile(r'^Eth%s\s+access\s+(\d+)' % port)
+
+ try:
+ self._cli("show interfaces switchport")
+ for line in self._read_long_output("show interfaces switchport"):
+ match = regex.match(line)
+ if match:
+ vlan = match.group(1)
+ return int(vlan)
+
+ except PExpectError:
+ # recurse on error
+ self._switch_connect()
+ return self.port_get_access_vlan(port)
+
+ # Get the list of configured VLAN tags for a trunk port
+ def port_get_trunk_vlan_list(self, port):
+ logging.debug("Getting VLANs for trunk port %s", port)
+ vlans = []
+ if not self._is_port_name_valid(port):
+ raise InputError("Port name %s not recognised" % port)
+ if not (self.port_get_mode(port) == "trunk"):
+ raise InputError("Port %s not in trunk mode" % port)
+ regex_start = re.compile(r'^Eth%s\s+trunk\s+N/A\s+(.*)' % port)
+ regex_continue = re.compile(r'^(\d.*)')
+
+ try:
+ self._cli("show interfaces switchport")
+
+ # Complex parsing work - VLAN list may extend over several lines, e.g.:
+ #
+ # Eth1/16 trunk N/A 1, 102, 103, 104, 1000, 1001, 1002
+ # 1003, 1004
+ #
+ in_match = False
+ vlan_text = ''
+
+ for line in self._read_long_output("show interfaces switchport"):
+ if in_match:
+ match = regex_continue.match(line)
+ if match:
+ vlan_text += ', ' # Make a consistently-formed list
+ vlan_text += match.group(1)
+ else:
+ in_match = False
+ if not in_match:
+ match = regex_start.match(line)
+ if match:
+ vlan_text += match.group(1)
+ in_match = True
+
+ vlans = self._parse_vlan_list(vlan_text)
+ return vlans
+
+ except PExpectError:
+ # recurse on error
+ self._switch_connect()
+ return self.port_get_trunk_vlan_list(port)
+
+ ################################
+ ### Internal functions
+ ################################
+
+ # Connect to the switch and log in
+ def _switch_connect(self):
+
+ if not self.connection is None:
+ self.connection.close(True)
+ self.connection = None
+
+ logging.debug("Connecting to Switch with: %s", self.exec_string)
+ self.connection = pexpect.spawn(self.exec_string, logfile=self.logger)
+ self._login()
+
+ # Avoid paged output as much as possible
+ self._cli("terminal length 999")
+ # Don't do silly things with ANSI codes
+ self._cli("terminal type dumb")
+ # and disable auto-logout after delay
+ self._cli("no cli session auto-logout")
+
+ # And grab details about the switch. in case we need it
+ self._get_systemdata()
+
+ # And also validate them - make sure we're driving a switch of
+ # the correct model! Also store the serial number
+ descr_regex = re.compile(r'Product name:\s*(\S+)')
+ sn_regex = re.compile(r'System serial num:\s*(\S+)')
+ descr = ""
+
+ for line in self._systemdata:
+ match = descr_regex.match(line)
+ if match:
+ descr = match.group(1)
+ match = sn_regex.match(line)
+ if match:
+ self.serial_number = match.group(1)
+
+ logging.debug("serial number is %s", self.serial_number)
+ logging.debug("system description is %s", descr)
+
+ if not self._expected_descr_re.match(descr):
+ raise IOError("Switch %s not recognised by this driver: abort" % descr)
+
+ # Now build a list of our ports, for later sanity checking
+ self._ports = self._get_port_names()
+ if len(self._ports) < 4:
+ raise IOError("Not enough ports detected - problem!")
+
+ def _login(self):
+ logging.debug("attempting login with username %s, password %s", self._username, self._password)
+ if self._username is not None:
+ self.connection.expect("login:")
+ self._cli("%s" % self._username)
+ if self._password is not None:
+ self.connection.expect("Password:")
+ self._cli("%s" % self._password, False)
+ while True:
+ index = self.connection.expect(['User:', 'Password:', 'Login incorrect', 'authentication failed', r'(.*?)(#|>)'])
+ if index != 4: # Any other means: failed to log in!
+ logging.error("Login failure: index %d\n", index)
+ logging.error("Login failure: %s\n", self.connection.match.before)
+ raise IOError
+
+ # else
+
+ # Add a couple of newlines to get past the "last login" etc. junk
+ self._cli("")
+ self._cli("")
+ self.connection.expect(r'^(.*?) (#|>)')
+ self._prompt_name = re.escape(self.connection.match.group(1).strip())
+ logging.info("Got outer prompt \"%s\"", self._prompt_name)
+ if self.connection.match.group(2) == ">":
+ # Need to enter "enable" mode too
+ self._cli("enable")
+ if self._enable_password is not None:
+ self.connection.expect("Password:")
+ self._cli("%s" % self._enable_password, False)
+ index = self.connection.expect(['Password:', 'Login incorrect', 'authentication failed', r'(.*) *(#|>)'])
+ if index != 3: # Any other means: failed to log in!
+ logging.error("Enable password failure: %s\n", self.connection.match)
+ raise IOError
+ self._cli("")
+ self._cli("")
+ self.connection.expect(r'^(.*?) (#|>)')
+ self._prompt_name = re.escape(self.connection.match.group(1).strip())
+ logging.info("Got enable prompt \"%s\"", self._prompt_name)
+ return 0
+
+ def _logout(self):
+ logging.debug("Logging out")
+ self._cli("quit", False)
+ try:
+ self.connection.expect("Would you like to save them now")
+ self._cli("n")
+ except (pexpect.EOF):
+ pass
+ self.connection.close(True)
+
+ def _configure(self):
+ self._cli("configure terminal")
+
+ def _end_configure(self):
+ self._cli("exit")
+
+ def _end_interface(self):
+ self._cli("exit")
+
+ def _end_vlan(self):
+ self._cli("exit")
+
+ def _read_long_output(self, text):
+ longbuf = []
+ prompt = self._prompt_name + r'\s*#'
+ while True:
+ try:
+ index = self.connection.expect([r'lines \d+-\d+', prompt])
+ if index == 0: # "lines 45-50"
+ for line in self.connection.before.split('\r\n'):
+ longbuf.append(line.strip())
+ self._cli(' ', False)
+ elif index == 1: # Back to a prompt, says output is finished
+ break
+ except (pexpect.EOF, pexpect.TIMEOUT):
+ # Something went wrong; logout, log in and try again!
+ logging.error("PEXPECT FAILURE, RECONNECT")
+ self.errors.log_error_in(text)
+ raise PExpectError("_read_long_output failed")
+ except:
+ logging.error("prompt is \"%s\"", prompt)
+ raise
+
+ for line in self.connection.before.split('\r\n'):
+ longbuf.append(line.strip())
+ return longbuf
+
+ def _get_port_names(self):
+ logging.debug("Grabbing list of ports")
+ interfaces = []
+
+ # Look for "Eth1" at the beginning of the output lines to just
+ # match lines with interfaces - they have names like
+ # "Eth1/15". We do not care about Link Aggregation Groups (lag)
+ # here.
+ regex = re.compile(r'^Eth(\S+)')
+
+ try:
+ self._cli("show interfaces switchport")
+ for line in self._read_long_output("show interfaces switchport"):
+ match = regex.match(line)
+ if match:
+ interface = match.group(1)
+ interfaces.append(interface)
+ self._port_numbers[interface] = len(interfaces)
+ logging.debug(" found %d ports on the switch", len(interfaces))
+ return interfaces
+
+ except PExpectError:
+ # recurse on error
+ self._switch_connect()
+ return self._get_port_names()
+
+ # Get the mode of a port: access or trunk
+ def _port_get_mode(self, port):
+ logging.debug("Getting mode of port %s", port)
+ mode = ''
+ if not self._is_port_name_valid(port):
+ raise InputError("Port name %s not recognised" % port)
+ regex = re.compile('Switchport mode: (.*)')
+
+ try:
+ self._cli("show interfaces ethernet %s" % port)
+ for line in self._read_long_output("show interfaces ethernet"):
+ match = regex.match(line)
+ if match:
+ mode = match.group(1)
+ if mode == 'access':
+ return 'access'
+ if mode == 'trunk':
+ return 'trunk'
+ return mode
+
+ except PExpectError:
+ # recurse on error
+ self._switch_connect()
+ return self.port_get_mode(port)
+
+ def _show_config(self):
+ logging.debug("Grabbing config")
+ try:
+ self._cli("show running-config")
+ return self._read_long_output("show running-config")
+ except PExpectError:
+ # recurse on error
+ self._switch_connect()
+ return self._show_config()
+
+ def _show_clock(self):
+ logging.debug("Grabbing time")
+ try:
+ self._cli("show clock")
+ return self._read_long_output("show clock")
+ except PExpectError:
+ # recurse on error
+ self._switch_connect()
+ return self._show_clock()
+
+ def _get_systemdata(self):
+ logging.debug("Grabbing system sw and hw versions")
+
+ try:
+ self._systemdata = []
+ self._cli("show version")
+ for line in self._read_long_output("show version"):
+ self._systemdata.append(line)
+
+ except PExpectError:
+ # recurse on error
+ self._switch_connect()
+ return self._get_systemdata()
+
+ # Borrowed from the Catalyst driver. Over-complex for our needs here, but
+ # it's already tested and will do the job.
+ def _parse_vlan_list(self, inputdata):
+ vlans = []
+
+ if inputdata == "ALL":
+ return ["ALL"]
+ elif inputdata == "NONE":
+ return []
+ elif inputdata == "":
+ return []
+ else:
+ # Parse the complex list
+ groups = inputdata.split(',')
+ for group in groups:
+ subgroups = group.split('-')
+ if len(subgroups) == 1:
+ vlans.append(int(subgroups[0]))
+ elif len(subgroups) == 2:
+ for i in range (int(subgroups[0]), int(subgroups[1]) + 1):
+ vlans.append(i)
+ else:
+ logging.debug("Can't parse group \"" + group + "\"")
+
+ return vlans
+
+ # Wrapper around connection.send - by default, expect() the same
+ # text we've sent, to remove it from the output from the
+ # switch. For the few cases where we don't need that, override
+ # this using echo=False.
+ # Horrible, but seems to work.
+ def _cli(self, text, echo=True):
+ self.connection.send(text + '\r')
+ if echo:
+ try:
+ self.connection.expect(text)
+ except (pexpect.EOF, pexpect.TIMEOUT):
+ # Something went wrong; logout, log in and try again!
+ logging.error("PEXPECT FAILURE, RECONNECT")
+ self.errors.log_error_out(text)
+ raise PExpectError("_cli failed on %s" % text)
+ except:
+ logging.error("Unexpected error: %s", sys.exc_info()[0])
+ raise
+
+if __name__ == "__main__":
+
+ # Simple test harness - exercise the main working functions above to verify
+ # they work. This does *NOT* test really disruptive things like "save
+ # running-config" and "reload" - test those by hand.
+
+ import optparse
+
+ switch = '172.27.16.6'
+ parser = optparse.OptionParser()
+ parser.add_option("--switch",
+ dest = "switch",
+ action = "store",
+ nargs = 1,
+ type = "string",
+ help = "specify switch to connect to for testing",
+ metavar = "<switch>")
+ (opts, args) = parser.parse_args()
+ if opts.switch:
+ switch = opts.switch
+
+ logging.basicConfig(level = logging.DEBUG,
+ format = '%(asctime)s %(levelname)-8s %(message)s')
+ p = MlnxOS(switch, 23, debug=False)
+ p.switch_connect('admin', 'admin', None)
+
+ print "VLANs are:"
+ buf = p.vlan_get_list()
+ p.dump_list(buf)
+
+ buf = p.vlan_get_name(102)
+ print "VLAN 102 is named \"%s\"" % buf
+
+ print "Create VLAN 1003"
+ p.vlan_create(1003)
+
+ buf = p.vlan_get_name(1003)
+ print "VLAN 1003 is named \"%s\"" % buf
+
+ print "Set name of VLAN 1003 to test333"
+ p.vlan_set_name(1003, "test333")
+
+ buf = p.vlan_get_name(1003)
+ print "VLAN 1003 is named \"%s\"" % buf
+
+ print "VLANs are:"
+ buf = p.vlan_get_list()
+ p.dump_list(buf)
+
+ print "Destroy VLAN 1003"
+ p.vlan_destroy(1003)
+
+ print "VLANs are:"
+ buf = p.vlan_get_list()
+ p.dump_list(buf)
+
+ buf = p.port_get_mode("1/15")
+ print "Port 1/15 is in %s mode" % buf
+
+ buf = p.port_get_mode("1/16")
+ print "Port 1/16 is in %s mode" % buf
+
+ # Test access stuff
+ print "Set 1/15 to access mode"
+ p.port_set_mode("1/15", "access")
+
+ print "Move 1/15 to VLAN 4"
+ p.port_set_access_vlan("1/15", 4)
+
+ buf = p.port_get_access_vlan("1/15")
+ print "Read from switch: 1/15 is on VLAN %s" % buf
+
+ print "Move 1/15 back to VLAN 1"
+ p.port_set_access_vlan("1/15", 1)
+
+ print "Create VLAN 1002"
+ p.vlan_create(1002)
+
+ print "Create VLAN 1003"
+ p.vlan_create(1003)
+
+ print "Create VLAN 1004"
+ p.vlan_create(1004)
+
+ # Test access stuff
+ print "Set 1/15 to trunk mode"
+ p.port_set_mode("1/15", "trunk")
+ print "Read from switch: which VLANs is 1/15 on?"
+ buf = p.port_get_trunk_vlan_list("1/15")
+ p.dump_list(buf)
+
+ # The adds below are NOOPs in effect on this switch - no filtering
+ # for "trunk" ports
+ print "Add 1/15 to VLAN 1002"
+ p.port_add_trunk_to_vlan("1/15", 1002)
+ print "Add 1/15 to VLAN 1003"
+ p.port_add_trunk_to_vlan("1/15", 1003)
+ print "Add 1/15 to VLAN 1004"
+ p.port_add_trunk_to_vlan("1/15", 1004)
+ print "Read from switch: which VLANs is 1/15 on?"
+ buf = p.port_get_trunk_vlan_list("1/15")
+ p.dump_list(buf)
+
+ # And the same for removals here
+ p.port_remove_trunk_from_vlan("1/15", 1003)
+ p.port_remove_trunk_from_vlan("1/15", 1002)
+ p.port_remove_trunk_from_vlan("1/15", 1004)
+ print "Read from switch: which VLANs is 1/15 on?"
+ buf = p.port_get_trunk_vlan_list("1/15")
+ p.dump_list(buf)
+
+# print 'Restarting switch, to explicitly reset config'
+# p.switch_restart()
+
+# p.switch_save_running_config()
+
+# p.switch_disconnect()
+# p._show_config()
diff --git a/Vland/drivers/NetgearXSM.py b/Vland/drivers/NetgearXSM.py
new file mode 100644
index 0000000..f8ba8a5
--- /dev/null
+++ b/Vland/drivers/NetgearXSM.py
@@ -0,0 +1,782 @@
+#! /usr/bin/python
+
+# Copyright 2015-2018 Linaro Limited
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
+# MA 02110-1301, USA.
+
+import logging
+import sys
+import re
+import pexpect
+
+# Netgear XSM family driver
+# Developed and tested against the XSM7224S in the Linaro LAVA lab
+
+if __name__ == '__main__':
+ import os
+ vlandpath = os.path.abspath(os.path.normpath(os.path.dirname(sys.argv[0])))
+ sys.path.insert(0, vlandpath)
+ sys.path.insert(0, "%s/.." % vlandpath)
+
+from errors import InputError, PExpectError
+from drivers.common import SwitchDriver, SwitchErrors
+
+class NetgearXSM(SwitchDriver):
+
+ connection = None
+ _username = None
+ _password = None
+ _enable_password = None
+
+ _capabilities = [
+ ]
+
+ # Regexps of expected hardware information - fail if we don't see
+ # this
+ _expected_manuf = re.compile('^Netgear')
+ _expected_model = re.compile('^XSM')
+
+ def __init__(self, switch_hostname, switch_telnetport=23, debug = False):
+ SwitchDriver.__init__(self, switch_hostname, debug)
+ self._systemdata = []
+ self.exec_string = "/usr/bin/telnet %s %d" % (switch_hostname, switch_telnetport)
+ self.errors = SwitchErrors()
+
+ ################################
+ ### Switch-level API functions
+ ################################
+
+ # Save the current running config into flash - we want config to
+ # remain across reboots
+ def switch_save_running_config(self):
+ try:
+ self._cli("save")
+ self.connection.expect("Are you sure")
+ self._cli("y")
+ self.connection.expect("Configuration Saved!")
+ except (PExpectError, pexpect.EOF):
+ # recurse on error
+ self._switch_connect()
+ self.switch_save_running_config()
+
+ # Restart the switch - we need to reload config to do a
+ # roll-back. Do NOT save running-config first if the switch asks -
+ # we're trying to dump recent changes, not save them.
+ #
+ # This will also implicitly cause a connection to be closed
+ def switch_restart(self):
+ self._cli("reload")
+ self.connection.expect('Are you sure')
+ self._cli("y") # Yes, continue to reset
+ self.connection.close(True)
+
+ # List the capabilities of the switch (and driver) - some things
+ # make no sense to abstract. Returns a dict of strings, each one
+ # describing an extra feature that that higher levels may care
+ # about
+ def switch_get_capabilities(self):
+ return self._capabilities
+
+ ################################
+ ### VLAN API functions
+ ################################
+
+ # Create a VLAN with the specified tag
+ def vlan_create(self, tag):
+ logging.debug("Creating VLAN %d", tag)
+ try:
+ self._cli("vlan database")
+ self._cli("vlan %d" % tag)
+ self._end_configure()
+
+ # Validate it happened
+ vlans = self.vlan_get_list()
+ for vlan in vlans:
+ if vlan == tag:
+ return
+ raise IOError("Failed to create VLAN %d" % tag)
+
+ except PExpectError:
+ # recurse on error
+ self._switch_connect()
+ self.vlan_create(tag)
+
+ # Destroy a VLAN with the specified tag
+ def vlan_destroy(self, tag):
+ logging.debug("Destroying VLAN %d", tag)
+
+ try:
+ self._cli("vlan database")
+ self._cli("no vlan %d" % tag)
+ self._end_configure()
+
+ # Validate it happened
+ vlans = self.vlan_get_list()
+ for vlan in vlans:
+ if vlan == tag:
+ raise IOError("Failed to destroy VLAN %d" % tag)
+
+ except PExpectError:
+ # recurse on error
+ self._switch_connect()
+ self.vlan_destroy(tag)
+
+ # Set the name of a VLAN
+ def vlan_set_name(self, tag, name):
+ logging.debug("Setting name of VLAN %d to %s", tag, name)
+
+ try:
+ self._cli("vlan database")
+ self._cli("vlan name %d %s" % (tag, name))
+ self._end_configure()
+
+ # Validate it happened
+ read_name = self.vlan_get_name(tag)
+ if read_name != name:
+ raise IOError("Failed to set name for VLAN %d (name found is \"%s\", not \"%s\")"
+ % (tag, read_name, name))
+ except PExpectError:
+ # recurse on error
+ self._switch_connect()
+ self.vlan_set_name(tag, name)
+
+ # Get a list of the VLAN tags currently registered on the switch
+ def vlan_get_list(self):
+ logging.debug("Grabbing list of VLANs")
+
+ try:
+ vlans = []
+
+ regex = re.compile(r'^ *(\d+).*(Static)')
+
+ self._cli("show vlan brief")
+ for line in self._read_long_output("show vlan brief"):
+ match = regex.match(line)
+ if match:
+ vlans.append(int(match.group(1)))
+ return vlans
+
+ except PExpectError:
+ # recurse on error
+ self._switch_connect()
+ return self.vlan_get_list()
+
+ # For a given VLAN tag, ask the switch what the associated name is
+ def vlan_get_name(self, tag):
+
+ try:
+ logging.debug("Grabbing the name of VLAN %d", tag)
+ name = None
+ regex = re.compile('VLAN Name: (.*)')
+ self._cli("show vlan %d" % tag)
+ for line in self._read_long_output("show vlan"):
+ match = regex.match(line)
+ if match:
+ name = match.group(1)
+ name.strip()
+ return name
+
+ except PExpectError:
+ # recurse on error
+ self._switch_connect()
+ return self.vlan_get_name(tag)
+
+ ################################
+ ### Port API functions
+ ################################
+
+ # Set the mode of a port: access or trunk
+ def port_set_mode(self, port, mode):
+ logging.debug("Setting port %s to %s mode", port, mode)
+ if not self._is_port_mode_valid(mode):
+ raise InputError("Port mode %s is not allowed" % mode)
+ if not self._is_port_name_valid(port):
+ raise InputError("Port name %s not recognised" % port)
+
+ # This switch does not support specific modes, so we can't
+ # actually change the mode directly. However, we can and
+ # should deal with the PVID and memberships of existing VLANs
+ # etc.
+
+ try:
+ if mode == "trunk":
+ # We define a trunk port thus:
+ # * accept all frames on ingress
+ # * accept packets for all VLANs (no ingress filter)
+ # * tags frames on transmission (do that later when
+ # * adding VLANs to the port)
+ # * PVID should match the default VLAN (1).
+ self._configure()
+ self._cli("interface %s" % port)
+ self._cli("vlan acceptframe all")
+ self._cli("no vlan ingressfilter")
+ self._cli("vlan pvid 1")
+ self._end_interface()
+ self._end_configure()
+
+ # We define an access port thus:
+ # * accept only untagged frames on ingress
+ # * accept packets for only desired VLANs (ingress filter)
+ # * exists on one VLAN only (1 by default)
+ # * do not tag frames on transmission (the devices
+ # we're talking to are expecting untagged frames)
+ # * PVID should match the VLAN it's on (1 by default,
+ # but don't do that here)
+ if mode == "access":
+ self._configure()
+ self._cli("interface %s" % port)
+ self._cli("vlan acceptframe admituntaggedonly")
+ self._cli("vlan ingressfilter")
+ self._cli("no vlan tagging 1-1023")
+ self._cli("no vlan tagging 1024-2047")
+ self._cli("no vlan tagging 2048-3071")
+ self._cli("no vlan tagging 3072-4093")
+ self._end_interface()
+ self._end_configure()
+
+ # Validate it happened
+ read_mode = self._port_get_mode(port)
+
+ if read_mode != mode:
+ raise IOError("Failed to set mode for port %s" % port)
+
+ # And cache the result
+ self._port_modes[port] = mode
+
+ except PExpectError:
+ # recurse on error
+ self._switch_connect()
+ self.port_set_mode(port, mode)
+
+ # Set an access port to be in a specified VLAN (tag)
+ def port_set_access_vlan(self, port, tag):
+ logging.debug("Setting access port %s to VLAN %d", port, tag)
+ if not self._is_port_name_valid(port):
+ raise InputError("Port name %s not recognised" % port)
+ if not (self.port_get_mode(port) == "access"):
+ raise InputError("Port %s not in access mode" % port)
+
+ try:
+ current_vlans = self._get_port_vlans(port)
+ self._configure()
+ self._cli("interface %s" % port)
+ self._cli("vlan pvid %s" % tag)
+ # Find the list of VLANs we're currently on, and drop them
+ # all. "auto" mode is fine here, we won't be included
+ # unless we have GVRP configured, and we don't do
+ # that.
+ for current_vlan in current_vlans:
+ self._cli("vlan participation auto %s" % current_vlan)
+ # Now specifically include the VLAN we want
+ self._cli("vlan participation include %s" % tag)
+ self._cli("no shutdown")
+ self._end_interface()
+ self._end_configure()
+
+ # Finally, validate things worked
+ read_vlan = int(self.port_get_access_vlan(port))
+ if read_vlan != tag:
+ raise IOError("Failed to move access port %d to VLAN %d - got VLAN %d instead"
+ % (port, tag, read_vlan))
+
+ except PExpectError:
+ # recurse on error
+ self._switch_connect()
+ self.port_set_access_vlan(port, tag)
+
+ # Add a trunk port to a specified VLAN (tag)
+ def port_add_trunk_to_vlan(self, port, tag):
+ logging.debug("Adding trunk port %s to VLAN %d", port, tag)
+ if not self._is_port_name_valid(port):
+ raise InputError("Port name %s not recognised" % port)
+ if not (self.port_get_mode(port) == "trunk"):
+ raise InputError("Port %s not in trunk mode" % port)
+
+ try:
+ self._configure()
+ self._cli("interface %s" % port)
+ self._cli("vlan participation include %d" % tag)
+ self._cli("vlan tagging %d" % tag)
+ self._end_interface()
+ self._end_configure()
+
+ # Validate it happened
+ read_vlans = self.port_get_trunk_vlan_list(port)
+ for vlan in read_vlans:
+ if vlan == tag or vlan == "ALL":
+ return
+ raise IOError("Failed to add trunk port %s to VLAN %d" % (port, tag))
+
+ except PExpectError:
+ # recurse on error
+ self._switch_connect()
+ self.port_add_trunk_to_vlan(port, tag)
+
+ # Remove a trunk port from a specified VLAN (tag)
+ def port_remove_trunk_from_vlan(self, port, tag):
+ logging.debug("Removing trunk port %s from VLAN %d", port, tag)
+ if not self._is_port_name_valid(port):
+ raise InputError("Port name %s not recognised" % port)
+ if not (self.port_get_mode(port) == "trunk"):
+ raise InputError("Port %s not in trunk mode" % port)
+
+ try:
+ self._configure()
+ self._cli("interface %s" % port)
+ self._cli("vlan participation auto %d" % tag)
+ self._cli("no vlan tagging %d" % tag)
+ self._end_interface()
+ self._end_configure()
+
+ # Validate it happened
+ read_vlans = self.port_get_trunk_vlan_list(port)
+ for vlan in read_vlans:
+ if vlan == tag:
+ raise IOError("Failed to remove trunk port %s from VLAN %d" % (port, tag))
+
+ except PExpectError:
+ # recurse on error
+ self._switch_connect()
+ self.port_remove_trunk_from_vlan(port, tag)
+
+ # Get the configured VLAN tag for an access port (tag)
+ def port_get_access_vlan(self, port):
+ logging.debug("Getting VLAN for access port %s", port)
+ if not self._is_port_name_valid(port):
+ raise InputError("Port name %s not recognised" % port)
+ if not (self.port_get_mode(port) == "access"):
+ raise InputError("Port %s not in access mode" % port)
+ vlans = self._get_port_vlans(port)
+ if (len(vlans) > 1):
+ raise IOError("More than one VLAN on access port %s" % port)
+ return vlans[0]
+
+ # Get the list of configured VLAN tags for a trunk port
+ def port_get_trunk_vlan_list(self, port):
+ logging.debug("Getting VLAN(s) for trunk port %s", port)
+ if not self._is_port_name_valid(port):
+ raise InputError("Port name %s not recognised" % port)
+ if not (self.port_get_mode(port) == "trunk"):
+ raise InputError("Port %s not in trunk mode" % port)
+ return self._get_port_vlans(port)
+
+ ################################
+ ### Internal functions
+ ################################
+
+ # Connect to the switch and log in
+ def _switch_connect(self):
+
+ if not self.connection is None:
+ self.connection.close(True)
+ self.connection = None
+
+ logging.debug("Connecting to Switch with: %s", self.exec_string)
+ self.connection = pexpect.spawn(self.exec_string, logfile=self.logger)
+ self._login()
+
+ # Avoid paged output
+ self._cli("terminal length 0")
+
+ # And grab details about the switch. in case we need it
+ self._get_systemdata()
+
+ # And also validate them - make sure we're driving a switch of
+ # the correct model! Also store the serial number
+ manuf_regex = re.compile(r'^Manufacturer([\.\s])+(\S+)')
+ model_regex = re.compile(r'^Machine Model([\.\s])+(\S+)')
+ sn_regex = re.compile(r'^Serial Number([\.\s])+(\S+)')
+
+ for line in self._systemdata:
+ match1 = manuf_regex.match(line)
+ if match1:
+ manuf = match1.group(2)
+
+ match2 = model_regex.match(line)
+ if match2:
+ model = match2.group(2)
+
+ match3 = sn_regex.match(line)
+ if match3:
+ self.serial_number = match3.group(2)
+
+ logging.debug("manufacturer is %s", manuf)
+ logging.debug("model is %s", model)
+ logging.debug("serial number is %s", self.serial_number)
+
+ if not (self._expected_manuf.match(manuf) and self._expected_model.match(model)):
+ raise IOError("Switch %s %s not recognised by this driver: abort" % (manuf, model))
+
+ # Now build a list of our ports, for later sanity checking
+ self._ports = self._get_port_names()
+ if len(self._ports) < 4:
+ raise IOError("Not enough ports detected - problem!")
+
+ def _login(self):
+ logging.debug("attempting login with username %s, password %s", self._username, self._password)
+ if self._username is not None:
+ self.connection.expect("User:")
+ self._cli("%s" % self._username)
+ if self._password is not None:
+ self.connection.expect("Password:")
+ self._cli("%s" % self._password, False)
+ while True:
+ index = self.connection.expect(['User:', 'Password:', 'Bad passwords', 'authentication failed', r'(.*)(#|>)'])
+ if index != 4: # Any other means: failed to log in!
+ logging.error("Login failure: index %d\n", index)
+ logging.error("Login failure: %s\n", self.connection.match.before)
+ raise IOError
+
+ # else
+ self._prompt_name = re.escape(self.connection.match.group(1).strip())
+ if self.connection.match.group(2) == ">":
+ # Need to enter "enable" mode too
+ self._cli("enable")
+ if self._enable_password is not None:
+ self.connection.expect("Password:")
+ self._cli("%s" % self._enable_password, False)
+ index = self.connection.expect(['Password:', 'Bad passwords', 'authentication failed', r'(.*) *(#|>)'])
+ if index != 3: # Any other means: failed to log in!
+ logging.error("Enable password failure: %s\n", self.connection.match)
+ raise IOError
+ return 0
+
+ def _logout(self):
+ logging.debug("Logging out")
+ self._cli("quit", False)
+ try:
+ self.connection.expect("Would you like to save them now")
+ self._cli("n")
+ except (pexpect.EOF):
+ pass
+ self.connection.close(True)
+
+ def _configure(self):
+ self._cli("configure")
+
+ def _end_configure(self):
+ self._cli("exit")
+
+ def _end_interface(self):
+ self._end_configure()
+
+ def _read_long_output(self, text):
+ longbuf = []
+ prompt = self._prompt_name + r'\s*#'
+ while True:
+ try:
+ index = self.connection.expect(['--More--', prompt])
+ if index == 0: # "--More-- or (q)uit"
+ for line in self.connection.before.split('\r\n'):
+ line1 = re.sub('(\x08|\x0D)*', '', line.strip())
+ longbuf.append(line1)
+ self._cli(' ', False)
+ elif index == 1: # Back to a prompt, says output is finished
+ break
+ except (pexpect.EOF, pexpect.TIMEOUT):
+ # Something went wrong; logout, log in and try again!
+ logging.error("PEXPECT FAILURE, RECONNECT")
+ self.errors.log_error_in(text)
+ raise PExpectError("_read_long_output failed")
+ except:
+ logging.error("prompt is \"%s\"", prompt)
+ raise
+
+ for line in self.connection.before.split('\r\n'):
+ longbuf.append(line.strip())
+ return longbuf
+
+ def _get_port_names(self):
+ logging.debug("Grabbing list of ports")
+ interfaces = []
+
+ # Look for "1" at the beginning of the output lines to just
+ # match lines with interfaces - they have names like
+ # "1/0/22". We do not care about Link Aggregation Groups (lag)
+ # here.
+ regex = re.compile(r'^(1\S+)')
+
+ try:
+ self._cli("show port all")
+ for line in self._read_long_output("show port all"):
+ match = regex.match(line)
+ if match:
+ interface = match.group(1)
+ interfaces.append(interface)
+ self._port_numbers[interface] = len(interfaces)
+ logging.debug(" found %d ports on the switch", len(interfaces))
+ return interfaces
+
+ except PExpectError:
+ # recurse on error
+ self._switch_connect()
+ return self._get_port_names()
+
+ # Get the mode of a port: access or trunk
+ def _port_get_mode(self, port):
+ logging.debug("Getting mode of port %s", port)
+
+ if not self._is_port_name_valid(port):
+ raise InputError("Port name %s not recognised" % port)
+ acceptframe_re = re.compile('vlan acceptframe (.*)')
+ ingress_re = re.compile('vlan ingressfilter')
+
+ acceptframe = None
+ ingressfilter = True
+
+ try:
+ self._cli("show running-config interface %s" % port)
+ for line in self._read_long_output("show running-config interface"):
+
+ match = acceptframe_re.match(line)
+ if match:
+ acceptframe = match.group(1)
+
+ match = ingress_re.match(line)
+ if match:
+ ingressfilter = True
+
+ # Simple classifier for now; may need to revisit later...
+ if (ingressfilter and acceptframe == "admituntaggedonly"):
+ return "access"
+ else:
+ return "trunk"
+
+ except PExpectError:
+ # recurse on error
+ self._switch_connect()
+ return self.port_get_mode(port)
+
+ def _show_config(self):
+ logging.debug("Grabbing config")
+ try:
+ self._cli("show running-config")
+ return self._read_long_output("show running-config")
+ except PExpectError:
+ # recurse on error
+ self._switch_connect()
+ return self._show_config()
+
+ def _show_clock(self):
+ logging.debug("Grabbing time")
+ try:
+ self._cli("show clock")
+ return self._read_long_output("show clock")
+ except PExpectError:
+ # recurse on error
+ self._switch_connect()
+ return self._show_clock()
+
+ def _get_systemdata(self):
+ logging.debug("Grabbing system sw and hw versions")
+
+ try:
+ self._systemdata = []
+ self._cli("show version")
+ for line in self._read_long_output("show version"):
+ self._systemdata.append(line)
+
+ except PExpectError:
+ # recurse on error
+ self._switch_connect()
+ return self._get_systemdata()
+
+ def _parse_vlan_list(self, inputdata):
+ vlans = []
+
+ if inputdata == "ALL":
+ return ["ALL"]
+ elif inputdata == "NONE":
+ return []
+ else:
+ # Parse the complex list
+ groups = inputdata.split(',')
+ for group in groups:
+ subgroups = group.split('-')
+ if len(subgroups) == 1:
+ vlans.append(int(subgroups[0]))
+ elif len(subgroups) == 2:
+ for i in range (int(subgroups[0]), int(subgroups[1]) + 1):
+ vlans.append(i)
+ else:
+ logging.debug("Can't parse group \"" + group + "\"")
+
+ return vlans
+
+ def _get_port_vlans(self, port):
+ vlan_text = None
+
+ vlan_part_re = re.compile('vlan participation include (.*)')
+
+ try:
+ self._cli("show running-config interface %s" % port)
+ for line in self._read_long_output("show running-config interface"):
+ match = vlan_part_re.match(line)
+ if match:
+ if vlan_text != None:
+ vlan_text += ","
+ vlan_text += (match.group(1))
+ else:
+ vlan_text = match.group(1)
+
+ if vlan_text is None:
+ return [1]
+ else:
+ vlans = self._parse_vlan_list(vlan_text)
+ return vlans
+
+ except PExpectError:
+ # recurse on error
+ self._switch_connect()
+ return self._get_port_vlans(port)
+
+ # Wrapper around connection.send - by default, expect() the same
+ # text we've sent, to remove it from the output from the
+ # switch. For the few cases where we don't need that, override
+ # this using echo=False.
+ # Horrible, but seems to work.
+ def _cli(self, text, echo=True):
+ self.connection.send(text + '\r')
+ if echo:
+ try:
+ self.connection.expect(text)
+ except (pexpect.EOF, pexpect.TIMEOUT):
+ # Something went wrong; logout, log in and try again!
+ logging.error("PEXPECT FAILURE, RECONNECT")
+ self.errors.log_error_out(text)
+ raise PExpectError("_cli failed on %s" % text)
+ except:
+ logging.error("Unexpected error: %s", sys.exc_info()[0])
+ raise
+
+if __name__ == "__main__":
+
+ # Simple test harness - exercise the main working functions above to verify
+ # they work. This does *NOT* test really disruptive things like "save
+ # running-config" and "reload" - test those by hand.
+
+ import optparse
+
+ switch = 'vlandswitch05'
+ parser = optparse.OptionParser()
+ parser.add_option("--switch",
+ dest = "switch",
+ action = "store",
+ nargs = 1,
+ type = "string",
+ help = "specify switch to connect to for testing",
+ metavar = "<switch>")
+ (opts, args) = parser.parse_args()
+ if opts.switch:
+ switch = opts.switch
+
+ logging.basicConfig(level = logging.DEBUG,
+ format = '%(asctime)s %(levelname)-8s %(message)s')
+ p = NetgearXSM(switch, 23, debug=True)
+ p.switch_connect('admin', '', None)
+
+ print "VLANs are:"
+ buf = p.vlan_get_list()
+ p.dump_list(buf)
+
+ buf = p.vlan_get_name(2)
+ print "VLAN 2 is named \"%s\"" % buf
+
+ print "Create VLAN 3"
+ p.vlan_create(3)
+
+ buf = p.vlan_get_name(3)
+ print "VLAN 3 is named \"%s\"" % buf
+
+ print "Set name of VLAN 3 to test333"
+ p.vlan_set_name(3, "test333")
+
+ buf = p.vlan_get_name(3)
+ print "VLAN 3 is named \"%s\"" % buf
+
+ print "VLANs are:"
+ buf = p.vlan_get_list()
+ p.dump_list(buf)
+
+ print "Destroy VLAN 3"
+ p.vlan_destroy(3)
+
+ print "VLANs are:"
+ buf = p.vlan_get_list()
+ p.dump_list(buf)
+
+ buf = p.port_get_mode("1/0/10")
+ print "Port 1/0/10 is in %s mode" % buf
+
+ buf = p.port_get_mode("1/0/11")
+ print "Port 1/0/11 is in %s mode" % buf
+
+ # Test access stuff
+ print "Set 1/0/9 to access mode"
+ p.port_set_mode("1/0/9", "access")
+
+ print "Move 1/0/9 to VLAN 4"
+ p.port_set_access_vlan("1/0/9", 4)
+
+ buf = p.port_get_access_vlan("1/0/9")
+ print "Read from switch: 1/0/9 is on VLAN %s" % buf
+
+ print "Move 1/0/9 back to VLAN 1"
+ p.port_set_access_vlan("1/0/9", 1)
+
+ print "Create VLAN 2"
+ p.vlan_create(2)
+
+ print "Create VLAN 3"
+ p.vlan_create(3)
+
+ print "Create VLAN 4"
+ p.vlan_create(4)
+
+ # Test access stuff
+ print "Set 1/0/9 to trunk mode"
+ p.port_set_mode("1/0/9", "trunk")
+ print "Read from switch: which VLANs is 1/0/9 on?"
+ buf = p.port_get_trunk_vlan_list("1/0/9")
+ p.dump_list(buf)
+
+ # The adds below are NOOPs in effect on this switch - no filtering
+ # for "trunk" ports
+ print "Add 1/0/9 to VLAN 2"
+ p.port_add_trunk_to_vlan("1/0/9", 2)
+ print "Add 1/0/9 to VLAN 3"
+ p.port_add_trunk_to_vlan("1/0/9", 3)
+ print "Add 1/0/9 to VLAN 4"
+ p.port_add_trunk_to_vlan("1/0/9", 4)
+ print "Read from switch: which VLANs is 1/0/9 on?"
+ buf = p.port_get_trunk_vlan_list("1/0/9")
+ p.dump_list(buf)
+
+ # And the same for removals here
+ p.port_remove_trunk_from_vlan("1/0/9", 3)
+ p.port_remove_trunk_from_vlan("1/0/9", 2)
+ p.port_remove_trunk_from_vlan("1/0/9", 4)
+ print "Read from switch: which VLANs is 1/0/9 on?"
+ buf = p.port_get_trunk_vlan_list("1/0/9")
+ p.dump_list(buf)
+
+# print 'Restarting switch, to explicitly reset config'
+# p.switch_restart()
+
+# p.switch_save_running_config()
+
+# p.switch_disconnect()
+# p._show_config()
diff --git a/Vland/drivers/TPLinkTLSG2XXX.py b/Vland/drivers/TPLinkTLSG2XXX.py
new file mode 100644
index 0000000..5213d10
--- /dev/null
+++ b/Vland/drivers/TPLinkTLSG2XXX.py
@@ -0,0 +1,695 @@
+#! /usr/bin/python
+
+# Copyright 2014-2015 Linaro Limited
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
+# MA 02110-1301, USA.
+
+import logging
+import sys
+import re
+import pexpect
+
+if __name__ == '__main__':
+ import os
+ vlandpath = os.path.abspath(os.path.normpath(os.path.dirname(sys.argv[0])))
+ sys.path.insert(0, vlandpath)
+ sys.path.insert(0, "%s/.." % vlandpath)
+
+from errors import InputError, PExpectError
+from drivers.common import SwitchDriver, SwitchErrors
+
+class TPLinkTLSG2XXX(SwitchDriver):
+
+ connection = None
+ _username = None
+ _password = None
+ _enable_password = None
+
+ _capabilities = [
+ ]
+
+ # Regexp of expected hardware information - fail if we don't see
+ # this
+ _expected_descr_re = re.compile(r'TL-SG2\d\d\d')
+
+ def __init__(self, switch_hostname, switch_telnetport=23, debug=False):
+ SwitchDriver.__init__(self, switch_hostname, debug)
+ self._systemdata = []
+ self.exec_string = "/usr/bin/telnet %s %d" % (switch_hostname, switch_telnetport)
+ self.errors = SwitchErrors()
+
+ ################################
+ ### Switch-level API functions
+ ################################
+
+ # Save the current running config into flash - we want config to
+ # remain across reboots
+ def switch_save_running_config(self):
+ try:
+ self._cli("copy running-config startup-config")
+ self.connection.expect("OK")
+ except (PExpectError, pexpect.EOF):
+ # recurse on error
+ self._switch_connect()
+ self.switch_save_running_config()
+
+ # Restart the switch - we need to reload config to do a
+ # roll-back. Do NOT save running-config first if the switch asks -
+ # we're trying to dump recent changes, not save them.
+ #
+ # This will also implicitly cause a connection to be closed
+ def switch_restart(self):
+ self._cli("reboot")
+ index = self.connection.expect(['Daving current', 'Continue?'])
+ if index == 0:
+ self._cli("n") # No, don't save
+ self.connection.expect("Continue?")
+
+ # Fall through
+ self._cli("y") # Yes, continue to reset
+ self.connection.close(True)
+
+ # List the capabilities of the switch (and driver) - some things
+ # make no sense to abstract. Returns a dict of strings, each one
+ # describing an extra feature that that higher levels may care
+ # about
+ def switch_get_capabilities(self):
+ return self._capabilities
+
+ ################################
+ ### VLAN API functions
+ ################################
+
+ # Create a VLAN with the specified tag
+ def vlan_create(self, tag):
+ logging.debug("Creating VLAN %d", tag)
+
+ try:
+ self._configure()
+ self._cli("vlan %d" % tag)
+ self._end_configure()
+
+ # Validate it happened
+ vlans = self.vlan_get_list()
+ for vlan in vlans:
+ if vlan == tag:
+ return
+ raise IOError("Failed to create VLAN %d" % tag)
+
+ except PExpectError:
+ # recurse on error
+ self._switch_connect()
+ self.vlan_create(tag)
+
+ # Destroy a VLAN with the specified tag
+ def vlan_destroy(self, tag):
+ logging.debug("Destroying VLAN %d", tag)
+
+ try:
+ self._configure()
+ self._cli("no vlan %d" % tag)
+ self._end_configure()
+
+ # Validate it happened
+ vlans = self.vlan_get_list()
+ for vlan in vlans:
+ if vlan == tag:
+ raise IOError("Failed to destroy VLAN %d" % tag)
+
+ except PExpectError:
+ # recurse on error
+ self._switch_connect()
+ self.vlan_destroy(tag)
+
+ # Set the name of a VLAN
+ def vlan_set_name(self, tag, name):
+ logging.debug("Setting name of VLAN %d to %s", tag, name)
+
+ try:
+ self._configure()
+ self._cli("vlan %d" % tag)
+ self._cli("name %s" % name)
+ self._end_configure()
+
+ # Validate it happened
+ read_name = self.vlan_get_name(tag)
+ if read_name != name:
+ raise IOError("Failed to set name for VLAN %d (name found is \"%s\", not \"%s\")"
+ % (tag, read_name, name))
+
+ except (PExpectError, pexpect.EOF):
+ # recurse on error
+ self._switch_connect()
+ self.vlan_set_name(tag, name)
+
+ # Get a list of the VLAN tags currently registered on the switch
+ def vlan_get_list(self):
+ logging.debug("Grabbing list of VLANs")
+
+ try:
+ vlans = []
+ regex = re.compile(r'^ *(\d+).*active')
+
+ self._cli("show vlan brief")
+ for line in self._read_long_output("show vlan brief"):
+ match = regex.match(line)
+ if match:
+ vlans.append(int(match.group(1)))
+ return vlans
+
+ except PExpectError:
+ # recurse on error
+ self._switch_connect()
+ return self.vlan_get_list()
+
+ # For a given VLAN tag, ask the switch what the associated name is
+ def vlan_get_name(self, tag):
+ logging.debug("Grabbing the name of VLAN %d", tag)
+
+ try:
+ name = None
+ regex = re.compile(r'^ *\d+\s+(\S+).*(active)')
+ self._cli("show vlan id %d" % tag)
+ for line in self._read_long_output("show vlan id"):
+ match = regex.match(line)
+ if match:
+ name = match.group(1)
+ name.strip()
+ return name
+
+ except PExpectError:
+ # recurse on error
+ self._switch_connect()
+ return self.vlan_get_name(tag)
+
+ ################################
+ ### Port API functions
+ ################################
+
+ # Set the mode of a port: access or trunk
+ def port_set_mode(self, port, mode):
+ logging.debug("Setting port %s to %s", port, mode)
+ if not self._is_port_mode_valid(mode):
+ raise IndexError("Port mode %s is not allowed" % mode)
+ if not self._is_port_name_valid(port):
+ raise IndexError("Port name %s not recognised" % port)
+ # This switch does not support specific modes, so we can't
+ # actually change the mode directly. However, we can and
+ # should deal with the PVID and memberships of existing VLANs
+
+ try:
+ # We define a trunk to be on *all* VLANs on the switch in
+ # tagged mode, and PVID should match the default VLAN (1).
+ if mode == "trunk":
+ # Disconnect all the untagged ports
+ read_vlans = self._port_get_all_vlans(port, 'Untagged')
+ for vlan in read_vlans:
+ self._port_remove_general_vlan(port, vlan)
+
+ # And move to VLAN 1
+ self.port_add_trunk_to_vlan(port, 1)
+ self._set_pvid(port, 1)
+
+ # And an access port should only be on one VLAN. Move to
+ # VLAN 1, untagged, and set PVID there.
+ if mode == "access":
+ # Disconnect all the ports
+ read_vlans = self._port_get_all_vlans(port, 'Untagged')
+ for vlan in read_vlans:
+ self._port_remove_general_vlan(port, vlan)
+ read_vlans = self._port_get_all_vlans(port, 'Tagged')
+ for vlan in read_vlans:
+ self._port_remove_general_vlan(port, vlan)
+
+ # And move to VLAN 1
+ self.port_set_access_vlan(port, 1)
+ self._set_pvid(port, 1)
+
+ # Validate it happened
+ read_mode = self._port_get_mode(port)
+ if read_mode != mode:
+ raise IOError("Failed to set mode for port %s" % port)
+
+ # And cache the result
+ self._port_modes[port] = mode
+
+ except (PExpectError, pexpect.EOF):
+ # recurse on error
+ self._switch_connect()
+ self.port_set_mode(port, mode)
+
+ # Set an access port to be in a specified VLAN (tag)
+ def port_set_access_vlan(self, port, tag):
+ logging.debug("Setting access port %s to VLAN %d", port, tag)
+ if not self._is_port_name_valid(port):
+ raise IndexError("Port name %s not recognised" % port)
+ # Does the VLAN already exist?
+ vlan_list = self.vlan_get_list()
+ if not tag in vlan_list:
+ raise IndexError("VLAN tag %d not recognised" % tag)
+
+ try:
+ # Add the new VLAN
+ self._configure()
+ self._cli("interface %s" % self._long_port_name(port))
+ self._cli("switchport general allowed vlan %d untagged" % tag)
+ self._cli("no shutdown")
+ self._end_configure()
+
+ self._set_pvid(port, tag)
+
+ # Now drop all the other VLANs
+ read_vlans = self._port_get_all_vlans(port, 'Untagged')
+ for vlan in read_vlans:
+ if vlan != tag:
+ self._port_remove_general_vlan(port, vlan)
+
+ # Finally, validate things worked
+ read_vlan = int(self.port_get_access_vlan(port))
+ if read_vlan != tag:
+ raise IOError("Failed to move access port %s to VLAN %d - got VLAN %d instead"
+ % (port, tag, read_vlan))
+
+ except PExpectError:
+ # recurse on error
+ self._switch_connect()
+ self.port_set_access_vlan(port, tag)
+
+ # Add a trunk port to a specified VLAN (tag)
+ def port_add_trunk_to_vlan(self, port, tag):
+ logging.debug("Adding trunk port %s to VLAN %d", port, tag)
+ if not self._is_port_name_valid(port):
+ raise IndexError("Port name %s not recognised" % port)
+ try:
+ self._configure()
+ self._cli("interface %s" % self._long_port_name(port))
+ self._cli("switchport general allowed vlan %d tagged" % tag)
+ self._end_configure()
+
+ # Validate it happened
+ read_vlans = self.port_get_trunk_vlan_list(port)
+ for vlan in read_vlans:
+ if vlan == tag or vlan == "ALL":
+ return
+ raise IOError("Failed to add trunk port %s to VLAN %d" % (port, tag))
+
+ except PExpectError:
+ # recurse on error
+ self._switch_connect()
+ self.port_add_trunk_to_vlan(port, tag)
+
+ # Remove a trunk port from a specified VLAN (tag)
+ def port_remove_trunk_from_vlan(self, port, tag):
+ logging.debug("Removing trunk port %s from VLAN %d", port, tag)
+ if not self._is_port_name_valid(port):
+ raise IndexError("Port name %s not recognised" % port)
+
+ try:
+ self._configure()
+ self._cli("interface %s" % self._long_port_name(port))
+ self._cli("no switchport general allowed vlan %d" % tag)
+ self._end_configure()
+
+ # Validate it happened
+ read_vlans = self.port_get_trunk_vlan_list(port)
+ for vlan in read_vlans:
+ if vlan == tag:
+ raise IOError("Failed to remove trunk port %s from VLAN %d" % (port, tag))
+
+ except PExpectError:
+ # recurse on error
+ self._switch_connect()
+ self.port_remove_trunk_from_vlan(port, tag)
+
+ # Get the configured VLAN tag for an access port (i.e. a port not
+ # configured for tagged egress)
+ def port_get_access_vlan(self, port):
+ logging.debug("Getting VLAN for access port %s", port)
+ vlan = 1
+ if not self._is_port_name_valid(port):
+ raise IndexError("Port name %s not recognised" % port)
+ regex = re.compile(r'(\d+)\s+.*Untagged')
+
+ try:
+ self._cli("show interface switchport %s" % self._long_port_name(port))
+ for line in self._read_long_output("show interface switchport"):
+ match = regex.match(line)
+ if match:
+ vlan = match.group(1)
+ return int(vlan)
+
+ except PExpectError:
+ # recurse on error
+ self._switch_connect()
+ return self.port_get_access_vlan(port)
+
+ # Get the list of configured VLAN tags for a trunk port
+ def port_get_trunk_vlan_list(self, port):
+ logging.debug("Getting VLANs for trunk port %s", port)
+
+ if not self._is_port_name_valid(port):
+ raise IndexError("Port name %s not recognised" % port)
+
+ return self._port_get_all_vlans(port, 'Tagged')
+
+ ################################
+ ### Internal functions
+ ################################
+
+ # Connect to the switch and log in
+ def _switch_connect(self):
+
+ if not self.connection is None:
+ self.connection.close(True)
+ self.connection = None
+
+ logging.debug("Connecting to Switch with: %s", self.exec_string)
+ self.connection = pexpect.spawn(self.exec_string, logfile=self.logger)
+ self._login()
+
+ # No way to avoid paged output on this switch AFAICS
+
+ # And grab details about the switch. in case we need it
+ self._get_systemdata()
+
+ # And also validate them - make sure we're driving a switch of
+ # the correct model! Also store the serial number
+ descr_regex = re.compile(r'Hardware Version\s+ - (.*)')
+ descr = ""
+
+ for line in self._systemdata:
+ match = descr_regex.match(line)
+ if match:
+ descr = match.group(1)
+
+ logging.debug("system description is %s", descr)
+
+ if not self._expected_descr_re.match(descr):
+ raise IOError("Switch %s not recognised by this driver: abort" % descr)
+
+ # Now build a list of our ports, for later sanity checking
+ self._ports = self._get_port_names()
+ if len(self._ports) < 4:
+ raise IOError("Not enough ports detected - problem!")
+
+ def _login(self):
+ logging.debug("attempting login with username \"%s\", password \"%s\", enable_password \"%s\"", self._username, self._password, self._enable_password)
+ self.connection.expect('User Access Login')
+ if self._username is not None:
+ self.connection.expect("User:")
+ self._cli("%s" % self._username)
+ if self._password is not None:
+ self.connection.expect("Password:")
+ self._cli("%s" % self._password, False)
+ while True:
+ index = self.connection.expect(['User Name:', 'Password:', 'Bad passwords', 'authentication failed', r'(.*)(#|>)'])
+ if index != 4: # Any other means: failed to log in!
+ logging.error("Login failure: index %d\n", index)
+ logging.error("Login failure: %s\n", self.connection.match.before)
+ raise IOError
+
+ # else
+ self._prompt_name = re.escape(self.connection.match.group(1).strip())
+ if self.connection.match.group(2) == ">":
+ # Need to enter "enable" mode too
+ self._cli("")
+ self._cli("enable")
+ if self._enable_password is not None and len(self._enable_password) > 0:
+ self.connection.expect("Password:")
+ self._cli("%s" % self._enable_password, False)
+ index = self.connection.expect(['Password:', 'Bad passwords', 'authentication failed', r'(.*)(#|>)'])
+ if index != 3: # Any other means: failed to log in!
+ logging.error("Enable password failure: %s\n", self.connection.match)
+ raise IOError
+ return 0
+
+ def _logout(self):
+ logging.debug("Logging out")
+ self._cli("exit", False)
+ self.connection.close(True)
+
+ def _configure(self):
+ self._cli("configure")
+
+ def _end_configure(self):
+ self._cli("end")
+
+ def _read_long_output(self, text):
+ longbuf = []
+ prompt = self._prompt_name + '#'
+ while True:
+ try:
+ index = self.connection.expect(['^Press any key to continue', prompt])
+ if index == 0: # "Press any key to continue (Q to quit)"
+ for line in self.connection.before.split('\r\n'):
+ line1 = re.sub('(\x08|\x0D)*', '', line.strip())
+ longbuf.append(line1)
+ self._cli(' ', False)
+ elif index == 1: # Back to a prompt, says output is finished
+ break
+ except (pexpect.EOF, pexpect.TIMEOUT):
+ # Something went wrong; logout, log in and try again!
+ logging.error("PEXPECT FAILURE, RECONNECT")
+ self.errors.log_error_in(text)
+ raise PExpectError("_read_long_output failed")
+ except:
+ logging.error("prompt is \"%s\"", prompt)
+ raise
+
+ for line in self.connection.before.split('\r\n'):
+ line1 = re.sub('(\x08|\x0D)*', '', line.strip())
+ longbuf.append(line1)
+
+ return longbuf
+
+ def _long_port_name(self, port):
+ return re.sub('Gi', 'gigabitEthernet ', port)
+
+ def _set_pvid(self, port, pvid):
+ # Set a port's PVID
+ self._configure()
+ self._cli("interface %s" % self._long_port_name(port))
+ self._cli("switchport pvid %d" % pvid)
+ self._end_configure()
+
+ def _port_get_all_vlans(self, port, port_type):
+ vlans = []
+ regex = re.compile(r'(\d+)\s+.*' + port_type)
+
+ try:
+ self._cli("show interface switchport %s" % self._long_port_name(port))
+ for line in self._read_long_output("show interface switchport"):
+ match = regex.match(line)
+ if match:
+ vlan = match.group(1)
+ vlans.append(int(vlan))
+ return vlans
+
+ except PExpectError:
+ # recurse on error
+ self._switch_connect()
+ return self._port_get_all_vlans(port, port_type)
+
+ def _port_remove_general_vlan(self, port, tag):
+ try:
+ self._configure()
+ self._cli("interface %s" % self._long_port_name(port))
+ self._cli("no switchport general allowed vlan %d" % tag)
+ self._end_configure()
+
+ except PExpectError:
+ # recurse on error
+ self._switch_connect()
+ return self._port_remove_general_vlan(port, tag)
+
+ def _get_port_names(self):
+ logging.debug("Grabbing list of ports")
+ interfaces = []
+
+ # Use "Link" to only identify lines in the output that match
+ # interfaces that exist - it'll match "LinkUp" and "LinkDown"
+ regex = re.compile(r'^\s*([a-zA-Z0-9_/]*).*Link')
+
+ try:
+ self._cli("show interface status")
+ for line in self._read_long_output("show interface status"):
+ match = regex.match(line)
+ if match:
+ interface = match.group(1)
+ interfaces.append(interface)
+ self._port_numbers[interface] = len(interfaces)
+ logging.debug(" found %d ports on the switch", len(interfaces))
+ return interfaces
+
+ except PExpectError:
+ # recurse on error
+ self._switch_connect()
+ return self._get_port_names()
+
+ # Get the mode of a port: access or trunk
+ def _port_get_mode(self, port):
+ logging.debug("Getting mode of port %s", port)
+
+ if not self._is_port_name_valid(port):
+ raise IndexError("Port name %s not recognised" % port)
+
+ # This switch does not support specific modes, so we have to
+ # make stuff up here. We define trunk ports to be on (1 or
+ # many) tagged VLANs, anything not tagged to be access.
+ read_vlans = self._port_get_all_vlans(port, 'Tagged')
+ if len(read_vlans) > 0:
+ return "trunk"
+ else:
+ return "access"
+
+ def _show_config(self):
+ logging.debug("Grabbing config")
+ self._cli("show running-config")
+ return self._read_long_output("show running-config")
+
+ def _show_clock(self):
+ logging.debug("Grabbing time")
+ self._cli("show system-time")
+ return self._read_long_output("show system-time")
+
+ def _get_systemdata(self):
+ logging.debug("Grabbing system sw and hw versions")
+
+ self._cli("show system-info")
+ self._systemdata = []
+ for line in self._read_long_output("show system-info"):
+ self._systemdata.append(line)
+
+ # Wrapper around connection.send - by default, expect() the same
+ # text we've sent, to remove it from the output from the
+ # switch. For the few cases where we don't need that, override
+ # this using echo=False.
+ # Horrible, but seems to work.
+ def _cli(self, text, echo=True):
+ self.connection.send(text + '\r')
+ if echo:
+ self.connection.expect(text)
+
+if __name__ == "__main__":
+
+ # Simple test harness - exercise the main working functions above to verify
+ # they work. This does *NOT* test really disruptive things like "save
+ # running-config" and "reload" - test those by hand.
+
+ import optparse
+
+ switch = '10.172.2.50'
+ parser = optparse.OptionParser()
+ parser.add_option("--switch",
+ dest = "switch",
+ action = "store",
+ nargs = 1,
+ type = "string",
+ help = "specify switch to connect to for testing",
+ metavar = "<switch>")
+ (opts, args) = parser.parse_args()
+ if opts.switch:
+ switch = opts.switch
+
+ logging.basicConfig(level = logging.DEBUG,
+ format = '%(asctime)s %(levelname)-8s %(message)s')
+ p = TPLinkTLSG2XXX(switch, 23, debug = False)
+ p.switch_connect('admin', 'admin', None)
+
+ print "Ports are:"
+ buf = p.switch_get_port_names()
+ p.dump_list(buf)
+
+ print "VLANs are:"
+ buf = p.vlan_get_list()
+ p.dump_list(buf)
+
+ buf = p.vlan_get_name(2)
+ print "VLAN 2 is named \"%s\"" % buf
+
+ print "Create VLAN 3"
+ p.vlan_create(3)
+
+ buf = p.vlan_get_name(3)
+ print "VLAN 3 is named \"%s\"" % buf
+
+ print "Set name of VLAN 3 to test333"
+ p.vlan_set_name(3, "test333")
+
+ buf = p.vlan_get_name(3)
+ print "VLAN 3 is named \"%s\"" % buf
+
+ print "VLANs are:"
+ buf = p.vlan_get_list()
+ p.dump_list(buf)
+
+ print "Destroy VLAN 3"
+ p.vlan_destroy(3)
+
+ print "VLANs are:"
+ buf = p.vlan_get_list()
+ p.dump_list(buf)
+
+ buf = p.port_get_mode("Gi1/0/10")
+ print "Port Gi1/0/10 is in %s mode" % buf
+
+ buf = p.port_get_mode("Gi1/0/11")
+ print "Port Gi1/0/11 is in %s mode" % buf
+
+ # Test access stuff
+ buf = p.port_get_mode("Gi1/0/9")
+ print "Port Gi1/0/9 is in %s mode" % buf
+
+ print "Set Gi1/0/9 to access mode"
+ p.port_set_mode("Gi1/0/9", "access")
+
+ print "Move Gi1/0/9 to VLAN 4"
+ p.port_set_access_vlan("Gi1/0/9", 4)
+
+ buf = p.port_get_access_vlan("Gi1/0/9")
+ print "Read from switch: Gi1/0/9 is on VLAN %s" % buf
+
+ print "Move Gi1/0/9 back to VLAN 1"
+ p.port_set_access_vlan("Gi1/0/9", 1)
+
+ # Test access stuff
+ print "Set Gi1/0/9 to trunk mode"
+ p.port_set_mode("Gi1/0/9", "trunk")
+ print "Read from switch: which VLANs is Gi1/0/9 on?"
+ buf = p.port_get_trunk_vlan_list("Gi1/0/9")
+ p.dump_list(buf)
+ print "Add Gi1/0/9 to VLAN 2"
+ p.port_add_trunk_to_vlan("Gi1/0/9", 2)
+ print "Add Gi1/0/9 to VLAN 3"
+ p.port_add_trunk_to_vlan("Gi1/0/9", 3)
+ print "Add Gi1/0/9 to VLAN 4"
+ p.port_add_trunk_to_vlan("Gi1/0/9", 4)
+ print "Read from switch: which VLANs is Gi1/0/9 on?"
+ buf = p.port_get_trunk_vlan_list("Gi1/0/9")
+ p.dump_list(buf)
+
+ p.port_remove_trunk_from_vlan("Gi1/0/9", 3)
+ p.port_remove_trunk_from_vlan("Gi1/0/9", 3)
+ p.port_remove_trunk_from_vlan("Gi1/0/9", 4)
+ print "Read from switch: which VLANs is Gi1/0/9 on?"
+ buf = p.port_get_trunk_vlan_list("Gi1/0/9")
+ p.dump_list(buf)
+
+
+ p.switch_save_running_config()
+
+ p.switch_disconnect()
+# p._show_config()
diff --git a/Vland/drivers/__init__.py b/Vland/drivers/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/Vland/drivers/__init__.py
diff --git a/Vland/drivers/common.py b/Vland/drivers/common.py
new file mode 100644
index 0000000..e564c9e
--- /dev/null
+++ b/Vland/drivers/common.py
@@ -0,0 +1,167 @@
+#! /usr/bin/python
+
+# Copyright 2014-2015 Linaro Limited
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
+# MA 02110-1301, USA.
+
+import time
+import logging
+
+from errors import InputError, PExpectError
+
+class SwitchErrors:
+ """ Error logging and statistics class """
+
+ def __init__(self):
+ self.errors_in = 0
+ self.errors_out = 0
+
+ def __repr__(self):
+ return "<SwitchErrors: errors_in: %d, errors_out: %d>" % (self.errors_in, self.errors_out)
+
+ # For now, just count the error. Later on we might add stats and
+ # analysis
+ def log_error_in(self, text):
+ self.errors_in += 1
+
+ # For now, just count the error. Later on we might add stats and
+ # analysis
+ def log_error_out(self, text):
+ self.errors_out += 1
+
+class SwitchDriver(object):
+
+ connection = None
+ hostname = ""
+ serial_number = ''
+
+ _allowed_port_modes = [ "trunk", "access" ]
+ _ports = []
+ _port_modes = {}
+ _port_numbers = {}
+ _prompt_name = ''
+ _username = ''
+ _password = ''
+ _enable_password = ''
+ _systemdata = []
+
+ def __init__ (self, switch_hostname, debug):
+
+ if debug:
+ # Configure logging for pexpect output if we have debug
+ # enabled
+
+ # get the logger
+ self.logger = logging.getLogger(switch_hostname)
+
+ # give the logger the methods required by pexpect
+ self.logger.write = self._log_write
+ self.logger.flush = self._log_do_nothing
+
+ else:
+ self.logger = None
+
+ self.hostname = switch_hostname
+
+ # Connect to the switch and log in
+ def switch_connect(self, username, password, enablepassword):
+ self._username = username
+ self._password = password
+ self._enable_password = enablepassword
+ self._switch_connect()
+
+ # Log out of the switch and drop the connection and all state
+ def switch_disconnect(self):
+ self._logout()
+ logging.debug("Closing connection to %s", self.hostname)
+ self._ports = []
+ self._port_modes.clear()
+ self._port_numbers.clear()
+ self._prompt_name = ''
+ self._systemdata = []
+ del(self)
+
+ def dump_list(self, data):
+ i = 0
+ for line in data:
+ print "%d: \"%s\"" % (i, line)
+ i += 1
+
+ def _delay(self):
+ time.sleep(0.5)
+
+ # List the capabilities of the switch (and driver) - some things
+ # make no sense to abstract. Returns a dict of strings, each one
+ # describing an extra feature that that higher levels may care
+ # about
+ def switch_get_capabilities(self):
+ return self._capabilities
+
+ # List the names of all the ports on the switch
+ def switch_get_port_names(self):
+ return self._ports
+
+ def _is_port_name_valid(self, name):
+ for port in self._ports:
+ if name == port:
+ return True
+ return False
+
+ def _is_port_mode_valid(self, mode):
+ for allowed in self._allowed_port_modes:
+ if allowed == mode:
+ return True
+ return False
+
+ # Try to look up a port mode in our cache. If not there, go ask
+ # the switch and cache the result
+ def port_get_mode(self, port):
+ if not self._is_port_name_valid(port):
+ raise InputError("Port name %s not recognised" % port)
+ if port in self._port_modes:
+ logging.debug("port_get_mode: returning mode %s from cache for port %s", self._port_modes[port], port)
+ return self._port_modes[port]
+ else:
+ mode = self._port_get_mode(port)
+ self._port_modes[port] = mode
+ logging.debug("port_get_mode: found mode %s for port %s, adding to cache", self._port_modes[port], port)
+ return mode
+
+ def port_map_name_to_number(self, port_name):
+ if not self._is_port_name_valid(port_name):
+ raise InputError("Port name %s not recognised" % port_name)
+ logging.debug("port_map_name_to_number: returning %d for port_name %s", self._port_numbers[port_name], port_name)
+ return self._port_numbers[port_name]
+
+ # Wrappers to adapt logging for pexpect when we've configured on a
+ # switch.
+ # This will be the method called by the pexpect object to write a
+ # log message
+ def _log_write(self, *args, **kwargs):
+ # ignore other parameters, pexpect only uses one arg
+ content = args[0]
+
+ if content in [' ', '', '\n', '\r', '\r\n']:
+ return # don't log empty lines
+
+ # Split the output into multiple lines so we get a
+ # well-formatted logfile
+ for line in content.split('\r\n'):
+ logging.info(line)
+
+ # This is the flush method for pexpect
+ def _log_do_nothing(self):
+ pass