# (removed: table-extraction artifact — collapsed line-number column)
#!/usr/bin/python3
# Why?
# - A firm belief in offline first!
# - No reason to trust YouTube. If you ever want to publish anything, don't go to the largest platform.
# Remark: you can start as many workers as you want.
# Unfortunately, state is not persisted within the program, but youtube-dl is smart enough to resume downloads.
import getopt
import json
import pathlib
import queue
import random
import re
import subprocess
import sys
import threading
from collections import defaultdict
from datetime import datetime
from signal import signal, SIGINT
from sys import exit
from threading import Thread, Lock

from ipcqueue import posixmq
# TODO: compute and save scores; output an entry for each experiment, to put in a dataframe.
# Module-level handle to the running Central instance; set by Main.__init__ so
# the SIGINT handler can reach it for cleanup.
central = None


def cleanup(central):
    """Unlink every proxychains config file recorded by `central`.

    No-op when `central` is None (e.g. interrupted before Main started).
    """
    print('%s' % central)
    if central:
        print('unlinking config files')
        for filename in central.cfnfiles:
            print('%s' % filename)
            try:
                pathlib.Path(filename).unlink()
            except FileNotFoundError:
                # Bug fix: a file may already have been removed; don't abort
                # cleanup of the remaining config files.
                pass
def handler(signal_received, frame):
    """SIGINT handler: remove generated config files, then exit cleanly."""
    global central
    print('Interrupted. Exiting gracefully')
    cleanup(central)
    exit(0)
# Example candidate list documenting the expected schema of cands.json:
# cands = [
#     #############
#     # EDIT HERE #
#     #############
#     # (put some actual SOCKS5 proxies here)
#     # (random addresses for illustrative purposes, not suggesting to use these.
#     #  please go find a reasonable, up to date and legal list of proxies to use)
#     {"addrport": ("101.99.12.105", "1080")},
#     {"addrport": ("47.75.160.210", "1080"), "location": "us"},
#     {"addrport": ("98.188.47.150", "4145")},
# ]
# Bug fix: `json.loads(open(...).read())` leaked the file handle;
# Path.read_bytes() opens and closes the file in one call.
cands = json.loads(pathlib.Path("cands.json").read_bytes())
# pathlib.Path("cands.json").write_bytes(json.dumps(cands).encode('utf-8'))
- import base64
def serstr(x):
    """
    Make a bytes/str/None value JSON-serializable.

    Returns {"data": None} for None, otherwise a dict
    {"encoding": "utf-8" | "base64", "data": <str>}.
    """
    if x is None:
        return {"data": None}
    try:
        return {"encoding": "utf-8", "data": x.decode("utf-8")}
    except UnicodeError:
        # Bug fix: base64.b64encode returns *bytes*, which json.dumps rejects;
        # decode the (ASCII-only) base64 output to str.
        return {"encoding": "base64", "data": base64.b64encode(x).decode("ascii")}
    except AttributeError:
        # x has no .decode, so it is already a str.
        return {"encoding": "utf-8", "data": x}
