OK, so I wanted to take all of the JSON data we were stuffing into our Riak cluster and send a copy of it to our ElasticSearch cluster as well, so that we could, you know, actually find the data later. We could have done this by modifying one of the Riak client libraries, but then any data uploaded through a different client would be missed. So as an experiment, we turned to our new favorite tool, Sinatra, and hacked up a Rack proxy app that intercepts incoming HTTP requests, forwards them to Riak, and also sends a copy to the ElasticSearch cluster. We used Typhoeus as the HTTP client so that, in the interest of speed, we could execute the two requests concurrently.
```ruby
require 'rubygems'
require 'sinatra'
require 'typhoeus'

OPTIONS = {}
OPTIONS[:riak_host] = "localhost"
OPTIONS[:riak_port] = "8098"
OPTIONS[:es_host]   = "localhost"
OPTIONS[:es_port]   = "9200"
OPTIONS[:riak_timeout] = 5000 # milliseconds
OPTIONS[:es_timeout]   = 5000 # milliseconds

class Rack::Proxy
  def initialize(app)
    @app   = app
    @hydra = Typhoeus::Hydra.new
  end

  def call(env)
    req = Rack::Request.new(env)

    # We need to use it twice, so read in the stream.
    # This is an obvious problem with large bodies, so beware.
    req_body = req.body.read if req.body

    riak_url = "http://#{OPTIONS[:riak_host]}:#{OPTIONS[:riak_port]}#{req.fullpath}"
    opts = { :timeout => OPTIONS[:riak_timeout] }
    opts.merge!(:method => req.request_method.downcase.to_sym)
    opts.merge!(:headers => { "Content-Type" => req.content_type }) if req.content_type
    opts.merge!(:body => req_body) if req_body && req_body.length > 0

    riak_req = Typhoeus::Request.new(riak_url, opts)
    riak_response = {}
    riak_req.on_complete do |response|
      riak_response[:code]    = response.code
      riak_response[:headers] = response.headers_hash
      riak_response[:body]    = response.body
    end
    @hydra.queue riak_req

    # If we are putting or posting JSON, send a copy to the ElasticSearch index named "riak"
    if (req.put? || req.post?) && req.content_type == "application/json"
      req.path =~ %r{^/riak/([^/]+)/([^/]+)}
      bucket, key = $1, $2
      es_url = "http://#{OPTIONS[:es_host]}:#{OPTIONS[:es_port]}/riak/#{bucket}/#{key}"
      opts = { :timeout => OPTIONS[:es_timeout] }
      opts.merge!(:method => req.request_method.downcase.to_sym)
      opts.merge!(:body => req_body) if req_body && req_body.length > 0

      es_req = Typhoeus::Request.new(es_url, opts)
      es_response = {}
      es_req.on_complete do |response|
        es_response[:code]    = response.code
        es_response[:headers] = response.headers_hash
        es_response[:body]    = response.body
      end
      @hydra.queue es_req
    end

    # Concurrently executes both HTTP requests, blocks until they both finish
    @hydra.run

    # If we wrote to ES, add a custom header
    riak_response[:headers].merge!("X-ElasticSearch-ResCode" => es_response[:code].to_s) if es_response && es_response[:code]

    # Typhoeus can add nil headers, let's get rid of them
    riak_response[:headers].delete_if { |k, v| v == nil }

    # Return original Riak response to the client
    # (body wrapped in an array so it responds to #each, as Rack expects)
    [riak_response[:code], riak_response[:headers], [riak_response[:body]]]
  end
end

use Rack::Proxy
```
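The one non-obvious step in the middle is how the ElasticSearch URL is derived: the proxy reuses the path segments of Riak's HTTP API (`/riak/<bucket>/<key>`) as the document type and id under a fixed index named "riak". A standalone sketch of that translation, with the helper name `es_url_for` and the example paths being my own inventions (the host/port defaults match the `OPTIONS` above):

```ruby
# Map a Riak object path to the ElasticSearch URL the proxy writes to.
# Returns nil for paths that are not Riak object URLs, mirroring the
# put?/post? + content-type guard in the proxy.
def es_url_for(riak_path, es_host = "localhost", es_port = "9200")
  return nil unless riak_path =~ %r{^/riak/([^/]+)/([^/]+)}
  bucket, key = $1, $2
  "http://#{es_host}:#{es_port}/riak/#{bucket}/#{key}"
end

es_url_for("/riak/users/u123")  # => "http://localhost:9200/riak/users/u123"
es_url_for("/stats/ping")       # => nil (non-Riak paths are not indexed)
```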
Execute the script and it will listen on port 4567, so point your Riak client of choice there and start PUTing data, which will be seamlessly replicated into the ElasticSearch cluster. If we were really going to use this in anger, there would be a lot of work yet to be done, but as a skeleton of how to use Sinatra (Rack, really) to quickly whip up custom proxies and tee HTTP requests, I thought it might be useful.
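For the sake of a concrete client, here is roughly what "pointing a client at the proxy" looks like with plain Net::HTTP; the bucket "stats", key "day1", and document contents are made-up names for illustration. Any request with a `Content-Type` of `application/json` on a PUT or POST will get the ElasticSearch copy:

```ruby
require 'net/http'
require 'json'

# Build a PUT to the proxy (hypothetical bucket "stats", key "day1").
uri = URI.parse("http://localhost:4567/riak/stats/day1")
req = Net::HTTP::Put.new(uri.path)
req["Content-Type"] = "application/json"
req.body = JSON.generate("visits" => 42)

# With the proxy running, send it and check the custom header:
#   res = Net::HTTP.start(uri.host, uri.port) { |http| http.request(req) }
#   res["X-ElasticSearch-ResCode"]  # the ElasticSearch status code, e.g. "201"
```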