Skip to content
liebke edited this page Nov 18, 2011 · 2 revisions

zookeeper-clj

Zookeeper-clj is a Clojure DSL for Apache ZooKeeper, which "is a centralized service for maintaining configuration information, naming, providing distributed synchronization, and providing group services."

Out of the box ZooKeeper provides name service, configuration, and group membership. From these core services, higher-level distributed concurrency abstractions can be built, including distributed locks, distributed queues, barriers, leader-election, and transaction services as described in ZooKeeper Recipes and Solutions and the paper "ZooKeeper: Wait-free coordination for Internet-scale systems".

Building these distributed concurrency abstractions is the goal of the Java-based Menagerie library and the, soon to be released, Clojure-based Avout library. Avout, in particular, provides distributed versions of Clojure's Atom and Ref concurrency primitives, as well as distributed implementations of java.util.concurrent.lock.Lock and java.util.concurrent.lock.ReadWriteLock.

Table of Contents

Getting Started

To run these examples, first start a local instance of ZooKeeper on port 2181, see instructions below, and include zookeeper-clj as a dependency by adding the following to your Leiningen project.clj file:

    [zookeeper-clj "0.9.0"]

connect function

First require the zookeeper namespace and create a client with the connect function.

    (require '[zookeeper :as zk])
    (def client (zk/connect "127.0.0.1:2181"))

The connection string is the name, or IP address, and port of the ZooKeeper server. Several host:port pairs can be included as a comma seperated list. The port can be left off if it is 2181.

The connection to the ZooKeeper server can be closed with the close function.

    (zk/close client)

watchers

A watcher function that takes a single event map argument can be passed to connect, which will be invoked as a result of changes of keeper-state, or as a result of other events.

    (def client (zk/connect "127.0.0.1" :watcher (fn [event] (println event))))

if the :watch? flag is set to true when using the exists, children, or data functions, the default watcher function will be triggered under the following circumstances.

  • exists: the watch will be triggered by a successful operation that creates/deletes the node or sets the data on the node.
  • children: the watch will be triggered by a successful operation that deletes the node of the given path or creates/deletes a child under the node.
  • data: the watch will be triggered by a successful operation that sets data on the node, or deletes the node.

The default watcher function can be overriden with a custom function by passing it as the :watcher argument to the exists, children, or data functions.

The argument to the watcher function is a map with three keys: :event-type, :keeper-state, and :path.

  • event-type: :NodeDeleted, :NodeDataChanged, :NodeCreated, :NodeChildrenChanged, :None
  • keeper-state: :AuthFailed, :Unknown, :SyncConnected, :Disconnected, :Expired, :NoSyncConnected
  • path: the path to the node in question, may be nil

NOTE: Watches are one time triggers; if you get a watch event and you want to get notified of future changes, you must set another watch.

create function

Next, create a node called "/parent-node"

    (zk/create client "/parent-node" :persistent? true)
    ;; => "/parent-node"

Setting the :persistent? flag to true creates a persistent node, meaning one that will persist even after the client that created it is no longer connected. By default, nodes are ephemeral (i.e. :persistent? false) and will be deleted if the client that created them is disconnected (this is key to how ZooKeeper is used to build robust distributed systems).

A node must be persistent if you want it to have child nodes.

asynchronous calls

Most of the zookeeper functions can be called asynchronously by setting the :async? option to true, or by providing an explicit callback function with the :callback option. When invoked asynchronously, each function will return a promise that will eventually contain the result of the call (a map with the following keys: :return-code, :path, :context, :name).

    (def result-promise (zk/create client "/parent-node" :persistent? true :async? true))

Dereferencing the promise will block until a result is returned.

    @result-promise

If a :callback function is passed, the promise will be returned with the result map and the callback will be invoked with the same map.

    (def result-promise (zk/create client "/parent-node" :persistent? true :callback (fn [result] (println result))))

exists function

We can check the existence of the newly created node with the exists function.

    (zk/exists client "/parent-node")

The exists function returns nil if the node does not exist, and returns a map with the following keys if it does: :numChildren, :ephemeralOwner, :cversion, :mzxid, :czxid, :dataLength, :ctime, :version, :aversion, :mtime, :pzxid. See the ZooKeeper documentation for description of each field.

The exists function accepts the :watch?, :watcher, :async?, and :callback options. The watch functions will be triggered by a successful operation that creates/deletes the node or sets the data on the node.

children function

Next, create a child node for "/parent-node"

    (zk/create client "/parent-node/child-node")
    ;; => "/parent-node/child-node"

Since the :persistent? flag wasn't set to true, this node will be ephemeral, meaning it will be deleted if the client that created it is disconnected.

A list of a node's children can be retrieved with the children function.

    (zk/children client "/parent-node")
    ;; => ("child-node")

If the node has no children, nil will be returned, and if the node doesn't exist, false will be returned.

The children function accepts the :watch?, :watcher, :async?, and :callback options. The watch function will be triggered by a successful operation that deletes the node of the given path or creates/delete a child under the node.

sequential nodes

If the :sequential? option is set to true when a node is created, a ten digit sequential ID is appended to the name of the node (it's idiomatic to include a dash as the last character of a sequential node's name).

    (zk/create-all client "/parent/child-" :sequential? true)
    ;; => "/parent/child-0000000000"

The create-all function creates the parent nodes if they don't already exist, here we used it to create the "/parent" node.

The sequence ID increases monotonically for a given parent directory.

    (zk/create client "/parent/child-" :sequential? true)
    ;; => "/parent/child-0000000001"

    (zk/create client "/parent/child-" :sequential? true)
    ;; => "/parent/child-0000000002"

The zookeeper.util namespace contains functions for extracting IDs from sequential nodes and sorting them.

    (require '[zookeeper.util :as util])
    (util/extract-id (first (zk/children client "/parent")))
    ;; => 2

The order of the child nodes return from children is arbitrary, but the nodes can be sorted with the sort-sequential-nodes function.

    (util/sort-sequential-nodes (zk/children client "/parent"))
    ;; => ("child-0000000000" "child-0000000001" "child-0000000002")

data functions

Each node has a data field that can hold a byte array, which is limited to 1M is size.

The set-data function is used to insert data. The set-data function takes a version number, which needs to match the current data version. The current version is a field in the map returned by the exists function.

    (def version (:version (zk/exists client "/parent")))

    (zk/set-data client "/parent" (.getBytes "hello world" "UTF-8") version)

The data function is used to retrieve the data stored in a node.

    (zk/data client "/parent")
    ;; => {:data ..., :stat {...}}

The data function returns a map with two fields, :data and :stat. The :stat value is the same map returned by the exists function. The :data value is a byte array.

    (String. (:data (zk/data client "/parent")) "UTF-8")
    ;; => "hello world"

The data function accepts the :watch?, :watcher, :async?, and :callback options. The watch function will be triggered by a successful operation that sets data on the node, or deletes the node.

data serialization

The zookeeper.data namespace contains functions for serializing different primitive types to and from byte arrays.

    (require '[zookeeper.data :as data])
    (def version (:version (zk/exists client "/parent")))
    (zk/set-data client "/parent" (data/to-bytes 1234) version)
    (data/to-long (:data (zk/data client "/parent")))
    ;; => 1234

The following types have been extended to support the to-bytes method: String, Integer, Double, Long, Float, Character. The following functions can be used to convert byte arrays back to their respective types: to-string, to-int, to-double, to-long, to-float, to-short, and to-char.

Clojure forms can be written to and read from the data field using pr-str and read-string, respectively.

    (zk/set-data client "/parent" (data/to-bytes (pr-str {:a 1, :b 2, :c 3})) 2)
    (read-string (data/to-string (:data (zk/data client "/parent"))))
    ;; => {:a 1, :b 2, :c 3}

delete functions

Nodes can be deleted with the delete function.

    (zk/delete client "/parent/child-node")

The delete function takes an optional version number, the delete will succeed if the node exists at the given version. the default version value is -1, which matches any version number.

The delete function accepts the :async? and :callback options.

Nodes that have children cannot be deleted. Two convenience functions, delete-children and delete-all, can be used to delete all of a node's children or delete a node and all of it's children, respectively.

    (delete-all client "/parent")

ACL functions

The acl function takes a scheme, id value, and a set of permissions. The following schemes are built in.

  • world has a single id, anyone, that represents anyone.
  • auth doesn't use any id, represents any authenticated user.
  • digest uses a username:password string to generate an MD5 hash which is then used as an ACL ID identity. Authentication is done by sending the username:password in clear text. When used in the ACL the expression will be the username:base64 encoded SHA1 password digest.
  • host uses the client host name as an ACL ID identity. The ACL expression is a hostname suffix. For example, the ACL expression host:corp.com matches the ids host:host1.corp.com and host:host2.corp.com, but not host:host1.store.com.
  • ip uses the client host IP as an ACL ID identity. The ACL expression is of the form addr/bits where the most significant bits of addr are matched against the most significant bits of the client host IP.

The folllowing permissions are supported:

  • :create: you can create a child node
  • :read: you can get data from a node and list its children.
  • :write: you can set data for a node
  • :delete: you can delete a child node
  • :admin: you can set permissions

Below are examples of each ACL scheme.

    (zk/acl "world" "anyone" :read :create :delete :admin :write)
    (zk/acl "ip" "127.0.0.1" :read :create :delete :admin :write)
    (zk/acl "host" "thinkrelevance.com" :admin :read :write :delete :create)
    (zk/acl "auth" "" :read :create :delete :admin :write)

There are five convenience functions for creating ACLs of each scheme, world-acl, auth-acl, digest-acl, host-acl, and ip-acl.

    (zk/world-acl :read :delete :write)

When no permissions are provided, the following are used by default: :read, :create, :delete, :write -- but not :admin.

    (zk/ip-acl "127.0.0.1")
    (zk/digest-acl "david:secret" :read :delete :write)
    (zk/host-acl "thinkrelevance.com" :read :delete :write)
    (zk/auth-acl :read :delete :write)

A list of ACLs can be passed as an option to the create function.

    (zk/create client "/protected-node" :acl [(zk/auth-acl :admin :create :read :delete :write)])

In the above example, only the user that created the node has permissions on it. In order to authenticate a user, authentication info must be added to a client connection with the add-auth-info function.

    (zk/add-auth-info client "digest" "david:secret")

If an unauthorized client tries to access the node, a org.apache.zookeeper.KeeperException$NoAuthException exception will be thrown.

Group Membership Example

    (def group-name "/example-group")

    (def client (zk/connect "127.0.0.1:2181"))

    (when-not (zk/exists client group-name)
      (zk/create client group-name :persistent? true))

This watcher will be called every time the children of the "/example-group" node are changed. Each time it is called it will print the children and add itself as the watcher.

    (defn group-watcher [x]
      (let [group (zk/children client group-name :watcher group-watcher)]
        (prn "Group members: " group)))

Create a new node for this member and add a watcher for changes to the children of "/example-group".

    (defn join-group [name]
      (do (zk/create client (str group-name "/" name))
          (zk/children client group-name :watcher group-watcher)))

Run this Example

    (use 'examples.group-membership)
    (join-group "bob")

From another REPL run:

    (use 'examples.group-membership)
    (join-group "sue")

And from another REPL run:

    (use 'examples.group-membership)
    (join-group "dan")

Each REPL will print the group members as each one joins the group. Kill any process and the remaining processes will print the remaining group members.

Leader Election Example

    (def root-znode "/election")

    (def client (zk/connect "127.0.0.1:2181"))

    (when-not (zk/exists client root-znode)
      (zk/create client root-znode :persistent? true))

    (defn node-from-path [path]
      (.substring path (inc (count root-znode))))

    (declare elect-leader)

The predecessor for Node A is the node that has the highest id that is < the id of Node A. watch-predecessor is called when the predecessor node changes. If this node is deleted and was the leader, then the watching node becomes the new leader.

    (defn watch-predecessor [me pred leader {:keys [event-type path]}]
      (if (and (= event-type :NodeDeleted) (= (node-from-path path) leader))
        (println "I am the leader!")
        (if-not (zk/exists client (str root-znode "/" pred)
                           :watcher (partial watch-predecessor me pred leader))
          (elect-leader me))))

    (defn predecessor [me coll]
      (ffirst (filter #(= (second %) me) (partition 2 1 coll))))

If the node associated with the current process is not the leader then add a watch to the predecessor.

    (defn elect-leader [me]
      (let [members (util/sort-sequential-nodes (zk/children client root-znode))
            leader (first members)]
        (print "I am" me)
        (if (= me leader)
          (println " and I am the leader!")
          (let [pred (predecessor me members)]
            (println " and my predecessor is:" pred)
            (if-not (zk/exists client (str root-znode "/" pred)
                               :watcher (partial watch-predecessor me pred leader))
              (elect-leader me))))))

    (defn join-group []
      (let [me (node-from-path (zk/create client (str root-znode "/n-") :sequential? true))]
        (elect-leader me)))

Evaluate the following forms in any number of REPLs and then kill each one in any order.

    (use 'examples.leader-election)
    (join-group)

Barrier Example

Distributed systems use barriers to block processing of a set of nodes until a condition is met at which time all the nodes are allowed to proceed.

The following is an implementation of a double barrier based on the algorithm from the ZooKeeper Recipes page.

    (require '[zookeeper :as zk])
    (import '(java.net InetAddress))

    (defn enter-barrier
      ([client n f & {:keys [barrier-node proc-name double-barrier?]
                      :or {barrier-node "/barrier"
                           proc-name (.getCanonicalHostName (InetAddress/getLocalHost))
                           double-barrier? true}}]
        (let [mutex (Object.)
              watcher (fn [event] (locking mutex (.notify mutex)))]
          (locking mutex
            (zk/create-all client (str barrier-node "/" proc-name))
            (if (>= (count (zk/children client barrier-node)) n)
              (zk/create client (str barrier-node "/ready") :async? true)
              (do (zk/exists client (str barrier-node "/ready") :watcher watcher :async? true)
                (.wait mutex)))
            (let [results (f)]
              (if double-barrier?
                (exit-barrier client :barrier-node barrier-node :proc-name proc-name)
                (zk/delete-all client barrier-node))
              results)))))

If the :double-barrier? option is set to true, then exit-barrier is called which blocks until all the processes have completed.

    (defn exit-barrier
      ([client & {:keys [barrier-node proc-name]
                  :or {barrier-node "/barrier"
                       proc-name (.getCanonicalHostName (InetAddress/getLocalHost))}}]
        (let [mutex (Object.)
              watcher (fn [event] (locking mutex (.notify mutex)))]
          (zk/delete client (str barrier-node "/ready"))
          (locking mutex
            (loop []
              (when-let [children (seq (sort (or (zk/children client barrier-node) nil)))]
                (cond
                  ;; the last node deletes itself and the barrier node, letting all the processes exit
                  (= (count children) 1)
                    (zk/delete-all client barrier-node)
                  ;; first node watches the second, waiting for it to be deleted
                  (= proc-name (first children))
                    (do (when (zk/exists client
                                         (str barrier-node "/" (second children))
                                         :watcher watcher)
                          (.wait mutex))
                        (recur))
                  ;; rest of the nodes delete their own node, and then watch the
                  ;; first node, waiting for it to be deleted
                  :else
                    (do (zk/delete client (str barrier-node "/" proc-name))
                        (when (zk/exists client
                                         (str barrier-node "/" (first children))
                                         :watcher watcher)
                          (.wait mutex))
                        (recur)))))))))

Example Usage

    (require '[zookeeper :as zk])
    (use 'examples.barrier)
    (def client (zk/connect "127.0.0.1:2181"))

    (enter-barrier client 2 #(println "First process is running"))

The call to enter-barrier will block until there are N=2 processes in the barrier. From another REPL, execute the following, and then both processes will run and exit the barrier.

    (require '[zookeeper :as zk])
    (use 'examples.barrier)
    (def client (zk/connect "127.0.0.1:2181"))

    (enter-barrier client 2 #(println "Second process is running") :proc-name "node2")

Running ZooKeeper

Download Apache ZooKeeper from http://zookeeper.apache.org/releases.html.

Unpack to $ZOOKEEPER_HOME (wherever you would like that to be).

Here's an example conf file for a standalone instance, by default ZooKeeper will look for it in $ZOOKEEPER_HOME/conf/zoo.cfg

    # The number of milliseconds of each tick
    tickTime=2000
    
    # the directory where the snapshot is stored.
    dataDir=/var/zookeeper
    
    # the port at which the clients will connect
    clientPort=2181

Ensure that the dataDir exists and is writable.

After creating and customizing the conf file, start ZooKeeper

    $ZOOKEEPER_HOME/bin/zkServer.sh start

Testing

Before running 'lein test' you need to start a local instance of ZooKeeper on port 2181.

## Contributing

Although Zookeeper-clj is not part of Clojure-Contrib, it follows the same guidelines for contributing, which includes signing a Clojure Contributor Agreement (CA) before contributions can be accepted.

References

License

zookeper-clj is Copyright © 2011 David Liebke and Relevance, Inc

Distributed under the Eclipse Public License, the same as Clojure.