如何使用Apache BookKeeper构建分布式数据库-02

在本系列文章中,我想分享一些基本的架构概念,这些架构概念涉及无共享架构的分布式数据库的可能结构。在第一部分中,我们了解了如何将数据库设计为复制状态机。

我们有一簇机器,它们不共享磁盘,它们仅通过网络连接。我们有一个记录表,我们机器的状态就是表的内容。

对表的每个修改(包括INSERT,UPDATE和DELETE操作)都被写入到预写日志中,然后应用于数据的本地副本。

在任何时候,组中只有一台计算机被选为领导者,而客户端仅通过向该计算机发出请求来执行写入和读取操作,只有该节点才能修改表的内容。

其他机器是所谓的跟随者,它们不断跟踪日志:它们从日志中读取操作,然后将它们应用到自己的本地副本中,其顺序完全与领导者编写的顺序相同。

Apache BookKeeper

最初,Apache BookKeeper被设计为Hadoop HDFS Namenode的分布式预写日志,作为Apache Zookeeper项目的一部分,但很快它就开始了自己的独立产品生涯。

它具有支持复制状态机所需的大多数功能,我们感兴趣的主要功能是:

分散式架构:所有逻辑都在富客户端模型上运行,BookKeeper服务器只是数据容器,这使我们可以完全扩展。
不共享存储模式:客户端仅使用网络,不共享磁盘;服务器彼此之间不认识。
支持防护:BookKeeper保证只有一台机器能够写入日志。
最后添加的已确认协议:BookKeeper允许读者始终遵循该日志。
在丢失的存储节点上自动重新复制数据:在丢失机器和网络分区的情况下进行自我修复。
让我们浏览所有这些关键功能,我们将看到我们的数据库如何能够应对分布式系统的所有挑战。

avatar

Rich client model

Leader节点运行BookKeeper编写器(WriteHandle),并创建一个ledger。 ledger是我们日志的只写部分。 可以将其打开以仅写一次,并且可以根据需要读取多次。 一旦写入者将其关闭或死亡,就无法将更多数据追加到ledger中。

在创建时,您需要设置有关复制的三个参数:

  • Ensemble size (ES):将存储ledger的数量
  • Write quorum size (WQ):每个entry的份数
  • Ack quorum size (AQ):在认为写入成功之前要确认的所需副本数

例如,如果每个条目的ES = 3,WQ = 2和AQ = 1,BookKeeper将:

  • 传播副本超过3 bookies
  • 为每个entry写2个副本
  • 仅等待1确认,以声明要写入的entry是成功的

如果您的bookies很少(例如3),建议您从2–2–2开始,这是一个很好的折衷,可以保证每个entry至少有两个副本。
使ES> WQ被称为条带化,这有助于提高性能,因为写入和读取被分散到更多的客户。

writer对bookie故障做出反应,并选择新bookies用作存储,这种机制称为集成更改,如果您有足够的bookie,这对于应用程序是完全透明的,并且您不必在意这种情况。

在每次整体更改时,我们都会开始ledger的新部分,reader可以观察ledger元数据的变化,并能够自动连接到新的bookies。

Fencing

在我们的复制状态机模型中,只有一个领导者可以执行对表状态的更改。 它的领导角色必须得到其他所有人的支持,例如,您可以在ZooKeeper中使用一些领导者选举秘诀,但这不足以保证数据的整体一致性。

从理论上讲,您应该实现某种低级别的分布式共识协议(例如ZooKeeper中的ZAB),但是这会过分杀人,它确实很慢。

BookKeeper解决了这些问题。

当节点开始充当领导者时,它会对应该由前一领导者打开的每个ledger执行“恢复读取”。

此操作将连接到包含该ledger数据的每个Bookie,并将每个ledger标记为已隔离,并且如果前一个leader仍活着,则它将在下一次写操作中收到一个特定的写错误,表明他已被隔离。

BookKeeper处理各种极端情况,例如恢复期间的网络错误或多个并发恢复操作。

您可以确保只有一台计算机可以成功恢复,然后它可以开始对数据库状态进行新的更改。

但是BookKeeper只处理ledger,您必须将建立您的预写日志的ledger列表存储在某处。 此辅助元数据存储也必须处理某种类型的防护。

一种选择是将该列表存储在ZooKeeper上,并利用其内置的分布式比较和设置功能来处理要向活动ledger列表添加新ledger的并发领导者。 请参阅BookKeeper教程,以获取有关如何处理此故事部分的示例。

BookKeeper提供了一个更高级别的API,DistributedLog,它为您完成了这一部分,并为BookKeeper低级别API添加了许多内置功能。

Last add confirmed protocol

现在,每个节点仅使用Zookeeper和BookKeeper以便与其他对等方进行通信。 让我们看看关注者如何知道领导者正在取得进展,现在该寻找新entry了。

每次writer将entry添加到ledger时,它也会写出最大的条目ID,该ID已由bookies确认成功存储,我们将此ID称为“Last-Add-Confirmed entry”(LAC)。

通常,writer在发送写内容方面比在书本上保持并发送确认消息的速度要快于书本,因此即使writer不认为这些内容是持久性的,entry也可能可供reader使用。

这是非常危险的,因为以这种方式,跟随者节点可能具有将来仍未被领导者接受的数据视图。

为了解决这种情况,BookKeeper readers只能读取直到LAC的entry。 您可以确保reader始终落后于writer。

读者通过读取ledger的元数据并询问与ledger关联的bookies,可以在读取过程中获得此LAC。

avatar

让我们看一下BookKeeper新用户遇到的一个通常棘手的情况:ledger将entryX和X-1写为LAC,因此读者最多只能看到X-1,如果没有其他条目被写入,则追随者将无法选出最新的leader。 这在生产中并不是真正的问题,尤其是在重负载下。但尤其是第一次使用BookKeeper时,很难理解。

您必须设法解决此问题:

  • 如果您没有写任何东西,请定期写一个虚拟entry
  • 使用ExplicitLAC功能,这基本上是从常规搭载机制中存储辅助LAC指针

有许多方法可以绕过LAC协议,但是本博客未介绍它们,因为它们在我们的用例中没有用。

Close a ledger

BookKeeper已针对高吞吐量进行了优化,但最重要的功能是关于一致性保证,而这主要是关于使用ZooKeeper和内置防护机制的元数据管理。

一个关键点是必须定义对reader可见的实际有效条目系列,尤其是最后一个条目的ID。

