From be71bf80d290c98c0f6a66d3dcab28515b4ea371 Mon Sep 17 00:00:00 2001 From: Lucas Fryzek Date: Fri, 4 Oct 2024 20:53:17 +0100 Subject: Initial commit --- src/relay/protocol.py | 362 ++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 362 insertions(+) create mode 100644 src/relay/protocol.py (limited to 'src/relay/protocol.py') diff --git a/src/relay/protocol.py b/src/relay/protocol.py new file mode 100644 index 0000000..6f70544 --- /dev/null +++ b/src/relay/protocol.py @@ -0,0 +1,362 @@ +# -*- coding: utf-8 -*- +# +# protocol.py - decode binary messages received from WeeChat/relay +# +# Copyright (C) 2011-2022 Sébastien Helleu +# +# This file is part of QWeeChat, a Qt remote GUI for WeeChat. +# +# QWeeChat is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# QWeeChat is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with QWeeChat. If not, see . +# + +# +# For info about protocol and format of messages, please read document +# "WeeChat Relay Protocol", available at: https://weechat.org/doc/ +# +# History: +# +# 2011-11-23, Sébastien Helleu : +# start dev +# + +"""Decode binary messages received from WeeChat/relay.""" + +import collections +import struct +import zlib + + +class WeechatDict(collections.OrderedDict): + def __str__(self): + return '{%s}' % ', '.join( + ['%s: %s' % (repr(key), repr(self[key])) for key in self]) + + +class WeechatObject: + def __init__(self, objtype, value, separator='\n'): + self.objtype = objtype + self.value = value + self.separator = separator + self.indent = ' ' if separator == '\n' else '' + self.separator1 = '\n%s' % self.indent if separator == '\n' else '' + + def _str_value(self, val): + if isinstance(val, str) and val is not None: + return '\'%s\'' % val + return str(val) + + def _str_value_hdata(self): + lines = ['%skeys: %s%s%spath: %s' % (self.separator1, + str(self.value['keys']), + self.separator, + self.indent, + str(self.value['path']))] + for i, item in enumerate(self.value['items']): + lines.append(' item %d:%s%s' % ( + (i + 1), self.separator, + self.separator.join( + ['%s%s: %s' % (self.indent * 2, key, + self._str_value(value)) + for key, value in item.items()]))) + return '\n'.join(lines) + + def _str_value_infolist(self): + lines = ['%sname: %s' % (self.separator1, self.value['name'])] + for i, item in enumerate(self.value['items']): + lines.append(' item %d:%s%s' % ( + (i + 1), self.separator, + self.separator.join( + ['%s%s: %s' % (self.indent * 2, key, + self._str_value(value)) + for key, value in item.items()]))) + return '\n'.join(lines) + + def _str_value_other(self): + return self._str_value(self.value) + + def __str__(self): + obj_cb = { + 'hda': self._str_value_hdata, + 'inl': self._str_value_infolist, + } + return '%s: %s' % (self.objtype, + obj_cb.get(self.objtype, self._str_value_other)()) + + +class WeechatObjects(list): + def __init__(self, separator='\n'): + super().__init__() + self.separator = separator + + def __str__(self): + return self.separator.join([str(obj) for obj in self]) + + +class WeechatMessage: + def __init__(self, size, size_uncompressed, compression, uncompressed, + msgid, objects): + self.size = size + self.size_uncompressed = size_uncompressed + self.compression = compression + self.uncompressed = uncompressed + self.msgid = msgid + self.objects = objects + + def __str__(self): + if self.compression != 0: + return 'size: %d/%d (%d%%), id=\'%s\', objects:\n%s' % ( + self.size, self.size_uncompressed, + 100 - ((self.size * 100) // self.size_uncompressed), + self.msgid, self.objects) + return 'size: %d, id=\'%s\', objects:\n%s' % (self.size, + self.msgid, + self.objects) + + +class Protocol: + """Decode binary message received from WeeChat/relay.""" + + def __init__(self): + self.data = '' + self._obj_cb = { + 'chr': self._obj_char, + 'int': self._obj_int, + 'lon': self._obj_long, + 'str': self._obj_str, + 'buf': self._obj_buffer, + 'ptr': self._obj_ptr, + 'tim': self._obj_time, + 'htb': self._obj_hashtable, + 'hda': self._obj_hdata, + 'inf': self._obj_info, + 'inl': self._obj_infolist, + 'arr': self._obj_array, + } + + def _obj_type(self): + """Read type in data (3 chars).""" + if len(self.data) < 3: + self.data = '' + return '' + objtype = self.data[0:3].decode() + self.data = self.data[3:] + return objtype + + def _obj_len_data(self, length_size): + """Read length (1 or 4 bytes), then value with this length.""" + if len(self.data) < length_size: + self.data = '' + return None + if length_size == 1: + length = struct.unpack('B', self.data[0:1])[0] + self.data = self.data[1:] + else: + length = self._obj_int() + if length < 0: + return None + if length > 0: + value = self.data[0:length] + self.data = self.data[length:] + else: + value = '' + return value + + def _obj_char(self): + """Read a char in data.""" + if len(self.data) < 1: + return 0 + value = struct.unpack('b', self.data[0:1])[0] + self.data = self.data[1:] + return value + + def _obj_int(self): + """Read an integer in data (4 bytes).""" + if len(self.data) < 4: + self.data = '' + return 0 + value = struct.unpack('>i', self.data[0:4])[0] + self.data = self.data[4:] + return value + + def _obj_long(self): + """Read a long integer in data (length on 1 byte + value as string).""" + value = self._obj_len_data(1) + if value is None: + return None + return int(value) + + def _obj_str(self): + """Read a string in data (length on 4 bytes + content).""" + value = self._obj_len_data(4) + if value in ("", None): + return "" + return value.decode() + + def _obj_buffer(self): + """Read a buffer in data (length on 4 bytes + data).""" + return self._obj_len_data(4) + + def _obj_ptr(self): + """Read a pointer in data (length on 1 byte + value as string).""" + value = self._obj_len_data(1) + if value is None: + return None + return '0x%s' % value + + def _obj_time(self): + """Read a time in data (length on 1 byte + value as string).""" + value = self._obj_len_data(1) + if value is None: + return None + return int(value) + + def _obj_hashtable(self): + """ + Read a hashtable in data + (type for keys + type for values + count + items). + """ + type_keys = self._obj_type() + type_values = self._obj_type() + count = self._obj_int() + hashtable = WeechatDict() + for _ in range(count): + key = self._obj_cb[type_keys]() + value = self._obj_cb[type_values]() + hashtable[key] = value + return hashtable + + def _obj_hdata(self): + """Read a hdata in data.""" + path = self._obj_str() + keys = self._obj_str() + count = self._obj_int() + list_path = path.split('/') if path else [] + list_keys = keys.split(',') if keys else [] + keys_types = [] + dict_keys = WeechatDict() + for key in list_keys: + items = key.split(':') + keys_types.append(items) + dict_keys[items[0]] = items[1] + items = [] + for _ in range(count): + item = WeechatDict() + item['__path'] = [] + pointers = [] + for _ in enumerate(list_path): + pointers.append(self._obj_ptr()) + for key, objtype in keys_types: + item[key] = self._obj_cb[objtype]() + item['__path'] = pointers + items.append(item) + return { + 'path': list_path, + 'keys': dict_keys, + 'count': count, + 'items': items, + } + + def _obj_info(self): + """Read an info in data.""" + name = self._obj_str() + value = self._obj_str() + return (name, value) + + def _obj_infolist(self): + """Read an infolist in data.""" + name = self._obj_str() + count_items = self._obj_int() + items = [] + for _ in range(count_items): + count_vars = self._obj_int() + variables = WeechatDict() + for _ in range(count_vars): + var_name = self._obj_str() + var_type = self._obj_type() + var_value = self._obj_cb[var_type]() + variables[var_name] = var_value + items.append(variables) + return { + 'name': name, + 'items': items + } + + def _obj_array(self): + """Read an array of values in data.""" + type_values = self._obj_type() + count_values = self._obj_int() + values = [] + for _ in range(count_values): + values.append(self._obj_cb[type_values]()) + return values + + def decode(self, data, separator='\n'): + """Decode binary data and return list of objects.""" + self.data = data + size = len(self.data) + size_uncompressed = size + uncompressed = None + # uncompress data (if it is compressed) + compression = struct.unpack('b', self.data[4:5])[0] + if compression: + uncompressed = zlib.decompress(self.data[5:]) + size_uncompressed = len(uncompressed) + 5 + uncompressed = b'%s%s%s' % (struct.pack('>i', size_uncompressed), + struct.pack('b', 0), uncompressed) + self.data = uncompressed + else: + uncompressed = self.data[:] + # skip length and compression flag + self.data = self.data[5:] + # read id + msgid = self._obj_str() + if msgid is None: + msgid = '' + # read objects + objects = WeechatObjects(separator=separator) + while len(self.data) > 0: + objtype = self._obj_type() + value = self._obj_cb[objtype]() + objects.append(WeechatObject(objtype, value, separator=separator)) + return WeechatMessage(size, size_uncompressed, compression, + uncompressed, msgid, objects) + + +def hex_and_ascii(data, bytes_per_line=10): + """Convert a QByteArray to hex + ascii output.""" + num_lines = ((len(data) - 1) // bytes_per_line) + 1 + if num_lines == 0: + return '' + lines = [] + for i in range(num_lines): + str_hex = [] + str_ascii = [] + for j in range(bytes_per_line): + # We can't easily iterate over individual bytes, so we are going to + # do it this way. + index = (i*bytes_per_line) + j + char = data[index:index+1] + if not char: + char = b'x' + byte = struct.unpack('B', char)[0] + str_hex.append(b'%02X' % int(byte)) + if 32 <= byte <= 127: + str_ascii.append(char) + else: + str_ascii.append(b'.') + fmt = b'%%-%ds %%s' % ((bytes_per_line * 3) - 1) + lines.append(fmt % (b' '.join(str_hex), + b''.join(str_ascii))) + return b'\n'.join(lines) + -- cgit