How do you use Flink SQL to read data from Kafka?
To read Kafka data with Flink SQL, follow these steps:
- Add the Kafka dependency to the pom.xml file in the Flink project.
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
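Ensure that ${flink.version} is defined in the <properties> section of pom.xml and matches the Flink version you run. The _2.12 suffix is the Scala version and applies to Flink 1.14 and earlier; from Flink 1.15 on, the artifact is named flink-connector-kafka without a suffix, and recent connector releases are versioned independently of Flink, so check the connector documentation for the matching version. The 'json' format used below also requires the flink-json dependency.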
- Set up an execution environment for Flink SQL.
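import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);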
- Register a Kafka table in Flink SQL.
// key and value are reserved words in Flink SQL, so they are escaped with backticks.
String createTableSql = "CREATE TABLE kafka_table (\n" +
" `key` STRING,\n" +
" `value` STRING\n" +
") WITH (\n" +
" 'connector' = 'kafka',\n" +
" 'topic' = 'your_topic',\n" +
" 'properties.bootstrap.servers' = 'your_bootstrap_servers',\n" +
" 'properties.group.id' = 'your_group_id',\n" +
" 'format' = 'json',\n" +
" 'scan.startup.mode' = 'earliest-offset'\n" +
")";
tEnv.executeSql(createTableSql);
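In the above code, replace the values of 'topic' and 'properties.bootstrap.servers' with your Kafka topic and the addresses of your bootstrap servers. 'properties.group.id' is the Kafka consumer group ID that the Flink source uses, and 'scan.startup.mode' = 'earliest-offset' makes the source start from the earliest available offset.
The 'format' option specifies how message values are deserialized; set it to match your data, for example 'json', 'csv', or 'avro'. With the 'json' format, the columns key and value are read from fields of the same names in each JSON message; they are not the Kafka record key and value.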
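Since the topic, bootstrap servers, group ID, and format usually differ between environments, one option is to build the statement from parameters instead of hard-coding it. The following is only a minimal sketch in plain Java: the class name KafkaTableDdl, the method build, and the sample values (orders, localhost:9092, flink-reader) are hypothetical and not part of Flink.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

public class KafkaTableDdl {

    /** Builds the CREATE TABLE statement from the values that differ per deployment. */
    public static String build(String table, String topic, String servers,
                               String groupId, String format) {
        // LinkedHashMap keeps the options in insertion order, so the output is stable.
        Map<String, String> options = new LinkedHashMap<>();
        options.put("connector", "kafka");
        options.put("topic", topic);
        options.put("properties.bootstrap.servers", servers);
        options.put("properties.group.id", groupId);
        options.put("format", format);
        options.put("scan.startup.mode", "earliest-offset");

        // Render each option as 'key' = 'value', separated by commas.
        String with = options.entrySet().stream()
                .map(e -> "  '" + e.getKey() + "' = '" + escape(e.getValue()) + "'")
                .collect(Collectors.joining(",\n"));

        // key and value are reserved words in Flink SQL, hence the backticks.
        return "CREATE TABLE " + table + " (\n"
                + "  `key` STRING,\n"
                + "  `value` STRING\n"
                + ") WITH (\n" + with + "\n)";
    }

    // A single quote inside a SQL string literal is escaped by doubling it.
    private static String escape(String s) {
        return s.replace("'", "''");
    }

    public static void main(String[] args) {
        // Hypothetical sample values; replace them with your own.
        System.out.println(build("kafka_table", "orders", "localhost:9092", "flink-reader", "json"));
    }
}
```

The returned string can be passed to tEnv.executeSql(...) in place of createTableSql. The table name is inserted as-is, so it should come from trusted configuration rather than user input.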
- Execute a Flink SQL query.
String querySql = "SELECT * FROM kafka_table";
Table result = tEnv.sqlQuery(querySql);
- Convert the query results into a DataStream.
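// toAppendStream is deprecated in newer Flink versions; toDataStream (Flink 1.13+) replaces it for insert-only tables.
DataStream<Row> resultStream = tEnv.toDataStream(result);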
You can now process resultStream further, for example by printing it with resultStream.print() or writing it to another system.
Finally, call env.execute() to start the Flink job; until then the pipeline is only defined, not run.