-
Notifications
You must be signed in to change notification settings - Fork 9
/
Copy pathindex.js
68 lines (58 loc) · 1.58 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
var pull = require('pull-stream/pull')
var pullFilter = require('pull-stream/throughs/filter')
var pullFlatten = require('pull-stream/throughs/flatten')
var pullMap = require('pull-stream/throughs/map')
var pullReduce = require('pull-stream/sinks/reduce')
var pullSort = require('pull-sort')
var CompareAt = require('compare-at-paths')
var make = require('./make')
var SinkThrough = require('pull-sink-through')
// Return the first key produced by a for...in walk over `q`.
// Yields undefined when the walk produces no keys at all.
function first (q) {
  var key
  for (key in q) {
    return key
  }
}
// Resolve one query stage to the stream built by its operator.
//
// The stage's first key selects the operator: a key of the form `$name`
// dispatches to `exports.name`, which receives the whole stage object.
//
// q - stage object such as {$filter: ...}
//
// Returns whatever the selected operator returns.
// Throws Error('unknown function:<key>') when the stage has no key, the key
// does not start with `$`, or no operator of that name is defined.
function get (q) {
  var k = first(q)
  // An empty stage yields no key; fall through to the descriptive error
  // below instead of crashing on k.substring.
  if(typeof k === 'string' && k[0] === '$') {
    var s = k.substring(1)
    // `exports` is itself a function, so a plain property lookup would also
    // match inherited members like call / bind / constructor. Only accept
    // operators that were assigned onto it directly.
    if(Object.prototype.hasOwnProperty.call(exports, s) &&
       typeof exports[s] === 'function')
      return exports[s](q)
  }
  throw new Error('unknown function:'+ k)
}
// Return the final element of the array-like `l`.
// An empty `l` gives undefined, because index -1 is not present.
function last (l) {
  var finalIndex = l.length - 1
  return l[finalIndex]
}
// Wrap `fn` so that items carrying a truthy `sync` property skip it.
//
// fn - function applied to every item that is not flagged `sync`
//
// Returns a function (data) that hands back `data` itself when `data.sync`
// is truthy, and the result of fn(data) otherwise.
function passSync(fn) {
  return function (data) {
    if (data.sync) {
      return data
    }
    return fn(data)
  }
}
// Build a pull-stream pipeline from a query.
//
// q  - either an array of stage objects (falsy entries are skipped) or a
//      single stage object; stages are resolved through get().
// cb - optional callback. It is only used when the final stage has a truthy
//      `$reduce`; that stage is then built with exports.reduce(..., cb)
//      instead of get().
//
// Returns pull.through() when the array holds no stages, the result of
// get(q) for a lone stage, and otherwise the stages combined with pull().
exports = module.exports = function (q, cb) {
  // A lone stage object has no .filter / .length, so normalise it to a
  // one-element list up front; otherwise the call would fail before the
  // single-stage path could ever be reached.
  var isList = Array.isArray(q)
  var stages = isList ? q.filter(Boolean) : [q]
  // NOTE(review): relies on `through` being attached to the function loaded
  // from 'pull-stream/pull' - confirm for the installed pull-stream version.
  if(!stages.length) return pull.through()

  var tail = last(stages)
  if(tail.$reduce && cb) {
    // The final reduce gets the callback; every earlier stage goes via get().
    return pull.apply(null,
      stages.slice(0, stages.length - 1).map(get)
        .concat(exports.reduce(tail.$reduce, cb))
    )
  }
  if(isList)
    return pull.apply(null, stages.map(get))
  return get(q)
}
exports.filter = function (q) {
return pullFilter(passSync(make(q)))
}
exports.map = function (q) {
return pull(pullMap(passSync(make(q))), pullFilter())
}
exports.reduce = function (q, cb) {
//TODO: realtime reduce.
if(cb)
return pullReduce(make(q), null, cb)
return pull(SinkThrough(function (cb) {
return pullReduce(make(q), null, cb)
}), pullFlatten())
}
exports.sort = function (paths) {
console.log("SORT", paths)
return pullSort(CompareAt(paths.$sort))
}