"""LoggerHub"""
import logging
import queue
import sys
import threading
import tkinter as tk
from datetime import timedelta
from tkinter import ttk
from tkinter.scrolledtext import ScrolledText
from urllib.parse import urljoin
from uuid import uuid4
import requests
import zmq
from dateutil.parser import parse
from dateutil.tz import tzlocal
from timeloop import Timeloop
from .filewatcher import FileWatcher
from .instrument import GCPInstrument
from .utils import LOGGING_FMT, get_logger, resource_path
[docs]class DBSessionLogger:
"""communicate with database."""
def __init__(self, cpu_name, dbapi_url,
dbapi_username=None,
dbapi_password=None,
user=None,
logger=None):
"""
Parameters
----------
cpu_name : str
user : str
dbapi_url : str
dbapi_username : str
dbapi_password : str
user : str
logger : logging.Logger
"""
self.dbapi_url = dbapi_url
self.dbapi_auth = (dbapi_username, dbapi_password)
self.user = user
self.logger = logger or logging.getLogger("DSL")
self.cpu_name = cpu_name
self.session_id = None
self.instr_info = None
self.instr_pid = None
self.instr_schema = None
self.session_started = False
self.session_start_time = None
self.last_entry_type = None
self.last_session_id = None
self.last_session_row_number = None
self.last_session_ts = None
self.last_start_id = None # last start `id_session_log`
self.progress_num = 0
self.session_note = ""
self.action_map = {
'SETUP': self.db_logger_setup,
'LAST_SESSION_CHECK': self.last_session_ended,
'START_PROCESS': self.process_start,
'START_PROCESS_CHECK': self.process_start_check,
'TEAR_DOWN': self.db_logger_teardown,
'SAVE_NOTE': self.save_note,
'END_PROCESS': self.process_end,
'END_PROCESS_CHECK': self.process_end_check,
'UPDATE_START_RECORD': self.update_start,
'UPDATE_START_RECORD_CHECK': self.update_start_check,
'CONTINUE_LAST_SESSION': self.continue_last_session,
}
[docs] @classmethod
def from_config(cls, config, cpu_name, user=None, logger=None):
return cls(cpu_name,
config["NEXUSLIMSHUB_DBAPI_URL"],
dbapi_username=config["NEXUSLIMSHUB_DBAPI_USERNAME"],
dbapi_password=config["NEXUSLIMSHUB_DBAPI_PASSWORD"],
user=user,
logger=logger)
[docs] def handle(self, msg):
cmd = msg['cmd']
argv = msg.get('argv', [])
kwarg = msg.get('kwarg', {})
try:
is_success, msg = self.action_map[cmd](*argv, **kwarg)
except Exception as e:
return {'state': False,
'exception': True,
'message': str(e)}
res = {'state': is_success,
'message': msg,
'exception': False,
'progress': self.progress_num}
self.progress_num += 1
return res
[docs] def last_session_ended(self):
"""
Check the database for this instrument to make sure that the last
entry in the db was an "END" (properly ended). If it's not, return
False so the GUI can query the user for additional input on how to
proceed.
Returns
-------
is_success : bool
If the database is consistent (i.e. the last log for this
instrument is an "END" log), return True. If not (it's a "START"
log), return False
msg : str
"""
url = urljoin(self.dbapi_url, "/api/lastsession")
res = requests.get(url, params={"instrument": self.instr_pid}, auth=self.dbapi_auth)
if res.status_code >= 500:
msg = str(res.content)
self.logger.debug(res.text)
self.logger.error(msg)
raise Exception(msg)
if res.status_code == 404:
self.last_entry_type = "END"
if res.status_code == 200:
data = res.json()["data"]
self.last_entry_type = data["event_type"]
self.last_session_id = data["session_identifier"]
self.last_session_row_number = data["id_session_log"]
self.last_session_ts = data["timestamp"]
self.session_note = data["session_note"]
self.session_id = self.last_session_id
if self.last_entry_type == "END":
msg = "Verified database consistency for the %s." % self.instr_schema
self.logger.debug(msg)
return True, msg
elif self.last_entry_type == "START":
msg = "Database is inconsistent for the %s. " \
"(last entry [id_session_log = %s] was a `START`)" % (
self.instr_schema, self.last_session_row_number)
self.logger.warning(msg)
return False, msg
else:
msg = "Last entry for the %s was neither `START` or `END` (value was %s)" % (
self.instr_schema, self.last_entry_type)
self.logger.error(msg)
raise Exception(msg)
[docs] def process_start(self):
"""
Insert a session `'START'` log for this computer's instrument
Returns True if successful, False if not
"""
self.session_id = str(uuid4())
self.session_note = ""
# Insert START log
url = urljoin(self.dbapi_url, "/api/session")
payload = {
"event_type": "START",
"instrument": self.instr_pid,
"user": self.user,
"session_identifier": self.session_id,
"session_note": self.session_note
}
res = requests.post(url, data=payload, auth=self.dbapi_auth)
if res.status_code != 200:
msg = "Error inserting `START` log into DB. " + str(res.content)
self.logger.error(msg)
raise Exception(msg)
self.session_started = True
msg = "`START` session inserted into db."
self.logger.info(msg)
return True, msg
[docs] def process_start_check(self):
# verify insertion success by query db
url = urljoin(self.dbapi_url, "/api/lastsession")
payload = {
"session_identifier": self.session_id,
"event_type": "START",
}
res = requests.get(url, params=payload, auth=self.dbapi_auth)
if res.status_code != 200:
msg = "Error verifying that session was started. " + str(res.content)
self.logger.error(msg)
raise Exception(msg)
data = res.json()["data"]
# convert GMT time to local time
self.session_start_time = parse(data["timestamp"]).astimezone(tzlocal())
msg = "Verified insertion of row " + str(data)
self.logger.debug(msg)
return True, msg
[docs] def process_end(self):
"""
Insert a session `'END'` log for this computer's instrument,
and change the status of the corresponding `'START'` entry from
`'WAITING_FOR_END'` to `'TO_BE_BUILT'`
"""
# Insert END log
url = urljoin(self.dbapi_url, "/api/session")
payload = {
"instrument": self.instr_pid,
"event_type": "END",
"record_status": "TO_BE_BUILT",
"session_identifier": self.session_id,
"session_note": self.session_note,
"user": self.user,
}
res = requests.post(url, data=payload, auth=self.dbapi_auth)
if res.status_code != 200:
msg = "Error inserting `END` log for session"
self.logger.debug(res.text)
self.logger.error(msg)
raise Exception(msg)
msg = "`END` session log inserted into db"
self.logger.info(msg)
self.progress_num = 1
return True, msg
[docs] def process_end_check(self):
# verify insertion success by querying
url = urljoin(self.dbapi_url, "/api/lastsession")
payload = {
"session_identifier": self.session_id,
"event_type": "END",
}
res = requests.get(url, params=payload, auth=self.dbapi_auth)
if res.status_code != 200:
msg = "Error verifying that session was ended. " + str(res.content)
self.logger.error(msg)
raise Exception(msg)
data = res.json()["data"]
msg = "Verified `END` session inserted into db. " + str(data)
self.logger.debug(msg)
return True, msg
def __last_start_id(self):
if self.last_start_id is None:
# Query matched last start
url = urljoin(self.dbapi_url, "/api/lastsession")
payload = {
"session_identifier": self.session_id,
"event_type": "START",
}
res = requests.get(url, params=payload, auth=self.dbapi_auth)
if res.status_code != 200:
msg = "Error getting matching `START` log. " + str(res.content)
self.logger.error(msg)
raise Exception(msg)
data = res.json()["data"]
msg = "Found matched `START` log: " + str(data)
self.logger.debug(msg)
self.last_start_id = data["id_session_log"]
return self.last_start_id
[docs] def update_start(self):
# Update matched last start
url = urljoin(self.dbapi_url, "/api/session")
payload = {
"id_session_log": self.__last_start_id(),
"record_status": "TO_BE_BUILT",
}
res = requests.put(url, data=payload, auth=self.dbapi_auth)
if res.status_code != 200:
msg = "Error updating matching `START` log's status. " + str(res.content)
self.logger.error(msg)
raise Exception(msg)
msg = "Matching `START` session log's status updated."
self.logger.info(msg)
return True, msg
[docs] def update_start_check(self):
# Verify update success by querying
url = urljoin(self.dbapi_url, "/api/session")
payload = {
"id_session_log": self.__last_start_id(),
"record_status": "TO_BE_BUILT",
}
res = requests.get(url, params=payload, auth=self.dbapi_auth)
if res.status_code != 200:
msg = "Error updating matching `START` log's status. " + str(res.content)
self.logger.error(msg)
raise Exception(msg)
data = res.json()["data"]
msg = "Verified updated row: " + str(data)
self.logger.debug(msg)
self.logger.info("Finished ending session %s" % self.session_id)
return True, msg
[docs] def continue_last_session(self):
self.session_started = True
self.session_start_time = parse(self.last_session_ts).astimezone(tzlocal())
msg = 'Set start time/id as last start time/id.'
return True, msg
[docs] def db_logger_setup(self):
"""
get instrument info (pid, schema name).
"""
self.logger.info("Computer Name: %s" % self.cpu_name)
url = urljoin(self.dbapi_url, "/api/instrument")
payload = {
"computer_name": self.cpu_name,
}
res = requests.get(url, params=payload, auth=self.dbapi_auth)
if res.status_code != 200:
msg = "Error fetching instrument information from DB. " + str(res.content)
self.logger.error(msg)
raise Exception(msg)
data = res.json()["data"]
msg = "Loaded instrument information from DB"
self.logger.info(msg)
self.logger.debug("Instrument info: %s" % str(data))
self.progress_num = 1
self.instr_info = data
self.instr_pid = self.instr_info["instrument_pid"]
self.instr_schema = self.instr_info["schema_name"]
return True, msg
[docs] def save_note(self, note_text):
# Update matched last start
url = urljoin(self.dbapi_url, "/api/session")
payload = {
"id_session_log": self.__last_start_id(),
"session_note": note_text,
}
res = requests.put(url, data=payload, auth=self.dbapi_auth)
if res.status_code != 200:
msg = "Error updating session_note. " + str(res.content)
self.logger.error(msg)
raise Exception(msg)
self.session_note = note_text
msg = "session_note saved."
self.logger.info(msg)
return True, msg
[docs] def db_logger_teardown(self):
"""
teardown routine
"""
msg = "TEARDOWN"
self.logger.debug(msg)
# `msg` here is a JSON object to pass additional information for GUI pannel display
return True, {
'instrument_schema': self.instr_schema,
'session_start_ts': self.session_start_time.strftime("%a %b %d, %Y\n%I:%M:%S %p"),
'session_note': self.session_note
}
[docs]class QueueHandler(logging.Handler):
"""Class to send logging records to a queue
It can be used from different threads
The ConsoleUi class polls this queue to display records in a ScrolledText widget
"""
# Example from Moshe Kaplan: https://gist.github.com/moshekaplan/c425f861de7bbf28ef06
# (https://stackoverflow.com/questions/13318742/python-logging-to-tkinter-text-widget) is not thread safe! # noqa
# See https://stackoverflow.com/questions/43909849/tkinter-python-crashes-on-new-thread-trying-to-log-on-main-thread # noqa
def __init__(self, log_queue):
super().__init__()
self.log_queue = log_queue
[docs] def emit(self, record):
self.log_queue.put(record)
[docs]class ConsoleUi:
"""Poll messages from a logging queue and display them in a scrolled text widget"""
def __init__(self, frame, logger):
self.frame = frame
# Create a ScrolledText wdiget
self.scrolled_text = ScrolledText(frame, state='disabled')
self.scrolled_text.grid(row=0, column=0, columnspan=5, rowspan=2,
sticky=(tk.N, tk.S, tk.W, tk.E))
self.scrolled_text.configure(font='TkFixedFont')
self.scrolled_text.tag_config('INFO', foreground='black')
self.scrolled_text.tag_config('DEBUG', foreground='gray')
self.scrolled_text.tag_config('WARNING', foreground='orange')
self.scrolled_text.tag_config('ERROR', foreground='red')
self.scrolled_text.tag_config('CRITICAL', foreground='red', underline=1)
# Create a logging handler using a queue
self.log_queue = queue.Queue()
self.queue_handler = QueueHandler(self.log_queue)
formatter = logging.Formatter(LOGGING_FMT)
self.queue_handler.setFormatter(formatter)
logger.addHandler(self.queue_handler)
# Start polling messages from the queue
self.frame.after(100, self.poll_log_queue)
[docs] def display(self, record):
msg = self.queue_handler.format(record)
self.scrolled_text.configure(state='normal')
self.scrolled_text.insert(tk.END, msg + '\n', record.levelname)
self.scrolled_text.configure(state='disabled')
# Autoscroll to the bottom
self.scrolled_text.yview(tk.END)
[docs] def poll_log_queue(self):
# Check every 100ms if there is a new message in the queue to display
while True:
try:
record = self.log_queue.get(block=False)
except queue.Empty:
break
else:
self.display(record)
self.frame.after(100, self.poll_log_queue)
[docs]class App(tk.Tk):
def __init__(self, config, cred_json, cache_json, verbose=logging.INFO):
super(App, self).__init__()
self.config = config
self.cred_json = cred_json
self.cache_json = cache_json
self.logger = get_logger('HUB', verbose=verbose)
self.thread = None
self.geometry('600x250')
self.style = ttk.Style(self)
self.resizable(True, True)
self.title('NexusLIMS Logger Hub')
self.protocol('WM_DELETE_WINDOW', self.stop)
self.wm_iconphoto(True, tk.PhotoImage(master=self, file=resource_path("logo_bare.png")))
self.rowconfigure(0, weight=1)
self.columnconfigure(0, weight=1)
content = ttk.Frame(self, padding=(5, 5, 5, 0))
self.start_btn = ttk.Button(content, text='Start', command=self.start)
self.end_btn = ttk.Button(content, text='End', command=self.stop, state=tk.DISABLED)
self.copy_btn = ttk.Button(content, text='Copy', command=self.copy_text_to_clipboard)
content.grid(column=0, row=0, sticky=(tk.N, tk.S, tk.E, tk.W))
self.start_btn.grid(column=1, row=3)
self.end_btn.grid(column=2, row=3, padx=5, pady=5)
self.copy_btn.grid(column=3, row=3)
self.console = ConsoleUi(content, self.logger)
content.columnconfigure(0, weight=1)
content.columnconfigure(4, weight=1)
content.rowconfigure(1, weight=1)
# zmq socket
self.zmqcxt = zmq.Context()
self.socket = self.zmqcxt.socket(zmq.REP)
# containers
self.dbsessionloggers = {}
self.filewatchers = {}
self.timeloops = {}
self.gcpinstruments = {}
[docs] def start(self):
p = self.config.get("NEXUSLIMSHUB_PORT")
self.socket.bind(f'tcp://*:{p}')
self.thread = threading.Thread(target=self.run)
self.thread.start()
self.start_btn.configure(state=tk.DISABLED)
self.end_btn.configure(state=tk.NORMAL)
self.logger.info(f'LoggerHub started, listening on port {p}')
[docs] def run(self):
while True:
try:
msg = self.socket.recv_json()
self.logger.debug(msg)
except zmq.error.ContextTerminated:
break
client_id = msg.get('client_id')
dsl = self.dbsessionloggers.setdefault(
client_id,
DBSessionLogger.from_config(
self.config, client_id, user=msg.get('user'), logger=self.logger)
)
cmd = msg.get('cmd')
res = {}
if cmd == 'START_SYNC':
fw = self.filewatchers.setdefault(
client_id, FileWatcher.from_config(self.config,
msg.get('watchdir'),
credential_fn=self.cred_json,
cache_fn=self.cache_json,
logger=self.logger))
fw.bucket_dir = dsl.instr_info.get('filestore_path', dsl.instr_pid)
fw.mtime_since = dsl.session_start_time.timestamp()
fw.instr_info = dsl.instr_info
if client_id not in self.timeloops:
tl = Timeloop()
tl.logger = self.logger
tl._add_job(fw.upload, timedelta(seconds=fw.interval))
tl.start()
self.timeloops[client_id] = tl
res = {'state': True,
'message': 'sync thread started',
'exception': False}
elif cmd == 'STOP_SYNC':
tl = self.timeloops.get(client_id)
if tl:
try:
tl.stop()
except Exception as e:
self.logger.exception(e)
pass
fw = self.filewatchers.get(client_id)
if fw:
fw.upload()
res = {'state': True,
'message': 'sync thread stopped',
'exception': False}
elif cmd == 'MAKE_DATA':
instr = self.gcpinstruments.setdefault(
client_id, GCPInstrument.from_config(self.config,
msg.get('outputdir'),
credential_fn=self.cred_json,
logger=self.logger))
instr.generate_data()
res = {'state': True,
'message': 'copy a datafile',
'exception': False}
elif cmd == 'DESTROY':
self.gcpinstruments.pop(client_id, None)
tl = self.timeloops.pop(client_id, None)
if tl:
tl.stop()
self.filewatchers.pop(client_id, None)
self.dbsessionloggers.pop(client_id, None)
res = {'state': True,
'message': 'Hub released resources',
'exception': False}
elif cmd == 'HELLO':
res = {'state': True,
'message': 'world',
'exception': False}
elif cmd in dsl.action_map:
res = dsl.handle(msg)
else:
self.logger.error(f'Undefined cmd: {cmd} received from client {client_id}!')
res = {'state': False,
'message': f'{cmd} is not defined',
'exception': True}
self.socket.send_json(res)
[docs] def stop(self):
if not self.zmqcxt.closed:
self.zmqcxt.destroy()
self.destroy()
[docs] def copy_text_to_clipboard(self):
text = self.console.scrolled_text.get('1.0', 'end')
self.clipboard_clear()
self.clipboard_append(text)
self.update()
[docs]def validate_config(config):
# `api_url`
api_url = config.get("NEXUSLIMSHUB_DBAPI_URL")
res = requests.get(api_url)
if res.status_code != 200 or res.text != "API for nexuslims-db":
raise ValueError("api_url `%s` is not responding" % api_url)
# `port`
port = config.get('NEXUSLIMSHUB_PORT')
if not isinstance(port, int) or port <= 3000:
raise ValueError('`NEXUSLIMSHUB_PORT` must be set as integer > 3000')
return True
[docs]def main():
import json
import os
import pathlib
from .utils import Config, check_singleton, show_error_msg_box
# check singleton
try:
check_singleton()
except OSError:
msg = ("Only one instance of the NexusLIMS Session Logger can be run at one time. "
"Please close the existing window if you would like to start a new session "
"and run the application again.")
show_error_msg_box(msg)
sys.exit(0)
# options
verbosity = logging.DEBUG
if len(sys.argv) > 1:
v = sys.argv[1][1:]
if v == 's':
verbosity = logging.CRITICAL
elif v == 'v':
verbosity = logging.INFO
elif v == 'vv':
verbosity = logging.DEBUG
elif v == 'h':
print(help())
sys.exit(1)
else:
print("wrong option provided!")
print(help())
sys.exit(0)
logger = get_logger("APP", verbose=verbosity)
# config, credential, cache
config_fn = os.path.join(pathlib.Path.home(), "nexuslims", "gui", "hubconfig.json")
config = Config()
try:
config.update(json.load(open(config_fn)))
except Exception:
logger.warning("file `%s` cannot be found, use ENV variables instead.")
try:
validate_config(config)
except Exception as e:
show_error_msg_box(str(e))
sys.exit(0)
# credential
cred_json = os.path.join(pathlib.Path.home(), "nexuslims", "gui", "creds.json")
if not os.path.exists(cred_json):
msg = "Credential file `%s` cannot be found!" % cred_json
show_error_msg_box(msg)
sys.exit(0)
# cache
cache_json = os.path.join(pathlib.Path.home(), "nexuslims", "gui", "cache.json")
if not os.path.exists(cache_json):
with open(cache_json, 'w') as f:
f.write(json.dumps({}))
# app
app = App(config, cred_json, cache_json, verbose=verbosity)
app.mainloop()
if __name__ == '__main__':
main()