// 1. building the component stack: LifecycleComponent server; try { server = buildBookieServer(new BookieConfiguration(conf)); } catch (Exception e) { log.error("Failed to build bookie server", e); return ExitCode.SERVER_EXCEPTION; }
// 2. start the server try { ComponentStarter.startComponent(server).get(); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); // the server is interrupted log.info("Bookie server is interrupted. Exiting ..."); } catch (ExecutionException ee) { log.error("Error in bookie shutdown", ee.getCause()); return ExitCode.SERVER_EXCEPTION; } return ExitCode.OK; }
注释很清晰,可以看到启动bookie服务主要做这三步:
解析system property
构建bookie服务所需的组件
启动各个组件
解析system property
1 2
BasicParser parser = new BasicParser(); CommandLine cmdLine = parser.parse(BK_OPTS, args);
system property是Java应用程序自身指定的变量,通常可以在启动应用时通过JVM参数指定,格式是:-DsystemPropertyKey=systemPropertyValue(楼主在本地启动bookie服务时,在IDEA中设置的Program arguments为:--conf /Volumes/longmao/bookkeeper-confg/b1.conf;严格来说,--conf属于命令行参数(program arguments),而不是-D形式的system property)。解析这些启动参数使用了Apache开源工具commons-cli(自己写应用或框架时可以借鉴其写法,用来加载应用自定义的配置)。
if (conf.getServerConf().isLocalScrubEnabled()) { serverBuilder.addComponent( new ScrubberService( rootStatsLogger.scope(ScrubberStats.SCOPE), conf, bookieService.getServer().getBookie().getLedgerStorage())); }
// 3. build auto recovery if (conf.getServerConf().isAutoRecoveryDaemonEnabled()) { AutoRecoveryService autoRecoveryService = new AutoRecoveryService(conf, rootStatsLogger.scope(REPLICATION_SCOPE));
}); // always start without background thread first, and if necessary, // it will be set up later, after db has been fully started, // otherwise background thread would compete for store lock // with maps opening procedure builder.autoCommitDisabled(); } store.open(db, builder, encrypted); db.setStore(store); return store; } /** * Open the store for this database. * * @param db the database * @param builder the builder * @param encrypted whether the store is encrypted */ voidopen(Database db, MVStore.Builder builder, boolean encrypted){ this.encrypted = encrypted; try { //初始化MvStore this.mvStore = builder.open(); FileStore fs = mvStore.getFileStore(); if (fs != null) { this.fileName = fs.getFileName(); } if (!db.getSettings().reuseSpace) { mvStore.setReuseSpace(false); } mvStore.setVersionsToKeep(0); //初始化 TransactionStore this.transactionStore = new TransactionStore(mvStore, new ValueDataType(db, null), db.getLockTimeout()); } catch (IllegalStateException e) { throw convertIllegalStateException(e); } }
int pgSplitSize = 48; // for "mem:" case it is # of keys CacheLongKeyLIRS.Config cc = null; if (this.fileStore != null) { int mb = DataUtils.getConfigParam(config, "cacheSize", 16); if (mb > 0) { cc = new CacheLongKeyLIRS.Config(); cc.maxMemory = mb * 1024L * 1024L; Object o = config.get("cacheConcurrency"); if (o != null) { cc.segmentCount = (Integer)o; } } pgSplitSize = 16 * 1024; } if (cc != null) { cache = new CacheLongKeyLIRS<>(cc); } else { cache = null; }
pgSplitSize = DataUtils.getConfigParam(config, "pageSplitSize", pgSplitSize); // Make sure pages will fit into cache if (cache != null && pgSplitSize > cache.getMaxItemSize()) { pgSplitSize = (int)cache.getMaxItemSize(); } pageSplitSize = pgSplitSize; keysPerPage = DataUtils.getConfigParam(config, "keysPerPage", 48); backgroundExceptionHandler = (UncaughtExceptionHandler)config.get("backgroundExceptionHandler"); meta = new MVMap<>(this); if (this.fileStore != null) { retentionTime = this.fileStore.getDefaultRetentionTime(); // 19 KB memory is about 1 KB storage int kb = Math.max(1, Math.min(19, Utils.scaleForAvailableMemory(64))) * 1024; kb = DataUtils.getConfigParam(config, "autoCommitBufferSize", kb); autoCommitMemory = kb * 1024; autoCompactFillRate = DataUtils.getConfigParam(config, "autoCompactFillRate", 90); char[] encryptionKey = (char[]) config.get("encryptionKey"); try { if (!fileStoreIsProvided) { boolean readOnly = config.containsKey("readOnly"); //创建数据库文件 *.db this.fileStore.open(fileName, readOnly, encryptionKey); } //往 *.db 写数据 if (this.fileStore.size() == 0) { creationTime = getTimeAbsolute(); lastCommitTime = creationTime; storeHeader.put(HDR_H, 2); storeHeader.put(HDR_BLOCK_SIZE, BLOCK_SIZE); storeHeader.put(HDR_FORMAT, FORMAT_WRITE); storeHeader.put(HDR_CREATED, creationTime); writeStoreHeader(); } else { // there is no need to lock store here, since it is not opened yet, // just to make some assertions happy, when they ensure single-threaded access storeLock.lock(); try { readStoreHeader(); } finally { storeLock.unlock(); } } } catch (IllegalStateException e) { panic(e); } finally { if (encryptionKey != null) { Arrays.fill(encryptionKey, (char) 0); } } lastCommitTime = getTimeSinceCreation();
scrubMetaMap();
// setAutoCommitDelay starts the thread, but only if // the parameter is different from the old value int delay = DataUtils.getConfigParam(config, "autoCommitDelay", 1000); setAutoCommitDelay(delay); } else { autoCommitMemory = 0; autoCompactFillRate = 0; } }