Sfoglia il codice sorgente

filter.yml get global filter on station_lists

Thomas Hanika 3 anni fa
parent
commit
4bc3bed487
6 ha cambiato i file con 265 aggiunte e 108 eliminazioni
  1. 98 0
      ycast/filter.py
  2. 53 11
      ycast/generic.py
  3. 3 30
      ycast/my_recentlystation.py
  4. 1 11
      ycast/my_stations.py
  5. 66 56
      ycast/radiobrowser.py
  6. 44 0
      ycast/test_filter.py

+ 98 - 0
ycast/filter.py

@@ -0,0 +1,98 @@
+import logging
+
+from ycast import generic
+from ycast.generic import get_json_attr
+
+white_list = {}
+black_list = {}
+filter_dir = {}
+parameter_failed_list = {}
+
+def init_filter():
+    global white_list
+    global black_list
+    global parameter_failed_list
+    global filter_dir
+    filter_dir = generic.read_yaml_file(generic.get_var_path() + '/filter.yml')
+    if filter_dir:
+        white_list = filter_dir['whitelist']
+        black_list = filter_dir['blacklist']
+    else:
+        white_list = { 'lastcheckok': 1 }
+        black_list = {}
+        filter_dir = {}
+        filter_dir['whitelist'] = white_list
+        filter_dir['blacklist'] = black_list
+        generic.write_yaml_file(generic.get_var_path() + '/filter.yml', filter_dir)
+
+        filter_ex = {}
+        filter_ex['whitelist'] = {'lastcheckok': 1, 'countrycode': 'DE,US,NO,GB', 'languagecodes': 'en,no'}
+        filter_ex['blacklist'] = {'favicon': ''}
+        generic.write_yaml_file(generic.get_var_path() + '/filter.yml_example', filter_ex)
+
+    parameter_failed_list.clear()
+    return
+
+def end_filter():
+    if parameter_failed_list:
+        logging.info("Used filter parameter: %s", parameter_failed_list)
+    else:
+        logging.info("Used filter parameter: <nothing used>")
+
+def parameter_hit(param_name):
+    count = 1
+    old = None
+    if parameter_failed_list:
+        old = parameter_failed_list.get(param_name)
+    if old:
+        count = old + 1
+    parameter_failed_list[param_name] = count
+
+def check_station(station_json):
+    station_name = get_json_attr(station_json, 'name')
+    if not station_name:
+        # müll response
+        logging.debug(station_json)
+        return False
+# oder verknüpft
+    if black_list:
+        black_list_hit = False
+        for param_name in black_list:
+            unvalid_elements = black_list[param_name]
+            val =  get_json_attr(station_json, param_name)
+            if not val == None:
+                # attribut in json vorhanden
+                if unvalid_elements:
+                    if val:
+                        pos = unvalid_elements.find(val)
+                        black_list_hit = pos >= 0
+                else:
+                    if not val:
+                        black_list_hit = True
+                if black_list_hit:
+                    parameter_hit(param_name)
+                    logging.debug("FAIL '%s' blacklist hit on '%s' '%s' == '%s'",
+                                  station_name, param_name, unvalid_elements, val)
+                    return False
+
+# und verknüpft
+    if white_list:
+        white_list_hit = True
+        for param_name in white_list:
+            val =  get_json_attr(station_json, param_name)
+            if not val == None:
+                # attribut in json vorhanden
+                valid_elements = white_list[param_name]
+                if type(val) is int:
+                    white_list_hit = val == valid_elements
+                else:
+                    if val:
+                        pos = valid_elements.find(val)
+                        white_list_hit = pos >=0
+                if not white_list_hit:
+                    parameter_hit(param_name)
+                    logging.debug("FAIL '%s' whitelist failed on '%s' '%s' == '%s'",
+                                  station_name, param_name, valid_elements, val)
+                    return False
+    logging.debug("OK   '%s' passed", station_name)
+    return True

+ 53 - 11
ycast/generic.py

@@ -1,11 +1,12 @@
 import logging
 import logging
 import os
 import os
 import hashlib
 import hashlib
+import yaml
 
 
 USER_AGENT = 'YCast'
 USER_AGENT = 'YCast'
 VAR_PATH = os.path.expanduser("~") + '/.ycast'
 VAR_PATH = os.path.expanduser("~") + '/.ycast'
 CACHE_PATH = VAR_PATH + '/cache'
 CACHE_PATH = VAR_PATH + '/cache'
