通过 pipes 配合 insert into select files 增量导入
Contents
[NOTE] Updated August 15, 2025. This article may have outdated content or subject matter.
测试目的
- json 文件数据导入到 StarRocks 数据指定表并按列存储字段
- json 文件所在 s3 目录新增文件后,数据库自动导入新增文件
- json 文件数据避免重复
测试过程
- 部署 1 套 v3.3.14 版本 StarRocks 集群(单节点 FE、BE)
- 部署 1 套 Minio 服务(本地 binary 启动)
测试数据样本
- csv 文件数据样本,保存到部署 StarRocks 的节点 1 份
| |
- 需要处理 S3 目录如下场景
- 单个目录下可能存在多种文件格式
- 单个目录或多个目录存在多个文件(每隔数秒新增 1 个文件)
- 目录内已有文件可能存在更新(文件大小的变化)
StarRocks 验证
- 新建数据表,采用 duplicate key 表是为了验证多次创建任务对数据的影响
| |
- 使用 select files 基于本地文件测试读取
- 更多使用姿势参考 https://docs.starrocks.io/zh/docs/sql-reference/sql-functions/table-functions/files/
- json 数据按照 csv 文件放在 /opt/ 目录下
| |
| |
- 测试将数据配合 insert into 写入到目标表(test.ooojson)
- 通过 show load 可以跟踪到该 ETL 任务执行成功
| |
- 通过 pipe 循环写入(此时需要使用到 minio)
- 语法参考:https://docs.starrocks.io/zh/docs/loading/s3/
- 创建 pipe 能力,如下方式确定数据范围【如果变化就导入,如果无任何变化不会触发导入】
"path" = "s3://ooopipe/ooojson.csv",这个只会监控 ooojson 单个文件,不会判断其他文件"path" = "s3://ooopipe/ooo*.csv",这种会监控 ooo 开头的文件"path" = "s3://ooopipe/*.csv",这会监控所有 csv 结尾的文件- 支持使用通配符匹配路径,比如:
"path" = "s3://abc*/dt=*/ooopipe/*.csv"
| |
- 查看 pipe 异步任务执行状态
- 通过 show pipes 查看任务运行状态‘
- 或者
select * from information_schema.pipe_files查看文件导入历史记录
| |
- pipe 已知信息
- 以官网文档为准 官网文档
- 当 S3 一次新增多个文件时候
- 默认:文件个数不大于 BATCH_FILES = 256 个,文件大小不超过 BATCH_SIZE = 1GB 的时候,多个文件会占用 1 个 insert into select 导入能力加载到数据库
- 当新增文件体量超过如上任意条件时,会切分成多个 pipe 任务导入;参考:
fe/fe-core/src/main/java/com/starrocks/load/pipe/FilePipeSource.java- 假设新增 100 个文件,总加起来不超过 1G ,会使用 1 个 task 并发;path 可以设置一次导入多个文件
- 假设新增 50 个文件 && 超过 1G ,会使用多个 task 并发
- 假设新增 500 个文件 && 不超过 1G ,会使用多个 task 并发
- 并发个数被 FE 参数管理:
task_runs_concurrency=4;参考:fe/fe-core/src/main/java/com/starrocks/scheduler/TaskRunScheduler.java
- pipe 任务与数据导入
- 可查看:
fe/fe-core/src/main/java/com/starrocks/load/pipe/Pipe.java - 任务删除后,已经导入的记录会被删除
- 重新创建任务,会使用 files() 函数内的 path 信息重新匹配,数据可能存在重复导入
- 当 sink table 是 primary key 表时,该行为可以保证最终幂等性,但是数据加载占用的资源不会忽略
- 当 sink table 是 duplicate key 表时,该行为导致数据重复导入,数据会出现重复
- 数据表删除时,pipe 任务会联动删除
- 可查看:
- pipe 相关资料