SQL结果到kafka
通过SQL语句,用户可以将查询的结果作为事件导出到kafka。
前提条件
- Kafka的服务器地址必须是炎凰数据平台可达的,并且需要导出到的topic在kafka中事先创建好。
- 需要提前使用SQL语句创建一张映射到kafka配置的外部表。
- 暂时不支持kafka的用户名密码验证,因此导出到的kafka实例需要关闭用户名密码认证。
创建外部表映射到kafka配置
为了使用SQL将查询结果导出到某个kafka实例,我们需要先在平台中创建一张外部表,这张外部表配置了目标kafka实例必需的参数,可以理解为对目标kafka实例的一种映射。 用户成功创建了一张kafka类型的外部表后,再往这张外部表中插入查询结果,即等同于往目标kafka实例导出查询结果。 平台中创建一张映射kafka实例的外部表,其语法如下:
CREATE [OR REPLACE] TABLE <table_name> ENGINE=kafka WITH (server_url=<server_ip>,server_port=<server_port>,topic=<topic_name>)
这里table_name
是外部表的唯一标识,它不能与平台中的数据集、视图、物化视图等重名;ENGINE=kafka
表示这是一张映射到kafka的外部表;
WITH
关键字后面跟着的是逗号分隔的配置参数,每一组参数以key=value
的格式赋值,kafka类型的外部表必需的参数列表说明如下:
参数说明:
参数 | 说明 |
---|---|
server_url | kafka服务的IP地址或者有效URL |
server_port | kafka服务使用的连接端口 |
topic | kafka服务上存在的topic名称 |
例如,我们先创建一张名为kafka_0
的外部表,映射到一个地址为1.1.1.1:9999
的kafka实例,这个实例上已经有一个名为new-events
的topic用于接收数据导入:
CREATE OR REPLACE TABLE kafka_0 ENGINE=kafka WITH (server_url='1.1.1.1',server_port='9999',topic='new-events')
创建外部表成功后,我们可以通过查看对应的SHOW TABLES
语法来查看外部表是否创建成功,语法如下:
SHOW [FULL] TABLES [WHERE identity={table_name}]
例如,我们可以直接列出所有外部表的名称,查看是否有我们刚刚创建的表:
SHOW TABLES
我们还可以查看我们创建的外部表的详细配置是否正确,通过下列语句列出外部表的具体参数:
SHOW FULL TABLES WHERE identity='kafka_0'
当我们确认外部表配置无误后,我们就可以通过SQL将查询结果导出到kafka,其语法与将查询结果导入到某个数据集相同:
INSERT INTO <table_name> <query_expr>
例如,通过以下SQL语句,将表函数ip_location
查询的结果导入出到kafka_0
映射的kafka实例,查询结果将被导出到对应kafka的new-events
这个topic中:
INSERT INTO kafka_0 SELECT * FROM ip_location('43.228.180.166')
如果我们不再需要这个映射到kafka的外部表,我们也可以通过SQL将其删除,语法如下。注意删除外部表只是删除该kafka实例在平台上的映射配置,不会删除已经导出到对应kafka实例的数据。
DROP TABLE <table_name>
例如,删除掉名为kafka_0的外部表:
DROP TABLE kafka_0
{% include note.html content="目标topic必须存在" %}
导出查询流程
- 在查询页面中,执行
CREATE TABLE
类型的查询,将目标kafka必需的配置存成一张kafka类型的外部表。 - 在查询页面中,执行一个
INSERT INTO <table_name> <query_expr>
类型的查询。该类型的查询包含了查询结果以及将结果导出到kafka两部分。 - 查询部分完成后,会返回给用户状态信息,提示用户导出任务是否已经被触发。将查询结果导出到kafka服务的工作将会在后台继续运行。
- 用户可前往查询任务页面,查看本次查询结果导出的状态,待查询结果以及结果导出两部分工作都完成后,该任务状态将显示为
已完成
。