From a036e3cbb312a722233c7ecd748a87b3ecfc28e2 Mon Sep 17 00:00:00 2001 From: Paul-Henri Froidmont Date: Fri, 12 Sep 2025 17:35:11 +0200 Subject: [PATCH] Fix multiple routes support --- build.mill | 2 +- ...leLiveView.scala => CounterLiveView.scala} | 51 ++------ example/src/Example.scala | 28 +++- example/src/ListLiveView.scala | 72 +++++++++++ zio/src/scalive/LiveRouter.scala | 120 ++++++++++-------- zio/src/scalive/Socket.scala | 92 +++++++------- 6 files changed, 217 insertions(+), 148 deletions(-) rename example/src/{ExampleLiveView.scala => CounterLiveView.scala} (60%) create mode 100644 example/src/ListLiveView.scala diff --git a/build.mill b/build.mill index a221ffe..043d072 100644 --- a/build.mill +++ b/build.mill @@ -38,7 +38,7 @@ object zio extends ScalaCommon: object example extends ScalaCommon: def moduleDeps = Seq(zio) - def mvnDeps = Seq(mvn"dev.optics::monocle-core:3.1.0") + def mvnDeps = Seq(mvn"dev.optics::monocle-core:3.1.0", mvn"dev.zio::zio-logging:2.5.1") def scaliveBundle = Task { os.copy( diff --git a/example/src/ExampleLiveView.scala b/example/src/CounterLiveView.scala similarity index 60% rename from example/src/ExampleLiveView.scala rename to example/src/CounterLiveView.scala index 0caec78..ba02d61 100644 --- a/example/src/ExampleLiveView.scala +++ b/example/src/CounterLiveView.scala @@ -1,27 +1,20 @@ -import ExampleLiveView.* +import CounterLiveView.* import monocle.syntax.all.* import scalive.* import zio.* import zio.json.* import zio.stream.ZStream -class ExampleLiveView(someParam: String) extends LiveView[Msg, Model]: +class CounterLiveView() extends LiveView[Msg, Model]: def init = ZIO.succeed( Model( isVisible = true, - counter = 0, - elems = List( - NestedModel("a", 10), - NestedModel("b", 15), - NestedModel("c", 20) - ) + counter = 0 ) ) def update(model: Model) = - case Msg.IncAge(value) => - ZIO.succeed(model.focus(_.elems.index(2).age).modify(_ + value)) case Msg.ToggleCounter => ZIO.succeed(model.focus(_.isVisible).modify(!_)) case Msg.IncCounter => @@ -31,38 +24,13 @@ class ExampleLiveView(someParam: String) extends LiveView[Msg, Model]: def view(model: Dyn[Model]) = div( + cls := "max-w-2xl mx-auto bg-white shadow rounded-2xl p-6 space-y-6", h1( cls := "text-2xl font-semibold tracking-tight text-gray-900", - someParam - ), - cls := "max-w-2xl mx-auto bg-white shadow rounded-2xl p-6 space-y-6", - idAttr := "42", - ul( - cls := "divide-y divide-gray-200", - model(_.elems).splitByIndex((_, elem) => - li( - cls := "py-3 flex flex-wrap items-center justify-between gap-2", - span( - cls := "text-gray-700", - "Nom: ", - span(cls := "font-medium", elem(_.name)) - ), - span( - cls := "text-sm text-gray-500", - "Age: ", - span(cls := "font-semibold text-gray-700", elem(_.age.toString)) - ) - ) - ) + "Counter with auto increment every second" ), div( cls := "flex flex-wrap items-center gap-3", - button( - cls := "inline-flex items-center rounded-lg px-3 py-2 text-sm font-medium bg-gray-900 text-white shadow hover:opacity-90 focus:outline-none focus:ring-2 focus:ring-gray-900/30", - phx.click := Msg.IncAge(1), - "Inc age" - ), - span(cls := "grow"), button( cls := "inline-flex items-center rounded-lg px-3 py-2 text-sm font-medium ring-1 ring-inset ring-gray-300 text-gray-700 hover:bg-gray-50 focus:outline-none focus:ring-2 focus:ring-gray-400/30", phx.click := Msg.ToggleCounter, @@ -98,18 +66,15 @@ class ExampleLiveView(someParam: String) extends LiveView[Msg, Model]: def subscriptions(model: Model) = ZStream.tick(1.second).map(_ => Msg.IncCounter).drop(1) -end ExampleLiveView +end CounterLiveView -object ExampleLiveView: +object CounterLiveView: enum Msg derives JsonCodec: - case IncAge(value: Int) case ToggleCounter case IncCounter case DecCounter final case class Model( isVisible: Boolean, - counter: Int, - elems: List[NestedModel]) - final case class NestedModel(name: String, age: Int) + counter: Int) diff --git a/example/src/Example.scala b/example/src/Example.scala index a7d9a66..a830196 100644 --- a/example/src/Example.scala +++ b/example/src/Example.scala @@ -1,18 +1,39 @@ +import scalive.{label as _, *} import zio.* import zio.http.* -import scalive.* +import zio.logging.ConsoleLoggerConfig +import zio.logging.LogColor +import zio.logging.LogFilter +import zio.logging.LogFormat.* +import zio.logging.consoleLogger object Example extends ZIOAppDefault: + private val logFormat = + label("timestamp", timestamp.fixed(32)).color(LogColor.BLUE) |-| + label("level", level.fixed(5)).highlight |-| + label("thread", fiberId).color(LogColor.WHITE) |-| + label("message", quoted(line)).highlight |-| + cause + + val logFilter = LogFilter.LogLevelByNameConfig(LogLevel.Debug) + + override val bootstrap = + Runtime.removeDefaultLoggers >>> consoleLogger(ConsoleLoggerConfig(logFormat, logFilter)) + val liveRouter = LiveRouter( RootLayout(_), List( LiveRoute( - Root, + Root / "counter", + (_, _) => CounterLiveView() + ), + LiveRoute( + Root / "list", (_, req) => val q = req.queryParam("q").map("Param : " ++ _).getOrElse("No param") - ExampleLiveView(q) + ListLiveView(q) ) ) ) @@ -20,3 +41,4 @@ object Example extends ZIOAppDefault: val routes = liveRouter.routes @@ Middleware.serveResources(Path.empty / "static", "public") override val run = Server.serve(routes).provide(Server.default) +end Example diff --git a/example/src/ListLiveView.scala b/example/src/ListLiveView.scala new file mode 100644 index 0000000..9beb8d8 --- /dev/null +++ b/example/src/ListLiveView.scala @@ -0,0 +1,72 @@ +import ListLiveView.* +import monocle.syntax.all.* +import scalive.* +import zio.* +import zio.json.* +import zio.stream.ZStream + +class ListLiveView(someParam: String) extends LiveView[Msg, Model]: + + def init = ZIO.succeed( + Model( + elems = List( + NestedModel("a", 10), + NestedModel("b", 15), + NestedModel("c", 20) + ) + ) + ) + + def update(model: Model) = + case Msg.IncAge(value) => + ZIO.succeed(model.focus(_.elems.index(2).age).modify(_ + value)) + + def view(model: Dyn[Model]) = + div( + h1( + cls := "text-2xl font-semibold tracking-tight text-gray-900", + someParam + ), + cls := "max-w-2xl mx-auto bg-white shadow rounded-2xl p-6 space-y-6", + idAttr := "42", + ul( + cls := "divide-y divide-gray-200", + model(_.elems).splitByIndex((_, elem) => + li( + cls := "py-3 flex flex-wrap items-center justify-between gap-2", + span( + cls := "text-gray-700", + "Nom: ", + span(cls := "font-medium", elem(_.name)) + ), + span( + cls := "text-sm text-gray-500", + "Age: ", + span(cls := "font-semibold text-gray-700", elem(_.age.toString)) + ) + ) + ) + ), + div( + cls := "flex flex-wrap items-center gap-3", + button( + cls := "inline-flex items-center rounded-lg px-3 py-2 text-sm font-medium bg-gray-900 text-white shadow hover:opacity-90 focus:outline-none focus:ring-2 focus:ring-gray-900/30", + phx.click := Msg.IncAge(1), + "Inc age" + ), + span(cls := "grow") + ) + ) + + def subscriptions(model: Model) = ZStream.empty + +end ListLiveView + +object ListLiveView: + + enum Msg derives JsonCodec: + case IncAge(value: Int) + + final case class Model( + elems: List[NestedModel]) + final case class NestedModel(name: String, age: Int) diff --git a/zio/src/scalive/LiveRouter.scala b/zio/src/scalive/LiveRouter.scala index 7898394..450e442 100644 --- a/zio/src/scalive/LiveRouter.scala +++ b/zio/src/scalive/LiveRouter.scala @@ -61,19 +61,19 @@ class LiveChannel(private val sockets: SubscriptionRef[Map[String, Socket[?, ?]] lv: LiveView[Msg, Model], meta: WebSocketMessage.Meta ): RIO[Scope, Unit] = - sockets.updateZIO { m => - m.get(id) match - case Some(socket) => - socket.shutdown *> + sockets + .updateZIO { m => + m.get(id) match + case Some(socket) => + socket.shutdown *> + Socket + .start(id, token, lv, meta) + .map(m.updated(id, _)) + case None => Socket .start(id, token, lv, meta) .map(m.updated(id, _)) - case None => - Socket - .start(id, token, lv, meta) - .map(m.updated(id, _)) - - } + }.flatMap(_ => ZIO.logDebug(s"LiveView joined $id")) def event(id: String, value: String, meta: WebSocketMessage.Meta): UIO[Unit] = sockets.get.flatMap { m => @@ -99,45 +99,47 @@ class LiveRouter(rootLayout: HtmlElement => HtmlElement, liveRoutes: List[LiveRo private val socketApp: WebSocketApp[Any] = Handler.webSocket { channel => - ZIO.scoped(for - liveChannel <- LiveChannel.make() - _ <- liveChannel.diffsStream - .runForeach((payload, meta) => - channel - .send( - Read( - WebSocketFrame.text( - WebSocketMessage( - meta.joinRef, - meta.messageRef, - meta.topic, - payload match - case Payload.Diff(_) => "diff" - case _ => "phx_reply", - payload - ).toJson + ZIO + .scoped(for + liveChannel <- LiveChannel.make() + _ <- liveChannel.diffsStream + .runForeach((payload, meta) => + channel + .send( + Read( + WebSocketFrame.text( + WebSocketMessage( + meta.joinRef, + meta.messageRef, + meta.topic, + payload match + case Payload.Diff(_) => "diff" + case _ => "phx_reply", + payload + ).toJson + ) ) ) - ) - ) - .tapErrorCause(c => ZIO.logErrorCause("diffsStream pipeline failed", c)) - .ensuring(ZIO.logWarning("WS out fiber terminated")) - .fork - _ <- channel - .receiveAll { - case Read(WebSocketFrame.Text(content)) => - for - message <- ZIO - .fromEither(content.fromJson[WebSocketMessage]) - .mapError(new IllegalArgumentException(_)) - reply <- handleMessage(message, liveChannel) - _ <- reply match - case Some(r) => channel.send(Read(WebSocketFrame.text(r.toJson))) - case None => ZIO.unit - yield () - case _ => ZIO.unit - }.tapErrorCause(ZIO.logErrorCause(_)) - yield ()) + ) + .tapErrorCause(c => ZIO.logErrorCause("diffsStream pipeline failed", c)) + .ensuring(ZIO.logWarning("WS out fiber terminated")) + .fork + _ <- channel + .receiveAll { + case Read(WebSocketFrame.Close) => ZIO.logDebug("WS connection closed by client") + case Read(WebSocketFrame.Text(content)) => + for + message <- ZIO + .fromEither(content.fromJson[WebSocketMessage]) + .mapError(new IllegalArgumentException(_)) + reply <- handleMessage(message, liveChannel) + _ <- reply match + case Some(r) => channel.send(Read(WebSocketFrame.text(r.toJson))) + case None => ZIO.unit + yield () + case _ => ZIO.unit + } + yield ()).tapErrorCause(ZIO.logErrorCause(_)) } @@ -160,15 +162,21 @@ class LiveRouter(rootLayout: HtmlElement => HtmlElement, liveRoutes: List[LiveRo ZIO .fromEither(URL.decode(url)).flatMap(url => val req = Request(url = url) - liveRoutes - .collectFirst { route => - val pathParams = route.path.decode(req.path).getOrElse(???) - val lv = route.liveviewBuilder(pathParams, req) - liveChannel - .join(message.topic, session, lv, message.meta)(using route.messageCodec) - .map(_ => None) - - }.getOrElse(ZIO.succeed(None)) + liveRoutes.iterator + .map(route => + route.path + .decode(req.path) + .toOption + .map(route.liveviewBuilder(_, req)) + .map( + ZIO.logDebug(s"Joining live view ${route.path.toString} ${message.topic}") *> + liveChannel.join(message.topic, session, _, message.meta)( + using route.messageCodec + ) + ) + ) + .collectFirst { case Some(join) => join.map(_ => None) } + .getOrElse(ZIO.succeed(None)) ) case Payload.Event(_, event, _) => liveChannel diff --git a/zio/src/scalive/Socket.scala b/zio/src/scalive/Socket.scala index b722628..d76c58e 100644 --- a/zio/src/scalive/Socket.scala +++ b/zio/src/scalive/Socket.scala @@ -23,55 +23,57 @@ object Socket: lv: LiveView[Msg, Model], meta: WebSocketMessage.Meta ): RIO[Scope, Socket[Msg, Model]] = - for - inbox <- Queue.bounded[(Msg, WebSocketMessage.Meta)](4) - outHub <- Hub.unbounded[(Payload, WebSocketMessage.Meta)] + ZIO.logAnnotate("lv", id) { + for + inbox <- Queue.bounded[(Msg, WebSocketMessage.Meta)](4) + outHub <- Hub.unbounded[(Payload, WebSocketMessage.Meta)] - initModel <- lv.init - modelVar = Var(initModel) - el = lv.view(modelVar) - ref <- Ref.make((modelVar, el)) + initModel <- lv.init + modelVar = Var(initModel) + el = lv.view(modelVar) + ref <- Ref.make((modelVar, el)) - initDiff = el.diff(trackUpdates = false) + initDiff = el.diff(trackUpdates = false) - lvStreamRef <- SubscriptionRef.make(lv.subscriptions(initModel)) + lvStreamRef <- SubscriptionRef.make(lv.subscriptions(initModel)) - clientMsgStream = ZStream.fromQueue(inbox) - serverMsgStream = (ZStream.fromZIO(lvStreamRef.get) ++ lvStreamRef.changes) - .flatMapParSwitch(1, 1)(identity) - .map(_ -> meta.copy(messageRef = None, eventType = "diff")) + clientMsgStream = ZStream.fromQueue(inbox) + serverMsgStream = (ZStream.fromZIO(lvStreamRef.get) ++ lvStreamRef.changes) + .flatMapParSwitch(1, 1)(identity) + .map(_ -> meta.copy(messageRef = None, eventType = "diff")) - clientFiber <- clientMsgStream.runForeach { (msg, meta) => - for - (modelVar, el) <- ref.get - updatedModel <- lv.update(modelVar.currentValue)(msg) - _ = modelVar.set(updatedModel) - _ <- lvStreamRef.set(lv.subscriptions(updatedModel)) - diff = el.diff() - payload = Payload.okReply(LiveResponse.Diff(diff)) - _ <- outHub.publish(payload -> meta) - yield () - }.fork - serverFiber <- serverMsgStream.runForeach { (msg, meta) => - for - (modelVar, el) <- ref.get - updatedModel <- lv.update(modelVar.currentValue)(msg) - _ = modelVar.set(updatedModel) - diff = el.diff() - payload = Payload.Diff(diff) - _ <- outHub.publish(payload -> meta) - yield () - }.fork - stop = - inbox.shutdown *> outHub.shutdown *> clientFiber.interrupt.unit *> serverFiber.interrupt.unit - outbox = - ZStream.succeed( - Payload.okReply(LiveResponse.InitDiff(initDiff)) -> meta - ) ++ ZStream.unwrapScoped(ZStream.fromHubScoped(outHub)).filterNot { - case (Payload.Diff(diff), _) => diff.isEmpty - case _ => false - } - yield Socket[Msg, Model](id, token, inbox, outbox, stop) - end for + clientFiber <- clientMsgStream.runForeach { (msg, meta) => + for + (modelVar, el) <- ref.get + updatedModel <- lv.update(modelVar.currentValue)(msg) + _ = modelVar.set(updatedModel) + _ <- lvStreamRef.set(lv.subscriptions(updatedModel)) + diff = el.diff() + payload = Payload.okReply(LiveResponse.Diff(diff)) + _ <- outHub.publish(payload -> meta) + yield () + }.fork + serverFiber <- serverMsgStream.runForeach { (msg, meta) => + for + (modelVar, el) <- ref.get + updatedModel <- lv.update(modelVar.currentValue)(msg) + _ = modelVar.set(updatedModel) + diff = el.diff() + payload = Payload.Diff(diff) + _ <- outHub.publish(payload -> meta) + yield () + }.fork + stop = + inbox.shutdown *> outHub.shutdown *> clientFiber.interrupt.unit *> serverFiber.interrupt.unit + outbox = + ZStream.succeed( + Payload.okReply(LiveResponse.InitDiff(initDiff)) -> meta + ) ++ ZStream + .unwrapScoped(ZStream.fromHubScoped(outHub)).filterNot { + case (Payload.Diff(diff), _) => diff.isEmpty + case _ => false + } + yield Socket[Msg, Model](id, token, inbox, outbox, stop) + } end start end Socket