从源码分析RocketMQ系列-start()方法详解

 2023-09-15 阅读 15 评论 0

摘要:导语   在之前的分析中主要介绍的是关于Producer 发送消息的逻辑,但是在实例代码中有一个操作是producer.start()方法,在Consumer中看到的方法是consumer.start(),那么这两个start()方法分别都做了什么样的操作。下面就来看看看这个start()方法为Prod

导语
  在之前的分析中主要介绍的是关于Producer 发送消息的逻辑,但是在实例代码中有一个操作是producer.start()方法,在Consumer中看到的方法是consumer.start(),那么这两个start()方法分别都做了什么样的操作。下面就来看看看这个start()方法为Producer和Consumer分别提供了什么样的操作?

文章目录

    • Producer方面
      • Producer start()方法的第一个case
      • this.defaultMQProducer.getDefaultMQProducerImpl().start(false); 方法研究
    • Consumer
      • this.defaultMQPushConsumerImpl.start() 方法分析
    • MQClientInstance 的start()方法
    • 日志分析
      • 启动Producer日志
      • Consumer日志分析
    • 总结

Producer方面

thinkphp源码分析,  首先进入方法之后先设置了Producer的Group,这个Group的信息是从Namespace上获取的。在RocketMQ消息传递的过程中,最重要的两个信息一个是Topic,另一个就是Group。Topic是为了标识消息的主题、Group则是标识消息的类别。这个方法是由org.apache.rocketmq.client.producer.DefaultMQProducer类提供的,会看到方法体中还调用了一个org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl 对象的start()方法。后续的操作是注入了一个寻迹的Dispatcher。


public void start() throws MQClientException {this.setProducerGroup(withNamespace(this.producerGroup));this.defaultMQProducerImpl.start();if (null != traceDispatcher) {try {traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());} catch (MQClientException e) {log.warn("trace dispatcher start failed ", e);}}
}

  通过上面代码的观察,会看到start()方法内部的start方法才是问题的关键。也就是下面这个方法,这个方法其中做了很多的初始化设置。下面就来分析一下这些初始化的操作都是什么?
在这里插入图片描述
  从上图中可以看到上来先进行的是服务状态的判断,那么这个服务状态是由谁来决定的呢?是由下面这段代码来决定的,从代码中来看是一个枚举类型并且标识了四种状态,分别是


private ServiceState serviceState = ServiceState.CREATE_JUST;public enum ServiceState {/*** Service just created,not start*/CREATE_JUST,/*** Service Running*/RUNNING,/*** Service shutdown*/SHUTDOWN_ALREADY,/*** Service Start failure*/START_FAILED;
}
  • CREATE_JUST:仅仅创建服务,但是并没有启动,也就是如果是这个状态的话,就需要有一个启动服务的操作。
  • RUNNING:服务运行中
  • SHUTDOWN_ALREADY :服务正确关闭
  • START_FAILED:服务启动失败,这个与之前的服务创建没有关系,只是表示服务的启动过程中出现问题。

Producer start()方法的第一个case

  从上面代码中就可以看到,start()方法中第一个Case是最为关键的一个Case,其他的Case都是在直接判断之后执行后续的代码,只有这个Case所做的事情是最多的,那么就来看看这个Case有那些操作。
在这里插入图片描述

  • 1、会看到上来第一步就来进行了服务状态的赋值,这个赋值是之前提到的启动失败。
  • 2、第二步进进行了一个配置检查,下图展示了这个配置检查其实就是对于Producer Group存在性做了Check
    在这里插入图片描述
  • 3、作为检查之后进入了如下的一个判断中,首先判断Producer Group是否是CLIENT_INNER_PRODUCER 标识的值,如果不是,就需要进入到判断中进行后续的操作

if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {this.defaultMQProducer.changeInstanceNameToPID();
}

这个操作是来自org.apache.rocketmq.client.ClientConfig类中的方法,从方法的实现来看是为实例设置了名称,并且这个名称是格式"pid@hostname",也就是进程ID好加上主机名的形式。

public void changeInstanceNameToPID() {if (this.instanceName.equals("DEFAULT")) {this.instanceName = String.valueOf(UtilAll.getPid());}
}
  • 4、第四步的操作,从下面代码可以看到这个操作其实也是一个链式操作。

