twitterdaemon.php 9.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313
  1. #!/usr/bin/env php
  2. <?php
  3. /*
  4. * StatusNet - the distributed open-source microblogging tool
  5. * Copyright (C) 2008-2010, StatusNet, Inc.
  6. *
  7. * This program is free software: you can redistribute it and/or modify
  8. * it under the terms of the GNU Affero General Public License as published by
  9. * the Free Software Foundation, either version 3 of the License, or
  10. * (at your option) any later version.
  11. *
  12. * This program is distributed in the hope that it will be useful,
  13. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  14. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  15. * GNU Affero General Public License for more details.
  16. *
  17. * You should have received a copy of the GNU Affero General Public License
  18. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  19. */
  // Root of the installation: three directory levels above this script.
  define('INSTALLDIR', realpath(dirname(__FILE__) . '/../../..'));

  // Command-line option specifications and usage text. These globals are set
  // before commandline.inc is loaded (presumably it reads them -- confirm
  // against scripts/commandline.inc).
  $shortoptions = 'fi::a';
  $longoptions = array('id::', 'foreground', 'all');

  $helptext = <<<END_OF_TWITTERDAEMON_HELP
Daemon script for receiving new notices from Twitter users.
-i --id Identity (default none)
-a --all Handle Twitter for all local sites
(requires Stomp queue handler, status_network setup)
-f --foreground Stay in the foreground (default background)
END_OF_TWITTERDAEMON_HELP;

  require_once INSTALLDIR.'/scripts/commandline.inc';
  /**
   * Daemon entry point: runs one TwitterMaster I/O loop in a single thread.
   */
  class TwitterDaemon extends SpawningDaemon
  {
      /** Value handed to TwitterMaster::init() when the thread starts. */
      protected $allsites = false;

      /**
       * @param mixed   $id        daemon identity, passed through to the parent
       * @param boolean $daemonize passed through to the parent constructor
       * @param int     $threads   must be 1; anything else is rejected
       * @param boolean $allsites  forwarded to the master's init() call
       *
       * @throws Exception if $threads is not 1
       */
      public function __construct($id=null, $daemonize=true, $threads=1, $allsites=false)
      {
          // This should never happen. :)
          if ($threads != 1) {
              throw new Exception("TwitterDaemon must run single-threaded");
          }

          parent::__construct($id, $daemonize, $threads);

          $this->allsites = $allsites;
      }

      /**
       * Build the I/O master, run its service loop until it returns, then
       * report whether the caller should restart or shut down.
       *
       * @return mixed self::EXIT_RESTART when the master's respawn flag is
       *               set, self::EXIT_SHUTDOWN otherwise
       */
      public function runThread()
      {
          common_log(LOG_INFO, 'Waiting to listen to Twitter and queues');

          $ioMaster = new TwitterMaster($this->get_id(), $this->processManager());
          $ioMaster->init($this->allsites);
          $ioMaster->service();

          common_log(LOG_INFO, 'terminating normally');

          if ($ioMaster->respawn) {
              return self::EXIT_RESTART;
          }

          return self::EXIT_SHUTDOWN;
      }
  }
  /**
   * I/O master that registers the queue manager, a TwitterManager and the
   * process manager supplied by the daemon.
   */
  class TwitterMaster extends IoMaster
  {
      /** Process manager supplied by the owning daemon. */
      protected $processManager;

      /**
       * @param mixed $id             identity passed through to IoMaster
       * @param mixed $processManager manager object to register alongside
       *                              the queue and Twitter managers
       */
      public function __construct($id, $processManager)
      {
          parent::__construct($id);

          $this->processManager = $processManager;
      }

      /**
       * Initialize IoManagers for the currently configured site
       * which are appropriate to this instance.
       *
       * Registration order: queue manager (switched to the 'twitter' group),
       * then a fresh TwitterManager, then the process manager.
       */
      public function initManagers()
      {
          $queueManager = QueueManager::get();
          $queueManager->setActiveGroup('twitter');
          $this->instantiate($queueManager);

          $this->instantiate(new TwitterManager());

          $this->instantiate($this->processManager);
      }
  }
  /**
   * I/O manager that owns the Twitter Site Streams connections: it groups the
   * Twitter-importing users into streams, exposes the streams' sockets for
   * monitoring, and pushes incoming statuses onto the 'tweetin' queue.
   */
  class TwitterManager extends IoManager
  {
      // Recommended resource limits from http://dev.twitter.com/pages/site_streams
      const MAX_STREAMS = 1000;
      const USERS_PER_STREAM = 100;
      const STREAMS_PER_SECOND = 20;

      /**
       * Active stream objects, in creation order.
       *
       * Starts as an empty array so that every foreach over it is valid even
       * when no user has Twitter import enabled.
       */
      protected $streams = array();

      /** Map of Twitter-side user ID => the stream object following that user. */
      protected $users = array();

      /**
       * Pull the site's active Twitter-importing users and start spawning
       * some data streams for them!
       *
       * Users are grouped into chunks of at most USERS_PER_STREAM; each chunk
       * gets its own stream.
       *
       * @fixme check their last-id and check whether we'll need to do a manual pull.
       * @fixme abstract out the fetching so we can work over multiple sites.
       */
      protected function initStreams()
      {
          common_log(LOG_INFO, 'init...');

          // Pull Twitter user IDs for all users we want to pull data for
          $flink = new Foreign_link();
          $flink->service = TWITTER_SERVICE;
          // @fixme probably should do the bitfield check in a whereAdd but it's ugly :D
          $flink->find();

          $userIds = array();
          while ($flink->fetch()) {
              // Only links with the "receive notices" bit set are imported.
              if (($flink->noticesync & FOREIGN_NOTICE_RECV) ==
                  FOREIGN_NOTICE_RECV) {
                  $userIds[] = $flink->foreign_id;

                  // Chunk is full: hand it to a stream and start a new one.
                  if (count($userIds) >= self::USERS_PER_STREAM) {
                      $this->spawnStream($userIds);
                      $userIds = array();
                  }
              }
          }

          // Whatever is left over forms the final, partially filled stream.
          if (count($userIds)) {
              $this->spawnStream($userIds);
          }
      }

      /**
       * Prepare a Site Stream connection for the given chunk of users.
       * The actual connection will be opened later.
       *
       * @param $userIds array of Twitter-side user IDs
       */
      protected function spawnStream($userIds)
      {
          $stream = $this->initSiteStream();
          $stream->followUsers($userIds);

          // Slip the stream reader into our list of active streams.
          // We'll manage its actual connection on the next go-around.
          $this->streams[] = $stream;

          // Record the user->stream mappings; this makes it easier for us to know
          // later if we need to kill something.
          foreach ($userIds as $id) {
              $this->users[$id] = $stream;
          }
      }

      /**
       * Initialize a generic site streams connection object.
       * All our connections will look like this, then we'll add users to them.
       *
       * @return TwitterStreamReader
       */
      protected function initSiteStream()
      {
          $auth = $this->siteStreamAuth();
          $stream = new TwitterSiteStream($auth);

          // Add our event handler callbacks. Whee!
          $this->setupEvents($stream);

          return $stream;
      }

      /**
       * Fetch the Twitter OAuth credentials to use to connect to the Site Streams API.
       *
       * This will use the locally-stored credentials for the application's owner account
       * from the site configuration. These should be configured through the administration
       * panels or manually in the config file.
       *
       * Will throw an exception if no credentials can be found -- but beware that invalid
       * credentials won't cause breakage until later.
       *
       * @return TwitterOAuthClient
       *
       * @throws ServerException if the stream token or secret is not configured
       */
      protected function siteStreamAuth()
      {
          $token = common_config('twitter', 'stream_token');
          $secret = common_config('twitter', 'stream_secret');

          if (empty($token) || empty($secret)) {
              throw new ServerException('Twitter site streams have not been correctly configured. Configure the app owner account via the admin panel.');
          }

          return new TwitterOAuthClient($token, $secret);
      }

      /**
       * Collect the sockets for all active connections for i/o monitoring.
       *
       * @return array of resources
       */
      public function getSockets()
      {
          $sockets = array();
          foreach ($this->streams as $stream) {
              foreach ($stream->getSockets() as $socket) {
                  $sockets[] = $socket;
              }
          }
          return $sockets;
      }

      /**
       * We're ready to process input from one of our data sources! Woooooo!
       * @fixme is there an easier way to map from socket back to owning module? :(
       *
       * @param resource $socket
       * @return boolean success
       */
      public function handleInput($socket)
      {
          foreach ($this->streams as $stream) {
              foreach ($stream->getSockets() as $aSocket) {
                  if ($socket === $aSocket) {
                      $stream->handleInput($socket);
                      // A socket has a single owning stream, so there is no
                      // point scanning the remaining ones.
                      return true;
                  }
              }
          }
          return true;
      }

      /**
       * Start the i/o system up! Prepare our connections and start opening them.
       *
       * @fixme do some rate-limiting on the stream setup
       * @fixme do some sensible backoff on failure etc
       *
       * @return boolean always true
       */
      public function start()
      {
          $this->initStreams();
          foreach ($this->streams as $stream) {
              $stream->connect();
          }
          return true;
      }

      /**
       * Close down our connections when the daemon wraps up for business.
       *
       * @return boolean always true
       */
      public function finish()
      {
          foreach ($this->streams as $index => $stream) {
              $stream->close();
              unset($this->streams[$index]);
          }

          // Drop the user->stream map too, so no closed stream stays referenced.
          $this->users = array();

          return true;
      }

      /**
       * This manager is created directly with "new"; it is not a singleton.
       *
       * @throws Exception always
       */
      public static function get()
      {
          throw new Exception('not a singleton');
      }

      /**
       * Set up event handlers on the streaming interface.
       *
       * Each event name "foo" is hooked to this object's onTwitterFoo() method.
       *
       * @fixme add more event types as we add handling for them
       */
      protected function setupEvents(TwitterStreamReader $stream)
      {
          $handlers = array(
              'status',
          );
          foreach ($handlers as $event) {
              $stream->hookEvent($event, array($this, 'onTwitter' . ucfirst($event)));
          }
      }

      /**
       * Event callback notifying that a user has a new message in their home timeline.
       * We store the incoming message into the queues for processing, keeping our own
       * daemon running as shiny-fast as possible.
       *
       * Public because it is registered through hookEvent() as an
       * array($this, ...) callback and is invoked from the stream reader;
       * a non-public method is not callable from outside this class.
       *
       * @param object $status JSON data: Twitter status update
       * @param object $context object whose for_user property is stored with the status
       * @fixme in all-sites mode we may need to route queue items into another site's
       * destination queues, or multiple sites.
       */
      public function onTwitterStatus($status, $context)
      {
          $data = array(
              'status' => $status,
              'for_user' => $context->for_user,
          );
          $qm = QueueManager::get();
          $qm->enqueue($data, 'tweetin');
      }
  }
  // Resolve the daemon identity: an explicit -i/--id option wins, then the
  // first positional argument, otherwise no identity at all.
  if (have_option('i', 'id')) {
      $id = get_option_value('i', 'id');
  } else if (count($args) > 0) {
      $id = $args[0];
  } else {
      $id = null;
  }

  $foreground = have_option('f', 'foreground');

  // Same short/long lookup form as the -i and -f checks above.
  $all = have_option('a', 'all');

  // TwitterDaemon rejects any thread count other than 1, and its second
  // argument is "daemonize", hence the negated foreground flag.
  $daemon = new TwitterDaemon($id, !$foreground, 1, $all);
  $daemon->runOnce();