class Central:
    """
    Runs youtube-dl through the "pc" (proxychains) wrapper and classifies
    the outcome of each download attempt.
    """

    def __init__(self):
        # Timestamp the log file name so parallel runs don't collide.
        dt = datetime.timestamp(datetime.now())
        self.logfilename = "log-x-%s" % dt
        self.log = open(self.logfilename, "w")
        # Proxychains config files created so far; cleanup() unlinks them on SIGINT.
        self.cfnfiles = []

    def attempt(self, chain, ytlink, ytdl_options=None):
        '''
        Attempt a download of `ytlink` through the proxy `chain`.

        chain        -- iterable of (address, port) pairs
        ytlink       -- URL handed to youtube-dl
        ytdl_options -- extra command-line options for youtube-dl
        Returns the classification dict produced by doit().
        '''
        # Bug fix: mutable default `=[]` replaced with a None sentinel.
        if ytdl_options is None:
            ytdl_options = []
        dt = datetime.timestamp(datetime.now())
        cfn = "pc-xy-%s.conf" % dt
        # file should be temporary
        self.writeconfig(cfn, chain)
        return self.doit(cfn, ytlink, ytdl_options)

    def doit(self, cfn, ytlink, ytdl_options):
        """
        Run the proxied youtube-dl command, mirror its output to stdout and
        the log file, and return a dict classifying the outcome.
        """
        cmd = ["pc", "-f", cfn, "youtube-dl"] + ytdl_options + [ytlink]
        self.log.write("%s" % cmd)
        self.log.flush()
        output = self._run(cmd)
        self.log.write("%s" % output)
        self.log.flush()
        return self._classify(output)

    def _run(self, cmd):
        """Spawn `cmd`, stream its combined stdout/stderr live, return the raw bytes."""
        cp = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
        output = b""
        # Byte-at-a-time read so youtube-dl's progress lines appear live.
        # https://stackoverflow.com/questions/19880190/interactive-input-output-using-python
        while True:
            data = cp.stdout.read(1)
            if not data:
                break
            output += data
            try:
                sys.stdout.write(data.decode("utf-8"))
            except UnicodeError:
                # A lone byte may not be valid UTF-8 on its own; skip echoing it.
                pass
            sys.stdout.flush()
        cp.wait()
        print("return code: %d" % cp.returncode)
        return output

    def _classify(self, output):
        """
        Pattern-match the captured output into a result dict with keys:
        success, partial_success, timeout, connect_fail,
        addrportstring_connect_fail_sock, e403, e429, unable,
        reasonstring_error_download.
        """
        connect_fail = True
        partial_success = False
        addrportstring_connect_fail_sock = None
        reasonstring_unable_generic = None
        reasonstring_error_download = None
        timeout = False
        e429 = False
        e403 = False
        # this is the gold nugget we're panning for: a progress line means
        # the download actually started.
        if re.search(rb"\[download\]\s+\d+.\d+%\s+of\s+~[^\s]+\s+at\s+[^\s]+\s+ETA", output):
            partial_success = True
        # proxychains prints "... OK" when the whole chain connects.
        if re.search(b"... OK\n", output):
            print("Connection chain succeeded")
            connect_fail = False
        st = re.search(b"([^ ]+) <--socket error or timeout!", output)
        if st:
            print("Socket error or timeout")
            addrportstring_connect_fail_sock = st.group(1)
        if re.search(b"Unable to download webpage: HTTP Error 429", output):
            print("HTTP Error 429")
            e429 = True
        st = re.search(rb"ERROR: [Uu]nable to download webpage: <urlopen error \[Errno (\d+)\] (.*)>", output)
        if st:
            # 0 = ?
            # 111 = connection refused
            print("Error %s %s" % (st.group(1), st.group(2)))
            reasonstring_unable_generic = st.group(0)
        elif re.search(b"ERROR: [Uu]nable to download webpage: Remote end closed connection without", output):
            print("Connection closed without reason")
            reasonstring_error_download = "Connection closed without reason"
            reasonstring_unable_generic = "Connection closed without reason"
        elif re.search(b"ERROR: [Uu]nable to download webpage: The read operation timed out", output):
            print("Timeout")
            timeout = True
        elif re.search(b"ERROR: [Uu]nable to download webpage: <urlopen error .*: The handshake operation timed out>", output):
            print("Timeout")
            timeout = True
        # "webpage" or "video data"
        st1 = re.search(b"ERROR: [Uu]nable to download .*: HTTP Error 403: Forbidden", output)
        st2 = re.search(rb"ERROR: [Uu]nable to download .*: <urlopen error \[Errno (\d+)\] (.*)>", output)
        st3 = re.search(rb"ERROR: [Uu]nable to download .*: <urlopen error \[SSL: ([A-Z_]+)\] (.*)>", output)
        # IncompleteRead ...
        st4 = re.search(b"ERROR: [Uu]nable to download .*: (.*)", output)
        # NOTE(review): without re.MULTILINE the ^ anchors to the very start
        # of `output` only — confirm that is intended.
        st_generic = re.search(b"^ERROR:(.*)", output)
        if st1:
            print("Error 403")
            e403 = True
        elif st2:
            print("Error %s %s during download" % (st2.group(1), st2.group(2)))
            reasonstring_error_download = st2.group(0)
            reasonstring_unable_generic = st2.group(0)
        elif st3:
            print("Error %s %s during download" % (st3.group(1), st3.group(2)))
            reasonstring_error_download = st3.group(0)
            reasonstring_unable_generic = st3.group(0)
        elif st4:
            print("Error %s during download" % (st4.group(1)))
            reasonstring_error_download = st4.group(0)
            reasonstring_unable_generic = st4.group(0)
        elif st_generic:
            print("Error %s during download" % st_generic.group(1))
            # Bug fix: the original referenced st4 here, which is always None
            # on this branch and raised AttributeError.
            reasonstring_error_download = st_generic.group(0)
            reasonstring_unable_generic = st_generic.group(0)
        # Success only when nothing above flagged a failure.
        success = (addrportstring_connect_fail_sock is None
                   and reasonstring_error_download is None
                   and reasonstring_unable_generic is None
                   and not (connect_fail or e429 or e403 or timeout))
        return {"success": success,
                "partial_success": partial_success,
                "timeout": timeout,
                "connect_fail": connect_fail,
                "addrportstring_connect_fail_sock": serstr(addrportstring_connect_fail_sock),
                "e403": e403,
                "e429": e429,
                "unable": serstr(reasonstring_unable_generic),
                "reasonstring_error_download": serstr(reasonstring_error_download)}

    def writeconfig(self, filename, chain, tcp_read_timeout=32000, tcp_connect_timeout=24000):
        """
        Write a proxychains config that routes through the local Tor SOCKS
        port and then each (address, port) hop in `chain`; record the file
        for later cleanup.
        """
        # `open` raises on failure (it never returns None), and `with`
        # guarantees the handle is closed.
        with open(filename, 'w') as conf:
            conf.write('strict_chain\n')
            conf.write('proxy_dns\n')
            conf.write('remote_dns_subnet 224\n')
            # timeouts should be settable and recorded in experiment
            conf.write('tcp_read_time_out %d\n' % tcp_read_timeout)
            conf.write('tcp_connect_time_out %d\n' % tcp_connect_timeout)
            conf.write('localnet 127.0.0.0/255.0.0.0\n')
            conf.write('\n')
            conf.write('[ProxyList]\n')
            # First hop: local Tor SOCKS port.
            conf.write('socks5 127.0.0.1 9050\n')
            for c in chain:
                conf.write('socks5 %s %s\n' % (c[0], c[1]))
        self.cfnfiles.append(filename)
- from pathlib import Path
# use pipes ...
class FileOutput:
    """Append-only line logger backed by a regular file."""

    def __init__(self, filename):
        myPath = Path(filename)
        if myPath.exists():
            if myPath.is_file():  # pipes?
                print("%s already exists, trying to append" % filename)
            else:
                # Bug fix: the original `return None` left the object without
                # self.f, deferring the failure to the first log() call.
                raise ValueError("%s already exists and is not a file" % filename)
        # Text mode ("a", not "ab") because we append JSON strings.
        self.f = open(filename, "a")

    def log(self, x):
        """Append `x` plus a newline and flush immediately."""
        self.f.write(x)
        self.f.write("\n")
        self.f.flush()
class Collector:
    """Accumulates experiment results in memory and mirrors each one to a log file as a JSON line."""

    def __init__(self, output_file):
        self.results = []
        self.output_filename = output_file
        self.fo = FileOutput(output_file)

    def register_result(self, result):
        """Record `result` and append its JSON encoding to the output file."""
        self.results.append(result)
        self.fo.log(json.dumps(result))
class SearchDriver:
    """
    Explores proxy chains: keeps a queue of chain "experiments", runs them
    through the executor (Central), and tallies the outcomes.
    """

    def __init__(self, central, candidates, collector):
        self.executor = central
        self.collector = collector
        self.candidates = candidates
        # results of experiments, keyed by repr of the experiment dict
        self.results = defaultdict(dict)
        self.queue = queue.Queue()
        self.candidates_lock = Lock()
        self.fill_queue_initially()
        # per-chain tallies of result reprs
        self.good = defaultdict(lambda: defaultdict(lambda: 0))

    def add_candidates(self, supplementary):
        """Thread-safely extend the candidate proxy list in place."""
        with self.candidates_lock:
            self.candidates += supplementary

    def fill_queue_initially(self, max_chain_length=1):
        """Enqueue a shuffled experiment per candidate (plus some length-2 chains if allowed)."""
        with self.candidates_lock:
            xs1 = self.candidates.copy()
        xs2 = xs1.copy()
        random.shuffle(xs1)
        random.shuffle(xs2)
        chains = []
        for cand1 in xs1:
            chains += [{"chain": [cand1.get("addrport")]}]
            if max_chain_length > 1:
                # NOTE(review): only pairs cand1 with the first element of
                # xs2 — confirm this sparse sampling is intended.
                for cand2 in xs2[:1]:
                    if cand1 != cand2:
                        chains += [{"chain": [cand1.get("addrport"), cand2.get("addrport")]}]
        random.shuffle(chains)
        for ch in chains:
            self.queue.put(ch)

    def drive(self, target, ytdl_options=None):
        """
        Pop experiments off the queue and attempt `target` with each chain.
        Returns True on the first success (re-queueing the winning chain),
        False once the queue is exhausted.
        """
        # Bug fix: mutable default `=[]` replaced with a None sentinel.
        if ytdl_options is None:
            ytdl_options = []
        if self.queue.empty():
            self.fill_queue_initially(max_chain_length=2)
        while not self.queue.empty():
            dt = datetime.timestamp(datetime.now())
            experiment = self.queue.get()
            print("attempting %s" % experiment)
            result = self.executor.attempt(experiment["chain"], target, ytdl_options)
            # unhashable, shmunhashable -- key by repr instead of the dict itself
            self.results["%s" % experiment][dt] = result
            self.good["%s" % experiment["chain"]]["%s" % result] += 1
            print(result)
            result["timestamp"] = dt
            result["experiment"] = experiment
            self.collector.register_result(result)
            if result["success"]:
                # TODO upon success, it can be marked as done in session file, for later removal
                self.queue.put(experiment)
                print("putting back candidate %s and quitting (for the time being)" % experiment)
                return True
                # TODO use statistics and data mining to drive the process
            else:
                # TODO also remove all that contain result["addrportstring_connect_fail_sock"] ip/port.
                # depending on what caused the error, may still be good if it's worked before
                if result["partial_success"]:
                    print("candidate %s not removed because of partial_success" % experiment)
                    self.queue.put(experiment)
                else:
                    print("candidate %s removed for now" % experiment)
            print("attempt done")
        return False  # some error must've occurred
