In an earlier post I used Server-Sent Events (SSE) from browser applications. The same protocol also works well for backend services and command-line tools.
Server-Sent Events provide a simple way to stream messages from a server to a client over a long-lived HTTP connection.
Unlike WebSocket, SSE is one-way: the server sends events and the client receives them. For dashboards, notifications, monitoring, and activity feeds, that is often exactly what you want.
In the browser, the EventSource API opens the connection, parses the event stream, reconnects automatically, and keeps track of the last event id.
Under the hood, SSE is just HTTP streaming with a small text-based protocol. The format is defined in the HTML specification: https://html.spec.whatwg.org/multipage/server-sent-events.html#server-sent-events
Because SSE is plain HTTP, you can inspect it with generic tools.
For the following examples, I use the server from this project. It periodically sends memory information from the Java Virtual Machine. To start it, run mvn spring-boot:run.
A quick way to inspect the stream is curl.
curl.exe -N http://localhost:8080/memory
You should see the raw SSE stream.
data:{"heap":155193288,"nonHeap":63686384,"ts":1519646918675}
data:{"heap":155193288,"nonHeap":63820536,"ts":1519646919676}
data:{"heap":155193288,"nonHeap":63820568,"ts":1519646920675}
Raw HTTP Access ¶
Any HTTP client can read an SSE endpoint, but a generic HTTP client only sees text lines. It does not understand event boundaries, multi-line payloads, comments, retry instructions, or reconnection state.
For example, these three data: lines represent one logical event.
data: line one
data: line two
data: line three
If you only want to inspect a stream, Spring WebClient is enough. The following example consumes the local /memory endpoint and logs only the actual payload.
public static void main(String[] args) {
ParameterizedTypeReference<ServerSentEvent<String>> typeRef = new ParameterizedTypeReference<>() {
/* nothing here */};
WebClient client = WebClient.builder().baseUrl("http://localhost:8080").build();
client.get().uri("/memory").accept(MediaType.TEXT_EVENT_STREAM).retrieve()
.bodyToFlux(typeRef).map(ServerSentEvent::data).filter(Objects::nonNull)
.doOnNext(data -> logger.info("Received: {}", data))
.blockLast(Duration.ofMinutes(10));
}
Example output:
Received: {"heap":160226728,"nonHeap":64349072,"ts":1519647595676}
Received: {"heap":160226728,"nonHeap":64348752,"ts":1519647596676}
Received: {"heap":160226728,"nonHeap":64331664,"ts":1519647597675}
That approach is useful for diagnostics, but for long-running consumers it is more convenient to use a client library that understands the SSE protocol.
SSE Client ¶
In Java, okhttp-eventsource is a practical choice. It is built on top of OkHttp and handles parsing, automatic reconnection, and event dispatch for you.
The first step is to implement a BackgroundEventHandler.
public class SimpleEventHandler implements BackgroundEventHandler {
@Override
public void onOpen() throws Exception {
System.out.println("onOpen");
}
@Override
public void onClosed() throws Exception {
System.out.println("onClosed");
}
@Override
public void onMessage(String event, MessageEvent messageEvent) throws Exception {
System.out.println(messageEvent.getData());
}
@Override
public void onComment(String comment) throws Exception {
System.out.println("onComment");
}
@Override
public void onError(Throwable t) {
System.out.println("onError: " + t);
}
}
onOpen and onClosed are called when the connection is established or closed. onComment is invoked for SSE comments, and onError is called when the client encounters an error.
onMessage receives every data event. The first parameter contains the event name. If the server does not specify an event name, the value is message.
Here is a stream with two events. The first arrives as message, and the second arrives as time.
data: 11
event: time
data: 1928192192
To connect to the stream, create the handler and a BackgroundEventSource.
public static void main(String[] args) throws InterruptedException {
BackgroundEventHandler eventHandler = new SimpleEventHandler();
String url = String.format("http://localhost:8080/memory");
BackgroundEventSource.Builder builder = new BackgroundEventSource.Builder(eventHandler,
new EventSource.Builder(URI.create(url)).retryDelay(3, TimeUnit.SECONDS));
try (BackgroundEventSource eventSource = builder.build()) {
eventSource.start();
TimeUnit.MINUTES.sleep(10);
}
}
Example output:
onOpen
{"heap":175327208,"nonHeap":64556544,"ts":1519649749675}
{"heap":175327208,"nonHeap":64559880,"ts":1519649750675}
{"heap":175327208,"nonHeap":64612912,"ts":1519649751675}
{"heap":175327208,"nonHeap":64615616,"ts":1519649752676}
The builder in this example configures a base retry delay of three seconds.
new EventSource.Builder(URI.create(url)).retryDelay(3, TimeUnit.SECONDS)
The library applies backoff and jitter between reconnect attempts, which makes it a much better fit for a long-lived consumer than a plain HTTP reader.
Examples ¶
For the following examples, I created a small subinterface that provides default implementations for every method except onMessage.
public interface DefaultEventHandler extends BackgroundEventHandler {
@Override
default void onOpen() throws Exception {
// nothing here
}
@Override
default void onClosed() throws Exception {
// nothing here
}
@Override
default void onComment(String comment) throws Exception {
// nothing here
}
@Override
default void onError(Throwable t) {
// nothing here
}
}
That keeps the handlers focused on the event payload.
The first example consumes the Wikimedia RecentChange stream. It publishes live changes from Wikimedia projects and is documented here: https://wikitech.wikimedia.org/wiki/EventStreams
The handler parses the JSON payload, ignores canary events, and prints the change type together with the page title.
public class WikimediaRecentChangeHandler implements DefaultEventHandler {
@Override
public void onMessage(String event, MessageEvent messageEvent) throws Exception {
try (JsonReader jsonReader = Json
.createReader(new StringReader(messageEvent.getData()))) {
JsonObject jsonObject = jsonReader.readObject();
JsonObject meta = jsonObject.getJsonObject("meta");
if (meta != null && "canary".equals(meta.getString("domain", ""))) {
return;
}
String title = jsonObject.getString("title", "");
String changeType = jsonObject.getString("type", "unknown");
System.out.println(changeType + " : " + title);
}
}
}
WikimediaRecentChangeHandler.java
The main class connects to the stream and registers the handler.
public static void main(String[] args) throws InterruptedException {
BackgroundEventHandler eventHandler = new WikimediaRecentChangeHandler();
String url = "https://stream.wikimedia.org/v2/stream/recentchange";
BackgroundEventSource.Builder builder = new BackgroundEventSource.Builder(eventHandler,
new EventSource.Builder(URI.create(url)));
try (BackgroundEventSource eventSource = builder.build()) {
eventSource.start();
TimeUnit.MINUTES.sleep(10);
}
}
Example output:
edit : Q123456
new : Example article
edit : File:Example.jpg
log : User:Example
The next example uses the Wikimedia revision-create stream. It prints the user, page title, and revision id for each new revision.
The handler:
public class WikimediaRevisionCreateHandler implements DefaultEventHandler {
@Override
public void onMessage(String event, MessageEvent messageEvent) throws Exception {
try (JsonReader jsonReader = Json
.createReader(new StringReader(messageEvent.getData()))) {
JsonObject jsonObject = jsonReader.readObject();
JsonObject meta = jsonObject.getJsonObject("meta");
if (meta != null && "canary".equals(meta.getString("domain", ""))) {
return;
}
JsonObject performer = jsonObject.getJsonObject("performer");
String user = performer != null ? performer.getString("user_text", "unknown")
: "unknown";
String title = jsonObject.getString("page_title", "");
long revisionId = jsonObject.getJsonNumber("rev_id").longValue();
System.out.println(user + " -> " + title + " (rev " + revisionId + ")");
}
}
}
WikimediaRevisionCreateHandler.java
The main class:
public static void main(String[] args) throws InterruptedException {
BackgroundEventHandler eventHandler = new WikimediaRevisionCreateHandler();
String url = "https://stream.wikimedia.org/v2/stream/mediawiki.revision-create";
BackgroundEventSource.Builder builder = new BackgroundEventSource.Builder(eventHandler,
new EventSource.Builder(URI.create(url)));
try (BackgroundEventSource eventSource = builder.build()) {
eventSource.start();
TimeUnit.MINUTES.sleep(10);
}
}
Example output:
ExampleUser -> Main Page (rev 123456789)
AnotherUser -> Talk:Example (rev 123456790)
You can find the source code for all examples on GitHub: https://github.com/ralscha/blog/tree/master/sse-client