fcp.scm 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598
  1. #!/usr/bin/env bash
  2. # -*- scheme -*-
  3. # A Freenet Client Protocol library for Guile Scheme.
  4. exec -a "${0}" guile -L $(dirname $(realpath "$0")) -e '(fcp)' -c '' "${@}"
  5. ; !#
  6. ;; for emacs (defun test-this-file () (interactive) (save-current-buffer) (async-shell-command (concat (buffer-file-name (current-buffer)) " --test")))
  7. (define-module (fcp)
  8. #:export
  9. (main
  10. message-create message-task message-type message-data message-fields
  11. message-client-get message-client-get-realtime message-client-get-bulk
  12. message-client-put message-client-put-realtime message-client-put-bulk
  13. message-remove-request
  14. send-message processor-put! processor-delete!
  15. printing-passthrough-processor printing-discarding-processor
  16. discarding-processor processor-nodehello-printer
  17. processor-datafound-getdata
  18. node-ip-set! node-port-set!
  19. task-id
  20. call-with-fcp-connection with-fcp-connection))
  21. (define version "0.0.0 just-do-it")
  22. (import
  23. (only (srfi srfi-19) current-date date->string string->date date->time-utc time-utc->date
  24. make-time time-utc time-duration add-duration current-time)
  25. (only (securepassword) letterblocks-nice)
  26. (only (srfi srfi-9) define-record-type)
  27. (only (srfi srfi-9 gnu) set-record-type-printer!)
  28. (only (ice-9 pretty-print) pretty-print truncated-print)
  29. (only (ice-9 rdelim) read-line read-delimited)
  30. (only (ice-9 format) format)
  31. (only (srfi srfi-1) first second third alist-cons assoc lset<= lset-intersection lset-difference take)
  32. (only (rnrs bytevectors) make-bytevector bytevector-length string->utf8 utf8->string bytevector?)
  33. (only (rnrs io ports) get-bytevector-all get-bytevector-n
  34. put-bytevector bytevector->string port-eof?)
  35. (only (ice-9 popen) open-output-pipe)
  36. (only (ice-9 regex) string-match match:substring)
  37. (ice-9 threads)
  38. (ice-9 atomic)
  39. (only (ice-9 q) make-q enq! deq! q-empty?)
  40. (sxml simple)
  41. (doctests))
  42. (define (string-replace-string s char replacement-string)
  43. (string-join (string-split s char) replacement-string))
  44. (define (replace-KSK-escaped s)
  45. (string-replace-string (string-replace-string s #\+ "-")
  46. #\= "-"))
  47. (define (task-id)
  48. (replace-KSK-escaped (letterblocks-nice 6)))
  49. ;; the shared FCP socket
  50. (define sock #f)
  51. (define ip "127.0.0.1")
  52. (define port "9483")
  53. (define (fcp-socket-create)
  54. (define addrs (getaddrinfo ip port))
  55. (define addr (first addrs))
  56. (define s (socket (addrinfo:fam addr) (addrinfo:socktype addr) (addrinfo:protocol addr)))
  57. (connect s (addrinfo:addr addr))
  58. s)
  59. (define-record-type <message>
  60. (message-create task type data fields )
  61. message?
  62. (task message-task)
  63. (type message-type)
  64. (data message-data)
  65. (fields message-fields ));; avoid duplicates: fred joins duplicate fields with ";" to a single value
  66. ;; use a custom printer which avoids printing the full data
  67. (set-record-type-printer! <message>
  68. (lambda (record port)
  69. (format port "#<<message> task: ~A type: ~A data: ~a, fields: ~A"
  70. (message-task record)
  71. (message-type record)
  72. (if (bytevector? (message-data record))
  73. (format #f "length=~a" (bytevector-length (message-data record)))
  74. (message-data record))
  75. (message-fields record))))
  76. (define (format-field field)
  77. (format #f "~a=~a"
  78. (car field)
  79. (cdr field)))
  80. (define (join-fields fields)
  81. #((tests (test-equal "A=B\nX=V" (join-fields (list (cons 'A "B") (cons 'X 'V))))))
  82. (string-join
  83. (map format-field fields)
  84. "\n"))
  85. (define field-key car)
  86. (define field-value cdr)
  87. (define (field-split s)
  88. (let ((where (string-index s #\=)))
  89. (if where
  90. (cons
  91. (string->symbol (substring/shared s 0 where))
  92. (substring/shared s (+ where 1) (string-length s)))
  93. (cons s ""))))
  94. (define (write-message message sock)
  95. (display (message-type message) sock)
  96. (newline sock)
  97. (when (message-task message)
  98. (format sock "Identifier=~a\n"
  99. (message-task message)))
  100. (when (not (null? (message-fields message)))
  101. (display (join-fields (message-fields message))
  102. sock)
  103. (newline sock))
  104. (cond
  105. ((message-data message)
  106. (format sock "~a\n"
  107. (format-field (cons 'DataLength (bytevector-length (message-data message)))))
  108. (format sock "Data\n")
  109. (put-bytevector sock (message-data message)))
  110. (else
  111. (display 'EndMessage sock)
  112. (newline sock)))
  113. (atomic-box-set! sending-message #f)
  114. ;; avoid overloading the node ;; FIXME: is this actually needed? Just added because it might fix crashes.
  115. (usleep 1000 ));; max of 1000 messages per second
  116. (define (message-client-hello)
  117. (message-create #f 'ClientHello #f
  118. (list (cons 'Name "FetchpullClient" )
  119. (cons 'ExpectedVersion "2.0"))))
  120. (define (message-watch-global)
  121. (message-create #f 'WatchGlobal #f
  122. (list (cons 'Enabled "true" )
  123. (cons 'VerbosityMask 0 ))));; simple progress
  124. (define (message-disconnect)
  125. (message-create #f 'Disconnect #f
  126. (list)))
  127. (define (message-client-get task URI custom-fields)
  128. ;; https://github.com/freenet/wiki/wiki/FCPv2-ClientGet
  129. (message-create task 'ClientGet #f
  130. (append
  131. (list (cons 'URI URI))
  132. '((Verbosity . 0 );; only be informed when the download is finished
  133. (ReturnType . direct)
  134. (Global . true)
  135. (Persistence . reboot))
  136. custom-fields)))
  137. (define (message-client-get-realtime task URI)
  138. (message-client-get task URI
  139. '(
  140. (PriorityClass . 2)
  141. (RealTimeFlag . true)
  142. (FilterData . false)
  143. (MaxRetries . 0))))
  144. (define (message-client-get-bulk task URI)
  145. (message-client-get task URI
  146. '(
  147. (PriorityClass . 3 );; medium
  148. (RealTimeFlag . false)
  149. (FilterData . false)
  150. (MaxRetries . 1 ))));; -1 means: try indefinitely, with ULPR, essentially long polling
  151. (define (message-client-put task URI data custom-fields)
  152. ;; https://github.com/freenet/wiki/wiki/FCPv2-ClientPut
  153. (message-create task 'ClientPut data
  154. (append
  155. (list (cons 'URI URI))
  156. `((Global . true)
  157. (Persistence . reboot)
  158. (UploadFrom . direct))
  159. custom-fields)))
  160. (define (message-client-put-realtime task URI data)
  161. (message-client-put task URI data
  162. '(
  163. (PriorityClass . 2)
  164. (MaxRetries . 0 );; default: 10
  165. (RealTimeFlag . true)
  166. (DontCompress . true)
  167. (ExtraInsertsSingleBlock . 0)
  168. (ExtraInsertsSplitfileHeaderBlock . 0))))
  169. ;; for realtime do NOT send Metadata.ContentType (or set it
  170. ;; to "" -> Metadata.isTrivial()), else you force at least
  171. ;; one level redirect.
  172. (define (message-client-put-bulk task URI data)
  173. (message-client-put task URI data
  174. '(
  175. (PriorityClass . 3 );; medium
  176. (RealTimeFlag . false)
  177. (DontCompress . false))))
  178. (define (message-remove-request task)
  179. (message-create task 'RemoveRequest #f
  180. (list (cons 'Global 'true))))
  181. (define supported-messages
  182. '(NodeHello GetFailed DataFound AllData PutSuccessful PutFailed))
  183. (define ignored-messages ;; TODO: implement support for these messages
  184. '(CompatibilityMode ExpectedDataLength ExpectedHashes ExpectedMIME PersistentGet PersistentPut SendingToNetwork SimpleProgress URIGenerated PersistentRequestRemoved))
  185. (define (log-warning message things)
  186. (format (current-output-port)
  187. "Warning: ~a: ~a\n" message things))
  188. (define (read-message port)
  189. (if (or (port-closed? port) (port-eof? port))
  190. #f
  191. (let loop ((type (string->symbol (read-line port))))
  192. (define DataLength #f)
  193. (define task #f)
  194. (let readlines ((lines (list (read-line port))))
  195. (define line (first lines))
  196. (define field (field-split line))
  197. (when (equal? 'DataLength (field-key field))
  198. (set! DataLength
  199. (field-value field)))
  200. (when (equal? 'Identifier (field-key field))
  201. (set! task
  202. (field-value field)))
  203. ;; pretty-print : list 'line line 'type type
  204. (cond
  205. ((string-index line #\=)
  206. (readlines (cons (read-line port) lines)))
  207. ((member type supported-messages)
  208. (let
  209. (
  210. (data ;; EndMessage has no Data
  211. (if (and DataLength (not (equal? "EndMessage" line)))
  212. (get-bytevector-n port (string->number DataLength))
  213. #f)))
  214. (message-create task type data
  215. (map field-split (cdr lines)))))
  216. (else
  217. (when (not (member type ignored-messages))
  218. (log-warning "unsupported message type" (list type lines)))
  219. (if (port-eof? port)
  220. #f
  221. (loop (string->symbol (read-line port))))))))))
  222. (define next-message
  223. (make-atomic-box #f))
  224. (define sending-message
  225. (make-atomic-box #f))
  226. (define (send-message message)
  227. ;; wait until the message was retrieved. This only replaces if the previous content was #f. take-message-to-send switches takes the messages
  228. (let try ((failed (atomic-box-compare-and-swap! next-message #f message)))
  229. (when failed
  230. (usleep 100)
  231. (try (atomic-box-compare-and-swap! next-message #f message)))))
  232. (define (take-message-to-send)
  233. ;; get the message and reset next-message to #f to allow taking another message
  234. (atomic-box-set! sending-message #t );; set to false again after successful write-message
  235. (atomic-box-swap! next-message #f))
  236. (define message-processors
  237. (make-atomic-box (list)))
  238. (define (process message)
  239. (let loop ((processors (atomic-box-ref message-processors)) (msg message))
  240. (cond
  241. ((not msg)
  242. #f)
  243. ((null? processors)
  244. msg)
  245. (else
  246. (loop (cdr processors)
  247. ((first processors) msg))))))
  248. (define (processor-put! processor)
  249. (let loop ((old (atomic-box-ref message-processors)))
  250. (define old-now (atomic-box-compare-and-swap! message-processors old (cons processor old)))
  251. (when (not (equal? old old-now))
  252. (loop (atomic-box-ref message-processors)))))
  253. (define (processor-delete! processor)
  254. (let loop ((old (atomic-box-ref message-processors)))
  255. (define old-now (atomic-box-compare-and-swap! message-processors old (delete processor old)))
  256. (when (not (equal? old old-now))
  257. (loop (atomic-box-ref message-processors)))))
  258. (define stop-fcp-threads #f)
  259. (define (fcp-read-loop sock)
  260. (let loop ((message (read-message sock)))
  261. (when message
  262. (warn-unhandled
  263. (process message)))
  264. (usleep 10)
  265. (when (not stop-fcp-threads)
  266. (loop (read-message sock)))))
  267. (define (fcp-write-loop sock)
  268. (let loop ((message (take-message-to-send)))
  269. (if message
  270. (write-message message sock)
  271. (begin
  272. (atomic-box-set! sending-message #f)
  273. (usleep 10)))
  274. (when (not stop-fcp-threads)
  275. (loop (take-message-to-send)))))
  276. (define (warn-unhandled message)
  277. (when message
  278. (format #t ;; avoid writing to the error port elsewhere, that causes multithreading problems. Use current-output-port instead
  279. "Unhandled message ~a: ~A\n"
  280. (message-type message)
  281. message))
  282. #f)
  283. (define (printing-passthrough-processor message)
  284. (pretty-print message)
  285. message)
  286. (define (printing-discarding-processor message)
  287. (pretty-print message)
  288. #f)
  289. (define (discarding-processor message)
  290. #f)
  291. (define (processor-nodehello-printer message)
  292. (cond
  293. ((equal? 'NodeHello (message-type message))
  294. (pretty-print message)
  295. #f)
  296. (else message)))
  297. (define (help args)
  298. (format (current-output-port)
  299. "~a [-i] [--help | --version | --test | YYYY-mm-dd]
  300. Options:
  301. -i load the script and run an interactive REPL."
  302. (first args)))
  303. ;; timing information (alists)
  304. (define get-successful (list))
  305. (define get-failed (list))
  306. (define put-successful (list))
  307. (define put-failed (list))
  308. (define get-alldata (list )); the actual data, for debugging
  309. (define all-found-data-tasks (list))
  310. (define (processor-datafound-getdata message)
  311. (cond
  312. ((equal? 'DataFound (message-type message))
  313. (pretty-print message)
  314. (when (not (member (message-task message) all-found-data-tasks))
  315. (send-message
  316. (message-create (message-task message)
  317. 'GetRequestStatus #f
  318. (list (cons 'Global 'true))))
  319. (set! all-found-data-tasks
  320. (cons (message-task message)
  321. (take all-found-data-tasks (min 100 (length all-found-data-tasks))))))
  322. #f)
  323. (else message)))
  324. (define (processor-record-datafound-time message)
  325. (cond
  326. ((equal? 'DataFound (message-type message))
  327. (let ((task (message-task message)))
  328. (when (not (assoc task get-successful ));; only add if not yet known
  329. (set! get-successful
  330. (alist-cons task (current-time-seconds) get-successful))))
  331. #f)
  332. (else message)))
  333. (define (current-time-seconds)
  334. (car (gettimeofday)))
  335. (define (processor-record-alldata-time message)
  336. (cond
  337. ((equal? 'AllData (message-type message))
  338. (let ((task (message-task message)))
  339. (when (not (assoc task get-successful ));; only add if not yet known
  340. (set! get-successful
  341. (alist-cons task (current-time-seconds) get-successful))))
  342. #f)
  343. (else message)))
  344. (define (processor-record-getfailed-time message)
  345. (cond
  346. ((equal? 'GetFailed (message-type message))
  347. (let ((task (message-task message)))
  348. (when (not (assoc task get-failed ));; only add if not yet known
  349. (set! get-failed
  350. (alist-cons task (current-time-seconds) get-failed))))
  351. #f)
  352. (else message)))
  353. (define (processor-record-putfailed-time message)
  354. (cond
  355. ((equal? 'PutFailed (message-type message))
  356. (let ((task (message-task message)))
  357. (when (not (assoc task put-failed ));; only add if not yet known
  358. (set! put-failed
  359. (alist-cons task (current-time-seconds) put-failed))))
  360. #f)
  361. (else message)))
  362. (define (processor-record-putsuccessful-time message)
  363. (cond
  364. ((equal? 'PutSuccessful (message-type message))
  365. (let ((task (message-task message)))
  366. (when (not (assoc task put-successful ));; only add if not yet known
  367. (set! put-successful
  368. (alist-cons task (current-time-seconds) put-successful))))
  369. #f)
  370. (else message)))
  371. (define (processor-record-identifier-collision-put-time message)
  372. (cond
  373. ((equal? 'IdentifierCollision (message-type message))
  374. (let ((task (message-task message)))
  375. (when (not (assoc task put-failed ));; only add if not yet known
  376. (set! put-failed
  377. (alist-cons task (current-time-seconds) put-failed))))
  378. #f)
  379. (else message)))
  380. (define-record-type <duration-entry>
  381. (duration-entry key duration successful operation mode)
  382. timing-entry?
  383. (key duration-entry-key)
  384. (duration duration-entry-duration)
  385. (successful duration-entry-success)
  386. (operation duration-entry-operation );; get or put
  387. (mode duration-entry-mode ));; realtime bulk speehacks
  388. (define timeout-seconds (* 3600 3 ));; 3 hours maximum wait time
  389. (define (timeout? timeout-seconds start-times)
  390. (and (not (null? start-times))
  391. (pair? (car start-times))
  392. (> (- (current-time-seconds) timeout-seconds)
  393. (cdr (car start-times)))))
  394. (define (remove-all-keys keys)
  395. (define (remove-key key)
  396. (send-message
  397. (message-remove-request key)))
  398. (map remove-key keys))
  399. (define %this-module (current-module))
  400. (define (test)
  401. (processor-put! printing-discarding-processor)
  402. (set! sock (fcp-socket-create))
  403. (let
  404. (
  405. (fcp-read-thread
  406. (begin-thread
  407. (fcp-read-loop sock)))
  408. (fcp-write-thread
  409. (begin-thread
  410. (fcp-write-loop sock))))
  411. (send-message (message-client-hello))
  412. (send-message (message-watch-global))
  413. (send-message (message-client-get-realtime (letterblocks-nice 6) "USK@N82omidQlapADLWIym1u4rXvEQhjoIFbMa5~p1SKoOY,LE3WlYKas1AIdoVX~9wahrTlV5oZYhvJ4AcYYGsBq-w,AQACAAE/irclogs/772/2018-11-23.weechatlog"))
  414. (sleep 30)
  415. (send-message (message-disconnect))
  416. (doctests-testmod %this-module)
  417. (join-thread fcp-write-thread (+ 30 (current-time-seconds)))
  418. (join-thread fcp-read-thread (+ 30 (current-time-seconds)))
  419. (processor-delete! printing-discarding-processor)
  420. (close sock)))
  421. (define (call-with-fcp-connection thunk)
  422. (set! sock (fcp-socket-create))
  423. (set! stop-fcp-threads #f)
  424. (let
  425. (
  426. (fcp-read-thread
  427. (begin-thread
  428. (fcp-read-loop sock)))
  429. (fcp-write-thread
  430. (begin-thread
  431. (fcp-write-loop sock))))
  432. (send-message (message-client-hello))
  433. (send-message (message-watch-global))
  434. (thunk)
  435. (while (or (atomic-box-ref next-message) (atomic-box-ref sending-message))
  436. (format #t "waiting for message to be sent: next-message: ~a , sending: ~a\n" (atomic-box-ref next-message) (atomic-box-ref sending-message))
  437. (sleep 1))
  438. (send-message (message-disconnect))
  439. (set! stop-fcp-threads #t)
  440. (sleep 3)
  441. (close sock)
  442. (join-thread fcp-write-thread (+ 3 (current-time-seconds)))
  443. (join-thread fcp-read-thread (+ 3 (current-time-seconds)))))
  444. ;; FIXME: using new fcp connections in sequential code-parts fails with
  445. ;; ERROR: In procedure display: Wrong type argument in position 2: #<closed: file 7f106e118770>
  446. ;; ERROR: In procedure fport_read: Die Verbindung wurde vom Kommunikationspartner zurückgesetzt
  447. ;; therefore you should only use a single FCP connection for your program.
  448. (define-syntax-rule (with-fcp-connection exp ...)
  449. (call-with-fcp-connection
  450. (λ () exp ...)))
  451. (define (final-action? args)
  452. (if (<= (length args) 1) #f
  453. (cond
  454. ((equal? "--help" (second args))
  455. (help args)
  456. #t)
  457. ((equal? "--version" (second args))
  458. (format (current-output-port)
  459. "~a\n" version)
  460. #t)
  461. ((equal? "--test" (second args))
  462. (test)
  463. #t)
  464. (else #f))))
  465. (define (node-ip-set! node-ip)
  466. (set! ip node-ip))
  467. (define (node-port-set! node-port)
  468. (set! port node-port))
  469. (define (main args)
  470. (define put-task (task-id))
  471. (define get-task (task-id))
  472. (define key (string-append "KSK@" put-task))
  473. (define successful #f)
  474. (define (request-successful-upload message)
  475. (cond
  476. ((equal? 'PutSuccessful (message-type message))
  477. (let ((fields (message-fields message)))
  478. (when (and=> (assoc 'URI fields) (λ (uri) (equal? key (cdr uri))))
  479. (pretty-print message)
  480. (send-message
  481. (message-client-get-realtime get-task key))
  482. (send-message
  483. (message-remove-request (message-task message))))
  484. #f))
  485. (else message)))
  486. (define (record-successful-download message)
  487. (cond
  488. ((equal? 'AllData (message-type message))
  489. (let ((task (message-task message)))
  490. (when (equal? task get-task)
  491. (pretty-print message)
  492. (display "Data: ")
  493. (truncated-print (utf8->string (message-data message)))
  494. (newline)
  495. (set! successful #t)
  496. (send-message
  497. (message-remove-request task)))
  498. #f))
  499. (else message)))
  500. ;; standard processorrs
  501. (processor-put! printing-discarding-processor)
  502. (processor-put! processor-nodehello-printer)
  503. ;; immediately request data from successfull get requests
  504. (processor-put! processor-datafound-getdata)
  505. ;; custom processors
  506. (processor-put! request-successful-upload)
  507. (processor-put! record-successful-download)
  508. (when (not (final-action? args))
  509. (with-fcp-connection
  510. ;; get the ball rolling
  511. (send-message
  512. (message-client-put-realtime put-task key
  513. (string->utf8 (string-append "Hello " key))))
  514. (while (not successful)
  515. (display ".")
  516. (sleep 10)))))