Level.py

From XPUB & Lens-Based wiki

File:Level.zip

import pygst
pygst.require("0.10")
import gst, gobject
import time, thread

class Level:

	def __init__(self):
		self.min = None
		self.max = None
		self.slider = 0.0

		# Create a gstreamer pipeline, listen to it's messages via on_message callback
		listener_desc = 'alsasrc ! level ! fakesink'
		self.listener = gst.parse_launch(listener_desc)
		bus = self.listener.get_bus()
		bus.add_signal_watch()
		bus.connect("message", self.on_message)

	def on_message (self, bus, message):
		""" callback function for gstreamer messages """
		# print "message.name", s.get_name()
		s = message.structure
		# print "message.structure.keys", s.keys()
		if s and s.get_name() == "level":
			# The Level module seems to provide the following data: 'endtime', 'timestamp', 'stream-time', 'running-time', 'duration', 'rms', 'peak', 'decay'
			# In this case we will just look at the "rms" value (representing relative loudness of the signal)
			# It's a stereo (two-channel) value, so we just take the first one (left?)
			loudness = s['rms'][0]

			# update min and max
			if (self.min==None or loudness<self.min):
				self.min = loudness
			if (self.max==None or loudness>self.max):
				self.max = loudness
			
			# calculate "slider" (a 0 - 1 value, from min to max)
			if (self.min != self.max):
				self.slider = float(loudness - self.min) / (self.max - self.min)
				# if self.slider < 0.25: self.slider = 0
			print self.slider

		return True

	def start(self):
		self.listener.set_state(gst.STATE_PLAYING)
		while True:
			time.sleep(1)


# Run the Level object in it's own thread, then start the gobject.MainLoop as required
level = Level()
thread.start_new_thread(level.start, ())
gobject.threads_init()
loop = gobject.MainLoop()
loop.run()

PyGame Visualisation

To run, we pipe the output of the above, into the PyGame script below (nb -u for unbuffered python in/out):

python -u level.py | python -u level_display.py
#!/usr/bin/python

import sys
import pygame
from pygame.locals import *
from pygame.time import Clock
from time import sleep
import thread

def main():
	(mx, my, mw, mh) = (10, 10, 50, 50)
	pygame.init()
	# screen = pygame.display.set_mode((640, 480), FULLSCREEN, 32)
	screen = pygame.display.set_mode((640, 480), 0, 32)
	pygame.display.set_caption("level")
	clock = Clock()
	level = 0.0
	while True:
		# TIMING
		clock.tick(30)
		
		# PROCESS EVENTS
		for event in pygame.event.get():
			if event.type==QUIT or \
			(event.type == KEYDOWN and event.key == K_ESCAPE):
				sys.exit()
			elif event.type == KEYDOWN and event.key == K_f:
				pygame.display.toggle_fullscreen()
			elif event.type == USEREVENT:
				mdata = event.data
				level = event.data[0]
				# print mdata

		c = int(255 * level)	
		screen.fill((c, c, c))
		# if mfullimage: screen.blit(subimage, (mx, my))
		
		pygame.display.update()

# Start the PyGame display in it's own thread...
thread.start_new_thread(main, ())

# ... and then READ DATA FROM STDIN, sending data to PyGame as "events"
print "Press ctrl-c to stop..."
while 1:
	line = sys.stdin.readline()
	if (not line): break
	line = line.strip()
	try:
		if line.startswith("data:"):
			data = line.split()[1:]
			data = [float(x) for x in data] # parse data as floats
			# create a custom "event" and send it (post it) to the pygame loop
			evt = pygame.event.Event(USEREVENT, {"data" : data})
			pygame.event.post(evt)
	except ValueError:
		print "bad data", line