summaryrefslogtreecommitdiff
path: root/codec.py
blob: 47167dcd517dfd97608d449a0c6f6b624d2e690b (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
import subprocess, os, cuesheet
from config import config

# Registry of usable decoders, keyed by each class's ``decoder_name``;
# DecoderMeta adds entries as decoder classes are defined.
decoders = {}
# Registry of usable encoders, keyed by each class's ``encoder_name``;
# EncoderMeta adds entries as encoder classes are defined.
encoders = {}

class DecoderMeta(type):
	'''
	Metaclass that adds decoder classes to the ``decoders`` registry at class
	creation time.

	A class is stored under its ``decoder_name`` attribute, but only when its
	``test_exists()`` returns a true value. The class named 'Decoder' is never
	registered.
	'''
	def __init__(cls, name, bases, attrs):
		# The common base class is not a real decoder; leave it out.
		if name == 'Decoder':
			return
		# Classes whose availability check fails stay unregistered.
		if not cls.test_exists():
			return
		decoders[cls.decoder_name] = cls

# Base class for all decoders. The metaclass is applied by calling it directly
# instead of through a ``__metaclass__`` attribute: Python 3 silently ignores
# that attribute, which would leave the ``decoders`` registry empty, while this
# form builds the same class under both Python 2 and Python 3. Subclasses
# inherit DecoderMeta as their metaclass either way.
Decoder = DecoderMeta('Decoder', (object,), {
	'__doc__': 'Base class for decoders; DecoderMeta handles each subclass when it is defined.',
})

class EncoderMeta(type):
	'''
	Metaclass that adds encoder classes to the ``encoders`` registry at class
	creation time.

	A class is stored under its ``encoder_name`` attribute, but only when its
	``test_exists()`` returns a true value. The class named 'Encoder' is never
	registered.
	'''
	def __init__(cls, name, bases, attrs):
		# The common base class is not a real encoder; leave it out.
		if name == 'Encoder':
			return
		# Classes whose availability check fails stay unregistered.
		if not cls.test_exists():
			return
		encoders[cls.encoder_name] = cls

# Base class for all encoders. The metaclass is applied by calling it directly
# instead of through a ``__metaclass__`` attribute: Python 3 silently ignores
# that attribute, which would leave the ``encoders`` registry empty, while this
# form builds the same class under both Python 2 and Python 3. Subclasses
# inherit EncoderMeta as their metaclass either way.
Encoder = EncoderMeta('Encoder', (object,), {
	'__doc__': 'Base class for encoders; EncoderMeta handles each subclass when it is defined.',
})

def test_executable(name):
	@staticmethod
	def do_test():
		exists = True
		devnull = open('/dev/null', 'a+')
		try:
			subprocess.Popen([name], stdout = devnull, stderr = devnull, close_fds = True)
		except OSError:
			exists = False
		devnull.close()
		return exists
	return do_test

class FFmpeg(Decoder):
	'''
	Decoder that shells out to the ``ffmpeg`` command line tool.
	'''
	decoder_name = 'ffmpeg'

	def __init__(self, source, destination):
		'''
		Remember the input file and the file ffmpeg should write.
		'''
		self.source = source
		self.destination = destination

	@staticmethod
	def probe(source):
		'''
		Calls ffprobe to test whether ffmpeg supports this file.

		Returns True when ffprobe exits with status 0, False otherwise.
		'''
		# The null device is closed again as soon as ffprobe has finished.
		with open(os.devnull, 'w') as devnull:
			status = subprocess.call(['ffprobe', source], stdout = devnull, stderr = devnull, close_fds = True)
		return status == 0

	test_exists = test_executable('ffmpeg')

	def decode(self, **kwargs):
		'''
		Run ffmpeg to convert ``self.source`` into ``self.destination``,
		overwriting the destination if it exists (``-y``).

		Optional keyword arguments:
		start_time -- position to start at, passed to ffmpeg as ``-ss``.
		end_time -- position to stop at. ffmpeg receives it as the duration
		            ``-t``, i.e. end_time minus start_time; when no start_time
		            is given, end_time itself is used as the duration.

		Falsy values are treated like absent ones. ffmpeg's exit status is
		not checked.
		'''
		start_time = kwargs.get('start_time')
		end_time = kwargs.get('end_time')

		cmd = ['ffmpeg', '-loglevel', 'quiet']
		if start_time:
			cmd += ['-ss', str(start_time)]
		if end_time:
			# Without a start position the end position is already the
			# duration; subtracting None would raise a TypeError.
			if start_time is None:
				duration = end_time
			else:
				duration = end_time - start_time
			cmd += ['-t', str(duration)]
		cmd += ['-i', self.source, '-y', self.destination]

		# Only stderr is silenced; stdout is inherited from this process.
		with open(os.devnull, 'w') as devnull:
			subprocess.call(cmd, stderr = devnull, close_fds = True)

class Ogg(Encoder):
	'''
	Encoder that writes '.ogg' files by running the ``oggenc`` command.
	'''
	encoder_name = 'ogg'
	extension = '.ogg'

	test_exists = test_executable('oggenc')

	def __init__(self, source, destination):
		'''
		Store the input path and the path oggenc should write to.
		'''
		self.source, self.destination = source, destination

	def encode(self):
		'''
		Invoke oggenc on ``self.source``, writing to ``self.destination``.
		The exit status of oggenc is discarded.
		'''
		subprocess.call(['oggenc', '-Q', self.source, '-o', self.destination])

class RecoderError(Exception):
	'''
	Error type for the recoder.
	NOTE(review): no raise site is visible in this file; confirm where it is used.
	'''
class DecoderNotFoundError(Exception):
	'''
	Raised when no registered decoder accepts a source file.
	'''
class EncoderNotFoundError(Exception):
	'''
	Raised when an encoder is requested by a name that is not registered.
	'''

class Recoder(object):
	'''
	Converts a source file to another format in two steps: a decoder writes
	an intermediate '.wav' file, then an encoder turns that file into the
	final output.
	'''
	def __init__(self, source, encoder, track = None, destination = None):
		'''
		Work out all file names and select a decoder and an encoder.

		source -- path of the file to convert. When ``track`` is given this is
		          the path of a cue sheet, and the audio file it names is used
		          as the real source.
		encoder -- an encoder class, or the name of one in ``encoders``.
		track -- optional track number; ``track - 1`` indexes ``cue.tracks``.
		         Only that track's time range is decoded.
		destination -- optional output path; its extension is replaced by the
		               encoder's. Without it the output goes to the configured
		               'cache_dir', named after the source file.

		Raises EncoderNotFoundError if ``encoder`` is an unknown name, and
		DecoderNotFoundError if no registered decoder accepts the source.
		'''
		if track:
			cue = cuesheet.Cuesheet(source)
			# The audio file is looked up next to the cue sheet.
			source = os.path.join(os.path.dirname(source), cue.info[0].file[0])
			track = cue.tracks[track-1]
			self.start_time = track.get_start_time()
			# The track ends where its successor begins; when get_next()
			# yields nothing there is no end time.
			track = cue.get_next(track)
			self.end_time = track.get_start_time() if track else None
		else:
			self.start_time, self.end_time = None, None

		# ``basestring`` only exists on Python 2; fall back to ``str`` so the
		# name lookup also works on Python 3.
		try:
			string_types = basestring
		except NameError:
			string_types = str
		if isinstance(encoder, string_types):
			if encoder not in encoders:
				raise EncoderNotFoundError('Encoder "%s" not found (%s).' % (encoder, ', '.join(encoders.keys())))
			encoder = encoders[encoder]

		self.dec_source = source
		# Boldly assume all decoders can convert to wave format.
		if destination:
			stem = os.path.splitext(destination)[0]
			self.dec_destination = stem + '.wav'
			self.enc_destination = stem + encoder.extension
		else:
			self.dec_destination = os.path.join(config.get('cache_dir'), os.path.splitext(os.path.basename(source))[0] + '.wav')
			self.enc_destination = os.path.splitext(self.dec_destination)[0] + encoder.extension
		self.enc_source = self.dec_destination

		# Use the first registered decoder whose probe() accepts the source.
		# values() (rather than itervalues()) works on Python 2 and 3 alike.
		self.decoder = None
		for decoder in decoders.values():
			if decoder.probe(self.dec_source):
				self.decoder = decoder(self.dec_source, self.dec_destination)
				break
		if not self.decoder:
			raise DecoderNotFoundError('No decoder found for source file "%s".' % self.dec_source)
		self.encoder = encoder(self.enc_source, self.enc_destination)

	def recode(self):
		'''
		Decode the source to the intermediate wave file, encode that file,
		and delete it again.

		The intermediate file is also removed (best effort) when decoding or
		encoding raises; the original exception is then re-raised.
		'''
		# NOTE(review): if dec_destination is the same path as dec_source
		# (e.g. 'x.wav' recoded with destination 'x.ogg'), the unlink below
		# removes the source file -- confirm callers never do this.
		try:
			self.decoder.decode(start_time = self.start_time, end_time = self.end_time)
			self.encoder.encode()
		except BaseException:
			# Do not leave a stale intermediate file behind, and do not let
			# a cleanup failure mask the real error.
			try:
				os.unlink(self.dec_destination)
			except OSError:
				pass
			raise
		os.unlink(self.dec_destination)

# Make sure the configured cache directory exists when the module is loaded.
# Attempting the creation and tolerating failure when the path is already
# present avoids the window between an existence check and makedirs() in which
# another process could create the directory and make this import fail.
_cache_dir = config.get('cache_dir')
try:
	os.makedirs(_cache_dir)
except OSError:
	# Any failure other than "already there" is still reported.
	if not os.path.exists(_cache_dir):
		raise

if __name__ == '__main__':
	# Command line entry point: recode every file given as an argument with
	# the encoder selected through -e/--encoder.
	import sys, optparse

	encoder_help = 'Encoder to use, must be one of: ' + ', '.join(encoders.keys())
	opt_parser = optparse.OptionParser()
	opt_parser.add_option('-e', '--encoder', help = encoder_help)
	options, paths = opt_parser.parse_args()

	# The encoder is mandatory: print the usage text and exit with status 1.
	if not options.encoder:
		opt_parser.print_help()
		sys.exit(1)

	for path in paths:
		Recoder(path, options.encoder).recode()