Skip to content

Commit

Permalink
Add mechanism to tear down KafkaMonitor threads (#77)
Browse files Browse the repository at this point in the history
  • Loading branch information
wantastic84 authored and bartelink committed Aug 19, 2020
1 parent 398c4cb commit 0129ea0
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 10 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,14 @@ The `Unreleased` section name is replaced by the expected version of next releas

### Added
### Changed

- BREAKING: The name of the start function in FsKafka.KafkaMonitor `StartAsChild` was changed to `Start`, changed the return type from to `IDisposable`

### Removed
### Fixed

- Provided the mechanism to tear down the monitoring loop when the consumer stops

<a name="1.5.1"></a>
## [1.5.1] - 2020-08-04

Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ let cfg = KafkaConsumerConfig.Create("MyClientId", "kafka:9092", ["MyTopic"], "M
async {
use consumer = BatchedConsumer.Start(log, cfg, handler)
do! KafkaMonitor(log).StartAsChild(consumer.Inner, cfg.Inner.GroupId)
use _ = KafkaMonitor(log).Start(consumer.Inner, cfg.Inner.GroupId)
return! consumer.AwaitCompletion()
} |> Async.RunSynchronously
```
21 changes: 13 additions & 8 deletions src/FsKafka/Monitor.fs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ open Confluent.Kafka
open Serilog
open System
open System.Diagnostics
open System.Threading

type PartitionResult =
| OkReachedZero // check 1
Expand Down Expand Up @@ -346,18 +347,22 @@ type KafkaMonitor<'k,'v>
[<CLIEvent>] member __.OnCheckFailed = onCheckFailed.Publish

// One of these runs per topic
member private __.Pump(consumer, topic, group) =
member private __.Pump(consumer, topic, group) = async {
let! ct = Async.CancellationToken
let onQuery res =
MonitorImpl.Logging.logLatest log topic group res
let onStatus topic group xs =
MonitorImpl.Logging.logResults log topic group xs
onStatus.Trigger(topic, xs)
let onCheckFailed count exn =
MonitorImpl.Logging.logFailure log topic group count exn
onCheckFailed.Trigger(topic, count, exn)
MonitorImpl.run consumer (interval,windowSize,failResetCount) topic group (onQuery,onCheckFailed,onStatus)

/// Commences a child task per subscribed topic that will ob
member __.StartAsChild(target : IConsumer<'k,'v>, group) = async {
if not ct.IsCancellationRequested then
MonitorImpl.Logging.logFailure log topic group count exn
onCheckFailed.Trigger(topic, count, exn)
return! MonitorImpl.run consumer (interval,windowSize,failResetCount) topic group (onQuery,onCheckFailed,onStatus)
}
/// Commences a monitoring task per subscribed topic
member __.Start(target : IConsumer<'k,'v>, group) =
let cts = new CancellationTokenSource()
for topic in target.Subscription do
Async.Start(__.Pump(target, topic, group)) }
Async.Start(__.Pump(target, topic, group), cts.Token)
{ new IDisposable with member __.Dispose() = cts.Cancel() }
2 changes: 1 addition & 1 deletion tests/FsKafka.Integration/Integration.fs
Original file line number Diff line number Diff line change
Expand Up @@ -410,7 +410,7 @@ type T4(testOutputHelper) =
let! res = async {
use consumer = BatchedConsumer.Start(log, consumerCfg, handle)
consumer.StopAfter (TimeSpan.FromSeconds 20.)
do! FsKafka.KafkaMonitor(log).StartAsChild(consumer.Inner, consumerCfg.Inner.GroupId)
use _ = FsKafka.KafkaMonitor(log).Start(consumer.Inner, consumerCfg.Inner.GroupId)
return! consumer.AwaitCompletion() |> Async.Catch
}

Expand Down

0 comments on commit 0129ea0

Please sign in to comment.