1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
|
import subprocess, os
from config import config
# Registry of usable decoder classes, keyed by their ``decoder_name``.
# Filled in by DecoderMeta whenever a decoder class is defined.
decoders = {}
# Registry of usable encoder classes, keyed by their ``encoder_name``.
# Filled in by EncoderMeta whenever an encoder class is defined.
encoders = {}
class DecoderMeta(type):
    '''
    Metaclass that registers usable decoder classes in ``decoders``.

    Every class created through this metaclass, except the one named
    ``Decoder``, is asked via ``cls.test_exists()`` whether it can be used;
    if so it is stored in ``decoders`` under ``cls.decoder_name``.
    '''

    def __init__(cls, name, bases, attrs):
        # Chain to type.__init__ so the metaclass stays cooperative when it
        # is combined with other metaclasses.
        super(DecoderMeta, cls).__init__(name, bases, attrs)
        # The class named 'Decoder' is only the common base: it defines
        # neither decoder_name nor test_exists, so it must be skipped.
        if name != 'Decoder' and cls.test_exists():
            decoders[cls.decoder_name] = cls
# Common base class for all decoders.
#
# The class is created by calling the metaclass directly instead of setting a
# ``__metaclass__`` attribute: that attribute is only honoured by Python 2,
# while Python 3 silently ignores it, which would leave subclasses
# unregistered.  Calling the metaclass works the same way on both versions,
# and subclasses inherit the metaclass as usual.
Decoder = DecoderMeta('Decoder', (object,), {
    '__doc__': 'Base class for decoders; subclasses are registered by DecoderMeta.',
})
class EncoderMeta(type):
    '''
    Metaclass that registers usable encoder classes in ``encoders``.

    Every class created through this metaclass, except the one named
    ``Encoder``, is asked via ``cls.test_exists()`` whether it can be used;
    if so it is stored in ``encoders`` under ``cls.encoder_name``.
    '''

    def __init__(cls, name, bases, attrs):
        # Chain to type.__init__ so the metaclass stays cooperative when it
        # is combined with other metaclasses.
        super(EncoderMeta, cls).__init__(name, bases, attrs)
        # The class named 'Encoder' is only the common base: it defines
        # neither encoder_name nor test_exists, so it must be skipped.
        if name != 'Encoder' and cls.test_exists():
            encoders[cls.encoder_name] = cls
# Common base class for all encoders.
#
# The class is created by calling the metaclass directly instead of setting a
# ``__metaclass__`` attribute: that attribute is only honoured by Python 2,
# while Python 3 silently ignores it, which would leave subclasses
# unregistered.  Calling the metaclass works the same way on both versions,
# and subclasses inherit the metaclass as usual.
Encoder = EncoderMeta('Encoder', (object,), {
    '__doc__': 'Base class for encoders; subclasses are registered by EncoderMeta.',
})
def test_executable(name):
@staticmethod
def do_test():
exists = True
devnull = open('/dev/null', 'a+')
try:
subprocess.Popen([name], stdout = devnull, stderr = devnull, close_fds = True)
except OSError:
exists = False
devnull.close()
return exists
return do_test
class FFmpeg(Decoder):
    '''
    Decoder that drives the ``ffmpeg`` command line tool.
    '''
    decoder_name = 'ffmpeg'

    def __init__(self, source, destination):
        '''
        Store the path of the file to decode and the path to write to.
        '''
        self.source = source
        self.destination = destination

    @staticmethod
    def probe(source):
        '''
        Calls ffprobe to test whether ffmpeg supports this file.

        Returns True when ffprobe exits with status 0, False otherwise.
        '''
        # The context manager closes the null-device handle again; without
        # it one file descriptor would leak for every probed file.
        with open(os.devnull, 'a+') as devnull:
            p = subprocess.Popen(
                ['ffprobe', source],
                stdout=devnull,
                stderr=devnull,
                close_fds=True,
            )
            return p.wait() == 0

    test_exists = test_executable('ffmpeg')

    def decode(self):
        '''
        Run ffmpeg to convert ``self.source`` into ``self.destination``
        and wait for it to finish.

        ffmpeg is invoked with ``-y`` so an existing destination file is
        overwritten.  The exit status is not inspected.
        '''
        cmd = ['ffmpeg', '-loglevel', 'quiet']
        cmd += ['-i', self.source, '-y', self.destination]
        # Only stderr is silenced; the handle is closed once ffmpeg is done.
        with open(os.devnull, 'a+') as devnull:
            p = subprocess.Popen(cmd, stderr=devnull, close_fds=True)
            p.wait()
