# client.py
  1. from typing import Callable, Union
  2. import platform
  3. import socket
  4. import urllib.parse
  5. from picard import PICARD_VERSION_STR, log
  6. from picard.coverart.image import CoverArtImage
  7. from picard.file import File
  8. from picard.track import Track
  9. from picard.album import Album
  10. from picard.util import thread
  11. from PyQt5.QtNetwork import QNetworkReply
  12. from .server import Server
  13. from .utils import get_available_browsers, open_browser
  14. class Client:
  15. def __init__(self, host: str, port: int, item: Union[File, Track, Album], pick_handler: Callable[[Union[File, Track, Album], CoverArtImage], None]):
  16. self._host = host
  17. self._port = port
  18. self._item = item
  19. self._pick_handler = pick_handler
  20. self._server = Server(self._handle_message)
  21. @staticmethod
  22. def find_available_port(min_port: int, max_port: int) -> Union[int, None]:
  23. sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  24. port = min_port
  25. while port <= max_port:
  26. try:
  27. sock.bind(("127.0.0.1", port))
  28. sock.close()
  29. return port
  30. except Exception:
  31. port += 1
  32. raise Exception(f"Could not find available port between {min_port} and {max_port}.")
  33. def start(self, browser: str, query: str):
  34. log.debug("Listening on %d", self._port)
  35. self._server.start("127.0.0.1", self._port)
  36. params: dict[str, str] = {
  37. "artist": self._item.metadata.get("albumartist", ""),
  38. "album": self._item.metadata.get("album", ""),
  39. "barcode": self._item.metadata.get("barcode", ""),
  40. "catalog": self._item.metadata.get("catalog", "")
  41. }
  42. try:
  43. for key, values in urllib.parse.parse_qs(query, True).items():
  44. params[key] = values[0]
  45. except ValueError:
  46. pass
  47. params["remote.port"] = f"ws:{self._port}"
  48. params["remote.agent"] = f"Picard/({PICARD_VERSION_STR}) ({platform.platform()}; Python {platform.python_version()})"
  49. params["remote.text"] = "Pick cover for MusicBrainz Picard"
  50. browsers = get_available_browsers()
  51. open_browser(browsers[browser], f"{self._host}?{urllib.parse.urlencode(params)}")
  52. def _handle_message(self, msg: dict):
  53. log.debug("Received message %s", msg)
  54. type = msg.get("type")
  55. if type == "pick":
  56. self._server.stop()
  57. thread.to_main(self._handle_pick_message, msg)
  58. def _handle_pick_message(self, msg: dict):
  59. def handle_download(data: bytearray, reply: QNetworkReply, error: int):
  60. url = reply.url().toString()
  61. cover = CoverArtImage(url, ["front"], f"COV ({msg['source']})", data)
  62. cover.is_front = True
  63. self._pick_handler(self._item, cover)
  64. self._item.tagger.window.set_statusbar_message("Downloading cover '%(url)s'", {
  65. "url": msg["bigCoverUrl"]
  66. }, echo=log.info, timeout=3000)
  67. self._item.tagger.webservice.download_url(url=msg["bigCoverUrl"], handler=handle_download)