From 0129ea0d78396a121de672f3c396a5ca776f20dd Mon Sep 17 00:00:00 2001 From: Wan Lee Date: Wed, 19 Aug 2020 13:38:43 -0400 Subject: [PATCH] Add mechanism to tear down KafkaMonitor threads (#77) --- CHANGELOG.md | 5 +++++ README.md | 2 +- src/FsKafka/Monitor.fs | 21 +++++++++++++-------- tests/FsKafka.Integration/Integration.fs | 2 +- 4 files changed, 20 insertions(+), 10 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 65ae157..3b8e1a7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,9 +14,14 @@ The `Unreleased` section name is replaced by the expected version of next releas ### Added ### Changed + +- BREAKING: The name of the start function in FsKafka.KafkaMonitor `StartAsChild` was changed to `Start`, changed the return type from to `IDisposable` + ### Removed ### Fixed +- Provided the mechanism to tear down the monitoring loop when the consumer stops + ## [1.5.1] - 2020-08-04 diff --git a/README.md b/README.md index c6be394..240d3e4 100644 --- a/README.md +++ b/README.md @@ -141,7 +141,7 @@ let cfg = KafkaConsumerConfig.Create("MyClientId", "kafka:9092", ["MyTopic"], "M async { use consumer = BatchedConsumer.Start(log, cfg, handler) - do! KafkaMonitor(log).StartAsChild(consumer.Inner, cfg.Inner.GroupId) + use _ = KafkaMonitor(log).Start(consumer.Inner, cfg.Inner.GroupId) return! consumer.AwaitCompletion() } |> Async.RunSynchronously ``` diff --git a/src/FsKafka/Monitor.fs b/src/FsKafka/Monitor.fs index 9074126..70fbf15 100644 --- a/src/FsKafka/Monitor.fs +++ b/src/FsKafka/Monitor.fs @@ -6,6 +6,7 @@ open Confluent.Kafka open Serilog open System open System.Diagnostics +open System.Threading type PartitionResult = | OkReachedZero // check 1 @@ -346,18 +347,22 @@ type KafkaMonitor<'k,'v> [] member __.OnCheckFailed = onCheckFailed.Publish // One of these runs per topic - member private __.Pump(consumer, topic, group) = + member private __.Pump(consumer, topic, group) = async { + let! ct = Async.CancellationToken let onQuery res = MonitorImpl.Logging.logLatest log topic group res let onStatus topic group xs = MonitorImpl.Logging.logResults log topic group xs onStatus.Trigger(topic, xs) let onCheckFailed count exn = - MonitorImpl.Logging.logFailure log topic group count exn - onCheckFailed.Trigger(topic, count, exn) - MonitorImpl.run consumer (interval,windowSize,failResetCount) topic group (onQuery,onCheckFailed,onStatus) - - /// Commences a child task per subscribed topic that will ob - member __.StartAsChild(target : IConsumer<'k,'v>, group) = async { + if not ct.IsCancellationRequested then + MonitorImpl.Logging.logFailure log topic group count exn + onCheckFailed.Trigger(topic, count, exn) + return! MonitorImpl.run consumer (interval,windowSize,failResetCount) topic group (onQuery,onCheckFailed,onStatus) + } + /// Commences a monitoring task per subscribed topic + member __.Start(target : IConsumer<'k,'v>, group) = + let cts = new CancellationTokenSource() for topic in target.Subscription do - Async.Start(__.Pump(target, topic, group)) } + Async.Start(__.Pump(target, topic, group), cts.Token) + { new IDisposable with member __.Dispose() = cts.Cancel() } diff --git a/tests/FsKafka.Integration/Integration.fs b/tests/FsKafka.Integration/Integration.fs index becb180..e82bcbc 100644 --- a/tests/FsKafka.Integration/Integration.fs +++ b/tests/FsKafka.Integration/Integration.fs @@ -410,7 +410,7 @@ type T4(testOutputHelper) = let! res = async { use consumer = BatchedConsumer.Start(log, consumerCfg, handle) consumer.StopAfter (TimeSpan.FromSeconds 20.) - do! FsKafka.KafkaMonitor(log).StartAsChild(consumer.Inner, consumerCfg.Inner.GroupId) + use _ = FsKafka.KafkaMonitor(log).Start(consumer.Inner, consumerCfg.Inner.GroupId) return! consumer.AwaitCompletion() |> Async.Catch }