FS2 Stream 的制作方式是多样化的

使用纯函数式的流式IO库FS2,尝试创建各种不同的流。

首先

希望在Scala开发中,更多地使用诸如http4s和doobie这样的函数式库,但感觉它们的普及程度逐渐增加,但同时也有一些发展不够顺利的感觉。

假设是这样的话,考虑一下原因,不仅仅是函数式编程本身的难度,还有使用http4s和doobie所使用的流式I/O库FS2的复杂性也可能是其中一个原因。

因此,我会尝试写一篇适合FS2初学者的文章。我觉得只要对Cats/Cats Effect有一点了解就可以读懂。

为了使用流(stream)作为一个切入点,首先必须能够获取到流,所以在这里我只收集了有关创建流的方法。在这个过程中,希望能更容易地形象化流是什么样的。

★ 我把Scala 3系和Cats 3系重写到2022/06/21。

取向

尝试从各种数据源创建类型为 Stream[F, String] 的流。

    • 単一の値

 

    • 複数の値

 

    • Either

 

    • エフェクトF[_]内の値

 

    • 標準入力

 

    • ファイル

 

    • タイマー

 

    • DBへのクエリ結果

 

    • HTTPレスポンス

 

    Kafka コンシューマ

作为指导方针,我们需要为以下共享代码中的抽象方法stream在每个数据源中实现,并在run方法中取出并显示前5个元素到标准输出(源代码)。

trait StreamDemoApp extends IOApp.Simple:
  def run: IO[Unit] = stream[IO]
    .take(5)
    .map(s => s"$s\n")
    .through(text.utf8.encode)
    .through(io.stdout)
    .compile.drain

  def stream[F[_]: Async] : Stream[F, String]

Scala和库的版本在这一块

实施

从一个值开始

首先,试着从给定的一个值开始创建一个流。

使用emit来从给定的值创建一个只包含一个元素的流。

Stream.emit("apple")

如果不加上这样的类型注释,类型将变成 Stream[Pure, String],但由于类型 Pure[A] <: Nothing 的定义,可以转换为任意的 F[_] 的 Stream[F, String]。

要创建一个重复流传递给定值的流,可以使用”constant”。

Stream.constant("apple")

如果只是这样,它将成为一个无限重复输出”apple”的流,但在之前给出的共通代码上运行时,将输出”apple”五次然后结束。

来源

从多个值中

fs2.Stream的构造函数接受可变参数,因此可以使用它来创建按顺序流动给定元素的流。

Stream("apple", "banana", "chocolate")

通过使用emits方法,可以从Seq上的多个元素创建一个流。举一个经典的菲波那切数列的例子如下。

val fib = LazyList.iterate((0, 1))((a, b) => (b, a + b)).take(100)
Stream.emits(fib).map(_._1.show)

使用Stream.fromIterator可以将迭代器转换为流。在下面的示例代码中,我们从文件中获取流,在scala.io.Source#getLines中获取迭代器,然后创建流。

来源 (Traditional Chinese: 來源; Pinyin:

无论 从哪里 / 从哪个方面 / 无论哪一个

使用 Stream.fromEither 可以从 Either[Throwable, A] 中获取到一个流。要从 Right 值中获取 Stream[F, String],可以按以下方式操作。

Stream.fromEither[F]("hello".asRight)

从左侧的值得到的流与之前的共同代码一样,但如果直接在前面的共同代码中进行评估,将会抛出运行时异常。

Stream.fromEither[F](Exception("test").asLeft)

事实上,fromEither的F[_] 是基于ApplicativeError的,异常被保留在ApplicativeError[F] 的上下文中,因此可以在.compile.drain等操作后,将其转换为F[_],再使用Cats的applicativeError语法进行处理。例如,可以如下所示:

object FromEitherIO extends IOApp.Simple:
  def run: IO[Unit] =
    Stream.fromEither[IO](Exception("test").asLeft)
      .compile.drain
      .handleErrorWith(_.getMessage pipe IO.println)

来源

请用中文略述以下内容,只需一种选项:

方案[A]

要从进入效果F[_]的A中获得Stream[F, A],可以使用Stream.eval。
例如,使用Stream.eval(IO(“Hello”)),可以得到类型为Stream[IO, String]的流。

还提供了一个将F[Seq[_]]作为值域的evalSeq函数,可以写成如下所示的形式。

val strings: F[List[String]] = Sync[F].delay(List("apple", "banana", "chocolate"))
Stream.evalSeq(strings)

来源

从标准输入读取

为了从标准输入获得流,请使用 fs2-io 模块中的 fs2.io.stdin* 方法。以下代码将直接回显输入的字符串。5

io.stdinUtf8(4096).through(text.lines)

There’s no context provided for the word “ソース,” so I will provide a few possible translations in Chinese:

1. 原料
2. 调料
3. 酱汁 zhī)
4. 源代码 mǎ)

Please provide more context if you want a more accurate translation.

从文件中

例如,

将文件中的文本逐行分割为Stream[F, String]。请确保使用后文件被可靠关闭。

在这种情况下,可以使用Cats Effect 的Resource。

如果 F[A] 是 F[_]: Sync,并且 A 是 AutoCloseable 的子类型,则可以使用 Resource.fromAutoCloseable 从 F[A] 中生成 Resource,并且所得到的 Resource 可以通过 Stream.resource 转换为流。

例如,可以用以下方式写作。

val file: Resource[F, Source] =
  Resource.fromAutoCloseable(sync[F].delay(Source.fromFile("README.md")))

Stream.resource(file) >>= (s => Stream.fromIterator(s.getLines, 4096))

