Skip to content

Commit

Permalink
firewall: always use policy 'drop'
Browse files Browse the repository at this point in the history
There is a problem with having separate default action ("policy") and
rules because it isn't possible to set both of them atomically at the
same time.
To solve this problem, always have policy 'drop' (as a safe default),
but by default have a single rule with action 'accept'

Fixes QubesOS/qubes-issues#2869
  • Loading branch information
marmarek committed Jun 26, 2017
1 parent 9198416 commit 842efb5
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 27 deletions.
27 changes: 12 additions & 15 deletions qubes/firewall.py
Original file line number Diff line number Diff line change
Expand Up @@ -368,29 +368,30 @@ def __init__(self, vm, load=True):
self.vm = vm
#: firewall rules
self.rules = []
#: default action
self.policy = None

if load:
self.load()

@property
def policy(self):
''' Default action - always 'drop' '''
return Action('drop')

def __eq__(self, other):
if isinstance(other, Firewall):
return self.policy == other.policy and self.rules == other.rules
return self.rules == other.rules
return NotImplemented

def load_defaults(self):
'''Load default firewall settings'''
self.rules = []
self.policy = Action('accept')
self.rules = [Rule(None, action='accept')]

def clone(self, other):
'''Clone firewall settings from other instance.
This method discards pre-existing firewall settings.
:param other: other :py:class:`Firewall` instance
'''
self.policy = other.policy
rules = []
for rule in other.rules:
new_rule = Rule()
Expand Down Expand Up @@ -422,9 +423,9 @@ def load_v1(self, xml_root):
policy_v1 = xml_root.get('policy')
assert policy_v1 in ('allow', 'deny')
if policy_v1 == 'allow':
self.policy = Action('accept')
policy = Action('accept')
else:
self.policy = Action('drop')
policy = Action('drop')

def _translate_action(key):
if xml_root.get(key, policy_v1) == 'allow':
Expand All @@ -439,19 +440,19 @@ def _translate_action(key):
action=_translate_action('icmp'),
proto=Proto.icmp))

if self.policy == Action.accept:
if policy == Action.accept:
rule_action = Action.drop
else:
rule_action = Action.accept

for element in xml_root:
rule = Rule.from_xml_v1(element, rule_action)
self.rules.append(rule)
if policy == Action.accept:
self.rules.append(Rule(None, action='accept'))

def load_v2(self, xml_root):
'''Load new (Qubes >= 4.0) firewall XML format'''
self.policy = Action(xml_root.findtext('policy'))

xml_rules = xml_root.find('rules')
for xml_rule in xml_rules:
rule = Rule(xml_rule)
Expand All @@ -464,10 +465,6 @@ def save(self):

xml_root = lxml.etree.Element('firewall', version=str(2))

xml_policy = lxml.etree.Element('policy')
xml_policy.text = str(self.policy)
xml_root.append(xml_policy)

xml_rules = lxml.etree.Element('rules')
for rule in self.rules:
if rule.expire:
Expand Down
22 changes: 10 additions & 12 deletions qubes/tests/firewall.py
Original file line number Diff line number Diff line change
Expand Up @@ -463,17 +463,17 @@ def tearDown(self):
def test_000_defaults(self):
fw = qubes.firewall.Firewall(self.vm, False)
fw.load_defaults()
self.assertEqual(fw.policy, 'accept')
self.assertEqual(fw.rules, [])
self.assertEqual(fw.policy, 'drop')
self.assertEqual(fw.rules, [qubes.firewall.Rule(None, action='accept')])

def test_001_save_load_empty(self):
fw = qubes.firewall.Firewall(self.vm, True)
self.assertEqual(fw.policy, 'accept')
self.assertEqual(fw.rules, [])
self.assertEqual(fw.policy, 'drop')
self.assertEqual(fw.rules, [qubes.firewall.Rule(None, action='accept')])
fw.save()
fw.load()
self.assertEqual(fw.policy, 'accept')
self.assertEqual(fw.rules, [])
self.assertEqual(fw.policy, 'drop')
self.assertEqual(fw.rules, [qubes.firewall.Rule(None, action='accept')])

def test_002_save_load_rules(self):
fw = qubes.firewall.Firewall(self.vm, True)
Expand All @@ -485,13 +485,13 @@ def test_002_save_load_rules(self):
qubes.firewall.Rule(None, action='accept', specialtarget='dns'),
]
fw.rules.extend(rules)
fw.policy = qubes.firewall.Action.drop
fw.save()
self.assertTrue(os.path.exists(os.path.join(
self.vm.dir_path, self.vm.firewall_conf)))
fw = qubes.firewall.Firewall(TestVM(), True)
self.assertEqual(fw.policy, qubes.firewall.Action.drop)
self.assertEqual(fw.rules, rules)
self.assertEqual(fw.rules,
[qubes.firewall.Rule(None, action='accept')] + rules)

def test_003_load_v1(self):
xml_txt = """<QubesFirewallRules dns="allow" icmp="allow"
Expand Down Expand Up @@ -524,8 +524,7 @@ def test_004_save_skip_expired(self):
dstports=67, expire=1373300257),
qubes.firewall.Rule(None, action='accept', specialtarget='dns'),
]
fw.rules.extend(rules)
fw.policy = qubes.firewall.Action.drop
fw.rules = rules
fw.save()
rules.pop(2)
fw = qubes.firewall.Firewall(self.vm, True)
Expand All @@ -539,8 +538,7 @@ def test_005_qdb_entries(self):
qubes.firewall.Rule(None, action='accept', proto='udp'),
qubes.firewall.Rule(None, action='accept', specialtarget='dns'),
]
fw.rules.extend(rules)
fw.policy = qubes.firewall.Action.drop
fw.rules = rules
expected_qdb_entries = {
'policy': 'drop',
'0000': 'action=drop proto=icmp',
Expand Down

0 comments on commit 842efb5

Please sign in to comment.