1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744174517461747174817491750175117521753175417551756175717581759176017611762176317641765176617671768176917701771177217731774177517761777177817791780178117821783178417851786178717881789179017911792179317941795179617971798179918001801180218031804180518061807180818091810181118121813181418151816181718181819182018211822182318241825182618271828182918301831183218331834183518361837183818391840184118421843184418451846184718481849185018511852185318541855185618571858185918601861186218631864186518661867186818691870187118721873187418751876187718781879188018811882188318841885188618871888188918901891189218931894189518961897189818991900190119021903190419051906190719081909191019111912191319141915191619171918191919201921192219231924192519261927192819291930193119321933193419351936193719381939194019411942194319441945194619471948194919501951195219531954195519561957195819591960196119621963196419651966196719681969197019711972197319741975197619771978197919801981198219831984198519861987198819891990199119921993199419951996199719981999200020012002200320042005200620072008200920102011201220132014201520162017201820192020202120222023202420252026202720282029203020312032203320342035203620372038203920402041204220432044204520462047204820492050205120522053205420552056205720582059206020612062206320642065206620672068206920702071207220732074207520762077207820792080208120822083208420852086208720882089209020912092209320942095209620972098209921002101210221032104210521062107210821092110211121122113211421152116211721182119212021212122212321242125212621272128212921302131213221332134213521362137213821392140214121422143214421452146214721482149215021512152215321542155215621572158215921602161216221632164216521662167216821692170217121722173217421752176217721782179218021812182218321842185218621872188218921902191219221932194219521962197219821992200220122022203220422052206220722082209221022112212221322142215221622172218221922202221222222232224222522262227222822292230223122322233223422352236223722382239224022412242224322442245224622472248224922502251225222532254225522562257225822592260226122622263226422652266226722682269227022712272227322742275227622772278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569257025712572257325742575257625772578257925802581258225832584258525862587258825892590259125922593259425952596259725982599260026012602260326042605260626072608260926102611261226132614261526162617261826192620262126222623262426252626262726282629263026312632263326342635263626372638263926402641264226432644264526462647264826492650265126522653265426552656265726582659266026612662266326642665266626672668266926702671267226732674267526762677267826792680268126822683268426852686268726882689269026912692269326942695269626972698269927002701270227032704270527062707270827092710271127122713271427152716271727182719272027212722272327242725272627272728272927302731273227332734273527362737273827392740274127422743274427452746274727482749275027512752275327542755275627572758275927602761276227632764276527662767276827692770277127722773277427752776277727782779278027812782278327842785278627872788278927902791279227932794279527962797279827992800280128022803280428052806280728082809281028112812 |
- #! /usr/bin/env python
- # -*- coding: utf-8 -*-
- # SPDX-FileCopyrightText: Copyright (C) 2021-2023 MH3SP Server Project
- # SPDX-License-Identifier: AGPL-3.0-or-later
- """Monster Hunter PAT module."""
- import struct
- import traceback
- from datetime import timedelta
- from other.utils import Logger, get_config, get_ip, hexdump, to_str
- import mh.pat_item as pati
- import mh.server as server
- import mh.time_utils as time_utils
- from mh.constants import \
- LAYER_CHAT_COLORS, TERMS_VERSION, TERMS, SUBTERMS, ANNOUNCE, \
- CHARGE, VULGARITY_INFO, FMP_VERSION, PAT_BINARIES, PAT_NAMES, \
- PatID4, get_pat_binary_from_version
- from mh.session import Session
- import mh.database as db
- try:
- from typing import Literal, List, Union, Optional # noqa: F401
- LayerUserNumUpdate = Literal[1,2,3,4,5]
- except ImportError:
- pass
- g_circle = None
- g_circle_info_set = None
- class PatServer(server.BasicPatServer, Logger):
- """Generic PAT server class."""
- def __init__(self, address, handler_class, binary_loader,
- max_thread_count=0, logger=None, debug_mode=False,
- ssl_cert=None, ssl_key=None):
- server.BasicPatServer.__init__(
- self, address, handler_class, max_thread_count,
- ssl_cert=ssl_cert, ssl_key=ssl_key
- )
- Logger.__init__(self)
- if logger:
- self.set_logger(logger)
- self.info("Running on {} port {}".format(*address))
- self.debug_con = []
- self.debug_mode = debug_mode
- self.binary_loader = binary_loader
- def add_to_debug(self, con):
- """Add connection to the debug connection list."""
- self.debug_con.append(con)
- def del_from_debug(self, con):
- """Delete connection from the debug connection list."""
- self.debug_con.remove(con)
- def get_debug(self):
- """Return the debug connection list."""
- return self.debug_con
- def debug_enabled(self):
- return self.debug_mode
- def get_pat_handler(self, session):
- """Return pat handler from session"""
- for handler in self.debug_con:
- if handler.session == session:
- return handler
- return None
- def broadcast(self, players, packet_id, data, seq, to_exclude=None):
- # type: (db.Players, int, bytes, int, Session|None) -> None
- handlers = []
- with players.lock():
- for _, player in players:
- if player == to_exclude:
- continue
- handler = self.get_pat_handler(player)
- if handler:
- handlers.append(handler)
- for handler in handlers:
- handler.try_send_packet(packet_id, data, seq)
- def layer_broadcast(self, session, packet_id, data, seq,
- exclude_self=True):
- # type: (Session, int, bytes, int, bool) -> None
- self.broadcast(session.get_layer_players(), packet_id, data, seq,
- session if exclude_self else None)
- def circle_broadcast(self, circle, packet_id, data, seq,
- session=None):
- # type: (db.Circle, int, bytes, int, Session|None) -> None
- self.broadcast(circle.players, packet_id, data, seq, session)
-
- class PatRequestHandler(server.BasicPatHandler):
- """Generic PAT request handler class.
- When possible, each packet is described with:
- - ID: Packet ID (in hexadecimal)
- - JP: Japanese description from the game's packet table
- - TR: A roughly translated description
- At some point, I'll add packet hexdumps and description for
- a better understanding. All descriptions are provided with
- the best of our understanding of the packets and might be
- inaccurate. `unk` stands for `unknown`.
- """
- def on_init(self):
- """Default PAT handler."""
- self.server.info("Handle client from {}".format(self.client_address))
- self.server.add_to_debug(self)
- self.session = Session(self)
- self.ping_timer = time_utils.Timer()
- self.requested_connection = False
- self.line_check = True
- def try_send_packet(self, packet_id=0, data=b'', seq=0):
- """Send PAT packet and catch exceptions."""
- try:
- self.send_packet(packet_id, data, seq)
- except Exception:
- self.server.warning(
- "Failed to send %s[ID=%08x; Seq=%04x]\n%s\n%s",
- PAT_NAMES.get(packet_id, "Packet"),
- packet_id, seq, traceback.format_exc(), hexdump(data)
- )
- def try_send_packet_to(self, capcom_id, packet_id=0, data=b'', seq=0):
- """Send PAT packet to specific Capcom ID and catch exceptions."""
- session = self.session.find_user_by_capcom_id(capcom_id)
- if not session:
- self.server.warning("Failed to send packet to %s: user not found",
- capcom_id)
- return
- handler = self.server.get_pat_handler(session)
- if handler:
- handler.try_send_packet(packet_id, data, seq)
- def sendAnsNg(self, packet_id, message, seq):
- unk1 = 1 # If value is 0, the message is not rendered
- data = struct.pack(">I", unk1)
- data += pati.lp2_string(message)
- packet_id = packet_id | 0xff
- self.send_packet(packet_id, data, seq)
- def sendAnsAlert(self, packet_id, message, seq):
- unk1 = 1 # If value is 0, the message is not rendered
- data = struct.pack(">I", unk1)
- data += pati.lp2_string(message)
- packet_id = packet_id | 0x01
- self.send_packet(packet_id, data, seq)
- def send_error(self, message, seq=0):
- """Send an error message."""
- try:
- MAX_SIZE = 0x200
- MAX_WIDTH = 55
- MAX_HEIGHT = 15
- LINE_PER_PAGE = min(MAX_HEIGHT, MAX_SIZE // MAX_WIDTH)
- lines = [
- "<LF=2><BODY><CENTER>A communication error occurred",
- "<BR><BODY>Check the server log files for more details",
- "<BR><LEFT>",
- ] + message.replace("\\", "/").split("\n")
- def range_slice(obj, chunk_size):
- """Range helper."""
- length = len(obj)
- return range(length // chunk_size + (length % chunk_size > 0))
- def helper(line):
- """Split lines helper."""
- if len(line) <= MAX_WIDTH:
- return [line]
- return [
- line[i*MAX_WIDTH:(i+1)*MAX_WIDTH]
- for i in range_slice(line, MAX_WIDTH)
- ]
- # Flatten the results
- lines = sum([helper(line) for line in lines], [])
- for i in range_slice(lines, LINE_PER_PAGE):
- seq += 1
- message = "<BR><BODY>".join(lines[i*LINE_PER_PAGE:
- (i+1)*LINE_PER_PAGE])
- # It seems we can't send multiple messages
- # self.sendNtcShut("<LF=2><BODY>"+message+"<END>", seq)
- self.sendNtcShut(message + "<END>", 0)
- # The game will close the connection and the next messages
- # won't be received. The except block will be reached if the
- # message is too long.
- except Exception:
- # Probably unreachable and was disconnected
- self.server.warning("Failed to send a complete error message")
- finally:
- self.session.request_reconnection = False
- self.finish()
- def recvNtcCollectionLog(self, packet_id, data, seq):
- """NtcCollectionLog packet.
- ID: 60501000
- JP: 収集ログ通知
- TR: Collection log notification
- This packet is sent by the game when an error occurs.
- TODO: Find all error codes and their meanings.
- Error codes:
- - 0x80050037: Unable to find CircleListLayer Slot
- - 0x80060002: Fail to warp to location (Layer index error?)
- - 0x80070002: Empty NetworkUniqueId?
- """
- data = pati.CollectionLog.unpack(data)
- self.server.debug("CollectionLog: {!r}".format(data))
- def sendReqLineCheck(self):
- """ReqLineCheck packet.
- ID: 60010100
- JP: ラインチェック
- TR: Line check
- The server sends a request to check if the player is still online.
- The game will close the connection after 90s, if it doesn't receive it.
- """
- self.send_packet(PatID4.ReqLineCheck)
- def recvAnsLineCheck(self, packet_id, data, seq):
- """AnsLineCheck packet.
- ID: 60010200
- JP: ラインチェック
- TR: Line check
- The game sends this packet after receiving a ReqLineCheck packet.
- """
- self.line_check = True
- def sendReqConnection(self, unused=0, seq=0):
- """ReqConnection packet.
- ID: 60200100
- JP: PAT接続環境要求
- TR: PAT connection settings request
- The server sends a request to the game to establish a PAT connection.
- It also sends a parameter that seems unused on the western versions.
- """
- data = struct.pack(">I", unused)
- self.send_packet(PatID4.ReqConnection, data, seq)
- def recvAnsConnection(self, packet_id, data, seq):
- """AnsConnection packet.
- ID: 60200200
- JP: PAT接続環境返答
- TR: PAT connection settings response
- The games sends the PAT environment properties.
- """
- settings = pati.ConnectionData.unpack(data)
- self.server.debug("Connection: {!r}".format(settings))
- pat_ticket = b""
- if "pat_ticket" in settings:
- _, pat_ticket = pati.unpack_any(settings.pat_ticket)
- elif "online_support_code" in settings:
- _, pat_ticket = pati.unpack_any(settings.online_support_code)
- self.server.info("Client {} Ticket `{}`".format(self.client_address,
- pat_ticket))
- self.sendNtcLogin(5, settings, seq)
- def sendNtcLogin(self, server_status, connection_data, seq):
- """NtcLogin packet.
- ID: 60211000
- JP: ログイン処理概要通知
- TR: Login process summary notification
- The server sends upon login a notification with the server status.
- """
- data = struct.pack(">B", server_status)
- self.session = self.session.get(connection_data)
- self.send_packet(PatID4.NtcLogin, data, seq)
- def recvReqAuthenticationToken(self, packet_id, data, seq):
- """ReqAuthenticationToken packet.
- ID: 62600100
- JP: 認証トークン送信
- TR: Send authentication token
- The games requests a PAT authentication by forwarding the token
- obtained from Nintendo NAS server.
- """
- nas_token = pati.unpack_lp2_string(data)
- self.server.info("Client {} - NAS `{}`".format(self.client_address,
- nas_token))
- self.sendAnsAuthenticationToken(nas_token, seq)
- def sendAnsAuthenticationToken(self, nas_token, seq):
- """AnsAuthenticationToken packet.
- ID: 62600200
- JP: 認証トークン返答
- TR: Authentication token response
- The server replies this packet to acknowledge the authentication.
- """
- self.send_packet(PatID4.AnsAuthenticationToken, b'', seq)
- def recvReqTermsVersion(self, packet_id, data, seq):
- """ReqTermsVersion packet.
- ID: 62100100
- JP: 利用規約情報確認
- TR: Terms of use information verification
- The game requests the terms version and its total size.
- """
- self.sendAnsTermsVersion(TERMS_VERSION, len(TERMS[TERMS_VERSION]), seq)
- def sendAnsTermsVersion(self, terms_version, terms_size, seq):
- """AnsTermsVersion packet.
- ID: 62100200
- JP: 利用規約情報応答
- TR: Terms of use information response
- The server replies with the terms version and total size.
- """
- data = struct.pack(">II", terms_version, terms_size)
- self.send_packet(PatID4.AnsTermsVersion, data, seq)
- def recvReqTerms(self, packet_id, data, seq):
- """ReqTerms packet.
- ID: 62110100
- JP: 利用規約要求
- TR: Terms of use request
- The game requests the terms based on what it has already read.
- """
- version, offset, size = struct.unpack(">III", data)
- self.sendAnsTerms(offset, size, TERMS[version], seq)
- def sendAnsTerms(self, offset, size, terms, seq):
- """AnsTerms packet.
- ID: 62110200
- JP: 利用規約応答
- TR: Terms of use response
- The server replies with the terms offset, size and chunk requested.
- """
- data = struct.pack(">II", offset, size)
- data += pati.lp2_string(terms[offset:offset+size])
- self.send_packet(PatID4.AnsTerms, data, seq)
- def recvReqSubTermsInfo(self, packet_id, data, seq):
- """ReqSubTermsInfo european packet.
- ID: 62130100
- JP: サブ利用規約指定情報確認
- TR: Sub-terms of use information verification
- The game requests the sub-terms info.
- """
- unk, = struct.unpack(">B", data)
- self.sendAnsSubTermsInfo(unk, len(SUBTERMS[TERMS_VERSION]), seq)
- def sendAnsSubTermsInfo(self, unk, size, seq):
- """AnsSubTermsInfo european packet.
- ID: 62130200
- JP: サブ利用規約指定情報応答
- TR: Sub-terms of use information response
- The server acknowledges the request.
- """
- unk1 = 1
- data = struct.pack(">IBI", unk1, unk, size)
- self.send_packet(PatID4.AnsSubTermsInfo, data, seq)
- def recvReqSubTerms(self, packet_id, data, seq):
- """ReqSubTerms european packet.
- ID: 62140100
- JP: 利用規約要求
- TR: Sub-terms of use request
- The game requests the sub-terms based on what it has already read.
- """
- version, unk, offset, size = struct.unpack(">IBII", data)
- assert version == TERMS_VERSION, "Terms and subterms version mismatch"
- self.sendAnsSubTerms(unk, offset, size, SUBTERMS[version], seq)
- def sendAnsSubTerms(self, unk, offset, size, subterms, seq):
- """AnsSubTerms european packet.
- ID: 62140200
- JP: サブ利用規約応答
- TR: Sub-terms of use response
- The server replies with the terms offset, size and chunk requested.
- """
- data = struct.pack(">BII", unk, offset, size)
- data += pati.lp2_string(subterms[offset:offset+size])
- self.send_packet(PatID4.AnsSubTerms, data, seq)
- def recvReqAnnounce(self, packet_id, data, seq):
- """ReqAnnounce packet.
- ID: 62300100
- JP: お知らせ要求
- TR: Notice request
- The game requests the announce text.
- """
- self.sendAnsAnnounce(ANNOUNCE, seq)
- def sendAnsAnnounce(self, announce, seq):
- """AnsAnnounce packet.
- ID: 62300200
- JP: お知らせ通知
- TR: Notice response
- The server replies with the announce text.
- """
- data = pati.lp2_string(announce)
- self.send_packet(PatID4.AnsAnnounce, data, seq)
- def recvReqNoCharge(self, packet_id, data, seq):
- """ReqNoCharge packet.
- ID: 62310100
- JP: 未課金メッセージ要求
- TR: Unpaid message request
- The game requests the no-charge text.
- NB:
- - Japanese servers were online paid.
- - Western servers were free-to-play.
- """
- self.sendAnsNoCharge(CHARGE, seq)
- def sendAnsNoCharge(self, no_charge, seq):
- """AnsNoCharge packet.
- ID: 62310200
- JP: 未課金メッセージ通知
- TR: Unpaid message response
- The server replies with the no-charge text.
- """
- data = pati.lp2_string(no_charge)
- self.send_packet(PatID4.AnsNoCharge, data, seq)
- def recvReqVulgarityInfoHighJAP(self, packet_id, data, seq):
- """ReqVulgarityInfoHigh japanese packet.
- ID: 62500100
- JP: 名前用禁止文言要求
- TR: Forbidden names request
- """
- self.sendAnsVulgarityInfoHighJAP(seq)
- def sendAnsVulgarityInfoHighJAP(self, seq):
- """AnsVulgarityInfoHigh japanese packet.
- ID: 62500200
- JP: 名前用禁止文言応答
- TR: Forbidden names response
- """
- unk = 1
- data = struct.pack(">II", unk, len(VULGARITY_INFO))
- self.send_packet(PatID4.AnsVulgarityInfoHighJAP, data, seq)
- def recvReqVulgarityInfoLowJAP(self, packet_id, data, seq):
- """ReqVulgarityInfoLow japanese packet.
- ID: 62520100
- JP: 名前以外用禁止文言要求
- TR: Forbidden words request
- """
- self.sendAnsVulgarityInfoLowJAP(seq)
- def sendAnsVulgarityInfoLowJAP(self, seq):
- """AnsVulgarityInfoLow japanese packet.
- ID: 62520200
- JP: 名前以外用禁止文言応答
- TR: Forbidden words response
- """
- unk = 1
- data = struct.pack(">II", unk, len(VULGARITY_INFO))
- self.send_packet(PatID4.AnsVulgarityInfoLowJAP, data, seq)
- def recvReqVulgarityInfoLow(self, packet_id, data, seq):
- """ReqVulgarityInfoLow packet.
- ID: 62560100
- JP: 真・名前以外用禁止文言要求
- TR: (New) Forbidden words request
- """
- info, = struct.unpack(">I", data)
- self.sendAnsVulgarityInfoLow(info, seq)
- def sendAnsVulgarityInfoLow(self, info, seq):
- """AnsVulgarityInfoLow packet.
- ID: 62560200
- JP: 真・名前以外用禁止文言応答
- TR: (New) Forbidden words response
- """
- unk = 1
- data = struct.pack(">III", unk, info, len(VULGARITY_INFO))
- self.send_packet(PatID4.AnsVulgarityInfoLow, data, seq)
- def recvReqVulgarityLow(self, packet_id, data, seq):
- """ReqVulgarityLow packet.
- ID: 62570100
- JP: 真・名前以外用禁止文言取得要求
- TR: (New) Get forbidden words request
- """
- unk, info, offset, size = struct.unpack(">IIII", data)
- self.sendAnsVulgarityLow(
- info, offset, size, VULGARITY_INFO, seq
- )
- def sendAnsVulgarityLow(self, info, offset, size, vulg, seq):
- """AnsVulgarityLow packet.
- ID: 62570200
- JP: 真・名前以外用禁止文言取得応答
- TR: (New) Get forbidden words response
- """
- data = struct.pack(">III", info, offset, size)
- data += pati.lp2_string(vulg)
- self.send_packet(PatID4.AnsVulgarityLow, data, seq)
- def recvReqCommonKey(self, packet_id, data, seq):
- """ReqCommonKey packet.
- ID: 60700100
- JP: 共通鍵要求
- TR: Common key request
- """
- self.sendAnsCommonKey(seq)
- def sendAnsCommonKey(self, seq):
- """AnsCommonKey packet.
- ID: 60700200
- JP: 共通鍵返答
- TR: Common key response
- TODO: Handle encryption properly.
- """
- # Bypass upcoming encryption by sending a dummy packet instead
- # self.send_packet(PatID4.AnsCommonKey, b'', seq)
- self.sendAnsAuthenticationToken(b'', seq)
- def recvReqLmpConnect(self, packet_id, data, seq):
- """ReqLmpConnect packet.
- ID: 62010100
- JP: LMPの接続先要求
- TR: LMP's access point request
- TODO: I don't think it's related to LMP protocol.
- """
- config = get_config("LMP")
- self.sendAnsLmpConnect(get_ip(config["IP"]), config["Port"], seq)
- def sendAnsLmpConnect(self, address, port, seq):
- """AnsLmpConnect packet.
- ID: 62010200
- JP: LMPの接続先応答
- TR: LMP's access point response
- TODO: Handle/Convert special addresses like 127.0.0.1 and 0.0.0.0.
- """
- data = struct.pack(">H", len(address))
- data += address.encode("ascii")
- data += struct.pack(">H", port)
- self.send_packet(PatID4.AnsLmpConnect, data, seq)
- def recvReqShut(self, packet_id, data, seq):
- """ReqShut packet.
- ID: 60100100
- JP: 切断要求
- TR: Disconnection request
- # 1: full logout(?)
- # 2: logout to different server(?)
- """
- shutdown_type, = struct.unpack(">B", data)
- self.sendAnsShut(shutdown_type, seq)
- def sendAnsShut(self, shutdown_type, seq):
- """AnsShut packet.
- ID: 60100200
- JP: 切断返答
- TR: Disconnection response
- """
- data = struct.pack(">B", shutdown_type)
- self.send_packet(PatID4.AnsShut, data, seq)
- if shutdown_type == 1:
- # Only modify request_reconnection if it's a 1 shutdown_type,
- # otherwise leave it at what it was before
- self.session.request_reconnection = False
- self.finish()
- def sendNtcShut(self, message, seq):
- """NtcShut packet.
- ID: 60101000
- JP: 切断返答
- TR: Disconnection notification
- """
- has_message = bool(message)
- data = struct.pack(">B", int(has_message))
- data += pati.lp2_string(message[:0x200])
- self.send_packet(PatID4.NtcShut, data, seq)
- def recvReqChargeInfo(self, packet_id, data, seq):
- """ReqChargeInfo packet.
- ID: 61020100
- JP: 課金情報要求
- TR: Billing information request
- """
- info_type, = struct.unpack('>B', data)
- self.sendAnsChargeInfo(info_type, seq)
- def sendAnsChargeInfo(self, info_type, seq):
- """AnsChargeInfo packet.
- ID: 61020200
- JP: 課金情報返答
- TR: Billing information response
- """
- info = pati.ChargeInfo()
- info.ticket_validity1 = pati.Long(
- int(timedelta(days=1).total_seconds())
- )
- info.ticket_validity2 = pati.Long(
- int(timedelta(days=1).total_seconds())
- )
- info.unk_binary_0x05 = pati.Binary("Cid")
- info.online_support_code = pati.String(self.session.get_support_code())
- data = info.pack()
- self.send_packet(PatID4.AnsChargeInfo, data, seq)
- def recvReqLoginInfo(self, packet_id, data, seq):
- """ReqLoginInfo packet.
- ID: 61010100
- JP: ログイン情報送信
- TR: Send login information
- """
- data = pati.LoginInfo.unpack(data)
- self.server.debug("LoginInfo: {!r}".format(data))
- self.sendAnsLoginInfo(seq)
- def sendAnsLoginInfo(self, seq):
- """AnsLoginInfo packet.
- ID: 61010200
- JP: ログイン情報返信
- TR: Login information response
- TODO: Implement this properly.
- """
- need_ticket = 1 # The game will call sendReqTicket if set to 1
- data = struct.pack(">B", need_ticket)
- data += pati.lp2_string("dummy_data")
- info = pati.ChargeInfo()
- info.online_support_code = pati.String(
- self.session.get_support_code())
- data += info.pack()
- self.send_packet(PatID4.AnsLoginInfo, data, seq)
- def recvReqTicket(self, packet_id, data, seq):
- """ReqTicket packet.
- ID: 60300100
- JP: PATチケット要求
- TR: PAT ticket request
- It seems both the client and server can:
- - Send a PAT ticket
- - Receive a PAT ticket
- TODO: Investigate how PAT tickets should be used.
- """
- if packet_id == PatID4.ReqTicket:
- self.sendAnsTicket(seq)
- elif packet_id == PatID4.ReqTicket2:
- self.server.error("Unimplemented recvReqTicket")
- def sendAnsTicket(self, seq):
- """AnsTicket packet.
- ID: 60300200
- JP: PATチケット返答
- TR: PAT ticket response
- """
- pat_ticket = self.session.new_pat_ticket()
- data = struct.pack(">H", len(pat_ticket)) + pat_ticket
- self.send_packet(PatID4.AnsTicket, data, seq)
- def recvReqUserListHead(self, packet_id, data, seq):
- """ReqUserListHead packet.
- ID: 61100100
- JP: PAT ID候補数要求
- TR: PAT ID candidate count request
- The games' instruction leaflets seem to refer "PAT ID" as "Capcom ID".
- """
- first_index, count = struct.unpack_from(">II", data)
- header = pati.unpack_bytes(data, 8)
- self.sendAnsUserListHead(first_index, count, header, seq)
- def sendAnsUserListHead(self, first_index, count, header, seq):
- """AnsUserListHead packet.
- ID: 61100200
- JP: PAT ID候補数応答
- TR: PAT ID candidate count response
- """
- data = struct.pack(">II", first_index, count)
- self.send_packet(PatID4.AnsUserListHead, data, seq)
- def recvReqUserListData(self, packet_id, data, seq):
- """ReqUserListData packet.
- ID: 61110100
- JP: PAT ID候補要求
- TR: PAT ID candidate request
- """
- first_index, count = struct.unpack(">II", data)
- self.sendAnsUserListData(first_index, count, seq)
- def sendAnsUserListData(self, first_index, count, seq):
- """AnsUserListData packet.
- ID: 61110200
- JP: PAT ID候補送信
- TR: Send PAT ID candidate
- TODO: Properly create/save/load Capcom ID profiles.
- """
- data = struct.pack(">II", first_index, count)
- for i, obj in self.session.get_users(first_index, count):
- capcom_id, info = obj
- user = pati.UserObject()
- user.slot_index = pati.Long(i)
- user.capcom_id = pati.String(capcom_id)
- if info.get("name"):
- user.hunter_name = pati.String(info["name"])
- data += user.pack()
- self.send_packet(PatID4.AnsUserListData, data, seq)
- def recvReqUserListFoot(self, packet_id, data, seq):
- """ReqUserList packet.
- ID: 61120100
- JP: PAT ID候補送信終了確認
- TR: PAT ID candidate end of transmission request
- """
- self.sendAnsUserListFoot(seq)
- def sendAnsUserListFoot(self, seq):
- """AnsUserListFoot packet.
- ID: 61120200
- JP: PAT ID候補送信終了返答
- TR: PAT ID candidate end of transmission response
- """
- self.send_packet(PatID4.AnsUserListFoot, b"", seq)
- def recvReqServerTime(self, packet_id, data, seq):
- """ReqServerTime packet.
- ID: 60020100
- JP: サーバ時刻要求
- TR: Server time request
- """
- converted_country_code, = struct.unpack(">I", data)
- self.sendAnsServerTime(converted_country_code, seq)
- def sendAnsServerTime(self, converted_country_code, seq):
- """AnsServerTime packet.
- ID: 60020200
- JP: サーバ時刻返答
- TR: Server time response
- """
- # Counter that ticks up once per second, Epoch works
- server_time = time_utils.current_server_time()
- # Always Epoch, used for Wii Shop subscription ticket
- current_time = time_utils.current_server_time()
- data = struct.pack(">II", server_time, current_time)
- self.send_packet(PatID4.AnsServerTime, data, seq)
- def recvReqUserObject(self, packet_id, data, seq):
- """ReqUserObject packet.
- ID: 61200100
- JP: ユーザオブジェクト送信
- TR: Send user object
- TODO: Find the purpose of all UserObject fields.
- """
- is_slot_empty, slot_index = struct.unpack_from(">BI", data)
- user_obj = pati.UserObject.unpack(data, 5)
- self.server.debug("UserObject: {}, {}, {!r}".format(
- is_slot_empty, slot_index, user_obj
- ))
- hunter_name = None
- if hasattr(user_obj, "hunter_name"):
- hunter_name = pati.unpack_string(user_obj.hunter_name)
- self.session.use_user(slot_index, hunter_name)
- user_obj.capcom_id = pati.String(self.session.capcom_id)
- self.server.info("Client {} Capcom ID `{}`".format(
- self.client_address, self.session.capcom_id
- ))
- self.sendAnsUserObject(is_slot_empty, slot_index, user_obj, seq)
- def sendAnsUserObject(self, is_slot_empty, slot_index, user_obj, seq):
- """AnsUserObject packet.
- ID: 61200200
- JP: ユーザオブジェクト結果
- TR: User object result
- TODO: Properly store/update user objects.
- """
- unused = 0
- need_ticket = 1
- data = struct.pack(">B", need_ticket)
- data += pati.lp2_string(b"Unk_UserObj_str")
- data += struct.pack(">I", unused)
- data += user_obj.pack()
- self.send_packet(PatID4.AnsUserObject, data, seq)
- def recvReqFmpListVersion(self, packet_id, data, seq):
- """ReqFmpListVersion packet.
- ID: 61300100 / 63100100
- JP: FMPリストバージョン確認
- TR: FMP list version check
- TODO:
- - Find why there are 2 versions of FMP packets.
- - Find why most of the 2 versions are ignored.
- """
- if packet_id == PatID4.ReqFmpListVersion:
- self.sendAnsFmpListVersion(seq)
- elif packet_id == PatID4.ReqFmpListVersion2:
- self.sendAnsFmpListVersion2(seq)
- def sendAnsFmpListVersion(self, seq):
- """AnsFmpListVersion packet.
- ID: 61300200
- JP: FMPリストバージョン確認応答
- TR: FMP list version acknowledgment
- """
- data = struct.pack(">I", FMP_VERSION)
- self.send_packet(PatID4.AnsFmpListVersion, data, seq)
- def sendAnsFmpListVersion2(self, seq):
- """AnsFmpListVersion2 packet.
- ID: 63100200
- JP: FMPリストバージョン確認応答
- TR: FMP list version acknowledgment
- """
- data = struct.pack(">I", FMP_VERSION)
- self.send_packet(PatID4.AnsFmpListVersion2, data, seq)
- def recvReqFmpListHead(self, packet_id, data, seq):
- """ReqFmpListHead packet.
- ID: 61310100 / 63110100
- JP: FMPリスト数送信 / FMPリスト数要求
- TR: Send FMP list count / FMP list count request
- """
- # TODO: Might be worth investigating these parameters as
- # they might be useful when using multiple FMP servers.
- version, first_index, count = struct.unpack_from(
- ">III", data
- ) # noqa: F841
- # TODO: Unpack it using pati.Unpacker
- header = pati.unpack_bytes(data, 12) # noqa: F841
- if packet_id == PatID4.ReqFmpListHead:
- self.sendAnsFmpListHead(seq)
- elif packet_id == PatID4.ReqFmpListHead2:
- self.sendAnsFmpListHead2(seq)
- def sendAnsFmpListHead(self, seq):
- """AnsFmpListHead packet.
- ID: 61310200
- JP: FMPリスト数応答
- TR: FMP list count response
- """
- unused = 0
- count = len(self.session.get_servers())
- data = struct.pack(">II", unused, count)
- self.send_packet(PatID4.AnsFmpListHead, data, seq)
- def sendAnsFmpListHead2(self, seq):
- """AnsFmpListHead2 packet.
- ID: 63110200
- JP: FMPリスト数応答
- TR: FMP list count response
- TODO: Check if it's always ignored compared to the previous one.
- """
- unused = 0
- count = len(self.session.get_servers())
- data = struct.pack(">II", unused, count)
- self.send_packet(PatID4.AnsFmpListHead2, data, seq)
- def recvReqFmpListData(self, packet_id, data, seq):
- """ReqFmpListData packet.
- ID: 61320100 / 63120100
- JP: FMPリスト送信 / FMPリスト要求
- TR: Send FMP list / FMP list response
- """
- first_index, count = struct.unpack_from(">II", data)
- if packet_id == PatID4.ReqFmpListData:
- self.sendAnsFmpListData(first_index, count, seq)
- elif packet_id == PatID4.ReqFmpListData2:
- self.sendAnsFmpListData2(first_index, count, seq)
- def sendAnsFmpListData(self, first_index, count, seq):
- """AnsFmpListData packet.
- ID: 61320200
- JP: FMPリスト応答
- TR: FMP list response
- TODO:
- - Do more reverse engineering.
- - This packet seems to affect only the first entry.
- """
- unused = 0
- data = struct.pack(">II", unused, count)
- data += pati.get_fmp_servers(self.session, first_index, count)
- self.send_packet(PatID4.AnsFmpListData, data, seq)
- def sendAnsFmpListData2(self, first_index, count, seq):
- """AnsFmpListData2 packet.
- ID: 63120200
- JP: FMPリスト応答
- TR: FMP list response
- TODO:
- - Do more reverse engineering.
- - This packet seems to affect entries past the first one.
- """
- unused = 0
- data = struct.pack(">II", unused, count)
- data += pati.get_fmp_servers(self.session, first_index, count)
- self.send_packet(PatID4.AnsFmpListData2, data, seq)
- def recvReqFmpListFoot(self, packet_id, data, seq):
- """ReqFmpListFoot packet.
- ID: 61330100 / 63130100
- JP: FMPリスト送信終了 / FMPリスト終了送信
- TR: FMP list end of transmission / FMP list transmission end
- """
- if packet_id == PatID4.ReqFmpListFoot:
- self.sendAnsFmpListFoot(seq)
- elif packet_id == PatID4.ReqFmpListFoot2:
- self.sendAnsFmpListFoot2(seq)
- def sendAnsFmpListFoot(self, seq):
- """AnsFmpListFoot packet.
- ID: 61330200
- JP: FMPリスト送信終了
- TR: FMP list end of transmission
- """
- self.send_packet(PatID4.AnsFmpListFoot, b"", seq)
- def sendAnsFmpListFoot2(self, seq):
- """AnsFmpListFoot2 packet.
- ID: 63130200
- JP: FMPリスト終了返答
- TR: FMP list end of transmission response
- """
- self.send_packet(PatID4.AnsFmpListFoot2, b"", seq)
- def recvReqLayerEnd(self, packet_id, data, seq):
- """ReqLayerEnd packet.
- ID: 64020100
- JP: レイヤ終了要求
- TR: Layer end request
- """
- self.sendAnsLayerEnd(seq)
- def sendAnsLayerEnd(self, seq):
- """AnsLayerEnd packet.
- ID: 64020200
- JP: レイヤ終了応答
- TR: Layer end response
- """
- self.notify_layer_departure(True)
- self.send_packet(PatID4.AnsLayerEnd, b"", seq)
- @staticmethod
- def packNtcLayerUserNum(update_type, layer_data):
- # type: (LayerUserNumUpdate, pati.LayerData) -> bytes
- """NtcLayerUserNum packet.
- ID: NtcLayerUserNum
- JP: レイヤ人数通知
- TR: Layer number notification
-
- UPDATE TYPE:
- 1 - Update numbers in the current layer
- 2 - Update numbers in the current layer plus fire an event (unknown)
- 3 - Update numbers in an unknown struct in an array
- 4 - Update numbers to the current layer's child (child_id=layer_path)
- 5 - Update numbers in unk fields in the NetworkLayerPat struct
- """
- data = struct.pack(">B", update_type)
- data += pati.LayerUserNum.pack_from(layer_data)
- return data
- def recvReqFmpInfo(self, packet_id, data, seq):
- """ReqFmpInfo packet.
- ID: 61340100 / 63140100
- JP: FMPデータ要求 / FMPデータ要求
- TR: FMP data request
- TODO: Do not hardcode the data and find the meaning of all fields.
- """
- index, = struct.unpack_from(">I", data)
- fields = pati.unpack_bytes(data, 4)
- server = self.session.join_server(index)
- config = get_config("FMP")
- fmp_addr = get_ip(config["IP"])
- fmp_port = config["Port"]
- fmp_data = pati.FmpData()
- fmp_data.server_address = pati.String(server.addr or fmp_addr)
- fmp_data.server_port = pati.Word(server.port or fmp_port)
- fmp_data.assert_fields(fields)
- if packet_id == PatID4.ReqFmpInfo:
- self.sendAnsFmpInfo(fmp_data, fields, seq)
- elif packet_id == PatID4.ReqFmpInfo2:
- self.sendAnsFmpInfo2(fmp_data, fields, seq)
- # Preserve session in database, due to server selection
- self.session.request_reconnection = True
- def sendAnsFmpInfo(self, fmp_data, fields, seq):
- """AnsFmpInfo packet.
- ID: 61340200
- JP: FMPデータ返答
- TR: FMP data response
- """
- data = fmp_data.pack_fields(fields)
- self.send_packet(PatID4.AnsFmpInfo, data, seq)
- def sendAnsFmpInfo2(self, fmp_data, fields, seq):
- """AnsFmpInfo2 packet.
- ID: 63140200
- JP: FMPデータ返答
- TR: FMP data response
- """
- data = fmp_data.pack_fields(fields)
- self.send_packet(PatID4.AnsFmpInfo2, data, seq)
- def recvReqBinaryHead(self, packet_id, data, seq):
- """ReqBinaryHead packet.
- ID: 63020100
- JP: バイナリデータ開始要求
- TR: Binary data start request
- TODO: Find all binary types and their meaning.
- """
- binary_type, = struct.unpack(">B", data)
- self.sendAnsBinaryHead(binary_type, seq)
- def sendAnsBinaryHead(self, binary_type, seq):
- """AnsBinaryHead packet.
- ID: 63020200
- JP: バイナリデータ開始応答
- TR: Binary data start response
- Examples of types sent during the login process
- - English: 0x05, 0x01, 0x05, 0x02, 0x03, 0x04
- - Japanese: 0x0a, 0x01, 0x0a, 0x02, 0x03, 0x04
- - French: 0x14, 0x10, 0x14, 0x11, 0x12, 0x13
- - German: 0x23, 0x1f, 0x23, 0x20, 0x21, 0x22
- - Italian: 0x32, 0x2e, 0x32, 0x2f, 0x30, 0x31
- - Spanish: 0x41, 0x3d, 0x41, 0x3e, 0x3f, 0x40
- """
- binary = PAT_BINARIES[binary_type]
- version = binary["version"]
- if callable(version):
- version = version(self.server.binary_loader)
- content = get_pat_binary_from_version(binary_type, version)
- if callable(content):
- content = content(self.server.binary_loader)
- data = struct.pack(">II", version, len(content))
- self.send_packet(PatID4.AnsBinaryHead, data, seq)
- def recvReqBinaryData(self, packet_id, data, seq):
- """ReqBinaryData packet.
- ID: 63030100
- JP: バイナリデータ要求
- TR: Binary data request
- """
- binary_type, version, offset, size = struct.unpack(">BIII", data)
- content = get_pat_binary_from_version(binary_type, version)
- if callable(content):
- content = content(self.server.binary_loader)
- self.sendAnsBinaryData(version, offset, size, content, seq)
- def sendAnsBinaryData(self, version, offset, size, binary, seq):
- """AnsBinaryData packet.
- ID: 63030200
- JP: バイナリデータ応答
- TR: Binary data response
- """
- data = struct.pack(">III", version, offset, size)
- data += pati.lp2_string(binary[offset:offset+size])
- self.send_packet(PatID4.AnsBinaryData, data, seq)
- def recvReqBinaryFoot(self, packet_id, data, seq):
- """ReqBinaryFoot packet.
- ID: 63040100
- JP: バイナリデータ完了要求
- TR: Binary date end of transmission request
- """
- binary_type, = struct.unpack(">B", data)
- self.sendAnsBinaryFoot(binary_type, seq)
- def sendAnsBinaryFoot(self, binary_type, seq):
- """AnsBinaryData packet.
- ID: 63040200
- JP: バイナリデータ完了応答
- TR: Binary date end of transmission response
- """
- self.send_packet(PatID4.AnsBinaryFoot, b"", seq)
- def recvReqUserSearchHead(self, packet_id, data, seq):
- """ReqUserSearchHead packet.
- ID: 66330100
- JP: ユーザ検索数要求
- TR: User search count request
- Sent by the game when searching for players:
- - Online > Player Search > By Name
- - Online > Player Search > By Id
- """
- with pati.Unpacker(data) as unpacker:
- self.search_info = {
- "capcom_id": to_str(unpacker.lp2_string()),
- "hunter_name": unpacker.lp2_string(),
- "search": unpacker.detailed_optional_fields(),
- "offset": unpacker.struct(">I")[0],
- "limit": unpacker.struct(">I")[0],
- "fields": unpacker.bytes()
- }
- self.server.debug((
- "ReqUserSearchHead("
- "hunter_name={hunter_name!r}, capcom_id={capcom_id!r}, "
- "search={search!r}, offset={offset}, limit={limit}, "
- "fields={fields!r})"
- ).format(**self.search_info))
- self.sendAnsUserSearchHead(seq)
- def sendAnsUserSearchHead(self, seq):
- """AnsUserSearchHead packet.
- ID: 66330200
- JP: ユーザ検索数返答
- TR: User search count response
- """
- unk = 0
- self.search_data = self.session.find_users(
- self.search_info["capcom_id"], self.search_info["hunter_name"],
- self.search_info["offset"], self.search_info["limit"]
- )
- data = struct.pack(">II", unk, len(self.search_data))
- self.send_packet(PatID4.AnsUserSearchHead, data, seq)
- def recvReqUserSearchData(self, packet_id, data, seq):
- """ReqUserSearchData packet.
- ID: 66340100
- JP: ユーザ検索要求
- TR: User search request
- """
- offset, size = struct.unpack(">II", data)
- self.sendAnsUserSearchData(offset, size, seq)
- def sendAnsUserSearchData(self, offset, size, seq):
- """AnsUserSearchData packet.
- ID: 66340200
- JP: ユーザ検索返答
- TR: User search response
- """
- unk = 0
- users = self.search_data
- count = len(users)
- data = struct.pack(">II", unk, count)
- for user in users:
- user_info = pati.UserSearchInfo()
- user_info.capcom_id = pati.String(user.capcom_id)
- user_info.hunter_name = pati.String(user.hunter_name)
- user_info.stats = pati.Binary(user.hunter_info.pack())
- user_info.layer_host = pati.Binary(user.get_layer_host_data())
- user_info.assert_fields(self.search_info["fields"])
- data += user_info.pack()
- data += pati.pack_optional_fields(user.get_optional_fields())
- self.send_packet(PatID4.AnsUserSearchData, data, seq)
- def recvReqUserSearchFoot(self, packet_id, data, seq):
- """ReqUserSearchFoot packet.
- ID: 66350100
- JP: ユーザ検索終了要求
- TR: User search end of transmission request
- """
- self.sendAnsUserSearchFoot(seq)
- def sendAnsUserSearchFoot(self, seq):
- """AnsUserSearchFoot packet.
- ID: 66350200
- JP: ユーザ検索終了返答
- TR: User search end of transmission response
- """
- self.send_packet(PatID4.AnsUserSearchFoot, b"", seq)
- def recvReqUserSearchInfo(self, packet_id, data, seq):
- """ReqUserSearchInfo packet.
- ID: 66360100
- JP: ユーザ検索データ要求
- TR: User search data request
- """
- with pati.Unpacker(data) as unpacker:
- capcom_id = to_str(unpacker.lp2_string())
- search_info = unpacker.UserSearchInfo()
- self.server.debug("SearchInfo: {}, {!r}".format(capcom_id,
- search_info))
- self.sendAnsUserSearchInfo(capcom_id, search_info, seq)
- def sendAnsUserSearchInfo(self, capcom_id, search_info, seq):
- """AnsUserSearchInfo packet.
- ID: 66360200
- JP: ユーザ検索データ返答
- TR: User search data response
- """
- user = self.session.find_user_by_capcom_id(capcom_id)
- if not user:
- # TODO: Find a better way to notify offline status in friend list
- user_info = pati.UserSearchInfo()
- user_info.capcom_id = pati.String(capcom_id)
- user_info.hunter_name = pati.String("OFFLINE")
- data = user_info.pack() + pati.pack_optional_fields([])
- self.sendAnsAlert(PatID4.AnsUserSearchInfo, data, seq)
- return
- user_info = pati.UserSearchInfo()
- user_info.capcom_id = pati.String(user.capcom_id)
- user_info.hunter_name = pati.String(user.hunter_name)
- user_info.stats = pati.Binary(user.hunter_info.pack())
- user_info.layer_host = pati.Binary(user.get_layer_host_data())
- user_info.unk_byte_0x07 = pati.Byte(1)
- user_info.server_name = pati.String("\t".join([
- user.local_info["server_name"] or "",
- user.local_info["gate_name"] or "",
- user.local_info["city_name"] or ""
- ]))
- user_info.unk_byte_0x0b = pati.Byte(1)
- user_info.unk_string_0x0c = pati.String("StrC")
- user_info.city_capacity = pati.Long(4)
- user_info.city_size = pati.Long(3)
- # This fields are used to identify a user.
- # Specifically when a client is deserializing data from the packets
- # `NtcLayerBinary` and `NtcLayerBinary2`
- # TODO: Proper field value and name
- user_info.info_mine_0x0f = pati.Long(int(hash(user.capcom_id))
- & 0xffffffff)
- user_info.info_mine_0x10 = pati.Long(int(hash(user.capcom_id[::-1]))
- & 0xffffffff)
- data = user_info.pack()
- # TODO: Figure out the optional fields
- data += pati.pack_optional_fields([])
- self.send_packet(PatID4.AnsUserSearchInfo, data, seq)
- def recvReqLayerStart(self, packet_id, data, seq):
- """ReqLayerStart packet.
- ID: 64010100
- JP: レイヤ開始要求
- TR: Layer start request
- """
- unk1 = pati.unpack_bytes(data)
- unk2 = pati.unpack_bytes(data, len(unk1) + 1)
- self.sendAnsLayerStart(unk1, unk2, seq)
- def sendAnsLayerStart(self, unk1, unk2, seq):
- """AnsLayerStart packet.
- ID: 64010200
- JP: レイヤ開始応答
- TR: Layer start response
- """
- layer = self.session.layer_start()
- self.send_packet(PatID4.AnsLayerStart, layer.pack(), seq)
- def sendNtcCircleLeave(self, circle, circle_index, seq):
- """NtcCircleLeave packet.
- ID: 65041000
- JP: サークルアウト通知
- TR: Circle out notification
- """
- player_index = circle.players.index(self.session) + 1
- data = struct.pack(">I", circle_index)
- data += pati.lp2_string(self.session.capcom_id)
- data += struct.pack(">B", player_index)
- unk01 = 0x00 # Flag of some sort
- data += struct.pack(">B", unk01)
- self.server.circle_broadcast(circle, PatID4.NtcCircleLeave, data, seq,
- self.session)
- def recvReqCircleInfoNoticeSet(self, packet_id, data, seq):
- """ReqCircleInfoNoticeSet packet.
- ID: 65800100
- JP: サークル通知定義登録要求
- TR: Circle notification subscription request
- """
- unk1 = pati.unpack_bytes(data)
- unk2 = pati.unpack_bytes(data, len(unk1) + 1)
- self.sendAnsCircleInfoNoticeSet(unk1, unk2, seq)
- def sendAnsCircleInfoNoticeSet(self, unk1, unk2, seq):
- """AnsCircleInfoNoticeSet packet.
- ID: 65800200
- JP: サークル通知定義登録返答
- TR: Circle notification subscription response
- """
- self.send_packet(PatID4.AnsCircleInfoNoticeSet, b"", seq)
- def recvNtcCheatCheck(self, packet_id, data, seq):
- """NtcCheatCheck packet.
- ID: 60801000
- JP: チートチェックデータ送信
- TR: Send cheat check data
- TODO: Handle cheat check data.
- """
- pass
- def recvReqUserSearchSet(self, packet_id, data, seq):
- """ReqUserSearchSet packet.
- ID: 66300100
- JP: ユーザ検索設定要求
- TR: User search settings request
- """
- extra = pati.unpack_optional_fields(data)
- self.server.debug("UserSearchSet: {!r}".format(extra))
- self.sendAnsUserSearchSet(extra, seq)
- def sendAnsUserSearchSet(self, extra, seq):
- """AnsUserSearchSet packet.
- ID: 66300200
- JP: ユーザ検索設定返答
- TR: User search settings response
- """
- self.send_packet(PatID4.AnsUserSearchSet, b"", seq)
- def recvReqBinaryVersion(self, packet_id, data, seq):
- """ReqBinaryVersion packet.
- ID: 63010100
- JP: バイナリバージョン確認
- TR: Binary version check
- """
- binary_type, = struct.unpack(">B", data)
- self.sendAnsBinaryVersion(binary_type, seq)
- def sendAnsBinaryVersion(self, binary_type, seq):
- """AnsBinaryVersion packet.
- ID: 63010200
- JP: バイナリバージョン確認応答
- TR: Binary version acknowledgment
- """
- unused = 0
- binary = PAT_BINARIES[binary_type]
- version = binary["version"]
- if callable(version):
- version = version(self.server.binary_loader)
- data = struct.pack(">BI", unused, version)
- self.send_packet(PatID4.AnsBinaryVersion, data, seq)
- def recvReqLayerUserListHead(self, packet_id, data, seq):
- """ReqLayerUserListHead packet.
- ID: 64640100
- JP: レイヤユーザリスト数要求
- TR: Layer user list count request
- """
- with pati.Unpacker(data) as unpacker:
- unk, = unpacker.struct(">B")
- layer = unpacker.lp2_string()
- first_index, count = unpacker.struct(">II")
- fields = unpacker.bytes()
- self.server.debug("LayerUserListHead({}, {!r}, {}, {}, {!r}".format(
- unk, layer, first_index, count, fields))
- self.sendAnsLayerUserListHead(unk, layer, first_index, count, fields,
- seq)
- def sendAnsLayerUserListHead(self, unk, layer, first_index, count,
- fields,
- seq):
- """AnsLayerUserListHead packet.
- ID: 64640200
- JP: レイヤユーザリスト数返答
- TR: Layer user list count response
- """
- depth, server_id, unk_id, gate_id, city_id = struct.unpack_from(
- ">IIHHH", layer)
- self.search_data = self.session.find_users_by_layer(
- server_id, gate_id, city_id, first_index, count)
- data = struct.pack(">II", first_index, len(self.search_data))
- self.send_packet(PatID4.AnsLayerUserListHead, data, seq)
- def recvReqLayerUserListData(self, packet_id, data, seq):
- """ReqLayerUserListData packet.
- ID: 64650100
- JP: レイヤユーザリスト要求
- TR: Layer user list request
- """
- first_index, count = struct.unpack(">II", data)
- self.sendAnsLayerUserListData(first_index, count, seq)
- def sendAnsLayerUserListData(self, first_index, count, seq):
- """AnsLayerUserListData packet.
- ID: 64650200
- JP: レイヤユーザリスト返答
- TR: Layer user list response
- """
- unk = 1
- users = self.search_data
- data = struct.pack(">II", unk, len(users))
- for _, user in users:
- layer_user = pati.LayerUserInfo()
- layer_user.capcom_id = pati.String(user.capcom_id)
- layer_user.hunter_name = pati.String(user.hunter_name)
- layer_user.layer_host = pati.Binary(user.get_layer_host_data())
- data += layer_user.pack()
- data += pati.pack_optional_fields(user.get_optional_fields())
- self.send_packet(PatID4.AnsLayerUserListData, data, seq)
- def recvReqLayerUserListFoot(self, packet_id, data, seq):
- """ReqLayerUserListFoot packet.
- ID: 64660100
- JP: レイヤユーザリスト終了要求
- TR: Layer user list end of transmission request
- """
- self.sendAnsLayerUserListFoot(seq)
- def sendAnsLayerUserListFoot(self, seq):
- """AnsLayerUserListFoot packet.
- ID: 64660200
- JP: レイヤユーザリスト終了返答
- TR: Layer user list end of transmission response
- """
- data = b""
- self.send_packet(PatID4.AnsLayerUserListFoot, data, seq)
- def recvReqLayerUserSearchHead(self, packet_id, data, seq):
- """ReqLayerUserSearchHead packet.
- ID: 64670100
- JP: レイヤユーザ検索リスト数要求
- TR: Layer user search list count request
- Sent by the game when searching for players at the gate:
- - Online > Player Search > Gate Search
- """
- with pati.Unpacker(data) as unpacker:
- self.search_info = {
- "unk": unpacker.struct(">B")[0],
- "layer": unpacker.lp2_string(),
- "capcom_id": to_str(unpacker.lp2_string()),
- "hunter_name": unpacker.lp2_string(),
- "search": unpacker.detailed_optional_fields(),
- "offset": unpacker.struct(">I")[0],
- "limit": unpacker.struct(">I")[0],
- "fields": unpacker.bytes()
- }
- self.server.debug((
- "ReqLayerUserSearchHead("
- "unk={unk}, layer={layer!r}, "
- "capcom_id={capcom_id!r}, hunter_name={hunter_name!r}, "
- "search={search!r}, offset={offset}, limit={limit}, "
- "fields={fields!r})"
- ).format(**self.search_info))
- self.sendAnsLayerUserSearchHead(seq)
- def sendAnsLayerUserSearchHead(self, seq):
- """AnsLayerUserSearchHead packet.
- ID: 64670200
- JP: レイヤユーザ検索リスト数返答
- TR: Layer user search list count response
- """
- depth, server_id, unk_id, gate_id, city_id = struct.unpack_from(
- ">IIHHH", self.search_info["layer"]
- )
- self.search_data = self.session.find_users_by_layer(
- server_id, gate_id, city_id,
- self.search_info["offset"], self.search_info["limit"],
- recursive=True
- )
- unk = 0
- data = struct.pack(">II", unk, len(self.search_data))
- self.send_packet(PatID4.AnsLayerUserSearchHead, data, seq)
- def recvReqLayerUserSearchData(self, packet_id, data, seq):
- """ReqLayerUserSearchData packet.
- ID: 64680100
- JP: レイヤユーザ検索リスト要求
- TR: Layer user search list request
- """
- offset, size = struct.unpack(">II", data)
- self.sendAnsLayerUserSearchData(offset, size, seq)
- def sendAnsLayerUserSearchData(self, offset, size, seq):
- """AnsLayerUserSearchData packet.
- ID: 64680200
- JP: レイヤユーザ検索リスト返答
- TR: Layer user search list response
- """
- unk = 0
- layer_users = self.search_data
- count = len(layer_users)
- data = struct.pack(">II", unk, count)
- for _, user in layer_users:
- layer_user = pati.LayerUserInfo()
- layer_user.capcom_id = pati.String(user.capcom_id)
- layer_user.hunter_name = pati.String(user.hunter_name)
- layer_user.layer_host = pati.Binary(user.get_layer_host_data())
- layer_user.assert_fields(self.search_info["fields"])
- data += layer_user.pack()
- data += pati.pack_optional_fields(user.get_optional_fields())
- self.send_packet(PatID4.AnsLayerUserSearchData, data, seq)
- def recvReqLayerUserSearchFoot(self, packet_id, data, seq):
- """ReqLayerUserSearchFoot packet.
- ID: 64690100
- JP: レイヤユーザ検索リスト終了要求
- TR: Layer user search list end of transmission request
- """
- self.sendAnsLayerUserSearchFoot(seq)
- def sendAnsLayerUserSearchFoot(self, seq):
- """AnsLayerUserSearchFoot packet.
- ID: 64690200
- JP: レイヤユーザ検索リスト終了返答
- TR: Layer user search list end of transmission response
- """
- self.send_packet(PatID4.AnsLayerUserSearchFoot, b"", seq)
- def recvReqFriendList(self, packet_id, data, seq):
- """ReqFriendList packet.
- ID: 66540100
- JP: フレンドリスト要求
- TR: Friend list request
- """
- with pati.Unpacker(data) as unpacker:
- first_index, count = unpacker.struct(">II")
- fields = unpacker.bytes()
- self.sendAnsFriendList(first_index, count, fields, seq)
- def sendAnsFriendList(self, first_index, count, fields, seq):
- """AnsFriendList packet.
- ID: 66540200
- JP: フレンドリスト返答
- TR: Friend list response
- """
- unk = 0
- friends = self.session.get_friends(first_index, count)
- count = len(friends)
- data = struct.pack(">II", unk, count)
- for index, (capcom_id, hunter_name) in enumerate(friends, 1):
- friend = pati.FriendData()
- friend.index = pati.Long(index)
- friend.capcom_id = pati.String(capcom_id)
- friend.hunter_name = pati.String(hunter_name)
- friend.assert_fields(fields)
- data += friend.pack()
- self.send_packet(PatID4.AnsFriendList, data, seq)
- def recvReqBlackAdd(self, packet_id, data, seq):
- """ReqBlackAdd packet.
- ID: 66600100
- JP: ブラックデータ登録要求
- TR: Black data registration request
- TODO: Implement it properly
- """
- with pati.Unpacker(data) as unpacker:
- capcom_id = to_str(unpacker.lp2_string())
- black_data = unpacker.BlackListUserData()
- self.sendAnsBlackAdd(capcom_id, black_data, seq)
- def sendAnsBlackAdd(self, capcom_id, black_data, seq):
- """AnsBlackAdd packet.
- ID: 66600200
- JP: ブラックデータ登録返答
- TR: Black data registration response
- """
- self.send_packet(PatID4.AnsBlackAdd, b"", seq)
- def recvReqBlackDelete(self, packet_id, data, seq):
- """ReqBlackDelete packet.
- ID: 66610100
- JP: ブラックデータ削除要求
- TR: Black data deletion request
- """
- capcom_id = to_str(pati.unpack_lp2_string(data))
- self.sendAnsBlackDelete(capcom_id, seq)
- def sendAnsBlackDelete(self, capcom_id, seq):
- """AnsBlackDelete packet.
- ID: 66610200
- JP: ブラックデータ削除返答
- TR: Black data deletion response
- """
- self.send_packet(PatID4.AnsBlackDelete, b"", seq)
- def recvReqBlackList(self, packet_id, data, seq):
- """ReqBlackList packet.
- ID: 66620100
- JP: ブラックリスト要求
- TR: Blacklist request
- TODO: Implement it properly
- """
- unk1, unk2 = struct.unpack_from(">II", data) # 1st/last index?
- unk3 = data[8:] # PAT fields filter? (1,2,3)?
- self.sendAnsBlackList(unk1, unk2, unk3, seq)
- def sendAnsBlackList(self, unk1, unk2, unk3, seq):
- """AnsBlackList packet.
- ID: 66620200
- JP: ブラックリスト返答
- TR: Blacklist response
- """
- dummy = pati.BlackListUserData()
- dummy.index = pati.Long(0)
- dummy.capcom_id = pati.String("")
- dummy.hunter_name = pati.String("")
- blacklisted_users = [dummy]
- unk = 0
- count = len(blacklisted_users)
- data = struct.pack(">II", unk, count)
- data += b"".join([item.pack() for item in blacklisted_users])
- self.send_packet(PatID4.AnsBlackList, data, seq)
- def recvNtcLayerChat(self, packet_id, data, seq):
- """NtcLayerChat packet.
- ID: 64721000
- JP: レイヤチャット通知
- TR: Layer chat notification
- The game sends a chat message to relay.
- NB: The message won't be displayed if the Capcom ID is blank.
- """
- with pati.Unpacker(data) as unpacker:
- unk1, = unpacker.struct(">B")
- info = unpacker.MessageInfo()
- message = unpacker.lp2_string()
- self.server.debug("NtcLayerChat: {}, {!r}, {}".format(
- unk1, info, message))
- self.sendNtcLayerChat(unk1, info, message, seq)
- def sendNtcLayerChat(self, unk1, info, message, seq):
- """NtcLayerChat packet.
- ID: 64721000
- JP: レイヤチャット送信
- TR: Layer chat transmission
- NB: Request and response share the same packet ID
- The server sends a chat message.
- """
- data = struct.pack(">B", unk1)
- info.text_color = pati.Long(LAYER_CHAT_COLORS[self.session.layer])
- info.sender_id = pati.String(self.session.capcom_id)
- info.sender_name = pati.String(self.session.hunter_name)
- data += info.pack()
- data += pati.lp2_string(message)
- self.server.layer_broadcast(self.session, PatID4.NtcLayerChat, data,
- seq)
- def recvReqLayerTell(self, packet_id, data, seq):
- """ReqLayerTell packet.
- ID: 64730100
- JP: レイヤ相手指定チャット送信
- TR: Layer specified chat transmission
- """
- with pati.Unpacker(data) as unpacker:
- recipient_id = to_str(unpacker.lp2_string())
- info = unpacker.MessageInfo()
- message = unpacker.lp2_string()
- self.server.debug("ReqTell: {}, {!r}, {}".format(
- recipient_id, info, message))
- self.sendAnsLayerTell(recipient_id, info, message, seq)
- def sendAnsLayerTell(self, recipient_id, info, message, seq):
- """AnsLayerTell packet.
- ID: 64730200
- JP: レイヤ相手指定チャット返信
- TR: Layer partner specified chat reply
- """
- self.send_packet(PatID4.AnsLayerTell, b"", seq)
- self.sendNtcLayerTell(recipient_id, info, message, seq)
- def sendNtcLayerTell(self, recipient_id, info, message, seq):
- """NtcLayerTell packet.
- ID: 64731000
- JP: レイヤ相手指定チャット通知
- TR: Layer partner specified chat notification
- """
- data = pati.lp2_string(recipient_id)
- info.unk_long_0x02 = pati.Long(20)
- info.sender_id = pati.String(self.session.capcom_id)
- info.sender_name = pati.String(self.session.hunter_name)
- data += info.pack()
- data += pati.lp2_string(message)
- self.send_packet(PatID4.NtcLayerTell, data, seq)
- for _, player in self.session.get_layer_players():
- if player.capcom_id == recipient_id:
- handler = self.server.get_pat_handler(player)
- if handler:
- handler.try_send_packet(PatID4.NtcLayerTell, data, seq)
- def recvNtcCircleChat(self, packet_id, data, seq):
- """NtcCircleChat packet.
- ID: 65721000
- JP: サークルチャット通知
- TR: Circle chat notification
- """
- with pati.Unpacker(data) as unpacker:
- info = unpacker.MessageInfo()
- message = unpacker.lp2_string()
- self.server.debug("NtcCircleChat: {!r}, {}".format(
- info, message))
- self.sendNtcCircleChat(info, message, seq)
- def sendNtcCircleChat(self, info, message, seq):
- """NtcCircleChat packet.
- ID: 65721000
- JP: サークルチャット送信
- TR: Circle chat transmission
- """
- info.sender_id = pati.String(self.session.capcom_id)
- info.sender_name = pati.String(self.session.hunter_name)
- data = info.pack()
- data += pati.lp2_string(message)
- circle = self.session.get_circle()
- self.server.circle_broadcast(circle, PatID4.NtcCircleChat, data,
- seq, self.session)
- def recvReqTell(self, packet_id, data, seq):
- """ReqTell packet.
- ID: 66110100
- JP: 相手指定チャット送信
- TR: Send partner message
- """
- with pati.Unpacker(data) as unpacker:
- recipient_id = to_str(unpacker.lp2_string())
- info = unpacker.MessageInfo()
- message = unpacker.lp2_string()
- self.server.debug("ReqTell: {}, {!r}, {}".format(
- recipient_id, info, message))
- self.sendAnsTell(recipient_id, info, message, seq)
- def sendAnsTell(self, recipient_id, info, message, seq):
- """AnsTell packet.
- ID: 66110200
- JP: 相手指定チャット返信
- TR: Receive partner message
- """
- self.send_packet(PatID4.AnsTell, b"", seq)
- self.sendNtcTell(recipient_id, info, message, seq)
- def sendNtcTell(self, recipient_id, info, message, seq):
- """NtcTell packet.
- ID: 66111000
- JP: 相手指定チャット通知
- TR: Partner message notification
- """
- data = b""
- data += pati.lp2_string(recipient_id)
- info.unk_long_0x02 = pati.Long(20)
- info.sender_id = pati.String(self.session.capcom_id)
- info.sender_name = pati.String(self.session.hunter_name)
- data += info.pack()
- data += pati.lp2_string(message)
- self.try_send_packet_to(recipient_id, PatID4.NtcTell, data, seq)
- def recvReqFriendAdd(self, packet_id, data, seq):
- """ReqFriendAdd packet.
- ID: 66500100
- JP: フレンド登録要求
- TR: Friend registration request
- TODO: Merge this with ReqTell?
- """
- with pati.Unpacker(data) as unpacker:
- recipient_id = to_str(unpacker.lp2_string())
- info = unpacker.MessageInfo()
- message = unpacker.lp2_string()
- self.server.debug("ReqFriendAdd: {}, {!r}, {}".format(
- recipient_id, info, message))
- self.sendAnsFriendAdd(recipient_id, info, message, seq)
- def sendAnsFriendAdd(self, recipient_id, info, message, seq):
- """AnsFriendAdd packet.
- ID: 66500200
- JP: フレンド登録返答
- TR: Friend registration response
- """
- if not self.session.add_friend_request(recipient_id):
- self.sendAnsAlert(PatID4.AnsFriendAdd,
- "<LF=8><BODY><CENTER>Unknown Capcom ID.<END>",
- seq)
- return
- self.send_packet(PatID4.AnsFriendAdd, b"", seq)
- self.sendNtcFriendAccept(recipient_id, info, message, seq)
- def sendNtcFriendAdd(self, recipient_id, accepted, seq):
- """NtcFriendAdd packet.
- ID: 66501000
- JP: フレンド登録完了通知
- TR: Friend registration completed notification
- """
- if not accepted:
- return
- # Send notification to friend
- data = b""
- data += pati.lp2_string(recipient_id)
- friend = pati.FriendData()
- friend.index = pati.Long(1) # TODO: Reverse-engineer this field
- friend.capcom_id = pati.String(self.session.capcom_id)
- friend.hunter_name = pati.String(self.session.hunter_name)
- data += friend.pack()
- data += struct.pack(">B", accepted)
- self.try_send_packet_to(recipient_id, PatID4.NtcFriendAdd, data, seq)
- # Send notification to self
- data = pati.lp2_string(self.session.capcom_id)
- friend = pati.FriendData()
- friend.index = pati.Long(1) # TODO: Reverse-engineer this field
- friend.capcom_id = pati.String(recipient_id)
- friend.hunter_name = pati.String(
- self.session.get_user_name(recipient_id))
- data += friend.pack()
- data += struct.pack(">B", accepted)
- self.send_packet(PatID4.NtcFriendAdd, data, seq)
- def recvReqFriendAccept(self, packet_id, data, seq):
- """ReqFriendAccept packet.
- ID: 66510100
- JP: フレンド登録依頼返答要求
- TR: Friend registration accept request
- """
- with pati.Unpacker(data) as unpacker:
- recipient_id = to_str(unpacker.lp2_string())
- accepted, = unpacker.struct(">B")
- self.sendAnsFriendAccept(recipient_id, accepted, seq)
- self.sendNtcFriendAdd(recipient_id, accepted, seq)
- def sendAnsFriendAccept(self, recipient_id, accepted, seq):
- """AnsFriendAccept packet.
- ID: 66510200
- JP: フレンド登録依頼返答返答
- TR: Friend registration accept response
- """
- self.session.accept_friend(recipient_id, accepted)
- self.send_packet(PatID4.AnsFriendAccept, b"", seq)
- def sendNtcFriendAccept(self, recipient_id, info, message, seq):
- """NtcFriendAccept packet.
- ID: 66511000
- JP: フレンド登録依頼通知
- TR: Friend registration request notification
- TODO: Merge this with NtcTell?
- """
- data = b""
- data += pati.lp2_string(recipient_id)
- info.unk_long_0x02 = pati.Long(20)
- info.sender_id = pati.String(self.session.capcom_id)
- info.sender_name = pati.String(self.session.hunter_name)
- data += info.pack()
- data += pati.lp2_string(message)
- self.try_send_packet_to(recipient_id, PatID4.NtcFriendAccept, data,
- seq)
- def recvReqFriendDelete(self, packet_id, data, seq):
- """ReqFriendDelete packet.
- ID: 66530100
- JP: フレンドデータ削除要求
- TR: Friend data deletion request
- """
- capcom_id = to_str(pati.unpack_lp2_string(data))
- self.sendAnsFriendDelete(capcom_id, seq)
- def sendAnsFriendDelete(self, capcom_id, seq):
- """AnsFriendDelete packet.
- ID: 66530200
- JP: フレンドデータ削除返答
- TR: Friend data deletion response
- """
- self.session.delete_friend(capcom_id)
- self.send_packet(PatID4.AnsFriendDelete, b"", seq)
- def recvReqLayerChildListHead(self, packet_id, data, seq):
- """ReqLayerChildListHead packet.
- ID: 64240100
- JP: 子レイヤリスト数要求
- TR: Child layer list count request
- """
- unk1, unk2 = struct.unpack_from(">II", data) # 1st/last index?
- layer_info = data[8:8+0xf]
- self.sendAnsLayerChildListHead(unk1, unk2, layer_info, seq)
- def sendAnsLayerChildListHead(self, unk1, unk2, layer_info, seq):
- """AnsLayerChildListHead packet.
- ID: 64240200
- JP: 子レイヤリスト数返答
- TR: Child layer list count response
- """
- unk = 0
- count = len(self.session.get_layer_children())
- data = struct.pack(">II", unk, count)
- self.send_packet(PatID4.AnsLayerChildListHead, data, seq)
- def recvReqLayerChildListData(self, packet_id, data, seq):
- """ReqLayerChildListData packet.
- ID: 64250100
- JP: 子レイヤリスト要求
- TR: Child layer list request
- """
- first_index, count = struct.unpack_from(">II", data)
- self.sendAnsLayerChildListData(first_index, count, seq)
- def sendAnsLayerChildListData(self, first_index, count, seq):
- """AnsLayerChildListData packet.
- ID: 64250200
- JP: 子レイヤリスト返答
- TR: Child layer list response
- """
- unk = first_index
- data = struct.pack(">II", unk, count)
- data += pati.get_layer_children(self.session, first_index, count)
- self.send_packet(PatID4.AnsLayerChildListData, data, seq)
- def recvReqLayerChildListFoot(self, packet_id, data, seq):
- """ReqLayerChildListFoot packet.
- ID: 64260100
- JP: 子レイヤリスト終了要求
- TR: Child layer list end of transmission request
- """
- self.sendAnsLayerChildListFoot(seq)
- def sendAnsLayerChildListFoot(self, seq):
- """AnsLayerChildListFoot packet.
- ID: 64260200
- JP: 子レイヤリスト終了返答
- TR: Child layer list end of transmission response
- """
- self.send_packet(PatID4.AnsLayerChildListFoot, b"", seq)
- def recvReqLayerSiblingListHead(self, packet_id, data, seq):
- """ReqLayerSiblingListHead packet.
- ID: 64270100
- JP: 兄弟レイヤリスト数要求
- TR: Layer sibling list count request
- Sent by the game when leaving the gate via the entrance:
- - Relocate > Change Gates
- TODO: Merge this with ReqLayerChildListHead?
- """
- unk1, unk2 = struct.unpack_from(">II", data) # 1st/last index?
- layer_info = data[8:8+0xf]
- self.sendAnsLayerSiblingListHead(unk1, unk2, layer_info, seq)
- def sendAnsLayerSiblingListHead(self, unk1, unk2, layer_info, seq):
- """AnsLayerSiblingListHead packet.
- ID: 64270200
- JP: 兄弟レイヤリスト数返答
- TR: Layer sibling list count response
- TODO: Merge this with AnsLayerChildListHead?
- """
- unk = 0
- count = len(self.session.get_layer_sibling())
- data = struct.pack(">II", unk, count)
- self.send_packet(PatID4.AnsLayerSiblingListHead, data, seq)
- def recvReqLayerSiblingListData(self, packet_id, data, seq):
- """ReqLayerSiblingListData packet.
- ID: 64280100
- JP: 兄弟レイヤリスト要求
- TR: Layer sibling list request
- TODO: Merge this with ReqLayerChildListData?
- """
- first_index, count = struct.unpack_from(">II", data)
- self.sendAnsLayerSiblingListData(first_index, count, seq)
- def sendAnsLayerSiblingListData(self, first_index, count, seq):
- """AnsLayerSiblingListData packet.
- ID: 64280200
- JP: 兄弟レイヤリスト返答
- TR: Layer sibling list response
- TODO: Merge this with AnsLayerChildListData?
- """
- unk = first_index
- data = struct.pack(">II", unk, count)
- data += pati.get_layer_sibling(self.session, first_index, count)
- self.send_packet(PatID4.AnsLayerSiblingListData, data, seq)
- def recvReqLayerSiblingListFoot(self, packet_id, data, seq):
- """ReqLayerSiblingListFoot packet.
- ID: 64290100
- JP: 兄弟レイヤリスト終了要求
- TR: Layer sibling list end of transmission request
- """
- self.sendAnsLayerSiblingListFoot(seq)
- def sendAnsLayerSiblingListFoot(self, seq):
- """AnsLayerSiblingListFoot packet.
- ID: 64290200
- JP: 兄弟レイヤリスト終了返答
- TR: Layer sibling list end of transmission response
- """
- self.send_packet(PatID4.AnsLayerSiblingListFoot, b"", seq)
- def recvReqLayerChildInfo(self, packet_id, data, seq):
- """ReqLayerChildInfo packet.
- ID: 64230100
- JP: 子レイヤ情報要求
- TR: Child layer information request
- """
- unk1, = struct.unpack_from(">H", data)
- layer_data = data[2:] # layer_data with some unknown ones appended
- self.sendAnsLayerChildInfo(unk1, layer_data, seq)
- def sendAnsLayerChildInfo(self, unk1, layer_data, seq):
- """AnsLayerChildInfo packet.
- ID: 64230200
- JP: 子レイヤ情報返答
- TR: Child layer information response
- """
- data = struct.pack(">H", 1)
- data += pati.getDummyLayerData().pack()
- data += b"\0" * 2
- self.send_packet(PatID4.AnsLayerChildInfo, data, seq)
- def recvReqLayerDown(self, packet_id, data, seq):
- """ReqLayerDown packet.
- ID: 64140100
- JP: レイヤダウン要求(番号指定)
- TR: Layer down request (number specified)
- """
- layer_id, = struct.unpack_from(">H", data) # WordInc
- layer_set = data[2:] # TODO parse LayerSet
- self.sendAnsLayerDown(layer_id, layer_set, seq)
- def sendAnsLayerDown(self, layer_id, layer_set, seq):
- """AnsLayerDown packet.
- ID: 64140200
- JP: レイヤダウン返答
- TR: Layer down response
- """
- data = struct.pack(">H", layer_id)
- self.session.layer_down(layer_id)
- self.send_packet(PatID4.AnsLayerDown, data, seq)
- def recvReqUserStatusSet(self, packet_id, data, seq):
- """ReqUserStatusSet packet.
- ID: 66400100
- JP: ユーザステータス設定要求
- TR: User status settings request
- """
- status_set = pati.UserStatusSet.unpack(data)
- self.server.debug("UserStatusSet: {!r}".format(status_set))
- self.sendAnsUserStatusSet(status_set, seq)
- def sendAnsUserStatusSet(self, status_set, seq):
- """AnsUserStatusSet packet.
- ID: 66400200
- JP: ユーザステータス設定返答
- TR: User status settings response
- """
- self.send_packet(PatID4.AnsUserStatusSet, b"", seq)
- def recvReqUserBinaryNotice(self, packet_id, data, seq):
- """ReqUserBinaryNotice packet.
- ID: 66320100
- JP: ユーザ表示用バイナリ通知要求
- TR: User display binary notification request
- """
- with pati.Unpacker(data) as unpacker:
- unk1, = unpacker.struct(">B")
- capcom_id = to_str(unpacker.lp2_string())
- offset, length = unpacker.struct(">II")
- self.sendAnsUserBinaryNotice(unk1, capcom_id, offset, length, seq)
- def sendAnsUserBinaryNotice(self, unk1, capcom_id, offset, length, seq):
- """AnsUserBinaryNotice packet.
- ID: 66320200
- JP: ユーザ表示用バイナリ通知返答
- TR: User display binary notification response
- """
- self.sendNtcUserBinaryNotice(unk1, capcom_id, offset, length, seq)
- self.send_packet(PatID4.AnsUserBinaryNotice, b"", seq)
- def sendNtcUserBinaryNotice(self, unk1, capcom_id, offset, length, seq):
- """NtcUserBinaryNotice packet.
- ID: 66321000
- JP: ユーザ表示用バイナリ通知通知
- TR: User display binary notification notice
- """
- data = struct.pack(">B", unk1)
- data += pati.lp2_string(self.session.capcom_id)
- data += struct.pack(">I", offset)
- data += pati.lp2_string(self.session.hunter_info.pack()[offset:
- offset+length])
- self.server.layer_broadcast(self.session, PatID4.NtcUserBinaryNotice,
- data, seq)
- def recvReqLayerDetailSearchHead(self, packet_id, data, seq):
- """ReqLayerDetailSearchHead packet.
- ID: 64900100
- JP: レイヤ検索詳細数要求
- TR: Layer detail search count request
- """
- with pati.Unpacker(data) as unpacker:
- self.search_info = {
- "layer": unpacker.struct(">B")[0],
- "search": unpacker.detailed_optional_fields(),
- "offset": unpacker.struct(">I")[0],
- "limit": unpacker.struct(">I")[0],
- "layer_fields": unpacker.bytes(),
- "user_fields": unpacker.bytes()
- }
- self.server.debug((
- "ReqLayerDetailSearchHead("
- "layer={layer!r}, search={search!r}, "
- "offset={offset}, limit={limit}, "
- "layer_fields={layer_fields!r}, user_fields={user_fields!r})"
- ).format(**self.search_info))
- self.sendAnsLayerDetailSearchHead(data, seq)
- def sendAnsLayerDetailSearchHead(self, data, seq):
- """AnsLayerDetailSearchHead packet.
- ID: 64900200
- JP: レイヤ検索詳細数返答
- TR: Layer detail search count response
- TODO: Find what to send.
- """
- unk1 = 0
- unk2 = 0
- self.search_data = self.session.layer_detail_search(
- self.search_info["search"]["data"]
- )
- count = len(self.search_data)
- data = struct.pack(">III", unk1, count, unk2)
- self.send_packet(PatID4.AnsLayerDetailSearchHead, data, seq)
- def recvReqLayerDetailSearchData(self, packet_id, data, seq):
- """ReqLayerDetailSearchData packet.
- ID: 64910100
- JP: レイヤ検索詳細データ要求
- TR: Layer detail search data request
- TODO: Handle this request.
- """
- offset, count = struct.unpack_from(">II", data)
- self.sendAnsLayerDetailSearchData(offset, count, seq)
- def sendAnsLayerDetailSearchData(self, offset, count, seq):
- """AnsLayerDetailSearchData packet.
- ID: 64910200
- JP: レイヤ検索詳細データ返答
- TR: Layer detail search data response
- TODO: Handle this response.
- """
- unk = 0
- cities = self.search_data
- data = struct.pack(">II", unk, len(cities))
- for i, city in enumerate(cities):
- with city.lock():
- layer_data = pati.LayerData.create_from(i, city)
- layer_data.assert_fields(self.search_info["layer_fields"])
- data += layer_data.pack()
- data += pati.pack_optional_fields(city.optional_fields)
- with city.players.lock():
- data += struct.pack(">I", len(city.players))
- for _, player in city.players:
- layer_user = pati.LayerUserInfo()
- layer_user.capcom_id = pati.String(player.capcom_id)
- layer_user.hunter_name = pati.String(
- player.hunter_name
- )
- layer_user.assert_fields(
- self.search_info["user_fields"]
- )
- data += layer_user.pack()
- data += pati.pack_optional_fields(
- player.get_optional_fields()
- )
- self.send_packet(PatID4.AnsLayerDetailSearchData, data, seq)
- def recvReqLayerDetailSearchFoot(self, packet_id, data, seq):
- """ReqLayerDetailSearchFoot packet.
- ID: 64920100
- JP: レイヤ検索詳細終了要求
- TR: Layer detail search end of transmission request
- """
- self.sendAnsLayerDetailSearchFoot(data, seq)
- def sendAnsLayerDetailSearchFoot(self, data, seq):
- """AnsLayerDetailSearchFoot packet.
- ID: 64920200
- JP: レイヤ検索詳細終了返答
- TR: Layer detail search end of transmission response
- """
- self.send_packet(PatID4.AnsLayerDetailSearchFoot, b"", seq)
- def recvReqLayerCreateHead(self, packet_id, data, seq):
- """ReqLayerCreateHead packet.
- ID: 64110100
- JP: レイヤ作成要求(番号指定)
- TR: Layer creation request (number specified)
- TODO: Lock it to prevent race-condition.
- """
- number, = struct.unpack(">H", data)
- self.sendAnsLayerCreateHead(number, seq)
- def sendAnsLayerCreateHead(self, number, seq):
- """AnsLayerCreateHead packet.
- ID: 64110200
- JP: レイヤ作成返答
- TR: Layer creation response
- """
- data = struct.pack(">H", number)
- if not self.session.is_city_empty(number):
- self.sendAnsAlert(PatID4.AnsLayerCreateHead,
- "<LF=8><BODY><CENTER>City already exists.<END>",
- seq)
- return
- if not self.session.reserve_city(number, True):
- self.sendAnsAlert(PatID4.AnsLayerCreateHead,
- "<LF=8><BODY><CENTER>City reserved.<END>",
- seq)
- return
- self.send_packet(PatID4.AnsLayerCreateHead, data, seq)
- path = self.session.get_layer_path()
- path.city_id = number
- self.notify_city_info_set(path)
-
- def recvReqLayerCreateSet(self, packet_id, data, seq):
- """ReqLayerCreateSet packet.
- ID: 64120100
- JP: レイヤ作成設定要求(番号指定)
- TR: Layer creation settings request (number specified)
- """
- with pati.Unpacker(data) as unpacker:
- number, = unpacker.struct(">H")
- layer_set = unpacker.LayerSet()
- extra = unpacker.optional_fields()
- self.server.debug("LayerCreateSet: {}, {!r}, {!r}".format(
- number, layer_set, extra))
- self.sendAnsLayerCreateSet(number, layer_set, extra, seq)
- def sendAnsLayerCreateSet(self, number, layer_set, extra, seq):
- """AnsLayerCreateSet packet.
- ID: 64120200
- JP: レイヤ作成設定返答
- TR: Layer creation settings response
- """
- data = struct.pack(">H", number)
- self.session.layer_create(number, layer_set, extra)
- self.send_packet(PatID4.AnsLayerCreateSet, data, seq)
- path = self.session.get_layer_path()
- self.notify_city_number_set(path)
-
- def recvReqLayerCreateFoot(self, packet_id, data, seq):
- """ReqLayerCreateFoot packet.
- ID: 64130100
- JP: レイヤ作成完了要求(番号指定)
- TR: Layer creation end of transmission request
- """
- number, unk = struct.unpack(">HB", data)
- self.session.reserve_city(number, False)
- self.sendAnsLayerCreateFoot(number, unk, seq)
- def sendAnsLayerCreateFoot(self, number, unk, seq):
- """AnsLayerCreateFoot packet.
- ID: 64130200
- JP: レイヤ作成完了返答
- TR: Layer creation end of transmission response
- """
- data = struct.pack(">H", number)
- self.send_packet(PatID4.AnsLayerCreateFoot, data, seq)
- path = self.session.get_layer_path()
- path.city_id = number
- self.notify_city_info_set(path)
- def recvReqLayerUp(self, packet_id, data, seq):
- """ReqLayerUp packet.
- ID: 64150100
- JP: レイヤアップ要求
- TR: Layer up request
- Sent by the game when leaving the gate via the entrance:
- - Relocate > Select Server
- """
- self.sendAnsLayerUp(data, seq)
- def sendAnsLayerUp(self, data, seq):
- """AnsLayerUp packet.
- ID: 64150200
- JP: レイヤアップ返答
- TR: Layer up response
- """
- self.notify_layer_departure(False)
- self.send_packet(PatID4.AnsLayerUp, b"", seq)
- @staticmethod
- def packNtcLayerInfoSet(layer_path, layer_data, optional_fields):
- # type: (pati.LayerPath, pati.LayerData, List[int]) -> bytes
- """NtcLayerInfoSet packet.
- ID: 64201000
- JP: レイヤ情報設定通知
- TR: Layer information setting notification
- """
- data = pati.lp2_string(layer_path.pack())
- data += layer_data.pack()
- data += pati.pack_optional_fields(optional_fields)
- return data
- def recvReqLayerMediationList(self, packet_id, data, seq):
- """ReqLayerMediationList packet.
- ID: 64820100
- JP: レイヤ調停データリスト取得要求
- TR: Get layer mediation list request
- """
- unk1, unk2 = struct.unpack_from(">BB", data)
- unk3 = data[2:]
- self.sendAnsLayerMediationList(unk1, unk2, unk3, seq)
- def sendAnsLayerMediationList(self, unk1, unk2, unk3, seq):
- """AnsLayerMediationList packet.
- ID: 64820200
- JP: レイヤ調停データリスト取得返答
- TR: Get layer mediation list response
- TODO: Store the item list and its state in the city.
- """
- unk = 0
- count = 14
- data = struct.pack(">BB", unk, count)
- for i in range(1, count+1):
- item = pati.MediationListItem()
- item.name = pati.String(b"Test")
- item.index = pati.Byte(i)
- item.is_locked = pati.Byte(0)
- data += item.pack()
- self.send_packet(PatID4.AnsLayerMediationList, data, seq)
- def recvReqLayerMediationLock(self, packet_id, data, seq):
- """ReqLayerMediationLock packet.
- ID: 64800100
- JP: レイヤ調停データ確保要求
- TR: Layer mediation data lock request
- Sent by the game when the player want to acquire a resource that
- cannot be shared by multiple players at the same time.
- - Seats are from index 1 to 12
- - Arm wrestling is from index 13 to 14
- """
- index, = struct.unpack_from(">B", data)
- mediation_data = pati.MediationListItem.unpack(data, 1)
- self.server.debug("LayerMediationLock: {}, {!r}".format(
- index, mediation_data))
- self.sendAnsLayerMediationLock(index, mediation_data, seq)
- def sendAnsLayerMediationLock(self, index, mediation_data, seq):
- """AnsLayerMediationLock packet.
- ID: 64800200
- JP: レイヤ調停データ確保返答
- TR: Layer mediation data lock response
- TODO: Enable notifications and handle player disconnection.
- """
- success = 1
- data = struct.pack(">B", success)
- # self.sendNtcLayerMediationLock(index, mediation_data, seq)
- self.send_packet(PatID4.AnsLayerMediationLock, data, seq)
- def sendNtcLayerMediationLock(self, index, mediation_data, seq):
- """NtcLayerMediationLock packet.
- ID: 64801000
- JP: レイヤ調停データ確保通知
- TR: Layer mediation data lock notification
- """
- data = pati.lp2_string(self.session.capcom_id)
- data += struct.pack(">B", index)
- data += mediation_data.pack()
- self.server.layer_broadcast(self.session,
- PatID4.NtcLayerMediationLock, data, seq)
- def recvReqLayerMediationUnlock(self, packet_id, data, seq):
- """ReqLayerMediationUnlock packet.
- ID: 64810100
- JP: レイヤ調停データ開放要求
- TR: Layer mediation data unlock request
- """
- index, = struct.unpack_from(">B", data)
- mediation_data = pati.MediationListItem.unpack(data, 1)
- self.server.debug("LayerMediationUnlock: {}, {!r}".format(
- index, mediation_data))
- self.sendAnsLayerMediationUnlock(index, mediation_data, seq)
- def sendAnsLayerMediationUnlock(self, index, mediation_data, seq):
- """AnsLayerMediationUnlock packet.
- ID: 64810200
- JP: レイヤ調停データ開放要求
- TR: Layer mediation data unlock response
- TODO: Enable notifications and handle player disconnection.
- """
- success = 1
- data = struct.pack(">B", success)
- # self.sendNtcLayerMediationUnlock(index, mediation_data, seq)
- self.send_packet(PatID4.AnsLayerMediationUnlock, data, seq)
- def sendNtcLayerMediationUnlock(self, index, mediation_data, seq):
- """NtcLayerMediationUnlock packet.
- ID: 64811000
- JP: レイヤ調停データ開放通知
- TR: Layer mediation data unlock notification
- """
- data = pati.lp2_string(self.session.capcom_id)
- data += struct.pack(">B", index)
- data += mediation_data.pack()
- self.server.layer_broadcast(self.session,
- PatID4.NtcLayerMediationUnlock, data, seq)
- def sendNtcLayerHost(self, new_leader, seq):
- """NtcLayerHost packet.
- ID: 64411000
- JP: レイヤのホスト通知
- TR: Layer host notification
- """
- data = struct.pack(">HII", 0x08, 0x00, 0x00) # unk short array patitem
- data += pati.lp2_string(new_leader.capcom_id)
- data += pati.lp2_string(new_leader.hunter_name)
- self.server.layer_broadcast(self.session, PatID4.NtcLayerHost, data,
- seq)
- def sendNtcCircleHostHandover(self, circle, new_leader, new_leader_index,
- seq):
- """NtcCircleHostHandover packet.
- ID: 65401000
- JP: サークルのホスト移譲通知
- TR: Circle host transfer notice
- This packet is presently unused as it does not appear to have
- any valid callback function branch within Monster Hunter 3.
- """
- unk1 = 0
- data = struct.pack(">IB", unk1, new_leader_index + 1)
- data += pati.lp2_string(new_leader.capcom_id)
- data += pati.lp2_string(new_leader.hunter_name)
- data += pati.lp2_string(b"") # Unk max-1 length short array
- self.server.circle_broadcast(circle, PatID4.NtcCircleHostHandover,
- data, seq, self.session)
- def sendNtcCircleHost(self, circle, new_leader, new_leader_index, seq):
- """NtcCircleHost packet.
- ID: 65411000
- JP: サークルのホスト通知
- TR: Circle host notification
- """
- unk1 = 0
- data = struct.pack(">IB", unk1, new_leader_index + 1)
- data += pati.lp2_string(new_leader.capcom_id)
- data += pati.lp2_string(new_leader.hunter_name)
- self.server.circle_broadcast(circle, PatID4.NtcCircleHost, data,
- seq, self.session)
- def notify_city_info_set(self, path):
- # type: (pati.LayerPath) -> None
- city = self.get_layer(path)
- assert isinstance(city, db.City)
- gate = city.parent
- layer_data = pati.LayerData.create_from(path.city_id, city, path)
- info_set = self.packNtcLayerInfoSet(path, layer_data,
- city.optional_fields)
- self.server.broadcast(gate.players, PatID4.NtcLayerInfoSet, info_set, 0,
- self.session)
-
- def notify_city_number_set(self, path):
- # type: (pati.LayerPath) -> None
- city = self.get_layer(path)
- assert isinstance(city, db.City)
- gate = city.parent
- layer_data = pati.LayerData.create_from(path.city_id, city, path)
- number_set = self.packNtcLayerUserNum(4, layer_data)
- self.server.broadcast(gate.players, PatID4.NtcLayerUserNum, number_set, 0,
- self.session)
-
- @staticmethod
- def get_layer(path):
- # type: (pati.LayerPath) -> Optional[db.Server | db.Gate | db.City]
- database = db.get_instance()
- if path.city_id > 0:
- return database.get_city(path.server_id, path.gate_id, path.city_id)
- elif path.gate_id > 0:
- return database.get_gate(path.server_id, path.gate_id)
- elif path.server_id > 0:
- return database.get_server(path.server_id)
- return None
- def notify_layer_departure(self, end):
- # type: (bool) -> None
- path = self.session.get_layer_path()
- if self.session.layer == 2:
- new_host = self.session.try_transfer_city_leadership()
- if new_host:
- self.sendNtcLayerHost(new_host, 0)
- if self.session.layer > 0:
- ntc_data = pati.lp2_string(self.session.capcom_id)
- self.server.layer_broadcast(self.session, PatID4.NtcLayerOut,
- ntc_data, 0)
- if self.session.local_info['circle_id'] is not None:
- self.notify_circle_leave(self.session.local_info['circle_id'] + 1,
- seq=0)
- if end:
- self.session.layer_end()
- else:
- self.session.layer_up()
- if path.city_id > 0:
- city = self.get_layer(path)
- assert isinstance(city, db.City)
- self.notify_city_number_set(path)
- if city.leader is None:
- self.notify_city_info_set(path)
- def notify_circle_leave(self, circle_index, seq):
- circle = self.session.get_circle()
- with circle.lock():
- self.sendNtcCircleLeave(circle, circle_index, seq)
- if circle.leader == self.session:
- if circle.departed:
- new_host_index, new_host = \
- self.session.try_transfer_circle_leadership()
- if new_host:
- self.sendNtcCircleHost(
- circle, new_host, new_host_index, seq
- )
- else:
- self.sendNtcCircleBreak(circle, seq)
- self.session.leave_circle()
- self.sendNtcCircleListLayerChange(circle, circle_index, seq)
- def on_exception(self, e):
- # type: (Exception) -> None
- self.server.error(traceback.format_exc())
- self.send_error("{}: {}".format(type(e).__name__, str(e)))
- def on_finish(self):
- self.notify_layer_departure(True)
- self.session.disconnect()
- self.session.delete()
- self.server.del_from_debug(self)
- self.server.info("Client finished!")
- def on_packet(self, data):
- if not data:
- self.finish()
- return
- packet_id, data, seq = data
- self.server.info(
- "RECV %s[ID=%08x; Seq=%04x]",
- PAT_NAMES.get(packet_id, "Packet"),
- packet_id, seq
- )
- self.server.debug("%s", hexdump(data))
- self.dispatch(packet_id, data, seq)
- def send_packet(self, packet_id=0, data=b'', seq=0):
- super(PatRequestHandler, self).send_packet(packet_id, data, seq)
- self.server.info(
- "SEND %s[ID=%08x; Seq=%04x]",
- PAT_NAMES.get(packet_id, "Packet"),
- packet_id, seq
- )
- self.server.debug("%s", hexdump(data))
- def dispatch(self, packet_id, data, seq):
- """Packet dispatcher."""
- if packet_id not in PAT_NAMES:
- self.server.error("Unknown packet ID: %08x", packet_id)
- return
- name = "recv{}".format(PAT_NAMES[packet_id])
- if not hasattr(self, name):
- self.server.error("Unsupported packet: %08x | %s", packet_id, name)
- return
- handler = getattr(self, name)
- return handler(packet_id, data, seq)
- def on_tick(self):
- if not self.requested_connection:
- # TODO: Investigate why do we need to wait a certain amount of
- # seconds before sending the `ReqConnection` packet?
- if self.ping_timer.elapsed() >= 1.5:
- self.requested_connection = True
- self.sendReqConnection()
- return
- # Send a ping with 30 seconds interval
- if self.ping_timer.elapsed() >= 30:
- if not self.server.debug_enabled() and not self.line_check:
- raise Exception("Client timed out.")
- self.line_check = False
- self.sendReqLineCheck()
- self.ping_timer.restart()
|