Development

Zipper RX Magic

March 10, 2020

As a developer, you want to be able to fire off requests without worrying about what order they run in or when each response arrives. All you really care about is when they have all completed. And, with a little Rx magic, that’s when you stumble upon Single.zip:

/**
 * Waits until all SingleSource sources provided by the Iterable sequence signal a success
 * value and calls a zipper function with an array of these values to return a result
 * to be emitted to downstream.
...

Next, you pass your list of Singles into Single.zip like so…

Single.zip(
  listOf(source1, source2, source3, ...)
) { results -> /* do something */ }
  .subscribeBy(
    onError = Timber::e,
    onSuccess = {
      celebrate()
    }
  )

Everything appears to be going smoothly, until you push the app out into the world and begin receiving crash reports.

The exception couldn’t be delivered to the consumer because it already canceled/disposed the flow. That, or the exception has nowhere to go in the first place.

Why is this? Like any responsible programmer, you added an onError condition to your subscription. But, instead of seeing it being called, you’re left with a crash that happens after the Single has already terminated.

You question how this could be happening and begin searching for the answer. Fortunately (or unfortunately), you begin to realize it’s entirely by design.

RxJava 2 tries to avoid losing exceptions which could be important to the developer even if it happens after the natural lifecycle of a flow.
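To see that behavior in isolation, here is a minimal sketch, assuming RxJava 2 on the classpath. The helper name reproduceUndeliverable and the two hand-driven sources are hypothetical stand-ins for real requests; they are not from the original app. The first failure reaches onError and disposes the zip, so the second failure has nowhere to go and is handed to the global handler wrapped in an UndeliverableException.

```kotlin
import io.reactivex.Single
import io.reactivex.SingleEmitter
import io.reactivex.exceptions.UndeliverableException
import io.reactivex.plugins.RxJavaPlugins
import java.io.IOException

// Hypothetical helper: returns (errors seen by onError, errors seen by the global handler).
fun reproduceUndeliverable(): Pair<List<Throwable>, List<Throwable>> {
  val delivered = mutableListOf<Throwable>()
  val undelivered = mutableListOf<Throwable>()

  // Record whatever reaches the global handler instead of crashing the thread.
  RxJavaPlugins.setErrorHandler { undelivered += it }

  // Two sources we can fail by hand, standing in for two in-flight requests.
  lateinit var first: SingleEmitter<String>
  lateinit var second: SingleEmitter<String>
  val source1 = Single.create<String> { first = it }
  val source2 = Single.create<String> { second = it }

  Single.zip(listOf(source1, source2)) { results -> results.size }
    .subscribe({ }, { delivered += it })

  // The first failure is delivered to onError and disposes the remaining source.
  first.onError(IOException("request 1 failed"))
  // The second failure arrives after disposal, so it goes to the global handler.
  second.onError(IOException("request 2 failed"))

  RxJavaPlugins.reset() // restore the default handler
  return delivered to undelivered
}

fun main() {
  val (delivered, undelivered) = reproduceUndeliverable()
  println("onError saw: $delivered")
  println("global handler saw: $undelivered")
}
```

Without the setErrorHandler call, that second exception would go to the thread’s uncaught exception handler, which is the crash showing up in the reports.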

Then you see one of the authors propose a couple of “solutions.”

Override the default handler with RxJavaPlugins.setErrorHandler() and suppress what you don’t consider fatal. Alternatively, apply a per-source onErrorReturn or onErrorResumeNext before zipping them together.
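Both suggestions can be sketched in a few lines, assuming RxJava 2, where the global hook is RxJavaPlugins.setErrorHandler. The function names installLenientErrorHandler and zipWithFallbacks are hypothetical, and treating IOException and InterruptedException as non-fatal is only an assumption for illustration; your own list will differ.

```kotlin
import io.reactivex.Single
import io.reactivex.exceptions.UndeliverableException
import io.reactivex.plugins.RxJavaPlugins
import java.io.IOException

// Option 1 (hypothetical helper): log what you consider non-fatal, crash on the rest.
fun installLenientErrorHandler(log: (Throwable) -> Unit) {
  RxJavaPlugins.setErrorHandler { error ->
    // Unwrap the UndeliverableException to inspect the original failure.
    val cause = if (error is UndeliverableException) error.cause ?: error else error
    when (cause) {
      // Assumption: network and interruption failures after disposal are harmless here.
      is IOException, is InterruptedException -> log(cause)
      else -> {
        // Anything else is likely a bug, so forward it to the uncaught exception handler.
        val thread = Thread.currentThread()
        thread.uncaughtExceptionHandler.uncaughtException(thread, cause)
      }
    }
  }
}

// Option 2 (hypothetical helper): give every source a fallback so none can fail the zip.
fun zipWithFallbacks(sources: List<Single<String>>, fallback: String): Single<List<String>> =
  Single.zip(sources.map { it.onErrorReturnItem(fallback) }) { results ->
    results.map { it as String }
  }
```

The first option changes behavior for the whole process, while the second one hides the failure behind a placeholder value, so neither tells the subscriber which requests actually failed.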

Though it would be nice to have a delayError flag similar to the one on Observable.zip, you’re out of luck. Hey, we all occasionally forget to add an onErrorReturn to every one of our Single variables (although I strongly recommend taking this step).

Moving forward, I’ve been able to protect myself by using zipSafe, which automatically wraps all of your Singles, then reports every error collected along the way as a single error at the end.

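import io.reactivex.Single
import io.reactivex.exceptions.CompositeException

// Holds either the value a source produced or the error it failed with.
sealed class SafeResult<out T> {
  class Success<T>(val result: T): SafeResult<T>()
  class Failure<T>(val error: Throwable): SafeResult<Nothing>()
}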
 
/**
 * Zip [Single] together safely. An onErrorReturn is automatically applied to each source
 * to prevent any source from throwing. After all sources have completed, any errors
 * are reported together.
 */
fun <T> zipSafe(sources: List<Single<T>>): Single<List<T>> {
  val safeSources = sources.map { source ->
    source
      .map<SafeResult<T>> { SafeResult.Success(it) }
      .onErrorReturn { SafeResult.Failure<T>(it) } // explicit <T>: it cannot be inferred from the arguments
  }
 
  return Single.zip(safeSources) { it.filterIsInstance<SafeResult<T>>() }
    .flatMap<List<T>> { safeResults ->
      val failures = safeResults.filterIsInstance<SafeResult.Failure<T>>()
      if (failures.isNotEmpty()) {
        Single.error(CompositeException(failures.map { it.error }))
      } else {
        Single.just(
          safeResults.map { (it as SafeResult.Success<T>).result }
        )
      }
    }
}

Except there’s still a problem. If an empty list is passed into Single.zip, it signals a java.util.NoSuchElementException. Though this will be handled by an onError in the subscription, if this is part of a larger stream, then that stream will have been terminated. To avoid this issue, you can make the safe zipper that much safer by returning an empty list when one is provided.

/**
 * Zip [Single] together safely. An onErrorReturn is automatically applied to each source
 * to prevent any source from throwing. Then, after all sources have completed, any errors
 * are reported together. An empty list of sources yields an empty result list.
 */
fun <T> zipSafe(sources: List<Single<T>>): Single<List<T>> {
  if (sources.isEmpty()) {
    return Single.just(emptyList())
  }
 
  val safeSources = sources.map { source ->
    source
      .map<SafeResult<T>> { SafeResult.Success(it) }
      .onErrorReturn { SafeResult.Failure<T>(it) } // explicit <T>: it cannot be inferred from the arguments
  }
 
  return Single.zip(safeSources) { it.filterIsInstance<SafeResult<T>>() }
    .flatMap<List<T>> { safeResults ->
      val failures = safeResults.filterIsInstance<SafeResult.Failure<T>>()
      if (failures.isNotEmpty()) {
        Single.error(CompositeException(failures.map { it.error }))
      } else {
        Single.just(
          safeResults.map { (it as SafeResult.Success<T>).result }
        )
      }
    }
}
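For a quick check of the three cases, here is a usage sketch, assuming RxJava 2. The definitions are repeated in condensed form only so the snippet compiles on its own, and describeZip is a hypothetical helper added for illustration.

```kotlin
import io.reactivex.Single
import io.reactivex.exceptions.CompositeException
import java.io.IOException

// Repeated from above (condensed) so this snippet is self-contained.
sealed class SafeResult<out T> {
  class Success<T>(val result: T): SafeResult<T>()
  class Failure<T>(val error: Throwable): SafeResult<Nothing>()
}

fun <T> zipSafe(sources: List<Single<T>>): Single<List<T>> {
  if (sources.isEmpty()) return Single.just(emptyList())
  val safeSources = sources.map { source ->
    source
      .map<SafeResult<T>> { SafeResult.Success(it) }
      .onErrorReturn { SafeResult.Failure<T>(it) }
  }
  return Single.zip(safeSources) { it.filterIsInstance<SafeResult<T>>() }
    .flatMap<List<T>> { safeResults ->
      val failures = safeResults.filterIsInstance<SafeResult.Failure<T>>()
      if (failures.isNotEmpty()) {
        Single.error(CompositeException(failures.map { it.error }))
      } else {
        Single.just(safeResults.map { (it as SafeResult.Success<T>).result })
      }
    }
}

// Hypothetical helper: run the zip synchronously and describe the outcome.
fun describeZip(sources: List<Single<Int>>): String =
  try {
    "values=" + zipSafe(sources).blockingGet()
  } catch (e: CompositeException) {
    // Every failed source is reported here, not just the first one.
    "errors=" + e.exceptions.map { it.message }
  }

fun main() {
  println(describeZip(emptyList()))
  println(describeZip(listOf(Single.just(1), Single.just(2))))
  println(describeZip(listOf(
    Single.just(1),
    Single.error<Int>(IOException("first")),
    Single.error<Int>(IOException("second"))
  )))
}
```

Because every source is converted to a SafeResult before zipping, no source can fail the zip early, and both failures in the last case arrive together in one CompositeException.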

Success!

Scott Schmitz
Staff Engineer
