Skip to content

Commit

Permalink
move logic to comms flag instead of player enter/leave scnee
Browse files Browse the repository at this point in the history
  • Loading branch information
gonpombo8 committed Oct 18, 2024
1 parent 1fc98ee commit 0132588
Showing 1 changed file with 39 additions and 38 deletions.
77 changes: 39 additions & 38 deletions packages/@dcl/sdk/src/network/message-bus-sync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,11 @@ export function addSyncTransport(
pendingMessageBusMessagesToSend.length = 0
return messages
}
const players = definePlayerHelper(engine)

let stateIsSyncronized = false
let transportInitialzed = false

// Add Sync Transport
const transport: Transport = {
filter: syncFilter(engine),
Expand All @@ -55,7 +58,7 @@ export function addSyncTransport(
engine.addTransport(transport)
// End add sync transport

// If we dont have any state initialized, and recieve a state message.
// Receive & Process CRDT_STATE
binaryMessageBus.on(CommsMessage.RES_CRDT_STATE, (value) => {
const { sender, data } = decodeCRDTState(value)
if (sender !== myProfile.userId) return
Expand All @@ -64,34 +67,21 @@ export function addSyncTransport(
stateIsSyncronized = true
})

// Answer to REQ_CRDT_STATE
binaryMessageBus.on(CommsMessage.REQ_CRDT_STATE, async (message, userId) => {
console.log(`Sending CRDT State to: ${userId}`)
transport.onmessage!(message)
binaryMessageBus.emit(CommsMessage.RES_CRDT_STATE, encodeCRDTState(userId, engineToCrdt(engine)))
})

function sleep(ms: number) {
return new Promise<void>((resolve) => {
let timer = 0
function sleepSystem(dt: number) {
timer += dt
if (timer * 1000 >= ms) {
engine.removeSystem(sleepSystem)
resolve()
}
}
engine.addSystem(sleepSystem)
})
}

const players = definePlayerHelper(engine)

let stateIsSyncronized = false

let requestCrdtStateWhenConnected = false
// Process CRDT messages here
binaryMessageBus.on(CommsMessage.CRDT, (value) => {
DEBUG_NETWORK_MESSAGES() &&
console.log(Array.from(serializeCrdtMessages('[NetworkMessage received]:', value, engine)))
transport.onmessage!(value)
})

async function requestState() {
requestCrdtStateWhenConnected = false
let players = Array.from(engine.getEntitiesWith(PlayerIdentityData))
DEBUG_NETWORK_MESSAGES() && console.log(`Requesting state. Players connected: ${players.length - 1}`)
binaryMessageBus.emit(CommsMessage.REQ_CRDT_STATE, engineToCrdt(engine))
Expand All @@ -112,18 +102,23 @@ export function addSyncTransport(

players.onEnterScene((player) => {
DEBUG_NETWORK_MESSAGES() && console.log('[onEnterScene]', player.userId)
if (player.userId === myProfile.userId && !requestCrdtStateWhenConnected) {
if (RealmInfo.getOrNull(engine.RootEntity)?.isConnectedSceneRoom) {
void requestState()
} else {
DEBUG_NETWORK_MESSAGES() && console.log('Waiting to be conneted')
requestCrdtStateWhenConnected = true
}
}
// if (player.userId === myProfile.userId && !requestCrdtStateWhenConnected) {
// if (RealmInfo.getOrNull(engine.RootEntity)?.isConnectedSceneRoom) {
// void requestState()
// } else {
// DEBUG_NETWORK_MESSAGES() && console.log('Waiting to be conneted')
// requestCrdtStateWhenConnected = true
// }
// }
})

// Asks for the REQ_CRDT_STATE when its connected to comms
RealmInfo.onChange(engine.RootEntity, (value) => {
if (value?.isConnectedSceneRoom && requestCrdtStateWhenConnected) {
if (!value?.isConnectedSceneRoom) {
stateIsSyncronized = false
}

if (value?.isConnectedSceneRoom && !stateIsSyncronized) {
void requestState()
}
})
Expand All @@ -133,21 +128,27 @@ export function addSyncTransport(
if (userId === myProfile.userId) {
DEBUG_NETWORK_MESSAGES() && console.log('Disconnected from comms')
stateIsSyncronized = false
requestCrdtStateWhenConnected = false
}
})

// Process CRDT messages here
binaryMessageBus.on(CommsMessage.CRDT, (value) => {
DEBUG_NETWORK_MESSAGES() &&
console.log(Array.from(serializeCrdtMessages('[NetworkMessage received]:', value, engine)))
transport.onmessage!(value)
})

function isStateSyncronized() {
return stateIsSyncronized
}

function sleep(ms: number) {
return new Promise<void>((resolve) => {
let timer = 0
function sleepSystem(dt: number) {
timer += dt
if (timer * 1000 >= ms) {
engine.removeSystem(sleepSystem)
resolve()
}
}
engine.addSystem(sleepSystem)
})
}

return {
...entityDefinitions,
myProfile,
Expand Down

0 comments on commit 0132588

Please sign in to comment.