- Converted
org.occurrent.subscription.OccurrentSubscriptionFilter
from a Java class to a record. This means that thepublic final
filter instance field is now a record property. So if you ever usedoccurrentSubscriptionFilter.filter
to access the underluying filter, you now need to dooccurrentSubscriptionFilter.filter()
instead.
- Implemented "in" conditions so you can now do e.g.
subscriptionModel.subscribe("id", OccurrentSubscriptionFilter.filter(Filter.streamVersion(Condition.in(12L, 14L))
. There's also a Kotlin extension function,isIn
, which can be imported fromorg.occurrent.condition.isIn
. - Upgraded kotlin from 2.0.20 to 2.0.21
- Upgraded spring-boot from 3.3.3 to 3.3.5
- Upgraded spring-data-mongodb from 4.3.3 to 4.3.5
- Upgraded mongodb-driver-sync from 5.1.4 to 5.2.0
- Upgraded jobrunr from 7.2.3 to 7.3.1
- Upgraded project reactor from 3.6.9 to 3.6.11
- Fixed so that inserting events with "any" WriteCondition never fails even if more than two threads are writing events to the same stream at the same time. (Fixed in MongoEventStore and SpringMongoEventStore)
- Fixed so that blocking and reactive EventStoreQueries really uses
SortBy.unsorted()
by default as was intended in the previous release.
- Added better debug logging
- Improved queryOne performance in DomainEventQueries
- Fixed issue in Kotlin extensions for EventStoreQueries which made them unusable
- Introduced
SortBy.unsorted()
which is now the default sort specification used when no one is specified explicitly in queries - Upgraded spring-boot from 3.3.3 to 3.3.4
-
Added two kotlin extension functions to DomainEventQueries:
queryForList
that just takes a filter and a "SortBy"queryForSequence
that just takes a filter and a "SortBy"
The reason for this is to avoid ambiguity with other extension function when only specifying these values.
OccurrentAnnotationBeanPostProcessor
is only applied ifoccurrent.subscription.enabled
property is missing or istrue
.- Added ability to disable the creation a default instance of an
ApplicationService
when using thespring-boot-starter-mongodb
module by specifyingoccurrent.application-service.enabled=false
. - Upgraded cloudevents from 3.0.0 to 4.0.1
- Upgraded jackson from 2.17.1 to 2.17.2
- Upgraded jobrunr from 7.2.1 to 7.2.3
- Upgraded kotlin from 2.0.0 to 2.0.20
- Upgraded mongodb-driver-sync from 5.1.1 to 5.1.4
- Upgraded project reactor from 3.6.7 to 3.6.9
- Upgraded spring-boot from 3.3.1 to 3.3.3
- Upgraded spring-data-mongodb from 4.3.1 to 4.3.3
- Upgraded kotlinx-html-jvm from 0.7.2 to 0.11.0
- Ignoring NoSuchBeanDefinitionException when getting the springApplicationAdminRegistrar bean when working around spring-projects/spring-framework#32904
- Made OccurrentAnnotationBeanPostProcessor a static bean in OccurrentMongoAutoConfiguration to avoid Spring warning logs when booting up
- Fixed a bug in OccurrentAnnotationBeanPostProcessor that caused
@Subscription(id="myId", startAt = BEGINNING_OF_TIME)
not to replay events from the beginning of time - CompetingConsumerSubscriptionModel supports delegating to parent subscription model if the
StartAt
position returnsnull
. This means that the CompetingConsumerSubscriptionModel can be bypassed for certain subscriptions. This is useful if you have an in-memory subscription on multiple nodes with aCompetingConsumerSubscriptionModel
. - Added an overloaded method to SubscriptionModelLifeCycle (implemented by most SubscriptionModels) start allows you to start a subscription model without automatically starting all paused subscriptions. This method is called
start
and takes a boolean that tells if all subscriptions should be automatically started when the subscription model starts. - When using "in-memory" subscriptions, by doing e.g.
@Subscription(id="myId", startAt = BEGINNING_OF_TIME, resumeBehavior = SAME_AS_START)
, the subscription will be started on all nodes even when a CompetingConsumerSubscriptionModel is used. - The waitUntilStarted() method in the Subscription interface is now a default method.
- CatchupSubscriptionModel subscriptions are now started in a background thread by default. Call the "waitUntilStarted()" on the Subscription to make is synchronous.
- The java.util.Stream returned from SpringMongoEventStore is now automatically closed when the last element is consumed.
- Added ability to specify whether the subscription should "waitUntilStarted" in the Subscriptions DSL.
- Upgraded Spring Boot from 3.2.5 to 3.3.1
- Upgraded Kotlin from 1.9.23 to 2.0.0
- Upgraded Mongo sync driver from 4.11.2 to 5.1.1
- Upgraded Jackson from 2.15.4 to 2.17.1
- Upgraded reactor from 3.6.5 to 3.6.7
- Upgraded jobrunr from 7.1.1 to 7.2.1
- Upgraded amqp-client from 5.20.0 to 5.21.0
- Upgraded spring-aspects from 6.1.1 to 6.1.10
- Upgraded spring-retry from 2.0.3 to 2.0.6
- Upgraded spring-hateoas from 2.2.0 to 2.3.0
- Upgraded spring-data-mongodb from 4.3.0 to 4.3.1
- Upgraded kotlinx-collections-immutable-jvm from 0.3.4 to 0.3.7
- Upgraded arrow-core from 1.2.1 to 1.2.4
- Upgraded jetbrains annotations from 22.0.0 to 24.1.0
- Upgraded logback-classic from 1.4.14 to 1.5.6
- Major improvements to
CatchupSubscriptionModel
, it now handles and includes events that have been written while the catch-up subscription phase runs. Also, the "idempotency cache" is only used while switching from catch-up to continuous mode, and not during the entire catch-up phase. - Major changes to the
spring-boot-starter-mongodb
module. It now includes aCatchupSubscriptionModel
which allows you to start subscriptions from an historic date more easily. StartAt.Dynamic(..)
now takes aSubscriptionModelContext
as a parameter. This means that subscription models can add a "context" that can be useful for dynamic behavior. For example, you can prevent a certain subscription model to start (and instead delegate to its parent) if you returnnull
asStartAt
from a dynamic position.- Added annotation support for subscriptions when using the
spring-boot-starter-mongodb
module. You can now do:It also allows you to easily start the subscription from a moment in the past (such as beginning of time). See javadoc in@Subscription(id = "mySubscription") void mySubscription(MyDomainEvent event) { System.out.println("Received event: " + event); }
org.occurrent.annotation.Subscription
for more info. - Added
org.occurrent.subscription.blocking.durable.catchup.StartAtTime
as a help to theCatchupSubscriptionModel
to easier specify anOffsetDateTime
or "beginning of time" when starting a subscription catchup subscription model. Before you had to do:but now you can do:subscriptionModel.subscribe("myId", StartAt.subscriptionPosition(TimeBasedSubscriptionPosition.beginningOfTime()), System.out::println);
which is shorter. You're using Kotlin you can importsubscriptionModel.subscribe("myId", StartAtTime.beginningOfTime(), System.out::println);
org.occurrent.subscription.blocking.durable.catchup.beginningOfTime
and do:subscriptionModel.subscribe("myId", StartAt.beginningOfTime(), ::println)
- Changed the default behavior of
CatchupSubscriptionModel
. Before it replayed all historic events by default if no specific start at position was supplied, but now it delegates to the wrapped subscription and no historic events will be replayed. Instead, you need to explicitly specifybeggingOfTime
or anOffsetDateTime
as the start position. For example:subscriptionModel.subscribe("myId", StartAtTime.beginningOfTime(), System.out::println);
- Upgraded Spring Boot from 3.2.1 to 3.2.5
- Upgraded Mongo sync driver 4.11.1 to 4.11.2
- Upgraded jobrunr from 6.3.3 to 7.1.1
- Upgraded project reactor from 3.6.0 to 3.6.5
- Upgraded jackson from 2.15.3 to 2.15.4
- Upgraded Kotlin from 1.9.22 to 1.9.23
- Upgraded spring-data-mongodb from 4.2.0 to 4.2.5
- Upgraded cloudevents from 2.5.0 to 3.0.0
- Fixed issue in CompetingConsumerSubscriptionModel in which it failed to reacquire consumption rights in some cases where MongoDB connection was lost.
-
Fixed issue in Subscription DSL when using "subscribe" functions with a single event type different from the "base event type", i.e. this didn't work in previous version in Java:
// GameEvent is the "base event type" Subscriptions<GameEvent> subscriptions = new Subscriptions<>(..); // GameStarted has GameEvent as parent, the following didn't compile in version 0.17.0 subscriptions.subscribe("mysubscription", GameStarted.class, gameStarted -> System.out.println("gameStarted: " + gameStarted));
-
Using slf4j-api and not logback-classic in several modules that accidentally brought logback in as a compile time dependency.
-
Upgraded slf4j-api from 2.0.5 to 2.0.12
-
In the
spring-boot-starter-mongodb
module, it's now possible to enable/disable the event store or subscriptions from theapplication.yaml
file. For example, you can disable the event store like this:occurrent: event-store: enabled: false # Disable the creation of an event store Spring bean
and the subscriptions like this:
occurrent: subscription: enabled: false # Disable the creation of beans related to subscriptions
This is useful if you have an application where you only need the event store or only need the subscriptions.
-
Added
queryForList
Kotlin extension function toEventStoreQueries
andDomainEventQueries
. It works in a similar way asqueryForSequence
, but returns aList
instead of a KotlinSequence
. -
Fixed an issue with
CatchupSubscriptionModel
in which it threw an IllegalArgumentException when storing the position of stored events when using Atlas free tier.
- spring-boot-starter-mongodb no longer autoconfigures itself by just importing the library in the classpath, instead you need to bootstrap by annotating your Spring Boot class with @EnableOccurrent.
- Fixed bug in spring-boot-starter-mongodb module in which it didn't automatically configure MongoDB.
- Domain event subscriptions now accepts metadata as the first parameter, besides just the event. The metadata currently contains the stream version and stream id, which can be useful when building projections.
- Fixed a bug in SpringMongoSubscriptionModel in which it didn't restart correctly on non DataAccessException's
- Introducing Decider support (experimental)
- Fixed a rare ConcurrentModificationException issue in SpringMongoSubscriptionModel if the subscription model is shutdown while it's restarting
- Upgraded from Kotlin 1.9.20 to 1.9.22
- Upgraded amqp-client from 5.16.0 to 5.20.0
- Upgraded Spring Boot from 3.1.4 to 3.2.1
- Upgraded reactor from 3.5.10 to 3.6.0
- Upgraded Spring data MongoDB from 4.1.4 to 4.2.0
- Upgraded jobrunr from 6.3.2 to 6.3.3
- Upgraded mongodb drivers from 4.10.2 to 4.11.1
- Upgraded lettuce core from 6.2.6.RELEASE to 6.3.1.RELEASE
- Upgraded spring-aspects from 6.0.10 to 6.1.1
- Upgraded jackson from 2.15.2 to 2.15.3
-
Removed
isFinalError
method fromErrorInfo
used byRetryStrategy
, useisLastAttempt()
instead. -
Added
RetryInfo
as argument to theexec
extension function inRetryStrategy
. -
Added
retryAttemptException
as an extension property toorg.occurrent.retry.AfterRetryInfo
so that you don't need to use thegetFailedRetryAttemptException
method that returns anOptional
in the Java interface. Instead, theretryAttemptException
function returns aThrowable?
. Import the extension property from theorg.occurrent.retry.AfterRetryInfoExtensions
file. -
Added
nextBackoff
as an extension property toorg.occurrent.retry.ErrorInfo
so that you don't need to use thegetBackoffBeforeNextRetryAttempt
method that returns anOptional
in the Java interface. Instead, thenextBackoff
function returns aDuration?
. Import the extension property from theorg.occurrent.retry.ErrorInfoExtensions
file. -
In the previous version, in the retry strategy module,
onBeforeRetry
,onAfterRetry
,onError
etc, accepted aBiConsumer<Throwable, RetryInfo>
. The arguments have now been reversed, so the types of the BiConsumer is nowBiConsumer<RetryInfo, Throwable>
. -
Added
onRetryableError
method toRetryStrategy
which you can use to listen to errors that are retryable (i.e. matching the retry predicate). This is a convenience method foronError
whenisRetryable
is true. -
Added Kotlin extensions to
JacksonCloudEventConverter
. You can import the functionorg.occurrent.application.converter.jackson.jacksonCloudEventConverter
and use like this:val objectMapper = ObjectMapper() val cloudEventConverter: JacksonCloudEventConverter<MyEvent> = jacksonCloudEventConverter( objectMapper = objectMapper, cloudEventSource = URI.create("urn:myevents"), typeMapper = MyCloudEventTypeMapper() )
-
Fixed problem with spring-boot autoconfiguration in which it previously failed to create a default cloud event converter if no type mapper was specified explicitly.
-
Upgraded to Kotlin 1.9.20
-
Added a "deleteAll" method to InMemoryEventStore which is useful for testing
-
The
org.occurrent.eventstore.api.WriteConditon
has been converted to a java record. -
Removed the deprecated method "getStreamVersion" in
org.occurrent.eventstore.api.WriteConditon
, usenewStreamVersion()
instead.
- Several changes to
RetryStrategy
again:onError
is will be called for each throwable again. The newErrorInfo
instance, that is supplied to the error listener, can be used to determine whether the error is "final" or if it's retryable.- In the previous version,
onBeforeRetry
andonAfterRetry
, accepted aBiConsumer<RetryInfo, Throwable>
. The arguments have now been reversed, so the types of the BiConsumer is nowBiConsumer<Throwable, RetryInfo>
.
- Added
onAfterRetry
toRetryStrategy
- Upgraded jakarta-api from 1.3.5 to 2.11 (which means that all javax annotations have been replaced by jakarta)
- Fixed a bug in CatchupSubscriptionModel that prevented it from working in MongoDB clusters that doesn't have access to the
hostInfo
command such as Atlas free-tier. - Several changes to the
RetryStrategy
:- Renamed
getNumberOfAttempts
togetNumberOfPreviousAttempts
- Added
getAttemptNumber
which is the number of the current attempt onError
is now only called if the end result is an error. I.e. it will only be called at most once, and not for intermediate errors. Because of this, the variant ofonError
that took aBiConsumer<RetryInfo, Throwable>
has been removed (because there's no need forRetryInfo
when the operation has failed).- Added the
onBeforeRetry
method, which is called before a retry attempt is made. This function takes aBiConsumer<RetryInfo, Throwable>
in which theRetryInfo
instance contains details about the current retry attempt.
- Renamed
- Added equals/hashcode and toString to RetryInfo
- Small changes to how retries are performed in the competing consumer strategies for MongoDB
- Improved debug logging in competing consumer implementations
- Upgraded Spring Boot from 3.0.8 to 3.1.4
- Upgraded kotlin from 1.9.0 to 1.9.10
- Upgraded jobrunr from 6.3.0 to 6.3.2
- Upgraded spring data mongodb from 4.0.8 to 4.1.4
- Upgraded jackson from version 2.14.3 to 2.15.2
- Upgraded project reactor from 3.5.8 to 3.5.10
- Upgraded spring-retry from 2.0.0 to 2.0.3
- Upgraded lettuce-core from 6.2.2.RELEASE to 6.2.6.RELEASE
- The SpringMongoSubscriptionModel is now restarted for all instances of
org.springframework.dao.DataAccessException
instead of just instances oforg.springframework.data.mongodb.UncategorizedMongoDbException
. - Upgraded cloudevents from 2.4.2 to 2.5.0
- Upgraded Spring Boot from 3.0.7 to 3.0.8
- Upgraded project reactor from 3.5.6 to 3.5.8
- Upgraded spring data mongodb from 4.0.6 to 4.0.8
- Upgraded mongo driver from 4.8.1 to 4.10.2
- Upgraded jobrunr from 6.1.4 to 6.3.0
- Improved debug logging in
org.occurrent.subscription.mongodb.spring.blocking.SpringMongoSubscriptionModel
-
A kotlin extension function that makes it easier to execute a
RetryStrategy
with a "Supplier".The reasons for this is that when just doing this from kotlin:
val string = retryStrategy.execute { "hello" }
This will return
Unit
and not the "hello" string that you would expect. This is because execute in the example above delegates to org.occurrent.retry.RetryStrategy.execute(java.lang.Runnable) and not org.occurrent.retry.RetryStrategy.execute(java.util.function.Supplier) which one would expect. Thus, you can use this function instead to avoid specifying theSupplier
SAM explicitly.I.e. instead of doing:
val string : String = retryStrategy.execute(Supplier { "hello" })
you can do:
val string : String = retryStrategy.exec { "hello" }
after having imported
org.occurrent.retry.exec
. -
Kotlin jvm target is set to 17
-
Added ability to map errors with
RetryStrategy
, either by doing:retryStrategy .mapError(IllegalArgumentException.class, IllegalStateException::new) .maxAttempts(2) .execute(() -> { throw new IllegalArgumentException("expected"); }));
In the end, an
IllegalStateException
will be thrown. You can also do like this:retryStrategy .mapError(t -> { if (t instanceof IllegalArgumentException iae) { return new IllegalStateException(iae.getMessage()); } else { return t; } }) .maxAttempts(2) .execute(() -> { throw new IllegalArgumentException("expected"); }));
-
Added a new
execute
Kotlin extension function to theApplicationService
that allows one to use ajava.util.UUID
as a streamId when working with lists of events (as opposed toSequence
). -
Upgraded xstream from 1.4.19 to 1.4.20
-
Added better logging to
org.occurrent.subscription.mongodb.spring.blocking.SpringMongoSubscriptionModel
, including some debug logs that can be used for detailed information about what's going on. -
Upgraded Kotlin from 1.8.21 to 1.9.0
-
Upgraded Spring Boot from 3.0.6 to 3.0.7
-
Upgraded Spring Aspects from 6.0.9 to 6.0.10
- Added support to the retry module to execute retries with a function that takes an instance of
org.occurrent.retry.RetryInfo
. This is useful if you need to know the current state of your of the retry while retrying. For example:RetryStrategy retryStrategy = RetryStrategy .exponentialBackoff(initialDelay, maxDelay, 2.0) .maxAttempts(10) retryStrategy.execute(info -> { if(info.getNumberOfAttempts() > 2 && info.getNumberOfAttempts() < 6) { System.out.println("Number of attempts is between 3 and 5"); } ... });
- Fixed bug in the retry module, in which error listeners where not called for the last error.
- Upgraded jobrunr from 5.3.0 to 6.1.4
- Upgraded Kotlin from 1.8.0 to 1.8.21
- Upgraded Jackson from 2.14.1 to 2.14.3
- Upgraded project reactor from 3.5.0 to 3.5.6
- Upgraded to Spring Boot from 3.0.3 to 3.0.6
- Upgraded to Spring from 6.0.6 to 6.0.9
- Upgraded to Spring Data MongoDB from 4.0.0 to 4.0.6
- Upgraded cloudevents from 2.4.1 to 2.4.2
- Upgraded Kotlin from 1.7.20 to 1.8.0
- Upgraded cloudevents to 2.4.1
- Improvements to
SpringMongoSubscriptionModel
in which it'll restart the subscription from the default subscription position instead of now on unknown or query-related MongoDB errors. This eliminates the risk of loosing messages if using a durable subscription model. - Fixed a subtle bug in
SpringMongoLeaseCompetingConsumerStrategy
in which it could crash in some cases where MongoDB was down for more than 30 seconds. - Upgraded to Spring Boot 3.0.3
- Upgraded spring-aspects from 6.0.2 to 6.0.6
- Fix error in the sequence command composition that leaves old events in the sequence (issue #131) (thanks to chrisdginn for pull request)
- Occurrent now require Java 17 instead of Java 8. This is major change to support the latest Spring client libraries for various databases such MongoDB and Redis. This was also done to better support Spring Boot 3 and jakartaee.
- Lots of changes under the hood, refactorings to make use of records, sealed classes and built-in functional constructs available in Java 17.
- Refactored SubscriptionPositionStorageConfig to sealed interface
- Refactored CompetingConsumerSubscriptionModel
- Refactored StartAt to a sealed interface
- Refactored ClassName to a sealed interface
- Refactored RetryStrategy to a sealed interface
- Converted Deadline to a sealed interface
- Converted CompetingConsumer in CompetingConsumerSubscriptionModel to a record
- Converting Backoff to sealed interface
- Converting Condition and WriteCondition to sealed interfaces
- Converting SortBy to a sealed interface
- Refactor MaxAttempts to sealed interface and implementations to records
- The spring-boot-starter module now supports Spring Boot 3 (thanks to Kirill Gavrilov for pull request)
Introducing deadline scheduling. Scheduling (aka deadlines, alarm clock) is a very handy technique to schedule to commands to be executed in the future or periodically.
Imagine, for example, a multiplayer game, where we want to game to end automatically after 10 hours of inactivity.
This means that as soon as a player has made a guess, we’d like to schedule a “timeout game command” to be executed after 10 hours.
The way it works in Occurrent is that you schedule a org.occurrent.deadline.api.blocking.Deadline
using a org.occurrent.deadline.api.blocking.DeadlineScheduler
implementation.
The Deadline
is a date/time in the future when the deadline is up. You also register a org.occurrent.deadline.api.blocking.DeadlineConsumer
to a
org.occurrent.deadline.api.blocking.DeadlineConsumerRegistry
implementation, and it'll be invoked when a deadline is up. For example:
// In some method we schedule a deadline two hours from now with data "hello world"
var deadlineId = UUID.randomUUID();
var deadlineCategory = "hello-world";
var deadline = Deadline.afterHours(2)
deadlineScheduler.schedule(deadlineId, deadlineCategory, deadline, "hello world");
// In some other method, during application startup, we register a deadline consumer to the registry for the "hello-world" deadline category
deadlineConsumerRegistry.register("hello-world", (deadlineId, deadlineCategory, deadline, data) -> System.out.println(data));
In the example above, the deadline consumer will print "hello world" after 2 hours.
There are two implementations of DeadlineScheduler
and DeadlineConsumerRegistry
, one that uses JobRunr and one in-memory implementation.
Depend on org.occurrent:deadline-jobrunr:0.15.0
to get the JobRunr implementation, and org.occurrent:deadline-inmemory:0.15.0
to get the in-memory implementation.
- Upgraded Kotlin from 1.7.10 to 1.7.20
- Upgraded cloudevents from 2.3.0 to 2.4.0
- Upgraded Spring Boot from 2.7.3 to 2.7.5
- Changed toString() on StreamVersionWriteCondition when condition is null from "any stream version" to "any"
- Fixed a bug in SpringMongoEventStore when several writes happened in parallel to the same stream and write condition was "any". This could result in a WriteConditionNotFulfilledException since the underlying MongoDB transaction failed. Now, after the fix, the events are stored as indented.
- Fixed NPE issue in the toString() method in
org.occurrent.eventstore.api.StreamVersionWriteCondition
when stream condition wasany
.
- Fixed issue in
SpringMongoSubscriptionModel
that prevented restart of subscriptions when MongoDB goes into leader election mode. - Upgraded spring-boot to 2.7.3
- Upgraded Spring Data MongoDB from 3.3.4 to 3.3.7
- InMemoryEventStore now checks for duplicate events. You can no longer write two events with the same cloud event id and source to the same stream.
- Fixed an issue with command composition in Kotlin in which, in version 0.14.5, returned all events in a stream and not only new events.
- Updated Kotlin extension functions for partial function application (
org.occurrent.application.composition.command.PartialExtensions
) to work on any type of function instead of just those that hasList
orSequence
. - Fixed an issue in JacksonCloudEventConvert in which it didn't use the CloudEventTypeMapper correctly when calling
toCloudEvent
(issue 119).
- Removed
PartialListCommandApplication
,PartialStreamCommandApplication
andPartialApplicationFunctions
in packageorg.occurrent.application.composition.command.partial
of modulecommand-composition
. They have all been replaced byorg.occurrent.application.composition.command.partial.PartialFunctionApplication
which is a generic form a partial function application that works on all kinds of functions, not only those takingStream
and/orList
. A simple search and replace should be enough to migrate. - Upgraded Jackson from 2.13.2 to 2.13.3
- Upgraded project reactor to 3.4.16 to 3.4.21
- Upgraded Spring Boot from 2.6.7 to 2.7.1
- Upgraded Java MongoDB driver from 4.5.1 to 4.6.1
- Upgraded Kotlin from 1.6.21 to 1.7.10
- Upgraded to Kotlin from 1.6.0 to 1.6.21
- Upgraded project reactor to 3.4.12 to 3.4.16
- Upgraded Spring Data MongoDB from 3.3.0 to 3.3.0
- Upgraded Spring Boot from 2.5.6 to 2.6.7
- Upgraded Java MongoDB driver from 4.4.0 to 4.5.1
- Upgraded Java cloudevents SDK from 2.2.0 to 2.3.0
- Upgraded Jackson from 2.13.0 to 2.13.2
- Upgraded Jackson Databind from 2.13.0 to 2.13.2.1
- Improved
SpringMongoEventStore
,MongoEventStore
andReactorMongoEventStore
so that they never does in-memory filtering of events that we're not interested in. - Added
oldStreamVersion
toWriteResult
(that is returned when callingwrite(..)
on an event store). ThegetStreamVersion()
method has been deprecated in favor ofgetNewStreamVersion()
. - Upgraded to Kotlin 1.6.0
- Upgraded Java MongoDB driver to 4.4.0
- Upgraded Spring Data MongoDB to 3.3.0
- Upgraded Jackson to 2.13.0
- Upgraded amqp-client to 5.14.0
-
Using
insert
fromMongoTemplate
when writing events in theSpringMongoEventStore
. Previously, the vanillamongoClient
was (accidentally) used for this operation. -
When using the spring boot starter project for MongoDB (
org.occurrent:spring-boot-starter-mongodb
), the transaction manager used by default is now configured to use "majority" read- and write concerns. To revert to the "default" settings used by Spring, or change it to your own needs, specify aMongoTransactionManager
bean. For example:@Bean public MongoTransactionManager mongoTransactionManager(MongoDatabaseFactory dbFactory) { return new MongoTransactionManager(dbFactory, TransactionOptions.builder(). .. .build()); }
-
Separating read- and query options configuration so that you can e.g. configure queries made by
EventStoreQueries
and reads from theEventStore.read(..)
separately.
This useful if you want to e.g. allow queries fromEventStoreQueries
to be made to secondary nodes but still force reads fromEventStore.read(..)
to be made from the primary. You can configure this by supplying areadOption
(to configure the reads from theEventStore
) andqueryOption
(forEventStoreQueries
) in theEventStoreConfig
. This has been implemented forSpringMongoEventStore
andReactorMongoEventStore
.
-
Non-backward compitable change: CloudEventConverter's now has a third method that you must implement:
/** * Get the cloud event type from a Java class. * * @param type The java class that represents a specific domain event type * @return The cloud event type of the domain event (cannot be {@code null}) */ @NotNull String getCloudEventType(@NotNull Class<? extends T> type);
The reason for this is that several components, such as the subscription dsl, needs to get the cloud event type from the domain event class. And since this is highly related to "cloud event conversion", this method has been added there to avoid complicating the API.
-
Introduced the concept of CloudEventTypeMapper's. A cloud event type mapper is component whose purpose it is to get the cloud event type from a domain event type and vice versa. Cloud Event Type mappers are used by certain
CloudEventConverter
's to define how they should derive the cloud event type from the domain event as well as a way to reconstruct the domain event type from the cloud event type. and the new domain queries DSL. You should use the same type mapper instance for all these components. To write a custom type mapper, depend on theorg.occurent:cloudevent-type-mapper-api
module and implement theorg.occurrent.application.converter.typemapper.CloudEventTypeMapper
(functional) interface. -
Introduced a blocking Query DSL. It's a small wrapper around the EventStoreQueries API that lets you work with domain events instead of CloudEvents. Depend on the
org.occurrent:query-dsl-blocking
module and create an instance oforg.occurrent.dsl.query.blocking.DomainEventQueries
. For example:EventStoreQueries eventStoreQueries = .. CloudEventConverter<DomainEvent> cloudEventConverter = .. DomainEventQueries<DomainEvent> domainEventQueries = new DomainEventQueries<DomainEvent>(eventStoreQueries, cloudEventConverter); Stream<DomainEvent> events = domainQueries.query(Filter.subject("someSubject"));
There's also support for skip, limits and sorting and convenience methods for querying for a single event:
Stream<DomainEvent> events = domainQueries.query(GameStarted.class, GameEnded.class); // Find only events of this type GameStarted event1 = domainQueries.queryOne(GameStarted.class); // Find the first event of this type GamePlayed event2 = domainQueries.queryOne(Filter.id("d7542cef-ac20-4e74-9128-fdec94540fda")); // Find event with this id
There are also some Kotlin extensions that you can use to query for a
Sequence
of events instead of aStream
:val events : Sequence<DomainEvent> = domainQueries.queryForSequence(GamePlayed::class, GameWon::class, skip = 2) // Find only events of this type and skip the first two events val event1 = domainQueries.queryOne<GameStarted>() // Find the first event of this type val event2 = domainQueries.queryOne<GamePlayed>(Filter.id("d7542cef-ac20-4e74-9128-fdec94540fda")) // Find event with this id
-
Introducing spring boot starter project to easily bootstrap Occurrent if using Spring. Depend on
org.occurrent:spring-boot-starter-mongodb
and create a Spring Boot application annotated with@SpringBootApplication
as you would normally do. Occurrent will then configure the following components automatically:-
Spring MongoDB Event Store instance (
EventStore
) -
A Spring
SubscriptionPositionStorage
instance -
A durable Spring MongoDB competing consumer subscription model (
SubscriptionModel
) -
A Jackson-based
CloudEventConverter
-
A
GenericApplication
instance (ApplicationService
) -
A subscription dsl instance (
Subscriptions
) -
A reflection based type mapper that uses the fully-qualified class name as cloud event type (you should absolutely override this bean for production use cases) (
CloudEventTypeMapper
) For example, by doing:@Bean public CloudEventTypeMapper<GameEvent> cloudEventTypeMapper() { return ReflectionCloudEventTypeMapper.simple(GameEvent.class); }
This will use the "simple name" (via reflection) of a domain event as the cloud event type. But since the package name is now lost, the
ReflectionCloudEventTypeMapper
will append the package name ofGameEvent
to when converting back into a domain event. This only works if all your domain events are located in the exact same package asGameEvent
. If this is not that case you need to implement a more advancedCloudEventTypeMapper
such as:class CustomTypeMapper : CloudEventTypeMapper<GameEvent> { override fun getCloudEventType(type: Class<out GameEvent>): String = type.simpleName override fun <E : GameEvent> getDomainEventType(cloudEventType: String): Class<E> = when (cloudEventType) { GameStarted::class.simpleName -> GameStarted::class GamePlayed::class.simpleName -> GamePlayed::class // Add all other events here!! ... else -> throw IllegalStateException("Event type $cloudEventType is unknown") }.java as Class<E> }
See
org.occurrent.springboot.OccurrentMongoAutoConfiguration
if you want to know exactly what gets configured. -
-
Upgraded spring-boot from 2.5.4 to 2.5.6.
- No longer using transactional reads in
ReactorMongoEventStore
, this also means that thetransactionalReads
configuration property could be removed since it's no longer used.
- Reading event streams from
MongoEventStore
andSpringMongoEventStore
are now much faster and more reliable. Before there was a bug in both implementation in which the stream could be abruptly closed when reading a large number of events. This has now been fixed, and as a consequence, Occurrent doesn't need to start a MongoDB transaction when reading an event stream, which improves performance. - Removed the
transactionalReads
property (introduced in previous release) fromEventStoreConfig
for bothMongoEventStore
andSpringMongoEventStore
since it's no longer needed. - Upgraded jackson from version 2.11.1 to 2.12.5
-
Added ability to map event type to event name in subscriptions DSL from Kotlin
-
Upgraded Kotlin to 1.5.31
-
Upgraded spring-boot used in examples to 2.5.4
-
Upgraded spring-mongodb to 3.2.5
-
Upgraded the mongodb java driver to 4.3.2
-
Upgraded project reactor to 3.4.10
-
Upgrading to cloudevents sdk 2.2.0
-
Minor tweak in ApplicationService extension function for Kotlin so that it no longer converts the Java stream into a temporary Kotlin sequence before converting it to a List
-
Allow configuring (using the
EventStoreConfig
builder) whether transactional reads should be enabled or disabled for all MongoDB event stores. This is an advanced feature, and you almost always want to have it enabled. There are two reasons for disabling it:- There's a bug/limitation on Atlas free tier clusters which yields an exception when reading large number of events in a stream in a transaction.
To workaround this you could disable transactional reads. The exception takes this form:
It's possible that this would work if you enable "no cursor timeout" on the query, but this is not allowed on Atlas free tier.
java.lang.IllegalStateException: state should be: open at com.mongodb.assertions.Assertions.isTrue(Assertions.java:79) at com.mongodb.internal.session.BaseClientSessionImpl.getServerSession(BaseClientSessionImpl.java:101) at com.mongodb.internal.session.ClientSessionContext.getSessionId(ClientSessionContext.java:44) at com.mongodb.internal.connection.ClusterClockAdvancingSessionContext.getSessionId(ClusterClockAdvancingSessionContext.java:46) at com.mongodb.internal.connection.CommandMessage.getExtraElements(CommandMessage.java:265) at com.mongodb.internal.connection.CommandMessage.encodeMessageBodyWithMetadata(CommandMessage.java:155) at com.mongodb.internal.connection.RequestMessage.encode(RequestMessage.java:138) at com.mongodb.internal.connection.CommandMessage.encode(CommandMessage.java:59) at com.mongodb.internal.connection.InternalStreamConnection.sendAndReceive(InternalStreamConnection.java:268) at com.mongodb.internal.connection.UsageTrackingInternalConnection.sendAndReceive(UsageTrackingInternalConnection.java:100) at com.mongodb.internal.connection.DefaultConnectionPool$PooledConnection.sendAndReceive(DefaultConnectionPool.java:490) at com.mongodb.internal.connection.CommandProtocolImpl.execute(CommandProtocolImpl.java:71) at com.mongodb.internal.connection.DefaultServer$DefaultServerProtocolExecutor.execute(DefaultServer.java:253) at com.mongodb.internal.connection.DefaultServerConnection.executeProtocol(DefaultServerConnection.java:202) at com.mongodb.internal.connection.DefaultServerConnection.command(DefaultServerConnection.java:118) at com.mongodb.internal.connection.DefaultServerConnection.command(DefaultServerConnection.java:110) at com.mongodb.internal.operation.QueryBatchCursor.getMore(QueryBatchCursor.java:268) at com.mongodb.internal.operation.QueryBatchCursor.hasNext(QueryBatchCursor.java:141) at com.mongodb.client.internal.MongoBatchCursorAdapter.hasNext(MongoBatchCursorAdapter.java:54) at java.base/java.util.Iterator.forEachRemaining(Iterator.java:132) at java.base/java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801) at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484) at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474) at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913) at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
- You're set back by the performance penalty of transactions and are willing to sacrifice read consistency
If you disable transactional reads, you may end up with a mismatch between the version number in the
EventStream
and the last event returned from the event stream. This is because Occurrent does two reads to MongoDB when reading an event stream. First it finds the current version number of the stream (A), and secondly it queries for all events (B). If you disable transactional reads, then another thread might have written more events before the call to B has been made. Thus, the version number received from query A might be stale. This may or may not be a problem for your domain, but it's generally recommended having transactional reads enabled. Configuration example:EventStoreConfig eventStoreConfig = new EventStoreConfig.Builder().transactionalReads(false). .. .build(); eventStore = new SpringMongoEventStore(mongoTemplate, eventStoreConfig);
- There's a bug/limitation on Atlas free tier clusters which yields an exception when reading large number of events in a stream in a transaction.
To workaround this you could disable transactional reads. The exception takes this form:
-
Added ability to tweak query options for reads in the event store, for example cursor timeouts, allow reads from slave etc. You can configure this in the
EventStoreConfig
for each event store by using thequeryOption
higher-order function. For example:EventStoreConfig eventStoreConfig = new EventStoreConfig.Builder().eventStoreCollectionName(connectionString.getCollection()).transactionConfig(mongoTransactionManager).timeRepresentation(TimeRepresentation.DATE) .queryOptions(query -> query.noCursorTimeout().allowSecondaryReads()).build(); var eventStore = new SpringMongoEventStore(mongoTemplate, eventStoreConfig);
Note that you must not use this to change the query itself, i.e. don't use the
Query#with(Sort)
etc. Only use options such asQuery#cursorBatchSize(int)
that doesn't change the actual query or sort order. This is an advanced feature and should be used sparingly. -
Added ability to convert a
Stream
of cloud events to domain events and vice versa in theCloudEventConverter
by overriding the newtoCloudEvents
and/ortoDomainEvents
methods. The reason for overriding any of these methods is to allow adding things such as correlation id that should be the same for all events in a stream. -
Non-backward compatible change: The cloud event converter module name has changed from
org.occurrent:cloudevent-converter
toorg.occurrent:cloudevent-converter-api
-
Non-backward compatible change: The generic cloud event converter (
org.occurrent.application.converter.generic.GenericCloudEventConverter
) has been moved to its own module, depend onorg.occurrent:cloudevent-converter-generic
to use it. -
Introduced a cloud event converter that uses XStream to (de-)serialize the domain event to cloud event data. Depend on
org.occurrent:cloudevent-converter-xstream
and then use it like this:XStream xStream = new XStream(); xStream.allowTypeHierarchy(MyDomainEvent.class); XStreamCloudEventConverter<MyDomainEvent> cloudEventConverter = new XStreamCloudEventConverter<>(xStream, URI.create("urn:occurrent:domain"));
You can also configure how different attributes of the domain event should be represented in the cloud event by using the builder,
new XStreamCloudEventConverter.Builder<MyDomainEvent>().. build()
. -
Introduced a cloud event converter that uses Jackson to (de-)serialize the domain event to cloud event data. Depend on
org.occurrent:cloudevent-converter-jackson
and then use it like this:ObjectMapper objectMapper = new ObjectMapper(); JacksonCloudEventConverter<MyDomainEvent> cloudEventConverter = new JacksonCloudEventConverter<>(objectMapper, URI.create("urn:occurrent:domain"));
You can also configure how different attributes of the domain event should be represented in the cloud event by using the builder,
new JacksonCloudEventConverter.Builder<MyDomainEvent>().. build()
.
-
Improved error message and version for write condition not fulfilled that may happen when parallel writers write to the same stream at the same time.
-
Upgraded to cloud events java sdk to version 2.1.1
-
Upgraded to Kotlin 1.5.21
-
Added a
mapRetryPredicate
function toRetry
that easily allows you to map the current retry predicate into a new one. This is useful if you e.g. want to add an additional predicate to the existing predicate. For example:// Let's say you have a retry strategy: Retry retry = RetryStrategy.exponentialBackoff(Duration.ofMillis(100), Duration.ofSeconds(2), 2.0f).maxAttempts(5).retryIf(WriteConditionNotFulfilledException.class::isInstance); // Now you also want to retry if an IllegalArgumentException is thrown: retry.mapRetryPredicate(currentRetryPredicate -> currentRetryPredicate.or(IllegalArgument.class::isInstance))
-
The GenericApplicationService now has a RetryStrategy enabled by default. The default retry strategy uses exponential backoff starting with 100 ms and progressively go up to max 2 seconds wait time between each retry, if
WriteConditionNotFulfilledException
is caught. It will, by default, only retry 5 times before giving up, rethrowing the original exception. You can override the default strategy by callingnew GenericApplicationService(eventStore, cloudEventConverter, retryStrategy)
. Usenew GenericApplicationService(eventStore, cloudEventConverter, RetryStrategy.none())
to revert to previous behavior. -
Upgraded spring-boot used in examples to 2.5.3
-
Upgraded spring-mongodb to 3.2.3
-
Upgraded the mongodb java driver to 4.3.1
-
Added ability to write a single event to the event store instead of a stream. For example:
CloudEvent event = ... eventStore.write("streamId", event);
This have been implemented for both the blocking and reactive event stores.
- The event store API's now returns an instance of
org.occurrent.eventstore.api.WriteResult
when writing events to the event store (previouslyvoid
was returned). TheWriteResult
instance contains the stream id and the new stream version of the stream. The reason for this change is to make it easier to implement use cases such as "read your own writes". - The blocking ApplicationService
org.occurrent.application.service.blocking.ApplicationService
now returnsWriteResult
instead ofvoid
. - Fixed bug in
InMemoryEventStore
that accidentally could skip version numbers when new events were inserted into the database. - Improved detection of duplicate cloud event's in all MongoDB event stores
- Fixed a bug where
WriteConditionNotFulfilledException
was not thrown when a streams was updated by several threads in parallel (fixed for all mongodb event store implementations) - Upgraded Spring Boot from 2.4.2 to 2.4.4
- Upgraded reactor from 3.4.2 to 3.4.4
- Upgraded spring-data-mongodb from 3.1.1 to 3.1.7
- Upgraded lettuce-core from 6.0.1 to 6.1.0
- Upgraded mongo java client from 4.1.1 to 4.2.2
- Upgraded spring-aspects from 5.2.9.RELEASE to 5.3.5
- Upgraded spring-retry from 1.3.0 to 1.3.1
- Upgraded kotlin from 1.4.31 to 1.4.32
- Upgraded kotlinx-collections-immutable-jvm from 0.3.2 to 0.3.4
-
Fixed a bug in
InMemorySubscription
that accidentally pushednull
values to subscriptions every 500 millis unless an actual event was received. -
Renamed
org.occurrent.subscription.mongodb.spring.blocking.SpringSubscriptionModelConfig
toorg.occurrent.subscription.mongodb.spring.blocking.SpringMongoSubscriptionModelConfig
. -
Upgraded to Kotlin 1.4.31
-
All blocking subscriptions now implements the life cycle methods defined in the
org.occurrent.subscription.api.blocking.SubscriptionModelLifeCycle
interface. A new interface,org.occurrent.subscription.api.blocking.Subscribable
has been defined, that contains all "subscribe" methods. You can use this interface in your application if all you want to do is start subscriptions. -
Introduced a new default "StartAt" implementation called "default" (
StartAt.subscriptionModelDefault()
). This is different toStartAt.now()
in that it will allow the subscription model to choose where to start automatically if you don't want to start at an earlier position. -
Removed the ability to pass a supplier returning
StartAt
to the subscribe methods inorg.occurrent.subscription.api.blocking.Subscribable
interface. Instead, useStartAt.dynamic(supplier)
to achieve the same result. -
Upgraded to CloudEvents Java SDK 2.0.0
-
Waiting for internal message listener to be shutdown when stopping
SpringMongoSubscriptionModel
. -
Using a
org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor
as executor inSpringMongoSubscriptionModel
instead of the defaultorg.springframework.core.task.SimpleAsyncTaskExecutor
. The reason for this is that theDefaultMessageListenerContainer
used internally inSpringMongoSubscriptionModel
will wait for all threads in theThreadPoolTaskExecutor
to stop when stopping theSpringMongoSubscriptionModel
instance. Otherwise, a race conditions can occur when stopping and then immediately starting aSpringMongoSubscriptionModel
. -
Introducing competing consumer support! A competing consumer subscription model wraps another subscription model to allow several subscribers to subscribe to the same subscription. One of the subscribes will get a lock of the subscription and receive events from it. If a subscriber looses its lock, another subscriber will take over automatically. To achieve distributed locking, the subscription model uses a
org.occurrent.subscription.api.blocking.CompetingConsumerStrategy
to support different algorithms. You can write custom algorithms by implementing this interface yourself. Here's an example of how to use theCompetingConsumerSubscriptionModel
. First add theorg.occurrent:competing-consumer-subscription
module to classpath. This example uses theNativeMongoLeaseCompetingConsumerStrategy
from moduleorg.occurrent:subscription-mongodb-native-blocking-competing-consumer-strategy
. It also wraps the DurableSubscriptionModel which in turn wraps the Native MongoDB subscription model.MongoDatabase mongoDatabase = mongoClient.getDatabase("some-database"); SubscriptionPositionStorage positionStorage = NativeMongoSubscriptionPositionStorage(mongoDatabase, "position-storage"); SubscriptionModel wrappedSubscriptionModel = new DurableSubscriptionModel(new NativeMongoSubscriptionModel(mongoDatabase, "events", TimeRepresentation.DATE), positionStorage); // Create the CompetingConsumerSubscriptionModel NativeMongoLeaseCompetingConsumerStrategy competingConsumerStrategy = NativeMongoLeaseCompetingConsumerStrategy.withDefaults(mongoDatabase); CompetingConsumerSubscriptionModel competingConsumerSubscriptionModel = new CompetingConsumerSubscriptionModel(wrappedSubscriptionModel, competingConsumerStrategy); // Now subscribe! competingConsumerSubscriptionModel.subscribe("subscriptionId", type("SomeEvent"));
If the above code is executed on multiple nodes/processes, then only one subscriber will receive events.
-
Only log with "warn" when subscription is restarted due to "ChangeStreamHistoryLost".
-
InMemoryEventStore
now sorts queries by insertion order by default (before "time" was used) -
Added a new default compound index to MongoDB event stores,
{ streamid : 1, streamversion : 1}
. The reason for this is to get the events back in order when reading a stream from the event store and to make this efficient. Previous$natural
order was used but this would skip the index, making reads slower if you have lots of data. -
Removed the index,
{ streamid : 1, streamversion : -1 }
, from all MongoDB EventStore's. It's no longer needed now that we have{ streamid : 1, streamversion : 1}
. -
All MongoDB EventStore's now loads the events for a stream by leveraging the new
{ streamid : 1, streamversion : 1}
index. -
CatchupSubscriptionModel
now sorts by time and then by stream version to allow for a consistent read order (see MongoDB documentation). Note that the above is only true if you supply aTimeBasedSubscriptionPosition
that is not equal to ``TimeBasedSubscriptionPosition.beginningOfTime()` (which is default if no filter is supplied). -
Major change in how you can sort the result from queries. Before you only had four options, "natural" (ascending/descending) and "time" (ascending/descending), now you can specify any support CloudEvent field. This means that e.g.
SortBy.TIME_ASC
has been removed. It has been replaced with theSortBy
API (org.occurrent.eventstore.api.SortBy
), that allows you to do e.g.SortBy.time(ASCENDING)
Sorting can now be composed, e.g.
SortBy.time(ASCENDING).thenNatural(DESCENDING)
This has been implemented for all event stores.
-
It's now possible to change how
CatchupSubscriptionModel
sorts events read from the event store during catch-up phase. For example:var subscriptionModel = ... var eventStore = .. var cfg = new CatchupSubscriptionModelConfig(100).catchupPhaseSortBy(SortBy.descending(TIME)); var catchupSubscriptionModel = CatchupSubscriptionModel(subscriptionModel, eventStore, cfg);
By default, events are sorted by time and then stream version (if two or more events have the same time).
-
Added better logging to
SpringMongoSubscriptionModel
, it'll now include the subscription id if an error occurs. -
If there's not enough history available in the mongodb oplog to resume a subscription created from a
SpringMongoSubscriptionModel
, this subscription model now supports restarting the subscription from the current time automatically. This is only of concern when an application is restarted, and the subscriptions are configured to start from a position in the oplog that is no longer available. It's disabled by default since it might not be 100% safe (meaning that you can miss some events when the subscription is restarted). It's not 100% safe if you run subscriptions in a different process than the event store and you have lot's of writes happening to the event store. It's safe if you run the subscription in the same process as the writes to the event store if you make sure that the subscription is started before you accept writes to the event store on startup. To enable automatic restart, you can do like this:var subscriptionModel = new SpringMongoSubscriptionModel(mongoTemplate, SpringSubscriptionModelConfig.withConfig("events", TimeRepresentation.RFC_3339_STRING).restartSubscriptionsOnChangeStreamHistoryLost(true));
An alternative approach to restarting automatically is to use a catch-up subscription and restart the subscription from an earlier date.
-
Better shutdown handling of all executor services used by subscription models.
-
Don't log to error when a
SpringMongoSubscriptionModel
subscription is paused right after it was created, leading to a race condition. This is not an error. It's now logged in "debug" mode instead.
- Removed the automatic creation of the "streamid" index in all MongoDB event stores. The reason is that it's not needed since there's another (compound) index (streamid+version) and queries for "streamid" will be covered by that index.
- When running MongoDB subscriptions on services like Atlas, it's not possible to get the current time (global subscription position) when starting a new subscription since access is denied.
If this happens then the subscription will start at the "current time" instead (
StartAt.now()
). There's a catch however! If processing the very first event fails and the application is restarted, then the event cannot be retried. If this is major concern, consider upgrading your MongoDB server to a non-shared environment.
- Removed
org.occurrent:eventstore-inmemory
as dependency toorg.occurrent:application-service-blocking
(it should have been a test dependency) - Including a "details" message in
DuplicateCloudEventException
that adds more details on why this happens (which index etc). This is especially useful if you're creating custom, unique, indexes over the events and a write fail due to a duplicate cloud event. - Upgraded to Kotlin 1.3.40
- Upgraded project-reactor to 3.4.2 (previously 3.4.0 was used)
- When running MongoDB subscriptions on services like Atlas, it's not possible to get the current time (global subscription position) when starting a new subscription since access is denied. If this happens then the local time of the client is used instead.
-
Introduced many more life-cycle methods to blocking subscription models. It's now possible to pause/resume individual subscriptions as well as starting/stopping all subscriptions. This is useful for testing purposes when you want to write events to the event store without triggering all subscriptions. The subscription models that supports this implements the new
org.occurrent.subscription.api.blocking.SubscriptionModelLifeCycle
interface. Supported subscription models are:InMemorySubscriptionModel
,NativeMongoSubscriptionModel
andSpringMongoSubscriptionModel
. -
The
SpringMongoSubscriptionModel
now implementsorg.springframework.context.SmartLifecycle
, which means that if you define it as a bean, it allows controlling it as a regular Spring life-cycle bean. -
Introduced the
org.occurrent.subscription.api.blocking.DelegatingSubscriptionModel
interface. Subscription models that wraps other subscription models and delegates subscriptions to them implements this interface. It contains methods for getting the wrapped subscription model. This is useful for testing purposes, if the underlying subscription model needs to stopped/started etc. -
Fixed a bug with command composition that accidentally included the "previous events" when invoking the generated composition function.
-
Added more command composition extension functions for Kotlin. You can now compose lists of functions and not only sequences.
-
The
SpringMongoSubscriptionModel
now evaluates the "start at" supplier passed to thesubscribe
method each time a subscription is resumed. -
Fixed a bug in
InMemorySubscription
where thewaitUntilStarted(Duration)
method always returnedfalse
. -
InMemorySubscription
now really waits for the subscription to start when callingwaitUntilStarted(Duration)
andwaitUntilStarted
. -
Moved the
cancelSubscription
method from theorg.occurrent.subscription.api.blocking.SubscriptionModel
to theorg.occurrent.subscription.api.blocking.SubscriptionModelCancelSubscription
interface. This interface is also extended byorg.occurrent.subscription.api.blocking.SubscriptionModelLifeCycle
. -
Introduced a much improved
RetryStrategy
. You can now configure max attempts, a retry predicate, error listener as well as the backoff strategy. Retry is provided in its own module,org.occurrent:retry
, but many modules already depend on this module transitively. Here's an example:RetryStrategy retryStrategy = RetryStrategy.exponentialBackoff(Duration.ofMillis(50), Duration.ofMillis(200), 2.0) .retryIf(throwable -> throwable instanceof OptimisticLockingException) .maxAttempts(5) .onError((info, throwable) -> log.warn("Caught exception {}, will retry in {} millis")), throwable.class.getSimpleName(), info.getDuration().toMillis())); retryStrategy.execute(Something::somethingThing);
RetryStrategy
is immutable, which means that you can safely do things like this:RetryStrategy retryStrategy = RetryStrategy.retry().fixed(200).maxAttempts(5); // Uses default 200 ms fixed delay retryStrategy.execute(() -> Something.something()); // Use 600 ms fixed delay retryStrategy.backoff(fixed(600)).execute(() -> SomethingElse.somethingElse()); // 200 ms fixed delay again retryStrategy.execute(() -> Thing.thing());
-
Renamed method
shutdownSubscribers
inDurableSubscriptonModel
toshutdown
. -
Added default subscription name to subscription DSL. You can now do:
subscriptions(subscriptionModel) { subscribe<NameDefined> { e -> log.info("Hello ${e.name}") } }
The id of the subscription will be "NameDefine" (the unqualified name of the
NameDefined
class). -
Added
exists
method toEventStoreQueries
API (both blocking and reactive). This means that you can easily check if events exists, for example:val doesSomeTypeExists = eventStoreQueries.exists(type("sometype"))
-
Added retry strategy support to SpringMongoSubscriptionPositionStorage. You can define your own by passing an instance of
RetryStrategy
to the constructor. By default it'll add a retry strategy with exponential backoff starting with 100 ms and progressively go up to max 2 seconds wait time between each retry when reading/saving/deleting the subscription position. -
Added retry strategy support to NativeMongoSubscriptionPositionStorage. You can define your own by passing an instance of
RetryStrategy
to the constructor. By default it'll add a retry strategy with exponential backoff starting with 100 ms and progressively go up to max 2 seconds wait time between each retry when reading/saving/deleting the subscription position. -
Added retry strategy support to SpringRedisSubscriptionPositionStorage. You can define your own by passing an instance of
RetryStrategy
to the constructor. By default it'll add a retry strategy with exponential backoff starting with 100 ms and progressively go up to max 2 seconds wait time between each retry when reading/saving/deleting the subscription position. -
Added retry strategy support to SpringMongoSubscriptionModel. You can define your own by passing an instance of
RetryStrategy
to the constructor. By default it'll add a retry strategy with exponential backoff starting with 100 ms and progressively go up to max 2 seconds wait time between each retry when exceptions are thrown from theaction
callback (the callback that you implement to handle aCloudEvent
instance from a subscription). -
All blocking subscription models will throw an
IllegalArgumentException
if a subscription is registered more than once.
- Renamed
org.occurrent.subscription.redis.spring.blocking.SpringSubscriptionPositionStorageForRedis
toSpringRedisSubscriptionPositionStorage
. - Renamed
org.occurrent.subscription.mongodb.spring.reactor.ReactorMongoSubscription
toReactorMongoSubscriptionModel
.
-
Renamed
org.occurrent.subscription.api.blocking.BlockingSubscription
toorg.occurrent.subscription.api.blocking.SubscriptionModel
. The reason for this is that it was previously very confusing to differentiate between aorg.occurrent.subscription.api.blocking.BlockingSubscription
(where you start/cancel subscriptions) and aorg.occurrent.subscription.api.blocking.Subscription
(the actual subscription instance). The same thinking has been applied to the reactor counterparts as well (org.occurrent.subscription.api.reactor.ReactorSubscription
has now been renamed toorg.occurrent.subscription.api.reactor.SubscriptionModel
). -
Derivatives of
org.occurrent.subscription.api.blocking.BlockingSubscription
such asPositionAwareBlockingSubscription
has been renamed toorg.occurrent.subscription.api.blockking.PositionAwareSubscriptionModel
. -
Derivatives of the reactor counterpart,
org.occurrent.subscription.api.reactor.PositionAwareReactorSubscription
has been renamedto
, such as has been renamed toorg.occurrent.subscription.api.reactor.PositionAwareSubscriptionModel
. -
org.occurrent.subscription.util.blocking.catchup.subscription.CatchupSubscriptionModelConfig
has been renamed toorg.occurrent.subscription.blocking.catchup.CatchupSubscriptionModelConfig
. -
org.occurrent.subscription.util.blocking.catchup.subscription.CatchupSubscriptionModel
has been renamed toorg.occurrent.subscription.blocking.catchup.CatchupSubscriptionModel
. -
org.occurrent.subscription.util.blocking.AutoPersistingSubscriptionModelConfig
has been renamed toorg.occurrent.subscription.blocking.durable.DurableSubscriptionModelConfig
. -
org.occurrent.subscription.util.blocking.BlockingSubscriptionWithAutomaticPositionPersistence
has been renamed toorg.occurrent.subscription.blocking.durable.DurableSubscriptionModel
. -
org.occurrent.subscription.mongodb.nativedriver.blocking.BlockingSubscriptionForMongoDB
has been renamed toNativeMongoSubscriptionModel
. -
org.occurrent.subscription.mongodb.nativedriver.blocking.BlockingSubscriptionPositionStorageForMongoDB
has been renamed toNativeMongoSubscriptionPositionStorage
. -
Removed
org.occurrent.subscription.mongodb.nativedriver.blocking.BlockingSubscriptionWithPositionPersistenceInMongoDB
. Use anorg.occurrent.subscription.blocking.DurableSubscriptionModel
from moduleorg.occurrent:durable-subscription
instead. -
org.occurrent.subscription.mongodb.spring.blocking.MongoDBSpringSubscription
has been renamed toSpringMongoSubscription
. -
org.occurrent.subscription.mongodb.spring.blocking.SpringBlockingSubscriptionForMongoDB
has been renamed toSpringMongoSubscription
. -
org.occurrent.subscription.mongodb.spring.blocking.SpringMongoDBSubscriptionPositionStorage
has been renamed toSpringMongoSubscriptionPositionStorage
. -
org.occurrent.subscription.mongodb.spring.reactor.SpringReactorSubscriptionForMongoDB
has been renamed toReactorMongoSubscription
. -
org.occurrent.subscription.mongodb.spring.reactor.SpringReactorSubscriptionPositionStorageForMongoDB
has been renamed toReactorSubscriptionPositionStorage
. -
org.occurrent.subscription.util.reactor.ReactorSubscriptionWithAutomaticPositionPersistence
has been renamed toorg.occurrent.subscription.reactor.durable.ReactorDurableSubscriptionModel
. -
org.occurrent.subscription.util.reactor.ReactorSubscriptionWithAutomaticPositionPersistenceConfig
has been renamed toorg.occurrent.subscription.reactor.durable.ReactorDurableSubscriptionConfig
. -
org.occurrent.eventstore.mongodb.spring.reactor.SpringReactorMongoEventStore
has been renamed toReactorMongoEventStore
since "Spring" is implicit. -
org.occurrent.subscription.mongodb.MongoDBFilterSpecification
has been renamed toMongoFilterSpecification
. -
org.occurrent.subscription.mongodb.MongoDBFilterSpecification.JsonMongoDBFilterSpecification
has been renamed toMongoJsonFilterSpecification
. -
org.occurrent.subscription.mongodb.MongoDBFilterSpecification.BsonMongoDBFilterSpecification
has been renamed toMongoBsonFilterSpecification
. -
org.occurrent.subscription.mongodb.internal.MongoDBCloudEventsToJsonDeserializer
has been renamed toMongoCloudEventsToJsonDeserializer
. -
org.occurrent.subscription.mongodb.internal.MongoDBCommons
has been renamed toMongoCommons
. -
org.occurrent.subscription.mongodb.MongoDBOperationTimeBasedSubscriptionPosition
has been renamed toMongoOperationTimeSubscriptionPosition
. -
org.occurrent.subscription.mongodb.MongoDBResumeTokenBasedSubscriptionPosition
has been renamed toMongoResumeTokenSubscriptionPosition
. -
org.occurrent.eventstore.mongodb.internal.OccurrentCloudEventMongoDBDocumentMapper
has been renamed toOccurrentCloudEventMongoDocumentMapper
. -
org.occurrent.eventstore.mongodb.spring.blocking.SpringBlockingMongoEventStore
has been renamed toSpringMongoEventStore
. -
Renamed module
org.occurrent:subscription-util-blocking-catchup-subscription
toorg.occurrent:catchup-subscription
. -
Renamed module
org.occurrent:subscription-util-blocking-automatic-position-persistence
toorg.occurrent:durable-subscription
. -
Renamed module
org.occurrent:subscription-util-reactor-automatic-position-persistence
toorg.occurrent:reactor-durable-subscription
. -
Moved
org.occurrent.application.converter.implementation.GenericCloudEventConverter
toorg.occurrent.application.converter.generic.GenericCloudEventConverter
. -
Moved
org.occurrent.application.service.blocking.implementation.GenericApplicationService
toorg.occurrent.application.service.blocking.generic.GenericApplicationService
. -
Added a new "Subscription DSL" module that adds a domain event specific abstraction on-top of the existing subscription model api's. This DSL makes it easier to create subscriptions that are using domain events instead of cloud events. The module is called
org.occurrent:subscription-dsl
. For example:val subscriptionModel = SpringMongoSubscriptionModel(..) val cloudEventConverter = GenericCloudEventConverter<DomainEvent>(..) // Subscription DSL subscriptions(subscriptionModel, cloudEventConverter) { subscribe<GameStarted>("id1") { gameStarted -> log.info("Game was started $gameStarted") } subscribe<GameWon, GameLost>("id2") { domainEvent -> log.info("Game was either won or lost: $domainEvent") } subscribe("everything") { domainEvent -> log.info("I subscribe to every event: $domainEvent") } }
-
Implemented ability to delete cloud events by a filter in the in-memory event store.
-
Added "listener" support to the in-memory event store. This means that you can supply a "listener" (a consumer) to the
InMemoryEventStore
constructor that will be invoked (synchronously) after new events have been written. This is mainly useful to allow in-memory subscription models. -
Added an in-memory subscription model that can be used to subscribe to events from the in-memory event store. Add module
org.occurrent:subscription-inmemory
and then instantiate it using:InMemorySubscriptionModel inMemorySubscriptionModel = new InMemorySubscriptionModel(); InMemoryEventStore inMemoryEventStore = new InMemoryEventStore(inMemorySubscriptionModel); inMemorySubscriptionModel.subscribe("subscription1", System.out::println);
-
Renamed groupId
org.occurrent.inmemory
toorg.occurrent
for consistency. This means that you should depend on moduleorg.occurrent:eventstore-inmemory
instead oforg.occurrent.inmemory:eventstore-inmemory
when using the in-memory event store. -
Added support for querying the in-memory event store (all fields expect the "data" field works)
-
Changed from
Executor
toExecutorService
inNativeMongoSubscriptionModel
in theorg.occurrent:subscription-mongodb-native-blocking
module. -
Added a
@PreDestroy
annotation to theshutdown
method in theNativeMongoSubscriptionModel
implementation so that, if you're frameworks such as Spring Boot, you don't need to explicitly call theshutdown
method when stopping. -
Added partial extension functions for
List<DomainEvent>
, import from thepartial
method fromorg.occurrent.application.composition.command
.
- Upgraded to Kotlin 1.4.21
- Upgraded to cloud events 2.0.0.RC2
-
Upgraded to Kotlin 1.4.20
-
Upgraded to cloud events 2.0.0.RC1
-
Breaking change! The attributes added by the Occurrent cloud event extension has been renamed from "streamId" and "streamVersion" to "streamid" and "streamversion" to comply with the specification.
-
Added optimized support for
io.cloudevents.core.data.PojoCloudEventData
. Occurrent can convertPojoCloudEventData
that containsMap<String, Object>
andString
efficiently. -
Breaking change! Removed
org.occurrent.eventstore.mongodb.cloudevent.DocumentCloudEventData
since it's no longer needed after the CloudEvent SDK has introducedPojoCloudEventData
. UsePojoCloudEventData
and pass the document or preferably, map, to it. -
Removed the
org.occurrent:application-service-blocking-kotlin
module, useorg.occurrent:application-service-blocking
instead. The Kotlin extension functions are provided with that module instead. -
Added partial function application support for Kotlin. Depend on module
org.occurrent:command-composition
and import extension functions fromorg.occurrent.application.composition.command.partial
. This means that instead of doing:val playerId = ... applicationService.execute(gameId) { events -> Uno.play(events, Timestamp.now(), playerId, DigitCard(Three, Blue)) }
you can do:
val playerId = ... applicationService.execute(gameId, Uno::play.partial(Timestamp.now(), playerId, DigitCard(Three, Blue)))
-
Added command composition support for Kotlin. Depend on module
org.occurrent:command-composition
and import extension functions fromorg.occurrent.application.composition.command.*
. This means that you can compose two functions like this using theandThen
(infix) function:val numberOfPlayers = 4 val timestamp = Timestamp.now() applicationService.execute(gameId, Uno::start.partial(gameId, timestamp, numberOfPlayers) andThen Uno::play.partial(timestamp, player1, DigitCard(Three, Blue)))
In the example above,
start
andplay
will be composed together into a single "command" that will be executed atomically.If you have more than two commands, it could be easier to use the
composeCommand
function instead of repeatingandThen
:val numberOfPlayers = 4 val timestamp = Timestamp.now() applicationService.execute(gameId, composeCommands( Uno::start.partial(gameId, timestamp, numberOfPlayers), Uno::play.partial(timestamp, player1, DigitCard(Three, Blue)), Uno::play.partial(timestamp, player2, DigitCard(Four, Blue)) ) )
-
Added Kotlin extension functions to the blocking event store. They make it easier to write, read and query the event store with Kotlin
Sequence
's. Import extension functions from packageorg.occurrent.eventstore.api.blocking
. -
Added support for deleting events from event store using a
org.occurrent.filter.Filter
. For example:eventStoreOperations.delete(streamId("myStream").and(streamVersion(lte(19L)));
This will delete all events in stream "myStream" that has a version less than or equal to 19. This is useful if you implement "closing the books" or certain types of snapshots, and don't need the old events anymore. This has been implemented for all MongoDB event stores (both blocking and reactive) but not for the InMemory event store.
-
Upgraded Java Mongo driver from 4.0.4 to 4.1.1
-
Upgraded to cloud events 2.0.0-milestone4. This introduces a breaking change since the
CloudEvent
SDK no longer returns abyte[]
as data but rather aCloudEventData
interface. You need to change your code from:byte[] data = cloudEvent.getData();
to
byte[] data = cloudEvent.getData().toBytes();
-
Fixed so that not only JSON data can be used as cloud event data. Now the content-type of the event is taken into consideration, and you can store any kind of data.
-
Introduced
org.occurrent.eventstore.mongodb.cloudevent.DocumentCloudEventData
, cloud event data will be represented in this format with loading events from an event store. This means that you could check if theCloudEventData
returned bycloudEvent.getData()
is instance ofDocumentCloudEventData
and if so extract the underlyingorg.bson.Document
that represent the data in the database. -
Occurrent no longer needs to perform double encoding of the cloud event data if content type is json. Instead of serializing the content manually to a
byte[]
you can use either the built-inJsonCloudEventData
class from thecloudevents-json-jackson
module, or use theDocumentCloudEventData
provided by Occurrent to avoid this. -
Upgrading to spring-data-mongodb 3.1.1
-
Upgrading to reactor 3.4.0
-
The MongoDB event stores no longer needs to depend on the
cloudevents-json-jackson
module since Occurrent now ships with a custom event reader/writer. -
The MongoDB event subscriptions no longer needs to depend on the
cloudevents-json-jackson
module since Occurrent now ships with a custom event reader/writer.
- Fixed typo in
CatchupSupportingBlockingSubscriptionConfig
, renamed methoddontSubscriptionPositionStorage
todontUseSubscriptionPositionStorage
. - Added
getSubscriptionPosition()
toPositionAwareCloudEvent
that returnsOptional<SubscriptionPosition>
. - Removed duplicate
GenericCloudEventConverter
located in theorg.occurrent.application.service.blocking.implementation
package. Useorg.occurrent.application.converter.implementation.CloudEventConverter
instead. - Handling if the domain model returns a null
Stream<DomainEvent>
in theGenericApplicationService
.
- Renamed method
CloudEventWithSubscriptionPosition.getStreamPosition()
toCloudEventWithSubscriptionPosition.getSubscriptionPosition()
since this was a typo. - Added ability to provide a list of conditions when composing them with
and
andor
. - Added special convenience (overloaded) method for creating "or" with "equal to" conditions. For example you can now do:
filter(type(or("x", "y"))
. Before you had to do:filter(type(or(eq("x"), eq("y")))
. - MongoDB event streams are now explicitly sorted by natural order by default. The reason for this is that just relying on default "sorting" on read lead to wrong order on certain occasions.
- Writing an empty stream to a mongodb-based event store will just ignore the stream and not try to persist the empty stream to the datastore.
- Upgraded to cloudevents sdk 2.0.0-milestone3
- Non-backward compatible change:
CatchupSupportingBlockingSubscription
no longer requires a subscription position storage during the catch-up phase. Instead, you pass the storage implementation toCatchupSupportingBlockingSubscriptionConfig
along with the position persistence predicate. BlockingSubscriptionWithAutomaticPositionPersistence
now implements thePositionAwareBlockingSubscription
interface- Removed the generic type T from the
org.occurrent.subscription.api.blocking.SubscriptionModel
andorg.occurrent.subscription.api.reactor.SubscriptionModel
. The reason for this was the implementation returning different kinds of CloudEvent implementations where not compatible. For example if you created a Spring Bean with aT
ofCloudEventWithSubscriptionPosition
then such a subscription couldn't be assigned to a field expecting a subscription with justCloudEvent
. To avoid having users to know which cloud event implementation to expect, we change the API so that it always deals with pureCloudEvent
's. Implementors now have to useorg.occurrent.subscription.PositionAwareCloudEvent.getSubscriptionPositionOrThrowIAE(cloudEvent)
to get the position. It's also possible to check if aCloudEvent
contains a subscription position by callingorg.occurrent.subscription.PositionAwareCloudEvent.hasSubscriptionPosition(cloudEvent)
. - Fixed several corner-cases for the
CatchupSupportingBlockingSubscription
, it should now be safer to use and produce fewer duplicates when switching from catch-up to continuous subscription mode. - Added "exists" method to the
BlockingSubscriptionPositionStorage
interface (and implemented for all implementations of this interface). - The global position of
PositionAwareBlockingSubscription
for MongoDB increases the "increment" of the currentBsonTimestamp
by 1 in order to avoid duplicate potential duplication of events during replay. - Added a generic application service implementation (and interfaces). You don't have to use it, it's ok to simply cut and paste and make custom changes. You
can also write your own class. The implementation,
org.occurrent.application.service.blocking.implementation.GenericApplicationService
, quite simplistic but should cover most of the basic use cases. The application service uses aorg.occurrent.application.converter.CloudEventConverter
to convert to and from cloud events and your custom domain events. This is why bothCloudEventConverter
andApplicationService
takes a generic type parameter,T
, which is the type of your custom domain event. Note that the application service is not yet implemented for the reactive event store. The application service also contains a way to execute side-effects after the events are written to the event store. This is useful for executing synchronous policies after the events are written to the event store. If policies write the the same database as your event store, you start a transaction and write both policies and events in the same transaction!
There are also Kotlin extension functions for the application service and policies in theorg.occurrent:application-service-blocking
module. - Added utilities,
org.occurrent:command-composition
for to easier do command composition when calling an application service. This module also contains utilities for doing partial application of functions which can be useful when composing functions.
- Catchup subscriptions (blocking)
- EveryN for stream persistence (both blocking and reactive)
- Added "count" to EventStoreQueries (both blocking and reactive)
- Added ability to query for "data" attribute in EventStoreQueries and subscriptions