# Source listing: GettextMessageCatalog.py (plone3 package)

"""A simple implementation of a Message Catalog.

$Id: GettextMessageCatalog.py 61752 2008-03-31 16:07:04Z hannosch $
"""

from gettext import GNUTranslations
import os, sys, types, traceback
import glob
from stat import ST_MTIME

from Acquisition import aq_parent, Implicit
from DateTime import DateTime
from AccessControl import ClassSecurityInfo
from AccessControl.Permissions import view_management_screens
from Globals import InitializeClass
from Globals import INSTANCE_HOME
from Globals import package_home
import Globals
from OFS.Traversable import Traversable
from Persistence import Persistent
from App.Management import Tabs

import logging
from utils import log, make_relative_location, Registry
from msgfmt import Msgfmt

from Products.PageTemplates.PageTemplateFile import PageTemplateFile

def ptFile(id, *filename):
    """Create a PageTemplateFile named `id` from path components.

    id        -- value passed to PageTemplateFile as __name__.
    *filename -- path components joined with os.path.join().  If the first
                 component is a dict (callers in this module pass globals())
                 it is first replaced by package_home() of that dict.

    A '.pt' extension is appended when the joined path has no extension.
    """
    parts = list(filename)
    head = parts[0]
    if type(head) is types.DictType:
        parts[0] = package_home(head)
    path = os.path.join(*parts)
    extension = os.path.splitext(path)[1]
    if not extension:
        path = path + '.pt'
    return PageTemplateFile(path, '', __name__=id)

# Permission name as a plain string.  It is not referenced elsewhere in this
# listing; the classes below use the imported view_management_screens constant.
permission = 'View management screens'

# Module-level registries, both keyed by catalog id by the classes below:
# translationRegistry holds the GNUTranslations object built for a catalog,
# rtlRegistry holds that catalog's right-to-left flag.
translationRegistry = Registry()
registerTranslation = translationRegistry.register
rtlRegistry = Registry()
registerRTL = rtlRegistry.register

def getMessage(catalog, id, orig_text=None):
    """Look up the translation of a message id in a catalog.

    catalog   -- object providing gettext(id) and a _charset attribute
                 (this module passes the objects stored in
                 translationRegistry).
    id        -- message id to translate.
    orig_text -- accepted for interface compatibility; not used.

    Returns the translation.  Byte strings are decoded to unicode using
    catalog._charset; any other value is returned unchanged.
    Raises KeyError(id) when the catalog has no translation for `id`.
    """
    msg = catalog.gettext(id)
    # The lookup is expected to hand back the very object it was given when
    # there is no translation, so identity (not equality) is the "missing"
    # signal: a translation that merely equals the id is still a valid hit.
    if msg is id:
        raise KeyError(id)
    if isinstance(msg, str):
        msg = unicode(msg, catalog._charset)
    return msg