this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook);

k8s 源码分析?getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook);
  这个方法实现类org.apache.rocketmq.client.impl.MQClientManager,也就是一个MQ的客户端管理器,通过这个管理器中的方法创建一个MQClient的实例。

getInstance()
   从这里会看到所调用的链式操作的方法其实是来自于同一个类。那么这样做在编码上有什么好处呢?


public static MQClientManager getInstance() {return instance;
}

  从类的本身来看,它没有继承任何类,也没有被任何类继承,单纯的就是做个一个MQ客户端的管理器这样的一个角色。那么就要怀疑它是不是单例的。这里简单的回顾一下单例模式的实现

  • 第一、有一个静态的无参构造器
  • 第二、暴露一个公共的实例接口
  • 第三、在内部有对应的该对象的初始化操作

public class MQClientManager {private final static InternalLogger log = ClientLogger.getLog();private static MQClientManager instance = new MQClientManager();private AtomicInteger factoryIndexGenerator = new AtomicInteger();private ConcurrentMap<String/* clientId */, MQClientInstance> factoryTable =new ConcurrentHashMap<String, MQClientInstance>();private MQClientManager() {}public static MQClientManager getInstance() {return instance;}public MQClientInstance getAndCreateMQClientInstance(final ClientConfig clientConfig) {return getAndCreateMQClientInstance(clientConfig, null);}public MQClientInstance getAndCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {String clientId = clientConfig.buildMQClientId();MQClientInstance instance = this.factoryTable.get(clientId);if (null == instance) {instance =new MQClientInstance(clientConfig.cloneClientConfig(),this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);if (prev != null) {instance = prev;log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId);} else {log.info("Created new MQClientInstance for clientId:[{}]", clientId);}}return instance;}public void removeClientFactory(final String clientId) {this.factoryTable.remove(clientId);}
}

  从上面代码中进行查找似乎是都满足单例模式的条件。但是既然是单例模式,就需要考虑到线程安全的问题了。在多线程场景下是否会有多个实例被创建。或者说它本来就是以单例这种形式去封装一个单线程安全的类。

start工作法、  从代码逻辑分析可以看到,上面代码只是对MQClientManager类进行了单实例的封装,也就是对管理类的封装,并未对所产生的MQClientInstance进行多线程操作,这里首先明确一点就是,这个Manager是不是多线程操作的?首先进入到start()方法之后所有的操作都是对一个Producer线程而言的,所以说在Producer自身来讲是唯一的,如果有多个进程中创建Producer,那么每个进程都有自己对应的MQClientManager。并且在这个进程内是唯一的,所以不会出现多线程的问题。而这个Producer所产生的调用可以有多个MQClient的实例。

  • 5、创建完成实例之后进行,注册操作,注册传入的参数是Group参数和当前对象。

boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);

  这里主要的一点就是MQProducerInner prev = this.producerTable.putIfAbsent(group, producer);而productTable是一个 private final ConcurrentMap<String/* group */, MQProducerInner> producerTable = new ConcurrentHashMap<String, MQProducerInner>(); 也就是说ConcurrentHashMap是在高并发场景下线程安全的HashMap。用这个数据结构来存储所属组的Producer。如果有的话就返回已经存在,如果没有的话就将内容放入并且返回注册成功标识。这里简单的提出一个问题,如果同一个组下面的存在Producer容量太大,是否可以正常提供服务?


public boolean registerProducer(final String group, final DefaultMQProducerImpl producer) {if (null == group || null == producer) {return false;}MQProducerInner prev = this.producerTable.putIfAbsent(group, producer);if (prev != null) {log.warn("the producer group[{}] exist already.", group);return false;}return true;
}
  • 6、对于TopicPublishInfo信息的存储

  毋庸置疑,这里的this.topicPublishInfoTable也是由ConcurrentHashMap来提供,只不过这个对象对于内容测操作看上去有点简单、并且放入Value的对象不是指定的某个对象而是一个完全全新的TopicPublishInfo对象,但是存入其中的Key确实被Topic标识。这里有人会问HashMap对于KV的存是有要求的,那么会不会出现问题呢?在大多数情况下要求是Topic是不一样的,但是也有一个配置,是让RocketMQ自己创建一个Topic。但是这种方式正在慢慢被抛弃。也就是说必须对于每类消息指定一个不一样的Topic,如果出现一样的,会给正常的业务逻辑有影响。这个影响是HashMap的性质而带来的。


