|
@@ -7,44 +7,57 @@ from desecapi.tests.base import AuthenticatedRRSetBaseTestCase
|
|
|
|
|
|
|
|
|
|
class AuthenticatedRRSetBulkTestCase(AuthenticatedRRSetBaseTestCase):
|
|
class AuthenticatedRRSetBulkTestCase(AuthenticatedRRSetBaseTestCase):
|
|
-
|
|
|
|
@classmethod
|
|
@classmethod
|
|
def setUpTestDataWithPdns(cls):
|
|
def setUpTestDataWithPdns(cls):
|
|
super().setUpTestDataWithPdns()
|
|
super().setUpTestDataWithPdns()
|
|
|
|
|
|
cls.data = [
|
|
cls.data = [
|
|
- {'subname': 'my-cname', 'records': ['example.com.'], 'ttl': 3600, 'type': 'CNAME'},
|
|
|
|
- {'subname': 'my-bulk', 'records': ['desec.io.', 'foobar.example.'], 'ttl': 3600, 'type': 'PTR'},
|
|
|
|
|
|
+ {
|
|
|
|
+ "subname": "my-cname",
|
|
|
|
+ "records": ["example.com."],
|
|
|
|
+ "ttl": 3600,
|
|
|
|
+ "type": "CNAME",
|
|
|
|
+ },
|
|
|
|
+ {
|
|
|
|
+ "subname": "my-bulk",
|
|
|
|
+ "records": ["desec.io.", "foobar.example."],
|
|
|
|
+ "ttl": 3600,
|
|
|
|
+ "type": "PTR",
|
|
|
|
+ },
|
|
]
|
|
]
|
|
|
|
|
|
cls.data_no_records = copy.deepcopy(cls.data)
|
|
cls.data_no_records = copy.deepcopy(cls.data)
|
|
- cls.data_no_records[1].pop('records')
|
|
|
|
|
|
+ cls.data_no_records[1].pop("records")
|
|
|
|
|
|
cls.data_empty_records = copy.deepcopy(cls.data)
|
|
cls.data_empty_records = copy.deepcopy(cls.data)
|
|
- cls.data_empty_records[1]['records'] = []
|
|
|
|
|
|
+ cls.data_empty_records[1]["records"] = []
|
|
|
|
|
|
cls.data_no_subname = copy.deepcopy(cls.data)
|
|
cls.data_no_subname = copy.deepcopy(cls.data)
|
|
- cls.data_no_subname[0].pop('subname')
|
|
|
|
|
|
+ cls.data_no_subname[0].pop("subname")
|
|
|
|
|
|
cls.data_no_ttl = copy.deepcopy(cls.data)
|
|
cls.data_no_ttl = copy.deepcopy(cls.data)
|
|
- cls.data_no_ttl[0].pop('ttl')
|
|
|
|
|
|
+ cls.data_no_ttl[0].pop("ttl")
|
|
|
|
|
|
cls.data_no_type = copy.deepcopy(cls.data)
|
|
cls.data_no_type = copy.deepcopy(cls.data)
|
|
- cls.data_no_type[1].pop('type')
|
|
|
|
|
|
+ cls.data_no_type[1].pop("type")
|
|
|
|
|
|
cls.data_no_records_no_ttl = copy.deepcopy(cls.data_no_records)
|
|
cls.data_no_records_no_ttl = copy.deepcopy(cls.data_no_records)
|
|
- cls.data_no_records_no_ttl[1].pop('ttl')
|
|
|
|
|
|
+ cls.data_no_records_no_ttl[1].pop("ttl")
|
|
|
|
|
|
cls.data_no_subname_empty_records = copy.deepcopy(cls.data_no_subname)
|
|
cls.data_no_subname_empty_records = copy.deepcopy(cls.data_no_subname)
|
|
- cls.data_no_subname_empty_records[0]['records'] = []
|
|
|
|
|
|
+ cls.data_no_subname_empty_records[0]["records"] = []
|
|
|
|
|
|
cls.bulk_domain = cls.create_domain(owner=cls.owner)
|
|
cls.bulk_domain = cls.create_domain(owner=cls.owner)
|
|
for data in cls.data:
|
|
for data in cls.data:
|
|
cls.create_rr_set(cls.bulk_domain, **data)
|
|
cls.create_rr_set(cls.bulk_domain, **data)
|
|
|
|
|
|
def test_bulk_post_my_rr_sets(self):
|
|
def test_bulk_post_my_rr_sets(self):
|
|
- with self.assertPdnsRequests(self.requests_desec_rr_sets_update(name=self.my_empty_domain.name)):
|
|
|
|
- response = self.client.bulk_post_rr_sets(domain_name=self.my_empty_domain.name, payload=self.data)
|
|
|
|
|
|
+ with self.assertPdnsRequests(
|
|
|
|
+ self.requests_desec_rr_sets_update(name=self.my_empty_domain.name)
|
|
|
|
+ ):
|
|
|
|
+ response = self.client.bulk_post_rr_sets(
|
|
|
|
+ domain_name=self.my_empty_domain.name, payload=self.data
|
|
|
|
+ )
|
|
self.assertStatus(response, status.HTTP_201_CREATED)
|
|
self.assertStatus(response, status.HTTP_201_CREATED)
|
|
|
|
|
|
response = self.client.get_rr_sets(self.my_empty_domain.name)
|
|
response = self.client.get_rr_sets(self.my_empty_domain.name)
|
|
@@ -53,26 +66,33 @@ class AuthenticatedRRSetBulkTestCase(AuthenticatedRRSetBaseTestCase):
|
|
|
|
|
|
# Check subname requirement on bulk endpoint (and uniqueness at the same time)
|
|
# Check subname requirement on bulk endpoint (and uniqueness at the same time)
|
|
self.assertResponse(
|
|
self.assertResponse(
|
|
- self.client.bulk_post_rr_sets(domain_name=self.my_empty_domain.name, payload=self.data_no_subname),
|
|
|
|
|
|
+ self.client.bulk_post_rr_sets(
|
|
|
|
+ domain_name=self.my_empty_domain.name, payload=self.data_no_subname
|
|
|
|
+ ),
|
|
status.HTTP_400_BAD_REQUEST,
|
|
status.HTTP_400_BAD_REQUEST,
|
|
[
|
|
[
|
|
- {'subname': ['This field is required.']},
|
|
|
|
- {'non_field_errors': ['Another RRset with the same subdomain and type exists for this domain.']}
|
|
|
|
- ]
|
|
|
|
|
|
+ {"subname": ["This field is required."]},
|
|
|
|
+ {
|
|
|
|
+ "non_field_errors": [
|
|
|
|
+ "Another RRset with the same subdomain and type exists for this domain."
|
|
|
|
+ ]
|
|
|
|
+ },
|
|
|
|
+ ],
|
|
)
|
|
)
|
|
|
|
|
|
def test_bulk_post_rr_sets_empty_records(self):
|
|
def test_bulk_post_rr_sets_empty_records(self):
|
|
expected_response_data = [copy.deepcopy(self.data_empty_records[0]), None]
|
|
expected_response_data = [copy.deepcopy(self.data_empty_records[0]), None]
|
|
- expected_response_data[0]['domain'] = self.my_empty_domain.name
|
|
|
|
- expected_response_data[0]['name'] = '%s.%s.' % (self.data_empty_records[0]['subname'],
|
|
|
|
- self.my_empty_domain.name)
|
|
|
|
|
|
+ expected_response_data[0]["domain"] = self.my_empty_domain.name
|
|
|
|
+ expected_response_data[0]["name"] = "%s.%s." % (
|
|
|
|
+ self.data_empty_records[0]["subname"],
|
|
|
|
+ self.my_empty_domain.name,
|
|
|
|
+ )
|
|
self.assertResponse(
|
|
self.assertResponse(
|
|
- self.client.bulk_post_rr_sets(domain_name=self.my_empty_domain.name, payload=self.data_empty_records),
|
|
|
|
|
|
+ self.client.bulk_post_rr_sets(
|
|
|
|
+ domain_name=self.my_empty_domain.name, payload=self.data_empty_records
|
|
|
|
+ ),
|
|
status.HTTP_400_BAD_REQUEST,
|
|
status.HTTP_400_BAD_REQUEST,
|
|
- [
|
|
|
|
- {},
|
|
|
|
- {'records': ['This field must not be empty when using POST.']}
|
|
|
|
- ]
|
|
|
|
|
|
+ [{}, {"records": ["This field must not be empty when using POST."]}],
|
|
)
|
|
)
|
|
|
|
|
|
def test_bulk_post_existing_rrsets(self):
|
|
def test_bulk_post_existing_rrsets(self):
|
|
@@ -82,33 +102,62 @@ class AuthenticatedRRSetBulkTestCase(AuthenticatedRRSetBaseTestCase):
|
|
payload=self.data,
|
|
payload=self.data,
|
|
),
|
|
),
|
|
status.HTTP_400_BAD_REQUEST,
|
|
status.HTTP_400_BAD_REQUEST,
|
|
- 2 * [{
|
|
|
|
- 'non_field_errors': ['Another RRset with the same subdomain and type exists for this domain.']
|
|
|
|
- }]
|
|
|
|
|
|
+ 2
|
|
|
|
+ * [
|
|
|
|
+ {
|
|
|
|
+ "non_field_errors": [
|
|
|
|
+ "Another RRset with the same subdomain and type exists for this domain."
|
|
|
|
+ ]
|
|
|
|
+ }
|
|
|
|
+ ],
|
|
)
|
|
)
|
|
|
|
|
|
def test_bulk_post_duplicates(self):
|
|
def test_bulk_post_duplicates(self):
|
|
data = 2 * [self.data[0]] + [self.data[1]]
|
|
data = 2 * [self.data[0]] + [self.data[1]]
|
|
self.assertResponse(
|
|
self.assertResponse(
|
|
- self.client.bulk_post_rr_sets(domain_name=self.my_empty_domain.name, payload=data),
|
|
|
|
|
|
+ self.client.bulk_post_rr_sets(
|
|
|
|
+ domain_name=self.my_empty_domain.name, payload=data
|
|
|
|
+ ),
|
|
status.HTTP_400_BAD_REQUEST,
|
|
status.HTTP_400_BAD_REQUEST,
|
|
[
|
|
[
|
|
- {'non_field_errors': ['Same subname and type as in position(s) 1, but must be unique.']},
|
|
|
|
- {'non_field_errors': ['Same subname and type as in position(s) 0, but must be unique.']},
|
|
|
|
|
|
+ {
|
|
|
|
+ "non_field_errors": [
|
|
|
|
+ "Same subname and type as in position(s) 1, but must be unique."
|
|
|
|
+ ]
|
|
|
|
+ },
|
|
|
|
+ {
|
|
|
|
+ "non_field_errors": [
|
|
|
|
+ "Same subname and type as in position(s) 0, but must be unique."
|
|
|
|
+ ]
|
|
|
|
+ },
|
|
{},
|
|
{},
|
|
- ]
|
|
|
|
|
|
+ ],
|
|
)
|
|
)
|
|
|
|
|
|
data = 2 * [self.data[0]] + [self.data[1]] + [self.data[0]]
|
|
data = 2 * [self.data[0]] + [self.data[1]] + [self.data[0]]
|
|
self.assertResponse(
|
|
self.assertResponse(
|
|
- self.client.bulk_post_rr_sets(domain_name=self.my_empty_domain.name, payload=data),
|
|
|
|
|
|
+ self.client.bulk_post_rr_sets(
|
|
|
|
+ domain_name=self.my_empty_domain.name, payload=data
|
|
|
|
+ ),
|
|
status.HTTP_400_BAD_REQUEST,
|
|
status.HTTP_400_BAD_REQUEST,
|
|
[
|
|
[
|
|
- {'non_field_errors': ['Same subname and type as in position(s) 1, 3, but must be unique.']},
|
|
|
|
- {'non_field_errors': ['Same subname and type as in position(s) 0, 3, but must be unique.']},
|
|
|
|
|
|
+ {
|
|
|
|
+ "non_field_errors": [
|
|
|
|
+ "Same subname and type as in position(s) 1, 3, but must be unique."
|
|
|
|
+ ]
|
|
|
|
+ },
|
|
|
|
+ {
|
|
|
|
+ "non_field_errors": [
|
|
|
|
+ "Same subname and type as in position(s) 0, 3, but must be unique."
|
|
|
|
+ ]
|
|
|
|
+ },
|
|
{},
|
|
{},
|
|
- {'non_field_errors': ['Same subname and type as in position(s) 0, 1, but must be unique.']},
|
|
|
|
- ]
|
|
|
|
|
|
+ {
|
|
|
|
+ "non_field_errors": [
|
|
|
|
+ "Same subname and type as in position(s) 0, 1, but must be unique."
|
|
|
|
+ ]
|
|
|
|
+ },
|
|
|
|
+ ],
|
|
)
|
|
)
|
|
|
|
|
|
def test_bulk_post_missing_fields(self):
|
|
def test_bulk_post_missing_fields(self):
|
|
@@ -116,125 +165,221 @@ class AuthenticatedRRSetBulkTestCase(AuthenticatedRRSetBaseTestCase):
|
|
self.client.bulk_post_rr_sets(
|
|
self.client.bulk_post_rr_sets(
|
|
domain_name=self.my_empty_domain.name,
|
|
domain_name=self.my_empty_domain.name,
|
|
payload=[
|
|
payload=[
|
|
- {'subname': 'a.1', 'records': ['dead::beef'], 'ttl': 3622},
|
|
|
|
- {'subname': 'b.1', 'ttl': -50, 'type': 'AAAA', 'records': ['dead::beef']},
|
|
|
|
- {'ttl': 3640, 'type': 'TXT', 'records': ['"bar"']},
|
|
|
|
- {'subname': '', 'ttl': 3640, 'type': 'TXT', 'records': ['"bar"']},
|
|
|
|
- {'subname': 'c.1', 'records': ['dead::beef'], 'type': 'AAAA'},
|
|
|
|
- {'subname': 'd.1', 'ttl': 3650, 'type': 'AAAA'},
|
|
|
|
- {'subname': 'd.1', 'ttl': 3650, 'type': 'SOA',
|
|
|
|
- 'records': ['get.desec.io. get.desec.io. 2018034419 10800 3600 604800 60']},
|
|
|
|
- {'subname': 'd.1', 'ttl': 3650, 'type': 'OPT', 'records': ['9999']},
|
|
|
|
- {'subname': 'd.1', 'ttl': 3650, 'type': 'TYPE099', 'records': ['v=spf1 mx -all']},
|
|
|
|
- ]
|
|
|
|
|
|
+ {"subname": "a.1", "records": ["dead::beef"], "ttl": 3622},
|
|
|
|
+ {
|
|
|
|
+ "subname": "b.1",
|
|
|
|
+ "ttl": -50,
|
|
|
|
+ "type": "AAAA",
|
|
|
|
+ "records": ["dead::beef"],
|
|
|
|
+ },
|
|
|
|
+ {"ttl": 3640, "type": "TXT", "records": ['"bar"']},
|
|
|
|
+ {"subname": "", "ttl": 3640, "type": "TXT", "records": ['"bar"']},
|
|
|
|
+ {"subname": "c.1", "records": ["dead::beef"], "type": "AAAA"},
|
|
|
|
+ {"subname": "d.1", "ttl": 3650, "type": "AAAA"},
|
|
|
|
+ {
|
|
|
|
+ "subname": "d.1",
|
|
|
|
+ "ttl": 3650,
|
|
|
|
+ "type": "SOA",
|
|
|
|
+ "records": [
|
|
|
|
+ "get.desec.io. get.desec.io. 2018034419 10800 3600 604800 60"
|
|
|
|
+ ],
|
|
|
|
+ },
|
|
|
|
+ {"subname": "d.1", "ttl": 3650, "type": "OPT", "records": ["9999"]},
|
|
|
|
+ {
|
|
|
|
+ "subname": "d.1",
|
|
|
|
+ "ttl": 3650,
|
|
|
|
+ "type": "TYPE099",
|
|
|
|
+ "records": ["v=spf1 mx -all"],
|
|
|
|
+ },
|
|
|
|
+ ],
|
|
),
|
|
),
|
|
status.HTTP_400_BAD_REQUEST,
|
|
status.HTTP_400_BAD_REQUEST,
|
|
[
|
|
[
|
|
- {'type': ['This field is required.']},
|
|
|
|
- {'ttl': [f'Ensure this value is greater than or equal to {self.my_empty_domain.minimum_ttl}.']},
|
|
|
|
- {'subname': ['This field is required.']},
|
|
|
|
|
|
+ {"type": ["This field is required."]},
|
|
|
|
+ {
|
|
|
|
+ "ttl": [
|
|
|
|
+ f"Ensure this value is greater than or equal to {self.my_empty_domain.minimum_ttl}."
|
|
|
|
+ ]
|
|
|
|
+ },
|
|
|
|
+ {"subname": ["This field is required."]},
|
|
{},
|
|
{},
|
|
- {'ttl': ['This field is required.']},
|
|
|
|
- {'records': ['This field is required.']},
|
|
|
|
- {'type': ['You cannot tinker with the SOA RR set. It is managed automatically.']},
|
|
|
|
- {'type': ['You cannot tinker with the OPT RR set. It is managed automatically.']},
|
|
|
|
- {'type': ['Generic type format is not supported.']},
|
|
|
|
- ]
|
|
|
|
|
|
+ {"ttl": ["This field is required."]},
|
|
|
|
+ {"records": ["This field is required."]},
|
|
|
|
+ {
|
|
|
|
+ "type": [
|
|
|
|
+ "You cannot tinker with the SOA RR set. It is managed automatically."
|
|
|
|
+ ]
|
|
|
|
+ },
|
|
|
|
+ {
|
|
|
|
+ "type": [
|
|
|
|
+ "You cannot tinker with the OPT RR set. It is managed automatically."
|
|
|
|
+ ]
|
|
|
|
+ },
|
|
|
|
+ {"type": ["Generic type format is not supported."]},
|
|
|
|
+ ],
|
|
)
|
|
)
|
|
|
|
|
|
def test_bulk_patch_cname_exclusivity(self):
|
|
def test_bulk_patch_cname_exclusivity(self):
|
|
response = self.client.bulk_patch_rr_sets(
|
|
response = self.client.bulk_patch_rr_sets(
|
|
domain_name=self.my_rr_set_domain.name,
|
|
domain_name=self.my_rr_set_domain.name,
|
|
payload=[
|
|
payload=[
|
|
- {'subname': 'test', 'type': 'A', 'ttl': 3600, 'records': ['1.2.3.4']},
|
|
|
|
- {'subname': 'test', 'type': 'CNAME', 'ttl': 3600, 'records': ['example.com.']},
|
|
|
|
- ]
|
|
|
|
|
|
+ {"subname": "test", "type": "A", "ttl": 3600, "records": ["1.2.3.4"]},
|
|
|
|
+ {
|
|
|
|
+ "subname": "test",
|
|
|
|
+ "type": "CNAME",
|
|
|
|
+ "ttl": 3600,
|
|
|
|
+ "records": ["example.com."],
|
|
|
|
+ },
|
|
|
|
+ ],
|
|
)
|
|
)
|
|
self.assertResponse(response, status.HTTP_400_BAD_REQUEST)
|
|
self.assertResponse(response, status.HTTP_400_BAD_REQUEST)
|
|
- self.assertEqual(response.json(), [
|
|
|
|
- {"non_field_errors":["RRset with conflicting type present: 1 (CNAME). (No other RRsets are allowed alongside CNAME.)"]},
|
|
|
|
- {"non_field_errors":["RRset with conflicting type present: 0 (A), database (A, TXT). (No other RRsets are allowed alongside CNAME.)"]},
|
|
|
|
- ])
|
|
|
|
|
|
+ self.assertEqual(
|
|
|
|
+ response.json(),
|
|
|
|
+ [
|
|
|
|
+ {
|
|
|
|
+ "non_field_errors": [
|
|
|
|
+ "RRset with conflicting type present: 1 (CNAME). (No other RRsets are allowed alongside CNAME.)"
|
|
|
|
+ ]
|
|
|
|
+ },
|
|
|
|
+ {
|
|
|
|
+ "non_field_errors": [
|
|
|
|
+ "RRset with conflicting type present: 0 (A), database (A, TXT). (No other RRsets are allowed alongside CNAME.)"
|
|
|
|
+ ]
|
|
|
|
+ },
|
|
|
|
+ ],
|
|
|
|
+ )
|
|
|
|
|
|
def test_bulk_post_accepts_empty_list(self):
|
|
def test_bulk_post_accepts_empty_list(self):
|
|
self.assertResponse(
|
|
self.assertResponse(
|
|
- self.client.bulk_post_rr_sets(domain_name=self.my_empty_domain.name, payload=[]),
|
|
|
|
|
|
+ self.client.bulk_post_rr_sets(
|
|
|
|
+ domain_name=self.my_empty_domain.name, payload=[]
|
|
|
|
+ ),
|
|
status.HTTP_201_CREATED,
|
|
status.HTTP_201_CREATED,
|
|
)
|
|
)
|
|
|
|
|
|
def test_bulk_patch_fresh_rrsets_need_records(self):
|
|
def test_bulk_patch_fresh_rrsets_need_records(self):
|
|
- response = self.client.bulk_patch_rr_sets(self.my_empty_domain.name, payload=self.data_no_records)
|
|
|
|
|
|
+ response = self.client.bulk_patch_rr_sets(
|
|
|
|
+ self.my_empty_domain.name, payload=self.data_no_records
|
|
|
|
+ )
|
|
self.assertStatus(response, status.HTTP_400_BAD_REQUEST)
|
|
self.assertStatus(response, status.HTTP_400_BAD_REQUEST)
|
|
- self.assertEqual(response.data, [{}, {'records': ['This field is required.']}])
|
|
|
|
|
|
+ self.assertEqual(response.data, [{}, {"records": ["This field is required."]}])
|
|
|
|
|
|
- with self.assertPdnsRequests(self.requests_desec_rr_sets_update(self.my_empty_domain.name)):
|
|
|
|
- response = self.client.bulk_patch_rr_sets(self.my_empty_domain.name, payload=self.data_empty_records)
|
|
|
|
|
|
+ with self.assertPdnsRequests(
|
|
|
|
+ self.requests_desec_rr_sets_update(self.my_empty_domain.name)
|
|
|
|
+ ):
|
|
|
|
+ response = self.client.bulk_patch_rr_sets(
|
|
|
|
+ self.my_empty_domain.name, payload=self.data_empty_records
|
|
|
|
+ )
|
|
self.assertStatus(response, status.HTTP_200_OK)
|
|
self.assertStatus(response, status.HTTP_200_OK)
|
|
|
|
|
|
def test_bulk_patch_fresh_rrsets_need_subname(self):
|
|
def test_bulk_patch_fresh_rrsets_need_subname(self):
|
|
- response = self.client.bulk_patch_rr_sets(domain_name=self.my_empty_domain.name, payload=self.data_no_subname)
|
|
|
|
|
|
+ response = self.client.bulk_patch_rr_sets(
|
|
|
|
+ domain_name=self.my_empty_domain.name, payload=self.data_no_subname
|
|
|
|
+ )
|
|
self.assertStatus(response, status.HTTP_400_BAD_REQUEST)
|
|
self.assertStatus(response, status.HTTP_400_BAD_REQUEST)
|
|
|
|
|
|
def test_bulk_patch_fresh_rrsets_need_ttl(self):
|
|
def test_bulk_patch_fresh_rrsets_need_ttl(self):
|
|
- response = self.client.bulk_patch_rr_sets(domain_name=self.my_empty_domain.name, payload=self.data_no_ttl)
|
|
|
|
|
|
+ response = self.client.bulk_patch_rr_sets(
|
|
|
|
+ domain_name=self.my_empty_domain.name, payload=self.data_no_ttl
|
|
|
|
+ )
|
|
self.assertStatus(response, status.HTTP_400_BAD_REQUEST)
|
|
self.assertStatus(response, status.HTTP_400_BAD_REQUEST)
|
|
- self.assertEqual(response.data, [{'ttl': ['This field is required.']}, {}])
|
|
|
|
|
|
+ self.assertEqual(response.data, [{"ttl": ["This field is required."]}, {}])
|
|
|
|
|
|
def test_bulk_patch_fresh_rrsets_need_type(self):
|
|
def test_bulk_patch_fresh_rrsets_need_type(self):
|
|
- response = self.client.bulk_patch_rr_sets(domain_name=self.my_empty_domain.name, payload=self.data_no_type)
|
|
|
|
|
|
+ response = self.client.bulk_patch_rr_sets(
|
|
|
|
+ domain_name=self.my_empty_domain.name, payload=self.data_no_type
|
|
|
|
+ )
|
|
self.assertStatus(response, status.HTTP_400_BAD_REQUEST)
|
|
self.assertStatus(response, status.HTTP_400_BAD_REQUEST)
|
|
- self.assertEqual(response.data, [{}, {'type': ['This field is required.']}])
|
|
|
|
|
|
+ self.assertEqual(response.data, [{}, {"type": ["This field is required."]}])
|
|
|
|
|
|
def test_bulk_patch_does_not_accept_single_objects(self):
|
|
def test_bulk_patch_does_not_accept_single_objects(self):
|
|
- response = self.client.bulk_patch_rr_sets(domain_name=self.my_empty_domain.name, payload=self.data[0])
|
|
|
|
- self.assertContains(response, 'Expected a list of items but got dict.', status_code=status.HTTP_400_BAD_REQUEST)
|
|
|
|
|
|
+ response = self.client.bulk_patch_rr_sets(
|
|
|
|
+ domain_name=self.my_empty_domain.name, payload=self.data[0]
|
|
|
|
+ )
|
|
|
|
+ self.assertContains(
|
|
|
|
+ response,
|
|
|
|
+ "Expected a list of items but got dict.",
|
|
|
|
+ status_code=status.HTTP_400_BAD_REQUEST,
|
|
|
|
+ )
|
|
|
|
|
|
def test_bulk_patch_does_accept_empty_list(self):
|
|
def test_bulk_patch_does_accept_empty_list(self):
|
|
- response = self.client.bulk_patch_rr_sets(domain_name=self.my_empty_domain.name, payload=[])
|
|
|
|
|
|
+ response = self.client.bulk_patch_rr_sets(
|
|
|
|
+ domain_name=self.my_empty_domain.name, payload=[]
|
|
|
|
+ )
|
|
self.assertStatus(response, status.HTTP_200_OK)
|
|
self.assertStatus(response, status.HTTP_200_OK)
|
|
|
|
|
|
- response = self.client.bulk_patch_rr_sets(domain_name=self.my_rr_set_domain.name, payload=[])
|
|
|
|
|
|
+ response = self.client.bulk_patch_rr_sets(
|
|
|
|
+ domain_name=self.my_rr_set_domain.name, payload=[]
|
|
|
|
+ )
|
|
self.assertStatus(response, status.HTTP_200_OK)
|
|
self.assertStatus(response, status.HTTP_200_OK)
|
|
|
|
|
|
def test_bulk_patch_does_not_accept_empty_payload(self):
|
|
def test_bulk_patch_does_not_accept_empty_payload(self):
|
|
- response = self.client.bulk_patch_rr_sets(domain_name=self.my_empty_domain.name, payload=None)
|
|
|
|
- self.assertContains(response, 'No data provided', status_code=status.HTTP_400_BAD_REQUEST)
|
|
|
|
|
|
+ response = self.client.bulk_patch_rr_sets(
|
|
|
|
+ domain_name=self.my_empty_domain.name, payload=None
|
|
|
|
+ )
|
|
|
|
+ self.assertContains(
|
|
|
|
+ response, "No data provided", status_code=status.HTTP_400_BAD_REQUEST
|
|
|
|
+ )
|
|
|
|
|
|
def test_bulk_patch_cname_exclusivity_atomic_rrset_replacement(self):
|
|
def test_bulk_patch_cname_exclusivity_atomic_rrset_replacement(self):
|
|
- self.create_rr_set(self.my_empty_domain, subname='test', type='A', records=['1.2.3.4'], ttl=3600)
|
|
|
|
|
|
+ self.create_rr_set(
|
|
|
|
+ self.my_empty_domain,
|
|
|
|
+ subname="test",
|
|
|
|
+ type="A",
|
|
|
|
+ records=["1.2.3.4"],
|
|
|
|
+ ttl=3600,
|
|
|
|
+ )
|
|
|
|
|
|
- with self.assertPdnsRequests(self.requests_desec_rr_sets_update(self.my_empty_domain.name)):
|
|
|
|
|
|
+ with self.assertPdnsRequests(
|
|
|
|
+ self.requests_desec_rr_sets_update(self.my_empty_domain.name)
|
|
|
|
+ ):
|
|
response = self.client.bulk_patch_rr_sets(
|
|
response = self.client.bulk_patch_rr_sets(
|
|
domain_name=self.my_empty_domain.name,
|
|
domain_name=self.my_empty_domain.name,
|
|
payload=[
|
|
payload=[
|
|
- {'subname': 'test', 'type': 'CNAME', 'ttl': 3605, 'records': ['example.com.']},
|
|
|
|
- {'subname': 'test', 'type': 'A', 'records': []},
|
|
|
|
- ]
|
|
|
|
|
|
+ {
|
|
|
|
+ "subname": "test",
|
|
|
|
+ "type": "CNAME",
|
|
|
|
+ "ttl": 3605,
|
|
|
|
+ "records": ["example.com."],
|
|
|
|
+ },
|
|
|
|
+ {"subname": "test", "type": "A", "records": []},
|
|
|
|
+ ],
|
|
)
|
|
)
|
|
self.assertResponse(response, status.HTTP_200_OK)
|
|
self.assertResponse(response, status.HTTP_200_OK)
|
|
self.assertEqual(len(response.data), 1)
|
|
self.assertEqual(len(response.data), 1)
|
|
- self.assertEqual(response.data[0]['type'], 'CNAME')
|
|
|
|
- self.assertEqual(response.data[0]['records'], ['example.com.'])
|
|
|
|
- self.assertEqual(response.data[0]['ttl'], 3605)
|
|
|
|
|
|
+ self.assertEqual(response.data[0]["type"], "CNAME")
|
|
|
|
+ self.assertEqual(response.data[0]["records"], ["example.com."])
|
|
|
|
+ self.assertEqual(response.data[0]["ttl"], 3605)
|
|
|
|
|
|
- with self.assertPdnsRequests(self.requests_desec_rr_sets_update(self.my_empty_domain.name)):
|
|
|
|
|
|
+ with self.assertPdnsRequests(
|
|
|
|
+ self.requests_desec_rr_sets_update(self.my_empty_domain.name)
|
|
|
|
+ ):
|
|
response = self.client.bulk_patch_rr_sets(
|
|
response = self.client.bulk_patch_rr_sets(
|
|
domain_name=self.my_empty_domain.name,
|
|
domain_name=self.my_empty_domain.name,
|
|
payload=[
|
|
payload=[
|
|
- {'subname': 'test', 'type': 'CNAME', 'records': []},
|
|
|
|
- {'subname': 'test', 'type': 'A', 'ttl': 3600, 'records': ['5.4.2.1']},
|
|
|
|
- ]
|
|
|
|
|
|
+ {"subname": "test", "type": "CNAME", "records": []},
|
|
|
|
+ {
|
|
|
|
+ "subname": "test",
|
|
|
|
+ "type": "A",
|
|
|
|
+ "ttl": 3600,
|
|
|
|
+ "records": ["5.4.2.1"],
|
|
|
|
+ },
|
|
|
|
+ ],
|
|
)
|
|
)
|
|
self.assertResponse(response, status.HTTP_200_OK)
|
|
self.assertResponse(response, status.HTTP_200_OK)
|
|
self.assertEqual(len(response.data), 1)
|
|
self.assertEqual(len(response.data), 1)
|
|
- self.assertEqual(response.data[0]['type'], 'A')
|
|
|
|
- self.assertEqual(response.data[0]['records'], ['5.4.2.1'])
|
|
|
|
- self.assertEqual(response.data[0]['ttl'], 3600)
|
|
|
|
|
|
+ self.assertEqual(response.data[0]["type"], "A")
|
|
|
|
+ self.assertEqual(response.data[0]["records"], ["5.4.2.1"])
|
|
|
|
+ self.assertEqual(response.data[0]["ttl"], 3600)
|
|
|
|
|
|
def test_bulk_patch_full_on_empty_domain(self):
|
|
def test_bulk_patch_full_on_empty_domain(self):
|
|
# Full patch always works
|
|
# Full patch always works
|
|
- with self.assertPdnsRequests(self.requests_desec_rr_sets_update(name=self.my_empty_domain.name)):
|
|
|
|
- response = self.client.bulk_patch_rr_sets(domain_name=self.my_empty_domain.name, payload=self.data)
|
|
|
|
|
|
+ with self.assertPdnsRequests(
|
|
|
|
+ self.requests_desec_rr_sets_update(name=self.my_empty_domain.name)
|
|
|
|
+ ):
|
|
|
|
+ response = self.client.bulk_patch_rr_sets(
|
|
|
|
+ domain_name=self.my_empty_domain.name, payload=self.data
|
|
|
|
+ )
|
|
self.assertStatus(response, status.HTTP_200_OK)
|
|
self.assertStatus(response, status.HTTP_200_OK)
|
|
|
|
|
|
# Check that RRsets have been created
|
|
# Check that RRsets have been created
|
|
@@ -244,9 +389,13 @@ class AuthenticatedRRSetBulkTestCase(AuthenticatedRRSetBaseTestCase):
|
|
|
|
|
|
def test_bulk_patch_change_records(self):
|
|
def test_bulk_patch_change_records(self):
|
|
data_no_ttl = copy.deepcopy(self.data_no_ttl)
|
|
data_no_ttl = copy.deepcopy(self.data_no_ttl)
|
|
- data_no_ttl[0]['records'] = ['example.org.']
|
|
|
|
- with self.assertPdnsRequests(self.requests_desec_rr_sets_update(name=self.bulk_domain.name)):
|
|
|
|
- response = self.client.bulk_patch_rr_sets(domain_name=self.bulk_domain.name, payload=data_no_ttl)
|
|
|
|
|
|
+ data_no_ttl[0]["records"] = ["example.org."]
|
|
|
|
+ with self.assertPdnsRequests(
|
|
|
|
+ self.requests_desec_rr_sets_update(name=self.bulk_domain.name)
|
|
|
|
+ ):
|
|
|
|
+ response = self.client.bulk_patch_rr_sets(
|
|
|
|
+ domain_name=self.bulk_domain.name, payload=data_no_ttl
|
|
|
|
+ )
|
|
self.assertStatus(response, status.HTTP_200_OK)
|
|
self.assertStatus(response, status.HTTP_200_OK)
|
|
|
|
|
|
response = self.client.get_rr_sets(self.bulk_domain.name)
|
|
response = self.client.get_rr_sets(self.bulk_domain.name)
|
|
@@ -255,9 +404,13 @@ class AuthenticatedRRSetBulkTestCase(AuthenticatedRRSetBaseTestCase):
|
|
|
|
|
|
def test_bulk_patch_change_ttl(self):
|
|
def test_bulk_patch_change_ttl(self):
|
|
data_no_records = copy.deepcopy(self.data_no_records)
|
|
data_no_records = copy.deepcopy(self.data_no_records)
|
|
- data_no_records[1]['ttl'] = 3911
|
|
|
|
- with self.assertPdnsRequests(self.requests_desec_rr_sets_update(name=self.bulk_domain.name)):
|
|
|
|
- response = self.client.bulk_patch_rr_sets(domain_name=self.bulk_domain.name, payload=data_no_records)
|
|
|
|
|
|
+ data_no_records[1]["ttl"] = 3911
|
|
|
|
+ with self.assertPdnsRequests(
|
|
|
|
+ self.requests_desec_rr_sets_update(name=self.bulk_domain.name)
|
|
|
|
+ ):
|
|
|
|
+ response = self.client.bulk_patch_rr_sets(
|
|
|
|
+ domain_name=self.bulk_domain.name, payload=data_no_records
|
|
|
|
+ )
|
|
self.assertStatus(response, status.HTTP_200_OK)
|
|
self.assertStatus(response, status.HTTP_200_OK)
|
|
|
|
|
|
response = self.client.get_rr_sets(self.bulk_domain.name)
|
|
response = self.client.get_rr_sets(self.bulk_domain.name)
|
|
@@ -266,7 +419,9 @@ class AuthenticatedRRSetBulkTestCase(AuthenticatedRRSetBaseTestCase):
|
|
|
|
|
|
def test_bulk_patch_does_not_need_ttl(self):
|
|
def test_bulk_patch_does_not_need_ttl(self):
|
|
self.assertResponse(
|
|
self.assertResponse(
|
|
- self.client.bulk_patch_rr_sets(domain_name=self.bulk_domain.name, payload=self.data_no_ttl),
|
|
|
|
|
|
+ self.client.bulk_patch_rr_sets(
|
|
|
|
+ domain_name=self.bulk_domain.name, payload=self.data_no_ttl
|
|
|
|
+ ),
|
|
status.HTTP_200_OK,
|
|
status.HTTP_200_OK,
|
|
)
|
|
)
|
|
|
|
|
|
@@ -275,135 +430,241 @@ class AuthenticatedRRSetBulkTestCase(AuthenticatedRRSetBaseTestCase):
|
|
self.client.bulk_patch_rr_sets(
|
|
self.client.bulk_patch_rr_sets(
|
|
domain_name=self.my_empty_domain.name,
|
|
domain_name=self.my_empty_domain.name,
|
|
payload=[
|
|
payload=[
|
|
- {'subname': 'a', 'type': 'A', 'records': [], 'ttl': 3622},
|
|
|
|
- {'subname': 'b', 'type': 'AAAA', 'records': []},
|
|
|
|
- ]),
|
|
|
|
|
|
+ {"subname": "a", "type": "A", "records": [], "ttl": 3622},
|
|
|
|
+ {"subname": "b", "type": "AAAA", "records": []},
|
|
|
|
+ ],
|
|
|
|
+ ),
|
|
status.HTTP_200_OK,
|
|
status.HTTP_200_OK,
|
|
[],
|
|
[],
|
|
)
|
|
)
|
|
|
|
|
|
def test_bulk_patch_missing_invalid_fields_1(self):
|
|
def test_bulk_patch_missing_invalid_fields_1(self):
|
|
- with self.assertPdnsRequests(self.requests_desec_rr_sets_update(self.my_empty_domain.name)):
|
|
|
|
|
|
+ with self.assertPdnsRequests(
|
|
|
|
+ self.requests_desec_rr_sets_update(self.my_empty_domain.name)
|
|
|
|
+ ):
|
|
self.client.bulk_post_rr_sets(
|
|
self.client.bulk_post_rr_sets(
|
|
domain_name=self.my_empty_domain.name,
|
|
domain_name=self.my_empty_domain.name,
|
|
payload=[
|
|
payload=[
|
|
- {'subname': '', 'ttl': 3650, 'type': 'TXT', 'records': ['"foo"']},
|
|
|
|
- {'subname': 'c.1', 'records': ['dead::beef'], 'type': 'AAAA', 'ttl': 3603},
|
|
|
|
- {'subname': 'd.1', 'ttl': 3650, 'type': 'AAAA', 'records': ['::1', '::2']},
|
|
|
|
- ]
|
|
|
|
|
|
+ {"subname": "", "ttl": 3650, "type": "TXT", "records": ['"foo"']},
|
|
|
|
+ {
|
|
|
|
+ "subname": "c.1",
|
|
|
|
+ "records": ["dead::beef"],
|
|
|
|
+ "type": "AAAA",
|
|
|
|
+ "ttl": 3603,
|
|
|
|
+ },
|
|
|
|
+ {
|
|
|
|
+ "subname": "d.1",
|
|
|
|
+ "ttl": 3650,
|
|
|
|
+ "type": "AAAA",
|
|
|
|
+ "records": ["::1", "::2"],
|
|
|
|
+ },
|
|
|
|
+ ],
|
|
)
|
|
)
|
|
self.assertResponse(
|
|
self.assertResponse(
|
|
self.client.bulk_patch_rr_sets(
|
|
self.client.bulk_patch_rr_sets(
|
|
domain_name=self.my_empty_domain.name,
|
|
domain_name=self.my_empty_domain.name,
|
|
payload=[
|
|
payload=[
|
|
- {'subname': 'a.1', 'records': ['dead::beef'], 'ttl': 3622},
|
|
|
|
- {'subname': 'b.1', 'ttl': -50, 'type': 'AAAA', 'records': ['dead::beef']},
|
|
|
|
- {'ttl': 3640, 'type': 'TXT', 'records': ['"bar"']},
|
|
|
|
- {'subname': 'c.1', 'records': ['dead::beef'], 'type': 'AAAA'},
|
|
|
|
- {'subname': 'd.1', 'ttl': 3650, 'type': 'AAAA'},
|
|
|
|
- ]),
|
|
|
|
|
|
+ {"subname": "a.1", "records": ["dead::beef"], "ttl": 3622},
|
|
|
|
+ {
|
|
|
|
+ "subname": "b.1",
|
|
|
|
+ "ttl": -50,
|
|
|
|
+ "type": "AAAA",
|
|
|
|
+ "records": ["dead::beef"],
|
|
|
|
+ },
|
|
|
|
+ {"ttl": 3640, "type": "TXT", "records": ['"bar"']},
|
|
|
|
+ {"subname": "c.1", "records": ["dead::beef"], "type": "AAAA"},
|
|
|
|
+ {"subname": "d.1", "ttl": 3650, "type": "AAAA"},
|
|
|
|
+ ],
|
|
|
|
+ ),
|
|
status.HTTP_400_BAD_REQUEST,
|
|
status.HTTP_400_BAD_REQUEST,
|
|
[
|
|
[
|
|
- {'type': ['This field is required.']},
|
|
|
|
- {'ttl': [f'Ensure this value is greater than or equal to {settings.MINIMUM_TTL_DEFAULT}.']},
|
|
|
|
- {'subname': ['This field is required.']},
|
|
|
|
|
|
+ {"type": ["This field is required."]},
|
|
|
|
+ {
|
|
|
|
+ "ttl": [
|
|
|
|
+ f"Ensure this value is greater than or equal to {settings.MINIMUM_TTL_DEFAULT}."
|
|
|
|
+ ]
|
|
|
|
+ },
|
|
|
|
+ {"subname": ["This field is required."]},
|
|
{},
|
|
{},
|
|
{},
|
|
{},
|
|
- ]
|
|
|
|
|
|
+ ],
|
|
)
|
|
)
|
|
|
|
|
|
def test_bulk_patch_missing_invalid_fields_2(self):
|
|
def test_bulk_patch_missing_invalid_fields_2(self):
|
|
- with self.assertPdnsRequests(self.requests_desec_rr_sets_update(self.my_empty_domain.name)):
|
|
|
|
|
|
+ with self.assertPdnsRequests(
|
|
|
|
+ self.requests_desec_rr_sets_update(self.my_empty_domain.name)
|
|
|
|
+ ):
|
|
self.client.bulk_post_rr_sets(
|
|
self.client.bulk_post_rr_sets(
|
|
domain_name=self.my_empty_domain.name,
|
|
domain_name=self.my_empty_domain.name,
|
|
payload=[
|
|
payload=[
|
|
- {'subname': '', 'ttl': 3650, 'type': 'TXT', 'records': ['"foo"']}
|
|
|
|
- ]
|
|
|
|
|
|
+ {"subname": "", "ttl": 3650, "type": "TXT", "records": ['"foo"']}
|
|
|
|
+ ],
|
|
)
|
|
)
|
|
self.assertResponse(
|
|
self.assertResponse(
|
|
self.client.bulk_patch_rr_sets(
|
|
self.client.bulk_patch_rr_sets(
|
|
domain_name=self.my_empty_domain.name,
|
|
domain_name=self.my_empty_domain.name,
|
|
payload=[
|
|
payload=[
|
|
- {'subname': 'a.1', 'records': ['dead::beef'], 'ttl': 3622},
|
|
|
|
- {'subname': 'b.1', 'ttl': -50, 'type': 'AAAA', 'records': ['dead::beef']},
|
|
|
|
- {'ttl': 3640, 'type': 'TXT', 'records': ['"bar"']},
|
|
|
|
- {'subname': 'c.1', 'records': ['dead::beef'], 'type': 'AAAA'},
|
|
|
|
- {'subname': 'd.1', 'ttl': 3650, 'type': 'AAAA'},
|
|
|
|
- ]),
|
|
|
|
|
|
+ {"subname": "a.1", "records": ["dead::beef"], "ttl": 3622},
|
|
|
|
+ {
|
|
|
|
+ "subname": "b.1",
|
|
|
|
+ "ttl": -50,
|
|
|
|
+ "type": "AAAA",
|
|
|
|
+ "records": ["dead::beef"],
|
|
|
|
+ },
|
|
|
|
+ {"ttl": 3640, "type": "TXT", "records": ['"bar"']},
|
|
|
|
+ {"subname": "c.1", "records": ["dead::beef"], "type": "AAAA"},
|
|
|
|
+ {"subname": "d.1", "ttl": 3650, "type": "AAAA"},
|
|
|
|
+ ],
|
|
|
|
+ ),
|
|
status.HTTP_400_BAD_REQUEST,
|
|
status.HTTP_400_BAD_REQUEST,
|
|
[
|
|
[
|
|
- {'type': ['This field is required.']},
|
|
|
|
- {'ttl': [f'Ensure this value is greater than or equal to {settings.MINIMUM_TTL_DEFAULT}.']},
|
|
|
|
- {'subname': ['This field is required.']},
|
|
|
|
- {'ttl': ['This field is required.']},
|
|
|
|
- {'records': ['This field is required.']},
|
|
|
|
- ]
|
|
|
|
|
|
+ {"type": ["This field is required."]},
|
|
|
|
+ {
|
|
|
|
+ "ttl": [
|
|
|
|
+ f"Ensure this value is greater than or equal to {settings.MINIMUM_TTL_DEFAULT}."
|
|
|
|
+ ]
|
|
|
|
+ },
|
|
|
|
+ {"subname": ["This field is required."]},
|
|
|
|
+ {"ttl": ["This field is required."]},
|
|
|
|
+ {"records": ["This field is required."]},
|
|
|
|
+ ],
|
|
)
|
|
)
|
|
|
|
|
|
def test_bulk_put_partial(self):
|
|
def test_bulk_put_partial(self):
|
|
# Need all fields
|
|
# Need all fields
|
|
for domain in [self.my_empty_domain, self.bulk_domain]:
|
|
for domain in [self.my_empty_domain, self.bulk_domain]:
|
|
- response = self.client.bulk_put_rr_sets(domain_name=domain.name, payload=self.data_no_records)
|
|
|
|
|
|
+ response = self.client.bulk_put_rr_sets(
|
|
|
|
+ domain_name=domain.name, payload=self.data_no_records
|
|
|
|
+ )
|
|
self.assertStatus(response, status.HTTP_400_BAD_REQUEST)
|
|
self.assertStatus(response, status.HTTP_400_BAD_REQUEST)
|
|
- self.assertEqual(response.data, [{}, {'records': ['This field is required.']}])
|
|
|
|
|
|
+ self.assertEqual(
|
|
|
|
+ response.data, [{}, {"records": ["This field is required."]}]
|
|
|
|
+ )
|
|
|
|
|
|
- response = self.client.bulk_put_rr_sets(domain_name=domain.name, payload=self.data_no_ttl)
|
|
|
|
|
|
+ response = self.client.bulk_put_rr_sets(
|
|
|
|
+ domain_name=domain.name, payload=self.data_no_ttl
|
|
|
|
+ )
|
|
self.assertStatus(response, status.HTTP_400_BAD_REQUEST)
|
|
self.assertStatus(response, status.HTTP_400_BAD_REQUEST)
|
|
- self.assertEqual(response.data, [{'ttl': ['This field is required.']}, {}])
|
|
|
|
|
|
+ self.assertEqual(response.data, [{"ttl": ["This field is required."]}, {}])
|
|
|
|
|
|
- response = self.client.bulk_put_rr_sets(domain_name=domain.name, payload=self.data_no_records_no_ttl)
|
|
|
|
|
|
+ response = self.client.bulk_put_rr_sets(
|
|
|
|
+ domain_name=domain.name, payload=self.data_no_records_no_ttl
|
|
|
|
+ )
|
|
self.assertStatus(response, status.HTTP_400_BAD_REQUEST)
|
|
self.assertStatus(response, status.HTTP_400_BAD_REQUEST)
|
|
- self.assertEqual(response.data, [{},
|
|
|
|
- {'ttl': ['This field is required.'],
|
|
|
|
- 'records': ['This field is required.']}])
|
|
|
|
|
|
+ self.assertEqual(
|
|
|
|
+ response.data,
|
|
|
|
+ [
|
|
|
|
+ {},
|
|
|
|
+ {
|
|
|
|
+ "ttl": ["This field is required."],
|
|
|
|
+ "records": ["This field is required."],
|
|
|
|
+ },
|
|
|
|
+ ],
|
|
|
|
+ )
|
|
|
|
|
|
- response = self.client.bulk_put_rr_sets(domain_name=domain.name, payload=self.data_no_subname)
|
|
|
|
|
|
+ response = self.client.bulk_put_rr_sets(
|
|
|
|
+ domain_name=domain.name, payload=self.data_no_subname
|
|
|
|
+ )
|
|
self.assertStatus(response, status.HTTP_400_BAD_REQUEST)
|
|
self.assertStatus(response, status.HTTP_400_BAD_REQUEST)
|
|
- self.assertEqual(response.data, [{'subname': ['This field is required.']}, {}])
|
|
|
|
|
|
+ self.assertEqual(
|
|
|
|
+ response.data, [{"subname": ["This field is required."]}, {}]
|
|
|
|
+ )
|
|
|
|
|
|
- response = self.client.bulk_put_rr_sets(domain_name=domain.name, payload=self.data_no_subname_empty_records)
|
|
|
|
|
|
+ response = self.client.bulk_put_rr_sets(
|
|
|
|
+ domain_name=domain.name, payload=self.data_no_subname_empty_records
|
|
|
|
+ )
|
|
self.assertStatus(response, status.HTTP_400_BAD_REQUEST)
|
|
self.assertStatus(response, status.HTTP_400_BAD_REQUEST)
|
|
- self.assertEqual(response.data, [{'subname': ['This field is required.']}, {}])
|
|
|
|
|
|
+ self.assertEqual(
|
|
|
|
+ response.data, [{"subname": ["This field is required."]}, {}]
|
|
|
|
+ )
|
|
|
|
|
|
- response = self.client.bulk_put_rr_sets(domain_name=domain.name, payload=self.data_no_type)
|
|
|
|
|
|
+ response = self.client.bulk_put_rr_sets(
|
|
|
|
+ domain_name=domain.name, payload=self.data_no_type
|
|
|
|
+ )
|
|
self.assertStatus(response, status.HTTP_400_BAD_REQUEST)
|
|
self.assertStatus(response, status.HTTP_400_BAD_REQUEST)
|
|
- self.assertEqual(response.data, [{}, {'type': ['This field is required.']}])
|
|
|
|
|
|
+ self.assertEqual(response.data, [{}, {"type": ["This field is required."]}])
|
|
|
|
|
|
def test_bulk_put_does_not_accept_single_objects(self):
|
|
def test_bulk_put_does_not_accept_single_objects(self):
|
|
- response = self.client.bulk_put_rr_sets(domain_name=self.my_empty_domain.name, payload=self.data[0])
|
|
|
|
- self.assertContains(response, 'Expected a list of items but got dict.', status_code=status.HTTP_400_BAD_REQUEST)
|
|
|
|
|
|
+ response = self.client.bulk_put_rr_sets(
|
|
|
|
+ domain_name=self.my_empty_domain.name, payload=self.data[0]
|
|
|
|
+ )
|
|
|
|
+ self.assertContains(
|
|
|
|
+ response,
|
|
|
|
+ "Expected a list of items but got dict.",
|
|
|
|
+ status_code=status.HTTP_400_BAD_REQUEST,
|
|
|
|
+ )
|
|
|
|
|
|
def test_bulk_put_does_accept_empty_list(self):
|
|
def test_bulk_put_does_accept_empty_list(self):
|
|
- response = self.client.bulk_put_rr_sets(domain_name=self.my_empty_domain.name, payload=[])
|
|
|
|
|
|
+ response = self.client.bulk_put_rr_sets(
|
|
|
|
+ domain_name=self.my_empty_domain.name, payload=[]
|
|
|
|
+ )
|
|
self.assertStatus(response, status.HTTP_200_OK)
|
|
self.assertStatus(response, status.HTTP_200_OK)
|
|
- response = self.client.bulk_put_rr_sets(domain_name=self.my_rr_set_domain.name, payload=[])
|
|
|
|
|
|
+ response = self.client.bulk_put_rr_sets(
|
|
|
|
+ domain_name=self.my_rr_set_domain.name, payload=[]
|
|
|
|
+ )
|
|
self.assertStatus(response, status.HTTP_200_OK)
|
|
self.assertStatus(response, status.HTTP_200_OK)
|
|
|
|
|
|
def test_bulk_put_does_not_accept_empty_payload(self):
|
|
def test_bulk_put_does_not_accept_empty_payload(self):
|
|
- response = self.client.bulk_put_rr_sets(domain_name=self.my_empty_domain.name, payload=None)
|
|
|
|
- self.assertContains(response, 'No data provided', status_code=status.HTTP_400_BAD_REQUEST)
|
|
|
|
|
|
+ response = self.client.bulk_put_rr_sets(
|
|
|
|
+ domain_name=self.my_empty_domain.name, payload=None
|
|
|
|
+ )
|
|
|
|
+ self.assertContains(
|
|
|
|
+ response, "No data provided", status_code=status.HTTP_400_BAD_REQUEST
|
|
|
|
+ )
|
|
|
|
|
|
def test_bulk_put_does_not_accept_list_of_crap(self):
|
|
def test_bulk_put_does_not_accept_list_of_crap(self):
|
|
- response = self.client.bulk_put_rr_sets(domain_name=self.my_empty_domain.name, payload=['bla'])
|
|
|
|
- self.assertContains(response, 'Expected a dictionary, but got str.', status_code=status.HTTP_400_BAD_REQUEST)
|
|
|
|
|
|
+ response = self.client.bulk_put_rr_sets(
|
|
|
|
+ domain_name=self.my_empty_domain.name, payload=["bla"]
|
|
|
|
+ )
|
|
|
|
+ self.assertContains(
|
|
|
|
+ response,
|
|
|
|
+ "Expected a dictionary, but got str.",
|
|
|
|
+ status_code=status.HTTP_400_BAD_REQUEST,
|
|
|
|
+ )
|
|
|
|
|
|
- response = self.client.bulk_put_rr_sets(domain_name=self.my_empty_domain.name, payload=[42])
|
|
|
|
- self.assertContains(response, 'Expected a dictionary, but got int.', status_code=status.HTTP_400_BAD_REQUEST)
|
|
|
|
|
|
+ response = self.client.bulk_put_rr_sets(
|
|
|
|
+ domain_name=self.my_empty_domain.name, payload=[42]
|
|
|
|
+ )
|
|
|
|
+ self.assertContains(
|
|
|
|
+ response,
|
|
|
|
+ "Expected a dictionary, but got int.",
|
|
|
|
+ status_code=status.HTTP_400_BAD_REQUEST,
|
|
|
|
+ )
|
|
|
|
|
|
def test_bulk_put_does_not_accept_rrsets_with_nonstr_subname(self):
|
|
def test_bulk_put_does_not_accept_rrsets_with_nonstr_subname(self):
|
|
- payload = [{"subname": ["foobar"], "type": "A", "ttl": 3600, "records": ["1.2.3.4"]}]
|
|
|
|
- response = self.client.bulk_put_rr_sets(domain_name=self.my_empty_domain.name, payload=payload)
|
|
|
|
- self.assertContains(response, 'Expected a string, but got list.', status_code=status.HTTP_400_BAD_REQUEST)
|
|
|
|
|
|
+ payload = [
|
|
|
|
+ {"subname": ["foobar"], "type": "A", "ttl": 3600, "records": ["1.2.3.4"]}
|
|
|
|
+ ]
|
|
|
|
+ response = self.client.bulk_put_rr_sets(
|
|
|
|
+ domain_name=self.my_empty_domain.name, payload=payload
|
|
|
|
+ )
|
|
|
|
+ self.assertContains(
|
|
|
|
+ response,
|
|
|
|
+ "Expected a string, but got list.",
|
|
|
|
+ status_code=status.HTTP_400_BAD_REQUEST,
|
|
|
|
+ )
|
|
|
|
|
|
def test_bulk_put_does_not_accept_rrsets_with_nonstr_type(self):
|
|
def test_bulk_put_does_not_accept_rrsets_with_nonstr_type(self):
|
|
- payload = [{"subname": "foobar", "type": ["A"], "ttl": 3600, "records": ["1.2.3.4"]}]
|
|
|
|
- response = self.client.bulk_put_rr_sets(domain_name=self.my_empty_domain.name, payload=payload)
|
|
|
|
- self.assertContains(response, 'Expected a string, but got list.', status_code=status.HTTP_400_BAD_REQUEST)
|
|
|
|
|
|
+ payload = [
|
|
|
|
+ {"subname": "foobar", "type": ["A"], "ttl": 3600, "records": ["1.2.3.4"]}
|
|
|
|
+ ]
|
|
|
|
+ response = self.client.bulk_put_rr_sets(
|
|
|
|
+ domain_name=self.my_empty_domain.name, payload=payload
|
|
|
|
+ )
|
|
|
|
+ self.assertContains(
|
|
|
|
+ response,
|
|
|
|
+ "Expected a string, but got list.",
|
|
|
|
+ status_code=status.HTTP_400_BAD_REQUEST,
|
|
|
|
+ )
|
|
|
|
|
|
def test_bulk_put_full(self):
|
|
def test_bulk_put_full(self):
|
|
# Full PUT always works
|
|
# Full PUT always works
|
|
- with self.assertPdnsRequests(self.requests_desec_rr_sets_update(name=self.my_empty_domain.name)):
|
|
|
|
- response = self.client.bulk_put_rr_sets(domain_name=self.my_empty_domain.name, payload=self.data)
|
|
|
|
|
|
+ with self.assertPdnsRequests(
|
|
|
|
+ self.requests_desec_rr_sets_update(name=self.my_empty_domain.name)
|
|
|
|
+ ):
|
|
|
|
+ response = self.client.bulk_put_rr_sets(
|
|
|
|
+ domain_name=self.my_empty_domain.name, payload=self.data
|
|
|
|
+ )
|
|
self.assertStatus(response, status.HTTP_200_OK)
|
|
self.assertStatus(response, status.HTTP_200_OK)
|
|
|
|
|
|
# Check that RRsets have been created
|
|
# Check that RRsets have been created
|
|
@@ -412,27 +673,37 @@ class AuthenticatedRRSetBulkTestCase(AuthenticatedRRSetBaseTestCase):
|
|
self.assertRRSetsCount(response.data, self.data)
|
|
self.assertRRSetsCount(response.data, self.data)
|
|
|
|
|
|
# Do not expect any updates, but successful code when PUT'ing only existing RRsets
|
|
# Do not expect any updates, but successful code when PUT'ing only existing RRsets
|
|
- response = self.client.bulk_put_rr_sets(domain_name=self.bulk_domain.name, payload=self.data)
|
|
|
|
|
|
+ response = self.client.bulk_put_rr_sets(
|
|
|
|
+ domain_name=self.bulk_domain.name, payload=self.data
|
|
|
|
+ )
|
|
self.assertStatus(response, status.HTTP_200_OK)
|
|
self.assertStatus(response, status.HTTP_200_OK)
|
|
|
|
|
|
def test_bulk_put_invalid_records(self):
|
|
def test_bulk_put_invalid_records(self):
|
|
for records in [
|
|
for records in [
|
|
- 'asfd',
|
|
|
|
- ['1.1.1.1', '2.2.2.2', 123],
|
|
|
|
- ['1.2.3.4', None],
|
|
|
|
- [True, '1.1.1.1'],
|
|
|
|
- dict(foobar='foobar', asdf='asdf'),
|
|
|
|
|
|
+ "asfd",
|
|
|
|
+ ["1.1.1.1", "2.2.2.2", 123],
|
|
|
|
+ ["1.2.3.4", None],
|
|
|
|
+ [True, "1.1.1.1"],
|
|
|
|
+ dict(foobar="foobar", asdf="asdf"),
|
|
]:
|
|
]:
|
|
- payload = [{'subname': 'a.2', 'ttl': 3600, 'type': 'MX', 'records': records}]
|
|
|
|
- response = self.client.bulk_put_rr_sets(domain_name=self.my_empty_domain.name, payload=payload)
|
|
|
|
|
|
+ payload = [
|
|
|
|
+ {"subname": "a.2", "ttl": 3600, "type": "MX", "records": records}
|
|
|
|
+ ]
|
|
|
|
+ response = self.client.bulk_put_rr_sets(
|
|
|
|
+ domain_name=self.my_empty_domain.name, payload=payload
|
|
|
|
+ )
|
|
self.assertStatus(response, status.HTTP_400_BAD_REQUEST)
|
|
self.assertStatus(response, status.HTTP_400_BAD_REQUEST)
|
|
- self.assertTrue('records' in response.data[0])
|
|
|
|
|
|
+ self.assertTrue("records" in response.data[0])
|
|
|
|
|
|
def test_bulk_put_empty_records(self):
|
|
def test_bulk_put_empty_records(self):
|
|
- with self.assertPdnsRequests(self.requests_desec_rr_sets_update(name=self.bulk_domain.name)):
|
|
|
|
|
|
+ with self.assertPdnsRequests(
|
|
|
|
+ self.requests_desec_rr_sets_update(name=self.bulk_domain.name)
|
|
|
|
+ ):
|
|
self.assertStatus(
|
|
self.assertStatus(
|
|
- self.client.bulk_put_rr_sets(domain_name=self.bulk_domain.name, payload=self.data_empty_records),
|
|
|
|
- status.HTTP_200_OK
|
|
|
|
|
|
+ self.client.bulk_put_rr_sets(
|
|
|
|
+ domain_name=self.bulk_domain.name, payload=self.data_empty_records
|
|
|
|
+ ),
|
|
|
|
+ status.HTTP_200_OK,
|
|
)
|
|
)
|
|
|
|
|
|
def test_bulk_duplicate_rrset(self):
|
|
def test_bulk_duplicate_rrset(self):
|
|
@@ -442,19 +713,26 @@ class AuthenticatedRRSetBulkTestCase(AuthenticatedRRSetBaseTestCase):
|
|
self.client.bulk_put_rr_sets,
|
|
self.client.bulk_put_rr_sets,
|
|
self.client.bulk_post_rr_sets,
|
|
self.client.bulk_post_rr_sets,
|
|
]:
|
|
]:
|
|
- response = bulk_request_rr_sets(domain_name=self.my_empty_domain.name, payload=data)
|
|
|
|
|
|
+ response = bulk_request_rr_sets(
|
|
|
|
+ domain_name=self.my_empty_domain.name, payload=data
|
|
|
|
+ )
|
|
self.assertStatus(response, status.HTTP_400_BAD_REQUEST)
|
|
self.assertStatus(response, status.HTTP_400_BAD_REQUEST)
|
|
|
|
|
|
def test_bulk_patch_or_post_failure_with_single_rrset(self):
|
|
def test_bulk_patch_or_post_failure_with_single_rrset(self):
|
|
for method in [self.client.bulk_patch_rr_sets, self.client.bulk_put_rr_sets]:
|
|
for method in [self.client.bulk_patch_rr_sets, self.client.bulk_put_rr_sets]:
|
|
- response = method(domain_name=self.my_empty_domain.name, payload=self.data[0])
|
|
|
|
- self.assertContains(response, 'Expected a list of items but got dict.',
|
|
|
|
- status_code=status.HTTP_400_BAD_REQUEST)
|
|
|
|
|
|
+ response = method(
|
|
|
|
+ domain_name=self.my_empty_domain.name, payload=self.data[0]
|
|
|
|
+ )
|
|
|
|
+ self.assertContains(
|
|
|
|
+ response,
|
|
|
|
+ "Expected a list of items but got dict.",
|
|
|
|
+ status_code=status.HTTP_400_BAD_REQUEST,
|
|
|
|
+ )
|
|
|
|
|
|
def test_bulk_delete_rrsets(self):
|
|
def test_bulk_delete_rrsets(self):
|
|
self.assertStatus(
|
|
self.assertStatus(
|
|
self.client.delete(
|
|
self.client.delete(
|
|
- self.reverse('v1:rrsets', name=self.my_empty_domain.name),
|
|
|
|
|
|
+ self.reverse("v1:rrsets", name=self.my_empty_domain.name),
|
|
data=None,
|
|
data=None,
|
|
),
|
|
),
|
|
status.HTTP_405_METHOD_NOT_ALLOWED,
|
|
status.HTTP_405_METHOD_NOT_ALLOWED,
|