|
- #!/usr/bin/env python3
- """
- Functional description
- ----------------------
- chunks
- zstd-compressed sections of files for patching
- manifest
- Provides information for all chunks
- name (= URL), patch offset and checksum
- getBuild
- JSON file that provides information about the available game and voiceover pack files
- Provides manifests and the base URL to download chunks from
- TODO
- ----
- High priority
- New game installation
- Which URL?
- Where is 'deletefiles.txt' ?
- voiceover-packs
- Medium priority
- Make chunk names identical to the official launcher
- Low priority
- Parallelization for file downloads and patching
- """
- from __future__ import annotations
- import argparse
- import hashlib # md5
- import json
- import pathlib
- import re # Regular Expressions
- import shutil # rmtree
- import time
- import urllib.request as request # downloads
- from typing import TYPE_CHECKING
- import zstandard # archive unpacking
- from google.protobuf.json_format import MessageToJson
- import manifest_pb2 # generated
- if TYPE_CHECKING:
- from os import PathLike
# Directory that contains this script; parse_args() places the default
# temporary directory ("tmp") below it.
_THIS_FILE = pathlib.Path(__file__).resolve()
SCRIPTDIR = _THIS_FILE.parent

# Parsed command line options. Only declared here; main() assigns it before
# any other code reads it.
OPT: "Options"
- # ------------------- CLI interface
class Options(argparse.Namespace):
    """
    Typed namespace for the parsed command line
    An instance is passed to the argument parser in parse_args(), which
    stores the parsed values on it.
    """

    # Paths: no class-level default, the argument parser sets them
    gamedir: pathlib.Path  # positional argument "gamedir"
    tempdir: pathlib.Path  # --tempdir

    # Switches: off unless the matching command line flag is given
    do_install: bool = False  # --install
    do_update: bool = False  # --update
    predownload: bool = False  # --predownload
    list_zips: bool = False  # --list-zips
    force_use_cache: bool = False  # --force-use-cache
# This script currently does not write anything to the game installation directory
def parse_args() -> Options:
    """
    Builds the command line parser and parses the process arguments
    Returns: Options instance filled with the parsed values
    """
    parser = argparse.ArgumentParser(description="Game install and update client (WIP)")

    # Paths
    parser.add_argument("gamedir", type=pathlib.Path)
    parser.add_argument("--tempdir", type=pathlib.Path, default=SCRIPTDIR / "tmp")

    # On/off switches: (command line flag, attribute name on Options)
    switches = (
        ("--force-use-cache", "force_use_cache"),
        ("--predownload", "predownload"),
        ("--install", "do_install"),
        ("--update", "do_update"),
        ("--list-zips", "list_zips"),
    )
    for flag, attribute in switches:
        parser.add_argument(flag, dest=attribute, action="store_true")

    return parser.parse_args(namespace=Options())
def _print_package_section(comment, section):
    """
    Prints the archive URLs of one entry of the game package listing
    comment: label for the kind of entry ("new install" / "update")
    section: dict with the keys "version", "game_pkgs" and "audio_pkgs"
    """
    print(f"Main game: {comment} for {section['version']}")
    for pkg in section["game_pkgs"]:
        print("\t " + pkg["url"])
    print("Voiceover packs:")
    for pkg in section["audio_pkgs"]:
        print(f"\t {pkg['language']} : " + pkg["url"])
    print("")


def main():
    """
    Script entry point
    Parses the command line into the global OPT, initializes the client and
    either lists the downloadable zip archives (--list-zips) or stops because
    chunk-based patching is still in development.
    Raises: SystemExit(0) after listing the archives, AssertionError otherwise
    """
    global OPT
    OPT = parse_args()

    cli = SophonClient()
    cli.initialize()
    cli.retrieve_API_keys()

    if OPT.list_zips:
        assert cli.packages_json is not None, "Not available."

        print("List of available archives")
        print("-" * 26)
        packages = cli.packages_json
        _print_package_section("new install", packages["major"])
        for patch in packages["patches"]:
            _print_package_section("update", patch)
        # `exit()` is only injected by the `site` module; SystemExit always works
        raise SystemExit(0)

    # Raised explicitly: a plain `assert False` is removed by `python -O`,
    # which would let the unfinished code below run.
    raise AssertionError("In development.")

    # Work in progress, unreachable until the guard above is removed
    #cli.cleanup_temp()
    cli.load_manifest("game")
    v = cli.find_chunks_by_file_name("pkg_version")
    if v:
        cli.download_and_patch_file(v)
