Tim Hammerquist | 545ba4e | 2017-12-12 02:36:17 +0000 | [diff] [blame^] | 1 | # COPYRIGHT LINE: FIXME |
| 2 | |
| 3 | """ |
| 4 | dbsign.commands |
| 5 | """ |
| 6 | |
| 7 | from __future__ import print_function |
| 8 | |
| 9 | import os |
| 10 | import sys |
| 11 | |
| 12 | from dbsign.ansi import ERROR, INFO, OK, WARN |
| 13 | import dbsign.logger as L |
| 14 | import dbsign.security as S |
| 15 | import dbsign.shell as sh |
| 16 | |
| 17 | |
| 18 | # |
| 19 | # Globals and configurables |
| 20 | # |
| 21 | |
| 22 | OVERVIEW_TEXT = '''\ |
| 23 | |
| 24 | {1} |
| 25 | OVERVIEW: |
| 26 | |
| 27 | To configure code signing on a new system, do the following (in order): |
| 28 | |
| 29 | {0} setup |
| 30 | {0} import P12_FILE # MUST BE DONE FROM GUI CONSOLE! |
| 31 | |
| 32 | To verify the configuration: |
| 33 | |
| 34 | {0} check # Not foolproof, but catches most issues |
| 35 | |
| 36 | To enable access to the identity for code signing (eg, from Jenkins job): |
| 37 | |
| 38 | {0} prep |
| 39 | |
| 40 | To replace the configured identity with a new one: |
| 41 | |
| 42 | {0} remove # Removes the whole keychain! |
| 43 | {0} import NEW_P12 # Must be done from GUI console! |
| 44 | |
| 45 | Note that this script currently assumes the following: |
| 46 | |
| 47 | * The identity's common name will be "lldb_codesign" |
| 48 | * The keychain will be named "lldb_codesign" |
| 49 | * The keychain will be locked using the password "lldb_codesign" |
| 50 | * The P12 archive will be encrypted with the password "lldb_codesign" |
| 51 | |
| 52 | This is intended to make it trivial to codesign utilities using the |
| 53 | imported certificate, without exposing any local account information |
| 54 | (eg, user's login keychain password). Please take these factors into |
| 55 | account when evaluating security. |
| 56 | ''' |
| 57 | |
# Logger for this module, obtained from the project's logging helper.
log = L.get_logger(__name__)

# Shared configuration read by the cmd_* functions in this module.
CFG = {
    'debug': False,
    # Name of the running script; interpolated into the "run: ..." hints.
    'executable': os.path.basename(sys.argv[0]),
    # Name of the signing identity; cmd_import also uses it as the P12
    # archive password.
    'identity': 'lldb_codesign',
    'id_file': None,    # from command line argument
    'keynick': 'lldb',
    # Keychain passed to every dbsign.security call.  None here --
    # presumably filled in by the caller before a command runs; confirm.
    'keydb': None,
    # Password used to create and unlock the keychain.
    'keypass': 'lldb_codesign',
    # Privileges verified by cmd_check/cmd_import and written by cmd_setup.
    'privileges': ['system.privilege.taskport'],
}
| 70 | |
| 71 | |
| 72 | # |
| 73 | # Top-Level Commands |
| 74 | # |
| 75 | |
def cmd_check():  # type: () -> int
    """Check privileges, keychain and signing identity, one step at a time.

    Progress is printed for every step.  A missing privilege only yields a
    warning; any later step that fails ends the check immediately.

    Returns:
        0 if every keychain/identity step succeeds, otherwise 1 (unlock
        failed), 2 (keychain verification failed), 3 (identity not found
        in the keychain) or 4 (identity verification failed).
    """
    db_path = CFG['keydb']
    db_pass = CFG['keypass']
    ident = CFG['identity']
    prog = CFG['executable']

    for privilege in CFG['privileges']:
        print('Verifying privilege {} ... '.format(privilege), end='')
        priv_status = S.verify_privilege(privilege)
        if not priv_status:
            # Not fatal: report it and keep going with the other checks.
            print(WARN('NOT SET'))
            log.debug(priv_status.value)
            print(WARN('WARNING'), 'Privileges have not been set.')
            print(INFO('To set, run: {} --unsafe setup'.format(prog)))
            continue
        print(OK('OK'))

    print("Unlocking keychain ... ", end='')
    unlocked = S.unlock_keychain(db_path, db_pass)
    if not unlocked:
        print(ERROR('FAILED'))
        log.debug(unlocked.value)
        print(WARN('WARNING'), 'Keychain not configured.')
        print(INFO('Please run: {} setup'.format(prog)))
        return 1
    print(OK('OK'))

    print("Verifying keychain ... ", end='')
    keychain_found = S.keychain_exists(db_path)
    if not keychain_found:
        print(ERROR('FAILED'))
        log.debug(keychain_found)
        print(INFO(keychain_found.value))
        return 2
    print(OK('OK'))

    print("Searching for identity in keychain ... ", end='')
    identity_found = S.identity_installed(ident, db_path)
    if not identity_found:
        print(ERROR('FAILED'))
        log.debug(identity_found)
        print(WARN('WARNING'), identity_found.value)
        print(INFO("Please run: {} import".format(prog)))
        return 3
    print(OK('OK'))

    print('Verifying identity ... ', end='')
    identity_valid = S.verify_identity(ident, db_path)
    if not identity_valid:
        print(ERROR('FAILED'))
        log.debug(identity_valid.value)
        print(WARN('WARNING'), "Unable to verify identity")
        print(INFO('Please run: {} import'.format(prog)))
        return 4
    print(OK('OK'))

    return 0
