Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Realtime Audio Streaming #45

Merged
merged 72 commits into from
May 23, 2023
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
72 commits
Select commit Hold shift + click to select a range
8069b9f
LiveStream skel
kingster May 13, 2020
5355fd6
livestream publish to librtmp endpoint
kingster May 15, 2020
be3c596
segfault trace handler
kingster May 15, 2020
1e7f3f0
commit livestream
kingster May 17, 2020
acd1a25
timestamp tuning
Dec 3, 2020
6faed89
timestamp tuning
Dec 3, 2020
522a2df
livestream + ondemand prototype
Mar 1, 2021
f36f826
minor
Mar 1, 2021
5b83ec7
bug fix
Mar 1, 2021
b2979f2
draft implementaion POP
Mar 30, 2021
b6dcea7
POP implementaion
Apr 4, 2021
06dfc06
formatting
Apr 6, 2021
9717083
Merge branch 'master' into onDemand
shushantsharan Apr 7, 2021
dfb006b
refactoring
Apr 8, 2021
eab2ef2
Merge branch 'onDemand' of https://github.com/voiceip/oreka into onDe…
Apr 8, 2021
3b92ff1
segregating livestreaming server
Apr 12, 2021
7948ea9
typo
Apr 12, 2021
532bb93
New API+ review comments+ bugFixes
Apr 18, 2021
bb4522f
review comments+build file update
Apr 19, 2021
2227c48
fix build failure
Apr 20, 2021
e576f75
dummy commit
Apr 20, 2021
23b584f
dummy commit
Apr 20, 2021
14c5f39
Update Dockerfile.orkaudio.build
kingster Apr 23, 2021
ea5f75b
remove srs_librtmp code, link instead
kingster Apr 23, 2021
e3b59a9
linting change [ci skip]
kingster Apr 23, 2021
f3c691d
cleanup LiveStreamServer
kingster Apr 23, 2021
4dff79b
Fix server segfault with empty maps
kingster Apr 23, 2021
1af4af2
Pass by reference VoIpSessions::getByIpAndPort
kingster Apr 23, 2021
11e1e62
Move filter config within filter space.
kingster Apr 24, 2021
3e98e67
bufferQueue Maxsize Handling
Apr 25, 2021
51de952
refactoring+formatting
Apr 26, 2021
799d4af
missing colon+formatting
Apr 26, 2021
26ef897
Merge branch 'master' into onDemand
kingster May 4, 2021
8cef3f2
Merge branch 'master' into onDemand
kingster May 4, 2021
eb8177c
Update config-linux-template.xml
kingster May 4, 2021
5d0aac7
Trigger Github Action on tag push
kingster May 4, 2021
b942dfe
Update LiveStreamSession.cpp
kingster May 4, 2021
60a300c
Update LiveStreamFilter.cpp
kingster May 4, 2021
f97c268
handle empty call id + stream all call config
May 5, 2021
dfc5d68
formatting
May 5, 2021
1e08471
Improve logging
kingster May 5, 2021
33253ae
Refactor rtmpServerEndpoint
kingster May 5, 2021
075d2d3
Update LiveStreamFilter.cpp
kingster May 5, 2021
0827cd3
Bump boost to boost1.70 in build-docker
kingster May 5, 2021
6bd5cb3
Use boost:stacktrace for sig_handler
kingster May 5, 2021
a93c5ad
Merge branch 'backtrace' into onDemand
kingster May 5, 2021
83acdfe
improve logging
kingster May 5, 2021
a8aa272
fix memory issues
kingster May 6, 2021
74f00a5
Merge branch 'master' into onDemand
kingster May 26, 2021
4e0874a
Add Exception Handlers
kingster May 27, 2021
a77cd59
Create RingBuffer.h
kingster May 27, 2021
bfce7e1
fix auto in lamda not available in c++11
kingster May 27, 2021
acdedc9
Remove set_error_handler
kingster May 27, 2021
1c77ae4
CPPHTTPLIB_USE_POLL
kingster May 27, 2021
c99c707
fix channel sync issue + changed threshold to double
May 27, 2021
59eaad5
rename variable
May 27, 2021
26e9111
Refactor Config + silentChannelBuffer
kingster May 28, 2021
e81e01a
Merge branch 'master' into onDemand
kingster May 28, 2021
aefd93a
free only when != null
kingster May 28, 2021
00bf94b
update readme [skip ci]
kingster May 28, 2021
c6536c2
lint changes [skip ci]
kingster May 28, 2021
02d23fd
Fix configure.in cxx check order
kingster May 28, 2021
aa1e08f
ls: switch to ring buffer
kingster Jun 7, 2021
36a68f3
Remove pjsua from dockerfile and move to build-pjsua.sh
kingster Jun 7, 2021
5104bcc
don't drop 2nd channel when buffer is empty.
kingster Jun 7, 2021
f51e127
Merge branch 'master' into onDemand
kingster Jan 4, 2022
9ec4942
Merge branch 'master' into onDemand
kingster Apr 22, 2022
fd5f03a
Merge branch 'master' into onDemand
kingster May 18, 2023
33acd30
Update LiveStreamServer.cpp
kingster May 23, 2023
74b73c9
pin httplib to release
kingster May 23, 2023
a4b4a0d
Update LiveStreamServer.cpp
kingster May 23, 2023
57a118f
Update LiveStreamServer.cpp
kingster May 23, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 0 additions & 24 deletions orkaudio/OrkAudio.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,6 @@
#include <thread>
#include "apr_signal.h"