由于写入可能会失败,网络也可能会失败,因此有多种机制在起作用,但最后,写入或恢复过程将密封此有效条目ID的范围。

我们称此操作为“关闭”ledger,基本上是关于将ledger的最终状态写入Zookeeper,而这种情况是在“关闭” WriteHandle时发生的。 关闭ledger后,reader可以读取最后写入的entry。

Replication in case of lost bookies

当您丢失一bookie时,BookKeeper能够通过检测故障来强制执行原始的复制因子(写入仲裁大小),并再次复制应该存储在失效的bookie上的数据。

可以使用BookKeeper工具手动完成此操作,但是也可以由自动恢复守护程序执行。

总结

将BookKeeper用作分布式预写日志是一个不错的选择,因为它可以处理分布式系统的许多方面。 如果要编写自己的日志,只有在为时已晚时,您才会陷入很多困境。

因此,BookKeeper从一开始就被设计为一个高性能的存储系统,它具有许多有关本地磁盘存储管理,网络使用和JVM性能的改进和技巧。

在本系列的下一部分中,您将学习HerdDB如何将Apache BookKeeper用作预写日志并填补该故事的空白:如何存储本地数据,协调副本并执行检查点。

参考资料

https://streamnative.io/blog/tech/2020-04-14-distributed-database-bk2/

如何使用Apache BookKeeper构建分布式数据库-01

初识HerdDB

HerdDB是一个分布式数据库,数据在服务器群集之间分布,而无需共享存储。

HerdDB的主要语言是SQL,建议客户端同时使用JDBC驱动程序API和低级API。

HerdDB可嵌入到任何Java虚拟机中,每个节点无需网络即可访问本地数据。

HerdDB复制功能建立在Apache ZooKeeper和Apache BookKeeper项目上。

HerdDB与NoSQL数据库非常相似,实际上在低级上,它基本上是具有SQL抽象层的键值数据库,使每个用户都可以利用现有的专有技术并将现有的应用程序移植到HerdDB。

HerdDB设计用于快速“写入”和主键读取/更新数据访问模式。

HerdDB支持事务和“提交读取”隔离级别

HerdDB使用Apache Calcite作为SQL解析器和SQL Planner

基本概念

数据与任何SQL数据库一样,都以表进行组织,并且为了利用HerdDB复制功能,表在表空间内进行分组。

表空间是表的逻辑集合,是建立复制的基石。

有些数据库功能仅在相同表空间的表之间可用:

事务可能只跨越相同表空间的表 子查询只能跨越相同表空间的表 复制是在表空间级别配置的,因此对于每个表空间,只有一个服务器被设计为“领导者”(管理者),然后您可以配置一组“副本”。 系统自动在副本之间复制数据并透明地处理服务器故障。

系统总览

让我们从一个高层次的观点出发,我们要构建什么以及我们需要什么特性。

  • 我们需要一个数据库,一个持久保存数据的存储,并且可以从远程客户端访问它
  • 我们将数据存储在机器集群中
  • 我们的机器不共享磁盘或使用共享的安装,仅网络连接(LAN或WAN)
  • 机器可能会发生故障,磁盘随时可能丢失,但是我们希望该服务对客户端可用,直到它们的一部分启动并运行
  • 我们希望能够在不中断服务的情况下添加和删除计算机
  • 我们希望完全控制一致性

这个列表听起来很普通,有几种设计具有这种功能的系统的方法。

为了使其更加具体,让我们创建一个具体方案:

  • 我们有一个带有一个表的SQL数据库
  • 数据通过N台计算机复制
  • 没有共享磁盘,只有服务器之间以及服务器和客户端之间的网络连接
  • 我们采用复制状态机的架构模式

预写日志

为了支持ACID(原子性,一致性,隔离性,耐久性)语义,数据库使用预写日志记录。

假设数据库将表的副本存储在本地易失性存储器(RAM)中。 当客户端请求写操作(如UPDATE操作)时,数据库会将操作“记录”到持久性存储中,并将记录的新值写入日志(WAL)。

当存储确认写入(fsync)时,更改将应用于表的内存副本,然后存储将结果确认给客户端。 一旦我们更新了内存中的副本,其他客户端便能够读取新值。预写日志流和表内容:

avatar

在上面的示例中,表开始为空,然后我们执行第一个写操作INSERT(record1),该操作在LSN(日志序列号)1处发生。我们的表现在包含record1。 然后我们在LSN2上记录另一个修改,即INSERT(record2),现在该表包含record1和record2,然后在LSN3上记录DELETE(record1),该表仅包含record2。

当服务器重新启动时,它执行恢复操作,读取所有日志并重建表的内容,因此最终导致表中仅包含record2。

如果在日志上有一个值,我们确定它不会丢失,并且任何在重新启动事件之前读取该值的客户端都可以再次读取相同的值。

如果我们在写入日志之前应用了内存中的更改,则不会发生这种情况。

请注意,只有更改表内容的操作才写入预写日志:我们不记录读取。

检查点

你始终可以从日志中重建表的内容,但不能存储无限的日志,因为在某个时间点该日志被截断了。 为了释放空间,此操作通常称为检查点。

当数据库执行检查点时,它将在持久存储中刷新给定LSN上表的内容。一个检查点发生在LSN3:

avatar

现在我们已经持久地将表保留在LSN3上,我们可以节省资源并将部分日志从LSN1删除到LSN3。 因此,当服务器执行恢复时,它只需要重播LSN4,这反过来又允许更快的启动顺序。

在检查点期间,表的内容存储在哪里? 您可以有几种策略,例如,可以将内容存储在某些本地磁盘上(记住要使用fsync)。 但是,如果相对于对WAL的写入次数而言,表的内容确实很小(因此您在同一组记录上有很多更改),则可以考虑将表的内容转储到WAL本身。

复制状态机

复制状态机是一个实体,在这种情况下为表,即在给定时间(日志序列号)处于给定状态(表的内容)的状态,并且在一组状态下状态更改的顺序相同 相互连接的机器,因此最终每台机器都保持相同的状态。

每当您在计算机上更改一条记录时,都必须在其他所有副本上应用相同的更改。

我们需要对计算机状态进行更改的总顺序,而我们的预写日志非常适合此目的。每个节点都有一个表的副本,并且WAL是共享的:

avatar

在我们的体系结构中,只有一个节点能够更改系统状态,即更改表的内容:我们称其为领导者。

每个节点在内存中都有整个表的副本。发生写操作时,会将其写到WAL,然后使其对客户端可见以进行读取。

