Clojure Elasticsearch Spandex (part 1)

@MikeAnanev

In this article I will describe how to get started with Clojure and Elasticsearch 5.2.2 (dockerized version). 

Before we start

I use a MacBook and Docker to run Elasticsearch 5. On a Mac, Elasticsearch 5 does not start in Docker out of the box, because a kernel setting inside the Docker machine is too low for it. The symptom is an error message like this: ...virtual memory areas vm.max_map_count [65530] is too low...

To avoid this issue, open the Docker Terminal from Docker Toolbox for Mac and run the following commands:

docker-machine ssh
sudo sysctl -w vm.max_map_count=262144
exit

First contact

Clojure has several libraries for working with Elasticsearch. I picked Spandex (https://github.com/mpenet/spandex) because it has the newest client for Elasticsearch 5.2. Elastisch (another Clojure library) supports only Elasticsearch 2.x or older.

First of all, let's add the necessary dependency to project.clj:

[cc.qbits/spandex "0.3.5"]

Then, we need to require necessary namespaces:

(:require [qbits.spandex :as s]
          [qbits.spandex.utils :as s-utils]
          [clojure.core.async :as async])

To make a connection, we need to specify the Elasticsearch hosts and the auth credentials, because the Elasticsearch 5 Docker image has basic auth enabled by default.

(def c (s/client {:hosts ["http://192.168.99.100:9200"]
                  :http-client {:basic-auth {:user "elastic" :password "Secret13"}}}))

Let's make the first query:

(s/request c {:url "/_cluster/health" :method :get})

=>
#qbits.spandex.Response{:body {:active_shards 5,
                :task_max_waiting_in_queue_millis 0,
        ...<skipped>...
                :active_primary_shards 5},
            :status 200,
...}

Let's make another one:

(s/request c {:url    (s-utils/url [:_search])
              :method :get
              :body   {:query {:match_all {}}}})

=>
#qbits.spandex.Response{:body {:took 32,
                :timed_out false,
                :_shards {:total 5, :successful 5, :failed 0},
                :hits {:total 22112,
                   :max_score 1.0,
...

These requests were made with the blocking API. But what if we need asynchronous interaction?

Async queries

Spandex provides nice async features: callbacks or core.async channels. Let's make a query with callback functions:

(s/request-async c {:url     (s-utils/url [:_search])
                    :method  :get
                    :body    {:query {:match {:message "this is a test"}}}
                    :success (fn [response-as-clj] (println "success:" response-as-clj))
                    :error   (fn [ex] (println "got error:" ex))})


=> nil
success: #qbits.spandex.Response{:body {:took 8, :timed_out false, :_shards {:total 5, :successful 5, :failed 0}, :hits {:total 0, :max_score nil, :hits []}}, :status 200,

A more interesting feature is the core.async support. Let's make the same query using a core.async channel:

(async/<!! (s/request-chan c {:url    (s-utils/url [:_search])
                              :method :get
                              :body   {:query {:match {:message "this is a test"}}}}))

=>
#qbits.spandex.Response{:body {:took 5,
                :timed_out false,
                :_shards {:total 5, :successful 5, :failed 0},
                :hits {:total 0, :max_score nil, :hits []}},
            :status 200,
...}
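
async/<!! blocks the calling thread until a value arrives, so it can be useful to put an upper bound on the wait. The sketch below shows one way to do that with async/alts!! and async/timeout. It is only an illustration: take-with-timeout is a hypothetical helper name, and a plain promise-chan stands in for the channel returned by s/request-chan so that the snippet runs without a cluster.

```clojure
(require '[clojure.core.async :as async])

(defn take-with-timeout
  "Blocks until `ch` yields a value or `ms` milliseconds pass.
  Returns the value, or :timeout if the deadline came first."
  [ch ms]
  (let [[value port] (async/alts!! [ch (async/timeout ms)])]
    (if (= port ch)
      value
      :timeout)))

;; Stand-in for (s/request-chan c {...}): a promise-chan that already
;; holds a canned response map.
(def answered (doto (async/promise-chan)
                (async/put! {:status 200 :body {:hits {:total 0}}})))

;; Stand-in for a request that never completes.
(def unanswered (async/promise-chan))

(take-with-timeout answered 1000)  ; returns the canned response map
(take-with-timeout unanswered 50)  ; returns :timeout
```

With a real client the first argument would simply be the result of s/request-chan.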

Insert data

Let's create a helper function that takes a request map and sends it to Elasticsearch using an existing connection:

(defn es-req
  "Sends a request map to Elasticsearch and blocks until the result arrives."
  [conn request]
  (async/<!! (s/request-chan conn request)))
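
One thing to keep in mind with a channel-based helper: as far as I understand the Spandex channel API, a failed request is delivered to the channel as an exception value rather than thrown, so the helper would return it like any other result. Under that assumption, a small wrapper can turn such values back into thrown exceptions. This is a sketch, not part of Spandex: throw-if-error is a hypothetical name, and the ex-info below is a hand-made stand-in, not the real shape of a Spandex error.

```clojure
(defn throw-if-error
  "Returns `result` unchanged, or throws it when it is an exception.
  Assumption: failed requests show up on the channel as Throwable values."
  [result]
  (if (instance? Throwable result)
    (throw result)
    result))

;; With the helper above this could be used as
;;   (throw-if-error (es-req c {:url "/megacorp"}))
;; Below, plain values stand in for real channel results.

(throw-if-error {:status 200 :body {:acknowledged true}})
;; returns the map unchanged

(try
  (throw-if-error (ex-info "hand-made error" {:status 404}))
  (catch clojure.lang.ExceptionInfo ex
    (:status (ex-data ex))))
;; returns 404
```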

Now we are ready to create a new index (database):

;; create an index (database) with default settings
(es-req c {:url    "/megacorp"
           :method :put
           :body   {:settings {:index {}}}})

=>
{:body {:acknowledged true, :shards_acknowledged true},
 :status 200,
...}

Now we can get info about the new index:

;; show info about the new index
(es-req c {:url "/megacorp"})

=>
{:body
 {:megacorp
 {:aliases {},
  :mappings {},
  :settings
  {:index
  {:creation_date "1489479710312",
   :number_of_shards "5",
   :number_of_replicas "1",
   :uuid "629PwbW6QpqKxk44Wic7wQ",
   :version {:created "5020299"},
   :provided_name "megacorp"}}}},
 :status 200,
 :headers
...}
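
Note that in the response above the numeric settings (:number_of_shards, :number_of_replicas) come back as strings. If they are needed as numbers, they can be converted after the fact. The following is a small sketch over a literal map shaped like that response; index-setting-long is a hypothetical helper, not part of Spandex.

```clojure
(defn index-setting-long
  "Reads setting `k` of index `index` from an index-info response
  and parses its string value as a long."
  [response index k]
  (Long/parseLong (get-in response [:body index :settings :index k])))

;; A trimmed copy of the response shown above.
(def index-info
  {:status 200
   :body {:megacorp {:aliases {}
                     :mappings {}
                     :settings {:index {:number_of_shards "5"
                                        :number_of_replicas "1"}}}}})

(index-setting-long index-info :megacorp :number_of_shards)   ; returns 5
(index-setting-long index-info :megacorp :number_of_replicas) ; returns 1
```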

But what if we want specific settings for the index? No problem, let's delete the current index and create another one:

;; delete the index
(es-req c {:url    "/megacorp"
           :method :delete
           :body   {}})
=>
{:body {:acknowledged true},
 :status 200,
...}

And now we create a new index with specific settings:

;; note: the :index section is not necessary
(es-req c {:url    "/megacorp"
           :method :put
           :body   {:settings {:number_of_shards 1
                               :number_of_replicas 1}}})
=>
{:body {:acknowledged true, :shards_acknowledged true},
 :status 200,
...}

Now it is time to create a new mapping (table). Let's create a user mapping. Note that the name of the new mapping is passed as a keyword in the URL vector.

;; create a new mapping (table)
(es-req c {:url    (s-utils/url [:megacorp :_mapping :user])
           :method :put
           :body   {:properties {:first {:type :text}
                                 :last {:type :text}
                                 :age {:type :long}
                                 :birth_date {:type :date}}}})

=>
{:body {:acknowledged true},
 :status 200,
 :headers
...}
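
Because the request body is an ordinary Clojure map, it can also be generated instead of written by hand. As a sketch, the hypothetical helper below (mapping-body is my name, not a Spandex function) builds the :properties map used above from a compact field-to-type map:

```clojure
(defn mapping-body
  "Turns a map of field name -> type into a mapping request body."
  [field-types]
  {:properties (reduce-kv (fn [acc field field-type]
                            (assoc acc field {:type field-type}))
                          {}
                          field-types)})

(mapping-body {:first :text
               :last :text
               :age :long
               :birth_date :date})
;; returns
;; {:properties {:first {:type :text}
;;               :last {:type :text}
;;               :age {:type :long}
;;               :birth_date {:type :date}}}
```

The result can be passed as the :body of the mapping request shown earlier.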

Now we can see the updated index info:

;; show info about the updated index
(es-req c {:url "/megacorp"})
=>
{:body
 {:megacorp
 {:aliases {},
  :mappings
  {:user
  {:properties
   {:age {:type "long"},
   :birth_date {:type "date"},
   :first {:type "text"},
   :last {:type "text"}}}},
  :settings
  {:index
  {:creation_date "1489480153988",
   :number_of_shards "1",
   :number_of_replicas "1",
   :uuid "fLK37lJBTjCeenTv5NnOoQ",
   :version {:created "5020299"},
   :provided_name "megacorp"}}}},
 :status 200,
 ...

Let's insert data for a user with the explicit ID 1. In this case we use the PUT method.

(es-req c {:url    (s-utils/url [:megacorp :user 1])
           :method :put
           :body   {:first "Mike"
                    :last "Clojurian"
                    :age 29
                    :birth_date "2001-11-12"}})
=>
{:body
 {:_index "megacorp",
 :_type "user",
 :_id "1",
 :_version 1,
 :result "created",
 :_shards {:total 2, :successful 1, :failed 0},
 :created true},
...

Now we insert data with automatic ID generation. Note that this time we use the POST method.

(es-req c {:url    (s-utils/url [:megacorp :user])
           :method :post
           :body   {:first "Bob"
                    :last "Clojurian"
                    :age 27
                    :birth_date "1992-07-25"}})
=>
{:body
 {:_index "megacorp",
 :_type "user",
 :_id "AVrMgARtSZrfBRKJWUrZ",
 :_version 1,
 :result "created",
 :_shards {:total 2, :successful 1, :failed 0},
 :created true},
 :status 201,
...
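
The PUT/POST distinction from the two requests above can be captured in one small function. This is a sketch under my own assumptions: index-doc-request is a hypothetical helper, and it builds the URL with plain string joining (no URL encoding) so that the snippet runs without Spandex on the classpath.

```clojure
(require '[clojure.string :as str])

(defn index-doc-request
  "Builds a request map for indexing `doc`.
  With an id the document is PUT at that id; without one it is
  POSTed so that Elasticsearch generates the id."
  ([index doc-type doc]
   (index-doc-request index doc-type nil doc))
  ([index doc-type id doc]
   {:url    (str "/" (str/join "/" (remove nil? [index doc-type id])))
    :method (if (some? id) :put :post)
    :body   doc}))

(index-doc-request "megacorp" "user" 1 {:first "Mike"})
;; returns {:url "/megacorp/user/1", :method :put, :body {:first "Mike"}}

(index-doc-request "megacorp" "user" {:first "Bob"})
;; returns {:url "/megacorp/user", :method :post, :body {:first "Bob"}}
```

The resulting map has the same shape as the ones passed to es-req above.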

And finally, let's find a user:

(es-req c {:url    (s-utils/url [:megacorp :_search])
           :method :get
           :body   {:query {:match {:first "Mike"}}}})
=>
{:body
 {:took 6,
 :timed_out false,
 :_shards {:total 1, :successful 1, :failed 0},
 :hits
 {:total 1,
  :max_score 0.6931472,
  :hits
  [{:_index "megacorp",
   :_type "user",
   :_id "1",
   :_score 0.6931472,
   :_source
   {:first "Mike", :last "Clojurian", :age 29, :birth_date "2001-11-12"}}]}},
 :status 200
...
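
Since the response is plain Clojure data, pulling the documents out of it needs nothing beyond core functions. A minimal sketch, using a literal map shaped like the response above (hit-sources is a hypothetical helper name):

```clojure
(defn hit-sources
  "Returns the :_source map of every hit in a search response."
  [response]
  (mapv :_source (get-in response [:body :hits :hits])))

;; A trimmed copy of the search response shown above.
(def search-response
  {:status 200
   :body {:took 6
          :timed_out false
          :hits {:total 1
                 :max_score 0.6931472
                 :hits [{:_index "megacorp"
                         :_type "user"
                         :_id "1"
                         :_score 0.6931472
                         :_source {:first "Mike" :last "Clojurian"
                                   :age 29 :birth_date "2001-11-12"}}]}}})

(hit-sources search-response)
;; returns [{:first "Mike", :last "Clojurian", :age 29, :birth_date "2001-11-12"}]
```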

Conclusion

Well, Spandex is really a thin wrapper over the standard Elasticsearch DSL. Requests and responses are ordinary Clojure maps, which, together with the async features, makes Spandex a very handy tool for Clojure developers.