Extra Cookie

Yet Another Programmer's Blog

Readings in Database Systems - Interactive Analytics

Recently I've been reading Stonebraker's "Readings in Database Systems", and it has opened up a lot of ideas for me.

I've worked on big data for years, but apart from skimming papers on data mining algorithms and distributed systems design, it never occurred to me to read database papers. Thinking about it now, big data and databases actually share many requirements and scenarios; for the problems we're trying to solve, academia may well have had answers many years ago.

This post covers the "Interactive Analytics" chapter.

What is Interactive Analytics

Suppose you are an analyst at an e-commerce company, and a million rows of raw user transaction data are printed out and put in front of you. You're asked to figure out what the data means. What would you do?

If you handed me that million rows, I doubt I could see anything in it. And I believe the same goes for everyone, because human cognition has its bugs; for one, we cannot process large amounts of raw data directly.

So what do we do?

We need to distill the data somehow, reducing it to small result sets or presenting it visually. Anyone who has used SQL will probably think of the GROUP BY clause. Indeed, data aggregated with GROUP BY is usually much easier to understand; the sketch below shows the same idea in code.
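In code terms (just an illustration, with a made-up Transaction record and made-up numbers), that aggregation looks like this:

import java.util.List;
import java.util.Map;
import static java.util.stream.Collectors.groupingBy;
import static java.util.stream.Collectors.summingLong;

// Illustration only: the same idea as SQL's GROUP BY, folding raw
// transactions into a small per-city result set.
record Transaction(String city, long amount) {}

class GroupByDemo {
    public static void main(String[] args) {
        List<Transaction> raw = List.of(
                new Transaction("Beijing", 120),
                new Transaction("Shanghai", 80),
                new Transaction("Beijing", 200));
        Map<String, Long> salesByCity = raw.stream()
                .collect(groupingBy(Transaction::city, summingLong(Transaction::amount)));
        System.out.println(salesByCity); // e.g. {Shanghai=80, Beijing=320} (order may vary)
    }
}

Three raw rows become two aggregated ones here; with a million rows, that reduction is what makes the data humanly readable.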

Big data is not just about sheer volume; it is about discovering value in large amounts of data.

"Interactive Analytics" refers to exactly this process, with one added constraint: it must finish within a short time, even if that is not enough time to scan all the raw data the query depends on.

When the data volume is huge, this constraint is a serious challenge for any data system.

Ideas

So how can a query run faster than a full scan over all the data it depends on?

The conclusion is obvious: the only way is to not scan all of the dependent data. Find a scheme that does that, and the problem is solved.

There are currently two viable approaches:

  1. Precomputing: if the data a query depends on has been "distilled" in advance, the amount of data the query must scan can be greatly reduced.
  2. Sampling: sample the data and let each query scan only the sample, which also greatly reduces the data scanned.

The "Red Book" gives four references: [1] and [2] are about precomputing, [3] and [4] are about sampling.

Precomputing

Data Cube

Take the earlier transaction data as an example again. Suppose we only care about three dimensions, part, supplier, and customer, and the metric we care about is total sales. We can then precompute the total sales for every distinct part p, supplier s, and customer c, store the results keyed by (p, s, c), and answer matching queries directly from the stored results.

That is the construction and use of a three-dimensional data cube: each cell represents one part-supplier-customer combination (p, s, c), and its value is the total sales for that combination.
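To make this concrete, here is a minimal sketch of such a three-dimensional cube; the class and method names are mine, not from any of the referenced papers:

import java.util.HashMap;
import java.util.Map;

// Each cell maps a (part, supplier, customer) combination to its total sales.
class SalesCube {
    private final Map<String, Long> cells = new HashMap<>();

    private static String key(String p, String s, String c) {
        return p + "|" + s + "|" + c;
    }

    // Precomputation: fold every raw transaction into its cell.
    void add(String p, String s, String c, long amount) {
        cells.merge(key(p, s, c), amount, Long::sum);
    }

    // Query: a point lookup instead of a scan over the raw transactions.
    long totalSales(String p, String s, String c) {
        return cells.getOrDefault(key(p, s, c), 0L);
    }
}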

Build Data Cube

Of course, in practice an analysis task cares about more than three dimensions; it may involve many different combinations of dimensions.

There are three ways to build the data cube:

  1. Precompute the data cubes for all combinations. Every query then gets the fastest possible response, but the precomputation and storage costs are heavy. (With K dimensions, 2^K GROUP BY statements are needed for the precomputation.)
  2. Do no precomputation at all and answer every query straight from the raw data. There is no extra storage cost, but with large data volumes queries run very slowly.
  3. Precompute the data cubes for some combinations of dimensions. This is the approach taken in [1]. It aims to balance query latency, precomputation time, and storage; the key is choosing which combinations to precompute, and a bad choice may end up worse than either approach above.

For the earlier example, fully precomputing the data cube over the part, supplier, and customer dimensions requires the following 8 combinations (a small sketch enumerating them follows the list):

  1. psc (part, supplier, customer) (6M: 6 million rows)
  2. pc (part, customer) (6M)
  3. ps (part, supplier) (0.8M)
  4. sc (supplier, customer) (6M)
  5. p (part) (0.2M)
  6. s (supplier) (0.01M)
  7. c (customer) (0.1M)
  8. none (1)

(The number after each combination is the number of rows in its result set.)
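Here is the sketch mentioned above: it enumerates all 2^K groupings by treating every subset of the dimension list as one combination (the names are mine):

import java.util.ArrayList;
import java.util.List;

// Why K dimensions mean 2^K GROUP BY statements: every subset of the
// dimensions is one grouping, and a K-element set has 2^K subsets.
class GroupingSets {
    public static void main(String[] args) {
        String[] dims = {"part", "supplier", "customer"};
        List<String> groupings = new ArrayList<>();
        for (int mask = 0; mask < (1 << dims.length); mask++) { // 2^K bit masks
            StringBuilder g = new StringBuilder();
            for (int i = 0; i < dims.length; i++) {
                if ((mask & (1 << i)) != 0) {
                    if (g.length() > 0) g.append(", ");
                    g.append(dims[i]);
                }
            }
            groupings.add(g.length() == 0 ? "none" : g.toString());
        }
        groupings.forEach(System.out::println); // prints the 8 combinations above
    }
}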

Notice that once the psc data exists, pc can be derived by aggregating psc along the supplier dimension, p can be derived by aggregating pc along the customer dimension, and so on.

The pc and psc combinations both have 6 million rows, so a query against either pc or psc has to scan that many rows. If we skip precomputing pc and instead aggregate psc whenever a user queries the pc dimensions, the number of rows scanned is exactly the same. So if row count is the cost metric, precomputing pc is pointless; the sketch below makes this concrete.
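A sketch of that rollup, reusing the hypothetical "p|s|c" key format from the SalesCube sketch above:

import java.util.HashMap;
import java.util.Map;

// Roll the psc cube up to pc by summing away the supplier dimension.
class CubeRollup {
    static Map<String, Long> rollupToPC(Map<String, Long> psc) {
        Map<String, Long> pc = new HashMap<>();
        for (Map.Entry<String, Long> cell : psc.entrySet()) {
            String[] k = cell.getKey().split("\\|"); // [part, supplier, customer]
            pc.merge(k[0] + "|" + k[2], cell.getValue(), Long::sum);
        }
        return pc;
    }
}

Note that the rollup still touches every psc row, which is exactly why precomputing pc buys nothing by the row-count metric.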

The Lattice Framework

[1] proposes a lattice model for this, illustrated with a figure (not reproduced here):

Each node represents one data cube combination, and a node lower in the lattice can be derived by aggregating a node above it.

The left side of the figure shows the lattice for the example above; the right side shows how lattices are merged.

With this structure, choosing the dimension combinations becomes an optimization problem:

Given a limit on the number of nodes, minimize the average cost of evaluating each node.

[1] first proposes a cost model to estimate the cost of computing a node's data cube from the nodes it depends on, then a greedy algorithm that picks the combinations to precompute by minimizing the average cost. See the paper for the algorithm's details and proofs.
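For intuition only, here is a much-simplified version of greedy view selection; cost is approximated by the row count of the cheapest materialized ancestor, all names are mine, and most of the paper's machinery is omitted:

import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Greedily pick the view whose materialization reduces total cost the most.
// Assumes the top view (the raw data) is an ancestor of every view, so a
// query cost always exists.
class GreedyViewSelection {
    // rows: row count per view; ancestorsOf: for each view, the views it can
    // be computed from (including itself and the top view).
    static Set<String> select(Map<String, Long> rows,
                              Map<String, Set<String>> ancestorsOf,
                              String topView, int budget) {
        Set<String> chosen = new HashSet<>(Set.of(topView));
        for (int i = 0; i < budget; i++) {
            String best = null;
            long bestBenefit = 0;
            for (String v : rows.keySet()) {
                if (chosen.contains(v)) continue;
                long benefit = 0;
                for (String w : rows.keySet()) {
                    if (!ancestorsOf.get(w).contains(v)) continue; // v can't answer w
                    long current = cheapestCost(w, chosen, ancestorsOf, rows);
                    benefit += Math.max(0, current - rows.get(v));
                }
                if (benefit > bestBenefit) { bestBenefit = benefit; best = v; }
            }
            if (best == null) break; // no remaining view helps
            chosen.add(best);
        }
        return chosen;
    }

    // Cost of answering w: row count of its cheapest materialized ancestor.
    static long cheapestCost(String w, Set<String> chosen,
                             Map<String, Set<String>> ancestorsOf,
                             Map<String, Long> rows) {
        long cost = Long.MAX_VALUE;
        for (String a : ancestorsOf.get(w)) {
            if (chosen.contains(a)) cost = Math.min(cost, rows.get(a));
        }
        return cost;
    }
}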

[2] gives an array-based, in-memory method for computing the data cube; download and read it if you're interested.

Sampling

No matter how well it is optimized, the data cube approach still needs offline jobs to build a large set of cubes in advance. When many dimensions matter, or when very low data latency is required, it cannot keep up.

Sampling trades accuracy for speed: by scanning less data, it answers queries quickly.

[4] analyzes users' past queries to find the frequently queried column sets, and creates samples for them ahead of time.

Sample Creation

How are the samples created? My first thought while reading the paper was to shuffle the data records, distribute them randomly across a number of partitions, and, when a query comes in, simply pick one or more partitions to scan, as in the sketch below.
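That naive idea in code (a sketch):

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Shuffle the rows once; any prefix (or any partition) is then a uniform
// random sample that a query can scan instead of the full table.
class UniformSampler {
    static <T> List<T> sample(List<T> rows, int sampleSize) {
        List<T> shuffled = new ArrayList<>(rows);
        Collections.shuffle(shuffled);
        return shuffled.subList(0, Math.min(sampleSize, shuffled.size()));
    }
}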

[4] points out the problem with doing it this way (uniform sampling):

For purely global statistics over the data it works fairly well, but with filter or group-by operations it often gives poor results.

For example, say I want the distribution of sales by city. With the partitioning scheme I had in mind, cities with very few transactions might vanish from the sample entirely, and the resulting distribution would simply be wrong.

[4] proposes stratified sampling to solve this problem.

The basic idea is to first compute statistics over the dimension columns, group the rows that share the same column values, and then sample each group separately. The paper details the sampling method and how each group's sample size is set; a simplified sketch follows.
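The simplified sketch of the basic idea; the real per-group sample sizes in [4] are more involved than the flat cap used here:

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.function.Function;

// Group rows by a dimension column's value, then cap each group's sample
// at perGroup rows, so rare groups (small cities) are never lost entirely.
class StratifiedSampler {
    static <T> List<T> sample(List<T> rows, Function<T, String> column,
                              int perGroup, Random rnd) {
        Map<String, List<T>> groups = new HashMap<>();
        for (T row : rows) {
            groups.computeIfAbsent(column.apply(row), k -> new ArrayList<>()).add(row);
        }
        List<T> sample = new ArrayList<>();
        for (List<T> group : groups.values()) {
            Collections.shuffle(group, rnd);
            sample.addAll(group.subList(0, Math.min(perGroup, group.size())));
        }
        return sample;
    }
}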

Sample Selection

For choosing which sample to use, [4] proposes the ELP (Error Latency Profile) model, which selects a sample from the user's stated accuracy and latency requirements.

This is, of course, a very complex process. [4] explains in detail how each sample's latency and accuracy are estimated, and how the user's accuracy and latency requirements are factored in while generating the execution plan. Read the paper for the details.
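To illustrate just the selection step, a toy sketch; the names and the decision rule here are mine, and the real ELP model also covers how these estimates are produced:

import java.util.List;

// Given per-sample estimates of error and latency, return the fastest
// sample that meets both of the user's bounds.
record SampleProfile(String name, double estError, long estMillis) {}

class SampleSelector {
    static SampleProfile pick(List<SampleProfile> samples, double maxError, long maxMillis) {
        SampleProfile best = null;
        for (SampleProfile s : samples) {
            if (s.estError() <= maxError && s.estMillis() <= maxMillis
                    && (best == null || s.estMillis() < best.estMillis())) {
                best = s;
            }
        }
        return best; // null means no sample satisfies both bounds
    }
}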

[3] builds a prototype system that supports online sampling on top of an existing relational database, using algorithms it introduces such as random data access, online reordering, and ripple joins. Also worth a read.

Summary

On accuracy, the data cube approach undoubtedly beats sampling, and Apache Kylin in industry works exactly this way. Sampling, by contrast, still sees little adoption: for many users, a sampled answer is unacceptable even at 99% accuracy, even when it actually meets their needs.

Still, I'm more optimistic about sampling. The whole data cube machinery imposes heavy constraints on changing data and real-time use, while ever-cheaper memory and ever-better columnar storage make interactive analysis directly over the data increasingly feasible. Impala and Presto, for example, perform very well at large data volumes; even a modest cluster can answer queries over hundreds of millions of rows within seconds.

Of course, resources are never unlimited, and not every query can be guaranteed the resources to sweep through massive data quickly. So trading a little accuracy for much lower query latency is actually quite an economical scheme (Presto has a sampling feature along the lines of what [4] describes).

References

[1] Venky Harinarayan, Anand Rajaraman, Jeffrey D. Ullman. Implementing Data Cubes Efficiently. SIGMOD, 1996.

[2] Yihong Zhao, Prasad M. Deshpande, Jeffrey F. Naughton. An Array-Based Algorithm for Simultaneous Multidimensional Aggregates. SIGMOD, 1997.

[3] Joseph M. Hellerstein, Ron Avnur, Vijayshankar Raman. Informix under CONTROL: Online Query Processing. Data Mining and Knowledge Discovery, 4(4), 2000, 281-314.

[4] Sameer Agarwal, Barzan Mozafari, Aurojit Panda, Henry Milner, Samuel Madden, Ion Stoica. BlinkDB: Queries with Bounded Errors and Bounded Response Times on Very Large Data. EuroSys, 2013.

A Bug in a Java Servlet

We have a legacy system, a web service that receives HTTP POSTs from clients, parses the data, then stores it in a file.

The function of the system is simple, it has been through functional and performance tests, and it is stable. As time drifted by, the system was copied and pasted into other projects, with only the data parsing logic changed.

I had a similar requirement recently, so I delved into the legacy code to check whether it would work, rather than reinvent the wheel.

WTF

At first, I noticed the code below in an HttpServlet class; it allocates more than 1 MB of memory for each HTTP POST request.

private static final int MAX_CONTENT_LENGTH = 1024 * 1024;
private static final int BUFFER_SIZE = 4096;

...

public void doPost(HttpServletRequest request, HttpServletResponse response)
        throws ServletException, IOException {

    ...

    int requestContentBufSize = request.getContentLength() + MAX_CONTENT_LENGTH;
    ByteBuffer requestContentBuf = ByteBuffer.allocate(requestContentBufSize);
    byte[] buffer = new byte[BUFFER_SIZE];
    requestInputStream = new DataInputStream(request.getInputStream());
    int readBytes = 0;
    int totalReadBytes = 0;
    while ((readBytes = requestInputStream.read(buffer)) > 0) {
        requestContentBuf.put(buffer);
        totalReadBytes = totalReadBytes + readBytes;
    }
    byte[] requestContent = Arrays.copyOf(requestContentBuf.array(), totalReadBytes);

    ...
}

It's insane; surely the buffer only needs to be as large as the HTTP POST body. So I changed the code.

int requestContentBufSize = request.getContentLength();

Deployed the service and sent one HTTP POST request to it.

curl -d 'Hello, World' http://my.server.com:9000/log

An exception occurred.

The BufferOverflowException

After reducing the memory allocated for the ByteBuffer, it overflowed.

java.nio.BufferOverflowException
	at java.nio.HeapByteBuffer.put(HeapByteBuffer.java:183)
	at java.nio.ByteBuffer.put(ByteBuffer.java:830)
	at com.myproject.servlet.LogServer.doPost(LogServer.java:99)
	at javax.servlet.http.HttpServlet.service(HttpServlet.java:643)
	at javax.servlet.http.HttpServlet.service(HttpServlet.java:723)
	at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:290)
	at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:206)
	at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:233)
	at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:191)
	at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:127)
	at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:103)
	at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:109)
	at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:293)
	at org.apache.coyote.http11.Http11Processor.process(Http11Processor.java:861)
	at org.apache.coyote.http11.Http11Protocol$Http11ConnectionHandler.process(Http11Protocol.java:606)
	at org.apache.tomcat.util.net.JIoEndpoint$Worker.run(JIoEndpoint.java:489)
	at java.lang.Thread.run(Thread.java:701)

I thought I'd better dig into how the servlet actually fills the ByteBuffer:

  1. It creates a small buffer of BUFFER_SIZE (4096) bytes.
  2. It reads from the HTTP request input stream into the small buffer.
  3. It puts the small buffer into the ByteBuffer and loops back to 2.

Well, in the last iteration the data read from the HTTP request input stream may be smaller than BUFFER_SIZE, but the servlet still puts all BUFFER_SIZE bytes into the ByteBuffer.

So, to fix the BufferOverflowException, I increased the capacity of the ByteBuffer by BUFFER_SIZE.

1
int requestContentBufSize = request.getContentLength() + BUFFER_SIZE;

Deployed again, and

curl -d 'Hello, World' http://my.server.com:9000/log

The bug was fixed.

Or was it?

The ServletInputStream

What happens when a client posts a huge payload?

I created a String of 7516 bytes and sent it to the server.

curl -d 'very very long string' http://my.server.com:9000/log

Sometimes the java.nio.BufferOverflowException occurred, and sometimes it didn't.

What went wrong?

To find the root cause, I added some logs to trace the ByteBuffer.

int requestContentBufSize = request.getContentLength() + BUFFER_SIZE;
ByteBuffer requestContentBuf = ByteBuffer.allocate(requestContentBufSize);
byte[] buffer = new byte[BUFFER_SIZE];
requestInputStream = new DataInputStream(request.getInputStream());
int readBytes = 0;
int totalReadBytes = 0;
log.debug("1: ByteBuffer position: " + requestContentBuf.position() +
        ", buffer capacity: " + requestContentBuf.capacity() +
        ", buffer remaining: " + requestContentBuf.remaining());
while ((readBytes = requestInputStream.read(buffer)) > 0) {
    requestContentBuf.put(buffer);
    totalReadBytes = totalReadBytes + readBytes;
    log.debug("2. Bytes read: " + readBytes);
    log.debug("1: ByteBuffer position: " + requestContentBuf.position() +
            ", buffer capacity: " + requestContentBuf.capacity() +
            ", buffer remaining: " + requestContentBuf.remaining());
}

The log printed when no exception occurred:

- 1: ByteBuffer position: 0, buffer capacity: 11612, buffer remaining: 11612
- 2. Bytes read: 4096
- 1: ByteBuffer position: 4096, buffer capacity: 11612, buffer remaining: 7516
- 2. Bytes read: 3420
- 1: ByteBuffer position: 8192, buffer capacity: 11612, buffer remaining: 3420

The log printed when the exception occurred:

- 1: ByteBuffer position: 0, buffer capacity: 11612, buffer remaining: 11612
- 2. Bytes read: 1356
- 1: ByteBuffer position: 4096, buffer capacity: 11612, buffer remaining: 7516
- 2. Bytes read: 1356
- 1: ByteBuffer position: 8192, buffer capacity: 11612, buffer remaining: 3420

Now it is easy to see that the root cause lies in these lines of code.

while ((readBytes = requestInputStream.read(buffer)) > 0) {
    requestContentBuf.put(buffer);

A single read call may fill the 4096-byte buffer only partially, even when the input stream still has more data; it returns the number of bytes actually read.

To fix it, just pass the offset and length of the valid part of the small buffer.

while ((readBytes = requestInputStream.read(buffer)) > 0) {
    requestContentBuf.put(buffer, 0, readBytes);

I had increased the capacity of the ByteBuffer by BUFFER_SIZE earlier; that change should also be reverted.
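For clarity, here is the whole reading loop with both changes applied, using the same variables as the original snippet:

int requestContentBufSize = request.getContentLength();
ByteBuffer requestContentBuf = ByteBuffer.allocate(requestContentBufSize);
byte[] buffer = new byte[BUFFER_SIZE];
requestInputStream = new DataInputStream(request.getInputStream());
int readBytes = 0;
int totalReadBytes = 0;
while ((readBytes = requestInputStream.read(buffer)) > 0) {
    // put only the bytes actually read in this iteration
    requestContentBuf.put(buffer, 0, readBytes);
    totalReadBytes = totalReadBytes + readBytes;
}
byte[] requestContent = Arrays.copyOf(requestContentBuf.array(), totalReadBytes);

Since the content length is known up front here, DataInputStream.readFully(byte[]), which does block until the whole buffer is filled, would arguably be an even simpler fix, but that is a separate refactoring.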

Now the bug is really fixed. And this is network programming.

Questions

"The system has been running for a long time; it shouldn't have this problem, or we would have known about it long ago."

That's because clients seldom post more than 4096 bytes to the server.

"I have read the Javadoc of DataInputStream; the read method fills the specified buffer fully."

It doesn't; please read it again. read(byte[]) reads up to the buffer's length and returns the number of bytes actually read; readFully is the method that blocks until the buffer is full.

"I have tested the read method of DataInputStream on a file; it reads a full 4096 bytes in every iteration."

This is a web service; deploy it to a server and test it there.

"I have tested it on my local machine as a web service, and it reads a full 4096 bytes in every iteration."

It is a web service; it should be tested over a real network. On the loopback interface the whole request body usually arrives at once, but over a real network it arrives in TCP segments, so read returns whatever has arrived so far.

At Last

When a potential bug is reported, we run tests to reproduce it and find the root cause.

We do not stop listening and just look for reasons to reject it.

When we find a bug, we help others reproduce it and collect information.

We do not sit there and blame others for their mistakes.

TDD on Swift

Long, long ago, I wrote a post about how to do TDD in Objective-C. Since Apple's WWDC 2014, Swift has been really eye-catching, so I think I should write a new one to follow the trend.

XCTest is used as the unit test framework, and Xcode 6 is needed.

TDD Work-flow

  1. Add a test for a use case or a user story
  2. Run all tests and see the new one fail
  3. Write some code that makes the test pass
  4. Run the tests, and change the production code until all test cases pass
  5. Refactor the production code
  6. Refactor the test code
  7. Return to 1 and repeat

Steps 5 and 6 are optional; do them only when needed, but be sure NOT to do them at the same time. That is, while you refactor the production code, you must not change the test code; once all the test cases pass, you can be confident that your production refactoring is sound. Then you may refactor the test code, and this time you must not change the production code.

A Simple Example

We are about to implement a super simple bank account management tool.

Create a Project

Use Xcode to create a project BankAccount (iOS Single View Application)

Add a Test Case

Create a Swift file named SavingAccountTest, and choose BankAccountTests as the target.

"People can deposit money into a saving account" is our first user story.

import Foundation
import XCTest

class SavingAccountTest: XCTestCase {

    func testDeposit() {
        var account = SavingAccount()
        account.deposit(100)
        XCTAssertEqual(100, account.balance)
    }
}

Run All Tests

Run all the unit tests; the new one fails, as expected.

Write Code to Pass the Test

Create a Swift file named SavingAccount, and choose both BankAccount and BankAccountTests as targets.

Keep it simple: just enough to pass the test.

import Foundation

class SavingAccount {
    var balance:Int = 100

    func deposit(money:Int) {

    }
}

Run All Tests

It passes.

Next User Story?

“People could withdraw some money”

Let’s change the testDeposit test case.

import Foundation
import XCTest

class SavingAccountTest: XCTestCase {

    func testDepositAndWithdraw() {
        var account = SavingAccount()
        account.deposit(100)
        account.withdraw(50)
        XCTAssertEqual(50, account.balance)
    }
}

Also, add an empty withdraw method to SavingAccount to satisfy the compiler. Do not add any other code until we see the test fail.

Run All Tests

The test fails, because the account balance is not updated after the money is withdrawn.

Write Code to Support Withdraw

import Foundation

class SavingAccount {
    var balance:Int = 0

    func deposit(money:Int) {
        balance += money
    }

    func withdraw(money:Int) {
        balance -= money
    }
}

Run All Tests

All the user stories are satisfied.

Any Other New User Story?

“People can’t withdraw money beyond their account balance”

We add a new test case, testNegativeBalanceIsNotFine.

func testNegativeBalanceIsNotFine() {
    var account = SavingAccount()
    account.deposit(50)
    account.withdraw(100)
    XCTAssertEqual(0, account.balance)
}

Run All Tests

It fails; we have to fix it.

Write Code

Change the withdraw method: set the account balance to 0 if it would drop below 0.

func withdraw(money:Int) {
    balance -= money
    if balance < 0 {
        balance = 0
    }
}

Run All Tests

All right, all the test cases pass.

Refactoring

Until now, we haven't done any refactoring on our code base.

I think the production code is fine, so we skip step 5 and refactor the test code.

Both test cases create their own SavingAccount instance; that duplication can be removed by creating the account in a single property.

class SavingAccountTest: XCTestCase {
    var account = SavingAccount()

    func testDepositAndWithdraw() {
        account.deposit(100)
        account.withdraw(50)
        XCTAssertEqual(50, account.balance)
    }

    func testNegativeBalanceIsNotFine() {
        account.deposit(50)
        account.withdraw(100)
        XCTAssertEqual(0, account.balance)
    }
}

Don't forget to run all the tests and make sure they pass.

Why no setUp and tearDown

People coming from Objective-C may wonder why the account instance is not created in a setUp method; written this way, it looks as if different test cases share one instance variable, and, as we know, test cases should be independent of each other.

Yes, I had this doubt too. So I ran a test, adding an "account balance should be 0" check at the start of each test case.

func testDepositAndWithdraw() {
    XCTAssertEqual(0, account.balance)
    account.deposit(100)
    account.withdraw(50)
    XCTAssertEqual(50, account.balance)
}

func testNegativeBalanceIsNotFine() {
    XCTAssertEqual(0, account.balance)
    account.deposit(50)
    account.withdraw(100)
    XCTAssertEqual(0, account.balance)
}

The result shows that XCTest avoids sharing instance variables between test cases by instantiating a brand-new XCTestCase object for each test case. That is, it instantiated two SavingAccountTest objects as our tests ran.

To TDD Haters

If you hate TDD, you may think this blog post is garbage.

Sorry about that; you can remove this address from your browser history if it makes you feel better.

Also, I strongly recommend watching the "Is TDD Dead?" discussions between Martin Fowler, Kent Beck, and David Heinemeier Hansson.