其他非领导者节点(跟随者)跟踪日志,它连续地从日志中读取更改,其更改顺序与领导者发布的顺序完全相同。

追随者会将所有更改应用于自己的本地副本,这样,他们将看到表的相同历史记录。

同样重要的是,只有在WAL确认相同的更改之后,追随者才能应用每个更改,否则,追随者将在领导者的将来。

Apache BookKeeper是我们需要的预写日志:它是持久且分布式的。它不需要共享磁盘或远程存储,不需要保证所有项目的顺序,也不需要防护。在下一篇文章中,我将向您展示Apache Bookkeeper如何保证满足我们的需求。

资料

https://streamnative.io/blog/tech/2020-02-04-how-to-build-database/

HikariCP为啥这么快

数据库连接池原理

在系统初始化的时候,在内存中开辟一片空间,将一定数量的数据库连接作为对象存储在对象池里,并对外提供数据库连接的获取和归还方法。用户访问数据库,并不是建立一个新的连接,而是从数据库连接池中取出一个已有的空闲连接对象;使用完毕归还后的连接也不会马上关闭,而是由数据库连接池统一管理回收,为下一次借用做好准备。如果由于高并发请求导致数据库连接池的连接被借用完毕,其它线程就会等待,直到有连接被归还。整个过程中,连接并不会关闭,而是源源不断地循环使用,有借有还。

常见数据库连接池

  • C3P0:实现jdbc3和jdbc2扩展规范说明的Connection 和Statement 池的DataSources 对象

  • DBCP: Apache下独立的数据库连接池组件,由于Apache的缘故,它可能是使用最多的开源数据库连接池

  • BoneCP: 在c3p0和DBCP存在的时代,BoneCP的出现就是为了追求极致,并且提供了完善的基准测试

  • Druid: 阿里出品,是阿里巴巴唯一使用的数据库连接池,阿里云DRDS和阿里TDDL都采用了Druid,可支持”双十一”等最严苛的使用场景,并且提供了强大的监控功能,在国内有不少用户。

  • HikariCP: HiKariCP是数据库连接池的一个后起之秀,号称性能最好,可以完美地PK掉其他连接池,Springboot 2.0选择HikariCP作为默认数据库连接池

有一个争论是HikariCP与Druid相比哪个更好,对此Druid作者温少是直接上场对过线的,感兴趣的可以参考:

https://github.com/brettwooldridge/HikariCP/issues/232

avatar

HikariCP为什么这么快

