This repository has been archived by the owner on Dec 5, 2019. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 16
/
Copy path: index.js
128 lines (107 loc) · 3.28 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
var Trigger = require('level-trigger')
var range = require('./range')
var next =
typeof setImmediate == 'undefined'
? process.nextTick
: setImmediate
next = setTimeout
module.exports = function (db, mapDb, map, reduce, initial) {
if('string' === typeof mapDb) mapDb = db.sublevel(mapDb)
//store the keys a value has been mapped to.
var mapper = mapDb.sublevel('mapped')
if(!map)
throw new Error('expected a map function')
//when record is inserted, pull out what it was mapped to last time.
var maps = Trigger(db, mapDb.sublevel('maps'), function (id, done) {
mapper.get(id, function (err, oldKeys) {
oldKeys = oldKeys ? JSON.parse(oldKeys) : []
var newKeys = []
db.get(id, function (err, value) {
var batch = [], async = false
//don't map if it's delete, just delete the old maps
if(value != null)
map(id, value, function (key, value) {
var array = 'string' === typeof key ? [key] : key || []
if(true == async) return console.error('map must not emit async')
if(value == null || key == null) return
array.push(id)
batch.push({key: range.stringify(array), value: value, type: 'put'})
newKeys.push(range.stringify(array))
})
async = true
oldKeys.forEach(function (k) {
if(!~newKeys.indexOf(k)) batch.push({key: k, type: 'del'})
})
batch.push({
key: id,
value: JSON.stringify(newKeys),
type: 'put',
prefix: mapper
})
mapDb.batch(batch, done)
})
})
})
var reduces
if(reduce)
reduces = Trigger(mapDb, 'reduces', function (ch) {
var a = range.parse(ch.key);
if(!a.length) return
a.pop()
return JSON.stringify(a)
},
function (a, done) {
var array = JSON.parse(a)
var acc = initial
mapDb.createReadStream(range.range(array.concat(true)))
.on('data', function (e) {
try {
acc = reduce(acc, e.value)
} catch (err) {
console.error(err);
return done(err)
}
})
.on('end', function () {
var batch
mapDb.batch([batch = {
key : range.stringify(array),
value: ''+acc,
type : acc == null ? 'del' : 'put'
}], function (err) {
if(err) return done(err)
mapDb.emit('reduce', array, acc)
done()
})
})
})
mapDb.start = function () {
maps.start()
reduces && reduces.start()
return mapDb
}
//patch streams so that they can handle ranges.
var createReadStream = mapDb.createReadStream
mapDb.createReadStream = function (opts) {
opts = opts || {}
if(opts.range) {
var r = range.range(opts.range)
opts.start = opts.min = r.min
opts.end = opts.max = r.max
}
return createReadStream.call(this, opts)
}
mapDb.createViewStream = function(opts) {
var stream = this.createReadStream(opts)
stream.on('data', function(d) {
d.key = range.parse(d.key)
})
return stream
}
var oldGet = mapDb.get
mapDb.get = function(key){
if(Array.isArray(key)) key = range.stringify(key)
return oldGet.apply(this, arguments)
}
return mapDb
}