proto.py 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222
  1. from math import ceil
  2. import base64
  3. import io
  4. import traceback
  5. def byte(n):
  6. return bytes((n,))
  7. def varint_encode(offset):
  8. '''In this encoding system, for each 8-bit byte, the first bit is 1 if there are more bytes, and 0 is this is the last one.
  9. The next 7 bits are data. These 7-bit sections represent the data in Little endian order. For example, suppose the data is
  10. aaaaaaabbbbbbbccccccc (each of these sections is 7 bits). It will be encoded as:
  11. 1ccccccc 1bbbbbbb 0aaaaaaa
  12. This encoding is used in youtube parameters to encode offsets and to encode the length for length-prefixed data.
  13. See https://developers.google.com/protocol-buffers/docs/encoding#varints for more info.'''
  14. needed_bytes = ceil(offset.bit_length()/7) or 1 # (0).bit_length() returns 0, but we need 1 in that case.
  15. encoded_bytes = bytearray(needed_bytes)
  16. for i in range(0, needed_bytes - 1):
  17. encoded_bytes[i] = (offset & 127) | 128 # 7 least significant bits
  18. offset = offset >> 7
  19. encoded_bytes[-1] = offset & 127 # leave first bit as zero for last byte
  20. return bytes(encoded_bytes)
  21. def varint_decode(encoded):
  22. decoded = 0
  23. for i, byte in enumerate(encoded):
  24. decoded |= (byte & 127) << 7*i
  25. if not (byte & 128):
  26. break
  27. return decoded
  28. def string(field_number, data):
  29. data = as_bytes(data)
  30. return _proto_field(2, field_number, varint_encode(len(data)) + data)
  31. nested = string
  32. def uint(field_number, value):
  33. return _proto_field(0, field_number, varint_encode(value))
  34. def _proto_field(wire_type, field_number, data):
  35. ''' See https://developers.google.com/protocol-buffers/docs/encoding#structure '''
  36. return varint_encode((field_number << 3) | wire_type) + data
  37. def percent_b64encode(data):
  38. return base64.urlsafe_b64encode(data).replace(b'=', b'%3D')
  39. def unpadded_b64encode(data):
  40. return base64.urlsafe_b64encode(data).replace(b'=', b'')
  41. def as_bytes(value):
  42. if isinstance(value, str):
  43. return value.encode('utf-8')
  44. return value
  45. def read_varint(data):
  46. result = 0
  47. i = 0
  48. while True:
  49. try:
  50. byte = data.read(1)[0]
  51. except IndexError:
  52. if i == 0:
  53. raise EOFError()
  54. raise Exception('Unterminated varint starting at ' + str(data.tell() - i))
  55. result |= (byte & 127) << 7*i
  56. if not byte & 128:
  57. break
  58. i += 1
  59. return result
  60. def read_group(data, end_sequence):
  61. start = data.tell()
  62. index = data.original.find(end_sequence, start)
  63. if index == -1:
  64. raise Exception('Unterminated group')
  65. data.seek(index + len(end_sequence))
  66. return data.original[start:index]
  67. def read_protobuf(data):
  68. data_original = data
  69. data = io.BytesIO(data)
  70. data.original = data_original
  71. while True:
  72. try:
  73. tag = read_varint(data)
  74. except EOFError:
  75. break
  76. wire_type = tag & 7
  77. field_number = tag >> 3
  78. if wire_type == 0:
  79. value = read_varint(data)
  80. elif wire_type == 1:
  81. value = data.read(8)
  82. elif wire_type == 2:
  83. length = read_varint(data)
  84. value = data.read(length)
  85. elif wire_type == 3:
  86. end_bytes = encode_varint((field_number << 3) | 4)
  87. value = read_group(data, end_bytes)
  88. elif wire_type == 5:
  89. value = data.read(4)
  90. else:
  91. raise Exception("Unknown wire type: " + str(wire_type) + ", Tag: " + bytes_to_hex(succinct_encode(tag)) + ", at position " + str(data.tell()))
  92. yield (wire_type, field_number, value)
  93. def parse(data, include_wire_type=False):
  94. '''Returns a dict mapping field numbers to values
  95. data is the protobuf structure, which must not be b64-encoded'''
  96. if include_wire_type:
  97. return {field_number: [wire_type, value]
  98. for wire_type, field_number, value in read_protobuf(data)}
  99. return {field_number: value
  100. for _, field_number, value in read_protobuf(data)}
  101. base64_enc_funcs = {
  102. 'base64': base64.urlsafe_b64encode,
  103. 'base64s': unpadded_b64encode,
  104. 'base64p': percent_b64encode,
  105. }
  106. def _make_protobuf(data):
  107. '''
  108. Input: Recursive list of protobuf objects or base-64 encodings
  109. Output: Protobuf bytestring
  110. Each protobuf object takes the form [wire_type, field_number, field_data]
  111. If a string protobuf has a list/tuple of length 2, this has the form
  112. (base64 type, data)
  113. The base64 types are
  114. - base64 means a base64 encode with equals sign paddings
  115. - base64s means a base64 encode without padding
  116. - base64p means a url base64 encode with equals signs replaced with %3D
  117. '''
  118. # must be dict mapping field_number to [wire_type, value]
  119. if isinstance(data, dict):
  120. new_data = []
  121. for field_num, (wire_type, value) in sorted(data.items()):
  122. new_data.append((wire_type, field_num, value))
  123. data = new_data
  124. if isinstance(data, str):
  125. return data.encode('utf-8')
  126. elif len(data) == 2 and data[0] in list(base64_enc_funcs.keys()):
  127. return base64_enc_funcs[data[0]](_make_protobuf(data[1]))
  128. elif isinstance(data, list):
  129. result = b''
  130. for field in data:
  131. if field[0] == 0:
  132. result += uint(field[1], field[2])
  133. elif field[0] == 2:
  134. result += string(field[1], _make_protobuf(field[2]))
  135. else:
  136. raise NotImplementedError('Wire type ' + str(field[0])
  137. + ' not implemented')
  138. return result
  139. return data
  140. def make_protobuf(data):
  141. return _make_protobuf(data).decode('ascii')
  142. def _set_protobuf_value(data, *path, value):
  143. if not path:
  144. return value
  145. op = path[0]
  146. if op in base64_enc_funcs:
  147. inner_data = b64_to_bytes(data)
  148. return base64_enc_funcs[op](
  149. _set_protobuf_value(inner_data, *path[1:], value=value)
  150. )
  151. pb_dict = parse(data, include_wire_type=True)
  152. pb_dict[op][1] = _set_protobuf_value(
  153. pb_dict[op][1], *path[1:], value=value
  154. )
  155. return _make_protobuf(pb_dict)
  156. def set_protobuf_value(data, *path, value):
  157. '''Set a field's value in a raw protobuf structure
  158. path is a list of field numbers and/or base64 encoding directives
  159. The directives are
  160. base64: normal base64 encoding with equal signs padding
  161. base64s ("stripped"): no padding
  162. base64p: %3D instead of = for padding
  163. return new_protobuf, err'''
  164. try:
  165. new_protobuf = _set_protobuf_value(data, *path, value=value)
  166. return new_protobuf.decode('ascii'), None
  167. except Exception:
  168. return None, traceback.format_exc()
  169. def b64_to_bytes(data):
  170. if isinstance(data, bytes):
  171. data = data.decode('ascii')
  172. data = data.replace("%3D", "=")
  173. return base64.urlsafe_b64decode(data + "="*((4 - len(data) % 4) % 4))