在HikariCP官网(https://github.com/brettwooldridge/HikariCP/wiki/Down-the-Rabbit-Hole)详细介绍了HikariCP所做的优化:

  • 优化并精简字节码、优化代码和拦截器
  • 使用FastList替代ArrayList
  • 有更好的并发集合类实现ConcurrentBag
  • 其它针对BoneCP缺陷的优化,比如对耗时超过一个CPU时间片的方法调用的研究

接下来将探究FastList和ConcurrentBag的实现

FastList

HikariCP重现设计了一个List接口实现类,用以替换ArrayList。FastList是List接口的精简实现,只实现了接口中必要的几个方法。

jdk中的ArrayList:

1
2
public class ArrayList<E> extends AbstractList<E>
implements List<E>, RandomAccess, Cloneable, java.io.Serializable

HikariCP中的FastList:

1
public final class FastList<T> implements List<T>, RandomAccess, Serializable

可以看到FastList并没有继承AbstractList

ArrayList的get方法:

1
2
3
4
5
6
7
8
9
10
public E get(int index) {
rangeCheck(index);

return elementData(index);
}

private void rangeCheck(int index) {
if (index >= size)
throw new IndexOutOfBoundsException(outOfBoundsMsg(index));
}

FastList的get方法:

1
2
3
public T get(int index)  {
return elementData[index];
}

可以看出FastList的get方法取消了rangeCheck,在一定程度上追求了极致。

ArrayList的remove(Object)方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public boolean remove(Object o) {
if (o == null) {
for (int index = 0; index < size; index++)//从头到尾遍历
if (elementData[index] == null) {
fastRemove(index);//从头到尾移除
return true;
}
} else {
for (int index = 0; index < size; index++)
if (o.equals(elementData[index])) {
fastRemove(index);
return true;
}
}
return false;
}

与ArrayList相反,FastList选择从数组的尾部开始遍历(JDBC编程中的常见模式是在使用后立即关闭Statement,或者以打开的相反顺序关闭Statement,可以理解为同一个Connection创建了多个Statement时,后打开的Statement会先关闭),因而更加高效。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public boolean remove(Object element) {
for (int index = size - 1; index >= 0; index--) {//从尾部遍历
if (element == elementData[index]) {
final int numMoved = size - index - 1;
if (numMoved > 0) {
System.arraycopy(elementData, index + 1, elementData, index, numMoved);
}
elementData[--size] = null;
return true;
}
}

return false;
}

ConcurrentBag

参考资料

HikariCP数据库连接池实战

h2database初始化流程

载入数据库驱动

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
Class.forName("org.h2.Driver");


//org.h2.Driver
private static final Driver INSTANCE = new Driver();

public static synchronized Driver load() {
try {
if (!registered) {
registered = true;
DriverManager.registerDriver(INSTANCE);
}
} catch (SQLException e) {
DbException.traceThrowable(e);
}
return INSTANCE;
}

获取 session(数据库会话/连接)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
Connection conn = DriverManager.getConnection("jdbc:h2:~/test");

//org.h2.Driver#connect
public Connection connect(String url, Properties info) throws SQLException {
try {
if (info == null) {
info = new Properties();
}
if (!acceptsURL(url)) {
return null;
}
if (url.equals(DEFAULT_URL)) {
return DEFAULT_CONNECTION.get();
}
Connection c = DbUpgrade.connectOrUpgrade(url, info);
if (c != null) {
return c;
}
return new JdbcConnection(url, info);
} catch (Exception e) {
throw DbException.toSQLException(e);
}
}

创建session(数据库连接):初始化SessionFactory

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
打开一个新的(远程或嵌入式)会话。
//org.h2.engine.SessionRemote#connectEmbeddedOrServer
public SessionInterface connectEmbeddedOrServer(boolean openNew) {
ConnectionInfo ci = connectionInfo;
if (ci.isRemote()) {
connectServer(ci);
return this;
}
// create the session using reflection,
// so that the JDBC layer can be compiled without it
boolean autoServerMode = ci.getProperty("AUTO_SERVER", false);
ConnectionInfo backup = null;
try {
if (autoServerMode) {
backup = ci.clone();
connectionInfo = ci.clone();
}
if (openNew) {
ci.setProperty("OPEN_NEW", "true");
}
if (sessionFactory == null) {
sessionFactory = (SessionFactory) Class.forName(
"org.h2.engine.Engine").getMethod("getInstance").invoke(null);
}
return sessionFactory.createSession(ci);
} catch (Exception re) {
DbException e = DbException.convert(re);
if (e.getErrorCode() == ErrorCode.DATABASE_ALREADY_OPEN_1) {
if (autoServerMode) {
String serverKey = ((JdbcException) e.getSQLException()).getSQL();
if (serverKey != null) {
backup.setServerKey(serverKey);
// OPEN_NEW must be removed now, otherwise
// opening a session with AUTO_SERVER fails
// if another connection is already open
backup.removeProperty("OPEN_NEW", null);
connectServer(backup);
return this;
}
}
}
throw e;
}
}

首先要设置 sessionFactory,利用Class.forName() 装载 org.h2.engine.Engine类并且对其实例化,生成了一个单例的 org.h2.engine.Engine对象赋值给sessionFactory

创建session(数据库会话/连接)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
//org.h2.engine.Engine#openSession(org.h2.engine.ConnectionInfo)
private synchronized Session openSession(ConnectionInfo ci) {
boolean ifExists = ci.removeProperty("IFEXISTS", false);
boolean forbidCreation = ci.removeProperty("FORBID_CREATION", false);
boolean ignoreUnknownSetting = ci.removeProperty(
"IGNORE_UNKNOWN_SETTINGS", false);
String cipher = ci.removeProperty("CIPHER", null);
String init = ci.removeProperty("INIT", null);
Session session;
long start = System.nanoTime();
for (;;) {
session = openSession(ci, ifExists, forbidCreation, cipher);
if (session != null) {
break;
}
// we found a database that is currently closing
// wait a bit to avoid a busy loop (the method is synchronized)
if (System.nanoTime() - start > 60_000_000_000L) {
// retry at most 1 minute
throw DbException.get(ErrorCode.DATABASE_ALREADY_OPEN_1,
"Waited for database closing longer than 1 minute");
}
try {
Thread.sleep(1);
} catch (InterruptedException e) {
throw DbException.get(ErrorCode.DATABASE_CALLED_AT_SHUTDOWN);
}
}
synchronized (session) {
session.setAllowLiterals(true);
DbSettings defaultSettings = DbSettings.getDefaultSettings();
for (String setting : ci.getKeys()) {
if (defaultSettings.containsKey(setting)) {
// database setting are only used when opening the database
continue;
}
String value = ci.getProperty(setting);
if (!ParserUtil.isSimpleIdentifier(setting, false, false)) {
throw DbException.get(ErrorCode.UNSUPPORTED_SETTING_1, setting);
}
try {
CommandInterface command = session.prepareCommand(
"SET " + setting + ' ' + value,
Integer.MAX_VALUE);
command.executeUpdate(null);
} catch (DbException e) {
if (e.getErrorCode() == ErrorCode.ADMIN_RIGHTS_REQUIRED) {
session.getTrace().error(e, "admin rights required; user: \"" +
ci.getUserName() + "\"");
} else {
session.getTrace().error(e, "");
}
if (!ignoreUnknownSetting) {
session.close();
throw e;
}
}
}
if (init != null) {
try {
CommandInterface command = session.prepareCommand(init,
Integer.MAX_VALUE);
command.executeUpdate(null);
} catch (DbException e) {
if (!ignoreUnknownSetting) {
session.close();
throw e;
}
}
}
session.setAllowLiterals(false);
session.commit(true);
}
return session;
}


private Session openSession(ConnectionInfo ci, boolean ifExists, boolean forbidCreation, String cipher) {
String name = ci.getName();
Database database;
ci.removeProperty("NO_UPGRADE", false);
boolean openNew = ci.getProperty("OPEN_NEW", false);
boolean opened = false;
User user = null;
synchronized (DATABASES) {
if (openNew || ci.isUnnamedInMemory()) {
database = null;
} else {
database = DATABASES.get(name);
}
if (database == null) {
String p = ci.getProperty("MV_STORE");
boolean exists = p == null ? Database.exists(name)
: Database.exists(name, Utils.parseBoolean(p, true, false));
if (!exists) {
if (ifExists) {
throw DbException.get(ErrorCode.DATABASE_NOT_FOUND_WITH_IF_EXISTS_1, name);
}
if (forbidCreation) {
throw DbException.get(ErrorCode.REMOTE_DATABASE_NOT_FOUND_1, name);
}
}
//使用ConnectionInfo创建Database
database = new Database(ci, cipher);
opened = true;
if (database.getAllUsers().isEmpty()) {
// users is the last thing we add, so if no user is around,
// the database is new (or not initialized correctly)
user = new User(database, database.allocateObjectId(),
ci.getUserName(), false);
user.setAdmin(true);
user.setUserPasswordHash(ci.getUserPasswordHash());
database.setMasterUser(user);
}
if (!ci.isUnnamedInMemory()) {
DATABASES.put(name, database);
}
}
}
if (opened) {
// start the thread when already synchronizing on the database
// otherwise a deadlock can occur when the writer thread
// opens a new database (as in recovery testing)
database.opened();
}
if (database.isClosing()) {
return null;
}
if (user == null) {
if (database.validateFilePasswordHash(cipher, ci.getFilePasswordHash())) {
if (ci.getProperty("AUTHREALM")== null) {
user = database.findUser(ci.getUserName());
if (user != null) {
if (!user.validateUserPasswordHash(ci.getUserPasswordHash())) {
user = null;
}
}
} else {
Authenticator authenticator = database.getAuthenticator();
if (authenticator==null) {
throw DbException.get(ErrorCode.AUTHENTICATOR_NOT_AVAILABLE, name);
} else {
try {
AuthenticationInfo authenticationInfo=new AuthenticationInfo(ci);
user = database.getAuthenticator().authenticate(authenticationInfo, database);
} catch (AuthenticationException authenticationError) {
database.getTrace(Trace.DATABASE).error(authenticationError,
"an error occurred during authentication; user: \"" +
ci.getUserName() + "\"");
}
}
}
}
if (opened && (user == null || !user.isAdmin())) {
// reset - because the user is not an admin, and has no
// right to listen to exceptions
database.setEventListener(null);
}
}
if (user == null) {
DbException er = DbException.get(ErrorCode.WRONG_USER_OR_PASSWORD);
database.getTrace(Trace.DATABASE).error(er, "wrong user or password; user: \"" +
ci.getUserName() + "\"");
database.removeSession(null);
throw er;
}
//Prevent to set _PASSWORD
ci.cleanAuthenticationInfo();
checkClustering(ci, database);

//创建Session
Session session = database.createSession(user, ci.getNetworkConnectionInfo());
if (session == null) {
// concurrently closing
return null;
}
if (ci.getProperty("JMX", false)) {
try {
Utils.callStaticMethod(
"org.h2.jmx.DatabaseInfo.registerMBean", ci, database);
} catch (Exception e) {
database.removeSession(session);
throw DbException.get(ErrorCode.FEATURE_NOT_SUPPORTED_1, e, "JMX");
}
jmx = true;
}
return session;
}

这里很关键的一步是使用ConnectionInfo创建出一个Database,利用这个Database对象创建Session。创建Database过程中会创建系统表

创建数据库

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
//使用ConnectionInfo创建Database
database = new Database(ci, cipher);

public Database(ConnectionInfo ci, String cipher) {
if (ASSERT) {
META_LOCK_DEBUGGING.set(null);
META_LOCK_DEBUGGING_DB.set(null);
META_LOCK_DEBUGGING_STACK.set(null);
}
String name = ci.getName();
this.dbSettings = ci.getDbSettings();
this.compareMode = CompareMode.getInstance(null, 0);
this.persistent = ci.isPersistent();
this.filePasswordHash = ci.getFilePasswordHash();
this.fileEncryptionKey = ci.getFileEncryptionKey();
this.databaseName = name;
this.databaseShortName = parseDatabaseShortName();
this.maxLengthInplaceLob = Constants.DEFAULT_MAX_LENGTH_INPLACE_LOB;
this.cipher = cipher;
this.accessModeData = StringUtils.toLowerEnglish(
ci.getProperty("ACCESS_MODE_DATA", "rw"));
this.autoServerMode = ci.getProperty("AUTO_SERVER", false);
this.autoServerPort = ci.getProperty("AUTO_SERVER_PORT", 0);
int defaultCacheSize = Utils.scaleForAvailableMemory(
Constants.CACHE_SIZE_DEFAULT);
this.cacheSize =
ci.getProperty("CACHE_SIZE", defaultCacheSize);
this.pageSize = ci.getProperty("PAGE_SIZE",
Constants.DEFAULT_PAGE_SIZE);
if ("r".equals(accessModeData)) {
readOnly = true;
}
String lockMethodName = ci.getProperty("FILE_LOCK", null);
if (dbSettings.mvStore && lockMethodName == null) {
fileLockMethod = autoServerMode ? FileLockMethod.FILE : FileLockMethod.FS;
} else {
fileLockMethod = FileLock.getFileLockMethod(lockMethodName);
}
this.databaseURL = ci.getURL();
String listener = ci.removeProperty("DATABASE_EVENT_LISTENER", null);
if (listener != null) {
listener = StringUtils.trim(listener, true, true, "'");
setEventListenerClass(listener);
}
String modeName = ci.removeProperty("MODE", null);
if (modeName != null) {
mode = Mode.getInstance(modeName);
if (mode == null) {
throw DbException.get(ErrorCode.UNKNOWN_MODE_1, modeName);
}
}
this.logMode =
ci.getProperty("LOG", PageStore.LOG_MODE_SYNC);
this.javaObjectSerializerName =
ci.getProperty("JAVA_OBJECT_SERIALIZER", null);
this.allowBuiltinAliasOverride =
ci.getProperty("BUILTIN_ALIAS_OVERRIDE", false);
boolean closeAtVmShutdown =
dbSettings.dbCloseOnExit;
int traceLevelFile =
ci.getIntProperty(SetTypes.TRACE_LEVEL_FILE,
TraceSystem.DEFAULT_TRACE_LEVEL_FILE);
int traceLevelSystemOut =
ci.getIntProperty(SetTypes.TRACE_LEVEL_SYSTEM_OUT,
TraceSystem.DEFAULT_TRACE_LEVEL_SYSTEM_OUT);
this.cacheType = StringUtils.toUpperEnglish(
ci.removeProperty("CACHE_TYPE", Constants.CACHE_TYPE_DEFAULT));
this.ignoreCatalogs = ci.getProperty("IGNORE_CATALOGS",
dbSettings.ignoreCatalogs);
openDatabase(traceLevelFile, traceLevelSystemOut, closeAtVmShutdown, ci);
}

第一步先设置一些参数,第二步去创建数据库需要的对象。一些重要对象:

  • User: DBA

    1
    systemUser = new User(this, 0, SYSTEM_USER_NAME, true);
  • Schema: PUBLIC

    1
    2
    mainSchema = new Schema(this, Constants.MAIN_SCHEMA_ID, sysIdentifier(Constants.SCHEMA_MAIN), systemUser,
    true);
  • Schema:INFORMATION_SCHEMA

    1
    2
    infoSchema = new Schema(this, Constants.INFORMATION_SCHEMA_ID, sysIdentifier("INFORMATION_SCHEMA"), systemUser,
    true);
  • Session:systemSession lobSession

    1
    2
    systemSession = new Session(this, systemUser, ++nextSessionId);
    lobSession = new Session(this, systemUser, ++nextSessionId);
  • Table: SYS

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    //创建Table:SYS
    CreateTableData data = new CreateTableData();
    ArrayList<Column> cols = data.columns;
    Column columnId = new Column("ID", Value.INT);
    columnId.setNullable(false);
    cols.add(columnId);
    cols.add(new Column("HEAD", Value.INT));
    cols.add(new Column("TYPE", Value.INT));
    cols.add(new Column("SQL", Value.STRING));
    boolean create = true;
    if (pageStore != null) {
    create = pageStore.isNew();
    }
    data.tableName = "SYS";
    data.id = 0;
    data.temporary = false;
    data.persistData = persistent;
    data.persistIndexes = persistent;
    data.create = create;
    data.isHidden = true;
    data.session = systemSession;
    starting = true;
    meta = mainSchema.createTable(data);

h2架构解析

介绍

H2在基于B树的磁盘存储之上实现了一个嵌入式且独立于ANSI-SQL89的SQL引擎。

截至2013年10月,Thomas仍在开发名为MVStore的下一代存储引擎。 这将及时替换基于B树的存储引擎。

自顶向上概述

从上至下工作,各层如下所示:

  • JDBC driver.
  • Connection/session management.
  • SQL Parser.
  • Command execution and planning.
  • Table/Index/Constraints.
  • Undo log, redo log, and transactions layer.
  • B-tree engine and page-based storage allocation.
  • Filesystem abstraction.

JDBC driver

JDBC驱动程序实现位于org.h2.jdbc,org.h2.jdbcx中

Connection/session management

主要关注的类如下:

Package 描述
org.h2.engine.Database 根/全局类
org.h2.engine.SessionInterface 嵌入式会话和远程会话抽象类
org.h2.engine.Session 本地/嵌入式会话
org.h2.engine.SessionRemote 远程会话

Parser

解析器位于org.h2.command.Parser中。 它使用简单的递归下降设计。

请参阅Wikipedia递归下降解析器页面。

Command execution and planning

与其他数据库不同,我们没有中间步骤,无法生成查询的某种IR(中间表示)。 解析器类直接生成命令执行对象。 然后,我们对命令运行一些优化步骤,以可能生成更有效的命令。 感兴趣的主要软件包是:

Package 描述
org.h2.command.ddl 修改表结构等的命令
org.h2.command.dml 修改数据的命令

Table/Index/Constraints

这里要注意的一件事是,索引只是作为特殊类型的表存储。

感兴趣的主要软件包是:

Package 描述
org.h2.table 各种表的实现
org.h2.index 各种索引的实现

Undo log, redo log, and transactions layer

我们有一个事务日志,该日志在所有会话之间共享。 也可以看看

https://en.wikipedia.org/wiki/Transaction_log/

https://h2database.com/html/grammar.html#set_log/

我们还有一个针对每个会话的撤消日志,用于撤消操作(例如,更新失败)并回滚事务。 从理论上讲,可以使用事务日志,但是为了简单起见,H2当前使用它自己的“操作列表”(通常在内存中)。

有了MVStore,就不再需要它了(只是事务日志)。

B-tree engine and page-based storage allocation.

感兴趣的主要软件包是org.h2.store。

这实现了一种存储机制,该机制分配存储页面(通常为2k大小),并在这些页面上实现b树,以允许快速检索和更新。

Filesystem abstraction

感兴趣的主要类是org.h2.store.FileStore。

这实现了随机访问文件的抽象。 这使高层可以将内存数据库,磁盘数据库和zip文件数据库相同。

sofa-rpc源码解析-服务端netty启动过程

  1. 创建server
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    //ServerConfig.java

    public synchronized Server buildIfAbsent() {
    if (server != null) {
    return server;
    }
    // 提前检查协议+序列化方式
    // ConfigValueHelper.check(ProtocolType.valueOf(getProtocol()),
    // SerializationType.valueOf(getSerialization()));

    server = ServerFactory.getServer(this);
    return server;
    }

如果不存在则创建,关键在于ServerFactory的getServer方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30

/**
* 全部服务端
*/

private final static ConcurrentHashMap<String, Server> SERVER_MAP = new ConcurrentHashMap<String, Server>();

public synchronized static Server getServer(ServerConfig serverConfig) {
try {
Server server = SERVER_MAP.get(Integer.toString(serverConfig.getPort()));
if (server == null) {
// 算下网卡和端口
resolveServerConfig(serverConfig);

ExtensionClass<Server> ext = ExtensionLoaderFactory.getExtensionLoader(Server.class)
.getExtensionClass(serverConfig.getProtocol());
if (ext == null) {
throw ExceptionUtils.buildRuntime("server.protocol", serverConfig.getProtocol(),
"Unsupported protocol of server!");
}
server = ext.getExtInstance();
server.init(serverConfig);
SERVER_MAP.put(serverConfig.getPort() + "", server);
}
return server;
} catch (SofaRpcRuntimeException e) {
throw e;
} catch (Throwable e) {
throw new SofaRpcRuntimeException(e.getMessage(), e);
}
}

getServer默认返回的是一个BoltServer

2.启动server

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
DefaultProviderBootstrap.java
for (ServerConfig serverConfig : serverConfigs) {

try {
Server server = serverConfig.buildIfAbsent();
// 注册序列化接口
server.registerProcessor(providerConfig, providerProxyInvoker);
if (serverConfig.isAutoStart()) {
server.start();
}
} catch (SofaRpcRuntimeException e) {
throw e;
} catch (Exception e) {
LOGGER.errorWithApp(appName, "Catch exception when register processor to server: "
+ serverConfig.getId(), e);
}
}


//BoltServer.java
@Override
public void start() {
if (started) {
return;
}
synchronized (this) {
if (started) {
return;
}
// 生成Server对象
remotingServer = initRemotingServer();
try {
if (remotingServer.start(serverConfig.getBoundHost())) {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Bolt server has been bind to {}:{}", serverConfig.getBoundHost(),
serverConfig.getPort());
}
} else {
throw new SofaRpcRuntimeException("Failed to start bolt server, see more detail from bolt log.");
}
started = true;

if (EventBus.isEnable(ServerStartedEvent.class)) {
EventBus.post(new ServerStartedEvent(serverConfig, bizThreadPool));
}

} catch (SofaRpcRuntimeException e) {
throw e;
} catch (Exception e) {
throw new SofaRpcRuntimeException("Failed to start bolt server!", e);
}
}
}

