Skip to content

Commit

Permalink
Merge pull request #178 from StarfilesFileSharing/alpha
Browse files Browse the repository at this point in the history
Alpha
  • Loading branch information
QuixThe2nd authored Nov 17, 2024
2 parents 666e69c + 6c07f4c commit d1124a9
Show file tree
Hide file tree
Showing 5 changed files with 63 additions and 66 deletions.
2 changes: 1 addition & 1 deletion deno.jsonc
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@starfiles/hydrafiles",
"version": "0.9.4",
"version": "0.9.5",
"description": "The (P2P) web privacy layer.",
"main": "src/hydrafiles.ts",
"exports": {
Expand Down
58 changes: 27 additions & 31 deletions src/events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ export enum FileEvent {
FileServed = "FileServed",
FileNotFound = "FileNotFound",
}

export enum RTCEvent {
RTCAnnounce = "RTCAnnounce",
RTCOpen = "RTCOpen",
Expand All @@ -11,29 +10,16 @@ export enum RTCEvent {
RTCIce = "RTCIce",
RTCClose = "RTCClose",
}

export interface EventsLogs {
file: Record<FileEvent, Record<number, number>>;
rtc: Record<RTCEvent, Record<number, number>>;
}
export type FileEventLog = Record<FileEvent, number[]>;
export type RTCEventLog = Record<RTCEvent, number[]>;

class Events {
interval = 10000;
lastInterval = 0;
startTime: number;
logs: EventsLogs = {
file: {
[FileEvent.FileServed]: {},
[FileEvent.FileNotFound]: {},
},
rtc: {
[RTCEvent.RTCAnnounce]: {},
[RTCEvent.RTCOpen]: {},
[RTCEvent.RTCOffer]: {},
[RTCEvent.RTCAnswer]: {},
[RTCEvent.RTCIce]: {},
[RTCEvent.RTCClose]: {},
},
logs = {
file: {} as FileEventLog,
rtc: {} as RTCEventLog,
};
fileEvents = FileEvent;
rtcEvents = RTCEvent;
Expand All @@ -43,23 +29,33 @@ class Events {
}

public log = (event: FileEvent | RTCEvent) => {
const eventType = event.constructor.name;

const interval = Math.floor((+new Date() - this.startTime) / this.interval);

for (let i = this.lastInterval + 1; i < interval; i++) {
Object.values(FileEvent).forEach((fileEvent) => {
if (!this.logs["file"][fileEvent][i]) this.logs["file"][fileEvent][i] = 0;
});
Object.values(RTCEvent).forEach((rtcEvent) => {
if (!this.logs["rtc"][rtcEvent][i]) this.logs["rtc"][rtcEvent][i] = 0;
for (let i = 0; i < interval; i++) {
(Object.keys(this.logs) as Array<keyof typeof this.logs>).forEach((key) => {
(Object.keys(this.logs[key]) as Array<FileEvent | RTCEvent>).forEach((event) => {
if (typeof (this.logs[key] as Record<FileEvent | RTCEvent, number[]>)[event][i] === "undefined") {
(this.logs[key] as Record<FileEvent | RTCEvent, number[]>)[event][i] = 0;
}
});
});
}

if (Object.values(FileEvent).includes(event as FileEvent)) {
if (!this.logs["file"][event as FileEvent][interval]) this.logs["file"][event as FileEvent][interval] = 0;
this.logs["file"][event as FileEvent][interval]++;
} else if (Object.values(RTCEvent).includes(event as RTCEvent)) {
if (!this.logs["rtc"][event as RTCEvent][interval]) this.logs["rtc"][event as RTCEvent][interval] = 0;
this.logs["rtc"][event as RTCEvent][interval]++;
if (eventType in FileEvent) {
const fileEvent = event as FileEvent;
if (!(fileEvent in this.logs.file)) this.logs.file[fileEvent] = [];
for (let i = this.logs.file[fileEvent].length; i < interval; i++) {
this.logs.file[fileEvent][i] = 0;
}
if (!this.logs.file[fileEvent][interval]) this.logs.file[fileEvent][interval] = 0;
this.logs.file[fileEvent][interval]++;
} else if (event in RTCEvent) {
const rtcEvent = event as RTCEvent;
if (!(rtcEvent in this.logs.rtc)) this.logs.rtc[rtcEvent] = [];
if (!this.logs.rtc[rtcEvent][interval]) this.logs.rtc[rtcEvent][interval] = 0;
this.logs.rtc[rtcEvent][interval]++;
}

this.lastInterval = interval;
Expand Down
4 changes: 2 additions & 2 deletions src/hydrafiles.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,11 @@ class Hydrafiles {
this.keyPair = await this.utils.getKeyPair();
console.log("Startup: Populating FileDB");
this.files = await Files.init(this);
console.log("Startup: Populating Wallet");
this.wallet = await Wallet.init(this);
console.log("Startup: Populating RPC Client & Server");
this.rpcClient = await RPCClient.init(this);
this.rpcServer = new RPCServer(this);
console.log("Startup: Populating Wallet");
this.wallet = await Wallet.init(this);
console.log("Startup: Starting WebTorrent");
this.webtorrent = opts.webtorrent;

Expand Down
33 changes: 17 additions & 16 deletions src/rpc/peers/rtc.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
import type { RTCDataChannel, RTCIceCandidate, RTCPeerConnection, RTCSessionDescription } from "npm:werift";
import type RPCClient from "../client.ts";
import { encodeBase32 } from "jsr:@std/encoding@^1.0.5/base32";

export type SignallingAnnounce = { announce: true; from: string };
export type SignallingOffer = { offer: RTCSessionDescription; from: string; to: string };
export type SignallingAnswer = { answer: RTCSessionDescription; from: string; to: string };
export type SignallingIceCandidate = { iceCandidate: RTCIceCandidate; from: string; to: string };
export type WSRequest = { request: { method: string; url: string; headers: Record<string, string>; body: ReadableStream<Uint8Array> | null }; id: number; from: string };
export type WSResponse = { response: { body: string; status: number; statusText: string; headers: Record<string, string> }; id: number; from: string };
import type { EthAddress } from "../../wallet.ts";

export type SignallingAnnounce = { announce: true; from: EthAddress };
export type SignallingOffer = { offer: RTCSessionDescription; from: EthAddress; to: EthAddress };
export type SignallingAnswer = { answer: RTCSessionDescription; from: EthAddress; to: EthAddress };
export type SignallingIceCandidate = { iceCandidate: RTCIceCandidate; from: EthAddress; to: EthAddress };
export type WSRequest = { request: { method: string; url: string; headers: Record<string, string>; body: ReadableStream<Uint8Array> | null }; id: number; from: EthAddress };
export type WSResponse = { response: { body: string; status: number; statusText: string; headers: Record<string, string> }; id: number; from: EthAddress };
export type WSMessage = SignallingAnnounce | SignallingOffer | SignallingAnswer | SignallingIceCandidate | WSRequest | WSResponse;

type PeerConnection = { conn: RTCPeerConnection; channel: RTCDataChannel; startTime: number };
Expand Down Expand Up @@ -42,11 +42,10 @@ function arrayBufferToUnicodeString(buffer: ArrayBuffer): string {
}

const receivedPackets: Record<string, string[]> = {};
const peerId = encodeBase32(String(Math.random()).replace("0.", "")).replaceAll("=", "");

class RTCPeers {
private _rpcClient: RPCClient;
peerId = peerId;
peerId: EthAddress;
websockets: WebSocket[];
peerConnections: PeerConnections = {};
messageQueue: WSMessage[] = [];
Expand All @@ -56,6 +55,8 @@ class RTCPeers {
this._rpcClient = rpcClient;
this.websockets = [new WebSocket("wss://rooms.deno.dev/")];

this.peerId = rpcClient._client.wallet.address();

const peers = rpcClient.http.getPeers(true);
for (let i = 0; i < peers.length; i++) {
try {
Expand All @@ -76,7 +77,7 @@ class RTCPeers {

this.websockets[i].onmessage = async (event) => {
const message = JSON.parse(event.data) as WSMessage;
if (message === null || message.from === peerId || this.seenMessages.has(event.data) || ("to" in message && message.to !== this.peerId)) return;
if (message === null || message.from === this.peerId || this.seenMessages.has(event.data) || ("to" in message && message.to !== this.peerId)) return;
this.seenMessages.add(event.data);
if ("announce" in message) await this.handleAnnounce(message.from);
else if ("offer" in message) await this.handleOffer(message.from, message.offer);
Expand All @@ -88,7 +89,7 @@ class RTCPeers {
}
}

async createPeerConnection(from: string): Promise<PeerConnection> {
async createPeerConnection(from: EthAddress): Promise<PeerConnection> {
this._rpcClient._client.events.log(this._rpcClient._client.events.rtcEvents.RTCOpen);
const config = {
iceServers: [
Expand Down Expand Up @@ -203,7 +204,7 @@ class RTCPeers {
}
}

async handleAnnounce(from: string): Promise<void> {
async handleAnnounce(from: EthAddress): Promise<void> {
this._rpcClient._client.events.log(this._rpcClient._client.events.rtcEvents.RTCAnnounce);
console.log(`WebRTC: (2/12): ${from} Received announce`);
if (this.peerConnections[from] && this.peerConnections[from].offered) {
Expand All @@ -214,7 +215,7 @@ class RTCPeers {
this.peerConnections[from].offered = await this.createPeerConnection(from);
}

async handleOffer(from: string, offer: RTCSessionDescription): Promise<void> {
async handleOffer(from: EthAddress, offer: RTCSessionDescription): Promise<void> {
this._rpcClient._client.events.log(this._rpcClient._client.events.rtcEvents.RTCOffer);
if (typeof this.peerConnections[from] === "undefined") this.peerConnections[from] = {};
if (this.peerConnections[from].answered && this.peerConnections[from].answered?.channel.readyState === "open") {
Expand Down Expand Up @@ -250,7 +251,7 @@ class RTCPeers {
}
}

async handleAnswer(from: string, answer: RTCSessionDescription): Promise<void> {
async handleAnswer(from: EthAddress, answer: RTCSessionDescription): Promise<void> {
this._rpcClient._client.events.log(this._rpcClient._client.events.rtcEvents.RTCAnswer);
if (!this.peerConnections[from] || !this.peerConnections[from].offered) {
console.log(this.peerConnections[from].offered);
Expand All @@ -265,7 +266,7 @@ class RTCPeers {
await this.peerConnections[from].offered.conn.setRemoteDescription(answer);
}

handleIceCandidate(from: string, receivedIceCandidate: RTCIceCandidate): void {
handleIceCandidate(from: EthAddress, receivedIceCandidate: RTCIceCandidate): void {
const iceCandidate = receivedIceCandidate;
this._rpcClient._client.events.log(this._rpcClient._client.events.rtcEvents.RTCIce);
if (!this.peerConnections[from]) {
Expand Down
32 changes: 16 additions & 16 deletions web/dashboard.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import WebTorrent from "https://esm.sh/[email protected]";
import { Chart } from "https://esm.sh/[email protected]/auto";
import { processingRequests } from "../src/rpc/routes.ts";
import { ErrorNotInitialised } from "../src/errors.ts";
import type { FileEventLog, RTCEventLog } from "../src/events.ts";

declare global {
interface Window {
Expand Down Expand Up @@ -62,7 +63,7 @@ document.getElementById("startHydrafilesButton")!.addEventListener("click", asyn

await window.hydrafiles.start({ onUpdateFileListProgress, webtorrent });
console.log("Hydrafiles web node is running", window.hydrafiles);
setInterval(tickHandler, 60 * 1000);
setInterval(tickHandler, 30 * 1000);
tickHandler();
});

Expand Down Expand Up @@ -134,11 +135,7 @@ const tickHandler = async () => {
} catch (e) {
console.error(e);
}
try {
fetchAndPopulateCharts();
} catch (e) {
console.error(e);
}
fetchAndPopulateCharts();
} catch (e) {
console.error(e);
}
Expand Down Expand Up @@ -357,16 +354,18 @@ function fetchAndPopulateCharts() {
}
}

function populateChart(name: string, data: any) {
function populateChart(name: string, data: FileEventLog | RTCEventLog) {
const events = Object.keys(data);
const times = Object.keys(data[events[0]]);

const datasets = events.map((label) => ({
label,
data: times.map((time) => data[label][time] ?? 0),
data: data[label as keyof typeof data],
backgroundColor: getRandomColor(),
fill: true,
}));
console.log(datasets);

const maxLength = Math.max(...events.map((event) => (data[event as keyof typeof data] as number[]).length));
const labels = Array.from({ length: maxLength }, (_, i) => i.toString());

if (!chartInstances[name]) {
const canvas = document.createElement("canvas");
Expand All @@ -375,7 +374,7 @@ function populateChart(name: string, data: any) {
const lineChartCtx = (document.getElementById(name) as HTMLCanvasElement).getContext("2d")!;
chartInstances[name] = new Chart(lineChartCtx, {
type: "line",
data: { labels: times, datasets },
data: { labels, datasets },
options: {
responsive: true,
plugins: { legend: { position: "top" } },
Expand All @@ -384,12 +383,13 @@ function populateChart(name: string, data: any) {
},
},
});
} else {
chartInstances[name].data.labels = labels;
chartInstances[name].data.datasets.forEach((dataset, index) => {
dataset.data = datasets[index].data;
});
chartInstances[name].update();
}
chartInstances[name].data.labels = times;
chartInstances[name].data.datasets.forEach((dataset, index) => {
dataset.data = datasets[index].data;
});
chartInstances[name].update();
}

function getRandomColor(opacity = 1): string {
Expand Down

0 comments on commit d1124a9

Please sign in to comment.