Lessons Learned in Concurrency with Ruby - Part I

The Engine Yard PaaS products are highly complex, and span thousands of servers to serve our customers. Ensuring that our applications run quickly and reliably requires a good understanding of concurrency and parallelization. This blog post is a recap of my journey down the rabbit hole of concurrency in an attempt to solve this problem in my own development efforts. Part II will be the continuation of this trek, only with more of a focus on parallelization.

Hardware?

We can try throwing hardware at the problem. However, if CPU is the bottleneck, increasing the speed per core only goes so far before it stops helping. The same holds for adding more servers: if the processing happens serially, one task after another, each added server only contributes a fixed amount of processing capacity.

Become a “Serial” Killer!

The name of the game these days is not as much core speed as it is core density. CPUs aren’t getting much faster; they are getting more cores. However, unless you can break your application into “bite-sized” chunks that can all run at the same time (or “parallelize”), you cannot take advantage of this multi-core goodness.

Concurrency vs Parallelism

When tackling such a complex problem, it is important to know the difference between concurrency and parallelism. Concurrency is the ability of a program to make progress on several tasks by spreading them out over different slices of time. It allows us to have multiple “threads” in flight at once. While one thread is sleeping or waiting on I/O, another thread can take priority and start working, thus making better use of available CPU time. When you think of concurrency, think of “threads”.

Parallelism is the ability to accomplish multiple tasks simultaneously. Concurrency may give the illusion of parallelism, but is not parallelism in and of itself, as your threads are still deferring to one another and competing for the same CPU resources.

For example: In MRI, you have something called a Global Interpreter Lock (GIL). Even though your code uses multiple threads, those threads run one at a time because of the GIL. Therefore, your Ruby code never truly runs in parallel! Instead, MRI switches quickly between running threads, giving each a small window of processing time. Your I/O may overlap, but the Ruby code itself still runs one thread at a time.

Fortunately, there are a few options for you! The option I advocate is Rubinius. Rubinius is a dynamic language platform implemented from scratch for the Ruby programming language. It includes a bytecode interpreter, modern garbage collector, just-in-time compiler to machine code, and native threads with no global interpreter lock, so multiple Ruby threads will run in parallel if the hardware supports it. Rubinius was conceived by Evan Phoenix and is maintained by Dirkjan Bussink and Brian Ford of Engine Yard. Most important for our discussion here is that Rubinius has no GIL and thus supports true multithreading. In Rubinius each thread is mapped to a native operating system lightweight process (LWP). Rubinius is the Ruby interpreter we use for our project mentioned above.

Even if you correctly implement threading, you won’t get the full benefit of your available hardware until you look at other options, such as adding multi-process and multi-machine capabilities to your application. These techniques will be discussed in Part II. Please read Evan Phoenix’s post for a more in-depth discussion on Ruby concurrency and parallelism.
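
To make the distinction concrete, here is a minimal sketch that compares running a few I/O-bound tasks one after another with running them in threads. The helper names (fake_io, elapsed, run_serial, run_threaded) are made up for this illustration, and sleep is only a stand-in for a slow network call. Because a sleeping thread gives up the interpreter, the threaded version should finish in roughly the time of one task, even on MRI:

```ruby
# Stand-in for an I/O-bound call (hypothetical helper). A sleeping thread
# lets other threads run, much like a thread blocked on a network read.
def fake_io(seconds)
  sleep(seconds)
end

# Return how long the given block takes, in seconds.
def elapsed
  started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  yield
  Process.clock_gettime(Process::CLOCK_MONOTONIC) - started
end

# Run the tasks one after another on the main thread.
def run_serial(tasks, seconds)
  elapsed { tasks.times { fake_io(seconds) } }
end

# Run each task in its own thread and wait for all of them.
def run_threaded(tasks, seconds)
  elapsed do
    Array.new(tasks) { Thread.new { fake_io(seconds) } }.each(&:join)
  end
end

puts format("serial:   %.2fs", run_serial(4, 0.2))
puts format("threaded: %.2fs", run_threaded(4, 0.2))
```

CPU-bound work would not show the same speedup on MRI, since only one thread can execute Ruby code at a time there.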

Threading

When you create a Thread, the code contained within the thread block runs concurrently with the main application that instantiated the thread.

For our example application, let’s start with a simple currency downloader:

require 'net/http'

# Our sample set of currencies
currencies = ['ARS','AUD','CAD','CNY','DEM','EUR','GBP','HKD','ILS','INR','USD','XAG','XAU']

currencies.each do |currency|
    puts Net::HTTP.get("download.finance.yahoo.com","/d/quotes.csv?e=.csv&f=sl1d1t1&s=USD#{currency}=X")
end
puts "DONE!"

It works fine as is; however, since we process the currencies one at a time, it takes a while. Let’s make it faster by running the downloads concurrently. A more concurrent example might look like this:

require 'net/http'

# Our sample set of currencies
currencies = ['ARS','AUD','CAD','CNY','DEM','EUR','GBP','HKD','ILS','INR','USD','XAG','XAU']
# Create an array to keep track of threads
threads = []

currencies.each do |currency|
    # Keep track of the threads as you spawn them
    threads << Thread.new do
        puts Net::HTTP.get("download.finance.yahoo.com","/d/quotes.csv?e=.csv&f=sl1d1t1&s=USD#{currency}=X")
    end
end
# Join on the threads to allow them to finish
threads.each do |thread|
    thread.join
end
puts "DONE!"

Notice that we are using an array to keep track of the threads we spawn. We usually want to wait for all threads to finish (or “join”) before moving on, rather than just let them run amok. Also, if you fail to make the parent thread join (wait) on its children, the children could be killed before they finish running. Without the join loop your output would look like this:

DONE!

With the join loop included your output looks like this:

"\"USDCAD=X\",0.9967,\"11/5/2012\",\"4:23pm\"\r\n"
"\"USDINR=X\",54.575,\"11/5/2012\",\"4:23pm\"\r\n"
"\"USDAUD=X\",0.9649,\"11/5/2012\",\"4:23pm\"\r\n"
"\"USDILS=X\",3.9054,\"11/5/2012\",\"4:23pm\"\r\n"
"\"USDCNY=X\",6.2443,\"11/5/2012\",\"4:23pm\"\r\n"
"\"USDEUR=X\",0.7818,\"11/5/2012\",\"4:23pm\"\r\n"
"\"USDXAG=X\",0.0321,\"11/5/2012\",\"3:31pm\"\r\n"
"\"USDGBP=X\",0.6259,\"11/5/2012\",\"4:23pm\"\r\n"
"\"USDHKD=X\",7.7501,\"11/5/2012\",\"4:23pm\"\r\n"
"\"USDARS=X\",4.773,\"11/5/2012\",\"4:23pm\"\r\n"
"\"USDUSD=X\",0.00,\"N/A\",\"N/A\"\r\n"
"\"USDXAU=X\",0.0006,\"11/5/2012\",\"4:20pm\"\r\n"
"\"USDDEM=X\",0.00,\"N/A\",\"N/A\"\r\n"
DONE!
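
If you need the results back rather than printed, Thread#value may be handy: it joins the thread and returns the value of its block. The sketch below is only an illustration; fetch_rate and fetch_all are hypothetical helpers, and fetch_rate fakes the download with a short sleep instead of calling the real service:

```ruby
# Hypothetical stand-in for the HTTP call: pretend to download a quote.
def fetch_rate(currency)
  sleep(rand * 0.05)
  "USD#{currency}=X"
end

# Spawn one thread per currency, then collect each thread's return value.
# Thread#value waits for the thread to finish, so no separate join is needed.
def fetch_all(currencies)
  threads = currencies.map do |currency|
    Thread.new(currency) { |code| fetch_rate(code) }
  end
  threads.map(&:value)
end

puts fetch_all(%w[ARS AUD CAD])
```

Because the values are read in the order the threads were created, the results line up with the input list even though the threads may finish in any order.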

Threads Gone Wild!

OK, great - we now know how to use threads. We still have a problem: In our example, let’s say we expand our application to use over 500 currencies (if 500 currencies do indeed exist…). Does this mean I can take my entire application, put these calls into 500 simultaneous threads, and let it run? While we can absolutely do so, it is not such a good idea unless you have a 100 core CPU. Here’s why:

In any multithreaded environment, context switching occurs. Context switching is the process by which a thread is stopped and its state and context are stored allowing other threads to use CPU cycles. Once the competing thread has been interrupted in a similar manner, the context and state of the original thread are loaded and the original thread has the opportunity to run if it gets priority.

One other thing to note - the actual act of context switching in and of itself takes CPU cycles. If you simply spawn as many threads as you can, your system can actually spend more time context switching than actually processing your data. This will slow down your application.

So how do we properly schedule our threads? The common solution that I use is the producer-consumer model. In this model, we have 2 main threads: The Producer thread provides units of work to be done, and the Consumer thread takes those units of work and spins up threads to do the work.

There are a few avenues of communication that need to be laid out for this to work:

  • The Producer needs a way to give units of work to the Consumer.
  • The Consumer needs to know which threads are working and which ones are done so others can be scheduled.
  • The Producer needs a way to tell the Consumer that it has given out all available work.

Let’s apply this model to our example:

require 'thread'
require 'monitor'
require 'net/http'

# Our sample set of currencies
currencies = ['ARS','AUD','CAD','CNY','DEM','EUR','GBP','HKD','ILS','INR','USD','XAG','XAU']

# Set a finite number of simultaneous worker threads that can run
thread_count = 5

# Create an array to keep track of threads
threads = Array.new(thread_count)

# Create a work queue for the producer to give work to the consumer
work_queue = SizedQueue.new(thread_count)

# Add a monitor so we can notify when a thread finishes and we can schedule a new one
threads.extend(MonitorMixin)

# Add a condition variable on the monitored array to tell the consumer to check the thread array
threads_available = threads.new_cond

# Add a variable to tell the consumer that we are done producing work
sysexit = false

consumer_thread = Thread.new do
  loop do
    # Stop looping when the producer is finished producing work
    break if sysexit && work_queue.length == 0
    found_index = nil

    # The MonitorMixin requires us to obtain a lock on the threads array in case
    # a different thread may try to make changes to it.
    threads.synchronize do
      # First, wait on an available spot in the threads array.  This fires every
      # time a signal is sent to the "threads_available" variable
      threads_available.wait_while do
        threads.select { |thread| thread.nil? || thread.status == false  ||
                                  thread["finished"].nil? == false}.length == 0
      end
      # Once an available spot is found, get the index of that spot so we may
      # use it for the new thread
      found_index = threads.rindex { |thread| thread.nil? || thread.status == false ||
                                              thread["finished"].nil? == false }
    end

    # Get a new unit of work from the work queue
    currency = work_queue.pop

    # Pass the currency variable to the new thread so it can use it as a parameter to go
    # get the exchange rates
    threads[found_index] = Thread.new(currency) do
      puts Net::HTTP.get("download.finance.yahoo.com","/d/quotes.csv?e=.csv&f=sl1d1t1&s=USD#{currency}=X")
      # When this thread is finished, mark it as such so the consumer knows it is a
      # free spot in the array.
      Thread.current["finished"] = true

      # Tell the consumer to check the thread array
      threads.synchronize do
        threads_available.signal
      end
    end
  end
end

producer_thread = Thread.new do
  # For each currency we need to download...
  currencies.each do |currency|
    # Put the currency on the work queue
    work_queue << currency

    # Tell the consumer to check the thread array so it can attempt to schedule the
    # next job if a free spot exists.
    threads.synchronize do
      threads_available.signal
    end
  end
  # Tell the consumer that we are finished downloading currencies
  sysexit = true
end

# Join on both the producer and consumer threads so the main thread doesn't exit while
# they are doing work.
producer_thread.join
consumer_thread.join

# Join on the worker threads to allow them to finish (if any are left)
threads.each do |thread|
    thread.join unless thread.nil?
end
puts "DONE!"

In this case, if we were to add hundreds of currencies to the list, it would not overwhelm our system. You would want to adjust the “thread_count” variable as you monitor your running system until you get the maximum performance available to your application.
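
For comparison, a simpler way to cap the number of threads is a fixed pool of workers that all pull from one Queue. This is a sketch of that alternative, not the scheduler shown above: process_with_pool and the :done sentinel are names invented for this example, and the block passed in stands in for the download.

```ruby
require 'thread'

# Run the given block once per item using a fixed number of worker threads.
# Returns the block results (in no particular order).
def process_with_pool(items, worker_count, &work)
  queue = Queue.new
  items.each { |item| queue << item }
  # One sentinel per worker tells each of them when to stop.
  worker_count.times { queue << :done }

  workers = Array.new(worker_count) do
    Thread.new do
      finished = []   # local to this worker, so no locking is needed
      while (item = queue.pop) != :done
        finished << work.call(item)
      end
      finished        # becomes the thread's value
    end
  end

  # Thread#value joins each worker and returns its local results.
  workers.flat_map(&:value)
end

quotes = process_with_pool(%w[ARS AUD CAD CNY EUR], 2) { |code| "USD#{code}=X" }
puts quotes.sort
```

Here the workers are created once and reused, so the only tuning knob is worker_count, which plays the same role as “thread_count” above.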

Be Consistent

As threads execute, you are no longer assured of the order in which your code runs. Therefore, anything that alters a shared structure (such as a variable) could potentially put your application into an inconsistent state.

Instead of simply writing our results to the console, let us instead change our example to write the results to a shared array for printing later:

require 'thread'
require 'monitor'
require 'net/http'

# Our sample set of currencies
currencies = ['ARS','AUD','CAD','CNY','DEM','EUR','GBP','HKD','ILS','INR','USD','XAG','XAU']

# Store our results here
results = Array.new
...
    # Pass the currency variable to the new thread so it can use it as a parameter to go
    # get the exchange rates
    threads[found_index] = Thread.new(currency) do
      # Add the results to the array
      results << Net::HTTP.get("download.finance.yahoo.com","/d/quotes.csv?e=.csv&f=sl1d1t1&s=USD#{currency}=X")

      # When this thread is finished, mark it as such so the consumer knows it is a
      # free spot in the array.
      Thread.current["finished"] = true

      # Tell the consumer to check the thread array
      threads.synchronize do
        threads_available.signal
      end
    end
...
# Show our downloaded currencies as they are stored in the array
puts results.inspect
puts "#{results.length} currencies returned."
puts "DONE!"

So we run our application, and make a discovery:

["\"USDCNY=X\",6.2254,\"12/7/2012\",\"4:14pm\"\r\n", "\"USDAUD=X\",0.9536,\"12/7/2012\",\"4:14pm\"\r\n", "\"USDCAD=X\",0.9905,\"12/7/2012\",\"4:14pm\"\r\n", "\"USDDEM=X\",0.00,\"N/A\",\"N/A\"\r\n", "\"USDARS=X\",4.8585,\"12/7/2012\",\"4:14pm\"\r\n", "\"USDEUR=X\",0.7735,\"12/7/2012\",\"4:14pm\"\r\n", "\"USDHKD=X\",7.7501,\"12/7/2012\",\"4:14pm\"\r\n", "\"USDGBP=X\",0.6234,\"12/7/2012\",\"4:14pm\"\r\n", "\"USDINR=X\",54.475,\"12/7/2012\",\"4:14pm\"\r\n", "\"USDUSD=X\",0.00,\"N/A\",\"N/A\"\r\n", "\"USDXAU=X\",0.0006,\"12/7/2012\",\"4:12pm\"\r\n", "\"USDXAG=X\",0.0303,\"12/7/2012\",\"3:33pm\"\r\n"]
12 currencies returned.

Only 12 currencies are returned, even though we are asking for 13. If we run it again… we get 13. Sometimes 12 on other tries. Why do we randomly get different results?

The trouble here lies in the fact that all of our child threads are writing to one array. On an interpreter without a GIL, such as Rubinius, appending to an array is not guaranteed to be atomic. When two or more threads append a result at the exact same time, they can insert their items into the same indexed spot, and one overwrites the other. This means that at some point during the execution of the code above, one of our returned currencies is being overwritten.

To address this issue, we can add a Mutex which ensures that whenever a thread runs code inside the mutex “synchronize” block, it is the only thread writing to the array:

require 'thread'
require 'monitor'
require 'net/http'

# Our sample set of currencies
currencies = ['ARS','AUD','CAD','CNY','DEM','EUR','GBP','HKD','ILS','INR','USD','XAG','XAU']

# Store our results here
results = Array.new

# Create a mutex for the shared results array
results_mutex = Mutex.new
...
    # Pass the currency variable to the new thread so it can use it as a parameter to go
    # get the exchange rates
    threads[found_index] = Thread.new(currency) do
      # Download outside the lock so the requests can still overlap
      quote = Net::HTTP.get("download.finance.yahoo.com","/d/quotes.csv?e=.csv&f=sl1d1t1&s=USD#{currency}=X")
      # Add the result to the array; only the append needs the mutex
      results_mutex.synchronize do
        results << quote
      end

      # When this thread is finished, mark it as such so the consumer knows it is a
      # free spot in the array.
      Thread.current["finished"] = true

      # Tell the consumer to check the thread array
      threads.synchronize do
        threads_available.signal
      end
    end
...
puts results.inspect
puts "#{results.length} currencies returned."
puts "DONE!"

Using the mutex, shown above, we now consistently get all 13 currencies back as expected:

["\"USDARS=X\",4.859,\"12/7/2012\",\"4:15pm\"\r\n", "\"USDCNY=X\",6.2254,\"12/7/2012\",\"4:15pm\"\r\n", "\"USDCAD=X\",0.9905,\"12/7/2012\",\"4:15pm\"\r\n", "\"USDAUD=X\",0.9536,\"12/7/2012\",\"4:15pm\"\r\n", "\"USDDEM=X\",0.00,\"N/A\",\"N/A\"\r\n", "\"USDEUR=X\",0.7735,\"12/7/2012\",\"4:15pm\"\r\n", "\"USDGBP=X\",0.6235,\"12/7/2012\",\"4:15pm\"\r\n", "\"USDHKD=X\",7.7501,\"12/7/2012\",\"4:15pm\"\r\n", "\"USDILS=X\",3.8319,\"12/7/2012\",\"4:15pm\"\r\n", "\"USDINR=X\",54.475,\"12/7/2012\",\"4:15pm\"\r\n", "\"USDUSD=X\",0.00,\"N/A\",\"7:47am\"\r\n", "\"USDXAG=X\",0.0303,\"12/7/2012\",\"3:33pm\"\r\n", "\"USDXAU=X\",0.0006,\"12/7/2012\",\"4:15pm\"\r\n"]
13 currencies returned.
DONE!
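
Another option, if all you need is a place to collect results, is to use a Queue instead of an Array, since Queue is designed to be shared between threads. The following is a minimal sketch under that assumption; collect_with_queue is a hypothetical helper and the string it builds stands in for the downloaded quote:

```ruby
require 'thread'

# Collect one result per currency in a thread-safe Queue, then drain it
# into a plain Array once every thread has finished.
def collect_with_queue(currencies)
  results = Queue.new

  threads = currencies.map do |currency|
    Thread.new(currency) do |code|
      # Stand-in for the HTTP call; Queue#<< is safe to call concurrently.
      results << "USD#{code}=X"
    end
  end
  threads.each(&:join)

  # No other thread is writing any more, so draining is straightforward.
  Array.new(results.size) { results.pop }
end

quotes = collect_with_queue(%w[ARS AUD CAD CNY DEM EUR GBP])
puts "#{quotes.length} currencies returned."
```

A mutex remains the more general tool, since it can protect any shared structure and any multi-step update.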

Memory Management

As we write applications that do many things in parallel in a short amount of time, we have to be careful how we use memory. There are a couple of pitfalls you can run into:

Memory Bloat

Bloat typically occurs when we keep stuffing objects into memory without giving the memory back. In a language like Ruby this means that you keep ‘referencing’ the objects, for example from a hash or an array that never shrinks, so the garbage collector can never reclaim them. Your process ends up taking more and more memory (“bloating”), forcing the operating system to move part of your process into swap (a disk partition for memory overflow, slow!). A process that continues to ‘bloat’ until all available swap/RAM has been used up can crash or cause other undesired side effects within the operating system, such as processes being killed.

One way to reduce bloat is to ensure that you allocate objects within the thread for which they are intended to be used. In a multithreaded application, all threads have access to the memory of the parent thread.

In this example, we’ve wrapped our application into a class method, and are now running it in a loop to get arrays of currencies every 5 minutes (300 seconds):

require 'thread'
require 'monitor'
require 'net/http'

class CurrencyDownloader
  class << self
    def download_currencies
      # Our sample set of currencies
      currencies = ['ARS','AUD','CAD','CNY','DEM','EUR','GBP','HKD','ILS','INR','USD','XAG','XAU']
      ...
      # Show our downloaded currencies as they are stored in the array
      #puts results.inspect
      #puts "#{results.length} currencies returned."
      #puts "DONE!"
      # Return the results we downloaded
      return results
    end
  end
end

downloaded_currencies = Array.new
loop do
  Thread.new do
    downloaded_currencies << CurrencyDownloader.download_currencies
  end
  sleep(300)
end

This is a fairly contrived example that you wouldn’t typically write. The point is to show how as subsequent threads run, the array keeps growing until we run out of memory.

Now, let’s rewrite the loop to be more conscientious of memory:

#downloaded_currencies = Array.new
loop do
  Thread.new do
    downloaded_currencies = Array.new
    downloaded_currencies << CurrencyDownloader.download_currencies
    # Perhaps do something with the results here...
  end
  sleep(300)
end

As each thread terminates, the structures created within it are able to be garbage collected, thus freeing up memory for future threads and fighting ‘memory bloat’.

Memory Leaks

Memory leaks are a different animal, and are often harder to diagnose. In this case, your application has allocated memory, which for some reason has become inaccessible and cannot be freed back up by the Garbage Collector (GC).

Tools for Memory Analysis

There are many tools and gems out there that will help you “plug the leaks” and figure out where you need to optimize your code.

One “quick and dirty” way I found to figure out where you’re bloating is to use the following code:

def get_object_stats
  return_value = Hash.new
  ObjectSpace::each_object(Object) {|my_object|
    unless return_value[my_object.class.to_s.downcase].nil?
      return_value[my_object.class.to_s.downcase][:count] += 1
    else
      return_value[my_object.class.to_s.downcase] = Hash.new
      return_value[my_object.class.to_s.downcase][:name] = my_object.class
      return_value[my_object.class.to_s.downcase][:count] = 1
    end
  }
  return_value.sort_by {|k,v| -v[:count]}
end

@thread_object_stats_logger = Thread.new do
  loop do
    break if @sysexit || expired?
    str_usage_report =  "OBJECTS IN MEMORY AT PID=#{Process.pid}\n"
    str_usage_report << "=============================================\n"
    get_object_stats.each do |stat|
       str_usage_report << "#{stat.at(1)[:count]}\t\t\t#{stat.at(1)[:name]}\n"
    end
    str_usage_report << "\n=============================================\n"
    info str_usage_report
    sleep 300
  end
end

This code runs on a set interval (in this case, 5 minutes) and catalogues how many objects of each class are currently allocated in your application. When you compare the results from one run to another, pay attention to the classes whose object counts keep increasing; those are the objects to look at for optimization. This code is not something you want to run in Production, as it adds overhead to your application. However, it’s probably something you want to bake into a “debug mode” or activate with a command-line switch.
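
The comparison between two runs can also be automated. Below is a small sketch, assuming you keep the previous snapshot around; object_counts and growth are hypothetical helper names, and the snapshot is a plain hash of class to count rather than the nested structure used above:

```ruby
# Take a snapshot of how many live objects exist per class.
def object_counts
  counts = Hash.new(0)
  ObjectSpace.each_object(Object) { |obj| counts[obj.class] += 1 }
  counts
end

# Given two snapshots, return [class, increase] pairs for every class whose
# count went up, largest increase first.
def growth(before, after)
  after.map { |klass, count| [klass, count - before.fetch(klass, 0)] }
       .select { |_, delta| delta > 0 }
       .sort_by { |_, delta| -delta }
end

before = object_counts
retained = Array.new(1_000) { |i| "quote #{i}" }  # simulate retained objects
after = object_counts

growth(before, after).first(5).each do |klass, delta|
  puts "+#{delta}\t#{klass}"
end
```

Classes that show up in this list run after run are the ones worth investigating first. As with the logger above, taking snapshots walks every live object, so it is best kept out of Production.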

An article that I find especially helpful in understanding this was written by the Rubinius team. In this article they go over tools that are included in Rubinius, as well as how to use OS X’s “leaks” and gdb to do additional diagnosis:

http://rubini.us/doc/en/tools/memory-analysis/

Manage Your Dependencies

When writing a concurrent application, you must ensure that your gems and other dependencies have been written to be “thread-safe”. Here are a couple of examples:

Old Libraries

Be aware of how up to date both your OS libraries and gems are. Some newer versions have been better optimized for multithreaded applications. Even Rubinius itself is being diligently improved in this area. For example, I recently found a memory leak (using the leaks command) in which the SSL certificate for an API I was calling showed up in memory over and over again. Recompiling Rubinius against OpenSSL v1.0.1c resolved this memory leak.

Unnecessary Abstraction

A layer of abstraction may reduce the amount of code needed to write your application, but may add overhead. For example: In one of our applications, we were tracking down a thread safety and performance issue. We narrowed the problem down to a gem we created that inserts high volumes of data into a Postgres database. Other gems that performed similar tasks did not have the problem in question - the difference was that this particular gem was using an ORM instead of the pg gem to connect to Postgres. Removing the ORM gave our application a significant performance boost, and drastically reduced the amount of memory used.

Conclusion

So the main takeaways from this post are:

  • Look for ways to run multiple things at once instead of just throwing hardware at the problem.
  • Make sure you know the difference between concurrency and parallelism.
  • Pick the right interpreter if you really want a concurrent / parallelized application (use Rubinius!).
  • When using threads…
    • Make sure you wait for your threads to finish executing before you exit your application.
    • Use a control mechanism, such as the Producer/Consumer model, to make sure your threads are mostly working instead of just context-switching.
    • Protect structures in memory that are commonly edited by your threads - use a mutex or monitor.
    • Be conscious of how your application uses memory.
    • Be sure to select gems that are thread-safe.

This concludes Part I. In Part II, we will discuss processes and queueing mechanisms - options to use when threads are not enough.