Fetch and unpack a file with NiFi's HTTP processor, then pick out files matching a pattern and send them to Kafka
Purpose
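Use NiFi to fetch a file over HTTP and unpack it, then pick out the files that match a specific pattern and send them to Kafka.
This tutorial tries out the PublishKafkaRecord_0_10 processor.
The flow fetches a ZIP archive of CSV files from a remote HTTP server
and unpacks it.
Only one specific CSV file is then passed to the PublishKafkaRecord_0_10 processor, which streams it to Kafka.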
- This tutorial can be run on NiFi 1.2 and later.
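To make the flow above concrete, here is a minimal Python sketch of the unpack and routing steps outside NiFi. It is not how NiFi implements them. The names `ROUTES`, `unpack`, `route`, `select_movies` and `build_sample_zip` are hypothetical, and the sample archive is a made-up stand-in for ml-20m.zip so that nothing has to be downloaded. The sketch assumes that the unpack step reports only the base file name of each entry, which is what lets the routing rules compare against `movies.csv` directly.

```python
import io
import zipfile
from pathlib import PurePosixPath

# Hypothetical routing table mirroring the three routing rules in the template.
ROUTES = {"movies": "movies.csv", "ratings": "ratings.csv", "tags": "tags.csv"}


def unpack(zip_bytes):
    """Yield (filename, content) pairs for every file in a ZIP archive.

    Assumption: only the base name is kept as the file name, so
    'ml-20m/movies.csv' is reported as 'movies.csv'.
    """
    with zipfile.ZipFile(io.BytesIO(zip_bytes)) as archive:
        for info in archive.infolist():
            if info.is_dir():
                continue
            yield PurePosixPath(info.filename).name, archive.read(info)


def route(filename):
    """Return the route whose expected file name equals `filename`, else 'unmatched'."""
    for name, expected in ROUTES.items():
        if filename == expected:
            return name
    return "unmatched"


def select_movies(zip_bytes):
    """Keep only the files routed to 'movies'; all other files are dropped."""
    return [
        content
        for filename, content in unpack(zip_bytes)
        if route(filename) == "movies"
    ]


def build_sample_zip():
    """Build a tiny in-memory archive with made-up rows, for illustration only."""
    buffer = io.BytesIO()
    with zipfile.ZipFile(buffer, "w") as archive:
        archive.writestr("ml-20m/movies.csv", "movieId,title,genres\n1,Example Movie,Drama\n")
        archive.writestr("ml-20m/ratings.csv", "userId,movieId,rating,timestamp\n1,1,4.0,0\n")
        archive.writestr("ml-20m/README.txt", "sample\n")
    return buffer.getvalue()
```

Calling `select_movies(build_sample_zip())` returns a single item, the bytes of `movies.csv`. The ratings and README entries are dropped, much as the template auto-terminates the `ratings`, `tags` and `unmatched` relationships.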
Environment
- Mac OS X
- HDF 3.2
  - Apache NiFi 1.7.0
  - Apache Kafka 1.1.0
  - Schema Registry
PublishKafkaRecord_0_10 (CSV to JSON)
Preparing the Kafka environment
Install HDF 3.2.
See here for the installation steps.
Importing the template file
After the import
GetHTTP
Note the following:
Unpacking the ZIP file:
File name
Preparing the Kafka topic
Sending messages to Kafka
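As a rough picture of what ends up on the topic, the sketch below converts CSV text into one compact JSON string per row, using the field names and types of the `movies` schema in the template (`movieId` as a long, `title` and `genres` as strings). This is only an approximation under stated assumptions. The function name `csv_to_messages` is hypothetical, null handling and the escape character are left out, and nothing is actually sent to Kafka. As far as I understand, PublishKafkaRecord publishes each record as its own Kafka message, which is why the function returns a list rather than a single document.

```python
import csv
import io
import json


def csv_to_messages(csv_text):
    """Return one compact JSON string per CSV row, skipping the header line.

    Field names come from the schema, not from the CSV header, and are
    assigned by position: movieId, title, genres.
    """
    reader = csv.reader(io.StringIO(csv_text), delimiter=",", quotechar='"')
    next(reader, None)  # skip the header line
    messages = []
    for row in reader:
        if not row:
            continue  # ignore blank lines
        # Trim surrounding whitespace from every field.
        movie_id, title, genres = (field.strip() for field in row)
        record = {"movieId": int(movie_id), "title": title, "genres": genres}
        # Compact separators give JSON without pretty printing.
        messages.append(json.dumps(record, separators=(",", ":")))
    return messages
```

For example, a made-up row such as `7,"Example, The (2000)",Comedy|Drama` becomes a single message whose `title` is `Example, The (2000)`, because the quoted comma is part of the field rather than a separator.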
Template XML
<?xml version="1.0" ?>
<template encoding-version="1.1">
<description>This tutorial workflow gets CSV data from the GroupLens website and
uses the PublishKafkaRecord processor to:
-Convert the data into JSON
-Publish the JSON data to the Kafka topic "Movies"</description>
<groupId>855d9cd4-015d-1000-8dc5-71f1f96a4779</groupId>
<name>PublishKafkaRecord</name>
<snippet>
<connections>
<id>a21d3e99-68e1-3542-0000-000000000000</id>
<parentGroupId>451a635e-0803-3845-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>451a635e-0803-3845-0000-000000000000</groupId>
<id>3f560dd9-2a0c-3090-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<name></name>
<selectedRelationships>success</selectedRelationships>
<source>
<groupId>451a635e-0803-3845-0000-000000000000</groupId>
<id>feca53d8-7bfb-38bd-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>af5788a5-23eb-3d39-0000-000000000000</id>
<parentGroupId>451a635e-0803-3845-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<bends>
<x>1043.3389261227253</x>
<y>685.9336127468443</y>
</bends>
<bends>
<x>1043.3389261227253</x>
<y>735.9336127468443</y>
</bends>
<destination>
<groupId>451a635e-0803-3845-0000-000000000000</groupId>
<id>3f560dd9-2a0c-3090-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<name></name>
<selectedRelationships>failure</selectedRelationships>
<source>
<groupId>451a635e-0803-3845-0000-000000000000</groupId>
<id>3f560dd9-2a0c-3090-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>bca31e1c-69fc-3691-0000-000000000000</id>
<parentGroupId>451a635e-0803-3845-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>451a635e-0803-3845-0000-000000000000</groupId>
<id>f4cb8328-1431-3494-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<name></name>
<selectedRelationships>success</selectedRelationships>
<source>
<groupId>451a635e-0803-3845-0000-000000000000</groupId>
<id>4004c547-2a1a-321b-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>edd5e36a-f6d3-3dfb-0000-000000000000</id>
<parentGroupId>451a635e-0803-3845-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>451a635e-0803-3845-0000-000000000000</groupId>
<id>c4612b5a-0fe7-367b-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<name></name>
<selectedRelationships>success</selectedRelationships>
<source>
<groupId>451a635e-0803-3845-0000-000000000000</groupId>
<id>f4cb8328-1431-3494-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>f100ccb4-13c6-3e5c-0000-000000000000</id>
<parentGroupId>451a635e-0803-3845-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>451a635e-0803-3845-0000-000000000000</groupId>
<id>feca53d8-7bfb-38bd-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<name></name>
<selectedRelationships>movies</selectedRelationships>
<source>
<groupId>451a635e-0803-3845-0000-000000000000</groupId>
<id>c4612b5a-0fe7-367b-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<controllerServices>
<id>b8531252-2a7c-3648-0000-000000000000</id>
<parentGroupId>451a635e-0803-3845-0000-000000000000</parentGroupId>
<bundle>
<artifact>nifi-record-serialization-services-nar</artifact>
<group>org.apache.nifi</group>
<version>1.3.0</version>
</bundle>
<comments></comments>
<descriptors>
<entry>
<key>schema-access-strategy</key>
<value>
<name>schema-access-strategy</name>
</value>
</entry>
<entry>
<key>schema-registry</key>
<value>
<identifiesControllerService>org.apache.nifi.schemaregistry.services.SchemaRegistry</identifiesControllerService>
<name>schema-registry</name>
</value>
</entry>
<entry>
<key>schema-name</key>
<value>
<name>schema-name</name>
</value>
</entry>
<entry>
<key>schema-text</key>
<value>
<name>schema-text</name>
</value>
</entry>
<entry>
<key>Date Format</key>
<value>
<name>Date Format</name>
</value>
</entry>
<entry>
<key>Time Format</key>
<value>
<name>Time Format</name>
</value>
</entry>
<entry>
<key>Timestamp Format</key>
<value>
<name>Timestamp Format</name>
</value>
</entry>
<entry>
<key>CSV Format</key>
<value>
<name>CSV Format</name>
</value>
</entry>
<entry>
<key>Value Separator</key>
<value>
<name>Value Separator</name>
</value>
</entry>
<entry>
<key>Skip Header Line</key>
<value>
<name>Skip Header Line</name>
</value>
</entry>
<entry>
<key>Quote Character</key>
<value>
<name>Quote Character</name>
</value>
</entry>
<entry>
<key>Escape Character</key>
<value>
<name>Escape Character</name>
</value>
</entry>
<entry>
<key>Comment Marker</key>
<value>
<name>Comment Marker</name>
</value>
</entry>
<entry>
<key>Null String</key>
<value>
<name>Null String</name>
</value>
</entry>
<entry>
<key>Trim Fields</key>
<value>
<name>Trim Fields</name>
</value>
</entry>
</descriptors>
<name>CSVReader</name>
<persistsState>false</persistsState>
<properties>
<entry>
<key>schema-access-strategy</key>
<value>schema-name</value>
</entry>
<entry>
<key>schema-registry</key>
<value>bdeb04f8-7ece-385c-0000-000000000000</value>
</entry>
<entry>
<key>schema-name</key>
<value>${schema.name}</value>
</entry>
<entry>
<key>schema-text</key>
<value>${avro.schema}</value>
</entry>
<entry>
<key>Date Format</key>
</entry>
<entry>
<key>Time Format</key>
</entry>
<entry>
<key>Timestamp Format</key>
</entry>
<entry>
<key>CSV Format</key>
<value>custom</value>
</entry>
<entry>
<key>Value Separator</key>
<value>,</value>
</entry>
<entry>
<key>Skip Header Line</key>
<value>true</value>
</entry>
<entry>
<key>Quote Character</key>
<value>"</value>
</entry>
<entry>
<key>Escape Character</key>
<value>\</value>
</entry>
<entry>
<key>Comment Marker</key>
</entry>
<entry>
<key>Null String</key>
</entry>
<entry>
<key>Trim Fields</key>
<value>true</value>
</entry>
</properties>
<state>ENABLED</state>
<type>org.apache.nifi.csv.CSVReader</type>
</controllerServices>
<controllerServices>
<id>bdeb04f8-7ece-385c-0000-000000000000</id>
<parentGroupId>451a635e-0803-3845-0000-000000000000</parentGroupId>
<bundle>
<artifact>nifi-registry-nar</artifact>
<group>org.apache.nifi</group>
<version>1.3.0</version>
</bundle>
<comments></comments>
<descriptors>
<entry>
<key>movies</key>
<value>
<name>movies</name>
</value>
</entry>
</descriptors>
<name>AvroSchemaRegistry</name>
<persistsState>false</persistsState>
<properties>
<entry>
<key>movies</key>
<value>{
"type": "record",
"name": "MoviesRecord",
"fields" : [
{"name": "movieId", "type": "long"},
{"name": "title", "type": ["null", "string"]},
{"name": "genres", "type": ["null", "string"]}
]
}</value>
</entry>
</properties>
<state>ENABLED</state>
<type>org.apache.nifi.schemaregistry.services.AvroSchemaRegistry</type>
</controllerServices>
<controllerServices>
<id>290d56c9-1e35-3dba-0000-000000000000</id>
<parentGroupId>451a635e-0803-3845-0000-000000000000</parentGroupId>
<bundle>
<artifact>nifi-record-serialization-services-nar</artifact>
<group>org.apache.nifi</group>
<version>1.3.0</version>
</bundle>
<comments></comments>
<descriptors>
<entry>
<key>Schema Write Strategy</key>
<value>
<name>Schema Write Strategy</name>
</value>
</entry>
<entry>
<key>schema-access-strategy</key>
<value>
<name>schema-access-strategy</name>
</value>
</entry>
<entry>
<key>schema-registry</key>
<value>
<identifiesControllerService>org.apache.nifi.schemaregistry.services.SchemaRegistry</identifiesControllerService>
<name>schema-registry</name>
</value>
</entry>
<entry>
<key>schema-name</key>
<value>
<name>schema-name</name>
</value>
</entry>
<entry>
<key>schema-text</key>
<value>
<name>schema-text</name>
</value>
</entry>
<entry>
<key>Date Format</key>
<value>
<name>Date Format</name>
</value>
</entry>
<entry>
<key>Time Format</key>
<value>
<name>Time Format</name>
</value>
</entry>
<entry>
<key>Timestamp Format</key>
<value>
<name>Timestamp Format</name>
</value>
</entry>
<entry>
<key>Pretty Print JSON</key>
<value>
<name>Pretty Print JSON</name>
</value>
</entry>
</descriptors>
<name>JsonRecordSetWriter</name>
<persistsState>false</persistsState>
<properties>
<entry>
<key>Schema Write Strategy</key>
<value>schema-name</value>
</entry>
<entry>
<key>schema-access-strategy</key>
<value>schema-name</value>
</entry>
<entry>
<key>schema-registry</key>
<value>bdeb04f8-7ece-385c-0000-000000000000</value>
</entry>
<entry>
<key>schema-name</key>
<value>${schema.name}</value>
</entry>
<entry>
<key>schema-text</key>
<value>${avro.schema}</value>
</entry>
<entry>
<key>Date Format</key>
</entry>
<entry>
<key>Time Format</key>
</entry>
<entry>
<key>Timestamp Format</key>
</entry>
<entry>
<key>Pretty Print JSON</key>
<value>false</value>
</entry>
</properties>
<state>ENABLED</state>
<type>org.apache.nifi.json.JsonRecordSetWriter</type>
</controllerServices>
<processors>
<id>c4612b5a-0fe7-367b-0000-000000000000</id>
<parentGroupId>451a635e-0803-3845-0000-000000000000</parentGroupId>
<position>
<x>0.0</x>
<y>441.2567407123754</y>
</position>
<bundle>
<artifact>nifi-standard-nar</artifact>
<group>org.apache.nifi</group>
<version>1.3.0</version>
</bundle>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>Routing Strategy</key>
<value>
<name>Routing Strategy</name>
</value>
</entry>
<entry>
<key>movies</key>
<value>
<name>movies</name>
</value>
</entry>
<entry>
<key>ratings</key>
<value>
<name>ratings</name>
</value>
</entry>
<entry>
<key>tags</key>
<value>
<name>tags</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>Routing Strategy</key>
<value>Route to Property name</value>
</entry>
<entry>
<key>movies</key>
<value>${filename:equals('movies.csv')}</value>
</entry>
<entry>
<key>ratings</key>
<value>${filename:equals('ratings.csv')}</value>
</entry>
<entry>
<key>tags</key>
<value>${filename:equals('tags.csv')}</value>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<name>RouteOnAttribute</name>
<relationships>
<autoTerminate>false</autoTerminate>
<name>movies</name>
</relationships>
<relationships>
<autoTerminate>true</autoTerminate>
<name>ratings</name>
</relationships>
<relationships>
<autoTerminate>true</autoTerminate>
<name>tags</name>
</relationships>
<relationships>
<autoTerminate>true</autoTerminate>
<name>unmatched</name>
</relationships>
<style></style>
<type>org.apache.nifi.processors.standard.RouteOnAttribute</type>
</processors>
<processors>
<id>f4cb8328-1431-3494-0000-000000000000</id>
<parentGroupId>451a635e-0803-3845-0000-000000000000</parentGroupId>
<position>
<x>0.0</x>
<y>224.95887663044255</y>
</position>
<bundle>
<artifact>nifi-standard-nar</artifact>
<group>org.apache.nifi</group>
<version>1.3.0</version>
</bundle>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>Packaging Format</key>
<value>
<name>Packaging Format</name>
</value>
</entry>
<entry>
<key>File Filter</key>
<value>
<name>File Filter</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>Packaging Format</key>
<value>zip</value>
</entry>
<entry>
<key>File Filter</key>
<value>.*</value>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<name>Unzip</name>
<relationships>
<autoTerminate>true</autoTerminate>
<name>failure</name>
</relationships>
<relationships>
<autoTerminate>true</autoTerminate>
<name>original</name>
</relationships>
<relationships>
<autoTerminate>false</autoTerminate>
<name>success</name>
</relationships>
<style></style>
<type>org.apache.nifi.processors.standard.UnpackContent</type>
</processors>
<processors>
<id>feca53d8-7bfb-38bd-0000-000000000000</id>
<parentGroupId>451a635e-0803-3845-0000-000000000000</parentGroupId>
<position>
<x>0.0</x>
<y>645.9336127468443</y>
</position>
<bundle>
<artifact>nifi-update-attribute-nar</artifact>
<group>org.apache.nifi</group>
<version>1.3.0</version>
</bundle>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>Delete Attributes Expression</key>
<value>
<name>Delete Attributes Expression</name>
</value>
</entry>
<entry>
<key>Store State</key>
<value>
<name>Store State</name>
</value>
</entry>
<entry>
<key>Stateful Variables Initial Value</key>
<value>
<name>Stateful Variables Initial Value</name>
</value>
</entry>
<entry>
<key>schema.name</key>
<value>
<name>schema.name</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>Delete Attributes Expression</key>
</entry>
<entry>
<key>Store State</key>
<value>Do not store state</value>
</entry>
<entry>
<key>Stateful Variables Initial Value</key>
</entry>
<entry>
<key>schema.name</key>
<value>movies</value>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<name>Add Schema Name Attribute</name>
<relationships>
<autoTerminate>false</autoTerminate>
<name>success</name>
</relationships>
<style></style>
<type>org.apache.nifi.processors.attributes.UpdateAttribute</type>
</processors>
<processors>
<id>3f560dd9-2a0c-3090-0000-000000000000</id>
<parentGroupId>451a635e-0803-3845-0000-000000000000</parentGroupId>
<position>
<x>588.3389261227253</x>
<y>645.9336127468443</y>
</position>
<bundle>
<artifact>nifi-kafka-0-10-nar</artifact>
<group>org.apache.nifi</group>
<version>1.3.0</version>
</bundle>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>bootstrap.servers</key>
<value>
<name>bootstrap.servers</name>
</value>
</entry>
<entry>
<key>topic</key>
<value>
<name>topic</name>
</value>
</entry>
<entry>
<key>record-reader</key>
<value>
<identifiesControllerService>org.apache.nifi.serialization.RecordReaderFactory</identifiesControllerService>
<name>record-reader</name>
</value>
</entry>
<entry>
<key>record-writer</key>
<value>
<identifiesControllerService>org.apache.nifi.serialization.RecordSetWriterFactory</identifiesControllerService>
<name>record-writer</name>
</value>
</entry>
<entry>
<key>security.protocol</key>
<value>
<name>security.protocol</name>
</value>
</entry>
<entry>
<key>sasl.kerberos.service.name</key>
<value>
<name>sasl.kerberos.service.name</name>
</value>
</entry>
<entry>
<key>sasl.kerberos.principal</key>
<value>
<name>sasl.kerberos.principal</name>
</value>
</entry>
<entry>
<key>sasl.kerberos.keytab</key>
<value>
<name>sasl.kerberos.keytab</name>
</value>
</entry>
<entry>
<key>ssl.context.service</key>
<value>
<identifiesControllerService>org.apache.nifi.ssl.SSLContextService</identifiesControllerService>
<name>ssl.context.service</name>
</value>
</entry>
<entry>
<key>acks</key>
<value>
<name>acks</name>
</value>
</entry>
<entry>
<key>message-key-field</key>
<value>
<name>message-key-field</name>
</value>
</entry>
<entry>
<key>max.request.size</key>
<value>
<name>max.request.size</name>
</value>
</entry>
<entry>
<key>ack.wait.time</key>
<value>
<name>ack.wait.time</name>
</value>
</entry>
<entry>
<key>max.block.ms</key>
<value>
<name>max.block.ms</name>
</value>
</entry>
<entry>
<key>partitioner.class</key>
<value>
<name>partitioner.class</name>
</value>
</entry>
<entry>
<key>compression.type</key>
<value>
<name>compression.type</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>bootstrap.servers</key>
<value>localhost:9092</value>
</entry>
<entry>
<key>topic</key>
<value>Movies</value>
</entry>
<entry>
<key>record-reader</key>
<value>b8531252-2a7c-3648-0000-000000000000</value>
</entry>
<entry>
<key>record-writer</key>
<value>290d56c9-1e35-3dba-0000-000000000000</value>
</entry>
<entry>
<key>security.protocol</key>
<value>PLAINTEXT</value>
</entry>
<entry>
<key>sasl.kerberos.service.name</key>
</entry>
<entry>
<key>sasl.kerberos.principal</key>
</entry>
<entry>
<key>sasl.kerberos.keytab</key>
</entry>
<entry>
<key>ssl.context.service</key>
</entry>
<entry>
<key>acks</key>
<value>0</value>
</entry>
<entry>
<key>message-key-field</key>
</entry>
<entry>
<key>max.request.size</key>
<value>1 MB</value>
</entry>
<entry>
<key>ack.wait.time</key>
<value>5 secs</value>
</entry>
<entry>
<key>max.block.ms</key>
<value>5 sec</value>
</entry>
<entry>
<key>partitioner.class</key>
<value>org.apache.kafka.clients.producer.internals.DefaultPartitioner</value>
</entry>
<entry>
<key>compression.type</key>
<value>none</value>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<name>Publish to "Movies" Topic</name>
<relationships>
<autoTerminate>false</autoTerminate>
<name>failure</name>
</relationships>
<relationships>
<autoTerminate>true</autoTerminate>
<name>success</name>
</relationships>
<style></style>
<type>org.apache.nifi.processors.kafka.pubsub.PublishKafkaRecord_0_10</type>
</processors>
<processors>
<id>4004c547-2a1a-321b-0000-000000000000</id>
<parentGroupId>451a635e-0803-3845-0000-000000000000</parentGroupId>
<position>
<x>0.0</x>
<y>0.0</y>
</position>
<bundle>
<artifact>nifi-standard-nar</artifact>
<group>org.apache.nifi</group>
<version>1.3.0</version>
</bundle>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>URL</key>
<value>
<name>URL</name>
</value>
</entry>
<entry>
<key>Filename</key>
<value>
<name>Filename</name>
</value>
</entry>
<entry>
<key>SSL Context Service</key>
<value>
<identifiesControllerService>org.apache.nifi.ssl.SSLContextService</identifiesControllerService>
<name>SSL Context Service</name>
</value>
</entry>
<entry>
<key>Username</key>
<value>
<name>Username</name>
</value>
</entry>
<entry>
<key>Password</key>
<value>
<name>Password</name>
</value>
</entry>
<entry>
<key>Connection Timeout</key>
<value>
<name>Connection Timeout</name>
</value>
</entry>
<entry>
<key>Data Timeout</key>
<value>
<name>Data Timeout</name>
</value>
</entry>
<entry>
<key>User Agent</key>
<value>
<name>User Agent</name>
</value>
</entry>
<entry>
<key>Accept Content-Type</key>
<value>
<name>Accept Content-Type</name>
</value>
</entry>
<entry>
<key>Follow Redirects</key>
<value>
<name>Follow Redirects</name>
</value>
</entry>
<entry>
<key>redirect-cookie-policy</key>
<value>
<name>redirect-cookie-policy</name>
</value>
</entry>
<entry>
<key>Proxy Host</key>
<value>
<name>Proxy Host</name>
</value>
</entry>
<entry>
<key>Proxy Port</key>
<value>
<name>Proxy Port</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>URL</key>
<value>http://files.grouplens.org/datasets/movielens/ml-20m.zip</value>
</entry>
<entry>
<key>Filename</key>
<value>ml-20m.zip</value>
</entry>
<entry>
<key>SSL Context Service</key>
</entry>
<entry>
<key>Username</key>
</entry>
<entry>
<key>Password</key>
</entry>
<entry>
<key>Connection Timeout</key>
<value>30 sec</value>
</entry>
<entry>
<key>Data Timeout</key>
<value>30 sec</value>
</entry>
<entry>
<key>User Agent</key>
</entry>
<entry>
<key>Accept Content-Type</key>
</entry>
<entry>
<key>Follow Redirects</key>
<value>false</value>
</entry>
<entry>
<key>redirect-cookie-policy</key>
<value>default</value>
</entry>
<entry>
<key>Proxy Host</key>
</entry>
<entry>
<key>Proxy Port</key>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>10 mins</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<name>Get Movie Data</name>
<relationships>
<autoTerminate>false</autoTerminate>
<name>success</name>
</relationships>
<style></style>
<type>org.apache.nifi.processors.standard.GetHTTP</type>
</processors>
</snippet>
<timestamp>09/11/2017 11:33:49 EDT</timestamp>
</template>
References
- Using PublishKafkaRecord_0_10 (CSVReader/JSONWriter) in Apache NiFi 1.2+