class Main:
    """Worker process: pulls targets from a POSIX message queue and drives downloads."""

    def __init__(self):
        self.executor = Central()
        # global variable for cleanup
        global central
        central = self.executor
        # This is totally insecure against other processes ...
        self.proxy_queue = posixmq.Queue('/unblockcentral_proxy')
        self.target_queue = posixmq.Queue('/unblockcentral_target')
        # daemon=True so the background reader never blocks interpreter exit.
        self.proxy_queue_reader_thread = threading.Thread(
            target=self.proxy_queue_reader, args=(), daemon=True)
        # Bug fix: the original created this thread but never started it.
        self.proxy_queue_reader_thread.start()
        result_collector = Collector(output_file="results.log")
        # NOTE(review): this aliases (and shuffles) the module-level `cands`
        # list in place — confirm that is intended.
        crands = cands
        random.shuffle(crands)
        self.sd = SearchDriver(self.executor, crands, result_collector)

    def proxy_queue_reader(self):
        """Forever pull proxy lists off the queue and feed them to the driver."""
        # Bug fix: the original def was missing `self`, so the bound-method
        # thread target would raise TypeError as soon as the thread ran.
        done = False
        while not done:
            proxies = self.proxy_queue.get()
            print("got %d new proxy entries" % len(proxies))
            self.sd.add_candidates(proxies)

    def run(self):
        """Consume targets from the queue; for each, drive attempts until one succeeds."""
        done = False
        # NOTE(review): `done` is never set, so this loop runs until the
        # process is killed; the close/unlink below is unreachable in practice.
        while not done:
            x = self.target_queue.get()
            print(x)
            target = x[1]["target"]
            success = False
            while not success:
                success = self.sd.drive(target, x[1].get("ytdl_options", []))
        self.target_queue.close()
        self.target_queue.unlink()
def usage():
    """Print the command-line usage summary for this script."""
    message = """
python3 unblockcentral.py [OPTIONS]
Modal OPTIONS (mutually exclusive):
-s SESSION
-V URL
-A URL
If no OPTIONS are given, will spawn a worker
"""
    print(message)
if __name__=='__main__':
    # Install the SIGINT handler first so Ctrl-C always cleans up config files.
    signal(SIGINT, handler)
    try:
        # NOTE(review): "-P" is declared here but never handled in the loop
        # below, so supplying it trips the `assert False` arm — confirm intent.
        opts, args = getopt.getopt(sys.argv[1:], "P:V:A:s:r:h")
    except getopt.GetoptError as err:
        print (str(err))
        usage()
        exit(2)
    addProxyMode = False  # never read again in this block
    addTargetMode = False
    addTargets = []
    session = None

    for o, a in opts:
        if o == "-h":
            usage()
            exit(0)
        elif o == "-r":
            # Reads the whole session file; the content is not consumed
            # further in this block (see TODOs elsewhere in the file).
            session = open(a, "rb").read()
            print("resuming session")
        elif o == "-s":
            # Opens the session file for append; the handle is not used
            # further in this block.
            session = open(a, "ab")
            print("adding to session")
        elif o == "-V":
            # addTargets = [{"target":a, "ytdl_options":["-f", "bestvideo[height <=? 720]+bestaudio/best"]}]
            print("adding video download")
            addTargets = [{"target":a, "ytdl_options":[]}]
            addTargetMode = True
        elif o == "-A":
            # -x: extract audio, converted to opus at best quality.
            print("adding audio download")
            addTargets = [{"target":a, "ytdl_options":["-x", "--audio-format", "opus", "--audio-quality", "0"]}]
            addTargetMode = True
        else:
            assert False, ("unhandled option %s" % o)
    if addTargetMode:
        # Producer mode: enqueue the target(s) for a running worker, then exit.
        target_queue = posixmq.Queue('/unblockcentral_target')
        for a in addTargets:
            target_queue.put([1, a])
    else:
        # Worker mode: consume targets until killed.
        m = Main()
        m.run()
# (removed: table-extraction artifact)