twitterdaemon.php 9.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315
  1. #!/usr/bin/env php
  2. <?php
  3. // This file is part of GNU social - https://www.gnu.org/software/social
  4. //
  5. // GNU social is free software: you can redistribute it and/or modify
  6. // it under the terms of the GNU Affero General Public License as published by
  7. // the Free Software Foundation, either version 3 of the License, or
  8. // (at your option) any later version.
  9. //
  10. // GNU social is distributed in the hope that it will be useful,
  11. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. // GNU Affero General Public License for more details.
  14. //
  15. // You should have received a copy of the GNU Affero General Public License
  16. // along with GNU social. If not, see <http://www.gnu.org/licenses/>.
  17. /**
  18. * @copyright 2008-2010 StatusNet, Inc
  19. * @license https://www.gnu.org/licenses/agpl.html GNU AGPL v3 or later
  20. */
// Installation root: three directory levels above the directory holding this script.
define('INSTALLDIR', realpath(dirname(__FILE__) . '/../../..'));
// The 'public' subdirectory of the installation root.
define('PUBLICDIR', INSTALLDIR . DIRECTORY_SEPARATOR . 'public');

// Option specifications and usage text. They are assigned before
// commandline.inc is included below; presumably that include reads them
// to parse the command line (the include is not shown here -- confirm).
$shortoptions = 'fi::a';
$longoptions = array('id::', 'foreground', 'all');
$helptext = <<<END_OF_TWITTERDAEMON_HELP
Daemon script for receiving new notices from Twitter users.
-i --id Identity (default none)
-a --all Handle Twitter for all local sites
(requires Stomp queue handler, status_network setup)
-f --foreground Stay in the foreground (default background)
END_OF_TWITTERDAEMON_HELP;

