Skip to content

Commit

Permalink
feat: remove intermediate message format
Browse files Browse the repository at this point in the history
  • Loading branch information
luckydye committed Jan 5, 2024
1 parent 0fd5eb2 commit 1823686
Show file tree
Hide file tree
Showing 12 changed files with 107 additions and 186 deletions.
2 changes: 1 addition & 1 deletion packages/api/src/Accessor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ export class Accessor<Params extends AccessorParams, Cache, Data, RequestMessage
*/
transform: async (msg, controller) => {
if (msg._type === 'error') {
console.error('ohno an error, this should be handled!', msg.error);
console.error('ohno an error, this should be handled!', msg);

if (msg.error) {
this.onError(msg.error);
Expand Down
File renamed without changes.
24 changes: 7 additions & 17 deletions packages/api/src/Worker.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import RemoteLibrary from './api/RemoteLibrary.ts?worker';

import * as Comlink from 'comlink';
import { MessageType } from './lib.ts';

export default {
stream() {
Expand All @@ -10,37 +11,26 @@ export default {
const wrappedWorker = Comlink.wrap<typeof import('./api/RemoteLibrary.ts').default>(worker);

worker.onerror = (err) => {
console.error('Error creating worker');
console.error('Error in worker:', err);
};

wrappedWorker.connect(url);

let controller: ReadableStreamDefaultController<any>;

const read = new ReadableStream({
const read = new ReadableStream<{ _type: MessageType }>({
start(ctlr) {
controller = ctlr;

wrappedWorker.onMessage(
Comlink.proxy((msg) => {
controller.enqueue(msg);
console.log(msg);

ctlr.enqueue(msg);
})
);
},
});

const write = new WritableStream({
write(chunk) {
switch (chunk._type) {
case 'locations':
wrappedWorker.fetchLocations();
break;
case 'index':
wrappedWorker.fetchIndex(chunk.locations);
break;
default:
throw new Error(`Request message "${chunk._type}" not handled.`);
}
wrappedWorker.send(chunk);
},
});

Expand Down
14 changes: 10 additions & 4 deletions packages/api/src/accessors/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { MessageType } from '../lib.js';
import { Accessor } from '../Accessor.js';
import Worker from '../Worker.js';
import { IndexEntryMessage } from 'tokyo-proto';
import * as proto from 'tokyo-proto';

export function createIndexAccessor() {
return new Accessor([Worker], {
Expand All @@ -14,10 +15,15 @@ export function createIndexAccessor() {
sortCreated: boolean;
}) {
if (params?.query) {
return {
_type: MessageType.Index,
locations: params.query.locations,
};
return proto.ClientMessage.create({
index: proto.RequestLibraryIndex.create({
ids: params.query.locations,
}),
});
// return {
// _type: MessageType.Index,
// locations: params.query.locations,
// };
}
},

Expand Down
15 changes: 12 additions & 3 deletions packages/api/src/accessors/locations.ts
Original file line number Diff line number Diff line change
@@ -1,20 +1,29 @@
import { MessageType } from '../lib.js';
import { Accessor } from '../Accessor.js';
import Worker from '../Worker.js';
import * as proto from 'tokyo-proto';

export function createLocationsAccessor() {
return new Accessor([Worker], {
createRequest(params: {
query: unknown;
}) {
return {
_type: MessageType.Locations,
};
return proto.ClientMessage.create({
locations: proto.RequestLocations.create({}),
});
// return {
// _type: MessageType.MutateLocations,
// path: params.path,
// name: params.name,
// };

// Create a library
// library.ClientMessage.create({
// create: library.CreateLibraryMessage.create({
// name: 'Desktop',
// path: '/Users/tihav/Desktop',
// }),
// })
},

handleMessage(msg) {
Expand Down
48 changes: 29 additions & 19 deletions packages/api/src/accessors/metadata.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import { MessageType } from '../lib.js';
import { Accessor } from '../Accessor.js';
import * as proto from 'tokyo-proto';
import Worker from '../Worker.js';
import { DynamicImage } from '../DynamicImage.js';

export function createMetadataAccessor() {
const loadImage = (src: string): Promise<Image> => {
Expand Down Expand Up @@ -35,29 +37,37 @@ export function createMetadataAccessor() {
},
cache
) {
if (params?.query)
return {
_type: MessageType.Thumbnails,
ids: params.query.ids?.filter((id) => !cache[0]?.find((entry) => entry.id === id)),
};
if (params?.query) {
// const ids = params.query.ids?.filter((id) => !cache[0]?.find((entry) => entry.id === id));
const ids = params.query.ids;

return proto.ClientMessage.create({
meta: proto.RequestMetadata.create({
file: ids[0],
}),
});
}
},

async handleMessage(msg) {
if (msg._type === MessageType.Locations) {
const items = Promise.all(
msg.data.map(async (entry) => {
const buff = new Uint8Array(entry.metadata.thumbnail);
const blob = new Blob([buff]);
return {
...entry.metadata,
thumbnail: await makeThumbnail(blob, entry.metadata),
id: entry.id,
source_id: msg.source_id,
};
})
);
if (msg._type === MessageType.Metadata) {
const entry = async (entry) => {
const buff = new Uint8Array(entry.thumbnail);
const blob = new Blob([buff]);
return {
...entry,
thumbnail: await makeThumbnail(blob, entry),
id: entry.id,
source_id: msg.source_id,
};
};

// const items = Promise.all(
// msg.data.map(entry)
// );
// return items;

return items;
return [await entry(msg.data)];
}
},

Expand Down
1 change: 1 addition & 0 deletions packages/api/src/accessors/thumbnails.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { MessageType } from '../lib.js';
import { Accessor } from '../Accessor.js';
import Worker from '../Worker.js';
import { DynamicImage } from '../DynamicImage.js';

export function createThumbnailAccessor(hosts: string[]) {
const makeThumbnail = (blob?: Blob) => {
Expand Down
144 changes: 24 additions & 120 deletions packages/api/src/api/RemoteLibrary.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,136 +2,50 @@

import * as Comlink from 'comlink';
import * as library from 'tokyo-proto';
import { ClientAPIMessage, RequestMessage } from '../lib';
import { MessageType } from '../lib.ts';

let msg_count = 1;
const messageKeyToType = {
error: MessageType.Error,
list: MessageType.Locations,
index: MessageType.Index,
metadata: MessageType.Metadata,
};

export class RemoteLibrary {
ws!: WebSocket;

messageListeners = new Set<(arg: library.Message) => void>();

public async onMessage(callback: (arg: ClientAPIMessage) => void, id?: number) {
public async onMessage(callback: (arg: any) => void, id?: number) {
const listener = async (msg: library.Message) => {
if (id !== undefined && id !== msg.id) {
return;
}

const handledMessage = await this.handleMessage(msg);
if (handledMessage) {
callback(handledMessage);
for (const key in msg) {
if (msg[key] !== undefined) {
callback({
_type: messageKeyToType[key] || key,
data: msg[key],
});
return;
}
}

callback({
_type: MessageType.Error,
message: 'Message not handled',
});
};
this.messageListeners.add(listener);
return () => this.messageListeners.delete(listener);
}

public fetchLocations() {
const msg = library.ClientMessage.create({
id: ++msg_count,
locations: library.RequestLocations.create({}),
});

const res = new Promise<ClientAPIMessage>(async (resolve) => {
const unsub = await this.onMessage(async (msg) => {
unsub();
resolve(msg);
}, msg.id);
});

this.send(msg);

return res;
}

public fetchIndex(locations: string[]): Promise<ClientAPIMessage> {
const msg = library.ClientMessage.create({
id: ++msg_count,
index: library.RequestLibraryIndex.create({
ids: locations,
}),
});

const res = new Promise<ClientAPIMessage>(async (resolve) => {
const unsub = await this.onMessage(async (msg) => {
unsub();
resolve(msg);
}, msg.id);
});

this.send(msg);

return res;
}

private async handleMessage(message: library.Message): Promise<ClientAPIMessage | undefined> {
const type =
message.error ||
message.image ||
message.index ||
message.list ||
message.metadata ||
message.system;

switch (type) {
case message.error: {
console.error(new Error(`Error response, ${JSON.stringify(message)}`));
break;
}
case message.index: {
if (message.index) {
return {
_type: 'index',
data: message.index,
};
}
break;
}
case message.list: {
if (message.list) {
return {
_type: 'locations',
data: message.list.libraries,
};
}
break;
}
case message.metadata: {
// const file = message.metadata?.hash;
// const thumbnail = message.metadata?.thumbnail;
// if (file && thumbnail) {
// const blob = new Blob([thumbnail]);
// storage.writeTemp(file, await blob.arrayBuffer());
// }
//
if (message.metadata) {
return {
_type: 'metadata',
data: message.metadata,
};
}
break;
}
case message.image: {
return {
_type: 'image',
data: message.image,
};
}
case message.system: {
return {
_type: 'system',
data: message.system,
};
}
}

return undefined;
}

backlog = [];
backlog: library.ClientMessage[] = [];

send(msg: library.ClientMessage) {
console.log('RemoteLibrary.send', msg);

if (this.ws.readyState !== this.ws.OPEN) {
this.backlog.push(msg);
} else {
Expand All @@ -140,16 +54,6 @@ export class RemoteLibrary {
}
}

postLocation() {
const msg = library.ClientMessage.create({
create: library.CreateLibraryMessage.create({
name: 'Desktop',
path: '/Users/tihav/Desktop',
}),
});
this.send(msg);
}

retryTimeoutDuration = 1000;
retryTimeout: number | undefined = undefined;

Expand Down
2 changes: 0 additions & 2 deletions packages/library/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ mod image;
mod images;
mod ws;

use ::image::imageops::FilterType;
use anyhow::Result;
use axum::extract::WebSocketUpgrade;
use axum::routing::get;
Expand All @@ -13,7 +12,6 @@ use db::LibraryDatabase;
use futures::future::join_all;
use serde::{Deserialize, Serialize};
use std::borrow::Borrow;
use std::path::Path;
use std::sync::Arc;
use sysinfo::DiskExt;
use sysinfo::SystemExt;
Expand Down
2 changes: 1 addition & 1 deletion packages/library/src/ws.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ pub async fn handle_socket(mut socket: WebSocket) {
if message.is_err() {
let mut error_message = schema::Message::new();
error_message.error = Some(true);
error_message.message = Some("Something went wrong".to_string());
error_message.message = Some(message.err().unwrap().to_string());

sender
.lock()
Expand Down
Loading

0 comments on commit 1823686

Please sign in to comment.