-FILTER_PATH = VAR_PATH + '/filter'
+
 
 
 class Directory:
 class Directory:
     def __init__(self, name, item_count, displayname=None):
     def __init__(self, name, item_count, displayname=None):
@@ -47,15 +48,6 @@ def get_cache_path(cache_name):
         return None
         return None
     return cache_path
     return cache_path
 
 
-def get_filter_path():
-    try:
-        os.makedirs(FILTER_PATH)
-    except FileExistsError:
-        pass
-    except PermissionError:
-        logging.error("Could not create cache folders (%s) because of access permissions", cache_path)
-        return None
-    return FILTER_PATH
 
 
 def get_var_path():
 def get_var_path():
     try:
     try:
@@ -63,10 +55,11 @@ def get_var_path():
     except FileExistsError:
     except FileExistsError:
         pass
         pass
     except PermissionError:
     except PermissionError:
-        logging.error("Could not create cache folders (%s) because of access permissions", cache_path)
+        logging.error("Could not create cache folders (%s) because of access permissions", VAR_PATH)
         return None
         return None
     return VAR_PATH
     return VAR_PATH
 
 
+
 def get_checksum(feed, charlimit=12):
 def get_checksum(feed, charlimit=12):
     hash_feed = feed.encode()
     hash_feed = feed.encode()
     hash_object = hashlib.md5(hash_feed)
     hash_object = hashlib.md5(hash_feed)
@@ -76,3 +69,52 @@ def get_checksum(feed, charlimit=12):
         xor_fold[i] ^= b
         xor_fold[i] ^= b
     digest_xor_fold = ''.join(format(x, '02x') for x in bytes(xor_fold))
     digest_xor_fold = ''.join(format(x, '02x') for x in bytes(xor_fold))
     return str(digest_xor_fold[:charlimit]).upper()
     return str(digest_xor_fold[:charlimit]).upper()
+
+
+def read_yaml_file(file_name):
+    try:
+        with open(file_name, 'r') as f:
+            return yaml.safe_load(f)
+    except FileNotFoundError:
+        logging.error("YAML file '%s' not found", file_name)
+    except yaml.YAMLError as e:
+        logging.error("YAML format error in '%':\n    %s", file_name, e)
+    return None
+
+
+def write_yaml_file(file_name, dictionary):
+    try:
+        with open(file_name, 'w') as f:
+            yaml.dump(dictionary, f)
+    except yaml.YAMLError as e:
+        logging.error("YAML format error in '%':\n    %s", file_name, e)
+    except Exception as ex:
+        logging.error("File not written '%s':\n    %s", file_name, ex)
+
+
+def readlns_txt_file(file_name):
+    try:
+        with open(file_name, 'r') as f:
+            return f.readlines()
+    except FileNotFoundError:
+        logging.error("YAML file '%s' not found", file_name)
+    except yaml.YAMLError as e:
+        logging.error("YAML format error in '%':\n    %s", file_name, e)
+    return None
+
+
+def writelns_txt_file(file_name, line_list):
+    try:
+        with open(file_name, 'w') as f:
+            f.writelines(line_list)
+            logging.info("File written '%s'", file_name)
+    except Exception as ex:
+        logging.error("File not written '%s':\n    %s", file_name, ex)
+
+
+def get_json_attr(json, attr):
+    try:
+        return json[attr]
+    except Exception as ex:
+        logging.debug("json: attr '%s' not found: %s", attr, ex)
+        return None

+ 3 - 30
ycast/my_recentlystation.py

@@ -1,10 +1,7 @@
 import logging
 import logging
-
-import yaml
 from ycast import generic
 from ycast import generic
 
 
 MAX_ENTRIES = 15
 MAX_ENTRIES = 15
-
 recently_file = generic.get_var_path() + '/recently.yml'
 recently_file = generic.get_var_path() + '/recently.yml'
 
 
 
 
@@ -34,36 +31,12 @@ def signal_station_selected(name, url, icon):
 
 
 
 
 def set_stations_yaml(heard_stations):
 def set_stations_yaml(heard_stations):
