Статьи

Написать конвейер данных с Apache Falcon

В последних двух постах ( пост 1 , пост 2 ) я представил основные сведения об Apache Falcon. В этом посте я опишу, как мы можем написать базовый конвейер данных Falcon.

Процесс Сокол, который я собираюсь описать, вызывает два условия:

  1.  Время начала процесса (т.е. 15:00 UTC) соблюдается.
  2. И папка триггера создается в папке / tmp / feed-01 / с именем $ {YEAR} — $ {MONTH} — $ {DAY}.

Как только процесс Falcon запущен, он вызывает рабочий процесс Oozie, который вызывает скрипт SSH, который просто печатает два входных параметра в файл /tmp/demo.out на локальной машине FS блока SSH.

Код для кластера Falcon (test-primary-cluster):



<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<cluster name="test-primary-cluster" description="test-primary-cluster" colo="TEST DEV PRIMARY CLUSTER" xmlns="uri:falcon:cluster:0.1">
    <interfaces>
        <interface type="readonly" endpoint="hftp://localhost:50070" version="2.2.0"/>
        <interface type="write" endpoint="hdfs://localhost:8020" version="2.2.0"/>
        <interface type="execute" endpoint="localhost:8050" version="2.2.0"/>
        <interface type="workflow" endpoint="http://localhost:11000/oozie/" version="4.0.0"/>
        <interface type="messaging" endpoint="tcp://localhost:61616?daemon=true" version="5.1.6"/>
    </interfaces>
    <locations>
        <location name="staging" path="/apps/falcon/test-primary-cluster/staging"/>
        <location name="temp" path="/tmp"/>
        <location name="working" path="/apps/falcon/test-primary-cluster/working"/>
    </locations>
    <ACL owner="rishav" group="hdpuser" permission="0770"/>
</cluster>

Здесь следует отметить одну важную вещь: вам нужно создавать промежуточные и рабочие каталоги в HDFS с соответствующими разрешениями и правами собственности. Для кластера Hortonworks необходимы следующие разрешения и владелец:

hadoop fs -mkdir -p / apps / falcon / test-primary-cluster / staging /
hadoop fs -chmod 777 / apps / falcon / test-primary-cluster / staging /
hadoop fs -mkdir -p / apps / falcon / test- первичный кластер / рабочий /
hadoop fs -chmod 755 / apps / falcon / test-первичный кластер / рабочий /
hadoop fs -chown -R сокол: hadoop / apps / falcon / test-primary-cluster

Код для подачи Falcon (триггер feed-01):



<?xml version="1.0" encoding="UTF-8"?>
<feed description="feed-01-trigger"
        name="feed-01-trigger" xmlns="uri:falcon:feed:0.1">
        <frequency>days(1)</frequency>
        <late-arrival cut-off="hours(20)" />
        <clusters>
                <cluster name="test-primary-cluster" type="source">
                        <validity start="2015-09-07T14:00Z" end="2099-03-09T12:00Z" />
                        <retention limit="months(9999)" action="archive" />
                        <locations>
                                <location type="data" path="/tmp/feed-01/${YEAR}-${MONTH}-${DAY}" />
                        </locations>
                </cluster>
        </clusters>

        <locations>
                <location type="data" path="/tmp/feed-01/${YEAR}-${MONTH}-${DAY}" />
                <location type="stats" path="/none" />
                <location type="meta" path="/none" />
        </locations>

    <ACL owner="rishav" group="hdpuser" permission="0770"/>
        <schema location="/none" provider="none" />
</feed>

Для этого канала —

  • Срок хранения составляет 9999 месяцев.
  • Предел позднего прибытия установлен на 20 часов.
  • И частота установлена ​​ежедневно.

Код для процесса Falcon (process-01):



<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<process name="process-01"
        xmlns="uri:falcon:process:0.1">
        <clusters>
                <cluster name="test-primary-cluster">
                        <validity start="2015-09-08T15:00Z" end="2099-03-10T23:00Z" />
                </cluster>
        </clusters>
        <parallel>1</parallel>
        <order>FIFO</order>
        <frequency>days(1)</frequency>
        <timezone>UTC</timezone>
        <inputs>
                <input name="feed-01-trigger"
                        end="today(1,0)" start="today(0,0)"
                        feed="feed-01-trigger" />
        </inputs>
        <properties>
                <property name="workflowName" value="workflow-01" />
                <property name="input1" value="variable1" />
                <property name="input2" value="${formatTime(dateOffset(instanceTime(), -1, 'DAY'),'yyyy-MM-dd')}" />
        </properties>
        <workflow name="workflow-01"
                version="2.0.0" engine="oozie"
                path="/tmp/oozie_workflow" />
        <retry policy="periodic" delay="minutes(15)" attempts="2" />
    <ACL owner="rishav" group="hdpuser" permission="0770"/>
</process>

Для этого процесса —

  • Время начала установлено в 15:00 UTC.
  • Зависимость установлена ​​на входной триггер подачи подачи-01.
  • Политика повторных попыток установлена ​​в 2 раза с промежутком в 15 минут.
  • Этот процесс также использует выражение EL для установки переменной input2 для получения вчерашней даты.

Рабочий процесс oozie с действием SSH определен ниже:



<workflow-app name="${workflowName}" xmlns="uri:oozie:workflow:0.1">
 <start to="demo_script"/>

 <action name="demo_script">
        <ssh xmlns="uri:oozie:ssh-action:0.1">
          <host>rishav@poc001</host>
          <command>~/demo.bash</command>
          <args>${input1}</args>
          <args>${input2}</args>
      <capture-output/>
        </ssh>
   <ok to="end"/>
   <error to="kill"/>
 </action>
 <kill name="kill">
   <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
 </kill>
 <end name="end"/>
</workflow-app>

Это рабочий процесс Oozie —

  • Получает переменные input1, input2 и workflowName из процесса Falcon process-01.
  • И вызывает сценарий оболочки на коробке poc001 с input1 и input2 в качестве параметров.

А скрипт demo.bash, вызываемый действием Oozie SSH, приведен ниже:



cd ~
echo `date` >> /tmp/demo.out
echo "input1 $1" >> /tmp/demo.out
echo "input2 $2" >> /tmp/demo.out

demo.bash — это простой скрипт, который выводит текущую дату, переменную input1 и input2 в файл /tmp/demo.out.

В моем следующем посте я объясню, как мы можем отправлять и планировать эти процессы Falcon.