| 137 | |
| 138 | |
def cmd_clean():  # type: () -> int
    """Remove the signing identity, then back up and remove the keychain.

    Every step is best-effort: a failure is reported and logged at debug
    level, and the remaining steps still run.

    Returns:
        Always 0.
    """
    identity = CFG['identity']
    keydb = CFG['keydb']
    keypass = CFG['keypass']

    print("Unlocking keychain ... ", end='')
    res_unlock = S.unlock_keychain(keydb, keypass)
    if res_unlock:
        print(OK('OK'))
    else:
        print(WARN('FAILED'))
        log.debug(res_unlock.value)
        print(INFO('Failed to unlock keychain.'))

    print('Removing identity and trust settings ... ', end='')
    res_id = S.delete_identity(identity, keydb)
    if res_id:
        print(OK('OK'))
    else:
        print(WARN('Failed to remove identity'))
        log.debug(res_id.value)

    print('Backing up and removing keychain ... ', end='')
    res_key = S.delete_keychain(keydb, backup=True)
    if res_key:
        print(OK('OK'))
    else:
        print(WARN('Failed to remove keychain'))
        # Log the keychain-removal result, not the identity-removal one.
        log.debug(res_key.value)

    return 0
| 170 | |
| 171 | |
def cmd_help(parser):  # type: (argparse.ArgumentParser) -> int
    """Print OVERVIEW_TEXT filled in with the script name and parser help.

    Args:
        parser: argument parser whose ``format_help()`` output is embedded.

    Returns:
        Always 0.
    """
    prog = CFG['executable']
    usage = parser.format_help()
    print(OVERVIEW_TEXT.format(prog, usage))
    return 0
| 177 | |
| 178 | |
def cmd_import():  # type: () -> int
    """Import the identity from CFG['id_file'] into the keychain and trust it.

    The identity name doubles as the P12 archive password.  If trusting the
    freshly imported identity fails, the import is rolled back.

    Returns:
        0 on success, 1 if the keychain could not be unlocked, 2 if the
        import failed, 3 if trusting failed and the rollback succeeded,
        4 if trusting failed and the rollback failed as well.
    """
    exe = CFG['executable']
    identity = CFG['identity']
    keydb = CFG['keydb']
    keypass = CFG['keypass']
    id_file = CFG['id_file']
    id_pass = identity

    _auth_sudo()

    if 'SSH_CONNECTION' in os.environ or 'TERM_SESSION_ID' not in os.environ:
        print(WARN('WARNING!'), "Remote console session detected!",
              "This procedure must be performed from the system console.")

    print('Verifying privileges ... ', end='')
    res_verify_privs = S.verify_privileges(CFG['privileges'])
    if res_verify_privs:
        print(OK('OK'))
    else:
        # Not fatal here: warn and continue with the import.
        print(WARN("WARNING"))
        log.debug(res_verify_privs)
        print(WARN("Privileges have not been set. Trust may fail."))
        print(INFO("To set privileges, run: {} --unsafe setup".format(exe)))

    print("Unlocking keychain ... ", end='')
    res_unlock = S.unlock_keychain(keydb, keypass)
    if res_unlock:
        print(OK('OK'))
    else:
        print(ERROR('FAILED'))
        log.debug(res_unlock)
        print(INFO("Failed to unlock keychain."),
              "Run: {} check".format(exe))
        return 1

    print("Importing new identity {} ... ".format(identity), end='')
    res_import = S.import_identity(keydb, keypass, identity, id_file, id_pass)
    if res_import:
        print(OK('OK'))
        log.debug(res_import.value)
    else:
        print(ERROR('FAILED'))
        log.debug(res_import)
        print(ERROR('ERROR'), res_import.value)
        if 'exists' in res_import.value:
            print(WARN("To remove existing identity:"),
                  "{} remove".format(exe))
        return 2

    print(WARN("This will test codesigning with the configured identity"))
    print(WARN("Please authenticate (if requested) and click 'Always Allow'"))

    print("Trusting identity ... ", end='')
    res_trust = S.trust_identity(identity, keydb)
    if res_trust:
        print(OK('OK'))
    else:
        print(ERROR('FAILED'))
        log.debug(res_trust)
        print(INFO("Trust unsuccessful:"), res_trust.value)
        if 'unknown error' in res_trust.value:
            print(WARN("Please ensure this step is performed"
                       " from the system console!"))

        # Do not leave an untrusted identity behind in the keychain.
        print("Rolling back imported identity ... ", end='')
        res_remove = S.delete_identity(identity, keydb)
        if res_remove:
            print(OK('OK'))
        else:
            print(ERROR('FAILED'))
            log.debug(res_remove)
            # Report why the rollback failed; the trust failure itself was
            # already printed above.
            print(res_remove.value)
            return 4
        return 3

    return 0
