Apache BookKeeper BookieService 服务启动

Apache BookKeeper 是一个可以方便扩展,高可用,低延迟的存储系统。BookKeeper 专门为 append-only 的工作模式提供了优化,在以下的应用场景中非常适用:

  • WAL (Write-Ahead-Logging), 例如 HDFS 的 NameNode

  • 消息存储系统,例如 Apache Pulsar

  • Offset/Cursor 存储系统,例如在 Apache Pulsar 中用来存储消息消费位置

  • Object/Blob Store 对象存储系统,例如存储状态机的 snapshots

avatar

初始化BookieServer

//BookieService.java
	/**
	 * Creates the service and the {@link BookieServer} it wraps.
	 *
	 * <p>{@code NAME}, {@code conf} and {@code statsLogger} are handed to the superclass
	 * constructor; the server itself is built from {@code conf.getServerConf()}.
	 *
	 * @param conf bookie configuration; its server configuration is used to build the server
	 * @param statsLogger stats logger passed to both the superclass and the new server
	 * @throws Exception if constructing the underlying {@link BookieServer} fails
	 */
	public BookieService(BookieConfiguration conf, StatsLogger statsLogger) throws Exception {
		super(NAME, conf, statsLogger);
		this.server = new BookieServer(conf.getServerConf(), statsLogger);
	}


//BookieServer.java

/**
 * Builds a bookie server and wires its components together.
 *
 * <p>Steps, in order: validate the user, log the configuration as JSON, obtain the
 * allocator, create the netty server, create the bookie, resolve the security handler
 * factory, and finally create the request processor and attach it to the netty server.
 *
 * @param conf server configuration used by every component created here
 * @param statsLogger stats logger stored on this instance; its {@code SERVER_SCOPE}
 *     child scope is given to the request processor
 * @throws IOException if constructing the bookie fails with an I/O error
 * @throws KeeperException if constructing the bookie fails with a ZooKeeper error
 * @throws InterruptedException if construction of the bookie is interrupted
 * @throws BookieException if constructing the bookie fails
 * @throws UnavailableException declared by the original signature
 * @throws CompatibilityException declared by the original signature
 * @throws SecurityException declared by the original signature
 */
public BookieServer(ServerConfiguration conf, StatsLogger statsLogger)
        throws IOException, KeeperException, InterruptedException,
        BookieException, UnavailableException, CompatibilityException, SecurityException {
    this.conf = conf;

    // Validate the user before any component is created.
    validateUser(conf);

    // Dump the configuration as JSON. A conversion failure is only logged; startup continues.
    try {
        LOG.info(conf.asJson());
    } catch (ParseJsonException jsonFailure) {
        LOG.error("Got ParseJsonException while converting Config to JSONString", jsonFailure);
    }

    // Build the memory allocator.
    ByteBufAllocator allocator = getAllocator(conf);
    this.statsLogger = statsLogger;

    // Build the netty server; its request processor is attached at the end.
    this.nettyServer = new BookieNettyServer(this.conf, null, allocator);

    // Build the bookie instance. If that fails, the already-created netty server is
    // shut down before the exception is propagated.
    try {
        this.bookie = newBookie(conf, allocator);
    } catch (IOException | KeeperException | InterruptedException | BookieException failure) {
        this.nettyServer.shutdown();
        throw failure;
    }

    final SecurityHandlerFactory shFactory =
            SecurityProviderFactoryFactory.getSecurityProviderFactory(conf.getTLSProviderFactoryClass());

    // Create the request processor and hand it to the netty server.
    this.requestProcessor = new BookieRequestProcessor(
            conf,
            bookie,
            statsLogger.scope(SERVER_SCOPE),
            shFactory,
            bookie.getAllocator());
    this.nettyServer.setRequestProcessor(this.requestProcessor);
}

