main.py

import ast
import os
import re
import sys
from enum import IntEnum, unique
from queue import Queue
from threading import Thread

import requests
from bs4 import BeautifulSoup as bs

# Author link example:
# https://store.line.me/stickershop/author/97829/en
# Pack link example:
# https://store.line.me/stickershop/product/1000509/en
# Sticker link example:
# https://stickershop.line-scdn.net/stickershop/v1/sticker/67058943/iPhone/sticker@2x.png
# https://stickershop.line-scdn.net/stickershop/v1/sticker/73694/android/sticker.png
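

# Link categories recognised by determine_type().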
@unique
class Enums(IntEnum):
    AUTHOR = 1
    PACK = 2
    ERROR = 3
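

# Shared module-level state: Q1 queues individual sticker downloads, Q2 queues
# pack pages to crawl (Q3 is created but never used), and the counters track
# how many packs were crawled and how many files were saved.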
DIR_STICKERS: str = "stickers"
Q1: Queue = Queue(maxsize=0)
Q2: Queue = Queue(maxsize=0)
Q3: Queue = Queue(maxsize=0)
NUM_THREADS: int = 8
DOMAIN: str = "https://store.line.me"
sticker_count: int = 0
set_count: int = 0
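

# Entry point: given an author or pack URL, crawl it and download every sticker
# image it references into DIR_STICKERS/<pack name>/.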
def rip_line_stickers(argument: str):
    session = requests.Session()
    template_static = "https://stickershop.line-scdn.net/stickershop/v1/sticker/{}/{}/{}.{}"
    pattern_link_sticker = re.compile(
        r"https://stickershop.line-scdn.net/stickershop/v1/sticker/(?P<id>\d+)/(?P<platform>.+)/(?P<filename_with_ext>(?P<filename>.+)\.(?P<ext>.+))")
    pattern_a_next = re.compile(r"\?page=\d+")
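
    # Classify a store.line.me link as an author page or a single sticker pack.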
    def determine_type(link: str):
        if "/author/" in link:
            return Enums.AUTHOR
        elif "/product/" in link:
            return Enums.PACK
        else:
            return Enums.ERROR
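
    # Fetch a pack page and queue every sticker it contains on Q1.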
    def get_pack(link: str):
        result = session.get(link)
        result.raise_for_status()
        soup = bs(result.content, "lxml")
        tag_p = soup.find_all("p")
        name_pack = None
        for p in tag_p:
            if p.has_attr("data-test") and p["data-test"] == "sticker-name-title":
                name_pack = p.string
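
        # Each <li data-preview="..."> attribute holds a dict-like string with the
        # sticker type and URLs; static Android URLs are rewritten to the
        # higher-resolution iPhone "@2x" variant before queueing.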
        tag_li = soup.find_all("li")
        for li in tag_li:
            if li.has_attr("data-preview"):
                data_preview = ast.literal_eval(li["data-preview"])
                if data_preview["type"] == "static":
                    url = data_preview["staticUrl"]
                    if "/sticonshop/" not in url:
                        match = pattern_link_sticker.search(url)
                        try:
                            if match.group("platform") == "android":
                                url = template_static.format(match.group("id"), "iPhone",
                                                             match.group("filename") + "@2x",
                                                             match.group("ext"))
                        except Exception as e:
                            print("Exception while processing {}".format(name_pack))
                            print(e)
                            print(tag_li)
                elif data_preview["type"] == "animation":
                    url = data_preview["animationUrl"]
                    match = pattern_link_sticker.search(url)
                else:
                    print("Encountered unknown type: {}".format(data_preview["type"]))
                    exit(2)
                if "/sticonshop/" not in url:
                    Q1.put(
                        {
                            "name_pack": name_pack,
                            "filename": match.group("id"),
                            "ext": match.group("ext"),
                            "link": url
                        }
                    )
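
    # Fetch an author page and queue each listed pack's URL on Q2.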
    def get_author(link: str):
        result = session.get(link)
        result.raise_for_status()
        soup = bs(result.content, "lxml")
        tag_li = soup.find_all("li")
        for li in tag_li:
            if li.has_attr("data-test"):
                data_test = li["data-test"]
                if data_test == "author-item":
                    Q2.put(
                        {
                            "link": "{}{}".format(DOMAIN, li.a.get("href"))
                        }
                    )
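
        # Follow the "Next" pagination link, if present, and crawl that page too.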
        tag_a = soup.find_all("a")
        for a in tag_a:
            if a.has_attr("href") and a.text == "Next":
                if pattern_a_next.search(a.get("href")):
                    link = link.split("?")[0]
                    link = "{}{}".format(link, a.get("href"))
                    get_author(link)
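
    # Worker: take sticker jobs off Q1 and save each file under
    # DIR_STICKERS/<pack name>/<id>.<ext>.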
    def threaded_scrape(q: Queue):
        while True:
            things = q.get()
            name_pack = things["name_pack"]
            filename = things["filename"]
            ext = things["ext"]
            link = things["link"]
            path = "{}/{}/{}.{}".format(DIR_STICKERS, name_pack, filename, ext)
            if not os.path.exists("{}/{}".format(DIR_STICKERS, name_pack)):
                os.makedirs("{}/{}".format(DIR_STICKERS, name_pack), exist_ok=True)
            with open(path, "wb") as f:
                result = session.get(link)
                result.raise_for_status()
                f.write(result.content)
            global sticker_count
            sticker_count += 1
            q.task_done()
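
    # Worker: take pack links off Q2 and scrape each pack with get_pack().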
    def threaded_crawl(q: Queue):
        while True:
            things = q.get()
            link = things["link"]
            get_pack(link)
            global set_count
            set_count += 1
            q.task_done()
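
    # Seed the queues: an author link queues its packs on Q2, a pack link is
    # scraped directly.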
    link_type = determine_type(argument)
    if link_type == Enums.AUTHOR:
        get_author(argument)
    elif link_type == Enums.PACK:
        get_pack(argument)
    else:
        print("Could not determine link type!")
        exit(1)
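
    # Start daemon workers: one pool crawls pack pages (Q2), the other
    # downloads sticker files (Q1).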
    for i in range(NUM_THREADS):
        worker = Thread(target=threaded_crawl, args=(Q2,))
        worker.daemon = True
        worker.start()

    for i in range(NUM_THREADS):
        worker = Thread(target=threaded_scrape, args=(Q1,))
        worker.daemon = True
        worker.start()
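
    # Wait for all packs to be crawled, then for all downloads to finish.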
    Q2.join()
    print("Queued", set_count, "sets!")
    Q1.join()
    print("Downloaded", sticker_count, "files!")
if __name__ == '__main__':
    if len(sys.argv) <= 1:
        print("Pass a link as an argument.")
    else:
        rip_line_stickers(sys.argv[1])