streamtest.php 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241
  1. <?php
  2. /**
  3. * StatusNet, the distributed open-source microblogging tool
  4. *
  5. * PHP version 5
  6. *
  7. * LICENCE: This program is free software: you can redistribute it and/or modify
  8. * it under the terms of the GNU Affero General Public License as published by
  9. * the Free Software Foundation, either version 3 of the License, or
  10. * (at your option) any later version.
  11. *
  12. * This program is distributed in the hope that it will be useful,
  13. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  14. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  15. * GNU Affero General Public License for more details.
  16. *
  17. * You should have received a copy of the GNU Affero General Public License
  18. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  19. *
  20. * @category Plugin
  21. * @package StatusNet
  22. * @author Brion Vibber <brion@status.net>
  23. * @copyright 2010 StatusNet, Inc.
  24. * @license http://www.fsf.org/licensing/licenses/agpl-3.0.html GNU Affero General Public License version 3.0
  25. * @link http://status.net/
  26. */
  // Installation root: three directories above the one holding this script.
  define('INSTALLDIR', realpath(__DIR__ . '/../../..'));

  // Option specs and usage text; presumably consumed by scripts/commandline.inc
  // (required below) -- confirm against that file.
  $shortoptions = 'n:';
  $longoptions = array('nick=', 'import', 'all', 'apiroot=');

  $helptext = <<<ENDOFHELP
