Building a Payment System from 0 to 1

Having recently left my job, I've had some free time at home, so I've been studying how to implement a payment system.

Service Breakdown

  • Dubbo services and their ports

Project                 Port   Description
pay-service-account     20801  Account service
pay-service-bank        20802  Bank management service
pay-service-banklink    20892  Bank gateway (bank-link) service
pay-service-boss        20804  Operations service
pay-service-fee         20807  Merchant fee service
pay-service-limit       20809  Transaction limit service
pay-service-notify      20822  Notification service
pay-service-payrule     20811  Payment rule service
pay-service-remit       20813  Remittance (payout) service
pay-service-report      20815  Reporting service
pay-service-settlement  20816  Settlement service
pay-service-trade       20817  Trade service
pay-service-user        20818  User service
  • Web services and their ports

Project                 Port   Description
pay-web-boss            8083   Operations admin system
pay-web-gateway         8084   Payment gateway
pay-web-notify-receive  8086   Notification receiver
pay-web-portal          8085   Portal system
pay-web-trade           8087   Trade system

Technology Stack

Management

  • Maven for dependencies and project management
  • Git for version control
  • Jenkins for continuous builds

Backend

  • IoC container: Spring
  • Web framework: Spring MVC
  • ORM framework: MyBatis
  • RPC framework: Dubbo
  • Job scheduling: Quartz
  • Cache: Redis
  • Data source: Druid
  • Logging: SLF4J + Log4j2
  • JSON: Jackson
  • Captcha: Kaptcha
  • View templates: JSP

Frontend

  • jQuery: JS library
  • EasyUI: UI framework
  • zTree: tree component

Distributed transactions: the tcc-transaction framework

Operations Admin Console

GitLab for source code management

Jenkins for automated service deployment

Project Structure

Includes startup scripts for the Dubbo services

To Be Improved (or rather, things I don't yet know how to build)

  • Risk control system: unless the boss is happy to lose his shirt, risk control is a must. Yet for internet companies risk control remains an enigmatic topic, for risk specialists and IT engineers alike. Machine learning, deep learning, rule-based inference, random forests... all I can say is I don't know how to play with these yet ~~
  • Reconciliation system: for every transaction, the records of all participants must match, with no discrepancies. The reconciliation system's job is first to find the records that differ (the checking step), and then to resolve those differences manually or automatically (the balancing step). A toy sketch of the checking step follows.
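The sketch below diffs two sides' records keyed by order id; it is my own minimal illustration (all names hypothetical), not code from any real reconciliation system:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ReconcileDemo {

    /**
     * Returns the order ids that fail the check: amounts that disagree
     * between the two sides, or records that exist on only one side.
     */
    static List<String> reconcile(Map<String, Long> ourAmounts, Map<String, Long> theirAmounts) {
        List<String> diffs = new ArrayList<>();
        Map<String, Long> remaining = new HashMap<>(theirAmounts);
        for (Map.Entry<String, Long> e : ourAmounts.entrySet()) {
            Long other = remaining.remove(e.getKey());
            if (other == null || !other.equals(e.getValue())) {
                diffs.add(e.getKey()); // missing on their side, or amounts disagree
            }
        }
        diffs.addAll(remaining.keySet()); // present only on their side
        return diffs;
    }
}

Everything this method returns would be handed to the balancing step for manual or automated resolution.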

Longmao Cloud (龙猫云) Message Push System

System Architecture

Project Structure

  • mpush: an open-source real-time message push system; I reworked its push flow so that it consumes the messages to push from a Pulsar subscription and acts as a broker
  • push-admin: a push-management console built with Spring Boot
  • alloc: a lightweight load-balancing service for clients; every client calls it before connecting to a broker
  • mpush-android: the Android client

Open-Source Projects Used

  • netty
  • mpush
  • pulsar: a new-generation messaging system with storage and compute separated
  • herddb: HerdDB, a JVM-embeddable distributed database, embedded inside the broker

Feature Demo

  • Push-management console

  • Android client

Future Plans

  • Finish application management: users can create multiple applications, each assigned an appKey
  • Data statistics: new-device counts, push history, delivery-rate and click-rate statistics
  • Build a Longmao Cloud push-management platform: per-application billing statistics, etc. (the plan is a cloud push product; features TBD ~~)
  • Push SDK: a unified SDK that uses the assigned appKey to connect to the Longmao Cloud push platform

Apache BookKeeper BookieService Startup

Apache BookKeeper is an easily scalable, highly available, low-latency storage system. It is optimized for append-only workloads and is a great fit for:

  • WAL (write-ahead logging), e.g. the HDFS NameNode

  • Message stores, e.g. Apache Pulsar

  • Offset/cursor stores, e.g. storing message consumption positions in Apache Pulsar

  • Object/blob stores, e.g. storing state-machine snapshots


Initializing BookieServer

public BookieService(BookieConfiguration conf,
                     StatsLogger statsLogger)
        throws Exception {
    super(NAME, conf, statsLogger);
    this.server = new BookieServer(conf.getServerConf(), statsLogger);
}


// BookieServer.java

public BookieServer(ServerConfiguration conf, StatsLogger statsLogger)
        throws IOException, KeeperException, InterruptedException,
        BookieException, UnavailableException, CompatibilityException, SecurityException {

    this.conf = conf;
    // validate user permissions
    validateUser(conf);
    String configAsString;
    try {
        configAsString = conf.asJson();
        LOG.info(configAsString);
    } catch (ParseJsonException pe) {
        LOG.error("Got ParseJsonException while converting Config to JSONString", pe);
    }

    // build the memory allocator
    ByteBufAllocator allocator = getAllocator(conf);
    this.statsLogger = statsLogger;
    // build the Netty server
    this.nettyServer = new BookieNettyServer(this.conf, null, allocator);
    try {
        // build the Bookie instance
        this.bookie = newBookie(conf, allocator);
    } catch (IOException | KeeperException | InterruptedException | BookieException e) {
        // interrupted on constructing a bookie
        this.nettyServer.shutdown();
        throw e;
    }
    final SecurityHandlerFactory shFactory;

    shFactory = SecurityProviderFactoryFactory
            .getSecurityProviderFactory(conf.getTLSProviderFactoryClass());
    // set the Netty server's RequestProcessor
    this.requestProcessor = new BookieRequestProcessor(conf, bookie,
            statsLogger.scope(SERVER_SCOPE), shFactory, bookie.getAllocator());
    this.nettyServer.setRequestProcessor(this.requestProcessor);
}

BookieServer has four main components to initialize:

  • StatsLogger: the stats logger
  • Bookie: the bookie instance
  • BookieNettyServer: essentially a Netty server
  • DeathWatcher: a thread that watches whether the bookie and the Netty server are still alive
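The DeathWatcher idea is simple enough to sketch. The following is a simplified illustration of the pattern, not BookKeeper's actual class (the real one is a private inner class of BookieServer and takes its interval from the server configuration):

// a daemon thread that periodically checks liveness and triggers a shutdown
// action once the watched component is no longer running
class DeathWatcher extends Thread {

    private final long watchIntervalMs;
    private final java.util.function.BooleanSupplier isRunning;
    private final Runnable shutdownAction;

    DeathWatcher(long watchIntervalMs,
                 java.util.function.BooleanSupplier isRunning,
                 Runnable shutdownAction) {
        super("BookieDeathWatcher");
        this.watchIntervalMs = watchIntervalMs;
        this.isRunning = isRunning;
        this.shutdownAction = shutdownAction;
        setDaemon(true);
    }

    @Override
    public void run() {
        while (true) {
            try {
                Thread.sleep(watchIntervalMs);
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
                return;
            }
            if (!isRunning.getAsBoolean()) {
                // the bookie (or netty server) died: shut the whole server down
                shutdownAction.run();
                return;
            }
        }
    }
}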

Initializing Bookie

public Bookie(ServerConfiguration conf, StatsLogger statsLogger, ByteBufAllocator allocator)
        throws IOException, InterruptedException, BookieException {
    super("Bookie-" + conf.getBookiePort());
    this.statsLogger = statsLogger;
    this.conf = conf;
    // Get the journal directory list from the configuration, then create a
    // "current" directory under each of them
    this.journalDirectories = Lists.newArrayList();
    for (File journalDirectory : conf.getJournalDirs()) {
        this.journalDirectories.add(getCurrentDirectory(journalDirectory));
    }
    /**
     *  Initialize the DiskChecker, which takes two parameters: diskUsageThreshold
     *  and diskUsageWarnThreshold.
     *  diskUsageThreshold is the maximum disk usage ratio, 0.95 by default. Once every
     *  directory in the list exceeds the limit, the bookie transitions to readonly mode
     *  if it is configured to allow that, otherwise it stops.
     *  diskUsageWarnThreshold is the disk usage warning threshold, 0.90 by default.
     *  Exceeding it throws DiskWarnThresholdException and triggers a GC; once usage
     *  drops back below it, the directory becomes writable again.
     **/
    DiskChecker diskChecker = createDiskChecker(conf);
    // Create LedgerDirsManagers for ledgers and indexes, to manage their directory lists
    this.ledgerDirsManager = createLedgerDirsManager(conf, diskChecker, statsLogger.scope(LD_LEDGER_SCOPE));
    this.indexDirsManager = createIndexDirsManager(conf, diskChecker, statsLogger.scope(LD_INDEX_SCOPE),
                                                   this.ledgerDirsManager);
    this.allocator = allocator;

    // instantiate zookeeper client to initialize ledger manager
    this.metadataDriver = instantiateMetadataDriver(conf);
    checkEnvironment(this.metadataDriver);
    try {
        if (this.metadataDriver != null) {
            // current the registration manager is zookeeper only
            // initialize the ledgerManagerFactory, which produces ledgerManagers
            ledgerManagerFactory = metadataDriver.getLedgerManagerFactory();
            LOG.info("instantiate ledger manager {}", ledgerManagerFactory.getClass().getName());
            // the ledgerManager talks to ZooKeeper (or another metadata store) to manage ledger metadata
            ledgerManager = ledgerManagerFactory.newLedgerManager();
        } else {
            ledgerManagerFactory = null;
            ledgerManager = null;
        }
    } catch (MetadataException e) {
        throw new MetadataStoreException("Failed to initialize ledger manager", e);
    }
    // initialize the state manager
    stateManager = initializeStateManager();
    // register shutdown handler using trigger mode
    stateManager.setShutdownHandler(exitCode -> triggerBookieShutdown(exitCode));
    // Initialise dirsMonitor. This would look through all the
    // configured directories. When disk errors or all the ledger
    // directories are full, would throws exception and fail bookie startup.
    List<LedgerDirsManager> dirsManagers = new ArrayList<>();
    dirsManagers.add(ledgerDirsManager);
    if (indexDirsManager != ledgerDirsManager) {
        dirsManagers.add(indexDirsManager);
    }
    this.dirsMonitor = new LedgerDirsMonitor(conf, diskChecker, dirsManagers);
    try {
        this.dirsMonitor.init();
    } catch (NoWritableLedgerDirException nle) {
        // start in read-only mode if no writable dirs and read-only allowed
        if (!conf.isReadOnlyModeEnabled()) {
            throw nle;
        } else {
            this.stateManager.transitionToReadOnlyMode();
        }
    }

    // instantiate the journals
    journals = Lists.newArrayList();
    for (int i = 0; i < journalDirectories.size(); i++) {
        journals.add(new Journal(i, journalDirectories.get(i),
                conf, ledgerDirsManager, statsLogger.scope(JOURNAL_SCOPE), allocator));
    }

    this.entryLogPerLedgerEnabled = conf.isEntryLogPerLedgerEnabled();
    CheckpointSource checkpointSource = new CheckpointSourceList(journals);

    // initialize the ledgerStorage, a SortedLedgerStorage by default
    ledgerStorage = buildLedgerStorage(conf);

    boolean isDbLedgerStorage = ledgerStorage instanceof DbLedgerStorage;

    /*
     * with this change https://github.com/apache/bookkeeper/pull/677,
     * LedgerStorage drives the checkpoint logic.
     *
     * <p>There are two exceptions:
     *
     * 1) with multiple entry logs, checkpoint logic based on a entry log is
     *    not possible, hence it needs to be timebased recurring thing and
     *    it is driven by SyncThread. SyncThread.start does that and it is
     *    started in Bookie.start method.
     *
     * 2) DbLedgerStorage
     */
    if (entryLogPerLedgerEnabled || isDbLedgerStorage) {
        syncThread = new SyncThread(conf, getLedgerDirsListener(), ledgerStorage, checkpointSource) {
            @Override
            public void startCheckpoint(Checkpoint checkpoint) {
                /*
                 * in the case of entryLogPerLedgerEnabled, LedgerStorage
                 * dont drive checkpoint logic, but instead it is done
                 * periodically by SyncThread. So startCheckpoint which
                 * will be called by LedgerStorage will be no-op.
                 */
            }

            @Override
            public void start() {
                executor.scheduleAtFixedRate(() -> {
                    doCheckpoint(checkpointSource.newCheckpoint());
                }, conf.getFlushInterval(), conf.getFlushInterval(), TimeUnit.MILLISECONDS);
            }
        };
    } else {
        syncThread = new SyncThread(conf, getLedgerDirsListener(), ledgerStorage, checkpointSource);
    }

    // initialize the LedgerStorage
    ledgerStorage.initialize(
        conf,
        ledgerManager,
        ledgerDirsManager,
        indexDirsManager,
        stateManager,
        checkpointSource,
        syncThread,
        statsLogger,
        allocator);


    /**
     * HandleFactoryImpl produces handles; a handle here is a LedgerDescriptor, the
     * ledger implementation responsible for addEntry to and readEntry from a ledger.
     */
    handles = new HandleFactoryImpl(ledgerStorage);

    // Expose Stats
    this.bookieStats = new BookieStats(statsLogger);
}

Reference: https://www.jianshu.com/p/776028224419

Apache BookKeeper Bookie Startup Flow: Source Code Analysis

As introduced in the previous section, Apache BookKeeper is an easily scalable, highly available, low-latency storage system optimized for append-only workloads. This post walks through the bookie startup flow, beginning at the main entry point:

public static void main(String[] args) {
    int retCode = doMain(args);
    Runtime.getRuntime().exit(retCode);
}

static int doMain(String[] args) {

    ServerConfiguration conf;

    // 0. parse command line
    try {
        conf = parseCommandLine(args);
    } catch (IllegalArgumentException iae) {
        return ExitCode.INVALID_CONF;
    }

    // 1. building the component stack:
    LifecycleComponent server;
    try {
        server = buildBookieServer(new BookieConfiguration(conf));
    } catch (Exception e) {
        log.error("Failed to build bookie server", e);
        return ExitCode.SERVER_EXCEPTION;
    }

    // 2. start the server
    try {
        ComponentStarter.startComponent(server).get();
    } catch (InterruptedException ie) {
        Thread.currentThread().interrupt();
        // the server is interrupted
        log.info("Bookie server is interrupted. Exiting ...");
    } catch (ExecutionException ee) {
        log.error("Error in bookie shutdown", ee.getCause());
        return ExitCode.SERVER_EXCEPTION;
    }
    return ExitCode.OK;
}

The comments are clear; starting the bookie service boils down to three steps:

  • Parse the command-line arguments
  • Build the components the bookie service needs
  • Start each component

Parsing the command line

BasicParser parser = new BasicParser();
CommandLine cmdLine = parser.parse(BK_OPTS, args);

A system property is a variable the Java application itself defines, normally passed at launch in the form -DsystemPropertyKey=systemPropertyValue. Strictly speaking, though, what the bookie parses here are ordinary program arguments (to start the bookie locally I set IDEA's Program arguments to --conf /Volumes/longmao/bookkeeper-confg/b1.conf). The parsing uses the Apache commons-cli library, a pattern worth borrowing when your own application or framework needs to load custom configuration.

static {
    BK_OPTS.addOption("c", "conf", true, "Configuration for Bookie Server");
    BK_OPTS.addOption("withAutoRecovery", false,
            "Start Autorecovery service Bookie server");
    BK_OPTS.addOption("r", "readOnly", false,
            "Force Start a ReadOnly Bookie server");
    BK_OPTS.addOption("z", "zkserver", true, "Zookeeper Server");
    BK_OPTS.addOption("m", "zkledgerpath", true, "Zookeeper ledgers root path");
    BK_OPTS.addOption("p", "bookieport", true, "bookie port exported");
    BK_OPTS.addOption("j", "journal", true, "bookie journal directory");
    Option indexDirs = new Option("i", "indexdirs", true, "bookie index directories");
    indexDirs.setArgs(10);
    BK_OPTS.addOption(indexDirs);
    Option ledgerDirs = new Option("l", "ledgerdirs", true, "bookie ledgers directories");
    ledgerDirs.setArgs(10);
    BK_OPTS.addOption(ledgerDirs);
    BK_OPTS.addOption("h", "help", false, "Print help message");
}

Option meanings:

  • -c/--conf: the configuration file to use
  • -r/--readOnly: force the bookie to start read-only
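As an aside, commons-cli is easy to reuse in your own applications; here is a minimal self-contained sketch of the same pattern (the class name is made up; BookKeeper's code uses the older BasicParser, and DefaultParser is its modern replacement):

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Options;

public class CliDemo {

    public static void main(String[] args) throws Exception {
        Options opts = new Options();
        opts.addOption("c", "conf", true, "configuration file");
        opts.addOption("h", "help", false, "print help message");

        CommandLine cmd = new DefaultParser().parse(opts, args);
        if (cmd.hasOption("h")) {
            new HelpFormatter().printHelp("cli-demo", opts);
            return;
        }
        // read an option value, falling back to a default when absent
        String conf = cmd.getOptionValue("c", "app.conf");
        System.out.println("using config: " + conf);
    }
}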

Building the components the bookie service needs

public static LifecycleComponentStack buildBookieServer(BookieConfiguration conf) throws Exception {
    LifecycleComponentStack.Builder serverBuilder = LifecycleComponentStack.newBuilder().withName("bookie-server");

    // 1. build stats provider
    StatsProviderService statsProviderService =
            new StatsProviderService(conf);
    StatsLogger rootStatsLogger = statsProviderService.getStatsProvider().getStatsLogger("");

    serverBuilder.addComponent(statsProviderService);
    log.info("Load lifecycle component : {}", StatsProviderService.class.getName());

    // 2. build bookie server
    BookieService bookieService =
            new BookieService(conf, rootStatsLogger);

    serverBuilder.addComponent(bookieService);
    log.info("Load lifecycle component : {}", BookieService.class.getName());

    if (conf.getServerConf().isLocalScrubEnabled()) {
        serverBuilder.addComponent(
                new ScrubberService(
                        rootStatsLogger.scope(ScrubberStats.SCOPE),
                        conf, bookieService.getServer().getBookie().getLedgerStorage()));
    }

    // 3. build auto recovery
    if (conf.getServerConf().isAutoRecoveryDaemonEnabled()) {
        AutoRecoveryService autoRecoveryService =
                new AutoRecoveryService(conf, rootStatsLogger.scope(REPLICATION_SCOPE));

        serverBuilder.addComponent(autoRecoveryService);
        log.info("Load lifecycle component : {}", AutoRecoveryService.class.getName());
    }

    // 4. build http service
    if (conf.getServerConf().isHttpServerEnabled()) {
        BKHttpServiceProvider provider = new BKHttpServiceProvider.Builder()
                .setBookieServer(bookieService.getServer())
                .setServerConfiguration(conf.getServerConf())
                .setStatsProvider(statsProviderService.getStatsProvider())
                .build();
        HttpService httpService =
                new HttpService(provider, conf, rootStatsLogger);

        serverBuilder.addComponent(httpService);
        log.info("Load lifecycle component : {}", HttpService.class.getName());
    }

    // 5. build extra services
    String[] extraComponents = conf.getServerConf().getExtraServerComponents();
    if (null != extraComponents) {
        try {
            List<ServerLifecycleComponent> components = loadServerComponents(
                    extraComponents,
                    conf,
                    rootStatsLogger);
            for (ServerLifecycleComponent component : components) {
                serverBuilder.addComponent(component);
                log.info("Load lifecycle component : {}", component.getClass().getName());
            }
        } catch (Exception e) {
            if (conf.getServerConf().getIgnoreExtraServerComponentsStartupFailures()) {
                log.info("Failed to load extra components '{}' - {}. Continuing without those components.",
                        StringUtils.join(extraComponents), e.getMessage());
            } else {
                throw e;
            }
        }
    }

    return serverBuilder.build();
}

Using the BookieConfiguration produced by step one's parsed configuration, this builds the components the bookie service depends on. Components started during bookie startup:

  • StatsProviderService: metrics service
  • BookieService: the bookie itself
  • AutoRecoveryService: auto-recovery service
  • HttpService: HTTP REST service
  • other extra services

The LifecycleComponentStack class holds the components to be started and defines the methods executed over a component's lifecycle:

@Override
public void start() {
    components.forEach(component -> component.start());
}

@Override
public void stop() {
    components.reverse().forEach(component -> component.stop());
}

@Override
public void close() {
    components.reverse().forEach(component -> component.close());
}

Does this look familiar? Tomcat's source has a similar design: the org.apache.catalina.Lifecycle interface defines the container lifecycle, state transitions, and the adding/removing of state-transition event listeners, and components such as StandardHost implement it to maintain the logic executed as their own lifecycle progresses. Good source code tends to share the same tricks ^_^

Starting the services

Iterate over the ImmutableList of components held by the LifecycleComponentStack and invoke each component's start method.

BookKeeper Basic Usage

Initializing the BookKeeper client


private static final String ZK_ADDR = "127.0.0.1:2181";

try {
    // initialize the BookKeeper client
    BookKeeper bkClient = new BookKeeper(ZK_ADDR);
} catch (Exception e) {
    e.printStackTrace();
    throw new RuntimeException(
            "There is an exception thrown while creating the BookKeeper client.");
}

Creating a ledger

LedgerHandle longmaoHandler = createLedgerSync(bkClient, "longmaoHandler");

System.out.println("longmaoHandler ledgerId:" + longmaoHandler.getId());

public static LedgerHandle createLedgerSync(BookKeeper bkClient, String pw) {
    byte[] password = pw.getBytes();
    try {
        LedgerHandle handle = bkClient.createLedger(BookKeeper.DigestType.MAC, password);
        return handle;
    } catch (BKException e) {
        e.printStackTrace();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return null;
}

Writing data to a ledger

// add an entry
long maomao = addEntry(longmaoHandler, "maomao");

public static long addEntry(LedgerHandle ledgerHandle, String msg) {
    try {
        return ledgerHandle.addEntry(msg.getBytes());
    } catch (InterruptedException e) {
        e.printStackTrace();
    } catch (BKException e) {
        e.printStackTrace();
    }
    return -1;
}

Reading data

// read entries
Enumeration<LedgerEntry> ledgerEntryEnumeration = readEntry(longmaoHandler);

System.out.println(ledgerEntryEnumeration);
while (ledgerEntryEnumeration.hasMoreElements()) {
    LedgerEntry ledgerEntry = ledgerEntryEnumeration.nextElement();
    System.out.println("read entry:");
    System.out.println(new String(ledgerEntry.getEntry()));
}

public static Enumeration<LedgerEntry> readEntry(LedgerHandle ledgerHandle) {
    try {
        return ledgerHandle.readEntries(0, ledgerHandle.getLastAddConfirmed());
    } catch (InterruptedException e) {
        e.printStackTrace();
    } catch (BKException e) {
        e.printStackTrace();
    }
    return null;
}
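None of the snippets above release their resources. A minimal cleanup sketch for the same handle and client (exception handling omitted; note that deleteLedger permanently removes the ledger's data):

// close the ledger handle once writing and reading are done
longmaoHandler.close();

// optionally delete the ledger outright (this destroys its entries)
bkClient.deleteLedger(longmaoHandler.getId());

// finally close the client to release its ZooKeeper and Netty resources
bkClient.close();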

Java Concurrency Basics, Part 01

Java object memory layout

The first thing to know: a Java object's size must be a multiple of 8 bytes, and the object header takes 12 bytes (on a 64-bit HotSpot JVM with compressed oops, the default).

A Java object has three parts: the object header (12 bytes), instance data, and alignment padding (to keep the total size a multiple of 8).

OpenJDK ships a tool, JOL, that can print an object's memory layout:

<dependency>
    <groupId>org.openjdk.jol</groupId>
    <artifactId>jol-core</artifactId>
    <version>0.10</version>
</dependency>

Printing a Java object's layout:

public class Mao {

    boolean flag;

    int a;
}

public class TestMao {

    public static void main(String[] args) {
        Mao mao = new Mao();
        System.out.println(ClassLayout.parseInstance(mao).toPrintable());
    }
}

Output:

learn.Mao object internals:
 OFFSET  SIZE      TYPE DESCRIPTION                               VALUE
      0     4           (object header)                           01 00 00 00 (00000001 00000000 00000000 00000000) (1)
      4     4           (object header)                           00 00 00 00 (00000000 00000000 00000000 00000000) (0)
      8     4           (object header)                           43 c1 00 f8 (01000011 11000001 00000000 11111000) (-134168253)
     12     4       int Mao.a                                     0
     16     1   boolean Mao.flag                                  false
     17     7           (loss due to the next object alignment)
Instance size: 24 bytes
Space losses: 0 bytes internal + 7 bytes external = 7 bytes total

As shown, the JVM pads with 7 bytes to keep the object size a multiple of 8: a 12-byte header + 4-byte int + 1-byte boolean = 17 bytes, rounded up to 24.

The Unsafe "magic" class

RednaxelaFX's (R大's) canonical explanation: Unsafe exists to substantially extend the Java language's expressive power, making it convenient to implement, at the higher (Java) layer, core-library functionality that would otherwise have to be implemented at the lower (C) layer. That covers raw memory allocation/free/access, low-level hardware atomic/volatile support, creating uninitialized objects, and more. By its original design it should only ever be used by the standard library.

Unsafe cannot be instantiated with new directly, because its constructor is private. It is exposed through the singleton getUnsafe method, which only allows callers loaded by the bootstrap class loader; everyone else gets a SecurityException:

// private constructor
private Unsafe() {
}

private static final Unsafe theUnsafe;

public static Unsafe getUnsafe() {
    Class var0 = Reflection.getCallerClass();
    if (!VM.isSystemDomainLoader(var0.getClassLoader())) {
        throw new SecurityException("Unsafe");
    } else {
        return theUnsafe;
    }
}

To use Unsafe anyway, grab it via reflection:

public class UnsafeInstance {

    public static Unsafe reflectGetUnsafe() {
        try {
            Field field = Unsafe.class.getDeclaredField("theUnsafe");
            field.setAccessible(true);
            return (Unsafe) field.get(null);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }
}

Unsafe is the cornerstone of the java.util.concurrent package.
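To see why, here is a small example of my own (not from the original post) that uses the reflected Unsafe to CAS a field, the same primitive the Atomic* classes and AQS are built on:

import sun.misc.Unsafe;

public class CasDemo {

    private volatile int state = 0;

    private static final Unsafe U = UnsafeInstance.reflectGetUnsafe();
    private static final long STATE_OFFSET;

    static {
        try {
            // the field's offset inside the object, which CAS needs
            STATE_OFFSET = U.objectFieldOffset(CasDemo.class.getDeclaredField("state"));
        } catch (NoSuchFieldException e) {
            throw new Error(e);
        }
    }

    public static void main(String[] args) {
        CasDemo demo = new CasDemo();
        // atomically flip state from 0 to 1; false means another thread won the race
        boolean swapped = U.compareAndSwapInt(demo, STATE_OFFSET, 0, 1);
        System.out.println("swapped=" + swapped + ", state=" + demo.state);
    }
}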

Diamond Architecture Analysis

Diamond provides publish/subscribe for persistent configuration; its hallmarks are a simple structure and rock-solid reliability. Main use cases: TDDL uses Diamond for dynamic database switching and scaling; business systems use Diamond to push feature-switch configuration. Diamond focuses on high availability, which leads to some differences from similar products in architecture, disaster tolerance, and the data-fetching model (from Alibaba's introduction of Diamond).

Diamond architecture diagram


Where the configuration lives


The group_info table stores group information; a group is used to distinguish different environments (oddly, the open-sourced Diamond never actually uses this table ~~).
The config_info table stores the actual configuration entries.

How data is saved


This corresponds to AdminController's postConfig method. Save order: database -> update the md5 cache -> local disk -> notify the other nodes to update.

The md5 cache is a ConcurrentHashMap whose key is group/dataId and whose value is the MD5 of the content. The MD5 class is a singleton, and its hashing method is synchronized. When a client requests configuration (carrying group and dataId), the server checks whether the in-memory md5 cache holds a matching entry (an in-memory lookup, hence fast).

Other nodes are notified by first reading the peer addresses stored in node.properties and then sending each an HTTP request. Each peer then fetches the latest configuration from the database and saves it to its own local disk.
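A minimal sketch of that server-side md5 cache (class and method names are mine, not Diamond's):

import java.util.concurrent.ConcurrentHashMap;

public class Md5Cache {

    // key: "group/dataId", value: md5 of the config content
    private final ConcurrentHashMap<String, String> cache = new ConcurrentHashMap<>();

    public void put(String group, String dataId, String contentMd5) {
        cache.put(group + "/" + dataId, contentMd5);
    }

    /** Returns true when the client's md5 is stale and it should re-fetch the content. */
    public boolean isChanged(String group, String dataId, String clientMd5) {
        String serverMd5 = cache.get(group + "/" + dataId);
        return serverMd5 != null && !serverMd5.equals(clientMd5);
    }
}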

How client and server interact


  • The DiamondClientFactory factory creates the singleton subscriber.
  • The listener the client created is added to the listener-management list and injected into the newly created subscriber.
  • The subscriber is given its dataId and groupId.
  • The subscriber thread starts and begins polling for changes.

How the client detects configuration changes


  • Internally the method starts a scheduled thread: the first fetch of the latest configuration runs 60 seconds after client startup, and afterwards every 15 seconds by default (a sketch of this schedule follows the list)
  • The method really consists of three main calls:

    checkLocalConfigInfo: checks whether the local data has been updated; if not it returns, otherwise it returns the latest data and notifies the listeners the client configured.

    checkDiamondServerConfigInfo: calls the server remotely, fetches the most recently modified configuration, and notifies the client listeners.

    checkSnapshot: persists the data as a snapshot.
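A minimal sketch of that polling schedule (names are hypothetical; Diamond's real implementation differs in detail):

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ConfigPoller {

    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();

    public void start() {
        // first run 60s after startup, then every 15s (Diamond's defaults)
        scheduler.scheduleWithFixedDelay(this::checkOnce, 60, 15, TimeUnit.SECONDS);
    }

    private void checkOnce() {
        // the three checks described above would go here:
        // checkLocalConfigInfo(); checkDiamondServerConfigInfo(); checkSnapshot();
    }
}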

Diamond's Shortcomings and Improvement Ideas

  • The UI is overly simple and not very polished
  • Access control is too coarse-grained
  • No gray (canary) release capability
  • Configuration changes cannot take effect promptly

H2 INSERT Statement Flow Analysis

INSERT INTO TEST(ID, NAME) VALUES(3000, 'aaa');

Parsing produces the Insert command

org.h2.jdbc.JdbcStatement#execute(java.lang.String)
->org.h2.jdbc.JdbcStatement#executeInternal
->org.h2.jdbc.JdbcConnection#prepareCommand(java.lang.String, int)
->org.h2.engine.Session#prepareCommand
->org.h2.engine.Session#prepareLocal
->org.h2.command.Parser#prepareCommand
->org.h2.command.Parser#parse(java.lang.String)
->org.h2.command.Parser#parse(java.lang.String, boolean)
->org.h2.command.Parser#parsePrepared
->org.h2.command.Parser#parseInsert

The key step is org.h2.command.Parser#parseInsert, which creates a new Insert command.

  • Verify that the table being inserted into exists

    private Table readTableOrView(String tableName) {
        if (schemaName != null) {
            Table table = getSchema().resolveTableOrView(session, tableName);
            if (table != null) {
                return table;
            }
        } else {
            // first check whether the PUBLIC schema's tablesAndViews contains tableName as a key
            Table table = database.getSchema(session.getCurrentSchemaName())
                    .resolveTableOrView(session, tableName);
            if (table != null) {
                return table;
            }
            // then walk the session's schema search path, checking each schema's tablesAndViews for tableName
            String[] schemaNames = session.getSchemaSearchPath();
            if (schemaNames != null) {
                for (String name : schemaNames) {
                    Schema s = database.getSchema(name);
                    table = s.resolveTableOrView(session, tableName);
                    if (table != null) {
                        return table;
                    }
                }
            }
        }
        if (isDualTable(tableName)) {
            return new DualTable(database);
        }
        // not found: throw. You can't insert into a table the database doesn't have ^_^
        throw DbException.get(ErrorCode.TABLE_OR_VIEW_NOT_FOUND_1, tableName);
    }
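To see that failure path from the JDBC side, a quick self-contained example (the table name is deliberately nonexistent; 42102 is H2's TABLE_OR_VIEW_NOT_FOUND_1 error code):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;

public class MissingTableDemo {

    public static void main(String[] args) throws Exception {
        try (Connection conn = DriverManager.getConnection("jdbc:h2:mem:demo");
             Statement stat = conn.createStatement()) {
            stat.execute("INSERT INTO NO_SUCH_TABLE(ID) VALUES(1)");
        } catch (SQLException e) {
            // prints 42102 and a "Table ... not found" message
            System.out.println(e.getErrorCode() + ": " + e.getMessage());
        }
    }
}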

H2 CREATE Statement Flow Analysis

stat.execute("create table test(id int primary key, name varchar(255))");

Overall flow

// parse and produce the Command
CommandInterface command = conn.prepareCommand(sql, fetchSize);
boolean lazy = false;
boolean returnsResultSet;
synchronized (session) {
    setExecutingStatement(command);
    try {
        if (command.isQuery()) {
            returnsResultSet = true;
            boolean scrollable = resultSetType != ResultSet.TYPE_FORWARD_ONLY;
            boolean updatable = resultSetConcurrency == ResultSet.CONCUR_UPDATABLE;
            ResultInterface result = command.executeQuery(maxRows, scrollable);
            lazy = result.isLazy();
            resultSet = new JdbcResultSet(conn, this, command, result, id,
                    closedByResultSet, scrollable, updatable);
        } else {
            returnsResultSet = false;
            // execute the Command
            ResultWithGeneratedKeys result = command.executeUpdate(
                    conn.scopeGeneratedKeys() ? null : generatedKeysRequest);
            updateCount = result.getUpdateCount();
            ResultInterface gk = result.getGeneratedKeys();
            if (gk != null) {
                generatedKeys = new JdbcResultSet(conn, this, command, gk, id,
                        false, true, false);
            }
        }
    } finally {
        if (!lazy) {
            setExecutingStatement(null);
        }
    }
}

Two main steps:

  • Parse the SQL into a Command
  • Execute the Command

Parsing the SQL into a Command

org.h2.command.Parser#prepareCommand
->org.h2.command.Parser#parse(java.lang.String)
->org.h2.command.Parser#parsePrepared (this step determines it is a CREATE statement)
->org.h2.command.Parser#parseCreate
->org.h2.command.Parser#parseCreateTable
->org.h2.command.Parser#parseTableColumnDefinition
  • The org.h2.command.Parser#parseCreateTable step creates a new CreateTable object
// get the schema and build the CreateTable command
Schema schema = getSchema();
CreateTable command = new CreateTable(session, schema);
command.setPersistIndexes(persistIndexes);
command.setTemporary(temp);
command.setGlobalTemporary(globalTemp);
command.setIfNotExists(ifNotExists);
command.setTableName(tableName);
command.setComment(readCommentIf());

This Schema is the PUBLIC schema created when the Database object was constructed.

  • The org.h2.command.Parser#parseTableColumnDefinition step parses out the Column objects
Column column = parseColumnForTable(columnName, true, true);
if (column.isAutoIncrement() && column.isPrimaryKey()) {
    column.setPrimaryKey(false);
    IndexColumn[] cols = { new IndexColumn() };
    cols[0].columnName = column.getName();
    AlterTableAddConstraint pk = new AlterTableAddConstraint(
            session, schema, false);
    pk.setType(CommandInterface.ALTER_TABLE_ADD_CONSTRAINT_PRIMARY_KEY);
    pk.setTableName(tableName);
    pk.setIndexColumns(cols);
    command.addConstraintCommand(pk);
}
command.addColumn(column);

The most important part here is adding each column to CreateTable's columns list. The end result is a CreateTable Command.

Executing the update command

org.h2.command.Command#executeUpdate
->org.h2.command.CommandContainer#update
->org.h2.command.ddl.CreateTable#update
  • The org.h2.command.ddl.CreateTable#update step (reached via CommandContainer#update) creates the new Table object
changePrimaryKeysToNotNull(data.columns);
data.id = getObjectId();
data.create = create;
data.session = session;
// create the table via the schema
Table table = getSchema().createTable(data);

ArrayList<Sequence> sequences = generateSequences(data.columns, data.temporary);
table.setComment(comment);
if (isSessionTemporary) {
    if (onCommitDrop) {
        table.setOnCommitDrop(true);
    }
    if (onCommitTruncate) {
        table.setOnCommitTruncate(true);
    }
    session.addLocalTempTable(table);
} else {
    db.lockMeta(session);
    db.addSchemaObject(session, table);
}

Here db.addSchemaObject(session, table) inserts an entry into the PUBLIC schema's tablesAndViews field (a ConcurrentHashMap):

  • key: TEST
  • value: the Table object generated for the CREATE TABLE statement

With the Table object in place, the INSERT statement that comes next looks the table up in the PUBLIC schema by this key.

A Look Inside H2's Storage Engine: MVStore

Selecting the storage engine

Before version 1.4, H2's storage engine was PageStore, which still lives under org.h2.pagestore. To use the old PageStore engine, append this to the JDBC URL:

jdbc:h2:~/test;MV_STORE=false

That makes H2 use the old engine; the default is true, i.e. MVStore is used. The engine initialization:

// Database.java
/**
 * Initialize the storage engine.
 */
public PageStore getPageStore() {
    // MV_STORE=false/true, default true, meaning MVStore is the storage engine
    if (dbSettings.mvStore) {
        if (store == null) {
            // pass the Database in to initialize MVTableEngine.Store (holds the MVStore and TransactionStore)
            store = MVTableEngine.init(this);
        }
        return null;
    }
    // the storage engine is PageStore
    synchronized (this) {
        if (pageStore == null) {
            pageStore = new PageStore(this, databaseName +
                    Constants.SUFFIX_PAGE_FILE, accessModeData, cacheSize);
            if (pageSize != Constants.DEFAULT_PAGE_SIZE) {
                pageStore.setPageSize(pageSize);
            }
            if (!readOnly && fileLockMethod == FileLockMethod.FS) {
                pageStore.setLockFile(true);
            }
            pageStore.setLogMode(logMode);
            pageStore.open();
        }
        return pageStore;
    }
}

MVStore initialization

public static Store init(final Database db) {
    Store store = db.getStore();
    if (store != null) {
        return store;
    }
    byte[] key = db.getFileEncryptionKey();
    String dbPath = db.getDatabasePath();
    MVStore.Builder builder = new MVStore.Builder();
    store = new Store();
    boolean encrypted = false;
    if (dbPath != null) {
        String fileName = dbPath + Constants.SUFFIX_MV_FILE;
        MVStoreTool.compactCleanUp(fileName);
        builder.fileName(fileName);
        builder.pageSplitSize(db.getPageSize());
        if (db.isReadOnly()) {
            builder.readOnly();
        } else {
            // possibly create the directory
            boolean exists = FileUtils.exists(fileName);
            if (exists && !FileUtils.canWrite(fileName)) {
                // read only
            } else {
                String dir = FileUtils.getParent(fileName);
                FileUtils.createDirectories(dir);
            }
            int autoCompactFillRate = db.getSettings().maxCompactCount;
            if (autoCompactFillRate <= 100) {
                builder.autoCompactFillRate(autoCompactFillRate);
            }
        }
        if (key != null) {
            encrypted = true;
            builder.encryptionKey(decodePassword(key));
        }
        if (db.getSettings().compressData) {
            builder.compress();
            // use a larger page split size to improve the compression ratio
            builder.pageSplitSize(64 * 1024);
        }
        builder.backgroundExceptionHandler(new UncaughtExceptionHandler() {

            @Override
            public void uncaughtException(Thread t, Throwable e) {
                db.setBackgroundException(DbException.convert(e));
            }

        });
        // always start without background thread first, and if necessary,
        // it will be set up later, after db has been fully started,
        // otherwise background thread would compete for store lock
        // with maps opening procedure
        builder.autoCommitDisabled();
    }
    store.open(db, builder, encrypted);
    db.setStore(store);
    return store;
}


/**
 * Open the store for this database.
 *
 * @param db the database
 * @param builder the builder
 * @param encrypted whether the store is encrypted
 */
void open(Database db, MVStore.Builder builder, boolean encrypted) {
    this.encrypted = encrypted;
    try {
        // initialize the MVStore
        this.mvStore = builder.open();
        FileStore fs = mvStore.getFileStore();
        if (fs != null) {
            this.fileName = fs.getFileName();
        }
        if (!db.getSettings().reuseSpace) {
            mvStore.setReuseSpace(false);
        }
        mvStore.setVersionsToKeep(0);
        // initialize the TransactionStore
        this.transactionStore = new TransactionStore(mvStore,
                new ValueDataType(db, null), db.getLockTimeout());
    } catch (IllegalStateException e) {
        throw convertIllegalStateException(e);
    }
}

First an MVStore.Builder is created and then used to build the MVStore. The Builder's configurable parameters are listed below (a usage sketch follows the list):

  • autoCommitBufferSize
  • autoCompactFillRate
  • backgroundExceptionHandler
  • cacheSize
  • compress (LZF and Deflate)
  • encryptionKey
  • fileName
  • fileStore
  • pageSplitSize
  • readOnly
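
A minimal standalone MVStore sketch (independent of H2's SQL layer; the file name is arbitrary) exercising a few of these Builder options:

import org.h2.mvstore.MVMap;
import org.h2.mvstore.MVStore;

public class MVStoreDemo {

    public static void main(String[] args) {
        // build and open a store backed by a single file
        MVStore store = new MVStore.Builder()
                .fileName("demo.mv.db")
                .cacheSize(16)       // read cache, in MB
                .compress()          // compress pages on disk
                .open();
        try {
            // a named map; to the engine, an SQL table is just such a map
            MVMap<Integer, String> map = store.openMap("data");
            map.put(1, "hello");
            store.commit();          // persist the current version
            System.out.println(map.get(1));
        } finally {
            store.close();
        }
    }
}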

The relationship between tables and the storage engine

One database corresponds to:

  • one MVStore instance
  • one TransactionStore instance
  • one preparedTransactions MVMap
  • one undoLog MVMap
MVStore(Map<String, Object> config) {
    recoveryMode = config.containsKey("recoveryMode");
    compressionLevel = DataUtils.getConfigParam(config, "compress", 0);
    String fileName = (String) config.get("fileName");
    FileStore fileStore = (FileStore) config.get("fileStore");
    fileStoreIsProvided = fileStore != null;
    if (fileStore == null && fileName != null) {
        fileStore = new FileStore();
    }
    this.fileStore = fileStore;

    int pgSplitSize = 48; // for "mem:" case it is # of keys
    CacheLongKeyLIRS.Config cc = null;
    if (this.fileStore != null) {
        int mb = DataUtils.getConfigParam(config, "cacheSize", 16);
        if (mb > 0) {
            cc = new CacheLongKeyLIRS.Config();
            cc.maxMemory = mb * 1024L * 1024L;
            Object o = config.get("cacheConcurrency");
            if (o != null) {
                cc.segmentCount = (Integer) o;
            }
        }
        pgSplitSize = 16 * 1024;
    }
    if (cc != null) {
        cache = new CacheLongKeyLIRS<>(cc);
    } else {
        cache = null;
    }

    pgSplitSize = DataUtils.getConfigParam(config, "pageSplitSize", pgSplitSize);
    // Make sure pages will fit into cache
    if (cache != null && pgSplitSize > cache.getMaxItemSize()) {
        pgSplitSize = (int) cache.getMaxItemSize();
    }
    pageSplitSize = pgSplitSize;
    keysPerPage = DataUtils.getConfigParam(config, "keysPerPage", 48);
    backgroundExceptionHandler =
            (UncaughtExceptionHandler) config.get("backgroundExceptionHandler");
    meta = new MVMap<>(this);
    if (this.fileStore != null) {
        retentionTime = this.fileStore.getDefaultRetentionTime();
        // 19 KB memory is about 1 KB storage
        int kb = Math.max(1, Math.min(19, Utils.scaleForAvailableMemory(64))) * 1024;
        kb = DataUtils.getConfigParam(config, "autoCommitBufferSize", kb);
        autoCommitMemory = kb * 1024;
        autoCompactFillRate = DataUtils.getConfigParam(config, "autoCompactFillRate", 90);
        char[] encryptionKey = (char[]) config.get("encryptionKey");
        try {
            if (!fileStoreIsProvided) {
                boolean readOnly = config.containsKey("readOnly");
                // create the database file *.db
                this.fileStore.open(fileName, readOnly, encryptionKey);
            }
            // write data into *.db
            if (this.fileStore.size() == 0) {
                creationTime = getTimeAbsolute();
                lastCommitTime = creationTime;
                storeHeader.put(HDR_H, 2);
                storeHeader.put(HDR_BLOCK_SIZE, BLOCK_SIZE);
                storeHeader.put(HDR_FORMAT, FORMAT_WRITE);
                storeHeader.put(HDR_CREATED, creationTime);
                writeStoreHeader();
            } else {
                // there is no need to lock store here, since it is not opened yet,
                // just to make some assertions happy, when they ensure single-threaded access
                storeLock.lock();
                try {
                    readStoreHeader();
                } finally {
                    storeLock.unlock();
                }
            }
        } catch (IllegalStateException e) {
            panic(e);
        } finally {
            if (encryptionKey != null) {
                Arrays.fill(encryptionKey, (char) 0);
            }
        }
        lastCommitTime = getTimeSinceCreation();

        scrubMetaMap();

        // setAutoCommitDelay starts the thread, but only if
        // the parameter is different from the old value
        int delay = DataUtils.getConfigParam(config, "autoCommitDelay", 1000);
        setAutoCommitDelay(delay);
    } else {
        autoCommitMemory = 0;
        autoCompactFillRate = 0;
    }
}

MVStore initializes seven MVMaps. To the storage engine, creating a table simply means creating another MVMap, inserting Pages into it, and eventually flushing the data to the *.db file on disk (you can verify this with the sketch after this list). The seven maps:

  • meta
    id:0
    name:meta

  • openTransactions
    id:1
    name: openTransactions

  • undoLog.1
    id:2
    name:undoLog.1

  • table.0
    id:3
    name:table.0

  • lobMap
    id:4
    name:lobMap

  • lobRef
    id:5
    name:lobRef

  • lobData
    id:6
    name:lobData
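
To check this yourself, you can open an H2 database's .mv.db file directly with the MVStore API and list its maps (a sketch; the path is hypothetical, and the file must not be open in H2 at the same time):

import org.h2.mvstore.MVStore;

public class ListMapsDemo {

    public static void main(String[] args) {
        // open the same file H2 itself writes
        MVStore store = new MVStore.Builder()
                .fileName("/path/to/test.mv.db")   // hypothetical path
                .readOnly()
                .open();
        try {
            // expect names like "table.0", "undoLog.1", "openTransactions", "lobData" ...
            for (String name : store.getMapNames()) {
                System.out.println(name);
            }
        } finally {
            store.close();
        }
    }
}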