Fetching and Unzipping Files with a NiFi HTTP Processor, Then Sending the Files That Match a Pattern to Kafka

Goal

Use a NiFi HTTP processor to fetch and unzip files, then pick out the files that match a specific pattern and send them to Kafka.

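This tutorial tries out the PublishKafkaRecord_0_10 processor.
It fetches CSV files compressed as a ZIP archive from a remote HTTP server
and unzips them.
It then passes only one specific CSV file to the PublishKafkaRecord_0_10 processor, which streams it to Kafka.

    This tutorial can be run on NiFi 1.2 and later.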

Environment

    • Mac OS X
    • HDF 3.2
        • Apache NiFi 1.7.0
        • Apache Kafka 1.1.0
        • Schema Registry

PublishKafkaRecord_0_10 (CSV to JSON)

Preparing the Kafka environment

Install HDF 3.2.
For the installation steps, see here.

Importing the template file

After the import

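GetHTTP

Note the following settings, which appear in the template XML below: the URL is http://files.grouplens.org/datasets/movielens/ml-20m.zip, the Filename is ml-20m.zip, and the processor is scheduled to run every 10 mins.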
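The GetHTTP step can be approximated outside NiFi in a few lines of Python. This is only a rough sketch and not how NiFi implements the processor: the function name `fetch` is made up for illustration, the URL and the 30-second timeout are copied from the template XML, and the whole response is read into memory, which may be unsuitable for a large archive.

```python
import urllib.request

# Values copied from the GetHTTP properties in the template XML.
URL = "http://files.grouplens.org/datasets/movielens/ml-20m.zip"
TIMEOUT_SECONDS = 30  # mirrors the "Connection Timeout" / "Data Timeout" of 30 sec


def fetch(url: str = URL, timeout: float = TIMEOUT_SECONDS) -> bytes:
    """Download the resource at `url` and return its raw bytes."""
    with urllib.request.urlopen(url, timeout=timeout) as response:
        return response.read()
```

Calling `fetch()` with no arguments would download the archive named in the template; passing another URL makes it easy to try the function against a smaller file first.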

Unzipping the ZIP file:
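In the template, this step is an UnpackContent processor named Unzip, with Packaging Format set to zip and File Filter set to `.*`. The sketch below imitates that behavior with Python's `zipfile` module. It rests on some assumptions: the helper name `unpack` is hypothetical, the filter is assumed to be matched against the full entry name, and the `filename` and `path` keys only approximate the attributes NiFi writes, so the exact values may differ.

```python
import io
import posixpath
import re
import zipfile
from typing import Dict, List, Tuple


def unpack(zip_bytes: bytes, file_filter: str = ".*") -> List[Tuple[Dict[str, str], bytes]]:
    """Return (attributes, content) for every archive entry matching `file_filter`."""
    pattern = re.compile(file_filter)
    entries = []
    with zipfile.ZipFile(io.BytesIO(zip_bytes)) as archive:
        for info in archive.infolist():
            # Skip directory entries and names the filter rejects.
            if info.is_dir() or not pattern.fullmatch(info.filename):
                continue
            # Split "ml-20m/movies.csv" into directory and base name.
            directory, base_name = posixpath.split(info.filename)
            attributes = {"filename": base_name, "path": directory}
            entries.append((attributes, archive.read(info)))
    return entries
```

Keeping the base name in its own `filename` entry matters for the next step, where the routing rules compare against names such as `movies.csv` without any directory prefix.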

File names
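The template's RouteOnAttribute processor defines three rules, `${filename:equals('movies.csv')}`, `${filename:equals('ratings.csv')}`, and `${filename:equals('tags.csv')}`, and only the movies relationship is connected onward; ratings, tags, and unmatched are auto-terminated. The same decision can be sketched in Python as follows. The names `ROUTES`, `FORWARDED`, `route`, and `is_forwarded` are illustrative only and do not come from NiFi.

```python
from typing import Dict, List

# Relationship name -> file name it matches, copied from the template rules.
ROUTES = {
    "movies": "movies.csv",
    "ratings": "ratings.csv",
    "tags": "tags.csv",
}

# Only "movies" is connected to a downstream processor in the template.
FORWARDED = {"movies"}


def route(attributes: Dict[str, str]) -> List[str]:
    """Return the relationships whose rule matches, or ["unmatched"] if none do."""
    filename = attributes.get("filename")
    matched = [name for name, expected in ROUTES.items() if filename == expected]
    return matched or ["unmatched"]


def is_forwarded(attributes: Dict[str, str]) -> bool:
    """True if the file would continue toward Kafka rather than being dropped."""
    return any(name in FORWARDED for name in route(attributes))
```

With these rules, a file named `movies.csv` continues through the flow, while `ratings.csv`, `tags.csv`, and any other file are discarded.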

Preparing to send messages to the Kafka topic
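In the template, the preparation consists of an UpdateAttribute processor (Add Schema Name Attribute) that sets `schema.name` to `movies`, while the CSVReader and JsonRecordSetWriter services look the schema up by `${schema.name}` in an AvroSchemaRegistry. The following sketch mimics that lookup with a plain dictionary. The schema text is copied from the template; the function names and the dictionary-based registry are assumptions made for illustration.

```python
import json
from typing import Dict

# Schema text as registered under the "movies" property of AvroSchemaRegistry.
SCHEMA_REGISTRY = {
    "movies": """{
  "type": "record",
  "name": "MoviesRecord",
  "fields" : [
    {"name": "movieId", "type": "long"},
    {"name": "title", "type": ["null", "string"]},
    {"name": "genres", "type": ["null", "string"]}
  ]
}""",
}


def add_schema_name(attributes: Dict[str, str], name: str = "movies") -> Dict[str, str]:
    """Return a copy of the attributes with schema.name set, like UpdateAttribute."""
    return {**attributes, "schema.name": name}


def resolve_schema(attributes: Dict[str, str], registry: Dict[str, str] = SCHEMA_REGISTRY) -> dict:
    """Look up and parse the schema named by the schema.name attribute."""
    return json.loads(registry[attributes["schema.name"]])
```

Because the schema is chosen by an attribute rather than hard-coded in the reader, the same reader and writer services could in principle serve other files by registering more schemas and setting a different `schema.name`.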

Sending messages to Kafka
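PublishKafkaRecord_0_10 reads the FlowFile with the configured record reader, writes each record with the configured record writer, and publishes the records to the topic as individual messages. The sketch below reproduces only the CSV-to-JSON part under the CSVReader settings in the template (comma separator, double-quote quoting, backslash escape, header line skipped, fields trimmed). It does not talk to Kafka, the function name `csv_to_json_messages` is hypothetical, and the handling of empty or null values is not modeled.

```python
import csv
import io
import json
from typing import List

# Field names and Python converters following the "movies" schema in the template.
FIELDS = [("movieId", int), ("title", str), ("genres", str)]


def csv_to_json_messages(csv_text: str) -> List[str]:
    """Convert CSV text into one JSON string per record."""
    reader = csv.reader(
        io.StringIO(csv_text, newline=""),
        delimiter=",",
        quotechar='"',
        escapechar="\\",
    )
    next(reader, None)  # Skip Header Line = true
    messages = []
    for row in reader:
        if not row:
            continue  # ignore blank lines
        record = {}
        for (name, convert), raw in zip(FIELDS, row):
            record[name] = convert(raw.strip())  # Trim Fields = true
        messages.append(json.dumps(record))
    return messages
```

Each returned string corresponds to what one message on the Movies topic might look like, for example a JSON object with `movieId`, `title`, and `genres` keys.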

Template XML

<?xml version="1.0" ?>
<template encoding-version="1.1">
  <description>This tutorial workflow gets CSV data from the GroupLens website and
uses the PublishKafkaRecord processor to:
-Convert the data into JSON
-Publish the JSON data to the Kafka topic "Movies"</description>
  <groupId>855d9cd4-015d-1000-8dc5-71f1f96a4779</groupId>
  <name>PublishKafkaRecord</name>
  <snippet>
    <connections>
      <id>a21d3e99-68e1-3542-0000-000000000000</id>
      <parentGroupId>451a635e-0803-3845-0000-000000000000</parentGroupId>
      <backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
      <backPressureObjectThreshold>10000</backPressureObjectThreshold>
      <destination>
        <groupId>451a635e-0803-3845-0000-000000000000</groupId>
        <id>3f560dd9-2a0c-3090-0000-000000000000</id>
        <type>PROCESSOR</type>
      </destination>
      <flowFileExpiration>0 sec</flowFileExpiration>
      <labelIndex>1</labelIndex>
      <name></name>
      <selectedRelationships>success</selectedRelationships>
      <source>
        <groupId>451a635e-0803-3845-0000-000000000000</groupId>
        <id>feca53d8-7bfb-38bd-0000-000000000000</id>
        <type>PROCESSOR</type>
      </source>
      <zIndex>0</zIndex>
    </connections>
    <connections>
      <id>af5788a5-23eb-3d39-0000-000000000000</id>
      <parentGroupId>451a635e-0803-3845-0000-000000000000</parentGroupId>
      <backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
      <backPressureObjectThreshold>10000</backPressureObjectThreshold>
      <bends>
        <x>1043.3389261227253</x>
        <y>685.9336127468443</y>
      </bends>
      <bends>
        <x>1043.3389261227253</x>
        <y>735.9336127468443</y>
      </bends>
      <destination>
        <groupId>451a635e-0803-3845-0000-000000000000</groupId>
        <id>3f560dd9-2a0c-3090-0000-000000000000</id>
        <type>PROCESSOR</type>
      </destination>
      <flowFileExpiration>0 sec</flowFileExpiration>
      <labelIndex>1</labelIndex>
      <name></name>
      <selectedRelationships>failure</selectedRelationships>
      <source>
        <groupId>451a635e-0803-3845-0000-000000000000</groupId>
        <id>3f560dd9-2a0c-3090-0000-000000000000</id>
        <type>PROCESSOR</type>
      </source>
      <zIndex>0</zIndex>
    </connections>
    <connections>
      <id>bca31e1c-69fc-3691-0000-000000000000</id>
      <parentGroupId>451a635e-0803-3845-0000-000000000000</parentGroupId>
      <backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
      <backPressureObjectThreshold>10000</backPressureObjectThreshold>
      <destination>
        <groupId>451a635e-0803-3845-0000-000000000000</groupId>
        <id>f4cb8328-1431-3494-0000-000000000000</id>
        <type>PROCESSOR</type>
      </destination>
      <flowFileExpiration>0 sec</flowFileExpiration>
      <labelIndex>1</labelIndex>
      <name></name>
      <selectedRelationships>success</selectedRelationships>
      <source>
        <groupId>451a635e-0803-3845-0000-000000000000</groupId>
        <id>4004c547-2a1a-321b-0000-000000000000</id>
        <type>PROCESSOR</type>
      </source>
      <zIndex>0</zIndex>
    </connections>
    <connections>
      <id>edd5e36a-f6d3-3dfb-0000-000000000000</id>
      <parentGroupId>451a635e-0803-3845-0000-000000000000</parentGroupId>
      <backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
      <backPressureObjectThreshold>10000</backPressureObjectThreshold>
      <destination>
        <groupId>451a635e-0803-3845-0000-000000000000</groupId>
        <id>c4612b5a-0fe7-367b-0000-000000000000</id>
        <type>PROCESSOR</type>
      </destination>
      <flowFileExpiration>0 sec</flowFileExpiration>
      <labelIndex>1</labelIndex>
      <name></name>
      <selectedRelationships>success</selectedRelationships>
      <source>
        <groupId>451a635e-0803-3845-0000-000000000000</groupId>
        <id>f4cb8328-1431-3494-0000-000000000000</id>
        <type>PROCESSOR</type>
      </source>
      <zIndex>0</zIndex>
    </connections>
    <connections>
      <id>f100ccb4-13c6-3e5c-0000-000000000000</id>
      <parentGroupId>451a635e-0803-3845-0000-000000000000</parentGroupId>
      <backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
      <backPressureObjectThreshold>10000</backPressureObjectThreshold>
      <destination>
        <groupId>451a635e-0803-3845-0000-000000000000</groupId>
        <id>feca53d8-7bfb-38bd-0000-000000000000</id>
        <type>PROCESSOR</type>
      </destination>
      <flowFileExpiration>0 sec</flowFileExpiration>
      <labelIndex>1</labelIndex>
      <name></name>
      <selectedRelationships>movies</selectedRelationships>
      <source>
        <groupId>451a635e-0803-3845-0000-000000000000</groupId>
        <id>c4612b5a-0fe7-367b-0000-000000000000</id>
        <type>PROCESSOR</type>
      </source>
      <zIndex>0</zIndex>
    </connections>
    <controllerServices>
      <id>b8531252-2a7c-3648-0000-000000000000</id>
      <parentGroupId>451a635e-0803-3845-0000-000000000000</parentGroupId>
      <bundle>
        <artifact>nifi-record-serialization-services-nar</artifact>
        <group>org.apache.nifi</group>
        <version>1.3.0</version>
      </bundle>
      <comments></comments>
      <descriptors>
        <entry>
          <key>schema-access-strategy</key>
          <value>
            <name>schema-access-strategy</name>
          </value>
        </entry>
        <entry>
          <key>schema-registry</key>
          <value>
            <identifiesControllerService>org.apache.nifi.schemaregistry.services.SchemaRegistry</identifiesControllerService>
            <name>schema-registry</name>
          </value>
        </entry>
        <entry>
          <key>schema-name</key>
          <value>
            <name>schema-name</name>
          </value>
        </entry>
        <entry>
          <key>schema-text</key>
          <value>
            <name>schema-text</name>
          </value>
        </entry>
        <entry>
          <key>Date Format</key>
          <value>
            <name>Date Format</name>
          </value>
        </entry>
        <entry>
          <key>Time Format</key>
          <value>
            <name>Time Format</name>
          </value>
        </entry>
        <entry>
          <key>Timestamp Format</key>
          <value>
            <name>Timestamp Format</name>
          </value>
        </entry>
        <entry>
          <key>CSV Format</key>
          <value>
            <name>CSV Format</name>
          </value>
        </entry>
        <entry>
          <key>Value Separator</key>
          <value>
            <name>Value Separator</name>
          </value>
        </entry>
        <entry>
          <key>Skip Header Line</key>
          <value>
            <name>Skip Header Line</name>
          </value>
        </entry>
        <entry>
          <key>Quote Character</key>
          <value>
            <name>Quote Character</name>
          </value>
        </entry>
        <entry>
          <key>Escape Character</key>
          <value>
            <name>Escape Character</name>
          </value>
        </entry>
        <entry>
          <key>Comment Marker</key>
          <value>
            <name>Comment Marker</name>
          </value>
        </entry>
        <entry>
          <key>Null String</key>
          <value>
            <name>Null String</name>
          </value>
        </entry>
        <entry>
          <key>Trim Fields</key>
          <value>
            <name>Trim Fields</name>
          </value>
        </entry>
      </descriptors>
      <name>CSVReader</name>
      <persistsState>false</persistsState>
      <properties>
        <entry>
          <key>schema-access-strategy</key>
          <value>schema-name</value>
        </entry>
        <entry>
          <key>schema-registry</key>
          <value>bdeb04f8-7ece-385c-0000-000000000000</value>
        </entry>
        <entry>
          <key>schema-name</key>
          <value>${schema.name}</value>
        </entry>
        <entry>
          <key>schema-text</key>
          <value>${avro.schema}</value>
        </entry>
        <entry>
          <key>Date Format</key>
        </entry>
        <entry>
          <key>Time Format</key>
        </entry>
        <entry>
          <key>Timestamp Format</key>
        </entry>
        <entry>
          <key>CSV Format</key>
          <value>custom</value>
        </entry>
        <entry>
          <key>Value Separator</key>
          <value>,</value>
        </entry>
        <entry>
          <key>Skip Header Line</key>
          <value>true</value>
        </entry>
        <entry>
          <key>Quote Character</key>
          <value>"</value>
        </entry>
        <entry>
          <key>Escape Character</key>
          <value>\</value>
        </entry>
        <entry>
          <key>Comment Marker</key>
        </entry>
        <entry>
          <key>Null String</key>
        </entry>
        <entry>
          <key>Trim Fields</key>
          <value>true</value>
        </entry>
      </properties>
      <state>ENABLED</state>
      <type>org.apache.nifi.csv.CSVReader</type>
    </controllerServices>
    <controllerServices>
      <id>bdeb04f8-7ece-385c-0000-000000000000</id>
      <parentGroupId>451a635e-0803-3845-0000-000000000000</parentGroupId>
      <bundle>
        <artifact>nifi-registry-nar</artifact>
        <group>org.apache.nifi</group>
        <version>1.3.0</version>
      </bundle>
      <comments></comments>
      <descriptors>
        <entry>
          <key>movies</key>
          <value>
            <name>movies</name>
          </value>
        </entry>
      </descriptors>
      <name>AvroSchemaRegistry</name>
      <persistsState>false</persistsState>
      <properties>
        <entry>
          <key>movies</key>
          <value>{
  "type": "record",
  "name": "MoviesRecord",
  "fields" : [
    {"name": "movieId", "type": "long"},
    {"name": "title", "type": ["null", "string"]},
    {"name": "genres", "type": ["null", "string"]}
  ]
}</value>
        </entry>
      </properties>
      <state>ENABLED</state>
      <type>org.apache.nifi.schemaregistry.services.AvroSchemaRegistry</type>
    </controllerServices>
    <controllerServices>
      <id>290d56c9-1e35-3dba-0000-000000000000</id>
      <parentGroupId>451a635e-0803-3845-0000-000000000000</parentGroupId>
      <bundle>
        <artifact>nifi-record-serialization-services-nar</artifact>
        <group>org.apache.nifi</group>
        <version>1.3.0</version>
      </bundle>
      <comments></comments>
      <descriptors>
        <entry>
          <key>Schema Write Strategy</key>
          <value>
            <name>Schema Write Strategy</name>
          </value>
        </entry>
        <entry>
          <key>schema-access-strategy</key>
          <value>
            <name>schema-access-strategy</name>
          </value>
        </entry>
        <entry>
          <key>schema-registry</key>
          <value>
            <identifiesControllerService>org.apache.nifi.schemaregistry.services.SchemaRegistry</identifiesControllerService>
            <name>schema-registry</name>
          </value>
        </entry>
        <entry>
          <key>schema-name</key>
          <value>
            <name>schema-name</name>
          </value>
        </entry>
        <entry>
          <key>schema-text</key>
          <value>
            <name>schema-text</name>
          </value>
        </entry>
        <entry>
          <key>Date Format</key>
          <value>
            <name>Date Format</name>
          </value>
        </entry>
        <entry>
          <key>Time Format</key>
          <value>
            <name>Time Format</name>
          </value>
        </entry>
        <entry>
          <key>Timestamp Format</key>
          <value>
            <name>Timestamp Format</name>
          </value>
        </entry>
        <entry>
          <key>Pretty Print JSON</key>
          <value>
            <name>Pretty Print JSON</name>
          </value>
        </entry>
      </descriptors>
      <name>JsonRecordSetWriter</name>
      <persistsState>false</persistsState>
      <properties>
        <entry>
          <key>Schema Write Strategy</key>
          <value>schema-name</value>
        </entry>
        <entry>
          <key>schema-access-strategy</key>
          <value>schema-name</value>
        </entry>
        <entry>
          <key>schema-registry</key>
          <value>bdeb04f8-7ece-385c-0000-000000000000</value>
        </entry>
        <entry>
          <key>schema-name</key>
          <value>${schema.name}</value>
        </entry>
        <entry>
          <key>schema-text</key>
          <value>${avro.schema}</value>
        </entry>
        <entry>
          <key>Date Format</key>
        </entry>
        <entry>
          <key>Time Format</key>
        </entry>
        <entry>
          <key>Timestamp Format</key>
        </entry>
        <entry>
          <key>Pretty Print JSON</key>
          <value>false</value>
        </entry>
      </properties>
      <state>ENABLED</state>
      <type>org.apache.nifi.json.JsonRecordSetWriter</type>
    </controllerServices>
    <processors>
      <id>c4612b5a-0fe7-367b-0000-000000000000</id>
      <parentGroupId>451a635e-0803-3845-0000-000000000000</parentGroupId>
      <position>
        <x>0.0</x>
        <y>441.2567407123754</y>
      </position>
      <bundle>
        <artifact>nifi-standard-nar</artifact>
        <group>org.apache.nifi</group>
        <version>1.3.0</version>
      </bundle>
      <config>
        <bulletinLevel>WARN</bulletinLevel>
        <comments></comments>
        <concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
        <descriptors>
          <entry>
            <key>Routing Strategy</key>
            <value>
              <name>Routing Strategy</name>
            </value>
          </entry>
          <entry>
            <key>movies</key>
            <value>
              <name>movies</name>
            </value>
          </entry>
          <entry>
            <key>ratings</key>
            <value>
              <name>ratings</name>
            </value>
          </entry>
          <entry>
            <key>tags</key>
            <value>
              <name>tags</name>
            </value>
          </entry>
        </descriptors>
        <executionNode>ALL</executionNode>
        <lossTolerant>false</lossTolerant>
        <penaltyDuration>30 sec</penaltyDuration>
        <properties>
          <entry>
            <key>Routing Strategy</key>
            <value>Route to Property name</value>
          </entry>
          <entry>
            <key>movies</key>
            <value>${filename:equals('movies.csv')}</value>
          </entry>
          <entry>
            <key>ratings</key>
            <value>${filename:equals('ratings.csv')}</value>
          </entry>
          <entry>
            <key>tags</key>
            <value>${filename:equals('tags.csv')}</value>
          </entry>
        </properties>
        <runDurationMillis>0</runDurationMillis>
        <schedulingPeriod>0 sec</schedulingPeriod>
        <schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
        <yieldDuration>1 sec</yieldDuration>
      </config>
      <name>RouteOnAttribute</name>
      <relationships>
        <autoTerminate>false</autoTerminate>
        <name>movies</name>
      </relationships>
      <relationships>
        <autoTerminate>true</autoTerminate>
        <name>ratings</name>
      </relationships>
      <relationships>
        <autoTerminate>true</autoTerminate>
        <name>tags</name>
      </relationships>
      <relationships>
        <autoTerminate>true</autoTerminate>
        <name>unmatched</name>
      </relationships>
      <style></style>
      <type>org.apache.nifi.processors.standard.RouteOnAttribute</type>
    </processors>
    <processors>
      <id>f4cb8328-1431-3494-0000-000000000000</id>
      <parentGroupId>451a635e-0803-3845-0000-000000000000</parentGroupId>
      <position>
        <x>0.0</x>
        <y>224.95887663044255</y>
      </position>
      <bundle>
        <artifact>nifi-standard-nar</artifact>
        <group>org.apache.nifi</group>
        <version>1.3.0</version>
      </bundle>
      <config>
        <bulletinLevel>WARN</bulletinLevel>
        <comments></comments>
        <concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
        <descriptors>
          <entry>
            <key>Packaging Format</key>
            <value>
              <name>Packaging Format</name>
            </value>
          </entry>
          <entry>
            <key>File Filter</key>
            <value>
              <name>File Filter</name>
            </value>
          </entry>
        </descriptors>
        <executionNode>ALL</executionNode>
        <lossTolerant>false</lossTolerant>
        <penaltyDuration>30 sec</penaltyDuration>
        <properties>
          <entry>
            <key>Packaging Format</key>
            <value>zip</value>
          </entry>
          <entry>
            <key>File Filter</key>
            <value>.*</value>
          </entry>
        </properties>
        <runDurationMillis>0</runDurationMillis>
        <schedulingPeriod>0 sec</schedulingPeriod>
        <schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
        <yieldDuration>1 sec</yieldDuration>
      </config>
      <name>Unzip</name>
      <relationships>
        <autoTerminate>true</autoTerminate>
        <name>failure</name>
      </relationships>
      <relationships>
        <autoTerminate>true</autoTerminate>
        <name>original</name>
      </relationships>
      <relationships>
        <autoTerminate>false</autoTerminate>
        <name>success</name>
      </relationships>
      <style></style>
      <type>org.apache.nifi.processors.standard.UnpackContent</type>
    </processors>
    <processors>
      <id>feca53d8-7bfb-38bd-0000-000000000000</id>
      <parentGroupId>451a635e-0803-3845-0000-000000000000</parentGroupId>
      <position>
        <x>0.0</x>
        <y>645.9336127468443</y>
      </position>
      <bundle>
        <artifact>nifi-update-attribute-nar</artifact>
        <group>org.apache.nifi</group>
        <version>1.3.0</version>
      </bundle>
      <config>
        <bulletinLevel>WARN</bulletinLevel>
        <comments></comments>
        <concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
        <descriptors>
          <entry>
            <key>Delete Attributes Expression</key>
            <value>
              <name>Delete Attributes Expression</name>
            </value>
          </entry>
          <entry>
            <key>Store State</key>
            <value>
              <name>Store State</name>
            </value>
          </entry>
          <entry>
            <key>Stateful Variables Initial Value</key>
            <value>
              <name>Stateful Variables Initial Value</name>
            </value>
          </entry>
          <entry>
            <key>schema.name</key>
            <value>
              <name>schema.name</name>
            </value>
          </entry>
        </descriptors>
        <executionNode>ALL</executionNode>
        <lossTolerant>false</lossTolerant>
        <penaltyDuration>30 sec</penaltyDuration>
        <properties>
          <entry>
            <key>Delete Attributes Expression</key>
          </entry>
          <entry>
            <key>Store State</key>
            <value>Do not store state</value>
          </entry>
          <entry>
            <key>Stateful Variables Initial Value</key>
          </entry>
          <entry>
            <key>schema.name</key>
            <value>movies</value>
          </entry>
        </properties>
        <runDurationMillis>0</runDurationMillis>
        <schedulingPeriod>0 sec</schedulingPeriod>
        <schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
        <yieldDuration>1 sec</yieldDuration>
      </config>
      <name>Add Schema Name Attribute</name>
      <relationships>
        <autoTerminate>false</autoTerminate>
        <name>success</name>
      </relationships>
      <style></style>
      <type>org.apache.nifi.processors.attributes.UpdateAttribute</type>
    </processors>
    <processors>
      <id>3f560dd9-2a0c-3090-0000-000000000000</id>
      <parentGroupId>451a635e-0803-3845-0000-000000000000</parentGroupId>
      <position>
        <x>588.3389261227253</x>
        <y>645.9336127468443</y>
      </position>
      <bundle>
        <artifact>nifi-kafka-0-10-nar</artifact>
        <group>org.apache.nifi</group>
        <version>1.3.0</version>
      </bundle>
      <config>
        <bulletinLevel>WARN</bulletinLevel>
        <comments></comments>
        <concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
        <descriptors>
          <entry>
            <key>bootstrap.servers</key>
            <value>
              <name>bootstrap.servers</name>
            </value>
          </entry>
          <entry>
            <key>topic</key>
            <value>
              <name>topic</name>
            </value>
          </entry>
          <entry>
            <key>record-reader</key>
            <value>
              <identifiesControllerService>org.apache.nifi.serialization.RecordReaderFactory</identifiesControllerService>
              <name>record-reader</name>
            </value>
          </entry>
          <entry>
            <key>record-writer</key>
            <value>
              <identifiesControllerService>org.apache.nifi.serialization.RecordSetWriterFactory</identifiesControllerService>
              <name>record-writer</name>
            </value>
          </entry>
          <entry>
            <key>security.protocol</key>
            <value>
              <name>security.protocol</name>
            </value>
          </entry>
          <entry>
            <key>sasl.kerberos.service.name</key>
            <value>
              <name>sasl.kerberos.service.name</name>
            </value>
          </entry>
          <entry>
            <key>sasl.kerberos.principal</key>
            <value>
              <name>sasl.kerberos.principal</name>
            </value>
          </entry>
          <entry>
            <key>sasl.kerberos.keytab</key>
            <value>
              <name>sasl.kerberos.keytab</name>
            </value>
          </entry>
          <entry>
            <key>ssl.context.service</key>
            <value>
              <identifiesControllerService>org.apache.nifi.ssl.SSLContextService</identifiesControllerService>
              <name>ssl.context.service</name>
            </value>
          </entry>
          <entry>
            <key>acks</key>
            <value>
              <name>acks</name>
            </value>
          </entry>
          <entry>
            <key>message-key-field</key>
            <value>
              <name>message-key-field</name>
            </value>
          </entry>
          <entry>
            <key>max.request.size</key>
            <value>
              <name>max.request.size</name>
            </value>
          </entry>
          <entry>
            <key>ack.wait.time</key>
            <value>
              <name>ack.wait.time</name>
            </value>
          </entry>
          <entry>
            <key>max.block.ms</key>
            <value>
              <name>max.block.ms</name>
            </value>
          </entry>
          <entry>
            <key>partitioner.class</key>
            <value>
              <name>partitioner.class</name>
            </value>
          </entry>
          <entry>
            <key>compression.type</key>
            <value>
              <name>compression.type</name>
            </value>
          </entry>
        </descriptors>
        <executionNode>ALL</executionNode>
        <lossTolerant>false</lossTolerant>
        <penaltyDuration>30 sec</penaltyDuration>
        <properties>
          <entry>
            <key>bootstrap.servers</key>
            <value>localhost:9092</value>
          </entry>
          <entry>
            <key>topic</key>
            <value>Movies</value>
          </entry>
          <entry>
            <key>record-reader</key>
            <value>b8531252-2a7c-3648-0000-000000000000</value>
          </entry>
          <entry>
            <key>record-writer</key>
            <value>290d56c9-1e35-3dba-0000-000000000000</value>
          </entry>
          <entry>
            <key>security.protocol</key>
            <value>PLAINTEXT</value>
          </entry>
          <entry>
            <key>sasl.kerberos.service.name</key>
          </entry>
          <entry>
            <key>sasl.kerberos.principal</key>
          </entry>
          <entry>
            <key>sasl.kerberos.keytab</key>
          </entry>
          <entry>
            <key>ssl.context.service</key>
          </entry>
          <entry>
            <key>acks</key>
            <value>0</value>
          </entry>
          <entry>
            <key>message-key-field</key>
          </entry>
          <entry>
            <key>max.request.size</key>
            <value>1 MB</value>
          </entry>
          <entry>
            <key>ack.wait.time</key>
            <value>5 secs</value>
          </entry>
          <entry>
            <key>max.block.ms</key>
            <value>5 sec</value>
          </entry>
          <entry>
            <key>partitioner.class</key>
            <value>org.apache.kafka.clients.producer.internals.DefaultPartitioner</value>
          </entry>
          <entry>
            <key>compression.type</key>
            <value>none</value>
          </entry>
        </properties>
        <runDurationMillis>0</runDurationMillis>
        <schedulingPeriod>0 sec</schedulingPeriod>
        <schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
        <yieldDuration>1 sec</yieldDuration>
      </config>
      <name>Publish to "Movies" Topic</name>
      <relationships>
        <autoTerminate>false</autoTerminate>
        <name>failure</name>
      </relationships>
      <relationships>
        <autoTerminate>true</autoTerminate>
        <name>success</name>
      </relationships>
      <style></style>
      <type>org.apache.nifi.processors.kafka.pubsub.PublishKafkaRecord_0_10</type>
    </processors>
    <processors>
      <id>4004c547-2a1a-321b-0000-000000000000</id>
      <parentGroupId>451a635e-0803-3845-0000-000000000000</parentGroupId>
      <position>
        <x>0.0</x>
        <y>0.0</y>
      </position>
      <bundle>
        <artifact>nifi-standard-nar</artifact>
        <group>org.apache.nifi</group>
        <version>1.3.0</version>
      </bundle>
      <config>
        <bulletinLevel>WARN</bulletinLevel>
        <comments></comments>
        <concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
        <descriptors>
          <entry>
            <key>URL</key>
            <value>
              <name>URL</name>
            </value>
          </entry>
          <entry>
            <key>Filename</key>
            <value>
              <name>Filename</name>
            </value>
          </entry>
          <entry>
            <key>SSL Context Service</key>
            <value>
              <identifiesControllerService>org.apache.nifi.ssl.SSLContextService</identifiesControllerService>
              <name>SSL Context Service</name>
            </value>
          </entry>
          <entry>
            <key>Username</key>
            <value>
              <name>Username</name>
            </value>
          </entry>
          <entry>
            <key>Password</key>
            <value>
              <name>Password</name>
            </value>
          </entry>
          <entry>
            <key>Connection Timeout</key>
            <value>
              <name>Connection Timeout</name>
            </value>
          </entry>
          <entry>
            <key>Data Timeout</key>
            <value>
              <name>Data Timeout</name>
            </value>
          </entry>
          <entry>
            <key>User Agent</key>
            <value>
              <name>User Agent</name>
            </value>
          </entry>
          <entry>
            <key>Accept Content-Type</key>
            <value>
              <name>Accept Content-Type</name>
            </value>
          </entry>
          <entry>
            <key>Follow Redirects</key>
            <value>
              <name>Follow Redirects</name>
            </value>
          </entry>
          <entry>
            <key>redirect-cookie-policy</key>
            <value>
              <name>redirect-cookie-policy</name>
            </value>
          </entry>
          <entry>
            <key>Proxy Host</key>
            <value>
              <name>Proxy Host</name>
            </value>
          </entry>
          <entry>
            <key>Proxy Port</key>
            <value>
              <name>Proxy Port</name>
            </value>
          </entry>
        </descriptors>
        <executionNode>ALL</executionNode>
        <lossTolerant>false</lossTolerant>
        <penaltyDuration>30 sec</penaltyDuration>
        <properties>
          <entry>
            <key>URL</key>
            <value>http://files.grouplens.org/datasets/movielens/ml-20m.zip</value>
          </entry>
          <entry>
            <key>Filename</key>
            <value>ml-20m.zip</value>
          </entry>
          <entry>
            <key>SSL Context Service</key>
          </entry>
          <entry>
            <key>Username</key>
          </entry>
          <entry>
            <key>Password</key>
          </entry>
          <entry>
            <key>Connection Timeout</key>
            <value>30 sec</value>
          </entry>
          <entry>
            <key>Data Timeout</key>
            <value>30 sec</value>
          </entry>
          <entry>
            <key>User Agent</key>
          </entry>
          <entry>
            <key>Accept Content-Type</key>
          </entry>
          <entry>
            <key>Follow Redirects</key>
            <value>false</value>
          </entry>
          <entry>
            <key>redirect-cookie-policy</key>
            <value>default</value>
          </entry>
          <entry>
            <key>Proxy Host</key>
          </entry>
          <entry>
            <key>Proxy Port</key>
          </entry>
        </properties>
        <runDurationMillis>0</runDurationMillis>
        <schedulingPeriod>10 mins</schedulingPeriod>
        <schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
        <yieldDuration>1 sec</yieldDuration>
      </config>
      <name>Get Movie Data</name>
      <relationships>
        <autoTerminate>false</autoTerminate>
        <name>success</name>
      </relationships>
      <style></style>
      <type>org.apache.nifi.processors.standard.GetHTTP</type>
    </processors>
  </snippet>
  <timestamp>09/11/2017 11:33:49 EDT</timestamp>
</template>
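Since the template is long, it can help to summarize which processors it contains and how they are wired together. The sketch below does this with Python's standard XML parser. It assumes the element layout shown above (`snippet`, `processors`, `connections`); the function name `summarize_template` and the file name `PublishKafkaRecord.xml` in the usage note are made up for illustration.

```python
import xml.etree.ElementTree as ET
from typing import Dict, List, Optional, Tuple

Connection = Tuple[Optional[str], Optional[str], Optional[str]]


def summarize_template(xml_text: str) -> Tuple[Dict[str, str], List[Connection]]:
    """Return processor names keyed by id, and (source, relationship, destination) triples."""
    root = ET.fromstring(xml_text)
    snippet = root.find("snippet")
    if snippet is None:
        raise ValueError("template has no <snippet> element")
    # Map each processor id to its display name.
    processors = {p.findtext("id"): p.findtext("name") for p in snippet.findall("processors")}
    # Resolve each connection's endpoints to processor names.
    connections = [
        (
            processors.get(c.findtext("source/id")),
            c.findtext("selectedRelationships"),
            processors.get(c.findtext("destination/id")),
        )
        for c in snippet.findall("connections")
    ]
    return processors, connections
```

If the XML above were saved as `PublishKafkaRecord.xml`, reading the file and passing its text to `summarize_template` would list connections such as Get Movie Data to Unzip over the success relationship.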

References

    Using PublishKafkaRecord_0_10 (CSVReader/JSONWriter) in Apache NiFi 1.2+