fake-f5-server.py 9.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243
  1. #!/usr/bin/env python3
  2. #
  3. # Copyright © 2021 Daniel Lenski
  4. #
  5. # This file is part of openconnect.
  6. #
  7. # This is free software; you can redistribute it and/or
  8. # modify it under the terms of the GNU Lesser General Public License
  9. # as published by the Free Software Foundation; either version 2.1 of
  10. # the License, or (at your option) any later version.
  11. #
  12. # This library is distributed in the hope that it will be useful, but
  13. # WITHOUT ANY WARRANTY; without even the implied warranty of
  14. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  15. # Lesser General Public License for more details.
  16. #
  17. # You should have received a copy of the GNU Lesser General Public License
  18. # along with this program. If not, see <http://www.gnu.org/licenses/>
  19. ########################################
  20. # This program emulates the authentication-phase behavior of an F5
  21. # server enough to test OpenConnect's authentication behavior against it.
  22. # Specifically, it emulates the following requests:
  23. #
  24. # GET /
  25. # GET /my.policy
  26. # POST /my.policy
  27. #
  28. # It does not actually validate the credentials in any way, but attempts to
  29. # verify their consistency from one request to the next, by saving their
  30. # values via a (cookie-based) session.
  31. ########################################
  32. import sys
  33. import ssl
  34. import random
  35. import base64
  36. import time
  37. from json import dumps
  38. from functools import wraps
  39. from flask import Flask, request, redirect, url_for, make_response, session
  40. host, port, *cert_and_maybe_keyfile = sys.argv[1:]
  41. context = ssl.SSLContext()
  42. context.load_cert_chain(*cert_and_maybe_keyfile)
  43. app = Flask(__name__)
  44. app.config.update(SECRET_KEY=b'fake', DEBUG=True, HOST=host, PORT=int(port), SESSION_COOKIE_NAME='fake')
  45. ########################################
  46. def cookify(jsonable):
  47. return base64.urlsafe_b64encode(dumps(jsonable).encode())
  48. def require_MRHSession(fn):
  49. @wraps(fn)
  50. def wrapped(*args, **kwargs):
  51. if not request.cookies.get('MRHSession'):
  52. session.clear()
  53. return redirect(url_for('get_policy'))
  54. return fn(*args, **kwargs)
  55. return wrapped
  56. def check_form_against_session(*fields, use_query=False):
  57. def inner(fn):
  58. @wraps(fn)
  59. def wrapped(*args, **kwargs):
  60. source = request.args if use_query else request.form
  61. source_name = 'args' if use_query else 'form'
  62. for f in fields:
  63. assert session.get(f) == source.get(f), \
  64. f'at step {session.get("step")}: {source_name} {f!r} {source.get(f)!r} != session {f!r} {session.get(f)!r}'
  65. return fn(*args, **kwargs)
  66. return wrapped
  67. return inner
  68. ########################################
  69. # Respond to initial 'GET /' with a redirect to '/my.policy'
  70. # [Save list of domains/authgroups in the session for use later]
  71. @app.route('/')
  72. def root():
  73. domains, mock_dtls, no_html_login_form = request.args.get('domains'), request.args.get('mock_dtls'), request.args.get('no_html_login_form')
  74. assert not (domains and no_html_login_form), \
  75. f'combination of domains and no_html_login_form is not allow specified'
  76. session.update(step='initial-GET', domains=domains and domains.split(','),
  77. mock_dtls=mock_dtls and bool(mock_dtls),
  78. no_html_login_form = no_html_login_form and bool(no_html_login_form))
  79. # print(session)
  80. return redirect(url_for('get_policy'))
  81. # Respond to 'GET /my.policy' with a login form
  82. @app.route('/my.policy')
  83. def get_policy():
  84. session.update(step='GET-login-form')
  85. no_html_login_form = session.get('no_html_login_form')
  86. if no_html_login_form:
  87. return '''<html><body>It would be nice if F5 login pages consistently used actual HTML forms</body></html>'''
  88. domains = session.get('domains')
  89. sel = ''
  90. if domains:
  91. sel = '<select name="domain">%s</select>' % ''.join(
  92. '<option value="%d">%s</option>' % nv for nv in enumerate(domains))
  93. return '''
  94. <html><body><form id="auth_form" method="post">
  95. <input type="text" name="username"/>
  96. <input type="password" name="password"/>
  97. %s</form></body></html>''' % sel
  98. # Respond to 'POST /my.policy' with a redirect response containing MRHSession and F5_ST
  99. # cookies (OpenConnect uses the combination of the two to detect successful authentication)
  100. @app.route('/my.policy', methods=['POST'])
  101. def post_policy():
  102. domains = session.get('domains')
  103. if domains:
  104. assert 0 <= int(request.form.get('domain', -1)) < len(domains)
  105. session.update(step='POST-login', username=request.form.get('username'),
  106. credential=request.form.get('password'),
  107. domain=request.form.get('domain'))
  108. # print(session)
  109. resp = redirect(url_for('webtop'))
  110. resp.set_cookie('MRHSession', cookify(dict(session)))
  111. resp.set_cookie('F5_ST', '1z1z1z%dz%d' % (time.time(), 3600))
  112. return resp
  113. @app.route('/vdesk/webtop.eui')
  114. def webtop():
  115. session.update(step='POST-login-webtop')
  116. # print(session)
  117. return 'some junk HTML webtop'
  118. # Respond to 'GET /vdesk/vpn/index.php3?outform=xml&client_version=2.0' with an XML config
  119. # [Save VPN resource name in the session for verification of client state later]
  120. @app.route('/vdesk/vpn/index.php3')
  121. @require_MRHSession
  122. def profile_params():
  123. print(request.args)
  124. assert request.args.get('outform') == 'xml' and request.args.get('client_version') == '2.0'
  125. vpn_name = 'demo%d_vpn_resource' % random.randint(1, 100)
  126. session.update(step='GET-profile-params', resourcename='/Common/'+vpn_name)
  127. # print(session)
  128. return (f'''
  129. <?xml version="1.0" encoding="utf-8"?>
  130. <favorites type="VPN" limited="YES">
  131. <favorite id="/Common/{vpn_name}">
  132. <caption>{vpn_name}</caption>
  133. <name>/Common/{vpn_name}</name>
  134. <params>resourcename=/Common/{vpn_name}</params>
  135. </favorite>
  136. </favorites>''',
  137. {'content-type': 'application/xml'})
  138. # Respond to 'GET /vdesk/vpn/connect.php3?outform=xml&client_version=2.0&resourcename=RESOURCENAME'
  139. # with an ugliest-XML-you've-ever-seen config.
  140. # [Save random HDLC flag and ur_Z for verification later.]
  141. @app.route('/vdesk/vpn/connect.php3')
  142. @require_MRHSession
  143. @check_form_against_session('resourcename', use_query=True)
  144. def options():
  145. assert request.args.get('outform') == 'xml' and request.args.get('client_version') == '2.0'
  146. session.update(hdlc_framing=['no', 'yes'][random.randint(0, 1)],
  147. Z=session['resourcename'] + str(random.randint(1, 100)),
  148. ipv4='yes', ipv6=['no', 'yes'][random.randint(0, 1)],
  149. sess=request.cookies['MRHSession'] + str(random.randint(1, 100)))
  150. return (f'''
  151. <?xml version="1.0" encoding="UTF-8" ?><favorite>
  152. <object ID="ur_Host" CLASSID="CLSID:CC85ACDF-B277-486F-8C70-2C9B2ED2A4E7"
  153. CODEBASE="https://{app.config['HOST']}:{app.config['PORT']}/vdesk/terminal/urxshost.cab"
  154. WIDTH="320" HEIGHT="240">
  155. <ur_Z>{session['Z']}</ur_Z>
  156. <Session_ID>{session['sess']}</Session_ID>
  157. <ur_name>{session['resourcename']}</ur_name>
  158. <host0>{app.config['HOST']}</host0>
  159. <port0>{app.config['PORT']}</port0>
  160. <tunnel_host0>{app.config['HOST']}</tunnel_host0>
  161. <tunnel_port0>{app.config['PORT']}</tunnel_port0>
  162. <tunnel_protocol0>https</tunnel_protocol0>
  163. <idle_session_timeout>900</idle_session_timeout>
  164. <IPV4_0>{int(session['ipv4']=='yes')}</IPV4_0>
  165. <IPV6_0>{int(session['ipv6']=='yes')}</IPV6_0>
  166. <tunnel_dtls>{int(session['mock_dtls'] or 0)}</tunnel_dtls>
  167. <tunnel_port_dtls>{app.config['PORT']}</tunnel_port_dtls>
  168. <DNS0>1.1.1.1</DNS0>
  169. <DNS1>8.8.8.8</DNS1>
  170. <DNS6_0>2606:4700:4700::1111</DNS6_0>
  171. <DNS6_1>2001:4860:4860::8888</DNS6_1>
  172. <WINS0></WINS0>
  173. <DNSSuffix0>foo.com</DNSSuffix0>
  174. <SplitTunneling0>2</SplitTunneling0>
  175. <LAN0>10.11.10.10/32 10.11.1.0/24</LAN0>
  176. <LAN6_0>::/1 8000::/1</LAN6_0>
  177. <DNS_SPLIT0>*</DNS_SPLIT0>
  178. <hdlc_framing>{session['hdlc_framing']}</hdlc_framing>
  179. </object>
  180. </favorite>''',
  181. {'content-type': 'application/xml'})
  182. # Respond to faux-CONNECT 'GET /myvpn' with 504 Gateway Timeout
  183. # (what the real F5 server responds with when it doesn't like the parameters, intended
  184. # to trigger "cookie rejected" error in OpenConnect)
  185. @app.route('/myvpn')
  186. # Can't use because OpenConnect doesn't send cookies here (see f5.c for why)
  187. # @check_form_against_session('sess', 'hdlc_framing', 'ipv4', 'ipv6', 'Z', use_query=True)
  188. def tunnel():
  189. try:
  190. base64.urlsafe_b64decode(request.args.get('hostname') or None)
  191. except (ValueError, TypeError):
  192. raise AssertionError('Hostname is not a base64 string')
  193. return make_response('', 504, {'X-VPN-Client-IP': '10.11.1.2', 'X-VPN-Client-IPv6': '2601::f00f:1234'})
  194. # Respond to 'GET /vdesk/hangup.php3?hangup_error=1' by clearing session and MRHSession
  195. @app.route('/vdesk/hangup.php3')
  196. @require_MRHSession
  197. def logout():
  198. assert request.args.get('hangup_error') == '1'
  199. session.clear()
  200. resp = make_response('successful logout')
  201. resp.set_cookie('MRHSession', '')
  202. return resp
  203. app.run(host=app.config['HOST'], port=app.config['PORT'], debug=app.config['DEBUG'],
  204. ssl_context=context, use_debugger=False)