api.go 8.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288
  1. // License: GPLv3 Copyright: 2023, Kovid Goyal, <kovid at kovidgoyal.net>
// First create a patcher with:
// p = NewPatcher(expected_size_of_file_to_update)
// Create a signature for the file you want to update by calling the iterator
// returned by p.CreateSignatureIterator(file_to_update, signature_output)
// until it returns io.EOF.
// Now create a Differ and load the created signature into it:
// d = NewDiffer()
// d.AddSignatureData(signature_data_from_previous_step)
// Now create a delta based on the signature and the reference file, calling
// the function returned by:
// d.CreateDelta(reference_file, delta_output)
// Finally, apply this delta using the patcher to produce a file identical to reference_file,
// based on the delta data and file_to_update:
// p.StartDelta(output_file, file_to_update)
// p.UpdateDelta(...)
// p.FinishDelta()
  16. package rsync
  17. import (
  18. "fmt"
  19. "io"
  20. "math"
  21. "kitty/tools/utils"
  22. )
  // Keeps the fmt import referenced regardless of what else the file uses.
var _ = fmt.Print

// MaxBlockSize is the largest block size accepted in a signature header.
// 1<<20 is the square root of 1 TiB.
const MaxBlockSize int = 1 << 20

// Algorithm identifier types stored as uint16 fields in a signature header.
type (
	// StrongHashType identifies the strong per-block hash.
	StrongHashType uint16
	// WeakHashType identifies the weak per-block hash.
	WeakHashType uint16
	// ChecksumType identifies the checksum algorithm.
	ChecksumType uint16
)

// The only identifier values currently defined. They are serialized into
// signature headers, so they are spelled out explicitly rather than via iota.
const (
	XXH3       StrongHashType = 0
	XXH3128Sum ChecksumType   = 0
	Rsync      WeakHashType   = 0
)

