#! /usr/bin/python

#  Copyright 2014 Linaro Limited
#
#  This program is free software; you can redistribute it and/or modify
#  it under the terms of the GNU General Public License as published by
#  the Free Software Foundation; either version 2 of the License, or
#  (at your option) any later version.
#
#  This program is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#  GNU General Public License for more details.
#
#  You should have received a copy of the GNU General Public License
#  along with this program; if not, write to the Free Software
#  Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
#  MA 02110-1301, USA.
#
#  VLANd admin interface
#
#  Parses exactly one command from the command line, sends the matching
#  request to the VLANd daemon on localhost and prints the reply.
#
#  NOTE: every print below uses the single-argument parenthesised form,
#  which behaves the same as the print statement on Python 2 and also
#  parses on Python 3.

import os
import sys
import optparse

# Make the sibling project modules importable regardless of the current
# working directory the script is started from.
vlandpath = os.path.abspath(os.path.normpath(os.path.dirname(sys.argv[0])))
sys.path.insert(0, vlandpath)

from errors import InputError, SocketError
from config.config import VlanConfig
from ipc.ipc import VlanIpc

version = "0.0.0-DEV"
banner = "Linaro VLANd admin interface, version %s" % version


def is_positive(text):
    """Convert a textual yes/no flag into a bool.

    Returns True for '1', 'y', 'Y', 't', 'T', 'True', 'true' and False
    for '0', 'n', 'N', 'f', 'F', 'False', 'false'.

    Raises InputError for any other value.
    """
    if text in ('1', 'y', 'Y', 't', 'T', 'True', 'true'):
        return True
    elif text in ('0', 'n', 'N', 'f', 'F', 'False', 'false'):
        return False
    else:
        raise InputError("Cannot parse \"%s\" as True or False" % text)


def dump_switch(switch):
    """Print a switch record as received from VLANd."""
    print(switch)


def dump_port(port):
    """Print a port record as received from VLANd."""
    print(port)


def dump_vlan(vlan):
    """Print a VLAN record as received from VLANd."""
    print(vlan)


def call_vland(msgtype, msg):
    """Send one request to the VLANd daemon and return its 'data' payload.

    msgtype is stored in the message under 'type'; msg is the request
    dict and is modified in place ('client_name' and 'type' are added).
    The connection goes to localhost on the port taken from the
    module-level config.

    Raises SocketError if the reply has no 'response' field, and
    InputError if the daemon answered with "ERROR".
    """
    ipc = VlanIpc()
    ipc.client_connect('localhost', config.vland.port)
    msg['client_name'] = 'admin.py'
    msg['type'] = msgtype
    # Debug aid:
    # print('Sending:')
    # print(msg)
    ipc.client_send(msg)
    ret = ipc.client_recv_and_close()
    if 'response' not in ret:
        raise SocketError("Badly-formed response from VLANd server")
    if ret['response'] == "ERROR":
        raise InputError("VLANd server said: %s" % ret['error'])
    # Debug aid:
    # print('VLANd said in reply:')
    # print(ret)
    return ret['data']


def _print_entries(entries):
    """Print each entry on its own line, followed by an 'N entries' total."""
    count = 0
    for line in entries:
        print(line)
        count += 1
    print('%d entries' % count)


# print('%s' % banner)

config = VlanConfig(filenames=('./vland.cfg',))

usage = 'Usage: %prog --command [command options]'
parser = optparse.OptionParser(usage=usage, description=banner)

# System commands
system_group = optparse.OptionGroup(parser, "System commands")
system_group.add_option("--status",
                        dest="status",
                        action="store_true",
                        default=False,
                        help="Describe current system status")
system_group.add_option("--statistics",
                        dest="statistics",
                        action="store_true",
                        default=False,
                        help="Print some system statistics")
system_group.add_option("--version",
                        dest="version",
                        action="store_true",
                        default=False,
                        help="Describe the version of this admin interface")
