What is a Data Warehouse

Insights over Data

Data. They are the plastic of the tech world. We’re making way too much of it, you can’t seem to get rid of it, and rarely is it used for its designed purpose. Unfortunately, like water in plastic bottles, invaluable insights are found in these data. And all good systems are built on insight.

I’ve worked as a data engineer for the last few years and have realized this is a fairly universal problem. Converting data into insight is hard. There’s too little time. The cloud bill is too much. The data are never clean. And there seems to be an assumption from the C-suite that data innately have value. They don’t. Data are a raw resource, which can be converted into insights with skilled people and proper tools. And a data warehouse is one of the best tools for making it easier to generate insight from data.

Before moving on, let’s define data and insights.

There are many fancy explanations of what insights are. In my opinion, they are a gain in knowledge about how something works. In the visualization above, we can see that females score higher than males, and males score lower than non-binary individuals. This is insight.

Looking at the “Data” pane, little insight can be gained. There is information, such as an email we could use to contact someone. Or their name, by which to address them. However, there isn’t anything which explains how something works. These are data.

Unfortunately, in information technology, converting data into insight is tough, as I hope to demonstrate.

Heart Rate Variability

Throughout this article I’m going to use examples regarding Heart Rate Variability. As the name suggests, it is a measure of the variability between heartbeats. To be clear, not the time between heartbeats, but the variability in that time.

I find heart rate variability, or HRV, fascinating because it is linked to human stress levels. Research seems to indicate that if your HRV is high, you are relaxed, and if your HRV is low, you are stressed. Evolutionarily, this makes sense. During times of stress your heart is one of the most important organs in your body. It snaps to attention and ensures all muscles and organs are ready for fighting hard or running harder. The famous fight-or-flight. However, during times of relaxation, your heart chills out too. Its beats are less precisely timed. Hey, your heart works hard!

It’s not required to follow this article, but if you want to read more about the psychological and social aspects of HRV, the research is worth a look.

I got interested in HRV when I found the Apple Watch now tracks it. And I’ve been curious whether it is actually a valid indicator of my stress levels.

What’s the Problem?

So what does this have to do with a data warehouse? Let’s look at the HRV data the Apple Watch produces.

occurrence_id  user_id  start                end                  hrv
1              1        2021-09-01 14:10:01  2021-09-01 17:00:00  40
2              1        2021-09-01 19:00:00  2021-09-01 23:00:00  55
3              1        2021-09-02 05:00:00  2021-09-03 13:00:01  120
4              1        2021-09-04 14:00:00                       65

These data have a start and end time; if the end time is blank, it means the recording is still going. They also have a user_id and an hrv (heart rate variability) score. Let’s try to ask the data a question.

  • What is my average HRV per day?

A naive method for querying these data might look like:

  SELECT AVG(hrv)    AS avg_hrv,
         DATE(start) AS day
    FROM hrvs
GROUP BY day

This produces the following:

This looks pretty good. We can get a bit of insight: it looks like my average HRV might be 50, and it looks like 09-02 was an extremely relaxed day. But what happened to 2021-09-03?

This is where things start getting complex. There is a long history in databases of collapsing sustained values into a start and a stop value. Databases are highly optimized for updating records quickly and minimizing the amount of disk space needed to store the data. These types of databases are known as online transactional processing (OLTP) databases. And they are the antithesis of a data warehouse. Data warehouses are known as online analytical processing (OLAP) databases. Unlike OLTP, they are optimized for the speed of providing insights. I’ll go into more depth on OLTP vs. OLAP a bit later.

Let’s get back to solving the problem.

The Infamous Calendar Table

Let’s look at what the start-stop data actually represent.

avg_hrv  day
47.5     2021-09-01
120      2021-09-02
120      2021-09-03
65       2021-09-04

Simple enough, right? But how do we transform the start-stop data into a list of all dates with their sustained values? The most straightforward method is the calendar table.

In SQL, every query must begin with a base table. This is the table associated with the FROM keyword, and it is the base from which all other data are retrieved. In our problem, what we really need is a base table which fills in all the possible missing dates in the start-stop data.

In this situation, it is known as a “calendar table” and is simply a list of possible dates you wish to include in your query.

For example:

date
2021-09-01
2021-09-02
2021-09-03
2021-09-04
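
If your database doesn’t already have one, a calendar table can be generated on the fly. Here’s a minimal sketch using a recursive CTE (SQLite syntax, with the date range assumed to match our sample data):

WITH RECURSIVE calendar(date) AS (
  SELECT DATE('2021-09-01')
  UNION ALL
  SELECT DATE(date, '+1 day')
    FROM calendar
   WHERE date < DATE('2021-09-04')
)
SELECT date FROM calendar;

In a data warehouse, this often lives as a permanent date dimension table rather than being generated per query.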

Nice! But how do we apply this to our question?

  • What is my average HRV per day?

We can apply it with the following query:

   SELECT
          AVG(hrv) avg_hrv,
          c.date
     FROM calendar AS c
LEFT JOIN hrvs AS h
       ON c.date BETWEEN DATE(start)
                     AND COALESCE(DATE(end), DATE('2021-09-05'))
    GROUP BY c.date

This joins each of the HRV start-stop rows to the calendar table wherever the calendar.date falls between the hrvs.start and hrvs.end dates. This is the result:

Perfect, this has exactly the effect we want. It expands the start-stop values across all the days they cover and calculates the average heart rate variability per day. You may ask, “But this only generated one extra date, right? Do we really need a data warehouse for it; can’t a classical database solve it?” Yes, yes it can. But! Is it really just one extra day?

Let’s change our question slightly.

  • What is my average HRV per hour?

Some of you may see where I’m going here, but let’s draw it out anyway. We can answer the above question with another calendar table, but this one with a finer grain (level of granularity). That is, our calendar table will now have an entry for every hour between the minimum start and maximum end.

date
2021-09-01 00:00:00
2021-09-01 01:00:00
2021-09-01 02:00:00
2021-09-01 03:00:00
….
2021-09-04 04:00:00
2021-09-04 05:00:00
2021-09-04 06:00:00
2021-09-04 07:00:00
2021-09-04 08:00:00
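
Generating a calendar at this grain works the same way as before; only the step changes. Again, a sketch in SQLite syntax with the range assumed to match our data:

WITH RECURSIVE calendar(date) AS (
  SELECT DATETIME('2021-09-01 00:00:00')
  UNION ALL
  SELECT DATETIME(date, '+1 hour')
    FROM calendar
   WHERE date < DATETIME('2021-09-04 08:00:00')
)
SELECT date FROM calendar;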

Then, we can run the following query:

   SELECT
          AVG(hrv) avg_hrv,
          c.date
     FROM calendar AS c
LEFT JOIN hrvs AS h
       ON c.date BETWEEN start
                     AND COALESCE(end, '2021-09-05 23:59:59')
    GROUP BY c.date

Our result should look something like this:

And herein lies the problem. When we begin to expand the data, they have a tendency to grow in number. These sorts of relationships have all sorts of terms associated with them. In SQL, we say the calendar table and the hrvs table have a many-to-many relationship. That is, many dates in the calendar table refer to many start-stop entries in the hrvs table. And when a join greatly increases the size of the result set, we refer to this as data “fan out.”
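
If you want to see the fan out directly, count the joined rows before aggregating (same tables as above):

   SELECT COUNT(*) AS joined_rows
     FROM calendar AS c
LEFT JOIN hrvs AS h
       ON c.date BETWEEN start
                     AND COALESCE(end, '2021-09-05 23:59:59')

Every calendar entry that overlaps a start-stop interval becomes its own joined row, so the row count grows with the grain of the calendar rather than with the size of the hrvs table.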

Just One More

At this point you may be wondering if this calendar table was really the best way to answer the questions we have of these data. Well, I’m going to risk boring you by giving you just a few more examples.

Let’s say you want to ask the data:

  • What is the average HRV of everyone per hour? 🤯

This would take data like this:

occurrence_id  user_id  start                end                  heart_rate_variability
1              1        2021-09-01 14:10:01  2021-09-01 17:00:00  40
2              1        2021-09-01 19:00:00  2021-09-01 23:00:00  55
3              1        2021-09-02 05:00:00  2021-09-02 13:00:01  120
4              1        2021-09-02 14:00:00                       65
5              2        2021-09-01 08:00:00  2021-09-01 17:00:00  80
6              2        2021-09-01 18:00:00  2021-09-01 22:00:00  35
7              2        2021-09-02 05:30:00  2021-09-02 17:00:00  25
8              2        2021-09-02 17:00:00                       105

And join it to the calendar table. The math, roughly, works out to something like this.

[hours between min and max dates] x [users] = [rows of data]

And to make it even more apparent, what if we ask something like:

  • What is the average HRV of everyone per hour who is over 90.7 kilograms (200 lbs)?

Then the math becomes something like this:

[hours between min and max dates] x [users] x [facts] = [rows of data]

Even with two years of data for 100 users the data start getting large.

  • What is the average HRV of everyone per hour who is over 90.7 kilograms (200 lbs) and under 50 years of age?
(2 x 365 x 24) x (100 x 2) = 3,504,000

And with every increase in granularity we get another order-of-magnitude jump in the amount of data.

  • What is the average HRV of everyone per minute who is over 90.7 kilograms (200 lbs) and under 50 years of age?
(2 x 365 x 24 x 60) x (100 x 2) = 210,240,000
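
If you want to check those numbers, the arithmetic is simple enough to run in Python:

hours_in_two_years = 2 * 365 * 24                # 17,520
minutes_in_two_years = hours_in_two_years * 60   # 1,051,200
users_times_facts = 100 * 2                      # 200

print(hours_in_two_years * users_times_facts)    # 3,504,000 rows at the hourly grain
print(minutes_in_two_years * users_times_facts)  # 210,240,000 rows at the minute grain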

This sheer number of rows of data which must be processed is the reason for a data warehouse.

Online Transactional Processing (OLTP)

Let’s examine online transactional processing databases to understand why they have difficulty reading large numbers of rows. First, what exactly are we talking about when we say “an OLTP database”? In the wild, most database technologies are OLTP, as they fit the most common need of a database.

For example,

  • SQLite
  • PostgreSQL
  • MySQL
  • MariaDB
  • Microsoft SQL Server (MSSS)

There are many, many more.

A clever individual may argue, “Wait, these database technologies are capable of being set up as an OLAP database.” True, many of the above have “plugins” or settings which allow them to act as an analytics database; however, their primary purpose is transactional processing. This leads us to the question, “What is ‘transactional processing’ anyway?”

Don’t let the name trick you. Transactional processing databases are exactly what they sound like: technologies optimized for large numbers of small transactions. These transactions may be retrieving a username, then updating a service entry. Or retrieving a user ID, then adding an address. Most involve at least one read and one write operation, thus “transaction.” These transactions are usually extremely short and involve a small amount of data. Let’s go through the characteristics in a bit more detail.

Supports Large Amounts of Concurrency

OLTP DBs are designed for large numbers of users making short queries of the database. This makes them perfect for the backend of many software applications. Let’s take a small online bookstore called Amazingzone. This bookstore will often have approximately 300 customers interacting with its webpage at a time. Each time a customer clicks on a product, the web application sends a query to the products table and retrieves the product information. Then, it inserts a record into the visitors table indicating the user viewed a product. Given each customer is casually browsing, this may lead to a few thousand queries a minute. Each interaction would look something like this:

SELECT product_name, product_image_path
  FROM products
 WHERE id = 534;

INSERT INTO visitors (user_id, view_date)
VALUES (754, CURRENT_DATE());

Even though there are thousands of queries a minute, each one touches a single record in a single table. This is what is meant when you see a database described as “optimized for concurrency.” It means the database is designed to allow thousands of small queries to be executed simultaneously.

Data are Normalized

Another characteristic of an OLTP database is the data are often normalized into at least the 2nd Normal Form (2NF). An oversimplified explanation of 2NF would be: “More tables, less data.”

One of the goals of normalization is to reduce the amount of disk space a database needs. This comes from an age where databases were limited to kilobytes of data rather than exabytes. With that in mind, for the following example let’s assume every cell besides the field headers requires 1 byte, regardless of the datatype or information contained.

id  avg_hrv  date        user_id  user_name  user_age
0   47.5     2021-09-01  1        Thomas     41
1   120      2021-09-02  1        Thomas     41
2   120      2021-09-03  1        Thomas     41
3   65       2021-09-04  1        Thomas     41

Given the table above, it would take 24 bytes to store all the information on a database. If our goal is reducing disk space, 2nd Normal Form (2NF) can help. Let’s see what these same data look like after “normalization.”

We will split the data into two tables, hrv_recordings and users.

Here’s the hrv_recordings table:

id  avg_hrv  date        user_id
0   47.5     2021-09-01  1
1   120      2021-09-02  1
2   120      2021-09-03  1
3   65       2021-09-04  1

And the users table:

id  user_name  user_age
1   Thomas     41

Now we have two tables related by the id of the users table, the Primary Key, and the user_id in the hrv_recordings table, the Foreign Key. When we query these data we can put the tables back together using the JOIN keyword.

For example:

SELECT *
  FROM hrv_recordings AS hrvr
  JOIN users AS u
    ON hrvr.user_id = u.id

But why should we normalize these data in the first place? Well, if you go back and count the number of cells after normalization you will find there are 19. For our oversimplified scenario, that means we have saved 5 bytes through normalization! Pretty nifty.

And if 5 bytes doesn’t seem impressive, change it to a percentage: it represents roughly a 20% savings. Let’s say your database is 100 GB and our normalization savings scale linearly. That means you saved 20 GB simply by changing how your data are stored! Nice job, you.
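
A quick sanity check of that math, using the cell counts from the tables above:

cells_before = 4 * 6          # 24 cells in the denormalized table
cells_after = 4 * 4 + 1 * 3   # 19 cells across hrv_recordings and users
saved = cells_before - cells_after
print(saved, f"{saved / cells_before:.1%}")  # 5 bytes saved, 20.8%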

But the skeptics may be curious if there’s a catch. Well, yes, and the catch is a reason normalized databases aren’t great for analytics. Normalization optimizes for reduced disk space and quick inserts, not for how quickly the data can be read back out.

Going back to the SQL query:

SELECT *
  FROM hrv_recordings AS hrvr
  JOIN users AS u
    ON hrvr.user_id = u.id

What’s really happening in this join can be simplified into a comparison loop.

For example:

matches = []
# A nested-loop join: compare every recording's foreign key (user_id)
# against every user's primary key (id).
for i in range(hrv_recordings.shape[0]):
    for j in range(users.shape[0]):
        if hrv_recordings[i, 3] == users[j, 0]:  # user_id == id
            matches.append((hrv_recordings[i], users[j]))

If the above Python snippet doesn’t make sense, ignore it. Just know joins in SQL come with a computation cost every time a query is executed. And that’s the primary trade-off with normalization: you are reducing the amount of disk space needed to store the data at the cost of how quickly the data can be retrieved.

Made for Writing

About those quick inserts. As mentioned above, OLTP databases optimize for less disk space and speed of inserting new data. This makes sense; almost every interaction a user has with an application involves writing to the database, and most often, more writing than reading.

Let’s take Amazingzone again. A customer clicks the “Buy it Now” button. This writes to the following tables:

  • The orders table gets a record added corresponding to the purchased item.
  • The shipping table gets a record relating the order to the customer’s address.
  • The users_orders table gets a record relating the orders entry to the customer.
  • The marketing table gets an entry relating the orders entry to the customer’s prior viewing history.

And so forth. Needless to say, there are a lot of write operations and comparatively few read operations. Some of the data recorded will be returned to the user, such as the order amount, but a lot won’t, such as the marketing data.
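
Here’s a rough sketch of what those writes might look like; every table and column name below is assumed for illustration:

-- One "Buy it Now" click fans out into several small writes.
INSERT INTO orders (order_id, user_id, product_id, amount) VALUES (1001, 754, 534, 1);
INSERT INTO shipping (order_id, address_id) VALUES (1001, 88);
INSERT INTO users_orders (user_id, order_id) VALUES (754, 1001);
INSERT INTO marketing (order_id, last_viewed_product_id) VALUES (1001, 534);

Each statement touches a single row in a single table, which is exactly the workload OLTP databases are built for.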

Data are Often Realtime

OLAP

Extract, Transform, Load (ETL) Prepare Data

Data are Denormalized (Star Schema)

Supports Parallelization of a Single Query

Made for Reading

OLTP and OLAP Compared

Let’s Talk Rows and Columns

As the name suggests, MariaDB ColumnStore stores the data in column format. This is referred to as a columnar database management system (CDBMS). These differ from row-based database management systems (RDBMS) in how they store data. There is a lot of history on why, but historically, the world has used RDBMS to store data.

You can think of CDBMS and RDBMS as tools for specific jobs. They both work with data, but they have pros and cons which must be assessed when applying them.
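
To make the difference concrete, here’s a toy illustration in Python (with made-up values) of how the same small users table might be laid out by each system:

# Row-oriented: each record's fields are stored together, which makes
# fetching or updating one whole record cheap.
row_store = [
    (1, "Thomas", 41),
    (2, "Ada", 36),
    (3, "Alan", 41),
]

# Column-oriented: each column's values are stored together, which makes
# scanning or aggregating a single column (e.g., AVG(user_age)) cheap.
column_store = {
    "id": [1, 2, 3],
    "user_name": ["Thomas", "Ada", "Alan"],
    "user_age": [41, 36, 41],
}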

Here is my summary comparison of the two systems.

Scraping Images from Google Search Results using Python

This article relies on code written by Fabian Bosler:

I’ve only modified Bosler’s code to make it a bit easier to pull images for multiple search terms.

The full code can be found in the series’ GitHub repository:

Magic Symbols

As I’ve mentioned in my previous article, I needed a lot of images of magic symbols for training a deep convolutional generative adversarial network (DCGAN). Luckily, I landed on Bosler’s article early on.

To get my images, I used the Chrome browser, Chromedriver, Selenium, and a Python script to slowly scrape images from Google’s image search. The scraping was throttled to near-human speed, but it still let me automate the collection of a lot of images.

Regarding this process, I’ll echo Bosler: I’m in no way a legal expert. I’m not a lawyer and nothing I state should be taken as legal advice. I’m just some hack on the internet. However, from what I understand, scraping the SERPs (search engine results pages) is not illegal, at least not for personal use. But using Google’s Image search for automated scraping of images is against their terms of service (ToS). Replicate this project at your own risk. I know when I adjusted my script to search faster, Google banned my IP. I’m glad it was temporary.

Bosler’s Modified Script

The script automatically searches for images and collects their underlying URL. After searching, it uses the Python requests library to download all the images into a folder named respective to the search term.

Here are the modifications I made to Bosler’s original script:

  • Added a search term loop. This allows the script to continue running past one search term.
  • The script was getting stuck when it ran into the “Show More Results” button; I’ve fixed the issue.
  • The results are saved in directories associated with the search term. If the script is interrupted and rerun, it first checks which directories have already been created and removes those terms from the search.
  • I added a timeout feature; thanks to a user on Stack Overflow.
  • I parameterized the number of images to look for per search term, sleep times, and timeout.

Code: Libraries

You will need to install Chromedriver and Selenium–this is explained well in the original article.

You will also need to install Pillow–a Python library for managing images.

You can install it with:

pip install pillow

After installing all the needed libraries the following block of code should execute without error:

import os
import time

import io
import hashlib
import signal
from glob import glob
import requests

from PIL import Image
from selenium import webdriver

If you have any trouble, revisit the original article’s setup explanation or feel free to ask questions in the comments below.

Code: Parameters

I’ve added a few parameters to the script to make it easier to use.

number_of_images = 400
GET_IMAGE_TIMEOUT = 2
SLEEP_BETWEEN_INTERACTIONS = 0.1
SLEEP_BEFORE_MORE = 5
IMAGE_QUALITY = 85

output_path = "/path/to/your/image/directory"

The number_of_images tells the script how many images to search for per search term. If the script runs out of images before reaching number_of_images, it will skip to the next term.

GET_IMAGE_TIMEOUT determines how long the script should wait for a response before skipping to the next image URL.

SLEEP_BETWEEN_INTERACTIONS is how long the script should delay before checking the URL of the next image. In theory, this can be set low, as I don’t think it makes any requests of Google. But I’m unsure, adjust at your own risk.

SLEEP_BEFORE_MORE is how long the script should wait before clicking on the “Show More Results” button. Don’t set this lower than the time it would take a human to do the same search. Your IP will be banned. Mine was.

Code: Search Terms

Here is where the magic happens. The search_terms array should include any terms which you think will get the sorts of images you are targeting.

Below are the exact set of terms I used to collect magic symbol images:

search_terms = [
    "black and white magic symbol icon",
    "black and white arcane symbol icon",
    "black and white mystical symbol",
    "black and white useful magic symbols icon",
    "black and white ancient magic sybol icon",
    "black and white key of solomn symbol icon",
    "black and white historic magic symbol icon",
    "black and white symbols of demons icon",
    "black and white magic symbols from book of enoch",
    "black and white historical magic symbols icons",
    "black and white witchcraft magic symbols icons",
    "black and white occult symbols icons",
    "black and white rare magic occult symbols icons",
    "black and white rare medieval occult symbols icons",
    "black and white alchemical symbols icons",
    "black and white demonology symbols icons",
    "black and white magic language symbols icon",
    "black and white magic words symbols glyphs",
    "black and white sorcerer symbols",
    "black and white magic symbols of power",
    "occult religious symbols from old books",
    "conjuring symbols",
    "magic wards",
    "esoteric magic symbols",
    "demon summing symbols",
    "demon banishing symbols",
    "esoteric magic sigils",
    "esoteric occult sigils",
    "ancient cult symbols",
    "gypsy occult symbols",
    "Feri Tradition symbols",
    "Quimbanda symbols",
    "Nagualism symbols",
    "Pow-wowing symbols",
    "Onmyodo symbols",
    "Ku magical symbols",
    "Seidhr And Galdr magical symbols",
    "Greco-Roman magic symbols",
    "Levant magic symbols",
    "Book of the Dead magic symbols",
    "kali magic symbols",
]

Before searching, the script checks the image output directory to determine if images have already been gathered for a particular term. If it has, the script will exclude the term from the search. This is part of my “be cool” code. We don’t need to be downloading a bunch of images twice.

The code below grabs all the directories in our output path, then reconstructs the search term from the directory name (i.e., it replaces the “_”s with “ “s.)

dirs = glob(os.path.join(output_path, "*"))
dirs = [dir.split("/")[-1].replace("_", " ") for dir in dirs]
search_terms = [term for term in search_terms if term not in dirs]

Code: Chromedriver

Before starting the script, we have to kick off a Chromedriver session. Note, you must put the chromedriver executable into a folder listed in your PATH variable for Selenium to find it.

For macOS users, setting up Chromedriver for Selenium manually is a bit tough. But using Homebrew makes it easy.

brew install chromedriver

If everything is setup correctly, executing the following code will open a Chrome browser and bring up the Google search page.

wd = webdriver.Chrome()
wd.get("https://google.com")

Code: Chrome Timeout

The timeout class below I borrowed from Thomas Ahle at Stack Overflow. It is a dirty way of creating a timeout for the GET request that downloads the image; note it relies on Unix signals (SIGALRM), so it won’t work on Windows. Without it, the script can get stuck on unresponsive image downloads.

class timeout:
    def __init__(self, seconds=1, error_message="Timeout"):
        self.seconds = seconds
        self.error_message = error_message

    def handle_timeout(self, signum, frame):
        raise TimeoutError(self.error_message)

    def __enter__(self):
        signal.signal(signal.SIGALRM, self.handle_timeout)
        signal.alarm(self.seconds)

    def __exit__(self, type, value, traceback):
        signal.alarm(0)

Code: Fetch Images

As I hope I’ve made clear, I did not write the code below; I just polished it. I’ll provide a brief explanation, but refer back to Bosler’s article for more information.

Essentially, the script:

  1. Creates a directory corresponding to a search term in the array.
  2. It passes the search term to fetch_image_urls(); this function drives the Chrome session. The script navigates Google to find images relating to the search term and stores each image link in a list. After it has searched through all the images or reached number_of_images, it returns a list (res) containing all the image URLs.
  3. The list of image URLs is passed to the persist_image(), which then downloads each one of the images into the corresponding folder.
  4. It repeats steps 1-3 per search term.

I’ve added extra comments as a guide:

def fetch_image_urls(
    query: str,
    max_links_to_fetch: int,
    wd: webdriver,
    sleep_between_interactions: int = 1,
):
    def scroll_to_end(wd):
        wd.execute_script("window.scrollTo(0, document.body.scrollHeight);")
        time.sleep(sleep_between_interactions)

    # Build the Google Query.
    search_url = "https://www.google.com/search?safe=off&site=&tbm=isch&source=hp&q={q}&oq={q}&gs_l=img"

    # load the page
    wd.get(search_url.format(q=query))

    # Declared as a set, to prevent duplicates.
    image_urls = set()
    image_count = 0
    results_start = 0
    while image_count < max_links_to_fetch:
        scroll_to_end(wd)

        # Get all image thumbnail results
        thumbnail_results = wd.find_elements_by_css_selector("img.Q4LuWd")
        number_results = len(thumbnail_results)

        print(
            f"Found: {number_results} search results. Extracting links from {results_start}:{number_results}"
        )

        # Loop through image thumbnail identified
        for img in thumbnail_results[results_start:number_results]:
            # Try to click every thumbnail such that we can get the real image behind it.
            try:
                img.click()
                time.sleep(sleep_between_interactions)
            except Exception:
                continue

            # Extract image urls
            actual_images = wd.find_elements_by_css_selector("img.n3VNCb")
            for actual_image in actual_images:
                if actual_image.get_attribute(
                    "src"
                ) and "http" in actual_image.get_attribute("src"):
                    image_urls.add(actual_image.get_attribute("src"))

            image_count = len(image_urls)

            # If the number of images found exceeds our `max_links_to_fetch`, end the search.
            if len(image_urls) >= max_links_to_fetch:
                print(f"Found: {len(image_urls)} image links, done!")
                break
        else:
            # If we haven't found all the images we want, let's look for more.
            print("Found:", len(image_urls), "image links, looking for more ...")
            time.sleep(SLEEP_BEFORE_MORE)

            # Check for button signifying no more images.
            not_what_you_want_button = ""
            try:
                not_what_you_want_button = wd.find_element_by_css_selector(".r0zKGf")
            except:
                pass

            # If there are no more images return.
            if not_what_you_want_button:
                print("No more images available.")
                return image_urls

            # If there is a "Load More" button, click it.
            load_more_button = wd.find_element_by_css_selector(".mye4qd")
            if load_more_button and not not_what_you_want_button:
                wd.execute_script("document.querySelector('.mye4qd').click();")

        # Move the result startpoint further down.
        results_start = len(thumbnail_results)

    return image_urls


def persist_image(folder_path: str, url: str):
    try:
        print("Getting image")
        # Download the image.  If timeout is exceeded, throw an error.
        with timeout(GET_IMAGE_TIMEOUT):
            image_content = requests.get(url).content

    except Exception as e:
        print(f"ERROR - Could not download {url} - {e}")

    try:
        # Convert the image into a bit stream, then save it.
        image_file = io.BytesIO(image_content)
        image = Image.open(image_file).convert("RGB")
        # Create a unique filepath from the contents of the image.
        file_path = os.path.join(
            folder_path, hashlib.sha1(image_content).hexdigest()[:10] + ".jpg"
        )
        with open(file_path, "wb") as f:
            image.save(f, "JPEG", quality=IMAGE_QUALITY)
        print(f"SUCCESS - saved {url} - as {file_path}")
    except Exception as e:
        print(f"ERROR - Could not save {url} - {e}")

def search_and_download(search_term: str, target_path="./images", number_images=5):
    # Create a folder name.
    target_folder = os.path.join(target_path, "_".join(search_term.lower().split(" ")))

    # Create image folder if needed.
    if not os.path.exists(target_folder):
        os.makedirs(target_folder)

    # Open Chrome
    with webdriver.Chrome() as wd:
        # Search for images URLs.
        res = fetch_image_urls(
            search_term,
            number_images,
            wd=wd,
            sleep_between_interactions=SLEEP_BETWEEN_INTERACTIONS,
        )

        # Download the images.
        if res is not None:
            for elem in res:
                persist_image(target_folder, elem)
        else:
            print(f"Failed to return links for term: {search_term}")

# Loop through all the search terms.
for term in search_terms:
    search_and_download(term, output_path, number_of_images)

Results

Scraping the images resulted in a lot of garbage images (noise) along with my ideal training images.

For example, out of all the images shown, I only wanted the image highlighted:

There was also the problem of lots of magic symbols stored in a single image. These “collection” images would need further processing to extract all of the symbols.

However, even with a few rough edges, the script sure as hell beat manually downloading the 10k images I had in the end.

Train a Generative Adversarial Network to Create Magic Symbols

I love folklore dealing with magic. Spells, witches, and summoning the dead. It all piques my interest. I think it inspires me as it is far removed from being a data engineer–I know it might kill aspirations of young data engineers reading, but data engineering can be a bit boring at times. To beat the boredom, I decided to mix my personal and professional interests.

I’ve scraped the internet for images of magic symbols, then trained a deep convolutional generative adversarial network (DCGAN) to generate new magic symbols, which are congruent with real magic symbols. The DCGAN is built using PyTorch. I usually roll with Tensorflow, but I’m working on learning PyTorch.

I’ve taken the “nothing but net” approach with this project. Most of the data augmentation I’ve done during this project have been using other neural networks. Most of these augmenting nets were written in Tensorflow.

I’ve planned a series of articles, as there is too much to cover in one. A lot of the code has been borrowed and adapted; I’ll do my best to give credit where it’s due.

What was in my Head

Let’s start with current results first. After getting the urge to teach a computer to make a magic sign, it took a couple days of hacking before I ended up with the images below.

Keep in mind, these are preliminary results. They were generated using my GTX 1060 6GB. The GPU RAM limits the model a lot–at least, until I rewrite the training loop. Why do I mention the small GPU? Well, GANs are an architecture which provides much better results with more neurons, and 6GB limits the network size a lot for a well-performing GAN.

Anyway, ‘nuff caveats. Let’s dig in.

Signal

There are a few concepts I’ll refer to a lot throughout these articles–let’s define them real quick.

First, “signal.” I like Wikipedia’s definition, even if it is sleep inducing.

In signal processing, a signal is a function that conveys information about a phenomenon.

One of the mistakes I made early in this project was not defining the desired signal. In future projects, I’ll lead with a written definition and modify it based on what I learn about the signal. However, for this project, here was my eventual definition.

The “magic symbol” signal had the following properties:

  • Used in traditional superstition
  • Defined

These terms became my measuring stick for determining whether an image was included in the training data.

Given poorly defined training images seemed to produce extremely muddy outputs, I decided each image should be “defined.” Meaning, an image must be easily discernible at the resolution at which it was trained.

Here are examples of what I see as “defined”:

And examples of “used in traditional superstition.” The top-left symbol is the Leviathan Cross and bottom-left is the Sigil of Bael.

Results

Again, preliminary results. I’m shopping for a way to scale up the size of the network, which should increase the articulation of the outputs. Overall, the bigger the network the more interesting the results.

Small Symbols (64x64)

The following symbols were generated with a DCGAN using 64x64 dimensions as output. These symbols were then post-processed using a deep denoising variational auto-encoder (DDVAE). That’s a fancy way of saying the “pepper” was removed from the images.

Large Symbols (128x128)

The following symbols were generated with a GAN using 128x128 dimensions as input and output. These symbols were not post-processed.

Assessment of Outputs

Overall, I’m pleased with the output. Looking at how muddy the outputs are at 128x128, you may be wondering why. Well, a few reasons.

I’ve been able to avoid mode collapse in almost all of my training sessions. Mode collapse is the bane of GANs. Simply put, the generator finds one or two outputs which always trick the discriminator and then produces those every time.

There is a lot of pepper throughout the generated images. I believe a lot of this comes from dirty input data, so when there’s time, I’ll refine my dataset further. However, the denoising auto-encoder seems to be the easiest way to get rid of the noise–as you can see the 64x64 samples (denoised) are much cleaner than the 128x128 samples. Also, I might try applying the denoiser to the inputs, rather than the outputs. In short, I feel training will greatly improve as I continue to refine the training data.

But do they look like real magic symbols? I don’t know. At this point, I’m biased, so I don’t trust my perspective. I did show the output to a coworker and asked, “What does this look like?” He said, “I don’t know, some sort of runes?” And my boss asked, “What are those Satan symbols?” So, I feel I’m on the right track.

How to Send Data between PC and Arduino using Bluetooth LE

A how-to guide on connecting your PC to an Arduino using Bluetooth LE and Python. To make it easier, we will use bleak, an open-source BLE library for Python. The code provided should work for connecting your PC to any Bluetooth LE device.

Before diving in, a few things to know:

  • Bleak is under-development. It will have issues
  • Although Bleak is multi-OS library, Windows support is still rough
  • PC operating systems suck at BLE
  • Bleak is asynchronous; in Python, this means a bit more complexity
  • The code provided is a proof-of-concept; it should be improved before use

Ok, all warnings stated, let’s jump in.

Bleak

Bleak is a Python package written by Henrik Blidh. Although the package is still under development, it is pretty nifty. It works on Linux, Mac, or Windows. It is non-blocking, which makes writing applications a bit more complex, but extremely powerful, as your code doesn’t have to manage concurrency.

Setup

Getting started with BLE using my starter application and bleak is straightforward. You need to install bleak, and I’ve also included a library called aioconsole for handling user input asynchronously:

pip install bleak aioconsole

Once these packages are installed we should be ready to code. If you have any issues, feel free to ask questions in the comments. I’ll respond when able.

The Code

Before we get started, if you’d rather see the full code, it can be found at:

If you are new to Python, then the following code may look odd. You’ll see terms like async, await, loop, and future. Don’t let it scare you. These keywords are Python’s way of allowing a programmer to “easily” write asynchronous code.

If you are struggling with asyncio, the built-in asynchronous Python library, I’d highly recommend Łukasz Langa’s detailed video series; it takes a time commitment, but is worth it.

If you are an experienced Python programmer, feel free to critique my code, as I’m new to Python’s asynchronous solutions. I’ve got my big kid britches on.

Enough fluff. Let’s get started.

Application Parameters

There are a few code changes needed for the script to work, at least, with the Arduino and firmware I’ve outlined in the previous article:

The incoming microphone data will be dumped into a CSV; one of the parameters is where you would like to save this CSV. I’ll be saving it to the Desktop. I’m also retrieving the user’s home folder from the HOME environment variable, which is only available on Mac and Linux OS (Unix systems). If you are trying this project from Windows, you’ll need to replace the root_path reference with the full path.

root_path = os.environ["HOME"]
output_file = f"{root_path}/Desktop/microphone_dump.csv"

You’ll also need to specify the characteristics which the Python app should try to subscribe to when connected to the remote hardware. Referring back to our previous project, you should be able to get these from the Arduino code, or from the Serial terminal printout.

read_characteristic = "00001143-0000-1000-8000-00805f9b34fb"
write_characteristic = "00001142-0000-1000-8000-00805f9b34fb"

Main

The main method is where all the async code is initialized. Essentially, it creates three different loops, which run asynchronously when possible.

  • Main – you’d put your application’s code in this loop. More on it later.
  • Connection Manager – this is the heart of the Connection object, which I’ll describe in a moment.
  • User Console – this loop gets data from the user and sends it to the remote device.

You can imagine each of these loops as independent; however, what they are actually doing is pausing their execution whenever they encounter a blocking I/O event. For example, when input is requested from the user, or while waiting for data from the remote BLE device. When one of these loops encounters an I/O event, it lets one of the other loops take over until the I/O event is complete.

That’s far from an accurate explanation, but like I said, I won’t go in depth on async Python, as Langa’s video series is much better than my squawking.
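
If cooperative loops are new to you, here’s a tiny, self-contained toy example (the names are made up and it has nothing to do with the BLE code) showing two “loops” taking turns whenever one of them awaits:

import asyncio

async def sensor_loop():
    # Pretend to wait on I/O, like BLE notifications arriving.
    for _ in range(3):
        await asyncio.sleep(1.0)
        print("sensor loop: got data")

async def console_loop():
    # Pretend to wait on user input.
    for _ in range(3):
        await asyncio.sleep(1.5)
        print("console loop: got input")

async def main():
    # Both "loops" run concurrently; whenever one awaits, the other may run.
    await asyncio.gather(sensor_loop(), console_loop())

asyncio.run(main())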

Though, it’s important to know, ensure_future is what tells Python to run a chunk of code asynchronously. And I’ve been calling them “loops” because each of the 3 ensure_future calls has a while True statement in it. That is, they do not return without an error.

After creating the different futures, the loop.run_forever() is what causes them to run.

if __name__ == "__main__":
    # Create the event loop.
    loop = asyncio.get_event_loop()

    data_to_file = DataToFile(output_file)
    connection = Connection(
        loop, read_characteristic, write_characteristic, data_to_file.write_to_csv
    )
    try:
        asyncio.ensure_future(main())
        asyncio.ensure_future(connection.manager())
        asyncio.ensure_future(user_console_manager(connection))
        loop.run_forever()
    except KeyboardInterrupt:
        print()
        print("User stopped program.")
    finally:
        print("Disconnecting...")
        loop.run_until_complete(connection.cleanup())

Where does bleak come in? You may have been wondering about the code directly before setting up the loops.

    connection = Connection(
        loop, read_characteristic, write_characteristic, data_to_file.write_to_csv
    )

This class wraps the bleak library and makes it a bit easier to use. Let me explain.

Connection()

You may be asking, “Why create a wrapper around bleak, Thomas?” Well, two reasons. First, the bleak library is still in development and there are several aspects which do not work well. Second, there are additional features I’d like my Bluetooth LE Python class to have. For example, if the Bluetooth LE connection is broken, I want my code to automatically attempt to reconnect. This wrapper class allows me to add these capabilities.

I did try to keep the code highly hackable. I want anybody to be able to use the code for their own applications, with a minimum time investment.

Connection(): init

The Connection class has four required arguments and one optional.

  • loop – this is the loop established by asyncio; it allows the connection class to do async magic.
  • read_characteristic – the characteristic on the remote device containing the data we are interested in.
  • write_characteristic – the characteristic on the remote device to which we can write data.
  • data_dump_handler – this is the function to call when we’ve filled the rx buffer.
  • data_dump_size – this is the size of the rx buffer. Once it is exceeded, the data_dump_handler function is called and the rx buffer is cleared.

class Connection:
    
    client: BleakClient = None
    
    def __init__(
        self,
        loop: asyncio.AbstractEventLoop,
        read_characteristic: str,
        write_characteristic: str,
        data_dump_handler: Callable[[str, Any], None],
        data_dump_size: int = 256,
    ):
        self.loop = loop
        self.read_characteristic = read_characteristic
        self.write_characteristic = write_characteristic
        self.data_dump_handler = data_dump_handler
        self.data_dump_size = data_dump_size

Alongside the arguments are internal variables which track device state.

The variable self.connected tracks whether the BleakClient is connected to a remote device. It is needed since the await self.client.is_connected() currently has an issue where it raises an exception if you call it and it’s not connected to a remote device. Have I mentioned bleak is in progress?

        # Device state
        self.connected = False
        self.connected_device = None

self.connected_device hangs on to the device you selected when you started the app. This is needed for reconnecting on disconnect.

The rest of the variables help track the incoming data. They’ll probably be refactored into a DTO at some point.

        # RX Buffer
        self.last_packet_time = datetime.now()
        self.rx_data = []
        self.rx_timestamps = []
        self.rx_delays = []

Connection(): Callbacks

There are two callbacks in the Connection class. One to handle disconnections from the Bluetooth LE device. And one to handle incoming data.

Easy one first: the on_disconnect method is called whenever the BleakClient loses connection with the remote device. All we’re doing in the callback is setting the connected flag to False, which will cause Connection.connect() to attempt to reconnect.

    def on_disconnect(self, client: BleakClient):
        self.connected = False
        # Put code here to handle what happens on disconnect.
        print(f"Disconnected from {self.connected_device.name}!")

The notification_handler is called by the BleakClient any time the remote device updates a characteristic we are interested in. The callback has two parameters, sender, which is the name of the device making the update, and data, which is a bytearray containing the information received.

I’m converting the data from two bytes into a single int value using Python’s int.from_bytes(). The first argument is the bytearray and byteorder defines the endianness (usually big). The converted value is then appended to the rx_data list.
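
For example, a two-byte packet of 0x01 0xF4 becomes the integer 500:

print(int.from_bytes(bytearray([0x01, 0xF4]), byteorder="big"))  # 500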

The record_time_info() method saves the current time and the number of microseconds between the current packet and the previous one.

If the length of the rx_data list is greater than the data_dump_size, then the data are passed to the data_dump_handler function and the rx_data list is cleared, along with any time tracking information.

    def notification_handler(self, sender: str, data: Any):
        self.rx_data.append(int.from_bytes(data, byteorder="big"))
        self.record_time_info()
        if len(self.rx_data) >= self.data_dump_size:
            self.data_dump_handler(self.rx_timestamps, self.rx_delays, self.rx_data)
            self.clear_lists()

Connection(): Connection Management

The Connection class’s primary job is to manage BleakClient’s connection with the remote device.

The manager function is one of the async loops. It continually checks whether the Connection.client exists. If it does, it executes connect(); if it doesn’t, it prompts select_device() to find a remote connection.

    async def manager(self):
        print("Starting connection manager.")
        while True:
            if self.client:
                await self.connect()
            else:
                await self.select_device()
                await asyncio.sleep(15.0, loop=loop)       

The connect() is responsible for ensuring the PC’s Bluetooth LE device maintains a connection with the selected remote device.

First, the method checks if the device is already connected; if it is, it simply returns. Remember, this function is in an async loop.

If the device is not connected, it tries to make the connection by calling self.client.connect(). This is awaited, meaning the rest of the method will not execute until this call returns. Then, we check whether the connection was successful and update the Connection.connected property.

If the BleakClient is indeed connected, then we add the on_disconnect and notification_handler callbacks. Note, we only added a callback on the read_characteristic. Makes sense, right?

Lastly, we enter an infinite loop which checks every 5 seconds if the BleakClient is still connected, if it isn’t, then it breaks the loop, the function returns, and the entire method is called again.

    async def connect(self):
        if self.connected:
            return
        try:
            await self.client.connect()
            self.connected = await self.client.is_connected()
            if self.connected:
                print(f"Connected to {self.connected_device.name}")
                self.client.set_disconnected_callback(self.on_disconnect)
                await self.client.start_notify(
                    self.read_characteristic, self.notification_handler,
                )
                while True:
                    if not self.connected:
                        break
                    await asyncio.sleep(5.0, loop=loop)
            else:
                print(f"Failed to connect to {self.connected_device.name}")
        except Exception as e:
            print(e)

Whenever we decide to end the connection, we can escape the program by hitting CTRL+C. However, before shutting down, the BleakClient needs to free up the hardware. The cleanup method checks if the Connection.client exists; if it does, it tells the remote device we no longer want notifications from the read_characteristic. It also sends a signal to our PC’s hardware and the remote device that we want to disconnect.

    async def cleanup(self):
        if self.client:
            await self.client.stop_notify(self.read_characteristic)
            await self.client.disconnect()

Device Selection

Bleak is a multi-OS package; however, there are slight differences between the operating systems. One of those is the address of your remote device. Windows and Linux report the remote device by its MAC address. Of course, Mac has to be the odd duck; it uses a Universally Unique Identifier (UUID). Specifically, it uses a CoreBluetooth UUID, or CBUUID.

These identifiers are important as bleak uses them during its connection process. These IDs are static, that is, they shouldn’t change between sessions, yet they should be unique to the hardware.

The select_device method calls the bleak.discover method, which returns a list of BleakDevices advertising their connections within range. The code uses the aioconsole package to asynchronously request the user to select a particular device.

    async def select_device(self):
        print("Bluetooh LE hardware warming up...")
        await asyncio.sleep(2.0, loop=loop) # Wait for BLE to initialize.
        devices = await discover()

        print("Please select device: ")
        for i, device in enumerate(devices):
            print(f"{i}: {device.name}")

        response = -1
        while True:
            response = await ainput("Select device: ")
            try:
                response = int(response.strip())
            except ValueError:
                print("Please make valid selection.")
                continue
            
            if response > -1 and response < len(devices):
                break
            else:
                print("Please make valid selection.")

After the user has selected a device, the Connection.connected_device is recorded (in case we need it later), and the Connection.client is set to a newly created BleakClient with the address of the selected device.


        print(f"Connecting to {devices[response].name}")
        self.connected_device = devices[response]
        self.client = BleakClient(devices[response].address, loop=self.loop)

Utility Methods

Not much to see here, these methods are used to handle timestamps on incoming Bluetooth LE data and clearing the rx buffer.

    def record_time_info(self):
        present_time = datetime.now()
        self.rx_timestamps.append(present_time)
        self.rx_delays.append((present_time - self.last_packet_time).microseconds)
        self.last_packet_time = present_time

    def clear_lists(self):
        self.rx_data.clear()
        self.rx_delays.clear()
        self.rx_timestamps.clear()

Save Incoming Data to File

This is a small class meant to make it easier to record the incoming microphone data along with the time it was received and delay since the last bytes were received.

class DataToFile:

    column_names = ["time", "delay", "data_value"]

    def __init__(self, write_path):
        self.path = write_path

    def write_to_csv(self, times: [datetime], delays: [int], data_values: [Any]):

        if len(set([len(times), len(delays), len(data_values)])) > 1:
            raise Exception("Not all data lists are the same length.")

        with open(self.path, "a+") as f:
            # Write the header once, when the file is new, then always append the rows.
            if os.stat(self.path).st_size == 0:
                print("Created file.")
                f.write(",".join([str(name) for name in self.column_names]) + ",\n")
            for i in range(len(data_values)):
                f.write(f"{times[i]},{delays[i]},{data_values[i]},\n")

App Loops

I mentioned three “async loops,” we’ve covered the first one inside the Connection class, but outside are the other two.

The user_console_manager() checks to see if the Connection instance has instantiated a BleakClient and is connected to a device. If so, it prompts the user for input in a non-blocking manner. After the user enters input and hits return, the string is converted into a bytearray using map(). Lastly, it is sent by directly calling write_gatt_char on the Connection.client. Note, that’s a bit of a code smell; it should be refactored (when I have time).

async def user_console_manager(connection: Connection):
    while True:
        if connection.client and connection.connected:
            input_str = await ainput("Enter string: ")
            bytes_to_send = bytearray(map(ord, input_str))
            await connection.client.write_gatt_char(write_characteristic, bytes_to_send)
            print(f"Sent: {input_str}")
        else:
            await asyncio.sleep(2.0, loop=loop)

The last loop is the one designed to take the application code. Right now, it only simulates application logic by sleeping 5 seconds.

async def main():
    while True:
        # YOUR APP CODE WOULD GO HERE.
        await asyncio.sleep(5)

Closing

Well, that’s it. You will have problems, especially if you are using the above code from Linux or Windows. But, if you run into any issues I’ll do my best to provide support. Just leave me a comment below.