generated from edgexfoundry-holding/template-repo
-
Notifications
You must be signed in to change notification settings - Fork 28
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix: return ffmpeg error logs to caller, and fix StreamingStatus #254
Merged
ajcasagrande
merged 4 commits into
edgexfoundry:main
from
EdgeX-Camera-Management:return-ffmpeg-error
May 25, 2023
Merged
Changes from 3 commits
Commits
Show all changes
4 commits
Select commit
Hold shift + click to select a range
bbba87a
fix: return ffmpeg error logs to caller, and fix StreamingStatus
ajcasagrande ef9f422
fix: lint error gosimple S1007
ajcasagrande a7aa7f0
fix: remove G204 warning from ffmpeg call
ajcasagrande 8854a94
fix: extend error timeout for ffmpeg, return quicker on success
ajcasagrande File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,4 +1,4 @@ | ||
linters: | ||
disable: | ||
disable: [] | ||
enable: | ||
- gosec |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change | ||||||||
---|---|---|---|---|---|---|---|---|---|---|
@@ -0,0 +1,179 @@ | ||||||||||
// -*- Mode: Go; indent-tabs-mode: t -*- | ||||||||||
// | ||||||||||
// Copyright (C) 2023 Intel Corporation | ||||||||||
// | ||||||||||
// SPDX-License-Identifier: Apache-2.0 | ||||||||||
|
||||||||||
package driver | ||||||||||
|
||||||||||
import ( | ||||||||||
"bufio" | ||||||||||
"bytes" | ||||||||||
"fmt" | ||||||||||
"github.com/edgexfoundry/go-mod-core-contracts/v3/models" | ||||||||||
"os/exec" | ||||||||||
"strings" | ||||||||||
) | ||||||||||
|
||||||||||
const ( | ||||||||||
maxStderrLines = 25 | ||||||||||
ffmpegLogLevel = "info" | ||||||||||
) | ||||||||||
|
||||||||||
// runTranscoderWithOutput is based on transcoder.Transcoder.Run(), but tweaks a few things, and adds some | ||||||||||
// quality of life improvements for the end user. It starts the transcoding process while also logging the ffmpeg | ||||||||||
// output (from StdErr). StdErr text is also returned via the done error channel, so that it can be returned | ||||||||||
// to the caller of a REST API. If an error occurs starting the process, it is returned immediately, and not | ||||||||||
// via the error channel. | ||||||||||
func (dev *Device) runTranscoderWithOutput() (<-chan error, error) { | ||||||||||
dev.mutex.Lock() | ||||||||||
defer dev.mutex.Unlock() | ||||||||||
|
||||||||||
t := dev.transcoder | ||||||||||
|
||||||||||
// generate the ffmpeg command line options, and prepend with some pre-defined options | ||||||||||
// -loglevel level+<ffmpegLogLevel>: will set the log level to ffmpegLogLevel and prefix output with the log level (for parsing) | ||||||||||
command := append([]string{"-loglevel", "level+" + ffmpegLogLevel}, t.GetCommand()...) | ||||||||||
if dev.lc.LogLevel() != models.TraceLog { | ||||||||||
// disable progress output if trace logging is not enabled | ||||||||||
command = append([]string{"-nostats"}, command...) | ||||||||||
} | ||||||||||
// -rtsp_transport tcp: force the rtsp transport to use tcp | ||||||||||
// these args must be put in the output section and not the first args, so just inject them right before the last | ||||||||||
// arg which is the rtsp url. | ||||||||||
command = append(command[0:len(command)-1], "-rtsp_transport", "tcp", command[len(command)-1]) | ||||||||||
ffmpegBin := t.FFmpegExec() | ||||||||||
proc := exec.Command(ffmpegBin, command...) | ||||||||||
|
||||||||||
// Set the stdinPipe in case we need to stop the transcoding | ||||||||||
stdinPipe, err := proc.StdinPipe() | ||||||||||
if err != nil { | ||||||||||
dev.lc.Errorf("Ffmpeg Stdin not available: %s", err.Error()) | ||||||||||
} | ||||||||||
|
||||||||||
var stdErrLines []string | ||||||||||
stdErrPipe, err := proc.StderrPipe() | ||||||||||
if err != nil { | ||||||||||
dev.lc.Errorf("Ffmpeg StderrPipe not available: %s. Unable to track output from process.", err.Error()) | ||||||||||
} else { | ||||||||||
output := make(chan string, 10) | ||||||||||
// use a scanner to read the output of the pipe and send it to output channel | ||||||||||
go func() { | ||||||||||
defer close(output) | ||||||||||
scanner := bufio.NewScanner(stdErrPipe) | ||||||||||
scanner.Split(scanFFmpegLines) | ||||||||||
scanner.Buffer(make([]byte, 2), bufio.MaxScanTokenSize) | ||||||||||
|
||||||||||
for scanner.Scan() { | ||||||||||
// Scan the next line, redact it, and send it to output channel. | ||||||||||
output <- redact(scanner.Text()) | ||||||||||
} | ||||||||||
dev.lc.Debugf("Output scanner complete for transcoder for device %s", dev.name) | ||||||||||
}() | ||||||||||
|
||||||||||
// keep track of stdErr text, so it can be returned to the caller via done channel | ||||||||||
go func() { | ||||||||||
for line := range output { | ||||||||||
// cap the size so that way the memory usage does not grow on commands with lots of output | ||||||||||
if len(stdErrLines) >= maxStderrLines { | ||||||||||
continue | ||||||||||
} | ||||||||||
|
||||||||||
line = strings.Trim(line, " ") | ||||||||||
if len(line) == 0 { | ||||||||||
continue // skip blank lines | ||||||||||
} | ||||||||||
|
||||||||||
// log the line at specific level depending on the content | ||||||||||
if strings.Contains(line, "[error]") || strings.Contains(line, "[fatal]") { | ||||||||||
stdErrLines = append(stdErrLines, line) | ||||||||||
dev.lc.Errorf("%s transcoder: %s", dev.name, line) | ||||||||||
} else if strings.Contains(line, "[warning]") { | ||||||||||
stdErrLines = append(stdErrLines, line) | ||||||||||
dev.lc.Warnf("%s transcoder: %s", dev.name, line) | ||||||||||
} else { | ||||||||||
// log everything else as debug, as ffmpeg info messages are just debug data to us | ||||||||||
dev.lc.Debugf("%s transcoder: %s", dev.name, line) | ||||||||||
} | ||||||||||
} | ||||||||||
dev.lc.Debugf("Done processing output for transcoder for device %s", dev.name) | ||||||||||
}() | ||||||||||
} | ||||||||||
|
||||||||||
// attempt to start the process | ||||||||||
if err = proc.Start(); err != nil { | ||||||||||
return nil, fmt.Errorf("failed to start FFMPEG transcoding for device %s (%s) with %s, message %s", | ||||||||||
dev.name, redact(strings.Join(command, " ")), err, strings.Join(stdErrLines, "\n")) | ||||||||||
} | ||||||||||
// only set the transcoder's process if we are successful in starting it | ||||||||||
t.SetProcess(proc) | ||||||||||
t.SetProcessStdinPipe(stdinPipe) | ||||||||||
dev.lc.Debugf("Set IsStreaming=true for device %s", dev.name) | ||||||||||
dev.streamingStatus.IsStreaming = true | ||||||||||
dev.streamingStatus.Error = "" | ||||||||||
|
||||||||||
dev.lc.Debugf("FFmpeg transcoder process for device %s has started with pid %d", dev.name, proc.Process.Pid) | ||||||||||
|
||||||||||
// in the background we will wait for the process to complete and return any errors over the done channel | ||||||||||
done := make(chan error) | ||||||||||
go func() { | ||||||||||
defer close(done) | ||||||||||
|
||||||||||
// wait until the process has exited | ||||||||||
err = proc.Wait() | ||||||||||
dev.lc.Debugf("FFmpeg process with pid %d for device %s exited with code %d. User time: %v, System time: %v", | ||||||||||
proc.Process.Pid, dev.name, proc.ProcessState.ExitCode(), proc.ProcessState.UserTime(), proc.ProcessState.UserTime()) | ||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||
|
||||||||||
dev.mutex.Lock() | ||||||||||
dev.lc.Debugf("Set IsStreaming=false for device %s", dev.name) | ||||||||||
dev.streamingStatus.IsStreaming = false | ||||||||||
|
||||||||||
// if ffmpeg returned an error, add more details surrounding it | ||||||||||
if err != nil { | ||||||||||
err = fmt.Errorf("failed finish FFMPEG transcoding for device %s (%s) with %s message %s", | ||||||||||
dev.name, redact(strings.Join(command, " ")), err.Error(), strings.Join(stdErrLines, "\n")) | ||||||||||
dev.streamingStatus.Error = err.Error() | ||||||||||
} else { | ||||||||||
dev.streamingStatus.Error = "" | ||||||||||
} | ||||||||||
t.SetProcess(nil) | ||||||||||
t.SetProcessStdinPipe(nil) | ||||||||||
dev.mutex.Unlock() | ||||||||||
done <- err | ||||||||||
}() | ||||||||||
|
||||||||||
return done, nil | ||||||||||
} | ||||||||||
|
||||||||||
// scanFFmpegLines is based on bufio.ScanLines, however it will return a line as soon as | ||||||||||
// it reaches a \r even if it is not followed by a \n. The reason for this is that ffmpeg | ||||||||||
// sometimes uses \r as a way to replace the previous line, such as when progress is enabled. | ||||||||||
// In those cases, the default bufio.ScanLines will miss those messages. | ||||||||||
func scanFFmpegLines(data []byte, atEOF bool) (advance int, token []byte, err error) { | ||||||||||
if atEOF && len(data) == 0 { | ||||||||||
// No more data. Return. | ||||||||||
return 0, nil, nil | ||||||||||
} | ||||||||||
|
||||||||||
if i := bytes.IndexByte(data, '\r'); i == 0 { | ||||||||||
return 1, nil, nil // Skip blank line. | ||||||||||
} else if i > 0 { | ||||||||||
// We have a cr terminated line | ||||||||||
return i + 1, data[0:i], nil | ||||||||||
} | ||||||||||
|
||||||||||
if i := bytes.IndexByte(data, '\n'); i == 0 { | ||||||||||
return 1, nil, nil // Skip blank line. | ||||||||||
} else if i > 0 { | ||||||||||
// We have a newline terminated line. | ||||||||||
return i + 1, data[0:i], nil | ||||||||||
} | ||||||||||
|
||||||||||
// If we are at EOF, we have a final, non-terminated line. Return it. | ||||||||||
if atEOF { | ||||||||||
return len(data), data, nil | ||||||||||
} | ||||||||||
|
||||||||||
// Request more data. | ||||||||||
return 0, nil, nil | ||||||||||
} |
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sometimes, one second is not enough for FFmpeg to complete the execution of the start streaming command. This could be the root cause of the issue @vyshali-chitikeshi encountered.
I tried extending it to 3 seconds, and then I was able to receive the expected 500 status code when the streaming failed to start.
However, defining how long to wait seems challenging as it may depend on machine performance. We also need to consider the HTTP request timeout, which is 5 seconds by default. https://github.com/edgexfoundry/edgex-go/blob/3d7959681c73ad1dd76784b2336248bd6b6384c3/cmd/core-common-config-bootstrapper/res/configuration.yaml#L32
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, originally I had made this to be 2 seconds (instead of the current 1 second) when I started this work, but I dropped it back down to 1 as I noticed my typical error response was like 400-600ms. However, I think that may be based on what the actual error was (eg. bad argument, rtsp server offline, etc), and like you said, system performance. In theory we could make this configurable as well.
I had an idea that we could use something in the output stream to determine if the streaming has started ok, but was not sure what we could parse as a potential indicator of that. I know enabling progress logs would allow that, but doesn't seem worth it for just grabbing beginning of stream confirmation.
The last idea is that we return right away, and expect the user to monitor StreamingStatus endpoint for any errors, since as @vyshali-chitikeshi has validated, that always contains the error when the api returned success too early.