Development

Zipper RX Magic

March 10, 2020

As a developer, you want to be able to fire off requests without worrying about what order they're in or when they arrive. All you really care about is when they have all completed. And, with a little Rx magic, that's when you stumble upon Single.zip:

/**
 * Waits until all SingleSource sources provided by the Iterable sequence signal a success
 * value and calls a zipper function with an array of these values to return a result
 * to be emitted to downstream.
...

Next, you pass your list of Singles into Single.zip like so:

Single.zip(
  listOf(source1, source2, source3, ...)
) { results -> /* do something */ }
  .subscribeBy(
    onError = Timber::e,
    onSuccess = {
      celebrate()
    }
  )

Everything appears to be going smoothly. Until you push the app out into the world and begin receiving crash reports.

The exception couldn't be delivered to the user because it already canceled/disposed the flow. That, or the exception has nowhere to go in the first place.

Why is this? Like any responsible programmer, you added an onError callback to your subscription. But, instead of seeing it being called, you're left with a crash that happens after the Single has already terminated.

You question how this could be happening and begin searching for the answer. Fortunately (or unfortunately), you begin to realize it's entirely by design.

RxJava 2 tries to avoid losing exceptions which could be important to the developer even if it happens after the natural lifecycle of a flow.
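To see how a second failure can slip past onError, here is a minimal sketch, assuming RxJava 2.x (the io.reactivex packages). The function name collectUndeliveredErrors is hypothetical, and the emitters are captured by hand only so the two failures can be triggered in a known order.

```kotlin
import io.reactivex.Single
import io.reactivex.SingleEmitter
import io.reactivex.exceptions.UndeliverableException
import io.reactivex.plugins.RxJavaPlugins
import java.io.IOException

/**
 * Zips two sources that both fail and returns the errors that bypassed onError.
 * Hypothetical helper for illustration only.
 */
fun collectUndeliveredErrors(): List<Throwable> {
  val undelivered = mutableListOf<Throwable>()
  // Install a global handler for the duration of the demo so nothing crashes.
  RxJavaPlugins.setErrorHandler { undelivered.add(it) }

  // Capture each emitter so the failures can be fired manually, in order.
  val emitters = mutableListOf<SingleEmitter<Int>>()
  val source1 = Single.create<Int> { emitters.add(it) }
  val source2 = Single.create<Int> { emitters.add(it) }

  val observer = Single.zip(listOf(source1, source2)) { it.size }.test()

  // The first failure reaches onError and disposes the remaining source.
  emitters[0].onError(IOException("first"))
  // The second failure has no subscriber left, so it goes to the global handler.
  emitters[1].onError(IOException("second"))

  observer.assertError(IOException::class.java)
  RxJavaPlugins.setErrorHandler(null)
  return undelivered
}
```

With no handler installed, that second error would be routed to the thread's uncaught exception handler instead, which is what turns up in the crash reports.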

Then you see one of the authors propose a couple of "solutions."

Override the default handler with RxJavaPlugins.setErrorHandler() and suppress what you don't consider fatal. Alternatively, apply a per-source onErrorReturn or onErrorResumeNext before zipping them together.
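Both suggestions can be sketched roughly as follows, assuming RxJava 2.x. The names installLenientErrorHandler and withFallback are hypothetical helpers, not part of the library, and what counts as "fatal" is a judgment call for your own app.

```kotlin
import io.reactivex.Single
import io.reactivex.exceptions.UndeliverableException
import io.reactivex.plugins.RxJavaPlugins

/** Option 1: suppress undeliverable errors globally and forward everything else. */
fun installLenientErrorHandler(onSuppressed: (Throwable) -> Unit) {
  RxJavaPlugins.setErrorHandler { error ->
    if (error is UndeliverableException) {
      // The flow was already disposed; report the original cause and move on.
      onSuppressed(error.cause ?: error)
    } else {
      // Anything else is likely a real bug, so pass it to the thread's handler.
      val thread = Thread.currentThread()
      thread.uncaughtExceptionHandler.uncaughtException(thread, error)
    }
  }
}

/** Option 2: give a source a fallback value so it never signals onError. */
fun <T : Any> Single<T>.withFallback(fallback: T): Single<T> =
  onErrorReturn { fallback }
```

The first option changes behavior for every stream in the process, while the second has to be remembered for each source before it is zipped.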

Though it would be nice if Single.zip had a delayError flag similar to Observable.zip, you're out of luck. Hey, we all occasionally forget to add an onErrorReturn to every one of our Single variables (although I strongly recommend taking this step).

Moving forward, I've been able to protect myself by using zipSafe, which automatically wraps all of your Singles, then reports every error collected along the way as a single error at the end.

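import io.reactivex.Single
import io.reactivex.exceptions.CompositeException

sealed class SafeResult<out T> {
  class Success<T>(val result: T) : SafeResult<T>()
  // Failure extends SafeResult<T> so T can be inferred where onErrorReturn builds it.
  class Failure<T>(val error: Throwable) : SafeResult<T>()
}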
 
/**
 * Zip [Single] together safely. An onErrorReturn is automatically applied to each source
 * to prevent any source from throwing. Then after all sources have completed, any errors
 * are then reported
 */
fun <T> zipSafe(sources: List<Single<T>>): Single<List<T>> {
  val safeSources = sources.map { source ->
    source
      .map<SafeResult<T>> { SafeResult.Success(it) }
      .onErrorReturn { SafeResult.Failure(it) }
  }
 
  return Single.zip(safeSources) { it.filterIsInstance<SafeResult<T>>() }
    .flatMap<List<T>> { safeResults ->
      val failures = safeResults.filterIsInstance<SafeResult.Failure<T>>()
      if (failures.isNotEmpty()) {
        Single.error(CompositeException(failures.map { it.error }))
      } else {
        Single.just(
          safeResults.map { (it as SafeResult.Success<T>).result }
        )
      }
    }
}

Except there's still a problem. If an empty list is passed into Single.zip, it signals a java.util.NoSuchElementException. Though this will be handled by an onError in the subscription, if this is part of a larger stream, then that stream will have been terminated by the error. To avoid this issue, you can make the safe zipper that much safer by returning an empty list when one is provided.
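The empty-list behavior is easy to confirm with a small sketch, assuming RxJava 2.x. The function name emptyZipError is hypothetical.

```kotlin
import io.reactivex.Single

/** Returns the error signalled when zipping an empty list, or null if it succeeds. */
fun emptyZipError(): Throwable? {
  var error: Throwable? = null
  // With no sources, the zipper is never called and onError fires immediately.
  Single.zip(emptyList<Single<Int>>()) { values -> values.size }
    .subscribe({ }, { error = it })
  return error
}
```

The revised zipSafe that follows guards against this case before Single.zip is ever called.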

/**
 * Zip [Single] together safely. An onErrorReturn is automatically applied to each source
 * to prevent any source from throwing. Then, after all sources have completed, any errors
 * will be reported.
 */
fun <T> zipSafe(sources: List<Single<T>>): Single<List<T>> {
  if (sources.isEmpty()) {
    return Single.just(emptyList())
  }
 
  val safeSources = sources.map { source ->
    source
      .map<SafeResult<T>> { SafeResult.Success(it) }
      .onErrorReturn { SafeResult.Failure(it) }
  }
 
  return Single.zip(safeSources) { it.filterIsInstance<SafeResult<T>>() }
    .flatMap<List<T>> { safeResults ->
      val failures = safeResults.filterIsInstance<SafeResult.Failure<T>>()
      if (failures.isNotEmpty()) {
        Single.error(CompositeException(failures.map { it.error }))
      } else {
        Single.just(
          safeResults.map { (it as SafeResult.Success<T>).result }
        )
      }
    }
}
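On the consuming side, the combined error can be unpacked again. Here is a minimal sketch, assuming RxJava 2.x, of how a caller might summarize the outcome of a Single such as the one zipSafe returns. The helper describeOutcome is hypothetical, and it blocks only to keep the example short.

```kotlin
import io.reactivex.Single
import io.reactivex.exceptions.CompositeException

/** Summarizes the outcome of a zipped Single, such as the one zipSafe returns. */
fun <T : Any> describeOutcome(zipped: Single<List<T>>): String =
  zipped
    .map { values -> "all ${values.size} sources succeeded" }
    .onErrorReturn { error ->
      // zipSafe bundles every failure into one CompositeException.
      val causes = if (error is CompositeException) error.exceptions else listOf(error)
      "${causes.size} source(s) failed: " +
        causes.joinToString { it.message ?: it.javaClass.simpleName }
    }
    .blockingGet()
```

In an app you would more likely inspect CompositeException.exceptions inside the subscription's onError rather than block.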

Success!

Scott Schmitz
Staff Engineer