system_group.add_option("--vland_version",
                        dest="vland_version",
                        action="store_true",
                        default=False,
                        help="Describe the version of the running VLANd")
parser.add_option_group(system_group)
# May add shutdown, self-check etc. later

# Switch commands
switch_group = optparse.OptionGroup(parser, "Switch commands")
switch_group.add_option("--list_all_switches",
                        dest="list_all_switches",
                        action="store_true",
                        default=False,
                        help="List all the existing switches in the system")
switch_group.add_option("--create_switch",
                        dest="create_switch",
                        action="store",
                        type="string",
                        help="Add a new switch to the system",
                        nargs=1,
                        metavar="NAME")
switch_group.add_option("--delete_switch",
                        dest="delete_switch",
                        action="store",
                        type="int",
                        help="Remove an existing switch from the system",
                        default=None,
                        nargs=1,
                        metavar="SWITCH_ID")
switch_group.add_option("--show_switch",
                        dest="show_switch",
                        action="store",
                        type="int",
                        help="Show the details of an existing switch in the system",
                        default=None,
                        nargs=1,
                        metavar="SWITCH_ID")
switch_group.add_option("--lookup_switch_by_name",
                        dest="lookup_switch_by_name",
                        action="store",
                        type="string",
                        help="Lookup a switch ID by name",
                        nargs=1,
                        metavar="NAME")
switch_group.add_option("--list_switch_ports",
                        dest="list_switch_ports",
                        action="store",
                        type="int",
                        help="List the IDs of the ports on an existing switch in the system",
                        default=None,
                        nargs=1,
                        metavar="SWITCH_ID")
parser.add_option_group(switch_group)

# Port commands
port_group = optparse.OptionGroup(parser, "Port commands")
port_group.add_option("--list_all_ports",
                      dest="list_all_ports",
                      action="store_true",
                      default=False,
                      help="List all the existing ports in the system")
port_group.add_option("--create_port",
                      dest="create_port",
                      action="store",
                      type="string",
                      help="Add a new port to the system",
                      nargs=2,
                      metavar="SWITCH_ID NAME")
port_group.add_option("--delete_port",
                      dest="delete_port",
                      action="store",
                      type="int",
                      help="Remove an existing port from the system",
                      default=None,
                      nargs=1,
                      metavar="PORT_ID")
port_group.add_option("--show_port",
                      dest="show_port",
                      action="store",
                      type="int",
                      help="Show the details of an existing port in the system",
                      default=None,
                      nargs=1,
                      metavar="PORT_ID")
port_group.add_option("--lookup_port_by_switch_and_name",
                      dest="lookup_port_by_switch_and_name",
                      action="store",
                      type="string",
                      help="Lookup a port ID by switch and port name",
                      nargs=2,
                      metavar="SWITCH_ID NAME")
port_group.add_option("--set_port_mode",
                      dest="set_port_mode",
                      action="store",
                      type="string",
                      help="Set the mode of a port to 'trunk' or 'access'",
                      nargs=2,
                      metavar="PORT_ID MODE")
port_group.add_option("--lock_port",
                      dest="lock_port",
                      action="store",
                      type="string",
                      help="Lock the settings on a port",
                      nargs=1,
                      metavar="PORT_ID")
port_group.add_option("--unlock_port",
                      dest="unlock_port",
                      action="store",
                      type="string",
                      help="Unlock the settings on a port",
                      nargs=1,
                      metavar="PORT_ID")
port_group.add_option("--set_port_current_vlan",
                      dest="set_port_current_vlan",
                      action="store",
                      type="int",
                      help="Set the current VLAN assignment for a port",
                      nargs=2,
                      metavar="PORT_ID VLAN_ID")
port_group.add_option("--get_port_current_vlan",
                      dest="get_port_current_vlan",
                      action="store",
                      type="int",
                      help="Get the current VLAN assignment for a port",
                      nargs=1,
                      metavar="PORT_ID")
port_group.add_option("--set_port_base_vlan",
                      dest="set_port_base_vlan",
                      action="store",
                      type="int",
                      help="Set the base VLAN assignment for a port",
                      nargs=2,
                      metavar="PORT_ID VLAN_ID")
