-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdataNode.js
171 lines (143 loc) · 4.51 KB
/
dataNode.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
const fs = require("fs");
const FormData = require("form-data");
const axios = require("axios");
const express = require("express");
const bodyParser = require("body-parser");
const multer = require("multer");
// Command line arguments
const args = process.argv.slice(2);
// DataNode Properties
const nodeDNS = args[1];
const NodeID = parseInt(args[0]);
const PORT = 3000 + NodeID;
const nameNodeURL = args[2];
// Array of blockIDs refrerring to blocks that are stored locally on disk.
const blocks = [];
// Storage engine for block uploads
var storage = multer.diskStorage({
destination: function(req, file, cb) {
if (!fs.existsSync("./DataNode-" + NodeID + "/")) {
fs.mkdirSync("./DataNode-" + NodeID + "/");
fs.mkdirSync("./DataNode-" + NodeID + "/blocks/");
}
cb(null, "./DataNode-" + NodeID + "/blocks/");
},
filename: function(req, file, cb) {
cb(null, file.fieldname + ".block");
}
});
// Express plugin to recieve files in formdata
let upload = multer({ storage: storage });
// Initialize express app
const app = express();
app.use(bodyParser.json());
// Tells datanode to replicate a specicific block to a target datanode.
// Expects the target datanode URL to be in the body.
app.post("/Replicate/:BlockID/", function(req, res) {
let blockID = req.params.BlockID;
let nodeURL = req.body.NodeURL;
if (!nodeURL || !blockID) {
res.status(400).send("Replication request malformed.");
}
sendDataBlock(nodeURL, blockID);
console.log("Replication operation recieved.");
res.send("Replication Operation Recieved.");
});
// Saves block to disk and remembers that the blockID is stored at this node.
// Expects block file to be in the body.
app.put("/Blocks/:BlockID", upload.any(), function(req, res) {
let blockID = req.params.BlockID;
let file = req.files[0];
let nextDataNodes = null;
if (req.body.next != null) {
nextDataNodes = req.body.next.split(",");
}
if (file == null) {
res.status(400).send("No block file uploaded.");
} else {
console.log("Trying to write: " + file.fieldname);
// Wait for block to finish writing
while (
!fs.existsSync(
"./DataNode-" + NodeID + "/blocks/" + file.fieldname + ".block"
)
) {}
fs.renameSync(
"./DataNode-" + NodeID + "/blocks/" + file.fieldname + ".block",
"./DataNode-" + NodeID + "/blocks/" + blockID + ".block"
);
blocks.push(blockID);
// If relpications need to be made
if (nextDataNodes != null) {
let nextDataNode = nextDataNodes[0];
sendDataBlock(nextDataNode, blockID, nextDataNodes.slice(1));
}
res.send("Block upload succeeded!");
}
});
// Returns a specific block declared in the request.
app.get("/Blocks/:BlockID", function(req, res) {
let blockID = req.params.BlockID;
let blockPath = __dirname + "/DataNode-" + NodeID + "/blocks/" + blockID + ".block";
if (!fs.existsSync(blockPath)) {
res.status(404).send("Block not found.");
} else {
res.download(blockPath, blockID + ".block");
}
});
// Listen for requests
app.listen(PORT, () => console.log(`DataNode ${NodeID} listening on port ${PORT}!`));
// Time between block reports in seconds
const blockReportInterval = 30;
// Send block report to nameNode on interval of [blockReportInterval] seconds.
setInterval(() => {
const blockReportURL = nameNodeURL + "/BlockReport/" + NodeID;
axios({
method: "put",
url: blockReportURL,
data: {
blocks
},
headers: {
referer: nodeDNS
}
})
.then(function(response) {
if (response.status == 200) {
console.log("Block report sent successfully!");
}
})
.catch(function(error) {
console.log(error);
console.error("Block report sending failed!");
});
}, blockReportInterval * 1000);
function sendDataBlock(nodeURL, blockID, nextDataNodes = []) {
let uploadURL = nodeURL + "/Blocks/" + blockID + "/";
let form = new FormData();
form.append(
"block-" + blockID,
fs.createReadStream(
"./DataNode-" + NodeID + "/blocks/" + blockID + ".block"
)
);
if (nextDataNodes.length > 0) {
form.append("next", nextDataNodes.join());
}
axios({
method: "put",
url: uploadURL,
data: form,
maxContentLength: 128000000000,
maxBodyLength: 128000000000,
headers: { ...form.getHeaders() }
})
.then(function(response) {
if (response.status == 200) {
console.log("Replication occured successfully!");
}
})
.catch(function(error) {
console.error("Replication Error Occurred: " + error);
});
}