Fix events processing

This commit is contained in:
Paul-Henri Froidmont 2025-09-03 18:08:17 +02:00
parent fcc5f1799e
commit 4af9a78408
Signed by: phfroidmont
GPG key ID: BE948AFD7E7873BE
3 changed files with 16 additions and 19 deletions

View file

@ -16,7 +16,8 @@ trait ScalaCommon extends ScalaModule:
"-preview", "-preview",
"-feature", "-feature",
"-language:implicitConversions", "-language:implicitConversions",
"-Wvalue-discard" "-Wvalue-discard",
"-Wnonunit-statement"
) )
object core extends ScalaCommon: object core extends ScalaCommon:

View file

@ -44,14 +44,11 @@ final case class LiveRoute[A, Event: JsonCodec](
} }
class LiveChannel(private val sockets: SubscriptionRef[Map[String, Socket[?]]]): class LiveChannel(private val sockets: SubscriptionRef[Map[String, Socket[?]]]):
def diffsStream: ZStream[Any, Nothing, (Diff, Meta)] = def diffsStream: ZStream[Any, Nothing, (LiveResponse, Meta)] =
sockets.changes sockets.changes
.map(m => .map(m =>
ZStream ZStream
.mergeAllUnbounded()( .mergeAllUnbounded()(m.values.map(_.outbox).toList*)
m.values
.map(_.outbox).map(ZStream.fromHub(_)).toList*
)
).flatMapParSwitch(1, 1)(identity) ).flatMapParSwitch(1, 1)(identity)
def join[Event: JsonCodec]( def join[Event: JsonCodec](
@ -75,7 +72,7 @@ class LiveChannel(private val sockets: SubscriptionRef[Map[String, Socket[?]]]):
} }
def event(id: String, value: String, meta: WebSocketMessage.Meta): UIO[Unit] = def event(id: String, value: String, meta: WebSocketMessage.Meta): UIO[Unit] =
sockets.get.map { m => sockets.get.flatMap { m =>
m.get(id) match m.get(id) match
case Some(socket) => case Some(socket) =>
socket.inbox socket.inbox
@ -84,9 +81,9 @@ class LiveChannel(private val sockets: SubscriptionRef[Map[String, Socket[?]]]):
.fromJson(using socket.clientEventCodec.decoder) .fromJson(using socket.clientEventCodec.decoder)
.getOrElse(throw new IllegalArgumentException()) .getOrElse(throw new IllegalArgumentException())
-> meta -> meta
) ).unit
case None => ZIO.unit case None => ZIO.unit
}.unit }
end LiveChannel end LiveChannel
@ -110,7 +107,7 @@ class LiveRouter(rootLayout: HtmlElement => HtmlElement, liveRoutes: List[LiveRo
meta.messageRef, meta.messageRef,
meta.topic, meta.topic,
"phx_reply", "phx_reply",
Payload.Reply("OK", LiveResponse.Diff(diff)) Payload.Reply("ok", diff)
).toJson ).toJson
) )
) )
@ -169,7 +166,6 @@ class LiveRouter(rootLayout: HtmlElement => HtmlElement, liveRoutes: List[LiveRo
.map(_ => None) .map(_ => None)
case Payload.Reply(_, _) => ZIO.die(new IllegalArgumentException()) case Payload.Reply(_, _) => ZIO.die(new IllegalArgumentException())
end match end match
end handleMessage end handleMessage
val routes: Routes[Any, Response] = val routes: Routes[Any, Response] =

View file

@ -1,16 +1,16 @@
package scalive package scalive
import scalive.WebSocketMessage.LiveResponse
import zio.* import zio.*
import zio.json.*
import zio.Queue import zio.Queue
import zio.json.*
import zio.stream.ZStream import zio.stream.ZStream
final case class Socket[Event: JsonCodec] private ( final case class Socket[Event: JsonCodec] private (
id: String, id: String,
token: String, token: String,
// lv: LiveView[CliEvt, SrvEvt],
inbox: Queue[(Event, WebSocketMessage.Meta)], inbox: Queue[(Event, WebSocketMessage.Meta)],
outbox: Hub[(Diff, WebSocketMessage.Meta)], outbox: ZStream[Any, Nothing, (LiveResponse, WebSocketMessage.Meta)],
fiber: Fiber.Runtime[Nothing, Unit], fiber: Fiber.Runtime[Nothing, Unit],
shutdown: UIO[Unit]): shutdown: UIO[Unit]):
val clientEventCodec = JsonCodec[Event] val clientEventCodec = JsonCodec[Event]
@ -24,10 +24,8 @@ object Socket:
): URIO[Scope, Socket[Event]] = ): URIO[Scope, Socket[Event]] =
for for
inbox <- Queue.bounded[(Event, WebSocketMessage.Meta)](4) inbox <- Queue.bounded[(Event, WebSocketMessage.Meta)](4)
outbox <- Hub.bounded[(Diff, WebSocketMessage.Meta)](4) outHub <- Hub.bounded[(LiveResponse, WebSocketMessage.Meta)](4)
initDiff = lv.diff(trackUpdates = false) initDiff = lv.diff(trackUpdates = false)
_ <- outbox.publish(initDiff -> meta).unit
_ <- outbox.size.flatMap(s => ZIO.log(s.toString)) // FIXME
lvRef <- Ref.make(lv) lvRef <- Ref.make(lv)
fiber <- ZStream fiber <- ZStream
.fromQueue(inbox) .fromQueue(inbox)
@ -36,10 +34,12 @@ object Socket:
lv <- lvRef.get lv <- lvRef.get
_ = lv.handleEvent(msg) _ = lv.handleEvent(msg)
diff = lv.diff() diff = lv.diff()
_ <- outbox.publish(diff -> meta) _ <- outHub.publish(LiveResponse.Diff(diff) -> meta)
yield () yield ()
} }
.runDrain .runDrain
.forkScoped .forkScoped
stop = inbox.shutdown *> outbox.shutdown *> fiber.interrupt.unit stop = inbox.shutdown *> outHub.shutdown *> fiber.interrupt.unit
diffStream <- ZStream.fromHubScoped(outHub)
outbox = ZStream.succeed(LiveResponse.InitDiff(initDiff) -> meta) ++ diffStream
yield Socket[Event](id, token, inbox, outbox, fiber, stop) yield Socket[Event](id, token, inbox, outbox, fiber, stop)