This library(Scala wrapper for j5ik2o/event-store-adapter-java) is designed to turn DynamoDB into an Event Store for CQRS/Event Sourcing.
Add the following to your build.sbt
(2.13.x, 3.0.x):
val version = "..."
libraryDependencies += Seq(
"com.github.j5ik2o" %% "event-store-adapter-scala" % version,
)
You can easily implement an Event Sourcing-enabled repository using EventStore.
class UserAccountRepositoryAsync(
eventStoreAsync: EventStoreAsync[UserAccountId, UserAccount, UserAccountEvent]
) {
def store(userAccountEvent: UserAccountEvent, version: Long)
(implicit ec: ExecutionContext): Future[Unit] =
eventStoreAsync.persistEvent(userAccountEvent, version)
def store(userAccountEvent: UserAccountEvent, userAccount: UserAccount)
(implicit ec: ExecutionContext): Future[Unit] =
eventStoreAsync.persistEventAndSnapshot(userAccountEvent, userAccount)
def findById(id: UserAccountId)
(implicit ec: ExecutionContext): Future[Option[UserAccount]] = {
eventStoreAsync.getLatestSnapshotById(classOf[UserAccount], id).flatMap {
case Some((userAccount, version)) =>
eventStoreAsync
.getEventsByIdSinceSequenceNumber(
classOf[UserAccountEvent], id, userAccount.sequenceNumber + 1).map { events =>
Some(UserAccount.replay(events, userAccount, version))
}
case None =>
Future.successful(None)
}
}
}
The following is an example of the repository usage.
val eventStore = EventStoreAsync.ofDynamoDB[UserAccountId, UserAccount, UserAccountEvent](
dynamodbClient,
journalTableName,
snapshotTableName,
journalAidIndexName,
snapshotAidIndexName,
32
)
val repository = new UserAccountRepositoryAsync(eventStore)
val id = UserAccountId(UUID.randomUUID().toString)
val (aggregate, event) = UserAccount.create(id, "test-1")
val result = for {
_ <- repository.store(event, aggregate)
aggregate <- repository.findById(id)
} yield aggregate
MIT License. See LICENSE for details.