21.12.2011 16:00

Toradex Embedded Controller with Python in Linux

Toradex is making interesting small boards mainly for embedded applications. I've got the one called Robin Z510 with Atom Z510 CPU and Intel Poulsbo chipset. Toradex makes one more thing on the board and it is their own Toradex Embedded Controller (TEC). It provides some additional functionality. The TEC is generally an USB HID device. HID specification is long and complicated (and it is just a top of the pyramid) as it is made to describe in very general way, very large group of USB devices. HID description of the device is defining thru the set of nested Descriptors the structure of the data you may obtain from the device or you may want to write to device (e.g. settings). As I am a lowlevel man this was somewhat over complicated for me. I am used to work with bytes. Therefore I wrote my own code first in C then in Python to control some of the functionality in embedded controller.

Toradex is providing the OakLinux C++ code that may help you get started from the right point as it is using the object code to represent the HID data according to Descriptors, but this code is not complete and is not targeted mainly for TEC even thought TEC conforms to Oak and the code may be used as a base for your own code. As it is bit complicated and I was heading to have the code in Python I decided to go "my way" using HID RAW device exported by Linux kernel to access the bare bytes.

Toradex seems to be very open, even with their HW, while it is not "open source" HW, you may get many of the stuff around without any registraton, NDA or further burden.

This code example is not complete in any way. It is just an example, hopefully easy to undrestand, how to work with the TEC. It may hopefully save you a few hard days.

#!/usr/bin/python
"""
Code to control the Toradex Embedded Controller present on Toradex Robin Z5xx boards.

It has only limited functionality.
It's principles are based on C kernel code for hidraw devices.
It has very low level of abstraction and is not meant to be and API or library.

The terminology around the HID devices is bit hard to understand as it is using a cascade
of Descriptors describing the Reports(FEATURES)/EndPoins(INTERRUPT IN/OUT) formats. Therefore it is self describing.
This code does not use these HID structures. Its interpretation of the data is hardcoded.
"""

import struct, array, fcntl, glob, logging, optparse, time, signal, sys;

LOGGING_LEVELS = {'critical': logging.CRITICAL,
                  'error': logging.ERROR,
                  'warning': logging.WARNING,
                  'info': logging.INFO,
                  'debug': logging.DEBUG}

class struxx:
  """
  Example structure to help interpret the Descriptor datas are based on formated fields
  """
  _fields = None
  _format = None
  _buffer = None
  def __init__(self):
    self.reset()

  def __len__(self):
    return struct.calcsize(self._format)

  def __iter__(self):
    return [getattr(self, field) for field in self._fields.split(";")].__iter__()

  def reset(self):
    for field in self._fields.split(";"):
      setattr(self, field, 0)
    self._buffer = array.array('B', [0]*len(self))

  def pack(self):
    self._buffer = array.array('B', struct.pack(self._format, *self))

  def unpack(self):
    rv = struct.unpack(self._format, self._buffer)
    for i in range(len(rv)):
      setattr(self, self._fields.split(";")[i], rv[i])

  def ioctl(self, fd, ioctlno):
    self.pack()
    logging.debug('ioctlno %d' % ioctlno)
    rv = fcntl.ioctl(fd, ioctlno, self._buffer, True)
    self.unpack()
    return rv

class uint(struxx):
  _fields = "uint"
  _format = "I"
  def get_size(self, fd): return self.ioctl(fd, HIDIOCGRDESCSIZE)

class hidraw_report_descriptor(struxx):
  #HID_MAX_DESCRIPTOR_SIZE           4096
  _fields = "size;value"
  _format = "I4096c"

  def reset(self):
    self.size = 0
    self.value = '\0'*4096

  def pack(self):
    tmp = struct.pack("i", self.index) + self.value[:4096].ljust(4096, '\0')
    self._buffer = array.array('B', tmp)

  def unpack(self):
    self.index = struct.unpack("i", self._buffer[:4])
    self.value = self._buffer[4:].tostring()

  def get_string(self, fd, idx):
    self.index = idx
    return self.ioctl(fd, HIDIOCGRDESC)

