utils.py 24 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867
  1. """Utilities"""
  2. # Friendly Telegram (telegram userbot)
  3. # Copyright (C) 2018-2021 The Authors
  4. # This program is free software: you can redistribute it and/or modify
  5. # it under the terms of the GNU Affero General Public License as published by
  6. # the Free Software Foundation, either version 3 of the License, or
  7. # (at your option) any later version.
  8. # This program is distributed in the hope that it will be useful,
  9. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  10. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  11. # GNU Affero General Public License for more details.
  12. # You should have received a copy of the GNU Affero General Public License
  13. # along with this program. If not, see <https://www.gnu.org/licenses/>.
  14. # █ █ ▀ █▄▀ ▄▀█ █▀█ ▀ ▄▀█ ▀█▀ ▄▀█ █▀▄▀█ ▄▀█
  15. # █▀█ █ █ █ █▀█ █▀▄ █ ▄ █▀█ █ █▀█ █ ▀ █ █▀█
  16. #
  17. # © Copyright 2022
  18. #
  19. # https://t.me/hikariatama
  20. #
  21. # 🔒 Licensed under the GNU GPLv3
  22. # 🌐 https://www.gnu.org/licenses/agpl-3.0.html
  23. import asyncio
  24. import functools
  25. import io
  26. import json
  27. import logging
  28. import os
  29. import random
  30. import re
  31. import shlex
  32. import string
  33. import time
  34. from datetime import timedelta
  35. from typing import Any, List, Optional, Tuple, Union
  36. from urllib.parse import urlparse
  37. import git
  38. import grapheme
  39. import requests
  40. import telethon
  41. from aiogram.types import CallbackQuery
  42. from telethon.hints import Entity
  43. from telethon.tl.custom.message import Message
  44. from telethon.tl.functions.account import UpdateNotifySettingsRequest
  45. from telethon.tl.functions.channels import CreateChannelRequest, EditPhotoRequest
  46. from telethon.tl.functions.messages import (
  47. GetDialogFiltersRequest,
  48. UpdateDialogFilterRequest,
  49. )
  50. from telethon.tl.types import (
  51. Channel,
  52. InputPeerNotifySettings,
  53. MessageEntityBankCard,
  54. MessageEntityBlockquote,
  55. MessageEntityBold,
  56. MessageEntityBotCommand,
  57. MessageEntityCashtag,
  58. MessageEntityCode,
  59. MessageEntityEmail,
  60. MessageEntityHashtag,
  61. MessageEntityItalic,
  62. MessageEntityMention,
  63. MessageEntityMentionName,
  64. MessageEntityPhone,
  65. MessageEntityPre,
  66. MessageEntitySpoiler,
  67. MessageEntityStrike,
  68. MessageEntityTextUrl,
  69. MessageEntityUnderline,
  70. MessageEntityUnknown,
  71. MessageEntityUrl,
  72. MessageMediaWebPage,
  73. PeerChannel,
  74. PeerChat,
  75. PeerUser,
  76. User,
  77. )
  78. from .inline.types import InlineCall
  79. FormattingEntity = Union[
  80. MessageEntityUnknown,
  81. MessageEntityMention,
  82. MessageEntityHashtag,
  83. MessageEntityBotCommand,
  84. MessageEntityUrl,
  85. MessageEntityEmail,
  86. MessageEntityBold,
  87. MessageEntityItalic,
  88. MessageEntityCode,
  89. MessageEntityPre,
  90. MessageEntityTextUrl,
  91. MessageEntityMentionName,
  92. MessageEntityPhone,
  93. MessageEntityCashtag,
  94. MessageEntityUnderline,
  95. MessageEntityStrike,
  96. MessageEntityBlockquote,
  97. MessageEntityBankCard,
  98. MessageEntitySpoiler,
  99. ]
  100. ListLike = Union[list, set, tuple]
  101. emoji_pattern = re.compile(
  102. "["
  103. "\U0001F600-\U0001F64F" # emoticons
  104. "\U0001F300-\U0001F5FF" # symbols & pictographs
  105. "\U0001F680-\U0001F6FF" # transport & map symbols
  106. "\U0001F1E0-\U0001F1FF" # flags (iOS)
  107. "]+",
  108. flags=re.UNICODE,
  109. )
  110. parser = telethon.utils.sanitize_parse_mode("html")
  111. def get_args(message: Message) -> List[str]:
  112. """Get arguments from message (str or Message), return list of arguments"""
  113. if not (message := getattr(message, "message", message)):
  114. return False
  115. if len(message := message.split(maxsplit=1)) <= 1:
  116. return []
  117. message = message[1]
  118. try:
  119. split = shlex.split(message)
  120. except ValueError:
  121. return message # Cannot split, let's assume that it's just one long message
  122. return list(filter(lambda x: len(x) > 0, split))
  123. def get_args_raw(message: Message) -> str:
  124. """Get the parameters to the command as a raw string (not split)"""
  125. if not (message := getattr(message, "message", message)):
  126. return False
  127. if len(args := message.split(maxsplit=1)) > 1:
  128. return args[1]
  129. return ""
  130. def get_args_split_by(message: Message, separator: str) -> List[str]:
  131. """Split args with a specific separator"""
  132. return [
  133. section.strip() for section in get_args_raw(message).split(separator) if section
  134. ]
  135. def get_chat_id(message: Message) -> int:
  136. """Get the chat ID, but without -100 if its a channel"""
  137. return telethon.utils.resolve_id(message.chat_id)[0]
  138. def get_entity_id(entity: Entity) -> int:
  139. """Get entity ID"""
  140. return telethon.utils.get_peer_id(entity)
  141. def escape_html(text: str, /) -> str:
  142. """Pass all untrusted/potentially corrupt input here"""
  143. return str(text).replace("&", "&amp;").replace("<", "&lt;").replace(">", "&gt;")
  144. def escape_quotes(text: str, /) -> str:
  145. """Escape quotes to html quotes"""
  146. return escape_html(text).replace('"', "&quot;")
  147. def get_base_dir() -> str:
  148. """Get directory of this file"""
  149. from . import __main__
  150. return get_dir(__main__.__file__)
  151. def get_dir(mod: str) -> str:
  152. """Get directory of given module"""
  153. return os.path.abspath(os.path.dirname(os.path.abspath(mod)))
  154. async def get_user(message: Message) -> Union[None, User]:
  155. """Get user who sent message, searching if not found easily"""
  156. try:
  157. return await message.client.get_entity(message.sender_id)
  158. except ValueError: # Not in database. Lets go looking for them.
  159. logging.debug("User not in session cache. Searching...")
  160. if isinstance(message.peer_id, PeerUser):
  161. await message.client.get_dialogs()
  162. return await message.client.get_entity(message.sender_id)
  163. if isinstance(message.peer_id, (PeerChannel, PeerChat)):
  164. try:
  165. return await message.client.get_entity(message.sender_id)
  166. except Exception:
  167. pass
  168. async for user in message.client.iter_participants(
  169. message.peer_id,
  170. aggressive=True,
  171. ):
  172. if user.id == message.sender_id:
  173. return user
  174. logging.error("User isn't in the group where they sent the message")
  175. return None
  176. logging.error("`peer_id` is not a user, chat or channel")
  177. return None
  178. def run_sync(func, *args, **kwargs):
  179. """
  180. Run a non-async function in a new thread and return an awaitable
  181. :param func: Sync-only function to execute
  182. :returns: Awaitable coroutine
  183. """
  184. return asyncio.get_event_loop().run_in_executor(
  185. None,
  186. functools.partial(func, *args, **kwargs),
  187. )
  188. def run_async(loop, coro):
  189. """Run an async function as a non-async function, blocking till it's done"""
  190. # When we bump minimum support to 3.7, use run()
  191. return asyncio.run_coroutine_threadsafe(coro, loop).result()
  192. def censor(
  193. obj,
  194. to_censor: Optional[List[str]] = None,
  195. replace_with: Optional[str] = "redacted_{count}_chars",
  196. ):
  197. """May modify the original object, but don't rely on it"""
  198. if to_censor is None:
  199. to_censor = ["phone"]
  200. for k, v in vars(obj).items():
  201. if k in to_censor:
  202. setattr(obj, k, replace_with.format(count=len(v)))
  203. elif k[0] != "_" and hasattr(v, "__dict__"):
  204. setattr(obj, k, censor(v, to_censor, replace_with))
  205. return obj
  206. def relocate_entities(
  207. entities: list,
  208. offset: int,
  209. text: Union[str, None] = None,
  210. ) -> list:
  211. """Move all entities by offset (truncating at text)"""
  212. length = len(text) if text is not None else 0
  213. for ent in entities.copy() if entities else ():
  214. ent.offset += offset
  215. if ent.offset < 0:
  216. ent.length += ent.offset
  217. ent.offset = 0
  218. if text is not None and ent.offset + ent.length > length:
  219. ent.length = length - ent.offset
  220. if ent.length <= 0:
  221. entities.remove(ent)
  222. return entities
  223. async def answer(
  224. message: Union[Message, CallbackQuery, InlineCall],
  225. response: str,
  226. **kwargs,
  227. ) -> Union[CallbackQuery, Message]:
  228. """Use this to give the response to a command"""
  229. # Compatibility with FTG\GeekTG
  230. if isinstance(message, list) and message:
  231. message = message[0]
  232. if isinstance(message, (CallbackQuery, InlineCall)):
  233. await message.edit(response)
  234. return message
  235. kwargs.setdefault("link_preview", False)
  236. if not (edit := message.out):
  237. kwargs.setdefault(
  238. "reply_to",
  239. getattr(message, "reply_to_msg_id", None),
  240. )
  241. parse_mode = telethon.utils.sanitize_parse_mode(
  242. kwargs.pop(
  243. "parse_mode",
  244. message.client.parse_mode,
  245. )
  246. )
  247. if isinstance(response, str) and not kwargs.pop("asfile", False):
  248. text, entity = parse_mode.parse(response)
  249. if len(text) >= 4096:
  250. try:
  251. if not message.client.loader.inline.init_complete:
  252. raise
  253. strings = list(smart_split(text, entity, 4096))
  254. if len(strings) > 10:
  255. raise
  256. list_ = await message.client.loader.inline.list(
  257. message=message,
  258. strings=strings,
  259. )
  260. if not list_:
  261. raise
  262. return list_
  263. except Exception:
  264. file = io.BytesIO(text.encode("utf-8"))
  265. file.name = "command_result.txt"
  266. result = await message.client.send_file(
  267. message.peer_id,
  268. file,
  269. caption="<b>📤 Command output seems to be too long, so it's sent in file.</b>",
  270. )
  271. if message.out:
  272. await message.delete()
  273. return result
  274. result = await (message.edit if edit else message.respond)(
  275. text,
  276. parse_mode=lambda t: (t, entity),
  277. **kwargs,
  278. )
  279. elif isinstance(response, Message):
  280. if message.media is None and (
  281. response.media is None or isinstance(response.media, MessageMediaWebPage)
  282. ):
  283. result = await message.edit(
  284. response.message,
  285. parse_mode=lambda t: (t, response.entities or []),
  286. link_preview=isinstance(response.media, MessageMediaWebPage),
  287. )
  288. else:
  289. result = await message.respond(response, **kwargs)
  290. else:
  291. if isinstance(response, bytes):
  292. response = io.BytesIO(response)
  293. elif isinstance(response, str):
  294. response = io.BytesIO(response.encode("utf-8"))
  295. if name := kwargs.pop("filename", None):
  296. response.name = name
  297. if message.media is not None and edit:
  298. await message.edit(file=response, **kwargs)
  299. else:
  300. kwargs.setdefault(
  301. "reply_to",
  302. getattr(message, "reply_to_msg_id", None),
  303. )
  304. result = await message.client.send_file(message.chat_id, response, **kwargs)
  305. return result
  306. async def get_target(message: Message, arg_no: Optional[int] = 0) -> Union[int, None]:
  307. if any(
  308. isinstance(entity, MessageEntityMentionName)
  309. for entity in (message.entities or [])
  310. ):
  311. e = sorted(
  312. filter(lambda x: isinstance(x, MessageEntityMentionName), message.entities),
  313. key=lambda x: x.offset,
  314. )[0]
  315. return e.user_id
  316. if len(get_args(message)) > arg_no:
  317. user = get_args(message)[arg_no]
  318. elif message.is_reply:
  319. return (await message.get_reply_message()).sender_id
  320. elif hasattr(message.peer_id, "user_id"):
  321. user = message.peer_id.user_id
  322. else:
  323. return None
  324. try:
  325. entity = await message.client.get_entity(user)
  326. except ValueError:
  327. return None
  328. else:
  329. if isinstance(entity, User):
  330. return entity.id
  331. def merge(a: dict, b: dict) -> dict:
  332. """Merge with replace dictionary a to dictionary b"""
  333. for key in a:
  334. if key in b:
  335. if isinstance(a[key], dict) and isinstance(b[key], dict):
  336. b[key] = merge(a[key], b[key])
  337. elif isinstance(a[key], list) and isinstance(b[key], list):
  338. b[key] = list(set(b[key] + a[key]))
  339. else:
  340. b[key] = a[key]
  341. b[key] = a[key]
  342. return b
  343. async def set_avatar(
  344. client: "TelegramClient", # noqa: F821
  345. peer: Entity,
  346. avatar: str,
  347. ) -> bool:
  348. """Sets an entity avatar"""
  349. if isinstance(avatar, str) and check_url(avatar):
  350. f = (
  351. await run_sync(
  352. requests.get,
  353. avatar,
  354. )
  355. ).content
  356. elif isinstance(avatar, bytes):
  357. f = avatar
  358. else:
  359. return False
  360. await client(
  361. EditPhotoRequest(
  362. channel=peer,
  363. photo=await client.upload_file(f, file_name="photo.png"),
  364. )
  365. )
  366. return True
  367. async def asset_channel(
  368. client: "TelegramClient", # noqa: F821
  369. title: str,
  370. description: str,
  371. *,
  372. silent: Optional[bool] = False,
  373. archive: Optional[bool] = False,
  374. avatar: Optional[str] = "",
  375. _folder: Optional[str] = "",
  376. ) -> Tuple[Channel, bool]:
  377. """
  378. Create new channel (if needed) and return its entity
  379. :param client: Telegram client to create channel by
  380. :param title: Channel title
  381. :param description: Description
  382. :param silent: Automatically mute channel
  383. :param archive: Automatically archive channel
  384. :param avatar: Url to an avatar to set as pfp of created peer
  385. :param _folder: Do not use it, or things will go wrong
  386. :returns: Peer and bool: is channel new or pre-existent
  387. """
  388. async for d in client.iter_dialogs():
  389. if d.title == title:
  390. return d.entity, False
  391. peer = (
  392. await client(
  393. CreateChannelRequest(
  394. title,
  395. description,
  396. megagroup=True,
  397. )
  398. )
  399. ).chats[0]
  400. if silent:
  401. await dnd(client, peer, archive)
  402. elif archive:
  403. await client.edit_folder(peer, 1)
  404. if avatar:
  405. await set_avatar(client, peer, avatar)
  406. if _folder:
  407. if _folder != "hikka":
  408. raise NotImplementedError
  409. folders = await client(GetDialogFiltersRequest())
  410. try:
  411. folder = next(folder for folder in folders if folder.title == "hikka")
  412. except Exception:
  413. return
  414. if any(
  415. peer.id == getattr(folder_peer, "channel_id", None)
  416. for folder_peer in folder.include_peers
  417. ):
  418. return
  419. folder.include_peers += [await client.get_input_entity(peer)]
  420. await client(
  421. UpdateDialogFilterRequest(
  422. folder.id,
  423. folder,
  424. )
  425. )
  426. return peer, True
  427. async def dnd(
  428. client: "TelegramClient", # noqa: F821
  429. peer: Entity,
  430. archive: Optional[bool] = True,
  431. ) -> bool:
  432. """
  433. Mutes and optionally archives peer
  434. :param peer: Anything entity-link
  435. :param archive: Archive peer, or just mute?
  436. :returns: `True` on success, otherwise `False`
  437. """
  438. try:
  439. await client(
  440. UpdateNotifySettingsRequest(
  441. peer=peer,
  442. settings=InputPeerNotifySettings(
  443. show_previews=False,
  444. silent=True,
  445. mute_until=2**31 - 1,
  446. ),
  447. )
  448. )
  449. if archive:
  450. await client.edit_folder(peer, 1)
  451. except Exception:
  452. logging.exception("utils.dnd error")
  453. return False
  454. return True
  455. def get_link(user: Union[User, Channel], /) -> str:
  456. """Get telegram permalink to entity"""
  457. return (
  458. f"tg://user?id={user.id}"
  459. if isinstance(user, User)
  460. else (
  461. f"tg://resolve?domain={user.username}"
  462. if getattr(user, "username", None)
  463. else ""
  464. )
  465. )
  466. def chunks(_list: Union[list, tuple, set], n: int, /) -> list:
  467. """Split provided `_list` into chunks of `n`"""
  468. return [_list[i : i + n] for i in range(0, len(_list), n)]
  469. def get_named_platform() -> str:
  470. """Returns formatted platform name"""
  471. if os.path.isfile("/proc/device-tree/model"):
  472. with open("/proc/device-tree/model") as f:
  473. model = f.read()
  474. return f"🍇 {model}" if model.startswith("Raspberry") else f"❓ {model}"
  475. is_termux = bool(os.popen('echo $PREFIX | grep -o "com.termux"').read())
  476. is_okteto = "OKTETO" in os.environ
  477. is_lavhost = "LAVHOST" in os.environ
  478. if is_termux:
  479. return "🕶 Termux"
  480. if is_okteto:
  481. return "☁️ Okteto"
  482. if is_lavhost:
  483. return f"✌️ lavHost {os.environ['LAVHOST']}"
  484. return "📻 VDS"
  485. def uptime() -> int:
  486. """Returns userbot uptime in seconds"""
  487. return round(time.perf_counter() - init_ts)
  488. def formatted_uptime() -> str:
  489. """Returnes formmated uptime"""
  490. return "{}".format(str(timedelta(seconds=uptime())))
  491. def ascii_face() -> str:
  492. """Returnes cute ASCII-art face"""
  493. return escape_html(
  494. random.choice(
  495. [
  496. "ヽ(๑◠ܫ◠๑)ノ",
  497. "☜(⌒▽⌒)☞",
  498. "/|\\ ^._.^ /|\\",
  499. "(◕ᴥ◕ʋ)",
  500. "ᕙ(`▽´)ᕗ",
  501. "(☞゚∀゚)☞",
  502. "(✿◠‿◠)",
  503. "(▰˘◡˘▰)",
  504. "(˵ ͡° ͜ʖ ͡°˵)",
  505. "ʕっ•ᴥ•ʔっ",
  506. "( ͡° ᴥ ͡°)",
  507. "ʕ♥ᴥ♥ʔ",
  508. "\\m/,(> . <)_\\m/",
  509. "(๑•́ ヮ •̀๑)",
  510. "٩(^‿^)۶",
  511. "(っˆڡˆς)",
  512. "ψ(`∇´)ψ",
  513. "⊙ω⊙",
  514. "٩(^ᴗ^)۶",
  515. "(´・ω・)っ由",
  516. "※\\(^o^)/※",
  517. "٩(*❛⊰❛)~❤",
  518. "( ͡~ ͜ʖ ͡°)",
  519. "✧♡(◕‿◕✿)",
  520. "โ๏௰๏ใ ื",
  521. "∩。• ᵕ •。∩ ♡",
  522. "(♡´౪`♡)",
  523. "(◍>◡<◍)⋈。✧♡",
  524. "♥(ˆ⌣ˆԅ)",
  525. "╰(✿´⌣`✿)╯♡",
  526. "ʕ•ᴥ•ʔ",
  527. "ᶘ ◕ᴥ◕ᶅ",
  528. "▼・ᴥ・▼",
  529. "【≽ܫ≼】",
  530. "ฅ^•ﻌ•^ฅ",
  531. "(΄◞ิ౪◟ิ‵)",
  532. ]
  533. )
  534. )
  535. def array_sum(array: List[Any], /) -> List[Any]:
  536. """Performs basic sum operation on array"""
  537. result = []
  538. for item in array:
  539. result += item
  540. return result
  541. def rand(size: int, /) -> str:
  542. """Return random string of len `size`"""
  543. return "".join(
  544. [random.choice("abcdefghijklmnopqrstuvwxyz1234567890") for _ in range(size)]
  545. )
  546. def smart_split(
  547. text: str,
  548. entities: List[FormattingEntity],
  549. length: Optional[int] = 4096,
  550. split_on: Optional[ListLike] = ("\n", " "),
  551. min_length: Optional[int] = 1,
  552. ):
  553. """
  554. Split the message into smaller messages.
  555. A grapheme will never be broken. Entities will be displaced to match the right location. No inputs will be mutated.
  556. The end of each message except the last one is stripped of characters from [split_on]
  557. :param text: the plain text input
  558. :param entities: the entities
  559. :param length: the maximum length of a single message
  560. :param split_on: characters (or strings) which are preferred for a message break
  561. :param min_length: ignore any matches on [split_on] strings before this number of characters into each message
  562. :return:
  563. """
  564. # Authored by @bsolute
  565. # https://t.me/LonamiWebs/27777
  566. encoded = text.encode("utf-16le")
  567. pending_entities = entities
  568. text_offset = 0
  569. bytes_offset = 0
  570. text_length = len(text)
  571. bytes_length = len(encoded)
  572. while text_offset < text_length:
  573. if bytes_offset + length * 2 >= bytes_length:
  574. yield parser.unparse(
  575. text[text_offset:],
  576. list(sorted(pending_entities, key=lambda x: x.offset)),
  577. )
  578. break
  579. codepoint_count = len(
  580. encoded[bytes_offset : bytes_offset + length * 2].decode(
  581. "utf-16le",
  582. errors="ignore",
  583. )
  584. )
  585. for search in split_on:
  586. search_index = text.rfind(
  587. search,
  588. text_offset + min_length,
  589. text_offset + codepoint_count,
  590. )
  591. if search_index != -1:
  592. break
  593. else:
  594. search_index = text_offset + codepoint_count
  595. split_index = grapheme.safe_split_index(text, search_index)
  596. split_offset_utf16 = (
  597. len(text[text_offset:split_index].encode("utf-16le"))
  598. ) // 2
  599. exclude = 0
  600. while (
  601. split_index + exclude < text_length
  602. and text[split_index + exclude] in split_on
  603. ):
  604. exclude += 1
  605. current_entities = []
  606. entities = pending_entities.copy()
  607. pending_entities = []
  608. for entity in entities:
  609. if (
  610. entity.offset < split_offset_utf16
  611. and entity.offset + entity.length > split_offset_utf16 + exclude
  612. ):
  613. # spans boundary
  614. current_entities.append(
  615. _copy_tl(
  616. entity,
  617. length=split_offset_utf16 - entity.offset,
  618. )
  619. )
  620. pending_entities.append(
  621. _copy_tl(
  622. entity,
  623. offset=0,
  624. length=entity.offset
  625. + entity.length
  626. - split_offset_utf16
  627. - exclude,
  628. )
  629. )
  630. elif entity.offset < split_offset_utf16 < entity.offset + entity.length:
  631. # overlaps boundary
  632. current_entities.append(
  633. _copy_tl(
  634. entity,
  635. length=split_offset_utf16 - entity.offset,
  636. )
  637. )
  638. elif entity.offset < split_offset_utf16:
  639. # wholly left
  640. current_entities.append(entity)
  641. elif (
  642. entity.offset + entity.length
  643. > split_offset_utf16 + exclude
  644. > entity.offset
  645. ):
  646. # overlaps right boundary
  647. pending_entities.append(
  648. _copy_tl(
  649. entity,
  650. offset=0,
  651. length=entity.offset
  652. + entity.length
  653. - split_offset_utf16
  654. - exclude,
  655. )
  656. )
  657. elif entity.offset + entity.length > split_offset_utf16 + exclude:
  658. # wholly right
  659. pending_entities.append(
  660. _copy_tl(
  661. entity,
  662. offset=entity.offset - split_offset_utf16 - exclude,
  663. )
  664. )
  665. current_text = text[text_offset:split_index]
  666. yield parser.unparse(
  667. current_text,
  668. list(sorted(current_entities, key=lambda x: x.offset)),
  669. )
  670. text_offset = split_index + exclude
  671. bytes_offset += len(current_text.encode("utf-16le"))
  672. def _copy_tl(o, **kwargs):
  673. d = o.to_dict()
  674. del d["_"]
  675. d.update(kwargs)
  676. return o.__class__(**d)
  677. def check_url(url: str) -> bool:
  678. """Checks url for validity"""
  679. return bool(urlparse(url).netloc)
  680. def get_git_hash() -> Union[str, bool]:
  681. """Get current Hikka git hash"""
  682. try:
  683. repo = git.Repo()
  684. return repo.heads[0].commit.hexsha
  685. except Exception:
  686. return False
  687. def is_serializable(x: Any, /) -> bool:
  688. """Checks if object is JSON-serializable"""
  689. try:
  690. json.dumps(x)
  691. return True
  692. except Exception:
  693. return False
  694. def get_lang_flag(countrycode: str) -> str:
  695. """
  696. Gets an emoji of specified countrycode
  697. :param countrycode: 2-letter countrycode
  698. :returns: Emoji flag
  699. """
  700. if (
  701. len(
  702. code := [
  703. c
  704. for c in countrycode.lower()
  705. if c in string.ascii_letters + string.digits
  706. ]
  707. )
  708. == 2
  709. ):
  710. return "".join([chr(ord(c.upper()) + (ord("🇦") - ord("A"))) for c in code])
  711. return countrycode
  712. init_ts = time.perf_counter()