-
Notifications
You must be signed in to change notification settings - Fork 19
/
Examples.fsx
executable file
·513 lines (404 loc) · 24.9 KB
/
Examples.fsx
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
// Compile the fsproj by either a) right-clicking or b) typing
// dotnet build tests/FsCodec.SystemTextJson.Tests before attempting to send this to FSI with Alt-Enter
#if !USE_LOCAL_BUILD
(* Rider's FSI is not happy without the explicit references :shrug: *)
#I "bin/Debug/net6.0"
#r "FsCodec.dll"
#r "System.Text.Json.dll"
#r "FsCodec.SystemTextJson.dll"
#r "TypeShape.dll"
#r "FSharp.UMX.dll"
#r "Serilog.dll"
#r "Serilog.Sinks.Console.dll"
#else
#r "nuget: FsCodec.SystemTextJson, *-*"
#r "nuget: Serilog.Sinks.Console"
#endif
open FsCodec.SystemTextJson
type JsonConverterAttribute = System.Text.Json.Serialization.JsonConverterAttribute
open System
/// Minimal contract example: an `Item` handled with the stock `Serdes.Default`
module Contract =

    type Item =
        { value: string option }

    // Only standard types are involved, so no special policies are needed
    let private serdes = Serdes.Default

    /// Serializes the supplied `Item` using the default Serdes
    let serialize (x: Item) =
        serdes.Serialize(x)

    /// Deserializes the supplied JSON text using the default Serdes
    let deserialize (json: string) =
        serdes.Deserialize(json)
/// Contract example that needs a custom converter and camelCase naming
module Contract2 =

    type TypeThatRequiresMyCustomConverter =
        { mess: int }

    /// Placeholder converter: reading always yields "", writing emits nothing
    type MyCustomConverter() =
        inherit JsonPickler<string>()
        override _.Read(_, _) = ""
        override _.Write(_, _, _) = ()

    // NOTE: `Value` is Pascal-cased and needs converting to camelCase, see `camelCase = true` below
    type Item =
        { Value: string option
          other: TypeThatRequiresMyCustomConverter }

    // The custom converter is registered via the Options
    let private options = Options.Create(converters = [| MyCustomConverter() |], camelCase = true)
    let private serdes = Serdes(options)

    /// Serializes the supplied `Item` using the customized Serdes
    let serialize (x: Item) =
        serdes.Serialize(x)

    /// Deserializes the supplied JSON text using the customized Serdes
    let deserialize (json: string) =
        serdes.Deserialize(json)
// Default-configured Serdes shared by the top-level examples that follow
let serdes = Serdes.Default
(* Global vs local Converters
It's recommended to avoid global converters, for at least the following reasons:
- they're less efficient
- they're easier to get wrong if you have the wrong policy in place
- Explicit is better than implicit *)
/// Maps a Guid to/from its "N" format string rendition
type GuidConverter() =
    inherit JsonIsomorphism<Guid, string>()
    override _.Pickle(guid: Guid): string = guid.ToString("N")
    override _.UnPickle(text: string): Guid = Guid.Parse(text)
/// Record whose `b` field opts in to the GuidConverter locally, via an attribute on the field
type WithEmbeddedGuid =
    { a: string
      [<System.Text.Json.Serialization.JsonConverter(typeof<GuidConverter>)>]
      b: Guid }

// The field-level converter is applied to `b`
serdes.Serialize({ a = "testing"; b = Guid.Empty })
// {"a":"testing","b":"00000000000000000000000000000000"}

// A bare Guid is not covered by the attribute, so the default rendering is used
serdes.Serialize(Guid.Empty)
// "00000000-0000-0000-0000-000000000000"

// Registering the converter globally affects every Guid passing through this Serdes
let serdesWithGuidConverter = Serdes(Options.Create(converters = [| GuidConverter() |]))
serdesWithGuidConverter.Serialize(Guid.Empty)
// 00000000000000000000000000000000
(* TypeSafeEnumConverter basic usage *)
// Without any converters in force, Serdes exposes System.Text.Json's internal behavior, which throws:
type Status =
    | Initial
    | Active

type StatusMessage =
    { name: string option
      status: Status }

let status: StatusMessage = { name = None; status = Initial }

// No converter is in force for `Status`, so this call throws
serdes.Serialize(status)
// System.NotSupportedException: F# discriminated union serialization is not supported. Consider authoring a custom converter for the type.
//    at System.Text.Json.Serialization.Converters.FSharpTypeConverterFactory.CreateConverter(Type typeToConvert, JsonSerializerOptions options)

// NOTE: Every Nullary Union Type needs a specific instantiation of the generic converter registered:
let serdesWithConverter = Options.Create(TypeSafeEnumConverter<Status>()) |> Serdes
serdesWithConverter.Serialize(status)
// "{"name":null,"status":"Initial"}"
// Recommended approach is to tag each type:
/// Same shape as `Status`, but tagged so the converter travels with the type
[<System.Text.Json.Serialization.JsonConverter(typeof<TypeSafeEnumConverter<Status2>>)>]
type Status2 =
    | Initial
    | Active

type StatusMessage2 =
    { name: string option
      status: Status2 }

let status2: StatusMessage2 = { name = None; status = Initial }

// The default Serdes is used here; the attribute on Status2 supplies the converter
serdes.Serialize(status2)
// The equivalent of registering a single global TypeSafeEnumConverter is the `autoTypeSafeEnumToJsonString` option:
let options = Options.Create(autoTypeSafeEnumToJsonString = true, rejectNullStrings = true)
let serdes3 = Serdes(options)

/// Untagged nullary union; handled through the `autoTypeSafeEnumToJsonString` option above
type Status3 =
    | Initial
    | Active

type StatusMessage3 =
    { name: string option
      status: Status3 }

let status3: StatusMessage3 = { name = None; status = Initial }

serdes3.Serialize(status3)
// "{"name":null,"status":"Initial"}"
[<JsonConverter(typeof<TypeSafeEnumConverter<Outcome>>)>]
type Outcome =
    | Joy
    | Pain
    | Misery

type Message =
    { name: string option
      outcome: Outcome }

let value: Message = { name = Some null; outcome = Joy }

serdes.Serialize(value)
// {"name":null,"outcome":"Joy"}

serdes.Deserialize<Message> """{"name":null,"outcome":"Joy"}"""
// val it : Message = {name = None; outcome = Joy;}

// By design, we throw when a value is unknown. Often this is the correct design.
// If, and only if, your software can do something useful with catch-all case, see the technique in `OutcomeWithOther`
try
    serdes.Deserialize<Message> """{"name":null,"outcome":"Discomfort"}"""
with e ->
    // Print the failure, then yield a dummy value so the expression is well-typed
    printf "%A" e
    Unchecked.defaultof<Message>
// System.Collections.Generic.KeyNotFoundException: Could not find case 'Discomfort' for type 'FSI_0012+Outcome'
(* TypeSafeEnumConverter fallback
While, in general, one wants to version contracts such that invalid values simply don't arise,
in some cases you want to explicitly handle out of range values.
Here we implement a converter as a JsonIsomorphism to achieve such a mapping *)
/// Outcome variant with an `Other` case that absorbs unrecognized names
[<JsonConverter(typeof<OutcomeWithCatchAllConverter>)>]
type OutcomeWithOther =
    | Joy
    | Pain
    | Misery
    | Other

/// Converter that maps any name `TypeSafeEnum.tryParse` does not recognize to `Other`
and OutcomeWithCatchAllConverter() =
    inherit JsonIsomorphism<OutcomeWithOther, string>()

    override _.Pickle(outcome) =
        FsCodec.TypeSafeEnum.toString outcome

    override _.UnPickle(name) =
        match FsCodec.TypeSafeEnum.tryParse<OutcomeWithOther> name with
        | Some known -> known
        | None -> Other
type Message2 =
    { name: string option
      outcome: OutcomeWithOther }

let value2: Message2 = { name = Some null; outcome = Joy }

serdes.Serialize(value2)
// {"name":null,"outcome":"Joy"}

serdes.Deserialize<Message2> """{"name":null,"outcome":"Joy"}"""
// val it : Message2 = {name = None; outcome = Joy;}

// An unknown case name falls back to `Other` instead of throwing
serdes.Deserialize<Message2> """{"name":null,"outcome":"Discomfort"}"""
// val it : Message2 = {name = None; outcome = Other;}
(* Illustrating usage of IEventCodec and its accompanying active patterns *)
/// Types and codec factory for events as held in a store
module Store =

    type Event = FsCodec.ITimelineEvent<EventBody>
    // Many stores use a ReadOnlyMemory<byte> to represent a UTF-8 encoded JSON event body
    // System.Text.Json.JsonElement can be a useful alternative where the store is JSON based
    and EventBody = ReadOnlyMemory<byte>
    and Codec<'E when 'E :> TypeShape.UnionContract.IUnionContract> = FsCodec.IEventCodec<'E, EventBody, unit>

    // Opt in to:
    // - mapping Type Safe Enums (F# Unions where the cases have no bodies) to/from Strings
    // - mapping other F# Unions using the UnionConverter with default settings
    // TOCONSIDER avoid using this automatic behavior, and instead let the exception that System.Text.Json
    //            produces trigger adding a JsonConverterAttribute for each type as Documentation
    let private options = Options.Create(autoTypeSafeEnumToJsonString = true, autoUnionToJsonObject = true)

    /// Serdes built from the options above; also borrowed by other codecs in this script
    let serdes = Serdes(options)

    /// Creates a codec for the specified event union type, using `serdes`
    let codec<'E when 'E :> TypeShape.UnionContract.IUnionContract> : Codec<'E> =
        // NOTE: if EventBody is System.Text.Json, use CodecJsonElement instead
        Codec.Create<'E>(serdes = serdes)
(* For these tests, we want to use a strongly typed ClientId
There are other tests that show other ways to manage this, but FSharp.UMX is clean and safe default *)
open FSharp.UMX
/// Strongly typed client identifier: a string tagged with the `clientId` measure
type ClientId = string<clientId>
and [<Measure>] clientId

module ClientId =

    /// Tags a raw string as a ClientId
    let parse (str: string): ClientId = %str

    /// Strips the tag, yielding the underlying string
    let toString (value: ClientId): string = %value

    /// Total active pattern wrapping `parse`
    let (|Parse|) (str: string): ClientId = parse str
(* Stream id generation/parsing logic. Normally kept private; Reactions module exposes relevant parsers to the wider application *)
module private Stream =

    // By convention, each contract defines a 'category' used as the first part of the stream name (e.g. `"Favorites-ClientA"`)
    [<Literal>]
    let Category = "Favorites"

    /// Generates a strongly typed StreamId from the supplied Id
    let id (clientId: ClientId): FsCodec.StreamId =
        FsCodec.StreamId.gen ClientId.toString clientId

    /// Maps from an app level identifier to a stream name as used when storing events in that stream
    /// Not normally necessary - typically you generate StreamIds, and you'll load from something that knows the Category
    let name (clientId: ClientId): FsCodec.StreamName =
        FsCodec.StreamName.create Category (id clientId)

    /// Inverse of `id`; decodes a StreamId into its constituent parts; throws if the presented StreamId does not adhere to the expected format
    let decodeId (streamId: FsCodec.StreamId): ClientId =
        FsCodec.StreamId.dec ClientId.parse streamId

    /// Inspects a stream name; if for this Category, decodes the elements into application level ids. Throws if it's malformed.
    let decode (streamName: FsCodec.StreamName): ClientId voption =
        streamName
        |> FsCodec.StreamName.tryFind Category
        |> ValueOption.map decodeId
module Reaction =

    /// Active Pattern to determine whether a given {category}-{streamId} StreamName represents the stream associated with this Aggregate
    /// Yields a strongly typed id from the streamId if the Category matches
    [<return: Struct>]
    let (|For|_|) (streamName: FsCodec.StreamName): ClientId voption =
        Stream.decode streamName
/// Event contract for the Favorites examples
module Events =

    type Added =
        { item: string }

    type Removed =
        { name: string }

    /// Union of the events; implements IUnionContract as the `Store.codec` constraint requires
    type Event =
        | Added of Added
        | Removed of Removed
        interface TypeShape.UnionContract.IUnionContract

    let codec: Store.Codec<Event> = Store.codec<Event>
/// Encodes the supplied string as UTF-8 and wraps the bytes in a ReadOnlyMemory
let utf8 (s: string) =
    let bytes: byte[] = System.Text.Encoding.UTF8.GetBytes(s)
    ReadOnlyMemory<byte>(bytes)
/// Builds the stream name for the client with the given raw id
let streamForClient c =
    Stream.name (ClientId.parse c)

/// Sample (stream name, event) pairs; the stream names are deliberately built in several different ways
let events =
    // Local shorthand: a timeline event with a UTF-8 encoded JSON body
    let create (index: int64) (eventType: string) (json: string) =
        FsCodec.Core.TimelineEvent.Create(index, eventType, utf8 json)
    [
        Stream.name (ClientId.parse "ClientA"), create 0L "Added" """{ "item": "a" }"""
        streamForClient "ClientB", create 0L "Added" """{ "item": "b" }"""
        FsCodec.StreamName.parse "Favorites-ClientA", create 1L "Added" """{ "item": "b" }"""
        streamForClient "ClientB", create 1L "Added" """{ "item": "a" }"""
        // NOTE: this body carries `item`, not the `name` field that `Events.Removed` declares
        streamForClient "ClientB", create 2L "Removed" """{ "item": "a" }"""
        // "Exported" is not a case of `Events.Event`
        FsCodec.StreamName.compose "Favorites" [| "ClientB" |], create 3L "Exported" """{ "count": 2 }"""
        // A stream in a different category
        FsCodec.StreamName.parse "Misc-x", create 0L "Dummy" """{ "item": "z" }"""
    ]
// Switch on debug logging to get detailed information about events that don't match (which has no significant perf cost when not switched on)
module Log =

    open Serilog

    let outputTemplate = "{Message} {Properties}{NewLine}"

    /// Replaces the global Serilog logger with a console logger at Debug level
    let initWithDebugLevel () =
        let configuration =
            LoggerConfiguration()
                .MinimumLevel.Debug()
                .WriteTo.Console(Serilog.Events.LogEventLevel.Debug, outputTemplate = outputTemplate)
        Serilog.Log.Logger <- configuration.CreateLogger()

Log.initWithDebugLevel ()
(* Explicit matching, showing how some ugly things get into the code if you do the streamName matching and event parsing separately *)
// When we obtain events from an event store via streaming notifications, we typically receive them as ReadOnlyMemory<byte> bodies
/// Event as carried on the stream: a timeline event with a ReadOnlyMemory<byte> body
type Event = FsCodec.ITimelineEvent<EventBody>
and EventBody = ReadOnlyMemory<byte>
and Codec<'E> = FsCodec.IEventCodec<'E, EventBody, unit>

/// Creates a codec for the specified event union type, borrowing the Store's serdes
let streamCodec<'E when 'E :> TypeShape.UnionContract.IUnionContract> : Codec<'E> =
    Codec.Create<'E>(serdes = Store.serdes)

let dec = streamCodec<Events.Event>

/// Partial active pattern that yields whatever the supplied codec's `Decode` returns for the event
[<return: Struct>]
let (|DecodeEvent|_|) (codec: Codec<'E>) (event: Event): 'E voption =
    codec.Decode(event)
/// Walks the sample events, matching the stream name and decoding the event as two separate patterns
let runCodecExplicit () =
    events
    |> List.iter (function
        // Stream is in our category AND the event decodes
        | Reaction.For clientId, DecodeEvent dec e ->
            printfn $"Client %s{ClientId.toString clientId}, event %A{e}"
        // Everything else: report the category and id parts of the stream name
        | FsCodec.StreamName.Split struct (cat, sid), e ->
            printfn $"Unhandled Event: Category %s{cat}, Ids %s{FsCodec.StreamId.toString sid}, Index %d{e.Index}, Event: %A{e.EventType}")

runCodecExplicit ()
(*
Client ClientA, event Added {item = "a";}
Client ClientB, event Added {item = "b";}
Client ClientA, event Added {item = "b";}
Client ClientB, event Added {item = "a";}
Client ClientB, event Removed {name = null;}
Codec "<Snipped>" Could not decode "Exported" in "Favorites-ClientB" {event="{ \"count\": 2 }"}
Unhandled Event: Category Favorites, Ids ClientB, Index 3, Event: "Exported"
Unhandled Event: Category Misc, Ids x, Index 0, Event: "Dummy"
*)
(* Simplified by having a DecodeSingle ActivePattern that decodes if it matches *)
module ReactionsBasic =

    let dec = streamCodec<Events.Event>

    /// Yields the client id and decoded event when the stream matches `Reaction.For` AND the event decodes
    let (|DecodeSingle|_|) (streamName: FsCodec.StreamName, event: Event): (ClientId * Events.Event) option =
        match streamName, event with
        | Reaction.For clientId, DecodeEvent dec decoded -> Some (clientId, decoded)
        | _ -> None
/// Prints the client id and the decoded event to stdout
let reactSingle (clientId: ClientId) (event: Events.Event) =
    printfn $"Client %s{ClientId.toString clientId}, event %A{event}"
/// Walks the sample events, using the combined `DecodeSingle` pattern to match the stream and decode the event in one step
let runCodecMatch () =
    for streamName, event in events do
        match streamName, event with
        | ReactionsBasic.DecodeSingle (clientId, event) ->
            reactSingle clientId event
        // `Split` is matched with a struct tuple pattern, consistent with `runCodecExplicit`
        | FsCodec.StreamName.Split struct (cat, sid), e ->
            printfn $"Unhandled Event: Category %s{cat}, Ids {FsCodec.StreamId.toString sid}, Index %d{e.Index}, Event: %s{e.EventType} "

runCodecMatch ()
(* Standard helper module used for parsing Events delivered via Streams, e.g. from Propulsion *)
module Streams =

    (* TODO if using Propulsion, you can `open Propulsion.Sinks` here
       NOTE it can still be useful to have type aliases so app wiring can refer to a terse `Streams.Event` etc *)

    // Events coming from streams are carried as a TimelineEvent; the body type is configurable
    type Event = FsCodec.ITimelineEvent<EventBody>
    // Propulsion's Sinks by default use ReadOnlyMemory<byte> as the storage format
    and EventBody = ReadOnlyMemory<byte>
    // the above Events can be decoded by a Codec implementing this interface
    and Codec<'E> = FsCodec.IEventCodec<'E, EventBody, unit>

    /// Generates a Codec for the specified Event Union type
    let codec<'E when 'E :> TypeShape.UnionContract.IUnionContract> : Codec<'E> =
        // Borrowing the Store serdes; frequently the events you parse can use less complex options...
        Codec.Create<'E>(serdes = Store.serdes)

    // as we know our event bodies are all UTF8 encoded JSON, we can render the string as a log event property
    // alternately, you can render the EventBody directly and ensure you have appropriate type destructuring configured
    let private render (x: EventBody): string =
        System.Text.Encoding.UTF8.GetString(x.Span)

    /// Uses the supplied codec to decode the supplied event record `x`
    /// (iff at LogEventLevel.Debug, detail fails to `log` citing the `streamName` and body)
    let decode<'E> (log: Serilog.ILogger) (codec: Codec<'E>) (streamName: FsCodec.StreamName) (x: Event) =
        let result = codec.Decode x
        // Only a failed decode is logged, and only when Debug is enabled (so the body is not rendered needlessly)
        if ValueOption.isNone result && log.IsEnabled Serilog.Events.LogEventLevel.Debug then
            log.ForContext("event", render x.Data, true)
                .Debug("Codec {type} Could not decode {eventType} in {stream}", codec.GetType().FullName, x.EventType, streamName)
        result

    /// Attempts to decode the supplied Event using the supplied Codec
    [<return: Struct>]
    let (|Decode|_|) (codec: Codec<'E>) struct (streamName, event) =
        decode Serilog.Log.Logger codec streamName event

    module Array =
        /// Applies `f` to each item, keeping the payloads of the `ValueSome` results
        let inline chooseV f xs =
            [| for item in xs do
                   match f item with
                   | ValueSome v -> yield v
                   | ValueNone -> () |]

    /// Yields the subset of events that successfully decoded (could be Array.empty)
    let decode2<'E> (codec: Codec<'E>) struct (streamName, events: Event[]): 'E[] =
        let tryDecodeOne = decode<'E> Serilog.Log.Logger codec streamName
        Array.chooseV tryDecodeOne events

    /// Total pattern over a (streamName, events) batch, delegating to `decode2`
    let (|Decode|) (codec: Codec<'E>) (batch: struct (FsCodec.StreamName * Event[])): 'E[] =
        decode2 codec batch
(* When using Propulsion, Events are typically delivered as an array of contiguous events together with a StreamName
The Decode Active Pattern decodes such a batch *)
module Reactions =

    /// Active Pattern to determine whether a given {category}-{streamId} StreamName represents the stream associated with this Aggregate
    /// Yields a strongly typed id from the streamId if the Category matches
    [<return: Struct>]
    let (|For|_|) (streamName: FsCodec.StreamName): ClientId voption =
        Stream.decode streamName

    let private dec = Streams.codec<Events.Event>

    /// Yields decoded events and relevant strongly typed ids if the Category of the Stream Name matches
    [<return: Struct>]
    let (|Decode|_|) (batch: struct (FsCodec.StreamName * Streams.Event[])) =
        match batch with
        | struct (For clientId, _) & Streams.Decode dec events -> ValueSome struct (clientId, events)
        | _ -> ValueNone
/// Prints the client id and the batch of decoded events to stdout
let reactStream (clientId: ClientId) (event: Events.Event[]) =
    printfn $"Client %s{ClientId.toString clientId}, events %A{event}"
/// Handles a batch of events for one stream: reacts to the decoded events when the stream matches
/// `Reactions.Decode`; otherwise prints each event as unhandled
let handleStream streamName events =
    match struct (streamName, events) with
    // `Reactions.Decode` yields a struct tuple, so a struct pattern is needed to take it apart
    | Reactions.Decode struct (clientId, events) ->
        reactStream clientId events
    // The scrutinee itself is a struct tuple, hence the outer `struct` pattern
    | struct (FsCodec.StreamName.Split struct (cat, sid), _) ->
        for e in events do
            printfn $"Unhandled Event: Category %s{cat}, Id %A{sid}, Index %d{e.Index}, Event: %s{e.EventType} "
/// Groups the sample events by stream name and hands each group to `handleStream` as an array
let runStreams () =
    events
    |> Seq.groupBy fst
    |> Seq.iter (fun (streamName, pairs) ->
        let batch = pairs |> Seq.map snd |> Array.ofSeq
        handleStream streamName batch)

runStreams ()
(*
Client ClientA, events [|Added { item = "a" }; Added { item = "b" }|]
Codec "<Snipped>" Could not decode "Exported" in "Favorites-ClientB" {event="System.ReadOnlyMemory<Byte>[14]"}
Client ClientB, events [|Added { item = "b" }; Added { item = "a" }; Removed { name = null }|]
Unhandled Event: Category Misc, Id x, Index 0, Event: "Dummy"
*)
(* Round-tripping contextual information to the application using an upconverter and a downconverter
Events being round-tripped from a store (e.g. Equinox etc), typically bear most relevant information in the EventBody
Where relevant, a decoding process may want to extract some contextual information based on the event envelope as the body is decoded
*)
module StoreWithMeta =

    /// Application-level event: the index, the metadata and the decoded contract
    type Event<'E> = int64 * Metadata * 'E
    and Metadata = { principal: string }
    and Codec<'E when 'E :> TypeShape.UnionContract.IUnionContract> = FsCodec.IEventCodec<Event<'E>, Store.EventBody, unit>

    // we assume no special requirements for complex data types when deserializing the metadata, so use Default Serdes
    let private serdes = Serdes.Default

    let codec<'E when 'E :> TypeShape.UnionContract.IUnionContract> : Codec<'E> =
        // here we surface the metadata from the raw event as part of the application level event based on the stored form
        let up (raw: Store.Event) (contract: 'E): Event<'E> =
            let meta = serdes.Deserialize<Metadata> raw.Meta
            raw.Index, meta, contract
        // _index: up and down are expected to encode/decode symmetrically - when encoding, the app supplies a dummy, and the store assigns it on appending
        // the metadata is encoded as the normal bodies are
        let down (event: Event<'E>) =
            let _index, meta, body = event
            struct (body, ValueSome meta, ValueNone)
        Codec.Create<Event<'E>, 'E, Metadata>(up, down, serdes = Store.serdes)
(* Adding contextual information to the event metadata as it's encoded via an out of band context
As illustrated in StoreWithMeta, in some cases the Metadata can be composed (and then round-tripped back) to the application
logic as a natural part of the system's processing.
Frequently, however, the contextual information is not actually relevant to the application logic.
In such a case, we can pass a _Context_ to the Codec when encoding is taking place.
An example of such a facility is Equinox's `context` argument for `Decider.createWithContext`; whenever an event is
being encoded to go into the store, the relevant `'Context` is supplied to the Codec, where it is then supplied to a
`mapCausation` function
*)
module StoreWithContext =

    type Context =
        { correlationId: string
          causationId: string
          principal: string }
    and Codec<'E when 'E :> TypeShape.UnionContract.IUnionContract> = FsCodec.IEventCodec<'E, Store.EventBody, Context voption>
    and Metadata = { principal: string }

    let codec<'E when 'E :> TypeShape.UnionContract.IUnionContract> : Codec<'E> =
        // The application-level event is the contract itself; nothing is taken from the raw event
        let up (_raw: Store.Event) (contract: 'E) = contract

        let down (event: 'E) =
            // Not producing any Metadata based on the application-level event in this instance
            let meta: Metadata voption = ValueNone
            let ts = ValueNone
            struct (event, meta, ts)

        let mapCausation (context: Context voption) (_downConvertedMeta: Metadata voption) =
            let eventId = Guid.NewGuid()
            match context with
            | ValueNone ->
                // In some parts of this system, we don't have a Context to pass - hence we use `Context voption`
                // as the context type in this instance. Generally, it's recommended for this mapping function
                // to throw in order to have each path in the system that wishes to generate events be required
                // to supply the relevant attribution information. But, here we illustrate how to do it loosey goosey!
                struct (ValueNone, eventId, null, null)
            | ValueSome v ->
                // We map the correlation/causation identifiers into the designated fields
                // the remaining information, we save into the Event's Meta field
                // In this instance, we don't have any metadata arising from the application level events,
                // but, if we did, we could merge it into the final rendered `ValueSome` we are passing down
                let finalMeta: Metadata = { principal = v.principal }
                struct (ValueSome finalMeta, eventId, v.correlationId, v.causationId)

        Codec.Create<'E, 'E, Metadata, Context voption>(up, down, mapCausation, serdes = Store.serdes)
(* Decoding contextual information from Streams Metadata
Events arriving from a source (e.g. Propulsion) can bear contextual information
Where relevant, a decoding process may want to extract such context alongside mapping the base information.
*)
module StreamsWithMeta =

    /// Application-level event as a struct tuple: the index, the metadata and the decoded contract
    type Event<'E> = (struct (int64 * Metadata * 'E))
    and Metadata = { principal: string }
    and Codec<'E> = FsCodec.IEventCodec<Event<'E>, Streams.EventBody, unit>

    // we assume no special requirements for complex data types when deserializing the metadata, so use Default Options
    let private serdes = Serdes(Options.Default)

    let codec<'E when 'E :> TypeShape.UnionContract.IUnionContract> : Codec<'E> =
        // here we surface some metadata from the raw event as part of the application level type
        let up (raw: Streams.Event) (contract: 'E): Event<'E> =
            let meta = serdes.Deserialize<Metadata> raw.Meta
            struct (raw.Index, meta, contract)
        // We are not using this codec to encode events, so we let the encoding side fail very fast
        let down _ = failwith "N/A"
        Codec.Create<Event<'E>, 'E, Metadata>(up, down, serdes = Store.serdes)
/// The sample events again, each re-created with a fixed `principal` metadata body attached
let eventsWithMeta =
    events
    |> Seq.map (fun (sn, e) ->
        let meta = utf8 """{"principal": "me"}"""
        sn, FsCodec.Core.TimelineEvent.Create(e.Index, e.EventType, e.Data, meta))
module ReactionsWithMeta =

    let dec = StreamsWithMeta.codec<Events.Event>

    /// Yields the client id plus the decoded (index, metadata, event) items if the stream matches `Reactions.For`
    [<return: Struct>]
    let (|Decode|_|) (batch: struct (FsCodec.StreamName * Streams.Event[])) =
        match batch with
        | struct (Reactions.For clientId, _) & Streams.Decode dec events -> ValueSome struct (clientId, events)
        | _ -> ValueNone
/// Prints each decoded event together with its index and metadata
let reactStreamWithMeta (clientId: ClientId) (events: StreamsWithMeta.Event<Events.Event>[]) =
    // `StreamsWithMeta.Event` is a struct tuple, so it has to be taken apart with a struct pattern
    for struct (index, meta, event) in events do
        printfn $"Client %s{ClientId.toString clientId}, event %i{index} meta %A{meta} event %A{event}"
/// Handles a batch of events for one stream: reacts to the decoded events (with metadata) when the
/// stream matches `ReactionsWithMeta.Decode`; otherwise prints each event as unhandled
let handleWithMeta streamName events =
    match struct (streamName, events) with
    // `ReactionsWithMeta.Decode` yields a struct tuple, so a struct pattern is needed to take it apart
    | ReactionsWithMeta.Decode struct (clientId, events) ->
        reactStreamWithMeta clientId events
    // The scrutinee itself is a struct tuple, hence the outer `struct` pattern
    | struct (FsCodec.StreamName.Split struct (cat, sid), _) ->
        for e in events do
            printfn $"Unhandled Event: Category %s{cat}, Id %A{sid}, Index %d{e.Index}, Event: %s{e.EventType} "
/// Groups the metadata-bearing sample events by stream name and hands each group to `handleWithMeta`
let runStreamsWithMeta () =
    eventsWithMeta
    |> Seq.groupBy fst
    |> Seq.iter (fun (streamName, pairs) ->
        let batch = pairs |> Seq.map snd |> Array.ofSeq
        handleWithMeta streamName batch)

runStreamsWithMeta ()
(*
Client ClientA, event 0 meta { principal = "me" } event Added { item = "a" }
Client ClientA, event 1 meta { principal = "me" } event Added { item = "b" }
Codec "<Snipped>" Could not decode "Exported" in "Favorites-ClientB" {event="System.ReadOnlyMemory<Byte>[14]"}
Client ClientB, event 0 meta { principal = "me" } event Added { item = "b" }
Client ClientB, event 1 meta { principal = "me" } event Added { item = "a" }
Client ClientB, event 2 meta { principal = "me" } event Removed { name = null }
Unhandled Event: Category Misc, Id x, Index 0, Event: "Dummy"
*)