diff --git a/build.gradle b/build.gradle new file mode 100644 index 0000000..3f39f6b --- /dev/null +++ b/build.gradle @@ -0,0 +1,39 @@ + +plugins { + id 'java-library' +} + +repositories { + mavenLocal() + mavenCentral() +} + +java { + toolchain { + languageVersion = JavaLanguageVersion.of(17) + } +} + +compileJava { + options.compilerArgs << '-Xlint:unchecked' + options.compilerArgs << '-Xlint:rawtypes' + options.release = 17 +} + +compileTestJava { + options.compilerArgs << '-Xlint:unchecked' + options.compilerArgs << '-Xlint:rawtypes' + options.release = 17 +} + +dependencies { + api 'io.vavr:vavr:0.10.4' + + testImplementation 'org.junit.jupiter:junit-jupiter:5.8.1' + testImplementation("org.mockito:mockito-core:4.5.1") + testImplementation("org.mockito:mockito-junit-jupiter:4.5.1") +} + +tasks.named('test') { + useJUnitPlatform() +} \ No newline at end of file diff --git a/lib/build.gradle b/lib/build.gradle deleted file mode 100644 index 3f39f6b..0000000 --- a/lib/build.gradle +++ /dev/null @@ -1,39 +0,0 @@ - -plugins { - id 'java-library' -} - -repositories { - mavenLocal() - mavenCentral() -} - -java { - toolchain { - languageVersion = JavaLanguageVersion.of(17) - } -} - -compileJava { - options.compilerArgs << '-Xlint:unchecked' - options.compilerArgs << '-Xlint:rawtypes' - options.release = 17 -} - -compileTestJava { - options.compilerArgs << '-Xlint:unchecked' - options.compilerArgs << '-Xlint:rawtypes' - options.release = 17 -} - -dependencies { - api 'io.vavr:vavr:0.10.4' - - testImplementation 'org.junit.jupiter:junit-jupiter:5.8.1' - testImplementation("org.mockito:mockito-core:4.5.1") - testImplementation("org.mockito:mockito-junit-jupiter:4.5.1") -} - -tasks.named('test') { - useJUnitPlatform() -} \ No newline at end of file diff --git a/lib/src/main/java/com/github/tonivade/vavr/effect/Fiber.java b/lib/src/main/java/com/github/tonivade/vavr/effect/Fiber.java deleted file mode 100644 index d934c88..0000000 --- a/lib/src/main/java/com/github/tonivade/vavr/effect/Fiber.java +++ /dev/null @@ -1,18 +0,0 @@ -/* - * Copyright (c) 2022, Antonio Gabriel Muñoz Conejo - * Distributed under the terms of the MIT License - */ -package com.github.tonivade.vavr.effect; - -public sealed interface Fiber { - - IO join(); - - IO cancel(); - - static Fiber of(IO join, IO cancel) { - return new FiberImpl<>(join, cancel); - } - - record FiberImpl(IO join, IO cancel) implements Fiber {} -} \ No newline at end of file diff --git a/lib/src/main/java/com/github/tonivade/vavr/effect/IO.java b/lib/src/main/java/com/github/tonivade/vavr/effect/IO.java deleted file mode 100644 index aa179af..0000000 --- a/lib/src/main/java/com/github/tonivade/vavr/effect/IO.java +++ /dev/null @@ -1,880 +0,0 @@ -/* - * Copyright (c) 2022, Antonio Gabriel Muñoz Conejo - * Distributed under the terms of the MIT License - */ -package com.github.tonivade.vavr.effect; - -import static io.vavr.Function1.identity; -import java.time.Duration; -import java.util.ArrayDeque; -import java.util.Deque; -import java.util.NoSuchElementException; -import java.util.Objects; -import java.util.concurrent.CancellationException; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Executor; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicReference; -import java.util.function.Consumer; -import java.util.function.Predicate; -import java.util.function.Supplier; -import java.util.function.UnaryOperator; -import io.vavr.CheckedConsumer; -import io.vavr.CheckedRunnable; -import io.vavr.Function1; -import io.vavr.Function2; -import io.vavr.PartialFunction; -import io.vavr.Tuple; -import io.vavr.Tuple2; -import io.vavr.collection.HashMap; -import io.vavr.collection.List; -import io.vavr.collection.Seq; -import io.vavr.concurrent.Future; -import io.vavr.concurrent.Promise; -import io.vavr.control.Either; -import io.vavr.control.Option; -import io.vavr.control.Try; - -public sealed interface IO { - - IO UNIT = pure(Unit.unit()); - - default Future runAsync() { - return runAsync(this, IOConnection.UNCANCELLABLE).future(); - } - - default Future runAsync(Executor executor) { - return forked(executor).andThen(this).runAsync(); - } - - default T unsafeRunSync() { - return safeRunSync().get(); - } - - default Try safeRunSync() { - return runAsync().toTry(); - } - - default void safeRunAsync(Consumer> callback) { - safeRunAsync(Future.DEFAULT_EXECUTOR, callback); - } - - default void safeRunAsync(Executor executor, Consumer> callback) { - runAsync(executor).onComplete(callback); - } - - default IO map(Function1 map) { - return flatMap(map.andThen(IO::pure)); - } - - default IO flatMap(Function1> map) { - return new FlatMapped<>(this, map.andThen(IO::narrowK)); - } - - default IO andThen(IO after) { - return flatMap(ignore -> after); - } - - default IO ap(IO> apply) { - return parMap2(Future.DEFAULT_EXECUTOR, this, apply, (v, a) -> a.apply(v)); - } - - default IO> attempt() { - return map(Try::success).recover(Try::failure); - } - - default IO> either() { - return attempt().map(Try::toEither); - } - - default IO> either(Function1 mapError, - Function1 mapper) { - return either().map(either -> either.bimap(mapError, mapper)); - } - - default IO redeem(Function1 mapError, - Function1 mapper) { - return attempt().map(result -> result.fold(mapError, mapper)); - } - - default IO redeemWith(Function1> mapError, - Function1> mapper) { - return attempt().flatMap(result -> result.fold(mapError, mapper)); - } - - default IO recover(Function1 mapError) { - return recoverWith(partialFunction(x -> true, mapError.andThen(IO::pure))); - } - - @SuppressWarnings("unchecked") - default IO recover(Class type, Function1 function) { - return recoverWith(partialFunction(error -> error.getClass().equals(type), t -> function.andThen(IO::pure).apply((X) t))); - } - - @SuppressWarnings("serial") - static PartialFunction partialFunction(Predicate matcher, Function1 function) { - return new PartialFunction<>() { - - @Override - public boolean isDefinedAt(A value) { - return matcher.test(value); - } - - @Override - public B apply(A t) { - return function.apply(t); - } - }; - } - - default IO recoverWith(PartialFunction> mapper) { - return new Recover<>(this, mapper); - } - - default IO> timed() { - return IO.task(System::nanoTime).flatMap( - start -> map(result -> Tuple.of(Duration.ofNanos(System.nanoTime() - start), result))); - } - - default IO> fork() { - return async(callback -> { - IOConnection connection = IOConnection.cancellable(); - Promise promise = runAsync(this, connection); - - IO join = fromFuture(promise.future()); - IO cancel = exec(connection::cancel); - - callback.accept(Try.success(Fiber.of(join, cancel))); - }); - } - - default IO timeout(Duration duration) { - return timeout(Future.DEFAULT_EXECUTOR, duration); - } - - default IO timeout(Executor executor, Duration duration) { - return racePair(executor, this, sleep(duration)).flatMap(either -> either.fold( - ta -> ta._2().cancel().map(x -> ta._1()), - tb -> tb._1().cancel().flatMap(x -> IO.raiseError(new TimeoutException())))); - } - - default IO repeat() { - return repeat(1); - } - - default IO repeat(int times) { - return repeat(this, unit(), times); - } - - default IO repeat(Duration delay) { - return repeat(delay, 1); - } - - default IO repeat(Duration delay, int times) { - return repeat(this, sleep(delay), times); - } - - default IO retry() { - return retry(1); - } - - default IO retry(int maxRetries) { - return retry(this, unit(), maxRetries); - } - - default IO retry(Duration delay) { - return retry(delay, 1); - } - - default IO retry(Duration delay, int maxRetries) { - return retry(this, sleep(delay), maxRetries); - } - - @SuppressWarnings("unchecked") - static IO narrowK(IO value) { - return (IO) value; - } - - static IO pure(T value) { - return new Pure<>(value); - } - - static IO> race(IO fa, IO fb) { - return race(Future.DEFAULT_EXECUTOR, fa, fb); - } - - static IO> race(Executor executor, IO fa, IO fb) { - return racePair(executor, fa, fb).flatMap(either -> either.fold( - ta -> ta._2().cancel().map(x -> Either.left(ta._1())), - tb -> tb._1().cancel().map(x -> Either.right(tb._2())))); - } - - static IO>, Tuple2, B>>> racePair(Executor executor, IO fa, IO fb) { - return cancellable(callback -> { - - IOConnection connection1 = IOConnection.cancellable(); - IOConnection connection2 = IOConnection.cancellable(); - - Promise promiseA = runAsync(IO.forked(executor).andThen(fa), connection1); - Promise promiseB = runAsync(IO.forked(executor).andThen(fb), connection2); - - promiseA.future().onComplete(result -> callback.accept( - result.map(a -> Either.left(Tuple.of(a, Fiber.of(IO.fromFuture(promiseB.future()), IO.exec(connection2::cancel))))))); - promiseB.future().onComplete(result -> callback.accept( - result.map(b -> Either.right(Tuple.of(Fiber.of(IO.fromFuture(promiseA.future()), IO.exec(connection2::cancel)), b))))); - - return IO.exec(() -> { - try { - connection1.cancel(); - } finally { - connection2.cancel(); - } - }); - }); - } - - static IO raiseError(Throwable error) { - return new Failure<>(error); - } - - static IO delay(Duration delay, Supplier lazy) { - return sleep(delay).andThen(task(lazy)); - } - - static IO suspend(Supplier> lazy) { - return new Suspend<>(lazy); - } - - static Function1> lift(Function1 task) { - return task.andThen(IO::pure); - } - - public static Function1> liftOption(Function1> function) { - return value -> fromOption(function.apply(value)); - } - - public static Function1> liftTry(Function1> function) { - return value -> fromTry(function.apply(value)); - } - - public static Function1> liftEither(Function1> function) { - return value -> fromEither(function.apply(value)); - } - - static IO fromOption(Option task) { - return fromEither(toEither(task)); - } - - static IO fromTry(Try task) { - return fromEither(task.toEither()); - } - - static IO fromEither(Either task) { - return task.fold(IO::raiseError, IO::pure); - } - - static IO fromFuture(Future promise) { - CheckedConsumer>> callback = promise::onComplete; - return async(callback); - } - - static IO fromCompletableFuture(CompletableFuture promise) { - return fromFuture(Future.fromCompletableFuture(promise)); - } - - static IO sleep(Duration duration) { - return sleep(Future.DEFAULT_EXECUTOR, duration); - } - - static IO sleep(Executor executor, Duration duration) { - return cancellable(callback -> { - Future sleep = FutureModule.sleep(executor,duration) - .onComplete(result -> callback.accept(Try.success(Unit.unit()))); - return IO.exec(() -> sleep.cancel(true)); - }); - } - - static IO exec(CheckedRunnable task) { - return task(asSupplier(task)); - } - - static IO task(Supplier producer) { - return new Delay<>(producer); - } - - static IO never() { - return async(callback -> {}); - } - - static IO forked() { - return forked(Future.DEFAULT_EXECUTOR); - } - - static IO forked(Executor executor) { - return async(callback -> executor.execute(() -> callback.accept(Try.success(Unit.unit())))); - } - - static IO async(CheckedConsumer>> callback) { - return cancellable(asFunction(callback)); - } - - static IO cancellable(Function1>, IO> callback) { - return new Async<>(callback); - } - - static IO>> memoize(Function1> function) { - return memoize(Future.DEFAULT_EXECUTOR, function); - } - - static IO>> memoize(Executor executor, Function1> function) { - var ref = Ref.make(HashMap.>empty()); - return ref.map(r -> { - Function1>> result = a -> r.modify(map -> map.get(a).fold(() -> { - Promise promise = Promise.make(); - function.apply(a).safeRunAsync(executor, promise::tryComplete); - return Tuple.of(IO.fromFuture(promise.future()), map.put(a, promise)); - }, promise -> Tuple.of(IO.fromFuture(promise.future()), map))); - return result.andThen(io -> io.flatMap(identity())); - }); - } - - static IO unit() { - return UNIT; - } - - static IO bracket(IO acquire, - Function1> use, Function1> release) { - return cancellable(callback -> { - - IOConnection cancellable = IOConnection.cancellable(); - - Promise promise = runAsync(acquire, cancellable); - - promise.future() - .onFailure(error -> callback.accept(Try.failure(error))) - .onSuccess(resource -> runAsync(use.andThen(IO::narrowK).apply(resource), cancellable).future() - .onComplete(result -> runAsync(release.andThen(IO::narrowK).apply(resource), cancellable).future() - .onComplete(ignore -> callback.accept(result)) - )); - - return IO.exec(cancellable::cancel); - }); - } - - static IO bracket(IO acquire, - Function1> use, CheckedConsumer release) { - return bracket(acquire, use, asFunction(release)); - } - - static IO bracket(IO acquire, - Function1> use) { - return bracket(acquire, use, AutoCloseable::close); - } - - static IO sequence(Seq> sequence) { - IO initial = IO.unit(); - return sequence.foldLeft(initial, (IO a, IO b) -> a.andThen(b)).andThen(IO.unit()); - } - - static IO> traverse(Seq> sequence) { - return traverse(Future.DEFAULT_EXECUTOR, sequence); - } - - static IO> traverse(Executor executor, Seq> sequence) { - return sequence.foldLeft(pure(List.empty()), - (IO> xs, IO a) -> parMap2(executor, xs, a, Seq::append)); - } - - static IO parMap2(IO fa, IO fb, - Function2 mapper) { - return parMap2(Future.DEFAULT_EXECUTOR, fa, fb, mapper); - } - - static IO parMap2(Executor executor, IO fa, IO fb, - Function2 mapper) { - return cancellable(callback -> { - - IOConnection connection1 = IOConnection.cancellable(); - IOConnection connection2 = IOConnection.cancellable(); - - Promise promiseA = runAsync(IO.forked(executor).andThen(fa), connection1); - Promise promiseB = runAsync(IO.forked(executor).andThen(fb), connection2); - - promiseA.future().onComplete(a -> promiseB.future().onComplete(b -> callback.accept(map2(a, b, mapper)))); - - return IO.exec(() -> { - try { - connection1.cancel(); - } finally { - connection2.cancel(); - } - }); - }); - } - - static IO> tuple(IO fa, IO fb) { - return tuple(Future.DEFAULT_EXECUTOR, fa, fb); - } - - static IO> tuple(Executor executor, IO fa, IO fb) { - return parMap2(executor, fa, fb, Tuple::of); - } - - private static Promise runAsync(IO current, IOConnection connection) { - return runAsync(current, connection, new CallStack<>(), Promise.make()); - } - - private static Try map2(Try a, Try b, Function2 mapper) { - return a.flatMap(x -> b.map(y -> mapper.apply(x, y))); - } - - private static Either toEither(Option task) { - return task.fold(() -> Either.left(new NoSuchElementException()), Either::right); - } - - private static Supplier asSupplier(CheckedRunnable task) { - return () -> { - task.unchecked().run(); - return Unit.unit(); - }; - } - - private static Function1> asFunction(CheckedConsumer release) { - return t -> { - release.unchecked().accept(t); - return unit(); - }; - } - - @SuppressWarnings("unchecked") - private static Promise runAsync(IO current, IOConnection connection, CallStack stack, Promise promise) { - while (true) { - try { - current = unwrap(current, stack, identity()); - - if (current instanceof Pure pure) { - return promise.success(pure.value); - } - - if (current instanceof Async async) { - return executeAsync(async, connection, promise); - } - - if (current instanceof FlatMapped) { - stack.push(); - - var flatMapped = (FlatMapped) current; - IO source = IO.narrowK(unwrap(flatMapped.current, stack, u -> u.flatMap(flatMapped.next))); - - if (source instanceof Async async) { - Promise nextPromise = Promise.make(); - - nextPromise.future().andThen(tryU -> tryU.onFailure(promise::failure) - .onSuccess(u -> runAsync(IO.narrowK(flatMapped.next.apply(u)), connection, stack, promise))); - - executeAsync(async, connection, nextPromise); - - return promise; - } - - if (source instanceof Pure pure) { - Function1> andThen = flatMapped.next.andThen(IO::narrowK); - current = andThen.apply(pure.value); - } else if (source instanceof FlatMapped) { - FlatMapped flatMapped2 = (FlatMapped) source; - current = flatMapped2.current.flatMap(a -> flatMapped2.next.apply(a).flatMap(flatMapped.next)); - } - } else { - stack.pop(); - } - } catch (Throwable error) { - Option> result = stack.tryHandle(error); - - if (result.isDefined()) { - current = result.get(); - } else { - return promise.failure(error); - } - } - } - } - - private static IO unwrap(IO current, CallStack stack, Function1, IO> next) { - while (true) { - if (current instanceof Failure failure) { - return stack.sneakyThrow(failure.error); - } else if (current instanceof Recover recover) { - stack.add(partialFunction(recover.mapper::isDefinedAt, recover.mapper.andThen(next))); - current = recover.current; - } else if (current instanceof Suspend suspend) { - Supplier> andThen = () -> IO.narrowK(suspend.lazy.get()); - current = andThen.get(); - } else if (current instanceof Delay delay) { - return IO.pure(delay.task.get()); - } else if (current instanceof Pure) { - return current; - } else if (current instanceof FlatMapped) { - return current; - } else if (current instanceof Async) { - return current; - } else { - throw new IllegalStateException(); - } - } - } - - private static Promise executeAsync(Async current, IOConnection connection, Promise promise) { - if (connection.isCancellable() && !connection.updateState(StateIO::startingNow).isRunnable()) { - return promise.complete(Try.failure(new CancellationException())); - } - - connection.setCancelToken(current.callback.apply(promise::tryComplete)); - - promise.future().andThen(x -> connection.setCancelToken(UNIT)); - - if (connection.isCancellable() && connection.updateState(StateIO::notStartingNow).isCancellingNow()) { - connection.cancelNow(); - } - - return promise; - } - - private static IO repeat(IO self, IO pause, int times) { - return self.redeemWith(IO::raiseError, value -> { - if (times > 0) { - return pause.andThen(repeat(self, pause, times - 1)); - } else return IO.pure(value); - }); - } - - private static IO retry(IO self, IO pause, int maxRetries) { - return self.redeemWith(error -> { - if (maxRetries > 0) { - return pause.andThen(retry(self, pause.repeat(), maxRetries - 1)); - } else return IO.raiseError(error); - }, IO::pure); - } - - final class Pure implements IO { - - private final T value; - - private Pure(T value) { - this.value = Objects.requireNonNull(value); - } - - @Override - public String toString() { - return "Pure(" + value + ")"; - } - } - - final class Failure implements IO { - - private final Throwable error; - - private Failure(Throwable error) { - this.error = Objects.requireNonNull(error); - } - - @Override - public String toString() { - return "Failure(" + error + ")"; - } - } - - final class FlatMapped implements IO { - - private final IO current; - private final Function1> next; - - private FlatMapped(IO current, - Function1> next) { - this.current = Objects.requireNonNull(current); - this.next = Objects.requireNonNull(next); - } - - @Override - public String toString() { - return "FlatMapped(" + current + ", ?)"; - } - } - - final class Delay implements IO { - - private final Supplier task; - - private Delay(Supplier task) { - this.task = Objects.requireNonNull(task); - } - - @Override - public String toString() { - return "Delay(?)"; - } - } - - final class Async implements IO { - - private final Function1>, IO> callback; - - private Async(Function1>, IO> callback) { - this.callback = Objects.requireNonNull(callback); - } - - @Override - public String toString() { - return "Async(?)"; - } - } - - final class Suspend implements IO { - - private final Supplier> lazy; - - private Suspend(Supplier> lazy) { - this.lazy = Objects.requireNonNull(lazy); - } - - @Override - public String toString() { - return "Suspend(?)"; - } - } - - final class Recover implements IO { - - private final IO current; - private final PartialFunction> mapper; - - private Recover(IO current, PartialFunction> mapper) { - this.current = Objects.requireNonNull(current); - this.mapper = Objects.requireNonNull(mapper); - } - - @Override - public String toString() { - return "Recover(" + current + ", ?)"; - } - } -} - -interface IOConnection { - - IOConnection UNCANCELLABLE = new IOConnection() { - @Override - public boolean isCancellable() { return false; } - - @Override - public void setCancelToken(IO cancel) { } - - @Override - public void cancelNow() { } - - @Override - public void cancel() { } - - @Override - public StateIO updateState(UnaryOperator update) { return StateIO.INITIAL; } - }; - - boolean isCancellable(); - - void setCancelToken(IO cancel); - - void cancelNow(); - - void cancel(); - - StateIO updateState(UnaryOperator update); - - static IOConnection cancellable() { - return new IOConnection() { - - private IO cancelToken; - private final AtomicReference state = new AtomicReference<>(StateIO.INITIAL); - - @Override - public boolean isCancellable() { return true; } - - @Override - public void setCancelToken(IO cancel) { this.cancelToken = Objects.requireNonNull(cancel); } - - @Override - public void cancelNow() { cancelToken.runAsync(); } - - @Override - public void cancel() { - if (state.getAndUpdate(StateIO::cancellingNow).isCancelable()) { - cancelNow(); - - state.set(StateIO.CANCELLED); - } - } - - @Override - public StateIO updateState(UnaryOperator update) { - return state.updateAndGet(update::apply); - } - }; - } -} - -final class StateIO { - - public static final StateIO INITIAL = new StateIO(false, false, false); - public static final StateIO CANCELLED = new StateIO(true, false, false); - - private final boolean isCancelled; - private final boolean cancellingNow; - private final boolean startingNow; - - public StateIO(boolean isCancelled, boolean cancellingNow, boolean startingNow) { - this.isCancelled = isCancelled; - this.cancellingNow = cancellingNow; - this.startingNow = startingNow; - } - - public boolean isCancelled() { - return isCancelled; - } - - public boolean isCancellingNow() { - return cancellingNow; - } - - public boolean isStartingNow() { - return startingNow; - } - - public StateIO cancellingNow() { - return new StateIO(isCancelled, true, startingNow); - } - - public StateIO startingNow() { - return new StateIO(isCancelled, cancellingNow, true); - } - - public StateIO notStartingNow() { - return new StateIO(isCancelled, cancellingNow, false); - } - - public boolean isCancelable() { - return !isCancelled && !cancellingNow && !startingNow; - } - - public boolean isRunnable() { - return !isCancelled && !cancellingNow; - } -} - -final class CallStack { - - private StackItem top = new StackItem<>(); - - public void push() { - top.push(); - } - - public void pop() { - if (top.count() > 0) { - top.pop(); - } else { - top = top.prev(); - } - } - - public void add(PartialFunction> mapError) { - if (top.count() > 0) { - top.pop(); - top = new StackItem<>(top); - } - top.add(mapError); - } - - public Option> tryHandle(Throwable error) { - while (top != null) { - top.reset(); - Option> result = top.tryHandle(error); - - if (result.isDefined()) { - return result; - } else { - top = top.prev(); - } - } - return Option.none(); - } - - // XXX: https://www.baeldung.com/java-sneaky-throws - @SuppressWarnings("unchecked") - public R sneakyThrow(Throwable t) throws X { - throw (X) t; - } -} - -final class StackItem { - - private int count = 0; - private final Deque>> recover = new ArrayDeque<>(); - - private final StackItem prev; - - public StackItem() { - this(null); - } - - public StackItem(StackItem prev) { - this.prev = prev; - } - - public StackItem prev() { - return prev; - } - - public int count() { - return count; - } - - public void push() { - count++; - } - - public void pop() { - count--; - } - - public void reset() { - count = 0; - } - - public void add(PartialFunction> mapError) { - recover.addFirst(mapError); - } - - public Option> tryHandle(Throwable error) { - while (!recover.isEmpty()) { - var mapError = recover.removeFirst(); - if (mapError.isDefinedAt(error)) { - return Option.some(mapError.andThen(IO::narrowK).apply(error)); - } - } - return Option.none(); - } -} - -interface FutureModule { - - ScheduledExecutorService SCHEDULER = Executors.newScheduledThreadPool(0); - - static Future sleep(Executor executor, Duration delay) { - return Future.fromCompletableFuture(executor, CompletableFuture.supplyAsync(Unit::unit, delayedExecutor(delay, executor))); - } - - static Executor delayedExecutor(Duration delay, Executor executor) { - return task -> SCHEDULER.schedule(() -> executor.execute(task), delay.toMillis(), TimeUnit.MILLISECONDS); - } -} \ No newline at end of file diff --git a/lib/src/main/java/com/github/tonivade/vavr/effect/Ref.java b/lib/src/main/java/com/github/tonivade/vavr/effect/Ref.java deleted file mode 100644 index 5dd2abf..0000000 --- a/lib/src/main/java/com/github/tonivade/vavr/effect/Ref.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Copyright (c) 2022, Antonio Gabriel Muñoz Conejo - * Distributed under the terms of the MIT License - */ -package com.github.tonivade.vavr.effect; - -import java.util.Objects; -import java.util.concurrent.atomic.AtomicReference; -import java.util.function.UnaryOperator; - -import io.vavr.Function1; -import io.vavr.Tuple2; - -public final class Ref { - - private final AtomicReference value; - - private Ref(AtomicReference value) { - this.value = Objects.requireNonNull(value); - } - - public IO get() { - return IO.task(value::get); - } - - public IO set(A newValue) { - return IO.task(() -> { value.set(newValue); return Unit.unit(); }); - } - - public IO modify(Function1> change) { - return IO.task(() -> { - var loop = true; - B result = null; - while (loop) { - A current = value.get(); - var tuple = change.apply(current); - result = tuple._1(); - loop = !value.compareAndSet(current, tuple._2()); - } - return result; - }); - } - - public IO lazySet(A newValue) { - return IO.task(() -> { value.lazySet(newValue); return Unit.unit(); }); - } - - public IO getAndSet(A newValue) { - return IO.task(() -> value.getAndSet(newValue)); - } - - public IO updateAndGet(UnaryOperator update) { - return IO.task(() -> value.updateAndGet(update::apply)); - } - - public IO getAndUpdate(UnaryOperator update) { - return IO.task(() -> value.getAndUpdate(update::apply)); - } - - public static IO> make(A value) { - return IO.pure(of(value)); - } - - public static Ref of(A value) { - return new Ref<>(new AtomicReference<>(value)); - } - - @Override - public String toString() { - return String.format("Ref(%s)", value.get()); - } -} diff --git a/lib/src/main/java/com/github/tonivade/vavr/effect/Unit.java b/lib/src/main/java/com/github/tonivade/vavr/effect/Unit.java deleted file mode 100644 index 11b3583..0000000 --- a/lib/src/main/java/com/github/tonivade/vavr/effect/Unit.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Copyright (c) 2022, Antonio Gabriel Muñoz Conejo - * Distributed under the terms of the MIT License - */ -package com.github.tonivade.vavr.effect; - -import java.io.Serial; -import java.io.Serializable; - -/** - * Type that represents a single value called Unit. - */ -public final class Unit implements Serializable { - - @Serial - private static final long serialVersionUID = -8253613036328680583L; - - private static final Unit INSTANCE = new Unit(); - - private Unit() {} - - public static Unit unit() { - return INSTANCE; - } - - @Override - public String toString() { - return "Unit"; - } - - @Serial - private Object readResolve() { - return INSTANCE; - } -} diff --git a/lib/src/test/java/com/github/tonivade/vavr/effect/IOTest.java b/lib/src/test/java/com/github/tonivade/vavr/effect/IOTest.java deleted file mode 100644 index 59d9477..0000000 --- a/lib/src/test/java/com/github/tonivade/vavr/effect/IOTest.java +++ /dev/null @@ -1,394 +0,0 @@ -/* - * Copyright (c) 2022, Antonio Gabriel Muñoz Conejo - * Distributed under the terms of the MIT License - */ -package com.github.tonivade.vavr.effect; - -import static org.junit.jupiter.api.Assertions.assertAll; -import static org.junit.jupiter.api.Assertions.assertArrayEquals; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.timeout; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.time.Duration; -import java.util.NoSuchElementException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.function.Consumer; -import java.util.function.Supplier; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.Mock; -import org.mockito.junit.jupiter.MockitoExtension; -import io.vavr.CheckedFunction1; -import io.vavr.Function1; -import io.vavr.Tuple2; -import io.vavr.collection.List; -import io.vavr.collection.Seq; -import io.vavr.concurrent.Future; -import io.vavr.control.Either; -import io.vavr.control.Try; - -@ExtendWith(MockitoExtension.class) -class IOTest { - - @Test - void pure() { - IO pure = IO.pure("hola mundo"); - - assertAll( - () -> assertEquals("hola mundo", pure.unsafeRunSync()), - () -> assertEquals("HOLA MUNDO", pure.map(String::toUpperCase).unsafeRunSync()), - () -> assertArrayEquals(new String[] { "hola", "mundo" }, - pure.flatMap(string -> IO.task(() -> string.split(" "))).unsafeRunSync()), - () -> assertEquals(Integer.valueOf(100), pure.andThen(IO.task(() -> 100)).unsafeRunSync())); - } - - @Test - void asyncSuccess() { - IO async = IO.async(callback -> { - System.out.println(Thread.currentThread().getName()); - Thread.sleep(100); - callback.accept(Try.success("1")); - }); - - Future foldMap = IO.forked().andThen(async).runAsync(); - - assertEquals("1", foldMap.get()); - } - - @Test - void asyncFailure() { - IO async = IO.async(callback -> { - Thread.sleep(100); - callback.accept(Try.failure(new UnsupportedOperationException())); - }); - - Future foldMap = IO.forked().andThen(async).runAsync(); - - assertThrows(UnsupportedOperationException.class, foldMap::get); - } - -// @Test -// void echo() { -// IO echo = narrowK(console.println("write your name")) -// .andThen(narrowK(console.readln())) -// .flatMap(name -> narrowK(console.println("Hello " + name))) -// .andThen(narrowK(console.println("end"))); -// -// ConsoleExecutor executor = new ConsoleExecutor().read("Toni"); -// -// executor.run(echo); -// -// assertEquals("write your name\nHello Toni\nend\n", executor.getOutput()); -// } - - @Test - void safeRunAsync() { - IO> program = currentThreadIO(); - - Try> result = program.runAsync().await(1, TimeUnit.SECONDS).toTry(); - - assertEquals(Try.success(5), result.map(List::size)); - } - - @Test - void bracket() throws SQLException { - ResultSet resultSet = mock(ResultSet.class); - when(resultSet.getString("id")).thenReturn("value"); - - IO bracket = IO.bracket(open(resultSet), IO.liftTry(tryGetString("id"))); - - assertEquals("value", bracket.unsafeRunSync()); - verify(resultSet).close(); - } - - @Test - void safeRunAsyncSuccess(@Mock Consumer> callback) { - IO.pure("hola").safeRunAsync(callback); - - verify(callback, timeout(1000)).accept(Try.success("hola")); - } - - @Test - void unsafeRunAsyncFailure(@Mock Consumer> callback) { - RuntimeException error = new RuntimeException(); - - IO.raiseError(error).safeRunAsync(callback); - - verify(callback, timeout(1000)).accept(Try.failure(error)); - } - - @Test - void recover() { - IO recover = IO.raiseError(new RuntimeException()).recover(error -> "hola mundo"); - - assertEquals("hola mundo", recover.unsafeRunSync()); - } - - @Test - void recoverWith() { - IO recover = IO.raiseError(new IllegalArgumentException()) - .recover(IllegalArgumentException.class, error -> "hola mundo"); - - assertEquals("hola mundo", recover.unsafeRunSync()); - } - - @Test - void recoverWithNotMatch() { - IO recover = IO.raiseError(new IllegalArgumentException()) - .recover(NoSuchElementException.class, error -> "hola mundo"); - - assertThrows(IllegalArgumentException.class, recover::unsafeRunSync); - } - - @Test - void retry(@Mock Supplier computation) { - when(computation.get()).thenThrow(UnsupportedOperationException.class); - - Try retry = IO.task(computation).retry().safeRunSync(); - - assertTrue(retry.isFailure()); - verify(computation, times(2)).get(); - } - - @Test - void retryFailure(@Mock Supplier computation) { - when(computation.get()).thenThrow(UnsupportedOperationException.class); - - Try retry = IO.task(computation).retry(Duration.ofMillis(100), 3).safeRunSync(); - - assertTrue(retry.isFailure()); - verify(computation, times(4)).get(); - } - - @Test - void retrySuccess(@Mock Supplier computation) { - when(computation.get()) - .thenThrow(UnsupportedOperationException.class) - .thenThrow(UnsupportedOperationException.class) - .thenThrow(UnsupportedOperationException.class) - .thenReturn("hola"); - - Try retry = IO.task(computation).retry(Duration.ofMillis(100), 3).safeRunSync(); - - assertEquals("hola", retry.get()); - verify(computation, times(4)).get(); - } - - @Test - void repeatSuccess(@Mock Supplier computation) { - when(computation.get()).thenReturn("hola"); - - Try repeat = IO.task(computation).repeat(Duration.ofMillis(100), 3).safeRunSync(); - - assertEquals("hola", repeat.get()); - verify(computation, times(4)).get(); - } - - @Test - void repeatFailure(@Mock Supplier computation) { - when(computation.get()).thenReturn("hola").thenThrow(UnsupportedOperationException.class); - - Try repeat = IO.task(computation).repeat(Duration.ofMillis(100), 3).safeRunSync(); - - assertTrue(repeat.isFailure()); - verify(computation, times(2)).get(); - } - - @Test - void repeat(@Mock Supplier computation) { - when(computation.get()).thenReturn("hola"); - - Try repeat = IO.task(computation).repeat().safeRunSync(); - - assertEquals("hola", repeat.get()); - verify(computation, times(2)).get(); - } - - @Test - void flatMapped() { - IO io = IO.unit() - .map(ignore -> "hola") - .map(ignore -> "hola") - .map(ignore -> "hola") - .map(ignore -> "adios"); - - assertEquals("adios", io.unsafeRunSync()); - } - - @Test - void stackSafety() { - IO sum = sum(100000, 0); - - Future futureSum = sum.runAsync(); - - assertEquals(705082704, sum.unsafeRunSync()); - assertEquals(Try.success(705082704), futureSum.await(1, TimeUnit.SECONDS).toTry()); - } - - @Test - void timed() { - IO> sum = sum(100000, 0).timed(); - - Tuple2 result = sum.unsafeRunSync(); - - assertEquals(705082704, result._2()); - assertTrue(result._1().toMillis() > 0); - } - - @Test - void timeoutFail() { - assertThrows(TimeoutException.class, IO.never().timeout(Duration.ofSeconds(1))::unsafeRunSync); - } - - @Test - void timeoutSuccess() { - assertEquals(1, IO.pure(1).timeout(Duration.ofSeconds(1)).unsafeRunSync()); - } - - @Test - void traverse() { - IO left = IO.task(() -> "left"); - IO right = IO.task(() -> "right"); - - IO> traverse = IO.traverse(List.of(left, right)); - - assertEquals(List.of("left", "right"), traverse.unsafeRunSync()); - } - - @Test - void raceA() { - IO> race = IO.race( - IO.delay(Duration.ofMillis(10), () -> 10), - IO.delay(Duration.ofMillis(100), () -> "b")); - - Either orElseThrow = race.unsafeRunSync(); - - assertEquals(Either.left(10), orElseThrow); - } - - @Test - void raceB() { - IO> race = IO.race( - IO.delay(Duration.ofMillis(100), () -> 10), - IO.delay(Duration.ofMillis(10), () -> "b")); - - Either orElseThrow = race.unsafeRunSync(); - - assertEquals(Either.right("b"), orElseThrow); - } - - @Test - void fork() { - IO result = IO.pure("hola") - .flatMap(hello -> IO.delay(Duration.ofSeconds(1), () -> hello + " toni").fork()) - .flatMap(Fiber::join); - - String orElseThrow = result.runAsync().get(); - - assertEquals("hola toni", orElseThrow); - } - - @Test - void memoize(@Mock Function1 toUpperCase) { - when(toUpperCase.apply(any())) - .thenAnswer(args -> args.getArgument(0, String.class).toUpperCase()); - - IO>> memoized = IO.memoize((String str) -> IO.pure(toUpperCase.apply(str))); - - IO flatMap = memoized.flatMap(x -> x.apply("hola")); - flatMap.unsafeRunSync(); - flatMap.unsafeRunSync(); - flatMap.unsafeRunSync(); - flatMap.unsafeRunSync(); - - verify(toUpperCase).apply("hola"); - } - - @Test - void fibSyncTest() { - assertAll( - () -> assertEquals(1, fibSync(1).unsafeRunSync()), - () -> assertEquals(1, fibSync(2).unsafeRunSync()), - () -> assertEquals(2, fibSync(3).unsafeRunSync()), - () -> assertEquals(3, fibSync(4).unsafeRunSync()), - () -> assertEquals(5, fibSync(5).unsafeRunSync()), - () -> assertEquals(8, fibSync(6).unsafeRunSync()), - () -> assertEquals(13, fibSync(7).unsafeRunSync()), - () -> assertEquals(21, fibSync(8).unsafeRunSync()), - () -> assertEquals(55, fibSync(10).unsafeRunSync()), - () -> assertEquals(6765, fibSync(20).unsafeRunSync()) - ); - } - - @Test - void fibAsyncTest() { - assertAll( - () -> assertEquals(1, fibAsync(1).unsafeRunSync()), - () -> assertEquals(1, fibAsync(2).unsafeRunSync()), - () -> assertEquals(2, fibAsync(3).unsafeRunSync()), - () -> assertEquals(3, fibAsync(4).unsafeRunSync()), - () -> assertEquals(5, fibAsync(5).unsafeRunSync()), - () -> assertEquals(8, fibAsync(6).unsafeRunSync()), - () -> assertEquals(13, fibAsync(7).unsafeRunSync()), - () -> assertEquals(21, fibAsync(8).unsafeRunSync()), - () -> assertEquals(55, fibAsync(10).unsafeRunSync()), - () -> assertEquals(6765, fibAsync(20).unsafeRunSync()) - ); - } - - private IO fibSync(int number) { - if (number < 2) { - return IO.pure(number); - } - var number1 = fibSync(number - 1); - var number2 = fibSync(number - 2); - return number1.flatMap(x -> number2.map(y -> x + y)); - } - - private IO fibAsync(int number) { - if (number < 2) { - return IO.pure(number); - } - return IO.parMap2(fibAsync(number - 1), fibAsync(number - 2), Integer::sum); - } - - private IO open(ResultSet resultSet) { - return IO.pure(resultSet); - } - - private Function1> tryGetString(String column) { - return rs -> Try.of(() -> getString(column).apply(rs)); - } - - private CheckedFunction1 getString(String column) { - return resultSet -> resultSet.getString(column); - } - - private IO sum(Integer n, Integer sum) { - if (n == 0) { - return IO.pure(sum); - } - return IO.suspend(() -> sum(n - 1, sum + n)); - } - - private IO> currentThreadIO() { - Ref> ref = Ref.of(List.empty()); - IO> currentThread = - ref.updateAndGet(list -> list.append(Thread.currentThread().getName())); - - return currentThread - .andThen(currentThread - .andThen(currentThread - .andThen(currentThread - .andThen(currentThread)))); - } -} diff --git a/settings.gradle b/settings.gradle index 6218084..309d89a 100644 --- a/settings.gradle +++ b/settings.gradle @@ -1,11 +1 @@ -/* - * This file was generated by the Gradle 'init' task. - * - * The settings file is used to specify which projects to include in your build. - * - * Detailed information about configuring a multi-project build in Gradle can be found - * in the user manual at https://docs.gradle.org/7.4.2/userguide/multi_project_builds.html - */ - rootProject.name = 'vavr-effect' -include('lib') diff --git a/src/main/java/com/github/tonivade/vavr/effect/Fiber.java b/src/main/java/com/github/tonivade/vavr/effect/Fiber.java new file mode 100644 index 0000000..d934c88 --- /dev/null +++ b/src/main/java/com/github/tonivade/vavr/effect/Fiber.java @@ -0,0 +1,18 @@ +/* + * Copyright (c) 2022, Antonio Gabriel Muñoz Conejo + * Distributed under the terms of the MIT License + */ +package com.github.tonivade.vavr.effect; + +public sealed interface Fiber { + + IO join(); + + IO cancel(); + + static Fiber of(IO join, IO cancel) { + return new FiberImpl<>(join, cancel); + } + + record FiberImpl(IO join, IO cancel) implements Fiber {} +} \ No newline at end of file diff --git a/src/main/java/com/github/tonivade/vavr/effect/IO.java b/src/main/java/com/github/tonivade/vavr/effect/IO.java new file mode 100644 index 0000000..aa179af --- /dev/null +++ b/src/main/java/com/github/tonivade/vavr/effect/IO.java @@ -0,0 +1,880 @@ +/* + * Copyright (c) 2022, Antonio Gabriel Muñoz Conejo + * Distributed under the terms of the MIT License + */ +package com.github.tonivade.vavr.effect; + +import static io.vavr.Function1.identity; +import java.time.Duration; +import java.util.ArrayDeque; +import java.util.Deque; +import java.util.NoSuchElementException; +import java.util.Objects; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; +import java.util.function.Predicate; +import java.util.function.Supplier; +import java.util.function.UnaryOperator; +import io.vavr.CheckedConsumer; +import io.vavr.CheckedRunnable; +import io.vavr.Function1; +import io.vavr.Function2; +import io.vavr.PartialFunction; +import io.vavr.Tuple; +import io.vavr.Tuple2; +import io.vavr.collection.HashMap; +import io.vavr.collection.List; +import io.vavr.collection.Seq; +import io.vavr.concurrent.Future; +import io.vavr.concurrent.Promise; +import io.vavr.control.Either; +import io.vavr.control.Option; +import io.vavr.control.Try; + +public sealed interface IO { + + IO UNIT = pure(Unit.unit()); + + default Future runAsync() { + return runAsync(this, IOConnection.UNCANCELLABLE).future(); + } + + default Future runAsync(Executor executor) { + return forked(executor).andThen(this).runAsync(); + } + + default T unsafeRunSync() { + return safeRunSync().get(); + } + + default Try safeRunSync() { + return runAsync().toTry(); + } + + default void safeRunAsync(Consumer> callback) { + safeRunAsync(Future.DEFAULT_EXECUTOR, callback); + } + + default void safeRunAsync(Executor executor, Consumer> callback) { + runAsync(executor).onComplete(callback); + } + + default IO map(Function1 map) { + return flatMap(map.andThen(IO::pure)); + } + + default IO flatMap(Function1> map) { + return new FlatMapped<>(this, map.andThen(IO::narrowK)); + } + + default IO andThen(IO after) { + return flatMap(ignore -> after); + } + + default IO ap(IO> apply) { + return parMap2(Future.DEFAULT_EXECUTOR, this, apply, (v, a) -> a.apply(v)); + } + + default IO> attempt() { + return map(Try::success).recover(Try::failure); + } + + default IO> either() { + return attempt().map(Try::toEither); + } + + default IO> either(Function1 mapError, + Function1 mapper) { + return either().map(either -> either.bimap(mapError, mapper)); + } + + default IO redeem(Function1 mapError, + Function1 mapper) { + return attempt().map(result -> result.fold(mapError, mapper)); + } + + default IO redeemWith(Function1> mapError, + Function1> mapper) { + return attempt().flatMap(result -> result.fold(mapError, mapper)); + } + + default IO recover(Function1 mapError) { + return recoverWith(partialFunction(x -> true, mapError.andThen(IO::pure))); + } + + @SuppressWarnings("unchecked") + default IO recover(Class type, Function1 function) { + return recoverWith(partialFunction(error -> error.getClass().equals(type), t -> function.andThen(IO::pure).apply((X) t))); + } + + @SuppressWarnings("serial") + static PartialFunction partialFunction(Predicate matcher, Function1 function) { + return new PartialFunction<>() { + + @Override + public boolean isDefinedAt(A value) { + return matcher.test(value); + } + + @Override + public B apply(A t) { + return function.apply(t); + } + }; + } + + default IO recoverWith(PartialFunction> mapper) { + return new Recover<>(this, mapper); + } + + default IO> timed() { + return IO.task(System::nanoTime).flatMap( + start -> map(result -> Tuple.of(Duration.ofNanos(System.nanoTime() - start), result))); + } + + default IO> fork() { + return async(callback -> { + IOConnection connection = IOConnection.cancellable(); + Promise promise = runAsync(this, connection); + + IO join = fromFuture(promise.future()); + IO cancel = exec(connection::cancel); + + callback.accept(Try.success(Fiber.of(join, cancel))); + }); + } + + default IO timeout(Duration duration) { + return timeout(Future.DEFAULT_EXECUTOR, duration); + } + + default IO timeout(Executor executor, Duration duration) { + return racePair(executor, this, sleep(duration)).flatMap(either -> either.fold( + ta -> ta._2().cancel().map(x -> ta._1()), + tb -> tb._1().cancel().flatMap(x -> IO.raiseError(new TimeoutException())))); + } + + default IO repeat() { + return repeat(1); + } + + default IO repeat(int times) { + return repeat(this, unit(), times); + } + + default IO repeat(Duration delay) { + return repeat(delay, 1); + } + + default IO repeat(Duration delay, int times) { + return repeat(this, sleep(delay), times); + } + + default IO retry() { + return retry(1); + } + + default IO retry(int maxRetries) { + return retry(this, unit(), maxRetries); + } + + default IO retry(Duration delay) { + return retry(delay, 1); + } + + default IO retry(Duration delay, int maxRetries) { + return retry(this, sleep(delay), maxRetries); + } + + @SuppressWarnings("unchecked") + static IO narrowK(IO value) { + return (IO) value; + } + + static IO pure(T value) { + return new Pure<>(value); + } + + static IO> race(IO fa, IO fb) { + return race(Future.DEFAULT_EXECUTOR, fa, fb); + } + + static IO> race(Executor executor, IO fa, IO fb) { + return racePair(executor, fa, fb).flatMap(either -> either.fold( + ta -> ta._2().cancel().map(x -> Either.left(ta._1())), + tb -> tb._1().cancel().map(x -> Either.right(tb._2())))); + } + + static IO>, Tuple2, B>>> racePair(Executor executor, IO fa, IO fb) { + return cancellable(callback -> { + + IOConnection connection1 = IOConnection.cancellable(); + IOConnection connection2 = IOConnection.cancellable(); + + Promise promiseA = runAsync(IO.forked(executor).andThen(fa), connection1); + Promise promiseB = runAsync(IO.forked(executor).andThen(fb), connection2); + + promiseA.future().onComplete(result -> callback.accept( + result.map(a -> Either.left(Tuple.of(a, Fiber.of(IO.fromFuture(promiseB.future()), IO.exec(connection2::cancel))))))); + promiseB.future().onComplete(result -> callback.accept( + result.map(b -> Either.right(Tuple.of(Fiber.of(IO.fromFuture(promiseA.future()), IO.exec(connection2::cancel)), b))))); + + return IO.exec(() -> { + try { + connection1.cancel(); + } finally { + connection2.cancel(); + } + }); + }); + } + + static IO raiseError(Throwable error) { + return new Failure<>(error); + } + + static IO delay(Duration delay, Supplier lazy) { + return sleep(delay).andThen(task(lazy)); + } + + static IO suspend(Supplier> lazy) { + return new Suspend<>(lazy); + } + + static Function1> lift(Function1 task) { + return task.andThen(IO::pure); + } + + public static Function1> liftOption(Function1> function) { + return value -> fromOption(function.apply(value)); + } + + public static Function1> liftTry(Function1> function) { + return value -> fromTry(function.apply(value)); + } + + public static Function1> liftEither(Function1> function) { + return value -> fromEither(function.apply(value)); + } + + static IO fromOption(Option task) { + return fromEither(toEither(task)); + } + + static IO fromTry(Try task) { + return fromEither(task.toEither()); + } + + static IO fromEither(Either task) { + return task.fold(IO::raiseError, IO::pure); + } + + static IO fromFuture(Future promise) { + CheckedConsumer>> callback = promise::onComplete; + return async(callback); + } + + static IO fromCompletableFuture(CompletableFuture promise) { + return fromFuture(Future.fromCompletableFuture(promise)); + } + + static IO sleep(Duration duration) { + return sleep(Future.DEFAULT_EXECUTOR, duration); + } + + static IO sleep(Executor executor, Duration duration) { + return cancellable(callback -> { + Future sleep = FutureModule.sleep(executor,duration) + .onComplete(result -> callback.accept(Try.success(Unit.unit()))); + return IO.exec(() -> sleep.cancel(true)); + }); + } + + static IO exec(CheckedRunnable task) { + return task(asSupplier(task)); + } + + static IO task(Supplier producer) { + return new Delay<>(producer); + } + + static IO never() { + return async(callback -> {}); + } + + static IO forked() { + return forked(Future.DEFAULT_EXECUTOR); + } + + static IO forked(Executor executor) { + return async(callback -> executor.execute(() -> callback.accept(Try.success(Unit.unit())))); + } + + static IO async(CheckedConsumer>> callback) { + return cancellable(asFunction(callback)); + } + + static IO cancellable(Function1>, IO> callback) { + return new Async<>(callback); + } + + static IO>> memoize(Function1> function) { + return memoize(Future.DEFAULT_EXECUTOR, function); + } + + static IO>> memoize(Executor executor, Function1> function) { + var ref = Ref.make(HashMap.>empty()); + return ref.map(r -> { + Function1>> result = a -> r.modify(map -> map.get(a).fold(() -> { + Promise promise = Promise.make(); + function.apply(a).safeRunAsync(executor, promise::tryComplete); + return Tuple.of(IO.fromFuture(promise.future()), map.put(a, promise)); + }, promise -> Tuple.of(IO.fromFuture(promise.future()), map))); + return result.andThen(io -> io.flatMap(identity())); + }); + } + + static IO unit() { + return UNIT; + } + + static IO bracket(IO acquire, + Function1> use, Function1> release) { + return cancellable(callback -> { + + IOConnection cancellable = IOConnection.cancellable(); + + Promise promise = runAsync(acquire, cancellable); + + promise.future() + .onFailure(error -> callback.accept(Try.failure(error))) + .onSuccess(resource -> runAsync(use.andThen(IO::narrowK).apply(resource), cancellable).future() + .onComplete(result -> runAsync(release.andThen(IO::narrowK).apply(resource), cancellable).future() + .onComplete(ignore -> callback.accept(result)) + )); + + return IO.exec(cancellable::cancel); + }); + } + + static IO bracket(IO acquire, + Function1> use, CheckedConsumer release) { + return bracket(acquire, use, asFunction(release)); + } + + static IO bracket(IO acquire, + Function1> use) { + return bracket(acquire, use, AutoCloseable::close); + } + + static IO sequence(Seq> sequence) { + IO initial = IO.unit(); + return sequence.foldLeft(initial, (IO a, IO b) -> a.andThen(b)).andThen(IO.unit()); + } + + static IO> traverse(Seq> sequence) { + return traverse(Future.DEFAULT_EXECUTOR, sequence); + } + + static IO> traverse(Executor executor, Seq> sequence) { + return sequence.foldLeft(pure(List.empty()), + (IO> xs, IO a) -> parMap2(executor, xs, a, Seq::append)); + } + + static IO parMap2(IO fa, IO fb, + Function2 mapper) { + return parMap2(Future.DEFAULT_EXECUTOR, fa, fb, mapper); + } + + static IO parMap2(Executor executor, IO fa, IO fb, + Function2 mapper) { + return cancellable(callback -> { + + IOConnection connection1 = IOConnection.cancellable(); + IOConnection connection2 = IOConnection.cancellable(); + + Promise promiseA = runAsync(IO.forked(executor).andThen(fa), connection1); + Promise promiseB = runAsync(IO.forked(executor).andThen(fb), connection2); + + promiseA.future().onComplete(a -> promiseB.future().onComplete(b -> callback.accept(map2(a, b, mapper)))); + + return IO.exec(() -> { + try { + connection1.cancel(); + } finally { + connection2.cancel(); + } + }); + }); + } + + static IO> tuple(IO fa, IO fb) { + return tuple(Future.DEFAULT_EXECUTOR, fa, fb); + } + + static IO> tuple(Executor executor, IO fa, IO fb) { + return parMap2(executor, fa, fb, Tuple::of); + } + + private static Promise runAsync(IO current, IOConnection connection) { + return runAsync(current, connection, new CallStack<>(), Promise.make()); + } + + private static Try map2(Try a, Try b, Function2 mapper) { + return a.flatMap(x -> b.map(y -> mapper.apply(x, y))); + } + + private static Either toEither(Option task) { + return task.fold(() -> Either.left(new NoSuchElementException()), Either::right); + } + + private static Supplier asSupplier(CheckedRunnable task) { + return () -> { + task.unchecked().run(); + return Unit.unit(); + }; + } + + private static Function1> asFunction(CheckedConsumer release) { + return t -> { + release.unchecked().accept(t); + return unit(); + }; + } + + @SuppressWarnings("unchecked") + private static Promise runAsync(IO current, IOConnection connection, CallStack stack, Promise promise) { + while (true) { + try { + current = unwrap(current, stack, identity()); + + if (current instanceof Pure pure) { + return promise.success(pure.value); + } + + if (current instanceof Async async) { + return executeAsync(async, connection, promise); + } + + if (current instanceof FlatMapped) { + stack.push(); + + var flatMapped = (FlatMapped) current; + IO source = IO.narrowK(unwrap(flatMapped.current, stack, u -> u.flatMap(flatMapped.next))); + + if (source instanceof Async async) { + Promise nextPromise = Promise.make(); + + nextPromise.future().andThen(tryU -> tryU.onFailure(promise::failure) + .onSuccess(u -> runAsync(IO.narrowK(flatMapped.next.apply(u)), connection, stack, promise))); + + executeAsync(async, connection, nextPromise); + + return promise; + } + + if (source instanceof Pure pure) { + Function1> andThen = flatMapped.next.andThen(IO::narrowK); + current = andThen.apply(pure.value); + } else if (source instanceof FlatMapped) { + FlatMapped flatMapped2 = (FlatMapped) source; + current = flatMapped2.current.flatMap(a -> flatMapped2.next.apply(a).flatMap(flatMapped.next)); + } + } else { + stack.pop(); + } + } catch (Throwable error) { + Option> result = stack.tryHandle(error); + + if (result.isDefined()) { + current = result.get(); + } else { + return promise.failure(error); + } + } + } + } + + private static IO unwrap(IO current, CallStack stack, Function1, IO> next) { + while (true) { + if (current instanceof Failure failure) { + return stack.sneakyThrow(failure.error); + } else if (current instanceof Recover recover) { + stack.add(partialFunction(recover.mapper::isDefinedAt, recover.mapper.andThen(next))); + current = recover.current; + } else if (current instanceof Suspend suspend) { + Supplier> andThen = () -> IO.narrowK(suspend.lazy.get()); + current = andThen.get(); + } else if (current instanceof Delay delay) { + return IO.pure(delay.task.get()); + } else if (current instanceof Pure) { + return current; + } else if (current instanceof FlatMapped) { + return current; + } else if (current instanceof Async) { + return current; + } else { + throw new IllegalStateException(); + } + } + } + + private static Promise executeAsync(Async current, IOConnection connection, Promise promise) { + if (connection.isCancellable() && !connection.updateState(StateIO::startingNow).isRunnable()) { + return promise.complete(Try.failure(new CancellationException())); + } + + connection.setCancelToken(current.callback.apply(promise::tryComplete)); + + promise.future().andThen(x -> connection.setCancelToken(UNIT)); + + if (connection.isCancellable() && connection.updateState(StateIO::notStartingNow).isCancellingNow()) { + connection.cancelNow(); + } + + return promise; + } + + private static IO repeat(IO self, IO pause, int times) { + return self.redeemWith(IO::raiseError, value -> { + if (times > 0) { + return pause.andThen(repeat(self, pause, times - 1)); + } else return IO.pure(value); + }); + } + + private static IO retry(IO self, IO pause, int maxRetries) { + return self.redeemWith(error -> { + if (maxRetries > 0) { + return pause.andThen(retry(self, pause.repeat(), maxRetries - 1)); + } else return IO.raiseError(error); + }, IO::pure); + } + + final class Pure implements IO { + + private final T value; + + private Pure(T value) { + this.value = Objects.requireNonNull(value); + } + + @Override + public String toString() { + return "Pure(" + value + ")"; + } + } + + final class Failure implements IO { + + private final Throwable error; + + private Failure(Throwable error) { + this.error = Objects.requireNonNull(error); + } + + @Override + public String toString() { + return "Failure(" + error + ")"; + } + } + + final class FlatMapped implements IO { + + private final IO current; + private final Function1> next; + + private FlatMapped(IO current, + Function1> next) { + this.current = Objects.requireNonNull(current); + this.next = Objects.requireNonNull(next); + } + + @Override + public String toString() { + return "FlatMapped(" + current + ", ?)"; + } + } + + final class Delay implements IO { + + private final Supplier task; + + private Delay(Supplier task) { + this.task = Objects.requireNonNull(task); + } + + @Override + public String toString() { + return "Delay(?)"; + } + } + + final class Async implements IO { + + private final Function1>, IO> callback; + + private Async(Function1>, IO> callback) { + this.callback = Objects.requireNonNull(callback); + } + + @Override + public String toString() { + return "Async(?)"; + } + } + + final class Suspend implements IO { + + private final Supplier> lazy; + + private Suspend(Supplier> lazy) { + this.lazy = Objects.requireNonNull(lazy); + } + + @Override + public String toString() { + return "Suspend(?)"; + } + } + + final class Recover implements IO { + + private final IO current; + private final PartialFunction> mapper; + + private Recover(IO current, PartialFunction> mapper) { + this.current = Objects.requireNonNull(current); + this.mapper = Objects.requireNonNull(mapper); + } + + @Override + public String toString() { + return "Recover(" + current + ", ?)"; + } + } +} + +interface IOConnection { + + IOConnection UNCANCELLABLE = new IOConnection() { + @Override + public boolean isCancellable() { return false; } + + @Override + public void setCancelToken(IO cancel) { } + + @Override + public void cancelNow() { } + + @Override + public void cancel() { } + + @Override + public StateIO updateState(UnaryOperator update) { return StateIO.INITIAL; } + }; + + boolean isCancellable(); + + void setCancelToken(IO cancel); + + void cancelNow(); + + void cancel(); + + StateIO updateState(UnaryOperator update); + + static IOConnection cancellable() { + return new IOConnection() { + + private IO cancelToken; + private final AtomicReference state = new AtomicReference<>(StateIO.INITIAL); + + @Override + public boolean isCancellable() { return true; } + + @Override + public void setCancelToken(IO cancel) { this.cancelToken = Objects.requireNonNull(cancel); } + + @Override + public void cancelNow() { cancelToken.runAsync(); } + + @Override + public void cancel() { + if (state.getAndUpdate(StateIO::cancellingNow).isCancelable()) { + cancelNow(); + + state.set(StateIO.CANCELLED); + } + } + + @Override + public StateIO updateState(UnaryOperator update) { + return state.updateAndGet(update::apply); + } + }; + } +} + +final class StateIO { + + public static final StateIO INITIAL = new StateIO(false, false, false); + public static final StateIO CANCELLED = new StateIO(true, false, false); + + private final boolean isCancelled; + private final boolean cancellingNow; + private final boolean startingNow; + + public StateIO(boolean isCancelled, boolean cancellingNow, boolean startingNow) { + this.isCancelled = isCancelled; + this.cancellingNow = cancellingNow; + this.startingNow = startingNow; + } + + public boolean isCancelled() { + return isCancelled; + } + + public boolean isCancellingNow() { + return cancellingNow; + } + + public boolean isStartingNow() { + return startingNow; + } + + public StateIO cancellingNow() { + return new StateIO(isCancelled, true, startingNow); + } + + public StateIO startingNow() { + return new StateIO(isCancelled, cancellingNow, true); + } + + public StateIO notStartingNow() { + return new StateIO(isCancelled, cancellingNow, false); + } + + public boolean isCancelable() { + return !isCancelled && !cancellingNow && !startingNow; + } + + public boolean isRunnable() { + return !isCancelled && !cancellingNow; + } +} + +final class CallStack { + + private StackItem top = new StackItem<>(); + + public void push() { + top.push(); + } + + public void pop() { + if (top.count() > 0) { + top.pop(); + } else { + top = top.prev(); + } + } + + public void add(PartialFunction> mapError) { + if (top.count() > 0) { + top.pop(); + top = new StackItem<>(top); + } + top.add(mapError); + } + + public Option> tryHandle(Throwable error) { + while (top != null) { + top.reset(); + Option> result = top.tryHandle(error); + + if (result.isDefined()) { + return result; + } else { + top = top.prev(); + } + } + return Option.none(); + } + + // XXX: https://www.baeldung.com/java-sneaky-throws + @SuppressWarnings("unchecked") + public R sneakyThrow(Throwable t) throws X { + throw (X) t; + } +} + +final class StackItem { + + private int count = 0; + private final Deque>> recover = new ArrayDeque<>(); + + private final StackItem prev; + + public StackItem() { + this(null); + } + + public StackItem(StackItem prev) { + this.prev = prev; + } + + public StackItem prev() { + return prev; + } + + public int count() { + return count; + } + + public void push() { + count++; + } + + public void pop() { + count--; + } + + public void reset() { + count = 0; + } + + public void add(PartialFunction> mapError) { + recover.addFirst(mapError); + } + + public Option> tryHandle(Throwable error) { + while (!recover.isEmpty()) { + var mapError = recover.removeFirst(); + if (mapError.isDefinedAt(error)) { + return Option.some(mapError.andThen(IO::narrowK).apply(error)); + } + } + return Option.none(); + } +} + +interface FutureModule { + + ScheduledExecutorService SCHEDULER = Executors.newScheduledThreadPool(0); + + static Future sleep(Executor executor, Duration delay) { + return Future.fromCompletableFuture(executor, CompletableFuture.supplyAsync(Unit::unit, delayedExecutor(delay, executor))); + } + + static Executor delayedExecutor(Duration delay, Executor executor) { + return task -> SCHEDULER.schedule(() -> executor.execute(task), delay.toMillis(), TimeUnit.MILLISECONDS); + } +} \ No newline at end of file diff --git a/src/main/java/com/github/tonivade/vavr/effect/Ref.java b/src/main/java/com/github/tonivade/vavr/effect/Ref.java new file mode 100644 index 0000000..5dd2abf --- /dev/null +++ b/src/main/java/com/github/tonivade/vavr/effect/Ref.java @@ -0,0 +1,72 @@ +/* + * Copyright (c) 2022, Antonio Gabriel Muñoz Conejo + * Distributed under the terms of the MIT License + */ +package com.github.tonivade.vavr.effect; + +import java.util.Objects; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.UnaryOperator; + +import io.vavr.Function1; +import io.vavr.Tuple2; + +public final class Ref { + + private final AtomicReference value; + + private Ref(AtomicReference value) { + this.value = Objects.requireNonNull(value); + } + + public IO get() { + return IO.task(value::get); + } + + public IO set(A newValue) { + return IO.task(() -> { value.set(newValue); return Unit.unit(); }); + } + + public IO modify(Function1> change) { + return IO.task(() -> { + var loop = true; + B result = null; + while (loop) { + A current = value.get(); + var tuple = change.apply(current); + result = tuple._1(); + loop = !value.compareAndSet(current, tuple._2()); + } + return result; + }); + } + + public IO lazySet(A newValue) { + return IO.task(() -> { value.lazySet(newValue); return Unit.unit(); }); + } + + public IO getAndSet(A newValue) { + return IO.task(() -> value.getAndSet(newValue)); + } + + public IO updateAndGet(UnaryOperator update) { + return IO.task(() -> value.updateAndGet(update::apply)); + } + + public IO getAndUpdate(UnaryOperator update) { + return IO.task(() -> value.getAndUpdate(update::apply)); + } + + public static IO> make(A value) { + return IO.pure(of(value)); + } + + public static Ref of(A value) { + return new Ref<>(new AtomicReference<>(value)); + } + + @Override + public String toString() { + return String.format("Ref(%s)", value.get()); + } +} diff --git a/src/main/java/com/github/tonivade/vavr/effect/Unit.java b/src/main/java/com/github/tonivade/vavr/effect/Unit.java new file mode 100644 index 0000000..11b3583 --- /dev/null +++ b/src/main/java/com/github/tonivade/vavr/effect/Unit.java @@ -0,0 +1,35 @@ +/* + * Copyright (c) 2022, Antonio Gabriel Muñoz Conejo + * Distributed under the terms of the MIT License + */ +package com.github.tonivade.vavr.effect; + +import java.io.Serial; +import java.io.Serializable; + +/** + * Type that represents a single value called Unit. + */ +public final class Unit implements Serializable { + + @Serial + private static final long serialVersionUID = -8253613036328680583L; + + private static final Unit INSTANCE = new Unit(); + + private Unit() {} + + public static Unit unit() { + return INSTANCE; + } + + @Override + public String toString() { + return "Unit"; + } + + @Serial + private Object readResolve() { + return INSTANCE; + } +} diff --git a/src/test/java/com/github/tonivade/vavr/effect/IOTest.java b/src/test/java/com/github/tonivade/vavr/effect/IOTest.java new file mode 100644 index 0000000..59d9477 --- /dev/null +++ b/src/test/java/com/github/tonivade/vavr/effect/IOTest.java @@ -0,0 +1,394 @@ +/* + * Copyright (c) 2022, Antonio Gabriel Muñoz Conejo + * Distributed under the terms of the MIT License + */ +package com.github.tonivade.vavr.effect; + +import static org.junit.jupiter.api.Assertions.assertAll; +import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.timeout; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.time.Duration; +import java.util.NoSuchElementException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.function.Consumer; +import java.util.function.Supplier; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import io.vavr.CheckedFunction1; +import io.vavr.Function1; +import io.vavr.Tuple2; +import io.vavr.collection.List; +import io.vavr.collection.Seq; +import io.vavr.concurrent.Future; +import io.vavr.control.Either; +import io.vavr.control.Try; + +@ExtendWith(MockitoExtension.class) +class IOTest { + + @Test + void pure() { + IO pure = IO.pure("hola mundo"); + + assertAll( + () -> assertEquals("hola mundo", pure.unsafeRunSync()), + () -> assertEquals("HOLA MUNDO", pure.map(String::toUpperCase).unsafeRunSync()), + () -> assertArrayEquals(new String[] { "hola", "mundo" }, + pure.flatMap(string -> IO.task(() -> string.split(" "))).unsafeRunSync()), + () -> assertEquals(Integer.valueOf(100), pure.andThen(IO.task(() -> 100)).unsafeRunSync())); + } + + @Test + void asyncSuccess() { + IO async = IO.async(callback -> { + System.out.println(Thread.currentThread().getName()); + Thread.sleep(100); + callback.accept(Try.success("1")); + }); + + Future foldMap = IO.forked().andThen(async).runAsync(); + + assertEquals("1", foldMap.get()); + } + + @Test + void asyncFailure() { + IO async = IO.async(callback -> { + Thread.sleep(100); + callback.accept(Try.failure(new UnsupportedOperationException())); + }); + + Future foldMap = IO.forked().andThen(async).runAsync(); + + assertThrows(UnsupportedOperationException.class, foldMap::get); + } + +// @Test +// void echo() { +// IO echo = narrowK(console.println("write your name")) +// .andThen(narrowK(console.readln())) +// .flatMap(name -> narrowK(console.println("Hello " + name))) +// .andThen(narrowK(console.println("end"))); +// +// ConsoleExecutor executor = new ConsoleExecutor().read("Toni"); +// +// executor.run(echo); +// +// assertEquals("write your name\nHello Toni\nend\n", executor.getOutput()); +// } + + @Test + void safeRunAsync() { + IO> program = currentThreadIO(); + + Try> result = program.runAsync().await(1, TimeUnit.SECONDS).toTry(); + + assertEquals(Try.success(5), result.map(List::size)); + } + + @Test + void bracket() throws SQLException { + ResultSet resultSet = mock(ResultSet.class); + when(resultSet.getString("id")).thenReturn("value"); + + IO bracket = IO.bracket(open(resultSet), IO.liftTry(tryGetString("id"))); + + assertEquals("value", bracket.unsafeRunSync()); + verify(resultSet).close(); + } + + @Test + void safeRunAsyncSuccess(@Mock Consumer> callback) { + IO.pure("hola").safeRunAsync(callback); + + verify(callback, timeout(1000)).accept(Try.success("hola")); + } + + @Test + void unsafeRunAsyncFailure(@Mock Consumer> callback) { + RuntimeException error = new RuntimeException(); + + IO.raiseError(error).safeRunAsync(callback); + + verify(callback, timeout(1000)).accept(Try.failure(error)); + } + + @Test + void recover() { + IO recover = IO.raiseError(new RuntimeException()).recover(error -> "hola mundo"); + + assertEquals("hola mundo", recover.unsafeRunSync()); + } + + @Test + void recoverWith() { + IO recover = IO.raiseError(new IllegalArgumentException()) + .recover(IllegalArgumentException.class, error -> "hola mundo"); + + assertEquals("hola mundo", recover.unsafeRunSync()); + } + + @Test + void recoverWithNotMatch() { + IO recover = IO.raiseError(new IllegalArgumentException()) + .recover(NoSuchElementException.class, error -> "hola mundo"); + + assertThrows(IllegalArgumentException.class, recover::unsafeRunSync); + } + + @Test + void retry(@Mock Supplier computation) { + when(computation.get()).thenThrow(UnsupportedOperationException.class); + + Try retry = IO.task(computation).retry().safeRunSync(); + + assertTrue(retry.isFailure()); + verify(computation, times(2)).get(); + } + + @Test + void retryFailure(@Mock Supplier computation) { + when(computation.get()).thenThrow(UnsupportedOperationException.class); + + Try retry = IO.task(computation).retry(Duration.ofMillis(100), 3).safeRunSync(); + + assertTrue(retry.isFailure()); + verify(computation, times(4)).get(); + } + + @Test + void retrySuccess(@Mock Supplier computation) { + when(computation.get()) + .thenThrow(UnsupportedOperationException.class) + .thenThrow(UnsupportedOperationException.class) + .thenThrow(UnsupportedOperationException.class) + .thenReturn("hola"); + + Try retry = IO.task(computation).retry(Duration.ofMillis(100), 3).safeRunSync(); + + assertEquals("hola", retry.get()); + verify(computation, times(4)).get(); + } + + @Test + void repeatSuccess(@Mock Supplier computation) { + when(computation.get()).thenReturn("hola"); + + Try repeat = IO.task(computation).repeat(Duration.ofMillis(100), 3).safeRunSync(); + + assertEquals("hola", repeat.get()); + verify(computation, times(4)).get(); + } + + @Test + void repeatFailure(@Mock Supplier computation) { + when(computation.get()).thenReturn("hola").thenThrow(UnsupportedOperationException.class); + + Try repeat = IO.task(computation).repeat(Duration.ofMillis(100), 3).safeRunSync(); + + assertTrue(repeat.isFailure()); + verify(computation, times(2)).get(); + } + + @Test + void repeat(@Mock Supplier computation) { + when(computation.get()).thenReturn("hola"); + + Try repeat = IO.task(computation).repeat().safeRunSync(); + + assertEquals("hola", repeat.get()); + verify(computation, times(2)).get(); + } + + @Test + void flatMapped() { + IO io = IO.unit() + .map(ignore -> "hola") + .map(ignore -> "hola") + .map(ignore -> "hola") + .map(ignore -> "adios"); + + assertEquals("adios", io.unsafeRunSync()); + } + + @Test + void stackSafety() { + IO sum = sum(100000, 0); + + Future futureSum = sum.runAsync(); + + assertEquals(705082704, sum.unsafeRunSync()); + assertEquals(Try.success(705082704), futureSum.await(1, TimeUnit.SECONDS).toTry()); + } + + @Test + void timed() { + IO> sum = sum(100000, 0).timed(); + + Tuple2 result = sum.unsafeRunSync(); + + assertEquals(705082704, result._2()); + assertTrue(result._1().toMillis() > 0); + } + + @Test + void timeoutFail() { + assertThrows(TimeoutException.class, IO.never().timeout(Duration.ofSeconds(1))::unsafeRunSync); + } + + @Test + void timeoutSuccess() { + assertEquals(1, IO.pure(1).timeout(Duration.ofSeconds(1)).unsafeRunSync()); + } + + @Test + void traverse() { + IO left = IO.task(() -> "left"); + IO right = IO.task(() -> "right"); + + IO> traverse = IO.traverse(List.of(left, right)); + + assertEquals(List.of("left", "right"), traverse.unsafeRunSync()); + } + + @Test + void raceA() { + IO> race = IO.race( + IO.delay(Duration.ofMillis(10), () -> 10), + IO.delay(Duration.ofMillis(100), () -> "b")); + + Either orElseThrow = race.unsafeRunSync(); + + assertEquals(Either.left(10), orElseThrow); + } + + @Test + void raceB() { + IO> race = IO.race( + IO.delay(Duration.ofMillis(100), () -> 10), + IO.delay(Duration.ofMillis(10), () -> "b")); + + Either orElseThrow = race.unsafeRunSync(); + + assertEquals(Either.right("b"), orElseThrow); + } + + @Test + void fork() { + IO result = IO.pure("hola") + .flatMap(hello -> IO.delay(Duration.ofSeconds(1), () -> hello + " toni").fork()) + .flatMap(Fiber::join); + + String orElseThrow = result.runAsync().get(); + + assertEquals("hola toni", orElseThrow); + } + + @Test + void memoize(@Mock Function1 toUpperCase) { + when(toUpperCase.apply(any())) + .thenAnswer(args -> args.getArgument(0, String.class).toUpperCase()); + + IO>> memoized = IO.memoize((String str) -> IO.pure(toUpperCase.apply(str))); + + IO flatMap = memoized.flatMap(x -> x.apply("hola")); + flatMap.unsafeRunSync(); + flatMap.unsafeRunSync(); + flatMap.unsafeRunSync(); + flatMap.unsafeRunSync(); + + verify(toUpperCase).apply("hola"); + } + + @Test + void fibSyncTest() { + assertAll( + () -> assertEquals(1, fibSync(1).unsafeRunSync()), + () -> assertEquals(1, fibSync(2).unsafeRunSync()), + () -> assertEquals(2, fibSync(3).unsafeRunSync()), + () -> assertEquals(3, fibSync(4).unsafeRunSync()), + () -> assertEquals(5, fibSync(5).unsafeRunSync()), + () -> assertEquals(8, fibSync(6).unsafeRunSync()), + () -> assertEquals(13, fibSync(7).unsafeRunSync()), + () -> assertEquals(21, fibSync(8).unsafeRunSync()), + () -> assertEquals(55, fibSync(10).unsafeRunSync()), + () -> assertEquals(6765, fibSync(20).unsafeRunSync()) + ); + } + + @Test + void fibAsyncTest() { + assertAll( + () -> assertEquals(1, fibAsync(1).unsafeRunSync()), + () -> assertEquals(1, fibAsync(2).unsafeRunSync()), + () -> assertEquals(2, fibAsync(3).unsafeRunSync()), + () -> assertEquals(3, fibAsync(4).unsafeRunSync()), + () -> assertEquals(5, fibAsync(5).unsafeRunSync()), + () -> assertEquals(8, fibAsync(6).unsafeRunSync()), + () -> assertEquals(13, fibAsync(7).unsafeRunSync()), + () -> assertEquals(21, fibAsync(8).unsafeRunSync()), + () -> assertEquals(55, fibAsync(10).unsafeRunSync()), + () -> assertEquals(6765, fibAsync(20).unsafeRunSync()) + ); + } + + private IO fibSync(int number) { + if (number < 2) { + return IO.pure(number); + } + var number1 = fibSync(number - 1); + var number2 = fibSync(number - 2); + return number1.flatMap(x -> number2.map(y -> x + y)); + } + + private IO fibAsync(int number) { + if (number < 2) { + return IO.pure(number); + } + return IO.parMap2(fibAsync(number - 1), fibAsync(number - 2), Integer::sum); + } + + private IO open(ResultSet resultSet) { + return IO.pure(resultSet); + } + + private Function1> tryGetString(String column) { + return rs -> Try.of(() -> getString(column).apply(rs)); + } + + private CheckedFunction1 getString(String column) { + return resultSet -> resultSet.getString(column); + } + + private IO sum(Integer n, Integer sum) { + if (n == 0) { + return IO.pure(sum); + } + return IO.suspend(() -> sum(n - 1, sum + n)); + } + + private IO> currentThreadIO() { + Ref> ref = Ref.of(List.empty()); + IO> currentThread = + ref.updateAndGet(list -> list.append(Thread.currentThread().getName())); + + return currentThread + .andThen(currentThread + .andThen(currentThread + .andThen(currentThread + .andThen(currentThread)))); + } +}