--- /dev/null
+#!/usr/bin/env python3
+
+import gi
+gi.require_version('GLib', '2.0')
+gi.require_version('ALSATimer', '0.0')
+gi.require_version('ALSASeq', '0.0')
+from gi.repository import GLib, ALSATimer, ALSASeq
+
+from signal import SIGINT
+
+# Register my client.
+client = ALSASeq.UserClient.new()
+client.open(0)
+
+# Register client information.
+info = ALSASeq.ClientInfo.new()
+info.set_property('name', 'focal')
+client.set_info(info)
+
+# Dump client information.
+info = client.get_info(info)
+print('The information of client:')
+print(' type: {}'.format(info.get_property('type').value_nick))
+print(' filter-attributes: {}'.format(info.get_property('filter-attributes').value_nicks))
+for prop in ('client-id', 'name', 'use-filter', 'port-count', 'lost-count',
+ 'card-id', 'process-id'):
+ print(' {}: {}'.format(prop, info.get_property(prop)))
+
+# Add my port.
+info = ALSASeq.PortInfo.new()
+info.set_property('name', 'fossa')
+caps = (ALSASeq.PortCapFlag.READ |
+ ALSASeq.PortCapFlag.WRITE |
+ ALSASeq.PortCapFlag.DUPLEX |
+ ALSASeq.PortCapFlag.SUBS_READ |
+ ALSASeq.PortCapFlag.SUBS_WRITE)
+info.set_property('caps', caps)
+attrs = (ALSASeq.PortAttrFlag.MIDI_GENERIC |
+ ALSASeq.PortAttrFlag.SOFTWARE |
+ ALSASeq.PortAttrFlag.APPLICATION)
+info.set_property('attrs', attrs)
+info = client.create_port(info)
+
+# Dump port information.
+print('The information of port:')
+for prop in ('name', 'midi-channels', 'midi-voices', 'synth-voices',
+ 'read-users', 'write-users', 'timestamp-overwrite', 'queue-id'):
+ print(' {}: {}'.format(prop, info.get_property(prop)))
+for prop in ('caps', 'attrs'):
+ print(' {}: {}'.format(prop, info.get_property(prop).value_nicks))
+addr = info.get_property('addr')
+print(' addr:', addr.get_client_id(), addr.get_port_id())
+port_id = addr.get_port_id()
+
+# Register my queue.
+info = ALSASeq.QueueInfo.new()
+info.set_property('name', '20.04')
+info.set_property('client-id', client.get_property('client-id'))
+info.set_property('locked', True)
+info = client.create_queue(info)
+
+# Dump queue information.
+print('The information of queue:')
+for prop in ('name', 'queue-id', 'client-id', 'locked'):
+ print(' {}: {}'.format(prop, info.get_property(prop)))
+queue_id = info.get_property('queue-id')
+
+# Prepare two events; one to start the queue, another to deliver on the queue.
+ev_cntr = ALSASeq.EventCntr.new(2)
+
+ev_cntr.set_event_type(0, ALSASeq.EventType.START)
+ev_cntr.set_tstamp_mode(0, ALSASeq.EventTimestampMode.REAL)
+ev_cntr.set_time_mode(0, ALSASeq.EventTimeMode.REL)
+ev_cntr.set_priority_mode(0, ALSASeq.EventPriorityMode.NORMAL)
+ev_cntr.set_tag(0, 0)
+ev_cntr.set_queue_id(0, ALSASeq.SpecificQueueId.DIRECT)
+ev_cntr.set_dst(0, ALSASeq.Addr.new(ALSASeq.SpecificClientId.SYSTEM,
+ ALSASeq.SpecificPortId.TIMER));
+ev_cntr.set_src(0, ALSASeq.Addr.new(client.get_property('client-id'), 0));
+data = ev_cntr.get_queue_data(0)
+data.set_queue_id(queue_id)
+ev_cntr.set_queue_data(0, data)
+
+ev_cntr.set_event_type(1, ALSASeq.EventType.CONTROLLER)
+ev_cntr.set_tstamp_mode(1, ALSASeq.EventTimestampMode.REAL)
+ev_cntr.set_time_mode(1, ALSASeq.EventTimeMode.REL)
+ev_cntr.set_priority_mode(1, ALSASeq.EventPriorityMode.NORMAL)
+ev_cntr.set_tag(1, 10)
+ev_cntr.set_queue_id(1, queue_id)
+ev_cntr.set_dst(1, ALSASeq.Addr.new(client.get_property('client-id'), 0));
+ev_cntr.set_src(1, ALSASeq.Addr.new(client.get_property('client-id'), 0));
+
+# Schedule the events.
+client.schedule_event(ev_cntr, 2)
+
+# Register event handler.
+def handle_event(client, ev_cntr):
+ count = ev_cntr.count_events()
+ print('{} events:'.format(count))
+ for i in range(count):
+ print(' Event {}:'.format(i))
+ print(' type:', ev_cntr.get_event_type(i).value_nick)
+ print(' tstamp-mode:', ev_cntr.get_tstamp_mode(i).value_nick)
+ print(' time-mode:', ev_cntr.get_time_mode(i).value_nick)
+ print(' length-mode:', ev_cntr.get_length_mode(i).value_nick)
+ print(' priority-mode:', ev_cntr.get_priority_mode(i).value_nick)
+ print(' tag:', ev_cntr.get_tag(i))
+ print(' queue-id:', ev_cntr.get_queue_id(i))
+ tstamp = ev_cntr.get_tstamp(i)
+ if ev_cntr.get_tstamp_mode(i) == ALSASeq.EventTimestampMode.TICK:
+ print(' tstamp:', tstamp.get_tick_time())
+ else:
+ print(' tstamp:', tstamp.get_real_time())
+ dst = ev_cntr.get_dst(i)
+ print(' dst:', dst.get_client_id(), dst.get_port_id())
+ src = ev_cntr.get_src(i)
+ print(' src:', src.get_client_id(), src.get_port_id())
+client.connect('handle-event', handle_event)
+
+# Create event dispatcher.
+dispatcher = GLib.MainLoop.new(None, False)
+src = client.create_source()
+src.attach(dispatcher.get_context())
+
+# Register UNIX signal handler.
+def handle_unix_signal(dispatcher):
+ dispatcher.quit()
+GLib.unix_signal_add(GLib.PRIORITY_DEFAULT, SIGINT,
+ handle_unix_signal, dispatcher)
+
+# Start to dispatch events.
+dispatcher.run()
+
+# Stop the queue.
+ev_cntr.set_event_type(0, ALSASeq.EventType.STOP)
+client.schedule_event(ev_cntr, 1)
+
+# Delete the queue.
+client.delete_queue(queue_id)
+
+# Delete the port
+client.delete_port(port_id)