diff --git a/changelogs/fragments/T6817_T6825_ovr_rep.yml b/changelogs/fragments/T6817_T6825_ovr_rep.yml
new file mode 100644
index 00000000..ab4d70c8
--- /dev/null
+++ b/changelogs/fragments/T6817_T6825_ovr_rep.yml
@@ -0,0 +1,6 @@
+---
+minor_changes:
+ - fixed behavior for override and replaced states
+ - added support for packet-length-exclude for 1.4+ and the states
+ - fixed behavior for log, disable attributes
+ - added a separate test suite test_vyos_firewall_rules14.py
diff --git a/plugins/module_utils/network/vyos/config/firewall_rules/firewall_rules.py b/plugins/module_utils/network/vyos/config/firewall_rules/firewall_rules.py
index 106b2b8b..68ceff80 100644
--- a/plugins/module_utils/network/vyos/config/firewall_rules/firewall_rules.py
+++ b/plugins/module_utils/network/vyos/config/firewall_rules/firewall_rules.py
@@ -1,1077 +1,1167 @@
#
# -*- coding: utf-8 -*-
# Copyright 2019 Red Hat
# GNU General Public License v3.0+
# (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
"""
The vyos_firewall_rules class
It is in this file where the current configuration (as dict)
is compared to the provided configuration (as dict) and the command set
necessary to bring the current configuration to it's desired end-state is
created
"""
from __future__ import absolute_import, division, print_function
__metaclass__ = type
from copy import deepcopy
from ansible.module_utils.six import iteritems
from ansible_collections.ansible.netcommon.plugins.module_utils.network.common.cfg.base import (
ConfigBase,
)
from ansible_collections.ansible.netcommon.plugins.module_utils.network.common.utils import (
remove_empties,
to_list,
)
from ansible_collections.vyos.vyos.plugins.module_utils.network.vyos.facts.facts import Facts
from ansible_collections.vyos.vyos.plugins.module_utils.network.vyos.utils.utils import (
list_diff_want_only,
)
from ansible_collections.vyos.vyos.plugins.module_utils.network.vyos.vyos import get_os_version
from ansible_collections.vyos.vyos.plugins.module_utils.network.vyos.utils.version import LooseVersion
class Firewall_rules(ConfigBase):
"""
The vyos_firewall_rules class
"""
gather_subset = [
"!all",
"!min",
]
gather_network_resources = [
"firewall_rules",
]
def __init__(self, module):
super(Firewall_rules, self).__init__(module)
def get_firewall_rules_facts(self, data=None):
"""Get the 'facts' (the current configuration)
:rtype: A dictionary
:returns: The current configuration as a dictionary
"""
facts, _warnings = Facts(self._module).get_facts(
self.gather_subset,
self.gather_network_resources,
data=data,
)
firewall_rules_facts = facts["ansible_network_resources"].get("firewall_rules")
if not firewall_rules_facts:
return []
return firewall_rules_facts
def execute_module(self):
"""Execute the module
:rtype: A dictionary
:returns: The result from module execution
"""
result = {"changed": False}
warnings = list()
commands = list()
if self.state in self.ACTION_STATES:
existing_firewall_rules_facts = self.get_firewall_rules_facts()
else:
existing_firewall_rules_facts = []
if self.state in self.ACTION_STATES or self.state == "rendered":
- commands.extend(self.set_config(existing_firewall_rules_facts))
+ commands.extend(self.set_config(deepcopy(existing_firewall_rules_facts)))
if commands and self.state in self.ACTION_STATES:
if not self._module.check_mode:
self._connection.edit_config(commands)
result["changed"] = True
if self.state in self.ACTION_STATES:
result["commands"] = commands
if self.state in self.ACTION_STATES or self.state == "gathered":
changed_firewall_rules_facts = self.get_firewall_rules_facts()
elif self.state == "rendered":
result["rendered"] = commands
elif self.state == "parsed":
running_config = self._module.params["running_config"]
if not running_config:
self._module.fail_json(
msg="value of running_config parameter must not be empty for state parsed",
)
result["parsed"] = self.get_firewall_rules_facts(data=running_config)
else:
changed_firewall_rules_facts = []
if self.state in self.ACTION_STATES:
result["before"] = existing_firewall_rules_facts
if result["changed"]:
result["after"] = changed_firewall_rules_facts
elif self.state == "gathered":
result["gathered"] = changed_firewall_rules_facts
result["warnings"] = warnings
return result
def set_config(self, existing_firewall_rules_facts):
"""Collect the configuration from the args passed to the module,
collect the current configuration (as a dict from facts)
:rtype: A list
:returns: the commands necessary to migrate the current configuration
to the desired configuration
"""
want = self._module.params["config"]
+ self._prune_stubs(want)
have = existing_firewall_rules_facts
resp = self.set_state(want, have)
return to_list(resp)
def set_state(self, w, h):
"""Select the appropriate function based on the state provided
:param want: the desired configuration as a dictionary
:param have: the current configuration as a dictionary
:rtype: A list
:returns: the commands necessary to migrate the current configuration
to the desired configuration
"""
commands = []
if self.state in ("merged", "replaced", "overridden", "rendered") and not w:
self._module.fail_json(
msg="value of config parameter must not be empty for state {0}".format(self.state),
)
if self.state == "overridden":
commands.extend(self._state_overridden(w, h))
elif self.state == "deleted":
commands.extend(self._state_deleted(w, h))
elif w:
if self.state == "merged" or self.state == "rendered":
commands.extend(self._state_merged(w, h))
elif self.state == "replaced":
commands.extend(self._state_replaced(w, h))
return commands
def _state_replaced(self, want, have):
"""The command generator when state is replaced
:rtype: A list
:returns: the commands necessary to migrate the current configuration
to the desired configuration
"""
commands = []
if have:
# Iterate over the afi rule sets we already have.
for h in have:
r_sets = self._get_r_sets(h)
# Iterate over each rule set we already have.
for rs in r_sets:
# In the desired configuration, search for the rule set we
# already have (to be replaced by our desired
# configuration's rule set).
rs_id = self._rs_id(rs, h["afi"])
wanted_rule_set = self.search_r_sets_in_have(want, rs_id, "r_list")
+ if self._is_same_rs(remove_empties(wanted_rule_set), remove_empties(rs)):
+ continue
if wanted_rule_set is not None:
# Remove the rules that we already have if the wanted
# rules exist under the same name.
commands.extend(
self._add_r_sets(
h["afi"],
want=rs,
have=wanted_rule_set,
opr=False,
),
)
# Merge the desired configuration into what we already have.
commands.extend(self._state_merged(want, have))
return commands
def _state_overridden(self, want, have):
"""The command generator when state is overridden
:rtype: A list
:returns: the commands necessary to migrate the current configuration
to the desired configuration
"""
commands = []
if have:
for h in have:
have_r_sets = self._get_r_sets(h)
for rs in have_r_sets:
rs_id = self._rs_id(rs, h["afi"])
w = self.search_r_sets_in_have(want, rs_id, "r_list")
- if not w:
- commands.append(self._compute_command(rs_id, remove=True))
+ if self._is_same_rs(remove_empties(w), remove_empties(rs)):
+ continue
else:
- commands.extend(self._add_r_sets(h["afi"], rs, w, opr=False))
- commands.extend(self._state_merged(want, have))
+ commands.append(self._compute_command(rs_id, remove=True))
+ # Blank out the only rule set that it is removed.
+ for entry in have:
+ if entry['afi'] == rs_id['afi'] and rs_id['name']:
+ entry["rule_sets"] = [
+ rule_set for rule_set in entry["rule_sets"] if rule_set.get("name") != rs_id['name']
+ ]
+ elif entry['afi'] == rs_id['afi'] and rs_id['filter']:
+ entry["rule_sets"] = [
+ rule_set for rule_set in entry["rule_sets"] if rule_set.get("filter") != rs_id['filter']
+ ]
+ commands.extend(self._state_merged(want, have))
return commands
def _state_merged(self, want, have):
"""The command generator when state is merged
:rtype: A list
:returns: the commands necessary to merge the provided into
the current configuration
"""
commands = []
for w in want:
r_sets = self._get_r_sets(w)
for rs in r_sets:
rs_id = self._rs_id(rs, w["afi"])
h = self.search_r_sets_in_have(have, rs_id, "r_list")
- commands.extend(self._add_r_sets(w["afi"], rs, h))
+ if self._is_same_rs(remove_empties(h), remove_empties(rs)):
+ continue
+ else:
+ commands.extend(self._add_r_sets(w["afi"], rs, h))
return commands
def _state_deleted(self, want, have):
"""The command generator when state is deleted
:rtype: A list
:returns: the commands necessary to remove the current configuration
of the provided objects
"""
commands = []
if want:
for w in want:
r_sets = self._get_r_sets(w)
if r_sets:
for rs in r_sets:
rs_id = self._rs_id(rs, w["afi"])
h = self.search_r_sets_in_have(have, rs_id, "r_list")
if h:
commands.append(self._compute_command(rs_id, remove=True))
elif have:
for h in have:
if h["afi"] == w["afi"]:
commands.append(
self._compute_command(self._rs_id(None, w["afi"]), remove=True)
)
elif have:
for h in have:
r_sets = self._get_r_sets(h)
if r_sets:
commands.append(self._compute_command(self._rs_id(None, h["afi"]), remove=True))
return commands
def _add_r_sets(self, afi, want, have, opr=True):
"""
This function forms the set/delete commands based on the 'opr' type
for rule-sets attributes.
:param afi: address type.
:param want: desired config.
:param have: target config.
:param opr: True/False.
:return: generated commands list.
"""
commands = []
l_set = ("description", "default_action", "default_jump_target", "enable_default_log")
h_rs = {}
h_rules = {}
w_rs = deepcopy(remove_empties(want))
w_rules = w_rs.pop("rules", None)
rs_id = self._rs_id(want, afi=afi)
if have:
h_rs = deepcopy(remove_empties(have))
h_rules = h_rs.pop("rules", None)
if w_rs:
for key, val in iteritems(w_rs):
if opr and key in l_set and not (h_rs and self._is_w_same(w_rs, h_rs, key)):
if key == "enable_default_log":
if val and (not h_rs or key not in h_rs or not h_rs[key]):
commands.append(self._add_rs_base_attrib(rs_id, key, w_rs))
else:
commands.append(self._add_rs_base_attrib(rs_id, key, w_rs))
elif not opr and key in l_set:
if (
key == "enable_default_log"
and val
and h_rs
and (key not in h_rs or not h_rs[key])
):
commands.append(self._add_rs_base_attrib(rs_id, key, w_rs, opr))
elif not (h_rs and self._in_target(h_rs, key)):
commands.append(self._add_rs_base_attrib(rs_id, key, w_rs, opr))
commands.extend(self._add_rules(rs_id, w_rules, h_rules, opr))
if h_rules:
have["rules"] = h_rules
if w_rules:
want["rules"] = w_rules
return commands
def _add_rules(self, rs_id, w_rules, h_rules, opr=True):
"""
This function forms the set/delete commands based on the 'opr' type
for rules attributes.
:param rs_id: rule-set identifier.
:param w_rules: desired config.
:param h_rules: target config.
:param opr: True/False.
:return: generated commands list.
"""
commands = []
l_set = (
"ipsec",
"action",
"number",
"protocol",
"fragment",
"disable",
"description",
- "log",
"jump_target",
)
if w_rules:
for w in w_rules:
cmd = self._compute_command(rs_id, w["number"], opr=opr)
h = self.search_rules_in_have_rs(h_rules, w["number"])
+ if w != h and self.state == "replaced":
+ h = {}
for key, val in iteritems(w):
if val:
if opr and key in l_set and not (h and self._is_w_same(w, h, key)):
if key == "disable":
if not (not val and (not h or key not in h or not h[key])):
commands.append(self._add_r_base_attrib(rs_id, key, w))
else:
commands.append(self._add_r_base_attrib(rs_id, key, w))
elif not opr:
# Note: if you are experiencing sticky configuration on replace
# you may need to add an explicit check for the key here. Anything that
# doesn't have a custom operation is taken care of by the `l_set` check
# below, but I'm not sure how any of the others work.
# It's possible that historically the delete was forced (but now it's
# checked).
if key == "number" and self._is_del(l_set, h):
commands.append(self._add_r_base_attrib(rs_id, key, w, opr=opr))
continue
if (
key == "tcp"
and val
and h
and (key not in h or not h[key] or h[key] != w[key])
):
commands.extend(self._add_tcp(key, w, h, cmd, opr))
if (
key == "state"
and val
and h
and (key not in h or not h[key] or h[key] != w[key])
):
commands.extend(self._add_state(key, w, h, cmd, opr))
if (
key == "icmp"
and val
and h
and (key not in h or not h[key] or h[key] != w[key])
):
commands.extend(self._add_icmp(key, w, h, cmd, opr))
if (
key in ("packet_length", "packet_length_exclude")
and val
and h
and (key not in h or not h[key] or h[key] != w[key])
):
commands.extend(self._add_packet_length(key, w, h, cmd, opr))
elif key == "disable" and val and h and (key not in h or not h[key]):
commands.append(self._add_r_base_attrib(rs_id, key, w, opr=opr))
- elif key in ("inbound_interface", "outbound_interface") and not (
- h and self._is_w_same(w, h, key)
+ if (
+ key in ("inbound_interface", "outbound_interface")
+ and val
+ and h
+ and (key not in h or not h[key] or h[key] != w[key])
):
commands.extend(self._add_interface(key, w, h, cmd, opr))
elif (
key in l_set
and not (h and self._in_target(h, key))
and not self._is_del(l_set, h)
):
commands.append(self._add_r_base_attrib(rs_id, key, w, opr=opr))
elif key == "p2p":
commands.extend(self._add_p2p(key, w, h, cmd, opr))
elif key == "tcp":
commands.extend(self._add_tcp(key, w, h, cmd, opr))
elif key == "time":
commands.extend(self._add_time(key, w, h, cmd, opr))
elif key == "icmp":
commands.extend(self._add_icmp(key, w, h, cmd, opr))
elif key == "state":
commands.extend(self._add_state(key, w, h, cmd, opr))
+ elif key == "log":
+ commands.extend(self._add_log(key, w, h, cmd, opr))
elif key == "limit":
commands.extend(self._add_limit(key, w, h, cmd, opr))
elif key == "recent":
commands.extend(self._add_recent(key, w, h, cmd, opr))
elif key == "destination" or key == "source":
commands.extend(self._add_src_or_dest(key, w, h, cmd, opr))
elif key in ("packet_length", "packet_length_exclude"):
commands.extend(self._add_packet_length(key, w, h, cmd, opr))
elif key in ("inbound_interface", "outbound_interface"):
commands.extend(self._add_interface(key, w, h, cmd, opr))
return commands
def _add_p2p(self, attr, w, h, cmd, opr):
"""
This function forms the set/delete commands based on the 'opr' type
for p2p applications attributes.
:param want: desired config.
:param have: target config.
:return: generated commands list.
"""
commands = []
have = []
if w:
want = w.get(attr) or []
if h:
have = h.get(attr) or []
if want:
if opr:
applications = list_diff_want_only(want, have)
for app in applications:
commands.append(cmd + (" " + attr + " " + app["application"]))
elif not opr and have:
applications = list_diff_want_only(want, have)
for app in applications:
commands.append(cmd + (" " + attr + " " + app["application"]))
return commands
def _add_state(self, attr, w, h, cmd, opr):
"""
This function forms the command for 'state' attributes based on the 'opr'.
:param attr: attribute name.
:param w: base config.
:param h: target config.
:param cmd: commands to be prepend.
:return: generated list of commands.
"""
h_state = {}
commands = []
l_set = ("new", "invalid", "related", "established")
if w[attr]:
if h and attr in h.keys():
h_state = h.get(attr) or {}
for item, val in iteritems(w[attr]):
if (
opr
and item in l_set
and not (h_state and self._is_w_same(w[attr], h_state, item))
):
if LooseVersion(get_os_version(self._module)) >= LooseVersion("1.4"):
commands.append(cmd + (" " + attr + " " + item))
else:
commands.append(cmd + (" " + attr + " " + item + " " + self._bool_to_str(val)))
elif not opr and item in l_set and not self._in_target(h_state, item):
commands.append(cmd + (" " + attr + " " + item))
return commands
+ def _add_log(self, attr, w, h, cmd, opr):
+ """
+ This function forms the command for 'log' attributes based on the 'opr'.
+ :param attr: attribute name.
+ :param w: base config.
+ :param h: target config.
+ :param cmd: commands to be prepend.
+ :return: generated list of commands.
+ """
+ h_state = {}
+ commands = []
+ if w[attr]:
+ if h and attr in h.keys():
+ h_state = h.get(attr) or {}
+
+ if (
+ LooseVersion(get_os_version(self._module)) < LooseVersion("1.4")
+ and opr
+ and not (h and self._is_w_same(w, h, attr))
+ ):
+ commands.append(cmd + " " + attr + " '" + w[attr] + "'")
+ elif (
+ LooseVersion(get_os_version(self._module)) >= LooseVersion("1.4")
+ and opr
+ and not (h and self._is_w_same(w, h, attr))
+ ):
+ commands.append(cmd + " " + attr)
+ elif not opr and not self._in_target(h_state, w[attr]):
+ commands.append(cmd + (" " + attr + " '" + w[attr] + "'"))
+
+ return commands
+
def _add_recent(self, attr, w, h, cmd, opr):
"""
This function forms the command for 'recent' attributes based on the 'opr'.
:param attr: attribute name.
:param w: base config.
:param h: target config.
:param cmd: commands to be prepend.
:return: generated list of commands.
"""
commands = []
h_recent = {}
l_set = ("count", "time")
if w[attr]:
if h and attr in h.keys():
h_recent = h.get(attr) or {}
for item, val in iteritems(w[attr]):
if (
opr
and item in l_set
and not (h_recent and self._is_w_same(w[attr], h_recent, item))
):
commands.append(cmd + (" " + attr + " " + item + " " + str(val)))
elif (
not opr and item in l_set and not (h_recent and self._in_target(h_recent, item))
):
commands.append(cmd + (" " + attr + " " + item))
return commands
def _add_icmp(self, attr, w, h, cmd, opr):
"""
This function forms the commands for 'icmp' attributes based on the 'opr'.
:param attr: attribute name.
:param w: base config.
:param h: target config.
:param cmd: commands to be prepend.
:return: generated list of commands.
"""
commands = []
h_icmp = {}
l_set = ("code", "type", "type_name")
if w[attr]:
if h and attr in h.keys():
h_icmp = h.get(attr) or {}
for item, val in iteritems(w[attr]):
if (
opr
and item in l_set
and not (h_icmp and self._is_w_same(w[attr], h_icmp, item))
):
if item == "type_name":
- if LooseVersion(get_os_version(self._module)) >= LooseVersion("1.3"):
+ if LooseVersion(get_os_version(self._module)) >= LooseVersion("1.4"):
param_name = "type-name"
else:
param_name = "type"
if "ipv6" in cmd: # ipv6-name or ipv6
commands.append(cmd + (" " + "icmpv6" + " " + param_name + " " + val))
else:
commands.append(
cmd + (" " + attr + " " + item.replace("_", "-") + " " + val),
)
else:
if "ipv6" in cmd: # ipv6-name or ipv6
commands.append(cmd + (" " + "icmpv6" + " " + item + " " + str(val)))
else:
commands.append(cmd + (" " + attr + " " + item + " " + str(val)))
elif not opr and item in l_set and not self._in_target(h_icmp, item):
commands.append(cmd + (" " + attr + " " + item.replace("_", "-") + " " + str(val)))
return commands
def _add_interface(self, attr, w, h, cmd, opr):
"""
This function forms the commands for 'interface' attributes based on the 'opr'.
:param attr: attribute name.
:param w: base config.
:param h: target config.
:param cmd: commands to be prepend.
:return: generated list of commands.
"""
commands = []
h_if = {}
l_set = ("name", "group")
if w[attr]:
if h and attr in h.keys():
h_if = h.get(attr) or {}
for item, val in iteritems(w[attr]):
if opr and item in l_set and not (h_if and self._is_w_same(w[attr], h_if, item)):
commands.append(
cmd
+ (" " + attr.replace("_", "-") + " " + item.replace("_", "-") + " " + val)
)
elif not opr and item in l_set and not (h_if and self._in_target(h_if, item)):
commands.append(
cmd + (" " + attr.replace("_", "-") + " " + item.replace("_", "-"))
)
return commands
def _add_time(self, attr, w, h, cmd, opr):
"""
This function forms the commands for 'time' attributes based on the 'opr'.
:param attr: attribute name.
:param w: base config.
:param h: target config.
:param cmd: commands to be prepend.
:return: generated list of commands.
"""
commands = []
h_time = {}
l_set = (
"utc",
"stopdate",
"stoptime",
"weekdays",
"monthdays",
"startdate",
"starttime",
)
if w[attr]:
if h and attr in h.keys():
h_time = h.get(attr) or {}
for item, val in iteritems(w[attr]):
if (
opr
and item in l_set
and not (h_time and self._is_w_same(w[attr], h_time, item))
):
if item == "utc":
if not (not val and (not h_time or item not in h_time)):
commands.append(cmd + (" " + attr + " " + item))
else:
commands.append(cmd + (" " + attr + " " + item + " " + val))
elif (
not opr
and item in l_set
and not (h_time and self._is_w_same(w[attr], h_time, item))
):
commands.append(cmd + (" " + attr + " " + item))
return commands
def _add_tcp_1_4(self, attr, w, h, cmd, opr):
"""
This function forms the commands for 'tcp' attributes based on the 'opr'.
Version 1.4+
:param attr: attribute name.
:param w: base config.
:param h: target config.
:param cmd: commands to be prepend.
:return: generated list of commands.
"""
commands = []
have = []
key = "flags"
want = []
if w:
if w.get(attr):
want = w.get(attr).get(key) or []
if h:
if h.get(attr):
have = h.get(attr).get(key) or []
if want:
if opr:
flags = list_diff_want_only(want, have)
for flag in flags:
invert = flag.get("invert", False)
commands.append(
cmd + (" " + attr + " flags " + ("not " if invert else "") + flag["flag"])
)
elif not opr:
flags = list_diff_want_only(want, have)
for flag in flags:
invert = flag.get("invert", False)
commands.append(
cmd + (" " + attr + " flags " + ("not " if invert else "") + flag["flag"])
)
return commands
def _add_packet_length(self, attr, w, h, cmd, opr):
"""
This function forms the commands for 'packet_length[_exclude]' attributes based on the 'opr'.
If < 1.4, handle tcp attributes.
:param attr: attribute name.
:param w: base config.
:param h: target config.
:param cmd: commands to be prepend.
:return: generated list of commands.
"""
commands = []
have = []
want = []
if w:
if w.get(attr):
want = w.get(attr) or []
if h:
if h.get(attr):
have = h.get(attr) or []
attr = attr.replace("_", "-")
if want:
if opr:
lengths = list_diff_want_only(want, have)
for l_rec in lengths:
commands.append(cmd + " " + attr + " " + str(l_rec["length"]))
elif not opr:
lengths = list_diff_want_only(want, have)
for l_rec in lengths:
commands.append(cmd + " " + attr + " " + str(l_rec["length"]))
return commands
def _tcp_flags_string(self, flags):
"""
This function forms the tcp flags string.
:param flags: flags list.
:return: flags string or None.
"""
if not flags:
return ""
flag_str = ""
for flag in flags:
this_flag = flag["flag"].upper()
if flag.get("invert", False):
this_flag = "!" + this_flag
if len(flag_str) > 0:
flag_str = ",".join([flag_str, this_flag])
else:
flag_str = this_flag
return flag_str
def _add_tcp(self, attr, w, h, cmd, opr):
"""
This function forms the commands for 'tcp' attributes based on the 'opr'.
If < 1.4, handle tcp attributes.
:param attr: attribute name.
:param w: base config.
:param h: target config.
:param cmd: commands to be prepend.
:return: generated list of commands.
"""
if LooseVersion(get_os_version(self._module)) >= LooseVersion("1.4"):
return self._add_tcp_1_4(attr, w, h, cmd, opr)
h_tcp = {}
commands = []
if w[attr]:
key = "flags"
flags = w[attr].get(key) or {}
if flags:
if h and key in h[attr].keys():
h_tcp = h[attr].get(key) or {}
if flags:
flag_str = self._tcp_flags_string(flags)
if opr and not (h_tcp and flags == h_tcp):
commands.append(cmd + (" " + attr + " " + "flags" + " " + flag_str))
if not opr and not (h_tcp and flags == h_tcp):
commands.append(cmd + (" " + attr + " " + "flags" + " " + flag_str))
return commands
def _add_limit(self, attr, w, h, cmd, opr):
"""
This function forms the commands for 'limit' attributes based on the 'opr'.
:param attr: attribute name.
:param w: base config.
:param h: target config.
:param cmd: commands to be prepend.
:return: generated list of commands.
"""
h_limit = {}
commands = []
if w[attr]:
key = "burst"
if (
opr
and key in w[attr].keys()
and not (h and attr in h.keys() and self._is_w_same(w[attr], h[attr], key))
):
commands.append(cmd + (" " + attr + " " + key + " " + str(w[attr].get(key))))
elif (
not opr
and key in w[attr].keys()
and not (h and attr in h.keys() and self._in_target(h[attr], key))
):
commands.append(cmd + (" " + attr + " " + key + " " + str(w[attr].get(key))))
key = "rate"
rate = w[attr].get(key) or {}
if rate:
if h and key in h[attr].keys():
h_limit = h[attr].get(key) or {}
if "unit" in rate and "number" in rate:
if opr and not (
h_limit
and self._is_w_same(rate, h_limit, "unit")
and self.is_w_same(rate, h_limit, "number")
):
commands.append(
cmd
+ (
" "
+ attr
+ " "
+ key
+ " "
+ str(rate["number"])
+ "/"
+ rate["unit"]
),
)
if not opr and not (
h_limit
and self._is_w_same(rate, h_limit, "unit")
and self._is_w_same(rate, h_limit, "number")
):
commands.append(cmd + (" " + attr + " " + key))
return commands
def _add_src_or_dest(self, attr, w, h, cmd, opr=True):
"""
This function forms the commands for 'src/dest' attributes based on the 'opr'.
:param attr: attribute name.
:param w: base config.
:param h: target config.
:param cmd: commands to be prepend.
:return: generated list of commands.
"""
commands = []
h_group = {}
g_set = ("port_group", "address_group", "network_group")
if w[attr]:
keys = ("address", "mac_address", "port")
for key in keys:
if (
opr
and key in w[attr].keys()
and not (h and attr in h.keys() and self._is_w_same(w[attr], h[attr], key))
):
commands.append(
cmd + (" " + attr + " " + key.replace("_", "-") + " " + w[attr].get(key)),
)
elif (
not opr
and key in w[attr].keys()
and not (h and attr in h.keys() and self._in_target(h[attr], key))
):
commands.append(cmd + (" " + attr + " " + key))
key = "group"
group = w[attr].get(key) or {}
if group:
h_group = {}
if h and h.get(attr) and key in h[attr].keys():
h_group = h[attr].get(key)
for item, val in iteritems(group):
if val:
if (
opr
and item in g_set
and not (h_group and self._is_w_same(group, h_group, item))
):
commands.append(
cmd
+ (
" "
+ attr
+ " "
+ key
+ " "
+ item.replace("_", "-")
+ " "
+ val
),
)
elif (
not opr
and item in g_set
and not (h_group and self._in_target(h_group, item))
):
commands.append(
cmd + (" " + attr + " " + key + " " + item.replace("_", "-")),
)
return commands
def search_rules_in_have_rs(self, have_rules, r_number):
"""
This function returns the rule if it is present in target config.
:param have: target config.
:param rs_id: rule-set identifier.
:param r_number: rule-number.
:return: rule.
"""
if have_rules:
- for h in have_rules:
- key = "number"
- for r in have_rules:
- if key in r and r[key] == r_number:
- return r
+ key = "number"
+ for r in have_rules:
+ if key in r and r[key] == r_number:
+ return r
return None
def search_r_sets_in_have(self, have, rs_id, type="rule_sets"):
"""
This function returns the rule-set/rule if it is present in target config.
:param have: target config.
:param rs_id: rule-identifier.
:param type: rule_sets if searching a rule_set and r_list if searching from a rule_list.
:return: rule-set/rule.
"""
if "afi" in rs_id:
afi = rs_id["afi"]
else:
afi = None
if rs_id["filter"]:
key = "filter"
w_value = rs_id["filter"]
elif rs_id["name"]:
key = "name"
w_value = rs_id["name"]
else:
raise ValueError("id must be specific to name or filter")
if type not in ("r_list", "rule_sets"):
raise ValueError("type must be rule_sets or r_list")
if have:
if type == "r_list":
for h in have:
if h["afi"] == afi:
r_sets = self._get_r_sets(h)
for rs in r_sets:
if key in rs and rs[key] == w_value:
return rs
else:
# searching a ruleset
for rs in have:
if key in rs and rs[key] == w_value:
return rs
return None
def _get_r_sets(self, item):
"""
This function returns the list of rule-sets.
:param item: config dictionary.
:return: list of rule-sets/rules.
"""
rs_list = []
type = "rule_sets"
r_sets = item[type]
if r_sets:
for rs in r_sets:
rs_list.append(rs)
return rs_list
def _compute_command(
self,
rs_id,
number=None,
attrib=None,
value=None,
remove=False,
opr=True,
):
"""
This function construct the add/delete command based on passed attributes.
:param rs_id: rule-set identifier.
:param number: rule-number.
:param attrib: attribute name.
:param value: value.
:param remove: True if delete command needed to be construct.
:param opr: operation flag.
:return: generated command.
"""
if rs_id["name"] and rs_id["filter"]:
raise ValueError("name and filter cannot be used together")
if remove or not opr:
cmd = "delete firewall " + self._get_fw_type(rs_id["afi"])
else:
cmd = "set firewall " + self._get_fw_type(rs_id["afi"])
if LooseVersion(get_os_version(self._module)) >= LooseVersion("1.4"):
if rs_id["name"]:
cmd += " name " + rs_id["name"]
elif rs_id["filter"]:
cmd += " " + rs_id["filter"] + " filter"
elif rs_id["name"]:
cmd += " " + rs_id["name"]
if number:
cmd += " rule " + str(number)
if attrib:
if LooseVersion(get_os_version(self._module)) >= LooseVersion("1.4") and attrib == "enable_default_log":
cmd += " " + "default-log"
else:
cmd += " " + attrib.replace("_", "-")
if value and opr and attrib != "enable_default_log" and attrib != "disable":
cmd += " '" + str(value) + "'"
return cmd
def _add_r_base_attrib(self, rs_id, attr, rule, opr=True):
"""
This function forms the command for 'rules' attributes which doesn't
have further sub attributes.
:param rs_id: rule-set identifier.
:param attrib: attribute name
:param rule: rule config dictionary.
:param opr: True/False.
:return: generated command.
"""
if attr == "number":
command = self._compute_command(rs_id, number=rule["number"], opr=opr)
else:
command = self._compute_command(
rs_id=rs_id,
number=rule["number"],
attrib=attr,
value=rule[attr],
opr=opr,
)
return command
def _rs_id(self, have, afi, name=None, filter=None):
"""
This function returns the rule-set identifier based on
the example rule, overriding the components as specified.
:param have: example rule.
:param afi: address type.
:param name: rule-set name.
:param filter: filter name.
:return: rule-set identifier.
"""
identifier = {"name": None, "filter": None}
if afi:
identifier["afi"] = afi
else:
raise ValueError("afi must be provided")
if name:
identifier["name"] = name
return identifier
elif filter:
identifier["filter"] = filter
return identifier
if have:
if "name" in have and have["name"]:
identifier["name"] = have["name"]
return identifier
if "filter" in have and have["filter"]:
identifier["filter"] = have["filter"]
return identifier
# raise ValueError("name or filter must be provided or present in have")
# unless we want a wildcard
return identifier
def _add_rs_base_attrib(self, rs_id, attrib, rule, opr=True):
"""
This function forms the command for 'rule-sets' attributes which don't
have further sub attributes.
:param rs_id: rule-set identifier.
:param attrib: attribute name
:param rule: rule config dictionary.
:param opr: True/False.
:return: generated command.
"""
command = self._compute_command(
rs_id=rs_id,
attrib=attrib,
value=rule[attrib],
opr=opr,
)
return command
def _bool_to_str(self, val):
"""
This function converts the bool value into string.
:param val: bool value.
:return: enable/disable.
"""
return "enable" if val else "disable"
def _get_fw_type(self, afi):
"""
This function returns the firewall rule-set type based on IP address.
:param afi: address type
:return: rule-set type.
"""
if LooseVersion(get_os_version(self._module)) >= LooseVersion("1.4"):
return "ipv6" if afi == "ipv6" else "ipv4"
return "ipv6-name" if afi == "ipv6" else "name"
def _is_del(self, l_set, h, key="number"):
"""
This function checks whether rule needs to be deleted based on
the rule number.
:param l_set: attribute set.
:param h: target config.
:param key: number.
:return: True/False.
"""
return key in l_set and not (h and self._in_target(h, key))
def _is_w_same(self, w, h, key):
"""
This function checks whether the key value is same in base and
target config dictionary.
:param w: base config.
:param h: target config.
:param key:attribute name.
:return: True/False.
"""
return True if h and key in h and h[key] == w[key] else False
def _in_target(self, h, key):
"""
This function checks whether the target exists and key present in target config.
:param h: target config.
:param key: attribute name.
:return: True/False.
"""
return True if h and key in h else False
+
+ def _prune_stubs(self, rs):
+ if isinstance(rs, list):
+ for item in rs:
+ self._prune_stubs(item)
+ elif isinstance(rs, dict):
+ keys_to_remove = [key for key, value in rs.items()
+ if (
+ (key == "disable" and value is False)
+ or
+ (key == "log" and value == "disable" and
+ LooseVersion(get_os_version(self._module)) >= LooseVersion("1.4"))
+ or
+ (key in ["new", "invalid", "related", "established"] and value is False and
+ LooseVersion(get_os_version(self._module)) >= LooseVersion("1.4")))]
+ for key in keys_to_remove:
+ del rs[key]
+ for key in rs:
+ self._prune_stubs(rs[key])
+
+ def _is_same_rs(self, w, rs):
+ if isinstance(w, dict) and isinstance(rs, dict):
+ if w.keys() != rs.keys():
+ return False
+ for key in w:
+ if not self._is_same_rs(w[key], rs[key]):
+ return False
+ return True
+ elif isinstance(w, list) and isinstance(rs, list):
+ try:
+ sorted_list1 = sorted(w, key=lambda x: str(x)) # pylint: disable=unnecessary-lambda
+ sorted_list2 = sorted(rs, key=lambda x: str(x)) # pylint: disable=unnecessary-lambda
+ except TypeError:
+ return False
+ return all(self._is_same_rs(x, y) for x, y in zip(sorted_list1, sorted_list2))
+ else:
+ return w == rs
diff --git a/plugins/module_utils/network/vyos/facts/firewall_rules/firewall_rules.py b/plugins/module_utils/network/vyos/facts/firewall_rules/firewall_rules.py
index 1fc70255..3da70891 100644
--- a/plugins/module_utils/network/vyos/facts/firewall_rules/firewall_rules.py
+++ b/plugins/module_utils/network/vyos/facts/firewall_rules/firewall_rules.py
@@ -1,534 +1,539 @@
#
# -*- coding: utf-8 -*-
# Copyright 2019 Red Hat
# GNU General Public License v3.0+
# (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
"""
The vyos firewall_rules fact class
It is in this file the configuration is collected from the device
for a given resource, parsed, and the facts tree is populated
based on the configuration.
"""
from __future__ import absolute_import, division, print_function
__metaclass__ = type
from copy import deepcopy
from re import M, findall, search
from ansible_collections.ansible.netcommon.plugins.module_utils.network.common import utils
from ansible_collections.vyos.vyos.plugins.module_utils.network.vyos.argspec.firewall_rules.firewall_rules import (
Firewall_rulesArgs,
)
class Firewall_rulesFacts(object):
"""The vyos firewall_rules fact class"""
def __init__(self, module, subspec="config", options="options"):
self._module = module
self.argument_spec = Firewall_rulesArgs.argument_spec
spec = deepcopy(self.argument_spec)
if subspec:
if options:
facts_argument_spec = spec[subspec][options]
else:
facts_argument_spec = spec[subspec]
else:
facts_argument_spec = spec
self.generated_spec = utils.generate_dict(facts_argument_spec)
def get_device_data(self, connection):
return connection.get_config()
def populate_facts(self, connection, ansible_facts, data=None):
"""Populate the facts for firewall_rules
:param connection: the device connection
:param ansible_facts: Facts dictionary
:param data: previously collected conf
:rtype: dictionary
:returns: facts
"""
if not data:
# typically data is populated from the current device configuration
# data = connection.get('show running-config | section ^interface')
# using mock data instead
data = self.get_device_data(connection)
# split the config into instances of the resource
objs = []
# check 1.4+ first
new_rules = True
v6_rules = findall(r"^set firewall ipv6 (name|forward|input|output) (?:\'*)(\S+)(?:\'*)", data, M)
if not v6_rules:
v6_rules = findall(r"^set firewall ipv6-name (?:\'*)(\S+)(?:\'*)", data, M)
if v6_rules:
new_rules = False
v4_rules = findall(r"^set firewall ipv4 (name|forward|input|output) (?:\'*)(\S+)(?:\'*)", data, M)
if not v4_rules:
v4_rules = findall(r"^set firewall name (?:\'*)(\S+)(?:\'*)", data, M)
if v4_rules:
new_rules = False
if v6_rules:
if new_rules:
config = self.get_rules_post_1_4(data, v6_rules, type="ipv6")
else:
config = self.get_rules(data, v6_rules, type="ipv6")
if config:
config = utils.remove_empties(config)
objs.append(config)
if v4_rules:
if new_rules:
config = self.get_rules_post_1_4(data, v4_rules, type="ipv4")
else:
config = self.get_rules(data, v4_rules, type="ipv4")
if config:
config = utils.remove_empties(config)
objs.append(config)
ansible_facts["ansible_network_resources"].pop("firewall_rules", None)
facts = {}
if objs:
facts["firewall_rules"] = []
params = utils.validate_config(self.argument_spec, {"config": objs})
for cfg in params["config"]:
facts["firewall_rules"].append(utils.remove_empties(cfg))
ansible_facts["ansible_network_resources"].update(facts)
return ansible_facts
def get_rules(self, data, rules, type):
"""
This function performs following:
- Form regex to fetch 'rule-sets' specific config from data.
- Form the rule-set list based on ip address.
:param data: configuration.
:param rules: list of rule-sets.
:param type: ip address type.
:return: generated rule-sets configuration.
"""
r_v4 = []
r_v6 = []
for r in set(rules):
name_key = "ipv6-name" if type == "ipv6" else "name"
rule_regex = r" %s %s .+$" % (name_key, r.strip("'"))
cfg = findall(rule_regex, data, M)
fr = self.render_config(cfg, r.strip("'"))
fr["name"] = r.strip("'")
if type == "ipv6":
r_v6.append(fr)
else:
r_v4.append(fr)
if r_v4:
config = {"afi": "ipv4", "rule_sets": r_v4}
if r_v6:
config = {"afi": "ipv6", "rule_sets": r_v6}
return config
def get_rules_post_1_4(self, data, rules, type):
"""
This function performs following:
- Form regex to fetch 'rule-sets' specific config from data.
- Form the rule-set list based on ip address.
Specifically for v1.4+ version.
:param data: configuration.
:param rules: list of rule-sets.
:param type: ip address type.
:return: generated rule-sets configuration.
"""
r_v4 = []
r_v6 = []
for kind, name in set(rules):
rule_regex = r" %s %s %s .+$" % (type, kind, name.strip("'"))
cfg = findall(rule_regex, data, M)
fr = self.render_config(cfg, name.strip("'"))
if kind == "name":
fr["name"] = name.strip("'")
elif kind in ("forward", "input", "output"):
fr["filter"] = kind
else:
raise ValueError("Unknown rule kind: %s %s" % kind, name)
if type == "ipv6":
r_v6.append(fr)
else:
r_v4.append(fr)
if r_v4:
config = {"afi": "ipv4", "rule_sets": r_v4}
if r_v6:
config = {"afi": "ipv6", "rule_sets": r_v6}
return config
def render_config(self, conf, match):
"""
Render config as dictionary structure and delete keys
from spec for null values
:param spec: The facts tree, generated from the argspec
:param conf: The configuration
:rtype: dictionary
:returns: The generated config
"""
conf = "\n".join(filter(lambda x: x, conf))
a_lst = ["description", "default_action", "default_jump_target", "enable_default_log", "default_log"]
config = self.parse_attr(conf, a_lst, match)
if not config:
config = {}
if 'default_log' in config:
config['enable_default_log'] = config.pop('default_log')
config["rules"] = self.parse_rules_lst(conf)
return config
def parse_rules_lst(self, conf):
"""
This function forms the regex to fetch the 'rules' with in
'rule-sets'
:param conf: configuration data.
:return: generated rule list configuration.
"""
r_lst = []
rules = findall(r"rule (?:\'*)(\d+)(?:\'*)", conf, M)
if rules:
rules_lst = []
for r in set(rules):
r_regex = r" %s .+$" % r
cfg = "\n".join(findall(r_regex, conf, M))
obj = self.parse_rules(cfg)
obj["number"] = int(r)
if obj:
rules_lst.append(obj)
r_lst = sorted(rules_lst, key=lambda i: i["number"])
return r_lst
def parse_rules(self, conf):
"""
This function triggers the parsing of 'rule' attributes.
a_lst is a list having rule attributes which doesn't
have further sub attributes.
:param conf: configuration
:return: generated rule configuration dictionary.
"""
a_lst = [
"ipsec",
"log",
"action",
"protocol",
"fragment",
"disable",
"description",
"icmp",
"jump_target",
"queue",
"queue_options",
]
rule = self.parse_attr(conf, a_lst)
r_sub = {
"p2p": self.parse_p2p(conf),
"tcp": self.parse_tcp(conf),
"icmp": self.parse_icmp(conf, "icmp"),
"time": self.parse_time(conf, "time"),
"limit": self.parse_limit(conf, "limit"),
"state": self.parse_state(conf, "state"),
"recent": self.parse_recent(conf, "recent"),
"source": self.parse_src_or_dest(conf, "source"),
"destination": self.parse_src_or_dest(conf, "destination"),
"inbound_interface": self.parse_interface(conf, "inbound-interface"),
"outbound_interface": self.parse_interface(conf, "outbound-interface"),
"packet_length": self.parse_packet_length(conf, "packet-length"),
"packet_length_exclude": self.parse_packet_length(conf, "packet-length-exclude"),
}
rule.update(r_sub)
return rule
def parse_interface(self, conf, attrib):
"""
This function triggers the parsing of 'interface' attributes.
:param conf: configuration.
:param attrib: 'interface'.
:return: generated config dictionary.
"""
a_lst = ["name", "group"]
cfg_dict = self.parse_attr(conf, a_lst, match=attrib)
return cfg_dict
def parse_packet_length(self, conf, attrib=None):
"""
This function triggers the parsing of 'packet-length' attributes.
:param conf: configuration.
:param attrib: 'packet-length'.
:return: generated config dictionary.
"""
lengths = []
- rule_regex = r"%s (\d+)" % attrib
+ rule_regex = r"%s (.+)$" % attrib
found_lengths = findall(rule_regex, conf, M)
if found_lengths:
lengths = []
for l in set(found_lengths):
obj = {"length": l.strip("'")}
lengths.append(obj)
return lengths
def parse_p2p(self, conf):
"""
This function forms the regex to fetch the 'p2p' with in
'rules'
:param conf: configuration data.
:return: generated rule list configuration.
"""
a_lst = []
applications = findall(r"p2p (?:\'*)(\d+)(?:\'*)", conf, M)
if applications:
app_lst = []
for r in set(applications):
obj = {"application": r.strip("'")}
app_lst.append(obj)
a_lst = sorted(app_lst, key=lambda i: i["application"])
return a_lst
def parse_src_or_dest(self, conf, attrib=None):
"""
This function triggers the parsing of 'source or
destination' attributes.
:param conf: configuration.
:param attrib:'source/destination'.
:return:generated source/destination configuration dictionary.
"""
a_lst = ["port", "address", "mac_address"]
cfg_dict = self.parse_attr(conf, a_lst, match=attrib)
cfg_dict["group"] = self.parse_group(conf, attrib + " group")
return cfg_dict
def parse_recent(self, conf, attrib=None):
"""
This function triggers the parsing of 'recent' attributes
:param conf: configuration.
:param attrib: 'recent'.
:return: generated config dictionary.
"""
a_lst = ["time", "count"]
cfg_dict = self.parse_attr(conf, a_lst, match=attrib)
return cfg_dict
def parse_tcp(self, conf):
"""
This function triggers the parsing of 'tcp' attributes.
:param conf: configuration.
:param attrib: 'tcp'.
:return: generated config dictionary.
"""
f_lst = []
flags = findall(r"tcp flags (not )?(?:\'*)([\w!,]+)(?:\'*)", conf, M)
# for pre 1.4, this is a string including possible commas
# and ! as an inverter. For 1.4+ this is a single flag per
# command and 'not' as the inverter
if flags:
flag_lst = []
for n, f in set(flags):
f = f.strip("'").lower()
if "," in f:
# pre 1.4 version with multiple flags
fs = f.split(",")
for f in fs:
if "!" in f:
obj = {"flag": f.strip("'!"), "invert": True}
else:
obj = {"flag": f.strip("'")}
flag_lst.append(obj)
elif "!" in f:
obj = {"flag": f.strip("'!"), "invert": True}
flag_lst.append(obj)
else:
obj = {"flag": f.strip("'")}
if n:
obj["invert"] = True
flag_lst.append(obj)
f_lst = sorted(flag_lst, key=lambda i: i["flag"])
return {"flags": f_lst}
def parse_time(self, conf, attrib=None):
"""
This function triggers the parsing of 'time' attributes.
:param conf: configuration.
:param attrib: 'time'.
:return: generated config dictionary.
"""
a_lst = [
"stopdate",
"stoptime",
"weekdays",
"monthdays",
"startdate",
"starttime",
]
cfg_dict = self.parse_attr(conf, a_lst, match=attrib)
return cfg_dict
def parse_state(self, conf, attrib=None):
"""
This function triggers the parsing of 'state' attributes.
:param conf: configuration
:param attrib: 'state'.
:return: generated config dictionary.
"""
a_lst = ["new", "invalid", "related", "established"]
cfg_dict = self.parse_attr(conf, a_lst, match=attrib)
return cfg_dict
def parse_group(self, conf, attrib=None):
"""
This function triggers the parsing of 'group' attributes.
:param conf: configuration.
:param attrib: 'group'.
:return: generated config dictionary.
"""
a_lst = ["port_group", "address_group", "network_group"]
cfg_dict = self.parse_attr(conf, a_lst, match=attrib)
return cfg_dict
def parse_icmp_attr(self, conf, match):
"""
This function peforms the following:
- parse ICMP arguemnts for firewall rules
- consider that older versions may need numbers or letters
in type, newer ones are more specific
:param conf: configuration.
:param match: parent node/attribute name.
:return: generated config dictionary.
"""
config = {}
if not conf:
return config
for attrib in ("code", "type", "type-name"):
regex = self.map_regex(attrib)
if match:
regex = match + " " + regex
out = search(r"^.*" + regex + " (.+)", conf, M)
if out:
val = out.group(1).strip("'")
if attrib == 'type-name':
config['type_name'] = val
if attrib == 'code':
config['code'] = int(val)
if attrib == 'type':
# <1.3 could be # (type), #/# (type/code) or 'type' (type_name)
# recent this is only for strings
if "/" in val: # type/code
(type_no, code) = val.split(".")
config['type'] = type_no
config['code'] = code
elif val.isnumeric():
config['type'] = type_no
else:
config['type_name'] = val
return config
def parse_icmp(self, conf, attrib=None):
"""
This function triggers the parsing of 'icmp' attributes.
:param conf: configuration to be parsed.
:param attrib: 'icmp'.
:return: generated config dictionary.
"""
cfg_dict = self.parse_icmp_attr(conf, "icmp")
if (len(cfg_dict) == 0):
cfg_dict = self.parse_icmp_attr(conf, "icmpv6")
return cfg_dict
def parse_limit(self, conf, attrib=None):
"""
This function triggers the parsing of 'limit' attributes.
:param conf: configuration to be parsed.
:param attrib: 'limit'
:return: generated config dictionary.
"""
cfg_dict = self.parse_attr(conf, ["burst"], match=attrib)
cfg_dict["rate"] = self.parse_rate(conf, "rate")
return cfg_dict
def parse_rate(self, conf, attrib=None):
"""
This function triggers the parsing of 'rate' attributes.
:param conf: configuration.
:param attrib: 'rate'
:return: generated config dictionary.
"""
a_lst = ["unit", "number"]
cfg_dict = self.parse_attr(conf, a_lst, match=attrib)
return cfg_dict
def parse_attr(self, conf, attr_list, match=None):
"""
This function peforms the following:
- Form the regex to fetch the required attribute config.
- Type cast the output in desired format.
:param conf: configuration.
:param attr_list: list of attributes.
:param match: parent node/attribute name.
:return: generated config dictionary.
"""
config = {}
for attrib in attr_list:
regex = self.map_regex(attrib)
if match:
regex = match + " " + regex
if conf:
if self.is_bool(attrib):
out = conf.find(attrib.replace("_", "-"))
dis = conf.find(attrib.replace("_", "-") + " 'disable'")
if out >= 1:
if dis >= 1:
config[attrib] = False
else:
config[attrib] = True
else:
out = search(r"^.*" + regex + " (.+)", conf, M)
- if not out and attrib == "disable":
- out = search(r"^.*\d+" + " ('disable'$)", conf, M)
+ if not out:
+ if attrib == "disable":
+ out = search(r"^.*\d+" + " (disable$)", conf, M)
+ if attrib == 'log':
+ out = search(r"^.*\d+" + " (log$)", conf, M)
if out:
val = out.group(1).strip("'")
if self.is_num(attrib):
val = int(val)
if attrib == "disable":
val = True
+ if attrib == "log":
+ val = "enable"
config[attrib] = val
return config
def map_regex(self, attrib):
"""
- This function construct the regex string.
- replace the underscore with hyphen.
:param attrib: attribute
:return: regex string
"""
regex = attrib.replace("_", "-")
if attrib == "disabled":
regex = "disable"
return regex
def is_bool(self, attrib):
"""
This function looks for the attribute in predefined bool type set.
:param attrib: attribute.
:return: True/False
"""
bool_set = (
"new",
"invalid",
"related",
"disabled",
"established",
"enable_default_log",
"default_log",
)
return True if attrib in bool_set else False
def is_num(self, attrib):
"""
This function looks for the attribute in predefined integer type set.
:param attrib: attribute.
:return: True/false.
"""
num_set = ("time", "code", "type", "count", "burst", "number")
return True if attrib in num_set else False
diff --git a/tests/integration/targets/vyos_firewall_rules/tests/cli/_get_version.yaml b/tests/integration/targets/vyos_firewall_rules/tests/cli/_get_version.yaml
new file mode 100644
index 00000000..dda9fcc5
--- /dev/null
+++ b/tests/integration/targets/vyos_firewall_rules/tests/cli/_get_version.yaml
@@ -0,0 +1,31 @@
+- name: make sure to get facts
+ vyos.vyos.vyos_facts:
+ vars:
+ ansible_connection: ansible.netcommon.network_cli
+ register: vyos_facts
+ when: vyos_version is not defined
+
+- name: debug vyos_facts
+ debug:
+ var: vyos_facts
+
+- name: pull version from facts
+ set_fact:
+ vyos_version: "{{ vyos_facts.ansible_facts.ansible_net_version.split('-')[0].split(' ')[-1] }}"
+ when: vyos_version is not defined
+
+- name: fix '.0' versions
+ set_fact:
+ vyos_version: "{{ vyos_version }}.0"
+ when: vyos_version.count('.') == 1
+
+- name: include correct vars
+ include_vars: pre-v1_4.yaml
+ when: vyos_version is version('1.4.0', '<', version_type='semver')
+
+- name: include correct vars
+ include_vars: v1_4.yaml
+ when: vyos_version is version('1.4.0', '>=', version_type='semver')
+
+- name: include common vars
+ include_vars: main.yaml
diff --git a/tests/integration/targets/vyos_firewall_rules/tests/cli/_parsed_config.cfg b/tests/integration/targets/vyos_firewall_rules/tests/cli/_parsed_config_1_3.cfg
similarity index 85%
rename from tests/integration/targets/vyos_firewall_rules/tests/cli/_parsed_config.cfg
rename to tests/integration/targets/vyos_firewall_rules/tests/cli/_parsed_config_1_3.cfg
index b54c1094..bb8bc23e 100644
--- a/tests/integration/targets/vyos_firewall_rules/tests/cli/_parsed_config.cfg
+++ b/tests/integration/targets/vyos_firewall_rules/tests/cli/_parsed_config_1_3.cfg
@@ -1,25 +1,25 @@
set firewall group address-group 'inbound'
set firewall ipv6-name UPLINK default-action 'accept'
set firewall ipv6-name UPLINK description 'This is ipv6 specific rule-set'
set firewall ipv6-name UPLINK rule 1 action 'accept'
set firewall ipv6-name UPLINK rule 1 description 'Fwipv6-Rule 1 is configured by Ansible'
-set firewall ipv6-name UPLINK rule 1 ipsec 'match-ipsec'
+set firewall ipv6-name UPLINK rule 1 protocol 'tcp'
set firewall ipv6-name UPLINK rule 2 action 'accept'
set firewall ipv6-name UPLINK rule 2 description 'Fwipv6-Rule 2 is configured by Ansible'
-set firewall ipv6-name UPLINK rule 2 ipsec 'match-ipsec'
+set firewall ipv6-name UPLINK rule 2 protocol 'tcp'
set firewall name INBOUND default-action 'accept'
set firewall name INBOUND description 'IPv4 INBOUND rule set'
set firewall name INBOUND rule 101 action 'accept'
set firewall name INBOUND rule 101 description 'Rule 101 is configured by Ansible'
-set firewall name INBOUND rule 101 ipsec 'match-ipsec'
+set firewall name INBOUND rule 101 protocol 'tcp'
set firewall name INBOUND rule 102 action 'reject'
set firewall name INBOUND rule 102 description 'Rule 102 is configured by Ansible'
-set firewall name INBOUND rule 102 ipsec 'match-ipsec'
+set firewall name INBOUND rule 102 protocol 'tcp'
set firewall name INBOUND rule 103 action 'accept'
set firewall name INBOUND rule 103 description 'Rule 103 is configured by Ansible'
set firewall name INBOUND rule 103 destination group address-group 'inbound'
set firewall name INBOUND rule 103 source address '192.0.2.0'
set firewall name INBOUND rule 103 state established 'enable'
set firewall name INBOUND rule 103 state invalid 'disable'
set firewall name INBOUND rule 103 state new 'disable'
set firewall name INBOUND rule 103 state related 'enable'
diff --git a/tests/integration/targets/vyos_firewall_rules/tests/cli/_parsed_config_1_4.cfg b/tests/integration/targets/vyos_firewall_rules/tests/cli/_parsed_config_1_4.cfg
new file mode 100644
index 00000000..315ae958
--- /dev/null
+++ b/tests/integration/targets/vyos_firewall_rules/tests/cli/_parsed_config_1_4.cfg
@@ -0,0 +1,23 @@
+set firewall group address-group 'inbound'
+set firewall ipv6 name UPLINK default-action 'accept'
+set firewall ipv6 name UPLINK description 'This is ipv6 specific rule-set'
+set firewall ipv6 name UPLINK rule 1 action 'accept'
+set firewall ipv6 name UPLINK rule 1 description 'Fwipv6-Rule 1 is configured by Ansible'
+set firewall ipv6 name UPLINK rule 1 protocol 'tcp'
+set firewall ipv6 name UPLINK rule 2 action 'accept'
+set firewall ipv6 name UPLINK rule 2 description 'Fwipv6-Rule 2 is configured by Ansible'
+set firewall ipv6 name UPLINK rule 2 protocol 'tcp'
+set firewall ipv4 name INBOUND default-action 'accept'
+set firewall ipv4 name INBOUND description 'IPv4 INBOUND rule set'
+set firewall ipv4 name INBOUND rule 101 action 'accept'
+set firewall ipv4 name INBOUND rule 101 description 'Rule 101 is configured by Ansible'
+set firewall ipv4 name INBOUND rule 101 protocol 'tcp'
+set firewall ipv4 name INBOUND rule 102 action 'reject'
+set firewall ipv4 name INBOUND rule 102 description 'Rule 102 is configured by Ansible'
+set firewall ipv4 name INBOUND rule 102 protocol 'tcp'
+set firewall ipv4 name INBOUND rule 103 action 'accept'
+set firewall ipv4 name INBOUND rule 103 description 'Rule 103 is configured by Ansible'
+set firewall ipv4 name INBOUND rule 103 destination group address-group 'inbound'
+set firewall ipv4 name INBOUND rule 103 source address '192.0.2.0'
+set firewall ipv4 name INBOUND rule 103 state established
+set firewall ipv4 name INBOUND rule 103 state related
diff --git a/tests/integration/targets/vyos_firewall_rules/tests/cli/_populate.yaml b/tests/integration/targets/vyos_firewall_rules/tests/cli/_populate.yaml
index 31e0d131..6c235be3 100644
--- a/tests/integration/targets/vyos_firewall_rules/tests/cli/_populate.yaml
+++ b/tests/integration/targets/vyos_firewall_rules/tests/cli/_populate.yaml
@@ -1,31 +1,11 @@
---
-- name: Setup
+- ansible.builtin.include_tasks: _remove_config.yaml
+
+- name: ensure facts
+ include_tasks: _get_version.yaml
+
+- name: Setup {{ vyos_version }}
+ vyos.vyos.vyos_config:
+ lines: "{{ populate_config }}"
vars:
- lines: |-
- set firewall group address-group 'inbound'
- set firewall ipv6-name UPLINK default-action 'accept'
- set firewall ipv6-name UPLINK description 'This is ipv6 specific rule-set'
- set firewall ipv6-name UPLINK rule 1 action 'accept'
- set firewall ipv6-name UPLINK rule 1 description 'Fwipv6-Rule 1 is configured by Ansible'
- set firewall ipv6-name UPLINK rule 1 ipsec 'match-ipsec'
- set firewall ipv6-name UPLINK rule 2 action 'accept'
- set firewall ipv6-name UPLINK rule 2 description 'Fwipv6-Rule 2 is configured by Ansible'
- set firewall ipv6-name UPLINK rule 2 ipsec 'match-ipsec'
- set firewall name INBOUND default-action 'accept'
- set firewall name INBOUND description 'IPv4 INBOUND rule set'
- set firewall name INBOUND rule 101 action 'accept'
- set firewall name INBOUND rule 101 description 'Rule 101 is configured by Ansible'
- set firewall name INBOUND rule 101 ipsec 'match-ipsec'
- set firewall name INBOUND rule 102 action 'reject'
- set firewall name INBOUND rule 102 description 'Rule 102 is configured by Ansible'
- set firewall name INBOUND rule 102 ipsec 'match-ipsec'
- set firewall name INBOUND rule 103 action 'accept'
- set firewall name INBOUND rule 103 description 'Rule 103 is configured by Ansible'
- set firewall name INBOUND rule 103 destination group address-group 'inbound'
- set firewall name INBOUND rule 103 source address '192.0.2.0'
- set firewall name INBOUND rule 103 state established 'enable'
- set firewall name INBOUND rule 103 state invalid 'disable'
- set firewall name INBOUND rule 103 state new 'disable'
- set firewall name INBOUND rule 103 state related 'enable'
- ansible.netcommon.cli_config:
- config: "{{ lines }}"
+ ansible_connection: ansible.netcommon.network_cli
diff --git a/tests/integration/targets/vyos_firewall_rules/tests/cli/_remove_config.yaml b/tests/integration/targets/vyos_firewall_rules/tests/cli/_remove_config.yaml
index b4fc7965..31f527f9 100644
--- a/tests/integration/targets/vyos_firewall_rules/tests/cli/_remove_config.yaml
+++ b/tests/integration/targets/vyos_firewall_rules/tests/cli/_remove_config.yaml
@@ -1,6 +1,10 @@
---
-- name: Remove Config
+- name: ensure facts
+ include_tasks: _get_version.yaml
+
+- name: Remove pre-existing firewall rules
+ vyos.vyos.vyos_config:
+ lines: "{{ remove_config }}"
+ ignore_errors: true
vars:
- lines: "delete firewall ipv6-name\ndelete firewall name\n"
- ansible.netcommon.cli_config:
- config: "{{ lines }}"
+ ansible_connection: ansible.netcommon.network_cli
diff --git a/tests/integration/targets/vyos_firewall_rules/tests/cli/deleted.yaml b/tests/integration/targets/vyos_firewall_rules/tests/cli/deleted.yaml
index 97b3ae87..2784c2da 100644
--- a/tests/integration/targets/vyos_firewall_rules/tests/cli/deleted.yaml
+++ b/tests/integration/targets/vyos_firewall_rules/tests/cli/deleted.yaml
@@ -1,50 +1,50 @@
---
- debug:
msg: Start vyos_firewall_rules deleted integration tests ansible_connection={{ ansible_connection }}
- include_tasks: _populate.yaml
- block:
- - name: Delete firewall rule set.
+ - name: Delete firewall rule set
register: result
vyos.vyos.vyos_firewall_rules: &id001
config:
- afi: ipv6
rule_sets:
- name: UPLINK
- afi: ipv4
rule_sets:
- name: INBOUND
state: deleted
- name: Assert that the before dicts were correctly generated
assert:
that:
- "{{ populate | symmetric_difference(result['before']) |length == 0 }}"
- name: Assert that the correct set of commands were generated
assert:
that:
- "{{ deleted_rs['commands'] | symmetric_difference(result['commands']) |length == 0 }}"
- name: Assert that the after dicts were correctly generated
assert:
that:
- "{{ deleted_rs['after'] | symmetric_difference(result['after']) |length == 0 }}"
- name: Delete attributes of given interfaces (IDEMPOTENT)
register: result
vyos.vyos.vyos_firewall_rules: *id001
- name: Assert that the previous task was idempotent
assert:
that:
- result.changed == false
- result.commands|length == 0
- name: Assert that the before dicts were correctly generated
assert:
that:
- "{{ deleted_rs['after'] | symmetric_difference(result['before']) |length == 0 }}"
always:
- include_tasks: _remove_config.yaml
diff --git a/tests/integration/targets/vyos_firewall_rules/tests/cli/deleted_afi.yaml b/tests/integration/targets/vyos_firewall_rules/tests/cli/deleted_afi.yaml
index c7a22787..3df19cd2 100644
--- a/tests/integration/targets/vyos_firewall_rules/tests/cli/deleted_afi.yaml
+++ b/tests/integration/targets/vyos_firewall_rules/tests/cli/deleted_afi.yaml
@@ -1,47 +1,47 @@
---
- debug:
msg: Start vyos_firewall_rules deleted integration tests ansible_connection={{ ansible_connection }}
- include_tasks: _populate.yaml
- block:
- - name: Delete firewall rule.
+ - name: Delete firewall rule
register: result
vyos.vyos.vyos_firewall_rules: &id001
config:
- afi: ipv6
- afi: ipv4
state: deleted
- name: Assert that the before dicts were correctly generated
assert:
that:
- "{{ populate | symmetric_difference(result['before']) |length == 0 }}"
- name: Assert that the correct set of commands were generated
assert:
that:
- "{{ deleted_afi_all['commands'] | symmetric_difference(result['commands']) |length == 0 }}"
- name: Assert that the after dicts were correctly generated
assert:
that:
- "{{ deleted_afi_all['after'] | symmetric_difference(result['after']) |length == 0 }}"
- name: Delete attributes of given interfaces (IDEMPOTENT)
register: result
vyos.vyos.vyos_firewall_rules: *id001
- name: Assert that the previous task was idempotent
assert:
that:
- result.changed == false
- result.commands|length == 0
- name: Assert that the before dicts were correctly generated
assert:
that:
- "{{ deleted_afi_all['after'] | symmetric_difference(result['before']) |length == 0 }}"
always:
- include_tasks: _remove_config.yaml
diff --git a/tests/integration/targets/vyos_firewall_rules/tests/cli/deleted_all.yaml b/tests/integration/targets/vyos_firewall_rules/tests/cli/deleted_all.yaml
index c55a4c55..84c66bdf 100644
--- a/tests/integration/targets/vyos_firewall_rules/tests/cli/deleted_all.yaml
+++ b/tests/integration/targets/vyos_firewall_rules/tests/cli/deleted_all.yaml
@@ -1,44 +1,44 @@
---
- debug:
msg: Start vyos_firewall_rules deleted integration tests ansible_connection={{ ansible_connection }}
- include_tasks: _populate.yaml
- block:
- - name: Delete all the firewall rules.
+ - name: Delete all the firewall rules
register: result
vyos.vyos.vyos_firewall_rules: &id001
config:
state: deleted
- name: Assert that the before dicts were correctly generated
assert:
that:
- "{{ populate | symmetric_difference(result['before']) |length == 0 }}"
- name: Assert that the correct set of commands were generated
assert:
that:
- "{{ deleted_afi_all['commands'] | symmetric_difference(result['commands']) |length == 0 }}"
- name: Assert that the after dicts were correctly generated
assert:
that:
- "{{ deleted_afi_all['after'] | symmetric_difference(result['after']) |length == 0 }}"
- name: Delete attributes of given interfaces (IDEMPOTENT)
register: result
vyos.vyos.vyos_firewall_rules: *id001
- name: Assert that the previous task was idempotent
assert:
that:
- result.changed == false
- result.commands|length == 0
- name: Assert that the before dicts were correctly generated
assert:
that:
- "{{ deleted_afi_all['after'] | symmetric_difference(result['before']) |length == 0 }}"
always:
- include_tasks: _remove_config.yaml
diff --git a/tests/integration/targets/vyos_firewall_rules/tests/cli/merged.yaml b/tests/integration/targets/vyos_firewall_rules/tests/cli/merged.yaml
index 674b4371..27973d80 100644
--- a/tests/integration/targets/vyos_firewall_rules/tests/cli/merged.yaml
+++ b/tests/integration/targets/vyos_firewall_rules/tests/cli/merged.yaml
@@ -1,99 +1,99 @@
---
- debug:
msg: START vyos_firewall_rules merged integration tests on connection={{ ansible_connection }}
- include_tasks: _populate.yaml
- include_tasks: _remove_config.yaml
- block:
- name: Merge the provided configuration with the existing running configuration
register: result
vyos.vyos.vyos_firewall_rules: &id001
config:
- afi: ipv6
rule_sets:
- name: UPLINK
description: This is ipv6 specific rule-set
default_action: accept
rules:
- number: 1
action: accept
description: Fwipv6-Rule 1 is configured by Ansible
- ipsec: match-ipsec
+ protocol: tcp
- number: 2
action: accept
description: Fwipv6-Rule 2 is configured by Ansible
- ipsec: match-ipsec
+ protocol: tcp
- afi: ipv4
rule_sets:
- name: INBOUND
description: IPv4 INBOUND rule set
default_action: accept
rules:
- number: 101
action: accept
description: Rule 101 is configured by Ansible
- ipsec: match-ipsec
+ protocol: tcp
disabled: true
- number: 102
action: reject
description: Rule 102 is configured by Ansible
- ipsec: match-ipsec
+ protocol: tcp
disable: true
- number: 103
action: accept
description: Rule 103 is configured by Ansible
destination:
group:
address_group: inbound
source:
address: 192.0.2.0
state:
established: true
new: false
invalid: false
related: true
state: merged
- vyos.vyos.vyos_facts:
gather_network_resources: firewall_rules
- name: Assert that before dicts were correctly generated
assert:
that: "{{ merged['before'] | symmetric_difference(result['before']) |length == 0 }}"
- name: Assert that correct set of commands were generated
assert:
that:
- "{{ merged['commands'] | symmetric_difference(result['commands']) |length == 0 }}"
- name: Assert that fact was correctly generated
assert:
that:
- "{{ merged['after'] | symmetric_difference(ansible_facts['network_resources']['firewall_rules']) |length == 0 }}"
- name: Assert that after dicts was correctly generated
assert:
that:
- "{{ merged['after'] | symmetric_difference(result['after']) |length == 0 }}"
- name: Merge the provided configuration with the existing running configuration (IDEMPOTENT)
register: result
vyos.vyos.vyos_firewall_rules: *id001
- name: Assert that the previous task was idempotent
assert:
that:
- result['changed'] == false
- name: Assert that before dicts were correctly generated
assert:
that:
- "{{ merged['after'] | symmetric_difference(result['before']) |length == 0 }}"
always:
- include_tasks: _remove_config.yaml
diff --git a/tests/integration/targets/vyos_firewall_rules/tests/cli/overridden.yaml b/tests/integration/targets/vyos_firewall_rules/tests/cli/overridden.yaml
index 6e1b3a39..3b649390 100644
--- a/tests/integration/targets/vyos_firewall_rules/tests/cli/overridden.yaml
+++ b/tests/integration/targets/vyos_firewall_rules/tests/cli/overridden.yaml
@@ -1,60 +1,64 @@
---
- debug:
msg: START vyos_firewall_rules overridden integration tests on connection={{ ansible_connection }}
- include_tasks: _remove_config.yaml
- include_tasks: _populate.yaml
- block:
- name: Overrides all device configuration with provided configuration
register: result
vyos.vyos.vyos_firewall_rules: &id001
config:
- afi: ipv4
rule_sets:
- name: Downlink
description: IPv4 INBOUND rule set
default_action: accept
rules:
- number: 501
action: accept
description: Rule 501 is configured by Ansible
- ipsec: match-ipsec
+ protocol: tcp
- number: 502
action: reject
description: Rule 502 is configured by Ansible
- ipsec: match-ipsec
+ protocol: tcp
state: overridden
+ - name: Print result
+ debug:
+ msg: "Result: {{ result }}"
+
- name: Assert that before dicts were correctly generated
assert:
that:
- "{{ populate | symmetric_difference(result['before']) |length == 0 }}"
- name: Assert that correct commands were generated
assert:
that:
- "{{ overridden['commands'] | symmetric_difference(result['commands']) |length == 0 }}"
- name: Assert that after dicts were correctly generated
assert:
that:
- "{{ overridden['after'] | symmetric_difference(result['after']) |length == 0 }}"
- name: Overrides all device configuration with provided configurations (IDEMPOTENT)
register: result
vyos.vyos.vyos_firewall_rules: *id001
- name: Assert that the previous task was idempotent
assert:
that:
- result['changed'] == false
- name: Assert that before dicts were correctly generated
assert:
that:
- "{{ overridden['after'] | symmetric_difference(result['before']) |length == 0 }}"
always:
- include_tasks: _remove_config.yaml
diff --git a/tests/integration/targets/vyos_firewall_rules/tests/cli/parsed.yaml b/tests/integration/targets/vyos_firewall_rules/tests/cli/parsed.yaml
index e6eae78a..85a7c33b 100644
--- a/tests/integration/targets/vyos_firewall_rules/tests/cli/parsed.yaml
+++ b/tests/integration/targets/vyos_firewall_rules/tests/cli/parsed.yaml
@@ -1,14 +1,23 @@
---
- debug:
msg: START vyos_firewall_rules parsed integration tests on connection={{ ansible_connection }}
-- name: Parse externally provided Firewall rules config to agnostic model
- register: result
- vyos.vyos.vyos_firewall_rules:
- running_config: "{{ lookup('file', '_parsed_config.cfg') }}"
- state: parsed
+- name: ensure facts
+ include_tasks: _get_version.yaml
+
+- name: version {{ vyos_version }}
+ block:
+ - name: Parse externally provided Firewall rules config to agnostic model
+ register: result
+ vyos.vyos.vyos_firewall_rules:
+ running_config: "{{ lookup('file', parsed_config_file) }}"
+ state: parsed
+ - name: set result
+ set_fact:
+ parsed_result: "{{ result }}"
- name: Assert that config was correctly parsed
assert:
that:
- - "{{ parsed['after'] | symmetric_difference(result['parsed']) |length == 0 }}"
+ - parsed_result.changed == false
+ - "{{ parsed['after'] | symmetric_difference(parsed_result['parsed']) |length == 0 }}"
diff --git a/tests/integration/targets/vyos_firewall_rules/tests/cli/rendered.yaml b/tests/integration/targets/vyos_firewall_rules/tests/cli/rendered.yaml
index 36feb69a..229ceb0e 100644
--- a/tests/integration/targets/vyos_firewall_rules/tests/cli/rendered.yaml
+++ b/tests/integration/targets/vyos_firewall_rules/tests/cli/rendered.yaml
@@ -1,55 +1,55 @@
---
- debug:
msg: START vyos_firewall_rules rendered integration tests on connection={{ ansible_connection }}
- include_tasks: _remove_config.yaml
- block:
- name: Structure provided configuration into device specific commands
register: result
vyos.vyos.vyos_firewall_rules:
config:
- afi: ipv6
rule_sets:
- name: UPLINK
description: This is ipv6 specific rule-set
default_action: accept
- afi: ipv4
rule_sets:
- name: INBOUND
description: IPv4 INBOUND rule set
default_action: accept
rules:
- number: 101
action: accept
description: Rule 101 is configured by Ansible
- ipsec: match-ipsec
+ protocol: tcp
- number: 102
action: reject
description: Rule 102 is configured by Ansible
- ipsec: match-ipsec
+ protocol: tcp
- number: 103
action: accept
description: Rule 103 is configured by Ansible
destination:
group:
address_group: inbound
source:
address: 192.0.2.0
state:
established: true
new: false
invalid: false
related: true
state: rendered
- name: Assert that correct set of commands were generated
assert:
that:
- "{{ rendered['commands'] | symmetric_difference(result['rendered']) |length == 0 }}"
- debug:
msg: END vyos_firewall_rules rendered integration tests on connection={{ ansible_connection }}
diff --git a/tests/integration/targets/vyos_firewall_rules/tests/cli/replaced.yaml b/tests/integration/targets/vyos_firewall_rules/tests/cli/replaced.yaml
index 5959c226..b1944626 100644
--- a/tests/integration/targets/vyos_firewall_rules/tests/cli/replaced.yaml
+++ b/tests/integration/targets/vyos_firewall_rules/tests/cli/replaced.yaml
@@ -1,66 +1,66 @@
---
- debug:
msg: START vyos_firewall_rules replaced integration tests on connection={{ ansible_connection }}
- include_tasks: _remove_config.yaml
- include_tasks: _populate.yaml
- block:
- name: Replace device configurations of listed firewall rules with provided configurations
register: result
vyos.vyos.vyos_firewall_rules: &id001
config:
- afi: ipv6
rule_sets:
- name: UPLINK
description: This is ipv6 specific rule-set
default_action: accept
- afi: ipv4
rule_sets:
- name: INBOUND
description: IPv4 INBOUND rule set
default_action: accept
rules:
- number: 101
action: accept
description: Rule 101 is configured by Ansible
- ipsec: match-ipsec
+ protocol: tcp
- number: 104
action: reject
description: Rule 104 is configured by Ansible
- ipsec: match-none
+ protocol: udp
state: replaced
- name: Assert that correct set of commands were generated
assert:
that:
- "{{ replaced['commands'] | symmetric_difference(result['commands']) |length == 0 }}"
- name: Assert that before dicts are correctly generated
assert:
that:
- "{{ populate | symmetric_difference(result['before']) |length == 0 }}"
- name: Assert that after dict is correctly generated
assert:
that:
- "{{ replaced['after'] | symmetric_difference(result['after']) |length == 0 }}"
- name: Replace device configurations of listed firewall rules with provided configurarions (IDEMPOTENT)
register: result
vyos.vyos.vyos_firewall_rules: *id001
- name: Assert that task was idempotent
assert:
that:
- result['changed'] == false
- name: Assert that before dict is correctly generated
assert:
that:
- "{{ replaced['after'] | symmetric_difference(result['before']) |length == 0 }}"
always:
- include_tasks: _remove_config.yaml
diff --git a/tests/integration/targets/vyos_firewall_rules/tests/cli/rtt.yaml b/tests/integration/targets/vyos_firewall_rules/tests/cli/rtt.yaml
index dcf5b282..be066f9a 100644
--- a/tests/integration/targets/vyos_firewall_rules/tests/cli/rtt.yaml
+++ b/tests/integration/targets/vyos_firewall_rules/tests/cli/rtt.yaml
@@ -1,88 +1,90 @@
---
- debug:
msg: START vyos_firewall_rules round trip integration tests on connection={{ ansible_connection }}
+- include_tasks: _populate.yaml
+
- include_tasks: _remove_config.yaml
- block:
- name: Apply the provided configuration (base config)
register: base_config
vyos.vyos.vyos_firewall_rules:
config:
- afi: ipv6
rule_sets:
- name: UPLINK
description: This is ipv6 specific rule-set
default_action: accept
rules:
- number: 1
action: accept
description: Fwipv6-Rule 1 is configured by Ansible
- ipsec: match-ipsec
+ protocol: tcp
- number: 2
action: accept
description: Fwipv6-Rule 2 is configured by Ansible
- ipsec: match-ipsec
+ protocol: tcp
- afi: ipv4
rule_sets:
- name: INBOUND
description: IPv4 INBOUND rule set
default_action: accept
rules:
- number: 101
action: accept
description: Rule 101 is configured by Ansible
- ipsec: match-ipsec
+ protocol: tcp
- number: 102
action: reject
description: Rule 102 is configured by Ansible
- ipsec: match-ipsec
+ protocol: tcp
state: merged
- name: Gather firewall_rules facts
vyos.vyos.vyos_facts:
gather_subset:
- default
gather_network_resources:
- firewall_rules
- name: Apply the provided configuration (config to be reverted)
register: result
vyos.vyos.vyos_firewall_rules:
config:
- afi: ipv4
rule_sets:
- name: INBOUND
description: IPv4 INBOUND rule set
default_action: accept
rules:
- number: 103
action: accept
description: Rule 103 is configured by Ansible
source:
address: 192.0.2.0
state:
established: true
new: false
invalid: false
related: true
state: merged
- name: Assert that changes were applied
assert:
that: "{{ round_trip['after'] | symmetric_difference(result['after']) |length == 0 }}"
- name: Revert back to base config using facts round trip
register: revert
vyos.vyos.vyos_firewall_rules:
config: "{{ ansible_facts['network_resources']['firewall_rules'] }}"
state: overridden
- name: Assert that config was reverted
assert:
that: "{{ base_config['after'] | symmetric_difference(revert['after']) |length == 0 }}"
always:
- include_tasks: _remove_config.yaml
diff --git a/tests/integration/targets/vyos_firewall_rules/vars/main.yaml b/tests/integration/targets/vyos_firewall_rules/vars/main.yaml
index e2b3e10c..c249b346 100644
--- a/tests/integration/targets/vyos_firewall_rules/vars/main.yaml
+++ b/tests/integration/targets/vyos_firewall_rules/vars/main.yaml
@@ -1,316 +1,219 @@
---
merged:
before: []
- commands:
- - set firewall ipv6-name UPLINK default-action 'accept'
- - set firewall ipv6-name UPLINK description 'This is ipv6 specific rule-set'
- - set firewall ipv6-name UPLINK rule 1 action 'accept'
- - set firewall ipv6-name UPLINK rule 1
- - set firewall ipv6-name UPLINK rule 1 description 'Fwipv6-Rule 1 is configured by Ansible'
- - set firewall ipv6-name UPLINK rule 1 ipsec 'match-ipsec'
- - set firewall ipv6-name UPLINK rule 2 action 'accept'
- - set firewall ipv6-name UPLINK rule 2
- - set firewall ipv6-name UPLINK rule 2 description 'Fwipv6-Rule 2 is configured by Ansible'
- - set firewall ipv6-name UPLINK rule 2 ipsec 'match-ipsec'
- - set firewall name INBOUND default-action 'accept'
- - set firewall name INBOUND description 'IPv4 INBOUND rule set'
- - set firewall name INBOUND rule 101 action 'accept'
- - set firewall name INBOUND rule 101 disable
- - set firewall name INBOUND rule 101
- - set firewall name INBOUND rule 101 description 'Rule 101 is configured by Ansible'
- - set firewall name INBOUND rule 101 ipsec 'match-ipsec'
- - set firewall name INBOUND rule 102 action 'reject'
- - set firewall name INBOUND rule 102 disable
- - set firewall name INBOUND rule 102
- - set firewall name INBOUND rule 102 description 'Rule 102 is configured by Ansible'
- - set firewall name INBOUND rule 102 ipsec 'match-ipsec'
- - set firewall name INBOUND rule 103 description 'Rule 103 is configured by Ansible'
- - set firewall name INBOUND rule 103 destination group address-group inbound
- - set firewall name INBOUND rule 103
- - set firewall name INBOUND rule 103 source address 192.0.2.0
- - set firewall name INBOUND rule 103 state established enable
- - set firewall name INBOUND rule 103 state related enable
- - set firewall name INBOUND rule 103 state invalid disable
- - set firewall name INBOUND rule 103 state new disable
- - set firewall name INBOUND rule 103 action 'accept'
+ commands: "{{ merged_commands }}"
after:
- afi: ipv6
rule_sets:
- name: UPLINK
description: This is ipv6 specific rule-set
default_action: accept
rules:
- number: 1
action: accept
description: Fwipv6-Rule 1 is configured by Ansible
- ipsec: match-ipsec
+ protocol: tcp
- number: 2
action: accept
description: Fwipv6-Rule 2 is configured by Ansible
- ipsec: match-ipsec
+ protocol: tcp
- afi: ipv4
rule_sets:
- name: INBOUND
description: IPv4 INBOUND rule set
default_action: accept
rules:
- number: 101
action: accept
description: Rule 101 is configured by Ansible
- ipsec: match-ipsec
+ protocol: tcp
disable: true
- number: 102
action: reject
disable: true
description: Rule 102 is configured by Ansible
- ipsec: match-ipsec
+ protocol: tcp
- number: 103
action: accept
description: Rule 103 is configured by Ansible
destination:
group:
address_group: inbound
source:
address: 192.0.2.0
- state:
- established: true
- new: false
- invalid: false
- related: true
+ state: "{{ state_dict }}"
+
populate:
- afi: ipv6
rule_sets:
- name: UPLINK
description: This is ipv6 specific rule-set
default_action: accept
rules:
- number: 1
action: accept
description: Fwipv6-Rule 1 is configured by Ansible
- ipsec: match-ipsec
+ protocol: tcp
- number: 2
action: accept
description: Fwipv6-Rule 2 is configured by Ansible
- ipsec: match-ipsec
+ protocol: tcp
- afi: ipv4
rule_sets:
- name: INBOUND
description: IPv4 INBOUND rule set
default_action: accept
rules:
- number: 101
action: accept
description: Rule 101 is configured by Ansible
- ipsec: match-ipsec
+ protocol: tcp
- number: 102
action: reject
description: Rule 102 is configured by Ansible
- ipsec: match-ipsec
+ protocol: tcp
- number: 103
action: accept
description: Rule 103 is configured by Ansible
destination:
group:
address_group: inbound
source:
address: 192.0.2.0
- state:
- established: true
- new: false
- invalid: false
- related: true
+ state: "{{ state_dict }}"
+
replaced:
- commands:
- - delete firewall ipv6-name UPLINK rule 1
- - delete firewall ipv6-name UPLINK rule 2
- - delete firewall name INBOUND rule 102
- - delete firewall name INBOUND rule 103
- - set firewall name INBOUND rule 104 action 'reject'
- - set firewall name INBOUND rule 104 description 'Rule 104 is configured by Ansible'
- - set firewall name INBOUND rule 104
- - set firewall name INBOUND rule 104 ipsec 'match-none'
+ commands: "{{ replaced_commands }}"
after:
- afi: ipv6
rule_sets:
- name: UPLINK
description: This is ipv6 specific rule-set
default_action: accept
- afi: ipv4
rule_sets:
- name: INBOUND
description: IPv4 INBOUND rule set
default_action: accept
rules:
- number: 101
action: accept
description: Rule 101 is configured by Ansible
- ipsec: match-ipsec
+ protocol: tcp
- number: 104
action: reject
description: Rule 104 is configured by Ansible
- ipsec: match-none
+ protocol: udp
overridden:
before:
- afi: ipv6
rule_sets:
- name: UPLINK
description: This is ipv6 specific rule-set
default_action: accept
- afi: ipv4
rule_sets:
- name: INBOUND
description: IPv4 INBOUND rule set
default_action: accept
rules:
- number: 101
action: accept
description: Rule 101 is configured by Ansible
- ipsec: match-ipsec
+ protocol: tcp
- number: 104
action: reject
description: Rule 104 is configured by Ansible
- ipsec: match-none
- commands:
- - delete firewall ipv6-name UPLINK
- - delete firewall name INBOUND
- - set firewall name Downlink default-action 'accept'
- - set firewall name Downlink description 'IPv4 INBOUND rule set'
- - set firewall name Downlink rule 501 action 'accept'
- - set firewall name Downlink rule 501
- - set firewall name Downlink rule 501 description 'Rule 501 is configured by Ansible'
- - set firewall name Downlink rule 501 ipsec 'match-ipsec'
- - set firewall name Downlink rule 502 action 'reject'
- - set firewall name Downlink rule 502
- - set firewall name Downlink rule 502 description 'Rule 502 is configured by Ansible'
- - set firewall name Downlink rule 502 ipsec 'match-ipsec'
+ protocol: udp
+ commands: "{{ overridden_commands }}"
after:
- afi: ipv4
rule_sets:
- name: Downlink
description: IPv4 INBOUND rule set
default_action: accept
rules:
- number: 501
action: accept
description: Rule 501 is configured by Ansible
- ipsec: match-ipsec
+ protocol: tcp
- number: 502
action: reject
description: Rule 502 is configured by Ansible
- ipsec: match-ipsec
+ protocol: tcp
parsed:
after:
- afi: ipv6
rule_sets:
- name: UPLINK
description: This is ipv6 specific rule-set
default_action: accept
rules:
- number: 1
action: accept
description: Fwipv6-Rule 1 is configured by Ansible
- ipsec: match-ipsec
+ protocol: tcp
- number: 2
action: accept
description: Fwipv6-Rule 2 is configured by Ansible
- ipsec: match-ipsec
+ protocol: tcp
- afi: ipv4
rule_sets:
- name: INBOUND
description: IPv4 INBOUND rule set
default_action: accept
rules:
- number: 101
action: accept
description: Rule 101 is configured by Ansible
- ipsec: match-ipsec
+ protocol: tcp
- number: 102
action: reject
description: Rule 102 is configured by Ansible
- ipsec: match-ipsec
+ protocol: tcp
- number: 103
action: accept
description: Rule 103 is configured by Ansible
destination:
group:
address_group: inbound
source:
address: 192.0.2.0
- state:
- established: true
- new: false
- invalid: false
- related: true
-rendered:
- commands:
- - set firewall ipv6-name UPLINK default-action 'accept'
- - set firewall ipv6-name UPLINK description 'This is ipv6 specific rule-set'
- - set firewall name INBOUND default-action 'accept'
- - set firewall name INBOUND description 'IPv4 INBOUND rule set'
- - set firewall name INBOUND rule 101 action 'accept'
- - set firewall name INBOUND rule 101
- - set firewall name INBOUND rule 101 description 'Rule 101 is configured by Ansible'
- - set firewall name INBOUND rule 101 ipsec 'match-ipsec'
- - set firewall name INBOUND rule 102 action 'reject'
- - set firewall name INBOUND rule 102
- - set firewall name INBOUND rule 102 description 'Rule 102 is configured by Ansible'
- - set firewall name INBOUND rule 102 ipsec 'match-ipsec'
- - set firewall name INBOUND rule 103 description 'Rule 103 is configured by Ansible'
- - set firewall name INBOUND rule 103 destination group address-group inbound
- - set firewall name INBOUND rule 103
- - set firewall name INBOUND rule 103 source address 192.0.2.0
- - set firewall name INBOUND rule 103 state established enable
- - set firewall name INBOUND rule 103 state related enable
- - set firewall name INBOUND rule 103 state invalid disable
- - set firewall name INBOUND rule 103 state new disable
- - set firewall name INBOUND rule 103 action 'accept'
-deleted_rs:
- commands:
- - delete firewall ipv6-name UPLINK
- - delete firewall name INBOUND
- after: []
-deleted_afi_all:
- commands:
- - delete firewall ipv6-name
- - delete firewall name
- after: []
+ state: "{{ state_dict }}"
+
round_trip:
after:
- afi: ipv6
rule_sets:
- name: UPLINK
description: This is ipv6 specific rule-set
default_action: accept
rules:
- number: 1
action: accept
description: Fwipv6-Rule 1 is configured by Ansible
- ipsec: match-ipsec
+ protocol: tcp
- number: 2
action: accept
description: Fwipv6-Rule 2 is configured by Ansible
- ipsec: match-ipsec
+ protocol: tcp
- afi: ipv4
rule_sets:
- name: INBOUND
description: IPv4 INBOUND rule set
default_action: accept
rules:
- number: 101
action: accept
description: Rule 101 is configured by Ansible
- ipsec: match-ipsec
+ protocol: tcp
- number: 102
action: reject
description: Rule 102 is configured by Ansible
- ipsec: match-ipsec
+ protocol: tcp
- number: 103
action: accept
description: Rule 103 is configured by Ansible
source:
address: 192.0.2.0
- state:
- established: true
- new: false
- invalid: false
- related: true
+ state: "{{ state_dict }}"
diff --git a/tests/integration/targets/vyos_firewall_rules/vars/pre-v1_4.yaml b/tests/integration/targets/vyos_firewall_rules/vars/pre-v1_4.yaml
new file mode 100644
index 00000000..c7d7398b
--- /dev/null
+++ b/tests/integration/targets/vyos_firewall_rules/vars/pre-v1_4.yaml
@@ -0,0 +1,130 @@
+---
+merged_commands:
+ - set firewall ipv6-name UPLINK default-action 'accept'
+ - set firewall ipv6-name UPLINK description 'This is ipv6 specific rule-set'
+ - set firewall ipv6-name UPLINK rule 1 action 'accept'
+ - set firewall ipv6-name UPLINK rule 1
+ - set firewall ipv6-name UPLINK rule 1 description 'Fwipv6-Rule 1 is configured by Ansible'
+ - set firewall ipv6-name UPLINK rule 1 protocol 'tcp'
+ - set firewall ipv6-name UPLINK rule 2 action 'accept'
+ - set firewall ipv6-name UPLINK rule 2
+ - set firewall ipv6-name UPLINK rule 2 description 'Fwipv6-Rule 2 is configured by Ansible'
+ - set firewall ipv6-name UPLINK rule 2 protocol 'tcp'
+ - set firewall name INBOUND default-action 'accept'
+ - set firewall name INBOUND description 'IPv4 INBOUND rule set'
+ - set firewall name INBOUND rule 101 action 'accept'
+ - set firewall name INBOUND rule 101 disable
+ - set firewall name INBOUND rule 101
+ - set firewall name INBOUND rule 101 description 'Rule 101 is configured by Ansible'
+ - set firewall name INBOUND rule 101 protocol 'tcp'
+ - set firewall name INBOUND rule 102 action 'reject'
+ - set firewall name INBOUND rule 102 disable
+ - set firewall name INBOUND rule 102
+ - set firewall name INBOUND rule 102 description 'Rule 102 is configured by Ansible'
+ - set firewall name INBOUND rule 102 protocol 'tcp'
+ - set firewall name INBOUND rule 103 description 'Rule 103 is configured by Ansible'
+ - set firewall name INBOUND rule 103 destination group address-group inbound
+ - set firewall name INBOUND rule 103
+ - set firewall name INBOUND rule 103 source address 192.0.2.0
+ - set firewall name INBOUND rule 103 state established enable
+ - set firewall name INBOUND rule 103 state related enable
+ - set firewall name INBOUND rule 103 state invalid disable
+ - set firewall name INBOUND rule 103 state new disable
+ - set firewall name INBOUND rule 103 action 'accept'
+
+populate_config:
+ - set firewall group address-group 'inbound'
+ - set firewall ipv6-name UPLINK default-action 'accept'
+ - set firewall ipv6-name UPLINK description 'This is ipv6 specific rule-set'
+ - set firewall ipv6-name UPLINK rule 1 action 'accept'
+ - set firewall ipv6-name UPLINK rule 1 description 'Fwipv6-Rule 1 is configured by Ansible'
+ - set firewall ipv6-name UPLINK rule 1 protocol 'tcp'
+ - set firewall ipv6-name UPLINK rule 2 action 'accept'
+ - set firewall ipv6-name UPLINK rule 2 description 'Fwipv6-Rule 2 is configured by Ansible'
+ - set firewall ipv6-name UPLINK rule 2 protocol 'tcp'
+ - set firewall name INBOUND default-action 'accept'
+ - set firewall name INBOUND description 'IPv4 INBOUND rule set'
+ - set firewall name INBOUND rule 101 action 'accept'
+ - set firewall name INBOUND rule 101 description 'Rule 101 is configured by Ansible'
+ - set firewall name INBOUND rule 101 protocol 'tcp'
+ - set firewall name INBOUND rule 102 action 'reject'
+ - set firewall name INBOUND rule 102 description 'Rule 102 is configured by Ansible'
+ - set firewall name INBOUND rule 102 protocol 'tcp'
+ - set firewall name INBOUND rule 103 action 'accept'
+ - set firewall name INBOUND rule 103 description 'Rule 103 is configured by Ansible'
+ - set firewall name INBOUND rule 103 destination group address-group 'inbound'
+ - set firewall name INBOUND rule 103 source address '192.0.2.0'
+ - set firewall name INBOUND rule 103 state established 'enable'
+ - set firewall name INBOUND rule 103 state invalid 'disable'
+ - set firewall name INBOUND rule 103 state new 'disable'
+ - set firewall name INBOUND rule 103 state related 'enable'
+
+remove_config:
+ - delete firewall name
+ - delete firewall ipv6-name
+
+parsed_config_file: "_parsed_config_1_3.cfg"
+
+replaced_commands:
+ - delete firewall ipv6-name UPLINK rule 1
+ - delete firewall ipv6-name UPLINK rule 2
+ - delete firewall name INBOUND rule 102
+ - delete firewall name INBOUND rule 103
+ - set firewall name INBOUND rule 104 action 'reject'
+ - set firewall name INBOUND rule 104 description 'Rule 104 is configured by Ansible'
+ - set firewall name INBOUND rule 104
+ - set firewall name INBOUND rule 104 protocol 'udp'
+
+overridden_commands:
+ - delete firewall ipv6-name UPLINK
+ - delete firewall name INBOUND
+ - set firewall name Downlink default-action 'accept'
+ - set firewall name Downlink description 'IPv4 INBOUND rule set'
+ - set firewall name Downlink rule 501 action 'accept'
+ - set firewall name Downlink rule 501
+ - set firewall name Downlink rule 501 description 'Rule 501 is configured by Ansible'
+ - set firewall name Downlink rule 501 protocol 'tcp'
+ - set firewall name Downlink rule 502 action 'reject'
+ - set firewall name Downlink rule 502
+ - set firewall name Downlink rule 502 description 'Rule 502 is configured by Ansible'
+ - set firewall name Downlink rule 502 protocol 'tcp'
+
+rendered:
+ commands:
+ - set firewall ipv6-name UPLINK default-action 'accept'
+ - set firewall ipv6-name UPLINK description 'This is ipv6 specific rule-set'
+ - set firewall name INBOUND default-action 'accept'
+ - set firewall name INBOUND description 'IPv4 INBOUND rule set'
+ - set firewall name INBOUND rule 101 action 'accept'
+ - set firewall name INBOUND rule 101
+ - set firewall name INBOUND rule 101 description 'Rule 101 is configured by Ansible'
+ - set firewall name INBOUND rule 101 protocol 'tcp'
+ - set firewall name INBOUND rule 102 action 'reject'
+ - set firewall name INBOUND rule 102
+ - set firewall name INBOUND rule 102 description 'Rule 102 is configured by Ansible'
+ - set firewall name INBOUND rule 102 protocol 'tcp'
+ - set firewall name INBOUND rule 103 description 'Rule 103 is configured by Ansible'
+ - set firewall name INBOUND rule 103 destination group address-group inbound
+ - set firewall name INBOUND rule 103
+ - set firewall name INBOUND rule 103 source address 192.0.2.0
+ - set firewall name INBOUND rule 103 state established enable
+ - set firewall name INBOUND rule 103 state related enable
+ - set firewall name INBOUND rule 103 state invalid disable
+ - set firewall name INBOUND rule 103 state new disable
+ - set firewall name INBOUND rule 103 action 'accept'
+deleted_rs:
+ commands:
+ - delete firewall ipv6-name UPLINK
+ - delete firewall name INBOUND
+ after: []
+deleted_afi_all:
+ commands:
+ - delete firewall ipv6-name
+ - delete firewall name
+ after: []
+
+state_dict:
+ established: true
+ new: false
+ invalid: false
+ related: true
diff --git a/tests/integration/targets/vyos_firewall_rules/vars/v1_4.yaml b/tests/integration/targets/vyos_firewall_rules/vars/v1_4.yaml
new file mode 100644
index 00000000..267803f6
--- /dev/null
+++ b/tests/integration/targets/vyos_firewall_rules/vars/v1_4.yaml
@@ -0,0 +1,123 @@
+---
+merged_commands:
+ - set firewall ipv6 name UPLINK default-action 'accept'
+ - set firewall ipv6 name UPLINK description 'This is ipv6 specific rule-set'
+ - set firewall ipv6 name UPLINK rule 1 action 'accept'
+ - set firewall ipv6 name UPLINK rule 1
+ - set firewall ipv6 name UPLINK rule 1 description 'Fwipv6-Rule 1 is configured by Ansible'
+ - set firewall ipv6 name UPLINK rule 1 protocol 'tcp'
+ - set firewall ipv6 name UPLINK rule 2 action 'accept'
+ - set firewall ipv6 name UPLINK rule 2
+ - set firewall ipv6 name UPLINK rule 2 description 'Fwipv6-Rule 2 is configured by Ansible'
+ - set firewall ipv6 name UPLINK rule 2 protocol 'tcp'
+ - set firewall ipv4 name INBOUND default-action 'accept'
+ - set firewall ipv4 name INBOUND description 'IPv4 INBOUND rule set'
+ - set firewall ipv4 name INBOUND rule 101 action 'accept'
+ - set firewall ipv4 name INBOUND rule 101 disable
+ - set firewall ipv4 name INBOUND rule 101
+ - set firewall ipv4 name INBOUND rule 101 description 'Rule 101 is configured by Ansible'
+ - set firewall ipv4 name INBOUND rule 101 protocol 'tcp'
+ - set firewall ipv4 name INBOUND rule 102 action 'reject'
+ - set firewall ipv4 name INBOUND rule 102 disable
+ - set firewall ipv4 name INBOUND rule 102
+ - set firewall ipv4 name INBOUND rule 102 description 'Rule 102 is configured by Ansible'
+ - set firewall ipv4 name INBOUND rule 102 protocol 'tcp'
+ - set firewall ipv4 name INBOUND rule 103 description 'Rule 103 is configured by Ansible'
+ - set firewall ipv4 name INBOUND rule 103 destination group address-group inbound
+ - set firewall ipv4 name INBOUND rule 103
+ - set firewall ipv4 name INBOUND rule 103 source address 192.0.2.0
+ - set firewall ipv4 name INBOUND rule 103 state established
+ - set firewall ipv4 name INBOUND rule 103 state related
+ - set firewall ipv4 name INBOUND rule 103 action 'accept'
+
+populate_config:
+ - set firewall group address-group 'inbound'
+ - set firewall ipv6 name UPLINK default-action 'accept'
+ - set firewall ipv6 name UPLINK description 'This is ipv6 specific rule-set'
+ - set firewall ipv6 name UPLINK rule 1 action 'accept'
+ - set firewall ipv6 name UPLINK rule 1 description 'Fwipv6-Rule 1 is configured by Ansible'
+ - set firewall ipv6 name UPLINK rule 1 protocol 'tcp'
+ - set firewall ipv6 name UPLINK rule 2 action 'accept'
+ - set firewall ipv6 name UPLINK rule 2 description 'Fwipv6-Rule 2 is configured by Ansible'
+ - set firewall ipv6 name UPLINK rule 2 protocol 'tcp'
+ - set firewall ipv4 name INBOUND default-action 'accept'
+ - set firewall ipv4 name INBOUND description 'IPv4 INBOUND rule set'
+ - set firewall ipv4 name INBOUND rule 101 action 'accept'
+ - set firewall ipv4 name INBOUND rule 101 description 'Rule 101 is configured by Ansible'
+ - set firewall ipv4 name INBOUND rule 101 protocol 'tcp'
+ - set firewall ipv4 name INBOUND rule 102 action 'reject'
+ - set firewall ipv4 name INBOUND rule 102 description 'Rule 102 is configured by Ansible'
+ - set firewall ipv4 name INBOUND rule 102 protocol 'tcp'
+ - set firewall ipv4 name INBOUND rule 103 action 'accept'
+ - set firewall ipv4 name INBOUND rule 103 description 'Rule 103 is configured by Ansible'
+ - set firewall ipv4 name INBOUND rule 103 destination group address-group 'inbound'
+ - set firewall ipv4 name INBOUND rule 103 source address '192.0.2.0'
+ - set firewall ipv4 name INBOUND rule 103 state established
+ - set firewall ipv4 name INBOUND rule 103 state related
+
+remove_config:
+ - delete firewall ipv4
+ - delete firewall ipv6
+
+parsed_config_file: "_parsed_config_1_4.cfg"
+
+replaced_commands:
+ - delete firewall ipv6 name UPLINK rule 1
+ - delete firewall ipv6 name UPLINK rule 2
+ - delete firewall ipv4 name INBOUND rule 102
+ - delete firewall ipv4 name INBOUND rule 103
+ - set firewall ipv4 name INBOUND rule 104 action 'reject'
+ - set firewall ipv4 name INBOUND rule 104 description 'Rule 104 is configured by Ansible'
+ - set firewall ipv4 name INBOUND rule 104
+ - set firewall ipv4 name INBOUND rule 104 protocol 'udp'
+
+overridden_commands:
+ - delete firewall ipv6 name UPLINK
+ - delete firewall ipv4 name INBOUND
+ - set firewall ipv4 name Downlink default-action 'accept'
+ - set firewall ipv4 name Downlink description 'IPv4 INBOUND rule set'
+ - set firewall ipv4 name Downlink rule 501 action 'accept'
+ - set firewall ipv4 name Downlink rule 501
+ - set firewall ipv4 name Downlink rule 501 description 'Rule 501 is configured by Ansible'
+ - set firewall ipv4 name Downlink rule 501 protocol 'tcp'
+ - set firewall ipv4 name Downlink rule 502 action 'reject'
+ - set firewall ipv4 name Downlink rule 502
+ - set firewall ipv4 name Downlink rule 502 description 'Rule 502 is configured by Ansible'
+ - set firewall ipv4 name Downlink rule 502 protocol 'tcp'
+
+
+rendered:
+ commands:
+ - set firewall ipv6 name UPLINK default-action 'accept'
+ - set firewall ipv6 name UPLINK description 'This is ipv6 specific rule-set'
+ - set firewall ipv4 name INBOUND default-action 'accept'
+ - set firewall ipv4 name INBOUND description 'IPv4 INBOUND rule set'
+ - set firewall ipv4 name INBOUND rule 101 action 'accept'
+ - set firewall ipv4 name INBOUND rule 101
+ - set firewall ipv4 name INBOUND rule 101 description 'Rule 101 is configured by Ansible'
+ - set firewall ipv4 name INBOUND rule 101 protocol 'tcp'
+ - set firewall ipv4 name INBOUND rule 102 action 'reject'
+ - set firewall ipv4 name INBOUND rule 102
+ - set firewall ipv4 name INBOUND rule 102 description 'Rule 102 is configured by Ansible'
+ - set firewall ipv4 name INBOUND rule 102 protocol 'tcp'
+ - set firewall ipv4 name INBOUND rule 103 description 'Rule 103 is configured by Ansible'
+ - set firewall ipv4 name INBOUND rule 103 destination group address-group inbound
+ - set firewall ipv4 name INBOUND rule 103
+ - set firewall ipv4 name INBOUND rule 103 source address 192.0.2.0
+ - set firewall ipv4 name INBOUND rule 103 state established
+ - set firewall ipv4 name INBOUND rule 103 state related
+ - set firewall ipv4 name INBOUND rule 103 action 'accept'
+deleted_rs:
+ commands:
+ - delete firewall ipv6 name UPLINK
+ - delete firewall ipv4 name INBOUND
+ after: []
+deleted_afi_all:
+ commands:
+ - delete firewall ipv6
+ - delete firewall ipv4
+ after: []
+
+state_dict:
+ established: true
+ related: true
diff --git a/tests/unit/modules/network/vyos/fixtures/vyos_firewall_rules_config.cfg b/tests/unit/modules/network/vyos/fixtures/vyos_firewall_rules_config.cfg
index f1fdf1ea..6c248d2b 100644
--- a/tests/unit/modules/network/vyos/fixtures/vyos_firewall_rules_config.cfg
+++ b/tests/unit/modules/network/vyos/fixtures/vyos_firewall_rules_config.cfg
@@ -1,16 +1,16 @@
set firewall name V4-INGRESS default-action 'accept'
set firewall ipv6-name V6-INGRESS default-action 'accept'
set firewall name V4-INGRESS description 'This is IPv4 V4-INGRESS rule set'
set firewall name V4-INGRESS enable-default-log
set firewall name V4-INGRESS rule 101 protocol 'icmp'
set firewall name V4-INGRESS rule 101 description 'Rule 101 is configured by Ansible'
set firewall name V4-INGRESS rule 101 fragment 'match-frag'
set firewall name V4-INGRESS rule 101
-set firewall name V4-INGRESS rule 101 'disable'
+set firewall name V4-INGRESS rule 101 disable
set firewall name V4-INGRESS rule 101 action 'accept'
set firewall name V4-INGRESS rule 101 ipsec 'match-ipsec'
set firewall name V4-INGRESS rule 101 log 'enable'
set firewall name EGRESS default-action 'reject'
set firewall ipv6-name EGRESS default-action 'reject'
set firewall ipv6-name EGRESS rule 20
set firewall ipv6-name EGRESS rule 20 icmpv6 type 'echo-request'
diff --git a/tests/unit/modules/network/vyos/fixtures/vyos_firewall_rules_config_v14.cfg b/tests/unit/modules/network/vyos/fixtures/vyos_firewall_rules_config_v14.cfg
index ef596cde..e82e3903 100644
--- a/tests/unit/modules/network/vyos/fixtures/vyos_firewall_rules_config_v14.cfg
+++ b/tests/unit/modules/network/vyos/fixtures/vyos_firewall_rules_config_v14.cfg
@@ -1,26 +1,34 @@
set firewall ipv4 name V4-INGRESS default-action 'accept'
set firewall ipv6 name V6-INGRESS default-action 'accept'
set firewall ipv4 name V4-INGRESS description 'This is IPv4 V4-INGRESS rule set'
set firewall ipv4 name V4-INGRESS default-log
set firewall ipv4 name V4-INGRESS rule 101 protocol 'icmp'
set firewall ipv4 name V4-INGRESS rule 101 description 'Rule 101 is configured by Ansible'
set firewall ipv4 name V4-INGRESS rule 101 packet-length-exclude 100
set firewall ipv4 name V4-INGRESS rule 101 packet-length-exclude 300
set firewall ipv4 name V4-INGRESS rule 101 log
set firewall ipv4 name V4-INGRESS rule 101
-set firewall ipv4 name V4-INGRESS rule 101 'disable'
+set firewall ipv4 name V4-INGRESS rule 101 disable
set firewall ipv4 name V4-INGRESS rule 101 action 'accept'
set firewall ipv4 name EGRESS default-action 'reject'
set firewall ipv6 name EGRESS default-action 'reject'
set firewall ipv6 name EGRESS rule 20
set firewall ipv6 name EGRESS rule 20 icmpv6 type-name 'echo-request'
-set firewall ipv6 input filter 1 jump-target 'V6-INGRESS'
-set firewall ipv6 output filter 1 jump-target 'EGRESS'
-set firewall ipv4 input filter 1 jump-target 'INGRESS'
-set firewall ipv4 output filter 1 jump-target 'EGRESS'
-set firewall ipv4 name IF-TEST rule 10 'disable'
+set firewall ipv6 input filter rule 1
+set firewall ipv6 input filter rule 1 action 'jump'
+set firewall ipv6 input filter rule 1 jump-target 'V6-INGRESS'
+set firewall ipv6 output filter rule 1
+set firewall ipv6 output filter rule 1 action 'jump'
+set firewall ipv6 output filter rule 1 jump-target 'EGRESS'
+set firewall ipv4 input filter rule 1
+set firewall ipv4 input filter rule 1 action 'jump'
+set firewall ipv4 input filter rule 1 jump-target 'INGRESS'
+set firewall ipv4 output filter rule 1
+set firewall ipv4 output filter rule 1 action 'jump'
+set firewall ipv4 output filter rule 1 jump-target 'EGRESS'
+set firewall ipv4 name IF-TEST rule 10 disable
set firewall ipv4 name IF-TEST rule 10 action 'accept'
set firewall ipv4 name IF-TEST rule 10 inbound-interface name 'eth0'
set firewall ipv4 name IF-TEST rule 10 outbound-interface group 'the-ethers'
set firewall ipv4 name IF-TEST rule 10 icmp type-name 'echo-request'
set firewall ipv4 name IF-TEST rule 10 state 'related'
diff --git a/tests/unit/modules/network/vyos/test_vyos_firewall_rules.py b/tests/unit/modules/network/vyos/test_vyos_firewall_rules13.py
similarity index 72%
copy from tests/unit/modules/network/vyos/test_vyos_firewall_rules.py
copy to tests/unit/modules/network/vyos/test_vyos_firewall_rules13.py
index c0815bfa..101f389e 100644
--- a/tests/unit/modules/network/vyos/test_vyos_firewall_rules.py
+++ b/tests/unit/modules/network/vyos/test_vyos_firewall_rules13.py
@@ -1,1706 +1,1439 @@
# (c) 2016 Red Hat Inc.
#
# This file is part of Ansible
#
# Ansible is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see .
# Make coding more python3-ish
from __future__ import absolute_import, division, print_function
__metaclass__ = type
from unittest.mock import patch
from ansible_collections.vyos.vyos.plugins.modules import vyos_firewall_rules
from ansible_collections.vyos.vyos.tests.unit.modules.utils import set_module_args
from .vyos_module import TestVyosModule, load_fixture
-class TestVyosFirewallRulesModule(TestVyosModule):
+class TestVyosFirewallRulesModule13(TestVyosModule):
module = vyos_firewall_rules
def setUp(self):
- super(TestVyosFirewallRulesModule, self).setUp()
+ super(TestVyosFirewallRulesModule13, self).setUp()
self.mock_get_config = patch(
"ansible_collections.ansible.netcommon.plugins.module_utils.network.common.network.Config.get_config",
)
self.get_config = self.mock_get_config.start()
self.mock_load_config = patch(
"ansible_collections.ansible.netcommon.plugins.module_utils.network.common.network.Config.load_config",
)
self.load_config = self.mock_load_config.start()
self.mock_get_resource_connection_config = patch(
"ansible_collections.ansible.netcommon.plugins.module_utils.network.common.cfg.base.get_resource_connection",
)
self.get_resource_connection_config = self.mock_get_resource_connection_config.start()
self.mock_get_resource_connection_facts = patch(
"ansible_collections.ansible.netcommon.plugins.module_utils.network.common.facts.facts.get_resource_connection",
)
self.get_resource_connection_facts = self.mock_get_resource_connection_facts.start()
self.mock_execute_show_command = patch(
"ansible_collections.vyos.vyos.plugins.module_utils.network.vyos.facts.static_routes.static_routes.Static_routesFacts.get_device_data",
)
self.mock_execute_show_command = patch(
"ansible_collections.vyos.vyos.plugins.module_utils.network.vyos.facts.firewall_rules.firewall_rules.Firewall_rulesFacts.get_device_data",
)
self.execute_show_command = self.mock_execute_show_command.start()
self.mock_get_os_version = patch(
"ansible_collections.vyos.vyos.plugins.module_utils.network.vyos.config.firewall_rules.firewall_rules.get_os_version",
)
self.get_os_version = self.mock_get_os_version.start()
self.get_os_version.return_value = "1.2"
def tearDown(self):
- super(TestVyosFirewallRulesModule, self).tearDown()
+ super(TestVyosFirewallRulesModule13, self).tearDown()
self.mock_get_resource_connection_config.stop()
self.mock_get_resource_connection_facts.stop()
self.mock_get_config.stop()
self.mock_load_config.stop()
self.mock_execute_show_command.stop()
self.mock_get_os_version.stop()
def load_fixtures(self, commands=None, filename=None):
def load_from_file(*args, **kwargs):
return load_fixture("vyos_firewall_rules_config.cfg")
self.execute_show_command.side_effect = load_from_file
def test_vyos_firewall_rule_set_01_merged(self):
set_module_args(
dict(
config=[
dict(
afi="ipv6",
rule_sets=[
dict(
name="V6-INBOUND",
description="This is IPv6 INBOUND rule set",
default_action="reject",
enable_default_log=True,
rules=[],
),
dict(
name="V6-OUTBOUND",
description="This is IPv6 OUTBOUND rule set",
default_action="accept",
enable_default_log=False,
rules=[],
),
],
),
dict(
afi="ipv4",
rule_sets=[
dict(
name="V4-INBOUND",
description="This is IPv4 INBOUND rule set",
default_action="reject",
enable_default_log=True,
rules=[],
),
dict(
name="V4-OUTBOUND",
description="This is IPv4 OUTBOUND rule set",
default_action="accept",
enable_default_log=False,
rules=[],
),
],
),
],
state="merged",
),
)
commands = [
"set firewall ipv6-name V6-INBOUND default-action 'reject'",
"set firewall ipv6-name V6-INBOUND description 'This is IPv6 INBOUND rule set'",
"set firewall ipv6-name V6-INBOUND enable-default-log",
"set firewall ipv6-name V6-OUTBOUND default-action 'accept'",
"set firewall ipv6-name V6-OUTBOUND description 'This is IPv6 OUTBOUND rule set'",
"set firewall name V4-INBOUND default-action 'reject'",
"set firewall name V4-INBOUND description 'This is IPv4 INBOUND rule set'",
"set firewall name V4-INBOUND enable-default-log",
"set firewall name V4-OUTBOUND default-action 'accept'",
"set firewall name V4-OUTBOUND description 'This is IPv4 OUTBOUND rule set'",
]
self.execute_module(changed=True, commands=commands)
- def test_vyos_firewall_rule_set_02_merged(self):
- set_module_args(
- dict(
- config=[
- dict(
- afi="ipv6",
- rule_sets=[
- dict(
- name="V6-INBOUND",
- description="This is IPv6 INBOUND rule set",
- default_action="reject",
- enable_default_log=True,
- rules=[],
- ),
- dict(
- name="V6-OUTBOUND",
- description="This is IPv6 OUTBOUND rule set",
- default_action="accept",
- enable_default_log=False,
- rules=[],
- ),
- ],
- ),
- dict(
- afi="ipv4",
- rule_sets=[
- dict(
- name="V4-INBOUND",
- description="This is IPv4 INBOUND rule set",
- default_action="reject",
- enable_default_log=True,
- rules=[],
- ),
- dict(
- name="V4-OUTBOUND",
- description="This is IPv4 OUTBOUND rule set",
- default_action="accept",
- enable_default_log=False,
- rules=[],
- ),
- ],
- ),
- ],
- state="merged",
- ),
- )
- commands = [
- "set firewall ipv6-name V6-INBOUND default-action 'reject'",
- "set firewall ipv6-name V6-INBOUND description 'This is IPv6 INBOUND rule set'",
- "set firewall ipv6-name V6-INBOUND enable-default-log",
- "set firewall ipv6-name V6-OUTBOUND default-action 'accept'",
- "set firewall ipv6-name V6-OUTBOUND description 'This is IPv6 OUTBOUND rule set'",
- "set firewall name V4-INBOUND default-action 'reject'",
- "set firewall name V4-INBOUND description 'This is IPv4 INBOUND rule set'",
- "set firewall name V4-INBOUND enable-default-log",
- "set firewall name V4-OUTBOUND default-action 'accept'",
- "set firewall name V4-OUTBOUND description 'This is IPv4 OUTBOUND rule set'",
- ]
- self.execute_module(changed=True, commands=commands)
-
def test_vyos_firewall_v4_rule_sets_rule_merged_01(self):
+ """Test if plugin correctly adds new rules set and a rule with variant attributes"""
set_module_args(
dict(
config=[
dict(
afi="ipv4",
rule_sets=[
dict(
name="INBOUND",
description="This is IPv4 INBOUND rule set",
default_action="accept",
enable_default_log=True,
rules=[
dict(
number="101",
action="accept",
description="Rule 101 is configured by Ansible",
ipsec="match-ipsec",
log="disable",
protocol="icmp",
fragment="match-frag",
disable=True,
),
],
),
],
),
],
state="merged",
),
)
commands = [
"set firewall name INBOUND default-action 'accept'",
"set firewall name INBOUND description 'This is IPv4 INBOUND rule set'",
"set firewall name INBOUND enable-default-log",
"set firewall name INBOUND rule 101 protocol 'icmp'",
"set firewall name INBOUND rule 101 description 'Rule 101 is configured by Ansible'",
"set firewall name INBOUND rule 101 fragment 'match-frag'",
"set firewall name INBOUND rule 101",
"set firewall name INBOUND rule 101 disable",
"set firewall name INBOUND rule 101 action 'accept'",
"set firewall name INBOUND rule 101 ipsec 'match-ipsec'",
"set firewall name INBOUND rule 101 log 'disable'",
]
self.execute_module(changed=True, commands=commands)
def test_vyos_firewall_v4_rule_sets_rule_merged_02(self):
+ """Test if plugin correctly adds new rules with variant attributes
+ within existing rule set
+ """
set_module_args(
dict(
config=[
dict(
afi="ipv4",
rule_sets=[
dict(
name="INBOUND",
rules=[
dict(
number="101",
protocol="tcp",
source=dict(
address="192.0.2.0",
mac_address="38:00:25:19:76:0c",
port=2127,
),
destination=dict(address="192.0.1.0", port=2124),
limit=dict(
burst=10,
rate=dict(number=20, unit="second"),
),
recent=dict(count=10, time=20),
state=dict(
established=True,
related=True,
invalid=True,
new=True,
),
),
],
),
],
),
],
state="merged",
),
)
commands = [
"set firewall name INBOUND rule 101 protocol 'tcp'",
"set firewall name INBOUND rule 101 destination address 192.0.1.0",
"set firewall name INBOUND rule 101 destination port 2124",
"set firewall name INBOUND rule 101",
"set firewall name INBOUND rule 101 source address 192.0.2.0",
"set firewall name INBOUND rule 101 source mac-address 38:00:25:19:76:0c",
"set firewall name INBOUND rule 101 source port 2127",
"set firewall name INBOUND rule 101 state new enable",
"set firewall name INBOUND rule 101 state invalid enable",
"set firewall name INBOUND rule 101 state related enable",
"set firewall name INBOUND rule 101 state established enable",
"set firewall name INBOUND rule 101 limit burst 10",
"set firewall name INBOUND rule 101 limit rate 20/second",
"set firewall name INBOUND rule 101 recent count 10",
"set firewall name INBOUND rule 101 recent time 20",
]
self.execute_module(changed=True, commands=commands)
def test_vyos_firewall_v4_rule_sets_rule_merged_03(self):
+ """Test if plugin correctly adds new rules with variant attributes
+ within existing rule set
+ """
set_module_args(
dict(
config=[
dict(
afi="ipv4",
rule_sets=[
dict(
name="INBOUND",
rules=[
dict(
number="101",
destination=dict(
group=dict(
address_group="OUT-ADDR-GROUP",
network_group="OUT-NET-GROUP",
port_group="OUT-PORT-GROUP",
),
),
source=dict(
group=dict(
address_group="IN-ADDR-GROUP",
network_group="IN-NET-GROUP",
port_group="IN-PORT-GROUP",
),
),
),
],
),
],
),
],
state="merged",
),
)
commands = [
"set firewall name INBOUND rule 101 source group address-group IN-ADDR-GROUP",
"set firewall name INBOUND rule 101 source group network-group IN-NET-GROUP",
"set firewall name INBOUND rule 101 source group port-group IN-PORT-GROUP",
"set firewall name INBOUND rule 101 destination group address-group OUT-ADDR-GROUP",
"set firewall name INBOUND rule 101 destination group network-group OUT-NET-GROUP",
"set firewall name INBOUND rule 101 destination group port-group OUT-PORT-GROUP",
"set firewall name INBOUND rule 101",
]
self.execute_module(changed=True, commands=commands)
def test_vyos_firewall_v4_rule_sets_rule_merged_04(self):
+ """Test if plugin correctly adds new rules with variant attributes
+ within existing rule set
+ """
set_module_args(
dict(
config=[
dict(
afi="ipv4",
rule_sets=[
dict(
name="INBOUND",
rules=[
dict(
number="101",
time=dict(
monthdays="2",
startdate="2020-01-24",
starttime="13:20:00",
stopdate="2020-01-28",
stoptime="13:30:00",
weekdays="!Sat,Sun",
utc=True,
),
tcp=dict(
flags=[
dict(flag="all"),
]
),
),
],
),
],
),
],
state="merged",
),
)
commands = [
"set firewall name INBOUND rule 101",
"set firewall name INBOUND rule 101 tcp flags ALL",
"set firewall name INBOUND rule 101 time utc",
"set firewall name INBOUND rule 101 time monthdays 2",
"set firewall name INBOUND rule 101 time startdate 2020-01-24",
"set firewall name INBOUND rule 101 time stopdate 2020-01-28",
"set firewall name INBOUND rule 101 time weekdays !Sat,Sun",
"set firewall name INBOUND rule 101 time stoptime 13:30:00",
"set firewall name INBOUND rule 101 time starttime 13:20:00",
]
self.execute_module(changed=True, commands=commands)
def test_vyos_firewall_v6_rule_sets_rule_merged_01(self):
+ """Test if plugin correctly adds new ipv6 rules set and a rule with variant attributes"""
set_module_args(
dict(
config=[
dict(
afi="ipv6",
rule_sets=[
dict(
name="INBOUND",
description="This is IPv6 INBOUND rule set",
default_action="accept",
enable_default_log=True,
rules=[
dict(
number="101",
action="accept",
description="Rule 101 is configured by Ansible",
ipsec="match-ipsec",
protocol="icmp",
disable=True,
icmp=dict(type_name="echo-request"),
),
],
),
],
),
],
state="merged",
),
)
commands = [
"set firewall ipv6-name INBOUND default-action 'accept'",
"set firewall ipv6-name INBOUND description 'This is IPv6 INBOUND rule set'",
"set firewall ipv6-name INBOUND enable-default-log",
"set firewall ipv6-name INBOUND rule 101 protocol 'icmp'",
"set firewall ipv6-name INBOUND rule 101 description 'Rule 101 is configured by Ansible'",
"set firewall ipv6-name INBOUND rule 101",
"set firewall ipv6-name INBOUND rule 101 disable",
"set firewall ipv6-name INBOUND rule 101 action 'accept'",
"set firewall ipv6-name INBOUND rule 101 ipsec 'match-ipsec'",
"set firewall ipv6-name INBOUND rule 101 icmpv6 type echo-request",
]
self.execute_module(changed=True, commands=commands)
def test_vyos_firewall_v6_rule_sets_rule_merged_02(self):
+ """Test if plugin correctly adds new rules with variant attributes
+ within existing ipv6 rule set
+ """
set_module_args(
dict(
config=[
dict(
afi="ipv6",
rule_sets=[
dict(
name="INBOUND",
rules=[
dict(
number="101",
protocol="tcp",
source=dict(
address="2001:db8::12",
mac_address="38:00:25:19:76:0c",
port=2127,
),
destination=dict(address="2001:db8::11", port=2124),
limit=dict(
burst=10,
rate=dict(number=20, unit="second"),
),
recent=dict(count=10, time=20),
state=dict(
established=True,
related=True,
invalid=True,
new=True,
),
),
],
),
],
),
],
state="merged",
),
)
commands = [
"set firewall ipv6-name INBOUND rule 101 protocol 'tcp'",
"set firewall ipv6-name INBOUND rule 101 destination address 2001:db8::11",
"set firewall ipv6-name INBOUND rule 101 destination port 2124",
"set firewall ipv6-name INBOUND rule 101",
"set firewall ipv6-name INBOUND rule 101 source address 2001:db8::12",
"set firewall ipv6-name INBOUND rule 101 source mac-address 38:00:25:19:76:0c",
"set firewall ipv6-name INBOUND rule 101 source port 2127",
"set firewall ipv6-name INBOUND rule 101 state new enable",
"set firewall ipv6-name INBOUND rule 101 state invalid enable",
"set firewall ipv6-name INBOUND rule 101 state related enable",
"set firewall ipv6-name INBOUND rule 101 state established enable",
"set firewall ipv6-name INBOUND rule 101 limit burst 10",
"set firewall ipv6-name INBOUND rule 101 recent count 10",
"set firewall ipv6-name INBOUND rule 101 recent time 20",
"set firewall ipv6-name INBOUND rule 101 limit rate 20/second",
]
self.execute_module(changed=True, commands=commands)
def test_vyos_firewall_v6_rule_sets_rule_merged_03(self):
+ """Test if plugin correctly adds new rules with variant attributes
+ within existing ipv6 rule set
+ """
set_module_args(
dict(
config=[
dict(
afi="ipv6",
rule_sets=[
dict(
name="INBOUND",
rules=[
dict(
number="101",
destination=dict(
group=dict(
address_group="OUT-ADDR-GROUP",
network_group="OUT-NET-GROUP",
port_group="OUT-PORT-GROUP",
),
),
source=dict(
group=dict(
address_group="IN-ADDR-GROUP",
network_group="IN-NET-GROUP",
port_group="IN-PORT-GROUP",
),
),
),
],
),
],
),
],
state="merged",
),
)
commands = [
"set firewall ipv6-name INBOUND rule 101 source group address-group IN-ADDR-GROUP",
"set firewall ipv6-name INBOUND rule 101 source group network-group IN-NET-GROUP",
"set firewall ipv6-name INBOUND rule 101 source group port-group IN-PORT-GROUP",
"set firewall ipv6-name INBOUND rule 101 destination group address-group OUT-ADDR-GROUP",
"set firewall ipv6-name INBOUND rule 101 destination group network-group OUT-NET-GROUP",
"set firewall ipv6-name INBOUND rule 101 destination group port-group OUT-PORT-GROUP",
"set firewall ipv6-name INBOUND rule 101",
]
self.execute_module(changed=True, commands=commands)
def test_vyos_firewall_v6_rule_sets_rule_merged_04(self):
+ """Test if plugin correctly adds new rules with variant attributes
+ within existing ipv6 rule set
+ """
set_module_args(
dict(
config=[
dict(
afi="ipv6",
rule_sets=[
dict(
name="INBOUND",
rules=[
dict(
number="101",
time=dict(
monthdays="2",
startdate="2020-01-24",
starttime="13:20:00",
stopdate="2020-01-28",
stoptime="13:30:00",
weekdays="!Sat,Sun",
utc=True,
),
tcp=dict(
flags=[
dict(flag="all"),
]
),
),
dict(
number="102",
tcp=dict(
flags=[
dict(flag="ack"),
dict(flag="syn"),
dict(flag="fin", invert=True),
],
)
)
],
),
],
),
],
state="merged",
),
)
commands = [
"set firewall ipv6-name INBOUND rule 101",
"set firewall ipv6-name INBOUND rule 101 tcp flags ALL",
"set firewall ipv6-name INBOUND rule 101 time utc",
"set firewall ipv6-name INBOUND rule 101 time monthdays 2",
"set firewall ipv6-name INBOUND rule 101 time startdate 2020-01-24",
"set firewall ipv6-name INBOUND rule 101 time stopdate 2020-01-28",
"set firewall ipv6-name INBOUND rule 101 time weekdays !Sat,Sun",
"set firewall ipv6-name INBOUND rule 101 time stoptime 13:30:00",
"set firewall ipv6-name INBOUND rule 101 time starttime 13:20:00",
"set firewall ipv6-name INBOUND rule 102",
"set firewall ipv6-name INBOUND rule 102 tcp flags ACK,SYN,!FIN",
]
self.execute_module(changed=True, commands=commands)
def test_vyos_firewall_v6_rule_sets_rule_merged_icmp_01(self):
+ """Test if plugin correctly adds new rules with variant attributes
+ within existing ipv6 rule set
+ """
set_module_args(
dict(
config=[
dict(
afi="ipv6",
rule_sets=[
dict(
name="INBOUND",
rules=[
dict(
number="101",
protocol="icmp",
icmp=dict(type_name="port-unreachable"),
),
],
),
],
),
],
state="merged",
),
)
commands = [
"set firewall ipv6-name INBOUND rule 101 icmpv6 type port-unreachable",
"set firewall ipv6-name INBOUND rule 101 protocol 'icmp'",
"set firewall ipv6-name INBOUND rule 101",
]
self.execute_module(changed=True, commands=commands)
def test_vyos_firewall_v4_rule_sets_rule_merged_icmp_01(self):
+ """Test if plugin correctly adds new rules with variant attributes
+ within existing rule set
+ """
set_module_args(
dict(
config=[
dict(
afi="ipv4",
rule_sets=[
dict(
name="INBOUND",
rules=[
dict(
number="101",
protocol="icmp",
icmp=dict(type=1, code=1),
),
],
),
],
),
],
state="merged",
),
)
commands = [
"set firewall name INBOUND rule 101 icmp type 1",
"set firewall name INBOUND rule 101 icmp code 1",
"set firewall name INBOUND rule 101 protocol 'icmp'",
"set firewall name INBOUND rule 101",
]
self.execute_module(changed=True, commands=commands)
def test_vyos_firewall_v4_rule_sets_rule_merged_icmp_02(self):
+ """Test if plugin correctly adds new rules with variant attributes
+ within existing rule set
+ """
set_module_args(
dict(
config=[
dict(
afi="ipv4",
rule_sets=[
dict(
name="INBOUND",
rules=[
dict(
number="101",
protocol="icmp",
icmp=dict(type_name="echo-request"),
),
],
),
],
),
],
state="merged",
),
)
commands = [
"set firewall name INBOUND rule 101 icmp type-name echo-request",
"set firewall name INBOUND rule 101 protocol 'icmp'",
"set firewall name INBOUND rule 101",
]
self.execute_module(changed=True, commands=commands)
def test_vyos_firewall_v4_rule_sets_del_01(self):
+ """Test if plugin correctly removes existing rule set
+ """
set_module_args(
dict(
config=[dict(afi="ipv4", rule_sets=[dict(name="V4-INGRESS")])],
state="deleted",
),
)
commands = ["delete firewall name V4-INGRESS"]
self.execute_module(changed=True, commands=commands)
def test_vyos_firewall_v4v6_rule_sets_del_02(self):
+ """Test if plugin correctly removes existing rule sets, both ipv4 and ipv6
+ """
set_module_args(
dict(
config=[
dict(afi="ipv4", rule_sets=[dict(name="V4-INGRESS")]),
dict(afi="ipv6", rule_sets=[dict(name="V6-INGRESS")]),
],
state="deleted",
),
)
commands = [
"delete firewall name V4-INGRESS",
"delete firewall ipv6-name V6-INGRESS",
]
self.execute_module(changed=True, commands=commands)
def test_vyos_firewall_v4v6_rule_sets_del_03(self):
+ """Test if plugin correctly removes existing AFIs, both ipv4 and ipv6
+ """
set_module_args(dict(config=[], state="deleted"))
commands = ["delete firewall name", "delete firewall ipv6-name"]
self.execute_module(changed=True, commands=commands)
def test_vyos_firewall_v4v6_rule_sets_del_04(self):
+ """Test if plugin has no effect on non-existent rule sets
+ """
set_module_args(
dict(
config=[
dict(afi="ipv4", rule_sets=[dict(name="V4-ING")]),
dict(afi="ipv6", rule_sets=[dict(name="V6-ING")]),
],
state="deleted",
),
)
self.execute_module(changed=False, commands=[])
def test_vyos_firewall_v4v6_rule_sets_rule_rep_01(self):
+ """Test if plugin correctly replaces a particular rule set(s)
+ without affecting the others
+ """
set_module_args(
dict(
config=[
dict(
afi="ipv4",
rule_sets=[
dict(
name="V4-INGRESS",
description="This is IPv4 INGRESS rule set",
default_action="accept",
enable_default_log=True,
rules=[
dict(
number="101",
action="reject",
description="Rule 101 is configured by Ansible RM",
ipsec="match-ipsec",
protocol="tcp",
fragment="match-frag",
disable=False,
),
dict(
number="102",
action="accept",
description="Rule 102 is configured by Ansible RM",
protocol="icmp",
disable=True,
),
],
),
],
),
dict(
afi="ipv6",
rule_sets=[
dict(
name="V6-INGRESS",
default_action="accept",
description="This rule-set is configured by Ansible RM",
),
dict(
name="EGRESS",
default_action="reject",
description="This rule-set is configured by Ansible RM",
rules=[
dict(
icmp=dict(type_name="echo-request"),
number=20,
),
],
),
],
),
],
state="replaced",
),
)
commands = [
- "delete firewall name V4-INGRESS rule 101 disable",
+ "delete firewall name V4-INGRESS rule 101",
+ "set firewall name V4-INGRESS rule 101",
"set firewall name V4-INGRESS description 'This is IPv4 INGRESS rule set'",
+ "set firewall name V4-INGRESS rule 101 fragment 'match-frag'",
+ "set firewall name V4-INGRESS rule 101 ipsec 'match-ipsec'",
"set firewall name V4-INGRESS rule 101 protocol 'tcp'",
"set firewall name V4-INGRESS rule 101 description 'Rule 101 is configured by Ansible RM'",
"set firewall name V4-INGRESS rule 101 action 'reject'",
- "delete firewall name V4-INGRESS rule 101 log",
"set firewall name V4-INGRESS rule 102 disable",
"set firewall name V4-INGRESS rule 102 action 'accept'",
"set firewall name V4-INGRESS rule 102 protocol 'icmp'",
"set firewall name V4-INGRESS rule 102 description 'Rule 102 is configured by Ansible RM'",
"set firewall name V4-INGRESS rule 102",
"set firewall ipv6-name V6-INGRESS description 'This rule-set is configured by Ansible RM'",
"set firewall ipv6-name EGRESS description 'This rule-set is configured by Ansible RM'",
]
self.execute_module(changed=True, commands=commands)
def test_vyos_firewall_v4v6_rule_sets_rule_rep_02(self):
+ """Test if plugin correctly replaces a particular rule(s) and rule set attribute(s)
+ without affecting the others
+ """
set_module_args(
dict(
config=[
dict(
afi="ipv4",
rule_sets=[
dict(
name="V4-INGRESS",
description="This is IPv4 V4-INGRESS rule set",
default_action="accept",
enable_default_log=False,
rules=[
dict(
number="101",
action="accept",
description="Rule 101 is configured by Ansible",
ipsec="match-ipsec",
protocol="icmp",
fragment="match-frag",
disable=True,
),
],
),
],
),
dict(
afi="ipv6",
rule_sets=[
dict(
name="V6-INGRESS",
default_action="accept",
),
dict(
name="EGRESS",
default_action="reject",
rules=[
dict(
icmp=dict(type_name="echo-request"),
number=20,
),
],
),
],
),
],
state="replaced",
),
)
commands = [
+ "delete firewall name V4-INGRESS rule 101",
"delete firewall name V4-INGRESS enable-default-log",
- "delete firewall name V4-INGRESS rule 101 log",
+ "set firewall name V4-INGRESS rule 101",
+ "set firewall name V4-INGRESS rule 101 action 'accept'",
+ "set firewall name V4-INGRESS rule 101 description 'Rule 101 is configured by Ansible'",
+ "set firewall name V4-INGRESS rule 101 disable",
+ "set firewall name V4-INGRESS rule 101 fragment 'match-frag'",
+ "set firewall name V4-INGRESS rule 101 ipsec 'match-ipsec'",
+ "set firewall name V4-INGRESS rule 101 protocol 'icmp'",
]
self.execute_module(changed=True, commands=commands)
def test_vyos_firewall_v4v6_rule_sets_rule_rep_idem_01(self):
+ """Test if plugin correctly has no effect if there is no change in the configuration
+ """
set_module_args(
dict(
config=[
dict(
afi="ipv4",
rule_sets=[
dict(
name="V4-INGRESS",
description="This is IPv4 V4-INGRESS rule set",
default_action="accept",
enable_default_log=True,
rules=[
dict(
number="101",
action="accept",
description="Rule 101 is configured by Ansible",
ipsec="match-ipsec",
protocol="icmp",
fragment="match-frag",
disable=True,
log="enable",
)
],
),
dict(
name="EGRESS",
default_action="reject",
),
],
),
dict(
afi="ipv6",
rule_sets=[
dict(
name="V6-INGRESS",
default_action="accept",
),
dict(
name="EGRESS",
default_action="reject",
rules=[
dict(
icmp=dict(type_name="echo-request"),
number=20,
),
],
),
],
),
],
state="replaced",
),
)
self.execute_module(changed=False, commands=[])
def test_vyos_firewall_v4v6_rule_sets_rule_rep_idem_02(self):
+ """Test if plugin correctly has no effect if there is no change in the configuration
+ """
set_module_args(
dict(
config=[
dict(
afi="ipv4",
rule_sets=[
dict(
name="V4-INGRESS",
description="This is IPv4 V4-INGRESS rule set",
default_action="accept",
enable_default_log=True,
rules=[
dict(
number="101",
action="accept",
description="Rule 101 is configured by Ansible",
ipsec="match-ipsec",
protocol="icmp",
fragment="match-frag",
disable=True,
log="enable"
),
],
),
],
),
],
state="replaced",
),
)
self.execute_module(changed=False, commands=[])
def test_vyos_firewall_v4v6_rule_sets_rule_mer_idem_01(self):
+ """Test if plugin correctly has no effect if there is no change in the configuration
+ """
set_module_args(
dict(
config=[
dict(
afi="ipv4",
rule_sets=[
dict(
name="V4-INGRESS",
description="This is IPv4 V4-INGRESS rule set",
default_action="accept",
enable_default_log=True,
rules=[
dict(
number="101",
action="accept",
description="Rule 101 is configured by Ansible",
ipsec="match-ipsec",
protocol="icmp",
fragment="match-frag",
disable=True,
)
],
),
dict(
name="EGRESS",
default_action="reject",
),
],
),
dict(
afi="ipv6",
rule_sets=[
dict(
name="V6-INGRESS",
default_action="accept",
),
dict(
name="EGRESS",
default_action="reject",
rules=[
dict(
icmp=dict(type_name="echo-request"),
number=20,
),
],
),
],
),
],
state="merged",
),
)
self.execute_module(changed=False, commands=[])
def test_vyos_firewall_v4v6_rule_sets_rule_ovr_01(self):
+ """Test if plugin correctly resets the entire rule set if there is a change in the configuration
+ """
set_module_args(
dict(
config=[
dict(
afi="ipv4",
rule_sets=[
dict(
name="V4-IN",
description="This is IPv4 INGRESS rule set",
default_action="accept",
enable_default_log=True,
rules=[
dict(
number="1",
action="reject",
description="Rule 1 is configured by Ansible RM",
ipsec="match-ipsec",
log="enable",
protocol="tcp",
fragment="match-frag",
disable=False,
source=dict(
group=dict(
address_group="IN-ADDR-GROUP",
network_group="IN-NET-GROUP",
port_group="IN-PORT-GROUP",
),
),
),
dict(
number="2",
action="accept",
description="Rule 102 is configured by Ansible RM",
protocol="icmp",
disable=True,
),
],
),
],
),
dict(
afi="ipv6",
rule_sets=[
dict(
name="V6-IN",
default_action="accept",
description="This rule-set is configured by Ansible RM",
),
dict(
name="V6-EG",
default_action="reject",
description="This rule-set is configured by Ansible RM",
),
],
),
],
state="overridden",
),
)
commands = [
"delete firewall ipv6-name V6-INGRESS",
"delete firewall ipv6-name EGRESS",
"delete firewall name V4-INGRESS",
"delete firewall name EGRESS",
"set firewall name V4-IN default-action 'accept'",
"set firewall name V4-IN description 'This is IPv4 INGRESS rule set'",
"set firewall name V4-IN enable-default-log",
"set firewall name V4-IN rule 1 protocol 'tcp'",
"set firewall name V4-IN rule 1 log 'enable'",
"set firewall name V4-IN rule 1 description 'Rule 1 is configured by Ansible RM'",
"set firewall name V4-IN rule 1 fragment 'match-frag'",
"set firewall name V4-IN rule 1 source group address-group IN-ADDR-GROUP",
"set firewall name V4-IN rule 1 source group network-group IN-NET-GROUP",
"set firewall name V4-IN rule 1 source group port-group IN-PORT-GROUP",
"set firewall name V4-IN rule 1",
"set firewall name V4-IN rule 1 action 'reject'",
"set firewall name V4-IN rule 1 ipsec 'match-ipsec'",
"set firewall name V4-IN rule 2 disable",
"set firewall name V4-IN rule 2 action 'accept'",
"set firewall name V4-IN rule 2 protocol 'icmp'",
"set firewall name V4-IN rule 2 description 'Rule 102 is configured by Ansible RM'",
"set firewall name V4-IN rule 2",
"set firewall ipv6-name V6-IN default-action 'accept'",
"set firewall ipv6-name V6-IN description 'This rule-set is configured by Ansible RM'",
"set firewall ipv6-name V6-EG default-action 'reject'",
"set firewall ipv6-name V6-EG description 'This rule-set is configured by Ansible RM'",
]
self.execute_module(changed=True, commands=commands)
+ def test_vyos_firewall_v4v6_rule_sets_rule_ovr_02(self):
+ """Test if plugin correctly resets the entire rule set
+ while removing the absent ones if there is a change in the configuration
+ """
+ set_module_args(
+ dict(
+ config=[
+ dict(
+ afi="ipv4",
+ rule_sets=[
+ dict(
+ name="V4-INGRESS",
+ description="This is IPv4 INGRESS rule set",
+ default_action="accept",
+ enable_default_log=True,
+ rules=[
+ dict(
+ number="101",
+ action="accept",
+ protocol="udp",
+ ),
+ ],
+ ),
+ ],
+ ),
+ dict(
+ afi="ipv6",
+ rule_sets=[
+ dict(
+ name="EGRESS",
+ default_action="reject",
+ description="This rule-set is configured by Ansible RM",
+ rules=[
+ dict(
+ number="20",
+ action="accept",
+ protocol="udp",
+ ),
+ ],
+ ),
+ ],
+ ),
+ ],
+ state="overridden",
+ ),
+ )
+ commands = [
+ "delete firewall ipv6-name V6-INGRESS",
+ "delete firewall ipv6-name EGRESS",
+ "delete firewall name V4-INGRESS",
+ "delete firewall name EGRESS",
+ "set firewall name V4-INGRESS rule 101",
+ "set firewall name V4-INGRESS description 'This is IPv4 INGRESS rule set'",
+ "set firewall name V4-INGRESS default-action 'accept'",
+ "set firewall name V4-INGRESS enable-default-log",
+ "set firewall name V4-INGRESS rule 101 protocol 'udp'",
+ "set firewall name V4-INGRESS rule 101 action 'accept'",
+ "set firewall ipv6-name EGRESS description 'This rule-set is configured by Ansible RM'",
+ "set firewall ipv6-name EGRESS default-action 'reject'",
+ "set firewall ipv6-name EGRESS rule 20",
+ "set firewall ipv6-name EGRESS rule 20 protocol 'udp'",
+ "set firewall ipv6-name EGRESS rule 20 action 'accept'"
+ ]
+ self.execute_module(changed=True, commands=commands)
+
def test_vyos_firewall_v4v6_rule_sets_rule_ovr_idem_01(self):
+ """Test if plugin correctly has no effect if there is no change in the configuration
+ """
set_module_args(
dict(
config=[
dict(
afi="ipv4",
rule_sets=[
dict(
name="V4-INGRESS",
description="This is IPv4 V4-INGRESS rule set",
default_action="accept",
enable_default_log=True,
rules=[
dict(
number="101",
action="accept",
description="Rule 101 is configured by Ansible",
ipsec="match-ipsec",
protocol="icmp",
fragment="match-frag",
disable=True,
log="enable",
)
],
),
dict(
name="EGRESS",
default_action="reject",
),
],
),
dict(
afi="ipv6",
rule_sets=[
dict(
name="V6-INGRESS",
default_action="accept",
),
dict(
name="EGRESS",
default_action="reject",
rules=[
dict(
icmp=dict(type_name="echo-request"),
number=20,
),
],
),
],
),
],
state="overridden",
),
)
self.execute_module(changed=False, commands=[])
def test_vyos_firewall_v6_rule_sets_rule_merged_01_version(self):
- self.get_os_version.return_value = "1.4"
+ """Test if plugin correctly adds ipv6 rule set with rules
+ """
+ self.get_os_version.return_value = "1.3"
set_module_args(
dict(
config=[
dict(
afi="ipv6",
rule_sets=[
dict(
name="INBOUND",
description="This is IPv6 INBOUND rule set",
default_action="accept",
enable_default_log=True,
rules=[
dict(
number="101",
action="accept",
description="Rule 101 is configured by Ansible",
ipsec="match-ipsec",
protocol="icmp",
disable=True,
icmp=dict(type_name="echo-request"),
log="enable",
),
dict(
number="102",
action="reject",
description="Rule 102 is configured by Ansible",
protocol="ipv6-icmp",
icmp=dict(type=7),
),
],
),
],
),
],
state="merged",
),
)
commands = [
- "set firewall ipv6 name INBOUND default-action 'accept'",
- "set firewall ipv6 name INBOUND description 'This is IPv6 INBOUND rule set'",
- "set firewall ipv6 name INBOUND default-log",
- "set firewall ipv6 name INBOUND rule 101 protocol 'icmp'",
- "set firewall ipv6 name INBOUND rule 101 description 'Rule 101 is configured by Ansible'",
- "set firewall ipv6 name INBOUND rule 101",
- "set firewall ipv6 name INBOUND rule 101 disable",
- "set firewall ipv6 name INBOUND rule 101 action 'accept'",
- "set firewall ipv6 name INBOUND rule 101 ipsec 'match-ipsec'",
- "set firewall ipv6 name INBOUND rule 101 icmpv6 type-name echo-request",
- "set firewall ipv6 name INBOUND rule 101 log 'enable'",
- "set firewall ipv6 name INBOUND rule 102",
- "set firewall ipv6 name INBOUND rule 102 action 'reject'",
- "set firewall ipv6 name INBOUND rule 102 description 'Rule 102 is configured by Ansible'",
- "set firewall ipv6 name INBOUND rule 102 protocol 'ipv6-icmp'",
- 'set firewall ipv6 name INBOUND rule 102 icmpv6 type 7',
+ "set firewall ipv6-name INBOUND default-action 'accept'",
+ "set firewall ipv6-name INBOUND description 'This is IPv6 INBOUND rule set'",
+ "set firewall ipv6-name INBOUND enable-default-log",
+ "set firewall ipv6-name INBOUND rule 101 protocol 'icmp'",
+ "set firewall ipv6-name INBOUND rule 101 description 'Rule 101 is configured by Ansible'",
+ "set firewall ipv6-name INBOUND rule 101",
+ "set firewall ipv6-name INBOUND rule 101 disable",
+ "set firewall ipv6-name INBOUND rule 101 action 'accept'",
+ "set firewall ipv6-name INBOUND rule 101 ipsec 'match-ipsec'",
+ "set firewall ipv6-name INBOUND rule 101 icmpv6 type echo-request",
+ "set firewall ipv6-name INBOUND rule 101 log 'enable'",
+ "set firewall ipv6-name INBOUND rule 102",
+ "set firewall ipv6-name INBOUND rule 102 action 'reject'",
+ "set firewall ipv6-name INBOUND rule 102 description 'Rule 102 is configured by Ansible'",
+ "set firewall ipv6-name INBOUND rule 102 protocol 'ipv6-icmp'",
+ 'set firewall ipv6-name INBOUND rule 102 icmpv6 type 7',
]
self.execute_module(changed=True, commands=commands)
def test_vyos_firewall_jump_rules_merged_01(self):
- self.get_os_version.return_value = "1.4"
+ """Test if plugin correctly adds rule set with a jump action
+ """
set_module_args(
dict(
config=[
dict(
afi="ipv6",
rule_sets=[
dict(
name="INBOUND",
description="This is IPv6 INBOUND rule set with a jump action",
default_action="accept",
enable_default_log=True,
rules=[
dict(
number="101",
action="jump",
description="Rule 101 is configured by Ansible",
ipsec="match-ipsec",
protocol="icmp",
icmp=dict(type_name="echo-request"),
jump_target="PROTECT-RE",
packet_length_exclude=[dict(length=100), dict(length=200)]
),
dict(
number="102",
action="reject",
description="Rule 102 is configured by Ansible",
protocol="ipv6-icmp",
icmp=dict(type=7),
),
],
),
],
)
],
state="merged",
)
)
commands = [
- "set firewall ipv6 name INBOUND default-action 'accept'",
- "set firewall ipv6 name INBOUND description 'This is IPv6 INBOUND rule set with a jump action'",
- "set firewall ipv6 name INBOUND default-log",
- "set firewall ipv6 name INBOUND rule 101 protocol 'icmp'",
- "set firewall ipv6 name INBOUND rule 101 packet-length-exclude 100",
- "set firewall ipv6 name INBOUND rule 101 packet-length-exclude 200",
- "set firewall ipv6 name INBOUND rule 101 description 'Rule 101 is configured by Ansible'",
- "set firewall ipv6 name INBOUND rule 101",
- "set firewall ipv6 name INBOUND rule 101 ipsec 'match-ipsec'",
- "set firewall ipv6 name INBOUND rule 101 icmpv6 type-name echo-request",
- "set firewall ipv6 name INBOUND rule 101 action 'jump'",
- "set firewall ipv6 name INBOUND rule 101 jump-target 'PROTECT-RE'",
- "set firewall ipv6 name INBOUND rule 102",
- "set firewall ipv6 name INBOUND rule 102 action 'reject'",
- "set firewall ipv6 name INBOUND rule 102 description 'Rule 102 is configured by Ansible'",
- "set firewall ipv6 name INBOUND rule 102 protocol 'ipv6-icmp'",
- 'set firewall ipv6 name INBOUND rule 102 icmpv6 type 7',
+ "set firewall ipv6-name INBOUND default-action 'accept'",
+ "set firewall ipv6-name INBOUND description 'This is IPv6 INBOUND rule set with a jump action'",
+ "set firewall ipv6-name INBOUND enable-default-log",
+ "set firewall ipv6-name INBOUND rule 101 protocol 'icmp'",
+ "set firewall ipv6-name INBOUND rule 101 packet-length-exclude 100",
+ "set firewall ipv6-name INBOUND rule 101 packet-length-exclude 200",
+ "set firewall ipv6-name INBOUND rule 101 description 'Rule 101 is configured by Ansible'",
+ "set firewall ipv6-name INBOUND rule 101",
+ "set firewall ipv6-name INBOUND rule 101 ipsec 'match-ipsec'",
+ "set firewall ipv6-name INBOUND rule 101 icmpv6 type echo-request",
+ "set firewall ipv6-name INBOUND rule 101 action 'jump'",
+ "set firewall ipv6-name INBOUND rule 101 jump-target 'PROTECT-RE'",
+ "set firewall ipv6-name INBOUND rule 102",
+ "set firewall ipv6-name INBOUND rule 102 action 'reject'",
+ "set firewall ipv6-name INBOUND rule 102 description 'Rule 102 is configured by Ansible'",
+ "set firewall ipv6-name INBOUND rule 102 protocol 'ipv6-icmp'",
+ 'set firewall ipv6-name INBOUND rule 102 icmpv6 type 7',
]
self.execute_module(changed=True, commands=commands)
-
-class TestVyosFirewallRulesModule14(TestVyosModule):
- module = vyos_firewall_rules
-
- def setUp(self):
- super(TestVyosFirewallRulesModule14, self).setUp()
- self.mock_get_config = patch(
- "ansible_collections.ansible.netcommon.plugins.module_utils.network.common.network.Config.get_config"
- )
- self.get_config = self.mock_get_config.start()
-
- self.mock_load_config = patch(
- "ansible_collections.ansible.netcommon.plugins.module_utils.network.common.network.Config.load_config"
- )
- self.load_config = self.mock_load_config.start()
-
- self.mock_get_resource_connection_config = patch(
- "ansible_collections.ansible.netcommon.plugins.module_utils.network.common.cfg.base.get_resource_connection"
- )
- self.get_resource_connection_config = self.mock_get_resource_connection_config.start()
-
- self.mock_get_resource_connection_facts = patch(
- "ansible_collections.ansible.netcommon.plugins.module_utils.network.common.facts.facts.get_resource_connection"
- )
- self.get_resource_connection_facts = self.mock_get_resource_connection_facts.start()
- self.mock_execute_show_command = patch(
- "ansible_collections.vyos.vyos.plugins.module_utils.network.vyos.facts.static_routes.static_routes.Static_routesFacts.get_device_data"
- )
-
- self.mock_execute_show_command = patch(
- "ansible_collections.vyos.vyos.plugins.module_utils.network.vyos.facts.firewall_rules.firewall_rules.Firewall_rulesFacts.get_device_data"
- )
- self.execute_show_command = self.mock_execute_show_command.start()
-
- self.mock_get_os_version = patch(
- "ansible_collections.vyos.vyos.plugins.module_utils.network.vyos.config.firewall_rules.firewall_rules.get_os_version"
- )
- self.get_os_version = self.mock_get_os_version.start()
- self.get_os_version.return_value = "1.4"
- self.maxDiff = None
-
- def tearDown(self):
- super(TestVyosFirewallRulesModule14, self).tearDown()
- self.mock_get_resource_connection_config.stop()
- self.mock_get_resource_connection_facts.stop()
- self.mock_get_config.stop()
- self.mock_load_config.stop()
- self.mock_execute_show_command.stop()
- self.mock_get_os_version.stop()
-
- def load_fixtures(self, commands=None, filename=None):
- def load_from_file(*args, **kwargs):
- return load_fixture("vyos_firewall_rules_config_v14.cfg")
-
- self.execute_show_command.side_effect = load_from_file
-
- def test_vyos_firewall_packet_length_merged_01(self):
+ def test_vyos_firewall_log_merged_01(self):
+ """Test if new stanza log is correctly applied"""
set_module_args(
dict(
config=[
dict(
afi="ipv6",
rule_sets=[
dict(
name="INBOUND",
- description="This is IPv6 INBOUND rule set with a jump action",
+ description="This is IPv6 INBOUND rule set with a log",
default_action="accept",
enable_default_log=True,
rules=[
dict(
number="101",
- action="jump",
+ action="accept",
description="Rule 101 is configured by Ansible",
- jump_target="PROTECT-RE",
- packet_length_exclude=[dict(length=100), dict(length=200)],
- packet_length=[dict(length=22)]
+ log="enable",
),
],
),
],
)
],
state="merged",
)
)
commands = [
- "set firewall ipv6 name INBOUND default-action 'accept'",
- "set firewall ipv6 name INBOUND description 'This is IPv6 INBOUND rule set with a jump action'",
- "set firewall ipv6 name INBOUND default-log",
- "set firewall ipv6 name INBOUND rule 101 packet-length-exclude 100",
- "set firewall ipv6 name INBOUND rule 101 packet-length-exclude 200",
- "set firewall ipv6 name INBOUND rule 101 packet-length 22",
- "set firewall ipv6 name INBOUND rule 101 description 'Rule 101 is configured by Ansible'",
- "set firewall ipv6 name INBOUND rule 101",
- "set firewall ipv6 name INBOUND rule 101 action 'jump'",
- "set firewall ipv6 name INBOUND rule 101 jump-target 'PROTECT-RE'",
+ "set firewall ipv6-name INBOUND default-action 'accept'",
+ "set firewall ipv6-name INBOUND description 'This is IPv6 INBOUND rule set with a log'",
+ "set firewall ipv6-name INBOUND enable-default-log",
+ "set firewall ipv6-name INBOUND rule 101 log 'enable'",
+ "set firewall ipv6-name INBOUND rule 101 description 'Rule 101 is configured by Ansible'",
+ "set firewall ipv6-name INBOUND rule 101",
+ "set firewall ipv6-name INBOUND rule 101 action 'accept'",
]
self.maxDiff = None
self.execute_module(changed=True, commands=commands)
- def test_vyos_firewall_packet_length_replace_01(self):
+ def test_vyos_firewall_log_replace_01(self):
+ """Test that stanza is correctly replaced
+ without touching the other stanzas
+ """
set_module_args(
dict(
config=[
dict(
afi="ipv4",
rule_sets=[
dict(
name="V4-INGRESS",
description="This is IPv4 V4-INGRESS rule set",
default_action="accept",
enable_default_log=True,
rules=[
dict(
number="101",
action="accept",
description="Rule 101 is configured by Ansible",
packet_length_exclude=[dict(length=100), dict(length=200)],
- packet_length=[dict(length=22)]
- ),
- ],
- ),
- ],
- )
- ],
- state="replaced",
- )
- )
- commands = [
- "delete firewall ipv4 name V4-INGRESS rule 101 protocol",
- "delete firewall ipv4 name V4-INGRESS rule 101 disable",
- "delete firewall ipv4 name V4-INGRESS rule 101 packet-length-exclude 300",
- "set firewall ipv4 name V4-INGRESS rule 101 packet-length-exclude 200",
- "set firewall ipv4 name V4-INGRESS rule 101 packet-length 22",
- ]
- self.maxDiff = None
- self.execute_module(changed=True, commands=commands)
-
- def test_vyos_firewall_filter_merged_01(self):
- set_module_args(
- dict(
- config=[
- dict(
- afi="ipv6",
- rule_sets=[
- dict(
- filter="input",
- description="This is IPv6 INBOUND rule set with a jump action",
- default_action="accept",
- enable_default_log=True,
- rules=[
- dict(
- number="101",
- action="jump",
- description="Rule 101 is configured by Ansible",
- jump_target="PROTECT-RE",
- packet_length_exclude=[dict(length=100), dict(length=200)],
- packet_length=[dict(length=22)]
- ),
- ],
- ),
- ],
- )
- ],
- state="merged",
- )
- )
- commands = [
- "set firewall ipv6 input filter default-action 'accept'",
- "set firewall ipv6 input filter description 'This is IPv6 INBOUND rule set with a jump action'",
- "set firewall ipv6 input filter default-log",
- "set firewall ipv6 input filter rule 101 packet-length-exclude 100",
- "set firewall ipv6 input filter rule 101 packet-length-exclude 200",
- "set firewall ipv6 input filter rule 101 packet-length 22",
- "set firewall ipv6 input filter rule 101 description 'Rule 101 is configured by Ansible'",
- "set firewall ipv6 input filter rule 101",
- "set firewall ipv6 input filter rule 101 action 'jump'",
- "set firewall ipv6 input filter rule 101 jump-target 'PROTECT-RE'",
- ]
- self.maxDiff = None
- self.execute_module(changed=True, commands=commands)
-
- def test_vyos_firewall_interface_merged_01(self):
- set_module_args(
- dict(
- config=[
- dict(
- afi="ipv6",
- rule_sets=[
- dict(
- name="V6-INGRESS",
- description="This is IPv6 INBOUND rule set with a jump action",
- default_action="accept",
- rules=[
- dict(
- number="101",
- action="jump",
- description="Rule 101 is configured by Ansible",
- jump_target="PROTECT-RE",
- inbound_interface=dict(name="eth0"),
- outbound_interface=dict(group="eth1"),
- ),
- ],
- ),
- ],
- )
- ],
- state="merged",
- )
- )
- commands = [
- "set firewall ipv6 name V6-INGRESS description 'This is IPv6 INBOUND rule set with a jump action'",
- "set firewall ipv6 name V6-INGRESS rule 101 inbound-interface name eth0",
- "set firewall ipv6 name V6-INGRESS rule 101 outbound-interface group eth1",
- "set firewall ipv6 name V6-INGRESS rule 101 description 'Rule 101 is configured by Ansible'",
- "set firewall ipv6 name V6-INGRESS rule 101",
- "set firewall ipv6 name V6-INGRESS rule 101 action 'jump'",
- "set firewall ipv6 name V6-INGRESS rule 101 jump-target 'PROTECT-RE'",
- ]
- self.maxDiff = None
- self.execute_module(changed=True, commands=commands)
-
- def test_vyos_firewall_interface_replace_02(self):
- set_module_args(
- dict(
- config=[
- dict(
- afi="ipv4",
- rule_sets=[
- dict(
- name="IF-TEST",
- description="Changed",
- rules=[
- dict(
- number="10",
- action="accept",
- description="Rule 10 is configured by Ansible",
- inbound_interface=dict(name="eth1"),
+ packet_length=[dict(length=22)],
+ log="enable",
),
],
),
],
)
],
state="replaced",
)
)
commands = [
- "set firewall ipv4 name IF-TEST description 'Changed'",
- "set firewall ipv4 name IF-TEST rule 10 description 'Rule 10 is configured by Ansible'",
- 'set firewall ipv4 name IF-TEST rule 10 inbound-interface name eth1',
- "delete firewall ipv4 name IF-TEST rule 10 outbound-interface group",
- "delete firewall ipv4 name IF-TEST rule 10 disable",
- "delete firewall ipv4 name IF-TEST rule 10 state related",
- "delete firewall ipv4 name IF-TEST rule 10 icmp type-name echo-request",
+ "delete firewall name V4-INGRESS rule 101",
+ "set firewall name V4-INGRESS rule 101",
+ "set firewall name V4-INGRESS rule 101 action 'accept'",
+ "set firewall name V4-INGRESS rule 101 description 'Rule 101 is configured by Ansible'",
+ "set firewall name V4-INGRESS rule 101 packet-length-exclude 100",
+ "set firewall name V4-INGRESS rule 101 packet-length-exclude 200",
+ "set firewall name V4-INGRESS rule 101 packet-length 22",
+ "set firewall name V4-INGRESS rule 101 log 'enable'",
]
self.maxDiff = None
self.execute_module(changed=True, commands=commands)
-
- def test_vyos_firewall_v4_rule_sets_rule_merged_02(self):
- set_module_args(
- dict(
- config=[
- dict(
- afi="ipv4",
- rule_sets=[
- dict(
- name="INBOUND",
- rules=[
- dict(
- number="101",
- protocol="tcp",
- source=dict(
- address="192.0.2.0",
- mac_address="38:00:25:19:76:0c",
- port=2127,
- ),
- destination=dict(address="192.0.1.0", port=2124),
- limit=dict(
- burst=10,
- rate=dict(number=20, unit="second"),
- ),
- recent=dict(count=10, time=20),
- state=dict(
- established=True,
- related=True,
- invalid=True,
- new=True,
- ),
- ),
- ],
- ),
- ],
- ),
- ],
- state="merged",
- ),
- )
- commands = [
- "set firewall ipv4 name INBOUND rule 101 protocol 'tcp'",
- "set firewall ipv4 name INBOUND rule 101 destination port 2124",
- "set firewall ipv4 name INBOUND rule 101",
- "set firewall ipv4 name INBOUND rule 101 destination address 192.0.1.0",
- "set firewall ipv4 name INBOUND rule 101 source address 192.0.2.0",
- "set firewall ipv4 name INBOUND rule 101 source mac-address 38:00:25:19:76:0c",
- "set firewall ipv4 name INBOUND rule 101 source port 2127",
- "set firewall ipv4 name INBOUND rule 101 state new",
- "set firewall ipv4 name INBOUND rule 101 state invalid",
- "set firewall ipv4 name INBOUND rule 101 state related",
- "set firewall ipv4 name INBOUND rule 101 state established",
- "set firewall ipv4 name INBOUND rule 101 limit burst 10",
- "set firewall ipv4 name INBOUND rule 101 limit rate 20/second",
- "set firewall ipv4 name INBOUND rule 101 recent count 10",
- "set firewall ipv4 name INBOUND rule 101 recent time 20",
- ]
- self.execute_module(changed=True, commands=commands)
-
- def test_vyos_firewall_v4_rule_sets_change_state_01(self):
- set_module_args(
- dict(
- config=[
- dict(
- afi="ipv4",
- rule_sets=[
- dict(
- name="IF-TEST",
- rules=[
- dict(
- number="10",
- disable=False,
- action="accept",
- state=dict(
- established=True,
- new=True,
- ),
- ),
- ],
- ),
- ],
- ),
- ],
- state="replaced",
- ),
- )
- commands = [
- "delete firewall ipv4 name IF-TEST rule 10 disable",
- "delete firewall ipv4 name IF-TEST rule 10 inbound-interface name",
- "delete firewall ipv4 name IF-TEST rule 10 icmp type-name echo-request",
- "delete firewall ipv4 name IF-TEST rule 10 outbound-interface group",
- "delete firewall ipv4 name IF-TEST rule 10 state related",
- "set firewall ipv4 name IF-TEST rule 10 state established",
- "set firewall ipv4 name IF-TEST rule 10 state new",
- ]
- self.execute_module(changed=True, commands=commands)
-
- def test_vyos_firewall_v4v6_rule_sets_del_03(self):
- set_module_args(dict(config=[], state="deleted"))
- commands = ["delete firewall ipv4", "delete firewall ipv6"]
- self.execute_module(changed=True, commands=commands)
-
- def test_vyos_firewall_v6_rule_sets_rule_merged_04(self):
- set_module_args(
- dict(
- config=[
- dict(
- afi="ipv6",
- rule_sets=[
- dict(
- name="INBOUND",
- rules=[
- dict(
- number="101",
- time=dict(
- monthdays="2",
- startdate="2020-01-24",
- starttime="13:20:00",
- stopdate="2020-01-28",
- stoptime="13:30:00",
- weekdays="!Sat,Sun",
- utc=True,
- ),
- tcp=dict(
- flags=[
- dict(flag="all"),
- ]
- ),
- ),
- dict(
- number="102",
- tcp=dict(
- flags=[
- dict(flag="ack"),
- dict(flag="syn"),
- dict(flag="fin", invert=True),
- ],
- )
- )
- ],
- ),
- ],
- ),
- ],
- state="merged",
- ),
- )
- commands = [
- "set firewall ipv6 name INBOUND rule 101",
- "set firewall ipv6 name INBOUND rule 101 tcp flags all",
- "set firewall ipv6 name INBOUND rule 101 time utc",
- "set firewall ipv6 name INBOUND rule 101 time monthdays 2",
- "set firewall ipv6 name INBOUND rule 101 time startdate 2020-01-24",
- "set firewall ipv6 name INBOUND rule 101 time stopdate 2020-01-28",
- "set firewall ipv6 name INBOUND rule 101 time weekdays !Sat,Sun",
- "set firewall ipv6 name INBOUND rule 101 time stoptime 13:30:00",
- "set firewall ipv6 name INBOUND rule 101 time starttime 13:20:00",
- "set firewall ipv6 name INBOUND rule 102",
- "set firewall ipv6 name INBOUND rule 102 tcp flags ack",
- "set firewall ipv6 name INBOUND rule 102 tcp flags not fin",
- "set firewall ipv6 name INBOUND rule 102 tcp flags syn",
- ]
- self.execute_module(changed=True, commands=commands)
diff --git a/tests/unit/modules/network/vyos/test_vyos_firewall_rules.py b/tests/unit/modules/network/vyos/test_vyos_firewall_rules14.py
similarity index 69%
rename from tests/unit/modules/network/vyos/test_vyos_firewall_rules.py
rename to tests/unit/modules/network/vyos/test_vyos_firewall_rules14.py
index c0815bfa..547b8f45 100644
--- a/tests/unit/modules/network/vyos/test_vyos_firewall_rules.py
+++ b/tests/unit/modules/network/vyos/test_vyos_firewall_rules14.py
@@ -1,1706 +1,1863 @@
# (c) 2016 Red Hat Inc.
#
# This file is part of Ansible
#
# Ansible is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see .
# Make coding more python3-ish
from __future__ import absolute_import, division, print_function
__metaclass__ = type
from unittest.mock import patch
from ansible_collections.vyos.vyos.plugins.modules import vyos_firewall_rules
from ansible_collections.vyos.vyos.tests.unit.modules.utils import set_module_args
from .vyos_module import TestVyosModule, load_fixture
-class TestVyosFirewallRulesModule(TestVyosModule):
+class TestVyosFirewallRulesModule14(TestVyosModule):
module = vyos_firewall_rules
def setUp(self):
- super(TestVyosFirewallRulesModule, self).setUp()
+ super(TestVyosFirewallRulesModule14, self).setUp()
self.mock_get_config = patch(
- "ansible_collections.ansible.netcommon.plugins.module_utils.network.common.network.Config.get_config",
+ "ansible_collections.ansible.netcommon.plugins.module_utils.network.common.network.Config.get_config"
)
self.get_config = self.mock_get_config.start()
self.mock_load_config = patch(
- "ansible_collections.ansible.netcommon.plugins.module_utils.network.common.network.Config.load_config",
+ "ansible_collections.ansible.netcommon.plugins.module_utils.network.common.network.Config.load_config"
)
self.load_config = self.mock_load_config.start()
self.mock_get_resource_connection_config = patch(
- "ansible_collections.ansible.netcommon.plugins.module_utils.network.common.cfg.base.get_resource_connection",
+ "ansible_collections.ansible.netcommon.plugins.module_utils.network.common.cfg.base.get_resource_connection"
)
self.get_resource_connection_config = self.mock_get_resource_connection_config.start()
self.mock_get_resource_connection_facts = patch(
- "ansible_collections.ansible.netcommon.plugins.module_utils.network.common.facts.facts.get_resource_connection",
+ "ansible_collections.ansible.netcommon.plugins.module_utils.network.common.facts.facts.get_resource_connection"
)
self.get_resource_connection_facts = self.mock_get_resource_connection_facts.start()
self.mock_execute_show_command = patch(
- "ansible_collections.vyos.vyos.plugins.module_utils.network.vyos.facts.static_routes.static_routes.Static_routesFacts.get_device_data",
+ "ansible_collections.vyos.vyos.plugins.module_utils.network.vyos.facts.static_routes.static_routes.Static_routesFacts.get_device_data"
)
self.mock_execute_show_command = patch(
- "ansible_collections.vyos.vyos.plugins.module_utils.network.vyos.facts.firewall_rules.firewall_rules.Firewall_rulesFacts.get_device_data",
+ "ansible_collections.vyos.vyos.plugins.module_utils.network.vyos.facts.firewall_rules.firewall_rules.Firewall_rulesFacts.get_device_data"
)
self.execute_show_command = self.mock_execute_show_command.start()
self.mock_get_os_version = patch(
- "ansible_collections.vyos.vyos.plugins.module_utils.network.vyos.config.firewall_rules.firewall_rules.get_os_version",
+ "ansible_collections.vyos.vyos.plugins.module_utils.network.vyos.config.firewall_rules.firewall_rules.get_os_version"
)
self.get_os_version = self.mock_get_os_version.start()
- self.get_os_version.return_value = "1.2"
+ self.get_os_version.return_value = "1.4"
+ self.maxDiff = None
def tearDown(self):
- super(TestVyosFirewallRulesModule, self).tearDown()
+ super(TestVyosFirewallRulesModule14, self).tearDown()
self.mock_get_resource_connection_config.stop()
self.mock_get_resource_connection_facts.stop()
self.mock_get_config.stop()
self.mock_load_config.stop()
self.mock_execute_show_command.stop()
self.mock_get_os_version.stop()
def load_fixtures(self, commands=None, filename=None):
def load_from_file(*args, **kwargs):
- return load_fixture("vyos_firewall_rules_config.cfg")
+ return load_fixture("vyos_firewall_rules_config_v14.cfg")
self.execute_show_command.side_effect = load_from_file
def test_vyos_firewall_rule_set_01_merged(self):
set_module_args(
dict(
config=[
dict(
afi="ipv6",
rule_sets=[
dict(
name="V6-INBOUND",
description="This is IPv6 INBOUND rule set",
default_action="reject",
enable_default_log=True,
rules=[],
),
dict(
name="V6-OUTBOUND",
description="This is IPv6 OUTBOUND rule set",
default_action="accept",
enable_default_log=False,
rules=[],
),
],
),
dict(
afi="ipv4",
rule_sets=[
dict(
name="V4-INBOUND",
description="This is IPv4 INBOUND rule set",
default_action="reject",
enable_default_log=True,
rules=[],
),
dict(
name="V4-OUTBOUND",
description="This is IPv4 OUTBOUND rule set",
default_action="accept",
enable_default_log=False,
rules=[],
),
],
),
],
state="merged",
),
)
commands = [
- "set firewall ipv6-name V6-INBOUND default-action 'reject'",
- "set firewall ipv6-name V6-INBOUND description 'This is IPv6 INBOUND rule set'",
- "set firewall ipv6-name V6-INBOUND enable-default-log",
- "set firewall ipv6-name V6-OUTBOUND default-action 'accept'",
- "set firewall ipv6-name V6-OUTBOUND description 'This is IPv6 OUTBOUND rule set'",
- "set firewall name V4-INBOUND default-action 'reject'",
- "set firewall name V4-INBOUND description 'This is IPv4 INBOUND rule set'",
- "set firewall name V4-INBOUND enable-default-log",
- "set firewall name V4-OUTBOUND default-action 'accept'",
- "set firewall name V4-OUTBOUND description 'This is IPv4 OUTBOUND rule set'",
+ "set firewall ipv6 name V6-INBOUND default-action 'reject'",
+ "set firewall ipv6 name V6-INBOUND description 'This is IPv6 INBOUND rule set'",
+ "set firewall ipv6 name V6-INBOUND default-log",
+ "set firewall ipv6 name V6-OUTBOUND default-action 'accept'",
+ "set firewall ipv6 name V6-OUTBOUND description 'This is IPv6 OUTBOUND rule set'",
+ "set firewall ipv4 name V4-INBOUND default-action 'reject'",
+ "set firewall ipv4 name V4-INBOUND description 'This is IPv4 INBOUND rule set'",
+ "set firewall ipv4 name V4-INBOUND default-log",
+ "set firewall ipv4 name V4-OUTBOUND default-action 'accept'",
+ "set firewall ipv4 name V4-OUTBOUND description 'This is IPv4 OUTBOUND rule set'",
]
self.execute_module(changed=True, commands=commands)
- def test_vyos_firewall_rule_set_02_merged(self):
+ def test_vyos_firewall_packet_length_merged_01(self):
+ """Test if new stanza packet-lenght is correctly applied"""
set_module_args(
dict(
config=[
dict(
afi="ipv6",
rule_sets=[
dict(
- name="V6-INBOUND",
- description="This is IPv6 INBOUND rule set",
- default_action="reject",
+ name="INBOUND",
+ description="This is IPv6 INBOUND rule set with a jump action",
+ default_action="accept",
enable_default_log=True,
- rules=[],
+ rules=[
+ dict(
+ number="101",
+ action="jump",
+ description="Rule 101 is configured by Ansible",
+ jump_target="PROTECT-RE",
+ packet_length_exclude=[dict(length=100), dict(length=200)],
+ packet_length=[dict(length=22)]
+ ),
+ ],
),
+ ],
+ )
+ ],
+ state="merged",
+ )
+ )
+ commands = [
+ "set firewall ipv6 name INBOUND default-action 'accept'",
+ "set firewall ipv6 name INBOUND description 'This is IPv6 INBOUND rule set with a jump action'",
+ "set firewall ipv6 name INBOUND default-log",
+ "set firewall ipv6 name INBOUND rule 101 packet-length-exclude 100",
+ "set firewall ipv6 name INBOUND rule 101 packet-length-exclude 200",
+ "set firewall ipv6 name INBOUND rule 101 packet-length 22",
+ "set firewall ipv6 name INBOUND rule 101 description 'Rule 101 is configured by Ansible'",
+ "set firewall ipv6 name INBOUND rule 101",
+ "set firewall ipv6 name INBOUND rule 101 action 'jump'",
+ "set firewall ipv6 name INBOUND rule 101 jump-target 'PROTECT-RE'",
+ ]
+ self.maxDiff = None
+ self.execute_module(changed=True, commands=commands)
+
+ def test_vyos_firewall_packet_length_replace_01(self):
+ """Test that stanza is correctly replaced
+ without touching the other stanzas
+ """
+ set_module_args(
+ dict(
+ config=[
+ dict(
+ afi="ipv4",
+ rule_sets=[
dict(
- name="V6-OUTBOUND",
- description="This is IPv6 OUTBOUND rule set",
+ name="V4-INGRESS",
+ description="This is IPv4 V4-INGRESS rule set",
default_action="accept",
- enable_default_log=False,
- rules=[],
+ enable_default_log=True,
+ rules=[
+ dict(
+ number="101",
+ action="accept",
+ description="Rule 101 is configured by Ansible",
+ packet_length_exclude=[dict(length=100), dict(length=200)],
+ packet_length=[dict(length=22)]
+ ),
+ ],
),
],
- ),
+ )
+ ],
+ state="replaced",
+ )
+ )
+ commands = [
+ "delete firewall ipv4 name V4-INGRESS rule 101",
+ "set firewall ipv4 name V4-INGRESS rule 101",
+ "set firewall ipv4 name V4-INGRESS rule 101 action 'accept'",
+ "set firewall ipv4 name V4-INGRESS rule 101 description 'Rule 101 is configured by Ansible'",
+ "set firewall ipv4 name V4-INGRESS rule 101 packet-length-exclude 100",
+ "set firewall ipv4 name V4-INGRESS rule 101 packet-length-exclude 200",
+ "set firewall ipv4 name V4-INGRESS rule 101 packet-length 22",
+ ]
+ self.maxDiff = None
+ self.execute_module(changed=True, commands=commands)
+
+ def test_vyos_firewall_filter_merged_01(self):
+ """Test if new stanza filter is correctly applied"""
+ set_module_args(
+ dict(
+ config=[
dict(
- afi="ipv4",
+ afi="ipv6",
rule_sets=[
dict(
- name="V4-INBOUND",
- description="This is IPv4 INBOUND rule set",
- default_action="reject",
+ filter="input",
+ description="This is IPv6 INBOUND rule set with a jump action",
+ default_action="accept",
enable_default_log=True,
- rules=[],
+ rules=[
+ dict(
+ number="101",
+ action="jump",
+ description="Rule 101 is configured by Ansible",
+ jump_target="PROTECT-RE",
+ packet_length_exclude=[dict(length=100), dict(length=200)],
+ packet_length=[dict(length=22)]
+ ),
+ ],
),
+ ],
+ )
+ ],
+ state="merged",
+ )
+ )
+ commands = [
+ "set firewall ipv6 input filter default-action 'accept'",
+ "set firewall ipv6 input filter description 'This is IPv6 INBOUND rule set with a jump action'",
+ "set firewall ipv6 input filter default-log",
+ "set firewall ipv6 input filter rule 101 packet-length-exclude 100",
+ "set firewall ipv6 input filter rule 101 packet-length-exclude 200",
+ "set firewall ipv6 input filter rule 101 packet-length 22",
+ "set firewall ipv6 input filter rule 101 description 'Rule 101 is configured by Ansible'",
+ "set firewall ipv6 input filter rule 101",
+ "set firewall ipv6 input filter rule 101 action 'jump'",
+ "set firewall ipv6 input filter rule 101 jump-target 'PROTECT-RE'",
+ ]
+ self.maxDiff = None
+ self.execute_module(changed=True, commands=commands)
+
+ def test_vyos_firewall_interface_merged_01(self):
+ """Test that the rule with a jump action is correctly applied"""
+ set_module_args(
+ dict(
+ config=[
+ dict(
+ afi="ipv6",
+ rule_sets=[
dict(
- name="V4-OUTBOUND",
- description="This is IPv4 OUTBOUND rule set",
+ name="V6-INGRESS",
+ description="This is IPv6 INBOUND rule set with a jump action",
default_action="accept",
- enable_default_log=False,
- rules=[],
+ rules=[
+ dict(
+ number="101",
+ action="jump",
+ description="Rule 101 is configured by Ansible",
+ jump_target="PROTECT-RE",
+ inbound_interface=dict(name="eth0"),
+ outbound_interface=dict(group="eth1"),
+ ),
+ ],
),
],
- ),
+ )
],
state="merged",
- ),
+ )
+ )
+ commands = [
+ "set firewall ipv6 name V6-INGRESS description 'This is IPv6 INBOUND rule set with a jump action'",
+ "set firewall ipv6 name V6-INGRESS rule 101 inbound-interface name eth0",
+ "set firewall ipv6 name V6-INGRESS rule 101 outbound-interface group eth1",
+ "set firewall ipv6 name V6-INGRESS rule 101 description 'Rule 101 is configured by Ansible'",
+ "set firewall ipv6 name V6-INGRESS rule 101",
+ "set firewall ipv6 name V6-INGRESS rule 101 action 'jump'",
+ "set firewall ipv6 name V6-INGRESS rule 101 jump-target 'PROTECT-RE'",
+ ]
+ self.maxDiff = None
+ self.execute_module(changed=True, commands=commands)
+
+ def test_vyos_firewall_interface_replace_02(self):
+ """Test that new stanza is correctly replaced
+ without touching the other stanzas
+ """
+ set_module_args(
+ dict(
+ config=[
+ dict(
+ afi="ipv4",
+ rule_sets=[
+ dict(
+ name="IF-TEST",
+ description="Changed",
+ rules=[
+ dict(
+ number="10",
+ action="accept",
+ description="Rule 10 is configured by Ansible",
+ inbound_interface=dict(name="eth1"),
+ ),
+ ],
+ ),
+ ],
+ )
+ ],
+ state="replaced",
+ )
)
commands = [
- "set firewall ipv6-name V6-INBOUND default-action 'reject'",
- "set firewall ipv6-name V6-INBOUND description 'This is IPv6 INBOUND rule set'",
- "set firewall ipv6-name V6-INBOUND enable-default-log",
- "set firewall ipv6-name V6-OUTBOUND default-action 'accept'",
- "set firewall ipv6-name V6-OUTBOUND description 'This is IPv6 OUTBOUND rule set'",
- "set firewall name V4-INBOUND default-action 'reject'",
- "set firewall name V4-INBOUND description 'This is IPv4 INBOUND rule set'",
- "set firewall name V4-INBOUND enable-default-log",
- "set firewall name V4-OUTBOUND default-action 'accept'",
- "set firewall name V4-OUTBOUND description 'This is IPv4 OUTBOUND rule set'",
+ "delete firewall ipv4 name IF-TEST rule 10",
+ "set firewall ipv4 name IF-TEST rule 10",
+ "set firewall ipv4 name IF-TEST description 'Changed'",
+ "set firewall ipv4 name IF-TEST rule 10 description 'Rule 10 is configured by Ansible'",
+ 'set firewall ipv4 name IF-TEST rule 10 inbound-interface name eth1',
+ "set firewall ipv4 name IF-TEST rule 10 action 'accept'",
]
+ self.maxDiff = None
self.execute_module(changed=True, commands=commands)
def test_vyos_firewall_v4_rule_sets_rule_merged_01(self):
+ """Test if plugin correctly adds new rules set and a rule with variant attributes"""
set_module_args(
dict(
config=[
dict(
afi="ipv4",
rule_sets=[
dict(
name="INBOUND",
description="This is IPv4 INBOUND rule set",
default_action="accept",
enable_default_log=True,
rules=[
dict(
number="101",
action="accept",
description="Rule 101 is configured by Ansible",
ipsec="match-ipsec",
log="disable",
protocol="icmp",
fragment="match-frag",
disable=True,
),
],
),
],
),
],
state="merged",
),
)
commands = [
- "set firewall name INBOUND default-action 'accept'",
- "set firewall name INBOUND description 'This is IPv4 INBOUND rule set'",
- "set firewall name INBOUND enable-default-log",
- "set firewall name INBOUND rule 101 protocol 'icmp'",
- "set firewall name INBOUND rule 101 description 'Rule 101 is configured by Ansible'",
- "set firewall name INBOUND rule 101 fragment 'match-frag'",
- "set firewall name INBOUND rule 101",
- "set firewall name INBOUND rule 101 disable",
- "set firewall name INBOUND rule 101 action 'accept'",
- "set firewall name INBOUND rule 101 ipsec 'match-ipsec'",
- "set firewall name INBOUND rule 101 log 'disable'",
+ "set firewall ipv4 name INBOUND default-action 'accept'",
+ "set firewall ipv4 name INBOUND description 'This is IPv4 INBOUND rule set'",
+ "set firewall ipv4 name INBOUND default-log",
+ "set firewall ipv4 name INBOUND rule 101",
+ "set firewall ipv4 name INBOUND rule 101 protocol 'icmp'",
+ "set firewall ipv4 name INBOUND rule 101 description 'Rule 101 is configured by Ansible'",
+ "set firewall ipv4 name INBOUND rule 101 fragment 'match-frag'",
+ "set firewall ipv4 name INBOUND rule 101 disable",
+ "set firewall ipv4 name INBOUND rule 101 action 'accept'",
+ "set firewall ipv4 name INBOUND rule 101 ipsec 'match-ipsec'",
]
self.execute_module(changed=True, commands=commands)
def test_vyos_firewall_v4_rule_sets_rule_merged_02(self):
+ """Test that a rule set is correctly applied
+ including variant attributes such as state
+ """
set_module_args(
dict(
config=[
dict(
afi="ipv4",
rule_sets=[
dict(
name="INBOUND",
rules=[
dict(
number="101",
protocol="tcp",
source=dict(
address="192.0.2.0",
mac_address="38:00:25:19:76:0c",
port=2127,
),
destination=dict(address="192.0.1.0", port=2124),
limit=dict(
burst=10,
rate=dict(number=20, unit="second"),
),
recent=dict(count=10, time=20),
state=dict(
established=True,
related=True,
invalid=True,
new=True,
),
),
],
),
],
),
],
state="merged",
),
)
commands = [
- "set firewall name INBOUND rule 101 protocol 'tcp'",
- "set firewall name INBOUND rule 101 destination address 192.0.1.0",
- "set firewall name INBOUND rule 101 destination port 2124",
- "set firewall name INBOUND rule 101",
- "set firewall name INBOUND rule 101 source address 192.0.2.0",
- "set firewall name INBOUND rule 101 source mac-address 38:00:25:19:76:0c",
- "set firewall name INBOUND rule 101 source port 2127",
- "set firewall name INBOUND rule 101 state new enable",
- "set firewall name INBOUND rule 101 state invalid enable",
- "set firewall name INBOUND rule 101 state related enable",
- "set firewall name INBOUND rule 101 state established enable",
- "set firewall name INBOUND rule 101 limit burst 10",
- "set firewall name INBOUND rule 101 limit rate 20/second",
- "set firewall name INBOUND rule 101 recent count 10",
- "set firewall name INBOUND rule 101 recent time 20",
+ "set firewall ipv4 name INBOUND rule 101 protocol 'tcp'",
+ "set firewall ipv4 name INBOUND rule 101 destination port 2124",
+ "set firewall ipv4 name INBOUND rule 101",
+ "set firewall ipv4 name INBOUND rule 101 destination address 192.0.1.0",
+ "set firewall ipv4 name INBOUND rule 101 source address 192.0.2.0",
+ "set firewall ipv4 name INBOUND rule 101 source mac-address 38:00:25:19:76:0c",
+ "set firewall ipv4 name INBOUND rule 101 source port 2127",
+ "set firewall ipv4 name INBOUND rule 101 state new",
+ "set firewall ipv4 name INBOUND rule 101 state invalid",
+ "set firewall ipv4 name INBOUND rule 101 state related",
+ "set firewall ipv4 name INBOUND rule 101 state established",
+ "set firewall ipv4 name INBOUND rule 101 limit burst 10",
+ "set firewall ipv4 name INBOUND rule 101 limit rate 20/second",
+ "set firewall ipv4 name INBOUND rule 101 recent count 10",
+ "set firewall ipv4 name INBOUND rule 101 recent time 20",
]
self.execute_module(changed=True, commands=commands)
def test_vyos_firewall_v4_rule_sets_rule_merged_03(self):
+ """Test if plugin correctly adds new rules with variant attributes
+ within existing rule set
+ """
set_module_args(
dict(
config=[
dict(
afi="ipv4",
rule_sets=[
dict(
name="INBOUND",
rules=[
dict(
number="101",
destination=dict(
group=dict(
address_group="OUT-ADDR-GROUP",
network_group="OUT-NET-GROUP",
port_group="OUT-PORT-GROUP",
),
),
source=dict(
group=dict(
address_group="IN-ADDR-GROUP",
network_group="IN-NET-GROUP",
port_group="IN-PORT-GROUP",
),
),
),
],
),
],
),
],
state="merged",
),
)
commands = [
- "set firewall name INBOUND rule 101 source group address-group IN-ADDR-GROUP",
- "set firewall name INBOUND rule 101 source group network-group IN-NET-GROUP",
- "set firewall name INBOUND rule 101 source group port-group IN-PORT-GROUP",
- "set firewall name INBOUND rule 101 destination group address-group OUT-ADDR-GROUP",
- "set firewall name INBOUND rule 101 destination group network-group OUT-NET-GROUP",
- "set firewall name INBOUND rule 101 destination group port-group OUT-PORT-GROUP",
- "set firewall name INBOUND rule 101",
+ "set firewall ipv4 name INBOUND rule 101 source group address-group IN-ADDR-GROUP",
+ "set firewall ipv4 name INBOUND rule 101 source group network-group IN-NET-GROUP",
+ "set firewall ipv4 name INBOUND rule 101 source group port-group IN-PORT-GROUP",
+ "set firewall ipv4 name INBOUND rule 101 destination group address-group OUT-ADDR-GROUP",
+ "set firewall ipv4 name INBOUND rule 101 destination group network-group OUT-NET-GROUP",
+ "set firewall ipv4 name INBOUND rule 101 destination group port-group OUT-PORT-GROUP",
+ "set firewall ipv4 name INBOUND rule 101",
]
self.execute_module(changed=True, commands=commands)
def test_vyos_firewall_v4_rule_sets_rule_merged_04(self):
+ """Test if plugin correctly adds new rules with variant attributes
+ within existing rule set
+ """
set_module_args(
dict(
config=[
dict(
afi="ipv4",
rule_sets=[
dict(
name="INBOUND",
rules=[
dict(
number="101",
time=dict(
monthdays="2",
startdate="2020-01-24",
starttime="13:20:00",
stopdate="2020-01-28",
stoptime="13:30:00",
weekdays="!Sat,Sun",
utc=True,
),
tcp=dict(
flags=[
dict(flag="all"),
]
),
),
],
),
],
),
],
state="merged",
),
)
commands = [
- "set firewall name INBOUND rule 101",
- "set firewall name INBOUND rule 101 tcp flags ALL",
- "set firewall name INBOUND rule 101 time utc",
- "set firewall name INBOUND rule 101 time monthdays 2",
- "set firewall name INBOUND rule 101 time startdate 2020-01-24",
- "set firewall name INBOUND rule 101 time stopdate 2020-01-28",
- "set firewall name INBOUND rule 101 time weekdays !Sat,Sun",
- "set firewall name INBOUND rule 101 time stoptime 13:30:00",
- "set firewall name INBOUND rule 101 time starttime 13:20:00",
+ "set firewall ipv4 name INBOUND rule 101",
+ "set firewall ipv4 name INBOUND rule 101 tcp flags all",
+ "set firewall ipv4 name INBOUND rule 101 time utc",
+ "set firewall ipv4 name INBOUND rule 101 time monthdays 2",
+ "set firewall ipv4 name INBOUND rule 101 time startdate 2020-01-24",
+ "set firewall ipv4 name INBOUND rule 101 time stopdate 2020-01-28",
+ "set firewall ipv4 name INBOUND rule 101 time weekdays !Sat,Sun",
+ "set firewall ipv4 name INBOUND rule 101 time stoptime 13:30:00",
+ "set firewall ipv4 name INBOUND rule 101 time starttime 13:20:00",
]
self.execute_module(changed=True, commands=commands)
def test_vyos_firewall_v6_rule_sets_rule_merged_01(self):
+ """Test if plugin correctly adds new ipv6 rules set and a rule with variant attributes"""
set_module_args(
dict(
config=[
dict(
afi="ipv6",
rule_sets=[
dict(
name="INBOUND",
description="This is IPv6 INBOUND rule set",
default_action="accept",
enable_default_log=True,
rules=[
dict(
number="101",
action="accept",
description="Rule 101 is configured by Ansible",
ipsec="match-ipsec",
protocol="icmp",
disable=True,
icmp=dict(type_name="echo-request"),
),
],
),
],
),
],
state="merged",
),
)
commands = [
- "set firewall ipv6-name INBOUND default-action 'accept'",
- "set firewall ipv6-name INBOUND description 'This is IPv6 INBOUND rule set'",
- "set firewall ipv6-name INBOUND enable-default-log",
- "set firewall ipv6-name INBOUND rule 101 protocol 'icmp'",
- "set firewall ipv6-name INBOUND rule 101 description 'Rule 101 is configured by Ansible'",
- "set firewall ipv6-name INBOUND rule 101",
- "set firewall ipv6-name INBOUND rule 101 disable",
- "set firewall ipv6-name INBOUND rule 101 action 'accept'",
- "set firewall ipv6-name INBOUND rule 101 ipsec 'match-ipsec'",
- "set firewall ipv6-name INBOUND rule 101 icmpv6 type echo-request",
+ "set firewall ipv6 name INBOUND default-action 'accept'",
+ "set firewall ipv6 name INBOUND description 'This is IPv6 INBOUND rule set'",
+ "set firewall ipv6 name INBOUND default-log",
+ "set firewall ipv6 name INBOUND rule 101 protocol 'icmp'",
+ "set firewall ipv6 name INBOUND rule 101 description 'Rule 101 is configured by Ansible'",
+ "set firewall ipv6 name INBOUND rule 101",
+ "set firewall ipv6 name INBOUND rule 101 disable",
+ "set firewall ipv6 name INBOUND rule 101 action 'accept'",
+ "set firewall ipv6 name INBOUND rule 101 ipsec 'match-ipsec'",
+ "set firewall ipv6 name INBOUND rule 101 icmpv6 type-name echo-request",
]
self.execute_module(changed=True, commands=commands)
def test_vyos_firewall_v6_rule_sets_rule_merged_02(self):
+ """Test if plugin correctly adds new rules with variant attributes
+ within existing ipv6 rule set
+ """
set_module_args(
dict(
config=[
dict(
afi="ipv6",
rule_sets=[
dict(
name="INBOUND",
rules=[
dict(
number="101",
protocol="tcp",
source=dict(
address="2001:db8::12",
mac_address="38:00:25:19:76:0c",
port=2127,
),
destination=dict(address="2001:db8::11", port=2124),
limit=dict(
burst=10,
rate=dict(number=20, unit="second"),
),
recent=dict(count=10, time=20),
state=dict(
established=True,
related=True,
invalid=True,
new=True,
),
),
],
),
],
),
],
state="merged",
),
)
commands = [
- "set firewall ipv6-name INBOUND rule 101 protocol 'tcp'",
- "set firewall ipv6-name INBOUND rule 101 destination address 2001:db8::11",
- "set firewall ipv6-name INBOUND rule 101 destination port 2124",
- "set firewall ipv6-name INBOUND rule 101",
- "set firewall ipv6-name INBOUND rule 101 source address 2001:db8::12",
- "set firewall ipv6-name INBOUND rule 101 source mac-address 38:00:25:19:76:0c",
- "set firewall ipv6-name INBOUND rule 101 source port 2127",
- "set firewall ipv6-name INBOUND rule 101 state new enable",
- "set firewall ipv6-name INBOUND rule 101 state invalid enable",
- "set firewall ipv6-name INBOUND rule 101 state related enable",
- "set firewall ipv6-name INBOUND rule 101 state established enable",
- "set firewall ipv6-name INBOUND rule 101 limit burst 10",
- "set firewall ipv6-name INBOUND rule 101 recent count 10",
- "set firewall ipv6-name INBOUND rule 101 recent time 20",
- "set firewall ipv6-name INBOUND rule 101 limit rate 20/second",
+ "set firewall ipv6 name INBOUND rule 101 protocol 'tcp'",
+ "set firewall ipv6 name INBOUND rule 101 destination address 2001:db8::11",
+ "set firewall ipv6 name INBOUND rule 101 destination port 2124",
+ "set firewall ipv6 name INBOUND rule 101",
+ "set firewall ipv6 name INBOUND rule 101 source address 2001:db8::12",
+ "set firewall ipv6 name INBOUND rule 101 source mac-address 38:00:25:19:76:0c",
+ "set firewall ipv6 name INBOUND rule 101 source port 2127",
+ "set firewall ipv6 name INBOUND rule 101 state new",
+ "set firewall ipv6 name INBOUND rule 101 state invalid",
+ "set firewall ipv6 name INBOUND rule 101 state related",
+ "set firewall ipv6 name INBOUND rule 101 state established",
+ "set firewall ipv6 name INBOUND rule 101 limit burst 10",
+ "set firewall ipv6 name INBOUND rule 101 recent count 10",
+ "set firewall ipv6 name INBOUND rule 101 recent time 20",
+ "set firewall ipv6 name INBOUND rule 101 limit rate 20/second",
]
self.execute_module(changed=True, commands=commands)
def test_vyos_firewall_v6_rule_sets_rule_merged_03(self):
+ """Test if plugin correctly adds new rules with variant attributes
+ within existing ipv6 rule set
+ """
set_module_args(
dict(
config=[
dict(
afi="ipv6",
rule_sets=[
dict(
name="INBOUND",
rules=[
dict(
number="101",
destination=dict(
group=dict(
address_group="OUT-ADDR-GROUP",
network_group="OUT-NET-GROUP",
port_group="OUT-PORT-GROUP",
),
),
source=dict(
group=dict(
address_group="IN-ADDR-GROUP",
network_group="IN-NET-GROUP",
port_group="IN-PORT-GROUP",
),
),
),
],
),
],
),
],
state="merged",
),
)
commands = [
- "set firewall ipv6-name INBOUND rule 101 source group address-group IN-ADDR-GROUP",
- "set firewall ipv6-name INBOUND rule 101 source group network-group IN-NET-GROUP",
- "set firewall ipv6-name INBOUND rule 101 source group port-group IN-PORT-GROUP",
- "set firewall ipv6-name INBOUND rule 101 destination group address-group OUT-ADDR-GROUP",
- "set firewall ipv6-name INBOUND rule 101 destination group network-group OUT-NET-GROUP",
- "set firewall ipv6-name INBOUND rule 101 destination group port-group OUT-PORT-GROUP",
- "set firewall ipv6-name INBOUND rule 101",
+ "set firewall ipv6 name INBOUND rule 101 source group address-group IN-ADDR-GROUP",
+ "set firewall ipv6 name INBOUND rule 101 source group network-group IN-NET-GROUP",
+ "set firewall ipv6 name INBOUND rule 101 source group port-group IN-PORT-GROUP",
+ "set firewall ipv6 name INBOUND rule 101 destination group address-group OUT-ADDR-GROUP",
+ "set firewall ipv6 name INBOUND rule 101 destination group network-group OUT-NET-GROUP",
+ "set firewall ipv6 name INBOUND rule 101 destination group port-group OUT-PORT-GROUP",
+ "set firewall ipv6 name INBOUND rule 101",
]
self.execute_module(changed=True, commands=commands)
def test_vyos_firewall_v6_rule_sets_rule_merged_04(self):
+ """Test that the plugin correctly applies configuration
+ within exsiting rule set
+ """
set_module_args(
dict(
config=[
dict(
afi="ipv6",
rule_sets=[
dict(
name="INBOUND",
rules=[
dict(
number="101",
time=dict(
monthdays="2",
startdate="2020-01-24",
starttime="13:20:00",
stopdate="2020-01-28",
stoptime="13:30:00",
weekdays="!Sat,Sun",
utc=True,
),
tcp=dict(
flags=[
dict(flag="all"),
]
),
),
dict(
number="102",
tcp=dict(
flags=[
dict(flag="ack"),
dict(flag="syn"),
dict(flag="fin", invert=True),
],
)
)
],
),
],
),
],
state="merged",
),
)
commands = [
- "set firewall ipv6-name INBOUND rule 101",
- "set firewall ipv6-name INBOUND rule 101 tcp flags ALL",
- "set firewall ipv6-name INBOUND rule 101 time utc",
- "set firewall ipv6-name INBOUND rule 101 time monthdays 2",
- "set firewall ipv6-name INBOUND rule 101 time startdate 2020-01-24",
- "set firewall ipv6-name INBOUND rule 101 time stopdate 2020-01-28",
- "set firewall ipv6-name INBOUND rule 101 time weekdays !Sat,Sun",
- "set firewall ipv6-name INBOUND rule 101 time stoptime 13:30:00",
- "set firewall ipv6-name INBOUND rule 101 time starttime 13:20:00",
- "set firewall ipv6-name INBOUND rule 102",
- "set firewall ipv6-name INBOUND rule 102 tcp flags ACK,SYN,!FIN",
+ "set firewall ipv6 name INBOUND rule 101",
+ "set firewall ipv6 name INBOUND rule 101 tcp flags all",
+ "set firewall ipv6 name INBOUND rule 101 time utc",
+ "set firewall ipv6 name INBOUND rule 101 time monthdays 2",
+ "set firewall ipv6 name INBOUND rule 101 time startdate 2020-01-24",
+ "set firewall ipv6 name INBOUND rule 101 time stopdate 2020-01-28",
+ "set firewall ipv6 name INBOUND rule 101 time weekdays !Sat,Sun",
+ "set firewall ipv6 name INBOUND rule 101 time stoptime 13:30:00",
+ "set firewall ipv6 name INBOUND rule 101 time starttime 13:20:00",
+ "set firewall ipv6 name INBOUND rule 102",
+ "set firewall ipv6 name INBOUND rule 102 tcp flags ack",
+ "set firewall ipv6 name INBOUND rule 102 tcp flags not fin",
+ "set firewall ipv6 name INBOUND rule 102 tcp flags syn",
]
self.execute_module(changed=True, commands=commands)
- def test_vyos_firewall_v6_rule_sets_rule_merged_icmp_01(self):
+ def test_vyos_firewall_v4_rule_sets_change_state_01(self):
+ """Test that a rule set is replaced applied without touching the other stanzas
+ in particular variant attributes such as state
+ """
set_module_args(
dict(
config=[
dict(
- afi="ipv6",
+ afi="ipv4",
rule_sets=[
dict(
- name="INBOUND",
+ name="IF-TEST",
rules=[
dict(
- number="101",
- protocol="icmp",
- icmp=dict(type_name="port-unreachable"),
- ),
- ],
- ),
- ],
+ number="10",
+ disable=False,
+ action="accept",
+ state=dict(
+ established=True,
+ new=True,
+ ),
+ ),
+ ],
+ ),
+ ],
+ ),
+ ],
+ state="replaced",
+ ),
+ )
+ commands = [
+ "delete firewall ipv4 name IF-TEST rule 10",
+ "set firewall ipv4 name IF-TEST rule 10",
+ "set firewall ipv4 name IF-TEST rule 10 state established",
+ "set firewall ipv4 name IF-TEST rule 10 state new",
+ "set firewall ipv4 name IF-TEST rule 10 action 'accept'",
+ ]
+ self.execute_module(changed=True, commands=commands)
+
+ def test_vyos_firewall_v6_rule_sets_rule_merged_icmp_01(self):
+ """Test if plugin correctly adds new rules with variant attributes
+ within existing ipv6 rule set
+ """
+ set_module_args(
+ dict(
+ config=[
+ dict(
+ afi="ipv6",
+ rule_sets=[
+ dict(
+ name="INBOUND",
+ rules=[
+ dict(
+ number="101",
+ protocol="icmp",
+ icmp=dict(type_name="port-unreachable"),
+ ),
+ ],
+ ),
+ ],
),
],
state="merged",
),
)
commands = [
- "set firewall ipv6-name INBOUND rule 101 icmpv6 type port-unreachable",
- "set firewall ipv6-name INBOUND rule 101 protocol 'icmp'",
- "set firewall ipv6-name INBOUND rule 101",
+ "set firewall ipv6 name INBOUND rule 101 icmpv6 type-name port-unreachable",
+ "set firewall ipv6 name INBOUND rule 101 protocol 'icmp'",
+ "set firewall ipv6 name INBOUND rule 101",
]
self.execute_module(changed=True, commands=commands)
def test_vyos_firewall_v4_rule_sets_rule_merged_icmp_01(self):
+ """Test if plugin correctly adds new rules with variant attributes
+ within existing rule set
+ """
set_module_args(
dict(
config=[
dict(
afi="ipv4",
rule_sets=[
dict(
name="INBOUND",
rules=[
dict(
number="101",
protocol="icmp",
icmp=dict(type=1, code=1),
),
],
),
],
),
],
state="merged",
),
)
commands = [
- "set firewall name INBOUND rule 101 icmp type 1",
- "set firewall name INBOUND rule 101 icmp code 1",
- "set firewall name INBOUND rule 101 protocol 'icmp'",
- "set firewall name INBOUND rule 101",
+ "set firewall ipv4 name INBOUND rule 101 icmp type 1",
+ "set firewall ipv4 name INBOUND rule 101 icmp code 1",
+ "set firewall ipv4 name INBOUND rule 101 protocol 'icmp'",
+ "set firewall ipv4 name INBOUND rule 101",
]
self.execute_module(changed=True, commands=commands)
def test_vyos_firewall_v4_rule_sets_rule_merged_icmp_02(self):
+ """Test if plugin correctly adds new rules with variant attributes
+ within existing rule set
+ """
set_module_args(
dict(
config=[
dict(
afi="ipv4",
rule_sets=[
dict(
name="INBOUND",
rules=[
dict(
number="101",
protocol="icmp",
icmp=dict(type_name="echo-request"),
),
],
),
],
),
],
state="merged",
),
)
commands = [
- "set firewall name INBOUND rule 101 icmp type-name echo-request",
- "set firewall name INBOUND rule 101 protocol 'icmp'",
- "set firewall name INBOUND rule 101",
+ "set firewall ipv4 name INBOUND rule 101 icmp type-name echo-request",
+ "set firewall ipv4 name INBOUND rule 101 protocol 'icmp'",
+ "set firewall ipv4 name INBOUND rule 101",
]
self.execute_module(changed=True, commands=commands)
def test_vyos_firewall_v4_rule_sets_del_01(self):
+ """Test if plugin correctly removes existing rule set
+ """
set_module_args(
dict(
config=[dict(afi="ipv4", rule_sets=[dict(name="V4-INGRESS")])],
state="deleted",
),
)
- commands = ["delete firewall name V4-INGRESS"]
+ commands = ["delete firewall ipv4 name V4-INGRESS"]
self.execute_module(changed=True, commands=commands)
def test_vyos_firewall_v4v6_rule_sets_del_02(self):
+ """Test if plugin correctly removes existing rule sets, both ipv4 and ipv6
+ """
set_module_args(
dict(
config=[
dict(afi="ipv4", rule_sets=[dict(name="V4-INGRESS")]),
dict(afi="ipv6", rule_sets=[dict(name="V6-INGRESS")]),
],
state="deleted",
),
)
commands = [
- "delete firewall name V4-INGRESS",
- "delete firewall ipv6-name V6-INGRESS",
+ "delete firewall ipv4 name V4-INGRESS",
+ "delete firewall ipv6 name V6-INGRESS",
]
self.execute_module(changed=True, commands=commands)
def test_vyos_firewall_v4v6_rule_sets_del_03(self):
+ """Test that the plugin correctly deprovisions
+ variant configuration
+ """
set_module_args(dict(config=[], state="deleted"))
- commands = ["delete firewall name", "delete firewall ipv6-name"]
+ commands = ["delete firewall ipv4", "delete firewall ipv6"]
self.execute_module(changed=True, commands=commands)
def test_vyos_firewall_v4v6_rule_sets_del_04(self):
+ """Test if plugin has no effect on non-existent rule sets
+ """
set_module_args(
dict(
config=[
dict(afi="ipv4", rule_sets=[dict(name="V4-ING")]),
dict(afi="ipv6", rule_sets=[dict(name="V6-ING")]),
],
state="deleted",
),
)
self.execute_module(changed=False, commands=[])
def test_vyos_firewall_v4v6_rule_sets_rule_rep_01(self):
+ """Test if plugin correctly replaces a particular rule set(s)
+ without affecting the others
+ """
set_module_args(
dict(
config=[
dict(
afi="ipv4",
rule_sets=[
dict(
name="V4-INGRESS",
description="This is IPv4 INGRESS rule set",
default_action="accept",
enable_default_log=True,
rules=[
dict(
number="101",
action="reject",
description="Rule 101 is configured by Ansible RM",
ipsec="match-ipsec",
protocol="tcp",
fragment="match-frag",
disable=False,
),
dict(
number="102",
action="accept",
description="Rule 102 is configured by Ansible RM",
protocol="icmp",
disable=True,
),
],
),
],
),
dict(
afi="ipv6",
rule_sets=[
dict(
name="V6-INGRESS",
default_action="accept",
description="This rule-set is configured by Ansible RM",
),
dict(
name="EGRESS",
default_action="reject",
description="This rule-set is configured by Ansible RM",
rules=[
dict(
icmp=dict(type_name="echo-request"),
number=20,
),
],
),
],
),
],
state="replaced",
),
)
commands = [
- "delete firewall name V4-INGRESS rule 101 disable",
- "set firewall name V4-INGRESS description 'This is IPv4 INGRESS rule set'",
- "set firewall name V4-INGRESS rule 101 protocol 'tcp'",
- "set firewall name V4-INGRESS rule 101 description 'Rule 101 is configured by Ansible RM'",
- "set firewall name V4-INGRESS rule 101 action 'reject'",
- "delete firewall name V4-INGRESS rule 101 log",
- "set firewall name V4-INGRESS rule 102 disable",
- "set firewall name V4-INGRESS rule 102 action 'accept'",
- "set firewall name V4-INGRESS rule 102 protocol 'icmp'",
- "set firewall name V4-INGRESS rule 102 description 'Rule 102 is configured by Ansible RM'",
- "set firewall name V4-INGRESS rule 102",
- "set firewall ipv6-name V6-INGRESS description 'This rule-set is configured by Ansible RM'",
- "set firewall ipv6-name EGRESS description 'This rule-set is configured by Ansible RM'",
+ "delete firewall ipv4 name V4-INGRESS rule 101",
+ "set firewall ipv4 name V4-INGRESS rule 101",
+ "set firewall ipv4 name V4-INGRESS description 'This is IPv4 INGRESS rule set'",
+ "set firewall ipv4 name V4-INGRESS rule 101 fragment 'match-frag'",
+ "set firewall ipv4 name V4-INGRESS rule 101 ipsec 'match-ipsec'",
+ "set firewall ipv4 name V4-INGRESS rule 101 protocol 'tcp'",
+ "set firewall ipv4 name V4-INGRESS rule 101 description 'Rule 101 is configured by Ansible RM'",
+ "set firewall ipv4 name V4-INGRESS rule 101 action 'reject'",
+ "set firewall ipv4 name V4-INGRESS rule 102 disable",
+ "set firewall ipv4 name V4-INGRESS rule 102 action 'accept'",
+ "set firewall ipv4 name V4-INGRESS rule 102 protocol 'icmp'",
+ "set firewall ipv4 name V4-INGRESS rule 102 description 'Rule 102 is configured by Ansible RM'",
+ "set firewall ipv4 name V4-INGRESS rule 102",
+ "set firewall ipv6 name V6-INGRESS description 'This rule-set is configured by Ansible RM'",
+ "set firewall ipv6 name EGRESS description 'This rule-set is configured by Ansible RM'",
]
self.execute_module(changed=True, commands=commands)
def test_vyos_firewall_v4v6_rule_sets_rule_rep_02(self):
+ """Test if plugin correctly replaces a particular rule(s) and rule set attribute(s)
+ without affecting the others
+ """
set_module_args(
dict(
config=[
dict(
afi="ipv4",
rule_sets=[
dict(
name="V4-INGRESS",
description="This is IPv4 V4-INGRESS rule set",
default_action="accept",
enable_default_log=False,
rules=[
dict(
number="101",
action="accept",
description="Rule 101 is configured by Ansible",
ipsec="match-ipsec",
protocol="icmp",
fragment="match-frag",
disable=True,
),
],
),
],
),
dict(
afi="ipv6",
rule_sets=[
dict(
name="V6-INGRESS",
default_action="accept",
),
dict(
name="EGRESS",
default_action="reject",
rules=[
dict(
icmp=dict(type_name="echo-request"),
number=20,
),
],
),
],
),
],
state="replaced",
),
)
commands = [
- "delete firewall name V4-INGRESS enable-default-log",
- "delete firewall name V4-INGRESS rule 101 log",
+ "delete firewall ipv4 name V4-INGRESS rule 101",
+ "delete firewall ipv4 name V4-INGRESS default-log",
+ "set firewall ipv4 name V4-INGRESS rule 101",
+ "set firewall ipv4 name V4-INGRESS rule 101 action 'accept'",
+ "set firewall ipv4 name V4-INGRESS rule 101 description 'Rule 101 is configured by Ansible'",
+ "set firewall ipv4 name V4-INGRESS rule 101 disable",
+ "set firewall ipv4 name V4-INGRESS rule 101 fragment 'match-frag'",
+ "set firewall ipv4 name V4-INGRESS rule 101 ipsec 'match-ipsec'",
+ "set firewall ipv4 name V4-INGRESS rule 101 protocol 'icmp'",
]
self.execute_module(changed=True, commands=commands)
def test_vyos_firewall_v4v6_rule_sets_rule_rep_idem_01(self):
+ """Test if plugin correctly has no effect if there is no change in the configuration
+ """
set_module_args(
dict(
config=[
dict(
afi="ipv4",
rule_sets=[
dict(
name="V4-INGRESS",
description="This is IPv4 V4-INGRESS rule set",
default_action="accept",
enable_default_log=True,
rules=[
dict(
number="101",
action="accept",
description="Rule 101 is configured by Ansible",
- ipsec="match-ipsec",
+ packet_length_exclude=[dict(length=100), dict(length=300)],
protocol="icmp",
- fragment="match-frag",
disable=True,
log="enable",
)
],
),
+ dict(
+ filter="input",
+ rules=[
+ dict(
+ number="1",
+ action="jump",
+ jump_target="INGRESS",
+ ),
+ ],
+ ),
+ dict(
+ filter="output",
+ rules=[
+ dict(
+ number="1",
+ action="jump",
+ jump_target="EGRESS",
+ ),
+ ],
+ ),
+ dict(
+ name="IF-TEST",
+ rules=[
+ dict(
+ number="10",
+ action="accept",
+ icmp=dict(type_name="echo-request"),
+ state=dict(related=True),
+ inbound_interface=dict(name="eth0"),
+ outbound_interface=dict(group="the-ethers"),
+ disable=True,
+ )
+ ],
+ ),
dict(
name="EGRESS",
default_action="reject",
),
],
),
dict(
afi="ipv6",
rule_sets=[
dict(
name="V6-INGRESS",
default_action="accept",
),
dict(
name="EGRESS",
default_action="reject",
rules=[
dict(
icmp=dict(type_name="echo-request"),
number=20,
),
],
),
+ dict(
+ filter="input",
+ rules=[
+ dict(
+ number="1",
+ action="jump",
+ jump_target="V6-INGRESS",
+ ),
+ ],
+ ),
+ dict(
+ filter="output",
+ rules=[
+ dict(
+ number="1",
+ action="jump",
+ jump_target="EGRESS",
+ ),
+ ],
+ ),
],
),
],
state="replaced",
),
)
self.execute_module(changed=False, commands=[])
def test_vyos_firewall_v4v6_rule_sets_rule_rep_idem_02(self):
+ """Test if plugin correctly has no effect if there is no change in the configuration
+ """
set_module_args(
dict(
config=[
dict(
afi="ipv4",
rule_sets=[
dict(
name="V4-INGRESS",
description="This is IPv4 V4-INGRESS rule set",
default_action="accept",
enable_default_log=True,
rules=[
dict(
number="101",
action="accept",
description="Rule 101 is configured by Ansible",
- ipsec="match-ipsec",
+ packet_length_exclude=[dict(length=100), dict(length=300)],
protocol="icmp",
- fragment="match-frag",
disable=True,
- log="enable"
- ),
+ log="enable",
+ )
],
),
],
),
],
state="replaced",
),
)
self.execute_module(changed=False, commands=[])
def test_vyos_firewall_v4v6_rule_sets_rule_mer_idem_01(self):
+ """Test if plugin correctly has no effect if there is no change in the configuration
+ """
set_module_args(
dict(
config=[
dict(
afi="ipv4",
rule_sets=[
dict(
name="V4-INGRESS",
description="This is IPv4 V4-INGRESS rule set",
default_action="accept",
enable_default_log=True,
rules=[
dict(
number="101",
action="accept",
description="Rule 101 is configured by Ansible",
- ipsec="match-ipsec",
+ packet_length_exclude=[dict(length=100), dict(length=300)],
protocol="icmp",
- fragment="match-frag",
+ disable=True,
+ log="enable",
+ )
+ ],
+ ),
+ dict(
+ filter="input",
+ rules=[
+ dict(
+ number="1",
+ action="jump",
+ jump_target="INGRESS",
+ ),
+ ],
+ ),
+ dict(
+ filter="output",
+ rules=[
+ dict(
+ number="1",
+ action="jump",
+ jump_target="EGRESS",
+ ),
+ ],
+ ),
+ dict(
+ name="IF-TEST",
+ rules=[
+ dict(
+ number="10",
+ action="accept",
+ icmp=dict(type_name="echo-request"),
+ state=dict(related=True),
+ inbound_interface=dict(name="eth0"),
+ outbound_interface=dict(group="the-ethers"),
disable=True,
)
],
),
dict(
name="EGRESS",
default_action="reject",
),
],
),
dict(
afi="ipv6",
rule_sets=[
dict(
name="V6-INGRESS",
default_action="accept",
),
dict(
name="EGRESS",
default_action="reject",
rules=[
dict(
icmp=dict(type_name="echo-request"),
number=20,
),
],
),
+ dict(
+ filter="input",
+ rules=[
+ dict(
+ number="1",
+ action="jump",
+ jump_target="V6-INGRESS",
+ ),
+ ],
+ ),
+ dict(
+ filter="output",
+ rules=[
+ dict(
+ number="1",
+ action="jump",
+ jump_target="EGRESS",
+ ),
+ ],
+ ),
],
),
],
state="merged",
),
)
self.execute_module(changed=False, commands=[])
def test_vyos_firewall_v4v6_rule_sets_rule_ovr_01(self):
+ """Test if plugin correctly resets the entire rule set if there is a change in the configuration
+ """
set_module_args(
dict(
config=[
dict(
afi="ipv4",
rule_sets=[
dict(
name="V4-IN",
description="This is IPv4 INGRESS rule set",
default_action="accept",
enable_default_log=True,
rules=[
dict(
number="1",
action="reject",
description="Rule 1 is configured by Ansible RM",
ipsec="match-ipsec",
log="enable",
protocol="tcp",
fragment="match-frag",
disable=False,
source=dict(
group=dict(
address_group="IN-ADDR-GROUP",
network_group="IN-NET-GROUP",
port_group="IN-PORT-GROUP",
),
),
),
dict(
number="2",
action="accept",
description="Rule 102 is configured by Ansible RM",
protocol="icmp",
disable=True,
),
],
),
],
),
dict(
afi="ipv6",
rule_sets=[
dict(
name="V6-IN",
default_action="accept",
description="This rule-set is configured by Ansible RM",
),
dict(
name="V6-EG",
default_action="reject",
description="This rule-set is configured by Ansible RM",
),
],
),
],
state="overridden",
),
)
commands = [
- "delete firewall ipv6-name V6-INGRESS",
- "delete firewall ipv6-name EGRESS",
- "delete firewall name V4-INGRESS",
- "delete firewall name EGRESS",
- "set firewall name V4-IN default-action 'accept'",
- "set firewall name V4-IN description 'This is IPv4 INGRESS rule set'",
- "set firewall name V4-IN enable-default-log",
- "set firewall name V4-IN rule 1 protocol 'tcp'",
- "set firewall name V4-IN rule 1 log 'enable'",
- "set firewall name V4-IN rule 1 description 'Rule 1 is configured by Ansible RM'",
- "set firewall name V4-IN rule 1 fragment 'match-frag'",
- "set firewall name V4-IN rule 1 source group address-group IN-ADDR-GROUP",
- "set firewall name V4-IN rule 1 source group network-group IN-NET-GROUP",
- "set firewall name V4-IN rule 1 source group port-group IN-PORT-GROUP",
- "set firewall name V4-IN rule 1",
- "set firewall name V4-IN rule 1 action 'reject'",
- "set firewall name V4-IN rule 1 ipsec 'match-ipsec'",
- "set firewall name V4-IN rule 2 disable",
- "set firewall name V4-IN rule 2 action 'accept'",
- "set firewall name V4-IN rule 2 protocol 'icmp'",
- "set firewall name V4-IN rule 2 description 'Rule 102 is configured by Ansible RM'",
- "set firewall name V4-IN rule 2",
- "set firewall ipv6-name V6-IN default-action 'accept'",
- "set firewall ipv6-name V6-IN description 'This rule-set is configured by Ansible RM'",
- "set firewall ipv6-name V6-EG default-action 'reject'",
- "set firewall ipv6-name V6-EG description 'This rule-set is configured by Ansible RM'",
+ "delete firewall ipv6 name V6-INGRESS",
+ "delete firewall ipv6 name EGRESS",
+ "delete firewall ipv4 name V4-INGRESS",
+ "delete firewall ipv4 name EGRESS",
+ "delete firewall ipv4 input filter",
+ "delete firewall ipv4 output filter",
+ "delete firewall ipv6 input filter",
+ "delete firewall ipv6 output filter",
+ "delete firewall ipv4 name IF-TEST",
+ "set firewall ipv4 name V4-IN default-action 'accept'",
+ "set firewall ipv4 name V4-IN description 'This is IPv4 INGRESS rule set'",
+ "set firewall ipv4 name V4-IN default-log",
+ "set firewall ipv4 name V4-IN rule 1 protocol 'tcp'",
+ "set firewall ipv4 name V4-IN rule 1 log",
+ "set firewall ipv4 name V4-IN rule 1 description 'Rule 1 is configured by Ansible RM'",
+ "set firewall ipv4 name V4-IN rule 1 fragment 'match-frag'",
+ "set firewall ipv4 name V4-IN rule 1 source group address-group IN-ADDR-GROUP",
+ "set firewall ipv4 name V4-IN rule 1 source group network-group IN-NET-GROUP",
+ "set firewall ipv4 name V4-IN rule 1 source group port-group IN-PORT-GROUP",
+ "set firewall ipv4 name V4-IN rule 1",
+ "set firewall ipv4 name V4-IN rule 1 action 'reject'",
+ "set firewall ipv4 name V4-IN rule 1 ipsec 'match-ipsec'",
+ "set firewall ipv4 name V4-IN rule 2 disable",
+ "set firewall ipv4 name V4-IN rule 2 action 'accept'",
+ "set firewall ipv4 name V4-IN rule 2 protocol 'icmp'",
+ "set firewall ipv4 name V4-IN rule 2 description 'Rule 102 is configured by Ansible RM'",
+ "set firewall ipv4 name V4-IN rule 2",
+ "set firewall ipv6 name V6-IN default-action 'accept'",
+ "set firewall ipv6 name V6-IN description 'This rule-set is configured by Ansible RM'",
+ "set firewall ipv6 name V6-EG default-action 'reject'",
+ "set firewall ipv6 name V6-EG description 'This rule-set is configured by Ansible RM'",
+ ]
+ self.execute_module(changed=True, commands=commands)
+
+ def test_vyos_firewall_v4v6_rule_sets_rule_ovr_02(self):
+ """Test that the plugin correctly resets the entire
+ rule sets configuration if changes are detected
+ """
+ set_module_args(
+ dict(
+ config=[
+ dict(
+ afi="ipv4",
+ rule_sets=[
+ dict(
+ name="V4-INGRESS",
+ description="This is IPv4 INGRESS rule set",
+ default_action="accept",
+ enable_default_log=True,
+ rules=[
+ dict(
+ number="101",
+ action="accept",
+ protocol="udp",
+ ),
+ ],
+ ),
+ ],
+ ),
+ dict(
+ afi="ipv6",
+ rule_sets=[
+ dict(
+ name="EGRESS",
+ default_action="reject",
+ description="This rule-set is configured by Ansible RM",
+ rules=[
+ dict(
+ number="20",
+ action="accept",
+ protocol="udp",
+ ),
+ ],
+ ),
+ ],
+ ),
+ ],
+ state="overridden",
+ ),
+ )
+ commands = [
+ "delete firewall ipv6 name V6-INGRESS",
+ "delete firewall ipv6 name EGRESS",
+ "delete firewall ipv4 name V4-INGRESS",
+ "delete firewall ipv4 name EGRESS",
+ "delete firewall ipv4 input filter",
+ "delete firewall ipv4 output filter",
+ "delete firewall ipv6 input filter",
+ "delete firewall ipv6 output filter",
+ "delete firewall ipv4 name IF-TEST",
+ "set firewall ipv4 name V4-INGRESS rule 101",
+ "set firewall ipv4 name V4-INGRESS default-log",
+ "set firewall ipv4 name V4-INGRESS description 'This is IPv4 INGRESS rule set'",
+ "set firewall ipv4 name V4-INGRESS default-action 'accept'",
+ "set firewall ipv4 name V4-INGRESS rule 101 protocol 'udp'",
+ "set firewall ipv4 name V4-INGRESS rule 101 action 'accept'",
+ "set firewall ipv6 name EGRESS description 'This rule-set is configured by Ansible RM'",
+ "set firewall ipv6 name EGRESS default-action 'reject'",
+ "set firewall ipv6 name EGRESS rule 20",
+ "set firewall ipv6 name EGRESS rule 20 protocol 'udp'",
+ "set firewall ipv6 name EGRESS rule 20 action 'accept'"
]
self.execute_module(changed=True, commands=commands)
def test_vyos_firewall_v4v6_rule_sets_rule_ovr_idem_01(self):
+ """Test that the plugin is idempotent in overridden state
+ if there are no changes to the rule sets
+ """
set_module_args(
dict(
config=[
dict(
afi="ipv4",
rule_sets=[
dict(
name="V4-INGRESS",
description="This is IPv4 V4-INGRESS rule set",
default_action="accept",
enable_default_log=True,
rules=[
dict(
number="101",
action="accept",
description="Rule 101 is configured by Ansible",
- ipsec="match-ipsec",
+ packet_length_exclude=[dict(length=100), dict(length=300)],
protocol="icmp",
- fragment="match-frag",
disable=True,
log="enable",
)
],
),
+ dict(
+ filter="input",
+ rules=[
+ dict(
+ number="1",
+ action="jump",
+ jump_target="INGRESS",
+ ),
+ ],
+ ),
+ dict(
+ filter="output",
+ rules=[
+ dict(
+ number="1",
+ action="jump",
+ jump_target="EGRESS",
+ ),
+ ],
+ ),
+ dict(
+ name="IF-TEST",
+ rules=[
+ dict(
+ number="10",
+ action="accept",
+ icmp=dict(type_name="echo-request"),
+ state=dict(related=True),
+ inbound_interface=dict(name="eth0"),
+ outbound_interface=dict(group="the-ethers"),
+ disable=True,
+ )
+ ],
+ ),
dict(
name="EGRESS",
default_action="reject",
),
],
),
dict(
afi="ipv6",
rule_sets=[
dict(
name="V6-INGRESS",
default_action="accept",
),
dict(
name="EGRESS",
default_action="reject",
rules=[
dict(
icmp=dict(type_name="echo-request"),
number=20,
),
],
),
+ dict(
+ filter="input",
+ rules=[
+ dict(
+ number="1",
+ action="jump",
+ jump_target="V6-INGRESS",
+ ),
+ ],
+ ),
+ dict(
+ filter="output",
+ rules=[
+ dict(
+ number="1",
+ action="jump",
+ jump_target="EGRESS",
+ ),
+ ],
+ ),
],
),
],
state="overridden",
),
)
self.execute_module(changed=False, commands=[])
def test_vyos_firewall_v6_rule_sets_rule_merged_01_version(self):
- self.get_os_version.return_value = "1.4"
+ """Test if plugin correctly adds ipv6 rule set with rules
+ """
set_module_args(
dict(
config=[
dict(
afi="ipv6",
rule_sets=[
dict(
name="INBOUND",
description="This is IPv6 INBOUND rule set",
default_action="accept",
enable_default_log=True,
rules=[
dict(
number="101",
action="accept",
description="Rule 101 is configured by Ansible",
ipsec="match-ipsec",
protocol="icmp",
disable=True,
icmp=dict(type_name="echo-request"),
log="enable",
),
dict(
number="102",
action="reject",
description="Rule 102 is configured by Ansible",
protocol="ipv6-icmp",
icmp=dict(type=7),
),
],
),
],
),
],
state="merged",
),
)
commands = [
"set firewall ipv6 name INBOUND default-action 'accept'",
"set firewall ipv6 name INBOUND description 'This is IPv6 INBOUND rule set'",
"set firewall ipv6 name INBOUND default-log",
"set firewall ipv6 name INBOUND rule 101 protocol 'icmp'",
"set firewall ipv6 name INBOUND rule 101 description 'Rule 101 is configured by Ansible'",
"set firewall ipv6 name INBOUND rule 101",
"set firewall ipv6 name INBOUND rule 101 disable",
"set firewall ipv6 name INBOUND rule 101 action 'accept'",
"set firewall ipv6 name INBOUND rule 101 ipsec 'match-ipsec'",
"set firewall ipv6 name INBOUND rule 101 icmpv6 type-name echo-request",
- "set firewall ipv6 name INBOUND rule 101 log 'enable'",
+ "set firewall ipv6 name INBOUND rule 101 log",
"set firewall ipv6 name INBOUND rule 102",
"set firewall ipv6 name INBOUND rule 102 action 'reject'",
"set firewall ipv6 name INBOUND rule 102 description 'Rule 102 is configured by Ansible'",
"set firewall ipv6 name INBOUND rule 102 protocol 'ipv6-icmp'",
'set firewall ipv6 name INBOUND rule 102 icmpv6 type 7',
]
self.execute_module(changed=True, commands=commands)
def test_vyos_firewall_jump_rules_merged_01(self):
- self.get_os_version.return_value = "1.4"
+ """Test if plugin correctly adds rule set with a jump action
+ """
set_module_args(
dict(
config=[
dict(
afi="ipv6",
rule_sets=[
dict(
name="INBOUND",
description="This is IPv6 INBOUND rule set with a jump action",
default_action="accept",
enable_default_log=True,
rules=[
dict(
number="101",
action="jump",
description="Rule 101 is configured by Ansible",
ipsec="match-ipsec",
protocol="icmp",
icmp=dict(type_name="echo-request"),
jump_target="PROTECT-RE",
packet_length_exclude=[dict(length=100), dict(length=200)]
),
dict(
number="102",
action="reject",
description="Rule 102 is configured by Ansible",
protocol="ipv6-icmp",
icmp=dict(type=7),
),
],
),
],
)
],
state="merged",
)
)
commands = [
"set firewall ipv6 name INBOUND default-action 'accept'",
"set firewall ipv6 name INBOUND description 'This is IPv6 INBOUND rule set with a jump action'",
"set firewall ipv6 name INBOUND default-log",
"set firewall ipv6 name INBOUND rule 101 protocol 'icmp'",
"set firewall ipv6 name INBOUND rule 101 packet-length-exclude 100",
"set firewall ipv6 name INBOUND rule 101 packet-length-exclude 200",
"set firewall ipv6 name INBOUND rule 101 description 'Rule 101 is configured by Ansible'",
"set firewall ipv6 name INBOUND rule 101",
"set firewall ipv6 name INBOUND rule 101 ipsec 'match-ipsec'",
"set firewall ipv6 name INBOUND rule 101 icmpv6 type-name echo-request",
"set firewall ipv6 name INBOUND rule 101 action 'jump'",
"set firewall ipv6 name INBOUND rule 101 jump-target 'PROTECT-RE'",
"set firewall ipv6 name INBOUND rule 102",
"set firewall ipv6 name INBOUND rule 102 action 'reject'",
"set firewall ipv6 name INBOUND rule 102 description 'Rule 102 is configured by Ansible'",
"set firewall ipv6 name INBOUND rule 102 protocol 'ipv6-icmp'",
'set firewall ipv6 name INBOUND rule 102 icmpv6 type 7',
]
self.execute_module(changed=True, commands=commands)
-
-class TestVyosFirewallRulesModule14(TestVyosModule):
- module = vyos_firewall_rules
-
- def setUp(self):
- super(TestVyosFirewallRulesModule14, self).setUp()
- self.mock_get_config = patch(
- "ansible_collections.ansible.netcommon.plugins.module_utils.network.common.network.Config.get_config"
- )
- self.get_config = self.mock_get_config.start()
-
- self.mock_load_config = patch(
- "ansible_collections.ansible.netcommon.plugins.module_utils.network.common.network.Config.load_config"
- )
- self.load_config = self.mock_load_config.start()
-
- self.mock_get_resource_connection_config = patch(
- "ansible_collections.ansible.netcommon.plugins.module_utils.network.common.cfg.base.get_resource_connection"
- )
- self.get_resource_connection_config = self.mock_get_resource_connection_config.start()
-
- self.mock_get_resource_connection_facts = patch(
- "ansible_collections.ansible.netcommon.plugins.module_utils.network.common.facts.facts.get_resource_connection"
- )
- self.get_resource_connection_facts = self.mock_get_resource_connection_facts.start()
- self.mock_execute_show_command = patch(
- "ansible_collections.vyos.vyos.plugins.module_utils.network.vyos.facts.static_routes.static_routes.Static_routesFacts.get_device_data"
- )
-
- self.mock_execute_show_command = patch(
- "ansible_collections.vyos.vyos.plugins.module_utils.network.vyos.facts.firewall_rules.firewall_rules.Firewall_rulesFacts.get_device_data"
- )
- self.execute_show_command = self.mock_execute_show_command.start()
-
- self.mock_get_os_version = patch(
- "ansible_collections.vyos.vyos.plugins.module_utils.network.vyos.config.firewall_rules.firewall_rules.get_os_version"
- )
- self.get_os_version = self.mock_get_os_version.start()
- self.get_os_version.return_value = "1.4"
- self.maxDiff = None
-
- def tearDown(self):
- super(TestVyosFirewallRulesModule14, self).tearDown()
- self.mock_get_resource_connection_config.stop()
- self.mock_get_resource_connection_facts.stop()
- self.mock_get_config.stop()
- self.mock_load_config.stop()
- self.mock_execute_show_command.stop()
- self.mock_get_os_version.stop()
-
- def load_fixtures(self, commands=None, filename=None):
- def load_from_file(*args, **kwargs):
- return load_fixture("vyos_firewall_rules_config_v14.cfg")
-
- self.execute_show_command.side_effect = load_from_file
-
- def test_vyos_firewall_packet_length_merged_01(self):
+ def test_vyos_firewall_log_merged_01(self):
+ """Test if new stanza log is correctly applied"""
set_module_args(
dict(
config=[
dict(
afi="ipv6",
rule_sets=[
dict(
name="INBOUND",
- description="This is IPv6 INBOUND rule set with a jump action",
+ description="This is IPv6 INBOUND rule set with a log",
default_action="accept",
enable_default_log=True,
rules=[
dict(
number="101",
- action="jump",
+ action="accept",
description="Rule 101 is configured by Ansible",
- jump_target="PROTECT-RE",
- packet_length_exclude=[dict(length=100), dict(length=200)],
- packet_length=[dict(length=22)]
+ log="enable",
),
],
),
],
)
],
state="merged",
)
)
commands = [
"set firewall ipv6 name INBOUND default-action 'accept'",
- "set firewall ipv6 name INBOUND description 'This is IPv6 INBOUND rule set with a jump action'",
+ "set firewall ipv6 name INBOUND description 'This is IPv6 INBOUND rule set with a log'",
"set firewall ipv6 name INBOUND default-log",
- "set firewall ipv6 name INBOUND rule 101 packet-length-exclude 100",
- "set firewall ipv6 name INBOUND rule 101 packet-length-exclude 200",
- "set firewall ipv6 name INBOUND rule 101 packet-length 22",
+ "set firewall ipv6 name INBOUND rule 101 log",
"set firewall ipv6 name INBOUND rule 101 description 'Rule 101 is configured by Ansible'",
"set firewall ipv6 name INBOUND rule 101",
- "set firewall ipv6 name INBOUND rule 101 action 'jump'",
- "set firewall ipv6 name INBOUND rule 101 jump-target 'PROTECT-RE'",
+ "set firewall ipv6 name INBOUND rule 101 action 'accept'",
]
self.maxDiff = None
self.execute_module(changed=True, commands=commands)
- def test_vyos_firewall_packet_length_replace_01(self):
+ def test_vyos_firewall_log_replace_01(self):
+ """Test that stanza is correctly replaced
+ without touching the other stanzas
+ """
set_module_args(
dict(
config=[
dict(
afi="ipv4",
rule_sets=[
dict(
name="V4-INGRESS",
description="This is IPv4 V4-INGRESS rule set",
default_action="accept",
enable_default_log=True,
rules=[
dict(
number="101",
action="accept",
description="Rule 101 is configured by Ansible",
packet_length_exclude=[dict(length=100), dict(length=200)],
- packet_length=[dict(length=22)]
+ packet_length=[dict(length=22)],
+ log="enable",
),
],
),
],
)
],
state="replaced",
)
)
commands = [
- "delete firewall ipv4 name V4-INGRESS rule 101 protocol",
- "delete firewall ipv4 name V4-INGRESS rule 101 disable",
- "delete firewall ipv4 name V4-INGRESS rule 101 packet-length-exclude 300",
+ "delete firewall ipv4 name V4-INGRESS rule 101",
+ "set firewall ipv4 name V4-INGRESS rule 101",
+ "set firewall ipv4 name V4-INGRESS rule 101 action 'accept'",
+ "set firewall ipv4 name V4-INGRESS rule 101 description 'Rule 101 is configured by Ansible'",
+ "set firewall ipv4 name V4-INGRESS rule 101 packet-length-exclude 100",
"set firewall ipv4 name V4-INGRESS rule 101 packet-length-exclude 200",
"set firewall ipv4 name V4-INGRESS rule 101 packet-length 22",
+ "set firewall ipv4 name V4-INGRESS rule 101 log",
]
self.maxDiff = None
self.execute_module(changed=True, commands=commands)
-
- def test_vyos_firewall_filter_merged_01(self):
- set_module_args(
- dict(
- config=[
- dict(
- afi="ipv6",
- rule_sets=[
- dict(
- filter="input",
- description="This is IPv6 INBOUND rule set with a jump action",
- default_action="accept",
- enable_default_log=True,
- rules=[
- dict(
- number="101",
- action="jump",
- description="Rule 101 is configured by Ansible",
- jump_target="PROTECT-RE",
- packet_length_exclude=[dict(length=100), dict(length=200)],
- packet_length=[dict(length=22)]
- ),
- ],
- ),
- ],
- )
- ],
- state="merged",
- )
- )
- commands = [
- "set firewall ipv6 input filter default-action 'accept'",
- "set firewall ipv6 input filter description 'This is IPv6 INBOUND rule set with a jump action'",
- "set firewall ipv6 input filter default-log",
- "set firewall ipv6 input filter rule 101 packet-length-exclude 100",
- "set firewall ipv6 input filter rule 101 packet-length-exclude 200",
- "set firewall ipv6 input filter rule 101 packet-length 22",
- "set firewall ipv6 input filter rule 101 description 'Rule 101 is configured by Ansible'",
- "set firewall ipv6 input filter rule 101",
- "set firewall ipv6 input filter rule 101 action 'jump'",
- "set firewall ipv6 input filter rule 101 jump-target 'PROTECT-RE'",
- ]
- self.maxDiff = None
- self.execute_module(changed=True, commands=commands)
-
- def test_vyos_firewall_interface_merged_01(self):
- set_module_args(
- dict(
- config=[
- dict(
- afi="ipv6",
- rule_sets=[
- dict(
- name="V6-INGRESS",
- description="This is IPv6 INBOUND rule set with a jump action",
- default_action="accept",
- rules=[
- dict(
- number="101",
- action="jump",
- description="Rule 101 is configured by Ansible",
- jump_target="PROTECT-RE",
- inbound_interface=dict(name="eth0"),
- outbound_interface=dict(group="eth1"),
- ),
- ],
- ),
- ],
- )
- ],
- state="merged",
- )
- )
- commands = [
- "set firewall ipv6 name V6-INGRESS description 'This is IPv6 INBOUND rule set with a jump action'",
- "set firewall ipv6 name V6-INGRESS rule 101 inbound-interface name eth0",
- "set firewall ipv6 name V6-INGRESS rule 101 outbound-interface group eth1",
- "set firewall ipv6 name V6-INGRESS rule 101 description 'Rule 101 is configured by Ansible'",
- "set firewall ipv6 name V6-INGRESS rule 101",
- "set firewall ipv6 name V6-INGRESS rule 101 action 'jump'",
- "set firewall ipv6 name V6-INGRESS rule 101 jump-target 'PROTECT-RE'",
- ]
- self.maxDiff = None
- self.execute_module(changed=True, commands=commands)
-
- def test_vyos_firewall_interface_replace_02(self):
- set_module_args(
- dict(
- config=[
- dict(
- afi="ipv4",
- rule_sets=[
- dict(
- name="IF-TEST",
- description="Changed",
- rules=[
- dict(
- number="10",
- action="accept",
- description="Rule 10 is configured by Ansible",
- inbound_interface=dict(name="eth1"),
- ),
- ],
- ),
- ],
- )
- ],
- state="replaced",
- )
- )
- commands = [
- "set firewall ipv4 name IF-TEST description 'Changed'",
- "set firewall ipv4 name IF-TEST rule 10 description 'Rule 10 is configured by Ansible'",
- 'set firewall ipv4 name IF-TEST rule 10 inbound-interface name eth1',
- "delete firewall ipv4 name IF-TEST rule 10 outbound-interface group",
- "delete firewall ipv4 name IF-TEST rule 10 disable",
- "delete firewall ipv4 name IF-TEST rule 10 state related",
- "delete firewall ipv4 name IF-TEST rule 10 icmp type-name echo-request",
- ]
- self.maxDiff = None
- self.execute_module(changed=True, commands=commands)
-
- def test_vyos_firewall_v4_rule_sets_rule_merged_02(self):
- set_module_args(
- dict(
- config=[
- dict(
- afi="ipv4",
- rule_sets=[
- dict(
- name="INBOUND",
- rules=[
- dict(
- number="101",
- protocol="tcp",
- source=dict(
- address="192.0.2.0",
- mac_address="38:00:25:19:76:0c",
- port=2127,
- ),
- destination=dict(address="192.0.1.0", port=2124),
- limit=dict(
- burst=10,
- rate=dict(number=20, unit="second"),
- ),
- recent=dict(count=10, time=20),
- state=dict(
- established=True,
- related=True,
- invalid=True,
- new=True,
- ),
- ),
- ],
- ),
- ],
- ),
- ],
- state="merged",
- ),
- )
- commands = [
- "set firewall ipv4 name INBOUND rule 101 protocol 'tcp'",
- "set firewall ipv4 name INBOUND rule 101 destination port 2124",
- "set firewall ipv4 name INBOUND rule 101",
- "set firewall ipv4 name INBOUND rule 101 destination address 192.0.1.0",
- "set firewall ipv4 name INBOUND rule 101 source address 192.0.2.0",
- "set firewall ipv4 name INBOUND rule 101 source mac-address 38:00:25:19:76:0c",
- "set firewall ipv4 name INBOUND rule 101 source port 2127",
- "set firewall ipv4 name INBOUND rule 101 state new",
- "set firewall ipv4 name INBOUND rule 101 state invalid",
- "set firewall ipv4 name INBOUND rule 101 state related",
- "set firewall ipv4 name INBOUND rule 101 state established",
- "set firewall ipv4 name INBOUND rule 101 limit burst 10",
- "set firewall ipv4 name INBOUND rule 101 limit rate 20/second",
- "set firewall ipv4 name INBOUND rule 101 recent count 10",
- "set firewall ipv4 name INBOUND rule 101 recent time 20",
- ]
- self.execute_module(changed=True, commands=commands)
-
- def test_vyos_firewall_v4_rule_sets_change_state_01(self):
- set_module_args(
- dict(
- config=[
- dict(
- afi="ipv4",
- rule_sets=[
- dict(
- name="IF-TEST",
- rules=[
- dict(
- number="10",
- disable=False,
- action="accept",
- state=dict(
- established=True,
- new=True,
- ),
- ),
- ],
- ),
- ],
- ),
- ],
- state="replaced",
- ),
- )
- commands = [
- "delete firewall ipv4 name IF-TEST rule 10 disable",
- "delete firewall ipv4 name IF-TEST rule 10 inbound-interface name",
- "delete firewall ipv4 name IF-TEST rule 10 icmp type-name echo-request",
- "delete firewall ipv4 name IF-TEST rule 10 outbound-interface group",
- "delete firewall ipv4 name IF-TEST rule 10 state related",
- "set firewall ipv4 name IF-TEST rule 10 state established",
- "set firewall ipv4 name IF-TEST rule 10 state new",
- ]
- self.execute_module(changed=True, commands=commands)
-
- def test_vyos_firewall_v4v6_rule_sets_del_03(self):
- set_module_args(dict(config=[], state="deleted"))
- commands = ["delete firewall ipv4", "delete firewall ipv6"]
- self.execute_module(changed=True, commands=commands)
-
- def test_vyos_firewall_v6_rule_sets_rule_merged_04(self):
- set_module_args(
- dict(
- config=[
- dict(
- afi="ipv6",
- rule_sets=[
- dict(
- name="INBOUND",
- rules=[
- dict(
- number="101",
- time=dict(
- monthdays="2",
- startdate="2020-01-24",
- starttime="13:20:00",
- stopdate="2020-01-28",
- stoptime="13:30:00",
- weekdays="!Sat,Sun",
- utc=True,
- ),
- tcp=dict(
- flags=[
- dict(flag="all"),
- ]
- ),
- ),
- dict(
- number="102",
- tcp=dict(
- flags=[
- dict(flag="ack"),
- dict(flag="syn"),
- dict(flag="fin", invert=True),
- ],
- )
- )
- ],
- ),
- ],
- ),
- ],
- state="merged",
- ),
- )
- commands = [
- "set firewall ipv6 name INBOUND rule 101",
- "set firewall ipv6 name INBOUND rule 101 tcp flags all",
- "set firewall ipv6 name INBOUND rule 101 time utc",
- "set firewall ipv6 name INBOUND rule 101 time monthdays 2",
- "set firewall ipv6 name INBOUND rule 101 time startdate 2020-01-24",
- "set firewall ipv6 name INBOUND rule 101 time stopdate 2020-01-28",
- "set firewall ipv6 name INBOUND rule 101 time weekdays !Sat,Sun",
- "set firewall ipv6 name INBOUND rule 101 time stoptime 13:30:00",
- "set firewall ipv6 name INBOUND rule 101 time starttime 13:20:00",
- "set firewall ipv6 name INBOUND rule 102",
- "set firewall ipv6 name INBOUND rule 102 tcp flags ack",
- "set firewall ipv6 name INBOUND rule 102 tcp flags not fin",
- "set firewall ipv6 name INBOUND rule 102 tcp flags syn",
- ]
- self.execute_module(changed=True, commands=commands)