| 255 | |
| 256 | |
def cmd_lint():  # type: () -> int
    """Run the linter and log each reported problem as a warning.

    Returns:
        The number of lint problems reported (0 when clean).
    """
    print(OK('Running linters... '))
    lint_problems = _run_linter()
    if lint_problems:
        print(WARN('Lint:'), len(lint_problems))
        # An explicit loop is required: on Python 3 map() is lazy, so
        # map(log.warn, ...) would never call the logger.
        for problem in lint_problems:
            log.warn(problem)

    return len(lint_problems)
| 265 | |
| 266 | |
def cmd_prep():  # type: () -> int
    """Unlock the signing keychain; intentionally quiet for CI use.

    Returns:
        0 if the keychain was unlocked, 1 otherwise.
    """
    unlocked = S.unlock_keychain(CFG['keydb'], CFG['keypass'])
    if unlocked:
        return 0

    log.debug(unlocked.value)
    print(ERROR('ERROR'), 'Unable to access signing identity')
    return 1
| 279 | |
| 280 | |
def cmd_remove():  # type: () -> int
    """Unlock the keychain and delete the signing identity from it.

    Both steps are best-effort: failures are reported and the function
    carries on.

    Returns:
        Always 0.
    """
    db_path = CFG['keydb']
    db_pass = CFG['keypass']
    ident = CFG['identity']

    print("Unlocking keychain ... ", end='')
    unlocked = S.unlock_keychain(db_path, db_pass)
    if not unlocked:
        log.debug(unlocked.value)
        print(ERROR('ERROR'), 'Failed to unlock keychain')
    else:
        print(OK('OK'))

    print("Removing identity from keychain ... ", end='')
    removed = S.delete_identity(ident, db_path)
    if not removed:
        print(WARN('FAILED'))
        log.debug(removed)
        print(WARN('WARNING'), "Failed to delete identity from keychain.")
        print(INFO(removed.value))
    else:
        print(OK('OK'))

    return 0
