Extra Cookie

Yet Another Programmer's Blog

TDD on Swift

A long time ago, I wrote a post about how to do TDD in Objective-C. Since Apple's WWDC 2014, Swift has been getting a lot of attention, so I think I should write a new one to follow the trend.

XCTest is used as the unit test framework, and Xcode 6 is needed.

TDD Workflow

  1. Add a test for a use case or a user story
  2. Run all tests and see if the new one fails
  3. Write some code that causes the test to pass
  4. Run tests, change production code until all test cases pass
  5. Refactor the production code
  6. Refactor the test code
  7. Return to 1, and repeat

Steps 5 and 6 are optional; do them only when needed, but never do them at the same time. That is, while you refactor the production code, you must not change the test code. Once all the test cases pass again and you are confident the production-code refactoring is solid, you can refactor the test code, and this time you must not touch the production code.

A Simple Example

We are about to implement a super simple bank account management tool.

Create a Project

Use Xcode to create a project named BankAccount (an iOS Single View Application).

Add a Test Case

Create a Swift file named SavingAccountTest, and choose BankAccountTests as the target.

Our first user story: “People can deposit money into a saving account.”

import Foundation
import XCTest

class SavingAccountTest: XCTestCase {

    func testDeposit() {
        var account = SavingAccount()
        account.deposit(100)
        XCTAssertEqual(100, account.balance)
    }
}

Run All Tests

Run all the unit tests. The new test fails as expected; in fact it doesn’t even compile yet, since SavingAccount doesn’t exist.

Write Code to Pass the Test

Create a Swift file named SavingAccount, and choose both BankAccount and BankAccountTests as targets.

Keep it simple; write just enough code to pass the test.

import Foundation

class SavingAccount {
    var balance:Int = 100

    func deposit(money:Int) {

    }
}

Run All Tests

It passes.

Next User Story?

“People could withdraw some money”

Let’s change the testDeposit test case.

import Foundation
import XCTest

class SavingAccountTest: XCTestCase {

    func testDepositAndWithdraw() {
        var account = SavingAccount()
        account.deposit(100)
        account.withdraw(50)
        XCTAssertEqual(50, account.balance)
    }
}

Also, add an empty withdraw method to SavingAccount to satisfy the compiler. Do not add any other code until we see the test fail.

Run All Tests

The test fails because the account balance is not updated after money is withdrawn.

Write Code to Support Withdraw

import Foundation

class SavingAccount {
    var balance:Int = 0

    func deposit(money:Int) {
        balance += money
    }

    func withdraw(money:Int) {
        balance -= money
    }
}

Run All Tests

All the user stories are satisfied.

Any Other New User Story?

“People can’t withdraw money beyond their account balance”

We add a new test case, testNegativeBalanceIsNotFine:

func testNegativeBalanceIsNotFine() {
    var account = SavingAccount()
    account.deposit(50)
    account.withdraw(100)
    XCTAssertEqual(0, account.balance)
}

Run All Tests

It fails; we have to fix it.

Write Code

Change the withdraw method: set the account balance to 0 if it would drop below 0.

func withdraw(money:Int) {
    balance -= money
    if balance < 0 {
        balance = 0
    }
}

Run All Tests

All right, all the test cases succeed.

Refactoring

So far, we haven’t done any refactoring on our code base.

I think the production code is fine, so we skip step 5 and refactor the test code.

Both test cases create an instance of SavingAccount; this duplication can be removed by using a single SavingAccount instance property.

class SavingAccountTest: XCTestCase {
    var account = SavingAccount()

    func testDepositAndWithdraw() {
        account.deposit(100)
        account.withdraw(50)
        XCTAssertEqual(50, account.balance)
    }

    func testNegativeBalanceIsNotFine() {
        account.deposit(50)
        account.withdraw(100)
        XCTAssertEqual(0, account.balance)
    }
}

Don’t forget to run all the tests and make sure they all succeed.

Why No setUp and tearDown?

People coming from Objective-C may wonder why the account instance is not created in the setUp method. The way we use it looks as if different test cases would share one instance variable, and as we know, test cases should be independent of each other.

Yes, I had this doubt too. So I ran an experiment, adding an “account balance should be 0” check at the beginning of each test case.

func testDepositAndWithdraw() {
    XCTAssertEqual(0, account.balance)
    account.deposit(100)
    account.withdraw(50)
    XCTAssertEqual(50, account.balance)
}

func testNegativeBalanceIsNotFine() {
    XCTAssertEqual(0, account.balance)
    account.deposit(50)
    account.withdraw(100)
    XCTAssertEqual(0, account.balance)
}

Both extra assertions pass. The result shows that the XCTest framework avoids instance-variable sharing between test cases by instantiating a brand-new XCTestCase object for each test method; that is, two SavingAccountTest objects are instantiated as our tests run.

To TDD Haters

If you hate TDD, you may think this blog post is garbage.

Sorry about that; you can remove this address from your browser history if it makes you feel better.

Also, I strongly recommend watching the “Is TDD Dead?” discussions between Martin Fowler, Kent Beck, and David Heinemeier Hansson.

The Hadoop 2.x - Running a YARN Job (2)

In the last blog post, the job Client was created and initialized.

This post discusses how the Client deploys the job to the Hadoop cluster.

This post is full of code snippets. To avoid confusion, all comments added by me begin with //** instead of // or /*. The code can be cloned from the Apache Git repository; the commit id is 2e01e27e5ba4ece19650484f646fac42596250ce.

The Resource Manager Proxy

//** org.apache.hadoop.yarn.applications.distributedshell.Client.java L330
public boolean run() throws IOException, YarnException {

  LOG.info("Running Client");
  yarnClient.start();

  ...

First, the start method of yarnClient is invoked. From the last blog post we know that yarnClient is actually a YarnClientImpl instance, but since neither it nor its superclass YarnClient overrides start, the code path goes to AbstractService.

//** org.apache.hadoop.service.AbstractService.java L184
public void start() {
  if (isInState(STATE.STARTED)) {
    return;
  }
  //enter the started state
  synchronized (stateChangeLock) {
    if (stateModel.enterState(STATE.STARTED) != STATE.STARTED) {
      try {
        startTime = System.currentTimeMillis();
        serviceStart();
        if (isInState(STATE.STARTED)) {
          //if the service started (and isn't now in a later state), notify
          if (LOG.isDebugEnabled()) {
            LOG.debug("Service " + getName() + " is started");
          }
          notifyListeners();
        }
      } catch (Exception e) {
        ...
      }
    }
  }
}

In the start method above, the service state transitions from STATE.INITED to STATE.STARTED, and the serviceStart method, which is implemented in YarnClientImpl, is invoked.

  //** org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.java L105
  protected void serviceStart() throws Exception {
    try {
      rmClient = ClientRMProxy.createRMProxy(getConfig(),
            ApplicationClientProtocol.class);
    } catch (IOException e) {
      throw new YarnRuntimeException(e);
    }
    super.serviceStart();
  }

The ApplicationClientProtocol instance rmClient is created in serviceStart.

When I stepped into the createRMProxy method, I was shocked: it is really a lot of code. In short, this method creates an RPC proxy to the Resource Manager that communicates according to the ApplicationClientProtocol; the client-side implementation is the ApplicationClientProtocolPBClientImpl class.

We’ll discuss what’s under the hood of the Hadoop/YARN RPC framework in later posts, trust me. :)

The Protocol Buffer Protocols

After the Resource Manager proxy is established, the Client begins to ask the Resource Manager questions.

//** org.apache.hadoop.yarn.applications.distributedshell.Client.java L335
YarnClusterMetrics clusterMetrics = yarnClient.getYarnClusterMetrics();
LOG.info("Got Cluster metric info from ASM"
    + ", numNodeManagers=" + clusterMetrics.getNumNodeManagers());
...
//** org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.java L242
public YarnClusterMetrics getYarnClusterMetrics() throws YarnException,
  IOException {
  GetClusterMetricsRequest request =
    Records.newRecord(GetClusterMetricsRequest.class);
  GetClusterMetricsResponse response = rmClient.getClusterMetrics(request);
  return response.getClusterMetrics();
}

Through the rmClient proxy, the Client queries cluster metrics, running nodes, and queue information from the Resource Manager. The code path is clear: create the input parameter (a request record), pass it to the method call, and get the result.

But you might ask: what does Records.newRecord do?

//** org.apache.hadoop.yarn.util.Records L30
public class Records {
  // The default record factory
  private static final RecordFactory factory =
      RecordFactoryProvider.getRecordFactory(null);

  public static <T> T newRecord(Class<T> cls) {
    return factory.newRecordInstance(cls);
  }
}

In the newRecord method, the cls parameter is passed to the newRecordInstance call on factory, and a new record of the cls type, here GetClusterMetricsRequest, is generated.

As we see in the code above, the factory is created by calling getRecordFactory on RecordFactoryProvider.

Step into the RecordFactoryProvider.

//** org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider.java L43
public static RecordFactory getRecordFactory(Configuration conf) {
  ...
  String recordFactoryClassName = conf.get(
      YarnConfiguration.IPC_RECORD_FACTORY_CLASS,
      YarnConfiguration.DEFAULT_IPC_RECORD_FACTORY_CLASS);
  return (RecordFactory) getFactoryClassInstance(recordFactoryClassName);
}

private static Object getFactoryClassInstance(String factoryClassName) {
  try {
    Class<?> clazz = Class.forName(factoryClassName);
    Method method = clazz.getMethod("get", null);
    method.setAccessible(true);
    return method.invoke(null, null);
  } catch (ClassNotFoundException e) {
    ...
  }
}

If the IPC_RECORD_FACTORY_CLASS parameter is not set in the configuration, the factory instance is created from DEFAULT_IPC_RECORD_FACTORY_CLASS, which is “org.apache.hadoop.yarn.factories.impl.pb.RecordFactoryPBImpl”, by calling its static get method.

//** org.apache.hadoop.yarn.factories.impl.pb.RecordFactoryPBImpl L44
public static RecordFactory get() {
  return self;
}
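
As an aside, this means the record factory could in principle be swapped out through configuration. Here is a minimal, hypothetical sketch; com.example.MyRecordFactory is a made-up class name, and a real replacement would need its own static get method, since RecordFactoryProvider looks the factory up by reflection:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

// Hypothetical sketch, not from the Hadoop source: override the IPC record factory.
// The default, RecordFactoryPBImpl, is used whenever this property is left unset.
static Configuration confWithCustomRecordFactory() {
  Configuration conf = new YarnConfiguration();
  conf.set(YarnConfiguration.IPC_RECORD_FACTORY_CLASS, "com.example.MyRecordFactory");
  return conf;
}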

Now that we have the factory, let’s see how the GetClusterMetricsRequest instance is created by newRecordInstance.

//** org.apache.hadoop.yarn.factories.impl.pb.RecordFactoryPBImpl.java L50
public <T> T newRecordInstance(Class<T> clazz) {
  Constructor<?> constructor = cache.get(clazz);
  if (constructor == null) {
    Class<?> pbClazz = null;
    try {
      pbClazz = localConf.getClassByName(getPBImplClassName(clazz));
    } catch (ClassNotFoundException e) {
      throw new YarnRuntimeException("Failed to load class: ["
          + getPBImplClassName(clazz) + "]", e);
    }
    try {
      constructor = pbClazz.getConstructor();
      constructor.setAccessible(true);
      cache.putIfAbsent(clazz, constructor);
    } catch (NoSuchMethodException e) {
      throw new YarnRuntimeException("Could not find 0 argument constructor", e);
    }
  }
  try {
    Object retObject = constructor.newInstance();
    return (T)retObject;
  } catch (InvocationTargetException e) {
    ...
  }
}

private String getPBImplClassName(Class<?> clazz) {
  String srcPackagePart = getPackageName(clazz);
  String srcClassName = getClassName(clazz);
  String destPackagePart = srcPackagePart + "." + PB_IMPL_PACKAGE_SUFFIX;
  String destClassPart = srcClassName + PB_IMPL_CLASS_SUFFIX;
  return destPackagePart + "." + destClassPart;
}

First, a class name is obtained from getPBImplClassName; since the parameter clazz is GetClusterMetricsRequest, the resulting class name is GetClusterMetricsRequestPBImpl.

Then it gets that class’s constructor and creates a new instance by calling the default constructor below.

//** org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetClusterMetricsRequestPBImpl.java L36
public GetClusterMetricsRequestPBImpl() {
  builder = GetClusterMetricsRequestProto.newBuilder();
}

When you see “Proto” and “newBuilder”, you probably know what’s going on by now: it’s Protocol Buffers.
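
As a small illustration of that builder pattern, here is a sketch assuming the generated YarnServiceProtos classes are on the classpath, as they are inside the YARN project:

import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetClusterMetricsRequestProto;

// The generated protobuf message is constructed through its nested Builder;
// GetClusterMetricsRequestPBImpl wraps exactly such a builder internally.
static GetClusterMetricsRequestProto emptyMetricsRequestProto() {
  return GetClusterMetricsRequestProto.newBuilder().build();
}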

Now we have the GetClusterMetricsRequest instance to pass as a parameter to the rmClient proxy.

Calling getClusterMetrics returns a GetClusterMetricsResponse instance as the response.

Let’s step into the real Resource Manager proxy implementation.

  //** org.apache.hadoop.yarn.api.impl.pb.client.ApplicationClientProtocolPBClientImpl.java L146
  public GetClusterMetricsResponse getClusterMetrics(
      GetClusterMetricsRequest request) throws YarnException,
      IOException {
    GetClusterMetricsRequestProto requestProto =
        ((GetClusterMetricsRequestPBImpl) request).getProto();
    try {
      return new GetClusterMetricsResponsePBImpl(proxy.getClusterMetrics(null,
        requestProto));
    } catch (ServiceException e) {
      RPCUtil.unwrapAndThrowException(e);
      return null;
    }
  }

The returned response is actually a GetClusterMetricsResponsePBImpl instance, which implements getClusterMetrics to translate the RPC result into a native object.

Finally, the getYarnClusterMetrics call on yarnClient is finished. Later method calls such as getNodeReports and getQueueInfo work similarly to getYarnClusterMetrics.
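
For illustration, here is a rough sketch of that same request/response pattern applied to the cluster-nodes query; it shows the general shape rather than the literal YarnClientImpl source:

import java.io.IOException;
import java.util.List;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesResponse;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.util.Records;

// Same three steps as getClusterMetrics: build a request record,
// call the proxy, and unwrap the response.
static List<NodeReport> listClusterNodes(ApplicationClientProtocol rmClient)
    throws YarnException, IOException {
  GetClusterNodesRequest request = Records.newRecord(GetClusterNodesRequest.class);
  GetClusterNodesResponse response = rmClient.getClusterNodes(request);
  return response.getNodeReports();
}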

Let’s continue with the run method of the Client.

The Job Preparation

//** org.apache.hadoop.yarn.applications.distributedshell.Client.java L368
// Get a new application id
YarnClientApplication app = yarnClient.createApplication();
GetNewApplicationResponse appResponse = app.getNewApplicationResponse();
...

A YarnClientApplication instance is created through the yarnClient proxy. If you really understood the code flow above for getting the cluster metrics, it will be easy for you to dig into the createApplication method yourself.
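
Roughly, createApplication asks the Resource Manager for a new application id and pairs the response with an empty submission context. A simplified sketch of that flow, not the literal YarnClientImpl source:

import java.io.IOException;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.client.api.YarnClientApplication;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.util.Records;

// Sketch: request a new application id from the RM, then pair the response
// with an empty ApplicationSubmissionContext that the caller fills in later.
static YarnClientApplication createApplicationSketch(ApplicationClientProtocol rmClient)
    throws YarnException, IOException {
  GetNewApplicationRequest request = Records.newRecord(GetNewApplicationRequest.class);
  GetNewApplicationResponse newAppResponse = rmClient.getNewApplication(request);
  ApplicationSubmissionContext context =
      Records.newRecord(ApplicationSubmissionContext.class);
  context.setApplicationId(newAppResponse.getApplicationId());
  return new YarnClientApplication(newAppResponse, context);
}

The YarnClientApplication it returns is a simple holder: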

//** org.apache.hadoop.yarn.client.api.YarnClientApplication.java L38
public YarnClientApplication(GetNewApplicationResponse newAppResponse,
                             ApplicationSubmissionContext appContext) {
  this.newAppResponse = newAppResponse;
  this.appSubmissionContext = appContext;
}

public GetNewApplicationResponse getNewApplicationResponse() {
  return newAppResponse;
}

public ApplicationSubmissionContext getApplicationSubmissionContext() {
  return appSubmissionContext;
}

A YarnClientApplication has two instance variables: a GetNewApplicationResponse and an ApplicationSubmissionContext. The latter is used to store all the information the job needs, and it will be submitted to the Resource Manager.

//** org.apache.hadoop.yarn.applications.distributedshell.Client.java L369
//** Get appContext from YarnClientApplication instance
ApplicationSubmissionContext appContext = app.getApplicationSubmissionContext();
//** Get appId from appContext
ApplicationId appId = appContext.getApplicationId();
//** Set application name to appContext,
//** which will be used to submit the job later
appContext.setApplicationName(appName);

//** Create a ContainerLaunchContextPBImpl instance,
//** which obeys the ContainerLaunchContextProto protocol
ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class);

//** Create a local resources map for the application master
Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();

//** Copy the application master jar to the hdfs
FileSystem fs = FileSystem.get(conf);
Path src = new Path(appMasterJar);
String pathSuffix = appName + "/" + appId.getId() + "/AppMaster.jar";
...
//** Create a `LocalResourcePBImpl` instance, and add the AppMaster jar to it
LocalResource amJarRsrc = Records.newRecord(LocalResource.class);
...
localResources.put("AppMaster.jar",  amJarRsrc);
...

//** Copy the shell script to hdfs if it exists
String shellPathSuffix = appName + "/" + appId.getId() + "/ExecShellScript.sh";
...

// Save local resource info into app master container launch context
amContainer.setLocalResources(localResources);

// Set the env variables to be setup in the env where the application master will be run
Map<String, String> env = new HashMap<String, String>();
env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION, hdfsShellScriptLocation);
env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP, Long.toString(hdfsShellScriptTimestamp));
env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN, Long.toString(hdfsShellScriptLen));

//** Add AppMaster.jar location to classpath
StringBuilder classPathEnv = new StringBuilder(Environment.CLASSPATH.$())
  .append(File.pathSeparatorChar).append("./*");
...
env.put("CLASSPATH", classPathEnv.toString());

//** Save env to amContainer
amContainer.setEnvironment(env);

// Set the necessary command to execute the application master
Vector<CharSequence> vargs = new Vector<CharSequence>(30);
vargs.add(Environment.JAVA_HOME.$() + "/bin/java");
...
// Set params for Application Master
vargs.add("--container_memory " + String.valueOf(containerMemory));
vargs.add("--num_containers " + String.valueOf(numContainers));
vargs.add("--priority " + String.valueOf(shellCmdPriority));
if (!shellCommand.isEmpty()) {
  vargs.add("--shell_command " + shellCommand + "");
}
if (!shellArgs.isEmpty()) {
  vargs.add("--shell_args " + shellArgs + "");
}
...
vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stdout");
vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stderr");

// Get final command
StringBuilder command = new StringBuilder();
for (CharSequence str : vargs) {
  command.append(str).append(" ");
}

List<String> commands = new ArrayList<String>();
commands.add(command.toString());

//** Save command to amContainer
amContainer.setCommands(commands);
...
//** Save amContainer to appContext
appContext.setAMContainerSpec(amContainer);

// Set the priority for the application master
Priority pri = Records.newRecord(Priority.class);
pri.setPriority(amPriority);
appContext.setPriority(pri);

// Set the queue to which this application is to be submitted in the RM
appContext.setQueue(amQueue);

// Submit the application to the applications manager
//** Again, use the rmClient proxy
yarnClient.submitApplication(appContext);

return monitorApplication(appId);

Now the job is deployed to the Hadoop cluster, and its running status is fetched by monitorApplication through the rmClient proxy every second.
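
To give a feel for what monitorApplication does, here is a simplified sketch of that polling loop; the real method in Client.java also logs progress and handles a client timeout, so treat this as an outline rather than the actual source:

import java.io.IOException;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.exceptions.YarnException;

// Sketch: poll the Resource Manager once a second until the application
// reaches a terminal state, then report success or failure.
static boolean monitorApplicationSketch(YarnClient yarnClient, ApplicationId appId)
    throws YarnException, IOException, InterruptedException {
  while (true) {
    Thread.sleep(1000);
    ApplicationReport report = yarnClient.getApplicationReport(appId);
    YarnApplicationState state = report.getYarnApplicationState();
    if (state == YarnApplicationState.FINISHED) {
      return report.getFinalApplicationStatus() == FinalApplicationStatus.SUCCEEDED;
    }
    if (state == YarnApplicationState.FAILED || state == YarnApplicationState.KILLED) {
      return false;
    }
  }
}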