#!/usr/bin/python3
# Why?
# - A firm belief in offline first!!!!!
# - No reason to trust youtube. Please, if you ever want to publish anything, don't go to the largest platform.
# Remark: you can start as many workers as you want.
# Unfortunately, state is not persisted within the program, but youtube-dl is smart enough to resume downloads.
import base64
import subprocess
from datetime import datetime
import re
import sys
import queue
from collections import defaultdict
import threading
from threading import Thread, Lock
from ipcqueue import posixmq
import getopt
import random
import json
from signal import signal, SIGINT
from sys import exit
import pathlib
from pathlib import Path
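
# External pieces this script appears to lean on, judging from the code below:
# the `ipcqueue` package for POSIX message queues, `youtube-dl` on PATH,
# a local SOCKS proxy on 127.0.0.1:9050 (Tor's default port), and a
# proxychains-style wrapper invoked as `pc` that accepts `-f <config>`.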
# TODO compute and save scores; output an entry for each experiment, to put in dataframe

central = None

def cleanup(central):
    print('%s' % central)
    if central:
        print('unlinking config files')
        for filename in central.cfnfiles:
            print('%s' % filename)
            pathlib.Path(filename).unlink()

def handler(signal_received, frame):
    # Handle any cleanup here
    print('Interrupted. Exiting gracefully')
    global central
    cleanup(central)
    exit(0)
# cands = [
#     #############
#     # EDIT HERE #
#     #############
#     # (put some actual SOCKS5 proxies here)
#     # (random addresses for illustrative purposes, not suggesting you use these;
#     #  please go find a reasonable, up-to-date and legal list of proxies to use)
#     {"addrport": ("101.99.12.105", "1080")},
#     {"addrport": ("47.75.160.210", "1080"), "location": "us"},
#     {"addrport": ("98.188.47.150", "4145")},
# ]
with open("cands.json", "rb") as f:
    cands = json.loads(f.read())
# open("cands.json", "wb").write(json.dumps(cands).encode('utf-8'))
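# Note: cands.json is just the JSON form of the commented-out list above.
# JSON has no tuples, so each "addrport" comes back as a two-element list,
# which writeconfig below handles equally well (it only indexes c[0] and c[1]).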
def serstr(x):
    """
    Make stuff JSON serializable.
    """
    if x is None:
        return {"data": None}
    try:
        y = x.decode("utf-8")
        return {"encoding": "utf-8", "data": y}
    except UnicodeError:
        # b64encode returns bytes; decode so json.dumps accepts the result
        return {"encoding": "base64", "data": base64.b64encode(x).decode("ascii")}
    except AttributeError:
        # no .decode method, so x is already a (unicode) str
        return {"encoding": "utf-8", "data": x}
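# A quick sketch of what serstr returns (example inputs, not from real runs):
#   serstr(None)           -> {"data": None}
#   serstr(b"hello")       -> {"encoding": "utf-8", "data": "hello"}
#   serstr(b"\xff\xfe")    -> {"encoding": "base64", "data": "//4="}
#   serstr("already text") -> {"encoding": "utf-8", "data": "already text"}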
class Central:
    def __init__(self):
        dt = datetime.timestamp(datetime.now())
        self.logfilename = "log-x-%s" % dt
        self.log = open(self.logfilename, "w")
        self.cfnfiles = []

    def attempt(self, chain, ytlink, ytdl_options=[]):
        '''
        Attempt a download.
        '''
        dt = datetime.timestamp(datetime.now())
        cfn = "pc-xy-%s.conf" % dt
        # file should be temporary
        self.writeconfig(cfn, chain)
        dic = self.doit(cfn, ytlink, ytdl_options)
        return dic

    def doit(self, cfn, ytlink, ytdl_options):
        cmd = ["pc", "-f", cfn, "youtube-dl"] + ytdl_options + [ytlink]
        self.log.write("%s" % cmd)
        self.log.flush()
        cp = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
        output = b""
        # https://stackoverflow.com/questions/19880190/interactive-input-output-using-python
        while True:
            data = cp.stdout.read(1)
            if not data:
                break
            output += data
            blah = ""
            try:
                blah = data.decode("utf-8")
            except Exception:
                pass
            # yeah, thanks a lot: it's obviously correct to distinguish bytes
            # from text, but why can't one simply put ascii on sys.stdout?
            sys.stdout.write(blah)
            sys.stdout.flush()
        cp.wait()
        print("return code: %d" % cp.returncode)
        self.log.write("%s" % output)
        self.log.flush()
        connect_fail = True
        partial_success = False
        addrportstring_connect_fail_sock = None
        reasonstring_unable_generic = None
        reasonstring_error_download = None
        timeout = False
        e429 = False
        e403 = False
        # this is the gold nugget we're panning for
        if re.search(rb"\[download\]\s+\d+\.\d+%\s+of\s+~[^\s]+\s+at\s+[^\s]+\s+ETA", output):
            partial_success = True
        if re.search(rb"\.\.\. OK\n", output):
            print("Connection chain succeeded")
            connect_fail = False
        st = re.search(rb"([^ ]+) <--socket error or timeout!", output)
        if st:
            print("Socket error or timeout")
            addrportstring_connect_fail_sock = st.group(1)
        if re.search(rb"Unable to download webpage: HTTP Error 429", output):
            print("HTTP Error 429")
            e429 = True
        st = re.search(rb"ERROR: [Uu]nable to download webpage: <urlopen error \[Errno (\d+)\] (.*)>", output)
        if st:
            # 0 = ?
            # 111 = connection refused
            print("Error %s %s" % (st.group(1), st.group(2)))
            reasonstring_unable_generic = st.group(0)
        elif re.search(rb"ERROR: [Uu]nable to download webpage: Remote end closed connection without", output):
            print("Connection closed without reason")
            reasonstring_error_download = "Connection closed without reason"
            reasonstring_unable_generic = "Connection closed without reason"
        elif re.search(rb"ERROR: [Uu]nable to download webpage: The read operation timed out", output):
            print("Timeout")
            timeout = True
        elif re.search(rb"ERROR: [Uu]nable to download webpage: <urlopen error .*: The handshake operation timed out>", output):
            print("Timeout")
            timeout = True
        # "webpage" or "video data"
        st1 = re.search(rb"ERROR: [Uu]nable to download .*: HTTP Error 403: Forbidden", output)
        st2 = re.search(rb"ERROR: [Uu]nable to download .*: <urlopen error \[Errno (\d+)\] (.*)>", output)
        st3 = re.search(rb"ERROR: [Uu]nable to download .*: <urlopen error \[SSL: ([A-Z_]+)\] (.*)>", output)
        # IncompleteRead ...
        st4 = re.search(rb"ERROR: [Uu]nable to download .*: (.*)", output)
        st_generic = re.search(rb"^ERROR:(.*)", output, re.MULTILINE)
        if st1:
            print("Error 403")
            e403 = True
        elif st2:
            print("Error %s %s during download" % (st2.group(1), st2.group(2)))
            reasonstring_error_download = st2.group(0)
            reasonstring_unable_generic = st2.group(0)
        elif st3:
            print("Error %s %s during download" % (st3.group(1), st3.group(2)))
            reasonstring_error_download = st3.group(0)
            reasonstring_unable_generic = st3.group(0)
        elif st4:
            print("Error %s during download" % (st4.group(1)))
            reasonstring_error_download = st4.group(0)
            reasonstring_unable_generic = st4.group(0)
        elif st_generic:
            print("Error %s during download" % st_generic.group(1))
            reasonstring_error_download = st_generic.group(0)
            reasonstring_unable_generic = st_generic.group(0)
        success = (addrportstring_connect_fail_sock is None
                   and reasonstring_error_download is None
                   and reasonstring_unable_generic is None
                   and not (connect_fail or e429 or e403 or timeout))
        dic = {"success": success,
               "partial_success": partial_success,
               "timeout": timeout,
               "connect_fail": connect_fail,
               "addrportstring_connect_fail_sock": serstr(addrportstring_connect_fail_sock),
               "e403": e403,
               "e429": e429,
               "unable": serstr(reasonstring_unable_generic),
               "reasonstring_error_download": serstr(reasonstring_error_download)}
        return dic
    def writeconfig(self, filename, chain, tcp_read_timeout=32000, tcp_connect_timeout=24000):
        # open() raises rather than returning None, so catch the failure
        try:
            conf = open(filename, 'w')
        except OSError as e:
            print("error opening %s for writing config: %s" % (filename, e))
            return
        conf.write('strict_chain\n')
        conf.write('proxy_dns\n')
        conf.write('remote_dns_subnet 224\n')
        # timeouts should be settable and recorded in the experiment
        conf.write('tcp_read_time_out %d\n' % tcp_read_timeout)
        conf.write('tcp_connect_time_out %d\n' % tcp_connect_timeout)
        conf.write('localnet 127.0.0.0/255.0.0.0\n')
        conf.write('\n')
        conf.write('[ProxyList]\n')
        conf.write('socks5 127.0.0.1 9050\n')
        for c in chain:
            conf.write('socks5 %s %s\n' % (c[0], c[1]))
        conf.close()
        self.cfnfiles += [filename]
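    # For illustration, a sketch of the proxychains-style config writeconfig
    # emits for chain [("10.0.0.1", "1080")] with the default timeouts
    # (address is a placeholder):
    #
    #   strict_chain
    #   proxy_dns
    #   remote_dns_subnet 224
    #   tcp_read_time_out 32000
    #   tcp_connect_time_out 24000
    #   localnet 127.0.0.0/255.0.0.0
    #
    #   [ProxyList]
    #   socks5 127.0.0.1 9050
    #   socks5 10.0.0.1 1080
    #
    # i.e. every attempt goes through the local SOCKS port first, then through
    # each hop of the chain in order (strict_chain).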
# use pipes ...
class FileOutput:
    def __init__(self, filename):
        myPath = Path(filename)
        if myPath.exists():
            if myPath.is_file():  # pipes?
                print("%s already exists, trying to append" % filename)
            else:
                # and os.access(PATH, os.R_OK):
                # returning from __init__ would not stop construction, so raise
                raise OSError("%s already exists and is not a file" % filename)
        # text mode ("a"), not "ab": json.dumps produces str, not bytes
        self.f = open(filename, "a")

    def log(self, x):
        self.f.write(x)
        self.f.write("\n")
        self.f.flush()
class Collector:
    def __init__(self, output_file):
        self.results = []
        self.output_filename = output_file
        self.fo = FileOutput(output_file)

    def register_result(self, result):
        self.results += [result]
        self.fo.log(json.dumps(result))

class SearchDriver:
    def __init__(self, central, candidates, collector):
        self.executor = central
        self.collector = collector
        self.candidates = candidates
        # results of experiments
        self.results = defaultdict(dict)
        self.queue = queue.Queue()
        self.candidates_lock = Lock()
        self.fill_queue_initially()
        self.good = defaultdict(lambda: defaultdict(lambda: 0))

    def add_candidates(self, supplementary):
        self.candidates_lock.acquire()
        self.candidates += supplementary
        self.candidates_lock.release()

    def fill_queue_initially(self, max_chain_length=1):
        self.candidates_lock.acquire()
        xs1 = self.candidates.copy()
        self.candidates_lock.release()
        xs2 = xs1.copy()
        random.shuffle(xs1)
        random.shuffle(xs2)
        chains = []
        for cand1 in xs1:
            chains += [{"chain": [cand1.get("addrport")]}]
            if max_chain_length > 1:
                for cand2 in xs2[:1]:
                    if cand1 != cand2:
                        chains += [{"chain": [cand1.get("addrport"), cand2.get("addrport")]}]
        random.shuffle(chains)
        for ch in chains:
            self.queue.put(ch)
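    # For candidates with addrports A and B and max_chain_length=2 this
    # enqueues, in shuffled order, entries shaped like:
    #   {"chain": [A]}, {"chain": [B]}, {"chain": [A, X]}, {"chain": [B, X]}
    # where X is always the first element of the shuffled xs2: the [:1] slice
    # pairs every candidate with just one second hop.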
    def drive(self, target, ytdl_options=[]):
        if self.queue.empty():
            self.fill_queue_initially(max_chain_length=2)
        while not self.queue.empty():
            dt = datetime.timestamp(datetime.now())
            experiment = self.queue.get()
            print("attempting %s" % experiment)
            result = self.executor.attempt(experiment["chain"], target, ytdl_options)
            # unhashable, shmunhashable: dicts can't be keys, so use their string form
            self.results["%s" % experiment][dt] = result
            self.good["%s" % experiment["chain"]]["%s" % result] += 1
            # print("%s" % self.good)
            print(result)
            result["timestamp"] = dt
            result["experiment"] = experiment
            self.collector.register_result(result)
            if result["success"]:
                # TODO upon success, it can be marked as done in the session file, for later removal
                self.queue.put(experiment)
                print("putting back candidate %s and quitting (for the time being)" % experiment)
                return True
                # TODO use statistics and data mining to drive the process
            else:
                # TODO also remove every chain containing result["addrportstring_connect_fail_sock"]'s ip/port.
                # Depending on what caused the error, a candidate may still be good if it has worked before.
                if result["partial_success"]:
                    # if not (result["reasonstring_error_download"] or result["timeout"]):
                    print("candidate %s not removed because of partial_success" % experiment)
                    self.queue.put(experiment)
                else:
                    print("candidate %s removed for now" % experiment)
            print("attempt done")
        return False  # some error must have occurred
        # return (good, fail)
class Main:
    def __init__(self):
        self.executor = Central()
        # global variable for cleanup
        global central
        central = self.executor
        # This is totally insecure against other processes ...
        self.proxy_queue = posixmq.Queue('/unblockcentral_proxy')
        self.target_queue = posixmq.Queue('/unblockcentral_target')
        # daemon thread, so a blocking get() can't keep the process alive on exit
        self.proxy_queue_reader_thread = threading.Thread(
            target=self.proxy_queue_reader, args=(), daemon=True)
        result_collector = Collector(output_file="results.log")
        crands = cands.copy()
        random.shuffle(crands)
        self.sd = SearchDriver(self.executor, crands, result_collector)
        # only start reading once self.sd exists
        self.proxy_queue_reader_thread.start()

    def proxy_queue_reader(self):
        done = False
        while not done:
            proxies = self.proxy_queue.get()
            print("got %d new proxy entries" % len(proxies))
            self.sd.add_candidates(proxies)

    def run(self):
        done = False
        while not done:
            x = self.target_queue.get()
            print(x)
            target = x[1]["target"]
            success = False
            while not success:
                success = self.sd.drive(target, x[1].get("ytdl_options", []))
        self.target_queue.close()
        self.target_queue.unlink()
def usage():
    print("""
python3 unblockcentral.py [OPTIONS]
Modal OPTIONS (mutually exclusive):
-s SESSION   add to session file SESSION
-r SESSION   resume from session file SESSION
-V URL       enqueue URL as a video download
-A URL       enqueue URL as an audio-only (opus) download
-h           show this help
If no OPTIONS are given, will spawn a worker
""")
if __name__ == '__main__':
    signal(SIGINT, handler)
    try:
        opts, args = getopt.getopt(sys.argv[1:], "P:V:A:s:r:h")
    except getopt.GetoptError as err:
        print(str(err))
        usage()
        exit(2)
    addProxyMode = False
    addTargetMode = False
    addTargets = []
    session = None
    for o, a in opts:
        if o == "-h":
            usage()
            exit(0)
        elif o == "-r":
            session = open(a, "rb").read()
            print("resuming session")
        elif o == "-s":
            session = open(a, "ab")
            print("adding to session")
        elif o == "-V":
            # addTargets = [{"target": a, "ytdl_options": ["-f", "bestvideo[height <=? 720]+bestaudio/best"]}]
            print("adding video download")
            addTargets = [{"target": a, "ytdl_options": []}]
            addTargetMode = True
        elif o == "-A":
            print("adding audio download")
            addTargets = [{"target": a, "ytdl_options": ["-x", "--audio-format", "opus", "--audio-quality", "0"]}]
            addTargetMode = True
        else:
            assert False, ("unhandled option %s" % o)
    if addTargetMode:
        target_queue = posixmq.Queue('/unblockcentral_target')
        for a in addTargets:
            target_queue.put([1, a])
    else:
        m = Main()
        m.run()