irpas技术客

zookeeper源码分析之集群模式服务端_enterpc

未知 3261

集群模式下启动所有的 ZK 节点启动??都是 QuorumPeerMain 类的 main ?法。 main ?法加载配置?件以后,最终会调?到 QuorumPeer 的 start ?法,来看下:

?

?

public void runFromConfig(QuorumPeerConfig config) throws IOException, AdminServerException { try { ManagedUtil.registerLog4jMBeans(); } catch (JMException e) { LOG.warn("Unable to register log4j JMX control", e); } LOG.info("Starting quorum peer"); try { ServerCnxnFactory cnxnFactory = null; ServerCnxnFactory secureCnxnFactory = null; if (config.getClientPortAddress() != null) { //创建ServerCnxnFactory cnxnFactory = ServerCnxnFactory.createFactory(); //初始化ServerCnxnFactory,配置客户端连接端口 cnxnFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns(), false); } if (config.getSecureClientPortAddress() != null) { //配置安全连接端口 secureCnxnFactory = ServerCnxnFactory.createFactory(); secureCnxnFactory.configure(config.getSecureClientPortAddress(), config.getMaxClientCnxns(), true); } //--------------------初始化当前ZK服务节点的配置---------------- // 设置数据和快照操作 数据管理器 quorumPeer = getQuorumPeer(); //创建FileTxnSnapLog quorumPeer.setTxnFactory(new FileTxnSnapLog( config.getDataLogDir(), config.getDataDir())); quorumPeer.enableLocalSessions(config.areLocalSessionsEnabled()); quorumPeer.enableLocalSessionsUpgrading( config.isLocalSessionsUpgradingEnabled()); //quorumPeer.setQuorumPeers(config.getAllMembers()); //选举类型 quorumPeer.setElectionType(config.getElectionAlg()); //server ID quorumPeer.setMyid(config.getServerId()); quorumPeer.setTickTime(config.getTickTime()); quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout()); quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout()); quorumPeer.setInitLimit(config.getInitLimit()); quorumPeer.setSyncLimit(config.getSyncLimit()); quorumPeer.setConfigFileName(config.getConfigFilename()); // 设置ZK的节点数据库:管理zookeeper的所有会话记录以及DataTree和事务日志的存储 quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory())); quorumPeer.setQuorumVerifier(config.getQuorumVerifier(), false); if (config.getLastSeenQuorumVerifier()!=null) { quorumPeer.setLastSeenQuorumVerifier(config.getLastSeenQuorumVerifier(), false); } //初始化zk数据库 quorumPeer.initConfigInZKDatabase(); quorumPeer.setCnxnFactory(cnxnFactory); quorumPeer.setSecureCnxnFactory(secureCnxnFactory); quorumPeer.setLearnerType(config.getPeerType()); quorumPeer.setSyncEnabled(config.getSyncEnabled()); quorumPeer.setQuorumListenOnAllIPs(config.getQuorumListenOnAllIPs()); // sets quorum sasl authentication configurations quorumPeer.setQuorumSaslEnabled(config.quorumEnableSasl); if(quorumPeer.isQuorumSaslAuthEnabled()){ quorumPeer.setQuorumServerSaslRequired(config.quorumServerRequireSasl); quorumPeer.setQuorumLearnerSaslRequired(config.quorumLearnerRequireSasl); quorumPeer.setQuorumServicePrincipal(config.quorumServicePrincipal); quorumPeer.setQuorumServerLoginContext(config.quorumServerLoginContext); quorumPeer.setQuorumLearnerLoginContext(config.quorumLearnerLoginContext); } quorumPeer.setQuorumCnxnThreadsSize(config.quorumCnxnThreadsSize); //--------------初始化当前zk服务节点的配置 quorumPeer.initialize(); //重点 quorumPeer.start(); quorumPeer.join(); } catch (InterruptedException e) { // warn, but generally this is ok LOG.warn("Quorum Peer interrupted", e); } }

?

?* 选举前的准备工作

?

??* 开启选举流程

重新回到QuorumPeer类的start方法,执行super.start()

?QuorumPeer是一个线程类,我们要找到重写其父类的run方法来查看选举执行逻辑

?

@Override public void run() { updateThreadName(); LOG.debug("Starting quorum peer"); try { jmxQuorumBean = new QuorumBean(this); MBeanRegistry.getInstance().register(jmxQuorumBean, null); for(QuorumServer s: getView().values()){ ZKMBeanInfo p; if (getId() == s.id) { p = jmxLocalPeerBean = new LocalPeerBean(this); try { MBeanRegistry.getInstance().register(p, jmxQuorumBean); } catch (Exception e) { LOG.warn("Failed to register with JMX", e); jmxLocalPeerBean = null; } } else { RemotePeerBean rBean = new RemotePeerBean(s); try { MBeanRegistry.getInstance().register(rBean, jmxQuorumBean); jmxRemotePeerBean.put(s.id, rBean); } catch (Exception e) { LOG.warn("Failed to register with JMX", e); } } } } catch (Exception e) { LOG.warn("Failed to register with JMX", e); jmxQuorumBean = null; } try { /* * Main loop */ while (running) { //根据当前节点的状态执行不同流程 switch (getPeerState()) { case LOOKING: LOG.info("LOOKING"); if (Boolean.getBoolean("readonlymode.enabled")) { LOG.info("Attempting to start ReadOnlyZooKeeperServer"); // Create read-only server but don't start it immediately final ReadOnlyZooKeeperServer roZk = new ReadOnlyZooKeeperServer(logFactory, this, this.zkDb); // Instead of starting roZk immediately, wait some grace // period before we decide we're partitioned. // // Thread is used here because otherwise it would require // changes in each of election strategy classes which is // unnecessary code coupling. Thread roZkMgr = new Thread() { public void run() { try { // lower-bound grace period to 2 secs sleep(Math.max(2000, tickTime)); if (ServerState.LOOKING.equals(getPeerState())) { roZk.startup(); } } catch (InterruptedException e) { LOG.info("Interrupted while attempting to start ReadOnlyZooKeeperServer, not started"); } catch (Exception e) { LOG.error("FAILED to start ReadOnlyZooKeeperServer", e); } } }; try { roZkMgr.start(); reconfigFlagClear(); if (shuttingDownLE) { shuttingDownLE = false; startLeaderElection(); } //寻找Leader节点 setCurrentVote(makeLEStrategy().lookForLeader()); } catch (Exception e) { LOG.warn("Unexpected exception", e); setPeerState(ServerState.LOOKING); } finally { // If the thread is in the the grace period, interrupt // to come out of waiting. roZkMgr.interrupt(); roZk.shutdown(); } } else { try { reconfigFlagClear(); if (shuttingDownLE) { shuttingDownLE = false; startLeaderElection(); } setCurrentVote(makeLEStrategy().lookForLeader()); } catch (Exception e) { LOG.warn("Unexpected exception", e); setPeerState(ServerState.LOOKING); } } break; case OBSERVING: try { LOG.info("OBSERVING"); // 当前节点启动模式为Observer setObserver(makeObserver(logFactory)); //与Leader节点进行数据同步 observer.observeLeader(); } catch (Exception e) { LOG.warn("Unexpected exception",e ); } finally { observer.shutdown(); setObserver(null); updateServerState(); } break; case FOLLOWING: try { LOG.info("FOLLOWING"); //当前节点启动模式为Follower setFollower(makeFollower(logFactory)); //与Leader节点进行数据同步 follower.followLeader(); } catch (Exception e) { LOG.warn("Unexpected exception",e); } finally { follower.shutdown(); setFollower(null); updateServerState(); } break; case LEADING: LOG.info("LEADING"); try { //当前节点启动模式为Leader setLeader(makeLeader(logFactory)); //发送自己成为Leader的通知 leader.lead(); setLeader(null); } catch (Exception e) { LOG.warn("Unexpected exception",e); } finally { if (leader != null) { leader.shutdown("Forcing shutdown"); setLeader(null); } updateServerState(); } break; } start_fle = Time.currentElapsedTime(); } } finally { LOG.warn("QuorumPeer main thread exited"); MBeanRegistry instance = MBeanRegistry.getInstance(); instance.unregister(jmxQuorumBean); instance.unregister(jmxLocalPeerBean); for (RemotePeerBean remotePeerBean : jmxRemotePeerBean.values()) { instance.unregister(remotePeerBean); } jmxQuorumBean = null; jmxLocalPeerBean = null; jmxRemotePeerBean = null; } }

经过上?的发起投票,统计投票信息最终每个节点都会确认??的身份,节点根据类型的不同会执?以下逻辑: 1. 如果是 Leader 节点,?先会想其他节点发送?条 NEWLEADER 信息,确认??的身份,等到各个节点的 ACK 消息以后开始正式对外提供服务,同时开启新的监听器,处理新节点加?的逻辑。 2. 如果是 Follower 节点,?先向 Leader 节点发送?条 FOLLOWERINFO 信息,告诉 Leader 节点??已处理的事务的最? Zxid ,然后 Leader 节点会根据??的最? Zxid 与 Follower 节点进?同步,如果 Follower 节点落后的不多则会收到 Leader 的 DIFF 信息通过内存同步,如果 Follower 节点落后的很 多则会收到 SNAP 通过快照同步,如果 Follower 节点的 Zxid ?于 Leader 节点则会收到 TRUNC 信息忽 略多余的事务。 3. 如果是 Observer 节点,则与 Follower 节点相同

?


1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,会注明原创字样,如未注明都非原创,如有侵权请联系删除!;3.作者投稿可能会经我们编辑修改或补充;4.本站不提供任何储存功能只提供收集或者投稿人的网盘链接。

标签: #void #config