Apache Kafka开源流式KSQL实战
小编:啊南 552阅读 2020.11.30
部署架构
由一个KSQL服务器进程执行查询。一组KSQL进程可以作为集群运行。可以通过启动更多的KSQL实例来动态添加更多的处理能力。这些KSQL实例是容错的,如果一个实例失败了,其他的就会接管它的工作。查询是使用交互式的KSQL命令行客户端启动的,该客户端通过REST API向集群发送命令。命令行允许检查可用的stream和table,发出新的查询,检查状态并终止正在运行的查询。KSQL内部是使用Kafka的stream API构建的,它继承了它的弹性可伸缩性、先进的状态管理和容错功能,并支持Kafka最近引入的一次性处理语义。KSQL服务器将此嵌入到一个分布式SQL引擎中(包括一些用于查询性能的自动字节代码生成)和一个用于查询和控制的REST API。
处理架构抽象概念
KSQL简化了流应用程序,它集成了stream和table的概念,允许使用表示现在发生的事件的stream来连接表示当前状态的table。Apache Kafka中的一个topic可以表示为KSQL中的STREAM或TABLE,具体取决于topic处理的预期语义。下面看看两个核心的解读。
stream:流是无限制的结构化数据序列,stream中的fact是不可变的,这意味着可以将新fact插入到stream中,但是现有fact永远不会被更新或删除。stream可以从Kafka topic创建,或者从现有的stream和table中派生。
table:一个table是一个stream或另一个table的视图,它代表了一个不断变化的fact的集合,它相当于传统的数据库表,但通过流化等流语义来丰富。表中的事实是可变的,这意味着可以将新的事实插入到表中,现有的事实可以被更新或删除。可以从Kafka主题中创建表,也可以从现有的流和表中派生表。
部署
ksql支持kafka0.11之后的版本,在confluent的V3和V4版本中默认并没有加入ksql server程序,当然V3和V4是支持ksql的,在V5版本中已经默认加入ksql了,为了方便演示,我们使用confluent kafka V5版本演示,zk和kafka也是单实例启动。
下载
wget https://packages.confluent.io/archive/5.0/confluent-oss-5.0.0-2.11.tar.gz tar zxvf confluent-oss-5.0.0-2.11.tar.gz -C /opt/programs/confluent_5.0.0
启动zk
cd /opt/programs/confluent_5.0.0
bin/zookeeper-server-start -daemon etc/kafka/zookeeper.properties
启动kafka
cd /opt/programs/confluent_5.0.0
bin/kafka-server-start -daemon etc/kafka/server.properties
创建topic和data
confluent自带了一个ksql-datagen工具,可以创建和产生相关的topic和数据,ksql-datagen可以指定的参数如下:
[bootstrap-server=
[quickstart=
schema=
[schemaRegistryUrl=
format=
topic=
key=
[iterations=
[maxInterval=
[propertiesFile=
创建pageviews,数据格式为delimited
cd /opt/programs/confluent_5.0.0/bin
./ksql-datagen quickstart=pageviews format=delimited topic=pageviews maxInterval=500
ps:以上命令会源源不断在stdin上输出数据,就是工具自己产生的数据,如下样例
8001 --> ([ 1539063767860 | 'User_6' | 'Page_77' ]) ts:1539063767860
8011 --> ([ 1539063767981 | 'User_9' | 'Page_75' ]) ts:1539063767981
8021 --> ([ 1539063768086 | 'User_5' | 'Page_16' ]) ts:1539063768086
不过使用consumer消费出来的数据是如下样式
1539066430530,User_5,Page_29
1539066430915,User_6,Page_74
1539066431192,User_4,Page_28
1539066431621,User_6,Page_38
1539066431772,User_7,Page_29
1539066432122,User_8,Page_34
创建users,数据格式为json
cd /opt/programs/confluent_5.0.0/bin
./ksql-datagen quickstart=users format=json topic=users maxInterval=100
ps:以上命令会源源不断在stdin上输出数据,就是工具自己产生的数据,如下样例
User_5 --> ([ 1517896551436 | 'User_5' | 'Region_5' | 'MALE' ]) ts:1539063787413
User_7 --> ([ 1513998830510 | 'User_7' | 'Region_4' | 'MALE' ]) ts:1539063787430
User_6 --> ([ 1514865642822 | 'User_6' | 'Region_2' | 'MALE' ]) ts:1539063787481
不过使用consumer消费出来的数据是如下样式
{"registertime":1507118206666,"userid":"User_6","regionid":"Region_7","gender":"OTHER"}
{"registertime":1506192314325,"userid":"User_1","regionid":"Region_1","gender":"MALE"}
{"registertime":1489277749526,"userid":"User_6","regionid":"Region_4","gender":"FEMALE"}
{"registertime":1497188917765,"userid":"User_9","regionid":"Region_3","gender":"OTHER"}
{"registertime":1493121964253,"userid":"User_4","regionid":"Region_3","gender":"MALE"}
{"registertime":1515609444511,"userid":"User_5","regionid":"Region_9","gender":"FEMALE"}
启动ksql
cd /opt/programs/confluent_5.0.0
bin/ksql-server-start -daemon etc/ksql/ksql-server.properties
连接ksql
cd /opt/programs/confluent_5.0.0
bin/ksql http://10.205.151.145:8088
创建stream和tablestream
根据topic pageviews创建一个stream pageviews_original,value_format为DELIMITED
ksql>CREATE STREAM pageviews_original (viewtime bigint, userid varchar, pageid varchar) WITH (kafka_topic='pageviews', value_format='DELIMITED');
table
根据topic users创建一个table users_original,value_format为json
ksql>CREATE TABLE users_original (registertime BIGINT, gender VARCHAR, regionid VARCHAR, userid VARCHAR) WITH (kafka_topic='users', value_format='JSON', key = 'userid');
查询数据
ksql> SELECT * FROM USERS_ORIGINAL LIMIT 3;
ksql> SELECT * FROM pageviews_original LIMIT 3;
ps:ksql默认是从kafka最新的数据查询消费的,如果你想从开头查询,则需要在会话上进行设置:SET 'auto.offset.reset' = 'earliest';
持久化查询
持久化查询可以源源不断的把查询出的数据发送到你指定的topic中去,查询的时候在select前面添加create stream关键字即可创建持久化查询。
创建查询
ksql> CREATE STREAM pageviews2 AS SELECT userid FROM pageviews_original;
查询新stream
ksql> SHOW STREAMS;
ps:可以看到新创建了stream PAGEVIEWS2,并且创建了topic PAGEVIEWS2
查询执行任务
ksql> SHOW QUERIES;
ps:可以看到ID为CSAS_PAGEVIEWS2_0的任务在执行,并且有显示执行的语句
消费新数据
cd /opt/programs/confluent_5.0.0/bin
./kafka-console-consumer --bootstrap-server 10.205.151.145:9092 --from-beginning --topic
PAGEVIEWS2
ps:可以看到PAGEVIEWS2 topic里面正是我们通过select筛选出来的数据
终止查询任务
ksql> TERMINATE CSAS_PAGEVIEWS2_0;
相关推荐
- SQL Server 常用函数 1.获取当前时间--GetDate()2.DatePart() 函数3.字符串截取--substring三个参数,第一个参数需要截取的,第二个参数,截取字符串的起始位置(起始位置是1),第三个参数需要截取字符串的长度。4.日期增减函数-DateAdd三个参数,第一参数合法的日期表达式,第二个参…
- 3DMAX提示和技巧 本主题标识使用 Civil View 的一些重要提示和技巧。常规使用屏幕分辨率至少为 1280x1024 的 Civil View。低于此分辨率时,一些面板将占用过多屏幕空间。 将视口设置为线框显示以达到最佳性能。 要尽可能简化用户界面,请在单个视口中工作并关闭 3ds Max 命令面…