super T, ? extends Publisher<? extends R>> mapper;

    // Stored by the constructor and handed unchanged to every ConcatMapImmediate
    // created in subscribe(); how the queue is used is not visible in this excerpt.
    final Supplier<? extends Queue<T>> queueSupplier;

    // Strictly positive (enforced by the constructor); forwarded to ConcatMapImmediate.
    // NOTE(review): presumably the upstream request batch size -- confirm against ConcatMapImmediate.
    final int prefetch;

    /**
     * Creates the operator and validates its arguments.
     *
     * @param source the upstream {@code Flux}, passed to the superclass constructor
     * @param mapper function producing a {@code Publisher} for each upstream value; must not be null
     * @param queueSupplier supplier of the {@code Queue} given to the inner subscriber; must not be null
     * @param prefetch value forwarded to the inner subscriber; must be strictly positive
     * @throws IllegalArgumentException if {@code prefetch <= 0}
     * @throws NullPointerException if {@code mapper} or {@code queueSupplier} is null
     */
    FluxConcatMap(Flux<? extends T> source,
            Function<? super T, ? extends Publisher<? extends R>> mapper,
            Supplier<? extends Queue<T>> queueSupplier,
            int prefetch) {
        super(source);
        // The prefetch check deliberately runs before the null checks, so an invalid
        // prefetch is reported first when several arguments are bad.
        if (prefetch <= 0) {
            throw new IllegalArgumentException("prefetch <= 0");
        }
        this.mapper = Objects.requireNonNull(mapper, "mapper");
        // Name the argument in the NPE message, matching the mapper check above.
        this.queueSupplier = Objects.requireNonNull(queueSupplier, "queueSupplier");
        this.prefetch = prefetch;
    }

    /**
     * Subscribes to the upstream source with a new {@code ConcatMapImmediate} that wraps
     * the given downstream subscriber together with this operator's configuration.
     *
     * @param actual the downstream subscriber that the new {@code ConcatMapImmediate} wraps
     */
    @Override
    public void subscribe(CoreSubscriber<? super R> actual) {
        // Pass the downstream subscriber parameter itself; the previous code referenced
        // an identifier `s` that is not declared in this method.
        source.subscribe(new ConcatMapImmediate<>(actual, mapper, queueSupplier, prefetch));
    }