Я хотел создать небольшой конвейер данных с помощью Serverless , основной задачей которого будет запускаться раз в день, вызывать API и загружать эти данные в базу данных.
В основном он используется для получения последних данных из этого API, но я также хотел иметь возможность вызывать его вручную и указывать диапазон дат.
Я создал следующую пару лямбд, которые общаются друг с другом через тему SNS .
Код
serverless.yml
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
|
service: marks-blog frameworkVersion: ">=1.2.0 <2.0.0" provider: name: aws runtime: python3. 6 timeout: 180 iamRoleStatements: - Effect: 'Allow' Action: - "sns:Publish" Resource: - ${self:custom.BlogTopic} custom: BlogTopic: Fn::Join: - ":" - - arn - aws - sns - Ref: AWS::Region - Ref: AWS::AccountId - marks-blog-topic functions: message-consumer: name: MessageConsumer handler: handler.consumer events: - sns: topicName: marks-blog-topic displayName: Topic to process events message-producer: name: MessageProducer handler: handler.producer events: - schedule: rate( 1 day) |
handler.py
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
|
import boto3 import json import datetime from datetime import timezone def producer(event, context): sns = boto3.client( 'sns' ) context_parts = context.invoked_function_arn.split( ':' ) topic_name = "marks-blog-topic" topic_arn = "arn:aws:sns:{region}:{account_id}:{topic}" .format( region=context_parts[ 3 ], account_id=context_parts[ 4 ], topic=topic_name) now = datetime.datetime.now(timezone.utc) start_date = (now - datetime.timedelta(days= 1 )).strftime( "%Y-%m-%d" ) end_date = now.strftime( "%Y-%m-%d" ) params = { "startDate" : start_date, "endDate" : end_date, "tags" : [ "neo4j" ]} sns.publish(TopicArn= topic_arn, Message= json.dumps(params)) def consumer(event, context): for record in event[ "Records" ]: message = json.loads(record[ "Sns" ][ "Message" ]) start_date = message[ "startDate" ] end_date = message[ "endDate" ] tags = message[ "tags" ] print( "start_date: " + start_date) print( "end_date: " + end_date) print( "tags: " + str(tags)) |
Пробовать
Мы можем смоделировать полученное сообщение локально, выполнив следующую команду:
1
2
3
4
5
6
7
8
|
$ serverless invoke local \ -- function message-consumer \ --data '{"Records":[{"Sns": {"Message":"{\"tags\": [\"neo4j\"], \"startDate\": \"2017-09-25\", \"endDate\": \"2017-09-29\" }"}}]}' start_date: 2017-09-25 end_date: 2017-09-29 tags: [ 'neo4j' ] null |
Кажется, это работает нормально. А что если мы вызовем производителя сообщений в AWS?
1
2
3
|
$ serverless invoke -- function message-producer null |
Получил ли потребитель сообщение?
1
2
3
4
5
6
7
8
|
$ serverless logs -- function message-consumer START RequestId: 0ef5be87-a5b1-11e7-a905-f1387e68c65f Version: $LATEST start_date: 2017-09-29 end_date: 2017-09-30 tags: [ 'neo4j' ] END RequestId: 0ef5be87-a5b1-11e7-a905-f1387e68c65f REPORT RequestId: 0ef5be87-a5b1-11e7-a905-f1387e68c65f Duration: 0.46 ms Billed Duration: 100 ms Memory Size: 1024 MB Max Memory Used: 32 MB |
Похоже на то! Мы также можем вызвать потребителя непосредственно в AWS:
1
2
3
4
5
|
$ serverless invoke \ -- function message-consumer \ --data '{"Records":[{"Sns": {"Message":"{\"tags\": [\"neo4j\"], \"startDate\": \"2017-09-25\", \"endDate\": \"2017-09-26\" }"}}]}' null |
И теперь, если мы проверим логи потребителя, мы увидим оба сообщения:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
|
$ serverless logs -- function message-consumer START RequestId: 0ef5be87-a5b1-11e7-a905-f1387e68c65f Version: $LATEST start_date: 2017-09-29 end_date: 2017-09-30 tags: [ 'neo4j' ] END RequestId: 0ef5be87-a5b1-11e7-a905-f1387e68c65f REPORT RequestId: 0ef5be87-a5b1-11e7-a905-f1387e68c65f Duration: 0.46 ms Billed Duration: 100 ms Memory Size: 1024 MB Max Memory Used: 32 MB START RequestId: 4cb42bc9-a5b1-11e7-affb-99fa6b4dc3ed Version: $LATEST start_date: 2017-09-25 end_date: 2017-09-26 tags: [ 'neo4j' ] END RequestId: 4cb42bc9-a5b1-11e7-affb-99fa6b4dc3ed REPORT RequestId: 4cb42bc9-a5b1-11e7-affb-99fa6b4dc3ed Duration: 16.46 ms Billed Duration: 100 ms Memory Size: 1024 MB Max Memory Used: 32 MB |
Успех!
Опубликовано на Java Code Geeks с разрешения Марка Нидхэма, партнера нашей программы JCG . См. Оригинальную статью здесь: Без сервера: создание мини-конвейера данных производителя / потребителя с AWS SNS
Мнения, высказанные участниками Java Code Geeks, являются их собственными. |