12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095 |
- #! /usr/bin/env python
- # -*- coding: utf-8 -*-
- # SPDX-FileCopyrightText: Copyright (C) 2021-2023 MH3SP Server Project
- # SPDX-License-Identifier: AGPL-3.0-or-later
- """Monster Hunter PAT item module."""
- import struct
- from collections import OrderedDict
- from mh.constants import pad
- from other.utils import to_bytearray, get_config, get_ip, GenericUnpacker
- from mh.database import Server, Gate, City
class ItemType:
    """Type tags written in front of every PAT item payload."""
    # The tags are consecutive integers starting at zero.
    Custom, Byte, Word, Long, LongLong, String, Binary, Object = range(8)
def lp_string(s):
    """Encode `s` prefixed by its length stored on one byte."""
    payload = to_bytearray(s)
    prefix = struct.pack(">B", len(payload))
    return prefix + payload
def unpack_lp_string(data, offset=0):
    """Return the payload of a 1-byte length-prefixed string at `offset`."""
    length = struct.unpack_from(">B", data, offset)[0]
    start = offset + 1
    return data[start:start + length]
def lp2_string(s):
    """Encode `s` prefixed by its length stored on two big-endian bytes."""
    payload = to_bytearray(s)
    prefix = struct.pack(">H", len(payload))
    return prefix + payload
def unpack_lp2_string(data, offset=0):
    """Return the payload of a 2-byte length-prefixed string at `offset`."""
    length = struct.unpack_from(">H", data, offset)[0]
    start = offset + 2
    return data[start:start + length]
class Item(bytes):
    """Base class of all packed PAT items (a plain `bytes` subclass)."""
def pack_byte(b):
    """Serialize `b` as a PAT byte item: type tag, then one byte."""
    tag = ItemType.Byte
    return struct.pack(">BB", tag, b)
def unpack_byte(data, offset=0):
    """Read a PAT byte item at `offset` and return its value.

    Raises AssertionError when the type tag is not ItemType.Byte.
    """
    tag, payload = struct.unpack_from(">BB", data, offset)
    if tag == ItemType.Byte:
        return payload
    raise AssertionError("Invalid type for byte item: {}".format(tag))
def pack_optional_fields(info):
    """Serialize optional fields given as (field_id, value) pairs.

    Optional fields are retrieved after the following function calls:
     - getLayerData
     - getLayerUserInfo
     - getCircleInfo
     - getUserSearchInfo

    Layout: a one-byte field count, then for each field
     - field_id, a byte used as an identifier
     - has_value, a byte telling if a value follows
     - value, a 32-bit big-endian integer (only when has_value is set)

    A value of None is encoded as "no value".
    """
    chunks = [struct.pack(">B", len(info))]
    for field_id, value in info:
        if value is None:
            chunks.append(struct.pack(">BB", field_id, 0))
        else:
            chunks.append(struct.pack(">BBI", field_id, 1, value))
    return b"".join(chunks)
def unpack_optional_fields(data, offset=0):
    """Parse optional fields into a list of (field_id, value) pairs.

    Optional fields are stored after the following function calls:
     - putLayerSet
     - putCircleInfo
    and is also used in PatInterface::sendReqUserSearchSet.

    It seems related to the city info. If correct:
     - field_id 0x01: ???
     - field_id 0x02: City's HR limit (if not applicable, 0xffffffff is set)
     - field_id 0x03: City's goal (if not applicable, 0xffffffff is set)
     - field_id 0x04: City's seeking

    A field without a value is returned with None as its value.
    """
    count = struct.unpack_from(">B", data, offset)[0]
    cursor = offset + 1
    fields = []
    for _ in range(count):
        field_id, has_value = struct.unpack_from(">BB", data, cursor)
        cursor += 2
        value = None
        if has_value:
            value = struct.unpack_from(">I", data, cursor)[0]
            cursor += 4
        fields.append((field_id, value))
    return fields
def pack_detailed_optional_fields(info):
    """Serialize detailed optional fields.

    `info` is a mapping with a "circle" integer and a "data" list of
    (field_id, field_type, value) triples. The output starts with two
    32-bit big-endian integers (circle, field count); each field is then
    written as field_id, has_value and field_type bytes, followed by a
    32-bit value only when the value is not None.
    """
    circle = info["circle"]
    fields = info["data"]
    chunks = [struct.pack(">II", circle, len(fields))]
    for field_id, field_type, value in fields:
        if value is None:
            chunks.append(struct.pack(">BBB", field_id, 0, field_type))
        else:
            chunks.append(
                struct.pack(">BBBI", field_id, 1, field_type, value)
            )
    return b"".join(chunks)
