Business-Friendly Functional Programming – Part 1: Asynchronous Operations

Recently, we refactored one of our micro-services in order to apply the Each library. Each is a macro library developed by Thoughtworks that converts native imperative syntax to scalaz‘s monadic expressions. This means that we can write, among other things, asynchronous code with Futures in a plain imperative style.

The micro-service is a Scala application that serves as a RESTful server. It receives requests from browsers. For each of the requests, the micro-service fetches data from other internal RESTful servers, composites these data into one response, and sends the response to browsers.

During the refactoring, we reduced more than half of the code from our old codebase, and dramatically improved the testability and transparency from the business point of view.

The example application

In this article, we will create multiple versions of an example application in order to demonstrate the evolution of the architecture.

The example application is based on Play framework, which produces an HTML page containing a table of all contacts. The data of these contacts are fetched from a remote service.

A quick and dirty implementation

The entry point of a Play application is the /conf/routes configuration, which specifies a Scala handler for each endpoint.

# /conf/routes

GET /contacts controllers.ContactController.contacts

Then we would create a /controllers/ContactController.scala:

class ContactController extends Controller {
  
  import services.ContactService._
  
  /**
   * All the business logic of this application
   */
  private def contactsXhtml: xml.Elem = {
    val emailList = getEmailList()
    
        <table>{
          for {
            email 
              <td>
                {getContactNameByEmail(email)}
              </td>
              <td>
                {email}
              </td>
            </tr>
          }
        }</table>
      
  }
  
  /**
   * HTTP handler for /contacts
   */
  def contacts = Action {
    Ok(contactsXhtml)
  }
}

We created contacts method that produces an Action of XHTML page. The method invokes contactsXhtml that invokes getEmailList and getContactNameByEmail for external services, which could be implemented later.

object ContactService {

  /**
   * Fetches list of email of all contacts from a remote service.
   */
  def getEmailList(): Seq[String] = ???

  /**
   * Query the contact full name that corresponds to an email from a remote service.
   */
  def getContactNameByEmail(email: String): String = ???

}

The contactsXhtml method filters out invalid email addresses and generates a <tr>…</tr> for each contact.

You can find the complete quick-and-dirty example at part1-direct-style tag of git://github.com/Atry/play-each-example.git.

The contactsXhtml represents the business logic quite straightforwardly, which is understandable by a domain expert. Unfortunately, it does not work when we introduce some technical requirements.

Asynchronous Operations in Scala standard library

Our micro-service is based on the Play framework, which provides all I/O API in an asynchronous flavor. All I/O operations rely on Future, which requires developers to provide callback functions to fetch the I/O results.

In order to perform asynchronous operations, we have to change the return type of our methods for external services:

/**
 * Fetches list of email of all contacts from a remote service.
 */
private def asyncGetEmailList(): Future[Seq[String]] = ???

/**
 * Query the contact full name that corresponds to an email.
 */
private def asyncGetContactNameByEmail(email: String): Future[String] = ???

Then, when we implement our business logic in contactsXhtml, we have to invoke map, flatMap or onComplete on Futures, and provide a callback function to handle the returned data from the Futures.

/**
 * All the business logic of this application
 */
private def asyncContactsXhtml: Future[ xml.Elem ] = {
  asyncGetEmailList().flatMap { emailList =>
    emailList.foldLeft(Future.successful(Seq.empty[ xml.Elem ])) { (headFuture, email) =>
      if (email.matches( """[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,6}""")) {
        headFuture.flatMap { head =>
          asyncGetContactNameByEmail(email).map { name =>
            head :+
              <tr>
                <td>
                  {name}
                </td>
                <td>
                  {email}
                </td>
              </tr>
          }
        }
      } else {
        headFuture
      }
    }.map { trs =>
          <table>
            {trs}
          </table>
    }
  }
}

/**
 * HTTP handler for /contacts
 */
def contacts = Action.async {
  asyncContactsXhtml.map(Ok)
}

You can find the complete asynchronous code example at the part1-continuation-passing-style tag of git://github.com/Atry/play-each-example.git.

You may notice that our new asyncContactsXhtml has 30 lines of code, while the original quick-and-dirty contactsXhtml has only 21 lines.

The different structures between asyncContactsXhtml and contactsXhtml has caused the code size change.

For the original quick-and-dirty implementation, in order to put <tr>s into <table>, we simply nested <tr> generation expressions in <table>. For the new asyncContactsXhtml, we have to split the code into two steps: first generate a Future of Seq of <tr>, then map it and nest the Seq in a <table>.

In order to create <tr>s, the original quick-and-dirty contactXhtml has a for/yield comprehension expression that invokes getContactNameByEmail and creates a <tr> for each email. Unfortunately we could not write the same structure in the new asyncContactsXhtml, because the for/yield comprehension is only able to generate Seq of <td>, not Future of Seq of <td>. Instead, we have to call foldLeft and pass a callback function that appends Future of <td> to the previous Future of Seq of <td>.

