计算机系统应用教程网站

网站首页 > 技术文章 正文

SpringBoot RabbitMQ消费者源码解析

btikc 2024-09-04 03:11:35 技术文章 14 ℃ 0 评论

带着问题分析,从Spring RabbitMQ消费者启动,到接收消息和执行消费逻辑,一步步解析其内部实现。

1. 消费者如何启动过程?

1.1 启动配置类

创建RabbitListenerAnnotationBeanPostProcessor

@Configuration
public class RabbitBootstrapConfiguration {

	@Bean(name = RabbitListenerConfigUtils.RABBIT_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)
	@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
	public RabbitListenerAnnotationBeanPostProcessor rabbitListenerAnnotationProcessor() {
		return new RabbitListenerAnnotationBeanPostProcessor();
	}
.....
}

1.2 创建消费者核心逻辑

核心逻辑在RabbitListenerAnnotationBeanPostProcessor,在Spring Bean初始化过程中执行。 对于每个消息监听都会创建对应的MessageListenerContainer(默认实现为SimpleMessageListenerContainer)

// 通过BeanPostProcessor在Bean创建后,创建消息监听器
public class RabbitListenerAnnotationBeanPostProcessor
		implements BeanPostProcessor, Ordered, BeanFactoryAware, BeanClassLoaderAware, EnvironmentAware,
			SmartInitializingSingleton {
  ......
@Override
public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException {
Class<?> targetClass = AopUtils.getTargetClass(bean);
// 通过反射获取@RabbitListener修饰的方法
final TypeMetadata metadata = this.typeCache.computeIfAbsent(targetClass, this::buildMetadata);
for (ListenerMethod lm : metadata.listenerMethods) {
  for (RabbitListener rabbitListener : lm.annotations) {
	// 创建MethodRabbitListenerEndpoint,并注册到RabbitListenerEndpointRegistrar
	processAmqpListener(rabbitListener, lm.method, bean, beanName);
  }
}
if (metadata.handlerMethods.length > 0) {
  processMultiMethodListeners(metadata.classAnnotations, metadata.handlerMethods, bean, beanName);
}
return bean;
}

protected void processAmqpListener(RabbitListener rabbitListener, Method method, Object bean, String beanName) {
	Method methodToUse = checkProxy(method, bean);
	MethodRabbitListenerEndpoint endpoint = new MethodRabbitListenerEndpoint();
	endpoint.setMethod(methodToUse);
	processListener(endpoint, rabbitListener, bean, methodToUse, beanName);
}
// 创建RabbitMQ消费者核心逻辑
protected void processListener(MethodRabbitListenerEndpoint endpoint, RabbitListener rabbitListener, Object bean,
		Object adminTarget, String beanName) {
	endpoint.setBean(bean);
	endpoint.setMessageHandlerMethodFactory(this.messageHandlerMethodFactory);
	endpoint.setId(getEndpointId(rabbitListener));
	// resolveQueues方法会处理创建队列的工作
	endpoint.setQueueNames(resolveQueues(rabbitListener));
	.......
// registerEndpoint()里核心创建MessageListenerContainer,其默认实现是SimpleMessageListenerContainer
this.registrar.registerEndpoint(endpoint, factory);
}
 ......
}

1.3 PS: BeanPostPorcessor如何被Spring处理?

大家都熟悉的Spring Bean初始化流程,但还是唠叨一下

调用链路:getBean -> doGetBean -> createBean -> initializeBean ->applyBeanPostProcessorsBeforeInitialization -> applyBeanPostProcessorsAfterInitialization

public abstract class AbstractAutowireCapableBeanFactory extends AbstractBeanFactory
		implements AutowireCapableBeanFactory{
	// 创建一个Bean实例对象,应用post-processors
protected Object createBean(String beanName, RootBeanDefinition mbd, Object[] args) throws BeanCreationException {
	// 各种准备工作
	......
   // 最后调用doCreateBean
  Object beanInstance = doCreateBean(beanName, mbdToUse, args);
  if (logger.isDebugEnabled()) {
	logger.debug("Finished creating instance of bean '" + beanName + "'");
  }
  return beanInstance;
}    

protected Object doCreateBean(final String beanName, final RootBeanDefinition mbd, final Object[] args)
		throws BeanCreationException {
  ......
// Initialize the bean instance.
	Object exposedObject = bean;
	try {
		populateBean(beanName, mbd, instanceWrapper);
		if (exposedObject != null) {
	// 调用initializeBean
			exposedObject = initializeBean(beanName, exposedObject, mbd);
		}
	}
	catch (Throwable ex) {
  .....
	}
}
 
 // 初始化Bean实例
protected Object initializeBean(final String beanName, final Object bean, RootBeanDefinition mbd) {
  ......
  if (mbd == null || !mbd.isSynthetic()) {
	wrappedBean = applyBeanPostProcessorsBeforeInitialization(wrappedBean, beanName);
  }
  try {
	invokeInitMethods(beanName, wrappedBean, mbd);
  }
  catch (Throwable ex) {
	throw new BeanCreationException(
		(mbd != null ? mbd.getResourceDescription() : null),
		beanName, "Invocation of init method failed", ex);
  }
  if (mbd == null || !mbd.isSynthetic()) {
	wrappedBean = applyBeanPostProcessorsAfterInitialization(wrappedBean, beanName);
  } 
  return wrappedBean;
} 
}

2. RabbitMQ消息如何被消费

2.1 SimpleMessageListenerContainer

上面说了消费者启动会创建SimpleMessageListenerContainer,它启动时会创建一个AsyncMessageProcessingConsumer内部类的对象(实现了Runnable接口,核心属性是BlockingQueueConsumer),AsyncMessageProcessingConsumer的run()通过while循环不断接收消息并调用我们使用@RabbitListener修饰的方法实现的消费逻辑。

@Override
protected void doStart() throws Exception {
	......
	super.doStart();
	synchronized (this.consumersMonitor) {
		if (this.consumers != null) {
			throw new IllegalStateException("A stopped container should not have consumers");
		}
		// 根据配置的并发数创建对应数量BlockingQueueConsumer 
		int newConsumers = initializeConsumers();
	......
		Set<AsyncMessageProcessingConsumer> processors = new HashSet<AsyncMessageProcessingConsumer>();
		for (BlockingQueueConsumer consumer : this.consumers) {
			AsyncMessageProcessingConsumer processor = new AsyncMessageProcessingConsumer(consumer);
			processors.add(processor);
			// 执行AsyncMessageProcessingConsumer,轮询调用获取队列里的消息并执行消费逻辑
			getTaskExecutor().execute(processor);
			if (getApplicationEventPublisher() != null) {
				getApplicationEventPublisher().publishEvent(new AsyncConsumerStartedEvent(this, consumer));
			}
		}
		for (AsyncMessageProcessingConsumer processor : processors) {
			FatalListenerStartupException startupException = processor.getStartupException();
			if (startupException != null) {
				throw new AmqpIllegalStateException("Fatal exception on listener startup", startupException);
			}
		}
	}
}

2.2 BlockingQueueConsumer

BlockingQueueConsumer扮演一个解耦消息接收和消息消费的角色,一方面负责承接Channel接收的消息并压入BlockingQueue<Delivery> queue,另一方面被AsyncMessageProcessingConsumer轮询调用获取队列里的消息并执行消费逻辑。

// 从队列中获取消息
public Message nextMessage(long timeout) throws InterruptedException, ShutdownSignalException {
......
Message message = handle(this.queue.poll(timeout, TimeUnit.MILLISECONDS));
if (message == null && this.cancelled.get()) {
	throw new ConsumerCancelledException();
}
return message;
}

@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
		throws IOException {
	......
	try {
	// 如果BlockingQueueConsumer已被标记为停止,调用offer将消息入队,如果队列满了会马上返回false
	if (BlockingQueueConsumer.this.abortStarted > 0) {
		//如果offer失败,发送basic.nack命令通知服务端消息没有消费成功,然后发送basic.cancel命令通知服务端取消订阅,服务端不再发送消息到该消费者
		if (!BlockingQueueConsumer.this.queue.offer(
				new Delivery(consumerTag, envelope, properties, body, this.queue),
				BlockingQueueConsumer.this.shutdownTimeout, TimeUnit.MILLISECONDS)) {

			RabbitUtils.setPhysicalCloseRequired(getChannel(), true);
			// Defensive - should never happen
			BlockingQueueConsumer.this.queue.clear();
			getChannel().basicNack(envelope.getDeliveryTag(), true, true);
			getChannel().basicCancel(consumerTag);
			try {
				getChannel().close();
			}
			catch (TimeoutException e) {
				// no-op
			}
		}
	}
	else {
	// 如果BlockingQueueConsumer没有标记为停止,调用put入队,如果队列空间满了则会一直等待直到空间可用
		BlockingQueueConsumer.this.queue
				.put(new Delivery(consumerTag, envelope, properties, body, this.queue));
	}
	}
	catch (InterruptedException e) {
		Thread.currentThread().interrupt();
	}
}

注:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
    <version>2.1.0.RELEASE</version>
</dependency>

本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

最近发表
标签列表