utils.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400
  1. """DWC Network Server Emulator
  2. Copyright (C) 2014 polaris-
  3. Copyright (C) 2014 msoucy
  4. Copyright (C) 2016 Sepalani
  5. This program is free software: you can redistribute it and/or modify
  6. it under the terms of the GNU Affero General Public License as
  7. published by the Free Software Foundation, either version 3 of the
  8. License, or (at your option) any later version.
  9. This program is distributed in the hope that it will be useful,
  10. but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. GNU Affero General Public License for more details.
  13. You should have received a copy of the GNU Affero General Public License
  14. along with this program. If not, see <http://www.gnu.org/licenses/>.
  15. """
  16. import base64
  17. import logging
  18. import logging.handlers
  19. import random
  20. import string
  21. import struct
  22. import urllib.parse
  23. import ctypes
  24. import os
  25. from functools import reduce
  26. def generate_random_str_from_set(ln, chs):
  27. """Generate a random string of size <ln> based on charset <chs>."""
  28. return ''.join(random.choice(chs) for _ in range(ln))
  29. def generate_random_str(ln, chs=""):
  30. """Generate a random string of size <ln>."""
  31. return generate_random_str_from_set(
  32. ln,
  33. chs or (string.ascii_letters + string.digits)
  34. )
  35. def generate_random_number_str(ln):
  36. """Generate a random number string of size <ln>."""
  37. return generate_random_str_from_set(ln, string.digits)
  38. def generate_random_hex_str(ln):
  39. """Generate a random hexadecimal number string of size <ln>."""
  40. return generate_random_str_from_set(ln, string.hexdigits.lower())
  41. def calculate_crc8(inp):
  42. """
  43. Code: Tetris DS @ 020573F4
  44. """
  45. crc_table = [
  46. 0x00, 0x07, 0x0E, 0x09, 0x1C, 0x1B, 0x12, 0x15, 0x38, 0x3F, 0x36, 0x31, 0x24, 0x23, 0x2A, 0x2D,
  47. 0x70, 0x77, 0x7E, 0x79, 0x6C, 0x6B, 0x62, 0x65, 0x48, 0x4F, 0x46, 0x41, 0x54, 0x53, 0x5A, 0x5D,
  48. 0xE0, 0xE7, 0xEE, 0xE9, 0xFC, 0xFB, 0xF2, 0xF5, 0xD8, 0xDF, 0xD6, 0xD1, 0xC4, 0xC3, 0xCA, 0xCD,
  49. 0x90, 0x97, 0x9E, 0x99, 0x8C, 0x8B, 0x82, 0x85, 0xA8, 0xAF, 0xA6, 0xA1, 0xB4, 0xB3, 0xBA, 0xBD,
  50. 0xC7, 0xC0, 0xC9, 0xCE, 0xDB, 0xDC, 0xD5, 0xD2, 0xFF, 0xF8, 0xF1, 0xF6, 0xE3, 0xE4, 0xED, 0xEA,
  51. 0xB7, 0xB0, 0xB9, 0xBE, 0xAB, 0xAC, 0xA5, 0xA2, 0x8F, 0x88, 0x81, 0x86, 0x93, 0x94, 0x9D, 0x9A,
  52. 0x27, 0x20, 0x29, 0x2E, 0x3B, 0x3C, 0x35, 0x32, 0x1F, 0x18, 0x11, 0x16, 0x03, 0x04, 0x0D, 0x0A,
  53. 0x57, 0x50, 0x59, 0x5E, 0x4B, 0x4C, 0x45, 0x42, 0x6F, 0x68, 0x61, 0x66, 0x73, 0x74, 0x7D, 0x7A,
  54. 0x89, 0x8E, 0x87, 0x80, 0x95, 0x92, 0x9B, 0x9C, 0xB1, 0xB6, 0xBF, 0xB8, 0xAD, 0xAA, 0xA3, 0xA4,
  55. 0xF9, 0xFE, 0xF7, 0xF0, 0xE5, 0xE2, 0xEB, 0xEC, 0xC1, 0xC6, 0xCF, 0xC8, 0xDD, 0xDA, 0xD3, 0xD4,
  56. 0x69, 0x6E, 0x67, 0x60, 0x75, 0x72, 0x7B, 0x7C, 0x51, 0x56, 0x5F, 0x58, 0x4D, 0x4A, 0x43, 0x44,
  57. 0x19, 0x1E, 0x17, 0x10, 0x05, 0x02, 0x0B, 0x0C, 0x21, 0x26, 0x2F, 0x28, 0x3D, 0x3A, 0x33, 0x34,
  58. 0x4E, 0x49, 0x40, 0x47, 0x52, 0x55, 0x5C, 0x5B, 0x76, 0x71, 0x78, 0x7F, 0x6A, 0x6D, 0x64, 0x63,
  59. 0x3E, 0x39, 0x30, 0x37, 0x22, 0x25, 0x2C, 0x2B, 0x06, 0x01, 0x08, 0x0F, 0x1A, 0x1D, 0x14, 0x13,
  60. 0xAE, 0xA9, 0xA0, 0xA7, 0xB2, 0xB5, 0xBC, 0xBB, 0x96, 0x91, 0x98, 0x9F, 0x8A, 0x8D, 0x84, 0x83,
  61. 0xDE, 0xD9, 0xD0, 0xD7, 0xC2, 0xC5, 0xCC, 0xCB, 0xE6, 0xE1, 0xE8, 0xEF, 0xFA, 0xFD, 0xF4, 0xF3
  62. ]
  63. crc = 0
  64. for b in inp:
  65. crc = crc_table[(b ^ crc) & 0xff]
  66. return crc
  67. def base32_encode(num, reverse=True):
  68. """Encode a number in base 32.
  69. Result string is reversed by default.
  70. """
  71. alpha = "0123456789abcdefghijklmnopqrstuv"
  72. encoded = ""
  73. while num > 0:
  74. encoded += alpha[num & 0x1f]
  75. num >>= 5
  76. encoded.ljust(9, '0')
  77. if reverse:
  78. encoded = encoded[::-1]
  79. return encoded
  80. def base32_decode(s, reverse=False):
  81. """Decode a number in base 32.
  82. Input string is not reversed by default.
  83. """
  84. alpha = "0123456789abcdefghijklmnopqrstuv"
  85. if reverse:
  86. s = s[::-1]
  87. return reduce(lambda orig, b: ((orig << 5) | alpha.index(b)), s, 0)
  88. # Number routines
  89. def get_num_from_bytes(data, idx, fmt, bigEndian=False):
  90. """Get number from bytes.
  91. Endianness by default is little.
  92. """
  93. return struct.unpack_from("<>"[bigEndian] + fmt, buffer(bytearray(data)), idx)[0]
  94. # Instead of passing slices, pass the buffer and index so we can calculate
  95. # the length automatically.
  96. def get_short_signed(data, idx, be=False):
  97. """Get short from bytes.
  98. Endianness by default is little.
  99. """
  100. return get_num_from_bytes(data, idx, 'h', be)
  101. def get_short(data, idx, be=False):
  102. """Get unsigned short from bytes.
  103. Endianness by default is little.
  104. """
  105. return get_num_from_bytes(data, idx, 'H', be)
  106. def get_int_signed(data, idx, be=False):
  107. """Get int from bytes.
  108. Endianness by default is little.
  109. """
  110. return get_num_from_bytes(data, idx, 'i', be)
  111. def get_int(data, idx, be=False):
  112. """Get unsigned int from bytes.
  113. Endianness by default is little.
  114. """
  115. return get_num_from_bytes(data, idx, 'I', be)
  116. def get_ip(data, idx, be=False):
  117. """Get IP from bytes.
  118. Endianness by default is little.
  119. """
  120. return ctypes.c_int32(get_int(data, idx, be)).value
  121. def get_ip_str(data, idx):
  122. """Get IP string from bytes."""
  123. return '.'.join("%d" % x for x in bytearray(data[idx:idx+4]))
  124. def get_ip_from_str(ip_str, be=False):
  125. """Get IP from string.
  126. Endianness by default is little.
  127. """
  128. return get_ip(bytearray([int(x) for x in ip_str.split('.')]), 0, be)
  129. def get_local_addr(data, idx):
  130. """Get local address."""
  131. localip = get_ip_str(data, idx)
  132. localip_int_le = get_ip(data, idx)
  133. localip_int_be = get_ip(data, idx, True)
  134. localport = get_short(data, idx + 4, True)
  135. return (localip, localport, localip_int_le, localip_int_be)
  136. def get_string(data, idx):
  137. """Get string from bytes."""
  138. data = data[idx:]
  139. end = data.index('\x00')
  140. return str(''.join(data[:end]))
  141. def get_bytes_from_num(num, fmt, bigEndian=False):
  142. """Get bytes from number.
  143. Endianness by default is little.
  144. """
  145. return struct.pack("<>"[bigEndian] + fmt, num)
  146. def get_bytes_from_short_signed(num, be=False):
  147. """Get bytes from short.
  148. Endianness by default is little.
  149. """
  150. return get_bytes_from_num(num, 'h', be)
  151. def get_bytes_from_short(num, be=False):
  152. """Get bytes from unsigned short.
  153. Endianness by default is little.
  154. """
  155. return get_bytes_from_num(num, 'H', be)
  156. def get_bytes_from_int_signed(num, be=False):
  157. """Get bytes from int.
  158. Endianness by default is little.
  159. """
  160. return get_bytes_from_num(num, 'i', be)
  161. def get_bytes_from_int(num, be=False):
  162. """Get bytes from unsigned int.
  163. Endianness by default is little.
  164. """
  165. return get_bytes_from_num(num, 'I', be)
  166. def get_bytes_from_ip_str(ip_str):
  167. """Get bytes from IP string."""
  168. return bytearray([int(x) for x in ip_str.split('.')])
  169. def create_logger(loggername, filename, level, log_to_console, log_to_file):
  170. """Server logging."""
  171. log_folder = "logs"
  172. # Create log folder if it doesn't exist
  173. if not os.path.exists(log_folder):
  174. os.makedirs(log_folder)
  175. # Build full path to log file
  176. filename = os.path.join(log_folder, filename)
  177. logging.addLevelName(-1, "TRACE")
  178. fmt = "[%(asctime)s | " + loggername + "] %(message)s"
  179. date_format = "%Y-%m-%d %H:%M:%S"
  180. # logging.basicConfig(format=format, datefmt=date_format)
  181. logger = logging.getLogger(loggername)
  182. logger.setLevel(level)
  183. # Only needed when logging.basicConfig isn't set.
  184. if log_to_console:
  185. console_logger = logging.StreamHandler()
  186. console_logger.setFormatter(
  187. logging.Formatter(fmt, datefmt=date_format)
  188. )
  189. logger.addHandler(console_logger)
  190. if log_to_file and filename:
  191. # Use a rotating log set to rotate every night at midnight with a
  192. # max of 10 backups
  193. file_logger = logging.handlers.TimedRotatingFileHandler(
  194. filename,
  195. when='midnight',
  196. backupCount=10) # logging.FileHandler(filename)
  197. file_logger.setFormatter(
  198. logging.Formatter(fmt, datefmt=date_format)
  199. )
  200. logger.addHandler(file_logger)
  201. return logger
  202. def print_hex(data, cols=16, sep=' ', pretty=True):
  203. """Print data in hexadecimal.
  204. Customizable separator and columns number.
  205. Can be pretty printed but takes more time.
  206. """
  207. if pretty:
  208. print((pretty_print_hex(data, cols, sep)))
  209. else:
  210. print((sep.join("%02x" % b for b in bytearray(data))))
  211. def pretty_print_hex(orig_data, cols=16, sep=' '):
  212. """Hexadecimal pretty print.
  213. Takes ~1s per characters.
  214. Customizable separator and columns number.
  215. """
  216. data = bytearray(orig_data)
  217. end = len(data)
  218. line = "\n%08x | %-*s | %s"
  219. size = cols * 3 - 1
  220. i = 0
  221. output = ""
  222. while i < end:
  223. if i + cols < end:
  224. j = i + cols
  225. else:
  226. j = end
  227. output += line % (
  228. i,
  229. size,
  230. sep.join("%02x" % c for c in data[i:j]),
  231. "".join(chr(c) if 0x20 <= c < 0x7F else
  232. '.'
  233. for c in data[i:j])
  234. )
  235. i += cols
  236. return output
  237. # def pretty_print_hex(orig_data, cols=16):
  238. # """Takes ~1.5s per characters"""
  239. #
  240. # data = bytearray(orig_data)
  241. # output = "\n"
  242. #
  243. # for i in range(len(data) / cols + 1):
  244. # output += "%08x | " % (i * 16)
  245. #
  246. # c = 0
  247. # for x in range(cols):
  248. # if (i * cols + x + 1) > len(data):
  249. # break
  250. #
  251. # output += "%02x " % data[i * cols + x]
  252. # c += 1
  253. #
  254. # c = cols - c
  255. # output += " " * (c * 3 + 1)
  256. # for x in range(cols):
  257. # if (i * cols + x + 1) > len(data):
  258. # break
  259. #
  260. # if not chr(data[i * cols + x]) in string.printable:
  261. # output += "."
  262. # else:
  263. # output += "%c" % data[i * cols + x]
  264. # output += "\n"
  265. #
  266. # return output
  267. def qs_to_dict(s):
  268. """Convert query string to dict."""
  269. ret = urllib.parse.parse_qs(s, True)
  270. for k, v in list(ret.items()):
  271. try:
  272. # I'm not sure about the replacement for '-', but it'll at
  273. # least let it be decoded.
  274. # For the most part it's not important since it's mostly
  275. # used for the devname/ingamesn fields.
  276. ret[k] = base64.b64decode(urllib.parse.unquote(v[0])
  277. .replace("*", "=")
  278. .replace("?", "/")
  279. .replace(">", "+")
  280. .replace("-", "/"))
  281. except TypeError:
  282. """
  283. print("Could not decode following string: ret[%s] = %s"
  284. % (k, v[0]))
  285. print("url: %s" % s)
  286. """
  287. # If you don't assign it like this it'll be a list, which
  288. # breaks other code.
  289. ret[k] = v[0]
  290. return ret
  291. def dict_to_qs(d):
  292. """Convert dict to query string.
  293. nas(wii).nintendowifi.net has a URL query-like format but does not
  294. use encoding for special characters.
  295. """
  296. # Dictionary comprehension is used to not modify the original
  297. ret = {k: (base64.b64encode(v.encode())).decode('utf-8').replace("=", "*").encode() for k, v in list(d.items())}
  298. print(ret)
  299. return "&".join("{!s}={!s}".format(k, v.decode('utf-8')) for k, v in list(ret.items())) + "\r\n"