Streaming with Python & Unit Testing
I have been doing data analysis on Apache access logs recently. Sick of writing MapReduce programs the Java way, I chose Python, which is far better than Java at processing text, and exploited [streams](http://en.wikipedia.org/wiki/Stream_(computing)), which are the very nature of MapReduce programming.
Word Count Example
The mapper receives input from standard input, splits each line, and emits every word as a `word 1` pair.
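A minimal `wc_mapper.py` for this can be just a few lines (a sketch; any equivalent splitting and printing works):

```python
#!/usr/bin/env python
# wc_mapper.py: emit "word<TAB>1" for every word read from stdin
import sys

for line in sys.stdin:
    for word in line.split():
        print('%s\t%d' % (word, 1))
```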
All the pairs are sorted by word and sent to the reducer. Don't worry, Hadoop Streaming does this for us.
The reducer adds up all the counts for each word and writes the totals to standard output.
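Because the reducer receives a sorted stream of `word\tcount` lines, a `wc_reducer.py` sketch only has to notice when the word changes (again, a minimal sketch):

```python
#!/usr/bin/env python
# wc_reducer.py: sum the counts of each word; input lines arrive sorted by word
import sys

current_word, current_count = None, 0
for line in sys.stdin:
    word, count = line.rstrip('\n').split('\t', 1)
    if word == current_word:
        current_count += int(count)
    else:
        if current_word is not None:
            print('%s\t%d' % (current_word, current_count))
        current_word, current_count = word, int(count)
if current_word is not None:
    print('%s\t%d' % (current_word, current_count))
```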
By executing the following command, we'll get the word count results. Simple and easy.
```sh
hadoop jar contrib/streaming/hadoop-streaming-0.20.2-cdh3u2.jar \
    -input input -output output \
    -mapper wc_mapper.py -reducer wc_reducer.py \
    -file wc_mapper.py -file wc_reducer.py
```
Unit Testing
Word count has simple logic, but when the logic gets complicated, how can we verify its correctness?
We can test it on a Hadoop cluster, or test it on our local machine with a Unix pipeline.
```sh
cat test_input | ./wc_mapper.py | sort -k1,1 | ./wc_reducer.py
```
(`-k1,1` means sort by the first column only; without it, `sort` might give unexpected results that could confuse the reducer.)
Yes, both approaches work, but neither avoids duplicated effort: the same tests have to be run manually every time the code changes, and verified manually as well. It's inhuman.
I prefer unit testing. It's my seat belt; without it, programming feels like a dark and dangerous journey.
But all this code reads from stdin and writes to stdout, so how do we unit test it?
Take the mapper as an example. First, extract the mapper logic into a function that takes stdin as an argument.
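The refactored `wc_mapper.py` might then look like this sketch:

```python
#!/usr/bin/env python
# wc_mapper.py: the mapper logic now lives in a function that takes stdin as an argument
import sys

def mapper(stdin):
    for line in stdin:
        for word in line.split():
            print('%s\t%d' % (word, 1))

if __name__ == '__main__':
    mapper(sys.stdin)
```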
Then construct a list as a fake stdin in the test.
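A first attempt might look like this sketch (the file name `test_wc_mapper.py` and the exact contents of `fake_stdin` are my assumptions, chosen to match the assertion error below):

```python
# test_wc_mapper.py (hypothetical file name)
import unittest
from wc_mapper import mapper

class WcMapperTest(unittest.TestCase):
    def test_mapper(self):
        fake_stdin = ['mapper input', 'word count input']
        self.assertEqual(
            ['mapper\t1', 'input\t1', 'word\t1', 'count\t1', 'input\t1'],
            mapper(fake_stdin))

if __name__ == '__main__':
    unittest.main()
```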
We are done. Wait, it failed:

```
AssertionError: ['mapper\t1', 'input\t1', 'word\t1', 'count\t1', 'input\t1'] != None
```

Oh, `mapper` has no return value; it only prints to stdout.
Well, we could capture and check what it prints; at least we wouldn't need to prepare and run the boring tests by hand.
But without verification, how can it be called a unit test? It's not a unit test, it's shit. Stop making excuses.
'Yield' to The Rescue
From the Python Reference Manual:
The yield statement is only used when defining a generator function.
When a generator function is called, it returns an iterator known as a generator iterator
The body of the generator function is executed by calling the generator's next() method repeatedly until it raises an exception.
When a yield statement is executed, the state of the generator is frozen and the value of expression_list is returned to next()'s caller.
By "frozen" we mean that all local state is retained, including the current bindings of local variables, the instruction pointer, and the internal evaluation stack: enough information is saved so that the next time next() is invoked, the function can proceed exactly as if the yield statement were just another external call.
To make it simple, let's change the mapper implementation.
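The mapper now yields each pair instead of printing it, and the main block iterates over the generator and prints (a sketch):

```python
#!/usr/bin/env python
# wc_mapper.py: yield the pairs instead of printing them inside the loop
import sys

def mapper(stdin):
    for line in stdin:
        for word in line.split():
            yield '%s\t%d' % (word, 1)

if __name__ == '__main__':
    for pair in mapper(sys.stdin):
        print(pair)
```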
The `yield` in the function `mapper` makes it a generator function: when the interpreter reaches a `yield`, it doesn't just evaluate the expression and move on; the value is handed back to `next()`'s caller and all the local state is saved. In the main procedure, iterating over `mapper`'s result calls the generator's `next()` method again and again, and each time the body resumes with the saved local state, just like an external call.
Now, let's change the failing test.
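Only the assertion needs to change: materialize the generator with `list()` before comparing (a sketch):

```python
def test_mapper(self):
    fake_stdin = ['mapper input', 'word count input']
    self.assertEqual(
        ['mapper\t1', 'input\t1', 'word\t1', 'count\t1', 'input\t1'],
        list(mapper(fake_stdin)))
```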
This time, it passed! XD
Back To The Old Lisp World
Given a task: compute the sum of all the prime numbers between `a` and `b`.
The iterative style is always the easy one to think of, no matter which language we use.
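An iterative `sum-primes` might look like this sketch (assuming a `prime?` predicate is already defined):

```scheme
(define (sum-primes a b)
  (define (iter count accum)
    (cond ((> count b) accum)
          ((prime? count) (iter (+ count 1) (+ count accum)))
          (else (iter (+ count 1) accum))))
  (iter a 0))
```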
The counter starts at `a` and is increased by 1 on each round; if it is a prime number it is added to the sum `accum`; the loop keeps going until the counter is greater than `b`, and then `accum` is returned.
Let's try another style: the stream style.
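In this style, `sum-primes` becomes a pipeline of sequence operations (a sketch, assuming the usual `enumerate-interval`, `filter`, and `accumulate` helpers):

```scheme
(define (sum-primes a b)
  (accumulate +
              0
              (filter prime?
                      (enumerate-interval a b))))
```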
From the inside out: `enumerate-interval` lists all the numbers from `a` to `b`, `filter prime?` keeps only the prime numbers, and `accumulate` adds them together.
The stream style is so simple and elegant that I don't want to write programs in the iterative style any more.
But when I evaluated `(sum-primes 10000 1000000)`, I cried. XD
It is terribly inefficient: during evaluation it has to build the whole list of numbers from 10000 to 1000000, nearly a million integers, then that list is filtered by the primality test, and most of the numbers are simply thrown away.
Storing the huge integer list is a big waste. It would be perfect if we could interleave the enumeration and the prime filtering: "Get 10000, do the primality test, it's not prime so ignore it; get 10001, do the primality test again ... until 1000000."
Here we go. Start by defining some stream operations.
```scheme
(define (cons-stream a b) (cons a (delay b)))
```
`cons-stream` is used to construct a stream pair; `delay` and `force` are the key to solving the inefficiency problem.
`delay` is syntactic sugar for `(lambda () exp)`; it packages an expression so that it can be evaluated later by `force`.
```scheme
(define (force delayed-object) (delayed-object))
```
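For the walkthrough below we also need the two stream selectors, and it is worth noting that `cons-stream` (like `delay`) really has to be a special form or macro; a plain procedure would evaluate its second argument on the spot and nothing would be delayed. A sketch:

```scheme
;; selectors used in the walkthrough below
(define (stream-car stream) (car stream))
(define (stream-cdr stream) (force (cdr stream)))

;; strictly, cons-stream must be a macro so that b is not evaluated eagerly
(define-syntax cons-stream
  (syntax-rules ()
    ((_ a b) (cons a (delay b)))))
```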
Using these stream operations, let's write an example. It looks similar to the `sum-primes` function, so what does it actually do?
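Here is the expression in question (a sketch):

```scheme
(stream-car
 (stream-cdr
  (stream-filter prime?
                 (stream-enumerate-interval 10000 1000000))))
```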
First, let's see what is returned by `stream-enumerate-interval`.
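A standard definition goes like this (a sketch, assuming `the-empty-stream` denotes the empty stream):

```scheme
(define (stream-enumerate-interval low high)
  (if (> low high)
      the-empty-stream
      (cons-stream
       low
       (stream-enumerate-interval (+ low 1) high))))
```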
It constructs a stream pair with `cons-stream`, which simply `cons`es the first element onto the `delay`ed rest of the stream.
```scheme
(cons 10000 (delay (stream-enumerate-interval 10001 1000000)))
```
The expression after `delay` isn't evaluated until `force` is applied to it; it can be treated like the body of a procedure definition.
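`stream-filter` can be sketched like this (assuming a `stream-null?` predicate for the empty stream):

```scheme
(define (stream-filter pred stream)
  (cond ((stream-null? stream) the-empty-stream)
        ((pred (stream-car stream))
         (cons-stream (stream-car stream)
                      (stream-filter pred
                                     (stream-cdr stream))))
        (else (stream-filter pred (stream-cdr stream)))))
```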
Then `stream-filter` checks whether `(car enumer-stream)` is a prime number. Since 10000 is not a prime number, it returns the result of filtering the rest.
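In substitution-model terms, that step is roughly:

```scheme
(stream-filter prime?
               (stream-cdr
                (stream-enumerate-interval 10000 1000000)))
```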
As we see, when `(car enumer-stream)` is not prime, `stream-filter` recursively filters the `stream-cdr` of the current stream.
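Forcing that `stream-cdr` yields the next slice of the enumeration, so the filtering continues on roughly:

```scheme
(stream-filter prime?
               (stream-enumerate-interval 10001 1000000))
; ... and so on, one number at a time
```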
This goes on until 10007, which is a prime number; then `stream-filter` uses `cons-stream` to construct a stream pair.
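The pair it builds is roughly:

```scheme
(cons-stream 10007
             (stream-filter prime?
                            (stream-cdr
                             (stream-enumerate-interval 10007 1000000))))
```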
Feeding this filtered stream back into the whole example:
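Recall the outer expression we started from (a sketch):

```scheme
(stream-car
 (stream-cdr
  (stream-filter prime?
                 (stream-enumerate-interval 10000 1000000))))
```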
We get:
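roughly the same outer calls wrapped around the pair we just built; `stream-cdr` then forces the delayed filtering just far enough to find the next prime:

```scheme
(stream-car
 (stream-cdr
  (cons-stream 10007
               (stream-filter prime?
                              (stream-cdr
                               (stream-enumerate-interval 10007 1000000))))))
```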
The result of the example is 10009, the second prime number between 10000 and 1000000.
And we see that the enumeration and the prime filtering are interleaved with each other.
This is probably what `yield` is all about.
Build a Scaffold for Python Streaming
Now, let's feel free to `yield`, and build the mapper helper this way.
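A sketch of `runner.py`, built around the `run_mapper` and `get_mapper_feeds` names used below:

```python
# runner.py: a small scaffold that feeds stdin into generator-style mappers
import sys

def get_mapper_feeds():
    # yield raw input lines, one at a time
    for line in sys.stdin:
        yield line

def run_mapper(mapper):
    # drive the mapper generator and write every emitted pair to stdout
    for output in mapper(get_mapper_feeds()):
        print(output)
```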
When `run_mapper` gets called, it pulls output from `mapper`, which in turn pulls its input from `get_mapper_feeds`, as if it were iterating over `sys.stdin` directly.
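With the scaffold in place, `wc_mapper.py` shrinks to something like this sketch:

```python
#!/usr/bin/env python
# wc_mapper.py: only the mapping logic remains; the scaffold does the plumbing
from runner import run_mapper

def mapper(feeds):
    for line in feeds:
        for word in line.split():
            yield '%s\t%d' % (word, 1)

if __name__ == '__main__':
    run_mapper(mapper)
```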
The reducer helper is a little more complicated: `get_reducer_feeds` needs to combine the values that share the same key.
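Since Hadoop Streaming hands the reducer sorted `key\tvalue` lines, `combine_bykeys` can simply group consecutive lines with the same key. A sketch of the reducer side of `runner.py` (a `run_reducer` counterpart to `run_mapper` is assumed here):

```python
# runner.py (continued): group sorted "key<TAB>value" lines by key
import sys

def combine_bykeys(stdin):
    current_key, values = None, []
    for line in stdin:
        key, value = line.rstrip('\n').split('\t', 1)
        if key == current_key:
            values.append(value)
        else:
            if current_key is not None:
                yield current_key, values
            current_key, values = key, [value]
    if current_key is not None:
        yield current_key, values

def get_reducer_feeds():
    return combine_bykeys(sys.stdin)

def run_reducer(reducer):
    # assumed counterpart of run_mapper
    for output in reducer(get_reducer_feeds()):
        print(output)
```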
Then the reducer function can be written easily.
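A sketch of `wc_reducer.py` against that scaffold:

```python
#!/usr/bin/env python
# wc_reducer.py: sum the grouped counts for each word
from runner import run_reducer

def reducer(feeds):
    for word, counts in feeds:
        yield '%s\t%d' % (word, sum(int(c) for c in counts))

if __name__ == '__main__':
    run_reducer(reducer)
```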
Finally, how do we unit test these?
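The tests now just feed lists into the generators and compare lists; a sketch (the fake feed contents are my own examples):

```python
import unittest
from wc_mapper import mapper
from wc_reducer import reducer

class WordCountTest(unittest.TestCase):
    def test_mapper(self):
        fake_feeds = ['mapper input', 'word count input']
        self.assertEqual(
            ['mapper\t1', 'input\t1', 'word\t1', 'count\t1', 'input\t1'],
            list(mapper(fake_feeds)))

    def test_reducer(self):
        fake_feeds = [('count', ['1']), ('input', ['1', '1']),
                      ('mapper', ['1']), ('word', ['1'])]
        self.assertEqual(
            ['count\t1', 'input\t2', 'mapper\t1', 'word\t1'],
            list(reducer(fake_feeds)))

if __name__ == '__main__':
    unittest.main()
```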
Be lazy, always try to avoid repetitive tasks, and life will be easier.
Btw, the source code is githubbed.