Generating LEGO Images for Training a CNN

Reading time ~11 minutes

After having success with training a CNN on our initial dataset, we decided to up the game on generating training images. My buddy Rockets built a nice little turntable and ordered a couple of NEMA17s for each of us. His idea was we could both start generating training images.

I asked if he would be OK with me ordering some RAMPs boards and programming them to synchronize with the PiCamera. I figured, it would probably be better for reproducibility if we had solid hardware, with custom firmware and software.

After a few hours of coding over a couple of weeks I was able to control the RAMPs within a Python script from either the Raspberry Pi or a desktop computer.

I’ve listed the code parts below with a brief explanation–just in case someone would like to hack them for other projects.

Minimum Viable Hack

Warning words, I’m an advocate of the minimum viable product, especially, when it comes to my personal hacking time. I refer to this as the minimum viable hack. That stated, there are known issues in the code below. But! It does the job–so I’ve not addressed them.

Here are a few:

  1. The value 0x0A (\n) value is not handled as part of packet (e.g., if MILLI_BETWEEN = 10 bad things will happen).
  2. The motors are always on (reduces motor life).
  3. Pulse width is not adjustable without firmware update.
  4. The Python code is blocking. This makes the halt feature on the Arduino Mega side fairly useless.
  5. Only RAMPs motor X is setup (this one I will address later, as we will need several drivers before the end of this project).

RAMPS Code

To move the turn table we used a RAMPs 1.4 board:

Getting things going was straightforward. I put together the hardware, installed the Arduino IDE, and looked-up the pinout for the RAMPs controller.

I wrote the firmware to receive serial commands as a packet. The packet structure (at time of writing) looks like this:

MOTOR_PACKET = 0x01 0x01 0x00 0x03 0xE8 0x05 0x0A
INDEX        =  1    2     3    4    5    6   7
  • first_byte = This indicates what sort of packet type. Right now, there is only one, but I figure we might want to control other I/O on the Arduino later.
  • second_byte = the motor selected 1-5 (X, Y, Z, E1, E2).
  • third_byte = Motor direction, 0x00 is clockwise and 0x01 is counter-clockwise.
  • fourth_byte = first chunk of the steps.
  • fifth_byte = second chunk of the steps. The steps variable tells the motor how many steps to move before stopping.
  • sixth_byte = delay between steps in milliseconds.
  • seventh_byte = the end-of-transmission (EOT) character. I’ve used \n.

When the code receives an EOT character, it parses the packet and calls the writeMotor(). This function loops through the number of steps, delaying between each. Each loop, the function checks if a halt command has been received. If it has, it stops the motor mid-move.

Again, this code isn’t perfect. Far from it. But it does the job.

#include <avr/interrupt.h> 
#include <avr/io.h> 
// https://reprap.org/mediawiki/images/f/f6/RAMPS1.4schematic.png
// https://reprap.org/forum/read.php?219,168722

// TODO: Pulse width set by initialization.
// TODO: Setup all motors to be selected by master.
// TODO: Add a timer to shutdown motors after threshold.
//       And keep motor enabled until threshold has been met.
// TODO: Handle 0x0A values as part of packet (e.g., if MILLI_BETWEEN = 10).
// TODO: Add a "holding torque" feature; making it so motors never disable.

// For RAMPS 1.4
#define X_STEP_PIN         54
#define X_DIR_PIN          55
#define X_ENABLE_PIN       38
#define X_MIN_PIN           3
#define X_MAX_PIN           2

#define Y_STEP_PIN         60
#define Y_DIR_PIN          61
#define Y_ENABLE_PIN       56
#define Y_MIN_PIN          14
#define Y_MAX_PIN          15

#define Z_STEP_PIN         46
#define Z_DIR_PIN          48
#define Z_ENABLE_PIN       62
#define Z_MIN_PIN          18
#define Z_MAX_PIN          19

#define E_STEP_PIN         26
#define E_DIR_PIN          28
#define E_ENABLE_PIN       24

