diff --git a/src/main/java/com/github/tonivade/vavr/effect/IO.java b/src/main/java/com/github/tonivade/vavr/effect/IO.java index 8e8e1e4..f3542fe 100644 --- a/src/main/java/com/github/tonivade/vavr/effect/IO.java +++ b/src/main/java/com/github/tonivade/vavr/effect/IO.java @@ -113,22 +113,6 @@ default IO recover(Class type, Function1 function) { return recoverWith(partialFunction(error -> error.getClass().equals(type), t -> function.andThen(IO::pure).apply((X) t))); } - - @SuppressWarnings("serial") - static PartialFunction partialFunction(Predicate matcher, Function1 function) { - return new PartialFunction<>() { - - @Override - public boolean isDefinedAt(A value) { - return matcher.test(value); - } - - @Override - public B apply(A t) { - return function.apply(t); - } - }; - } default IO recoverWith(PartialFunction> mapper) { return new Recover<>(this, mapper); @@ -434,6 +418,22 @@ private static Either toEither(Option task) { return task.fold(() -> Either.left(new NoSuchElementException()), Either::right); } + + @SuppressWarnings("serial") + private static PartialFunction partialFunction(Predicate matcher, Function1 function) { + return new PartialFunction<>() { + + @Override + public boolean isDefinedAt(A value) { + return matcher.test(value); + } + + @Override + public B apply(A t) { + return function.apply(t); + } + }; + } private static Supplier asSupplier(CheckedRunnable task) { return () -> { diff --git a/src/main/java/com/github/tonivade/vavr/effect/Schedule.java b/src/main/java/com/github/tonivade/vavr/effect/Schedule.java new file mode 100644 index 0000000..dc65f7d --- /dev/null +++ b/src/main/java/com/github/tonivade/vavr/effect/Schedule.java @@ -0,0 +1,383 @@ +/* + * Copyright (c) 2018-2021, Antonio Gabriel Muñoz Conejo + * Distributed under the terms of the MIT License + */ +package com.github.tonivade.vavr.effect; + +import java.time.Duration; +import java.util.Objects; +import java.util.function.Predicate; +import java.util.function.UnaryOperator; +import io.vavr.Function1; +import io.vavr.Function2; +import io.vavr.Tuple; +import io.vavr.Tuple2; +import io.vavr.collection.List; +import io.vavr.collection.Seq; +import io.vavr.control.Either; + +public sealed interface Schedule { + + Schedule map(Function1 mapper); + + Schedule contramap(Function1 comap); + + default Schedule dimap(Function1 comap, Function1 map) { + Schedule contramap = contramap(comap); + return contramap.map(map); + } + + default Schedule as(C value) { + return map(ignore -> value); + } + + default Schedule unit() { + return as(Unit.unit()); + } + + default Schedule andThen(Schedule next) { + return andThenEither(next).map(Schedule::merge); + } + + Schedule> andThenEither(Schedule next); + + Schedule> zip(Schedule other); + + default Schedule zipLeft(Schedule other) { + return zip(other).map(Tuple2::_1); + } + + default Schedule zipRight(Schedule other) { + return zip(other).map(Tuple2::_2); + } + + Schedule compose(Schedule other); + + default Schedule> collectAll() { + return this.>fold(List.empty(), Seq::append); + } + + default Schedule fold(Z zero, Function2 next) { + return foldM(zero, (z, b) -> IO.pure(next.andThen(Either::right).apply(z, b))); + } + + Schedule foldM(Z zero, Function2>> next); + + default Schedule addDelay(Function1 map) { + return addDelayM(map.andThen(IO::pure)); + } + + Schedule addDelayM(Function1> map); + + default Schedule whileInput(Predicate condition) { + return whileInputM(asFunction(condition).andThen(IO::pure)); + } + + default Schedule whileInputM(Function1> condition) { + return check((a, b) -> condition.apply(a)); + } + + default Schedule whileOutput(Predicate condition) { + return whileOutputM(asFunction(condition).andThen(IO::pure)); + } + + default Schedule whileOutputM(Function1> condition) { + return check((a, b) -> condition.apply(b)); + } + + default Schedule untilInput(Predicate condition) { + return untilInputM(asFunction(condition).andThen(IO::pure)); + } + + Schedule untilInputM(Function1> condition); + + default Schedule untilOutput(Predicate condition) { + return untilOutputM(asFunction(condition).andThen(IO::pure)); + } + + Schedule untilOutputM(Function1> condition); + + Schedule check(Function2> condition); + + static Schedule once() { + return Schedule.recurs(1).unit(); + } + + static Schedule recurs(int times) { + return Schedule.forever().whileOutput(x -> x < times); + } + + static Schedule spaced(Duration delay) { + return Schedule.forever().addDelay(x -> delay); + } + + static Schedule linear(Duration delay) { + return delayed(Schedule.forever().map(i -> delay.multipliedBy(i + 1L))); + } + + static Schedule exponential(Duration delay) { + return exponential(delay, 2.0); + } + + static Schedule exponential(Duration delay, double factor) { + return delayed(Schedule.forever().map(i -> delay.multipliedBy((long) Math.pow(factor, i.doubleValue())))); + } + + static Schedule delayed(Schedule schedule) { + return schedule.addDelay(x -> x); + } + + static Schedule> recursSpaced(Duration delay, int times) { + return Schedule.recurs(times).zip(Schedule.spaced(delay)); + } + + static Schedule never() { + return ScheduleImpl.of( + IO.pure(Unit.unit()), + (a, s) -> IO.pure(Either.left(Unit.unit())), + (a, s) -> s); + } + + static Schedule forever() { + return unfold(0, a -> a + 1); + } + + static Schedule succeed(B value) { + return Schedule.forever().as(value); + } + + static Schedule identity() { + return ScheduleImpl.of( + IO.pure(Unit.unit()), + (a, s) -> IO.pure(Either.right(Unit.unit())), + (a, s) -> a); + } + + static Schedule doWhile(Predicate condition) { + return Schedule.doWhileM(asFunction(condition).andThen(IO::pure)); + } + + static Schedule doWhileM(Function1> condition) { + return Schedule.identity().whileInputM(condition); + } + + static Schedule doUntil(Predicate condition) { + return doUntilM(asFunction(condition).andThen(IO::pure)); + } + + static Schedule doUntilM(Function1> condition) { + return Schedule.identity().untilInputM(condition); + } + + static Schedule unfold(B initial, UnaryOperator next) { + return unfoldM(IO.pure(initial), next.andThen(Either::right).andThen(IO::pure)::apply); + } + + static Schedule unfoldM( + IO initial, Function1>> next) { + return ScheduleImpl.of(initial, (a, s) -> next.apply(s), (a, s) -> s); + } + + @FunctionalInterface + interface Update { + + IO> update(A last, S state); + + } + + @FunctionalInterface + interface Extract { + + B extract(A last, S state); + + } + + private static A merge(Either either) { + return either.fold(Function1.identity(), Function1.identity()); + } + + private static Function1 asFunction(Predicate condition) { + return condition::test; + } +} + +final class ScheduleImpl implements Schedule, Schedule.Update, Schedule.Extract { + + private final IO initial; + private final Update update; + private final Extract extract; + + private ScheduleImpl(IO initial, Update update, Extract extract) { + this.initial = Objects.requireNonNull(initial); + this.update = Objects.requireNonNull(update); + this.extract = Objects.requireNonNull(extract); + } + + public IO initial() { + return initial; + } + + @Override + public IO> update(A last, S state) { + return update.update(last, state); + } + + @Override + public B extract(A last, S state) { + return extract.extract(last, state); + } + + @Override + public Schedule map(Function1 mapper) { + return ScheduleImpl.of( + initial, + update, + (a, s) -> mapper.apply(extract(a, s))); + } + + @Override + public Schedule contramap(Function1 comap) { + return ScheduleImpl.of( + initial, + (c, s) -> update(comap.apply(c), s), + (c, s) -> extract(comap.apply(c), s)); + } + + @Override + public Schedule andThen(Schedule next) { + return andThenEither(next).map(ScheduleImpl::merge); + } + + public Schedule> andThenEither(Schedule next) { + return doAndThenEither((ScheduleImpl) next); + } + + @Override + public Schedule> zip(Schedule other) { + return doZip((ScheduleImpl) other); + } + + @Override + public Schedule compose(Schedule other) { + return doCompose((ScheduleImpl) other); + } + + @Override + public Schedule foldM(Z zero, Function2>> next) { + return ScheduleImpl.of( + initial.map(s -> Tuple.of(s, zero)), + (a, sz) -> { + IO> update = update(a, sz._1()); + IO> other = next.apply(sz._2(), extract(a, sz._1())).map(Either::narrow); + return IO.parMap2(update, other, (x, y) -> ScheduleImpl.>map2(x, y, Tuple::of)); + }, + (a, sz) -> sz._2()); + } + + @Override + public Schedule addDelayM(Function1> map) { + return updated(u -> (a, s) -> { + IO>> map2 = + IO.parMap2( + map.apply(extract(a, s)), + u.update(a, s), + (duration, either) -> either.map(x -> Tuple.of(duration, x))); + + return map2.flatMap((Either> either) -> { + IO fold = either.fold(IO::pure, tuple -> IO.sleep(tuple._1())); + return fold.map(ignore -> either.map(Tuple2::_2)); + }); + }); + } + + @Override + public Schedule untilInputM(Function1> condition) { + return updated(u -> (a, s) -> { + IO apply = condition.apply(a); + return apply.flatMap(test -> test ? IO.pure(Either.left(Unit.unit())) : update(a, s)); + }); + } + + @Override + public Schedule untilOutputM(Function1> condition) { + return updated(u -> (a, s) -> { + IO apply = condition.apply(extract(a, s)); + return apply.flatMap(test -> test ? IO.pure(Either.left(Unit.unit())) : update(a, s)); + }); + } + + @Override + public Schedule check(Function2> condition) { + return updated(u -> (a, s) -> { + IO apply = condition.apply(a, extract(a, s)); + return apply.flatMap(result -> result ? u.update(a, s) : IO.pure(Either.left(Unit.unit()))); + }); + } + + private ScheduleImpl, A, Either> doAndThenEither(ScheduleImpl other) { + return ScheduleImpl., A, Either>of( + initial.map(Either::left), + (a, st) -> st.fold( + s -> { + IO>> orElse = + other.initial.flatMap(t -> { + IO> u = other.update(a, t); + return u.map(e -> e.map(Either::right)); + }); + IO>> map = + this.update(a, s).map(e -> e.map(Either::left)); + return IO.parMap2(map, orElse, Either>::orElse); + }, + t -> other.update(a, t).map(e -> e.map(Either::right))), + (a, st) -> st.fold( + s -> Either.left(this.extract(a, s)), + t -> Either.right(other.extract(a, t)))); + } + + private ScheduleImpl, A, Tuple2> doZip(ScheduleImpl other) { + return ScheduleImpl., A, Tuple2>of( + tuple(this.initial, other.initial), + (a, st) -> { + IO> self = this.update(a, st._1()); + IO> next = other.update(a, st._2()); + return IO.parMap2(self, next, (x, y) -> map2(x, y, Tuple::of)); + }, + (a, st) -> Tuple.of( + this.extract(a, st._1), + other.extract(a, st._2()))); + } + + private ScheduleImpl, A, C> doCompose(ScheduleImpl other) { + return ScheduleImpl., A, C>of( + tuple(this.initial, other.initial), + (a, st) -> { + IO> self = this.update(a, st._1()); + IO> next = other.update(this.extract(a, st._1()), st._2()); + return IO.parMap2(self, next, (x, y) -> map2(x, y, Tuple::of)); + }, + (a, st) -> other.extract(this.extract(a, st._1()), st._2())); + } + + private static IO> tuple(IO a, IO b) { + return IO.parMap2(a, b, Tuple::of); + } + + private static Either map2(Either a, Either b, Function2 map) { + return a.flatMap(x -> b.map(y -> map.apply(x, y))); + } + + private static A merge(Either either) { + return either.fold(Function1.identity(), Function1.identity()); + } + + private ScheduleImpl updated(Function1, Update> update) { + return ScheduleImpl.of(initial, update.apply(this.update), this.extract); + } + + public static ScheduleImpl of( + IO initial, + Update update, + Extract extract) { + return new ScheduleImpl<>(initial, update, extract); + } +}