Add debugsign and test suite

git-svn-id: https://llvm.org/svn/llvm-project/zorg/trunk@320453 91177308-0d34-0410-b5e6-96231b3b80d8
diff --git a/codesign/debugsign/dbsign/commands.py b/codesign/debugsign/dbsign/commands.py
new file mode 100644
index 0000000..02f6de7
--- /dev/null
+++ b/codesign/debugsign/dbsign/commands.py
@@ -0,0 +1,422 @@
+# COPYRIGHT LINE: FIXME
+
+"""
+dbsign.commands
+"""
+
+from __future__ import print_function
+
+import os
+import sys
+
+from dbsign.ansi import ERROR, INFO, OK, WARN
+import dbsign.logger as L
+import dbsign.security as S
+import dbsign.shell as sh
+
+
+#
+# Globals and configurables
+#
+
+OVERVIEW_TEXT = '''\
+
+{1}
+OVERVIEW:
+
+    To configure code signing on a new system, do the following (in order):
+
+        {0} setup
+        {0} import P12_FILE     # MUST BE DONE FROM GUI CONSOLE!
+
+    To verify the configuration:
+
+        {0} check               # Not foolproof, but catches most issues
+
+    To enable access to the identity for code signing (eg, from Jenkins job):
+
+        {0} prep
+
+    To replace the configured identity with a new one:
+
+        {0} remove              # Removes the whole keychain!
+        {0} import NEW_P12      # Must be done from GUI console!
+
+    Note that this script currently assumes the following:
+
+      * The identity's common name will be "lldb_codesign"
+      * The keychain will be named "lldb_codesign"
+      * The keychain will be locked using the password "lldb_codesign"
+      * The P12 archive will be encrypted with the password "lldb_codesign"
+
+    This is intended to make it trivial to codesign utilities using the
+    imported certificate, without exposing any local account information
+    (eg, user's login keychain password). Please take these factors into
+    account when evaluating security.
+'''
+
+log = L.get_logger(__name__)
+
+CFG = {
+    'debug':        False,
+    'executable':   os.path.basename(sys.argv[0]),
+    'identity':     'lldb_codesign',
+    'id_file':      None,       # from command line argument
+    'keynick':      'lldb',
+    'keydb':        None,
+    'keypass':      'lldb_codesign',
+    'privileges':   ['system.privilege.taskport'],
+}
+
+
+#
+# Top-Level Commands
+#
+
+def cmd_check():  # type: () -> int
+    keydb = CFG['keydb']
+    keypass = CFG['keypass']
+    identity = CFG['identity']
+    exe = CFG['executable']
+
+    for priv in CFG['privileges']:
+        print('Verifying privilege {} ... '.format(priv), end='')
+        res_priv = S.verify_privilege(priv)
+        if res_priv:
+            print(OK('OK'))
+        else:
+            print(WARN('NOT SET'))
+            log.debug(res_priv.value)
+            print(WARN('WARNING'), 'Privileges have not been set.')
+            print(INFO('To set, run: {} --unsafe setup'.format(exe)))
+
+    print("Unlocking keychain ... ", end='')
+    res_unlock = S.unlock_keychain(keydb, keypass)
+    if res_unlock:
+        print(OK('OK'))
+    else:
+        print(ERROR('FAILED'))
+        log.debug(res_unlock.value)
+        print(WARN('WARNING'), 'Keychain not configured.')
+        print(INFO('Please run: {} setup'.format(exe)))
+        return 1
+
+    print("Verifying keychain ... ", end='')
+    res_find = S.keychain_exists(keydb)
+    if res_find:
+        print(OK('OK'))
+    else:
+        print(ERROR('FAILED'))
+        log.debug(res_find)
+        print(INFO(res_find.value))
+        return 2
+
+    print("Searching for identity in keychain ... ", end='')
+    res_find = S.identity_installed(identity, keydb)
+    if res_find:
+        print(OK('OK'))
+    else:
+        print(ERROR('FAILED'))
+        log.debug(res_find)
+        print(WARN('WARNING'), res_find.value)
+        print(INFO("Please run: {} import".format(exe)))
+        return 3
+
+    print('Verifying identity ... ', end='')
+    res_id = S.verify_identity(identity, keydb)
+    if res_id:
+        print(OK('OK'))
+    else:
+        print(ERROR('FAILED'))
+        log.debug(res_id.value)
+        print(WARN('WARNING'), "Unable to verify identity")
+        print(INFO('Please run: {} import'.format(exe)))
+        return 4
+
+    return 0
+
+
+def cmd_clean():  # type: () -> int
+    identity = CFG['identity']
+    keydb = CFG['keydb']
+    keypass = CFG['keypass']
+
+    print("Unlocking keychain ... ", end='')
+    res_unlock = S.unlock_keychain(keydb, keypass)
+    if res_unlock:
+        print(OK('OK'))
+    else:
+        print(WARN('FAILED'))
+        log.debug(res_unlock.value)
+        print(INFO('Failed to unlock keychain.'))
+
+    print('Removing identity and trust settings ... ', end='')
+    res_id = S.delete_identity(identity, keydb)
+    if res_id:
+        print(OK('OK'))
+    else:
+        print(WARN('Failed to remove identity'))
+        log.debug(res_id.value)
+
+    print('Backing up and removing keychain ... ', end='')
+    res_key = S.delete_keychain(keydb, backup=True)
+    if res_key:
+        print(OK('OK'))
+    else:
+        print(WARN('Failed to remove keychain'))
+        log.debug(res_id.value)
+
+    return 0
+
+
+def cmd_help(parser):  # type: (argparse.ArgumentParser) -> int
+    print(OVERVIEW_TEXT.format(
+        CFG['executable'],
+        parser.format_help()))
+    return 0
+
+
+def cmd_import():  # type: () -> int
+    exe = CFG['executable']
+    identity = CFG['identity']
+    keydb = CFG['keydb']
+    keypass = CFG['keypass']
+    id_file = CFG['id_file']
+    id_pass = identity
+
+    _auth_sudo()
+
+    if 'SSH_CONNECTION' in os.environ or 'TERM_SESSION_ID' not in os.environ:
+        print(WARN('WARNING!'), "Remote console session detected!",
+              "This procedure must be performed from the system console.")
+
+    print('Verifying privileges ... ', end='')
+    res_verify_privs = S.verify_privileges(CFG['privileges'])
+    if res_verify_privs:
+        print(OK('OK'))
+    else:
+        print(WARN("WARNING"))
+        log.debug(res_verify_privs)
+        print(WARN("Privileges have not been set. Trust may fail."))
+        print(INFO("To set privileges, run: {} --unsafe setup".format(exe)))
+
+    print("Unlocking keychain ... ", end='')
+    res_unlock = S.unlock_keychain(keydb, keypass)
+    if res_unlock:
+        print(OK('OK'))
+    else:
+        print(ERROR('FAILED'))
+        log.debug(res_unlock)
+        print(INFO("Failed to unlock keychain."),
+              "Run: {} check".format(exe))
+        return 1
+
+    print("Importing new identity {} ... ".format(identity), end='')
+    res_import = S.import_identity(keydb, keypass, identity, id_file, id_pass)
+    if res_import:
+        print(OK('OK'))
+        log.debug(res_import.value)
+    else:
+        print(ERROR('FAILED'))
+        log.debug(res_import)
+        print(ERROR('ERROR'), res_import.value)
+        if 'exists' in res_import.value:
+            print(WARN("To remove existing identity:"),
+                  "{} remove".format(exe))
+        return 2
+
+    print(WARN("This will test codesigning with the configured identity"))
+    print(WARN("Please authenticate (if requested) and click 'Always Allow'"))
+
+    print("Trusting identity ... ", end='')
+    res_trust = S.trust_identity(identity, keydb)
+    if res_trust:
+        print(OK('OK'))
+    else:
+        print(ERROR('FAILED'))
+        log.debug(res_trust)
+        print(INFO("Trust unsuccessful:"), res_trust.value)
+        if 'unknown error' in res_trust.value:
+            print(WARN("Please ensure this step is performed"
+                       " from the system console!"))
+
+        print("Rolling back imported identity ... ", end='')
+        res_remove = S.delete_identity(identity, keydb)
+        if res_remove:
+            print(OK('OK'))
+        else:
+            print(ERROR('FAILED'))
+            log.debug(res_remove)
+            print(res_trust.value)
+            return 4
+        return 3
+
+    return 0
+
+
+def cmd_lint():  # type: () -> int
+    print(OK('Running linters... '))
+    lint_problems = _run_linter()
+    if lint_problems:
+        print(WARN('Lint:'), len(lint_problems))
+        map(log.warn, lint_problems)
+
+    return len(lint_problems)
+
+
+def cmd_prep():  # type: () -> int
+    """Deliberately terse method for use in CI"""
+    keydb = CFG['keydb']
+    keypass = CFG['keypass']
+
+    res_unlock = S.unlock_keychain(keydb, keypass)
+    if not res_unlock:
+        log.debug(res_unlock.value)
+        print(ERROR('ERROR'), 'Unable to access signing identity')
+        return 1
+
+    return 0
+
+
+def cmd_remove():  # type: () -> int
+    keydb = CFG['keydb']
+    keypass = CFG['keypass']
+    identity = CFG['identity']
+
+    print("Unlocking keychain ... ", end='')
+    res_unlock = S.unlock_keychain(keydb, keypass)
+    if res_unlock:
+        print(OK('OK'))
+    else:
+        log.debug(res_unlock.value)
+        print(ERROR('ERROR'), 'Failed to unlock keychain')
+
+    print("Removing identity from keychain ... ", end='')
+    res_rm_id = S.delete_identity(identity, keydb)
+    if res_rm_id:
+        print(OK('OK'))
+    else:
+        print(WARN('FAILED'))
+        log.debug(res_rm_id)
+        print(WARN('WARNING'), "Failed to delete identity from keychain.")
+        print(INFO(res_rm_id.value))
+
+    return 0
+
+
+def cmd_setup():  # type: () -> int
+    keydb = CFG['keydb']
+    keypass = CFG['keypass']
+    exe = CFG['executable']
+
+    print("Configuring keychain ... ", end='')
+    res_create = S.create_keychain(keydb, keypass)
+    if res_create:
+        print(OK('OK'))
+    else:
+        print(ERROR('FAILED'))
+        log.debug(res_create)
+        print(INFO('Keychain creation failed'))
+        return 1
+
+    print("Unlocking keychain ... ", end='')
+    res_unlock = S.unlock_keychain(keydb, keypass)
+    if res_unlock:
+        print(OK('OK'))
+    else:
+        print(ERROR('FAILED'))
+        log.debug(res_unlock)
+        if 'keychain could not be found' in res_unlock.value:
+            print(INFO("Keychain creation failed"))
+        else:
+            print(INFO("Failed to unlock keychain"))
+        print(INFO(res_unlock.value))
+        return 2
+
+    print("Adding keychain to search list ... ", end='')
+    res_searchable = S.add_to_search_list(keydb)
+    if res_searchable:
+        print(OK('OK'))
+    else:
+        print(ERROR('FAILED'))
+        log.debug(res_searchable)
+        print(INFO("Failed to add keychain to search list"))
+        print(WARN("codesign will not be able to find the signing identity."))
+        return 3
+
+    privs = CFG['privileges']
+    print("Checking privileges ... ", end='')
+    if S.verify_privileges(privs):
+        print(OK('OK'))
+    else:
+        print(INFO('NOT SET'))
+
+        _auth_sudo()
+        if not os.getenv(S.UNSAFE_FLAG, False):
+            print(INFO('NOTE'), 'Altering privileges may not be safe.')
+            print(INFO('NOTE'), 'Re-run with the --unsafe flag to enable.')
+        else:
+            priv_value = 'allow'
+            for priv in CFG['privileges']:
+                print('Setting privilege {} ... '.format(priv), end='')
+                res_priv = S.authdb_privilege_write(priv, priv_value)
+                if res_priv:
+                    print(OK('OK'))
+                else:
+                    print(INFO('not set'))
+                    log.debug(res_priv.value)
+                    print(INFO('Privileges have not been set.'))
+                    print(INFO('Please re-run: {} setup'.format(exe)))
+                    return 4
+
+    return 0
+
+
+def cmd_test():  # type: () -> int
+    _auth_sudo()
+
+    print(OK('Running unittests... '))
+    test_problems = _run_unittests()
+    if test_problems:
+        print(ERROR('Failures:'), len(test_problems))
+        map(log.debug, test_problems)
+
+    return len(test_problems)
+
+
+def _auth_sudo():  # type: () -> Result
+    cmd_sudo_check = sh.sudo_run(['-n'])
+    if not cmd_sudo_check:
+        print(WARN("If prompted, authenticate with sudo ... "))
+        cmd_auth = sh.sudo_run(['ls'])
+        if not cmd_auth:
+            print(WARN("WARNING"), "sudo authentication failed")
+        return cmd_auth
+    else:
+        return cmd_sudo_check
+
+
+def _run_linter():  # type: () -> list(str)
+    report_file = 'flake8_report.pep8.txt'
+    fmt = 'lint: %(path)s:%(row)d:%(col)d: %(code)s %(text)s'
+    lint_paths = ['./debugsign', './dbsign/', './unittests/']
+
+    cmd_flake = sh.run(['flake8', '--tee', report_file,
+                        '--format={}'.format(fmt)] + lint_paths)
+    return cmd_flake.stdout.splitlines()
+
+
+def _run_unittests():  # type: () -> list(str)
+    try:
+        import unittest2 as unittest
+    except ImportError:
+        import unittest
+
+    tests = unittest.TestLoader().discover('unittests')
+    test_result = unittest.TextTestRunner(
+        stream=sys.stdout,
+        verbosity=2,
+    ).run(tests)
+
+    problems = test_result.errors + test_result.failures
+    return [str(problem[0]) for problem in problems]