#ifdef linux
#include <execinfo.h>
#include <signal.h>
#include <stdlib.h>
#include <unistd.h>
#endif

static volatile bool serviceStop = false;

void StopHandler()
Expand Down Expand Up @@ -401,27 +394,10 @@ void MainThread()
OrkLogManager::Instance()->Shutdown();
}

void handler(int sig) {
#ifdef linux

void *array[10];
size_t size;

// get void*'s for all entries on the stack
size = backtrace(array, 10);

// print out all the frames to stderr
fprintf(stderr, "Error: signal %d:\n", sig);
backtrace_symbols_fd(array, size, STDERR_FILENO);
#endif
exit(1);
}
kingster marked this conversation as resolved.
Show resolved Hide resolved


int main(int argc, char* argv[])
{
signal(SIGSEGV, handler); // install fatal error handler

OrkAprSingleton::Initialize();

// the "service name" reported on the tape messages uses CONFIG.m_serviceName
Expand Down
5 changes: 5 additions & 0 deletions orkaudio/audiocaptureplugins/voip/VoIpSession.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4325,6 +4325,11 @@ void VoIpSessions::ClearLocalPartyMap()
m_localPartyMap.clear();
}

std::map<unsigned long long, VoIpSessionRef> VoIpSessions::getByIpAndPort()
{
return m_byIpAndPort;
}

