Im vorherigen Blogpost haben wir gesehen wie man ein einfaches Pub/Sub System mit den Bordmittel von Spring umsetzen kann.
Seit Version 10.0 hat die Google Guava Library auch Support um ein einfaches Publish-Subscribe System aufzubauen. Das folgende Beispiel zeigt eine Möglichkeit wie man dies in einer Spring Applikation einsetzen kann. Die Pub/Sub Klassen der Google Guava Library haben ein paar kleine Vorteile gegenüber einer reinen Spring Lösung die im folgenden beschrieben werden.
Zentrales Element ist die Klasse EventBus. Dies ist der Vermittler der die Messages von einem Publisher entgegennimmt und an interessierte Subscriber weiter leitet. Der EventBus wird sinnvollerweise als Singleton instanziiert und kann sehr einfach mit Spring verwaltet werden.
|
1 2 3 4 5 6 7 8 |
@Configuration public class SpringConfig { @Bean public EventBus eventBus() { EventBus eventBus = new EventBus(); return eventBus; } } |
Eine Applikation kann aber auch mehrere EventBus Instanzen besitzen um damit beispielsweise verschiedene Bereiche/Module voneinander abzugrenzen.
Wie mit der Spring-Lösung benötigen wir auch hier Objekte die als Events vom Publisher zum Subscriber gesendet werden. Im Gegensatz zur reinen Spring-Lösung kann der EventBus von Guava jedes beliebige Objekt als Event verarbeiten. Es muss kein Interface implementiert oder eine Superklasse abgeleitet werden.
|
1 2 3 4 5 6 7 8 9 10 11 |
public class MsgEvent { private String message; public MsgEvent(String message) { this.message = message; } public String getMessage() { return message; } } |
Der Publisher benötigt eine Referenz des EventBus damit er Events versenden kann. Mit Aufruf der post Methode wird das Event Objekt versendet.
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
@Service public class Publisher { private EventBus eventBus; @Autowired public Publisher(EventBus eventBus) { this.eventBus = eventBus; } public void publishMsgEvent(String message) { eventBus.post(new MsgEvent(message)); } } |
Der Subscriber ist in diesem Beispiel auch ein gewöhnliches Spring Managed Bean. Es muss kein Interface implementiert werden, aber dafür muss für jedes Event das man verarbeiten möchte eine Methode erstellt werden die mit der Annotation @Subscribe gekennzeichnet ist. Die Methode muss public sein und als einzigen Parameter das gewünschte Event Objekt besitzen.
|
1 2 3 4 5 6 7 |
@Service public class Subscriber { @Subscribe public void handleMsgEvent(MsgEvent event) { System.out.println("MsgEvent: " + event.getMessage()); } } |
Ein Subscriber ist nicht auf ein Event beschränkt sondern kann mehrere verschiedene Events verarbeiten in dem für jedes gewünschte Event eine Methode implementiert wird.
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
@Service public class Subscriber { @Subscribe public void handleMsgEvent(MsgEvent event) { ... } @Subscribe public void handleSpecialMsgEvent(SpecialMsgEvent event) { ... } ... } |
Als letzten Schritt müssen sich alle Subscriber beim EventBus registrieren. Dies geschieht mit der Methode EventBus.register. Wir könnten die EventBus Instanz in unseren Subscriber injizieren und dann zum Beispiel im Konstruktor oder einer @PostConstruct Methode eventBus.register(this) aufrufen. Bei vielen verschiedenen Subscriber Klassen kann dies umständlich werden oder sogar vergessen werden.
Da in diesem Beispiel die Subscriber als Spring Beans implementiert sind können wir stattdessen einen BeanPostProcessor implementieren der die Registrierung übernimmt. Die Methoden des BeanPostProcessor werden für jedes registrierte Bean des ApplicationContext aufgerufen. In der Methode postProcessAfterInitialization untersucht der Processor ob eine Methode mit der Annotation @Subscribe gekennzeichnet ist. Wenn ja wird das Bean im EventBus registriert.
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 |
@Service public class EventBusSubscriberRegisterar implements BeanPostProcessor { private EventBus eventBus; @Autowired public EventBusSubscriberRegisterar(EventBus eventBus) { this.eventBus = eventBus; } @Override public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException { return bean; } @Override public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException { for (Method method : bean.getClass().getMethods()) { if (method.getAnnotation(Subscribe.class) != null) { eventBus.register(bean); break; } } return bean; } } |
Damit ist das Pub/Sub System einsatzbereit und ein Aufruf der Publisher.publishMsgEvent(…) löst den Aufruf des Listeners aus.
Hierarchische Events
Ein Feature des Guava EventBuses ist das die Event Objekte hierarchisch sind. Eine Subscriber Methode erhält nicht nur die Events des spezifizerten Parametertyps sondern auch von allen Subklassen davon. Nehmen wir als Beispiel die folgenden beiden Event Objekte.
|
1 2 3 4 5 6 7 |
public class MsgEvent { ... } public class SpecialMsgEvent extends MsgEvent { ... } |
Ein Subscriber kann Messages vom Typ SpecialMsgEvent verarbeiten indem er folgende Methode implementiert. Diese Methode wird nur dann aufgerufen wenn ein Publisher ein SpecialMsgEvent versendet.
|
1 2 3 4 |
@Subscribe public void handleSpecialMsgEvent(SpecialMsgEvent event) { ... } |
Folgende Methode mit MsgEvent als Parameter wird dagegen immer dann aufgerufen wenn ein MsgEvent oder ein SpecialMsgEvent verschickt wird, da SpecialMsgEvent eine Subklasse von MsgEvent ist.
|
1 2 3 4 |
@Subscribe public void handleMsgEvent(MsgEvent event) { ... } |
Es ist daher möglich eine Methode mit Object als Parameter zu implementieren. Diese Methode wird für jedes versendete Event aufgerufen, da jedes Objekt in Java eine Subklasses von Object ist.
|
1 2 3 4 |
@Subscribe public void handleAllEvents(Object event) { ... } |
DeadEvent
Ein nützliches Feature während der Entwicklung ist das DeadEvent. Eine Subscribe Methode mit DeadEvent als Parameter wird immer dann aufgerufen wenn für ein Event keine eigene Subscribe Methode implementiert wurde.
|
1 2 3 4 5 |
@Subscribe public void handleDeadEvents(DeadEvent deadEvent) { System.out.print("Event without a subscriber: "); System.out.println(deadEvent.getEvent()); } |
Damit findet man recht einfach heraus ob unnütze Events versendet werden oder ob eine Subscribe Methode vergessen wurde zu implementieren.
Asynchrones Eventhandling
Gleich wie bei der reinen Spring-Lösung ruft der EventBus die Subscribe Methoden im gleichen Thread auf wie der Aufruf der eventBus.post Methode. Auch hier kann ein Subscriber den Ablauf blockieren indem umfangreiche Berechnungen gestartet werden. Um das zu verhindern benutzt man anstatt EventBus die Klasse AsyncEventBus. Diese Klasse ruft die Subscribe Methoden in eigenen Threads auf. Dem AsyncEventBus Konstruktor muss ein Executor übergeben werden. Im folgenden ein Beispiel mit einem FixedThreadPool Executor mit 10 Threads. Damit ist es dem EventBus möglich 10 Listeners gleichzeitig in unterschiedlichen Threads aufzurufen.
|
1 2 3 4 5 |
@Bean public EventBus eventBus() { AsyncEventBus asyncEventBus = new AsyncEventBus(Executors.newFixedThreadPool(10)); return asyncEventBus; } |
Thread-Safe Handlers
Ein weiteres Feature ist die Möglichkeit Event Handler Methoden als Thread-Safe zu kennzeichen.
|
1 2 3 4 5 |
@Subscribe @AllowConcurrentEvents public void handleMsgEvent(MsgEvent event) { ... } |
Ist die Methode nur mit @Subscribe gekennzeichnet dann wird sie vom EventBus mit einem synchronized Block umschlossen und kann dadurch nicht gleichzeitig mehrfach ausgeführt werden. Die Annotation @AllowConcurrentEvents entfernt diesen synchronized Block und ermöglicht es mehreren Threads die Handler Methode gleichzeitig aufzurufen und auszuführen.
Sourcecode dieser Beispiel: https://github.com/ralscha/playground/tree/master/eventbus/src/main/java/ch/rasc/pubsub/guava
Habt ihr schon was von MBassador gehört –> https://github.com/bennidi/mbassador. Funktioniert ähnlich wie Guava’s Bus aber bietet mehr Funktionen und ist um einiges schneller. Für uns war entscheidend, dass Weak References für die Listener benutzt werden. Damit war es ganz einfach MBassador in unserem Spring basierten System einzubauen. Bisher läuft’s super.
Danke für den Link. Die Library kannte ich bisher nicht, sieht sehr interessant aus.