Creating a Neural Network Webservice

We’re almost done. In the previous articles we’ve used a local machine to train a CNN to detect toxic sentiment in text. Also, we prepared a small (1GB RAM) server to use this pre-trained network to make predictions. Now, let’s finish it and create a webservice where anyone can access our awesome magical algorithm.

Prediction Service

On your remote server, navigate to your flask_app folder and create a file called nn_service.py. The following code creates an HTTP endpoint at /detect-toxic and exposes it to other programs running on the server. A bit more explanation after the code.

cd /home/my_user/flask_app
nano nn_service.py

Enter the following:

from flask import Flask, request
application = Flask(__name__)

from keras.models import load_model
from keras.preprocessing.sequence import pad_sequences
import numpy as np
import pymongo
import json

# Parameters
mongo_port = 27017
embedding_collection = 'word_embeddings'
word_embedding_name = 'glove-wiki-gigaword-50'
pad_length = 100

# Globals
global model, graph

# Connection to Mongo DB
try:
    mong = pymongo.MongoClient('127.0.0.1', mongo_port)
    print('Connected successfully.')
except pymongo.errors.ConnectionFailure as e:
    print('Could not connect to MongoDB: ' + str(e))

db = mong[embedding_collection]
coll = db[word_embedding_name]

# Load Keras Model
model = load_model('/home/my_user/flask_app/models/tox_com_det.h5')
model._make_predict_function()

# Start Flask (only used when running this file directly;
# `flask run` imports the app and ignores this block)
if __name__ == '__main__':
    application.run(host='127.0.0.1')

@application.route('/detect-toxic', methods=['POST'])
def sequence_to_indexes():
    with open('nn_service.log', 'w+') as file:
        file.write('here')
    if request.method == 'POST':
        try:
            sequence = request.json['sequence']
        except:
            return get_error('missing parameters')
        response = {
            'prediction': prediction_from_sequence(sequence, pad_length)
        }
        return str(response)

def get_word_index(word):
    index = ''
    try:
        index = coll.posts.find_one({'word': word})['index']
    except:
        pass
    return index

def get_error(message):
    return json.dumps({'error': message})

def prediction_from_sequence(sequence, pad_length):
    sequence = sequence.lower()
    sequence_indexes = []
    for word in sequence.split():
        try:
            index = int(get_word_index(word.strip()))
        except:
            index = 0
        if index is not None:
            sequence_indexes.append(index)
    sequence_indexes = pad_sequences([sequence_indexes], maxlen=pad_length)
    sample = np.array(sequence_indexes)
    prediction = model.predict(sample, verbose = 1)
    prediction_labels = ['toxic', 'severe_toxic', 'obscene', 'threat', 'insult', 'identity_hate']
    prediction_results = str({prediction_labels[0]: prediction[0][0],
                              prediction_labels[1]: prediction[0][1],
                              prediction_labels[2]: prediction[0][2],
                              prediction_labels[3]: prediction[0][3],
                              prediction_labels[4]: prediction[0][4],
                              prediction_labels[5]: prediction[0][5]
                            })
    return prediction_results

What’s going on? Well, it’s an extension of code I’ve detailed in earlier parts of this series. However, there are a couple of new pieces.

First, we are connecting to our MongoDB database containing the pre-trained word embeddings. This database is used to look up the index of each word sent to our service endpoint.

The only route in this server is a POST service. It takes one argument: sequence. The sequence is the text the webservice consumer would like to have analyzed for toxic content. The endpoint calls prediction_from_sequence(). Inside the function, the word indexes are pulled from the word_embeddings database. After that, the newly converted sequence is padded to the required length of 100. Then, this sequence is passed to our CNN, which makes the prediction. Lastly, the prediction is converted to a string and returned to the caller.
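
To make the request shape concrete, here's a quick sketch of calling the endpoint from Python with the requests library; it assumes the service is running locally on Flask's default port (5000).

# Quick sketch: call the /detect-toxic endpoint from Python with requests.
# Assumes the Flask app is running locally on the default port (5000).
import requests

payload = {'sequence': 'im pretty sure you are a super nice guy.'}
response = requests.post('http://127.0.0.1:5000/detect-toxic', json=payload, timeout=30)
print(response.text)  # a dictionary of prediction values, one per toxicity label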

Before we go much further, let’s test the script to make sure it actually works. Still in the flask_app directory, type the following, replacing my_user with your user name and name_of_flask_app.py with the name of your Flask app (here, nn_service.py):

echo "# Flask variables" &>> /home/my_user/.bashrc
echo "export FLASK_APP=name_of_flask_app.py" &>> /home/my_user/.bashrc

This sets the FLASK_APP variable, which flask run uses to find the webservice. Reload your shell (source ~/.bashrc, or log out and back in) so the variable takes effect.

Ok, we should be able to test the app fully now:

flask run

You should be greeted with something similar to:

 * Serving Flask app "nn_service.py"
 * Environment: production
   WARNING: Do not use the development server in a production environment.
   Use a production WSGI server instead.
 * Debug mode: off
Using TensorFlow backend.
Connected successfully.
2019-02-03 15:53:26.391389: I tensorflow/core/platform/cpu_feature_guard.cc:141] Your CPU supports instructions that this TensorFlow binary was not compiled to use: SSE4.1 SSE4.2 AVX AVX2 FMA
2019-02-03 15:53:26.398145: I tensorflow/core/common_runtime/process_util.cc:69] Creating new thread pool with default inter op setting: 2. Tune using inter_op_parallelism_threads for best performance.
 * Running on http://127.0.0.1:5000/ (Press CTRL+C to quit)

Great! We’re on the home stretch.

I’ve prepared a curl statement to test the server. You will need to leave the Flask program running and open a second terminal to your server. When the second terminal is up paste in the following, replacing the “sequence” with something nasty or nice.

curl -X POST \
  http://localhost:5000/detect-toxic \
  -H 'Content-Type: application/json' \
  -d '{"sequence":"im pretty sure you are a super nice guy.","padding": 100}'

You should get back an appropriate response containing a prediction value for each toxicity label.

NodeJS and node-http-proxy

It gets a bit weird here. Usually, one would set up a Flask server with uWSGI or Gunicorn combined with nginx. However, I found the uWSGI middleware was creating two instances of my project, which would not fit in the microserver’s RAM. I spent a lot of time setting the server up the proper way, only to be disheartened when I discovered uWSGI was creating two instances of nn_service.py, thereby attempting to load two copies of the CNN into memory. Our poor server. I gave up on “proper” and went with what I describe below. However, I’ve created a bash script to completely set up a server for you the “proper” way; I’ve added it to the Appendix.

I’ve opted to run Flask and serve it behind a Node.js server acting as a proxy.

[Figure: the neural-net service stack]

Using Node.js for this is atypical, but I found it probably the simplest to set up. So, eh.

Let’s install NodeJS on the server.

sudo yum install -y nodejs

Now move to the directory containing your flask_app and initialize a node project.

cd /home/my_user/flask_app
npm init

You will be prompted for the project details; take your time to fill them out, or skip them by hitting return repeatedly.

Once the project has been set up, let’s install the http-proxy package (the npm package behind the node-http-proxy project). It will allow us to create a proxy server sitting on top of our Flask service in a couple of lines of code.

Still in your project directory:

npm install http-proxy
nano server.js

Inside the server file place:

var http = require('http'),
    httpProxy = require('http-proxy');
httpProxy.createProxyServer({target:'http://localhost:5000'}).listen(8000);

Alright, before testing our Flask webservice we need to open port 8000 and allow HTTP / HTTPS requests through the firewall.

sudo firewall-cmd --permanent --zone=public --add-service=http
sudo firewall-cmd --permanent --zone=public --add-service=https
sudo firewall-cmd --zone=public --add-port=8000/tcp --permanent
sudo firewall-cmd --reload

You can test the whole proxy setup by opening two terminals to your server. In one, navigate to your Flask app and run it:

cd /home/my_user/flask_app
flask run

In the other, navigate to the directory containing server.js and run it:

cd /home/my_user/flask_app
node server.js

Now, you should be able to make a call against the server. This time, run the curl command from your local machine–replacing the my_server_ip with your server’s IP address:

curl -X POST \
  http://my_server_ip:8000/detect-toxic \
  -H 'Content-Type: application/json' \
  -d '{"sequence":"im pretty sure you are a super nice guy.","padding": 100}'

You should get a response exactly like we saw from running the curl command locally.

Daemonize It

The last bit of work to do is create two daemons. One will keep the Flask app running in the background. The other will keep the proxy between the web and the Flask app going.

One caveat before starting: because daemons are started without your shell environment (no PATH, and nothing from .bashrc), all file references must use absolute paths.

At the server’s command prompt type:

sudo nano /etc/systemd/system/nn_service.service

And add the following replacing my_user with your user name:

[Unit]
Description=Flask instance to serve nn_service
After=network.target

[Service]
User=my_user
Group=my_user
WorkingDirectory=/home/my_user/flask_app
Environment=FLASK_APP=nn_service.py
ExecStart=/usr/local/miniconda/bin/flask run

[Install]
WantedBy=multi-user.target

This will create a service. It will run the program pointed to by ExecStart, in our case flask run, inside the directory pointed to by WorkingDirectory. Note that systemd does not read your .bashrc, which is why FLASK_APP is set explicitly with the Environment directive.

Save and exit.

Now, let’s create the nn_service_proxy.service daemon:

sudo nano /etc/systemd/system/nn_service_proxy.service

And enter the following replacing my_user with your user name:

[Unit]
Description=Proxy to Flask instance to serve nn_service
After=network.target

[Service]
User=my_user
Group=my_user
WorkingDirectory=/home/my_user/flask_app
ExecStart=/usr/bin/node /home/my_user/flask_app/server.js

[Install]
WantedBy=multi-user.target

Great! We’re ready to enable and start them.

sudo systemctl enable nn_service.service
sudo systemctl enable nn_service_proxy.service
sudo systemctl start nn_service.service
sudo systemctl start nn_service_proxy.service

Alright, you can now check the system journal to make sure they loaded correctly:

sudo journalctl -xe

But, it should be good. If something goes wrong, definitely ask questions in the comments. Otherwise, we should be ready to test our fully functioning toxic text detection webservice!

curl -X POST \
  http://my_server_ip:8000/detect-toxic \
  -H 'Content-Type: application/json' \
  -d '{"sequence":"im pretty sure you are a super nice guy.","padding": 100}'

Wow! What a journey, right? But pretty damn cool. We now have a webservice which can be called by anyone who wants to check text for toxic sentiment. I didn’t have an application in mind when starting this project, but I’m learning web scraping with a friend, and I think it’ll be great to pass text off to this webservice and have it flagged if it contains nasty content.

“Proper” Flask Webservice Setup

I’ve written a script to set up the webservice for you. First, you will need to be logged into your CentOS 7 server as root.

Then type:

yum install -y wget
wget http://ladvien.com/assets/centos_nn_webservice.sh
chmod +x centos_nn_webservice.sh

What this script does:

  1. Sets up a new user.
  2. Adds Miniconda to the PATH variable.
  3. Adds Flask environment variables (needed to run the app).
  4. Updates the server.
  5. Creates the flask_app directories.
  6. Opens the needed ports.
  7. Installs nginx.
  8. Creates an nginx .conf file with the information to proxy the uwsgi service.
  9. Installs uwsgi and creates a .ini file for wrapping the Flask app.
  10. Creates and enables a uwsgi daemon.
  11. Creates and enables a Flask daemon.
  12. Installs Miniconda and TensorFlow, and sets Python to 3.6.8.
  13. Installs MongoDB.
  14. Enables remote editing from VSCode.

We’re about to execute the script, but there’s a critical step I wanted to explain first. The script is going to take several commandline arguments. If these are wrong, it’ll royally jack up your server.

./centos_nn_webservice.sh user_name user_password flask_app_name flask_port
  • user_name This will be the user who provides the webservice
  • user_password The user’s password. You’ll need this to ssh into the server as this user.
  • flask_app_name This is the name of your app. Everything from the Python script to the daemon will be labeled with this name.
  • flask_port This is the port which will be exposed to the web.

Ok, replace all of the above commandline arguments with the ones you prefer and execute it. Cross your fingers or yell at me in the comments.

Preparing a Small Server for a Neural Network Webservice

Previously, I wrote about training a CNN to detect toxic comments from text alone. But, I realized, even if one has a nice little NN to solve all the world’s problems it doesn’t help unless it is in production.

This article is going to cover how to prepare a server and needed word embeddings to mechanize the NN in a Flask webservice.

Server Setup: Preamble

For this project I’m using a small server from Linode–called a “Nanode.” At the time of writing these servers are only $5 a month. The catch? They only have 1GB of RAM. It’s definitely going to be tricky to deploy our CNN there, but let’s see it through.

  • https://www.linode.com/pricing

As for setting up the server, I’ve written about it elsewhere.

For this particular project, I decided to go with a CentOS 7 distribution.

For those of you who know me: I’m not betraying Arch Linux. However, this project will be using MongoDB, and there’s a bit of licensing drama going on. I will leave some Arch Linux instructions in the Appendix, in case it is ever resolved.

I chose CentOS because it is the distro we use at work and I hoped to get some experience using it.

Setup User on CentOS

Log in as root and update the system:

yum update -y

Let’s add another user; setting up the system as root is not a best practice.

useradd my_user
passwd my_user

Set the password for my_user when prompted.

Now, let’s give my_user sudo powers:

EDITOR=nano visudo

Find the line with:

root    ALL=(ALL)    ALL

And add the exact same entry for my_user. It should look like this when done:

root    ALL=(ALL)    ALL
my_user    ALL=(ALL)    ALL

Save the file and exit.

Let’s log in as our new user. Exit your shell and log back in as my_user. It should look something like this, typed on your local computer’s command line:

ssh my_user@server_ip_address

Once logged in, let’s test my_user’s sudo powers:

sudo ls

If you are greeted with:

We trust you have received the usual lecture from the local System
Administrator. It usually boils down to these three things:

    #1) Respect the privacy of others.
    #2) Think before you type.
    #3) With great power comes great responsibility.

[sudo] password for my_user: 

Then task complete! Otherwise, feel free to ask questions in the comments.

Setup Miniconda on CentOS

Anaconda is a great package system for Python data analysis tools. It takes care of a lot of silly stuff. Miniconda is the commandline version of Anaconda, which is what we will be using.

Install it by entering the following and agreeing to the terms.

sudo yum install -y wget bzip2
wget https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh
chmod +x Miniconda3-latest-Linux-x86_64.sh
./Miniconda3-latest-Linux-x86_64.sh
source .bashrc

Side note here: if you install Miniconda and have trouble executing conda, most likely the installer didn’t add the executable path to your PATH variable.

This should add the path for both your user and root:

echo "export PATH='/usr/local/miniconda/bin:$PATH'" &>> /home/my_user/.bashrc
echo "export PATH='/usr/local/miniconda/bin:$PATH'" &>> /root/.bashrc

You will need to make sure to reload your shell (log out and back in or run source .bashrc) after adding the conda path.

As of this writing TensorFlow only supports Python as late as 3.6, while Miniconda sets up your environment to use 3.7. To rectify this we can pin Python to 3.6.8 using conda.

conda install -y -vv python=3.6.8

Also, we need to install a few Python packages.

conda install -y -vv tensorflow scikit-learn keras pandas

Ok, one last important step: Reboot and log back in.

sudo reboot now

Create MongoDB Tokenizer Collection

Here’s where we get clever. We are trying to fit our model into less than 1GB of RAM. To do this, we need a way to access the word-embeddings’ index2word and word2idx lookup objects without loading them entirely into RAM, like we did in training. We are going to shove them into a database, so a word is only loaded into RAM when it is needed.

Disk access is slower, but hey! I don’t want to pay $40 a month for a hobby server, do you?
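
To make the idea concrete, here's a rough sketch of what a single stored word looks like and how it gets pulled back out on demand. The database and collection names match the export script further down; treat the rest as illustrative.

# Illustrative sketch: one stored word-embedding document and an on-demand lookup.
# Database and collection names match the export script below.
import pymongo

client = pymongo.MongoClient('127.0.0.1', 27017)
coll = client['word_embeddings']['glove-wiki-gigaword-50']

# Each document holds the word, its tokenizer index, and its vector (as strings):
# {'word': 'the', 'index': 1, 'vector': ['0.418', '0.24968', ...]}
document = coll.posts.find_one({'word': 'the'})
if document:
    print(document['index'])  # only this single document is read into RAM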

Moving the word-embeddings will take a few steps. First, we’ll run a Python script to save the embeddings matching the context of our original training. Then, we will export those embeddings from our local MongoDB. Next, we’ll move them to the remote server and import them into the MongoDB there. Simple!

Install MongoDB Locally

To create the local word-embedding databases we will need to install MongoDB locally. This could vary based upon your OS. I’ve used homebrew to install on the Mac.

  • https://brew.sh/

Here are instructions on installing MongoDB on the Mac:

Don’t forget you’ll need to start the MongoDB service before starting the next step.

On the Mac, using Homebrew, it can be started with:

brew services start mongodb

Create a Word Embedding Database

Once you’ve installed it locally, here’s the script I used to convert the word embeddings into a MongoDB database. It loads the word embeddings using gensim, tokenizes the training text the same way we did during training, and saves each word’s index and vector to MongoDB.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Tue Jan 22 05:19:35 2019
@author: cthomasbrittain
"""
import pymongo
import gensim.downloader as api
import pandas as pd
from keras.preprocessing.text import Tokenizer

# Convenience Macros
word_embedding_name = "glove-wiki-gigaword-50"

BASE_DIR = '/path/to/embeddings/'  # note the trailing slash
TRAIN_TEXT_DATA_DIR = BASE_DIR + 'train.csv'
MAX_NUM_WORDS = 20000

# Load embeddings
info = api.info() # show info about available models/datasets
embedding_model = api.load(word_embedding_name) # download the model and return as object ready for use

vocab_size = len(embedding_model.vocab)

index2word = embedding_model.index2word
word2idx = {}
for index in range(vocab_size):
    word2idx[embedding_model.index2word[index]] = index
    
# Get labels
print('Loading Toxic Comments data.')
with open(TRAIN_TEXT_DATA_DIR) as f:
    toxic_comments = pd.read_csv(TRAIN_TEXT_DATA_DIR)

# Convert Toxic Comments to Sequences
print('Processing text dataset')

tokenizer = Tokenizer(num_words=MAX_NUM_WORDS)
tokenizer.fit_on_texts(toxic_comments['comment_text'].fillna("DUMMY_VALUE").values)
sequences = tokenizer.texts_to_sequences(toxic_comments['comment_text'].fillna("DUMMY_VALUE").values)
word_index = tokenizer.word_index

# Save Embeddings to MongoDB
mong = pymongo.MongoClient('127.0.0.1', 27017)

# Create collection database
mongdb = mong["word_embeddings"]

# Create this word_embeddings 
coll = mongdb[word_embedding_name]

for i, word in enumerate(index2word):
    if i % 1000 == 0:
        print('Saved: ' + str(i) + ' out of ' + str(len(index2word)))
    try:
        embedding_vector = list(map(str, embedding_model.get_vector(word)))
        post = {
                'word': word,
                'index': word_index[word],
                'vector': list(embedding_vector)
         }
        posts = coll.posts
        post_id = posts.insert_one(post).inserted_id
    except:
        continue

One note here: you could point the script directly at your remote database. However, I found saving the >2 GB of entries one at a time across a 38.8bps SSH connection took most of the day. So, I’ve opted to create them locally and then copy them over in bulk.

Install MongoDB Remote Server

MongoDB’s license has some strict redistribution clauses. Most distros no longer include it in their package repos. However, MongoDB maintains several distro repos of their own–luckily, RHEL and CentOS are included. But not Arch Linux? Really? :|

Ok, to install MongoDB from MongoDB’s own repo we need to add it to the local repo list.

We can create the file by typing:

sudo nano /etc/yum.repos.d/mongodb-org-4.0.repo

One word of caution: the following repo definition was copied from the MongoDB website.

It’s probably best to copy the repo information directly from MongoDB’s installation docs, in case there is a newer version.

Or, here’s what I put in the file:

[mongodb-org-4.0]
name=MongoDB Repository
baseurl=https://repo.mongodb.org/yum/redhat/$releasever/mongodb-org/4.0/x86_64/
gpgcheck=1
enabled=1
gpgkey=https://www.mongodb.org/static/pgp/server-4.0.asc

Save the file.

Run

sudo yum install -y mongodb-org

Yum should now find the private repo and install MongoDB.

Setup MongoDB

We need to enable the mongod.service.

sudo systemctl enable mongod.service

And reboot

sudo reboot now

I’ll be setting up MongoDB for local access only. This lets it be accessed by our Flask program, but not remotely, which is a best practice for securing your server. However, if you’d like to enable remote access to MongoDB, I’ve included instructions in the Appendix.

Move the Model to Server

Since we trained the model locally, let’s move it to the server. Open your terminal in the directory where the model was stored.

scp toxic_comment_detector.h5 my_user@my_server_ip:/home/my_user

Replace my_user with the user name we created earlier and my_server_ip with the address of your server. It should then prompt you to enter the server password, as if you were ssh’ing into the server. Once entered, the model should be copied to the server.

Move word_embeddings Database to Server

Once you’ve created the local word_embeddings DB, type the following at your local terminal to make a copy:

mongodump --out /directory_to_save

Now, copy this DB backup to your remote server

scp -r /directory_to_save/name_of_output_folder user_name@remote_ip_address:/home/user_name/

Now, log in to your remote server and create a DB from the data dumps.

mongorestore --db word_embeddings /home/user_name/word_embeddings

We also need to restart the MongoDB service

sudo systemctl restart mongod.service

If you would like to enable access to the database remotely (see instructions in Appendix) you could use Robo3T to make sure everything is in place. But if you didn’t get any errors, we’re probably good to go.
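
Even without Robo3T, a couple of lines of pymongo run on the server give a rough sanity check that the restore landed; a sketch, assuming the collection name from the export script:

# Rough sanity check of the restored database (run on the remote server).
import pymongo

client = pymongo.MongoClient('127.0.0.1', 27017)
coll = client['word_embeddings']['glove-wiki-gigaword-50']

print(coll.posts.count_documents({}))        # should be roughly the vocabulary size
print(coll.posts.find_one({'word': 'the'}))  # should return a word document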

Test the Model

Log into your server. We are going to test the model, since it needs to fit in the RAM available. The my_user in the script should be replaced with the user name you created while setting up your server and project.

Type:

python

Now, enter the following into the Python interpreter.

from keras.models import load_model
model = load_model('/home/my_user/toxic_comment_detector.h5')

If all goes well it will mention it’s using the Tensorflow backend and return you to the interpreter prompt.

If you trained your network like me, then the following will allow you to fully test the model deployed remotely.

import numpy as np
test_prediction = np.array([[0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0, 1873,147,6, 3476,324, 15, 29,141]])
model.predict(test_prediction)

If you get back something similar to:

array([[0.97645617, 0.21598859, 0.92201746, 0.01897666, 0.7753273,
0.11565485]], dtype=float32)

We’re in good shape. These are the predictions for the following labels, respectively:

["toxic", "severe_toxic", "obscene", "threat", "insult", "identity_hate"]

The test_prediction was the following text sequence pre-encoded.

"C*#%`SUCKER BEFORE YOU PISS AROUND ON MY WORK"

So, the toxic and obscene labels should definitely be close to 1. Looks like we’re in good shape.
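
If you'd rather see the labels paired with the numbers, a one-liner in the same interpreter session does it (labels taken from the training article):

# Pair each label with its predicted probability (same interpreter session).
labels = ["toxic", "severe_toxic", "obscene", "threat", "insult", "identity_hate"]
print(dict(zip(labels, model.predict(test_prediction)[0])))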

In the next article I’ll show how to create a Flask webservice to access the model. Well, at least I hope, not sure how to do that yet.

Appendix

Arch Linux Miniconda Setup

sudo pacman -Syu
sudo pacman -S git wget tk valgrind gcc make
useradd -m user_name
passwd user_name
EDITOR=nano visudo
(add user_name to sudo)
su user_name

wget https://repo.continuum.io/miniconda/Miniconda3-latest-Linux-x86_64.sh
bash Miniconda3-latest-Linux-x86_64.sh
source .bashrc
conda install keras h5py pillow flask numpy gensim pandas scikit-learn matplotlib
conda install tensorflow=1.8

Setup MongoDB on Arch Linux

Apparently MongoDB’s license change means the Arch Linux official repos cannot distribute it. So, we have to compile from source. Waaawaaah.

Note, it took more than 1GB of RAM to compile from source.

  • https://lists.archlinux.org/pipermail/arch-dev-public/2019-January/029430.html
  • https://techcrunch.com/2018/10/16/mongodb-switches-up-its-open-source-license/
sudo pacman -S fakeroot automake autoconf gcc make snappy \
            yaml-cpp lsb-release  gperftools \
            libstemmer scons python2-setuptools python2-regex \
            python2-cheetah python2-typing python2-requests \
            python2-yaml python2-pymongo 
git clone https://aur.archlinux.org/wiredtiger.git
cd wiredtiger
makepkg -i
git clone https://aur.archlinux.org/mongodb.git
cd mongodb
makepkg -i

Enabling Remote Access to MongoDB

To enable remote connections edit the mongod.conf file:

sudo nano /etc/mongod.conf

Find the following lines in the file and comment out bindIp.

Your file should look like this:

# network interfaces
net:
  port: 27017
  #bindIp: 127.0.0.1  # Enter 0.0.0.0,:: to bind to all IPv4 and IPv6 addresses or, alternatively, us$

This allows us to connect to the MongoDB from any IP address. If we’d left this line, then we could only connect to the database from within the server itself (127.0.0.1 = local).

Monitoring System Resources

I like using htop for this, but on CentOS you’ve got to pull it from the EPEL repo:

wget dl.fedoraproject.org/pub/epel/7/x86_64/Packages/e/epel-release-7-11.noarch.rpm
sudo rpm -ihv epel-release-7-11.noarch.rpm
sudo yum install -y htop

Training a Toxic Comment Detector

I’m writing learning-notes from implementing a “toxic comment” detector using a convolutional neural network (CNN). This is a common project across the interwebs, however, the articles I’ve seen on the matter leave a few bits out. So, I’m attempting to augment public knowledge–not write a comprehensive tutorial.

A common omission is what the data look like as they travel through pre-processing. I’ll try to show how the data look before falling into the neural-net black-hole. However, I’ll stop short before reviewing the CNN setup, as this is explained much better elsewhere. Though, I’ve put all the original code, relevant project links, tutorial links, and other resources towards the bottom.

The Code

Code: Imports

from __future__ import print_function

import numpy as np
from keras.preprocessing.text import Tokenizer
from keras.preprocessing.sequence import pad_sequences
from keras.layers import Dense, Input, GlobalMaxPooling1D, Conv1D, Embedding, MaxPooling1D
from keras.models import Model
from keras.initializers import Constant
import gensim.downloader as api
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.metrics import roc_auc_score

The above code includes several packages which would need to be downloaded. The easiest way is to use pip.

pip install keras
pip install gensim
pip install pandas

Code: Variables

BASE_DIR = 'your project directory'
TRAIN_TEXT_DATA_DIR = BASE_DIR + 'train.csv'
MAX_SEQUENCE_LENGTH = 100
MAX_NUM_WORDS = 20000
EMBEDDING_DIM = 300
VALIDATION_SPLIT = 0.2

The above variables define the preprocessing actions and the neural-network.

TRAIN_TEXT_DATA_DIR

The directory containing the data file train.csv

MAX_SEQUENCE_LENGTH

The toxic_comment data set contains comments collected from Wikipedia. MAX_SEQUENCE_LENGTH is used in the preprocessing stages to truncate a comment if it is too long, that is, longer than MAX_SEQUENCE_LENGTH. For example, a comment like:

You neeed to @#$ you mother!$@#$&...

Probably doesn’t need much more for the network to discern it’s a toxic comment. Also, if we create the network based around the longest comment, it will become unnecessarily large and slow. Much like the human brain (See Overchoice), we need to provide as little information as needed to make a good decision.

MAX_NUM_WORDS

This variable is the maximum number of words to include–or, vocabulary size.

Much like truncating the sequence length, the maximum vocabulary should not be overly inclusive. The number 20,000 comes from a “study” stating an average person only uses 20,000 words. Of course, I’ve not found a primary source stating this–not saying it’s not out there, but I’ve not found it yet. (Halfhearted search results in the appendix.)

Regardless, it seems to help us justify keeping the NN nimble.

EMBEDDING_DIM

In my code, I’ve used gensim to download pre-trained word embeddings. But beware, not all pre-trained embeddings have the same number of dimensions. This variable defines the size of the embeddings used. Please note, if you use embeddings other than glove-wiki-gigaword-300 you will need to change this variable to match.

VALIDATION_SPLIT

This percentage represents how much of the data to hold back for validation; we split the data into training and validation sets ourselves toward the end of the script.

Code: Load Embeddings

print('Loading word vectors.')
# Load embeddings
info = api.info()
embedding_model = api.load("glove-wiki-gigaword-300")

The info object describes the gensim embeddings and corpora available for download. You can use any of the listed embeddings in the format api.load('name-of-desired-embedding'). One nice feature of gensim’s api.load is it will automatically download the embeddings from the Internet and load them into Python. Of course, once they’ve been downloaded, gensim will load the local copy. This makes it easy to experiment with different embedding layers.
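
If you're curious what else is available, the info object can be poked at directly; a small sketch:

# Peek at the embeddings gensim can download (names only).
import gensim.downloader as api

info = api.info()
print(list(info['models'].keys()))  # includes 'glove-wiki-gigaword-300', among others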

Code: Process Embeddings

index2word = embedding_model.index2word
vocab_size = len(embedding_model.vocab)
word2idx = {}
for index in range(vocab_size):
    word2idx[index2word[index]] = index

The two lookup objects, index2word and word2idx, are key to using the embeddings.

The word2idx is a dictionary where the keys are the words contained in the embedding and the values are the integers they represent.

word2idx = {
    "the": 0,
    ",": 1,
    ".": 2,
    "of": 3,
    "to": 4,
    "and": 5,
    ....
    "blah": 12984,
    ...
}  

index2word is a list where the values are the words, and a word’s position in the list is its index in word2idx.

index2word = ["the", ",", ".", "of", "to", "and", ...]

These will be used to turn our comment strings into integer vectors.
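
As a quick illustration of that conversion, here is a sketch that encodes a sentence with the word2idx built above; for simplicity, words missing from the embedding vocabulary are skipped.

# Sketch: turn a raw comment into embedding indexes using word2idx.
# Words missing from the embedding vocabulary are simply skipped here.
comment = "The tools are great"
indexes = [word2idx[word] for word in comment.lower().split() if word in word2idx]
print(indexes)  # a list of integers, starting with 0 for "the"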

After this bit of code we should have three objects.

  1. embedding_model – Pre-trained relationships between words; a 400,000 x 300 matrix (400,000 words, 300 dimensions each).
  2. index2word – A list containing all the words. A word’s position in the list corresponds to its row in the word embeddings.
  3. word2idx – A dictionary of key-value pairs, the key being the word as a string and the value being the integer representing the word. Note, these integers correspond with the indexes in the embedding_model. Essentially, the reverse of index2word.

Code: Get Toxic Comments Labels

print('Loading Toxic Comments data.')
with open(TRAIN_TEXT_DATA_DIR) as f:
    toxic_comments = pd.read_csv(TRAIN_TEXT_DATA_DIR)

print('Getting Comment Labels.')
prediction_labels = ["toxic", "severe_toxic", "obscene", "threat", "insult", "identity_hate"]
labels = toxic_comments[prediction_labels].values

This loads train.csv as a Pandas dataframe called toxic_comments. We then grab all of the comment labels using their column names. This becomes a second numpy matrix called labels.

We will use the text in the toxic_comments dataframe to predict the data found in the labels matrix. That is, toxic_comments will be our x_train and labels our y_train.

You may notice, the labels are also included in our toxic_comments. But they will not be used, as we will only be taking the comment_text column to become our sequences here in a moment.

toxic_comments dataframe

  id comment_text toxic severe_toxic obscene threat insult identity_hate
5 00025465d4725e87 Congratulations from me as well, use the tools well. · talk 0 0 0 0 0 0
6 0002bcb3da6cb337 COCKSUCKER BEFORE YOU PISS AROUND ON MY WORK 1 1 1 0 1 0
7 00031b1e95af7921 Your vandalism to the Matt Shirvington article has been reverted. Please don’t do it again, or you will be banned. 0 0 0 0 0 0

labels (y_train) numpy matrix

0 1 2 3 4 5
0 0 0 0 0 0
1 1 1 0 1 0
0 0 0 0 0 0
0 0 0 0 0 0

Code: Convert Comments to Sequences

print('Tokenizing and sequencing text.')

tokenizer = Tokenizer(num_words=MAX_NUM_WORDS)
tokenizer.fit_on_texts(toxic_comments['comment_text'].fillna("<DT>").values)
sequences = tokenizer.texts_to_sequences(toxic_comments['comment_text'].fillna("<DT>").values)
word_index = tokenizer.word_index

print('Found %s sequences.' % len(sequences))

The Tokenizer object comes from the Keras API. It takes chunks of text, cleans them, and then converts the words to unique integer values.

The num_words argument tells the Tokenizer to keep only the num_words most frequent words. This makes it necessary to run fit() on the targeted texts before using the Tokenizer. The fit function will determine the number of occurrences each word has throughout all the texts provided, then it will order these by frequency. This frequency rank can be found in the tokenizer.word_index property.

For example, looking at the dictionary below, if num_words = 7 all words after “i” would be excluded.

{
    "the": 1,
    "to": 2,
    "of": 3,
    "and": 4,
    "a": 5,
    "you": 6,
    "i": 7,
    "is": 8,
    ...
    "hanumakonda": 210334,
    "956ce": 210335,
    "automakers": 210336,
    "ciu": 210337
}
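
To see fit_on_texts and the frequency ranking in action on something tiny, here's a toy sketch (not the toxic-comment data):

# Toy illustration of Tokenizer frequency ranking (not the real data).
from keras.preprocessing.text import Tokenizer

toy_tokenizer = Tokenizer(num_words=3)
toy_tokenizer.fit_on_texts(["the cat sat", "the cat ran", "the dog ran"])
print(toy_tokenizer.word_index)
# roughly {'the': 1, 'cat': 2, 'ran': 3, 'sat': 4, 'dog': 5} -- most frequent first
print(toy_tokenizer.texts_to_sequences(["the dog sat"]))
# [[1]] -- with num_words=3 only the top-ranked words survive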

Also, as we are loading the data, we are filling any missing values with a dummy token (i.e., “<DT>”). This probably isn’t the best way to handle missing values, however, given the amount of data, it’s probably best to try and train the network using this method. Then, come back and handle na values more strategically. Diminishing returns and all that.

Code: Padding

data = pad_sequences(sequences, maxlen=MAX_SEQUENCE_LENGTH)

This is an easy one. It pads our sequences so they are all the same length. The pad_sequences function is part of the Keras library. A couple of important arguments have default values: padding and truncating.

Here’s the Keras docs explanation:

padding: String, ‘pre’ or ‘post’: pad either before or after each sequence.

truncating: String, ‘pre’ or ‘post’: remove values from sequences larger than maxlen, either at the beginning or at the end of the sequences.

Both arguments default to pre.

Lastly, the maxlen argument controls where padding and truncation happen. And we are setting it with our MAX_SEQUENCE_LENGTH variable.

[Figure: sequences before and after padding and truncating]
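
A toy example makes the defaults easy to see; both padding and truncating happen at the front of the sequence:

# Toy example of pad_sequences defaults (pre-padding, pre-truncating).
from keras.preprocessing.sequence import pad_sequences

print(pad_sequences([[1, 2, 3], [1, 2, 3, 4, 5, 6, 7]], maxlen=5))
# [[0 0 1 2 3]     <- the short sequence is padded at the front
#  [3 4 5 6 7]]    <- the long sequence is truncated from the front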

Code: Applying Embeddings

num_words = min(MAX_NUM_WORDS, len(word_index)) + 1
embedding_matrix = np.zeros((num_words, EMBEDDING_DIM))
for word, i in word_index.items():
    try:
        embedding_vector = embedding_model.get_vector(word)
        if embedding_vector is not None:
            embedding_matrix[i] = embedding_vector
    except:
        continue

Here’s where stuff gets good. The code above will take all the words from our tokenizer, look up the word-embedding (vector) for each word, then add this to the embedding matrix. The embedding_matrix will be converted into a keras.layer.Embeddings object.

I think of an Embedding layer as a transformation tool sitting at the top of our neural-network. It takes the integer representing a word and outputs its word-embedding vector. It then passes the vector into the neural-network. Simples!

Probably best to visually walk through what’s going on. But first, let’s talk about the code before the for-loop.

num_words = min(MAX_NUM_WORDS, len(word_index)) + 1

This gets the maximum number of words to be added to our embedding layer. If the tokenizer found fewer words than our “average English speaker’s vocabulary”–20,000–we’ll use all of the words found in our tokenizer. Otherwise, the for-loop will stop after num_words is met. And remember, the tokenizer has kept the words in order of their frequency–so, the words which are lost aren’t as critical.

embedding_matrix = np.zeros((num_words, EMBEDDING_DIM))

This initializes our embedding_matrix, which is a numpy object with all values set to zero. Note, if the EMBEDDING_DIM size does not match the size of the word-embeddings loaded, the code will execute, but you will get a bad embedding matrix. Further, you might not notice until your network isn’t training. I mean, not that this happened to me–I’m just guessing it could happen to someone.

for word, i in word_index.items():
    try:
        embedding_vector = embedding_model.get_vector(word)
        if embedding_vector is not None:
            embedding_matrix[i] = embedding_vector
    except:
        continue

Here’s where the magic happens. The for-loop iterates over the words in the tokenizer object word_index. It attempts to find the word in word-embeddings, and if it does, it adds the vector to the embedding matrix at a row respective to its index in the word_index object.

Confused? Me too. Let’s visualize it.

Let’s walk through the code with a word in mind: “of”.

for word, i in word_index.items():

By now the for-loop is two words in. The words “the” and “to” have already been added. Therefore, for this iteration word = ‘of’ and i = 2.

embedding_vector = embedding_model.get_vector(word)

The word-embedding for the word "of" is

-0.076947, -0.021211, 0.21271, -0.72232, -0.13988, -0.12234, ...

This list is contained in a numpy.array object.

embedding_matrix[i] = embedding_vector

Lastly, the word-embedding vector representing “of” gets added to the third row of the embedding matrix (the matrix index starts at 0).

Here’s how the embedding matrix should look after the word “of” is added. (The first column added for readability.)

word 1 2 3 4
the 0 0 0 0
to 0.04656 0.21318 -0.0074364 -0.45854
of -0.25756 -0.057132 -0.6719 -0.38082

Also, for a deep visualization, check the image above. The picture labeled “word embeddings” is actually the output of our embedding_matrix. The big difference? The word vectors in the gensim embedding_model which are not found anywhere in our corpus (all the text contained in the toxic_comments column) have been replaced with all zeroes.

[Figure: the embedding matrix]

Code: Creating Embedding Layer

embedding_layer = Embedding(len(word2idx),
                            EMBEDDING_DIM,
                            embeddings_initializer=Constant(embedding_matrix),
                            input_length=MAX_SEQUENCE_LENGTH,
                            trainable=False)

Here we are creating the first layer of our NN. The primary parameter passed into the Keras Embedding class is the embedding_matrix, which we created above. However, there are several other attributes of the embedding_layer we must define. Keep in mind our embedding_layer will take an integer representing a word as input and output a vector, which is the word-embedding.

First, the embedding_layer needs to know the input dimensions. The input dimension is the number of words we are considering for this training session. This can be found by taking the length of our word2idx object. So, len(word2idx) returns the total number of words to consider.

One note on the layer’s input: there are two “input” arguments for the keras.layers.Embedding class initializer, which can be confusing. They are input_dim and input_length. The input_dim is the number of possible values provided to the layer (the vocabulary size). The input_length is how many values will be passed in each sequence.

Here are the descriptions from the Keras documentation:

input_dim

int > 0. Size of the vocabulary, i.e. maximum integer index + 1.

input_length

Length of input sequences, when it is constant. This argument is required if you are going to connect Flatten then Dense layers upstream (without it, the shape of the dense outputs cannot be computed).

In our case, input_dim will be the vocabulary size and input_length is the number of words in a sequence, which should be MAX_SEQUENCE_LENGTH. This is also why we padded comments shorter than MAX_SEQUENCE_LENGTH, as the embedding layer will expect a consistent size.

Next, the embedding_layer needs to know the dimensions of the output. The output is going to be a word-embedding vector, which should be the same size as the word embeddings loaded from the gensim library. We defined this size with the EMBEDDING_DIM variable.

Lastly, the trainable option is set to False so the word-embedding relationships are not updated as we train our toxic_comment detector. You could set it to True, but come on, let’s be honest, are we going to be doing better than Google?
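
A quick way to convince yourself the layer does what we claim is to wrap it in a throwaway model and push a dummy batch through it; a sketch, reusing the variables defined above:

# Sanity-check sketch: the embedding layer turns a (batch, sequence) matrix of
# word integers into a (batch, sequence, embedding_dim) stack of word vectors.
import numpy as np
from keras.layers import Input
from keras.models import Model

probe_input = Input(shape=(MAX_SEQUENCE_LENGTH,))
probe_model = Model(probe_input, embedding_layer(probe_input))

dummy_batch = np.zeros((2, MAX_SEQUENCE_LENGTH))  # two fake, all-padding comments
print(probe_model.predict(dummy_batch).shape)     # (2, MAX_SEQUENCE_LENGTH, EMBEDDING_DIM)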

Code: Splitting the Data

nb_validation_samples = int(VALIDATION_SPLIT * data.shape[0])
x_train = data[:-nb_validation_samples]
y_train = labels[:-nb_validation_samples]
x_val = data[-nb_validation_samples:]
y_val = labels[-nb_validation_samples:]

Here we are forming our data as inputs. We convert the data into x_train and x_val. The labels matrix becomes y_train and y_val. And here marks the end of pre-processing.

But! Let’s recap before you click away:

  1. Load the word-embeddings. These are pre-trained word relationships, stored as a 400,000 x 300 matrix.
  2. Create two look-up objects: index2word and word2idx.
  3. Get our toxic_comment and labels data.
  4. Create a tokenizer object and fit it on the comment text.
  5. Convert the comment_text column from the toxic_comments dataframe into the sequences list.
  6. Pad all the sequences so they are the same size.
  7. Look up the word-embedding vector for each unique word in sequences. Store the word-embedding vector in the embedding_matrix. If the word is not found in the embeddings, then leave that row all zeroes. Also, limit the embedding matrix to the 20,000 most used words.
  8. Create a Keras Embedding layer from the embedding_matrix.
  9. Split the data for training and validation.

And that’s it. The prepared embedding_layer will become the first layer in the network.

Code: Training

Like I stated at the beginning, I’m not going to review training the network, as there are many better explanations–and I’ll link them in the Appendix. However, for those interested, here’s the rest of the code.

input_ = Input(shape=(MAX_SEQUENCE_LENGTH,))
x = embedding_layer(input_)
x = Conv1D(128, 5, activation='relu')(x)
x = MaxPooling1D(5)(x)
x = Conv1D(128, 5, activation='relu')(x)
x = MaxPooling1D(5)(x)
x = Conv1D(128, 3, activation='relu')(x)
x = GlobalMaxPooling1D()(x)
x = Dense(128, activation='relu')(x)
output = Dense(len(prediction_labels), activation='sigmoid')(x)
model = Model(input_, output)
model.compile(loss='binary_crossentropy',
              optimizer='rmsprop',
              metrics=['acc'])

print('Training model.')
# happy learning!
history = model.fit(x_train, y_train, epochs=2, batch_size=512, validation_data=(x_val, y_val))

Oh! There’s one more bit I’d like to go over, which most other articles have left out. Prediction.

Code: Predictions

I mean, training a CNN is fun and all, but how does one use it? Essentially, it comes down to repeating the steps above, but with less data.

def create_prediction(model, sequence, tokenizer, max_length, prediction_labels):
    # Convert the sequence to tokens and pad it.
    sequence = tokenizer.texts_to_sequences(sequence)
    sequence = pad_sequences(sequence, maxlen=max_length)

    # Make a prediction
    sequence_prediction = model.predict(sequence, verbose=1)

    # Take only the first of the batch of predictions
    sequence_prediction = pd.DataFrame(sequence_prediction).round(0)

    # Label the predictions
    sequence_prediction.columns = prediction_labels
    return sequence_prediction

# Create a test sequence
sequence = ["""
            Put your test sentence here.
            """]
prediction = create_prediction(model, sequence, tokenizer, MAX_SEQUENCE_LENGTH, prediction_labels)

The function above needs the following arguments:

  • The pre-trained model. This is the Keras model we just trained.
  • A sequence you’d like to determine whether it is “toxic”.
  • The tokenizer, which is used to encode the prediction sequence the same way as the training sequences.
  • max_length must be the same as the maximum size of the training sequences
  • The prediction_labels are a list of strings containing the human readable labels for the predicted tags (e.g. “toxic”, “severe_toxic”, “insult”, etc.)

Really, the function takes all the important parts of our pre-processing and reuses them on the prediction sequence.

One piece of the function you might tweak is the .round(0). I’ve put this there to convert the predictions into binary. That is, if the prediction for a sequence is .78, it is rounded up to 1. This is due to the binary nature of the prediction: either a comment is toxic or it is not. Either 0 or 1.
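
If you'd rather tune that cutoff instead of plain rounding, the same idea works with an explicit threshold; a sketch, with an arbitrary 0.5 cutoff:

# Sketch: apply an explicit decision threshold instead of .round(0).
# The 0.5 cutoff is arbitrary; tune it per label if you need to.
import pandas as pd

def label_predictions(raw_predictions, prediction_labels, threshold=0.5):
    frame = pd.DataFrame(raw_predictions, columns=prediction_labels)
    return (frame >= threshold).astype(int)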

Well, that’s what I got. Thanks for sticking it out. Let me know if you have any questions.

Appendix

Full Code

Tutorials

If you want to know more about gensim and how it can be used with Keras.

Data

The data are hosted by Kaggle.

Please note, you will have to sign-up for a Kaggle account.

Average Person’s Vocabulary Size

Primary sources on vocabulary size:

Distributing Machine Learning Jobs

Boss

A human sends a machine learning job to the Boss. A Job is a JSON object containing the desired machine learning script and the parameters needed for successful execution. The Boss stores the Job and creates an Order. The Order is another JSON object representing the state of a requested Job.

         Job #4
 0                        Boss
/|\ +----------------->   ____
/ \                       +""+
                          +__+
                         [ ==.]`)
                   +----+====== 0 +--+
                   +                 |
                Order #3           Job #3
                   |                 |
                Order #2           Job #2
                   |                 |
                Order #1           Job #1

Worker

The Worker uses node-schedule to fire an HTTP request to the Boss letting it know the Worker is “bored.” The Boss will then search through the Orders for the oldest unassigned Order; if it finds one, it will return this Order to the Worker as a JSON object. At this point, the Boss updates the Order’s status to “assigned.”

The Worker sends another HTTP request, this time requesting the Job information associated with the Order the Boss had assigned.

          Boss
          ____
          +""+
          +__+
         [ ==.]`)
   +----+====== 0 +--+
   +                 +            If the Boss finds an unassigned
Order #3           Job #3         Order it is returned. The worker requests the
   +                 +            related Job. The Boss updates the
Order #2           Job #2         the Order status to "assigned"
   +                 +                   Worker
Order #1           Job #1<-+              ____
  ^                        +----------->  +""+
  |                                       +__+
  +------------------------------------+ [ ==.]`)
          The worker checks with
          the boss periodically
          for the oldest submitted
          Order.

The Worker passes the Job information into the appropriate machine learning Python script as a command-line argument. The script is executed and, whether successful or not, an Outcome object is passed back to the Worker node through stdout. A minimal sketch of such a script follows the diagram below.

Worker
 ____
 +""+     Job #1
 +__+ +--------------->  Python Script
[ ==.]                         +
  ^                            |
  |                            |
  |                            v
  +------------------------ Outcome #1
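
Here's a rough sketch of what a Worker-compatible Python script could look like; the field names are illustrative, what matters is the contract of taking the Job as a JSON string in argv[1] and printing the Outcome as the last line of stdout.

#!/usr/bin/env python3
# Sketch of a Worker-compatible script. The Job arrives as a JSON string in
# sys.argv[1]; the Outcome is printed to stdout as the final line, which
# python-runner.js parses with JSON.parse(results.pop()).
# Field names are illustrative, not the real nn.py contract.
import json
import sys

job = json.loads(sys.argv[1])

try:
    # ... run the machine learning work described by `job` here ...
    outcome = {'status': 200, 'jobId': job.get('assignmentId'), 'loss': 0.0}
except Exception as err:
    outcome = {'status': 500, 'error': str(err)}

print(json.dumps(outcome))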

The Worker then makes a callback API call and passes the Outcome object to the Boss to be stored in the database.

          Boss                                Worker
          ____                                 ____
          +""+                                 +""+
          +__+                                 +__+
         [ ==.]`)                             [ ==.]`)
   +----+====== 0 +------+                       +
   |         |           |                       |
Order #3   Job #3     Outcome #1 <---------------+
   |         |
Order #2   Job #2
   |         |
Order #1   Job #1

MongoDB on Mac

brew install mongodb
nano /usr/local/etc/mongod.conf

Your file should look something like this

systemLog:
  destination: file
  path: /usr/local/var/log/mongodb/mongo.log
  logAppend: true
storage:
  dbPath: /usr/local/var/mongodb
net:
  bindIp: 127.0.0.1

Change the dbPath to where you’d like Mongo to store your databases. Then, start and enable Mongo with brew’s services.

brew services start mongo

Sample Objects

Order

{
    "_id" : "5bcc93d67f0b3f4844c87c7a",
    "jobId" : "5bcc93d67f0b3f4844c87c79",
    "createdDate" : ISODate("2018-10-21T14:57:26.980Z"),
    "status" : "unassigned",
}

Job

{
    "_id" : ObjectId("5bcc93d67f0b3f4844c87c79"),
    "hiddenLayers" : [ 
        {
            "activation" : "relu",
            "widthModifier" : 4,
            "dropout" : 0.2
        }, 
        {
            "activation" : "relu",
            "widthModifier" : 2.3,
            "dropout" : 0.2
        }, 
        {
            "activation" : "relu",
            "widthModifier" : 1.3,
            "dropout" : 0.2
        }
    ],
    "dataFileName" : "wine_data.csv",
    "scriptName" : "nn.py",
    "projectName" : "wine_data",
    "depedentVariable" : "con_lot",
    "crossValidateOnly" : true,
    "crossValidationCrossingType" : "neg_mean_squared_error",
    "batchSize" : 100000,
    "epochs" : 3000,
    "patienceRate" : 0.05,
    "slowLearningRate" : 0.01,
    "loss" : "mse",
    "pcaComponents" : -1,
    "extraTreesKeepThreshd" : 0,
    "saveWeightsOnlyAtEnd" : false,
    "optimizer" : "rmsprop",
    "lastLayerActivator" : "",
    "learningRate" : 0.05,
    "l1" : 0.1,
    "l2" : 0.1,
    "minDependentVarValue" : 0,
    "maxDependentVarValue" : 1500,
    "scalerType" : "standard",
}

Outcomes

{
    "_id" : ObjectId("5bcc88fa7f0b3f4844c87c78"),
    "status" : 200,
    "jobId" : "5bcc724d7449f746b5aa6fe8",
    "loss" : 15109.168650257,
    "metric" : 14281.4453526111,
}

Code

Worker

server.js

var express = require('express');
var bodyParser = require('body-parser');
var pythonRunner = require('./preprocessing-services/python-runner');
var schedule = require('node-schedule');
var axios = require('axios');
var fs = require('fs');
var {Worker} = require('./worker/worker');

// Get Worker Node configuration
var fs = require('fs');
var config = JSON.parse(fs.readFileSync('./python-scripts/worker-node-configure.json', 'utf8'));

if(!config) { 
    console.log('No configuration file found.')
    process.exit();
}

// Boss' address
bossAddress = config.bossAddress;
nodeName = config.nodeName;
console.log(`Boss's address is ${bossAddress}`);
console.log(`This worker's name is ${nodeName}`);

var worker = new Worker('bored');

// Start server and add Middleware
var app = express();
const port = 3000;
app.use(bodyParser.json())

// Start checking for Boredom
var j = schedule.scheduleJob('*/1 * * * *', function(){
    if (worker.status === 'bored') {
        console.log('Worker is bored.');
        axios({
            method: 'post',
            url: bossAddress + `/bored/${nodeName}`
        }).then((response) => {
            let orderId = response.data._id
            let jobId = response.data.jobId;
            console.log(`Boss provided jobID #${jobId}`);
            axios({
                method: 'get',
                url: bossAddress + `/retrieve/job/${jobId}`
            }).then((response) => {
                let job = response.data;
                console.log(`Worker found the details for jobID #${jobId}`);
                job.callbackAddress = bossAddress;
                job.assignmentId = orderId;
                pythonRunner.scriptRun(job, worker)
                .then((response) => {
                    console.log('Worker started job, will let Boss know when finished.');
                });
            }).catch((error) => {
                console.log(error);
            });
        }).catch((error) => {
            console.log('Failed to find new job.')
        });
    }
});

// Python script runner interface
app.post('/scripts/run', (req, res) => {
    try {
        let pythonJob = req.body;
        pythonRunner.scriptRun(pythonJob)
        .then((response) => {
            console.log(response);
            res.send(response);
        });
    } catch (err) {
        res.send(err);
    }
});

app.listen(port, () => {
    console.log(`Started on port ${port}`);
});

python-runner.js

let {PythonShell} = require('python-shell')
var fs = require('fs');
var path = require('path');
var axios = require('axios');

var scriptRun = function(pythonJob, worker){
    console.log(worker);
    worker.status = 'busy';
    return new Promise((resolve, reject) => {
        try {
            let callbackAddress = pythonJob.callbackAddress;
            let options = {
                mode: 'text',
                pythonOptions: ['-u'], // get print results in real-time
                scriptPath: path.relative(process.cwd(), 'python-scripts/'),
                args: [JSON.stringify(pythonJob)]
            };
            PythonShell.run(pythonJob.scriptName, options, function (err, results) {
                if (err) throw err;
                try {
                    result = JSON.parse(results.pop());
                    if(result) {
                        console.log(callbackAddress + '/callback')
                        axios({
                            method: 'post',
                            url: callbackAddress + '/callback',
                            data: result
                        }).then((response) => {
                            console.log(`Worker let the Boss know the job is complete.`);
                            worker.status = 'bored';
                        }).catch((error) => {
                            worker.status = 'bored'
                        });
                    } else {
                        worker.status = 'bored'
                    }
                } catch (err) {
                   worker.status = 'bored'
                }
            });
            resolve({'message': 'job started'});
        } catch (err) {
            reject(err)
            worker.status = 'bored'
        }
    });
}
module.exports = {scriptRun}

Boss

server.js

const express = require('express');
const bodyParser = require('body-parser');
const axios = require('axios');
var timeout = require('connect-timeout')

const {mongoose} = require('./backend/database-services/dl-mongo');
const workerNode = require('./backend/services/worker-node');
const work = require('./backend/services/work');

// Database collection
var {Job} = require('./backend/database-services/models/job');
var {Order} = require('./backend/database-services/models/order');


const bossAddress = 'http://maddatum.com'

// Server setup.
var app = express();
const port = 3000;

// Add request parameters.
app.use((req, res, next) => {
    res.setHeader('Access-Control-Allow-Origin', '*');
    res.setHeader('Access-Control-Allow-Headers', 
                  'Origin, X-Requested-With, Content-Type, Accept'); 
    res.setHeader('Access-Control-Allow-Methods', 'GET, POST, PUT, PATCH, DELETE, OPTIONS');
    next();
});

// Add the middleware.
app.use(bodyParser.json())

/*
This route is for creating new Jobs on the queue
*/
app.post('/job/:method', (req, res) => {
    if (!req.body) { return { 'message': 'No request provided.' }};
    try {
        switch (req.params.method) {
            case 'create':
                work.create(req.body)
                .then((response) =>{
                    res.send(response);
                }).catch((error) => {
                    res.send({'error': error })
                });
                break;
            default:
                res.send({'error': 'No method selected.'})
        }
    } catch (err) {
        res.send({'error': 'Error with request shape.', err})
    }
});

/*
This route is for adding new WorkerNodes to the database.
*/
app.post('/worker-node/:method', (req, res) => {
    if (!req.body) { return { 'message': 'No request provided.' }};
    try {
        switch (req.params.method) {
            case 'create':
                workerNode.create(req.body)
                .then((response) =>{
                    res.send(response);
                }).catch((error) =>{
                    res.send({'error': error.message});
                })
            break;
            default:
                res.send({'error': 'No method selected.'});
        }
    } catch (err) {
        res.send({'error': 'Error with request shape.', err })
    }
});

app.post('/callback', (req, res) => {
    if (!req.body) { return res.send({ 'message': 'No request provided.' }); }
    let outcome = req.body;
    console.log(outcome);
    try {
        work.file(outcome)
        .then((response) =>{
            console.log(response);
            res.send(response);
        })
        .catch((error) => {
            res.send({'error': error.message});
        });
    } catch (err) {
        res.send({'error': 'Error with request shape.', err })
    }
});
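
The outcome posted to /callback is whatever the Worker parsed from its Python script’s stdout. For the data-prep script shown later in this article, that payload would be one of these shapes (the path value here is only an example):

// Success and failure payloads, mirroring the result objects the Python
// data-prep script prints before exiting.
const success = { status: 200, message: 'encoded data', path: '/some/write/path/wine_2017_encoded.csv' };
const failure = { status: 400, message: 'Expected script parameters not found.' };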

/*
Route for Worker Node to let the Boss know it needs a Job.
The oldest Job which is unassigned is provided.
*/
app.post('/bored/:id', (req, res) => {
    if (!req.body) { return res.send({ 'message': 'No request provided.' }); }
    try {
        let workerNodeId = req.params.id;
        console.log(`${workerNodeId} said it's bored.`);
        if (!workerNodeId) { throw {'error': 'No id provided.'}}
        // Sort ascending so the oldest unassigned Order is handed out first.
        Order.findOne({ status: 'unassigned' }, {}, { sort: { 'created_at' : 1 } }, (err, order) => {
            if (err || !order) {
                return res.send({'message': `No work to do.  Don't get used to it.`});
            }
            console.log(`Found a work order, #${order._id}`);
            order.status = 'assigned';
            console.log(`Provided ${workerNodeId} with ${order.jobId}`);
            order.save()
            .then((doc) => {
                console.log(`Updated the Order #${doc.id}'s status to ${order.status}`);
                res.send(doc);
            });
        });
    } catch (err) {
        res.send({'error': 'Error with request shape.', err })
    }
});
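
On the other end of this route, a Worker’s polling is just a timer that POSTs to /bored/:id and, if an Order comes back, fetches the Job and hands it to its local python-runner. A minimal sketch, assuming the Worker knows its own id and the Boss address, and that the Job document carries the fields scriptRun expects:

// Minimal Worker-side polling sketch -- ids and addresses are hypothetical.
const axios = require('axios');
const pythonRunner = require('./preprocessing-services/python-runner');

const bossAddress = 'http://maddatum.com:3000';
const workerNodeId = '5b2a9f8e1c9d440000a1b2c3';   // this Worker's id in the worker-node collection

setInterval(() => {
    axios.post(`${bossAddress}/bored/${workerNodeId}`)
    .then((res) => {
        const order = res.data;
        if (!order || !order.jobId) { return; }        // nothing to do
        // Fetch the Job document, then hand it to the local python-runner.
        return axios.get(`${bossAddress}/retrieve/job/${order.jobId}`)
        .then((job) => pythonRunner.scriptRun(job.data));
    })
    .catch((err) => console.error(err.message));
}, 30 * 1000);                                         // check for work every 30 seconds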

/*
Retrieve Orders or Job
*/
app.get('/retrieve/:type/:id?/:param1?', (req, res) => {
    if (!req.body) { return res.send({ 'message': 'No request provided.' }); }
    try {
        let type = req.params.type;
        let id = req.params.id;
        let param1 = req.params.param1;
        switch(type) {
            case 'order':
                Order.find().then((response) => {
                    res.send(response);
                });
                break;
            case 'job':
                if (!id)  { throw {'error': 'Missing Id'} }
                Job.findOne({'_id': id })
                .then((response) => {
                    res.send(response);
                });
                break;
            default:
                throw {'error': 'Unknown type provided.'}
        }
    } catch (err) {
        res.send({'error': 'Error with request shape.', err })
    }
})
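
These routes can be exercised with plain GET requests; for example (the job id below is a made-up ObjectId):

// List every Order, then fetch a single Job by its id.
const axios = require('axios');

axios.get('http://maddatum.com:3000/retrieve/order')
    .then((res) => console.log(res.data));

axios.get('http://maddatum.com:3000/retrieve/job/5b2a9f8e1c9d440000a1b2c3')
    .then((res) => console.log(res.data));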

app.listen(port, () => {
    console.log(`Started on port ${port}`);
});
Using Python, NodeJS, Angular, and MongoDB to Create a Machine Learning System

I’ve started designing a system to manage the data analysis tools I build. It needs to handle:

  1. An illegitimate REST interface
  2. Interface for existing Python scripts
  3. Process for creating micro-services from Python scripts
  4. Interface for creating machine learning jobs to be picked up by free machines.
  5. Manage a job queue for work machines to systematically tackle machine learning jobs
  6. Data storage and access
  7. Results access and job meta data
  8. A way to visualize results

I’ve landed on a fairly complicated stack to handle the above. I’ve tried cutting frameworks, since I know it’ll be a nightmare to maintain, but I’m not seeing what can go.

  • Node for creating RESTful interfaces between the HQ Machine and the Worker Nodes
  • Node on the workers to ping the HQ machine periodically to see if there are jobs to run
  • MongoDB on the HQ Machine to store the job results data, paths to datasets, and possibly primary data
  • Angular to interact with the HQ Node, providing a UI for job creation and results viewing.
  • ngx-datatables for viewing tabular results.
  • ngx-charts for viewing job results (e.g., visualizing variance and linearity)
  • Python for access to all the latest awesome ML frameworks
  • python-shell (npm) for creating an interface between Node and Python.

Utilizing all Machines in the House

Machine learning is a new world for me. But, it’s pretty dern cool. I like making machines do the hard stuff while I’m off doing other work. It makes me feel extra productive–like, “I created that machine, so any work it does I get credit for. And! The work I did while it was doing its work.” This is the reason I own two 3D-printers.

I’m noticing there is a possibility of utilizing old computers I have lying around the house for the same effect. The plan is to abstract a neural network script, install it on all the computers lying about, and create an HQ Computer where I can create sets of hyperparameters to pass to the Worker Nodes throughout the house.

Why? Glad I asked for you. I feel guilty there are computers going unused. There’s an old AMD desktop with a GFX1060 in it, a 2013 MacBook Pro (my son’s), and my 2015 MacBook Pro. These don’t see much use anymore, since my employer has provided an iMac to work on. They need to earn their keep.

How? Again, glad I asked for you. I’ll create a system to make deep-learning jobs from hyperparameter sets and send them to these idle machines, getting them to solve problems while I’m working on paying the bills. This plays to the power of neural networks: they need little manual tweaking. You simply provide them with hyperparameters and let them run.

Here are the napkin-doodles:

+-Local------------------------------------------------------+
|                                                            |
|        ____                   ____      Each machine runs  |
|        |""|                   |""|      Node and Express   |
|  HQ    |__|             #1    |__|      server, creating   |
|       [ ==.]`)               [ ==.]`)   routes to Python   |
|       ====== 0               ====== 0   scripts using      |
|  The HQ machine runs          ____      stdin and stdout   |
|  Node and Express, but        |""|                         |
|  the routes are for     #2    |__|                         |
|  storing results in a        [ ==.]`)                      |
|  database.                   ====== 0                      |
|                               ____                         |
|                               |""|                         |
|                         #3    |__|        Worker           |
|                              [ ==.]`)     Nodes            |
|                              ====== 0                      |
|                                                            |
+------------------------------------------------------------+
+-Local------------------------------------------------------+
|                 Each worker Node checks         Workers    |
|        ____    with HQ on a set interval         ____      |
|        |""|       for jobs to run                |""|      |
|  HQ    |__|   <--------------------------+ #1    |__|      |
|       [ ==.]`)                                  [ ==.]`)   |
|       ====== 0                                  ====== 0   |
|       ^ |                                        ____      |
|       | |                                  #2    |""|      |
|       | +--------------------------------------->|__|      |
|       |             If there is a job, the      [ ==.]`)   |
|       |             Worker will send a GET      ====== 0   |
|       |              request for the job         ____      |
|       |                  parameters              |""|      |
|       |                                    #3    |__|      |
|       +-----------------------------------------[ ==.]`)   |
|         Once completed, the Worker updates HQ   ====== 0   |
|              with the job results.                         |
+------------------------------------------------------------+

Worker Nodes

The Worker Node’s code is pretty straightforward. It uses Node, Express, and python-shell to create a bastardized REST interface allowing simple interactions with the HQ Node controlling the job queue.

Node Side

Here’s the proof-of-concept NodeJS code.

var express = require('express');
var bodyParser = require('body-parser');
var pythonRunner = require('./preprocessing-services/python-runner');

var app = express();
const port = 3000;

app.use(bodyParser.json())

// Python script runner interface
app.post('/scripts/run', (req, res) => {
    try {
        let pythonJob = req.body;
        pythonRunner.scriptRun(pythonJob)
        .then((response) => {
            res.send(response);
        })
        .catch((rejection) => {
            res.send(rejection);
        });
    } catch (err) {
        res.send(err);
    }
});

app.listen(port, () => {
    console.log(`Started on port ${port}`);
});

The above code is a dead simple NodeJS server using Express. It uses the body-parser middleware to parse JSON request bodies. The pythonJob object looks something like this (real path names have been changed to protect their anonymity).

{
    "scriptsPath": "/Users/hinky-dink/dl-principal/python-scripts/",
    "scriptName": "union.py",
    "jobParameters": {
        "dataFileName": "",
        "dataPath": "/Users/hinky-dink/bit-dl/data/lot-data/wine_encoded/",
        "writePath": "/Users/hinky-dink/bit-dl/data/lot-data/wine_encoded/",
        "execution": {
            "dataFileOne": "wine_2017_encoded.csv",
            "dataFileTwo": "wine_2018_encoded.csv",
            "outputFilename": "wine_17-18.csv"
        }
    }
}

Each of these attributes is passed to the Python shell as a system argument in order to execute data_prep.py.
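
Submitting that job to a Worker is just a POST to the /scripts/run route shown above; a quick sketch with axios, assuming the Worker is listening on port 3000 and the JSON above is saved locally as python-job.json (a hypothetical file name):

// Submit the pythonJob shown above to a Worker Node's /scripts/run route.
const axios = require('axios');
const pythonJob = require('./python-job.json');   // hypothetical: the JSON object shown above

axios.post('http://localhost:3000/scripts/run', pythonJob)
    .then((res) => console.log(res.data))          // the parsed result object from the Python script
    .catch((err) => console.error(err.message));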

Here’s the python-runner.js

let {PythonShell} = require('python-shell')
 
var scriptRun = function(pythonJob){    
    return new Promise((resolve, reject) => {
        console.log(pythonJob)
        try {
            let options = {
                mode: 'text',
                pythonOptions: ['-u'], // get print results in real-time
                scriptPath: pythonJob.scriptsPath,
                args: [pythonJob.jobParameters.dataFileName, 
                       pythonJob.jobParameters.dataPath, 
                       pythonJob.jobParameters.writePath,
                       JSON.stringify(pythonJob.jobParameters.execution)]
            };
            PythonShell.run(pythonJob.scriptName, options, function (err, results) {
                if (err) throw err;
                try {
                    let result = JSON.parse(results.pop());
                    if(result) {
                        resolve(result);
                    } else {
                        reject({'err': ''})
                    }
                } catch (err) {
                    reject({'error': 'Failed to parse Python script return object.'})
                }
            });
        } catch (err) {
            reject(err)
        }
    });
}
module.exports = {scriptRun}

Python Side

Here’s the Python script in the above example. It is meant to detect what type of data is in each column of a table. If a column is continuous it leaves it alone (I’ll probably add a normalization option at some point); if it is categorical, it converts it to dummy variables. It then saves the encoded data on the Worker Node side (for now). Lastly, it returns a JSON string back to the Node side.

"""
Created on Mon Jun 11 21:12:10 2018
@author: cthomasbrittain
"""

import sys
import json
#
filename = sys.argv[1]
filepath = sys.argv[2]
pathToWriteProcessedFile = sys.argv[3]

request = sys.argv[4]
request = json.loads(request)

try:
    cols_to_remove = request['columnsToRemove']
    unreasonable_increase = request['unreasonableIncreaseThreshold']
except:
    # If columns aren't contained or no columns, exit nicely
    result = {'status': 400, 'message': 'Expected script parameters not found.'}
    print(str(json.dumps(result)))
    quit()

pathToData = filepath + filename


# Clean Data --------------------------------------------------------------------
# -------------------------------------------------------------------------------

# Importing data transformation libraries
import pandas as pd

# The following method will do the following:
#   1. Add a prefix to columns based upon datatype (cat and con)
#   2. Convert all continuous variables to numeric (float64)
#   3. Convert all categorical variables to objects
#   4. Rename all columns with prefixes, convert to lower-case, and replace
#      spaces with underscores.
#   5. Replace continuous blanks with 0 and categorical blanks with 'not_collected'
# This method will also detect manually assigned prefixes and adjust the 
# columns and data appropriately.  
# Prefix key:
# a) con = continuous
# b) cat = categorical
# c) rem = removal (discards entire column)

def add_datatype_prefix(df, date_to_cont = True):    
    import pandas as pd
    # Get a list of current column names.
    column_names = list(df.columns.values)
    # Encode each column based with a three letter prefix based upon assigned datatype.
    # 1. con = continuous
    # 2. cat = categorical
    
    for name in column_names:
        if df[name].dtype == 'object':
            try:
                df[name] = pd.to_datetime(df[name])
                if(date_to_cont):
                    new_col_names = "con_" + name.lower().replace(" ", "_").replace("/", "_")
                    df = df.rename(columns={name: new_col_names})
                else:
                    new_col_names = "date_" + name.lower().replace(" ", "_").replace("/", "_")
                    df = df.rename(columns={name: new_col_names})                    
            except ValueError:
                pass
    
    column_names = list(df.columns.values)
    
    for name in column_names:
        if name[0:3] == "rem" or "con" or "cat" or "date":
            pass
        if df[name].dtype == 'object':
            new_col_names = "cat_" + name.lower().replace(" ", "_").replace("/", "_")
            df = df.rename(columns={name: new_col_names})
        elif df[name].dtype == 'float64' or df[name].dtype == 'int64' or df[name].dtype == 'datetime64[ns]':
            new_col_names = "con_" + name.lower().replace(" ", "_").replace("/", "_")
            df = df.rename(columns={name: new_col_names})
    column_names = list(df.columns.values)
    
    # Get lists of columns for conversion
    con_column_names = []
    cat_column_names = []
    rem_column_names = []
    date_column_names = []
    
    for name in column_names:
        if name[0:3] == "cat":
            cat_column_names.append(name)
        elif name[0:3] == "con":
            con_column_names.append(name)
        elif name[0:3] == "rem":
            rem_column_names.append(name)
        elif name[0:4] == "date":
            date_column_names.append(name)
            
    # Make sure continuous variables are correct datatype. (Otherwise, they'll be dummied).
    for name in con_column_names:
        df[name] = pd.to_numeric(df[name], errors='coerce')
        df[name] = df[name].fillna(value=0)
    
    for name in cat_column_names:
        # Fill blanks before converting to strings; otherwise NaN becomes the string 'nan'.
        df[name] = df[name].fillna(value='not_collected')
        df[name] = df[name].apply(str)
    
    # Remove unwanted columns    
    df = df.drop(columns=rem_column_names, axis=1)
    return df

# ------------------------------------------------------
# Encoding Categorical variables
# ------------------------------------------------------

# The method below creates dummy variables from columns with
# the prefix "cat".  There is the argument to drop the first column
# to avoid the Dummy Variable Trap.
def dummy_categorical(df, drop_first = True):
    # Get categorical data columns.
    columns = list(df.columns.values)
    columnsToEncode = columns.copy() 

    for name in columns:
        if name[0:3] != 'cat':          
            columnsToEncode.remove(name)

    # if there are no columns to encode, return unmutated.
    if not columnsToEncode:
        return df


    # Encode categories
    for name in columnsToEncode:

        if name[0:3] != 'cat':
            continue

        tmp = pd.get_dummies(df[name], drop_first = drop_first)
        names = {}
        
        # Get a clean column name.
        clean_name = name.replace(" ", "_").replace("/", "_").lower()
        # Get a dictionary for renaming the dummy variables in the scheme of old_col_name + response_string
        if clean_name[0:3] == "cat":
            for tmp_name in tmp:
                tmp_name = str(tmp_name)
                new_tmp_name = tmp_name.replace(" ", "_").replace("/", "_").lower()
                new_tmp_name = clean_name + "_" + new_tmp_name
                names[tmp_name] = new_tmp_name
        
        # Rename the dummy variable dataframe
        tmp = tmp.rename(columns=names)
        
        # join the dummy variable back to original dataframe.
        df = df.join(tmp)
    
    # Drop all old categorical columns
    df = df.drop(columns=columnsToEncode, axis=1)
    return df

# Read the file
df = pd.read_csv(pathToData)

# Drop columns such as unique IDs
try:
    df = df.drop(cols_to_remove, axis=1)
except:
    # If columns aren't contained or no columns, exit nicely
    result = {'status': 404, 'message': 'Problem with columns to remove.'}
    print(str(json.dumps(result)))
    quit()
    
# Get the number of columns before hot encoding
num_cols_before = df.shape[1]

# Encode the data.
df = add_datatype_prefix(df)
df = dummy_categorical(df)

# Get the new dataframe shape.
num_cols_after = df.shape[1]


percentage_increase = num_cols_after / num_cols_before

result = ""

if percentage_increase > unreasonable_increase:
    message = "\"error\": \"Feature increase is greater than unreasonableIncreaseThreshold, most likely a unique id was included."
    result = {'status': 400, 'message': message}
else:
    filename = filename.replace(".csv", "")
    import os
    if not os.path.exists(pathToWriteProcessedFile):
        os.makedirs(pathToWriteProcessedFile)
        
    
    writeFile = pathToWriteProcessedFile + filename + "_encoded.csv"
    df.to_csv(path_or_buf=writeFile, sep=',')
    
    
    # Process the results and return JSON results object
    result = {'status': 200, 'message': 'encoded data', 'path': writeFile}
 
print(str(json.dumps(result)))

That’s the premise. I’ll be adding more services to it in a series of articles.