void VoIpSessions::StopAll()
{
time_t forceExpiryTime = time(NULL) + 2*DLLCONFIG.m_rtpSessionOnHoldTimeOutSec;
Expand Down
2 changes: 1 addition & 1 deletion orkaudio/audiocaptureplugins/voip/VoIpSession.h
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,7 @@ class VoIpSessions: public OrkSingleton<VoIpSessions>
void TaggingSipTransferCalls(VoIpSessionRef& session);
void CopyMetadataToNewSession(VoIpSessionRef& oldSession, VoIpSessionRef& newSession);
void ClearLocalPartyMap();
std::map<unsigned long long, VoIpSessionRef> getByIpAndPort();

private:
void CraftMediaAddress(CStdString& mediaAddress, struct in_addr ipAddress, unsigned short udpPort);
Expand All @@ -270,7 +271,6 @@ class VoIpSessions: public OrkSingleton<VoIpSessions>
bool SkinnyFindMostLikelySessionForRtp(RtpPacketInfoRef& rtpPacket, VoIpEndpointInfoRef&);
bool SkinnyFindMostLikelySessionForRtpBehindNat(RtpPacketInfoRef& rtpPacket);
void TrySessionCallPickUp(CStdString replacesCallId, bool& result);

std::map<unsigned long long, VoIpSessionRef> m_byIpAndPort;
std::map<CStdString, VoIpSessionRef> m_byCallId;
std::map<unsigned long long, VoIpEndpointInfoRef> m_endpoints;
Expand Down
6 changes: 4 additions & 2 deletions orkaudio/config-linux-template.xml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
<TrackerHostname>localhost</TrackerHostname>
<TrackerTcpPort>8080</TrackerTcpPort>

<CapturePortFilters>LiveMonitoring</CapturePortFilters>
<CapturePortFilters>LiveStreamFilter</CapturePortFilters>
<TapeProcessors>BatchProcessing, Reporting</TapeProcessors>

<BatchProcessingEnhancePriority>true</BatchProcessingEnhancePriority>
Expand Down Expand Up @@ -64,7 +64,7 @@

<!-- Use this if you want to force capture from a given list of devices. -->
<!-- All available devices are listed in orkaudio.log when the service is starting -->
<!--<Devices>eth1, eth2</Devices>-->
<Devices>enp0s3</Devices>

<!--<PcapFilter>net 217.14.0.0/16 or host 10.0.0.1</PcapFilter>-->

Expand Down Expand Up @@ -108,4 +108,6 @@
<!-- End of Available Configurations for Mitel Communications Platform -->

</VoIpPlugin>
<RTMPServerEndPoint>172.16.176.65</RTMPServerEndPoint>
<RTMPServerPort>1935</RTMPServerPort>
</config>
2 changes: 1 addition & 1 deletion orkaudio/configure.in
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ AC_PREFIX_DEFAULT(/usr)

AC_SUBST(speex_lib,$speex_lib)

AC_OUTPUT(Makefile audiocaptureplugins/Makefile audiocaptureplugins/generator/Makefile audiocaptureplugins/voip/Makefile filters/Makefile filters/rtpmixer/Makefile filters/silkcodec/Makefile filters/g729codec/Makefile)
AC_OUTPUT(Makefile audiocaptureplugins/Makefile audiocaptureplugins/generator/Makefile audiocaptureplugins/voip/Makefile filters/Makefile filters/rtpmixer/Makefile filters/silkcodec/Makefile filters/g729codec/Makefile filters/LiveStream/Makefile)

echo ""
echo "========= Configuration ==========="
Expand Down
250 changes: 250 additions & 0 deletions orkaudio/filters/LiveStream/LiveStreamFilter.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,250 @@
/*
* Oreka -- A media capture and retrieval platform
*
*/
#pragma warning(disable : 4786) // disables truncated symbols in browse-info warning

#define _WINSOCKAPI_ // prevents the inclusion of winsock.h

#include "LiveStreamFilter.h"

static log4cxx::LoggerPtr s_log = log4cxx::Logger::getLogger("plugin.livestream");

LiveStreamFilter::LiveStreamFilter()
{
LOG4CXX_INFO(s_log, "LiveStream New Instance Created");
}

LiveStreamFilter::~LiveStreamFilter()
{
LOG4CXX_INFO(s_log, "LiveStream Instance Destroying");
}

FilterRef LiveStreamFilter::Instanciate()
{
FilterRef Filter(new LiveStreamFilter());
return Filter;
}

void LiveStreamFilter::AudioChunkIn(AudioChunkRef &inputAudioChunk)
{
m_outputAudioChunk = inputAudioChunk;
// int16_t pcmdata[BUFFER_SAMPLES];
CStdString logMsg;

if (inputAudioChunk.get() == NULL)
{
return;
}

if (inputAudioChunk->GetNumSamples() == 0)
{
return;
}

AudioChunkDetails outputDetails = *inputAudioChunk->GetDetails();
char *inputBuffer = (char *)inputAudioChunk->m_pBuffer;
int size = outputDetails.m_numBytes * 2;

//logMsg.Format("LiveStreamFilter AudioChunkIn Size: %d, Encoding: %s , RTP payload type: %s",size ,toString(outputDetails.m_encoding) , RtpPayloadTypeEnumToString(outputDetails.m_rtpPayloadType));
//LOG4CXX_INFO(s_log, logMsg);

// @param sound_format Format of SoundData. The following values are defined:
// 0 = Linear PCM, platform endian
// 1 = ADPCM
// 2 = MP3
// 3 = Linear PCM, little endian
// 4 = Nellymoser 16 kHz mono
// 5 = Nellymoser 8 kHz mono
// 6 = Nellymoser
// 7 = G.711 A-law logarithmic PCM
// 8 = G.711 mu-law logarithmic PCM
// 9 = reserved
// 10 = AAC
// 11 = Speex
// 14 = MP3 8 kHz
// 15 = Device-specific sound
// Formats 7, 8, 14, and 15 are reserved.
// AAC is supported in Flash Player 9,0,115,0 and higher.
// Speex is supported in Flash Player 10 and higher.

char sound_format = 9;
if (outputDetails.m_rtpPayloadType == pt_PCMU)
sound_format = 8;
else if (outputDetails.m_rtpPayloadType == pt_PCMA)
sound_format = 7;

// @param sound_rate Sampling rate. The following values are defined:
// 0 = 5.5 kHz
// 1 = 11 kHz
// 2 = 22 kHz
// 3 = 44 kHz
char sound_rate = 3;

// @param sound_size Size of each audio sample. This parameter only pertains to
// uncompressed formats. Compressed formats always decode
// to 16 bits internally.
// 0 = 8-bit samples
// 1 = 16-bit samples
char sound_size = 1;

// @param sound_type Mono or stereo sound
// 0 = Mono sound
// 1 = Stereo sound
//char sound_type = outputDetails.m_channel == 0 ? 0 : 1;
char sound_type = 1;

timestamp += 160; //Timestamp increment = clock frequency/frame rate
//160 byte payload of G.711 has a packetization interval of 20 ms
//For 1 second, there will be 1000ms / 20ms = 50 frames
//Audio RTP packet timestamp incremental value = 8kHz / 50 = 8000Hz / 50 = 160

if (isFirstPacket)
{
headChannel = outputDetails.m_channel;
isFirstPacket = false;
}

if (outputDetails.m_channel == headChannel && status)
{
bufferQueue.push(inputBuffer);
shushantsharan marked this conversation as resolved.
Show resolved Hide resolved
}

if (rtmp != NULL && status)
{
if (outputDetails.m_channel != headChannel && bufferQueue.size() > 0)
{
char *outputBuffer = (char *)malloc(size);
char *tempBuffer = bufferQueue.front();
bufferQueue.pop();

for (int i = 0; i < 160; ++i)
{
outputBuffer[i * 2] = tempBuffer[i];
outputBuffer[i * 2 + 1] = inputBuffer[i];
}

if (srs_audio_write_raw_frame(rtmp, sound_format, sound_rate, sound_size, sound_type, outputBuffer, size, timestamp) != 0)
{
srs_human_trace("send audio raw data failed.");
return;
}
CStdString logMsg;
logMsg.Format("LiveStreamFilter::sent packet: type=%s, time=%d, size=%d, codec=%d, rate=%d, sample=%d, channel=%d nativecallId=%s",
srs_human_flv_tag_type2string(SRS_RTMP_TYPE_AUDIO), timestamp, size, sound_format, sound_rate, sound_size, sound_type, m_callId);
LOG4CXX_DEBUG(s_log, logMsg);
}
}
}

void LiveStreamFilter::AudioChunkOut(AudioChunkRef &chunk)
{
chunk = m_outputAudioChunk;
}

AudioEncodingEnum LiveStreamFilter::GetInputAudioEncoding()
{
return UnknownAudio;
}

AudioEncodingEnum LiveStreamFilter::GetOutputAudioEncoding()
{
return UnknownAudio;
}

CStdString LiveStreamFilter::GetName()
{
return "LiveStreamFilter";
}

bool LiveStreamFilter::SupportsInputRtpPayloadType(int rtpPayloadType)
{
//so that BatchProcessing doesn't pick this filter.
return rtpPayloadType == pt_Unknown;
}

void LiveStreamFilter::CaptureEventIn(CaptureEventRef &event)
{
//Start RTP Stream Open
auto key = event->EventTypeToString(event->m_type);
LOG4CXX_INFO(s_log, "LiveStream CaptureEventIn " + key + " : " + event->m_value);
if (event->m_type == CaptureEvent::EventTypeEnum::EtCallId)
{
m_callId = event->m_value;
}

if (event->m_type == CaptureEvent::EventTypeEnum::EtKeyValue && event->m_key == "LiveStream" && event->m_value == "start")
{
std::string url = "rtmp://" + CONFIG.m_rtmpServerEndpoint + ":" + CONFIG.m_rtmpServerPort + "/live/" + m_callId;
shushantsharan marked this conversation as resolved.
Show resolved Hide resolved

LOG4CXX_INFO(s_log, "LiveStream URL : " + url);
//open rstp stream
rtmp = srs_rtmp_create(url.c_str());
if (srs_rtmp_handshake(rtmp) != 0)
{
srs_human_trace("simple handshake failed.");
return;
}
srs_human_trace("simple handshake success");

if (srs_rtmp_connect_app(rtmp) != 0)
{
srs_human_trace("connect vhost/app failed.");
return;
}
srs_human_trace("connect vhost/app success");

if (srs_rtmp_publish_stream(rtmp) != 0)
{
srs_human_trace("publish stream failed.");
return;
}
srs_human_trace("publish stream success");

status = true;
LiveStreamSessionsSingleton::instance()->AddToStreamCallList(m_callId);
}

if (event->m_type == CaptureEvent::EventTypeEnum::EtStop)
{
//close rstp stream
status = false;
LiveStreamSessionsSingleton::instance()->RemoveFromStreamCallList(m_callId);
if (rtmp != NULL)
{
srs_human_trace("stream detroying...");
srs_rtmp_destroy(rtmp);
}
}

if (event->m_type == CaptureEvent::EventTypeEnum::EtKeyValue && event->m_key == "LiveStream" && event->m_value == "end")
{
LiveStreamSessionsSingleton::instance()->RemoveFromStreamCallList(m_callId);
status = false;
}
}

void LiveStreamFilter::CaptureEventOut(CaptureEventRef &event)
{
//LOG4CXX_INFO(s_log, "LiveStream CaptureEventOut " + toString(event.get()));
}

void LiveStreamFilter::SetSessionInfo(CStdString &trackingId)
{
LOG4CXX_INFO(s_log, "LiveStream SetSessionInfo " + trackingId);
}

// =================================================================

extern "C"
{
DLL_EXPORT void __CDECL__ OrkInitialize()
{
LOG4CXX_INFO(s_log, "LiveStream filter starting");
FilterRef filter(new LiveStreamFilter());
FilterRegistry::instance()->RegisterFilter(filter);
LOG4CXX_INFO(s_log, "LiveStream filter initialized");
LiveStreamServer *liveStreamServer = new LiveStreamServer(CONFIG.m_liveStreamingServerPort);
liveStreamServer->Run();
}
}
Loading