this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());

  这个地方创建的Value是一个新的TopicPublishInfo,并没有指定某个确定的TopicPublishInfo对象,那么在后续的操作中会不会有修改的地方呢?这个问题留到后面。

js中有startwith方法吗。  到这里所有关于前面提到的Create操作就已经完成了,接下来的操作就是对于启动的操作。这个操作是通过参数传入的,这个参数默认是true,也就是启动。


if (startFactory) {mQClientFactory.start();
}

  上面提到的这个方法其实是来自于org.apache.rocketmq.client.impl.factory.MQClientInstance类,并且这里才对多线程进行了锁定操作。也就是说到这里才进行了一个真正的多线程安全操作。那么为什么在之前的地方都没有,而唯独到这里进行了这样的一个操作?


public void start() throws MQClientException {//同步锁定synchronized (this) {//判断服务状态switch (this.serviceState) {case CREATE_JUST:this.serviceState = ServiceState.START_FAILED;// If not specified,looking address from name serverif (null == this.clientConfig.getNamesrvAddr()) {this.mQClientAPIImpl.fetchNameServerAddr();}// Start request-response channel// 开启请求响应的Channelthis.mQClientAPIImpl.start();// Start various schedule tasks// 启动各种计划任务this.startScheduledTask();// Start pull service// 开始 Pull servicethis.pullMessageService.start();// Start rebalance service// 开启负载均衡服务this.rebalanceService.start();// Start push service// 开启 push 服务this.defaultMQProducer.getDefaultMQProducerImpl().start(false);log.info("the client factory [{}] start OK", this.clientId);this.serviceState = ServiceState.RUNNING;break;case RUNNING:break;case SHUTDOWN_ALREADY:break;case START_FAILED:throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);default:break;}}
}

  之前提到过的,对于MQClientManager是单例的,这个容易理解,但是对于这个单例的MQClientManager来说它产生的MQClientInstance可以有多个,那么在这个多个MQClientInstance启动的过程中就可能会出现线程安全问题。
  在这个启动方法中与SendMessage方法有关的最关键的一个地方就是下面这个地方,在之前分析Message发送的过程中,知道在发送Message会有Netty创建一个Connection。就直接使用对应标识的Channel了,也没有说这个Channel是如何进行开启的。这里就会告诉如何开启并且使用这个Channel的。

// Start request-response channel
// 开启请求响应的Channel
this.mQClientAPIImpl.start();

  在之前的SendMessage逻辑中提到过一个Channel的创建逻辑,那么这里的这个逻辑和之前的逻辑有什么区别和联系呢?细心的会发现,两个逻辑都来自于同一个类org.apache.rocketmq.remoting.netty.NettyRemotingClient,也就是说,这里start()在什么地方创建的Channel,就在哪个地方被使用了,并不是两套逻辑,之前我们看到的就是下面这个逻辑。


ChannelFuture channelFuture = this.bootstrap.connect(RemotingHelper.string2SocketAddress(addr));

源码、  这个Future之前说过,就是我们希望得到的数据对象。而这里正好就是给了我们这样一个对象,在之前提到的Netty的NIO操作。到这里也可以解释清除了。

this.defaultMQProducer.getDefaultMQProducerImpl().start(false); 方法研究

  首先这里是一个链式方法,在正常调用的时候start()方法传入的是true,这里将这个值变为了false,这个参数的传入唯一影响一个地方。org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl 这个类中的start方法中有如下的一个判断。就是说这里不需要再次进行实例方法start()的调用,这样会进入到一个死循环之中。这里这个方法也是被加上了 @Deprecated 注解。问什么需要有这样一个操作,就看下面的分析了


if (startFactory) {mQClientFactory.start();
}

Consumer

  前面分析了Producer的start()方法,下面就来看看Consumer的start方法,还是关注一个点this.defaultMQPushConsumerImpl.start();


