[Cryptech-Commits] [sw/libhal] 02/02: Add --soft-backup option to cryptech_backup.

git at cryptech.is git at cryptech.is
Sat Jun 3 15:15:55 UTC 2017


This is an automated email from the git hooks/post-receive script.

sra at hactrn.net pushed a commit to branch ks9
in repository sw/libhal.

commit 61029eb57165c181497c09549cc2dd0fa9928f16
Author: Rob Austein <sra at hactrn.net>
AuthorDate: Sat Jun 3 10:56:47 2017 -0400

    Add --soft-backup option to cryptech_backup.
    
    cryptech_backup is designed to help the user transfer keys from one
    Cryptech HSM to another, but what is is a user who has no second HSM
    supposed to do for backup?  The --soft-backup option enables a mode in
    which cryptech_backup generates its own KEKEK instead of getting one
    from the (nonexistent) target HSM.  We make a best-effort attempt to
    keep this soft KEKEK secure, by wrapping it with a symmetric key
    derived from a passphrase, using AESKeyWrapWithPadding and PBKDF2,
    but there's a limit to what a software-only solution can do here.
    
    The --soft-backup code depends (heavily) on PyCrypto.
---
 cryptech_backup | 187 ++++++++++++++++++++++++++++++++++++++++++++++++++++++--
 1 file changed, 183 insertions(+), 4 deletions(-)

diff --git a/cryptech_backup b/cryptech_backup
index f14f119..76cdfbb 100755
--- a/cryptech_backup
+++ b/cryptech_backup
@@ -17,6 +17,15 @@ sure only to export keys using a KEKEK known to have been generated by
 the target HSM.  See the unit tests in the source repository for
 an example of how to fake this in a few lines of Python.
 
+We also implement a software-based variant on this backup mechanism,
+for cases where there is no second HSM.  The protocol is much the
+same, but the KEKEK is generated in software and encrypted using a
+symmetric key derived from a passphrase using PBKDF2.  This requires
+the PyCrypto library, and is only as secure as memory on the machine
+where you're running it (so it's theoretically vulnerable to root or
+anybody with access to /dev/mem).  Don't use this mode unless you
+understand the risks, and see the "NOTE WELL" above.
+
 YOU HAVE BEEN WARNED.  Be careful out there.
 """
 
@@ -72,6 +81,11 @@ def main():
         "-u", "--uuid",
         help    = "UUID of existing KEKEK to use")
 
+    setup_mutex_group.add_argument(
+        "-s", "--soft-backup",
+        action = "store_true",
+        help   = "software-based backup, see warnings")
+
     setup_parser.add_argument(
         "-k", "--keylen",
         type    = int,
@@ -147,9 +161,11 @@ def cmd_setup(args, hsm):
     """
 
     result = {}
+    uuids  = []
 
