diff --git a/config/configcompression/compressiontype.go b/config/configcompression/compressiontype.go index 820c85be983..53c5030e81c 100644 --- a/config/configcompression/compressiontype.go +++ b/config/configcompression/compressiontype.go @@ -47,6 +47,9 @@ func (ct *Type) UnmarshalText(in []byte) error { } // IsZstd returns true if the compression type is zstd. +// The specified compression level is not validated. +// Because zstd supports returning an encoder level that closest matches the compression ratio of a specific zstd compression level. +// Many input values will provide the same compression level. func (ct *Type) IsZstd() bool { parts := strings.Split(string(*ct), "/") return parts[0] == string(TypeZstd) @@ -68,6 +71,7 @@ func (ct *Type) IsGzip() bool { levelStr == zlib.NoCompression { return true } + return false } return true } @@ -90,6 +94,7 @@ func (ct *Type) IsZlib() bool { levelStr == zlib.NoCompression { return true } + return false } return true } diff --git a/config/configcompression/compressiontype_test.go b/config/configcompression/compressiontype_test.go index cf8166d5a24..4ce2a82e6b2 100644 --- a/config/configcompression/compressiontype_test.go +++ b/config/configcompression/compressiontype_test.go @@ -78,3 +78,116 @@ func TestUnmarshalText(t *testing.T) { }) } } + +func TestIsZstd(t *testing.T) { + tests := []struct { + name string + input Type + expected bool + }{ + { + name: "ValidZstd", + input: TypeZstd, + expected: true, + }, + { + name: "InvalidZstd", + input: TypeGzip, + expected: false, + }, + { + name: "ValidZstdLevel", + input: "zstd/11", + expected: true, + }, + { + name: "ValidZstdLevel", + input: "zstd/One", + expected: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + assert.Equal(t, tt.expected, tt.input.IsZstd()) + }) + } +} + +func TestIsGzip(t *testing.T) { + tests := []struct { + name string + input Type + expected bool + }{ + { + name: "ValidGzip", + input: TypeGzip, + expected: true, + }, + { + name: "InvalidGzip", + input: TypeZlib, + expected: false, + }, + { + name: "ValidZlibCompressionLevel", + input: "gzip/1", + expected: true, + }, + { + name: "InvalidZlibCompressionLevel", + input: "gzip/10", + expected: false, + }, + { + name: "InvalidZlibCompressionLevel", + input: "gzip/one", // Uses the default compression level + expected: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + assert.Equal(t, tt.expected, tt.input.IsGzip()) + }) + } +} + +func TestIsZlib(t *testing.T) { + tests := []struct { + name string + input Type + expected bool + err bool + }{ + { + name: "ValidZlib", + input: TypeZlib, + expected: true, + }, + { + name: "InvalidZlib", + input: TypeGzip, + expected: false, + }, + { + name: "ValidZlibCompressionLevel", + input: "zlib/1", + expected: true, + }, + { + name: "InvalidZlibCompressionLevel", + input: "zlib/10", + expected: false, + }, + { + name: "InvalidZlibCompressionLevel", + input: "zlib/one", // Uses the default compression level + expected: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + assert.Equal(t, tt.expected, tt.input.IsZlib()) + }) + } +} diff --git a/config/confighttp/compression.go b/config/confighttp/compression.go index 202ab65d472..4498fefe864 100644 --- a/config/confighttp/compression.go +++ b/config/confighttp/compression.go @@ -25,11 +25,6 @@ type compressRoundTripper struct { compressor *compressor } -type CompressionOptions struct { - compressionType configcompression.Type - compressionLevel int -} - var availableDecoders = map[string]func(body io.ReadCloser) (io.ReadCloser, error){ "": func(io.ReadCloser) (io.ReadCloser, error) { // Not a compressed payload. Nothing to do. @@ -77,14 +72,14 @@ var availableDecoders = map[string]func(body io.ReadCloser) (io.ReadCloser, erro }, } -func newCompressRoundTripper(rt http.RoundTripper, compressionopts CompressionOptions) (*compressRoundTripper, error) { - encoder, err := newCompressor(compressionopts) +func newCompressRoundTripper(rt http.RoundTripper, compressionType configcompression.Type) (*compressRoundTripper, error) { + encoder, err := newCompressor(compressionType) if err != nil { return nil, err } return &compressRoundTripper{ rt: rt, - compressionType: compressionopts.compressionType, + compressionType: compressionType, compressor: encoder, }, nil } diff --git a/config/confighttp/compression_test.go b/config/confighttp/compression_test.go index ada42c9e295..9ee626aa077 100644 --- a/config/confighttp/compression_test.go +++ b/config/confighttp/compression_test.go @@ -110,13 +110,13 @@ func TestHTTPClientCompression(t *testing.T) { Compression: tt.encoding, } client, err := clientSettings.ToClient(context.Background(), componenttest.NewNopHost(), componenttest.NewNopTelemetrySettings()) - require.NoError(t, err) - res, err := client.Do(req) if tt.shouldError { assert.Error(t, err) return } require.NoError(t, err) + res, err := client.Do(req) + require.NoError(t, err) _, err = io.ReadAll(res.Body) require.NoError(t, err) @@ -296,8 +296,9 @@ func TestHTTPContentCompressionRequestWithNilBody(t *testing.T) { req, err := http.NewRequest(http.MethodGet, srv.URL, nil) require.NoError(t, err, "failed to create request to test handler") - client := srv.Client() - client.Transport, err = newCompressRoundTripper(http.DefaultTransport, configcompression.TypeGzip) + client := http.Client{} + // compression := CompressionOptions{configcompression.TypeGzip, gzip.BestSpeed} + client.Transport, err = newCompressRoundTripper(http.DefaultTransport, "gzip/1") require.NoError(t, err) res, err := client.Do(req) require.NoError(t, err) diff --git a/config/confighttp/compressor.go b/config/confighttp/compressor.go index a93ff625bf6..5c473d963ad 100644 --- a/config/confighttp/compressor.go +++ b/config/confighttp/compressor.go @@ -9,6 +9,8 @@ import ( "compress/zlib" "errors" "io" + "strconv" + "strings" "sync" "github.com/golang/snappy" @@ -26,24 +28,46 @@ type compressor struct { pool sync.Pool } +type CompressionOptions struct { + compressionType configcompression.Type + compressionLevel int +} + +// Gets the compression type and level from the configuration. +func getCompression(compressionField configcompression.Type) (compressionType configcompression.Type, compressionLevel int) { + parts := strings.Split(string(compressionField), "/") + + compressionLevel = zlib.DefaultCompression + compressionType = configcompression.Type(parts[0]) + if len(parts) > 1 { + levelStr := parts[1] + if level, err := strconv.Atoi(levelStr); err == nil { + compressionLevel = level + } + } + return compressionType, compressionLevel +} + // writerFactory defines writer field in CompressRoundTripper. // The validity of input is already checked when NewCompressRoundTripper was called in confighttp, -func newCompressor(compressionopts CompressionOptions) (*compressor, error) { - switch compressionopts.compressionType { +func newCompressor(compressionType configcompression.Type) (*compressor, error) { + compressionType, compressionLevel := getCompression(compressionType) + + switch compressionType { case configcompression.TypeGzip: var _ writeCloserReset = (*gzip.Writer)(nil) - return &compressor{pool: sync.Pool{New: func() any { w, _ := gzip.NewWriterLevel(nil, compressionopts.compressionLevel); return w }}}, nil + return &compressor{pool: sync.Pool{New: func() any { w, _ := gzip.NewWriterLevel(nil, compressionLevel); return w }}}, nil case configcompression.TypeSnappy: var _ writeCloserReset = (*snappy.Writer)(nil) return &compressor{pool: sync.Pool{New: func() any { return snappy.NewBufferedWriter(nil) }}}, nil case configcompression.TypeZstd: var _ writeCloserReset = (*zstd.Encoder)(nil) - compression := zstd.EncoderLevelFromZstd(compressionopts.compressionLevel) + compression := zstd.EncoderLevelFromZstd(compressionLevel) encoderLevel := zstd.WithEncoderLevel(compression) return &compressor{pool: sync.Pool{New: func() any { zw, _ := zstd.NewWriter(nil, zstd.WithEncoderConcurrency(1), encoderLevel); return zw }}}, nil case configcompression.TypeZlib, configcompression.TypeDeflate: var _ writeCloserReset = (*zlib.Writer)(nil) - return &compressor{pool: sync.Pool{New: func() any { w, _ := zlib.NewWriterLevel(nil, compressionopts.compressionLevel); return w }}}, nil + return &compressor{pool: sync.Pool{New: func() any { w, _ := zlib.NewWriterLevel(nil, compressionLevel); return w }}}, nil } return nil, errors.New("unsupported compression type") } diff --git a/config/confighttp/confighttp.go b/config/confighttp/confighttp.go index 61b8436aa6d..445be0fd38d 100644 --- a/config/confighttp/confighttp.go +++ b/config/confighttp/confighttp.go @@ -13,8 +13,6 @@ import ( "net/http" "net/http/cookiejar" "net/url" - "strconv" - "strings" "time" "github.com/rs/cors" @@ -134,21 +132,6 @@ func NewDefaultClientConfig() ClientConfig { } } -func setCompression(compressionField configcompression.Type) (compressionType configcompression.Type, compressionLevel int) { - parts := strings.Split(string(compressionField), "/") - - // Set compression type - compressionLevel = 1 - compressionType = configcompression.Type(parts[0]) - if len(parts) > 1 { - levelStr := parts[1] - if level, err := strconv.Atoi(levelStr); err == nil { - compressionLevel = level - } - } - return compressionType, compressionLevel -} - // ToClient creates an HTTP client. func (hcs *ClientConfig) ToClient(ctx context.Context, host component.Host, settings component.TelemetrySettings) (*http.Client, error) { tlsCfg, err := hcs.TLSSetting.LoadTLSConfig(ctx) @@ -235,16 +218,16 @@ func (hcs *ClientConfig) ToClient(ctx context.Context, host component.Host, sett // Supporting gzip, zlib, deflate, snappy, and zstd; none is treated as uncompressed. if hcs.Compression.IsCompressed() { if hcs.Compression.IsZstd() || hcs.Compression.IsGzip() || hcs.Compression.IsZlib() { - compressionType, compressionLevel := setCompression(hcs.Compression) - compression := CompressionOptions{compressionType, compressionLevel} - clientTransport, err = newCompressRoundTripper(clientTransport, compression) + // compressionType, compressionLevel := getCompression(hcs.Compression) + // compression := CompressionOptions{compressionType, compressionLevel} + clientTransport, err = newCompressRoundTripper(clientTransport, hcs.Compression) if err != nil { return nil, err } } else { // Use the default if the compression level is not specified. - compressionopts := CompressionOptions{hcs.Compression, -1} - clientTransport, err = newCompressRoundTripper(clientTransport, compressionopts) + // compressionopts := CompressionOptions{hcs.Compression, -1} + clientTransport, err = newCompressRoundTripper(clientTransport, hcs.Compression) if err != nil { return nil, err }