[Cryptech-Commits] [sw/stm32] 01/01: Untested conversion to support Python 3

git at cryptech.is git at cryptech.is
Mon May 25 23:57:39 UTC 2020


This is an automated email from the git hooks/post-receive script.

sra at hactrn.net pushed a commit to branch python3
in repository sw/stm32.

commit f948d674351a2ca87d33c0e0d8558cbbb2f59682
Author: Rob Austein <sra at hactrn.net>
AuthorDate: Mon May 25 19:33:47 2020 -0400

    Untested conversion to support Python 3
---
 bin/dfu                        |  8 +++---
 projects/cli-test/filetransfer | 18 ++++++------
 projects/hsm/cryptech_probe    | 21 +++++++-------
 projects/hsm/cryptech_upload   | 65 ++++++++++++++++++++----------------------
 4 files changed, 55 insertions(+), 57 deletions(-)

diff --git a/bin/dfu b/bin/dfu
index e270438..d37d3ba 100755
--- a/bin/dfu
+++ b/bin/dfu
@@ -43,7 +43,7 @@ import struct
 import serial
 import argparse
 
-from binascii import crc32
+from binascii import crc32, hexlify
 
 CHUNK_SIZE = 4096
 
@@ -80,20 +80,20 @@ def _write(dst, data):
         dst.write(data[i])
         time.sleep(0.1)
     if len(data) == 4:
-        print("Wrote 0x{:02x}{:02x}{:02x}{:02x}".format(ord(data[0]), ord(data[1]), ord(data[2]), ord(data[3])))
+        print("Wrote 0x{}".format(hexlify(data)))
     else:
         print("Wrote {!r}".format(data))
 
 
 def _read(dst, verbose=False):
-    res = ''
+    res = b''
     while True:
         x = dst.read(1)
         if not x:
             break
         res += x
     if res and verbose:
-        print ("Read {!r}".format(res))
+        print("Read {!r}".format(res))
     return res
 
 
diff --git a/projects/cli-test/filetransfer b/projects/cli-test/filetransfer
index f704e31..147f081 100755
--- a/projects/cli-test/filetransfer
+++ b/projects/cli-test/filetransfer
@@ -37,7 +37,7 @@ import struct
 import serial
 import argparse
 
-from binascii import crc32
+from binascii import crc32, hexlify
 
 CHUNK_SIZE = 256
 DFU_CHUNK_SIZE = 256
@@ -79,19 +79,19 @@ def parse_args():
 def _write(dst, data):
     dst.write(data)
     if len(data) == 4:
-        print("Wrote 0x{!s}".format(data.encode('hex')))
+        print(("Wrote 0x{!s}".format(hexlify(data))))
     else:
-        print("Wrote {!r}".format(data))
+        print(("Wrote {!r}".format(data)))
 
 
 def _read(dst):
-    res = ''
+    res = b''
     while True:
         x = dst.read(1)
         if not x:
             break
         res += x
-    print ("Read {!r}".format(res))
+    print(("Read {!r}".format(res)))
     return res
 
 
@@ -142,19 +142,19 @@ def send_file(filename, args, dst):
         if not data:
             break
         dst.write(data)
-        print("Wrote {!s} bytes (chunk {!s}/{!s})".format(len(data), counter, int(size / chunk_size)))
+        print(("Wrote {!s} bytes (chunk {!s}/{!s})".format(len(data), counter, int(size / chunk_size))))
         # read ACK (a counter of number of 4k chunks received)
         while True:
             ack_bytes = dst.read(4)
             if len(ack_bytes) == 4:
                 break
-            print('ERROR: Did not receive an ACK, got {!r}'.format(ack_bytes))
+            print(('ERROR: Did not receive an ACK, got {!r}'.format(ack_bytes)))
             dst.write('\r')  # eventually get back to the CLI prompt
         ack = struct.unpack('<I', ack_bytes)[0]
         if ack != counter + 1:
-            print('ERROR: Did not receive the expected counter as ACK (got {!r}/{!r}, not {!r})'.format(ack, ack_bytes, counter))
+            print(('ERROR: Did not receive the expected counter as ACK (got {!r}/{!r}, not {!r})'.format(ack, ack_bytes, counter)))
             flush = dst.read(100)
-            print('FLUSH data: {!r}'.format(flush))
+            print(('FLUSH data: {!r}'.format(flush)))
             return False
         counter += 1
 
diff --git a/projects/hsm/cryptech_probe b/projects/hsm/cryptech_probe
index ccee40a..3d14484 100755
--- a/projects/hsm/cryptech_probe
+++ b/projects/hsm/cryptech_probe
@@ -38,6 +38,7 @@ goes to stderr.
 import sys
 import time
 import argparse
