irpas技术客

ZooKeeper源码分析三之单机启动_菜鸟+1024_zookeeper 单机启动

大大的周 5167

一、解析配置文件

单机启动时,配置文件解析对象是ServerConfig,查看其parse方法:

public void parse(String path) throws ConfigException { //这里是通过集群启动下的解析对象来解析配置文件 QuorumPeerConfig config = new QuorumPeerConfig(); //解析配置文件,先从文件中读取配置信息到Properties对象中,然后赋值到对应的属性上,配置文件的解析相对简单,值得注意一点的是,如果没有配置dataDir路径,将会抛出异常 config.parse(path); //从QuorumPeerConfig解析得到的数据,配置到ServerConfig中 readFrom(config); }

如下就是解析配置文件得到的信息。

public void readFrom(QuorumPeerConfig config) { clientPortAddress = config.getClientPortAddress(); secureClientPortAddress = config.getSecureClientPortAddress(); dataDir = config.getDataDir(); dataLogDir = config.getDataLogDir(); tickTime = config.getTickTime(); maxClientCnxns = config.getMaxClientCnxns(); minSessionTimeout = config.getMinSessionTimeout(); maxSessionTimeout = config.getMaxSessionTimeout(); jvmPauseMonitorToRun = config.isJvmPauseMonitorToRun(); jvmPauseInfoThresholdMs = config.getJvmPauseInfoThresholdMs(); jvmPauseWarnThresholdMs = config.getJvmPauseWarnThresholdMs(); jvmPauseSleepTimeMs = config.getJvmPauseSleepTimeMs(); metricsProviderClassName = config.getMetricsProviderClassName(); metricsProviderConfiguration = config.getMetricsProviderConfiguration(); listenBacklog = config.getClientPortListenBacklog(); initialConfig = config.getInitialConfig(); }

我们重新回到runFromConfig(config)方法(去掉了部分代码)。

public void runFromConfig(ServerConfig config) throws IOException, AdminServerException { FileTxnSnapLog txnLog = null; try { try { //创建服务指标监控对象,具体的使用将在后续分析中说明 metricsProvider = MetricsProviderBootstrap.startMetricsProvider( config.getMetricsProviderClassName(), config.getMetricsProviderConfiguration()); } catch (MetricsProviderLifeCycleException error) { throw new IOException("Cannot boot MetricsProvider " + config.getMetricsProviderClassName(), error); } //初始化metricsProvider对象 ServerMetrics.metricsProviderInitialized(metricsProvider); //初始化一些权限认证信息,比如ip、digest ProviderRegistry.initialize(); //实例化事务日志文件,这个对象后续再单独分析 txnLog = new FileTxnSnapLog(config.dataLogDir, config.dataDir); //Jvm的监视器 JvmPauseMonitor jvmPauseMonitor = null; if (config.jvmPauseMonitorToRun) { jvmPauseMonitor = new JvmPauseMonitor(config); } //创建一个ZooKeeperServer 实例 final ZooKeeperServer zkServer = new ZooKeeperServer(jvmPauseMonitor, txnLog, config.tickTime, config.minSessionTimeout, config.maxSessionTimeout, config.listenBacklog, null, config.initialConfig); //设置服务状态对象 txnLog.setServerStats(zkServer.serverStats()); //实例化CountDownLatch对象,服务器启动会进行阻塞使用 final CountDownLatch shutdownLatch = new CountDownLatch(1); zkServer.registerServerShutdownHandler(new ZooKeeperServerShutdownHandler(shutdownLatch)); //实例化管理后台对象,默认是8080端口,默认请求路径是/commands adminServer = AdminServerFactory.createAdminServer(); //监控server对象信息 adminServer.setZooKeeperServer(zkServer); //启动管理管理后台 adminServer.start(); boolean needStartZKServer = true; if (config.getClientPortAddress() != null) { //ZooKeeper中是通过ServerCnxnFactory来管理IO连接 cnxnFactory = ServerCnxnFactory.createFactory(); cnxnFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns(), config.getClientPortListenBacklog(), false); //通过ServerCnxnFactory来启动,这里真正启动了ZooKeeper cnxnFactory.startup(zkServer); needStartZKServer = false; } //ZooKeeper中提供了两种通信方式,安全的和非安全的,可以通过secureClientPortAddress配置 if (config.getSecureClientPortAddress() != null) { secureCnxnFactory = ServerCnxnFactory.createFactory(); secureCnxnFactory.configure(config.getSecureClientPortAddress(), config.getMaxClientCnxns(), config.getClientPortListenBacklog(), true); secureCnxnFactory.startup(zkServer, needStartZKServer); } //实例化一个容器管理对象 containerManager = new ContainerManager( zkServer.getZKDatabase(), zkServer.firstProcessor, Integer.getInteger("znode.container.checkIntervalMs", (int) TimeUnit.MINUTES.toMillis(1)), Integer.getInteger("znode.container.maxPerMinute", 10000), Long.getLong("znode.container.maxNeverUsedIntervalMs", 0) ); //启动 containerManager.start(); ZKAuditProvider.addZKStartStopAuditLog(); //启动后会阻塞在这里,直到服务暂停 shutdownLatch.await(); shutdown(); } catch (InterruptedException e) { } }

从上方法可以提取出两个重要的对象ZooKeeperServer(服务实例)和ServerCnxnFactory(管理服务IO),这里我们重点关注的是服务的启动,所以其它信息暂时先忽略掉。此时我们跟着ServerCnxnFactory的startup方法继续分析启动过程。

我们先查看ServerCnxnFactory.createFactory()方法。

public static ServerCnxnFactory createFactory() throws IOException { //从系统属性中获取zookeeper.serverCnxnFactory String serverCnxnFactoryName = System.getProperty(ZOOKEEPER_SERVER_CNXN_FACTORY); if (serverCnxnFactoryName == null) { //默认是NIO实现 serverCnxnFactoryName = NIOServerCnxnFactory.class.getName(); } try { ServerCnxnFactory serverCnxnFactory = (ServerCnxnFactory) Class.forName(serverCnxnFactoryName) .getDeclaredConstructor() .newInstance(); return serverCnxnFactory; } catch (Exception e) { IOException ioe = new IOException("Couldn't instantiate " + serverCnxnFactoryName, e); throw ioe; } }

ServerCnxnFactory的实现类,ZooKeeper提供了两个实现NIOServerCnxnFactory和NettyServerCnxnFactory。所以此时的startup方法会调用到NIOServerCnxnFactory中的startup方法:

public void startup(ZooKeeperServer zks, boolean startServer) throws IOException, InterruptedException { //启动 start(); //把当前的ServerCnxnFactory实例设置到ZooKeeperServer 实例中 setZooKeeperServer(zks); if (startServer) { //初始化数据结构 zks.startdata(); //启动一些监控信息 zks.startup(); } }

我们先看看cnxnFactory.configure()方法,里边做了一些初始化内容。

public void configure(InetSocketAddress addr, int maxcc, int backlog, boolean secure) throws IOException { //配置安全验证信息,默认是没有配置的,后续在进行分析 configureSaslLogin(); //最大连接数,默认是60 maxClientCnxns = maxcc; //maxCnxns默认是0 initMaxCnxns(); //默认是10秒,可以通过参数zookeeper.nio.sessionlessCnxnTimeout配置 sessionlessCnxnTimeout = Integer.getInteger(ZOOKEEPER_NIO_SESSIONLESS_CNXN_TIMEOUT, 10000); //过期队列 cnxnExpiryQueue = new ExpiryQueue<NIOServerCnxn>(sessionlessCnxnTimeout); //connection过期线程 expirerThread = new ConnectionExpirerThread(); //获取系统核心数 int numCores = Runtime.getRuntime().availableProcessors(); // Selector的线程数,也就是接收请求的线程数,默认是核心数除以2再开方,可以通过zookeeper.nio.numSelectorThreads参数配置 numSelectorThreads = Integer.getInteger( ZOOKEEPER_NIO_NUM_SELECTOR_THREADS, Math.max((int) Math.sqrt((float) numCores / 2), 1)); if (numSelectorThreads < 1) { throw new IOException("numSelectorThreads must be at least 1"); } //工作线程数,默认是核心数*2,可以参数zookeeper.nio.numWorkerThreads配置 numWorkerThreads = Integer.getInteger(ZOOKEEPER_NIO_NUM_WORKER_THREADS, 2 * numCores); workerShutdownTimeoutMS = Long.getLong(ZOOKEEPER_NIO_SHUTDOWN_TIMEOUT, 5000); //实例化Selector处理线程 for (int i = 0; i < numSelectorThreads; ++i) { selectorThreads.add(new SelectorThread(i)); } listenBacklog = backlog; //这里就是涉及到NIO网络编程了,打开一个ServerSocketChannel实例 this.ss = ServerSocketChannel.open(); ss.socket().setReuseAddress(true); LOG.info("binding to port {}", addr); if (listenBacklog == -1) { //绑定地址 ss.socket().bind(addr); } else { ss.socket().bind(addr, listenBacklog); } //非阻塞模式 ss.configureBlocking(false); //配置接收请求线程 acceptThread = new AcceptThread(ss, addr, selectorThreads); }

接下来我们再看start方法。

public void start() { stopped = false; //实例化工作池对象,我们先不用管这些具体是做什么的 if (workerPool == null) { workerPool = new WorkerService("NIOWorker", numWorkerThreads, false); } //启动Selector线程 for (SelectorThread thread : selectorThreads) { if (thread.getState() == Thread.State.NEW) { thread.start(); } } // 启动接收请求线程 if (acceptThread.getState() == Thread.State.NEW) { acceptThread.start(); } //启动过期监测线程 if (expirerThread.getState() == Thread.State.NEW) { expirerThread.start(); } }

start方法启动了三种线程,SelectorThread用来处理读写请求,AcceptThread线程用来处理连接请求、ExpirerThread处理过期请求的线程,业务逻辑都在run方法。start方法完成之后就是初始化数据结构的调用以及一些监控信息的启动。所以ZooKeeper的单机启动,实际上就是启动以上三种线程。

二、初始化数据库

这部分先介绍,数据库的初始化过程,至于启动后请求的接收和处理过程下一节继续分析,线程启动完成后就会执行数据库的加载过程zks.startdata()。

public void startdata() throws IOException, InterruptedException { //初始化一个数据库,会传递FileTxnSnapLog对象 if (zkDb == null) { zkDb = new ZKDatabase(this.txnLogFactory); } if (!zkDb.isInitialized()) { //加载数据 loadData(); } }

我们先查看ZKDatabase的构造方法,如下所示(去掉了一些多余的代码)。

public ZKDatabase(FileTxnSnapLog snapLog) { //首先创建一个DataTree实例 dataTree = createDataTree(); //session过期时间集合 sessionsWithTimeouts = new ConcurrentHashMap<Long, Integer>(); this.snapLog = snapLog; //快照大小因子 snapshotSizeFactor = Double.parseDouble( System.getProperty(SNAPSHOT_SIZE_FACTOR, Double.toString(DEFAULT_SNAPSHOT_SIZE_FACTOR))); if (snapshotSizeFactor > 1) { snapshotSizeFactor = DEFAULT_SNAPSHOT_SIZE_FACTOR; } //提交日志总数 commitLogCount = Integer.parseInt( System.getProperty(COMMIT_LOG_COUNT, Integer.toString(DEFAULT_COMMIT_LOG_COUNT))); }

接下来跟着loadData()方法:

public void loadData() throws IOException, InterruptedException { //是否已经初始化了 if (zkDb.isInitialized()) { //已经初始化完成,设置当前的事务id为当前数据中最新的事务id setZxid(zkDb.getDataTreeLastProcessedZxid()); } else { //否则加载数据库,并且设置最新的事务id setZxid(zkDb.loadDataBase()); } //清除过期session zkDb.getSessions().stream() .filter(session -> zkDb.getSessionWithTimeOuts().get(session) == null) .forEach(session -> killSession(session, zkDb.getDataTreeLastProcessedZxid())); //建立快照 takeSnapshot(); }

接下来我们跟着zkDb.loadDataBase()方法继续分析:

public long loadDataBase() throws IOException { //获取加载开始时间 long startTime = Time.currentElapsedTime(); //获取到最新的事务id long zxid = snapLog.restore(dataTree, sessionsWithTimeouts, commitProposalPlaybackListener); //当前初始化完成 initialized = true; //得到加载时间 long loadTime = Time.currentElapsedTime() - startTime; ServerMetrics.getMetrics().DB_INIT_TIME.add(loadTime); //返回事务id return zxid; }

此时数据的加载过程又是交给了FileTxnSnapLog对象,所以我们继续跟踪FileTxnSnapLog对的restore方法走。

public long restore(DataTree dt, Map<Long, Integer> sessions, PlayBackListener listener) throws IOException { //获取开始时间 long snapLoadingStartTime = Time.currentElapsedTime(); //反序列化当前存在的文件,如果是第一次启动,返回的是-1,此处暂时不分析,先接着往下看 long deserializeResult = snapLog.deserialize(dt, sessions); ServerMetrics.getMetrics().STARTUP_SNAP_LOAD_TIME.add(Time.currentElapsedTime() - snapLoadingStartTime); //先实例化一个事务日志文件对象,如果查看FileTxnSnapLog的构造方法可知,当创建数据目录的时候,会在data目录下创建version-*的子目录 FileTxnLog txnLog = new FileTxnLog(dataDir); boolean trustEmptyDB; File initFile = new File(dataDir.getParent(), "initialize"); if (Files.deleteIfExists(initFile.toPath())) { LOG.info("Initialize file found, an empty database will not block voting participation"); trustEmptyDB = true; } else { //表示当前数据库还没初始化 trustEmptyDB = autoCreateDB; } //这里先不管 RestoreFinalizer finalizer = () -> { //查看是否需要从日志中恢复数据 long highestZxid = fastForwardFromEdits(dt, sessions, listener); DataTree.ZxidDigest snapshotZxidDigest = dt.getDigestFromLoadedSnapshot(); return highestZxid; }; //如果当前序列化结果为-1,初始值是等于-1 if (-1L == deserializeResult) { //此时为-1 if (txnLog.getLastLoggedZxid() != -1) { if (!trustEmptySnapshot) { throw new IOException(EMPTY_SNAPSHOT_WARNING + "Something is broken!"); } else { LOG.warn("{}This should only be allowed during upgrading.", EMPTY_SNAPSHOT_WARNING); return finalizer.run(); } } if (trustEmptyDB) { //调用save方法 save(dt, (ConcurrentHashMap<Long, Integer>) sessions, false); //返回0 return 0L; } else { return -1L; } } //如果已经初始化了,就调用这个方法返回最新的事务id return finalizer.run(); }

此时我们需要分析一下save方法:

public void save(DataTree dataTree,ConcurrentHashMap<Long, Integer> sessionsWithTimeouts, boolean syncSnap) throws IOException { //初始化为0 long lastZxid = dataTree.lastProcessedZxid; //实例化一个快照文件,文件命名方式是,文件名是snapshot,文件的后缀是最新事务id的十六进制+自定义的zookeeper.snapshot.compression.method参数名,默认为空字符 //所以此时的目录结构应该是/data/version-2/snapshot.0(版本号为2时) File snapshotFile = new File(snapDir, Util.makeSnapshotName(lastZxid)); LOG.info("Snapshotting: 0x{} to {}", Long.toHexString(lastZxid), snapshotFile); try { //序列化当前文件 snapLog.serialize(dataTree, sessionsWithTimeouts, snapshotFile, syncSnap); } catch (IOException e) { throw e; } }

继续serialize方法:

public synchronized void serialize(DataTree dt,Map<Long, Integer> sessions,File snapShot, boolean fsync) throws IOException { if (!close) { //得到一个输出流 try (CheckedOutputStream snapOS = SnapStream.getOutputStream(snapShot, fsync)) { OutputArchive oa = BinaryOutputArchive.getArchive(snapOS); //文件头,快照魔数ZKSN,版本号、数据库id=-1 FileHeader header = new FileHeader(SNAP_MAGIC, VERSION, dbId); //序列化,实际调用的是FileHeader的序列化方法 serialize(dt, sessions, oa, header); //此时完成了序列化过程 SnapStream.sealStream(snapOS, oa); if (dt.serializeZxidDigest(oa)) { SnapStream.sealStream(snapOS, oa); } //保存最新快照信息,此时数据目录下会生成一个version-2/snapshot.0文件 lastSnapshotInfo = new SnapshotInfo(Util.getZxidFromName(snapShot.getName(), SNAPSHOT_FILE_PREFIX),snapShot.lastModified() / 1000); } } else { throw new IOException("FileSnap has already been closed"); } }

我们先查看FileHeader 的序列化方法:

public void serialize(OutputArchive a_, String tag) throws java.io.IOException { //这个是个空实现 a_.startRecord(this,tag); //把当前的魔数序列化到文件中 a_.writeInt(magic,"magic"); //把当前版本号序列化到文件中 a_.writeInt(version,"version"); //把当前的数据库id序列化到文件中 a_.writeLong(dbid,"dbid"); //空实现 a_.endRecord(this,tag); }

这里就得到了一个文件头,包含魔数、版本号、数据库id标识,文件头序列化后会继续调用快照的序列化方法。SerializeUtils.serializeSnapshot(dt, oa, sessions)

public static void serializeSnapshot(DataTree dt, OutputArchive oa, Map<Long, Integer> sessions) throws IOException { HashMap<Long, Integer> sessSnap = new HashMap<Long, Integer>(sessions); //写入过期session到文件中 oa.writeInt(sessSnap.size(), "count"); for (Entry<Long, Integer> entry : sessSnap.entrySet()) { oa.writeLong(entry.getKey().longValue(), "id"); oa.writeInt(entry.getValue().intValue(), "timeout"); } //调用DataTree 的序列化方法 dt.serialize(oa, "tree"); }

dt.serialize(oa, “tree”):

public void serialize(OutputArchive oa, String tag) throws IOException { //序列化权限控制 serializeAcls(oa); //序列化节点 serializeNodes(oa); }

serializeAcls(oa):

public void serializeAcls(OutputArchive oa) throws IOException { aclCache.serialize(oa); }

aclCache是ReferenceCountedACLCache实例,我们继续查看其序列化方法:

public void serialize(OutputArchive oa) throws IOException { Map<Long, List<ACL>> clonedLongKeyMap; synchronized (this) { //默认的权限控制是world:anyone clonedLongKeyMap = new HashMap<>(longKeyMap); } //写入权限控制的总数 oa.writeInt(clonedLongKeyMap.size(), "map"); for (Map.Entry<Long, List<ACL>> val : clonedLongKeyMap.entrySet()) { oa.writeLong(val.getKey(), "long"); List<ACL> aclList = val.getValue(); //开始陷入权限控制器 oa.startVector(aclList, "acls"); for (ACL acl : aclList) { //写入权限控制器,也就是把world和anyone写入文件中 acl.serialize(oa, "acl"); } oa.endVector(aclList, "acls"); } }

接下来就是写入节点值:

public void serializeNodes(OutputArchive oa) throws IOException { serializeNode(oa, new StringBuilder()); if (root != null) { //写入根节点 oa.writeString("/", "path"); } }

ZooKeeper启动时候会有默认的五个节点,节点路径分别是""、/zookeeper/quota、/zookeeper/config、/zookeeper、/,这五类节点,序列化节点就是把这些节点全部保存到文件中,保存方式是节点路径、节点值、节点权限、节点状态(stat)。

以上都是基于第一次启动时,数据库的加载过程,此时会在data目录下生成一个snapshot.0文件,里边保存了魔数、版本号、数据库id、默认权限、以及初始化节点等信息。如果第二次再次启动时,我们再次回到deserialize方法:

public long deserialize(DataTree dt, Map<Long, Integer> sessions) throws IOException { //这个方法会找到,data目录下所有snapshot.*的文件,并且是通过后缀排序之后的文件(降序),并且会验证当前文件是否是合法文件,验证方式是,文件内容是否大于10个字节(包括header和结尾/字符),以及文件内容是否是以"/"结尾,且前一个int类型是否是1("/"的长度) List<File> snapList = findNValidSnapshots(100); if (snapList.size() == 0) { return -1L; } File snap = null; long snapZxid = -1; boolean foundValid = false; for (int i = 0, snapListSize = snapList.size(); i < snapListSize; i++) { snap = snapList.get(i); LOG.info("Reading snapshot {}", snap); snapZxid = Util.getZxidFromName(snap.getName(), SNAPSHOT_FILE_PREFIX); try (CheckedInputStream snapIS = SnapStream.getInputStream(snap)) { InputArchive ia = BinaryInputArchive.getArchive(snapIS); //反序列化过程,也就是把当前文件中的数据转换成DataTree数据结构 deserialize(dt, sessions, ia); //去除一些不必要的数据 SnapStream.checkSealIntegrity(snapIS, ia); if (dt.deserializeZxidDigest(ia, snapZxid)) { SnapStream.checkSealIntegrity(snapIS, ia); } //成功读取最新的文件,退出读取 foundValid = true; break; } catch (IOException e) { LOG.warn("problem reading snap file {}", snap, e); } } if (!foundValid) { throw new IOException("Not able to find valid snapshots in " + snapDir); } //设置最新的事务id dt.lastProcessedZxid = snapZxid; lastSnapshotInfo = new SnapshotInfo(dt.lastProcessedZxid, snap.lastModified() / 1000); if (dt.getDigestFromLoadedSnapshot() != null) { dt.compareSnapshotDigests(dt.lastProcessedZxid); } //返回最新的事务id return dt.lastProcessedZxid; }

到此,数据库的加载过程完成,如果是首次启动就会在data目录下创建一个snapshot.0的文件,把默认节点等信息写入到这个文件中,如果是重启,会从data目录下找到最新的snapshot.*文件,根据文件后缀名降序排序,得到最新的文件。然后把文件中的内容转换成DataTree数据结构,并且会跟日志文件中的数据进行对比,是否需要从日志文件中恢复一部分数据,后续将会重点分析一下ZKDatabase的数据结构。

三、总结

以上简单分析了,ZooKeeper的单机启动过程,先读取配置文件(zoo.cfg),实例化一个ServerCnxnFactory,通过这个对象来实现ZooKeeper的启动,会启动相应的线程来接收请求,启动之后就会进行数据库的加载,加载完成整个服务就算启动成功。下一节讲针对单机启动下,是怎样处理客户端请求。

以上,有任何不对的地方,请留言指正,敬请谅解。


1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,会注明原创字样,如未注明都非原创,如有侵权请联系删除!;3.作者投稿可能会经我们编辑修改或补充;4.本站不提供任何储存功能只提供收集或者投稿人的网盘链接。

标签: #zookeeper #单机启动 #PUBLIC #void