def unpack_detailed_optional_fields(data, offset=0):
    """Parse detailed optional fields.

    These fields are used by the following packets:
     - ReqLayerUserSearchHead
     - ReqLayerDetailSearchHead
     - ReqCircleSearchHead
     - ReqUserSearchHead

    Structure format:
     - unk_circle, set to 0 unless from sendReqCircleSearchHead
     - field_count, number of fields
     - array of fields

    Field structure:
     - field_id, a byte used as an identifier
     - has_value, a byte telling if it has a value
     - field_type?, an unknown byte which often holds the value 0x05
     - value, an integer with the field's value (only set if has_value is
       true)

    Returns a dict with the "circle" value and a "data" list of
    (field_id, field_type, value) triples; value is None when absent.
    """
    unk_circle, count = struct.unpack_from(">II", data, offset)
    cursor = offset + 8
    fields = []
    for _ in range(count):
        field_id, has_value, field_type = struct.unpack_from(
            ">BBB", data, cursor
        )
        cursor += 3
        value = None
        if has_value:
            value = struct.unpack_from(">I", data, cursor)[0]
            cursor += 4
        fields.append((field_id, field_type, value))
    return {"circle": unk_circle, "data": fields}
class Byte(Item):
    """PAT item holding a single byte value."""

    def __new__(cls, b):
        packed = pack_byte(b)
        return Item.__new__(cls, packed)

    def __repr__(self):
        value = unpack_byte(self)
        return "Byte({!r})".format(value)
class ByteDec(Byte):
    """PAT item byte that will be decremented."""
def pack_word(w):
    """Serialize `w` as a PAT word item: type tag, then 16 bits."""
    tag = ItemType.Word
    return struct.pack(">BH", tag, w)
def unpack_word(data, offset=0):
    """Read a PAT word item at `offset` and return its value.

    Raises AssertionError when the type tag is not ItemType.Word.
    """
    tag, payload = struct.unpack_from(">BH", data, offset)
    if tag == ItemType.Word:
        return payload
    raise AssertionError("Invalid type for word item: {}".format(tag))
class Word(Item):
    """PAT item holding a 16-bit value."""

    def __new__(cls, w):
        packed = pack_word(w)
        return Item.__new__(cls, packed)

    def __repr__(self):
        value = unpack_word(self)
        return "Word({!r})".format(value)
class WordDec(Word):
    """PAT item word that will be decremented."""
def pack_long(lg):
    """Serialize `lg` as a PAT long item: type tag, then 32 bits."""
    tag = ItemType.Long
    return struct.pack(">BI", tag, lg)
def unpack_long(data, offset=0):
    """Read a PAT long item at `offset` and return its value.

    Raises AssertionError when the type tag is not ItemType.Long.
    """
    tag, payload = struct.unpack_from(">BI", data, offset)
    if tag == ItemType.Long:
        return payload
    raise AssertionError("Invalid type for long item: {}".format(tag))
class Long(Item):
    """PAT item holding a 32-bit value."""

    def __new__(cls, lg):
        packed = pack_long(lg)
        return Item.__new__(cls, packed)

    def __repr__(self):
        value = unpack_long(self)
        return "Long({!r})".format(value)
def pack_longlong(q):
    """Serialize `q` as a PAT long long item: type tag, then 64 bits."""
    tag = ItemType.LongLong
    return struct.pack(">BQ", tag, q)
def unpack_longlong(data, offset=0):
    """Read a PAT long long item at `offset` and return its value.

    Raises AssertionError when the type tag is not ItemType.LongLong.
    """
    tag, payload = struct.unpack_from(">BQ", data, offset)
    if tag == ItemType.LongLong:
        return payload
    raise AssertionError(
        "Invalid type for long long item: {}".format(tag)
    )
class LongLong(Item):
    """PAT item holding a 64-bit value."""

    def __new__(cls, q):
        packed = pack_longlong(q)
        return Item.__new__(cls, packed)

    def __repr__(self):
        value = unpack_longlong(self)
        return "LongLong({!r})".format(value)
def pack_string(s):
    """Serialize `s` as a PAT string item.

    The item is the String type tag followed by the 2-byte
    length-prefixed payload produced by lp2_string.
    """
    tag = struct.pack(">B", ItemType.String)
    return tag + lp2_string(s)
def unpack_string(data, offset=0):
    """Read a PAT string item at `offset` and return its raw payload.

    Raises AssertionError when the type tag is not ItemType.String.
    """
    tag, size = struct.unpack_from(">BH", data, offset)
    if tag != ItemType.String:
        raise AssertionError("Invalid type for string item: {}".format(tag))
    # Skip the 1-byte tag and the 2-byte length.
    start = offset + 3
    return data[start:start + size]
class String(Item):
    """PAT item holding a length-prefixed string."""

    def __new__(cls, s):
        packed = pack_string(s)
        return Item.__new__(cls, packed)

    def __repr__(self):
        value = unpack_string(self)
        return "String({!r})".format(value)
def pack_binary(s):
    """Serialize `s` as a PAT binary item: tag, 16-bit length, payload."""
    payload = to_bytearray(s)
    header = struct.pack(">BH", ItemType.Binary, len(payload))
    return header + payload
def unpack_binary(data, offset=0):
    """Read a PAT binary item at `offset` and return its raw payload.

    Raises AssertionError when the type tag is not ItemType.Binary.
    """
    tag, size = struct.unpack_from(">BH", data, offset)
    if tag != ItemType.Binary:
        raise AssertionError("Invalid type for binary item: {}".format(tag))
    # Skip the 1-byte tag and the 2-byte length.
    start = offset + 3
    return data[start:start + size]
