storage_write.c 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547
  1. #include "platform.h"
  2. #include <signal.h>
  3. #include <stdint.h>
  4. #include <stdlib.h>
  5. #include <string.h>
  6. #include "crypto.h"
  7. #include "netpacket.h"
  8. #include "netproto.h"
  9. #include "storage_internal.h"
  10. #include "tarsnap_opt.h"
  11. #include "tsnetwork.h"
  12. #include "warnp.h"
  13. #include "storage.h"
  14. /*
  15. * Maximum number of bytes of file writes which are allowed to be pending
  16. * before storage_write_file will block.
  17. */
  18. #define MAXPENDING_WRITEBYTES (5 * 1024 * 1024)
  19. /*
  20. * Number of connections to use for writes when --aggressive-networking is
  21. * enabled. This MUST NOT be set to more than 8.
  22. */
  23. #define AGGRESSIVE_CNUM 8
  24. struct storage_write_internal {
  25. /* Transaction parameters. */
  26. NETPACKET_CONNECTION * NPC[AGGRESSIVE_CNUM];
  27. uint64_t machinenum;
  28. uint8_t nonce[32];
  29. /* Number of connections to use. */
  30. size_t numconns;
  31. /* Last connection through which a request was sent. */
  32. size_t lastcnum;
  33. /* Number of bytes of pending writes. */
  34. size_t nbytespending;
  35. /* Last time we wrote a checkpoint. */
  36. uint64_t lastcheckpoint;
  37. };
  38. struct write_fexist_internal {
  39. /* General state information. */
  40. uint64_t machinenum;
  41. int done;
  42. /* Parameters used in write_fexist. */
  43. uint8_t class;
  44. uint8_t name[32];
  45. uint8_t nonce[32];
  46. int status;
  47. };
  48. struct write_file_internal {
  49. /* Pointer to transaction to which this belongs. */
  50. struct storage_write_internal * S;
  51. /* General state information. */
  52. uint64_t machinenum;
  53. int done;
  54. /* Parameters used in write_file. */
  55. uint8_t class;
  56. uint8_t name[32];
  57. uint8_t nonce[32];
  58. size_t flen;
  59. uint8_t * filebuf;
  60. };
  61. static void raisesigs(struct storage_write_internal * S);
  62. static sendpacket_callback callback_fexist_send;
  63. static handlepacket_callback callback_fexist_response;
  64. static sendpacket_callback callback_write_file_send;
  65. static handlepacket_callback callback_write_file_response;
  66. /**
  67. * raisesigs(S):
  68. * Look at how much bandwidth has been used plus what will be used once all
  69. * pending requests are sent, and send a SIGQUIT to ourselves if this exceeds
  70. * tarsnap_opt_maxbytesout, or a SIGUSR2 if it exceeds
  71. * tarsnap_opt_nextcheckpoint.
  72. */
  73. static void
  74. raisesigs(struct storage_write_internal * S)
  75. {
  76. uint64_t in, out, queued;
  77. uint64_t totalout = 0;
  78. size_t i;
  79. /* Add up bandwidth from all the connections. */
  80. for (i = 0; i < S->numconns; i++) {
  81. netpacket_getstats(S->NPC[i], &in, &out, &queued);
  82. totalout += out + queued;
  83. }
  84. /* Send a SIGQUIT if appropriate. */
  85. if (totalout > tarsnap_opt_maxbytesout) {
  86. if (raise(SIGQUIT))
  87. warnp("raise(SIGQUIT)");
  88. }
  89. /* Send a SIGUSR2 if appropriate. */
  90. if (tarsnap_opt_checkpointbytes != (uint64_t)(-1)) {
  91. if (totalout > S->lastcheckpoint + tarsnap_opt_checkpointbytes) {
  92. S->lastcheckpoint = totalout;
  93. if (raise(SIGUSR2))
  94. warnp("raise(SIGUSR2)");
  95. }
  96. }
  97. }
  98. /**
  99. * storage_write_start(machinenum, lastseq, seqnum):
  100. * Start a write transaction, presuming that ${lastseq} is the sequence
  101. * number of the last committed transaction, or zeroes if there is no
  102. * previous transaction; and store the sequence number of the new transaction
  103. * into ${seqnum}.
  104. */
  105. STORAGE_W *
  106. storage_write_start(uint64_t machinenum, const uint8_t lastseq[32],
  107. uint8_t seqnum[32])
  108. {
  109. struct storage_write_internal * S;
  110. size_t i;
  111. /* Allocate memory. */
  112. if ((S = malloc(sizeof(struct storage_write_internal))) == NULL)
  113. goto err0;
  114. /* Store machine number. */
  115. S->machinenum = machinenum;
  116. /* Figure out how many connections to use. */
  117. S->numconns = tarsnap_opt_aggressive_networking ? AGGRESSIVE_CNUM : 1;
  118. /* No connections used yet. */
  119. S->lastcnum = 0;
  120. /* No pending writes so far. */
  121. S->nbytespending = 0;
  122. /* No checkpoint yet. */
  123. S->lastcheckpoint = 0;
  124. /* Open netpacket connections. */
  125. for (i = 0; i < S->numconns; i++) {
  126. if ((S->NPC[i] = netpacket_open(USERAGENT)) == NULL)
  127. goto err1;
  128. }
  129. /* Start a write transaction. */
  130. if (storage_transaction_start_write(S->NPC[0], machinenum,
  131. lastseq, S->nonce))
  132. goto err2;
  133. /* Copy the transaction nonce out. */
  134. memcpy(seqnum, S->nonce, 32);
  135. /* Success! */
  136. return (S);
  137. err2:
  138. i = S->numconns;
  139. err1:
  140. for (i--; i < S->numconns; i--)
  141. netpacket_close(S->NPC[i]);
  142. free(S);
  143. err0:
  144. /* Failure! */
  145. return (NULL);
  146. }
  147. /**
  148. * storage_write_fexist(S, class, name):
  149. * Test if a file ${name} exists in class ${class}, as part of the write
  150. * transaction associated with the cookie ${S}; return 1 if the file
  151. * exists, 0 if not, and -1 on error. If ${S} is NULL, return 0 without doing
  152. * anything.
  153. */
  154. int
  155. storage_write_fexist(STORAGE_W * S, char class, const uint8_t name[32])
  156. {
  157. struct write_fexist_internal C;
  158. /* No-op on NULL. */
  159. if (S == NULL)
  160. return (0);
  161. /* Initialize structure. */
  162. C.machinenum = S->machinenum;
  163. C.class = (uint8_t)class;
  164. memcpy(C.name, name, 32);
  165. memcpy(C.nonce, S->nonce, 32);
  166. C.done = 0;
  167. /* Ask the netpacket layer to send a request and get a response. */
  168. if (netpacket_op(S->NPC[0], callback_fexist_send, &C))
  169. goto err0;
  170. /* Wait until the server has responded or we have failed. */
  171. if (network_spin(&C.done))
  172. goto err0;
  173. /* Parse status returned by server. */
  174. switch (C.status) {
  175. case 0:
  176. /* File does not exist. */
  177. break;
  178. case 1:
  179. /* File exists. */
  180. break;
  181. case 2:
  182. /* Bad nonce. */
  183. warn0("Transaction interrupted");
  184. goto err0;
  185. default:
  186. netproto_printerr(NETPROTO_STATUS_PROTERR);
  187. goto err0;
  188. }
  189. /* Success! */
  190. return (C.status);
  191. err0:
  192. /* Failure! */
  193. return (-1);
  194. }
  195. static int
  196. callback_fexist_send(void * cookie, NETPACKET_CONNECTION * NPC)
  197. {
  198. struct write_fexist_internal * C = cookie;
  199. /* Ask the server if the file exists. */
  200. return (netpacket_write_fexist(NPC, C->machinenum, C->class,
  201. C->name, C->nonce, callback_fexist_response));
  202. }
  203. static int
  204. callback_fexist_response(void * cookie, NETPACKET_CONNECTION * NPC,
  205. int status, uint8_t packettype, const uint8_t * packetbuf,
  206. size_t packetlen)
  207. {
  208. struct write_fexist_internal * C = cookie;
  209. (void)packetlen; /* UNUSED */
  210. (void)NPC; /* UNUSED */
  211. /* Handle errors. */
  212. if (status != NETWORK_STATUS_OK) {
  213. netproto_printerr(status);
  214. goto err0;
  215. }
  216. /* Make sure we received the right type of packet. */
  217. if (packettype != NETPACKET_WRITE_FEXIST_RESPONSE)
  218. goto err1;
  219. /* Verify packet hmac. */
  220. switch (netpacket_hmac_verify(packettype, C->nonce,
  221. packetbuf, 34, CRYPTO_KEY_AUTH_PUT)) {
  222. case 1:
  223. goto err1;
  224. case -1:
  225. goto err0;
  226. }
  227. /* Make sure that the packet corresponds to the right file. */
  228. if ((packetbuf[1] != C->class) ||
  229. (memcmp(&packetbuf[2], C->name, 32)))
  230. goto err1;
  231. /* Record status code returned by server. */
  232. C->status = packetbuf[0];
  233. /* We're done! */
  234. C->done = 1;
  235. /* Success! */
  236. return (0);
  237. err1:
  238. netproto_printerr(NETPROTO_STATUS_PROTERR);
  239. err0:
  240. /* Failure! */
  241. return (-1);
  242. }
  243. /**
  244. * storage_write_file(S, buf, len, class, name):
  245. * Write ${len} bytes from ${buf} to the file ${name} in class ${class} as
  246. * part of the write transaction associated with the cookie ${S}. If ${S} is
  247. * NULL, return 0 without doing anything.
  248. */
  249. int
  250. storage_write_file(STORAGE_W * S, uint8_t * buf, size_t len,
  251. char class, const uint8_t name[32])
  252. {
  253. struct write_file_internal * C;
  254. /* No-op on NULL. */
  255. if (S == NULL)
  256. return (0);
  257. /* Create write cookie. */
  258. if ((C = malloc(sizeof(struct write_file_internal))) == NULL)
  259. goto err0;
  260. C->S = S;
  261. C->machinenum = S->machinenum;
  262. C->class = (uint8_t)class;
  263. memcpy(C->name, name, 32);
  264. memcpy(C->nonce, S->nonce, 32);
  265. C->done = 0;
  266. /* Sanity-check file length. */
  267. if (len > 262144 - CRYPTO_FILE_TLEN - CRYPTO_FILE_HLEN) {
  268. warn0("File is too large");
  269. goto err1;
  270. }
  271. C->flen = CRYPTO_FILE_HLEN + len + CRYPTO_FILE_TLEN;
  272. /* Allocate space for encrypted file. */
  273. if ((C->filebuf = malloc(C->flen)) == NULL)
  274. goto err1;
  275. /* Encrypt and hash file. */
  276. if (crypto_file_enc(buf, len, C->filebuf))
  277. goto err2;
  278. /* We're issuing a write operation. */
  279. S->nbytespending += C->flen;
  280. /*
  281. * Make sure the pending operation queue isn't too large before we
  282. * add yet another operation to it.
  283. */
  284. while (S->nbytespending > MAXPENDING_WRITEBYTES) {
  285. if (network_select(1))
  286. goto err2;
  287. }
  288. /* Ask the netpacket layer to send a request and get a response. */
  289. S->lastcnum = (S->lastcnum + 1) % S->numconns;
  290. if (netpacket_op(S->NPC[S->lastcnum], callback_write_file_send, C))
  291. goto err0;
  292. /* Send ourself SIGQUIT or SIGUSR2 if necessary. */
  293. raisesigs(S);
  294. /* Success! */
  295. return (0);
  296. err2:
  297. free(C->filebuf);
  298. err1:
  299. free(C);
  300. err0:
  301. /* Failure! */
  302. return (-1);
  303. }
  304. static int
  305. callback_write_file_send(void * cookie, NETPACKET_CONNECTION * NPC)
  306. {
  307. struct write_file_internal * C = cookie;
  308. /* Write the file. */
  309. return (netpacket_write_file(NPC, C->machinenum, C->class, C->name,
  310. C->filebuf, C->flen, C->nonce, callback_write_file_response));
  311. }
  312. static int
  313. callback_write_file_response(void * cookie,
  314. NETPACKET_CONNECTION * NPC, int status, uint8_t packettype,
  315. const uint8_t * packetbuf, size_t packetlen)
  316. {
  317. struct write_file_internal * C = cookie;
  318. (void)packetlen; /* UNUSED */
  319. (void)NPC; /* UNUSED */
  320. /* Handle errors. */
  321. if (status != NETWORK_STATUS_OK) {
  322. netproto_printerr(status);
  323. goto err1;
  324. }
  325. /* Make sure we received the right type of packet. */
  326. if (packettype != NETPACKET_WRITE_FILE_RESPONSE)
  327. goto err2;
  328. /* Verify packet hmac. */
  329. switch (netpacket_hmac_verify(packettype, C->nonce,
  330. packetbuf, 34, CRYPTO_KEY_AUTH_PUT)) {
  331. case 1:
  332. goto err2;
  333. case -1:
  334. goto err1;
  335. }
  336. /* Make sure that the packet corresponds to the right file. */
  337. if ((packetbuf[1] != C->class) ||
  338. (memcmp(&packetbuf[2], C->name, 32)))
  339. goto err2;
  340. /* Parse status returned by server. */
  341. switch (packetbuf[0]) {
  342. case 0:
  343. /* This write operation is no longer pending. */
  344. C->S->nbytespending -= C->flen;
  345. break;
  346. case 1:
  347. warn0("Cannot store file: File already exists");
  348. goto err1;
  349. case 2:
  350. /* Bad nonce. */
  351. warn0("Transaction interrupted");
  352. goto err1;
  353. default:
  354. goto err2;
  355. }
  356. /*
  357. * Send ourself SIGQUIT or SIGUSR2 if necessary. We do this
  358. * here in addition to in storage_write_file because a write will
  359. * use more bandwidth than expected if it needs to be retried.
  360. */
  361. raisesigs(C->S);
  362. /* Free file buffer. */
  363. free(C->filebuf);
  364. /* Free write cookie. */
  365. free(C);
  366. /* Success! */
  367. return (0);
  368. err2:
  369. netproto_printerr(NETPROTO_STATUS_PROTERR);
  370. err1:
  371. free(C->filebuf);
  372. free(C);
  373. /* Failure! */
  374. return (-1);
  375. }
  376. /**
  377. * storage_write_flush(S):
  378. * Make sure all files written as part of the transaction associated with
  379. * the cookie ${S} have been safely stored in preparation for being committed.
  380. * If ${S} is NULL, return 0 without doing anything.
  381. */
  382. int
  383. storage_write_flush(STORAGE_W * S)
  384. {
  385. /* No-op on NULL. */
  386. if (S == NULL)
  387. return (0);
  388. /* Wait until all pending writes have been completed. */
  389. while (S->nbytespending > 0) {
  390. if (network_select(1))
  391. goto err0;
  392. }
  393. /* Success! */
  394. return (0);
  395. err0:
  396. /* Failure! */
  397. return (-1);
  398. }
  399. /**
  400. * storage_write_end(S):
  401. * Make sure all files written as part of the transaction associated with
  402. * the cookie ${S} have been safely stored in preparation for being
  403. * committed; and close the transaction and free associated memory. If ${S}
  404. * is NULL, return 0 without doing anything.
  405. */
  406. int
  407. storage_write_end(STORAGE_W * S)
  408. {
  409. size_t i;
  410. /* No-op on NULL. */
  411. if (S == NULL)
  412. return (0);
  413. /* Flush any pending writes. */
  414. if (storage_write_flush(S))
  415. goto err2;
  416. /* Close netpacket connections. */
  417. for (i = S->numconns - 1; i < S->numconns; i--)
  418. if (netpacket_close(S->NPC[i]))
  419. goto err1;
  420. /* Free structure. */
  421. free(S);
  422. /* Success! */
  423. return (0);
  424. err2:
  425. i = S->numconns;
  426. err1:
  427. for (i--; i < S->numconns; i--)
  428. netpacket_close(S->NPC[i]);
  429. free(S);
  430. /* Failure! */
  431. return (-1);
  432. }
  433. /**
  434. * storage_write_free(S):
  435. * Free any memory allocated as part of the write transaction associated with
  436. * the cookie ${S}; the transaction will not be committed.
  437. */
  438. void
  439. storage_write_free(STORAGE_W * S)
  440. {
  441. size_t i;
  442. /* Behave consistently with free(NULL). */
  443. if (S == NULL)
  444. return;
  445. /* Close netpacket connections. */
  446. for (i = S->numconns - 1; i < S->numconns; i--)
  447. netpacket_close(S->NPC[i]);
  448. /* Free structure. */
  449. free(S);
  450. }