Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: return ffmpeg error logs to caller, and fix StreamingStatus #254

Merged
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .golangci.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
linters:
disable:
disable: []
enable:
- gosec
40 changes: 22 additions & 18 deletions internal/driver/device.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@
package driver

import (
"context"
"fmt"
"github.com/edgexfoundry/go-mod-core-contracts/v3/clients/logger"
"github.com/edgexfoundry/go-mod-core-contracts/v3/errors"
"reflect"
"regexp"
"strings"
Expand All @@ -32,13 +33,12 @@ var (
)

type Device struct {
lc logger.LoggingClient
name string
path string
serialNumber string
rtspUri string
transcoder *transcoder.Transcoder
ctx context.Context
cancelFunc context.CancelFunc
autoStreaming bool
mutex sync.Mutex
streamingStatus streamingStatus
Expand All @@ -57,30 +57,34 @@ type streamingStatus struct {
OutputVideoQuality string
}

func (dev *Device) StartStreaming(ctx context.Context, cancel context.CancelFunc) (<-chan error, error) {
func (dev *Device) StartStreaming() (<-chan error, error) {
dev.mutex.Lock()
defer dev.mutex.Unlock()
if dev.streamingStatus.IsStreaming {
isStreaming := dev.streamingStatus.IsStreaming
dev.mutex.Unlock()
if isStreaming {
return nil, fmt.Errorf("video streaming is already in progress")
}
dev.ctx = ctx
dev.cancelFunc = cancel
errChan := dev.transcoder.Run(false)
dev.streamingStatus.IsStreaming = true

dev.lc.Infof("Attempting to start streaming device %s", dev.name)
errChan, err := dev.runTranscoderWithOutput()
if err != nil {
wrappedErr := errors.NewCommonEdgeX(errors.KindServerError, "failed running ffmpeg transcoder for device "+dev.name, err)
return nil, wrappedErr
}
return errChan, nil
}

func (dev *Device) StopStreaming(err error) {
func (dev *Device) StopStreaming() {
dev.mutex.Lock()
defer dev.mutex.Unlock()
if err != nil {
dev.streamingStatus.Error = err.Error()
} else {
dev.streamingStatus.Error = ""
if !dev.streamingStatus.IsStreaming {
return
}
if dev.streamingStatus.IsStreaming {
dev.cancelFunc()
dev.streamingStatus.IsStreaming = false

dev.lc.Debugf("Stopping transcoder for device %s", dev.name)
if err := dev.transcoder.Stop(); err != nil {
dev.lc.Errorf("Failed to stop video streaming transcoder for device %s, error: %s", dev.name, err)
return
}
}

Expand Down
57 changes: 24 additions & 33 deletions internal/driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ func (d *Driver) Start() error {
}

func (d *Driver) StartRTSPCredentialServer() {
d.lc.Infof("Starting rtsp server")
d.lc.Infof("Starting rtsp authentication server on %s", d.rtspAuthenticationServerUri)
defer d.wg.Done()

router := mux.NewRouter()
Expand Down Expand Up @@ -347,11 +347,10 @@ func (d *Driver) HandleWriteCommands(deviceName string, protocols map[string]mod
return errors.NewCommonEdgeXWrapper(edgexErr)
}
case VideoStopStreaming:
device.StopStreaming(nil)
device.StopStreaming()
default:
return errors.NewCommonEdgeX(errors.KindContractInvalid, fmt.Sprintf("unsupported command %s", command), nil)
}
go d.publishStreamingStatus(device)
}

return nil
Expand All @@ -376,7 +375,7 @@ func (d *Driver) Stop(force bool) error {

for _, device := range d.activeDevices {
go func(device *Device) {
device.StopStreaming(nil)
device.StopStreaming()
d.wg.Done()
}(device)
}
Expand Down Expand Up @@ -452,7 +451,7 @@ func (d *Driver) RemoveDevice(deviceName string, protocols map[string]models.Pro
d.mutex.Lock()
defer d.mutex.Unlock()
if device, ok := d.activeDevices[deviceName]; ok {
device.StopStreaming(nil)
device.StopStreaming()
delete(d.activeDevices, deviceName)
d.lc.Debugf("Device %s is removed", deviceName)
}
Expand Down Expand Up @@ -627,6 +626,7 @@ func (d *Driver) newDevice(name string, protocols map[string]models.ProtocolProp
}

return &Device{
lc: d.lc,
name: name,
path: fdPath,
serialNumber: sn,
Expand Down Expand Up @@ -676,41 +676,32 @@ func (d *Driver) getDevice(name string) (*Device, errors.EdgeX) {
}

func (d *Driver) startStreaming(device *Device) errors.EdgeX {
ctx, cancel := context.WithCancel(context.TODO())
errChan, err := device.StartStreaming(ctx, cancel)
errChan, err := device.StartStreaming()
if err != nil {
return errors.NewCommonEdgeX(errors.KindServerError, fmt.Sprintf(
"failed to start video streaming for device %s", device.name), err)
}
startErrs := make(chan errors.EdgeX, 1)
d.wg.Add(1)
go func() {
select {
case err := <-errChan:
device.StopStreaming(err)
d.lc.Errorf("the video streaming process for device %s has stopped", device.name)
startErrs <- errors.NewCommonEdgeX(errors.KindServerError, fmt.Sprintf("the video streaming process for device %s has stopped, error: %s", device.name, err), err)
d.wg.Done()
return
case <-device.ctx.Done():
if err := device.transcoder.Stop(); err != nil {
d.lc.Errorf("failed to stop video streaming for device %s, error: %s", device.name, err)
d.wg.Done()
return
}
d.lc.Debugf("the video streaming process for device %s has stopped", device.name)
d.wg.Done()
return
}

defer func() {
go d.publishStreamingStatus(device)
}()
for {
select {
case <-time.After(time.Second):
d.lc.Infof("start video streaming for device %s", device.name)

// wait a little bit before returning to see if there are any errors on startup
select {
case <-time.After(time.Second):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sometimes, one second is not enough for FFmpeg to complete the execution of the start streaming command. This could be the root cause of the issue @vyshali-chitikeshi encountered.
I tried extending it to 3 seconds, and then I was able to receive the expected 500 status code when the streaming failed to start.

However, defining how long to wait seems challenging as it may depend on machine performance. We also need to consider the HTTP request timeout, which is 5 seconds by default. https://github.com/edgexfoundry/edgex-go/blob/3d7959681c73ad1dd76784b2336248bd6b6384c3/cmd/core-common-config-bootstrapper/res/configuration.yaml#L32

Copy link
Contributor Author

@ajcasagrande ajcasagrande May 24, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, originally I had made this to be 2 seconds (instead of the current 1 second) when I started this work, but I dropped it back down to 1 as I noticed my typical error response was like 400-600ms. However, I think that may be based on what the actual error was (eg. bad argument, rtsp server offline, etc), and like you said, system performance. In theory we could make this configurable as well.

I had an idea that we could use something in the output stream to determine if the streaming has started ok, but was not sure what we could parse as a potential indicator of that. I know enabling progress logs would allow that, but doesn't seem worth it for just grabbing beginning of stream confirmation.

The last idea is that we return right away, and expect the user to monitor StreamingStatus endpoint for any errors, since as @vyshali-chitikeshi has validated, that always contains the error when the api returned success too early.

d.lc.Infof("Video streaming for device %s has started without error", device.name)
go func() {
// wait for the process to complete in the background and then publish the streaming status
<-errChan
d.publishStreamingStatus(device)
}()
return nil
case startErr := <-errChan:
if startErr == nil {
return nil
case startErr := <-startErrs:
return startErr
}
return errors.NewCommonEdgeX(errors.KindServerError,
fmt.Sprintf("the video streaming process for device %s has stopped", device.name), startErr)
}
}

Expand Down
14 changes: 14 additions & 0 deletions internal/driver/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,26 @@ package driver

import (
"fmt"
"regexp"

"github.com/edgexfoundry/go-mod-core-contracts/v3/errors"
)

const (
redactedStr = "//<redacted>@"
)

var (
userPassRegex = regexp.MustCompile(`//(\S+):(\S+)@`)
)

type EdgeXErrorWrapper struct{}

func (e EdgeXErrorWrapper) CommandError(command string, err error) errors.EdgeX {
return errors.NewCommonEdgeX(errors.KindServerError, fmt.Sprintf("failed to execute %s command", command), err)
}

// redact removes all instances of basic auth (ie. rtsp://username:password@server) from a url
func redact(val string) string {
return userPassRegex.ReplaceAllString(val, redactedStr)
}
179 changes: 179 additions & 0 deletions internal/driver/transcoder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
// -*- Mode: Go; indent-tabs-mode: t -*-
//
// Copyright (C) 2023 Intel Corporation
//
// SPDX-License-Identifier: Apache-2.0

package driver

import (
"bufio"
"bytes"
"fmt"
"github.com/edgexfoundry/go-mod-core-contracts/v3/models"
"os/exec"
"strings"
)

const (
maxStderrLines = 25
ffmpegLogLevel = "info"
)

// runTranscoderWithOutput is based on transcoder.Transcoder.Run(), but tweaks a few things, and adds some
// quality of life improvements for the end user. It starts the transcoding process while also logging the ffmpeg
// output (from StdErr). StdErr text is also returned via the done error channel, so that it can be returned
// to the caller of a REST API. If an error occurs starting the process, it is returned immediately, and not
// via the error channel.
func (dev *Device) runTranscoderWithOutput() (<-chan error, error) {
dev.mutex.Lock()
defer dev.mutex.Unlock()

t := dev.transcoder

// generate the ffmpeg command line options, and prepend with some pre-defined options
// -loglevel level+<ffmpegLogLevel>: will set the log level to ffmpegLogLevel and prefix output with the log level (for parsing)
command := append([]string{"-loglevel", "level+" + ffmpegLogLevel}, t.GetCommand()...)
if dev.lc.LogLevel() != models.TraceLog {
// disable progress output if trace logging is not enabled
command = append([]string{"-nostats"}, command...)
}
// -rtsp_transport tcp: force the rtsp transport to use tcp
// these args must be put in the output section and not the first args, so just inject them right before the last
// arg which is the rtsp url.
command = append(command[0:len(command)-1], "-rtsp_transport", "tcp", command[len(command)-1])
ffmpegBin := t.FFmpegExec()
proc := exec.Command(ffmpegBin, command...)

// Set the stdinPipe in case we need to stop the transcoding
stdinPipe, err := proc.StdinPipe()
if err != nil {
dev.lc.Errorf("Ffmpeg Stdin not available: %s", err.Error())
}

var stdErrLines []string
stdErrPipe, err := proc.StderrPipe()
if err != nil {
dev.lc.Errorf("Ffmpeg StderrPipe not available: %s. Unable to track output from process.", err.Error())
} else {
output := make(chan string, 10)
// use a scanner to read the output of the pipe and send it to output channel
go func() {
defer close(output)
scanner := bufio.NewScanner(stdErrPipe)
scanner.Split(scanFFmpegLines)
scanner.Buffer(make([]byte, 2), bufio.MaxScanTokenSize)

for scanner.Scan() {
// Scan the next line, redact it, and send it to output channel.
output <- redact(scanner.Text())
}
dev.lc.Debugf("Output scanner complete for transcoder for device %s", dev.name)
}()

// keep track of stdErr text, so it can be returned to the caller via done channel
go func() {
for line := range output {
// cap the size so that way the memory usage does not grow on commands with lots of output
if len(stdErrLines) >= maxStderrLines {
continue
}

line = strings.Trim(line, " ")
if len(line) == 0 {
continue // skip blank lines
}

// log the line at specific level depending on the content
if strings.Contains(line, "[error]") || strings.Contains(line, "[fatal]") {
stdErrLines = append(stdErrLines, line)
dev.lc.Errorf("%s transcoder: %s", dev.name, line)
} else if strings.Contains(line, "[warning]") {
stdErrLines = append(stdErrLines, line)
dev.lc.Warnf("%s transcoder: %s", dev.name, line)
} else {
// log everything else as debug, as ffmpeg info messages are just debug data to us
dev.lc.Debugf("%s transcoder: %s", dev.name, line)
}
}
dev.lc.Debugf("Done processing output for transcoder for device %s", dev.name)
}()
}

// attempt to start the process
if err = proc.Start(); err != nil {
return nil, fmt.Errorf("failed to start FFMPEG transcoding for device %s (%s) with %s, message %s",
dev.name, redact(strings.Join(command, " ")), err, strings.Join(stdErrLines, "\n"))
}
// only set the transcoder's process if we are successful in starting it
t.SetProcess(proc)
t.SetProcessStdinPipe(stdinPipe)
dev.lc.Debugf("Set IsStreaming=true for device %s", dev.name)
dev.streamingStatus.IsStreaming = true
dev.streamingStatus.Error = ""

dev.lc.Debugf("FFmpeg transcoder process for device %s has started with pid %d", dev.name, proc.Process.Pid)

// in the background we will wait for the process to complete and return any errors over the done channel
done := make(chan error)
go func() {
defer close(done)

// wait until the process has exited
err = proc.Wait()
dev.lc.Debugf("FFmpeg process with pid %d for device %s exited with code %d. User time: %v, System time: %v",
proc.Process.Pid, dev.name, proc.ProcessState.ExitCode(), proc.ProcessState.UserTime(), proc.ProcessState.UserTime())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
dev.lc.Debugf("FFmpeg process with pid %d for device %s exited with code %d. User time: %v, System time: %v",
proc.Process.Pid, dev.name, proc.ProcessState.ExitCode(), proc.ProcessState.UserTime(), proc.ProcessState.UserTime())
dev.lc.Debugf("FFmpeg process with pid %d for device %s exited with code %d. User time: %v, System time: %v",
proc.Process.Pid, dev.name, proc.ProcessState.ExitCode(), proc.ProcessState.UserTime(), proc.ProcessState.SystemTime())


dev.mutex.Lock()
dev.lc.Debugf("Set IsStreaming=false for device %s", dev.name)
dev.streamingStatus.IsStreaming = false

// if ffmpeg returned an error, add more details surrounding it
if err != nil {
err = fmt.Errorf("failed finish FFMPEG transcoding for device %s (%s) with %s message %s",
dev.name, redact(strings.Join(command, " ")), err.Error(), strings.Join(stdErrLines, "\n"))
dev.streamingStatus.Error = err.Error()
} else {
dev.streamingStatus.Error = ""
}
t.SetProcess(nil)
t.SetProcessStdinPipe(nil)
dev.mutex.Unlock()
done <- err
}()

return done, nil
}

// scanFFmpegLines is based on bufio.ScanLines, however it will return a line as soon as
// it reaches a \r even if it is not followed by a \n. The reason for this is that ffmpeg
// sometimes uses \r as a way to replace the previous line, such as when progress is enabled.
// In those cases, the default bufio.ScanLines will miss those messages.
func scanFFmpegLines(data []byte, atEOF bool) (advance int, token []byte, err error) {
if atEOF && len(data) == 0 {
// No more data. Return.
return 0, nil, nil
}

if i := bytes.IndexByte(data, '\r'); i == 0 {
return 1, nil, nil // Skip blank line.
} else if i > 0 {
// We have a cr terminated line
return i + 1, data[0:i], nil
}

if i := bytes.IndexByte(data, '\n'); i == 0 {
return 1, nil, nil // Skip blank line.
} else if i > 0 {
// We have a newline terminated line.
return i + 1, data[0:i], nil
}

// If we are at EOF, we have a final, non-terminated line. Return it.
if atEOF {
return len(data), data, nil
}

// Request more data.
return 0, nil, nil
}