[Cryptech-Commits] [test/novena_trng] 01/01: Adding new data extractor from the trng. This program does check the status bit before reading data. The program can generate both text and binary formats.

git at cryptech.is git at cryptech.is
Fri Dec 19 14:24:41 UTC 2014


This is an automated email from the git hooks/post-receive script.

joachim at secworks.se pushed a commit to branch master
in repository test/novena_trng.

commit bff7e578ee043501a90cd2268c2980b99aa019ce
Author: Joachim Strömbergson <joachim at secworks.se>
Date:   Fri Dec 19 15:24:32 2014 +0100

    Adding new data extractor from the trng. This program does check the status bit before reading data. The program can generate both text and binary formats.
---
 src/sw/trng_extract.py | 348 +++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 348 insertions(+)

diff --git a/src/sw/trng_extract.py b/src/sw/trng_extract.py
new file mode 100755
index 0000000..0da8ae8
--- /dev/null
+++ b/src/sw/trng_extract.py
@@ -0,0 +1,348 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+#=======================================================================
+#
+# trng_extract.py
+# --------------
+# This program extracts data values from the trng. The program supports
+# reading from the entropy providers as well as the rng output.
+# Extraxted data can be delivered in text or binary form.
+#
+#
+# Author: Joachim Strömbergson, Paul Sekirk
+# Copyright (c) 2014, SUNET
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or
+# without modification, are permitted provided that the following
+# conditions are met:
+#
+# 1. Redistributions of source code must retain the above copyright
+#    notice, this list of conditions and the following disclaimer.
+#
+# 2. Redistributions in binary form must reproduce the above copyright
+#    notice, this list of conditions and the following disclaimer in
+#    the documentation and/or other materials provided with the
+#    distribution.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
+# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
+# COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
+# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
+# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
+# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
+# STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
+# ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+#
+#=======================================================================
+
+#-------------------------------------------------------------------
+# Python module imports.
+#-------------------------------------------------------------------
+import sys
+import io
+import fcntl
+import argparse
+
+
+#-------------------------------------------------------------------
+# Defines.
+#-------------------------------------------------------------------
+
+# Output control. Shall be flags set by the argument parser.
+VERBOSE       = False
+DEBUG         = False
+BINARY_OUTPUT = True
+
+
+# TRNG defines.
+TRNG_PREFIX       = 0x00
+TRNG_ADDR_NAME0   = 0x00
+TRNG_ADDR_NAME1   = 0x01
+TRNG_ADDR_VERSION = 0x02
+ENT1_PREFIX       = 0x05
+CSPRNG_PREFIX     = 0x0b
+CSPRNG_DATA       = 0x20
+
+
+# ENT1 defines. This is the Avalanche noise based entropy provider.
+ENT11_PREFIX      = 0x05
+ENT1_NOISE        = 0x20
+ENT1_DELTA        = 0x30
+
+
+# ENT2 defines. This is the ROSC entropy provider.
+ENT2_PREFIX       = 0x06
+ENT2_ADDR_NAME0   = 0x00
+ENT2_ADDR_NAME1   = 0x01
+ENT2_ADDR_VERSION = 0x02
+ENT2_DATA         = 0x20
+ENT2_CTRL         = 0x10
+ENT2_STATUS       = 0x11
+ENT2_ENT_DATA     = 0x20
+ENT2_ENT_RAW      = 0x21
+ENT2_ROSC_OUT     = 0x22
+
+
+# Mixer defines
+MIXER_PREFIX      = 0x0a
+
+
+# CSPRNG defines
+CSPRNG_PREFIX     = 0x0b
+
+
+
+# Command codes
+SOC                   = 0x55
+EOC                   = 0xaa
+READ_CMD              = 0x10
+WRITE_CMD             = 0x11
+RESET_CMD             = 0x01
+
+
+# Response codes
+SOR                   = 0xaa
+EOR                   = 0x55
+READ_OK               = 0x7f
+WRITE_OK              = 0x7e
+RESET_OK              = 0x7d
+UNKNOWN               = 0xfe
+ERROR                 = 0xfd
+
+
+# I2C interface defines
+# from /usr/include/linux/i2c-dev.h
+I2C_SLAVE = 0x0703
+I2C_DEVICE = "/dev/i2c-2"
+I2C_ADDR   = 0x0f
+
+
+# Number of 32 bit data words extracted in a run.
+# Should be set by the arg parser.
+NUM_WORDS = 40000000
+
+
+#----------------------------------------------------------------
+# hexlist()
+#
+# Helper function to cretae a list of hex numbers from a
+# given list of values.
+#----------------------------------------------------------------
+def hexlist(list):
+    return "[ " + ' '.join('%02x' % b for b in list) + " ]"
+
+
+#----------------------------------------------------------------
+# I2C class
+#
+# Handles the actual device including reading and writing
+# bytes from the device.
+#----------------------------------------------------------------
+class I2C:
+    # file handle for the i2c device
+    file = None
+
+    # constructor: initialize the i2c communications channel
+    def __init__(self, dev, addr):
+        self.dev = dev
+        self.addr = addr
+        try:
+            self.file = io.FileIO(self.dev, 'r+b')
+        except IOError as e:
+            print "Unable to open %s: %s" % (self.dev, e.strerror)
+            sys.exit(1)
+        try:
+            fcntl.ioctl(self.file, I2C_SLAVE, self.addr)
+        except IOError as e:
+            print "Unable to set I2C slave device 0x%02x: %s" % (self.addr, e.strerror)
+            sys.exit(1)
+
+    # destructor: close the i2c communications channel
+    def __del__(self):
+        if (self.file):
+            self.file.close()
+
+    # write a command to the i2c device
+    def write(self, buf):
+        if DEBUG:
+            print "write %s" % hexlist(buf)
+        self.file.write(bytearray(buf))
+
+    # read one response byte from the i2c device
+    def read(self):
+        # read() on the i2c device will only return one byte at a time,
+        # and tc.get_resp() needs to parse the response one byte at a time
+        return ord(self.file.read(1))
+
+
+#----------------------------------------------------------------
+# Commerror()
+#
+# Empty class exception eater.
+#----------------------------------------------------------------
+class Commerror(Exception):
+    pass
+
+
+#----------------------------------------------------------------
+# Comm
+#
+# Class for communicating with the HW via the I2C interface
+#----------------------------------------------------------------
+class Comm:
+    def __init__(self):
+        self.i2c = I2C(I2C_DEVICE, I2C_ADDR)
+
+    def send_write_cmd(self, prefix, addr, data):
+        buf = [SOC, WRITE_CMD, prefix, addr]
+        for s in (24, 16, 8, 0):
+            buf.append(data >> s & 0xff)
+        buf.append(EOC)
+        self.i2c.write(buf)
+
+    def send_read_cmd(self, prefix, addr):
+        buf = [SOC, READ_CMD, prefix, addr, EOC]
+        self.i2c.write(buf)
+
+    def get_resp(self):
+        buf = []
+        len = 2
+        i = 0
+        while i < len:
+            b = self.i2c.read()
+            if ((i == 0) and (b != SOR)):
+                # we've gotten out of sync, and there's probably nothing we can do
+                print "response byte 0: expected 0x%02x (SOR), got 0x%02x" % (SOR, b)
+                raise CommError()
+            elif (i == 1):        # response code
+                try:
+                    # anonymous dictionary of message lengths
+                    len = {READ_OK:9, WRITE_OK:5, RESET_OK:3, ERROR:4, UNKNOWN:4}[b]
+                except KeyError:  # unknown response code
+                    # we've gotten out of sync, and there's probably nothing we can do
+                    print "unknown response code 0x%02x" % b
+                    raise CommError()
+            buf.append(b)
+            i += 1
+        if DEBUG:
+            print "read  %s" % hexlist(buf)
+        return buf
+
+    def write_data(self, prefix, addr, data):
+        self.send_write_cmd(prefix, addr, data)
+        return self.get_resp()
+
+    def read_data(self, prefix, addr):
+        self.send_read_cmd(prefix, addr)
+        return self.get_resp()
+
+
+#----------------------------------------------------------------
+# print_data()
+#
+# Print either text or binary data to std out.
+#----------------------------------------------------------------
+def print_data(my_data):
+    my_bytes = my_data[4 : 8]
+
+    if (BINARY_OUTPUT):
+        for my_byte in my_bytes:
+            sys.stdout.write(chr(my_byte))
+
+    else:
+        print("0x%02x 0x%02x 0x%02x 0x%02x" %
+              (my_bytes[0], my_bytes[1], my_bytes[2], my_bytes[3]))
+
+
+#----------------------------------------------------------------
+# wait_ready()
+#
+# Wait for the ready bit in the status register for the
+# given core to be set accessible via the given device.
+#----------------------------------------------------------------
+def wait_ready(dev, prefix, addr):
+    my_status = False
+    while not my_status:
+        my_status = dev.read_data(prefix, addr)[7]
+        if VERBOSE:
+            print("Status: %s" % my_status)
+
+
+#----------------------------------------------------------------
+# get_avalanche_entropy()
+#----------------------------------------------------------------
+def get_avalanche_entropy(dev):
+    if VERBOSE:
+        print "Reading avalanche entropy data."
+
+    for i in range(NUM_WORDS):
+        dev.read_data(ENT1_PREFIX, ENT1_DATA)
+
+
+#----------------------------------------------------------------
+# get_rosc_entropy()
+#----------------------------------------------------------------
+def get_rosc_entropy(dev):
+    if VERBOSE:
+        print "Reading rosc entropy data."
+
+#    for i in range(NUM_WORDS):
+    for i in range(10):
+        wait_ready(dev, ENT2_PREFIX, ENT2_STATUS)
+        my_data = dev.read_data(ENT2_PREFIX, ENT2_ENT_DATA)
+        print_data(my_data)
+
+
+#----------------------------------------------------------------
+# looptest()
+#
+# Simple test that loops a large number of times to see
+# if the ready bit is ever cleared.
+#----------------------------------------------------------------
+def looptest(dev):
+    print("TRNG name: ", my_commdev.read_data(TRNG_PREFIX, TRNG_ADDR_NAME0))
+    print("ENT2 status: ", my_commdev.read_data(ENT2_PREFIX, ENT2_STATUS))
+
+    for i in range(100000000):
+        ent_data = dev.read_data(ENT2_PREFIX, ENT2_ENT_DATA)
+        ent_status = dev.read_data(ENT2_PREFIX, ENT2_STATUS)
+        if not ent_status[7]:
+            print("Got: %d" % ent_status[7])
+
+#----------------------------------------------------------------
+# main
+#----------------------------------------------------------------
+def main():
+    if VERBOSE:
+        print("Starting trng data extraction.")
+
+    my_commdev = Comm()
+
+    # looptest(my_commdev)
+
+#    get_avalanche_entropy()
+#    get_avalanche_delta()
+
+    get_rosc_entropy(my_commdev)
+#    get_rosc_raw()
+
+#    get_rng_data()
+
+
+#-------------------------------------------------------------------
+# __name__
+# Python thingy which allows the file to be run standalone as
+# well as parsed from within a Python interpreter.
+#-------------------------------------------------------------------
+if __name__=="__main__":
+    # Run the main function.
+    sys.exit(main())
+
+#=======================================================================
+# EOF trng_extract.py
+#=======================================================================



More information about the Commits mailing list