12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091 |
- #!/usr/bin/python3
- import subprocess
- import re
- import lxml
- from lxml import html
- import urllib.request
- import urllib3
- import logging
- import subprocess
# Request headers sent with every HTTP call made by this module.
headers = {'User-Agent': 'Mozilla/5.0'}

# Single urllib3 pool manager shared by the GET and POST helpers below.
pool = urllib3.PoolManager()

# Address fetched by default when probing the connection state.
defaultdummy = 'google.com'
class ConnectStateExplorer:
    """Fetch a probe URL, look for an HTML form in the answer and submit it.

    The probe address is taken from the module-level ``defaultdummy`` and all
    HTTP traffic goes through the module-level ``pool`` with the module-level
    ``headers``.  Presumably this targets captive-portal style login pages
    (NOTE(review): inferred from the redirect/form/ping sequence -- confirm).
    """

    def __init__(self):
        # Address that is fetched to see where the network sends us.
        self.dummy = defaultdummy

    @staticmethod
    def _with_scheme(url):
        """Return *url* unchanged if it starts with http(s)://, else prefix ``http://``.

        A bare host such as ``google.com`` has an empty ``netloc`` when run
        through ``urllib.parse.urlparse``, which makes host comparisons
        meaningless; giving it a scheme first avoids that.
        """
        if url.lower().startswith(("http://", "https://")):
            return url
        return "http://" + url

    @staticmethod
    def _resolve_form_action(page_url, action):
        """Resolve a form's ``action`` attribute against the page it came from.

        Absolute actions are returned as they are, relative ones are joined
        onto *page_url*, and a missing/empty action resolves to *page_url*
        itself.
        """
        return urllib.parse.urljoin(page_url, action or "")

    def explore_init(self):
        """Probe the dummy URL and, if a form is found, submit it.

        Logs the HTTP status, whether the final URL's host differs from the
        probe's host, and the ping result before and after the form is posted.
        Returns ``None``; when no usable form is found it stops after logging.
        """
        logging.info("explore: init")
        dummy_url = self._with_scheme(self.dummy)
        _response, status, body, redir = self.get(url=dummy_url, headers=headers)

        # The reported URL may be missing or relative; anchor it on the probe
        # URL so that it always carries a host.
        final_url = urllib.parse.urljoin(dummy_url, redir or "")
        redirected = (urllib.parse.urlparse(final_url).netloc
                      != urllib.parse.urlparse(dummy_url).netloc)

        logging.info("code %s", status)
        logging.info("was redirected? %s", redirected)
        logging.info("connected? %s", self.test_connection_ping())

        form = self.find_html_form_elems(body)
        if not isinstance(form, tuple) or form[0] != "ok":
            logging.info("couldn't interpret / find suitable form in redirected html")
            logging.debug(body)
            # Nothing to submit; continuing would index into a non-tuple.
            return

        _tag, action, fields = form
        target = self._resolve_form_action(final_url, action)
        # NOTE(review): urllib3 encodes ``fields`` as multipart/form-data by
        # default; confirm the target accepts that, or pass
        # encode_multipart=False for application/x-www-form-urlencoded.
        self.post_form(url=target, headers=headers, fields=fields)
        logging.info("connected? %s", self.test_connection_ping())

    def find_html_form_elems(self, text):
        """Extract the first form's action and the fields to submit.

        Args:
            text: HTML document (as accepted by ``lxml.html.document_fromstring``).

        Returns:
            ``("ok", action, fields)`` where ``action`` is the first form's
            ``action`` attribute (``None`` if absent) and ``fields`` maps the
            names of all hidden inputs to their values, plus the first named
            checkbox mapped to ``"checked"``.  Returns ``False`` when the
            document has no form or cannot be parsed.
        """
        try:
            hdoc = html.document_fromstring(text)
            forms = hdoc.xpath('//form')
            if not forms:
                return False

            # Inputs are collected from the whole document, not only from the
            # first form.
            fields = {}
            first_checkbox = None
            for elt in hdoc.xpath('//input'):
                name = elt.attrib.get('name')
                if not name:
                    # Inputs without a name cannot be submitted as a field.
                    continue
                kind = (elt.attrib.get('type') or '').lower()
                if kind == 'hidden':
                    fields[name] = elt.attrib.get('value') or ''
                elif kind == 'checkbox' and first_checkbox is None:
                    first_checkbox = name

            # Applied after the hidden inputs so the checkbox wins on a name clash.
            if first_checkbox is not None:
                fields[first_checkbox] = "checked"

            return ("ok", forms[0].attrib.get('action'), fields)
        except Exception as e:
            # Deliberately broad: any parsing problem is reported as "no form".
            logging.info("Exception interpreting HTML, %s", e)
            return False

    def post_form(self, **kwargs):
        """POST via the shared pool; *kwargs* go to ``pool.request_encode_body``."""
        logging.info("requesting %s", kwargs)
        response = pool.request_encode_body(method='POST', **kwargs)
        logging.debug("POST response status %s", response.status)

    def get(self, **kwargs):
        """GET via the shared pool; *kwargs* go to ``pool.request``.

        Returns:
            ``(response, status, body, url)`` where ``body`` is ``response.data``
            and ``url`` is whatever ``response.geturl()`` reports.
        """
        logging.info("requesting %s", kwargs)
        response = pool.request(method="GET", **kwargs)
        return (response, response.status, response.data, response.geturl())

    def test_connection_ping(self, destination="google.com"):
        """Return ``True`` if a single ``ping`` to *destination* exits with 0."""
        # DEVNULL rather than PIPE: nothing reads the pipes, and an unread
        # pipe can block the child once its buffer fills up.
        retval = subprocess.call(
            ["ping", "-c", "1", "-W", "2", destination],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )
        return retval == 0

    def try_dummy(self):
        """Fetch the dummy URL and keep the response in ``self.dummyresponse``."""
        response, _status, body, _redir = self.get(
            url=self._with_scheme(self.dummy), headers=headers)
        # Parsed only to check that lxml accepts the body; the tree is not kept.
        html.document_fromstring(body)
        self.dummyresponse = response
-
if __name__ == "__main__":
    # Script entry point: timestamped INFO-level logging, then one exploration run.
    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(message)s')
    explorer = ConnectStateExplorer()
    explorer.explore_init()
|