|
@@ -6,11 +6,8 @@ Test collection management
|
|
|
|
|
|
from http import HTTPStatus
|
|
|
import json
|
|
|
-import os
|
|
|
-import pwd
|
|
|
|
|
|
import pytest
|
|
|
-import yaml
|
|
|
|
|
|
pytestmark = pytest.mark.docker
|
|
|
|
|
@@ -85,12 +82,7 @@ def test_taint_bubble_up(crowdsec, tmp_path_factory, flavor):
|
|
|
'COLLECTIONS': f'{coll}'
|
|
|
}
|
|
|
|
|
|
- hub = tmp_path_factory.mktemp("hub")
|
|
|
- volumes = {
|
|
|
- hub: {'bind': '/etc/crowdsec/hub', 'mode': 'rw'}
|
|
|
- }
|
|
|
-
|
|
|
- with crowdsec(flavor=flavor, environment=env, volumes=volumes) as cs:
|
|
|
+ with crowdsec(flavor=flavor, environment=env) as cs:
|
|
|
cs.wait_for_http(8080, '/health', want_status=HTTPStatus.OK)
|
|
|
res = cs.cont.exec_run('cscli collections list -o json')
|
|
|
assert res.exit_code == 0
|
|
@@ -102,25 +94,13 @@ def test_taint_bubble_up(crowdsec, tmp_path_factory, flavor):
|
|
|
f'*Enabled collections : {coll}*',
|
|
|
])
|
|
|
|
|
|
- # change file permissions to allow edit
|
|
|
- current_uid = pwd.getpwuid(os.getuid()).pw_uid
|
|
|
- res = cs.cont.exec_run(f'chown -R {current_uid} /etc/crowdsec/hub')
|
|
|
- assert res.exit_code == 0
|
|
|
-
|
|
|
- scenario = 'crowdsecurity/http-crawl-non_statics'
|
|
|
- scenario_file = hub / f'scenarios/{scenario}.yaml'
|
|
|
+ scenario = 'crowdsecurity/http-crawl-non_statics'
|
|
|
|
|
|
- with open(scenario_file) as f:
|
|
|
- yml = yaml.safe_load(f)
|
|
|
-
|
|
|
- yml['description'] += ' (tainted)'
|
|
|
- # won't be able to read it back because description is taken from the index
|
|
|
-
|
|
|
- with open(scenario_file, 'w') as f:
|
|
|
- yaml.dump(yml, f)
|
|
|
+ # the description won't be read back, it's from the index
|
|
|
+ yq_command = f"yq -e -i '.description=\"tainted\"' /etc/crowdsec/hub/scenarios/{scenario}.yaml"
|
|
|
+ res = cs.cont.exec_run(yq_command)
|
|
|
+ assert res.exit_code == 0
|
|
|
|
|
|
- with crowdsec(flavor=flavor, environment=env, volumes=volumes) as cs:
|
|
|
- cs.wait_for_http(8080, '/health', want_status=HTTPStatus.OK)
|
|
|
res = cs.cont.exec_run(f'cscli scenarios inspect {scenario} -o json')
|
|
|
assert res.exit_code == 0
|
|
|
j = json.loads(res.output)
|