123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175 |
- # -*- coding: utf-8 -*-
# - built in - #
import asyncio
import dataclasses
import logging
import socket
import ssl
from typing import Union, Optional

# - pypi - #
import aiohttp
from aiohttp.client_exceptions import (
    ClientConnectorSSLError,
    ClientConnectionError,
    ServerConnectionError,
    ClientPayloadError,
)

# - local - #
import config
# Keyword arguments passed to aiohttp.ClientSession(...) by this module.
# NOTE(review): the connector is instantiated once, at import time, and the
# same object is handed to every session built from this dict -- confirm that
# sharing it (and creating it outside a running event loop) is intended.
HTTP_SESSION_ARGS = {
    'headers': {'User-Agent': config.USER_AGENT},
    # AF_INET: only open IPv4 sockets
    'connector': aiohttp.TCPConnector(family=socket.AF_INET),
}

# Exceptions the request code in this module catches and ignores.
# Despite the name, the tuple is not limited to TLS problems: it also holds
# aiohttp's connection-level and payload errors.
SSL_EXCEPTIONS = (
    ssl.SSLError,
    ClientConnectorSSLError,
    ServerConnectionError,
    ClientConnectionError,
    ClientPayloadError,
)

# channel for testing availability of instance
# (alternative channel id: 'UC-lHJZR3Gqxm24_Vd_AJ5Yw')
TEST_CHANNEL = 'UCAiKrZDrrSJnLpDM-zEVyng'
@dataclasses.dataclass
class VideoInfo:
    """Subset of a video description, built from arbitrary keyword arguments.

    The constructor accepts any keyword arguments (typically one decoded JSON
    object) and keeps only those whose names match a declared field; unknown
    keys are ignored.  A field that is absent from the arguments is set to
    ``None`` so that the dataclass-generated ``__repr__`` and ``__eq__`` never
    hit a missing attribute.
    """

    videoId: str
    author: str
    lengthSeconds: int

    def __init__(self, **kwargs):
        # dataclass() does not replace an __init__ defined in the class body,
        # so this tolerant constructor is the one that is actually used.
        for field in dataclasses.fields(self):
            setattr(self, field.name, kwargs.get(field.name))


# Sentinel meaning "the request succeeded but the channel has no videos".
NO_VIDEOS: VideoInfo = VideoInfo(videoId='', author='', lengthSeconds=-1)
@dataclasses.dataclass
class InstanceInfo:
    """Description of a single Invidious instance."""

    # address of the instance, without the URL scheme
    url: str
    # True when the instance is served over HTTPS
    https: bool
    # health ratio of the instance, None when it is not known
    health: Optional[float]