class Binary(Item):
    """PAT item holding a length-prefixed binary blob."""

    def __new__(cls, b):
        return Item.__new__(cls, pack_binary(b))

    def __repr__(self):
        # `{!r}` already applies repr(); wrapping the payload in repr()
        # as well produced a doubly quoted value, unlike String.__repr__.
        return "Binary({!r})".format(unpack_binary(self))
def unpack_any(data, offset=0):
    """Read the PAT item at `offset`, whatever its type.

    Returns an (item_type, value) pair. The type tag is read from the
    first byte and the matching unpack function decodes the value.
    A tag without an unpack function raises KeyError.
    """
    item_type = struct.unpack_from(">B", data, offset)[0]
    unpackers = {
        ItemType.Byte: unpack_byte,
        ItemType.Word: unpack_word,
        ItemType.Long: unpack_long,
        ItemType.LongLong: unpack_longlong,
        ItemType.String: unpack_string,
        ItemType.Binary: unpack_binary,
    }
    unpack = unpackers[item_type]
    return item_type, unpack(data, offset)
class Custom(Item):
    """PAT custom item: a one-byte type tag followed by raw bytes."""

    def __new__(cls, b, item_type=b'\0'):
        return Item.__new__(cls, item_type + b)

    def __repr__(self):
        # `{!r}` already applies repr(); the extra repr() call produced
        # a doubly quoted payload. The leading type tag is left out.
        return "Custom({!r})".format(self[1:])
class FallthroughBug(Custom):
    """Workaround a fallthrough bug.

    After reading LayerData's field_0x17 it falls through a getItemAny
    call. It clobbers the next field by reading the field_id. Then, the
    game tries to read the next field which was clobbered.

    The workaround uses a dummy field_0xff which will be clobbered on
    purpose.
    """

    def __new__(cls):
        filler = b"\xff"
        return Custom.__new__(cls, filler, filler)
def pack_bytes(*args):
    """Serialize the given byte values as a one-byte count plus the bytes."""
    count = struct.pack(">B", len(args))
    return count + bytearray(args)
def unpack_bytes(data, offset=0):
    """Read a counted list of bytes at `offset` and return it as a tuple."""
    count = struct.unpack_from(">B", data, offset)[0]
    layout = ">" + "B" * count
    return struct.unpack_from(layout, data, offset + 1)
class PatData(OrderedDict):
    """Ordered mapping of field ids (0-255) to PAT items.

    Subclasses list their named fields in FIELDS as (field_id, name)
    pairs; a named field can then be read, set and deleted as an
    attribute in addition to being indexed by its id.
    """
    FIELDS = (
        (1, "field_0x01"),
        (2, "field_0x02"),
        (3, "field_0x03"),
        (4, "field_0x04"),
    )

    def _find_field_id(self, name):
        """Return the id of the first field called `name`, else None."""
        for field_id, field_name in self.FIELDS:
            if field_name == name:
                return field_id
        return None

    def __len__(self):
        """Return the size in bytes of the packed structure."""
        return len(self.pack())

    def __repr__(self):
        described = [
            "{}={}".format(self.field_name(field_id), repr(item))
            for field_id, item in self.items()
            if item is not None
        ]
        return "{}({})".format(type(self).__name__, ", ".join(described))

    def __getattr__(self, name):
        """Resolve a named field; only called for missing attributes."""
        field_id = self._find_field_id(name)
        if field_id is None:
            raise AttributeError("Unknown field: {}".format(name))
        if field_id in self:
            return self[field_id]
        raise AttributeError("{} not set".format(name))

    def __setattr__(self, name, value):
        """Store a named field, or a private (underscore) attribute."""
        field_id = self._find_field_id(name)
        if field_id is not None:
            if isinstance(value, Item):
                self[field_id] = value
                return
            raise ValueError("{!r} not a valid PAT item".format(value))
        if not name.startswith("_"):
            raise AttributeError(
                "Cannot set unknown field: {}".format(name)
            )
        return OrderedDict.__setattr__(self, name, value)

    def __delattr__(self, name):
        field_id = self._find_field_id(name)
        if field_id is None:
            return OrderedDict.__delattr__(self, name)
        del self[field_id]

    def __setitem__(self, key, value):
        if not (isinstance(key, int) and 0 <= key <= 255):
            raise IndexError("index must be a valid numeric value")
        if not isinstance(value, Item):
            raise ValueError("{!r} not a valid PAT item".format(value))
        return OrderedDict.__setitem__(self, key, value)

    def __contains__(self, key):
        """Membership test accepting a field name or a field id."""
        if isinstance(key, str):
            key = self._find_field_id(key)
            if key is None:
                return False
        if not isinstance(key, int):
            raise IndexError("key must be a valid field's name/index")
        return OrderedDict.__contains__(self, key)

    def field_name(self, index):
        """Return the declared name of a field id, or a generic one."""
        for field_id, field_name in self.FIELDS:
            if field_id == index:
                return field_name
        return "field_0x{:02x}".format(index)

    def pack(self):
        """Pack PAT items: a one-byte count, then (field id, item) pairs."""
        entries = [
            struct.pack(">B", field_id) + item
            for field_id, item in self.items()
            if item is not None
        ]
        return struct.pack(">B", len(entries)) + b"".join(entries)

    def pack_fields(self, fields):
        """Pack only the PAT items whose field id is in `fields`."""
        entries = [
            struct.pack(">B", field_id) + item
            for field_id, item in self.items()
            if item is not None and field_id in fields
        ]
        return struct.pack(">B", len(entries)) + b"".join(entries)

    @classmethod
    def unpack(cls, data, offset=0):
        """Build an instance from packed PAT items found at `offset`."""
        # Item class and payload size for the fixed-size types.
        fixed_size = {
            ItemType.Byte: (Byte, 1),
            ItemType.Word: (Word, 2),
            ItemType.Long: (Long, 4),
            ItemType.LongLong: (LongLong, 8),
        }
        # Types whose payload follows a 2-byte length.
        length_prefixed = {
            ItemType.String: String,
            ItemType.Binary: Binary,
        }
        obj = cls()
        field_count = struct.unpack_from(">B", data, offset)[0]
        cursor = offset + 1
        for _ in range(field_count):
            field_id = struct.unpack_from(">B", data, cursor)[0]
            cursor += 1
            item_type, value = unpack_any(data, cursor)
            cursor += 1  # type tag
            if item_type in fixed_size:
                item_class, size = fixed_size[item_type]
                obj[field_id] = item_class(value)
                cursor += size
            elif item_type in length_prefixed:
                obj[field_id] = length_prefixed[item_type](value)
                cursor += 2 + len(value)
            else:
                raise ValueError("Unknown type: {}".format(item_type))
        return obj

    def assert_fields(self, fields):
        """Assert that the stored field ids are exactly `fields`."""
        actual = set(self.keys())
        expected = set(fields)
        message = "Fields mismatch: {}\n -> Expected: {}".format(
            actual, expected
        )
        assert actual == expected, message