@Override
public void start() throws MQClientException {//设置消费组setConsumerGroup(NamespaceUtil.wrapNamespace(this.getNamespace(), this.consumerGroup));//启动消费端this.defaultMQPushConsumerImpl.start();//判断跟踪调度器是否为空if (null != traceDispatcher) {try {//开启跟踪调度器traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());} catch (MQClientException e) {log.warn("trace dispatcher start failed ", e);}}
}

this.defaultMQPushConsumerImpl.start() 方法分析

  首先进入方法体本身会发现这个方法被synchronized 加锁了,为什么要进行synchronized操作,或者说这里的这个操作是不是就为了防止线程安全问题,为什么在Producer端使用的时候只是在org.apache.rocketmq.client.impl.factory.MQClientInstance类的start()方法内部进行了synchronized操作。带着问题来进行分析。因为其中的方法实现都是不一样的。会不会跟其中的一些操作有关系。
进入方法


public synchronized void start() throws MQClientException

start-up、第一步 输入日志分析
  还是来一步一步的分析这个方法,首先进入到第一个Case。也就是CREATE_JUST,会看到在日志中找到了如下的一条信息

INFO RocketmqClient - the consumer [please_rename_unique_group_name_4] start beginning. messageModel=CLUSTERING, isUnitMode=false

上面这段日志输入是由下面这段代码来实现的,给出的信息分别是Consumer的Group,Message模式、是否订阅组默认是False。

log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(),this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());

第二步 复制订阅信息修改实例名
  继续向下分析可以看到,拷贝的Subscription的信息,并且判断消息模式是否是MessageModel.CLUSTERING,那么来看看这里的消息模式

//拷贝订阅信息
this.copySubscription();
// 获取消息模式是否是 CLUSTERING
if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {this.defaultMQPushConsumer.changeInstanceNameToPID();
}

会看到这里有两种消息模式,一种是广播,一种集群模式,会看到日志中消息模式都是集群。继续往下跟之前的操作一样,会为这个实例创建一个对应的名字,规则 pid@hostname

public enum MessageModel {/*** broadcast*/BROADCASTING("BROADCASTING"),/*** clustering*/CLUSTERING("CLUSTERING");private String modeCN;MessageModel(String modeCN) {this.modeCN = modeCN;}public String getModeCN() {return modeCN;}
}

rocketmq顺序消息,第三步 获取实例
  会看到这里获取实例的方式与Producer获取实例操作是一样的,唯一的区别就是参数。

// 客户端管理器获取一个创建 defaultMQPushConsumer 实例
this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);

第四步 this.rebalanceImpl
  会看到这里集中进行了如下的一些操作


this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
//分配消息队列策略
this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
  • 第一 设置消费组
  • 第二 设置消息模式
  • 第三 设置分配消息队列策略
  • 第四 设置消费实例

第五步 设置PullMessage

//pull API 封装
this.pullAPIWrapper = new PullAPIWrapper(mQClientFactory,this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
//注册一个消息Filter的钩子
this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);

第六步 设置消息偏移量
  在这里会看到,如果没有得到消息的偏移量,这进行的处理是获取偏移量并直接指定偏移量,如果已经存在了偏移量,则需要根据消息模式来设置不同的消息偏移量,这个在官方的文档中有提到过相关的概念,有兴趣可以找找相关的文档。在广播模式下,偏移量信息是被持久化到本地的Store中的,也就是说不需要关心其他人的读取,只需要关系自己的位置就好了。在集群模式下,消息的读取位置是通过Broker中的位置来决定的。所以说每次读取的时候先要到Broker中去获取偏移量。
在这里插入图片描述

rocketmq源码解析,第七步 消息监听操作
  在完成位置偏移量查找之后,就需要进行偏移量的加载,加载完成之后就需要对消息进行监听操作。会看到这里给出了两种内监听模式,顺序模式、并发模式。对于这两种监听模式,对应的处理服务也是不一样的一个是ConsumeMessageOrderlyService,一个是ConsumeMessageConcurrentlyService,这两个类都实现了一个接口ConsumeMessageService,对于这两种模式,在这里不作为重点来说明,在后续的分享中作为重点来进行说明。
在这里插入图片描述

