Recently I have been analyzing Apache access logs. Sick of writing MapReduce programs the Java way, I chose Python, which is far better than Java at processing text, and exploited [stream](http://en.wikipedia.org/wiki/Stream_(computing)) processing through Hadoop Streaming, since streams are the very nature of MapReduce programming.
Word Count Example
The mapper reads its input from standard input, splits each line into words, and emits every word as a tab-separated "word 1" pair.
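The mapper code itself did not survive here, so below is a minimal sketch of what wc_mapper.py could look like. It assumes whitespace-separated words and a tab between key and value (Hadoop Streaming's default separator); the main() function name is my own choice.

```python
#!/usr/bin/env python
# wc_mapper.py: minimal word-count mapper sketch for Hadoop Streaming.
import sys


def main():
    for line in sys.stdin:
        # Split on any whitespace and emit one "word<TAB>1" line per word.
        for word in line.split():
            print("%s\t%d" % (word, 1))


if __name__ == "__main__":
    main()
```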
All the pairs are sorted by word and sent to the reducer. Don't worry, Hadoop Streaming does this for us.
The reducer adds up all the counts of each word and writes the totals to standard output.
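Likewise, here is a minimal wc_reducer.py sketch. It assumes its input is already sorted by word, which is what lets it keep a single running total and flush it whenever the word changes.

```python
#!/usr/bin/env python
# wc_reducer.py: minimal word-count reducer sketch for Hadoop Streaming.
# Assumes every input line is "word<TAB>count" and lines are sorted by word.
import sys


def main():
    current_word, current_count = None, 0
    for line in sys.stdin:
        word, count = line.rstrip("\n").split("\t", 1)
        if word == current_word:
            current_count += int(count)
            continue
        # A new word begins: flush the finished total of the previous one.
        if current_word is not None:
            print("%s\t%d" % (current_word, current_count))
        current_word, current_count = word, int(count)
    # Flush the last word, if there was any input at all.
    if current_word is not None:
        print("%s\t%d" % (current_word, current_count))


if __name__ == "__main__":
    main()
```

Both scripts need to be executable (chmod +x) so they can be launched directly as ./wc_mapper.py and ./wc_reducer.py.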
Running this command gives us the word count results. Simple and easy.
hadoop jar contrib/streaming/hadoop-streaming-0.20.2-cdh3u2.jar -input input -output output -mapper wc_mapper.py -reducer wc_reducer.py -file wc_mapper.py -file wc_reducer.py
The logic of word count is simple, but when the logic gets complicated, how can we verify its correctness?
We can test it on a Hadoop cluster, or on our local machine with a Unix pipeline:
cat test_input | ./wc_mapper.py | sort -k1,1 | ./wc_reducer.py
(-k1,1 means sort by the first column only; without it, sort might produce an unexpected order that could confuse the reducer.)
Yes, both approaches work, but neither avoids duplicated effort: the same tests have to be run manually every time the code changes, and the output has to be checked by hand. That's inhuman.
I prefer unit testing. It's my safety belt; without it, programming feels like a dark and dangerous journey.
But all the code is about reading from stdin and writing to stdout. How can we unit test it?
Take the mapper as an example. First, extract the mapper logic into a function that takes stdin as an argument.
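A sketch of that extraction, assuming the function is simply named mapper. Any iterable of lines can now be passed in, but the function still prints its pairs and returns nothing.

```python
import sys


def mapper(stdin):
    # stdin can be sys.stdin, an open file, or even a plain list of strings.
    for line in stdin:
        for word in line.split():
            print("%s\t%d" % (word, 1))
    # No return statement: callers always get None back.


if __name__ == "__main__":
    mapper(sys.stdin)
```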
Then, in the test, construct a list to act as a fake stdin. We are done.
Wait, it failed:
AssertionError: ['mapper\t1', 'input\t1', 'word\t1', 'count\t1', 'input\t1'] != None
Oh, mapper has no return value; it only prints to stdout.
Well, we could check the printed output by eye; at least we no longer need to prepare and run the boring tests manually.
But without verification, how can it be called a unit test? It's not a unit test, it's shit. Stop making excuses.
'Yield' to The Rescue
From Python Reference Manual:
The yield statement is only used when defining a generator function.
When a generator function is called, it returns an iterator known as a generator iterator
The body of the generator function is executed by calling the generator's next() method repeatedly until it raises an exception.
When a yield statement is executed, the state of the generator is frozen and the value of expression_list is returned to next()'s caller.
By "frozen" we mean that all local state is retained, including the current bindings of local variables, the instruction pointer, and the internal evaluation stack: enough information is saved so that the next time next() is invoked, the function can proceed exactly as if the yield statement were just another external call.
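Here is a tiny example of those rules in action (countdown is just an illustration, not part of the log-analysis code). Calling countdown(2) prints nothing at all: the body only starts running at the first next(), and each later next() resumes right after the yield with n still bound.

```python
def countdown(n):
    # Calling countdown() does not run this body; it only builds a generator.
    print("start")
    while n > 0:
        yield n        # freeze here and hand n to the caller of next()
        n -= 1         # execution resumes here on the following next()
    print("done")      # falling off the end raises StopIteration


gen = countdown(2)
print(next(gen))  # prints "start", then 2
print(next(gen))  # prints 1
print(list(gen))  # prints "done", then []
```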
To keep it simple, let's change the mapper implementation.
Using yield in the function mapper makes it a generator function: calling mapper no longer runs its body, it just returns a generator. Each time the interpreter reaches a yield, it hands the value of the expression to next()'s caller and freezes all local state.
In the main procedure, iterating over mapper calls the generator's next() method, which resumes the body right after the last yield with the saved local state, just as if the yield had been an ordinary external call that has now returned.
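The generator version of the mapper, plus a main procedure that does the printing, might look like this (a sketch; main is an assumed name):

```python
import sys


def mapper(stdin):
    # yield makes this a generator function: no printing, just values.
    for line in stdin:
        for word in line.split():
            yield "%s\t%d" % (word, 1)


def main():
    # The for loop calls next() on the generator until it is exhausted;
    # output happens only here, at the edge of the program.
    for pair in mapper(sys.stdin):
        print(pair)


if __name__ == "__main__":
    main()
```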
Now, let's change the failing test.
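The updated test only needs list() around the call, which drains the generator into a list that assertEqual can compare. As before, the mapper is repeated so the snippet is self-contained.

```python
import unittest


def mapper(stdin):
    # Generator version of the mapper.
    for line in stdin:
        for word in line.split():
            yield "%s\t%d" % (word, 1)


class MapperTest(unittest.TestCase):
    def test_mapper(self):
        fake_stdin = ["mapper input", "word count input"]
        expected = ["mapper\t1", "input\t1", "word\t1", "count\t1", "input\t1"]
        # list() consumes the generator so the pairs can be compared directly.
        self.assertEqual(expected, list(mapper(fake_stdin)))
```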
This time, it passed! XD
Back To The Old Lisp World
Given a task: compute the sum of all the prime numbers between a and b.
The iterative style is always easy to come up with, whatever the language:
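(define (sum-primes a b)
  (define (iter count accum)
    (cond ((> count b) accum)
          ((prime? count) (iter (+ count 1) (+ count accum)))
          (else (iter (+ count 1) accum))))
  (iter a 0))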
Starting from a, the counter is increased by 1 on each iteration; if it is a prime number, it is added to the sum accum. This continues until the counter is greater than b, and then accum is returned.
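For comparison, here is the same iterative procedure in Python. is_prime is a plain trial-division stand-in for SICP's prime?, assumed here because the original does not show one.

```python
def is_prime(n):
    # Trial division: a simple stand-in for SICP's prime?.
    if n < 2:
        return False
    d = 2
    while d * d <= n:
        if n % d == 0:
            return False
        d += 1
    return True


def sum_primes(a, b):
    # Iterative style: walk count from a up to b, adding primes into accum.
    count, accum = a, 0
    while count <= b:
        if is_prime(count):
            accum += count
        count += 1
    return accum


print(sum_primes(10, 20))  # 11 + 13 + 17 + 19 = 60
```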
Let's try another style, the stream style.
(define (sum-primes a b)
  (accumulate +
              0
              (filter prime? (enumerate-interval a b))))
From the inside out: enumerate-interval lists all the numbers from a to b, filter prime? keeps only the prime numbers, and accumulate adds them together.
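A rough Python analogue of this sequence style uses a real list, so the whole interval exists before any filtering starts (is_prime is the same trial-division assumption as before):

```python
def is_prime(n):
    # Trial division: a simple stand-in for SICP's prime?.
    if n < 2:
        return False
    d = 2
    while d * d <= n:
        if n % d == 0:
            return False
        d += 1
    return True


def enumerate_interval(a, b):
    # Materializes every number from a to b as a list, up front.
    return list(range(a, b + 1))


def sum_primes(a, b):
    # Sequence style: enumerate, then filter, then accumulate.
    return sum(filter(is_prime, enumerate_interval(a, b)))
```

For (sum-primes 10000 1000000) this list holds 990001 integers before the first primality test runs.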
The stream style is so simple and elegant that I don't want to write programs in the iterative style anymore.
But when I evaluated (sum-primes 10000 1000000), I cried. XD
It is so inefficient: during evaluation it has to store all the numbers from 10000 to 1000000, a list of nearly a million integers. Then this list is filtered by the primality test, and most of the numbers are thrown away.
Storing the huge integer list is a big waste. It would be perfect if we could interleave the enumeration and the prime number filtering:
get 10000, do the primality test, it's not prime so ignore it; get 10001, do the primality test again ... and so on until 1000000.
Here we go. Let's start by defining some stream operations.
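(define-syntax cons-stream
  (syntax-rules ()
    ((_ a b) (cons a (delay b)))))
(define (stream-car stream) (car stream))
(define (stream-cdr stream) (force (cdr stream)))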
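cons-stream constructs a stream pair. It has to be a special form (such as a macro) rather than an ordinary procedure, because a procedure would evaluate b before delay ever saw it. delay and force are the key to solving the inefficiency.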
delay is syntactic sugar for (lambda () exp); it packages an expression so that it can be evaluated later by force:
(define (force delayed-object) (delayed-object))
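Python has no macros, so a direct translation has to write the lambda by hand at each call site. The sketch below is my own translation of these operations (names like cons_stream and THE_EMPTY_STREAM are illustrative), representing a stream as a (car, promise) tuple. Like the force above, it does not memoize.

```python
# A stream is a pair (car, promise); a promise is a zero-argument function.
# "delay" is spelled out as `lambda: expr`, i.e. (lambda () exp) written by hand.

THE_EMPTY_STREAM = None


def cons_stream(a, promise):
    # Mirrors (cons a (delay b)) when the caller passes promise = lambda: b.
    return (a, promise)


def force(promise):
    # Mirrors (define (force delayed-object) (delayed-object)); no memoization.
    return promise()


def stream_car(stream):
    return stream[0]


def stream_cdr(stream):
    # The rest of the stream is computed only now, on demand.
    return force(stream[1])


if __name__ == "__main__":
    evaluated = []

    def tail():
        evaluated.append("tail")
        return cons_stream(2, lambda: THE_EMPTY_STREAM)

    s = cons_stream(1, lambda: tail())
    print(evaluated)                   # []: the tail has not run yet
    print(stream_car(stream_cdr(s)))   # 2, and evaluated is now ["tail"]
```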
Using these stream operations, we write an example that looks similar to the sum-primes function:
(stream-car
 (stream-cdr
  (stream-filter prime?
                 (stream-enumerate-interval 10000 1000000))))
So what does it actually do?
First, let's see what is returned by stream-enumerate-interval:
(define (stream-enumerate-interval low high)
  (if (> low high)
      the-empty-stream
      (cons-stream
       low
       (stream-enumerate-interval (+ low 1) high))))
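It constructs a stream pair with cons-stream, which simply conses the first number, low, onto the delayed rest of the interval. So (stream-enumerate-interval 10000 1000000) returns the following stream, which we'll call enumer-stream: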
(cons 10000 (delay (stream-enumerate-interval 10001 1000000)))
The delayed part isn't evaluated until force is applied to it; it can be treated like the body of a procedure that hasn't been called yet.
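(define (stream-filter pred stream)
  (cond ((stream-null? stream) the-empty-stream)
        ((pred (stream-car stream))
         (cons-stream (stream-car stream)
                      (stream-filter pred
                                     (stream-cdr stream))))
        (else (stream-filter pred (stream-cdr stream)))))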
stream-filter checks whether (car enumer-stream) is a prime number. Since 10000 is not prime, it returns (stream-filter prime? (stream-cdr enumer-stream)).
As we can see, if (car enumer-stream) is not prime, stream-filter recursively filters the stream-cdr of the current stream. Each stream-cdr forces just one more number out of the interval: 10001, then 10002, and so on.
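10007 is a prime number, so stream-filter uses cons-stream to construct a stream pair:
(cons 10007
      (delay (stream-filter
              prime?
              (cons 10008
                    (delay (stream-enumerate-interval 10009 1000000))))))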
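Plugging this into the whole example, stream-cdr forces the delayed stream-filter, which skips 10008 (not prime) and stops at 10009; stream-car then takes that first element.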
The example returns 10009, the second prime number between 10000 and 1000000.
We can see that the enumeration and the prime number filtering are interleaved with each other.
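The whole walkthrough can be replayed in Python with the same (car, promise) representation. This is a hypothetical translation, not the original code; the enumerated list is added only to record which numbers the interval stream ever produced, and is_prime is an assumed trial-division helper.

```python
THE_EMPTY_STREAM = None
enumerated = []  # every number the interval stream actually produces


def cons_stream(a, promise):
    return (a, promise)


def stream_car(stream):
    return stream[0]


def stream_cdr(stream):
    return stream[1]()  # force the promise


def is_prime(n):
    # Trial division, standing in for SICP's prime?.
    if n < 2:
        return False
    d = 2
    while d * d <= n:
        if n % d == 0:
            return False
        d += 1
    return True


def stream_enumerate_interval(low, high):
    if low > high:
        return THE_EMPTY_STREAM
    enumerated.append(low)
    return cons_stream(low, lambda: stream_enumerate_interval(low + 1, high))


def stream_filter(pred, stream):
    if stream is THE_EMPTY_STREAM:
        return THE_EMPTY_STREAM
    if pred(stream_car(stream)):
        return cons_stream(stream_car(stream),
                           lambda: stream_filter(pred, stream_cdr(stream)))
    # Not a match: recurse on the (now forced) rest, as the Scheme version does.
    return stream_filter(pred, stream_cdr(stream))


second = stream_car(stream_cdr(
    stream_filter(is_prime, stream_enumerate_interval(10000, 1000000))))
print(second)           # 10009
print(len(enumerated))  # 10: only 10000..10009 were ever produced
```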
This is probably what yield is all about.
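With generators the same interleaving comes almost for free. A sketch, again with is_prime as an assumed trial-division helper: each number is tested as soon as it is produced, and the interval is never stored.

```python
from itertools import islice


def is_prime(n):
    # Trial division, standing in for SICP's prime?.
    if n < 2:
        return False
    d = 2
    while d * d <= n:
        if n % d == 0:
            return False
        d += 1
    return True


def enumerate_interval(low, high):
    # Generator: each number is produced only when the consumer asks for it.
    n = low
    while n <= high:
        yield n
        n += 1


def primes_between(a, b):
    # Filtering a generator is lazy too: one number is tested per request.
    return (n for n in enumerate_interval(a, b) if is_prime(n))


def sum_primes(a, b):
    # Constant memory: no list of the whole interval is ever built.
    return sum(primes_between(a, b))


# islice(..., 1, None) skips the first prime; next() takes the second one.
second = next(islice(primes_between(10000, 1000000), 1, None))
print(second)  # 10009
```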
Build Scaffold for Python Streaming
Now, let's use yield freely and build a mapper helper this way.
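The original helper code is not shown here, so this is only a sketch of the idea. get_mapper_feeds and run_mapper are the names used in the text; their signatures, the (key, value) tuples the mapper yields, and the example wc_mapper are assumptions. A real mapper script would end with run_mapper(wc_mapper).

```python
import sys


def get_mapper_feeds(stdin):
    # Yield input lines one at a time, without the trailing newline.
    for line in stdin:
        yield line.rstrip("\n")


def run_mapper(mapper, stdin=None, stdout=None):
    # Pull-based: each pair the mapper yields pulls only as many lines as it needs.
    stdin = sys.stdin if stdin is None else stdin
    stdout = sys.stdout if stdout is None else stdout
    for key, value in mapper(get_mapper_feeds(stdin)):
        stdout.write("%s\t%s\n" % (key, value))


def wc_mapper(lines):
    # Hypothetical word-count mapper: yields (key, value) pairs, prints nothing.
    for line in lines:
        for word in line.split():
            yield word, 1
```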
When run_mapper gets called, it asks mapper for input, which in turn asks get_mapper_feeds for input, as if it were iterating over stdin directly.
The reducer helper is a little more complicated: get_reducer_feeds needs to combine the values of the same key.
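A sketch of such a helper under the same assumptions, built on itertools.groupby, which merges adjacent items with equal keys. That only works because the input reaching the reducer is already sorted by key. parse_pairs and the run_reducer signature are hypothetical.

```python
import sys
from itertools import groupby


def parse_pairs(stdin):
    # "key<TAB>value" lines -> (key, value) tuples; values stay strings.
    for line in stdin:
        key, value = line.rstrip("\n").split("\t", 1)
        yield key, value


def get_reducer_feeds(stdin):
    # Sorted input means equal keys are adjacent, so groupby can combine them.
    for key, pairs in groupby(parse_pairs(stdin), key=lambda kv: kv[0]):
        # A lazy iterator over this key's values. The reducer must consume it
        # before asking for the next key, or groupby will have moved past it.
        yield key, (value for _, value in pairs)


def run_reducer(reducer, stdin=None, stdout=None):
    stdin = sys.stdin if stdin is None else stdin
    stdout = sys.stdout if stdout is None else stdout
    for key, value in reducer(get_reducer_feeds(stdin)):
        stdout.write("%s\t%s\n" % (key, value))
```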
Then the reducer function can be written easily.
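For word count, assuming the feeds are (word, values) pairs with the values as strings, the reducer shrinks to a few lines (wc_reducer is an illustrative name):

```python
def wc_reducer(feeds):
    # feeds yields (word, values); sum() drains each word's values
    # before the loop asks the feeds for the next word.
    for word, values in feeds:
        yield word, sum(int(v) for v in values)
```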
Finally, how do we do unit testing?
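Since the mapper and reducer are now plain generator functions, a list can stand in for stdin and a list of (key, values) pairs for the reducer feeds. A sketch with unittest, repeating the illustrative wc_mapper and wc_reducer so it runs on its own; the last test imitates mapper | sort -k1,1 | reducer in memory.

```python
import unittest
from itertools import groupby


def wc_mapper(lines):
    for line in lines:
        for word in line.split():
            yield word, 1


def wc_reducer(feeds):
    for word, values in feeds:
        yield word, sum(int(v) for v in values)


class WordCountTest(unittest.TestCase):
    def test_mapper(self):
        # A list of strings is a perfectly good fake stdin.
        self.assertEqual([("hello", 1), ("world", 1), ("hello", 1)],
                         list(wc_mapper(["hello world", "hello"])))

    def test_reducer(self):
        # Fake reducer feeds: (key, values) pairs, values as strings.
        feeds = [("hello", ["1", "1"]), ("world", ["1"])]
        self.assertEqual([("hello", 2), ("world", 1)], list(wc_reducer(feeds)))

    def test_map_sort_reduce(self):
        # Map, sort by key, group adjacent keys, then reduce.
        pairs = sorted(wc_mapper(["b a", "a"]), key=lambda kv: kv[0])
        feeds = ((k, (str(v) for _, v in grp))
                 for k, grp in groupby(pairs, key=lambda kv: kv[0]))
        self.assertEqual([("a", 2), ("b", 1)], list(wc_reducer(feeds)))
```

These tests run in milliseconds on every change, with no cluster and no hand-checked output.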
Be lazy: always try to avoid duplicated tasks, and life gets easier.
By the way, the source code is on GitHub.