这是只要一种选择: 翻译以下内容为中文原生语言:
源代码

从定时器开始

例如,可以将诸如“进行2秒的延迟并发出延迟前后的时间”的任务表示为流。

val currentSec: F[String] =
  Clock[F].realTime.map(n => (n.toSeconds % 60).toString)

val task: Stream[F, String] = for {
  s <- Stream.eval(currentSec)    // 開始秒
  _ <- Stream.sleep[F](2 seconds) // 2秒スリープ
  e <- Stream.eval(currentSec)    // 終了秒
} yield s"$s ---> $e"             // 開始秒と終了秒を書式化

另外,使用FS2的计时器相关方法还可以实现定期执行,例如使用fixedDelay,可以这样写。

Stream.fixedDelay[F](3 seconds) zipRight task.repeat // タスクが終了するごとに3秒待つ

将任务重复为无限流,使用3秒间隔的fixedDelay和zipRight进行操作,执行后将产生以下结果。

13 ---> 15
18 ---> 20
23 ---> 25
28 ---> 30
33 ---> 35

中文翻译: 这个答案来源

从DB查询结果中

通过使用 Doobie,可以将数据库查询结果作为流进行获取。例如,可以按以下方式编写代码。

val xa = Transactor.fromDriverManager[F](
  "org.postgresql.Driver", // driver classname
  "jdbc:postgresql:world", // connect URL (driver-specific)
  "postgres",              // user
  ""                       // password
)
sql"select name from country where population > 50000000 order by population desc"
  .query[String] // Query0[String]
  .stream        // Stream[ConnectionIO, String]
  .transact(xa)  // Stream[F, String]

※ 使用doobie在PostgreSQL上的world DB示例。设置步骤在这里。

可以只提供一种选择的汉语本地释义:调料

从HTTP响应中

试着使用http4s的客户端将Twitter的示例响应转化为流。

val req = Request[F](Method.GET, uri"https://stream.twitter.com/1.1/statuses/sample.json")

// リクエストに署名するメソッド(詳細は下に別記)
val sign: Request[F] => F[Request[F]] = ???

for {
  client    <- BlazeClientBuilder[F].stream // Blaze クライアントを得る
  signedReq <- Stream.eval(sign(req))   // eval で F[Request] から Stream[F, Request]に
  res       <- client.stream(signedReq) // Stream[F, Response[F]] を得る
                 .flatMap(_.body.chunks.parseJsonStream) // Stream[F, Json]を得る
} yield res.spaces2 // いい感じに文字列化する
【補足】如何签名请求
可以通过以下代码对请求进行签名。
val env = (F:Async[F]) ?=> (key:String) => F.delay(
sys.env.get(key).toRight(RuntimeException(s”no env value for key: $key”))
) >>= F.fromEitherval sign: Request[F] => F[Request[F]] = req => for {
consumerKey <- env(“consumerKey”)
consumerSecret <- env(“consumerSecret”)
accessToken <- env(“accessToken”)
accessSecret <- env(“accessSecret”)
signedReq <- oauth1.signRequest(
req = req,
consumer = Consumer(consumerKey, consumerSecret),
token = Token(accessToken, accessSecret).some,
realm = None,
timestampGenerator = Timestamp.now,
nonceGenerator = Nonce.now)
} yield signedReq

要实际运行,需要在Twitter开发者账户中创建一个App,并将consumerKey、consumerSecret、accessToken、accessSecret设置为环境变量。

原文:
ソース

汉语翻译:
酱汁

从Kafka Consumer

使用fs2-kafka可以从Kafka Consumer中获取流。例如,可以使用以下代码。

val consumerSettings = ConsumerSettings[F, String, String]
  .withAutoOffsetReset(AutoOffsetReset.Earliest)
  .withBootstrapServers("localhost:9092")
  .withGroupId("group")

 KafkaConsumer.stream(consumerSettings)
   .evalTap(_.subscribeTo("topic"))
   .flatMap(_.stream)  // ここで KafkaConsumer から stream を得る
   .map(c => s"${c.record.key}->${c.record.value}")

运行上述代码后会进入等待状态,例如在Python的REPL上创建一个Producer发送消息,它会逐个读取并在控制台上显示,处理完5条消息后会结束。下面的代码是使用kafka-python的示例。

>>> from kafka import KafkaProducer
>>> producer = KafkaProducer(bootstrap_servers='localhost:9092')
>>> producer.send('topic', key=b'kkk', value=bytes(f"hello", 'utf-8'))
【補足】以Docker方式启动Kafka的方法
只需在docker-compose.yaml中写入以下内容,即可通过`$ docker-compose up -d` 快捷方便地启动。
services:

zookeeper:
image: wurstmeister/zookeeper
ports:
– “2181:2181”
kafka:
image: wurstmeister/kafka
ports:
– “9092:9092”
environment:
KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181

原文中没有足够的上下文,因此我无法提供准确的中文翻译。请提供更多信息或重新提出问题。

在2019年3月的一项调查中,Doobie首次成为RDBMS库部门的最受欢迎选择。在同一项调查中,http4s/fs2的票数仍然低于akka http/streams。顺便提一句,在日本的Scala项目/招聘信息中,akka http甚至还不如Play占大多数。

我觉得在我学习《FP in Scala》时,第15章“流处理和增量I/O”的内容尤为困难。

在旧版本的代码中,我们需要继承IOApp类,但在Cats Effect 3中,我们可以使用IOApp.Simple,这样会简洁一些。

在旧版fs2中,为了使用io.stdin,我们需要使用Blocker,但现在不再需要。

广告
将在 10 秒后关闭
bannerAds