Streaming by Python & Unit Testing

I have been doing data analysis on Apache access logs recently. Sick of writing MapReduce programs the Java way, I chose Python, which is far better than Java at processing text, and exploited [stream](http://en.wikipedia.org/wiki/Stream_(computing)) processing, which is the nature of MapReduce programming.

Word Count Example

The mapper receives input from standard input, splits each line, and emits every word as a (word, 1) pair.

#!/usr/bin/env python
import sys

for line in sys.stdin:
    line = line.strip()
    words = line.split()
    for word in words:
        print '%s\t%s' % (word, 1)

All the pairs are sorted by word and sent to the reducer; don't worry, Hadoop Streaming does this for us.

The reducer adds up all the counts for each word and writes the results to standard output.

#!/usr/bin/env python
import sys

current_word = None
current_count = 0
word = None

for line in sys.stdin:
    line = line.strip()
    word, count = line.split('\t', 1)

    try:
        count = int(count)
    except ValueError:
        continue

    if current_word == word:
        current_count += count
    else:
        if current_word:
            print '%s\t%s' % (current_word, current_count)
        current_count = count
        current_word = word

if current_word == word:
    print '%s\t%s' % (current_word, current_count)

By executing this command, we'll get the word count results, simple and easy.

hadoop jar contrib/streaming/hadoop-streaming-0.20.2-cdh3u2.jar -input input -output output -mapper wc_mapper.py -reducer wc_reducer.py -file wc_mapper.py -file wc_reducer.py

Unit Testing

Word count has simple logic, but when the logic gets complicated, how can we verify its correctness?

We can test it on a Hadoop cluster, or test it on our local machine with Unix pipes.

cat test_input | ./wc_mapper.py | sort -k1,1 | ./wc_reducer.py

(-k1,1 means sort by the first column; without it, sort might give unexpected results that could confuse the reducer)

Yes, both of these work, but neither avoids duplicated effort: the same tests need to be run and verified manually every time the code changes. It's inhuman.

I prefer unit testing; it's like my seat belt. Without it, programming is more like a dark and dangerous journey.

But all the code reads from stdin and writes to stdout, so how do we unit test it?

Take the mapper as an example. First, extract the mapper logic into a function that takes stdin as an argument.

#!/usr/bin/env python
import sys

def mapper(stdin):
    for line in stdin:
        line = line.strip()
        words = line.split()
        for word in words:
            print '%s\t%s' % (word, 1)

if __name__ == '__main__':
    mapper(sys.stdin)

Then construct a list as a fake stdin in the test.

import unittest
import wc_mapper

class WCMapperTest(unittest.TestCase):
    def test_mapper(self):
        input = ['mapper input',
                 'word count input']
        expected = ['mapper\t1', 'input\t1',
                    'word\t1', 'count\t1', 'input\t1']

        result = wc_mapper.mapper(input)
        self.assertEqual(expected, result)

if __name__ == '__main__':
    unittest.main()

We are done.

Wait, it failed:

AssertionError: ['mapper\t1', 'input\t1', 'word\t1', 'count\t1', 'input\t1'] != None

Oh, mapper has no return value; it only prints to stdout.

Well, we could capture the printed output; at least we wouldn't need to prepare and run the boring tests manually.

But without verification, how can it be called a unit test? It's not a unit test at all; stop making excuses.

'Yield' to The Rescue

From Python Reference Manual:

The yield statement is only used when defining a generator function.

When a generator function is called, it returns an iterator known as a generator iterator

The body of the generator function is executed by calling the generator's next() method repeatedly until it raises an exception.

When a yield statement is executed, the state of the generator is frozen and the value of expression_list is returned to next()'s caller.

By "frozen" we mean that all local state is retained, including the current bindings of local variables, the instruction pointer, and the internal evaluation stack: enough information is saved so that the next time next() is invoked, the function can proceed exactly as if the yield statement were just another external call.
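A minimal illustration of this freezing behavior (the counter function here is just a toy example, not from the word count code):

```python
# A generator function: calling it does not run the body,
# it returns a generator iterator.
def counter():
    n = 0
    while True:
        n += 1
        # Execution freezes here; n is retained until the next call.
        yield n

c = counter()
print(next(c))  # 1
print(next(c))  # 2 -- the local state n survived between calls
```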

To make it simple, let's change the mapper implementation.

#!/usr/bin/env python
import sys

def mapper(stdin):
    for line in stdin:
        line = line.strip()
        words = line.split()
        for word in words:
            yield '%s\t%s' % (word, 1)

if __name__ == '__main__':
    for output in mapper(sys.stdin):
        print output

The yield in mapper makes it a generator function: calling mapper no longer runs the body, but returns a generator that saves the body along with all its local state.

In the main procedure, iterating over mapper calls the generator's next() method repeatedly; each call resumes the saved body with its saved local state until the next yield, just like an external call.

Now, let's fix the failed test.

def test_mapper(self):
    input = ['mapper input',
             'word count input']
    expected = ['mapper\t1', 'input\t1',
                'word\t1', 'count\t1', 'input\t1']
    result = []
    for output in wc_mapper.mapper(input):
        result.append(output)
    self.assertEqual(expected, result)

This time, it passed! XD

Back To The Old Lisp World

Given a task: compute the sum of all the prime numbers between a and b.

The iterative style is always the easy one to think of, no matter the language.

(define (sum-primes a b)
  (define (iter count accum)
    (cond ((> count b) accum)
          ((prime? count)
           (iter (+ count 1) (+ count accum)))
          (else (iter (+ count 1) accum))))
  (iter a 0))

count starts at a and is increased by 1 on each round; if it's a prime number, it's added to the sum accum; the rounds continue until count is greater than b, when accum is returned.
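The same iterative shape translates directly to Python; a sketch, with a naive trial-division test standing in for prime? (both function names here are illustrative, not from the original code):

```python
def is_prime(n):
    # Naive trial division, just to make the example self-contained.
    if n < 2:
        return False
    d = 2
    while d * d <= n:
        if n % d == 0:
            return False
        d += 1
    return True

def sum_primes(a, b):
    # Walk count from a to b, accumulating primes into accum.
    count, accum = a, 0
    while count <= b:
        if is_prime(count):
            accum += count
        count += 1
    return accum

print(sum_primes(2, 10))  # 2 + 3 + 5 + 7 = 17
```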

Let's try another style, stream style.

(define (sum-primes a b)
  (accumulate +
              0
              (filter prime? (enumerate-interval a b))))

From the inside out: enumerate-interval lists all the numbers from a to b, filter prime? keeps the prime numbers, and accumulate adds them together.

The stream style is so simple and elegant that I don't want to write programs in the iterative style any more.

But when I evaluate (sum-primes 10000 1000000), I cried. XD

It is terribly inefficient: during evaluation it has to build the whole list of numbers from 10000 to 1000000, nearly a million integers, then filter that list with the primality test, discarding most of the numbers.

Storing the huge integer list is a big waste. It would be perfect if we could interleave the enumeration and the prime filtering:

"Get 10000, do the primality test, it's not prime so ignore it; get 10001, test again ... until 1000000"
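This interleaving is exactly what Python generators give us. A sketch of the lazy pipeline (the naive is_prime here is just a stand-in for a real primality test):

```python
def is_prime(n):
    # Naive primality test, enough for the illustration.
    if n < 2:
        return False
    return all(n % d for d in range(2, int(n ** 0.5) + 1))

def enumerate_interval(low, high):
    # Yields one number at a time instead of building the whole list.
    n = low
    while n <= high:
        yield n
        n += 1

# Nothing is computed yet -- this just wires up the pipeline.
primes = (n for n in enumerate_interval(10000, 1000000) if is_prime(n))

# Pulling the first value drives the enumeration only as far as needed:
print(next(primes))  # 10007, without materializing a million numbers
```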

Here we go; start by defining some stream operations.

(define (cons-stream a b) (cons a (delay b)))
(define (stream-car stream) (car stream))
(define (stream-cdr stream) (force (cdr stream)))

cons-stream is used to construct a stream pair; delay and force are the key to solving the inefficiency. (Note that cons-stream must actually be a special form rather than an ordinary procedure, otherwise the argument b would be evaluated before delay could wrap it.)

delay is syntactic sugar for (lambda () exp), which is used to package an expression so that it can be evaluated later by force.

(define (force delayed-object) (delayed-object))
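For readers more at home in Python, the same delay/force idea can be sketched with zero-argument lambdas (thunks); the names below mirror the Scheme procedures and are not from any library:

```python
def force(delayed_object):
    # Evaluating a delayed object is just calling the thunk.
    return delayed_object()

def cons_stream(a, delayed_b):
    # Python has no macros, so the caller must write the delay
    # explicitly: cons_stream(1, lambda: rest_of_stream)
    return (a, delayed_b)

def stream_car(stream):
    return stream[0]

def stream_cdr(stream):
    # The tail is not evaluated until we force it here.
    return force(stream[1])

s = cons_stream(1, lambda: cons_stream(2, lambda: None))
print(stream_car(s))              # 1
print(stream_car(stream_cdr(s)))  # 2, computed only on demand
```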

Using these stream operations, we write an example. It looks similar to the sum-primes function, so what does it actually do?

(stream-car (stream-cdr
             (stream-filter prime?
                            (stream-enumerate-interval
                             10000 1000000))))

Firstly, let's see what's returned by stream-enumerate-interval.

(define (stream-enumerate-interval low high)
  (if (> low high)
      the-empty-stream
      (cons-stream
       low
       (stream-enumerate-interval (+ low 1) high))))

It constructs a stream pair with cons-stream, which simply conses the head onto the delayed rest of the stream.

(cons 10000 (delay (stream-enumerate-interval 10001 1000000)))

The expression after delay can't be evaluated until force is applied; it can be treated like the body of a procedure definition.

(define (stream-filter pred stream)
  (cond ((stream-null? stream) the-empty-stream)
        ((pred (stream-car stream))
         (cons-stream (stream-car stream)
                      (stream-filter pred
                                     (stream-cdr stream))))
        (else (stream-filter pred (stream-cdr stream)))))

Then stream-filter checks whether (stream-car enumer-stream) is a prime number; since 10000 is not prime, it returns

(stream-filter prime? (stream-cdr
                       (cons 10000
                             (delay
                              (stream-enumerate-interval
                               10001 1000000)))))

;; expand stream-cdr
(stream-filter prime?
               (force (cdr
                       (cons 10000
                             (delay (stream-enumerate-interval
                                     10001 1000000))))))

;; eval cdr and force
(stream-filter prime?
               (stream-enumerate-interval 10001 1000000))

;; eval stream-enumerate-interval
(stream-filter prime?
               (cons 10001
                     (delay
                      (stream-enumerate-interval
                       10002 1000000))))

As we see, while (stream-car enumer-stream) is non-prime, stream-filter keeps filtering the stream-cdr of the current stream recursively.

(stream-filter prime?
               (cons 10002
                     (delay
                      (stream-enumerate-interval
                       10003 1000000))))

(stream-filter prime?
               (cons 10003
                     (delay
                      (stream-enumerate-interval
                       10004 1000000))))

;; ...

(stream-filter prime?
               (cons 10007
                     (delay
                      (stream-enumerate-interval
                       10008 1000000))))

Finally, 10007 is a prime number, so stream-filter uses cons-stream to construct a stream pair:

(cons-stream 10007
             (stream-filter prime?
                            (stream-cdr
                             (cons 10007
                                   (delay
                                    (stream-enumerate-interval
                                     10008 1000000))))))

;; eval stream-cdr
(cons-stream 10007
             (stream-filter prime?
                            (stream-enumerate-interval 10008 1000000)))

;; expand cons-stream
(cons 10007 (delay
             (stream-filter prime?
                            (stream-enumerate-interval
                             10008 1000000))))

Feeding this filtered stream back into the whole example:

(stream-car (stream-cdr
             (stream-filter prime?
                            (stream-enumerate-interval
                             10000 1000000))))

We get

(stream-car (stream-cdr
             (cons 10007 (delay
                          (stream-filter prime?
                                         (stream-enumerate-interval
                                          10008 1000000))))))

;; eval stream-cdr
(stream-car
 (stream-filter prime?
                (stream-enumerate-interval
                 10008 1000000)))

;; eval stream-enumerate-interval
(stream-car
 (stream-filter prime?
                (cons 10008
                      (delay
                       (stream-enumerate-interval 10009 1000000)))))

;; eval stream-filter, 10008 is not a prime number
(stream-car
 (stream-filter prime?
                (stream-enumerate-interval 10009 1000000)))

;; eval stream-enumerate-interval again
(stream-car
 (stream-filter prime?
                (cons 10009
                      (delay
                       (stream-enumerate-interval 10010 1000000)))))

;; eval stream-filter, 10009 is a prime number
(stream-car
 (cons 10009
       (delay
        (stream-filter prime?
                       (stream-enumerate-interval 10010 1000000)))))

The result of the example is 10009, the second prime number between 10000 and 1000000.

And we see that the enumeration and the prime filtering are interleaved with each other.

This is probably what yield is all about.

Build a Scaffold for Python Streaming

Now, feel free to yield. Let's build the mapper helper this way.

# runner.py
import sys

def get_mapper_feeds():
    for line in sys.stdin:
        yield line.strip()

def run_mapper(mapper):
    for feed in get_mapper_feeds():
        for keyvalue in mapper(feed):
            print keyvalue

When run_mapper gets called, it asks mapper for input, which in turn asks get_mapper_feeds for input, as if it were iterating over sys.stdin directly.

#!/usr/bin/env python
import runner

def mapper(line):
    words = line.split()
    for word in words:
        yield '%s\t%s' % (word, 1)

if __name__ == '__main__':
    runner.run_mapper(mapper)
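To see the chain in action without Hadoop or even stdin, here's a sketch where a plain list stands in for sys.stdin (these are standalone copies of the runner.py helpers, modified to take the input and return the results):

```python
def get_mapper_feeds(stdin):
    # Like runner.get_mapper_feeds, but taking the input as an argument.
    for line in stdin:
        yield line.strip()

def run_mapper(mapper, stdin):
    # Collects the key/value strings instead of printing them.
    results = []
    for feed in get_mapper_feeds(stdin):
        for keyvalue in mapper(feed):
            results.append(keyvalue)
    return results

def mapper(line):
    for word in line.split():
        yield '%s\t%s' % (word, 1)

print(run_mapper(mapper, ['hello world ', 'hello\n']))
# ['hello\t1', 'world\t1', 'hello\t1']
```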

The reducer helper is a little more complicated: get_reducer_feeds needs to combine the values that share the same key.

def combine_bykeys(stdin):
    current_key = None
    current_values = []

    for line in stdin:
        line = line.strip()
        key, values = line.split('\t', 1)
        values = values.split('\t')

        if current_key == key:
            current_values.append(values)
        else:
            if current_key:
                yield current_key, current_values
            current_key = key
            current_values = [values]

    if current_key is not None:
        yield current_key, current_values

def get_reducer_feeds():
    return combine_bykeys(sys.stdin)

def run_reducer(reducer):
    for key, values in get_reducer_feeds():
        for keyvalue in reducer(key, values):
            print keyvalue
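Because combine_bykeys takes its input stream as an argument, it can be unit-tested with a plain list too. A sketch (a standalone copy of the function, with input already sorted by key, as Hadoop guarantees):

```python
def combine_bykeys(stdin):
    # Standalone copy of runner.combine_bykeys for illustration.
    current_key = None
    current_values = []

    for line in stdin:
        line = line.strip()
        key, values = line.split('\t', 1)
        values = values.split('\t')

        if current_key == key:
            current_values.append(values)
        else:
            if current_key:
                yield current_key, current_values
            current_key = key
            current_values = [values]

    if current_key is not None:
        yield current_key, current_values

result = list(combine_bykeys(['count\t1', 'count\t1', 'word\t2']))
print(result)  # [('count', [['1'], ['1']]), ('word', [['2']])]
```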

Then reducer function can be easily written.

#!/usr/bin/env python
import runner

def reducer(key, values):
    total = 0
    for value in values:
        try:
            count = int(value[0])
        except ValueError:
            continue
        total += count
    yield '%s\t%s' % (key, total)

if __name__ == '__main__':
    runner.run_reducer(reducer)

Finally, how do we do the unit testing?

import unittest
import wc_mapper

class WCMapperTest(unittest.TestCase):
    def test_mapper(self):
        input = 'mapper input of wc'
        expected = ['mapper\t1',
                    'input\t1',
                    'of\t1',
                    'wc\t1']
        result = []
        for output in wc_mapper.mapper(input):
            result.append(output)
        self.assertEqual(expected, result)

if __name__ == '__main__':
    unittest.main()
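The reducer can be tested the same way. A sketch (the reducer body is inlined here so the example stands alone; normally you would import wc_reducer as in the mapper test, and the values argument mimics what combine_bykeys yields):

```python
import unittest

def reducer(key, values):
    # Inline copy of wc_reducer.reducer, so this file is self-contained.
    total = 0
    for value in values:
        try:
            total += int(value[0])
        except ValueError:
            continue
    yield '%s\t%s' % (key, total)

class WCReducerTest(unittest.TestCase):
    def test_reducer(self):
        # combine_bykeys hands the reducer a key and a list of value lists.
        values = [['1'], ['1'], ['1']]
        result = list(reducer('input', values))
        self.assertEqual(['input\t3'], result)

# run with python -m unittest, the same way as the mapper test
```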

Be lazy, always try to avoid duplicated tasks, and life will be easier.

Btw, the source code is githubbed.