# Source code for nexuslims_logger.filewatcher

"""FileWatcher watches a directory and periodically syncs it with the cloud.

It uploads to a GCP cloud bucket, at a specified interval, any file whose
checksum has changed with respect to the cache (if any), restricted to matching
file types (if specified) and to files modified after a certain time (if
specified).
"""
# Public names of this module: only FileWatcher is exported by a star-import.
__all__ = ["FileWatcher"]

import base64
import hashlib
import json
import logging
import os
from datetime import datetime, timezone

from google.cloud import storage


def calc_file_md5(filename, chunk_size=4096):
    """Get md5 checksum (base64 encoded) of a local file.

    The file is read in fixed-size chunks so that it never has to be held
    in memory in full.

    Parameters
    ----------
    filename : str or path-like
        Path of the file to hash. It is opened in binary mode.
    chunk_size : int, optional
        Number of bytes read per iteration. It only affects I/O granularity,
        never the resulting checksum. Must be positive.

    Returns
    -------
    str
        The MD5 digest of the file content, base64 encoded (not hex).

    Raises
    ------
    ValueError
        If ``chunk_size`` is not a positive number.
    OSError
        If the file cannot be opened or read.
    """
    # A zero-sized read returns b"" immediately and would silently produce the
    # checksum of an empty file, so reject it up front.
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    digest = hashlib.md5()
    with open(filename, 'rb') as f:
        # iter() with a sentinel stops at the first empty read, i.e. at EOF.
        for chunk in iter(lambda: f.read(chunk_size), b""):
            digest.update(chunk)
    return base64.b64encode(digest.digest()).decode()


