All notable changes to this project will be documented in this file.
This is a major version release. For more information and guidance on how to migrate, please refer to https://docs.benthos.dev/migration/v3/.
- The `json` processor now allows you to `move` from either a root source or to a root destination.
- Added interpolation to the `metadata` processor `key` field.
- Go modules are now fully supported, imports must now include the major version (e.g. `github.com/Jeffail/benthos/v3`).
- Removed deprecated `mmap_file` buffer.
- Removed deprecated (and undocumented) metrics paths.
- Moved field `prefix` from root of `metrics` into relevant child components.
- Names of `process_dag` stages must now match the regexp `[a-zA-Z0-9_-]+`.
- Go API: buffer constructors now take a `types.Manager` argument in parity with other components.
- JSON dot paths within the following components have been updated to allow array-based operations:
  - `awk` processor
  - `json` processor
  - `process_field` processor
  - `process_map` processor
  - `check_field` condition
  - `json_field` function interpolation
  - `s3` input
  - `dynamodb` output
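The naming rule for `process_dag` stages can be checked ahead of time. The following is a minimal sketch, assuming the regexp quoted above has to match the whole name (the `^` and `$` anchors are an assumption); `validStageName` is a hypothetical helper and not part of the Benthos API.

```go
package main

import (
	"fmt"
	"regexp"
)

// stageNamePattern wraps the character class from the changelog entry in
// anchors so that the entire name must match. The anchoring is an assumption.
var stageNamePattern = regexp.MustCompile(`^[a-zA-Z0-9_-]+$`)

// validStageName is a hypothetical helper, it is not a Benthos function.
func validStageName(name string) bool {
	return stageNamePattern.MatchString(name)
}

func main() {
	for _, name := range []string{"enrich_1", "fetch-user", "bad.name", ""} {
		fmt.Printf("%q valid: %v\n", name, validStageName(name))
	}
}
```

Names containing dots or spaces, as well as empty names, are rejected by this check.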
- The `sqs` output no longer attempts to send invalid attributes with payloads from metadata.
- The `s3` and `sqs` inputs should now correctly log handles and codes from failed SQS message deletes and visibility timeout changes.
- New `message_group_id` and `message_deduplication_id` fields added to `sqs` output for supporting FIFO queues.
- Metadata field `gcp_pubsub_publish_time_unix` added to `gcp_pubsub` input.
- New `tcp` and `tcp_server` inputs.
- New `udp_server` input.
- New `tcp` and `udp` outputs.
- Metric paths `output.batch.bytes` and `output.batch.latency` added.
- New `rate_limit` processor.
- The `json` processor now correctly stores parsed `value` JSON when using `set` on the root path.
- The `sqs` input now adds some message attributes as metadata.
- Added field `delete_message` to `sqs` input.
- The `sqs` output now sends metadata as message attributes.
- New `batch_policy` field added to `memory` buffer.
- New `xml` processor.
- The `prometheus` metrics exporter adds quantiles back to timing metrics.
- Capped slices from lines reader are now enforced.
- The `json` processor now correctly honours a `null` value.
- Disabled `kinesis_balanced` input for WASM builds.
- Field `codec` added to `process_field` processor.
- Removed experimental status from sync responses components, which are now considered stable.
- Field `pattern_definitions` added to `grok` processor.
- Simplified serverless lambda main function body for improving plugin documentation.
- Fixed a bug where the `prepend` and `append` operators of the `text` processor could result in invalid messages when consuming line-based inputs.
- Field `clean_session` added to `mqtt` input.
- The `http_server` input now adds request query parameters to messages as metadata.
- Prevent concurrent access race condition on nested parallel `process_map` processors.
- New beta input `kinesis_balanced`.
- Field `profile` added to AWS components credentials config.
- Improved error messages attached to payloads that fail `process_dag` post mappings.
- New `redis_hash` output.
- New `sns` output.
- Allow extracting metric `rename` submatches into labels.
- Field `use_patterns` added to `redis_pubsub` input for subscribing to channels using glob-style patterns.
- Go API: It's now possible to specify a custom config unit test file path suffix.
- New rate limit and websocket message fields added to `http_server` input.
- The `http` processor now optionally copies headers from response into resulting message metadata.
- The `http` processor now sets a `http_status_code` metadata value into resulting messages (provided one is received.)
- Go API: Removed experimental `Block` functions from the cache and rate limit packages.
- New (experimental) command flags `--test` and `--gen-test` added.
- All http client components output now set a metric `request_timeout`.
- All errors caught by processors should now be accessible via the `${!error}` interpolation function, rather than just flagged as `true`.
- The `process_field` processor now propagates metadata to the original payload with the `result_type` set to discard. This allows proper error propagation.
- Field `max_buffer` added to `subprocess` processor.
- The `subprocess` processor now correctly logs and recovers subprocess pipeline related errors (such as exceeding buffer limits.)
- New `json_delete` function added to the `awk` processor.
- SQS output now correctly waits between retry attempts and escapes error loops during shutdown.
- Go API: Add `RunWithOpts` opt `OptOverrideConfigDefaults`.
- The `filter` and `filter_parts` config sections now correctly marshal when printing with `--all`.
- Go API: A new service method `RunWithOpts` has been added in order to accommodate service customisations with opt funcs.
- New interpolation function `error`.
- New `number` condition.
- New `number` processor.
- New `avro` processor.
- Operator `enum` added to `text` condition.
- Field `result_type` added to `process_field` processor for marshalling results into non-string types.
- Go API: Plugin APIs now allow nil config constructors.
- Registering plugins automatically adds plugin documentation flags to the main Benthos service.
- Output `http_client` is now able to propagate responses from each request back to inputs supporting sync responses.
- Added support for Gzip compression to `http_server` output sync responses.
- New `check_interpolation` condition.
- New `sync_response` output type, with experimental support added to the `http_server` input.
- SASL authentication fields added to all Kafka components.
- The `s3` input now sets `s3_content_encoding` metadata (when not using the download manager.)
- New trace logging for the `rename`, `blacklist` and `whitelist` metric components to assist with debugging.
- Ability to combine sync and async responses in serverless distributions.
- The `insert_part`, `merge_json` and `unarchive` processors now propagate message contexts.
- JSON processors no longer escape `&`, `<`, and `>` characters by default.
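To illustrate what this escaping looks like in practice, here is a minimal sketch using Go's standard `encoding/json` package, which escapes `&`, `<` and `>` unless `SetEscapeHTML(false)` is called on the encoder. Whether Benthos uses this exact mechanism internally is an assumption, and `marshalJSON` is a hypothetical helper.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"strings"
)

// marshalJSON is a hypothetical helper that serialises v with HTML escaping
// switched on or off.
func marshalJSON(v interface{}, escapeHTML bool) (string, error) {
	var buf bytes.Buffer
	enc := json.NewEncoder(&buf)
	enc.SetEscapeHTML(escapeHTML)
	if err := enc.Encode(v); err != nil {
		return "", err
	}
	// Encode appends a trailing newline, which is removed here.
	return strings.TrimSuffix(buf.String(), "\n"), nil
}

func main() {
	doc := map[string]string{"q": "a<b & c>d"}

	escaped, _ := marshalJSON(doc, true)
	plain, _ := marshalJSON(doc, false)

	fmt.Println(escaped) // {"q":"a\u003cb \u0026 c\u003ed"}
	fmt.Println(plain)   // {"q":"a<b & c>d"}
}
```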
- The `http` processor now preserves message metadata and contexts.
- Any `http` components that create requests with messages containing empty bodies now correctly function in WASM.
- New `fetch_buffer_cap` field for `kafka` and `kafka_balanced` inputs.
- Input `gcp_pubsub` now has the field `max_batch_count`.
- Reduced allocations under most JSON related processors.
- Streams mode API now logs linting errors.
- New interpolation function `batch_size`.
- Output `elasticsearch` no longer reports index not found errors on connect.
- Input reader no longer overrides message contexts for opentracing spans.
- Improved construction error messages for `broker` and `switch` input and outputs.
- Plugins that don't use a configuration structure can now return nil in their sanitise functions in order to have the plugin section omitted.
- The `kafka` and `kafka_balanced` inputs now set a `kafka_lag` metadata field to incoming messages.
- The `awk` processor now has a variety of typed `json_set` functions `json_set_int`, `json_set_float` and `json_set_bool`.
- Go API: Add experimental function for blocking cache and ratelimit constructors.
- The `json` processor now defaults to an executable operator (clean).
- Add experimental function for blocking processor constructors.
- Core service logic has been moved into new package `service`, making it easier to maintain plugin builds that match upstream Benthos.
- Experimental support for WASM builds.
- Config linting now reports line numbers.
- Config interpolations now support escaping.
- API for creating `cache` implementations.
- API for creating `rate_limit` implementations.
This is a major version, released due to a series of minor breaking changes. You can read the full migration guide here.
- Benthos now attempts to infer the `type` of config sections whenever the field is omitted, for more information please read this overview: Concise Configuration.
- Field `unsubscribe_on_close` of the `nats_stream` input is now `false` by default.
- The following commandline flags have been removed: `swap-envs`, `plugins-dir`, `list-input-plugins`, `list-output-plugins`, `list-processor-plugins`, `list-condition-plugins`.
- Package `github.com/Jeffail/benthos/lib/processor/condition` changed to `github.com/Jeffail/benthos/lib/condition`.
- Interface `types.Cache` now has `types.Closable` embedded.
- Interface `types.RateLimit` now has `types.Closable` embedded.
- Add method `GetPlugin` to interface `types.Manager`.
- Add method `WithFields` to interface `log.Modular`.
- Ensure `process_batch` processor gets normalised correctly.
- New `for_each` processor with the same behaviour as `process_batch`, `process_batch` is now considered an alias for `for_each`.
- The `sql` processor now executes across the batch, documentation updated to clarify.
- Corrected `result_codec` field in `sql` processor config.
- New `sql` processor.
- Using `json_map_columns` with the `dynamodb` output should now correctly store `null` and array values within the target JSON structure.
- New `encode` and `decode` scheme `hex`.
- Fixed potential panic when attempting an invalid HTTP client configuration.
- Benthos in streams mode no longer tries to load directory `/benthos/streams` by default.
- Field `json_map_columns` added to `dynamodb` output.
- JSON references are now supported in configuration files.
- The `hash` processor now supports `sha1`.
- Field `force_path_style_urls` added to `s3` components.
- Field `content_type` of the `s3` output is now interpolated.
- Field `content_encoding` added to `s3` output.
- The `benthos-lambda` distribution now correctly returns all message parts in synchronous execution.
- Docker builds now use a locally cached `vendor` for dependencies.
- All `s3` components no longer default to enforcing path style URLs.
- New output `drop_on_error`.
- Field `retry_until_success` added to `switch` output.
- Improved error and output logging for `subprocess` processor when the process exits unexpectedly.
- The main docker image is now based on busybox.
- Lint rule added for `batch` processors outside of the input section.
- Removed potential `benthos-lambda` panic on shut down.
- The `redis` cache no longer incorrectly returns a "key not found" error instead of connection errors.
- Changed docker tag format from `vX.Y.Z` to `X.Y.Z`.
- Output `broker` pattern `fan_out_sequential`.
- Output type `drop` for dropping all messages.
- New interpolation function `timestamp_utc`.
- New `benthos-lambda` distribution for running Benthos as a lambda function.
- New `s3` cache implementation.
- New `file` cache implementation.
- Operators `quote` and `unquote` added to the `text` processor.
- Configs sent via the streams mode HTTP API are now interpolated with environment variable substitutes.
- All AWS `s3` components now enforce path style syntax for bucket URLs. This improves compatibility with third party endpoints.
- New `parallel` processor.
- The `dynamodb` cache `get` call now correctly reports key not found versus general request error.
- New `sqs_bucket_path` field added to `s3` input.
- The `sqs` input now rejects messages that fail by resetting the visibility timeout.
- The `sqs` input no longer fails to delete consumed messages when the batch contains duplicate message IDs.
- The `metric` processor no longer mixes label keys when processing across parallel pipelines.
- Comma separated `kafka` and `kafka_balanced` address and topic values are now trimmed for whitespace.
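The whitespace trimming of comma separated values can be pictured roughly as follows. This is a sketch only: `splitAndTrim` is a hypothetical helper, and dropping empty entries is an assumption rather than documented Benthos behaviour.

```go
package main

import (
	"fmt"
	"strings"
)

// splitAndTrim is a hypothetical helper that splits a comma separated value
// and trims surrounding whitespace from each entry. Empty entries are dropped,
// which is an assumption made for this sketch.
func splitAndTrim(csv string) []string {
	var out []string
	for _, part := range strings.Split(csv, ",") {
		if trimmed := strings.TrimSpace(part); trimmed != "" {
			out = append(out, trimmed)
		}
	}
	return out
}

func main() {
	addresses := splitAndTrim("localhost:9092, localhost:9093 ,localhost:9094")
	fmt.Printf("%q\n", addresses)
}
```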
- Field `max_processing_period` added to `kafka` and `kafka_balanced` inputs.
- Compaction intervals are now respected by the `memory` cache type.
- Improved `kafka_balanced` consumer group connection behaviour.
- More `kafka_balanced` input config fields for consumer group timeouts.
- New config interpolation function `uuid_v4`.
- The `while` processor now correctly checks conditions against the first batch of the result of last processor loop.
- Field `max_loops` added to `while` processor.
- New `while` processor.
- New `cache` processor.
- New `all` condition.
- New `any` condition.
- Function interpolation for field `subject` added to `nats` output.
- Switched underlying `kafka_balanced` implementation to sarama consumer.
- Always allow acknowledgement flush during graceful termination.
- Removed unnecessary subscription check from `gcp_pubsub` input.
- New field `fields` added to `log` processor for structured log output.
- Function interpolation for field `channel` added to `redis_pubsub` output.
- Field `byte_size` added to `split` processor.
- Field `dependencies` of children of the `process_dag` processor now correctly parsed from config files.
- Field `push_job_name` added to `prometheus` metrics type.
- New `rename` metrics target.
- Removed potential race condition in `process_dag` with raw bytes conditions.
- Field `max_batch_count` added to `s3` input.
- Field `max_number_of_messages` added to `sqs` input.
- New `blacklist` metrics target.
- New `whitelist` metrics target.
- Initial support for opentracing, including a new `tracer` root component.
- Improved generated metrics documentation and config examples.
- The `nats_stream` input now has a field `unsubscribe_on_close` that when disabled allows durable subscription offsets to persist even when all connections are closed.
- Metadata field `nats_stream_sequence` added to `nats_stream` input.
- The `subprocess` processor no longer sends unexpected empty lines when messages end with a line break.
- New `switch` processor.
- Printing configs now sanitises resource sections.
- The `headers` field in `http` configs now detects and applies `host` keys.
- New `json_documents` format added to the `unarchive` processor.
- Field `push_interval` added to the `prometheus` metrics type.
- Brokers now correctly parse configs containing plugin types as children.
- Output broker types now correctly allocate nested processors for `fan_out` and `try` patterns.
- JSON formatted loggers now correctly escape error messages with line breaks.
- Improved error logging for `s3` input download failures.
- More metadata fields copied to messages from the `s3` input.
- Field `push_url` added to the `prometheus` metrics target.
- Resources (including plugins) that implement `Closable` are now shutdown cleanly.
- New `json_array` format added to the `archive` and `unarchive` processors.
- Preliminary support added to the resource manager API to allow arbitrary shared resource plugins.
- The `s3` input now caps and iterates batched SQS deletes.
- The `archive` processor now interpolates the `path` per message of the batch.
- Fixed environment variable interpolation when combined with embedded function interpolations.
- Fixed break down metric indexes for input and output brokers.
- Input `s3` can now toggle the use of a download manager, switching off now downloads metadata from the target file.
- Output `s3` now writes metadata to the uploaded file.
- Operator `unescape_url_query` added to `text` processor.
- The `nats_stream` input and output now actively attempt to recover stale connections.
- The `awk` processor prints errors and flags failure when the program exits with a non-zero status.
- The `subprocess` processor now attempts to read all flushed stderr output from a process when it fails.
- Function `print_log` added to `awk` processor.
- The `awk` processor function `json_get` no longer returns string values with quotes.
- Processor `awk` codecs changed.
- Output type `sqs` now supports batched message sends.
- Functions `json_get` and `json_set` added to `awk` processor.
- Functions `timestamp_format`, `timestamp_format_nano`, `metadata_get` and `metadata_set` added to `awk` processor.
- New `sleep` processor.
- New `awk` processor.
- Converted all integer based time period fields to string based, e.g. `timeout_ms: 5000` would now be `timeout: 5s`. This may potentially be disruptive but the `--strict` flag should catch all deprecated fields in an existing config.
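When migrating a config by hand, the relationship between the old millisecond integers and the new period strings can be checked with Go's standard `time` package. This is a rough sketch, assuming the string form follows the syntax accepted by `time.ParseDuration`; both helper names are hypothetical and not part of Benthos.

```go
package main

import (
	"fmt"
	"time"
)

// legacyMillisToPeriod is a hypothetical helper that renders an old style
// millisecond value (e.g. timeout_ms: 5000) as a duration string.
func legacyMillisToPeriod(ms int64) string {
	return (time.Duration(ms) * time.Millisecond).String()
}

// periodToMillis is a hypothetical helper that parses a duration string
// (e.g. timeout: 5s) back into milliseconds.
func periodToMillis(period string) (int64, error) {
	d, err := time.ParseDuration(period)
	if err != nil {
		return 0, err
	}
	return int64(d / time.Millisecond), nil
}

func main() {
	fmt.Println(legacyMillisToPeriod(5000)) // 5s

	ms, err := periodToMillis("5s")
	if err != nil {
		fmt.Println("invalid period:", err)
		return
	}
	fmt.Println(ms) // 5000
}
```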
- Renamed `max_batch_size` to `max_batch_count` for consistency.
- New `max_batch_size` field added to `kafka`, `kafka_balanced` and `amqp` inputs. This provides a mechanism for creating message batches optimistically.
- New `subprocess` processor.
- API: The `types.Processor` interface has been changed in order to add lifetime cleanup methods (added `CloseAsync` and `WaitForClose`). For the overwhelming majority of processors these functions will be no-ops.
- More consistent `condition` metrics.
- New `try` and `catch` processors for improved processor error handling.
- All processors now attach error flags.
- S3 input is now more flexible with SNS triggered SQS events.
- Processor metrics have been made more consistent.
- New endpoint `/ready` that returns 200 when both the input and output components are connected, otherwise 503. This is intended to be used as a readiness probe.
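The behaviour described for `/ready` can be sketched with the standard `net/http` package. This is an illustrative sketch and not the Benthos implementation: `readyStatus` and `readyHandler` are hypothetical names, and the two connection checks are stand-in callbacks.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// readyStatus maps the connection state of both components onto the status
// codes named in the changelog entry: 200 when both are connected, else 503.
func readyStatus(inputConnected, outputConnected bool) int {
	if inputConnected && outputConnected {
		return http.StatusOK
	}
	return http.StatusServiceUnavailable
}

// readyHandler is a hypothetical handler that consults two stand-in callbacks
// each time the probe is requested.
func readyHandler(inputConnected, outputConnected func() bool) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(readyStatus(inputConnected(), outputConnected()))
	}
}

func main() {
	// Simulate a connected input and a disconnected output.
	handler := readyHandler(
		func() bool { return true },
		func() bool { return false },
	)

	rec := httptest.NewRecorder()
	handler(rec, httptest.NewRequest(http.MethodGet, "/ready", nil))
	fmt.Println(rec.Code) // 503
}
```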
- Large simplifications to all metrics paths.
- Fully removed the previously deprecated `combine` processor.
- Input and output plugins updated to support new connection health checks.
- Field `role_external_id` added to all S3 credential configs.
- New `processor_failed` condition and improved processor error handling which can be read about here
- New `content_type` field for the `s3` output.
- New `group_by_value` processor.
- Lint errors are logged (level INFO) during normal Benthos operation.
- New `--strict` command flag which causes Benthos to abort when linting errors are found in a config file.
- New `--lint` command flag for linting config files.
- The `s3` output now attempts to batch uploads.
- The `s3` input now exposes errors in deleting SQS messages during acks.
- Resource based conditions no longer benefit from cached results. In practice this optimisation was easy to lose in config and difficult to maintain.
- Metadata is now sent to `kafka` outputs.
- New `max_inflight` field added to the `nats_stream` input.
- Fixed relative path trimming for streams from file directories.
- The `dynamodb` cache and output types now set TTL columns as unix timestamps.
- New `escape_url_query` operator for the `text` processor.
- Removed submatch indexes in the `text` processor `find_regexp` operator and added documentation for expanding submatches in the `replace_regexp` operator.
- Allow submatch indexes in the `find_regexp` operator for the `text` processor.
- New `find_regexp` operator for the `text` processor.
- New `aws` fields to the `elasticsearch` output to allow AWS authentication.
- Add max-outstanding fields to `gcp_pubsub` input.
- Add new `dynamodb` output.
- The `s3` output now calculates `path` field function interpolations per message of a batch.
- New `set` operator for the `text` processor.
- New `cache` output type.
- New `group_by` processor.
- Add bulk send support to `elasticsearch` output.
- New `content` interpolation function.
- New `redis` cache type.
- The `process_map` processor now allows map target path overrides when a target is the parent of another target.
- Field `pipeline` and `sniff` added to the `elasticsearch` output.
- Operators `to_lower` and `to_upper` added to the `text` processor.
- Field `endpoint` added to all AWS types.
- Allow `log` config field `static_fields` to be fully overridden.
- New `process_dag` processor.
- New `static_fields` map added to log config for setting static log fields.
- JSON log field containing component path moved from `@service` to `component`.
- New `gcp_pubsub` input and outputs.
- New `log` processor.
- New `lambda` processor.
- New `process_batch` processor.
- Added `count` field to `batch` processor.
- Metrics for `kinesis` output throttles.
- The `combine` processor is now considered DEPRECATED, please use the `batch` processor instead.
- The `batch` processor field `byte_size` is now set at 0 (and therefore ignored) by default. A log warning has been added in case anyone was relying on the default.
- New `rate_limit` resource with a `local` type.
- Field `rate_limit` added to `http` based processors, inputs and outputs.
- New `prefetch_count` field added to `nats` input.
- New `bounds_check` condition type.
- New `check_field` condition type.
- New `queue` field added to `nats` input.
- Function interpolation for the `topic` field of the `nsq` output.
- The `nats` input now defaults to joining a queue.
- The redundant `nsq` output field `max_in_flight` has been removed.
- The `files` output now interpolates paths per message part of a batch.
- New `hdfs` input and output.
- New `switch` output.
- New `enum` and `has_prefix` operators for the `metadata` condition.
- Ability to set `tls` client certificate fields directly.
- New `retry` output.
- Added `regex_partial` and `regex_exact` operators to the `metadata` condition.
- The `kinesis` output field `retries` has been renamed `max_retries` in order to expose the difference in its zero value behaviour (endless retries) versus other `retry` fields (zero retries).
- New `endpoint` field added to `kinesis` input.
- New `dynamodb` cache type.
- Function interpolation for the `topic` field of the `kafka` output.
- New `target_version` field for the `kafka_balanced` input.
- TLS config fields for client certificates.
- TLS config field `cas_file` has been renamed `root_cas_file`.
- New `zip` option for the `archive` and `unarchive` processors.
- The `kinesis` output type now supports batched sends and per message interpolation.
- New `metric` processor.
- New `redis_streams` input and output.
- New `kinesis` input and output.
- The `index` field of the `elasticsearch` output can now be dynamically set using function interpolation.
- New `hash` processor.
- API: The `metrics.Type` interface has been changed in order to add labels.
- Significant restructuring of `amqp` inputs and outputs. These changes should be backwards compatible for existing pipelines, but changes the way in which queues, exchanges and bindings are declared using these types.
- New durable fields for `amqp` input and output types.
- Improved statsd client with better cached aggregation.
- New `tls` fields for `amqp` input and output types.
- New `type` field for `elasticsearch` output.
- New `throttle` processor.
- New `less_than` and `greater_than` operators for `metadata` condition.
- New `metadata` condition type.
- More metadata fields for `kafka` input.
- Field `commit_period_ms` for `kafka` and `kafka_balanced` inputs for specifying a commit period.
- New `retries` field to `s3` input, to cap the number of download attempts made on the same bucket item.
- Added metadata based mechanism to detect final message from a `read_until` input.
- Added field to `split` processor for specifying target batch sizes.
- Metadata fields are now per message part within a batch.
- New `metadata_json_object` function interpolation to return a JSON object of metadata key/value pairs.
- The `metadata` function interpolation now allows part indexing and no longer returns a JSON object when no key is specified, this behaviour can now be done using the `metadata_json_object` function.
- Fields for the `http` processor to enable parallel requests from message batches.
- Broker level output processors are now applied before the individual output processors.
- The `dynamic` input and output HTTP paths for CRUD operations are now `/inputs/{input_id}` and `/outputs/{output_id}` respectively.
- Removed deprecated `amazon_s3`, `amazon_sqs` and `scalability_protocols` input and output types.
- Removed deprecated `json_fields` field from the `dedupe` processor.
- Add conditions to `process_map` processor.
- TLS config fields have been cleaned up for multiple types. This affects the `kafka`, `kafka_balanced` and `http_client` input and output types, as well as the `http` processor type.
- New `delete_all` and `delete_prefix` operators for `metadata` processor.
- More metadata fields extracted from the AMQP input.
- HTTP clients now support function interpolation on the URL and header values, this includes the `http_client` input and output as well as the `http` processor.
- New `key` field added to the `dedupe` processor, allowing you to deduplicate using function interpolation. This deprecates the `json_paths` array field.
- New `s3` and `sqs` input and output types, these replace the now deprecated `amazon_s3` and `amazon_sqs` types respectively, which will eventually be removed.
- New `nanomsg` input and output types, these replace the now deprecated `scalability_protocols` types, which will eventually be removed.
- Metadata fields are now collected from MQTT input.
- AMQP output writes all metadata as headers.
- AMQP output field `key` now supports function interpolation.
- New `metadata` processor and configuration interpolation function.
- New config interpolator function `json_field` for extracting parts of a JSON message into a config value.
- Log level config field no longer stutters, `logger.log_level` is now `logger.level`.
- Ability to create batches via conditions on message payloads in the `batch` processor.
- New `--examples` flag for generating specific examples from Benthos.
- New `text` processor.
- Processor `process_map` replaced field `strict_premapping` with `premap_optional`.
- New `process_field` processor.
- New `process_map` processor.
- Removed mapping fields from the `http` processor, this behaviour has been put into the new `process_map` processor instead.
- Renamed `content` condition type to `text` in order to clarify its purpose.
- Latency metrics for caches.
- TLS options for `kafka` and `kafka_partitions` inputs and outputs.
- Metrics for items configured within the `resources` section are now namespaced under their identifier.
- New `copy` and `move` operators for the `json` processor.
- Metrics for recording `http` request latencies.
- Improved and rearranged fields for `http_client` input and output.
- More compression and decompression targets.
- New `lines` option for archive/unarchive processors.
- New `encode` and `decode` processors.
- New `period_ms` field for the `batch` processor.
- New `clean` operator for the `json` processor.
- New `http` processor, where payloads can be sent to arbitrary HTTP endpoints and the result constructed into a new payload.
- New `inproc` inputs and outputs for linking streams together.
- New streams endpoint `/streams/{id}/stats` for obtaining JSON metrics for a stream.
- Allow comma separated topics for `kafka_balanced`.
- Support for PATCH verb on the streams mode `/streams/{id}` endpoint.
- Sweeping changes were made to the environment variable configuration file. This file is now auto generated along with its supporting document. This change will impact the docker image.
- New `filter_parts` processor for filtering individual parts of a message batch.
- New field `open_message` for `websocket` input.
- No longer setting default input processor.
- New `root_path` field for service wide `http` config.
- New `regexp_exact` and `regexp_partial` content condition operators.
- The `statsd` metrics target will now periodically report connection errors.
- The `json` processor will now `append` array values in expanded form.
- More granular config options in the `http_client` output for controlling retry logic.
- New `try` pattern for the output `broker` type, which can be used in order to configure fallback outputs.
- New `json` processor, this replaces `delete_json`, `select_json`, `set_json`.
- The `streams` API endpoints have been changed to become more "RESTy".
- Removed the `delete_json`, `select_json` and `set_json` processors, please use the `json` processor instead.
- New `grok` processor for creating structured objects from unstructured data.
- New `files` input type for reading multiple files as discrete messages.
- Increase default `max_buffer` for `stdin`, `file` and `http_client` inputs.
- Command flags `--print-yaml` and `--print-json` changed to provide sanitised outputs unless accompanied by new `--all` flag.
- Badger based buffer option has been removed.
- New metrics wrapper for more basic interface implementations.
- New `delete_json` processor.
- New field `else_processors` for `conditional` processor.
- New websocket endpoint for `http_server` input.
- New websocket endpoint for `http_server` output.
- New `websocket` input type.
- New `websocket` output type.
- Goreleaser config for generating release packages.
- Back to using Scratch as base for Docker image, instead taking ca-certificates from the build image.
- New `batch` processor for combining payloads up to a number of bytes.
- New `conditional` processor, allows you to configure a chain of processors to only be run if the payload passes a `condition`.
- New `--stream` mode features:
  - POST verb for `/streams` path now supported.
  - New `--streams-dir` flag for parsing a directory of stream configs.
- The `condition` processor has been renamed `filter`.
- The `custom_delimiter` fields in any line reader types `file`, `stdin`, `stdout`, etc have been renamed `delimiter`, where the behaviour is the same.
- Now using Alpine as base for Docker image, includes ca-certificates.