streaming.c 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570
  1. /*
  2. * Copyright (c) 2011, Google Inc.
  3. */
  4. #include "cache.h"
  5. #include "streaming.h"
  6. #include "repository.h"
  7. #include "object-store.h"
  8. #include "replace-object.h"
  9. #include "packfile.h"
  10. enum input_source {
  11. stream_error = -1,
  12. incore = 0,
  13. loose = 1,
  14. pack_non_delta = 2
  15. };
  16. typedef int (*open_istream_fn)(struct git_istream *,
  17. struct repository *,
  18. struct object_info *,
  19. const struct object_id *,
  20. enum object_type *);
  21. typedef int (*close_istream_fn)(struct git_istream *);
  22. typedef ssize_t (*read_istream_fn)(struct git_istream *, char *, size_t);
  23. struct stream_vtbl {
  24. close_istream_fn close;
  25. read_istream_fn read;
  26. };
  27. #define open_method_decl(name) \
  28. int open_istream_ ##name \
  29. (struct git_istream *st, struct repository *r, \
  30. struct object_info *oi, const struct object_id *oid, \
  31. enum object_type *type)
  32. #define close_method_decl(name) \
  33. int close_istream_ ##name \
  34. (struct git_istream *st)
  35. #define read_method_decl(name) \
  36. ssize_t read_istream_ ##name \
  37. (struct git_istream *st, char *buf, size_t sz)
  38. /* forward declaration */
  39. static open_method_decl(incore);
  40. static open_method_decl(loose);
  41. static open_method_decl(pack_non_delta);
  42. static struct git_istream *attach_stream_filter(struct git_istream *st,
  43. struct stream_filter *filter);
  44. static open_istream_fn open_istream_tbl[] = {
  45. open_istream_incore,
  46. open_istream_loose,
  47. open_istream_pack_non_delta,
  48. };
  49. #define FILTER_BUFFER (1024*16)
  50. struct filtered_istream {
  51. struct git_istream *upstream;
  52. struct stream_filter *filter;
  53. char ibuf[FILTER_BUFFER];
  54. char obuf[FILTER_BUFFER];
  55. int i_end, i_ptr;
  56. int o_end, o_ptr;
  57. int input_finished;
  58. };
  59. struct git_istream {
  60. const struct stream_vtbl *vtbl;
  61. unsigned long size; /* inflated size of full object */
  62. git_zstream z;
  63. enum { z_unused, z_used, z_done, z_error } z_state;
  64. union {
  65. struct {
  66. char *buf; /* from read_object() */
  67. unsigned long read_ptr;
  68. } incore;
  69. struct {
  70. void *mapped;
  71. unsigned long mapsize;
  72. char hdr[32];
  73. int hdr_avail;
  74. int hdr_used;
  75. } loose;
  76. struct {
  77. struct packed_git *pack;
  78. off_t pos;
  79. } in_pack;
  80. struct filtered_istream filtered;
  81. } u;
  82. };
  83. int close_istream(struct git_istream *st)
  84. {
  85. int r = st->vtbl->close(st);
  86. free(st);
  87. return r;
  88. }
  89. ssize_t read_istream(struct git_istream *st, void *buf, size_t sz)
  90. {
  91. return st->vtbl->read(st, buf, sz);
  92. }
  93. static enum input_source istream_source(struct repository *r,
  94. const struct object_id *oid,
  95. enum object_type *type,
  96. struct object_info *oi)
  97. {
  98. unsigned long size;
  99. int status;
  100. oi->typep = type;
  101. oi->sizep = &size;
  102. status = oid_object_info_extended(r, oid, oi, 0);
  103. if (status < 0)
  104. return stream_error;
  105. switch (oi->whence) {
  106. case OI_LOOSE:
  107. return loose;
  108. case OI_PACKED:
  109. if (!oi->u.packed.is_delta && big_file_threshold < size)
  110. return pack_non_delta;
  111. /* fallthru */
  112. default:
  113. return incore;
  114. }
  115. }
  116. struct git_istream *open_istream(struct repository *r,
  117. const struct object_id *oid,
  118. enum object_type *type,
  119. unsigned long *size,
  120. struct stream_filter *filter)
  121. {
  122. struct git_istream *st;
  123. struct object_info oi = OBJECT_INFO_INIT;
  124. const struct object_id *real = lookup_replace_object(r, oid);
  125. enum input_source src = istream_source(r, real, type, &oi);
  126. if (src < 0)
  127. return NULL;
  128. st = xmalloc(sizeof(*st));
  129. if (open_istream_tbl[src](st, r, &oi, real, type)) {
  130. if (open_istream_incore(st, r, &oi, real, type)) {
  131. free(st);
  132. return NULL;
  133. }
  134. }
  135. if (filter) {
  136. /* Add "&& !is_null_stream_filter(filter)" for performance */
  137. struct git_istream *nst = attach_stream_filter(st, filter);
  138. if (!nst) {
  139. close_istream(st);
  140. return NULL;
  141. }
  142. st = nst;
  143. }
  144. *size = st->size;
  145. return st;
  146. }
  147. /*****************************************************************
  148. *
  149. * Common helpers
  150. *
  151. *****************************************************************/
  152. static void close_deflated_stream(struct git_istream *st)
  153. {
  154. if (st->z_state == z_used)
  155. git_inflate_end(&st->z);
  156. }
  157. /*****************************************************************
  158. *
  159. * Filtered stream
  160. *
  161. *****************************************************************/
  162. static close_method_decl(filtered)
  163. {
  164. free_stream_filter(st->u.filtered.filter);
  165. return close_istream(st->u.filtered.upstream);
  166. }
  167. static read_method_decl(filtered)
  168. {
  169. struct filtered_istream *fs = &(st->u.filtered);
  170. size_t filled = 0;
  171. while (sz) {
  172. /* do we already have filtered output? */
  173. if (fs->o_ptr < fs->o_end) {
  174. size_t to_move = fs->o_end - fs->o_ptr;
  175. if (sz < to_move)
  176. to_move = sz;
  177. memcpy(buf + filled, fs->obuf + fs->o_ptr, to_move);
  178. fs->o_ptr += to_move;
  179. sz -= to_move;
  180. filled += to_move;
  181. continue;
  182. }
  183. fs->o_end = fs->o_ptr = 0;
  184. /* do we have anything to feed the filter with? */
  185. if (fs->i_ptr < fs->i_end) {
  186. size_t to_feed = fs->i_end - fs->i_ptr;
  187. size_t to_receive = FILTER_BUFFER;
  188. if (stream_filter(fs->filter,
  189. fs->ibuf + fs->i_ptr, &to_feed,
  190. fs->obuf, &to_receive))
  191. return -1;
  192. fs->i_ptr = fs->i_end - to_feed;
  193. fs->o_end = FILTER_BUFFER - to_receive;
  194. continue;
  195. }
  196. /* tell the filter to drain upon no more input */
  197. if (fs->input_finished) {
  198. size_t to_receive = FILTER_BUFFER;
  199. if (stream_filter(fs->filter,
  200. NULL, NULL,
  201. fs->obuf, &to_receive))
  202. return -1;
  203. fs->o_end = FILTER_BUFFER - to_receive;
  204. if (!fs->o_end)
  205. break;
  206. continue;
  207. }
  208. fs->i_end = fs->i_ptr = 0;
  209. /* refill the input from the upstream */
  210. if (!fs->input_finished) {
  211. fs->i_end = read_istream(fs->upstream, fs->ibuf, FILTER_BUFFER);
  212. if (fs->i_end < 0)
  213. return -1;
  214. if (fs->i_end)
  215. continue;
  216. }
  217. fs->input_finished = 1;
  218. }
  219. return filled;
  220. }
  221. static struct stream_vtbl filtered_vtbl = {
  222. close_istream_filtered,
  223. read_istream_filtered,
  224. };
  225. static struct git_istream *attach_stream_filter(struct git_istream *st,
  226. struct stream_filter *filter)
  227. {
  228. struct git_istream *ifs = xmalloc(sizeof(*ifs));
  229. struct filtered_istream *fs = &(ifs->u.filtered);
  230. ifs->vtbl = &filtered_vtbl;
  231. fs->upstream = st;
  232. fs->filter = filter;
  233. fs->i_end = fs->i_ptr = 0;
  234. fs->o_end = fs->o_ptr = 0;
  235. fs->input_finished = 0;
  236. ifs->size = -1; /* unknown */
  237. return ifs;
  238. }
  239. /*****************************************************************
  240. *
  241. * Loose object stream
  242. *
  243. *****************************************************************/
  244. static read_method_decl(loose)
  245. {
  246. size_t total_read = 0;
  247. switch (st->z_state) {
  248. case z_done:
  249. return 0;
  250. case z_error:
  251. return -1;
  252. default:
  253. break;
  254. }
  255. if (st->u.loose.hdr_used < st->u.loose.hdr_avail) {
  256. size_t to_copy = st->u.loose.hdr_avail - st->u.loose.hdr_used;
  257. if (sz < to_copy)
  258. to_copy = sz;
  259. memcpy(buf, st->u.loose.hdr + st->u.loose.hdr_used, to_copy);
  260. st->u.loose.hdr_used += to_copy;
  261. total_read += to_copy;
  262. }
  263. while (total_read < sz) {
  264. int status;
  265. st->z.next_out = (unsigned char *)buf + total_read;
  266. st->z.avail_out = sz - total_read;
  267. status = git_inflate(&st->z, Z_FINISH);
  268. total_read = st->z.next_out - (unsigned char *)buf;
  269. if (status == Z_STREAM_END) {
  270. git_inflate_end(&st->z);
  271. st->z_state = z_done;
  272. break;
  273. }
  274. if (status != Z_OK && (status != Z_BUF_ERROR || total_read < sz)) {
  275. git_inflate_end(&st->z);
  276. st->z_state = z_error;
  277. return -1;
  278. }
  279. }
  280. return total_read;
  281. }
  282. static close_method_decl(loose)
  283. {
  284. close_deflated_stream(st);
  285. munmap(st->u.loose.mapped, st->u.loose.mapsize);
  286. return 0;
  287. }
  288. static struct stream_vtbl loose_vtbl = {
  289. close_istream_loose,
  290. read_istream_loose,
  291. };
  292. static open_method_decl(loose)
  293. {
  294. st->u.loose.mapped = map_loose_object(r, oid, &st->u.loose.mapsize);
  295. if (!st->u.loose.mapped)
  296. return -1;
  297. if ((unpack_loose_header(&st->z,
  298. st->u.loose.mapped,
  299. st->u.loose.mapsize,
  300. st->u.loose.hdr,
  301. sizeof(st->u.loose.hdr)) < 0) ||
  302. (parse_loose_header(st->u.loose.hdr, &st->size) < 0)) {
  303. git_inflate_end(&st->z);
  304. munmap(st->u.loose.mapped, st->u.loose.mapsize);
  305. return -1;
  306. }
  307. st->u.loose.hdr_used = strlen(st->u.loose.hdr) + 1;
  308. st->u.loose.hdr_avail = st->z.total_out;
  309. st->z_state = z_used;
  310. st->vtbl = &loose_vtbl;
  311. return 0;
  312. }
  313. /*****************************************************************
  314. *
  315. * Non-delta packed object stream
  316. *
  317. *****************************************************************/
  318. static read_method_decl(pack_non_delta)
  319. {
  320. size_t total_read = 0;
  321. switch (st->z_state) {
  322. case z_unused:
  323. memset(&st->z, 0, sizeof(st->z));
  324. git_inflate_init(&st->z);
  325. st->z_state = z_used;
  326. break;
  327. case z_done:
  328. return 0;
  329. case z_error:
  330. return -1;
  331. case z_used:
  332. break;
  333. }
  334. while (total_read < sz) {
  335. int status;
  336. struct pack_window *window = NULL;
  337. unsigned char *mapped;
  338. mapped = use_pack(st->u.in_pack.pack, &window,
  339. st->u.in_pack.pos, &st->z.avail_in);
  340. st->z.next_out = (unsigned char *)buf + total_read;
  341. st->z.avail_out = sz - total_read;
  342. st->z.next_in = mapped;
  343. status = git_inflate(&st->z, Z_FINISH);
  344. st->u.in_pack.pos += st->z.next_in - mapped;
  345. total_read = st->z.next_out - (unsigned char *)buf;
  346. unuse_pack(&window);
  347. if (status == Z_STREAM_END) {
  348. git_inflate_end(&st->z);
  349. st->z_state = z_done;
  350. break;
  351. }
  352. /*
  353. * Unlike the loose object case, we do not have to worry here
  354. * about running out of input bytes and spinning infinitely. If
  355. * we get Z_BUF_ERROR due to too few input bytes, then we'll
  356. * replenish them in the next use_pack() call when we loop. If
  357. * we truly hit the end of the pack (i.e., because it's corrupt
  358. * or truncated), then use_pack() catches that and will die().
  359. */
  360. if (status != Z_OK && status != Z_BUF_ERROR) {
  361. git_inflate_end(&st->z);
  362. st->z_state = z_error;
  363. return -1;
  364. }
  365. }
  366. return total_read;
  367. }
  368. static close_method_decl(pack_non_delta)
  369. {
  370. close_deflated_stream(st);
  371. return 0;
  372. }
  373. static struct stream_vtbl pack_non_delta_vtbl = {
  374. close_istream_pack_non_delta,
  375. read_istream_pack_non_delta,
  376. };
  377. static open_method_decl(pack_non_delta)
  378. {
  379. struct pack_window *window;
  380. enum object_type in_pack_type;
  381. st->u.in_pack.pack = oi->u.packed.pack;
  382. st->u.in_pack.pos = oi->u.packed.offset;
  383. window = NULL;
  384. in_pack_type = unpack_object_header(st->u.in_pack.pack,
  385. &window,
  386. &st->u.in_pack.pos,
  387. &st->size);
  388. unuse_pack(&window);
  389. switch (in_pack_type) {
  390. default:
  391. return -1; /* we do not do deltas for now */
  392. case OBJ_COMMIT:
  393. case OBJ_TREE:
  394. case OBJ_BLOB:
  395. case OBJ_TAG:
  396. break;
  397. }
  398. st->z_state = z_unused;
  399. st->vtbl = &pack_non_delta_vtbl;
  400. return 0;
  401. }
  402. /*****************************************************************
  403. *
  404. * In-core stream
  405. *
  406. *****************************************************************/
  407. static close_method_decl(incore)
  408. {
  409. free(st->u.incore.buf);
  410. return 0;
  411. }
  412. static read_method_decl(incore)
  413. {
  414. size_t read_size = sz;
  415. size_t remainder = st->size - st->u.incore.read_ptr;
  416. if (remainder <= read_size)
  417. read_size = remainder;
  418. if (read_size) {
  419. memcpy(buf, st->u.incore.buf + st->u.incore.read_ptr, read_size);
  420. st->u.incore.read_ptr += read_size;
  421. }
  422. return read_size;
  423. }
  424. static struct stream_vtbl incore_vtbl = {
  425. close_istream_incore,
  426. read_istream_incore,
  427. };
  428. static open_method_decl(incore)
  429. {
  430. st->u.incore.buf = read_object_file_extended(r, oid, type, &st->size, 0);
  431. st->u.incore.read_ptr = 0;
  432. st->vtbl = &incore_vtbl;
  433. return st->u.incore.buf ? 0 : -1;
  434. }
  435. /****************************************************************
  436. * Users of streaming interface
  437. ****************************************************************/
  438. int stream_blob_to_fd(int fd, const struct object_id *oid, struct stream_filter *filter,
  439. int can_seek)
  440. {
  441. struct git_istream *st;
  442. enum object_type type;
  443. unsigned long sz;
  444. ssize_t kept = 0;
  445. int result = -1;
  446. st = open_istream(the_repository, oid, &type, &sz, filter);
  447. if (!st) {
  448. if (filter)
  449. free_stream_filter(filter);
  450. return result;
  451. }
  452. if (type != OBJ_BLOB)
  453. goto close_and_exit;
  454. for (;;) {
  455. char buf[1024 * 16];
  456. ssize_t wrote, holeto;
  457. ssize_t readlen = read_istream(st, buf, sizeof(buf));
  458. if (readlen < 0)
  459. goto close_and_exit;
  460. if (!readlen)
  461. break;
  462. if (can_seek && sizeof(buf) == readlen) {
  463. for (holeto = 0; holeto < readlen; holeto++)
  464. if (buf[holeto])
  465. break;
  466. if (readlen == holeto) {
  467. kept += holeto;
  468. continue;
  469. }
  470. }
  471. if (kept && lseek(fd, kept, SEEK_CUR) == (off_t) -1)
  472. goto close_and_exit;
  473. else
  474. kept = 0;
  475. wrote = write_in_full(fd, buf, readlen);
  476. if (wrote < 0)
  477. goto close_and_exit;
  478. }
  479. if (kept && (lseek(fd, kept - 1, SEEK_CUR) == (off_t) -1 ||
  480. xwrite(fd, "", 1) != 1))
  481. goto close_and_exit;
  482. result = 0;
  483. close_and_exit:
  484. close_istream(st);
  485. return result;
  486. }