blob: a3220ca5afd6e76b31dcd781472ebb815bb1b8ce [file] [log] [blame]
Steve McIntyre448c1d02015-04-29 18:21:25 +01001#! /usr/bin/python
2
3# Copyright 2014 Linaro Limited
4#
5# This program is free software; you can redistribute it and/or modify
6# it under the terms of the GNU General Public License as published by
7# the Free Software Foundation; either version 2 of the License, or
8# (at your option) any later version.
9#
10# This program is distributed in the hope that it will be useful,
11# but WITHOUT ANY WARRANTY; without even the implied warranty of
12# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13# GNU General Public License for more details.
14#
15# You should have received a copy of the GNU General Public License
16# along with this program; if not, write to the Free Software
17# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
18# MA 02110-1301, USA.
19
20import logging
21import pexpect
22import sys
23import re
24
if __name__ == '__main__':
    # When run directly as a script, put this file's directory and its
    # parent at the front of sys.path (presumably so that the project
    # imports that follow can be resolved - confirm against the tree
    # layout).
    import os
    script_dir = os.path.dirname(sys.argv[0])
    vlandpath = os.path.abspath(os.path.normpath(script_dir))
    sys.path.insert(0, vlandpath)
    sys.path.insert(0, "%s/.." % vlandpath)
