Source code for sima.lib.simadb

# Copyright (c) 2009-2013, 2019-2021 kaliko <kaliko@azylum.org>
#
#  This file is part of sima
#
#  sima is free software: you can redistribute it and/or modify
#  it under the terms of the GNU General Public License as published by
#  the Free Software Foundation, either version 3 of the License, or
#  (at your option) any later version.
#
#  sima is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#  GNU General Public License for more details.
#
#  You should have received a copy of the GNU General Public License
#  along with sima.  If not, see <http://www.gnu.org/licenses/>.
#
#
"""SQlite database library
"""

#: DB Version
__DB_VERSION__ = 4
#: Default history duration for both request and purge in hours
__HIST_DURATION__ = int(30 * 24)

import sqlite3

from collections import deque
from datetime import (datetime, timedelta)
from datetime import timezone


from sima.lib.meta import Artist, Album
from sima.lib.track import Track
from sima.utils.utils import MPDSimaException


[docs]class SimaDBError(MPDSimaException): """ Exceptions. """
[docs]class SimaDB: "SQLite management" def __init__(self, db_path=None): self._db_path = db_path
[docs] def get_database_connection(self): """get database reference""" connection = sqlite3.connect(self._db_path, isolation_level=None) return connection
[docs] def get_info(self): connection = self.get_database_connection() info = connection.execute("""SELECT * FROM db_info WHERE name = "DB Version" LIMIT 1;""").fetchone() connection.close() return info
[docs] def create_db(self): """ Set up a database """ connection = self.get_database_connection() connection.execute( 'CREATE TABLE IF NOT EXISTS db_info' ' (name CHAR(50), value CHAR(50))') connection.execute('''INSERT INTO db_info (name, value) SELECT ?, ? WHERE NOT EXISTS ( SELECT 1 FROM db_info WHERE name = ? )''', ('DB Version', __DB_VERSION__, 'DB Version')) connection.execute( # ARTISTS 'CREATE TABLE IF NOT EXISTS artists (id INTEGER PRIMARY KEY, ' 'name VARCHAR(100), mbid CHAR(36))') connection.execute( # ALBUMS 'CREATE TABLE IF NOT EXISTS albums (id INTEGER PRIMARY KEY, ' 'name VARCHAR(100), mbid CHAR(36))') connection.execute( # ALBUMARTISTS 'CREATE TABLE IF NOT EXISTS albumartists (id INTEGER PRIMARY KEY, ' 'name VARCHAR(100), mbid CHAR(36))') connection.execute( # TRACKS 'CREATE TABLE IF NOT EXISTS tracks (id INTEGER PRIMARY KEY, ' 'title VARCHAR(100), artist INTEGER, ' 'album INTEGER, albumartist INTEGER, ' 'file VARCHAR(500), mbid CHAR(36), ' 'FOREIGN KEY(artist) REFERENCES artists(id), ' 'FOREIGN KEY(album) REFERENCES albums(id), ' 'FOREIGN KEY(albumartist) REFERENCES albumartists(id))') connection.execute( # HISTORY 'CREATE TABLE IF NOT EXISTS history (id INTEGER PRIMARY KEY, ' 'last_play TIMESTAMP, track INTEGER, ' 'FOREIGN KEY(track) REFERENCES tracks(id))') connection.execute( # BLOCKLIST 'CREATE TABLE IF NOT EXISTS blocklist (id INTEGER PRIMARY KEY, ' 'artist INTEGER, album INTEGER, track INTEGER, ' 'FOREIGN KEY(artist) REFERENCES artists(id), ' 'FOREIGN KEY(album) REFERENCES albums(id), ' 'FOREIGN KEY(track) REFERENCES tracks(id))') connection.execute( # Genres (Many-to-many) 'CREATE TABLE IF NOT EXISTS genres ' '(id INTEGER PRIMARY KEY, name VARCHAR(100))') connection.execute( # Junction Genres Tracks """CREATE TABLE IF NOT EXISTS tracks_genres ( track INTEGER, genre INTEGER, FOREIGN KEY(track) REFERENCES tracks(id) FOREIGN KEY(genre) REFERENCES genres(id))""") # Create cleanup triggers: # DELETE history → Tracks / Tracks_genres tables connection.execute(''' CREATE TRIGGER IF NOT EXISTS del_history_cleanup_tracks AFTER DELETE ON history WHEN ((SELECT count(*) FROM history WHERE track=old.track) = 0 AND (SELECT count(*) FROM blocklist WHERE track=old.track) = 0) BEGIN DELETE FROM tracks WHERE id = old.track; DELETE FROM tracks_genres WHERE track = old.track; END; ''') # DELETE Tracks_Genres → Genres table connection.execute(''' CREATE TRIGGER IF NOT EXISTS del_tracks_genres_cleanup_genres AFTER DELETE ON tracks_genres WHEN ((SELECT count(*) FROM tracks_genres WHERE genre=old.genre) = 0) BEGIN DELETE FROM genres WHERE id = old.genre; END; ''') # DELETE Tracks → Artists table connection.execute(''' CREATE TRIGGER IF NOT EXISTS del_tracks_cleanup_artists AFTER DELETE ON tracks WHEN ((SELECT count(*) FROM tracks WHERE artist=old.artist) = 0 AND (SELECT count(*) FROM blocklist WHERE artist=old.artist) = 0) BEGIN DELETE FROM artists WHERE id = old.artist; END; ''') # DELETE Tracks → Albums table connection.execute(''' CREATE TRIGGER IF NOT EXISTS del_tracks_cleanup_albums AFTER DELETE ON tracks WHEN ((SELECT count(*) FROM tracks WHERE album=old.album) = 0 AND (SELECT count(*) FROM blocklist WHERE album=old.album) = 0) BEGIN DELETE FROM albums WHERE id = old.album; END; ''') # DELETE Tracks → cleanup AlbumArtists table connection.execute(''' CREATE TRIGGER IF NOT EXISTS del_tracks_cleanup_albumartists AFTER DELETE ON tracks WHEN ((SELECT count(*) FROM tracks WHERE albumartist=old.albumartist) = 0) BEGIN DELETE FROM albumartists WHERE id = old.albumartist; END; ''') # DELETE blocklist → Tracks table connection.execute(''' CREATE TRIGGER IF NOT EXISTS del_blocklist_cleanup_tracks AFTER DELETE ON blocklist WHEN ((SELECT count(*) FROM history WHERE track=old.track) = 0 AND (SELECT count(*) FROM blocklist WHERE track=old.track) = 0) BEGIN DELETE FROM tracks WHERE id = old.track; END; ''') # DELETE blocklist → Artists table # The "SELECT count(*) FROM blocklist" is useless, # there can be only one blocklist.artist connection.execute(''' CREATE TRIGGER IF NOT EXISTS del_blocklist_cleanup_artists AFTER DELETE ON blocklist WHEN ((SELECT count(*) FROM tracks WHERE artist=old.artist) = 0 AND (SELECT count(*) FROM blocklist WHERE artist=old.artist) = 0) BEGIN DELETE FROM artists WHERE id = old.artist; END; ''') # DELETE Tracks → Albums table # The "SELECT count(*) FROM blocklist" is useless, # there can be only one blocklist.album connection.execute(''' CREATE TRIGGER IF NOT EXISTS del_blocklist_cleanup_albums AFTER DELETE ON blocklist WHEN ((SELECT count(*) FROM tracks WHERE album=old.album) = 0 AND (SELECT count(*) FROM blocklist WHERE album=old.album) = 0) BEGIN DELETE FROM albums WHERE id = old.album; END; ''') connection.close()
[docs] def drop_all(self): connection = self.get_database_connection() rows = connection.execute( "SELECT name FROM sqlite_master WHERE type='table'") for row in rows.fetchall(): connection.execute(f'DROP TABLE IF EXISTS {row[0]}') connection.close()
def _remove_blocklist_id(self, blid, with_connection=None): """Remove a blocklist id""" if with_connection: connection = with_connection else: connection = self.get_database_connection() connection = self.get_database_connection() connection.execute('DELETE FROM blocklist' ' WHERE blocklist.id = ?', (blid,)) connection.commit() if not with_connection: connection.close() def _get_album(self, album, connection): if album.mbid: return connection.execute( "SELECT id FROM albums WHERE mbid = ?", (album.mbid,)) return connection.execute( "SELECT id FROM albums WHERE name = ? AND mbid IS NULL", (album.name,))
[docs] def get_album(self, album, with_connection=None, add=True): """get album information from the database. if not in database insert new entry. :param sima.lib.meta.Album album: album objet :param sqlite3.Connection with_connection: SQLite connection """ if with_connection: connection = with_connection else: connection = self.get_database_connection() rows = self._get_album(album, connection) for row in rows: if not with_connection: connection.close() return row[0] if not add: if not with_connection: connection.close() return None connection.execute( "INSERT INTO albums (name, mbid) VALUES (?, ?)", (album.name, album.mbid)) connection.commit() rows = self._get_album(album, connection) for row in rows: if not with_connection: connection.close() return row[0] if not with_connection: connection.close() return None
def _get_albumartist(self, artist, connection): if artist.mbid: return connection.execute( "SELECT id FROM albumartists WHERE mbid = ?", (artist.mbid,)) return connection.execute( "SELECT id FROM albumartists WHERE name = ? AND mbid IS NULL", (artist.name,))
[docs] def get_albumartist(self, artist, with_connection=None, add=True): """get albumartist information from the database. if not in database insert new entry. :param sima.lib.meta.Artist artist: artist :param sqlite3.Connection with_connection: SQLite connection """ if with_connection: connection = with_connection else: connection = self.get_database_connection() rows = self._get_albumartist(artist, connection) for row in rows: if not with_connection: connection.close() return row[0] if not add: if not with_connection: connection.close() return None connection.execute( "INSERT INTO albumartists (name, mbid) VALUES (?, ?)", (artist.name, artist.mbid)) connection.commit() rows = self._get_albumartist(artist, connection) for row in rows: if not with_connection: connection.close() return row[0] if not with_connection: connection.close()
def _get_artist(self, artist, connection): if artist.mbid: return connection.execute( "SELECT id FROM artists WHERE mbid = ?", (artist.mbid,)) return connection.execute( "SELECT id FROM artists WHERE name = ? AND mbid IS NULL", (artist.name,))
[docs] def get_artist(self, artist, with_connection=None, add=True): """get artist information from the database. if not in database insert new entry. :param sima.lib.meta.Artist artist: artist :param sqlite3.Connection with_connection: SQLite connection """ if with_connection: connection = with_connection else: connection = self.get_database_connection() rows = self._get_artist(artist, connection) for row in rows: if not with_connection: connection.close() return row[0] if not add: if not with_connection: connection.close() return None connection.execute( "INSERT INTO artists (name, mbid) VALUES (?, ?)", (artist.name, artist.mbid)) connection.commit() rows = self._get_artist(artist, connection) for row in rows: if not with_connection: connection.close() return row[0] if not with_connection: connection.close()
[docs] def get_genre(self, genre, with_connection=None, add=True): """get genre from the database. if not in database insert new entry. :param str genre: genre as a string :param sqlite3.Connection with_connection: SQLite connection """ if with_connection: connection = with_connection else: connection = self.get_database_connection() rows = connection.execute( "SELECT id FROM genres WHERE name = ?", (genre,)) for row in rows: if not with_connection: connection.close() return row[0] if not add: if not with_connection: connection.close() return None connection.execute( "INSERT INTO genres (name) VALUES (?)", (genre,)) connection.commit() rows = connection.execute( "SELECT id FROM genres WHERE name = ?", (genre,)) for row in rows: if not with_connection: connection.close() return row[0]
[docs] def get_track(self, track, with_connection=None, add=True): """Get a track id from Tracks table, add if not existing, :param sima.lib.track.Track track: track to use :param bool add: add non existing track to database""" if not track.file: raise SimaDBError(f'Got a track with no file attribute: {track}') if with_connection: connection = with_connection else: connection = self.get_database_connection() rows = connection.execute( "SELECT * FROM tracks WHERE file = ?", (track.file,)) for row in rows: if not with_connection: connection.close() return row[0] if not add: # Not adding non existing track if not with_connection: connection.close() return None # Get an artist record or None if track.artist: art = Artist(name=track.artist, mbid=track.musicbrainz_artistid) art_id = self.get_artist(art, with_connection=connection) else: art_id = None # Get an albumartist record or None if track.albumartist: albart = Artist(name=track.albumartist, mbid=track.musicbrainz_albumartistid) albart_id = self.get_albumartist(albart, with_connection=connection) else: albart_id = None # Get an album record or None if track.album: alb = Album(name=track.album, mbid=track.musicbrainz_albumid) alb_id = self.get_album(alb, with_connection=connection) else: alb_id = None connection.execute( """INSERT INTO tracks (artist, albumartist, album, title, mbid, file) VALUES (?, ?, ?, ?, ?, ?)""", (art_id, albart_id, alb_id, track.title, track.musicbrainz_trackid, track.file)) connection.commit() # Add track id to junction tables self._add_tracks_genres(track, connection) rows = connection.execute( "SELECT id FROM tracks WHERE file = ?", (track.file,)) for row in rows: if not with_connection: connection.close() return row[0] if not with_connection: connection.close() return None
def _add_tracks_genres(self, track, connection): if not track.genres: return rows = connection.execute( "SELECT id FROM tracks WHERE file = ?", (track.file,)) trk_id = rows.fetchone()[0] for genre in track.genres: # add genre gen_id = self.get_genre(genre) connection.execute("""INSERT INTO tracks_genres (track, genre) VALUES (?, ?)""", (trk_id, gen_id))
[docs] def add_history(self, track, date=None): """Record last play date of track (ie. not a real play history). :param sima.lib.track.Track track: track to add to history :param datetime.datetime date: UTC datetime object (use "datetime.now(timezone.utc)" is not set)""" if not date: date = datetime.now(timezone.utc) connection = self.get_database_connection() track_id = self.get_track(track, with_connection=connection) rows = connection.execute("SELECT * FROM history WHERE track = ? ", (track_id,)) if not rows.fetchone(): connection.execute("INSERT INTO history (track) VALUES (?)", (track_id,)) connection.execute("UPDATE history SET last_play = ? " " WHERE track = ?", (date, track_id,)) connection.commit() connection.close()
[docs] def purge_history(self, duration=__HIST_DURATION__): """Remove old entries in history :param int duration: Purge history record older than duration in hours""" connection = self.get_database_connection() connection.execute("DELETE FROM history WHERE last_play" " < datetime('now', '-%i hours')" % duration) connection.execute('VACUUM') connection.commit() connection.close()
[docs] def fetch_albums_history(self, needle=None, duration=__HIST_DURATION__): """ :param sima.lib.meta.Artist needle: When specified, returns albums history for this artist. :param int duration: How long ago to fetch history from (in hours) """ date = datetime.now(timezone.utc) - timedelta(hours=duration) connection = self.get_database_connection() connection.row_factory = sqlite3.Row rows = connection.execute(""" SELECT albums.name AS name, albums.mbid as mbid, artists.name as artist, artists.mbid as artist_mbib, albumartists.name as albumartist, albumartists.mbid as albumartist_mbib FROM history JOIN tracks ON history.track = tracks.id LEFT OUTER JOIN albums ON tracks.album = albums.id LEFT OUTER JOIN artists ON tracks.artist = artists.id LEFT OUTER JOIN albumartists ON tracks.albumartist = albumartists.id WHERE history.last_play > ? AND albums.name NOT NULL AND artists.name NOT NULL ORDER BY history.last_play DESC""", (date.isoformat(' '),)) hist = [] for row in rows: vals = dict(row) if needle: # Here use artist instead of albumartist if needle != Artist(name=vals.get('artist'), mbid=vals.get('artist_mbib')): continue # Use albumartist / MBIDs if possible to build album artist if not vals.get('albumartist'): vals['albumartist'] = vals.get('artist') if not vals.get('albumartist_mbib'): vals['albumartist_mbib'] = vals.get('artist_mbib') artist = Artist(name=vals.get('albumartist'), mbid=vals.pop('albumartist_mbib')) album = Album(**vals, Artist=artist) if hist and hist[-1] == album: # remove consecutive dupes continue hist.append(album) connection.close() return hist
[docs] def fetch_artists_history(self, needle=None, duration=__HIST_DURATION__): """Returns a list of Artist objects :param sima.lib.meta.MetaContainer needle: When specified, returns history for these artists only :param int duration: How long ago to fetch history from (in hours) :type needle: sima.lib.meta.Artist or sima.lib.meta.MetaContainer """ date = datetime.now(timezone.utc) - timedelta(hours=duration) connection = self.get_database_connection() connection.row_factory = sqlite3.Row rows = connection.execute(""" SELECT artists.name AS name, artists.mbid as mbid FROM history JOIN tracks ON history.track = tracks.id LEFT OUTER JOIN artists ON tracks.artist = artists.id WHERE history.last_play > ? AND artists.name NOT NULL ORDER BY history.last_play DESC""", (date.isoformat(' '),)) last = deque(maxlen=1) hist = [] for row in rows: artist = Artist(**row) if last and last[0] == artist: # remove consecutive dupes continue last.append(artist) if needle and isinstance(needle, (Artist, str)): if needle == artist: hist.append(artist) # No need to go further break continue if needle and getattr(needle, '__contains__'): if artist in needle: hist.append(artist) # No need to go further continue hist.append(artist) connection.close() return hist
[docs] def fetch_genres_history(self, duration=__HIST_DURATION__, limit=20): """Returns genre history :param int duration: How long ago to fetch history from (in hours) :param int limit: number of genre to fetch """ date = datetime.now(timezone.utc) - timedelta(hours=duration) connection = self.get_database_connection() rows = connection.execute(""" SELECT genres.name, artists.name FROM history JOIN tracks ON history.track = tracks.id LEFT OUTER JOIN tracks_genres ON tracks_genres.track = tracks.id LEFT OUTER JOIN artists ON tracks.artist = artists.id LEFT OUTER JOIN genres ON genres.id = tracks_genres.genre WHERE history.last_play > ? AND genres.name NOT NULL ORDER BY history.last_play DESC """, (date.isoformat(' '),)) genres = [] for row in rows: genres.append(row) if len({g[0] for g in genres}) >= limit: break connection.close() return genres
[docs] def fetch_history(self, artist=None, duration=__HIST_DURATION__): """Fetches tracks history, more recent first :param sima.lib.meta.Artist artist: limit history to this artist :param int duration: How long ago to fetch history from (in hours) """ date = datetime.now(timezone.utc) - timedelta(hours=duration) connection = self.get_database_connection() connection.row_factory = sqlite3.Row sql = """ SELECT tracks.title, tracks.file, artists.name AS artist, albumartists.name AS albumartist, artists.mbid as musicbrainz_artistid, albums.name AS album, albums.mbid AS musicbrainz_albumid, tracks.mbid as musicbrainz_trackid FROM history JOIN tracks ON history.track = tracks.id LEFT OUTER JOIN artists ON tracks.artist = artists.id LEFT OUTER JOIN albumartists ON tracks.albumartist = albumartists.id LEFT OUTER JOIN albums ON tracks.album = albums.id WHERE history.last_play > ? """ if artist: if artist.mbid: rows = connection.execute(sql+""" AND artists.mbid = ? ORDER BY history.last_play DESC""", (date.isoformat(' '), artist.mbid)) else: rows = connection.execute(sql+""" AND artists.name = ? ORDER BY history.last_play DESC""", (date.isoformat(' '), artist.name)) else: rows = connection.execute(sql+'ORDER BY history.last_play DESC', (date.isoformat(' '),)) hist = [] for row in rows: hist.append(Track(**row)) connection.close() return hist
[docs] def get_bl_track(self, track, with_connection=None, add=True): """Add a track to blocklist :param sima.lib.track.Track track: Track object to add to blocklist :param sqlite3.Connection with_connection: sqlite3.Connection to reuse, else create a new one :param bool add: Default is to add a new record, set to False to fetch associated record""" if with_connection: connection = with_connection else: connection = self.get_database_connection() track_id = self.get_track(track, with_connection=connection, add=add) rows = connection.execute( "SELECT id FROM blocklist WHERE track = ?", (track_id,)) if not rows.fetchone(): if not add: if not with_connection: connection.close() return None connection.execute('INSERT INTO blocklist (track) VALUES (?)', (track_id,)) connection.commit() rows = connection.execute( "SELECT id FROM blocklist WHERE track = ?", (track_id,)) blt = rows.fetchone()[0] if not with_connection: connection.close() return blt
[docs] def get_bl_album(self, album, with_connection=None, add=True): """Add an album to blocklist :param sima.lib.meta.Album: Album object to add to blocklist :param sqlite3.Connection with_connection: sqlite3.Connection to reuse, else create a new one :param bool add: Default is to add a new record, set to False to fetch associated record""" if with_connection: connection = with_connection else: connection = self.get_database_connection() album_id = self.get_album(album, with_connection=connection, add=add) rows = connection.execute( "SELECT id FROM blocklist WHERE album = ?", (album_id,)) if not rows.fetchone(): if not add: if not with_connection: connection.close() return None connection.execute('INSERT INTO blocklist (album) VALUES (?)', (album_id,)) connection.commit() rows = connection.execute( "SELECT id FROM blocklist WHERE album = ?", (album_id,)) blitem = rows.fetchone()[0] if not with_connection: connection.close() return blitem
[docs] def get_bl_artist(self, artist, with_connection=None, add=True): """Add an artist to blocklist :param sima.lib.meta.Artist: Artist object to add to blocklist :param sqlite3.Connection with_connection: sqlite3.Connection to reuse, else create a new one :param bool add: Default is to add a new record, set to False to fetch associated record""" if with_connection: connection = with_connection else: connection = self.get_database_connection() artist_id = self.get_artist(artist, with_connection=connection, add=add) rows = connection.execute( "SELECT id FROM blocklist WHERE artist = ?", (artist_id,)) if not rows.fetchone(): if not add: return None connection.execute('INSERT INTO blocklist (artist) VALUES (?)', (artist_id,)) connection.commit() rows = connection.execute( "SELECT id FROM blocklist WHERE artist = ?", (artist_id,)) blitem = rows.fetchone()[0] if not with_connection: connection.close() return blitem
[docs] def view_bl(self): connection = self.get_database_connection() connection.row_factory = sqlite3.Row rows = connection.execute("""SELECT artists.name AS artist, artists.mbid AS musicbrainz_artist, albums.name AS album, albums.mbid AS musicbrainz_album, tracks.title AS title, tracks.mbid AS musicbrainz_title, tracks.file AS file, blocklist.id FROM blocklist LEFT OUTER JOIN artists ON blocklist.artist = artists.id LEFT OUTER JOIN albums ON blocklist.album = albums.id LEFT OUTER JOIN tracks ON blocklist.track = tracks.id""") res = [dict(row) for row in rows.fetchall()] connection.close() return res
[docs] def delete_bl(self, track=None, album=None, artist=None): if not (track or album or artist): return connection = self.get_database_connection() blid = None if track: blid = self.get_bl_track(track, with_connection=connection) if album: blid = self.get_bl_album(album, with_connection=connection) if artist: blid = self.get_bl_artist(artist, with_connection=connection) if not blid: return self._remove_blocklist_id(blid, with_connection=connection) connection.close()
# VIM MODLINE # vim: ai ts=4 sw=4 sts=4 expandtab fileencoding=utf8