Observable<T> createObservable(final RoomDatabase database,
        final boolean inTransaction, final String[] tableNames, final Callable<T> callable) {
    // One scheduler, backed by the executor chosen via inTransaction, is used for
    // subscription, disposal and downstream delivery alike.
    final Scheduler executorScheduler = Schedulers.from(getExecutor(database, inTransaction));

    // The callable is wrapped once; the same Maybe instance is handed back for every
    // upstream emission.
    final Maybe<T> callableSource = Maybe.fromCallable(callable);

    // The upstream item itself is ignored; it only acts as a trigger.
    final Function<Object, MaybeSource<T>> toCallableSource =
            new Function<Object, MaybeSource<T>>() {
                @Override
                public MaybeSource<T> apply(Object ignored) throws Exception {
                    return callableSource;
                }
            };

    // The (database, tableNames) overload is defined outside this view; presumably it
    // emits when one of the named tables changes -- confirm against that overload.
    return createObservable(database, tableNames)
            .subscribeOn(executorScheduler)
            .unsubscribeOn(executorScheduler)
            .observeOn(executorScheduler)
            .flatMapMaybe(toCallableSource);
}

/**
 * Selects the executor to run work on.
 *
 * @param database      database supplying both candidate executors
 * @param inTransaction {@code true} to pick the transaction executor, {@code false} to
 *                      pick the query executor
 * @return {@code database.getTransactionExecutor()} when {@code inTransaction} is set,
 *         otherwise {@code database.getQueryExecutor()}
 */
private static Executor getExecutor(RoomDatabase database, boolean inTransaction) {
    return inTransaction
            ? database.getTransactionExecutor()
            : database.getQueryExecutor();
}
} // closes the enclosing type whose header lies above this section