class Invidious:
    """Small asynchronous client for the Invidious HTTP API (``/api/v1``).

    Typical use: call :meth:`find_instance` (or :meth:`set_instance`) to pick
    the instance to talk to, then :meth:`get_latest_video` per channel.  The
    object owns an ``aiohttp.ClientSession``; release it with :meth:`close`
    or by using the object as an ``async with`` context manager.
    """

    _NAME = 'Invidious'

    # - log - #
    __handler = logging.StreamHandler()
    __handler.setLevel(config.LOGLEVEL)
    __handler.setFormatter(logging.Formatter(config.LOGFMT, config.TIMEFMT))
    _log = logging.getLogger(_NAME)
    _log.addHandler(__handler)
    _log.setLevel(config.LOGLEVEL)

    # Everything a single request may fail with without being fatal:
    # the module-wide tuple, plus timeouts, any other aiohttp client error
    # (e.g. a non-JSON content type) and undecodable JSON (ValueError).
    _REQUEST_ERRORS = (
        *SSL_EXCEPTIONS,
        asyncio.TimeoutError,
        aiohttp.ClientError,
        ValueError,
    )

    def __init__(self):
        """Create the client with no instance selected yet."""
        self._api = '/api/v1'
        self._instance = None
        self._session = aiohttp.ClientSession(**HTTP_SESSION_ARGS)

    async def close(self) -> None:
        """Close the underlying HTTP session; safe to call more than once."""
        if not self._session.closed:
            await self._session.close()

    async def __aenter__(self) -> 'Invidious':
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        await self.close()

    @property
    def instance(self) -> Optional[str]:
        """Base URL of the instance currently in use, or ``None``."""
        return self._instance

    def set_instance(self, instance: str) -> None:
        """Use *instance* (a base URL including the scheme) for requests."""
        self._instance = instance

    @property
    def _channel_latest_url(self) -> Optional[str]:
        """URL template for a channel's latest videos, or ``None``.

        The returned string contains one ``{}`` placeholder that is filled
        with the channel id via ``str.format``.
        """
        if self._instance:
            return '%s%s/channels/latest/{}' % (self._instance, self._api)
        return None

    @staticmethod
    def _get_health(meta: dict) -> Optional[float]:
        """Return ``meta['monitor']['90dRatio']['ratio']`` as a float.

        ``None`` is returned when any level of that path is missing, is not
        an object, or when the value cannot be converted to ``float``.
        """
        node = meta
        for key in ('monitor', '90dRatio', 'ratio'):
            if not isinstance(node, dict):
                return None
            node = node.get(key)
        if node is None:
            return None
        try:
            # explicit None test above: a ratio of 0 is a valid health value
            return float(node)
        except (TypeError, ValueError):
            return None

    @classmethod
    def _parse_instances(cls, data: list) -> list[InstanceInfo]:
        """Convert ``[[host, {...}], ...]`` into :class:`InstanceInfo` items.

        Entries that do not have that shape are skipped.
        """
        instances = []
        for entry in data:
            if not isinstance(entry, (list, tuple)) or len(entry) < 2:
                continue
            host, meta = entry[0], entry[1]
            if not isinstance(meta, dict):
                continue
            instances.append(
                InstanceInfo(
                    url=host,
                    health=cls._get_health(meta),
                    https=(meta.get('type') == 'https'),
                )
            )
        return instances

    async def _get_instances_info(self) -> Optional[list[InstanceInfo]]:
        """Download the public instance list.

        Returns the parsed list, or ``None`` when the request fails, the
        status is not 200, or the payload is empty or not a JSON array.
        """
        url = 'https://api.invidious.io/instances.json'
        params = {
            'sort_by': 'type,health'
        }
        try:
            async with self._session.get(url, params=params) as response:
                if response.status != 200:
                    return None
                data = await response.json()
        except self._REQUEST_ERRORS as err:
            self._log.debug('%s: request failed: %r', url, err)
            return None
        if not data or not isinstance(data, list):
            return None
        return self._parse_instances(data)

    async def find_instance(self) -> Optional[str]:
        """Select the first HTTPS instance that answers a test request.

        Instances are probed in the order returned by the instance list; the
        first one that answers 200 for ``TEST_CHANNEL`` becomes the current
        instance.  A probe that fails or times out only skips that instance.

        Returns the chosen base URL, or ``None`` when the list cannot be
        fetched or no instance responds.
        """
        info = await self._get_instances_info()
        if info is None:
            self._log.warning('cannot get instances info')
            return None
        for candidate in info:
            if not candidate.https:
                continue
            base = 'https://%s' % candidate.url
            probe = base + self._api + '/channels/latest/' + TEST_CHANNEL
            try:
                async with self._session.get(probe) as response:
                    alive = response.status == 200
            except self._REQUEST_ERRORS as err:
                self._log.debug('%s: probe failed: %r', base, err)
                continue
            if alive:
                self._instance = base
                self._log.info('found instance: %s', self._instance)
                return self._instance
        self._log.warning('cannot find working instance')
        return None

    async def get_latest_video(self, channel_id: str) -> Optional[VideoInfo]:
        """Return the first entry of the channel's "latest videos" listing.

        Returns:
            * a :class:`VideoInfo` built from the first listed video;
            * ``NO_VIDEOS`` when the instance answered 200 with an empty
              payload;
            * ``None`` when no instance is selected, the request failed, the
              status was not 200, or the payload has an unexpected shape.
        """
        template = self._channel_latest_url
        if not template:
            return None
        url = template.format(channel_id)
        self._log.debug('%s: downloading latest videos info', channel_id)
        try:
            async with self._session.get(url) as response:
                if response.status != 200:
                    return None
                data = await response.json()
        except self._REQUEST_ERRORS as err:
            self._log.debug('%s: request failed: %r', channel_id, err)
            return None
        # NOTE(review): some Invidious versions appear to wrap the list as
        # {"videos": [...]} -- accepted here as well; confirm against the API.
        if isinstance(data, dict) and isinstance(data.get('videos'), list):
            data = data['videos']
        if not data:
            return NO_VIDEOS
        if not isinstance(data, list) or not isinstance(data[0], dict):
            # e.g. an {"error": ...} object: treat as a failed request
            self._log.debug('%s: unexpected response payload', channel_id)
            return None
        return VideoInfo(**data[0])
|