usbguard-notify.py 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111
  1. #!/usr/bin/python3
  2. import shutil
  3. import re
  4. import threading
  5. from subprocess import Popen, run, DEVNULL, PIPE
# Paths of the external tools, looked up once at start-up.
# shutil.which() returns None when the executable is not found on PATH.
USBGUARD_EXEC_NAME = shutil.which('usbguard')
DUNSTIFY_EXEC_NAME = shutil.which('dunstify')
# Maps a usbguard device id to the id dunstify printed for that device's open
# notification; written by prompt_device_action() and emptied there or by
# close_notification().
open_notifications = {}
  9. def parse_event_type_and_id(stream):
  10. line = stream.readline()
  11. if not line.startswith('[device] '):
  12. return None
  13. event_type = re.findall('(?<=\\[device\\] )[a-zA-Z]+', line)
  14. if len(event_type) == 0:
  15. return None
  16. event_id = re.findall('(?<=id=)[0-9]+', line)
  17. if len(event_id) == 0:
  18. return None
  19. return event_type[0], int(event_id[0])
  20. def parse_event_properties(stream, count):
  21. props = {}
  22. for _ in range(count):
  23. line = stream.readline()
  24. try:
  25. sep_ind = line.index('=')
  26. prop_name = line[1:sep_ind]
  27. props[prop_name] = line[sep_ind + 1:-1]
  28. if prop_name == 'device_rule':
  29. break
  30. except ValueError:
  31. continue
  32. return props
  33. def get_name_and_id_from_rule(rule):
  34. name = re.findall('(?<=name ")[^"]+(?=")', rule)
  35. if len(name) == 0:
  36. name = ''
  37. else:
  38. name = name[0]
  39. id = re.findall('(?<=id )[a-z0-9]{4}:[a-z0-9]{4}', rule)
  40. if len(id) == 0:
  41. id = ''
  42. else:
  43. id = id[0]
  44. return name, id
  45. def prompt_device_action(dev_id, name, long_id):
  46. proc = Popen([DUNSTIFY_EXEC_NAME, '-p',
  47. '-A', 'block,Block',
  48. '-A', 'allow,Allow',
  49. '-A', 'reject,Reject',
  50. f'{name} ({long_id})',
  51. 'New Device'],
  52. stdout=PIPE, text=True, bufsize=0)
  53. open_notifications[dev_id] = int(proc.stdout.readline())
  54. option = proc.communicate()[0][:-1]
  55. try:
  56. open_notifications.pop(dev_id)
  57. except KeyError:
  58. pass
  59. match option:
  60. case 'reject':
  61. run([USBGUARD_EXEC_NAME, 'reject-device', long_id])
  62. case 'allow':
  63. run([USBGUARD_EXEC_NAME, 'allow-device', long_id])
  64. case _:
  65. run([USBGUARD_EXEC_NAME, 'block-device', long_id])
  66. def close_notification(dev_id):
  67. if dev_id in open_notifications:
  68. notif_id = open_notifications.pop(dev_id)
  69. run([DUNSTIFY_EXEC_NAME, '-C', str(notif_id)])
  70. with Popen([USBGUARD_EXEC_NAME, 'watch'],
  71. stdin=DEVNULL, stdout=PIPE, text=True, bufsize=0) as usbguard_proc:
  72. new_devices = set()
  73. usbguard_proc.stdout.readline() # get rid of initial connection message
  74. while True:
  75. event_type_result = parse_event_type_and_id(usbguard_proc.stdout)
  76. if event_type_result is None:
  77. continue
  78. event_type, dev_id = event_type_result
  79. if event_type not in ['PresenceChanged', 'PolicyApplied']:
  80. continue
  81. props = parse_event_properties(usbguard_proc.stdout, 3)
  82. name, long_id = get_name_and_id_from_rule(props['device_rule'])
  83. match event_type:
  84. case 'PresenceChanged':
  85. if props['event'] == 'Insert':
  86. new_devices.add(dev_id)
  87. else:
  88. close_notification(dev_id)
  89. new_devices.discard(dev_id)
  90. case 'PolicyApplied':
  91. if props['target_new'] == 'block':
  92. threading.Thread(target=prompt_device_action,
  93. args=(dev_id, name, long_id)).start()
  94. new_devices.discard(dev_id)