diff options
Diffstat (limited to 'python/qemu/aqmp')
-rw-r--r-- | python/qemu/aqmp/__init__.py | 59 | ||||
-rw-r--r-- | python/qemu/aqmp/aqmp_tui.py | 652 | ||||
-rw-r--r-- | python/qemu/aqmp/error.py | 50 | ||||
-rw-r--r-- | python/qemu/aqmp/events.py | 706 | ||||
-rw-r--r-- | python/qemu/aqmp/message.py | 209 | ||||
-rw-r--r-- | python/qemu/aqmp/models.py | 133 | ||||
-rw-r--r-- | python/qemu/aqmp/protocol.py | 902 | ||||
-rw-r--r-- | python/qemu/aqmp/py.typed | 0 | ||||
-rw-r--r-- | python/qemu/aqmp/qmp_client.py | 621 | ||||
-rw-r--r-- | python/qemu/aqmp/util.py | 217 |
10 files changed, 0 insertions, 3549 deletions
diff --git a/python/qemu/aqmp/__init__.py b/python/qemu/aqmp/__init__.py deleted file mode 100644 index ab1782999c..0000000000 --- a/python/qemu/aqmp/__init__.py +++ /dev/null @@ -1,59 +0,0 @@ -""" -QEMU Monitor Protocol (QMP) development library & tooling. - -This package provides a fairly low-level class for communicating -asynchronously with QMP protocol servers, as implemented by QEMU, the -QEMU Guest Agent, and the QEMU Storage Daemon. - -`QMPClient` provides the main functionality of this package. All errors -raised by this library dervive from `AQMPError`, see `aqmp.error` for -additional detail. See `aqmp.events` for an in-depth tutorial on -managing QMP events. -""" - -# Copyright (C) 2020, 2021 John Snow for Red Hat, Inc. -# -# Authors: -# John Snow <jsnow@redhat.com> -# -# Based on earlier work by Luiz Capitulino <lcapitulino@redhat.com>. -# -# This work is licensed under the terms of the GNU GPL, version 2. See -# the COPYING file in the top-level directory. - -import warnings - -from .error import AQMPError -from .events import EventListener -from .message import Message -from .protocol import ConnectError, Runstate, StateError -from .qmp_client import ExecInterruptedError, ExecuteError, QMPClient - - -_WMSG = """ - -The Asynchronous QMP library is currently in development and its API -should be considered highly fluid and subject to change. It should -not be used by any other scripts checked into the QEMU tree. - -Proceed with caution! -""" - -warnings.warn(_WMSG, FutureWarning) - - -# The order of these fields impact the Sphinx documentation order. -__all__ = ( - # Classes, most to least important - 'QMPClient', - 'Message', - 'EventListener', - 'Runstate', - - # Exceptions, most generic to most explicit - 'AQMPError', - 'StateError', - 'ConnectError', - 'ExecuteError', - 'ExecInterruptedError', -) diff --git a/python/qemu/aqmp/aqmp_tui.py b/python/qemu/aqmp/aqmp_tui.py deleted file mode 100644 index a2929f771c..0000000000 --- a/python/qemu/aqmp/aqmp_tui.py +++ /dev/null @@ -1,652 +0,0 @@ -# Copyright (c) 2021 -# -# Authors: -# Niteesh Babu G S <niteesh.gs@gmail.com> -# -# This work is licensed under the terms of the GNU GPL, version 2 or -# later. See the COPYING file in the top-level directory. -""" -AQMP TUI - -AQMP TUI is an asynchronous interface built on top the of the AQMP library. -It is the successor of QMP-shell and is bought-in as a replacement for it. - -Example Usage: aqmp-tui <SOCKET | TCP IP:PORT> -Full Usage: aqmp-tui --help -""" - -import argparse -import asyncio -import json -import logging -from logging import Handler, LogRecord -import signal -from typing import ( - List, - Optional, - Tuple, - Type, - Union, - cast, -) - -from pygments import lexers -from pygments import token as Token -import urwid -import urwid_readline - -from ..qmp import QEMUMonitorProtocol, QMPBadPortError -from .error import ProtocolError -from .message import DeserializationError, Message, UnexpectedTypeError -from .protocol import ConnectError, Runstate -from .qmp_client import ExecInterruptedError, QMPClient -from .util import create_task, pretty_traceback - - -# The name of the signal that is used to update the history list -UPDATE_MSG: str = 'UPDATE_MSG' - - -palette = [ - (Token.Punctuation, '', '', '', 'h15,bold', 'g7'), - (Token.Text, '', '', '', '', 'g7'), - (Token.Name.Tag, '', '', '', 'bold,#f88', 'g7'), - (Token.Literal.Number.Integer, '', '', '', '#fa0', 'g7'), - (Token.Literal.String.Double, '', '', '', '#6f6', 'g7'), - (Token.Keyword.Constant, '', '', '', '#6af', 'g7'), - ('DEBUG', '', '', '', '#ddf', 'g7'), - ('INFO', '', '', '', 'g100', 'g7'), - ('WARNING', '', '', '', '#ff6', 'g7'), - ('ERROR', '', '', '', '#a00', 'g7'), - ('CRITICAL', '', '', '', '#a00', 'g7'), - ('background', '', 'black', '', '', 'g7'), -] - - -def format_json(msg: str) -> str: - """ - Formats valid/invalid multi-line JSON message into a single-line message. - - Formatting is first tried using the standard json module. If that fails - due to an decoding error then a simple string manipulation is done to - achieve a single line JSON string. - - Converting into single line is more asthetically pleasing when looking - along with error messages. - - Eg: - Input: - [ 1, - true, - 3 ] - The above input is not a valid QMP message and produces the following error - "QMP message is not a JSON object." - When displaying this in TUI in multiline mode we get - - [ 1, - true, - 3 ]: QMP message is not a JSON object. - - whereas in singleline mode we get the following - - [1, true, 3]: QMP message is not a JSON object. - - The single line mode is more asthetically pleasing. - - :param msg: - The message to formatted into single line. - - :return: Formatted singleline message. - """ - try: - msg = json.loads(msg) - return str(json.dumps(msg)) - except json.decoder.JSONDecodeError: - msg = msg.replace('\n', '') - words = msg.split(' ') - words = list(filter(None, words)) - return ' '.join(words) - - -def has_handler_type(logger: logging.Logger, - handler_type: Type[Handler]) -> bool: - """ - The Logger class has no interface to check if a certain type of handler is - installed or not. So we provide an interface to do so. - - :param logger: - Logger object - :param handler_type: - The type of the handler to be checked. - - :return: returns True if handler of type `handler_type`. - """ - for handler in logger.handlers: - if isinstance(handler, handler_type): - return True - return False - - -class App(QMPClient): - """ - Implements the AQMP TUI. - - Initializes the widgets and starts the urwid event loop. - - :param address: - Address of the server to connect to. - :param num_retries: - The number of times to retry before stopping to reconnect. - :param retry_delay: - The delay(sec) before each retry - """ - def __init__(self, address: Union[str, Tuple[str, int]], num_retries: int, - retry_delay: Optional[int]) -> None: - urwid.register_signal(type(self), UPDATE_MSG) - self.window = Window(self) - self.address = address - self.aloop: Optional[asyncio.AbstractEventLoop] = None - self.num_retries = num_retries - self.retry_delay = retry_delay if retry_delay else 2 - self.retry: bool = False - self.exiting: bool = False - super().__init__() - - def add_to_history(self, msg: str, level: Optional[str] = None) -> None: - """ - Appends the msg to the history list. - - :param msg: - The raw message to be appended in string type. - """ - urwid.emit_signal(self, UPDATE_MSG, msg, level) - - def _cb_outbound(self, msg: Message) -> Message: - """ - Callback: outbound message hook. - - Appends the outgoing messages to the history box. - - :param msg: raw outbound message. - :return: final outbound message. - """ - str_msg = str(msg) - - if not has_handler_type(logging.getLogger(), TUILogHandler): - logging.debug('Request: %s', str_msg) - self.add_to_history('<-- ' + str_msg) - return msg - - def _cb_inbound(self, msg: Message) -> Message: - """ - Callback: outbound message hook. - - Appends the incoming messages to the history box. - - :param msg: raw inbound message. - :return: final inbound message. - """ - str_msg = str(msg) - - if not has_handler_type(logging.getLogger(), TUILogHandler): - logging.debug('Request: %s', str_msg) - self.add_to_history('--> ' + str_msg) - return msg - - async def _send_to_server(self, msg: Message) -> None: - """ - This coroutine sends the message to the server. - The message has to be pre-validated. - - :param msg: - Pre-validated message to be to sent to the server. - - :raise Exception: When an unhandled exception is caught. - """ - try: - await self._raw(msg, assign_id='id' not in msg) - except ExecInterruptedError as err: - logging.info('Error server disconnected before reply %s', str(err)) - self.add_to_history('Server disconnected before reply', 'ERROR') - except Exception as err: - logging.error('Exception from _send_to_server: %s', str(err)) - raise err - - def cb_send_to_server(self, raw_msg: str) -> None: - """ - Validates and sends the message to the server. - The raw string message is first converted into a Message object - and is then sent to the server. - - :param raw_msg: - The raw string message to be sent to the server. - - :raise Exception: When an unhandled exception is caught. - """ - try: - msg = Message(bytes(raw_msg, encoding='utf-8')) - create_task(self._send_to_server(msg)) - except (DeserializationError, UnexpectedTypeError) as err: - raw_msg = format_json(raw_msg) - logging.info('Invalid message: %s', err.error_message) - self.add_to_history(f'{raw_msg}: {err.error_message}', 'ERROR') - - def unhandled_input(self, key: str) -> None: - """ - Handle's keys which haven't been handled by the child widgets. - - :param key: - Unhandled key - """ - if key == 'esc': - self.kill_app() - - def kill_app(self) -> None: - """ - Initiates killing of app. A bridge between asynchronous and synchronous - code. - """ - create_task(self._kill_app()) - - async def _kill_app(self) -> None: - """ - This coroutine initiates the actual disconnect process and calls - urwid.ExitMainLoop() to kill the TUI. - - :raise Exception: When an unhandled exception is caught. - """ - self.exiting = True - await self.disconnect() - logging.debug('Disconnect finished. Exiting app') - raise urwid.ExitMainLoop() - - async def disconnect(self) -> None: - """ - Overrides the disconnect method to handle the errors locally. - """ - try: - await super().disconnect() - except (OSError, EOFError) as err: - logging.info('disconnect: %s', str(err)) - self.retry = True - except ProtocolError as err: - logging.info('disconnect: %s', str(err)) - except Exception as err: - logging.error('disconnect: Unhandled exception %s', str(err)) - raise err - - def _set_status(self, msg: str) -> None: - """ - Sets the message as the status. - - :param msg: - The message to be displayed in the status bar. - """ - self.window.footer.set_text(msg) - - def _get_formatted_address(self) -> str: - """ - Returns a formatted version of the server's address. - - :return: formatted address - """ - if isinstance(self.address, tuple): - host, port = self.address - addr = f'{host}:{port}' - else: - addr = f'{self.address}' - return addr - - async def _initiate_connection(self) -> Optional[ConnectError]: - """ - Tries connecting to a server a number of times with a delay between - each try. If all retries failed then return the error faced during - the last retry. - - :return: Error faced during last retry. - """ - current_retries = 0 - err = None - - # initial try - await self.connect_server() - while self.retry and current_retries < self.num_retries: - logging.info('Connection Failed, retrying in %d', self.retry_delay) - status = f'[Retry #{current_retries} ({self.retry_delay}s)]' - self._set_status(status) - - await asyncio.sleep(self.retry_delay) - - err = await self.connect_server() - current_retries += 1 - # If all retries failed report the last error - if err: - logging.info('All retries failed: %s', err) - return err - return None - - async def manage_connection(self) -> None: - """ - Manage the connection based on the current run state. - - A reconnect is issued when the current state is IDLE and the number - of retries is not exhausted. - A disconnect is issued when the current state is DISCONNECTING. - """ - while not self.exiting: - if self.runstate == Runstate.IDLE: - err = await self._initiate_connection() - # If retry is still true then, we have exhausted all our tries. - if err: - self._set_status(f'[Error: {err.error_message}]') - else: - addr = self._get_formatted_address() - self._set_status(f'[Connected {addr}]') - elif self.runstate == Runstate.DISCONNECTING: - self._set_status('[Disconnected]') - await self.disconnect() - # check if a retry is needed - if self.runstate == Runstate.IDLE: - continue - await self.runstate_changed() - - async def connect_server(self) -> Optional[ConnectError]: - """ - Initiates a connection to the server at address `self.address` - and in case of a failure, sets the status to the respective error. - """ - try: - await self.connect(self.address) - self.retry = False - except ConnectError as err: - logging.info('connect_server: ConnectError %s', str(err)) - self.retry = True - return err - return None - - def run(self, debug: bool = False) -> None: - """ - Starts the long running co-routines and the urwid event loop. - - :param debug: - Enables/Disables asyncio event loop debugging - """ - screen = urwid.raw_display.Screen() - screen.set_terminal_properties(256) - - self.aloop = asyncio.get_event_loop() - self.aloop.set_debug(debug) - - # Gracefully handle SIGTERM and SIGINT signals - cancel_signals = [signal.SIGTERM, signal.SIGINT] - for sig in cancel_signals: - self.aloop.add_signal_handler(sig, self.kill_app) - - event_loop = urwid.AsyncioEventLoop(loop=self.aloop) - main_loop = urwid.MainLoop(urwid.AttrMap(self.window, 'background'), - unhandled_input=self.unhandled_input, - screen=screen, - palette=palette, - handle_mouse=True, - event_loop=event_loop) - - create_task(self.manage_connection(), self.aloop) - try: - main_loop.run() - except Exception as err: - logging.error('%s\n%s\n', str(err), pretty_traceback()) - raise err - - -class StatusBar(urwid.Text): - """ - A simple statusbar modelled using the Text widget. The status can be - set using the set_text function. All text set is aligned to right. - - :param text: Initial text to be displayed. Default is empty str. - """ - def __init__(self, text: str = ''): - super().__init__(text, align='right') - - -class Editor(urwid_readline.ReadlineEdit): - """ - A simple editor modelled using the urwid_readline.ReadlineEdit widget. - Mimcs GNU readline shortcuts and provides history support. - - The readline shortcuts can be found below: - https://github.com/rr-/urwid_readline#features - - Along with the readline features, this editor also has support for - history. Pressing the 'up'/'down' switches between the prev/next messages - available in the history. - - Currently there is no support to save the history to a file. The history of - previous commands is lost on exit. - - :param parent: Reference to the TUI object. - """ - def __init__(self, parent: App) -> None: - super().__init__(caption='> ', multiline=True) - self.parent = parent - self.history: List[str] = [] - self.last_index: int = -1 - self.show_history: bool = False - - def keypress(self, size: Tuple[int, int], key: str) -> Optional[str]: - """ - Handles the keypress on this widget. - - :param size: - The current size of the widget. - :param key: - The key to be handled. - - :return: Unhandled key if any. - """ - msg = self.get_edit_text() - if key == 'up' and not msg: - # Show the history when 'up arrow' is pressed with no input text. - # NOTE: The show_history logic is necessary because in 'multiline' - # mode (which we use) 'up arrow' is used to move between lines. - if not self.history: - return None - self.show_history = True - last_msg = self.history[self.last_index] - self.set_edit_text(last_msg) - self.edit_pos = len(last_msg) - elif key == 'up' and self.show_history: - self.last_index = max(self.last_index - 1, -len(self.history)) - self.set_edit_text(self.history[self.last_index]) - self.edit_pos = len(self.history[self.last_index]) - elif key == 'down' and self.show_history: - if self.last_index == -1: - self.set_edit_text('') - self.show_history = False - else: - self.last_index += 1 - self.set_edit_text(self.history[self.last_index]) - self.edit_pos = len(self.history[self.last_index]) - elif key == 'meta enter': - # When using multiline, enter inserts a new line into the editor - # send the input to the server on alt + enter - self.parent.cb_send_to_server(msg) - self.history.append(msg) - self.set_edit_text('') - self.last_index = -1 - self.show_history = False - else: - self.show_history = False - self.last_index = -1 - return cast(Optional[str], super().keypress(size, key)) - return None - - -class EditorWidget(urwid.Filler): - """ - Wrapper around the editor widget. - - The Editor is a flow widget and has to wrapped inside a box widget. - This class wraps the Editor inside filler widget. - - :param parent: Reference to the TUI object. - """ - def __init__(self, parent: App) -> None: - super().__init__(Editor(parent), valign='top') - - -class HistoryBox(urwid.ListBox): - """ - This widget is modelled using the ListBox widget, contains the list of - all messages both QMP messages and log messsages to be shown in the TUI. - - The messages are urwid.Text widgets. On every append of a message, the - focus is shifted to the last appended message. - - :param parent: Reference to the TUI object. - """ - def __init__(self, parent: App) -> None: - self.parent = parent - self.history = urwid.SimpleFocusListWalker([]) - super().__init__(self.history) - - def add_to_history(self, - history: Union[str, List[Tuple[str, str]]]) -> None: - """ - Appends a message to the list and set the focus to the last appended - message. - - :param history: - The history item(message/event) to be appended to the list. - """ - self.history.append(urwid.Text(history)) - self.history.set_focus(len(self.history) - 1) - - def mouse_event(self, size: Tuple[int, int], _event: str, button: float, - _x: int, _y: int, focus: bool) -> None: - # Unfortunately there are no urwid constants that represent the mouse - # events. - if button == 4: # Scroll up event - super().keypress(size, 'up') - elif button == 5: # Scroll down event - super().keypress(size, 'down') - - -class HistoryWindow(urwid.Frame): - """ - This window composes the HistoryBox and EditorWidget in a horizontal split. - By default the first focus is given to the history box. - - :param parent: Reference to the TUI object. - """ - def __init__(self, parent: App) -> None: - self.parent = parent - self.editor_widget = EditorWidget(parent) - self.editor = urwid.LineBox(self.editor_widget) - self.history = HistoryBox(parent) - self.body = urwid.Pile([('weight', 80, self.history), - ('weight', 20, self.editor)]) - super().__init__(self.body) - urwid.connect_signal(self.parent, UPDATE_MSG, self.cb_add_to_history) - - def cb_add_to_history(self, msg: str, level: Optional[str] = None) -> None: - """ - Appends a message to the history box - - :param msg: - The message to be appended to the history box. - :param level: - The log level of the message, if it is a log message. - """ - formatted = [] - if level: - msg = f'[{level}]: {msg}' - formatted.append((level, msg)) - else: - lexer = lexers.JsonLexer() # pylint: disable=no-member - for token in lexer.get_tokens(msg): - formatted.append(token) - self.history.add_to_history(formatted) - - -class Window(urwid.Frame): - """ - This window is the top most widget of the TUI and will contain other - windows. Each child of this widget is responsible for displaying a specific - functionality. - - :param parent: Reference to the TUI object. - """ - def __init__(self, parent: App) -> None: - self.parent = parent - footer = StatusBar() - body = HistoryWindow(parent) - super().__init__(body, footer=footer) - - -class TUILogHandler(Handler): - """ - This handler routes all the log messages to the TUI screen. - It is installed to the root logger to so that the log message from all - libraries begin used is routed to the screen. - - :param tui: Reference to the TUI object. - """ - def __init__(self, tui: App) -> None: - super().__init__() - self.tui = tui - - def emit(self, record: LogRecord) -> None: - """ - Emits a record to the TUI screen. - - Appends the log message to the TUI screen - """ - level = record.levelname - msg = record.getMessage() - self.tui.add_to_history(msg, level) - - -def main() -> None: - """ - Driver of the whole script, parses arguments, initialize the TUI and - the logger. - """ - parser = argparse.ArgumentParser(description='AQMP TUI') - parser.add_argument('qmp_server', help='Address of the QMP server. ' - 'Format <UNIX socket path | TCP addr:port>') - parser.add_argument('--num-retries', type=int, default=10, - help='Number of times to reconnect before giving up.') - parser.add_argument('--retry-delay', type=int, - help='Time(s) to wait before next retry. ' - 'Default action is to wait 2s between each retry.') - parser.add_argument('--log-file', help='The Log file name') - parser.add_argument('--log-level', default='WARNING', - help='Log level <CRITICAL|ERROR|WARNING|INFO|DEBUG|>') - parser.add_argument('--asyncio-debug', action='store_true', - help='Enable debug mode for asyncio loop. ' - 'Generates lot of output, makes TUI unusable when ' - 'logs are logged in the TUI. ' - 'Use only when logging to a file.') - args = parser.parse_args() - - try: - address = QEMUMonitorProtocol.parse_address(args.qmp_server) - except QMPBadPortError as err: - parser.error(str(err)) - - app = App(address, args.num_retries, args.retry_delay) - - root_logger = logging.getLogger() - root_logger.setLevel(logging.getLevelName(args.log_level)) - - if args.log_file: - root_logger.addHandler(logging.FileHandler(args.log_file)) - else: - root_logger.addHandler(TUILogHandler(app)) - - app.run(args.asyncio_debug) - - -if __name__ == '__main__': - main() diff --git a/python/qemu/aqmp/error.py b/python/qemu/aqmp/error.py deleted file mode 100644 index 781f49b008..0000000000 --- a/python/qemu/aqmp/error.py +++ /dev/null @@ -1,50 +0,0 @@ -""" -AQMP Error Classes - -This package seeks to provide semantic error classes that are intended -to be used directly by clients when they would like to handle particular -semantic failures (e.g. "failed to connect") without needing to know the -enumeration of possible reasons for that failure. - -AQMPError serves as the ancestor for all exceptions raised by this -package, and is suitable for use in handling semantic errors from this -library. In most cases, individual public methods will attempt to catch -and re-encapsulate various exceptions to provide a semantic -error-handling interface. - -.. admonition:: AQMP Exception Hierarchy Reference - - | `Exception` - | +-- `AQMPError` - | +-- `ConnectError` - | +-- `StateError` - | +-- `ExecInterruptedError` - | +-- `ExecuteError` - | +-- `ListenerError` - | +-- `ProtocolError` - | +-- `DeserializationError` - | +-- `UnexpectedTypeError` - | +-- `ServerParseError` - | +-- `BadReplyError` - | +-- `GreetingError` - | +-- `NegotiationError` -""" - - -class AQMPError(Exception): - """Abstract error class for all errors originating from this package.""" - - -class ProtocolError(AQMPError): - """ - Abstract error class for protocol failures. - - Semantically, these errors are generally the fault of either the - protocol server or as a result of a bug in this library. - - :param error_message: Human-readable string describing the error. - """ - def __init__(self, error_message: str): - super().__init__(error_message) - #: Human-readable error message, without any prefix. - self.error_message: str = error_message diff --git a/python/qemu/aqmp/events.py b/python/qemu/aqmp/events.py deleted file mode 100644 index fb81d21610..0000000000 --- a/python/qemu/aqmp/events.py +++ /dev/null @@ -1,706 +0,0 @@ -""" -AQMP Events and EventListeners - -Asynchronous QMP uses `EventListener` objects to listen for events. An -`EventListener` is a FIFO event queue that can be pre-filtered to listen -for only specific events. Each `EventListener` instance receives its own -copy of events that it hears, so events may be consumed without fear or -worry for depriving other listeners of events they need to hear. - - -EventListener Tutorial ----------------------- - -In all of the following examples, we assume that we have a `QMPClient` -instantiated named ``qmp`` that is already connected. - - -`listener()` context blocks with one name -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - -The most basic usage is by using the `listener()` context manager to -construct them: - -.. code:: python - - with qmp.listener('STOP') as listener: - await qmp.execute('stop') - await listener.get() - -The listener is active only for the duration of the ‘with’ block. This -instance listens only for ‘STOP’ events. - - -`listener()` context blocks with two or more names -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - -Multiple events can be selected for by providing any ``Iterable[str]``: - -.. code:: python - - with qmp.listener(('STOP', 'RESUME')) as listener: - await qmp.execute('stop') - event = await listener.get() - assert event['event'] == 'STOP' - - await qmp.execute('cont') - event = await listener.get() - assert event['event'] == 'RESUME' - - -`listener()` context blocks with no names -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - -By omitting names entirely, you can listen to ALL events. - -.. code:: python - - with qmp.listener() as listener: - await qmp.execute('stop') - event = await listener.get() - assert event['event'] == 'STOP' - -This isn’t a very good use case for this feature: In a non-trivial -running system, we may not know what event will arrive next. Grabbing -the top of a FIFO queue returning multiple kinds of events may be prone -to error. - - -Using async iterators to retrieve events -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - -If you’d like to simply watch what events happen to arrive, you can use -the listener as an async iterator: - -.. code:: python - - with qmp.listener() as listener: - async for event in listener: - print(f"Event arrived: {event['event']}") - -This is analogous to the following code: - -.. code:: python - - with qmp.listener() as listener: - while True: - event = listener.get() - print(f"Event arrived: {event['event']}") - -This event stream will never end, so these blocks will never terminate. - - -Using asyncio.Task to concurrently retrieve events -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - -Since a listener’s event stream will never terminate, it is not likely -useful to use that form in a script. For longer-running clients, we can -create event handlers by using `asyncio.Task` to create concurrent -coroutines: - -.. code:: python - - async def print_events(listener): - try: - async for event in listener: - print(f"Event arrived: {event['event']}") - except asyncio.CancelledError: - return - - with qmp.listener() as listener: - task = asyncio.Task(print_events(listener)) - await qmp.execute('stop') - await qmp.execute('cont') - task.cancel() - await task - -However, there is no guarantee that these events will be received by the -time we leave this context block. Once the context block is exited, the -listener will cease to hear any new events, and becomes inert. - -Be mindful of the timing: the above example will *probably*– but does -not *guarantee*– that both STOP/RESUMED events will be printed. The -example below outlines how to use listeners outside of a context block. - - -Using `register_listener()` and `remove_listener()` -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - -To create a listener with a longer lifetime, beyond the scope of a -single block, create a listener and then call `register_listener()`: - -.. code:: python - - class MyClient: - def __init__(self, qmp): - self.qmp = qmp - self.listener = EventListener() - - async def print_events(self): - try: - async for event in self.listener: - print(f"Event arrived: {event['event']}") - except asyncio.CancelledError: - return - - async def run(self): - self.task = asyncio.Task(self.print_events) - self.qmp.register_listener(self.listener) - await qmp.execute('stop') - await qmp.execute('cont') - - async def stop(self): - self.task.cancel() - await self.task - self.qmp.remove_listener(self.listener) - -The listener can be deactivated by using `remove_listener()`. When it is -removed, any possible pending events are cleared and it can be -re-registered at a later time. - - -Using the built-in all events listener -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - -The `QMPClient` object creates its own default listener named -:py:obj:`~Events.events` that can be used for the same purpose without -having to create your own: - -.. code:: python - - async def print_events(listener): - try: - async for event in listener: - print(f"Event arrived: {event['event']}") - except asyncio.CancelledError: - return - - task = asyncio.Task(print_events(qmp.events)) - - await qmp.execute('stop') - await qmp.execute('cont') - - task.cancel() - await task - - -Using both .get() and async iterators -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - -The async iterator and `get()` methods pull events from the same FIFO -queue. If you mix the usage of both, be aware: Events are emitted -precisely once per listener. - -If multiple contexts try to pull events from the same listener instance, -events are still emitted only precisely once. - -This restriction can be lifted by creating additional listeners. - - -Creating multiple listeners -~~~~~~~~~~~~~~~~~~~~~~~~~~~ - -Additional `EventListener` objects can be created at-will. Each one -receives its own copy of events, with separate FIFO event queues. - -.. code:: python - - my_listener = EventListener() - qmp.register_listener(my_listener) - - await qmp.execute('stop') - copy1 = await my_listener.get() - copy2 = await qmp.events.get() - - assert copy1 == copy2 - -In this example, we await an event from both a user-created -`EventListener` and the built-in events listener. Both receive the same -event. - - -Clearing listeners -~~~~~~~~~~~~~~~~~~ - -`EventListener` objects can be cleared, clearing all events seen thus far: - -.. code:: python - - await qmp.execute('stop') - qmp.events.clear() - await qmp.execute('cont') - event = await qmp.events.get() - assert event['event'] == 'RESUME' - -`EventListener` objects are FIFO queues. If events are not consumed, -they will remain in the queue until they are witnessed or discarded via -`clear()`. FIFO queues will be drained automatically upon leaving a -context block, or when calling `remove_listener()`. - - -Accessing listener history -~~~~~~~~~~~~~~~~~~~~~~~~~~ - -`EventListener` objects record their history. Even after being cleared, -you can obtain a record of all events seen so far: - -.. code:: python - - await qmp.execute('stop') - await qmp.execute('cont') - qmp.events.clear() - - assert len(qmp.events.history) == 2 - assert qmp.events.history[0]['event'] == 'STOP' - assert qmp.events.history[1]['event'] == 'RESUME' - -The history is updated immediately and does not require the event to be -witnessed first. - - -Using event filters -~~~~~~~~~~~~~~~~~~~ - -`EventListener` objects can be given complex filtering criteria if names -are not sufficient: - -.. code:: python - - def job1_filter(event) -> bool: - event_data = event.get('data', {}) - event_job_id = event_data.get('id') - return event_job_id == "job1" - - with qmp.listener('JOB_STATUS_CHANGE', job1_filter) as listener: - await qmp.execute('blockdev-backup', arguments={'job-id': 'job1', ...}) - async for event in listener: - if event['data']['status'] == 'concluded': - break - -These filters might be most useful when parameterized. `EventListener` -objects expect a function that takes only a single argument (the raw -event, as a `Message`) and returns a bool; True if the event should be -accepted into the stream. You can create a function that adapts this -signature to accept configuration parameters: - -.. code:: python - - def job_filter(job_id: str) -> EventFilter: - def filter(event: Message) -> bool: - return event['data']['id'] == job_id - return filter - - with qmp.listener('JOB_STATUS_CHANGE', job_filter('job2')) as listener: - await qmp.execute('blockdev-backup', arguments={'job-id': 'job2', ...}) - async for event in listener: - if event['data']['status'] == 'concluded': - break - - -Activating an existing listener with `listen()` -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - -Listeners with complex, long configurations can also be created manually -and activated temporarily by using `listen()` instead of `listener()`: - -.. code:: python - - listener = EventListener(('BLOCK_JOB_COMPLETED', 'BLOCK_JOB_CANCELLED', - 'BLOCK_JOB_ERROR', 'BLOCK_JOB_READY', - 'BLOCK_JOB_PENDING', 'JOB_STATUS_CHANGE')) - - with qmp.listen(listener): - await qmp.execute('blockdev-backup', arguments={'job-id': 'job3', ...}) - async for event in listener: - print(event) - if event['event'] == 'BLOCK_JOB_COMPLETED': - break - -Any events that are not witnessed by the time the block is left will be -cleared from the queue; entering the block is an implicit -`register_listener()` and leaving the block is an implicit -`remove_listener()`. - - -Activating multiple existing listeners with `listen()` -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - -While `listener()` is only capable of creating a single listener, -`listen()` is capable of activating multiple listeners simultaneously: - -.. code:: python - - def job_filter(job_id: str) -> EventFilter: - def filter(event: Message) -> bool: - return event['data']['id'] == job_id - return filter - - jobA = EventListener('JOB_STATUS_CHANGE', job_filter('jobA')) - jobB = EventListener('JOB_STATUS_CHANGE', job_filter('jobB')) - - with qmp.listen(jobA, jobB): - qmp.execute('blockdev-create', arguments={'job-id': 'jobA', ...}) - qmp.execute('blockdev-create', arguments={'job-id': 'jobB', ...}) - - async for event in jobA.get(): - if event['data']['status'] == 'concluded': - break - async for event in jobB.get(): - if event['data']['status'] == 'concluded': - break - - -Extending the `EventListener` class -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - -In the case that a more specialized `EventListener` is desired to -provide either more functionality or more compact syntax for specialized -cases, it can be extended. - -One of the key methods to extend or override is -:py:meth:`~EventListener.accept()`. The default implementation checks an -incoming message for: - -1. A qualifying name, if any :py:obj:`~EventListener.names` were - specified at initialization time -2. That :py:obj:`~EventListener.event_filter()` returns True. - -This can be modified however you see fit to change the criteria for -inclusion in the stream. - -For convenience, a ``JobListener`` class could be created that simply -bakes in configuration so it does not need to be repeated: - -.. code:: python - - class JobListener(EventListener): - def __init__(self, job_id: str): - super().__init__(('BLOCK_JOB_COMPLETED', 'BLOCK_JOB_CANCELLED', - 'BLOCK_JOB_ERROR', 'BLOCK_JOB_READY', - 'BLOCK_JOB_PENDING', 'JOB_STATUS_CHANGE')) - self.job_id = job_id - - def accept(self, event) -> bool: - if not super().accept(event): - return False - if event['event'] in ('BLOCK_JOB_PENDING', 'JOB_STATUS_CHANGE'): - return event['data']['id'] == job_id - return event['data']['device'] == job_id - -From here on out, you can conjure up a custom-purpose listener that -listens only for job-related events for a specific job-id easily: - -.. code:: python - - listener = JobListener('job4') - with qmp.listener(listener): - await qmp.execute('blockdev-backup', arguments={'job-id': 'job4', ...}) - async for event in listener: - print(event) - if event['event'] == 'BLOCK_JOB_COMPLETED': - break - - -Experimental Interfaces & Design Issues ---------------------------------------- - -These interfaces are not ones I am sure I will keep or otherwise modify -heavily. - -qmp.listener()’s type signature -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - -`listener()` does not return anything, because it was assumed the caller -already had a handle to the listener. However, for -``qmp.listener(EventListener())`` forms, the caller will not have saved -a handle to the listener. - -Because this function can accept *many* listeners, I found it hard to -accurately type in a way where it could be used in both “one” or “many” -forms conveniently and in a statically type-safe manner. - -Ultimately, I removed the return altogether, but perhaps with more time -I can work out a way to re-add it. - - -API Reference -------------- - -""" - -import asyncio -from contextlib import contextmanager -import logging -from typing import ( - AsyncIterator, - Callable, - Iterable, - Iterator, - List, - Optional, - Set, - Tuple, - Union, -) - -from .error import AQMPError -from .message import Message - - -EventNames = Union[str, Iterable[str], None] -EventFilter = Callable[[Message], bool] - - -class ListenerError(AQMPError): - """ - Generic error class for `EventListener`-related problems. - """ - - -class EventListener: - """ - Selectively listens for events with runtime configurable filtering. - - This class is designed to be directly usable for the most common cases, - but it can be extended to provide more rigorous control. - - :param names: - One or more names of events to listen for. - When not provided, listen for ALL events. - :param event_filter: - An optional event filtering function. - When names are also provided, this acts as a secondary filter. - - When ``names`` and ``event_filter`` are both provided, the names - will be filtered first, and then the filter function will be called - second. The event filter function can assume that the format of the - event is a known format. - """ - def __init__( - self, - names: EventNames = None, - event_filter: Optional[EventFilter] = None, - ): - # Queue of 'heard' events yet to be witnessed by a caller. - self._queue: 'asyncio.Queue[Message]' = asyncio.Queue() - - # Intended as a historical record, NOT a processing queue or backlog. - self._history: List[Message] = [] - - #: Primary event filter, based on one or more event names. - self.names: Set[str] = set() - if isinstance(names, str): - self.names.add(names) - elif names is not None: - self.names.update(names) - - #: Optional, secondary event filter. - self.event_filter: Optional[EventFilter] = event_filter - - @property - def history(self) -> Tuple[Message, ...]: - """ - A read-only history of all events seen so far. - - This represents *every* event, including those not yet witnessed - via `get()` or ``async for``. It persists between `clear()` - calls and is immutable. - """ - return tuple(self._history) - - def accept(self, event: Message) -> bool: - """ - Determine if this listener accepts this event. - - This method determines which events will appear in the stream. - The default implementation simply checks the event against the - list of names and the event_filter to decide if this - `EventListener` accepts a given event. It can be - overridden/extended to provide custom listener behavior. - - User code is not expected to need to invoke this method. - - :param event: The event under consideration. - :return: `True`, if this listener accepts this event. - """ - name_ok = (not self.names) or (event['event'] in self.names) - return name_ok and ( - (not self.event_filter) or self.event_filter(event) - ) - - async def put(self, event: Message) -> None: - """ - Conditionally put a new event into the FIFO queue. - - This method is not designed to be invoked from user code, and it - should not need to be overridden. It is a public interface so - that `QMPClient` has an interface by which it can inform - registered listeners of new events. - - The event will be put into the queue if - :py:meth:`~EventListener.accept()` returns `True`. - - :param event: The new event to put into the FIFO queue. - """ - if not self.accept(event): - return - - self._history.append(event) - await self._queue.put(event) - - async def get(self) -> Message: - """ - Wait for the very next event in this stream. - - If one is already available, return that one. - """ - return await self._queue.get() - - def clear(self) -> None: - """ - Clear this listener of all pending events. - - Called when an `EventListener` is being unregistered, this clears the - pending FIFO queue synchronously. It can be also be used to - manually clear any pending events, if desired. - - .. warning:: - Take care when discarding events. Cleared events will be - silently tossed on the floor. All events that were ever - accepted by this listener are visible in `history()`. - """ - while True: - try: - self._queue.get_nowait() - except asyncio.QueueEmpty: - break - - def __aiter__(self) -> AsyncIterator[Message]: - return self - - async def __anext__(self) -> Message: - """ - Enables the `EventListener` to function as an async iterator. - - It may be used like this: - - .. code:: python - - async for event in listener: - print(event) - - These iterators will never terminate of their own accord; you - must provide break conditions or otherwise prepare to run them - in an `asyncio.Task` that can be cancelled. - """ - return await self.get() - - -class Events: - """ - Events is a mix-in class that adds event functionality to the QMP class. - - It's designed specifically as a mix-in for `QMPClient`, and it - relies upon the class it is being mixed into having a 'logger' - property. - """ - def __init__(self) -> None: - self._listeners: List[EventListener] = [] - - #: Default, all-events `EventListener`. - self.events: EventListener = EventListener() - self.register_listener(self.events) - - # Parent class needs to have a logger - self.logger: logging.Logger - - async def _event_dispatch(self, msg: Message) -> None: - """ - Given a new event, propagate it to all of the active listeners. - - :param msg: The event to propagate. - """ - for listener in self._listeners: - await listener.put(msg) - - def register_listener(self, listener: EventListener) -> None: - """ - Register and activate an `EventListener`. - - :param listener: The listener to activate. - :raise ListenerError: If the given listener is already registered. - """ - if listener in self._listeners: - raise ListenerError("Attempted to re-register existing listener") - self.logger.debug("Registering %s.", str(listener)) - self._listeners.append(listener) - - def remove_listener(self, listener: EventListener) -> None: - """ - Unregister and deactivate an `EventListener`. - - The removed listener will have its pending events cleared via - `clear()`. The listener can be re-registered later when - desired. - - :param listener: The listener to deactivate. - :raise ListenerError: If the given listener is not registered. - """ - if listener == self.events: - raise ListenerError("Cannot remove the default listener.") - self.logger.debug("Removing %s.", str(listener)) - listener.clear() - self._listeners.remove(listener) - - @contextmanager - def listen(self, *listeners: EventListener) -> Iterator[None]: - r""" - Context manager: Temporarily listen with an `EventListener`. - - Accepts one or more `EventListener` objects and registers them, - activating them for the duration of the context block. - - `EventListener` objects will have any pending events in their - FIFO queue cleared upon exiting the context block, when they are - deactivated. - - :param \*listeners: One or more EventListeners to activate. - :raise ListenerError: If the given listener(s) are already active. - """ - _added = [] - - try: - for listener in listeners: - self.register_listener(listener) - _added.append(listener) - - yield - - finally: - for listener in _added: - self.remove_listener(listener) - - @contextmanager - def listener( - self, - names: EventNames = (), - event_filter: Optional[EventFilter] = None - ) -> Iterator[EventListener]: - """ - Context manager: Temporarily listen with a new `EventListener`. - - Creates an `EventListener` object and registers it, activating - it for the duration of the context block. - - :param names: - One or more names of events to listen for. - When not provided, listen for ALL events. - :param event_filter: - An optional event filtering function. - When names are also provided, this acts as a secondary filter. - - :return: The newly created and active `EventListener`. - """ - listener = EventListener(names, event_filter) - with self.listen(listener): - yield listener diff --git a/python/qemu/aqmp/message.py b/python/qemu/aqmp/message.py deleted file mode 100644 index f76ccc9074..0000000000 --- a/python/qemu/aqmp/message.py +++ /dev/null @@ -1,209 +0,0 @@ -""" -QMP Message Format - -This module provides the `Message` class, which represents a single QMP -message sent to or from the server. -""" - -import json -from json import JSONDecodeError -from typing import ( - Dict, - Iterator, - Mapping, - MutableMapping, - Optional, - Union, -) - -from .error import ProtocolError - - -class Message(MutableMapping[str, object]): - """ - Represents a single QMP protocol message. - - QMP uses JSON objects as its basic communicative unit; so this - Python object is a :py:obj:`~collections.abc.MutableMapping`. It may - be instantiated from either another mapping (like a `dict`), or from - raw `bytes` that still need to be deserialized. - - Once instantiated, it may be treated like any other MutableMapping:: - - >>> msg = Message(b'{"hello": "world"}') - >>> assert msg['hello'] == 'world' - >>> msg['id'] = 'foobar' - >>> print(msg) - { - "hello": "world", - "id": "foobar" - } - - It can be converted to `bytes`:: - - >>> msg = Message({"hello": "world"}) - >>> print(bytes(msg)) - b'{"hello":"world","id":"foobar"}' - - Or back into a garden-variety `dict`:: - - >>> dict(msg) - {'hello': 'world'} - - - :param value: Initial value, if any. - :param eager: - When `True`, attempt to serialize or deserialize the initial value - immediately, so that conversion exceptions are raised during - the call to ``__init__()``. - """ - # pylint: disable=too-many-ancestors - - def __init__(self, - value: Union[bytes, Mapping[str, object]] = b'{}', *, - eager: bool = True): - self._data: Optional[bytes] = None - self._obj: Optional[Dict[str, object]] = None - - if isinstance(value, bytes): - self._data = value - if eager: - self._obj = self._deserialize(self._data) - else: - self._obj = dict(value) - if eager: - self._data = self._serialize(self._obj) - - # Methods necessary to implement the MutableMapping interface, see: - # https://docs.python.org/3/library/collections.abc.html#collections.abc.MutableMapping - - # We get pop, popitem, clear, update, setdefault, __contains__, - # keys, items, values, get, __eq__ and __ne__ for free. - - def __getitem__(self, key: str) -> object: - return self._object[key] - - def __setitem__(self, key: str, value: object) -> None: - self._object[key] = value - self._data = None - - def __delitem__(self, key: str) -> None: - del self._object[key] - self._data = None - - def __iter__(self) -> Iterator[str]: - return iter(self._object) - - def __len__(self) -> int: - return len(self._object) - - # Dunder methods not related to MutableMapping: - - def __repr__(self) -> str: - if self._obj is not None: - return f"Message({self._object!r})" - return f"Message({bytes(self)!r})" - - def __str__(self) -> str: - """Pretty-printed representation of this QMP message.""" - return json.dumps(self._object, indent=2) - - def __bytes__(self) -> bytes: - """bytes representing this QMP message.""" - if self._data is None: - self._data = self._serialize(self._obj or {}) - return self._data - - # Conversion Methods - - @property - def _object(self) -> Dict[str, object]: - """ - A `dict` representing this QMP message. - - Generated on-demand, if required. This property is private - because it returns an object that could be used to invalidate - the internal state of the `Message` object. - """ - if self._obj is None: - self._obj = self._deserialize(self._data or b'{}') - return self._obj - - @classmethod - def _serialize(cls, value: object) -> bytes: - """ - Serialize a JSON object as `bytes`. - - :raise ValueError: When the object cannot be serialized. - :raise TypeError: When the object cannot be serialized. - - :return: `bytes` ready to be sent over the wire. - """ - return json.dumps(value, separators=(',', ':')).encode('utf-8') - - @classmethod - def _deserialize(cls, data: bytes) -> Dict[str, object]: - """ - Deserialize JSON `bytes` into a native Python `dict`. - - :raise DeserializationError: - If JSON deserialization fails for any reason. - :raise UnexpectedTypeError: - If the data does not represent a JSON object. - - :return: A `dict` representing this QMP message. - """ - try: - obj = json.loads(data) - except JSONDecodeError as err: - emsg = "Failed to deserialize QMP message." - raise DeserializationError(emsg, data) from err - if not isinstance(obj, dict): - raise UnexpectedTypeError( - "QMP message is not a JSON object.", - obj - ) - return obj - - -class DeserializationError(ProtocolError): - """ - A QMP message was not understood as JSON. - - When this Exception is raised, ``__cause__`` will be set to the - `json.JSONDecodeError` Exception, which can be interrogated for - further details. - - :param error_message: Human-readable string describing the error. - :param raw: The raw `bytes` that prompted the failure. - """ - def __init__(self, error_message: str, raw: bytes): - super().__init__(error_message) - #: The raw `bytes` that were not understood as JSON. - self.raw: bytes = raw - - def __str__(self) -> str: - return "\n".join([ - super().__str__(), - f" raw bytes were: {str(self.raw)}", - ]) - - -class UnexpectedTypeError(ProtocolError): - """ - A QMP message was JSON, but not a JSON object. - - :param error_message: Human-readable string describing the error. - :param value: The deserialized JSON value that wasn't an object. - """ - def __init__(self, error_message: str, value: object): - super().__init__(error_message) - #: The JSON value that was expected to be an object. - self.value: object = value - - def __str__(self) -> str: - strval = json.dumps(self.value, indent=2) - return "\n".join([ - super().__str__(), - f" json value was: {strval}", - ]) diff --git a/python/qemu/aqmp/models.py b/python/qemu/aqmp/models.py deleted file mode 100644 index 24c94123ac..0000000000 --- a/python/qemu/aqmp/models.py +++ /dev/null @@ -1,133 +0,0 @@ -""" -QMP Data Models - -This module provides simplistic data classes that represent the few -structures that the QMP spec mandates; they are used to verify incoming -data to make sure it conforms to spec. -""" -# pylint: disable=too-few-public-methods - -from collections import abc -from typing import ( - Any, - Mapping, - Optional, - Sequence, -) - - -class Model: - """ - Abstract data model, representing some QMP object of some kind. - - :param raw: The raw object to be validated. - :raise KeyError: If any required fields are absent. - :raise TypeError: If any required fields have the wrong type. - """ - def __init__(self, raw: Mapping[str, Any]): - self._raw = raw - - def _check_key(self, key: str) -> None: - if key not in self._raw: - raise KeyError(f"'{self._name}' object requires '{key}' member") - - def _check_value(self, key: str, type_: type, typestr: str) -> None: - assert key in self._raw - if not isinstance(self._raw[key], type_): - raise TypeError( - f"'{self._name}' member '{key}' must be a {typestr}" - ) - - def _check_member(self, key: str, type_: type, typestr: str) -> None: - self._check_key(key) - self._check_value(key, type_, typestr) - - @property - def _name(self) -> str: - return type(self).__name__ - - def __repr__(self) -> str: - return f"{self._name}({self._raw!r})" - - -class Greeting(Model): - """ - Defined in qmp-spec.txt, section 2.2, "Server Greeting". - - :param raw: The raw Greeting object. - :raise KeyError: If any required fields are absent. - :raise TypeError: If any required fields have the wrong type. - """ - def __init__(self, raw: Mapping[str, Any]): - super().__init__(raw) - #: 'QMP' member - self.QMP: QMPGreeting # pylint: disable=invalid-name - - self._check_member('QMP', abc.Mapping, "JSON object") - self.QMP = QMPGreeting(self._raw['QMP']) - - -class QMPGreeting(Model): - """ - Defined in qmp-spec.txt, section 2.2, "Server Greeting". - - :param raw: The raw QMPGreeting object. - :raise KeyError: If any required fields are absent. - :raise TypeError: If any required fields have the wrong type. - """ - def __init__(self, raw: Mapping[str, Any]): - super().__init__(raw) - #: 'version' member - self.version: Mapping[str, object] - #: 'capabilities' member - self.capabilities: Sequence[object] - - self._check_member('version', abc.Mapping, "JSON object") - self.version = self._raw['version'] - - self._check_member('capabilities', abc.Sequence, "JSON array") - self.capabilities = self._raw['capabilities'] - - -class ErrorResponse(Model): - """ - Defined in qmp-spec.txt, section 2.4.2, "error". - - :param raw: The raw ErrorResponse object. - :raise KeyError: If any required fields are absent. - :raise TypeError: If any required fields have the wrong type. - """ - def __init__(self, raw: Mapping[str, Any]): - super().__init__(raw) - #: 'error' member - self.error: ErrorInfo - #: 'id' member - self.id: Optional[object] = None # pylint: disable=invalid-name - - self._check_member('error', abc.Mapping, "JSON object") - self.error = ErrorInfo(self._raw['error']) - - if 'id' in raw: - self.id = raw['id'] - - -class ErrorInfo(Model): - """ - Defined in qmp-spec.txt, section 2.4.2, "error". - - :param raw: The raw ErrorInfo object. - :raise KeyError: If any required fields are absent. - :raise TypeError: If any required fields have the wrong type. - """ - def __init__(self, raw: Mapping[str, Any]): - super().__init__(raw) - #: 'class' member, with an underscore to avoid conflicts in Python. - self.class_: str - #: 'desc' member - self.desc: str - - self._check_member('class', str, "string") - self.class_ = self._raw['class'] - - self._check_member('desc', str, "string") - self.desc = self._raw['desc'] diff --git a/python/qemu/aqmp/protocol.py b/python/qemu/aqmp/protocol.py deleted file mode 100644 index 32e78749c1..0000000000 --- a/python/qemu/aqmp/protocol.py +++ /dev/null @@ -1,902 +0,0 @@ -""" -Generic Asynchronous Message-based Protocol Support - -This module provides a generic framework for sending and receiving -messages over an asyncio stream. `AsyncProtocol` is an abstract class -that implements the core mechanisms of a simple send/receive protocol, -and is designed to be extended. - -In this package, it is used as the implementation for the `QMPClient` -class. -""" - -import asyncio -from asyncio import StreamReader, StreamWriter -from enum import Enum -from functools import wraps -import logging -from ssl import SSLContext -from typing import ( - Any, - Awaitable, - Callable, - Generic, - List, - Optional, - Tuple, - TypeVar, - Union, - cast, -) - -from .error import AQMPError -from .util import ( - bottom_half, - create_task, - exception_summary, - flush, - is_closing, - pretty_traceback, - upper_half, - wait_closed, -) - - -T = TypeVar('T') -_TaskFN = Callable[[], Awaitable[None]] # aka ``async def func() -> None`` -_FutureT = TypeVar('_FutureT', bound=Optional['asyncio.Future[Any]']) - - -class Runstate(Enum): - """Protocol session runstate.""" - - #: Fully quiesced and disconnected. - IDLE = 0 - #: In the process of connecting or establishing a session. - CONNECTING = 1 - #: Fully connected and active session. - RUNNING = 2 - #: In the process of disconnecting. - #: Runstate may be returned to `IDLE` by calling `disconnect()`. - DISCONNECTING = 3 - - -class ConnectError(AQMPError): - """ - Raised when the initial connection process has failed. - - This Exception always wraps a "root cause" exception that can be - interrogated for additional information. - - :param error_message: Human-readable string describing the error. - :param exc: The root-cause exception. - """ - def __init__(self, error_message: str, exc: Exception): - super().__init__(error_message) - #: Human-readable error string - self.error_message: str = error_message - #: Wrapped root cause exception - self.exc: Exception = exc - - def __str__(self) -> str: - return f"{self.error_message}: {self.exc!s}" - - -class StateError(AQMPError): - """ - An API command (connect, execute, etc) was issued at an inappropriate time. - - This error is raised when a command like - :py:meth:`~AsyncProtocol.connect()` is issued at an inappropriate - time. - - :param error_message: Human-readable string describing the state violation. - :param state: The actual `Runstate` seen at the time of the violation. - :param required: The `Runstate` required to process this command. - """ - def __init__(self, error_message: str, - state: Runstate, required: Runstate): - super().__init__(error_message) - self.error_message = error_message - self.state = state - self.required = required - - -F = TypeVar('F', bound=Callable[..., Any]) # pylint: disable=invalid-name - - -# Don't Panic. -def require(required_state: Runstate) -> Callable[[F], F]: - """ - Decorator: protect a method so it can only be run in a certain `Runstate`. - - :param required_state: The `Runstate` required to invoke this method. - :raise StateError: When the required `Runstate` is not met. - """ - def _decorator(func: F) -> F: - # _decorator is the decorator that is built by calling the - # require() decorator factory; e.g.: - # - # @require(Runstate.IDLE) def foo(): ... - # will replace 'foo' with the result of '_decorator(foo)'. - - @wraps(func) - def _wrapper(proto: 'AsyncProtocol[Any]', - *args: Any, **kwargs: Any) -> Any: - # _wrapper is the function that gets executed prior to the - # decorated method. - - name = type(proto).__name__ - - if proto.runstate != required_state: - if proto.runstate == Runstate.CONNECTING: - emsg = f"{name} is currently connecting." - elif proto.runstate == Runstate.DISCONNECTING: - emsg = (f"{name} is disconnecting." - " Call disconnect() to return to IDLE state.") - elif proto.runstate == Runstate.RUNNING: - emsg = f"{name} is already connected and running." - elif proto.runstate == Runstate.IDLE: - emsg = f"{name} is disconnected and idle." - else: - assert False - raise StateError(emsg, proto.runstate, required_state) - # No StateError, so call the wrapped method. - return func(proto, *args, **kwargs) - - # Return the decorated method; - # Transforming Func to Decorated[Func]. - return cast(F, _wrapper) - - # Return the decorator instance from the decorator factory. Phew! - return _decorator - - -class AsyncProtocol(Generic[T]): - """ - AsyncProtocol implements a generic async message-based protocol. - - This protocol assumes the basic unit of information transfer between - client and server is a "message", the details of which are left up - to the implementation. It assumes the sending and receiving of these - messages is full-duplex and not necessarily correlated; i.e. it - supports asynchronous inbound messages. - - It is designed to be extended by a specific protocol which provides - the implementations for how to read and send messages. These must be - defined in `_do_recv()` and `_do_send()`, respectively. - - Other callbacks have a default implementation, but are intended to be - either extended or overridden: - - - `_establish_session`: - The base implementation starts the reader/writer tasks. - A protocol implementation can override this call, inserting - actions to be taken prior to starting the reader/writer tasks - before the super() call; actions needing to occur afterwards - can be written after the super() call. - - `_on_message`: - Actions to be performed when a message is received. - - `_cb_outbound`: - Logging/Filtering hook for all outbound messages. - - `_cb_inbound`: - Logging/Filtering hook for all inbound messages. - This hook runs *before* `_on_message()`. - - :param name: - Name used for logging messages, if any. By default, messages - will log to 'qemu.aqmp.protocol', but each individual connection - can be given its own logger by giving it a name; messages will - then log to 'qemu.aqmp.protocol.${name}'. - """ - # pylint: disable=too-many-instance-attributes - - #: Logger object for debugging messages from this connection. - logger = logging.getLogger(__name__) - - # Maximum allowable size of read buffer - _limit = (64 * 1024) - - # ------------------------- - # Section: Public interface - # ------------------------- - - def __init__(self, name: Optional[str] = None) -> None: - #: The nickname for this connection, if any. - self.name: Optional[str] = name - if self.name is not None: - self.logger = self.logger.getChild(self.name) - - # stream I/O - self._reader: Optional[StreamReader] = None - self._writer: Optional[StreamWriter] = None - - # Outbound Message queue - self._outgoing: asyncio.Queue[T] - - # Special, long-running tasks: - self._reader_task: Optional[asyncio.Future[None]] = None - self._writer_task: Optional[asyncio.Future[None]] = None - - # Aggregate of the above two tasks, used for Exception management. - self._bh_tasks: Optional[asyncio.Future[Tuple[None, None]]] = None - - #: Disconnect task. The disconnect implementation runs in a task - #: so that asynchronous disconnects (initiated by the - #: reader/writer) are allowed to wait for the reader/writers to - #: exit. - self._dc_task: Optional[asyncio.Future[None]] = None - - self._runstate = Runstate.IDLE - self._runstate_changed: Optional[asyncio.Event] = None - - def __repr__(self) -> str: - cls_name = type(self).__name__ - tokens = [] - if self.name is not None: - tokens.append(f"name={self.name!r}") - tokens.append(f"runstate={self.runstate.name}") - return f"<{cls_name} {' '.join(tokens)}>" - - @property # @upper_half - def runstate(self) -> Runstate: - """The current `Runstate` of the connection.""" - return self._runstate - - @upper_half - async def runstate_changed(self) -> Runstate: - """ - Wait for the `runstate` to change, then return that runstate. - """ - await self._runstate_event.wait() - return self.runstate - - @upper_half - @require(Runstate.IDLE) - async def accept(self, address: Union[str, Tuple[str, int]], - ssl: Optional[SSLContext] = None) -> None: - """ - Accept a connection and begin processing message queues. - - If this call fails, `runstate` is guaranteed to be set back to `IDLE`. - - :param address: - Address to listen to; UNIX socket path or TCP address/port. - :param ssl: SSL context to use, if any. - - :raise StateError: When the `Runstate` is not `IDLE`. - :raise ConnectError: If a connection could not be accepted. - """ - await self._new_session(address, ssl, accept=True) - - @upper_half - @require(Runstate.IDLE) - async def connect(self, address: Union[str, Tuple[str, int]], - ssl: Optional[SSLContext] = None) -> None: - """ - Connect to the server and begin processing message queues. - - If this call fails, `runstate` is guaranteed to be set back to `IDLE`. - - :param address: - Address to connect to; UNIX socket path or TCP address/port. - :param ssl: SSL context to use, if any. - - :raise StateError: When the `Runstate` is not `IDLE`. - :raise ConnectError: If a connection cannot be made to the server. - """ - await self._new_session(address, ssl) - - @upper_half - async def disconnect(self) -> None: - """ - Disconnect and wait for all tasks to fully stop. - - If there was an exception that caused the reader/writers to - terminate prematurely, it will be raised here. - - :raise Exception: When the reader or writer terminate unexpectedly. - """ - self.logger.debug("disconnect() called.") - self._schedule_disconnect() - await self._wait_disconnect() - - # -------------------------- - # Section: Session machinery - # -------------------------- - - @property - def _runstate_event(self) -> asyncio.Event: - # asyncio.Event() objects should not be created prior to entrance into - # an event loop, so we can ensure we create it in the correct context. - # Create it on-demand *only* at the behest of an 'async def' method. - if not self._runstate_changed: - self._runstate_changed = asyncio.Event() - return self._runstate_changed - - @upper_half - @bottom_half - def _set_state(self, state: Runstate) -> None: - """ - Change the `Runstate` of the protocol connection. - - Signals the `runstate_changed` event. - """ - if state == self._runstate: - return - - self.logger.debug("Transitioning from '%s' to '%s'.", - str(self._runstate), str(state)) - self._runstate = state - self._runstate_event.set() - self._runstate_event.clear() - - @upper_half - async def _new_session(self, - address: Union[str, Tuple[str, int]], - ssl: Optional[SSLContext] = None, - accept: bool = False) -> None: - """ - Establish a new connection and initialize the session. - - Connect or accept a new connection, then begin the protocol - session machinery. If this call fails, `runstate` is guaranteed - to be set back to `IDLE`. - - :param address: - Address to connect to/listen on; - UNIX socket path or TCP address/port. - :param ssl: SSL context to use, if any. - :param accept: Accept a connection instead of connecting when `True`. - - :raise ConnectError: - When a connection or session cannot be established. - - This exception will wrap a more concrete one. In most cases, - the wrapped exception will be `OSError` or `EOFError`. If a - protocol-level failure occurs while establishing a new - session, the wrapped error may also be an `AQMPError`. - """ - assert self.runstate == Runstate.IDLE - - try: - phase = "connection" - await self._establish_connection(address, ssl, accept) - - phase = "session" - await self._establish_session() - - except BaseException as err: - emsg = f"Failed to establish {phase}" - self.logger.error("%s: %s", emsg, exception_summary(err)) - self.logger.debug("%s:\n%s\n", emsg, pretty_traceback()) - try: - # Reset from CONNECTING back to IDLE. - await self.disconnect() - except: - emsg = "Unexpected bottom half exception" - self.logger.critical("%s:\n%s\n", emsg, pretty_traceback()) - raise - - # NB: CancelledError is not a BaseException before Python 3.8 - if isinstance(err, asyncio.CancelledError): - raise - - if isinstance(err, Exception): - raise ConnectError(emsg, err) from err - - # Raise BaseExceptions un-wrapped, they're more important. - raise - - assert self.runstate == Runstate.RUNNING - - @upper_half - async def _establish_connection( - self, - address: Union[str, Tuple[str, int]], - ssl: Optional[SSLContext] = None, - accept: bool = False - ) -> None: - """ - Establish a new connection. - - :param address: - Address to connect to/listen on; - UNIX socket path or TCP address/port. - :param ssl: SSL context to use, if any. - :param accept: Accept a connection instead of connecting when `True`. - """ - assert self.runstate == Runstate.IDLE - self._set_state(Runstate.CONNECTING) - - # Allow runstate watchers to witness 'CONNECTING' state; some - # failures in the streaming layer are synchronous and will not - # otherwise yield. - await asyncio.sleep(0) - - if accept: - await self._do_accept(address, ssl) - else: - await self._do_connect(address, ssl) - - @upper_half - async def _do_accept(self, address: Union[str, Tuple[str, int]], - ssl: Optional[SSLContext] = None) -> None: - """ - Acting as the transport server, accept a single connection. - - :param address: - Address to listen on; UNIX socket path or TCP address/port. - :param ssl: SSL context to use, if any. - - :raise OSError: For stream-related errors. - """ - self.logger.debug("Awaiting connection on %s ...", address) - connected = asyncio.Event() - server: Optional[asyncio.AbstractServer] = None - - async def _client_connected_cb(reader: asyncio.StreamReader, - writer: asyncio.StreamWriter) -> None: - """Used to accept a single incoming connection, see below.""" - nonlocal server - nonlocal connected - - # A connection has been accepted; stop listening for new ones. - assert server is not None - server.close() - await server.wait_closed() - server = None - - # Register this client as being connected - self._reader, self._writer = (reader, writer) - - # Signal back: We've accepted a client! - connected.set() - - if isinstance(address, tuple): - coro = asyncio.start_server( - _client_connected_cb, - host=address[0], - port=address[1], - ssl=ssl, - backlog=1, - limit=self._limit, - ) - else: - coro = asyncio.start_unix_server( - _client_connected_cb, - path=address, - ssl=ssl, - backlog=1, - limit=self._limit, - ) - - server = await coro # Starts listening - await connected.wait() # Waits for the callback to fire (and finish) - assert server is None - - self.logger.debug("Connection accepted.") - - @upper_half - async def _do_connect(self, address: Union[str, Tuple[str, int]], - ssl: Optional[SSLContext] = None) -> None: - """ - Acting as the transport client, initiate a connection to a server. - - :param address: - Address to connect to; UNIX socket path or TCP address/port. - :param ssl: SSL context to use, if any. - - :raise OSError: For stream-related errors. - """ - self.logger.debug("Connecting to %s ...", address) - - if isinstance(address, tuple): - connect = asyncio.open_connection( - address[0], - address[1], - ssl=ssl, - limit=self._limit, - ) - else: - connect = asyncio.open_unix_connection( - path=address, - ssl=ssl, - limit=self._limit, - ) - self._reader, self._writer = await connect - - self.logger.debug("Connected.") - - @upper_half - async def _establish_session(self) -> None: - """ - Establish a new session. - - Starts the readers/writer tasks; subclasses may perform their - own negotiations here. The Runstate will be RUNNING upon - successful conclusion. - """ - assert self.runstate == Runstate.CONNECTING - - self._outgoing = asyncio.Queue() - - reader_coro = self._bh_loop_forever(self._bh_recv_message, 'Reader') - writer_coro = self._bh_loop_forever(self._bh_send_message, 'Writer') - - self._reader_task = create_task(reader_coro) - self._writer_task = create_task(writer_coro) - - self._bh_tasks = asyncio.gather( - self._reader_task, - self._writer_task, - ) - - self._set_state(Runstate.RUNNING) - await asyncio.sleep(0) # Allow runstate_event to process - - @upper_half - @bottom_half - def _schedule_disconnect(self) -> None: - """ - Initiate a disconnect; idempotent. - - This method is used both in the upper-half as a direct - consequence of `disconnect()`, and in the bottom-half in the - case of unhandled exceptions in the reader/writer tasks. - - It can be invoked no matter what the `runstate` is. - """ - if not self._dc_task: - self._set_state(Runstate.DISCONNECTING) - self.logger.debug("Scheduling disconnect.") - self._dc_task = create_task(self._bh_disconnect()) - - @upper_half - async def _wait_disconnect(self) -> None: - """ - Waits for a previously scheduled disconnect to finish. - - This method will gather any bottom half exceptions and re-raise - the one that occurred first; presuming it to be the root cause - of any subsequent Exceptions. It is intended to be used in the - upper half of the call chain. - - :raise Exception: - Arbitrary exception re-raised on behalf of the reader/writer. - """ - assert self.runstate == Runstate.DISCONNECTING - assert self._dc_task - - aws: List[Awaitable[object]] = [self._dc_task] - if self._bh_tasks: - aws.insert(0, self._bh_tasks) - all_defined_tasks = asyncio.gather(*aws) - - # Ensure disconnect is done; Exception (if any) is not raised here: - await asyncio.wait((self._dc_task,)) - - try: - await all_defined_tasks # Raise Exceptions from the bottom half. - finally: - self._cleanup() - self._set_state(Runstate.IDLE) - - @upper_half - def _cleanup(self) -> None: - """ - Fully reset this object to a clean state and return to `IDLE`. - """ - def _paranoid_task_erase(task: _FutureT) -> Optional[_FutureT]: - # Help to erase a task, ENSURING it is fully quiesced first. - assert (task is None) or task.done() - return None if (task and task.done()) else task - - assert self.runstate == Runstate.DISCONNECTING - self._dc_task = _paranoid_task_erase(self._dc_task) - self._reader_task = _paranoid_task_erase(self._reader_task) - self._writer_task = _paranoid_task_erase(self._writer_task) - self._bh_tasks = _paranoid_task_erase(self._bh_tasks) - - self._reader = None - self._writer = None - - # NB: _runstate_changed cannot be cleared because we still need it to - # send the final runstate changed event ...! - - # ---------------------------- - # Section: Bottom Half methods - # ---------------------------- - - @bottom_half - async def _bh_disconnect(self) -> None: - """ - Disconnect and cancel all outstanding tasks. - - It is designed to be called from its task context, - :py:obj:`~AsyncProtocol._dc_task`. By running in its own task, - it is free to wait on any pending actions that may still need to - occur in either the reader or writer tasks. - """ - assert self.runstate == Runstate.DISCONNECTING - - def _done(task: Optional['asyncio.Future[Any]']) -> bool: - return task is not None and task.done() - - # NB: We can't rely on _bh_tasks being done() here, it may not - # yet have had a chance to run and gather itself. - tasks = tuple(filter(None, (self._writer_task, self._reader_task))) - error_pathway = _done(self._reader_task) or _done(self._writer_task) - - try: - # Try to flush the writer, if possible: - if not error_pathway: - await self._bh_flush_writer() - except BaseException as err: - error_pathway = True - emsg = "Failed to flush the writer" - self.logger.error("%s: %s", emsg, exception_summary(err)) - self.logger.debug("%s:\n%s\n", emsg, pretty_traceback()) - raise - finally: - # Cancel any still-running tasks: - if self._writer_task is not None and not self._writer_task.done(): - self.logger.debug("Cancelling writer task.") - self._writer_task.cancel() - if self._reader_task is not None and not self._reader_task.done(): - self.logger.debug("Cancelling reader task.") - self._reader_task.cancel() - - # Close out the tasks entirely (Won't raise): - if tasks: - self.logger.debug("Waiting for tasks to complete ...") - await asyncio.wait(tasks) - - # Lastly, close the stream itself. (May raise): - await self._bh_close_stream(error_pathway) - self.logger.debug("Disconnected.") - - @bottom_half - async def _bh_flush_writer(self) -> None: - if not self._writer_task: - return - - self.logger.debug("Draining the outbound queue ...") - await self._outgoing.join() - if self._writer is not None: - self.logger.debug("Flushing the StreamWriter ...") - await flush(self._writer) - - @bottom_half - async def _bh_close_stream(self, error_pathway: bool = False) -> None: - # NB: Closing the writer also implcitly closes the reader. - if not self._writer: - return - - if not is_closing(self._writer): - self.logger.debug("Closing StreamWriter.") - self._writer.close() - - self.logger.debug("Waiting for StreamWriter to close ...") - try: - await wait_closed(self._writer) - except Exception: # pylint: disable=broad-except - # It's hard to tell if the Stream is already closed or - # not. Even if one of the tasks has failed, it may have - # failed for a higher-layered protocol reason. The - # stream could still be open and perfectly fine. - # I don't know how to discern its health here. - - if error_pathway: - # We already know that *something* went wrong. Let's - # just trust that the Exception we already have is the - # better one to present to the user, even if we don't - # genuinely *know* the relationship between the two. - self.logger.debug( - "Discarding Exception from wait_closed:\n%s\n", - pretty_traceback(), - ) - else: - # Oops, this is a brand-new error! - raise - finally: - self.logger.debug("StreamWriter closed.") - - @bottom_half - async def _bh_loop_forever(self, async_fn: _TaskFN, name: str) -> None: - """ - Run one of the bottom-half methods in a loop forever. - - If the bottom half ever raises any exception, schedule a - disconnect that will terminate the entire loop. - - :param async_fn: The bottom-half method to run in a loop. - :param name: The name of this task, used for logging. - """ - try: - while True: - await async_fn() - except asyncio.CancelledError: - # We have been cancelled by _bh_disconnect, exit gracefully. - self.logger.debug("Task.%s: cancelled.", name) - return - except BaseException as err: - self.logger.error("Task.%s: %s", - name, exception_summary(err)) - self.logger.debug("Task.%s: failure:\n%s\n", - name, pretty_traceback()) - self._schedule_disconnect() - raise - finally: - self.logger.debug("Task.%s: exiting.", name) - - @bottom_half - async def _bh_send_message(self) -> None: - """ - Wait for an outgoing message, then send it. - - Designed to be run in `_bh_loop_forever()`. - """ - msg = await self._outgoing.get() - try: - await self._send(msg) - finally: - self._outgoing.task_done() - - @bottom_half - async def _bh_recv_message(self) -> None: - """ - Wait for an incoming message and call `_on_message` to route it. - - Designed to be run in `_bh_loop_forever()`. - """ - msg = await self._recv() - await self._on_message(msg) - - # -------------------- - # Section: Message I/O - # -------------------- - - @upper_half - @bottom_half - def _cb_outbound(self, msg: T) -> T: - """ - Callback: outbound message hook. - - This is intended for subclasses to be able to add arbitrary - hooks to filter or manipulate outgoing messages. The base - implementation does nothing but log the message without any - manipulation of the message. - - :param msg: raw outbound message - :return: final outbound message - """ - self.logger.debug("--> %s", str(msg)) - return msg - - @upper_half - @bottom_half - def _cb_inbound(self, msg: T) -> T: - """ - Callback: inbound message hook. - - This is intended for subclasses to be able to add arbitrary - hooks to filter or manipulate incoming messages. The base - implementation does nothing but log the message without any - manipulation of the message. - - This method does not "handle" incoming messages; it is a filter. - The actual "endpoint" for incoming messages is `_on_message()`. - - :param msg: raw inbound message - :return: processed inbound message - """ - self.logger.debug("<-- %s", str(msg)) - return msg - - @upper_half - @bottom_half - async def _readline(self) -> bytes: - """ - Wait for a newline from the incoming reader. - - This method is provided as a convenience for upper-layer - protocols, as many are line-based. - - This method *may* return a sequence of bytes without a trailing - newline if EOF occurs, but *some* bytes were received. In this - case, the next call will raise `EOFError`. It is assumed that - the layer 5 protocol will decide if there is anything meaningful - to be done with a partial message. - - :raise OSError: For stream-related errors. - :raise EOFError: - If the reader stream is at EOF and there are no bytes to return. - :return: bytes, including the newline. - """ - assert self._reader is not None - msg_bytes = await self._reader.readline() - - if not msg_bytes: - if self._reader.at_eof(): - raise EOFError - - return msg_bytes - - @upper_half - @bottom_half - async def _do_recv(self) -> T: - """ - Abstract: Read from the stream and return a message. - - Very low-level; intended to only be called by `_recv()`. - """ - raise NotImplementedError - - @upper_half - @bottom_half - async def _recv(self) -> T: - """ - Read an arbitrary protocol message. - - .. warning:: - This method is intended primarily for `_bh_recv_message()` - to use in an asynchronous task loop. Using it outside of - this loop will "steal" messages from the normal routing - mechanism. It is safe to use prior to `_establish_session()`, - but should not be used otherwise. - - This method uses `_do_recv()` to retrieve the raw message, and - then transforms it using `_cb_inbound()`. - - :return: A single (filtered, processed) protocol message. - """ - message = await self._do_recv() - return self._cb_inbound(message) - - @upper_half - @bottom_half - def _do_send(self, msg: T) -> None: - """ - Abstract: Write a message to the stream. - - Very low-level; intended to only be called by `_send()`. - """ - raise NotImplementedError - - @upper_half - @bottom_half - async def _send(self, msg: T) -> None: - """ - Send an arbitrary protocol message. - - This method will transform any outgoing messages according to - `_cb_outbound()`. - - .. warning:: - Like `_recv()`, this method is intended to be called by - the writer task loop that processes outgoing - messages. Calling it directly may circumvent logic - implemented by the caller meant to correlate outgoing and - incoming messages. - - :raise OSError: For problems with the underlying stream. - """ - msg = self._cb_outbound(msg) - self._do_send(msg) - - @bottom_half - async def _on_message(self, msg: T) -> None: - """ - Called to handle the receipt of a new message. - - .. caution:: - This is executed from within the reader loop, so be advised - that waiting on either the reader or writer task will lead - to deadlock. Additionally, any unhandled exceptions will - directly cause the loop to halt, so logic may be best-kept - to a minimum if at all possible. - - :param msg: The incoming message, already logged/filtered. - """ - # Nothing to do in the abstract case. diff --git a/python/qemu/aqmp/py.typed b/python/qemu/aqmp/py.typed deleted file mode 100644 index e69de29bb2..0000000000 --- a/python/qemu/aqmp/py.typed +++ /dev/null diff --git a/python/qemu/aqmp/qmp_client.py b/python/qemu/aqmp/qmp_client.py deleted file mode 100644 index 82e9dab124..0000000000 --- a/python/qemu/aqmp/qmp_client.py +++ /dev/null @@ -1,621 +0,0 @@ -""" -QMP Protocol Implementation - -This module provides the `QMPClient` class, which can be used to connect -and send commands to a QMP server such as QEMU. The QMP class can be -used to either connect to a listening server, or used to listen and -accept an incoming connection from that server. -""" - -import asyncio -import logging -from typing import ( - Dict, - List, - Mapping, - Optional, - Union, - cast, -) - -from .error import AQMPError, ProtocolError -from .events import Events -from .message import Message -from .models import ErrorResponse, Greeting -from .protocol import AsyncProtocol, Runstate, require -from .util import ( - bottom_half, - exception_summary, - pretty_traceback, - upper_half, -) - - -class _WrappedProtocolError(ProtocolError): - """ - Abstract exception class for Protocol errors that wrap an Exception. - - :param error_message: Human-readable string describing the error. - :param exc: The root-cause exception. - """ - def __init__(self, error_message: str, exc: Exception): - super().__init__(error_message) - self.exc = exc - - def __str__(self) -> str: - return f"{self.error_message}: {self.exc!s}" - - -class GreetingError(_WrappedProtocolError): - """ - An exception occurred during the Greeting phase. - - :param error_message: Human-readable string describing the error. - :param exc: The root-cause exception. - """ - - -class NegotiationError(_WrappedProtocolError): - """ - An exception occurred during the Negotiation phase. - - :param error_message: Human-readable string describing the error. - :param exc: The root-cause exception. - """ - - -class ExecuteError(AQMPError): - """ - Exception raised by `QMPClient.execute()` on RPC failure. - - :param error_response: The RPC error response object. - :param sent: The sent RPC message that caused the failure. - :param received: The raw RPC error reply received. - """ - def __init__(self, error_response: ErrorResponse, - sent: Message, received: Message): - super().__init__(error_response.error.desc) - #: The sent `Message` that caused the failure - self.sent: Message = sent - #: The received `Message` that indicated failure - self.received: Message = received - #: The parsed error response - self.error: ErrorResponse = error_response - #: The QMP error class - self.error_class: str = error_response.error.class_ - - -class ExecInterruptedError(AQMPError): - """ - Exception raised by `execute()` (et al) when an RPC is interrupted. - - This error is raised when an `execute()` statement could not be - completed. This can occur because the connection itself was - terminated before a reply was received. - - The true cause of the interruption will be available via `disconnect()`. - """ - - -class _MsgProtocolError(ProtocolError): - """ - Abstract error class for protocol errors that have a `Message` object. - - This Exception class is used for protocol errors where the `Message` - was mechanically understood, but was found to be inappropriate or - malformed. - - :param error_message: Human-readable string describing the error. - :param msg: The QMP `Message` that caused the error. - """ - def __init__(self, error_message: str, msg: Message): - super().__init__(error_message) - #: The received `Message` that caused the error. - self.msg: Message = msg - - def __str__(self) -> str: - return "\n".join([ - super().__str__(), - f" Message was: {str(self.msg)}\n", - ]) - - -class ServerParseError(_MsgProtocolError): - """ - The Server sent a `Message` indicating parsing failure. - - i.e. A reply has arrived from the server, but it is missing the "ID" - field, indicating a parsing error. - - :param error_message: Human-readable string describing the error. - :param msg: The QMP `Message` that caused the error. - """ - - -class BadReplyError(_MsgProtocolError): - """ - An execution reply was successfully routed, but not understood. - - If a QMP message is received with an 'id' field to allow it to be - routed, but is otherwise malformed, this exception will be raised. - - A reply message is malformed if it is missing either the 'return' or - 'error' keys, or if the 'error' value has missing keys or members of - the wrong type. - - :param error_message: Human-readable string describing the error. - :param msg: The malformed reply that was received. - :param sent: The message that was sent that prompted the error. - """ - def __init__(self, error_message: str, msg: Message, sent: Message): - super().__init__(error_message, msg) - #: The sent `Message` that caused the failure - self.sent = sent - - -class QMPClient(AsyncProtocol[Message], Events): - """ - Implements a QMP client connection. - - QMP can be used to establish a connection as either the transport - client or server, though this class always acts as the QMP client. - - :param name: Optional nickname for the connection, used for logging. - - Basic script-style usage looks like this:: - - qmp = QMPClient('my_virtual_machine_name') - await qmp.connect(('127.0.0.1', 1234)) - ... - res = await qmp.execute('block-query') - ... - await qmp.disconnect() - - Basic async client-style usage looks like this:: - - class Client: - def __init__(self, name: str): - self.qmp = QMPClient(name) - - async def watch_events(self): - try: - async for event in self.qmp.events: - print(f"Event: {event['event']}") - except asyncio.CancelledError: - return - - async def run(self, address='/tmp/qemu.socket'): - await self.qmp.connect(address) - asyncio.create_task(self.watch_events()) - await self.qmp.runstate_changed.wait() - await self.disconnect() - - See `aqmp.events` for more detail on event handling patterns. - """ - #: Logger object used for debugging messages. - logger = logging.getLogger(__name__) - - # Read buffer limit; large enough to accept query-qmp-schema - _limit = (256 * 1024) - - # Type alias for pending execute() result items - _PendingT = Union[Message, ExecInterruptedError] - - def __init__(self, name: Optional[str] = None) -> None: - super().__init__(name) - Events.__init__(self) - - #: Whether or not to await a greeting after establishing a connection. - self.await_greeting: bool = True - - #: Whether or not to perform capabilities negotiation upon connection. - #: Implies `await_greeting`. - self.negotiate: bool = True - - # Cached Greeting, if one was awaited. - self._greeting: Optional[Greeting] = None - - # Command ID counter - self._execute_id = 0 - - # Incoming RPC reply messages. - self._pending: Dict[ - Union[str, None], - 'asyncio.Queue[QMPClient._PendingT]' - ] = {} - - @upper_half - async def _establish_session(self) -> None: - """ - Initiate the QMP session. - - Wait for the QMP greeting and perform capabilities negotiation. - - :raise GreetingError: When the greeting is not understood. - :raise NegotiationError: If the negotiation fails. - :raise EOFError: When the server unexpectedly hangs up. - :raise OSError: For underlying stream errors. - """ - self._greeting = None - self._pending = {} - - if self.await_greeting or self.negotiate: - self._greeting = await self._get_greeting() - - if self.negotiate: - await self._negotiate() - - # This will start the reader/writers: - await super()._establish_session() - - @upper_half - async def _get_greeting(self) -> Greeting: - """ - :raise GreetingError: When the greeting is not understood. - :raise EOFError: When the server unexpectedly hangs up. - :raise OSError: For underlying stream errors. - - :return: the Greeting object given by the server. - """ - self.logger.debug("Awaiting greeting ...") - - try: - msg = await self._recv() - return Greeting(msg) - except (ProtocolError, KeyError, TypeError) as err: - emsg = "Did not understand Greeting" - self.logger.error("%s: %s", emsg, exception_summary(err)) - self.logger.debug("%s:\n%s\n", emsg, pretty_traceback()) - raise GreetingError(emsg, err) from err - except BaseException as err: - # EOFError, OSError, or something unexpected. - emsg = "Failed to receive Greeting" - self.logger.error("%s: %s", emsg, exception_summary(err)) - self.logger.debug("%s:\n%s\n", emsg, pretty_traceback()) - raise - - @upper_half - async def _negotiate(self) -> None: - """ - Perform QMP capabilities negotiation. - - :raise NegotiationError: When negotiation fails. - :raise EOFError: When the server unexpectedly hangs up. - :raise OSError: For underlying stream errors. - """ - self.logger.debug("Negotiating capabilities ...") - - arguments: Dict[str, List[str]] = {'enable': []} - if self._greeting and 'oob' in self._greeting.QMP.capabilities: - arguments['enable'].append('oob') - msg = self.make_execute_msg('qmp_capabilities', arguments=arguments) - - # It's not safe to use execute() here, because the reader/writers - # aren't running. AsyncProtocol *requires* that a new session - # does not fail after the reader/writers are running! - try: - await self._send(msg) - reply = await self._recv() - assert 'return' in reply - assert 'error' not in reply - except (ProtocolError, AssertionError) as err: - emsg = "Negotiation failed" - self.logger.error("%s: %s", emsg, exception_summary(err)) - self.logger.debug("%s:\n%s\n", emsg, pretty_traceback()) - raise NegotiationError(emsg, err) from err - except BaseException as err: - # EOFError, OSError, or something unexpected. - emsg = "Negotiation failed" - self.logger.error("%s: %s", emsg, exception_summary(err)) - self.logger.debug("%s:\n%s\n", emsg, pretty_traceback()) - raise - - @bottom_half - async def _bh_disconnect(self) -> None: - try: - await super()._bh_disconnect() - finally: - if self._pending: - self.logger.debug("Cancelling pending executions") - keys = self._pending.keys() - for key in keys: - self.logger.debug("Cancelling execution '%s'", key) - self._pending[key].put_nowait( - ExecInterruptedError("Disconnected") - ) - - self.logger.debug("QMP Disconnected.") - - @upper_half - def _cleanup(self) -> None: - super()._cleanup() - assert not self._pending - - @bottom_half - async def _on_message(self, msg: Message) -> None: - """ - Add an incoming message to the appropriate queue/handler. - - :raise ServerParseError: When Message indicates server parse failure. - """ - # Incoming messages are not fully parsed/validated here; - # do only light peeking to know how to route the messages. - - if 'event' in msg: - await self._event_dispatch(msg) - return - - # Below, we assume everything left is an execute/exec-oob response. - - exec_id = cast(Optional[str], msg.get('id')) - - if exec_id in self._pending: - await self._pending[exec_id].put(msg) - return - - # We have a message we can't route back to a caller. - - is_error = 'error' in msg - has_id = 'id' in msg - - if is_error and not has_id: - # This is very likely a server parsing error. - # It doesn't inherently belong to any pending execution. - # Instead of performing clever recovery, just terminate. - # See "NOTE" in qmp-spec.txt, section 2.4.2 - raise ServerParseError( - ("Server sent an error response without an ID, " - "but there are no ID-less executions pending. " - "Assuming this is a server parser failure."), - msg - ) - - # qmp-spec.txt, section 2.4: - # 'Clients should drop all the responses - # that have an unknown "id" field.' - self.logger.log( - logging.ERROR if is_error else logging.WARNING, - "Unknown ID '%s', message dropped.", - exec_id, - ) - self.logger.debug("Unroutable message: %s", str(msg)) - - @upper_half - @bottom_half - async def _do_recv(self) -> Message: - """ - :raise OSError: When a stream error is encountered. - :raise EOFError: When the stream is at EOF. - :raise ProtocolError: - When the Message is not understood. - See also `Message._deserialize`. - - :return: A single QMP `Message`. - """ - msg_bytes = await self._readline() - msg = Message(msg_bytes, eager=True) - return msg - - @upper_half - @bottom_half - def _do_send(self, msg: Message) -> None: - """ - :raise ValueError: JSON serialization failure - :raise TypeError: JSON serialization failure - :raise OSError: When a stream error is encountered. - """ - assert self._writer is not None - self._writer.write(bytes(msg)) - - @upper_half - def _get_exec_id(self) -> str: - exec_id = f"__aqmp#{self._execute_id:05d}" - self._execute_id += 1 - return exec_id - - @upper_half - async def _issue(self, msg: Message) -> Union[None, str]: - """ - Issue a QMP `Message` and do not wait for a reply. - - :param msg: The QMP `Message` to send to the server. - - :return: The ID of the `Message` sent. - """ - msg_id: Optional[str] = None - if 'id' in msg: - assert isinstance(msg['id'], str) - msg_id = msg['id'] - - self._pending[msg_id] = asyncio.Queue(maxsize=1) - await self._outgoing.put(msg) - - return msg_id - - @upper_half - async def _reply(self, msg_id: Union[str, None]) -> Message: - """ - Await a reply to a previously issued QMP message. - - :param msg_id: The ID of the previously issued message. - - :return: The reply from the server. - :raise ExecInterruptedError: - When the reply could not be retrieved because the connection - was lost, or some other problem. - """ - queue = self._pending[msg_id] - result = await queue.get() - - try: - if isinstance(result, ExecInterruptedError): - raise result - return result - finally: - del self._pending[msg_id] - - @upper_half - async def _execute(self, msg: Message, assign_id: bool = True) -> Message: - """ - Send a QMP `Message` to the server and await a reply. - - This method *assumes* you are sending some kind of an execute - statement that *will* receive a reply. - - An execution ID will be assigned if assign_id is `True`. It can be - disabled, but this requires that an ID is manually assigned - instead. For manually assigned IDs, you must not use the string - '__aqmp#' anywhere in the ID. - - :param msg: The QMP `Message` to execute. - :param assign_id: If True, assign a new execution ID. - - :return: Execution reply from the server. - :raise ExecInterruptedError: - When the reply could not be retrieved because the connection - was lost, or some other problem. - """ - if assign_id: - msg['id'] = self._get_exec_id() - elif 'id' in msg: - assert isinstance(msg['id'], str) - assert '__aqmp#' not in msg['id'] - - exec_id = await self._issue(msg) - return await self._reply(exec_id) - - @upper_half - @require(Runstate.RUNNING) - async def _raw( - self, - msg: Union[Message, Mapping[str, object], bytes], - assign_id: bool = True, - ) -> Message: - """ - Issue a raw `Message` to the QMP server and await a reply. - - :param msg: - A Message to send to the server. It may be a `Message`, any - Mapping (including Dict), or raw bytes. - :param assign_id: - Assign an arbitrary execution ID to this message. If - `False`, the existing id must either be absent (and no other - such pending execution may omit an ID) or a string. If it is - a string, it must not start with '__aqmp#' and no other such - pending execution may currently be using that ID. - - :return: Execution reply from the server. - - :raise ExecInterruptedError: - When the reply could not be retrieved because the connection - was lost, or some other problem. - :raise TypeError: - When assign_id is `False`, an ID is given, and it is not a string. - :raise ValueError: - When assign_id is `False`, but the ID is not usable; - Either because it starts with '__aqmp#' or it is already in-use. - """ - # 1. convert generic Mapping or bytes to a QMP Message - # 2. copy Message objects so that we assign an ID only to the copy. - msg = Message(msg) - - exec_id = msg.get('id') - if not assign_id and 'id' in msg: - if not isinstance(exec_id, str): - raise TypeError(f"ID ('{exec_id}') must be a string.") - if exec_id.startswith('__aqmp#'): - raise ValueError( - f"ID ('{exec_id}') must not start with '__aqmp#'." - ) - - if not assign_id and exec_id in self._pending: - raise ValueError( - f"ID '{exec_id}' is in-use and cannot be used." - ) - - return await self._execute(msg, assign_id=assign_id) - - @upper_half - @require(Runstate.RUNNING) - async def execute_msg(self, msg: Message) -> object: - """ - Execute a QMP command and return its value. - - :param msg: The QMP `Message` to execute. - - :return: - The command execution return value from the server. The type of - object returned depends on the command that was issued, - though most in QEMU return a `dict`. - :raise ValueError: - If the QMP `Message` does not have either the 'execute' or - 'exec-oob' fields set. - :raise ExecuteError: When the server returns an error response. - :raise ExecInterruptedError: if the connection was terminated early. - """ - if not ('execute' in msg or 'exec-oob' in msg): - raise ValueError("Requires 'execute' or 'exec-oob' message") - - # Copy the Message so that the ID assigned by _execute() is - # local to this method; allowing the ID to be seen in raised - # Exceptions but without modifying the caller's held copy. - msg = Message(msg) - reply = await self._execute(msg) - - if 'error' in reply: - try: - error_response = ErrorResponse(reply) - except (KeyError, TypeError) as err: - # Error response was malformed. - raise BadReplyError( - "QMP error reply is malformed", reply, msg, - ) from err - - raise ExecuteError(error_response, msg, reply) - - if 'return' not in reply: - raise BadReplyError( - "QMP reply is missing a 'error' or 'return' member", - reply, msg, - ) - - return reply['return'] - - @classmethod - def make_execute_msg(cls, cmd: str, - arguments: Optional[Mapping[str, object]] = None, - oob: bool = False) -> Message: - """ - Create an executable message to be sent by `execute_msg` later. - - :param cmd: QMP command name. - :param arguments: Arguments (if any). Must be JSON-serializable. - :param oob: If `True`, execute "out of band". - - :return: An executable QMP `Message`. - """ - msg = Message({'exec-oob' if oob else 'execute': cmd}) - if arguments is not None: - msg['arguments'] = arguments - return msg - - @upper_half - async def execute(self, cmd: str, - arguments: Optional[Mapping[str, object]] = None, - oob: bool = False) -> object: - """ - Execute a QMP command and return its value. - - :param cmd: QMP command name. - :param arguments: Arguments (if any). Must be JSON-serializable. - :param oob: If `True`, execute "out of band". - - :return: - The command execution return value from the server. The type of - object returned depends on the command that was issued, - though most in QEMU return a `dict`. - :raise ExecuteError: When the server returns an error response. - :raise ExecInterruptedError: if the connection was terminated early. - """ - msg = self.make_execute_msg(cmd, arguments, oob=oob) - return await self.execute_msg(msg) diff --git a/python/qemu/aqmp/util.py b/python/qemu/aqmp/util.py deleted file mode 100644 index eaa5fc7d5f..0000000000 --- a/python/qemu/aqmp/util.py +++ /dev/null @@ -1,217 +0,0 @@ -""" -Miscellaneous Utilities - -This module provides asyncio utilities and compatibility wrappers for -Python 3.6 to provide some features that otherwise become available in -Python 3.7+. - -Various logging and debugging utilities are also provided, such as -`exception_summary()` and `pretty_traceback()`, used primarily for -adding information into the logging stream. -""" - -import asyncio -import sys -import traceback -from typing import ( - Any, - Coroutine, - Optional, - TypeVar, - cast, -) - - -T = TypeVar('T') - - -# -------------------------- -# Section: Utility Functions -# -------------------------- - - -async def flush(writer: asyncio.StreamWriter) -> None: - """ - Utility function to ensure a StreamWriter is *fully* drained. - - `asyncio.StreamWriter.drain` only promises we will return to below - the "high-water mark". This function ensures we flush the entire - buffer -- by setting the high water mark to 0 and then calling - drain. The flow control limits are restored after the call is - completed. - """ - transport = cast(asyncio.WriteTransport, writer.transport) - - # https://github.com/python/typeshed/issues/5779 - low, high = transport.get_write_buffer_limits() # type: ignore - transport.set_write_buffer_limits(0, 0) - try: - await writer.drain() - finally: - transport.set_write_buffer_limits(high, low) - - -def upper_half(func: T) -> T: - """ - Do-nothing decorator that annotates a method as an "upper-half" method. - - These methods must not call bottom-half functions directly, but can - schedule them to run. - """ - return func - - -def bottom_half(func: T) -> T: - """ - Do-nothing decorator that annotates a method as a "bottom-half" method. - - These methods must take great care to handle their own exceptions whenever - possible. If they go unhandled, they will cause termination of the loop. - - These methods do not, in general, have the ability to directly - report information to a caller’s context and will usually be - collected as a Task result instead. - - They must not call upper-half functions directly. - """ - return func - - -# ------------------------------- -# Section: Compatibility Wrappers -# ------------------------------- - - -def create_task(coro: Coroutine[Any, Any, T], - loop: Optional[asyncio.AbstractEventLoop] = None - ) -> 'asyncio.Future[T]': - """ - Python 3.6-compatible `asyncio.create_task` wrapper. - - :param coro: The coroutine to execute in a task. - :param loop: Optionally, the loop to create the task in. - - :return: An `asyncio.Future` object. - """ - if sys.version_info >= (3, 7): - if loop is not None: - return loop.create_task(coro) - return asyncio.create_task(coro) # pylint: disable=no-member - - # Python 3.6: - return asyncio.ensure_future(coro, loop=loop) - - -def is_closing(writer: asyncio.StreamWriter) -> bool: - """ - Python 3.6-compatible `asyncio.StreamWriter.is_closing` wrapper. - - :param writer: The `asyncio.StreamWriter` object. - :return: `True` if the writer is closing, or closed. - """ - if sys.version_info >= (3, 7): - return writer.is_closing() - - # Python 3.6: - transport = writer.transport - assert isinstance(transport, asyncio.WriteTransport) - return transport.is_closing() - - -async def wait_closed(writer: asyncio.StreamWriter) -> None: - """ - Python 3.6-compatible `asyncio.StreamWriter.wait_closed` wrapper. - - :param writer: The `asyncio.StreamWriter` to wait on. - """ - if sys.version_info >= (3, 7): - await writer.wait_closed() - return - - # Python 3.6 - transport = writer.transport - assert isinstance(transport, asyncio.WriteTransport) - - while not transport.is_closing(): - await asyncio.sleep(0) - - # This is an ugly workaround, but it's the best I can come up with. - sock = transport.get_extra_info('socket') - - if sock is None: - # Our transport doesn't have a socket? ... - # Nothing we can reasonably do. - return - - while sock.fileno() != -1: - await asyncio.sleep(0) - - -def asyncio_run(coro: Coroutine[Any, Any, T], *, debug: bool = False) -> T: - """ - Python 3.6-compatible `asyncio.run` wrapper. - - :param coro: A coroutine to execute now. - :return: The return value from the coroutine. - """ - if sys.version_info >= (3, 7): - return asyncio.run(coro, debug=debug) - - # Python 3.6 - loop = asyncio.get_event_loop() - loop.set_debug(debug) - ret = loop.run_until_complete(coro) - loop.close() - - return ret - - -# ---------------------------- -# Section: Logging & Debugging -# ---------------------------- - - -def exception_summary(exc: BaseException) -> str: - """ - Return a summary string of an arbitrary exception. - - It will be of the form "ExceptionType: Error Message", if the error - string is non-empty, and just "ExceptionType" otherwise. - """ - name = type(exc).__qualname__ - smod = type(exc).__module__ - if smod not in ("__main__", "builtins"): - name = smod + '.' + name - - error = str(exc) - if error: - return f"{name}: {error}" - return name - - -def pretty_traceback(prefix: str = " | ") -> str: - """ - Formats the current traceback, indented to provide visual distinction. - - This is useful for printing a traceback within a traceback for - debugging purposes when encapsulating errors to deliver them up the - stack; when those errors are printed, this helps provide a nice - visual grouping to quickly identify the parts of the error that - belong to the inner exception. - - :param prefix: The prefix to append to each line of the traceback. - :return: A string, formatted something like the following:: - - | Traceback (most recent call last): - | File "foobar.py", line 42, in arbitrary_example - | foo.baz() - | ArbitraryError: [Errno 42] Something bad happened! - """ - output = "".join(traceback.format_exception(*sys.exc_info())) - - exc_lines = [] - for line in output.split('\n'): - exc_lines.append(prefix + line) - - # The last line is always empty, omit it - return "\n".join(exc_lines[:-1]) |