graphics.py 51 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280
  1. #!/usr/bin/env python
  2. # License: GPL v3 Copyright: 2016, Kovid Goyal <kovid at kovidgoyal.net>
  3. import os
  4. import random
  5. import tempfile
  6. import time
  7. import unittest
  8. import zlib
  9. from contextlib import suppress
  10. from dataclasses import dataclass
  11. from io import BytesIO
  12. from kitty.fast_data_types import base64_decode, base64_encode, has_avx2, has_sse4_2, load_png_data, shm_unlink, shm_write, test_xor64
  13. from . import BaseTest, parse_bytes
  14. try:
  15. from PIL import Image
  16. except ImportError:
  17. Image = None
  18. def send_command(screen, cmd, payload=b''):
  19. cmd = '\033_G' + cmd
  20. if payload:
  21. if isinstance(payload, str):
  22. payload = payload.encode('utf-8')
  23. payload = base64_encode(payload).decode('ascii')
  24. cmd += ';' + payload
  25. cmd += '\033\\'
  26. c = screen.callbacks
  27. c.clear()
  28. parse_bytes(screen, cmd.encode('ascii'))
  29. return c.wtcbuf
  30. def parse_response(res):
  31. if not res:
  32. return
  33. return res.decode('ascii').partition(';')[2].partition('\033')[0]
  34. def parse_response_with_ids(res):
  35. if not res:
  36. return
  37. a, b = res.decode('ascii').split(';', 1)
  38. code = b.partition('\033')[0].split(':', 1)[0]
  39. a = a.split('G', 1)[1]
  40. return code, a
  41. @dataclass(frozen=True)
  42. class Response:
  43. code: str = 'OK'
  44. msg: str = ''
  45. image_id: int = 0
  46. image_number: int = 0
  47. frame_number: int = 0
  48. def parse_full_response(res):
  49. if not res:
  50. return
  51. a, b = res.decode('ascii').split(';', 1)
  52. code = b.partition('\033')[0].split(':', 1)
  53. if len(code) == 1:
  54. code = code[0]
  55. msg = ''
  56. else:
  57. code, msg = code
  58. a = a.split('G', 1)[1]
  59. ans = {'code': code, 'msg': msg}
  60. for x in a.split(','):
  61. k, _, v = x.partition('=')
  62. ans[{'i': 'image_id', 'I': 'image_number', 'r': 'frame_number'}[k]] = int(v)
  63. return Response(**ans)
  64. all_bytes = bytes(bytearray(range(256)))
  65. def byte_block(sz):
  66. d, m = divmod(sz, len(all_bytes))
  67. return (all_bytes * d) + all_bytes[:m]
  68. def load_helpers(self):
  69. s = self.create_screen()
  70. g = s.grman
  71. def pl(payload, **kw):
  72. kw.setdefault('i', 1)
  73. cmd = ','.join(f'{k}={v}' for k, v in kw.items())
  74. res = send_command(s, cmd, payload)
  75. return parse_response(res)
  76. def sl(payload, **kw):
  77. if isinstance(payload, str):
  78. payload = payload.encode('utf-8')
  79. data = kw.pop('expecting_data', payload)
  80. cid = kw.setdefault('i', 1)
  81. self.ae('OK', pl(payload, **kw))
  82. img = g.image_for_client_id(cid)
  83. self.assertIsNotNone(img, f'No image with id {cid} found')
  84. self.ae(img['client_id'], cid)
  85. self.ae(img['data'], data)
  86. if 's' in kw:
  87. self.ae((kw['s'], kw['v']), (img['width'], img['height']))
  88. self.ae(img['is_4byte_aligned'], kw.get('f') != 24)
  89. return img
  90. return s, g, pl, sl
  91. def put_helpers(self, cw, ch, cols=10, lines=5):
  92. iid = 0
  93. def create_screen():
  94. s = self.create_screen(cols, lines, cell_width=cw, cell_height=ch)
  95. return s, 2 / s.columns, 2 / s.lines
  96. def put_cmd(
  97. z=0, num_cols=0, num_lines=0, x_off=0, y_off=0, width=0, height=0, cell_x_off=0,
  98. cell_y_off=0, placement_id=0, cursor_movement=0, unicode_placeholder=0, parent_id=0,
  99. parent_placement_id=0, offset_from_parent_x=0, offset_from_parent_y=0,
  100. ):
  101. return (
  102. f'z={z},c={num_cols},r={num_lines},x={x_off},y={y_off},w={width},h={height},'
  103. f'X={cell_x_off},Y={cell_y_off},p={placement_id},C={cursor_movement},'
  104. f'U={unicode_placeholder},P={parent_id},Q={parent_placement_id},'
  105. f'H={offset_from_parent_x},V={offset_from_parent_y}'
  106. )
  107. def put_image(screen, w, h, **kw):
  108. nonlocal iid
  109. iid += 1
  110. imgid = kw.pop('id', None) or iid
  111. no_id = kw.pop('no_id', False)
  112. if no_id:
  113. cmd = 'a=T,f=24,s=%d,v=%d,%s' % (w, h, put_cmd(**kw))
  114. else:
  115. cmd = 'a=T,f=24,i=%d,s=%d,v=%d,%s' % (imgid, w, h, put_cmd(**kw))
  116. data = b'x' * w * h * 3
  117. res = send_command(screen, cmd, data)
  118. return imgid, parse_response(res)
  119. def put_ref(screen, **kw):
  120. imgid = kw.pop('id', None) or iid
  121. cmd = 'a=p,i=%d,%s' % (imgid, put_cmd(**kw))
  122. return imgid, parse_response_with_ids(send_command(screen, cmd))
  123. def layers(screen, scrolled_by=0, xstart=-1, ystart=1):
  124. return screen.grman.update_layers(scrolled_by, xstart, ystart, dx, dy, screen.columns, screen.lines, cw, ch)
  125. def rect_eq(r, left, top, right, bottom):
  126. for side in 'left top right bottom'.split():
  127. a, b = r[side], locals()[side]
  128. if abs(a - b) > 0.0001:
  129. self.ae(a, b, 'the %s side is not equal' % side)
  130. s, dx, dy = create_screen()
  131. return s, dx, dy, put_image, put_ref, layers, rect_eq
  132. def make_send_command(screen):
  133. def li(payload='abcdefghijkl'*3, s=4, v=3, f=24, a='f', i=1, **kw):
  134. if s:
  135. kw['s'] = s
  136. if v:
  137. kw['v'] = v
  138. if f:
  139. kw['f'] = f
  140. if i:
  141. kw['i'] = i
  142. kw['a'] = a
  143. cmd = ','.join(f'{k}={v}' for k, v in kw.items())
  144. res = send_command(screen, cmd, payload)
  145. return parse_full_response(res)
  146. return li
  147. class TestGraphics(BaseTest):
  148. def test_xor_data(self):
  149. base_data = b'\x01' * 64
  150. key = b'\x02' * 64
  151. sizes = []
  152. if has_sse4_2:
  153. sizes.append(2)
  154. if has_avx2:
  155. sizes.append(3)
  156. sizes.append(0)
  157. def t(key, data, align_offset=0):
  158. expected = test_xor64(key, data, 1, 0)
  159. for which_function in sizes:
  160. actual = test_xor64(key, data, which_function, align_offset)
  161. self.ae(expected, actual, f'{align_offset=} {len(data)=}')
  162. t(key, b'')
  163. for base in (b'abc', base_data):
  164. for extra in range(len(base_data)):
  165. for align_offset in range(64):
  166. data = base + base_data[:extra]
  167. t(key, data, align_offset)
  168. def test_disk_cache(self):
  169. s = self.create_screen()
  170. dc = s.grman.disk_cache
  171. dc.small_hole_threshold = 0
  172. data = {}
  173. def key_as_bytes(key):
  174. if isinstance(key, int):
  175. key = str(key)
  176. if isinstance(key, str):
  177. key = key.encode('utf-8')
  178. return bytes(key)
  179. def add(key, val):
  180. bkey = key_as_bytes(key)
  181. data[key] = key_as_bytes(val)
  182. dc.add(bkey, data[key])
  183. def remove(key):
  184. bkey = key_as_bytes(key)
  185. data.pop(key, None)
  186. return dc.remove(bkey)
  187. def check_data():
  188. for key, val in data.items():
  189. self.ae(dc.get(key_as_bytes(key)), val)
  190. def reset(small_hole_threshold=0, defrag_factor=2):
  191. nonlocal dc, data, s
  192. s = self.create_screen()
  193. dc = s.grman.disk_cache
  194. dc.small_hole_threshold = small_hole_threshold
  195. dc.defrag_factor = defrag_factor
  196. data = {}
  197. holes_to_create = 2, 4, 6, 8
  198. for i in range(25):
  199. self.assertIsNone(add(i, f'{i}' * i))
  200. if i <= max(holes_to_create):
  201. # We wait here to ensure data is written in order, otherwise the
  202. # holes test below can fail
  203. self.assertTrue(dc.wait_for_write())
  204. self.assertEqual(dc.total_size, sum(map(len, data.values())))
  205. self.assertTrue(dc.wait_for_write())
  206. check_data()
  207. sz = dc.size_on_disk()
  208. self.assertEqual(sz, sum(map(len, data.values())))
  209. self.assertFalse(dc.holes())
  210. holes = set()
  211. for x in holes_to_create:
  212. remove(x)
  213. holes.add(x)
  214. check_data()
  215. self.assertRaises(KeyError, dc.get, key_as_bytes(x))
  216. self.assertEqual(sz, dc.size_on_disk())
  217. self.assertEqual(holes, {x[1] for x in dc.holes()})
  218. self.assertEqual(sz, dc.size_on_disk())
  219. # fill holes largest first to ensure small one doesn't go into large accidentally causing fragmentation
  220. for i, x in enumerate(sorted(holes, reverse=True)):
  221. x = 'ABCDEFGH'[i] * x
  222. add(x, x)
  223. self.assertTrue(dc.wait_for_write())
  224. check_data()
  225. holes.discard(len(x))
  226. self.assertEqual(holes, {x[1] for x in dc.holes()})
  227. self.assertEqual(sz, dc.size_on_disk(), f'Disk cache has unexpectedly grown from {sz} to {dc.size_on_disk} with data: {x!r}')
  228. check_data()
  229. dc.clear()
  230. st = time.monotonic()
  231. while dc.size_on_disk() and time.monotonic() - st < 2:
  232. time.sleep(0.001)
  233. self.assertEqual(dc.size_on_disk(), 0)
  234. data.clear()
  235. for i in range(25):
  236. self.assertIsNone(add(i, f'{i}' * i))
  237. dc.wait_for_write()
  238. check_data()
  239. before = dc.size_on_disk()
  240. while dc.total_size > before // 3:
  241. key = random.choice(tuple(data))
  242. self.assertTrue(remove(key))
  243. check_data()
  244. add('trigger defrag', 'XXX')
  245. dc.wait_for_write()
  246. self.assertLess(dc.size_on_disk(), before)
  247. check_data()
  248. dc.clear()
  249. st = time.monotonic()
  250. while dc.size_on_disk() and time.monotonic() - st < 20:
  251. time.sleep(0.01)
  252. self.assertEqual(dc.size_on_disk(), 0)
  253. for frame in range(32):
  254. add(f'1:{frame}', f'{frame:02d}' * 8)
  255. dc.wait_for_write()
  256. self.assertEqual(dc.size_on_disk(), 32 * 16)
  257. self.assertEqual(dc.num_cached_in_ram(), 0)
  258. num_in_ram = 0
  259. for frame in range(32):
  260. dc.get(key_as_bytes(f'1:{frame}'))
  261. self.assertEqual(dc.num_cached_in_ram(), num_in_ram)
  262. for frame in range(32):
  263. dc.get(key_as_bytes(f'1:{frame}'), True)
  264. num_in_ram += 1
  265. self.assertEqual(dc.num_cached_in_ram(), num_in_ram)
  266. def clear_predicate(key):
  267. return key.startswith(b'1:')
  268. dc.remove_from_ram(clear_predicate)
  269. self.assertEqual(dc.num_cached_in_ram(), 0)
  270. reset(small_hole_threshold=512, defrag_factor=20)
  271. self.assertIsNone(add(1, '1' * 1024))
  272. self.assertIsNone(add(2, '2' * 1024))
  273. dc.wait_for_write()
  274. sz = dc.size_on_disk()
  275. remove(1)
  276. self.ae(sz, dc.size_on_disk())
  277. self.ae({x[1] for x in dc.holes()}, {1024})
  278. self.assertIsNone(add(3, '3' * 800))
  279. dc.wait_for_write()
  280. self.assertFalse(dc.holes())
  281. self.ae(sz, dc.size_on_disk())
  282. self.assertIsNone(add(4, '4' * 100))
  283. sz += 100
  284. dc.wait_for_write()
  285. self.ae(sz, dc.size_on_disk())
  286. check_data()
  287. self.assertFalse(dc.holes())
  288. remove(4)
  289. self.assertFalse(dc.holes())
  290. self.assertIsNone(add(5, '5' * 10))
  291. sz += 10
  292. dc.wait_for_write()
  293. self.ae(sz, dc.size_on_disk())
  294. # test hole coalescing
  295. reset(defrag_factor=20)
  296. for i in range(1, 6):
  297. self.assertIsNone(add(i, str(i)*i))
  298. dc.wait_for_write()
  299. remove(2)
  300. remove(4)
  301. self.assertEqual(dc.holes(), {(1, 2), (6, 4)})
  302. remove(3)
  303. self.assertEqual(dc.holes(), {(1, 9)})
  304. def test_suppressing_gr_command_responses(self):
  305. s, g, pl, sl = load_helpers(self)
  306. self.ae(pl('abcd', s=10, v=10, q=1), 'ENODATA:Insufficient image data: 4 < 400')
  307. self.ae(pl('abcd', s=10, v=10, q=2), None)
  308. self.assertIsNone(pl('abcd', s=1, v=1, a='q', q=1))
  309. # Test chunked load
  310. self.assertIsNone(pl('abcd', s=2, v=2, m=1, q=1))
  311. self.assertIsNone(pl('efgh', m=1))
  312. self.assertIsNone(pl('ijkl', m=1))
  313. self.assertIsNone(pl('mnop', m=0))
  314. # errors
  315. self.assertIsNone(pl('abcd', s=2, v=2, m=1, q=1))
  316. self.ae(pl('mnop', m=0), 'ENODATA:Insufficient image data: 8 < 16')
  317. self.assertIsNone(pl('abcd', s=2, v=2, m=1, q=2))
  318. self.assertIsNone(pl('mnop', m=0))
  319. # frames
  320. s = self.create_screen()
  321. li = make_send_command(s)
  322. self.assertEqual(li().code, 'ENOENT')
  323. self.assertIsNone(li(q=2))
  324. self.assertIsNone(li(a='t', q=1))
  325. self.assertIsNone(li(payload='2' * 12, z=77, m=1, q=1))
  326. self.assertIsNone(li(payload='2' * 12, m=1))
  327. self.assertIsNone(li(payload='2' * 12))
  328. self.assertIsNone(li(payload='2' * 12, z=77, m=1, q=1))
  329. self.ae(li(payload='2' * 12).code, 'ENODATA')
  330. self.assertIsNone(li(payload='2' * 12, z=77, m=1, q=2))
  331. self.assertIsNone(li(payload='2' * 12))
  332. def test_load_images(self):
  333. s, g, pl, sl = load_helpers(self)
  334. self.assertEqual(g.disk_cache.total_size, 0)
  335. # Test load query
  336. self.ae(pl('abcd', s=1, v=1, a='q'), 'OK')
  337. self.ae(g.image_count, 0)
  338. # Test simple load
  339. for f in 32, 24:
  340. p = 'abc' + ('d' if f == 32 else '')
  341. img = sl(p, s=1, v=1, f=f)
  342. self.ae(bool(img['is_4byte_aligned']), f == 32)
  343. # Test chunked load
  344. self.assertIsNone(pl('abcd', s=2, v=2, m=1))
  345. self.assertIsNone(pl('efgh', m=1))
  346. self.assertIsNone(pl('ijkl', m=1))
  347. self.ae(pl('mnop', m=0), 'OK')
  348. img = g.image_for_client_id(1)
  349. self.ae(img['data'], b'abcdefghijklmnop')
  350. random_data = byte_block(32 * 1024)
  351. sl(
  352. random_data,
  353. s=1024,
  354. v=8,
  355. expecting_data=random_data
  356. )
  357. # Test compression
  358. compressed_random_data = zlib.compress(random_data)
  359. sl(
  360. compressed_random_data,
  361. s=1024,
  362. v=8,
  363. o='z',
  364. expecting_data=random_data
  365. )
  366. # Test chunked + compressed
  367. b = len(compressed_random_data) // 2
  368. self.assertIsNone(pl(compressed_random_data[:b], s=1024, v=8, o='z', m=1))
  369. self.ae(pl(compressed_random_data[b:], m=0), 'OK')
  370. img = g.image_for_client_id(1)
  371. self.ae(img['data'], random_data)
  372. # Test loading from file
  373. def load_temp(prefix='tty-graphics-protocol-'):
  374. f = tempfile.NamedTemporaryFile(prefix=prefix)
  375. f.write(random_data), f.flush()
  376. sl(f.name, s=1024, v=8, t='f', expecting_data=random_data)
  377. self.assertTrue(os.path.exists(f.name))
  378. f.seek(0), f.truncate(), f.write(compressed_random_data), f.flush()
  379. sl(f.name, s=1024, v=8, t='t', o='z', expecting_data=random_data)
  380. return f
  381. f = load_temp()
  382. self.assertFalse(os.path.exists(f.name), f'Temp file at {f.name} was not deleted')
  383. with suppress(FileNotFoundError):
  384. f.close()
  385. f = load_temp('')
  386. self.assertTrue(os.path.exists(f.name), f'Temp file at {f.name} was deleted')
  387. f.close()
  388. # Test loading from POSIX SHM
  389. name = '/kitty-test-shm'
  390. shm_write(name, random_data)
  391. sl(name, s=1024, v=8, t='s', expecting_data=random_data)
  392. self.assertRaises(
  393. FileNotFoundError, shm_unlink, name
  394. ) # check that file was deleted
  395. s.reset()
  396. self.assertEqual(g.disk_cache.total_size, 0)
  397. @unittest.skipIf(Image is None, 'PIL not available, skipping PNG tests')
  398. def test_load_png(self):
  399. s, g, pl, sl = load_helpers(self)
  400. w, h = 5, 3
  401. rgba_data = byte_block(w * h * 4)
  402. img = Image.frombytes('RGBA', (w, h), rgba_data)
  403. rgb_data = img.convert('RGB').convert('RGBA').tobytes()
  404. self.assertEqual(g.disk_cache.total_size, 0)
  405. def png(mode='RGBA'):
  406. buf = BytesIO()
  407. i = img
  408. if mode != i.mode:
  409. i = img.convert(mode)
  410. i.save(buf, 'PNG')
  411. return buf.getvalue()
  412. for mode in 'RGBA RGB'.split():
  413. data = png(mode)
  414. sl(data, f=100, expecting_data=rgb_data if mode == 'RGB' else rgba_data)
  415. for m in 'LP':
  416. img = img.convert(m)
  417. rgba_data = img.convert('RGBA').tobytes()
  418. data = png(m)
  419. sl(data, f=100, expecting_data=rgba_data)
  420. self.ae(pl(b'a' * 20, f=100, S=20).partition(':')[0], 'EBADPNG')
  421. s.reset()
  422. self.assertEqual(g.disk_cache.total_size, 0)
  423. def test_load_png_simple(self):
  424. # 1x1 transparent PNG
  425. png_data = base64_decode('iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAYAAAAfFcSJAAAADUlEQVR42mNk+P+/HgAFhAJ/wlseKgAAAABJRU5ErkJggg==')
  426. expected = b'\x00\xff\xff\x7f'
  427. self.ae(load_png_data(png_data), (expected, 1, 1))
  428. s, g, pl, sl = load_helpers(self)
  429. sl(png_data, f=100, expecting_data=expected)
  430. # test error handling for loading bad png data
  431. self.assertRaisesRegex(ValueError, '[EBADPNG]', load_png_data, b'dsfsdfsfsfd')
  432. def test_gr_operations_with_numbers(self):
  433. s = self.create_screen()
  434. g = s.grman
  435. self.assertEqual(g.disk_cache.total_size, 0)
  436. def li(payload, **kw):
  437. cmd = ','.join(f'{k}={v}' for k, v in kw.items())
  438. res = send_command(s, cmd, payload)
  439. return parse_response_with_ids(res)
  440. code, ids = li('abc', s=1, v=1, f=24, I=1, i=3)
  441. self.ae(code, 'EINVAL')
  442. code, ids = li('abc', s=1, v=1, f=24, I=1)
  443. self.ae((code, ids), ('OK', 'i=1,I=1'))
  444. img = g.image_for_client_number(1)
  445. self.ae(img['client_number'], 1)
  446. self.ae(img['client_id'], 1)
  447. code, ids = li('abc', s=1, v=1, f=24, I=1)
  448. self.ae((code, ids), ('OK', 'i=2,I=1'))
  449. img = g.image_for_client_number(1)
  450. self.ae(img['client_number'], 1)
  451. self.ae(img['client_id'], 2)
  452. code, ids = li('abc', s=1, v=1, f=24, I=1)
  453. self.ae((code, ids), ('OK', 'i=3,I=1'))
  454. code, ids = li('abc', s=1, v=1, f=24, i=5)
  455. self.ae((code, ids), ('OK', 'i=5'))
  456. code, ids = li('abc', s=1, v=1, f=24, I=3)
  457. self.ae((code, ids), ('OK', 'i=4,I=3'))
  458. # Test chunked load with number
  459. self.assertIsNone(li('abcd', s=2, v=2, m=1, I=93))
  460. self.assertIsNone(li('efgh', m=1))
  461. self.assertIsNone(li('ijkx', m=1))
  462. self.ae(li('mnop', m=0), ('OK', 'i=6,I=93'))
  463. img = g.image_for_client_number(93)
  464. self.ae(img['data'], b'abcdefghijkxmnop')
  465. self.ae(img['client_id'], 6)
  466. # test put with number
  467. def put(**kw):
  468. cmd = ','.join(f'{k}={v}' for k, v in kw.items())
  469. cmd = 'a=p,' + cmd
  470. return parse_response_with_ids(send_command(s, cmd))
  471. code, idstr = put(c=2, r=2, I=93)
  472. self.ae((code, idstr), ('OK', 'i=6,I=93'))
  473. code, idstr = put(c=2, r=2, I=94)
  474. self.ae(code, 'ENOENT')
  475. # test delete with number
  476. def delete(ac='N', **kw):
  477. cmd = 'a=d'
  478. if ac:
  479. cmd += f',d={ac}'
  480. if kw:
  481. cmd += ',' + ','.join(f'{k}={v}' for k, v in kw.items())
  482. send_command(s, cmd)
  483. count = s.grman.image_count
  484. put(i=1), put(i=2), put(i=3), put(i=4), put(i=5)
  485. delete(I=94)
  486. self.ae(s.grman.image_count, count)
  487. delete(I=93)
  488. self.ae(s.grman.image_count, count - 1)
  489. delete(I=1)
  490. self.ae(s.grman.image_count, count - 2)
  491. s.reset()
  492. self.assertEqual(g.disk_cache.total_size, 0)
  493. def test_image_put(self):
  494. cw, ch = 10, 20
  495. s, dx, dy, put_image, put_ref, layers, rect_eq = put_helpers(self, cw, ch)
  496. self.ae(put_image(s, cw, ch)[1], 'OK')
  497. l0 = layers(s)
  498. self.ae(len(l0), 1)
  499. rect_eq(l0[0]['src_rect'], 0, 0, 1, 1)
  500. rect_eq(l0[0]['dest_rect'], -1, 1, -1 + dx, 1 - dy)
  501. self.ae(l0[0]['group_count'], 1)
  502. self.ae(s.cursor.x, 1), self.ae(s.cursor.y, 0)
  503. src_width, src_height = 3, 5
  504. iid, (code, idstr) = put_ref(s, num_cols=s.columns, num_lines=1, x_off=2, y_off=1, width=src_width, height=src_height,
  505. cell_x_off=3, cell_y_off=1, z=-1, placement_id=17)
  506. self.ae(idstr, f'i={iid},p=17')
  507. l2 = layers(s)
  508. self.ae(len(l2), 2)
  509. self.ae(l2[1], l0[0])
  510. rect_eq(l2[0]['src_rect'], 2 / 10, 1 / 20, (2 + 3) / 10, (1 + 5)/20)
  511. self.ae(l2[0]['group_count'], 2)
  512. left, top = -1 + dx + 3 * dx / cw, 1 - 1 * dy / ch
  513. right = -1 + (1 + s.columns) * dx
  514. bottom = 1 - dy
  515. rect_eq(l2[0]['dest_rect'], left, top, right, bottom)
  516. self.ae(s.cursor.x, 0), self.ae(s.cursor.y, 1)
  517. self.ae(put_image(s, 10, 20, cursor_movement=1)[1], 'OK')
  518. self.ae(s.cursor.x, 0), self.ae(s.cursor.y, 1)
  519. s.reset()
  520. self.assertEqual(s.grman.disk_cache.total_size, 0)
  521. self.ae(put_image(s, 2*cw, 2*ch, num_cols=3)[1], 'OK')
  522. self.ae((s.cursor.x, s.cursor.y), (3, 2))
  523. rect_eq(layers(s)[0]['dest_rect'], -1, 1, -1 + 3 * dx, 1 - 3*dy)
  524. def test_image_layer_grouping(self):
  525. cw, ch = 10, 20
  526. s, dx, dy, put_image, put_ref, layers, rect_eq = put_helpers(self, cw, ch)
  527. def group_counts():
  528. return tuple(x['group_count'] for x in layers(s))
  529. self.ae(put_image(s, 10, 20, id=1)[1], 'OK')
  530. self.ae(group_counts(), (1,))
  531. put_ref(s, id=1, num_cols=2, num_lines=1, placement_id=2)
  532. put_ref(s, id=1, num_cols=2, num_lines=1, placement_id=3, z=-2)
  533. put_ref(s, id=1, num_cols=2, num_lines=1, placement_id=4, z=-2)
  534. self.ae(group_counts(), (4, 3, 2, 1))
  535. self.ae(put_image(s, 8, 16, id=2, z=-1)[1], 'OK')
  536. self.ae(group_counts(), (2, 1, 1, 2, 1))
  537. def test_image_parents(self):
  538. cw, ch = 10, 20
  539. iw, ih = 10, 20
  540. s, dx, dy, put_image, put_ref, layers, rect_eq = put_helpers(self, cw, ch)
  541. def positions():
  542. ans = {}
  543. def x(x):
  544. return round(((x + 1)/2) * s.columns)
  545. def y(y):
  546. return int(((-y + 1)/2) * s.lines)
  547. for i in layers(s):
  548. d = i['dest_rect']
  549. ans[(i['image_id'], i['ref_id'])] = {'x': x(d['left']), 'y': y(d['top'])}
  550. return ans
  551. def p(x, y=0):
  552. return {'x':x, 'y': y}
  553. self.ae(put_image(s, iw, ih, id=1)[1], 'OK')
  554. self.ae(put_ref(s, id=1, placement_id=1), (1, ('OK', 'i=1,p=1')))
  555. pos = {(1, 1): p(0), (1, 2): p(1)}
  556. self.ae(positions(), pos)
  557. # check that adding a reference to a non-existent parent fails
  558. self.ae(put_ref(s, id=1, placement_id=33, parent_id=1, parent_placement_id=2), (1, ('ENOPARENT', 'i=1,p=33')))
  559. self.ae(put_ref(s, id=1, placement_id=33, parent_id=33), (1, ('ENOPARENT', 'i=1,p=33')))
  560. # check that we cannot add a reference that is its own parent
  561. self.ae(put_ref(s, id=1, placement_id=1, parent_id=1, parent_placement_id=1), (1, ('EINVAL', 'i=1,p=1')))
  562. self.ae(put_image(s, iw, ih, id=2)[1], 'OK')
  563. pos[(2,1)] = p(2)
  564. self.ae(positions(), pos)
  565. # Add two children to the first placement of img2
  566. before = s.cursor.x, s.cursor.y
  567. self.ae(put_ref(s, id=1, placement_id=2, parent_id=2, offset_from_parent_y=3), (1, ('OK', 'i=1,p=2')))
  568. self.ae(before, (s.cursor.x, s.cursor.y), 'Cursor must not move for child image')
  569. pos[(1,3)] = p(2, 3)
  570. self.ae(positions(), pos)
  571. self.ae(put_ref(s, id=2, placement_id=3, parent_id=2, offset_from_parent_y=4), (2, ('OK', 'i=2,p=3')))
  572. pos[(2,2)] = p(2, 4)
  573. self.ae(positions(), pos)
  574. # Add a grand child to the second child of img2
  575. self.ae(put_ref(s, id=2, placement_id=4, parent_id=2, parent_placement_id=3, offset_from_parent_x=-1), (2, ('OK', 'i=2,p=4')))
  576. pos[(2,3)] = p(pos[(2,2)]['x']-1, pos[(2,2)]['y'])
  577. self.ae(positions(), pos)
  578. # Check that creating a cycle is prevented
  579. self.ae(put_ref(s, id=2, placement_id=3, parent_id=2, parent_placement_id=4), (2, ('ECYCLE', 'i=2,p=3')))
  580. self.ae(positions(), pos)
  581. # Check that depth is limited
  582. for i in range(5, 12):
  583. q = put_ref(s, id=2, placement_id=i, parent_id=2, parent_placement_id=i-1, offset_from_parent_x=-1)[1][0]
  584. if q == 'ETOODEEP':
  585. break
  586. self.ae(q, 'OK')
  587. else:
  588. self.assertTrue(False, 'Failed to limit reference chain depth')
  589. # Check that deleting a parent removes all descendants
  590. send_command(s, 'a=d,d=i,i=2,p=3')
  591. pos.pop((2,3)), pos.pop((2,2))
  592. self.ae(positions(), pos)
  593. # Check that deleting a parent deletes all descendants and also removes
  594. # images with no remaining placements
  595. self.ae(put_ref(s, id=2, placement_id=3, parent_id=2, offset_from_parent_y=4), (2, ('OK', 'i=2,p=3')))
  596. pos[(2,11)] = p(2, 4)
  597. self.ae(positions(), pos)
  598. self.ae(put_image(s, iw, ih, id=3, placement_id=97, parent_id=2, parent_placement_id=3)[1], 'OK')
  599. pos[(3,1)] = p(2, 4)
  600. self.ae(positions(), pos)
  601. send_command(s, 'a=d,d=i,i=2')
  602. pos.pop((3,1)), pos.pop((2,11)), pos.pop((2,1)), pos.pop((1,3))
  603. self.ae(positions(), pos)
  604. # Check that virtual placements that try to be relative are rejected
  605. self.ae(put_ref(s, id=1, placement_id=11, parent_id=1, unicode_placeholder=1), (1, ('EINVAL', 'i=1,p=11')))
  606. # Check creation of children of a unicode placeholder based image
  607. s, dx, dy, put_image, put_ref, layers, rect_eq = put_helpers(self, cw, ch)
  608. put_image(s, 20, 20, num_cols=4, num_lines=2, unicode_placeholder=1, id=42)
  609. s.update_only_line_graphics_data()
  610. self.assertFalse(positions()) # the reference is virtual
  611. self.ae(put_ref(s, id=42, placement_id=11, parent_id=42, offset_from_parent_y=2, offset_from_parent_x=1), (42, ('OK', 'i=42,p=11')))
  612. self.assertFalse(positions()) # the reference is virtual without any cell images so the child is invisible
  613. s.apply_sgr("38;5;42")
  614. # These two characters will become one 2x1 ref.
  615. s.cursor.x = s.cursor.y = 1
  616. s.draw("\U0010EEEE\u0305\u0305\U0010EEEE\u0305\u030D")
  617. s.cursor.x = s.cursor.y = 0
  618. s.draw("\U0010EEEE\u0305\u0305\U0010EEEE\u0305\u030D")
  619. s.update_only_line_graphics_data()
  620. pos = {(1, 2): p(1, 2), (1, 3): p(0), (1, 4): p(1)}
  621. self.ae(positions(), pos)
  622. s.cursor.x = s.cursor.y = 0
  623. s.erase_in_display(0, False)
  624. s.update_only_line_graphics_data()
  625. self.assertFalse(positions()) # the reference is virtual without any cell images so the child is invisible
  626. s.cursor.x = s.cursor.y = 2
  627. s.draw("\U0010EEEE\u0305\u0305\U0010EEEE\u0305\u030D")
  628. s.update_only_line_graphics_data()
  629. self.ae(positions(), {(1, 5): {'x': 2, 'y': 2}, (1, 2): {'x': 3, 'y': 4}})
  630. def test_unicode_placeholders(self):
  631. # This test tests basic image placement using using unicode placeholders
  632. cw, ch = 10, 20
  633. s, dx, dy, put_image, put_ref, layers, rect_eq = put_helpers(self, cw, ch)
  634. # Upload two images.
  635. put_image(s, 20, 20, num_cols=4, num_lines=2, unicode_placeholder=1, id=42)
  636. put_image(s, 10, 20, num_cols=4, num_lines=2, unicode_placeholder=1, id=(42<<16) + (43<<8) + 44)
  637. # The references are virtual, so no visible refs yet.
  638. s.update_only_line_graphics_data()
  639. refs = layers(s)
  640. self.ae(len(refs), 0)
  641. # A reminder of row/column diacritics meaning (assuming 0-based):
  642. # \u0305 -> 0
  643. # \u030D -> 1
  644. # \u030E -> 2
  645. # \u0310 -> 3
  646. # Now print the placeholders for the first image.
  647. # Encode the id as an 8-bit color.
  648. s.apply_sgr("38;5;42")
  649. # These two characters will become one 2x1 ref.
  650. s.draw("\U0010EEEE\u0305\u0305\U0010EEEE\u0305\u030D")
  651. # These two characters will be two separate refs (not contiguous).
  652. s.draw("\U0010EEEE\u0305\u0305\U0010EEEE\u0305\u030E")
  653. s.cursor_back(4)
  654. s.update_only_line_graphics_data()
  655. refs = layers(s)
  656. self.ae(len(refs), 3)
  657. self.ae(refs[0]['src_rect'], {'left': 0.0, 'top': 0.0, 'right': 0.5, 'bottom': 0.5})
  658. self.ae(refs[1]['src_rect'], {'left': 0.0, 'top': 0.0, 'right': 0.25, 'bottom': 0.5})
  659. self.ae(refs[2]['src_rect'], {'left': 0.5, 'top': 0.0, 'right': 0.75, 'bottom': 0.5})
  660. # Erase the line.
  661. s.erase_in_line(2)
  662. # There must be 0 refs after the line is erased.
  663. s.update_only_line_graphics_data()
  664. refs = layers(s)
  665. self.ae(len(refs), 0)
  666. # Now test encoding IDs with the 24-bit color.
  667. # The first image, 1x1
  668. s.apply_sgr("38;2;0;0;42")
  669. s.draw("\U0010EEEE\u0305\u0305")
  670. # The second image, 2x1
  671. s.apply_sgr("38;2;42;43;44")
  672. s.draw("\U0010EEEE\u0305\u030D\U0010EEEE\u0305\u030E")
  673. s.cursor_back(2)
  674. s.update_only_line_graphics_data()
  675. refs = layers(s)
  676. self.ae(len(refs), 2)
  677. self.ae(refs[0]['src_rect'], {'left': 0.0, 'top': 0.0, 'right': 0.25, 'bottom': 0.5})
  678. # The second ref spans the whole widths of the second image because it's
  679. # fit to height and centered in a 4x2 box (specified in put_image).
  680. self.ae(refs[1]['src_rect'], {'left': 0.0, 'top': 0.0, 'right': 1.0, 'bottom': 0.5})
  681. # Erase the line.
  682. s.erase_in_line(2)
  683. # Now test implicit column numbers.
  684. # We will mix implicit and explicit column/row specifications, but they
  685. # will be combine into just two references.
  686. s.apply_sgr("38;5;42")
  687. # full row 0 of the first image
  688. s.draw("\U0010EEEE\u0305\u0305\U0010EEEE\u0305\U0010EEEE\U0010EEEE\u0305")
  689. # full row 1 of the first image
  690. s.draw("\U0010EEEE\u030D\U0010EEEE\U0010EEEE\U0010EEEE\u030D\u0310")
  691. s.cursor_back(8)
  692. s.update_only_line_graphics_data()
  693. refs = layers(s)
  694. self.ae(len(refs), 2)
  695. self.ae(refs[0]['src_rect'], {'left': 0.0, 'top': 0.0, 'right': 1.0, 'bottom': 0.5})
  696. self.ae(refs[1]['src_rect'], {'left': 0.0, 'top': 0.5, 'right': 1.0, 'bottom': 1.0})
  697. # Now reset the screen, the images should be erased.
  698. s.reset()
  699. refs = layers(s)
  700. self.ae(len(refs), 0)
  701. def test_unicode_placeholders_3rd_combining_char(self):
  702. # This test tests that we can use the 3rd diacritic for the most
  703. # significant byte
  704. cw, ch = 10, 20
  705. s, dx, dy, put_image, put_ref, layers, rect_eq = put_helpers(self, cw, ch)
  706. # Upload two images.
  707. put_image(s, 20, 20, num_cols=4, num_lines=2, unicode_placeholder=1, id=42)
  708. put_image(s, 20, 10, num_cols=4, num_lines=1, unicode_placeholder=1, id=(42 << 24) + 43)
  709. # This one will have id=43, which does not exist.
  710. s.apply_sgr("38;2;0;0;43")
  711. s.draw("\U0010EEEE\u0305\U0010EEEE\U0010EEEE\U0010EEEE")
  712. s.cursor_back(4)
  713. s.update_only_line_graphics_data()
  714. refs = layers(s)
  715. self.ae(len(refs), 0)
  716. s.erase_in_line(2)
  717. # This one will have id=42. We explicitly specify that the most
  718. # significant byte is 0 (third \u305). Specifying the zero byte like
  719. # this is not necessary but is correct.
  720. s.apply_sgr("38;2;0;0;42")
  721. s.draw("\U0010EEEE\u0305\u0305\u0305\U0010EEEE\u0305\u030D\u0305")
  722. # This is the second image.
  723. # \u059C -> 42
  724. s.apply_sgr("38;2;0;0;43")
  725. s.draw("\U0010EEEE\u0305\u0305\u059C\U0010EEEE\u0305\u030D\u059C")
  726. # Check that we can continue by using implicit row/column specification.
  727. s.draw("\U0010EEEE\u0305\U0010EEEE")
  728. s.cursor_back(6)
  729. s.update_only_line_graphics_data()
  730. refs = layers(s)
  731. self.ae(len(refs), 2)
  732. self.ae(refs[0]['src_rect'], {'left': 0.0, 'top': 0.0, 'right': 0.5, 'bottom': 0.5})
  733. self.ae(refs[1]['src_rect'], {'left': 0.0, 'top': 0.0, 'right': 1.0, 'bottom': 1.0})
  734. s.erase_in_line(2)
  735. # Now test the 8-bit color mode. Using the third diacritic, we can
  736. # specify 16 bits: the most significant byte and the least significant
  737. # byte.
  738. s.apply_sgr("38;5;42")
  739. s.draw("\U0010EEEE\u0305\u0305\u0305\U0010EEEE")
  740. s.apply_sgr("38;5;43")
  741. s.draw("\U0010EEEE\u0305\u0305\u059C\U0010EEEE\U0010EEEE\u0305\U0010EEEE")
  742. s.cursor_back(6)
  743. s.update_only_line_graphics_data()
  744. refs = layers(s)
  745. self.ae(len(refs), 2)
  746. self.ae(refs[0]['src_rect'], {'left': 0.0, 'top': 0.0, 'right': 0.5, 'bottom': 0.5})
  747. self.ae(refs[1]['src_rect'], {'left': 0.0, 'top': 0.0, 'right': 1.0, 'bottom': 1.0})
  748. def test_unicode_placeholders_multiple_placements(self):
  749. # Here we test placement specification via underline color.
  750. cw, ch = 10, 20
  751. s, dx, dy, put_image, put_ref, layers, rect_eq = put_helpers(self, cw, ch)
  752. put_image(s, 20, 20, num_cols=1, num_lines=1, placement_id=1, unicode_placeholder=1, id=42)
  753. put_ref(s, id=42, num_cols=2, num_lines=1, placement_id=22, unicode_placeholder=1)
  754. put_ref(s, id=42, num_cols=4, num_lines=2, placement_id=44, unicode_placeholder=1)
  755. # The references are virtual, so no visible refs yet.
  756. s.update_only_line_graphics_data()
  757. refs = layers(s)
  758. self.ae(len(refs), 0)
  759. # Draw the first row of each placement.
  760. s.apply_sgr("38;5;42")
  761. s.apply_sgr("58;5;1")
  762. s.draw("\U0010EEEE\u0305")
  763. s.apply_sgr("58;5;22")
  764. s.draw("\U0010EEEE\u0305\U0010EEEE\u0305")
  765. s.apply_sgr("58;5;44")
  766. s.draw("\U0010EEEE\u0305\U0010EEEE\u0305\U0010EEEE\u0305\U0010EEEE\u0305")
  767. s.update_only_line_graphics_data()
  768. refs = layers(s)
  769. self.ae(len(refs), 3)
  770. self.ae(refs[0]['src_rect'], {'left': 0.0, 'top': 0.0, 'right': 1.0, 'bottom': 1.5})
  771. self.ae(refs[1]['src_rect'], {'left': 0.0, 'top': 0.0, 'right': 1.0, 'bottom': 1.0})
  772. self.ae(refs[2]['src_rect'], {'left': 0.0, 'top': 0.0, 'right': 1.0, 'bottom': 0.5})
  773. def test_unicode_placeholders_scroll(self):
  774. # Here we test scrolling of a region. We'll draw an image spanning 8
  775. # rows and then scroll only the middle part of this image. Each
  776. # reference corresponds to one row.
  777. cw, ch = 5, 10
  778. s, dx, dy, put_image, put_ref, layers, rect_eq = put_helpers(self, cw, ch, lines=8)
  779. put_image(s, 5, 80, num_cols=1, num_lines=8, unicode_placeholder=1, id=42)
  780. s.apply_sgr("38;5;42")
  781. s.cursor_position(1, 0)
  782. s.draw("\U0010EEEE\u0305\n")
  783. s.cursor_position(2, 0)
  784. s.draw("\U0010EEEE\u030D\n")
  785. s.cursor_position(3, 0)
  786. s.draw("\U0010EEEE\u030E\n")
  787. s.cursor_position(4, 0)
  788. s.draw("\U0010EEEE\u0310\n")
  789. s.cursor_position(5, 0)
  790. s.draw("\U0010EEEE\u0312\n")
  791. s.cursor_position(6, 0)
  792. s.draw("\U0010EEEE\u033D\n")
  793. s.cursor_position(7, 0)
  794. s.draw("\U0010EEEE\u033E\n")
  795. s.cursor_position(8, 0)
  796. s.draw("\U0010EEEE\u033F")
  797. # Each line will contain a part of the image.
  798. s.update_only_line_graphics_data()
  799. refs = layers(s)
  800. refs = sorted(refs, key=lambda r: r['src_rect']['top'])
  801. self.ae(len(refs), 8)
  802. for i in range(8):
  803. self.ae(refs[i]['src_rect'], {'left': 0.0, 'top': 0.125*i, 'right': 1.0, 'bottom': 0.125*(i + 1)})
  804. self.ae(refs[i]['dest_rect']['top'], 1 - 0.25*i)
  805. # Now set margins to lines 3 and 6.
  806. s.set_margins(3, 6) # 1-based indexing
  807. # Scroll two lines down (i.e. move lines 3..6 up).
  808. # Lines 3 and 4 will be erased.
  809. s.cursor_position(6, 0)
  810. s.index()
  811. s.index()
  812. s.update_only_line_graphics_data()
  813. refs = layers(s)
  814. refs = sorted(refs, key=lambda r: r['src_rect']['top'])
  815. self.ae(len(refs), 6)
  816. # Lines 1 and 2 are outside of the region, not scrolled.
  817. self.ae(refs[0]['src_rect'], {'left': 0.0, 'top': 0.0, 'right': 1.0, 'bottom': 0.125})
  818. self.ae(refs[0]['dest_rect']['top'], 1.0)
  819. self.ae(refs[1]['src_rect'], {'left': 0.0, 'top': 0.125*1, 'right': 1.0, 'bottom': 0.125*2})
  820. self.ae(refs[1]['dest_rect']['top'], 1.0 - 0.25*1)
  821. # Lines 3 and 4 are erased.
  822. # Lines 5 and 6 are now higher.
  823. self.ae(refs[2]['src_rect'], {'left': 0.0, 'top': 0.125*4, 'right': 1.0, 'bottom': 0.125*5})
  824. self.ae(refs[2]['dest_rect']['top'], 1.0 - 0.25*2)
  825. self.ae(refs[3]['src_rect'], {'left': 0.0, 'top': 0.125*5, 'right': 1.0, 'bottom': 0.125*6})
  826. self.ae(refs[3]['dest_rect']['top'], 1.0 - 0.25*3)
  827. # Lines 7 and 8 are outside of the region.
  828. self.ae(refs[4]['src_rect'], {'left': 0.0, 'top': 0.125*6, 'right': 1.0, 'bottom': 0.125*7})
  829. self.ae(refs[4]['dest_rect']['top'], 1.0 - 0.25*6)
  830. self.ae(refs[5]['src_rect'], {'left': 0.0, 'top': 0.125*7, 'right': 1.0, 'bottom': 0.125*8})
  831. self.ae(refs[5]['dest_rect']['top'], 1.0 - 0.25*7)
  832. # Now scroll three lines up (i.e. move lines 5..6 down).
  833. # Line 6 will be erased.
  834. s.cursor_position(3, 0)
  835. s.reverse_index()
  836. s.reverse_index()
  837. s.reverse_index()
  838. s.update_only_line_graphics_data()
  839. refs = layers(s)
  840. refs = sorted(refs, key=lambda r: r['src_rect']['top'])
  841. self.ae(len(refs), 5)
  842. # Lines 1 and 2 are outside of the region, not scrolled.
  843. self.ae(refs[0]['src_rect'], {'left': 0.0, 'top': 0.0, 'right': 1.0, 'bottom': 0.125})
  844. self.ae(refs[0]['dest_rect']['top'], 1.0)
  845. self.ae(refs[1]['src_rect'], {'left': 0.0, 'top': 0.125*1, 'right': 1.0, 'bottom': 0.125*2})
  846. self.ae(refs[1]['dest_rect']['top'], 1.0 - 0.25*1)
  847. # Lines 3, 4 and 6 are erased.
  848. # Line 5 is now lower.
  849. self.ae(refs[2]['src_rect'], {'left': 0.0, 'top': 0.125*4, 'right': 1.0, 'bottom': 0.125*5})
  850. self.ae(refs[2]['dest_rect']['top'], 1.0 - 0.25*5)
  851. # Lines 7 and 8 are outside of the region.
  852. self.ae(refs[3]['src_rect'], {'left': 0.0, 'top': 0.125*6, 'right': 1.0, 'bottom': 0.125*7})
  853. self.ae(refs[3]['dest_rect']['top'], 1.0 - 0.25*6)
  854. self.ae(refs[4]['src_rect'], {'left': 0.0, 'top': 0.125*7, 'right': 1.0, 'bottom': 0.125*8})
  855. self.ae(refs[4]['dest_rect']['top'], 1.0 - 0.25*7)
  856. def test_gr_scroll(self):
  857. cw, ch = 10, 20
  858. s, dx, dy, put_image, put_ref, layers, rect_eq = put_helpers(self, cw, ch)
  859. put_image(s, 10, 20, no_id=True) # a one cell image at (0, 0)
  860. self.ae(len(layers(s)), 1)
  861. for i in range(s.lines):
  862. s.index()
  863. self.ae(len(layers(s)), 0), self.ae(s.grman.image_count, 1)
  864. for i in range(s.historybuf.ynum - 1):
  865. s.index()
  866. self.ae(len(layers(s)), 0), self.ae(s.grman.image_count, 1)
  867. s.index()
  868. self.ae(s.grman.image_count, 0)
  869. # Now test with margins
  870. s.reset()
  871. # Test images outside page area untouched
  872. put_image(s, cw, ch) # a one cell image at (0, 0)
  873. for i in range(s.lines - 1):
  874. s.index()
  875. put_image(s, cw, ch) # a one cell image at (0, bottom)
  876. s.set_margins(2, 4) # 1-based indexing
  877. self.ae(s.grman.image_count, 2)
  878. for i in range(s.lines + s.historybuf.ynum):
  879. s.index()
  880. self.ae(s.grman.image_count, 2)
  881. for i in range(s.lines): # ensure cursor is at top margin
  882. s.reverse_index()
  883. # Test clipped scrolling during index
  884. put_image(s, cw, 2*ch, z=-1, no_id=True) # 1x2 cell image
  885. self.ae(s.grman.image_count, 3)
  886. self.ae(layers(s)[0]['src_rect'], {'left': 0.0, 'top': 0.0, 'right': 1.0, 'bottom': 1.0})
  887. s.index(), s.index()
  888. l0 = layers(s)
  889. self.ae(len(l0), 3)
  890. self.ae(layers(s)[0]['src_rect'], {'left': 0.0, 'top': 0.5, 'right': 1.0, 'bottom': 1.0})
  891. s.index()
  892. self.ae(s.grman.image_count, 2)
  893. # Test clipped scrolling during reverse_index
  894. for i in range(s.lines):
  895. s.reverse_index()
  896. put_image(s, cw, 2*ch, z=-1, no_id=True) # 1x2 cell image
  897. self.ae(s.grman.image_count, 3)
  898. self.ae(layers(s)[0]['src_rect'], {'left': 0.0, 'top': 0.0, 'right': 1.0, 'bottom': 1.0})
  899. while s.cursor.y != 1:
  900. s.reverse_index()
  901. s.reverse_index(), s.reverse_index()
  902. self.ae(layers(s)[0]['src_rect'], {'left': 0.0, 'top': 0.0, 'right': 1.0, 'bottom': 0.5})
  903. s.reverse_index()
  904. self.ae(s.grman.image_count, 2)
  905. s.reset()
  906. self.assertEqual(s.grman.disk_cache.total_size, 0)
  907. def test_gr_reset(self):
  908. cw, ch = 10, 20
  909. s, dx, dy, put_image, put_ref, layers, rect_eq = put_helpers(self, cw, ch)
  910. put_image(s, cw, ch) # a one cell image at (0, 0)
  911. self.ae(len(layers(s)), 1)
  912. s.reset()
  913. self.ae(s.grman.image_count, 0)
  914. put_image(s, cw, ch) # a one cell image at (0, 0)
  915. self.ae(s.grman.image_count, 1)
  916. for i in range(s.lines):
  917. s.index()
  918. s.reset()
  919. self.ae(s.grman.image_count, 1)
  920. def test_gr_delete(self):
  921. cw, ch = 10, 20
  922. s, dx, dy, put_image, put_ref, layers, rect_eq = put_helpers(self, cw, ch)
  923. def delete(ac=None, **kw):
  924. cmd = 'a=d'
  925. if ac:
  926. cmd += f',d={ac}'
  927. if kw:
  928. cmd += ',' + ','.join(f'{k}={v}' for k, v in kw.items())
  929. send_command(s, cmd)
  930. put_image(s, cw, ch)
  931. delete()
  932. self.ae(s.grman.image_count, 1)
  933. self.ae(len(layers(s)), 0)
  934. self.ae(s.grman.image_count, 1)
  935. delete('A')
  936. self.ae(s.grman.image_count, 0)
  937. self.assertEqual(s.grman.disk_cache.total_size, 0)
  938. iid = put_image(s, cw, ch)[0]
  939. delete('I', i=iid, p=7)
  940. self.ae(s.grman.image_count, 1)
  941. delete('I', i=iid)
  942. self.ae(s.grman.image_count, 0)
  943. self.assertEqual(s.grman.disk_cache.total_size, 0)
  944. iid = put_image(s, cw, ch, placement_id=9)[0]
  945. delete('I', i=iid, p=9)
  946. self.ae(s.grman.image_count, 0)
  947. self.assertEqual(s.grman.disk_cache.total_size, 0)
  948. s.reset()
  949. put_image(s, cw, ch)
  950. put_image(s, cw, ch)
  951. delete('C')
  952. self.ae(s.grman.image_count, 2)
  953. s.cursor_position(1, 1)
  954. delete('C')
  955. self.ae(s.grman.image_count, 1)
  956. delete('P', x=2, y=1)
  957. self.ae(s.grman.image_count, 0)
  958. self.assertEqual(s.grman.disk_cache.total_size, 0)
  959. put_image(s, cw, ch, z=9)
  960. delete('Z', z=9)
  961. self.ae(s.grman.image_count, 0)
  962. put_image(s, cw, ch, id=1)
  963. put_image(s, cw, ch, id=2)
  964. put_image(s, cw, ch, id=3)
  965. delete('R', y=2)
  966. self.ae(s.grman.image_count, 1)
  967. delete('R', x=3, y=3)
  968. self.ae(s.grman.image_count, 0)
  969. self.assertEqual(s.grman.disk_cache.total_size, 0)
  970. # test put + delete + put
  971. iid = 999999
  972. self.ae(put_image(s, cw, ch, id=iid), (iid, 'OK'))
  973. self.ae(put_ref(s, id=iid), (iid, ('OK', f'i={iid}')))
  974. delete('i', i=iid)
  975. self.ae(s.grman.image_count, 1)
  976. self.ae(put_ref(s, id=iid), (iid, ('OK', f'i={iid}')))
  977. delete('I', i=iid)
  978. self.ae(put_ref(s, id=iid), (iid, ('ENOENT', f'i={iid}')))
  979. self.ae(s.grman.image_count, 0)
  980. self.assertEqual(s.grman.disk_cache.total_size, 0)
  981. def test_animation_frame_loading(self):
  982. s = self.create_screen()
  983. g = s.grman
  984. li = make_send_command(s)
  985. def t(code='OK', image_id=1, frame_number=2, **kw):
  986. res = li(**kw)
  987. if code is not None:
  988. self.assertEqual(code, res.code, f'{code} != {res.code}: {res.msg}')
  989. if image_id is not None:
  990. self.assertEqual(image_id, res.image_id)
  991. if frame_number is not None:
  992. self.assertEqual(frame_number, res.frame_number)
  993. # test error on send frame for non-existent image
  994. self.assertEqual(li().code, 'ENOENT')
  995. # create image
  996. self.assertEqual(li(a='t').code, 'OK')
  997. self.assertEqual(g.disk_cache.total_size, 36)
  998. # simple new frame (width=4, height=3)
  999. self.assertIsNone(li(payload='2' * 12, z=77, m=1))
  1000. self.assertIsNone(li(payload='2' * 12, z=77, m=1))
  1001. t(payload='2' * 12, z=77)
  1002. img = g.image_for_client_id(1)
  1003. self.assertEqual(img['extra_frames'], ({'gap': 77, 'id': 2, 'data': b'2' * 36},))
  1004. # test editing a frame
  1005. t(payload='3' * 36, r=2)
  1006. img = g.image_for_client_id(1)
  1007. self.assertEqual(img['extra_frames'], ({'gap': 77, 'id': 2, 'data': b'3' * 36},))
  1008. # test editing part of a frame
  1009. t(payload='4' * 12, r=2, s=2, v=2)
  1010. img = g.image_for_client_id(1)
  1011. def expand(*rows):
  1012. ans = []
  1013. for r in rows:
  1014. ans.append(''.join(x * 3 for x in str(r)))
  1015. return ''.join(ans).encode('ascii')
  1016. self.assertEqual(img['extra_frames'], ({'gap': 77, 'id': 2, 'data': expand(4433, 4433, 3333)},))
  1017. t(payload='5' * 12, r=2, s=2, v=2, x=1, y=1)
  1018. img = g.image_for_client_id(1)
  1019. self.assertEqual(img['extra_frames'], ({'gap': 77, 'id': 2, 'data': expand(4433, 4553, 3553)},))
  1020. t(payload='3' * 36, r=2)
  1021. img = g.image_for_client_id(1)
  1022. self.assertEqual(img['extra_frames'], ({'gap': 77, 'id': 2, 'data': b'3' * 36},))
  1023. # test loading from previous frame
  1024. t(payload='4' * 12, c=2, s=2, v=2, z=101, frame_number=3)
  1025. img = g.image_for_client_id(1)
  1026. self.assertEqual(img['extra_frames'], (
  1027. {'gap': 77, 'id': 2, 'data': b'3' * 36},
  1028. {'gap': 101, 'id': 3, 'data': b'444444333333444444333333333333333333'},
  1029. ))
  1030. # test changing gaps
  1031. img = g.image_for_client_id(1)
  1032. self.assertEqual(img['root_frame_gap'], 0)
  1033. self.assertIsNone(li(a='a', i=1, r=1, z=13))
  1034. img = g.image_for_client_id(1)
  1035. self.assertEqual(img['root_frame_gap'], 13)
  1036. self.assertIsNone(li(a='a', i=1, r=2, z=43))
  1037. img = g.image_for_client_id(1)
  1038. self.assertEqual(img['extra_frames'][0]['gap'], 43)
  1039. # test changing current frame
  1040. img = g.image_for_client_id(1)
  1041. self.assertEqual(img['current_frame_index'], 0)
  1042. self.assertIsNone(li(a='a', i=1, c=2))
  1043. img = g.image_for_client_id(1)
  1044. self.assertEqual(img['current_frame_index'], 1)
  1045. # test delete of frames
  1046. t(payload='5' * 36, frame_number=4)
  1047. img = g.image_for_client_id(1)
  1048. self.assertEqual(img['extra_frames'], (
  1049. {'gap': 43, 'id': 2, 'data': b'3' * 36},
  1050. {'gap': 101, 'id': 3, 'data': b'444444333333444444333333333333333333'},
  1051. {'gap': 40, 'id': 4, 'data': b'5' * 36},
  1052. ))
  1053. self.assertEqual(img['current_frame_index'], 1)
  1054. self.assertIsNone(li(a='d', d='f', i=1, r=1))
  1055. img = g.image_for_client_id(1)
  1056. self.assertEqual(img['current_frame_index'], 0)
  1057. self.assertEqual(img['data'], b'3' * 36)
  1058. self.assertEqual(img['extra_frames'], (
  1059. {'gap': 101, 'id': 3, 'data': b'444444333333444444333333333333333333'},
  1060. {'gap': 40, 'id': 4, 'data': b'5' * 36},
  1061. ))
  1062. self.assertIsNone(li(a='a', i=1, c=3))
  1063. img = g.image_for_client_id(1)
  1064. self.assertEqual(img['current_frame_index'], 2)
  1065. self.assertIsNone(li(a='d', d='f', i=1, r=2))
  1066. img = g.image_for_client_id(1)
  1067. self.assertEqual(img['current_frame_index'], 1)
  1068. self.assertEqual(img['data'], b'3' * 36)
  1069. self.assertEqual(img['extra_frames'], (
  1070. {'gap': 40, 'id': 4, 'data': b'5' * 36},
  1071. ))
  1072. self.assertIsNone(li(a='d', d='f', i=1))
  1073. img = g.image_for_client_id(1)
  1074. self.assertEqual(img['current_frame_index'], 0)
  1075. self.assertEqual(img['data'], b'5' * 36)
  1076. self.assertFalse(img['extra_frames'])
  1077. self.assertIsNone(li(a='d', d='f', i=1))
  1078. img = g.image_for_client_id(1)
  1079. self.assertEqual(img['data'], b'5' * 36)
  1080. self.assertIsNone(li(a='d', d='F', i=1))
  1081. self.ae(g.image_count, 0)
  1082. self.assertEqual(g.disk_cache.total_size, 0)
  1083. # test frame composition
  1084. self.assertEqual(li(a='t').code, 'OK')
  1085. self.assertEqual(g.disk_cache.total_size, 36)
  1086. t(payload='2' * 36)
  1087. t(payload='3' * 36, frame_number=3)
  1088. img = g.image_for_client_id(1)
  1089. self.assertEqual(img['extra_frames'], (
  1090. {'gap': 40, 'id': 2, 'data': b'2' * 36},
  1091. {'gap': 40, 'id': 3, 'data': b'3' * 36},
  1092. ))
  1093. self.assertEqual(li(a='c', i=11).code, 'ENOENT')
  1094. self.assertEqual(li(a='c', i=1, r=1, c=2).code, 'OK')
  1095. img = g.image_for_client_id(1)
  1096. self.assertEqual(img['extra_frames'], (
  1097. {'gap': 40, 'id': 2, 'data': b'abcdefghijkl'*3},
  1098. {'gap': 40, 'id': 3, 'data': b'3' * 36},
  1099. ))
  1100. self.assertEqual(li(a='c', i=1, r=2, c=3, w=1, h=2, x=1, y=1).code, 'OK')
  1101. img = g.image_for_client_id(1)
  1102. self.assertEqual(img['extra_frames'], (
  1103. {'gap': 40, 'id': 2, 'data': b'abcdefghijkl'*3},
  1104. {'gap': 40, 'id': 3, 'data': b'3' * 12 + (b'333abc' + b'3' * 6) * 2},
  1105. ))
  1106. def test_graphics_quota_enforcement(self):
  1107. s = self.create_screen()
  1108. g = s.grman
  1109. g.storage_limit = 36*2
  1110. li = make_send_command(s)
  1111. # test quota for simple images
  1112. self.assertEqual(li(a='T').code, 'OK')
  1113. self.assertEqual(li(a='T', i=2).code, 'OK')
  1114. self.assertEqual(g.disk_cache.total_size, g.storage_limit)
  1115. self.assertEqual(g.image_count, 2)
  1116. self.assertEqual(li(a='T', i=3).code, 'OK')
  1117. self.assertEqual(g.disk_cache.total_size, g.storage_limit)
  1118. self.assertEqual(g.image_count, 2)
  1119. # test quota for frames
  1120. for i in range(8):
  1121. self.assertEqual(li(payload=f'{i}' * 36, i=2).code, 'OK')
  1122. self.assertEqual(li(payload='x' * 36, i=2).code, 'ENOSPC')
  1123. # test editing should not trigger quota
  1124. self.assertEqual(li(payload='4' * 12, r=2, s=2, v=2, i=2).code, 'OK')
  1125. s.reset()
  1126. self.ae(g.image_count, 0)
  1127. self.assertEqual(g.disk_cache.total_size, 0)
  1128. @unittest.skipIf(Image is None, 'PIL not available, skipping PNG tests')
  1129. def test_cached_rgba_conversion(self):
  1130. from kitty.render_cache import ImageRenderCacheForTesting
  1131. w, h = 5, 3
  1132. rgba_data = byte_block(w * h * 4)
  1133. img = Image.frombytes('RGBA', (w, h), rgba_data)
  1134. buf = BytesIO()
  1135. img.save(buf, 'PNG')
  1136. png_data = buf.getvalue()
  1137. with tempfile.TemporaryDirectory() as cache_path:
  1138. irc = ImageRenderCacheForTesting(cache_path)
  1139. srcs, outputs = [], []
  1140. for i in range(2 * irc.max_entries):
  1141. with open(os.path.join(cache_path, f'{i}.png'), 'wb') as f:
  1142. f.write(png_data)
  1143. srcs.append(f.name)
  1144. outputs.append(irc.render(f.name))
  1145. entries = list(irc.entries())
  1146. self.assertLessEqual(len(entries), irc.max_entries)
  1147. self.ae(irc.num_of_renders, len(outputs))
  1148. remaining_outputs = outputs[-irc.max_entries:]
  1149. for x in remaining_outputs:
  1150. self.assertTrue(os.path.exists(x))
  1151. for x in outputs[:-irc.max_entries]:
  1152. self.assertFalse(os.path.exists(x))
  1153. self.assertLess(os.path.getmtime(remaining_outputs[0]), os.path.getmtime(remaining_outputs[1]))
  1154. remaining_srcs = srcs[-irc.max_entries:]
  1155. self.ae(irc.render(remaining_srcs[0]), remaining_outputs[0])
  1156. self.ae(irc.num_of_renders, len(outputs))
  1157. self.assertGreater(os.path.getmtime(remaining_outputs[0]), os.path.getmtime(remaining_outputs[1]))
  1158. width, height, fd = irc(remaining_srcs[-1])
  1159. with open(fd, 'rb') as f:
  1160. self.ae((width, height), (w, h))
  1161. f.seek(8)
  1162. self.ae(rgba_data, f.read())