1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
|
import subprocess, os, cuesheet
from config import config
decoders = {}
encoders = {}
class DecoderMeta(type):
def __init__(cls, name, bases, attrs):
if not name in ('Decoder',):
if cls.test_exists():
decoders[cls.decoder_name] = cls
class Decoder(object):
__metaclass__ = DecoderMeta
class EncoderMeta(type):
def __init__(cls, name, bases, attrs):
if not name in ('Encoder',):
if cls.test_exists():
encoders[cls.encoder_name] = cls
class Encoder(object):
__metaclass__ = EncoderMeta
def test_executable(name):
@staticmethod
def do_test():
exists = True
devnull = open('/dev/null', 'a+')
try:
subprocess.Popen([name], stdout = devnull, stderr = devnull, close_fds = True)
except OSError:
exists = False
devnull.close()
return exists
return do_test
class FFmpeg(Decoder):
decoder_name = 'ffmpeg'
def __init__(self, source, destination):
self.source = source
self.destination = destination
@staticmethod
def probe(source):
'''
Calls ffprobe to test wether ffmpeg supports this file.
'''
devnull = open('/dev/null', 'a+')
p = subprocess.Popen(['ffprobe', source], stdout = devnull, stderr = devnull, close_fds = True)
ret = p.wait()
return ret == 0
test_exists = test_executable('ffmpeg')
def decode(self, **kwargs):
cmd = 'ffmpeg -loglevel quiet'.split()
if 'start_time' in kwargs and kwargs['start_time']:
cmd += ['-ss', str(kwargs['start_time'])]
if 'end_time' in kwargs and kwargs['end_time']:
cmd += ['-t', str(kwargs['end_time'] - kwargs['start_time'])]
cmd += ['-i', self.source, '-y', self.destination]
devnull = open('/dev/null', 'a+')
p = subprocess.Popen(cmd, stderr = devnull, close_fds = True)
p.wait()
class Ogg(Encoder):
encoder_name = 'ogg'
extension = '.ogg'
def __init__(self, source, destination):
self.source = source
self.destination = destination
test_exists = test_executable('oggenc')
def encode(self):
cmd = ['oggenc', '-Q', self.source, '-o', self.destination]
subprocess.call(cmd)
class RecoderError(Exception): pass
class DecoderNotFoundError(Exception): pass
class EncoderNotFoundError(Exception): pass
class Recoder(object):
def __init__(self, source, encoder, track = None, destination = None):
if track:
cue = cuesheet.Cuesheet(source)
source = os.path.join(os.path.dirname(source), cue.info[0].file[0])
track = cue.tracks[track-1]
self.start_time = track.get_start_time()
track = cue.get_next(track)
self.end_time = track.get_start_time() if track else None
else:
self.start_time, self.end_time = None, None
# TODO: Python 3 breakage (must be str)
if isinstance(encoder, basestring):
if not encoder in encoders:
raise EncoderNotFoundError('Encoder "%s" not found (%s).' % (encoder, ', '.join(encoders.keys())))
encoder = encoders[encoder]
self.dec_source = source
# Boldly assume all decoders can convert to wave format.
if destination:
self.dec_destination = os.path.splitext(destination)[0] + '.wav'
else:
self.dec_destination = os.path.join(config.get('cache_dir'), os.path.splitext(os.path.basename(source))[0] + '.wav')
self.enc_source = self.dec_destination
if destination:
self.enc_destination = os.path.splitext(destination)[0] + encoder.extension
else:
self.enc_destination = os.path.splitext(self.dec_destination)[0] + encoder.extension
self.decoder = None
for decoder in decoders.itervalues():
if decoder.probe(self.dec_source):
self.decoder = decoder(self.dec_source, self.dec_destination)
break
if not self.decoder:
raise DecoderNotFoundError('No decoder found for source file "%s".' % self.dec_source)
self.encoder = encoder(self.enc_source, self.enc_destination)
def recode(self):
self.decoder.decode(start_time = self.start_time, end_time = self.end_time)
self.encoder.encode()
os.unlink(self.dec_destination)
if not os.path.exists(config.get('cache_dir')):
os.makedirs(config.get('cache_dir'))
if __name__ == '__main__':
import sys, optparse
parser = optparse.OptionParser()
parser.add_option('-e', '--encoder', help = 'Encoder to use, must be one of: ' + ', '.join(encoders.keys()))
options, args = parser.parse_args()
if not options.encoder:
parser.print_help()
sys.exit(1)
for f in args:
r = Recoder(f, options.encoder)
r.recode()
|