-
Notifications
You must be signed in to change notification settings - Fork 0
/
index.js
104 lines (86 loc) · 2.75 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
var trigger = require('level-trigger')
var liveStream = require('level-live-stream')
var viewStream = require('level-view-stream')
var Bucket = require('range-bucket')
var map = require('map-stream')
module.exports = function (db) {
if(db.map) return
trigger(db)
liveStream(db)
var views = {}
db.map = {views: views}
db.map.add = function (view) {
var name = view.name
if('function' === typeof view)
view = {
map: view, name: name, start: '', end: '~'
}
if('function' !== typeof view.map) throw new Error('expected map function')
views[name] = view
view.bucket = Bucket('mapr', name)
view.keyMap = view.keyMap || function (data) {
return data.key
}
view.load = view.load || function (key, cb) {
db.get(key, cb)
}
db.trigger.add({
start: view.start,
name : 'TR-'+name,
end : view.end,
map : view.keyMap,
job : function (key, done) {
//expose a LOAD method...
//and use trigger-map to remove the timestamp from the job.
//(so you don't queue a different job for each update)
view.load(key, function (err, value) {
doMap(view, {key: key, value: value}, done)
})
}
})
}
db.map.start = function (name, done) {
var rs = db.readStream(views[name])
.pipe(map(function (data, next) {
doMap(views[name], data, next)
}))
if(done) rs.on('end', done)
}
db.map.view = viewStream(db, db.map)
function doMap (view, data, done) {
var keys = [], sync = true, self = this, batch = []
var kBucket = Bucket('mapr-keys', view.name)
function emit (key, value) {
if(!sync) throw new Error('emit called asynchronously')
var _key = view.bucket([].concat(key).concat(data.key))
batch.push({
type: 'put', key: _key, value: value
})
keys.push(_key)
}
emit.emit = emit
//don't do a map if this was a delete.
//will still delete the old mappings,
//which will trigger a reduce (or whatevs)
if('undefined' !== typeof data.value)
view.map.call(emit, data.key, data.value, emit)
//setting this will make emit throw if it is called again later.
sync = false
var mapOldKeys = kBucket(data.key)
db.get(mapOldKeys, function (err, oldKeys) {
oldKeys = (oldKeys ? JSON.parse(oldKeys) : [])
//delete the old keys that arn't being updated.
oldKeys.forEach(function (oldKey) {
if(!~keys.indexOf(oldKey))
batch.push({type: 'del', key: oldKey})
})
//save the maps.
batch.push({
type: keys.length ? 'put' : 'del',
key: mapOldKeys,
value: keys.length ? JSON.stringify(keys) : null
})
db.batch(batch, done)
})
}
}