Stream processing in Ruby

Reactive applications seem to have been the new hotness this year. Whether it’s at a conference about Big Data, DevOps or IoT, the same use case keeps coming up: the need to process real-time streams of data. There’s even a reactive programming manifesto for those really feeling the need to get polemical about it.

A couple of years back I did a piece of consultancy work where I needed to process Twitter data and catch a load of webhooks to provide real-time reporting on a big social media campaign. I was taking the opportunity to have a play with RabbitMQ and needed an easy way to manage a swarm of small, separate agents doing simple processing operations on queues… Qswarm, my own little reactive framework, was born.

Roll forward to 2013 and I’m back to messing about with queues again, and I’ve decided that my initial implementation of the Qswarm DSL needs an overhaul. So after a considerable syntax rewrite, and a decent period of testing with real data, I’m releasing the new version of Qswarm and bumping it to version 1.0.

Enough back story, let’s dive into some examples. Qswarm initially ships with a small number of clients which allow you to source and sink data from AMQP, Twitter, and XMPP. Here’s a simple chatbot programme to source data from an AMQP queue and dump it to an XMPP server such as Hipchat.

agent :mybot do
  connect :hipchat,
          :type            => :xmpp,
          :jid             => '54321_123456@chat.hipchat.com',
          :real_name       => 'MyBot',
          :channel         => ['54321_lounge@conf.hipchat.com', '54321_chat@conf.hipchat.com'],
          :password        => 'foobar'

  connect :messages,
          :type            => :amqp,
          :uri             => 'guest:guest@localhost:5672/',
          :exchange_type   => :topic,
          :exchange_name   => 'messages',
          :bind            => '#',
          :format          => :raw

  source  :messages do
    case payload.routing_key
    when /^wall\./
      sink :hipchat
    when /^channel\.(.*)/
      sink  :hipchat,
            :channel       => "54321_#{$1}@conf.hipchat.com"
    end
  end
end

The first connect sets up the XMPP connection, in this case to Hipchat, passing an initial set of groupchat rooms to join. The second connect sets up the AMQP connection, subscribing with a # wildcard to the messages topic exchange and specifying that the messages will be raw (i.e. plain text rather than JSON or XML).

The final section is where the flow logic is set up. We source messages from the :messages connection and then, depending on the routing key, sink them either to all the groupchat rooms if it’s a wall, or to a specific room named in the routing key. Save the above file as bot.swarm and let’s give it a test with the RabbitMQ command line:

qswarm bot.swarm &
rabbitmqadmin publish exchange=messages routing_key=channel.lounge payload="Test message"
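
To make the routing rule concrete, here is a plain-Ruby sketch of the decision the source block in bot.swarm makes. It does not use Qswarm, AMQP or XMPP at all; the names DEFAULT_ROOMS and rooms_for are made up for this illustration, and returning an empty list for unmatched keys is an assumption based on the case statement having no else branch.

```ruby
# Illustration only: mirrors the case statement in bot.swarm in plain Ruby.
# DEFAULT_ROOMS and rooms_for are hypothetical names, not part of Qswarm.
DEFAULT_ROOMS = [
  '54321_lounge@conf.hipchat.com',
  '54321_chat@conf.hipchat.com'
].freeze

# Returns the list of rooms a message with the given routing key would go to.
def rooms_for(routing_key)
  case routing_key
  when /^wall\./
    # A "wall" goes to every room the bot joined at connect time.
    DEFAULT_ROOMS
  when /^channel\.(.*)/
    # The part after "channel." names a single room.
    ["54321_#{$1}@conf.hipchat.com"]
  else
    # Assumption: keys matching neither pattern are not delivered anywhere.
    []
  end
end
```

So the test message published above, with routing key channel.lounge, would end up in the 54321_lounge room only, while anything starting with wall. would be sent to both rooms.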

Now that we have our chatbot running, let’s see if we can feed it a tweetstream. For this you’ll need to get OAuth connection details from dev.twitter.com.

agent :twitter do
  connect :amqp,
          :type            => :amqp,
          :uri             => 'guest:guest@localhost:5672/',
          :exchange_type   => :headers,
          :exchange_name   => 'twitter',
          :format          => :json

  connect :twitter,
          :type            => :twitter,
          :consumer_key    => 'YOURKEYHERE',
          :consumer_secret => 'YOURSECRETHERE',
          :oauth_token     => 'YOURTOKENHERE',
          :oauth_token_secret => 'YOURSECRETHERE',
          :track           => {
            :colours         => ['red', 'green', 'blue'],
            :feelings        => ['happy', 'sad'],
            :tech            => ['ruby', 'python'],
          },
          :follow          => {
            :tech            => [11987892]
          },
          :list            => {
            :flibbertigibbets     => { 'Scobleizer' => 'most-influential-in-tech' }
          }

  source :twitter do
    case payload.headers[:type]
    when :follow
      sink :amqp,
           :routing_key    => "follow.#{payload.data[:user][:id]}"

    when :track
      sink :amqp,
           :routing_key  => "track.#{agent.name}"

    when :list
      sink :amqp,
           :routing_key  => "list.#{payload.headers[:user_id]}.#{payload.headers[:slug]}"
    end
  end
end

The Twitter source allows you to track keywords used in tweets, follow particular users, and track tweets from people in lists. The flow logic in this case takes each of these Twitter events and pumps it into an exchange with the appropriate routing key. We’re using a headers exchange here because the Twitter client adds useful headers to the payload (the same ones used in the case statement):

  • :type => :track, :group => colours feelings tech, :matches => [red green blue happy sad ruby python]
  • :type => :follow, :group => tech, :user_id => 11987892
  • :type => :list, :group => flibbertigibbets, :user_id => Scobleizer, :slug => most-influential-in-tech
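
As a rough sketch of how those headers turn into routing keys, the case statement from the :twitter agent can be written as an ordinary Ruby method. The method name routing_key_for and its arguments are hypothetical, chosen only for this example; the header and tweet hashes are hand-written stand-ins for what the Twitter client would supply.

```ruby
# Illustration only: the routing-key logic from the :twitter agent as a
# plain Ruby method. routing_key_for is a hypothetical name, not Qswarm API.
def routing_key_for(headers, tweet, agent_name)
  case headers[:type]
  when :follow
    # Followed users are keyed by the numeric id of the tweet's author.
    "follow.#{tweet[:user][:id]}"
  when :track
    # Tracked keywords are keyed by the name of the agent.
    "track.#{agent_name}"
  when :list
    # List tweets are keyed by the list owner and the list slug.
    "list.#{headers[:user_id]}.#{headers[:slug]}"
  end
end

# Hand-written sample headers matching the third bullet above.
list_headers = { :type => :list, :group => 'flibbertigibbets',
                 :user_id => 'Scobleizer', :slug => 'most-influential-in-tech' }
list_key = routing_key_for(list_headers, {}, 'twitter')
```

With the sample headers above, list_key comes out as list.Scobleizer.most-influential-in-tech.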

Now let’s wire this up to our bot with a final agent that routes between the twitter exchange and the messages exchange.

agent :chirp do
  connect :twitter,
          :type            => :amqp,
          :uri             => 'guest:guest@localhost:5672/',
          :exchange_type   => :headers,
          :exchange_name   => 'twitter',
          :bind            => '#',
          :format          => :json

  connect :messages,
          :type            => :amqp,
          :uri             => 'guest:guest@localhost:5672/',
          :exchange_type   => :topic,
          :exchange_name   => 'messages',
          :format          => :raw

  before :twitter do
    @pp = "<#{payload.data[:user][:name]}/#{payload.data[:user][:screen_name]}> #{payload.data[:text]}"
    payload.data[:entities][:user_mentions].each do |u|
      @pp.gsub!(/@#{u[:screen_name]}/,"<#{u[:name]}/#{u[:screen_name]}>")
    end
  end

  source  :twitter, :type => 'list' do
    if payload.data[:text].match(/awesome/)
      sink  :messages,
            :routing_key     => "wall.#{agent.name}",
            :data            => @pp
    end
  end

  source  :twitter, :group => 'tech' do
    sink  :messages,
          :routing_key     => 'channel.tech',
          :data            => @pp
  end

  source  :twitter, :type => 'track', :group => %w( colours feelings ) do
    sink  :messages,
          :routing_key     => 'channel.chat',
          :data            => @pp
  end

  source  :twitter, :type => 'list', :slug => 'most-influential-in-tech' do
    sink  :messages,
          :routing_key      => 'channel.lists',
          :data             => @pp
  end
end

We’re doing a couple of new things here. Firstly, the before statement allows us to register a filter, which we’re using to format the message. The source statements also have guards on them, which allow us to do conditional execution based on the headers. Finally, in the first source statement you can see we’re using data from the message payload, in this case the tweet, which has been turned into a Ruby Hash because we specified JSON format in the connection.
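
If you want to see what the filter and the guards are doing without running a broker, the following plain-Ruby sketch may help. It is a simplified illustration, not Qswarm’s actual implementation: format_tweet and guard_matches? are hypothetical names, the tweet is a hand-written Hash, and the guard semantics are assumptions (every guard key must match, an array of values means "any of these", and a header holding several space-separated values matches if any one of them does).

```ruby
# Illustration only: plain-Ruby versions of the before filter and the
# source guards. Both method names are hypothetical, not Qswarm API.

# Builds "<Name/screen_name> text" and rewrites each @mention the same way.
def format_tweet(tweet)
  text = "<#{tweet[:user][:name]}/#{tweet[:user][:screen_name]}> #{tweet[:text]}"
  tweet[:entities][:user_mentions].each do |u|
    # Block form of gsub avoids backslash surprises in the replacement.
    text = text.gsub(/@#{Regexp.escape(u[:screen_name])}/) do
      "<#{u[:name]}/#{u[:screen_name]}>"
    end
  end
  text
end

# Assumed semantics: all guard keys must match; an array value in the guard
# is satisfied by any one of its elements appearing in the header.
def guard_matches?(headers, guard)
  guard.all? do |key, expected|
    actual = Array(headers[key]).flat_map { |v| v.to_s.split }
    Array(expected).any? { |value| actual.include?(value.to_s) }
  end
end

# Hand-written sample tweet, shaped like the fields the agent reads.
sample_tweet = {
  :user     => { :name => 'Alice', :screen_name => 'alice' },
  :text     => 'Ruby is awesome, right @bob?',
  :entities => { :user_mentions => [{ :name => 'Bob', :screen_name => 'bob' }] }
}
formatted = format_tweet(sample_tweet)
```

Here formatted is the string "<Alice/alice> Ruby is awesome, right <Bob/bob>?". Under the assumed semantics, guard_matches?({ :type => 'track', :group => 'colours' }, { :type => 'track', :group => %w( colours feelings ) }) is true, while the same guard against a header group of 'tech' is false.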

Putting all the code together and running it with Qswarm should now give you a selection of tweets filtered to different channels. Of course, you could equally write a simpler script which takes a stream from Twitter and sends it straight to Hipchat without the intermediate exchanges, but the intention was to show what can be done in a more complicated, decoupled setup using all the clients.