This commit is contained in:
Thomas Hanika 2023-02-24 21:25:52 +01:00 committed by GitHub
commit 8701799f8f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
26 changed files with 1821 additions and 173 deletions

1
.gitignore vendored
View file

@ -2,5 +2,6 @@ build
dist
*.egg-info
.idea
.vscode
*.iml
*.pyc

View file

@ -1,2 +1,4 @@
include README.md
include LICENCE.txt
recursive-include ycast/templates *
recursive-include ycast/static *

View file

@ -1,14 +1,23 @@
<img src="https://image.ibb.co/iBY6hq/yamaha.png" width="600">
# YCast
# YCast (advanced)
[![PyPI latest version](https://img.shields.io/pypi/v/ycast?color=success)](https://pypi.org/project/ycast/) [![GitHub latest version](https://img.shields.io/github/v/release/milaq/YCast?color=success&label=github&sort=semver)](https://github.com/milaq/YCast/releases) [![Python version](https://img.shields.io/pypi/pyversions/ycast)](https://www.python.org/downloads/) [![License](https://img.shields.io/pypi/l/ycast)](https://www.gnu.org/licenses/gpl-3.0.en.html) [![GitHub issues](https://img.shields.io/github/issues/milaq/ycast)](https://github.com/milaq/YCast/issues)
[Get it via PyPI](https://pypi.org/project/ycast/)
[Download from GitHub](https://github.com/THanika/YCast/releases)
[Download from GitHub](https://github.com/milaq/YCast/releases)
[Issue tracker](https://github.com/THanika/YCast/issues)
[Issue tracker](https://github.com/milaq/YCast/issues)
#### pip3 install git+https://github.com/ThHanika/YCast
### The advanced feature:
* Icons in my favorites list 'stations.yml' (the icon URL can be appended after the pipe character '|')
* recently visited radio stations are stored in /.yast/resently.yml (compatible with stations.yml, for easy editing of your favorites and pasting into stations.yml)
* global filter/limits configurable file ./ycast/filter.yml (with this you can globally reduce the radio stations according to your interests). The filter can be modified at runtime useing a REST API (/control/filter...), see below.
* 5 frequently used radio stations can be selected on the target page (self-learning algorithm based on frequency of station selection)
* web frontend to setup your favorites
<img src="https://github.com/ThHanika/YCast/blob/master/webFrontEnd.png" width="400">
YCast is a self hosted replacement for the vTuner internet radio service which many AVRs use.
It emulates a vTuner backend to provide your AVR with the necessary information to play self defined categorized internet radio stations and listen to Radio stations listed in the [Community Radio Browser index](http://www.radio-browser.info).
@ -117,7 +126,7 @@ You can redirect all traffic destined for the original request URL (e.g. `radioy
__Attention__: Do not rewrite the requests transparently. YCast expects the complete URL (i.e. including `/ycast` or `/setupapp`). It also need an intact `Host` header; so if you're proxying YCast you need to pass the original header on. For Nginx, this can be accomplished with `proxy_set_header Host $host;`.
In case you are using (or plan on using) Nginx to proxy requests, have a look at [this example](examples/nginx-ycast.conf.example).
This can be used together with [this systemd service example](examples/ycast.service.example) for a fully functional deployment.
This can be used together with [this systemd service example](examples/ycast.service.example_ycast) for a fully functional deployment.
#### With WSGI
@ -139,6 +148,20 @@ Category two name:
You can also have a look at the provided [example](examples/stations.yml.example) to better understand the configuration.
### Filter/limits
As the amount of stations can be overwhelming on a AV receiver interface Ycast allows for filtering. The filter configuration file .ycast/filter.yml allows to filter stations based on a whitelist / blacklist. The contents of this list specifies which attributes to filter on. Look at the provided [example](examples/filter.yml.example) for the details.
The limits allow to filter out genres, countries and languages that fail to have a certain amount of items. It also sets the default station limit for search and votes and allows to show or hide broken stations. Defaults are as follows:
* MINIMUM_COUNT_GENRE : 40
* MINIMUM_COUNT_COUNTRY : 5
* MINIMUM_COUNT_LANGUAGE : 5
* DEFAULT_STATION_LIMIT : 200
* SHOW_BROKEN_STATIONS : False
You can set your own values in filter.xml by adding these attributes and values in the limits list. The filter file is not reread automatically when modified while the server is running. Send a HUP signal to trigger but it's preferred to use the api (see below) to modify the lists.
The current filters/limits can be queried through a REST API by calling the GET method on /control/filter/whitelist, /control/filter/blacklist and /control/filter/limits. They can be modified by using the POST method an posting a JSON with the items to modify. Specifying a null value for an item will delete it from the list or, in the case of the limits, reset it to its default.
## Firewall rules
* Your AVR needs access to the internet.

69
docker/Dockerfile Normal file
View file

@ -0,0 +1,69 @@
#
# Docker Buildfile for the ycast-docker container based on alpine linux - about 41.4MB
# put dockerfile and bootstrap.sh in same directory and build or enter
# docker build https://github.com/MaartenSanders/ycast-docker.git
#
FROM alpine:latest
#
# Variables
# YC_VERSION version of ycast software
# YC_STATIONS path an name of the indiviudual stations.yml e.g. /ycast/stations/stations.yml
# YC_DEBUG turn ON or OFF debug output of ycast server else only start /bin/sh
# YC_PORT port ycast server listens to, e.g. 80
#
ENV YC_VERSION master
ENV YC_STATIONS /opt/ycast/stations.yml
ENV YC_DEBUG OFF
ENV YC_PORT 80
#
# Upgrade alpine Linux, install python3 and dependencies for pillow - alpine does not use glibc
# pip install needed modules for ycast
#
RUN apk --no-cache update && \
apk --no-cache upgrade && \
apk add --no-cache python3 && \
apk add --no-cache py3-pip && \
apk add --no-cache zlib-dev && \
apk add --no-cache jpeg-dev && \
apk add --no-cache build-base && \
apk add --no-cache python3-dev && \
pip3 install --no-cache-dir requests && \
pip3 install --no-cache-dir flask && \
pip3 install --no-cache-dir PyYAML && \
pip3 install --no-cache-dir Pillow && \
pip3 install --no-cache-dir olefile && \
mkdir -p /opt/ycast/YCast-master && \
apk del --no-cache python3-dev && \
apk del --no-cache build-base && \
apk del --no-cache zlib-dev && \
apk add --no-cache curl
# download ycast tar.gz and extract it in ycast Directory
RUN curl -L https://codeload.github.com/superclass/YCast/tar.gz/master \
| tar xvzC /opt/ycast
# delete unneeded stuff
RUN apk del --no-cache curl && \
find /usr/lib -name \*.pyc -exec rm -f {} \; && \
find /usr/lib -type f -name \*.exe -exec rm -f {} \;
#
# Set Workdirectory on ycast folder
#
WORKDIR /opt/ycast/YCast-${YC_VERSION}
#
# Copy bootstrap.sh to /opt
#
COPY bootstrap.sh /opt
#
# Docker Container should be listening for AVR on port 80
#
EXPOSE ${YC_PORT}/tcp
#
# Start bootstrap on Container start
#
RUN ["chmod", "+x", "/opt/bootstrap.sh"]
ENTRYPOINT ["/opt/bootstrap.sh"]

24
docker/bootstrap.sh Normal file
View file

@ -0,0 +1,24 @@
#!/bin/sh
#Bootstrap File für ycast docker container
#Variables
#YC_VERSION version of ycast software
#YC_STATIONS path an name of the indiviudual stations.yml e.g. /ycast/stations/stations.yml
#YC_DEBUG turn ON or OFF debug output of ycast server else only start /bin/sh
#YC_PORT port ycast server listens to, e.g. 80
if [ "$YC_DEBUG" = "OFF" ]; then
/usr/bin/python3 -m ycast -c $YC_STATIONS -p $YC_PORT
elif [ "$YC_DEBUG" = "ON" ]; then
/usr/bin/python3 -m ycast -c $YC_STATIONS -p $YC_PORT -d
else
/bin/sh
fi

24
examples/apicalls.sh Normal file
View file

@ -0,0 +1,24 @@
ENDPOINT=127.0.0.1
# API
# Get recently played stations
curl "http://${ENDPOINT}/api/stations?category=recently"
# Get highest rated stations
curl "http://${ENDPOINT}/api/stations?category=voted"
# Get stations by language, specify by language paramter (default=german)
curl "http://${ENDPOINT}/api/stations?category=language&language=dutch"
# Get stations by country, specify by country paramter (default=Germany)
curl "http://${ENDPOINT}/api/stations?category=country&country=The%20Netherlands"
# Ycast XML calls
curl "http://${ENDPOINT}/setupapp"
# Search by name
curl "http://${ENDPOINT}/ycast/search/?search=Pinguin"
# List top directories (Genres, Countries, Languages, Most Popular).
curl "http://${ENDPOINT}/ycast/radiobrowser/"
# Play station
curl "http://${ENDPOINT}/ycast/play?id=stationid"
# Get station info
curl "http://${ENDPOINT}/ycast/station?id=stationid"
curl "http://${ENDPOINT}/ycast/icon?id=stationid"
# Get station icon

View file

@ -0,0 +1,53 @@
# Filters can be applied to the search results coming back from
# api.radio-browser.info. Results can either be whitelisted or blacklisted. The
# attributes in the whitelist are actually the attributes of the Station Struct
# defined here: https://de1.api.radio-browser.info/#Struct_station. The most
# useful ones to filter on are: codec, bitrate, language, languagecode. Ycast
# has a default whitelist for the lastcheckok attribute to be set (1) which
# indicates the stream is currently operational, as it does not make sense to
# return stations to the AV receiver that are broken. There are a few
# attributes that have a multi value string, the values are separated by a
# comma (,) (imo this should be a json list so clients don't have to parse the
# string, but it is as it is). The filter code will split these strings into a
# list first and then will try to match the the value from the whitelist or
# blacklist on any of the values in the list. The most interesting multi value
# attribute is the tags attribute which carries the genre(s) of the station.
# Unfortunately the values are rather free format, so there is no fixed list of
# genres to filter on and most stations indicate multiple genres. Attribute
# filter values can be either a single or multi-value. Multi-values
# should be entered in the filter file as a json list, either using the bracket
# ([]) or list (-) syntax. See the examples below.
#
# For the directory listings by Genre, Language, Country the following filter
# attributes will be applied: Genre: tags; Language: languagecodes; Country:
# country.
whitelist:
#Filter on the full country name:
#country: Germany
#Filter on any of the country codes specified in this [] list:
#countrycode: [ "DE","US","NO","GB" ]
#Filter on any of the language codes specified in this - list:
#languagecodes:
# - "en"
# - "no"
# Filter on bitrate:
#bitrate: 192
#To override the lastcheckok default (1) use this:
#lastcheckok: [ 0,1 ]
blacklist:
# Filter out stations with no favicon:
favicon: ''
# Filter on codecs:
#codec: ["AAC", "AAC+"]
# Limits can be applied, below are the hardcoded defautls, which can be overridden in this file.
#limits:
# The following limits will be applied to the directory listing of genre, country and language. Each item should contain this minium amount of entries to be returned.
#MINIMUM_COUNT_GENRE: 40
#MINIMUM_COUNT_COUNTRY: 5
#MINIMUM_COUNT_LANGUAGE: 5
# The default maximum amount of entries to return from search and search by
# votes.
#DEFAULT_STATION_LIMIT: 200
# Include broken stations in the result.
#SHOW_BROKEN_STATIONS: False

View file

@ -1,12 +0,0 @@
[Unit]
Description=YCast internet radio service
After=network.target
[Service]
Type=simple
User=ycast
Group=ycast
ExecStart=/usr/bin/python3 -m ycast -l 127.0.0.1 -p 8010
[Install]
WantedBy=multi-user.target

View file

@ -0,0 +1,21 @@
[Unit]
Description=YCast internet radio service
After=network.target
[Service]
Type=simple
WorkingDirectory=/var/www/ycast
# StandardOutput=file:/var/www/ycast/service.log
# StandardError=file:/var/www/ycast/ycast.log
StandardOutput=syslog
StandardError=syslog
SyslogIdentifier=ycast
Restart=always
RestartSec=130
ExecStart=/usr/bin/python3 -m ycast -c /var/www/ycast/stations.yml -d
[Install]
WantedBy=multi-user.target

View file

@ -0,0 +1,15 @@
[Unit]
Description=YCast internet radio service
After=network.target
[Service]
Type=simple
User=ycast
Group=ycast
WorkingDirectory=/home/ycast
StandardOutput=file:/home/ycast/service.log
StandardError=file:/home/ycast/ycast.log
ExecStart=/usr/bin/python3 -m ycast -l 127.0.0.1 -p 8010 -d -c /home/ycast/.ycast/stations.yml
[Install]
WantedBy=multi-user.target

View file

@ -13,6 +13,7 @@ setup(
description='Self hosted vTuner internet radio service emulation',
long_description=long_description,
long_description_content_type="text/markdown",
include_package_data=True,
url='https://github.com/milaq/YCast',
license='GPLv3',
classifiers=[

BIN
webFrontEnd.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 176 KiB

View file

@ -1 +1 @@
__version__ = '1.1.0'
__version__ = '1.3.0'

View file

@ -3,12 +3,18 @@
import argparse
import logging
import sys
import signal
from ycast import __version__
from ycast import server
logging.basicConfig(format='%(asctime)s %(levelname)s: %(message)s', datefmt='%Y-%m-%d %H:%M:%S', level=logging.INFO)
def handler(signum, frame):
logging.info('Signal received: rereading filter config')
from ycast.my_filter import init_filter_file
init_filter_file()
signal.signal(signal.SIGHUP, handler)
def launch_server():
parser = argparse.ArgumentParser(description='vTuner API emulation')
@ -23,6 +29,13 @@ def launch_server():
logging.debug("Debug logging enabled")
else:
logging.getLogger('werkzeug').setLevel(logging.WARNING)
# initialize important ycast parameters
from ycast.generic import init_base_dir
init_base_dir('/.ycast')
from ycast.my_filter import init_filter_file
init_filter_file()
server.run(arguments.config, arguments.address, arguments.port)

View file

@ -1,9 +1,17 @@
import logging
import os
import hashlib
import sys
import yaml
USER_AGENT = 'YCast'
VAR_PATH = os.path.expanduser("~") + '/.ycast'
CACHE_PATH = VAR_PATH + '/cache'
# initialize it start
VAR_PATH = ''
CACHE_PATH = ''
stations_file_by_config = ''
class Directory:
@ -15,6 +23,50 @@ class Directory:
else:
self.displayname = name
def to_dict(self):
return {'name': self.name , 'displayname': self.displayname, 'count': self.item_count }
def mk_writeable_dir(path):
try:
os.makedirs(path)
except FileExistsError:
pass
except Exception as ex:
logging.error("Could not create base folder (%s) because of access permissions: %s", path, ex)
return None
return path
def init_base_dir(path_element):
global VAR_PATH, CACHE_PATH
logging.info('Initialize base directory %s', path_element)
logging.debug(' HOME: %s',os.path.expanduser("~"))
logging.debug(' PWD: %s',os.getcwd())
var_dir = None
if not os.getcwd().endswith('/ycast'):
# specified working dir with /ycast has prio
try_path = os.path.expanduser("~") + path_element
logging.info(' try Home-Dir: %s', try_path)
var_dir = mk_writeable_dir(try_path)
if var_dir is None:
# avoid using root '/' and it's subdir
if len(os.getcwd()) < 6:
logging.error(" len(PWD) < 6 (PWD is too small) < 6: '%s'", os.getcwd())
else:
try_path = os.getcwd() + path_element
logging.info(' try Work-Dir: %s', try_path)
var_dir = mk_writeable_dir(os.getcwd() + path_element)
if var_dir is None:
sys.exit('YCast: ###### No usable directory found #######, I give up....')
logging.info('using var directory: %s', var_dir)
VAR_PATH = var_dir
CACHE_PATH = var_dir + '/cache'
return
def generate_stationid_with_prefix(uid, prefix):
if not prefix or len(prefix) != 2:
@ -41,7 +93,9 @@ def get_stationid_without_prefix(uid):
def get_cache_path(cache_name):
cache_path = CACHE_PATH + '/' + cache_name
cache_path = CACHE_PATH
if cache_name:
cache_path = CACHE_PATH + '/' + cache_name
try:
os.makedirs(cache_path)
except FileExistsError:
@ -50,3 +104,97 @@ def get_cache_path(cache_name):
logging.error("Could not create cache folders (%s) because of access permissions", cache_path)
return None
return cache_path
def get_var_path():
try:
os.makedirs(VAR_PATH)
except FileExistsError:
pass
except PermissionError:
logging.error("Could not create cache folders (%s) because of access permissions", VAR_PATH)
return None
return VAR_PATH
def get_recently_file():
return get_var_path() + '/recently.yml'
def get_filter_file():
return get_var_path() + '/filter.yml'
def get_stations_file():
global stations_file_by_config
if stations_file_by_config:
return stations_file_by_config
return get_var_path() + '/stations.yml'
def set_stations_file(stations_file):
global stations_file_by_config
if stations_file:
stations_file_by_config = stations_file
def get_checksum(feed, charlimit=12):
hash_feed = feed.encode()
hash_object = hashlib.md5(hash_feed)
digest = hash_object.digest()
xor_fold = bytearray(digest[:8])
for i, b in enumerate(digest[8:]):
xor_fold[i] ^= b
digest_xor_fold = ''.join(format(x, '02x') for x in bytes(xor_fold))
return str(digest_xor_fold[:charlimit]).upper()
def read_yaml_file(file_name):
try:
with open(file_name, 'r') as f:
return yaml.safe_load(f)
except FileNotFoundError:
logging.warning("YAML file '%s' not found", file_name)
except yaml.YAMLError as e:
logging.error("YAML format error in '%':\n %s", file_name, e)
return None
def write_yaml_file(file_name, dictionary):
try:
with open(file_name, 'w') as f:
# no sort please
yaml.dump(dictionary, f, sort_keys=False)
return True
except yaml.YAMLError as e:
logging.error("YAML format error in '%':\n %s", file_name, e)
except Exception as ex:
logging.error("File not written '%s':\n %s", file_name, ex)
return False
def readlns_txt_file(file_name):
try:
with open(file_name, 'r') as f:
return f.readlines()
except FileNotFoundError:
logging.warning("TXT file '%s' not found", file_name)
return None
def writelns_txt_file(file_name, line_list):
try:
with open(file_name, 'w') as f:
f.writelines(line_list)
return True
except Exception as ex:
logging.error("File not written '%s':\n %s", file_name, ex)
return False
def get_json_attr(json, attr):
try:
return json[attr]
except Exception as ex:
logging.debug("json: attr '%s' not found: %s", attr, ex)
return None

170
ycast/my_filter.py Normal file
View file

@ -0,0 +1,170 @@
import logging
from ycast import generic
from ycast.generic import get_json_attr
white_list = {'lastcheckok': 1}
black_list = {}
limit_list = {}
limit_defs_int ={ 'MINIMUM_COUNT_GENRE' : 40, 'MINIMUM_COUNT_COUNTRY' : 5, 'MINIMUM_COUNT_LANGUAGE' : 5, 'DEFAULT_STATION_LIMIT' : 200}
limit_defs_bool ={ 'SHOW_BROKEN_STATIONS' : False}
parameter_failed_list = {}
count_used = 0
count_hit = 0
def init_filter_file():
global white_list, black_list, limit_list
logging.info('Reading Limits and Filters')
filter_dictionary = generic.read_yaml_file(generic.get_filter_file())
if filter_dictionary is None:
filter_dictionary = {}
is_updated = True
if 'whitelist' in filter_dictionary:
if filter_dictionary['whitelist'] is None:
white_list = { "lastcheckok" : 1}
else:
# Copy so the default is preserved.
for f in filter_dictionary['whitelist']:
white_list[f]=filter_dictionary['whitelist'][f]
if 'blacklist' in filter_dictionary:
# reference, no defaults
if filter_dictionary['blacklist'] is None:
black_list={}
else:
black_list=filter_dictionary['blacklist']
if 'limits' in filter_dictionary:
set_limits(filter_dictionary['limits'])
def write_filter_config():
global limit_list
filter_dictionary = {'whitelist': white_list, 'blacklist': black_list}
if len(limit_list) > 0: filter_dictionary['limits']=limit_list
generic.write_yaml_file(generic.get_var_path() + '/filter.yml', filter_dictionary)
def begin_filter():
global parameter_failed_list
global count_used
global count_hit
count_used = 0
count_hit = 0
#init_filter_file()
parameter_failed_list.clear()
return
def end_filter():
if parameter_failed_list:
logging.info("(%d/%d) stations filtered by: %s", count_hit, count_used, parameter_failed_list)
else:
logging.info("(%d/%d) stations filtered by: <no filter used>")
def parameter_failed_evt(param_name):
count = 1
old = None
if parameter_failed_list:
old = parameter_failed_list.get(param_name)
if old:
count = old + 1
parameter_failed_list[param_name] = count
def verify_value(ref_val, val):
if isinstance(val, str) and val.find(",") > -1:
val_list=val.split(",")
else:
val_list=[val]
for v in val_list:
if v == None:
v=''
if isinstance(ref_val, list):
return v in ref_val
if str(ref_val) == str(v):
return True
if ref_val is None:
return len(v) == 0
# if type(val) is int:
## return val == int(ref_val)
# if val:
# return ref_val.find(str(val)) >= 0
return False
def chk_parameter(parameter_name, val):
if black_list:
if parameter_name in black_list:
if verify_value(black_list[parameter_name], val):
return False
if white_list:
if parameter_name in white_list:
return verify_value(white_list[parameter_name], val)
return True
def check_station(station_json):
global count_used
global count_hit
count_used = count_used + 1
station_name = get_json_attr(station_json, 'name')
if not station_name:
# müll response
logging.debug(station_json)
return False
# oder verknüpft
if black_list:
for param_name in black_list:
val = get_json_attr(station_json, param_name)
if verify_value(black_list[param_name], val):
parameter_failed_evt(param_name)
# logging.debug("FAIL '%s' blacklist failed on '%s' '%s' == '%s'",
# station_name, param_name, black_list[param_name], val)
return False
# und verknüpft
if white_list:
for param_name in white_list:
val = get_json_attr(station_json, param_name)
if val is not None:
# attribut in json vorhanden
if not verify_value(white_list[param_name], val):
parameter_failed_evt(param_name)
# logging.debug("FAIL '%s' whitelist failed on '%s' '%s' == '%s'",
# station_name, param_name, white_list[param_name], val)
return False
count_hit = count_hit + 1
return True
def get_limit(param_name):
global limit_defs
if param_name in limit_defs_int: return limit_list.get(param_name,limit_defs_int[param_name])
if param_name in limit_defs_bool: return limit_list.get(param_name,limit_defs_bool[param_name])
else: return None
def get_limit_list():
global limit_defs_int, limit_defs_bool
my_limits={}
limit_defs=dict(limit_defs_int)
limit_defs.update(limit_defs_bool)
for l in limit_defs:
my_limits[l]=get_limit(l)
return my_limits
def set_limits(limits):
global limit_list, limit_defs_int, limit_defs_bool
for l in limits:
if limits[l] == None:
limit_list.pop(l, None)
elif l in limit_defs_int:
if isinstance(limits[l], int) and limits[l] > 0:
limit_list[l]=limits[l]
elif l in limit_defs_bool:
if isinstance(limits[l], bool): limit_list[l]=limits[l]
else:
loggin.error("Invalid limit %s") % l
return get_limit_list()

114
ycast/my_recentlystation.py Normal file
View file

@ -0,0 +1,114 @@
from ycast import generic, my_stations
from ycast.generic import get_recently_file
MAX_ENTRIES = 15
# define a max, so after 5 hits, another station is get better votes
MAX_VOTES = 5
DIRECTORY_NAME = "recently used"
recently_station_dictionary = None
voted5_station_dictinary = None
class StationVote:
def __init__(self, name, params_txt):
self.name = name
params = params_txt.split('|')
self.url = params[0]
self.icon = ''
self.vote = 0
if len(params) > 1:
self.icon = params[1]
if len(params) > 2:
self.vote = int(params[2])
def to_params_txt(self):
text_line = self.url + '|' + self.icon + '|' + str(self.vote)
return text_line
def to_server_station(self, cathegory):
return my_stations.Station(self.name, self.url, cathegory, self.icon)
def signal_station_selected(name, url, icon):
recently_station_list = get_stations_list()
station_hit = StationVote(name, url + '|' + icon)
for recently_station in recently_station_list:
if name == recently_station.name:
station_hit.vote = recently_station.vote + 1
recently_station_list.remove(recently_station)
break
recently_station_list.insert(0, station_hit)
if station_hit.vote > MAX_VOTES:
for recently_station in recently_station_list:
if recently_station.vote > 0:
recently_station.vote = recently_station.vote - 1
if len(recently_station_list) > MAX_ENTRIES:
# remove last (oldest) entry
recently_station_list.pop()
set_recently_station_dictionary(mk_station_dictionary(directory_name(), recently_station_list))
def set_recently_station_dictionary(station_dict):
global recently_station_dictionary
recently_station_dictionary = station_dict
generic.write_yaml_file(get_recently_file(), recently_station_dictionary)
def mk_station_dictionary(cathegory, station_list):
new_cathegory_dictionary = {}
station_dictionary = {}
for station in station_list:
station_dictionary[station.name] = station.to_params_txt()
new_cathegory_dictionary[cathegory] = station_dictionary
return new_cathegory_dictionary
def get_stations_list():
stations_list = []
cathegory_dict = get_recently_stations_dictionary()
if cathegory_dict:
for cat_key in cathegory_dict:
station_dict = cathegory_dict[cat_key]
for station_key in station_dict:
stations_list.append(StationVote(station_key, station_dict[station_key]))
return stations_list
def get_recently_stations_dictionary():
# cached recently
global recently_station_dictionary
if not recently_station_dictionary:
recently_station_dictionary = generic.read_yaml_file(get_recently_file())
return recently_station_dictionary
def directory_name():
station_dictionary = get_recently_stations_dictionary()
if station_dictionary:
return list(get_recently_stations_dictionary().keys())[0]
return DIRECTORY_NAME
# used in landing page
def get_stations_by_vote():
station_list = get_stations_list()
station_list.sort(key=lambda station: station.vote, reverse=True)
station_list = station_list[:5]
stations = []
for item in station_list:
stations.append(item.to_server_station('voted'))
return stations
def get_stations_by_recently():
station_list = get_stations_list()
stations = []
for item in station_list:
stations.append(item.to_server_station(directory_name()))
return stations

View file

@ -1,58 +1,46 @@
import logging
import hashlib
import yaml
import ycast.vtuner as vtuner
import ycast.generic as generic
ID_PREFIX = "MY"
config_file = 'stations.yml'
class Station:
def __init__(self, uid, name, url, category):
self.id = generic.generate_stationid_with_prefix(uid, ID_PREFIX)
def __init__(self, name, url, category, icon):
self.id = generic.generate_stationid_with_prefix(
generic.get_checksum(name + url), ID_PREFIX)
self.name = name
self.url = url
self.tag = category
self.icon = None
self.icon = icon
def to_vtuner(self):
return vtuner.Station(self.id, self.name, self.tag, self.url, self.icon, self.tag, None, None, None, None)
def set_config(config):
global config_file
if config:
config_file = config
if get_stations_yaml():
return True
else:
return False
def to_dict(self):
return {'name': self.name , 'url': self.url, 'icon': self.icon, 'description': self.tag }
def get_station_by_id(uid):
def get_station_by_id(vtune_id):
my_stations_yaml = get_stations_yaml()
if my_stations_yaml:
for category in my_stations_yaml:
for station in get_stations_by_category(category):
if uid == generic.get_stationid_without_prefix(station.id):
if vtune_id == station.id:
return station
return None
def get_stations_yaml():
try:
with open(config_file, 'r') as f:
my_stations = yaml.safe_load(f)
except FileNotFoundError:
logging.error("Station configuration '%s' not found", config_file)
return None
except yaml.YAMLError as e:
logging.error("Station configuration format error: %s", e)
return None
from ycast.my_recentlystation import get_recently_stations_dictionary
my_recently_station = get_recently_stations_dictionary()
my_stations = generic.read_yaml_file(generic.get_stations_file())
if my_stations:
if my_recently_station:
my_stations.update(my_recently_station)
else:
return my_recently_station
return my_stations
@ -70,18 +58,42 @@ def get_stations_by_category(category):
stations = []
if my_stations_yaml and category in my_stations_yaml:
for station_name in my_stations_yaml[category]:
station_url = my_stations_yaml[category][station_name]
station_id = str(get_checksum(station_name + station_url)).upper()
stations.append(Station(station_id, station_name, station_url, category))
station_urls = my_stations_yaml[category][station_name]
param_list = station_urls.split('|')
station_url = param_list[0]
station_icon = None
if len(param_list) > 1:
station_icon = param_list[1]
stations.append(Station(station_name, station_url, category, station_icon))
return stations
def get_all_bookmarks_stations():
bm_stations_category = generic.read_yaml_file(generic.get_stations_file())
stations = []
if bm_stations_category :
for category in bm_stations_category:
for station_name in bm_stations_category[category]:
station_urls = bm_stations_category[category][station_name]
param_list = station_urls.split('|')
station_url = param_list[0]
station_icon = None
if len(param_list) > 1:
station_icon = param_list[1]
stations.append(Station(station_name, station_url, category, station_icon))
return stations
def get_checksum(feed, charlimit=12):
hash_feed = feed.encode()
hash_object = hashlib.md5(hash_feed)
digest = hash_object.digest()
xor_fold = bytearray(digest[:8])
for i, b in enumerate(digest[8:]):
xor_fold[i] ^= b
digest_xor_fold = ''.join(format(x, '02x') for x in bytes(xor_fold))
return digest_xor_fold[:charlimit]
def putBookmarkJson(elements):
newDict={}
for stationJson in elements:
logging.debug("%s ... %s",stationJson['description'], stationJson['name'])
if stationJson['description'] not in newDict:
newDict[stationJson['description']] = {}
logging.debug(stationJson)
if stationJson['icon'] is not None:
newDict[stationJson['description']][stationJson['name']] = stationJson['url'] + "|" + stationJson['icon']
else:
newDict[stationJson['description']][stationJson['name']] = stationJson['url']
generic.write_yaml_file(generic.get_stations_file(),newDict)
return elements

View file

@ -1,49 +1,57 @@
import base64
import json
import uuid
import requests
import logging
from ycast import __version__
from ycast import __version__, my_filter
import ycast.vtuner as vtuner
import ycast.generic as generic
from ycast.my_filter import check_station, begin_filter, end_filter, get_limit
from ycast.generic import get_json_attr
API_ENDPOINT = "http://all.api.radio-browser.info"
MINIMUM_COUNT_GENRE = 5
MINIMUM_COUNT_COUNTRY = 5
MINIMUM_COUNT_LANGUAGE = 5
DEFAULT_STATION_LIMIT = 200
SHOW_BROKEN_STATIONS = False
ID_PREFIX = "RB"
def get_json_attr(json, attr):
try:
return json[attr]
except:
return None
station_cache = {}
class Station:
def __init__(self, station_json):
self.id = generic.generate_stationid_with_prefix(get_json_attr(station_json, 'stationuuid'), ID_PREFIX)
self.name = get_json_attr(station_json, 'name')
self.url = get_json_attr(station_json, 'url')
self.icon = get_json_attr(station_json, 'favicon')
self.tags = get_json_attr(station_json, 'tags').split(',')
self.countrycode = get_json_attr(station_json, 'countrycode')
self.language = get_json_attr(station_json, 'language')
self.votes = get_json_attr(station_json, 'votes')
self.codec = get_json_attr(station_json, 'codec')
self.bitrate = get_json_attr(station_json, 'bitrate')
self.stationuuid = generic.get_json_attr(station_json, 'stationuuid')
self.id = generic.generate_stationid_with_prefix(
base64.urlsafe_b64encode(uuid.UUID(self.stationuuid).bytes).decode(), ID_PREFIX)
self.name = generic.get_json_attr(station_json, 'name')
self.url = generic.get_json_attr(station_json, 'url_resolved')
if not self.url:
self.url = generic.get_json_attr(station_json, 'url')
self.icon = generic.get_json_attr(station_json, 'favicon')
self.description = generic.get_json_attr(station_json, 'tags')
self.tags = generic.get_json_attr(station_json, 'tags').split(',')
self.countrycode = generic.get_json_attr(station_json, 'countrycode')
self.language = generic.get_json_attr(station_json, 'language')
self.languagecodes = generic.get_json_attr(station_json, 'languagecodes')
self.votes = generic.get_json_attr(station_json, 'votes')
self.codec = generic.get_json_attr(station_json, 'codec')
self.bitrate = generic.get_json_attr(station_json, 'bitrate')
def to_vtuner(self):
return vtuner.Station(self.id, self.name, ', '.join(self.tags), self.url, self.icon,
return vtuner.Station(self.id, self.name,
self.description, self.url, self.icon,
self.tags[0], self.countrycode, self.codec, self.bitrate, None)
def to_dict(self):
return {'name': self.name , 'url': self.url, 'icon': self.icon, 'description': self.description }
def get_playable_url(self):
try:
playable_url_json = request('url/' + generic.get_stationid_without_prefix(self.id))[0]
playable_url_json = request('url/' + str(self.stationuuid))
self.url = playable_url_json['url']
except (IndexError, KeyError):
logging.error("Could not retrieve first playlist item for station with id '%s'", self.id)
logging.error("Could not retrieve first playlist item for station with id '%s'", self.stationuuid)
def request(url):
@ -60,98 +68,138 @@ def request(url):
return response.json()
def get_station_by_id(uid):
station_json = request('stations/byuuid/' + str(uid))
def get_station_by_id(vtune_id):
global station_cache
# decode
uidbase64 = generic.get_stationid_without_prefix(vtune_id)
uid = str(uuid.UUID(base64.urlsafe_b64decode(uidbase64).hex()))
if station_cache:
station = station_cache[vtune_id]
if station:
return station
# no item in cache, do request
station_json = request('stations/byuuid?uuids=' + uid)
if station_json and len(station_json):
return Station(station_json[0])
else:
return None
def search(name, limit=DEFAULT_STATION_LIMIT):
stations = []
stations_json = request('stations/search?order=name&reverse=false&limit=' + str(limit) + '&name=' + str(name))
for station_json in stations_json:
if SHOW_BROKEN_STATIONS or get_json_attr(station_json, 'lastcheckok') == 1:
stations.append(Station(station_json))
return stations
station = Station(station_json[0])
if station:
station_cache[station.id] = station
return station
return None
def get_country_directories():
country_directories = []
apicall = 'countries'
if not SHOW_BROKEN_STATIONS:
if not get_limit('SHOW_BROKEN_STATIONS'):
apicall += '?hidebroken=true'
countries_raw = request(apicall)
for country_raw in countries_raw:
if get_json_attr(country_raw, 'name') and get_json_attr(country_raw, 'stationcount') and \
int(get_json_attr(country_raw, 'stationcount')) > MINIMUM_COUNT_COUNTRY:
country_directories.append(generic.Directory(get_json_attr(country_raw, 'name'),
get_json_attr(country_raw, 'stationcount')))
int(get_json_attr(country_raw, 'stationcount')) > get_limit('MINIMUM_COUNT_COUNTRY'):
if my_filter.chk_parameter('country', get_json_attr(country_raw, 'name')):
country_directories.append(generic.Directory(get_json_attr(country_raw, 'name'),
get_json_attr(country_raw, 'stationcount')))
return country_directories
def get_language_directories():
language_directories = []
apicall = 'languages'
if not SHOW_BROKEN_STATIONS:
if not get_limit('SHOW_BROKEN_STATIONS'):
apicall += '?hidebroken=true'
languages_raw = request(apicall)
for language_raw in languages_raw:
if get_json_attr(language_raw, 'name') and get_json_attr(language_raw, 'stationcount') and \
int(get_json_attr(language_raw, 'stationcount')) > MINIMUM_COUNT_LANGUAGE:
language_directories.append(generic.Directory(get_json_attr(language_raw, 'name'),
get_json_attr(language_raw, 'stationcount'),
get_json_attr(language_raw, 'name').title()))
int(get_json_attr(language_raw, 'stationcount')) > get_limit('MINIMUM_COUNT_LANGUAGE'):
if my_filter.chk_parameter('languagecodes', get_json_attr(language_raw, 'iso_639')):
language_directories.append(generic.Directory(get_json_attr(language_raw, 'name'),
get_json_attr(language_raw, 'stationcount'),
get_json_attr(language_raw, 'name').title()))
return language_directories
def get_genre_directories():
genre_directories = []
apicall = 'tags'
if not SHOW_BROKEN_STATIONS:
if not get_limit('SHOW_BROKEN_STATIONS'):
apicall += '?hidebroken=true'
genres_raw = request(apicall)
for genre_raw in genres_raw:
if get_json_attr(genre_raw, 'name') and get_json_attr(genre_raw, 'stationcount') and \
int(get_json_attr(genre_raw, 'stationcount')) > MINIMUM_COUNT_GENRE:
genre_directories.append(generic.Directory(get_json_attr(genre_raw, 'name'),
int(get_json_attr(genre_raw, 'stationcount')) > get_limit('MINIMUM_COUNT_GENRE'):
if my_filter.chk_parameter('tags', get_json_attr(genre_raw, 'name')):
genre_directories.append(generic.Directory(get_json_attr(genre_raw, 'name'),
get_json_attr(genre_raw, 'stationcount'),
get_json_attr(genre_raw, 'name').capitalize()))
return genre_directories
def get_stations_by_country(country):
begin_filter()
station_cache.clear()
stations = []
stations_json = request('stations/search?order=name&reverse=false&countryExact=true&country=' + str(country))
for station_json in stations_json:
if SHOW_BROKEN_STATIONS or get_json_attr(station_json, 'lastcheckok') == 1:
stations.append(Station(station_json))
stations_list_json = request('stations/search?order=name&reverse=false&countryExact=true&country=' + str(country))
for station_json in stations_list_json:
if check_station(station_json):
cur_station = Station(station_json)
station_cache[cur_station.id] = cur_station
stations.append(cur_station)
end_filter()
return stations
def get_stations_by_language(language):
begin_filter()
station_cache.clear()
stations = []
stations_json = request('stations/search?order=name&reverse=false&languageExact=true&language=' + str(language))
for station_json in stations_json:
if SHOW_BROKEN_STATIONS or get_json_attr(station_json, 'lastcheckok') == 1:
stations.append(Station(station_json))
stations_list_json = \
request('stations/search?order=name&reverse=false&languageExact=true&language=' + str(language))
for station_json in stations_list_json:
if check_station(station_json):
cur_station = Station(station_json)
station_cache[cur_station.id] = cur_station
stations.append(cur_station)
end_filter()
return stations
def get_stations_by_genre(genre):
begin_filter()
station_cache.clear()
stations = []
stations_json = request('stations/search?order=name&reverse=false&tagExact=true&tag=' + str(genre))
for station_json in stations_json:
if SHOW_BROKEN_STATIONS or get_json_attr(station_json, 'lastcheckok') == 1:
stations.append(Station(station_json))
stations_list_json = request('stations/search?order=name&reverse=false&tagExact=true&tag=' + str(genre))
for station_json in stations_list_json:
if check_station(station_json):
cur_station = Station(station_json)
station_cache[cur_station.id] = cur_station
stations.append(cur_station)
end_filter()
return stations
def get_stations_by_votes(limit=DEFAULT_STATION_LIMIT):
def get_stations_by_votes(limit=get_limit('DEFAULT_STATION_LIMIT')):
begin_filter()
station_cache.clear()
stations = []
stations_json = request('stations?order=votes&reverse=true&limit=' + str(limit))
for station_json in stations_json:
if SHOW_BROKEN_STATIONS or get_json_attr(station_json, 'lastcheckok') == 1:
stations.append(Station(station_json))
stations_list_json = request('stations?order=votes&reverse=true&limit=' + str(limit))
for station_json in stations_list_json:
if check_station(station_json):
cur_station = Station(station_json)
station_cache[cur_station.id] = cur_station
stations.append(cur_station)
end_filter()
return stations
def search(name, limit=get_limit('DEFAULT_STATION_LIMIT')):
begin_filter()
station_cache.clear()
stations = []
stations_list_json = request('stations/search?order=name&reverse=false&limit=' + str(limit) + '&name=' + str(name))
for station_json in stations_list_json:
if check_station(station_json):
cur_station = Station(station_json)
station_cache[cur_station.id] = cur_station
stations.append(cur_station)
end_filter()
return stations

View file

@ -1,14 +1,17 @@
import logging
import re
from flask import Flask, request, url_for, redirect, abort, make_response
import flask
from flask import Flask, request, url_for, redirect, abort, make_response, render_template
import ycast.vtuner as vtuner
import ycast.radiobrowser as radiobrowser
import ycast.my_stations as my_stations
import ycast.generic as generic
import ycast.station_icons as station_icons
import ycast.my_filter as my_filter
from ycast import my_recentlystation
from ycast.my_recentlystation import signal_station_selected
PATH_ROOT = 'ycast'
PATH_PLAY = 'play'
@ -23,50 +26,46 @@ PATH_RADIOBROWSER_GENRE = 'genre'
PATH_RADIOBROWSER_POPULAR = 'popular'
station_tracking = False
my_stations_enabled = False
app = Flask(__name__)
def run(config, address='0.0.0.0', port=8010):
try:
check_my_stations_feature(config)
generic.set_stations_file(config)
app.run(host=address, port=port)
except PermissionError:
logging.error("No permission to create socket. Are you trying to use ports below 1024 without elevated rights?")
def check_my_stations_feature(config):
global my_stations_enabled
my_stations_enabled = my_stations.set_config(config)
def get_directories_page(subdir, directories, request):
def get_directories_page(subdir, directories, request_obj):
page = vtuner.Page()
if len(directories) == 0:
page.add(vtuner.Display("No entries found"))
page.add_item(vtuner.Display("No entries found"))
page.set_count(1)
return page
for directory in get_paged_elements(directories, request.args):
for directory in get_paged_elements(directories, request_obj.args):
vtuner_directory = vtuner.Directory(directory.displayname,
url_for(subdir, _external=True, directory=directory.name),
directory.item_count)
page.add(vtuner_directory)
page.add_item(vtuner_directory)
page.set_count(len(directories))
return page
def get_stations_page(stations, request):
def get_stations_page(stations, request_obj):
page = vtuner.Page()
page.add_item(vtuner.Previous(url_for('landing', _external=True)))
if len(stations) == 0:
page.add(vtuner.Display("No stations found"))
page.add_item(vtuner.Display("No stations found"))
page.set_count(1)
return page
for station in get_paged_elements(stations, request.args):
for station in get_paged_elements(stations, request_obj.args):
vtuner_station = station.to_vtuner()
if station_tracking:
vtuner_station.set_trackurl(request.host_url + PATH_ROOT + '/' + PATH_PLAY + '?id=' + vtuner_station.uid)
vtuner_station.icon = request.host_url + PATH_ROOT + '/' + PATH_ICON + '?id=' + vtuner_station.uid
page.add(vtuner_station)
vtuner_station.set_trackurl(
request_obj.host_url + PATH_ROOT + '/' + PATH_PLAY + '?id=' + vtuner_station.uid)
vtuner_station.icon = request_obj.host_url + PATH_ROOT + '/' + PATH_ICON + '?id=' + vtuner_station.uid
page.add_item(vtuner_station)
page.set_count(len(stations))
return page
@ -102,9 +101,9 @@ def get_paged_elements(items, requestargs):
def get_station_by_id(stationid, additional_info=False):
station_id_prefix = generic.get_stationid_prefix(stationid)
if station_id_prefix == my_stations.ID_PREFIX:
return my_stations.get_station_by_id(generic.get_stationid_without_prefix(stationid))
return my_stations.get_station_by_id(stationid)
elif station_id_prefix == radiobrowser.ID_PREFIX:
station = radiobrowser.get_station_by_id(generic.get_stationid_without_prefix(stationid))
station = radiobrowser.get_station_by_id(stationid)
if additional_info:
station.get_playable_url()
return station
@ -112,7 +111,7 @@ def get_station_by_id(stationid, additional_info=False):
def vtuner_redirect(url):
if request and request.host and not re.search("^[A-Za-z0-9]+\.vtuner\.com$", request.host):
if request and request.host and not re.search(r"^[A-Za-z0-9]+\.vtuner\.com$", request.host):
logging.warning("You are not accessing a YCast redirect with a whitelisted host url (*.vtuner.com). "
"Some AVRs have problems with this. The requested host was: %s", request.host)
return redirect(url, code=302)
@ -121,6 +120,7 @@ def vtuner_redirect(url):
@app.route('/setupapp/<path:path>',
methods=['GET', 'POST'])
def upstream(path):
logging.debug('upstream **********************')
if request.args.get('token') == '0':
return vtuner.get_init_token()
if request.args.get('search'):
@ -136,28 +136,139 @@ def upstream(path):
logging.error("Unhandled upstream query (/setupapp/%s)", path)
abort(404)
@app.route('/control/filter/<path:item>',
methods=['POST','GET'])
def set_filters(item):
update_limits=False
# POST updates the whitelist or blacklist, GET just returns the current attributes/values.
myfilter={}
if item.endswith('blacklist'):
myfilter=my_filter.black_list
if item.endswith('whitelist'):
myfilter=my_filter.white_list
if item.endswith('limits'):
myfilter=my_filter.get_limit_list()
update_limits=True
if request.method == 'POST':
content_type = request.headers.get('Content-Type')
if (content_type == 'application/json'):
json = request.json
else:
return abort(400,'Content-Type not supported!: ' + item)
if update_limits:
myfilter=my_filter.set_limits(json)
else:
for j in json:
# Attribute with null value removes item from the list otherwise add the attribute or update the value
if json[j] == None:
myfilter.pop(j, None)
else:
myfilter[j]=json[j]
my_filter.write_filter_config()
json=flask.jsonify(myfilter)
return json
@app.route('/api/<path:path>',
methods=['GET', 'POST'])
def landing_api(path):
if request.method == 'GET':
if path.endswith('stations'):
category = request.args.get('category')
stations = None
if category.endswith('recently'):
stations = my_recentlystation.get_stations_by_recently()
if category.endswith('voted'):
stations = radiobrowser.get_stations_by_votes()
if category.endswith('language'):
language = request.args.get('language','german')
stations = radiobrowser.get_stations_by_language(language)
if category.endswith('country'):
country = request.args.get('country','Germany')
stations = radiobrowser.get_stations_by_country(country)
if stations is not None:
stations_dict = []
for station in stations:
stations_dict.append(station.to_dict())
return flask.jsonify(stations_dict)
if path.endswith('bookmarks'):
category = request.args.get('category')
stations = my_stations.get_all_bookmarks_stations()
if stations is not None:
stations_dict = []
for station in stations:
stations_dict.append(station.to_dict())
return flask.jsonify(stations_dict)
if path.endswith('paramlist'):
category = request.args.get('category')
directories = None
if category.endswith('language'):
directories = radiobrowser.get_language_directories();
if category.endswith('country'):
directories = radiobrowser.get_country_directories();
if directories is not None:
directories_dict = []
for directory in directories:
directories_dict.append(directory.to_dict())
return flask.jsonify(directories_dict)
if request.method == 'POST':
content_type = request.headers.get('Content-Type')
if (content_type == 'application/json'):
json = request.json
return flask.jsonify(my_stations.putBookmarkJson(json))
else:
return abort(400,'Content-Type not supported!: ' + path)
return abort(400,'Not implemented: ' + path)
@app.route('/',
defaults={'path': ''},
methods=['GET', 'POST'])
def landing_root(path=''):
return render_template("index.html")
@app.route('/' + PATH_ROOT + '/',
defaults={'path': ''},
methods=['GET', 'POST'])
def landing(path=''):
logging.debug('===============================================================')
page = vtuner.Page()
page.add(vtuner.Directory('Radiobrowser', url_for('radiobrowser_landing', _external=True), 4))
if my_stations_enabled:
page.add(vtuner.Directory('My Stations', url_for('my_stations_landing', _external=True),
len(my_stations.get_category_directories())))
page.add_item(vtuner.Directory('Radiobrowser', url_for('radiobrowser_landing', _external=True), 4))
page.add_item(vtuner.Directory('My Stations', url_for('my_stations_landing', _external=True),
len(my_stations.get_category_directories())))
stations = my_recentlystation.get_stations_by_vote()
if stations and len(stations) > 0:
# make blank line (display is not shown)
page.add_item(vtuner.Spacer())
for station in stations:
vtuner_station = station.to_vtuner()
if station_tracking:
vtuner_station.set_trackurl(
request.host_url + PATH_ROOT + '/' + PATH_PLAY + '?id=' + vtuner_station.uid)
vtuner_station.icon = request.host_url + PATH_ROOT + '/' + PATH_ICON + '?id=' + vtuner_station.uid
page.add_item(vtuner_station)
else:
page.add(vtuner.Display("'My Stations' feature not configured."))
page.set_count(1)
page.add_item(vtuner.Display("'My Stations' feature not configured."))
page.set_count(-1)
return page.to_string()
@app.route('/' + PATH_ROOT + '/' + PATH_MY_STATIONS + '/',
methods=['GET', 'POST'])
def my_stations_landing():
logging.debug('===============================================================')
directories = my_stations.get_category_directories()
return get_directories_page('my_stations_category', directories, request).to_string()
@ -165,6 +276,7 @@ def my_stations_landing():
@app.route('/' + PATH_ROOT + '/' + PATH_MY_STATIONS + '/<directory>',
methods=['GET', 'POST'])
def my_stations_category(directory):
logging.debug('===============================================================')
stations = my_stations.get_stations_by_category(directory)
return get_stations_page(stations, request).to_string()
@ -172,15 +284,16 @@ def my_stations_category(directory):
@app.route('/' + PATH_ROOT + '/' + PATH_RADIOBROWSER + '/',
methods=['GET', 'POST'])
def radiobrowser_landing():
logging.debug('===============================================================')
page = vtuner.Page()
page.add(vtuner.Directory('Genres', url_for('radiobrowser_genres', _external=True),
len(radiobrowser.get_genre_directories())))
page.add(vtuner.Directory('Countries', url_for('radiobrowser_countries', _external=True),
len(radiobrowser.get_country_directories())))
page.add(vtuner.Directory('Languages', url_for('radiobrowser_languages', _external=True),
len(radiobrowser.get_language_directories())))
page.add(vtuner.Directory('Most Popular', url_for('radiobrowser_popular', _external=True),
len(radiobrowser.get_stations_by_votes())))
page.add_item(vtuner.Directory('Genres', url_for('radiobrowser_genres', _external=True),
len(radiobrowser.get_genre_directories())))
page.add_item(vtuner.Directory('Countries', url_for('radiobrowser_countries', _external=True),
len(radiobrowser.get_country_directories())))
page.add_item(vtuner.Directory('Languages', url_for('radiobrowser_languages', _external=True),
len(radiobrowser.get_language_directories())))
page.add_item(vtuner.Directory('Most Popular', url_for('radiobrowser_popular', _external=True),
len(radiobrowser.get_stations_by_votes())))
page.set_count(4)
return page.to_string()
@ -188,6 +301,7 @@ def radiobrowser_landing():
@app.route('/' + PATH_ROOT + '/' + PATH_RADIOBROWSER + '/' + PATH_RADIOBROWSER_COUNTRY + '/',
methods=['GET', 'POST'])
def radiobrowser_countries():
logging.debug('===============================================================')
directories = radiobrowser.get_country_directories()
return get_directories_page('radiobrowser_country_stations', directories, request).to_string()
@ -195,6 +309,7 @@ def radiobrowser_countries():
@app.route('/' + PATH_ROOT + '/' + PATH_RADIOBROWSER + '/' + PATH_RADIOBROWSER_COUNTRY + '/<directory>',
methods=['GET', 'POST'])
def radiobrowser_country_stations(directory):
logging.debug('===============================================================')
stations = radiobrowser.get_stations_by_country(directory)
return get_stations_page(stations, request).to_string()
@ -202,6 +317,7 @@ def radiobrowser_country_stations(directory):
@app.route('/' + PATH_ROOT + '/' + PATH_RADIOBROWSER + '/' + PATH_RADIOBROWSER_LANGUAGE + '/',
methods=['GET', 'POST'])
def radiobrowser_languages():
logging.debug('===============================================================')
directories = radiobrowser.get_language_directories()
return get_directories_page('radiobrowser_language_stations', directories, request).to_string()
@ -209,6 +325,7 @@ def radiobrowser_languages():
@app.route('/' + PATH_ROOT + '/' + PATH_RADIOBROWSER + '/' + PATH_RADIOBROWSER_LANGUAGE + '/<directory>',
methods=['GET', 'POST'])
def radiobrowser_language_stations(directory):
logging.debug('===============================================================')
stations = radiobrowser.get_stations_by_language(directory)
return get_stations_page(stations, request).to_string()
@ -216,6 +333,7 @@ def radiobrowser_language_stations(directory):
@app.route('/' + PATH_ROOT + '/' + PATH_RADIOBROWSER + '/' + PATH_RADIOBROWSER_GENRE + '/',
methods=['GET', 'POST'])
def radiobrowser_genres():
logging.debug('===============================================================')
directories = radiobrowser.get_genre_directories()
return get_directories_page('radiobrowser_genre_stations', directories, request).to_string()
@ -223,6 +341,7 @@ def radiobrowser_genres():
@app.route('/' + PATH_ROOT + '/' + PATH_RADIOBROWSER + '/' + PATH_RADIOBROWSER_GENRE + '/<directory>',
methods=['GET', 'POST'])
def radiobrowser_genre_stations(directory):
logging.debug('===============================================================')
stations = radiobrowser.get_stations_by_genre(directory)
return get_stations_page(stations, request).to_string()
@ -230,6 +349,7 @@ def radiobrowser_genre_stations(directory):
@app.route('/' + PATH_ROOT + '/' + PATH_RADIOBROWSER + '/' + PATH_RADIOBROWSER_POPULAR + '/',
methods=['GET', 'POST'])
def radiobrowser_popular():
logging.debug('===============================================================')
stations = radiobrowser.get_stations_by_votes()
return get_stations_page(stations, request).to_string()
@ -237,10 +357,11 @@ def radiobrowser_popular():
@app.route('/' + PATH_ROOT + '/' + PATH_SEARCH + '/',
methods=['GET', 'POST'])
def station_search():
logging.debug('===============================================================')
query = request.args.get('search')
if not query or len(query) < 3:
page = vtuner.Page()
page.add(vtuner.Display("Search query too short"))
page.add_item(vtuner.Display("Search query too short"))
page.set_count(1)
return page.to_string()
else:
@ -252,6 +373,7 @@ def station_search():
@app.route('/' + PATH_ROOT + '/' + PATH_PLAY,
methods=['GET', 'POST'])
def get_stream_url():
logging.debug('===============================================================')
stationid = request.args.get('id')
if not stationid:
logging.error("Stream URL without station ID requested")
@ -267,6 +389,7 @@ def get_stream_url():
@app.route('/' + PATH_ROOT + '/' + PATH_STATION,
methods=['GET', 'POST'])
def get_station_info():
logging.debug('===============================================================')
stationid = request.args.get('id')
if not stationid:
logging.error("Station info without station ID requested")
@ -275,7 +398,7 @@ def get_station_info():
if not station:
logging.error("Could not get station with id '%s'", stationid)
page = vtuner.Page()
page.add(vtuner.Display("Station not found"))
page.add_item(vtuner.Display("Station not found"))
page.set_count(1)
return page.to_string()
vtuner_station = station.to_vtuner()
@ -283,7 +406,7 @@ def get_station_info():
vtuner_station.set_trackurl(request.host_url + PATH_ROOT + '/' + PATH_PLAY + '?id=' + vtuner_station.uid)
vtuner_station.icon = request.host_url + PATH_ROOT + '/' + PATH_ICON + '?id=' + vtuner_station.uid
page = vtuner.Page()
page.add(vtuner_station)
page.add_item(vtuner_station)
page.set_count(1)
return page.to_string()
@ -291,6 +414,7 @@ def get_station_info():
@app.route('/' + PATH_ROOT + '/' + PATH_ICON,
methods=['GET', 'POST'])
def get_station_icon():
logging.debug('**********************: %s', request.url)
stationid = request.args.get('id')
if not stationid:
logging.error("Station icon without station ID requested")
@ -299,12 +423,13 @@ def get_station_icon():
if not station:
logging.error("Could not get station with id '%s'", stationid)
abort(404)
signal_station_selected(station.name, station.url, station.icon)
if not hasattr(station, 'icon') or not station.icon:
logging.warning("No icon information found for station with id '%s'", stationid)
abort(404)
station_icon = station_icons.get_icon(station)
if not station_icon:
logging.error("Could not get station icon for station with id '%s'", stationid)
logging.warning("Could not get station icon for station with id '%s'", stationid)
abort(404)
response = make_response(station_icon)
response.headers.set('Content-Type', 'image/jpeg')

379
ycast/static/script.js Normal file
View file

@ -0,0 +1,379 @@
window.onload = function () {
document.getElementById('idRequestSrc').value = 'recently';
document.getElementById('idLanOrCountSelect').disabled = true;
var requestSrc = document.getElementById('idRequestSrc').value;
var param = document.getElementById('idLanOrCountSelect').value;
requestStationList(requestSrc, param, false);
requestStationList('', '', true);
setBookmarkcategoryList();
}
function searchTree(element, id) {
if (element.id === id) {
return element;
} else if (element.children != null) {
var i;
var result = null;
for (i = 0; result == null && i < element.children.length; i++) {
result = searchTree(element.children[i], id);
}
return result;
}
return null;
}
function createItem(name, icon, description) {
var itemElem = document.createElement("div");
itemElem.className = "item";
var itemicon = document.createElement("div");
itemicon.className = "itemicon";
if (icon && icon.length > 0) {
var itemiconimg = document.createElement("img");
itemiconimg.src = icon;
itemiconimg.className = "itemicon";
itemicon.appendChild(itemiconimg);
}
var itemtext = document.createElement("div");
itemtext.className = "itemtext";
var h4text = document.createElement("h4");
h4text.textContent = name;
h4text.id = 'name'
var desc = document.createElement("p");
desc.textContent = description;
desc.id = 'description';
itemtext.appendChild(h4text);
itemtext.appendChild(desc);
itemElem.appendChild(itemicon);
itemElem.appendChild(itemtext);
return itemElem;
}
function requestStationList(category, param, isbookmarklist) {
var url = 'api/stations?category=' + category;
var id_listnode = "stationList";
var countall = 0;
if (isbookmarklist) {
var url = 'api/bookmarks?category=' + category;
var id_listnode = "bookmarkList";
}
if (param.length > 0) {
if (category.indexOf('language') > -1) {
url = url + '&language=' + param.toLowerCase();
}
if (category.indexOf('country') > -1) {
url = url + '&country=' + param;
}
}
var myRequest = new Request(url);
var myOldList = document.getElementById(id_listnode);
var myList = myOldList.cloneNode(false);
// First Elemet is empty (workaround <ul> needs a <li> element)
let listItemEmpty = document.createElement('li');
listItemEmpty.appendChild(createItem('<<< loading >>>', '', ''));
listItemEmpty.dataset.isemptyelement = 'true';
listItemEmpty.hidden = false;
myList.appendChild(listItemEmpty);
myOldList.parentNode.replaceChild(myList, myOldList);
fetch(myRequest)
.then(response => response.json())
.then(data => {
for (const station of data) {
countall = countall + 1;
let listItem = document.createElement('li');
listItem.appendChild(
createItem(station.name, station.icon, station.description)
);
listItem.dataset.json = JSON.stringify(station);
if (isbookmarklist) {
listItem.dataset.search = (station.description);
listItem.onclick = function (event) {
deleteElement(event, listItem)
};
} else {
listItem.dataset.search = (station.name + '#' + station.description).toUpperCase();
listItem.onclick = function (event) {
copyElementToBookmark(event, listItem)
};
}
listItem.dataset.category = station.description;
listItem.dataset.isemptyelement = 'false';
myList.appendChild(listItem);
if (listItemEmpty) listItemEmpty.hidden = true;
}
if (listItemEmpty) {
textElem = searchTree(listItemEmpty, 'name');
if (textElem) textElem.textContent = '';
}
if (isbookmarklist) {
setBookmarkcategoryList();
} else {
document.getElementById('stationcount').textContent = countall + '/' + countall;
}
})
.catch(console.error);
}
function deleteElement(event, objElem) {
if (objElem) {
objElem.remove();
refreshFilteredList(document.getElementById("bookmarkList"), document.getElementById('idcategory').value, true);
setBookmarkcategoryList();
saveBookmarks();
}
}
function copyElementToBookmark(event, objElem) {
if (objElem) {
let myList = document.getElementById("bookmarkList")
let listItem = document.createElement('li');
let station = JSON.parse(objElem.dataset.json);
let categoryElem = document.getElementById('idcategory');
if (categoryElem.value.length == 0) categoryElem.value = 'Others'
station.description = categoryElem.value;
listItem.appendChild(
createItem(station.name, station.icon, station.description)
);
listItem.dataset.json = JSON.stringify(station);
listItem.dataset.search = station.description;
listItem.dataset.category = station.description;
listItem.dataset.isemptyelement = 'false';
listItem.onclick = function (event) {
deleteElement(event, listItem)
};
myList.appendChild(listItem);
refreshFilteredList(document.getElementById("bookmarkList"), document.getElementById('idcategory').value, true);
setBookmarkcategoryList();
saveBookmarks();
}
}
function refreshFilteredList(myListNode, filtertxt, chkEqual) {
var isEmpty = true;
var myListAr = Array.from(myListNode.childNodes);
var emptyElement = null;
var countall = 0;
var countfiltered = 0;
myListAr.forEach(function (listItem) {
try {
if (listItem.dataset.isemptyelement === 'true') {
emptyElement = listItem;
} else {
countall = countall + 1;
var bfound = true;
if (filtertxt.length > 0) {
var searchval = listItem.dataset.search;
if (chkEqual) {
bfound = (searchval === filtertxt);
} else {
bfound = (searchval.indexOf(filtertxt) > -1);
}
}
if (bfound) {
countfiltered = countfiltered + 1;
listItem.hidden = false;
isEmpty = false;
} else {
listItem.hidden = true;
}
}
} catch (e) {
console.error(listItem, e)
}
});
if (emptyElement) emptyElement.hidden = !isEmpty;
return countfiltered + '/' + countall;
}
function checkOptionElement(optionsElementList, value) {
var optionList = Array.from(optionsElementList.childNodes);
return optionList.find(function (optionElem) {
return (optionElem.value === value)
})
}
function onInputSelect(e, objElem) {
switch (objElem.id) {
case 'idRequestSrc':
paramElem = document.getElementById('idLanOrCountSelect')
param = paramElem.value
category = objElem.value
switch (category) {
case 'language':
setParamlist();
try {
paramElem.fokus();
} catch (e) {};
return;
case 'country':
setParamlist();
try {
paramElem.fokus();
} catch (e) {};
return;
default:
paramElem.disabled = true;
break;
}
document.getElementById('stationcount').textContent = requestStationList(category, param);
break;
case 'idLanOrCountSelect':
if (checkOptionElement(document.getElementById('listLangOrCountry'), document.getElementById('idLanOrCountSelect').value)) {
document.getElementById('stationcount').textContent = requestStationList(
document.getElementById('idRequestSrc').value,
document.getElementById('idLanOrCountSelect').value);
}
break;
case 'idcategory':
refreshFilteredList(document.getElementById("bookmarkList"), document.getElementById('idcategory').value, true);
break;
case 'stationsearch':
document.getElementById('stationcount').textContent =
refreshFilteredList(document.getElementById('stationList'),
document.getElementById('stationsearch').value.toUpperCase(), false);
break;
}
}
function setBookmarkcategoryList() {
var categoryList = [];
var bookmarkList = Array.from(document.getElementById("bookmarkList").childNodes);
bookmarkList.forEach(function (listItem) {
try {
if (listItem.dataset.isemptyelement === 'false') {
var category = listItem.dataset.category;
if (!categoryList.find(function (arElem) {
return (category === arElem);
})) {
categoryList.push(category);
}
}
} catch (e) {
console.error(listItem, e)
}
})
if (categoryList.length > 0) {
var myOldList = document.getElementById('categoryList');
var myList = myOldList.cloneNode(false);
myOldList.parentNode.replaceChild(myList, myOldList);
for (const category of categoryList) {
var option = document.createElement('option');
option.value = category;
myList.appendChild(option);
}
}
}
function setParamlist() {
var category = document.getElementById('idRequestSrc').value
var url = 'api/paramlist?category=' + category;
document.getElementById('idLanOrCountSelect').value = '';
var myRequest = new Request(url);
var myOldList = document.getElementById('listLangOrCountry');
var myList = myOldList.cloneNode(false);
myOldList.parentNode.replaceChild(myList, myOldList);
fetch(myRequest)
.then(response => response.json())
.then(data => {
for (const param of data) {
var option = document.createElement('option');
option.value = param.name;
myList.appendChild(option);
}
})
.catch(console.error);
document.getElementById('idLanOrCountSelect').disabled = false;
}
function keyUpEvent(e, objElem) {
if (e instanceof KeyboardEvent) {
if (e.code === 'Backspace') {
objElem.value = '';
switch (objElem.id) {
case 'stationsearch':
document.getElementById('stationcount').textContent =
refreshFilteredList(document.getElementById('stationList'),
document.getElementById('stationsearch').value.toUpperCase(), false);
break;
case 'idcategory':
refreshFilteredList(document.getElementById("bookmarkList"), document.getElementById('idcategory').value, true);
break;
default:
break;
}
return;
}
}
switch (objElem.id) {
case 'idLanOrCountSelect':
param = objElem.value;
category = document.getElementById('idRequestSrc').value;
if (e instanceof KeyboardEvent) {
// it is a keyboard event!
if (e.code == 'Enter') {
if (checkOptionElement(document.getElementById('listLangOrCountry'), param)) {
document.getElementById('stationcount').textContent = requestStationList(category, param);
}
}
} else if (e instanceof Event) {
// one Element from selection is selected
document.getElementById('stationcount').textContent = requestStationList(category, param);
}
break;
case 'stationsearch':
document.getElementById('stationcount').textContent =
refreshFilteredList(document.getElementById('stationList'),
document.getElementById('stationsearch').value.toUpperCase(), false);
break;
case 'idcategory':
refreshFilteredList(document.getElementById("bookmarkList"), document.getElementById('idcategory').value, true);
break;
default:
break;
}
}
function saveBookmarks() {
var bookmarkJsonlist = []
var bookmarkList = Array.from(document.getElementById("bookmarkList").childNodes);
bookmarkList.forEach(function (listItem) {
if (listItem.dataset.isemptyelement === 'false') {
station = JSON.parse(listItem.dataset.json)
bookmarkJsonlist.push(station)
}
})
var data = JSON.stringify(bookmarkJsonlist)
var xhr = new XMLHttpRequest();
xhr.open("POST", 'api/bookmarks', true);
xhr.setRequestHeader("Content-Type", "application/json");
xhr.onreadystatechange = function () {
if (xhr.readyState === 4 && xhr.status === 200) {
var json = JSON.parse(xhr.responseText);
console.log(json);
}
};
xhr.send(data);
}

160
ycast/static/style.css Normal file
View file

@ -0,0 +1,160 @@
*:focus{
outline: 4px solid red;
}
input: {
margin: 0.4rem;
}
li:hover, input:hover {
background-color: dodgerblue;
}
body {
font-family: sans-serif;
}
.page {
display: block;
padding-left: calc(100% / 2 - 12.5rem);
}
.header {
position: relative;
block-sizing: borderbox;
height=30rem;
max-width: 20.9rem;
min-width: 20.9rem;
margin: 0.3rem;
background-color: blue;
color: white;
text-align: center;
padding: 0.5rem;
}
.content {
position: relative;
display: inline-block;
box-sizing: border-box;
margin: 0.2rem;
max-height: 60rem;
max-width: 22rem;
min-width: 22rem;
}
@media (min-width: 52rem) {
.header {
max-width: 43.5rem;
min-width: 43.5rem;
}
.page {
padding-left: calc(100% / 2 - 23rem);
}
}
.contentheader {
box-sizing: border-box;
display: inline-table;
position: relative;
background-color: blue;
color: white;
align-items: center;
margin: 0rem;
padding: 0.2rem;
border: 2px #eee solid;
height: 5rem;
width: 100%;
min-width: 20rem;
}
.contentitems {
box-sizing: border-box;
position: relative;
display: flex;
overflow-y: auto;
overflow-x: hidden;
max-height: 30rem;
min-width: 20rem;
border: 2px #eee solid;
}
.miniinfo {
display: block;
position: absolute;
font-size: 8px;
top: 15px;
}
.item {
position: relative;
display: inline-flex;
border: 2px #eee solid;
margin: 0.1rem;
height: 2.6rem;
width: 100%;
align-items: center;
}
.itemicon {
display: flex;
box-sizing: border-box;
margin: 1px;
height: 2.4rem;
width: 2.4rem;
}
.itemtext {
display: flex white-space: nowrap;
overflow: hidden;
text-overflow: ellipsis;
}
ul {
padding: 0.2rem;
margin: 0rem;
height: 100%;
width: 100%;
}
li {
width: 95%;
padding: 0rem;
margin: 0rem;
}
h2 {
padding: 0rem;
margin: 0.5rem;
}
h3 {
text-align: center;
padding: 10px;
margin: 0rem;
padding: 0.5rem;
}
h4 {
text-align: left;
padding-left: 1rem;
margin: auto;
white-space: nowrap;
overflow: hidden;
text-overflow: ellipsis;
}
p {
padding-left: 1.1rem;
margin: auto;
font-size: 0.8rem;
white-space: nowrap;
overflow: hidden;
text-overflow: ellipsis;
}
label {
padding-left: 1.1rem;
}

View file

@ -16,17 +16,20 @@ def get_icon(station):
cache_path = generic.get_cache_path(CACHE_NAME)
if not cache_path:
return None
station_icon_file = cache_path + '/' + station.id
# make icon filename from favicon-adress
station_icon_file = cache_path + '/' + generic.get_checksum(station.icon) + '.jpg'
if not os.path.exists(station_icon_file):
logging.debug("Station icon cache miss. Fetching and converting station icon for station id '%s'", station.id)
headers = {'User-Agent': generic.USER_AGENT + '/' + __version__}
try:
response = requests.get(station.icon, headers=headers)
except requests.exceptions.ConnectionError as err:
logging.error("Connection to station icon URL failed (%s)", err)
logging.debug("Connection to station icon URL failed (%s)", err)
return None
if response.status_code != 200:
logging.error("Could not get station icon data from %s (HTML status %s)", station.icon, response.status_code)
logging.debug("Could not get station icon data from %s (HTML status %s)",
station.icon, response.status_code)
return None
try:
image = Image.open(io.BytesIO(response.content))

View file

@ -0,0 +1,64 @@
<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
<html>
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0, user-scalable=no">
<link rel="stylesheet" href="/static/style.css">
<script type="text/javascript" src="/static/script.js"></script>
<title>YCast</title>
</head>
<body>
<div class="page">
<div class="header">
<h2>YCast advanced</h2>
<div>
<label for="idRequestSrc">station-source:</label>
<select tabindex="1" id="idRequestSrc" name="category" oninput="onInputSelect(event, this)">
<option value="recently">recently</option>
<option value="voted">voted</option>
<option value="language">language</option>
<option value="country">country</option>
</select>
</div>
<div>
<label for="idLanOrCountSelect">language or country:</label>
<input type="search" tabindex="2" list="listLangOrCountry" id="idLanOrCountSelect" autocomplete=off onclick="this.value=''" onfocus="this.value=''" onkeyup="keyUpEvent(event,this)" oninput="onInputSelect(event, this)">
<datalist id="listLangOrCountry">
</datalist>
</div>
</div>
<div class="content">
<div class="contentheader">
<label class="miniinfo" id="stationcount">0/0</label>
<h3>Stations</h3>
<label for="stationsearch">search:</label>
<input type="search" tabindex="3" type="search" id="stationsearch" onkeyup="keyUpEvent(event,this)" oninput="onInputSelect(event, this)">
</div>
<div class="contentitems" title="Clicking on this item copies it as a bookmark" tabindex="4">
<ul id="stationList">
</ul>
</div>
</div>
<div class="content">
<div class="contentheader">
<h3>Bookmarks</h3>
<label for="idcategory">category:</label>
<input type="search" tabindex="10000" list="categoryList" id="idcategory" title="filter existing or a new category, if empty all bookmarks are shown." onkeyup="keyUpEvent(event,this)" oninput="onInputSelect(event, this)">
<datalist id="categoryList">
<option value="Actual">
<option value="Pop">
<option value="Classics">
</datalist>
</div>
<div class="contentitems" title="Clicking on this item will delete it" tabindex="5">
<ul id="bookmarkList">
</ul>
</div>
</div>
</div>
</body>
</html>

183
ycast/test_YCast.py Normal file
View file

@ -0,0 +1,183 @@
import json
import logging
import os
import unittest
from io import StringIO
import flask
from ycast import my_filter, generic, radiobrowser, my_recentlystation
class MyTestCase(unittest.TestCase):
logging.getLogger().setLevel(logging.DEBUG)
generic.init_base_dir("/.test_ycast")
my_filter.init_filter_file()
def test_verify_values(self):
assert my_filter.verify_value(None, None)
assert my_filter.verify_value('', None)
assert my_filter.verify_value('', '')
assert my_filter.verify_value(None, '')
assert my_filter.verify_value(3, 3)
assert my_filter.verify_value('3', 3)
assert my_filter.verify_value('3', '3')
assert my_filter.verify_value('3', '3,4,5')
assert my_filter.verify_value(['3','5'], '3')
assert my_filter.verify_value(['3','5'], '3,6')
assert my_filter.verify_value([3,4,5,6], 5)
assert not my_filter.verify_value('', '3')
assert not my_filter.verify_value(3, 4)
assert not my_filter.verify_value('3', 4)
assert not my_filter.verify_value('4', '3')
assert not my_filter.verify_value(['3,4,5,6'], '9')
assert not my_filter.verify_value(['3,4,5,6'], '9,8')
def test_init_filter(self):
my_filter.begin_filter()
filter_dictionary={ "whitelist" : my_filter.white_list, "blacklist" : my_filter.black_list}
for elem in filter_dictionary:
logging.warning("Name filtertype: %s", elem)
filter_param = filter_dictionary[elem]
if filter_param:
for par in filter_param:
logging.warning(" Name paramter: %s", par)
else:
logging.warning(" <empty list>")
def test_station_search(self):
# hard test for filter
my_filter.white_list={}
my_filter.black_list={}
stations = radiobrowser.search('Pinguin Pop')
logging.info("Stations found (%d)", len(stations))
assert len(stations) == 1
my_filter.white_list={}
my_filter.black_list={ "countrycode": 'NL'}
stations = radiobrowser.search('Pinguin Pop')
logging.info("Stations found (%d)", len(stations))
assert len(stations) == 0
def test_station_by_country(self):
my_filter.white_list={ "codec" : "OGG" }
my_filter.black_list={ }
stations = radiobrowser.get_stations_by_country('Germany')
logging.info("Stations (%d)", len(stations))
# Currently yields 40 but is not fixed of course
assert len(stations) > 20 and len(stations) < 70
def test_station_by_language(self):
my_filter.white_list={ "codec" : "AAC"}
my_filter.black_list={"countrycode": "NL"}
stations = radiobrowser.get_stations_by_language('dutch')
logging.info("Stations (%d)", len(stations))
# With this filter there is only 1 (atm).
assert len(stations) == 1
def test_station_by_genre(self):
my_filter.white_list={"bitrate" : 320}
my_filter.black_list={}
stations = radiobrowser.get_stations_by_genre('rock')
logging.info("Stations (%d)", len(stations))
# Currently yields 86 but is not fixed of course
assert len(stations) > 50 and len(stations) < 100
def test_station_by_votes(self):
my_filter.white_list={}
my_filter.black_list={}
stations = radiobrowser.get_stations_by_votes()
logging.info("Stations (%d)", len(stations))
assert len(stations) == my_filter.get_limit('DEFAULT_STATION_LIMIT')
#stations = radiobrowser.get_stations_by_votes(10000)
#logging.info("Stations (%d)", len(stations))
#assert len(stations) == 10000
def test_get_languages(self):
my_filter.white_list={ 'languagecodes' : ['en','no'] }
my_filter.black_list={}
result = radiobrowser.get_language_directories()
logging.info("Languages (%d)", len(result))
assert len(result) == 2
def test_get_countries(self):
# Test for Germany only 1, nach der Wiedervereinigung...
my_filter.white_list={ 'country' : 'Germany' }
my_filter.black_list={}
result = radiobrowser.get_country_directories()
logging.info("Countries (%d)", len(result))
assert len(result) == 1
def test_get_genre(self):
my_filter.white_list={ 'tags' : ['rock','pop'] }
my_filter.black_list={}
result = radiobrowser.get_genre_directories()
logging.info("Genres (%d)", len(result))
# Not a useful test, changes all the time
#assert len(result) < 300
def test_get_limits(self):
result = my_filter.get_limit('MINIMUM_COUNT_COUNTRY')
assert result == 5
result = my_filter.get_limit('SHOW_BROKEN_STATIONS')
assert result == False
def test_recently_hit(self):
try:
os.remove(my_recentlystation.get_recently_file())
except Exception:
pass
result = my_recentlystation.get_stations_by_vote()
assert len(result) == 0
result = my_recentlystation.get_recently_stations_dictionary()
assert result is None
i = 0
while i < 10:
my_recentlystation.signal_station_selected('NAME ' + str(i), 'http://dummy/' + str(i),
'http://icon' + str(i))
i = i+1
result = my_recentlystation.get_recently_stations_dictionary()
assert my_recentlystation.directory_name()
assert result[my_recentlystation.directory_name()]
my_recentlystation.signal_station_selected('Konverenz: Sport', 'http://dummy/' + str(i),
'http://icon' + str(i))
my_recentlystation.signal_station_selected('Konverenz: Sport', 'http://dummy/' + str(i),
'http://icon' + str(i))
my_recentlystation.signal_station_selected('Konverenz: Sport', 'http://dummy/' + str(i),
'http://icon' + str(i))
i = 6
while i < 17:
my_recentlystation.signal_station_selected('NAME ' + str(i), 'http://dummy/' + str(i),
'http://icon' + str(i))
i = i+1
result = my_recentlystation.get_recently_stations_dictionary()
assert result[my_recentlystation.directory_name()]
result = my_recentlystation.get_stations_by_vote()
assert len(result) == 5
j = 0
while j < 6:
i = 6
while i < 9:
my_recentlystation.signal_station_selected('NAME ' + str(i), 'http://dummy/' + str(i),
'http://icon' + str(i))
i = i+1
j = j+1
result = my_recentlystation.get_stations_by_vote()
assert len(result) == 5
if __name__ == '__main__':
unittest.main()

View file

@ -29,7 +29,7 @@ class Page:
self.count = -1
self.dontcache = False
def add(self, item):
def add_item(self, item):
self.items.append(item)
def set_count(self, count):
@ -71,6 +71,14 @@ class Display:
return item
class Spacer:
def to_xml(self):
item = ET.Element('Item')
ET.SubElement(item, 'ItemType').text = 'Spacer'
return item
class Search:
def __init__(self, caption, url):
self.caption = caption