* Fiber
```scala=
class Fiber(
context: FiberContext,
callStack: Stack[Any => IO[Any]]
) {
def run(io: IO[Any]): Unit = {
while (currIteration < maxIterations) {
// Паттерн матчинг по типу эффекта,
// с заполнением стэка вызовов
// и добровольным освобождением треда при смене executor'a
// или семантической блокироке
}
// Добровольное освобождение треда
}
}
class FiberContext(
id: FiberId,
scope: Scope,
localValues: Map[Any, Any]
)
```
* ZIO Runtime
```scala=
trait Runtime[+R] {
def environment: ZEnvironment[R]
def fiberRefs: FiberRefs
def runtimeFlags: RuntimeFlags
trait UnsafeAPI {
def run[E, A](zio: ZIO[R, E, A]): Exit[E, A]
}
}
object Runtime {
def apply[R](
r: ZEnvironment[R],
fiberRefs0: FiberRefs,
runtimeFlags0: RuntimeFlags
): Runtime[R]
}
```
* Fiber в ZIO
```scala=
type ZResult = ZIO[Any, Throwable, Int]
val someComputation: ZResult = ???
val anotherComputation: ZResult = ???
for {
fiber1 <- someComputation.fork // UIO[Fiber[Throwable, Int]]
fiber2 <- anotherComputation.fork
result1 <- fiber1.join // Int
result2 <- fiber2.join // Int
} yield result1 + result2
```
```scala=
for {
first <- someComputation.fork
_ <- ZIO.fail(new RuntimeException())
result <- first.join
} yield result
```
```scala=
for {
tuple <- someComputation.zipPar(anotherComputation)
(result1, result2) = t
} yield result1 + result2
```
```scala=
type ZResult = ZIO[Any, Throwable, Int]
val someComputation: ZResult = ???
val anotherComputation: ZResult = ???
val parallelComputation =
for {
fiber1 <- someComputation.fork // UIO[Fiber[Throwable, Int]]
fiber2 <- anotherComputation.fork
result1 <- fiber1.join // Int
result2 <- fiber2.join // Int
} yield result1 + result2
parallelComputation.fork.flatMap { fiber =>
clock.sleep(1.second).flatMap(_ => fiber.interrupt) // interrupt здесь отменит выполнение fiber1 и fiber2 тоже
}
```
* Файберы с расписанием или кастомным скоупом в ZIO
```scala=
val customScope: ZScope[Exit[Any, Any]] = ??? // в ZIO2 ZScope переехал в новую структуру Scope, похожую на ZManaged
val fiberOnCustomScope: UIO[Fiber[Throwable, Unit]] =
ZIO.effect(println("Running on custom scope"))
.forkIn(customScope)
val fiberOnGlobalScope: UIO[Fiber[Throwable, Unit]] =
ZIO.effect(println("Daemon running"))
.delay(1.second)
.forever
.forkDaemon
```
* Fiber run loop в ZIO
```scala=
def run[E](zio: ZIO[Any, R, Any]) = {
while (curIteration < 2048) {
val tag = zio.tag
tag match {
case ZIO.Tags.FlatMap =>
// Трансформация
case ZIO.Tags.Fork =>
// Создание нового файбера и отправка в Executor
case ZIO.Tags.Fail =>
// Разворот стека в попытке найти ZIO.Fold
case ZIO.Tags.Yield =>
// Добровольно отдать тред
case ZIO.Tags.Shift =>
// Отправить вычисление в другой Executor и отдать тред
...
}
}
// Добровольно отдать тред
}
```
* Блокирущие операции в ZIO
```scala=
import zio.blocking._
val blockingInterruptible =
effectBlockingInterrupt{ Thread.sleep(3000) }
val blockingCancelable =
effectBlockingCancelable(
effect = socket.open()
)(cancel = socket.close())
```
* Cats Effect(CE) 2 runtime
```scala=
trait IOApp {
def run(args: List[String]): IO[ExitCode]
def main(args: Array[String]): Unit =
IOAppPlatform.main(
args,
Eval.later(contextShift),
Eval.later(timer)
)(run)
implicit protected def contextShift: ContextShift[IO] =
IOAppPlatform.defaultContextShift
implicit protected def timer: Timer[IO] =
IOAppPlatform.defaultTimer
protected def executionContext: ExecutionContext =
IOAppPlatform.defaultExecutionContext
}
```
* Cats Effect 3 runtime
```scala=
final class IORuntime(
val compute: ExecutionContext,
val blocking: ExecutionContext,
val scheduler: Scheduler,
val fiberMonitor: FiberMonitor,
val shutdown: () => Unit,
val config: IORuntimeConfig
)
```
* Fiber в CE
```scala=
val someComputation: IO[Int] = ???
val anotherComputation: IO[Int] = ???
for {
fiber1 <- someComputation.start // IO[Fiber[IO, Int]]
fiber2 <- anotherComputation.start
res1 <- fiber1.join
res2 <- fiber2.join
} yield res1 + res2
```
* Fiber run loop CE2
```scala=
def run[E](io: IO[Any], callStack: Stack[Any => IO[Any]]): Unit = {
while (true) {
io match {
case Pure(v) => // Взять из стека следующее вычисление
case ContextSwitch(...) => // Продолжить вычисление на другом Executor
case Async(...) => // Сохранить колбек и отдать тред
case RaiseError(e) => // Найти обработчик ошибок или вернуть эту ошибку
}
}
}
```
* CE2 Блокирующие операции
```scala=
class Blocker(ec: ExecutionContext) {
def blockOn[A](io: IO[A])(implicit cs: ContextShift[IO]): IO[A] =
cs.blockOn(this)(io)
}
implicit val cs: ContextShift[IO] = ???
val blocker: Blocker[IO] = ???
val jdbcRequest: IO[Int] = ???
blocker.blockOn(jdbcRequest)
```
* CE3 Блокирующие операции
```scala=
val nonInterruptible =
Sync[IO].blocking(println("blocking non interruptible"))
val interruptibleOnce =
Sync[IO].interruptible(
println("blocking interruptible one try")
)
val interruptibleMany =
Sync[IO].interruptibleMany(
println("blocking interruptible many tries")
)
```
```scala=
trait Async[F[_]] {
def evalOn[A](fa: F[A], ec: ExecutionContext): F[A]
def executionContext: F[ExecutionContext]
}
```