龙猫云消息推送系统

系统架构

项目结构

  • mpush:开源的实时消息推送系统,基于该项目改造了其中消息推送流程,使用pulsar订阅推送的消息,作为一个broker
  • push-admin:使用spring-boot搭建的消息推送管理后台
  • alloc:是针对client提供的一个轻量级的负载均衡服务,每次客户端在链接broker之前都要调用下该服务
  • mpush-android:android客户端

使用的开源项目

  • netty
  • mpush
  • pulsar:存储与计算分离的新一代消息中间件
  • herdb:HerdDB 一个JVM-embeddable的分布式数据库,内嵌在broker里使用

功能演示

  • 消息推送管理后台

  • Android客户端

未来计划

  • 完成应用管理功能:用户可以创建多个应用,给应用分配appKey
  • 数据统计:接入新设备统计、消息推送记录、消息到达率统计、消息点击率统计
  • 新建龙猫云推送管理平台:应用计费统计等(计划是一个推送云平台产品,功能待定~~)
  • 推送sdk:建设统一sdk,拿分配到的appKey接入龙猫云推送平台

Apache Bookkeeper BookieService服务启动

Apache BookKeeper 是一个可以方便扩展,高可用,低延迟的存储系统。BookKeeper 专门为 append-only 的工作模式提供了优化,在以下的应用场景中非常适用:

  • WAL (Write-Ahead-Logging), 例如 HDFS 的 NameNode

  • 消息存储系统,例如 Apache Pulsar

  • Offset/Cursor 存储系统,例如在 Apache Pulsar 中用来存储消息消费位置

  • Object/Blob Store 对象存储系统,例如存储状态机的 snapshots

avatar

初始化BookieServer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
	public BookieService(BookieConfiguration conf,
StatsLogger statsLogger)

throws Exception {

super(NAME, conf, statsLogger);
this.server = new BookieServer(conf.getServerConf(), statsLogger);
}


//BookieServer.java

public BookieServer(ServerConfiguration conf, StatsLogger statsLogger)
throws IOException, KeeperException, InterruptedException,
BookieException, UnavailableException, CompatibilityException, SecurityException {

this.conf = conf;
//校验用户权限
validateUser(conf);
String configAsString;
try {
configAsString = conf.asJson();
LOG.info(configAsString);
} catch (ParseJsonException pe) {
LOG.error("Got ParseJsonException while converting Config to JSONString", pe);
}

//构造内存分配器
ByteBufAllocator allocator = getAllocator(conf);
this.statsLogger = statsLogger;
//构造NettyServer
this.nettyServer = new BookieNettyServer(this.conf, null, allocator);
try {
//构造Bookie实例
this.bookie = newBookie(conf, allocator);
} catch (IOException | KeeperException | InterruptedException | BookieException e) {
// interrupted on constructing a bookie
this.nettyServer.shutdown();
throw e;
}
final SecurityHandlerFactory shFactory;

shFactory = SecurityProviderFactoryFactory
.getSecurityProviderFactory(conf.getTLSProviderFactoryClass());
//指定NettyServer的RequestProcessor
this.requestProcessor = new BookieRequestProcessor(conf, bookie,
statsLogger.scope(SERVER_SCOPE), shFactory, bookie.getAllocator());
this.nettyServer.setRequestProcessor(this.requestProcessor);
}

BookieServer共有4大组件要初始化

  • StatsLogger:
  • Bookie:bookie实例
  • BookieNettyServer:其实就是一个netty服务端
  • DeathWatcher:一个线程,观察bookie和netty是否还存活

初始化Bookie

