123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778 |
- from typing import Callable, Union
- import platform
- import socket
- import urllib.parse
- from picard import PICARD_VERSION_STR, log
- from picard.coverart.image import CoverArtImage
- from picard.file import File
- from picard.track import Track
- from picard.album import Album
- from picard.util import thread
- from PyQt5.QtNetwork import QNetworkReply
- from .server import Server
- from .utils import get_available_browsers, open_browser
- class Client:
- def __init__(self, host: str, port: int, item: Union[File, Track, Album], pick_handler: Callable[[Union[File, Track, Album], CoverArtImage], None]):
- self._host = host
- self._port = port
- self._item = item
- self._pick_handler = pick_handler
- self._server = Server(self._handle_message)
- @staticmethod
- def find_available_port(min_port: int, max_port: int) -> Union[int, None]:
- sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- port = min_port
- while port <= max_port:
- try:
- sock.bind(("127.0.0.1", port))
- sock.close()
- return port
- except Exception:
- port += 1
- raise Exception(f"Could not find available port between {min_port} and {max_port}.")
- def start(self, browser: str, query: str):
- log.debug("Listening on %d", self._port)
- self._server.start("127.0.0.1", self._port)
- params: dict[str, str] = {
- "artist": self._item.metadata.get("albumartist", ""),
- "album": self._item.metadata.get("album", ""),
- "barcode": self._item.metadata.get("barcode", ""),
- "catalog": self._item.metadata.get("catalog", "")
- }
- try:
- for key, values in urllib.parse.parse_qs(query, True).items():
- params[key] = values[0]
- except ValueError:
- pass
- params["remote.port"] = f"ws:{self._port}"
- params["remote.agent"] = f"Picard/({PICARD_VERSION_STR}) ({platform.platform()}; Python {platform.python_version()})"
- params["remote.text"] = "Pick cover for MusicBrainz Picard"
- browsers = get_available_browsers()
- open_browser(browsers[browser], f"{self._host}?{urllib.parse.urlencode(params)}")
- def _handle_message(self, msg: dict):
- log.debug("Received message %s", msg)
- type = msg.get("type")
- if type == "pick":
- self._server.stop()
- thread.to_main(self._handle_pick_message, msg)
- def _handle_pick_message(self, msg: dict):
- def handle_download(data: bytearray, reply: QNetworkReply, error: int):
- url = reply.url().toString()
- cover = CoverArtImage(url, ["front"], f"COV ({msg['source']})", data)
- cover.is_front = True
- self._pick_handler(self._item, cover)
- self._item.tagger.window.set_statusbar_message("Downloading cover '%(url)s'", {
- "url": msg["bigCoverUrl"]
- }, echo=log.info, timeout=3000)
- self._item.tagger.webservice.download_url(url=msg["bigCoverUrl"], handler=handle_download)
|