Skip to content

Commit

Permalink
client: skip unsupported tracks (#214)
Browse files Browse the repository at this point in the history
* Use supported tracks where possible

Fix debug message

* add tests

---------

Co-authored-by: ingalls <[email protected]>
  • Loading branch information
aler9 and ingalls authored Jan 21, 2025
1 parent 62e427b commit b08e07e
Show file tree
Hide file tree
Showing 2 changed files with 145 additions and 6 deletions.
17 changes: 11 additions & 6 deletions client_stream_processor_mpegts.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,19 +150,24 @@ func (p *clientStreamProcessorMPEGTS) initializeReader(ctx context.Context, firs
p.onDecodeError(err)
})

var supportedTracks []*mpegts.Track

for _, track := range p.reader.Tracks() {
switch track.Codec.(type) {
case *mpegts.CodecH264, *mpegts.CodecMPEG4Audio:
default:
return fmt.Errorf("unsupported codec: %T", track.Codec)
supportedTracks = append(supportedTracks, track)
}
}

leadingTrackID := mpegtsPickLeadingTrack(p.reader.Tracks())
if len(supportedTracks) == 0 {
return fmt.Errorf("no supported tracks found")
}

leadingTrackID := mpegtsPickLeadingTrack(supportedTracks)

tracks := make([]*Track, len(p.reader.Tracks()))
tracks := make([]*Track, len(supportedTracks))

for i, mpegtsTrack := range p.reader.Tracks() {
for i, mpegtsTrack := range supportedTracks {
tracks[i] = &Track{
Codec: codecs.FromMPEGTS(mpegtsTrack.Codec),
ClockRate: 90000,
Expand All @@ -179,7 +184,7 @@ func (p *clientStreamProcessorMPEGTS) initializeReader(ctx context.Context, firs
return fmt.Errorf("terminated")
}

for i, mpegtsTrack := range p.reader.Tracks() {
for i, mpegtsTrack := range supportedTracks {
track := p.clientStreamTracks[i]
isLeadingTrack := (i == leadingTrackID)
var trackProc *clientTrackProcessorMPEGTS
Expand Down
134 changes: 134 additions & 0 deletions client_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package gohlslib

import (
"bytes"
"context"
"crypto/tls"
"io"
Expand All @@ -11,6 +12,7 @@ import (
"testing"
"time"

"github.com/asticode/go-astits"
"github.com/bluenviron/gohlslib/v2/pkg/codecs"
"github.com/bluenviron/mediacommon/pkg/codecs/h264"
"github.com/bluenviron/mediacommon/pkg/codecs/mpeg4audio"
Expand Down Expand Up @@ -1052,6 +1054,138 @@ func TestClient(t *testing.T) {
}
}

func TestClientUnsupportedTracks(t *testing.T) {
httpServ := &http.Server{
Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.Method == http.MethodGet && r.URL.Path == "/stream.m3u8" {
w.Header().Set("Content-Type", `application/vnd.apple.mpegurl`)
w.Write([]byte("#EXTM3U\n" +
"#EXT-X-VERSION:3\n" +
"#EXT-X-ALLOW-CACHE:NO\n" +
"#EXT-X-TARGETDURATION:2\n" +
"#EXT-X-MEDIA-SEQUENCE:2\n" +
"#EXTINF:2,\n" +
"segment1.ts\n" +
"#EXTINF:2,\n" +
"segment1.ts\n" +
"#EXTINF:2,\n" +
"segment1.ts\n"))
} else if r.Method == http.MethodGet && r.URL.Path == "/segment1.ts" {
w.Header().Set("Content-Type", `video/MP2T`)

var buf bytes.Buffer
mux := astits.NewMuxer(context.Background(), &buf)

err := mux.AddElementaryStream(astits.PMTElementaryStream{
ElementaryPID: 120,
StreamType: astits.StreamTypeH264Video,
})
require.NoError(t, err)

err = mux.AddElementaryStream(astits.PMTElementaryStream{
ElementaryPID: 121,
StreamType: astits.StreamTypeDIRACVideo,
})
require.NoError(t, err)

mux.SetPCRPID(120)

data, err := h264.AnnexBMarshal([][]byte{
{7}, // SPS
{8}, // PPS
{5}, // IDR
})
require.NoError(t, err)

_, err = mux.WriteData(&astits.MuxerData{
PID: 120,
AdaptationField: &astits.PacketAdaptationField{
RandomAccessIndicator: true,
},
PES: &astits.PESData{
Header: &astits.PESHeader{
OptionalHeader: &astits.PESOptionalHeader{
MarkerBits: 2,
PTSDTSIndicator: astits.PTSDTSIndicatorOnlyPTS,
PTS: &astits.ClockReference{Base: 0},
},
StreamID: 222,
},
Data: data,
},
})
require.NoError(t, err)

_, err = mux.WriteData(&astits.MuxerData{
PID: 121,
AdaptationField: &astits.PacketAdaptationField{
RandomAccessIndicator: true,
},
PES: &astits.PESData{
Header: &astits.PESHeader{
OptionalHeader: &astits.PESOptionalHeader{
MarkerBits: 2,
PTSDTSIndicator: astits.PTSDTSIndicatorOnlyPTS,
PTS: &astits.ClockReference{Base: 0},
},
StreamID: 222,
},
Data: []byte{1, 2, 3, 4},
},
})
require.NoError(t, err)

w.Write(buf.Bytes())
}
}),
}

ln, err := net.Listen("tcp", "localhost:5780")
require.NoError(t, err)

go httpServ.Serve(ln)
defer httpServ.Shutdown(context.Background())

tr := &http.Transport{}
defer tr.CloseIdleConnections()

recv := make(chan struct{})

var c *Client
c = &Client{
URI: "http://localhost:5780/stream.m3u8",
HTTPClient: &http.Client{Transport: tr},
OnTracks: func(tracks []*Track) error {
require.Equal(t, []*Track{{
Codec: &codecs.H264{},
ClockRate: 90000,
}}, tracks)

c.OnDataH26x(tracks[0], func(_, _ int64, au [][]byte) {
select {
case <-recv:
return
default:
}
require.Equal(t, [][]byte{{7}, {8}, {5}}, au)
close(recv)
})
return nil
},
}
require.NoError(t, err)

err = c.Start()
require.NoError(t, err)

<-recv

err = <-c.Wait()
require.EqualError(t, err, "next segment not found or not ready yet")

c.Close()
}

func TestClientErrorInvalidSequenceID(t *testing.T) {
first := true

Expand Down

0 comments on commit b08e07e

Please sign in to comment.