streamtest.php 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236
  1. <?php
  2. /**
  3. * StatusNet, the distributed open-source microblogging tool
  4. *
  5. * PHP version 5
  6. *
  7. * LICENCE: This program is free software: you can redistribute it and/or modify
  8. * it under the terms of the GNU Affero General Public License as published by
  9. * the Free Software Foundation, either version 3 of the License, or
  10. * (at your option) any later version.
  11. *
  12. * This program is distributed in the hope that it will be useful,
  13. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  14. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  15. * GNU Affero General Public License for more details.
  16. *
  17. * You should have received a copy of the GNU Affero General Public License
  18. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  19. *
  20. * @category Plugin
  21. * @package StatusNet
  22. * @author Brion Vibber <brion@status.net>
  23. * @copyright 2010 StatusNet, Inc.
  24. * @license http://www.fsf.org/licensing/licenses/agpl-3.0.html GNU Affero General Public License version 3.0
  25. * @link http://status.net/
  26. */
  // Installation root: three directory levels above the directory of this script.
  define('INSTALLDIR', realpath(__DIR__ . '/../../..'));

  // Option specs and usage text are set before commandline.inc is loaded
  // (presumably that include reads them -- confirm against commandline.inc).
  $shortoptions = 'n:';
  $longoptions = array('nick=', 'import', 'all', 'apiroot=');

  $helptext = <<<ENDOFHELP
