# -*- mode: python; coding: utf-8 -*-
"""Registry database for domains, methods, inspectors and skins.

The ``register_*`` functions insert (or update) definition rows, the
``get_*`` functions query them back, and ``connect()`` opens the SQLite
database and creates any missing tables.
"""

import os
import sys
import logging
logging.getLogger().setLevel(logging.DEBUG)
logger = logging.getLogger('db')

from sqlobject import *
import zope.interface as interface

import exc
import config
from model import *
from skin_base import Skin
import inspector_base as ibase


class AlreadyRegistered(exc.TPException):
    """Error type for duplicate registrations (not raised in this module)."""
    pass


import util

TOPIC_SETUP = 1
dbTopic = util.SyncObservable(topics=[TOPIC_SETUP])


def register_domain(domain_cls, methods, module=None):
    """Register (or update) a domain and the methods it supports.

    domain_cls -- domain class; its ``name`` attribute is used as row id.
    methods    -- iterable of method ids to attach to the domain.
    module     -- value stored in the ``module`` column on first creation.
    """
    try:
        dbTopic.notify(('domain', domain_cls.name))
        domain = DomainDef(id=domain_cls.name, module=module, cls=domain_cls)
    except dberrors.DuplicateEntryError:
        # The row already exists: refresh the stored class instead of failing.
        logging.warning("Updating domain '%s'" % domain_cls.name)
        domain = DomainDef.get(domain_cls.name)
        domain.set(cls=domain_cls)

    for method_id in methods:
        try:
            method_def = MethodDef(id=method_id)
            domain.addMethodDef(method_def)
        except dberrors.DuplicateEntryError:
            # Report the *method* that is duplicated (the original message
            # mistakenly printed the domain name here).
            logging.info("Method '%s' already registered" % method_id)


def register_inspector(domain_name, inspector_cls, module=None):
    """Register (or update) an inspector for an already registered domain.

    domain_name   -- id of the DomainDef the inspector belongs to.
    inspector_cls -- inspector class; ``name`` is the row id and ``flags``
                     is stored alongside it.
    module        -- value stored in the ``module`` column on first creation.

    If the domain is not registered an error is logged and nothing is done.
    """
    try:
        d = DomainDef.get(domain_name)
    except (IndexError, SQLObjectNotFound):
        # A missing row is reported as SQLObjectNotFound (as handled by every
        # other lookup in this module); IndexError is kept for compatibility.
        logger.error("Domain '%s' not registered" % domain_name)
        return

    try:
        dbTopic.notify(('inspector', inspector_cls.name))
        idef = InspectorDef(id=inspector_cls.name,
                            module=module,
                            flags=inspector_cls.flags,
                            domain=d,
                            cls=inspector_cls)
    except dberrors.DuplicateEntryError:
        # The row already exists: refresh it instead of failing.
        logging.warning("Updating inspector '%s'" % inspector_cls.name)
        idef = InspectorDef.get(inspector_cls.name)
        idef.set(flags=inspector_cls.flags,
                 domain=d,
                 cls=inspector_cls)

    # Attach only the interfaces that are not linked to the inspector yet.
    new_ifaces = [x.getName() for x in interface.implementedBy(inspector_cls)]
    current_ifaces = [x.id for x in idef.ifaces]
    for iface in set(new_ifaces) - set(current_ifaces):
        try:
            i_iface = InspectorIface.get(iface)
        except SQLObjectNotFound:
            i_iface = InspectorIface(id=iface)
        idef.addInspectorIface(i_iface)

    # 'A' when the AUTO flag is set, 'M' otherwise.
    kind = 'A' if inspector_cls.flags & ibase.Inspector.AUTO else 'M'
    logger.info("New inspector[%s] '%s'" % (kind, inspector_cls.name))


