Skip to content
Snippets Groups Projects
Commit b1266280 authored by Will Billingsley's avatar Will Billingsley
Browse files

Added WebSocket helper and example

parent e0f40946
Branches
No related tags found
No related merge requests found
...@@ -2,14 +2,22 @@ package controllers ...@@ -2,14 +2,22 @@ package controllers
import javax.inject._ import javax.inject._
import scala.util.Random import akka.stream.{Materializer, OverflowStrategy, ThrottleMode}
import akka.stream.scaladsl.{Flow, Sink, Source, SourceQueueWithComplete}
import scala.concurrent.duration._
import scala.util.Random
import play.api.libs.json.{JsValue, Json} import play.api.libs.json.{JsValue, Json}
import play.api.mvc._ import play.api.mvc._
import scala.collection.mutable
import play.api.libs.concurrent.Futures._
import scala.concurrent.ExecutionContext
@Singleton @Singleton
class ExamplesController @Inject()(cc: ControllerComponents) extends AbstractController(cc) { class ExamplesController @Inject()(cc: ControllerComponents)(implicit ec: ExecutionContext, mat: Materializer) extends AbstractController(cc) {
/** /**
* A controller action that receives JSON as input. * A controller action that receives JSON as input.
...@@ -62,4 +70,86 @@ class ExamplesController @Inject()(cc: ControllerComponents) extends AbstractCon ...@@ -62,4 +70,86 @@ class ExamplesController @Inject()(cc: ControllerComponents) extends AbstractCon
} }
// A list of connected sockets that we'll push the time out to every second
var listeners = mutable.Set.empty[SourceQueueWithComplete[String]]
// Just keeps putting out the time to every listening websocket every second
val timeTick = Source.tick(1.second, 1.second, 1).runForeach { _ =>
for { q <- listeners } q.offer(s"The time now is ${System.currentTimeMillis()}")
}
/**
* A websocket written using a helper function that I have defined for you.
*
* In this case, whenever a websocket is connected, it starts sending random message
*
* @return
*/
def websocketWithHelper():WebSocket = {
WebSocketHelper.createWebSocket[String, String](
{ queue =>
// We now have an asynchronous queue. What would we like to do with it
listeners.add(queue)
},
{ msg =>
// What would we like to do with each message
println(msg)
},
{ queue =>
// What would we like to do with the queue when the websocket is complete?
listeners.remove(queue)
}
)
}
/**
* An example of a WebSocket in Play. Play uses "Akka Streams" which is a fairly advanced way of doing streams of
* asynchronous work. However, using this template, you should just
* @return
*/
def websocket() = WebSocket.accept[String, String] { request =>
// Our source is going to be an asynchronous queue we can push messages to with 'out.offer(msg)'. This is going to
// be the "source" of our flow. A complication is it doesn't create the queue immediately - this just says
// *when* this source is connected to a flow, create a queue. So we don't have a queue yet.
val outSource = Source.queue[String](50, OverflowStrategy.backpressure)
// Because I want to refer to the queue in the sink, but I haven't actually got the queue yet, I'm just creating
// a box to put our queue in. At the moment, it's empty
var outOpt:Option[SourceQueueWithComplete[String]] = None
// Create a "sink" that describes what we want to do with each message. Here, I'm just going to count the characters
// and echo it straight back out on the "out" queue.
val in = Sink.foreach[String] { message =>
// If we have an out queue, send the message on it
outOpt foreach { out =>
out.offer(s"That message had ${message.length} characters in it")
}
}
// This defines a "flow" -- something that has an input and an output.
// "Coupled" means that if the input closes, the output will close
// "Mat" means "materialised" -- ie, it'll give us the output queue that gets created and the Future that will
// complete when the flow is done.
Flow.fromSinkAndSourceCoupledMat(in, outSource) { case (done, out) =>
// done is a future that will complete when the flow finishes
// out is our "materialised" output queue. All we need do is put it in the box we made earlier
out.offer("Connected!")
outOpt = Some(out)
done.foreach { _ =>
outOpt = None
println("Connection closed!")
}
}
}
} }
package controllers
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.{Flow, Sink, Source, SourceQueueWithComplete}
import play.api.mvc.WebSocket
import scala.concurrent.ExecutionContext
object WebSocketHelper {
def createWebSocket[In, Out](
queueReady: SourceQueueWithComplete[Out] => Unit,
handleMsg: In => Unit,
whenDone: SourceQueueWithComplete[Out] => Unit
)(implicit ec: ExecutionContext, t: WebSocket.MessageFlowTransformer[In, Out]):WebSocket = {
WebSocket.accept[In, Out] { _ =>
// Our source is going to be an asynchronous queue we can push messages to with 'out.offer(msg)'. This is going to
// be the "source" of our flow. A complication is it doesn't create the queue immediately - this just says
// *when* this source is connected to a flow, create a queue. So we don't have a queue yet.
val outSource = Source.queue[Out](50, OverflowStrategy.backpressure)
// Create a "sink" that describes what we want to do with each message. Here, I'm just going to count the characters
// and echo it straight back out on the "out" queue.
val in = Sink.foreach[In](handleMsg)
// This defines a "flow" -- something that has an input and an output.
// "Coupled" means that if the input closes, the output will close
// "Mat" means "materialised" -- ie, it'll give us the output queue that gets created and the Future that will
// complete when the flow is done.
Flow.fromSinkAndSourceCoupledMat(in, outSource) { case (done, out) =>
queueReady(out)
done.onComplete { _ => whenDone(out) }
}
}
}
}
...@@ -11,5 +11,9 @@ POST /example/json controllers.ExamplesController.receiveJsonPo ...@@ -11,5 +11,9 @@ POST /example/json controllers.ExamplesController.receiveJsonPo
GET /example/game controllers.ExamplesController.game(id:Option[String]) GET /example/game controllers.ExamplesController.game(id:Option[String])
GET /example/socket controllers.ExamplesController.websocket()
GET /example/time controllers.ExamplesController.websocketWithHelper()
# Map static resources from the /public folder to the /assets URL path # Map static resources from the /public folder to the /assets URL path
GET /assets/*file controllers.Assets.versioned(path="/public", file: Asset) GET /assets/*file controllers.Assets.versioned(path="/public", file: Asset)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment