Статьи

Без сервера: создание мини-конвейера данных производителя / потребителя с AWS SNS

Я хотел создать небольшой конвейер данных с помощью Serverless , основной задачей которого будет запускаться раз в день, вызывать API и загружать эти данные в базу данных.

В основном он используется для получения последних данных из этого API, но я также хотел иметь возможность вызывать его вручную и указывать диапазон дат.

Я создал следующую пару лямбд, которые общаются друг с другом через тему SNS .

Код

serverless.yml

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
service: marks-blog
 
frameworkVersion: ">=1.2.0 <2.0.0"
 
provider:
  name: aws
  runtime: python3.6
  timeout: 180
  iamRoleStatements:
    - Effect: 'Allow'
      Action:
        - "sns:Publish"
      Resource:
        - ${self:custom.BlogTopic}
 
custom:
  BlogTopic:
    Fn::Join:
      - ":"
      - - arn
        - aws
        - sns
        - Ref: AWS::Region
        - Ref: AWS::AccountId
        - marks-blog-topic
 
functions:
  message-consumer:
    name: MessageConsumer
    handler: handler.consumer
    events:
      - sns:
          topicName: marks-blog-topic
          displayName: Topic to process events
  message-producer:
    name: MessageProducer
    handler: handler.producer
    events:
      - schedule: rate(1 day)

handler.py

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import boto3
import json
import datetime
from datetime import timezone
  
def producer(event, context):
    sns = boto3.client('sns')
  
    context_parts = context.invoked_function_arn.split(':')
    topic_name = "marks-blog-topic"
    topic_arn = "arn:aws:sns:{region}:{account_id}:{topic}".format(
        region=context_parts[3], account_id=context_parts[4], topic=topic_name)
  
    now = datetime.datetime.now(timezone.utc)
    start_date = (now - datetime.timedelta(days=1)).strftime("%Y-%m-%d")
    end_date = now.strftime("%Y-%m-%d")
  
    params = {"startDate": start_date, "endDate": end_date, "tags": ["neo4j"]}
  
    sns.publish(TopicArn= topic_arn, Message= json.dumps(params))
  
  
def consumer(event, context):
    for record in event["Records"]:
        message = json.loads(record["Sns"]["Message"])
  
        start_date = message["startDate"]
        end_date = message["endDate"]
        tags = message["tags"]
  
        print("start_date: " + start_date)
        print("end_date: " + end_date)
        print("tags: " + str(tags))

Пробовать

Мы можем смоделировать полученное сообщение локально, выполнив следующую команду:

1
2
3
4
5
6
7
8
$ serverless invoke local \
    --function message-consumer \
    --data '{"Records":[{"Sns": {"Message":"{\"tags\": [\"neo4j\"], \"startDate\": \"2017-09-25\", \"endDate\": \"2017-09-29\"  }"}}]}'
  
start_date: 2017-09-25
end_date: 2017-09-29
tags: ['neo4j']
null

Кажется, это работает нормально. А что если мы вызовем производителя сообщений в AWS?

1
2
3
$ serverless invoke --function message-producer
  
null

Получил ли потребитель сообщение?

1
2
3
4
5
6
7
8
$ serverless logs --function message-consumer
  
START RequestId: 0ef5be87-a5b1-11e7-a905-f1387e68c65f Version: $LATEST
start_date: 2017-09-29
end_date: 2017-09-30
tags: ['neo4j']
END RequestId: 0ef5be87-a5b1-11e7-a905-f1387e68c65f
REPORT RequestId: 0ef5be87-a5b1-11e7-a905-f1387e68c65f  Duration: 0.46 ms   Billed Duration: 100 ms     Memory Size: 1024 MB    Max Memory Used: 32 MB

Похоже на то! Мы также можем вызвать потребителя непосредственно в AWS:

1
2
3
4
5
$ serverless invoke \
    --function message-consumer \
    --data '{"Records":[{"Sns": {"Message":"{\"tags\": [\"neo4j\"], \"startDate\": \"2017-09-25\", \"endDate\": \"2017-09-26\"  }"}}]}'
  
null

И теперь, если мы проверим логи потребителя, мы увидим оба сообщения:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
$ serverless logs --function message-consumer
  
START RequestId: 0ef5be87-a5b1-11e7-a905-f1387e68c65f Version: $LATEST
start_date: 2017-09-29
end_date: 2017-09-30
tags: ['neo4j']
END RequestId: 0ef5be87-a5b1-11e7-a905-f1387e68c65f
REPORT RequestId: 0ef5be87-a5b1-11e7-a905-f1387e68c65f  Duration: 0.46 ms   Billed Duration: 100 ms     Memory Size: 1024 MB    Max Memory Used: 32 MB 
  
START RequestId: 4cb42bc9-a5b1-11e7-affb-99fa6b4dc3ed Version: $LATEST
start_date: 2017-09-25
end_date: 2017-09-26
tags: ['neo4j']
END RequestId: 4cb42bc9-a5b1-11e7-affb-99fa6b4dc3ed
REPORT RequestId: 4cb42bc9-a5b1-11e7-affb-99fa6b4dc3ed  Duration: 16.46 ms  Billed Duration: 100 ms     Memory Size: 1024 MB    Max Memory Used: 32 MB

Успех!

Опубликовано на Java Code Geeks с разрешения Марка Нидхэма, партнера нашей программы JCG . См. Оригинальную статью здесь: Без сервера: создание мини-конвейера данных производителя / потребителя с AWS SNS

Мнения, высказанные участниками Java Code Geeks, являются их собственными.