port_group.add_option("--get_port_base_vlan",
                      dest="get_port_base_vlan",
                      action="store",
                      type="int",
                      help="Get the base VLAN assignment for a port",
                      nargs=1,
                      metavar="PORT_ID")
port_group.add_option("--restore_port_to_base_vlan",
                      dest="restore_port_to_base_vlan",
                      action="store",
                      type="int",
                      help="Reset the port back to its base VLAN",
                      nargs=1,
                      metavar="PORT_ID")
parser.add_option_group(port_group)

# VLAN commands
vlan_group = optparse.OptionGroup(parser, "VLAN commands")
vlan_group.add_option("--list_all_vlans",
                      dest="list_all_vlans",
                      action="store_true",
                      default=False,
                      help="List all the existing vlans in the system")
vlan_group.add_option("--create_vlan",
                      dest="create_vlan",
                      action="store",
                      type="string",
                      help="Add a new vlan to the system",
                      nargs=3,
                      metavar="NAME TAG IS_BASE_VLAN")
vlan_group.add_option("--delete_vlan",
                      dest="delete_vlan",
                      action="store",
                      type="int",
                      help="Remove an existing vlan from the system",
                      default=None,
                      nargs=1,
                      metavar="VLAN_ID")
vlan_group.add_option("--show_vlan",
                      dest="show_vlan",
                      action="store",
                      type="int",
                      help="Show the details of an existing vlan in the system",
                      default=None,
                      nargs=1,
                      metavar="VLAN_ID")
parser.add_option_group(vlan_group)

(opts, args) = parser.parse_args()

# Only the first matching command is executed; the order of the tests
# below therefore decides which one wins if several are given.
if opts.list_all_switches:
    result = call_vland('db_query', {'command': 'db.all_switches', 'data': None})
    _print_entries(result)
elif opts.list_all_ports:
    result = call_vland('db_query', {'command': 'db.all_ports', 'data': None})
    _print_entries(result)
elif opts.list_all_vlans:
    result = call_vland('db_query', {'command': 'db.all_vlans', 'data': None})
    _print_entries(result)
elif opts.create_switch is not None:
    try:
        switch_id = call_vland('db_update',
                               {'command': 'db.create_switch',
                                'data':
                                {'name': opts.create_switch}})
        print('Created switch_id %d' % switch_id)
    except InputError as inst:
        print('Failed: %s' % inst)
elif opts.create_port is not None:
    try:
        port_id = call_vland('db_update',
                             {'command': 'db.create_port',
                              'data':
                              {'switch_id': opts.create_port[0],
                               'name': opts.create_port[1]}})
        print('Created port_id %d' % port_id)
    except InputError as inst:
        print('Failed: %s' % inst)
elif opts.create_vlan is not None:
    try:
        vlan_id = call_vland('vlan_update',
                             {'command': 'api.create_vlan',
                              'data':
                              {'name': opts.create_vlan[0],
                               'tag': opts.create_vlan[1],
                               'is_base_vlan': is_positive(opts.create_vlan[2])}})
        print('Created vlan_id %d' % vlan_id)
    except InputError as inst:
        print('Failed: %s' % inst)
elif opts.delete_switch is not None:
    try:
        switch_id = call_vland('db_update',
                               {'command': 'db.delete_switch',
                                'data': {'switch_id': opts.delete_switch}})
        print('Deleted switch_id %d' % switch_id)
    except InputError as inst:
        print('Failed: %s' % inst)
elif opts.delete_port is not None:
    try:
        port_id = call_vland('db_update',
                             {'command': 'db.delete_port',
                              'data': {'port_id': opts.delete_port}})
        # Report success the same way delete_switch / delete_vlan do;
        # previously a successful delete printed nothing at all.
        print('Deleted port_id %d' % port_id)
    except InputError as inst:
        print('Failed: %s' % inst)