-    try:
-        with open(recently_file, 'w') as f:
-            f.writelines(heard_stations)
-            logging.info("File written '%s'", recently_file)
-
-    except Exception as ex:
-        logging.error("File not written '%s': %s", recently_file, ex)
+    generic.writelns_txt_file(recently_file, heard_stations)
 
 
 
 
 def get_stations_list():
 def get_stations_list():
-    try:
-        with open(recently_file, 'r') as f:
-            heard_stations = f.readlines()
-    except FileNotFoundError:
-        logging.warning("File not found '%s' not found", recently_file)
-        return []
-    except yaml.YAMLError as e:
-        logging.error("Station configuration format error: %s", e)
-        return []
-    return heard_stations
+    return generic.readlns_txt_file(recently_file)
 
 
 
 
 def get_recently_stations_yaml():
 def get_recently_stations_yaml():
-    try:
-        with open(recently_file, 'r') as f:
-            my_stations = yaml.safe_load(f)
-    except FileNotFoundError:
-        logging.error("Station configuration '%s' not found", recently_file)
-        return None
-    except yaml.YAMLError as e:
-        logging.error("Station configuration format error: %s", e)
-        return None
-    return my_stations
+    return generic.read_yaml_file(recently_file)

+ 1 - 11
ycast/my_stations.py

@@ -1,5 +1,4 @@
 import logging
 import logging
-import oyaml as yaml
 
 
 import ycast.vtuner as vtuner
 import ycast.vtuner as vtuner
 import ycast.generic as generic
 import ycast.generic as generic
@@ -44,16 +43,7 @@ def get_station_by_id(vtune_id):
 def get_stations_yaml():
 def get_stations_yaml():
     from ycast.my_recentlystation import get_recently_stations_yaml
     from ycast.my_recentlystation import get_recently_stations_yaml
     my_recently_station = get_recently_stations_yaml()
     my_recently_station = get_recently_stations_yaml()
-    my_stations = None
-    try:
-        with open(config_file, 'r') as f:
-            my_stations = yaml.safe_load(f)
-    except FileNotFoundError:
-        logging.error("Station configuration '%s' not found", config_file)
-
-    except yaml.YAMLError as e:
-        logging.error("Station configuration format error: %s", e)
-
+    my_stations = generic.read_yaml_file(config_file)
     if my_stations:
     if my_stations:
         if my_recently_station:
         if my_recently_station:
             my_stations.update(my_recently_station)
             my_stations.update(my_recently_station)

+ 66 - 56
ycast/radiobrowser.py

@@ -4,6 +4,8 @@ import logging
 from ycast import __version__
 from ycast import __version__
 import ycast.vtuner as vtuner
 import ycast.vtuner as vtuner
 import ycast.generic as generic
 import ycast.generic as generic
+from ycast.filter import check_station, init_filter, end_filter
+from ycast.generic import get_json_attr
 
 
 API_ENDPOINT = "http://all.api.radio-browser.info"
 API_ENDPOINT = "http://all.api.radio-browser.info"
 MINIMUM_COUNT_GENRE = 5
 MINIMUM_COUNT_GENRE = 5
@@ -17,27 +19,20 @@ ID_PREFIX = "RB"
 station_cache = {}
 station_cache = {}
 
 
 
 
-def get_json_attr(json, attr):
-    try:
-        return json[attr]
-    except Exception as ex:
-        logging.error("json: attr '%s' not found: %s", attr, ex)
-        return None
-
-
 class Station:
 class Station:
     def __init__(self, station_json):
     def __init__(self, station_json):
-        self.stationuuid = get_json_attr(station_json, 'stationuuid')
+        self.stationuuid = generic.get_json_attr(station_json, 'stationuuid')
         self.id = generic.generate_stationid_with_prefix(generic.get_checksum(self.stationuuid), ID_PREFIX)
         self.id = generic.generate_stationid_with_prefix(generic.get_checksum(self.stationuuid), ID_PREFIX)
