| #! /usr/bin/python |
| |
| # Copyright 2014 Linaro Limited |
| # |
| # This program is free software; you can redistribute it and/or modify |
| # it under the terms of the GNU General Public License as published by |
| # the Free Software Foundation; either version 2 of the License, or |
| # (at your option) any later version. |
| # |
| # This program is distributed in the hope that it will be useful, |
| # but WITHOUT ANY WARRANTY; without even the implied warranty of |
| # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
| # GNU General Public License for more details. |
| # |
| # You should have received a copy of the GNU General Public License |
| # along with this program; if not, write to the Free Software |
| # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, |
| # MA 02110-1301, USA. |
| |
| import logging |
| import pexpect |
| import sys |
| import time |
| import re |
| from common import SwitchDriver |
| |
| class CiscoCatalyst(SwitchDriver): |
| |
| connection = None |
| prompt_name = '' |
| ports = [] |
| systemdata = [] |
| serial_number = '' |
| # Regexp of expected hardware information - fail if we don't see |
| # this |
| expected_descr_re = re.compile('WS-C\S+-\d+P') |
| |
| allowed_port_modes = [ "trunk", "general" ] |
| |
| logfile = sys.stderr |
| logfile = None |
| |
| def __init__(self, switch_hostname, switch_telnetport=23): |
| self.exec_string = "/usr/bin/telnet %s %d" % (switch_hostname, switch_telnetport) |
| |
| ################################ |
| ### Switch-level API functions |
| ################################ |
| |
| # Connect to the switch and log in |
| def SwitchConnect(self, username, password, enablepassword): |
| logging.debug("Connecting to Switch with: %s" % self.exec_string) |
| self.connection = pexpect.spawn(self.exec_string, logfile = self.logfile) |
| self._login(username, password, enablepassword) |
| |
| # Try to avoid paged output |
| self.connection.setwinsize(132,1000) |
| |
| # And grab details about the switch. in case we need it |
| for line in self._get_systemdata(): |
| self.systemdata.append (line) |
| |
| # And also validate them - make sure we're driving a switch of |
| # the correct model! Also store the serial number |
| descr_regex = re.compile('^cisco\s+(\S+)') |
| sn_regex = re.compile('System serial number\s+:\s+(\S+)') |
| descr = "" |
| |
| print "Grabbed %d lines of systemdata" % len(self.systemdata) |
| |
| for line in self.systemdata: |
| match = descr_regex.match(line) |
| if match: |
| descr = match.group(1) |
| match = sn_regex.match(line) |
| if match: |
| self.serial_number = match.group(1) |
| |
| print "serial number is %s" % self.serial_number |
| print "system description is %s" % descr |
| |
| if not self.expected_descr_re.match(descr): |
| raise IOError("Switch %s not recognised by this driver: abort" % descr) |
| |
| # Now build a list of our ports, for later sanity checking |
| self.ports = self._get_port_names() |
| if len(self.ports) < 4: |
| raise IOError("Not enough ports detected - problem!") |
| |
| # Log out of the switch and drop the connection and all state |
| def SwitchDisconnect(self): |
| self._logout() |
| logging.debug("Closing connection: %s" % self.connection) |
| self.connection.close(True) |
| del(self) |
| |
| # Save the current running config into flash - we want config to |
| # remain across reboots |
| def SwitchSaveRunningConfig(self): |
| self._cli("copy running-config startup-config") |
| self.connection.expect("Y/N") |
| self._cli("y") |
| self.connection.expect("Copy succeeded") |
| |
| # List the names of all the ports on the switch |
| def SwitchGetPortNames(self): |
| return self.ports |
| |
| ################################ |
| ### VLAN API functions |
| ################################ |
| |
| # Create a VLAN with the specified tag |
| def VlanCreate(self, tag): |
| logging.debug("Creating VLAN %d" % tag) |
| self._configure() |
| self._cli("vlan %d" % tag) |
| self._end_configure() |
| |
| # Validate it happened |
| vlans = self.VlanGetList() |
| for vlan in vlans: |
| if vlan == tag: |
| return |
| raise IOError("Failed to create VLAN %d" % tag) |
| |
| # Destroy a VLAN with the specified tag |
| def VlanDestroy(self, tag): |
| logging.debug("Destroying VLAN %d" % tag) |
| self._configure() |
| self._cli("no vlan %d" % tag) |
| self._end_configure() |
| |
| # Validate it happened |
| vlans = self.VlanGetList() |
| for vlan in vlans: |
| if vlan == tag: |
| raise IOError("Failed to destroy VLAN %d" % tag) |
| |
| # Set the name of a VLAN |
| def VlanSetName(self, tag, name): |
| logging.debug("Setting name of VLAN %d to %s" % (tag, name)) |
| self._configure() |
| self._cli("vlan %d" % tag) |
| self._cli("name %s" % name) |
| self._end_configure() |
| |
| # Validate it happened |
| read_name = self.VlanGetName(tag) |
| if read_name != name: |
| raise IOError("Failed to set name for VLAN %d (name found is \"%s\", not \"%s\")" |
| % (tag, read_name, name)) |
| |
| # Get a list of the VLAN tags currently registered on the switch |
| def VlanGetList(self): |
| logging.debug("Grabbing list of VLANs") |
| vlans = [] |
| |
| regex = re.compile('^ *(\d+).*(active)') |
| |
| self._cli("show vlan brief") |
| for line in self._read_paged_output(): |
| match = regex.match(line) |
| if match: |
| vlans.append(int(match.group(1))) |
| return vlans |
| |
| # For a given VLAN tag, ask the switch what the associated name is |
| def VlanGetName(self, tag): |
| logging.debug("Grabbing the name of VLAN %d" % tag) |
| name = None |
| regex = re.compile('^ *\d+\s+(\S+).*(active)') |
| self._cli("show vlan id %d" % tag) |
| for line in self._read_paged_output(): |
| match = regex.match(line) |
| if match: |
| name = match.group(1) |
| name.strip() |
| return name |
| |
| |
| ################################ |
| ### Port API functions |
| ################################ |
| |
| # Set the mode of a port: general or trunk |
| def PortSetMode(self, port, mode): |
| logging.debug("Setting port %s to %s" % (port, mode)) |
| if not self._is_port_mode_valid(mode): |
| raise IndexError("Port mode %s is not allowed" % mode) |
| if not self._is_port_name_valid(port): |
| raise IndexError("Port name %s not recognised" % port) |
| # Catalyst uses a different name here :-( |
| if mode == "general": |
| mode = "access" |
| self._configure() |
| self._cli("interface %s" % port) |
| self._cli("switchport mode %s" % mode) |
| self._end_configure() |
| |
| # Validate it happened |
| read_mode = self.PortGetMode(port) |
| if read_mode != mode: |
| raise IOError("Failed to set mode for port %s" % port) |
| |
| # Get the mode of a port: general or trunk |
| def PortGetMode(self, port): |
| logging.debug("Getting mode of port %s" % port) |
| mode = '' |
| if not self._is_port_name_valid(port): |
| raise IndexError("Port name %s not recognised" % port) |
| regex = re.compile('Administrative Mode: \S+ (\S+)') |
| self._cli("show interfaces %s switchport" % port) |
| for line in self._read_paged_output(): |
| match = regex.match(line) |
| if match: |
| mode = match.group(1) |
| # Munge the names to match what we expect |
| if mode.lower() == "access": |
| mode = "general" |
| elif mode.lower() == "auto": |
| mode = "trunk" |
| return mode.lower() |
| |
| # Allow the default VLAN on a port |
| def PortAllowDefaultVlan(self, port): |
| if not self._is_port_name_valid(port): |
| raise IndexError("Port name %s not recognised" % port) |
| self._configure() |
| self._cli("interface %s" % port) |
| self._cli("no switchport forbidden default-vlan") |
| self._end_configure() |
| # Difficult to validate |
| |
| # Block the default VLAN on a port |
| def PortBlockDefaultVlan(self, port): |
| if not self._is_port_name_valid(port): |
| raise IndexError("Port name %s not recognised" % port) |
| self._configure() |
| self._cli("interface %s" % port) |
| self._cli("switchport forbidden default-vlan") |
| self._end_configure() |
| # Difficult to validate |
| |
| # Add a general port to a specified VLAN (tag) |
| def PortAddGeneraltoVlan(self, port, tag): |
| logging.debug("Adding general port %s to VLAN %d" % (port, tag)) |
| if not self._is_port_name_valid(port): |
| raise IndexError("Port name %s not recognised" % port) |
| if not (self.PortGetMode(port) == "general"): |
| raise IndexError("Port %s not in general mode" % port) |
| |
| # Special handling for VLAN 1 (default) |
| if tag == 1: |
| return self.PortAllowDefaultVlan(port) |
| else: |
| self._configure() |
| self._cli("interface %s" % port) |
| self._cli("switchport general pvid %d" % tag) |
| self._cli("switchport general allowed vlan add %d untagged" % tag) |
| self._end_configure() |
| |
| # Validate it happened |
| read_vlan = self.PortGetGeneralVlan(port) |
| if read_vlan != tag: |
| raise IOError("Failed to add general port %d to VLAN %d - got VLAN %d" |
| % (port, tag, read_vlan)) |
| |
| # Remove a general port from a specified VLAN (tag) |
| def PortRemoveGeneralFromVlan(self, port, tag): |
| logging.debug("Removing general port %s from VLAN %d" % (port, tag)) |
| if not self._is_port_name_valid(port): |
| raise IndexError("Port name %s not recognised" % port) |
| if not (self.PortGetMode(port) == "general"): |
| raise IndexError("Port %s not in general mode" % port) |
| |
| # Special handling for VLAN 1 (default) |
| if tag == 1: |
| return self.PortBlockDefaultVlan(port) |
| else: |
| self._configure() |
| self._cli("interface %s" % port) |
| self._cli("no switchport general pvid") |
| self._cli("switchport general allowed vlan remove %d" % tag) |
| self._end_configure() |
| |
| # Validate it happened |
| read_vlan = self.PortGetGeneralVlan(port) |
| if read_vlan == tag: |
| raise IOError("Failed to remove general port %d from VLAN %d" |
| % (port, tag)) |
| |
| # Add a trunk port to a specified VLAN (tag) |
| def PortAddTrunkToVlan(self, port, tag): |
| logging.debug("Adding trunk port %s to VLAN %d" % (port, tag)) |
| if not self._is_port_name_valid(port): |
| raise IndexError("Port name %s not recognised" % port) |
| if not (self.PortGetMode(port) == "trunk"): |
| raise IndexError("Port %s not in trunk mode" % port) |
| self._configure() |
| self._cli("interface %s" % port) |
| self._cli("switchport trunk allowed vlan add %d" % tag) |
| self._end_configure() |
| |
| # Validate it happened |
| read_vlans = self.PortGetTrunkVlanList(port) |
| for vlan in read_vlans: |
| if vlan == tag: |
| return |
| raise IOError("Failed to add trunk port %d to VLAN %d" % (port, tag)) |
| |
| # Remove a trunk port from a specified VLAN (tag) |
| def PortRemoveTrunkFromVlan(self, port, tag): |
| logging.debug("Removing trunk port %s from VLAN %d" % (port, tag)) |
| if not self._is_port_name_valid(port): |
| raise IndexError("Port name %s not recognised" % port) |
| if not (self.PortGetMode(port) == "trunk"): |
| raise IndexError("Port %s not in trunk mode" % port) |
| self._configure() |
| self._cli("interface %s" % port) |
| self._cli("switchport trunk allowed vlan remove %d" % tag) |
| self._end_configure() |
| |
| # Validate it happened |
| read_vlans = self.PortGetTrunkVlanList(port) |
| for vlan in read_vlans: |
| if vlan == tag: |
| raise IOError("Failed to remove trunk port %d from VLAN %d" % (port, tag)) |
| |
| # Get the configured VLAN tag for an general port (tag) |
| def PortGetGeneralVlan(self, port): |
| logging.debug("Getting VLAN for general port %s" % port) |
| vlan = 1 |
| if not self._is_port_name_valid(port): |
| raise IndexError("Port name %s not recognised" % port) |
| if not (self.PortGetMode(port) == "general"): |
| raise IndexError("Port %s not in general mode" % port) |
| regex = re.compile('(\d+)\s+\S+\s+Untagged\s+Static') |
| self._cli("show interfaces switchport %s" % port) |
| for line in self._read_paged_output(): |
| match = regex.match(line) |
| if match: |
| vlan = match.group(1) |
| return int(vlan) |
| |
| # Get the list of configured VLAN tags for a trunk port |
| def PortGetTrunkVlanList(self, port): |
| logging.debug("Getting VLANs for trunk port %s" % port) |
| vlans = [ ] |
| if not self._is_port_name_valid(port): |
| raise IndexError("Port name %s not recognised" % port) |
| if not (self.PortGetMode(port) == "trunk"): |
| raise IndexError("Port %s not in general mode" % port) |
| regex = re.compile('(\d+)\s+\S+\s+(Tagged|Untagged)\s+Static') |
| self._cli("show interfaces switchport %s" % port) |
| for line in self._read_paged_output(): |
| match = regex.match(line) |
| if match: |
| vlans.append (int(match.group(1))) |
| return vlans |
| |
| ################################ |
| ### Internal functions |
| ################################ |
| |
| def _login(self, username, password, enablepassword): |
| logging.debug("attempting login with username %s, password %s" % (username, password)) |
| self.connection.expect('User Access Verification') |
| if username is not None: |
| self.connection.expect("User Name:") |
| self._cli("%s" % username) |
| if password is not None: |
| self.connection.expect("Password:") |
| self._cli("%s" % password, False) |
| while True: |
| index = self.connection.expect(['User Name:', 'Password:', 'Bad passwords', 'authentication failed', r'(.*)(#|>)']) |
| if index != 4: # Any other means: failed to log in! |
| logging.error("Login failure: index %d\n" % index) |
| logging.error("Login failure: %s\n" % self.connection.match.before) |
| raise IOError |
| |
| # else |
| self.prompt_name = self.connection.match.group(1).strip() |
| if self.connection.match.group(2) == ">": |
| # Need to enter "enable" mode too |
| self._cli("enable") |
| if enablepassword is not None: |
| self.connection.expect("Password:") |
| self._cli("%s" % enablepassword, False) |
| index = self.connection.expect(['Password:', 'Bad passwords', 'authentication failed', r'(.*)(#|>)']) |
| if index != 3: # Any other means: failed to log in! |
| logging.error("Enable password failure: %s\n" % self.connection.match) |
| raise IOError |
| return 0 |
| |
| def _logout(self): |
| logging.debug("Logging out") |
| self._cli("exit", False) |
| |
| def _configure(self): |
| self._cli("configure terminal") |
| |
| def _end_configure(self): |
| self._cli("end") |
| |
| def _read_paged_output(self): |
| buf = [] |
| prompt = self.prompt_name + '#' |
| while True: |
| index = self.connection.expect([' -*More-*', prompt]) |
| if index == 0: # More: <space> |
| for line in self.connection.before.split('\r\n'): |
| line1 = re.sub('(\x08|\x0D)*', '', line.strip()) |
| buf.append(line1) |
| self._cli(' ', False) |
| elif index == 1: # Back to a prompt, says output is finished |
| break |
| |
| for line in self.connection.before.split('\r\n'): |
| line1 = re.sub('(\x08|\x0D)*', '', line.strip()) |
| buf.append(line1) |
| |
| return buf |
| |
| def _get_port_names(self): |
| logging.debug("Grabbing list of ports") |
| interfaces = [] |
| |
| # Use "Up" or "Down" to only identify lines in the output that |
| # match interfaces that exist |
| regex = re.compile('^\s*([a-zA-Z0-9_/]*).*(connect)(.*)') |
| regex1 = re.compile('.*Not Present.*') |
| |
| self._cli("show interfaces status") |
| for line in self._read_paged_output(): |
| match = regex.match(line) |
| if match: |
| interface = match.group(1) |
| junk = match.group(3) |
| match1 = regex1.match(junk) # Deliberately drop things |
| # marked as "Not Present" |
| if not match1: |
| interfaces.append(interface) |
| return interfaces |
| |
| def _is_port_name_valid(self, name): |
| logging.debug("Checking if supplied port name \"%s\" is valid" % name) |
| for port in self.ports: |
| if name == port: |
| return True |
| return False |
| |
| def _is_port_mode_valid(self, mode): |
| logging.debug("Checking if supplied port mode \"%s\" is valid" % mode) |
| for allowed in self.allowed_port_modes: |
| if allowed == mode: |
| return True |
| return False |
| |
| def _show_config(self): |
| logging.debug("Grabbing config") |
| self._cli("show running-config") |
| return self._read_paged_output() |
| |
| def _show_clock(self): |
| logging.debug("Grabbing time") |
| self._cli("show clock") |
| return self._read_paged_output() |
| |
| def _show_clock(self): |
| logging.debug("Grabbing ") |
| self._cli("show clock") |
| return self._read_paged_output() |
| |
| def _get_systemdata(self): |
| data = [] |
| |
| logging.debug("Grabbing system sw and hw versions") |
| self._cli("show version") |
| for line in self._read_paged_output(): |
| data.append(line) |
| |
| return data |
| |
| # Wrapper around connection.send - by default, expect() the same |
| # text we've sent, to remove it from the output from the |
| # switch. For the few cases where we don't need that, override |
| # this using echo=False. |
| # Horrible, but seems to work. |
| def _cli(self, text, echo=True): |
| self.connection.send(text + '\r') |
| if echo: |
| self.connection.expect(text) |
| |
| if __name__ == "__main__": |
| p = CiscoCatalyst('lngswitch02', 23) |
| p.SwitchConnect(None, 'lngvirtual', 'lngenable') |
| |
| print "VLANs are:" |
| buf = p.VlanGetList() |
| p._dump_list(buf) |
| |
| buf = p.VlanGetName(2) |
| print "VLAN 2 is named \"%s\"" % buf |
| |
| print "Create VLAN 3" |
| p.VlanCreate(3) |
| |
| buf = p.VlanGetName(3) |
| print "VLAN 3 is named \"%s\"" % buf |
| |
| print "Set name of VLAN 3 to test333" |
| p.VlanSetName(3, "test333") |
| |
| buf = p.VlanGetName(3) |
| print "VLAN 3 is named \"%s\"" % buf |
| |
| print "VLANs are:" |
| buf = p.VlanGetList() |
| p._dump_list(buf) |
| |
| print "Destroy VLAN 3" |
| p.VlanDestroy(3) |
| |
| print "VLANs are:" |
| buf = p.VlanGetList() |
| p._dump_list(buf) |
| |
| #print "List ports" |
| #buf = p.SwitchGetPortNames() |
| #p._dump_list(buf) |
| |
| #print "System data:" |
| #p._dump_list(p.systemdata) |
| |
| # print "Creating VLANs for testing:" |
| # for i in [ 2, 3, 4, 5, 20 ]: |
| # p.VlanCreate(i) |
| # p.VlanSetName(i, "test%d" % i) |
| # print " %d (test%d)" % (i, i) |
| |
| #print "And dump config\n" |
| #buf = p._show_config() |
| #print "%s" % buf |
| |
| #print "Destroying VLAN 2\n" |
| #p.VlanDestroy(2) |
| |
| #print "And dump config\n" |
| #buf = p._show_config() |
| #print "%s" % buf |
| |
| #print "Port names are:" |
| #buf = p.SwitchGetPortNames() |
| #p._dump_list(buf) |
| |
| #buf = p.VlanGetName(25) |
| #print "VLAN with tag 25 is called \"%s\"" % buf |
| |
| #p.VlanSetName(35, "foo") |
| #print "VLAN with tag 35 is called \"foo\"" |
| |
| buf = p.PortGetMode("Gi1/0/10") |
| print "Port Gi1/0/10 is in %s mode" % buf |
| |
| buf = p.PortGetMode("Gi1/0/11") |
| print "Port Gi1/0/11 is in %s mode" % buf |
| |
| # Test general stuff |
| # print "Set fa6 to general mode" |
| # p.PortSetMode("fa6", "general") |
| #print "Remove fa6 from VLAN 1 (default)" |
| #p.PortRemoveGeneralFromVlan("fa6", 1) |
| #print "Move fa6 to VLAN 2" |
| #p.PortAddGeneraltoVlan("fa6", 2) |
| # buf = p.PortGetGeneralVlan("fa6") |
| # print "Read from switch: fa6 is on VLAN %s" % buf |
| # print "Remove fa6 from VLAN 2" |
| # p.PortRemoveGeneralFromVlan("fa6", 2) |
| # print "And re-enable the default VLAN for fa6" |
| # p.PortAddGeneraltoVlan("fa6", 1) |
| #print "And move fa6 back to a trunk port" |
| #p.PortSetMode("fa6", "trunk") |
| #buf = p.PortGetMode("fa6") |
| #print "Port fa6 is in %s mode" % buf |
| |
| # Test trunk stuff |
| # print "Set gi2 to trunk mode" |
| # p.PortSetMode("gi2", "trunk") |
| # print "Add gi2 to VLAN 2" |
| # p.PortAddTrunkToVlan("gi2", 2) |
| # print "Add gi2 to VLAN 3" |
| # p.PortAddTrunkToVlan("gi2", 3) |
| # print "Add gi2 to VLAN 4" |
| # p.PortAddTrunkToVlan("gi2", 4) |
| # print "Read from switch: which VLANs is gi2 on?" |
| # buf = p.PortGetTrunkVlanList("gi2") |
| # p._dump_list(buf) |
| |
| # p.PortRemoveTrunkFromVlan("gi2", 3) |
| # p.PortRemoveTrunkFromVlan("gi2", 3) |
| # p.PortRemoveTrunkFromVlan("gi2", 4) |
| # print "Read from switch: which VLANs is gi2 on?" |
| # buf = p.PortGetTrunkVlanList("gi2") |
| # p._dump_list(buf) |
| |
| # print "Adding lots of ports to VLANs" |
| # p.PortAddTrunkToVlan("fa1", 2) |
| # p.PortAddTrunkToVlan("fa3", 2) |
| # p.PortAddTrunkToVlan("fa5", 2) |
| # p.PortAddTrunkToVlan("fa7", 2) |
| # p.PortAddTrunkToVlan("fa9", 2) |
| # p.PortAddTrunkToVlan("fa11", 2) |
| # p.PortAddTrunkToVlan("fa13", 2) |
| # p.PortAddTrunkToVlan("fa15", 2) |
| # p.PortAddTrunkToVlan("fa17", 2) |
| # p.PortAddTrunkToVlan("fa19", 2) |
| # p.PortAddTrunkToVlan("fa21", 2) |
| # p.PortAddTrunkToVlan("fa23", 2) |
| # p.PortAddTrunkToVlan("gi4", 2) |
| |
| # p.SwitchSaveRunningConfig() |
| |
| # p.SwitchDisconnect() |
| # p._show_config() |
| |