protected RemotingServer initRemotingServer() {
// 绑定到端口
RemotingServer remotingServer = new RpcServer(serverConfig.getPort());
remotingServer.registerUserProcessor(boltServerProcessor);
return remotingServer;
}
  • 启动过程使用了双重检查机制,防止Server重复启动
  • initRemotingServer方法里初始化了netty的一些配置,跟到代码里可以看到我们通常启动一个netty服务端需要的bossGroup(workerGroup在初始化过程里doInit方法里设置好)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    //RpcServer.java ->sofa-bolt项目
    private static final NioEventLoopGroup workerGroup = new NioEventLoopGroup(Runtime.getRuntime().availableProcessors() * 2, new NamedThreadFactory("Rpc-netty-server-worker"));

    public RpcServer(int port) {
    super(port);
    this.globalSwitch = new GlobalSwitch();
    this.connectionEventListener = new ConnectionEventListener();
    this.userProcessors = new ConcurrentHashMap(4);
    this.bossGroup = new NioEventLoopGroup(1, new NamedThreadFactory("Rpc-netty-server-boss"));
    }
  • 启动:

    1
    2
    3
    4
    protected boolean doStart(String ip) throws InterruptedException {
    this.channelFuture = this.bootstrap.bind(new InetSocketAddress(ip, this.port)).sync();
    return this.channelFuture.isSuccess();
    }

