mirror of
https://github.com/phfroidmont/scalive.git
synced 2025-12-25 05:26:59 +01:00
Fix multiple routes support
This commit is contained in:
parent
44ffa55cc6
commit
a036e3cbb3
6 changed files with 217 additions and 148 deletions
|
|
@ -38,7 +38,7 @@ object zio extends ScalaCommon:
|
|||
|
||||
object example extends ScalaCommon:
|
||||
def moduleDeps = Seq(zio)
|
||||
def mvnDeps = Seq(mvn"dev.optics::monocle-core:3.1.0")
|
||||
def mvnDeps = Seq(mvn"dev.optics::monocle-core:3.1.0", mvn"dev.zio::zio-logging:2.5.1")
|
||||
|
||||
def scaliveBundle = Task {
|
||||
os.copy(
|
||||
|
|
|
|||
|
|
@ -1,27 +1,20 @@
|
|||
import ExampleLiveView.*
|
||||
import CounterLiveView.*
|
||||
import monocle.syntax.all.*
|
||||
import scalive.*
|
||||
import zio.*
|
||||
import zio.json.*
|
||||
import zio.stream.ZStream
|
||||
|
||||
class ExampleLiveView(someParam: String) extends LiveView[Msg, Model]:
|
||||
class CounterLiveView() extends LiveView[Msg, Model]:
|
||||
|
||||
def init = ZIO.succeed(
|
||||
Model(
|
||||
isVisible = true,
|
||||
counter = 0,
|
||||
elems = List(
|
||||
NestedModel("a", 10),
|
||||
NestedModel("b", 15),
|
||||
NestedModel("c", 20)
|
||||
)
|
||||
counter = 0
|
||||
)
|
||||
)
|
||||
|
||||
def update(model: Model) =
|
||||
case Msg.IncAge(value) =>
|
||||
ZIO.succeed(model.focus(_.elems.index(2).age).modify(_ + value))
|
||||
case Msg.ToggleCounter =>
|
||||
ZIO.succeed(model.focus(_.isVisible).modify(!_))
|
||||
case Msg.IncCounter =>
|
||||
|
|
@ -31,38 +24,13 @@ class ExampleLiveView(someParam: String) extends LiveView[Msg, Model]:
|
|||
|
||||
def view(model: Dyn[Model]) =
|
||||
div(
|
||||
cls := "max-w-2xl mx-auto bg-white shadow rounded-2xl p-6 space-y-6",
|
||||
h1(
|
||||
cls := "text-2xl font-semibold tracking-tight text-gray-900",
|
||||
someParam
|
||||
),
|
||||
cls := "max-w-2xl mx-auto bg-white shadow rounded-2xl p-6 space-y-6",
|
||||
idAttr := "42",
|
||||
ul(
|
||||
cls := "divide-y divide-gray-200",
|
||||
model(_.elems).splitByIndex((_, elem) =>
|
||||
li(
|
||||
cls := "py-3 flex flex-wrap items-center justify-between gap-2",
|
||||
span(
|
||||
cls := "text-gray-700",
|
||||
"Nom: ",
|
||||
span(cls := "font-medium", elem(_.name))
|
||||
),
|
||||
span(
|
||||
cls := "text-sm text-gray-500",
|
||||
"Age: ",
|
||||
span(cls := "font-semibold text-gray-700", elem(_.age.toString))
|
||||
)
|
||||
)
|
||||
)
|
||||
"Counter with auto increment every second"
|
||||
),
|
||||
div(
|
||||
cls := "flex flex-wrap items-center gap-3",
|
||||
button(
|
||||
cls := "inline-flex items-center rounded-lg px-3 py-2 text-sm font-medium bg-gray-900 text-white shadow hover:opacity-90 focus:outline-none focus:ring-2 focus:ring-gray-900/30",
|
||||
phx.click := Msg.IncAge(1),
|
||||
"Inc age"
|
||||
),
|
||||
span(cls := "grow"),
|
||||
button(
|
||||
cls := "inline-flex items-center rounded-lg px-3 py-2 text-sm font-medium ring-1 ring-inset ring-gray-300 text-gray-700 hover:bg-gray-50 focus:outline-none focus:ring-2 focus:ring-gray-400/30",
|
||||
phx.click := Msg.ToggleCounter,
|
||||
|
|
@ -98,18 +66,15 @@ class ExampleLiveView(someParam: String) extends LiveView[Msg, Model]:
|
|||
def subscriptions(model: Model) =
|
||||
ZStream.tick(1.second).map(_ => Msg.IncCounter).drop(1)
|
||||
|
||||
end ExampleLiveView
|
||||
end CounterLiveView
|
||||
|
||||
object ExampleLiveView:
|
||||
object CounterLiveView:
|
||||
|
||||
enum Msg derives JsonCodec:
|
||||
case IncAge(value: Int)
|
||||
case ToggleCounter
|
||||
case IncCounter
|
||||
case DecCounter
|
||||
|
||||
final case class Model(
|
||||
isVisible: Boolean,
|
||||
counter: Int,
|
||||
elems: List[NestedModel])
|
||||
final case class NestedModel(name: String, age: Int)
|
||||
counter: Int)
|
||||
|
|
@ -1,18 +1,39 @@
|
|||
import scalive.{label as _, *}
|
||||
import zio.*
|
||||
import zio.http.*
|
||||
import scalive.*
|
||||
import zio.logging.ConsoleLoggerConfig
|
||||
import zio.logging.LogColor
|
||||
import zio.logging.LogFilter
|
||||
import zio.logging.LogFormat.*
|
||||
import zio.logging.consoleLogger
|
||||
|
||||
object Example extends ZIOAppDefault:
|
||||
|
||||
private val logFormat =
|
||||
label("timestamp", timestamp.fixed(32)).color(LogColor.BLUE) |-|
|
||||
label("level", level.fixed(5)).highlight |-|
|
||||
label("thread", fiberId).color(LogColor.WHITE) |-|
|
||||
label("message", quoted(line)).highlight |-|
|
||||
cause
|
||||
|
||||
val logFilter = LogFilter.LogLevelByNameConfig(LogLevel.Debug)
|
||||
|
||||
override val bootstrap =
|
||||
Runtime.removeDefaultLoggers >>> consoleLogger(ConsoleLoggerConfig(logFormat, logFilter))
|
||||
|
||||
val liveRouter =
|
||||
LiveRouter(
|
||||
RootLayout(_),
|
||||
List(
|
||||
LiveRoute(
|
||||
Root,
|
||||
Root / "counter",
|
||||
(_, _) => CounterLiveView()
|
||||
),
|
||||
LiveRoute(
|
||||
Root / "list",
|
||||
(_, req) =>
|
||||
val q = req.queryParam("q").map("Param : " ++ _).getOrElse("No param")
|
||||
ExampleLiveView(q)
|
||||
ListLiveView(q)
|
||||
)
|
||||
)
|
||||
)
|
||||
|
|
@ -20,3 +41,4 @@ object Example extends ZIOAppDefault:
|
|||
val routes = liveRouter.routes @@ Middleware.serveResources(Path.empty / "static", "public")
|
||||
|
||||
override val run = Server.serve(routes).provide(Server.default)
|
||||
end Example
|
||||
|
|
|
|||
72
example/src/ListLiveView.scala
Normal file
72
example/src/ListLiveView.scala
Normal file
|
|
@ -0,0 +1,72 @@
|
|||
import ListLiveView.*
|
||||
import monocle.syntax.all.*
|
||||
import scalive.*
|
||||
import zio.*
|
||||
import zio.json.*
|
||||
import zio.stream.ZStream
|
||||
|
||||
class ListLiveView(someParam: String) extends LiveView[Msg, Model]:
|
||||
|
||||
def init = ZIO.succeed(
|
||||
Model(
|
||||
elems = List(
|
||||
NestedModel("a", 10),
|
||||
NestedModel("b", 15),
|
||||
NestedModel("c", 20)
|
||||
)
|
||||
)
|
||||
)
|
||||
|
||||
def update(model: Model) =
|
||||
case Msg.IncAge(value) =>
|
||||
ZIO.succeed(model.focus(_.elems.index(2).age).modify(_ + value))
|
||||
|
||||
def view(model: Dyn[Model]) =
|
||||
div(
|
||||
h1(
|
||||
cls := "text-2xl font-semibold tracking-tight text-gray-900",
|
||||
someParam
|
||||
),
|
||||
cls := "max-w-2xl mx-auto bg-white shadow rounded-2xl p-6 space-y-6",
|
||||
idAttr := "42",
|
||||
ul(
|
||||
cls := "divide-y divide-gray-200",
|
||||
model(_.elems).splitByIndex((_, elem) =>
|
||||
li(
|
||||
cls := "py-3 flex flex-wrap items-center justify-between gap-2",
|
||||
span(
|
||||
cls := "text-gray-700",
|
||||
"Nom: ",
|
||||
span(cls := "font-medium", elem(_.name))
|
||||
),
|
||||
span(
|
||||
cls := "text-sm text-gray-500",
|
||||
"Age: ",
|
||||
span(cls := "font-semibold text-gray-700", elem(_.age.toString))
|
||||
)
|
||||
)
|
||||
)
|
||||
),
|
||||
div(
|
||||
cls := "flex flex-wrap items-center gap-3",
|
||||
button(
|
||||
cls := "inline-flex items-center rounded-lg px-3 py-2 text-sm font-medium bg-gray-900 text-white shadow hover:opacity-90 focus:outline-none focus:ring-2 focus:ring-gray-900/30",
|
||||
phx.click := Msg.IncAge(1),
|
||||
"Inc age"
|
||||
),
|
||||
span(cls := "grow")
|
||||
)
|
||||
)
|
||||
|
||||
def subscriptions(model: Model) = ZStream.empty
|
||||
|
||||
end ListLiveView
|
||||
|
||||
object ListLiveView:
|
||||
|
||||
enum Msg derives JsonCodec:
|
||||
case IncAge(value: Int)
|
||||
|
||||
final case class Model(
|
||||
elems: List[NestedModel])
|
||||
final case class NestedModel(name: String, age: Int)
|
||||
|
|
@ -61,19 +61,19 @@ class LiveChannel(private val sockets: SubscriptionRef[Map[String, Socket[?, ?]]
|
|||
lv: LiveView[Msg, Model],
|
||||
meta: WebSocketMessage.Meta
|
||||
): RIO[Scope, Unit] =
|
||||
sockets.updateZIO { m =>
|
||||
m.get(id) match
|
||||
case Some(socket) =>
|
||||
socket.shutdown *>
|
||||
sockets
|
||||
.updateZIO { m =>
|
||||
m.get(id) match
|
||||
case Some(socket) =>
|
||||
socket.shutdown *>
|
||||
Socket
|
||||
.start(id, token, lv, meta)
|
||||
.map(m.updated(id, _))
|
||||
case None =>
|
||||
Socket
|
||||
.start(id, token, lv, meta)
|
||||
.map(m.updated(id, _))
|
||||
case None =>
|
||||
Socket
|
||||
.start(id, token, lv, meta)
|
||||
.map(m.updated(id, _))
|
||||
|
||||
}
|
||||
}.flatMap(_ => ZIO.logDebug(s"LiveView joined $id"))
|
||||
|
||||
def event(id: String, value: String, meta: WebSocketMessage.Meta): UIO[Unit] =
|
||||
sockets.get.flatMap { m =>
|
||||
|
|
@ -99,45 +99,47 @@ class LiveRouter(rootLayout: HtmlElement => HtmlElement, liveRoutes: List[LiveRo
|
|||
|
||||
private val socketApp: WebSocketApp[Any] =
|
||||
Handler.webSocket { channel =>
|
||||
ZIO.scoped(for
|
||||
liveChannel <- LiveChannel.make()
|
||||
_ <- liveChannel.diffsStream
|
||||
.runForeach((payload, meta) =>
|
||||
channel
|
||||
.send(
|
||||
Read(
|
||||
WebSocketFrame.text(
|
||||
WebSocketMessage(
|
||||
meta.joinRef,
|
||||
meta.messageRef,
|
||||
meta.topic,
|
||||
payload match
|
||||
case Payload.Diff(_) => "diff"
|
||||
case _ => "phx_reply",
|
||||
payload
|
||||
).toJson
|
||||
ZIO
|
||||
.scoped(for
|
||||
liveChannel <- LiveChannel.make()
|
||||
_ <- liveChannel.diffsStream
|
||||
.runForeach((payload, meta) =>
|
||||
channel
|
||||
.send(
|
||||
Read(
|
||||
WebSocketFrame.text(
|
||||
WebSocketMessage(
|
||||
meta.joinRef,
|
||||
meta.messageRef,
|
||||
meta.topic,
|
||||
payload match
|
||||
case Payload.Diff(_) => "diff"
|
||||
case _ => "phx_reply",
|
||||
payload
|
||||
).toJson
|
||||
)
|
||||
)
|
||||
)
|
||||
)
|
||||
)
|
||||
.tapErrorCause(c => ZIO.logErrorCause("diffsStream pipeline failed", c))
|
||||
.ensuring(ZIO.logWarning("WS out fiber terminated"))
|
||||
.fork
|
||||
_ <- channel
|
||||
.receiveAll {
|
||||
case Read(WebSocketFrame.Text(content)) =>
|
||||
for
|
||||
message <- ZIO
|
||||
.fromEither(content.fromJson[WebSocketMessage])
|
||||
.mapError(new IllegalArgumentException(_))
|
||||
reply <- handleMessage(message, liveChannel)
|
||||
_ <- reply match
|
||||
case Some(r) => channel.send(Read(WebSocketFrame.text(r.toJson)))
|
||||
case None => ZIO.unit
|
||||
yield ()
|
||||
case _ => ZIO.unit
|
||||
}.tapErrorCause(ZIO.logErrorCause(_))
|
||||
yield ())
|
||||
)
|
||||
.tapErrorCause(c => ZIO.logErrorCause("diffsStream pipeline failed", c))
|
||||
.ensuring(ZIO.logWarning("WS out fiber terminated"))
|
||||
.fork
|
||||
_ <- channel
|
||||
.receiveAll {
|
||||
case Read(WebSocketFrame.Close) => ZIO.logDebug("WS connection closed by client")
|
||||
case Read(WebSocketFrame.Text(content)) =>
|
||||
for
|
||||
message <- ZIO
|
||||
.fromEither(content.fromJson[WebSocketMessage])
|
||||
.mapError(new IllegalArgumentException(_))
|
||||
reply <- handleMessage(message, liveChannel)
|
||||
_ <- reply match
|
||||
case Some(r) => channel.send(Read(WebSocketFrame.text(r.toJson)))
|
||||
case None => ZIO.unit
|
||||
yield ()
|
||||
case _ => ZIO.unit
|
||||
}
|
||||
yield ()).tapErrorCause(ZIO.logErrorCause(_))
|
||||
|
||||
}
|
||||
|
||||
|
|
@ -160,15 +162,21 @@ class LiveRouter(rootLayout: HtmlElement => HtmlElement, liveRoutes: List[LiveRo
|
|||
ZIO
|
||||
.fromEither(URL.decode(url)).flatMap(url =>
|
||||
val req = Request(url = url)
|
||||
liveRoutes
|
||||
.collectFirst { route =>
|
||||
val pathParams = route.path.decode(req.path).getOrElse(???)
|
||||
val lv = route.liveviewBuilder(pathParams, req)
|
||||
liveChannel
|
||||
.join(message.topic, session, lv, message.meta)(using route.messageCodec)
|
||||
.map(_ => None)
|
||||
|
||||
}.getOrElse(ZIO.succeed(None))
|
||||
liveRoutes.iterator
|
||||
.map(route =>
|
||||
route.path
|
||||
.decode(req.path)
|
||||
.toOption
|
||||
.map(route.liveviewBuilder(_, req))
|
||||
.map(
|
||||
ZIO.logDebug(s"Joining live view ${route.path.toString} ${message.topic}") *>
|
||||
liveChannel.join(message.topic, session, _, message.meta)(
|
||||
using route.messageCodec
|
||||
)
|
||||
)
|
||||
)
|
||||
.collectFirst { case Some(join) => join.map(_ => None) }
|
||||
.getOrElse(ZIO.succeed(None))
|
||||
)
|
||||
case Payload.Event(_, event, _) =>
|
||||
liveChannel
|
||||
|
|
|
|||
|
|
@ -23,55 +23,57 @@ object Socket:
|
|||
lv: LiveView[Msg, Model],
|
||||
meta: WebSocketMessage.Meta
|
||||
): RIO[Scope, Socket[Msg, Model]] =
|
||||
for
|
||||
inbox <- Queue.bounded[(Msg, WebSocketMessage.Meta)](4)
|
||||
outHub <- Hub.unbounded[(Payload, WebSocketMessage.Meta)]
|
||||
ZIO.logAnnotate("lv", id) {
|
||||
for
|
||||
inbox <- Queue.bounded[(Msg, WebSocketMessage.Meta)](4)
|
||||
outHub <- Hub.unbounded[(Payload, WebSocketMessage.Meta)]
|
||||
|
||||
initModel <- lv.init
|
||||
modelVar = Var(initModel)
|
||||
el = lv.view(modelVar)
|
||||
ref <- Ref.make((modelVar, el))
|
||||
initModel <- lv.init
|
||||
modelVar = Var(initModel)
|
||||
el = lv.view(modelVar)
|
||||
ref <- Ref.make((modelVar, el))
|
||||
|
||||
initDiff = el.diff(trackUpdates = false)
|
||||
initDiff = el.diff(trackUpdates = false)
|
||||
|
||||
lvStreamRef <- SubscriptionRef.make(lv.subscriptions(initModel))
|
||||
lvStreamRef <- SubscriptionRef.make(lv.subscriptions(initModel))
|
||||
|
||||
clientMsgStream = ZStream.fromQueue(inbox)
|
||||
serverMsgStream = (ZStream.fromZIO(lvStreamRef.get) ++ lvStreamRef.changes)
|
||||
.flatMapParSwitch(1, 1)(identity)
|
||||
.map(_ -> meta.copy(messageRef = None, eventType = "diff"))
|
||||
clientMsgStream = ZStream.fromQueue(inbox)
|
||||
serverMsgStream = (ZStream.fromZIO(lvStreamRef.get) ++ lvStreamRef.changes)
|
||||
.flatMapParSwitch(1, 1)(identity)
|
||||
.map(_ -> meta.copy(messageRef = None, eventType = "diff"))
|
||||
|
||||
clientFiber <- clientMsgStream.runForeach { (msg, meta) =>
|
||||
for
|
||||
(modelVar, el) <- ref.get
|
||||
updatedModel <- lv.update(modelVar.currentValue)(msg)
|
||||
_ = modelVar.set(updatedModel)
|
||||
_ <- lvStreamRef.set(lv.subscriptions(updatedModel))
|
||||
diff = el.diff()
|
||||
payload = Payload.okReply(LiveResponse.Diff(diff))
|
||||
_ <- outHub.publish(payload -> meta)
|
||||
yield ()
|
||||
}.fork
|
||||
serverFiber <- serverMsgStream.runForeach { (msg, meta) =>
|
||||
for
|
||||
(modelVar, el) <- ref.get
|
||||
updatedModel <- lv.update(modelVar.currentValue)(msg)
|
||||
_ = modelVar.set(updatedModel)
|
||||
diff = el.diff()
|
||||
payload = Payload.Diff(diff)
|
||||
_ <- outHub.publish(payload -> meta)
|
||||
yield ()
|
||||
}.fork
|
||||
stop =
|
||||
inbox.shutdown *> outHub.shutdown *> clientFiber.interrupt.unit *> serverFiber.interrupt.unit
|
||||
outbox =
|
||||
ZStream.succeed(
|
||||
Payload.okReply(LiveResponse.InitDiff(initDiff)) -> meta
|
||||
) ++ ZStream.unwrapScoped(ZStream.fromHubScoped(outHub)).filterNot {
|
||||
case (Payload.Diff(diff), _) => diff.isEmpty
|
||||
case _ => false
|
||||
}
|
||||
yield Socket[Msg, Model](id, token, inbox, outbox, stop)
|
||||
end for
|
||||
clientFiber <- clientMsgStream.runForeach { (msg, meta) =>
|
||||
for
|
||||
(modelVar, el) <- ref.get
|
||||
updatedModel <- lv.update(modelVar.currentValue)(msg)
|
||||
_ = modelVar.set(updatedModel)
|
||||
_ <- lvStreamRef.set(lv.subscriptions(updatedModel))
|
||||
diff = el.diff()
|
||||
payload = Payload.okReply(LiveResponse.Diff(diff))
|
||||
_ <- outHub.publish(payload -> meta)
|
||||
yield ()
|
||||
}.fork
|
||||
serverFiber <- serverMsgStream.runForeach { (msg, meta) =>
|
||||
for
|
||||
(modelVar, el) <- ref.get
|
||||
updatedModel <- lv.update(modelVar.currentValue)(msg)
|
||||
_ = modelVar.set(updatedModel)
|
||||
diff = el.diff()
|
||||
payload = Payload.Diff(diff)
|
||||
_ <- outHub.publish(payload -> meta)
|
||||
yield ()
|
||||
}.fork
|
||||
stop =
|
||||
inbox.shutdown *> outHub.shutdown *> clientFiber.interrupt.unit *> serverFiber.interrupt.unit
|
||||
outbox =
|
||||
ZStream.succeed(
|
||||
Payload.okReply(LiveResponse.InitDiff(initDiff)) -> meta
|
||||
) ++ ZStream
|
||||
.unwrapScoped(ZStream.fromHubScoped(outHub)).filterNot {
|
||||
case (Payload.Diff(diff), _) => diff.isEmpty
|
||||
case _ => false
|
||||
}
|
||||
yield Socket[Msg, Model](id, token, inbox, outbox, stop)
|
||||
}
|
||||
end start
|
||||
end Socket
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue