diff --git a/build.gradle b/build.gradle index a0103e8..2d7841d 100644 --- a/build.gradle +++ b/build.gradle @@ -10,28 +10,28 @@ java { toolchain { - languageVersion = JavaLanguageVersion.of(17) + languageVersion = JavaLanguageVersion.of(21) } } compileJava { options.compilerArgs << '-Xlint:unchecked' options.compilerArgs << '-Xlint:rawtypes' - options.release = 17 + options.release = 21 } compileTestJava { options.compilerArgs << '-Xlint:unchecked' options.compilerArgs << '-Xlint:rawtypes' - options.release = 17 + options.release = 21 } dependencies { api 'io.vavr:vavr:0.10.4' - testImplementation 'org.junit.jupiter:junit-jupiter:5.10.1' - testImplementation("org.mockito:mockito-core:5.7.0") - testImplementation("org.mockito:mockito-junit-jupiter:5.7.0") + testImplementation 'org.junit.jupiter:junit-jupiter:5.10.2' + testImplementation("org.mockito:mockito-core:5.10.0") + testImplementation("org.mockito:mockito-junit-jupiter:5.10.0") } tasks.named('test') { diff --git a/src/main/java/com/github/tonivade/vavr/effect/IO.java b/src/main/java/com/github/tonivade/vavr/effect/IO.java index 64f59e1..4f13d46 100644 --- a/src/main/java/com/github/tonivade/vavr/effect/IO.java +++ b/src/main/java/com/github/tonivade/vavr/effect/IO.java @@ -52,7 +52,7 @@ default Future runAsync(Executor executor) { return forked(executor).andThen(this).runAsync(); } - + default T unsafeRunSync() { return safeRunSync().get(); } @@ -74,7 +74,7 @@ } default IO flatMap(Function1> map) { - return new FlatMapped<>(this, map.andThen(IO::narrowK)); + return new FlatMapped<>(this, map); } default IO andThen(IO after) { @@ -125,15 +125,15 @@ return IO.task(System::nanoTime).flatMap( start -> map(result -> Tuple.of(Duration.ofNanos(System.nanoTime() - start), result))); } - + default IO> fork() { return async(callback -> { IOConnection connection = IOConnection.cancellable(); Promise promise = runAsync(this, connection); - + IO join = fromFuture(promise.future()); IO cancel = exec(connection::cancel); - + callback.accept(Try.success(Fiber.of(join, cancel))); }); } @@ -141,7 +141,7 @@ default IO timeout(Duration duration) { return timeout(Future.DEFAULT_EXECUTOR, duration); } - + default IO timeout(Executor executor, Duration duration) { return racePair(executor, this, sleep(duration)).flatMap(either -> either.fold( ta -> ta._2().cancel().map(x -> ta._1()), @@ -163,7 +163,7 @@ default IO repeat(Duration delay, int times) { return repeat(Schedule.recursSpaced(delay, times).zipRight(Schedule.identity())); } - + default IO repeat(Schedule schedule) { return repeatOrElse(schedule, (e, b) -> raiseError(e)); } @@ -195,7 +195,7 @@ default IO retry(Duration delay, int maxRetries) { return retry(Schedule.recursSpaced(delay, maxRetries)); } - + default IO retry(Schedule schedule) { return retryOrElse(schedule, (e, b) -> raiseError(e)); } @@ -220,23 +220,23 @@ static IO pure(T value) { return new Pure<>(value); } - + static IO> race(IO fa, IO fb) { return race(Future.DEFAULT_EXECUTOR, fa, fb); } - + static IO> race(Executor executor, IO fa, IO fb) { return racePair(executor, fa, fb).flatMap(either -> either.fold( ta -> ta._2().cancel().map(x -> Either.left(ta._1())), tb -> tb._1().cancel().map(x -> Either.right(tb._2())))); } - + static IO>, Tuple2, B>>> racePair(Executor executor, IO fa, IO fb) { return cancellable(callback -> { - + IOConnection connection1 = IOConnection.cancellable(); IOConnection connection2 = IOConnection.cancellable(); - + Promise promiseA = runAsync(IO.forked(executor).andThen(fa), connection1); Promise promiseB = runAsync(IO.forked(executor).andThen(fb), connection2); @@ -294,12 +294,12 @@ static IO fromEither(Either task) { return task.fold(IO::raiseError, IO::pure); } - + static IO fromFuture(Future promise) { CheckedConsumer>> callback = promise::onComplete; return async(callback); } - + static IO fromCompletableFuture(CompletableFuture promise) { return fromFuture(Future.fromCompletableFuture(promise)); } @@ -327,11 +327,11 @@ static IO never() { return async(callback -> {}); } - + static IO forked() { return forked(Future.DEFAULT_EXECUTOR); } - + static IO forked(Executor executor) { return async(callback -> executor.execute(() -> callback.accept(Try.success(Unit.unit())))); } @@ -364,31 +364,31 @@ return UNIT; } - static IO bracket(IO acquire, + static IO bracket(IO acquire, Function1> use, Function1> release) { return cancellable(callback -> { - + IOConnection cancellable = IOConnection.cancellable(); - + Promise promise = runAsync(acquire, cancellable); - + promise.future() .onFailure(error -> callback.accept(Try.failure(error))) - .onSuccess(resource -> runAsync(use.andThen(IO::narrowK).apply(resource), cancellable).future() - .onComplete(result -> runAsync(release.andThen(IO::narrowK).apply(resource), cancellable).future() + .onSuccess(resource -> runAsync(use.apply(resource), cancellable).future() + .onComplete(result -> runAsync(release.apply(resource), cancellable).future() .onComplete(ignore -> callback.accept(result)) )); - + return IO.exec(cancellable::cancel); }); } - static IO bracket(IO acquire, + static IO bracket(IO acquire, Function1> use, CheckedConsumer release) { return bracket(acquire, use, asFunction(release)); } - static IO bracket(IO acquire, + static IO bracket(IO acquire, Function1> use) { return bracket(acquire, use, AutoCloseable::close); } @@ -403,7 +403,7 @@ } static IO> traverse(Executor executor, Seq> sequence) { - return sequence.foldLeft(pure(List.empty()), + return sequence.foldLeft(pure(List.empty()), (IO> xs, IO a) -> parMap2(executor, xs, a, Seq::append)); } @@ -415,15 +415,15 @@ static IO parMap2(Executor executor, IO fa, IO fb, Function2 mapper) { return cancellable(callback -> { - + IOConnection connection1 = IOConnection.cancellable(); IOConnection connection2 = IOConnection.cancellable(); - + Promise promiseA = runAsync(IO.forked(executor).andThen(fa), connection1); Promise promiseB = runAsync(IO.forked(executor).andThen(fb), connection2); - + promiseA.future().onComplete(a -> promiseB.future().onComplete(b -> callback.accept(map2(a, b, mapper)))); - + return IO.exec(() -> { try { connection1.cancel(); @@ -453,20 +453,20 @@ private static Either toEither(Option task) { return task.fold(() -> Either.left(new NoSuchElementException()), Either::right); } - + private static A merge(Either either) { return either.fold(Function1.identity(), Function1.identity()); } - + @SuppressWarnings("serial") private static PartialFunction partialFunction(Predicate matcher, Function1 function) { return new PartialFunction<>() { - + @Override public boolean isDefinedAt(A value) { return matcher.test(value); } - + @Override public B apply(A t) { return function.apply(t); @@ -493,29 +493,29 @@ while (true) { try { current = unwrap(current, stack, identity()); - + if (current instanceof Pure pure) { return promise.success(pure.value); } - + if (current instanceof Async async) { return executeAsync(async, connection, promise); } - + if (current instanceof FlatMapped) { stack.push(); var flatMapped = (FlatMapped) current; IO source = IO.narrowK(unwrap(flatMapped.current, stack, u -> u.flatMap(flatMapped.next))); - + if (source instanceof Async async) { Promise nextPromise = Promise.make(); - + nextPromise.future().andThen(tryU -> tryU.onFailure(promise::failure) .onSuccess(u -> runAsync(IO.narrowK(flatMapped.next.apply(u)), connection, stack, promise))); - + executeAsync(async, connection, nextPromise); - + return promise; } @@ -531,7 +531,7 @@ } } catch (Throwable error) { Option> result = stack.tryHandle(error); - + if (result.isDefined()) { current = result.get(); } else { @@ -549,8 +549,7 @@ stack.add(partialFunction(recover.mapper::isDefinedAt, recover.mapper.andThen(next))); current = recover.current; } else if (current instanceof Suspend suspend) { - Supplier> andThen = () -> IO.narrowK(suspend.lazy.get()); - current = andThen.get(); + current = IO.narrowK(suspend.lazy.get()); } else if (current instanceof Delay delay) { return IO.pure(delay.task.unchecked().get()); } else if (current instanceof Pure) { @@ -569,11 +568,11 @@ if (connection.isCancellable() && !connection.updateState(StateIO::startingNow).isRunnable()) { return promise.complete(Try.failure(new CancellationException())); } - + connection.setCancelToken(current.callback.apply(promise::tryComplete)); - + promise.future().andThen(x -> connection.setCancelToken(UNIT)); - + if (connection.isCancellable() && connection.updateState(StateIO::notStartingNow).isCancellingNow()) { connection.cancelNow(); } @@ -686,7 +685,7 @@ } final class Repeat { - + private final IO current; private final ScheduleImpl schedule; private final Function2, IO> orElse; @@ -697,7 +696,7 @@ this.schedule = (ScheduleImpl) Objects.requireNonNull(schedule); this.orElse = Objects.requireNonNull(orElse); } - + IO> run() { return current.attempt().flatMap(either -> either.fold( error -> orElse.apply(error, Option.none()).map(Either::left), @@ -706,9 +705,9 @@ private IO> loop(A later, S state) { return schedule.update(later, state).flatMap(decision -> decision.fold( - ignore -> IO.pure(Either.right(schedule.extract(later, state))), + ignore -> IO.pure(Either.right(schedule.extract(later, state))), s -> current.attempt().flatMap(either -> either.fold( - e -> orElse.apply(e, Option.some(schedule.extract(later, state))).map(Either::left), + e -> orElse.apply(e, Option.some(schedule.extract(later, state))).map(Either::left), a -> loop(a, s))))); } } @@ -735,34 +734,34 @@ error -> { IO> update = schedule.update(error, state); return update.flatMap(decision -> decision.fold( - ignore -> orElse.apply(error, state).map(Either::left), + ignore -> orElse.apply(error, state).map(Either::left), this::loop)); - }, + }, a -> IO.pure(Either.right(a))) ); } } sealed interface IOConnection { - + IOConnection UNCANCELLABLE = new Uncancellable(); - + boolean isCancellable(); void setCancelToken(IO cancel); - + void cancelNow(); - + void cancel(); - + StateIO updateState(UnaryOperator update); - + static IOConnection cancellable() { return new Cancellable(); } - + static final class Uncancellable implements IOConnection { - + private Uncancellable() { } @Override @@ -790,7 +789,7 @@ return StateIO.INITIAL; } } - + static final class Cancellable implements IOConnection { private IO cancelToken; @@ -830,35 +829,35 @@ } record StateIO(boolean isCancelled, boolean isCancellingNow, boolean isStartingNow) { - + static final StateIO INITIAL = new StateIO(false, false, false); static final StateIO CANCELLED = new StateIO(true, false, false); - + StateIO cancellingNow() { return new StateIO(isCancelled, true, isStartingNow); } - + StateIO startingNow() { return new StateIO(isCancelled, isCancellingNow, true); } - + StateIO notStartingNow() { return new StateIO(isCancelled, isCancellingNow, false); } - + boolean isCancelable() { return !isCancelled && !isCancellingNow && !isStartingNow; } - + boolean isRunnable() { return !isCancelled && !isCancellingNow; } } final class CallStack { - + private StackItem top = new StackItem<>(); - + void push() { top.push(); } @@ -870,7 +869,7 @@ top = top.prev(); } } - + void add(PartialFunction> mapError) { if (top.count() > 0) { top.pop(); @@ -878,12 +877,12 @@ } top.add(mapError); } - + Option> tryHandle(Throwable error) { while (top != null) { top.reset(); Option> result = top.tryHandle(error); - + if (result.isDefined()) { return result; } else { @@ -892,7 +891,7 @@ } return Option.none(); } - + // XXX: https://www.baeldung.com/java-sneaky-throws @SuppressWarnings("unchecked") R sneakyThrow(Throwable t) throws X { @@ -901,7 +900,7 @@ } final class StackItem { - + private int count = 0; private final Deque>> recover = new ArrayDeque<>(); @@ -914,27 +913,27 @@ StackItem(StackItem prev) { this.prev = prev; } - + StackItem prev() { return prev; } - + int count() { return count; } - + void push() { count++; } - + void pop() { count--; } - + void reset() { count = 0; } - + void add(PartialFunction> mapError) { recover.addFirst(mapError); } @@ -943,7 +942,7 @@ while (!recover.isEmpty()) { var mapError = recover.removeFirst(); if (mapError.isDefinedAt(error)) { - return Option.some(mapError.andThen(IO::narrowK).apply(error)); + return Option.some(IO.narrowK(mapError.apply(error))); } } return Option.none(); diff --git a/src/main/java/module-info.java b/src/main/java/module-info.java deleted file mode 100644 index 03f8505..0000000 --- a/src/main/java/module-info.java +++ /dev/null @@ -1,5 +0,0 @@ -module com.github.tonivade.vavr.effect { - exports com.github.tonivade.vavr.effect; - - requires io.vavr; -} \ No newline at end of file diff --git a/src/test/java/com/github/tonivade/vavr/effect/ScheduleTest.java b/src/test/java/com/github/tonivade/vavr/effect/ScheduleTest.java index b6843f4..e604c76 100644 --- a/src/test/java/com/github/tonivade/vavr/effect/ScheduleTest.java +++ b/src/test/java/com/github/tonivade/vavr/effect/ScheduleTest.java @@ -31,16 +31,29 @@ void repeat(@Mock Consumer console) { IO print = IO.exec(() -> console.accept("hola")); Schedule schedule = Schedule.recurs(2).zipRight(Schedule.identity()); - + IO repeat = print.repeat(schedule); - + Unit result = repeat.unsafeRunSync(); - + assertEquals(unit(), result); verify(console, times(3)).accept("hola"); } @Test + void repeatStackSafe(@Mock Consumer console) { + IO print = IO.exec(() -> console.accept("hola")); + Schedule schedule = Schedule.recurs(10000).zipRight(Schedule.identity()); + + IO repeat = print.repeat(schedule); + + Unit result = repeat.unsafeRunSync(); + + assertEquals(unit(), result); + verify(console, times(10001)).accept("hola"); + } + + @Test void repeatDelay(@Mock Consumer console) { IO print = IO.exec(() -> console.accept("hola")); Schedule recurs = Schedule.recurs(2).zipRight(Schedule.identity()); @@ -49,20 +62,20 @@ IO repeat = print.repeat(schedule); IO> timed = repeat.timed(); - + Tuple2 result = timed.unsafeRunSync(); - + assertTrue(result._1().toMillis() > 1000); verify(console, times(3)).accept("hola"); } - + @Test void noRepeat(@Mock Consumer console) { IO print = IO.exec(() -> console.accept("hola")); IO repeat = print.repeat(Schedule.never()); - + Unit result = repeat.unsafeRunSync(); - + assertEquals(unit(), result); verify(console).accept("hola"); } @@ -75,9 +88,9 @@ IO read = IO.task(console::get); IO retry = read.retry(Schedule.recurs(1)); - + String provide = retry.unsafeRunSync(); - + assertEquals("hola", provide); verify(console, times(2)).get(); } @@ -90,24 +103,24 @@ Schedule recurs = Schedule.recurs(2); Schedule spaced = Schedule.spaced(Duration.ofMillis(500)); IO> retry = read.retry(recurs.zip(spaced)).timed(); - + Tuple2 result = retry.unsafeRunSync(); - + assertTrue(result._1().toMillis() > 500); assertEquals("hola", result._2()); verify(console, times(2)).get(); } - + @Test void noRetry(@Mock Supplier console) { when(console.get()).thenThrow(UnsupportedOperationException.class).thenReturn("hola"); - + IO read = IO.task(console::get); IO retry = read.retry(Schedule.never()); - + assertThrows(UnsupportedOperationException.class, retry::unsafeRunSync); } - + @Test void andThen(@Mock Consumer console) { Schedule two = @@ -115,13 +128,13 @@ IO print = IO.exec(() -> console.accept("hola")); IO repeat = print.repeat(two); - + Integer provide = repeat.unsafeRunSync(); - + assertEquals(1, provide); verify(console, times(3)).accept("hola"); } - + @Test @Disabled("I don't understand very well this") void compose(@Mock Consumer console) { @@ -130,22 +143,22 @@ IO print = IO.exec(() -> console.accept("hola")); IO repeat = print.repeat(two); - + Integer provide = repeat.unsafeRunSync(); - + assertEquals(Either.right(1), provide); verify(console, times(3)).accept("hola"); } - + @Test void collect() { IO pure = IO.unit(); - + Schedule> schedule = Schedule.recurs(5).collectAll().zipLeft(Schedule.identity()); IO> repeat = pure.repeat(schedule); - + Seq result = repeat.unsafeRunSync(); - + assertEquals(List.of(0, 1, 2, 3, 4), result); } }