CI: functional docker tests (#2056)

This commit is contained in:
mmetc 2023-02-20 14:55:56 +01:00 committed by GitHub
parent 19a01d20dd
commit 8fce946850
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
29 changed files with 1242 additions and 2 deletions

65
.github/workflows/docker-test.yml vendored Normal file
View file

@ -0,0 +1,65 @@
name: Test Docker images
on:
push:
branches:
- master
- releases/**
paths-ignore:
- 'README.md'
pull_request:
branches:
- master
- releases/**
paths-ignore:
- 'README.md'
jobs:
test_docker_image:
runs-on: ubuntu-latest
steps:
- name: Check out the repo
uses: actions/checkout@v3
with:
fetch-depth: 0
- name: Build flavors
id: prep
run: |
DOCKER_IMAGE=crowdsecurity/crowdsec
docker build --target full -t "$DOCKER_IMAGE:test" -f Dockerfile .
docker build --target slim -t "$DOCKER_IMAGE:test-slim" -f Dockerfile .
docker build --target full -t "$DOCKER_IMAGE:test-debian" -f Dockerfile.debian .
- name: "Setup Python"
uses: actions/setup-python@v4
with:
python-version: "3.x"
- name: "Install pipenv"
run: |
cd docker/test
python -m pip install --upgrade pipenv wheel
# - id: cache-pipenv
# uses: actions/cache@v3
# with:
# path: ~/.local/share/virtualenvs
# key: ${{ runner.os }}-pipenv-${{ hashFiles('**/Pipfile.lock') }}
- name: "Install dependencies"
if: steps.cache-pipenv.outputs.cache-hit != 'true'
run: |
cd docker/test
pipenv install --deploy --dev
docker network create net-test
- name: "Run tests"
env:
CROWDSEC_TEST_VERSION: test
CROWDSEC_TEST_FLAVORS: full,slim,debian
CROWDSEC_TEST_NETWORK: net-test
run: |
cd docker/test
pipenv run pytest --durations=0 --color=yes

View file

@ -38,6 +38,7 @@ jobs:
uses: docker/setup-qemu-action@v2 uses: docker/setup-qemu-action@v2
- name: Set up Docker Buildx - name: Set up Docker Buildx
uses: docker/setup-buildx-action@v2 uses: docker/setup-buildx-action@v2
- name: Login to DockerHub - name: Login to DockerHub
uses: docker/login-action@v2 uses: docker/login-action@v2
with: with:

5
.gitignore vendored
View file

@ -44,3 +44,8 @@ msi
*.msi *.msi
**/*.nupkg **/*.nupkg
*.tgz *.tgz
# Python
__pycache__
*.py[cod]
*.egg-info

View file

@ -30,8 +30,8 @@ since v1.4.2:
- `crowdsecurity/crowdsec:slim` - `crowdsecurity/crowdsec:slim`
Reduced size by 60%, does not include notifier plugins nor the GeoIP database. Reduced size by 60%, does not include notifier plugins nor the GeoIP database.
If you need these details on decisions, running `cscli hub upgrade` inside the If you need these details on decisions, run `cscli hub upgrade` inside the
container downloads the GeoIP database at runtime. container to download the GeoIP database at runtime.
### Debian (since v1.3.3) ### Debian (since v1.3.3)

11
docker/test/Pipfile Normal file
View file

@ -0,0 +1,11 @@
[packages]
pytest-dotenv = "*"
pytest-xdist = "*"
gnureadline = "*"
ipdb = "*"
pytest-cs = {ref = "main", git = "https://github.com/crowdsecurity/pytest-cs.git"}
[dev-packages]
[requires]
python_version = "3.10"

11
docker/test/default.env Normal file
View file

@ -0,0 +1,11 @@
# CROWDSEC_TEST_VERSION="test"
# CROWDSEC_TEST_VERSION="v1.5.0"
CROWDSEC_TEST_VERSION="dev"
# all of the following will be tests
# when using the "flavor" fixture
CROWDSEC_TEST_FLAVORS="full"
# CROWDSEC_TEST_FLAVORS="full,slim,debian"
# CROWDSEC_TEST_FLAVORS="full,slim,debian,geoip,plugins-debian-slim,debian-geoip,debian-plugins"
CROWDSEC_TEST_NETWORK="net-test"

View file

@ -0,0 +1,6 @@
[pytest]
# run all tests sequentially, drop to pdb on first failure
addopts = -n 0 --no-header --pdb --pdbcls=IPython.terminal.debugger:Pdb
env_files =
.env
default.env

7
docker/test/pytest.ini Normal file
View file

@ -0,0 +1,7 @@
[pytest]
# run all tests in parallel, compact output
addopts = -n 4 --no-header
required_plugins = pytest-xdist
env_files =
.env
default.env

View file

@ -0,0 +1,21 @@
#!/usr/bin/env python
import time
import pytest
import requests
pytestmark = pytest.mark.compose
def test_compose_simple(compose, datadir):
with compose(datadir / 'docker-compose.yml') as project:
j = project.ps()
assert len(j) == 1
assert j[0]['Name'] == 'test_compose-server-1'
assert j[0]['State'] == 'running'
assert j[0]['Publishers'][0]['TargetPort'] == 8000
port = j[0]['Publishers'][0]['PublishedPort']
# XXX: should retry with a timeout
time.sleep(.5)
assert requests.get(f'http://localhost:{port}').status_code == 200

View file

@ -0,0 +1,8 @@
version: "3"
services:
server:
image: python:alpine
command: python -m http.server 8000
ports:
- 8000

View file

@ -0,0 +1,11 @@
pytest_plugins = ("cs",)
def pytest_configure(config):
config.addinivalue_line(
'markers', 'docker: mark tests for lone or manually orchestrated containers'
)
config.addinivalue_line(
'markers', 'compose: mark tests for docker compose projects'
)

View file

@ -0,0 +1,37 @@
#!/usr/bin/env python
from http import HTTPStatus
import random
import pytest
from pytest_cs import wait_for_log, wait_for_http
pytestmark = pytest.mark.docker
def test_split_lapi_agent(crowdsec):
rand = str(random.randint(0, 10000))
lapiname = f'lapi-{rand}'
agentname = f'agent-{rand}'
lapi_env = {
'AGENT_USERNAME': 'testagent',
'AGENT_PASSWORD': 'testpassword',
}
agent_env = {
'AGENT_USERNAME': 'testagent',
'AGENT_PASSWORD': 'testpassword',
'DISABLE_LOCAL_API': 'true',
'LOCAL_API_URL': f'http://{lapiname}:8080',
}
with crowdsec(name=lapiname, environment=lapi_env) as lapi, crowdsec(name=agentname, environment=agent_env) as agent:
wait_for_log(lapi, "*CrowdSec Local API listening on 0.0.0.0:8080*")
wait_for_log(agent, "*Starting processing data*")
wait_for_http(lapi, 8080, '/health', want_status=HTTPStatus.OK)
res = agent.exec_run('cscli lapi status')
assert res.exit_code == 0
stdout = res.output.decode()
assert "You can successfully interact with Local API (LAPI)" in stdout

View file

@ -0,0 +1,61 @@
#!/usr/bin/env python
"""
Test bouncer management: pre-installed, run-time installation and removal.
"""
import hashlib
from http import HTTPStatus
import json
import pytest
from pytest_cs import wait_for_log, wait_for_http
pytestmark = pytest.mark.docker
def hex512(s):
"""Return the sha512 hash of a string as a hex string"""
return hashlib.sha512(s.encode()).hexdigest()
def test_register_bouncer_env(crowdsec, flavor):
"""Test installing bouncers at startup, from envvar"""
env = {
'BOUNCER_KEY_bouncer1name': 'bouncer1key',
'BOUNCER_KEY_bouncer2name': 'bouncer2key'
}
with crowdsec(flavor=flavor, environment=env) as cont:
wait_for_log(cont, "*Starting processing data*")
wait_for_http(cont, 8080, '/health', want_status=HTTPStatus.OK)
res = cont.exec_run('cscli bouncers list -o json')
assert res.exit_code == 0
j = json.loads(res.output)
assert len(j) == 2
bouncer1, bouncer2 = j
assert bouncer1['name'] == 'bouncer1name'
assert bouncer2['name'] == 'bouncer2name'
assert bouncer1['api_key'] == hex512('bouncer1key')
assert bouncer2['api_key'] == hex512('bouncer2key')
# add a second bouncer at runtime
res = cont.exec_run('cscli bouncers add bouncer3name -k bouncer3key')
assert res.exit_code == 0
res = cont.exec_run('cscli bouncers list -o json')
assert res.exit_code == 0
j = json.loads(res.output)
assert len(j) == 3
bouncer3 = j[2]
assert bouncer3['name'] == 'bouncer3name'
assert bouncer3['api_key'] == hex512('bouncer3key')
# remove all bouncers
res = cont.exec_run('cscli bouncers delete bouncer1name bouncer2name bouncer3name')
assert res.exit_code == 0
res = cont.exec_run('cscli bouncers list -o json')
assert res.exit_code == 0
j = json.loads(res.output)
assert len(j) == 0

View file

@ -0,0 +1,46 @@
#!/usr/bin/env python
from http import HTTPStatus
from pytest_cs import log_lines, wait_for_log, wait_for_http
import pytest
pytestmark = pytest.mark.docker
def test_no_capi(crowdsec, flavor):
"""Test no CAPI (disabled by default in tests)"""
env = {
'DISABLE_ONLINE_API': 'true',
}
with crowdsec(flavor=flavor, environment=env) as cont:
wait_for_log(cont, "*Starting processing data*")
wait_for_http(cont, 8080, '/health', want_status=HTTPStatus.OK)
res = cont.exec_run('cscli capi status')
assert res.exit_code == 1
assert "You can successfully interact with Central API (CAPI)" not in res.output.decode()
logs = log_lines(cont)
assert not any("Successfully registered to Central API (CAPI)" in line for line in logs)
assert not any("Registration to online API done" in line for line in logs)
def test_capi(crowdsec, flavor):
"""Test CAPI"""
env = {
'DISABLE_ONLINE_API': 'false',
}
with crowdsec(flavor=flavor, environment=env) as cont:
wait_for_log(cont, "*Starting processing data*")
wait_for_http(cont, 8080, '/health', want_status=HTTPStatus.OK)
res = cont.exec_run('cscli capi status')
assert res.exit_code == 0
assert "You can successfully interact with Central API (CAPI)" in res.output.decode()
wait_for_log(cont, [
"*Successfully registered to Central API (CAPI)*",
"*Registration to online API done*",
])

View file

@ -0,0 +1,52 @@
#!/usr/bin/env python
import datetime
from pytest_cs import wait_for_log, Status
import pytest
pytestmark = pytest.mark.docker
def test_cold_logs(crowdsec, tmp_path_factory, flavor):
env = {
'DSN': 'file:///var/log/toto.log',
}
logs = tmp_path_factory.mktemp("logs")
now = datetime.datetime.now() - datetime.timedelta(minutes=1)
with open(logs / "toto.log", "w") as f:
# like date '+%b %d %H:%M:%S' but in python
for i in range(10):
ts = (now + datetime.timedelta(seconds=i)).strftime('%b %d %H:%M:%S')
f.write(ts + ' sd-126005 sshd[12422]: Invalid user netflix from 1.1.1.172 port 35424\n')
volumes = {
logs / "toto.log": {'bind': '/var/log/toto.log', 'mode': 'ro'},
}
# missing type
with crowdsec(flavor=flavor, environment=env, volumes=volumes, wait_status=Status.EXITED) as cont:
wait_for_log(cont, "*-dsn requires a -type argument*")
env['TYPE'] = 'syslog'
with crowdsec(flavor=flavor, environment=env, volumes=volumes) as cont:
wait_for_log(cont, [
"*Adding file /var/log/toto.log to filelist*",
"*reading /var/log/toto.log at once*",
"*Ip 1.1.1.172 performed 'crowdsecurity/ssh-bf' (6 events over 5s)*",
"*crowdsec shutdown*"
])
def test_cold_logs_missing_dsn(crowdsec, flavor):
env = {
'TYPE': 'syslog',
}
with crowdsec(flavor=flavor, environment=env, wait_status=Status.EXITED) as cont:
wait_for_log(cont, "*-type requires a -dsn argument*")

View file

@ -0,0 +1,60 @@
#!/usr/bin/env python
"""
Test basic behavior of all the image variants
"""
from http import HTTPStatus
import pytest
from pytest_cs import wait_for_log, wait_for_http
pytestmark = pytest.mark.docker
def test_cscli_lapi(crowdsec, flavor):
"""Test if cscli can talk to lapi"""
with crowdsec(flavor=flavor) as cont:
wait_for_log(cont, "*Starting processing data*")
wait_for_http(cont, 8080, '/health', want_status=HTTPStatus.OK)
x = cont.exec_run('cscli lapi status')
assert x.exit_code == 0
stdout = x.output.decode()
assert "You can successfully interact with Local API (LAPI)" in stdout
def test_flavor_content(crowdsec, flavor):
"""Test flavor contents"""
with crowdsec(flavor=flavor) as cont:
wait_for_log(cont, "*Starting processing data*")
wait_for_http(cont, 8080, '/health', want_status=HTTPStatus.OK)
x = cont.exec_run('ls -1 /var/lib/crowdsec/data/')
assert x.exit_code == 0
stdout = x.output.decode()
if 'slim' in flavor or 'plugins' in flavor:
assert 'GeoLite2-City.mmdb' not in stdout
assert 'GeoLite2-ASN.mmdb' not in stdout
else:
assert 'GeoLite2-City.mmdb' in stdout
assert 'GeoLite2-ASN.mmdb' in stdout
assert 'crowdsec.db' in stdout
x = cont.exec_run(
'ls -1 /usr/local/lib/crowdsec/plugins/')
stdout = x.output.decode()
if 'slim' in flavor or 'geoip' in flavor:
# the exact return code and full message depend
# on the 'ls' implementation (busybox vs coreutils)
assert x.exit_code != 0
assert 'No such file or directory' in stdout
assert 'notification-email' not in stdout
assert 'notification-http' not in stdout
assert 'notification-slack' not in stdout
assert 'notification-splunk' not in stdout
else:
assert x.exit_code == 0
assert 'notification-email' in stdout
assert 'notification-http' in stdout
assert 'notification-slack' in stdout
assert 'notification-splunk' in stdout

View file

@ -0,0 +1,36 @@
#!/usr/bin/env python
"""
Smoke tests in case docker is not set up correctly or has connection issues.
"""
import subprocess
import pytest
pytestmark = pytest.mark.docker
def test_docker_cli_run():
"""Test if docker run works from the command line. Capture stdout too"""
res = subprocess.run(['docker', 'run', '--rm', 'hello-world'],
capture_output=True, text=True)
assert 0 == res.returncode
assert 'Hello from Docker!' in res.stdout
def test_docker_run(docker_client):
"""Test if docker run works from the python SDK."""
output = docker_client.containers.run('hello-world', remove=True)
lines = output.decode().splitlines()
assert "Hello from Docker!" in lines
def test_docker_run_detach(docker_client):
"""Test with python SDK (async)."""
cont = docker_client.containers.run('hello-world', detach=True)
assert cont.status == 'created'
assert cont.attrs['State']['ExitCode'] == 0
lines = cont.logs().decode().splitlines()
assert "Hello from Docker!" in lines
cont.remove(force=True)

View file

@ -0,0 +1,29 @@
#!/usr/bin/env python
"""
Test pre-installed hub items.
"""
from http import HTTPStatus
import json
from pytest_cs import wait_for_log, wait_for_http
import pytest
pytestmark = pytest.mark.docker
def test_preinstalled_hub(crowdsec, flavor):
"""Test hub objects installed in the entrypoint"""
with crowdsec(flavor=flavor) as cont:
wait_for_log(cont, "*Starting processing data*")
wait_for_http(cont, 8080, '/health', want_status=HTTPStatus.OK)
res = cont.exec_run('cscli hub list -o json')
assert res.exit_code == 0
j = json.loads(res.output)
collections = {c['name']: c for c in j['collections']}
assert collections['crowdsecurity/linux']['status'] == 'enabled'
parsers = {c['name']: c for c in j['parsers']}
assert parsers['crowdsecurity/whitelists']['status'] == 'enabled'
assert parsers['crowdsecurity/docker-logs']['status'] == 'enabled'

View file

@ -0,0 +1,77 @@
#!/usr/bin/env python
"""
Test collection management
"""
from http import HTTPStatus
import json
from pytest_cs import wait_for_log, wait_for_http
import pytest
pytestmark = pytest.mark.docker
def test_install_two_collections(crowdsec, flavor):
"""Test installing collections at startup"""
it1 = 'crowdsecurity/apache2'
it2 = 'crowdsecurity/asterisk'
env = {
'COLLECTIONS': f'{it1} {it2}'
}
with crowdsec(flavor=flavor, environment=env) as cont:
wait_for_http(cont, 8080, '/health', want_status=HTTPStatus.OK)
res = cont.exec_run('cscli collections list -o json')
assert res.exit_code == 0
j = json.loads(res.output)
items = {c['name']: c for c in j['collections']}
assert items[it1]['status'] == 'enabled'
assert items[it2]['status'] == 'enabled'
wait_for_log(cont, [
# f'*collections install "{it1}"*'
# f'*collections install "{it2}"*'
f'*Enabled collections : {it1}*',
f'*Enabled collections : {it2}*',
])
def test_disable_collection(crowdsec, flavor):
"""Test removing a pre-installed collection at startup"""
it = 'crowdsecurity/linux'
env = {
'DISABLE_COLLECTIONS': it
}
with crowdsec(flavor=flavor, environment=env) as cont:
wait_for_log(cont, "*Starting processing data*")
wait_for_http(cont, 8080, '/health', want_status=HTTPStatus.OK)
res = cont.exec_run('cscli collections list -o json')
assert res.exit_code == 0
j = json.loads(res.output)
items = {c['name'] for c in j['collections']}
assert it not in items
wait_for_log(cont, [
# f'*collections remove "{it}*",
f'*Removed symlink [[]{it}[]]*',
])
def test_install_and_disable_collection(crowdsec, flavor):
"""Declare a collection to install AND disable: disable wins"""
it = 'crowdsecurity/apache2'
env = {
'COLLECTIONS': it,
'DISABLE_COLLECTIONS': it,
}
with crowdsec(flavor=flavor, environment=env) as cont:
wait_for_log(cont, "*Starting processing data*")
wait_for_http(cont, 8080, '/health', want_status=HTTPStatus.OK)
res = cont.exec_run('cscli collections list -o json')
assert res.exit_code == 0
j = json.loads(res.output)
items = {c['name'] for c in j['collections']}
assert it not in items
logs = cont.logs().decode().splitlines()
# check that there was no attempt to install
assert not any(f'Enabled collections : {it}' in line for line in logs)

View file

@ -0,0 +1,76 @@
#!/usr/bin/env python
"""
Test parser management
"""
from http import HTTPStatus
import json
from pytest_cs import wait_for_log, wait_for_http
import pytest
pytestmark = pytest.mark.docker
def test_install_two_parsers(crowdsec, flavor):
"""Test installing parsers at startup"""
it1 = 'crowdsecurity/cpanel-logs'
it2 = 'crowdsecurity/cowrie-logs'
env = {
'PARSERS': f'{it1} {it2}'
}
with crowdsec(flavor=flavor, environment=env) as cont:
wait_for_log(cont, [
f'*parsers install "{it1}"*',
f'*parsers install "{it2}"*',
"*Starting processing data*"
])
wait_for_http(cont, 8080, '/health', want_status=HTTPStatus.OK)
res = cont.exec_run('cscli parsers list -o json')
assert res.exit_code == 0
j = json.loads(res.output)
items = {c['name']: c for c in j['parsers']}
assert items[it1]['status'] == 'enabled'
assert items[it2]['status'] == 'enabled'
# XXX check that the parser is preinstalled by default
def test_disable_parser(crowdsec, flavor):
"""Test removing a pre-installed parser at startup"""
it = 'crowdsecurity/whitelists'
env = {
'DISABLE_PARSERS': it
}
with crowdsec(flavor=flavor, environment=env) as cont:
wait_for_log(cont, [
f'*parsers remove "{it}"*',
"*Starting processing data*",
])
wait_for_http(cont, 8080, '/health', want_status=HTTPStatus.OK)
res = cont.exec_run('cscli parsers list -o json')
assert res.exit_code == 0
j = json.loads(res.output)
items = {c['name'] for c in j['parsers']}
assert it not in items
def test_install_and_disable_parser(crowdsec, flavor):
"""Declare a parser to install AND disable: disable wins"""
it = 'crowdsecurity/cpanel-logs'
env = {
'PARSERS': it,
'DISABLE_PARSERS': it,
}
with crowdsec(flavor=flavor, environment=env) as cont:
wait_for_log(cont, "*Starting processing data*")
wait_for_http(cont, 8080, '/health', want_status=HTTPStatus.OK)
res = cont.exec_run('cscli parsers list -o json')
assert res.exit_code == 0
j = json.loads(res.output)
items = {c['name'] for c in j['parsers']}
assert it not in items
logs = cont.logs().decode().splitlines()
# check that there was no attempt to install
assert not any(f'parsers install "{it}"' in line for line in logs)

View file

@ -0,0 +1,60 @@
#!/usr/bin/env python
"""
Test postoverflow management
"""
from http import HTTPStatus
import json
import pytest
from pytest_cs import wait_for_log, wait_for_http
pytestmark = pytest.mark.docker
def test_install_two_postoverflows(crowdsec, flavor):
"""Test installing postoverflows at startup"""
it1 = 'crowdsecurity/cdn-whitelist'
it2 = 'crowdsecurity/ipv6_to_range'
env = {
'POSTOVERFLOWS': f'{it1} {it2}'
}
with crowdsec(flavor=flavor, environment=env) as cont:
wait_for_log(cont, [
f'*postoverflows install "{it1}"*',
f'*postoverflows install "{it2}"*',
"*Starting processing data*"
])
wait_for_http(cont, 8080, '/health', want_status=HTTPStatus.OK)
res = cont.exec_run('cscli postoverflows list -o json')
assert res.exit_code == 0
j = json.loads(res.output)
items = {c['name']: c for c in j['postoverflows']}
assert items[it1]['status'] == 'enabled'
assert items[it2]['status'] == 'enabled'
def test_disable_postoverflow():
"""Test removing a pre-installed postoverflow at startup"""
pytest.skip("we don't preinstall postoverflows")
def test_install_and_disable_postoverflow(crowdsec, flavor):
"""Declare a postoverflow to install AND disable: disable wins"""
it = 'crowdsecurity/cdn-whitelist'
env = {
'POSTOVERFLOWS': it,
'DISABLE_POSTOVERFLOWS': it,
}
with crowdsec(flavor=flavor, environment=env) as cont:
wait_for_log(cont, "*Starting processing data*")
wait_for_http(cont, 8080, '/health', want_status=HTTPStatus.OK)
res = cont.exec_run('cscli postoverflows list -o json')
assert res.exit_code == 0
j = json.loads(res.output)
items = {c['name'] for c in j['postoverflows']}
assert it not in items
logs = cont.logs().decode().splitlines()
# check that there was no attempt to install
assert not any(f'postoverflows install "{it}"' in line for line in logs)

View file

@ -0,0 +1,75 @@
#!/usr/bin/env python
"""
Test scenario management
"""
from http import HTTPStatus
import json
from pytest_cs import wait_for_log, wait_for_http
import pytest
pytestmark = pytest.mark.docker
def test_install_two_scenarios(crowdsec, flavor):
"""Test installing scenarios at startup"""
it1 = 'crowdsecurity/cpanel-bf-attempt'
it2 = 'crowdsecurity/asterisk_bf'
env = {
'SCENARIOS': f'{it1} {it2}'
}
with crowdsec(flavor=flavor, environment=env) as cont:
wait_for_log(cont, [
f'*scenarios install "{it1}*"',
f'*scenarios install "{it2}*"',
"*Starting processing data*"
])
wait_for_http(cont, 8080, '/health', want_status=HTTPStatus.OK)
res = cont.exec_run('cscli scenarios list -o json')
assert res.exit_code == 0
j = json.loads(res.output)
items = {c['name']: c for c in j['scenarios']}
assert items[it1]['status'] == 'enabled'
assert items[it2]['status'] == 'enabled'
def test_disable_scenario(crowdsec, flavor):
"""Test removing a pre-installed scenario at startup"""
it = 'crowdsecurity/ssh-bf'
env = {
'DISABLE_SCENARIOS': it
}
with crowdsec(flavor=flavor, environment=env) as cont:
wait_for_log(cont, [
f'*scenarios remove "{it}"*',
"*Starting processing data*"
])
wait_for_http(cont, 8080, '/health', want_status=HTTPStatus.OK)
res = cont.exec_run('cscli scenarios list -o json')
assert res.exit_code == 0
j = json.loads(res.output)
items = {c['name'] for c in j['scenarios']}
assert it not in items
def test_install_and_disable_scenario(crowdsec, flavor):
"""Declare a scenario to install AND disable: disable wins"""
it = 'crowdsecurity/asterisk_bf'
env = {
'SCENARIOS': it,
'DISABLE_SCENARIOS': it,
}
with crowdsec(flavor=flavor, environment=env) as cont:
wait_for_log(cont, "*Starting processing data*")
wait_for_http(cont, 8080, '/health', want_status=HTTPStatus.OK)
res = cont.exec_run('cscli scenarios list -o json')
assert res.exit_code == 0
j = json.loads(res.output)
items = {c['name'] for c in j['scenarios']}
assert it not in items
logs = cont.logs().decode().splitlines()
# check that there was no attempt to install
assert not any(f'scenarios install "{it}"' in line for line in logs)

View file

@ -0,0 +1,65 @@
#!/usr/bin/env python
from http import HTTPStatus
from pytest_cs import wait_for_log, wait_for_http
import pytest
pytestmark = pytest.mark.docker
def test_local_api_url_default(crowdsec, flavor):
"""Test LOCAL_API_URL (default)"""
with crowdsec(flavor=flavor) as cont:
wait_for_log(cont, [
"*CrowdSec Local API listening on 0.0.0.0:8080*",
"*Starting processing data*"
])
wait_for_http(cont, 8080, '/health', want_status=HTTPStatus.OK)
res = cont.exec_run('cscli lapi status')
assert res.exit_code == 0
stdout = res.output.decode()
assert "on http://0.0.0.0:8080/" in stdout
assert "You can successfully interact with Local API (LAPI)" in stdout
def test_local_api_url(crowdsec, flavor):
"""Test LOCAL_API_URL (custom)"""
env = {
"LOCAL_API_URL": "http://127.0.0.1:8080"
}
with crowdsec(flavor=flavor, environment=env) as cont:
wait_for_log(cont, [
"*CrowdSec Local API listening on 0.0.0.0:8080*",
"*Starting processing data*"
])
wait_for_http(cont, 8080, '/health', want_status=HTTPStatus.OK)
res = cont.exec_run('cscli lapi status')
assert res.exit_code == 0
stdout = res.output.decode()
assert "on http://127.0.0.1:8080/" in stdout
assert "You can successfully interact with Local API (LAPI)" in stdout
def test_local_api_url_ipv6(crowdsec, flavor):
"""Test LOCAL_API_URL (custom with ipv6)"""
pytest.skip("ipv6 not supported yet")
# how to configure docker with ipv6 in a custom network?
# FIXME: https://forums.docker.com/t/assigning-default-ipv6-addresses/128665/3
# FIXME: https://github.com/moby/moby/issues/41438
env = {
"LOCAL_API_URL": "http://[::1]:8080"
}
with crowdsec(flavor=flavor, environment=env) as cont:
wait_for_log(cont, [
"*Starting processing data*",
"*CrowdSec Local API listening on 0.0.0.0:8080*",
])
wait_for_http(cont, 8080, '/health', want_status=HTTPStatus.OK)
res = cont.exec_run('cscli lapi status')
assert res.exit_code == 0
stdout = res.output.decode()
assert "on http://[::1]:8080/" in stdout
assert "You can successfully interact with Local API (LAPI)" in stdout

View file

@ -0,0 +1,77 @@
#!/usr/bin/env python
from http import HTTPStatus
from pytest_cs import wait_for_log, wait_for_http
import pytest
pytestmark = pytest.mark.docker
def test_metrics_port_default(crowdsec, flavor):
"""Test metrics"""
metrics_port = 6060
with crowdsec(flavor=flavor) as cont:
wait_for_log(cont, "*Starting processing data*")
wait_for_http(cont, 8080, '/health', want_status=HTTPStatus.OK)
wait_for_http(cont, metrics_port, '/metrics', want_status=HTTPStatus.OK)
res = cont.exec_run(f'wget -O - http://127.0.0.1:{metrics_port}/metrics')
if 'executable file not found' in res.output.decode():
# TODO: find an alternative to wget
pytest.skip('wget not found')
assert res.exit_code == 0
stdout = res.output.decode()
assert "# HELP cs_info Information about Crowdsec." in stdout
def test_metrics_port_default_ipv6(crowdsec, flavor):
"""Test metrics (ipv6)"""
pytest.skip('ipv6 not supported yet')
port = 6060
with crowdsec(flavor=flavor) as cont:
wait_for_log(cont, "*Starting processing data*")
wait_for_http(cont, 8080, '/health', want_status=HTTPStatus.OK)
res = cont.exec_run(f'wget -O - http://[::1]:{port}/metrics')
if 'executable file not found' in res.output.decode():
# TODO: find an alternative to wget
pytest.skip('wget not found')
assert res.exit_code == 0
stdout = res.output.decode()
assert "# HELP cs_info Information about Crowdsec." in stdout
def test_metrics_port(crowdsec, flavor):
"""Test metrics (custom METRICS_PORT)"""
port = 7070
env = {
"METRICS_PORT": port
}
with crowdsec(flavor=flavor, environment=env) as cont:
wait_for_log(cont, "*Starting processing data*")
wait_for_http(cont, 8080, '/health', want_status=HTTPStatus.OK)
res = cont.exec_run(f'wget -O - http://127.0.0.1:{port}/metrics')
if 'executable file not found' in res.output.decode():
# TODO: find an alternative to wget
pytest.skip('wget not found')
assert res.exit_code == 0
stdout = res.output.decode()
assert "# HELP cs_info Information about Crowdsec." in stdout
def test_metrics_port_ipv6(crowdsec, flavor):
"""Test metrics (custom METRICS_PORT, ipv6)"""
pytest.skip('ipv6 not supported yet')
port = 7070
env = {
"METRICS_PORT": port
}
with crowdsec(flavor=flavor, environment=env) as cont:
wait_for_log(cont, "*Starting processing data*")
wait_for_http(cont, 8080, '/health', want_status=HTTPStatus.OK)
res = cont.exec_run(f'wget -O - http://[::1]:{port}/metrics')
if 'executable file not found' in res.output.decode():
# TODO: find an alternative to wget
pytest.skip('wget not found')
assert res.exit_code == 0
stdout = res.output.decode()
assert "# HELP cs_info Information about Crowdsec." in stdout

View file

@ -0,0 +1,22 @@
#!/usr/bin/env python
from http import HTTPStatus
from pytest_cs import wait_for_log, wait_for_http
import pytest
pytestmark = pytest.mark.docker
def test_no_agent(crowdsec, flavor):
"""Test DISABLE_AGENT=true"""
env = {
'DISABLE_AGENT': 'true',
}
with crowdsec(flavor=flavor, environment=env) as cont:
wait_for_log(cont, "*CrowdSec Local API listening on 0.0.0.0:8080*")
wait_for_http(cont, 8080, '/health', want_status=HTTPStatus.OK)
res = cont.exec_run('cscli lapi status')
assert res.exit_code == 0
stdout = res.output.decode()
assert "You can successfully interact with Local API (LAPI)" in stdout

View file

@ -0,0 +1,19 @@
#!/usr/bin/env python
from pytest_cs import wait_for_log, Status
import pytest
pytestmark = pytest.mark.docker
def test_no_agent(crowdsec, flavor):
"""Test DISABLE_LOCAL_API=true (failing stand-alone container)"""
env = {
'DISABLE_LOCAL_API': 'true',
}
# if an alternative lapi url is not defined, the container should exit
with crowdsec(flavor=flavor, environment=env, wait_status=Status.EXITED) as cont:
wait_for_log(cont, "*dial tcp 0.0.0.0:8080: connect: connection refused*")

View file

@ -0,0 +1,18 @@
#!/usr/bin/env python
from pytest_cs import log_waiters
import pytest
pytestmark = pytest.mark.docker
# XXX this is redundant, already tested in pytest_cs
def test_crowdsec(crowdsec):
with crowdsec() as cont:
for waiter in log_waiters(cont):
with waiter as matcher:
matcher.fnmatch_lines(["*Starting processing data*"])
res = cont.exec_run('sh -c "echo $CI_TESTING"')
assert res.exit_code == 0
assert 'true' == res.output.decode().strip()

View file

@ -0,0 +1,237 @@
#!/usr/bin/env python
"""
Test agent-lapi and cscli-lapi communication via TLS, on the same container.
"""
from http import HTTPStatus
import random
from pytest_cs import wait_for_log, Status, wait_for_http
import pytest
pytestmark = pytest.mark.docker
def test_missing_key_file(crowdsec, flavor):
"""Test that cscli and agent can communicate to LAPI with TLS"""
env = {
'CERT_FILE': '/etc/ssl/crowdsec/cert.pem',
'USE_TLS': 'true',
}
with crowdsec(flavor=flavor, environment=env, wait_status=Status.EXITED) as cont:
# XXX: this message appears twice, is that normal?
wait_for_log(cont, "*while serving local API: missing TLS key file*")
def test_missing_cert_file(crowdsec, flavor):
"""Test that cscli and agent can communicate to LAPI with TLS"""
env = {
'KEY_FILE': '/etc/ssl/crowdsec/cert.key',
'USE_TLS': 'true',
}
with crowdsec(flavor=flavor, environment=env, wait_status=Status.EXITED) as cont:
wait_for_log(cont, "*while serving local API: missing TLS cert file*")
def test_tls_missing_ca(crowdsec, flavor, certs_dir):
"""Missing CA cert, unknown authority"""
env = {
'CERT_FILE': '/etc/ssl/crowdsec/lapi.crt',
'KEY_FILE': '/etc/ssl/crowdsec/lapi.key',
'USE_TLS': 'true',
'LOCAL_API_URL': 'https://localhost:8080',
}
volumes = {
certs_dir(lapi_hostname='lapi'): {'bind': '/etc/ssl/crowdsec', 'mode': 'ro'},
}
with crowdsec(flavor=flavor, environment=env, volumes=volumes, wait_status=Status.EXITED) as cont:
wait_for_log(cont, "*certificate signed by unknown authority*")
def test_tls_legacy_var(crowdsec, flavor, certs_dir):
"""Test server-only certificate, legacy variables"""
env = {
'CACERT_FILE': '/etc/ssl/crowdsec/ca.crt',
'CERT_FILE': '/etc/ssl/crowdsec/lapi.crt',
'KEY_FILE': '/etc/ssl/crowdsec/lapi.key',
'USE_TLS': 'true',
'LOCAL_API_URL': 'https://localhost:8080',
}
volumes = {
certs_dir(lapi_hostname='lapi'): {'bind': '/etc/ssl/crowdsec', 'mode': 'ro'},
}
with crowdsec(flavor=flavor, environment=env, volumes=volumes) as cont:
wait_for_log(cont, "*Starting processing data*")
# TODO: wait_for_https
wait_for_http(cont, 8080, '/health', want_status=None)
x = cont.exec_run('cscli lapi status')
assert x.exit_code == 0
stdout = x.output.decode()
assert "You can successfully interact with Local API (LAPI)" in stdout
def test_tls_mutual_monolith(crowdsec, flavor, certs_dir):
"""Server and client certificates, on the same container"""
env = {
'CACERT_FILE': '/etc/ssl/crowdsec/ca.crt',
'LAPI_CERT_FILE': '/etc/ssl/crowdsec/lapi.crt',
'LAPI_KEY_FILE': '/etc/ssl/crowdsec/lapi.key',
'CLIENT_CERT_FILE': '/etc/ssl/crowdsec/agent.crt',
'CLIENT_KEY_FILE': '/etc/ssl/crowdsec/agent.key',
'USE_TLS': 'true',
'LOCAL_API_URL': 'https://localhost:8080',
}
volumes = {
certs_dir(lapi_hostname='lapi'): {'bind': '/etc/ssl/crowdsec', 'mode': 'ro'},
}
with crowdsec(flavor=flavor, environment=env, volumes=volumes) as cont:
wait_for_log(cont, "*Starting processing data*")
# TODO: wait_for_https
wait_for_http(cont, 8080, '/health', want_status=None)
x = cont.exec_run('cscli lapi status')
assert x.exit_code == 0
stdout = x.output.decode()
assert "You can successfully interact with Local API (LAPI)" in stdout
def test_tls_lapi_var(crowdsec, flavor, certs_dir):
"""Test server-only certificate, lapi variables"""
env = {
'CACERT_FILE': '/etc/ssl/crowdsec/ca.crt',
'LAPI_CERT_FILE': '/etc/ssl/crowdsec/lapi.crt',
'LAPI_KEY_FILE': '/etc/ssl/crowdsec/lapi.key',
'USE_TLS': 'true',
'LOCAL_API_URL': 'https://localhost:8080',
}
volumes = {
certs_dir(lapi_hostname='lapi'): {'bind': '/etc/ssl/crowdsec', 'mode': 'ro'},
}
with crowdsec(flavor=flavor, environment=env, volumes=volumes) as cont:
wait_for_log(cont, "*Starting processing data*")
# TODO: wait_for_https
wait_for_http(cont, 8080, '/health', want_status=None)
x = cont.exec_run('cscli lapi status')
assert x.exit_code == 0
stdout = x.output.decode()
assert "You can successfully interact with Local API (LAPI)" in stdout
# TODO: bad lapi hostname
# the cert is valid, but has a CN that doesn't match the hostname
# we must set insecure_skip_verify to true to use it
# TODO: bad client OU, auth failure
# the client cert is valid, but the organization unit doesn't match the allowed
# value and will be rejected by the lapi unless we set agents_allow_ou
def test_tls_split_lapi_agent(crowdsec, flavor, certs_dir):
"""Server-only certificate, split containers"""
rand = random.randint(0, 10000)
lapiname = 'lapi-' + str(rand)
agentname = 'agent-' + str(rand)
lapi_env = {
'USE_TLS': 'true',
'CACERT_FILE': '/etc/ssl/crowdsec/ca.crt',
'LAPI_CERT_FILE': '/etc/ssl/crowdsec/lapi.crt',
'LAPI_KEY_FILE': '/etc/ssl/crowdsec/lapi.key',
'AGENT_USERNAME': 'testagent',
'AGENT_PASSWORD': 'testpassword',
'LOCAL_API_URL': 'https://localhost:8080',
}
agent_env = {
'USE_TLS': 'true',
'CACERT_FILE': '/etc/ssl/crowdsec/ca.crt',
'AGENT_USERNAME': 'testagent',
'AGENT_PASSWORD': 'testpassword',
'LOCAL_API_URL': f'https://{lapiname}:8080',
'DISABLE_LOCAL_API': 'true',
'CROWDSEC_FEATURE_DISABLE_HTTP_RETRY_BACKOFF': 'false',
}
volumes = {
certs_dir(lapi_hostname=lapiname): {'bind': '/etc/ssl/crowdsec', 'mode': 'ro'},
}
with crowdsec(flavor=flavor, name=lapiname, environment=lapi_env, volumes=volumes) as lapi, crowdsec(flavor=flavor, name=agentname, environment=agent_env, volumes=volumes) as agent:
wait_for_log(lapi, [
"*(tls) Client Auth Type set to VerifyClientCertIfGiven*",
"*CrowdSec Local API listening on 0.0.0.0:8080*"
])
# TODO: wait_for_https
wait_for_http(lapi, 8080, '/health', want_status=None)
wait_for_log(agent, "*Starting processing data*")
res = agent.exec_run('cscli lapi status')
assert res.exit_code == 0
stdout = res.output.decode()
assert "You can successfully interact with Local API (LAPI)" in stdout
res = lapi.exec_run('cscli lapi status')
assert res.exit_code == 0
stdout = res.output.decode()
assert "You can successfully interact with Local API (LAPI)" in stdout
def test_tls_mutual_split_lapi_agent(crowdsec, flavor, certs_dir):
"""Server and client certificates, split containers"""
rand = random.randint(0, 10000)
lapiname = 'lapi-' + str(rand)
agentname = 'agent-' + str(rand)
lapi_env = {
'USE_TLS': 'true',
'CACERT_FILE': '/etc/ssl/crowdsec/ca.crt',
'LAPI_CERT_FILE': '/etc/ssl/crowdsec/lapi.crt',
'LAPI_KEY_FILE': '/etc/ssl/crowdsec/lapi.key',
'LOCAL_API_URL': 'https://localhost:8080',
}
agent_env = {
'USE_TLS': 'true',
'CACERT_FILE': '/etc/ssl/crowdsec/ca.crt',
'CLIENT_CERT_FILE': '/etc/ssl/crowdsec/agent.crt',
'CLIENT_KEY_FILE': '/etc/ssl/crowdsec/agent.key',
'LOCAL_API_URL': f'https://{lapiname}:8080',
'DISABLE_LOCAL_API': 'true',
'CROWDSEC_FEATURE_DISABLE_HTTP_RETRY_BACKOFF': 'false',
}
volumes = {
certs_dir(lapi_hostname=lapiname): {'bind': '/etc/ssl/crowdsec', 'mode': 'ro'},
}
with crowdsec(flavor=flavor, name=lapiname, environment=lapi_env, volumes=volumes) as lapi, crowdsec(flavor=flavor, name=agentname, environment=agent_env, volumes=volumes) as agent:
wait_for_log(lapi, [
"*(tls) Client Auth Type set to VerifyClientCertIfGiven*",
"*CrowdSec Local API listening on 0.0.0.0:8080*"
])
# TODO: wait_for_https
wait_for_http(lapi, 8080, '/health', want_status=None)
wait_for_log(agent, "*Starting processing data*")
res = agent.exec_run('cscli lapi status')
assert res.exit_code == 0
stdout = res.output.decode()
assert "You can successfully interact with Local API (LAPI)" in stdout
res = lapi.exec_run('cscli lapi status')
assert res.exit_code == 0
stdout = res.output.decode()
assert "You can successfully interact with Local API (LAPI)" in stdout

View file

@ -0,0 +1,47 @@
#!/usr/bin/env python
from http import HTTPStatus
from pytest_cs import wait_for_log, wait_for_http
import pytest
pytestmark = pytest.mark.docker
def test_use_wal_default(crowdsec, flavor):
"""Test USE_WAL default"""
with crowdsec(flavor=flavor) as cont:
wait_for_log(cont, "*Starting processing data*")
wait_for_http(cont, 8080, '/health', want_status=HTTPStatus.OK)
res = cont.exec_run('cscli config show --key Config.DbConfig.UseWal -o json')
assert res.exit_code == 0
stdout = res.output.decode()
assert "false" in stdout
def test_use_wal_true(crowdsec, flavor):
"""Test USE_WAL=true"""
env = {
'USE_WAL': 'true',
}
with crowdsec(flavor=flavor, environment=env) as cont:
wait_for_log(cont, "*Starting processing data*")
wait_for_http(cont, 8080, '/health', want_status=HTTPStatus.OK)
res = cont.exec_run('cscli config show --key Config.DbConfig.UseWal -o json')
assert res.exit_code == 0
stdout = res.output.decode()
assert "true" in stdout
def test_use_wal_false(crowdsec, flavor):
"""Test USE_WAL=false"""
env = {
'USE_WAL': 'false',
}
with crowdsec(flavor=flavor, environment=env) as cont:
wait_for_log(cont, "*Starting processing data*")
wait_for_http(cont, 8080, '/health', want_status=HTTPStatus.OK)
res = cont.exec_run('cscli config show --key Config.DbConfig.UseWal -o json')
assert res.exit_code == 0
stdout = res.output.decode()
assert "false" in stdout