Add commands engine
This commit is contained in:
parent
1919e4b72c
commit
91584c18d5
9 changed files with 336 additions and 58 deletions
74
core/src/lu/foyer/EventSourcing.scala
Normal file
74
core/src/lu/foyer/EventSourcing.scala
Normal file
|
|
@ -0,0 +1,74 @@
|
|||
package lu.foyer
|
||||
|
||||
import zio.*
|
||||
import java.util.UUID
|
||||
|
||||
final case class Entity[T](entityId: UUID, data: T, version: Long)
|
||||
final case class Event[T](entityId: UUID, data: T, eventId: UUID)
|
||||
|
||||
trait StateRepository[State] extends Repository[Entity[State], UUID]
|
||||
trait EventRepository[State] extends Repository[Event[State], UUID]
|
||||
|
||||
trait Reducer[Event, State]:
|
||||
def reduce(event: Event): Option[State]
|
||||
def reduce(state: State, event: Event): Option[State]
|
||||
|
||||
trait CommandHandler[Command, Event, State]:
|
||||
def name: String
|
||||
|
||||
trait CommandHandlerCreate[Command, Event, State] extends CommandHandler[Command, Event, State]:
|
||||
def onCommand(entityId: UUID, command: Command): Task[Event]
|
||||
|
||||
trait CommandHandlerUpdate[Command, Event, State] extends CommandHandler[Command, Event, State]:
|
||||
def onCommand(entityId: UUID, state: State, command: Command): Task[Event]
|
||||
|
||||
class CommandEngine[Command, Event, State](
|
||||
val handlers: List[CommandHandler[Command, Event, State]],
|
||||
reducer: Reducer[Event, State],
|
||||
val eventRepo: EventRepository[Event],
|
||||
val stateRepo: StateRepository[State]):
|
||||
|
||||
def handleCommand(command: Command, name: String, entityId: UUID): Task[(Event, State)] =
|
||||
for
|
||||
handler <- ZIO
|
||||
.succeed(handlers.find(_.name == name))
|
||||
.someOrFail(new IllegalArgumentException(s"No handler found for command $name"))
|
||||
entityOption <- stateRepo.fetchOne(entityId)
|
||||
(event, newStateOption) <- transition(command, name, entityId, entityOption, handler)
|
||||
newState <-
|
||||
ZIO
|
||||
.succeed(newStateOption)
|
||||
.someOrFail(new IllegalArgumentException("Reducer cannot resolve state transition"))
|
||||
newEntity <-
|
||||
Random.nextUUID.map(Entity(_, newState, entityOption.map(_.version).getOrElse(1)))
|
||||
_ <- if entityOption.isEmpty then stateRepo.insert(newEntity)
|
||||
else stateRepo.update(newEntity.entityId, newEntity)
|
||||
eventEntity <- Random.nextUUID.map(Event(newEntity.entityId, event, _))
|
||||
_ <- eventRepo.insert(eventEntity)
|
||||
yield (event, newState)
|
||||
|
||||
private def transition(
|
||||
command: Command,
|
||||
name: String,
|
||||
entityId: UUID,
|
||||
entityOption: Option[Entity[State]],
|
||||
handler: CommandHandler[Command, Event, State]
|
||||
): Task[(Event, Option[State])] = (entityOption, handler) match
|
||||
case (None, h: CommandHandlerCreate[Command, Event, State]) =>
|
||||
h.onCommand(entityId, command)
|
||||
.map(event => (event, reducer.reduce(event)))
|
||||
case (Some(entity), h: CommandHandlerUpdate[Command, Event, State]) =>
|
||||
h.onCommand(entityId, entity.data, command)
|
||||
.map(event => (event, reducer.reduce(entity.data, event)))
|
||||
case (Some(_), _: CommandHandlerCreate[Command, Event, State]) =>
|
||||
ZIO.fail(
|
||||
new IllegalArgumentException(s"State already exists when applying create command $name")
|
||||
)
|
||||
case (None, _: CommandHandlerUpdate[Command, Event, State]) =>
|
||||
ZIO.fail(new IllegalArgumentException(s"No state found to apply the update command $name"))
|
||||
case _ => ZIO.fail(new IllegalArgumentException("Impossible state"))
|
||||
end CommandEngine
|
||||
|
||||
object CommandEngine:
|
||||
def layer[Command, Event, State](using Tag[Command], Tag[Event], Tag[State]) =
|
||||
ZLayer.fromFunction(CommandEngine[Command, Event, State](_, _, _, _))
|
||||
Loading…
Add table
Add a link
Reference in a new issue