class BrokenMessageCatalog(Persistent, Implicit, Traversable, Tabs):
    """Placeholder for a po file that could not be loaded.

    Keeps the location of the file and the formatted traceback of the load
    failure, and can try to load the file again through reload().
    """
    # NOTE(review): methods that had no docstring are documented with
    # comments only.  Zope 2's publisher is understood to refuse objects
    # without a docstring, so adding one could expose a method over the
    # web -- confirm before converting these comments into docstrings.

    meta_type = title = 'Broken Gettext Message Catalog'
    icon = 'p_/broken'

    isPrincipiaFolderish = 0
    isTopLevelPrincipiaApplicationObject = 0

    security = ClassSecurityInfo()
    security.declareObjectProtected(view_management_screens)

    # id     -- catalog id.
    # pofile -- path of the po file; stored as the (prefix, path) pair
    #           returned by make_relative_location().
    # error  -- (type, value, traceback) triple as returned by
    #           sys.exc_info(); stored as a list of formatted lines.
    def __init__(self, id, pofile, error):
        # _pofile must be set first: _getModTime() resolves the path from it.
        self._pofile = make_relative_location(pofile)
        self.id = id
        self._mod_time = self._getModTime()
        exc_type, exc_value, exc_tb = error[0], error[1], error[2]
        self.error = traceback.format_exception(exc_type, exc_value, exc_tb)

    # modified time helper
    def _getModTime(self):
        """Return the po file's modification time, or 0 if it cannot be
        stat'ed.
        """
        try:
            return os.stat(self._getPoFile())[ST_MTIME]
        except (IOError, OSError):
            return 0

    def getIdentifier(self):
        """Return the catalog id.
        """
        return self.id

    def getId(self):
        """Return the catalog id.
        """
        return self.id

    security.declareProtected(view_management_screens, 'getError')
    def getError(self):
        """Return the formatted traceback lines of the load failure.
        """
        return self.error

    def _getPoFile(self):
        """get absolute path of the po file as string

        self._pofile is a (prefix, path) pair; a known prefix names the base
        directory the path is relative to, any other prefix means the path
        is only normalized.
        """
        # NOTE(review): ZOPE_HOME and CLIENT_HOME are not imported in this
        # module; presumably Zope installs them as builtins at startup --
        # confirm.
        prefix, pofile = self._pofile
        if prefix == 'ZOPE_HOME':
            return os.path.join(ZOPE_HOME, pofile)
        elif prefix == 'INSTANCE_HOME':
            return os.path.join(INSTANCE_HOME, pofile)
        elif prefix == 'CLIENT_HOME':
            return os.path.join(CLIENT_HOME, pofile)
        else:
            return os.path.normpath(pofile)

    security.declareProtected(view_management_screens, 'Title')
    # Return the class-level title.
    def Title(self):
        return self.title

    def get_size(self):
        """Get the size of the underlying file.

        Raises os.error if the file does not exist or is inaccessible.
        """
        return os.path.getsize(self._getPoFile())

    def reload(self, REQUEST=None):
        """Forcibly re-read the file.

        Removes this object from its acquisition parent and adds a fresh
        GettextMessageCatalog for the same file under the same id.  If
        building that catalog fails with anything other than OSError, a new
        BrokenMessageCatalog carrying the new error is added instead.  An
        OSError is re-raised; at that point this object has already been
        removed from the parent.

        REQUEST -- optional request; if it has a RESPONSE and its form has
                   no 'noredir' key, the response is redirected to the URL
                   of the object now stored under this id.
        """
        # get pts
        pts = aq_parent(self)
        name = self.getId()
        pofile = self._getPoFile()
        pts._delObject(name)
        try:
            pts.addCatalog(GettextMessageCatalog(name, pofile))
        except OSError:
            # XXX TODO
            # remove a catalog if it cannot be loaded from the old location
            raise
        except:
            # Deliberately broad: the id was just removed from the parent,
            # so any other failure must leave a placeholder behind.
            exc = sys.exc_info()
            log('Message Catalog has errors', logging.WARNING, name, exc)
            pts.addCatalog(BrokenMessageCatalog(name, pofile, exc))
        # Redirect to whatever object now lives under this id.
        replacement = pts._getOb(name)
        if hasattr(REQUEST, 'RESPONSE'):
            if not REQUEST.form.has_key('noredir'):
                REQUEST.RESPONSE.redirect(replacement.absolute_url())

    security.declareProtected(view_management_screens, 'file_exists')
    # Return True if the po file can be opened for reading, else False.
    def file_exists(self):
        try:
            handle = open(self._getPoFile(), 'rb')
        except Exception:
            return False
        # Only the ability to open matters; release the handle right away
        # instead of leaving it to the garbage collector.
        handle.close()
        return True

    # OFS hooks, intentionally no-ops.
    def manage_afterAdd(self, item, container): pass
    def manage_beforeDelete(self, item, container): pass
    def manage_afterClone(self, item): pass

    manage_options = (
        {'label':'Info', 'action':''},
        )

    index_html = ptFile('index_html', globals(), 'www', 'catalog_broken')

InitializeClass(BrokenMessageCatalog)

class GettextMessageCatalog(Persistent, Implicit, Traversable, Tabs):
    """Message catalog that wraps a .po file in the filesystem.

    The translation object built from the file is kept in the module-level
    translationRegistry under the catalog id and cached on the instance as
    _v_tro.  Language, domain, title and related metadata are read from the
    catalog headers and stored as instance attributes.
    """
    # NOTE(review): methods that had no docstring are documented with
    # comments only.  Zope 2's publisher is understood to refuse objects
    # without a docstring, so adding one could expose a method over the
    # web -- confirm before converting these comments into docstrings.

    meta_type = title = 'Gettext Message Catalog'
    icon = 'misc_/PlacelessTranslationService/GettextMessageCatalog.png'

    isPrincipiaFolderish = 0
    isTopLevelPrincipiaApplicationObject = 0

    security = ClassSecurityInfo()
    security.declareObjectProtected(view_management_screens)

    def __init__(self, id, pofile, language=None, domain=None):
        """Initialize the message catalog.

        id       -- catalog id; also the key used in the module registries.
        pofile   -- path of the po file; stored as the (prefix, path) pair
                    returned by make_relative_location().
        language -- language code; if empty it is taken from the catalog
                    headers.
        domain   -- message domain; if empty it is taken from the catalog
                    headers.

        Errors raised while building the translation object propagate.
        """
        # _pofile must be set first: _getModTime() resolves the path from it.
        self._pofile = make_relative_location(pofile)
        self.id = id
        self._mod_time = self._getModTime()
        self._language = language
        self._domain = domain
        self._prepareTranslations(0)

    def _prepareTranslations(self, catch=1):
        """Try to generate the translation object;
           if that fails remove us from the registry.

        catch -- when true (the default) the error is swallowed after the
                 registry entry has been removed; when false it is re-raised.
        """
        try:
            self._doPrepareTranslations()
        except:
            # Deliberately broad: whatever went wrong, a catalog that failed
            # to build must not stay registered.
            if self.getId() in translationRegistry.keys():
                del translationRegistry[self.getId()]
            if not catch:
                raise

    def _doPrepareTranslations(self):
        """Generate the translation object from a po file.

        Raises ValueError if neither the constructor arguments nor the
        catalog headers provide both a language and a domain, or if the
        X-Is-RTL header has an unsupported value.
        """
        self._updateFromFS()
        tro = None
        if getattr(self, '_v_tro', None) is None:
            self._v_tro = tro = translationRegistry.get(self.getId(), None)
        # NOTE(review): when _v_tro is already set, tro is still None at this
        # point, so the catalog is rebuilt on every call -- confirm whether
        # that is intended before turning this into a real cache check.
        if tro is not None:
            return

        moFile = self._getMoFile()
        tro = GNUTranslations(moFile)
        headers = tro._info
        if not self._language:
            self._language = (headers.get('language-code', None) # new way
                              or headers.get('language', None)) # old way
        if not self._domain:
            self._domain = headers.get('domain', None)
        if self._language is None or self._domain is None:
            raise ValueError('potfile %s has no metadata, PTS needs a language and a message domain!' % os.path.join(*self._pofile))
        self._language = self._language.lower().replace('_', '-')
        self._other_languages = headers.get('x-is-fallback-for', '').split()
        self.preferred_encodings = headers.get('preferred-encodings', '').split()
        self.name = unicode(headers.get('language-name', ''), tro._charset)
        self.default_zope_data_encoding = tro._charset

        # Registered before the RTL header is validated; if validation
        # fails, _prepareTranslations() removes the entry again.
        translationRegistry[self.getId()] = self._v_tro = tro

        # right to left support
        is_rtl = headers.get('x-is-rtl', 'no').strip().lower()
        if is_rtl in ('yes', 'y', 'true', '1'):
            self._is_rtl = True
        elif is_rtl in ('no', 'n', 'false', '0'):
            self._is_rtl = False
        else:
            # The message needs a placeholder: formatting a string that has
            # none raises TypeError instead of the intended ValueError.
            raise ValueError('Unsupported value for X-Is-RTL: %s' % is_rtl)
        rtlRegistry[self.getId()] = self.isRTL()

        if self.name:
            self.title = '%s language (%s) for %s' % (self._language, self.name, self._domain)
        else:
            self.title = '%s language for %s' % (self._language, self._domain)

    # Return the full manage_options tuple, unfiltered.
    def filtered_manage_options(self, REQUEST=None):
        return self.manage_options

    def reload(self, REQUEST=None):
        """Forcibly re-read the file.

        Drops the registry entry and the cached _v_tro, then rebuilds the
        translation object.  If rebuilding fails, this object is removed
        from its acquisition parent and a BrokenMessageCatalog holding the
        error is added under the same id.

        REQUEST -- optional request; if it has a RESPONSE and its form has
                   no 'noredir' key, the response is redirected to the URL
                   of the object now stored under this id.
        """
        if self.getId() in translationRegistry.keys():
            del translationRegistry[self.getId()]
        if hasattr(self, '_v_tro'):
            del self._v_tro
        name = self.getId()
        pts = aq_parent(self)
        pofile = self._getPoFile()
        try:
            self._prepareTranslations(0)
            log('reloading %s: %s' % (name, self.title), severity=logging.DEBUG)
        except:
            # Deliberately broad: any failure turns this catalog into a
            # broken-catalog placeholder.
            pts._delObject(name)
            exc = sys.exc_info()
            log('Message Catalog has errors', logging.WARNING, name, exc)
            pts.addCatalog(BrokenMessageCatalog(name, pofile, exc))
        # Redirect to whatever object now lives under this id.
        current = pts._getOb(name)
        if hasattr(REQUEST, 'RESPONSE'):
            if not REQUEST.form.has_key('noredir'):
                REQUEST.RESPONSE.redirect(current.absolute_url())

    security.declarePublic('queryMessage')
    def queryMessage(self, id, default=None):
        """Queries the catalog for a message

        If the message wasn't found the default value or the id is returned.
        Any KeyError raised during the lookup is handled the same way.
        """
        try:
            return getMessage(translationRegistry[self.getId()], id, default)
        except KeyError:
            if default is None:
                default = id
            return default

    def getLanguage(self):
        """Return the language code of this catalog.
        """
        return self._language

    def getLanguageName(self):
        """Return the language name, or the language code if no name is set.
        """
        return self.name or self._language

    def getOtherLanguages(self):
        """Return the languages listed in the X-Is-Fallback-For header.
        """
        return self._other_languages

    def getDomain(self):
        """Return the message domain of this catalog.
        """
        return self._domain

    def getIdentifier(self):
        """Return the catalog id.
        """
        return self.id

    def getId(self):
        """Return the catalog id.
        """
        return self.id

    def getInfo(self, name):
        """Return the catalog header `name`, or None if it is not present.
        """
        self._prepareTranslations()
        return self._v_tro._info.get(name, None)

    def isRTL(self):
        """Return the right-to-left flag read from the X-Is-RTL header.
        """
        return self._is_rtl

    security.declareProtected(view_management_screens, 'Title')
    # Return the title; the class default is replaced by a title built from
    # language, language name and domain once the catalog has been loaded.
    def Title(self):
        return self.title

    def _getMoFile(self):
        """get compiled version of the po file as file object
        """
        # Hard-wired switch; the else branch compiles without the cache.
        useCache = True
        if useCache:
            # cachedPoFile() returns (hit, file); only the file is needed.
            hit, mof = cachedPoFile(self)
            return mof
        else:
            mo = Msgfmt(self._readFile(), self.getId())
            return mo.getAsFile()

    def _getPoFile(self):
        """get absolute path of the po file as string

        self._pofile is a (prefix, path) pair; a known prefix names the base
        directory the path is relative to, any other prefix means the path
        is only normalized.
        """
        # NOTE(review): ZOPE_HOME and CLIENT_HOME are not imported in this
        # module; presumably Zope installs them as builtins at startup --
        # confirm.
        prefix, pofile = self._pofile
        if prefix == 'ZOPE_HOME':
            return os.path.join(ZOPE_HOME, pofile)
        elif prefix == 'INSTANCE_HOME':
            return os.path.join(INSTANCE_HOME, pofile)
        elif prefix == 'CLIENT_HOME':
            return os.path.join(CLIENT_HOME, pofile)
        else:
            return os.path.normpath(pofile)

    def _readFile(self, reparse=False):
        """Read the data from the filesystem.

        Returns the lines of the po file as a list.  `reparse` is accepted
        but not used.
        """
        file = open(self._getPoFile(), 'rb')
        try:
            # XXX need more checks here
            return file.readlines()
        finally:
            file.close()

    def _updateFromFS(self):
        """Refresh our contents from the filesystem

        if the file's modification time changed and we are running in
        debug mode.
        """
        if Globals.DevelopmentMode:
            mtime = self._getModTime()
            if mtime != self._mod_time:
                # Record the new time before reloading: reload() ends up
                # calling this method again, and must then see no change.
                self._mod_time = mtime
                self.reload()

    def _getModTime(self):
        """Return the po file's modification time, or 0 if it cannot be
        stat'ed.
        """
        try:
            return os.stat(self._getPoFile())[ST_MTIME]
        except (IOError, OSError):
            return 0

    def get_size(self):
        """Get the size of the underlying file.

        Raises os.error if the file does not exist or is inaccessible.
        """
        return os.path.getsize(self._getPoFile())

    def getModTime(self):
        """Return the last_modified date of the file we represent.

        Returns a DateTime instance.
        """
        self._updateFromFS()
        return DateTime(self._mod_time)

    def getObjectFSPath(self):
        """Return the path of the file we represent"""
        return self._getPoFile()

    # Zope/OFS integration

    # OFS hooks, intentionally no-ops.
    def manage_afterAdd(self, item, container): pass
    def manage_beforeDelete(self, item, container): pass
    def manage_afterClone(self, item): pass

    manage_options = (
        {'label':'Info', 'action':''},
        {'label':'Test', 'action':'zmi_test'},
        )

    index_html = ptFile('index_html', globals(), 'www', 'catalog_info')
    zmi_test = ptFile('zmi_test', globals(), 'www', 'catalog_test')

    security.declareProtected(view_management_screens, 'file_exists')
    # Return True if the po file can be opened for reading, else False.
    def file_exists(self):
        try:
            handle = open(self._getPoFile(), 'rb')
        except Exception:
            return False
        # Only the ability to open matters; release the handle right away
        # instead of leaving it to the garbage collector.
        handle.close()
        return True

    security.declareProtected(view_management_screens, 'getEncoding')
    # Return the value after '=' in the second ';'-separated part of the
    # content-type header; 'utf-8' if the header is missing or malformed.
    def getEncoding(self):
        try:
            content_type = self.getHeader('content-type')
            enc = content_type.split(';')[1].strip()
            enc = enc.split('=')[1]
        except Exception:
            enc = 'utf-8'
        return enc

    # Return the catalog header `header`, or None if it is not present.
    def getHeader(self, header):
        self._prepareTranslations()
        info = self._v_tro._info
        return info.get(header)

    security.declareProtected(view_management_screens, 'displayInfo')
    # Return a list of {'name', 'value'} dicts: every catalog header sorted
    # by name, followed by the stored file location and the modification
    # time in ISO format.
    def displayInfo(self):
        self._prepareTranslations()
        try:
            info = self._v_tro._info
        except Exception:
            # broken catalog probably
            info = {}
        keys = sorted(info.keys())
        return [{'name': k, 'value': info[k]} for k in keys] + [
            {'name': 'full path', 'value': os.path.join(*self._pofile)},
            {'name': 'last modification', 'value': self.getModTime().ISO()}
            ]

