Skip to content

Commit

Permalink
FIX node method
Browse files Browse the repository at this point in the history
  • Loading branch information
pubkey committed Nov 27, 2023
1 parent ddd2da7 commit 4ed44c9
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 43 deletions.
8 changes: 8 additions & 0 deletions src/broadcast-channel.js
Original file line number Diff line number Diff line change
Expand Up @@ -184,15 +184,19 @@ BroadcastChannel.prototype = {
* @returns {Promise} that resolved when the message sending is done
*/
function _post(broadcastChannel, type, msg) {
console.log('_post - 0 ' + !!broadcastChannel._prepP);
const time = broadcastChannel.method.microSeconds();
console.log('_post time ' + time);
const msgObj = {
time,
type,
data: msg
};

const awaitPrepare = broadcastChannel._prepP ? broadcastChannel._prepP : PROMISE_RESOLVED_VOID;
console.log('_post - 1');
return awaitPrepare.then(() => {
console.log('_post - 2');

const sendPromise = broadcastChannel.method.postMessage(
broadcastChannel._state,
Expand All @@ -201,6 +205,7 @@ function _post(broadcastChannel, type, msg) {

// add/remove to unsent messages list
broadcastChannel._uMP.add(sendPromise);
console.log('_post - 3');
sendPromise
.catch()
.then(() => broadcastChannel._uMP.delete(sendPromise));
Expand All @@ -212,6 +217,8 @@ function _post(broadcastChannel, type, msg) {
function _prepareChannel(channel) {
const maybePromise = channel.method.create(channel.name, channel.options);
if (isPromise(maybePromise)) {

console.log('CRAETE IS POROMSE!');
channel._prepP = maybePromise;
maybePromise.then(s => {
// used in tests to simulate slow runtime
Expand All @@ -221,6 +228,7 @@ function _prepareChannel(channel) {
channel._state = s;
});
} else {
console.log('CRAETE IS NOT POROMSE!');
channel._state = maybePromise;
}
}
Expand Down
12 changes: 8 additions & 4 deletions src/methods/node.js
Original file line number Diff line number Diff line change
Expand Up @@ -272,11 +272,15 @@ export async function openClientConnection(channelName, readerUuid) {
* @return {Promise}
*/
export function writeMessage(channelName, readerUuid, messageJson, paths) {

let time = messageJson.time;
if (!time) {
time = microSeconds();
}

paths = paths || getPaths(channelName);
const time = microSeconds();
const writeObject = {
uuid: readerUuid,
time,
data: messageJson
};

Expand Down Expand Up @@ -451,8 +455,8 @@ export function _filterMessage(msgObj, state) {
if (msgObj.senderUuid === state.uuid) return false; // not send by own
if (state.emittedMessagesIds.has(msgObj.token)) return false; // not already emitted
if (!state.messagesCallback) return false; // no listener
if (msgObj.time < state.messagesCallbackTime) return false; // not older then onMessageCallback
if (msgObj.time < state.time) return false; // msgObj is older then channel
if (msgObj.time < state.messagesCallbackTime) return false; // not older than onMessageCallback
if (msgObj.time < state.time) return false; // msgObj is older than channel

state.emittedMessagesIds.add(msgObj.token);
return true;
Expand Down
4 changes: 2 additions & 2 deletions test/integration.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -776,13 +776,13 @@ if (isNode) {
useOptions.push({
type: 'node',
node: {
useFastPath: true
useFastPath: false
}
});
useOptions.push({
type: 'node',
node: {
useFastPath: false
useFastPath: true
}
});
} else {
Expand Down
77 changes: 40 additions & 37 deletions test/unit/node.method.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -127,11 +127,11 @@ describe('unit/node.method.test.js', () => {
await NodeMethod.ensureFoldersExist(channelName);
const sockets = await Promise.all(
new Array(2).fill(0)
.map(async () => {
const readerUuid = AsyncTestUtil.randomString(6);
const socket = await NodeMethod.createSocketEventEmitter(channelName, readerUuid);
return socket;
})
.map(async () => {
const readerUuid = AsyncTestUtil.randomString(6);
const socket = await NodeMethod.createSocketEventEmitter(channelName, readerUuid);
return socket;
})
);
sockets.forEach(socket => socket.server.close());
});
Expand All @@ -143,12 +143,12 @@ describe('unit/node.method.test.js', () => {
// ensure we have more then 30 channel-folders
const sockets = await Promise.all(
new Array(35).fill(0)
.map(async () => {
const uid = AsyncTestUtil.randomString(6);
const cN = AsyncTestUtil.randomString(12);
await NodeMethod.ensureFoldersExist(cN);
return NodeMethod.createSocketEventEmitter(cN, uid);
})
.map(async () => {
const uid = AsyncTestUtil.randomString(6);
const cN = AsyncTestUtil.randomString(12);
await NodeMethod.ensureFoldersExist(cN);
return NodeMethod.createSocketEventEmitter(cN, uid);
})
);

await NodeMethod.ensureFoldersExist(channelName);
Expand Down Expand Up @@ -199,7 +199,11 @@ describe('unit/node.method.test.js', () => {
foo: 'bar'
};

const msgObj = await NodeMethod.writeMessage(channelName, readerUuid, messageJson);
const msgObj = await NodeMethod.writeMessage(
channelName,
readerUuid,
messageJson
);

const exists = require('fs').existsSync(msgObj.path);
assert.ok(exists);
Expand All @@ -216,12 +220,12 @@ describe('unit/node.method.test.js', () => {

const sockets = await Promise.all(
new Array(5).fill(0)
.map(() => AsyncTestUtil.randomString(6))
.map(async (readerUuid) => {
await NodeMethod.createSocketInfoFile(channelName, readerUuid);
const s = await NodeMethod.createSocketEventEmitter(channelName, readerUuid);
return s;
})
.map(() => AsyncTestUtil.randomString(6))
.map(async (readerUuid) => {
await NodeMethod.createSocketInfoFile(channelName, readerUuid);
const s = await NodeMethod.createSocketEventEmitter(channelName, readerUuid);
return s;
})
);

const uuids = await NodeMethod.getReadersUuids(channelName);
Expand Down Expand Up @@ -266,7 +270,6 @@ describe('unit/node.method.test.js', () => {
const messages = await NodeMethod.getAllMessages(channelName);
assert.equal(messages.length, 2);
assert.ok(messages[0].path);
assert.ok(messages[0].time);
assert.ok(messages[0].senderUuid);
assert.ok(messages[0].token);
});
Expand Down Expand Up @@ -299,7 +302,7 @@ describe('unit/node.method.test.js', () => {
// write 5 messages
await Promise.all(
new Array(5).fill(0)
.map(() => NodeMethod.writeMessage(channelName, readerUuid, messageJson))
.map(() => NodeMethod.writeMessage(channelName, readerUuid, messageJson))
);

// w8 until they time out
Expand Down Expand Up @@ -437,7 +440,7 @@ describe('unit/node.method.test.js', () => {
});
});
describe('other', () => {
it('should have cleaned up the messages', async function() {
it('should have cleaned up the messages', async function () {
this.timeout(1000 * 20); // slow on windows
const channelOptions = {
node: {
Expand All @@ -454,7 +457,7 @@ describe('unit/node.method.test.js', () => {
// send 100 messages
await Promise.all(
new Array(100).fill(0)
.map(() => NodeMethod.postMessage(channelStateOwn, msgJson))
.map(() => NodeMethod.postMessage(channelStateOwn, msgJson))
);

// w8 until ttl has reached
Expand All @@ -463,7 +466,7 @@ describe('unit/node.method.test.js', () => {
// send 100 messages again to trigger cleanup
await Promise.all(
new Array(100).fill(0)
.map(() => NodeMethod.postMessage(channelStateOwn, msgJson))
.map(() => NodeMethod.postMessage(channelStateOwn, msgJson))
);

// ensure only the last 100 messages are here
Expand Down Expand Up @@ -508,25 +511,25 @@ describe('unit/node.method.test.js', () => {
const channelName = AsyncTestUtil.randomString(12);
const readers = await Promise.all(
new Array(50).fill(0)
.map(async () => {
const channelState = await NodeMethod.create(channelName);
const emitted = [];
NodeMethod.onMessage(channelState, msg => emitted.push(msg), new Date().getMilliseconds());
return {
channelState,
emitted
};
})
.map(async () => {
const channelState = await NodeMethod.create(channelName);
const emitted = [];
NodeMethod.onMessage(channelState, msg => emitted.push(msg), new Date().getMilliseconds());
return {
channelState,
emitted
};
})
);

const senderState = await NodeMethod.create(channelName);

// send 100 messages
await Promise.all(
new Array(100).fill(0)
.map(() => NodeMethod.postMessage(senderState, {
foo: 'bar'
}))
.map(() => NodeMethod.postMessage(senderState, {
foo: 'bar'
}))
);

await AsyncTestUtil.waitUntil(() => {
Expand All @@ -553,8 +556,8 @@ describe('unit/node.method.test.js', () => {

const otherStates = await Promise.all(
new Array(10)
.fill(0)
.map(() => NodeMethod.create(channelName))
.fill(0)
.map(() => NodeMethod.create(channelName))
);

NodeMethod.refreshReaderClients(channelStateOwn);
Expand Down

0 comments on commit 4ed44c9

Please sign in to comment.