class DummyData(PatData):
    """PatData declaring no named field."""
    FIELDS = ()
class CollectionLog(PatData):
    """Named fields of the collection log PAT structure."""
    FIELDS = (
        (0x01, "error_code"),
        (0x02, "unk_long_0x02"),
        (0x03, "timeout_value"),
    )
class ConnectionData(PatData):
    """Named fields of the connection data PAT structure."""
    # Constants are based on the US version of the game
    FIELDS = (
        (0x01, "online_support_code"),
        (0x02, "pat_ticket"),
        (0x03, "constant_0x3"),
        (0x04, "constant_0x2"),
        (0x05, "converted_country_code"),
        (0x06, "unknown_long_0x06"),
        (0x07, "media_version"),
        (0x08, "constant_0x4000"),
        (0x09, "country_code"),
        (0x0a, "language_code"),
    )
class ChargeInfo(PatData):
    """Named fields of the charge info PAT structure."""
    FIELDS = (
        (0x01, "ticket_validity1"),
        (0x02, "ticket_validity2"),
        (0x05, "unk_binary_0x05"),
        (0x07, "online_support_code"),
    )
class LoginInfo(PatData):
    """Named fields of the login info PAT structure."""
    FIELDS = (
        (0x01, "ticket_validity1"),
        (0x02, "ticket_validity2"),
        (0x06, "unk_binary_0x06"),
        (0x07, "online_support_code"),
        (0x08, "unk_string_0x08"),
        (0x09, "state"),
        (0x0a, "nas_token"),
    )
class UserObject(PatData):
    """Named fields of the user object PAT structure."""
    FIELDS = (
        (0x01, "slot_index"),
        (0x02, "capcom_id"),
        (0x03, "hunter_name"),
        (0x04, "unk_long_0x04"),
        (0x05, "unk_long_0x05"),
        (0x06, "unk_long_0x06"),
        (0x07, "unk_long_0x07"),
        (0x08, "unk_string_0x08"),
    )
class FmpData(PatData):
    """Named fields of the FMP server entry PAT structure."""
    FIELDS = (
        (0x01, "index"),
        (0x02, "server_address"),
        (0x03, "server_port"),
        (0x07, "server_type"),
        (0x08, "player_count"),
        (0x09, "player_capacity"),
        (0x0a, "server_name"),
        (0x0b, "unk_string_0x0b"),
        (0x0c, "unk_long_0x0c"),
    )
class UserSearchInfo(PatData):
    """Named fields of the user search info PAT structure."""
    FIELDS = (
        (0x01, "capcom_id"),
        (0x02, "hunter_name"),
        (0x03, "stats"),
        (0x04, "layer_host"),
        (0x07, "unk_byte_0x07"),  # Index
        (0x08, "server_name"),
        (0x0b, "unk_byte_0x0b"),
        (0x0c, "unk_string_0x0c"),
        (0x0d, "city_capacity"),
        (0x0e, "city_size"),
        (0x0f, "info_mine_0x0f"),
        (0x10, "info_mine_0x10"),
    )
