fake_dns.py 2.3 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768
  1. import dns.resolver
  2. import socketserver
  3. class DNSQuery:
  4. def __init__(self, data):
  5. self.data = data
  6. self.domain = ''
  7. t = (data[2] >> 3) & 15
  8. if t == 0:
  9. i = 12
  10. l = data[i]
  11. while l != 0:
  12. self.domain += data[i + 1:i + l + 1].decode('utf-8') + '.'
  13. i += l + 1
  14. l = data[i]
  15. def response(self):
  16. packet = b''
  17. if self.domain:
  18. name = self.domain
  19. namemap = DNSServer.namemap
  20. if namemap.__contains__(name):
  21. ip = namemap[name]
  22. else:
  23. answer = DNSServer.resolver.cache.data.get((name, 1, 1))
  24. if not answer:
  25. answer = DNSServer.resolver.resolve(name)
  26. DNSServer.resolver.cache.put((name, 1, 1), answer)
  27. ip = answer[0].to_text()
  28. packet += self.data[:2] + b'\x81\x80'
  29. packet += self.data[4:6] + self.data[4:6] + b'\x00\x00\x00\x00'
  30. packet += self.data[12:]
  31. packet += b'\xc0\x0c'
  32. packet += b'\x00\x01\x00\x01\x00\x00\x00\x3c\x00\x04'
  33. packet += bytes(map(int, ip.split('.')))
  34. return packet
  35. class DNSUDPHandler(socketserver.BaseRequestHandler):
  36. def handle(self):
  37. data = self.request[0]
  38. socket = self.request[1]
  39. try:
  40. query = DNSQuery(data)
  41. socket.sendto(query.response(), self.client_address)
  42. except Exception as e:
  43. print('fake_dns: %s' % repr(e))
  44. class DNSServer:
  45. def __init__(self, port=53):
  46. DNSServer.namemap = {}
  47. DNSServer.resolver = dns.resolver.Resolver(configure=False)
  48. DNSServer.resolver.nameservers = ['8.8.8.8', '8.8.4.4']
  49. DNSServer.resolver.cache = dns.resolver.Cache()
  50. self.port = port
  51. def addname(self, name, ip):
  52. DNSServer.namemap[name] = ip
  53. def start(self):
  54. HOST, PORT = "0.0.0.0", self.port
  55. socketserver.ThreadingUDPServer.allow_reuse_address = True
  56. server = socketserver.ThreadingUDPServer((HOST, PORT), DNSUDPHandler)
  57. server.serve_forever()
  58. def fake_dns(server_ip):
  59. dns = DNSServer()
  60. dns.addname('secure.zwift.com.', server_ip)
  61. dns.addname('us-or-rly101.zwift.com.', server_ip)
  62. dns.start()