Tee With Sinatra

OK, so I wanted to take all of the JSON data that we were stuffing into our Riak cluster and send a copy of it to our ElasticSearch cluster as well, so that we could, you know, actually find the data later. We could have done this by modifying one of the Riak client libraries, but then any data uploaded through a different client would be missed. So as an experiment, we turned to our new favorite tool, Sinatra, and hacked up a Rack proxy app that intercepts incoming HTTP requests, sends them on to Riak, and also sends a copy to the ElasticSearch cluster. We used Typhoeus as the HTTP client so that we could execute the two requests concurrently, in the interest of speed.
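
If you haven't used it, the heart of Typhoeus is the Hydra: you queue up requests and then run them all at once. Stripped down to just that pattern, it looks roughly like this (assuming Riak and ElasticSearch are sitting on their default local ports):

require 'typhoeus'

hydra = Typhoeus::Hydra.new

# Queue two independent requests; nothing is sent yet.
riak_ping = Typhoeus::Request.new("http://localhost:8098/ping")
es_ping   = Typhoeus::Request.new("http://localhost:9200/")
riak_ping.on_complete { |response| puts "Riak: #{response.code}" }
es_ping.on_complete   { |response| puts "ES:   #{response.code}" }
hydra.queue riak_ping
hydra.queue es_ping

# Fires both requests concurrently and blocks until both complete.
hydra.run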

Here is the proof-of-concept:

require 'rubygems'
require 'sinatra'
require 'typhoeus'

OPTIONS = {}
OPTIONS[:riak_host] = "localhost"
OPTIONS[:riak_port] = "8098"
OPTIONS[:es_host] = "localhost"
OPTIONS[:es_port] = "9200"
OPTIONS[:riak_timeout] = 5000 # milliseconds
OPTIONS[:es_timeout] = 5000 # milliseconds

class Rack::Proxy

  def initialize(app)
    @app = app
    @hydra = Typhoeus::Hydra.new
  end

  def call(env)
    req = Rack::Request.new(env)
    # We need to use it twice, so read in the stream. This is an obvious problem with large bodies, so beware.
    req_body = req.body.read if req.body

    riak_url = "http://#{OPTIONS[:riak_host]}:#{OPTIONS[:riak_port]}#{req.fullpath}"

    opts = {:timeout => OPTIONS[:riak_timeout]}
    opts.merge!(:method => req.request_method.downcase.to_sym)
    opts.merge!(:headers => {"Content-type" => req.content_type}) if req.content_type
    opts.merge!(:body => req_body) if req_body && req_body.length > 0

    riak_req = Typhoeus::Request.new(riak_url, opts)
    riak_response = {}
    riak_req.on_complete do |response|
      riak_response[:code] = response.code
      riak_response[:headers] = response.headers_hash
      riak_response[:body] = response.body
    end
    @hydra.queue riak_req

    # If we are putting or posting JSON, send a copy to the ElasticSearch index named "riak"
    if (req.put? || req.post?) && req.content_type == "application/json"
      req.path =~ %r{^/riak/([^/]+)/([^/]+)}
      bucket, key = $1, $2
      es_url = "http://#{OPTIONS[:es_host]}:#{OPTIONS[:es_port]}/riak/#{bucket}/#{key}"
      opts = {:timeout => OPTIONS[:es_timeout]}
      opts.merge!(:method => req.request_method.downcase.to_sym)
      opts.merge!(:body => req_body) if req_body  && req_body.length > 0
      es_req = Typhoeus::Request.new(es_url, opts)
      es_response = {}
      es_req.on_complete do |response|
        es_response[:code] = response.code
        es_response[:headers] = response.headers_hash
        es_response[:body] = response.body
      end
      @hydra.queue es_req
    end

    # Concurrently executes both HTTP requests, blocks until they both finish
    @hydra.run

    # If we wrote to ES, add a custom header carrying the ES response code
    riak_response[:headers].merge!("X-ElasticSearch-ResCode" => es_response[:code].to_s) if es_response && es_response[:code]

    # Typhoeus can add nil headers; get rid of them
    riak_response[:headers].delete_if {|k,v| v == nil}

    # Return the original Riak response to the client (body wrapped in an array, since Rack expects it to respond to #each)
    [riak_response[:code], riak_response[:headers], [riak_response[:body]]]
  end
end

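# Mount the proxy in Sinatra's middleware stack; since #call never hands off to @app, the proxy handles every request itself.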
use Rack::Proxy

(Gist here)

Execute the script and it will listen on port 4567; point your Riak client of choice there and start PUTing data, which will be seamlessly replicated into the ElasticSearch cluster. If we were really going to use this in anger, there would be a lot of work yet to be done, but as a skeleton of how to use Sinatra (Rack, really) to quickly whip up custom proxies and tee HTTP requests, I thought it might be useful.
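
For a quick smoke test you don't even need a Riak client; plain Net::HTTP from the standard library will do (the bucket and key below are made up):

require 'net/http'
require 'json'

# PUT a JSON document through the proxy on port 4567;
# the bucket ("users") and key ("alice") are just examples.
uri = URI.parse("http://localhost:4567/riak/users/alice")
put = Net::HTTP::Put.new(uri.request_uri)
put["Content-Type"] = "application/json"
put.body = {:name => "Alice", :role => "admin"}.to_json

response = Net::HTTP.start(uri.host, uri.port) { |http| http.request(put) }
puts response.code                         # Riak's response code, passed straight through
puts response["X-ElasticSearch-ResCode"]   # set by the proxy when the ES write happened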