CryptMessagePlugin.py 9.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226
  1. import base64
  2. import os
  3. import gevent
  4. from Plugin import PluginManager
  5. from Crypt import CryptBitcoin, CryptHash
  6. from Config import config
  7. import sslcrypto
  8. from . import CryptMessage
  9. curve = sslcrypto.ecc.get_curve("secp256k1")
  10. @PluginManager.registerTo("UiWebsocket")
  11. class UiWebsocketPlugin(object):
  12. # - Actions -
  13. # Returns user's public key unique to site
  14. # Return: Public key
  15. def actionUserPublickey(self, to, index=0):
  16. self.response(to, self.user.getEncryptPublickey(self.site.address, index))
  17. # Encrypt a text using the publickey or user's sites unique publickey
  18. # Return: Encrypted text using base64 encoding
  19. def actionEciesEncrypt(self, to, text, publickey=0, return_aes_key=False):
  20. if type(publickey) is int: # Encrypt using user's publickey
  21. publickey = self.user.getEncryptPublickey(self.site.address, publickey)
  22. aes_key, encrypted = CryptMessage.eciesEncrypt(text.encode("utf8"), publickey)
  23. if return_aes_key:
  24. self.response(to, [base64.b64encode(encrypted).decode("utf8"), base64.b64encode(aes_key).decode("utf8")])
  25. else:
  26. self.response(to, base64.b64encode(encrypted).decode("utf8"))
  27. # Decrypt a text using privatekey or the user's site unique private key
  28. # Return: Decrypted text or list of decrypted texts
  29. def actionEciesDecrypt(self, to, param, privatekey=0):
  30. if type(privatekey) is int: # Decrypt using user's privatekey
  31. privatekey = self.user.getEncryptPrivatekey(self.site.address, privatekey)
  32. if type(param) == list:
  33. encrypted_texts = param
  34. else:
  35. encrypted_texts = [param]
  36. texts = CryptMessage.eciesDecryptMulti(encrypted_texts, privatekey)
  37. if type(param) == list:
  38. self.response(to, texts)
  39. else:
  40. self.response(to, texts[0])
  41. # Encrypt a text using AES
  42. # Return: Iv, AES key, Encrypted text
  43. def actionAesEncrypt(self, to, text, key=None):
  44. if key:
  45. key = base64.b64decode(key)
  46. else:
  47. key = sslcrypto.aes.new_key()
  48. if text:
  49. encrypted, iv = sslcrypto.aes.encrypt(text.encode("utf8"), key)
  50. else:
  51. encrypted, iv = b"", b""
  52. res = [base64.b64encode(item).decode("utf8") for item in [key, iv, encrypted]]
  53. self.response(to, res)
  54. # Decrypt a text using AES
  55. # Return: Decrypted text
  56. def actionAesDecrypt(self, to, *args):
  57. if len(args) == 3: # Single decrypt
  58. encrypted_texts = [(args[0], args[1])]
  59. keys = [args[2]]
  60. else: # Batch decrypt
  61. encrypted_texts, keys = args
  62. texts = [] # Decoded texts
  63. for iv, encrypted_text in encrypted_texts:
  64. encrypted_text = base64.b64decode(encrypted_text)
  65. iv = base64.b64decode(iv)
  66. text = None
  67. for key in keys:
  68. try:
  69. decrypted = sslcrypto.aes.decrypt(encrypted_text, iv, base64.b64decode(key))
  70. if decrypted and decrypted.decode("utf8"): # Valid text decoded
  71. text = decrypted.decode("utf8")
  72. except Exception as err:
  73. pass
  74. texts.append(text)
  75. if len(args) == 3:
  76. self.response(to, texts[0])
  77. else:
  78. self.response(to, texts)
  79. # Sign data using ECDSA
  80. # Return: Signature
  81. def actionEcdsaSign(self, to, data, privatekey=None):
  82. if privatekey is None: # Sign using user's privatekey
  83. privatekey = self.user.getAuthPrivatekey(self.site.address)
  84. self.response(to, CryptBitcoin.sign(data, privatekey))
  85. # Verify data using ECDSA (address is either a address or array of addresses)
  86. # Return: bool
  87. def actionEcdsaVerify(self, to, data, address, signature):
  88. self.response(to, CryptBitcoin.verify(data, address, signature))
  89. # Gets the publickey of a given privatekey
  90. def actionEccPrivToPub(self, to, privatekey):
  91. self.response(to, curve.private_to_public(curve.wif_to_private(privatekey.encode())))
  92. # Gets the address of a given publickey
  93. def actionEccPubToAddr(self, to, publickey):
  94. self.response(to, curve.public_to_address(bytes.fromhex(publickey)))
  95. @PluginManager.registerTo("User")
  96. class UserPlugin(object):
  97. def getEncryptPrivatekey(self, address, param_index=0):
  98. if param_index < 0 or param_index > 1000:
  99. raise Exception("Param_index out of range")
  100. site_data = self.getSiteData(address)
  101. if site_data.get("cert"): # Different privatekey for different cert provider
  102. index = param_index + self.getAddressAuthIndex(site_data["cert"])
  103. else:
  104. index = param_index
  105. if "encrypt_privatekey_%s" % index not in site_data:
  106. address_index = self.getAddressAuthIndex(address)
  107. crypt_index = address_index + 1000 + index
  108. site_data["encrypt_privatekey_%s" % index] = CryptBitcoin.hdPrivatekey(self.master_seed, crypt_index)
  109. self.log.debug("New encrypt privatekey generated for %s:%s" % (address, index))
  110. return site_data["encrypt_privatekey_%s" % index]
  111. def getEncryptPublickey(self, address, param_index=0):
  112. if param_index < 0 or param_index > 1000:
  113. raise Exception("Param_index out of range")
  114. site_data = self.getSiteData(address)
  115. if site_data.get("cert"): # Different privatekey for different cert provider
  116. index = param_index + self.getAddressAuthIndex(site_data["cert"])
  117. else:
  118. index = param_index
  119. if "encrypt_publickey_%s" % index not in site_data:
  120. privatekey = self.getEncryptPrivatekey(address, param_index).encode()
  121. publickey = curve.private_to_public(curve.wif_to_private(privatekey) + b"\x01")
  122. site_data["encrypt_publickey_%s" % index] = base64.b64encode(publickey).decode("utf8")
  123. return site_data["encrypt_publickey_%s" % index]
  124. @PluginManager.registerTo("Actions")
  125. class ActionsPlugin:
  126. publickey = "A3HatibU4S6eZfIQhVs2u7GLN5G9wXa9WwlkyYIfwYaj"
  127. privatekey = "5JBiKFYBm94EUdbxtnuLi6cvNcPzcKymCUHBDf2B6aq19vvG3rL"
  128. utf8_text = '\xc1rv\xedzt\xfbr\xf5t\xfck\xf6rf\xfar\xf3g\xe9p'
  129. def getBenchmarkTests(self, online=False):
  130. if hasattr(super(), "getBenchmarkTests"):
  131. tests = super().getBenchmarkTests(online)
  132. else:
  133. tests = []
  134. aes_key, encrypted = CryptMessage.eciesEncrypt(self.utf8_text.encode("utf8"), self.publickey) # Warm-up
  135. tests.extend([
  136. {"func": self.testCryptEciesEncrypt, "kwargs": {}, "num": 100, "time_standard": 1.2},
  137. {"func": self.testCryptEciesDecrypt, "kwargs": {}, "num": 500, "time_standard": 1.3},
  138. {"func": self.testCryptEciesDecryptMulti, "kwargs": {}, "num": 5, "time_standard": 0.68},
  139. {"func": self.testCryptAesEncrypt, "kwargs": {}, "num": 10000, "time_standard": 0.27},
  140. {"func": self.testCryptAesDecrypt, "kwargs": {}, "num": 10000, "time_standard": 0.25}
  141. ])
  142. return tests
  143. def testCryptEciesEncrypt(self, num_run=1):
  144. for i in range(num_run):
  145. aes_key, encrypted = CryptMessage.eciesEncrypt(self.utf8_text.encode("utf8"), self.publickey)
  146. assert len(aes_key) == 32
  147. yield "."
  148. def testCryptEciesDecrypt(self, num_run=1):
  149. aes_key, encrypted = CryptMessage.eciesEncrypt(self.utf8_text.encode("utf8"), self.publickey)
  150. for i in range(num_run):
  151. assert len(aes_key) == 32
  152. decrypted = CryptMessage.eciesDecrypt(base64.b64encode(encrypted), self.privatekey)
  153. assert decrypted == self.utf8_text.encode("utf8"), "%s != %s" % (decrypted, self.utf8_text.encode("utf8"))
  154. yield "."
  155. def testCryptEciesDecryptMulti(self, num_run=1):
  156. yield "x 100 (%s threads) " % config.threads_crypt
  157. aes_key, encrypted = CryptMessage.eciesEncrypt(self.utf8_text.encode("utf8"), self.publickey)
  158. threads = []
  159. for i in range(num_run):
  160. assert len(aes_key) == 32
  161. threads.append(gevent.spawn(
  162. CryptMessage.eciesDecryptMulti, [base64.b64encode(encrypted)] * 100, self.privatekey
  163. ))
  164. for thread in threads:
  165. res = thread.get()
  166. assert res[0] == self.utf8_text, "%s != %s" % (res[0], self.utf8_text)
  167. assert res[0] == res[-1], "%s != %s" % (res[0], res[-1])
  168. yield "."
  169. gevent.joinall(threads)
  170. def testCryptAesEncrypt(self, num_run=1):
  171. for i in range(num_run):
  172. key = os.urandom(32)
  173. encrypted = sslcrypto.aes.encrypt(self.utf8_text.encode("utf8"), key)
  174. yield "."
  175. def testCryptAesDecrypt(self, num_run=1):
  176. key = os.urandom(32)
  177. encrypted_text, iv = sslcrypto.aes.encrypt(self.utf8_text.encode("utf8"), key)
  178. for i in range(num_run):
  179. decrypted = sslcrypto.aes.decrypt(encrypted_text, iv, key).decode("utf8")
  180. assert decrypted == self.utf8_text
  181. yield "."