Skip to content

Commit

Permalink
remove nullable types from kxs serdes, standardise the formatting a b…
Browse files Browse the repository at this point in the history
…it, and 'cache' the serializers (maybe better performance?) (#14)
  • Loading branch information
aSemy authored Mar 11, 2022
1 parent ad21724 commit 468e073
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ import org.apache.kafka.common.serialization.Serde
import org.apache.kafka.common.serialization.Serializer


inline fun <reified T> BinaryFormat.kafkaSerializer() =
Serializer { topic: String, data: T? ->
inline fun <reified T> BinaryFormat.kafkaSerializer(): Serializer<T> =
Serializer { topic: String, data: T ->
runCatching {
encodeToByteArray(data)
}.getOrElse { e ->
Expand All @@ -27,7 +27,8 @@ inline fun <reified T> BinaryFormat.kafkaSerializer() =
}
}

inline fun <reified T> BinaryFormat.kafkaDeserializer() =

inline fun <reified T> BinaryFormat.kafkaDeserializer(): Deserializer<T> =
Deserializer { topic: String, data: ByteArray ->
runCatching {
decodeFromByteArray<T>(data)
Expand All @@ -46,10 +47,14 @@ inline fun <reified T> BinaryFormat.kafkaDeserializer() =
}


inline fun <reified T> BinaryFormat.serde() = object : Serde<T> {
override fun serializer(): Serializer<T?> = kafkaSerializer()
override fun deserializer(): Deserializer<T> = kafkaDeserializer()
}
inline fun <reified T> BinaryFormat.serde(): Serde<T> =
object : Serde<T> {
private val serializer: Serializer<T> = kafkaSerializer()
private val deserializer: Deserializer<T> = kafkaDeserializer()
override fun serializer(): Serializer<T> = serializer
override fun deserializer(): Deserializer<T> = deserializer
}


inline fun <reified K, reified V> BinaryFormat.keyValueSerdes(): KeyValueSerdes<K, V> =
KeyValueSerdes(serde(), serde())
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,10 @@ import org.apache.kafka.common.serialization.Serde
import org.apache.kafka.common.serialization.Serializer


inline fun <reified T> StringFormat.kafkaSerializer() =
Serializer { topic: String, data: T? ->
inline fun <reified T> StringFormat.kafkaSerializer(): Serializer<T> =
Serializer { topic: String, data: T ->
runCatching {
encodeToString(data).encodeToByteArray()
// jsonMapper.encodeToString(data as T).encodeToByteArray()
}.getOrElse { e ->
println(
"""
Expand All @@ -28,7 +27,8 @@ inline fun <reified T> StringFormat.kafkaSerializer() =
}
}

inline fun <reified T> StringFormat.kafkaDeserializer() =

inline fun <reified T> StringFormat.kafkaDeserializer(): Deserializer<T> =
Deserializer { topic: String, data: ByteArray ->
runCatching {
decodeFromString<T>(data.decodeToString())
Expand All @@ -46,10 +46,14 @@ inline fun <reified T> StringFormat.kafkaDeserializer() =
}
}

inline fun <reified T> StringFormat.serde() = object : Serde<T> {
override fun serializer(): Serializer<T?> = kafkaSerializer()
override fun deserializer(): Deserializer<T> = kafkaDeserializer()
}

inline fun <reified T> StringFormat.serde(): Serde<T> =
object : Serde<T> {
private val serializer: Serializer<T> = kafkaSerializer()
private val deserializer: Deserializer<T> = kafkaDeserializer()
override fun serializer(): Serializer<T> = serializer
override fun deserializer(): Deserializer<T> = deserializer
}


inline fun <reified K, reified V> StringFormat.keyValueSerdes(): KeyValueSerdes<K, V> =
Expand Down

0 comments on commit 468e073

Please sign in to comment.