// Algorithm found at: https://rsync.samba.org/tech_report/tech_report.html
// Code in this file is inspired by: https://github.com/jbreiding/rsync-go
//
// Definitions
//
// Source: The final content.
// Target: The content to be made into final content.
// Signature: The sequence of hashes used to identify the content.
package rsync

import (
	"bytes"
	"encoding/binary"
	"encoding/hex"
	"fmt"
	"hash"
	"io"
	"slices"
	"strconv"

	"github.com/zeebo/xxh3"
)

// If no BlockSize is specified in the rsync instance, this value is used.
const DefaultBlockSize = 1024 * 6

// Internal constant used in rolling checksum.
const _M = 1 << 16

// Operation Types.
type OpType byte // enum

const (
	OpBlock OpType = iota
	OpData
	OpHash
	OpBlockRange
)
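
// Note: OpType.String() is used below but not defined in this file; it is
// assumed to be provided elsewhere in the package (for example, generated by
// the stringer tool).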

// xxh3_128 wraps xxh3.Hasher so that Sum appends the full 128-bit digest
// (big-endian) instead of the 64-bit one.
type xxh3_128 struct {
	xxh3.Hasher
}

func (self *xxh3_128) Sum(b []byte) []byte {
	s := self.Sum128()
	pos := len(b)
	limit := pos + 16
	if limit > cap(b) {
		var x [16]byte
		b = append(b, x[:]...)
	} else {
		b = b[:limit]
	}
	binary.BigEndian.PutUint64(b[pos:], s.Hi)
	binary.BigEndian.PutUint64(b[pos+8:], s.Lo)
	return b
}

func new_xxh3_64() hash.Hash64 {
	ans := xxh3.New()
	ans.Reset()
	return ans
}

func new_xxh3_128() hash.Hash {
	ans := new(xxh3_128)
	ans.Reset()
	return ans
}

// Instruction to mutate target to align to source.
type Operation struct {
	Type          OpType
	BlockIndex    uint64
	BlockIndexEnd uint64
	Data          []byte
}

func (self Operation) String() string {
	ans := "{" + self.Type.String() + " "
	switch self.Type {
	case OpBlock:
		ans += strconv.FormatUint(self.BlockIndex, 10)
	case OpBlockRange:
		ans += strconv.FormatUint(self.BlockIndex, 10) + " to " + strconv.FormatUint(self.BlockIndexEnd, 10)
	case OpData:
		ans += strconv.Itoa(len(self.Data))
	case OpHash:
		ans += hex.EncodeToString(self.Data)
	}
	return ans + "}"
}

var bin = binary.LittleEndian
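
// Serialized operations start with a single type byte followed by a
// little-endian payload that depends on the type:
//
//	OpBlock:      8 byte block index
//	OpBlockRange: 8 byte start index + 4 byte count of blocks after the first
//	OpHash:       2 byte length + hash bytes
//	OpData:       4 byte length + data bytes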
func (self Operation) SerializeSize() int {
	switch self.Type {
	case OpBlock:
		return 9
	case OpBlockRange:
		return 13
	case OpHash:
		return 3 + len(self.Data)
	case OpData:
		return 5 + len(self.Data)
	}
	return -1
}

func (self Operation) Serialize(ans []byte) {
	switch self.Type {
	case OpBlock:
		bin.PutUint64(ans[1:], self.BlockIndex)
	case OpBlockRange:
		bin.PutUint64(ans[1:], self.BlockIndex)
		bin.PutUint32(ans[9:], uint32(self.BlockIndexEnd-self.BlockIndex))
	case OpHash:
		bin.PutUint16(ans[1:], uint16(len(self.Data)))
		copy(ans[3:], self.Data)
	case OpData:
		bin.PutUint32(ans[1:], uint32(len(self.Data)))
		copy(ans[5:], self.Data)
	}
	ans[0] = byte(self.Type)
}

func (self *Operation) Unserialize(data []byte) (n int, err error) {
	if len(data) < 1 {
		return -1, io.ErrShortBuffer
	}
	switch OpType(data[0]) {
	case OpBlock:
		n = 9
		if len(data) < n {
			return -1, io.ErrShortBuffer
		}
		self.BlockIndex = bin.Uint64(data[1:])
		self.Data = nil
	case OpBlockRange:
		n = 13
		if len(data) < n {
			return -1, io.ErrShortBuffer
		}
		self.BlockIndex = bin.Uint64(data[1:])
		self.BlockIndexEnd = self.BlockIndex + uint64(bin.Uint32(data[9:]))
		self.Data = nil
	case OpHash:
		n = 3
		if len(data) < n {
			return -1, io.ErrShortBuffer
		}
		sz := int(bin.Uint16(data[1:]))
		n += sz
		if len(data) < n {
			return -1, io.ErrShortBuffer
		}
		self.Data = data[3:n]
	case OpData:
		n = 5
		if len(data) < n {
			return -1, io.ErrShortBuffer
		}
		sz := int(bin.Uint32(data[1:]))
		n += sz
		if len(data) < n {
			return -1, io.ErrShortBuffer
		}
		self.Data = data[5:n]
	default:
		return 0, fmt.Errorf("record has unknown operation type: %d", data[0])
	}
	self.Type = OpType(data[0])
	return
}

// Signature hash item generated from target.
type BlockHash struct {
	Index      uint64
	WeakHash   uint32
	StrongHash uint64
}
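
// A serialized BlockHash is 20 bytes: an 8 byte index, a 4 byte weak hash and
// an 8 byte strong hash, all little-endian.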
const BlockHashSize = 20

// Put the serialization of this BlockHash to output
func (self BlockHash) Serialize(output []byte) {
	bin.PutUint64(output, self.Index)
	bin.PutUint32(output[8:], self.WeakHash)
	bin.PutUint64(output[12:], self.StrongHash)
}

func (self *BlockHash) Unserialize(data []byte) (err error) {
	if len(data) < 20 {
		return fmt.Errorf("record too small to be a BlockHash: %d < %d", len(data), 20)
	}
	self.Index = bin.Uint64(data)
	self.WeakHash = bin.Uint32(data[8:])
	self.StrongHash = bin.Uint64(data[12:])
	return
}

// Properties to use while working with the rsync algorithm.
// A single rsync should not be used concurrently as it may contain
// internal buffers and hash sums.
type rsync struct {
	BlockSize int
	// This must be non-nil before using any functions
	hasher                  hash.Hash64
	hasher_constructor      func() hash.Hash64
	checksummer_constructor func() hash.Hash
	checksummer             hash.Hash
	checksum_done           bool
	buffer                  []byte
}

func (r *rsync) SetHasher(c func() hash.Hash64) {
	r.hasher_constructor = c
	r.hasher = c()
}

func (r *rsync) SetChecksummer(c func() hash.Hash) {
	r.checksummer_constructor = c
	r.checksummer = c()
}

// If the target length is known the number of hashes in the
// signature can be determined.
func (r *rsync) BlockHashCount(targetLength int64) (count int64) {
	bs := int64(r.BlockSize)
	count = targetLength / bs
	if targetLength%bs != 0 {
		count++
	}
	return
}
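
// signature_iterator yields one BlockHash per block of the target, reading the
// target one block at a time.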
type signature_iterator struct {
	hasher hash.Hash64
	buffer []byte
	src    io.Reader
	rc     rolling_checksum
	index  uint64
}

// ans is valid iff err == nil
func (self *signature_iterator) next() (ans BlockHash, err error) {
	n, err := io.ReadAtLeast(self.src, self.buffer, cap(self.buffer))
	switch err {
	case io.ErrUnexpectedEOF, io.EOF, nil:
		err = nil
	default:
		return
	}
	if n == 0 {
		return ans, io.EOF
	}
	b := self.buffer[:n]
	self.hasher.Reset()
	self.hasher.Write(b)
	ans = BlockHash{Index: self.index, WeakHash: self.rc.full(b), StrongHash: self.hasher.Sum64()}
	self.index++
	return
}

// Calculate the signature of target.
func (r *rsync) CreateSignatureIterator(target io.Reader) func() (BlockHash, error) {
	return (&signature_iterator{
		hasher: r.hasher_constructor(), buffer: make([]byte, r.BlockSize), src: target,
	}).next
}

// Apply the difference to the target.
func (r *rsync) ApplyDelta(output io.Writer, target io.ReadSeeker, op Operation) error {
	var err error
	var n int
	var block []byte
	r.set_buffer_to_size(r.BlockSize)
	buffer := r.buffer
	if r.checksummer == nil {
		r.checksummer = r.checksummer_constructor()
	}
	write := func(b []byte) (err error) {
		if _, err = r.checksummer.Write(b); err == nil {
			_, err = output.Write(b)
		}
		return err
	}
	write_block := func(op Operation) (err error) {
		// multiply as int64 to avoid overflow for large block indices
		if _, err = target.Seek(int64(r.BlockSize)*int64(op.BlockIndex), io.SeekStart); err != nil {
			return err
		}
		if n, err = io.ReadAtLeast(target, buffer, r.BlockSize); err != nil && err != io.ErrUnexpectedEOF {
			return err
		}
		block = buffer[:n]
		return write(block)
	}
	switch op.Type {
	case OpBlockRange:
		for i := op.BlockIndex; i <= op.BlockIndexEnd; i++ {
			if err = write_block(Operation{
				Type:       OpBlock,
				BlockIndex: i,
			}); err != nil {
				return err
			}
		}
	case OpBlock:
		return write_block(op)
	case OpData:
		return write(op.Data)
	case OpHash:
		actual := r.checksummer.Sum(nil)
		if !bytes.Equal(actual, op.Data) {
			return fmt.Errorf("failed to verify overall file checksum: actual %s != expected %s. This usually happens if some data was corrupted in transit or one of the involved files was altered while the transfer was in progress.", hex.EncodeToString(actual), hex.EncodeToString(op.Data))
		}
		r.checksum_done = true
	}
	return nil
}

func (r *rsync) set_buffer_to_size(sz int) {
	if cap(r.buffer) < sz {
		r.buffer = make([]byte, sz)
	} else {
		r.buffer = r.buffer[:sz]
	}
}

// see https://rsync.samba.org/tech_report/node3.html
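// For a window X_k..X_l of the data the checksum is defined (mod M = 2^16) as
//
//	a(k,l) = sum of X_i for i in [k, l]
//	b(k,l) = sum of (l - i + 1) * X_i for i in [k, l]
//	s(k,l) = a(k,l) + M * b(k,l)
//
// which lets the checksum of the window shifted right by one byte be computed
// incrementally:
//
//	a(k+1, l+1) = (a(k,l) - X_k + X_{l+1}) mod M
//	b(k+1, l+1) = (b(k,l) - (l-k+1)*X_k + a(k+1, l+1)) mod M
//
// full() computes a and b from scratch, add_one_byte() applies the rolling
// update.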
type rolling_checksum struct {
	alpha, beta, val, l           uint32
	first_byte_of_previous_window uint32
}

func (self *rolling_checksum) full(data []byte) uint32 {
	var alpha, beta uint32
	self.l = uint32(len(data)) // actually should be len(data) - 1 but the equations always use l+1
	for i, b := range data {
		alpha += uint32(b)
		beta += (self.l - uint32(i)) * uint32(b)
	}
	self.first_byte_of_previous_window = uint32(data[0])
	self.alpha = alpha % _M
	self.beta = beta % _M
	self.val = self.alpha + _M*self.beta
	return self.val
}
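
// A minimal sketch of the intended use (buf and block_size here are
// hypothetical placeholders; see read_next below for the real call site):
//
//	rc.full(buf[0:block_size])               // checksum of the first window
//	rc.add_one_byte(buf[1], buf[block_size]) // window slid right by one byte
//
// The arguments are the first and last byte of the new, shifted window.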
func (self *rolling_checksum) add_one_byte(first_byte, last_byte byte) {
	self.alpha = (self.alpha - self.first_byte_of_previous_window + uint32(last_byte)) % _M
	self.beta = (self.beta - (self.l)*self.first_byte_of_previous_window + self.alpha) % _M
	self.val = self.alpha + _M*self.beta
	self.first_byte_of_previous_window = uint32(first_byte)
}

type diff struct {
	buffer       []byte
	op_write_buf [32]byte
	// A single β hash may correlate with many unique hashes.
	hash_lookup       map[uint32][]BlockHash
	source            io.Reader
	hasher            hash.Hash64
	checksummer       hash.Hash
	output            io.Writer
	window, data      struct{ pos, sz int }
	block_size        int
	finished, written bool
	rc                rolling_checksum
	pending_op        *Operation
}
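
// Next writes the next operation(s) in the delta stream to output. buffer
// holds a sliding region of source data: window marks the candidate block
// currently being matched against the signature, while data marks unmatched
// literal bytes waiting to be flushed as an OpData operation.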
func (self *diff) Next() (err error) {
	return self.pump_till_op_written()
}

func (self *diff) hash(b []byte) uint64 {
	self.hasher.Reset()
	self.hasher.Write(b)
	return self.hasher.Sum64()
}

// Combine OpBlock into OpBlockRange. To do this store the previous
// non-data operation and determine if it can be extended.
func (self *diff) send_pending() (err error) {
	if self.pending_op != nil {
		err = self.send_op(self.pending_op)
		self.pending_op = nil
	}
	return
}

func (self *diff) enqueue(op Operation) (err error) {
	switch op.Type {
	case OpBlock:
		if self.pending_op != nil {
			switch self.pending_op.Type {
			case OpBlock:
				if self.pending_op.BlockIndex+1 == op.BlockIndex {
					self.pending_op = &Operation{
						Type:          OpBlockRange,
						BlockIndex:    self.pending_op.BlockIndex,
						BlockIndexEnd: op.BlockIndex,
					}
					return
				}
			case OpBlockRange:
				if self.pending_op.BlockIndexEnd+1 == op.BlockIndex {
					self.pending_op.BlockIndexEnd = op.BlockIndex
					return
				}
			}
			if err = self.send_pending(); err != nil {
				return err
			}
		}
		self.pending_op = &op
	case OpHash:
		if err = self.send_pending(); err != nil {
			return
		}
		if err = self.send_op(&op); err != nil {
			return
		}
	}
	return
}

func (self *diff) send_op(op *Operation) error {
	b := self.op_write_buf[:op.SerializeSize()]
	op.Serialize(b)
	self.written = true
	_, err := self.output.Write(b)
	return err
}

func (self *diff) send_data() error {
	if self.data.sz > 0 {
		if err := self.send_pending(); err != nil {
			return err
		}
		self.written = true
		data := self.buffer[self.data.pos : self.data.pos+self.data.sz]
		var buf [5]byte
		bin.PutUint32(buf[1:], uint32(len(data)))
		buf[0] = byte(OpData)
		if _, err := self.output.Write(buf[:]); err != nil {
			return err
		}
		if _, err := self.output.Write(data); err != nil {
			return err
		}
		self.data.pos += self.data.sz
		self.data.sz = 0
	}
	return nil
}

func (self *diff) pump_till_op_written() error {
	self.written = false
	for !self.finished && !self.written {
		if err := self.read_next(); err != nil {
			return err
		}
	}
	if self.finished {
		if err := self.send_pending(); err != nil {
			return err
		}
		return io.EOF
	}
	return nil
}

func (self *diff) ensure_idx_valid(idx int) (ok bool, err error) {
	if idx < len(self.buffer) {
		return true, nil
	}
	if idx >= cap(self.buffer) {
		// need to wrap the buffer, so send off any data present behind the window
		if err = self.send_data(); err != nil {
			return
		}
		// copy the window and any data present after it to the start of the buffer
		distance_from_window_pos := idx - self.window.pos
		amt_to_copy := len(self.buffer) - self.window.pos
		copy(self.buffer, self.buffer[self.window.pos:self.window.pos+amt_to_copy])
		self.buffer = self.buffer[:amt_to_copy]
		self.window.pos = 0
		self.data.pos = 0
		return self.ensure_idx_valid(distance_from_window_pos)
	}
	extra := idx - len(self.buffer) + 1
	var n int
	n, err = io.ReadAtLeast(self.source, self.buffer[len(self.buffer):cap(self.buffer)], extra)
	block := self.buffer[len(self.buffer):][:n]
	switch err {
	case nil:
		ok = true
		self.buffer = self.buffer[:len(self.buffer)+n]
		self.checksummer.Write(block)
	case io.ErrUnexpectedEOF, io.EOF:
		err = nil
		self.buffer = self.buffer[:len(self.buffer)+n]
		self.checksummer.Write(block)
	}
	return
}

func (self *diff) finish_up() (err error) {
	if err = self.send_data(); err != nil {
		return
	}
	self.data.pos = self.window.pos
	self.data.sz = len(self.buffer) - self.window.pos
	if err = self.send_data(); err != nil {
		return
	}
	if err = self.enqueue(Operation{Type: OpHash, Data: self.checksummer.Sum(nil)}); err != nil {
		return
	}
	self.finished = true
	return
}

// See https://rsync.samba.org/tech_report/node4.html for the design of this algorithm
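// In outline: either start a fresh block-sized window or slide the existing
// window one byte to the right, look the rolling (weak) checksum up in
// hash_lookup and, on a hit, confirm the match with the strong hash. A
// confirmed match flushes any pending literal data and enqueues an OpBlock;
// otherwise the byte that fell behind the window accumulates as literal data.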
func (self *diff) read_next() (err error) {
	if self.window.sz > 0 {
		if ok, err := self.ensure_idx_valid(self.window.pos + self.window.sz); !ok {
			if err != nil {
				return err
			}
			return self.finish_up()
		}
		self.window.pos++
		self.data.sz++
		self.rc.add_one_byte(self.buffer[self.window.pos], self.buffer[self.window.pos+self.window.sz-1])
	} else {
		if ok, err := self.ensure_idx_valid(self.window.pos + self.block_size - 1); !ok {
			if err != nil {
				return err
			}
			return self.finish_up()
		}
		self.window.sz = self.block_size
		self.rc.full(self.buffer[self.window.pos : self.window.pos+self.window.sz])
	}
	found_hash := false
	var block_index uint64
	if hh, ok := self.hash_lookup[self.rc.val]; ok {
		block_index, found_hash = find_hash(hh, self.hash(self.buffer[self.window.pos:self.window.pos+self.window.sz]))
	}
	if found_hash {
		if err = self.send_data(); err != nil {
			return
		}
		if err = self.enqueue(Operation{Type: OpBlock, BlockIndex: block_index}); err != nil {
			return
		}
		self.window.pos += self.window.sz
		self.data.pos = self.window.pos
		self.window.sz = 0
	}
	return nil
}
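
// OperationWriter reassembles Operation values from the serialized stream
// produced by diff. It relies on diff writing an OpData record as two separate
// Write calls: first the five byte header, then the payload.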
type OperationWriter struct {
	Operations     []Operation
	expecting_data bool
}

func (self *OperationWriter) Write(p []byte) (n int, err error) {
	if self.expecting_data {
		self.expecting_data = false
		self.Operations = append(self.Operations, Operation{Type: OpData, Data: slices.Clone(p)})
	} else {
		switch OpType(p[0]) {
		case OpData:
			self.expecting_data = true
		case OpBlock, OpBlockRange, OpHash:
			op := Operation{}
			if n, err = op.Unserialize(p); err != nil {
				return 0, err
			} else if n < len(p) {
				return 0, io.ErrShortWrite
			}
			self.Operations = append(self.Operations, op)
		default:
			return 0, fmt.Errorf("unknown OpType: %d", p[0])
		}
	}
	// report the whole buffer as consumed so callers see a well-behaved io.Writer
	return len(p), nil
}

func (r *rsync) CreateDelta(source io.Reader, signature []BlockHash) ([]Operation, error) {
	w := OperationWriter{}
	it := r.CreateDiff(source, signature, &w)
	for {
		if err := it(); err != nil {
			if err == io.EOF {
				return w.Operations, nil
			}
			return nil, err
		}
	}
}
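
// The diff buffer is sized to hold several blocks at once, which lets
// unmatched literal data accumulate before it is flushed as a single OpData
// operation.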
const DataSizeMultiple int = 8

func (r *rsync) CreateDiff(source io.Reader, signature []BlockHash, output io.Writer) func() error {
	ans := &diff{
		block_size: r.BlockSize, buffer: make([]byte, 0, (r.BlockSize * DataSizeMultiple)),
		hash_lookup: make(map[uint32][]BlockHash, len(signature)),
		source: source, hasher: r.hasher_constructor(),
		checksummer: r.checksummer_constructor(), output: output,
	}
	for _, h := range signature {
		key := h.WeakHash
		ans.hash_lookup[key] = append(ans.hash_lookup[key], h)
	}
	return ans.Next
}
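
// A typical round trip, sketched. Error handling is elided, old_target,
// new_source, target_seeker and patched are placeholder readers/writers, and r
// is an rsync value configured via SetHasher(new_xxh3_64) and
// SetChecksummer(new_xxh3_128):
//
//	var sig []BlockHash
//	next := r.CreateSignatureIterator(old_target) // hash the existing content
//	for bh, err := next(); err == nil; bh, err = next() {
//		sig = append(sig, bh)
//	}
//	ops, _ := r.CreateDelta(new_source, sig) // delta that turns target into source
//	for _, op := range ops {
//		_ = r.ApplyDelta(patched, target_seeker, op) // rebuild source from target
//	}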

// Use a more unique way to identify a set of bytes.
func (r *rsync) hash(v []byte) uint64 {
	r.hasher.Reset()
	r.hasher.Write(v)
	return r.hasher.Sum64()
}

func (r *rsync) HashSize() int      { return r.hasher.Size() }
func (r *rsync) HashBlockSize() int { return r.hasher.BlockSize() }
func (r *rsync) HasHasher() bool    { return r.hasher != nil }

// Searches for a given strong hash among all strong hashes in this bucket.
func find_hash(hh []BlockHash, hv uint64) (uint64, bool) {
	for _, block := range hh {
		if block.StrongHash == hv {
			return block.Index, true
		}
	}
	return 0, false
}

func min(a, b int) int {
	if a < b {
		return a
	}
	return b
}