标准的netty服务端启动模板代码

sofa-rpc源码解析-服务发布过程

SOFA 中间件是蚂蚁金服自主研发的金融级分布式中间件,包含了构建金融级云原生架构所需的各个组件,包括微服务研发框架,RPC 框架,服务注册中心,分布式定时任务,限流/熔断框架,动态配置推送,分布式链路追踪,Metrics监控度量,分布式高可用消息队列,分布式事务框架,分布式数据库代理层等组件,是一套分布式架构的完整的解决方案,也是在金融场景里锤炼出来的最佳实践。

功能特性

  • 透明化、高性能的远程服务调用
  • 支持多种服务路由及负载均衡策略
  • 支持多种注册中心的集成
  • 支持多种协议,包括 Bolt、Rest、Dubbo 等
  • 支持同步、单向、回调、泛化等多种调用方式
  • 支持集群容错、服务预热、自动故障隔离
  • 强大的扩展功能,可以按需扩展各个功能组件
  • 本源码解析系列将围绕这些特性展开.

编写服务端实现

第一步:创建接口

1
2
3
4
5
6
/**
* Quick Start demo interface
*/

public interface HelloService {
String sayHello(String string);
}

第二步:创建接口实现

1
2
3
4
5
6
7
8
9
10
/**
* Quick Start demo implement
*/

