FS2 Stream 的制作方式是多样化的
使用纯函数式的流式IO库FS2,尝试创建各种不同的流。
首先
希望在Scala开发中,更多地使用诸如http4s和doobie这样的函数式库,但感觉它们的普及程度逐渐增加,但同时也有一些发展不够顺利的感觉。
假设是这样的话,考虑一下原因,不仅仅是函数式编程本身的难度,还有使用http4s和doobie所使用的流式I/O库FS2的复杂性也可能是其中一个原因。
因此,我会尝试写一篇适合FS2初学者的文章。我觉得只要对Cats/Cats Effect有一点了解就可以读懂。
为了使用流(stream)作为一个切入点,首先必须能够获取到流,所以在这里我只收集了有关创建流的方法。在这个过程中,希望能更容易地形象化流是什么样的。
★ 我把Scala 3系和Cats 3系重写到2022/06/21。
取向
尝试从各种数据源创建类型为 Stream[F, String] 的流。
-
- 単一の値
-
- 複数の値
-
- Either
-
- エフェクトF[_]内の値
-
- 標準入力
-
- ファイル
-
- タイマー
-
- DBへのクエリ結果
-
- HTTPレスポンス
- Kafka コンシューマ
作为指导方针,我们需要为以下共享代码中的抽象方法stream在每个数据源中实现,并在run方法中取出并显示前5个元素到标准输出(源代码)。
trait StreamDemoApp extends IOApp.Simple:
def run: IO[Unit] = stream[IO]
.take(5)
.map(s => s"$s\n")
.through(text.utf8.encode)
.through(io.stdout)
.compile.drain
def stream[F[_]: Async] : Stream[F, String]
Scala和库的版本在这一块
实施
从一个值开始
首先,试着从给定的一个值开始创建一个流。
使用emit来从给定的值创建一个只包含一个元素的流。
Stream.emit("apple")
如果不加上这样的类型注释,类型将变成 Stream[Pure, String],但由于类型 Pure[A] <: Nothing 的定义,可以转换为任意的 F[_] 的 Stream[F, String]。
要创建一个重复流传递给定值的流,可以使用”constant”。
Stream.constant("apple")
如果只是这样,它将成为一个无限重复输出”apple”的流,但在之前给出的共通代码上运行时,将输出”apple”五次然后结束。
来源
从多个值中
fs2.Stream的构造函数接受可变参数,因此可以使用它来创建按顺序流动给定元素的流。
Stream("apple", "banana", "chocolate")
通过使用emits方法,可以从Seq上的多个元素创建一个流。举一个经典的菲波那切数列的例子如下。
val fib = LazyList.iterate((0, 1))((a, b) => (b, a + b)).take(100)
Stream.emits(fib).map(_._1.show)
使用Stream.fromIterator可以将迭代器转换为流。在下面的示例代码中,我们从文件中获取流,在scala.io.Source#getLines中获取迭代器,然后创建流。
来源 (Traditional Chinese: 來源; Pinyin:
无论 从哪里 / 从哪个方面 / 无论哪一个
使用 Stream.fromEither 可以从 Either[Throwable, A] 中获取到一个流。要从 Right 值中获取 Stream[F, String],可以按以下方式操作。
Stream.fromEither[F]("hello".asRight)
从左侧的值得到的流与之前的共同代码一样,但如果直接在前面的共同代码中进行评估,将会抛出运行时异常。
Stream.fromEither[F](Exception("test").asLeft)
事实上,fromEither的F[_] 是基于ApplicativeError的,异常被保留在ApplicativeError[F] 的上下文中,因此可以在.compile.drain等操作后,将其转换为F[_],再使用Cats的applicativeError语法进行处理。例如,可以如下所示:
object FromEitherIO extends IOApp.Simple:
def run: IO[Unit] =
Stream.fromEither[IO](Exception("test").asLeft)
.compile.drain
.handleErrorWith(_.getMessage pipe IO.println)
来源
请用中文略述以下内容,只需一种选项:
方案[A]
要从进入效果F[_]的A中获得Stream[F, A],可以使用Stream.eval。
例如,使用Stream.eval(IO(“Hello”)),可以得到类型为Stream[IO, String]的流。
还提供了一个将F[Seq[_]]作为值域的evalSeq函数,可以写成如下所示的形式。
val strings: F[List[String]] = Sync[F].delay(List("apple", "banana", "chocolate"))
Stream.evalSeq(strings)
来源
从标准输入读取
为了从标准输入获得流,请使用 fs2-io 模块中的 fs2.io.stdin* 方法。以下代码将直接回显输入的字符串。5
io.stdinUtf8(4096).through(text.lines)
There’s no context provided for the word “ソース,” so I will provide a few possible translations in Chinese:
1. 原料
2. 调料
3. 酱汁 zhī)
4. 源代码 mǎ)
Please provide more context if you want a more accurate translation.
从文件中
例如,
将文件中的文本逐行分割为Stream[F, String]。请确保使用后文件被可靠关闭。
在这种情况下,可以使用Cats Effect 的Resource。
如果 F[A] 是 F[_]: Sync,并且 A 是 AutoCloseable 的子类型,则可以使用 Resource.fromAutoCloseable 从 F[A] 中生成 Resource,并且所得到的 Resource 可以通过 Stream.resource 转换为流。
例如,可以用以下方式写作。
val file: Resource[F, Source] =
Resource.fromAutoCloseable(sync[F].delay(Source.fromFile("README.md")))
Stream.resource(file) >>= (s => Stream.fromIterator(s.getLines, 4096))
这是只要一种选择: 翻译以下内容为中文原生语言:
源代码
从定时器开始
例如,可以将诸如“进行2秒的延迟并发出延迟前后的时间”的任务表示为流。
val currentSec: F[String] =
Clock[F].realTime.map(n => (n.toSeconds % 60).toString)
val task: Stream[F, String] = for {
s <- Stream.eval(currentSec) // 開始秒
_ <- Stream.sleep[F](2 seconds) // 2秒スリープ
e <- Stream.eval(currentSec) // 終了秒
} yield s"$s ---> $e" // 開始秒と終了秒を書式化
另外,使用FS2的计时器相关方法还可以实现定期执行,例如使用fixedDelay,可以这样写。
Stream.fixedDelay[F](3 seconds) zipRight task.repeat // タスクが終了するごとに3秒待つ
将任务重复为无限流,使用3秒间隔的fixedDelay和zipRight进行操作,执行后将产生以下结果。
13 ---> 15
18 ---> 20
23 ---> 25
28 ---> 30
33 ---> 35
中文翻译: 这个答案来源
从DB查询结果中
通过使用 Doobie,可以将数据库查询结果作为流进行获取。例如,可以按以下方式编写代码。
val xa = Transactor.fromDriverManager[F](
"org.postgresql.Driver", // driver classname
"jdbc:postgresql:world", // connect URL (driver-specific)
"postgres", // user
"" // password
)
sql"select name from country where population > 50000000 order by population desc"
.query[String] // Query0[String]
.stream // Stream[ConnectionIO, String]
.transact(xa) // Stream[F, String]
※ 使用doobie在PostgreSQL上的world DB示例。设置步骤在这里。
可以只提供一种选择的汉语本地释义:调料
从HTTP响应中
试着使用http4s的客户端将Twitter的示例响应转化为流。
val req = Request[F](Method.GET, uri"https://stream.twitter.com/1.1/statuses/sample.json")
// リクエストに署名するメソッド(詳細は下に別記)
val sign: Request[F] => F[Request[F]] = ???
for {
client <- BlazeClientBuilder[F].stream // Blaze クライアントを得る
signedReq <- Stream.eval(sign(req)) // eval で F[Request] から Stream[F, Request]に
res <- client.stream(signedReq) // Stream[F, Response[F]] を得る
.flatMap(_.body.chunks.parseJsonStream) // Stream[F, Json]を得る
} yield res.spaces2 // いい感じに文字列化する
可以通过以下代码对请求进行签名。
val env = (F:Async[F]) ?=> (key:String) => F.delay(
sys.env.get(key).toRight(RuntimeException(s”no env value for key: $key”))
) >>= F.fromEitherval sign: Request[F] => F[Request[F]] = req => for {
consumerKey <- env(“consumerKey”)
consumerSecret <- env(“consumerSecret”)
accessToken <- env(“accessToken”)
accessSecret <- env(“accessSecret”)
signedReq <- oauth1.signRequest(
req = req,
consumer = Consumer(consumerKey, consumerSecret),
token = Token(accessToken, accessSecret).some,
realm = None,
timestampGenerator = Timestamp.now,
nonceGenerator = Nonce.now)
} yield signedReq
要实际运行,需要在Twitter开发者账户中创建一个App,并将consumerKey、consumerSecret、accessToken、accessSecret设置为环境变量。
原文:
ソース
汉语翻译:
酱汁
从Kafka Consumer
使用fs2-kafka可以从Kafka Consumer中获取流。例如,可以使用以下代码。
val consumerSettings = ConsumerSettings[F, String, String]
.withAutoOffsetReset(AutoOffsetReset.Earliest)
.withBootstrapServers("localhost:9092")
.withGroupId("group")
KafkaConsumer.stream(consumerSettings)
.evalTap(_.subscribeTo("topic"))
.flatMap(_.stream) // ここで KafkaConsumer から stream を得る
.map(c => s"${c.record.key}->${c.record.value}")
运行上述代码后会进入等待状态,例如在Python的REPL上创建一个Producer发送消息,它会逐个读取并在控制台上显示,处理完5条消息后会结束。下面的代码是使用kafka-python的示例。
>>> from kafka import KafkaProducer
>>> producer = KafkaProducer(bootstrap_servers='localhost:9092')
>>> producer.send('topic', key=b'kkk', value=bytes(f"hello", 'utf-8'))
只需在docker-compose.yaml中写入以下内容,即可通过`$ docker-compose up -d` 快捷方便地启动。
services:
…
zookeeper:
image: wurstmeister/zookeeper
ports:
– “2181:2181”
kafka:
image: wurstmeister/kafka
ports:
– “9092:9092”
environment:
KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
原文中没有足够的上下文,因此我无法提供准确的中文翻译。请提供更多信息或重新提出问题。
我觉得在我学习《FP in Scala》时,第15章“流处理和增量I/O”的内容尤为困难。
在旧版本的代码中,我们需要继承IOApp类,但在Cats Effect 3中,我们可以使用IOApp.Simple,这样会简洁一些。
在旧版fs2中,为了使用io.stdin,我们需要使用Blocker,但现在不再需要。