# pdiff.py

import asyncio
import collections
import os
import subprocess
import sys
import tempfile

import apt_pkg

from daklib.dakapt import DakHashes

HASH_FIELDS = [
    ("SHA1-History", 0, 1, "", True),
    ("SHA256-History", 0, 2, "", True),
    ("SHA1-Patches", 1, 1, "", True),
    ("SHA256-Patches", 1, 2, "", True),
    ("SHA1-Download", 2, 1, ".gz", True),
    ("SHA256-Download", 2, 2, ".gz", True),
    ("X-Unmerged-SHA1-History", 0, 1, "", False),
    ("X-Unmerged-SHA256-History", 0, 2, "", False),
    ("X-Unmerged-SHA1-Patches", 1, 1, "", False),
    ("X-Unmerged-SHA256-Patches", 1, 2, "", False),
    ("X-Unmerged-SHA1-Download", 2, 1, ".gz", False),
    ("X-Unmerged-SHA256-Download", 2, 2, ".gz", False),
]

HASH_FIELDS_TABLE = {x[0]: (x[1], x[2], x[4]) for x in HASH_FIELDS}

_PDiffHashes = collections.namedtuple("_PDiffHashes", ["size", "sha1", "sha256"])


async def asyncio_check_call(*args, **kwargs):
    """async variant of subprocess.check_call

    Parameters reflect that of asyncio.create_subprocess_exec or
    (if "shell=True") that of asyncio.create_subprocess_shell
    with restore_signals=True being the default.
    """
    kwargs.setdefault("restore_signals", True)
    shell = kwargs.pop("shell", False)
    if shell:
        proc = await asyncio.create_subprocess_shell(*args, **kwargs)
    else:
        proc = await asyncio.create_subprocess_exec(*args, **kwargs)
    retcode = await proc.wait()
    if retcode != 0:
        raise subprocess.CalledProcessError(retcode, args[0])
    return 0
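
# Illustrative usage sketch for asyncio_check_call (not part of this module's
# runtime flow; the file names are made up).  It mirrors subprocess.check_call:
# a non-zero exit status raises subprocess.CalledProcessError.
#
#     # exec-style call, like subprocess.check_call(["gzip", "-9", "somefile"])
#     await asyncio_check_call("gzip", "-9", "somefile")
#
#     # shell-style call with redirection
#     await asyncio_check_call("zcat Packages.gz > Packages", shell=True)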


async def open_decompressed(file, named_temp_file=False):
    async def call_decompressor(cmd, inpath):
        fh = (
            tempfile.NamedTemporaryFile("w+")
            if named_temp_file
            else tempfile.TemporaryFile("w+")
        )
        with open(inpath, "rb") as rfd:
            await asyncio_check_call(
                *cmd,
                stdin=rfd,
                stdout=fh,
            )
        fh.seek(0)
        return fh

    if os.path.isfile(file):
        return open(file, "r")
    elif os.path.isfile("%s.gz" % file):
        return await call_decompressor(["zcat"], "{}.gz".format(file))
    elif os.path.isfile("%s.bz2" % file):
        return await call_decompressor(["bzcat"], "{}.bz2".format(file))
    elif os.path.isfile("%s.xz" % file):
        return await call_decompressor(["xzcat"], "{}.xz".format(file))
    elif os.path.isfile(f"{file}.zst"):
        return await call_decompressor(["zstdcat"], f"{file}.zst")
    else:
        return None
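
# Illustrative usage sketch for open_decompressed (the path is made up).  The
# helper probes "<file>", then "<file>.gz", ".bz2", ".xz" and ".zst" in that
# order and returns a seekable file object with the decompressed content, or
# None when nothing exists:
#
#     fh = await open_decompressed("dists/unstable/main/binary-amd64/Packages")
#     if fh is not None:
#         with fh:
#             first_line = fh.readline()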


async def _merge_pdiffs(patch_a, patch_b, resulting_patch_without_extension):
    """Merge two pdiffs into one merged pdiff

    While rred supports merging more than two patches, we only need support
    for merging two.  In the steady state, we will have N merged patches plus
    1 new patch, and we need to do N pairwise merges (i.e. merge two patches
    N times).  Therefore, supporting merges of 3+ patches does not help at
    all.

    The setup state looks like it could do with a bulk merge.  However, if
    you merge from "latest to earliest", then you build in optimal order and
    still only need N-1 pairwise merges (rather than N-1 merges between
    N, N-1, N-2, ... 3, 2 patches).

    Combined, supporting pairwise merges is sufficient for our use case.
    """
    with await open_decompressed(
        patch_a, named_temp_file=True
    ) as fd_a, await open_decompressed(patch_b, named_temp_file=True) as fd_b:
        await asyncio_check_call(
            "/usr/lib/apt/methods/rred %s %s | gzip -9n > %s"
            % (fd_a.name, fd_b.name, resulting_patch_without_extension + ".gz"),
            shell=True,
        )


class PDiffHashes(_PDiffHashes):
    @classmethod
    def from_file(cls, fd):
        size = os.fstat(fd.fileno())[6]
        hashes = DakHashes(fd)
        return cls(size, hashes.sha1, hashes.sha256)


async def _pdiff_hashes_from_patch(path_without_extension):
    with await open_decompressed(path_without_extension) as difff:
        hashes_decompressed = PDiffHashes.from_file(difff)

    with open(path_without_extension + ".gz", "r") as difffgz:
        hashes_compressed = PDiffHashes.from_file(difffgz)

    return hashes_decompressed, hashes_compressed


def _prune_history(order, history, maximum):
    cnt = len(order)
    if cnt <= maximum:
        return order
    for h in order[: cnt - maximum]:
        del history[h]
    return order[cnt - maximum :]
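
# Worked example for _prune_history (hypothetical patch names): with
# order = ["p1", "p2", "p3"], a history dict keyed by those names and
# maximum = 2, the oldest entry "p1" is deleted from the dict in place and
# the function returns ["p2", "p3"], i.e. only the newest `maximum` patches
# survive.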


def _read_hashes(history, history_order, ind, hashind, lines):
    current_order = []
    for line in lines:
        parts = line.split()
        fname = parts[2]
        if fname.endswith(".gz"):
            fname = fname[:-3]
        current_order.append(fname)
        if fname not in history:
            history[fname] = [None, None, None]
        if not history[fname][ind]:
            history[fname][ind] = PDiffHashes(int(parts[1]), None, None)
        if hashind == 1:
            history[fname][ind] = PDiffHashes(
                history[fname][ind].size,
                parts[0],
                history[fname][ind].sha256,
            )
        else:
            history[fname][ind] = PDiffHashes(
                history[fname][ind].size,
                history[fname][ind].sha1,
                parts[0],
            )

    # Common case: This is the first sequence we read, so we simply adopt it.
    if not history_order:
        return current_order

    # Common case: The current history perfectly matches the existing one, so
    # we just stop here.
    if current_order == history_order:
        return history_order

    # Special case: the histories are not aligned.  This "should not happen",
    # but has done so in the past due to bugs.  Depending on which field is
    # out of sync, dak would either self-heal or be stuck forever.  We realign
    # the history to ensure we always end with "self-heal".
    #
    # Typically, the patches are aligned from the end, as we always add a
    # patch at the end of the series.
    patches_from_the_end = 0
    for p1, p2 in zip(reversed(current_order), reversed(history_order)):
        if p1 == p2:
            patches_from_the_end += 1
        else:
            break

    if not patches_from_the_end:
        return None

    return current_order[-patches_from_the_end:]
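
# Worked example of the realignment above (hypothetical patch names): if an
# earlier field produced history_order = ["p1", "p2", "p3"] but the current
# field lists ["p0", "p2", "p3"], the longest common suffix is ["p2", "p3"],
# so _read_hashes returns ["p2", "p3"] and the series can self-heal from
# there.  If the two orders share no common suffix at all, None is returned
# and the caller treats the history as unusable.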


class PDiffIndex:
    def __init__(self, patches_dir, max=56, merge_pdiffs=False):
        self.can_path = None
        self._history = {}
        self._history_order = []
        self._unmerged_history = {}
        self._unmerged_history_order = []
        self._old_merged_patches_prefix = []
        self.max = max
        self.patches_dir = patches_dir
        self.filesizehashes = None
        self.wants_merged_pdiffs = merge_pdiffs
        self.has_merged_pdiffs = False
        self.index_path = os.path.join(patches_dir, "Index")
        self.read_index_file(self.index_path)

    async def generate_and_add_patch_file(
        self, original_file, new_file_uncompressed, patch_name
    ):
        with await open_decompressed(original_file) as oldf:
            oldsizehashes = PDiffHashes.from_file(oldf)

            with open(new_file_uncompressed, "r") as newf:
                newsizehashes = PDiffHashes.from_file(newf)

            if newsizehashes == oldsizehashes:
                return

            if not os.path.isdir(self.patches_dir):
                os.mkdir(self.patches_dir)

            oldf.seek(0)
            patch_path = os.path.join(self.patches_dir, patch_name)
            with open("{}.gz".format(patch_path), "wb") as fh:
                await asyncio_check_call(
                    "diff --ed - {} | gzip --rsyncable --no-name -c -9".format(
                        new_file_uncompressed
                    ),
                    shell=True,
                    stdin=oldf,
                    stdout=fh,
                )

        difsizehashes, difgzsizehashes = await _pdiff_hashes_from_patch(patch_path)
        self.filesizehashes = newsizehashes

        self._unmerged_history[patch_name] = [
            oldsizehashes,
            difsizehashes,
            difgzsizehashes,
        ]
        self._unmerged_history_order.append(patch_name)

        if self.has_merged_pdiffs != self.wants_merged_pdiffs:
            # Convert patches
            if self.wants_merged_pdiffs:
                await self._convert_to_merged_patches()
            else:
                self._convert_to_unmerged()
            # The conversion above also covers the newly added patch, so
            # there is nothing further to do for it here.
        else:
            second_patch_name = patch_name
            if self.wants_merged_pdiffs:
                await self._bump_merged_patches()
                second_patch_name = "T-%s-F-%s" % (patch_name, patch_name)
                os.link(
                    os.path.join(self.patches_dir, patch_name + ".gz"),
                    os.path.join(self.patches_dir, second_patch_name + ".gz"),
                )

            # Without merged PDiffs, keep _history and _unmerged_history aligned.
            self._history[second_patch_name] = [
                oldsizehashes,
                difsizehashes,
                difgzsizehashes,
            ]
            self._history_order.append(second_patch_name)

    async def _bump_merged_patches(self):
        # When bumping patches, we need to "rewrite" all merged patches.  As
        # neither apt nor dak supports by-hash for pdiffs, we leave the old
        # versions of merged pdiffs behind.
        target_name = self._unmerged_history_order[-1]
        target_path = os.path.join(self.patches_dir, target_name)

        new_merged_order = []
        new_merged_history = {}
        for old_merged_patch_name in self._history_order:
            try:
                old_orig_name = old_merged_patch_name.split("-F-", 1)[1]
            except IndexError:
                old_orig_name = old_merged_patch_name

            new_merged_patch_name = "T-%s-F-%s" % (target_name, old_orig_name)
            old_merged_patch_path = os.path.join(
                self.patches_dir, old_merged_patch_name
            )
            new_merged_patch_path = os.path.join(
                self.patches_dir, new_merged_patch_name
            )
            await _merge_pdiffs(
                old_merged_patch_path, target_path, new_merged_patch_path
            )

            hashes_decompressed, hashes_compressed = await _pdiff_hashes_from_patch(
                new_merged_patch_path
            )

            new_merged_history[new_merged_patch_name] = [
                self._history[old_merged_patch_name][0],
                hashes_decompressed,
                hashes_compressed,
            ]
            new_merged_order.append(new_merged_patch_name)

        self._history_order = new_merged_order
        self._history = new_merged_history

        self._old_merged_patches_prefix.append(self._unmerged_history_order[-1])

    def _convert_to_unmerged(self):
        if not self.has_merged_pdiffs:
            return

        # Converting from merged patches to unmerged patches is simple:
        # discard the merged patches.  Cleanup of the files is handled by
        # find_obsolete_patches.
        self._history = {k: v for k, v in self._unmerged_history.items()}
        self._history_order = list(self._unmerged_history_order)

        self._old_merged_patches_prefix = []
        self.has_merged_pdiffs = False

    async def _convert_to_merged_patches(self):
        if self.has_merged_pdiffs:
            return

        target_name = self._unmerged_history_order[-1]

        self._history = {}
        self._history_order = []

        new_patches = []

        # We merge from newest to oldest.
        #
        # Assume we have N unmerged patches (u1 .. uN) where, given s1, you
        # can apply u1 to get to s2.  From s2 you use u2 to move to s3, and
        # so on until you reach the target T (= sN+1).
        #
        # In the merged-patch world, we want N merged patches called m1-N,
        # m2-N, m3-N ... m(N-1)-N.  Here, you use sX + mX-N to go to T
        # directly, regardless of where you start.
        #
        # A noteworthy special case is that m(N-1)-N is identical to uN
        # content-wise.  This will be important in a moment.  For now,
        # let's start by looking at how to create merged patches.
        #
        # We can get m1-N by merging u1 with m2-N, because u1 takes s1 to s2
        # and m2-N takes s2 to T.  By the same argument, we generate m2-N by
        # combining u2 with m3-N.  Rinse and repeat until we get to the base
        # case m(N-1)-N, which is uN.
        #
        # From this, we can conclude that generating the patches in reverse
        # order (i.e. m2-N is generated before m1-N) will get us the desired
        # result in N-1 pairwise merges, without having to use all patches in
        # one go.  (This is also optimal in the sense that we only need to
        # update N-1 patches to preserve the entire history.)
        #
        # A concrete N = 3 walk-through follows after this method.
        for patch_name in reversed(self._unmerged_history_order):
            merged_patch = "T-%s-F-%s" % (target_name, patch_name)
            merged_patch_path = os.path.join(self.patches_dir, merged_patch)

            if new_patches:
                oldest_patch = os.path.join(self.patches_dir, patch_name)
                previous_merged_patch = os.path.join(self.patches_dir, new_patches[-1])
                await _merge_pdiffs(
                    oldest_patch, previous_merged_patch, merged_patch_path
                )

                hashes_decompressed, hashes_compressed = await _pdiff_hashes_from_patch(
                    merged_patch_path
                )

                self._history[merged_patch] = [
                    self._unmerged_history[patch_name][0],
                    hashes_decompressed,
                    hashes_compressed,
                ]
            else:
                # Special case: the latest patch is its own "merged" variant.
                os.link(
                    os.path.join(self.patches_dir, patch_name + ".gz"),
                    merged_patch_path + ".gz",
                )
                self._history[merged_patch] = self._unmerged_history[patch_name]

            new_patches.append(merged_patch)

        self._history_order = list(reversed(new_patches))
        self._old_merged_patches_prefix.append(target_name)
        self.has_merged_pdiffs = True
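
    # Concrete walk-through of _convert_to_merged_patches for N = 3 unmerged
    # patches u1, u2, u3 (hypothetical names; the loop runs newest-first and
    # target_name is u3):
    #
    #   1. u3: no merged patch exists yet, so "T-u3-F-u3" is just a hard
    #      link to u3.gz (the "own merged variant" special case).
    #   2. u2: rred-merge u2 with "T-u3-F-u3" into "T-u3-F-u2", which takes
    #      s2 straight to the target.
    #   3. u1: rred-merge u1 with "T-u3-F-u2" into "T-u3-F-u1", which takes
    #      s1 straight to the target.
    #
    # That is N - 1 = 2 pairwise merges, matching the analysis in the comment
    # block inside the method above.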

    def read_index_file(self, index_file_path):
        try:
            with apt_pkg.TagFile(index_file_path) as index:
                index.step()
                section = index.section
                self.has_merged_pdiffs = section.get("X-Patch-Precedence") == "merged"
                self._old_merged_patches_prefix = section.get(
                    "X-DAK-Older-Patches", ""
                ).split()

                for field in section.keys():
                    value = section[field]

                    if field in HASH_FIELDS_TABLE:
                        ind, hashind, primary_history = HASH_FIELDS_TABLE[field]
                        if primary_history:
                            history = self._history
                            history_order = self._history_order
                        else:
                            history = self._unmerged_history
                            history_order = self._unmerged_history_order

                        if history_order is None:
                            # The history is already misaligned and we cannot
                            # find a common restore point.
                            continue

                        new_order = _read_hashes(
                            history, history_order, ind, hashind, value.splitlines()
                        )
                        if primary_history:
                            self._history_order = new_order
                        else:
                            self._unmerged_history_order = new_order
                        continue

                    if field in ("Canonical-Name", "Canonical-Path"):
                        self.can_path = value
                        continue

                    if field not in ("SHA1-Current", "SHA256-Current"):
                        continue

                    columns = value.split()
                    if len(columns) != 2:
                        continue

                    if not self.filesizehashes:
                        self.filesizehashes = PDiffHashes(int(columns[1]), None, None)

                    if field == "SHA1-Current":
                        self.filesizehashes = PDiffHashes(
                            self.filesizehashes.size,
                            columns[0],
                            self.filesizehashes.sha256,
                        )

                    if field == "SHA256-Current":
                        self.filesizehashes = PDiffHashes(
                            self.filesizehashes.size,
                            self.filesizehashes.sha1,
                            columns[0],
                        )

            # Ensure that the order lists are defined again.
            if self._history_order is None:
                self._history_order = []
            if self._unmerged_history_order is None:
                self._unmerged_history_order = []

            if not self.has_merged_pdiffs:
                # When X-Patch-Precedence != merged, the two histories are
                # the same.
                self._unmerged_history = {k: v for k, v in self._history.items()}
                self._unmerged_history_order = list(self._history_order)
                self._old_merged_patches_prefix = []
        except (OSError, apt_pkg.Error):
            # On error, we ignore everything.  This causes the file to be
            # regenerated from scratch.  It forces everyone who is behind to
            # download the full file.  But it is self-healing, provided that
            # we generate valid files from here on.
            pass

    def prune_patch_history(self):
        # Truncate our history if necessary
        hs = self._history
        order = self._history_order
        unmerged_hs = self._unmerged_history
        unmerged_order = self._unmerged_history_order
        self._history_order = _prune_history(order, hs, self.max)
        self._unmerged_history_order = _prune_history(
            unmerged_order, unmerged_hs, self.max
        )

        prefix_cnt = len(self._old_merged_patches_prefix)
        if prefix_cnt > 3:
            self._old_merged_patches_prefix = self._old_merged_patches_prefix[
                prefix_cnt - 3 :
            ]

    def find_obsolete_patches(self):
        if not os.path.isdir(self.patches_dir):
            return

        hs = self._history
        unmerged_hs = self._unmerged_history
        keep_prefixes = tuple("T-%s-F-" % x for x in self._old_merged_patches_prefix)

        # Scan for obsolete patches.  While we could have computed these from
        # the history, this method has the advantage of cleaning up old
        # patches that we previously failed to remove (e.g. if we had an
        # index corruption, which happened in fed7ada36b609 and was later
        # fixed in a36f867acf029).
        for name in os.listdir(self.patches_dir):
            if name in ("Index", "by-hash"):
                continue

            # We keep some old merged patches around (as neither apt nor
            # dak supports by-hash for pdiffs).
            if keep_prefixes and name.startswith(keep_prefixes):
                continue

            basename, ext = os.path.splitext(name)
            if ext in ("", ".gz") and (basename in hs or basename in unmerged_hs):
                continue

            path = os.path.join(self.patches_dir, name)
            if not os.path.isfile(path):
                # Non-files are probably not patches.
                continue

            # Unknown patch file; flag it as obsolete.
            yield path

    def dump(self, out=sys.stdout):
        if self.can_path:
            out.write("Canonical-Path: %s\n" % self.can_path)

        if self.filesizehashes:
            if self.filesizehashes.sha1:
                out.write(
                    "SHA1-Current: %s %7d\n"
                    % (self.filesizehashes.sha1, self.filesizehashes.size)
                )
            if self.filesizehashes.sha256:
                out.write(
                    "SHA256-Current: %s %7d\n"
                    % (self.filesizehashes.sha256, self.filesizehashes.size)
                )

        for fieldname, ind, hashind, ext, primary_history in HASH_FIELDS:
            if primary_history:
                hs = self._history
                order = self._history_order
            elif self.has_merged_pdiffs:
                hs = self._unmerged_history
                order = self._unmerged_history_order
            else:
                continue

            out.write("%s:\n" % fieldname)
            for h in order:
                if hs[h][ind] and hs[h][ind][hashind]:
                    out.write(
                        " %s %7d %s%s\n"
                        % (hs[h][ind][hashind], hs[h][ind].size, h, ext)
                    )

        if self.has_merged_pdiffs:
            out.write("X-Patch-Precedence: merged\n")
            if self._old_merged_patches_prefix:
                out.write(
                    "X-DAK-Older-Patches: %s\n"
                    % " ".join(self._old_merged_patches_prefix)
                )

    def update_index(self, tmp_suffix=".new"):
        if not os.path.isdir(self.patches_dir):
            # If there is no patch directory, then we have no patches.
            # It seems weird to have an Index of patches when we know there
            # are none.
            return

        tmp_path = self.index_path + tmp_suffix
        with open(tmp_path, "w") as f:
            self.dump(f)
        os.rename(tmp_path, self.index_path)
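

# Minimal usage sketch for PDiffIndex, assuming the caller (e.g. dak's
# generate-index-diffs machinery) supplies the paths; all names below are
# made up:
#
#     async def refresh(patches_dir, old_index_file, new_index_uncompressed):
#         index = PDiffIndex(patches_dir, max=56, merge_pdiffs=True)
#         await index.generate_and_add_patch_file(
#             old_index_file, new_index_uncompressed, "2024-01-01-0000.00"
#         )
#         index.prune_patch_history()
#         for obsolete in index.find_obsolete_patches():
#             os.unlink(obsolete)
#         index.update_index()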