2018-09-26 10:22

  一、问题描述

  在工作中,经常会遇见使用异步的方式来发送事件,或者触发另外一个动作:经常用到的框架是MQ(分布式方式通知)。如果是同一个jvm里面通知的话,就可以使用EventBus。由于EventBus使用起来简单、便捷,因此,工作中会经常用到。深入理解该框架的原理就很有必要。

  二、框架解析

  2.1、组织结构

  eventbus的组织结构如下:

  


  eventbus主要有以下几部分组成:

  1、eventbus、asyncEventBus:事件发送器。

  2、event:事件承载单元。

  3、SubscriberRegistry:订阅者注册器,将订阅者注册到event上,即将有注解Subscribe的方法和event绑定起来。

  4、Dispatcher:事件分发器,将事件的订阅者调用来执行。

  5、Subscriber、SynchronizedSubscriber:订阅者,并发订阅还是同步订阅。

  2.2、运行原理

  1、eventbus是基于注册监听的方式来运行的,因此,首先需要将eventbus,然后才会有事件及监听者。新建eventbus或者AsyncEventBus的方式如下:

  EventBus eventBus = new EventBus();

  或者

  BlockingQueueworkQueue = new LinkedBlockingQueue<>(20);

  ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 20,

  30, TimeUnit.SECONDS, workQueue);

  AsyncEventBus asyncEventBus = new AsyncEventBus(executor);

  2、注册监听者。

  eventBus.register(eventListener);

  底层就是将类eventListener中所有注解有Subscribe的方法与其Event对放在一个map中(一个event可以对应多个Subscribe的方法)。实现如下:

  void register(Object listener) {

  Multimap<class, Subscriber> listenerMethods = findAllSubscribers(listener);

  for (Entry<class, Collection> entry : listenerMethods.asMap().entrySet()) {

  Class eventType = entry.getKey();

  CollectioneventMethodsInListener = entry.getValue();

  CopyOnWriteArraySeteventSubscribers = subscribers.get(eventType);

  if (eventSubscribers == null) {

  CopyOnWriteArraySetnewSet = new CopyOnWriteArraySet<>();

  eventSubscribers =

  MoreObjects.firstNonNull(subscribers.putIfAbsent(eventType, newSet), newSet);

  }

  eventSubscribers.addAll(eventMethodsInListener);

  }

  }

  3、事件发送:执行指定事件类型的订阅者(包含了method),从订阅者中获取指定事件的订阅者,然后按照规则(同步、异步)执行指定的方法。

  public void post(Object event) {

  IteratoreventSubscribers = subscribers.getSubscribers(event);

  if (eventSubscribers.hasNext()) {

  dispatcher.dispatch(event, eventSubscribers);

  } else if (!(event instanceof DeadEvent)) {

  // the event had no subscribers and was not itself a DeadEvent

  post(new DeadEvent(this, event));

  }

  }

  上述代码说明,如果事件没有监听者,就当作死亡事件来对待。

  /** Dispatches {@code event} to this subscriber using the proper executor. */

  final void dispatchEvent(final Object event) {

  executor.execute(

  new Runnable() {

  @Override

  public void run() {

  try {

  invokeSubscriberMethod(event);

  } catch (InvocationTargetException e) {

  bus.handleSubscriberException(e.getCause(), context(event));

  }

  }

  });

  }

  void invokeSubscriberMethod(Object event) throws InvocationTargetException {

  try {

  method.invoke(target, checkNotNull(event));

  } catch (IllegalArgumentException e) {

  throw new Error("Method rejected target/argument: " + event, e);

  } catch (IllegalAccessException e) {

  throw new Error("Method became inaccessible: " + event, e);

  } catch (InvocationTargetException e) {

  if (e.getCause() instanceof Error) {

  throw (Error) e.getCause();

  }

  throw e;

  }

  }

  这里就说明,最后就是被订阅的方法被调用。

  4、EventBus与AsyncEventBus的区别

  从字面上看,AsyncEventBus是异步的EventBus,那么EventBus应该就是同步的了。EventBus的executor为MoreExecutors.directExecutor(),其实现如下:

  public static Executor directExecutor() {

  return DirectExecutor.INSTANCE;

  }

  /** See {@link #directExecutor} for behavioral notes. */

  private enum DirectExecutor implements Executor {

  INSTANCE;

  @Override

  public void execute(Runnable command) {

  command.run();

  }

  @Override

  public String toString() {

  return "MoreExecutors.directExecutor()";

  }

  }

  其execute方法直接执行线程的run方法,即同步调用run方法执行。EventBus的dispatcher为PerThreadQueuedDispatcher。其dispatch方法如下:

  @Override

  void dispatch(Object event, Iteratorsubscribers) {

  checkNotNull(event);

  checkNotNull(subscribers);

  Queue<:Event> queueForThread = queue.get();

  queueForThread.offer(new Event(event, subscribers));

  if (!dispatching.get()) {

  dispatching.set(true);

  try {

  Event nextEvent;

  while ((nextEvent = queueForThread.poll()) != null) {

  while (nextEvent.subscribers.hasNext()) {

  nextEvent.subscribers.next().dispatchEvent(nextEvent.event);

  }

  }

  } finally {

  dispatching.remove();

  queue.remove();

  }

  }

  }

  dispatchEvent的实现如下:

  final void dispatchEvent(final Object event) {

  executor.execute(

  new Runnable() {

  @Override

  public void run() {

  try {

  invokeSubscriberMethod(event);

  } catch (InvocationTargetException e) {

  bus.handleSubscriberException(e.getCause(), context(event));

  }

  }

  });

  }

  因此,整个执行过程如下:

  


  整个过程都是同步方式执行,因此,EventBus是同步的。

  AsyncEventBus的dispatcher为LegacyAsyncDispatcher,executor为自己指定的线程池。运行流程如下:

  


  虚线为线程池异步调度,因此,AsyncEventBus为异步方式。

  5、AllowConcurrentEvents的作用

  它所在的代码为:

  static Subscriber create(EventBus bus, Object listener, Method method) {

  return isDeclaredThreadSafe(method)

  ? new Subscriber(bus, listener, method)

  : new SynchronizedSubscriber(bus, listener, method);

  }

  private static boolean isDeclaredThreadSafe(Method method) {

  return method.getAnnotation(AllowConcurrentEvents.class) != null;

  }

  即如果订阅者方法上有注解AllowConcurrentEvents,则返回Subscriber,否则,返回SynchronizedSubscriber。SynchronizedSubscriber的字面意思为同步订阅者,它的实现代码为:

  @Override

  void invokeSubscriberMethod(Object event) throws InvocationTargetException {

  synchronized (this) {

  super.invokeSubscriberMethod(event);

  }

  }

  即没有使用注解AllowConcurrentEvents的订阅者,在并发环境中,都是串行执行。这在高并发环境中,会严重影响性能。

  三、使用案例

  3.1、eventbus定义

  @Configuration

  public class ConfigBean {

  @Bean

  public EventBus executorService() {

  BlockingQueueworkQueue = new LinkedBlockingQueue<>(20);

  ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 20,

  30, TimeUnit.SECONDS, workQueue);

  return new AsyncEventBus(executor);

  }

  }

  3.2、注册与事件发送

  @Service

  public class TestService implements InitializingBean {

  @Autowired

  private EventListener eventListener ;

  @Autowired

  private EventBus eventBus ;

  public void postEvent(){

  eventBus.post(new LoginEvent("iwill","123456"));

  }

  @Override

  public void afterPropertiesSet() throws Exception {

  eventBus.register(eventListener);

  }

  }

  3.3、订阅者定义

  package com.iwill.eventBus.listener;

  import com.google.common.eventbus.Subscribe;

  import com.iwill.eventBus.event.LoginEvent;

  import com.iwill.eventBus.event.RegisterEvent;

  import org.springframework.stereotype.Component;

  @Component

  public class EventListener {

  @Subscribe

  public void subscribeLoginEvent1(LoginEvent event){

  System.out.println("method 1 : receive login event ");

  }

  @Subscribe

  public void subscribeLoginEvent2(LoginEvent event){

  System.out.println("method 2 : receive login event ");

  }

  @Subscribe

  public void subscribeRegisterEvent(RegisterEvent event){

  try{

  Thread.sleep(10000L);

  }catch (Exception exp){

  exp.printStackTrace();

  }

  System.out.println("method : receive register event ");

  }

  }

  四、注意事项

  1、在高并发的环境下使用AsyncEventBus时,发送事件可能会出现异常,因为它使用的线程池,当线程池的线程不够用时,会拒绝接收任务,就会执行线程池的拒绝策略,如果需要关注是否提交事件成功,就需要将线程池的拒绝策略设为抛出异常,并且try-catch来捕获异常。如下:

  try {

  eventBus.post(new LoginEvent("iwill", "123456"));

  }catch (Exception exp){

  //TODO 落表或者其他处理

  }

  2、本文用到的guava版本如下:

  

  com.google.guava

  guava

  26.0-jre

  原文出处:https://my.oschina.net/yangjianzhou/blog/2208677

评论