第八步 启动Consumer消息服务启动
  在完成上面这些操作之后,就需要开始启动。会看到根据不同的消息类型以及消息处理的模式,启动的不同的消息处理类。这个并不是确定的,而是在消息进行监听的时候才根据监听进行确定。这里只是启动了一个消息服务,并没有启动Consumer的实例。还要进行下一步。


//消费者端启动消息服务
this.consumeMessageService.start();

第九步 启动实例
  在启动之前现将之前准备好的所有的东西先注册到指定的group上,然后进行是否注册成功的判断如果注册成功就启动实例进行后续的pull Message操作,如果注册失败,则将监听服务停止,抛出异常。这里还会看到一个日志信息。
在这里插入图片描述
根据代码逻辑,在程序启动成功之后会进行下面这样一段日志的输入,这里确实也输出了

INFO RocketmqClient - the consumer [please_rename_unique_group_name_4] start OK.

MQClientInstance 的start()方法

  在Producer和Consumer方法中最后调用的都是这个方法,在上面分析Producer的时候对这个方法进行了一个简单的分析,这里就详细来看看这个方法。


public void start() throws MQClientException {//同步锁定synchronized (this) {//判断服务状态switch (this.serviceState) {case CREATE_JUST:this.serviceState = ServiceState.START_FAILED;// If not specified,looking address from name serverif (null == this.clientConfig.getNamesrvAddr()) {this.mQClientAPIImpl.fetchNameServerAddr();}// Start request-response channel// 开启请求响应的Channelthis.mQClientAPIImpl.start();// Start various schedule tasks// 启动各种计划任务this.startScheduledTask();// Start pull service// 开始 Pull servicethis.pullMessageService.start();// Start rebalance service// 开启负载均衡服务this.rebalanceService.start();// Start push servicethis.defaultMQProducer.getDefaultMQProducerImpl().start(false);log.info("the client factory [{}] start OK", this.clientId);this.serviceState = ServiceState.RUNNING;break;case RUNNING:break;case SHUTDOWN_ALREADY:break;case START_FAILED:throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);default:break;}}
}

rocketmq 启动?  详细解析内容已经在通过注释的形式写到了代码中,可以通过注释的方式进行查看,下面就从输入日志方式来看看具体进行了什么样的操作

日志分析

启动Producer日志

MQClientInstance 对象的创建


18:35:14,014 INFO RocketmqRemoting - name server address updated. NEW : [localhost:9876] , OLD: null18:35:14,014 INFO RocketmqClient - user specified name server address: localhost:987618:35:14,014 INFO RocketmqClient - Created a new client Instance, InstanceIndex:0, ClientID:169.254.151.216@5386, ClientConfig:ClientConfig [namesrvAddr=localhost:9876, clientIP=169.254.151.216, instanceName=5386, clientCallbackExecutorThreads=16, pollNameServerInterval=30000, heartbeatBrokerInterval=30000, persistConsumerOffsetInterval=5000, unitMode=false, unitName=null, vipChannelEnabled=false, useTLS=false, language=JAVA, namespace=null], ClientVersion:V4_5_2, SerializerType:JSON18:35:14,014 INFO RocketmqClient - Created new MQClientInstance for clientId:[169.254.151.216@5386]

  以上日志表示到MQClientManager 类调用getAndCreateMQClientInstance()方法创建一个MQClientInstance 对象,之前分析MQClientManager逻辑的时候有一个判断

 MQClientInstance instance = this.factoryTable.get(clientId);if (null == instance) {

  很显然从上面的日志中可以看到并没有跳过该判断,也就是说至少在博主启动的这种单一的场景中这个判断逻辑是被执行了。那么什么时候开始进入到MQClientManager的逻辑中,也就是是说在第一次 this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook); 这个逻辑的时候就创建了可这个对象,这个方法被org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl类的start()方法调用。

start方法和run方法。尝试开启线程

 18:35:14,014 INFO RocketmqCommon - Try to start service thread:PullMessageService started:false lastThread:null18:35:14,014 INFO RocketmqClient - PullMessageService service started18:35:14,014 INFO RocketmqCommon - Try to start service thread:RebalanceService started:false lastThread:null18:35:14,014 INFO RocketmqClient - RebalanceService service started18:35:14,014 INFO RocketmqClient - the producer [CLIENT_INNER_PRODUCER] start OK. sendMessageWithVIPChannel=false18:35:14,014 INFO RocketmqClient - the client factory [169.254.151.216@5386] start OK18:35:14,014 INFO RocketmqClient - the producer [please_rename_unique_group_name] start OK. sendMessageWithVIPChannel=false18:35:14,014 INFO RocketmqRemoting - new name server is chosen. OLD: null , NEW: localhost:9876. namesrvIndex = 252