class hidraw_devinfo(struxx):
  """
  Hidraw device info is interpreted using the format and structure
  """
  _fields = "bustype;vendor;product"
  _format = "Ihh"
  def get(self, fd): return self.ioctl(fd, HIDIOCGRAWINFO)

"""
Kernel definitions for ioctl commands come from the ioctl.h of Linux kernel
"""
_IOC_NRBITS   = 8
_IOC_TYPEBITS = 8
_IOC_SIZEBITS = 14
_IOC_DIRBITS  = 2

_IOC_NRSHIFT   = 0
_IOC_TYPESHIFT = _IOC_NRSHIFT + _IOC_NRBITS
_IOC_SIZESHIFT = _IOC_TYPESHIFT + _IOC_TYPEBITS
_IOC_DIRSHIFT  = _IOC_SIZESHIFT + _IOC_SIZEBITS

_IOC_WRITE = 1
_IOC_READ  = 2

_IOC = lambda d,t,nr,size: (d << _IOC_DIRSHIFT) | (ord(t) << _IOC_TYPESHIFT) | \
     (nr << _IOC_NRSHIFT) | (size << _IOC_SIZESHIFT)
_IOW  = lambda t,nr,size: _IOC(_IOC_WRITE, t, nr, size)
_IOR  = lambda t,nr,size: _IOC(_IOC_READ, t, nr, size)
_IOWR = lambda t,nr,size: _IOC(_IOC_READ | _IOC_WRITE, t, nr, size)

HIDIOCGRDESCSIZE                =_IOR('H', 0x01, struct.calcsize("I"))
HIDIOCGRDESC                    =_IOR('H', 0x02, len(hidraw_report_descriptor()))
HIDIOCGRAWINFO                  =_IOR('H', 0x03, len(hidraw_devinfo()))
#HIDIOCGRAWINFO                 =_IOR('H', 0x03, struct.calcsize("Ihh"))
def HIDIOCGRAWNAME(buflen):     return _IOR('H', 0x04, buflen)
def HIDIOCGRAWPHYS(buflen):     return _IOR('H', 0x05, buflen)
def HIDIOCSFEATURE(buflen):     return _IOWR('H', 0x06, buflen)
def HIDIOCGFEATURE(buflen):     return _IOWR('H', 0x07, buflen)



def get_rawdevice_name(f):
  a = array.array('B', [0]*256)
  fcntl.ioctl(f, HIDIOCGRAWNAME(256), a, True)
  return a.tostring()


def find_device():
  """
  Simple device enumeration is using directly Linux /dev filesystem
  """
  filename = 'None'
  for hidfile in glob.glob('/dev/hidraw*'):
    logging.debug('Device filename      : %s' % hidfile)
    try:
      f = open(hidfile, "rw")
    except:
      continue
    devinfo = hidraw_devinfo()
    devinfo.get(f)
    if devinfo.vendor == 0x1b67 and devinfo.product == 0x0013:
      logging.debug('...is TEC')
      filename = hidfile
      f.close()
      break
    else:
      f.close()
  return filename

def read_feature_report(value):
  """
  Reading and writing of FEATURE reports is done directly by kernel ioctl
  """
  ret = array.array('B', [0]*len(value))
  result = fcntl.ioctl(f, HIDIOCGFEATURE(len(value)), ret, True)
  retstring = ' '.join( ['0x%02x '%b for b in ret] )
  logging.debug('READ returned  : %d, %s' % (result, retstring))
  return ret

def write_feature_report(value):
  result = fcntl.ioctl(f, HIDIOCSFEATURE(len(value)), value, True)
  valstring = ' '.join( ['0x%02x '%b for b in value] )
  logging.debug('WRITE          : %d, %s' % (result, valstring))
  return result