30
31from errors import InputError, PExpectError
32from drivers.common import SwitchDriver, SwitchErrors
33
class TPLinkTLSG2XXX(SwitchDriver):
    # Driver for TP-Link TL-SG2xxx switches, driven through a telnet
    # CLI session that is managed with pexpect.
    #
    # Many methods recover from a communication failure by
    # reconnecting to the switch and calling themselves again; there
    # is no limit on the number of such retries.

    connection = None
    _username = None
    _password = None
    _enable_password = None

    # No optional capabilities are advertised by this driver
    _capabilities = [
    ]

    # Regexp of expected hardware information - fail if we don't see
    # this
    _expected_descr_re = re.compile(r'TL-SG2\d\d\d')

    # Failures that mean "the conversation with the switch broke":
    # these (and only these) trigger a reconnect-and-retry in the
    # methods that used to retry on *any* exception.
    _comms_errors = (PExpectError, pexpect.EOF, pexpect.TIMEOUT)

    logfile = None

    # Set up the driver; no connection is made until switch_connect().
    #
    # switch_hostname:   host name or address handed to telnet
    # switch_telnetport: port handed to telnet (default 23)
    # debug:             if true, pexpect logs the session to stderr
    def __init__(self, switch_hostname, switch_telnetport=23, debug=False):
        SwitchDriver.__init__(self)
        self._systemdata = []
        if debug:
            self.logfile = sys.stderr
        self.exec_string = "/usr/bin/telnet %s %d" % (switch_hostname, switch_telnetport)
        self.errors = SwitchErrors()

    ################################
    ### Switch-level API functions
    ################################

    # Connect to the switch and log in. The credentials are stored on
    # the object so that later automatic reconnects can re-use them.
    def switch_connect(self, username, password, enablepassword):
        self._username = username
        self._password = password
        self._enable_password = enablepassword
        self._switch_connect()

    # Log out of the switch and drop the connection and all state
    def switch_disconnect(self):
        self._logout()
        logging.debug("Closing connection: %s", self.connection)
        self.connection.close(True)
        self._ports = []
        self._prompt_name = ''
        self._systemdata = []

    # Save the current running config into flash - we want config to
    # remain across reboots
    def switch_save_running_config(self):
        try:
            self._cli("copy running-config startup-config")
            self.connection.expect("OK")
        except self._comms_errors:
            # recurse on communication error only; anything else
            # (including Ctrl-C) must propagate instead of looping
            self._switch_connect()
            self.switch_save_running_config()

    # Restart the switch - we need to reload config to do a
    # roll-back. Do NOT save running-config first if the switch asks -
    # we're trying to dump recent changes, not save them.
    #
    # This will also implicitly cause a connection to be closed
    def switch_restart(self):
        self._cli("reboot")
        # NOTE(review): 'Daving current' looks like a typo for
        # 'Saving current' - confirm against the switch's real output
        # before changing the pattern. Also note that the trailing '?'
        # in 'Continue?' is a regexp quantifier, so the pattern
        # matches "Continu" or "Continue".
        index = self.connection.expect(['Daving current', 'Continue?'])
        if index == 0:
            self._cli("n") # No, don't save
            self.connection.expect("Continue?")

        # Fall through
        self._cli("y") # Yes, continue to reset
        self.connection.close(True)

    # List the capabilities of the switch (and driver) - some things
    # make no sense to abstract. Returns a list of strings, each one
    # describing an extra feature that higher levels may care about
    def switch_get_capabilities(self):
        return self._capabilities

    ################################
    ### VLAN API functions
    ################################

    # Create a VLAN with the specified tag. Raises IOError if the
    # VLAN is not listed by the switch afterwards.
    def vlan_create(self, tag):
        logging.debug("Creating VLAN %d" % tag)

        try:
            self._configure()
            self._cli("vlan %d" % tag)
            self._end_configure()

            # Validate it happened
            if tag in self.vlan_get_list():
                return
            raise IOError("Failed to create VLAN %d" % tag)

        except PExpectError:
            # recurse on error
            self._switch_connect()
            self.vlan_create(tag)

    # Destroy a VLAN with the specified tag. Raises IOError if the
    # VLAN is still listed by the switch afterwards.
    def vlan_destroy(self, tag):
        logging.debug("Destroying VLAN %d" % tag)

        try:
            self._configure()
            self._cli("no vlan %d" % tag)
            self._end_configure()

            # Validate it happened
            if tag in self.vlan_get_list():
                raise IOError("Failed to destroy VLAN %d" % tag)

        except PExpectError:
            # recurse on error
            self._switch_connect()
            self.vlan_destroy(tag)

    # Set the name of a VLAN. Raises IOError if the name read back
    # from the switch differs from the one requested.
    def vlan_set_name(self, tag, name):
        logging.debug("Setting name of VLAN %d to %s" % (tag, name))

        try:
            self._configure()
            self._cli("vlan %d" % tag)
            self._cli("name %s" % name)
            self._end_configure()

            # Validate it happened
            read_name = self.vlan_get_name(tag)
            if read_name != name:
                raise IOError("Failed to set name for VLAN %d (name found is \"%s\", not \"%s\")"
                              % (tag, read_name, name))

        except self._comms_errors:
            # recurse on communication error only - the validation
            # IOError above must reach the caller rather than trigger
            # an endless reconnect/retry loop
            self._switch_connect()
            self.vlan_set_name(tag, name)

    # Get a list of the VLAN tags (as ints) currently registered on
    # the switch
    def vlan_get_list(self):
        logging.debug("Grabbing list of VLANs")

        try:
            vlans = []
            regex = re.compile(r'^ *(\d+).*active')

            self._cli("show vlan brief")
            for line in self._read_long_output():
                match = regex.match(line)
                if match:
                    vlans.append(int(match.group(1)))
            return vlans

        except PExpectError:
            # recurse on error
            self._switch_connect()
            return self.vlan_get_list()

    # For a given VLAN tag, ask the switch what the associated name
    # is. Returns None if no matching line is found in the output.
    def vlan_get_name(self, tag):
        logging.debug("Grabbing the name of VLAN %d" % tag)

        try:
            name = None
            regex = re.compile(r'^ *\d+\s+(\S+).*(active)')
            self._cli("show vlan id %d" % tag)
            for line in self._read_long_output():
                match = regex.match(line)
                if match:
                    # keep the stripped value (the result of strip()
                    # used to be thrown away)
                    name = match.group(1).strip()
            return name

        except PExpectError:
            # recurse on error
            self._switch_connect()
            return self.vlan_get_name(tag)

    ################################
    ### Port API functions
    ################################

    # Set the mode of a port: access or trunk. Raises IndexError for
    # an invalid mode or an unknown port name.
    def port_set_mode(self, port, mode):
        logging.debug("Setting port %s to %s" % (port, mode))
        if not self._is_port_mode_valid(mode):
            raise IndexError("Port mode %s is not allowed" % mode)
        if not self._is_port_name_valid(port):
            raise IndexError("Port name %s not recognised" % port)
        # This switch does not support specific modes, so we can't
        # actually change the mode directly. However, we can and
        # should deal with the PVID and memberships of existing VLANs

        try:
            if mode == "trunk":
                # A trunk port is put on VLAN 1 in tagged mode, with
                # PVID set to the default VLAN (1).

                # Disconnect all the untagged VLANs
                for vlan in self._port_get_all_vlans(port, 'Untagged'):
                    self._port_remove_general_vlan(port, vlan)

                # And move to VLAN 1
                self.port_add_trunk_to_vlan(port, 1)
                self._set_pvid(port, 1)

            elif mode == "access":
                # An access port should only be on one VLAN. Move to
                # VLAN 1, untagged, and set PVID there.

                # Disconnect all the VLANs, untagged then tagged
                for vlan in self._port_get_all_vlans(port, 'Untagged'):
                    self._port_remove_general_vlan(port, vlan)
                for vlan in self._port_get_all_vlans(port, 'Tagged'):
                    self._port_remove_general_vlan(port, vlan)

                # And move to VLAN 1
                self.port_set_access_vlan(port, 1)
                self._set_pvid(port, 1)

        except self._comms_errors:
            # recurse on communication error only - validation
            # failures raised by the helpers above must propagate
            self._switch_connect()
            self.port_set_mode(port, mode)

    # Get the mode of a port: access or trunk
    def port_get_mode(self, port):
        logging.debug("Getting mode of port %s" % port)
        if not self._is_port_name_valid(port):
            raise IndexError("Port name %s not recognised" % port)

        # This switch does not support specific modes, so we have to
        # make stuff up here. We define trunk ports to be on (1 or
        # many) tagged VLANs, anything not tagged to be access.
        if self._port_get_all_vlans(port, 'Tagged'):
            return "trunk"
        return "access"

    # Set an access port to be in a specified VLAN (tag). Raises
    # IndexError for an unknown port or VLAN, IOError if the switch
    # reports a different access VLAN afterwards.
    def port_set_access_vlan(self, port, tag):
        logging.debug("Setting access port %s to VLAN %d" % (port, tag))
        if not self._is_port_name_valid(port):
            raise IndexError("Port name %s not recognised" % port)
        # Does the VLAN already exist?
        if tag not in self.vlan_get_list():
            raise IndexError("VLAN tag %d not recognised" % tag)

        try:
            # Add the new VLAN
            self._configure()
            self._cli("interface %s" % self._long_port_name(port))
            self._cli("switchport general allowed vlan %d untagged" % tag)
            self._cli("no shutdown")
            self._end_configure()

            self._set_pvid(port, tag)

            # Now drop all the other VLANs
            for vlan in self._port_get_all_vlans(port, 'Untagged'):
                if vlan != tag:
                    self._port_remove_general_vlan(port, vlan)

            # Finally, validate things worked
            read_vlan = int(self.port_get_access_vlan(port))
            if read_vlan != tag:
                raise IOError("Failed to move access port %s to VLAN %d - got VLAN %d instead"
                              % (port, tag, read_vlan))

        except PExpectError:
            # recurse on error
            self._switch_connect()
            self.port_set_access_vlan(port, tag)

    # Add a trunk port to a specified VLAN (tag). Raises IndexError
    # for an unknown port, IOError if the VLAN is not among the port's
    # tagged VLANs afterwards.
    def port_add_trunk_to_vlan(self, port, tag):
        logging.debug("Adding trunk port %s to VLAN %d" % (port, tag))
        if not self._is_port_name_valid(port):
            raise IndexError("Port name %s not recognised" % port)
        try:
            self._configure()
            self._cli("interface %s" % self._long_port_name(port))
            self._cli("switchport general allowed vlan %d tagged" % tag)
            self._end_configure()

            # Validate it happened
            read_vlans = self.port_get_trunk_vlan_list(port)
            if tag in read_vlans or "ALL" in read_vlans:
                return
            raise IOError("Failed to add trunk port %s to VLAN %d" % (port, tag))

        except PExpectError:
            # recurse on error
            self._switch_connect()
            self.port_add_trunk_to_vlan(port, tag)

    # Remove a trunk port from a specified VLAN (tag). Raises
    # IndexError for an unknown port, IOError if the VLAN is still
    # among the port's tagged VLANs afterwards.
    def port_remove_trunk_from_vlan(self, port, tag):
        logging.debug("Removing trunk port %s from VLAN %d" % (port, tag))
        if not self._is_port_name_valid(port):
            raise IndexError("Port name %s not recognised" % port)

        try:
            self._configure()
            self._cli("interface %s" % self._long_port_name(port))
            self._cli("no switchport general allowed vlan %d" % tag)
            self._end_configure()

            # Validate it happened
            if tag in self.port_get_trunk_vlan_list(port):
                raise IOError("Failed to remove trunk port %s from VLAN %d" % (port, tag))

        except PExpectError:
            # recurse on error
            self._switch_connect()
            self.port_remove_trunk_from_vlan(port, tag)

    # Get the configured VLAN tag for an access port (i.e. a port not
    # configured for tagged egress). Returns the tag of the last
    # "Untagged" line in the switch output, or 1 if there is none.
    def port_get_access_vlan(self, port):
        logging.debug("Getting VLAN for access port %s" % port)
        vlan = 1
        if not self._is_port_name_valid(port):
            raise IndexError("Port name %s not recognised" % port)
        regex = re.compile(r'(\d+)\s+.*Untagged')

        try:
            self._cli("show interface switchport %s" % self._long_port_name(port))
            for line in self._read_long_output():
                match = regex.match(line)
                if match:
                    vlan = match.group(1)
            return int(vlan)

        except PExpectError:
            # recurse on error
            self._switch_connect()
            return self.port_get_access_vlan(port)

    # Get the list of configured VLAN tags for a trunk port
    def port_get_trunk_vlan_list(self, port):
        logging.debug("Getting VLANs for trunk port %s" % port)
        if not self._is_port_name_valid(port):
            raise IndexError("Port name %s not recognised" % port)

        return self._port_get_all_vlans(port, 'Tagged')

    ################################
    ### Internal functions
    ################################

    # Connect to the switch and log in, dropping any existing
    # connection first. Raises IOError if the switch does not look
    # like a supported model or reports fewer than 4 ports.
    def _switch_connect(self):

        if self.connection is not None:
            self.connection.close(True)
            self.connection = None

        logging.debug("Connecting to Switch with: %s", self.exec_string)
        self.connection = pexpect.spawn(self.exec_string, logfile=self.logfile)
        self._login()

        # No way to avoid paged output on this switch AFAICS

        # And grab details about the switch, in case we need it
        self._get_systemdata()

        # And also validate them - make sure we're driving a switch of
        # the correct model!
        descr_regex = re.compile(r'Hardware Version\s+ - (.*)')
        descr = ""

        for line in self._systemdata:
            match = descr_regex.match(line)
            if match:
                descr = match.group(1)

        logging.debug("system description is %s", descr)

        if not self._expected_descr_re.match(descr):
            raise IOError("Switch %s not recognised by this driver: abort" % descr)

        # Now build a list of our ports, for later sanity checking
        self._ports = self._get_port_names()
        if len(self._ports) < 4:
            raise IOError("Not enough ports detected - problem!")

    # Walk through the login dialogue (and "enable", if we land at a
    # '>' prompt). Records the prompt name for later output parsing.
    # Returns 0 on success, raises IOError on any login failure.
    def _login(self):
        # NOTE(review): this writes the passwords to the debug log in
        # clear text - consider whether that is acceptable.
        logging.debug("attempting login with username \"%s\", password \"%s\", enable_password \"%s\"", self._username, self._password, self._enable_password)
        self.connection.expect('User Access Login')
        if self._username is not None:
            self.connection.expect("User:")
            self._cli("%s" % self._username)
        if self._password is not None:
            self.connection.expect("Password:")
            self._cli("%s" % self._password, False)

        index = self.connection.expect(['User Name:', 'Password:', 'Bad passwords', 'authentication failed', r'(.*)(#|>)'])
        if index != 4: # Any other means: failed to log in!
            logging.error("Login failure: index %d\n" % index)
            # The text received before the match lives on the spawn
            # object, not on the regexp match object
            logging.error("Login failure: %s\n" % self.connection.before)
            raise IOError("Login failure")

        self._prompt_name = self.connection.match.group(1).strip()
        if self.connection.match.group(2) == ">":
            # Need to enter "enable" mode too
            self._cli("")
            self._cli("enable")
            if self._enable_password:
                self.connection.expect("Password:")
                self._cli("%s" % self._enable_password, False)
            index = self.connection.expect(['Password:', 'Bad passwords', 'authentication failed', r'(.*)(#|>)'])
            if index != 3: # Any other means: failed to log in!
                logging.error("Enable password failure: %s\n" % self.connection.match)
                raise IOError("Enable password failure")
        return 0

    def _logout(self):
        logging.debug("Logging out")
        self._cli("exit", False)

    # Enter configuration mode
    def _configure(self):
        self._cli("configure")

    # Leave configuration mode
    def _end_configure(self):
        self._cli("end")

    # Split a chunk of switch output on CRLF and clean up each line:
    # surrounding whitespace, backspaces and carriage returns are
    # removed. Returns a list of strings.
    def _split_output_lines(self, text):
        return [re.sub('(\x08|\x0D)*', '', line.strip())
                for line in text.split('\r\n')]

    # Collect the output of the command just sent, paging through any
    # "Press any key to continue" prompts until the CLI prompt comes
    # back. Returns the output as a list of cleaned-up lines. Raises
    # PExpectError if the connection hits EOF or times out.
    def _read_long_output(self):
        buf = []
        # The prompt name is literal text, not a regexp
        prompt = re.escape(self._prompt_name) + '#'
        while True:
            try:
                index = self.connection.expect(['^Press any key to continue', prompt])
                if index == 0: # "Press any key to continue (Q to quit)"
                    buf.extend(self._split_output_lines(self.connection.before))
                    self._cli(' ', False)
                elif index == 1: # Back to a prompt, says output is finished
                    break
            except (pexpect.EOF, pexpect.TIMEOUT):
                # Something went wrong; callers log out, log in and try again!
                logging.error("PEXPECT FAILURE, RECONNECT")
                self.errors.log_error_in("_read_long_output")
                raise PExpectError("_read_long_output failed")
            except Exception:
                logging.error("prompt is \"%s\"", prompt)
                raise

        # Whatever arrived before the final prompt
        buf.extend(self._split_output_lines(self.connection.before))

        return buf

    # Expand a short port name ("Gi...") into the form the CLI wants
    def _long_port_name(self, port):
        return re.sub('Gi', 'gigabitEthernet ', port)

    def _set_pvid(self, port, pvid):
        # Set a port's PVID
        self._configure()
        self._cli("interface %s" % self._long_port_name(port))
        self._cli("switchport pvid %d" % pvid)
        self._end_configure()

    # Return the list of VLAN tags (ints) that the switch lists for
    # the port on lines containing the given text ('Tagged' or
    # 'Untagged' in this file). The text is used as part of a regexp
    # without escaping, so it must not contain regexp metacharacters.
    def _port_get_all_vlans(self, port, type):
        vlans = []
        regex = re.compile(r'(\d+)\s+.*' + type)
        self._cli("show interface switchport %s" % self._long_port_name(port))
        for line in self._read_long_output():
            match = regex.match(line)
            if match:
                vlans.append(int(match.group(1)))
        return vlans

    # Remove a port from a VLAN, whatever its tagging
    def _port_remove_general_vlan(self, port, tag):
        try:
            self._configure()
            self._cli("interface %s" % self._long_port_name(port))
            self._cli("no switchport general allowed vlan %d" % tag)
            self._end_configure()

        except PExpectError:
            # recurse on error
            self._switch_connect()
            return self._port_remove_general_vlan(port, tag)

    # Ask the switch for its interfaces; returns a list of port names
    def _get_port_names(self):
        logging.debug("Grabbing list of ports")
        interfaces = []

        # Use "Link" to only identify lines in the output that match
        # interfaces that exist - it'll match "LinkUp" and "LinkDown"
        regex = re.compile(r'^\s*([a-zA-Z0-9_/]*).*Link')

        try:
            self._cli("show interface status")
            for line in self._read_long_output():
                match = regex.match(line)
                if match:
                    interfaces.append(match.group(1))
            return interfaces

        except PExpectError:
            # recurse on error
            self._switch_connect()
            return self._get_port_names()

    def _show_config(self):
        logging.debug("Grabbing config")
        self._cli("show running-config")
        return self._read_long_output()

    def _show_clock(self):
        logging.debug("Grabbing time")
        self._cli("show system-time")
        return self._read_long_output()

    # Cache the lines of "show system-info" in self._systemdata
    def _get_systemdata(self):
        logging.debug("Grabbing system sw and hw versions")

        self._cli("show system-info")
        self._systemdata = list(self._read_long_output())

    # Wrapper around connection.send - by default, expect() the same
    # text we've sent, to remove it from the output from the
    # switch. For the few cases where we don't need that, override
    # this using echo=False.
    # Horrible, but seems to work.
    def _cli(self, text, echo=True):
        self.connection.send(text + '\r')
        if echo:
            # expect() takes a regexp; the echoed command is literal
            # text, so escape any metacharacters it contains
            self.connection.expect(re.escape(text))