-        self.name = get_json_attr(station_json, 'name')
-        self.url = get_json_attr(station_json, 'url')
-        self.icon = get_json_attr(station_json, 'favicon')
-        self.tags = get_json_attr(station_json, 'tags').split(',')
-        self.countrycode = get_json_attr(station_json, 'countrycode')
-        self.language = get_json_attr(station_json, 'language')
-        self.votes = get_json_attr(station_json, 'votes')
-        self.codec = get_json_attr(station_json, 'codec')
-        self.bitrate = get_json_attr(station_json, 'bitrate')
+        self.name = generic.get_json_attr(station_json, 'name')
+        self.url = generic.get_json_attr(station_json, 'url')
+        self.icon = generic.get_json_attr(station_json, 'favicon')
+        self.tags = generic.get_json_attr(station_json, 'tags').split(',')
+        self.countrycode = generic.get_json_attr(station_json, 'countrycode')
+        self.language = generic.get_json_attr(station_json, 'language')
+        self.languagecodes = generic.get_json_attr(station_json, 'languagecodes')
+        self.votes = generic.get_json_attr(station_json, 'votes')
+        self.codec = generic.get_json_attr(station_json, 'codec')
+        self.bitrate = generic.get_json_attr(station_json, 'bitrate')
 
 
     def to_vtuner(self):
     def to_vtuner(self):
         return vtuner.Station(self.id, self.name,
         return vtuner.Station(self.id, self.name,
@@ -67,23 +62,13 @@ def request(url):
 
 
 
 
 def get_station_by_id(vtune_id):
 def get_station_by_id(vtune_id):
-    return station_cache[vtune_id]
-
-
-def search(name, limit=DEFAULT_STATION_LIMIT):
-    station_cache.clear()
-    stations = []
-    stations_json = request('stations/search?order=name&reverse=false&limit=' + str(limit) + '&name=' + str(name))
-    for station_json in stations_json:
-        if SHOW_BROKEN_STATIONS or get_json_attr(station_json, 'lastcheckok') == 1:
-            cur_station = Station(station_json)
-            if SHOW_WITHOUT_FAVICON or cur_station.icon:
-                station_cache[cur_station.id] = cur_station
-                stations.append(cur_station)
-    return stations
+    if station_cache:
+        return station_cache[vtune_id]
+    return None
 
 
 
 
 def get_country_directories():
 def get_country_directories():
+    init_filter()
     country_directories = []
     country_directories = []
     apicall = 'countries'
     apicall = 'countries'
     if not SHOW_BROKEN_STATIONS:
     if not SHOW_BROKEN_STATIONS:
@@ -98,6 +83,7 @@ def get_country_directories():
 
 
 
 
 def get_language_directories():
 def get_language_directories():
+    init_filter()
     language_directories = []
     language_directories = []
     apicall = 'languages'
     apicall = 'languages'
     if not SHOW_BROKEN_STATIONS:
     if not SHOW_BROKEN_STATIONS:
@@ -128,52 +114,76 @@ def get_genre_directories():
 
 
 
 
 def get_stations_by_country(country):
 def get_stations_by_country(country):
+    init_filter()
     station_cache.clear()
     station_cache.clear()
     stations = []
     stations = []
-    stations_json = request('stations/search?order=name&reverse=false&countryExact=true&country=' + str(country))
-    for station_json in stations_json:
-        if SHOW_BROKEN_STATIONS or get_json_attr(station_json, 'lastcheckok') == 1:
+    stations_list_json = request('stations/search?order=name&reverse=false&countryExact=true&country=' + str(country))
+    for station_json in stations_list_json:
+        if check_station(station_json):
             cur_station = Station(station_json)
             cur_station = Station(station_json)
-            if SHOW_WITHOUT_FAVICON or cur_station.icon:
-                station_cache[cur_station.id] = cur_station
-                stations.append(cur_station)
+            station_cache[cur_station.id] = cur_station
+            stations.append(cur_station)
+    logging.info("Stations (%d/%d)", len(stations), len(stations_list_json))
+    end_filter()
     return stations
     return stations
 
 
 
 
 def get_stations_by_language(language):
 def get_stations_by_language(language):
+    init_filter()
     station_cache.clear()
     station_cache.clear()
     stations = []
     stations = []
-    stations_json = request('stations/search?order=name&reverse=false&languageExact=true&language=' + str(language))
-    for station_json in stations_json:
-        if SHOW_BROKEN_STATIONS or get_json_attr(station_json, 'lastcheckok') == 1:
+    stations_list_json = \
+        request('stations/search?order=name&reverse=false&languageExact=true&language=' + str(language))
+    for station_json in stations_list_json:
+        if check_station(station_json):
             cur_station = Station(station_json)
             cur_station = Station(station_json)
-            if SHOW_WITHOUT_FAVICON or cur_station.icon:
-                station_cache[cur_station.id] = cur_station
-                stations.append(cur_station)
+            station_cache[cur_station.id] = cur_station
+            stations.append(cur_station)
+    logging.info("Stations (%d/%d)", len(stations), len(stations_list_json))
+    end_filter()
     return stations
     return stations
 
 
 
 
 def get_stations_by_genre(genre):
 def get_stations_by_genre(genre):
+    init_filter()
     station_cache.clear()
     station_cache.clear()
     stations = []
     stations = []
-    stations_json = request('stations/search?order=name&reverse=false&tagExact=true&tag=' + str(genre))
-    for station_json in stations_json:
-        if SHOW_BROKEN_STATIONS or get_json_attr(station_json, 'lastcheckok') == 1:
+    stations_list_json = request('stations/search?order=name&reverse=false&tagExact=true&tag=' + str(genre))
+    for station_json in stations_list_json:
+        if check_station(station_json):
             cur_station = Station(station_json)
             cur_station = Station(station_json)
-            if SHOW_WITHOUT_FAVICON or cur_station.icon:
-                station_cache[cur_station.id] = cur_station
-                stations.append(cur_station)
+            station_cache[cur_station.id] = cur_station
+            stations.append(cur_station)
+    logging.info("Stations (%d/%d)", len(stations), len(stations_list_json))
+    end_filter()
     return stations
     return stations
 
 
 
 
 def get_stations_by_votes(limit=DEFAULT_STATION_LIMIT):
 def get_stations_by_votes(limit=DEFAULT_STATION_LIMIT):
+    init_filter()
+    station_cache.clear()
+    stations = []
+    stations_list_json = request('stations?order=votes&reverse=true&limit=' + str(limit))
+    for station_json in stations_list_json:
+        if check_station(station_json):
+            cur_station = Station(station_json)
+            station_cache[cur_station.id] = cur_station
+            stations.append(cur_station)
+    logging.info("Stations (%d/%d)", len(stations), len(stations_list_json))
+    end_filter()
+    return stations
+
+
+def search(name, limit=DEFAULT_STATION_LIMIT):
+    init_filter()
     station_cache.clear()
     station_cache.clear()
     stations = []
     stations = []
-    stations_json = request('stations?order=votes&reverse=true&limit=' + str(limit))
-    for station_json in stations_json:
-        if SHOW_BROKEN_STATIONS or get_json_attr(station_json, 'lastcheckok') == 1:
+    stations_list_json = request('stations/search?order=name&reverse=false&limit=' + str(limit) + '&name=' + str(name))
+    for station_json in stations_list_json:
+        if check_station(station_json):
             cur_station = Station(station_json)
             cur_station = Station(station_json)
-            if SHOW_WITHOUT_FAVICON or cur_station.icon:
-                station_cache[cur_station.id] = cur_station
-                stations.append(cur_station)
+            station_cache[cur_station.id] = cur_station
+            stations.append(cur_station)
+    logging.info("Stations (%d/%d)", len(stations), len(stations_list_json))
+    end_filter()
     return stations
     return stations

+ 44 - 0
ycast/test_filter.py

@@ -0,0 +1,44 @@
+import json
+import logging
+import unittest
+from io import StringIO
+
+import filter
+import generic
+from ycast import radiobrowser
+
+class MyTestCase(unittest.TestCase):
+
+    logging.getLogger().setLevel(logging.DEBUG)
+
+    def test_init_filter(self):
+        filter.init_filter()
+
+        for elem in filter.filter_dir:
+            logging.warning("Name filtertype: %s", elem)
+            filter_param = filter.filter_dir[elem]
+            if filter_param:
+                for par in filter_param:
+                    logging.warning("    Name paramter: %s",par)
+            else:
+                logging.warning("    <empty list>")
+
+
+    def test_valid_station(self):
+        filter.init_filter()
+        test_lines = generic.readlns_txt_file(generic.get_var_path()+"/test.json")
+        io = StringIO(test_lines[0])
+        stations_json = json.load(io)
+        count = 0
+        for station_json in stations_json:
+            if filter.check_station(station_json):
+                station = radiobrowser.Station(station_json)
+                count = count + 1
+
+        logging.info("Stations (%d/%d)" , count, len(stations_json) )
+        logging.info("Used filter parameter", filter.parameter_failed_list)
+
+
+if __name__ == '__main__':
+    unittest.main()
+