What is RxJava and How to Use it in Android Mobile Development?

April 13th, 2017

Mobile // Natalie

One of the things to consider when you are developing an Android mobile application is how your app will react to the user interactions, such as clicks, swipes, and other, while there are numerous processes taking place in the background.

Before your code becomes an unmanageable mess, use RxJava to orchestrate all these interactions in a simple way. There are many developer tools that notify about operations, but RxJava can help you react to the notifications. We have prepared a RxJava tutorial for android development, but before we start, let's see what is RxJava for android in general.

RxJava is a Java VM implementation of Reactive Extensions: a library for composing asynchronous and event-based programs by using observable sequences.

It extends the observer pattern to support sequences of data/events and adds operators that allow you to compose sequences together declaratively while abstracting away concerns about things like low-level threading, synchronization, thread-safety and concurrent data structures.

Source: ReactiveX / RxJava on GitHub

The best way to work with data in Android applications is via using a Repository pattern. The repository is an interface that shows how actions with data work. Its composition depends on the current project’s tasks, but usually, it’s a combination of the following:

  1. Main (Root) Repository, responsible for managing all other repositories
  2. Remote Repository, responsible for working with remote API (REST)
  3. Database Repository, responsible for working with local databases (for example, SQLite, Realm, etc.)
  4. Memory Repository, responsible for working with memory data cache (for fast access to data from different places within the application)
  5. File Repository, responsible for working with file data caching in internal or external data storages (for example, Shared Preferences or some custom files)

Let's see how it all works.

Main Repository

As we mentioned above, the Main Repository has access to all other implementations (Memory, Database). It also handles all actions with data and decides how to sync data between them.

