-
Notifications
You must be signed in to change notification settings - Fork 2
/
stack.py
67 lines (56 loc) · 1.94 KB
/
stack.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
from typing import Union
from aws_cdk import core
from aws_cdk.aws_iam import PolicyStatement
from aws_cdk.aws_lambda import Function, Code, Runtime
from aws_cdk.aws_events import EventBus, Rule, EventPattern, IRuleTarget
from aws_cdk.aws_events_targets import LambdaFunction
class Stack(core.Stack):
def __init__(self, scope: core.Construct, construct_id: str, **kwargs) -> None:
super().__init__(scope, construct_id, **kwargs)
self.event_bus = EventBus(
scope=self,
id='CustomEventBus',
event_bus_name='CustomEventBus'
)
self.source = Function(
scope=self,
id=f'SourceFunction',
function_name=f'SourceFunction',
code=Code.from_asset(path='./code_source/'),
handler='index.handler',
runtime=Runtime.PYTHON_3_6,
)
self.source.add_to_role_policy(statement=PolicyStatement(
actions=['events:PutEvents'],
resources=[self.event_bus.event_bus_arn]
))
"""
Define rule.
"""
self.rule = Rule(
scope=self,
id='EventBusRule',
description='Sample description.',
enabled=True,
event_bus=self.event_bus,
event_pattern=EventPattern(
detail={
'Domain': ["MedInfo"],
'Reason': ["InvokeTarget"]
}
),
rule_name='EventBusRule',
)
"""
Add target.
"""
self.target = Function(
scope=self,
id=f'TargetFunction',
function_name=f'TargetFunction',
code=Code.from_asset(path='./code_target/'),
handler='index.handler',
runtime=Runtime.PYTHON_3_6,
)
self.target: Union[IRuleTarget, LambdaFunction] = LambdaFunction(handler=self.target)
self.rule.add_target(target=self.target)