forked from aws-cloudformation/cfn-lint
-
Notifications
You must be signed in to change notification settings - Fork 0
/
RuleScheduleExpression.py
111 lines (92 loc) · 4.05 KB
/
RuleScheduleExpression.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
"""
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
SPDX-License-Identifier: MIT-0
"""
from cfnlint.rules import CloudFormationLintRule
from cfnlint.rules import RuleMatch
class RuleScheduleExpression(CloudFormationLintRule):
"""Validate AWS Events Schedule expression format"""
id = 'E3027'
shortdesc = 'Validate AWS Event ScheduleExpression format'
description = 'Validate the formation of the AWS::Event ScheduleExpression'
source_url = 'https://docs.aws.amazon.com/AmazonCloudWatch/latest/events/ScheduledEvents.html'
tags = ['resources', 'events']
def initialize(self, cfn):
"""Initialize the rule"""
self.resource_property_types = ['AWS::Events::Rule']
def check_rate(self, value, path):
"""Check Rate configuration"""
matches = []
# Extract the expression from rate(XXX)
rate_expression = value[value.find('(') + 1 : value.find(')')]
if not rate_expression:
matches.append(
RuleMatch(path, 'Rate value of ScheduleExpression cannot be empty')
)
else:
# Rate format: rate(Value Unit)
items = rate_expression.split(' ')
if len(items) != 2:
message = 'Rate expression must contain 2 elements (Value Unit), rate contains {} elements'
matches.append(RuleMatch(path, message.format(len(items))))
else:
# Check the Value
if not items[0].isdigit():
message = 'Rate Value ({}) should be of type Integer.'
extra_args = {
'actual_type': type(items[0]).__name__,
'expected_type': int.__name__,
}
matches.append(
RuleMatch(path, message.format(items[0]), **extra_args)
)
return matches
def check_cron(self, value, path):
"""Check Cron configuration"""
matches = []
# Extract the expression from cron(XXX)
cron_expression = value[value.find('(') + 1 : value.find(')')]
if not cron_expression:
matches.append(
RuleMatch(path, 'Cron value of ScheduleExpression cannot be empty')
)
else:
# Rate format: cron(Minutes Hours Day-of-month Month Day-of-week Year)
items = cron_expression.split(' ')
if len(items) != 6:
message = 'Cron expression must contain 6 elements (Minutes Hours Day-of-month Month Day-of-week Year), cron contains {} elements'
matches.append(RuleMatch(path, message.format(len(items))))
return matches
_, _, day_of_month, _, day_of_week, _ = cron_expression.split(' ')
if day_of_month != '?' and day_of_week != '?':
matches.append(
RuleMatch(
path,
'Don\'t specify the Day-of-month and Day-of-week fields in the same cron expression',
)
)
return matches
def check_value(self, value, path):
"""Count ScheduledExpression value"""
matches = []
# Value is either "cron()" or "rate()"
if value.startswith('rate(') and value.endswith(')'):
matches.extend(self.check_rate(value, path))
elif value.startswith('cron(') and value.endswith(')'):
matches.extend(self.check_cron(value, path))
else:
message = 'Invalid ScheduledExpression specified ({}). Value has to be either cron() or rate()'
matches.append(RuleMatch(path, message.format(value)))
return matches
def match_resource_properties(self, properties, _, path, cfn):
"""Check CloudFormation Properties"""
matches = []
matches.extend(
cfn.check_value(
obj=properties,
key='ScheduleExpression',
path=path[:],
check_value=self.check_value,
)
)
return matches