Skip to content
Snippets Groups Projects
Commit 6c7f0e83 authored by Will Billingsley's avatar Will Billingsley
Browse files

Added examples for WebSocket and EventSource

parent 5bc09bc9
Branches master
No related tags found
No related merge requests found
package controllers;
import akka.Done;
import akka.stream.Graph;
import akka.stream.OverflowStrategy;
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import akka.stream.javadsl.SourceQueueWithComplete;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import play.libs.EventSource;
import play.libs.Json;
import play.mvc.Controller;
import play.mvc.Result;
import play.mvc.WebSocket;
import java.util.Optional;
import java.util.Random;
import java.util.*;
import java.util.concurrent.CompletionStage;
/**
* This controller contains an action to handle HTTP requests
......@@ -73,4 +82,80 @@ public class ExamplesController extends Controller {
}
}
List<SourceQueueWithComplete<String>> queues = new ArrayList<>();
{
Timer timer = new Timer();
TimerTask timerTask = new TimerTask() {
@Override
public void run() {
String s = "The time is now " + System.currentTimeMillis();
for (SourceQueueWithComplete<String> queue : queues) {
queue.offer(s);
}
}
};
timer.schedule(timerTask, 1000, 1000);
}
/**
* An example websocket
*/
public WebSocket webSocket() {
return WebSocket.Text.accept(request ->
// A websocket is made from a "flow", which has an input and an output
Flow.fromSinkAndSourceCoupledMat(
// First, the input. Sink.foreach lets us script what to do with every incoming message
Sink.foreach(msg -> {
// let's just print the messages out
System.out.println("Message recieved: " + msg);
}),
// Next, what is the source of output going to be? This says it'll be a queue that we can push
// messages to
Source.queue(50, OverflowStrategy.backpressure()),
// This lambda is called when the flow starts. The second parameter is our queue.
// The first parameter is a "CompletionStage" (Java's version of a Promise) that will complete when
// the socket is closed.
(done, queue) -> {
// As soon as we have the queue, add it to the list of queues to push ticks out on
queues.add(queue);
// When the socket closes, remove the queue from the list of queues to push ticks out on
done.whenComplete((success, error) -> queues.remove(queue));
return done;
}
)
);
}
/**
* EventSource example. Not supported on Edge
* @return
*/
public Result eventSource() {
// We're using an asynchronous queue as the source. What we get back is a "source" - the actual queue
// will be created for us when the flow starts.
Source<String, SourceQueueWithComplete<String>> source = Source.queue(50, OverflowStrategy.backpressure());
// An eventsource is a chunked response
return ok().chunked(
// For each string from the source, create an event. This produces a source of events.
// viaMat "materialises" this when the eventsource starts. What we do is in the callback
source.map((str) -> EventSource.Event.event(str)).viaMat(EventSource.flow(), (queue, done) -> {
// In our callback, the first argment is the queue. Now we have an actual queue we can push messages to.
// Let's put it in the list of queues so the tickTimer (above) will push messages to it.
queues.add(queue);
return done;
})
).as("text/event-stream");
}
}
......@@ -10,5 +10,9 @@ POST /example/json controllers.ExamplesController.receiveJsonPo
GET /example/game controllers.ExamplesController.game(id ?= null)
GET /example/websocket controllers.ExamplesController.webSocket()
GET /example/eventsource controllers.ExamplesController.eventSource()
# Map static resources from the /public folder to the /assets URL path
GET /assets/*file controllers.Assets.versioned(path="/public", file: Asset)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment