123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230 |
- ########################################################################
- # Wiizard - A Wii games manager
- # Copyright (C) 2023 CYBERDEViL
- #
- # This file is part of Wiizard.
- #
- # Wiizard is free software: you can redistribute it and/or modify
- # it under the terms of the GNU General Public License as published by
- # the Free Software Foundation, either version 3 of the License, or
- # (at your option) any later version.
- #
- # Wiizard is distributed in the hope that it will be useful,
- # but WITHOUT ANY WARRANTY; without even the implied warranty of
- # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- # GNU General Public License for more details.
- #
- # You should have received a copy of the GNU General Public License
- # along with this program. If not, see <https://www.gnu.org/licenses/>.
- #
- ########################################################################
- import pyudev
- import subprocess
- import time
- from PyQt5.QtCore import (
- pyqtSignal,
- QObject
- )
- from wiizard.thread import AbstractThread, THREAD_FLAG_IS_STOPABLE
- import pyblockinfo
- import pywwt
class DeviceModel(QObject):
    """ Class to describe a storage device (disk or partition).

    Holds the device node path and device type it was created with, plus the
    vendor and model strings looked up through pyblockinfo.
    """
    # Never emitted by this class itself; it is fired from outside when the
    # device disappears from the system.
    deviceRemoved = pyqtSignal()

    def __init__(self, dev, devType):
        """ dev:     device node path (for example '/dev/sdb1')
            devType: device type string (for example 'disk' or 'partition')
        """
        QObject.__init__(self)
        self.__device = dev
        self.__type = devType  # 'disk' or 'partition'
        self.__vendor = ""
        self.__model = ""

        # Vendor/model lookup is best effort: on failure both stay empty.
        try:
            self.__vendor, self.__model = pyblockinfo.getVendorModel(dev)
        except pyblockinfo.error:
            print(f"!! Failed to get vendor and model for {dev}")

    @property
    def path(self):
        """ Device node path this model was created with. """
        return self.__device

    @property
    def type(self):
        """ Device type string this model was created with. """
        return self.__type

    @property
    def vendor(self):
        """ Vendor string, or "" when the lookup failed. """
        return self.__vendor

    @property
    def model(self):
        """ Model string, or "" when the lookup failed. """
        return self.__model

    def isMounted(self):
        """ Return True when this device path is the first field of any line
        printed by the 'mount' command, False otherwise.

        Raises Exception when 'mount' exits with a non-zero status; the
        original CalledProcessError is attached as the cause.
        """
        try:
            output = subprocess.check_output(["mount"], shell=False)
        except subprocess.CalledProcessError as err:
            # Chain the cause so the traceback shows why 'mount' failed.
            raise Exception("cmd 'mount' failed", err) from err

        # Mount points may contain bytes that are not valid UTF-8; replace
        # them instead of failing, only the first field is compared anyway.
        text = output.decode("utf-8", errors="replace")
        return any(
            line.split(" ", 1)[0] == self.__device
            for line in text.split("\n")
            if line
        )

    def getFsType(self):
        """ Return the 'TYPE' value pyblockinfo reports for this device.

        Returns "" when the lookup fails or when no 'TYPE' value is
        present, so callers always get a string.
        """
        try:
            blkinfo = pyblockinfo.getBlkidValues(self.path)
        except pyblockinfo.error as err:
            print(err)
            return ""
        return blkinfo.get("TYPE", "")

    def getSize(self):
        """ Return the size pyblockinfo reports for this device, or 0 when
        the lookup fails.
        """
        try:
            return pyblockinfo.getSize(self.path)
        except pyblockinfo.error as err:
            print(err)
        return 0
def getAvaiableDevices():
    """ Yield a DeviceModel for every block device udev knows about.
    """
    udev = pyudev.Context()
    # Not filtered on DEVTYPE (e.g. "partition"): every block device is used.
    blockDevices = udev.list_devices(subsystem="block")
    yield from (
        DeviceModel(entry.device_node, entry.device_type)
        for entry in blockDevices
    )
- # https://pyudev.readthedocs.io/en/latest/api/pyudev.html
class DeviceNotifyThread(AbstractThread):
    """ Thread that listens for udev block device events and forwards them
    as Qt signals.
    """
    deviceAdded = pyqtSignal(str, str)    # device node, device type
    deviceRemoved = pyqtSignal(str)       # device node
    deviceChanged = pyqtSignal(str, str)  # declared but not emitted here

    def __init__(self):
        AbstractThread.__init__(self, flags=THREAD_FLAG_IS_STOPABLE)
        # Loop guard for run(); cleared by stop().
        self.__run = True

    def stop(self):
        """ Ask the monitor loop to end; it notices this on its next pass,
        at most one poll timeout later.
        """
        self.__run = False

    def run(self):
        """ Poll the udev monitor until stop() is called, emitting
        deviceAdded / deviceRemoved for 'add' / 'remove' events.
        """
        monitor = pyudev.Monitor.from_netlink(pyudev.Context())
        monitor.filter_by("block")

        while self.__run:
            # The timeout lets the loop re-check the stop flag regularly.
            event = monitor.poll(timeout=3)
            if event is None:
                continue

            action = event.action
            if action == "remove":
                self.deviceRemoved.emit(event.device_node)
            elif action == "add":
                # TODO artificial delay so the device can init properly,
                # else it won't detect the wbfs partition; maybe
                # device.is_initialized is useful?
                time.sleep(1)
                self.deviceAdded.emit(event.device_node, event.device_type)
class DevicesModel(QObject):
    """ Main resource for getting storage devices and for being notified
    when one gets added or removed.
    """
    # All four signals carry the device path.
    deviceAdded = pyqtSignal(str)
    deviceRemoved = pyqtSignal(str)
    wbfsAdded = pyqtSignal(str)
    wbfsRemoved = pyqtSignal(str)

    def __init__(self):
        QObject.__init__(self)
        self.__devices = {}      # device path -> DeviceModel
        self.__wbfsDevices = []  # device paths reported as WBFS by pywwt
        self.__notifyThread = DeviceNotifyThread()

    @property
    def devices(self):
        """ Dict of all known devices (device path -> DeviceModel). """
        return self.__devices

    @property
    def wbfsPartitions(self):
        """ Yield the DeviceModel of every known device that is a WBFS
        partition.
        """
        for devicePath in self.__wbfsDevices:
            if devicePath in self.__devices:
                yield self.__devices[devicePath]

    def init(self):
        """ Enumerate the current block devices, scan for WBFS partitions
        and start the udev monitor thread.

        NOTE: every call connects the monitor signals again, so this is
        meant to be called once.
        """
        self.__devices.clear()

        context = pyudev.Context()
        # Not filtered on DEVTYPE (e.g. "partition"): all block devices.
        for device in context.list_devices(subsystem="block"):
            devpath = device.device_node
            self.__devices[devpath] = DeviceModel(devpath, device.device_type)

        self.__rescanWbfsPartitions()

        self.__notifyThread.deviceAdded.connect(self.__onDeviceAdded)
        self.__notifyThread.deviceRemoved.connect(self.__onDeviceRemoved)

        # Start the udev monitor thread
        self.__notifyThread.start()

    def stop(self):
        """ Stop the udev monitor thread and wait for it to finish. """
        self.__notifyThread.stop()
        self.__notifyThread.wait()

    def rescanWbfsPartitions(self):
        """ Public api (used after format).

        Rescan for WBFS partitions and emit wbfsAdded for every known
        device that was not a WBFS partition before the rescan.
        """
        oldWbfsParts = list(self.__wbfsDevices)
        self.__rescanWbfsPartitions()

        for devicePath in self.__wbfsDevices:
            if devicePath not in self.__devices:
                continue
            if devicePath not in oldWbfsParts:
                self.wbfsAdded.emit(devicePath)

    def __rescanWbfsPartitions(self):
        """ Internal only: rebuild the WBFS path list without emitting
        signals. On a pywwt error the list is left empty.
        """
        self.__wbfsDevices.clear()
        try:
            self.__wbfsDevices.extend(pywwt.find_wbfs_partitions())
        except pywwt.error as err:
            print("pywwt.find_wbfs_partitions failed:", err)

    def __onDeviceAdded(self, dev, devType):
        """ Slot for the monitor thread: register the new device, rescan
        for WBFS partitions and emit deviceAdded (plus wbfsAdded when the
        new device is a WBFS partition).
        """
        self.__devices[dev] = DeviceModel(dev, devType)
        self.__rescanWbfsPartitions()

        self.deviceAdded.emit(dev)
        if dev in self.__wbfsDevices:
            self.wbfsAdded.emit(dev)

    def __onDeviceRemoved(self, devicePath):
        """ Slot for the monitor thread: forget the device and emit the
        matching removal signals.

        A remove event for a device that is not tracked is ignored
        instead of raising KeyError; nothing was announced for it, so no
        signal is emitted.
        """
        device = self.__devices.get(devicePath)
        if device is None:
            # Keep the WBFS list free of stale paths even for untracked
            # devices.
            if devicePath in self.__wbfsDevices:
                self.__wbfsDevices.remove(devicePath)
            return

        # The device is still listed in self.devices while its own signal
        # and wbfsRemoved are emitted.
        device.deviceRemoved.emit()

        if devicePath in self.__wbfsDevices:
            self.__wbfsDevices.remove(devicePath)
            self.wbfsRemoved.emit(devicePath)

        # pop() tolerates a slot above having already dropped the entry.
        self.__devices.pop(devicePath, None)

        # No need for a WBFS rescan here (only on add)
        self.deviceRemoved.emit(devicePath)
|