Fork me on GitHub
Release: 0.3.2
Date: Aug 10, 2011
Flattr Flacsync

Table Of Contents

Download

Get latest source archive,
flacsync-0.3.2.tar.gz, or install with:

pip install flacsync --upgrade --user

Found a Bug?

Fill out a report on the issue tracker.

Source code for flacsync.encoder

#  Copyright 2009, Patrick C. McGinty

#  This program is free software: you can redistribute it and/or modify
#  it under the terms of the GNU General Public License as published by
#  the Free Software Foundation, either version 3 of the License, or
#  (at your option) any later version.

#  This program is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#  GNU General Public License for more details.

#  You should have received a copy of the GNU General Public License
#  along with this program.  If not, see <http://www.gnu.org/licenses/>.

"""
   flacsync.encoder
   ~~~~~~~~~~~~~~~~

   Define interface to encoders available for processing FLAC files.
"""

import os
import subprocess as sp
import tempfile
import Image

from . import util

__author__ = 'Patrick C. McGinty'
__email__ = 'flacsync@tuxcoder.com'


# Sink handed to child processes as ``stderr=``.  It must be opened for
# writing (children write to it), and ``open``/``os.devnull`` are used instead
# of the Python-2-only ``file('/dev/null')``.
NULL = open(os.devnull, 'w')
# list of album covers, in preferential order
COVERS = ['cover.jpg', 'folder.jpg', 'front.jpg', 'album.jpg']
# dimensions of cover thumbnails in pixels
THUMBSIZE = 250,250


#############################################################################
class _Encoder(object):
   """Behaviour shared by the concrete encoder classes.

   Instances keep three attributes:

   * ``src``   -- the source file path given by the caller,
   * ``dst``   -- the output path, as computed by ``util.fname``,
   * ``cover`` -- path of a cover image found next to ``src``, or ``None``.
   """

   def __init__( self, src, ext, base_dir, dest_dir ):
      super( _Encoder, self).__init__()
      self.src = src
      self.dst = util.fname(src, base_dir, dest_dir, ext)
      self.cover = self._get_cover() or None

   def skip_encode( self ):
      "Return 'True' if entire encode step can be skipped"
      encode = util.newer(self.src, self.dst)
      cover  = self.cover and util.newer(self.cover, self.dst)
      return not (encode or cover)

   def _get_cover( self ):
      """Return the path of the preferred cover image beside ``src``.

      Candidates are tried in the order listed in ``COVERS`` so that the
      first listed name wins when several are present.  Returns ``None``
      when no candidate exists or the directory cannot be listed.
      """
      # a bare file name has an empty dirname; os.walk('') yields nothing
      src_dir = os.path.dirname(self.src) or os.curdir
      try:
         root, _, files = next(os.walk(src_dir))
      except StopIteration:
         return None    # directory is missing or unreadable
      for name in COVERS:
         if name in files:
            return os.path.join(root, name)
      return None

   def _pre_encode( self ):
      """Create the directory that will hold ``dst`` if it is missing.

      Raises ``OSError`` when the directory cannot be created and does not
      already exist.
      """
      dst_dir = os.path.dirname(self.dst)
      if not dst_dir:
         return   # output goes to the current directory
      try:
         os.makedirs( dst_dir )
      except OSError:
         # only an already existing directory is harmless (this also covers
         # another process creating it concurrently)
         if not os.path.isdir( dst_dir ):
            raise

   def _rg_to_soundcheck( self, replay_gain ):
      """Return the soundcheck hex string converted from a replay_gain float
      value.

      ``replay_gain`` is a string whose first whitespace-separated field is
      the gain as a decimal number; anything after it is ignored.
      Returns ``None`` when ``replay_gain`` is ``None`` or blank.  The result
      is the same 8-digit upper-case hex word repeated ten times, separated
      by single spaces.
      """
      fields = replay_gain.split() if replay_gain else []
      if not fields:
         return None
      rg_f = float( fields[0] )
      # '%X' needs an integer; truncate the scaled value
      sc = int( 1000 * pow(10,(-rg_f/10.0)) )
      return ' '.join(["%08X" % (sc,)]*10)

   def _cover_thumbnail( self ):
      """Write a JPEG thumbnail of ``cover`` to a temporary file.

      The image is shrunk in place with ``Image.thumbnail(THUMBSIZE)`` and
      saved under the temporary file's name.  The open
      ``NamedTemporaryFile`` object is returned; the caller must keep it
      referenced for as long as the file is needed.
      """
      assert self.cover    # cover must be valid
      im = Image.open( self.cover)
      im.thumbnail( THUMBSIZE)
      ofile = tempfile.NamedTemporaryFile()
      im.save( ofile.name, "JPEG")
      return ofile

   @staticmethod
   def _check_err( err, msg ):
      """Return ``True`` if ``err`` is falsy; otherwise print ``msg`` and
      ``err`` and return ``False``."""
      if err:
         # single formatted argument prints the same on Python 2 and 3
         print("%s %s" % (msg, err))
         return False
      return True