As a result, the expressions in the new asyncContactsXhtml do not represent business logic steps one by one, and make little sense to either domain experts or normal Java programmers. This situation is called callback hell, which results in obscure codebases, and hurts transparency in the view of the business.

Asynchronous Operations with Each

Both the quick-and-dirty contactsXhtml method and the callback hell asyncContactsXhtml method represent the same business logic, although they present different coding styles. The style of asyncContactsXhtml is called CPS, or Continuation-Passing Style.

As you have seen in asyncContactsXhtml, manually writting CPS code is difficult, and often error-prone. Fortunately there are some algorithms that translate direct style code like contactsXhtml to continuation-passing style code like asyncContactsXhtml.

Suppose that we want to create a method that performs an asynchronous operation that consists of four steps, we could write it in Scala like this:

def combineThreeSteps(step1: Future[Unit], step2: Future[Unit], step3: Future[Unit]): Future[Unit] = {
  step1.flatMap { _ =>
    step2.flatMap { _ =>
      step3
    }
  }
}

On the other hand, we could get rid of these callback functions, and write normal direct-style code with help of Each:

def combineThreeSteps(step1: Future[Unit], step2: Future[Unit], step3: Future[Unit]): Future[Unit] = monadic[Future] {
  step1.each
  step2.each
  step3.each
}

monadic is a Scala macro, which accepts an Abstract Syntax Tree instead of a value, then produces another AST. Note that the parameter AST is passed to the macro at compile-time instead of run-time.

The parameter AST that passed to monadic contains some calls to each method, which represent the asynchronous result value of each Future, which is Unit in the above example. The each methods are markers for monadic macro, which will disappear and be translated to some monadic expressions by monadic macro. An each marker must be inside a monadic macro, otherwise the Scala compiler will report an error.

The return value of monadic is an AST that contains multiple calls to scalaz.Monad[Future].bind and scalaz.Monad[Future].point. The methods bind and point eventually call Future.flatMap and Future.successful.

// The generated monadic expression after macro expansion.
def combineThreeSteps(step1: Future[Unit], step2: Future[Unit], step3: Future[Unit]): Future[Unit] = {
  Monad[Future].bind(step1) { _ =>
    Monad[Future].bind(step2) { _ =>
      step3
    }
  }
}

Asynchronous result values

each may represent values other than Unit:

def sumOfThreeSteps(step1: Future[Int], step2: Future[Int], step3: Future[Int]): Future[Int] = monadic[Future] {
  step1.each + step2.each + step3.each
}

The above code corresponds to the following native asynchronous code:

def sumOfThreeSteps(step1: Future[Int], step2: Future[Int], step3: Future[Int]): Future[Int] = {
  step1.flatMap { value1 =>
    step2.flatMap { value2 =>
      step3.map { value3 =>
        value1 + value2 + value3
      }
    }
  }
}

Replace flatMap and map to monadic and each

Now we could remove our flatMap and map in asyncContactsXhtml, and use monadic and each instead.

Firstly, we created a monadic block for the whole method.
Then, in the monadic block, we replace the asyncGetEmailList().flatMap and emailList.foldLeft(…){…}.map to asyncGetEmailList().each and emailList.foldLeft(…){…}.each.

private def asyncContactsXhtml: Future[ xml.Elem ] = monadic[Future] {
  val emailList = asyncGetEmailList().each
  
    
      <table>{
        emailList.foldLeft(Future.successful(Seq.empty[ xml.Elem ])) { (headFuture, email) =>
          if (email.matches( """[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,6}""")) {
            headFuture.flatMap { head =>
              asyncGetContactNameByEmail(email).map { name =>
                head :+
                  <tr>
                    <td>
                      {name}
                    </td>
                    <td>
                      {email}
                    </td>
                  </tr>
              }
            }
          } else {
            headFuture
          }
        }.each
      }</table>
    
  
}

Thanks to Each, the foldLeft expression are now nested in <table>, and the entire structure of asyncContactsXhtml become more similar to the original quick-and-dirty contactsXhtml.

for/yield comprehension in a monadic block.

Now there are three higher-ordered functions left, foldLeft, flatMap and map. You may expect that we replace the foldLeft with a for/yield comprehension expression for a Seq.

private def asyncContactsXhtml: Future[ xml.Elem ] = monadic[Future] {
  val emailList = asyncGetEmailList().each
      <table>{
        // Does not work!
        for {
          email 
            <td>
              {asyncGetContactNameByEmail(email).each}
            </td>
            <td>
              {email}
            </td>
          </tr>
        }
      }</table>
}

Unfortunately this does not work and the compiler will report an error message:

each must be inside monadic, throwableMonadic, or catchIoMonadic.

