diff --git a/build.mill b/build.mill index cd87940..442cb2d 100644 --- a/build.mill +++ b/build.mill @@ -16,7 +16,8 @@ trait ScalaCommon extends ScalaModule: "-preview", "-feature", "-language:implicitConversions", - "-Wvalue-discard" + "-Wvalue-discard", + "-Wnonunit-statement" ) object core extends ScalaCommon: diff --git a/zio/src/scalive/LiveRouter.scala b/zio/src/scalive/LiveRouter.scala index b0b6f06..b8285fd 100644 --- a/zio/src/scalive/LiveRouter.scala +++ b/zio/src/scalive/LiveRouter.scala @@ -44,14 +44,11 @@ final case class LiveRoute[A, Event: JsonCodec]( } class LiveChannel(private val sockets: SubscriptionRef[Map[String, Socket[?]]]): - def diffsStream: ZStream[Any, Nothing, (Diff, Meta)] = + def diffsStream: ZStream[Any, Nothing, (LiveResponse, Meta)] = sockets.changes .map(m => ZStream - .mergeAllUnbounded()( - m.values - .map(_.outbox).map(ZStream.fromHub(_)).toList* - ) + .mergeAllUnbounded()(m.values.map(_.outbox).toList*) ).flatMapParSwitch(1, 1)(identity) def join[Event: JsonCodec]( @@ -75,7 +72,7 @@ class LiveChannel(private val sockets: SubscriptionRef[Map[String, Socket[?]]]): } def event(id: String, value: String, meta: WebSocketMessage.Meta): UIO[Unit] = - sockets.get.map { m => + sockets.get.flatMap { m => m.get(id) match case Some(socket) => socket.inbox @@ -84,9 +81,9 @@ class LiveChannel(private val sockets: SubscriptionRef[Map[String, Socket[?]]]): .fromJson(using socket.clientEventCodec.decoder) .getOrElse(throw new IllegalArgumentException()) -> meta - ) + ).unit case None => ZIO.unit - }.unit + } end LiveChannel @@ -110,7 +107,7 @@ class LiveRouter(rootLayout: HtmlElement => HtmlElement, liveRoutes: List[LiveRo meta.messageRef, meta.topic, "phx_reply", - Payload.Reply("OK", LiveResponse.Diff(diff)) + Payload.Reply("ok", diff) ).toJson ) ) @@ -169,7 +166,6 @@ class LiveRouter(rootLayout: HtmlElement => HtmlElement, liveRoutes: List[LiveRo .map(_ => None) case Payload.Reply(_, _) => ZIO.die(new IllegalArgumentException()) end match - end handleMessage val routes: Routes[Any, Response] = diff --git a/zio/src/scalive/Socket.scala b/zio/src/scalive/Socket.scala index d3aa3ac..876d52b 100644 --- a/zio/src/scalive/Socket.scala +++ b/zio/src/scalive/Socket.scala @@ -1,16 +1,16 @@ package scalive +import scalive.WebSocketMessage.LiveResponse import zio.* -import zio.json.* import zio.Queue +import zio.json.* import zio.stream.ZStream final case class Socket[Event: JsonCodec] private ( id: String, token: String, - // lv: LiveView[CliEvt, SrvEvt], inbox: Queue[(Event, WebSocketMessage.Meta)], - outbox: Hub[(Diff, WebSocketMessage.Meta)], + outbox: ZStream[Any, Nothing, (LiveResponse, WebSocketMessage.Meta)], fiber: Fiber.Runtime[Nothing, Unit], shutdown: UIO[Unit]): val clientEventCodec = JsonCodec[Event] @@ -24,10 +24,8 @@ object Socket: ): URIO[Scope, Socket[Event]] = for inbox <- Queue.bounded[(Event, WebSocketMessage.Meta)](4) - outbox <- Hub.bounded[(Diff, WebSocketMessage.Meta)](4) + outHub <- Hub.bounded[(LiveResponse, WebSocketMessage.Meta)](4) initDiff = lv.diff(trackUpdates = false) - _ <- outbox.publish(initDiff -> meta).unit - _ <- outbox.size.flatMap(s => ZIO.log(s.toString)) // FIXME lvRef <- Ref.make(lv) fiber <- ZStream .fromQueue(inbox) @@ -36,10 +34,12 @@ object Socket: lv <- lvRef.get _ = lv.handleEvent(msg) diff = lv.diff() - _ <- outbox.publish(diff -> meta) + _ <- outHub.publish(LiveResponse.Diff(diff) -> meta) yield () } .runDrain .forkScoped - stop = inbox.shutdown *> outbox.shutdown *> fiber.interrupt.unit + stop = inbox.shutdown *> outHub.shutdown *> fiber.interrupt.unit + diffStream <- ZStream.fromHubScoped(outHub) + outbox = ZStream.succeed(LiveResponse.InitDiff(initDiff) -> meta) ++ diffStream yield Socket[Event](id, token, inbox, outbox, fiber, stop)