diff --git a/src/main/java/com/github/tonivade/vavr/effect/Fiber.java b/src/main/java/com/github/tonivade/vavr/effect/Fiber.java index 3afe2ec..6fcab2f 100644 --- a/src/main/java/com/github/tonivade/vavr/effect/Fiber.java +++ b/src/main/java/com/github/tonivade/vavr/effect/Fiber.java @@ -4,15 +4,6 @@ */ package com.github.tonivade.vavr.effect; -public sealed interface Fiber { +public record Fiber(IO join, IO cancel) { - IO join(); - - IO cancel(); - - static Fiber of(IO join, IO cancel) { - return new FiberImpl<>(join, cancel); - } - - record FiberImpl(IO join, IO cancel) implements Fiber {} } \ No newline at end of file diff --git a/src/main/java/com/github/tonivade/vavr/effect/IO.java b/src/main/java/com/github/tonivade/vavr/effect/IO.java index 2f0df53..33b477e 100644 --- a/src/main/java/com/github/tonivade/vavr/effect/IO.java +++ b/src/main/java/com/github/tonivade/vavr/effect/IO.java @@ -124,7 +124,7 @@ } default IO> timed() { - return IO.task(System::nanoTime).flatMap( + return task(System::nanoTime).flatMap( start -> map(result -> Tuple.of(Duration.ofNanos(System.nanoTime() - start), result))); } @@ -136,7 +136,7 @@ IO join = fromFuture(promise.future()); IO cancel = exec(connection::cancel); - callback.accept(Try.success(Fiber.of(join, cancel))); + callback.accept(Try.success(new Fiber<>(join, cancel))); }); } @@ -147,7 +147,7 @@ default IO timeout(Executor executor, Duration duration) { return racePair(executor, this, sleep(duration)).flatMap(either -> either.fold( ta -> ta._2().cancel().map(x -> ta._1()), - tb -> tb._1().cancel().flatMap(x -> IO.raiseError(new TimeoutException())))); + tb -> tb._1().cancel().flatMap(x -> raiseError(new TimeoutException())))); } default IO repeat() { @@ -233,21 +233,24 @@ tb -> tb._1().cancel().map(x -> Either.right(tb._2())))); } - static IO>, Tuple2, B>>> racePair(Executor executor, IO fa, IO fb) { + static IO>, Tuple2, B>>> racePair( + Executor executor, IO fa, IO fb) { return cancellable(callback -> { IOConnection connection1 = IOConnection.cancellable(); IOConnection connection2 = IOConnection.cancellable(); - Promise promiseA = runAsync(IO.forked(executor).andThen(fa), connection1); - Promise promiseB = runAsync(IO.forked(executor).andThen(fb), connection2); + Promise promiseA = runAsync(forked(executor).andThen(fa), connection1); + Promise promiseB = runAsync(forked(executor).andThen(fb), connection2); promiseA.future().onComplete(result -> callback.accept( - result.map(a -> Either.left(Tuple.of(a, Fiber.of(IO.fromFuture(promiseB.future()), IO.exec(connection2::cancel))))))); + result.map(a -> Either.left( + Tuple.of(a, new Fiber<>(fromFuture(promiseB.future()), exec(connection2::cancel))))))); promiseB.future().onComplete(result -> callback.accept( - result.map(b -> Either.right(Tuple.of(Fiber.of(IO.fromFuture(promiseA.future()), IO.exec(connection2::cancel)), b))))); + result.map(b -> Either.right( + Tuple.of(new Fiber<>(fromFuture(promiseA.future()), exec(connection2::cancel)), b))))); - return IO.exec(() -> { + return exec(() -> { try { connection1.cancel(); } finally { @@ -314,7 +317,7 @@ return cancellable(callback -> { Future sleep = FutureModule.sleep(executor,duration) .onComplete(result -> callback.accept(Try.success(Unit.unit()))); - return IO.exec(() -> sleep.cancel(true)); + return exec(() -> sleep.cancel(true)); }); } @@ -356,8 +359,8 @@ Function1>> result = a -> r.modify(map -> map.get(a).fold(() -> { Promise promise = Promise.make(); function.apply(a).safeRunAsync(executor, promise::tryComplete); - return Tuple.of(IO.fromFuture(promise.future()), map.put(a, promise)); - }, promise -> Tuple.of(IO.fromFuture(promise.future()), map))); + return Tuple.of(fromFuture(promise.future()), map.put(a, promise)); + }, promise -> Tuple.of(fromFuture(promise.future()), map))); return result.andThen(io -> io.flatMap(identity())); }); } @@ -381,7 +384,7 @@ .onComplete(ignore -> callback.accept(result)) )); - return IO.exec(cancellable::cancel); + return exec(cancellable::cancel); }); } @@ -396,8 +399,8 @@ } static IO sequence(Seq> sequence) { - IO initial = IO.unit(); - return sequence.foldLeft(initial, (IO a, IO b) -> a.andThen(b)).andThen(IO.unit()); + IO initial = unit(); + return sequence.foldLeft(initial, (IO a, IO b) -> a.andThen(b)).andThen(unit()); } static IO> traverse(Seq> sequence) { @@ -421,12 +424,12 @@ IOConnection connection1 = IOConnection.cancellable(); IOConnection connection2 = IOConnection.cancellable(); - Promise promiseA = runAsync(IO.forked(executor).andThen(fa), connection1); - Promise promiseB = runAsync(IO.forked(executor).andThen(fb), connection2); + Promise promiseA = runAsync(forked(executor).andThen(fa), connection1); + Promise promiseB = runAsync(forked(executor).andThen(fb), connection2); promiseA.future().onComplete(a -> promiseB.future().onComplete(b -> callback.accept(map2(a, b, mapper)))); - return IO.exec(() -> { + return exec(() -> { try { connection1.cancel(); } finally { @@ -508,13 +511,13 @@ stack.push(); var flatMapped = (FlatMapped) current; - IO source = IO.narrowK(unwrap(flatMapped.current, stack, u -> u.flatMap(flatMapped.next))); + IO source = narrowK(unwrap(flatMapped.current, stack, u -> u.flatMap(flatMapped.next))); if (source instanceof Async async) { Promise nextPromise = Promise.make(); nextPromise.future().andThen(tryU -> tryU.onFailure(promise::failure) - .onSuccess(u -> runAsync(IO.narrowK(flatMapped.next.apply(u)), connection, stack, promise))); + .onSuccess(u -> runAsync(narrowK(flatMapped.next.apply(u)), connection, stack, promise))); executeAsync(async, connection, nextPromise); @@ -551,9 +554,9 @@ stack.add(partialFunction(recover.mapper::isDefinedAt, recover.mapper.andThen(next))); current = recover.current; } else if (current instanceof Suspend suspend) { - current = IO.narrowK(suspend.lazy.get()); + current = narrowK(suspend.lazy.get()); } else if (current instanceof Delay delay) { - return IO.pure(delay.task.unchecked().get()); + return pure(delay.task.unchecked().get()); } else if (current instanceof Pure) { return current; } else if (current instanceof FlatMapped) { @@ -582,12 +585,10 @@ return promise; } - final class Pure implements IO { + record Pure(T value) implements IO { - private final T value; - - private Pure(T value) { - this.value = Objects.requireNonNull(value); + public Pure { + Objects.requireNonNull(value); } @Override @@ -596,12 +597,10 @@ } } - final class Failure implements IO { + record Failure(Throwable error) implements IO { - private final Throwable error; - - private Failure(Throwable error) { - this.error = Objects.requireNonNull(error); + public Failure { + Objects.requireNonNull(error); } @Override @@ -610,15 +609,11 @@ } } - final class FlatMapped implements IO { + record FlatMapped(IO current, Function1> next) implements IO { - private final IO current; - private final Function1> next; - - private FlatMapped(IO current, - Function1> next) { - this.current = Objects.requireNonNull(current); - this.next = Objects.requireNonNull(next); + public FlatMapped { + Objects.requireNonNull(current); + Objects.requireNonNull(next); } @Override @@ -627,12 +622,10 @@ } } - final class Delay implements IO { + record Delay(CheckedFunction0 task) implements IO { - private final CheckedFunction0 task; - - private Delay(CheckedFunction0 task) { - this.task = Objects.requireNonNull(task); + public Delay { + Objects.requireNonNull(task); } @Override @@ -641,12 +634,10 @@ } } - final class Async implements IO { + record Async(Function1>, IO> callback) implements IO { - private final Function1>, IO> callback; - - private Async(Function1>, IO> callback) { - this.callback = Objects.requireNonNull(callback); + public Async { + Objects.requireNonNull(callback); } @Override @@ -655,12 +646,10 @@ } } - final class Suspend implements IO { + record Suspend(Supplier> lazy) implements IO { - private final Supplier> lazy; - - private Suspend(Supplier> lazy) { - this.lazy = Objects.requireNonNull(lazy); + public Suspend { + Objects.requireNonNull(lazy); } @Override @@ -669,14 +658,11 @@ } } - final class Recover implements IO { + record Recover(IO current, PartialFunction> mapper) implements IO { - private final IO current; - private final PartialFunction> mapper; - - private Recover(IO current, PartialFunction> mapper) { - this.current = Objects.requireNonNull(current); - this.mapper = Objects.requireNonNull(mapper); + public Recover { + Objects.requireNonNull(current); + Objects.requireNonNull(mapper); } @Override diff --git a/src/main/java/com/github/tonivade/vavr/effect/Schedule.java b/src/main/java/com/github/tonivade/vavr/effect/Schedule.java index 176f20e..949ccb8 100644 --- a/src/main/java/com/github/tonivade/vavr/effect/Schedule.java +++ b/src/main/java/com/github/tonivade/vavr/effect/Schedule.java @@ -17,7 +17,7 @@ import io.vavr.control.Either; public sealed interface Schedule { - + Schedule map(Function1 mapper); Schedule contramap(Function1 comap); @@ -98,7 +98,7 @@ Schedule untilOutputM(Function1> condition); Schedule check(Function2> condition); - + static Schedule once() { return Schedule.recurs(1).unit(); } @@ -191,7 +191,7 @@ B extract(A last, S state); } - + private static A merge(Either either) { return either.fold(Function1.identity(), Function1.identity()); } @@ -202,26 +202,26 @@ } final class ScheduleImpl implements Schedule, Schedule.Update, Schedule.Extract { - + private final IO initial; private final Update update; private final Extract extract; - + private ScheduleImpl(IO initial, Update update, Extract extract) { this.initial = Objects.requireNonNull(initial); this.update = Objects.requireNonNull(update); this.extract = Objects.requireNonNull(extract); } - + public IO initial() { return initial; } - + @Override public IO> update(A last, S state) { return update.update(last, state); } - + @Override public B extract(A last, S state) { return extract.extract(last, state); @@ -230,16 +230,16 @@ @Override public Schedule map(Function1 mapper) { return ScheduleImpl.of( - initial, - update, + initial, + update, (a, s) -> mapper.apply(extract(a, s))); } @Override public Schedule contramap(Function1 comap) { return ScheduleImpl.of( - initial, - (c, s) -> update(comap.apply(c), s), + initial, + (c, s) -> update(comap.apply(c), s), (c, s) -> extract(comap.apply(c), s)); } @@ -248,6 +248,7 @@ return andThenEither(next).map(ScheduleImpl::merge); } + @Override public Schedule> andThenEither(Schedule next) { return doAndThenEither((ScheduleImpl) next); } @@ -265,24 +266,24 @@ @Override public Schedule foldM(Z zero, Function2>> next) { return ScheduleImpl.of( - initial.map(s -> Tuple.of(s, zero)), + initial.map(s -> Tuple.of(s, zero)), (a, sz) -> { IO> update = update(a, sz._1()); IO> other = next.apply(sz._2(), extract(a, sz._1())).map(Either::narrow); return IO.parMap2(update, other, (x, y) -> ScheduleImpl.>map2(x, y, Tuple::of)); - }, + }, (a, sz) -> sz._2()); } @Override public Schedule addDelayM(Function1> map) { return updated(u -> (a, s) -> { - IO>> map2 = + IO>> map2 = IO.parMap2( - map.apply(extract(a, s)), - u.update(a, s), + map.apply(extract(a, s)), + u.update(a, s), (duration, either) -> either.map(x -> Tuple.of(duration, x))); - + return map2.flatMap((Either> either) -> { IO fold = either.fold(IO::pure, tuple -> IO.sleep(tuple._1())); return fold.map(ignore -> either.map(Tuple2::_2)); @@ -324,7 +325,7 @@ IO> u = other.update(a, t); return u.map(e -> e.map(Either::right)); }); - IO>> map = + IO>> map = this.update(a, s).map(e -> e.map(Either::left)); return IO.parMap2(map, orElse, Either>::orElse); }, @@ -361,11 +362,12 @@ private static IO> tuple(IO a, IO b) { return IO.parMap2(a, b, Tuple::of); } - - private static Either map2(Either a, Either b, Function2 map) { + + private static Either map2( + Either a, Either b, Function2 map) { return a.flatMap(x -> b.map(y -> map.apply(x, y))); } - + private static A merge(Either either) { return either.fold(Function1.identity(), Function1.identity()); } @@ -373,9 +375,9 @@ private ScheduleImpl updated(Function1, Update> update) { return ScheduleImpl.of(initial, update.apply(this.update), this.extract); } - + public static ScheduleImpl of( - IO initial, + IO initial, Update update, Extract extract) { return new ScheduleImpl<>(initial, update, extract); diff --git a/src/test/java/com/github/tonivade/vavr/effect/ScheduleTest.java b/src/test/java/com/github/tonivade/vavr/effect/ScheduleTest.java index 7cdd9fa..260e441 100644 --- a/src/test/java/com/github/tonivade/vavr/effect/ScheduleTest.java +++ b/src/test/java/com/github/tonivade/vavr/effect/ScheduleTest.java @@ -14,7 +14,6 @@ import java.time.Duration; import java.util.function.Consumer; import java.util.function.Supplier; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; @@ -22,7 +21,6 @@ import io.vavr.Tuple2; import io.vavr.collection.List; import io.vavr.collection.Seq; -import io.vavr.control.Either; @ExtendWith(MockitoExtension.class) class ScheduleTest { @@ -136,17 +134,16 @@ } @Test - @Disabled("I don't understand very well this") void compose(@Mock Consumer console) { Schedule two = - Schedule.recurs(1).compose(Schedule.recurs(1)); + Schedule.recurs(2).compose(Schedule.recurs(2)); IO print = IO.exec(() -> console.accept("hola")); IO repeat = print.repeat(two); Integer provide = repeat.unsafeRunSync(); - assertEquals(Either.right(1), provide); + assertEquals(2, provide); verify(console, times(3)).accept("hola"); }