class MainRepository implements Repository {
  private Repository memoryRepository;
  private Repository databaseRepository;
  private Scheduler subscribeSc = Schedulers.from(BackgroundExecutor.getSafeBackgroundExecutor());
  private Scheduler observeSc = AndroidSchedulers.mainThread();
  MainRepository(@Memory Repository memoryRepository,
                         @Database Repository databaseRepository) {
   this.memoryRepository = memoryRepository;
   this.databaseRepository = databaseRepository;
  }

We use Dagger2 to inject repositories with marker annotations (@Memory, @Database). We also initialize Schedulers for subscribing (parallel thread) and observing (UI/main thread).

Repository for Account Data

Below is an example of a (part of the) Repository interface with contract methods for working with Account data.

public interface Repository {
Flowable<Account> addNewAccount(Account account);
Flowable<List<Account>> getAllAccounts();
Flowable<Account> updateAccount(Account account);
Flowable<Boolean> deleteAccount(int id);
.................................................
}

Repository for Database

At Grossum, we use a static method from RxJava API Flowable.fromCallable() to wrap methods that handle data directly in the database. Here’s how the Repository interface for working with local SQLite database looks like:

@Override
public Flowable<Account> addNewAccount(Account account) {
  return Flowable.fromCallable(() -> addNewAccountDB(account));
}
@Override
public Flowable<List<Account>> getAllAccounts() {
  return Flowable.fromCallable(this::getAllAccountsDB);
}
@Override
public Flowable<Account> updateAccount(Account account) {
  return Flowable.fromCallable(() -> updateAccountDB(account));
}
@Override
public Flowable<Boolean> deleteAccount(int id) {
  return Flowable.fromCallable(() -> deleteAccountDB(id));
}

Repository for Memory Data Cache

Memory data cache is used for quick and frequent access to the data stored in various places in the application. Again, we use Flowable.fromCallable() to wrap methods and Flowable.just() to wrap objects (in this case, List of Accounts). Here’s how the implementation code looks like:

@Override
public Flowable<Account> addNewAccount(Account account) {
  return Flowable.fromCallable(() -> {
      accountList.add(account);
      return account;
  });
}
@Override
public Flowable<Account> updateAccount(Account account) {
  return Flowable.fromCallable(() -> {
      int pos = accountList.indexOf(account);
      return pos >= 0 ?
                accountList.set(pos, account) : null;
  });
}
@Override
public Flowable<List<Account>> getAllAccounts() {
  return accountList == null ?
             null : Flowable.just(accountList);
}
@Override
public Flowable<Boolean> deleteAccount(int id) {
  return Flowable.fromCallable(() -> {
      int pos = getAccountPosById(id);
      boolean b = pos != -1;
      if (b) accountList.remove(pos);
      return b;
  });
}

Working with Accounts

Using addNewAccount method, we get Flowable by adding a new account to the database. After a successful operation, we replace it by Flowable from memory repository (using flatMap operation).

In memory, we add an account that was changed by database operations - for example, in case we didn’t know the ID of the new account and get it only after putting data into the database.

To implement getAllAccounts method, we need to get Flowable from the memory repository first. If it is not null and the data has not expired, it means we can get data from memory because it’s relevant. Otherwise, we should update the data in memory by getting it from the database.

@Override
public Flowable<Account> addNewAccount(Account account) {
  return databaseRepository.addNewAccount(account)
       .flatMap(account1 -> memoryRepository.addNewAccount(account1))
       .subscribeOn(subscribeSc)
       .observeOn(observeSc);
}
@Override
public Flowable<List<Account>> getAllAccounts() {
  Flowable<List<Account>> memoryObservable = memoryRepository.getAllAccounts();
  return memoryObservable != null && !isDataExpired() ?
       memoryObservable :
       loadAllDataFromDB()
               .flatMap(aBoolean -> {
                     return aBoolean ?
                              memoryRepository.getAllAccounts() :
                              databaseRepository.getAllAccounts();
               })
               .subscribeOn(subscribeSc)
               .observeOn(observeSc);
}

For updateAccount method implementation, we use flatMap operation (like in addNewAccount).

For deleteAccount, we should use combineLatest operation, which gives us data from Flowable only after all combined flowables will give some results.

@Override
public Flowable<Account> updateAccount(Account account) {
  return databaseRepository.updateAccount(account)
       .flatMap(account1 -> memoryRepository.updateAccount(account1))
       .subscribeOn(subscribeSc)
       .observeOn(observeSc);
}
@Override
public Flowable<Boolean> deleteAccount(int id) {
  return Flowable.combineLatest(
          databaseRepository.deleteAccount(id),
       memoryRepository.deleteAccount(id),
       (aBoolean, aBoolean2) -> aBoolean && aBoolean2)
       .subscribeOn(subscribeSc)
       .observeOn(observeSc);
}

Syncing Data Between Database and Memory

In order to synchronize relevant data between the database and memory repositories, we use loadAllDataFromDB method. It uses flatMap and combineLatest operations.

private Flowable<Boolean> loadAllDataFromDB() {
  return Flowable.combineLatest(
       databaseRepository.getAllAccounts(),
          databaseRepository.getAllTransactions(),
       databaseRepository.getAllDebts(),
       databaseRepository.getRates(),
       Data::new)
       .flatMap(data ->
               Flowable.combineLatest(
                          memoryRepository.setAllAccounts(data.getAccountList()),
                          memoryRepository.setAllTransactions(data.getTransactionList()),
                          memoryRepository.setAllDebts(data.getDebtList()),
                          memoryRepository.setRates(data.getRatesArray()),
                       (aBoolean, aBoolean2, aBoolean3, aBoolean4) ->
                               aBoolean && aBoolean2 && aBoolean3 && aBoolean4
               ))
       .subscribeOn(subscribeSc)
       .observeOn(observeSc);
}

Working with Models

Models are a part of an MVP architecture and there are several useful operations for it, such as:

  • map converts each object emitted by Flowable to another one, for example, a ViewModel for displaying.
  • flatMap(Flowable::fromIterable) is useful when we receive a list of data and we want to convert is to Flowable that emits each of the list members one by one
  • filter helps to find objects using predefined filter parameters
  • combineLatest combines data from several sources into one container and many other operations.
@Override
public Flowable<List<AccountViewModel>> getAccountList() {
  return repository.getAllAccounts()
          .map(this::transformAccountListToViewModelList);
}
@Override
public Flowable<Account> getAccountById(int id) {
  return repository.getAllAccounts()
       .flatMap(Flowable::fromIterable)
       .filter(account -> id == account.getId());
}
@Override
public Flowable<Pair<Map<String, double[]>, Map<String, double[]>>> getBalanceAndStatistic(int statisticPosition) {
  return Flowable.combineLatest(
          repository.getAccountsAmountSumGroupByTypeAndCurrency(),
          repository.getTransactionsStatistic(statisticPosition),
       Pair::new);
}

Working with Presenters

Presenters, like Models, are a part of the MVP architecture, and RxJava is pretty simple to use there. We need to subscribe to provided Flowable and implement onNext(), which gives us result data, and onError(), which is where we get exceptions in case of errors.

@Override
public void loadData() {
  model.getAccountList()
          .subscribe(
               accountList -> {
                   if (view != null) {
                          view.setAccountList(accountList);
                   }
               },
               Throwable::printStackTrace
          );
}
@Override
public void deleteAccountById(int id) {
  model.deleteAccountById(id)
          .subscribe(
               aBoolean -> {
                   if (aBoolean && view != null) {
                          view.deleteAccount();
                   }
               },
               Throwable::printStackTrace
          );
}

Working with SQLite Database

Background Executor is very useful when you need to work with SQLite database and you need the guarantee that your application will be successful in handling all these actions. You create a Scheduler from this executor and then use it for subscribing to Flowable.

public class BackgroundExecutor {
  private static final int CORE_POOL_SIZE = 1 ;
  private static final int MAXIMUM_POOL_SIZE = 3;
  private static final int KEEP_ALIVE = 1;
  private static final BlockingQueue<Runnable> sPoolWorkQueue = new LinkedBlockingQueue<>(128);
  private static final Executor THREAD_POOL_EXECUTOR = new ThreadPoolExecutor(
          CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE, TimeUnit.SECONDS, sPoolWorkQueue);
  public static Executor getSafeBackgroundExecutor() {
   return THREAD_POOL_EXECUTOR;
  }
}

Working with Math

The need for math calculations is not a rare one. When you need to perform math calculations with data that comes from Flowable (for example, get sum, average, min, max, etc.) RxJava Math extensions are incredibly useful.

Flowable.fromIterable(transactionsList)
   .map(Transaction::getGbpAmount)
   .to(MathFlowable::sumDouble)
   .subscribeOn(Schedulers.computation())
      .observeOn(AndroidSchedulers.mainThread())
   .subscribe(sum -> {
               if (view != null) {
                   String total = String.format(
                          context.getString(R.string.total_placeholder),
                          formatManager.doubleToStringFormatter(sum));
                      view.updateTransactionsTotal(total);
               }
           },
           Throwable::printStackTrace);

Migration to RxJava 2.x

In case you need to migrate from RxJava 1.x version to the latest RxJava 2.x, you should replace following dependencies by appropriate ones:

  • compile "io.reactivex:rxjava:1.2.7"
  • compile "io.reactivex:rxandroid:1.2.1"
  • compile "io.reactivex:rxjava-math:1.0.0"
  • compile "com.squareup.retrofit2:adapter-rxjava:2.2.0"
  • compile "io.reactivex.rxjava2:rxjava:2.0.7"
  • compile "io.reactivex.rxjava2:rxandroid:2.0.1"
  • compile "com.github.akarnokd:rxjava2-extensions:0.16.2"
  • compile "com.squareup.retrofit2:adapter-rxjava2:2.2.0"

Need Android mobile app development?

The information was originally presented by Ihor Bilous, Android Developer, at the Grossum's #DevInnerConf.

Author: Natalie

Natalie is a Project Manager who is a great team leader for her mobile development team. She is an expert when it comes to iOS and Android development and to building apps that win markets.

Tags Development Hacks

See all blog

x

Grossum Startup Guide:
Five Things to Consider Before Web & Mobile Development

Get the Guide