flink kafka offset怎么联接kafka的参数

Flink(29)
原文地址:https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/batch/index.html#passing-parameters-to-functions
Passing Parameters to Functions
参数可以使用构造函数或者withParameters(Configuration)方法传递,参数将会作为函数对象的一部分被序列化并传递到task实例中!
1 使用构造函数方式:
package com.daxin
import org.apache.flink.api.common.functions.FilterFunction
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._
* Created by Daxin on .
object PassingParameters2Functions1 {
def main(args: Array[String]) {
val env = ExecutionEnvironment.getExecutionEnvironment
val toFilter = env.fromElements(1, 2, 3)
class MyFilter(limit: Int) extends FilterFunction[Int] {
override def filter(value: Int): Boolean = {
value & limit
val result =toFilter.filter(new MyFilter(2))
result.print()
2 withParameters(Configuration)方式
这个方法将会携带一个Configuration对象作为参数,这个参数将会传递给Rich Function的open方法(关于Rich Function参见:)。Configuration对象是一个Map,存储Key/Value键值对.
package com.daxin
import org.apache.flink.api.common.functions.RichFilterFunction
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration
* Created by Daxin on .
* 我的邮箱:
object PassingParameters2Functions2 {
def main(args: Array[String]) {
val env = ExecutionEnvironment.getExecutionEnvironment
val toFilter = env.fromElements(1, 2, 3)
val c = new Configuration()
c.setInteger("limit", 2)
val result = toFilter.filter(new RichFilterFunction[Int]() {
var limit = 0
override def open(config: Configuration): Unit = {
limit = config.getInteger("limit", 0) //没有的话返回默认值0
def filter(in: Int): Boolean = {
in & limit
}).withParameters(c)
result.print()
3 使用全局的the ExecutionConfig方式:
package com.daxin
import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction, RichFilterFunction}
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration
* Created by Daxin on .
* 传递参数3:Globally via the ExecutionConfig
object PassingParameters2Functions3 {
def main(args: Array[String]) {
val env = ExecutionEnvironment.getExecutionEnvironment
val data = env.fromElements("1", "2")
val conf = new Configuration()
conf.setString("mykey", "2")
env.getConfig.setGlobalJobParameters(conf)
class RichFunc extends RichMapFunction[String, String] {
var mykey: String = _
override def open(parameters: Configuration): Unit = {
super.open(parameters)
val globalParams: ExecutionConfig.GlobalJobParameters = getRuntimeContext().getExecutionConfig().getGlobalJobParameters()
val globConf = globalParams.asInstanceOf[Configuration]
mykey = globConf.getString("mykey", "default")
override def map(value: String): String = {
if (mykey.equals(value)) "is equals" else "not equals"
data.map(new RichFunc).print()随笔 - 385
评论 - 193关注社区微信公众号: PMvideo
Flink的Apache Kafka连接器
这个连接器提供了对由提供的事件流的访问。
Flink 提供了特殊的Kafka Connectors来从Kafka topic中读取数据或者将数据写入到Kafkatopic中,Flink的Kafka Consumer与Flink的检查点机制相结合,提供exactly-once处理语义。为了做到这一点,Flink并不完全依赖于Kafka的consumer组的offset跟踪,而是在自己的内部去跟踪和检查。
请根据你自己的使用情况和环境来选择一个包和类名,对于大部分用户,FlinkKafkaConsumer08(flink-connector-kafka的一部分)是最合适的。
Maven Dependency
Supported since
Consumer and Producer Class name
Kafka version
flink-connector-kafka-0.8_2.10
FlinkKafkaConsumer08 FlinkKafkaProducer08
API of Kafka internally. Offsets are committed to ZK by Flink.
flink-connector-kafka-0.9_2.10
FlinkKafkaConsumer09 FlinkKafkaProducer09
Uses the new
flink-connector-kafka-0.10_2.10
FlinkKafkaConsumer010 FlinkKafkaProducer010
This connector supports
both for producing and consuming.
然后,在你的工程中导入connector
&dependency&
&groupId&org.apache.flink&/groupId&
&artifactId&flink-connector-kafka-0.8_2.10&/artifactId&
&version&1.3.0&/version&
&/dependency&
注意,目前streaming的connectors还不是Flink二进制发布包的一部分,请参考来了解在集群执行中与它们连接在一起。
安装Apache Kafka
·按照Kafka 的说明,来下载代码和启动服务(每次启动一个应用前都需要启动一个Zookeeper和一个kafka服务)·如果Kafka和Zookeeper服务运行在一个远程服务器上,那么config/server.properties中的advertised.host.name配置必须要设置成那台服务器的IP地址。
Kafka Consumer
Flink的kafka consumer叫做FlinkKafkaConsumer08(对于Kafka 0.9.0.X来说是09 等),它提供了对一个或者多个Kafka topic的访问。FlinkKafkaConsumer08、09等的构造函数接收以下参数:  1、topic名称或者名称列表  2、反序列化来自kafka的数据的DeserializationSchema/KeyedDeserializationSchema  3、Kafka consumer的一些配置,下面的配置是必需的:      "bootstrap.servers"(以逗号分隔的Kafka brokers列表)      "zookeeper.connect"(以逗号分隔的Zookeeper 服务器列表)      "group.id"(consumer组的id)
例如:Java 代码:
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
// only required for Kafka 0.8
properties.setProperty("zookeeper.connect", "localhost:2181");
properties.setProperty("group.id", "test");
DataStream&String& stream = env
.addSource(new FlinkKafkaConsumer08&&("topic", new SimpleStringSchema(), properties));
Scala 代码:
val properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
// only required for Kafka 0.8
properties.setProperty("zookeeper.connect", "localhost:2181");
properties.setProperty("group.id", "test");
stream = env
.addSource(new FlinkKafkaConsumer08[String]("topic", new SimpleStringSchema(), properties))
当前FlinkKafkaConsumer的实现会建立一个到Kafka客户端的连接来查询topic的列表和分区。
为此,consumer需要能够访问到从提交Job任务的服务器到Flink服务器的consumer,如果你在客户端遇到任何Kafka Consumer的问题,你都可以在客户端日志中看到关于请求失败的日志。(这段翻译得不太好,待我查看完源码后再重新翻译)
反序列化模式 DeserializationSchema
Flink的Kafka Consumer需要知道如何将Kafka中的二进制转换成Java或者Scala的对象,而DeserializationSchema则是允许用户来指定这样一个模式,T deserialize(byte[] message)方法被每个kafka消息调用,并传入kafka的值。
从AbstractDeserializationSchema开始是非常有用的,它描述了Java/Scala类型的产生到Flink的类型系统。用户实现基本的DeserializationSchema的话,需要自己去实现getProducedType(...)方法。
为了获取Kafka消息中的key和value,KeyedDeserializationSchema需要有下面这个反序列化方法 T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset)为了方便起见,Flink提供了下面的模式:  1、TypeInformationSerializationSchema(以及TypeInformationKeyValueSerializationSchema) ,这个是以Flink的TypeInfoSchema为基础创建的,如果数据的读写都是由Flink来做的话,这是非常有用的,这种Schema是高性能的Flink具体的替代其他通用序列化方法的方法。  2、JsonDeserializationSchema(以及JSONKeyValueDeserializationSchema),它能够将序列化的JSON转换成ObjectNode对象,在这个对象中字段可以通过调用objectNode.get("field").as(Int/String/...)()来获取。keyValue类型的ObjectNode包含一个"key"和一个包含所有字段的"value",同时还有可选的用来展示这些消息的offset/partition/topic的"metadata"字段。
当遇到损坏的消息,无法被序列化时,这里有两个选择,要么在deserialize(...)方法中抛出一个异常,这会导致作业失败和重启,要么返回一个null值,来允许Flink Kafka consumer默默地跳过这些错误消息。值得注意的是,由于consumer的容错性(见下面的详细部分),在处理损坏数据时失败的Job会让consumer再次尝试反序列化损坏的消息。因此,如果反序列化任然失败,那么consumer将陷入不断的失败重启中。
Kafka Consumer 开始位置配置
Flink Kafka Consumer允许配置Kafka分区的开始位置是如何确定的。例如:Java 代码:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
FlinkKafkaConsumer08&String& myConsumer = new FlinkKafkaConsumer08&&(...);
myConsumer.setStartFromEarliest();
// start from the earliest record possible
myConsumer.setStartFromLatest();
// start from the latest record
myConsumer.setStartFromGroupOffsets(); // the default behaviour
DataStream&String& stream = env.addSource(myConsumer);
Scala 代码:
val env = StreamExecutionEnvironment.getExecutionEnvironment()
val myConsumer = new FlinkKafkaConsumer08[String](...)
myConsumer.setStartFromEarliest()
// start from the earliest record possible
myConsumer.setStartFromLatest()
// start from the latest record
myConsumer.setStartFromGroupOffsets()
// the default behaviour
val stream = env.addSource(myConsumer)
所有版本的Flink Kafka Consumer都有下面的切确配置方法来配置开始位置。  setStartFromGroupOffsets(默认的行为):从consumer 分组(在consumer中group.id的配置项)提交到Kafka broker(Kafka 0.8是Zookeeper)的偏移位置开始读取分区。如果分区中没有偏移位置,那么会采用auto.offset.reset的配置信息。  setStartFromEarliest() /setStartFromLatest():从最早的或者最近的记录开始,在这种模式下,在Kafka中commit的偏移位置将会被忽略并且不会再用作开始位置。
你也可以指定一个确切的偏移位置,Kafka Consumer必须从这个位置开始读取每个分区的信息,代码如下:Java 代码:
Map&KafkaTopicPartition, Long& specificStartOffsets = new HashMap&&();
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 0), 23L);
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 1), 31L);
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 2), 43L);
myConsumer.setStartFromSpecificOffsets(specificStartOffsets);
Scala 代码:
val specificStartOffsets = new java.util.HashMap[KafkaTopicPartition, java.lang.Long]()
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 0), 23L)
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 1), 31L)
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 2), 43L)
myConsumer.setStartFromSpecificOffsets(specificStartOffsets)
上面的例子中配置了consumer从myTopic的分区0,1,2的指定偏移位置开始,这个偏移位置必须是读取每个分区的下一个记录。注意:如果consumer需要读取的分区在给定的偏移信息的map中,没有指定的偏移位置,那么将会在这个特定的分区中采用默认的分组偏移的行为(即采用setStartFromGroupOffsets())。
注意:这些开始位置配置方法并不会影响作业失败自动重启或者通过savepoint手动重启的开始位置,在恢复中,每个Kafka分区的开始位置由保存在savepoint或者checkpoint中的偏移来决定的。
Kafka Consumers 和Fault Tolerance
Flink的checkpoint启用之后,Flink Kafka Consumer将会从一个topic中消费记录并以一致性的方式周期性地检查所有Kafka偏移量以及其他操作的状态。Flink将保存流程序到状态的最新的checkpoint中,并重新从Kafka中读取记录,记录从保存在checkpoint中的偏移位置开始读取.
checkpoint的时间间隔定义了程序在发生故障时可以恢复多少.
为了使用容错性的Kafka Consumer,拓扑结构的checkpoint需要在执行环境中启用,代码如下:Java 代码:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000); // 每5000毫秒检查一次
Scala 代码:
val env = StreamExecutionEnvironment.getExecutionEnvironment()
env.enableCheckpointing(5000) // 每5000毫秒检查一次
同时需要注意的是Flink只能在有足够的slots时才会去重启topology,所以如果topology由于TaskManager丢失而失败时,任然需要有足够的slot可用。Flink on YARN支持YARN container丢失自动重启。
如果checkpoint不可用,那么Kafka Consumer将会周期性地将offset提交到Zookeeper中。
Kafka Consumer Offset提交行为配置
Flink Kafka Consumer允许配置offset提交回Kafka brokers(Kafka 0.8是写回Zookeeper)的行为,注意Flink Kafka Consumer 并不依赖于这个提交的offset来进行容错性保证,这个提交的offset仅仅作为监控consumer处理进度的一种手段。
配置offset提交行为的方式有多种,主要取决于Job的checkpoint机制是否启动。  1、checkpoint禁用:如果checkpoint禁用,Flink Kafka Consumer依赖于Kafka 客户端内部的自动周期性offset提交能力。因此,为了启用或者禁用offset提交,仅需在给定的Properties配置中设置enable.auto.commit(Kafka 0.8是auto.commit.enable)/auto.commit.interval.ms为适当的值即可。  2、checkpoint启用:如果checkpoint启用,当checkpoint完成之后,Flink Kafka Consumer将会提交offset保存到checkpoint State中,这就保证了kafka broker中的committed offset与 checkpoint stata中的offset相一致。用户可以在Consumer中调用setCommitOffsetsOnCheckpoints(boolean) 方法来选择启用或者禁用offset committing(默认情况下是启用的)。注意,在这种情况下,配置在Properties中的自动周期性offset提交将会被完全忽略。
Kafka Consumer与Timestamp抽取器和Watermark发射器
在许多情况下,记录的timestamp都是显式或者隐式地嵌入在记录本身中,此外,用户可能想周期性地发射水印或者不规则地发射,例如:根据Kafka 流中包含当前事件时间的特殊记录的水印。为此,Flink Kafka Consumer允许这些指定:AssignerWithPeriodicWatermarks 或者 AssignerWithPunctuatedWatermarks。
你可以指定你自定义的timestamp抽取器或者watermark发射器如描述,或者使用一个,之后你就可以按照下面的方式将它们传递到你的consumer中:Java 代码:
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
// only required for Kafka 0.8
properties.setProperty("zookeeper.connect", "localhost:2181");
properties.setProperty("group.id", "test");
FlinkKafkaConsumer08&String& myConsumer =
new FlinkKafkaConsumer08&&("topic", new SimpleStringSchema(), properties);
myConsumer.assignTimestampsAndWatermarks(new CustomWatermarkEmitter());
DataStream&String& stream = env
.addSource(myConsumer)
Scala 代码:
val properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
// only required for Kafka 0.8
properties.setProperty("zookeeper.connect", "localhost:2181");
properties.setProperty("group.id", "test");
val myConsumer = new FlinkKafkaConsumer08[String]("topic", new SimpleStringSchema(), properties);
myConsumer.assignTimestampsAndWatermarks(new CustomWatermarkEmitter());
stream = env
.addSource(myConsumer)
在内部,一个分配器实例会被每个Kafka分区执行,当一个实例被指定,这个extractTimestamp(T element, long previousElementTimestamp)方法就会被调用来为记录分配一个timestamp并且Watermark getCurrentWatermark()(周期性的)或者 Watermark checkAndGetNextWatermark(T lastElement, long extractedTimestamp)(不规则的)会被调用来决定新生成的watermark是否需要发射,跟哪儿timestamp一起发射。
Kafka Producer
Flink的Kafka Producer叫做FlinkKafkaProducer08(Kafka 0.9.x版本是FlinkKafkaProducer09),它允许写流记录到一个或者多个Kafka topic中。例如:Java 代码 Kafka 0.8+:
DataStream&String& stream = ...;
FlinkKafkaProducer08&String& myProducer = new FlinkKafkaProducer08&String&(
"localhost:9092",
// broker list
"my-topic",
// target topic
new SimpleStringSchema());
// serialization schema
// the following is necessary for at-least-once delivery guarantee
myProducer.setLogFailuresOnly(false);
// "false" by default
myProducer.setFlushOnCheckpoint(true);
// "false" by default
stream.addSink(myProducer);
Java 代码 Kafka 0.9+
DataStream&String& stream = ...;
FlinkKafkaProducer010Configuration myProducerConfig = FlinkKafkaProducer010.writeToKafkaWithTimestamps(
// input stream
"my-topic",
// target topic
new SimpleStringSchema(),
// serialization schema
properties);
// custom configuration for KafkaProducer (including broker list)
// the following is necessary for at-least-once delivery guarantee
myProducerConfig.setLogFailuresOnly(false);
// "false" by default
myProducerConfig.setFlushOnCheckpoint(true);
// "false" by default
Scala 代码 Kafka 0.8+
val stream: DataStream[String] = ...
val myProducer = new FlinkKafkaProducer08[String](
"localhost:9092",
// broker list
"my-topic",
// target topic
new SimpleStringSchema)
// serialization schema
// the following is necessary for at-least-once delivery guarantee
myProducer.setLogFailuresOnly(false)
// "false" by default
myProducer.setFlushOnCheckpoint(true)
// "false" by default
stream.addSink(myProducer)
Scala 代码 0.9+
val stream: DataStream[String] = ...
val myProducerConfig = FlinkKafkaProducer010.writeToKafkaWithTimestamps(
// input stream
"my-topic",
// target topic
new SimpleStringSchema,
// serialization schema
properties)
// custom configuration for KafkaProducer (including broker list)
// the following is necessary for at-least-once delivery guarantee
myProducerConfig.setLogFailuresOnly(false)
// "false" by default
myProducerConfig.setFlushOnCheckpoint(true)
// "false" by default
上面例子展示了创建一个Flink Kafka Producer来写流数据到指定的Kafka topic的基本用法:对于更高级的用法,这里有其他变换的构造函数来允许提供下面的功能:  1、提供自定义的属性:Producer允许为内部的KafkaProducer提供自定义的属性配置,请参考来了解如何配置Kafka Producer的详细信息。  2、自定义分区器:为了给记录指定分区,你可以为构造函数提供一个FlinkKafkaPartitioner的实现,这个分区器将会被流中的每个记录调用,来决定记录要被发送到目标topic的哪个确切分区。  3、高级的序列化模式:与consumer类似,Producer允许使用一个叫KeyedSerializationSchema的高级序列化模式,这个模式允许分开地序列化key和value。同时允许重写目标topic,因此一个Producer可以发送数据到多个topic。
Kafka Producer和容错性
在Flink checkpoint开启的情况下,Flink Kafka Producer可以提供至少一次(at-least-once)的发送保证。除了启用Flink的checkpoint之外,你还需要是当地配置setLogFailuresOnly(boolean) 和setFlushOnCheckpoint(boolean)方法,如前面章节的例子所示:  1、setLogFailuresOnly(boolean):启用这个配置将允许producer记录失败日志而不是捕获和抛出它们,这个本质上会认为记录已经成功,即使记录没有写入目标Kafka topic中,对于at-least-once模式来说,这个配置必须禁用。  2、setFlushOnCheckpoint(boolean):启用这个配置,Flink的checkpoint会等待在checkpoint成功之前被Kafka识别的时间内传输的记录,这就保证了所有checkpoint之前的记录都被写入Kafka 中,在at-least-once模式下,这个配置必须启用。
注意:默认情况下,重试次数设置为“0”,这也就意味着setLogFailuresOnly设置为false,producer失败的话会立即报错,包括leader的切换也会报错。这个值默认情况下设置为0是为了避免重试导致重复的消息进入目标topic中。对于大多数频繁切换broker的生产环境中,我们建议将重试次数设置为一个比较高的值。
注意:现在Kafka还没有事务型的producer,所以Flink无法保证精确地(exactly-once)分发消息到一个Kafka topic中。
在Kafka 0.10中使用Kafka timestamp和 Flink的event time
Apache Kafka 0.10之后的版本中,Kafka的消息可以携带,指出了事件发生的时间(参考Apache Flink的)或者消息被写入到Kafka broker的时间。如果Flink的TimeCharacteristic设置为TimeCharacteristic.EventTime (StreamExecutionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime))的话,FlinkKafkaConsumer010会发射附有timestamp的记录。Kafka consumer并不会发射watermark,为了发射watermark,原理如"Kafka Consumers and Timestamp Extraction/Watermark Emission"所述,使用assignTimestampsAndWatermarks方法。当使用Kafka中的timestamp时,无需定义timestamp抽取器,extractTimestamp()方法中的previousElementTimestamp参数已经包含了Kafka消息所携带的timestamp。一个Kafka consumer的timestamp抽取器如下所示:
public long extractTimestamp(Long element, long previousElementTimestamp) {
return previousElementT
如果setWriteTimestampToKafka(true)配置了的话,FlinkKafkaProducer010 会仅发射记录的timestamp。
FlinkKafkaProducer010.FlinkKafkaProducer010Configuration config = FlinkKafkaProducer010.writeToKafkaWithTimestamps(streamWithTimestamps, topic, new SimpleStringSchema(), standardProps);
config.setWriteTimestampToKafka(true);
Kafka Connector 度量
Flink的Kafka Connector通过Flink的提供了一些度量指标来分析connector的行为,producer通过Flink的度量系统输出所有支持的版本的Kafka内部度量指标。consumer则输出所有的Kafka 0.9版本的度量指标,Kafka在它的中列出了所有的度量指标。
除此之外,所有的consumer都会为每个分区暴露出current-offsets和committed-offsets,current-offsets指出了当前分区的偏移位置,这个指出了我们最近检索并成功发射的元素的偏移位置,committed-offsets是最近提交的偏移位置。
Flink中的Kafka Consumer提交offsets到Zookeeper(Kafka 0.8)或者Kafka Broker中(Kafka 0.9+),如果checkpoint禁用的话。offset会周期性地提交。启用checkpoint的话,一旦所有流topology中的操作已经声明它们已经创建为它们的State创建了一个checkpoint,offset会提交。这些为用户提供了提交offset到Zookeeper或者Broker的at-least-once语义。对于offsetcheckpoint到Flink,系统提供了精确的(exactly once)保证。
提交到ZK或者Broker中的offset可以被用来追踪Kafka consumer的读取进度,提交的offset与每个分区中最近的offset的差异叫做consumer lag,如果Flink topology从topic中消费数据的速度小于新数据添加的速度,那么lag会增加,consumer会落后。对于大多数生产发布我们建议监控这个度量来避免不断增加的延迟。
启用Kerberos认证(对于0.9及以上版本)
Flink为Kafka Connector认证到一个配置了Kerberos的Kafka集群提供了一流的支持,仅仅需要在flink-conf.yaml配置Flink来为Kafka启用Kerberos认证即可,如下:1、通过如下设置来配置Kerberos凭证: security.kerberos.login.use-ticket-cache:默认情况下,这个是true并且Flink会由kinit管理的票据缓存中使用Kerberos凭证。注意,当在部署到YARN的Flink 的Kafka Connectors中,Kerberos认证使用票据缓存是不起作用的,这种情况在作业部署到Mesos中也会出现,因为使用票据缓存的认证在Mesos部署中还未支持。 security.kerberos.login.keytab 和 security.kerberos.login.principal:为了使用Kerberos keytabs, 请为这两个配置设置一个值。2、将KafkaClient追加到security.kerberos.login.contexts中:这会告诉Flink来为Kafka 认证所用的Kafka登录场景提供配置的Kerberos凭证。
一旦基于Kerberos的Flink安全启用,你就可以通过Flink Kafka Consumer或者Producer认证到Kafka中,在提供的属性配置中引入下面两个配置,这两个配置会传入到内部的Kafka 客户端。
0条评论或问题
笔记社区是一个面向中高端IT开发者、程序员的知识共享社区,通过网络抓取与文章分类总结,由专家为用户提供高质量的专题文章系列。
原文链接:http://www.jianshu.com/p/c25bde9893a7
声明:所有文章资源均从网络抓取,如果侵犯到您的著作权,请联系删除文章。联系方式请关注微信公众号PMvideo【锤子视频-程序员喜欢的短视频】,笔记社区开发者交流群 。
今日签到0人
关注微信公众号:PMvideo

我要回帖

更多关于 flink kafka 的文章

 

随机推荐