Here is a Thread Pool!
First, here’s a handy reference implementation of a thread pool in Ruby:
require 'thread'
work_q = Queue.new
(0..50).to_a.each{|x| work_q.push x }
workers = (0...4).map do
  Thread.new do
    begin
      while x = work_q.pop(true)
        50.times{print [128000+x].pack "U*"}
      end
    rescue ThreadError
    end
  end
end; "ok"
workers.map(&:join); "ok"
Note: For optimal enjoyment, run this code snippet on a Mac with the terminal font size bumped comically high. Can you see how it’s doing no more than 4 things at once?
Breaking it down
require 'thread'
Require the threading library built into Ruby.
work_q = Queue.new
Create a Queue data structure to represent the set of work that needs to be done. A Queue is a basic first-in, first-out data structure on which we can call push and pop. Queue is thread-safe, so if multiple threads access it at the same time, it will maintain consistency. We could use a Mutex instead, but in most cases a Queue will work and is simpler to reason about. If we happen to be on MRI we could probably get away with a basic array here because of the GVL (Global VM Lock), but it’s best to get into the habit of writing thread-safe code whenever possible.
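To make the queue's behavior concrete, here is a minimal sketch. The method names drain_in_order and count_with_threads are made up for illustration; only Queue, push, pop, and size come from Ruby itself. It shows that items come back out in the order they went in, and that several threads can push at once without losing anything.

```ruby
require 'thread'

# Illustrative sketch; these method names are hypothetical, not from the post.

# Push items onto a Queue, then pop them all back off and return the order
# in which they came out.
def drain_in_order(items)
  q = Queue.new
  items.each { |item| q.push(item) }
  Array.new(q.size) { q.pop }
end

# Have several threads push onto one shared Queue at the same time and
# return how many items ended up in it.
def count_with_threads(thread_count, pushes_per_thread)
  q = Queue.new
  threads = Array.new(thread_count) do
    Thread.new { pushes_per_thread.times { |i| q.push(i) } }
  end
  threads.each(&:join)
  q.size
end

p drain_in_order([1, 2, 3])     # first in, first out: [1, 2, 3]
p count_with_threads(4, 1_000)  # no pushes are lost: 4000
```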
(0..50).to_a.each{|x| work_q.push x }
Push the numbers 0 through 50 onto the work_q.
workers = (0...4).map do
Transform a range of 4 numbers into an array of worker threads (to be created inside this block in subsequent lines).
Thread.new do
Create a new Thread and, within it, begin executing the code inside the given block.
while x = work_q.pop(true)
From within a worker thread (this code executes inside each of our 4 workers), pop items off of the work_q. We pass true to the pop method to indicate that we expect the method to raise an error if the work_q is empty. Passing false (the default argument) would cause this method to block, waiting for the work_q to have at least 1 item.
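If the two flavors of pop are hard to picture, this small sketch may help. The try_pop helper is hypothetical and only exists to make the behavior visible; Queue#pop and ThreadError are the real pieces.

```ruby
require 'thread'

# Hypothetical helper: attempt a non-blocking pop and report what happened.
def try_pop(queue)
  queue.pop(true)   # non_block = true: raise instead of waiting
rescue ThreadError
  :empty            # the queue had nothing in it
end

q = Queue.new
q.push(:job)
p try_pop(q)   # :job, the one item we pushed
p try_pop(q)   # :empty, because ThreadError was raised and rescued

# A blocking pop (the default) parks the calling thread until an item arrives.
waiter = Thread.new { q.pop }
q.push(:late_job)
p waiter.value # :late_job
```

The non-blocking form suits our pool because all of the work is queued before any worker starts, so an empty queue really does mean "done".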
50.times{print [128000+x].pack "U*"}
Perform the actual work of the workers, using the item pulled from the work_q as input. In this case, print the Unicode character at code point 128000+x, 50 times over. On Mac OS X, this is an emoji of some animal.
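For the curious, here is a tiny sketch of what that pack call does on its own. The animal_for name is just for illustration. 128000 is 0x1F400 in hex, so the first character printed is U+1F400.

```ruby
# Hypothetical helper mirroring the expression used inside the worker.
def animal_for(x)
  [128_000 + x].pack("U*")   # array of code points -> UTF-8 string
end

puts animal_for(0)                       # the character at U+1F400
puts animal_for(0).encoding              # UTF-8
puts animal_for(0).unpack("U*").inspect  # back to [128000]
```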
rescue ThreadError
The exception we expect to get when our work_q is empty, as raised by the call to work_q.pop(true). We don’t care to do anything with the exception; we just proceed to the end of the block and thus the end of this thread/worker’s life.
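If using an exception for control flow bothers you, here is an alternative sketch. It is not what the snippet in this post does, and it assumes Ruby 2.3 or later, where Queue#close is available. Once a queue is closed and empty, a blocking pop returns nil instead of waiting. The square_all method is a made-up stand-in for real work.

```ruby
require 'thread'

# Alternative to rescuing ThreadError (assumes Ruby 2.3+ for Queue#close).
# Caveat: a nil or false work item would also end the loop early.
def square_all(numbers, worker_count = 4)
  work_q = Queue.new
  numbers.each { |n| work_q.push(n) }
  work_q.close                     # signal that no more work will be added

  results = Queue.new              # thread-safe place to collect output
  workers = Array.new(worker_count) do
    Thread.new do
      while (n = work_q.pop)       # nil once the queue is closed and empty
        results.push(n * n)
      end
    end
  end
  workers.each(&:join)

  Array.new(results.size) { results.pop }.sort
end

p square_all([1, 2, 3, 4, 5])      # [1, 4, 9, 16, 25]
```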
workers.map(&:join)
For each of the 4 workers, in sequence, join the main thread of execution with the worker thread. This will have the effect of halting the execution at this point until the 4 workers have completed.
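Putting the pieces together, the pattern above can be wrapped in a small helper. This is a sketch, and each_in_pool and peak_concurrency are names I made up, but it lets you check the "no more than 4 things at once" claim with a counter instead of by squinting at emoji.

```ruby
require 'thread'

# Hypothetical helper (not from the snippet above): run the block once per
# item, using at most `size` worker threads.
def each_in_pool(items, size = 4, &block)
  work_q = Queue.new
  items.each { |item| work_q.push(item) }

  workers = Array.new(size) do
    Thread.new do
      loop do
        item = begin
          work_q.pop(true)   # raises ThreadError once the queue is empty
        rescue ThreadError
          break              # nothing left to do; end this worker
        end
        block.call(item)
      end
    end
  end
  workers.each(&:join)       # wait here until every worker has finished
  nil
end

# Record the largest number of blocks that were ever running at once.
def peak_concurrency(items, size)
  lock = Mutex.new
  running = 0
  peak = 0
  each_in_pool(items, size) do |_item|
    lock.synchronize do
      running += 1
      peak = running if running > peak
    end
    sleep 0.01               # stand-in for real work
    lock.synchronize { running -= 1 }
  end
  peak
end

p peak_concurrency((0..50).to_a, 4)   # never more than 4
```

Rescuing ThreadError only around the pop, rather than around the whole loop, means a ThreadError raised by the work itself is not silently swallowed.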
Was that so hard?
Yes, perhaps surprisingly, it was hard. And, in my experience, hard enough that most Rubyists would avoid the trouble and simply be satisfied with synchronous code. Many of us seem to have a deep fear of working with threads, and a few of us have the scars to justify it. This example truly was simple compared to some of the things we’ve seen in the wild.
But threads (and thread pools) are insanely useful. Here’s another example:
Suppose I wanted to slurp down a copy of the entire backlog of McSweeney’s Internet Tendency as seed data for a highly relevant chat bot I’m working on. I could scrape each page sequentially, which will take about 1.1986217804166666 hours according to this hastily hacked together code snippet:
require 'nokogiri'
require 'open-uri'
started_at = Time.now
(1..266).each do |page|
  puts "page #{page}"
  # URI.open comes from open-uri; a bare open() no longer accepts URLs on Ruby 3.0+
  archive_page = Nokogiri::HTML(URI.open("http://www.mcsweeneys.net/articles/archives?page=#{page}"))
  all_links = archive_page.css(".singleColumn a").map{|e| e['href']}
  article_urls = all_links.reject{|url| url.match(/articles\/archives\?/)}
  article_urls.each do |article_url|
    begin
      article = Nokogiri::HTML(URI.open("http://www.mcsweeneys.net/#{article_url}"))
      article_body = article.css(".articleBody").text
      puts article_body.size
      ChatBot.add_seed_data!( article_body )
    rescue => e
      puts "problem on page #{page} with article #{article_url}"
      puts e.inspect
    end
  end
  puts "ESTIMATED TIME REMAINING IN HOURS:"
  puts (((Time.now - started_at) / page) * (266 - page) / 3600.0).inspect
end
Or I could read 4 pages concurrently, bringing the estimate down to a much more reasonable 0.3782935601851852 hours. Code snippet:
require 'nokogiri'
require 'open-uri'
require 'thread'
started_at = Time.now
(1..266).each do |page|
  puts "page #{page}"
  # URI.open comes from open-uri; a bare open() no longer accepts URLs on Ruby 3.0+
  archive_page = Nokogiri::HTML(URI.open("http://www.mcsweeneys.net/articles/archives?page=#{page}"))
  all_links = archive_page.css(".singleColumn a").map{|e| e['href']}
  article_urls = all_links.reject{|url| url.match(/articles\/archives\?/)}
  # Queue up this page's articles, then let 4 workers drain the queue
  work_q = Queue.new
  article_urls.each{|url| work_q << url}
  workers = (0...4).map do
    Thread.new do
      begin
        while article_url = work_q.pop(true)
          begin
            article = Nokogiri::HTML(URI.open("http://www.mcsweeneys.net/#{article_url}"))
            article_body = article.css(".articleBody").text
            puts article_body.size
            ChatBot.add_seed_data!( article_body )
          rescue => e
            puts "problem on page #{page} with article #{article_url}"
            puts e.inspect
          end
        end
      rescue ThreadError
      end
    end
  end; "ok"
  workers.map(&:join); "ok"
  puts "ESTIMATED TIME REMAINING IN HOURS:"
  puts (((Time.now - started_at) / page) * (266 - page) / 3600.0).inspect
end
It might go even faster if I bumped up my worker count, but let’s not do that out of respect for the fine folks at McSweeney’s (whose core business value comes from snarky publications and not scaling web servers).
Threads in your Rails Console
So back to those thread-phobic Ruby programmers I was talking about. One of the things we do all the time is SSH into production and figure out what’s going on from a Rails console session. We’ll find some interesting anomaly and then say to ourselves: “hmmm, I wonder how many other customers are affected”. In which case, the code we write really bears a lot of resemblance to that web scraper snippet above. It probably makes some API calls to other systems involved. And we can probably benefit from doing things concurrently. But we want to avoid flooding any external APIs or the database.
The process is usually:
- start a screen session.
- write a short Ruby snippet in some text editor.
- paste said code snippet into running console inside of screen session.
- check back multiple hours later.
A Few Tips
(If you find yourself doing this sort of thing)
- You are writing a short snippet of code that’s going to take a long time to run. So try a version first that runs on a smaller subset of “all the things”. You don’t want to wait an hour to find a bug and have to run the whole thing over again.
- Use a screen session and turn on screen logging. Regularly output some useful piece of information about how far along your code is, and maybe even attempt to estimate how long it will take to complete (see the McSweeney’s scraper for an example).
- Save results into a results hash or other useful structure that is updated as your code runs (not when it completes). AKA each over map. If you Control-C to interrupt your code, it’s nice to have a partial answer.
- Wrap every unit of work in begin/rescue with a puts of the error, as well as saving it (and related “erred thing” data) off into a global hash or array you can inspect later. You don’t want an edge case of bad data you didn’t predict to halt your whole program (especially since you likely won’t be watching it). The McSweeney’s scraper examples include this protection for your reference.
- Use redis if appropriate. Hashes and arrays you keep an outer reference to will be available as long as your screen+console is active and you don’t mistakenly overwrite them. Redis offers a sort of temporary persistence slightly better than this. Be sure to prefix the keys you stuff into redis with something unique and identifying, so that your co-workers will know they belong to you. (And be sure to clean up after yourself!)
- Cart around IDs instead of objects. This makes the result sets you save fit into memory, or redis, or on your screen when you puts them. Also, it insulates you from connection issues, reloading issues, and other behaviors you might not have expected.
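Here is a rough sketch of how several of the tips above might look together in one pasteable snippet. Everything named here is an assumption for illustration: check_customer stands in for whatever slow lookup or API call you are really making, and audit is just a name for the wrapper.

```ruby
require 'thread'

# Hypothetical unit of work; it fails on some inputs to mimic bad data.
def check_customer(id)
  raise ArgumentError, "bad data for #{id}" if (id % 7).zero?
  id.even? ? :affected : :ok
end

# results and errors are passed in so the caller keeps an outer reference
# and still has a partial answer after a Control-C.
def audit(ids, results, errors, worker_count = 4)
  work_q = Queue.new
  ids.each { |id| work_q.push(id) }          # cart around IDs, not objects
  lock = Mutex.new                           # guards results, errors, done
  started_at = Time.now
  done = 0

  workers = Array.new(worker_count) do
    Thread.new do
      begin
        while (id = work_q.pop(true))
          begin
            answer = check_customer(id)
            lock.synchronize { results[id] = answer }   # save as we go
          rescue => e
            lock.synchronize { errors << [id, e] }      # keep the "erred thing"
            puts "problem with #{id}: #{e.inspect}"
          ensure
            lock.synchronize do
              done += 1
              if (done % 10).zero?                      # regular progress output
                per_item = (Time.now - started_at) / done
                left = (per_item * (ids.size - done)).round(1)
                puts "#{done}/#{ids.size} done, about #{left}s left"
              end
            end
          end
        end
      rescue ThreadError
        # work_q is empty; this worker is finished
      end
    end
  end
  workers.each(&:join)
end

results = {}
errors  = []
all_ids = (1..100).to_a
audit(all_ids.first(20), results, errors)    # try a small subset first
puts "#{results.size} results, #{errors.size} errors"
```

On MRI the Mutex is arguably belt and braces for plain hash and array writes, but it keeps the snippet honest if it ever gets pasted into a console running on another Ruby.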
Advanced Tip
class_eval and class << some_object can be super handy in a console session. You can safely and quickly “fix” methods, or add methods to single objects or whole classes, in production, without worrying about affecting other running processes. You can always close your console and start over if you go too far down this rabbit hole.
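A quick sketch of both techniques, using a made-up Invoice class as a stand-in for something already loaded in your console:

```ruby
# Hypothetical class standing in for real application code.
class Invoice
  def total
    100
  end
end

a = Invoice.new
b = Invoice.new

# class << some_object: change the behavior of one object only.
class << a
  def total
    0   # pretend this single invoice needs special handling
  end
end

p a.total   # 0, the patched object
p b.total   # 100, every other Invoice is untouched

# class_eval: reopen the class for every instance in this process.
Invoice.class_eval do
  def total_with_tax
    (total * 1.1).round
  end
end

p b.total_with_tax   # 110
p a.total_with_tax   # 0, because it builds on a's patched total
```

Both changes live only in the memory of the current console process, which is why closing the console undoes them.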
Why not use a gem?
Yes, use a gem. Except in the case where you are trying to get answers to something right now, you have a production console open, and you don’t have the ability or authority to just go pushing new code (or don’t feel like waiting for a green build and a deploy). And as you may have guessed, that is frequently the case. So I like to copy/paste small comprehensible code snippets instead of installing random gems into projects where they would otherwise go unused.
Also, since threads are hard, I think we all need more practice. Using a gem will probably hide this complexity from you (as it’s supposed to). So try to write a little threaded code; you might learn something.
If you are NOT writing throw-away or run-once code, you probably should use such a gem. Spawnling, for example, will even hide the complexity so much that you can choose between threads and forking with the same DSL. Resque is the obvious choice in Ruby for concurrent workers (yes, it’s OK to avoid threads altogether). And for advanced threading, take a look at Celluloid and its Actor model.
Related Links:
A Talk about Background Jobs and Failure: Video, Slides.
A Talk about Background Jobs and more Failure: Video, Slides