Now About Social Code
aboutsummaryrefslogtreecommitdiff
path: root/src/relay
diff options
context:
space:
mode:
authorLucas Fryzek <lucas.fryzek@fryzekconcepts.com>2024-10-04 20:53:17 +0100
committerLucas Fryzek <lucas.fryzek@fryzekconcepts.com>2024-10-04 20:53:17 +0100
commitbe71bf80d290c98c0f6a66d3dcab28515b4ea371 (patch)
treed936090c5b5c9435febfe220770ff7f25399a235 /src/relay
Initial commit
Diffstat (limited to 'src/relay')
-rw-r--r--src/relay/color.py198
-rw-r--r--src/relay/network.py403
-rw-r--r--src/relay/protocol.py362
3 files changed, 963 insertions, 0 deletions
diff --git a/src/relay/color.py b/src/relay/color.py
new file mode 100644
index 0000000..61b70ff
--- /dev/null
+++ b/src/relay/color.py
@@ -0,0 +1,198 @@
+# -*- coding: utf-8 -*-
+#
+# color.py - remove/replace colors in WeeChat strings
+#
+# Copyright (C) 2011-2022 Sébastien Helleu <flashcode@flashtux.org>
+#
+# This file is part of QWeeChat, a Qt remote GUI for WeeChat.
+#
+# QWeeChat is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3 of the License, or
+# (at your option) any later version.
+#
+# QWeeChat is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with QWeeChat. If not, see <http://www.gnu.org/licenses/>.
+#
+
+"""Remove/replace colors in WeeChat strings."""
+
+import re
+import logging
+
+RE_COLOR_ATTRS = r'[*!/_|]*'
+RE_COLOR_STD = r'(?:%s\d{2})' % RE_COLOR_ATTRS
+RE_COLOR_EXT = r'(?:@%s\d{5})' % RE_COLOR_ATTRS
+RE_COLOR_ANY = r'(?:%s|%s)' % (RE_COLOR_STD, RE_COLOR_EXT)
+# \x19: color code, \x1A: set attribute, \x1B: remove attribute, \x1C: reset
+RE_COLOR = re.compile(
+ r'(\x19(?:\d{2}|F%s|B\d{2}|B@\d{5}|E|\\*%s(~%s)?|@\d{5}|b.|\x1C))|\x1A.|'
+ r'\x1B.|\x1C'
+ % (RE_COLOR_ANY, RE_COLOR_ANY, RE_COLOR_ANY))
+
+TERMINAL_COLORS = \
+ '000000cd000000cd00cdcd000000cdcd00cd00cdcde5e5e5' \
+ '4d4d4dff000000ff00ffff000000ffff00ff00ffffffffff' \
+ '00000000002a0000550000800000aa0000d4002a00002a2a' \
+ '002a55002a80002aaa002ad400550000552a005555005580' \
+ '0055aa0055d400800000802a0080550080800080aa0080d4' \
+ '00aa0000aa2a00aa5500aa8000aaaa00aad400d40000d42a' \
+ '00d45500d48000d4aa00d4d42a00002a002a2a00552a0080' \
+ '2a00aa2a00d42a2a002a2a2a2a2a552a2a802a2aaa2a2ad4' \
+ '2a55002a552a2a55552a55802a55aa2a55d42a80002a802a' \
+ '2a80552a80802a80aa2a80d42aaa002aaa2a2aaa552aaa80' \
+ '2aaaaa2aaad42ad4002ad42a2ad4552ad4802ad4aa2ad4d4' \
+ '55000055002a5500555500805500aa5500d4552a00552a2a' \
+ '552a55552a80552aaa552ad455550055552a555555555580' \
+ '5555aa5555d455800055802a5580555580805580aa5580d4' \
+ '55aa0055aa2a55aa5555aa8055aaaa55aad455d40055d42a' \
+ '55d45555d48055d4aa55d4d480000080002a800055800080' \
+ '8000aa8000d4802a00802a2a802a55802a80802aaa802ad4' \
+ '80550080552a8055558055808055aa8055d480800080802a' \
+ '8080558080808080aa8080d480aa0080aa2a80aa5580aa80' \
+ '80aaaa80aad480d40080d42a80d45580d48080d4aa80d4d4' \
+ 'aa0000aa002aaa0055aa0080aa00aaaa00d4aa2a00aa2a2a' \
+ 'aa2a55aa2a80aa2aaaaa2ad4aa5500aa552aaa5555aa5580' \
+ 'aa55aaaa55d4aa8000aa802aaa8055aa8080aa80aaaa80d4' \
+ 'aaaa00aaaa2aaaaa55aaaa80aaaaaaaaaad4aad400aad42a' \
+ 'aad455aad480aad4aaaad4d4d40000d4002ad40055d40080' \
+ 'd400aad400d4d42a00d42a2ad42a55d42a80d42aaad42ad4' \
+ 'd45500d4552ad45555d45580d455aad455d4d48000d4802a' \
+ 'd48055d48080d480aad480d4d4aa00d4aa2ad4aa55d4aa80' \
+ 'd4aaaad4aad4d4d400d4d42ad4d455d4d480d4d4aad4d4d4' \
+ '0808081212121c1c1c2626263030303a3a3a4444444e4e4e' \
+ '5858586262626c6c6c7676768080808a8a8a9494949e9e9e' \
+ 'a8a8a8b2b2b2bcbcbcc6c6c6d0d0d0dadadae4e4e4eeeeee'
+
+# WeeChat basic colors (color name, index in terminal colors)
+WEECHAT_BASIC_COLORS = (
+ ('default', 0), ('black', 0), ('darkgray', 8), ('red', 1),
+ ('lightred', 9), ('green', 2), ('lightgreen', 10), ('brown', 3),
+ ('yellow', 11), ('blue', 4), ('lightblue', 12), ('magenta', 5),
+ ('lightmagenta', 13), ('cyan', 6), ('lightcyan', 14), ('gray', 7),
+ ('white', 0))
+
+
+log = logging.getLogger(__name__)
+
+
+class Color():
+ def __init__(self, color_options, debug=False):
+ self.color_options = color_options
+ self.debug = debug
+
+ def _rgb_color(self, index):
+ color = TERMINAL_COLORS[index*6:(index*6)+6]
+ col_r = int(color[0:2], 16) * 0.85
+ col_g = int(color[2:4], 16) * 0.85
+ col_b = int(color[4:6], 16) * 0.85
+ return '%02x%02x%02x' % (col_r, col_g, col_b)
+
+ def _convert_weechat_color(self, color):
+ # TODO figure out if I want to do something here
+ return ''
+
+ def _convert_terminal_color(self, fg_bg, attrs, color):
+ try:
+ index = int(color)
+ return '\x01(%s%s#%s)' % (fg_bg, attrs, self._rgb_color(index))
+ except Exception: # noqa: E722
+ log.debug('Error decoding terminal color "%s"', color)
+ return ''
+
+ def _convert_color_attr(self, fg_bg, color):
+ extended = False
+ if color[0].startswith('@'):
+ extended = True
+ color = color[1:]
+ attrs = ''
+ # keep_attrs = False
+ while color.startswith(('*', '!', '/', '_', '|')):
+ # TODO: manage the "keep attributes" flag
+ # if color[0] == '|':
+ # keep_attrs = True
+ attrs += color[0]
+ color = color[1:]
+ if extended:
+ return self._convert_terminal_color(fg_bg, attrs, color)
+ try:
+ index = int(color)
+ return self._convert_terminal_color(fg_bg, attrs,
+ WEECHAT_BASIC_COLORS[index][1])
+ except Exception: # noqa: E722
+ log.debug('Error decoding color "%s"', color)
+ return ''
+
+ def _attrcode_to_char(self, code):
+ codes = {
+ '\x01': '*',
+ '\x02': '!',
+ '\x03': '/',
+ '\x04': '_',
+ }
+ return codes.get(code, '')
+
+ def _convert_color(self, match):
+ # TODO figure out how to deal with color
+ color = match.group(0)
+ if color[0] == '\x19':
+ if color[1] == 'b':
+ # bar code, ignored
+ return ''
+ if color[1] == '\x1C':
+ # reset
+ return ''
+ if color[1] in ('F', 'B'):
+ # foreground or background
+ return self._convert_color_attr(color[1], color[2:])
+ if color[1] == '*':
+ # foreground with optional background
+ items = color[2:].split(',')
+ str_col = self._convert_color_attr('F', items[0])
+ if len(items) > 1:
+ str_col += self._convert_color_attr('B', items[1])
+ return str_col
+ if color[1] == '@':
+ # direct ncurses pair number, ignored
+ return ''
+ if color[1] == 'E':
+ # text emphasis, ignored
+ return ''
+ if color[1:].isdigit():
+ return self._convert_weechat_color(int(color[1:]))
+ elif color[0] == '\x1A':
+ # set attribute
+ return '\x01(+%s)' % self._attrcode_to_char(color[1])
+ elif color[0] == '\x1B':
+ # remove attribute
+ return '\x01(-%s)' % self._attrcode_to_char(color[1])
+ elif color[0] == '\x1C':
+ # reset
+ return ''
+ # should never be executed!
+ return match.group(0)
+
+ def _convert_color_debug(self, match):
+ group = match.group(0)
+ for code in (0x01, 0x02, 0x03, 0x04, 0x19, 0x1A, 0x1B):
+ group = group.replace(chr(code), '<x%02X>' % code)
+ return group
+
+ def convert(self, text):
+ if not text:
+ return ''
+ if self.debug:
+ return RE_COLOR.sub(self._convert_color_debug, text)
+ return RE_COLOR.sub(self._convert_color, text)
+
+
+def remove(text):
+ """Remove colors in a WeeChat string."""
+ if not text:
+ return ''
+ return re.sub(RE_COLOR, '', text)
diff --git a/src/relay/network.py b/src/relay/network.py
new file mode 100644
index 0000000..1071f1a
--- /dev/null
+++ b/src/relay/network.py
@@ -0,0 +1,403 @@
+# -*- coding: utf-8 -*-
+#
+# network.py - I/O with WeeChat/relay
+#
+# Copyright (C) 2011-2022 Sébastien Helleu <flashcode@flashtux.org>
+#
+# This file is part of QWeeChat, a Qt remote GUI for WeeChat.
+#
+# QWeeChat is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3 of the License, or
+# (at your option) any later version.
+#
+# QWeeChat is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with QWeeChat. If not, see <http://www.gnu.org/licenses/>.
+#
+
+"""I/O with WeeChat/relay."""
+
+import hashlib
+import secrets
+import struct
+
+from gi.repository import GObject, GLib, Gio
+
+from weegtk import config
+
+from sshtunnel import SSHTunnelForwarder
+
+# list of supported hash algorithms on our side
+# (the hash algorithm will be negotiated with the remote WeeChat)
+_HASH_ALGOS_LIST = [
+ 'plain',
+ 'sha256',
+ 'sha512',
+ 'pbkdf2+sha256',
+ 'pbkdf2+sha512',
+]
+_HASH_ALGOS = ':'.join(_HASH_ALGOS_LIST)
+
+# handshake with remote WeeChat (before init)
+_PROTO_HANDSHAKE = f'(handshake) handshake password_hash_algo={_HASH_ALGOS}\n'
+
+# initialize with the password (plain text)
+_PROTO_INIT_PWD = 'init password=%(password)s%(totp)s\n' # nosec
+
+# initialize with the hashed password
+_PROTO_INIT_HASH = ('init password_hash='
+ '%(algo)s:%(salt)s%(iter)s:%(hash)s%(totp)s\n')
+
+_PROTO_SYNC_CMDS = [
+ # get buffers
+ '(listbuffers) hdata buffer:gui_buffers(*) number,full_name,short_name,'
+ 'type,nicklist,title,local_variables',
+ # get lines
+ '(listlines) hdata buffer:gui_buffers(*)/own_lines/last_line(-%(lines)d)/'
+ 'data date,displayed,prefix,message',
+ # get nicklist for all buffers
+ '(nicklist) nicklist',
+ # enable synchronization
+ 'sync',
+]
+
+STATUS_DISCONNECTED = 'disconnected'
+STATUS_CONNECTING = 'connecting'
+STATUS_AUTHENTICATING = 'authenticating'
+STATUS_CONNECTED = 'connected'
+
+NETWORK_STATUS = {
+ STATUS_DISCONNECTED: {
+ 'label': 'Disconnected',
+ 'color': '#aa0000',
+ 'icon': 'dialog-close.png',
+ },
+ STATUS_CONNECTING: {
+ 'label': 'Connecting…',
+ 'color': '#dd5f00',
+ 'icon': 'dialog-warning.png',
+ },
+ STATUS_AUTHENTICATING: {
+ 'label': 'Authenticating…',
+ 'color': '#007fff',
+ 'icon': 'dialog-password.png',
+ },
+ STATUS_CONNECTED: {
+ 'label': 'Connected',
+ 'color': 'green',
+ 'icon': 'dialog-ok-apply.png',
+ },
+}
+
+
+class Network(GObject.GObject):
+ """I/O with WeeChat/relay."""
+ __gsignals__ = {
+ "status_changed" : (
+ GObject.SignalFlags.RUN_FIRST,
+ None,
+ (str, str)
+ ),
+ "message_from_weechat" : (
+ GObject.SignalFlags.RUN_FIRST,
+ None,
+ (GLib.Bytes,)
+ )
+ }
+
+ def __init__(self, *args):
+ super().__init__(*args)
+ self._init_connection()
+ self.debug_lines = []
+ self.debug_dialog = None
+ self._lines = config.CONFIG_DEFAULT_RELAY_LINES
+ self.cancel_network_reads = Gio.Cancellable()
+ self._buffer = bytearray()
+ self._socketclient = Gio.SocketClient.new()
+ self._socket = None
+
+ # TODO figure out how to deal with these signals
+ #self._socket.connected.self._socket_connected)
+ #self._socket.readyRead.connect(self._socket_read)
+ #self._socket.disconnected.connect(self._socket_disconnected)
+
+ def _init_connection(self):
+ self.status = STATUS_DISCONNECTED
+ self._hostname = None
+ self._port = None
+ self._ssl = None
+ self._password = None
+ self._totp = None
+ self._handshake_received = False
+ self._handshake_timer = None
+ self._pwd_hash_algo = None
+ self._pwd_hash_iter = 0
+ self._server_nonce = None
+
+ def set_status(self, status):
+ """Set current status."""
+ self.status = status
+ self.emit("status_changed", status, None)
+
+ def pbkdf2(self, hash_name, salt):
+ """Return hashed password with PBKDF2-HMAC."""
+ return hashlib.pbkdf2_hmac(
+ hash_name,
+ password=self._password.encode('utf-8'),
+ salt=salt,
+ iterations=self._pwd_hash_iter,
+ ).hex()
+
+ def _build_init_command(self):
+ """Build the init command to send to WeeChat."""
+ totp = f',totp={self._totp}' if self._totp else ''
+ if self._pwd_hash_algo == 'plain': # nosec
+ cmd = _PROTO_INIT_PWD % {
+ 'password': self._password,
+ 'totp': totp,
+ }
+ else:
+ client_nonce = secrets.token_bytes(16)
+ salt = self._server_nonce + client_nonce
+ pwd_hash = None
+ iterations = ''
+ if self._pwd_hash_algo == 'pbkdf2+sha512': # nosec
+ pwd_hash = self.pbkdf2('sha512', salt)
+ iterations = f':{self._pwd_hash_iter}'
+ elif self._pwd_hash_algo == 'pbkdf2+sha256': # nosec
+ pwd_hash = self.pbkdf2('sha256', salt)
+ iterations = f':{self._pwd_hash_iter}'
+ elif self._pwd_hash_algo == 'sha512': # nosec
+ pwd = salt + self._password.encode('utf-8')
+ pwd_hash = hashlib.sha512(pwd).hexdigest()
+ elif self._pwd_hash_algo == 'sha256': # nosec
+ pwd = salt + self._password.encode('utf-8')
+ pwd_hash = hashlib.sha256(pwd).hexdigest()
+ if not pwd_hash:
+ return None
+ cmd = _PROTO_INIT_HASH % {
+ 'algo': self._pwd_hash_algo,
+ 'salt': bytearray(salt).hex(),
+ 'iter': iterations,
+ 'hash': pwd_hash,
+ 'totp': totp,
+ }
+ return cmd
+
+ def _build_sync_command(self):
+ """Build the sync commands to send to WeeChat."""
+ cmd = '\n'.join(_PROTO_SYNC_CMDS) + '\n'
+ return cmd % {'lines': self._lines}
+
+ def handshake_timer_expired(self):
+ if self.status == STATUS_AUTHENTICATING:
+ self._pwd_hash_algo = 'plain' # nosec
+ self.send_to_weechat(self._build_init_command())
+ self.sync_weechat()
+ self.set_status(STATUS_CONNECTED)
+ return False
+
+ def _socket_connected(self):
+ """Slot: socket connected."""
+ self.set_status(STATUS_AUTHENTICATING)
+ self.send_to_weechat(_PROTO_HANDSHAKE)
+ self._handshake_timer = GLib.timeout_add(2000, self.handshake_timer_expired)
+
+ def _socket_read(self, source_object, res, *user_data):
+ """Slot: data available on socket."""
+ try:
+ gbytes = self.input.read_bytes_finish(res)
+ except GLib.Error as err:
+ self.handle_network_error(err)
+ return
+
+ self._buffer.extend(gbytes.get_data())
+ while len(self._buffer) >= 4:
+ remainder = None
+ length = struct.unpack('>i', self._buffer[0:4])[0]
+ if len(self._buffer) < length:
+ # partial message, just wait for end of message
+ break
+ # more than one message?
+ if length < len(self._buffer):
+ # save beginning of another message
+ remainder = self._buffer[length:]
+ self._buffer = self._buffer[0:length]
+ self.emit("message_from_weechat", GLib.Bytes(self._buffer))
+ if not self.is_connected():
+ return
+ self._buffer.clear()
+ if remainder:
+ self._buffer.extend(remainder)
+
+ self.input.read_bytes_async(
+ 4096, 0, self.cancel_network_reads, self._socket_read)
+
+ def _socket_disconnected(self):
+ """Slot: socket disconnected."""
+ if self._handshake_timer:
+ self._handshake_timer.stop()
+ self._init_connection()
+ self.set_status(STATUS_DISCONNECTED)
+
+ def is_connected(self):
+ """Return True if the socket is connected, False otherwise."""
+ return self._socket.is_connected()
+
+ def is_ssl(self):
+ """Return True if SSL is used, False otherwise."""
+ return self._ssl
+
+ def connect_weechat_ssh(self, ssh_host, ssh_port, ssh_username, ssh_key,
+ relay_host, relay_port, relay_pw):
+ self.ssh_tunnel = SSHTunnelForwarder(
+ ssh_host,
+ ssh_username=ssh_username,
+ ssh_pkey=ssh_key,
+ remote_bind_address=(relay_host, int(relay_port)))
+
+ self.ssh_tunnel.start()
+
+ self.connect_weechat("localhost", self.ssh_tunnel.local_bind_port,
+ False, relay_pw, None, "")
+
+ def connect_weechat(self, hostname, port, ssl, password, totp, lines):
+ """Connect to WeeChat."""
+ self._hostname = hostname
+ try:
+ self._port = int(port)
+ except ValueError:
+ self._port = 0
+ self._ssl = ssl
+ self._password = password
+ self._totp = totp
+ try:
+ self._lines = int(lines)
+ except ValueError:
+ self._lines = config.CONFIG_DEFAULT_RELAY_LINES
+
+ # TODO handle SSL
+ self._socketclient.connect_async(
+ Gio.NetworkAddress.new(self._hostname, self._port),
+ None,
+ self._connected_func, None)
+ self.set_status(STATUS_CONNECTING)
+
+ def _connected_func(self, source_object, res, *user_data):
+ """Callback function called after connection attempt."""
+ try:
+ self._socket = self._socketclient.connect_finish(res)
+ except GLib.Error as err:
+ print("Connection failed:\n{}".format(err.message))
+ self.set_status(STATUS_NOT_CONNECTED)
+ return
+ else:
+ print("Connected")
+ self.set_status(STATUS_CONNECTED)
+ self._socket_connected()
+ self.input = self._socket.get_input_stream()
+ self.input.read_bytes_async(
+ 4096, 0, self.cancel_network_reads, self._socket_read)
+
+ def disconnect_weechat(self):
+ """Disconnect from WeeChat."""
+ if self._socket.state() == QtNetwork.QAbstractSocket.UnconnectedState:
+ self.set_status(STATUS_DISCONNECTED)
+ return
+ if self._socket.state() == QtNetwork.QAbstractSocket.ConnectedState:
+ self.send_to_weechat('quit\n')
+ self._socket.waitForBytesWritten(1000)
+ else:
+ self.set_status(STATUS_DISCONNECTED)
+ self._socket.abort()
+
+ def send_to_weechat(self, message):
+ """Send a message to WeeChat."""
+ output = self._socket.get_output_stream()
+ try:
+ output.write(message.encode("utf-8"))
+ except GLib.Error as err:
+ self.handle_network_error(err)
+
+ def init_with_handshake(self, response):
+ """Initialize with WeeChat using the handshake response."""
+ self._pwd_hash_algo = response['password_hash_algo']
+ self._pwd_hash_iter = int(response['password_hash_iterations'])
+ self._server_nonce = bytearray.fromhex(response['nonce'])
+ if self._pwd_hash_algo:
+ cmd = self._build_init_command()
+ if cmd:
+ self.send_to_weechat(cmd)
+ self.sync_weechat()
+ self.set_status(STATUS_CONNECTED)
+ return
+ # failed to initialize: disconnect
+ self.disconnect_weechat()
+
+ def desync_weechat(self):
+ """Desynchronize from WeeChat."""
+ self.send_to_weechat('desync\n')
+
+ def sync_weechat(self):
+ """Synchronize with WeeChat."""
+ self.send_to_weechat(self._build_sync_command())
+
+ def status_label(self, status):
+ """Return the label for a given status."""
+ return NETWORK_STATUS.get(status, {}).get('label', '')
+
+ def status_color(self, status):
+ """Return the color for a given status."""
+ return NETWORK_STATUS.get(status, {}).get('color', 'black')
+
+ def status_icon(self, status):
+ """Return the name of icon for a given status."""
+ return NETWORK_STATUS.get(status, {}).get('icon', '')
+
+ def get_options(self):
+ """Get connection options."""
+ return {
+ 'hostname': self._hostname,
+ 'port': self._port,
+ 'ssl': 'on' if self._ssl else 'off',
+ 'password': self._password,
+ 'lines': str(self._lines),
+ }
+
+ def debug_print(self, *args, **kwargs):
+ """Display a debug message."""
+ self.debug_lines.append((args, kwargs))
+ if self.debug_dialog:
+ self.debug_dialog.chat.display(*args, **kwargs)
+
+ def _debug_dialog_closed(self, result):
+ """Called when debug dialog is closed."""
+ self.debug_dialog = None
+
+ def debug_input_text_sent(self, text):
+ """Send debug buffer input to WeeChat."""
+ if self.network.is_connected():
+ text = str(text)
+ pos = text.find(')')
+ if text.startswith('(') and pos >= 0:
+ text = '(debug_%s)%s' % (text[1:pos], text[pos+1:])
+ else:
+ text = '(debug) %s' % text
+ self.network.debug_print(0, '<==', text, forcecolor='#AA0000')
+ self.network.send_to_weechat(text + '\n')
+
+ def open_debug_dialog(self):
+ """Open a dialog with debug messages."""
+ if not self.debug_dialog:
+ self.debug_dialog = DebugDialog()
+ self.debug_dialog.input.textSent.connect(
+ self.debug_input_text_sent)
+ self.debug_dialog.finished.connect(self._debug_dialog_closed)
+ self.debug_dialog.display_lines(self.debug_lines)
+ self.debug_dialog.chat.scroll_bottom()
+
diff --git a/src/relay/protocol.py b/src/relay/protocol.py
new file mode 100644
index 0000000..6f70544
--- /dev/null
+++ b/src/relay/protocol.py
@@ -0,0 +1,362 @@
+# -*- coding: utf-8 -*-
+#
+# protocol.py - decode binary messages received from WeeChat/relay
+#
+# Copyright (C) 2011-2022 Sébastien Helleu <flashcode@flashtux.org>
+#
+# This file is part of QWeeChat, a Qt remote GUI for WeeChat.
+#
+# QWeeChat is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3 of the License, or
+# (at your option) any later version.
+#
+# QWeeChat is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with QWeeChat. If not, see <http://www.gnu.org/licenses/>.
+#
+
+#
+# For info about protocol and format of messages, please read document
+# "WeeChat Relay Protocol", available at: https://weechat.org/doc/
+#
+# History:
+#
+# 2011-11-23, Sébastien Helleu <flashcode@flashtux.org>:
+# start dev
+#
+
+"""Decode binary messages received from WeeChat/relay."""
+
+import collections
+import struct
+import zlib
+
+
+class WeechatDict(collections.OrderedDict):
+ def __str__(self):
+ return '{%s}' % ', '.join(
+ ['%s: %s' % (repr(key), repr(self[key])) for key in self])
+
+
+class WeechatObject:
+ def __init__(self, objtype, value, separator='\n'):
+ self.objtype = objtype
+ self.value = value
+ self.separator = separator
+ self.indent = ' ' if separator == '\n' else ''
+ self.separator1 = '\n%s' % self.indent if separator == '\n' else ''
+
+ def _str_value(self, val):
+ if isinstance(val, str) and val is not None:
+ return '\'%s\'' % val
+ return str(val)
+
+ def _str_value_hdata(self):
+ lines = ['%skeys: %s%s%spath: %s' % (self.separator1,
+ str(self.value['keys']),
+ self.separator,
+ self.indent,
+ str(self.value['path']))]
+ for i, item in enumerate(self.value['items']):
+ lines.append(' item %d:%s%s' % (
+ (i + 1), self.separator,
+ self.separator.join(
+ ['%s%s: %s' % (self.indent * 2, key,
+ self._str_value(value))
+ for key, value in item.items()])))
+ return '\n'.join(lines)
+
+ def _str_value_infolist(self):
+ lines = ['%sname: %s' % (self.separator1, self.value['name'])]
+ for i, item in enumerate(self.value['items']):
+ lines.append(' item %d:%s%s' % (
+ (i + 1), self.separator,
+ self.separator.join(
+ ['%s%s: %s' % (self.indent * 2, key,
+ self._str_value(value))
+ for key, value in item.items()])))
+ return '\n'.join(lines)
+
+ def _str_value_other(self):
+ return self._str_value(self.value)
+
+ def __str__(self):
+ obj_cb = {
+ 'hda': self._str_value_hdata,
+ 'inl': self._str_value_infolist,
+ }
+ return '%s: %s' % (self.objtype,
+ obj_cb.get(self.objtype, self._str_value_other)())
+
+
+class WeechatObjects(list):
+ def __init__(self, separator='\n'):
+ super().__init__()
+ self.separator = separator
+
+ def __str__(self):
+ return self.separator.join([str(obj) for obj in self])
+
+
+class WeechatMessage:
+ def __init__(self, size, size_uncompressed, compression, uncompressed,
+ msgid, objects):
+ self.size = size
+ self.size_uncompressed = size_uncompressed
+ self.compression = compression
+ self.uncompressed = uncompressed
+ self.msgid = msgid
+ self.objects = objects
+
+ def __str__(self):
+ if self.compression != 0:
+ return 'size: %d/%d (%d%%), id=\'%s\', objects:\n%s' % (
+ self.size, self.size_uncompressed,
+ 100 - ((self.size * 100) // self.size_uncompressed),
+ self.msgid, self.objects)
+ return 'size: %d, id=\'%s\', objects:\n%s' % (self.size,
+ self.msgid,
+ self.objects)
+
+
+class Protocol:
+ """Decode binary message received from WeeChat/relay."""
+
+ def __init__(self):
+ self.data = ''
+ self._obj_cb = {
+ 'chr': self._obj_char,
+ 'int': self._obj_int,
+ 'lon': self._obj_long,
+ 'str': self._obj_str,
+ 'buf': self._obj_buffer,
+ 'ptr': self._obj_ptr,
+ 'tim': self._obj_time,
+ 'htb': self._obj_hashtable,
+ 'hda': self._obj_hdata,
+ 'inf': self._obj_info,
+ 'inl': self._obj_infolist,
+ 'arr': self._obj_array,
+ }
+
+ def _obj_type(self):
+ """Read type in data (3 chars)."""
+ if len(self.data) < 3:
+ self.data = ''
+ return ''
+ objtype = self.data[0:3].decode()
+ self.data = self.data[3:]
+ return objtype
+
+ def _obj_len_data(self, length_size):
+ """Read length (1 or 4 bytes), then value with this length."""
+ if len(self.data) < length_size:
+ self.data = ''
+ return None
+ if length_size == 1:
+ length = struct.unpack('B', self.data[0:1])[0]
+ self.data = self.data[1:]
+ else:
+ length = self._obj_int()
+ if length < 0:
+ return None
+ if length > 0:
+ value = self.data[0:length]
+ self.data = self.data[length:]
+ else:
+ value = ''
+ return value
+
+ def _obj_char(self):
+ """Read a char in data."""
+ if len(self.data) < 1:
+ return 0
+ value = struct.unpack('b', self.data[0:1])[0]
+ self.data = self.data[1:]
+ return value
+
+ def _obj_int(self):
+ """Read an integer in data (4 bytes)."""
+ if len(self.data) < 4:
+ self.data = ''
+ return 0
+ value = struct.unpack('>i', self.data[0:4])[0]
+ self.data = self.data[4:]
+ return value
+
+ def _obj_long(self):
+ """Read a long integer in data (length on 1 byte + value as string)."""
+ value = self._obj_len_data(1)
+ if value is None:
+ return None
+ return int(value)
+
+ def _obj_str(self):
+ """Read a string in data (length on 4 bytes + content)."""
+ value = self._obj_len_data(4)
+ if value in ("", None):
+ return ""
+ return value.decode()
+
+ def _obj_buffer(self):
+ """Read a buffer in data (length on 4 bytes + data)."""
+ return self._obj_len_data(4)
+
+ def _obj_ptr(self):
+ """Read a pointer in data (length on 1 byte + value as string)."""
+ value = self._obj_len_data(1)
+ if value is None:
+ return None
+ return '0x%s' % value
+
+ def _obj_time(self):
+ """Read a time in data (length on 1 byte + value as string)."""
+ value = self._obj_len_data(1)
+ if value is None:
+ return None
+ return int(value)
+
+ def _obj_hashtable(self):
+ """
+ Read a hashtable in data
+ (type for keys + type for values + count + items).
+ """
+ type_keys = self._obj_type()
+ type_values = self._obj_type()
+ count = self._obj_int()
+ hashtable = WeechatDict()
+ for _ in range(count):
+ key = self._obj_cb[type_keys]()
+ value = self._obj_cb[type_values]()
+ hashtable[key] = value
+ return hashtable
+
+ def _obj_hdata(self):
+ """Read a hdata in data."""
+ path = self._obj_str()
+ keys = self._obj_str()
+ count = self._obj_int()
+ list_path = path.split('/') if path else []
+ list_keys = keys.split(',') if keys else []
+ keys_types = []
+ dict_keys = WeechatDict()
+ for key in list_keys:
+ items = key.split(':')
+ keys_types.append(items)
+ dict_keys[items[0]] = items[1]
+ items = []
+ for _ in range(count):
+ item = WeechatDict()
+ item['__path'] = []
+ pointers = []
+ for _ in enumerate(list_path):
+ pointers.append(self._obj_ptr())
+ for key, objtype in keys_types:
+ item[key] = self._obj_cb[objtype]()
+ item['__path'] = pointers
+ items.append(item)
+ return {
+ 'path': list_path,
+ 'keys': dict_keys,
+ 'count': count,
+ 'items': items,
+ }
+
+ def _obj_info(self):
+ """Read an info in data."""
+ name = self._obj_str()
+ value = self._obj_str()
+ return (name, value)
+
+ def _obj_infolist(self):
+ """Read an infolist in data."""
+ name = self._obj_str()
+ count_items = self._obj_int()
+ items = []
+ for _ in range(count_items):
+ count_vars = self._obj_int()
+ variables = WeechatDict()
+ for _ in range(count_vars):
+ var_name = self._obj_str()
+ var_type = self._obj_type()
+ var_value = self._obj_cb[var_type]()
+ variables[var_name] = var_value
+ items.append(variables)
+ return {
+ 'name': name,
+ 'items': items
+ }
+
+ def _obj_array(self):
+ """Read an array of values in data."""
+ type_values = self._obj_type()
+ count_values = self._obj_int()
+ values = []
+ for _ in range(count_values):
+ values.append(self._obj_cb[type_values]())
+ return values
+
+ def decode(self, data, separator='\n'):
+ """Decode binary data and return list of objects."""
+ self.data = data
+ size = len(self.data)
+ size_uncompressed = size
+ uncompressed = None
+ # uncompress data (if it is compressed)
+ compression = struct.unpack('b', self.data[4:5])[0]
+ if compression:
+ uncompressed = zlib.decompress(self.data[5:])
+ size_uncompressed = len(uncompressed) + 5
+ uncompressed = b'%s%s%s' % (struct.pack('>i', size_uncompressed),
+ struct.pack('b', 0), uncompressed)
+ self.data = uncompressed
+ else:
+ uncompressed = self.data[:]
+ # skip length and compression flag
+ self.data = self.data[5:]
+ # read id
+ msgid = self._obj_str()
+ if msgid is None:
+ msgid = ''
+ # read objects
+ objects = WeechatObjects(separator=separator)
+ while len(self.data) > 0:
+ objtype = self._obj_type()
+ value = self._obj_cb[objtype]()
+ objects.append(WeechatObject(objtype, value, separator=separator))
+ return WeechatMessage(size, size_uncompressed, compression,
+ uncompressed, msgid, objects)
+
+
+def hex_and_ascii(data, bytes_per_line=10):
+ """Convert a QByteArray to hex + ascii output."""
+ num_lines = ((len(data) - 1) // bytes_per_line) + 1
+ if num_lines == 0:
+ return ''
+ lines = []
+ for i in range(num_lines):
+ str_hex = []
+ str_ascii = []
+ for j in range(bytes_per_line):
+ # We can't easily iterate over individual bytes, so we are going to
+ # do it this way.
+ index = (i*bytes_per_line) + j
+ char = data[index:index+1]
+ if not char:
+ char = b'x'
+ byte = struct.unpack('B', char)[0]
+ str_hex.append(b'%02X' % int(byte))
+ if 32 <= byte <= 127:
+ str_ascii.append(char)
+ else:
+ str_ascii.append(b'.')
+ fmt = b'%%-%ds %%s' % ((bytes_per_line * 3) - 1)
+ lines.append(fmt % (b' '.join(str_hex),
+ b''.join(str_ascii)))
+ return b'\n'.join(lines)
+