elif opts.delete_vlan is not None:
    try:
        vlan_id = call_vland('vlan_update',
                             {'command': 'api.delete_vlan',
                              'data': {'vlan_id': opts.delete_vlan}})
        print('Deleted vlan_id %d' % vlan_id)
    except InputError as inst:
        print('Failed: %s' % inst)
elif opts.lookup_switch_by_name is not None:
    try:
        switch_id = call_vland('db_query',
                               {'command': 'db.get_switch_id_by_name',
                                'data': {'name': opts.lookup_switch_by_name}})
        if switch_id is not None:
            print('%d' % switch_id)
        else:
            print('No switch found for name %s' % opts.lookup_switch_by_name)
    except InputError as inst:
        print('Failed: %s' % inst)
elif opts.show_switch is not None:
    try:
        this_switch = call_vland('db_query',
                                 {'command': 'db.get_switch_by_id',
                                  'data':
                                  {'switch_id': opts.show_switch}})
        if this_switch is not None:
            dump_switch(this_switch)
        else:
            print('No switch found for switch_id %d' % opts.show_switch)
    except InputError as inst:
        print('Failed: %s' % inst)
elif opts.list_switch_ports is not None:
    try:
        ports = call_vland('db_query',
                           {'command': 'db.get_ports_by_switch',
                            'data':
                            {'switch_id': opts.list_switch_ports}})
        if ports is not None:
            for p in ports:
                dump_port(p)
        else:
            print('No ports found for switch_id %d' % opts.list_switch_ports)
    except InputError as inst:
        print('Failed: %s' % inst)
elif opts.show_port is not None:
    try:
        this_port = call_vland('db_query',
                               {'command': 'db.get_port_by_id',
                                'data':
                                {'port_id': opts.show_port}})
        if this_port is not None:
            dump_port(this_port)
        else:
            print('No port found for port_id %d' % opts.show_port)
    except InputError as inst:
        print('Failed: %s' % inst)
elif opts.lookup_port_by_switch_and_name is not None:
    try:
        p = call_vland('db_query',
                       {'command': 'db.get_port_by_switch_and_name',
                        'data':
                        {'switch_id': opts.lookup_port_by_switch_and_name[0],
                         'name': opts.lookup_port_by_switch_and_name[1]}})
        if p is not None:
            print(p)
        else:
            # The switch_id is parsed as a string; format it as one so a
            # non-numeric value cannot raise ValueError while we are only
            # trying to report "not found".
            print('No port found for switch_id %s, name %s'
                  % (opts.lookup_port_by_switch_and_name[0],
                     opts.lookup_port_by_switch_and_name[1]))
    except InputError as inst:
        print('Failed: %s' % inst)
elif opts.set_port_mode is not None:
    try:
        port_id = call_vland('vlan_update',
                             {'command': 'api.set_port_mode',
                              'data':
                              {'port_id': opts.set_port_mode[0],
                               'mode': opts.set_port_mode[1]}})
        print("Updated mode for port_id %d" % port_id)
    except InputError as inst:
        print('Failed: %s' % inst)
elif opts.lock_port is not None:
    try:
        port_id = call_vland('db_update',
                             {'command': 'db.set_port_is_locked',
                              'data':
                              {'port_id': opts.lock_port,
                               'is_locked': True}})
        print("Locked port_id %d" % port_id)
    except InputError as inst:
        print('Failed: %s' % inst)
elif opts.unlock_port is not None:
    try:
        port_id = call_vland('db_update',
                             {'command': 'db.set_port_is_locked',
                              'data':
                              {'port_id': opts.unlock_port,
                               'is_locked': False}})
        print("Unlocked port_id %d" % port_id)
    except InputError as inst:
        print('Failed: %s' % inst)
elif opts.set_port_current_vlan is not None:
    try:
        # The option's dest is set_port_current_vlan; the values are
        # (port_id, vlan_id) in command-line order.
        port_id = call_vland('vlan_update',
                             {'command': 'api.set_current_vlan',
                              'data':
                              {'port_id': opts.set_port_current_vlan[0],
                               'vlan_id': opts.set_port_current_vlan[1]}})
        print("Set current VLAN on port_id %d" % port_id)
    except InputError as inst:
        print('Failed: %s' % inst)