def get_feature(value):
  """
  Recommended way of setting and getting the FEATURE report is to read from controller
  to make sure it is not busy, then do a write with either get or set request. Then reading
  the controller until it says again it is ready.
  Note: It may return more than one value for some functionality.
  """
  value[1] = 0x01       # GET feature
  logging.debug('GET FEATURE    :')
  while True:
    if read_feature_report(value)[1] == 0xff:
      break
  write_feature_report(value)
  while True:
    ret = read_feature_report(value)
    if ret[1] == 0xff:
      break
  return ret[2]

def set_feature(value):
  value[1] = 0x00       # SET feature
  logging.debug('SET FEATURE    :  result 0xFF means OK')
  while True:
    if read_feature_report(value)[1] == 0xff:
      break
  write_feature_report(value)
  while True:
    ret = read_feature_report(value)
    if ret[1] == 0xff:
      break
  return ret[1]         # Does not return a set value, FF means OK

def read_interrupt_in(f):
  """
  Reading and writing of data directly to device reads or writes the data to/from EndPoints. 
  The interpretation of data is on the controllers side.
  
  As those data are read as bytes, but we are using the Python structures, we have to use
  the struct packing and unpacking to convert data into correct format. This may be done also 
  in the same way as the above HID reports but is written on the lower abstraction level to make it
  easy to understand.
  """
  buf = array.array('B', [0]*10)
  buf[0] = 0            # Report number (always 0)
  try:
    buf = f.read(10)
  except IOError:
    logging.critical('Can not read report in')
  buf = struct.unpack('10B', buf)
  logging.debug('READ INTERRUPT IN      : Frame num.: %02x %02x, GPIO: %02x, I2C ID: %02x, I2C COUNT: %02x, I2C DATA: %02x %02x %02x %02x, TEC state: %02x' % (buf[0], buf[1], buf[2], buf[3], buf[4], buf[5], buf[6], buf[7], buf[8], buf[9]) )
  return buf


def write_interrupt_out(f, buf):
  buf[0] = 0            # Report number (always 0)
  try:
    res = f.write(struct.pack('13B', buf[0], buf[1], buf[2], buf[3], buf[4], buf[5], buf[6], buf[7], buf[8], buf[9], buf[10], buf[11], buf[12]))
  except IOError:
    logging.critical('Can not write report out')
  logging.debug('WRITE INTERRUPT OUT    : OUT1CMD: %02x, OUT2CMD: %02x, WDS: %02x, I2C ID: %02x, I2C CMD: %02x, I2C ADDR: %02x, I2C COUNT: %02x, I2C DATA: %02x %02x %02x %02x %02x' % (buf[1], buf[2], buf[3], buf[4], buf[5], buf[6], buf[7], buf[8], buf[9], buf[10], buf[11], buf[12]) )
  return buf

def signal_handler(signal, frame):
  logging.debug('Disabling watchdog')
  wdt_mode[6] = 0x00
  set_feature(wdt_mode)
  f.close()
  sys.exit(0)


