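Apache BookKeeper is a scalable, highly available, low-latency storage system. It is optimized specifically for append-only workloads and is well suited to the following use cases:
- WAL (write-ahead logging), e.g. the HDFS NameNode
- Message storage, e.g. Apache Pulsar
- Offset/cursor storage, e.g. storing consumer positions in Apache Pulsar
- Object/blob storage, e.g. storing state machine snapshots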
Initializing BookieServer
public BookieService(BookieConfiguration conf,
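BookieServer has four main components to initialize:
- StatsLogger: the logger used to record metrics (stats)
- Bookie: the bookie instance
- BookieNettyServer: essentially a Netty server
- DeathWatcher: a thread that watches whether the bookie and the Netty server are still alive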
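The DeathWatcher's role can be pictured with a minimal sketch. It is hypothetical and is not BookKeeper's API. The class name `LivenessWatcher`, the two `BooleanSupplier` liveness checks, the `onDeath` callback and the polling interval are all assumptions, and the real DeathWatcher may differ in its details. The sketch shows only the idea: poll periodically, and trigger a shutdown as soon as either component reports that it is no longer running.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;

// Hypothetical DeathWatcher-style thread (illustration only, not BookKeeper code).
final class LivenessWatcher extends Thread {
    private final BooleanSupplier bookieAlive;  // assumption: reports whether the bookie is running
    private final BooleanSupplier serverAlive;  // assumption: reports whether the Netty server is running
    private final Runnable onDeath;             // invoked once when either check fails
    private final long intervalMs;              // polling interval

    LivenessWatcher(BooleanSupplier bookieAlive, BooleanSupplier serverAlive,
                    Runnable onDeath, long intervalMs) {
        super("LivenessWatcher");
        this.bookieAlive = bookieAlive;
        this.serverAlive = serverAlive;
        this.onDeath = onDeath;
        this.intervalMs = intervalMs;
        setDaemon(true); // the watcher alone should not keep the JVM running
    }

    @Override
    public void run() {
        while (true) {
            try {
                Thread.sleep(intervalMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return; // stop watching if interrupted
            }
            if (!bookieAlive.getAsBoolean() || !serverAlive.getAsBoolean()) {
                onDeath.run(); // e.g. shut the whole server down
                return;
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicBoolean bookieRunning = new AtomicBoolean(true);
        LivenessWatcher watcher = new LivenessWatcher(
                bookieRunning::get, () -> true,
                () -> System.out.println("component died, shutting down"), 50);
        watcher.start();
        bookieRunning.set(false); // simulate the bookie dying
        watcher.join();
    }
}
```

Running `main` simulates the bookie dying and prints `component died, shutting down` once. Polling keeps the watcher independent of both components: it needs no callbacks from them, only a cheap "is it running?" check.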
Initializing Bookie
public Bookie(ServerConfiguration conf, StatsLogger statsLogger, ByteBufAllocator allocator)
        throws IOException, InterruptedException, BookieException {
    super("Bookie-" + conf.getBookiePort());
    this.statsLogger = statsLogger;
    this.conf = conf;
    // Read the list of journal directories from the configuration and use
    // the "current" subdirectory under each of them
    this.journalDirectories = Lists.newArrayList();
    for (File journalDirectory : conf.getJournalDirs()) {
        this.journalDirectories.add(getCurrentDirectory(journalDirectory));
    }
    /**
     * Initialize the DiskChecker. It takes two parameters: diskUsageThreshold and diskUsageWarnThreshold.
     * diskUsageThreshold is the maximum disk usage ratio (default 0.95). Once every directory in the
     * list exceeds it, the bookie switches to read-only mode if it is configured to allow read-only
     * operation; otherwise it shuts down.
     * diskUsageWarnThreshold is the warning threshold (default 0.90). Exceeding it throws
     * DiskWarnThresholdException and triggers GC; once usage drops back below it, the directory
     * becomes writable again.
     */
    DiskChecker diskChecker = createDiskChecker(conf);
    // Create LedgerDirsManagers for ledgers and indexes; they manage the ledger and index directory lists
    this.ledgerDirsManager = createLedgerDirsManager(conf, diskChecker, statsLogger.scope(LD_LEDGER_SCOPE));
    this.indexDirsManager = createIndexDirsManager(conf, diskChecker, statsLogger.scope(LD_INDEX_SCOPE),
            this.ledgerDirsManager);
    this.allocator = allocator;
    // Instantiate the ZooKeeper client (metadata driver) used to initialize the ledger manager
    this.metadataDriver = instantiateMetadataDriver(conf);
    checkEnvironment(this.metadataDriver);
    try {
        if (this.metadataDriver != null) {
            // Currently the registration manager is ZooKeeper only.
            // Get the LedgerManagerFactory, which creates LedgerManager instances
            ledgerManagerFactory = metadataDriver.getLedgerManagerFactory();
            LOG.info("instantiate ledger manager {}", ledgerManagerFactory.getClass().getName());
            // The LedgerManager talks to the metadata store (e.g. ZooKeeper) and manages ledger metadata
            ledgerManager = ledgerManagerFactory.newLedgerManager();
        } else {
            ledgerManagerFactory = null;
            ledgerManager = null;
        }
    } catch (MetadataException e) {
        throw new MetadataStoreException("Failed to initialize ledger manager", e);
    }
    // Initialize the state manager
    stateManager = initializeStateManager();
    // register shutdown handler using trigger mode
    stateManager.setShutdownHandler(exitCode -> triggerBookieShutdown(exitCode));
    // Initialize dirsMonitor (a LedgerDirsMonitor), which looks through all the configured
    // directories. If it finds disk errors, or if all the ledger directories are full,
    // it throws an exception, which fails bookie startup unless read-only mode is enabled.
    List<LedgerDirsManager> dirsManagers = new ArrayList<>();
    dirsManagers.add(ledgerDirsManager);
    if (indexDirsManager != ledgerDirsManager) {
        dirsManagers.add(indexDirsManager);
    }
    this.dirsMonitor = new LedgerDirsMonitor(conf, diskChecker, dirsManagers);
    try {
        this.dirsMonitor.init();
    } catch (NoWritableLedgerDirException nle) {
        // start in read-only mode if no writable dirs and read-only allowed
        if (!conf.isReadOnlyModeEnabled()) {
            throw nle;
        } else {
            this.stateManager.transitionToReadOnlyMode();
        }
    }
    // Instantiate the journals, one Journal per journal directory
    journals = Lists.newArrayList();
    for (int i = 0; i < journalDirectories.size(); i++) {
        journals.add(new Journal(i, journalDirectories.get(i),
                conf, ledgerDirsManager, statsLogger.scope(JOURNAL_SCOPE), allocator));
    }
    this.entryLogPerLedgerEnabled = conf.isEntryLogPerLedgerEnabled();
    CheckpointSource checkpointSource = new CheckpointSourceList(journals);
    // Build the LedgerStorage; the default implementation is SortedLedgerStorage
    ledgerStorage = buildLedgerStorage(conf);
    boolean isDbLedgerStorage = ledgerStorage instanceof DbLedgerStorage;
    /*
     * With this change https://github.com/apache/bookkeeper/pull/677,
     * LedgerStorage drives the checkpoint logic.
     *
     * <p>There are two exceptions:
     *
     * 1) With multiple entry logs, checkpoint logic based on an entry log is
     * not possible, hence it needs to be a time-based recurring task
     * driven by SyncThread. SyncThread.start does that, and it is
     * started in the Bookie.start method.
     *
     * 2) DbLedgerStorage
     */
    if (entryLogPerLedgerEnabled || isDbLedgerStorage) {
        syncThread = new SyncThread(conf, getLedgerDirsListener(), ledgerStorage, checkpointSource) {
            public void startCheckpoint(Checkpoint checkpoint) {
                /*
                 * In the case of entryLogPerLedgerEnabled (or DbLedgerStorage),
                 * LedgerStorage doesn't drive checkpoint logic; instead it is done
                 * periodically by SyncThread. So startCheckpoint, which
                 * will be called by LedgerStorage, is a no-op.
                 */
            }
            public void start() {
                executor.scheduleAtFixedRate(() -> {
                    doCheckpoint(checkpointSource.newCheckpoint());
                }, conf.getFlushInterval(), conf.getFlushInterval(), TimeUnit.MILLISECONDS);
            }
        };
    } else {
        syncThread = new SyncThread(conf, getLedgerDirsListener(), ledgerStorage, checkpointSource);
    }
    // Initialize the LedgerStorage
    ledgerStorage.initialize(
            conf,
            ledgerManager,
            ledgerDirsManager,
            indexDirsManager,
            stateManager,
            checkpointSource,
            syncThread,
            statsLogger,
            allocator);
    /**
     * HandleFactoryImpl hands out handles. A handle here is a LedgerDescriptor, which represents
     * a ledger on the bookie and is mainly responsible for adding entries to it (addEntry)
     * and reading entries from it (readEntry).
     */
    handles = new HandleFactoryImpl(ledgerStorage);
    // Expose Stats
    this.bookieStats = new BookieStats(statsLogger);
}
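The threshold rules in the DiskChecker comment can be illustrated with a small sketch. It is hypothetical throughout. `DiskThresholdSketch`, `classify` and `decideMode` are invented names, and usage is given as a ratio between 0 and 1. "Exceeds" is read as a strict `>`. The real DiskChecker and LedgerDirsMonitor signal these states through exceptions and listeners rather than return values. The sketch uses the defaults stated in the comment (0.95 and 0.90) and shows how a single directory is classified and how the bookie mode follows once every directory is full.

```java
import java.util.List;

// Hypothetical sketch of the disk-threshold rules (illustration only, not BookKeeper code).
final class DiskThresholdSketch {
    enum DirState { WRITABLE, WARN, FULL }
    enum BookieMode { READ_WRITE, READ_ONLY, SHUTDOWN }

    static final double DISK_USAGE_THRESHOLD = 0.95;      // default stated in the comment
    static final double DISK_USAGE_WARN_THRESHOLD = 0.90; // default stated in the comment

    // Classify one directory by its usage ratio.
    static DirState classify(double usage) {
        if (usage > DISK_USAGE_THRESHOLD) {
            return DirState.FULL;  // directory is no longer writable
        }
        if (usage > DISK_USAGE_WARN_THRESHOLD) {
            return DirState.WARN;  // the real code would raise a warning and trigger GC here
        }
        return DirState.WRITABLE;
    }

    // Only when every directory is full does the bookie leave read-write mode.
    static BookieMode decideMode(List<Double> usages, boolean readOnlyModeEnabled) {
        boolean allFull = usages.stream().allMatch(u -> classify(u) == DirState.FULL);
        if (!allFull) {
            return BookieMode.READ_WRITE;
        }
        return readOnlyModeEnabled ? BookieMode.READ_ONLY : BookieMode.SHUTDOWN;
    }

    public static void main(String[] args) {
        System.out.println(classify(0.50));                          // WRITABLE
        System.out.println(classify(0.92));                          // WARN
        System.out.println(classify(0.97));                          // FULL
        System.out.println(decideMode(List.of(0.97, 0.99), true));   // READ_ONLY
        System.out.println(decideMode(List.of(0.97, 0.80), false));  // READ_WRITE
    }
}
```

One full directory is not enough to change the bookie's mode. Writes move to the remaining directories, and only when none is left does `isReadOnlyModeEnabled` decide between read-only operation and shutdown.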