# Streaming with Python & Unit Testing

I have been doing data analysis on Apache access logs recently. Sick of writing MapReduce programs the Java way, I chose Python, which is far better than Java at processing text, and exploited streams, which are the very nature of MapReduce programming.

## Word Count Example

The mapper receives input from standard input, splits each line into words, and emits every word as a tab-separated "word 1" pair.
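A minimal sketch of what wc_mapper.py could look like (the author's actual script is not shown here, so the details are my assumption):

```python
#!/usr/bin/env python
# wc_mapper.py (sketch): emit a tab-separated "word 1" pair
# for every word on every line of input.
import sys

def mapper(stdin):
    for line in stdin:
        for word in line.split():
            print('%s\t1' % word)

if __name__ == '__main__':
    mapper(sys.stdin)
```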

All the pairs are sorted by word and fed to the reducer; don't worry, Hadoop Streaming does that for us.

The reducer adds up all the counts of each word and writes the totals to standard output.
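A possible wc_reducer.py, assuming the input lines arrive already sorted by word (again a sketch, not necessarily the author's code):

```python
#!/usr/bin/env python
# wc_reducer.py (sketch): sum the counts of each word.
# Input lines are sorted "word<TAB>count" pairs.
import sys

def reducer(stdin):
    current_word, total = None, 0
    for line in stdin:
        word, count = line.rstrip('\n').split('\t', 1)
        if word != current_word:
            if current_word is not None:
                print('%s\t%d' % (current_word, total))
            current_word, total = word, 0
        total += int(count)
    if current_word is not None:
        print('%s\t%d' % (current_word, total))

if __name__ == '__main__':
    reducer(sys.stdin)
```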

By executing this command, we'll get the word count results. Simple and easy.

```
hadoop jar contrib/streaming/hadoop-streaming-0.20.2-cdh3u2.jar \
    -input input -output output \
    -mapper wc_mapper.py -reducer wc_reducer.py \
    -file wc_mapper.py -file wc_reducer.py
```

## Unit Testing

Word count has simple logic, but when the logic gets complicated, how can we verify its correctness?

We can test it on a Hadoop cluster, or test it on our local machine with a Unix pipeline.

```
cat test_input | ./wc_mapper.py | sort -k1,1 | ./wc_reducer.py
```

(-k1,1 means sort by the first column only; without it, sort might give unexpected results that would confuse the reducer.)

Yes, both of them work, but neither avoids duplication of effort: the same tests need to be run manually every time the code changes, and verified manually too. It's inhuman.

I prefer unit testing; it's like my seat belt. Without it, programming is more like a dark and dangerous journey.

But all the code does is read from stdin and write to stdout, so how do we unit test it?

Take the mapper as an example. First, extract the mapper logic into a procedure that takes stdin as an argument.

Then, in the test, construct a list as a fake stdin.

We are done.

Wait, it failed:

```
AssertionError: ['mapper\t1', 'input\t1', 'word\t1', 'count\t1', 'input\t1'] != None
```

Oh, mapper has no return value; it only prints to stdout.

Well, we could eyeball what it prints; at least we no longer need to prepare and run the boring tests by hand.

But without automated verification, how can it be called a unit test? It's not a unit test, it's shit. Stop making excuses.

## ‘Yield’ to The Rescue

The yield statement is only used when defining a generator function.

When a generator function is called, it returns an iterator known as a generator iterator.

The body of the generator function is executed by calling the generator’s next() method repeatedly until it raises an exception.

When a yield statement is executed, the state of the generator is frozen and the value of expression_list is returned to next()’s caller.

By “frozen” we mean that all local state is retained, including the current bindings of local variables, the instruction pointer, and the internal evaluation stack: enough information is saved so that the next time next() is invoked, the function can proceed exactly as if the yield statement were just another external call.

To make it simple, let’s change the mapper implementation.
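A sketch of the generator-based mapper:

```python
import sys

def mapper(stdin):
    # yield makes this a generator function: nothing runs
    # until the caller iterates over the generator.
    for line in stdin:
        for word in line.split():
            yield '%s\t1' % word

if __name__ == '__main__':
    for pair in mapper(sys.stdin):
        print(pair)
```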

The yield in mapper makes it a generator function: calling mapper no longer executes the body but returns a generator. Each time the interpreter reaches a yield, it hands the yielded value back to next()'s caller and freezes all the local state.

In the main procedure, iterating over mapper calls the generator's next() method repeatedly; the body resumes with the saved local state each time, just like an external call.

Now, we change the failed test.
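The reworked test simply collects the generator into a list (again a sketch, with the mapper repeated inline):

```python
import unittest

def mapper(stdin):
    for line in stdin:
        for word in line.split():
            yield '%s\t1' % word

class WcMapperTest(unittest.TestCase):
    def test_mapper(self):
        fake_stdin = ['mapper input', 'word count input']
        expected = ['mapper\t1', 'input\t1',
                    'word\t1', 'count\t1', 'input\t1']
        # list() drains the generator, so we can verify its output.
        self.assertEqual(list(mapper(fake_stdin)), expected)
```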

This time, it passed! XD

## Back To The Old Lisp World

Here is a task: compute the sum of all the prime numbers between a and b.

The iterative style is always the easiest to think of, no matter which language you use.

a is incremented by 1 each round; if it's a prime number, add it to the accumulator accum; keep going, round after round, until a is greater than b, then return accum.
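In Scheme, the iterative version might look like the sum-primes of SICP section 3.5.1 (a sketch, assuming a prime? predicate is available):

```scheme
(define (sum-primes a b)
  (define (iter count accum)
    (cond ((> count b) accum)
          ((prime? count) (iter (+ count 1) (+ count accum)))
          (else (iter (+ count 1) accum))))
  (iter a 0))
```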

Let’s try another style, stream style.

Reading from the inside out: enumerate-interval lists all the numbers from a to b, filtering with prime? keeps only the primes, and accumulate adds them together.
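In code, following SICP's sequence-operations version (same assumptions as above):

```scheme
(define (sum-primes a b)
  (accumulate +
              0
              (filter prime?
                      (enumerate-interval a b))))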

Stream style is so simple and elegant that I don't want to write programs in the iterative style any more.

But when I evaluated (sum-primes 10000 1000000), I cried. XD

It is terribly inefficient: during evaluation it must build the whole list of numbers from 10000 to 1000000, a list of nearly a million integers, then run the primality test over that list, after which most of the numbers get thrown away.

Storing the huge integer list is a big waste. It would be perfect if we could interleave the enumeration and the prime number filtering:

“Get 10000, do the primality test, it's non-prime, so ignore it; get 10001, do the primality test again … until 1000000.”

cons-stream is used to construct a stream pair; delay and force are the key to solving the inefficiency.

delay is syntactic sugar for (lambda () exp): it packages an expression so that it can be evaluated later by force.

```scheme
(define (force delayed-object) (delayed-object))
```
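cons-stream has to be a special form rather than an ordinary procedure, so that its second argument is not evaluated eagerly: (cons-stream a b) behaves as (cons a (delay b)). The stream selectors then undo it (following SICP):

```scheme
;; (cons-stream a b)  is equivalent to  (cons a (delay b))
(define (stream-car stream) (car stream))
(define (stream-cdr stream) (force (cdr stream)))
```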

Using the stream operations, we can write an example that looks similar to the sum-primes function. So what does it actually do?
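The example is presumably the classic one from SICP 3.5.1; it picks out the second prime in the interval:

```scheme
(stream-car
 (stream-cdr
  (stream-filter prime?
                 (stream-enumerate-interval 10000 1000000))))
```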

Firstly, let’s see what’s returned by stream-enumerate-interval.
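Its definition, from SICP:

```scheme
(define (stream-enumerate-interval low high)
  (if (> low high)
      the-empty-stream
      (cons-stream
       low
       (stream-enumerate-interval (+ low 1) high))))
```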

It constructs a stream pair by cons-stream, which simply conses the car onto the delayed rest of the stream:

```scheme
(cons 10000 (delay (stream-enumerate-interval 10001 1000000)))
```

The expression after delay isn't evaluated until force is applied; it can be treated like the body of a procedure definition.

Then stream-filter checks whether (car enumer-stream) is a prime number; since 10000 is not prime, it returns:
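Reconstructing that return value, following SICP's own walkthrough:

```scheme
(stream-filter prime?
               (stream-cdr
                (cons 10000
                      (delay (stream-enumerate-interval 10001 1000000)))))
```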

As we can see, whenever (car enumer-stream) is non-prime, stream-filter recursively filters the stream-cdr of the current stream.
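For reference, stream-filter's definition from SICP:

```scheme
(define (stream-filter pred stream)
  (cond ((stream-null? stream) the-empty-stream)
        ((pred (stream-car stream))
         (cons-stream (stream-car stream)
                      (stream-filter pred
                                     (stream-cdr stream))))
        (else (stream-filter pred (stream-cdr stream)))))
```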

Until 10007, which is a prime number; then stream-filter uses cons-stream to construct a stream pair:
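Sketching that pair (the-enum-stream here is my placeholder name for the enumerate stream whose car is 10007):

```scheme
;; the-enum-stream is
;; (cons 10007 (delay (stream-enumerate-interval 10008 1000000)))
(cons-stream 10007
             (stream-filter prime? (stream-cdr the-enum-stream)))
```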

Feeding the filtered stream into the whole example:
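By substitution (again a reconstruction, with the same placeholder name):

```scheme
(stream-car
 (stream-cdr
  (cons-stream 10007
               (stream-filter prime?
                              (stream-cdr the-enum-stream)))))
```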

We get:
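stream-cdr forces the delayed filter, which resumes enumerating and testing (10008, 10009, …) until the next prime shows up, so the expression reduces to (a sketch):

```scheme
(stream-car
 (cons-stream 10009
              (stream-filter prime? (stream-cdr ...))))
;; => 10009
```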

The result of the example is returned: 10009, the second prime number between 10000 and 1000000.

And we see the enumeration and prime number filtering are interleaved with each other.

This is probably what yield is all about.

## Build a Scaffold for Python Streaming

Now, let’s free to yield, build mapper helper this way.

When run_mapper gets called, it asks mapper for input, which in turn asks get_mapper_feeds for input, as if it were iterating sys.stdin directly.

The reducer helper is a little more complicated: get_reducer_feeds needs to combine the values of the same key.
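One possible shape, assuming the sorted "key TAB value" lines that Hadoop Streaming delivers (again my sketch, not necessarily the original helper):

```python
import sys

def get_reducer_feeds(stdin=None):
    # Group consecutive values of the same key together and
    # yield (key, values) pairs.
    current_key, values = None, []
    for line in (stdin if stdin is not None else sys.stdin):
        key, value = line.rstrip('\n').split('\t', 1)
        if key != current_key:
            if current_key is not None:
                yield current_key, values
            current_key, values = key, []
        values.append(value)
    if current_key is not None:
        yield current_key, values

def run_reducer(reducer, stdin=None):
    # Drive the reducer generator and print its key/value pairs.
    for key, value in reducer(get_reducer_feeds(stdin)):
        print('%s\t%s' % (key, value))
```

As with the mapper helper, a test can feed a list of lines straight into get_reducer_feeds.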

Then the reducer function can be easily written:
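For word count, it might be nothing more than this (assuming the (word, values) pairs described above):

```python
def wc_reducer(input):
    # input yields (word, values) pairs from the reducer helper;
    # summing the values finishes the job.
    for word, values in input:
        yield word, sum(int(v) for v in values)
```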

At last, how to do unit testing?

Be lazy and always try to avoid duplicated tasks; life can be easier.
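With the scaffold, a reducer test is just another list-in, list-out check (a sketch, with the reducer repeated inline):

```python
import unittest

def wc_reducer(input):
    for word, values in input:
        yield word, sum(int(v) for v in values)

class WcReducerTest(unittest.TestCase):
    def test_reducer(self):
        fake_feeds = [('count', ['1', '1']), ('word', ['1'])]
        self.assertEqual(list(wc_reducer(fake_feeds)),
                         [('count', 2), ('word', 1)])
```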

Btw, source code is githubbed.