class LayerPath(object):
    """Location of a layer expressed as server, gate and city ids."""
    # Packed layout: depth, server id, unknown word, gate id, city id.
    STRUCT = struct.Struct(">IIHHH")

    def __init__(self, server_id=None, gate_id=None, city_id=None):
        # type: (int|None, int|None, int|None) -> None
        # Missing (or falsy) ids are stored as 0.
        self.server_id = server_id if server_id else 0
        self.gate_id = gate_id if gate_id else 0
        self.city_id = city_id if city_id else 0

    def get_depth(self):
        # type: () -> int
        """Return the depth of the deepest layer set, or -1 if none is."""
        candidates = (
            (self.city_id, City),
            (self.gate_id, Gate),
            (self.server_id, Server),
        )
        for layer_id, layer_class in candidates:
            if layer_id > 0:
                return layer_class.LAYER_DEPTH
        return -1

    def pack(self):
        # type: () -> bytes
        """Serialize the path using STRUCT."""
        # NOTE(review): a path without any id has depth -1, which the
        # unsigned depth slot of STRUCT cannot encode.
        unk = 1
        values = (self.get_depth(), self.server_id, unk, self.gate_id,
                  self.city_id)
        return self.STRUCT.pack(*values)

    @staticmethod
    def unpack(data):
        # type: (bytes) -> LayerPath
        """Build a LayerPath from its packed representation."""
        gate_id = city_id = 0
        with Unpacker(data) as unpacker:
            _, server_id = unpacker.struct(">II")
            # Remaining 16-bit fields: unknown word, gate id, city id.
            field_count = len(unpacker) // 2
            assert field_count <= 3
            for position in range(field_count):
                # NOTE(review): stored as returned by the unpacker; if it
                # yields a 1-tuple like struct.unpack_from, the ids below
                # are tuples rather than ints - confirm.
                field = unpacker.struct(">H")
                if position == 1:
                    gate_id = field
                elif position != 0:
                    city_id = field
        return LayerPath(server_id, gate_id, city_id)
class LayerData(PatData):
    """Named fields describing a layer (server, gate or city)."""
    FIELDS = (
        (0x01, "unk_long_0x01"),
        (0x02, "layer_host"),
        (0x03, "name"),
        (0x05, "index"),
        (0x06, "size"),
        (0x07, "size2"),  # Used in search
        (0x08, "unk_long_0x08"),
        (0x09, "capacity"),
        (0x0a, "unk_long_0x0a"),
        (0x0b, "child_population"),
        (0x0c, "unk_long_0x0c"),
        (0x0d, "unk_word_0x0d"),
        (0x10, "state"),  # 0 = Joinable / 1 = Empty / 2 = Full
        (0x11, "positionInterval"),  # player position synchronization timer
        (0x12, "unk_byte_0x12"),
        (0x15, "layer_depth"),
        (0x16, "layer_pathname"),
        (0x17, "unk_binary_0x17"),
        (0xff, "fallthrough_bug")  # Fill this if field 0x17 is set !!!
    )

    @staticmethod
    def create_from(index, layer, path=None):
        """Build the LayerData describing `layer`.

        `index` fills both the unk_long_0x01 and index fields. `layer` is
        a Server, a Gate or a City; any other type fails the assertion.
        For a City without an explicit `path`, the path of its leader is
        used when the city has one.
        """
        data = LayerData()
        data.unk_long_0x01 = Long(index)
        data.index = Word(index)
        data.positionInterval = Long(500)
        data.unk_byte_0x12 = Byte(1)
        if isinstance(layer, Server):
            data.name = String(layer.name)
            data.size = Long(layer.get_population())
            data.size2 = Long(layer.get_population())
            data.capacity = Long(layer.get_capacity())
            # Servers are always reported as joinable (state 0).
            data.state = Byte(0)
            data.layer_depth = Byte(layer.LAYER_DEPTH)
        elif isinstance(layer, Gate):
            data.name = String(layer.name)
            data.size = Long(layer.get_population())
            data.size2 = Long(layer.get_population())
            data.capacity = Long(layer.get_capacity())
            data.state = Byte(layer.get_state())
            data.layer_depth = Byte(layer.LAYER_DEPTH)
        else:
            assert isinstance(layer, City)
            if path is None and layer.leader is not None:
                path = layer.leader.get_layer_path()
            data.name = String(layer.name)
            data.size = Long(layer.get_population())
            data.size2 = Long(layer.get_population())
            data.capacity = Long(layer.get_capacity())
            data.child_population = Long(layer.in_quest_players())
            data.unk_long_0x0c = Long(0xc)  # TODO: Reverse
            data.state = Byte(layer.get_state())
            data.layer_depth = Byte(layer.LAYER_DEPTH)
            data.layer_pathname = String(layer.get_pathname())
        # NOTE(review): indentation was lost in the reviewed source; this
        # assumes the host path is attached for every layer kind, not
        # only for cities - confirm.
        if path is not None:
            data.layer_host = Binary(path.pack())
        # These fields are read when used in the NtcLayerInfoSet packet
        # Purpose is unknown
        # data.unk_binary_0x17 = Binary("test")
        # data.fallthrough_bug = FallthroughBug()
        return data
class FriendData(PatData):
    """Named fields of a friend list entry."""
    FIELDS = (
        (0x01, "index"),
        (0x02, "capcom_id"),
        (0x03, "hunter_name"),
    )
class BlackListUserData(FriendData):
    """Black list entry; shares the fields of FriendData."""
