Thu Apr 6 23:38:13 UTC 2017

commit 3828bd1d72c5fd3d3e0bac4548aee9594f63b3ca
Author: Rob Austein <sra at hactrn.net>
AuthorDate: Thu Apr 6 19:31:12 2017 -0400

    Initial test and debug script for key backup.
    In the long run this code will fork for two different purposes:
    a) A user backup script, which need not handle ASN.1 or crypto, thus
       can (and should) be really simple; and
    b) Unit test code for the key export and import functions.
    So far, hal_rpc_pkey_export() appears to be working, at least under
    light testing (result of unpacking everything looks like a key,
    anyway, haven't tested the unpacked key yet).
    Still need to write test code for hal_rpc_pkey_import().
 key-backup.py | 232 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 232 insertions(+)

diff --git a/key-backup.py b/key-backup.py
new file mode 100644
index 0000000..41b8f10
--- /dev/null
+++ b/key-backup.py
@@ -0,0 +1,232 @@
+#!/usr/bin/env python
+# Test of key backup code, will evolve into unit tests and a user
+# backup script after initial debugging.
+#    KEY SOURCE                            KEY BACKUP
+#                                          Generate and export KEKEK:
+#                                               hal_rpc_pkey_generate_rsa()
+#                                               hal_rpc_pkey_get_public_key()
+#   Load KEKEK public   <----------------  Export KEKEK public
+#                       {
+#                               "kekek-uuid":   "[UUID]",
+#                               "kekek":        "[Base64]"
+#                       }
+#       hal_rpc_pkey_load()
+#       hal_rpc_pkey_export()
+#   Export PKCS #8 and KEK   ---------->   Load PKCS #8 and KEK, import key:
+#                       {
+#                               "kekek-uuid":   "[UUID]",
+#                               "pkey":         "[Base64]",
+#                               "kek":          "[Base64]"
+#                       }
+#                                               hal_rpc_pkey_import()
+from libhal import *
+from Crypto.PublicKey   import RSA
+from Crypto.Cipher      import AES, PKCS1_v1_5
+from Crypto.Util.asn1   import DerObject, DerSequence, DerNull, DerOctetString, DerObjectId
+from Crypto.Random      import new as csprng
+from struct             import pack, unpack
+def dumpasn1(der):
+    from subprocess import call
+    from tempfile import NamedTemporaryFile
+    with NamedTemporaryFile() as f:
+        f.write(der)
+        f.flush()
+        call(("dumpasn1", "-aop", f.name))
+def t(x):
+    return filter(x.isType, x.typeTags)[0]
+hal_asn1_oid_rsaEncryption = "\x2A\x86\x48\x86\xF7\x0D\x01\x01\x01"
+hal_asn1_oid_aesKeyWrap    = "\x60\x86\x48\x01\x65\x03\x04\x01\x30"
+kek_length = 256/8      # We can determine this from the keywrap OID, this is for AES-256
+def main():
+    kekek = RSA.importKey(kekek_pem)
+    from atexit import register as atexit
+    hsm = HSM()
+    #hsm.debug_io = args.io_log
+    hsm.login(HAL_USER_WHEEL, "fnord")
+    atexit(hsm.logout)
+    kekek_handle = hsm.pkey_load(type  = HAL_KEY_TYPE_RSA_PUBLIC,
+                                 curve = HAL_CURVE_NONE,
+                                 der   = kekek.publickey().exportKey(format = "DER"))
+    atexit(kekek_handle.delete)
+    print "KEKEK:", kekek_handle.uuid
+    pkey_handle = hsm.pkey_generate_ec(HAL_CURVE_P256, HAL_KEY_FLAG_EXPORTABLE)
+    atexit(pkey_handle.delete)
+    print "PKey: ", pkey_handle.uuid
+    pkcs8_der, kek_der = kekek_handle.export_pkey(pkey_handle)
+    pkcs8_alg, pkcs8_data = show_encrypted_pkcs8("PKCS #8:", pkcs8_der, hal_asn1_oid_aesKeyWrap)
+    kek_alg,   kek_data   = show_encrypted_pkcs8("KEK:    ", kek_der,   hal_asn1_oid_rsaEncryption)
+    # Voodoo to defend against Bleichenbacher Attack per Crypto.Cipher.PKCS1_v1_5 API documentation
+    kek = PKCS1_v1_5.new(kekek).decrypt(kek_data, csprng().read(kek_length))
+    pkey = AESKeyWrapWithPadding(kek).unwrap(pkcs8_data)
+    dumpasn1(pkey)
+def show_encrypted_pkcs8(label, der, oid):
+    print
+    print label
+    dumpasn1(der)
+    print
+    result = parse_encrypted_pkcs8(der)
+    for name, value in zip(("algorithm", "encryptedData"), result):
+        print "{:14s} {}".format(name, "-".join("{:02x}".format(ord(v)) for v in value))
+    print
+    print "OID {}".format("matches" if result[0] == oid else "doesn't match")
+    return result
+def parse_encrypted_pkcs8(der):
+    encryptedPrivateKeyInfo = DerSequence()
+    encryptedPrivateKeyInfo.decode(der)
+    encryptionAlgorithm = DerSequence()
+    encryptionAlgorithm.decode(encryptedPrivateKeyInfo[0])
+    algorithm = DerObjectId()
+    algorithm.decode(encryptionAlgorithm[0])
+    encryptedData = DerObject()
+    encryptedData.decode(encryptedPrivateKeyInfo[1])
+    return algorithm.payload, encryptedData.payload
+class AESKeyWrapWithPadding(object):
+    """
+    Implementation of AES Key Wrap With Padding from RFC 5649.
+    """
+    class UnwrapError(Exception):
+        "Something went wrong during unwrap."
+    def __init__(self, key):
+        self.ctx = AES.new(key, AES.MODE_ECB)
+    def _encrypt(self, b1, b2):
+        aes_block = self.ctx.encrypt(b1 + b2)
+        return aes_block[:8], aes_block[8:]
+    def _decrypt(self, b1, b2):
+        aes_block = self.ctx.decrypt(b1 + b2)
+        return aes_block[:8], aes_block[8:]
+    @staticmethod
+    def _start_stop(start, stop):               # Syntactic sugar
+        step = -1 if start > stop else 1
+        return xrange(start, stop + step, step)
+    def wrap(self, Q):
+        "RFC 5649 section 4.1."
+        m = len(Q)                              # Plaintext length
+        if m % 8 != 0:                          # Pad Q if needed
+            Q += "\x00" * (8 - (m % 8))
+        R = [pack(">LL", 0xa65959a6, m)]        # Magic MSB(32,A), build LSB(32,A)
+        R.extend(Q[i : i + 8]                   # Append Q
+                 for i in xrange(0, len(Q), 8))
+        n = len(R) - 1
+        if n == 1:
+            R[0], R[1] = self._encrypt(R[0], R[1])
+        else:
+            # RFC 3394 section 2.2.1
+            for j in self._start_stop(0, 5):
+                for i in self._start_stop(1, n):
+                    R[0], R[i] = self._encrypt(R[0], R[i])
+                    W0, W1 = unpack(">LL", R[0])
+                    W1 ^= n * j + i
+                    R[0] = pack(">LL", W0, W1)
+        assert len(R) == (n + 1) and all(len(r) == 8 for r in R)
+        return "".join(R)
+    def unwrap(self, C):
+        "RFC 5649 section 4.2."
+        if len(C) % 8 != 0:
+            raise self.UnwrapError("Ciphertext length {} is not an integral number of blocks"
+                                   .format(len(C)))
+        n = (len(C) / 8) - 1
+        R = [C[i : i + 8] for i in xrange(0, len(C), 8)]
+        if n == 1:
+            R[0], R[1] = self._decrypt(R[0], R[1])
+        else:
+            # RFC 3394 section 2.2.2 steps (1), (2), and part of (3)
+            for j in self._start_stop(5, 0):
+                for i in self._start_stop(n, 1):
+                    W0, W1 = unpack(">LL", R[0])
+                    W1 ^= n * j + i
+                    R[0] = pack(">LL", W0, W1)
+                    R[0], R[i] = self._decrypt(R[0], R[i])
+        magic, m = unpack(">LL", R[0])
+        if magic != 0xa65959a6:
+            raise self.UnwrapError("Magic value in AIV should have been 0xa65959a6, was 0x{:02x}"
+                              .format(magic))
+        if m <= 8 * (n - 1) or m > 8 * n:
+            raise self.UnwrapError("Length encoded in AIV out of range: m {}, n {}".format(m, n))
+        R = "".join(R[1:])
+        assert len(R) ==  8 * n
+        if any(r != "\x00" for r in R[m:]):
+            raise self.UnwrapError("Nonzero trailing bytes {}".format(R[m:].encode("hex")))
+        return R[:m]
+# Static KEKEK for testing, this should come from the backup HSM.
+kekek_pem = '''\
+-----END PRIVATE KEY-----
+if __name__ == "__main__":
+    main()

