F# event sourcing with Marten
tl;dr: In this post we are going to
- Configure Marten in an F# project with the default serializer Newtonsoft.Json and use the document store as well as the event store
- Check what F# types are supported out of the box
- Use System.Text.Json for JSON serialization and customize some types with the help of FSharp.SystemTextJson
- Refactor the code to be a bit more functional by using a Discriminated Union as our event type
Sample code is available on github: https://github.com/jannikbuschke/blog-samples/tree/main/marten-and-fsharp-getting-started
Getting started
We need the dotnet sdk and a postgresql database.
mkdir marten-fs-sample //highlight-linecd marten-fs-sampledotnet new xunit --language f#dotnet add package Marten
Let's start by creating a Marten store object in order to persist and load a record:
type Du = | Case1 of int | Case2type SampleType = { Id:string; Hello: string; Option: int option; Du: Du }[<Fact>]let ``A first simple test`` () = task { let store = DocumentStore.For(fun options -> options.Connection("xxx") options.AutoCreateSchemaObjects <- AutoCreate.None ) // Finally, use the `IDocumentSession` to start // working with documents and queries. use session = store.OpenSession() // The following will instruct Marten to do an upsert for the `SampleType` given session.Store({ Id = "1"; Hello = "World"; Option = Some 5; Du = Du.Case2 }) let! _ = session.SaveChangesAsync() // Create a new session and try to load the saved object from the database use session = store.OpenSession() let! sample = session.LoadAsync<SampleType>("1") // check if object is same as the one saved Assert.Equal(sample, ({ Id = "1"; Hello = "World"; Option = Some 5; Du = Du.Case2 })) // => true return () }
Executing the above test will cause Marten to generate a table mt_doc_tests_sampletype
with the following columns: id
, data
, mt_last_modified
, mt_version
, and mt_dotnet_type
:
id: 1 // document idmt_last_modified: 2022-12-09 16:02:34.219722+01mt_version: 0184f768-d700-4969-b01a-c809eb24d4f0mt_dotnet_type: Tests+SampleTypedata:
{ "Du": { "Case": "Case2" }, "Id": "1", "Hello": "World", "Option": { "Case": "Some", "Fields": [5] }}
Let's customize the serializer later to save our data in a different format. It was easy to store and retrieve documents, and now let's take a leap and get started with event sourcing.
Event sourcing
In addition to the document store Marten provides an event store and feature for constructing projections. I will use chess a an example domain. First we will create some types representing the domain, an event and an aggregate for our event stream:
type Color = | Black | Whitetype PieceType = | King | Queen | Bishop | Knight | Rook | Pawntype ChessPiece = { Type: PieceType Color: Color Position: int * int }type State = | Initialized | Running | Ended// First Eventtype GameInitialized = { Pieces: ChessPiece list }// Aggregate// As Marten needs a default contstructor, apply the CLIMutable attribute[<CLIMutable>]type ChessGame = { Id: Guid Pieces: ChessPiece list State: State } member this.Apply(e: GameInitialized, meta: Marten.Events.IEvent) : ChessGame = // return new ChessGame record { Id = meta.StreamId Pieces = e.Pieces State = State.Running }
Now let`s write another test in which we will first save an event and then load the aggregate:
[<Fact>]let ``Event store with default options should save events and build a projection`` () = task { let store = DocumentStore.For (fun options -> options.Connection ("xxx") // Marten uses a code generator for the event projections options.GeneratedCodeMode <- LamarCodeGeneration.TypeLoadMode.Auto // Marten can automatically create the database schema for us options.AutoCreateSchemaObjects <- AutoCreate.All // Here we let Marten know that we want `ChessGame` to be a stream of type `SelfAggrgate`. // The second argument tells Marten to apply the event // to any projection immediatly as part of the transaction that saves the event. options.Projections.SelfAggregate<ChessGame> (ProjectionLifecycle.Inline) |> ignore) // Open a session to get access to the database use session = store.LightweightSession () // create and id for our aggregate/stream let gameId = Guid.NewGuid () // Append our first event (GameInitialized), this will not yet save the event session.Events.Append ( gameId, { Pieces = [ { Color = Black Position = (0, 0) Type = Pawn } ] } ) |> ignore // Marten will now save this event to the event table and potentially to projections. let! _ = session.SaveChangesAsync () let! game = session.LoadAsync<ChessGame> (gameId) Assert.Equal ( game, ({ Id = gameId State = Running Pieces = [ { Color = Black Position = (0, 0) Type = Pawn } ] }) ) return () }
Nice, now when we execute the code, Marten will save our event to the table mt_events
, apply it to our ChessGame
projection and saves that to the document store in the table mt_doc_chess_chessgame
! This way Marten provides both an event store and a store for our read model.
Entry in mt_events
:
seq_id: 1 // global sequence idid: 0184f5b5-b110-4dab-bb0e-43c68257e7a9 // event idstream_id: 716c0877-5de2-4451-8590-8713b1e8014a // stream idversion: 1type: game_initializedtimestamp: 2022-12-09 08:13:45.835311+01tenant_id: *DEFAULT*mt_dotnet_type: EventStoreTest.Chess+GameInitialized, blog-samplesis_archived: falsedata:
{ "Pieces": [ { "Type": { "Case": "Pawn" }, "Color": { "Case": "Black" }, "Position": { "Item1": 0, "Item2": 0 } } ]}
The seq_id
is a global counter that increments for every event, the id
is a unique identifier for each event, and the stream_id
is the ID of the associated stream/aggregate.
Let's have a look at our projection table mt_doc_chess_chessgame
:
id: d09d093e-95c0-4575-99d1-089d74ed95d1 // aggregate/stream idmt_last_modified: 2022-12-09 08:13:45.816568+01mt_version: 0184f5bb-a6b8-4eb0-9742-0de8bb4d6dc0mt_dotnet_type: EventStoreTest.Chess+ChessGamedata:
{ "Id": "d09d093e-95c0-4575-99d1-089d74ed95d1", "State": { "Case": "Running" }, "Pieces": [ { "Type": { "Case": "Pawn" }, "Color": { "Case": "Black" }, "Position": { "Item1": 0, "Item2": 0 } } ]}
Ok let's add another event type and refactor the domain a little bit:
type Color = | Black | Whitetype PieceType = | King | Queen | Bishop | Knight | Rook | Pawntype ChessPiece = { Type: PieceType; Color: Color }type State = | NotStarted | Running | Ended of Resulttype GameInitialized = { Pieces: Map<int * int, ChessPiece> }// new eventtype PieceMoved = { From: int * int; To: int * int }[<CLIMutable>]type ChessGame = { Id: Guid Pieces: Map<int * int, ChessPiece> State: State } member this.Apply(e: GameInitialized, meta: Marten.Events.IEvent) : ChessGame = { Id = meta.StreamId Pieces = e.Pieces State = State.NotStarted } member this.Apply(e: PieceMoved) : ChessGame = { this with Pieces = this.Pieces.Add(e.To, this.Pieces.[e.From]).Remove (e.From) }
We store the pieces in a Map
where the key is the position and the value is the piece. This should make it easier to move pieces around. We've also added an additional Apply
method override to handle the PieceMoved
event type.
Let`s adjust the test:
// ...let gameId = Guid.NewGuid ()// append fist eventsession.Events.Append (gameId, { Pieces = [ ((0, 0), { Color = Black; Type = Pawn }) ] |> Map.ofSeq })|> ignore// append second eventsession.Events.Append (gameId, { From = (0, 0); To = (0, 1) }) |> ignorelet! _ = session.SaveChangesAsync ()let! game = session.LoadAsync<ChessGame> (gameId)Assert.Equal ( ({ Id = gameId State = NotStarted Pieces = [ ((0, 1), { Color = Black; Type = Pawn }) ] |> Map.ofSeq }), game)
Now run the test aaand we get an error:
Newtonsoft.Json.JsonSerializationException:Could not convert string '(0, 1)' to dictionary key type 'System.Tuple`2[System.Int32,System.Int32]'.Create a TypeConverter to convert from the string to the key type object. Path 'Pieces['(0, 1)']', line 1, position 97.
The default serializer Newtonsoft.Json worked well for the different F# types, except for tuples used as keys for Map. Data was saved successfully, but Newtonsoft had difficulty deserializing it. Writing a custom TypeConverter may help, but there are a wide range of F# serializers available, such as FSharpLu.Json, Thoth.Json, Json.NET (Newtonsoft.Json), and System.Text.Json. For this example, we are using System.Text.Json. To change the configuration, follow these steps:
DocumentStore.For (fun options -> options.Connection ("xxx") options.GeneratedCodeMode <- LamarCodeGeneration.TypeLoadMode.Auto options.AutoCreateSchemaObjects <- AutoCreate.All options.Projections.SelfAggregate<ChessGame> (ProjectionLifecycle.Inline) |> ignore // Configure Marten to use System.Text.Json for json serialization let serializer = Marten.Services.SystemTextJsonSerializer(() options.Serializer(serializer))
run the test aaand get the following error:
System.NotSupportedException:F# discriminated union serialization is not supported. Consider authoring a custom converter for the type.
Well, it seems that STJ does not yet fully support F# types. Fortunately, there is a solution! Thanks to FSharp.SystemTextJson, we can use DUs, Maps, and other F# types. To take advantage of this, let's add a package reference and add the JsonFSharpConverter to our project:
dotnet add package FSharp.SystemTextJson
//...let serializer = Marten.Services.SystemTextJsonSerializer ()serializer.Customize (fun v -> v.Converters.Add (JsonFSharpConverter ()))options.Serializer (serializer)
Now the test works again! After cleaing the database and running the test the database looks like this:
mt_events
table:
// First rowseq_id: 1 // global sequence idid: 0184f604-9000-4928-a8e8-efc9a4678922 // event idstream_id: 7fc00dff-a666-4be2-bbb9-68bbf1762bfd // stream idversion: 1type: game_initializedtimestamp: 2022-12-09 09:33:25.059613+01tenant_id: *DEFAULT*mt_dotnet_type: EventStoreTest.Chess+GameInitialized, blog-samplesis_archived: falsedata:
{ "Pieces": [ [[0, 0], { "Type": { "Case": "Pawn" }, "Color": { "Case": "Black" } }] ]}
// Second rowseq_id: 2 // global sequence idid: 0184f604-9002-46f6-94c6-682d3a4df3ca // event idstream_id: 7fc00dff-a666-4be2-bbb9-68bbf1762bfd // stream idversion: 2type: piece_movedtimestamp: 2022-12-09 09:33:25.059613+01tenant_id: *DEFAULT*mt_dotnet_type: EventStoreTest.Chess+PieceMoved, blog-samplesis_archived: falsedata:
{ "To": [0, 1], "From": [0, 0] }
mt_doc_game_chessgame
table:
id: 7fc00dff-a666-4be2-bbb9-68bbf1762bfd // aggregate/stream idmt_last_modified: 2022-12-09 09:33:25.043814+01mt_version: 0184f604-92fb-4d0b-ad8f-0c5923c808c3mt_dotnet_type: EventStoreTest.Chess+ChessGamedata:
{ "Id": "7fc00dff-a666-4be2-bbb9-68bbf1762bfd", "State": { "Case": "NotStarted" }, "Pieces": [ [[0, 1], { "Type": { "Case": "Pawn" }, "Color": { "Case": "Black" } }] ]}
Cool! Marten successfully applied events 1 and 2 to our aggregate and also updated the projection state to its latest state in the document store.
Replay events
Instead of using the persisted projection from table mt_doc_chess_chessgame
we can also tell marten to do a live aggregation to any version of the aggregate:
// rebuild aggregate state at version 1 (one event applied)let! gameV1 = session.Events.AggregateStreamAsync<ChessGame> (gameId, 1)Assert.Equal ( ({ Id = gameId State = NotStarted Pieces = [ ((0, 0), { Color = Black; Type = Pawn }) ] |> Map.ofSeq }), gameV1)// rebuild aggregate state at version 2let! gameV2 = session.Events.AggregateStreamAsync<ChessGame> (gameId, 2)Assert.Equal ( ({ Id = gameId State = Running Pieces = [ ((0, 1), { Color = Black; Type = Pawn }) ] |> Map.ofSeq }), gameV2)// rebuild latest aggregate state (same as version 2)let! gameV3 = session.Events.AggregateStreamAsync<ChessGame> (gameId)Assert.Equal (gameV2, gameV3)
FSharp.SystemTextJson default serialization format
FSharp.SystemTextJson uses some interesting defaults for serializing F# types.
Options are rendered as the value in the Some case and null in the None case
Some 5None
5 // <- Somenull // <- None
Single Case unions are unwrapped
type Id = | Id of System.Guid
"7817a4bd-c7c3-460f-9679-aa10afd4ac89"
Multi case unions have a discriminator with the case name and a Fields array:
type Du = | Case1 | Case2 of int | Case3 of int * string | Case4 of name: string | Case5 of name: string * payload: MyRecord | Case6 of MyRecord
{"Case":"Case1"}{"Case":"Case2","Fields":[2]}{"Case":"Case3","Fields":[5,"hello"]}{"Case":"Case4","Fields":["hello"]}{"Case":"Case5","Fields":["Name",{"Foo":"Hello"}]}{"Case":"Case6","Fields":[{"Foo":"Hello"}]}
Lists will be rendered as an array:
[1;2;3]
[1, 2, 3]
One weird thing which I don't yet fully grap: Records with a property set to null can be serialized but not deserialized.
type MyRecord = { Foo: string }seralizer.Serialize({Foo=null}, defaultOptions)
Produces
{ "Foo": null }
but the following throws:
seralizer.Deserialize<MyRecord>("""{Foo=null}""", defaultOptions)
The docs state that
By default, FSharp.SystemTextJson throws an exception when the following conditions are met:
it is deserializing a record or a union; a field's JSON value is null or the field is unspecified; that field's type isn't an explicitly nullable F# type (like option, voption and Skippable)
So I think this is by design, but it's strange that we can serialize a record with a string property set to null, yet not be able to deserialize it. The best solution would be to disallow serialization altogether. If we need to deal with no-value cases, the explicit Option type is the way to go. I'll update this section if I get more clarity on this. If anyone has ideas on how to improve this behavior, it would be awesome if you drop a comment here or here.
Customize FSharp.SystemTextJson Discriminated Union format
I think the defaults are pretty good. Rendering an array for discriminated union fields however seems a bit off to me. Other options exist, so lets customize the default options. In the following example we mostly initialize the JsonFSharpConverter with the default options, but for union fields will use an object with properties instead of an array and unwrap a record field if it is the only field:
let customOptions = System.Text.Json.JsonSerializerOptions ()customOptions.Converters.Add ( JsonFSharpConverter ( // Encode unions as a 2-valued object: unionTagName (defaults to "Case") contains the union tag, and unionFieldsName (defaults to "Fields") contains the union fields. If the case doesn't have fields, "Fields": [] is omitted. This flag is included in Default. JsonUnionEncoding.AdjacentTag // If unset, union fields are encoded as an array. If set, union fields are encoded as an object using their field names. ||| JsonUnionEncoding.NamedFields // Implicitly sets NamedFields. If set, when a union case has a single field which is a record, the fields of this record are encoded directly as fields of the object representing the union. ||| JsonUnionEncoding.UnwrapRecordCases // If set, None is represented as null, and Some x is represented the same as x. This flag is included in Default. ||| JsonUnionEncoding.UnwrapOption // If set, single-case single-field unions are serialized as the single field's value. This flag is included in Default. ||| JsonUnionEncoding.UnwrapSingleCaseUnions // In AdjacentTag and InternalTag mode, allow deserializing unions where the tag is not the first field in the JSON object. ||| JsonUnionEncoding.AllowUnorderedTag, // Also the default. Throw when deserializing a record or union where a field is null but not an explict nullable type (option, voption, skippable) https://github.com/Tarmil/FSharp.SystemTextJson/blob/master/docs/Customizing.md#allownullfields allowNullFields = false )
type Du = | Case1 | Case2 of int | Case3 of int * string | Case4 of name: string | Case5 of name: string * payload: MyRecord | Case6 of MyRecord | Case7 of Foo: string
{"Case":"Case1"}{"Case":"Case2","Fields":{"Item":2}} // implicitly named 'Item'{"Case":"Case3","Fields":{"Item1":5,"Item2":"hello"}} // implicitly named 'Item1' and 'Item2'{"Case":"Case4","Fields":{"name":"Hello"}} // explicitly named fields{"Case":"Case5","Fields":{"name":"Hello","payload":{"Foo":"Hello"}}} // explicitly named fields{"Case":"Case6","Fields":{"Foo":"Hello"}} // unwrapped single value record{"Case":"Case7","Fields":{"Foo":"Hello"}} // explicitly named field
This format is a little bit more explicit, as our fields get names. I think this should make querying with PostgreSQL simpler.
Last refactor of our domain
Let's refactor our domain one last time, and use the above options to render Unions with named fields instead as arrays and also unwrap single field record cases. Let's refactor our events to use a single GameEvent
Union and use one single Apply method with pattern matching:
//... previous types as before// new: use single type GameEvent with different union casestype GameEvent = | GameInitialized of GameInitialized | PieceMoved of PieceMoved // new, left out for brevity | PieceCaptured of PieceCaptured // new, left out for brevity | GameEnded of GameEnded[<CLIMutable>]type ChessGame = { Id: Guid Pieces: Map<int * int, ChessPiece> State: State } // refactored to one Apply method with pattern matching member this.Apply(e: GameEvent, meta: Marten.Events.IEvent) : ChessGame = match e with | GameInitialized e -> { Id = meta.StreamId Pieces = e.Pieces State = State.NotStarted } | PieceMoved e -> let piece = this.Pieces.[e.From] { this with Pieces = this.Pieces.Add(e.To, piece).Remove (e.From); State = State.Running } // other cases left out for brevity | _ -> this
// ... other code as before// when appending events we now need to save the `GameEvent`:session.Events.Append (gameId, GameEvent.GameInitialized { Pieces = [ ((0, 0), { Color = Black; Type = Pawn }) ] |> Map.ofSeq })|> ignoresession.Events.Append (gameId, GameEvent.PieceMoved { From = (0, 0); To = (0, 1) }) |> ignore
the mt_events
table now looks like this:
seq_id: 1id: 0184f705-88c0-41f0-9ab9-59af825152f6stream_id: d6a8e070-4547-4401-97b4-baff45e88b50version: 1type: game_initializedtimestamp: 2022-12-09 14:14:05.94232+01tenant_id: *DEFAULT*mt_dotnet_type: EventStoreTest.Chess+GameInitialized, blog-samplesis_archived:falsedata:
{ "Case": "GameInitialized", "Fields": { "Pieces": [ [[0, 0], { "Type": { "Case": "Pawn" }, "Color": { "Case": "Black" } }] ] }}
seq_id: 2id: 0184f705-88c4-4162-8cf8-83e80d48d20bstream_id: d6a8e070-4547-4401-97b4-baff45e88b50version: 2type: piece_movedtimestamp: 2022-12-09 14:14:05.94232+01tenant_id: *DEFAULT*mt_dotnet_type: EventStoreTest.Chess+GameEvent+PieceMoved, blog-samplesis_archived: falsedata:
{ "Case": "PieceMoved", "Fields": { "To": [0, 1], "From": [0, 0] } }
And the current projection state in nt_doc_game_chessgame
:
{ "Id": "661ab7db-b3a7-4930-aa46-0e4cc806507c", "State": { "Case": "Running" }, "Pieces": [ [ [0, 1], { "Type": { "Case": "Pawn" }, "Color": { "Case": "Black" } } ] ]}
Thanks for reading!
You can browse the full code at https://github.com/jannikbuschke/blog-samples/tree/main/marten-and-fsharp-getting-started