Skip to content

Commit

Permalink
Merge pull request #147 from StarfilesFileSharing/alpha
Browse files Browse the repository at this point in the history
Alpha
  • Loading branch information
QuixThe2nd authored Nov 5, 2024
2 parents 8fec946 + db972ac commit 5e942f5
Show file tree
Hide file tree
Showing 7 changed files with 142 additions and 131 deletions.
2 changes: 1 addition & 1 deletion deno.jsonc
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@starfiles/hydrafiles",
"version": "0.7.34",
"version": "0.7.35",
"description": "The (P2P) web privacy layer.",
"main": "src/hydrafiles.ts",
"exports": {
Expand Down
10 changes: 4 additions & 6 deletions src/hydrafiles.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,9 @@ class Hydrafiles {
console.log("Startup: Populating FileDB");
this.fileDB = await FileDB.init(this);
console.log("Startup: Populating RPC Client & Server");
this.rpcClient = new RPCClient(this);
this.rpcClient.start().then(() => {
this.rpcServer = new RPCServer(this);
this.startBackgroundTasks(onUpdateFileListProgress);
});
this.rpcClient = await RPCClient.init(this);
this.rpcServer = new RPCServer(this);
this.startBackgroundTasks(onUpdateFileListProgress);
}

startBackgroundTasks(onUpdateFileListProgress?: (progress: number, total: number) => void): void {
Expand Down Expand Up @@ -164,7 +162,7 @@ class Hydrafiles {
(await this.fs.readDir("files/")).length,
`(${Math.round((100 * await this.utils.calculateUsedStorage()) / 1024 / 1024 / 1024) / 100}GB)`,
"\n| Processing Files:",
this.rpcServer.hashLocks.size,
this.rpcServer.processingRequests.size,
"\n| Known HTTP Peers:",
(await this.rpcClient.http.getPeers()).length,
// '\n| Seeding Torrent Files:',
Expand Down
14 changes: 8 additions & 6 deletions src/rpc/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,18 @@ import File from "../file.ts";
import Utils, { type Sha256 } from "../utils.ts";

export default class RPCClient {
private _client: Hydrafiles;
_client: Hydrafiles;
http!: HTTPClient;
rtc!: RTCClient;

constructor(client: Hydrafiles) {
private constructor(client: Hydrafiles) {
this._client = client;
}
async start(): Promise<void> {
this.http = await HTTPClient.init(this._client);
this.rtc = await RTCClient.init(this._client);
static async init(client: Hydrafiles): Promise<RPCClient> {
const rpcClient = new RPCClient(client);
rpcClient.http = await HTTPClient.init(rpcClient);
rpcClient.rtc = await RTCClient.init(rpcClient);
return rpcClient;
}

public async fetch(input: RequestInfo, init?: RequestInit): Promise<Promise<Response | false>[]> {
Expand All @@ -37,7 +39,7 @@ export default class RPCClient {
for (const peer of peers) {
let fileContent: { file: Uint8Array; signal: number } | false = false;
try {
fileContent = await this.http.downloadFromPeer(await HTTPPeer.init(peer, this.http._db), file);
fileContent = await (await HTTPPeer.init(peer, this.http.db, this._client)).downloadFile(file);
} catch (e) {
console.error(e);
}
Expand Down
175 changes: 90 additions & 85 deletions src/rpc/peers/http.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,18 @@
import type Hydrafiles from "../../hydrafiles.ts";
import Utils from "../../utils.ts";
import Utils, { type NonNegativeNumber } from "../../utils.ts";
import type { Database } from "jsr:@db/[email protected]";
import type { indexedDB } from "https://deno.land/x/[email protected]/ponyfill.ts";
import File from "../../file.ts";
import type RPCClient from "../client.ts";

type DatabaseWrapper = { type: "UNDEFINED"; db: undefined } | { type: "SQLITE"; db: Database } | { type: "INDEXEDDB"; db: IDBDatabase };

interface PeerAttributes {
host: string;
hits: number;
rejects: number;
bytes: number;
duration: number;
hits: NonNegativeNumber;
rejects: NonNegativeNumber;
bytes: NonNegativeNumber;
duration: NonNegativeNumber;
updatedAt: string;
}

Expand Down Expand Up @@ -51,21 +52,24 @@ function createIDBDatabase(): Promise<IDBDatabase> {
return dbPromise;
}

export class PeerDB {
private _client: Hydrafiles;
/**
* @group Database
*/
class PeerDB {
private _rpcClient: RPCClient;
db: DatabaseWrapper = { type: "UNDEFINED", db: undefined };

private constructor(client: Hydrafiles) {
this._client = client;
private constructor(rpcClient: RPCClient) {
this._rpcClient = rpcClient;
}

/**
* Initializes an instance of PeerDB.
* @returns {PeerDB} A new instance of PeerDB.
* @default
*/
static async init(client: Hydrafiles): Promise<PeerDB> {
const peerDB = new PeerDB(client);
static async init(rpcClient: RPCClient): Promise<PeerDB> {
const peerDB = new PeerDB(rpcClient);

if (typeof window === "undefined") {
const database = (await import("jsr:@db/[email protected]")).Database;
Expand Down Expand Up @@ -224,16 +228,16 @@ export class PeerDB {
const query = `UPDATE peer SET ${updatedColumn.map((column) => `${column} = ?`).join(", ")} WHERE host = ?`;
this.db.db.prepare(query).values(params);
console.log(
` ${host} Peer UPDATEd - Updated Columns: ${updatedColumn.join(", ")}` + (this._client.config.logLevel === "verbose" ? ` - Params: ${params.join(", ")} - Query: ${query}` : ""),
this._client.config.logLevel === "verbose" ? console.log(` ${host} Updated Values:`, beforeAndAfter) : "",
` ${host} Peer UPDATEd - Updated Columns: ${updatedColumn.join(", ")}` + (this._rpcClient._client.config.logLevel === "verbose" ? ` - Params: ${params.join(", ")} - Query: ${query}` : ""),
this._rpcClient._client.config.logLevel === "verbose" ? console.log(` ${host} Updated Values:`, beforeAndAfter) : "",
);
} else {
// @ts-expect-error:
const { _db, ...clonedPeer } = newPeer;
if (this.db.type === "INDEXEDDB") this.objectStore().put(clonedPeer).onerror = console.error;
console.log(
` ${host} Peer UPDATEd - Updated Columns: ${updatedColumn.join(", ")}` + (this._client.config.logLevel === "verbose" ? ` - Params: ${params.join(", ")}` : ""),
this._client.config.logLevel === "verbose" ? console.log(` ${host} Updated Values:`, beforeAndAfter) : "",
` ${host} Peer UPDATEd - Updated Columns: ${updatedColumn.join(", ")}` + (this._rpcClient._client.config.logLevel === "verbose" ? ` - Params: ${params.join(", ")}` : ""),
this._rpcClient._client.config.logLevel === "verbose" ? console.log(` ${host} Updated Values:`, beforeAndAfter) : "",
);
}
}
Expand Down Expand Up @@ -314,14 +318,16 @@ export class PeerDB {

export class HTTPPeer implements PeerAttributes {
host: string;
hits = 0;
rejects = 0;
bytes = 0;
duration = 0;
hits: NonNegativeNumber = 0 as NonNegativeNumber;
rejects: NonNegativeNumber = 0 as NonNegativeNumber;
bytes: NonNegativeNumber = 0 as NonNegativeNumber;
duration: NonNegativeNumber = 0 as NonNegativeNumber;
updatedAt: string = new Date().toISOString();
private _db: PeerDB;
private _client: Hydrafiles;

private constructor(values: PeerAttributes, db: PeerDB) {
private constructor(values: PeerAttributes, db: PeerDB, client: Hydrafiles) {
this._client = client;
this._db = db;

if (values.host === undefined || values.host === null) throw new Error("Created peer without host");
Expand All @@ -339,7 +345,7 @@ export class HTTPPeer implements PeerAttributes {
* @returns {HTTPPeer} A new instance of HTTPPeer.
* @default
*/
static async init(values: Partial<PeerAttributes>, db: PeerDB): Promise<HTTPPeer> {
static async init(values: Partial<PeerAttributes>, db: PeerDB, client: Hydrafiles): Promise<HTTPPeer> {
if (values.host === undefined) throw new Error("Hash is required");
const peerAttributes: PeerAttributes = {
host: values.host,
Expand All @@ -356,68 +362,23 @@ export class HTTPPeer implements PeerAttributes {
peer = (await db.select({ key: "host", value: values.host }))[0];
}

return new HTTPPeer(peer, db);
return new HTTPPeer(peer, db, client);
}

save(): void {
this.updatedAt = new Date().toISOString();
if (this._db) this._db.update(this.host, this);
}
}

// TODO: Log common user-agents and re-use them to help anonimise non Hydrafiles peers
export default class HTTPClient {
private _client: Hydrafiles;
/** @internal */
public _db: PeerDB;

private constructor(client: Hydrafiles, db: PeerDB) {
this._client = client;
this._db = db;
}

/**
* Initializes an instance of HTTPClient.
* @returns {HTTPClient} A new instance of HTTPClient.
* @default
*/
public static async init(client: Hydrafiles): Promise<HTTPClient> {
const db = await PeerDB.init(client);
const peers = new HTTPClient(client, db);

for (let i = 0; i < client.config.bootstrapPeers.length; i++) {
await peers.add(client.config.bootstrapPeers[i]);
}
return peers;
}

async add(host: string): Promise<void> {
if (host !== this._client.config.publicHostname) await HTTPPeer.init({ host }, this._db);
}

public getPeers = async (applicablePeers = false): Promise<PeerAttributes[]> => {
const peers = (await this._db.select()).filter((peer) => !applicablePeers || typeof window === "undefined" || !peer.host.startsWith("http://"));

if (this._client.config.preferNode === "FASTEST") {
return peers.sort((a, b) => a.bytes / a.duration - b.bytes / b.duration);
} else if (this._client.config.preferNode === "LEAST_USED") {
return peers.sort((a, b) => a.hits - a.rejects - (b.hits - b.rejects));
} else if (this._client.config.preferNode === "HIGHEST_HITRATE") {
return peers.sort((a, b) => a.hits - a.rejects - (b.hits - b.rejects));
} else {
return peers;
}
};

async downloadFromPeer(peer: HTTPPeer, file: File): Promise<{ file: Uint8Array; signal: number } | false> {
async downloadFile(file: File): Promise<{ file: Uint8Array; signal: number } | false> {
try {
const startTime = Date.now();

const hash = file.hash;
console.log(` ${hash} Downloading from ${peer.host}`);
console.log(` ${hash} Downloading from ${this.host}`);
let response;
try {
response = await Utils.promiseWithTimeout(fetch(`${peer.host}/download/${hash}`), this._client.config.timeout);
response = await Utils.promiseWithTimeout(fetch(`${this.host}/download/${hash}`), this._client.config.timeout);
} catch (e) {
const err = e as Error;
if (this._client.config.logLevel === "verbose" && err.message !== "Promise timed out") console.error(e);
Expand All @@ -435,10 +396,10 @@ export default class HTTPClient {
file.save();
}

peer.duration += Date.now() - startTime;
peer.bytes += peerContent.byteLength;
peer.hits++;
peer.save();
this.duration = Utils.createNonNegativeNumber(this.duration + Date.now() - startTime);
this.bytes = Utils.createNonNegativeNumber(this.bytes + peerContent.byteLength);
this.hits++;
this.save();

await file.cacheFile(peerContent);
return {
Expand All @@ -447,12 +408,56 @@ export default class HTTPClient {
};
} catch (e) {
console.error(e);
peer.rejects++;
this.rejects++;

peer.save();
this.save();
return false;
}
}
}

// TODO: Log common user-agents and re-use them to help anonimise non Hydrafiles peers
export default class HTTPClient {
private _rpcClient: RPCClient;
public db: PeerDB;

private constructor(rpcClient: RPCClient, db: PeerDB) {
this._rpcClient = rpcClient;
this.db = db;
}

/**
* Initializes an instance of HTTPClient.
* @returns {HTTPClient} A new instance of HTTPClient.
* @default
*/
public static async init(rpcClient: RPCClient): Promise<HTTPClient> {
const db = await PeerDB.init(rpcClient);
const peers = new HTTPClient(rpcClient, db);

for (let i = 0; i < rpcClient._client.config.bootstrapPeers.length; i++) {
await peers.add(rpcClient._client.config.bootstrapPeers[i]);
}
return peers;
}

async add(host: string): Promise<void> {
if (host !== this._rpcClient._client.config.publicHostname) await HTTPPeer.init({ host }, this.db, this._rpcClient._client);
}

public getPeers = async (applicablePeers = false): Promise<PeerAttributes[]> => {
const peers = (await this.db.select()).filter((peer) => !applicablePeers || typeof window === "undefined" || !peer.host.startsWith("http://"));

if (this._rpcClient._client.config.preferNode === "FASTEST") {
return peers.sort((a, b) => a.bytes / a.duration - b.bytes / b.duration);
} else if (this._rpcClient._client.config.preferNode === "LEAST_USED") {
return peers.sort((a, b) => a.hits - a.rejects - (b.hits - b.rejects));
} else if (this._rpcClient._client.config.preferNode === "HIGHEST_HITRATE") {
return peers.sort((a, b) => a.hits - a.rejects - (b.hits - b.rejects));
} else {
return peers;
}
};

async getValidPeers(): Promise<PeerAttributes[]> {
const peers = await this.getPeers();
Expand All @@ -461,11 +466,11 @@ export default class HTTPClient {

for (let i = 0; i < peers.length; i++) {
const peer = peers[i];
if (peer.host === this._client.config.publicHostname) {
if (peer.host === this._rpcClient._client.config.publicHostname) {
results.push(peer);
continue;
}
const promise = this.validatePeer(await HTTPPeer.init(peer, this._db)).then((result) => {
const promise = this.validatePeer(await HTTPPeer.init(peer, this.db, this._rpcClient._client)).then((result) => {
if (result) results.push(peer);
executing.splice(executing.indexOf(promise), 1);
});
Expand All @@ -476,9 +481,9 @@ export default class HTTPClient {
}

async validatePeer(peer: HTTPPeer): Promise<boolean> {
const file = await File.init({ hash: Utils.sha256("04aa07009174edc6f03224f003a435bcdc9033d2c52348f3a35fbb342ea82f6f") }, this._client);
const file = await File.init({ hash: Utils.sha256("04aa07009174edc6f03224f003a435bcdc9033d2c52348f3a35fbb342ea82f6f") }, this._rpcClient._client);
if (!file) throw new Error("Failed to build file");
return await this.downloadFromPeer(peer, file) !== false;
return await peer.downloadFile(file) !== false;
}

public async fetch(input: RequestInfo, init?: RequestInit): Promise<Promise<Response | false>[]> {
Expand All @@ -490,9 +495,9 @@ export default class HTTPClient {
const peerUrl = new URL(peer.host);
url.hostname = peerUrl.hostname;
url.protocol = peerUrl.protocol;
return await Utils.promiseWithTimeout(fetch(url.toString(), init), this._client.config.timeout);
return await Utils.promiseWithTimeout(fetch(url.toString(), init), this._rpcClient._client.config.timeout);
} catch (e) {
if (this._client.config.logLevel === "verbose") console.error(e);
if (this._rpcClient._client.config.logLevel === "verbose") console.error(e);
return false;
}
});
Expand All @@ -503,7 +508,7 @@ export default class HTTPClient {
// TODO: Compare list between all peers and give score based on how similar they are. 100% = all exactly the same, 0% = no items in list were shared. The lower the score, the lower the propagation times, the lower the decentralisation
async updatePeers(): Promise<void> {
console.log(`Fetching peers`);
const responses = await Promise.all(await this._client.rpcClient.fetch("http://localhost/peers"));
const responses = await Promise.all(await this._rpcClient._client.rpcClient.fetch("http://localhost/peers"));
for (let i = 0; i < responses.length; i++) {
try {
if (!(responses[i] instanceof Response)) continue;
Expand All @@ -512,12 +517,12 @@ export default class HTTPClient {
const remotePeers = (await response.json()) as HTTPPeer[];
for (const remotePeer of remotePeers) {
this.add(remotePeer.host).catch((e) => {
if (this._client.config.logLevel === "verbose") console.error(e);
if (this._rpcClient._client.config.logLevel === "verbose") console.error(e);
});
}
}
} catch (e) {
if (this._client.config.logLevel === "verbose") console.error(e);
if (this._rpcClient._client.config.logLevel === "verbose") console.error(e);
}
}
}
Expand Down
Loading

0 comments on commit 5e942f5

Please sign in to comment.