123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111 |
- #!/usr/bin/python3
- import shutil
- import re
- import threading
- from subprocess import Popen, run, DEVNULL, PIPE
# Executable paths are resolved once at start-up. shutil.which() returns None
# when a program is not on PATH; falling back to the bare command name keeps
# these constants strings, so a missing tool surfaces later as a
# FileNotFoundError naming the program rather than a TypeError about a None
# entry in the argument list.
USBGUARD_EXEC_NAME = shutil.which('usbguard') or 'usbguard'
DUNSTIFY_EXEC_NAME = shutil.which('dunstify') or 'dunstify'

# Maps a usbguard device id to the id of the notification currently prompting
# for that device. Entries are added and removed by the prompt threads and
# removed by close_notification().
open_notifications = {}
def parse_event_type_and_id(stream):
    """Read one line from *stream* and parse it as a device event header.

    A header is a line that starts with ``[device] `` immediately followed
    by an alphabetic event name, and that contains ``id=<digits>``.

    Args:
        stream: Text stream providing ``readline()``. Exactly one line is
            consumed per call, whether or not it is a header.

    Returns:
        ``(event_type, device_id)`` with the id converted to ``int``, or
        ``None`` when the line is not a complete header. An exhausted
        stream yields an empty line and therefore also ``None``.
    """
    line = stream.readline()
    if not line.startswith('[device] '):
        return None
    # Only the first occurrence of each field is needed, so search() is
    # enough; no list of all matches has to be built.
    event_type = re.search(r'\[device\] ([a-zA-Z]+)', line)
    event_id = re.search(r'id=([0-9]+)', line)
    if event_type is None or event_id is None:
        return None
    return event_type.group(1), int(event_id.group(1))
def parse_event_properties(stream, count):
    """Read up to *count* ``name=value`` property lines from *stream*.

    Each line is split at its first ``=``. Whitespace around the name and
    the trailing newline of the value are removed. Lines without ``=`` are
    skipped but still count towards *count*. Reading stops early once the
    ``device_rule`` property has been stored, so any lines after it stay
    in the stream.

    Args:
        stream: Text stream providing ``readline()``.
        count: Maximum number of lines to consume.

    Returns:
        Dict mapping property names to their string values.
    """
    props = {}
    for _ in range(count):
        key, sep, value = stream.readline().partition('=')
        if not sep:
            # Not a property line (this includes the empty line at EOF).
            continue
        # strip()/removesuffix() instead of fixed slices: a line without the
        # leading space or without a final newline keeps all its characters.
        prop_name = key.strip()
        props[prop_name] = value.removesuffix('\n')
        if prop_name == 'device_rule':
            break
    return props
def get_name_and_id_from_rule(rule):
    """Extract the quoted name and the ``xxxx:xxxx`` id from a rule string.

    The name is the non-empty text between double quotes that follows
    ``name ``; the id is the first ``xxxx:xxxx`` token (four lowercase
    alphanumerics on each side of the colon) that follows ``id ``.

    Args:
        rule: Rule text, as found in the ``device_rule`` event property.

    Returns:
        ``(name, id)``; each element is ``''`` when the rule does not
        contain it.
    """
    name_match = re.search(r'name "([^"]+)"', rule)
    id_match = re.search(r'id ([a-z0-9]{4}:[a-z0-9]{4})', rule)
    name = name_match.group(1) if name_match else ''
    # Named device_id rather than id so the builtin is not shadowed.
    device_id = id_match.group(1) if id_match else ''
    return name, device_id
def prompt_device_action(dev_id, name, long_id):
    """Show an action notification for a device and apply the user's choice.

    Starts dunstify with the actions ``block``, ``allow`` and ``reject``,
    records the notification id under *dev_id* in ``open_notifications``
    while the prompt is open, then runs the matching ``usbguard
    <action>-device`` command. Any answer other than ``allow`` or
    ``reject`` results in ``block-device``. This call blocks until
    dunstify exits.

    Args:
        dev_id: Device id used as the key in ``open_notifications``.
        name: Device name shown in the notification.
        long_id: Identifier shown in the notification and passed to
            usbguard.
    """
    with Popen([DUNSTIFY_EXEC_NAME, '-p',
                '-A', 'block,Block',
                '-A', 'allow,Allow',
                '-A', 'reject,Reject',
                f'{name} ({long_id})',
                'New Device'],
               stdout=PIPE, text=True, bufsize=0) as proc:
        # The first line dunstify prints is taken as the notification id.
        try:
            open_notifications[dev_id] = int(proc.stdout.readline())
        except ValueError:
            # No usable id was printed (e.g. dunstify failed to start a
            # notification). The prompt cannot be closed programmatically,
            # but the answer handling below still applies the default.
            pass
        # strip() rather than dropping the last character, so an answer
        # without a trailing newline is not truncated.
        option = proc.communicate()[0].strip()
    # The entry may already be gone if close_notification() removed it.
    open_notifications.pop(dev_id, None)
    action = option if option in ('reject', 'allow') else 'block'
    run([USBGUARD_EXEC_NAME, f'{action}-device', long_id])
def close_notification(dev_id):
    """Close the open prompt notification for *dev_id*, if there is one.

    Args:
        dev_id: Device id used as the key in ``open_notifications``.
    """
    # One pop with a default instead of "check, then pop": prompt threads
    # remove their own entry as well, so the key can disappear between a
    # membership test and the pop.
    notif_id = open_notifications.pop(dev_id, None)
    if notif_id is not None:
        run([DUNSTIFY_EXEC_NAME, '-C', str(notif_id)])
- with Popen([USBGUARD_EXEC_NAME, 'watch'],
- stdin=DEVNULL, stdout=PIPE, text=True, bufsize=0) as usbguard_proc:
- new_devices = set()
- usbguard_proc.stdout.readline() # get rid of initial connection message
- while True:
- event_type_result = parse_event_type_and_id(usbguard_proc.stdout)
- if event_type_result is None:
- continue
- event_type, dev_id = event_type_result
- if event_type not in ['PresenceChanged', 'PolicyApplied']:
- continue
- props = parse_event_properties(usbguard_proc.stdout, 3)
- name, long_id = get_name_and_id_from_rule(props['device_rule'])
- match event_type:
- case 'PresenceChanged':
- if props['event'] == 'Insert':
- new_devices.add(dev_id)
- else:
- close_notification(dev_id)
- new_devices.discard(dev_id)
- case 'PolicyApplied':
- if props['target_new'] == 'block':
- threading.Thread(target=prompt_device_action,
- args=(dev_id, name, long_id)).start()
- new_devices.discard(dev_id)
|