audioop.py 16 KB


  1. import sys
  2. import builtins
  3. import math
  4. import struct
  5. from math import gcd
  6. from _audioop_cffi import ffi, lib
  7. BIG_ENDIAN = sys.byteorder != 'little'
  8. _buffer = memoryview
  9. class error(Exception):
  10. pass
  11. def _check_size(size):
  12. if size < 1 or size > 4:
  13. raise error("Size should be 1, 2, 3 or 4")
  14. def _check_params(length, size):
  15. _check_size(size)
  16. if length % size != 0:
  17. raise error("not a whole number of frames")
  18. def _check_state(state):
  19. if state is None:
  20. valpred = 0
  21. index = 0
  22. else:
  23. valpred, index = state
  24. if not (-0x8000 <= valpred < 0x8000 and 0 <= index < 89):
  25. raise ValueError("bad state")
  26. return (valpred, index)
  27. def _sample_count(cp, size):
  28. return len(cp) // size
  29. def _get_samples(cp, size, signed=True):
  30. for i in range(_sample_count(cp, size)):
  31. yield _get_sample(cp, size, i, signed)
  32. def _struct_format(size, signed):
  33. if size == 1:
  34. return "b" if signed else "B"
  35. elif size == 2:
  36. return "h" if signed else "H"
  37. elif size == 3:
  38. raise NotImplementedError
  39. elif size == 4:
  40. return "i" if signed else "I"
  41. def _unpack_int24(buf):
  42. if BIG_ENDIAN:
  43. val = (buf[2] & 0xff) | \
  44. ((buf[1] & 0xff) << 8) | \
  45. ((buf[0] & 0xff) << 16)
  46. else:
  47. val = (buf[0] & 0xff) | \
  48. ((buf[1] & 0xff) << 8) | \
  49. ((buf[2] & 0xff) << 16)
  50. if val & 0x800000:
  51. val = val - 0x1000000
  52. return val
  53. def _pack_int24(into, off, val):
  54. buf = _buffer(into)
  55. if BIG_ENDIAN:
  56. buf[off+2] = val & 0xff
  57. buf[off+1] = (val >> 8) & 0xff
  58. buf[off+0] = (val >> 16) & 0xff
  59. else:
  60. buf[off+0] = val & 0xff
  61. buf[off+1] = (val >> 8) & 0xff
  62. buf[off+2] = (val >> 16) & 0xff
  63. def _get_sample(cp, size, i, signed=True):
  64. start = i * size
  65. end = start + size
  66. chars = _buffer(cp)[start:end]
  67. if size == 3:
  68. return _unpack_int24(chars)
  69. fmt = _struct_format(size, signed)
  70. return struct.unpack_from(fmt, chars)[0]
  71. def _put_sample(cp, size, i, val, signed=True):
  72. if size == 3:
  73. _pack_int24(cp, i*size, val)
  74. return
  75. fmt = _struct_format(size, signed)
  76. struct.pack_into(fmt, cp, i * size, val)
  77. def _get_maxval(size, signed=True):
  78. if signed and size == 1:
  79. return 0x7f
  80. elif size == 1:
  81. return 0xff
  82. elif signed and size == 2:
  83. return 0x7fff
  84. elif size == 2:
  85. return 0xffff
  86. elif signed and size == 3:
  87. return 0x7fffff
  88. elif size == 3:
  89. return 0xffffff
  90. elif signed and size == 4:
  91. return 0x7fffffff
  92. elif size == 4:
  93. return 0xffffffff
  94. def _get_minval(size, signed=True):
  95. if not signed:
  96. return 0
  97. elif size == 1:
  98. return -0x80
  99. elif size == 2:
  100. return -0x8000
  101. elif size == 3:
  102. return -0x800000
  103. elif size == 4:
  104. return -0x80000000
  105. def _get_clipfn(size, signed=True):
  106. maxval = _get_maxval(size, signed)
  107. minval = _get_minval(size, signed)
  108. return lambda val: builtins.max(min(val, maxval), minval)
  109. def _overflow(val, size, signed=True):
  110. minval = _get_minval(size, signed)
  111. maxval = _get_maxval(size, signed)
  112. if minval <= val <= maxval:
  113. return val
  114. bits = size * 8
  115. if signed:
  116. offset = 2**(bits-1)
  117. return ((val + offset) % (2**bits)) - offset
  118. else:
  119. return val % (2**bits)
  120. def _check_bytes(cp):
  121. # we have no argument clinic
  122. try:
  123. memoryview(cp)
  124. except TypeError:
  125. raise TypeError("a bytes-like object is required, not '%s'" % \
  126. str(type(cp)))
  127. def getsample(cp, size, i):
  128. # _check_bytes checked in _get_sample
  129. _check_params(len(cp), size)
  130. if not (0 <= i < len(cp) // size):
  131. raise error("Index out of range")
  132. return _get_sample(cp, size, i)
  133. def max(cp, size):
  134. _check_params(len(cp), size)
  135. if len(cp) == 0:
  136. return 0
  137. return builtins.max(abs(sample) for sample in _get_samples(cp, size))
  138. def minmax(cp, size):
  139. _check_params(len(cp), size)
  140. min_sample, max_sample = 0x7fffffff, -0x80000000
  141. for sample in _get_samples(cp, size):
  142. max_sample = builtins.max(sample, max_sample)
  143. min_sample = builtins.min(sample, min_sample)
  144. return min_sample, max_sample
  145. def avg(cp, size):
  146. _check_params(len(cp), size)
  147. sample_count = _sample_count(cp, size)
  148. if sample_count == 0:
  149. return 0
  150. return sum(_get_samples(cp, size)) // sample_count
  151. def rms(cp, size):
  152. _check_params(len(cp), size)
  153. sample_count = _sample_count(cp, size)
  154. if sample_count == 0:
  155. return 0
  156. sum_squares = sum(sample**2 for sample in _get_samples(cp, size))
  157. return int(math.sqrt(sum_squares // sample_count))
  158. def _sum2(cp1, cp2, length):
  159. size = 2
  160. return sum(getsample(cp1, size, i) * getsample(cp2, size, i)
  161. for i in range(length)) + 0.0
  162. def findfit(cp1, cp2):
  163. size = 2
  164. if len(cp1) % 2 != 0 or len(cp2) % 2 != 0:
  165. raise error("Strings should be even-sized")
  166. if len(cp1) < len(cp2):
  167. raise error("First sample should be longer")
  168. len1 = _sample_count(cp1, size)
  169. len2 = _sample_count(cp2, size)
  170. sum_ri_2 = _sum2(cp2, cp2, len2)
  171. sum_aij_2 = _sum2(cp1, cp1, len2)
  172. sum_aij_ri = _sum2(cp1, cp2, len2)
  173. result = (sum_ri_2 * sum_aij_2 - sum_aij_ri * sum_aij_ri) / sum_aij_2
  174. best_result = result
  175. best_i = 0
  176. for i in range(1, len1 - len2 + 1):
  177. aj_m1 = _get_sample(cp1, size, i - 1)
  178. aj_lm1 = _get_sample(cp1, size, i + len2 - 1)
  179. sum_aij_2 += aj_lm1**2 - aj_m1**2
  180. sum_aij_ri = _sum2(_buffer(cp1)[i*size:], cp2, len2)
  181. result = (sum_ri_2 * sum_aij_2 - sum_aij_ri * sum_aij_ri) / sum_aij_2
  182. if result < best_result:
  183. best_result = result
  184. best_i = i
  185. factor = _sum2(_buffer(cp1)[best_i*size:], cp2, len2) / sum_ri_2
  186. return best_i, factor
  187. def findfactor(cp1, cp2):
  188. size = 2
  189. if len(cp1) % 2 != 0:
  190. raise error("Strings should be even-sized")
  191. if len(cp1) != len(cp2):
  192. raise error("Samples should be same size")
  193. sample_count = _sample_count(cp1, size)
  194. sum_ri_2 = _sum2(cp2, cp2, sample_count)
  195. sum_aij_ri = _sum2(cp1, cp2, sample_count)
  196. return sum_aij_ri / sum_ri_2
  197. def findmax(cp, len2):
  198. size = 2
  199. sample_count = _sample_count(cp, size)
  200. if len(cp) % 2 != 0:
  201. raise error("Strings should be even-sized")
  202. if len2 < 0 or sample_count < len2:
  203. raise error("Input sample should be longer")
  204. if sample_count == 0:
  205. return 0
  206. result = _sum2(cp, cp, len2)
  207. best_result = result
  208. best_i = 0
  209. for i in range(1, sample_count - len2 + 1):
  210. sample_leaving_window = getsample(cp, size, i - 1)
  211. sample_entering_window = getsample(cp, size, i + len2 - 1)
  212. result -= sample_leaving_window**2
  213. result += sample_entering_window**2
  214. if result > best_result:
  215. best_result = result
  216. best_i = i
  217. return best_i
  218. def avgpp(cp, size):
  219. _check_bytes(cp)
  220. _check_params(len(cp), size)
  221. sample_count = _sample_count(cp, size)
  222. if sample_count <= 2:
  223. return 0
  224. prevextremevalid = False
  225. prevextreme = None
  226. avg = 0
  227. nextreme = 0
  228. prevval = getsample(cp, size, 0)
  229. val = getsample(cp, size, 1)
  230. prevdiff = val - prevval
  231. for i in range(1, sample_count):
  232. val = getsample(cp, size, i)
  233. diff = val - prevval
  234. if diff * prevdiff < 0:
  235. if prevextremevalid:
  236. avg += abs(prevval - prevextreme)
  237. nextreme += 1
  238. prevextremevalid = True
  239. prevextreme = prevval
  240. prevval = val
  241. if diff != 0:
  242. prevdiff = diff
  243. if nextreme == 0:
  244. return 0
  245. return avg // nextreme
  246. def maxpp(cp, size):
  247. _check_params(len(cp), size)
  248. sample_count = _sample_count(cp, size)
  249. if sample_count <= 1:
  250. return 0
  251. prevextremevalid = False
  252. prevextreme = None
  253. max = 0
  254. prevval = getsample(cp, size, 0)
  255. val = getsample(cp, size, 1)
  256. prevdiff = val - prevval
  257. for i in range(1, sample_count):
  258. val = getsample(cp, size, i)
  259. diff = val - prevval
  260. if diff * prevdiff < 0:
  261. if prevextremevalid:
  262. extremediff = abs(prevval - prevextreme)
  263. if extremediff > max:
  264. max = extremediff
  265. prevextremevalid = True
  266. prevextreme = prevval
  267. prevval = val
  268. if diff != 0:
  269. prevdiff = diff
  270. return max
  271. def cross(cp, size):
  272. _check_params(len(cp), size)
  273. crossings = -1
  274. last_sample = 17
  275. for sample in _get_samples(cp, size):
  276. sample = sample < 0
  277. if sample != last_sample:
  278. crossings += 1
  279. last_sample = sample
  280. return crossings
  281. def mul(cp, size, factor):
  282. _check_params(len(cp), size)
  283. clip = _get_clipfn(size)
  284. rv = ffi.new("unsigned char[]", len(cp))
  285. result = ffi.buffer(rv)
  286. for i, sample in enumerate(_get_samples(cp, size)):
  287. sample = clip(int(sample * factor))
  288. _put_sample(result, size, i, sample)
  289. return result[:]
  290. def tomono(cp, size, fac1, fac2):
  291. _check_params(len(cp), size)
  292. clip = _get_clipfn(size)
  293. sample_count = _sample_count(cp, size)
  294. rv = ffi.new("unsigned char[]", len(cp) // 2)
  295. result = ffi.buffer(rv)
  296. for i in range(0, sample_count, 2):
  297. l_sample = getsample(cp, size, i)
  298. r_sample = getsample(cp, size, i + 1)
  299. sample = (l_sample * fac1) + (r_sample * fac2)
  300. sample = int(clip(sample))
  301. _put_sample(result, size, i // 2, sample)
  302. return result[:]
  303. def tostereo(cp, size, fac1, fac2):
  304. _check_params(len(cp), size)
  305. sample_count = _sample_count(cp, size)
  306. rv = ffi.new("char[]", len(cp) * 2)
  307. lib.tostereo(rv, ffi.from_buffer(cp), len(cp), size, fac1, fac2)
  308. return ffi.buffer(rv)[:]
  309. def add(cp1, cp2, size):
  310. _check_params(len(cp1), size)
  311. if len(cp1) != len(cp2):
  312. raise error("Lengths should be the same")
  313. rv = ffi.new("char[]", len(cp1))
  314. lib.add(rv, ffi.from_buffer(cp1), ffi.from_buffer(cp2), len(cp1), size)
  315. return ffi.buffer(rv)[:]
  316. def bias(cp, size, bias):
  317. _check_params(len(cp), size)
  318. rv = ffi.new("unsigned char[]", len(cp))
  319. result = ffi.buffer(rv)
  320. for i, sample in enumerate(_get_samples(cp, size)):
  321. sample = _overflow(sample + bias, size)
  322. _put_sample(result, size, i, sample)
  323. return result[:]
  324. def reverse(cp, size):
  325. _check_params(len(cp), size)
  326. sample_count = _sample_count(cp, size)
  327. rv = ffi.new("unsigned char[]", len(cp))
  328. result = ffi.buffer(rv)
  329. for i, sample in enumerate(_get_samples(cp, size)):
  330. _put_sample(result, size, sample_count - i - 1, sample)
  331. return result[:]
  332. def lin2lin(cp, size, size2):
  333. _check_bytes(cp)
  334. _check_params(len(cp), size)
  335. _check_size(size2)
  336. if size == size2:
  337. return cp
  338. new_len = (len(cp) // size) * size2
  339. rv = ffi.new("unsigned char[]", new_len)
  340. result = ffi.buffer(rv)
  341. for i in range(_sample_count(cp, size)):
  342. sample = _get_sample(cp, size, i)
  343. if size == 1:
  344. sample <<= 24
  345. elif size == 2:
  346. sample <<= 16
  347. elif size == 3:
  348. sample <<= 8
  349. if size2 == 1:
  350. sample >>= 24
  351. elif size2 == 2:
  352. sample >>= 16
  353. elif size2 == 3:
  354. sample >>= 8
  355. sample = _overflow(sample, size2)
  356. _put_sample(result, size2, i, sample)
  357. return result[:]
  358. def ratecv(cp, size, nchannels, inrate, outrate, state, weightA=1, weightB=0):
  359. _check_params(len(cp), size)
  360. if nchannels < 1:
  361. raise error("# of channels should be >= 1")
  362. bytes_per_frame = size * nchannels
  363. frame_count = len(cp) // bytes_per_frame
  364. if bytes_per_frame // nchannels != size:
  365. raise OverflowError("width * nchannels too big for a C int")
  366. if weightA < 1 or weightB < 0:
  367. raise error("weightA should be >= 1, weightB should be >= 0")
  368. if len(cp) % bytes_per_frame != 0:
  369. raise error("not a whole number of frames")
  370. if inrate <= 0 or outrate <= 0:
  371. raise error("sampling rate not > 0")
  372. d = gcd(inrate, outrate)
  373. inrate //= d
  374. outrate //= d
  375. d = gcd(weightA, weightB)
  376. weightA //= d
  377. weightB //= d
  378. if state is None:
  379. d = -outrate
  380. prev_i = ffi.new('int[]', nchannels)
  381. cur_i = ffi.new('int[]', nchannels)
  382. else:
  383. d, samps = state
  384. if len(samps) != nchannels:
  385. raise error("illegal state argument")
  386. prev_i, cur_i = zip(*samps)
  387. prev_i = ffi.new('int[]', prev_i)
  388. cur_i = ffi.new('int[]', cur_i)
  389. state_d = ffi.new('int[]', (d,))
  390. q = frame_count // inrate
  391. ceiling = (q + 1) * outrate
  392. nbytes = ceiling * bytes_per_frame
  393. rv = ffi.new("char[]", nbytes)
  394. cpbuf = ffi.from_buffer(cp)
  395. trim_index = lib.ratecv(rv, cpbuf, frame_count, size,
  396. nchannels, inrate, outrate,
  397. state_d, prev_i, cur_i,
  398. weightA, weightB)
  399. result = ffi.buffer(rv)[:trim_index]
  400. d = state_d[0]
  401. samps = zip(prev_i, cur_i)
  402. return (result, (d, tuple(samps)))
  403. def _get_lin_samples(cp, size):
  404. for sample in _get_samples(cp, size):
  405. if size == 1:
  406. yield sample << 8
  407. elif size == 2:
  408. yield sample
  409. elif size == 3:
  410. yield sample >> 8
  411. elif size == 4:
  412. yield sample >> 16
  413. def _put_lin_sample(result, size, i, sample):
  414. if size == 1:
  415. sample >>= 8
  416. elif size == 2:
  417. pass
  418. elif size == 3:
  419. sample <<= 8
  420. elif size == 4:
  421. sample <<= 16
  422. _put_sample(result, size, i, sample)
  423. def lin2ulaw(cp, size):
  424. _check_params(len(cp), size)
  425. rv = ffi.new("unsigned char[]", _sample_count(cp, size))
  426. for i, sample in enumerate(_get_lin_samples(cp, size)):
  427. rv[i] = lib.st_14linear2ulaw(sample)
  428. return ffi.buffer(rv)[:]
  429. def ulaw2lin(cp, size):
  430. _check_size(size)
  431. rv = ffi.new("unsigned char[]", len(cp) * size)
  432. result = ffi.buffer(rv)
  433. for i, value in enumerate(cp):
  434. sample = lib.st_ulaw2linear16(value)
  435. _put_lin_sample(result, size, i, sample)
  436. return result[:]
  437. def lin2alaw(cp, size):
  438. _check_params(len(cp), size)
  439. rv = ffi.new("unsigned char[]", _sample_count(cp, size))
  440. for i, sample in enumerate(_get_lin_samples(cp, size)):
  441. rv[i] = lib.st_linear2alaw(sample)
  442. return ffi.buffer(rv)[:]
  443. def alaw2lin(cp, size):
  444. _check_size(size)
  445. rv = ffi.new("unsigned char[]", len(cp) * size)
  446. result = ffi.buffer(rv)
  447. for i, value in enumerate(cp):
  448. sample = lib.st_alaw2linear16(value)
  449. _put_lin_sample(result, size, i, sample)
  450. return result[:]
  451. def lin2adpcm(cp, size, state):
  452. _check_params(len(cp), size)
  453. state = _check_state(state)
  454. rv = ffi.new("unsigned char[]", len(cp) // size // 2)
  455. state_ptr = ffi.new("int[]", state)
  456. cpbuf = ffi.cast("unsigned char*", ffi.from_buffer(cp))
  457. lib.lin2adcpm(rv, cpbuf, len(cp), size, state_ptr)
  458. return ffi.buffer(rv)[:], tuple(state_ptr)
  459. def adpcm2lin(cp, size, state):
  460. _check_size(size)
  461. state = _check_state(state)
  462. rv = ffi.new("unsigned char[]", len(cp) * size * 2)
  463. state_ptr = ffi.new("int[]", state)
  464. cpbuf = ffi.cast("unsigned char*", ffi.from_buffer(cp))
  465. lib.adcpm2lin(rv, cpbuf, len(cp), size, state_ptr)
  466. return ffi.buffer(rv)[:], tuple(state_ptr)
  467. def byteswap(cp, size):
  468. if len(cp) % size != 0:
  469. raise error("not a whole number of frames")
  470. sample_count = _sample_count(cp, size)
  471. rv = ffi.new("unsigned char[]", len(cp))
  472. base = size
  473. next_bump = 0
  474. bump = 2 * size
  475. for i in range(len(cp)):
  476. base -= 1
  477. rv[i] = cp[base]
  478. if base == next_bump:
  479. base += bump
  480. next_bump += size
  481. return ffi.buffer(rv)[:]