public class HelloServiceImpl implements HelloService {
@Override
public String sayHello(String string) {
System.out.println("Server receive: " + string);
return "hello " + string + " !";
}
}

第三步:编写服务端代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
* Quick Start Server
*/

public class QuickStartServer {

public static void main(String[] args) {
ServerConfig serverConfig = new ServerConfig()
.setProtocol("bolt") // 设置一个协议,默认bolt
.setPort(12200) // 设置一个端口,默认12200
.setDaemon(false); // 非守护线程

ProviderConfig<HelloService> providerConfig = new ProviderConfig<HelloService>()
.setInterfaceId(HelloService.class.getName()) // 指定接口
.setRef(new HelloServiceImpl()) // 指定实现
.setServer(serverConfig); // 指定服务端

providerConfig.export(); // 发布服务
}
}

编写客户端实现

第一步:拿到服务端接口

一般服务端会通过jar的形式将接口类提供给客户端。而在本例中,由于服务端和客户端在一个工程所以跳过。

第二步:编程客户端代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/**
* Quick Start client
*/

public class QuickStartClient {
public static void main(String[] args) {
ConsumerConfig<HelloService> consumerConfig = new ConsumerConfig<HelloService>()
.setInterfaceId(HelloService.class.getName()) // 指定接口
.setProtocol("bolt") // 指定协议
.setDirectUrl("bolt://127.0.0.1:12200"); // 指定直连地址
// 生成代理类
HelloService helloService = consumerConfig.refer();
while (true) {
System.out.println(helloService.sayHello("world"));
try {
Thread.sleep(2000);
} catch (Exception e) {
}
}
}
}

服务发布

服务发布过程涉及到三个类 RegistryConfig ,ServerConfig ,ProviderConfig 。

1.RegistryConfig

1
2
3
RegistryConfig registryConfig = new RegistryConfig()
.setProtocol("zookeeper")
.setAddress("127.0.0.1:2181")

RegistryConfig 表示注册中心。如上声明了服务注册中心的地址和端口是127.0.0.1:2181,协议是 Zookeeper 。

2.ServerConfig

1
2
3
ServerConfig serverConfig = new ServerConfig()
.setPort(8803)
.setProtocol("bolt");

ServerConfig 表示服务运行容器。如上声明了一个使用8803端口和 bolt 协议的 server 。
3.ProviderConfig

1
2
3
4
5
6
ProviderConfig<HelloWorldService> providerConfig = new ProviderConfig<HelloWorldService>()
.setInterfaceId(HelloWorldService.class.getName())
.setRef(new HelloWorldServiceImpl())
.setServer(serverConfig)
.setRegistry(registryConfig);
providerConfig.export();

ProviderConfig 表示服务发布。如上声明了服务的接口,实现和该服务运行的 server 。 最终通过 export 方法将这个服务发布出去了。

启动底层Netty服务端

1
2
3
4
5
6
7
8
9
/**
* 发布服务
*/

public synchronized void export() {
if (providerBootstrap == null) {
providerBootstrap = Bootstraps.from(this);
}
providerBootstrap.export();
}

发布服务过程首先会初始化一个发布服务的包装类 ProviderBootstrap(DefaultProviderBootstrap),从该实例持有服务提供者配置信息。还提供了延迟加载的特性(单位毫秒)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Override
public void export() {
if (providerConfig.getDelay() > 0) { // 延迟加载,单位毫秒
Thread thread = factory.newThread(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(providerConfig.getDelay());
} catch (Throwable ignore) { // NOPMD
}
doExport();
}
});
thread.start();
} else {
doExport();
}
}

doExport会做一些必要的参数检查,例如服务发布次数限制、服务下方法的黑白名单。最重要的是构建请求调用器也就是请求处理器,并将其与Server绑定。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// 构造请求调用器
providerProxyInvoker = new ProviderProxyInvoker(providerConfig);
// 初始化注册中心
if (providerConfig.isRegister()) {
List<RegistryConfig> registryConfigs = providerConfig.getRegistry();
if (CommonUtils.isNotEmpty(registryConfigs)) {
for (RegistryConfig registryConfig : registryConfigs) {
RegistryFactory.getRegistry(registryConfig); // 提前初始化Registry
}
}
}
// 将处理器注册到server
List<ServerConfig> serverConfigs = providerConfig.getServer();
for (ServerConfig serverConfig : serverConfigs) {
try {
Server server = serverConfig.buildIfAbsent();
// 注册序列化接口
server.registerProcessor(providerConfig, providerProxyInvoker);
if (serverConfig.isAutoStart()) {
server.start();
}
} catch (SofaRpcRuntimeException e) {
throw e;
} catch (Exception e) {
LOGGER.errorWithApp(appName, "Catch exception when register processor to server: "
+ serverConfig.getId(), e);
}
}

providerProxyInvoker是服务端调用链的入口,rpc请求将会沿着这个入口执行。

HikariCP源码解析一创建数据库连接池(二)

上篇介绍了HikariCP创建连接池的几种方式,跟踪源码发现其真正创建连接池是在HikariDataSource里的getConnection方法(即第一次获取连接则去创建连接池)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
 private volatile HikariPool pool;
