diff --git a/public/dashboard.html b/public/dashboard.html index f6be7b0..0e6f74b 100644 --- a/public/dashboard.html +++ b/public/dashboard.html @@ -173,7 +173,7 @@

Files

} async function fetchAndPopulatePeers() { - const peers = await window.hydrafiles.rpcClient.http.getPeers(); + const peers = window.hydrafiles.rpcClient.http.getPeers(); const peersEl = document.getElementById('peers'); peersEl.innerHTML = ''; peers.forEach(node => { diff --git a/src/file.ts b/src/file.ts index ce0ddba..5d5ce02 100644 --- a/src/file.ts +++ b/src/file.ts @@ -100,7 +100,7 @@ function createIDBDatabase(): Promise { return dbPromise; } -export class FileDB { +class FileDB { private _client: Hydrafiles; db: DatabaseWrapper = { type: "UNDEFINED", db: undefined }; @@ -154,8 +154,6 @@ export class FileDB { } select(where?: { key: T; value: NonNullable } | undefined, orderBy?: { key: T; direction: "ASC" | "DESC" } | "RANDOM" | undefined): Promise { - if (this.db === undefined) return new Promise((resolve) => resolve([])); - if (this.db.type === "SQLITE") { let query = "SELECT * FROM file"; const params: (string | number | boolean)[] = []; @@ -248,7 +246,6 @@ export class FileDB { } async update(hash: Sha256, updates: Partial): Promise { - if (this.db === undefined) return; updates.updatedAt = new Date().toISOString(); updates.hash = hash; const newFile = fileAttributesDefaults(updates); @@ -298,7 +295,6 @@ export class FileDB { } delete(hash: Sha256): void { - if (this.db === undefined) return; const query = `DELETE FROM file WHERE hash = ?`; if (this.db.type === "SQLITE") { @@ -308,7 +304,6 @@ export class FileDB { } increment(hash: Sha256, column: keyof FileAttributes): void { - if (this.db === undefined) return; if (this.db.type === "SQLITE") this.db.db.prepare(`UPDATE file set ${column} = ${column}+1 WHERE hash = ?`).values(hash.toString()); else if (this.db.type === "INDEXEDDB") { const request = this.objectStore().get(hash.toString()); @@ -326,8 +321,7 @@ export class FileDB { count(): Promise { return new Promise((resolve, reject) => { - if (this.db === undefined) return resolve(0); - else if (this.db.type === "SQLITE") { + if (this.db.type === "SQLITE") { const result = this.db.db.prepare("SELECT COUNT(*) FROM file").value() as number[]; return resolve(result[0]); } @@ -341,7 +335,6 @@ export class FileDB { sum(column: string, where = ""): Promise { return new Promise((resolve, reject) => { - if (this.db === undefined) return resolve(0); if (this.db.type === "SQLITE") { const result = this.db.db.prepare(`SELECT SUM(${column}) FROM file${where.length !== 0 ? ` WHERE ${where}` : ""}`).value() as number[]; return resolve(result === undefined ? 0 : result[0]); @@ -371,7 +364,7 @@ export class FileDB { } } -class File implements FileAttributes { +export class File implements FileAttributes { hash!: Sha256; infohash: string | null = null; downloadCount = Utils.createNonNegativeNumber(0); @@ -400,14 +393,14 @@ class File implements FileAttributes { * @returns {File} A new instance of File. * @default */ - static async init(values: Partial, client: Hydrafiles, vote = false): Promise { + static async init(values: Partial, client: Hydrafiles, vote = false): Promise { if (!values.hash && values.id) { - const files = await client.fileDB.select({ key: "id", value: values.id }); + const files = await client.files.db.select({ key: "id", value: values.id }); values.hash = files[0]?.hash; } if (!values.hash && values.id) { console.log(`Fetching file metadata`); // TODO: Merge with getMetadata - const responses = await client.rpcClient.fetch(`http://localhost/file/${values.id}`); + const responses = client.rpcClient.fetch(`http://localhost/file/${values.id}`); for (let i = 0; i < responses.length; i++) { const response = await responses[i]; if (!response) continue; @@ -422,16 +415,16 @@ class File implements FileAttributes { throw new Error("No hash found for the provided id"); } if (values.infohash !== undefined && values.infohash !== null && Utils.isValidInfoHash(values.infohash)) { - const files = await client.fileDB.select({ key: "infohash", value: values.infohash }); + const files = await client.files.db.select({ key: "infohash", value: values.infohash }); const fileHash = files[0]?.hash; if (fileHash) values.hash = fileHash; } if (!values.hash) throw new Error("File not found"); - let fileAttributes = (await client.fileDB.select({ key: "hash", value: values.hash }))[0]; + let fileAttributes = (await client.files.db.select({ key: "hash", value: values.hash }))[0]; if (fileAttributes === undefined) { - client.fileDB.insert(values); - fileAttributes = (await client.fileDB.select({ key: "hash", value: values.hash }))[0] ?? { hash: values.hash }; + client.files.db.insert(values); + fileAttributes = (await client.files.db.select({ key: "hash", value: values.hash }))[0] ?? { hash: values.hash }; } const file = new File(values.hash, client, vote); Object.assign(file, fileAttributesDefaults(fileAttributes)); @@ -465,7 +458,7 @@ class File implements FileAttributes { const id = this.id; if (id !== undefined && id !== null && id.length > 0) { - const responses = await this._client.rpcClient.fetch(`http://localhost/file/${this.id}`); + const responses = this._client.rpcClient.fetch(`http://localhost/file/${this.id}`); for (let i = 0; i < responses.length; i++) { try { @@ -633,7 +626,7 @@ class File implements FileAttributes { if (this._client.config.s3Endpoint.length > 0) file = await this.fetchFromS3(); if (file !== false) console.log(` ${hash} Serving ${this.size !== undefined ? Math.round(this.size / 1024 / 1024) : 0}MB from S3`); else { - file = await this._client.rpcClient.downloadFile(hash, this.size); + file = await this.download(); if (file === false) { this.found = false; this.save(); @@ -647,7 +640,7 @@ class File implements FileAttributes { } save(): void { - if (this._client.fileDB) this._client.fileDB.update(this.hash, this); + this._client.files.db.update(this.hash, this); } seed(): void { @@ -670,7 +663,7 @@ class File implements FileAttributes { } increment(column: keyof FileAttributes): void { - if (this._client.fileDB) this._client.fileDB.increment(this.hash, column); + this._client.files.db.increment(this.hash, column); this[column]++; } @@ -687,6 +680,155 @@ class File implements FileAttributes { this.save(); } } + + async download(): Promise<{ file: Uint8Array; signal: number } | false> { + let size = this.size; + if (size === 0) { + this.getMetadata(); + size = this.size; + } + if (!this._client.utils.hasSufficientMemory(size)) { + console.log("Reached memory limit, waiting"); + await new Promise(() => { + const intervalId = setInterval(async () => { + if (await this._client.utils.hasSufficientMemory(size)) clearInterval(intervalId); + }, this._client.config.memoryThresholdReachedWait); + }); + } + + const peers = this._client.rpcClient.http.getPeers(true); + for (const peer of peers) { + let fileContent: { file: Uint8Array; signal: number } | false = false; + try { + fileContent = await peer.downloadFile(this); + } catch (e) { + console.error(e); + } + if (fileContent) return fileContent; + } + + console.log(` ${this.hash} Downloading from WebRTC`); + const responses = this._client.rpcClient.rtc.fetch(`http://localhost/download/${this.hash}`); + for (let i = 0; i < responses.length; i++) { + const response = await responses[i]; + const peerContent = new Uint8Array(await response.arrayBuffer()); + console.log(` ${this.hash} Validating hash`); + const verifiedHash = await Utils.hashUint8Array(peerContent); + console.log(` ${this.hash} Done Validating hash`); + if (this.hash !== verifiedHash) return false; + console.log(` ${this.hash} Valid hash`); + + if (this.name === null || this.name.length === 0) { + this.name = String(response.headers.get("Content-Disposition")?.split("=")[1].replace(/"/g, "").replace(" [HYDRAFILES]", "")); + this.save(); + } + } + + return false; + } +} + +class Files { + private _client: Hydrafiles; + public db: FileDB; + public files = new Map(); // TODO: add inserts + + private constructor(client: Hydrafiles, db: FileDB) { + this._client = client; + this.db = db; + + setTimeout(async () => { + const files = await this.db.select(); + for (const file of files) { + this.add(file); + } + }, 1000); // Runs 1 sec late to ensure Files gets saves to this._client + } + + static async init(client: Hydrafiles): Promise { + return new Files(client, await FileDB.init(client)); + } + + public async add(values: Partial): Promise { + if (!values.hash) throw new Error("Hash not defined"); + const file = await File.init(values, this._client, false); + this.files.set(values.hash, file); + if (values.infohash) this.files.set(values.infohash, file); + if (values.name) this.files.set(values.name, file); + return file; + } + + backfillFiles = async (): Promise => { + while (true) { + try { + const keys = Array.from(this.files.keys()); + if (keys.length === 0) return; + const randomKey = keys[Math.floor(Math.random() * keys.length)]; + const file = this.files.get(randomKey); + if (!file) return; + if (file) { + console.log(` ${file.hash} Backfilling file`); + await file.getFile({ logDownloads: false }); + } + } catch (e) { + if (this._client.config.logLevel === "verbose") throw e; + } + } + }; + + // TODO: Compare list between all peers and give score based on how similar they are. 100% = all exactly the same, 0% = no items in list were shared. The lower the score, the lower the propagation times, the lower the decentralisation + async updateFileList(onProgress?: (progress: number, total: number) => void): Promise { + console.log(`Comparing file list`); + let files: FileAttributes[] = []; + const responses = await Promise.all(this._client.rpcClient.fetch("http://localhost/files")); + for (let i = 0; i < responses.length; i++) { + if (responses[i] !== false) { + try { + files = files.concat((await (responses[i] as Response).json()) as FileAttributes[]); + } catch (e) { + if (this._client.config.logLevel === "verbose") console.log(e); + } + } + } + + const uniqueFiles = new Set(); + files = files.filter((file) => { + const fileString = JSON.stringify(file); + if (!uniqueFiles.has(fileString)) { + uniqueFiles.add(fileString); + return true; + } + return false; + }); + + for (let i = 0; i < files.length; i++) { + if (onProgress) onProgress(i, files.length); + const newFile = files[i]; + try { + if (typeof files[i].hash === "undefined") continue; + const fileObj: Partial = { hash: files[i].hash }; + if (files[i].infohash) fileObj.infohash = files[i].infohash; + const currentFile = await this.add(fileObj); + if (!currentFile) continue; + + const keys = Object.keys(newFile) as unknown as (keyof File)[]; + for (let i = 0; i < keys.length; i++) { + const key = keys[i] as keyof FileAttributes; + if (["downloadCount", "voteHash", "voteNonce", "voteDifficulty"].includes(key)) continue; + // @ts-expect-error: + if (newFile[key] !== undefined && newFile[key] !== null && newFile[key] !== 0 && (currentFile[key] === null || currentFile[key] === 0)) currentFile[key] = newFile[key]; + if (newFile.voteNonce !== 0 && newFile.voteDifficulty > currentFile.voteDifficulty && newFile["voteNonce"] > 0) { + console.log(` ${newFile.hash} Checking vote nonce ${newFile["voteNonce"]}`); + currentFile.checkVoteNonce(newFile["voteNonce"]); + } + } + currentFile.save(); + } catch (e) { + console.error(e); + } + } + if (onProgress) onProgress(files.length, files.length); + } } -export default File; +export default Files; diff --git a/src/hydrafiles.ts b/src/hydrafiles.ts index 459abfa..f7bb5fd 100644 --- a/src/hydrafiles.ts +++ b/src/hydrafiles.ts @@ -1,7 +1,7 @@ import { encode as base32Encode } from "https://deno.land/std@0.194.0/encoding/base32.ts"; // import WebTorrent from "npm:webtorrent"; import getConfig, { type Config } from "./config.ts"; -import File, { type FileAttributes, FileDB } from "./file.ts"; +import Files, { FileAttributes } from "./file.ts"; import Utils from "./utils.ts"; // import Blockchain, { Block } from "./block.ts"; import { S3Client } from "https://deno.land/x/s3_lite_client@0.7.0/mod.ts"; @@ -29,7 +29,7 @@ class Hydrafiles { keyPair!: CryptoKeyPair; rpcServer!: RPCServer; rpcClient!: RPCClient; - fileDB!: FileDB; + files!: Files; // webtorrent: WebTorrent = new WebTorrent(); constructor(customConfig: Partial = {}) { @@ -47,7 +47,7 @@ class Hydrafiles { console.log("Startup: Populating KeyPair"); this.keyPair = await this.utils.getKeyPair(); console.log("Startup: Populating FileDB"); - this.fileDB = await FileDB.init(this); + this.files = await Files.init(this); console.log("Startup: Populating RPC Client & Server"); this.rpcClient = await RPCClient.init(this); this.rpcServer = new RPCServer(this); @@ -61,82 +61,10 @@ class Hydrafiles { setInterval(() => this.rpcClient.http.updatePeers(), this.config.comparePeersSpeed); } if (this.config.compareFilesSpeed !== -1) { - this.updateFileList(onUpdateFileListProgress); - setInterval(() => this.updateFileList(onUpdateFileListProgress), this.config.compareFilesSpeed); + this.files.updateFileList(onUpdateFileListProgress); + setInterval(() => this.files.updateFileList(onUpdateFileListProgress), this.config.compareFilesSpeed); } - if (this.config.backfill) this.backfillFiles(); - } - - backfillFiles = async (): Promise => { - while (true) { - try { - const fileAttributes = (await this.fileDB.select(undefined, "RANDOM"))[0]; - if (!fileAttributes) return; - const file = await this.initFile(fileAttributes, false); - if (file) { - console.log(` ${file.hash} Backfilling file`); - await file.getFile({ logDownloads: false }); - } - } catch (e) { - if (this.config.logLevel === "verbose") throw e; - } - } - }; - - // TODO: Compare list between all peers and give score based on how similar they are. 100% = all exactly the same, 0% = no items in list were shared. The lower the score, the lower the propagation times, the lower the decentralisation - async updateFileList(onProgress?: (progress: number, total: number) => void): Promise { - console.log(`Comparing file list`); - let files: FileAttributes[] = []; - const responses = await Promise.all(await this.rpcClient.fetch("http://localhost/files")); - for (let i = 0; i < responses.length; i++) { - if (responses[i] !== false) { - try { - files = files.concat((await (responses[i] as Response).json()) as FileAttributes[]); - } catch (e) { - if (this.config.logLevel === "verbose") console.log(e); - } - } - } - - const uniqueFiles = new Set(); - files = files.filter((file) => { - const fileString = JSON.stringify(file); - if (!uniqueFiles.has(fileString)) { - uniqueFiles.add(fileString); - return true; - } - return false; - }); - - for (let i = 0; i < files.length; i++) { - if (onProgress) onProgress(i, files.length); - const newFile = files[i]; - try { - if (typeof files[i].hash === "undefined") continue; - const fileObj: Partial = { hash: files[i].hash }; - if (files[i].infohash) fileObj.infohash = files[i].infohash; - const currentFile = await File.init(fileObj, this); - if (!currentFile) continue; - - const keys = Object.keys(newFile) as unknown as (keyof File)[]; - for (let i = 0; i < keys.length; i++) { - const key = keys[i] as keyof FileAttributes; - if (["downloadCount", "voteHash", "voteNonce", "voteDifficulty"].includes(key)) continue; - if (newFile[key] !== undefined && newFile[key] !== null && newFile[key] !== 0 && (currentFile[key] === undefined || currentFile[key] === null || currentFile[key] === 0)) { - // @ts-expect-error: - currentFile[key] = newFile[key]; - } - if (newFile.voteNonce !== 0 && newFile.voteDifficulty > currentFile.voteDifficulty) { - console.log(` ${newFile.hash} Checking vote nonce`); - currentFile.checkVoteNonce(newFile["voteNonce"]); - } - } - currentFile.save(); - } catch (e) { - console.error(e); - } - } - if (onProgress) onProgress(files.length, files.length); + if (this.config.backfill) this.files.backfillFiles(); } async getHostname(): Promise { @@ -156,30 +84,26 @@ class Hydrafiles { "\n| Hostname: ", `${await this.getHostname()}`, "\n| Known (Network) Files:", - await this.fileDB.count(), - `(${Math.round((100 * (await this.fileDB.sum("size"))) / 1024 / 1024 / 1024) / 100}GB)`, + this.files.db.count(), + `(${Math.round((100 * (await this.files.db.sum("size"))) / 1024 / 1024 / 1024) / 100}GB)`, "\n| Stored Files:", (await this.fs.readDir("files/")).length, `(${Math.round((100 * await this.utils.calculateUsedStorage()) / 1024 / 1024 / 1024) / 100}GB)`, "\n| Processing Files:", this.rpcServer.processingRequests.size, "\n| Known HTTP Peers:", - (await this.rpcClient.http.getPeers()).length, + (this.rpcClient.http.getPeers()).length, // '\n| Seeding Torrent Files:', // (await webtorrentClient()).torrents.length, "\n| Downloads Served:", - (await this.fileDB.sum("downloadCount")) + ` (${Math.round((((await this.fileDB.sum("downloadCount * size")) / 1024 / 1024 / 1024) * 100) / 100)}GB)`, + (await this.files.db.sum("downloadCount")) + ` (${Math.round((((await this.files.db.sum("downloadCount * size")) / 1024 / 1024 / 1024) * 100) / 100)}GB)`, "\n===============================================\n", ); } - public search = async (where?: { key: T; value: NonNullable }, orderBy?: "RANDOM" | { key: T; direction: "ASC" | "DESC" }): Promise => { - return await this.fileDB.select(where, orderBy); + public search = async (where?: { key: T; value: NonNullable }, orderBy?: "RANDOM" | { key: T; direction: "ASC" | "DESC" }): Promise => { + return await this.files.db.select(where, orderBy); }; - - public async initFile(values: Partial, vote = false): Promise { - return await File.init(values, this, vote); - } } export default Hydrafiles; diff --git a/src/rpc/client.ts b/src/rpc/client.ts index fe6d794..e1dbf0b 100644 --- a/src/rpc/client.ts +++ b/src/rpc/client.ts @@ -1,69 +1,23 @@ import type Hydrafiles from "../hydrafiles.ts"; -import HTTPClient, { HTTPPeer } from "./peers/http.ts"; -import RTCClient from "./peers/rtc.ts"; -import File from "../file.ts"; -import Utils, { type Sha256 } from "../utils.ts"; +import HTTPPeers from "./peers/http.ts"; +import RTCPeers from "./peers/rtc.ts"; export default class RPCClient { _client: Hydrafiles; - http!: HTTPClient; - rtc!: RTCClient; + http!: HTTPPeers; + rtc!: RTCPeers; private constructor(client: Hydrafiles) { this._client = client; } static async init(client: Hydrafiles): Promise { const rpcClient = new RPCClient(client); - rpcClient.http = await HTTPClient.init(rpcClient); - rpcClient.rtc = await RTCClient.init(rpcClient); + rpcClient.http = await HTTPPeers.init(rpcClient); + rpcClient.rtc = new RTCPeers(rpcClient); return rpcClient; } - public async fetch(input: RequestInfo, init?: RequestInit): Promise[]> { - return [...await this.http.fetch(input, init), ...this.rtc.fetch(input, init)]; - } - - async downloadFile(hash: Sha256, size = 0): Promise<{ file: Uint8Array; signal: number } | false> { - if (!this._client.utils.hasSufficientMemory(size)) { - console.log("Reached memory limit, waiting"); - await new Promise(() => { - const intervalId = setInterval(async () => { - if (await this._client.utils.hasSufficientMemory(size)) clearInterval(intervalId); - }, this._client.config.memoryThresholdReachedWait); - }); - } - - const file = await File.init({ hash }, this._client); - if (!file) return false; - const peers = await this.http.getPeers(true); - for (const peer of peers) { - let fileContent: { file: Uint8Array; signal: number } | false = false; - try { - fileContent = await (await HTTPPeer.init(peer, this.http.db, this._client)).downloadFile(file); - } catch (e) { - console.error(e); - } - if (fileContent) return fileContent; - } - - console.log(` ${hash} Downloading from WebRTC`); - const responses = this.rtc.fetch(`http://localhost/download/${hash}`); - for (let i = 0; i < responses.length; i++) { - const hash = file.hash; - const response = await responses[i]; - const peerContent = new Uint8Array(await response.arrayBuffer()); - console.log(` ${hash} Validating hash`); - const verifiedHash = await Utils.hashUint8Array(peerContent); - console.log(` ${hash} Done Validating hash`); - if (hash !== verifiedHash) return false; - console.log(` ${hash} Valid hash`); - - if (file.name === undefined || file.name === null || file.name.length === 0) { - file.name = String(response.headers.get("Content-Disposition")?.split("=")[1].replace(/"/g, "").replace(" [HYDRAFILES]", "")); - file.save(); - } - } - - return false; + public fetch(input: RequestInfo, init?: RequestInit): Promise[] { + return [...this.http.fetch(input, init), ...this.rtc.fetch(input, init)]; } } diff --git a/src/rpc/peers/http.ts b/src/rpc/peers/http.ts index afecdf0..c52ecb7 100644 --- a/src/rpc/peers/http.ts +++ b/src/rpc/peers/http.ts @@ -2,7 +2,7 @@ import type Hydrafiles from "../../hydrafiles.ts"; import Utils, { type NonNegativeNumber } from "../../utils.ts"; import type { Database } from "jsr:@db/sqlite@0.11"; import type { indexedDB } from "https://deno.land/x/indexeddb@v1.1.0/ponyfill.ts"; -import File from "../../file.ts"; +import { File } from "../../file.ts"; import type RPCClient from "../client.ts"; type DatabaseWrapper = { type: "UNDEFINED"; db: undefined } | { type: "SQLITE"; db: Database } | { type: "INDEXEDDB"; db: IDBDatabase }; @@ -98,8 +98,6 @@ class PeerDB { } select(where?: { key: T; value: NonNullable } | undefined, orderBy?: { key: T; direction: "ASC" | "DESC" } | "RANDOM" | undefined): Promise { - if (this.db === undefined) return new Promise((resolve) => resolve([])); - if (this.db.type === "SQLITE") { let query = "SELECT * FROM peer"; const params: (string | number | boolean)[] = []; @@ -194,8 +192,6 @@ class PeerDB { } async update(host: string, newPeer: PeerAttributes | HTTPPeer): Promise { - if (this.db === undefined) return; - // Get the current peer attributes before updating const currentPeer = (await this.select({ key: "host", value: host }))[0] ?? { host }; if (!currentPeer) { @@ -243,7 +239,6 @@ class PeerDB { } delete(host: string): void { - if (this.db === undefined) return; const query = `DELETE FROM peer WHERE host = ?`; if (this.db.type === "SQLITE") { @@ -253,7 +248,6 @@ class PeerDB { } increment(host: string, column: keyof PeerAttributes): void { - if (this.db === undefined) return; if (this.db.type === "SQLITE") this.db.db.prepare(`UPDATE peer set ${column} = ${column}+1 WHERE host = ?`).values(host); else if (this.db.type === "INDEXEDDB") { const request = this.objectStore().get(host); @@ -271,8 +265,7 @@ class PeerDB { count(): Promise { return new Promise((resolve, reject) => { - if (this.db === undefined) return resolve(0); - else if (this.db.type === "SQLITE") { + if (this.db.type === "SQLITE") { const result = this.db.db.prepare("SELECT COUNT(*) FROM peer").value() as number[]; return resolve(result[0]); } @@ -286,7 +279,6 @@ class PeerDB { sum(column: string, where = ""): Promise { return new Promise((resolve, reject) => { - if (this.db === undefined) return resolve(0); if (this.db.type === "SQLITE") { const result = this.db.db.prepare(`SELECT SUM(${column}) FROM peer${where.length !== 0 ? ` WHERE ${where}` : ""}`).value() as number[]; return resolve(result === undefined ? 0 : result[0]); @@ -316,7 +308,7 @@ class PeerDB { } } -export class HTTPPeer implements PeerAttributes { +class HTTPPeer implements PeerAttributes { host: string; hits: NonNegativeNumber = 0 as NonNegativeNumber; rejects: NonNegativeNumber = 0 as NonNegativeNumber; @@ -414,12 +406,19 @@ export class HTTPPeer implements PeerAttributes { return false; } } + + async validate(): Promise { + const file = await File.init({ hash: Utils.sha256("04aa07009174edc6f03224f003a435bcdc9033d2c52348f3a35fbb342ea82f6f") }, this._client); + if (!file) throw new Error("Failed to build file"); + return await this.downloadFile(file) !== false; + } } // TODO: Log common user-agents and re-use them to help anonimise non Hydrafiles peers -export default class HTTPClient { +export default class HTTPPeers { private _rpcClient: RPCClient; public db: PeerDB; + public peers = new Map(); private constructor(rpcClient: RPCClient, db: PeerDB) { this._rpcClient = rpcClient; @@ -427,40 +426,43 @@ export default class HTTPClient { } /** - * Initializes an instance of HTTPClient. - * @returns {HTTPClient} A new instance of HTTPClient. + * Initializes an instance of HTTPPeers. + * @returns {HTTPPeers} A new instance of HTTPPeers. * @default */ - public static async init(rpcClient: RPCClient): Promise { + public static async init(rpcClient: RPCClient): Promise { const db = await PeerDB.init(rpcClient); - const peers = new HTTPClient(rpcClient, db); + const httpPeers = new HTTPPeers(rpcClient, db); + + (await Promise.all((await db.select()).map((peer) => HTTPPeer.init(peer, db, rpcClient._client)))).forEach((peer) => httpPeers.peers.set(peer.host, peer)); for (let i = 0; i < rpcClient._client.config.bootstrapPeers.length; i++) { - await peers.add(rpcClient._client.config.bootstrapPeers[i]); + await httpPeers.add(rpcClient._client.config.bootstrapPeers[i]); } - return peers; + return httpPeers; } async add(host: string): Promise { - if (host !== this._rpcClient._client.config.publicHostname) await HTTPPeer.init({ host }, this.db, this._rpcClient._client); + const peer = await HTTPPeer.init({ host }, this.db, this._rpcClient._client); + if (host !== this._rpcClient._client.config.publicHostname) this.peers.set(peer.host, peer); } - public getPeers = async (applicablePeers = false): Promise => { - const peers = (await this.db.select()).filter((peer) => !applicablePeers || typeof window === "undefined" || !peer.host.startsWith("http://")); + public getPeers = (applicablePeers = false): HTTPPeer[] => { + const peers = Array.from(this.peers).filter((peer) => !applicablePeers || typeof window === "undefined" || !peer[0].startsWith("http://")); if (this._rpcClient._client.config.preferNode === "FASTEST") { - return peers.sort((a, b) => a.bytes / a.duration - b.bytes / b.duration); + return peers.map(([_, peer]) => peer).sort((a, b) => a.bytes / a.duration - b.bytes / b.duration); } else if (this._rpcClient._client.config.preferNode === "LEAST_USED") { - return peers.sort((a, b) => a.hits - a.rejects - (b.hits - b.rejects)); + return peers.map(([_, peer]) => peer).sort((a, b) => a.hits - a.rejects - (b.hits - b.rejects)); } else if (this._rpcClient._client.config.preferNode === "HIGHEST_HITRATE") { - return peers.sort((a, b) => a.hits - a.rejects - (b.hits - b.rejects)); + return peers.sort((a, b) => a[1].hits - a[1].rejects - (b[1].hits - b[1].rejects)).map(([_, peer]) => peer); } else { - return peers; + return peers.map(([_, peer]) => peer); } }; async getValidPeers(): Promise { - const peers = await this.getPeers(); + const peers = this.getPeers(); const results: PeerAttributes[] = []; const executing: Array> = []; @@ -470,7 +472,7 @@ export default class HTTPClient { results.push(peer); continue; } - const promise = this.validatePeer(await HTTPPeer.init(peer, this.db, this._rpcClient._client)).then((result) => { + const promise = peer.validate().then((result) => { if (result) results.push(peer); executing.splice(executing.indexOf(promise), 1); }); @@ -480,15 +482,9 @@ export default class HTTPClient { return results; } - async validatePeer(peer: HTTPPeer): Promise { - const file = await File.init({ hash: Utils.sha256("04aa07009174edc6f03224f003a435bcdc9033d2c52348f3a35fbb342ea82f6f") }, this._rpcClient._client); - if (!file) throw new Error("Failed to build file"); - return await peer.downloadFile(file) !== false; - } - - public async fetch(input: RequestInfo, init?: RequestInit): Promise[]> { + public fetch(input: RequestInfo, init?: RequestInit): Promise[] { const req = typeof input === "string" ? new Request(input, init) : input; - const peers = await this.getPeers(true); + const peers = this.getPeers(true); const fetchPromises = peers.map(async (peer) => { try { const url = new URL(req.url); @@ -508,7 +504,7 @@ export default class HTTPClient { // TODO: Compare list between all peers and give score based on how similar they are. 100% = all exactly the same, 0% = no items in list were shared. The lower the score, the lower the propagation times, the lower the decentralisation async updatePeers(): Promise { console.log(`Fetching peers`); - const responses = await Promise.all(await this._rpcClient._client.rpcClient.fetch("http://localhost/peers")); + const responses = await Promise.all(this._rpcClient._client.rpcClient.fetch("http://localhost/peers")); for (let i = 0; i < responses.length; i++) { try { if (!(responses[i] instanceof Response)) continue; @@ -526,4 +522,10 @@ export default class HTTPClient { } } } + + public getSelf(): HTTPPeer { + const peer = this.peers.get(this._rpcClient._client.config.publicHostname); + if (!peer) throw new Error("Could not find self"); + return peer; + } } diff --git a/src/rpc/peers/rtc.ts b/src/rpc/peers/rtc.ts index f7363d5..df96f24 100644 --- a/src/rpc/peers/rtc.ts +++ b/src/rpc/peers/rtc.ts @@ -40,58 +40,47 @@ function arrayBufferToUnicodeString(buffer: ArrayBuffer): string { const peerId = Math.random(); -class RTCClient { +class RTCPeers { private _rpcClient: RPCClient; - peerId: number; + peerId = peerId; websockets: WebSocket[]; peerConnections: PeerConnections = {}; messageQueue: SignallingMessage[] = []; - seenMessages: Set; + seenMessages: Set = new Set(); - private constructor(rpcClient: RPCClient) { + constructor(rpcClient: RPCClient) { this._rpcClient = rpcClient; - this.peerId = peerId; this.websockets = [new WebSocket("wss://rooms.deno.dev/")]; - this.seenMessages = new Set(); - } - /** - * Initializes an instance of RTCClient. - * @returns {RTCClient} A new instance of RTCClient. - * @default - */ - static async init(rpcClient: RPCClient): Promise { - const webRTC = new RTCClient(rpcClient); - const peers = await rpcClient.http.getPeers(true); + const peers = rpcClient.http.getPeers(true); for (let i = 0; i < peers.length; i++) { try { - webRTC.websockets.push(new WebSocket(peers[i].host.replace("https://", "wss://").replace("http://", "ws://"))); + this.websockets.push(new WebSocket(peers[i].host.replace("https://", "wss://").replace("http://", "ws://"))); } catch (e) { if (rpcClient._client.config.logLevel === "verbose") console.error(e); continue; } } - for (let i = 0; i < webRTC.websockets.length; i++) { - webRTC.websockets[i].onopen = () => { - console.log(`WebRTC: (1/12): Announcing to ${webRTC.websockets[i].url}`); - const message: SignallingMessage = { announce: true, from: webRTC.peerId }; - webRTC.wsMessage(message); - setInterval(() => webRTC.wsMessage(message), rpcClient._client.config.announceSpeed); + for (let i = 0; i < this.websockets.length; i++) { + this.websockets[i].onopen = () => { + console.log(`WebRTC: (1/12): Announcing to ${this.websockets[i].url}`); + const message: SignallingMessage = { announce: true, from: this.peerId }; + this.wsMessage(message); + setInterval(() => this.wsMessage(message), rpcClient._client.config.announceSpeed); }; - webRTC.websockets[i].onmessage = async (event) => { + this.websockets[i].onmessage = async (event) => { const message = JSON.parse(event.data) as SignallingMessage; - if (message === null || message.from === peerId || webRTC.seenMessages.has(event.data) || ("to" in message && message.to !== webRTC.peerId)) return; - webRTC.seenMessages.add(event.data); - if ("announce" in message) await webRTC.handleAnnounce(message.from); - else if ("offer" in message) await webRTC.handleOffer(message.from, message.offer); - else if ("answer" in message) await webRTC.handleAnswer(message.from, message.answer); - else if ("iceCandidate" in message) webRTC.handleIceCandidate(message.from, message.iceCandidate); + if (message === null || message.from === peerId || this.seenMessages.has(event.data) || ("to" in message && message.to !== this.peerId)) return; + this.seenMessages.add(event.data); + if ("announce" in message) await this.handleAnnounce(message.from); + else if ("offer" in message) await this.handleOffer(message.from, message.offer); + else if ("answer" in message) await this.handleAnswer(message.from, message.answer); + else if ("iceCandidate" in message) this.handleIceCandidate(message.from, message.iceCandidate); else console.warn("WebRTC: (13/12): Unknown message type received", message); }; } - return webRTC; } async createPeerConnection(from: number): Promise { @@ -219,7 +208,6 @@ class RTCClient { console.warn(`WebRTC: (13/12): ${from} Peer connection in unexpected state 2: ${this.peerConnections[from].answered.conn.signalingState}`); return; } - console.log("Current signaling state:", this.peerConnections[from].answered.conn.signalingState); try { const answer = await this.peerConnections[from].answered.conn.createAnswer(); if (this.peerConnections[from].answered.conn.signalingState !== "have-remote-offer") return; @@ -305,4 +293,4 @@ class RTCClient { } } -export default RTCClient; +export default RTCPeers; diff --git a/src/rpc/server.ts b/src/rpc/server.ts index d816594..cc3bb78 100644 --- a/src/rpc/server.ts +++ b/src/rpc/server.ts @@ -1,12 +1,11 @@ import { encode as base32Encode } from "https://deno.land/std@0.194.0/encoding/base32.ts"; import type Hydrafiles from "../hydrafiles.ts"; // import { BLOCKSDIR } from "./block.ts"; -import File from "../file.ts"; import Utils, { type Base64 } from "../utils.ts"; import { join } from "https://deno.land/std@0.224.0/path/mod.ts"; -import { HTTPPeer } from "./peers/http.ts"; import { SignallingMessage } from "./peers/rtc.ts"; import { serveFile } from "https://deno.land/std@0.115.0/http/file_server.ts"; +import { File } from "../file.ts"; class RPCServer { private _client: Hydrafiles; @@ -42,17 +41,17 @@ class RPCServer { private onListen = async (hostname: string, port: number): Promise => { console.log(`Server started at ${hostname}:${port}`); console.log("Testing network connection"); - const file = await this._client.rpcClient.downloadFile(Utils.sha256("04aa07009174edc6f03224f003a435bcdc9033d2c52348f3a35fbb342ea82f6f")); - if (file === false) console.error("Download test failed, cannot connect to network"); + const file = this._client.files.files.get("04aa07009174edc6f03224f003a435bcdc9033d2c52348f3a35fbb342ea82f6f"); + if (!file) return; + if (!(await file.download())) console.error("Download test failed, cannot connect to network"); else { console.log("Connected to network"); if (Utils.isIp(this._client.config.publicHostname) && Utils.isPrivateIP(this._client.config.publicHostname)) console.error("Public hostname is a private IP address, cannot announce to other nodes"); else { console.log(`Testing downloads ${this._client.config.publicHostname}/download/04aa07009174edc6f03224f003a435bcdc9033d2c52348f3a35fbb342ea82f6f`); - const file = await File.init({ hash: Utils.sha256("04aa07009174edc6f03224f003a435bcdc9033d2c52348f3a35fbb342ea82f6f") }, this._client); if (!file) console.error("Failed to build file"); else { - const response = await (await HTTPPeer.init({ host: this._client.config.publicHostname }, this._client.rpcClient.http.db, this._client)).downloadFile(file); + const response = await this._client.rpcClient.http.getSelf().downloadFile(file); // TODO: HTTPPeers.getSelf() if (response === false) console.error("Test: Failed to download file from self"); else { console.log("Announcing HTTP server to nodes"); @@ -102,7 +101,7 @@ class RPCServer { } else if (url.pathname === "/peers") { headers.set("Content-Type", "application/json"); headers.set("Cache-Control", "public, max-age=300"); - return new Response(JSON.stringify(await this._client.rpcClient.http.getPeers()), { headers }); + return new Response(JSON.stringify(this._client.rpcClient.http.getPeers()), { headers }); } else if (url.pathname === "/info") { headers.set("Content-Type", "application/json"); headers.set("Cache-Control", "public, max-age=300"); @@ -110,7 +109,7 @@ class RPCServer { } else if (url.pathname.startsWith("/announce")) { const host = url.searchParams.get("host"); if (host === null) return new Response("No hosted given\n", { status: 401 }); - const knownNodes = await this._client.rpcClient.http.getPeers(); + const knownNodes = this._client.rpcClient.http.getPeers(); if (knownNodes.find((node) => node.host === host) !== undefined) return new Response("Already known\n"); await this._client.rpcClient.http.add(host); return new Response("Announced\n"); @@ -187,8 +186,8 @@ class RPCServer { await this.processingRequests.get(infohash); } const processingPromise = (async () => { - const file = await File.init({ infohash }, this._client, true); - if (!file) throw new Error("Failed to build file"); + const file = this._client.files.files.get(infohash); + if (!file) throw new Error("Failed to find file"); await file.getMetadata(); let fileContent: { file: Uint8Array; signal: number } | false; @@ -262,7 +261,7 @@ class RPCServer { await this._client.fs.writeFile("config.json", new TextEncoder().encode(JSON.stringify(this._client.config, null, 2))); return new Response("200 OK\n"); } else if (url.pathname === "/files") { - const rows = (await this._client.fileDB.select()).map((row) => { + const rows = Array.from(this._client.files.files.values()).map((row) => { const { downloadCount, found, ...rest } = row; const _ = { downloadCount, found }; return rest; @@ -272,9 +271,9 @@ class RPCServer { return new Response(JSON.stringify(rows), { headers }); } else if (url.pathname.startsWith("/file/")) { const id = url.pathname.split("/")[2]; - let file: File | false; + let file: File | undefined; try { - file = await File.init({ id }, this._client, true); + file = this._client.files.files.get(id); } catch (e) { const err = e as Error; if (err.message === "File not found") return new Response("File not found", { headers, status: 404 }); @@ -305,7 +304,7 @@ class RPCServer { if (hostname in this.cachedHostnames) return new Response(this.cachedHostnames[hostname].body, { headers: this.cachedHostnames[hostname].headers }); console.log(` ${hostname} Fetching endpoint response from peers`); - const responses = await this._client.rpcClient.fetch(`http://localhost/endpoint/${hostname}`); + const responses = this._client.rpcClient.fetch(`http://localhost/endpoint/${hostname}`); const processingPromise = new Promise((resolve, reject) => { (async () => {