Let me explain why. The monadic macro is able to convert all of Scala’s control structures into monadic expressions, though it does not convert for/yield comprehension expressions, because, unlike do/while, for/yield comprehension expressions are not control structures, and Scala compiler treats for/yield comprehension expressions as function calls.

The above asyncContactsXhtml expression is the same as the following one in Scala compiler’s view:

private def asyncContactsXhtml: Future[ xml.Elem ] = monadic[Future] {
  val emailList = asyncGetEmailList().each
      <table>{
        // Does not work!
        emailList.withFilter { email =>
          email.matches("""[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,6}""")
        }.map { email =>
          <tr>
            <td>
              {asyncGetContactNameByEmail(email).each}
            </td>
            <td>
              {email}
            </td>
          </tr>
        }
      }</table>
}

As you can see, the for/yield comprehension expression became higher-ordered function calls withFilter and map, and asyncGetContactNameByEmail(email).each belongs to the parameter function passed to map instead of the parameter for the monadic macro. As a result, the monadic does not see the each marker and does not convert the expression around the each marker. And finally, the naked each marker reports the compiler error.

More precisely, any each marker inside a for/yield comprehension expression on List or other Scala standard collections causes that error.

Fortunately, monadic macro supports a special type MonadicLoop, and converts function calls withFilter and map on MonadicLoop to monadic expressions. As a result, an each marker in a for/yield comprehension expression on MonadicLoop is acceptable in a monadic block.

In brief, we could convert the List to a MonadicLoop before the for/yield comprehension expression, and convert the MonadicLoop returned by for/yield comprehension to a List before inserting it into <table>.

private def asyncContactsXhtml: Future[ xml.Elem ] = monadic[Future] {
  val emailList = asyncGetEmailList().each
  
    
      <table>{
        (for {
          email <- emailList.monadicLoop // Converts emailList to a MonadicLoop
          if email.matches("""[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,6}""")
        } yield {
          <tr>
            <td>
              {asyncGetContactNameByEmail(email).each}
            </td>
            <td>
              {email}
            </td>
          </tr>
        }).underlying // Converts the MonadicLoop returned from for/yield comprehension back to a List
      }</table>
    
  
}

You can find the complete asynchronous code example with the help of Each at part1-each-style tag of git://github.com/Atry/play-each-example.git.

Cheat Sheet

In this blog, we have learnt how to deal with asynchronous operation with help of Each. You could review them in a cheat sheet that contains some comparison between direct style, continuation-passing style and Each style for some common asynchronous programming tasks.

Direct style Continuation-passing style Each style
Declaring a method for an operation
def op: Result = {
  ???
}
def op: Future[Result] = {
  ???
}
def op: Future[Result] = monadic[Future] {
  ???
}
Performing an operation
val result = op()
op().map { result =>
  ???
}
val result = op().each
Performing multiple operations one by one
val result1 = op1()
val result2 = op2(result1)
op1().flatMap { result1 =>
  op2(result1).map { result2 =>
    ???
  }
}
val result1 = op1().each
val result2 = op2(result1).each
Performing operations in a `for` loop
for (i <- 0 until 10) {
  op(i)
}
(0 until 10).foldLeft(Future.successful(())) { (headFuture, i) =>
  headFuture.flatMap { _ =>
    op(i)
  }
}
for (i <- (0 until 10).monadicLoop) {
  op(i).each
}
Performing operations in a `for`/`yield` comprehension
val all = for {
  i <- 0 until 10
} yield {
  op(i)
}
(0 until 10).foldLeft(Future.successful(Seq.empty)) { (headFuture, i) =>
  headFuture.flatMap { head =>
    op(i).map { currentValue =>
      head :+ currentValue
    }
  }
}.map { all =>
  ???
}
val all = (for {
  i <- (0 until 10).monadicLoop
} yield {
  op(i).each
}).underlying
Performing operations in a `for`/`yield` comprehension with a filter
val all = for {
  i <- 0 until 10
  if i < 3 || i > 7
} yield {
  op(i)
}
(0 until 10).foldLeft(Future.successful(Seq.empty)) { (headFuture, i) =>
  if (i < 3 || i > 7) {
    headFuture.flatMap { head =>
      op(i).map { currentValue =>
        head :+ currentValue
      }
    }
  } else {
    headFuture
  }
}.map { all =>
  ???
}
val all = (for {
  i <- (0 until 10).monadicLoop
  if i < 3 || i > 7
} yield {
  op(i).each
}).underlying

Links

Credits

Thanks for CP Lim and Ken Scambler who reviewed this blog. CP gave many feedbacks about the structure of this blog, the missing information about the basic asynchronous control flow, and the feeling of the MonadicLoop part. Ken made some inspiring comments about the Free monad and Codensity, and fixed huge number of English grammar errors.