forked from Level/party
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathindex.js
97 lines (77 loc) · 3.09 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
var level = require('level-prebuilt');
var has = require('has');
var pump = require('pump');
var fs = require('fs');
var net = require('net');
var path = require('path');
var multileveldown = require('multileveldown');
module.exports = function (dir, opts) {
if (!opts) opts = {};
if (!has(opts, 'retry')) opts.retry = true;
var sockPath = process.platform === 'win32' ?
'\\\\.\\pipe\\level-party\\' + path.resolve(dir) :
path.join(dir, 'level-party.sock');
var client = multileveldown.client(opts);
client.open(tryConnect);
function tryConnect () {
if (!client.isOpen()) return;
var socket = net.connect(sockPath);
var connected = false;
socket.on('connect', function () {
connected = true;
});
// we pass socket as the ref option so we dont hang the event loop
pump(socket, client.createRpcStream({ref: socket}), socket, function () {
if (!client.isOpen()) return;
var db = level(dir, opts);
db.on('error', onerror);
db.on('open', onopen);
function onerror (err) {
db.removeListener('open', onopen);
if (err.type === 'OpenError') {
if (connected) return tryConnect();
setTimeout(tryConnect, 100);
}
}
function onopen () {
db.removeListener('error', onerror);
fs.unlink(sockPath, function (err) {
if (err && err.code !== 'ENOENT') return db.emit('error', err);
if (!client.isOpen()) return;
var sockets = [];
var down = client.db;
var server = net.createServer(function (sock) {
if (sock.unref) sock.unref();
sockets.push(sock);
pump(sock, multileveldown.server(db), sock, function () {
sockets.splice(sockets.indexOf(sock), 1);
});
});
client.db = db.db;
client.close = shutdown;
client.emit('leader');
server.listen(sockPath, onlistening);
function shutdown (cb) {
sockets.forEach(function (sock) {
sock.destroy();
});
server.on('close', function () {
db.close(cb);
});
server.close();
}
function onlistening () {
if (server.unref) server.unref();
if (down.isFlushed()) return;
var sock = net.connect(sockPath);
pump(sock, down.createRpcStream(), sock);
down.flush(function () {
sock.destroy();
});
}
});
}
});
};
return client;
};