-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathzookeeper_store.clj
50 lines (45 loc) · 1.86 KB
/
zookeeper_store.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
(ns shoutout.zookeeper-store
(:require shoutout
[clojure.edn :as edn])
(:import (org.apache.zookeeper CreateMode ZooKeeper)
(org.apache.curator
framework.CuratorFramework
framework.CuratorFrameworkFactory
retry.BoundedExponentialBackoffRetry
framework.recipes.cache.NodeCache
framework.recipes.cache.NodeCacheListener)))
(defn set-or-create
  "Stores the byte array `v` at znode path `k`.

  Attempts to create the znode with `v` as its data; when ZooKeeper reports
  that the node already exists, overwrites the existing node's data instead.
  Returns whatever the Curator call that succeeded returns."
  [^CuratorFramework curator ^String k ^bytes v]
  (try
    ;; Optimistic path: assume the node is not there yet.
    (.forPath (.create curator) k v)
    (catch org.apache.zookeeper.KeeperException$NodeExistsException _
      ;; Node already present, so replace its payload.
      (.forPath (.setData curator) k v))))
(defn read-raw
  "Reads the znode at `path` and parses its contents as EDN.

  Returns the parsed value, nil when the znode exists but carries no data,
  or an empty map when the znode does not exist."
  [^CuratorFramework curator ^String path]
  (try
    (let [^bytes raw (-> curator .getData (.forPath path))]
      ;; Curator hands back a byte array, while edn/read-string only accepts
      ;; a String, so the payload has to be decoded before parsing.
      (when raw
        (edn/read-string (String. raw "UTF-8"))))
    (catch org.apache.zookeeper.KeeperException$NoNodeException _
      {})))
;; ShoutoutStorage implementation backed by a single znode at `path` that
;; holds an EDN map of feature-name -> serialized feature.
;;
;; Reads are served from `deserialized-cache` (an atom holding the parsed
;; map); writes read the current map from ZooKeeper, add the entry, and store
;; the whole map back.
(deftype ShoutoutZookeeperStore [^CuratorFramework curator node-cache deserialized-cache path]
  shoutout/ShoutoutStorage
  (shoutout/read-from-storage [_ feature-name]
    (get @deserialized-cache feature-name))
  (shoutout/write-to-storage [_ feature-name serialized-feature]
    ;; NOTE(review): this is a read-modify-write without a version check, so
    ;; concurrent writers can overwrite each other's entries.
    (let [existing (read-raw curator path)
          updated (assoc existing feature-name serialized-feature)
          ^String encoded (pr-str updated)]
      ;; set-or-create takes a byte array, so the EDN text must be encoded
      ;; before it is handed over.
      (set-or-create curator path (.getBytes encoded "UTF-8")))))
(defn shoutout-cached-zookeeper-store
  "Builds a ShoutoutZookeeperStore for the znode at `path`.

  A Curator NodeCache watches the znode; each time it reports a change the
  node's bytes are decoded as UTF-8, parsed as EDN, and stored in an atom
  that the returned store reads from. A missing node or an empty payload is
  cached as an empty map."
  [^CuratorFramework curator-framework ^String path]
  (let [deserialized-cache (atom {})
        node-cache (NodeCache. curator-framework path)
        refresh! (fn []
                   ;; getCurrentData is null when the znode does not exist
                   ;; (never created, or deleted), so guard before .getData.
                   (let [current (.getCurrentData node-cache)
                         raw (when current (.getData current))
                         ;; edn/read-string needs a String, not a byte array.
                         parsed (when raw
                                  (edn/read-string (String. ^bytes raw "UTF-8")))]
                     (reset! deserialized-cache (or parsed {}))))]
    (.addListener
      (.getListenable node-cache)
      (reify NodeCacheListener
        (nodeChanged [_]
          (refresh!))))
    (.start node-cache)
    (.rebuild node-cache)
    ;; rebuild loads the node without notifying listeners, so copy the
    ;; freshly loaded data into the atom explicitly.
    (refresh!)
    (ShoutoutZookeeperStore. curator-framework node-cache deserialized-cache path)))