Source code for zot.plugs.db
# zot/plugs/fs.py
#
#
""" add a value. """
__copyright__ = "Copyright 2015, B.H.J Thate"
## IMPORTS
from zot.utils import short_date
from zot.object import Object
from zot.runtime import kernel
import logging
import time
import os
## db.add command
[docs]def db_add(event):
rest = event.rest()
try: key, value = rest.split(" ", 1)
except ValueError: return
o = Object()
if key in dir(o): event.reply("%s already defined in default Object" % key) ; return
o.prefix = "add"
o[key] = value
path = o.save()
event.ok()
kernel.register("db.add", db_add)
## db.delete command
[docs]def db_deleted(event):
if not event.args: return
nr = 1
for obj in kernel.all("deleted"): event.reply(obj.format(nr)) ; event.reply("") ; nr += 1
kernel.register("db.deleted", db_deleted)
## db.rm command
[docs]def db_rm(event):
try: key, match = event.args
except ValueError: return
nr = 0
for obj in kernel.objects():
if key not in obj: continue
if match not in obj[key]: continue
obj.deleted = True
obj.sync()
nr += 1
event.ok(nr)
kernel.register("db.rm", db_rm)
## db.undel command
[docs]def fs_undel(event):
try: key, match = event.args
except ValueError: return
nr = 0
for obj in kernel.objects(**{"full": True}):
if key not in obj: continue
if match not in obj[key]: continue
if "deleted" not in obj: continue
obj.deleted = False
obj.sync()
nr += 1
event.ok(nr)
kernel.register("db.undel", fs_undel)
## db.find command
[docs]def db_find(event):
if not event.args: return
objs = sorted(event.selected(), key=lambda x: x.get_timed())
event.display_list(objs, keys=event.args)
kernel.register("db.find", db_find)
## db.dump command
[docs]def db_dump(event):
if not event.args: return
objs = sorted(event.selected(event), key=lambda x: x.get_timed())
try: event.reply(str(objs[event.index]))
except:
for obj in objs: event.reply(str(obj))
kernel.register("db.dump", db_dump)
## db.first command
[docs]def db_first(event):
if not event.args: return
obj = kernel.first(event.args[0])
if obj: event.reply(obj.json())
kernel.register("db.first", db_first)
## db.last command
[docs]def db_last(event):
if not event.args: return
obj = kernel.last(event.args[0])
if obj: event.reply(obj.json())
kernel.register("db.last", db_last)
## db.show command
[docs]def db_show(event):
if not event.args: return
objs = sorted(kernel.selected(event), key=lambda x: x.get_timed())
event.reply_list(objs, skiplist=["saved", "ctype", "prefile"])
kernel.register("db.show", db_show)
## db.text command
[docs]def db_text(event):
if not event.args: return
rest = event.rest()
objs = sorted(kernel.full(event), key=lambda x: x.get_timed())
nr = 1
for obj in objs:
if obj.search(rest): event.reply("\n" + obj.format(nr, skiplist=["saved"])) ; nr += 1
kernel.register("db.text", db_text)