class FileWatcher:
    """Find changed files under a local directory and upload them to a bucket.

    A JSON cache file maps absolute file names to the base64 MD5 checksum of
    the content that was last uploaded. A file is uploaded again only when
    its current checksum differs from the cached one.

    Parameters
    ----------
    watch_dir : str
        Local directory that is walked recursively.
    bucket_name : str
        Name of the bucket, resolved with ``storage.Client.get_bucket``.
    bucket_dir : str
        Prefix prepended (followed by ``/``) to each file's path relative to
        ``watch_dir`` to form the object name.
    credentials
        Passed through to ``storage.Client``.
    cache_fn : str
        Path of the JSON cache file. It is read on construction (so it must
        already exist and contain valid JSON) and rewritten by `upload`.
    project : str, optional
        Passed through to ``storage.Client``.
    interval : int, optional
        Sync interval in seconds. It is only stored and logged here; nothing
        in this class schedules work.
    file_types : container of str, optional
        Allowed file extensions, compared against the result of
        ``os.path.splitext`` (which includes the leading dot). A falsy value
        disables the filter.
    mtime_since : float, optional
        Timestamp (as accepted by ``datetime.fromtimestamp``); files whose
        modification time is earlier are skipped. A falsy value disables the
        filter.
    instr_info : dict, optional
        Instrument information; its ``"schema_name"`` entry is attached to
        uploaded objects as metadata. Defaults to an empty dict.
    logger : optional
        Logger to use; defaults to ``logging.getLogger("FW")``.
    """

    def __init__(self, watch_dir, bucket_name, bucket_dir, credentials,
                 cache_fn, project=None, interval=600, file_types=None,
                 mtime_since=None, instr_info=None, logger=None):
        self.watch_dir = watch_dir
        self._bucket_dir = bucket_dir
        self.client = storage.Client(project=project, credentials=credentials)
        self.bucket = self.client.get_bucket(bucket_name)
        self.cache_fn = cache_fn
        # Use a context manager so the cache file handle is closed right away.
        with open(cache_fn) as cache_file:
            self.cache = json.load(cache_file)
        self.file_types = file_types
        self._mtime_since = mtime_since
        self._interval = interval
        self._instr_info = instr_info or {}
        self.logger = logger or logging.getLogger("FW")
        self.logger.info("FileWatcher initialized.")

        msg = "watching directory `%s` every %d seconds" % (watch_dir, interval)
        if mtime_since:
            msg += " for files modified after %s" % datetime.fromtimestamp(mtime_since).isoformat()
        self.logger.debug(msg)

    @classmethod
    def from_config(cls, config, credentials, cache_fn, logger=None):
        """Build a FileWatcher from the ``NEXUSLIMSGUI_*`` entries of `config`.

        The bucket directory is set to the empty string; ``mtime_since`` and
        ``instr_info`` keep their defaults and can be set later through the
        corresponding properties.

        Parameters
        ----------
        config : mapping
            Must provide ``NEXUSLIMSGUI_FILESTORE_PATH``,
            ``NEXUSLIMSGUI_DATA_BUCKET``, ``NEXUSLIMSGUI_GCP_PROJECT``,
            ``NEXUSLIMSGUI_SYNC_INTERVAL_SECONDS`` and
            ``NEXUSLIMSGUI_FILETYPES_SYNC``.
        credentials
            Passed through to the constructor.
        cache_fn : str
            Path of the JSON cache file.
        logger : optional
            Logger to use.

        Returns
        -------
        FileWatcher
        """
        return cls(config["NEXUSLIMSGUI_FILESTORE_PATH"],
                   config["NEXUSLIMSGUI_DATA_BUCKET"],
                   "",
                   credentials,
                   cache_fn,
                   project=config["NEXUSLIMSGUI_GCP_PROJECT"],
                   interval=config["NEXUSLIMSGUI_SYNC_INTERVAL_SECONDS"],
                   file_types=config["NEXUSLIMSGUI_FILETYPES_SYNC"],
                   logger=logger)

    @property
    def mtime_since(self):
        """Timestamp before which files are ignored (falsy disables it)."""
        return self._mtime_since

    @mtime_since.setter
    def mtime_since(self, t):
        self._mtime_since = t
        msg = "only watch files modified after %s" % datetime.fromtimestamp(t).isoformat()
        self.logger.debug(msg)

    @property
    def interval(self):
        """Sync interval in seconds (stored for the caller, not used here)."""
        return self._interval

    @interval.setter
    def interval(self, t):
        self._interval = t
        self.logger.debug("set file watch interval as %d" % t)

    @property
    def bucket_dir(self):
        """Prefix prepended to the relative path to form the object name."""
        return self._bucket_dir

    @bucket_dir.setter
    def bucket_dir(self, d):
        self._bucket_dir = d
        self.logger.debug("set GCP bucket: %s" % d)

    @property
    def instr_info(self):
        """Instrument information; ``"schema_name"`` is used as metadata."""
        return self._instr_info

    @instr_info.setter
    def instr_info(self, d):
        self._instr_info = d
        self.logger.debug("set instrument info")

    def get_files_to_upload(self):
        """Find files to upload recursively.

        A file is considered for uploading when all of the following hold:

        - its extension is in ``file_types`` (skipped when ``file_types`` is
          falsy),
        - its modification timestamp is not earlier than ``mtime_since``
          (skipped when ``mtime_since`` is falsy),
        - its content checksum is missing from the cache or differs from the
          cached one.

        Returns
        -------
        List[Tuple[str, str]]
            List of tuples consisting of the file name (``watch_dir`` joined
            with the walked path) and its base64 MD5 checksum.
        """
        res = []
        for dirpath, _dirnames, filenames in os.walk(self.watch_dir):
            for name in filenames:
                _, ext = os.path.splitext(name)
                if self.file_types and ext not in self.file_types:
                    continue

                absfn = os.path.join(dirpath, name)
                if self.mtime_since and os.path.getmtime(absfn) < self.mtime_since:
                    continue

                # Hash only after the cheap filters above have passed.
                md5_checksum = calc_file_md5(absfn)
                if absfn in self.cache and self.cache[absfn] == md5_checksum:
                    continue

                res.append((absfn, md5_checksum))

        self.logger.info("%d files found to upload." % len(res))
        if res:
            self.logger.debug("filenames: %s" % str([f for f, _ in res]))
        return res

    def upload(self):
        """Upload changed files, set their metadata and update the cache.

        Each file returned by `get_files_to_upload` is uploaded to
        ``"<bucket_dir>/<path relative to watch_dir>"`` with its modification
        time (UTC, ISO format) and the instrument ``schema_name`` as metadata.
        The cache file is rewritten afterwards, even if an upload raised, so
        files that were already uploaded are not sent again on the next run.
        Any exception from an upload is propagated to the caller.
        """
        files = self.get_files_to_upload()
        try:
            for path, md5 in files:
                relpath = os.path.relpath(path, self.watch_dir)
                # NOTE(review): with an empty bucket_dir (what from_config
                # passes) the object name starts with "/", and relpath uses
                # the OS path separator. Left unchanged so that existing
                # object names stay the same.
                bucket_path = "%s/%s" % (self.bucket_dir, relpath)
                ts = os.path.getmtime(path)
                mtime = datetime.fromtimestamp(ts, tz=timezone.utc).isoformat()

                blob = self.bucket.blob(bucket_path)
                blob.metadata = {
                    "mtime": mtime,
                    "instr_name": self.instr_info.get("schema_name")
                }
                blob.upload_from_filename(path)
                # Record the checksum only once the upload has succeeded.
                self.cache[path] = md5
        finally:
            # Persist whatever was uploaded so far, also on failure.
            with open(self.cache_fn, 'w') as cache_file:
                cache_file.write(json.dumps(self.cache, indent=4))

        if files:
            self.logger.info("%d files uploaded." % len(files))