class Ogg(Encoder):
    '''
    Encoder that writes ``.ogg`` files by running the ``oggenc`` tool.
    '''
    encoder_name = 'ogg'
    extension = '.ogg'
    test_exists = test_executable('oggenc')

    def __init__(self, source, destination):
        '''
        Store the path of the file to encode and the path to write to.
        '''
        self.source, self.destination = source, destination

    def encode(self):
        '''
        Run ``oggenc -Q`` on ``self.source``, writing ``self.destination``.

        The call blocks until oggenc has finished; its exit status is
        discarded.
        '''
        subprocess.call(['oggenc', '-Q', self.source, '-o', self.destination])
class RecoderError(Exception):
    '''Generic error type of the recoder module.'''
class DecoderNotFoundError(Exception):
    '''Raised when no registered decoder accepts a source file.'''
class EncoderNotFoundError(Exception):
    '''Raised when an encoder name is not present in the encoder registry.'''
class Recoder(object):
    '''
    Converts one source file with a chosen encoder.

    The source is first decoded to a wave file inside the configured cache
    directory; that wave file is then encoded and finally removed.
    '''

    # ``basestring`` only exists on Python 2; on Python 3 text is ``str``.
    try:
        _string_types = basestring
    except NameError:
        _string_types = str

    def __init__(self, source, encoder):
        '''
        Pick a decoder for ``source`` and set up the encoder.

        ``source`` is the path of the file to convert.  ``encoder`` is
        either the name of a registered encoder or an encoder class.

        Raises EncoderNotFoundError when ``encoder`` is a name that is not
        registered, and DecoderNotFoundError when no registered decoder's
        ``probe()`` accepts ``source``.
        '''
        if isinstance(encoder, self._string_types):
            if encoder not in encoders:
                raise EncoderNotFoundError('Encoder "%s" not found (%s).' % (encoder, ', '.join(encoders.keys())))
            encoder = encoders[encoder]
        self.dec_source = source
        # Boldly assume all decoders can convert to wave format.
        self.dec_destination = os.path.join(config.get('cache_dir'), os.path.splitext(os.path.basename(source))[0] + '.wav')
        self.enc_source = self.dec_destination
        self.enc_destination = os.path.splitext(self.dec_destination)[0] + encoder.extension
        self.decoder = None
        # values() rather than itervalues() so this runs on Python 2 and 3.
        for decoder in decoders.values():
            if decoder.probe(self.dec_source):
                self.decoder = decoder(self.dec_source, self.dec_destination)
                break
        if not self.decoder:
            raise DecoderNotFoundError('No decoder found for source file "%s".' % self.dec_source)
        self.encoder = encoder(self.enc_source, self.enc_destination)

    def recode(self):
        '''
        Decode the source, encode the result and delete the intermediate
        wave file.

        If decoding or encoding raises (including KeyboardInterrupt), the
        intermediate file is removed when present and the exception is
        re-raised unchanged.
        '''
        try:
            self.decoder.decode()
            self.encoder.encode()
        except BaseException:
            # Do not leave a possibly large wave file behind in the cache.
            if os.path.exists(self.dec_destination):
                os.unlink(self.dec_destination)
            raise
        os.unlink(self.dec_destination)
# Make sure the cache directory exists when the module is loaded.
# Creating it first and tolerating "already there" avoids the race between
# an existence check and makedirs() when two processes start at the same time.
_cache_dir = config.get('cache_dir')
try:
    os.makedirs(_cache_dir)
except OSError:
    # Only ignore the failure when the path exists by now; anything else
    # (e.g. missing permissions) is a real error.
    if not os.path.exists(_cache_dir):
        raise
if __name__ == '__main__':
    # Command line entry point: recode every positional argument with the
    # encoder selected through -e/--encoder.
    import optparse
    import sys

    encoder_names = ', '.join(encoders.keys())
    parser = optparse.OptionParser()
    parser.add_option(
        '-e', '--encoder',
        help='Encoder to use, must be one of: ' + encoder_names,
    )
    options, args = parser.parse_args()

    # Without an encoder there is nothing sensible to do: show usage, fail.
    if not options.encoder:
        parser.print_help()
        sys.exit(1)

    for path in args:
        Recoder(path, options.encoder).recode()
|