InitializeClass(GettextMessageCatalog)


class MoFileCache(object):
    """Cache for mo files

    Compiled mo files are stored in one directory, one file per catalog id.
    If the directory cannot be created the cache is disabled (_path is
    None) and cachedPoFile() compiles on every call.
    """

    def __init__(self, path):
        """Use `path` as the cache directory, creating it if necessary.

        If creation fails the failure is logged and the cache is disabled.
        """
        if not os.path.isdir(path):
            try:
                os.makedirs(path)
            except (IOError, OSError):
                log("No permission to create directory %s" % path, logging.INFO)
                path = None
        self._path = path

    def storeMoFile(self, catalog):
        """compile and save to mo file for catalog to disk

        The file is written only if it is writable, or if it does not exist
        yet and the cache directory is writable; otherwise this is logged
        and the compiled data is still returned.

        return value: mo file as file handler, positioned at the start
        """
        f = self.getPath(catalog)
        mof = self.compilePo(catalog)
        moExists = os.path.exists(f)
        if (not moExists and os.access(self._path, os.W_OK)) \
          or (moExists and os.access(f, os.W_OK)):
            fd = open(f, 'wb')
            try:
                fd.write(mof.read()) # XXX efficient?
            finally:
                # Close the output file even if the write fails.
                fd.close()
        else:
            log("No permission to write file %s" % f, logging.INFO)
        # Rewind so the caller can read the compiled data from the start.
        mof.seek(0)
        return mof

    def retrieveMoFile(self, catalog):
        """Load a mo file for a catalog from disk

        Returns an open binary file object, or None if the cached file does
        not exist or is not readable (the latter is logged).
        """
        f = self.getPath(catalog)
        if not os.path.isfile(f):
            return None
        if os.access(f, os.R_OK):
            return open(f, 'rb')
        log("No permission to read file %s" % f, logging.INFO)
        return None

    def getPath(self, catalog):
        """Get the mo file path (cache path + file name)

        The file name is the catalog id with a trailing '.po' removed and
        '.mo' appended.  Must not be called while the cache is disabled.
        """
        id = catalog.getId()
        if id.endswith('.po'):
            id = id[:-3]
        return os.path.join(self._path, '%s.mo' % id)

    def isCacheHit(self, catalog):
        """Cache hit?

        True: mo file exists and is newer than the catalog's po file
        False: mo file exists but is not newer (older or same time)
        None: the mo file or the po file has no usable modification time
        """
        f = self.getPath(catalog)
        ca_mtime = catalog._getModTime()
        try:
            mo_mtime = os.stat(f)[ST_MTIME]
        except (IOError, OSError):
            mo_mtime = 0

        # 0 stands for "could not be stat'ed" on either side.
        if mo_mtime == 0 or ca_mtime == 0:
            return None
        return mo_mtime > ca_mtime

    def compilePo(self, catalog):
        """compile a po file to mo

        returns a file handler
        """
        mo = Msgfmt(catalog._readFile(), catalog.getId())
        return mo.getAsFile()

    def cachedPoFile(self, catalog):
        """Cache a po file (public api)

        Returns a (hit, file) pair: `hit` is the isCacheHit() result (None
        if the cache is disabled) and `file` is a file handler on a mo file.
        """
        if self._path is None:
            return None, self.compilePo(catalog)
        hit = self.isCacheHit(catalog)
        if hit:
            mof = self.retrieveMoFile(catalog)
            if mof is None:
                # Cached file vanished or is unreadable: compile in memory.
                mof = self.compilePo(catalog)
        else:
            mof = self.storeMoFile(catalog)
        return hit, mof

    def purgeCache(self):
        """Purge the cache and remove all compiled mo files

        Returns False if the cache is disabled or the cache directory is not
        writable; otherwise returns None after trying every '*.mo' file.
        Files that cannot be removed are logged and skipped.
        """
        if self._path is None:
            # Cache disabled: there is no directory to purge.
            return False
        log("Purging mo file cache", logging.INFO)
        if not os.access(self._path, os.W_OK):
            log("No write permission on folder %s" % self._path, logging.INFO)
            return False
        pattern = os.path.join(self._path, '*.mo')
        for mo in glob.glob(pattern):
            if not os.access(mo, os.W_OK):
                log("No write permission on file %s" % mo, logging.INFO)
                continue
            try:
                os.unlink(mo)
            except (IOError, OSError):
                # os.unlink() reports failures as OSError.
                log("Failed to unlink %s" % mo, logging.INFO)

# Module-wide cache instance; compiled mo files go to <CLIENT_HOME>/pts.
# NOTE(review): CLIENT_HOME is not imported in this module; presumably Zope
# installs it as a builtin at startup -- confirm.
_moCache = MoFileCache(os.path.join(CLIENT_HOME, 'pts'))
# Module-level entry points bound to that instance.
cachedPoFile = _moCache.cachedPoFile
purgeMoFileCache = _moCache.purgeCache

# Listing generated by Doxygen 1.6.0.