this.pullMessageService.start();
  在MQClientInstance 的start()方法中有如下的一个调用。

this.pullMessageService.start();

  上面这个start()方法调用的日志就是第一行数据,源代码如下,会看到获取到了服务的名称、获取到服务是否开启,以及开启的本地线程。分别是PullMessageService、false、null。

log.info("Try to start service thread:{} started:{} lastThread:{}", getServiceName(), started.get(), thread);

  上面的start()方法进行了下面这样的操作

 stopped = false;this.thread = new Thread(this, getServiceName());this.thread.setDaemon(isDaemon);this.thread.start();

  既然是有了start() 就必须得有run()。方法内容如下

@Overridepublic void run() {log.info(this.getServiceName() + " service started");while (!this.isStopped()) {try {PullRequest pullRequest = this.pullRequestQueue.take();this.pullMessage(pullRequest);} catch (InterruptedException ignored) {} catch (Exception e) {log.error("Pull Message Service Run Method exception", e);}}log.info(this.getServiceName() + " service end");}

Consumer日志分析

  结合代码分析逻辑,既然Consumer和Producer调用的同样的类实例,那么他们创建MQClientInstance的逻辑都是一样的。所以从它的整个的日志逻辑与Consumer是大体一致的。

18:36:14,014 INFO RocketmqClient - user specified name server address: localhost:987618:36:14,014 INFO RocketmqRemoting - name server address updated. NEW : [localhost:9876] , OLD: null18:36:14,014 INFO RocketmqClient - Created a new client Instance, InstanceIndex:0, ClientID:169.254.151.216@5389, ClientConfig:ClientConfig [namesrvAddr=localhost:9876, clientIP=169.254.151.216, instanceName=5389, clientCallbackExecutorThreads=16, pollNameServerInterval=30000, heartbeatBrokerInterval=30000, persistConsumerOffsetInterval=5000, unitMode=false, unitName=null, vipChannelEnabled=false, useTLS=false, language=JAVA, namespace=null], ClientVersion:V4_5_2, SerializerType:JSON18:36:14,014 INFO RocketmqClient - Created new MQClientInstance for clientId:[169.254.151.216@5389]18:36:14,014 INFO RocketmqCommon - Try to start service thread:PullMessageService started:false lastThread:null18:36:14,014 INFO RocketmqClient - PullMessageService service started18:36:14,014 INFO RocketmqCommon - Try to start service thread:RebalanceService started:false lastThread:null18:36:14,014 INFO RocketmqClient - RebalanceService service started18:36:14,014 INFO RocketmqClient - the producer [CLIENT_INNER_PRODUCER] start OK. sendMessageWithVIPChannel=false18:36:14,014 INFO RocketmqClient - the client factory [169.254.151.216@5389] start OK18:36:14,014 INFO RocketmqClient - the consumer [please_rename_unique_group_name_4] start OK.18:36:14,014 INFO RocketmqRemoting - new name server is chosen. OLD: null , NEW: localhost:9876. namesrvIndex = 509

总结

  上面内容从两个start()方法入手,到最后他们归结于同一个start()方法,可以说是殊途同归,但是在这个过程中。他们进行了不同的消息封装,以及配置信息的获取。虽然到最后两个start()方法中还遗留了一些问题。但是在后面的分析中都会对其进行一一的解答。

版权声明:本站所有资料均为网友推荐收集整理而来,仅供学习和研究交流使用。

原文链接:https://hbdhgg.com/1/60406.html

发表评论:

本站为非赢利网站,部分文章来源或改编自互联网及其他公众平台,主要目的在于分享信息,版权归原作者所有,内容仅供读者参考,如有侵权请联系我们删除!

Copyright © 2022 匯編語言學習筆記 Inc. 保留所有权利。

底部版权信息