#############################################################################
class AacEncoder( _Encoder ):
   """Encoder producing ``.m4a`` files by piping ``flac -d`` output into
   ``neroAacEnc``; tagging and cover embedding use ``neroAacTag``."""

   def __init__( self, aac_q, **kwargs ):
      """Store the quality string ``aac_q`` (passed to ``neroAacEnc -q``);
      the remaining keyword arguments are forwarded to ``_Encoder``."""
      super( AacEncoder, self).__init__( ext='.m4a', **kwargs)
      assert isinstance(aac_q, str), "q value is: %s" % (aac_q,)
      self.q = aac_q

   @staticmethod
   def _sh_quote( arg ):
      """Return the string ``arg`` escaped as a single literal shell word."""
      try:
         from shlex import quote    # Python 3
      except ImportError:
         from pipes import quote    # Python 2
      return quote( arg )

   def encode( self, force=False ):
      """Return True if (re)encoding occured and no errors, False otherwise

      Encoding runs when ``force`` is true or ``util.newer(src, dst)`` is
      true.  Raises ``KeyboardInterrupt`` when the command returns ``-2``,
      after trying to delete the partial output file.
      """
      if not (force or util.newer( self.src, self.dst)):
         return False
      self._pre_encode()
      # encode to AAC; paths are quoted so names containing quotes, '$' or
      # backticks reach the tools unchanged
      cmd = 'flac -d %s -c -s | neroAacEnc -q %s -if - -of %s' % (
            self._sh_quote(self.src),
            self._sh_quote(self.q),
            self._sh_quote(self.dst))
      err = sp.call( cmd, shell=True, stderr=NULL)
      if err == -2: # keyboard interrupt
         try:
            os.remove(self.dst) # clean-up partial file
         except OSError:
            pass  # nothing was written; still report the interrupt
         raise KeyboardInterrupt
      return self._check_err( err, "AAC encoder failed:" )

   def tag( self, tags ):
      """Write metadata from the ``tags`` mapping into ``dst``.

      Every key read below must be present in ``tags``; entries whose
      value is falsy are not written.  An ``iTunNORM`` entry derived from
      ``tags['rg_track_gain']`` is added only when a value could be
      computed.  Returns ``True`` when ``neroAacTag`` reports success.
      """
      # aac tags, matches order of FLAC_TAGS
      aac_fields = {
            'artist':tags['artist'],
            'title':tags['title'],
            'album':tags['album'],
            'year':tags['year'],
            'track':tags['track'],
            'genre':tags['genre'],
            'comment':tags['comment'],
            'disc':tags['disc'],
            'totaltracks':tags['totaltracks'],
            }
      aac_fields = dict((k,v) for k,v in aac_fields.items() if v)
      user_fields = {
            'writer':tags['composer'],
            'replaygain_track_gain':tags['rg_track_gain'],
            'replaygain_track_peak':tags['rg_track_peak'],
            'replaygain_album_gain':tags['rg_album_gain'],
            'replaygain_album_peak':tags['rg_album_peak'],
            }
      user_fields = dict((k,v) for k,v in user_fields.items() if v)
      # tag AAC file
      args = []
      sc_val = self._rg_to_soundcheck(tags['rg_track_gain'])
      if sc_val is not None:
         # without this guard the literal text "None" would be stored
         args.append('-meta-user:iTunNORM=%s' % (sc_val,))
      args.extend('-meta:%s=%s'%(x,y) for x,y in aac_fields.items())
      args.extend('-meta-user:%s=%s'%(x,y) for x,y in user_fields.items())
      cmd = 'neroAacTag %s %s' % (
            self._sh_quote(self.dst),
            ' '.join(self._sh_quote(a) for a in args))
      err = sp.call( cmd, shell=True, stderr=NULL)
      return self._check_err( err, "AAC tag failed:" )

   def set_cover( self, force=False ):
      """Replace the covers stored in ``dst`` with a thumbnail of ``cover``.

      Runs only when a cover exists and either ``force`` is true or
      ``util.newer(cover, dst)`` is true; otherwise nothing is done and
      ``None`` is returned.  Returns ``True`` on success, ``False`` if
      ``neroAacTag`` fails.
      """
      if not self.cover or not (force or util.newer(self.cover,self.dst)):
         return None
      tmp_cover = self._cover_thumbnail()
      try:
         cmd = 'neroAacTag %s -remove-cover:all -add-cover:front:%s' % (
               self._sh_quote(self.dst),
               self._sh_quote(tmp_cover.name))
         err = sp.call( cmd, shell=True, stderr=NULL)
      finally:
         tmp_cover.close()   # also deletes the temporary thumbnail
      return self._check_err( err, "AAC add-cover failed:" )


