-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathmysqliterator.js
100 lines (81 loc) · 2.65 KB
/
mysqliterator.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
var util = require('util')
, PassThrough = require('stream').PassThrough
, AbstractIterator = require('abstract-leveldown').AbstractIterator
, mysql = require('mysql')
, sqlHelper = require('./sql')
, MysqlIterator = function(db, options) {
var self = this
, query = []
, start = options.start && options.start.length > 0 ? mysql.escape(options.start) : null
, end = options.end && options.end.length > 0 ? mysql.escape(options.end) : null
AbstractIterator.call(this, db)
this._reverse = !!options.reverse
this._keyAsBuffer = !!options.keyAsBuffer
this._valueAsBuffer = !!options.valueAsBuffer
this._stream = new PassThrough({
objectMode: true
})
this._stream.once('end', function() {
self._endEmitted = true
})
query.push('SELECT * from ' + db.table)
if (options.reverse) {
if (start && end)
query.push('WHERE `key` <= ' + start + ' AND `key` >= ' + end)
else if (start)
query.push('WHERE `key` <= ' + start)
else if (end)
query.push('WHERE `key` >= ' + end)
query.push('ORDER BY `key` DESC')
} else {
if (start && end)
query.push('WHERE `key` >= ' + start + ' AND `key` <= ' + end)
else if (start)
query.push('WHERE `key` >= ' + start)
else if (end)
query.push('WHERE `key` <= ' + end)
query.push('ORDER BY `key` ASC')
}
if (options.limit && options.limit !== -1)
query.push('LIMIT ' + options.limit)
db._streamingQuery(query.join('\n'), function(err, s) {
self._foobar = s
s.pipe(self._stream)
s.once('close', function() {
throw new Error('CLOSE')
})
})
}
util.inherits(MysqlIterator, AbstractIterator)
MysqlIterator.prototype._next = function(callback) {
var self = this
, obj = this._stream.read()
, onReadable = function() {
self._stream.removeListener('end', onEnd)
self._next(callback)
}
, onEnd = function() {
self._stream.removeListener('readable', onReadable)
callback()
}
, key
, value
if (this._endEmitted)
callback()
else if (obj === null) {
this._stream.once('readable', onReadable)
this._stream.once('end', onEnd)
}
else {
key = obj.key
if (!this._keyAsBuffer) key = key.toString()
value = obj.value
if (!this._valueAsBuffer) value = value.toString()
callback(null, key, value)
}
}
MysqlIterator.prototype._end = function(callback) {
this._stream.end()
callback()
}
module.exports = MysqlIterator