* Fiber ```scala= class Fiber( context: FiberContext, callStack: Stack[Any =&gt; IO[Any]] ) { def run(io: IO[Any]): Unit = { while (currIteration &lt; maxIterations) { // Паттерн матчинг по типу эффекта, // с заполнением стэка вызовов // и добровольным освобождением треда при смене executor'a // или семантической блокироке } // Добровольное освобождение треда } } class FiberContext( id: FiberId, scope: Scope, localValues: Map[Any, Any] ) ``` * ZIO Runtime ```scala= trait Runtime[+R] { def environment: ZEnvironment[R] def fiberRefs: FiberRefs def runtimeFlags: RuntimeFlags trait UnsafeAPI { def run[E, A](zio: ZIO[R, E, A]): Exit[E, A] } } object Runtime { def apply[R]( r: ZEnvironment[R], fiberRefs0: FiberRefs, runtimeFlags0: RuntimeFlags ): Runtime[R] } ``` * Fiber в ZIO ```scala= type ZResult = ZIO[Any, Throwable, Int] val someComputation: ZResult = ??? val anotherComputation: ZResult = ??? for { fiber1 <- someComputation.fork // UIO[Fiber[Throwable, Int]] fiber2 <- anotherComputation.fork result1 <- fiber1.join // Int result2 <- fiber2.join // Int } yield result1 + result2 ``` ```scala= for { first <- someComputation.fork _ <- ZIO.fail(new RuntimeException()) result <- first.join } yield result ``` ```scala= for { tuple <- someComputation.zipPar(anotherComputation) (result1, result2) = t } yield result1 + result2 ``` ```scala= type ZResult = ZIO[Any, Throwable, Int] val someComputation: ZResult = ??? val anotherComputation: ZResult = ??? val parallelComputation = for { fiber1 <- someComputation.fork // UIO[Fiber[Throwable, Int]] fiber2 <- anotherComputation.fork result1 <- fiber1.join // Int result2 <- fiber2.join // Int } yield result1 + result2 parallelComputation.fork.flatMap { fiber => clock.sleep(1.second).flatMap(_ => fiber.interrupt) // interrupt здесь отменит выполнение fiber1 и fiber2 тоже } ``` * Файберы с расписанием или кастомным скоупом в ZIO ```scala= val customScope: ZScope[Exit[Any, Any]] = ??? // в ZIO2 ZScope переехал в новую структуру Scope, похожую на ZManaged val fiberOnCustomScope: UIO[Fiber[Throwable, Unit]] = ZIO.effect(println("Running on custom scope")) .forkIn(customScope) val fiberOnGlobalScope: UIO[Fiber[Throwable, Unit]] = ZIO.effect(println("Daemon running")) .delay(1.second) .forever .forkDaemon ``` * Fiber run loop в ZIO ```scala= def run[E](zio: ZIO[Any, R, Any]) = { while (curIteration &lt; 2048) { val tag = zio.tag tag match { case ZIO.Tags.FlatMap =&gt; // Трансформация case ZIO.Tags.Fork =&gt; // Создание нового файбера и отправка в Executor case ZIO.Tags.Fail =&gt; // Разворот стека в попытке найти ZIO.Fold case ZIO.Tags.Yield =&gt; // Добровольно отдать тред case ZIO.Tags.Shift =&gt; // Отправить вычисление в другой Executor и отдать тред ... } } // Добровольно отдать тред } ``` * Блокирущие операции в ZIO ```scala= import zio.blocking._ val blockingInterruptible = effectBlockingInterrupt{ Thread.sleep(3000) } val blockingCancelable = effectBlockingCancelable( effect = socket.open() )(cancel = socket.close()) ``` * Cats Effect(CE) 2 runtime ```scala= trait IOApp { def run(args: List[String]): IO[ExitCode] def main(args: Array[String]): Unit = IOAppPlatform.main( args, Eval.later(contextShift), Eval.later(timer) )(run) implicit protected def contextShift: ContextShift[IO] = IOAppPlatform.defaultContextShift implicit protected def timer: Timer[IO] = IOAppPlatform.defaultTimer protected def executionContext: ExecutionContext = IOAppPlatform.defaultExecutionContext } ``` * Cats Effect 3 runtime ```scala= final class IORuntime( val compute: ExecutionContext, val blocking: ExecutionContext, val scheduler: Scheduler, val fiberMonitor: FiberMonitor, val shutdown: () => Unit, val config: IORuntimeConfig ) ``` * Fiber в CE ```scala= val someComputation: IO[Int] = ??? val anotherComputation: IO[Int] = ??? for { fiber1 <- someComputation.start // IO[Fiber[IO, Int]] fiber2 <- anotherComputation.start res1 <- fiber1.join res2 <- fiber2.join } yield res1 + res2 ``` * Fiber run loop CE2 ```scala= def run[E](io: IO[Any], callStack: Stack[Any => IO[Any]]): Unit = { while (true) { io match { case Pure(v) => // Взять из стека следующее вычисление case ContextSwitch(...) => // Продолжить вычисление на другом Executor case Async(...) => // Сохранить колбек и отдать тред case RaiseError(e) => // Найти обработчик ошибок или вернуть эту ошибку } } } ``` * CE2 Блокирующие операции ```scala= class Blocker(ec: ExecutionContext) { def blockOn[A](io: IO[A])(implicit cs: ContextShift[IO]): IO[A] = cs.blockOn(this)(io) } implicit val cs: ContextShift[IO] = ??? val blocker: Blocker[IO] = ??? val jdbcRequest: IO[Int] = ??? blocker.blockOn(jdbcRequest) ``` * CE3 Блокирующие операции ```scala= val nonInterruptible = Sync[IO].blocking(println("blocking non interruptible")) val interruptibleOnce = Sync[IO].interruptible( println("blocking interruptible one try") ) val interruptibleMany = Sync[IO].interruptibleMany( println("blocking interruptible many tries") ) ``` ```scala= trait Async[F[_]] { def evalOn[A](fa: F[A], ec: ExecutionContext): F[A] def executionContext: F[ExecutionContext] } ```