BookieServer共有4大组件要初始化

  • StatsLogger:统计指标(metrics)记录器
  • Bookie:bookie实例
  • BookieNettyServer:其实就是一个netty服务端
  • DeathWatcher:一个线程,观察bookie和netty是否还存活

初始化Bookie

/**
 * Constructs a bookie and initializes its components in order: journal directories,
 * disk checker and directory managers, metadata driver and ledger manager, state manager,
 * directory monitor, journals, ledger storage, sync thread, handle factory and stats.
 *
 * @param conf server configuration read by every component created here
 * @param statsLogger root stats logger; scoped children are handed to sub-components
 * @param allocator buffer allocator passed to the journals and the ledger storage
 * @throws IOException declared by the signature
 * @throws InterruptedException declared by the signature
 * @throws BookieException declared by the signature; a {@code MetadataStoreException} is
 *     thrown here when the ledger manager cannot be initialized
 */
public Bookie(ServerConfiguration conf, StatsLogger statsLogger, ByteBufAllocator allocator)
        throws IOException, InterruptedException, BookieException {
    super("Bookie-" + conf.getBookiePort());
    this.statsLogger = statsLogger;
    this.conf = conf;
    // Take the journal directory list from the configuration and map each entry through
    // getCurrentDirectory (per the author's note: a "current" directory under each one).
    this.journalDirectories = Lists.newArrayList();
    for (File journalDirectory : conf.getJournalDirs()) {
        this.journalDirectories.add(getCurrentDirectory(journalDirectory));
    }
    /**
     *  Initialize the DiskChecker. Author's note (not verifiable from this snippet):
     *  it has two parameters, diskUsageThreshold and diskUsageWarnThreshold.
     *  diskUsageThreshold is the maximum disk usage ratio (default 0.95); once every
     *  directory in the list exceeds it, the bookie switches to read-only if it is
     *  configured to allow read-only mode, otherwise it stops.
     *  diskUsageWarnThreshold is the warning threshold (default 0.90); exceeding it raises
     *  DiskWarnThresholdException and triggers gc, and a directory becomes writable
     *  again once usage drops below this value.
     **/
    DiskChecker diskChecker = createDiskChecker(conf);
    // Create the LedgerDirsManagers that manage the ledger and index directory lists.
    this.ledgerDirsManager = createLedgerDirsManager(conf, diskChecker, statsLogger.scope(LD_LEDGER_SCOPE));
    this.indexDirsManager = createIndexDirsManager(conf, diskChecker, statsLogger.scope(LD_INDEX_SCOPE),
                                                   this.ledgerDirsManager);
    this.allocator = allocator;

    // instantiate zookeeper client to initialize ledger manager
    // (done through the metadata driver)
    this.metadataDriver = instantiateMetadataDriver(conf);
    checkEnvironment(this.metadataDriver);
    try {
        if (this.metadataDriver != null) {
            // current the registration manager is zookeeper only
            // Obtain the ledgerManagerFactory, which is used to create the ledgerManager.
            ledgerManagerFactory = metadataDriver.getLedgerManagerFactory();
            LOG.info("instantiate ledger manager {}", ledgerManagerFactory.getClass().getName());
            // Author's note: the ledgerManager talks to the metadata store (e.g. zk) and
            // manages ledger metadata.
            ledgerManager = ledgerManagerFactory.newLedgerManager();
        } else {
            ledgerManagerFactory = null;
            ledgerManager = null;
        }
    } catch (MetadataException e) {
        throw new MetadataStoreException("Failed to initialize ledger manager", e);
    }
    // Initialize the state manager.
    stateManager = initializeStateManager();
    // register shutdown handler using trigger mode
    stateManager.setShutdownHandler(exitCode -> triggerBookieShutdown(exitCode));
    // Initialise dirsMonitor. This would look through all the
    // configured directories. When disk errors or all the ledger
    // directories are full, would throws exception and fail bookie startup.

    // The index manager is added only when it is a different object from the ledger
    // manager, so the same manager is never monitored twice.
    List<LedgerDirsManager> dirsManagers = new ArrayList<>();
    dirsManagers.add(ledgerDirsManager);
    if (indexDirsManager != ledgerDirsManager) {
        dirsManagers.add(indexDirsManager);
    }
    this.dirsMonitor = new LedgerDirsMonitor(conf, diskChecker, dirsManagers);
    try {
        this.dirsMonitor.init();
    } catch (NoWritableLedgerDirException nle) {
        // start in read-only mode if no writable dirs and read-only allowed
        if (!conf.isReadOnlyModeEnabled()) {
            throw nle;
        } else {
            this.stateManager.transitionToReadOnlyMode();
        }
    }

    // instantiate the journals, one per journal directory
    journals = Lists.newArrayList();
    for (int i = 0; i < journalDirectories.size(); i++) {
        journals.add(new Journal(i, journalDirectories.get(i),
                conf, ledgerDirsManager, statsLogger.scope(JOURNAL_SCOPE), allocator));
    }

    this.entryLogPerLedgerEnabled = conf.isEntryLogPerLedgerEnabled();
    CheckpointSource checkpointSource = new CheckpointSourceList(journals);

    // Build the ledgerStorage (author's note: the default is a SortedLedgerStorage).
    ledgerStorage = buildLedgerStorage(conf);

    boolean isDbLedgerStorage = ledgerStorage instanceof DbLedgerStorage;

    /*
     * with this change https://github.com/apache/bookkeeper/pull/677,
     * LedgerStorage drives the checkpoint logic.
     *
     * <p>There are two exceptions:
     *
     * 1) with multiple entry logs, checkpoint logic based on a entry log is
     *    not possible, hence it needs to be timebased recurring thing and
     *    it is driven by SyncThread. SyncThread.start does that and it is
     *    started in Bookie.start method.
     *
     * 2) DbLedgerStorage
     */

    /**
     * In short: for the two exceptional cases above (entry log per ledger, or
     * DbLedgerStorage) the SyncThread is subclassed so that startCheckpoint is a no-op
     * and start() schedules a checkpoint at a fixed rate of conf.getFlushInterval()
     * milliseconds. Otherwise a plain SyncThread is used.
     */
    if (entryLogPerLedgerEnabled || isDbLedgerStorage) {
        syncThread = new SyncThread(conf, getLedgerDirsListener(), ledgerStorage, checkpointSource) {
            @Override
            public void startCheckpoint(Checkpoint checkpoint) {
                /*
                 * in the case of entryLogPerLedgerEnabled, LedgerStorage
                 * dont drive checkpoint logic, but instead it is done
                 * periodically by SyncThread. So startCheckpoint which
                 * will be called by LedgerStorage will be no-op.
                 */
            }

            @Override
            public void start() {
                executor.scheduleAtFixedRate(() -> {
                    doCheckpoint(checkpointSource.newCheckpoint());
                }, conf.getFlushInterval(), conf.getFlushInterval(), TimeUnit.MILLISECONDS);
            }
        };
    } else {
        syncThread = new SyncThread(conf, getLedgerDirsListener(), ledgerStorage, checkpointSource);
    }

    // Initialize the LedgerStorage with everything built above.
    ledgerStorage.initialize(
        conf,
        ledgerManager,
        ledgerDirsManager,
        indexDirsManager,
        stateManager,
        checkpointSource,
        syncThread,
        statsLogger,
        allocator);


    /**
     * Author's note (not verifiable from this snippet): HandleFactoryImpl is used to
     * obtain handles; a handle here is a LedgerDescriptor, which is mainly responsible
     * for adding entries to a ledger and reading entries from it.
     */
    handles = new HandleFactoryImpl(ledgerStorage);

    // Expose Stats
    this.bookieStats = new BookieStats(statsLogger);
}

参考: https://www.jianshu.com/p/776028224419