# Source code for zot.plugs.db

# zot/plugs/db.py
#
#

""" db commands: add, deleted, rm, undel, find, dump, first, last, show and text. """

__copyright__ = "Copyright 2015, B.H.J Thate"

## IMPORTS

from zot.utils import short_date
from zot.object import Object
from zot.runtime import kernel

import logging
import time
import os

## db.add command

def db_add(event):
    """ store a key/value pair in a new Object and save it.

    the text returned by event.rest() is split at the first space: the
    part before it is the key, everything after it is the value. nothing
    happens when the text cannot be split into exactly two parts. a key
    that already shows up in dir(Object()) is refused with a reply.

    """
    parts = event.rest().split(" ", 1)
    if len(parts) != 2:
        # no space in the text, so there is no key/value pair to store.
        return
    key, value = parts
    target = Object()
    if key in dir(target):
        event.reply("%s already defined in default Object" % key)
        return
    target.prefix = "add"
    target[key] = value
    # the return value of save() is not used here.
    target.save()
    event.ok()
# register db_add with the kernel under the name "db.add".
kernel.register("db.add", db_add)

## db.deleted command

def db_deleted(event):
    """ reply with every object returned by kernel.all("deleted").

    each object is formatted with a running number starting at 1 and is
    followed by an empty reply. nothing happens when event.args is empty.

    """
    # NOTE(review): event.args is only used as a guard, its content is
    # not passed on to the query.
    if not event.args:
        return
    for nr, obj in enumerate(kernel.all("deleted"), 1):
        event.reply(obj.format(nr))
        event.reply("")
# register db_deleted with the kernel under the name "db.deleted".
kernel.register("db.deleted", db_deleted)

## db.rm command

def db_rm(event):
    """ flag matching objects as deleted.

    event.args has to unpack into exactly two values: a key and a match.
    every object from kernel.objects() that contains the key and has the
    match in obj[key] gets its deleted attribute set to True and is
    synced. the number of objects changed is passed to event.ok().

    """
    try:
        field, needle = event.args
    except ValueError:
        # event.args did not unpack into exactly two values.
        return
    changed = 0
    for candidate in kernel.objects():
        if field in candidate and needle in candidate[field]:
            candidate.deleted = True
            candidate.sync()
            changed += 1
    event.ok(changed)
# register db_rm with the kernel under the name "db.rm".
kernel.register("db.rm", db_rm)

## db.undel command

def fs_undel(event):
    """ clear the deleted flag on matching objects.

    event.args has to unpack into exactly two values: a key and a match.
    the objects come from kernel.objects(full=True). an object is changed
    when it contains the key, has the match in obj[key] and contains a
    "deleted" entry; its deleted attribute is then set to False and the
    object is synced. the number of objects changed is passed to
    event.ok().

    """
    try:
        field, needle = event.args
    except ValueError:
        # event.args did not unpack into exactly two values.
        return
    restored = 0
    for candidate in kernel.objects(full=True):
        if (
            field in candidate
            and needle in candidate[field]
            and "deleted" in candidate
        ):
            candidate.deleted = False
            candidate.sync()
            restored += 1
    event.ok(restored)
# register fs_undel with the kernel under the name "db.undel".
kernel.register("db.undel", fs_undel)

## db.find command

def db_find(event):
    """ display the objects selected by the event, ordered by get_timed().

    the objects returned by event.selected() are sorted on the value of
    their get_timed() method and handed to event.display_list(), with
    event.args as the keys to show. nothing happens when event.args is
    empty.

    """
    if not event.args:
        return

    def timed(obj):
        return obj.get_timed()

    found = list(event.selected())
    found.sort(key=timed)
    event.display_list(found, keys=event.args)
# register db_find with the kernel under the name "db.find".
kernel.register("db.find", db_find)

## db.dump command

def db_dump(event):
    """ reply with the string form of the selected objects.

    the selected objects are sorted on the value of their get_timed()
    method. when event.index picks one of them, only that object is
    dumped; when the lookup fails, every object is dumped in order.
    nothing happens when event.args is empty.

    """
    if not event.args:
        return
    # NOTE(review): db_find calls event.selected() without arguments and
    # db_show calls kernel.selected(event); passing the event to its own
    # method looks inconsistent - confirm against the event API.
    objs = sorted(event.selected(event), key=lambda x: x.get_timed())
    try:
        # only the lookup is guarded, so a failing reply is not mistaken
        # for a missing index.
        chosen = objs[event.index]
    except Exception:
        # no usable index: fall back to dumping everything. Exception
        # (instead of a bare except) lets KeyboardInterrupt and
        # SystemExit propagate.
        for obj in objs:
            event.reply(str(obj))
    else:
        event.reply(str(chosen))
# register db_dump with the kernel under the name "db.dump".
kernel.register("db.dump", db_dump)

## db.first command

def db_first(event):
    """ reply with the json() of what kernel.first() returns.

    the first entry of event.args is passed to kernel.first(). a reply is
    sent only when the result is truthy. nothing happens when event.args
    is empty.

    """
    if event.args:
        found = kernel.first(event.args[0])
        if found:
            event.reply(found.json())
# register db_first with the kernel under the name "db.first".
kernel.register("db.first", db_first)

## db.last command

def db_last(event):
    """ reply with the json() of what kernel.last() returns.

    the first entry of event.args is passed to kernel.last(). a reply is
    sent only when the result is truthy. nothing happens when event.args
    is empty.

    """
    if event.args:
        found = kernel.last(event.args[0])
        if found:
            event.reply(found.json())
# register db_last with the kernel under the name "db.last".
kernel.register("db.last", db_last)

## db.show command

def db_show(event):
    """ show the objects kernel.selected() returns for this event.

    the objects are sorted on the value of their get_timed() method and
    handed to event.reply_list() with "saved", "ctype" and "prefile" in
    the skiplist. nothing happens when event.args is empty.

    """
    if not event.args:
        return

    def timed(obj):
        return obj.get_timed()

    chosen = list(kernel.selected(event))
    chosen.sort(key=timed)
    event.reply_list(chosen, skiplist=["saved", "ctype", "prefile"])
# register db_show with the kernel under the name "db.show".
kernel.register("db.show", db_show)

## db.text command

def db_text(event):
    """ reply with every object whose search() accepts the event text.

    the objects returned by kernel.full(event) are sorted on the value of
    their get_timed() method. each object for which search(event.rest())
    is truthy is formatted with a running number starting at 1 (skipping
    "saved") and sent as a reply with a leading newline. nothing happens
    when event.args is empty.

    """
    if not event.args:
        return
    wanted = event.rest()
    ordered = sorted(kernel.full(event), key=lambda item: item.get_timed())
    # a generator keeps search and reply interleaved per object.
    hits = (item for item in ordered if item.search(wanted))
    for nr, item in enumerate(hits, 1):
        event.reply("\n" + item.format(nr, skiplist=["saved"]))
# register db_text with the kernel under the name "db.text".
kernel.register("db.text", db_text)