Skip to content

Commit

Permalink
Add record uploading
Browse files Browse the repository at this point in the history
  • Loading branch information
Kostya Bats committed Oct 25, 2024
1 parent 93fa85f commit 23f7ac9
Show file tree
Hide file tree
Showing 9 changed files with 123 additions and 14 deletions.
33 changes: 30 additions & 3 deletions packages/grabber/capture_client.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@ class GrabberCaptureClient {
this.peerName = peerName;
this.target = new EventTarget();
this.target.dispatchEvent(new CustomEvent('hi', {}));
this.signallingUrl = (signallingUrl ?? "");

const socketPath = (signallingUrl ?? "") + "/ws/peers/" + peerName;
const socketPath = this.signallingUrl + "/ws/peers/" + peerName;

this.socket = new GrabberSocket(socketPath);
this.socket.on("connect", async () => {
Expand All @@ -29,20 +30,35 @@ class GrabberCaptureClient {
const record_stop_handle = async function ({recordStop: {recordId}}) {
this.target.dispatchEvent(new CustomEvent('record_stop', {detail: {recordId}}));
}
const record_upload_handle = async function ({recordUpload: {recordId}}) {
this.target.dispatchEvent(new CustomEvent('record_upload', {detail: {recordId}}));
}
const players_disconnect_handle = async function ({event}) {
this.target.dispatchEvent(new CustomEvent("players_disconnect", {detail: {recordId: "recordId", timeout: 10}}));
this.target.dispatchEvent(new CustomEvent("players_disconnect", {
detail: {
recordId: "recordId",
timeout: 10
}
}));
}

this.socket.on("init_peer", init_peer_handle.bind(this));
this.socket.on("offer", offer_handle.bind(this));
this.socket.on("player_ice", player_ice_handle.bind(this));
this.socket.on("record_start", record_start_handle.bind(this));
this.socket.on("record_stop", record_stop_handle.bind(this));
this.socket.on("record_upload", record_upload_handle.bind(this));
this.socket.on("players_disconnect", players_disconnect_handle.bind(this));
}

send_ping(connectionsCount, streamTypes, currentRecordId) {
this.socket.emit("ping", {ping: {connectionsCount: connectionsCount, streamTypes: streamTypes, currentRecordId: currentRecordId}});
this.socket.emit("ping", {
ping: {
connectionsCount: connectionsCount,
streamTypes: streamTypes,
currentRecordId: currentRecordId
}
});
}

send_offer_answer(playerId, answer) {
Expand All @@ -52,6 +68,17 @@ class GrabberCaptureClient {
send_grabber_ice(peerId, candidate) {
this.socket.emit("grabber_ice", {ice: {peerId, candidate}});
}

record_upload(fileName, fileBlob) {
const formData = new FormData()
formData.append('file', fileBlob, fileName)

return fetch(`${this.signallingUrl}/api/agent/${this.peerName}/record_upload`,
{
method: "POST",
body: formData,
});
}
}

module.exports.GrabberCaptureClient = GrabberCaptureClient;
19 changes: 19 additions & 0 deletions packages/grabber/main.js
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,25 @@ function runGrabbing(window) {
window.webContents.send("record_stop", recordId);
});

client.target.addEventListener("record_upload", async ({detail: {recordId}}) => {
for (const recordType of ["desktop", "webcam"]) {
const fileName = `${recordId}_${recordType}.webm`;
fs.readFile(fileName, function (err, data) {
if (!err) {
const fileBlob = new Blob([data], {type: 'video/webm'})
const promise = client.record_upload(fileName, fileBlob)
promise.then(r => {
r.text().then(text => {
console.log(`Reaction upload ${fileName} to server: ${text}`)
});
}).catch(e => console.error(`Reaction uploading to server error: ${e}`));
} else {
console.error(`Reaction upload ${fileName} error: ${err}`);
}
});
}
});

client.target.addEventListener("players_disconnect", async () => {
window.webContents.send("player_disconnect");
});
Expand Down
10 changes: 10 additions & 0 deletions packages/grabber/scripts/record_uploader.bat
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
@echo off
chcp 65001 > nul

set "directory=%~dp0"
set peername=02

for %%a in ("%directory%*.webm") do (
echo Upload %%a
C:\Windows\System32\curl.exe "https://grabber.kbats.ru/api/agent/%peername%/record_upload" -X POST -F file=@"%%a" --fail && move "%%a" "%%a.backup"
)
11 changes: 11 additions & 0 deletions packages/relay/asset/grabber_admin_client.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,17 @@ class GrabberAdminClient {
})
}

recordUpload(peerName, recordId) {
return fetch(this._url + "/api/admin/record_upload", {
method: "POST",
headers: {
"Content-Type": "application/json",
"Authorization": "Basic " + btoa("admin:" + this._credential),
},
body: JSON.stringify({ peerName, recordId }),
})
}

playersDisconnect(peerName) {
return fetch(this._url + `/api/admin/players_disconnect/${peerName}`, {
method: "POST",
Expand Down
11 changes: 10 additions & 1 deletion packages/relay/asset/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,8 @@
const recordButton = document.createElement('button');
recordButton.innerText = "Record";
recordButton.onclick = () => {
adminClient.recordStart(peer.name, peer.name + "_" + Date.now(), 30 * 1000);
let recordId = prompt("Please enter record id", peer.name + "_test");
adminClient.recordStart(peer.name, recordId, 30 * 1000);
}
pStatus.appendChild(recordButton);

Expand All @@ -251,6 +252,14 @@
}
pStatus.appendChild(recordStopButton);
}

const recordUploadButton = document.createElement('button');
recordUploadButton.innerText = "Record upload";
recordUploadButton.onclick = () => {
let recordId = prompt("Please enter record id", peer.name + "_test");
adminClient.recordUpload(peer.name, recordId);
}
pStatus.appendChild(recordUploadButton);
}
} else {
connectStreamButton("desktop", true);
Expand Down
22 changes: 14 additions & 8 deletions packages/relay/internal/api/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ const (
GrabberMessageEventPlayerIce = GrabberMessageEvent("player_ice")
GrabberMessageEventRecordStart = GrabberMessageEvent("record_start")
GrabberMessageEventRecordStop = GrabberMessageEvent("record_stop")
GrabberMessageEventRecordUpload = GrabberMessageEvent("record_upload")
GrabberMessageEventPlayersDisconnect = GrabberMessageEvent("players_disconnect")
)

Expand All @@ -47,14 +48,15 @@ type PlayerAuthMessage struct {
}

type GrabberMessage struct {
Event GrabberMessageEvent `json:"event"`
Ping *PeerStatus `json:"ping"`
InitPeer *GrabberInitPeerMessage `json:"initPeer"`
Offer *OfferMessage `json:"offer"`
OfferAnswer *OfferAnswerMessage `json:"offerAnswer"`
Ice *IceMessage `json:"ice"`
RecordStart *RecordStartMessage `json:"recordStart"`
RecordStop *RecordStopMessage `json:"recordStop"`
Event GrabberMessageEvent `json:"event"`
Ping *PeerStatus `json:"ping"`
InitPeer *GrabberInitPeerMessage `json:"initPeer"`
Offer *OfferMessage `json:"offer"`
OfferAnswer *OfferAnswerMessage `json:"offerAnswer"`
Ice *IceMessage `json:"ice"`
RecordStart *RecordStartMessage `json:"recordStart"`
RecordStop *RecordStopMessage `json:"recordStop"`
RecordUpload *RecordUploadMessage `json:"recordUpload"`
//PlayerAuth *PlayerAuthMessage `json:"playerAuth"`
//PeersStatus []Peer `json:"peersStatus"`
//ParticipantsStatus []Peer `json:"participantsStatus"`
Expand Down Expand Up @@ -96,3 +98,7 @@ type RecordStartMessage struct {
type RecordStopMessage struct {
RecordId string `json:"recordId"`
}

type RecordUploadMessage struct {
RecordId string `json:"recordId"`
}
28 changes: 28 additions & 0 deletions packages/relay/internal/signalling/api_admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,29 @@ func (s *Server) setupAdminApi() {
return c.Status(fiber.StatusOK).SendString("Ok")
})

router.Post("/record_upload", func(c *fiber.Ctx) error {
var req uploadRecordRequest
if err := c.BodyParser(&req); err != nil {
return c.Status(fiber.StatusBadRequest).SendString("Bad Request")
}

var socket sockets.Socket = nil
if peer, ok := s.storage.getPeerByName(req.PeerName); ok {
socket = s.grabberSockets.GetSocket(peer.SocketId)
}
if socket == nil {
return c.Status(fiber.StatusNotFound).SendString("Peer not found")
}

err := socket.WriteJSON(api.GrabberMessage{
Event: api.GrabberMessageEventRecordUpload,
RecordUpload: &api.RecordUploadMessage{RecordId: req.RecordId}})
if err != nil {
return c.Status(fiber.StatusInternalServerError).SendString("Failed to send stop recoding request")
}
return c.Status(fiber.StatusOK).SendString("Ok")
})

router.Post("/players_disconnect/:peerName", func(c *fiber.Ctx) error {
peerName := c.Params("peerName")

Expand Down Expand Up @@ -100,3 +123,8 @@ type stopRecordRequest struct {
PeerName string `json:"peerName"`
RecordId string `json:"recordId"`
}

type uploadRecordRequest struct {
PeerName string `json:"peerName"`
RecordId string `json:"recordId"`
}
2 changes: 1 addition & 1 deletion packages/relay/internal/signalling/api_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func (s *Server) setupAgentApi() {
return c.Status(fiber.StatusBadRequest).SendString("Failed to upload file")
}

return nil
return c.Status(fiber.StatusOK).SendString("OK")
})
})
}
1 change: 0 additions & 1 deletion packages/relay/internal/signalling/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,6 @@ func (s *Server) processGrabberMessage(id sockets.SocketID, m api.GrabberMessage
grabberId := string(id)
switch m.Event {
case api.GrabberMessageEventPing:
log.Printf("grabber ping received %v", m.Ping)
if m.Ping == nil {
return nil
}
Expand Down

0 comments on commit 23f7ac9

Please sign in to comment.