Here is a Thread Pool!

First, here’s a handy reference implementation of a thread pool in Ruby:

require 'thread'
work_q = Queue.new
(0..50).to_a.each{|x| work_q.push x }
workers = (0...4).map do
  Thread.new do
    begin
      while x = work_q.pop(true)
        50.times{print [128000+x].pack "U*"}
      end
    rescue ThreadError
    end
  end
end; "ok"
workers.map(&:join); "ok"

Note: For optimal enjoyment, run this code snippet on a Mac with the terminal font size bumped comically high. Can you see how it’s doing no more than 4 things at once?

Breaking it down

require 'thread'

Require threading library built-in to ruby.

work_q = Queue.new

Create a Queue data structure to represent the set of work that needs to be done. A Queue is a basic stack-like data structure on which we can call push and pop. Queue is thread-safe so if multiple threads access it at the same time, it will maintain consistency. We could use a Mutex, but in most cases a Queue will work and is simpler to reason about. If we happen to be on MRI we could probably get away with a basic array here because of the GVL (Global VM Lock), but it’s best to get into the habit of writing thread-safe code whenever possible.

(0..50).to_a.each{|x| work_q.push x }

Push the numbers 0 through 50 onto the work_q.

workers = (0...4).map do

Transform an array of 4 numbers into an array of worker threads (to be created inside this block in subsequent lines).

Thread.new do

Create a new Thread and within in begin executing the code inside the block given.

while x = work_q.pop(true)

From within a worker thread (this code executing inside of each of our 4 workers), pop items off of the work_q. We pass true to the pop method to indicate that we expect the method to raise an error if the work_q is empty. Passing false (the default argument), would cause this method to block waiting on the work_q to have at least 1 item.

50.times{print [128000+x].pack "U*"}

Perform the actual work of the workers, using the item pulled from the work_q as input. In this case, print out 50 of the character at index 128000+x in unicode. On Mac OS X, this is emoji of some animal.

rescue ThreadError

The exception we expect to get when our work_q is empty, as will be thrown by the call to work_q.pop(true). We don’t care to do anything with the exception, just proceed to the end of the block and thus the end of this thread/worker’s life.

workers.map(&:join)

For each of the 4 workers, in sequence, join the main thread of execution with the worker thread. This will have the effect of halting the execution at this point until the 4 workers have completed.

Was that so hard?

Yes, perhaps surprisingly, it was hard. And, in my experience, hard enough that most Rubyists would avoid the trouble and simply be satisfied with synchronous code. Many of us seem to have a deep fear of working with threads, and a few of us have the scars to justify it. This example truly was simple compared to some of the things we’ve seen in the wild.

But threads (and thread pools) are insanely useful. Here’s another example example:

Suppose I wanted to slurp down a copy of the entire backlog of McSweeney’s Internet Tendency as seed data for a highly relevant chat bot I’m working on. I could scrape each page sequentially, which will take about 1.1986217804166666 hours according to this hastily hacked together code snippet:

require 'nokogiri'
require 'open-uri'

started_at = Time.now
(1..266).each do |page|
  puts "page #{page}"
  archive_page = Nokogiri::HTML(open("http://www.mcsweeneys.net/articles/archives?page=#{page}"))
  all_links = archive_page.css(".singleColumn a").map{|e| e['href']}
  article_urls = all_links.reject{|url| url.match(/articles\/archives\?/)}
  article_urls.each_with_index do |article_url, index|
    begin
      article = Nokogiri::HTML(open("http://www.mcsweeneys.net/#{article_url}"))
      article_body = article.css(".articleBody").text
      puts article_body.size
      ChatBot.add_seed_data!( article_body )
    rescue => e
      puts "problem on page #{page} with article #{article_url}"
      puts e.inspect
    end
  end
  puts "ESTIMATED TIME REMAINING IN HOURS:"
  puts (((Time.now - started_at) / page) * (266 - page) / 3600.0).inspect
end

Or I could read 4 pages concurrently, bring the estimate down to a much more reasonable 0.3782935601851852 hours. Code snippet:

require 'nokogiri'
require 'open-uri'
require 'thread'