588
if __name__ == "__main__":
    # Manual test harness: exercises the driver against a live switch
    # (the address below unless --switch is given). It changes VLAN
    # and port configuration on that switch.
    import optparse

    parser = optparse.OptionParser()
    parser.add_option("--switch",
                      dest = "switch",
                      action = "store",
                      nargs = 1,
                      type = "string",
                      help = "specify switch to connect to for testing",
                      metavar = "<switch>")
    (opts, args) = parser.parse_args()
    switch = opts.switch or '10.172.2.50'

    p = TPLinkTLSG2XXX(switch, 23, debug = False)
    p.switch_connect('admin', 'admin', None)

    # Single-argument parenthesised print behaves the same as the
    # print statement under Python 2.
    print("Ports are:")
    p.dump_list(p.switch_get_port_names())

    print("VLANs are:")
    p.dump_list(p.vlan_get_list())

    print("VLAN 2 is named \"%s\"" % p.vlan_get_name(2))

    print("Create VLAN 3")
    p.vlan_create(3)

    print("VLAN 3 is named \"%s\"" % p.vlan_get_name(3))

    print("Set name of VLAN 3 to test333")
    p.vlan_set_name(3, "test333")

    print("VLAN 3 is named \"%s\"" % p.vlan_get_name(3))

    print("VLANs are:")
    p.dump_list(p.vlan_get_list())

    print("Destroy VLAN 3")
    p.vlan_destroy(3)

    print("VLANs are:")
    p.dump_list(p.vlan_get_list())

    print("Port Gi1/0/10 is in %s mode" % p.port_get_mode("Gi1/0/10"))

    print("Port Gi1/0/11 is in %s mode" % p.port_get_mode("Gi1/0/11"))

    # Test access stuff
    print("Port Gi1/0/9 is in %s mode" % p.port_get_mode("Gi1/0/9"))

    print("Set Gi1/0/9 to access mode")
    p.port_set_mode("Gi1/0/9", "access")

    print("Move Gi1/0/9 to VLAN 4")
    p.port_set_access_vlan("Gi1/0/9", 4)

    print("Read from switch: Gi1/0/9 is on VLAN %s" % p.port_get_access_vlan("Gi1/0/9"))

    print("Move Gi1/0/9 back to VLAN 1")
    p.port_set_access_vlan("Gi1/0/9", 1)

    # Test trunk stuff
    print("Set Gi1/0/9 to trunk mode")
    p.port_set_mode("Gi1/0/9", "trunk")
    print("Read from switch: which VLANs is Gi1/0/9 on?")
    p.dump_list(p.port_get_trunk_vlan_list("Gi1/0/9"))
    print("Add Gi1/0/9 to VLAN 2")
    p.port_add_trunk_to_vlan("Gi1/0/9", 2)
    print("Add Gi1/0/9 to VLAN 3")
    p.port_add_trunk_to_vlan("Gi1/0/9", 3)
    print("Add Gi1/0/9 to VLAN 4")
    p.port_add_trunk_to_vlan("Gi1/0/9", 4)
    print("Read from switch: which VLANs is Gi1/0/9 on?")
    p.dump_list(p.port_get_trunk_vlan_list("Gi1/0/9"))

    # NOTE(review): VLAN 3 is removed twice and VLAN 2 never - this
    # may be a copy/paste slip for 2, 3, 4. Kept as found; confirm.
    p.port_remove_trunk_from_vlan("Gi1/0/9", 3)
    p.port_remove_trunk_from_vlan("Gi1/0/9", 3)
    p.port_remove_trunk_from_vlan("Gi1/0/9", 4)
    print("Read from switch: which VLANs is Gi1/0/9 on?")
    p.dump_list(p.port_get_trunk_vlan_list("Gi1/0/9"))

    p.switch_save_running_config()

    p.switch_disconnect()
#    p._show_config()