В последних двух постах ( пост 1 , пост 2 ) я представил основные сведения об Apache Falcon. В этом посте я опишу, как мы можем написать базовый конвейер данных Falcon.
Процесс Сокол, который я собираюсь описать, вызывает два условия:
- Время начала процесса (т.е. 15:00 UTC) соблюдается.
- И папка триггера создается в папке / tmp / feed-01 / с именем $ {YEAR} — $ {MONTH} — $ {DAY}.
Как только процесс Falcon запущен, он вызывает рабочий процесс Oozie, который вызывает скрипт SSH, который просто печатает два входных параметра в файл /tmp/demo.out на локальной машине FS блока SSH.
Код для кластера Falcon (test-primary-cluster):
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<cluster name="test-primary-cluster" description="test-primary-cluster" colo="TEST DEV PRIMARY CLUSTER" xmlns="uri:falcon:cluster:0.1">
<interfaces>
<interface type="readonly" endpoint="hftp://localhost:50070" version="2.2.0"/>
<interface type="write" endpoint="hdfs://localhost:8020" version="2.2.0"/>
<interface type="execute" endpoint="localhost:8050" version="2.2.0"/>
<interface type="workflow" endpoint="http://localhost:11000/oozie/" version="4.0.0"/>
<interface type="messaging" endpoint="tcp://localhost:61616?daemon=true" version="5.1.6"/>
</interfaces>
<locations>
<location name="staging" path="/apps/falcon/test-primary-cluster/staging"/>
<location name="temp" path="/tmp"/>
<location name="working" path="/apps/falcon/test-primary-cluster/working"/>
</locations>
<ACL owner="rishav" group="hdpuser" permission="0770"/>
</cluster>
Здесь следует отметить одну важную вещь: вам нужно создавать промежуточные и рабочие каталоги в HDFS с соответствующими разрешениями и правами собственности. Для кластера Hortonworks необходимы следующие разрешения и владелец:
hadoop fs -mkdir -p / apps / falcon / test-primary-cluster / staging /
hadoop fs -chmod 777 / apps / falcon / test-primary-cluster / staging /
hadoop fs -mkdir -p / apps / falcon / test- первичный кластер / рабочий /
hadoop fs -chmod 755 / apps / falcon / test-первичный кластер / рабочий /
hadoop fs -chown -R сокол: hadoop / apps / falcon / test-primary-cluster
Код для подачи Falcon (триггер feed-01):
<?xml version="1.0" encoding="UTF-8"?>
<feed description="feed-01-trigger"
name="feed-01-trigger" xmlns="uri:falcon:feed:0.1">
<frequency>days(1)</frequency>
<late-arrival cut-off="hours(20)" />
<clusters>
<cluster name="test-primary-cluster" type="source">
<validity start="2015-09-07T14:00Z" end="2099-03-09T12:00Z" />
<retention limit="months(9999)" action="archive" />
<locations>
<location type="data" path="/tmp/feed-01/${YEAR}-${MONTH}-${DAY}" />
</locations>
</cluster>
</clusters>
<locations>
<location type="data" path="/tmp/feed-01/${YEAR}-${MONTH}-${DAY}" />
<location type="stats" path="/none" />
<location type="meta" path="/none" />
</locations>
<ACL owner="rishav" group="hdpuser" permission="0770"/>
<schema location="/none" provider="none" />
</feed>
Для этого канала —
- Срок хранения составляет 9999 месяцев.
- Предел позднего прибытия установлен на 20 часов.
- И частота установлена ежедневно.
Код для процесса Falcon (process-01):
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<process name="process-01"
xmlns="uri:falcon:process:0.1">
<clusters>
<cluster name="test-primary-cluster">
<validity start="2015-09-08T15:00Z" end="2099-03-10T23:00Z" />
</cluster>
</clusters>
<parallel>1</parallel>
<order>FIFO</order>
<frequency>days(1)</frequency>
<timezone>UTC</timezone>
<inputs>
<input name="feed-01-trigger"
end="today(1,0)" start="today(0,0)"
feed="feed-01-trigger" />
</inputs>
<properties>
<property name="workflowName" value="workflow-01" />
<property name="input1" value="variable1" />
<property name="input2" value="${formatTime(dateOffset(instanceTime(), -1, 'DAY'),'yyyy-MM-dd')}" />
</properties>
<workflow name="workflow-01"
version="2.0.0" engine="oozie"
path="/tmp/oozie_workflow" />
<retry policy="periodic" delay="minutes(15)" attempts="2" />
<ACL owner="rishav" group="hdpuser" permission="0770"/>
</process>
Для этого процесса —
- Время начала установлено в 15:00 UTC.
- Зависимость установлена на входной триггер подачи подачи-01.
- Политика повторных попыток установлена в 2 раза с промежутком в 15 минут.
- Этот процесс также использует выражение EL для установки переменной input2 для получения вчерашней даты.
Рабочий процесс oozie с действием SSH определен ниже:
<workflow-app name="${workflowName}" xmlns="uri:oozie:workflow:0.1">
<start to="demo_script"/>
<action name="demo_script">
<ssh xmlns="uri:oozie:ssh-action:0.1">
<host>rishav@poc001</host>
<command>~/demo.bash</command>
<args>${input1}</args>
<args>${input2}</args>
<capture-output/>
</ssh>
<ok to="end"/>
<error to="kill"/>
</action>
<kill name="kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<end name="end"/>
</workflow-app>
Это рабочий процесс Oozie —
- Получает переменные input1, input2 и workflowName из процесса Falcon process-01.
- И вызывает сценарий оболочки на коробке poc001 с input1 и input2 в качестве параметров.
А скрипт demo.bash, вызываемый действием Oozie SSH, приведен ниже:
cd ~
echo `date` >> /tmp/demo.out
echo "input1 $1" >> /tmp/demo.out
echo "input2 $2" >> /tmp/demo.out
demo.bash — это простой скрипт, который выводит текущую дату, переменную input1 и input2 в файл /tmp/demo.out.
В моем следующем посте я объясню, как мы можем отправлять и планировать эти процессы Falcon.