Skip to content

Commit

Permalink
Rename stream to sc_demuxer
Browse files Browse the repository at this point in the history
For consistency with recorder and decoder, name the component which
demuxes a "demuxer".

And add the missing sc_ prefix.
  • Loading branch information
rom1v committed Feb 2, 2022
1 parent 4ee62ab commit 7dec225
Show file tree
Hide file tree
Showing 4 changed files with 142 additions and 91 deletions.
2 changes: 1 addition & 1 deletion app/meson.build
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ src = [
'src/control_msg.c',
'src/controller.c',
'src/decoder.c',
'src/demuxer.c',
'src/device_msg.c',
'src/icon.c',
'src/file_pusher.c',
Expand All @@ -24,7 +25,6 @@ src = [
'src/scrcpy.c',
'src/screen.c',
'src/server.c',
'src/stream.c',
'src/video_buffer.c',
'src/util/acksync.c',
'src/util/file.c',
Expand Down
142 changes: 71 additions & 71 deletions app/src/stream.c → app/src/demuxer.c
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#include "stream.h"
#include "demuxer.h"

#include <assert.h>
#include <libavutil/time.h>
Expand All @@ -16,7 +16,7 @@
#define NO_PTS UINT64_C(-1)

static bool
stream_recv_packet(struct stream *stream, AVPacket *packet) {
sc_demuxer_recv_packet(struct sc_demuxer *demuxer, AVPacket *packet) {
// The video stream contains raw packets, without time information. When we
// record, we retrieve the timestamps separately, from a "meta" header
// added by the server before each raw packet.
Expand All @@ -30,7 +30,7 @@ stream_recv_packet(struct stream *stream, AVPacket *packet) {
// It is followed by <packet_size> bytes containing the packet/frame.

uint8_t header[HEADER_SIZE];
ssize_t r = net_recv_all(stream->socket, header, HEADER_SIZE);
ssize_t r = net_recv_all(demuxer->socket, header, HEADER_SIZE);
if (r < HEADER_SIZE) {
return false;
}
Expand All @@ -45,7 +45,7 @@ stream_recv_packet(struct stream *stream, AVPacket *packet) {
return false;
}

r = net_recv_all(stream->socket, packet->data, len);
r = net_recv_all(demuxer->socket, packet->data, len);
if (r < 0 || ((uint32_t) r) < len) {
av_packet_unref(packet);
return false;
Expand All @@ -57,9 +57,9 @@ stream_recv_packet(struct stream *stream, AVPacket *packet) {
}

static bool
push_packet_to_sinks(struct stream *stream, const AVPacket *packet) {
for (unsigned i = 0; i < stream->sink_count; ++i) {
struct sc_packet_sink *sink = stream->sinks[i];
push_packet_to_sinks(struct sc_demuxer *demuxer, const AVPacket *packet) {
for (unsigned i = 0; i < demuxer->sink_count; ++i) {
struct sc_packet_sink *sink = demuxer->sinks[i];
if (!sink->ops->push(sink, packet)) {
LOGE("Could not send config packet to sink %d", i);
return false;
Expand All @@ -70,12 +70,12 @@ push_packet_to_sinks(struct stream *stream, const AVPacket *packet) {
}

static bool
stream_parse(struct stream *stream, AVPacket *packet) {
sc_demuxer_parse(struct sc_demuxer *demuxer, AVPacket *packet) {
uint8_t *in_data = packet->data;
int in_len = packet->size;
uint8_t *out_data = NULL;
int out_len = 0;
int r = av_parser_parse2(stream->parser, stream->codec_ctx,
int r = av_parser_parse2(demuxer->parser, demuxer->codec_ctx,
&out_data, &out_len, in_data, in_len,
AV_NOPTS_VALUE, AV_NOPTS_VALUE, -1);

Expand All @@ -84,13 +84,13 @@ stream_parse(struct stream *stream, AVPacket *packet) {
(void) r;
assert(out_len == in_len);

if (stream->parser->key_frame == 1) {
if (demuxer->parser->key_frame == 1) {
packet->flags |= AV_PKT_FLAG_KEY;
}

packet->dts = packet->pts;

bool ok = push_packet_to_sinks(stream, packet);
bool ok = push_packet_to_sinks(demuxer, packet);
if (!ok) {
LOGE("Could not process packet");
return false;
Expand All @@ -100,57 +100,57 @@ stream_parse(struct stream *stream, AVPacket *packet) {
}

static bool
stream_push_packet(struct stream *stream, AVPacket *packet) {
sc_demuxer_push_packet(struct sc_demuxer *demuxer, AVPacket *packet) {
bool is_config = packet->pts == AV_NOPTS_VALUE;

// A config packet must not be decoded immediately (it contains no
// frame); instead, it must be concatenated with the future data packet.
if (stream->pending || is_config) {
if (demuxer->pending || is_config) {
size_t offset;
if (stream->pending) {
offset = stream->pending->size;
if (av_grow_packet(stream->pending, packet->size)) {
if (demuxer->pending) {
offset = demuxer->pending->size;
if (av_grow_packet(demuxer->pending, packet->size)) {
LOG_OOM();
return false;
}
} else {
offset = 0;
stream->pending = av_packet_alloc();
if (!stream->pending) {
demuxer->pending = av_packet_alloc();
if (!demuxer->pending) {
LOG_OOM();
return false;
}
if (av_new_packet(stream->pending, packet->size)) {
if (av_new_packet(demuxer->pending, packet->size)) {
LOG_OOM();
av_packet_free(&stream->pending);
av_packet_free(&demuxer->pending);
return false;
}
}

memcpy(stream->pending->data + offset, packet->data, packet->size);
memcpy(demuxer->pending->data + offset, packet->data, packet->size);

if (!is_config) {
// prepare the concat packet to send to the decoder
stream->pending->pts = packet->pts;
stream->pending->dts = packet->dts;
stream->pending->flags = packet->flags;
packet = stream->pending;
demuxer->pending->pts = packet->pts;
demuxer->pending->dts = packet->dts;
demuxer->pending->flags = packet->flags;
packet = demuxer->pending;
}
}

if (is_config) {
// config packet
bool ok = push_packet_to_sinks(stream, packet);
bool ok = push_packet_to_sinks(demuxer, packet);
if (!ok) {
return false;
}
} else {
// data packet
bool ok = stream_parse(stream, packet);
bool ok = sc_demuxer_parse(demuxer, packet);

if (stream->pending) {
if (demuxer->pending) {
// the pending packet must be discarded (consumed or error)
av_packet_free(&stream->pending);
av_packet_free(&demuxer->pending);
}

if (!ok) {
Expand All @@ -161,25 +161,25 @@ stream_push_packet(struct stream *stream, AVPacket *packet) {
}

static void
stream_close_first_sinks(struct stream *stream, unsigned count) {
sc_demuxer_close_first_sinks(struct sc_demuxer *demuxer, unsigned count) {
while (count) {
struct sc_packet_sink *sink = stream->sinks[--count];
struct sc_packet_sink *sink = demuxer->sinks[--count];
sink->ops->close(sink);
}
}

static inline void
stream_close_sinks(struct stream *stream) {
stream_close_first_sinks(stream, stream->sink_count);
sc_demuxer_close_sinks(struct sc_demuxer *demuxer) {
sc_demuxer_close_first_sinks(demuxer, demuxer->sink_count);
}

static bool
stream_open_sinks(struct stream *stream, const AVCodec *codec) {
for (unsigned i = 0; i < stream->sink_count; ++i) {
struct sc_packet_sink *sink = stream->sinks[i];
sc_demuxer_open_sinks(struct sc_demuxer *demuxer, const AVCodec *codec) {
for (unsigned i = 0; i < demuxer->sink_count; ++i) {
struct sc_packet_sink *sink = demuxer->sinks[i];
if (!sink->ops->open(sink, codec)) {
LOGE("Could not open packet sink %d", i);
stream_close_first_sinks(stream, i);
sc_demuxer_close_first_sinks(demuxer, i);
return false;
}
}
Expand All @@ -188,35 +188,35 @@ stream_open_sinks(struct stream *stream, const AVCodec *codec) {
}

static int
run_stream(void *data) {
struct stream *stream = data;
run_demuxer(void *data) {
struct sc_demuxer *demuxer = data;

const AVCodec *codec = avcodec_find_decoder(AV_CODEC_ID_H264);
if (!codec) {
LOGE("H.264 decoder not found");
goto end;
}

stream->codec_ctx = avcodec_alloc_context3(codec);
if (!stream->codec_ctx) {
demuxer->codec_ctx = avcodec_alloc_context3(codec);
if (!demuxer->codec_ctx) {
LOG_OOM();
goto end;
}

if (!stream_open_sinks(stream, codec)) {
LOGE("Could not open stream sinks");
if (!sc_demuxer_open_sinks(demuxer, codec)) {
LOGE("Could not open demuxer sinks");
goto finally_free_codec_ctx;
}

stream->parser = av_parser_init(AV_CODEC_ID_H264);
if (!stream->parser) {
demuxer->parser = av_parser_init(AV_CODEC_ID_H264);
if (!demuxer->parser) {
LOGE("Could not initialize parser");
goto finally_close_sinks;
}

// We must only pass complete frames to av_parser_parse2()!
// It's more complicated, but this allows to reduce the latency by 1 frame!
stream->parser->flags |= PARSER_FLAG_COMPLETE_FRAMES;
demuxer->parser->flags |= PARSER_FLAG_COMPLETE_FRAMES;

AVPacket *packet = av_packet_alloc();
if (!packet) {
Expand All @@ -225,13 +225,13 @@ run_stream(void *data) {
}

for (;;) {
bool ok = stream_recv_packet(stream, packet);
bool ok = sc_demuxer_recv_packet(demuxer, packet);
if (!ok) {
// end of stream
break;
}

ok = stream_push_packet(stream, packet);
ok = sc_demuxer_push_packet(demuxer, packet);
av_packet_unref(packet);
if (!ok) {
// cannot process packet (error already logged)
Expand All @@ -241,58 +241,58 @@ run_stream(void *data) {

LOGD("End of frames");

if (stream->pending) {
av_packet_free(&stream->pending);
if (demuxer->pending) {
av_packet_free(&demuxer->pending);
}

av_packet_free(&packet);
finally_close_parser:
av_parser_close(stream->parser);
av_parser_close(demuxer->parser);
finally_close_sinks:
stream_close_sinks(stream);
sc_demuxer_close_sinks(demuxer);
finally_free_codec_ctx:
avcodec_free_context(&stream->codec_ctx);
avcodec_free_context(&demuxer->codec_ctx);
end:
stream->cbs->on_eos(stream, stream->cbs_userdata);
demuxer->cbs->on_eos(demuxer, demuxer->cbs_userdata);

return 0;
}

void
stream_init(struct stream *stream, sc_socket socket,
const struct stream_callbacks *cbs, void *cbs_userdata) {
stream->socket = socket;
stream->pending = NULL;
stream->sink_count = 0;
sc_demuxer_init(struct sc_demuxer *demuxer, sc_socket socket,
const struct sc_demuxer_callbacks *cbs, void *cbs_userdata) {
demuxer->socket = socket;
demuxer->pending = NULL;
demuxer->sink_count = 0;

assert(cbs && cbs->on_eos);

stream->cbs = cbs;
stream->cbs_userdata = cbs_userdata;
demuxer->cbs = cbs;
demuxer->cbs_userdata = cbs_userdata;
}

void
stream_add_sink(struct stream *stream, struct sc_packet_sink *sink) {
assert(stream->sink_count < STREAM_MAX_SINKS);
sc_demuxer_add_sink(struct sc_demuxer *demuxer, struct sc_packet_sink *sink) {
assert(demuxer->sink_count < SC_DEMUXER_MAX_SINKS);
assert(sink);
assert(sink->ops);
stream->sinks[stream->sink_count++] = sink;
demuxer->sinks[demuxer->sink_count++] = sink;
}

bool
stream_start(struct stream *stream) {
LOGD("Starting stream thread");
sc_demuxer_start(struct sc_demuxer *demuxer) {
LOGD("Starting demuxer thread");

bool ok =
sc_thread_create(&stream->thread, run_stream, "scrcpy-stream", stream);
bool ok = sc_thread_create(&demuxer->thread, run_demuxer, "scrcpy-demuxer",
demuxer);
if (!ok) {
LOGC("Could not start stream thread");
LOGC("Could not start demuxer thread");
return false;
}
return true;
}

void
stream_join(struct stream *stream) {
sc_thread_join(&stream->thread, NULL);
sc_demuxer_join(struct sc_demuxer *demuxer) {
sc_thread_join(&demuxer->thread, NULL);
}
51 changes: 51 additions & 0 deletions app/src/demuxer.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
#ifndef SC_DEMUXER_H
#define SC_DEMUXER_H

#include "common.h"

#include <stdbool.h>
#include <stdint.h>
#include <libavcodec/avcodec.h>
#include <libavformat/avformat.h>

#include "trait/packet_sink.h"
#include "util/net.h"
#include "util/thread.h"

#define SC_DEMUXER_MAX_SINKS 2

struct sc_demuxer {
sc_socket socket;
sc_thread thread;

struct sc_packet_sink *sinks[SC_DEMUXER_MAX_SINKS];
unsigned sink_count;

AVCodecContext *codec_ctx;
AVCodecParserContext *parser;
// successive packets may need to be concatenated, until a non-config
// packet is available
AVPacket *pending;

const struct sc_demuxer_callbacks *cbs;
void *cbs_userdata;
};

struct sc_demuxer_callbacks {
void (*on_eos)(struct sc_demuxer *demuxer, void *userdata);
};

void
sc_demuxer_init(struct sc_demuxer *demuxer, sc_socket socket,
const struct sc_demuxer_callbacks *cbs, void *cbs_userdata);

void
sc_demuxer_add_sink(struct sc_demuxer *demuxer, struct sc_packet_sink *sink);

bool
sc_demuxer_start(struct sc_demuxer *demuxer);

void
sc_demuxer_join(struct sc_demuxer *demuxer);

#endif
Loading

0 comments on commit 7dec225

Please sign in to comment.