#############################################################################
import base64
import struct

class OggEncoder( _Encoder ):
   """Encoder producing ``.ogg`` files with ``oggenc``; cover art is added
   with ``vorbiscomment``."""

   def __init__( self, ogg_q, **kwargs ):
      """Store the quality string ``ogg_q`` (passed to ``oggenc -q``); the
      remaining keyword arguments are forwarded to ``_Encoder``."""
      super( OggEncoder, self).__init__( ext='.ogg', **kwargs)
      assert isinstance(ogg_q, str), "q value is: %s" % (ogg_q,)
      self.q = ogg_q

   @staticmethod
   def _sh_quote( arg ):
      """Return the string ``arg`` escaped as a single literal shell word."""
      try:
         from shlex import quote    # Python 3
      except ImportError:
         from pipes import quote    # Python 2
      return quote( arg )

   def encode( self, force=False ):
      """Return True if (re)encoding occurred without errors, False
      otherwise.

      Encoding runs when ``force`` is true or ``util.newer(src, dst)`` is
      true.  Raises ``KeyboardInterrupt`` when the command returns ``-2``,
      after trying to delete the partial output file.
      """
      if not (force or util.newer( self.src, self.dst)):
         return False
      self._pre_encode()
      # encode to OGG; paths are quoted so names containing quotes, '$' or
      # backticks reach the tool unchanged
      cmd = 'oggenc -q %s -o %s %s' % (
            self._sh_quote(self.q),
            self._sh_quote(self.dst),
            self._sh_quote(self.src))
      err = sp.call( cmd, shell=True, stderr=NULL)
      if err == -2: # keyboard interrupt
         try:
            os.remove(self.dst) # clean-up partial file
         except OSError:
            pass  # nothing was written; still report the interrupt
         raise KeyboardInterrupt
      return self._check_err( err, "OGG encoder failed:" )

   # no-op, since tags are automatically updated during encoding
   def tag( self, tags):
      """Do nothing and return ``True``."""
      return True

   # see http://flac.sourceforge.net/format.html#metadata_block_picture
   # for more details regarding embedded vorbis pictures
   def set_cover( self, force=False ):
      """Append a thumbnail of ``cover`` to ``dst`` as an embedded picture.

      Runs only when a cover exists and either ``force`` is true or
      ``util.newer(cover, dst)`` is true; otherwise nothing is done and
      ``None`` is returned.  Returns ``True`` on success, ``False`` if
      ``vorbiscomment`` fails.
      """
      if not self.cover or not (force or util.newer(self.cover,self.dst)):
         return None
      # define METADATA_BLOCK_PICTURE binary structure
      # int:    Picture type, 0-20 (3=cover front)
      # int:    Length of MIME type string in bytes
      # string: MIME type string
      # int:    length of description in bytes
      # string: picture description
      # int:    picture width, pixels
      # int:    picture height, pixels
      # int:    color depth
      # int:    number of colors in index, 0 for non-indexed pic
      # int:    length of picture data in bytes
      # string: binary picture data
      # '>': the FLAC format stores every integer big-endian
      pic_block_t = ">2I %ds I %ds 5I %ds"
      mime = b'image/jpeg'
      description = b'album cover'
      tmp_cover = self._cover_thumbnail()
      try:
         bin_cover = tmp_cover.read()
         # thumbnail() keeps the aspect ratio, so the real size can be
         # smaller than THUMBSIZE along one axis
         width, height = Image.open( tmp_cover.name).size
      finally:
         tmp_cover.close()   # also deletes the temporary thumbnail
      meta_block = struct.pack(
            pic_block_t % (len(mime), len(description), len(bin_cover)),
            3, len(mime), mime, len(description), description,
            width, height, 24, 0, len(bin_cover), bin_cover)
      meta_block = base64.b64encode(meta_block).decode('ascii')
      # METADATA_BLOCK_PICTURE is the comment key defined for cover art
      cmd = 'vorbiscomment -a -t %s %s' % (
            self._sh_quote('METADATA_BLOCK_PICTURE=%s' % (meta_block,)),
            self._sh_quote(self.dst))
      err = sp.call( cmd, shell=True, stderr=NULL)
      return self._check_err( err, "OGG add-cover failed:" )