USAGE: streamtest.php -n <username>
-n --nick=<username> Local user whose Twitter timeline to watch
--import Experimental: run incoming messages through import
--all Experimental: run multiuser; requires nick be the app owner
--apiroot=<url> Provide alternate streaming API root URL
Attempts a User Stream connection to Twitter as the given user, dumping
data as it comes.
ENDOFHELP;

  // Command-line framework first, then the stream reader classes that live in
  // the lib/ directory next to this script's parent directory.
  require_once INSTALLDIR . '/scripts/commandline.inc';
  require_once dirname(__DIR__) . '/lib/jsonstreamreader.php';
  require_once dirname(__DIR__) . '/lib/twitterstreamreader.php';
  // Resolve the target nickname from -n or --nick; with neither, print the
  // usage text and stop.
  if (have_option('n')) {
      $nickname = get_option_value('n');
  } else if (have_option('nick')) {
      // The long option is registered as 'nick=' in $longoptions, so its value
      // has to be requested under 'nick'. The previous lookup used 'nickname',
      // which is not a registered option name.
      $nickname = get_option_value('nick');
  } else {
      show_help($helptext);
      exit(0);
  }
  /**
   * Build a Twitter OAuth client from the credentials stored for a local user.
   *
   * @param User $user local user whose Twitter foreign link is looked up by ID
   *
   * @return TwitterOAuthClient client built from the unpacked token's key and secret
   *
   * @throws ServerException when no Twitter foreign link is found for the user,
   *                         or when its credentials do not unpack into a token
   */
  function twitterAuthForUser(User $user)
  {
      $link = Foreign_link::getByUserID($user->id, TWITTER_SERVICE);
      if (!$link) {
          throw new ServerException("No Twitter config for this user.");
      }

      $accessToken = TwitterOAuthClient::unpackToken($link->credentials);
      if (!$accessToken) {
          throw new ServerException("No Twitter OAuth credentials for this user.");
      }

      return new TwitterOAuthClient(
          $accessToken->key,
          $accessToken->secret
      );
  }
  /**
   * Create a user stream authenticated with the given user's Twitter credentials.
   *
   * @param User $user local user to authenticate as
   *
   * @return TwitterUserStream
   *
   * @throws ServerException propagated from twitterAuthForUser()
   */
  function homeStreamForUser(User $user)
  {
      return new TwitterUserStream(twitterAuthForUser($user));
  }
  /**
   * Create a site stream authenticated as the given user and ask it to follow
   * every Twitter link that has notice receiving enabled.
   *
   * @param User $user local user to authenticate as; per the original comment
   *                   this must be the owner of the Twitter application
   *
   * @return TwitterSiteStream stream with followUsers() already applied
   *
   * @throws ServerException propagated from twitterAuthForUser()
   */
  function siteStreamForOwner(User $user)
  {
      // The user we auth as must be the owner of the application.
      $ownerAuth = twitterAuthForUser($user);

      // Pass an API root only when one was given, so the class's own default
      // applies otherwise.
      $siteStream = have_option('apiroot')
          ? new TwitterSiteStream($ownerAuth, get_option_value('apiroot'))
          : new TwitterSiteStream($ownerAuth);

      // Collect the foreign (Twitter) IDs of every link flagged to receive notices.
      $followIds = array();
      $link = new Foreign_link();
      $link->service = TWITTER_SERVICE;
      $link->find();
      while ($link->fetch()) {
          $receives = ($link->noticesync & FOREIGN_NOTICE_RECV) == FOREIGN_NOTICE_RECV;
          if (!$receives) {
              continue;
          }
          $followIds[] = $link->foreign_id;
      }

      $siteStream->followUsers($followIds);

      return $siteStream;
  }
  // Look up the local account named on the command line.
  $user = User::getKV('nickname', $nickname);
  if (!$user instanceof User) {
      // Without this guard an unknown nickname is passed on to the User-typed
      // stream factory functions and the script dies with a type error instead
      // of a usable message.
      fwrite(STDERR, "No such local user: $nickname\n");
      exit(1);
  }

  // Keep the account available under the global name $myuser as well.
  global $myuser;
  $myuser = $user;

  // --all selects the multi-user site stream; otherwise watch this user's own stream.
  if (have_option('all')) {
      $stream = siteStreamForOwner($user);
  } else {
      $stream = homeStreamForUser($user);
  }
  // 'raw': log the message and its context, each JSON-encoded, at LOG_INFO.
  $stream->hookEvent('raw', function($payload, $ctx) {
      $encodedPayload = json_encode($payload);
      $encodedContext = json_encode($ctx);
      common_log(LOG_INFO, $encodedPayload . ' for ' . $encodedContext);
  });

  // 'friends': print the comma-separated friend list carried by the event.
  $stream->hookEvent('friends', function($payload, $ctx) {
      $friendList = implode(', ', $payload->friends);
      echo sprintf("Friend list: %s\n", $friendList);
  });
  // 'favorite': print who favorited whose notice, followed by the notice text.
  $stream->hookEvent('favorite', function($payload, $ctx) {
      $actor = $payload->source->screen_name;
      $owner = $payload->target->screen_name;
      $text = $payload->target_object->text;
      echo sprintf("%s favorited %s's notice: %s\n", $actor, $owner, $text);
  });

  // 'unfavorite': same fields as 'favorite', reported as an unfavorite.
  $stream->hookEvent('unfavorite', function($payload, $ctx) {
      $actor = $payload->source->screen_name;
      $owner = $payload->target->screen_name;
      $text = $payload->target_object->text;
      echo sprintf("%s unfavorited %s's notice: %s\n", $actor, $owner, $text);
  });

  // 'follow': print source and target screen names.
  $stream->hookEvent('follow', function($payload, $ctx) {
      $actor = $payload->source->screen_name;
      $subject = $payload->target->screen_name;
      echo sprintf("%s friended %s\n", $actor, $subject);
  });

  // 'unfollow': same fields as 'follow', reported as an unfriend.
  $stream->hookEvent('unfollow', function($payload, $ctx) {
      $actor = $payload->source->screen_name;
      $subject = $payload->target->screen_name;
      echo sprintf("%s unfriended %s\n", $actor, $subject);
  });
  // 'delete': print the ID of the status named in the notification.
  $stream->hookEvent('delete', function($payload, $ctx) {
      $statusId = $payload->status->id;
      echo sprintf("Deleted status notification: %s\n", $statusId);
  });

  // 'scrub_geo': print the user ID and the upper status ID named in the request.
  $stream->hookEvent('scrub_geo', function($payload, $ctx) {
      $userId = $payload->user_id;
      $lastStatusId = $payload->up_to_status_id;
      echo sprintf(
          "Req to scrub geo data for user id %s up to status ID %s\n",
          $userId,
          $lastStatusId
      );
  });
  // 'status': print the author and text of each status; when --import was
  // given, also run the status through TwitterImport and report the outcome.
  $stream->hookEvent('status', function($data, $context) {
      printf("Received status update from %s: %s\n",
             $data->user->screen_name,
             $data->text);

      if (!have_option('import')) {
          return;
      }

      $importer = new TwitterImport();
      print "\timporting...";
      $notice = $importer->importStatus($data);

      // Report success as well as failure. Previously nothing was printed when
      // the import worked, so the "importing..." line was never terminated and
      // ran into the next message.
      if ($notice instanceof Notice) {
          print " OK\n";
      } else {
          print " FAIL\n";
      }
  });
  // 'direct_message': print sender, recipient and message text.
  $stream->hookEvent('direct_message', function($message) {
      $from = $message->sender->screen_name;
      $to = $message->recipient->screen_name;
      echo sprintf("Direct message from %s to %s: %s\n", $from, $to, $message->text);
  });
  /**
   * IoManager wrapper around a single TwitterStreamReader.
   *
   * Every method delegates to the wrapped stream and, where the original did,
   * returns true afterwards. What the IoManager base class does with those
   * return values is not visible in this file.
   */
  class TwitterManager extends IoManager
  {
      /**
       * The wrapped stream reader.
       *
       * Declared explicitly because the constructor used to create it as a
       * dynamic property, which PHP 8.2 deprecates. Kept public so that access
       * is no more restricted than a dynamic property's was.
       *
       * @var TwitterStreamReader
       */
      public $stream;

      /**
       * @param TwitterStreamReader $stream stream to manage
       */
      public function __construct(TwitterStreamReader $stream)
      {
          $this->stream = $stream;
      }

      /**
       * @return mixed whatever the stream's getSockets() returns
       */
      public function getSockets()
      {
          return $this->stream->getSockets();
      }

      /**
       * Forward ready input to the stream.
       *
       * @param mixed $data passed through unchanged to the stream's handleInput()
       *
       * @return boolean always true
       */
      public function handleInput($data)
      {
          $this->stream->handleInput($data);
          return true;
      }

      /**
       * Open the stream connection.
       *
       * @return boolean always true
       */
      public function start()
      {
          $this->stream->connect();
          return true;
      }

      /**
       * Close the stream connection.
       *
       * @return boolean always true
       */
      public function finish()
      {
          $this->stream->close();
          return true;
      }

      /**
       * Not supported: instances are constructed directly around a stream.
       *
       * @throws Exception always
       */
      public static function get()
      {
          throw new Exception('not a singleton');
      }
  }
  /**
   * IoMaster that runs exactly one manager, supplied at construction time.
   */
  class TwitterStreamMaster extends IoMaster
  {
      /**
       * The manager handed to instantiate() in initManagers().
       *
       * Declared explicitly because the constructor used to create it as a
       * dynamic property, which PHP 8.2 deprecates. Kept public so that access
       * is no more restricted than a dynamic property's was.
       *
       * @var mixed value given to the constructor; this script passes a TwitterManager
       */
      public $ioManager;

      /**
       * @param mixed $id        identifier forwarded to IoMaster's constructor
       * @param mixed $ioManager manager to register when managers are initialized
       */
      public function __construct($id, $ioManager)
      {
          parent::__construct($id);
          $this->ioManager = $ioManager;
      }

      /**
       * Initialize IoManagers which are appropriate to this instance.
       *
       * Only the manager given to the constructor is registered.
       */
      public function initManagers()
      {
          $this->instantiate($this->ioManager);
      }
  }
  // Wrap the configured stream in a manager, hand it to a master named
  // 'TwitterStream', then initialize the master and enter its service loop.
  $master = new TwitterStreamMaster(
      'TwitterStream',
      new TwitterManager($stream)
  );

  $master->init();
  $master->service();