public Bookie(ServerConfiguration conf, StatsLogger statsLogger, ByteBufAllocator allocator)
        throws IOException, InterruptedException, BookieException {
    super("Bookie-" + conf.getBookiePort());
    this.statsLogger = statsLogger;
    this.conf = conf;
    // 从配置文件中获取journal 目录 list,然后在在每个目录下创建一个current目录
    this.journalDirectories = Lists.newArrayList();
    for (File journalDirectory : conf.getJournalDirs()) {
        this.journalDirectories.add(getCurrentDirectory(journalDirectory));
    }
    /**
     *  初始化DiskChecker,有两个参数 diskUsageThreshold 和 diskUsageWarnThreshold
     *  diskUsageThreshold表示磁盘的最大使用率,默认是0.95,目录列表中的所有目录都超过限制之后
     *  如果bookie配置可以以readonly模式运行,就会转化为readonly状态,否则会停止;
     *  diskUsageWarnThreshold 表示磁盘使用的告警阈值,默认是0.90,超过这个值会抛出
     *  DiskWarnThresholdException,并且会触发gc,当使用率低于这个值时,目录重新变为可写状态
     **/
    DiskChecker diskChecker = createDiskChecker(conf);
    //为ledger和index创建LedgerDirsManager,用来管理ledger和index的目录列表
    this.ledgerDirsManager = createLedgerDirsManager(conf, diskChecker, statsLogger.scope(LD_LEDGER_SCOPE));
    this.indexDirsManager = createIndexDirsManager(conf, diskChecker, statsLogger.scope(LD_INDEX_SCOPE),
                                                   this.ledgerDirsManager);
    this.allocator = allocator;

    // instantiate zookeeper client to initialize ledger manager
    //初始化zk 客户端
    this.metadataDriver = instantiateMetadataDriver(conf);
    checkEnvironment(this.metadataDriver);
    try {
        if (this.metadataDriver != null) {
            // current the registration manager is zookeeper only
            // 初始化ledgerManagerFactory,用于生成ledgerManager
            ledgerManagerFactory = metadataDriver.getLedgerManagerFactory();
            LOG.info("instantiate ledger manager {}", ledgerManagerFactory.getClass().getName());
            // ledgerManager负责和zk等元数据存储交互,用来管理ledger的元数据信息
            ledgerManager = ledgerManagerFactory.newLedgerManager();
        } else {
            ledgerManagerFactory = null;
            ledgerManager = null;
        }
    } catch (MetadataException e) {
        throw new MetadataStoreException("Failed to initialize ledger manager", e);
    }
    // 初始化状态管理器
    stateManager = initializeStateManager();
    // register shutdown handler using trigger mode
    stateManager.setShutdownHandler(exitCode -> triggerBookieShutdown(exitCode));
    // Initialise dirsMonitor. This would look through all the
    // configured directories. When disk errors or all the ledger
    // directories are full, would throws exception and fail bookie startup.

    //LedgerDirsMonitor, 监控所有配置的目录,如果发现磁盘错误或者所有的leger 目录都满,就抛出异常,
    //bookie启动失败
    List<LedgerDirsManager> dirsManagers = new ArrayList<>();
    dirsManagers.add(ledgerDirsManager);
    if (indexDirsManager != ledgerDirsManager) {
        dirsManagers.add(indexDirsManager);
    }
    this.dirsMonitor = new LedgerDirsMonitor(conf, diskChecker, dirsManagers);
    try {
        this.dirsMonitor.init();
    } catch (NoWritableLedgerDirException nle) {
        // start in read-only mode if no writable dirs and read-only allowed
        if (!conf.isReadOnlyModeEnabled()) {
            throw nle;
        } else {
            this.stateManager.transitionToReadOnlyMode();
        }
    }

    // instantiate the journals 初始化journal
    journals = Lists.newArrayList();
    for (int i = 0; i < journalDirectories.size(); i++) {
        journals.add(new Journal(i, journalDirectories.get(i),
                conf, ledgerDirsManager, statsLogger.scope(JOURNAL_SCOPE), allocator));
    }

    this.entryLogPerLedgerEnabled = conf.isEntryLogPerLedgerEnabled();
    CheckpointSource checkpointSource = new CheckpointSourceList(journals);

    //初始化ledgerStore,默认是一个 SortedLedgerStorage
    ledgerStorage = buildLedgerStorage(conf);

    boolean isDbLedgerStorage = ledgerStorage instanceof DbLedgerStorage;

    /*
     * with this change https://github.com/apache/bookkeeper/pull/677,
     * LedgerStorage drives the checkpoint logic.
     *
     * <p>There are two exceptions:
     *
     * 1) with multiple entry logs, checkpoint logic based on a entry log is
     *    not possible, hence it needs to be timebased recurring thing and
     *    it is driven by SyncThread. SyncThread.start does that and it is
     *    started in Bookie.start method.
     *
     * 2) DbLedgerStorage
     */

    /**
     * 一般都是由 LedgerStorage来驱动checkpoint 逻辑,但是有两个例外:
     * 1. 有多个entry logs,checkpoint逻辑不能依赖于一个entry log,应该是一个基于时间的循环,有SyncThread驱动
     * 2. DbLegerStorage
     */
    if (entryLogPerLedgerEnabled || isDbLedgerStorage) {
        syncThread = new SyncThread(conf, getLedgerDirsListener(), ledgerStorage, checkpointSource) {
            @Override
            public void startCheckpoint(Checkpoint checkpoint) {
                /*
                 * in the case of entryLogPerLedgerEnabled, LedgerStorage
                 * dont drive checkpoint logic, but instead it is done
                 * periodically by SyncThread. So startCheckpoint which
                 * will be called by LedgerStorage will be no-op.
                 */
            }

            @Override
            public void start() {
                executor.scheduleAtFixedRate(() -> {
                    doCheckpoint(checkpointSource.newCheckpoint());
                }, conf.getFlushInterval(), conf.getFlushInterval(), TimeUnit.MILLISECONDS);
            }
        };
    } else {
        syncThread = new SyncThread(conf, getLedgerDirsListener(), ledgerStorage, checkpointSource);
    }

    //初始化LedgerStorage
    ledgerStorage.initialize(
        conf,
        ledgerManager,
        ledgerDirsManager,
        indexDirsManager,
        stateManager,
        checkpointSource,
        syncThread,
        statsLogger,
        allocator);


    /**
     * HandleFactoryImpl 用来获取 handle,这里的 handle 是 LedgerDescriptor,是 ledger 的实现
     * 主要负责向 ledger addEntry 和或者从ledger readeEntry
     */
    handles = new HandleFactoryImpl(ledgerStorage);

    // Expose Stats
    this.bookieStats = new BookieStats(statsLogger);
}

参考: https://www.jianshu.com/p/776028224419

apache bookkeeper bookie 启动流程源码分析

Apache BookKeeper 是一个可以方便扩展,高可用,低延迟的存储系统。BookKeeper 专门为 append-only 的工作模式提供了优化,在以下的应用场景中非常适用:

  • WAL (Write-Ahead-Logging), 例如 HDFS 的 NameNode

  • 消息存储系统,例如 Apache Pulsar

  • Offset/Cursor 存储系统,例如在 Apache Pulsar 中用来存储消息消费位置

  • Object/Blob Store 对象存储系统,例如存储状态机的 snapshots

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public static void main(String[] args) {
int retCode = doMain(args);
Runtime.getRuntime().exit(retCode);
}

static int doMain(String[] args) {

ServerConfiguration conf;

// 0. parse command line
try {
conf = parseCommandLine(args);
} catch (IllegalArgumentException iae) {
return ExitCode.INVALID_CONF;
}

// 1. building the component stack:
LifecycleComponent server;
try {
server = buildBookieServer(new BookieConfiguration(conf));
} catch (Exception e) {
log.error("Failed to build bookie server", e);
return ExitCode.SERVER_EXCEPTION;
}

// 2. start the server
try {
ComponentStarter.startComponent(server).get();
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
// the server is interrupted
log.info("Bookie server is interrupted. Exiting ...");
} catch (ExecutionException ee) {
log.error("Error in bookie shutdown", ee.getCause());
return ExitCode.SERVER_EXCEPTION;
}
return ExitCode.OK;
}

注释很清晰,可以看到启动bookie服务主要做这三步:

  • 解析system property
  • 构建bookie服务所需的组件
  • 启动各个组件

解析system property

1
2
BasicParser parser = new BasicParser();
CommandLine cmdLine = parser.parse(BK_OPTS, args);

