Development

Zipper RX Magic

March 10, 2020

As a devel­op­er you want to be able to fire off requests with­out wor­ry­ing about what order they’re in or when they arrive. All you real­ly care about is when they’re com­plet­ed. And, with a lit­tle Rx mag­ic, that’s when you stum­ble upon Single.zip:

/**
 * Waits until all SingleSource sources provided by the Iterable sequence signal a success
 * value and calls a zipper function with an array of these values to return a result
 * to be emitted to downstream.
...

Next, you pass along your list of Single into Single.zip like so…

Single.zip(
  listOf(source1, source2, source3, ...)
) { results -> /* do something */ }
  .subscribeBy(
    onError = Timber::e,
    onSuccess = {
      celebrate()
    }
  )

Every­thing appears to be going smooth­ly. Until you push the app out into the world and begin receiv­ing crash reports.

The excep­tion couldn’t be deliv­ered to the user because it already canceled/​disposed the flow. That, or the excep­tion has nowhere to go in the first place.

Why is this? Like any respon­si­ble pro­gram­mer, you added an onError con­di­tion to your sub­scrip­tion. But, instead of see­ing it being called, you’re left with a crash that hap­pens after the sin­gle has been completed.

You ques­tion how this could be hap­pen­ing and begin search­ing for the answer. For­tu­nate­ly (or unfor­tu­nate­ly), you begin to real­ize it’s entire­ly by design..

RxJa­va 2 tries to avoid los­ing excep­tions which could be impor­tant to the devel­op­er even if it hap­pens after the nat­ur­al life­cy­cle of a flow.

Then you see one of the authors pro­pose a cou­ple of ​“solu­tions.”

Over­ride the default han­dler with RxJavaPlugins.setOnError() and sup­press what you don’t con­sid­er fatal. Alter­na­tive­ly, apply a per source onErrorReturn or onErrorResumeNext before zip­ping them together.

Though it would be nice to have a delayError flag sim­i­lar to Observable.zip, you’re out of luck. Hey, we all occa­sion­al­ly for­get to add an onErrorReturn to every one of our Single vari­ables (although I strong­ly rec­om­mend tak­ing this step).

Mov­ing for­ward, I’ve been able to pro­tect myself by using safeZip which auto­mat­i­cal­ly wraps all of your Singles, then returns all var­i­ous errors along the way in a sin­gle error at the end.

sealed class SafeResult<out T> {
  class Success<T>(val result: T): SafeResult<T>()
  class Failure<T>(val error: Throwable): SafeResult<Nothing>()
}
 
/**
 * Zip [Single] together safely. An onErrorReturn is automatically applied to each source
 * to prevent any source from throwing. Then after all sources have completed, any errors
 * are then reported
 */
fun <T> zipSafe(sources: List<Single<T>>): Single<List<T>> {
  val safeSources = sources.map { source ->
    source
      .map<SafeResult<T>> { SafeResult.Success(it) }
      .onErrorReturn { SafeResult.Failure(it) }
  }
 
  return Single.zip(safeSources) { it.filterIsInstance<SafeResult<T>>() }
    .flatMap<List<T>> { safeResults ->
      val failures = safeResults.filterIsInstance<SafeResult.Failure<T>>()
      if (failures.isNotEmpty()) {
        Single.error(CompositeException(failures.map { it.error }))
      } else {
        Single.just(
          safeResults.map { (it as SafeResult.Success<T>).result }
        )
      }
    }
}

Except there’s still a prob­lem. If an emp­ty list is passed into Single.zip you will throw a java.util.NoSuchElementException excep­tion. Though this will be han­dled by an onError in the sub­scrip­tion, if this is part of a larg­er stream, then the stream will have been com­plet­ed. To avoid this issue you can make our safe zip­per that much safer by return­ing an emp­ty list when one is provided.

/**
 * Zip [Single] together safely. An onErrorReturn is automatically applied to each source
to prevent any source from throwing. Then, after all sources have completed, any errors will then be reported.
 
 */
fun <T> zipSafe(sources: List<Single<T>>): Single<List<T>> {
  if (sources.isEmpty()) {
    return Single.just(emptyList())
  }
 
  val safeSources = sources.map { source ->
    source
      .map<SafeResult<T>> { SafeResult.Success(it) }
      .onErrorReturn { SafeResult.Failure(it) }
  }
 
  return Single.zip(safeSources) { it.filterIsInstance<SafeResult<T>>() }
    .flatMap<List<T>> { safeResults ->
      val failures = safeResults.filterIsInstance<SafeResult.Failure<T>>()
      if (failures.isNotEmpty()) {
        Single.error(CompositeException(failures.map { it.error }))
      } else {
        Single.just(
          safeResults.map { (it as SafeResult.Success<T>).result }
        )
      }
    }
}

Suc­cess!

Scott Schmitz
Scott Schmitz
Staff Engineer

Looking for more like this?

Sign up for our monthly newsletter to receive helpful articles, case studies, and stories from our team.

Cross tab navigation in Jetpack Compose
Android Development

Cross tab navigation in Jetpack Compose

October 4, 2022

Learn how to use Android's Jetpack Compose to navigate from a screen inside of one NavigationBarItem to another.

Read more
MichiganLabs’ approach to product design: A strategic, problem-solving process
Design Team

MichiganLabs’ approach to product design: A strategic, problem-solving process

February 12, 2024

Product design, or UX design, is a strategic problem-solving process that leads to a valuable digital product. Learn what to expect when working with product designers for your custom software.

Read more
The benefits of open source technology for businesses
Business Development

The benefits of open source technology for businesses

March 29, 2024

Read more
View more articles