#define SDPOWER            -1
#define SDSS               53
#define LED_PIN            13

#define FAN_PIN            9

#define PS_ON_PIN          12
#define KILL_PIN           -1

#define HEATER_0_PIN       10
#define HEATER_1_PIN       8
#define TEMP_0_PIN         13   // ANALOG NUMBERING
#define TEMP_1_PIN         14   // ANALOG NUMBERING

#define MOTOR_X         0x01
#define MOTOR_Y         0x02
#define MOTOR_Z         0x03
#define MOTOR_E1        0x04
#define MOTOR_E2        0x05

#define DRIVE_CMD       (char)0x01
#define HALT_CMD        (char)0x0F
#define DIR_CC          (char)0x00
#define DIR_CCW         (char)0x01

#define COMPLETED_CMD   (char)0x07
#define END_TX          (char)0x0A
#define ACK             (char)0x06 // Acknowledge
#define NACK            (char)0x15 // Negative Acknowledge


// Determine the pulse width of motor.
#define MOTOR_ANGLE           1.8
#define PULSE_WIDTH_MICROS    360 / MOTOR_ANGLE

#define RX_BUFFER_SIZE 16

/*
  MOTOR_NUM:
      X     = 0
      Y     = 1
      Z     = 2
      E1    = 3
      E2    = 4
      
  PACKET_TYPES
      0x01 = motor_write
      0x02 = motor_halt

  DIRECTION
      0x00 = CW
      0x01 = CCW

  MOTOR MOVE PROTOCOL:
                       0               1     2     3        4       5         6
  MOTOR_PACKET = PACKET_TYPE_CHAR MOTOR_NUM DIR STEPS_1 STEPS_2 MILLI_BETWEEN \n
  MOTOR_PACKET =    01                01    00    03     E8        05         0A
  MOTOR_PACKET =    0x 01010003E8050A

  HALT         = 0x0F
*/


/* Create a structure for the motors
 *  direction_pin = pin to control direction of stepper.
 *  step_pin      = pin to control the steps.
 *  enable_pin    = pin to enable motor.
 */
struct MOTOR {
  uint8_t direction_pin;
  uint8_t step_pin;
  uint8_t enable_pin;
  uint8_t pulse_width_micros;
};

struct BUFFER {
  uint8_t data[RX_BUFFER_SIZE];
  uint8_t bufferSize;
  uint8_t index;
  boolean packetComplete;
  uint8_t shutdownThreshold;
};

/* Initialize motors */
MOTOR motorX = {
      X_DIR_PIN,
      X_STEP_PIN,
      X_ENABLE_PIN,
      PULSE_WIDTH_MICROS
};

// Urgent shutdown.
volatile boolean halt = false;
volatile static bool triggered;

/* Initialize RX buffer */
BUFFER rxBuffer;;



/* Initialize program */
void setup()
{
  Serial.begin(115200);
  
  // Initialize the structures
  motorSetup(motorX);
  rxBuffer.bufferSize = RX_BUFFER_SIZE;

  // Disable holding torque.
  digitalWrite(motorX.enable_pin, HIGH);
}

/* Main */
void loop()
{
  // If packet is packetComplete
  if (rxBuffer.packetComplete) {
    
    uint8_t packet_type = rxBuffer.data[0];

    switch (packet_type) {
      case DRIVE_CMD:
        {
          // Unpack the command.
          uint8_t motorNumber =  rxBuffer.data[1];
          uint8_t direction =  rxBuffer.data[2];
          uint16_t steps = ((uint8_t)rxBuffer.data[3] << 8)  | (uint8_t)rxBuffer.data[4];
          uint8_t milliSecondsDelay = rxBuffer.data[5];

          // Let the master know command is in process.
          sendAck();
  
          // Start the motor
          writeMotor(motorX, direction, steps, milliSecondsDelay);
        }
        break;
      default:
        sendNack();
        break;
    }
    // Clear the buffer for the nexgt packet.
    resetBuffer(&rxBuffer);
  }
}


