所谓 NIFI
,简单的说就是为了解决不同系统间数据自动流通问题而建立的。虽然 dataflow
这个术语在各种场景都有被使用,但我们在这里使用它来表示不同系统间的自动化的可管理的信息流。
这里简单介绍
NIFI
常用操作和组件基本使用介绍。
一、EL表达式
通常将需要判断的值提到流文件属性,再通过 EL
表达式进行判断。
这里仅列举日常流程中较常用的表达式,详细文档参考官方教程。
1. 字符串
字符串对应的 EL
表达式如下:
函数 | 表达式 |
---|---|
UUID | ${UUID()} |
Equals | ${filename:equals('hello.txt')} |
IsNull | ${filename:isNull()} |
IsEmpty | ${filename:isEmpty()} |
NotNull | ${filename:notNull()} |
Append | ${filename:append('abc')} |
Replace | ${filename:replace('.', '_')} |
Substring | ${filename:substring(start, end)} |
IfElse | ${filename:isNull():ifElse('file null', 'file not null')} |
2. 时间
相对应的时间 EL
表达式如下:
函数 | 表达式 |
---|---|
Now | ${now()} |
Format | ${now():format('yyyy')} |
Timestamp | ${now():toNumber()} |
二、文件读取
1. FTP文件
(1) 读取
配置 FTP
地址以及用户名密码,属性 Delete Original
用于控制在获取文件之后是否删除源文件。
(2) 存入
配置 FTP
地址和用户名密码,以及需要存放的路径,配置冲突文件进行覆盖。
2. 本地文件
(1) 读取
配置本地文件夹路径,会自动获取文件夹内所有文件,属性 Keep Source File
用于控制在获取文件之后是否删除源文件。
(2) 存入
配置需要存放的路径以及冲突文件进行覆盖。
三、内容操作
1. GenerateFlowFile
(1) 作用
通过配置属性可以生产对应流文件,通常用于生成测试数据。
(2) 使用
通过设置 Custome Text
属性值配置生成的流文件内容,可以填入 Json
格式数据生成测试流文件。
2. LogAttribute
(1) 作用
记录日志,通常将组件 Failture
关系连到 LogAttribute
。
3. UpdateAttribute
(1) 作用
用于更新流文件属性,配置 EL
表达式进行使用。
(2) 使用
通过右上角加号新增属性,属性名自定义,属性值可以取固定值,也可以通过 EL
表达式进行赋值。当流文件经过此组件之后,将会自动添加这条属性。
4. EvaluateJsonPath
(1) 作用
当流文件内容为 Json
格式数据时,可以提取相应的键值对值,其中 Destination
设置将取到的值添加到流文件内容还是属性。
(2) 使用
例如此时流文件内容如下:
{
"id": 123,
"name": "budai"
}
在通过新增一条属性 id
,并赋值为 $.id
,即可获取上述中的 123
。
5. JoltTransformJSON
(1) 作用
通常用于过滤 Json
格式的数据。
(2) 使用
假设流文件有如下格式数据,而我们只需要保留 id
的值。
{
"id": 123,
"name": "budai"
}
通过配置 Jolt Specification
属性的值为为如下所示,即可对数据进行过滤。
[{
"operation":"shift",
"spec":{
"id":"id"
}
}]
最终得到的数据如下:
{
"id": 123
}
6. ExtractText
(1) 作用
通常用于将流文件内容提取到属性上。
(2) 使用
在通过新增一条属性 content
,并赋值为 $.*
,当流文件经过此组件之后将会自动添加该属性,值为流文件内容。
不过需要注意一点,属性长度有长度限制,过长时会自动截断。
7. RouteOnAttribute
(1) 作用
通常用于路由判断,利用 UpdateAttribute
与 EvaluateJsonPath
对流文件属性进行修改,再根据 EL
表达式进行判断。
(2) 使用
新建一条属性,根据需求设置EL表达式,这条属性将转化为该组件的一条关系。
8. ReplaceText
(1) 作用
用于替换流文件内容值, Search Value
设置需要被替换的内容, Replacement Values
设置替换后的内容。
(2) 使用
可以进行单值替换,如将流文件内容中 a
替换为 A
。
同时可以进行全局替换,如将流文件内容更替为 this is a test
。
9. UpdateRecord
(1) 作用
用于更新流文件内容值,若已存在则进行覆盖,更新前后的字段类型需一致。
(2) 使用
配置流文件的读写服务,需要注意在属性名之前要加上斜杠 /
。如我们新建属性 /test
,当流文件经过此组件之后,内容将会自动添加 test
的键值对。
四、格式转化
1. SplitText
(1) 作用
用于拆分文本,如读取的 CSV
文件进行拆分为单条数据。
2. SplitRecord
(1) 作用
用于将单个流文件拆分为多个流文件。
(2) 使用
需要配置对应的流文件读写服务并启动。
3. MergeRecord
(1) 作用
用于将多个流文件合并为单个流文件。
(2) 使用
配置相对简单,只需根据流文件格式新建相应的读写服务并点击右侧箭头进行启用即可。
4. ConvertRecord
(1) 作用
用于转化流文件格式,例如数据库抽取的数据格式为 Avro
,但 EvaluateJsonPath
等组件只能对 Json
格式文件进行操作,需要进行格式转换。
(2) 使用
五、网络请求
1. InvokeHttp
(1) 作用
通常用于请求网络接口以获取数据。
(2) 使用
根据场景选择不同请求方式,如 Get、Post
,填上请求地址即可。
2. HandleHttpRequest
(1) 作用
配合 HandleHttpResponse
使用用于生成接口。
(2) 使用
设置访问端口以及接口路径,对应的Http服务除了 Post
属性其余设置为 false
创建服务并启动。
注意对应的服务必须和 HandleHttpRequest
中使用的一致。
3. HandleHttpResponse
(1) 作用
通过 HandleHttpResponse
与 HandleHttpRequest
两个组件相配合可以生成接口。
如通过 Post
方法传入 Json
数据,通过上诉其余组件组合使用实现对数据的修改,最后返回相应的结果
(1) 使用
设置状态码为 200
,创建服务并启动,注意其使用的 HTTP Context Map
服务必须和 HandleHttpRequest
中使用的为同一个。
六、数据库操作
1. 服务配置
(1) MySQL
驱动包需要提前下载到本地,填写绝对路径即可。
jdbc:mysql://{host}:{port}/{database}
com.mysql.jdbc.Driver
(2) Oracle
基本配置和 MySQL
一致,更改 URL
格式和Class Name
即可。
jdbc:oracle:thin:@{host}:{port}:{database}
oracle.jdbc.OracleDriver
2. ExecuteSQL
(1) 作用
用于执行 SQL
语句,配置连接池服务即可执行预期 SQL
命令,当没有填写时会自动读取上游流文件内容进行执行,可以根据需要提前拼凑语句。
(2) 使用
注意第三个红框中的属性值需要设置为true
。
3. QueryDatabaseTable
(1) 作用
用于查询数据表记录,需要配置连接池服务。
(2) 使用
4. PutDatabaseRecord
(1) 作用
用于插入数据表记录,需要配置连接池服务。