// GrowBufferFunction is a callback taking a byte slice and a size and
// returning a byte slice. It is not used in this file; presumably it returns
// slice grown to hold sz bytes — confirm against callers.
type GrowBufferFunction = func(slice []byte, sz int) []byte
  38. type Api struct {
  39. rsync rsync
  40. signature []BlockHash
  41. Checksum_type ChecksumType
  42. Strong_hash_type StrongHashType
  43. Weak_hash_type WeakHashType
  44. }
  45. type Differ struct {
  46. Api
  47. unconsumed_signature_data []byte
  48. }
  49. type Patcher struct {
  50. Api
  51. unconsumed_delta_data []byte
  52. expected_input_size_for_signature_generation int64
  53. delta_output io.Writer
  54. delta_input io.ReadSeeker
  55. total_data_in_delta int
  56. }
  57. // internal implementation {{{
  58. func (self *Api) read_signature_header(data []byte) (consumed int, err error) {
  59. if len(data) < 12 {
  60. return -1, io.ErrShortBuffer
  61. }
  62. if version := bin.Uint16(data); version != 0 {
  63. return consumed, fmt.Errorf("Invalid version in signature header: %d", version)
  64. }
  65. switch csum := ChecksumType(bin.Uint16(data[2:])); csum {
  66. case XXH3128Sum:
  67. self.Checksum_type = XXH3128Sum
  68. self.rsync.SetChecksummer(new_xxh3_128)
  69. default:
  70. return consumed, fmt.Errorf("Invalid checksum_type in signature header: %d", csum)
  71. }
  72. switch strong_hash := StrongHashType(bin.Uint16(data[4:])); strong_hash {
  73. case XXH3:
  74. self.Strong_hash_type = strong_hash
  75. self.rsync.SetHasher(new_xxh3_64)
  76. default:
  77. return consumed, fmt.Errorf("Invalid strong_hash in signature header: %d", strong_hash)
  78. }
  79. switch weak_hash := WeakHashType(bin.Uint16(data[6:])); weak_hash {
  80. case Rsync:
  81. self.Weak_hash_type = weak_hash
  82. default:
  83. return consumed, fmt.Errorf("Invalid weak_hash in signature header: %d", weak_hash)
  84. }
  85. block_size := int(bin.Uint32(data[8:]))
  86. consumed = 12
  87. if block_size == 0 {
  88. return consumed, fmt.Errorf("rsync signature header has zero block size")
  89. }
  90. if block_size > MaxBlockSize {
  91. return consumed, fmt.Errorf("rsync signature header has too large block size %d > %d", block_size, MaxBlockSize)
  92. }
  93. self.rsync.BlockSize = block_size
  94. self.signature = make([]BlockHash, 0, 1024)
  95. return
  96. }
  97. func (self *Api) read_signature_blocks(data []byte) (consumed int) {
  98. block_hash_size := self.rsync.HashSize() + 12
  99. for ; len(data) >= block_hash_size; data = data[block_hash_size:] {
  100. bl := BlockHash{}
  101. bl.Unserialize(data[:block_hash_size])
  102. self.signature = append(self.signature, bl)
  103. consumed += block_hash_size
  104. }
  105. return
  106. }
  107. func (self *Differ) FinishSignatureData() (err error) {
  108. if len(self.unconsumed_signature_data) > 0 {
  109. return fmt.Errorf("There were %d leftover bytes in the signature data", len(self.unconsumed_signature_data))
  110. }
  111. self.unconsumed_signature_data = nil
  112. if !self.rsync.HasHasher() {
  113. return fmt.Errorf("No header was found in the signature data")
  114. }
  115. return
  116. }
  117. func (self *Patcher) update_delta(data []byte) (consumed int, err error) {
  118. op := Operation{}
  119. for len(data) > 0 {
  120. n, uerr := op.Unserialize(data)
  121. if uerr == nil {
  122. consumed += n
  123. data = data[n:]
  124. if err = self.rsync.ApplyDelta(self.delta_output, self.delta_input, op); err != nil {
  125. return
  126. }
  127. if op.Type == OpData {
  128. self.total_data_in_delta += len(op.Data)
  129. }
  130. } else {
  131. if n < 0 {
  132. return consumed, nil
  133. }
  134. return consumed, uerr
  135. }
  136. }
  137. return
  138. }
  139. // }}}
  140. // Start applying serialized delta
  141. func (self *Patcher) StartDelta(delta_output io.Writer, delta_input io.ReadSeeker) {
  142. self.delta_output = delta_output
  143. self.delta_input = delta_input
  144. self.total_data_in_delta = 0
  145. self.unconsumed_delta_data = nil
  146. }
  147. // Apply a chunk of delta data
  148. func (self *Patcher) UpdateDelta(data []byte) (err error) {
  149. self.unconsumed_delta_data = append(self.unconsumed_delta_data, data...)
  150. consumed, err := self.update_delta(self.unconsumed_delta_data)
  151. if err != nil {
  152. return err
  153. }
  154. self.unconsumed_delta_data = utils.ShiftLeft(self.unconsumed_delta_data, consumed)
  155. return
  156. }
  157. // Finish applying delta data
  158. func (self *Patcher) FinishDelta() (err error) {
  159. if err = self.UpdateDelta([]byte{}); err != nil {
  160. return err
  161. }
  162. if len(self.unconsumed_delta_data) > 0 {
  163. return fmt.Errorf("There are %d leftover bytes in the delta", len(self.unconsumed_delta_data))
  164. }
  165. self.delta_input = nil
  166. self.delta_output = nil
  167. self.unconsumed_delta_data = nil
  168. if !self.rsync.checksum_done {
  169. return fmt.Errorf("The checksum was not received at the end of the delta data")
  170. }
  171. return
  172. }
  173. // Create a signature for the data source in src.
  174. func (self *Patcher) CreateSignatureIterator(src io.Reader, output io.Writer) func() error {
  175. var it func() (BlockHash, error)
  176. finished := false
  177. var b [BlockHashSize]byte
  178. return func() error {
  179. if finished {
  180. return io.EOF
  181. }
  182. if it == nil { // write signature header
  183. it = self.rsync.CreateSignatureIterator(src)
  184. bin.PutUint16(b[:], 0)
  185. bin.PutUint16(b[2:], uint16(self.Checksum_type))
  186. bin.PutUint16(b[4:], uint16(self.Strong_hash_type))
  187. bin.PutUint16(b[6:], uint16(self.Weak_hash_type))
  188. bin.PutUint32(b[8:], uint32(self.rsync.BlockSize))
  189. if _, err := output.Write(b[:12]); err != nil {
  190. return err
  191. }
  192. }
  193. bl, err := it()
  194. switch err {
  195. case io.EOF:
  196. finished = true
  197. return io.EOF
  198. case nil:
  199. bl.Serialize(b[:BlockHashSize])
  200. _, err = output.Write(b[:BlockHashSize])
  201. return err
  202. default:
  203. return err
  204. }
  205. }
  206. }
  207. // Create a serialized delta based on the previously loaded signature
  208. func (self *Differ) CreateDelta(src io.Reader, output io.Writer) func() error {
  209. if err := self.FinishSignatureData(); err != nil {
  210. return func() error { return err }
  211. }
  212. if self.signature == nil {
  213. return func() error {
  214. return fmt.Errorf("Cannot call CreateDelta() before loading a signature")
  215. }
  216. }
  217. return self.rsync.CreateDiff(src, self.signature, output)
  218. }
  219. func (self *Differ) BlockSize() int {
  220. return self.rsync.BlockSize
  221. }
  222. // Add more external signature data
  223. func (self *Differ) AddSignatureData(data []byte) (err error) {
  224. self.unconsumed_signature_data = append(self.unconsumed_signature_data, data...)
  225. if !self.rsync.HasHasher() {
  226. consumed, err := self.read_signature_header(self.unconsumed_signature_data)
  227. if err != nil {
  228. if consumed < 0 {
  229. return nil
  230. }
  231. return err
  232. }
  233. self.unconsumed_signature_data = utils.ShiftLeft(self.unconsumed_signature_data, consumed)
  234. }
  235. consumed := self.read_signature_blocks(self.unconsumed_signature_data)
  236. self.unconsumed_signature_data = utils.ShiftLeft(self.unconsumed_signature_data, consumed)
  237. return nil
  238. }
  239. // Use to calculate a delta based on a supplied signature, via AddSignatureData
  240. func NewDiffer() *Differ {
  241. return &Differ{}
  242. }
  243. // Use to create a signature and possibly apply a delta
  244. func NewPatcher(expected_input_size int64) (ans *Patcher) {
  245. bs := DefaultBlockSize
  246. sz := max(0, expected_input_size)
  247. if sz > 0 {
  248. bs = int(math.Round(math.Sqrt(float64(sz))))
  249. }
  250. ans = &Patcher{}
  251. ans.rsync.BlockSize = min(bs, MaxBlockSize)
  252. ans.rsync.SetHasher(new_xxh3_64)
  253. ans.rsync.SetChecksummer(new_xxh3_128)
  254. if ans.rsync.HashBlockSize() > 0 && ans.rsync.HashBlockSize() < ans.rsync.BlockSize {
  255. ans.rsync.BlockSize = (ans.rsync.BlockSize / ans.rsync.HashBlockSize()) * ans.rsync.HashBlockSize()
  256. }
  257. ans.expected_input_size_for_signature_generation = sz
  258. return
  259. }