+import binascii
 import serial.tools.list_ports_posix
 
 class positive_integer(int):
@@ -52,16 +53,16 @@ parser.add_argument("--no-cleanup",       action = "store_true",   help = "don't
 parser.add_argument("--read-buffer-size", type = positive_integer, help = "size of read buffer", default = 1024)
 args = parser.parse_args()
 
-SLIP_END     = chr(0300)        # Indicates end of SLIP packet
-SLIP_ESC     = chr(0333)        # Indicates byte stuffing
-SLIP_ESC_END = chr(0334)        # ESC ESC_END means END data byte
-SLIP_ESC_ESC = chr(0335)        # ESC ESC_ESC means ESC data byte
+SLIP_END     = b"\300"          # Indicates end of SLIP packet
+SLIP_ESC     = b"\333"          # Indicates byte stuffing
+SLIP_ESC_END = b"\334"          # ESC ESC_END means END data byte
+SLIP_ESC_ESC = b"\335"          # ESC ESC_ESC means ESC data byte
 
-Control_U    = chr(0025)        # Console: clear line
-Control_M    = chr(0015)        # Console: end of line
+Control_U    = b"\025"          # Console: clear line
+Control_M    = b"\015"          # Console: end of line
 
-RPC_query    = chr(0) * 8       # client_handle = 0, function code = RPC_FUNC_GET_VERSION
-RPC_reply    = chr(0) * 12      # opcode = RPC_FUNC_GET_VERSION, client_handle = 0, valret = HAL_OK
+RPC_query    = b"\0" * 8        # client_handle = 0, function code = RPC_FUNC_GET_VERSION
+RPC_reply    = b"\0" * 12       # opcode = RPC_FUNC_GET_VERSION, client_handle = 0, valret = HAL_OK
 
 # This is the query string we send to each USB port we find.  It's
 # intended to be relatively harmless, at least for either of the HSM
@@ -101,11 +102,11 @@ for port in ports:
 
     response = tty.read(args.read_buffer_size)
     if args.debug:
-        sys.stderr.write("Received from {}: {!r} ({})\n".format(port, response, ":".join("{:02x}".format(ord(c)) for c in response)))
+        sys.stderr.write("Received from {}: {!r} ({})\n".format(port, response, ":".join(binascii.hexlify(c) for c in response)))
 
     # Check whether we got a known console prompt.
 
-    is_cty = any(prompt in response for prompt in ("Username:", "Password:", "cryptech>"))
+    is_cty = any(prompt in response for prompt in (b"Username:", b"Password:", b"cryptech>"))
 
     # Check whether we got something that looks like the response to an RPC version query.
     # We skip over the version value itself, as it might change, but we check that it's
diff --git a/projects/hsm/cryptech_upload b/projects/hsm/cryptech_upload
index b6e02bd..0c18f25 100755
--- a/projects/hsm/cryptech_upload
+++ b/projects/hsm/cryptech_upload
@@ -142,18 +142,15 @@ class ManagementPortAbstract(object):
         self.args = args
 
     def write(self, data):
-        numeric = isinstance(data, (int, long))
+        numeric = isinstance(data, int)
         if numeric:
             data = struct.pack("<I", data)
         self.send(data)
         if self.args.debug:
-            if numeric:
-                print("Wrote 0x{!s}".format(data.encode("hex")))
-            else:
-                print("Wrote {!r}".format(data))
+            print("Wrote {!r}".format(data))
 
     def read(self):
-        res = ""
+        res = b""
         x = self.recv()
         while not x:
             x = self.recv()
@@ -161,26 +158,26 @@ class ManagementPortAbstract(object):
             res += x
             x = self.recv()
         if self.args.debug:
-            print ("Read {!r}".format(res))
+            print("Read {!r}".format(res))
         return res
 
     def execute(self, cmd):
-        self.write("\r")
+        self.write(b"\r")
         prompt = self.read()
         #if prompt.endswith("This is the bootloader speaking..."):
         #    prompt = self.read()
         if prompt.endswith("Username: "):
-            self.write(self.args.username + "\r")
+            self.write(self.args.username.encode("ascii") + b"\r")
             prompt = self.read()
-            if prompt.endswith("Password: "):
+            if prompt.endswith(b"Password: "):
                 if not self.args.pin or self.args.separate_pins:
                     self.args.pin = getpass.getpass("{} PIN: ".format(self.args.username))
-                self.write(self.args.pin + "\r")
+                self.write(self.args.pin.encode("ascii") + b"\r")
                 prompt = self.read()
-        if not prompt.endswith(("> ", "# ")):
+        if not prompt.endswith((b"> ", b"# ")):
             print("Device does not seem to be ready for a file transfer (got {!r})".format(prompt))
             return prompt
-        self.write(cmd + "\r")
+        self.write(cmd.encode("ascii") + b"\r")
         response = self.read()
         return response
 
@@ -227,7 +224,7 @@ class ManagementPortSocket(ManagementPortAbstract):
         try:
             return self.socket.recv(1)
         except socket.timeout:
-            return ""
+            return b""
 
     def set_timeout(self, timeout):
         self.socket.settimeout(timeout)
@@ -244,19 +241,19 @@ def send_file(src, size, args, dst):
 
     if args.fpga:
         chunk_size = FPGA_CHUNK_SIZE
-        response = dst.execute("fpga bitstream upload")
+        response = dst.execute(b"fpga bitstream upload")
     elif args.firmware:
         chunk_size = FIRMWARE_CHUNK_SIZE
-        response = dst.execute("firmware upload")
-        if "Rebooting" in response:
-            response = dst.execute("firmware upload")
+        response = dst.execute(b"firmware upload")
+        if b"Rebooting" in response:
+            response = dst.execute(b"firmware upload")
     elif args.bootloader:
         chunk_size = FIRMWARE_CHUNK_SIZE
-        response = dst.execute("bootloader upload")
-    if "Access denied" in response:
-        print "Access denied"
+        response = dst.execute(b"bootloader upload")
+    if b"Access denied" in response:
+        print("Access denied")
         return False
-    if not "OK" in response:
+    if not b"OK" in response:
         print("Device did not accept the upload command (got {!r})".format(response))
         return False
 
@@ -266,19 +263,19 @@ def send_file(src, size, args, dst):
     # 1. Write size of file (4 bytes)
     dst.write(struct.pack("<I", size))
     response = dst.read()
-    if not response.startswith("Send "):
-        print response
+    if not response.startswith(b"Send "):
+        print(response)
         return False
 
     # 2. Write file contents while calculating CRC-32
     chunks = int((size + chunk_size - 1) / chunk_size)
-    for counter in xrange(chunks):
+    for counter in range(chunks):
         data = src.read(chunk_size)
         dst.write(data)
         if not args.quiet:
             print("Wrote {!s} bytes (chunk {!s}/{!s})".format(len(data), counter + 1, chunks))
         # read ACK (a counter of number of 4k chunks received)
-        ack_bytes = ""
+        ack_bytes = b""
         while len(ack_bytes) < 4:
             ack_bytes += dst.read()
         ack = struct.unpack("<I", ack_bytes[:4])[0]
@@ -293,16 +290,16 @@ def send_file(src, size, args, dst):
     dst.write(struct.pack("<I", crc))
     response = dst.read()
     if not args.quiet:
-        print response
+        print(response)
 
     src.close()
 
     if args.fpga:
         # tell the fpga to read its new configuration
-        dst.execute("fpga reset")
+        dst.execute(b"fpga reset")
         # log out of the CLI
         # (firmware/bootloader upgrades reboot, don't need an exit)
-        dst.execute("exit")
+        dst.execute(b"exit")
 
     return True
 
@@ -332,7 +329,7 @@ def main():
     if args.bootloader:
         if not args.simon_says_whack_my_bootloader:
             sys.exit("You didn't say \"Simon says\"")
-        print dire_bootloader_warning
+        print(dire_bootloader_warning)
         args.pin = None
 
     if args.explicit_image is None and args.firmware_tarball is None:
@@ -344,12 +341,12 @@ def main():
         if size == 0:                   # Flashing from stdin won't work, sorry
             sys.exit("Can't flash from a pipe or zero-length file")
         if not args.quiet:
-            print "Uploading from explicitly-specified file {}".format(args.explicit_image.name)
+            print("Uploading from explicitly-specified file {}".format(args.explicit_image.name))
 
     else:
         tar = tarfile.open(fileobj = args.firmware_tarball)
         if not args.quiet:
-            print "Firmware tarball {} content:".format(args.firmware_tarball.name)
+            print("Firmware tarball {} content:".format(args.firmware_tarball.name))
         tar.list(True)
         if args.fpga:
             name = "alpha_fmc.bit"
@@ -366,10 +363,10 @@ def main():
             sys.exit("Expected component {} missing from firmware tarball {}".format(name, args.firmware_tarball.name))
         src = tar.extractfile(name)
         if not args.quiet:
-            print "Uploading {} from {}".format(name, args.firmware_tarball.name)
+            print("Uploading {} from {}".format(name, args.firmware_tarball.name))
 
     if not args.quiet:
-        print "Initializing management port and synchronizing with HSM, this may take a few seconds"
+        print("Initializing management port and synchronizing with HSM, this may take a few seconds")
     try:
         dst = ManagementPortSocket(args, timeout = 1)
     except socket.error as e:



More information about the Commits mailing list