What Are Events, Why Might You Care, and How Can EventMachine Help?
You have likely heard of event based/driven programming, which you may also see referred to as “evented” programming. It has been around and in use for a long time, but it has seen a growing swell of interest in recent years. Perl has had POE for years. Likewise, Python has had Twisted for quite a few years. Graphics toolkits such as GTK use event loops to respond to user interface events. JavaScript has been getting a lot of attention lately because of the Node.js event driven framework. Ruby has had the EventMachine library since 2006, with other event driven programming libraries such as Rev and Coolio being released since then.
Despite this relative wealth of libraries, and growing interest in the event driven programming paradigm, this realm of software design is still shrouded in mystery and unknowns for many developers. People who are new to it tend to misunderstand it, often assuming magic that does not exist, or simply misunderstanding what “event driven” really means. Even developers with experience writing event based software using one of the previously mentioned libraries are often fuzzy on the details. They may assume that their library of choice represents the “real” world of event based programming. Or simply because their use cases only take them to a few familiar neighborhoods of evented programming, they may retain some of that newbie fog for other parts.
This series of articles is going to attempt to rectify some of those situations. The precise course of the articles will be determined as they progress, but for today, let’s start at the beginning. Event Based/Driven Programming. It’s hot. All the cool kids are doing it. It’s got EVENTS! Raise your hand if you truly know what that means, or why you should actually care.
Event
–noun
- something that happens or is regarded as happening; an occurrence, especially one of some importance.
- the outcome, issue, or result of anything: The venture had no successful event.
- something that occurs in a certain place during a particular interval of time.
That quote was courtesy of http://dictionary.reference.com/browse/event
Applying that definition to the world of software is a pretty direct thing, as it turns out. Event based programming is nothing more than letting the flow of the program be determined by some set of events. Hardware interrupts are an example of a ubiquitous source of events. On Unix systems, signals and the signal handlers that deal with them are a type of event based programming, as well. A typical pattern for windowing systems is to operate with an event based model; the software can’t know when one is going to click on a menu item, or a dialog button, so the software instead runs in a loop, waiting for some event to happen. When an event occurs, the software calls into another piece of code to handle that event.
At its simplest, that general pattern of the system being divided into a dyad consisting of one part that detects or selects events, with a second part that handles them, is what event based programming is actually about. If you have been programming for any length of time, the odds are pretty good that at least in some small ways, you have engaged in event driven programming even if you didn’t realize it.
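As a minimal sketch of that dyad, here is a toy loop in which one part collects events and the other part looks up and calls a handler for each. The TinyLoop class and its #on, #emit, and #run_once methods are hypothetical names invented for this illustration; they do not come from any of the libraries mentioned above.

```ruby
# A toy illustration of the detect/handle split. TinyLoop, #on, #emit and
# #run_once are hypothetical names chosen for this sketch.
class TinyLoop
  def initialize
    @handlers = Hash.new { |h, k| h[k] = [] } # event name => handlers
    @queue = []                               # events detected so far
  end

  # Register a handler for a named event.
  def on(name, &handler)
    @handlers[name] << handler
    self
  end

  # The "detect" half: in a real system this would be an interrupt,
  # a signal, or a mouse click; here events are queued by hand.
  def emit(name, payload = nil)
    @queue << [name, payload]
    self
  end

  # The "handle" half: drain the queue, calling the matching handlers.
  def run_once
    until @queue.empty?
      name, payload = @queue.shift
      @handlers[name].each { |h| h.call(payload) }
    end
  end
end

log = []
tiny = TinyLoop.new
tiny.on(:click) { |where| log << "clicked #{where}" }
tiny.on(:key)   { |char|  log << "typed #{char}" }
tiny.emit(:click, "OK button").emit(:key, "q")
tiny.run_once
puts log
```

The program never decides on its own when a handler runs; the order of the queued events decides that, which is the whole idea in miniature.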
If you go back and look at any of those libraries that I mentioned at the top of the article, you will notice a trend. Each of those libraries is an event driven programming library, but there is a fair amount of variation across them. This is because event driven is a vague label, encompassing numerous patterns and feature sets. One of the most common of these patterns is the Reactor pattern.
The Reactor pattern describes a system that handles asynchronous events, but that does so with synchronous event callbacks. There are several ruby implementations of this pattern, including the most common library for event based programming in Ruby today, EventMachine. A reactor is good at handling many concurrent streams of incoming or outgoing IO, but because the callbacks are invoked synchronously, callback handling can severely impact the concurrency, or apparent concurrency, of a reactor implementation. Nonetheless, reactors are easy to implement, and with a little care, can be used to drive high performance IO on a single threaded, single process application.
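To make the cost of synchronous callbacks concrete, here is a small sketch that is not taken from any reactor library. It runs two callbacks one after another on a single thread, the way a reactor would, and records how long each had to wait before it started. The 0.2 second sleep is an arbitrary stand-in for any blocking operation.

```ruby
# Callbacks in a reactor run one after another on the same thread, so a
# slow callback delays everything queued behind it.
callbacks = [
  lambda { sleep 0.2; :slow },   # stands in for a blocking operation
  lambda { :fast }
]

started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
delays = callbacks.map do |cb|
  # How long this callback sat waiting before it got its turn.
  waited = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started
  cb.call
  waited
end

puts "slow callback waited %.2fs before starting" % delays[0]
puts "fast callback waited %.2fs before starting" % delays[1]
```

The fast callback does almost no work, yet it cannot start until the slow one returns. This is why callbacks in a reactor should do their work quickly and hand control back.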
As I mentioned, EventMachine is a Reactor implementation. It is perfectly possible to install EventMachine, look at a few documents and a few examples, and start writing your own event based software that uses it without really having a good idea of how the machine is running under the hood. But there is value in understanding what a Reactor actually is, so that you better understand what a library like EventMachine is doing for you. Ruby gives us a lot of tools that make it reasonably easy to write a very simple pure ruby reactor implementation, so let’s do that. It should make this topic much clearer when you see how simple it actually is. All of the code shown below can also be found on GitHub at https://github.com/engineyard/khaines_blog_code_examples/tree/master/what_are_events. All of these pure ruby examples should work on every Ruby implementation. I have tried them on MRI 1.8.7_p352 and 1.9.3_p0, as well as JRuby 1.6.5 and Rubinius 2.0.0dev, though I did not extensively test on anything other than MRI 1.9.3, so quirks may exist.
For our pure ruby reactor, we want only a few features.
- Unlike EventMachine, which also includes substantial support for managing the creation of network connections, servers, and other more sophisticated activities, our reactor is going to limit itself to only being a tool for handling events. Therefore, all that it needs is a way to attach and detach IO objects to/from the reactor. We’ll use the built in ruby mechanisms for everything else.
- Ruby has the select() call available to it on all platforms, so our reactor will be designed to use it. The select() call returns readable handles, writeable handles, and errors from a set of filehandles to operate on, so those three events (:read, :write, :error) will likewise be all that our reactor handles.
- Timers are very useful, and are pretty easy to implement in a reactor, so it would be nice to have a timer implementation.
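For readers who have not used select() directly, the following sketch shows what it reports for the two ends of a pipe. It uses only IO.pipe and IO.select from Ruby's core library. The 0.01 second timeout is an arbitrary choice for the illustration.

```ruby
# IO.select takes arrays of IO objects to watch for reading, writing and
# errors, plus an optional timeout in seconds. It returns three arrays of
# the ready objects, or nil if the timeout expires first.
reader, writer = IO.pipe

# Nothing has been written yet, so only the write end is ready.
before = IO.select([reader], [writer], [reader, writer], 0.01)

writer.write "ping"

# Now the read end has data waiting as well.
after = IO.select([reader], [writer], [reader, writer], 0.01)

# Drain the pipe and watch only the reader: nothing is ready, so select
# gives up after the timeout and returns nil.
reader.read(4)
timed_out = IO.select([reader], nil, nil, 0.01)

puts "before write: #{before[0].length} readable, #{before[1].length} writable"
puts "after write:  #{after[0].length} readable, #{after[1].length} writable"
puts "after drain:  #{timed_out.inspect}"
```

The three returned arrays line up with the three event names listed above, which is why the reactor can limit itself to :read, :write, and :error.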
Even though it is not strictly necessary for a reactor implementation, I will start our implementation with the timer functionality. Timers are events which are time based. Their callback is triggered at some point after a given time threshold is reached. The difficulty with timers is in choosing a mechanism for storing them such that the ones which need to be triggered can be easily and efficiently detected. Time, however, is a sortable attribute, and there are some data structures that are great for storing sortable data where that sorted data order is important. There are tree based data structures which are very efficient at maintaining this sort of data. Ruby doesn’t have one of those as a native data type, so for this example, I will just fake it. If I were writing a serious implementation, I would have to do more work to provide an efficient data structure for timer data. The following data structure is built on top of a hash, makes no claims to be efficient, and provides the bare minimum API for our reactor to have the tool that it needs to implement timers.
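For comparison only, here is one possible middle ground between a hash that is re-sorted on every change and a real tree: an array kept in sorted order, with binary search used to find each insertion point. This is a sketch under assumptions, not the structure used in the rest of this article. SortedTimers and its methods are hypothetical names, plain integers stand in for times, and Array#bsearch_index requires Ruby 2.3 or later.

```ruby
# One possible stand-in for a tree: keep [time, callback] pairs in an array
# that is always sorted by time, using binary search to find each insertion
# point. SortedTimers is a hypothetical name for this sketch only.
class SortedTimers
  def initialize
    @entries = [] # sorted ascending by fire time
  end

  def add(fire_at, &block)
    # Index of the first entry that fires later than the new one; inserting
    # there keeps the array sorted and keeps equal times in insertion order.
    idx = @entries.bsearch_index { |time, _| time > fire_at } || @entries.length
    @entries.insert(idx, [fire_at, block])
    self
  end

  def next_time
    @entries.first && @entries.first[0]
  end

  # Remove and return every callback whose time is at or before `now`.
  def due(now)
    due_entries = @entries.take_while { |time, _| time <= now }
    @entries.shift(due_entries.length)
    due_entries.map { |_, block| block }
  end

  def empty?
    @entries.empty?
  end
end

fired = []
timers = SortedTimers.new
timers.add(30) { fired << :c }
timers.add(10) { fired << :a }
timers.add(20) { fired << :b }
timers.due(20).each(&:call)
p fired            # [:a, :b]
p timers.next_time # 30
```

Finding the slot is cheap, but Array#insert still has to shift later elements along, so this remains a compromise rather than the efficient structure a serious implementation would want. Unlike a hash keyed by time, it does allow two timers to share the same fire time.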
class SimpleReactor
class TimerMap < Hash
def []=(k,v)
super
@sorted_keys = keys.sort
v
end
def delete k
r = super
@sorted_keys = keys.sort
r
end
def next_time
@sorted_keys.first
end
def shift
if @sorted_keys.empty?
nil
else
first_key = @sorted_keys.shift
val = self.delete first_key
[first_key, val]
end
end
def add_timer time, *args, &block
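# An absolute Time is used as-is; anything else is treated as a number
# of seconds from now. Keys are always Time objects, so they sort together.
time = case time
when Time
time
else
Time.now + time.to_i
end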
self[time] = [block, args] if block
end
def call_next_timer
_, v = self.shift
block, args = v
block.call(*args)
end
end
end
Ok. Now let’s start writing a reactor! Since we started with timers, we’ll just write enough to make timers work. So, first, an #initialize method, and a method to add timers.
class SimpleReactor
def initialize
@running = false
@timers = TimerMap.new
@block_buffer = []
end
def add_timer time, *args, &block
# TimerMap#add_timer accepts either a Time or a number of seconds.
@timers.add_timer time, *args, &block
end
There’s nothing interesting with the initialization. It just sets the @running instance variable to false, and creates an empty TimerMap and an empty block buffer. These will be used in an upcoming bit of code. The method to add a timer also does nothing special; it just passes everything into a method of the same name in the TimerMap. The next part that is needed is the skeleton of our reactor. Here is what it looks like:
def next_tick &block
@block_buffer << block
end
def tick
handle_pending_blocks
handle_events
handle_timers
end
def run
@running = true
yield self if block_given?
tick while @running
end
def stop
@running = false
end
def handle_pending_blocks
@block_buffer.length.times { @block_buffer.shift.call }
end
def handle_events
end
def handle_timers
now = Time.now
while !@timers.empty? && @timers.next_time < now
@timers.call_next_timer
end
end
def empty?
@timers.empty? && @block_buffer.empty?
end
end
There you have it. A reactor skeleton, albeit one that only supports timers and next_tick right now. Here’s an example that uses it:
require './simplereactor'
puts <<ETXT
This demo will add a sequence of numbers to a sum, via a timer, once a second,
for four seconds, with the fifth number immediately following the fourth.
1+2+3+4+5 == 15. Let's see if that's the answer that we get.
ETXT
n = 0
reactor = SimpleReactor.new
reactor.add_timer(1) do
puts "one"
n += 1
end
reactor.add_timer(2) do
puts "two"
reactor.add_timer(1, n + 2) do |sum|
puts "three"
reactor.add_timer(1, sum + 3) do |sum|
puts "four"
n = sum + 4
reactor.next_tick do
puts "five"
n += 5
puts "n is #{n}\nThe reactor should stop after this."
end
end
end
end
reactor.tick until reactor.empty?
There’s no real magic here. The code shows that one can create timers, and can create new timers within the callback code of existing timers, leveraging Ruby’s block syntax. If you run this, you will get output like this:
This demo will add a sequence of numbers to a sum, via a timer, once a second,
for four seconds, with the fifth number immediately following the fourth.
1+2+3+4+5 == 15. Let's see if that's the answer that we get.
one
two
three
four
five
n is 15
The reactor should stop after this.
Take note of the last line in the example code: reactor.tick until reactor.empty?. The reactor will not do anything until that line runs. That line sits in a loop, ticking our reactor repeatedly until there’s nothing left for it to do. At that point, #empty? returns true, the loop terminates, and the program terminates.
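One detail of the skeleton is easy to miss. The handle_pending_blocks method runs only as many blocks as were queued when it started, so a block that schedules another block defers it to the following tick instead of running it immediately. Here is a stripped-down sketch of just that behaviour. MiniTicker is a hypothetical class written for this illustration, not part of SimpleReactor.

```ruby
# A cut-down model of next_tick handling. MiniTicker is a hypothetical
# name; only the block buffer logic mirrors the reactor skeleton.
class MiniTicker
  def initialize
    @block_buffer = []
  end

  def next_tick(&block)
    @block_buffer << block
  end

  # Snapshot the length first, so blocks queued during this tick wait
  # until the next one.
  def tick
    @block_buffer.length.times { @block_buffer.shift.call }
  end

  def empty?
    @block_buffer.empty?
  end
end

order = []
ticker = MiniTicker.new
ticker.next_tick do
  order << "first tick"
  ticker.next_tick { order << "second tick" }
end

ticks = 0
until ticker.empty?
  ticker.tick
  ticks += 1
end
puts "#{ticks} ticks: #{order.inspect}"
```

Because each tick handles a fixed batch, a callback that keeps rescheduling itself cannot starve the timers and IO events that are handled later in the same tick.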
The next step in this adventure is to add enough code to our reactor to do something useful with IO objects, as well. We need to be able to attach them to the reactor, detach them from the reactor, and put enough intelligence into the reactor to find events to respond to, and trigger the callbacks for those events.
First add a constant and some accessors, and change the #initialize method:
Events = [:read, :write, :error].freeze
attr_reader :ios
def self.run &block
reactor = self.new
reactor.run &block
end
def initialize
@running = false
@ios = Hash.new do |h,k|
h[k] = {
:events => [],
:callbacks => {},
:args => [] }
end
@timers = TimerMap.new
@block_buffer = []
end
This adds a hash for holding our IO objects. It has an initializer to hold an array of events that the IO object will respond to, a hash of callbacks (potentially one per event type), and some set of args which can be passed to an invoked callback. A hash is also created to hold unhandled events, should they occur.
Next, let’s add some methods to attach an IO object to the reactor, set up callbacks, and detach an IO object from the reactor.
def attach io, *args, &block
events = Events & args
args -= events
@ios[io][:events] |= events
setup_callback io, events, *args, &block
self
end
def setup_callback io, events, *args, &block
i = @ios[io]
events.each {|event| i[:callbacks][event] = block }
i[:args] = args
i
end
def detach io
@ios.delete io
end
The code takes an IO object, a set of args to pass into the callback, and a block. It adds it to the @ios hash, and sets up the callback for the given events.
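The way #attach separates event names from callback arguments is compact enough to be worth seeing on its own. The sketch below pulls those two lines out into a standalone method. EVENTS and split_attach_args are hypothetical names used only for this illustration.

```ruby
EVENTS = [:read, :write, :error].freeze

# Split a mixed argument list into event names and callback arguments,
# the same way #attach does with `Events & args` and `args -= events`.
def split_attach_args(*args)
  events = EVENTS & args   # array intersection: only the known event names
  rest   = args - events   # everything else is passed to the callback
  [events, rest]
end

events, rest = split_attach_args(:read, :write, "log prefix", 42)
p events  # [:read, :write]
p rest    # ["log prefix", 42]
```

One consequence of this design is that the symbols :read, :write, and :error can never be passed through as callback arguments, since they are always interpreted as event names.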
Next, we need to add a few small methods to enable triggering on IO events.
def handle_events
unless @ios.empty?
pending_events.each do |io, events|
events.each do |event|
if @ios.has_key? io
if handler = @ios[io][:callbacks][event]
handler.call io, *@ios[io][:args]
end
end
end
end
end
end
The #handle_events method is straightforward. If there are any attached IO objects, iterate through the events, calling the callbacks for each. In the existing code, we should never have unhandled events, but by adding that now, one could take this library and expand it more easily into a larger pure ruby reactor that handles types of events other than just what select() uses.
def pending_events
# Trim our IO set to only include those which are not closed.
@ios.reject! {|io, v| io.closed? }
h = find_handles_with_events @ios.keys
if h
handles = Events.zip(h).inject({}) {|handles, ev| handles[ev.first] = ev.last; handles}
events = Hash.new {|h,k| h[k] = []}
Events.each do |event|
handles[event].each { |io| events[io] << event }
end
events
else
{} # No handles
end
end
def find_handles_with_events keys
select find_ios(:read), find_ios(:write), keys, 0.01
end
def find_ios(event)
@ios.select { |io, h| h[:events].include? event }.collect { |io, data| io }
end
def empty?
@ios.empty? && @timers.empty? && @block_buffer.empty?
end
This last bit of code just adds the nitty gritty methods that figure out if there are any events that need to be handled. It removes from @ios any handles that are closed, uses select() to find IO events, and then returns a hash of IO objects and the events that have been triggered on them. String all of this code together, and it is all that you need to have a basic working Reactor pattern for event based programming, with timer support. Here’s a trivial example that uses pipes, to illustrate how it works.
require './simplereactor'
chunk = "01234567890" * 30
reactor = SimpleReactor.new
reader, writer = IO.pipe
reactor.attach writer, :write do |write_io|
bytes_written = write_io.write chunk
puts "Sent #{bytes_written} bytes: #{chunk[0..(bytes_written - 1)]}"
chunk.slice!(0,bytes_written)
if chunk.empty?
reactor.detach write_io
write_io.close
end
end
reactor.attach reader, :read do |read_io|
if read_io.eof?
puts "finished; detaching and closing."
reactor.detach read_io
read_io.close
else
puts "received: #{read_io.read 200}"
end
end
reactor.add_timer(2) do
puts "Timer called; the code should exit after this."
end
reactor.tick until reactor.empty?
All that code does is open a pipe, which has a write end and a read end. The reactor attaches to one end as a writer, and the other end as a reader. The callbacks are used to send data from one end of the pipe to the other, where it is received. The reactor will run for two seconds, then after the timer runs, the reactor will be empty, and it will exit. It looks like this:
Sent 330 bytes: 012345678900123456789001234567890012345678900123456789001234567890012345678900123456789001234567890012345678900123456789001234567890012345678900123456789001234567890012345678900123456789001234567890012345678900123456789001234567890012345678900123456789001234567890012345678900123456789001234567890012345678900123456789001234567890
received: 01234567890012345678900123456789001234567890012345678900123456789001234567890012345678900123456789001234567890012345678900123456789001234567890012345678900123456789001234567890012345678900123456789001
received: 2345678900123456789001234567890012345678900123456789001234567890012345678900123456789001234567890012345678900123456789001234567890
finished; detaching and closing.
Timer called; the code should exit after this.
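The callbacks in that example use the blocking IO#read and IO#write. That works here because the data is small and the writer closes promptly, but a blocking read for 200 bytes can stall the whole reactor if fewer bytes are available and the other end stays open. As a hedged aside, and not part of the article's code, the sketch below shows how Ruby's IO#read_nonblock behaves in the three situations a read callback can meet. The exception: false option assumes Ruby 2.1 or later.

```ruby
reader, writer = IO.pipe

# Nothing written yet: a blocking read would hang here, but read_nonblock
# with exception: false reports that it would have to wait.
first = reader.read_nonblock(200, exception: false)

writer.write "hello"
# Fewer than 200 bytes are available; read_nonblock returns what is there.
second = reader.read_nonblock(200, exception: false)

writer.close
# The write end is closed and the pipe is drained: nil signals end of file.
third = reader.read_nonblock(200, exception: false)
reader.close

p first   # :wait_readable
p second  # "hello"
p third   # nil
```

A read callback built this way could treat :wait_readable as "try again on a later tick" and nil as the cue to detach and close.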
Here’s another example. This one takes user input through the reactor via STDIN, and at the same time runs the stupidest, simplest web server possible. That web server will return a response that includes whatever the user input. The example code is also written to leverage the #run method defined in the library instead of cranking it ourselves.
require './simplereactor'
require 'socket'
server = TCPServer.new("0.0.0.0", 9949)
buffer = ''
puts <<ETXT
Type some text and press Enter after each line. The reactor is attached to
STDIN and also port 9949, where it listens for any connection and responds with
a basic HTTP response containing whatever has been typed to that point. These
two dramatically different IO streams are being handled simultaneously. Type
Ctrl-D to exit, or wait one minute, and a timer will fire which causes the
reactor to stop and the program to exit.
ETXT
SimpleReactor.run do |reactor|
reactor.attach(server, :read) do |server|
conn = server.accept
conn.gets # Read the request line; the rest of the request is ignored in this example
conn.write "HTTP/1.1 200 OK\r\nContent-Length:#{buffer.length}\r\nContent-Type:text/plain\r\nConnection:close\r\n\r\n#{buffer}"
conn.close
end
characters_received = 0
reactor.attach(STDIN, :read) do |stdin|
characters_received += 1
data = stdin.getc # Pull a character at a time, just for illustration purposes
unless data
reactor.stop
else
buffer << data
end
end
reactor.add_timer(60) do
reactor.stop
end
end
When this is executed, it attaches to STDIN, allowing one to provide input which is buffered internally. Any connection to port 9949 returns a simple HTTP response that contains the buffer that was created through STDIN. The process will run for 60 seconds, then the reactor will stop. Bearing in mind that this is a ridiculously trivial example, it does perform pretty well, too. Below is an excerpt from a test run, done using Ruby 1.9.3_preview1, on one of my older Linux machines.
Concurrency Level: 25
Time taken for tests: 1.60504 seconds
Complete requests: 15000
Failed requests: 0
Write errors: 0
Total transferred: 1275000 bytes
HTML transferred: 75000 bytes
Requests per second: 14144.22 [#/sec] (mean)
Time per request: 1.768 [ms] (mean)
Time per request: 0.071 [ms] (mean, across all concurrent requests)
Transfer rate: 1173.97 [Kbytes/sec] received
Of course, this is an absurdly trivial example. There may be bugs, and it doesn’t really do a lot for you, but it is an event reactor, written in pure ruby, and if you went through the code and examples as you read, you should have a better feel for what a reactor truly is.
If you want to write more sophisticated event based software, you could continue using a simple hand-rolled pure ruby reactor like this one, or you might choose to use one of the other common libraries for Ruby today. There are several of them, each with their own strengths and weaknesses, though the most common is EventMachine. Just like our simple reactor, EventMachine offers timers and asynchronous handling of events, though EventMachine’s versions will scale better. This article’s version uses the select() call, which typically limits code using it to 1024 open file descriptors. On the other hand, EventMachine, if used on a platform that supports epoll (Linux) or kqueue (various *BSD platforms), can readily support at least tens of thousands. EventMachine also offers a richer set of features for implementing event based code than this article’s example reactor. As a parting example, here is the HTTP server example from above, written to use EventMachine.
require 'rubygems'
require 'eventmachine'
module ServerHandler
def initialize(buffer)
@buffer = buffer
super
end
def receive_data data
send_data "HTTP/1.1 200 OK\r\nContent-Length:#{@buffer.length}\r\nContent-Type:text/plain\r\nConnection:close\r\n\r\n#{@buffer}"
close_connection_after_writing
end
end
module KeyHandler
def initialize buffer
@counter = 0
@buffer = buffer
super
end
def receive_data data
@counter += 1
if data.chomp.empty?
EM.stop
else
@buffer << data
end
end
end
puts <<ETXT
Type some text and press Enter after each line. The reactor is attached to
STDIN and also port 9949, where it listens for any connection and responds with
a basic HTTP response containing whatever has been typed to that point. These
two dramatically different IO streams are being handled simultaneously. Type
a blank line to exit, or wait one minute, and a timer will fire which causes the
reactor to stop and the program to exit.
ETXT
buffer = ''
EventMachine.run do
EM.start_server('0.0.0.0',9949,ServerHandler,buffer)
EM.attach(STDIN, KeyHandler, buffer)
EM.add_timer(60) do
EM.stop
end
end
And since I demonstrated the performance of the pure Ruby version, here’s the performance of the EventMachine version (using EventMachine 0.12.10), running on the same system, using the same Ruby 1.9.3_preview1 installation.
Concurrency Level: 25
Time taken for tests: 0.696320 seconds
Complete requests: 15000
Failed requests: 0
Write errors: 0
Total transferred: 1275425 bytes
HTML transferred: 75025 bytes
Requests per second: 21541.82 [#/sec] (mean)
Time per request: 1.161 [ms] (mean)
Time per request: 0.046 [ms] (mean, across all concurrent requests)
Transfer rate: 1787.97 [Kbytes/sec] received
The code is structured differently, but as you can see, it works similarly. In our simple reactor example, if you changed data = stdin.getc to data = stdin.gets, the STDIN handling would behave similarly to the EM example’s STDIN handling. However, given the experience of writing a pure ruby reactor, even if you have never used EventMachine before, I think you can now look at that piece of EventMachine code and generally understand how it works, both at the level of the ruby code itself, and with a good idea of what EventMachine is handling for you. This basic understanding of how event driven software actually works is key to writing software that uses the event paradigm effectively.
I have focused on EventMachine in this article because it is the most commonly used event reactor implementation in the Ruby world today. As I mentioned at the beginning of the article, however, there are other choices, such as Coolio. In future articles I will continue to focus on EventMachine, but I will try to include some examples from other frameworks, and also some examples focused on JRuby and Rubinius. Please let us know if there are particular topics that you would like to see discussed.