blob: 04dc9627d106ef49d1bac0835bf4af5edd61c108 [file] [log] [blame]
Steve McIntyred6759dd2014-08-12 18:10:00 +01001#! /usr/bin/python
2
3# Copyright 2014 Linaro Limited
4#
5# This program is free software; you can redistribute it and/or modify
6# it under the terms of the GNU General Public License as published by
7# the Free Software Foundation; either version 2 of the License, or
8# (at your option) any later version.
9#
10# This program is distributed in the hope that it will be useful,
11# but WITHOUT ANY WARRANTY; without even the implied warranty of
12# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13# GNU General Public License for more details.
14#
15# You should have received a copy of the GNU General Public License
16# along with this program; if not, write to the Free Software
17# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
18# MA 02110-1301, USA.
19
20import logging
21import pexpect
22import sys
23import time
24import re
25from common import SwitchDriver
26
class CiscoCatalyst(SwitchDriver):
    """Driver that manages a Cisco Catalyst switch over a telnet CLI session.

    The session is run through pexpect: commands are typed at the switch
    prompt and the textual output is parsed with regular expressions.
    Most "set" operations read the value back from the switch afterwards
    and raise IOError if the change did not take effect.
    """

    connection = None
    prompt_name = ''
    # Class-level defaults only: __init__ gives each instance its own
    # lists so that state is never shared between driver objects.
    ports = []
    systemdata = []
    serial_number = ''
    # Regexp of expected hardware information - fail if we don't see
    # this
    expected_descr_re = re.compile(r'WS-C\S+-\d+P')

    allowed_port_modes = ["trunk", "general"]

    # Passed to pexpect.spawn as its logfile. Set to sys.stderr to get
    # a transcript of the conversation with the switch when debugging.
    logfile = None

    def __init__(self, switch_hostname, switch_telnetport=23):
        """Remember how to reach the switch; no connection is made yet.

        switch_hostname: host name or address handed to telnet.
        switch_telnetport: TCP port handed to telnet (default 23).
        """
        self.exec_string = "/usr/bin/telnet %s %d" % (switch_hostname, switch_telnetport)
        # Per-instance state (see the note on the class attributes)
        self.ports = []
        self.systemdata = []

    ################################
    ### Switch-level API functions
    ################################

    def SwitchConnect(self, username, password, enablepassword):
        """Connect to the switch, log in and validate the hardware.

        Stores the "show version" output in self.systemdata, the serial
        number in self.serial_number and the port names in self.ports.

        Raises IOError if login fails, if the hardware description does
        not match expected_descr_re, or if fewer than 4 ports are found.
        """
        logging.debug("Connecting to Switch with: %s", self.exec_string)
        self.connection = pexpect.spawn(self.exec_string, logfile=self.logfile)
        self._login(username, password, enablepassword)

        # Try to avoid paged output
        self.connection.setwinsize(132, 1000)

        # And grab details about the switch, in case we need it.
        # Replace (rather than extend) any data from an earlier connect.
        self.systemdata = self._get_systemdata()
        self.serial_number = ''

        # And also validate them - make sure we're driving a switch of
        # the correct model! Also store the serial number
        descr_regex = re.compile(r'^cisco\s+(\S+)')
        sn_regex = re.compile(r'System serial number\s+:\s+(\S+)')
        descr = ""

        print("Grabbed %d lines of systemdata" % len(self.systemdata))

        for line in self.systemdata:
            match = descr_regex.match(line)
            if match:
                descr = match.group(1)
            match = sn_regex.match(line)
            if match:
                self.serial_number = match.group(1)

        print("serial number is %s" % self.serial_number)
        print("system description is %s" % descr)

        if not self.expected_descr_re.match(descr):
            raise IOError("Switch %s not recognised by this driver: abort" % descr)

        # Now build a list of our ports, for later sanity checking
        self.ports = self._get_port_names()
        if len(self.ports) < 4:
            raise IOError("Not enough ports detected - problem!")

    def SwitchDisconnect(self):
        """Log out of the switch and drop the connection and all state."""
        if self.connection is None:
            # Never connected (or already disconnected): nothing to do
            return
        self._logout()
        logging.debug("Closing connection: %s", self.connection)
        self.connection.close(True)
        # Actually forget the per-connection state; "del self" would
        # only remove the local name and leave the object untouched.
        self.connection = None
        self.prompt_name = ''
        self.ports = []
        self.systemdata = []
        self.serial_number = ''

    def SwitchSaveRunningConfig(self):
        """Save the running config into flash so it survives reboots.

        NOTE(review): this waits for a "Y/N" confirmation and then for
        "Copy succeeded" - confirm these prompts against the switch.
        """
        self._cli("copy running-config startup-config")
        self.connection.expect("Y/N")
        self._cli("y")
        self.connection.expect("Copy succeeded")

    def SwitchGetPortNames(self):
        """Return the list of port names collected by SwitchConnect."""
        return self.ports

    ################################
    ### VLAN API functions
    ################################

    def VlanCreate(self, tag):
        """Create a VLAN with the specified tag.

        Raises IOError if the VLAN is not listed by the switch afterwards.
        """
        logging.debug("Creating VLAN %d", tag)
        self._configure()
        self._cli("vlan %d" % tag)
        self._end_configure()

        # Validate it happened
        if tag not in self.VlanGetList():
            raise IOError("Failed to create VLAN %d" % tag)

    def VlanDestroy(self, tag):
        """Destroy the VLAN with the specified tag.

        Raises IOError if the VLAN is still listed by the switch afterwards.
        """
        logging.debug("Destroying VLAN %d", tag)
        self._configure()
        self._cli("no vlan %d" % tag)
        self._end_configure()

        # Validate it happened
        if tag in self.VlanGetList():
            raise IOError("Failed to destroy VLAN %d" % tag)

    def VlanSetName(self, tag, name):
        """Set the name of a VLAN.

        Raises IOError if the name read back from the switch differs.
        """
        logging.debug("Setting name of VLAN %d to %s", tag, name)
        self._configure()
        self._cli("vlan %d" % tag)
        self._cli("name %s" % name)
        self._end_configure()

        # Validate it happened
        read_name = self.VlanGetName(tag)
        if read_name != name:
            raise IOError("Failed to set name for VLAN %d (name found is \"%s\", not \"%s\")"
                          % (tag, read_name, name))

    def VlanGetList(self):
        """Return the VLAN tags (as ints) listed as active by the switch."""
        logging.debug("Grabbing list of VLANs")
        regex = re.compile(r'^ *(\d+).*(active)')

        self._cli("show vlan brief")
        vlans = []
        for line in self._read_paged_output():
            match = regex.match(line)
            if match:
                vlans.append(int(match.group(1)))
        return vlans

    def VlanGetName(self, tag):
        """Return the name the switch reports for a VLAN tag.

        Returns None if no matching line is found in the switch output.
        """
        logging.debug("Grabbing the name of VLAN %d", tag)
        name = None
        regex = re.compile(r'^ *\d+\s+(\S+).*(active)')
        self._cli("show vlan id %d" % tag)
        for line in self._read_paged_output():
            match = regex.match(line)
            if match:
                # (\S+) can not capture whitespace, so no strip() needed
                name = match.group(1)
        return name

    ################################
    ### Port API functions
    ################################

    def PortSetMode(self, port, mode):
        """Set the mode of a port: general or trunk.

        Raises IndexError for an unknown port or mode, and IOError if
        the mode read back from the switch differs.
        """
        logging.debug("Setting port %s to %s", port, mode)
        if not self._is_port_mode_valid(mode):
            raise IndexError("Port mode %s is not allowed" % mode)
        if not self._is_port_name_valid(port):
            raise IndexError("Port name %s not recognised" % port)
        self._configure()
        self._cli("interface %s" % port)
        self._cli("switchport mode %s" % mode)
        self._end_configure()

        # Validate it happened
        read_mode = self.PortGetMode(port)
        if read_mode != mode:
            raise IOError("Failed to set mode for port %s" % port)

    def PortGetMode(self, port):
        """Return the lower-cased mode reported for a port.

        Returns '' if no "Port Mode:" line is found. Raises IndexError
        for an unknown port name.
        """
        logging.debug("Getting mode of port %s", port)
        mode = ''
        if not self._is_port_name_valid(port):
            raise IndexError("Port name %s not recognised" % port)
        regex = re.compile(r'Port Mode: (\S+)')
        self._cli("show interfaces switchport %s" % port)
        for line in self._read_paged_output():
            match = regex.match(line)
            if match:
                mode = match.group(1)
        return mode.lower()

    def PortAllowDefaultVlan(self, port):
        """Allow the default VLAN on a port (not validated afterwards).

        Raises IndexError for an unknown port name.
        """
        if not self._is_port_name_valid(port):
            raise IndexError("Port name %s not recognised" % port)
        self._configure()
        self._cli("interface %s" % port)
        self._cli("no switchport forbidden default-vlan")
        self._end_configure()
        # Difficult to validate

    def PortBlockDefaultVlan(self, port):
        """Block the default VLAN on a port (not validated afterwards).

        Raises IndexError for an unknown port name.
        """
        if not self._is_port_name_valid(port):
            raise IndexError("Port name %s not recognised" % port)
        self._configure()
        self._cli("interface %s" % port)
        self._cli("switchport forbidden default-vlan")
        self._end_configure()
        # Difficult to validate

    def PortAddGeneraltoVlan(self, port, tag):
        """Add a general port to the specified VLAN (tag).

        Raises IndexError if the port is unknown or not in general
        mode, and IOError if the VLAN read back differs from tag.
        """
        logging.debug("Adding general port %s to VLAN %d", port, tag)
        if not self._is_port_name_valid(port):
            raise IndexError("Port name %s not recognised" % port)
        if self.PortGetMode(port) != "general":
            raise IndexError("Port %s not in general mode" % port)

        # Special handling for VLAN 1 (default)
        if tag == 1:
            return self.PortAllowDefaultVlan(port)

        self._configure()
        self._cli("interface %s" % port)
        self._cli("switchport general pvid %d" % tag)
        self._cli("switchport general allowed vlan add %d untagged" % tag)
        self._end_configure()

        # Validate it happened
        read_vlan = self.PortGetGeneralVlan(port)
        if read_vlan != tag:
            # The port is a name (string), so it must be formatted with %s
            raise IOError("Failed to add general port %s to VLAN %d - got VLAN %d"
                          % (port, tag, read_vlan))

    def PortRemoveGeneralFromVlan(self, port, tag):
        """Remove a general port from the specified VLAN (tag).

        Raises IndexError if the port is unknown or not in general
        mode, and IOError if the port still reports that VLAN.
        """
        logging.debug("Removing general port %s from VLAN %d", port, tag)
        if not self._is_port_name_valid(port):
            raise IndexError("Port name %s not recognised" % port)
        if self.PortGetMode(port) != "general":
            raise IndexError("Port %s not in general mode" % port)

        # Special handling for VLAN 1 (default)
        if tag == 1:
            return self.PortBlockDefaultVlan(port)

        self._configure()
        self._cli("interface %s" % port)
        self._cli("no switchport general pvid")
        self._cli("switchport general allowed vlan remove %d" % tag)
        self._end_configure()

        # Validate it happened
        read_vlan = self.PortGetGeneralVlan(port)
        if read_vlan == tag:
            raise IOError("Failed to remove general port %s from VLAN %d"
                          % (port, tag))

    def PortAddTrunkToVlan(self, port, tag):
        """Add a trunk port to the specified VLAN (tag).

        Raises IndexError if the port is unknown or not in trunk mode,
        and IOError if the VLAN is not listed for the port afterwards.
        """
        logging.debug("Adding trunk port %s to VLAN %d", port, tag)
        if not self._is_port_name_valid(port):
            raise IndexError("Port name %s not recognised" % port)
        if self.PortGetMode(port) != "trunk":
            raise IndexError("Port %s not in trunk mode" % port)
        self._configure()
        self._cli("interface %s" % port)
        self._cli("switchport trunk allowed vlan add %d" % tag)
        self._end_configure()

        # Validate it happened
        if tag not in self.PortGetTrunkVlanList(port):
            raise IOError("Failed to add trunk port %s to VLAN %d" % (port, tag))

    def PortRemoveTrunkFromVlan(self, port, tag):
        """Remove a trunk port from the specified VLAN (tag).

        Raises IndexError if the port is unknown or not in trunk mode,
        and IOError if the VLAN is still listed for the port afterwards.
        """
        logging.debug("Removing trunk port %s from VLAN %d", port, tag)
        if not self._is_port_name_valid(port):
            raise IndexError("Port name %s not recognised" % port)
        if self.PortGetMode(port) != "trunk":
            raise IndexError("Port %s not in trunk mode" % port)
        self._configure()
        self._cli("interface %s" % port)
        self._cli("switchport trunk allowed vlan remove %d" % tag)
        self._end_configure()

        # Validate it happened
        if tag in self.PortGetTrunkVlanList(port):
            raise IOError("Failed to remove trunk port %s from VLAN %d" % (port, tag))

    def PortGetGeneralVlan(self, port):
        """Return the configured VLAN tag (int) for a general port.

        Falls back to 1 if no untagged static VLAN line is found.
        Raises IndexError if the port is unknown or not in general mode.
        """
        logging.debug("Getting VLAN for general port %s", port)
        vlan = 1
        if not self._is_port_name_valid(port):
            raise IndexError("Port name %s not recognised" % port)
        if self.PortGetMode(port) != "general":
            raise IndexError("Port %s not in general mode" % port)
        regex = re.compile(r'(\d+)\s+\S+\s+Untagged\s+Static')
        self._cli("show interfaces switchport %s" % port)
        for line in self._read_paged_output():
            match = regex.match(line)
            if match:
                vlan = match.group(1)
        return int(vlan)

    def PortGetTrunkVlanList(self, port):
        """Return the list of configured VLAN tags (ints) for a trunk port.

        Raises IndexError if the port is unknown or not in trunk mode.
        """
        logging.debug("Getting VLANs for trunk port %s", port)
        if not self._is_port_name_valid(port):
            raise IndexError("Port name %s not recognised" % port)
        if self.PortGetMode(port) != "trunk":
            raise IndexError("Port %s not in trunk mode" % port)
        regex = re.compile(r'(\d+)\s+\S+\s+(Tagged|Untagged)\s+Static')
        self._cli("show interfaces switchport %s" % port)
        vlans = []
        for line in self._read_paged_output():
            match = regex.match(line)
            if match:
                vlans.append(int(match.group(1)))
        return vlans

    ################################
    ### Internal functions
    ################################

    def _login(self, username, password, enablepassword):
        """Log in on self.connection, entering "enable" mode if needed.

        Any of the three credentials may be None, in which case the
        matching prompt is not waited for. Stores the text seen in front
        of the prompt character in self.prompt_name. Returns 0 on
        success and raises IOError on failure.
        """
        # Deliberately do not write the password into the log
        logging.debug("attempting login with username %s", username)
        self.connection.expect('User Access Verification')
        if username is not None:
            self.connection.expect("User Name:")
            self._cli("%s" % username)
        if password is not None:
            self.connection.expect("Password:")
            self._cli("%s" % password, False)

        index = self.connection.expect(['User Name:', 'Password:', 'Bad passwords',
                                        'authentication failed', r'(.*)(#|>)'])
        if index != 4:  # Any other means: failed to log in!
            logging.error("Login failure: index %d", index)
            # The text before the match lives on the spawn object
            logging.error("Login failure: %s", self.connection.before)
            raise IOError("Login to switch failed")

        self.prompt_name = self.connection.match.group(1).strip()
        if self.connection.match.group(2) == ">":
            # Need to enter "enable" mode too
            self._cli("enable")
            if enablepassword is not None:
                self.connection.expect("Password:")
                self._cli("%s" % enablepassword, False)
            index = self.connection.expect(['Password:', 'Bad passwords',
                                            'authentication failed', r'(.*)(#|>)'])
            if index != 3:  # Any other means: failed to log in!
                logging.error("Enable password failure: %s", self.connection.before)
                raise IOError("Failed to enter enable mode on switch")
        return 0

    def _logout(self):
        """Send "exit" to the switch without waiting for the echo."""
        logging.debug("Logging out")
        self._cli("exit", False)

    def _configure(self):
        """Enter configuration mode."""
        self._cli("configure terminal")

    def _end_configure(self):
        """Leave configuration mode."""
        self._cli("end")

    @staticmethod
    def _clean_lines(text):
        """Split raw switch output into stripped lines without BS/CR bytes."""
        return [re.sub('(\x08|\x0D)*', '', line.strip())
                for line in text.split('\r\n')]

    def _read_paged_output(self):
        """Collect command output up to the next prompt as a list of lines.

        Whenever the switch shows a "More" pager marker, a space is sent
        to request the next page.
        """
        buf = []
        # The prompt is literal text, so escape it for use as a regex
        prompt = re.escape(self.prompt_name + '#')
        while True:
            index = self.connection.expect([' -*More-*', prompt])
            buf.extend(self._clean_lines(self.connection.before))
            if index == 1:  # Back to a prompt, says output is finished
                break
            # More: <space>
            self._cli(' ', False)
        return buf

    def _get_port_names(self):
        """Return the interface names listed by "show interfaces status".

        Interfaces flagged as "Not Present" are left out.
        """
        logging.debug("Grabbing list of ports")
        interfaces = []

        # Use "connect" (which also matches "notconnect") to only
        # identify lines in the output that describe interfaces
        regex = re.compile(r'^\s*([a-zA-Z0-9_/]*).*(connect)(.*)')
        regex1 = re.compile('.*Not Present.*')

        self._cli("show interfaces status")
        for line in self._read_paged_output():
            match = regex.match(line)
            if match:
                interface = match.group(1)
                junk = match.group(3)
                # Deliberately drop things marked as "Not Present"
                if not regex1.match(junk):
                    interfaces.append(interface)
        return interfaces

    def _is_port_name_valid(self, name):
        """Return True if name is one of the ports found at connect time."""
        logging.debug("Checking if supplied port name \"%s\" is valid", name)
        return name in self.ports

    def _is_port_mode_valid(self, mode):
        """Return True if mode is listed in allowed_port_modes."""
        logging.debug("Checking if supplied port mode \"%s\" is valid", mode)
        return mode in self.allowed_port_modes

    def _show_config(self):
        """Return the output of "show running-config" as a list of lines."""
        logging.debug("Grabbing config")
        self._cli("show running-config")
        return self._read_paged_output()

    def _show_clock(self):
        """Return the output of "show clock" as a list of lines."""
        logging.debug("Grabbing time")
        self._cli("show clock")
        return self._read_paged_output()

    def _get_systemdata(self):
        """Return the output of "show version" as a list of lines."""
        logging.debug("Grabbing system sw and hw versions")
        self._cli("show version")
        return list(self._read_paged_output())

    # Wrapper around connection.send - by default, expect() the same
    # text we've sent, to remove it from the output from the
    # switch. For the few cases where we don't need that, override
    # this using echo=False.
    # Horrible, but seems to work.
    def _cli(self, text, echo=True):
        """Send text plus a carriage return; optionally consume its echo."""
        self.connection.send(text + '\r')
        if echo:
            # Match the echo literally: the command is not a regex
            self.connection.expect_exact(text)