- # ------------------- Utilities
def tempdir(*args: str | PathLike[str]) -> pathlib.Path:
    """
    Builds a path below the temporary directory (OPT.tempdir)
    args: path components, appended in the given order
    Returns: the joined path
    """
    base = OPT.tempdir
    return base.joinpath(*args)
def gamedir(*args: str | PathLike[str]) -> pathlib.Path:
    """
    Builds a path below the game installation directory (OPT.gamedir)
    args: path components, appended in the given order
    Returns: the joined path
    """
    base = OPT.gamedir
    return base.joinpath(*args)
def debuglog(*values):
    """
    Prints the given values to stdout, prefixed with the DEBUG tag
    """
    tag = "DEBUG "
    print(tag, *values)
def infolog(*values):
    """
    Prints the given values to stdout, prefixed with the INFO tag
    """
    tag = "INFO "
    print(tag, *values)
def warnlog(*values):
    """
    Prints the given values to stdout, prefixed with the WARN tag
    """
    tag = "WARN "
    print(tag, *values)
- # -------------------
- class SophonClient:
- installed_ver: None # "major.minor.patch" or "new" for new installations
- rel_type: str | None = None # os / cn
- gamedatadir: pathlib.Path | None = None # absolute path to *_Data
- branch: str # main / pre_download
- branches_json = None # package_id, password, tag
- packages_json = None # zip archive listing
- getBuild_json = None # json object of the entire getBuild json
- manifest: manifest_pb2.Manifest | None = None # manifest_pb2
- category_json: None # json object of currently selected category
- def initialize(self):
- if OPT.do_install:
- OPT.gamedir.mkdir(exist_ok=True)
- # must be empty (allow config.ini)
- assert len(list(OPT.gamedir.glob("*"))) < 2, "The specified path is not empty"
- elif not OPT.gamedir.is_dir():
- print("Script usage: python3 updater.py /path/to/game/dir")
- exit(1)
- self.branch = "pre_download" if OPT.predownload else "main"
- infolog(f"Selected branch '{self.branch}'")
- OPT.tempdir.mkdir(exist_ok=True)
- # Autodetection
- if OPT.do_install:
- self._initialize_install()
- else:
- self._initialize_update()
- def _initialize_install(self):
- assert False, "TODO"
- # Prompt to ask for the desired game version
- pass
- def _initialize_update(self):
- """
- Find out what kind of installation we need to update
- """
- # Absolute path to the game data directory
- self.gamedatadir = next(OPT.gamedir.glob("*_Data"), None)
- assert self.gamedatadir, "Cannot determine game data dir"
- if gamedir("GenshinImpact.exe").is_file():
- self.rel_type = "os"
- elif gamedir("YuanShen.exe").is_file():
- if self.gamedatadir.joinpath("Plugins", "PCGameSDK.dll").is_file():
- self.rel_type = "bb"
- else:
- self.rel_type = "cn"
- assert isinstance(self.rel_type, str), "Failed to detect release type"
- infolog(f"Release type: {self.rel_type}")
- # Retrieve the installed game version
- if True:
- fullname = gamedir(self.gamedatadir, "globalgamemanagers")
- assert fullname.is_file(), "Game install is incomplete!"
- contents = fullname.read_bytes()
- ver = re.findall(br"\0(\d+\.\d+\.\d+)_\d+_\d+\0", contents)
- assert len(ver) == 1, "Broken script or corrupted game installation"
- self.installed_ver = ver[0].decode("utf-8")
- infolog(f"Installed game version: {self.installed_ver}")
- # Compare game version with what's contained in "config.ini"
- self.check_config_ini()
- def check_config_ini(self):
- """
- Internal function
- """
- fullname = gamedir("config.ini")
- if not fullname.is_file():
- return
- contents = fullname.read_text()
- ver = re.findall(r"game_version=(\d+\.\d+\.\d+)", contents)
- if len(ver) != 1:
- warnlog("config.ini is incomplete or corrupt")
- return
- infolog(f"config.ini: Game version {ver[0]}")
- if ver[0] != self.installed_ver:
- warnlog("config.ini and the actual installed game version differ!")
- def cleanup_temp(self):
- # DANGER
- assert False
- shutil.rmtree(OPT.tempdir)
- def load_or_download_file(self, fname, url):
- """
- fname: file name without path prefix
- url: str or function ptr to retrieve the URL
- Returns: File handle
- """
- fullname = tempdir(fname)
- do_download = True
- if fullname.is_file():
- # keep cached for 24 hours
- do_download = time.time() - fullname.stat().st_mtime > (24 * 3600)
- if OPT.force_use_cache:
- do_download = False
- if do_download:
- # Check whether the file is still up-to-date
- if callable(url):
- url = url()
- OPT.tempdir.mkdir(exist_ok=True)
- request.urlretrieve(url, fullname)
- debuglog(f"Got file '{fname}' (new)")
- else:
- debuglog(f"Got file '{fname}' (cached)")
- return fullname
- def load_or_download_json(self, fname, url):
- path = self.load_or_download_file(fname, url)
- with path.open("rb") as fh:
- js = json.load(fh)
- ret = js["retcode"]
- assert ret == 0, (f"Failed to retrieve '{fname}': " +
- f"server returned status code {ret} ({js['message']})")
- return js["data"]
- def retrieve_API_keys(self):
- """
- Retrieves passkeys for authentication to download URLs
- Depends on "initialize_*".
- """
- assert isinstance(self.rel_type, str), "Missing initialize"
- base_url = None
- tail = None
- if self.rel_type == "os":
- # Up-to-date as of 2024-06-15 (4.7.0)
- game_ids = "gopR6Cufr3"
- launcher_id = "VYTpXlbWo8"
- tail = f"game_ids[]={game_ids}&launcher_id={launcher_id}"
- base_url = "https://sg-hyp-api.hoy" + "overse.com/hyp/hyp-connect/api"
- assert self.rel_type == "os", "CN/BB yet not implemented" # TODO
- if not self.branches_json:
- # MANDATORY. JSON with package_id, password and tag(s)
- js = self.load_or_download_json("getGameBranches.json", f"{base_url}/getGameBranches?{tail}")
- # Array length corresponds to the amount of "game_ids" requested.
- self.branches_json = js["game_branches"][0][self.branch]
- assert self.branches_json is not None, "Cannot find API keys for the selected branch."
- ver = self.branches_json["tag"]
- infolog(f"Sophon provides game version {ver}")
- if False: # TODO
- # JSON with game paths for voiceover packs, logs, screenshots
- self.load_or_download_file("getGameConfigs.json", f"{base_url}/getGameConfigs?{tail}")
- if False: # TODO
- # JSON with SDK files (BiliBili ?)
- channel = 1
- sub_channel = 0
- self.load_or_download_file("getGameChannelSDKs.json",
- f"{base_url}/getGameChannelSDKs?channel={channel}&{tail}&sub_channel={sub_channel}")
- if OPT.do_update or OPT.do_install:
- # zip downloads (successor of the /resource JSON file)
- js = self.load_or_download_json("getGamePackages.json", f"{base_url}/getGamePackages?{tail}")
- self.packages_json = js["game_packages"][0][self.branch]
- if self.packages_json is None:
- infolog("No game packages (zip) available for the selected branch.")
- else:
- count = 1
- if isinstance(self.packages_json, list):
- count = len(self.packages_json)
- infolog(f"Available game packages (zip): {count}")
- def make_getBuild_url(self):
- """
- Compose the URL for the main JSON file for chunk-based downloads (sophon)
- Returns: URL
- """
- if not self.branches_json:
- self.retrieve_API_keys()
- url = None
- if self.rel_type == "os":
- url = "sg-public-api.ho" + "yoverse.com"
- elif self.rel_type == "cn":
- url = "api-takumi.mih" + "oyo.com"
- assert not (url is None), f"Unhandled release type {self.rel_type}"
- url = (
- "https://" + url + "/downloader/sophon_chunk/api/getBuild"
- + "?branch=" + self.branches_json["branch"]
- + "&package_id=" + self.branches_json["package_id"]
- + "&password=" + self.branches_json["password"]
- )
- infolog("Created getBuild JSON URL")
- return url
- def load_getBuild_json(self):
- """
- Loads the main JSON for manifest and chunk information
- """
- path = self.load_or_download_file("getBuild.json", self.make_getBuild_url)
- with path.open("rb") as fh:
- self.getBuild_json = json.load(fh)
- infolog("Loaded getBuild JSON")
- def load_manifest(self, cat_name):
- """
- Loads the specified manifest protobuf
- cat_name: "game", "en-us", "zh-cn", "ja-jp", "ko-kr"
- """
- if not self.getBuild_json:
- self.load_getBuild_json()
- jd = self.getBuild_json["data"]
- infolog(f"Server provides game version {jd['tag']}")
- # Find matching category
- category = None
- for jdm in jd["manifests"]:
- if jdm["matching_field"] == cat_name:
- category = jdm
- break
- assert not (category is None), f"Cannot find the specified field '{cat_name}'"
- infolog(f"Found category {cat_name}")
- self.category_json = category
- # Download and decompress manifest protobuf
- fname_raw = category["manifest"]["id"]
- url = category["manifest_download"]["url_prefix"] + "/" + category["manifest"]["id"]
- zstd_path = self.load_or_download_file(fname_raw + ".zstd", url)
- with zstd_path.open('br') as zfh:
- reader = zstandard.ZstdDecompressor().stream_reader(zfh)
- pb = manifest_pb2.Manifest()
- pb.ParseFromString(reader.read())
- nfiles = len(pb.files)
- infolog(f"Decompressed manifest protobuf ({nfiles} files)")
- json_fname = tempdir(fname_raw + ".json")
- if not json_fname.is_file():
- with json_fname.open("w+") as jfh:
- json.dump(json.loads(MessageToJson(pb)), jfh)
- infolog("Exported protobuf to JSON file")
- self.manifest = pb
- def find_chunks_by_file_name(self, file_name):
- """
- Searches a specific file name in the manifest
- Returns: FileInfo or None
- """
- assert isinstance(file_name, str)
- for v in self.manifest.files:
- if v.filename == file_name:
- return v
- warnlog(f"Cannot find chunks for file: {file_name}")
- return None
- def download_and_patch_file(self, file_info):
- """
- Downloads the chunks and patches a file
- file_info: FileInfo, one of the manifest.files[] objects
- """
- if file_info.flags == 64:
- # Created as soon a file is put inside
- infolog(f"Skipping directory entry: {file_info.filename}")
- return
- assert (file_info.flags == 0), f"Unknown flags {file_info.flags} for '{file_info.filename}'"
- chunk_url_prefix = self.category_json["chunk_download"]["url_prefix"]
- infolog(
- "Chunk downloader:\n"
- f"\t File name: {file_info.filename}\n"
- f"\t Chunk count: {len(file_info.chunks)}\n"
- f"\t File size: {int(file_info.size / (1024 * 1024 / 10) + 0.5) / 10} MiB"
- )
- # Disallow writing to unpredictable locations
- assert (".." not in str(file_info.filename)), "Security alert"
- assert (str(file_info.filename)[0] != '/'), "Security alert"
- # Create directory structure and patch file into the temporary directory
- parent_dir = pathlib.Path(file_info.filename).parent
- tempdir(parent_dir).mkdir(parents=True, exist_ok=True)
- tmp_file = tempdir(file_info.filename)
- shutil.copyfile(gamedir(file_info.filename), tmp_file)
- with tmp_file.open("wb") as fh: # no truncate!
- # Download all chunks
- n = 0
- for chunk in file_info.chunks:
- n += 1
- cfname = tempdir(chunk.chunk_id)
- try:
- if cfname.stat().st_size == chunk.compressed_size:
- continue # TODO: do a proper hash check
- except FileNotFoundError:
- pass # Already downloaded
- request.urlretrieve(chunk_url_prefix + "/" + chunk.chunk_id, cfname)
- print(f"\t * Downloaded chunk {n} / {len(file_info.chunks)}")
- # Apply all chunks
- n = 0
- for chunk in file_info.chunks:
- n += 1
- cfname = tempdir(chunk.chunk_id)
- # Attempt to patch
- with cfname.open("rb") as zfh:
- reader = zstandard.ZstdDecompressor().stream_reader(zfh)
- fh.seek(chunk.offset)
- fh.write(reader.read())
- print(f"\t * Patched {n} / {len(file_info.chunks)}")
- # Verify file integrity
- md5 = hashlib.md5(tmp_file.read_bytes()).hexdigest()
- if file_info.md5 == md5:
- print("\t * Patching succeeded. md5 matches.")
- else:
- print(f"\t * Hash mismatch after patching. File is corrupt: {file_info.filename}")
- print("")
# Run the client only when this file is executed as a script, not on import
if __name__ == "__main__":
    main()
|