/*  ############### MOTORS ############### */

/* Method for initalizing MOTOR */
void motorSetup(MOTOR motor) {

  // Setup motor pins
  pinMode(motor.direction_pin, OUTPUT);
  pinMode(motor.step_pin, OUTPUT);
  pinMode(motor.enable_pin, OUTPUT);

}

/* Write to MOTOR */
void writeMotor(MOTOR motor, int direction, uint16_t numberOfSteps, int milliBetweenSteps) {

    // Enable motor.
    digitalWrite(motor.enable_pin, LOW);

    // Check direction;
    switch (direction) {
      case DIR_CC:
        digitalWrite(motor.direction_pin, HIGH);
        break;
      case DIR_CCW:
        digitalWrite(motor.direction_pin, LOW);
        break;
      default:
        sendNack();
        return;
    }

    // Move the motor (but keep an eye for a halt command)
    for(int n = 0; n < numberOfSteps; n++) {
      // Interrupt motor
      if(checkForHalt()) {  
        sendAck();
        break; 
      }
      digitalWrite(motor.step_pin, HIGH);
      delayMicroseconds(motor.pulse_width_micros);
      digitalWrite(motor.step_pin, LOW);
      delay(milliBetweenSteps);
    }

    // Disable holding torque.
    digitalWrite(motor.enable_pin, HIGH);

    // Let the user know the move is done.
    sendCompletedAction();
}

// END MOTORS

/*  ############### COMMUNICATION ###############
 * 
*/
void serialEvent() {

  // Get all the data.
  while (Serial.available()) {

    // Read a byte
    uint8_t inByte = (uint8_t)Serial.read();

    // Store the byte in the buffer.
    rxBuffer.data[rxBuffer.index] = inByte;
    rxBuffer.index++;

    // If a complete packet character is found, mark the packet
    // as ready for execution.
    if ((char)inByte == '\n') {
      rxBuffer.packetComplete = true;
    }
    
  }
}

// Clear the buffer.
void resetBuffer(struct BUFFER *buffer) {
  memset(buffer->data, 0, sizeof(buffer->data));
  buffer->index = 0;
  buffer->packetComplete = false;
}

// Does not count termination char.
int packetLength(BUFFER buffer){
  for(int i = 0; i < buffer.bufferSize; i++) {
    if((char)buffer.data[i] == '\n'){ return i; }
  }
  return -1;
}

void sendAck() {
  Serial.write(ACK);
  Serial.write(END_TX);
}

void sendNack() {
  Serial.write(ACK);
  Serial.write(END_TX);
}

void sendCompletedAction() {
  Serial.write(COMPLETED_CMD);
  Serial.write(END_TX);
}

// Halt is handled outside normal communication protocol.
boolean checkForHalt() {
  if (Serial.available()){
    // Halt command has no termination character.
    if ((uint8_t)Serial.read() == HALT_CMD) {
      return true;
    }
  }
  return false;
}

// END COMMUNICATION

Python RAMPS

There are two variants of the Python code. First, is for the Raspberry Pi. It’s where I focused coding time, as it made sense to generate training images using the same hardware (PiCamera) as would be used for production. However, I’ve a simpler desktop version which uses OpenCV and a webcam.

For the Raspberry Pi and desktop versions you will need the following:

  • Python 3.7 – this should be standard on Raspbian Buster.

On the desktop you will need opencv, it can be installed using:

pip install opencv

In both cases you will need the custom class ramps_control, if you clone the repository and run your script from the ./turn_table directory, that should be handled for you.

What’s it Do?

The turn table script initializes the camera. It then creates a loop over the number of angles you want to take images.

A full rotation is 3200 steps and if you ask for 60 images, then the script will rotate the turntable ~53.33 steps. At the end of the rotation, the script will capture an image of your target. Then, it will rotate another 53.33 steps and take another picture. It will do this 60 times, where it should have completed a full rotation.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Wed Sep 25 05:58:48 2019

@author: ladvien
"""

from picamera import PiCamera
import os
from time import sleep
import ramps_control
import serial
import glob

#################
# Parameters
#################

MILLI_BETWEEN_STEPS     = 5
IMAGES_PER_ROTATION	    = 60
FULL_ROTATION           = 3200
STEPS_BEFORE_PIC 	    = int(FULL_ROTATION / IMAGES_PER_ROTATION)

print(f'Steps per image: {STEPS_BEFORE_PIC}')

####################
# Don't Overwrite
####################
def check_existing_images(output_directory):
    existing_image_files = glob.glob(f'{output_directory}/*.jpg')
    max_file_index = 0
    for file in existing_image_files:
        file_index = file.split('/')[-1].split('_')[1].replace('.jpg', '')
        try:
            file_index = int(file_index)
            if file_index > max_file_index:
                max_file_index = file_index
        except:
            pass
    return max_file_index

#################
# Open Serial
#################
ser = serial.Serial('/dev/ttyUSB0', 115200)
print(ser.name)   

#################
# Init Camera
#################
#picam v2 resolution 3280 x 2464
camera = PiCamera()
PIC_SIZE = 1200
CAM_OFFSET_X = 0
CAM_OFFSET_Y = 0
camera.start_preview()

#################
# Init RAMPS
#################
ramps = ramps_control.RAMPS(ser, debug = False)

# Track whether the motor is at work.
motor_moving = False

# Reset the RAMPs program.
ramps.reset_ramps(False)

#################
# Main
#################

part = ''

while True:
    part_candidate = input(f'Enter part number and hit enter. (Default {part}; "q" to quit): ')

    if part_candidate.lower() == 'q':
        print('Bye!')
        quit()
    elif part_candidate != '':
        part = part_candidate

    output_directory = f'/home/pi/Desktop/lego_images/{part}' 

    if not os.path.exists(output_directory):
        os.makedirs(output_directory)

    max_file_index = check_existing_images(output_directory)

    for i in range(IMAGES_PER_ROTATION):

            success = ramps.move(ramps.MOTOR_X,
                        ramps.DIR_CCW,
                        STEPS_BEFORE_PIC,
                        MILLI_BETWEEN_STEPS)
            
            if success:
                print('Table move a success.')

                file_path = f'{output_directory}/{part}_{i + max_file_index}.jpg'
                print(file_path)
                camera.capture(file_path)
                
            # sleep(0.05)

ser.close()
camera.stop_preview()

Python RAMPS Class

To increase resuability of the code, I’ve abstracted the RAMPs controller code into a Python class. This class is called by the script above. It is blocking code which handles sending commands, polling the Arduino, and reports received information.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Sat Sep 28 05:39:18 2019

@author: ladvien
"""
from time import sleep, time


"""
  MOTOR_NUM:
      X     = 0
      Y     = 1
      Z     = 2
      E1    = 3
      E2    = 4
      
  PACKET_TYPES
      0x01 = motor_write
      0x02 = motor_halt

  DIRECTION
      0x00 = CW
      0x01 = CCW

  MOTOR MOVE PROTOCOL:
                       0               1     2     3        4       5         6
  MOTOR_PACKET = PACKET_TYPE_CHAR MOTOR_NUM DIR STEPS_1 STEPS_2 MILLI_BETWEEN \n

"""

class RAMPS:
    DRIVE_CMD       = 0x01
    HALT_CMD        = 0x0F
    DIR_CC          = 0x00
    DIR_CCW         = 0x01
    
    COMPLETED_CMD   = 0x07
    END_TX          = 0x0A
    ACKNOWLEDGE     = 0x06
    NEG_ACKNOWLEDGE = 0x15
        
    
    MOTOR_X         = 0x01
    MOTOR_Y         = 0x02
    MOTOR_Z         = 0x03
    MOTOR_E1        = 0x04
    MOTOR_E2        = 0x05
    
    def __init__(self, ser, debug = False):
        self.ser = ser
        self.toggle_debug = debug
        self.rx_buffer_size = 256
        self.serial_delay = 0.1
        
    def toggle_debug(self):
        self.debug = not self.debug
        
    def print_debug(self, message):
        if self.toggle_debug:
            print(message)
    
    """ 
            COMMUNICATION
    """
    
    # Prepare for a serial send.
    def encode_packet(self, values):
        return bytearray(values)
    
    # Prepare a packet the slave will understand
    def prepare_motor_packet(self, motor_num, direction, steps, milli_between):
        steps_1 = (steps >> 8) & 0xFF
        steps_2 = (steps) & 0xFF
        return [self.DRIVE_CMD, motor_num, direction, steps_1, steps_2, milli_between, self.END_TX]
    
    def read_available(self, as_ascii = False):
        
        self.print_debug(f'Reading available.')
        
        # 1. Get all available data.
        # 2. Unless buffer exceeded.
        # 3. Return a list of the data.
        
        incoming_data = []
        incoming_data_size = 0
        
        while self.ser.in_waiting > 0:
            incoming_data_size += 1
            
            if incoming_data_size > self.rx_buffer_size:
                self.print_debug(f'Buffer overflow.')
                return list('RX buffer overflow.')
            
            if as_ascii:
                incoming_data.append(self.ser.readline().decode('utf-8'))
            else:
                incoming_data += self.ser.readline()

        self.print_debug(f'Completed reading available.')
        return incoming_data


    def check_for_confirm(self, command_expected):
        confirmation = self.read_available()
        if len(confirmation) > 0:
            if confirmation[0] == command_expected:
                return True
        else:
            return False

    
    """ 
            RAMPS UTILITY
    """
    
    def reset_ramps(self, print_welcome = False):

        self.print_debug(f'Reseting Arduino.')
        # Reset the Arduino Mega.
        self.ser.setDTR(False)
        sleep(0.4)
        self.ser.setDTR(True)
        sleep(2)   
        
        # Get welcome message.
        welcome_message = []
        
        while self.ser.in_waiting > 0:
            welcome_message.append(self.ser.readline().decode('utf-8') )
        
        self.print_debug(f'Completed reset.')
        if print_welcome:
            # Print it for the user.
            print(''.join(welcome_message))
            return
        else:
            return

    """ 
            MOTOR COMMANDS
    """
    def move(self, motor, direction, steps, milli_secs_between_steps):
        
        # 1. Create a list containg RAMPs command.
        # 2. Encode it for serial writing.
        # 3. Write to serial port.
        # 4. Check for ACK or NACK.
        # 5. Poll serial for completed command.
        
        packet = self.prepare_motor_packet(motor,
                                           direction,
                                           steps,
                                           milli_secs_between_steps)
        packet = self.encode_packet(packet)
        
        self.print_debug(f'Created move packet: {packet}')
        
        self.write_move(packet)
        
        # Don't miss ACK to being in a hurry.
        sleep(self.serial_delay)
        confirmation = self.read_available()
        if confirmation[0] == self.ACKNOWLEDGE:
            self.print_debug(f'Move command acknowledged.')
        
        if(self.wait_for_complete(120)):
            return True
        
        return False
    
    def wait_for_complete(self, timeout):
        
        # 1. Wait for complete or timeout
        # 2. Return whether the move was successful.
        
        start_time = time()
        
        while True:
            now_time = time()
            duration = now_time - start_time
            self.print_debug(duration)
            if(duration > timeout):
                return False
            
            if self.check_for_confirm(self.COMPLETED_CMD):
                self.print_debug(f'Move command completed.')
                return True
            
            sleep(self.serial_delay)
    
    def write_move(self, packet):        
        self.ser.write(packet)
        self.print_debug(f'Executed move packet: {packet}')

Questions

That’s pretty much it. I’ve kept this article light, as I’m saving most of my free time for coding. But, feel free to ask questions in the comments below.

What is a Data Warehouse

## Insights over DataData. They are the plastic of the tech world. We're are making way too much of it, you can't seem to get rid of it, ...… Continue reading