def register_skin(skin_cls, module=None):
    """Register (or update) a skin and what it is able to render.

    skin_cls -- skin class; ``name`` is the row id, ``can_render_names`` is
                an iterable of inspector names and ``can_render_ifaces`` an
                iterable of interfaces (objects providing ``getName()``).
    module   -- value stored in the ``module`` column.
    """
    try:
        dbTopic.notify(('skin', skin_cls.name))
        s = SkinDef(id=skin_cls.name, module=module, cls=skin_cls)
    except dberrors.DuplicateEntryError:
        # The row already exists: refresh it instead of failing.
        logging.warning("Updating skin '%s'" % skin_cls.name)
        s = SkinDef.get(skin_cls.name)
        s.set(module=module, cls=skin_cls)

    # Link inspector names that the skin row does not reference yet.
    current_names = [x.id for x in s.can_render_names]
    for name in skin_cls.can_render_names:
        if name in current_names:
            continue
        try:
            i_name = InspectorName.get(name)
        except SQLObjectNotFound:
            i_name = InspectorName(id=name)
        s.addInspectorName(i_name)

    # Same for the interfaces the skin can render.
    current_ifaces = [x.id for x in s.can_render_ifaces]
    for iface in skin_cls.can_render_ifaces:
        iface_name = iface.getName()
        if iface_name in current_ifaces:
            continue
        try:
            i_iface = InspectorIface.get(iface_name)
        except SQLObjectNotFound:
            i_iface = InspectorIface(id=iface_name)
        s.addInspectorIface(i_iface)

    logger.info("New skin ('%s') for %s %s" % (skin_cls.name,
                                               skin_cls.can_render_names,
                                               skin_cls.can_render_ifaces))


# Queries

def get_all_domains():
    """Return the ``cls`` value of every registered domain."""
    return [x.cls for x in DomainDef.select()]


def get_domain(method):
    """Return an instance of the first domain that supports ``method``.

    Raises exc.NoSuchMethod when the method is not registered.
    """
    # FIXME: there could be several domains supporting this method.
    try:
        return MethodDef.get(method).domains[0].cls()
    except SQLObjectNotFound:
        raise exc.NoSuchMethod("The method '%s' does NOT exist" % method)


def get_inspectors_by_domain(domain):
    """Return the inspector classes registered for ``domain`` (by its name).

    Returns an empty list, after logging an error, if an ImportError occurs
    while reading them.
    """
    try:
        return [x.cls for x in DomainDef.get(domain.name).inspectors]
    except ImportError:
        # NOTE(review): presumably raised while loading a stored class whose
        # module is no longer importable -- confirm against the model.
        logger.error("There is a problem with factory cache. Please, clean the cache")
        return []


def get_inspector_by_name(name):
    """Return the inspector class registered as ``name``.

    Raises exc.InspectorNotFound when there is no such inspector.
    """
    try:
        return InspectorDef.get(name).cls
    except SQLObjectNotFound:
        raise exc.InspectorNotFound("Inspector %s not available" % name)


def get_skins_by_name(name):
    """Return all skins (classes) for the given inspector name."""
    try:
        return [x.cls for x in InspectorName.get(name).skins]
    except SQLObjectNotFound:
        return []


def get_skins_by_iface(iface):
    """Return all skins (classes) for the given inspector iface."""
    try:
        return [x.cls for x in InspectorIface.get(iface).skins]
    except SQLObjectNotFound:
        return []


def available_skins(inspector_cls):
    """Return ``(auto, manual, mini)`` lists of skins for ``inspector_cls``.

    Skins are collected by inspector name first and then by every interface
    the inspector implements. Each skin goes to ``auto`` or ``manual``
    depending on ``Skin.AUTO``; skins with ``Skin.MINI`` are also listed in
    ``mini``.
    """
    skin_clss = get_skins_by_name(inspector_cls.name)
    for iface in interface.implementedBy(inspector_cls):
        skin_clss += get_skins_by_iface(iface.getName())

    auto = []
    manual = []
    mini = []
    for skin in skin_clss:
        if skin.flags & Skin.AUTO:
            auto.append(skin)
        else:
            manual.append(skin)

        # NOTE(review): MINI is treated as independent of AUTO/manual;
        # confirm this matches the intended nesting.
        if skin.flags & Skin.MINI:
            mini.append(skin)

    return auto, manual, mini


## Introspection queries (do NOT use for factory purposes)

def get_DomainDefs():
    """Return every DomainDef row as a list."""
    return list(DomainDef.select())


def get_MethodDefs():
    """Return every MethodDef row as a list."""
    return list(MethodDef.select())


def get_InspectorDefs():
    """Return every InspectorDef row as a list."""
    return list(InspectorDef.select())


def get_SkinDefs():
    """Return every SkinDef row as a list."""
    return list(SkinDef.select())


def get_PluginDefs():
    """Return every PluginDef row as a list."""
    return list(PluginDef.select())


def connect():
    """Open the SQLite database at config.DBFILE and create missing tables."""
    sqlhub.processConnection = connectionForURI('sqlite://%s' % config.DBFILE)

    for table in [SkinDef, InspectorName, InspectorIface,
                  InspectorDef, DomainDef, MethodDef, PluginDef]:
        table.createTable(ifNotExists=True)