-    uuids = []
-    if args.uuid:
+    if args.soft_backup:
+        SoftKEKEK.generate(args, result)
+    elif args.uuid:
         uuids.append(args.uuid)
     elif not args.new:
         uuids.extend(hsm.pkey_match(
@@ -177,7 +193,11 @@ def cmd_setup(args, hsm):
     if not result:
         sys.exit("Could not find suitable KEKEK")
 
-    result.update(comment = "KEKEK public key")
+    if args.soft_backup:
+        result.update(comment = "KEKEK software keypair")
+    else:
+        result.update(comment = "KEKEK public key")
+
     json.dump(result, args.output, indent = 4, sort_keys = True)
     args.output.write("\n")
 
@@ -249,7 +269,14 @@ def cmd_import(args, hsm):
     """
 
     db = json.load(args.input)
-    with hsm.pkey_open(uuid.UUID(db["kekek_uuid"]).bytes) as kekek:
+
+    soft_key = SoftKEKEK.is_soft_key(db)
+
+    with (hsm.pkey_load(SoftKEKEK.recover(db), HAL_KEY_FLAG_USAGE_KEYENCIPHERMENT)
+          if soft_key else
+          hsm.pkey_open(uuid.UUID(db["kekek_uuid"]).bytes)
+    ) as kekek:
+
         for k in db["keys"]:
             pkcs8 = b64join(k.get("pkcs8", ""))
             spki  = b64join(k.get("spki",  ""))
@@ -262,6 +289,158 @@ def cmd_import(args, hsm):
                 with hsm.pkey_load(der = spki, flags = flags) as pkey:
                     print "Loaded {} as {}".format(k["uuid"], pkey.uuid)
 
+        if soft_key:
+            kekek.delete()
+
+
+class AESKeyWrapWithPadding(object):
+    """
+    Implementation of AES Key Wrap With Padding from RFC 5649.
+    """
+
+    class UnwrapError(Exception):
+        "Something went wrong during unwrap."
+
+    def __init__(self, key):
+        from Crypto.Cipher import AES
+        self.ctx = AES.new(key, AES.MODE_ECB)
+
+    def _encrypt(self, b1, b2):
+        aes_block = self.ctx.encrypt(b1 + b2)
+        return aes_block[:8], aes_block[8:]
+
+    def _decrypt(self, b1, b2):
+        aes_block = self.ctx.decrypt(b1 + b2)
+        return aes_block[:8], aes_block[8:]
+
+    @staticmethod
+    def _start_stop(start, stop):               # Syntactic sugar
+        step = -1 if start > stop else 1
+        return xrange(start, stop + step, step)
+
+    @staticmethod
+    def _xor(R0, t):
+        from struct import pack, unpack
+        return pack(">Q", unpack(">Q", R0)[0] ^ t)
+
+    def wrap(self, Q):
+        "RFC 5649 section 4.1."
+        from struct import pack
+        m = len(Q)                              # Plaintext length
+        if m % 8 != 0:                          # Pad Q if needed
+            Q += "\x00" * (8 - (m % 8))
+        R = [pack(">LL", 0xa65959a6, m)]        # Magic MSB(32,A), build LSB(32,A)
+        R.extend(Q[i : i + 8]                   # Append Q
+                 for i in xrange(0, len(Q), 8))
+        n = len(R) - 1
+        if n == 1:
+            R[0], R[1] = self._encrypt(R[0], R[1])
+        else:
+            # RFC 3394 section 2.2.1
+            for j in self._start_stop(0, 5):
+                for i in self._start_stop(1, n):
+                    R[0], R[i] = self._encrypt(R[0], R[i])
+                    R[0] = self._xor(R[0], n * j + i)
+        assert len(R) == (n + 1) and all(len(r) == 8 for r in R)
+        return "".join(R)
+
+    def unwrap(self, C):
+        "RFC 5649 section 4.2."
+        from struct import unpack
+        if len(C) % 8 != 0:
+            raise self.UnwrapError("Ciphertext length {} is not an integral number of blocks"
+                                   .format(len(C)))
+        n = (len(C) / 8) - 1
+        R = [C[i : i + 8] for i in xrange(0, len(C), 8)]
+        if n == 1:
+            R[0], R[1] = self._decrypt(R[0], R[1])
+        else:
+            # RFC 3394 section 2.2.2 steps (1), (2), and part of (3)
+            for j in self._start_stop(5, 0):
+                for i in self._start_stop(n, 1):
+                    R[0] = self._xor(R[0], n * j + i)
+                    R[0], R[i] = self._decrypt(R[0], R[i])
+        magic, m = unpack(">LL", R[0])
+        if magic != 0xa65959a6:
+            raise self.UnwrapError("Magic value in AIV should have been 0xa65959a6, was 0x{:02x}"
+                              .format(magic))
+        if m <= 8 * (n - 1) or m > 8 * n:
+            raise self.UnwrapError("Length encoded in AIV out of range: m {}, n {}".format(m, n))
+        R = "".join(R[1:])
+        assert len(R) ==  8 * n
+        if any(r != "\x00" for r in R[m:]):
+            raise self.UnwrapError("Nonzero trailing bytes {}".format(R[m:].encode("hex")))
+        return R[:m]
+
+
+class SoftKEKEK(object):
+    """
+    Wrapper around all the goo we need to implement soft backups.
+    Requires PyCrypto on about every other line.
+    """
+
+    oid_aesKeyWrap = "\x60\x86\x48\x01\x65\x03\x04\x01\x30"
+
+    def parse_EncryptedPrivateKeyInfo(self, der):
+        from Crypto.Util.asn1 import DerObject, DerSequence, DerOctetString, DerObjectId
+        encryptedPrivateKeyInfo = DerSequence()
+        encryptedPrivateKeyInfo.decode(der)
+        encryptionAlgorithm = DerSequence()
+        algorithm = DerObjectId()
+        encryptedData = DerOctetString()
+        encryptionAlgorithm.decode(encryptedPrivateKeyInfo[0])
+        DerObject.decode(algorithm, encryptionAlgorithm[0])
+        DerObject.decode(encryptedData, encryptedPrivateKeyInfo[1])
+        if algorithm.payload != self.oid_aesKeyWrap:
+            raise ValueError
+        return encryptedData.payload
+
+    def encode_EncryptedPrivateKeyInfo(self, der):
+        from Crypto.Util.asn1 import DerSequence, DerOctetString
+        return DerSequence([
+            DerSequence([
+                chr(0x06) + chr(len(self.oid_aesKeyWrap)) + self.oid_aesKeyWrap
+            ]).encode(),
+            DerOctetString(der).encode()
+        ]).encode()
+
+    def gen_salt(self, bytes = 16):
+        from Crypto import Random
+        return Random.new().read(bytes)
+
+    def wrapper(self, salt, keylen = 256, iterations = 8000):
+        from Crypto.Protocol.KDF import PBKDF2
+        from Crypto.Hash         import SHA256, HMAC
+        return AESKeyWrapWithPadding(PBKDF2(
+            password = getpass.getpass("KEKEK Passphrase: "),
+            salt     = salt,
+            dkLen    = keylen/8,
+            count    = iterations,
+            prf      = lambda p, s: HMAC.new(p, s, SHA256).digest()))
+
+    @classmethod
+    def is_soft_key(cls, db):
+        return all(k in db for k in ("kekek_pkcs8", "kekek_salt"))
+
+    @classmethod
+    def generate(cls, args, result):
+        from Crypto.PublicKey import RSA
+        self = cls()
+        k = RSA.generate(args.keylen)
+        salt  = self.gen_salt()
+        spki  = k.publickey().exportKey(format = "DER")
+        pkcs8 = self.encode_EncryptedPrivateKeyInfo(self.wrapper(salt).wrap(
+            k.exportKey(format = "DER", pkcs = 8)))
+        result.update(kekek_salt   = b64(salt),
+                      kekek_pkcs8  = b64(pkcs8),
+                      kekek_pubkey = b64(spki))
+
+    @classmethod
+    def recover(cls, db):
+        self = cls()
+        return self.wrapper(b64join(db["kekek_salt"])).unwrap(
+            self.parse_EncryptedPrivateKeyInfo(b64join(db["kekek_pkcs8"])))
+
 
 if __name__ == "__main__":
     main()



More information about the Commits mailing list