--- /dev/null
+#!/usr/bin/env python3
+
+import gi
+gi.require_version('GLib', '2.0')
+gi.require_version('ALSACtl', '0.0')
+gi.require_version('ALSARawmidi', '0.0')
+from gi.repository import GLib, ALSACtl, ALSARawmidi
+
+from sys import exit
+from signal import SIGINT
+
+# Open ALSA Rawmidi chracter device in which a pair of input/output substreams
+# is available in the same subdevice ID.
+for card_id in ALSACtl.get_card_id_list():
+ for device_id in ALSARawmidi.get_device_id_list(card_id):
+ outputs = ALSARawmidi.get_subdevice_id_list(card_id, device_id,
+ ALSARawmidi.StreamDirection.OUTPUT)
+ inputs = ALSARawmidi.get_subdevice_id_list(card_id, device_id,
+ ALSARawmidi.StreamDirection.OUTPUT)
+ for subdevice_id in outputs:
+ if subdevice_id in inputs:
+ pair = ALSARawmidi.StreamPair.new()
+ modes = (ALSARawmidi.StreamPairInfoFlag.OUTPUT |
+ ALSARawmidi.StreamPairInfoFlag.INPUT)
+ pair.open(card_id, device_id, subdevice_id, modes, 0)
+ break
+ else:
+ continue
+ break
+ else:
+ continue
+ break
+else:
+ print('ALSA Rawmidi devices not found.')
+ exit(1)
+
+# Retrieve the information of substream for both directions.
+for direction in (ALSARawmidi.StreamDirection.OUTPUT,
+ ALSARawmidi.StreamDirection.INPUT):
+ info = pair.get_substream_info(direction)
+ print('The information of {} substream:'.format(direction.value_nick))
+ print(' direction:', info.get_property('direction').value_nick)
+ for prop in ('device-id', 'subdevice-id', 'card-id', 'id', 'name',
+ 'subdevice-name', 'subdevices-count', 'subdevices-avail'):
+ print(' {}: {}'.format(prop, info.get_property(prop)))
+ print(' flags:', info.get_property('flags').value_nicks)
+
+# Configure the substream for both directions.
+params = ALSARawmidi.SubstreamParams.new()
+for direction in (ALSARawmidi.StreamDirection.OUTPUT,
+ ALSARawmidi.StreamDirection.INPUT):
+ params.set_property('buffer-size', 1024)
+ params.set_property('avail-min', 1)
+ params.set_property('active-sensing', False)
+ pair.set_substream_params(direction, params)
+ print('The parameter of {} substream.'.format(direction.value_nick))
+ for prop in ('buffer-size', 'avail-min', 'active-sensing'):
+ print(' {}: {}'.format(prop, params.get_property(prop)))
+
+# Register event handler.
+def handle_messages(pair):
+ print('Event:')
+ status = ALSARawmidi.SubstreamStatus.new()
+
+ status = pair.get_substream_status(ALSARawmidi.StreamDirection.INPUT, status)
+ avail = status.get_property('avail')
+ buf = pair.read_from_substream([0] * avail)
+ print(' {} bytes read.'.format(len(buf)))
+
+ status = pair.get_substream_status(ALSARawmidi.StreamDirection.OUTPUT, status)
+ avail = status.get_property('avail')
+ if avail < len(buf):
+ buf = buf[:avail]
+ pair.write_to_substream(buf)
+ print(' {} bytes written.'.format(len(buf)))
+pair.connect('handle-messages', handle_messages)
+
+# Create event dispatcher.
+dispatcher = GLib.MainLoop.new(None, False)
+src = pair.create_source()
+src.attach(dispatcher.get_context())
+
+# Register UNIX signal handler.
+def handle_unix_signal(dispatcher):
+ dispatcher.quit()
+GLib.unix_signal_add(GLib.PRIORITY_DEFAULT, SIGINT,
+ handle_unix_signal, dispatcher)
+
+# Start to dispatch events.
+dispatcher.run()
+
+# Clean intermediate buffers.
+pair.drain_substream(ALSARawmidi.StreamDirection.INPUT)
+pair.drain_substream(ALSARawmidi.StreamDirection.OUTPUT)