Source code for database.load

"""
Load project information and read from the project database
"""

import sqlite3
from configparser import ConfigParser


[docs]def create_db(sql_file_name: str): """ Create a new sql table from .slq file in /database Parameters ---------- sql_file_name : str name of the sql file to create """ # Load database db = ProjectLoader().load_db() # Make database with open(sql_file_name, "r") as sql_file: db.conn.executescript(sql_file.read())
[docs]class ProjectLoader: def __init__(self): from pathlib import Path config_file = Path(__file__).resolve().parent.parent / "config.ini" config = ConfigParser() config.read(config_file) self.path = Path(config.get("path", "project")) self.db_path = Path(config.get("path", "database")) self.db = Path(config.get("file", "database")) @property def open_folder(self): """Open the directory in win explorer""" import webbrowser webbrowser.open(self.path)
[docs] def load_db(self): db_path = self.path / self.db_path / self.db return Database(db_path)
[docs]class Database: def __init__(self, db_path): self.path = db_path self.dir = self.path.parent self.conn = sqlite3.connect(db_path) self.conn.row_factory = sqlite3.Row self.cur = self.conn.cursor()
[docs] def execute(self, query): self.cur.execute(query)
[docs] def create_col(self, table: str, col_name: str, type: str): """ Create a new column in table Parameters ---------- table : str table name to add the column col_name : str column name type : str data type for the column (e.g., TEXT, INT) """ if col_name not in self.col_names(table): self.cur.execute( "ALTER TABLE {} ADD COLUMN {} {}".format(table, col_name, type) )
[docs] def col_names(self, table) -> list: """Get column names""" self.cur.execute(f"PRAGMA table_info({table})") columns = self.cur.fetchall() return [x[1] for x in columns]
[docs] def update( self, table, col_name, condition_col=None, condition_value=None, value=None ): """Update values to table""" if condition_col is not None: self.cur.execute( "UPDATE {} SET {} = ? WHERE {} = ?".format( table, col_name, condition_col ), (value, condition_value), ) else: self.cur.execute( "UPDATE {} SET {} = ? WHERE {} = ?".format( table, col_name, condition_col ), (value, condition_value), ) self.conn.commit()
[docs] def to_csv(self, table, add_date=True, open_folder=True): """ Convert database to csv Parameters ---------- table : str Name of the table open_folder : bool Open the output directory """ from datetime import datetime from pathlib import Path import pandas as pd csv_name = Path(f"{table}.csv") if add_date: # append time&time info to csv csv_name = ( csv_name.stem + "_" + datetime.now().strftime("%Y%m%d%H%M%S") + csv_name.suffix ) df = pd.read_sql("SELECT * from {}".format(table), self.conn) df.to_csv(self.dir / csv_name, index=False, header=True) if open_folder: # Open the directory in win explorer import webbrowser webbrowser.open(self.dir)
[docs] def to_dataframe(self, query): """ Convert to pandas dataframe according to the query statement Parameters ---------- query : str SQL query statement Returns ------- df : dataframe """ import pandas as pd df = pd.read_sql_query(query, self.conn) return df
[docs]class DBInfo: def __init__(self, db): # Set all database fields as attributes self.channel = None for key in db.keys(): # dic[col] = database[col] setattr(self, key, db[key]) def __repr__(self): # print attributes return str([key for key in self.__dict__.keys()])
[docs] def load_cluster_db(self): from pathlib import Path cluster_id = "" if len(str(self.id)) == 1: cluster_id = "00" + str(self.id) elif len(str(self.id)) == 2: cluster_id = "0" + str(self.id) else: cluster_id = str(self.id) cluster_taskSession = "" if len(str(self.taskSession)) == 1: cluster_taskSession = "D0" + str(self.taskSession) elif len(str(self.taskSession)) == 2: cluster_taskSession = "D" + str(self.taskSession) cluster_taskSession += "(" + str(self.sessionDate) + ")" if self.channel: # if neural signal exists cluster_name = [ cluster_id, self.birdID, self.taskName, cluster_taskSession, self.site, self.channel, self.unit, ] else: cluster_name = [ cluster_id, self.birdID, self.taskName, cluster_taskSession, self.site, ] cluster_name = "-".join(map(str, cluster_name)) # Get cluster path project_path = ProjectLoader().path cluster_path = ( project_path / self.birdID / self.taskName / cluster_taskSession / self.site[-2:] / "Songs" ) cluster_path = Path(cluster_path) return cluster_name, cluster_path
[docs] def load_song_db(self): from pathlib import Path song_id = "" if len(str(self.id)) == 1: song_id = "00" + str(self.id) elif len(str(self.id)) == 2: song_id = "0" + str(self.id) else: song_id = str(self.id) task_session = "" if len(str(self.taskSession)) == 1: task_session = "D0" + str(self.taskSession) elif len(str(self.taskSession)) == 2: task_session = "D" + str(self.taskSession) task_session += "(" + str(self.sessionDate) + ")" song_name = [song_id, self.birdID, self.taskName, task_session] song_name = "-".join(map(str, song_name)) # Get cluster path project_path = ProjectLoader().path song_path = project_path / self.birdID / self.taskName / task_session song_path = Path(song_path) return song_name, song_path