From 6c7f0e835bc23db0885e44ef0684b29fcbcff054 Mon Sep 17 00:00:00 2001 From: Will Billingsley <wbilling@une.edu.au> Date: Mon, 17 Sep 2018 19:50:49 +1000 Subject: [PATCH] Added examples for WebSocket and EventSource --- app/controllers/ExamplesController.java | 89 ++++++++++++++++++++++++- conf/routes | 4 ++ 2 files changed, 91 insertions(+), 2 deletions(-) diff --git a/app/controllers/ExamplesController.java b/app/controllers/ExamplesController.java index d1bd298..d4b0fa7 100644 --- a/app/controllers/ExamplesController.java +++ b/app/controllers/ExamplesController.java @@ -1,14 +1,23 @@ package controllers; +import akka.Done; +import akka.stream.Graph; +import akka.stream.OverflowStrategy; +import akka.stream.javadsl.Flow; +import akka.stream.javadsl.Sink; +import akka.stream.javadsl.Source; +import akka.stream.javadsl.SourceQueueWithComplete; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ArrayNode; import com.fasterxml.jackson.databind.node.ObjectNode; +import play.libs.EventSource; import play.libs.Json; import play.mvc.Controller; import play.mvc.Result; +import play.mvc.WebSocket; -import java.util.Optional; -import java.util.Random; +import java.util.*; +import java.util.concurrent.CompletionStage; /** * This controller contains an action to handle HTTP requests @@ -73,4 +82,80 @@ public class ExamplesController extends Controller { } } + List<SourceQueueWithComplete<String>> queues = new ArrayList<>(); + + { + Timer timer = new Timer(); + TimerTask timerTask = new TimerTask() { + @Override + public void run() { + String s = "The time is now " + System.currentTimeMillis(); + + for (SourceQueueWithComplete<String> queue : queues) { + queue.offer(s); + } + } + }; + timer.schedule(timerTask, 1000, 1000); + } + + + /** + * An example websocket + */ + public WebSocket webSocket() { + return WebSocket.Text.accept(request -> + + + // A websocket is made from a "flow", which has an input and an output + Flow.fromSinkAndSourceCoupledMat( + // First, the input. Sink.foreach lets us script what to do with every incoming message + Sink.foreach(msg -> { + // let's just print the messages out + System.out.println("Message recieved: " + msg); + }), + + // Next, what is the source of output going to be? This says it'll be a queue that we can push + // messages to + Source.queue(50, OverflowStrategy.backpressure()), + + // This lambda is called when the flow starts. The second parameter is our queue. + // The first parameter is a "CompletionStage" (Java's version of a Promise) that will complete when + // the socket is closed. + (done, queue) -> { + // As soon as we have the queue, add it to the list of queues to push ticks out on + queues.add(queue); + + // When the socket closes, remove the queue from the list of queues to push ticks out on + done.whenComplete((success, error) -> queues.remove(queue)); + return done; + } + ) + ); + } + + + /** + * EventSource example. Not supported on Edge + * @return + */ + public Result eventSource() { + + // We're using an asynchronous queue as the source. What we get back is a "source" - the actual queue + // will be created for us when the flow starts. + Source<String, SourceQueueWithComplete<String>> source = Source.queue(50, OverflowStrategy.backpressure()); + + // An eventsource is a chunked response + return ok().chunked( + // For each string from the source, create an event. This produces a source of events. + // viaMat "materialises" this when the eventsource starts. What we do is in the callback + source.map((str) -> EventSource.Event.event(str)).viaMat(EventSource.flow(), (queue, done) -> { + // In our callback, the first argment is the queue. Now we have an actual queue we can push messages to. + // Let's put it in the list of queues so the tickTimer (above) will push messages to it. + queues.add(queue); + return done; + }) + ).as("text/event-stream"); + } + } diff --git a/conf/routes b/conf/routes index 1cb4b5a..325a3c7 100644 --- a/conf/routes +++ b/conf/routes @@ -10,5 +10,9 @@ POST /example/json controllers.ExamplesController.receiveJsonPo GET /example/game controllers.ExamplesController.game(id ?= null) +GET /example/websocket controllers.ExamplesController.webSocket() + +GET /example/eventsource controllers.ExamplesController.eventSource() + # Map static resources from the /public folder to the /assets URL path GET /assets/*file controllers.Assets.versioned(path="/public", file: Asset) -- GitLab