Skip to content

Commit

Permalink
Adding support for the compression struct while preserving backwards …
Browse files Browse the repository at this point in the history
…compatibility (for string)
  • Loading branch information
rnishtala-sumo committed Nov 21, 2024
1 parent e843c4b commit 976f44c
Show file tree
Hide file tree
Showing 4 changed files with 129 additions and 93 deletions.
71 changes: 34 additions & 37 deletions config/configcompression/compressiontype.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ package configcompression // import "go.opentelemetry.io/collector/config/config

import (
"fmt"
"strconv"
"strings"

"github.com/klauspost/compress/zlib"
)
Expand All @@ -21,14 +19,15 @@ type TypeWithLevel struct {
}

const (
TypeGzip Type = "gzip"
TypeZlib Type = "zlib"
TypeDeflate Type = "deflate"
TypeSnappy Type = "snappy"
TypeZstd Type = "zstd"
TypeLz4 Type = "lz4"
typeNone Type = "none"
typeEmpty Type = ""
TypeGzip Type = "gzip"
TypeZlib Type = "zlib"
TypeDeflate Type = "deflate"
TypeSnappy Type = "snappy"
TypeZstd Type = "zstd"
TypeLz4 Type = "lz4"
typeNone Type = "none"
typeEmpty Type = ""
LevelNone Level = 0
)

// IsCompressed returns false if CompressionType is nil, none, or empty.
Expand All @@ -38,42 +37,40 @@ func (ct *Type) IsCompressed() bool {
}

func (ct *TypeWithLevel) UnmarshalText(in []byte) error {
var err error
parts := strings.Split(string(in), "/")
compressionTyp := Type(parts[0])
level := zlib.DefaultCompression
if len(parts) == 2 {
level, err = strconv.Atoi(parts[1])
if err != nil {
return fmt.Errorf("invalid compression level: %q", parts[1])
}
if compressionTyp == TypeSnappy ||
compressionTyp == TypeLz4 ||
compressionTyp == typeNone ||
compressionTyp == typeEmpty {
return fmt.Errorf("compression level is not supported for %q", compressionTyp)
}
typ := Type(in)
if typ == TypeGzip ||
typ == TypeZlib ||
typ == TypeDeflate ||
typ == TypeSnappy ||
typ == TypeZstd ||
typ == TypeLz4 ||
typ == typeNone ||
typ == typeEmpty {
*&ct.Type = typ
return nil
}
ct.Level = Level(level)
if (compressionTyp == TypeGzip && isValidLevel(level)) ||
(compressionTyp == TypeZlib && isValidLevel(level)) ||
(compressionTyp == TypeDeflate && isValidLevel(level)) ||
compressionTyp == TypeSnappy ||
compressionTyp == TypeLz4 ||
compressionTyp == TypeZstd ||
compressionTyp == typeNone ||
compressionTyp == typeEmpty {
ct.Level = Level(level)
ct.Type = compressionTyp
return fmt.Errorf("unsupported compression type %q", typ)
}

func (ct *TypeWithLevel) Validate() error {
if (ct.Type == TypeGzip && isValidLevel(int(ct.Level))) ||
(ct.Type == TypeZlib && isValidLevel(int(ct.Level))) ||
(ct.Type == TypeDeflate && isValidLevel(int(ct.Level))) ||
ct.Type == TypeSnappy ||
ct.Type == TypeLz4 ||
ct.Type == TypeZstd ||
ct.Type == typeNone ||
ct.Type == typeEmpty {
return nil
}

Check warning on line 65 in config/configcompression/compressiontype.go

View check run for this annotation

Codecov / codecov/patch

config/configcompression/compressiontype.go#L55-L65

Added lines #L55 - L65 were not covered by tests

return fmt.Errorf("unsupported compression type and level(default if not specified) %s - %d", compressionTyp, ct.Level)
return fmt.Errorf("unsupported compression type and level %s - %d", ct.Type, ct.Level)

Check warning on line 67 in config/configcompression/compressiontype.go

View check run for this annotation

Codecov / codecov/patch

config/configcompression/compressiontype.go#L67

Added line #L67 was not covered by tests
}

// Checks the validity of zlib/gzip/flate compression levels
func isValidLevel(level int) bool {
return level == zlib.DefaultCompression ||
level == int(LevelNone) ||
level == zlib.HuffmanOnly ||
level == zlib.NoCompression ||
level == zlib.BestSpeed ||
Expand Down
37 changes: 1 addition & 36 deletions config/configcompression/compressiontype_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
package configcompression

import (
"strings"
"testing"

"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -48,12 +47,6 @@ func TestUnmarshalText(t *testing.T) {
shouldError: false,
isCompressed: true,
},
{
name: "ValidZstdLevel",
compressionName: []byte("zstd/11"),
shouldError: false,
isCompressed: true,
},
{
name: "ValidEmpty",
compressionName: []byte(""),
Expand All @@ -75,34 +68,6 @@ func TestUnmarshalText(t *testing.T) {
compressionName: []byte("ggip"),
shouldError: true,
},
{
name: "InvalidSnappy",
compressionName: []byte("snappy/1"),
shouldError: true,
},
{
name: "InvalidNone",
compressionName: []byte("none/1"),
shouldError: true,
},
{
name: "InvalidGzip",
compressionName: []byte("gzip/10"),
shouldError: true,
isCompressed: true,
},
{
name: "InvalidZlib",
compressionName: []byte("zlib/10"),
shouldError: true,
isCompressed: true,
},
{
name: "InvalidZstdLevel",
compressionName: []byte("zstd/ten"),
shouldError: true,
isCompressed: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand All @@ -113,7 +78,7 @@ func TestUnmarshalText(t *testing.T) {
return
}
require.NoError(t, err)
ct := Type(strings.Split(string(tt.compressionName), "/")[0])
ct := Type(tt.compressionName)
assert.Equal(t, temp.Type, ct)
assert.Equal(t, tt.isCompressed, ct.IsCompressed())
})
Expand Down
112 changes: 93 additions & 19 deletions config/confighttp/compression_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,68 +26,142 @@ import (
"go.opentelemetry.io/collector/config/configcompression"
)

func TestHTTPClientCompression(t *testing.T) {
func TestHTTPClientCompressionwithLevel(t *testing.T) {
testBody := []byte("uncompressed_text")
compressedGzipBody := compressGzip(t, testBody)
compressedZlibBody := compressZlib(t, testBody)
compressedDeflateBody := compressZlib(t, testBody)
compressedSnappyBody := compressSnappy(t, testBody)
compressedZstdBody := compressZstd(t, testBody)
compressedLz4Body := compressLz4(t, testBody)

const (
gzipLevel configcompression.Type = "gzip/1"
zlibLevel configcompression.Type = "zlib/1"
deflateLevel configcompression.Type = "deflate/1"
zstdLevel configcompression.Type = "zstd/11"
gzipLevel configcompression.Level = 1
zlibLevel configcompression.Level = 1
deflateLevel configcompression.Level = 1
zstdLevel configcompression.Level = 11
)

tests := []struct {
name string
encoding configcompression.Type
enclevel configcompression.Level
enctype configcompression.Type
reqBody []byte
shouldError bool
}{
{
name: "ValidEmpty",
encoding: "",
enctype: "",
reqBody: testBody,
shouldError: false,
},
{
name: "ValidNone",
encoding: "none",
enctype: "none",
reqBody: testBody,
shouldError: false,
},
{
name: "ValidGzip",
encoding: gzipLevel,
enctype: "gzip",
enclevel: gzipLevel,
reqBody: compressedGzipBody.Bytes(),
shouldError: false,
},
{
name: "InvalidGzip",
enctype: "gzip",
enclevel: 20,
reqBody: compressedGzipBody.Bytes(),
shouldError: true,
},
{
name: "ValidZlib",
encoding: zlibLevel,
enctype: "zlib",
enclevel: zlibLevel,
reqBody: compressedZlibBody.Bytes(),
shouldError: false,
},
{
name: "ValidDeflate",
encoding: deflateLevel,
enctype: "deflate",
enclevel: deflateLevel,
reqBody: compressedDeflateBody.Bytes(),
shouldError: false,
},
{
name: "ValidSnappy",
encoding: configcompression.TypeSnappy,
reqBody: compressedSnappyBody.Bytes(),
name: "ValidZstd",
enctype: "zstd",
enclevel: zstdLevel,
reqBody: compressedZstdBody.Bytes(),
shouldError: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
body, err := io.ReadAll(r.Body)
assert.NoError(t, err, "failed to read request body: %v", err)
assert.EqualValues(t, tt.reqBody, body)
w.WriteHeader(http.StatusOK)
}))
t.Cleanup(srv.Close)

reqBody := bytes.NewBuffer(testBody)

req, err := http.NewRequest(http.MethodGet, srv.URL, reqBody)
require.NoError(t, err, "failed to create request to test handler")
compression := configcompression.TypeWithLevel{}
err = compression.UnmarshalText([]byte(tt.enctype))
compression.Level = tt.enclevel
err = compression.Validate()
if tt.shouldError {
assert.Error(t, err)
return
}
require.NoError(t, err)
clientSettings := ClientConfig{
Endpoint: srv.URL,
Compression: compression,
}
client, err := clientSettings.ToClient(context.Background(), componenttest.NewNopHost(), componenttest.NewNopTelemetrySettings())
require.NoError(t, err)
res, err := client.Do(req)
require.NoError(t, err)

_, err = io.ReadAll(res.Body)
require.NoError(t, err)
require.NoError(t, res.Body.Close(), "failed to close request body: %v", err)
})
}

}

func TestHTTPClientCompression(t *testing.T) {
testBody := []byte("uncompressed_text")
compressedSnappyBody := compressSnappy(t, testBody)
compressedLz4Body := compressLz4(t, testBody)

tests := []struct {
name string
encoding configcompression.Type
reqBody []byte
shouldError bool
}{
{
name: "ValidEmpty",
encoding: "",
reqBody: testBody,
shouldError: false,
},
{
name: "ValidZstd",
encoding: zstdLevel,
reqBody: compressedZstdBody.Bytes(),
name: "ValidNone",
encoding: "none",
reqBody: testBody,
shouldError: false,
},
{
name: "ValidSnappy",
encoding: configcompression.TypeSnappy,
reqBody: compressedSnappyBody.Bytes(),
shouldError: false,
},
{
Expand Down
2 changes: 1 addition & 1 deletion exporter/otlphttpexporter/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func TestUnmarshalConfig(t *testing.T) {
ReadBufferSize: 123,
WriteBufferSize: 345,
Timeout: time.Second * 10,
Compression: configcompression.TypeWithLevel{Type: "gzip", Level: -1},
Compression: configcompression.TypeWithLevel{Type: "gzip", Level: 0},
MaxIdleConns: &defaultMaxIdleConns,
MaxIdleConnsPerHost: &defaultMaxIdleConnsPerHost,
MaxConnsPerHost: &defaultMaxConnsPerHost,
Expand Down

0 comments on commit 976f44c

Please sign in to comment.