if __name__=="__main__":

  # Set signal handler for SIGINT and SIGTERM to disable watchdog on them
  signal.signal(signal.SIGINT, signal_handler)
  signal.signal(signal.SIGTERM, signal_handler)

  parser = optparse.OptionParser()
  parser.add_option('-l', '--logging-level', help='Logging level')
  parser.add_option('-f', '--logging-file', help='Logging file name')
  parser.add_option('-w', '--watchdog-run', help='Sets watchdog and runs forever to refresh WD', action='store_true', dest='watchdog_run')
  parser.add_option('-i', '--itwoc', help='Send some test data thru I2C controller on TEC', action='store_true', dest='itwoc')
  parser.add_option('-g', '--gpio', help='Set GPIO to output and toggle all pins', action='store_true', dest='gpio')
  parser.add_option('-r', '--read-int-in', help='Read interrupt in report', action='store_true', dest='read_int_in')
  (options, args) = parser.parse_args()

  # Just to enable easy logging/message printing
  logging_level = LOGGING_LEVELS.get(options.logging_level, logging.CRITICAL)
  logging.basicConfig(level=logging_level, filename=options.logging_file,
                      format='%(asctime)s %(levelname)s: %(message)s',
                      datefmt='%Y-%m-%d %H:%M:%S')

  filename = find_device()
  try:
    f = open(filename, 'r+b')   # read and update in binary mode
  except IOError:
    logging.critical('IOError: No suitable device found')
    raise

  logging.debug("Found          : %s" % get_rawdevice_name(f))

  # Definition of limited set of features - byte definitions are from Robin documentation
  # Feature = array of bytes [rpt.num=0, set=0/get=1, RAM=0/Flash=1, OAK type 3 bytes, value(s)]
  wdt_mode  = array.array('B', [0x00, 0x00, 0x00, 0x01, 0x05, 0x00, 0x00])      # value 0x00 = off, 0x01 = reset after timeout (10s default), 0x02 asserts COM exp. pin only
  wdt_int   = array.array('B', [0x00, 0x00, 0x00, 0x02, 0x02, 0x00, 0x00, 0x00])        # values LSB, MSB time in 0.1s
  wdt_serv  = array.array('B', [0x00, 0x00, 0x80, 0x00, 0x01, 0x00])            # service (refresh) watchdog
  gpio_dir  = array.array('B', [0x00, 0x00, 0x00, 0x01, 0x01, 0x00, 0x00])      # value per bit 0 = input, 1 = output
  gpio_mode = array.array('B', [0x00, 0x00, 0x00, 0x01, 0x02, 0x00, 0x00])      # value per bit 0 = CMOS, 1 = open drain

  if options.gpio:
    print 'Get WDT mode: 0x%02x' % get_feature(wdt_mode)
    gpio_dir[6] = 0xfa
    print 'Set GPIO dir: 0x%02x' % set_feature(gpio_dir)
    print 'Get GPIO dir: 0x%02x' % get_feature(gpio_dir)
    # INTERRUPT OUT = array of bytes [rpt.num=0, OUT1,2 CMD (GPIO), WDS, I2C-ID, -CMD, -ADDR, -COUNT, -5xDATA]
    out = array.array('B', [0x00, 0x00, 0xff, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00])
    write_interrupt_out(f, out)


  if options.itwoc:
    # Sendig data thru I2C in TEC to e.g. GPIO expander at I2C address 0x21
    # INTERRUPT OUT = array of bytes [rpt.num=0, OUT1,2 CMD (GPIO), WDS, I2C-ID, -CMD, -ADDR, -COUNT, -5xDATA]
    out = array.array('B', [0x00, 0x00, 0xff, 0x00, 0x00, 0x08, 0x21, 0x03, 0x06, 0x00, 0x00, 0x00, 0x00])
    print "Write interrupt out:", out
    write_interrupt_out(f, out)
    out = array.array('B', [0x00, 0x00, 0xff, 0x00, 0x00, 0x08, 0x21, 0x03, 0x00, 0xff, 0xff, 0x00, 0x00])
    print "Write interrupt out:", out
    write_interrupt_out(f, out)
    time.sleep(2)
    out = array.array('B', [0x00, 0x00, 0xff, 0x00, 0x00, 0x08, 0x21, 0x03, 0x00, 0x00, 0x00, 0x00, 0x00])
    print "Write interrupt out:", out
    write_interrupt_out(f, out)

  if options.read_int_in:
    print "Read interrupt in:", read_interrupt_in(f)

  if options.watchdog_run:
    logging.debug('Starting watchdog')
    wdt_mode[6] = 0x01;
    set_feature(wdt_mode)
    while 1:
      logging.debug('Refreshing watchdog')
      set_feature(wdt_serv)
      time.sleep(7)

  f.close()


# Not used 
def leftover():
#  tmp = uint()
#  tmp.get_size(f)
#  print "size 0x%x" % tmp.uint

#  print HIDIOCGRAWNAME(256)
#  get_rawdevice_name(f)  

#  info = struct.pack(struct.calcsize("Ihh")*"x")
#  print HIDIOCGRAWINFO
#  data = fcntl.ioctl(f, HIDIOCGRAWINFO, info)
#  fields = struct.unpack("Ihh", data)
#  print fields
  return

Email comment