| 305 | |
| 306 | |
def cmd_setup():  # type: () -> int
    """Create and unlock the keychain, register it, and set privileges.

    Returns:
        0 on success (including when privileges are left unset because the
        unsafe flag is absent), 1 if keychain creation failed, 2 if the
        keychain could not be unlocked, 3 if it could not be added to the
        search list, 4 if writing a privilege failed.
    """
    keydb = CFG['keydb']
    keypass = CFG['keypass']
    exe = CFG['executable']

    print("Configuring keychain ... ", end='')
    res_create = S.create_keychain(keydb, keypass)
    if res_create:
        print(OK('OK'))
    else:
        print(ERROR('FAILED'))
        log.debug(res_create)
        print(INFO('Keychain creation failed'))
        return 1

    print("Unlocking keychain ... ", end='')
    res_unlock = S.unlock_keychain(keydb, keypass)
    if res_unlock:
        print(OK('OK'))
    else:
        print(ERROR('FAILED'))
        log.debug(res_unlock)
        # A missing keychain at this point means the create step above did
        # not actually produce one, so report it as a creation failure.
        if 'keychain could not be found' in res_unlock.value:
            print(INFO("Keychain creation failed"))
        else:
            print(INFO("Failed to unlock keychain"))
        print(INFO(res_unlock.value))
        return 2

    print("Adding keychain to search list ... ", end='')
    res_searchable = S.add_to_search_list(keydb)
    if res_searchable:
        print(OK('OK'))
    else:
        print(ERROR('FAILED'))
        log.debug(res_searchable)
        print(INFO("Failed to add keychain to search list"))
        print(WARN("codesign will not be able to find the signing identity."))
        return 3

    privs = CFG['privileges']
    print("Checking privileges ... ", end='')
    if S.verify_privileges(privs):
        print(OK('OK'))
    else:
        print(INFO('NOT SET'))

        # NOTE(review): the sudo/privilege section is taken to belong to
        # this else branch (only run when privileges are not yet set) --
        # confirm against the original indentation.
        _auth_sudo()
        # Privileges are only written when the environment variable named
        # by S.UNSAFE_FLAG is set to a non-empty value.
        if not os.getenv(S.UNSAFE_FLAG, False):
            print(INFO('NOTE'), 'Altering privileges may not be safe.')
            print(INFO('NOTE'), 'Re-run with the --unsafe flag to enable.')
        else:
            priv_value = 'allow'
            for priv in CFG['privileges']:
                print('Setting privilege {} ... '.format(priv), end='')
                res_priv = S.authdb_privilege_write(priv, priv_value)
                if res_priv:
                    print(OK('OK'))
                else:
                    # Stop at the first privilege that cannot be written.
                    print(INFO('not set'))
                    log.debug(res_priv.value)
                    print(INFO('Privileges have not been set.'))
                    print(INFO('Please re-run: {} setup'.format(exe)))
                    return 4

    return 0
| 373 | |
| 374 | |
def cmd_test():  # type: () -> int
    """Run the unit tests and log every failing or erroring test.

    Returns:
        The number of tests that failed or errored (0 when all pass).
    """
    _auth_sudo()

    print(OK('Running unittests... '))
    test_problems = _run_unittests()
    if test_problems:
        print(ERROR('Failures:'), len(test_problems))
        # An explicit loop is required: on Python 3 map() is lazy, so
        # map(log.debug, ...) would never call the logger.
        for problem in test_problems:
            log.debug(problem)

    return len(test_problems)
| 385 | |
| 386 | |
def _auth_sudo():  # type: () -> Result
    """Make sure sudo is usable, asking the user to authenticate if needed.

    First calls ``sh.sudo_run(['-n'])``; if that result is truthy it is
    returned as is.  Otherwise the user is told to authenticate and
    ``sh.sudo_run(['ls'])`` is run, with a warning printed if that fails.

    Returns:
        The result of the last ``sh.sudo_run`` call made.
    """
    probe = sh.sudo_run(['-n'])
    if probe:
        return probe

    print(WARN("If prompted, authenticate with sudo ... "))
    prompted = sh.sudo_run(['ls'])
    if not prompted:
        print(WARN("WARNING"), "sudo authentication failed")
    return prompted
| 397 | |
| 398 | |
def _run_linter():  # type: () -> list(str)
    """Run flake8 over the project paths and return its stdout as lines.

    Returns:
        One string per line that flake8 wrote to stdout.
    """
    # NOTE(review): flake8's --tee looks like a boolean flag, in which case
    # the report filename below is parsed as a path to lint rather than as
    # the output file -- confirm whether --output-file was intended.
    targets = ['./debugsign', './dbsign/', './unittests/']
    line_format = 'lint: %(path)s:%(row)d:%(col)d: %(code)s %(text)s'

    argv = ['flake8', '--tee', 'flake8_report.pep8.txt',
            '--format=' + line_format]
    argv.extend(targets)

    flake_run = sh.run(argv)
    return flake_run.stdout.splitlines()
| 407 | |
| 408 | |
| 409 | def _run_unittests(): # type: () -> list(str) |
| 410 | try: |
| 411 | import unittest2 as unittest |
| 412 | except ImportError: |
| 413 | import unittest |
| 414 | |
| 415 | tests = unittest.TestLoader().discover('unittests') |
| 416 | test_result = unittest.TextTestRunner( |
| 417 | stream=sys.stdout, |
| 418 | verbosity=2, |
| 419 | ).run(tests) |
| 420 | |
| 421 | problems = test_result.errors + test_result.failures |
| 422 | return [str(problem[0]) for problem in problems] |