Skip to content

Commit

Permalink
Improvements to node/python ingestor(s) (#51)
Browse files Browse the repository at this point in the history
  • Loading branch information
AHarmlessPyro authored Oct 26, 2022
1 parent f6c3ce4 commit 786710f
Show file tree
Hide file tree
Showing 10 changed files with 191 additions and 68 deletions.
5 changes: 3 additions & 2 deletions ingestors/node/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
"format": "prettier --write './src/**/*.{ts,tsx}'"
},
"dependencies": {
"axios": "^1.1.3",
"require-in-the-middle": "^5.2.0"
},
"repository": {
Expand All @@ -24,9 +25,9 @@
"homepage": "www.metlo.com",
"devDependencies": {
"@types/node": "^18.8.4",
"typescript": "^4.8.4",
"nodemon": "^2.0.20",
"ts-node": "^10.9.1",
"tslib": "^2.4.0"
"tslib": "^2.4.0",
"typescript": "^4.8.4"
}
}
14 changes: 13 additions & 1 deletion ingestors/node/src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ const WorkerPool = require("./pool");
const path = require("path")

const pool = new WorkerPool(os.cpus().length, './workerTarget.js');
const endpoint = "api/v1/log-request/single"

function exit() {
pool.close()
Expand All @@ -13,5 +14,16 @@ process.on('exit', exit);
process.on('SIGTERM', exit);

module.exports = function (key, host) {
Modules({ host, key, pool })
try {
new URL(host)
} catch (err) {
console.error(err)
throw new Error(`Couldn't load metlo. Host is not a proper url : ${host}`)
}
let metlo_host = host
if (metlo_host[metlo_host.length - 1] != "/") {
metlo_host += "/"
}
metlo_host += endpoint
Modules({ host: metlo_host, key, pool })
}
4 changes: 2 additions & 2 deletions ingestors/node/src/modules/express.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ function initialize({ key, host, pool }) {
METLO_DETAILS.host = host
METLO_DETAILS.pool = pool


function compileInformation(_req, _res, responseBody) {
const data = JSON.stringify({
request: {
Expand All @@ -42,6 +41,7 @@ function initialize({ key, host, pool }) {
sourcePort: _req.socket.remotePort,
destination: _res.socket.localAddress,
destinationPort: _res.socket.localPort,
metloSource: "node/express"
}
})

Expand Down Expand Up @@ -77,7 +77,7 @@ function initialize({ key, host, pool }) {

function modifiedSendFile() {
const resp = original_sendFile.apply(this, arguments)
compileInformation(this.req, resp, arguments[arguments.length - 1])
compileInformation(this.req, this, arguments[0])
return resp
};

Expand Down
1 change: 1 addition & 0 deletions ingestors/node/src/modules/fastify.js
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ function initialize({ key, host, pool }) {
sourcePort: request.raw.socket.remotePort,
destination: response.raw.socket.localAddress,
destinationPort: response.raw.socket.localPort,
metloSource: "node/fastify"
}
}
)
Expand Down
1 change: 1 addition & 0 deletions ingestors/node/src/modules/koa.js
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ function initialize({ key, host, pool }) {
sourcePort: ctx.request.socket.remotePort,
destination: ctx.request.socket.localAddress,
destinationPort: ctx.request.socket.localPort,
metloSource: "node/koa"
}
}
)
Expand Down
12 changes: 9 additions & 3 deletions ingestors/node/src/pool/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,16 @@ class WorkerPool extends EventEmitter {
worker.on('error', (err) => {
// In case of an uncaught exception: Call the callback that was passed to
// `runTask` with the error.
if (worker[kTaskInfo])
worker[kTaskInfo].done(err, null);
else
if (worker[kTaskInfo]) {
console.warn(err)
try {
worker[kTaskInfo].done(err, null);
} catch (err) {
//pass
}
} else {
this.emit('error', err);
}
// Remove the worker from the list and start a new Worker to replace the
// current one.
this.workers.splice(this.workers.indexOf(worker), 1);
Expand Down
38 changes: 12 additions & 26 deletions ingestors/node/src/pool/workerTarget.js
Original file line number Diff line number Diff line change
@@ -1,32 +1,18 @@
const { parentPort } = require("node:worker_threads")
const https = require("https");
// const { http } = require('follow-redirects');
const axios = require("axios")

parentPort.on('message', ({ data: postData, host, key }) => {
var _resp = https.request(host, {
method: "POST", headers: {
parentPort.on('message', ({ data: postData, host, key }) => {
axios({
method: 'post',
url: host,
headers: {
'Content-Type': 'application/json',
'Content-Length': Buffer.byteLength(postData),
'Authorization': key
}
},
// (res) => {
// let data = '';

// console.log('Status Code:', res.statusCode);

// res.on('data', (chunk) => {
// data += chunk;
// });

// res.on('end', () => {
// console.log('Body sent');
// });

// }
).on("error", (err) => {
console.log("Error: ", err.message);
})

_resp.write(postData)
_resp.end();
},
data: postData
}).catch((error) => {
console.warn("Encountered an error with metlo ingestor.\nError: ", error.message);
});
});
57 changes: 57 additions & 0 deletions ingestors/node/yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,20 @@ arg@^4.1.0:
resolved "https://registry.yarnpkg.com/arg/-/arg-4.1.3.tgz#269fc7ad5b8e42cb63c896d5666017261c144089"
integrity sha512-58S9QDqG0Xx27YwPSt9fJxivjYl432YCwfDMfZ+71RAqUrZef7LrKQZ3LHLOwCS4FLNBplP533Zx895SeOCHvA==

asynckit@^0.4.0:
version "0.4.0"
resolved "https://registry.yarnpkg.com/asynckit/-/asynckit-0.4.0.tgz#c79ed97f7f34cb8f2ba1bc9790bcc366474b4b79"
integrity sha512-Oei9OH4tRh0YqU3GxhX79dM/mwVgvbZJaSNaRk+bshkj0S5cfHcgYakreBjrHwatXKbz+IoIdYLxrKim2MjW0Q==

axios@^1.1.3:
version "1.1.3"
resolved "https://registry.yarnpkg.com/axios/-/axios-1.1.3.tgz#8274250dada2edf53814ed7db644b9c2866c1e35"
integrity sha512-00tXVRwKx/FZr/IDVFt4C+f9FYairX517WoGCL6dpOntqLkZofjhu43F/Xl44UOpqa+9sLFDrG/XAnFsUYgkDA==
dependencies:
follow-redirects "^1.15.0"
form-data "^4.0.0"
proxy-from-env "^1.1.0"

balanced-match@^1.0.0:
version "1.0.2"
resolved "https://registry.yarnpkg.com/balanced-match/-/balanced-match-1.0.2.tgz#e83e3a7e3f300b34cb9d87f615fa0cbf357690ee"
Expand Down Expand Up @@ -120,6 +134,13 @@ chokidar@^3.5.2:
optionalDependencies:
fsevents "~2.3.2"

combined-stream@^1.0.8:
version "1.0.8"
resolved "https://registry.yarnpkg.com/combined-stream/-/combined-stream-1.0.8.tgz#c3d45a8b34fd730631a110a8a2520682b31d5a7f"
integrity sha512-FQN4MRfuJeHf7cBbBMJFXhKSDq+2kAArBlmRBvcvFE5BB1HZKXtSFASDhdlz9zOYwxh8lDdnvmMOe/+5cdoEdg==
dependencies:
delayed-stream "~1.0.0"

[email protected]:
version "0.0.1"
resolved "https://registry.yarnpkg.com/concat-map/-/concat-map-0.0.1.tgz#d8a96bd77fd68df7793a73036a3ba0d5405d477b"
Expand All @@ -144,6 +165,11 @@ debug@^4.1.1:
dependencies:
ms "2.1.2"

delayed-stream@~1.0.0:
version "1.0.0"
resolved "https://registry.yarnpkg.com/delayed-stream/-/delayed-stream-1.0.0.tgz#df3ae199acadfb7d440aaae0b29e2272b24ec619"
integrity sha512-ZySD7Nf91aLB0RxL4KGrKHBXl7Eds1DAmEdcoVawXnLD7SDhpNgtuII2aAkg7a7QS41jxPSZ17p4VdGnMHk3MQ==

diff@^4.0.1:
version "4.0.2"
resolved "https://registry.yarnpkg.com/diff/-/diff-4.0.2.tgz#60f3aecb89d5fae520c11aa19efc2bb982aade7d"
Expand All @@ -156,6 +182,20 @@ fill-range@^7.0.1:
dependencies:
to-regex-range "^5.0.1"

follow-redirects@^1.15.0:
version "1.15.2"
resolved "https://registry.yarnpkg.com/follow-redirects/-/follow-redirects-1.15.2.tgz#b460864144ba63f2681096f274c4e57026da2c13"
integrity sha512-VQLG33o04KaQ8uYi2tVNbdrWp1QWxNNea+nmIB4EVM28v0hmP17z7aG1+wAkNzVq4KeXTq3221ye5qTJP91JwA==

form-data@^4.0.0:
version "4.0.0"
resolved "https://registry.yarnpkg.com/form-data/-/form-data-4.0.0.tgz#93919daeaf361ee529584b9b31664dc12c9fa452"
integrity sha512-ETEklSGi5t0QMZuiXoA/Q6vcnxcLQP5vdugSpuAyi6SVGi2clPPp+xgEhuMaHC+zGgn31Kd235W35f7Hykkaww==
dependencies:
asynckit "^0.4.0"
combined-stream "^1.0.8"
mime-types "^2.1.12"

fsevents@~2.3.2:
version "2.3.2"
resolved "https://registry.yarnpkg.com/fsevents/-/fsevents-2.3.2.tgz#8a526f78b8fdf4623b709e0b975c52c24c02fd1a"
Expand Down Expand Up @@ -226,6 +266,18 @@ make-error@^1.1.1:
resolved "https://registry.yarnpkg.com/make-error/-/make-error-1.3.6.tgz#2eb2e37ea9b67c4891f684a1394799af484cf7a2"
integrity sha512-s8UhlNe7vPKomQhC1qFelMokr/Sc3AgNbso3n74mVPA5LTZwkB9NlXf4XPamLxJE8h0gh73rM94xvwRT2CVInw==

[email protected]:
version "1.52.0"
resolved "https://registry.yarnpkg.com/mime-db/-/mime-db-1.52.0.tgz#bbabcdc02859f4987301c856e3387ce5ec43bf70"
integrity sha512-sPU4uV7dYlvtWJxwwxHD0PuihVNiE7TyAbQ5SWxDCB9mUYvOgroQOwYQQOKPJ8CIbE+1ETVlOoK1UC2nU3gYvg==

mime-types@^2.1.12:
version "2.1.35"
resolved "https://registry.yarnpkg.com/mime-types/-/mime-types-2.1.35.tgz#381a871b62a734450660ae3deee44813f70d959a"
integrity sha512-ZDY+bPm5zTTF+YpCrAU9nK0UgICYPT0QtT1NZWFv4s++TNkcgVaT0g6+4R2uI4MjQjzysHB1zxuWL50hzaeXiw==
dependencies:
mime-db "1.52.0"

minimatch@^3.1.2:
version "3.1.2"
resolved "https://registry.yarnpkg.com/minimatch/-/minimatch-3.1.2.tgz#19cd194bfd3e428f049a70817c038d89ab4be35b"
Expand Down Expand Up @@ -286,6 +338,11 @@ picomatch@^2.0.4, picomatch@^2.2.1:
resolved "https://registry.yarnpkg.com/picomatch/-/picomatch-2.3.1.tgz#3ba3833733646d9d3e4995946c1365a67fb07a42"
integrity sha512-JU3teHTNjmE2VCGFzuY8EXzCDVwEqB2a8fsIvwaStHhAWJEeVd1o1QD80CU6+ZdEXXSLbSsuLwJjkCBWqRQUVA==

proxy-from-env@^1.1.0:
version "1.1.0"
resolved "https://registry.yarnpkg.com/proxy-from-env/-/proxy-from-env-1.1.0.tgz#e102f16ca355424865755d2c9e8ea4f24d58c3e2"
integrity sha512-D+zkORCbA9f1tdWRK0RaCR3GPv50cMxcrz4X8k5LTSUD1Dkw47mKJEZQNunItRTkWwgtaUSo1RVFRIG9ZXiFYg==

pstree.remy@^1.1.8:
version "1.1.8"
resolved "https://registry.yarnpkg.com/pstree.remy/-/pstree.remy-1.1.8.tgz#c242224f4a67c21f686839bbdb4ac282b8373d3a"
Expand Down
68 changes: 49 additions & 19 deletions ingestors/python/metlo/django.py
Original file line number Diff line number Diff line change
@@ -1,64 +1,93 @@
import json
from concurrent.futures import ThreadPoolExecutor
from urllib.request import Request, urlopen
from urllib.parse import urlparse

from django.conf import settings

endpoint = "api/v1/log-request/single"

class MetloDjango(object):

class MetloDjango(object):
def perform_request(self, data):
urlopen(
url=self.saved_request,
data=json.dumps(data).encode('utf-8')
)
urlopen(url=self.saved_request, data=json.dumps(data).encode("utf-8"))

def __init__(self, get_response):
"""
Middleware for Django to communicate with METLO
:param get_response: Automatically populated by django
"""
self.get_response = get_response
self.pool = ThreadPoolExecutor(max_workers=settings.METLO_CONFIG.get("workers", 4))
self.pool = ThreadPoolExecutor(
max_workers=settings.METLO_CONFIG.get("workers", 4)
)

assert settings.METLO_CONFIG.get("METLO_HOST") is not None, "METLO_CONFIG is missing METLO_HOST attribute"
assert settings.METLO_CONFIG.get("API_KEY") is not None, "METLO_CONFIG is missing API_KEY attribute"
assert (
settings.METLO_CONFIG.get("METLO_HOST") is not None
), "METLO_CONFIG is missing METLO_HOST attribute"
assert (
settings.METLO_CONFIG.get("API_KEY") is not None
), "METLO_CONFIG is missing API_KEY attribute"
assert urlparse(settings.METLO_CONFIG.get("METLO_HOST")).scheme in [
"http",
"https",
], f"Metlo for Django has invalid host scheme. Host must be in format http[s]://example.com"

self.host = settings.METLO_CONFIG["METLO_HOST"]
self.host += endpoint if self.host[-1] == "/" else f"/{endpoint}"
self.key = settings.METLO_CONFIG["API_KEY"]
self.saved_request = Request(
url=self.host,
headers={
"Content-Type": "application/json; charset=utf-8",
"Authorization": self.key,
},
method="POST"
method="POST",
)

def __call__(self, request):
response = self.get_response(request)
params = request.GET if request.method == "GET" else request.POST
dest_ip = request.META.get("SERVER_NAME") if \
"1.0.0.127.in-addr.arpa" not in request.META.get("SERVER_NAME") else "localhost"
src_ip = request.META.get("REMOTE_ADDR") if \
"1.0.0.127.in-addr.arpa" not in request.META.get("REMOTE_ADDR") else "localhost"
dest_ip = (
request.META.get("SERVER_NAME")
if "1.0.0.127.in-addr.arpa" not in request.META.get("SERVER_NAME")
else "localhost"
)
src_ip = (
request.META.get("REMOTE_ADDR")
if "1.0.0.127.in-addr.arpa" not in request.META.get("REMOTE_ADDR")
else "localhost"
)
source_port = request.environ["wsgi.input"].stream.raw._sock.getpeername()[1]
res_body = response.content.decode("utf-8")
data = {
"request": {
"url": {
"host": request._current_scheme_host if request._current_scheme_host else src_ip,
"host": request._current_scheme_host
if request._current_scheme_host
else src_ip,
"path": request.path,
"parameters": list(map(lambda x: {"name": x[0], "value": x[1]}, params.items())),
"parameters": list(
map(lambda x: {"name": x[0], "value": x[1]}, params.items())
),
},
"headers": list(map(lambda x: {"name": x[0], "value": x[1]}, request.headers.items())),
"headers": list(
map(
lambda x: {"name": x[0], "value": x[1]}, request.headers.items()
)
),
"body": request.body.decode("utf-8"),
"method": request.method,
},
"response": {
"url": f"{dest_ip}:{request.META.get('SERVER_PORT')}",
"status": response.status_code,
"headers": list(map(lambda x: {"name": x[0], "value": x[1]}, response.headers.items())),
"headers": list(
map(
lambda x: {"name": x[0], "value": x[1]},
response.headers.items(),
)
),
"body": res_body,
},
"meta": {
Expand All @@ -67,8 +96,9 @@ def __call__(self, request):
"source": src_ip,
"sourcePort": source_port,
"destination": dest_ip,
"destinationPort": request.META.get('SERVER_PORT'),
}
"destinationPort": request.META.get("SERVER_PORT"),
"metloSource": "python/django",
},
}
self.pool.submit(self.perform_request, data=data)
return response
Expand Down
Loading

0 comments on commit 786710f

Please sign in to comment.