
Access Server-Sent Events from Java

Published: 26. February 2018  •  java

In an earlier post I used Server-Sent Events (SSE) from browser applications. The same protocol also works well for backend services and command-line tools.

Server-Sent Events provide a simple way to stream messages from a server to a client over a long-lived HTTP connection.

Unlike WebSocket, SSE is one-way: the server sends events and the client receives them. For dashboards, notifications, monitoring, and activity feeds, that is often exactly what you want.

In the browser, the EventSource API opens the connection, parses the event stream, reconnects automatically, and keeps track of the last event id.

Under the hood, SSE is just HTTP streaming with a small text-based protocol. The format is defined in the HTML specification: https://html.spec.whatwg.org/multipage/server-sent-events.html#server-sent-events

Because SSE is plain HTTP, you can inspect it with generic tools.

For the following examples, I use the server from this project. It periodically sends memory information from the Java Virtual Machine. To start it, run mvn spring-boot:run.

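A quick way to inspect the stream is curl. The -N option disables output buffering, so each event is printed as soon as it arrives. On Linux and macOS, the command is curl instead of curl.exe.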

curl.exe -N http://localhost:8080/memory

You should see the raw SSE stream.

data:{"heap":155193288,"nonHeap":63686384,"ts":1519646918675}

data:{"heap":155193288,"nonHeap":63820536,"ts":1519646919676}

data:{"heap":155193288,"nonHeap":63820568,"ts":1519646920675}

Raw HTTP Access

Any HTTP client can read an SSE endpoint, but a generic HTTP client only sees text lines. It does not understand event boundaries, multi-line payloads, comments, retry instructions, or reconnection state.

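For example, these three data: lines represent one logical event. A client that understands the protocol joins them with newline characters into a single three-line payload.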

data: line one
data: line two
data: line three
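
To make the event boundary rule concrete, here is a minimal sketch of the part of the parsing that joins data: lines. It is not how any particular library implements it. The class and method names are made up for this illustration, and the sketch ignores every field other than data and takes the whole stream as one string instead of reading it incrementally.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration only, not part of any SSE library.
final class SseDataJoinSketch {

  // Returns the data payload of every complete event in the given stream text.
  static List<String> readEvents(String stream) {
    List<String> events = new ArrayList<>();
    List<String> dataLines = new ArrayList<>();

    // SSE lines end with CRLF, CR, or LF.
    for (String line : stream.split("\r\n|\r|\n", -1)) {
      if (line.isEmpty()) {
        // A blank line ends the current event; events without data are skipped.
        if (!dataLines.isEmpty()) {
          events.add(String.join("\n", dataLines));
          dataLines.clear();
        }
      }
      else if (line.startsWith(":")) {
        // Comment line, often used as a keep-alive. Nothing to do.
      }
      else if (line.startsWith("data:")) {
        String value = line.substring("data:".length());
        // One leading space after the colon is not part of the value.
        if (value.startsWith(" ")) {
          value = value.substring(1);
        }
        dataLines.add(value);
      }
      // Simplification: event, id, and retry fields are ignored here.
    }
    // Data lines that are not followed by a blank line are discarded.
    return events;
  }

  public static void main(String[] args) {
    String stream = "data: line one\ndata: line two\ndata: line three\n\n";
    List<String> events = readEvents(stream);
    System.out.println(events.size()); // prints 1
    System.out.println(events.get(0)); // prints the three lines as one payload
  }
}
```

A generic HTTP client would hand you the three lines separately. Code like this, or a library that does the same job, is what turns them back into one event.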

If you only want to inspect a stream, Spring WebClient is enough. The following example consumes the local /memory endpoint and logs only the actual payload.

  public static void main(String[] args) {
    ParameterizedTypeReference<ServerSentEvent<String>> typeRef = new ParameterizedTypeReference<>() {
      /* nothing here */};

    WebClient client = WebClient.builder().baseUrl("http://localhost:8080").build();

    client.get().uri("/memory").accept(MediaType.TEXT_EVENT_STREAM).retrieve()
        .bodyToFlux(typeRef).map(ServerSentEvent::data).filter(Objects::nonNull)
        .doOnNext(data -> logger.info("Received: {}", data))
        .blockLast(Duration.ofMinutes(10));
  }

SpringWebClient.java

Example output:

Received: {"heap":160226728,"nonHeap":64349072,"ts":1519647595676}
Received: {"heap":160226728,"nonHeap":64348752,"ts":1519647596676}
Received: {"heap":160226728,"nonHeap":64331664,"ts":1519647597675}

That approach is useful for diagnostics, but for long-running consumers it is more convenient to use a client library that understands the SSE protocol.

SSE Client

In Java, okhttp-eventsource is a practical choice. It is built on top of OkHttp and handles parsing, automatic reconnection, and event dispatch for you.

The first step is to implement a BackgroundEventHandler.

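import com.launchdarkly.eventsource.MessageEvent;
import com.launchdarkly.eventsource.background.BackgroundEventHandler;

public class SimpleEventHandler implements BackgroundEventHandler {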

  @Override
  public void onOpen() throws Exception {
    System.out.println("onOpen");
  }

  @Override
  public void onClosed() throws Exception {
    System.out.println("onClosed");
  }

  @Override
  public void onMessage(String event, MessageEvent messageEvent) throws Exception {
    System.out.println(messageEvent.getData());
  }

  @Override
  public void onComment(String comment) throws Exception {
    System.out.println("onComment");
  }

  @Override
  public void onError(Throwable t) {
    System.out.println("onError: " + t);
  }

}

SimpleEventHandler.java

onOpen and onClosed are called when the connection is established or closed. onComment is invoked for SSE comments, and onError is called when the client encounters an error.

onMessage receives every data event. The first parameter contains the event name. If the server does not specify an event name, the value is message.

Here is a stream with two events. The first arrives as message, and the second arrives as time.

data: 11

event: time
data: 1928192192
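
If a stream mixes several event names, one way to keep onMessage small is to route on the first parameter. The following is only a sketch: EventNameRouterSketch and its methods are hypothetical names, not part of okhttp-eventsource, and the idea is that onMessage would call dispatch(event, messageEvent.getData()).

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical helper for illustration only, not part of okhttp-eventsource.
final class EventNameRouterSketch {

  private final Map<String, Consumer<String>> handlers = new HashMap<>();

  // Registers a handler for one event name and returns this router for chaining.
  EventNameRouterSketch on(String eventName, Consumer<String> handler) {
    this.handlers.put(eventName, handler);
    return this;
  }

  // Passes the data to the handler registered for the event name.
  // Returns false when no handler is registered for that name.
  boolean dispatch(String eventName, String data) {
    Consumer<String> handler = this.handlers.get(eventName);
    if (handler == null) {
      return false;
    }
    handler.accept(data);
    return true;
  }

  public static void main(String[] args) {
    EventNameRouterSketch router = new EventNameRouterSketch()
        .on("message", data -> System.out.println("unnamed event: " + data))
        .on("time", data -> System.out.println("time event: " + data));

    // The two events from the stream above
    router.dispatch("message", "11");       // prints "unnamed event: 11"
    router.dispatch("time", "1928192192");  // prints "time event: 1928192192"
  }
}
```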

To connect to the stream, create the handler and a BackgroundEventSource.

  public static void main(String[] args) throws InterruptedException {
    BackgroundEventHandler eventHandler = new SimpleEventHandler();
    String url = "http://localhost:8080/memory";
    BackgroundEventSource.Builder builder = new BackgroundEventSource.Builder(eventHandler,
        new EventSource.Builder(URI.create(url)).retryDelay(3, TimeUnit.SECONDS));

    try (BackgroundEventSource eventSource = builder.build()) {
      eventSource.start();

      TimeUnit.MINUTES.sleep(10);
    }
  }

Simple.java

Example output:

onOpen
{"heap":175327208,"nonHeap":64556544,"ts":1519649749675}
{"heap":175327208,"nonHeap":64559880,"ts":1519649750675}
{"heap":175327208,"nonHeap":64612912,"ts":1519649751675}
{"heap":175327208,"nonHeap":64615616,"ts":1519649752676}

The builder in this example configures a base retry delay of three seconds.

new EventSource.Builder(URI.create(url)).retryDelay(3, TimeUnit.SECONDS)

The library applies backoff and jitter between reconnect attempts, which makes it a much better fit for a long-lived consumer than a plain HTTP reader.
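
To illustrate the general idea of backoff with jitter, here is a small sketch. It is not the library's actual implementation. The class name, the method, the doubling factor, the jitter range, and the 30-second cap are all assumptions chosen for this example. The delay ceiling doubles with every failed attempt until it reaches the cap, and the actual delay is picked at random between half the ceiling and the ceiling, so that many clients do not reconnect at the same moment.

```java
import java.util.Random;

// Hypothetical helper for illustration only, not the algorithm used by okhttp-eventsource.
final class BackoffSketch {

  // Computes the delay before reconnect attempt number `attempt` (1-based).
  // Assumes baseMillis and maxMillis are small enough that the shift cannot overflow.
  static long delayMillis(long baseMillis, long maxMillis, int attempt, Random random) {
    // Limit the exponent so that the shift below stays in a sane range.
    int exponent = Math.min(Math.max(attempt - 1, 0), 30);

    // Exponential backoff: base, 2 * base, 4 * base, ... capped at maxMillis.
    long ceiling = Math.min(maxMillis, baseMillis << exponent);

    // Jitter: pick a value from the upper half of the range [0, ceiling].
    long floor = ceiling / 2;
    return floor + (long) (random.nextDouble() * (ceiling - floor));
  }

  public static void main(String[] args) {
    Random random = new Random();
    // Base delay of three seconds as in the example above; the cap is an assumption.
    for (int attempt = 1; attempt <= 6; attempt++) {
      long delay = delayMillis(3_000, 30_000, attempt, random);
      System.out.println("attempt " + attempt + ": wait " + delay + " ms");
    }
  }
}
```

With these assumed values, the first delay is between 1.5 and 3 seconds, the second between 3 and 6 seconds, and from the fifth attempt on the delay stays between 15 and 30 seconds.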

Examples

For the following examples, I created a small subinterface that provides default implementations for every method except onMessage.

public interface DefaultEventHandler extends BackgroundEventHandler {

  @Override
  default void onOpen() throws Exception {
    // nothing here
  }

  @Override
  default void onClosed() throws Exception {
    // nothing here
  }

  @Override
  default void onComment(String comment) throws Exception {
    // nothing here
  }

  @Override
  default void onError(Throwable t) {
    // nothing here
  }

}

DefaultEventHandler.java

That keeps the handlers focused on the event payload.

The first example consumes the Wikimedia RecentChange stream. It publishes live changes from Wikimedia projects and is documented here: https://wikitech.wikimedia.org/wiki/EventStreams

The handler parses the JSON payload, ignores canary events, and prints the change type together with the page title.

public class WikimediaRecentChangeHandler implements DefaultEventHandler {

  @Override
  public void onMessage(String event, MessageEvent messageEvent) throws Exception {
    try (JsonReader jsonReader = Json
        .createReader(new StringReader(messageEvent.getData()))) {
      JsonObject jsonObject = jsonReader.readObject();

      JsonObject meta = jsonObject.getJsonObject("meta");
      if (meta != null && "canary".equals(meta.getString("domain", ""))) {
        return;
      }

      String title = jsonObject.getString("title", "");
      String changeType = jsonObject.getString("type", "unknown");
      System.out.println(changeType + " : " + title);
    }
  }

}

WikimediaRecentChangeHandler.java

The main class connects to the stream and registers the handler.

  public static void main(String[] args) throws InterruptedException {
    BackgroundEventHandler eventHandler = new WikimediaRecentChangeHandler();
    String url = "https://stream.wikimedia.org/v2/stream/recentchange";
    BackgroundEventSource.Builder builder = new BackgroundEventSource.Builder(eventHandler,
        new EventSource.Builder(URI.create(url)));

    try (BackgroundEventSource eventSource = builder.build()) {
      eventSource.start();

      TimeUnit.MINUTES.sleep(10);
    }
  }

WikimediaRecentChanges.java

Example output:

edit : Q123456
new : Example article
edit : File:Example.jpg
log : User:Example

The next example uses the Wikimedia revision-create stream. It prints the user, page title, and revision id for each new revision.

The handler:

public class WikimediaRevisionCreateHandler implements DefaultEventHandler {

  @Override
  public void onMessage(String event, MessageEvent messageEvent) throws Exception {
    try (JsonReader jsonReader = Json
        .createReader(new StringReader(messageEvent.getData()))) {
      JsonObject jsonObject = jsonReader.readObject();

      JsonObject meta = jsonObject.getJsonObject("meta");
      if (meta != null && "canary".equals(meta.getString("domain", ""))) {
        return;
      }

      JsonObject performer = jsonObject.getJsonObject("performer");
      String user = performer != null ? performer.getString("user_text", "unknown")
          : "unknown";
      String title = jsonObject.getString("page_title", "");
      long revisionId = jsonObject.getJsonNumber("rev_id").longValue();

      System.out.println(user + " -> " + title + " (rev " + revisionId + ")");
    }
  }

}

WikimediaRevisionCreateHandler.java

The main class:

  public static void main(String[] args) throws InterruptedException {
    BackgroundEventHandler eventHandler = new WikimediaRevisionCreateHandler();
    String url = "https://stream.wikimedia.org/v2/stream/mediawiki.revision-create";
    BackgroundEventSource.Builder builder = new BackgroundEventSource.Builder(eventHandler,
        new EventSource.Builder(URI.create(url)));

    try (BackgroundEventSource eventSource = builder.build()) {
      eventSource.start();

      TimeUnit.MINUTES.sleep(10);
    }
  }

WikimediaRevisionCreates.java

Example output:

ExampleUser -> Main Page (rev 123456789)
AnotherUser -> Talk:Example (rev 123456790)

You can find the source code for all examples on GitHub: https://github.com/ralscha/blog/tree/master/sse-client