started_at = Time.now
(1..266).each do |page|
  puts "page #{page}"
  archive_page = Nokogiri::HTML(open("http://www.mcsweeneys.net/articles/archives?page=#{page}"))
  all_links = archive_page.css(".singleColumn a").map{|e| e['href']}
  article_urls = all_links.reject{|url| url.match(/articles\/archives\?/)}
  work_q = Queue.new
  article_urls.each{|url| work_q << url}
  workers = (0...4).map do
    Thread.new do
      begin
        while article_url = work_q.pop(true)
          begin
            article = Nokogiri::HTML(open("http://www.mcsweeneys.net/#{article_url}"))
            article_body = article.css(".articleBody").text
            puts article_body.size
            ChatBot.add_seed_data!( article_body )
          rescue => e
            puts "problem on page #{page} with article #{article_url}"
            puts e.inspect
          end
        end
      rescue ThreadError
      end
    end
  end; "ok"
  workers.map(&:join); "ok"
  puts "ESTIMATED TIME REMAINING IN HOURS:"
  puts (((Time.now - started_at) / page) * (266 - page) / 3600.0).inspect
end

It might go even faster if I bumped up my worker count, but let’s not do that out of respect for the fine folks at McSweeney’s (whose core business value comes from snarky publications and not scaling web servers).

Threads in your Rails Console

So back to those thread-phobic ruby programmers I was talking about. One of the things we do all the time is SSH into production and figure out what’s going on from a Rails console session. We’ll find some interesting anomaly and then say to ourselves: “hmmm, I wonder how many other customers are affected”. In which case, the code we write really bears a lot of resemblance to that web scrapper snippet above. It probably makes some API calls to other systems involved. And, we can probably benefit from doing things concurrently. But, we want to avoid flooding any external APIs or the database.

The process is usually:

  1. start a screen session.
  2. write a short ruby snippet in some text editor.
  3. paste said code snippet into running console inside of screen session.
  4. check back multiple hours later.

A Few Tips

(If you find yourself doing this sort of thing)

  • You are writing a short snippet of code that’s going to take a long time to run. So try a version first that runs on a smaller subset of “all the things”. You don’t want to wait an hour to find a bug and have to run the whole thing over again.

  • Use a screen session and turn on screen logging. Regularly output some useful piece of information about how far along your code is, and maybe even attempt to estimate how long it will take to complete (see McSweeney’s scrapper for example).

  • Save results into a results hash or other useful structure, that is updated as your code runs (not when it completes). AKA each over map. If you Control-C to interrupt your code, it’s nice to have a partial answer.

  • Wrap every unit of work in begin/rescue with a puts of the error as well as saving it (and related “erred thing” data) off into a global hash or array you can inspect later. You don’t want an edge case of bad data you didn’t predict to halt your whole program (especially since you likely won’t be watching it). The McSweeney’s scrappers example include this protection for your reference.

  • Use redis if appropriate. Hashes and arrays you keep an outer reference too will be available as long as your screen+console is active, and you don’t mistakenly overwrite them. Redis offers a sort of temporary persistence slightly better than this. Be sure to prefix the keys you stuff into redis with something unique and identifying, so that your co-workers will know they belong to you. (And be sure to cleanup after yourself!)

  • Cart around IDs instead of objects. This makes the results sets you save fix into memory, or redis, or on your screen when you puts them. Also, it insulates you from connection issues, reloading issues, and other behaviors you might not have expected.

Advanced Tip

  • class_eval and class << some_object can be super handy in a console session. You can safely and quickly “fix” methods, or add methods to single objects or whole classes, in production, without worrying about affecting other running processes. You can always close your console and start over if you go too far down this rabbit hole.

Why not use a gem?

Yes, use a gem. Except in the case where you are trying to get answers to something right now and you have a production console open and don’t have the ability or authority to just go pushing new code. (Or feel like waiting for a green build and a deploy). And as you may have guessed, that is frequently the case. So I like to copy/paste small comprehensible code snippets instead of installing random gems into projects that are otherwise un-used.

Also, since threads are hard. I think we all need more practice. Using a gem will probably hide this complexity from you (as it’s supposed to). So try to write a little threaded code, you might learn something.

If you are NOT writing more than throw-away or run-once code you probably should use such a gem. Spawnling, for example, will even hide the complexity so much you can choose between threads and forking with the same DSL. Resque is the obvious choice for ruby if for concurrent workers that (Yes it’s ok to avoid threads altogether). And for advanced threading, take a look at Celluloid and it’s Actor model.

Related Links:

A Talk about Background Jobs and Failure: Video, Slides.

A Talk about Background Jobs and more Failure: Video, Slides