class UserStatusSet(PatData):
    """Named fields of the user status set PAT structure."""
    FIELDS = (
        (0x01, "unk_byte_0x01"),
        (0x02, "unk_byte_0x02"),
        (0x03, "unk_byte_0x03"),
        (0x04, "unk_byte_0x04"),
        (0x05, "unk_byte_0x05"),
        (0x08, "unk_byte_0x08"),
        (0x09, "unk_byte_0x09"),  # Byte(4) = Deny Friend Requests
    )
class LayerSet(PatData):
    """Named fields of the layer set PAT structure."""
    FIELDS = (
        (0x01, "unk_long_0x01"),
        (0x02, "unk_binary_0x02"),
        (0x03, "unk_string_0x03"),
        (0x05, "unk_binary_0x05"),
        (0x09, "unk_long_0x09"),
        (0x0a, "unk_long_0x0a"),
        (0x0c, "unk_long_0x0c"),
        (0x17, "unk_binary_0x17"),
    )
class MediationListItem(PatData):
    """Named fields of a mediation list entry."""
    FIELDS = (
        (0x01, "name"),  # name or owner?
        (0x02, "index"),
        (0x03, "is_locked"),  # 1 - locked, unlocked otherwise
    )
class CircleInfo(PatData):
    """Named fields describing a circle and its party."""
    FIELDS = (
        (0x01, "index"),
        (0x02, "unk_string_0x02"),
        (0x03, "has_password"),
        (0x04, "password"),
        (0x05, "party_members"),
        (0x06, "remarks"),
        (0x07, "player_count"),
        (0x08, "unk_long_0x08"),
        (0x09, "capacity"),
        (0x0a, "unk_long_0x0a"),
        (0x0b, "unk_long_0x0b"),
        (0x0c, "index2"),
        (0x0d, "leader_capcom_id"),
        (0x0e, "unk_byte_0x0e"),
        (0x0f, "not_joinable"),
        (0x10, "unk_byte_0x10"),
    )

    @staticmethod
    def pack_from(circle, circle_index):
        """Pack `circle` as a CircleInfo followed by its optional fields.

        The circle is read while holding `circle.lock()`. The returned
        bytes are the packed CircleInfo followed by two optional fields
        (id 1: capacity, id 2: quest id).
        """
        # NOTE(review): indentation was lost in the reviewed source; the
        # extent of the `with` and `if not circle.is_empty()` blocks
        # below is a reconstruction - confirm against the repository.
        with circle.lock():
            circle_info = CircleInfo()
            circle_info.index = Long(circle_index)
            if not circle.is_empty():
                # circle_info.unk_string_0x02 = pati.String("192.168.23.1")
                if circle.has_password():
                    circle_info.has_password = Byte(1)
                    circle_info.password = String(circle.password)
                # 0x88-byte blob: 0x10-byte capcom id slots from 0x00,
                # 0x10-byte hunter name slots from 0x40, one weapon type
                # byte per player from 0x80 and one flag byte per player
                # from 0x84.
                party_members = bytearray(0x88)
                for i, player in circle.players:
                    start_offset = 0x10 * i
                    end_offset = start_offset + 0x10
                    # TODO: Shouldn't we use bytes instead?
                    party_members[start_offset:end_offset] = \
                        pad(player.capcom_id.encode('ascii'), 0x10)
                    party_members[start_offset+0x40:end_offset+0x40] = \
                        pad(player.hunter_name, 0x10)
                    party_members[0x80+i] = player.hunter_info.weapon_type()
                    # TODO: Discover flag meaning?
                    party_members[0x84+i] = 0xff
                circle_info.party_members = Binary(party_members)
                if circle.remarks is not None:
                    circle_info.remarks = String(circle.remarks)
                # Number of players currently in the quest
                circle_info.player_count = Long(len(circle.players))
                # circle_info.unk_long_0x08 = Long(3)
                circle_info.capacity = Long(circle.get_capacity())
                # circle_info.unk_long_0x0a = Long(5)
                # circle_info.unk_long_0x0b = Long(6)
                circle_info.index2 = Long(circle_index)  # TODO: Verify this
                circle_info.leader_capcom_id = String(circle.leader.capcom_id)
                circle_info.unk_byte_0x0e = Byte(circle.unk_byte_0x0e)
                circle_info.not_joinable = Byte(int(not circle.is_joinable()))
                # circle_info.unk_byte_0x10 = pati.Byte(1)
            # TODO: Other optional fields
            optional_fields = [
                (1, circle.get_capacity()),
                (2, circle.quest_id)
            ]
        return circle_info.pack() + pack_optional_fields(optional_fields)
class CircleUserData(PatData):
    """Named fields of a circle user entry."""
    FIELDS = (
        (0x01, "unk_binary_0x01"),
        (0x02, "unk_word_0x02"),
        (0x03, "is_standby"),
        (0x04, "player_index"),
        (0x05, "capcom_id"),
        (0x06, "hunter_name"),
    )
class MessageInfo(PatData):
    """Named fields of the message info PAT structure."""
    FIELDS = (
        (0x01, "text_color"),  # RGBA
        (0x02, "unk_long_0x02"),  # Probably time related?
        (0x03, "sender_id"),
        (0x04, "sender_name"),
    )
