Skip to content

Commit

Permalink
handle EOF on single line content (elastic#33568)
Browse files Browse the repository at this point in the history
* handle EOF on single line content

* changelog

* fallback to encode_eof if no events in aws-s3 input

* lint

* lint

* collect on EOF in line reader

* remove encode eof

* remove iterN

* fix test

* increase test coverage

* linting

* more linting

* increase coverage
  • Loading branch information
Andrea Spacca authored Nov 30, 2022
1 parent 141ad33 commit 7b45320
Show file tree
Hide file tree
Showing 9 changed files with 221 additions and 97 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff]
- Add handling of AAA operations for Cisco ASA module. {issue}32257[32257] {pull}32789[32789]
- Fix gc.log always shipped even if gc fileset is disabled {issue}30995[30995]
- Fix handling of empty array in httpjson input. {pull}32001[32001]
- Fix EOF on single line not producing any event. {issue}30436[30436] {pull}33568[33568]
- Fix reporting of `filebeat.events.active` in log events such that the current value is always reported instead of the difference from the last value. {pull}33597[33597]
- Fix splitting array of strings/arrays in httpjson input {issue}30345[30345] {pull}33609[33609]
- Fix Google workspace pagination and document ID generation. {pull}33666[33666]
Expand Down
5 changes: 3 additions & 2 deletions libbeat/reader/readfile/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package readfile
import (
"bytes"
"encoding/hex"
"errors"
"fmt"
"io"
"io/ioutil"
Expand All @@ -39,7 +40,7 @@ func BenchmarkEncoderReader(b *testing.B) {
b.Run(name, func(b *testing.B) {
b.ReportAllocs()
for bN := 0; bN < b.N; bN++ {
reader, err := NewEncodeReader(ioutil.NopCloser(bytes.NewReader(lines)), Config{encoding.Nop, bufferSize, LineFeed, lineMaxLimit})
reader, err := NewEncodeReader(ioutil.NopCloser(bytes.NewReader(lines)), Config{encoding.Nop, bufferSize, LineFeed, lineMaxLimit, false})
if err != nil {
b.Fatal("failed to initialize reader:", err)
}
Expand All @@ -48,7 +49,7 @@ func BenchmarkEncoderReader(b *testing.B) {
for i := 0; ; i++ {
msg, err := reader.Next()
if err != nil {
if err == io.EOF {
if errors.Is(err, io.EOF) {
b.ReportMetric(float64(i), "processed_lines")
break
} else {
Expand Down
8 changes: 7 additions & 1 deletion libbeat/reader/readfile/encode.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,15 @@ type Config struct {
BufferSize int
Terminator LineTerminator
MaxBytes int
// If CollectOnEOF is set to true (default false) the line reader will return the buffer if EOF reached: this
// will ensure full content including last line with no EOL will be returned for fully retrieved content that's
// not appended anymore between reads.
// If CollectOnEOF is set to false the line reader will return 0 content and keep the buffer at the current
// state of appending data after temporarily EOF.
CollectOnEOF bool
}

// New creates a new Encode reader from input reader by applying
// NewEncodeReader creates a new Encode reader from input reader by applying
// the given codec.
func NewEncodeReader(r io.ReadCloser, config Config) (EncoderReader, error) {
eReader, err := NewLineReader(r, config)
Expand Down
96 changes: 68 additions & 28 deletions libbeat/reader/readfile/line.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package readfile

import (
"bytes"
"errors"
"fmt"
"io"

Expand All @@ -33,18 +34,22 @@ const unlimited = 0
// LineReader reads lines from underlying reader, decoding the input stream
// using the configured codec. The reader keeps track of bytes consumed
// from raw input stream for every decoded line.
// If collectOnEOF is set to true (default false) it will return the buffer if EOF reached.
// If collectOnEOF is set to false it will return 0 content and keep the buffer at the current
// state of appending data after temporarily EOF.
type LineReader struct {
reader io.ReadCloser
maxBytes int // max bytes per line limit to avoid OOM with malformatted files
nl []byte
decodedNl []byte
inBuffer *streambuf.Buffer
outBuffer *streambuf.Buffer
inOffset int // input buffer read offset
byteCount int // number of bytes decoded from input buffer into output buffer
decoder transform.Transformer
tempBuffer []byte
logger *logp.Logger
reader io.ReadCloser
maxBytes int // max bytes per line limit to avoid OOM with malformatted files
nl []byte
decodedNl []byte
collectOnEOF bool
inBuffer *streambuf.Buffer
outBuffer *streambuf.Buffer
inOffset int // input buffer read offset
byteCount int // number of bytes decoded from input buffer into output buffer
decoder transform.Transformer
tempBuffer []byte
logger *logp.Logger
}

// NewLineReader creates a new reader object
Expand All @@ -63,15 +68,16 @@ func NewLineReader(input io.ReadCloser, config Config) (*LineReader, error) {
}

return &LineReader{
reader: input,
maxBytes: config.MaxBytes,
decoder: config.Codec.NewDecoder(),
nl: nl,
decodedNl: terminator,
inBuffer: streambuf.New(nil),
outBuffer: streambuf.New(nil),
tempBuffer: make([]byte, config.BufferSize),
logger: logp.NewLogger("reader_line"),
reader: input,
maxBytes: config.MaxBytes,
decoder: config.Codec.NewDecoder(),
nl: nl,
decodedNl: terminator,
collectOnEOF: config.CollectOnEOF,
inBuffer: streambuf.New(nil),
outBuffer: streambuf.New(nil),
tempBuffer: make([]byte, config.BufferSize),
logger: logp.NewLogger("reader_line"),
}, nil
}

Expand All @@ -88,12 +94,46 @@ func (r *LineReader) Next() (b []byte, n int, err error) {
// read next 'potential' line from input buffer/reader
err := r.advance()
if err != nil {
if errors.Is(err, io.EOF) && r.collectOnEOF {
// Found EOF and collectOnEOF is true
// -> decode input sequence into outBuffer
// let's take whole buffer len without len(nl) if it ends with it
end := r.inBuffer.Len()
if bytes.HasSuffix(r.inBuffer.Bytes(), r.decodedNl) {
end -= len(r.nl)
}

sz, err := r.decode(end)
if err != nil {
r.logger.Errorf("Error decoding line: %s", err)
// In case of error increase size by unencoded length
sz = r.inBuffer.Len()
}

// Consume transformed bytes from input buffer
_ = r.inBuffer.Advance(sz)
r.inBuffer.Reset()

// output buffer contains untile EOF. Extract
// byte slice from buffer and reset output buffer.
bytes, err := r.outBuffer.Collect(r.outBuffer.Len())
r.outBuffer.Reset()
if err != nil {
// This should never happen as otherwise we have a broken state
panic(err)
}

// return and reset consumed bytes count
sz = r.byteCount
r.byteCount = 0
return bytes, sz, io.EOF
}

// return and reset consumed bytes count
sz := r.byteCount
r.byteCount = 0
return nil, sz, err
}

// Check last decoded byte really being newline also unencoded
// if not, continue reading
buf := r.outBuffer.Bytes()
Expand Down Expand Up @@ -144,13 +184,13 @@ func (r *LineReader) advance() error {
// Try to read more bytes into buffer
n, err := r.reader.Read(r.tempBuffer)

if err == io.EOF && n > 0 {
if errors.Is(err, io.EOF) && n > 0 {
// Continue processing the returned bytes. The next call will yield EOF with 0 bytes.
err = nil
}

// Write to buffer also in case of err
r.inBuffer.Write(r.tempBuffer[:n])
_, _ = r.inBuffer.Write(r.tempBuffer[:n])

if err != nil {
return err
Expand All @@ -169,7 +209,7 @@ func (r *LineReader) advance() error {
// If newLine is found, drop the lines longer than maxBytes
for idx != -1 && idx > r.maxBytes {
r.logger.Warnf("Exceeded %d max bytes in line limit, skipped %d bytes line", r.maxBytes, idx)
err = r.inBuffer.Advance(idx + len(r.nl))
_ = r.inBuffer.Advance(idx + len(r.nl))
r.byteCount += idx + len(r.nl)
r.inBuffer.Reset()
r.inOffset = 0
Expand Down Expand Up @@ -237,7 +277,7 @@ func (r *LineReader) skipUntilNewLine() (int, error) {
idx = bytes.Index(r.tempBuffer[:n], r.nl)

if idx != -1 {
r.inBuffer.Write(r.tempBuffer[idx+len(r.nl) : n])
_, _ = r.inBuffer.Write(r.tempBuffer[idx+len(r.nl) : n])
skipped += idx + len(r.nl)
} else {
skipped += n
Expand Down Expand Up @@ -267,8 +307,8 @@ func (r *LineReader) decode(end int) (int, error) {
nDst, nSrc, err = r.decoder.Transform(r.tempBuffer, inBytes[start:end], false)
if err != nil {
// Check if error is different from destination buffer too short
if err != transform.ErrShortDst {
r.outBuffer.Write(inBytes[0:end])
if !errors.Is(err, transform.ErrShortDst) {
_, _ = r.outBuffer.Write(inBytes[0:end])
start = end
break
}
Expand All @@ -278,7 +318,7 @@ func (r *LineReader) decode(end int) (int, error) {
}

start += nSrc
r.outBuffer.Write(r.tempBuffer[:nDst])
_, _ = r.outBuffer.Write(r.tempBuffer[:nDst])
}

r.byteCount += start
Expand Down
Loading

0 comments on commit 7b45320

Please sign in to comment.