system property是java应用程序自身指定的变量,通常我们可以在启动应用的时候指定的,格式是:-DsystemPropertyKey=systemPropertyValue(楼主在本地启动bookie服务在idea设置的Program rguments:–conf /Volumes/longmao/bookkeeper-confg/b1.conf),解析system property使用了apache开源工具commons-cli(自己写应用或框架可以借鉴下其写法,用来加载应用自定义的配置)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
static {
BK_OPTS.addOption("c", "conf", true, "Configuration for Bookie Server");
BK_OPTS.addOption("withAutoRecovery", false,
"Start Autorecovery service Bookie server");
BK_OPTS.addOption("r", "readOnly", false,
"Force Start a ReadOnly Bookie server");
BK_OPTS.addOption("z", "zkserver", true, "Zookeeper Server");
BK_OPTS.addOption("m", "zkledgerpath", true, "Zookeeper ledgers root path");
BK_OPTS.addOption("p", "bookieport", true, "bookie port exported");
BK_OPTS.addOption("j", "journal", true, "bookie journal directory");
Option indexDirs = new Option ("i", "indexdirs", true, "bookie index directories");
indexDirs.setArgs(10);
BK_OPTS.addOption(indexDirs);
Option ledgerDirs = new Option ("l", "ledgerdirs", true, "bookie ledgers directories");
ledgerDirs.setArgs(10);
BK_OPTS.addOption(ledgerDirs);
BK_OPTS.addOption("h", "help", false, "Print help message");
}

含义:

  • -c/–conf:使用的配置文件
  • -r/–readOnly:是否只读

构建bookie服务所需的组件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
public static LifecycleComponentStack buildBookieServer(BookieConfiguration conf) throws Exception {
LifecycleComponentStack.Builder serverBuilder = LifecycleComponentStack.newBuilder().withName("bookie-server");

// 1. build stats provider
StatsProviderService statsProviderService =
new StatsProviderService(conf);
StatsLogger rootStatsLogger = statsProviderService.getStatsProvider().getStatsLogger("");

serverBuilder.addComponent(statsProviderService);
log.info("Load lifecycle component : {}", StatsProviderService.class.getName());

// 2. build bookie server
BookieService bookieService =
new BookieService(conf, rootStatsLogger);

serverBuilder.addComponent(bookieService);
log.info("Load lifecycle component : {}", BookieService.class.getName());

if (conf.getServerConf().isLocalScrubEnabled()) {
serverBuilder.addComponent(
new ScrubberService(
rootStatsLogger.scope(ScrubberStats.SCOPE),
conf, bookieService.getServer().getBookie().getLedgerStorage()));
}

// 3. build auto recovery
if (conf.getServerConf().isAutoRecoveryDaemonEnabled()) {
AutoRecoveryService autoRecoveryService =
new AutoRecoveryService(conf, rootStatsLogger.scope(REPLICATION_SCOPE));

serverBuilder.addComponent(autoRecoveryService);
log.info("Load lifecycle component : {}", AutoRecoveryService.class.getName());
}

// 4. build http service
if (conf.getServerConf().isHttpServerEnabled()) {
BKHttpServiceProvider provider = new BKHttpServiceProvider.Builder()
.setBookieServer(bookieService.getServer())
.setServerConfiguration(conf.getServerConf())
.setStatsProvider(statsProviderService.getStatsProvider())
.build();
HttpService httpService =
new HttpService(provider, conf, rootStatsLogger);

serverBuilder.addComponent(httpService);
log.info("Load lifecycle component : {}", HttpService.class.getName());
}

// 5. build extra services
String[] extraComponents = conf.getServerConf().getExtraServerComponents();
if (null != extraComponents) {
try {
List<ServerLifecycleComponent> components = loadServerComponents(
extraComponents,
conf,
rootStatsLogger);
for (ServerLifecycleComponent component : components) {
serverBuilder.addComponent(component);
log.info("Load lifecycle component : {}", component.getClass().getName());
}
} catch (Exception e) {
if (conf.getServerConf().getIgnoreExtraServerComponentsStartupFailures()) {
log.info("Failed to load extra components '{}' - {}. Continuing without those components.",
StringUtils.join(extraComponents), e.getMessage());
} else {
throw e;
}
}
}

return serverBuilder.build();
}

利用第一步解析配置生成的BookieConfiguration对象构造bookie服务依赖的组件,bookie启动过程中需要启动的服务组件:

  • StatsProviderService 指标服务
  • BookieService bookie服务
  • AutoRecoveryService 自动恢复服务
  • HttpService http rest服务
  • 其它服务

利用了LifecycleComponentStack这各类保存了需要启动的组件并规定了组件生命周期内执行的方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Override
public void start() {
components.forEach(component -> component.start());
}

@Override
public void stop() {
components.reverse().forEach(component -> component.stop());
}

@Override
public void close() {
components.reverse().forEach(component -> component.close());
}

有没有觉得很熟悉,tomcat源码里也有类似的设计,tomcat中的接口:org.apache.catalina.Lifecycle 定义了容器生命周期、容器状态转换及容器状态迁移事件的监听器和移除等主要接口,tomcat里的组件StandardHost等实现了这个接口,维护自身生命周期运转过程中要执行的逻辑。只能说优秀的源码套路都是差不多^_^

启动服务

遍历LifecycleComponentStack中的
ImmutableList components对象,执行各个服务组件的start方法

Bookeeper基本使用

初始化Bookeeper Client

1
2
3
4
5
6
7
8
9
10
11
12
13

private static final String ZK_ADDR = "127.0.0.1:2181";

try {
//初始化 BookKeeper Client 的方法

BookKeeper bkClient = new BookKeeper(ZK_ADDR);

} catch (Exception e) {
e.printStackTrace();
throw new RuntimeException(
"There is an exception throw while creating the BookKeeper client.");
}

创建ledger

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
LedgerHandle longmaoHandler = createLedgerSync(bkClient, "longmaoHandler");

System.out.println("longmaoHandler ledgerId:" + longmaoHandler.getId());