487
if __name__ == "__main__":
    # Manual smoke test against a live switch: connect, list the VLANs,
    # create VLAN 3, rename it, then destroy it again, printing the state
    # seen on the switch at each step.
    # NOTE(review): _dump_list is not defined in this file - presumably
    # inherited from SwitchDriver; verify.
    p = CiscoCatalyst('lngswitch02', 23)
    p.SwitchConnect(None, 'lngvirtual', 'lngenable')

    print("VLANs are:")
    p._dump_list(p.VlanGetList())

    print("VLAN 2 is named \"%s\"" % p.VlanGetName(2))

    print("Create VLAN 3")
    p.VlanCreate(3)

    print("VLAN 3 is named \"%s\"" % p.VlanGetName(3))

    print("Set name of VLAN 3 to test333")
    p.VlanSetName(3, "test333")

    print("VLAN 3 is named \"%s\"" % p.VlanGetName(3))

    print("VLANs are:")
    p._dump_list(p.VlanGetList())

    print("Destroy VLAN 3")
    p.VlanDestroy(3)

    print("VLANs are:")
    p._dump_list(p.VlanGetList())

521
522 #print "List ports"
523 #buf = p.SwitchGetPortNames()
524 #p._dump_list(buf)
525
526 #print "System data:"
527 #p._dump_list(p.systemdata)
528
529# print "Creating VLANs for testing:"
530# for i in [ 2, 3, 4, 5, 20 ]:
531# p.VlanCreate(i)
532# p.VlanSetName(i, "test%d" % i)
533# print " %d (test%d)" % (i, i)
534
535 #print "And dump config\n"
536 #buf = p._show_config()
537 #print "%s" % buf
538
539 #print "Destroying VLAN 2\n"
540 #p.VlanDestroy(2)
541
542 #print "And dump config\n"
543 #buf = p._show_config()
544 #print "%s" % buf
545
546 #print "Port names are:"
547 #buf = p.SwitchGetPortNames()
548 #p._dump_list(buf)
549
550 #buf = p.VlanGetName(25)
551 #print "VLAN with tag 25 is called \"%s\"" % buf
552
553 #p.VlanSetName(35, "foo")
554 #print "VLAN with tag 35 is called \"foo\""
555
556 #buf = p.PortGetMode("fa12")
557 #print "Port fa12 is in %s mode" % buf
558
559 # Test general stuff
560# print "Set fa6 to general mode"
561# p.PortSetMode("fa6", "general")
562 #print "Remove fa6 from VLAN 1 (default)"
563 #p.PortRemoveGeneralFromVlan("fa6", 1)
564 #print "Move fa6 to VLAN 2"
565 #p.PortAddGeneraltoVlan("fa6", 2)
566# buf = p.PortGetGeneralVlan("fa6")
567# print "Read from switch: fa6 is on VLAN %s" % buf
568# print "Remove fa6 from VLAN 2"
569# p.PortRemoveGeneralFromVlan("fa6", 2)
570# print "And re-enable the default VLAN for fa6"
571# p.PortAddGeneraltoVlan("fa6", 1)
572 #print "And move fa6 back to a trunk port"
573 #p.PortSetMode("fa6", "trunk")
574 #buf = p.PortGetMode("fa6")
575 #print "Port fa6 is in %s mode" % buf
576
577 # Test trunk stuff
578# print "Set gi2 to trunk mode"
579# p.PortSetMode("gi2", "trunk")
580# print "Add gi2 to VLAN 2"
581# p.PortAddTrunkToVlan("gi2", 2)
582# print "Add gi2 to VLAN 3"
583# p.PortAddTrunkToVlan("gi2", 3)
584# print "Add gi2 to VLAN 4"
585# p.PortAddTrunkToVlan("gi2", 4)
586# print "Read from switch: which VLANs is gi2 on?"
587# buf = p.PortGetTrunkVlanList("gi2")
588# p._dump_list(buf)
589
590# p.PortRemoveTrunkFromVlan("gi2", 3)
591# p.PortRemoveTrunkFromVlan("gi2", 3)
592# p.PortRemoveTrunkFromVlan("gi2", 4)
593# print "Read from switch: which VLANs is gi2 on?"
594# buf = p.PortGetTrunkVlanList("gi2")
595# p._dump_list(buf)
596
597 # print "Adding lots of ports to VLANs"
598 # p.PortAddTrunkToVlan("fa1", 2)
599 # p.PortAddTrunkToVlan("fa3", 2)
600 # p.PortAddTrunkToVlan("fa5", 2)
601 # p.PortAddTrunkToVlan("fa7", 2)
602 # p.PortAddTrunkToVlan("fa9", 2)
603 # p.PortAddTrunkToVlan("fa11", 2)
604 # p.PortAddTrunkToVlan("fa13", 2)
605 # p.PortAddTrunkToVlan("fa15", 2)
606 # p.PortAddTrunkToVlan("fa17", 2)
607 # p.PortAddTrunkToVlan("fa19", 2)
608 # p.PortAddTrunkToVlan("fa21", 2)
609 # p.PortAddTrunkToVlan("fa23", 2)
610 # p.PortAddTrunkToVlan("gi4", 2)
611
612# p.SwitchSaveRunningConfig()
613
614# p.SwitchDisconnect()
615# p._show_config()
616