Skip to content

Commit

Permalink
Null safety (#12)
Browse files Browse the repository at this point in the history
mark some values as nullable
  • Loading branch information
aSemy authored Mar 9, 2022
1 parent a8758d5 commit 1a2c705
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -128,13 +128,15 @@ fun <K, V, otherV, outV> KTable<K, V>.outerJoin(
}


// the value of the resulting KStream is nullable, because it includes record deletions.
fun <inK, V, outK> KTable<inK, V>.toStream(
name: String,
mapper: KeyValueMapper<inK, V, outK>? = null,
): KStream<outK, V> = toStream(mapper, namedAs(name))
): KStream<outK, V?> = toStream(mapper, namedAs(name))


fun <K, V> KTable<K, V>.toStream(name: String): KStream<K, V> = toStream(namedAs(name))
// the value of the resulting KStream is nullable, because it includes record deletions.
fun <K, V> KTable<K, V>.toStream(name: String): KStream<K, V?> = toStream(namedAs(name))


fun <K, V> KTable<K, V>.filter(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import org.apache.kafka.common.serialization.Serializer


inline fun <reified T> BinaryFormat.kafkaSerializer() =
Serializer { topic: String, data: T ->
Serializer { topic: String, data: T? ->
runCatching {
encodeToByteArray(data)
}.getOrElse { e ->
Expand Down Expand Up @@ -47,7 +47,7 @@ inline fun <reified T> BinaryFormat.kafkaDeserializer() =


inline fun <reified T> BinaryFormat.serde() = object : Serde<T> {
override fun serializer(): Serializer<T> = kafkaSerializer()
override fun serializer(): Serializer<T?> = kafkaSerializer()
override fun deserializer(): Deserializer<T> = kafkaDeserializer()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import org.apache.kafka.common.serialization.Serializer


inline fun <reified T> StringFormat.kafkaSerializer() =
Serializer { topic: String, data: T ->
Serializer { topic: String, data: T? ->
runCatching {
encodeToString(data).encodeToByteArray()
// jsonMapper.encodeToString(data as T).encodeToByteArray()
Expand Down Expand Up @@ -47,7 +47,7 @@ inline fun <reified T> StringFormat.kafkaDeserializer() =
}

inline fun <reified T> StringFormat.serde() = object : Serde<T> {
override fun serializer(): Serializer<T> = kafkaSerializer()
override fun serializer(): Serializer<T?> = kafkaSerializer()
override fun deserializer(): Deserializer<T> = kafkaDeserializer()
}

Expand Down

0 comments on commit 1a2c705

Please sign in to comment.