class LayerUserInfo(PatData):
    """Named fields of the layer user info PAT structure."""
    FIELDS = (
        (0x01, "capcom_id"),
        (0x02, "hunter_name"),
        (0x03, "layer_host"),
        (0x06, "unk_long_0x06"),
        (0x07, "stats"),
    )
class LayerBinaryInfo(PatData):
    """Named fields of the layer binary info PAT structure."""
    FIELDS = (
        (0x01, "unk_long_0x01"),  # time related
        (0x02, "capcom_id"),
        (0x03, "hunter_name"),
    )
class LayerUserNum(PatData):
    """Named fields of the layer user count PAT structure."""
    FIELDS = (
        (0x01, "path"),
        (0x02, "population"),
        (0x03, "index"),
        (0x04, "unk_long_0x04"),
        (0x05, "max_population"),
        (0x06, "unk_long_0x06"),
        (0x07, "child_population"),
    )

    @staticmethod
    def pack_from(layer_data):
        # type: (LayerData) -> bytes
        """Pack a LayerUserNum derived from `layer_data`.

        The layer_host, size, index and capacity fields of `layer_data`
        are read unconditionally; child_population is copied only when
        it is set.
        """
        user_num = LayerUserNum()
        user_num.path = layer_data.layer_host
        user_num.population = layer_data.size
        # The source index is a Word item; it is re-encoded as a Long.
        index_value = unpack_word(layer_data.index)
        user_num.index = Long(index_value)
        user_num.unk_long_0x04 = Long(0xAF00FA00)
        user_num.max_population = layer_data.capacity
        user_num.unk_long_0x06 = Long(0xBF00FB00)
        if "child_population" in layer_data:
            user_num.child_population = layer_data.child_population
        return user_num.pack()
def patdata_extender(unpacker):
    """Register every PatData class of this module in `unpacker.MAPPING`.

    Each class is keyed by its global name and mapped to its
    (unpack, pack) pair. PatData classes must be defined above this
    function. The unpacker is returned so this can be used as a class
    decorator.
    """
    for name, candidate in globals().items():
        is_patdata = (isinstance(candidate, type)
                      and issubclass(candidate, PatData))
        if is_patdata:
            unpacker.MAPPING[name] = (candidate.unpack, candidate.pack)
    return unpacker
@patdata_extender
class Unpacker(GenericUnpacker):
    """PAT item unpacker with PatData support.

    MAPPING associates a name with an (unpack, pack) function pair; the
    decorator adds one entry per PatData class.
    """
    MAPPING = {
        "struct": (struct.unpack_from, struct.pack),
        "lp_string": (unpack_lp_string, lp_string),
        "lp2_string": (unpack_lp2_string, lp2_string),
        "byte": (unpack_byte, pack_byte),
        "word": (unpack_word, pack_word),
        "long": (unpack_long, pack_long),
        "longlong": (unpack_longlong, pack_longlong),
        "string": (unpack_string, pack_string),
        "binary": (unpack_binary, pack_binary),
        "bytes": (unpack_bytes, pack_bytes),
        "optional_fields": (
            unpack_optional_fields,
            pack_optional_fields,
        ),
        "detailed_optional_fields": (
            unpack_detailed_optional_fields,
            pack_detailed_optional_fields,
        ),
    }
class HunterSettings(object):
    """Helper for hunter settings.

    The game's recvNtcUserBinaryNotice function suggests that the maximum
    size is 0x100. However, some parts of the code suggest otherwise due
    to a `< 0x100` check.

    See the dispatched packet, case 0x8071, 0x803edc64 (RMHE08).
    """
    SIZE = 0x100

    def __init__(self):
        # Zero-filled buffer of SIZE bytes.
        self.data = bytearray(self.SIZE)

    def unpack(self, data, length, offset=0):
        """Write `data` over `length` bytes at `offset`; return self."""
        # NOTE(review): slice assignment resizes the buffer when
        # len(data) differs from `length` - confirm callers keep both in
        # sync.
        end = offset + length
        self.data[offset:end] = data
        return self

    def rank(self):
        """Return the big-endian 16-bit value stored at offset 0x00."""
        return struct.unpack_from(">H", self.data, 0x00)[0]

    def weapon_type(self):
        """Return the byte stored at offset 0x05."""
        return struct.unpack_from(">B", self.data, 0x05)[0]

    def pack(self):
        """Return the underlying buffer (not a copy)."""
        return self.data