@Override
public Connection getConnection() throws SQLException{
if (isClosed()) {
throw new SQLException("HikariDataSource " + this + " has been closed.");
}

if (fastPathPool != null) {
return fastPathPool.getConnection();
}

// See http://en.wikipedia.org/wiki/Double-checked_locking#Usage_in_Java
HikariPool result = pool;
if (result == null) {
synchronized (this) {
result = pool;
if (result == null) {
validate();
LOGGER.info("{} - Starting...", getPoolName());
try {
pool = result = new HikariPool(this);
this.seal();
}
catch (PoolInitializationException pie) {
if (pie.getCause() instanceof SQLException) {
throw (SQLException) pie.getCause();
}
else {
throw pie;
}
}
LOGGER.info("{} - Start completed.", getPoolName());
}
}
}

return result.getConnection();
}

这里使用了双重检查机制,要注意的是pool变量使用了volatile这个关键字,原因是new一个对象并不是一个原子操作,要经过以下步骤:

  1. 给pool分配内存
  2. 调用构造函数初始化成员变量
  3. 将pool对象指向分配的内存(此步骤完成pool即为非空)

没有volatile关键字上面这3个步骤可能由于指令重排序令pool在多线程下未正确初始化即被使用则报错。volatile可禁止指令重排序,并强制本地线程去主存中读取pool变量。

HikariCP源码解析一创建数据库连接池

HikariCP是一个快速、简单、可靠的JDBC连接池。大约130Kb,相比于其它流行的数据库连接池非常的轻, spingboot2.0以及在国外非常有名的playFramework框架默认使用该连接池。

连接池 文件数 代码行数
Vibur 34 1927
HikariCP 21 2218
Tomcat-JDBC 31 6345
BoneCP 49 7293
C3P0 120 1550

HikariCP提供了多种创建数据库连接池的方式

  • 硬编码HikariConfig

    1
    2
    3
    4
    5
    6
    7
    8
    9
    HikariConfig config = new HikariConfig();
    config.setJdbcUrl("jdbc:mysql://localhost:3306/simpsons");
    config.setUsername("bart");
    config.setPassword("51mp50n");
    config.addDataSourceProperty("cachePrepStmts", "true");
    config.addDataSourceProperty("prepStmtCacheSize", "250");
    config.addDataSourceProperty("prepStmtCacheSqlLimit", "2048");

    HikariDataSource ds = new HikariDataSource(config);
  • 直接硬编码HikariDataSource

    1
    2
    3
    4
    5
    HikariDataSource ds = new HikariDataSource();
    ds.setJdbcUrl("jdbc:mysql://localhost:3306/simpsons");
    ds.setUsername("bart");
    ds.setPassword("51mp50n");
    ...
  • 加载properties文件

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    HikariConfig config = new HikariConfig("/some/path/hikari.properties");
    HikariDataSource ds = new HikariDataSource(config);


    example.properties

    dataSourceClassName=org.postgresql.ds.PGSimpleDataSource
    dataSource.user=test
    dataSource.password=test
    dataSource.databaseName=mydb
    dataSource.portNumber=5432
    dataSource.serverName=localhost
  • HikariConfig是一个用来设置数据库连接属性属性的普通java类。还给连接池设置了一些常用默认属性:

1
2
3
4
5
private static final long CONNECTION_TIMEOUT = SECONDS.toMillis(30);
private static final long VALIDATION_TIMEOUT = SECONDS.toMillis(5);
private static final long IDLE_TIMEOUT = MINUTES.toMillis(10);
private static final long MAX_LIFETIME = MINUTES.toMillis(30);
private static final int DEFAULT_POOL_SIZE = 10;

值得说明的是HikariCP还可以使用系统的一个默认属性:hikaricp.configurationFile

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public HikariConfig() {
dataSourceProperties = new Properties();
healthCheckProperties = new Properties();

minIdle = -1;
maxPoolSize = -1;
maxLifetime = MAX_LIFETIME;
connectionTimeout = CONNECTION_TIMEOUT;
validationTimeout = VALIDATION_TIMEOUT;
idleTimeout = IDLE_TIMEOUT;
initializationFailTimeout = 1;
isAutoCommit = true;

String systemProp = System.getProperty("hikaricp.configurationFile");
if (systemProp != null) {
loadProperties(systemProp);
}
}

hikaricp.configurationFile,可用于指定属性文件的位置。 如果您打算使用此选项,则使用默认构造函数构造HikariConfig或HikariDataSource实例,HikariCP会加载该值对应的属性文件。

02-guava CharMatcher、Charsets、Strings

CharMatcher

1
2
3
4
5
6
7
8
try{
byte[] bytes = "foobarbaz".getBytes("UTF-8");
}catch (UnsupportedEncodingException e){
//This really can't happen UTF-8 must be supported
}

简写:
byte[] bytes2 = "foobarbaz".getBytes(Charsets.UTF_8);

Strings

Strings类为使用字符串提供了一些方便的实用方法。

  • 字符填充

    1
    2
    Strings.padEnd("foo", 6, 'x');//fooxxx
    Strings.padEnd("fooaaa", 6, 'x');//fooaaa
  • 空处理

nullToEmpty:这个方法接受一个字符串作为参数并返回
如果值不为空或长度大于0,则为原始字符串;否则
它返回空串

emptyToNull:该方法以类似于nullToEmpty的方式执行,
但如果字符串参数为空或者为null,则返回null

isNullOrEmpty:此方法对字符串执行空长检查
,如果该字符串实际上为null或空(长度为0),则返回true

CharMatcher

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
@Test
public void testRemoveWhiteSpace(){
String tabsAndSpaces = "String with spaces and
tabs";
String expected = "String with spaces and tabs";
String scrubbed = CharMatcher.WHITESPACE.
collapseFrom(tabsAndSpaces,' ');
assertThat(scrubbed,is(expected));
}

@Test
public void testTrimRemoveWhiteSpace(){
String tabsAndSpaces = " String with spaces and
tabs";
String expected = "String with spaces and tabs";
String scrubbed = CharMatcher.WHITESPACE.
trimAndCollapseFrom(tabsAndSpaces,' ');
assertThat(scrubbed,is(expected));
}

@Test
public void testRetainForm() {
String letterAndNumbers = "foo989yxbar234";
String expected = "989234";
String retained = CharMatcher.JAVA_DIGIT.retainFrom(letterAndNumbers);
Assert.assertThat(expected, is(retained));
}

来源:Getting Started with Google Guava