proxy.py 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127
  1. import requests
  2. import signal
  3. from database import get_proxys
  4. from Console import console
  5. from settings import USE_PROXY
  6. class Proxy:
  7. def __init__(self, ip: str, protocol: str, port: int):
  8. self.ip = ip
  9. self.protocol = protocol
  10. self.port = port
  11. def telethon_proxy(self):
  12. if self.protocol == "SOCKS5":
  13. return "socks5", self.ip, self.port
  14. elif self.protocol == "SOCKS4":
  15. return "socks5", self.ip, self.port
  16. elif self.protocol == "HTTP":
  17. return "http", self.ip, self.port
  18. elif self.protocol == "HTTPS":
  19. return "https", self.ip, self.port
  20. else:
  21. return None, None, None
  22. def selenium_proxy(self):
  23. if self.ip and self.port:
  24. return self.ip + ':' + str(self.port)
  25. else:
  26. return ""
  27. def proxy(self):
  28. return self.ip, self.protocol, self.port
  29. class Proxy_controller:
  30. def __init__(self, proxy: list, for_one=5, use_proxy=USE_PROXY):
  31. self.list_of_proxy = []
  32. for el in proxy:
  33. self.list_of_proxy.append(Proxy(el[0], el[1], int(el[2])))
  34. self.pos = 0
  35. self.tries = 0
  36. self.tries_for_one = for_one
  37. self.use_proxy = use_proxy
  38. def get_proxy(self):
  39. if len(self.list_of_proxy) == 0 or not self.use_proxy:
  40. return None
  41. if self.tries < self.tries_for_one:
  42. return self.list_of_proxy[self.pos]
  43. else:
  44. if self.pos < len(self.list_of_proxy) - 1:
  45. self.pos += 1
  46. else:
  47. self.pos = 0
  48. self.tries = 0
  49. return self.list_of_proxy[self.pos]
  50. def add_proxy(self, proxy):
  51. self.list_of_proxy.append(proxy)
  52. def remove_proxy(self, proxy):
  53. self.list_of_proxy.remove(proxy)
  54. def signal_handler(signum, frame):
  55. raise Exception("Timed out!")
  56. def check_proxy(proxy_ip, mode, timeout=10, url="https://google.com"):
  57. proxy = {"http": "socks5://{}".format(proxy_ip), "https": "socks5://{}".format(proxy_ip)}
  58. signal.signal(signal.SIGALRM, signal_handler)
  59. signal.alarm(timeout) # Timeout in seconds
  60. try:
  61. requests.get(url, proxies=proxy)
  62. return True
  63. except Exception as e:
  64. print("Exception:", e)
  65. return False
  66. def available_proxy():
  67. proxy = get_proxys()
  68. res = []
  69. if proxy:
  70. for el in proxy:
  71. # el: ip, protocol, port
  72. if check_proxy(el[0], el[1]):
  73. res.append(el)
  74. return res
  75. def get_all_proxys():
  76. all_proxy = get_proxys()
  77. ret = []
  78. for el in all_proxy:
  79. if int(el[3]):
  80. ret.append(Proxy(el[0], el[1], int(el[2])))
  81. return ret
  82. def socks_proxy():
  83. ret = []
  84. for el in get_proxys():
  85. if el[1] == "SOCKS5" and el[3]:
  86. ret.append(Proxy(el[0], el[1], int(el[2])))
  87. return ret
  88. def http_proxy():
  89. ret = []
  90. for el in get_proxys():
  91. if el[1] == "HTTP" and el[3]:
  92. ret.append(Proxy(el[0], el[1], int(el[2])))
  93. return ret
  94. if __name__ == '__main__':
  95. c = Proxy_controller(get_proxys())
  96. print(c.get_proxy().telethon_proxy())