diff --git a/Dockerfile b/Dockerfile index 92e539d..0de5d81 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,4 @@ -FROM pedromol/catoso-base +FROM pedromol/catoso:base COPY cmd /go/src/app/cmd COPY pkg /go/src/app/pkg diff --git a/go.work.sum b/go.work.sum new file mode 100644 index 0000000..ba5e7ee --- /dev/null +++ b/go.work.sum @@ -0,0 +1 @@ +github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo= diff --git a/pkg/catoso/catoso.go b/pkg/catoso/catoso.go index 36839e1..96a649a 100644 --- a/pkg/catoso/catoso.go +++ b/pkg/catoso/catoso.go @@ -13,6 +13,7 @@ import ( "github.com/pedromol/catoso/pkg/camera" "github.com/pedromol/catoso/pkg/config" "github.com/pedromol/catoso/pkg/encoder" + "github.com/pedromol/catoso/pkg/health" "github.com/pedromol/catoso/pkg/storage" "github.com/pedromol/catoso/pkg/telegram" "github.com/pedromol/catoso/pkg/vision" @@ -45,7 +46,7 @@ func NewCatoso(cfg *config.Config) (*Catoso, error) { cam := camera.NewCamera(cfg.OnvifIP, cfg.OnvifPort) - enc := encoder.NewEncoder(cfg.InputImage, cfg.InputFps) + enc := encoder.NewEncoder(cfg.InputImage, cfg.InputFps, cfg.InputProtocol) w, h, err := enc.GetVideoSize() if err != nil { return nil, err @@ -74,7 +75,7 @@ func NewCatoso(cfg *config.Config) (*Catoso, error) { draw := false if cfg.DrawOverFace != "" { - debug = true + draw = true } vis := vision.NewVision(cfg.CascadePath, w, h, delay, fskip, debug, draw) @@ -82,8 +83,13 @@ func NewCatoso(cfg *config.Config) (*Catoso, error) { var st *vision.Stream if cfg.StreamPort != "" { st = vision.NewStream() + h, err := health.NewHealth(cfg.InputImage) + if err != nil { + return nil, err + } http.Handle("/", st) + http.Handle("/health", h) server := &http.Server{ Addr: "0.0.0.0:" + cfg.StreamPort, diff --git a/pkg/config/config.go b/pkg/config/config.go index 69c2e98..fde1170 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -11,7 +11,11 @@ type Config struct { OnvifIP string `json:"onvifIP"` OnvifPort string `json:"onvifPort"` InputImage string `json:"inputImage"` + InputProtocol string `json:"inputProtocol"` InputFps string `json:"inputFps"` + InputPrefix string `json:"inputPrefix"` + InputHost string `json:"inputHost"` + InputSuffix string `json:"inputSuffix"` CascadePath string `json:"cascadePath"` CenterCamera string `json:"centerCamera"` CatosoDebug string `json:"catosoDebug"` @@ -33,10 +37,14 @@ func NewConfig() (*Config, error) { OnvifIP: os.Getenv("ONVIF_IP"), OnvifPort: os.Getenv("ONVIF_PORT"), InputImage: os.Getenv("INPUT_IMAGE"), + InputProtocol: os.Getenv("INPUT_PROTOCOL"), CascadePath: os.Getenv("CASCADE_PATH"), CenterCamera: os.Getenv("CENTER_CAMERA"), CatosoDebug: os.Getenv("CATOSO_DEBUG"), InputFps: os.Getenv("INPUT_FPS"), + InputPrefix: os.Getenv("INPUT_PREFIX"), + InputHost: os.Getenv("INPUT_HOST"), + InputSuffix: os.Getenv("INPUT_SUFFIX"), StreamPort: os.Getenv("STREAM_PORT"), OutputFrameSkip: os.Getenv("OUTPUT_FRAMESKIP"), DelayAfterDetectMin: os.Getenv("DELAY_AFTER_DETECT_MIN"), @@ -48,6 +56,9 @@ func NewConfig() (*Config, error) { BucketSecret: os.Getenv("BUCKET_SECRET"), } + if cfg.InputImage == "" { + cfg.InputImage = cfg.InputPrefix + cfg.InputHost + cfg.InputSuffix + } if cfg.InputImage == "" { return nil, errors.New("missing INPUT_IMAGE env") } diff --git a/pkg/encoder/encoder.go b/pkg/encoder/encoder.go index a8c0665..01205bb 100644 --- a/pkg/encoder/encoder.go +++ b/pkg/encoder/encoder.go @@ -20,19 +20,24 @@ type VideoInfo struct { } type Encoder struct { - InputImage string - Fps string + InputImage string + InputProtocol string + Fps string } -func NewEncoder(input string, fps string) *Encoder { +func NewEncoder(input string, fps string, protocol string) *Encoder { + if protocol == "" { + protocol = "tcp" + } return &Encoder{ - InputImage: input, - Fps: fps, + InputImage: input, + InputProtocol: protocol, + Fps: fps, } } func (h Encoder) GetVideoSize() (int, int, error) { - data, err := ffmpeg.ProbeWithTimeout(h.InputImage, time.Duration(time.Second*30), ffmpeg.KwArgs{"rtsp_transport": "tcp"}) + data, err := ffmpeg.ProbeWithTimeout(h.InputImage, time.Duration(time.Second*30), ffmpeg.KwArgs{"rtsp_transport": h.InputProtocol}) if err != nil { return 0, 0, err } @@ -58,7 +63,7 @@ func (h Encoder) ReadStream(ctx context.Context, stdout io.WriteCloser, stderr i } else { output = ffmpeg.KwArgs{"format": "rawvideo", "pix_fmt": "rgb24"} } - return ffmpeg.Input(h.InputImage, ffmpeg.KwArgs{"rtsp_transport": "tcp"}). + return ffmpeg.Input(h.InputImage, ffmpeg.KwArgs{"rtsp_transport": h.InputProtocol}). Output("pipe:", output). WithOutput(stdout). WithErrorOutput(stderr). diff --git a/pkg/health/health.go b/pkg/health/health.go new file mode 100644 index 0000000..8f8f67c --- /dev/null +++ b/pkg/health/health.go @@ -0,0 +1,59 @@ +package health + +import ( + "bytes" + "errors" + "net" + "net/http" + "regexp" +) + +type Health struct { + Url string + Host string +} + +func NewHealth(url string) (*Health, error) { + s := regexp.MustCompile(`^[a-z]+://([^:]+):([0-9]+)`).FindStringSubmatch(url) + if len(s) != 3 { + return nil, errors.New("invalid url") + } + return &Health{ + Url: url, + Host: s[1] + ":" + s[2], + }, nil +} + +func (h *Health) ServeHTTP(w http.ResponseWriter, r *http.Request) { + c, err := net.Dial("tcp", h.Host) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + w.Write([]byte(err.Error())) + return + } + defer c.Close() + + _, err = c.Write([]byte("OPTIONS * RTSP/1.0\r\nCSeq: 1\r\n\r\n")) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + w.Write([]byte(err.Error())) + return + } + + buff := []byte{} + for { + message := make([]byte, 1024) + n, err := c.Read(message) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + w.Write([]byte(err.Error())) + } + buff = append(buff, message[:n]...) + if bytes.Contains(message[:n], []byte("\r\n\r\n")) { + break + } + } + + w.WriteHeader(http.StatusOK) + w.Write(buff) +}