Gstspectrum

From XPUB & Lens-Based wiki

usage

usage: gstspectrum [-h] [--freq FREQ] [--interval INTERVAL] [--bands BANDS]
                   path

Dumps spectral analysis of audio using gstreamer

positional arguments:
  path                 audio filename to analyze (wav format only)

optional arguments:
  -h, --help           show this help message and exit
  --freq FREQ          audio frequency
  --interval INTERVAL  interval duration (in seconds)
  --bands BANDS        number of bands to split the spectrum

source

#!/usr/bin/env python
#-*- coding:utf-8 -*-

import gst, pygst, struct, sys, argparse

# Command-line interface: one positional wav path plus three tuning options.
parser = argparse.ArgumentParser(
    description="Dumps spectral analysis of audio using gstreamer")
parser.add_argument('--freq', type=int, default=44100,
                    help="audio frequency")
parser.add_argument('--interval', type=float, default=0.1,
                    help="interval duration (in seconds)")
parser.add_argument('--bands', type=int, default=128,
                    help="number of bands to split the spectrum")
parser.add_argument('path',
                    help="audio filename to analyze (wav format only)")
args = parser.parse_args()

# Fixed constants.
NANOS = 1e9            # nanoseconds per second, kept as a float for division
GST_MESSAGE_EOS = 1    # bit 0 of the message type; tested to detect end-of-stream

# Settings taken from the command line.
inpath = args.path
AUDIOFREQ = args.freq  # value of --freq; the header labels spread AUDIOFREQ / 2 over the bands
BANDS = args.bands     # number of bands the spectrum is split into
INTERVAL_SECS = args.interval
# Duration of one analysis "frame" (audio chunk) in whole nanoseconds.
INTERVAL = int(INTERVAL_SECS * NANOS)

# Pipeline description handed to gst.parse_launch.
# alternative pipeline for live analysis
# listener_desc = 'autoaudiosrc ! spectrum interval={1} bands={2} ! fakesink'
listener_desc = 'filesrc location="{0}" ! wavparse ! spectrum interval={1} bands={2} ! fakesink'.format(
    inpath, INTERVAL, BANDS)

# (pmin, pmax) = (None, None)
def listener_on_message(bus, message, data):
    """Bus watch callback: print spectrum rows, stop on end-of-stream or error.

    For every element message named "spectrum", one tab-separated line is
    printed to stdout: a "start-end" time range in seconds (derived from the
    message's 'timestamp' field and INTERVAL_SECS) followed by each value of
    the message's 'magnitude' field.

    bus     -- the bus the message arrived on (unused)
    message -- the gst message to inspect
    data    -- user data given to add_watch (unused)

    Returns True, which keeps the watch installed. On end-of-stream the
    pipeline is set to NULL and the process exits with status 0. On a
    pipeline error the error is written to stderr, the pipeline is set to
    NULL and the process exits with status 1.
    """
    mtype = message.type

    if mtype == gst.MESSAGE_ERROR:
        # Error messages carry a structure, so they have to be recognised
        # before the structure test below. Otherwise a failing pipeline
        # (missing file, not a wav file, ...) is ignored and the main loop
        # keeps running forever without producing any output.
        err, debug = message.parse_error()
        sys.stderr.write("gstspectrum: %s\n" % err)
        if debug:
            sys.stderr.write("%s\n" % debug)
        listener.set_state(gst.STATE_NULL)
        sys.exit(1)

    s = message.structure
    if s:
        if s.get_name() == "spectrum":
            secs = s['timestamp'] / NANOS  # nanoseconds -> seconds
            span = "%0.1f-%0.1f" % (secs, secs + INTERVAL_SECS)
            mags = "\t".join([str(x) for x in s['magnitude']])
            # Single parenthesised argument: prints the same line under
            # both the print statement and the print function.
            print(span + "\t" + mags)
    elif mtype & GST_MESSAGE_EOS:
        # Structure-less message with the EOS bit set: the file is done.
        listener.set_state(gst.STATE_NULL)
        sys.exit(0)
    return True

# Centre frequency (Hz) of each band: the analysed range 0 .. AUDIOFREQ / 2
# is split evenly into BANDS bands and band i is centred half a band above
# its lower edge.
# NOTE(review): assumes --freq matches the sample rate of the input file.
freqs = [((AUDIOFREQ / 2.0) * i + AUDIOFREQ / 4.0) / BANDS
         for i in range(BANDS)]

# SIMPLE HEADERS (centre frequencies)
#print "time\t", "\t".join([str(x) for x in freqs])
# RANGE HEADERS
# Each label shows the lower and upper edge of its band. Building the ranges
# from consecutive centre frequencies (as was done before) shifts every label
# by half a band: band 0 would read 0 .. w/2 instead of 0 .. w, and the last
# label would stop short of AUDIOFREQ / 2.
# The division stays inside the comprehension so that BANDS == 0 still just
# yields an empty header instead of dividing by zero.
labels = ["%0.1f-%0.1f" % ((AUDIOFREQ / 2.0) * i / BANDS,
                           (AUDIOFREQ / 2.0) * (i + 1) / BANDS)
          for i in range(BANDS)]
# One parenthesised argument: same output as `print "time\t", ...` (no space
# is inserted after a trailing tab) and also valid as a function call.
print("time\t" + "\t".join(labels))

import gobject

# Build the pipeline from its textual description and route every bus
# message to listener_on_message (which also ends the process on
# end-of-stream).
listener = gst.parse_launch(listener_desc)
listener.get_bus().add_watch(listener_on_message, None)

mainloop = gobject.MainLoop()
listener.set_state(gst.STATE_PLAYING)

try:
    mainloop.run()
except KeyboardInterrupt:
    # Ctrl-C: stop quietly. Any other exception is allowed to propagate
    # instead of being silently swallowed by a bare except.
    pass
finally:
    # Always shut the pipeline down so the file source is released.
    listener.set_state(gst.STATE_NULL)