Split EventSourcing
This commit is contained in:
parent
49d6b6c8cc
commit
4bd2a1518b
6 changed files with 94 additions and 78 deletions
7
README.md
Normal file
7
README.md
Normal file
|
|
@ -0,0 +1,7 @@
|
||||||
|
# Scala 3 - ZIO
|
||||||
|
|
||||||
|
## Run
|
||||||
|
```bash
|
||||||
|
mill api.run
|
||||||
|
```
|
||||||
|
|
||||||
59
core/src/lu/foyer/CommandEngine.scala
Normal file
59
core/src/lu/foyer/CommandEngine.scala
Normal file
|
|
@ -0,0 +1,59 @@
|
||||||
|
package lu.foyer
|
||||||
|
|
||||||
|
import zio.*
|
||||||
|
|
||||||
|
import lu.foyer.auth.UserInfo
|
||||||
|
|
||||||
|
class CommandEngine[Command, Event, State](
|
||||||
|
val handlers: List[CommandHandler[Command, Event, State]],
|
||||||
|
reducer: Reducer[Event, State],
|
||||||
|
val eventRepo: EventRepository[Event],
|
||||||
|
val stateRepo: StateRepository[State]):
|
||||||
|
|
||||||
|
def handleCommand(command: Command, name: String, entityId: String)
|
||||||
|
: ZIO[UserInfo, JsonApiError | Throwable, (lu.foyer.Event[Event], lu.foyer.Entity[State])] =
|
||||||
|
for
|
||||||
|
handler <- ZIO
|
||||||
|
.succeed(handlers.find(_.name == name))
|
||||||
|
.someOrFail(new IllegalArgumentException(s"No handler found for command $name"))
|
||||||
|
entityOption <- stateRepo.fetchOne(entityId)
|
||||||
|
(event, newStateOption) <- transition(command, name, entityId, entityOption, handler)
|
||||||
|
newState <-
|
||||||
|
ZIO
|
||||||
|
.succeed(newStateOption)
|
||||||
|
.someOrFail(new IllegalArgumentException("Reducer cannot resolve state transition"))
|
||||||
|
newEntity = Entity(entityId, newState, entityOption.map(_.version).getOrElse(1))
|
||||||
|
_ <-
|
||||||
|
if entityOption.isEmpty then stateRepo.insert(newEntity.entityId, newEntity)
|
||||||
|
else stateRepo.update(newEntity.entityId, newEntity)
|
||||||
|
eventEntity <- Random.nextUUID.map(id => Event(newEntity.entityId, event, id.toString))
|
||||||
|
_ <- eventRepo.insert(eventEntity.eventId, eventEntity)
|
||||||
|
yield (eventEntity, newEntity)
|
||||||
|
|
||||||
|
private def transition(
|
||||||
|
command: Command,
|
||||||
|
name: String,
|
||||||
|
entityId: String,
|
||||||
|
entityOption: Option[Entity[State]],
|
||||||
|
handler: CommandHandler[Command, Event, State]
|
||||||
|
): ZIO[UserInfo, JsonApiError | Throwable, (Event, Option[State])] = (entityOption, handler) match
|
||||||
|
case (None, h) if !h.isUpdate =>
|
||||||
|
h.asInstanceOf[CommandHandlerCreate[Command, Event]]
|
||||||
|
.onCommand(entityId, command)
|
||||||
|
.map(event => (event, reducer.reduce(event)))
|
||||||
|
case (Some(entity), h) if h.isUpdate =>
|
||||||
|
h.asInstanceOf[CommandHandlerUpdate[Command, Event, State]]
|
||||||
|
.onCommand(entityId, entity.data, command)
|
||||||
|
.map(event => (event, reducer.reduce(entity.data, event)))
|
||||||
|
case (Some(_), h) if !h.isUpdate =>
|
||||||
|
ZIO.fail(
|
||||||
|
new IllegalArgumentException(s"State already exists when applying create command $name")
|
||||||
|
)
|
||||||
|
case (None, h) if h.isUpdate =>
|
||||||
|
ZIO.fail(new IllegalArgumentException(s"No state found to apply the update command $name"))
|
||||||
|
case _ => ZIO.fail(new IllegalArgumentException("Impossible state"))
|
||||||
|
end CommandEngine
|
||||||
|
|
||||||
|
object CommandEngine:
|
||||||
|
def live[Command: Tag, Event: Tag, State: Tag] =
|
||||||
|
ZLayer.fromFunction(CommandEngine[Command, Event, State](_, _, _, _))
|
||||||
24
core/src/lu/foyer/CommandHandler.scala
Normal file
24
core/src/lu/foyer/CommandHandler.scala
Normal file
|
|
@ -0,0 +1,24 @@
|
||||||
|
package lu.foyer
|
||||||
|
|
||||||
|
import zio.*
|
||||||
|
import zio.schema.Schema
|
||||||
|
|
||||||
|
import lu.foyer.auth.UserInfo
|
||||||
|
|
||||||
|
trait CommandHandler[+Command, +Event, +State]:
|
||||||
|
def name: String
|
||||||
|
def isCreate: Boolean
|
||||||
|
inline def isUpdate: Boolean = !isCreate
|
||||||
|
def commandSchema: Schema[?]
|
||||||
|
|
||||||
|
trait CommandHandlerCreate[Command: Schema, Event] extends CommandHandler[Command, Event, Nothing]:
|
||||||
|
def onCommand(entityId: String, command: Command): ZIO[UserInfo, JsonApiError | Throwable, Event]
|
||||||
|
val isCreate = true
|
||||||
|
val commandSchema = summon[Schema[Command]]
|
||||||
|
|
||||||
|
trait CommandHandlerUpdate[Command: Schema, Event, State]
|
||||||
|
extends CommandHandler[Command, Event, State]:
|
||||||
|
def onCommand(entityId: String, state: State, command: Command)
|
||||||
|
: ZIO[UserInfo, JsonApiError | Throwable, Event]
|
||||||
|
val isCreate = false
|
||||||
|
val commandSchema = summon[Schema[Command]]
|
||||||
|
|
@ -22,75 +22,3 @@ trait Reducer[Event, State]:
|
||||||
|
|
||||||
def reduce(state: State, event: Event): Option[State] =
|
def reduce(state: State, event: Event): Option[State] =
|
||||||
fromState.lift((state, event))
|
fromState.lift((state, event))
|
||||||
|
|
||||||
trait CommandHandler[+Command, +Event, +State]:
|
|
||||||
def name: String
|
|
||||||
def isCreate: Boolean
|
|
||||||
inline def isUpdate: Boolean = !isCreate
|
|
||||||
def commandSchema: Schema[?]
|
|
||||||
|
|
||||||
trait CommandHandlerCreate[Command: Schema, Event] extends CommandHandler[Command, Event, Nothing]:
|
|
||||||
def onCommand(entityId: String, command: Command): ZIO[UserInfo, JsonApiError | Throwable, Event]
|
|
||||||
val isCreate = true
|
|
||||||
val commandSchema = summon[Schema[Command]]
|
|
||||||
|
|
||||||
trait CommandHandlerUpdate[Command: Schema, Event, State]
|
|
||||||
extends CommandHandler[Command, Event, State]:
|
|
||||||
def onCommand(entityId: String, state: State, command: Command)
|
|
||||||
: ZIO[UserInfo, JsonApiError | Throwable, Event]
|
|
||||||
val isCreate = false
|
|
||||||
val commandSchema = summon[Schema[Command]]
|
|
||||||
|
|
||||||
class CommandEngine[Command, Event, State](
|
|
||||||
val handlers: List[CommandHandler[Command, Event, State]],
|
|
||||||
reducer: Reducer[Event, State],
|
|
||||||
val eventRepo: EventRepository[Event],
|
|
||||||
val stateRepo: StateRepository[State]):
|
|
||||||
|
|
||||||
def handleCommand(command: Command, name: String, entityId: String)
|
|
||||||
: ZIO[UserInfo, JsonApiError | Throwable, (lu.foyer.Event[Event], lu.foyer.Entity[State])] =
|
|
||||||
for
|
|
||||||
handler <- ZIO
|
|
||||||
.succeed(handlers.find(_.name == name))
|
|
||||||
.someOrFail(new IllegalArgumentException(s"No handler found for command $name"))
|
|
||||||
entityOption <- stateRepo.fetchOne(entityId)
|
|
||||||
(event, newStateOption) <- transition(command, name, entityId, entityOption, handler)
|
|
||||||
newState <-
|
|
||||||
ZIO
|
|
||||||
.succeed(newStateOption)
|
|
||||||
.someOrFail(new IllegalArgumentException("Reducer cannot resolve state transition"))
|
|
||||||
newEntity = Entity(entityId, newState, entityOption.map(_.version).getOrElse(1))
|
|
||||||
_ <-
|
|
||||||
if entityOption.isEmpty then stateRepo.insert(newEntity.entityId, newEntity)
|
|
||||||
else stateRepo.update(newEntity.entityId, newEntity)
|
|
||||||
eventEntity <- Random.nextUUID.map(id => Event(newEntity.entityId, event, id.toString))
|
|
||||||
_ <- eventRepo.insert(eventEntity.eventId, eventEntity)
|
|
||||||
yield (eventEntity, newEntity)
|
|
||||||
|
|
||||||
private def transition(
|
|
||||||
command: Command,
|
|
||||||
name: String,
|
|
||||||
entityId: String,
|
|
||||||
entityOption: Option[Entity[State]],
|
|
||||||
handler: CommandHandler[Command, Event, State]
|
|
||||||
): ZIO[UserInfo, JsonApiError | Throwable, (Event, Option[State])] = (entityOption, handler) match
|
|
||||||
case (None, h) if !h.isUpdate =>
|
|
||||||
h.asInstanceOf[CommandHandlerCreate[Command, Event]]
|
|
||||||
.onCommand(entityId, command)
|
|
||||||
.map(event => (event, reducer.reduce(event)))
|
|
||||||
case (Some(entity), h) if h.isUpdate =>
|
|
||||||
h.asInstanceOf[CommandHandlerUpdate[Command, Event, State]]
|
|
||||||
.onCommand(entityId, entity.data, command)
|
|
||||||
.map(event => (event, reducer.reduce(entity.data, event)))
|
|
||||||
case (Some(_), h) if !h.isUpdate =>
|
|
||||||
ZIO.fail(
|
|
||||||
new IllegalArgumentException(s"State already exists when applying create command $name")
|
|
||||||
)
|
|
||||||
case (None, h) if h.isUpdate =>
|
|
||||||
ZIO.fail(new IllegalArgumentException(s"No state found to apply the update command $name"))
|
|
||||||
case _ => ZIO.fail(new IllegalArgumentException("Impossible state"))
|
|
||||||
end CommandEngine
|
|
||||||
|
|
||||||
object CommandEngine:
|
|
||||||
def live[Command: Tag, Event: Tag, State: Tag] =
|
|
||||||
ZLayer.fromFunction(CommandEngine[Command, Event, State](_, _, _, _))
|
|
||||||
|
|
|
||||||
|
|
@ -3,7 +3,7 @@ package clients
|
||||||
|
|
||||||
import zio.*
|
import zio.*
|
||||||
|
|
||||||
class ClientReducer() extends Reducer[ClientEvent, ClientState]:
|
object ClientReducer extends Reducer[ClientEvent, ClientState]:
|
||||||
|
|
||||||
override val fromEmpty =
|
override val fromEmpty =
|
||||||
case e: ClientEvent.Created => ClientState.create(e)
|
case e: ClientEvent.Created => ClientState.create(e)
|
||||||
|
|
@ -12,5 +12,4 @@ class ClientReducer() extends Reducer[ClientEvent, ClientState]:
|
||||||
case (s: ClientState.Actif, e: ClientEvent.Updated) => s.update(e)
|
case (s: ClientState.Actif, e: ClientEvent.Updated) => s.update(e)
|
||||||
case (s: ClientState.Actif, _: ClientEvent.Disabled) => s.disable()
|
case (s: ClientState.Actif, _: ClientEvent.Disabled) => s.disable()
|
||||||
|
|
||||||
object ClientReducer:
|
val live = ZLayer.succeed(ClientReducer)
|
||||||
val live: ULayer[Reducer[ClientEvent, ClientState]] = ZLayer.succeed(ClientReducer())
|
|
||||||
|
|
|
||||||
|
|
@ -3,7 +3,7 @@ package contracts
|
||||||
|
|
||||||
import zio.*
|
import zio.*
|
||||||
|
|
||||||
class ContractReducer() extends Reducer[ContractEvent, ContractState]:
|
object ContractReducer extends Reducer[ContractEvent, ContractState]:
|
||||||
|
|
||||||
override val fromEmpty =
|
override val fromEmpty =
|
||||||
case e: ContractEvent.Subscribed => ContractState.subscribe(e)
|
case e: ContractEvent.Subscribed => ContractState.subscribe(e)
|
||||||
|
|
@ -22,5 +22,4 @@ class ContractReducer() extends Reducer[ContractEvent, ContractState]:
|
||||||
case (s: ContractState.PendingAmendment, _: ContractEvent.Rejected) => s.reject()
|
case (s: ContractState.PendingAmendment, _: ContractEvent.Rejected) => s.reject()
|
||||||
case (s: ContractState.PendingAmendment, _: ContractEvent.Terminated) => s.terminate()
|
case (s: ContractState.PendingAmendment, _: ContractEvent.Terminated) => s.terminate()
|
||||||
|
|
||||||
object ContractReducer:
|
val live = ZLayer.succeed(ContractReducer)
|
||||||
val live: ULayer[Reducer[ContractEvent, ContractState]] = ZLayer.succeed(ContractReducer())
|
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue