A mock for the Kinesis API, intended for local testing.
It is available as a docker image in the GitHub Container Registry:
docker pull ghcr.io/etspaceman/kinesis-mock:0.2.6
docker run -p 4567:4567 -p 4568:4568 ghcr.io/etspaceman/kinesis-mock:0.2.6
You can also leverage the following executable options in the release assets:
File | Description | Launching |
---|---|---|
kinesis-mock.jar | Executable JAR file that can be run in any environment with JDK 11 | java -jar ./kinesis-mock.jar |
kinesis-mock-linux-amd64-dynamic | GraalVM Native Image executable for Linux. Loads dependencies like libc at runtime. Best for non-docker runtimes. | ./kinesis-mock-linux-amd64-dynamic |
kinesis-mock-linux-amd64-static | GraalVM Native Image executable for Linux. All dependencies are statically provided. Good for docker images. | ./kinesis-mock-linux-amd64-static |
kinesis-mock-linux-amd64-mostly-static | GraalVM Native Image executable for Linux. Most dependencies are statically provided, except libc. Good for docker images. | ./kinesis-mock-linux-amd64-static |
kinesis-mock-macos-amd64-dynamic | GraalVM Native Image executable for MacOS. Loads dependencies like libc at runtime. Best for non-docker runtimes. | ./kinesis-mock-macos-amd64-dynamic |
kinesis-mock-dynamic.exe | GraalVM Native Image executable for Windows. Loads dependencies like libc at runtime. Best for non-docker runtimes. | ./kinesis-mock-dynamic.exe |
kinesis-mock-mostly-static.exe | GraalVM Native Image executable for Windows. Most dependencies are statically provided, except libc. Good for docker images. | ./kinesis-mock-mostly-static.exe |
See the GraalVM documentation for more information about static vs non-static Native Image distributions.
Below is the available configuration for the service. Note that it is not recommended to edit the ports in the docker environment (rather you can map these ports to a local one).
Variable | Data Type | Default Value | Notes |
---|---|---|---|
INITIALIZE_STREAMS | String | A comma-delimited string of stream names, its optional corresponding shard count and an optional region to initialize during startup. If the shard count is not provided, the default shard count of 4 is used. If the region is not provided, the default region is used. For example: "my-first-stream:1,my-other-stream::us-west-2,my-last-stream:1" | |
KINESIS_MOCK_TLS_PORT | Int | 4567 | Https Only |
KINESIS_MOCK_PLAIN_PORT | Int | 4568 | Http Only |
CREATE_STREAM_DURATION | Duration | 500ms | |
DELETE_STREAM_DURATION | Duration | 500ms | |
REGISTER_STREAM_CONSUMER_DURATION | Duration | 500ms | |
START_STREAM_ENCRYPTION_DURATION | Duration | 500ms | |
STOP_STREAM_ENCRYPTION_DURATION | Duration | 500ms | |
DEREGISTER_STREAM_CONSUMER_DURATION | Duration | 500ms | |
MERGE_SHARDS_DURATION | Duration | 500ms | |
SPLIT_SHARD_DURATION | Duration | 500ms | |
UPDATE_SHARD_COUNT_DURATION | Duration | 500ms | |
SHARD_LIMIT | Int | 50 | |
AWS_ACCOUNT_ID | String | "000000000000" | |
AWS_REGION | String | "us-east-1" | Default region in use for operations. E.g. if a region is not provided by the INITIALIZE_STREAMS values. |
LOG_LEVEL | String | "INFO" | Sets the log-level for kinesis-mock specific logs |
ROOT_LOG_LEVEL | String | "ERROR" | Sets the log-level for all dependencies |
LOAD_DATA_IF_EXISTS | Boolean | true | Loads data from the configured persisted data file if it exists |
SHOULD_PERSIST_DATA | Boolean | false | Persists data to disk. Used to keep data during restarts of the service |
PERSIST_PATH | String | "data" | Path to persist data to. If it doesn't start with "/", the path is considered relative to the present working directory. |
PERSIST_FILE_NAME | String | "kinesis-data.json" | File name for persisted data |
PERSIST_INTERVAL | Duration | 5s | Delay between data persistence |
You can configure the LOG_LEVEL
of the mock with the following levels in mind:
ERROR
- Unhandled errors in the serviceWARN
- Handled errors in the service (e.g. bad requests)INFO
- High-level, low-noise informational messages (default)DEBUG
- Low-level, high-noise informational messagesTRACE
- Log data bodies going in / out of the service
The image exposes 2 ports for interactions:
- 4567 (https)
- 4568 (http)
For an example docker-compose setup which uses this image, check out the docker-compose.yml file.
There are examples configuring the KPL, KCL and AWS SDK to use this mock in the functional tests.
import software.amazon.awssdk.auth.credentials.{AwsCredentials,AwsCredentialsProvider}
import software.amazon.awssdk.http.SdkHttpConfigurationOption
import software.amazon.awssdk.http.async.SdkAsyncHttpClient
import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient
import software.amazon.awssdk.services.kinesis.model._
import software.amazon.awssdk.utils.AttributeMap
object MyApp {
// A mock credentials provider
final case class AwsCreds(accessKey: String, secretKey: String)
extends AwsCredentials
with AwsCredentialsProvider {
override def accessKeyId(): String = accessKey
override def secretAccessKey(): String = secretKey
override def resolveCredentials(): AwsCredentials = this
}
object AwsCreds {
val LocalCreds: AwsCreds =
AwsCreds("mock-kinesis-access-key", "mock-kinesis-secret-key")
}
// The kinesis-mock uses a self-signed certificate
private val trustAllCertificates =
AttributeMap
.builder()
.put(
SdkHttpConfigurationOption.TRUST_ALL_CERTIFICATES,
java.lang.Boolean.TRUE
)
.build()
def nettyClient: SdkAsyncHttpClient =
NettyNioAsyncHttpClient
.builder()
.buildWithDefaults(trustAllCertificates)
val kinesisClient: KinesisAsyncClient =
KinesisAsyncClient
.builder()
.httpClient(nettyClient)
.region(Region.US_EAST_1)
.credentialsProvider(AwsCreds.LocalCreds)
.endpointOverride(URI.create(s"https://localhost:4567"))
.build()
}
import com.amazonaws.auth.{AWSCredentials, AWSCredentialsProvider}
object MyApp {
// A mock credentials provider
final case class AwsCreds(accessKey: String, secretKey: String)
extends AWSCredentials
with AWSCredentialsProvider {
override def getAWSAccessKeyId: String = accessKey
override def getAWSSecretKey: String = secretKey
override def getCredentials: AWSCredentials = this
override def refresh(): Unit = ()
}
object AwsCreds {
val LocalCreds: AwsCreds =
AwsCreds("mock-kinesis-access-key", "mock-kinesis-secret-key")
}
val kplProducer = new KinesisProducer(
new KinesisProducerConfiguration()
.setCredentialsProvider(AwsCreds.LocalCreds)
.setRegion(Regions.US_EAST_1.getName)
.setKinesisEndpoint("localhost")
.setKinesisPort(4567L)
.setCloudwatchEndpoint("localhost")
.setCloudwatchPort(4566L) // Using localstack's Cloudwatch port
.setVerifyCertificate(false)
)
}
import software.amazon.awssdk.auth.credentials.{AwsCredentials,AwsCredentialsProvider}
import software.amazon.awssdk.http.SdkHttpConfigurationOption
import software.amazon.awssdk.http.async.SdkAsyncHttpClient
import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient
import software.amazon.awssdk.services.kinesis.model._
import software.amazon.awssdk.utils.AttributeMap
import software.amazon.kinesis.checkpoint.CheckpointConfig
import software.amazon.kinesis.common._
import software.amazon.kinesis.coordinator.{CoordinatorConfig, Scheduler}
import software.amazon.kinesis.leases.LeaseManagementConfig
import software.amazon.kinesis.lifecycle.LifecycleConfig
import software.amazon.kinesis.lifecycle.events._
import software.amazon.kinesis.metrics.MetricsConfig
import software.amazon.kinesis.processor._
import software.amazon.kinesis.retrieval.polling.PollingConfig
import software.amazon.kinesis.retrieval.RetrievalConfig
object MyApp {
// A mock credentials provider
final case class AwsCreds(accessKey: String, secretKey: String)
extends AwsCredentials
with AwsCredentialsProvider {
override def accessKeyId(): String = accessKey
override def secretAccessKey(): String = secretKey
override def resolveCredentials(): AwsCredentials = this
}
object AwsCreds {
val LocalCreds: AwsCreds =
AwsCreds("mock-kinesis-access-key", "mock-kinesis-secret-key")
}
// The kinesis-mock uses a self-signed certificate
private val trustAllCertificates =
AttributeMap
.builder()
.put(
SdkHttpConfigurationOption.TRUST_ALL_CERTIFICATES,
java.lang.Boolean.TRUE
)
.build()
def nettyClient: SdkAsyncHttpClient =
NettyNioAsyncHttpClient
.builder()
.buildWithDefaults(trustAllCertificates)
val kinesisClient: KinesisAsyncClient =
KinesisAsyncClient
.builder()
.httpClient(nettyClient)
.region(Region.US_EAST_1)
.credentialsProvider(AwsCreds.LocalCreds)
.endpointOverride(URI.create(s"https://localhost:4567"))
.build()
val cloudwatchClient: CloudWatchAsyncClient -
CloudWatchAsyncClient
.builder()
.httpClient(nettyClient)
.region(Region.US_EAST_1)
.credentialsProvider(AwsCreds.LocalCreds)
.endpointOverride(URI.create(s"https://localhost:4566")) // localstack port
.build()
val dynamoClient: DynamoDbAsyncClient -
DynamoDbAsyncClient
.builder()
.httpClient(nettyClient)
.region(Region.US_EAST_1)
.credentialsProvider(AwsCreds.LocalCreds)
.endpointOverride(URI.create(s"http://localhost:8000")) // dynamodb-local port
.build()
object KCLRecordProcessor extends ShardRecordProcessor {
override def initialize(x: InitializationInput): Unit = ()
override def processRecords(x: ProcessRecordsInput): Unit = println(s"GOT RECORDS: $x")
override def leaseLost(x: LeaseLostInput): Unit = ()
override def shardEnded(x: ShardEndedInput): Unit = ()
override def shutdownRequested(x: ShutdownRequestedInput): Unit = ()
}
object KCLRecordProcessorFactory extends ShardRecordProcessorFactory {
override def shardRecordProcessor(): ShardRecordProcessor =
KCLRecordProcessor
override def shardRecordProcessor(
streamIdentifier: StreamIdentifier
): ShardRecordProcessor = KCLRecordProcessor
}
val appName = "some-app-name"
val workerId = "some-worker-id"
val streamName = "some-stream-name"
// kinesis-mock only supports polling consumers today
val retrievalSpecificConfig = new PollingConfig(streamName, kinesisClient)
// Consumer can be executed from this by running scheduler.run()
val scheduler = new Scheduler(
new CheckpointConfig(),
new CoordinatorConfig(appName)
.parentShardPollIntervalMillis(1000L),
new LeaseManagementConfig(
appName,
dynamoClient,
kinesisClient,
workerId
).shardSyncIntervalMillis(1000L),
new LifecycleConfig(),
new MetricsConfig(cloudwatchClient, appName),
new ProcessorConfig(KCLRecordProcessorFactory),
new RetrievalConfig(
kinesisClient,
streamName,
appName
).initialPositionInStreamExtended(
InitialPositionInStreamExtended.newInitialPosition(
InitialPositionInStream.TRIM_HORIZON
)
).retrievalSpecificConfig(retrievalSpecificConfig)
.retrievalFactory(retrievalSpecificConfig.retrievalFactory())
)
}
- Does not currently support Http2 requests (http4s/http4s#4707)
- Does not currently support SubscribeToShard due to lack of push-promise support (https://github.com/http4s/http4s/issues/4624)