public static LedgerHandle createLedgerSync(BookKeeper bkClient, String pw) {
byte[] password = pw.getBytes();
try {
LedgerHandle handle = bkClient.createLedger(BookKeeper.DigestType.MAC, password);
return handle;
} catch (BKException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
return null;
}

向Ledger写入数据

1
2
3
4
5
6
7
8
9
10
11
12
13
//添加数据
long maomao = addEntry(longmaoHandler, "maomao");

public static long addEntry(LedgerHandle ledgerHandle, String msg) {
try {
return ledgerHandle.addEntry(msg.getBytes());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BKException e) {
e.printStackTrace();
}
return -1;
}

读取数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
//读取数据
Enumeration<LedgerEntry> ledgerEntryEnumeration = readEntry(longmaoHandler);

System.out.println(ledgerEntryEnumeration);
while (ledgerEntryEnumeration.hasMoreElements()) {
LedgerEntry ledgerEntry = ledgerEntryEnumeration.nextElement();
System.out.println("读取数据");
System.out.println(new String(ledgerEntry.getEntry()));
}

public static Enumeration<LedgerEntry> readEntry(LedgerHandle ledgerHandle) {
try {
return ledgerHandle.readEntries(0, ledgerHandle.getLastAddConfirmed());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BKException e) {
e.printStackTrace();
}
return null;
}

java并发基础-01

java对象内存布局

首先要明确的是java对象大小必须是8的倍数,对象头占12字节

java 对象分为三部分:对象头(12字节)、实例数据、对齐填充(保证整个对象大小是8的倍数)

openJDK有个工具包,可以打印对象的内存布局:

1
2
3
4
5
<dependency>
<groupId>org.openjdk.jol</groupId>
<artifactId>jol-core</artifactId>
<version>0.10</version>
</dependency>

打印java对象内存布局:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class Mao {

boolean flag;

int a;
}

public class TestMao {

public static void main(String[] args) {
Mao mao = new Mao();
System.out.println(ClassLayout.parseInstance(mao).toPrintable());
}
}

结果:

1
2
3
4
5
6
7
8
9
10
learn.Mao object internals:
OFFSET SIZE TYPE DESCRIPTION VALUE
0 4 (object header) 01 00 00 00 (00000001 00000000 00000000 00000000) (1)
4 4 (object header) 00 00 00 00 (00000000 00000000 00000000 00000000) (0)
8 4 (object header) 43 c1 00 f8 (01000011 11000001 00000000 11111000) (-134168253)
12 4 int Mao.a 0
16 1 boolean Mao.flag false
17 7 (loss due to the next object alignment)
Instance size: 24 bytes
Space losses: 0 bytes internal + 7 bytes external = 7 bytes total

可以看到jvm为了保证对象大小是8的倍数,使用了7个字节来填充。

Unsafe 魔法类

R大的官方解释:Unsafe是用于在实质上扩展Java语言表达能力、便于在更高层(Java层)代码里实现原本要在更低层(C层)实现的核心库功能用的。这些功能包括裸内存的申请/释放/访问,低层硬件的atomic/volatile支持,创建未初始化对象等。它原本的设计就只应该被标准库使用。

Unsafe类不能被直接new出来使用,原因是其构造方法是私有的,Unsafe的初始化方法主要是通过getUnsafe方法的单例模式实现,在getUnsafe方法里限定了只有BootStrap classLoader 才能对其进行加载,否则抛出SecurityException

1
2
3
4
5
6
7
8
9
10
11
12
13
//构造方法
private Unsafe() {
}

private static final Unsafe theUnsafe;
public static Unsafe getUnsafe() {
Class var0 = Reflection.getCallerClass();
if (!VM.isSystemDomainLoader(var0.getClassLoader())) {
throw new SecurityException("Unsafe");
} else {
return theUnsafe;
}
}

使用Unsafe的方法,使用反射:

1
2
3
4
5
6
7
8
9
10
11
12
13
public class UnsafeInstance {

public static Unsafe reflectGetUnsafe() {
try {
Field field = Unsafe.class.getDeclaredField("theUnsafe");
field.setAccessible(true);
return (Unsafe) field.get(null);
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
}

Unsafe是java并发包的基石。

Diamond架构解析

Diamond主要提供持久配置的发布和订阅服务,最大特点是结构简单,稳定可靠。主要的使用场景:TDDL使用Diamond动态切换数据库,动态扩容等;业务使用Diamond推送系统开关配置。Diamond产品专注于高可用性,基于此在架构、容灾机制、数据获取模型上有一些与同类产品的不同之处——阿里巴巴Diamond介绍

diamond架构图

无标题2.png

配置信息存放在哪里

无标题3.png

gourp_info表存组的信息,组的概念是用来区分不同环境的(奇怪的是,阿里开源出来的Diamond并未使用到这个表~~)
config_info表用来存放具体的配置信息

数据保存逻辑

diamond3.png

对应的AdminController的postConfig方法,保存顺序:数据库->更新md5cache->本地磁盘->通知其他节点更新;

md5cache是用ConcurrentHashMap存储的,key是group/dataId,
value是content的md5字符串,MD5类是单例的,对数据进行md5的算法加了锁。客户端来请求配置信息(带着group和dataId),服务端会查看内存中的md5cache是否有对应的配置信息(内存中取值,速度快)

通知其他节点更新的方式:首先获取node.properties保存的地址,然后通过http方式发送请求。
其它节点从数据库获取最新配置信息然后保存到本机磁盘上

客户端和服务端如何交互

diamond4.png
diamond5.png

  • 利用工厂类DiamondClientFactory创建单例订阅者类。
  • 将客户端创建的侦听器类添加到侦听器管理list中并注入到新创建的订阅者类中。
  • 为订阅者设置dataId和groupId。
  • 启动订阅者线程,开始轮询消息。

配置变更客户端如何感知

diamond6.png

  • 方法内部启动一个定时线程,客户端第一次启动60秒后执行一次获取最新配置信息,后续默认每隔15秒执行一次
  • 方法内部实际上三个主方法分别是:

    checkLocalConfigInfo:主要是检查本地数据是否有更新,如果没有则返回,有则返回最新数据,并通知客户端配置的listener。

    checkDiamondServerConfigInfo:远程调用服务端,获取最新修改的配置数据并通知客户端listener。

    checkSnapshot:主要是持久化数据信息用的方法。

Diamond缺陷及改进思路

  • 界面过于简单、不够美观
  • 权限控制不够精细
  • 没有灰度发布功能
  • 配置信息变更无法及时生效

h2-insert语句流程分析

1
INSERT INTO TEST(ID, NAME) VALUES(3000, 'aaa');

解析得到Insert命令

1
2
3
4
5
6
7
8
9
10
org.h2.jdbc.JdbcStatement#execute(java.lang.String)
->org.h2.jdbc.JdbcStatement#executeInternal
->org.h2.jdbc.JdbcConnection#prepareCommand(java.lang.String, int)
->org.h2.engine.Session#prepareCommand
->org.h2.engine.Session#prepareLocal
->org.h2.command.Parser#prepareCommand
->org.h2.command.Parser#parse(java.lang.String)
->org.h2.command.Parser#parse(java.lang.String, boolean)
->org.h2.command.Parser#parsePrepared
->org.h2.command.Parser#parseInsert

关键步骤是进入 org.h2.command.Parser#parseInsert 新建了一个Insert命令

  • 校验insert的表是否存在

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    private Table readTableOrView(String tableName) {
    if (schemaName != null) {
    Table table = getSchema().resolveTableOrView(session, tableName);
    if (table != null) {
    return table;
    }
    } else {
    //首先查看 PUBLIC Schema里的tablesAndViews 是否存在key为tableName
    Table table = database.getSchema(session.getCurrentSchemaName())
    .resolveTableOrView(session, tableName);
    if (table != null) {
    return table;
    }
    //遍历该session对应Schema 中tablesAndViews 是否存在key为tableName
    String[] schemaNames = session.getSchemaSearchPath();
    if (schemaNames != null) {
    for (String name : schemaNames) {
    Schema s = database.getSchema(name);
    table = s.resolveTableOrView(session, tableName);
    if (table != null) {
    return table;
    }
    }
    }
    }
    if (isDualTable(tableName)) {
    return new DualTable(database);
    }
    //不存在则抛异常 不能insert一个数据库不存在的表吧^_^
    throw DbException.get(ErrorCode.TABLE_OR_VIEW_NOT_FOUND_1, tableName);
    }

h2-create语句流程分析

1
stat.execute("create table test(id int primary key, name varchar(255))");

总流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
//解析生成Command
CommandInterface command = conn.prepareCommand(sql, fetchSize);
boolean lazy = false;
boolean returnsResultSet;
synchronized (session) {
setExecutingStatement(command);
try {
if (command.isQuery()) {
returnsResultSet = true;
boolean scrollable = resultSetType != ResultSet.TYPE_FORWARD_ONLY;
boolean updatable = resultSetConcurrency == ResultSet.CONCUR_UPDATABLE;
ResultInterface result = command.executeQuery(maxRows, scrollable);
lazy = result.isLazy();
resultSet = new JdbcResultSet(conn, this, command, result, id,
closedByResultSet, scrollable, updatable);
} else {
returnsResultSet = false;
//执行Command命令
ResultWithGeneratedKeys result = command.executeUpdate(
conn.scopeGeneratedKeys() ? null : generatedKeysRequest);
updateCount = result.getUpdateCount();
ResultInterface gk = result.getGeneratedKeys();
if (gk != null) {
generatedKeys = new JdbcResultSet(conn, this, command, gk, id,
false, true, false);
}
}
} finally {
if (!lazy) {
setExecutingStatement(null);
}
}
}

主要就是两步:

  • 解析sql语句生成Command
  • 执行Command命令

解析sql语句生成Command

1
2
3
4
5
6
org.h2.command.Parser#prepareCommand
->org.h2.command.Parser#parse(java.lang.String)
->org.h2.command.Parser#parsePrepared(这一步解析出是create语句)
->org.h2.command.Parser#parseCreate
->org.h2.command.Parser#parseCreateTable
->org.h2.command.Parser#parseTableColumnDefinition
  • org.h2.command.Parser#parseCreateTable这一步会新建一个CreateTable对象
1
2
3
4
5
6
7
8
9
//获取schema 构建创建sql语句
Schema schema = getSchema();
CreateTable command = new CreateTable(session, schema);
command.setPersistIndexes(persistIndexes);
command.setTemporary(temp);
command.setGlobalTemporary(globalTemp);
command.setIfNotExists(ifNotExists);
command.setTableName(tableName);
command.setComment(readCommentIf());

这个Schema就是创建DataBase对象时生成的PUBLIC Schema

  • org.h2.command.Parser#parseTableColumnDefinition这一步会解析出列-Column对象
1
2
3
4
5
6
7
8
9
10
11
12
13
Column column = parseColumnForTable(columnName, true, true);
if (column.isAutoIncrement() && column.isPrimaryKey()) {
column.setPrimaryKey(false);
IndexColumn[] cols = { new IndexColumn() };
cols[0].columnName = column.getName();
AlterTableAddConstraint pk = new AlterTableAddConstraint(
session, schema, false);
pk.setType(CommandInterface.ALTER_TABLE_ADD_CONSTRAINT_PRIMARY_KEY);
pk.setTableName(tableName);
pk.setIndexColumns(cols);
command.addConstraintCommand(pk);
}
command.addColumn(column);

这里最重要的是往 CreateTable 的columns 变量添加column。最终会返回一个CreateTable的Command

执行update命令

1
2
3
org.h2.command.Command#executeUpdate
->org.h2.command.CommandContainer#update
->org.h2.command.ddl.CreateTable#update
  • org.h2.command.CommandContainer#update这步会新建一个表对象-Table
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
changePrimaryKeysToNotNull(data.columns);
data.id = getObjectId();
data.create = create;
data.session = session;
//根据schema创建table
Table table = getSchema().createTable(data);

ArrayList<Sequence> sequences = generateSequences(data.columns, data.temporary);
table.setComment(comment);
if (isSessionTemporary) {
if (onCommitDrop) {
table.setOnCommitDrop(true);
}
if (onCommitTruncate) {
table.setOnCommitTruncate(true);
}
session.addLocalTempTable(table);
} else {
db.lockMeta(session);
db.addSchemaObject(session, table);
}

其中db.addSchemaObject(session, table); 会往PUBLIC这个Schema的变量tablesAndViews(类型为ConcurrentHashMap)出入变量:

  • key: TEST
  • value: 对应的建表语句生成的Table对象

表对象有了,接下来的insert语句会去PUBLIC这个Schema根据key找对应表对象

浅析h2数据库存储引擎-mvStore

设置存储引擎

在1.4以前h2使用的存储引擎是pageStore,现在还保留在org.h2.pagestore下。要想使用旧存储引擎PageStore可以在jdbc链接后面设置:

1
jdbc:h2:~/test;MV_STORE=false

则会使用旧的存储引擎,默认是true,即使用mvStore。存储引擎初始化过程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
//DataBase.java
/**
* 初始化存储引擎
*
* @return
*/

public PageStore getPageStore() {
//MV_STORE=false/true 默认true 表明存储引擎使用 MvStore
if (dbSettings.mvStore) {
if (store == null) {
//传dataBase对象进去 初始化 MVTableEngine.Store(包含 MvStore TransactionStore)
store = MVTableEngine.init(this);
}
return null;
}
//存储引擎使用 pageStore
synchronized (this) {
if (pageStore == null) {
pageStore = new PageStore(this, databaseName +
Constants.SUFFIX_PAGE_FILE, accessModeData, cacheSize);
if (pageSize != Constants.DEFAULT_PAGE_SIZE) {
pageStore.setPageSize(pageSize);
}
if (!readOnly && fileLockMethod == FileLockMethod.FS) {
pageStore.setLockFile(true);
}
pageStore.setLogMode(logMode);
pageStore.open();
}
return pageStore;
}
}

MVStore初始化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
public static Store init(final Database db) {
Store store = db.getStore();
if (store != null) {
return store;
}
byte[] key = db.getFileEncryptionKey();
String dbPath = db.getDatabasePath();
MVStore.Builder builder = new MVStore.Builder();
store = new Store();
boolean encrypted = false;
if (dbPath != null) {
String fileName = dbPath + Constants.SUFFIX_MV_FILE;
MVStoreTool.compactCleanUp(fileName);
builder.fileName(fileName);
builder.pageSplitSize(db.getPageSize());
if (db.isReadOnly()) {
builder.readOnly();
} else {
// possibly create the directory
boolean exists = FileUtils.exists(fileName);
if (exists && !FileUtils.canWrite(fileName)) {
// read only
} else {
String dir = FileUtils.getParent(fileName);
FileUtils.createDirectories(dir);
}
int autoCompactFillRate = db.getSettings().maxCompactCount;
if (autoCompactFillRate <= 100) {
builder.autoCompactFillRate(autoCompactFillRate);
}
}
if (key != null) {
encrypted = true;
builder.encryptionKey(decodePassword(key));
}
if (db.getSettings().compressData) {
builder.compress();
// use a larger page split size to improve the compression ratio
builder.pageSplitSize(64 * 1024);
}
builder.backgroundExceptionHandler(new UncaughtExceptionHandler() {

@Override
public void uncaughtException(Thread t, Throwable e) {
db.setBackgroundException(DbException.convert(e));
}

});
// always start without background thread first, and if necessary,
// it will be set up later, after db has been fully started,
// otherwise background thread would compete for store lock
// with maps opening procedure
builder.autoCommitDisabled();
}
store.open(db, builder, encrypted);
db.setStore(store);
return store;
}


/**
* Open the store for this database.
*
* @param db the database
* @param builder the builder
* @param encrypted whether the store is encrypted
*/

void open(Database db, MVStore.Builder builder, boolean encrypted) {
this.encrypted = encrypted;
try {
//初始化MvStore
this.mvStore = builder.open();
FileStore fs = mvStore.getFileStore();
if (fs != null) {
this.fileName = fs.getFileName();
}
if (!db.getSettings().reuseSpace) {
mvStore.setReuseSpace(false);
}
mvStore.setVersionsToKeep(0);
//初始化 TransactionStore
this.transactionStore = new TransactionStore(mvStore,
new ValueDataType(db, null), db.getLockTimeout());
} catch (IllegalStateException e) {
throw convertIllegalStateException(e);
}
}

首先创建MVStore.Builder对象,利用MVStore.Builder对象创建MVStore,MVStore.Builder可配置的参数:

  • autoCommitBufferSize
  • autoCompactFillRate
  • backgroundExceptionHandler
  • cacheSize
  • compress (LZF和Deflate)
  • encryptionKey
  • fileName
  • fileStore
  • pageSplitSize
  • readOnly

表与存储引擎的关系

一个数据库对应:

  • 一个MVStore实例
  • 一个TransactionStore实例
  • 一个preparedTransactions MVMap
  • 一个undoLog MVMap
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
MVStore(Map<String, Object> config) {
recoveryMode = config.containsKey("recoveryMode");
compressionLevel = DataUtils.getConfigParam(config, "compress", 0);
String fileName = (String) config.get("fileName");
FileStore fileStore = (FileStore) config.get("fileStore");
fileStoreIsProvided = fileStore != null;
if(fileStore == null && fileName != null) {
fileStore = new FileStore();
}
this.fileStore = fileStore;

int pgSplitSize = 48; // for "mem:" case it is # of keys
CacheLongKeyLIRS.Config cc = null;
if (this.fileStore != null) {
int mb = DataUtils.getConfigParam(config, "cacheSize", 16);
if (mb > 0) {
cc = new CacheLongKeyLIRS.Config();
cc.maxMemory = mb * 1024L * 1024L;
Object o = config.get("cacheConcurrency");
if (o != null) {
cc.segmentCount = (Integer)o;
}
}
pgSplitSize = 16 * 1024;
}
if (cc != null) {
cache = new CacheLongKeyLIRS<>(cc);
} else {
cache = null;
}

pgSplitSize = DataUtils.getConfigParam(config, "pageSplitSize", pgSplitSize);
// Make sure pages will fit into cache
if (cache != null && pgSplitSize > cache.getMaxItemSize()) {
pgSplitSize = (int)cache.getMaxItemSize();
}
pageSplitSize = pgSplitSize;
keysPerPage = DataUtils.getConfigParam(config, "keysPerPage", 48);
backgroundExceptionHandler =
(UncaughtExceptionHandler)config.get("backgroundExceptionHandler");
meta = new MVMap<>(this);
if (this.fileStore != null) {
retentionTime = this.fileStore.getDefaultRetentionTime();
// 19 KB memory is about 1 KB storage
int kb = Math.max(1, Math.min(19, Utils.scaleForAvailableMemory(64))) * 1024;
kb = DataUtils.getConfigParam(config, "autoCommitBufferSize", kb);
autoCommitMemory = kb * 1024;
autoCompactFillRate = DataUtils.getConfigParam(config, "autoCompactFillRate", 90);
char[] encryptionKey = (char[]) config.get("encryptionKey");
try {
if (!fileStoreIsProvided) {
boolean readOnly = config.containsKey("readOnly");
//创建数据库文件 *.db
this.fileStore.open(fileName, readOnly, encryptionKey);
}
//往 *.db 写数据
if (this.fileStore.size() == 0) {
creationTime = getTimeAbsolute();
lastCommitTime = creationTime;
storeHeader.put(HDR_H, 2);
storeHeader.put(HDR_BLOCK_SIZE, BLOCK_SIZE);
storeHeader.put(HDR_FORMAT, FORMAT_WRITE);
storeHeader.put(HDR_CREATED, creationTime);
writeStoreHeader();
} else {
// there is no need to lock store here, since it is not opened yet,
// just to make some assertions happy, when they ensure single-threaded access
storeLock.lock();
try {
readStoreHeader();
} finally {
storeLock.unlock();
}
}
} catch (IllegalStateException e) {
panic(e);
} finally {
if (encryptionKey != null) {
Arrays.fill(encryptionKey, (char) 0);
}
}
lastCommitTime = getTimeSinceCreation();

scrubMetaMap();

// setAutoCommitDelay starts the thread, but only if
// the parameter is different from the old value
int delay = DataUtils.getConfigParam(config, "autoCommitDelay", 1000);
setAutoCommitDelay(delay);
} else {
autoCommitMemory = 0;
autoCompactFillRate = 0;
}
}

MVStore 初始化了7个MVMap,新建一个表对底层存储引擎来说其实就是新建一个MVMap,然后往这个MVMap插入Page,最终数据落到硬盘里的*.db文件里。

  • meta
    id:0
    name:meta

  • openTransactions
    id:1
    name: openTransactions

  • undoLog.1
    id:2
    name:undoLog.1

  • table.0
    id:3
    name:table.0

  • lobMap
    id:4
    name:lobMap

  • lobRef
    id:5
    name:lobRef

  • lobData
    id:6
    name:lobData

如何使用Apache BookKeeper构建分布式数据库-03

在这一部分中,我们将深入研究HerdDB,这是一个分布式数据库,依靠BookKeeper来实现自己的日志,并处理我们在先前文章中讨论的所有问题。

在本系列文章中,我想分享一些基本的架构概念,这些架构概念涉及无共享架构的分布式数据库的可能结构。 在第一部分中,我们了解了如何将数据库设计为复制状态机。 在第二部分中,我们将看到Apache BookKeeper如何通过提供强大的机制来构建数据库的预写日志来帮助我们。

现在,我们将深入研究HerdDB,这是一个分布式数据库,依靠BookKeeper来实现自己的日志并处理我们在先前文章中讨论的所有问题。

为什么使用 HerdDB

我们在EmailSuccess.com上启动了HerdDB,这是一个Java应用程序,它使用SQL数据库存储要传递的电子邮件状态。 EmailSuccess是一个MTA(邮件传输代理),即使使用一台机器,它也可以处理具有数百万条消息的数千个队列。

现在,HerdDB已在其他应用程序中使用,例如Apache Pulsar Manager或CarapaceProxy,我们正在MagNews.com平台中使用它。

最初,EmailSucess使用MySQL,但我们需要一个在Java应用程序的同一进程中运行的数据库,以简化部署和管理,例如SQLLite。

我们还需要一个数据库,该数据库可以跨越多台计算机,并可能利用系统固有的多租户体系结构:几台计算机上的数千个独立队列(通常为1到10)。

所以这里是HerdDB! 用Java编写的分布式可嵌入数据库。 如果您想了解更多有关此数据库的信息,请参考Wiki和可用的文档。

HerdDB数据模型

HerdDB数据库由表空间组成。 每个表空间是一组表,并且独立于其他表空间。 每个表都是一个键值存储,它将键(字节数组)映射到一个值(字节数组)。

为了完全替代MySQL,HerdDB附带了一个内置的高效SQL层,它能够支持SQL数据库中您期望的大多数功能,但需要权衡取舍。 我们将Apache Calcite用于所有SQL解析和执行计划。

使用JDBC驱动程序时,您只能访问SQL API,但是HerdDB还有其他应用程序,例如HerdDB Collections Framework直接使用较低级别的模型。

每个表都有一个SQL模式,该模式描述列,主键,约束和索引。

在一个表空间中,HerdDB支持表和跨多行和多表的事务之间的联接。

对于本文的其余部分,我们将仅讨论底层模型:

  • 表空间由表组成
  • 表是将二进制键映射到二进制值的字典
  • 在表上支持的操作是:INSERT,UPDATE,UPSERT,DELETE,GET和SCAN
  • 事务可以访问一个表空间中的多个表和行

数据和元数据

我们有几层数据和元数据。

群集元数据与整个系统有关:

  • 节点元数据(发现服务)
  • 表空间元数据(分配给它的节点,复制设置)

表空间元数据:

  • 表和索引元数据
  • 检查点元数据

表空间数据:

  • 未提交事务的数据
  • 临时操作数据
  • wal(预写入日志)

表数据:

  • 记录
  • 索引
  • 检查点元数据

当HerdDB在集群模式下运行时,我们将集群元数据存储在Apache ZooKeeper上,并将表空间元数据和数据存储在本地磁盘上。 预写日志位于Apache BookKeeper上。

发现服务和网络架构

我们有一组参与集群的机器,我们称它们为节点。每个节点都有一个ID,该ID在集群中唯一地对其进行标识。

对于每个节点,我们将所有有用的信息存储在ZooKeeper上,以查找节点,例如当前的网络地址和支持的协议(TLS可用性)。这样,如果您没有固定的iPhone地址或DNS名称,则可以轻松更改网络地址。

对于每个表空间,我们定义了一组副本节点。每个节点都存储整个表空间的副本。发生这种情况是因为我们支持跨多个记录的查询和事务:您可以通过单个操作访问或修改许多表中的许多记录,这必须非常有效。

选出其中一个节点作为领导者,然后将其他节点命名为跟随者。

客户端向表空间发出操作时,它将使用表空间元数据来定位领导节点,并使用发现服务来定位当前网络地址。所有这些信息都在ZooKeeper上,并在本地缓存。

avatar

服务器节点不互相通信,但是所有更新都通过BookKeeper进行。

仅在跟随者节点进行引导且其本地数据不足以从BookKeeper恢复时,才需要服务器到服务器的通信。

写步骤

写入操作遵循以下流程:

  • 客户端找到表空间的领导节点(元数据在本地缓存)
  • 客户端建立连接(连接被池化)
  • 客户端发送写请求
  • 节点解析SQL并计划执行操作
  • 验证操作并为执行做好准备(行级锁,表空间检查点锁,约束验证,新值的计算……)
  • 日志条目已排队,可以写入日志。
  • BookKeeper将entry发送到法定数量的Bookies(写入法定bookies大小配置参数)
  • 配置的Bookies数量(ack quorum size配置参数)确认已写入,BookKeeper客户端唤醒。
  • 该操作的效果将应用于表的内存副本中的本地
  • 执行清理操作(释放行级别锁,表空间检查点锁…)
  • 该写操作已确认给客户端

这些步骤大多数都是异步的,因此可以提高吞吐量。

跟随者节点一直在跟踪日志:它们侦听BookKeeper的新entry,并将相同的更改应用于表的内存副本中的本地

我们正在使用长轮询读取模式以节省资源。 请检查BookKeeper文档中的ReadHandle#readLastAddConfirmedAndEntry。

切换到新的领导者

BookKeeper保证追随者最终将与表的最新版本保持最新,但我们必须将故事的其余部分全部实现。

我们有多个节点。 一个节点是领导者,另一个是跟随者。 但是,如何保证领导者只有一个节点呢? 我们必须处理网络分区。

对于每个表空间,我们在ZooKeeper中存储一个描述所有这些元数据的结构(表空间元数据),尤其是:

  • 一组保存数据的节点
  • 当前领导节点
  • 复制参数:
    • 预期副本数
    • 最大领导者活动超时时间设置

我们没有深入研究HerdDB中leader选举的工作方式。 让我们关注保证系统一致性的机制。

上面的结构对客户和系统管理很有用,但是我们需要另一个数据结构,该数据结构包含构成日志的当前Ledger集合,并且该结构也将成为领导力实施的另一个关键。

我们具有LedgersInfo结构:

  • 生成日志的分类帐列表(activeledgers)
  • 表空间历史记录中的第一个分类帐的ID(firstledgerid)
    领导者节点仅保持一个总账保持打开状态,这始终是lactiveledgers列表末尾的总账。

BookKeeper保证领导者是唯一可以写入日志的领导者,因为分类账只能从一个客户端写入一次。

每个关注者节点都使用LedgerInfo在BookKeeper上查找数据。

当新的跟随者节点启动时,它将检查第一个ledger的ID。 如果第一个ledger仍在活动ledger列表中,则只需读取第一个到最新的分类帐序列即可执行恢复。

如果此ledger不再存在于活动ledger列表中,则它必须找到该ledger并下载完整的数据快照。

将跟随者节点提升为领导者角色时,它将执行两个步骤:

  • 它将使用“recovery”标志打开活动ledger列表中的所有ledger,这将依次隔离当前的ledger
  • 这将打开一个新的ledger进行写入
  • 它将其添加到活动ledger列表中

对LedgersInfo的所有写操作都是使用ZooKeeper内置的比较并设置功能执行的,这保证了只有一个节点可以强制其领导。

如果有两个并发的新领导者试图将其自己的ledger ID附加到列表中,则其中一个将导致对ZooKeeper的写入失败,并且将使引导失败。

检查点

HerdDB无法永远保留所有ledger,因为它们将阻止Bookies回收空间,因此我们必须在可能的情况下将其删除。

每个节点(领导者或跟随者)定期执行检查点操作:

在检查点期间,服务器将其自身的数据本地副本和日志的当前位置合并到日志中:从该时间点开始,直到该位置的日志部分将无用,可以将其删除。

但是在集群模式下,您不能天真地这样做:

  • 您不能只删除ledger的一部分,而只能删除整个ledger
  • 追随者仍在跟踪日志,领导者无法删除尚未使用并在每个其他节点上建立检查点的珍贵数据
  • 追随者不允许更改日志(只有领导者可以触摸LedgerInfo结构)

当前的HerdDB方法是使用一个配置参数来定义ledger的最长生存时间。 在那之后,所有在领导者节点检查点期间无用的旧ledger都将被丢弃。

如果您的追随者人数很少(通常是两个)并且已经启动并正在运行,则这种方法效果很好,目前大多数HerdDB安装就是这种情况。 预期跟随者节点的关闭时间不会超过日志生存时间。

通过这种方式,如果发生这种情况,引导跟随者可以连接到领导者,然后下载最近检查点的快照。

通常,由很少计算机组成的HerdDB集群可容纳数十到数百个表空间,每个节点是某些表空间的领导者,而其他节点则是跟随者。

事务

BookKeeper必须确认每个操作,然后才能将其应用于本地内存并将响应返回给客户端。 BookKeeper将写入内容发送给仲裁中的每个Bookie,然后等待。 这可能很慢!

在事务内,客户希望只有当事务成功提交后,事务操作的结果才会自动应用于表。

无需等待bookies确认属于事务的每个写入,您只需等待并检查最终提交操作的写入结果,因为BookKeeper保证所有写入均能持久且成功地持久保存。

avatar

这并不像您期望的那样简单,例如,您必须处理以下事实:客户端可能在事务上下文中发送长时间运行的操作,并由于某些应用程序级别超时而发出回滚命令:必须将回滚写入到 事务的所有其他操作之后的日志,否则跟随者(以及自我恢复过程中的领导节点本身)将看到一系列奇怪的事件:“开始事务”,“操作”,“回滚事务”以及事务的其他操作那已经不存在了。

回滚新Ledgers

如果领导者没有遇到麻烦并且永远不会重新启动,则它只能保持打开一个ledger并继续对其进行写入。

实际上,这不是一个好主意,因为该ledger将无限制地增长,并且使用BookKeeper不能删除一个分类帐的一部分,而只能删除完整的ledger,因此Bookies将无法回收空间。

在HerdDB中,在配置了一定数量的字节之后,我们将滚动一个新的ledger。这样,您可以在连续写入日志的情况下快速回收空间。

但是BookKeeper保证仅在处理单个ledger时适用。它保证只有当所有其他ID小于该条目ID的条目都已被成功写入时,才会向写入者确认每次写入。

启动新ledger时,必须等待并检查发布到前一个ledger(或至少最后一个ledger)的所有写入的结果。

您还可以看看Apache DistributedLog,它是Apache BookKeeper的高级API,它解决了我在本文中讨论的许多问题。

总结

我们已经看到了BookKeeper的实际应用程序,以及如何使用它来实现分布式数据库的预写日志。 Apache BookKeeper和Apache ZooKeeper提供了处理数据和元数据一致性所需的所有工具。 处理异步操作可能很棘手,您将不得不处理很多极端情况。 您还必须设计日志并让其回收磁盘空间,而又不会阻止跟随者节点的正确行为。

HerdDB仍然是一个年轻的项目,但是它正在关键任务应用程序的生产中运行,社区和产品的增长与新用户提出的用例一样多。

参考资料

https://streamnative.io/blog/tech/2020-05-12-distributed-database-bk3/