elif opts.get_port_current_vlan is not None:
    try:
        vlan_id = call_vland('db_query',
                             {'command': 'db.get_current_vlan_id_by_port',
                              'data':
                              {'port_id': opts.get_port_current_vlan}})
        if vlan_id is not None:
            print(vlan_id)
        else:
            print("No current_vlan_id found for port_id %d" % opts.get_port_current_vlan)
    except InputError as inst:
        print('Failed: %s' % inst)
elif opts.set_port_base_vlan is not None:
    try:
        # The option's dest is set_port_base_vlan; the values are
        # (port_id, vlan_id) in command-line order.
        port_id = call_vland('db_update',
                             {'command': 'db.set_base_vlan',
                              'data':
                              {'port_id': opts.set_port_base_vlan[0],
                               'base_vlan_id': opts.set_port_base_vlan[1]}})
        print("Set base VLAN on port_id %d" % port_id)
    except InputError as inst:
        print('Failed: %s' % inst)
elif opts.get_port_base_vlan is not None:
    try:
        vlan_id = call_vland('db_query',
                             {'command': 'db.get_base_vlan_id_by_port',
                              'data':
                              {'port_id': opts.get_port_base_vlan}})
        if vlan_id is not None:
            print(vlan_id)
        else:
            # Report the port_id the user asked about; no port_id variable
            # exists in this branch.
            print("No base_vlan_id found for port_id %d" % opts.get_port_base_vlan)
    except InputError as inst:
        print('Failed: %s' % inst)
elif opts.restore_port_to_base_vlan is not None:
    try:
        port_id = call_vland('vlan_update',
                             {'command': 'api.restore_base_vlan',
                              'data':
                              {'port_id': opts.restore_port_to_base_vlan}})
        print("Restored port_id %d back to base VLAN" % port_id)
    except InputError as inst:
        print('Failed: %s' % inst)
elif opts.show_vlan is not None:
    try:
        v = call_vland('db_query',
                       {'command': 'db.get_vlan_by_id',
                        'data':
                        {'vlan_id': opts.show_vlan}})
        if v is not None:
            dump_vlan(v)
        else:
            print('No vlan found for vlan_id %d' % opts.show_vlan)
    except InputError as inst:
        print('Failed: %s' % inst)
elif opts.status:
    print('Config:')
    print('  knows about %d switch(es)' % len(config.switches))
    default_vlan_id = call_vland('db_query',
                                 {'command': 'db.get_vlan_id_by_tag',
                                  'data':
                                  {'tag': config.vland.default_vlan_tag}})
    print('The default vlan tag (%d) is vlan_id %d'
          % (config.vland.default_vlan_tag, default_vlan_id))
    stat = call_vland('daemon_query', {'command': 'daemon.status', 'data': None})
    print('VLANd is running %s' % stat['running'])
    print('DB via VLANd:')
    switches = call_vland('db_query', {'command': 'db.all_switches', 'data': None})
    print('  knows about %d switch(es)' % len(switches))
    ports = call_vland('db_query', {'command': 'db.all_ports', 'data': None})
    print('  knows about %d port(s)' % len(ports))
    vlans = call_vland('db_query', {'command': 'db.all_vlans', 'data': None})
    print('  DB knows about %d vlan(s)' % len(vlans))
elif opts.vland_version:
    ver = call_vland('daemon_query', {'command': 'daemon.version', 'data': None})
    print('VLANd version %s' % ver['version'])
elif opts.statistics:
    stats = call_vland('daemon_query', {'command': 'daemon.statistics', 'data': None})
    print('VLANd uptime: %d seconds' % stats['uptime'])
elif opts.version:
    print('VLANd admin interface version %s' % version)
else:
    print('No recognised command given. Try -h for help')