diff --git a/src/main/java/com/github/tonivade/vavr/effect/IO.java b/src/main/java/com/github/tonivade/vavr/effect/IO.java index f3542fe..5379e95 100644 --- a/src/main/java/com/github/tonivade/vavr/effect/IO.java +++ b/src/main/java/com/github/tonivade/vavr/effect/IO.java @@ -150,7 +150,7 @@ } default IO repeat(int times) { - return repeat(this, unit(), times); + return repeat(Schedule.recurs(times).zipRight(Schedule.identity())); } default IO repeat(Duration delay) { @@ -158,7 +158,23 @@ } default IO repeat(Duration delay, int times) { - return repeat(this, sleep(delay), times); + return repeat(Schedule.recursSpaced(delay, times).zipRight(Schedule.identity())); + } + + default IO repeat(Schedule schedule) { + return repeatOrElse(schedule, (e, b) -> raiseError(e)); + } + + default IO repeatOrElse( + Schedule schedule, + Function2, IO> orElse) { + return repeatOrElseEither(schedule, orElse).map(IO::merge); + } + + default IO> repeatOrElseEither( + Schedule schedule, + Function2, IO> orElse) { + return new Repeat<>(this, schedule, orElse).run(); } default IO retry() { @@ -166,7 +182,7 @@ } default IO retry(int maxRetries) { - return retry(this, unit(), maxRetries); + return retry(Schedule.recurs(maxRetries)); } default IO retry(Duration delay) { @@ -174,7 +190,23 @@ } default IO retry(Duration delay, int maxRetries) { - return retry(this, sleep(delay), maxRetries); + return retry(Schedule.recursSpaced(delay, maxRetries)); + } + + default IO retry(Schedule schedule) { + return retryOrElse(schedule, (e, b) -> raiseError(e)); + } + + default IO retryOrElse( + Schedule schedule, + Function2> orElse) { + return retryOrElseEither(schedule, orElse).map(IO::merge); + } + + default IO> retryOrElseEither( + Schedule schedule, + Function2> orElse) { + return new Retry<>(this, schedule, orElse).run(); } @SuppressWarnings("unchecked") @@ -419,6 +451,10 @@ return task.fold(() -> Either.left(new NoSuchElementException()), Either::right); } + private static A merge(Either either) { + return either.fold(Function1.identity(), Function1.identity()); + } + @SuppressWarnings("serial") private static PartialFunction partialFunction(Predicate matcher, Function1 function) { return new PartialFunction<>() { @@ -542,22 +578,6 @@ return promise; } - private static IO repeat(IO self, IO pause, int times) { - return self.redeemWith(IO::raiseError, value -> { - if (times > 0) { - return pause.andThen(repeat(self, pause, times - 1)); - } else return IO.pure(value); - }); - } - - private static IO retry(IO self, IO pause, int maxRetries) { - return self.redeemWith(error -> { - if (maxRetries > 0) { - return pause.andThen(retry(self, pause.repeat(), maxRetries - 1)); - } else return IO.raiseError(error); - }, IO::pure); - } - final class Pure implements IO { private final T value; @@ -662,6 +682,64 @@ } } +final class Repeat { + + private final IO current; + private final ScheduleImpl schedule; + private final Function2, IO> orElse; + + @SuppressWarnings("unchecked") + Repeat(IO current, Schedule schedule, Function2, IO> orElse) { + this.current = Objects.requireNonNull(current); + this.schedule = (ScheduleImpl) Objects.requireNonNull(schedule); + this.orElse = Objects.requireNonNull(orElse); + } + + IO> run() { + return current.attempt().flatMap(either -> either.fold( + error -> orElse.apply(error, Option.none()).map(Either::left), + a -> schedule.initial().flatMap(s -> loop(a, s)))); + } + + private IO> loop(A later, S state) { + return schedule.update(later, state).flatMap(decision -> decision.fold( + ignore -> IO.pure(Either.right(schedule.extract(later, state))), + s -> current.attempt().flatMap(either -> either.fold( + e -> orElse.apply(e, Option.some(schedule.extract(later, state))).map(Either::left), + a -> loop(a, s))))); + } +} + +final class Retry { + + private final IO current; + private final ScheduleImpl schedule; + private final Function2> orElse; + + @SuppressWarnings("unchecked") + Retry(IO current, Schedule schedule, Function2> orElse) { + this.current = Objects.requireNonNull(current); + this.schedule = (ScheduleImpl) Objects.requireNonNull(schedule); + this.orElse = Objects.requireNonNull(orElse); + } + + public IO> run() { + return schedule.initial().flatMap(this::loop); + } + + private IO> loop(S state) { + return current.attempt().flatMap(either -> either.fold( + error -> { + IO> update = schedule.update(error, state); + return update.flatMap(decision -> decision.fold( + ignore -> orElse.apply(error, state).map(Either::left), + this::loop)); + }, + a -> IO.pure(Either.right(a))) + ); + } +} + sealed interface IOConnection { IOConnection UNCANCELLABLE = new Uncancellable(); diff --git a/src/test/java/com/github/tonivade/vavr/effect/ScheduleTest.java b/src/test/java/com/github/tonivade/vavr/effect/ScheduleTest.java new file mode 100644 index 0000000..b6843f4 --- /dev/null +++ b/src/test/java/com/github/tonivade/vavr/effect/ScheduleTest.java @@ -0,0 +1,151 @@ +/* + * Copyright (c) 2022, Antonio Gabriel Muñoz Conejo + * Distributed under the terms of the MIT License + */ +package com.github.tonivade.vavr.effect; + +import static com.github.tonivade.vavr.effect.Unit.unit; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import java.time.Duration; +import java.util.function.Consumer; +import java.util.function.Supplier; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import io.vavr.Tuple2; +import io.vavr.collection.List; +import io.vavr.collection.Seq; +import io.vavr.control.Either; + +@ExtendWith(MockitoExtension.class) +class ScheduleTest { + + @Test + void repeat(@Mock Consumer console) { + IO print = IO.exec(() -> console.accept("hola")); + Schedule schedule = Schedule.recurs(2).zipRight(Schedule.identity()); + + IO repeat = print.repeat(schedule); + + Unit result = repeat.unsafeRunSync(); + + assertEquals(unit(), result); + verify(console, times(3)).accept("hola"); + } + + @Test + void repeatDelay(@Mock Consumer console) { + IO print = IO.exec(() -> console.accept("hola")); + Schedule recurs = Schedule.recurs(2).zipRight(Schedule.identity()); + Schedule spaced = Schedule.spaced(Duration.ofMillis(500)); + Schedule schedule = recurs.zipLeft(spaced); + + IO repeat = print.repeat(schedule); + IO> timed = repeat.timed(); + + Tuple2 result = timed.unsafeRunSync(); + + assertTrue(result._1().toMillis() > 1000); + verify(console, times(3)).accept("hola"); + } + + @Test + void noRepeat(@Mock Consumer console) { + IO print = IO.exec(() -> console.accept("hola")); + IO repeat = print.repeat(Schedule.never()); + + Unit result = repeat.unsafeRunSync(); + + assertEquals(unit(), result); + verify(console).accept("hola"); + } + + @Test + void retry(@Mock Supplier console) { + when(console.get()) + .thenThrow(RuntimeException.class) + .thenReturn("hola"); + + IO read = IO.task(console::get); + IO retry = read.retry(Schedule.recurs(1)); + + String provide = retry.unsafeRunSync(); + + assertEquals("hola", provide); + verify(console, times(2)).get(); + } + + @Test + void retryDelay(@Mock Supplier console) { + when(console.get()).thenThrow(RuntimeException.class).thenReturn("hola"); + + IO read = IO.task(console::get); + Schedule recurs = Schedule.recurs(2); + Schedule spaced = Schedule.spaced(Duration.ofMillis(500)); + IO> retry = read.retry(recurs.zip(spaced)).timed(); + + Tuple2 result = retry.unsafeRunSync(); + + assertTrue(result._1().toMillis() > 500); + assertEquals("hola", result._2()); + verify(console, times(2)).get(); + } + + @Test + void noRetry(@Mock Supplier console) { + when(console.get()).thenThrow(UnsupportedOperationException.class).thenReturn("hola"); + + IO read = IO.task(console::get); + IO retry = read.retry(Schedule.never()); + + assertThrows(UnsupportedOperationException.class, retry::unsafeRunSync); + } + + @Test + void andThen(@Mock Consumer console) { + Schedule two = + Schedule.recurs(1).andThen(Schedule.recurs(1)); + + IO print = IO.exec(() -> console.accept("hola")); + IO repeat = print.repeat(two); + + Integer provide = repeat.unsafeRunSync(); + + assertEquals(1, provide); + verify(console, times(3)).accept("hola"); + } + + @Test + @Disabled("I don't understand very well this") + void compose(@Mock Consumer console) { + Schedule two = + Schedule.recurs(1).compose(Schedule.recurs(1)); + + IO print = IO.exec(() -> console.accept("hola")); + IO repeat = print.repeat(two); + + Integer provide = repeat.unsafeRunSync(); + + assertEquals(Either.right(1), provide); + verify(console, times(3)).accept("hola"); + } + + @Test + void collect() { + IO pure = IO.unit(); + + Schedule> schedule = Schedule.recurs(5).collectAll().zipLeft(Schedule.identity()); + IO> repeat = pure.repeat(schedule); + + Seq result = repeat.unsafeRunSync(); + + assertEquals(List.of(0, 1, 2, 3, 4), result); + } +}