def get_fmp_servers(session, first_index, count):
    """Pack up to `count` FMP server entries starting at `first_index`.

    `first_index` is 1-based. Servers lacking an address or a port get
    the ones of the "FMP" configuration section assigned (the server
    objects are updated in place). Returns the concatenated packed
    FmpData entries.
    """
    assert first_index > 0, "Invalid list index"
    config = get_config("FMP")
    default_addr = get_ip(config["IP"])
    default_port = config["Port"]
    begin = first_index - 1
    selected = session.get_servers()[begin:begin + count]
    packed = []
    for position, server in enumerate(selected, first_index):
        if not server.addr:
            server.addr = default_addr
        if not server.port:
            server.port = default_port
        entry = FmpData()
        entry.index = Long(position)  # The server might be full, if zero
        entry.server_address = String(server.addr)
        entry.server_port = Word(server.port)
        # Might produce invalid reads if too high
        # entry.server_type = LongLong(i+0x10000000)
        # entry.server_type = LongLong(i + (1<<32)) # OK
        entry.server_type = LongLong(server.server_type)
        entry.player_count = Long(server.get_population())
        entry.player_capacity = Long(server.get_capacity())
        entry.server_name = String(server.name)
        entry.unk_string_0x0b = String("X")
        entry.unk_long_0x0c = Long(0x12345678)
        packed.append(entry.pack())
    return b"".join(packed)
def get_layer_children(session, first_index, count, sibling=False):
    """Pack up to `count` layers starting at the 1-based `first_index`.

    The layers are the session's layer siblings when `sibling` is true,
    its layer children otherwise. Each layer is emitted as its packed
    LayerData followed by its packed optional fields.
    """
    assert first_index > 0, "Invalid list index"
    begin = first_index - 1
    if sibling:
        layers = session.get_layer_sibling()
    else:
        layers = session.get_layer_children()
    packed = []
    for position, child in enumerate(layers[begin:begin + count],
                                     first_index):
        packed.append(LayerData.create_from(position, child).pack())
        packed.append(pack_optional_fields(child.optional_fields))
    return b"".join(packed)
def get_layer_sibling(session, first_index, count):
    """Pack the session's layer siblings; see get_layer_children."""
    return get_layer_children(session, first_index, count, sibling=True)
def getDummyLayerData():
    """Return a LayerData filled with placeholder values."""
    dummy = LayerData()
    dummy.index = Word(1)  # Index
    # dummy.unk_custom_0x02 = Custom(b"")
    dummy.name = String("LayerStart")
    # dummy.unk_worddec_0x05 = Word(2)  # City no longer exists message

    # Player in the city, displayed at the gate
    dummy.size = Long(0)
    # dummy.unk_long_0x07 = Long(1)
    # dummy.unk_long_0x08 = Long(1)

    # Maximum number of players in the city, displayed at the gate
    dummy.capacity = Long(4)
    # dummy.unk_long_0x0a = Long(1)

    # In city population, displayed at the gate
    # dummy.unk_long_0x0b = Long(0)
    # dummy.unk_long_0x0c = Long(1)
    # dummy.unk_word_0x0d = Word(1)
    dummy.state = Byte(1)
    dummy.positionInterval = Long(500)
    # dummy.unk_long_0x11 = Long(1)

    # Might be needed to be >=1 to keep NetworkConnectionStable alive
    dummy.unk_byte_0x12 = Byte(1)
    # dummy.layer_depth = Byte(1)
    # dummy.layer_pathname = String("UnkStart")
    # dummy.unk_binary_0x17 = Binary(b"binStart")
    return dummy
def getHunterStats(hr=921, profile=b"Navaldeus",
                   title=117, status=1, hr_limit=2, goal=35, seeking=23,
                   server_type=3, weapon_type=10, weapon_id=11):
    """Build a 0x100-byte hunter stats blob.

    Offsets:
     - 0x00: Hunter Rank
     - 0x10~0x7c: Equipment
     - 0x9c: Profile description
     - 0xf2: Profile titles
     - 0xf3: Profile status
     - 0xf5: City's HR limit
     - 0xf6: City's goal
     - 0xf7: City's seeking
     - 0xf8: Server type

    The profile is NUL-terminated before being written. Returns the
    blob as a bytearray.
    """
    profile = to_bytearray(profile)
    # The previous `profile[-1] != b"\0"` check compared one element
    # (an int when profile is a bytearray) with a bytes object, so it
    # always appended a terminator, and it raised IndexError on an
    # empty profile. endswith() handles both cases.
    if not profile.endswith(b"\0"):
        profile += b"\0"

    data = bytearray(0x100)  # fuzz.repeat(fuzz.MSF_PATTERN, 0x100)

    def slot(type_id, equipment_id, slots=0):
        """Equipment slot / TODO: Handle gems"""
        return struct.pack(">BBHII", type_id, slots, equipment_id, 0, 0)

    data[:2] = struct.pack(">H", hr)

    # Weapon / Gun slots (Lance: Nega-Babylon)
    data[0x10:0x1c] = slot(weapon_type, weapon_id)
    data[0x1c:0x28] = b"\xff" * 0xc
    data[0x28:0x34] = b"\xff" * 0xc

    # Armors (Helios+ set)
    data[0x34:0x40] = slot(5, 111)
    data[0x40:0x4c] = slot(1, 116)
    data[0x4c:0x58] = slot(3, 115)
    data[0x58:0x64] = slot(2, 109)
    data[0x64:0x70] = slot(4, 113)
    data[0x70:0x7c] = slot(6, 7, slots=3)

    data[0x9c:0x9c+len(profile)] = profile
    data[0xf2] = title
    data[0xf3] = status
    data[0xf5] = hr_limit
    data[0xf6] = goal
    data[0xf7] = seeking
    data[0xf8] = server_type
    return data
|