aboutsummaryrefslogtreecommitdiff
path: root/python/qemu/aqmp
diff options
context:
space:
mode:
Diffstat (limited to 'python/qemu/aqmp')
-rw-r--r--python/qemu/aqmp/__init__.py59
-rw-r--r--python/qemu/aqmp/aqmp_tui.py652
-rw-r--r--python/qemu/aqmp/error.py50
-rw-r--r--python/qemu/aqmp/events.py706
-rw-r--r--python/qemu/aqmp/message.py209
-rw-r--r--python/qemu/aqmp/models.py133
-rw-r--r--python/qemu/aqmp/protocol.py902
-rw-r--r--python/qemu/aqmp/py.typed0
-rw-r--r--python/qemu/aqmp/qmp_client.py621
-rw-r--r--python/qemu/aqmp/util.py217
10 files changed, 0 insertions, 3549 deletions
diff --git a/python/qemu/aqmp/__init__.py b/python/qemu/aqmp/__init__.py
deleted file mode 100644
index ab1782999c..0000000000
--- a/python/qemu/aqmp/__init__.py
+++ /dev/null
@@ -1,59 +0,0 @@
-"""
-QEMU Monitor Protocol (QMP) development library & tooling.
-
-This package provides a fairly low-level class for communicating
-asynchronously with QMP protocol servers, as implemented by QEMU, the
-QEMU Guest Agent, and the QEMU Storage Daemon.
-
-`QMPClient` provides the main functionality of this package. All errors
-raised by this library dervive from `AQMPError`, see `aqmp.error` for
-additional detail. See `aqmp.events` for an in-depth tutorial on
-managing QMP events.
-"""
-
-# Copyright (C) 2020, 2021 John Snow for Red Hat, Inc.
-#
-# Authors:
-# John Snow <jsnow@redhat.com>
-#
-# Based on earlier work by Luiz Capitulino <lcapitulino@redhat.com>.
-#
-# This work is licensed under the terms of the GNU GPL, version 2. See
-# the COPYING file in the top-level directory.
-
-import warnings
-
-from .error import AQMPError
-from .events import EventListener
-from .message import Message
-from .protocol import ConnectError, Runstate, StateError
-from .qmp_client import ExecInterruptedError, ExecuteError, QMPClient
-
-
-_WMSG = """
-
-The Asynchronous QMP library is currently in development and its API
-should be considered highly fluid and subject to change. It should
-not be used by any other scripts checked into the QEMU tree.
-
-Proceed with caution!
-"""
-
-warnings.warn(_WMSG, FutureWarning)
-
-
-# The order of these fields impact the Sphinx documentation order.
-__all__ = (
- # Classes, most to least important
- 'QMPClient',
- 'Message',
- 'EventListener',
- 'Runstate',
-
- # Exceptions, most generic to most explicit
- 'AQMPError',
- 'StateError',
- 'ConnectError',
- 'ExecuteError',
- 'ExecInterruptedError',
-)
diff --git a/python/qemu/aqmp/aqmp_tui.py b/python/qemu/aqmp/aqmp_tui.py
deleted file mode 100644
index a2929f771c..0000000000
--- a/python/qemu/aqmp/aqmp_tui.py
+++ /dev/null
@@ -1,652 +0,0 @@
-# Copyright (c) 2021
-#
-# Authors:
-# Niteesh Babu G S <niteesh.gs@gmail.com>
-#
-# This work is licensed under the terms of the GNU GPL, version 2 or
-# later. See the COPYING file in the top-level directory.
-"""
-AQMP TUI
-
-AQMP TUI is an asynchronous interface built on top the of the AQMP library.
-It is the successor of QMP-shell and is bought-in as a replacement for it.
-
-Example Usage: aqmp-tui <SOCKET | TCP IP:PORT>
-Full Usage: aqmp-tui --help
-"""
-
-import argparse
-import asyncio
-import json
-import logging
-from logging import Handler, LogRecord
-import signal
-from typing import (
- List,
- Optional,
- Tuple,
- Type,
- Union,
- cast,
-)
-
-from pygments import lexers
-from pygments import token as Token
-import urwid
-import urwid_readline
-
-from ..qmp import QEMUMonitorProtocol, QMPBadPortError
-from .error import ProtocolError
-from .message import DeserializationError, Message, UnexpectedTypeError
-from .protocol import ConnectError, Runstate
-from .qmp_client import ExecInterruptedError, QMPClient
-from .util import create_task, pretty_traceback
-
-
-# The name of the signal that is used to update the history list
-UPDATE_MSG: str = 'UPDATE_MSG'
-
-
-palette = [
- (Token.Punctuation, '', '', '', 'h15,bold', 'g7'),
- (Token.Text, '', '', '', '', 'g7'),
- (Token.Name.Tag, '', '', '', 'bold,#f88', 'g7'),
- (Token.Literal.Number.Integer, '', '', '', '#fa0', 'g7'),
- (Token.Literal.String.Double, '', '', '', '#6f6', 'g7'),
- (Token.Keyword.Constant, '', '', '', '#6af', 'g7'),
- ('DEBUG', '', '', '', '#ddf', 'g7'),
- ('INFO', '', '', '', 'g100', 'g7'),
- ('WARNING', '', '', '', '#ff6', 'g7'),
- ('ERROR', '', '', '', '#a00', 'g7'),
- ('CRITICAL', '', '', '', '#a00', 'g7'),
- ('background', '', 'black', '', '', 'g7'),
-]
-
-
-def format_json(msg: str) -> str:
- """
- Formats valid/invalid multi-line JSON message into a single-line message.
-
- Formatting is first tried using the standard json module. If that fails
- due to an decoding error then a simple string manipulation is done to
- achieve a single line JSON string.
-
- Converting into single line is more asthetically pleasing when looking
- along with error messages.
-
- Eg:
- Input:
- [ 1,
- true,
- 3 ]
- The above input is not a valid QMP message and produces the following error
- "QMP message is not a JSON object."
- When displaying this in TUI in multiline mode we get
-
- [ 1,
- true,
- 3 ]: QMP message is not a JSON object.
-
- whereas in singleline mode we get the following
-
- [1, true, 3]: QMP message is not a JSON object.
-
- The single line mode is more asthetically pleasing.
-
- :param msg:
- The message to formatted into single line.
-
- :return: Formatted singleline message.
- """
- try:
- msg = json.loads(msg)
- return str(json.dumps(msg))
- except json.decoder.JSONDecodeError:
- msg = msg.replace('\n', '')
- words = msg.split(' ')
- words = list(filter(None, words))
- return ' '.join(words)
-
-
-def has_handler_type(logger: logging.Logger,
- handler_type: Type[Handler]) -> bool:
- """
- The Logger class has no interface to check if a certain type of handler is
- installed or not. So we provide an interface to do so.
-
- :param logger:
- Logger object
- :param handler_type:
- The type of the handler to be checked.
-
- :return: returns True if handler of type `handler_type`.
- """
- for handler in logger.handlers:
- if isinstance(handler, handler_type):
- return True
- return False
-
-
-class App(QMPClient):
- """
- Implements the AQMP TUI.
-
- Initializes the widgets and starts the urwid event loop.
-
- :param address:
- Address of the server to connect to.
- :param num_retries:
- The number of times to retry before stopping to reconnect.
- :param retry_delay:
- The delay(sec) before each retry
- """
- def __init__(self, address: Union[str, Tuple[str, int]], num_retries: int,
- retry_delay: Optional[int]) -> None:
- urwid.register_signal(type(self), UPDATE_MSG)
- self.window = Window(self)
- self.address = address
- self.aloop: Optional[asyncio.AbstractEventLoop] = None
- self.num_retries = num_retries
- self.retry_delay = retry_delay if retry_delay else 2
- self.retry: bool = False
- self.exiting: bool = False
- super().__init__()
-
- def add_to_history(self, msg: str, level: Optional[str] = None) -> None:
- """
- Appends the msg to the history list.
-
- :param msg:
- The raw message to be appended in string type.
- """
- urwid.emit_signal(self, UPDATE_MSG, msg, level)
-
- def _cb_outbound(self, msg: Message) -> Message:
- """
- Callback: outbound message hook.
-
- Appends the outgoing messages to the history box.
-
- :param msg: raw outbound message.
- :return: final outbound message.
- """
- str_msg = str(msg)
-
- if not has_handler_type(logging.getLogger(), TUILogHandler):
- logging.debug('Request: %s', str_msg)
- self.add_to_history('<-- ' + str_msg)
- return msg
-
- def _cb_inbound(self, msg: Message) -> Message:
- """
- Callback: outbound message hook.
-
- Appends the incoming messages to the history box.
-
- :param msg: raw inbound message.
- :return: final inbound message.
- """
- str_msg = str(msg)
-
- if not has_handler_type(logging.getLogger(), TUILogHandler):
- logging.debug('Request: %s', str_msg)
- self.add_to_history('--> ' + str_msg)
- return msg
-
- async def _send_to_server(self, msg: Message) -> None:
- """
- This coroutine sends the message to the server.
- The message has to be pre-validated.
-
- :param msg:
- Pre-validated message to be to sent to the server.
-
- :raise Exception: When an unhandled exception is caught.
- """
- try:
- await self._raw(msg, assign_id='id' not in msg)
- except ExecInterruptedError as err:
- logging.info('Error server disconnected before reply %s', str(err))
- self.add_to_history('Server disconnected before reply', 'ERROR')
- except Exception as err:
- logging.error('Exception from _send_to_server: %s', str(err))
- raise err
-
- def cb_send_to_server(self, raw_msg: str) -> None:
- """
- Validates and sends the message to the server.
- The raw string message is first converted into a Message object
- and is then sent to the server.
-
- :param raw_msg:
- The raw string message to be sent to the server.
-
- :raise Exception: When an unhandled exception is caught.
- """
- try:
- msg = Message(bytes(raw_msg, encoding='utf-8'))
- create_task(self._send_to_server(msg))
- except (DeserializationError, UnexpectedTypeError) as err:
- raw_msg = format_json(raw_msg)
- logging.info('Invalid message: %s', err.error_message)
- self.add_to_history(f'{raw_msg}: {err.error_message}', 'ERROR')
-
- def unhandled_input(self, key: str) -> None:
- """
- Handle's keys which haven't been handled by the child widgets.
-
- :param key:
- Unhandled key
- """
- if key == 'esc':
- self.kill_app()
-
- def kill_app(self) -> None:
- """
- Initiates killing of app. A bridge between asynchronous and synchronous
- code.
- """
- create_task(self._kill_app())
-
- async def _kill_app(self) -> None:
- """
- This coroutine initiates the actual disconnect process and calls
- urwid.ExitMainLoop() to kill the TUI.
-
- :raise Exception: When an unhandled exception is caught.
- """
- self.exiting = True
- await self.disconnect()
- logging.debug('Disconnect finished. Exiting app')
- raise urwid.ExitMainLoop()
-
- async def disconnect(self) -> None:
- """
- Overrides the disconnect method to handle the errors locally.
- """
- try:
- await super().disconnect()
- except (OSError, EOFError) as err:
- logging.info('disconnect: %s', str(err))
- self.retry = True
- except ProtocolError as err:
- logging.info('disconnect: %s', str(err))
- except Exception as err:
- logging.error('disconnect: Unhandled exception %s', str(err))
- raise err
-
- def _set_status(self, msg: str) -> None:
- """
- Sets the message as the status.
-
- :param msg:
- The message to be displayed in the status bar.
- """
- self.window.footer.set_text(msg)
-
- def _get_formatted_address(self) -> str:
- """
- Returns a formatted version of the server's address.
-
- :return: formatted address
- """
- if isinstance(self.address, tuple):
- host, port = self.address
- addr = f'{host}:{port}'
- else:
- addr = f'{self.address}'
- return addr
-
- async def _initiate_connection(self) -> Optional[ConnectError]:
- """
- Tries connecting to a server a number of times with a delay between
- each try. If all retries failed then return the error faced during
- the last retry.
-
- :return: Error faced during last retry.
- """
- current_retries = 0
- err = None
-
- # initial try
- await self.connect_server()
- while self.retry and current_retries < self.num_retries:
- logging.info('Connection Failed, retrying in %d', self.retry_delay)
- status = f'[Retry #{current_retries} ({self.retry_delay}s)]'
- self._set_status(status)
-
- await asyncio.sleep(self.retry_delay)
-
- err = await self.connect_server()
- current_retries += 1
- # If all retries failed report the last error
- if err:
- logging.info('All retries failed: %s', err)
- return err
- return None
-
- async def manage_connection(self) -> None:
- """
- Manage the connection based on the current run state.
-
- A reconnect is issued when the current state is IDLE and the number
- of retries is not exhausted.
- A disconnect is issued when the current state is DISCONNECTING.
- """
- while not self.exiting:
- if self.runstate == Runstate.IDLE:
- err = await self._initiate_connection()
- # If retry is still true then, we have exhausted all our tries.
- if err:
- self._set_status(f'[Error: {err.error_message}]')
- else:
- addr = self._get_formatted_address()
- self._set_status(f'[Connected {addr}]')
- elif self.runstate == Runstate.DISCONNECTING:
- self._set_status('[Disconnected]')
- await self.disconnect()
- # check if a retry is needed
- if self.runstate == Runstate.IDLE:
- continue
- await self.runstate_changed()
-
- async def connect_server(self) -> Optional[ConnectError]:
- """
- Initiates a connection to the server at address `self.address`
- and in case of a failure, sets the status to the respective error.
- """
- try:
- await self.connect(self.address)
- self.retry = False
- except ConnectError as err:
- logging.info('connect_server: ConnectError %s', str(err))
- self.retry = True
- return err
- return None
-
- def run(self, debug: bool = False) -> None:
- """
- Starts the long running co-routines and the urwid event loop.
-
- :param debug:
- Enables/Disables asyncio event loop debugging
- """
- screen = urwid.raw_display.Screen()
- screen.set_terminal_properties(256)
-
- self.aloop = asyncio.get_event_loop()
- self.aloop.set_debug(debug)
-
- # Gracefully handle SIGTERM and SIGINT signals
- cancel_signals = [signal.SIGTERM, signal.SIGINT]
- for sig in cancel_signals:
- self.aloop.add_signal_handler(sig, self.kill_app)
-
- event_loop = urwid.AsyncioEventLoop(loop=self.aloop)
- main_loop = urwid.MainLoop(urwid.AttrMap(self.window, 'background'),
- unhandled_input=self.unhandled_input,
- screen=screen,
- palette=palette,
- handle_mouse=True,
- event_loop=event_loop)
-
- create_task(self.manage_connection(), self.aloop)
- try:
- main_loop.run()
- except Exception as err:
- logging.error('%s\n%s\n', str(err), pretty_traceback())
- raise err
-
-
-class StatusBar(urwid.Text):
- """
- A simple statusbar modelled using the Text widget. The status can be
- set using the set_text function. All text set is aligned to right.
-
- :param text: Initial text to be displayed. Default is empty str.
- """
- def __init__(self, text: str = ''):
- super().__init__(text, align='right')
-
-
-class Editor(urwid_readline.ReadlineEdit):
- """
- A simple editor modelled using the urwid_readline.ReadlineEdit widget.
- Mimcs GNU readline shortcuts and provides history support.
-
- The readline shortcuts can be found below:
- https://github.com/rr-/urwid_readline#features
-
- Along with the readline features, this editor also has support for
- history. Pressing the 'up'/'down' switches between the prev/next messages
- available in the history.
-
- Currently there is no support to save the history to a file. The history of
- previous commands is lost on exit.
-
- :param parent: Reference to the TUI object.
- """
- def __init__(self, parent: App) -> None:
- super().__init__(caption='> ', multiline=True)
- self.parent = parent
- self.history: List[str] = []
- self.last_index: int = -1
- self.show_history: bool = False
-
- def keypress(self, size: Tuple[int, int], key: str) -> Optional[str]:
- """
- Handles the keypress on this widget.
-
- :param size:
- The current size of the widget.
- :param key:
- The key to be handled.
-
- :return: Unhandled key if any.
- """
- msg = self.get_edit_text()
- if key == 'up' and not msg:
- # Show the history when 'up arrow' is pressed with no input text.
- # NOTE: The show_history logic is necessary because in 'multiline'
- # mode (which we use) 'up arrow' is used to move between lines.
- if not self.history:
- return None
- self.show_history = True
- last_msg = self.history[self.last_index]
- self.set_edit_text(last_msg)
- self.edit_pos = len(last_msg)
- elif key == 'up' and self.show_history:
- self.last_index = max(self.last_index - 1, -len(self.history))
- self.set_edit_text(self.history[self.last_index])
- self.edit_pos = len(self.history[self.last_index])
- elif key == 'down' and self.show_history:
- if self.last_index == -1:
- self.set_edit_text('')
- self.show_history = False
- else:
- self.last_index += 1
- self.set_edit_text(self.history[self.last_index])
- self.edit_pos = len(self.history[self.last_index])
- elif key == 'meta enter':
- # When using multiline, enter inserts a new line into the editor
- # send the input to the server on alt + enter
- self.parent.cb_send_to_server(msg)
- self.history.append(msg)
- self.set_edit_text('')
- self.last_index = -1
- self.show_history = False
- else:
- self.show_history = False
- self.last_index = -1
- return cast(Optional[str], super().keypress(size, key))
- return None
-
-
-class EditorWidget(urwid.Filler):
- """
- Wrapper around the editor widget.
-
- The Editor is a flow widget and has to wrapped inside a box widget.
- This class wraps the Editor inside filler widget.
-
- :param parent: Reference to the TUI object.
- """
- def __init__(self, parent: App) -> None:
- super().__init__(Editor(parent), valign='top')
-
-
-class HistoryBox(urwid.ListBox):
- """
- This widget is modelled using the ListBox widget, contains the list of
- all messages both QMP messages and log messsages to be shown in the TUI.
-
- The messages are urwid.Text widgets. On every append of a message, the
- focus is shifted to the last appended message.
-
- :param parent: Reference to the TUI object.
- """
- def __init__(self, parent: App) -> None:
- self.parent = parent
- self.history = urwid.SimpleFocusListWalker([])
- super().__init__(self.history)
-
- def add_to_history(self,
- history: Union[str, List[Tuple[str, str]]]) -> None:
- """
- Appends a message to the list and set the focus to the last appended
- message.
-
- :param history:
- The history item(message/event) to be appended to the list.
- """
- self.history.append(urwid.Text(history))
- self.history.set_focus(len(self.history) - 1)
-
- def mouse_event(self, size: Tuple[int, int], _event: str, button: float,
- _x: int, _y: int, focus: bool) -> None:
- # Unfortunately there are no urwid constants that represent the mouse
- # events.
- if button == 4: # Scroll up event
- super().keypress(size, 'up')
- elif button == 5: # Scroll down event
- super().keypress(size, 'down')
-
-
-class HistoryWindow(urwid.Frame):
- """
- This window composes the HistoryBox and EditorWidget in a horizontal split.
- By default the first focus is given to the history box.
-
- :param parent: Reference to the TUI object.
- """
- def __init__(self, parent: App) -> None:
- self.parent = parent
- self.editor_widget = EditorWidget(parent)
- self.editor = urwid.LineBox(self.editor_widget)
- self.history = HistoryBox(parent)
- self.body = urwid.Pile([('weight', 80, self.history),
- ('weight', 20, self.editor)])
- super().__init__(self.body)
- urwid.connect_signal(self.parent, UPDATE_MSG, self.cb_add_to_history)
-
- def cb_add_to_history(self, msg: str, level: Optional[str] = None) -> None:
- """
- Appends a message to the history box
-
- :param msg:
- The message to be appended to the history box.
- :param level:
- The log level of the message, if it is a log message.
- """
- formatted = []
- if level:
- msg = f'[{level}]: {msg}'
- formatted.append((level, msg))
- else:
- lexer = lexers.JsonLexer() # pylint: disable=no-member
- for token in lexer.get_tokens(msg):
- formatted.append(token)
- self.history.add_to_history(formatted)
-
-
-class Window(urwid.Frame):
- """
- This window is the top most widget of the TUI and will contain other
- windows. Each child of this widget is responsible for displaying a specific
- functionality.
-
- :param parent: Reference to the TUI object.
- """
- def __init__(self, parent: App) -> None:
- self.parent = parent
- footer = StatusBar()
- body = HistoryWindow(parent)
- super().__init__(body, footer=footer)
-
-
-class TUILogHandler(Handler):
- """
- This handler routes all the log messages to the TUI screen.
- It is installed to the root logger to so that the log message from all
- libraries begin used is routed to the screen.
-
- :param tui: Reference to the TUI object.
- """
- def __init__(self, tui: App) -> None:
- super().__init__()
- self.tui = tui
-
- def emit(self, record: LogRecord) -> None:
- """
- Emits a record to the TUI screen.
-
- Appends the log message to the TUI screen
- """
- level = record.levelname
- msg = record.getMessage()
- self.tui.add_to_history(msg, level)
-
-
-def main() -> None:
- """
- Driver of the whole script, parses arguments, initialize the TUI and
- the logger.
- """
- parser = argparse.ArgumentParser(description='AQMP TUI')
- parser.add_argument('qmp_server', help='Address of the QMP server. '
- 'Format <UNIX socket path | TCP addr:port>')
- parser.add_argument('--num-retries', type=int, default=10,
- help='Number of times to reconnect before giving up.')
- parser.add_argument('--retry-delay', type=int,
- help='Time(s) to wait before next retry. '
- 'Default action is to wait 2s between each retry.')
- parser.add_argument('--log-file', help='The Log file name')
- parser.add_argument('--log-level', default='WARNING',
- help='Log level <CRITICAL|ERROR|WARNING|INFO|DEBUG|>')
- parser.add_argument('--asyncio-debug', action='store_true',
- help='Enable debug mode for asyncio loop. '
- 'Generates lot of output, makes TUI unusable when '
- 'logs are logged in the TUI. '
- 'Use only when logging to a file.')
- args = parser.parse_args()
-
- try:
- address = QEMUMonitorProtocol.parse_address(args.qmp_server)
- except QMPBadPortError as err:
- parser.error(str(err))
-
- app = App(address, args.num_retries, args.retry_delay)
-
- root_logger = logging.getLogger()
- root_logger.setLevel(logging.getLevelName(args.log_level))
-
- if args.log_file:
- root_logger.addHandler(logging.FileHandler(args.log_file))
- else:
- root_logger.addHandler(TUILogHandler(app))
-
- app.run(args.asyncio_debug)
-
-
-if __name__ == '__main__':
- main()
diff --git a/python/qemu/aqmp/error.py b/python/qemu/aqmp/error.py
deleted file mode 100644
index 781f49b008..0000000000
--- a/python/qemu/aqmp/error.py
+++ /dev/null
@@ -1,50 +0,0 @@
-"""
-AQMP Error Classes
-
-This package seeks to provide semantic error classes that are intended
-to be used directly by clients when they would like to handle particular
-semantic failures (e.g. "failed to connect") without needing to know the
-enumeration of possible reasons for that failure.
-
-AQMPError serves as the ancestor for all exceptions raised by this
-package, and is suitable for use in handling semantic errors from this
-library. In most cases, individual public methods will attempt to catch
-and re-encapsulate various exceptions to provide a semantic
-error-handling interface.
-
-.. admonition:: AQMP Exception Hierarchy Reference
-
- | `Exception`
- | +-- `AQMPError`
- | +-- `ConnectError`
- | +-- `StateError`
- | +-- `ExecInterruptedError`
- | +-- `ExecuteError`
- | +-- `ListenerError`
- | +-- `ProtocolError`
- | +-- `DeserializationError`
- | +-- `UnexpectedTypeError`
- | +-- `ServerParseError`
- | +-- `BadReplyError`
- | +-- `GreetingError`
- | +-- `NegotiationError`
-"""
-
-
-class AQMPError(Exception):
- """Abstract error class for all errors originating from this package."""
-
-
-class ProtocolError(AQMPError):
- """
- Abstract error class for protocol failures.
-
- Semantically, these errors are generally the fault of either the
- protocol server or as a result of a bug in this library.
-
- :param error_message: Human-readable string describing the error.
- """
- def __init__(self, error_message: str):
- super().__init__(error_message)
- #: Human-readable error message, without any prefix.
- self.error_message: str = error_message
diff --git a/python/qemu/aqmp/events.py b/python/qemu/aqmp/events.py
deleted file mode 100644
index fb81d21610..0000000000
--- a/python/qemu/aqmp/events.py
+++ /dev/null
@@ -1,706 +0,0 @@
-"""
-AQMP Events and EventListeners
-
-Asynchronous QMP uses `EventListener` objects to listen for events. An
-`EventListener` is a FIFO event queue that can be pre-filtered to listen
-for only specific events. Each `EventListener` instance receives its own
-copy of events that it hears, so events may be consumed without fear or
-worry for depriving other listeners of events they need to hear.
-
-
-EventListener Tutorial
-----------------------
-
-In all of the following examples, we assume that we have a `QMPClient`
-instantiated named ``qmp`` that is already connected.
-
-
-`listener()` context blocks with one name
-~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-
-The most basic usage is by using the `listener()` context manager to
-construct them:
-
-.. code:: python
-
- with qmp.listener('STOP') as listener:
- await qmp.execute('stop')
- await listener.get()
-
-The listener is active only for the duration of the ‘with’ block. This
-instance listens only for ‘STOP’ events.
-
-
-`listener()` context blocks with two or more names
-~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-
-Multiple events can be selected for by providing any ``Iterable[str]``:
-
-.. code:: python
-
- with qmp.listener(('STOP', 'RESUME')) as listener:
- await qmp.execute('stop')
- event = await listener.get()
- assert event['event'] == 'STOP'
-
- await qmp.execute('cont')
- event = await listener.get()
- assert event['event'] == 'RESUME'
-
-
-`listener()` context blocks with no names
-~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-
-By omitting names entirely, you can listen to ALL events.
-
-.. code:: python
-
- with qmp.listener() as listener:
- await qmp.execute('stop')
- event = await listener.get()
- assert event['event'] == 'STOP'
-
-This isn’t a very good use case for this feature: In a non-trivial
-running system, we may not know what event will arrive next. Grabbing
-the top of a FIFO queue returning multiple kinds of events may be prone
-to error.
-
-
-Using async iterators to retrieve events
-~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-
-If you’d like to simply watch what events happen to arrive, you can use
-the listener as an async iterator:
-
-.. code:: python
-
- with qmp.listener() as listener:
- async for event in listener:
- print(f"Event arrived: {event['event']}")
-
-This is analogous to the following code:
-
-.. code:: python
-
- with qmp.listener() as listener:
- while True:
- event = listener.get()
- print(f"Event arrived: {event['event']}")
-
-This event stream will never end, so these blocks will never terminate.
-
-
-Using asyncio.Task to concurrently retrieve events
-~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-
-Since a listener’s event stream will never terminate, it is not likely
-useful to use that form in a script. For longer-running clients, we can
-create event handlers by using `asyncio.Task` to create concurrent
-coroutines:
-
-.. code:: python
-
- async def print_events(listener):
- try:
- async for event in listener:
- print(f"Event arrived: {event['event']}")
- except asyncio.CancelledError:
- return
-
- with qmp.listener() as listener:
- task = asyncio.Task(print_events(listener))
- await qmp.execute('stop')
- await qmp.execute('cont')
- task.cancel()
- await task
-
-However, there is no guarantee that these events will be received by the
-time we leave this context block. Once the context block is exited, the
-listener will cease to hear any new events, and becomes inert.
-
-Be mindful of the timing: the above example will *probably*– but does
-not *guarantee*– that both STOP/RESUMED events will be printed. The
-example below outlines how to use listeners outside of a context block.
-
-
-Using `register_listener()` and `remove_listener()`
-~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-
-To create a listener with a longer lifetime, beyond the scope of a
-single block, create a listener and then call `register_listener()`:
-
-.. code:: python
-
- class MyClient:
- def __init__(self, qmp):
- self.qmp = qmp
- self.listener = EventListener()
-
- async def print_events(self):
- try:
- async for event in self.listener:
- print(f"Event arrived: {event['event']}")
- except asyncio.CancelledError:
- return
-
- async def run(self):
- self.task = asyncio.Task(self.print_events)
- self.qmp.register_listener(self.listener)
- await qmp.execute('stop')
- await qmp.execute('cont')
-
- async def stop(self):
- self.task.cancel()
- await self.task
- self.qmp.remove_listener(self.listener)
-
-The listener can be deactivated by using `remove_listener()`. When it is
-removed, any possible pending events are cleared and it can be
-re-registered at a later time.
-
-
-Using the built-in all events listener
-~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-
-The `QMPClient` object creates its own default listener named
-:py:obj:`~Events.events` that can be used for the same purpose without
-having to create your own:
-
-.. code:: python
-
- async def print_events(listener):
- try:
- async for event in listener:
- print(f"Event arrived: {event['event']}")
- except asyncio.CancelledError:
- return
-
- task = asyncio.Task(print_events(qmp.events))
-
- await qmp.execute('stop')
- await qmp.execute('cont')
-
- task.cancel()
- await task
-
-
-Using both .get() and async iterators
-~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-
-The async iterator and `get()` methods pull events from the same FIFO
-queue. If you mix the usage of both, be aware: Events are emitted
-precisely once per listener.
-
-If multiple contexts try to pull events from the same listener instance,
-events are still emitted only precisely once.
-
-This restriction can be lifted by creating additional listeners.
-
-
-Creating multiple listeners
-~~~~~~~~~~~~~~~~~~~~~~~~~~~
-
-Additional `EventListener` objects can be created at-will. Each one
-receives its own copy of events, with separate FIFO event queues.
-
-.. code:: python
-
- my_listener = EventListener()
- qmp.register_listener(my_listener)
-
- await qmp.execute('stop')
- copy1 = await my_listener.get()
- copy2 = await qmp.events.get()
-
- assert copy1 == copy2
-
-In this example, we await an event from both a user-created
-`EventListener` and the built-in events listener. Both receive the same
-event.
-
-
-Clearing listeners
-~~~~~~~~~~~~~~~~~~
-
-`EventListener` objects can be cleared, clearing all events seen thus far:
-
-.. code:: python
-
- await qmp.execute('stop')
- qmp.events.clear()
- await qmp.execute('cont')
- event = await qmp.events.get()
- assert event['event'] == 'RESUME'
-
-`EventListener` objects are FIFO queues. If events are not consumed,
-they will remain in the queue until they are witnessed or discarded via
-`clear()`. FIFO queues will be drained automatically upon leaving a
-context block, or when calling `remove_listener()`.
-
-
-Accessing listener history
-~~~~~~~~~~~~~~~~~~~~~~~~~~
-
-`EventListener` objects record their history. Even after being cleared,
-you can obtain a record of all events seen so far:
-
-.. code:: python
-
- await qmp.execute('stop')
- await qmp.execute('cont')
- qmp.events.clear()
-
- assert len(qmp.events.history) == 2
- assert qmp.events.history[0]['event'] == 'STOP'
- assert qmp.events.history[1]['event'] == 'RESUME'
-
-The history is updated immediately and does not require the event to be
-witnessed first.
-
-
-Using event filters
-~~~~~~~~~~~~~~~~~~~
-
-`EventListener` objects can be given complex filtering criteria if names
-are not sufficient:
-
-.. code:: python
-
- def job1_filter(event) -> bool:
- event_data = event.get('data', {})
- event_job_id = event_data.get('id')
- return event_job_id == "job1"
-
- with qmp.listener('JOB_STATUS_CHANGE', job1_filter) as listener:
- await qmp.execute('blockdev-backup', arguments={'job-id': 'job1', ...})
- async for event in listener:
- if event['data']['status'] == 'concluded':
- break
-
-These filters might be most useful when parameterized. `EventListener`
-objects expect a function that takes only a single argument (the raw
-event, as a `Message`) and returns a bool; True if the event should be
-accepted into the stream. You can create a function that adapts this
-signature to accept configuration parameters:
-
-.. code:: python
-
- def job_filter(job_id: str) -> EventFilter:
- def filter(event: Message) -> bool:
- return event['data']['id'] == job_id
- return filter
-
- with qmp.listener('JOB_STATUS_CHANGE', job_filter('job2')) as listener:
- await qmp.execute('blockdev-backup', arguments={'job-id': 'job2', ...})
- async for event in listener:
- if event['data']['status'] == 'concluded':
- break
-
-
-Activating an existing listener with `listen()`
-~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-
-Listeners with complex, long configurations can also be created manually
-and activated temporarily by using `listen()` instead of `listener()`:
-
-.. code:: python
-
- listener = EventListener(('BLOCK_JOB_COMPLETED', 'BLOCK_JOB_CANCELLED',
- 'BLOCK_JOB_ERROR', 'BLOCK_JOB_READY',
- 'BLOCK_JOB_PENDING', 'JOB_STATUS_CHANGE'))
-
- with qmp.listen(listener):
- await qmp.execute('blockdev-backup', arguments={'job-id': 'job3', ...})
- async for event in listener:
- print(event)
- if event['event'] == 'BLOCK_JOB_COMPLETED':
- break
-
-Any events that are not witnessed by the time the block is left will be
-cleared from the queue; entering the block is an implicit
-`register_listener()` and leaving the block is an implicit
-`remove_listener()`.
-
-
-Activating multiple existing listeners with `listen()`
-~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-
-While `listener()` is only capable of creating a single listener,
-`listen()` is capable of activating multiple listeners simultaneously:
-
-.. code:: python
-
- def job_filter(job_id: str) -> EventFilter:
- def filter(event: Message) -> bool:
- return event['data']['id'] == job_id
- return filter
-
- jobA = EventListener('JOB_STATUS_CHANGE', job_filter('jobA'))
- jobB = EventListener('JOB_STATUS_CHANGE', job_filter('jobB'))
-
- with qmp.listen(jobA, jobB):
- qmp.execute('blockdev-create', arguments={'job-id': 'jobA', ...})
- qmp.execute('blockdev-create', arguments={'job-id': 'jobB', ...})
-
- async for event in jobA.get():
- if event['data']['status'] == 'concluded':
- break
- async for event in jobB.get():
- if event['data']['status'] == 'concluded':
- break
-
-
-Extending the `EventListener` class
-~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-
-In the case that a more specialized `EventListener` is desired to
-provide either more functionality or more compact syntax for specialized
-cases, it can be extended.
-
-One of the key methods to extend or override is
-:py:meth:`~EventListener.accept()`. The default implementation checks an
-incoming message for:
-
-1. A qualifying name, if any :py:obj:`~EventListener.names` were
- specified at initialization time
-2. That :py:obj:`~EventListener.event_filter()` returns True.
-
-This can be modified however you see fit to change the criteria for
-inclusion in the stream.
-
-For convenience, a ``JobListener`` class could be created that simply
-bakes in configuration so it does not need to be repeated:
-
-.. code:: python
-
- class JobListener(EventListener):
- def __init__(self, job_id: str):
- super().__init__(('BLOCK_JOB_COMPLETED', 'BLOCK_JOB_CANCELLED',
- 'BLOCK_JOB_ERROR', 'BLOCK_JOB_READY',
- 'BLOCK_JOB_PENDING', 'JOB_STATUS_CHANGE'))
- self.job_id = job_id
-
- def accept(self, event) -> bool:
- if not super().accept(event):
- return False
- if event['event'] in ('BLOCK_JOB_PENDING', 'JOB_STATUS_CHANGE'):
- return event['data']['id'] == job_id
- return event['data']['device'] == job_id
-
-From here on out, you can conjure up a custom-purpose listener that
-listens only for job-related events for a specific job-id easily:
-
-.. code:: python
-
- listener = JobListener('job4')
- with qmp.listener(listener):
- await qmp.execute('blockdev-backup', arguments={'job-id': 'job4', ...})
- async for event in listener:
- print(event)
- if event['event'] == 'BLOCK_JOB_COMPLETED':
- break
-
-
-Experimental Interfaces & Design Issues
----------------------------------------
-
-These interfaces are not ones I am sure I will keep or otherwise modify
-heavily.
-
-qmp.listener()’s type signature
-~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-
-`listener()` does not return anything, because it was assumed the caller
-already had a handle to the listener. However, for
-``qmp.listener(EventListener())`` forms, the caller will not have saved
-a handle to the listener.
-
-Because this function can accept *many* listeners, I found it hard to
-accurately type in a way where it could be used in both “one” or “many”
-forms conveniently and in a statically type-safe manner.
-
-Ultimately, I removed the return altogether, but perhaps with more time
-I can work out a way to re-add it.
-
-
-API Reference
--------------
-
-"""
-
-import asyncio
-from contextlib import contextmanager
-import logging
-from typing import (
- AsyncIterator,
- Callable,
- Iterable,
- Iterator,
- List,
- Optional,
- Set,
- Tuple,
- Union,
-)
-
-from .error import AQMPError
-from .message import Message
-
-
-EventNames = Union[str, Iterable[str], None]
-EventFilter = Callable[[Message], bool]
-
-
-class ListenerError(AQMPError):
- """
- Generic error class for `EventListener`-related problems.
- """
-
-
-class EventListener:
- """
- Selectively listens for events with runtime configurable filtering.
-
- This class is designed to be directly usable for the most common cases,
- but it can be extended to provide more rigorous control.
-
- :param names:
- One or more names of events to listen for.
- When not provided, listen for ALL events.
- :param event_filter:
- An optional event filtering function.
- When names are also provided, this acts as a secondary filter.
-
- When ``names`` and ``event_filter`` are both provided, the names
- will be filtered first, and then the filter function will be called
- second. The event filter function can assume that the format of the
- event is a known format.
- """
- def __init__(
- self,
- names: EventNames = None,
- event_filter: Optional[EventFilter] = None,
- ):
- # Queue of 'heard' events yet to be witnessed by a caller.
- self._queue: 'asyncio.Queue[Message]' = asyncio.Queue()
-
- # Intended as a historical record, NOT a processing queue or backlog.
- self._history: List[Message] = []
-
- #: Primary event filter, based on one or more event names.
- self.names: Set[str] = set()
- if isinstance(names, str):
- self.names.add(names)
- elif names is not None:
- self.names.update(names)
-
- #: Optional, secondary event filter.
- self.event_filter: Optional[EventFilter] = event_filter
-
- @property
- def history(self) -> Tuple[Message, ...]:
- """
- A read-only history of all events seen so far.
-
- This represents *every* event, including those not yet witnessed
- via `get()` or ``async for``. It persists between `clear()`
- calls and is immutable.
- """
- return tuple(self._history)
-
- def accept(self, event: Message) -> bool:
- """
- Determine if this listener accepts this event.
-
- This method determines which events will appear in the stream.
- The default implementation simply checks the event against the
- list of names and the event_filter to decide if this
- `EventListener` accepts a given event. It can be
- overridden/extended to provide custom listener behavior.
-
- User code is not expected to need to invoke this method.
-
- :param event: The event under consideration.
- :return: `True`, if this listener accepts this event.
- """
- name_ok = (not self.names) or (event['event'] in self.names)
- return name_ok and (
- (not self.event_filter) or self.event_filter(event)
- )
-
- async def put(self, event: Message) -> None:
- """
- Conditionally put a new event into the FIFO queue.
-
- This method is not designed to be invoked from user code, and it
- should not need to be overridden. It is a public interface so
- that `QMPClient` has an interface by which it can inform
- registered listeners of new events.
-
- The event will be put into the queue if
- :py:meth:`~EventListener.accept()` returns `True`.
-
- :param event: The new event to put into the FIFO queue.
- """
- if not self.accept(event):
- return
-
- self._history.append(event)
- await self._queue.put(event)
-
- async def get(self) -> Message:
- """
- Wait for the very next event in this stream.
-
- If one is already available, return that one.
- """
- return await self._queue.get()
-
- def clear(self) -> None:
- """
- Clear this listener of all pending events.
-
- Called when an `EventListener` is being unregistered, this clears the
- pending FIFO queue synchronously. It can be also be used to
- manually clear any pending events, if desired.
-
- .. warning::
- Take care when discarding events. Cleared events will be
- silently tossed on the floor. All events that were ever
- accepted by this listener are visible in `history()`.
- """
- while True:
- try:
- self._queue.get_nowait()
- except asyncio.QueueEmpty:
- break
-
- def __aiter__(self) -> AsyncIterator[Message]:
- return self
-
- async def __anext__(self) -> Message:
- """
- Enables the `EventListener` to function as an async iterator.
-
- It may be used like this:
-
- .. code:: python
-
- async for event in listener:
- print(event)
-
- These iterators will never terminate of their own accord; you
- must provide break conditions or otherwise prepare to run them
- in an `asyncio.Task` that can be cancelled.
- """
- return await self.get()
-
-
-class Events:
- """
- Events is a mix-in class that adds event functionality to the QMP class.
-
- It's designed specifically as a mix-in for `QMPClient`, and it
- relies upon the class it is being mixed into having a 'logger'
- property.
- """
- def __init__(self) -> None:
- self._listeners: List[EventListener] = []
-
- #: Default, all-events `EventListener`.
- self.events: EventListener = EventListener()
- self.register_listener(self.events)
-
- # Parent class needs to have a logger
- self.logger: logging.Logger
-
- async def _event_dispatch(self, msg: Message) -> None:
- """
- Given a new event, propagate it to all of the active listeners.
-
- :param msg: The event to propagate.
- """
- for listener in self._listeners:
- await listener.put(msg)
-
- def register_listener(self, listener: EventListener) -> None:
- """
- Register and activate an `EventListener`.
-
- :param listener: The listener to activate.
- :raise ListenerError: If the given listener is already registered.
- """
- if listener in self._listeners:
- raise ListenerError("Attempted to re-register existing listener")
- self.logger.debug("Registering %s.", str(listener))
- self._listeners.append(listener)
-
- def remove_listener(self, listener: EventListener) -> None:
- """
- Unregister and deactivate an `EventListener`.
-
- The removed listener will have its pending events cleared via
- `clear()`. The listener can be re-registered later when
- desired.
-
- :param listener: The listener to deactivate.
- :raise ListenerError: If the given listener is not registered.
- """
- if listener == self.events:
- raise ListenerError("Cannot remove the default listener.")
- self.logger.debug("Removing %s.", str(listener))
- listener.clear()
- self._listeners.remove(listener)
-
- @contextmanager
- def listen(self, *listeners: EventListener) -> Iterator[None]:
- r"""
- Context manager: Temporarily listen with an `EventListener`.
-
- Accepts one or more `EventListener` objects and registers them,
- activating them for the duration of the context block.
-
- `EventListener` objects will have any pending events in their
- FIFO queue cleared upon exiting the context block, when they are
- deactivated.
-
- :param \*listeners: One or more EventListeners to activate.
- :raise ListenerError: If the given listener(s) are already active.
- """
- _added = []
-
- try:
- for listener in listeners:
- self.register_listener(listener)
- _added.append(listener)
-
- yield
-
- finally:
- for listener in _added:
- self.remove_listener(listener)
-
- @contextmanager
- def listener(
- self,
- names: EventNames = (),
- event_filter: Optional[EventFilter] = None
- ) -> Iterator[EventListener]:
- """
- Context manager: Temporarily listen with a new `EventListener`.
-
- Creates an `EventListener` object and registers it, activating
- it for the duration of the context block.
-
- :param names:
- One or more names of events to listen for.
- When not provided, listen for ALL events.
- :param event_filter:
- An optional event filtering function.
- When names are also provided, this acts as a secondary filter.
-
- :return: The newly created and active `EventListener`.
- """
- listener = EventListener(names, event_filter)
- with self.listen(listener):
- yield listener
diff --git a/python/qemu/aqmp/message.py b/python/qemu/aqmp/message.py
deleted file mode 100644
index f76ccc9074..0000000000
--- a/python/qemu/aqmp/message.py
+++ /dev/null
@@ -1,209 +0,0 @@
-"""
-QMP Message Format
-
-This module provides the `Message` class, which represents a single QMP
-message sent to or from the server.
-"""
-
-import json
-from json import JSONDecodeError
-from typing import (
- Dict,
- Iterator,
- Mapping,
- MutableMapping,
- Optional,
- Union,
-)
-
-from .error import ProtocolError
-
-
-class Message(MutableMapping[str, object]):
- """
- Represents a single QMP protocol message.
-
- QMP uses JSON objects as its basic communicative unit; so this
- Python object is a :py:obj:`~collections.abc.MutableMapping`. It may
- be instantiated from either another mapping (like a `dict`), or from
- raw `bytes` that still need to be deserialized.
-
- Once instantiated, it may be treated like any other MutableMapping::
-
- >>> msg = Message(b'{"hello": "world"}')
- >>> assert msg['hello'] == 'world'
- >>> msg['id'] = 'foobar'
- >>> print(msg)
- {
- "hello": "world",
- "id": "foobar"
- }
-
- It can be converted to `bytes`::
-
- >>> msg = Message({"hello": "world"})
- >>> print(bytes(msg))
- b'{"hello":"world","id":"foobar"}'
-
- Or back into a garden-variety `dict`::
-
- >>> dict(msg)
- {'hello': 'world'}
-
-
- :param value: Initial value, if any.
- :param eager:
- When `True`, attempt to serialize or deserialize the initial value
- immediately, so that conversion exceptions are raised during
- the call to ``__init__()``.
- """
- # pylint: disable=too-many-ancestors
-
- def __init__(self,
- value: Union[bytes, Mapping[str, object]] = b'{}', *,
- eager: bool = True):
- self._data: Optional[bytes] = None
- self._obj: Optional[Dict[str, object]] = None
-
- if isinstance(value, bytes):
- self._data = value
- if eager:
- self._obj = self._deserialize(self._data)
- else:
- self._obj = dict(value)
- if eager:
- self._data = self._serialize(self._obj)
-
- # Methods necessary to implement the MutableMapping interface, see:
- # https://docs.python.org/3/library/collections.abc.html#collections.abc.MutableMapping
-
- # We get pop, popitem, clear, update, setdefault, __contains__,
- # keys, items, values, get, __eq__ and __ne__ for free.
-
- def __getitem__(self, key: str) -> object:
- return self._object[key]
-
- def __setitem__(self, key: str, value: object) -> None:
- self._object[key] = value
- self._data = None
-
- def __delitem__(self, key: str) -> None:
- del self._object[key]
- self._data = None
-
- def __iter__(self) -> Iterator[str]:
- return iter(self._object)
-
- def __len__(self) -> int:
- return len(self._object)
-
- # Dunder methods not related to MutableMapping:
-
- def __repr__(self) -> str:
- if self._obj is not None:
- return f"Message({self._object!r})"
- return f"Message({bytes(self)!r})"
-
- def __str__(self) -> str:
- """Pretty-printed representation of this QMP message."""
- return json.dumps(self._object, indent=2)
-
- def __bytes__(self) -> bytes:
- """bytes representing this QMP message."""
- if self._data is None:
- self._data = self._serialize(self._obj or {})
- return self._data
-
- # Conversion Methods
-
- @property
- def _object(self) -> Dict[str, object]:
- """
- A `dict` representing this QMP message.
-
- Generated on-demand, if required. This property is private
- because it returns an object that could be used to invalidate
- the internal state of the `Message` object.
- """
- if self._obj is None:
- self._obj = self._deserialize(self._data or b'{}')
- return self._obj
-
- @classmethod
- def _serialize(cls, value: object) -> bytes:
- """
- Serialize a JSON object as `bytes`.
-
- :raise ValueError: When the object cannot be serialized.
- :raise TypeError: When the object cannot be serialized.
-
- :return: `bytes` ready to be sent over the wire.
- """
- return json.dumps(value, separators=(',', ':')).encode('utf-8')
-
- @classmethod
- def _deserialize(cls, data: bytes) -> Dict[str, object]:
- """
- Deserialize JSON `bytes` into a native Python `dict`.
-
- :raise DeserializationError:
- If JSON deserialization fails for any reason.
- :raise UnexpectedTypeError:
- If the data does not represent a JSON object.
-
- :return: A `dict` representing this QMP message.
- """
- try:
- obj = json.loads(data)
- except JSONDecodeError as err:
- emsg = "Failed to deserialize QMP message."
- raise DeserializationError(emsg, data) from err
- if not isinstance(obj, dict):
- raise UnexpectedTypeError(
- "QMP message is not a JSON object.",
- obj
- )
- return obj
-
-
-class DeserializationError(ProtocolError):
- """
- A QMP message was not understood as JSON.
-
- When this Exception is raised, ``__cause__`` will be set to the
- `json.JSONDecodeError` Exception, which can be interrogated for
- further details.
-
- :param error_message: Human-readable string describing the error.
- :param raw: The raw `bytes` that prompted the failure.
- """
- def __init__(self, error_message: str, raw: bytes):
- super().__init__(error_message)
- #: The raw `bytes` that were not understood as JSON.
- self.raw: bytes = raw
-
- def __str__(self) -> str:
- return "\n".join([
- super().__str__(),
- f" raw bytes were: {str(self.raw)}",
- ])
-
-
-class UnexpectedTypeError(ProtocolError):
- """
- A QMP message was JSON, but not a JSON object.
-
- :param error_message: Human-readable string describing the error.
- :param value: The deserialized JSON value that wasn't an object.
- """
- def __init__(self, error_message: str, value: object):
- super().__init__(error_message)
- #: The JSON value that was expected to be an object.
- self.value: object = value
-
- def __str__(self) -> str:
- strval = json.dumps(self.value, indent=2)
- return "\n".join([
- super().__str__(),
- f" json value was: {strval}",
- ])
diff --git a/python/qemu/aqmp/models.py b/python/qemu/aqmp/models.py
deleted file mode 100644
index 24c94123ac..0000000000
--- a/python/qemu/aqmp/models.py
+++ /dev/null
@@ -1,133 +0,0 @@
-"""
-QMP Data Models
-
-This module provides simplistic data classes that represent the few
-structures that the QMP spec mandates; they are used to verify incoming
-data to make sure it conforms to spec.
-"""
-# pylint: disable=too-few-public-methods
-
-from collections import abc
-from typing import (
- Any,
- Mapping,
- Optional,
- Sequence,
-)
-
-
-class Model:
- """
- Abstract data model, representing some QMP object of some kind.
-
- :param raw: The raw object to be validated.
- :raise KeyError: If any required fields are absent.
- :raise TypeError: If any required fields have the wrong type.
- """
- def __init__(self, raw: Mapping[str, Any]):
- self._raw = raw
-
- def _check_key(self, key: str) -> None:
- if key not in self._raw:
- raise KeyError(f"'{self._name}' object requires '{key}' member")
-
- def _check_value(self, key: str, type_: type, typestr: str) -> None:
- assert key in self._raw
- if not isinstance(self._raw[key], type_):
- raise TypeError(
- f"'{self._name}' member '{key}' must be a {typestr}"
- )
-
- def _check_member(self, key: str, type_: type, typestr: str) -> None:
- self._check_key(key)
- self._check_value(key, type_, typestr)
-
- @property
- def _name(self) -> str:
- return type(self).__name__
-
- def __repr__(self) -> str:
- return f"{self._name}({self._raw!r})"
-
-
-class Greeting(Model):
- """
- Defined in qmp-spec.txt, section 2.2, "Server Greeting".
-
- :param raw: The raw Greeting object.
- :raise KeyError: If any required fields are absent.
- :raise TypeError: If any required fields have the wrong type.
- """
- def __init__(self, raw: Mapping[str, Any]):
- super().__init__(raw)
- #: 'QMP' member
- self.QMP: QMPGreeting # pylint: disable=invalid-name
-
- self._check_member('QMP', abc.Mapping, "JSON object")
- self.QMP = QMPGreeting(self._raw['QMP'])
-
-
-class QMPGreeting(Model):
- """
- Defined in qmp-spec.txt, section 2.2, "Server Greeting".
-
- :param raw: The raw QMPGreeting object.
- :raise KeyError: If any required fields are absent.
- :raise TypeError: If any required fields have the wrong type.
- """
- def __init__(self, raw: Mapping[str, Any]):
- super().__init__(raw)
- #: 'version' member
- self.version: Mapping[str, object]
- #: 'capabilities' member
- self.capabilities: Sequence[object]
-
- self._check_member('version', abc.Mapping, "JSON object")
- self.version = self._raw['version']
-
- self._check_member('capabilities', abc.Sequence, "JSON array")
- self.capabilities = self._raw['capabilities']
-
-
-class ErrorResponse(Model):
- """
- Defined in qmp-spec.txt, section 2.4.2, "error".
-
- :param raw: The raw ErrorResponse object.
- :raise KeyError: If any required fields are absent.
- :raise TypeError: If any required fields have the wrong type.
- """
- def __init__(self, raw: Mapping[str, Any]):
- super().__init__(raw)
- #: 'error' member
- self.error: ErrorInfo
- #: 'id' member
- self.id: Optional[object] = None # pylint: disable=invalid-name
-
- self._check_member('error', abc.Mapping, "JSON object")
- self.error = ErrorInfo(self._raw['error'])
-
- if 'id' in raw:
- self.id = raw['id']
-
-
-class ErrorInfo(Model):
- """
- Defined in qmp-spec.txt, section 2.4.2, "error".
-
- :param raw: The raw ErrorInfo object.
- :raise KeyError: If any required fields are absent.
- :raise TypeError: If any required fields have the wrong type.
- """
- def __init__(self, raw: Mapping[str, Any]):
- super().__init__(raw)
- #: 'class' member, with an underscore to avoid conflicts in Python.
- self.class_: str
- #: 'desc' member
- self.desc: str
-
- self._check_member('class', str, "string")
- self.class_ = self._raw['class']
-
- self._check_member('desc', str, "string")
- self.desc = self._raw['desc']
diff --git a/python/qemu/aqmp/protocol.py b/python/qemu/aqmp/protocol.py
deleted file mode 100644
index 32e78749c1..0000000000
--- a/python/qemu/aqmp/protocol.py
+++ /dev/null
@@ -1,902 +0,0 @@
-"""
-Generic Asynchronous Message-based Protocol Support
-
-This module provides a generic framework for sending and receiving
-messages over an asyncio stream. `AsyncProtocol` is an abstract class
-that implements the core mechanisms of a simple send/receive protocol,
-and is designed to be extended.
-
-In this package, it is used as the implementation for the `QMPClient`
-class.
-"""
-
-import asyncio
-from asyncio import StreamReader, StreamWriter
-from enum import Enum
-from functools import wraps
-import logging
-from ssl import SSLContext
-from typing import (
- Any,
- Awaitable,
- Callable,
- Generic,
- List,
- Optional,
- Tuple,
- TypeVar,
- Union,
- cast,
-)
-
-from .error import AQMPError
-from .util import (
- bottom_half,
- create_task,
- exception_summary,
- flush,
- is_closing,
- pretty_traceback,
- upper_half,
- wait_closed,
-)
-
-
-T = TypeVar('T')
-_TaskFN = Callable[[], Awaitable[None]] # aka ``async def func() -> None``
-_FutureT = TypeVar('_FutureT', bound=Optional['asyncio.Future[Any]'])
-
-
-class Runstate(Enum):
- """Protocol session runstate."""
-
- #: Fully quiesced and disconnected.
- IDLE = 0
- #: In the process of connecting or establishing a session.
- CONNECTING = 1
- #: Fully connected and active session.
- RUNNING = 2
- #: In the process of disconnecting.
- #: Runstate may be returned to `IDLE` by calling `disconnect()`.
- DISCONNECTING = 3
-
-
-class ConnectError(AQMPError):
- """
- Raised when the initial connection process has failed.
-
- This Exception always wraps a "root cause" exception that can be
- interrogated for additional information.
-
- :param error_message: Human-readable string describing the error.
- :param exc: The root-cause exception.
- """
- def __init__(self, error_message: str, exc: Exception):
- super().__init__(error_message)
- #: Human-readable error string
- self.error_message: str = error_message
- #: Wrapped root cause exception
- self.exc: Exception = exc
-
- def __str__(self) -> str:
- return f"{self.error_message}: {self.exc!s}"
-
-
-class StateError(AQMPError):
- """
- An API command (connect, execute, etc) was issued at an inappropriate time.
-
- This error is raised when a command like
- :py:meth:`~AsyncProtocol.connect()` is issued at an inappropriate
- time.
-
- :param error_message: Human-readable string describing the state violation.
- :param state: The actual `Runstate` seen at the time of the violation.
- :param required: The `Runstate` required to process this command.
- """
- def __init__(self, error_message: str,
- state: Runstate, required: Runstate):
- super().__init__(error_message)
- self.error_message = error_message
- self.state = state
- self.required = required
-
-
-F = TypeVar('F', bound=Callable[..., Any]) # pylint: disable=invalid-name
-
-
-# Don't Panic.
-def require(required_state: Runstate) -> Callable[[F], F]:
- """
- Decorator: protect a method so it can only be run in a certain `Runstate`.
-
- :param required_state: The `Runstate` required to invoke this method.
- :raise StateError: When the required `Runstate` is not met.
- """
- def _decorator(func: F) -> F:
- # _decorator is the decorator that is built by calling the
- # require() decorator factory; e.g.:
- #
- # @require(Runstate.IDLE) def foo(): ...
- # will replace 'foo' with the result of '_decorator(foo)'.
-
- @wraps(func)
- def _wrapper(proto: 'AsyncProtocol[Any]',
- *args: Any, **kwargs: Any) -> Any:
- # _wrapper is the function that gets executed prior to the
- # decorated method.
-
- name = type(proto).__name__
-
- if proto.runstate != required_state:
- if proto.runstate == Runstate.CONNECTING:
- emsg = f"{name} is currently connecting."
- elif proto.runstate == Runstate.DISCONNECTING:
- emsg = (f"{name} is disconnecting."
- " Call disconnect() to return to IDLE state.")
- elif proto.runstate == Runstate.RUNNING:
- emsg = f"{name} is already connected and running."
- elif proto.runstate == Runstate.IDLE:
- emsg = f"{name} is disconnected and idle."
- else:
- assert False
- raise StateError(emsg, proto.runstate, required_state)
- # No StateError, so call the wrapped method.
- return func(proto, *args, **kwargs)
-
- # Return the decorated method;
- # Transforming Func to Decorated[Func].
- return cast(F, _wrapper)
-
- # Return the decorator instance from the decorator factory. Phew!
- return _decorator
-
-
-class AsyncProtocol(Generic[T]):
- """
- AsyncProtocol implements a generic async message-based protocol.
-
- This protocol assumes the basic unit of information transfer between
- client and server is a "message", the details of which are left up
- to the implementation. It assumes the sending and receiving of these
- messages is full-duplex and not necessarily correlated; i.e. it
- supports asynchronous inbound messages.
-
- It is designed to be extended by a specific protocol which provides
- the implementations for how to read and send messages. These must be
- defined in `_do_recv()` and `_do_send()`, respectively.
-
- Other callbacks have a default implementation, but are intended to be
- either extended or overridden:
-
- - `_establish_session`:
- The base implementation starts the reader/writer tasks.
- A protocol implementation can override this call, inserting
- actions to be taken prior to starting the reader/writer tasks
- before the super() call; actions needing to occur afterwards
- can be written after the super() call.
- - `_on_message`:
- Actions to be performed when a message is received.
- - `_cb_outbound`:
- Logging/Filtering hook for all outbound messages.
- - `_cb_inbound`:
- Logging/Filtering hook for all inbound messages.
- This hook runs *before* `_on_message()`.
-
- :param name:
- Name used for logging messages, if any. By default, messages
- will log to 'qemu.aqmp.protocol', but each individual connection
- can be given its own logger by giving it a name; messages will
- then log to 'qemu.aqmp.protocol.${name}'.
- """
- # pylint: disable=too-many-instance-attributes
-
- #: Logger object for debugging messages from this connection.
- logger = logging.getLogger(__name__)
-
- # Maximum allowable size of read buffer
- _limit = (64 * 1024)
-
- # -------------------------
- # Section: Public interface
- # -------------------------
-
- def __init__(self, name: Optional[str] = None) -> None:
- #: The nickname for this connection, if any.
- self.name: Optional[str] = name
- if self.name is not None:
- self.logger = self.logger.getChild(self.name)
-
- # stream I/O
- self._reader: Optional[StreamReader] = None
- self._writer: Optional[StreamWriter] = None
-
- # Outbound Message queue
- self._outgoing: asyncio.Queue[T]
-
- # Special, long-running tasks:
- self._reader_task: Optional[asyncio.Future[None]] = None
- self._writer_task: Optional[asyncio.Future[None]] = None
-
- # Aggregate of the above two tasks, used for Exception management.
- self._bh_tasks: Optional[asyncio.Future[Tuple[None, None]]] = None
-
- #: Disconnect task. The disconnect implementation runs in a task
- #: so that asynchronous disconnects (initiated by the
- #: reader/writer) are allowed to wait for the reader/writers to
- #: exit.
- self._dc_task: Optional[asyncio.Future[None]] = None
-
- self._runstate = Runstate.IDLE
- self._runstate_changed: Optional[asyncio.Event] = None
-
- def __repr__(self) -> str:
- cls_name = type(self).__name__
- tokens = []
- if self.name is not None:
- tokens.append(f"name={self.name!r}")
- tokens.append(f"runstate={self.runstate.name}")
- return f"<{cls_name} {' '.join(tokens)}>"
-
- @property # @upper_half
- def runstate(self) -> Runstate:
- """The current `Runstate` of the connection."""
- return self._runstate
-
- @upper_half
- async def runstate_changed(self) -> Runstate:
- """
- Wait for the `runstate` to change, then return that runstate.
- """
- await self._runstate_event.wait()
- return self.runstate
-
- @upper_half
- @require(Runstate.IDLE)
- async def accept(self, address: Union[str, Tuple[str, int]],
- ssl: Optional[SSLContext] = None) -> None:
- """
- Accept a connection and begin processing message queues.
-
- If this call fails, `runstate` is guaranteed to be set back to `IDLE`.
-
- :param address:
- Address to listen to; UNIX socket path or TCP address/port.
- :param ssl: SSL context to use, if any.
-
- :raise StateError: When the `Runstate` is not `IDLE`.
- :raise ConnectError: If a connection could not be accepted.
- """
- await self._new_session(address, ssl, accept=True)
-
- @upper_half
- @require(Runstate.IDLE)
- async def connect(self, address: Union[str, Tuple[str, int]],
- ssl: Optional[SSLContext] = None) -> None:
- """
- Connect to the server and begin processing message queues.
-
- If this call fails, `runstate` is guaranteed to be set back to `IDLE`.
-
- :param address:
- Address to connect to; UNIX socket path or TCP address/port.
- :param ssl: SSL context to use, if any.
-
- :raise StateError: When the `Runstate` is not `IDLE`.
- :raise ConnectError: If a connection cannot be made to the server.
- """
- await self._new_session(address, ssl)
-
- @upper_half
- async def disconnect(self) -> None:
- """
- Disconnect and wait for all tasks to fully stop.
-
- If there was an exception that caused the reader/writers to
- terminate prematurely, it will be raised here.
-
- :raise Exception: When the reader or writer terminate unexpectedly.
- """
- self.logger.debug("disconnect() called.")
- self._schedule_disconnect()
- await self._wait_disconnect()
-
- # --------------------------
- # Section: Session machinery
- # --------------------------
-
- @property
- def _runstate_event(self) -> asyncio.Event:
- # asyncio.Event() objects should not be created prior to entrance into
- # an event loop, so we can ensure we create it in the correct context.
- # Create it on-demand *only* at the behest of an 'async def' method.
- if not self._runstate_changed:
- self._runstate_changed = asyncio.Event()
- return self._runstate_changed
-
- @upper_half
- @bottom_half
- def _set_state(self, state: Runstate) -> None:
- """
- Change the `Runstate` of the protocol connection.
-
- Signals the `runstate_changed` event.
- """
- if state == self._runstate:
- return
-
- self.logger.debug("Transitioning from '%s' to '%s'.",
- str(self._runstate), str(state))
- self._runstate = state
- self._runstate_event.set()
- self._runstate_event.clear()
-
- @upper_half
- async def _new_session(self,
- address: Union[str, Tuple[str, int]],
- ssl: Optional[SSLContext] = None,
- accept: bool = False) -> None:
- """
- Establish a new connection and initialize the session.
-
- Connect or accept a new connection, then begin the protocol
- session machinery. If this call fails, `runstate` is guaranteed
- to be set back to `IDLE`.
-
- :param address:
- Address to connect to/listen on;
- UNIX socket path or TCP address/port.
- :param ssl: SSL context to use, if any.
- :param accept: Accept a connection instead of connecting when `True`.
-
- :raise ConnectError:
- When a connection or session cannot be established.
-
- This exception will wrap a more concrete one. In most cases,
- the wrapped exception will be `OSError` or `EOFError`. If a
- protocol-level failure occurs while establishing a new
- session, the wrapped error may also be an `AQMPError`.
- """
- assert self.runstate == Runstate.IDLE
-
- try:
- phase = "connection"
- await self._establish_connection(address, ssl, accept)
-
- phase = "session"
- await self._establish_session()
-
- except BaseException as err:
- emsg = f"Failed to establish {phase}"
- self.logger.error("%s: %s", emsg, exception_summary(err))
- self.logger.debug("%s:\n%s\n", emsg, pretty_traceback())
- try:
- # Reset from CONNECTING back to IDLE.
- await self.disconnect()
- except:
- emsg = "Unexpected bottom half exception"
- self.logger.critical("%s:\n%s\n", emsg, pretty_traceback())
- raise
-
- # NB: CancelledError is not a BaseException before Python 3.8
- if isinstance(err, asyncio.CancelledError):
- raise
-
- if isinstance(err, Exception):
- raise ConnectError(emsg, err) from err
-
- # Raise BaseExceptions un-wrapped, they're more important.
- raise
-
- assert self.runstate == Runstate.RUNNING
-
- @upper_half
- async def _establish_connection(
- self,
- address: Union[str, Tuple[str, int]],
- ssl: Optional[SSLContext] = None,
- accept: bool = False
- ) -> None:
- """
- Establish a new connection.
-
- :param address:
- Address to connect to/listen on;
- UNIX socket path or TCP address/port.
- :param ssl: SSL context to use, if any.
- :param accept: Accept a connection instead of connecting when `True`.
- """
- assert self.runstate == Runstate.IDLE
- self._set_state(Runstate.CONNECTING)
-
- # Allow runstate watchers to witness 'CONNECTING' state; some
- # failures in the streaming layer are synchronous and will not
- # otherwise yield.
- await asyncio.sleep(0)
-
- if accept:
- await self._do_accept(address, ssl)
- else:
- await self._do_connect(address, ssl)
-
- @upper_half
- async def _do_accept(self, address: Union[str, Tuple[str, int]],
- ssl: Optional[SSLContext] = None) -> None:
- """
- Acting as the transport server, accept a single connection.
-
- :param address:
- Address to listen on; UNIX socket path or TCP address/port.
- :param ssl: SSL context to use, if any.
-
- :raise OSError: For stream-related errors.
- """
- self.logger.debug("Awaiting connection on %s ...", address)
- connected = asyncio.Event()
- server: Optional[asyncio.AbstractServer] = None
-
- async def _client_connected_cb(reader: asyncio.StreamReader,
- writer: asyncio.StreamWriter) -> None:
- """Used to accept a single incoming connection, see below."""
- nonlocal server
- nonlocal connected
-
- # A connection has been accepted; stop listening for new ones.
- assert server is not None
- server.close()
- await server.wait_closed()
- server = None
-
- # Register this client as being connected
- self._reader, self._writer = (reader, writer)
-
- # Signal back: We've accepted a client!
- connected.set()
-
- if isinstance(address, tuple):
- coro = asyncio.start_server(
- _client_connected_cb,
- host=address[0],
- port=address[1],
- ssl=ssl,
- backlog=1,
- limit=self._limit,
- )
- else:
- coro = asyncio.start_unix_server(
- _client_connected_cb,
- path=address,
- ssl=ssl,
- backlog=1,
- limit=self._limit,
- )
-
- server = await coro # Starts listening
- await connected.wait() # Waits for the callback to fire (and finish)
- assert server is None
-
- self.logger.debug("Connection accepted.")
-
- @upper_half
- async def _do_connect(self, address: Union[str, Tuple[str, int]],
- ssl: Optional[SSLContext] = None) -> None:
- """
- Acting as the transport client, initiate a connection to a server.
-
- :param address:
- Address to connect to; UNIX socket path or TCP address/port.
- :param ssl: SSL context to use, if any.
-
- :raise OSError: For stream-related errors.
- """
- self.logger.debug("Connecting to %s ...", address)
-
- if isinstance(address, tuple):
- connect = asyncio.open_connection(
- address[0],
- address[1],
- ssl=ssl,
- limit=self._limit,
- )
- else:
- connect = asyncio.open_unix_connection(
- path=address,
- ssl=ssl,
- limit=self._limit,
- )
- self._reader, self._writer = await connect
-
- self.logger.debug("Connected.")
-
- @upper_half
- async def _establish_session(self) -> None:
- """
- Establish a new session.
-
- Starts the readers/writer tasks; subclasses may perform their
- own negotiations here. The Runstate will be RUNNING upon
- successful conclusion.
- """
- assert self.runstate == Runstate.CONNECTING
-
- self._outgoing = asyncio.Queue()
-
- reader_coro = self._bh_loop_forever(self._bh_recv_message, 'Reader')
- writer_coro = self._bh_loop_forever(self._bh_send_message, 'Writer')
-
- self._reader_task = create_task(reader_coro)
- self._writer_task = create_task(writer_coro)
-
- self._bh_tasks = asyncio.gather(
- self._reader_task,
- self._writer_task,
- )
-
- self._set_state(Runstate.RUNNING)
- await asyncio.sleep(0) # Allow runstate_event to process
-
- @upper_half
- @bottom_half
- def _schedule_disconnect(self) -> None:
- """
- Initiate a disconnect; idempotent.
-
- This method is used both in the upper-half as a direct
- consequence of `disconnect()`, and in the bottom-half in the
- case of unhandled exceptions in the reader/writer tasks.
-
- It can be invoked no matter what the `runstate` is.
- """
- if not self._dc_task:
- self._set_state(Runstate.DISCONNECTING)
- self.logger.debug("Scheduling disconnect.")
- self._dc_task = create_task(self._bh_disconnect())
-
- @upper_half
- async def _wait_disconnect(self) -> None:
- """
- Waits for a previously scheduled disconnect to finish.
-
- This method will gather any bottom half exceptions and re-raise
- the one that occurred first; presuming it to be the root cause
- of any subsequent Exceptions. It is intended to be used in the
- upper half of the call chain.
-
- :raise Exception:
- Arbitrary exception re-raised on behalf of the reader/writer.
- """
- assert self.runstate == Runstate.DISCONNECTING
- assert self._dc_task
-
- aws: List[Awaitable[object]] = [self._dc_task]
- if self._bh_tasks:
- aws.insert(0, self._bh_tasks)
- all_defined_tasks = asyncio.gather(*aws)
-
- # Ensure disconnect is done; Exception (if any) is not raised here:
- await asyncio.wait((self._dc_task,))
-
- try:
- await all_defined_tasks # Raise Exceptions from the bottom half.
- finally:
- self._cleanup()
- self._set_state(Runstate.IDLE)
-
- @upper_half
- def _cleanup(self) -> None:
- """
- Fully reset this object to a clean state and return to `IDLE`.
- """
- def _paranoid_task_erase(task: _FutureT) -> Optional[_FutureT]:
- # Help to erase a task, ENSURING it is fully quiesced first.
- assert (task is None) or task.done()
- return None if (task and task.done()) else task
-
- assert self.runstate == Runstate.DISCONNECTING
- self._dc_task = _paranoid_task_erase(self._dc_task)
- self._reader_task = _paranoid_task_erase(self._reader_task)
- self._writer_task = _paranoid_task_erase(self._writer_task)
- self._bh_tasks = _paranoid_task_erase(self._bh_tasks)
-
- self._reader = None
- self._writer = None
-
- # NB: _runstate_changed cannot be cleared because we still need it to
- # send the final runstate changed event ...!
-
- # ----------------------------
- # Section: Bottom Half methods
- # ----------------------------
-
- @bottom_half
- async def _bh_disconnect(self) -> None:
- """
- Disconnect and cancel all outstanding tasks.
-
- It is designed to be called from its task context,
- :py:obj:`~AsyncProtocol._dc_task`. By running in its own task,
- it is free to wait on any pending actions that may still need to
- occur in either the reader or writer tasks.
- """
- assert self.runstate == Runstate.DISCONNECTING
-
- def _done(task: Optional['asyncio.Future[Any]']) -> bool:
- return task is not None and task.done()
-
- # NB: We can't rely on _bh_tasks being done() here, it may not
- # yet have had a chance to run and gather itself.
- tasks = tuple(filter(None, (self._writer_task, self._reader_task)))
- error_pathway = _done(self._reader_task) or _done(self._writer_task)
-
- try:
- # Try to flush the writer, if possible:
- if not error_pathway:
- await self._bh_flush_writer()
- except BaseException as err:
- error_pathway = True
- emsg = "Failed to flush the writer"
- self.logger.error("%s: %s", emsg, exception_summary(err))
- self.logger.debug("%s:\n%s\n", emsg, pretty_traceback())
- raise
- finally:
- # Cancel any still-running tasks:
- if self._writer_task is not None and not self._writer_task.done():
- self.logger.debug("Cancelling writer task.")
- self._writer_task.cancel()
- if self._reader_task is not None and not self._reader_task.done():
- self.logger.debug("Cancelling reader task.")
- self._reader_task.cancel()
-
- # Close out the tasks entirely (Won't raise):
- if tasks:
- self.logger.debug("Waiting for tasks to complete ...")
- await asyncio.wait(tasks)
-
- # Lastly, close the stream itself. (May raise):
- await self._bh_close_stream(error_pathway)
- self.logger.debug("Disconnected.")
-
- @bottom_half
- async def _bh_flush_writer(self) -> None:
- if not self._writer_task:
- return
-
- self.logger.debug("Draining the outbound queue ...")
- await self._outgoing.join()
- if self._writer is not None:
- self.logger.debug("Flushing the StreamWriter ...")
- await flush(self._writer)
-
- @bottom_half
- async def _bh_close_stream(self, error_pathway: bool = False) -> None:
- # NB: Closing the writer also implcitly closes the reader.
- if not self._writer:
- return
-
- if not is_closing(self._writer):
- self.logger.debug("Closing StreamWriter.")
- self._writer.close()
-
- self.logger.debug("Waiting for StreamWriter to close ...")
- try:
- await wait_closed(self._writer)
- except Exception: # pylint: disable=broad-except
- # It's hard to tell if the Stream is already closed or
- # not. Even if one of the tasks has failed, it may have
- # failed for a higher-layered protocol reason. The
- # stream could still be open and perfectly fine.
- # I don't know how to discern its health here.
-
- if error_pathway:
- # We already know that *something* went wrong. Let's
- # just trust that the Exception we already have is the
- # better one to present to the user, even if we don't
- # genuinely *know* the relationship between the two.
- self.logger.debug(
- "Discarding Exception from wait_closed:\n%s\n",
- pretty_traceback(),
- )
- else:
- # Oops, this is a brand-new error!
- raise
- finally:
- self.logger.debug("StreamWriter closed.")
-
- @bottom_half
- async def _bh_loop_forever(self, async_fn: _TaskFN, name: str) -> None:
- """
- Run one of the bottom-half methods in a loop forever.
-
- If the bottom half ever raises any exception, schedule a
- disconnect that will terminate the entire loop.
-
- :param async_fn: The bottom-half method to run in a loop.
- :param name: The name of this task, used for logging.
- """
- try:
- while True:
- await async_fn()
- except asyncio.CancelledError:
- # We have been cancelled by _bh_disconnect, exit gracefully.
- self.logger.debug("Task.%s: cancelled.", name)
- return
- except BaseException as err:
- self.logger.error("Task.%s: %s",
- name, exception_summary(err))
- self.logger.debug("Task.%s: failure:\n%s\n",
- name, pretty_traceback())
- self._schedule_disconnect()
- raise
- finally:
- self.logger.debug("Task.%s: exiting.", name)
-
- @bottom_half
- async def _bh_send_message(self) -> None:
- """
- Wait for an outgoing message, then send it.
-
- Designed to be run in `_bh_loop_forever()`.
- """
- msg = await self._outgoing.get()
- try:
- await self._send(msg)
- finally:
- self._outgoing.task_done()
-
- @bottom_half
- async def _bh_recv_message(self) -> None:
- """
- Wait for an incoming message and call `_on_message` to route it.
-
- Designed to be run in `_bh_loop_forever()`.
- """
- msg = await self._recv()
- await self._on_message(msg)
-
- # --------------------
- # Section: Message I/O
- # --------------------
-
- @upper_half
- @bottom_half
- def _cb_outbound(self, msg: T) -> T:
- """
- Callback: outbound message hook.
-
- This is intended for subclasses to be able to add arbitrary
- hooks to filter or manipulate outgoing messages. The base
- implementation does nothing but log the message without any
- manipulation of the message.
-
- :param msg: raw outbound message
- :return: final outbound message
- """
- self.logger.debug("--> %s", str(msg))
- return msg
-
- @upper_half
- @bottom_half
- def _cb_inbound(self, msg: T) -> T:
- """
- Callback: inbound message hook.
-
- This is intended for subclasses to be able to add arbitrary
- hooks to filter or manipulate incoming messages. The base
- implementation does nothing but log the message without any
- manipulation of the message.
-
- This method does not "handle" incoming messages; it is a filter.
- The actual "endpoint" for incoming messages is `_on_message()`.
-
- :param msg: raw inbound message
- :return: processed inbound message
- """
- self.logger.debug("<-- %s", str(msg))
- return msg
-
- @upper_half
- @bottom_half
- async def _readline(self) -> bytes:
- """
- Wait for a newline from the incoming reader.
-
- This method is provided as a convenience for upper-layer
- protocols, as many are line-based.
-
- This method *may* return a sequence of bytes without a trailing
- newline if EOF occurs, but *some* bytes were received. In this
- case, the next call will raise `EOFError`. It is assumed that
- the layer 5 protocol will decide if there is anything meaningful
- to be done with a partial message.
-
- :raise OSError: For stream-related errors.
- :raise EOFError:
- If the reader stream is at EOF and there are no bytes to return.
- :return: bytes, including the newline.
- """
- assert self._reader is not None
- msg_bytes = await self._reader.readline()
-
- if not msg_bytes:
- if self._reader.at_eof():
- raise EOFError
-
- return msg_bytes
-
- @upper_half
- @bottom_half
- async def _do_recv(self) -> T:
- """
- Abstract: Read from the stream and return a message.
-
- Very low-level; intended to only be called by `_recv()`.
- """
- raise NotImplementedError
-
- @upper_half
- @bottom_half
- async def _recv(self) -> T:
- """
- Read an arbitrary protocol message.
-
- .. warning::
- This method is intended primarily for `_bh_recv_message()`
- to use in an asynchronous task loop. Using it outside of
- this loop will "steal" messages from the normal routing
- mechanism. It is safe to use prior to `_establish_session()`,
- but should not be used otherwise.
-
- This method uses `_do_recv()` to retrieve the raw message, and
- then transforms it using `_cb_inbound()`.
-
- :return: A single (filtered, processed) protocol message.
- """
- message = await self._do_recv()
- return self._cb_inbound(message)
-
- @upper_half
- @bottom_half
- def _do_send(self, msg: T) -> None:
- """
- Abstract: Write a message to the stream.
-
- Very low-level; intended to only be called by `_send()`.
- """
- raise NotImplementedError
-
- @upper_half
- @bottom_half
- async def _send(self, msg: T) -> None:
- """
- Send an arbitrary protocol message.
-
- This method will transform any outgoing messages according to
- `_cb_outbound()`.
-
- .. warning::
- Like `_recv()`, this method is intended to be called by
- the writer task loop that processes outgoing
- messages. Calling it directly may circumvent logic
- implemented by the caller meant to correlate outgoing and
- incoming messages.
-
- :raise OSError: For problems with the underlying stream.
- """
- msg = self._cb_outbound(msg)
- self._do_send(msg)
-
- @bottom_half
- async def _on_message(self, msg: T) -> None:
- """
- Called to handle the receipt of a new message.
-
- .. caution::
- This is executed from within the reader loop, so be advised
- that waiting on either the reader or writer task will lead
- to deadlock. Additionally, any unhandled exceptions will
- directly cause the loop to halt, so logic may be best-kept
- to a minimum if at all possible.
-
- :param msg: The incoming message, already logged/filtered.
- """
- # Nothing to do in the abstract case.
diff --git a/python/qemu/aqmp/py.typed b/python/qemu/aqmp/py.typed
deleted file mode 100644
index e69de29bb2..0000000000
--- a/python/qemu/aqmp/py.typed
+++ /dev/null
diff --git a/python/qemu/aqmp/qmp_client.py b/python/qemu/aqmp/qmp_client.py
deleted file mode 100644
index 82e9dab124..0000000000
--- a/python/qemu/aqmp/qmp_client.py
+++ /dev/null
@@ -1,621 +0,0 @@
-"""
-QMP Protocol Implementation
-
-This module provides the `QMPClient` class, which can be used to connect
-and send commands to a QMP server such as QEMU. The QMP class can be
-used to either connect to a listening server, or used to listen and
-accept an incoming connection from that server.
-"""
-
-import asyncio
-import logging
-from typing import (
- Dict,
- List,
- Mapping,
- Optional,
- Union,
- cast,
-)
-
-from .error import AQMPError, ProtocolError
-from .events import Events
-from .message import Message
-from .models import ErrorResponse, Greeting
-from .protocol import AsyncProtocol, Runstate, require
-from .util import (
- bottom_half,
- exception_summary,
- pretty_traceback,
- upper_half,
-)
-
-
-class _WrappedProtocolError(ProtocolError):
- """
- Abstract exception class for Protocol errors that wrap an Exception.
-
- :param error_message: Human-readable string describing the error.
- :param exc: The root-cause exception.
- """
- def __init__(self, error_message: str, exc: Exception):
- super().__init__(error_message)
- self.exc = exc
-
- def __str__(self) -> str:
- return f"{self.error_message}: {self.exc!s}"
-
-
-class GreetingError(_WrappedProtocolError):
- """
- An exception occurred during the Greeting phase.
-
- :param error_message: Human-readable string describing the error.
- :param exc: The root-cause exception.
- """
-
-
-class NegotiationError(_WrappedProtocolError):
- """
- An exception occurred during the Negotiation phase.
-
- :param error_message: Human-readable string describing the error.
- :param exc: The root-cause exception.
- """
-
-
-class ExecuteError(AQMPError):
- """
- Exception raised by `QMPClient.execute()` on RPC failure.
-
- :param error_response: The RPC error response object.
- :param sent: The sent RPC message that caused the failure.
- :param received: The raw RPC error reply received.
- """
- def __init__(self, error_response: ErrorResponse,
- sent: Message, received: Message):
- super().__init__(error_response.error.desc)
- #: The sent `Message` that caused the failure
- self.sent: Message = sent
- #: The received `Message` that indicated failure
- self.received: Message = received
- #: The parsed error response
- self.error: ErrorResponse = error_response
- #: The QMP error class
- self.error_class: str = error_response.error.class_
-
-
-class ExecInterruptedError(AQMPError):
- """
- Exception raised by `execute()` (et al) when an RPC is interrupted.
-
- This error is raised when an `execute()` statement could not be
- completed. This can occur because the connection itself was
- terminated before a reply was received.
-
- The true cause of the interruption will be available via `disconnect()`.
- """
-
-
-class _MsgProtocolError(ProtocolError):
- """
- Abstract error class for protocol errors that have a `Message` object.
-
- This Exception class is used for protocol errors where the `Message`
- was mechanically understood, but was found to be inappropriate or
- malformed.
-
- :param error_message: Human-readable string describing the error.
- :param msg: The QMP `Message` that caused the error.
- """
- def __init__(self, error_message: str, msg: Message):
- super().__init__(error_message)
- #: The received `Message` that caused the error.
- self.msg: Message = msg
-
- def __str__(self) -> str:
- return "\n".join([
- super().__str__(),
- f" Message was: {str(self.msg)}\n",
- ])
-
-
-class ServerParseError(_MsgProtocolError):
- """
- The Server sent a `Message` indicating parsing failure.
-
- i.e. A reply has arrived from the server, but it is missing the "ID"
- field, indicating a parsing error.
-
- :param error_message: Human-readable string describing the error.
- :param msg: The QMP `Message` that caused the error.
- """
-
-
-class BadReplyError(_MsgProtocolError):
- """
- An execution reply was successfully routed, but not understood.
-
- If a QMP message is received with an 'id' field to allow it to be
- routed, but is otherwise malformed, this exception will be raised.
-
- A reply message is malformed if it is missing either the 'return' or
- 'error' keys, or if the 'error' value has missing keys or members of
- the wrong type.
-
- :param error_message: Human-readable string describing the error.
- :param msg: The malformed reply that was received.
- :param sent: The message that was sent that prompted the error.
- """
- def __init__(self, error_message: str, msg: Message, sent: Message):
- super().__init__(error_message, msg)
- #: The sent `Message` that caused the failure
- self.sent = sent
-
-
-class QMPClient(AsyncProtocol[Message], Events):
- """
- Implements a QMP client connection.
-
- QMP can be used to establish a connection as either the transport
- client or server, though this class always acts as the QMP client.
-
- :param name: Optional nickname for the connection, used for logging.
-
- Basic script-style usage looks like this::
-
- qmp = QMPClient('my_virtual_machine_name')
- await qmp.connect(('127.0.0.1', 1234))
- ...
- res = await qmp.execute('block-query')
- ...
- await qmp.disconnect()
-
- Basic async client-style usage looks like this::
-
- class Client:
- def __init__(self, name: str):
- self.qmp = QMPClient(name)
-
- async def watch_events(self):
- try:
- async for event in self.qmp.events:
- print(f"Event: {event['event']}")
- except asyncio.CancelledError:
- return
-
- async def run(self, address='/tmp/qemu.socket'):
- await self.qmp.connect(address)
- asyncio.create_task(self.watch_events())
- await self.qmp.runstate_changed.wait()
- await self.disconnect()
-
- See `aqmp.events` for more detail on event handling patterns.
- """
- #: Logger object used for debugging messages.
- logger = logging.getLogger(__name__)
-
- # Read buffer limit; large enough to accept query-qmp-schema
- _limit = (256 * 1024)
-
- # Type alias for pending execute() result items
- _PendingT = Union[Message, ExecInterruptedError]
-
- def __init__(self, name: Optional[str] = None) -> None:
- super().__init__(name)
- Events.__init__(self)
-
- #: Whether or not to await a greeting after establishing a connection.
- self.await_greeting: bool = True
-
- #: Whether or not to perform capabilities negotiation upon connection.
- #: Implies `await_greeting`.
- self.negotiate: bool = True
-
- # Cached Greeting, if one was awaited.
- self._greeting: Optional[Greeting] = None
-
- # Command ID counter
- self._execute_id = 0
-
- # Incoming RPC reply messages.
- self._pending: Dict[
- Union[str, None],
- 'asyncio.Queue[QMPClient._PendingT]'
- ] = {}
-
- @upper_half
- async def _establish_session(self) -> None:
- """
- Initiate the QMP session.
-
- Wait for the QMP greeting and perform capabilities negotiation.
-
- :raise GreetingError: When the greeting is not understood.
- :raise NegotiationError: If the negotiation fails.
- :raise EOFError: When the server unexpectedly hangs up.
- :raise OSError: For underlying stream errors.
- """
- self._greeting = None
- self._pending = {}
-
- if self.await_greeting or self.negotiate:
- self._greeting = await self._get_greeting()
-
- if self.negotiate:
- await self._negotiate()
-
- # This will start the reader/writers:
- await super()._establish_session()
-
- @upper_half
- async def _get_greeting(self) -> Greeting:
- """
- :raise GreetingError: When the greeting is not understood.
- :raise EOFError: When the server unexpectedly hangs up.
- :raise OSError: For underlying stream errors.
-
- :return: the Greeting object given by the server.
- """
- self.logger.debug("Awaiting greeting ...")
-
- try:
- msg = await self._recv()
- return Greeting(msg)
- except (ProtocolError, KeyError, TypeError) as err:
- emsg = "Did not understand Greeting"
- self.logger.error("%s: %s", emsg, exception_summary(err))
- self.logger.debug("%s:\n%s\n", emsg, pretty_traceback())
- raise GreetingError(emsg, err) from err
- except BaseException as err:
- # EOFError, OSError, or something unexpected.
- emsg = "Failed to receive Greeting"
- self.logger.error("%s: %s", emsg, exception_summary(err))
- self.logger.debug("%s:\n%s\n", emsg, pretty_traceback())
- raise
-
- @upper_half
- async def _negotiate(self) -> None:
- """
- Perform QMP capabilities negotiation.
-
- :raise NegotiationError: When negotiation fails.
- :raise EOFError: When the server unexpectedly hangs up.
- :raise OSError: For underlying stream errors.
- """
- self.logger.debug("Negotiating capabilities ...")
-
- arguments: Dict[str, List[str]] = {'enable': []}
- if self._greeting and 'oob' in self._greeting.QMP.capabilities:
- arguments['enable'].append('oob')
- msg = self.make_execute_msg('qmp_capabilities', arguments=arguments)
-
- # It's not safe to use execute() here, because the reader/writers
- # aren't running. AsyncProtocol *requires* that a new session
- # does not fail after the reader/writers are running!
- try:
- await self._send(msg)
- reply = await self._recv()
- assert 'return' in reply
- assert 'error' not in reply
- except (ProtocolError, AssertionError) as err:
- emsg = "Negotiation failed"
- self.logger.error("%s: %s", emsg, exception_summary(err))
- self.logger.debug("%s:\n%s\n", emsg, pretty_traceback())
- raise NegotiationError(emsg, err) from err
- except BaseException as err:
- # EOFError, OSError, or something unexpected.
- emsg = "Negotiation failed"
- self.logger.error("%s: %s", emsg, exception_summary(err))
- self.logger.debug("%s:\n%s\n", emsg, pretty_traceback())
- raise
-
- @bottom_half
- async def _bh_disconnect(self) -> None:
- try:
- await super()._bh_disconnect()
- finally:
- if self._pending:
- self.logger.debug("Cancelling pending executions")
- keys = self._pending.keys()
- for key in keys:
- self.logger.debug("Cancelling execution '%s'", key)
- self._pending[key].put_nowait(
- ExecInterruptedError("Disconnected")
- )
-
- self.logger.debug("QMP Disconnected.")
-
- @upper_half
- def _cleanup(self) -> None:
- super()._cleanup()
- assert not self._pending
-
- @bottom_half
- async def _on_message(self, msg: Message) -> None:
- """
- Add an incoming message to the appropriate queue/handler.
-
- :raise ServerParseError: When Message indicates server parse failure.
- """
- # Incoming messages are not fully parsed/validated here;
- # do only light peeking to know how to route the messages.
-
- if 'event' in msg:
- await self._event_dispatch(msg)
- return
-
- # Below, we assume everything left is an execute/exec-oob response.
-
- exec_id = cast(Optional[str], msg.get('id'))
-
- if exec_id in self._pending:
- await self._pending[exec_id].put(msg)
- return
-
- # We have a message we can't route back to a caller.
-
- is_error = 'error' in msg
- has_id = 'id' in msg
-
- if is_error and not has_id:
- # This is very likely a server parsing error.
- # It doesn't inherently belong to any pending execution.
- # Instead of performing clever recovery, just terminate.
- # See "NOTE" in qmp-spec.txt, section 2.4.2
- raise ServerParseError(
- ("Server sent an error response without an ID, "
- "but there are no ID-less executions pending. "
- "Assuming this is a server parser failure."),
- msg
- )
-
- # qmp-spec.txt, section 2.4:
- # 'Clients should drop all the responses
- # that have an unknown "id" field.'
- self.logger.log(
- logging.ERROR if is_error else logging.WARNING,
- "Unknown ID '%s', message dropped.",
- exec_id,
- )
- self.logger.debug("Unroutable message: %s", str(msg))
-
- @upper_half
- @bottom_half
- async def _do_recv(self) -> Message:
- """
- :raise OSError: When a stream error is encountered.
- :raise EOFError: When the stream is at EOF.
- :raise ProtocolError:
- When the Message is not understood.
- See also `Message._deserialize`.
-
- :return: A single QMP `Message`.
- """
- msg_bytes = await self._readline()
- msg = Message(msg_bytes, eager=True)
- return msg
-
- @upper_half
- @bottom_half
- def _do_send(self, msg: Message) -> None:
- """
- :raise ValueError: JSON serialization failure
- :raise TypeError: JSON serialization failure
- :raise OSError: When a stream error is encountered.
- """
- assert self._writer is not None
- self._writer.write(bytes(msg))
-
- @upper_half
- def _get_exec_id(self) -> str:
- exec_id = f"__aqmp#{self._execute_id:05d}"
- self._execute_id += 1
- return exec_id
-
- @upper_half
- async def _issue(self, msg: Message) -> Union[None, str]:
- """
- Issue a QMP `Message` and do not wait for a reply.
-
- :param msg: The QMP `Message` to send to the server.
-
- :return: The ID of the `Message` sent.
- """
- msg_id: Optional[str] = None
- if 'id' in msg:
- assert isinstance(msg['id'], str)
- msg_id = msg['id']
-
- self._pending[msg_id] = asyncio.Queue(maxsize=1)
- await self._outgoing.put(msg)
-
- return msg_id
-
- @upper_half
- async def _reply(self, msg_id: Union[str, None]) -> Message:
- """
- Await a reply to a previously issued QMP message.
-
- :param msg_id: The ID of the previously issued message.
-
- :return: The reply from the server.
- :raise ExecInterruptedError:
- When the reply could not be retrieved because the connection
- was lost, or some other problem.
- """
- queue = self._pending[msg_id]
- result = await queue.get()
-
- try:
- if isinstance(result, ExecInterruptedError):
- raise result
- return result
- finally:
- del self._pending[msg_id]
-
- @upper_half
- async def _execute(self, msg: Message, assign_id: bool = True) -> Message:
- """
- Send a QMP `Message` to the server and await a reply.
-
- This method *assumes* you are sending some kind of an execute
- statement that *will* receive a reply.
-
- An execution ID will be assigned if assign_id is `True`. It can be
- disabled, but this requires that an ID is manually assigned
- instead. For manually assigned IDs, you must not use the string
- '__aqmp#' anywhere in the ID.
-
- :param msg: The QMP `Message` to execute.
- :param assign_id: If True, assign a new execution ID.
-
- :return: Execution reply from the server.
- :raise ExecInterruptedError:
- When the reply could not be retrieved because the connection
- was lost, or some other problem.
- """
- if assign_id:
- msg['id'] = self._get_exec_id()
- elif 'id' in msg:
- assert isinstance(msg['id'], str)
- assert '__aqmp#' not in msg['id']
-
- exec_id = await self._issue(msg)
- return await self._reply(exec_id)
-
- @upper_half
- @require(Runstate.RUNNING)
- async def _raw(
- self,
- msg: Union[Message, Mapping[str, object], bytes],
- assign_id: bool = True,
- ) -> Message:
- """
- Issue a raw `Message` to the QMP server and await a reply.
-
- :param msg:
- A Message to send to the server. It may be a `Message`, any
- Mapping (including Dict), or raw bytes.
- :param assign_id:
- Assign an arbitrary execution ID to this message. If
- `False`, the existing id must either be absent (and no other
- such pending execution may omit an ID) or a string. If it is
- a string, it must not start with '__aqmp#' and no other such
- pending execution may currently be using that ID.
-
- :return: Execution reply from the server.
-
- :raise ExecInterruptedError:
- When the reply could not be retrieved because the connection
- was lost, or some other problem.
- :raise TypeError:
- When assign_id is `False`, an ID is given, and it is not a string.
- :raise ValueError:
- When assign_id is `False`, but the ID is not usable;
- Either because it starts with '__aqmp#' or it is already in-use.
- """
- # 1. convert generic Mapping or bytes to a QMP Message
- # 2. copy Message objects so that we assign an ID only to the copy.
- msg = Message(msg)
-
- exec_id = msg.get('id')
- if not assign_id and 'id' in msg:
- if not isinstance(exec_id, str):
- raise TypeError(f"ID ('{exec_id}') must be a string.")
- if exec_id.startswith('__aqmp#'):
- raise ValueError(
- f"ID ('{exec_id}') must not start with '__aqmp#'."
- )
-
- if not assign_id and exec_id in self._pending:
- raise ValueError(
- f"ID '{exec_id}' is in-use and cannot be used."
- )
-
- return await self._execute(msg, assign_id=assign_id)
-
- @upper_half
- @require(Runstate.RUNNING)
- async def execute_msg(self, msg: Message) -> object:
- """
- Execute a QMP command and return its value.
-
- :param msg: The QMP `Message` to execute.
-
- :return:
- The command execution return value from the server. The type of
- object returned depends on the command that was issued,
- though most in QEMU return a `dict`.
- :raise ValueError:
- If the QMP `Message` does not have either the 'execute' or
- 'exec-oob' fields set.
- :raise ExecuteError: When the server returns an error response.
- :raise ExecInterruptedError: if the connection was terminated early.
- """
- if not ('execute' in msg or 'exec-oob' in msg):
- raise ValueError("Requires 'execute' or 'exec-oob' message")
-
- # Copy the Message so that the ID assigned by _execute() is
- # local to this method; allowing the ID to be seen in raised
- # Exceptions but without modifying the caller's held copy.
- msg = Message(msg)
- reply = await self._execute(msg)
-
- if 'error' in reply:
- try:
- error_response = ErrorResponse(reply)
- except (KeyError, TypeError) as err:
- # Error response was malformed.
- raise BadReplyError(
- "QMP error reply is malformed", reply, msg,
- ) from err
-
- raise ExecuteError(error_response, msg, reply)
-
- if 'return' not in reply:
- raise BadReplyError(
- "QMP reply is missing a 'error' or 'return' member",
- reply, msg,
- )
-
- return reply['return']
-
- @classmethod
- def make_execute_msg(cls, cmd: str,
- arguments: Optional[Mapping[str, object]] = None,
- oob: bool = False) -> Message:
- """
- Create an executable message to be sent by `execute_msg` later.
-
- :param cmd: QMP command name.
- :param arguments: Arguments (if any). Must be JSON-serializable.
- :param oob: If `True`, execute "out of band".
-
- :return: An executable QMP `Message`.
- """
- msg = Message({'exec-oob' if oob else 'execute': cmd})
- if arguments is not None:
- msg['arguments'] = arguments
- return msg
-
- @upper_half
- async def execute(self, cmd: str,
- arguments: Optional[Mapping[str, object]] = None,
- oob: bool = False) -> object:
- """
- Execute a QMP command and return its value.
-
- :param cmd: QMP command name.
- :param arguments: Arguments (if any). Must be JSON-serializable.
- :param oob: If `True`, execute "out of band".
-
- :return:
- The command execution return value from the server. The type of
- object returned depends on the command that was issued,
- though most in QEMU return a `dict`.
- :raise ExecuteError: When the server returns an error response.
- :raise ExecInterruptedError: if the connection was terminated early.
- """
- msg = self.make_execute_msg(cmd, arguments, oob=oob)
- return await self.execute_msg(msg)
diff --git a/python/qemu/aqmp/util.py b/python/qemu/aqmp/util.py
deleted file mode 100644
index eaa5fc7d5f..0000000000
--- a/python/qemu/aqmp/util.py
+++ /dev/null
@@ -1,217 +0,0 @@
-"""
-Miscellaneous Utilities
-
-This module provides asyncio utilities and compatibility wrappers for
-Python 3.6 to provide some features that otherwise become available in
-Python 3.7+.
-
-Various logging and debugging utilities are also provided, such as
-`exception_summary()` and `pretty_traceback()`, used primarily for
-adding information into the logging stream.
-"""
-
-import asyncio
-import sys
-import traceback
-from typing import (
- Any,
- Coroutine,
- Optional,
- TypeVar,
- cast,
-)
-
-
-T = TypeVar('T')
-
-
-# --------------------------
-# Section: Utility Functions
-# --------------------------
-
-
-async def flush(writer: asyncio.StreamWriter) -> None:
- """
- Utility function to ensure a StreamWriter is *fully* drained.
-
- `asyncio.StreamWriter.drain` only promises we will return to below
- the "high-water mark". This function ensures we flush the entire
- buffer -- by setting the high water mark to 0 and then calling
- drain. The flow control limits are restored after the call is
- completed.
- """
- transport = cast(asyncio.WriteTransport, writer.transport)
-
- # https://github.com/python/typeshed/issues/5779
- low, high = transport.get_write_buffer_limits() # type: ignore
- transport.set_write_buffer_limits(0, 0)
- try:
- await writer.drain()
- finally:
- transport.set_write_buffer_limits(high, low)
-
-
-def upper_half(func: T) -> T:
- """
- Do-nothing decorator that annotates a method as an "upper-half" method.
-
- These methods must not call bottom-half functions directly, but can
- schedule them to run.
- """
- return func
-
-
-def bottom_half(func: T) -> T:
- """
- Do-nothing decorator that annotates a method as a "bottom-half" method.
-
- These methods must take great care to handle their own exceptions whenever
- possible. If they go unhandled, they will cause termination of the loop.
-
- These methods do not, in general, have the ability to directly
- report information to a caller’s context and will usually be
- collected as a Task result instead.
-
- They must not call upper-half functions directly.
- """
- return func
-
-
-# -------------------------------
-# Section: Compatibility Wrappers
-# -------------------------------
-
-
-def create_task(coro: Coroutine[Any, Any, T],
- loop: Optional[asyncio.AbstractEventLoop] = None
- ) -> 'asyncio.Future[T]':
- """
- Python 3.6-compatible `asyncio.create_task` wrapper.
-
- :param coro: The coroutine to execute in a task.
- :param loop: Optionally, the loop to create the task in.
-
- :return: An `asyncio.Future` object.
- """
- if sys.version_info >= (3, 7):
- if loop is not None:
- return loop.create_task(coro)
- return asyncio.create_task(coro) # pylint: disable=no-member
-
- # Python 3.6:
- return asyncio.ensure_future(coro, loop=loop)
-
-
-def is_closing(writer: asyncio.StreamWriter) -> bool:
- """
- Python 3.6-compatible `asyncio.StreamWriter.is_closing` wrapper.
-
- :param writer: The `asyncio.StreamWriter` object.
- :return: `True` if the writer is closing, or closed.
- """
- if sys.version_info >= (3, 7):
- return writer.is_closing()
-
- # Python 3.6:
- transport = writer.transport
- assert isinstance(transport, asyncio.WriteTransport)
- return transport.is_closing()
-
-
-async def wait_closed(writer: asyncio.StreamWriter) -> None:
- """
- Python 3.6-compatible `asyncio.StreamWriter.wait_closed` wrapper.
-
- :param writer: The `asyncio.StreamWriter` to wait on.
- """
- if sys.version_info >= (3, 7):
- await writer.wait_closed()
- return
-
- # Python 3.6
- transport = writer.transport
- assert isinstance(transport, asyncio.WriteTransport)
-
- while not transport.is_closing():
- await asyncio.sleep(0)
-
- # This is an ugly workaround, but it's the best I can come up with.
- sock = transport.get_extra_info('socket')
-
- if sock is None:
- # Our transport doesn't have a socket? ...
- # Nothing we can reasonably do.
- return
-
- while sock.fileno() != -1:
- await asyncio.sleep(0)
-
-
-def asyncio_run(coro: Coroutine[Any, Any, T], *, debug: bool = False) -> T:
- """
- Python 3.6-compatible `asyncio.run` wrapper.
-
- :param coro: A coroutine to execute now.
- :return: The return value from the coroutine.
- """
- if sys.version_info >= (3, 7):
- return asyncio.run(coro, debug=debug)
-
- # Python 3.6
- loop = asyncio.get_event_loop()
- loop.set_debug(debug)
- ret = loop.run_until_complete(coro)
- loop.close()
-
- return ret
-
-
-# ----------------------------
-# Section: Logging & Debugging
-# ----------------------------
-
-
-def exception_summary(exc: BaseException) -> str:
- """
- Return a summary string of an arbitrary exception.
-
- It will be of the form "ExceptionType: Error Message", if the error
- string is non-empty, and just "ExceptionType" otherwise.
- """
- name = type(exc).__qualname__
- smod = type(exc).__module__
- if smod not in ("__main__", "builtins"):
- name = smod + '.' + name
-
- error = str(exc)
- if error:
- return f"{name}: {error}"
- return name
-
-
-def pretty_traceback(prefix: str = " | ") -> str:
- """
- Formats the current traceback, indented to provide visual distinction.
-
- This is useful for printing a traceback within a traceback for
- debugging purposes when encapsulating errors to deliver them up the
- stack; when those errors are printed, this helps provide a nice
- visual grouping to quickly identify the parts of the error that
- belong to the inner exception.
-
- :param prefix: The prefix to append to each line of the traceback.
- :return: A string, formatted something like the following::
-
- | Traceback (most recent call last):
- | File "foobar.py", line 42, in arbitrary_example
- | foo.baz()
- | ArbitraryError: [Errno 42] Something bad happened!
- """
- output = "".join(traceback.format_exception(*sys.exc_info()))
-
- exc_lines = []
- for line in output.split('\n'):
- exc_lines.append(prefix + line)
-
- # The last line is always empty, omit it
- return "\n".join(exc_lines[:-1])