USAGE: streamtest.php -n <username>
-n --nick=<username> Local user whose Twitter timeline to watch
--import Experimental: run incoming messages through import
--all Experimental: run multiuser; requires nick be the app owner
--apiroot=<url> Provide alternate streaming API root URL
Attempts a User Stream connection to Twitter as the given user, dumping
data as it comes.
ENDOFHELP;

  require_once INSTALLDIR . '/scripts/commandline.inc';

  // Stream reader classes live in the lib/ directory next to this script's parent.
  require_once dirname(__DIR__) . '/lib/jsonstreamreader.php';
  require_once dirname(__DIR__) . '/lib/twitterstreamreader.php';
  // Resolve the target nickname from -n or --nick; when neither is given,
  // print the usage text and stop.
  if (have_option('n')) {
      $nickname = get_option_value('n');
  } else if (have_option('nick')) {
      // Read the value under the same long-option name that was just tested
      // and that $longoptions declares ('nick='). The previous lookup used
      // 'nickname', which is not a declared option.
      $nickname = get_option_value('nick');
  } else {
      show_help($helptext);
      exit(0);
  }
  /**
   * Build a Twitter OAuth client from a local user's stored Twitter link.
   *
   * Looks up the user's Foreign_link for TWITTER_SERVICE, unpacks the token
   * from its credentials and constructs a client from the token key and secret.
   *
   * @param User $user local user whose Twitter link is looked up by id
   *
   * @return TwitterOAuthClient
   *
   * @throws ServerException when no link is found for the user, or when the
   *                         stored credentials do not unpack to a token
   */
  function twitterAuthForUser(User $user)
  {
      $flink = Foreign_link::getByUserID($user->id, TWITTER_SERVICE);

      // Guard before dereferencing: without a link there are no credentials
      // to unpack, so report it the same way as a missing token.
      if (empty($flink)) {
          throw new ServerException("No Twitter OAuth credentials for this user.");
      }

      $token = TwitterOAuthClient::unpackToken($flink->credentials);
      if (!$token) {
          throw new ServerException("No Twitter OAuth credentials for this user.");
      }

      return new TwitterOAuthClient($token->key, $token->secret);
  }
  /**
   * Create a TwitterUserStream authenticated as the given user.
   *
   * @param User $user local user whose Twitter credentials are used
   *
   * @return TwitterUserStream
   */
  function homeStreamForUser(User $user)
  {
      return new TwitterUserStream(twitterAuthForUser($user));
  }
  /**
   * Create a TwitterSiteStream authenticated as the given user and tell it to
   * follow every Twitter link that has the receive flag set.
   *
   * When the 'apiroot' option is present its value is passed to the stream
   * constructor as a second argument.
   *
   * @param User $user user to authenticate as
   *
   * @return TwitterSiteStream
   */
  function siteStreamForOwner(User $user)
  {
      // The authenticating user must be the owner of the application.
      $ownerAuth = twitterAuthForUser($user);

      $siteStream = have_option('apiroot')
          ? new TwitterSiteStream($ownerAuth, get_option_value('apiroot'))
          : new TwitterSiteStream($ownerAuth);

      // Gather the foreign IDs of all Twitter links we want data for.
      $followIds = array();
      $link = new Foreign_link();
      $link->service = TWITTER_SERVICE;
      $link->find();
      while ($link->fetch()) {
          // Skip links that do not have the FOREIGN_NOTICE_RECV bit(s) set.
          if (($link->noticesync & FOREIGN_NOTICE_RECV) != FOREIGN_NOTICE_RECV) {
              continue;
          }
          $followIds[] = $link->foreign_id;
      }

      $siteStream->followUsers($followIds);

      return $siteStream;
  }
  // Look up the local account named on the command line.
  $user = User::getKV('nickname', $nickname);
  if (!$user instanceof User) {
      // The stream factories below type-hint User; stop with a readable
      // message instead of failing on that type hint for an unknown nickname.
      printf("No such user: %s\n", $nickname);
      exit(1);
  }

  // Keep the selected user reachable through the $myuser global.
  global $myuser;
  $myuser = $user;

  // --all selects the multi-user site stream; otherwise use the user's own stream.
  if (have_option('all')) {
      $stream = siteStreamForOwner($user);
  } else {
      $stream = homeStreamForUser($user);
  }
  // Log every raw event as JSON together with its JSON-encoded context.
  $stream->hookEvent('raw', function($data, $context) {
      $payload = json_encode($data);
      $origin = json_encode($context);
      common_log(LOG_INFO, $payload . ' for ' . $origin);
  });

  // Print the friends list as a comma-separated line.
  $stream->hookEvent('friends', function($data, $context) {
      $friendList = implode(', ', $data->friends);
      printf("Friend list: %s\n", $friendList);
  });

  // Print who favorited whose notice, with the notice text.
  $stream->hookEvent('favorite', function($data, $context) {
      $actor = $data->source->screen_name;
      $owner = $data->target->screen_name;
      $text = $data->target_object->text;
      printf("%s favorited %s's notice: %s\n", $actor, $owner, $text);
  });

  // Print who unfavorited whose notice, with the notice text.
  $stream->hookEvent('unfavorite', function($data, $context) {
      $actor = $data->source->screen_name;
      $owner = $data->target->screen_name;
      $text = $data->target_object->text;
      printf("%s unfavorited %s's notice: %s\n", $actor, $owner, $text);
  });

  // Print source and target screen names of a follow event.
  $stream->hookEvent('follow', function($data, $context) {
      $actor = $data->source->screen_name;
      $subject = $data->target->screen_name;
      printf("%s friended %s\n", $actor, $subject);
  });

  // Print source and target screen names of an unfollow event.
  $stream->hookEvent('unfollow', function($data, $context) {
      $actor = $data->source->screen_name;
      $subject = $data->target->screen_name;
      printf("%s unfriended %s\n", $actor, $subject);
  });

  // Print the ID of a deleted status.
  $stream->hookEvent('delete', function($data, $context) {
      $statusId = $data->status->id;
      printf("Deleted status notification: %s\n", $statusId);
  });

  // Print the user ID and upper status ID of a geo-scrub request.
  $stream->hookEvent('scrub_geo', function($data, $context) {
      $userId = $data->user_id;
      $lastStatusId = $data->up_to_status_id;
      printf("Req to scrub geo data for user id %s up to status ID %s\n",
             $userId,
             $lastStatusId);
  });
  // Print each incoming status; with --import, also pass it to
  // TwitterImport::importStatus() and report the outcome.
  $stream->hookEvent('status', function($data, $context) {
      printf("Received status update from %s: %s\n",
             $data->user->screen_name,
             $data->text);

      if (have_option('import')) {
          $importer = new TwitterImport();
          printf("\timporting...");
          $notice = $importer->importStatus($data);

          // Terminate the "importing..." line in both outcomes. Only the
          // failure case used to print a newline, so after a successful
          // import the next message ran onto the same line.
          if ($notice instanceof Notice) {
              printf(" OK\n");
          } else {
              printf(" FAIL\n");
          }
      }
  });
  // Print sender, recipient and text of a direct message.
  $stream->hookEvent('direct_message', function($data) {
      $from = $data->sender->screen_name;
      $to = $data->recipient->screen_name;
      $body = $data->text;
      printf("Direct message from %s to %s: %s\n", $from, $to, $body);
  });
  /**
   * IoManager that forwards every call to a single TwitterStreamReader.
   */
  class TwitterManager extends IoManager
  {
      /**
       * Stream this manager delegates to.
       *
       * Declared explicitly (public, matching how the previously dynamic
       * property behaved) so that assigning it in the constructor does not
       * create a dynamic property.
       *
       * @var TwitterStreamReader
       */
      public $stream;

      /**
       * @param TwitterStreamReader $stream stream to wrap
       */
      public function __construct(TwitterStreamReader $stream)
      {
          $this->stream = $stream;
      }

      /**
       * @return mixed whatever the wrapped stream's getSockets() returns
       */
      public function getSockets()
      {
          return $this->stream->getSockets();
      }

      /**
       * Pass input through to the wrapped stream.
       *
       * @param mixed $data forwarded unchanged to the stream's handleInput()
       *
       * @return boolean always true
       */
      public function handleInput($data)
      {
          $this->stream->handleInput($data);
          return true;
      }

      /**
       * Connect the wrapped stream.
       *
       * @return boolean always true
       */
      public function start()
      {
          $this->stream->connect();
          return true;
      }

      /**
       * Close the wrapped stream.
       *
       * @return boolean always true
       */
      public function finish()
      {
          $this->stream->close();
          return true;
      }

      /**
       * This manager is built around a specific stream instance, so there is
       * no shared instance to hand out.
       *
       * @throws Exception always
       */
      public static function get()
      {
          throw new Exception('not a singleton');
      }
  }
  /**
   * IoMaster that runs one pre-built IoManager instance.
   */
  class TwitterStreamMaster extends IoMaster
  {
      /**
       * Manager handed to instantiate() in initManagers().
       *
       * Declared explicitly (public, matching how the previously dynamic
       * property behaved) so that assigning it in the constructor does not
       * create a dynamic property.
       *
       * @var mixed value supplied by the caller; this script passes a TwitterManager
       */
      public $ioManager;

      /**
       * @param mixed $id        identifier passed on to the parent constructor
       * @param mixed $ioManager manager to register when managers are initialized
       */
      public function __construct($id, $ioManager)
      {
          parent::__construct($id);
          $this->ioManager = $ioManager;
      }

      /**
       * Initialize IoManagers which are appropriate to this instance.
       *
       * Registers only the manager supplied to the constructor.
       */
      public function initManagers()
      {
          $this->instantiate($this->ioManager);
      }
  }
  // Wrap the selected stream in a TwitterManager, give it to the master,
  // then call the master's init() and service().
  $master = new TwitterStreamMaster(
      'TwitterStream',
      new TwitterManager($stream)
  );
  $master->init();
  $master->service();