Я хотел создать небольшой конвейер данных с помощью Serverless , основной задачей которого будет запускаться раз в день, вызывать API и загружать эти данные в базу данных.
В основном он используется для получения последних данных из этого API, но я также хотел иметь возможность вызывать его вручную и указывать диапазон дат.
Я создал следующую пару лямбд, которые общаются друг с другом через тему SNS .
Код
serverless.yml
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
|
service: marks-blogframeworkVersion: ">=1.2.0 <2.0.0"provider: name: aws runtime: python3.6 timeout: 180 iamRoleStatements: - Effect: 'Allow' Action: - "sns:Publish" Resource: - ${self:custom.BlogTopic}custom: BlogTopic: Fn::Join: - ":" - - arn - aws - sns - Ref: AWS::Region - Ref: AWS::AccountId - marks-blog-topicfunctions: message-consumer: name: MessageConsumer handler: handler.consumer events: - sns: topicName: marks-blog-topic displayName: Topic to process events message-producer: name: MessageProducer handler: handler.producer events: - schedule: rate(1 day) |
handler.py
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
|
import boto3import jsonimport datetimefrom datetime import timezone def producer(event, context): sns = boto3.client('sns') context_parts = context.invoked_function_arn.split(':') topic_name = "marks-blog-topic" topic_arn = "arn:aws:sns:{region}:{account_id}:{topic}".format( region=context_parts[3], account_id=context_parts[4], topic=topic_name) now = datetime.datetime.now(timezone.utc) start_date = (now - datetime.timedelta(days=1)).strftime("%Y-%m-%d") end_date = now.strftime("%Y-%m-%d") params = {"startDate": start_date, "endDate": end_date, "tags": ["neo4j"]} sns.publish(TopicArn= topic_arn, Message= json.dumps(params)) def consumer(event, context): for record in event["Records"]: message = json.loads(record["Sns"]["Message"]) start_date = message["startDate"] end_date = message["endDate"] tags = message["tags"] print("start_date: " + start_date) print("end_date: " + end_date) print("tags: " + str(tags)) |
Пробовать
Мы можем смоделировать полученное сообщение локально, выполнив следующую команду:
|
1
2
3
4
5
6
7
8
|
$ serverless invoke local \ --function message-consumer \ --data '{"Records":[{"Sns": {"Message":"{\"tags\": [\"neo4j\"], \"startDate\": \"2017-09-25\", \"endDate\": \"2017-09-29\" }"}}]}' start_date: 2017-09-25end_date: 2017-09-29tags: ['neo4j']null |
Кажется, это работает нормально. А что если мы вызовем производителя сообщений в AWS?
|
1
2
3
|
$ serverless invoke --function message-producer null |
Получил ли потребитель сообщение?
|
1
2
3
4
5
6
7
8
|
$ serverless logs --function message-consumer START RequestId: 0ef5be87-a5b1-11e7-a905-f1387e68c65f Version: $LATESTstart_date: 2017-09-29end_date: 2017-09-30tags: ['neo4j']END RequestId: 0ef5be87-a5b1-11e7-a905-f1387e68c65fREPORT RequestId: 0ef5be87-a5b1-11e7-a905-f1387e68c65f Duration: 0.46 ms Billed Duration: 100 ms Memory Size: 1024 MB Max Memory Used: 32 MB |
Похоже на то! Мы также можем вызвать потребителя непосредственно в AWS:
|
1
2
3
4
5
|
$ serverless invoke \ --function message-consumer \ --data '{"Records":[{"Sns": {"Message":"{\"tags\": [\"neo4j\"], \"startDate\": \"2017-09-25\", \"endDate\": \"2017-09-26\" }"}}]}' null |
И теперь, если мы проверим логи потребителя, мы увидим оба сообщения:
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
|
$ serverless logs --function message-consumer START RequestId: 0ef5be87-a5b1-11e7-a905-f1387e68c65f Version: $LATESTstart_date: 2017-09-29end_date: 2017-09-30tags: ['neo4j']END RequestId: 0ef5be87-a5b1-11e7-a905-f1387e68c65fREPORT RequestId: 0ef5be87-a5b1-11e7-a905-f1387e68c65f Duration: 0.46 ms Billed Duration: 100 ms Memory Size: 1024 MB Max Memory Used: 32 MB START RequestId: 4cb42bc9-a5b1-11e7-affb-99fa6b4dc3ed Version: $LATESTstart_date: 2017-09-25end_date: 2017-09-26tags: ['neo4j']END RequestId: 4cb42bc9-a5b1-11e7-affb-99fa6b4dc3edREPORT RequestId: 4cb42bc9-a5b1-11e7-affb-99fa6b4dc3ed Duration: 16.46 ms Billed Duration: 100 ms Memory Size: 1024 MB Max Memory Used: 32 MB |
Успех!
| Опубликовано на Java Code Geeks с разрешения Марка Нидхэма, партнера нашей программы JCG . См. Оригинальную статью здесь: Без сервера: создание мини-конвейера данных производителя / потребителя с AWS SNS
Мнения, высказанные участниками Java Code Geeks, являются их собственными. |