From be71bf80d290c98c0f6a66d3dcab28515b4ea371 Mon Sep 17 00:00:00 2001 From: Lucas Fryzek Date: Fri, 4 Oct 2024 20:53:17 +0100 Subject: Initial commit --- src/relay/color.py | 198 +++++++++++++++++++++++++ src/relay/network.py | 403 ++++++++++++++++++++++++++++++++++++++++++++++++++ src/relay/protocol.py | 362 +++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 963 insertions(+) create mode 100644 src/relay/color.py create mode 100644 src/relay/network.py create mode 100644 src/relay/protocol.py (limited to 'src/relay') diff --git a/src/relay/color.py b/src/relay/color.py new file mode 100644 index 0000000..61b70ff --- /dev/null +++ b/src/relay/color.py @@ -0,0 +1,198 @@ +# -*- coding: utf-8 -*- +# +# color.py - remove/replace colors in WeeChat strings +# +# Copyright (C) 2011-2022 Sébastien Helleu +# +# This file is part of QWeeChat, a Qt remote GUI for WeeChat. +# +# QWeeChat is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# QWeeChat is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with QWeeChat. If not, see . +# + +"""Remove/replace colors in WeeChat strings.""" + +import re +import logging + +RE_COLOR_ATTRS = r'[*!/_|]*' +RE_COLOR_STD = r'(?:%s\d{2})' % RE_COLOR_ATTRS +RE_COLOR_EXT = r'(?:@%s\d{5})' % RE_COLOR_ATTRS +RE_COLOR_ANY = r'(?:%s|%s)' % (RE_COLOR_STD, RE_COLOR_EXT) +# \x19: color code, \x1A: set attribute, \x1B: remove attribute, \x1C: reset +RE_COLOR = re.compile( + r'(\x19(?:\d{2}|F%s|B\d{2}|B@\d{5}|E|\\*%s(~%s)?|@\d{5}|b.|\x1C))|\x1A.|' + r'\x1B.|\x1C' + % (RE_COLOR_ANY, RE_COLOR_ANY, RE_COLOR_ANY)) + +TERMINAL_COLORS = \ + '000000cd000000cd00cdcd000000cdcd00cd00cdcde5e5e5' \ + '4d4d4dff000000ff00ffff000000ffff00ff00ffffffffff' \ + '00000000002a0000550000800000aa0000d4002a00002a2a' \ + '002a55002a80002aaa002ad400550000552a005555005580' \ + '0055aa0055d400800000802a0080550080800080aa0080d4' \ + '00aa0000aa2a00aa5500aa8000aaaa00aad400d40000d42a' \ + '00d45500d48000d4aa00d4d42a00002a002a2a00552a0080' \ + '2a00aa2a00d42a2a002a2a2a2a2a552a2a802a2aaa2a2ad4' \ + '2a55002a552a2a55552a55802a55aa2a55d42a80002a802a' \ + '2a80552a80802a80aa2a80d42aaa002aaa2a2aaa552aaa80' \ + '2aaaaa2aaad42ad4002ad42a2ad4552ad4802ad4aa2ad4d4' \ + '55000055002a5500555500805500aa5500d4552a00552a2a' \ + '552a55552a80552aaa552ad455550055552a555555555580' \ + '5555aa5555d455800055802a5580555580805580aa5580d4' \ + '55aa0055aa2a55aa5555aa8055aaaa55aad455d40055d42a' \ + '55d45555d48055d4aa55d4d480000080002a800055800080' \ + '8000aa8000d4802a00802a2a802a55802a80802aaa802ad4' \ + '80550080552a8055558055808055aa8055d480800080802a' \ + '8080558080808080aa8080d480aa0080aa2a80aa5580aa80' \ + '80aaaa80aad480d40080d42a80d45580d48080d4aa80d4d4' \ + 'aa0000aa002aaa0055aa0080aa00aaaa00d4aa2a00aa2a2a' \ + 'aa2a55aa2a80aa2aaaaa2ad4aa5500aa552aaa5555aa5580' \ + 'aa55aaaa55d4aa8000aa802aaa8055aa8080aa80aaaa80d4' \ + 'aaaa00aaaa2aaaaa55aaaa80aaaaaaaaaad4aad400aad42a' \ + 'aad455aad480aad4aaaad4d4d40000d4002ad40055d40080' \ + 'd400aad400d4d42a00d42a2ad42a55d42a80d42aaad42ad4' \ + 'd45500d4552ad45555d45580d455aad455d4d48000d4802a' \ + 'd48055d48080d480aad480d4d4aa00d4aa2ad4aa55d4aa80' \ + 'd4aaaad4aad4d4d400d4d42ad4d455d4d480d4d4aad4d4d4' \ + '0808081212121c1c1c2626263030303a3a3a4444444e4e4e' \ + '5858586262626c6c6c7676768080808a8a8a9494949e9e9e' \ + 'a8a8a8b2b2b2bcbcbcc6c6c6d0d0d0dadadae4e4e4eeeeee' + +# WeeChat basic colors (color name, index in terminal colors) +WEECHAT_BASIC_COLORS = ( + ('default', 0), ('black', 0), ('darkgray', 8), ('red', 1), + ('lightred', 9), ('green', 2), ('lightgreen', 10), ('brown', 3), + ('yellow', 11), ('blue', 4), ('lightblue', 12), ('magenta', 5), + ('lightmagenta', 13), ('cyan', 6), ('lightcyan', 14), ('gray', 7), + ('white', 0)) + + +log = logging.getLogger(__name__) + + +class Color(): + def __init__(self, color_options, debug=False): + self.color_options = color_options + self.debug = debug + + def _rgb_color(self, index): + color = TERMINAL_COLORS[index*6:(index*6)+6] + col_r = int(color[0:2], 16) * 0.85 + col_g = int(color[2:4], 16) * 0.85 + col_b = int(color[4:6], 16) * 0.85 + return '%02x%02x%02x' % (col_r, col_g, col_b) + + def _convert_weechat_color(self, color): + # TODO figure out if I want to do something here + return '' + + def _convert_terminal_color(self, fg_bg, attrs, color): + try: + index = int(color) + return '\x01(%s%s#%s)' % (fg_bg, attrs, self._rgb_color(index)) + except Exception: # noqa: E722 + log.debug('Error decoding terminal color "%s"', color) + return '' + + def _convert_color_attr(self, fg_bg, color): + extended = False + if color[0].startswith('@'): + extended = True + color = color[1:] + attrs = '' + # keep_attrs = False + while color.startswith(('*', '!', '/', '_', '|')): + # TODO: manage the "keep attributes" flag + # if color[0] == '|': + # keep_attrs = True + attrs += color[0] + color = color[1:] + if extended: + return self._convert_terminal_color(fg_bg, attrs, color) + try: + index = int(color) + return self._convert_terminal_color(fg_bg, attrs, + WEECHAT_BASIC_COLORS[index][1]) + except Exception: # noqa: E722 + log.debug('Error decoding color "%s"', color) + return '' + + def _attrcode_to_char(self, code): + codes = { + '\x01': '*', + '\x02': '!', + '\x03': '/', + '\x04': '_', + } + return codes.get(code, '') + + def _convert_color(self, match): + # TODO figure out how to deal with color + color = match.group(0) + if color[0] == '\x19': + if color[1] == 'b': + # bar code, ignored + return '' + if color[1] == '\x1C': + # reset + return '' + if color[1] in ('F', 'B'): + # foreground or background + return self._convert_color_attr(color[1], color[2:]) + if color[1] == '*': + # foreground with optional background + items = color[2:].split(',') + str_col = self._convert_color_attr('F', items[0]) + if len(items) > 1: + str_col += self._convert_color_attr('B', items[1]) + return str_col + if color[1] == '@': + # direct ncurses pair number, ignored + return '' + if color[1] == 'E': + # text emphasis, ignored + return '' + if color[1:].isdigit(): + return self._convert_weechat_color(int(color[1:])) + elif color[0] == '\x1A': + # set attribute + return '\x01(+%s)' % self._attrcode_to_char(color[1]) + elif color[0] == '\x1B': + # remove attribute + return '\x01(-%s)' % self._attrcode_to_char(color[1]) + elif color[0] == '\x1C': + # reset + return '' + # should never be executed! + return match.group(0) + + def _convert_color_debug(self, match): + group = match.group(0) + for code in (0x01, 0x02, 0x03, 0x04, 0x19, 0x1A, 0x1B): + group = group.replace(chr(code), '' % code) + return group + + def convert(self, text): + if not text: + return '' + if self.debug: + return RE_COLOR.sub(self._convert_color_debug, text) + return RE_COLOR.sub(self._convert_color, text) + + +def remove(text): + """Remove colors in a WeeChat string.""" + if not text: + return '' + return re.sub(RE_COLOR, '', text) diff --git a/src/relay/network.py b/src/relay/network.py new file mode 100644 index 0000000..1071f1a --- /dev/null +++ b/src/relay/network.py @@ -0,0 +1,403 @@ +# -*- coding: utf-8 -*- +# +# network.py - I/O with WeeChat/relay +# +# Copyright (C) 2011-2022 Sébastien Helleu +# +# This file is part of QWeeChat, a Qt remote GUI for WeeChat. +# +# QWeeChat is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# QWeeChat is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with QWeeChat. If not, see . +# + +"""I/O with WeeChat/relay.""" + +import hashlib +import secrets +import struct + +from gi.repository import GObject, GLib, Gio + +from weegtk import config + +from sshtunnel import SSHTunnelForwarder + +# list of supported hash algorithms on our side +# (the hash algorithm will be negotiated with the remote WeeChat) +_HASH_ALGOS_LIST = [ + 'plain', + 'sha256', + 'sha512', + 'pbkdf2+sha256', + 'pbkdf2+sha512', +] +_HASH_ALGOS = ':'.join(_HASH_ALGOS_LIST) + +# handshake with remote WeeChat (before init) +_PROTO_HANDSHAKE = f'(handshake) handshake password_hash_algo={_HASH_ALGOS}\n' + +# initialize with the password (plain text) +_PROTO_INIT_PWD = 'init password=%(password)s%(totp)s\n' # nosec + +# initialize with the hashed password +_PROTO_INIT_HASH = ('init password_hash=' + '%(algo)s:%(salt)s%(iter)s:%(hash)s%(totp)s\n') + +_PROTO_SYNC_CMDS = [ + # get buffers + '(listbuffers) hdata buffer:gui_buffers(*) number,full_name,short_name,' + 'type,nicklist,title,local_variables', + # get lines + '(listlines) hdata buffer:gui_buffers(*)/own_lines/last_line(-%(lines)d)/' + 'data date,displayed,prefix,message', + # get nicklist for all buffers + '(nicklist) nicklist', + # enable synchronization + 'sync', +] + +STATUS_DISCONNECTED = 'disconnected' +STATUS_CONNECTING = 'connecting' +STATUS_AUTHENTICATING = 'authenticating' +STATUS_CONNECTED = 'connected' + +NETWORK_STATUS = { + STATUS_DISCONNECTED: { + 'label': 'Disconnected', + 'color': '#aa0000', + 'icon': 'dialog-close.png', + }, + STATUS_CONNECTING: { + 'label': 'Connecting…', + 'color': '#dd5f00', + 'icon': 'dialog-warning.png', + }, + STATUS_AUTHENTICATING: { + 'label': 'Authenticating…', + 'color': '#007fff', + 'icon': 'dialog-password.png', + }, + STATUS_CONNECTED: { + 'label': 'Connected', + 'color': 'green', + 'icon': 'dialog-ok-apply.png', + }, +} + + +class Network(GObject.GObject): + """I/O with WeeChat/relay.""" + __gsignals__ = { + "status_changed" : ( + GObject.SignalFlags.RUN_FIRST, + None, + (str, str) + ), + "message_from_weechat" : ( + GObject.SignalFlags.RUN_FIRST, + None, + (GLib.Bytes,) + ) + } + + def __init__(self, *args): + super().__init__(*args) + self._init_connection() + self.debug_lines = [] + self.debug_dialog = None + self._lines = config.CONFIG_DEFAULT_RELAY_LINES + self.cancel_network_reads = Gio.Cancellable() + self._buffer = bytearray() + self._socketclient = Gio.SocketClient.new() + self._socket = None + + # TODO figure out how to deal with these signals + #self._socket.connected.self._socket_connected) + #self._socket.readyRead.connect(self._socket_read) + #self._socket.disconnected.connect(self._socket_disconnected) + + def _init_connection(self): + self.status = STATUS_DISCONNECTED + self._hostname = None + self._port = None + self._ssl = None + self._password = None + self._totp = None + self._handshake_received = False + self._handshake_timer = None + self._pwd_hash_algo = None + self._pwd_hash_iter = 0 + self._server_nonce = None + + def set_status(self, status): + """Set current status.""" + self.status = status + self.emit("status_changed", status, None) + + def pbkdf2(self, hash_name, salt): + """Return hashed password with PBKDF2-HMAC.""" + return hashlib.pbkdf2_hmac( + hash_name, + password=self._password.encode('utf-8'), + salt=salt, + iterations=self._pwd_hash_iter, + ).hex() + + def _build_init_command(self): + """Build the init command to send to WeeChat.""" + totp = f',totp={self._totp}' if self._totp else '' + if self._pwd_hash_algo == 'plain': # nosec + cmd = _PROTO_INIT_PWD % { + 'password': self._password, + 'totp': totp, + } + else: + client_nonce = secrets.token_bytes(16) + salt = self._server_nonce + client_nonce + pwd_hash = None + iterations = '' + if self._pwd_hash_algo == 'pbkdf2+sha512': # nosec + pwd_hash = self.pbkdf2('sha512', salt) + iterations = f':{self._pwd_hash_iter}' + elif self._pwd_hash_algo == 'pbkdf2+sha256': # nosec + pwd_hash = self.pbkdf2('sha256', salt) + iterations = f':{self._pwd_hash_iter}' + elif self._pwd_hash_algo == 'sha512': # nosec + pwd = salt + self._password.encode('utf-8') + pwd_hash = hashlib.sha512(pwd).hexdigest() + elif self._pwd_hash_algo == 'sha256': # nosec + pwd = salt + self._password.encode('utf-8') + pwd_hash = hashlib.sha256(pwd).hexdigest() + if not pwd_hash: + return None + cmd = _PROTO_INIT_HASH % { + 'algo': self._pwd_hash_algo, + 'salt': bytearray(salt).hex(), + 'iter': iterations, + 'hash': pwd_hash, + 'totp': totp, + } + return cmd + + def _build_sync_command(self): + """Build the sync commands to send to WeeChat.""" + cmd = '\n'.join(_PROTO_SYNC_CMDS) + '\n' + return cmd % {'lines': self._lines} + + def handshake_timer_expired(self): + if self.status == STATUS_AUTHENTICATING: + self._pwd_hash_algo = 'plain' # nosec + self.send_to_weechat(self._build_init_command()) + self.sync_weechat() + self.set_status(STATUS_CONNECTED) + return False + + def _socket_connected(self): + """Slot: socket connected.""" + self.set_status(STATUS_AUTHENTICATING) + self.send_to_weechat(_PROTO_HANDSHAKE) + self._handshake_timer = GLib.timeout_add(2000, self.handshake_timer_expired) + + def _socket_read(self, source_object, res, *user_data): + """Slot: data available on socket.""" + try: + gbytes = self.input.read_bytes_finish(res) + except GLib.Error as err: + self.handle_network_error(err) + return + + self._buffer.extend(gbytes.get_data()) + while len(self._buffer) >= 4: + remainder = None + length = struct.unpack('>i', self._buffer[0:4])[0] + if len(self._buffer) < length: + # partial message, just wait for end of message + break + # more than one message? + if length < len(self._buffer): + # save beginning of another message + remainder = self._buffer[length:] + self._buffer = self._buffer[0:length] + self.emit("message_from_weechat", GLib.Bytes(self._buffer)) + if not self.is_connected(): + return + self._buffer.clear() + if remainder: + self._buffer.extend(remainder) + + self.input.read_bytes_async( + 4096, 0, self.cancel_network_reads, self._socket_read) + + def _socket_disconnected(self): + """Slot: socket disconnected.""" + if self._handshake_timer: + self._handshake_timer.stop() + self._init_connection() + self.set_status(STATUS_DISCONNECTED) + + def is_connected(self): + """Return True if the socket is connected, False otherwise.""" + return self._socket.is_connected() + + def is_ssl(self): + """Return True if SSL is used, False otherwise.""" + return self._ssl + + def connect_weechat_ssh(self, ssh_host, ssh_port, ssh_username, ssh_key, + relay_host, relay_port, relay_pw): + self.ssh_tunnel = SSHTunnelForwarder( + ssh_host, + ssh_username=ssh_username, + ssh_pkey=ssh_key, + remote_bind_address=(relay_host, int(relay_port))) + + self.ssh_tunnel.start() + + self.connect_weechat("localhost", self.ssh_tunnel.local_bind_port, + False, relay_pw, None, "") + + def connect_weechat(self, hostname, port, ssl, password, totp, lines): + """Connect to WeeChat.""" + self._hostname = hostname + try: + self._port = int(port) + except ValueError: + self._port = 0 + self._ssl = ssl + self._password = password + self._totp = totp + try: + self._lines = int(lines) + except ValueError: + self._lines = config.CONFIG_DEFAULT_RELAY_LINES + + # TODO handle SSL + self._socketclient.connect_async( + Gio.NetworkAddress.new(self._hostname, self._port), + None, + self._connected_func, None) + self.set_status(STATUS_CONNECTING) + + def _connected_func(self, source_object, res, *user_data): + """Callback function called after connection attempt.""" + try: + self._socket = self._socketclient.connect_finish(res) + except GLib.Error as err: + print("Connection failed:\n{}".format(err.message)) + self.set_status(STATUS_NOT_CONNECTED) + return + else: + print("Connected") + self.set_status(STATUS_CONNECTED) + self._socket_connected() + self.input = self._socket.get_input_stream() + self.input.read_bytes_async( + 4096, 0, self.cancel_network_reads, self._socket_read) + + def disconnect_weechat(self): + """Disconnect from WeeChat.""" + if self._socket.state() == QtNetwork.QAbstractSocket.UnconnectedState: + self.set_status(STATUS_DISCONNECTED) + return + if self._socket.state() == QtNetwork.QAbstractSocket.ConnectedState: + self.send_to_weechat('quit\n') + self._socket.waitForBytesWritten(1000) + else: + self.set_status(STATUS_DISCONNECTED) + self._socket.abort() + + def send_to_weechat(self, message): + """Send a message to WeeChat.""" + output = self._socket.get_output_stream() + try: + output.write(message.encode("utf-8")) + except GLib.Error as err: + self.handle_network_error(err) + + def init_with_handshake(self, response): + """Initialize with WeeChat using the handshake response.""" + self._pwd_hash_algo = response['password_hash_algo'] + self._pwd_hash_iter = int(response['password_hash_iterations']) + self._server_nonce = bytearray.fromhex(response['nonce']) + if self._pwd_hash_algo: + cmd = self._build_init_command() + if cmd: + self.send_to_weechat(cmd) + self.sync_weechat() + self.set_status(STATUS_CONNECTED) + return + # failed to initialize: disconnect + self.disconnect_weechat() + + def desync_weechat(self): + """Desynchronize from WeeChat.""" + self.send_to_weechat('desync\n') + + def sync_weechat(self): + """Synchronize with WeeChat.""" + self.send_to_weechat(self._build_sync_command()) + + def status_label(self, status): + """Return the label for a given status.""" + return NETWORK_STATUS.get(status, {}).get('label', '') + + def status_color(self, status): + """Return the color for a given status.""" + return NETWORK_STATUS.get(status, {}).get('color', 'black') + + def status_icon(self, status): + """Return the name of icon for a given status.""" + return NETWORK_STATUS.get(status, {}).get('icon', '') + + def get_options(self): + """Get connection options.""" + return { + 'hostname': self._hostname, + 'port': self._port, + 'ssl': 'on' if self._ssl else 'off', + 'password': self._password, + 'lines': str(self._lines), + } + + def debug_print(self, *args, **kwargs): + """Display a debug message.""" + self.debug_lines.append((args, kwargs)) + if self.debug_dialog: + self.debug_dialog.chat.display(*args, **kwargs) + + def _debug_dialog_closed(self, result): + """Called when debug dialog is closed.""" + self.debug_dialog = None + + def debug_input_text_sent(self, text): + """Send debug buffer input to WeeChat.""" + if self.network.is_connected(): + text = str(text) + pos = text.find(')') + if text.startswith('(') and pos >= 0: + text = '(debug_%s)%s' % (text[1:pos], text[pos+1:]) + else: + text = '(debug) %s' % text + self.network.debug_print(0, '<==', text, forcecolor='#AA0000') + self.network.send_to_weechat(text + '\n') + + def open_debug_dialog(self): + """Open a dialog with debug messages.""" + if not self.debug_dialog: + self.debug_dialog = DebugDialog() + self.debug_dialog.input.textSent.connect( + self.debug_input_text_sent) + self.debug_dialog.finished.connect(self._debug_dialog_closed) + self.debug_dialog.display_lines(self.debug_lines) + self.debug_dialog.chat.scroll_bottom() + diff --git a/src/relay/protocol.py b/src/relay/protocol.py new file mode 100644 index 0000000..6f70544 --- /dev/null +++ b/src/relay/protocol.py @@ -0,0 +1,362 @@ +# -*- coding: utf-8 -*- +# +# protocol.py - decode binary messages received from WeeChat/relay +# +# Copyright (C) 2011-2022 Sébastien Helleu +# +# This file is part of QWeeChat, a Qt remote GUI for WeeChat. +# +# QWeeChat is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# QWeeChat is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with QWeeChat. If not, see . +# + +# +# For info about protocol and format of messages, please read document +# "WeeChat Relay Protocol", available at: https://weechat.org/doc/ +# +# History: +# +# 2011-11-23, Sébastien Helleu : +# start dev +# + +"""Decode binary messages received from WeeChat/relay.""" + +import collections +import struct +import zlib + + +class WeechatDict(collections.OrderedDict): + def __str__(self): + return '{%s}' % ', '.join( + ['%s: %s' % (repr(key), repr(self[key])) for key in self]) + + +class WeechatObject: + def __init__(self, objtype, value, separator='\n'): + self.objtype = objtype + self.value = value + self.separator = separator + self.indent = ' ' if separator == '\n' else '' + self.separator1 = '\n%s' % self.indent if separator == '\n' else '' + + def _str_value(self, val): + if isinstance(val, str) and val is not None: + return '\'%s\'' % val + return str(val) + + def _str_value_hdata(self): + lines = ['%skeys: %s%s%spath: %s' % (self.separator1, + str(self.value['keys']), + self.separator, + self.indent, + str(self.value['path']))] + for i, item in enumerate(self.value['items']): + lines.append(' item %d:%s%s' % ( + (i + 1), self.separator, + self.separator.join( + ['%s%s: %s' % (self.indent * 2, key, + self._str_value(value)) + for key, value in item.items()]))) + return '\n'.join(lines) + + def _str_value_infolist(self): + lines = ['%sname: %s' % (self.separator1, self.value['name'])] + for i, item in enumerate(self.value['items']): + lines.append(' item %d:%s%s' % ( + (i + 1), self.separator, + self.separator.join( + ['%s%s: %s' % (self.indent * 2, key, + self._str_value(value)) + for key, value in item.items()]))) + return '\n'.join(lines) + + def _str_value_other(self): + return self._str_value(self.value) + + def __str__(self): + obj_cb = { + 'hda': self._str_value_hdata, + 'inl': self._str_value_infolist, + } + return '%s: %s' % (self.objtype, + obj_cb.get(self.objtype, self._str_value_other)()) + + +class WeechatObjects(list): + def __init__(self, separator='\n'): + super().__init__() + self.separator = separator + + def __str__(self): + return self.separator.join([str(obj) for obj in self]) + + +class WeechatMessage: + def __init__(self, size, size_uncompressed, compression, uncompressed, + msgid, objects): + self.size = size + self.size_uncompressed = size_uncompressed + self.compression = compression + self.uncompressed = uncompressed + self.msgid = msgid + self.objects = objects + + def __str__(self): + if self.compression != 0: + return 'size: %d/%d (%d%%), id=\'%s\', objects:\n%s' % ( + self.size, self.size_uncompressed, + 100 - ((self.size * 100) // self.size_uncompressed), + self.msgid, self.objects) + return 'size: %d, id=\'%s\', objects:\n%s' % (self.size, + self.msgid, + self.objects) + + +class Protocol: + """Decode binary message received from WeeChat/relay.""" + + def __init__(self): + self.data = '' + self._obj_cb = { + 'chr': self._obj_char, + 'int': self._obj_int, + 'lon': self._obj_long, + 'str': self._obj_str, + 'buf': self._obj_buffer, + 'ptr': self._obj_ptr, + 'tim': self._obj_time, + 'htb': self._obj_hashtable, + 'hda': self._obj_hdata, + 'inf': self._obj_info, + 'inl': self._obj_infolist, + 'arr': self._obj_array, + } + + def _obj_type(self): + """Read type in data (3 chars).""" + if len(self.data) < 3: + self.data = '' + return '' + objtype = self.data[0:3].decode() + self.data = self.data[3:] + return objtype + + def _obj_len_data(self, length_size): + """Read length (1 or 4 bytes), then value with this length.""" + if len(self.data) < length_size: + self.data = '' + return None + if length_size == 1: + length = struct.unpack('B', self.data[0:1])[0] + self.data = self.data[1:] + else: + length = self._obj_int() + if length < 0: + return None + if length > 0: + value = self.data[0:length] + self.data = self.data[length:] + else: + value = '' + return value + + def _obj_char(self): + """Read a char in data.""" + if len(self.data) < 1: + return 0 + value = struct.unpack('b', self.data[0:1])[0] + self.data = self.data[1:] + return value + + def _obj_int(self): + """Read an integer in data (4 bytes).""" + if len(self.data) < 4: + self.data = '' + return 0 + value = struct.unpack('>i', self.data[0:4])[0] + self.data = self.data[4:] + return value + + def _obj_long(self): + """Read a long integer in data (length on 1 byte + value as string).""" + value = self._obj_len_data(1) + if value is None: + return None + return int(value) + + def _obj_str(self): + """Read a string in data (length on 4 bytes + content).""" + value = self._obj_len_data(4) + if value in ("", None): + return "" + return value.decode() + + def _obj_buffer(self): + """Read a buffer in data (length on 4 bytes + data).""" + return self._obj_len_data(4) + + def _obj_ptr(self): + """Read a pointer in data (length on 1 byte + value as string).""" + value = self._obj_len_data(1) + if value is None: + return None + return '0x%s' % value + + def _obj_time(self): + """Read a time in data (length on 1 byte + value as string).""" + value = self._obj_len_data(1) + if value is None: + return None + return int(value) + + def _obj_hashtable(self): + """ + Read a hashtable in data + (type for keys + type for values + count + items). + """ + type_keys = self._obj_type() + type_values = self._obj_type() + count = self._obj_int() + hashtable = WeechatDict() + for _ in range(count): + key = self._obj_cb[type_keys]() + value = self._obj_cb[type_values]() + hashtable[key] = value + return hashtable + + def _obj_hdata(self): + """Read a hdata in data.""" + path = self._obj_str() + keys = self._obj_str() + count = self._obj_int() + list_path = path.split('/') if path else [] + list_keys = keys.split(',') if keys else [] + keys_types = [] + dict_keys = WeechatDict() + for key in list_keys: + items = key.split(':') + keys_types.append(items) + dict_keys[items[0]] = items[1] + items = [] + for _ in range(count): + item = WeechatDict() + item['__path'] = [] + pointers = [] + for _ in enumerate(list_path): + pointers.append(self._obj_ptr()) + for key, objtype in keys_types: + item[key] = self._obj_cb[objtype]() + item['__path'] = pointers + items.append(item) + return { + 'path': list_path, + 'keys': dict_keys, + 'count': count, + 'items': items, + } + + def _obj_info(self): + """Read an info in data.""" + name = self._obj_str() + value = self._obj_str() + return (name, value) + + def _obj_infolist(self): + """Read an infolist in data.""" + name = self._obj_str() + count_items = self._obj_int() + items = [] + for _ in range(count_items): + count_vars = self._obj_int() + variables = WeechatDict() + for _ in range(count_vars): + var_name = self._obj_str() + var_type = self._obj_type() + var_value = self._obj_cb[var_type]() + variables[var_name] = var_value + items.append(variables) + return { + 'name': name, + 'items': items + } + + def _obj_array(self): + """Read an array of values in data.""" + type_values = self._obj_type() + count_values = self._obj_int() + values = [] + for _ in range(count_values): + values.append(self._obj_cb[type_values]()) + return values + + def decode(self, data, separator='\n'): + """Decode binary data and return list of objects.""" + self.data = data + size = len(self.data) + size_uncompressed = size + uncompressed = None + # uncompress data (if it is compressed) + compression = struct.unpack('b', self.data[4:5])[0] + if compression: + uncompressed = zlib.decompress(self.data[5:]) + size_uncompressed = len(uncompressed) + 5 + uncompressed = b'%s%s%s' % (struct.pack('>i', size_uncompressed), + struct.pack('b', 0), uncompressed) + self.data = uncompressed + else: + uncompressed = self.data[:] + # skip length and compression flag + self.data = self.data[5:] + # read id + msgid = self._obj_str() + if msgid is None: + msgid = '' + # read objects + objects = WeechatObjects(separator=separator) + while len(self.data) > 0: + objtype = self._obj_type() + value = self._obj_cb[objtype]() + objects.append(WeechatObject(objtype, value, separator=separator)) + return WeechatMessage(size, size_uncompressed, compression, + uncompressed, msgid, objects) + + +def hex_and_ascii(data, bytes_per_line=10): + """Convert a QByteArray to hex + ascii output.""" + num_lines = ((len(data) - 1) // bytes_per_line) + 1 + if num_lines == 0: + return '' + lines = [] + for i in range(num_lines): + str_hex = [] + str_ascii = [] + for j in range(bytes_per_line): + # We can't easily iterate over individual bytes, so we are going to + # do it this way. + index = (i*bytes_per_line) + j + char = data[index:index+1] + if not char: + char = b'x' + byte = struct.unpack('B', char)[0] + str_hex.append(b'%02X' % int(byte)) + if 32 <= byte <= 127: + str_ascii.append(char) + else: + str_ascii.append(b'.') + fmt = b'%%-%ds %%s' % ((bytes_per_line * 3) - 1) + lines.append(fmt % (b' '.join(str_hex), + b''.join(str_ascii))) + return b'\n'.join(lines) + -- cgit