linker.php 7.7 KB


  1. <?php
  2. require __DIR__ . '/vendor/autoload.php';
  3. define('DOCUMENT_ROOT', dirname(__FILE__));
  4. gc_enable();
  5. use Movim\Bootstrap;
  6. use Movim\RPC;
  7. use Movim\Session;
  8. $bootstrap = new Bootstrap;
  9. $bootstrap->boot();
  10. $loop = React\EventLoop\Factory::create();
  11. $connector = new React\Socket\TcpConnector($loop);
  12. $stdin = new React\Stream\ReadableResourceStream(STDIN, $loop);
  13. // We load and register all the widgets
  14. $wrapper = \Movim\Widget\Wrapper::getInstance();
  15. $wrapper->registerAll($bootstrap->getWidgets());
  16. $conn = null;
  17. $parser = new \Moxl\Parser(function ($node) {
  18. \Moxl\Xec\Handler::handle($node);
  19. });
  20. $buffer = '';
  21. $timestamp = time();
  22. function handleSSLErrors($errno, $errstr) {
  23. fwrite(
  24. STDERR,
  25. colorize(getenv('sid'), 'yellow').
  26. " : ".colorize($errno, 'red').
  27. " ".
  28. colorize($errstr, 'red').
  29. "\n"
  30. );
  31. }
  32. // Temporary linker killer
  33. $loop->addPeriodicTimer(5, function() use(&$conn, &$timestamp) {
  34. if($timestamp < time() - 3600*4
  35. && isset($conn)) {
  36. $conn->close();
  37. }
  38. });
  39. $zmq = new \React\ZMQ\Context($loop, new \ZMQContext(2, false));
  40. $file = CACHE_PATH . 'movim_feeds_' . getenv('sid') . '.ipc';
  41. $pullSocket = $zmq->getSocket(ZMQ::SOCKET_PUSH);
  42. $pullSocket->getWrappedSocket()->setSockOpt(\ZMQ::SOCKOPT_LINGER, 0);
  43. $pullSocket->connect('ipc://' . $file . '_pull', true);
  44. $pushSocket = $zmq->getSocket(ZMQ::SOCKET_PULL);
  45. $pushSocket->getWrappedSocket()->setSockOpt(\ZMQ::SOCKOPT_LINGER, 0);
  46. $pushSocket->connect('ipc://'.$file . '_push', true);
  47. function writeOut($msg = null)
  48. {
  49. global $pullSocket;
  50. if(!empty($msg)) {
  51. $pullSocket->send(json_encode($msg));
  52. }
  53. }
  54. function writeXMPP($xml)
  55. {
  56. global $conn;
  57. if(!empty($xml) && $conn) {
  58. $conn->write(trim($xml));
  59. if(getenv('debug')) {
  60. fwrite(STDERR, colorize(trim($xml), 'yellow')." : ".colorize('sent to XMPP', 'green')."\n");
  61. }
  62. }
  63. }
  64. function shutdown()
  65. {
  66. global $pullSocket;
  67. global $pushSocket;
  68. global $loop;
  69. $pullSocket->close();
  70. $pushSocket->close();
  71. $loop->stop();
  72. }
  73. $pushSocketBehaviour = function ($msg) use (&$conn, $loop, &$buffer, &$connector, &$xmppBehaviour)
  74. {
  75. global $pullSocket;
  76. $msg = json_decode($msg);
  77. if(isset($msg)) {
  78. switch ($msg->func) {
  79. case 'message':
  80. (new RPC)->handleJSON($msg->body);
  81. break;
  82. case 'ping':
  83. // And we say that we are ready !
  84. $obj = new \StdClass;
  85. $obj->func = 'pong';
  86. $pullSocket->send(json_encode($obj));
  87. break;
  88. case 'down':
  89. if(isset($conn)
  90. && is_resource($conn->stream)) {
  91. $evt = new Movim\Widget\Event;
  92. $evt->run('session_down');
  93. }
  94. break;
  95. case 'up':
  96. if(isset($conn)
  97. && is_resource($conn->stream)) {
  98. $evt = new Movim\Widget\Event;
  99. $evt->run('session_up');
  100. }
  101. break;
  102. case 'unregister':
  103. \Moxl\Stanza\Stream::end();
  104. if(isset($conn)) $conn->close();
  105. shutdown();
  106. break;
  107. case 'register':
  108. $cd = new \Modl\ConfigDAO;
  109. $config = $cd->get();
  110. $port = 5222;
  111. $dns = \Moxl\Utils::resolveHost($msg->host);
  112. if(isset($dns->target) && $dns->target != null) $msg->host = $dns->target;
  113. if(isset($dns->port) && $dns->port != null) $port = $dns->port;
  114. $ip = \Moxl\Utils::resolveIp($msg->host);
  115. $ip = (!$ip || !isset($ip->address)) ? gethostbyname($msg->host) : $ip->address;
  116. if(getenv('verbose')) {
  117. fwrite(
  118. STDERR,
  119. colorize(
  120. getenv('sid'), 'yellow')." : ".
  121. colorize('Connection to '.$msg->host.' ('.$ip.')', 'blue').
  122. "\n");
  123. }
  124. $connector->connect('['.$ip.']:'.$port)->then($xmppBehaviour);
  125. break;
  126. }
  127. } else {
  128. return;
  129. }
  130. };
  131. $xmppBehaviour = function (React\Socket\Connection $stream) use (&$conn, $loop, &$stdin, $pushSocketBehaviour, $parser, &$timestamp)
  132. {
  133. global $pullSocket;
  134. $conn = $stream;
  135. if(getenv('verbose')) {
  136. fwrite(STDERR, colorize(getenv('sid'), 'yellow')." : ".colorize('linker launched', 'blue')."\n");
  137. fwrite(STDERR, colorize(getenv('sid'), 'yellow')." launched : ".\sizeToCleanSize(memory_get_usage())."\n");
  138. }
  139. // We define a huge buffer to prevent issues with SSL streams, see https://bugs.php.net/bug.php?id=65137
  140. $conn->on('data', function($message) use (&$conn, $loop, $parser, &$timestamp) {
  141. if(!empty($message)) {
  142. $restart = false;
  143. if(getenv('debug')) {
  144. fwrite(STDERR, colorize($message, 'yellow')." : ".colorize('received', 'green')."\n");
  145. }
  146. if($message == '</stream:stream>') {
  147. $conn->close();
  148. shutdown();
  149. } elseif($message == "<proceed xmlns='urn:ietf:params:xml:ns:xmpp-tls'/>"
  150. || $message == '<proceed xmlns="urn:ietf:params:xml:ns:xmpp-tls"/>') {
  151. $session = Session::start();
  152. stream_set_blocking($conn->stream, 1);
  153. stream_context_set_option($conn->stream, 'ssl', 'SNI_enabled', false);
  154. stream_context_set_option($conn->stream, 'ssl', 'peer_name', $session->get('host'));
  155. stream_context_set_option($conn->stream, 'ssl', 'allow_self_signed', true);
  156. // See http://php.net/manual/en/function.stream-socket-enable-crypto.php#119122
  157. $crypto_method = STREAM_CRYPTO_METHOD_TLS_CLIENT;
  158. if (defined('STREAM_CRYPTO_METHOD_TLSv1_2_CLIENT')) {
  159. $crypto_method |= STREAM_CRYPTO_METHOD_TLSv1_2_CLIENT;
  160. $crypto_method |= STREAM_CRYPTO_METHOD_TLSv1_1_CLIENT;
  161. }
  162. set_error_handler('handleSSLErrors');
  163. $out = stream_socket_enable_crypto($conn->stream, 1, $crypto_method);
  164. restore_error_handler();
  165. if($out !== true) {
  166. $evt = new Movim\Widget\Event;
  167. $evt->run('ssl_error');
  168. shutdown();
  169. return;
  170. }
  171. if(getenv('verbose')) {
  172. fwrite(STDERR, colorize(getenv('sid'), 'yellow')." : ".colorize('TLS enabled', 'blue')."\n");
  173. }
  174. $restart = true;
  175. }
  176. $timestamp = time();
  177. if($restart) {
  178. $session = Session::start();
  179. \Moxl\Stanza\Stream::init($session->get('host'));
  180. stream_set_blocking($conn->stream, 0);
  181. $restart = false;
  182. }
  183. if(!$parser->parse($message)) {
  184. fwrite(STDERR, colorize(getenv('sid'), 'yellow')." ".$parser->getError()."\n");
  185. }
  186. }
  187. });
  188. $conn->on('error', function() use ($conn, $loop) {
  189. shutdown();
  190. });
  191. $conn->on('close', function() use ($conn, $loop) {
  192. shutdown();
  193. });
  194. // And we say that we are ready !
  195. $obj = new \StdClass;
  196. $obj->func = 'registered';
  197. fwrite(STDERR, 'registered');
  198. $pullSocket->send(json_encode($obj));
  199. };
  200. $pushSocket->on('message', $pushSocketBehaviour);
  201. $stdin->on('error', function() use($loop) { shutdown(); } );
  202. $stdin->on('close', function() use($loop) { shutdown(); } );
  203. $loop->run();