// Must come after the definitions above: the path is built from INSTALLDIR.
require_once INSTALLDIR.'/scripts/commandline.inc';
  33. class TwitterDaemon extends SpawningDaemon
  34. {
  35. protected $allsites = false;
  36. public function __construct($id = null, $daemonize = true, $threads = 1, $allsites = false)
  37. {
  38. if ($threads != 1) {
  39. // This should never happen. :)
  40. throw new Exception("TwitterDaemon must run single-threaded");
  41. }
  42. parent::__construct($id, $daemonize, $threads);
  43. $this->allsites = $allsites;
  44. }
  45. public function runThread()
  46. {
  47. common_log(LOG_INFO, 'Waiting to listen to Twitter and queues');
  48. $master = new TwitterMaster($this->get_id(), $this->processManager());
  49. $master->init($this->allsites);
  50. $master->service();
  51. common_log(LOG_INFO, 'terminating normally');
  52. return $master->respawn ? self::EXIT_RESTART : self::EXIT_SHUTDOWN;
  53. }
  54. }
  55. class TwitterMaster extends IoMaster
  56. {
  57. protected $processManager;
  58. public function __construct($id, $processManager)
  59. {
  60. parent::__construct($id);
  61. $this->processManager = $processManager;
  62. }
  63. /**
  64. * Initialize IoManagers for the currently configured site
  65. * which are appropriate to this instance.
  66. */
  67. public function initManagers()
  68. {
  69. $qm = QueueManager::get();
  70. $qm->setActiveGroup('twitter');
  71. $this->instantiate($qm);
  72. $this->instantiate(new TwitterManager());
  73. $this->instantiate($this->processManager);
  74. }
  75. }
  76. class TwitterManager extends IoManager
  77. {
  78. // Recommended resource limits from http://dev.twitter.com/pages/site_streams
  79. const MAX_STREAMS = 1000;
  80. const USERS_PER_STREAM = 100;
  81. const STREAMS_PER_SECOND = 20;
  82. protected $streams;
  83. protected $users;
  84. /**
  85. * Pull the site's active Twitter-importing users and start spawning
  86. * some data streams for them!
  87. *
  88. * @fixme check their last-id and check whether we'll need to do a manual pull.
  89. * @fixme abstract out the fetching so we can work over multiple sites.
  90. */
  91. protected function initStreams()
  92. {
  93. common_log(LOG_INFO, 'init...');
  94. // Pull Twitter user IDs for all users we want to pull data for
  95. $flink = new Foreign_link();
  96. $flink->service = TWITTER_SERVICE;
  97. // @fixme probably should do the bitfield check in a whereAdd but it's ugly :D
  98. $flink->find();
  99. $userIds = array();
  100. while ($flink->fetch()) {
  101. if (($flink->noticesync & FOREIGN_NOTICE_RECV) ==
  102. FOREIGN_NOTICE_RECV) {
  103. $userIds[] = $flink->foreign_id;
  104. if (count($userIds) >= self::USERS_PER_STREAM) {
  105. $this->spawnStream($userIds);
  106. $userIds = array();
  107. }
  108. }
  109. }
  110. if (count($userIds)) {
  111. $this->spawnStream($userIds);
  112. }
  113. }
  114. /**
  115. * Prepare a Site Stream connection for the given chunk of users.
  116. * The actual connection will be opened later.
  117. *
  118. * @param $userIds array of Twitter-side user IDs
  119. */
  120. protected function spawnStream($userIds)
  121. {
  122. $stream = $this->initSiteStream();
  123. $stream->followUsers($userIds);
  124. // Slip the stream reader into our list of active streams.
  125. // We'll manage its actual connection on the next go-around.
  126. $this->streams[] = $stream;
  127. // Record the user->stream mappings; this makes it easier for us to know
  128. // later if we need to kill something.
  129. foreach ($userIds as $id) {
  130. $this->users[$id] = $stream;
  131. }
  132. }
  133. /**
  134. * Initialize a generic site streams connection object.
  135. * All our connections will look like this, then we'll add users to them.
  136. *
  137. * @return TwitterStreamReader
  138. */
  139. protected function initSiteStream()
  140. {
  141. $auth = $this->siteStreamAuth();
  142. $stream = new TwitterSiteStream($auth);
  143. // Add our event handler callbacks. Whee!
  144. $this->setupEvents($stream);
  145. return $stream;
  146. }
  147. /**
  148. * Fetch the Twitter OAuth credentials to use to connect to the Site Streams API.
  149. *
  150. * This will use the locally-stored credentials for the applictation's owner account
  151. * from the site configuration. These should be configured through the administration
  152. * panels or manually in the config file.
  153. *
  154. * Will throw an exception if no credentials can be found -- but beware that invalid
  155. * credentials won't cause breakage until later.
  156. *
  157. * @return TwitterOAuthClient
  158. */
  159. protected function siteStreamAuth()
  160. {
  161. $token = common_config('twitter', 'stream_token');
  162. $secret = common_config('twitter', 'stream_secret');
  163. if (empty($token) || empty($secret)) {
  164. throw new ServerException('Twitter site streams have not been correctly configured. Configure the app owner account via the admin panel.');
  165. }
  166. return new TwitterOAuthClient($token, $secret);
  167. }
  168. /**
  169. * Collect the sockets for all active connections for i/o monitoring.
  170. *
  171. * @return array of resources
  172. */
  173. public function getSockets()
  174. {
  175. $sockets = array();
  176. foreach ($this->streams as $stream) {
  177. foreach ($stream->getSockets() as $socket) {
  178. $sockets[] = $socket;
  179. }
  180. }
  181. return $sockets;
  182. }
  183. /**
  184. * We're ready to process input from one of our data sources! Woooooo!
  185. * @fixme is there an easier way to map from socket back to owning plugin? :(
  186. *
  187. * @param resource $socket
  188. * @return boolean success
  189. */
  190. public function handleInput($socket)
  191. {
  192. foreach ($this->streams as $stream) {
  193. foreach ($stream->getSockets() as $aSocket) {
  194. if ($socket === $aSocket) {
  195. $stream->handleInput($socket);
  196. }
  197. }
  198. }
  199. return true;
  200. }
  201. /**
  202. * Start the i/o system up! Prepare our connections and start opening them.
  203. *
  204. * @fixme do some rate-limiting on the stream setup
  205. * @fixme do some sensible backoff on failure etc
  206. */
  207. public function start()
  208. {
  209. $this->initStreams();
  210. foreach ($this->streams as $stream) {
  211. $stream->connect();
  212. }
  213. return true;
  214. }
  215. /**
  216. * Close down our connections when the daemon wraps up for business.
  217. */
  218. public function finish()
  219. {
  220. foreach ($this->streams as $index => $stream) {
  221. $stream->close();
  222. unset($this->streams[$index]);
  223. }
  224. return true;
  225. }
  226. public static function get()
  227. {
  228. throw new Exception('not a singleton');
  229. }
  230. /**
  231. * Set up event handlers on the streaming interface.
  232. *
  233. * @fixme add more event types as we add handling for them
  234. */
  235. protected function setupEvents(TwitterStreamReader $stream)
  236. {
  237. $handlers = array(
  238. 'status',
  239. );
  240. foreach ($handlers as $event) {
  241. $stream->hookEvent($event, array($this, 'onTwitter' . ucfirst($event)));
  242. }
  243. }
  244. /**
  245. * Event callback notifying that a user has a new message in their home timeline.
  246. * We store the incoming message into the queues for processing, keeping our own
  247. * daemon running as shiny-fast as possible.
  248. *
  249. * @param object $status JSON data: Twitter status update
  250. * @fixme in all-sites mode we may need to route queue items into another site's
  251. * destination queues, or multiple sites.
  252. */
  253. protected function onTwitterStatus($status, $context)
  254. {
  255. $data = array(
  256. 'status' => $status,
  257. 'for_user' => $context->for_user,
  258. );
  259. $qm = QueueManager::get();
  260. $qm->enqueue($data, 'tweetin');
  261. }
  262. }
  263. if (have_option('i', 'id')) {
  264. $id = get_option_value('i', 'id');
  265. } elseif (count($args) > 0) {
  266. $id = $args[0];
  267. } else {
  268. $id = null;
  269. }
  270. $foreground = have_option('f', 'foreground');
  271. $all = have_option('a') || have_option('--all');
  272. $daemon = new TwitterDaemon($id, !$foreground, 1, $all);
  273. $daemon->runOnce();