diff --git a/README.md b/README.md index 9796db5..08f3fff 100644 --- a/README.md +++ b/README.md @@ -2,12 +2,12 @@ OBS control from D-Bus messages -Send a D-Bus signal to perform an operation with [src/obsdc-signal], which takes a single operation name +Send a D-Bus signal to perform an operation with `src/obsdc-signal`, which takes a single operation name as an argument. ## Operations -See [src/main/kotlin/net/eksb/obsdc/Op] for a list of operations. +See [Op](src/main/kotlin/net/eksb/obsdc/Op.kt) for a list of operations. ## Configuration @@ -33,7 +33,16 @@ Configuration is in `$XDG_CONFIG_HOME/net.eksb.obsdc/config.properties` ## Code -Start reading [src/main/kotlin/net/eksb/obsdc/Main]. +Start at [Main](src/main/kotlin/net/eksb/obsdc/Main.kt). + +[Obs](src/main/kotlin/net/eksb/obsdc/Obs.kt) is a wrapper around +[obs-websocket-java](https://github.com/obs-websocket-community-projects/obs-websocket-java) +that handles reconnects and queuing operations until the websocket it ready. + +[OpRunner](src/main/kotlin/net/eksb/obsdc/OpRunner.kt) has the logic to run [Op](src/main/kotlin/net/eksb/obsdc/Op.kt)s +using an `Obs`. + +[DBus](src/main/kotlin/net/eksb/obsdc/DBus.kt) listens for the `D-Bus` signals and calls `OpRunner`. ## History/Rationale @@ -45,4 +54,4 @@ My [window manager](https://swaywm.org/) is configured to call `obsdc-signal` wi keys emitted by one knob and `PAN_RIGHT`/`PAN_LEFT` for the keys emitted by the other knob. The regular keys on the macropad are mapped to the other actions. -So now I can quickly control what people can see without having to focus (or even have visible) the OBS window. \ No newline at end of file +So now I can quickly control what people can see without having to focus (or even make visible) the OBS window. \ No newline at end of file diff --git a/src/main/kotlin/net/eksb/obsdc/DBus.kt b/src/main/kotlin/net/eksb/obsdc/DBus.kt index 1e28130..079713c 100644 --- a/src/main/kotlin/net/eksb/obsdc/DBus.kt +++ b/src/main/kotlin/net/eksb/obsdc/DBus.kt @@ -7,8 +7,6 @@ import org.freedesktop.dbus.interfaces.DBusInterface import org.freedesktop.dbus.interfaces.DBusSigHandler import org.freedesktop.dbus.messages.DBusSignal import org.slf4j.LoggerFactory -import java.util.concurrent.BlockingQueue -import java.util.concurrent.TimeUnit /** * Listen to signals on the session DBUS, and send the ops to [q]. @@ -20,7 +18,7 @@ import java.util.concurrent.TimeUnit * * `src/scripts/obsdc-signal` takes an [Op] name and sends the signal. */ -class DBus(private val q: BlockingQueue): AutoCloseable { +class DBus(private val opRunner:OpRunner): AutoCloseable { // To monitor DBUS: `dbus-monitor` // To see what is registered: `qdbus net.eksb.obsdc /` @@ -36,11 +34,7 @@ class DBus(private val q: BlockingQueue): AutoCloseable { log.debug("signal: ${signal.op}") val op = Op.valueOf(signal.op) log.debug("op: ${op}") - try { - q.offer(op, 1, TimeUnit.SECONDS) - } catch (e: InterruptedException) { - log.debug("queue offer interrupted") - } + opRunner.run(op) } } diff --git a/src/main/kotlin/net/eksb/obsdc/Gate.kt b/src/main/kotlin/net/eksb/obsdc/Gate.kt new file mode 100644 index 0000000..72cc76d --- /dev/null +++ b/src/main/kotlin/net/eksb/obsdc/Gate.kt @@ -0,0 +1,50 @@ +package net.eksb.obsdc + +import java.util.concurrent.atomic.AtomicBoolean +import java.util.concurrent.locks.Lock +import java.util.concurrent.locks.ReentrantLock + +/** + * A Gate blocks [enter] until the gate is opened by [open]. + */ +class Gate { + + private val lock:Lock = ReentrantLock() + private val open = AtomicBoolean(false) + private val condition = lock.newCondition() + + /** + * Open the gate; allow all waiting [enter]s to run. + */ + fun open() { + lock.lock() + try { + open.set(true) + condition.signalAll() + } finally { + lock.unlock() + } + } + + /** + * Close the gate; any subsequent calls to [enter] will block until [open] is called. + */ + fun close() { + open.set(false) + } + + /** + * Enter the gate: run the specified [block] as soon as the gate is open. + */ + fun enter(block:()->R): R { + lock.lock() + try { + while(!open.get()) { + condition.await() + } + return block() + } finally { + lock.unlock() + } + } +} \ No newline at end of file diff --git a/src/main/kotlin/net/eksb/obsdc/Main.kt b/src/main/kotlin/net/eksb/obsdc/Main.kt index 2e1bb71..6364f94 100644 --- a/src/main/kotlin/net/eksb/obsdc/Main.kt +++ b/src/main/kotlin/net/eksb/obsdc/Main.kt @@ -1,26 +1,18 @@ package net.eksb.obsdc -import java.util.concurrent.BlockingQueue -import java.util.concurrent.LinkedBlockingQueue - object Main { @JvmStatic fun main(args: Array) { - // Create a queue to send ops coming in from DBUS to OBS. - val q:BlockingQueue = LinkedBlockingQueue() - val config = CONFIG_FILE.properties() - - // Listen for DBUS signals and send to q. - DBus(q) - - // Send requests to OBS. Forks a non-daemon thread. Obs( - q, host = config.getProperty("host") ?: "localhost", port = config.getProperty("port")?.toInt() ?: 4455, password = config.getProperty("password") ?: error("config missing \"password\""), connectionTimeout = config.getProperty("connectionTimeout")?.toInt() ?: 5 - ) + ).use { obs -> + DBus(OpRunner(obs)).use { dbus -> + waitForShutdown() + } + } } } \ No newline at end of file diff --git a/src/main/kotlin/net/eksb/obsdc/Obs.kt b/src/main/kotlin/net/eksb/obsdc/Obs.kt index 761df42..9f6ba6b 100644 --- a/src/main/kotlin/net/eksb/obsdc/Obs.kt +++ b/src/main/kotlin/net/eksb/obsdc/Obs.kt @@ -3,45 +3,48 @@ package net.eksb.obsdc import io.obswebsocket.community.client.OBSRemoteController import io.obswebsocket.community.client.WebSocketCloseCode import io.obswebsocket.community.client.listener.lifecycle.ReasonThrowable -import io.obswebsocket.community.client.message.request.RequestBatch -import io.obswebsocket.community.client.message.request.sceneitems.GetSceneItemLockedRequest -import io.obswebsocket.community.client.message.response.sceneitems.GetSceneItemLockedResponse -import io.obswebsocket.community.client.model.Scene -import io.obswebsocket.community.client.model.SceneItem.Transform -import io.obswebsocket.community.client.model.SceneItem.Transform.TransformBuilder import java.util.concurrent.BlockingQueue import java.util.concurrent.atomic.AtomicBoolean import kotlin.concurrent.thread import org.slf4j.LoggerFactory +import java.util.concurrent.LinkedBlockingQueue +import kotlin.time.Duration /** - * Send ops from [q] to OBS. + * Wrapper for an [OBSRemoteController] that handles connecting, reconnecting, and queuing operations + * until ready. + * + * Call [submit] to submit a request. * * protocol docs: https://github.com/obsproject/obs-websocket/blob/master/docs/generated/protocol.md */ class Obs( - private val q:BlockingQueue, host:String = "localhost", port:Int = 4455, password:String, connectionTimeout:Int = 5 // seconds ): AutoCloseable { - /** How much to pan. */ - private val panAmount = 50F + /** Queue of requests to run. */ + private val q:BlockingQueue = LinkedBlockingQueue() + /** Backoff on errors. */ private val backoff = Backoff() - /** Set if this is closed. */ + /** Flag to set when closed to stop queue poll loop. */ private val closed = AtomicBoolean(false) - /** Set when connected, unset when disconnected. */ + /** + * Flag to set when connected, unset when disconnected. + * Used to determine if we should reconnect on controller error. + */ private val connected = AtomicBoolean(false) - /** Set when ready, unset on error or disconnect. */ - private val ready = AtomicBoolean(false) + /** Gate to block queue poll loop when not ready. */ + private val ready:Gate = Gate() - private val controller = OBSRemoteController.builder() + /** The OBS controller. */ + val controller:OBSRemoteController = OBSRemoteController.builder() .host(host) .port(port) .password(password) @@ -61,14 +64,16 @@ class Obs( .build() init { - Runtime.getRuntime().addShutdownHook(thread(start=false) { - close() - }) + // OBSRemoteController starts a non-daemon thread. It probably should not do that. + // Kill it on shutdown. + addShutdownHook { + controller.stop() + } } private fun onClose(e:WebSocketCloseCode) { log.debug("closed: ${e.code}") - ready.set(false) + ready.close() } private fun onControllerError(e:ReasonThrowable) { @@ -85,7 +90,6 @@ class Obs( } private fun onDisconnect() { log.debug("disconnected") - ready.set(false) connected.set(false) if (! closed.get()) { backoff.backoff() @@ -96,7 +100,7 @@ class Obs( private fun onReady() { log.debug("ready") - ready.set(true) + ready.open() // The docs say that you are only supposed to send requests from the [onReady] handler, // but you cannot block the [onReady] handler. // (If you block the [onReady] handler other handlers are not called. [onDisconnect] is not called so you @@ -106,129 +110,63 @@ class Obs( } /** - * Thread that polls [q], checks if ready, and calls [op] to send requests to OBS. - * If not ready, ops are dropped. + * Thread that runs submitted requests from [q] when [ready]. */ - private val opThread = thread(name="obs-op", isDaemon=false, start=true) { - while(true) { - val op = q.take() - log.debug("got op: ${op}") - if (ready.get()) { - try { - op(op) - } catch (e:InterruptedException) { - log.debug("op thread interrupted") - break - } catch (e:Exception) { - log.error("op ${op} failed", e ) - } - } else { - // This would be way more complicated if we had to buffer ops. - log.debug("skipping op ${op} because not yet ready") - } - } - } - - /** - * Send the request to OBS for the op. - * - * When the op requires multiple chained requests, those requests are made here in response handlers, - * and this method blocks until all requests are complete. - */ - private fun op(op:Op) { - when(op) { - Op.SCENE_1 -> scene { scenes -> scenes.firstOrNull() } - Op.SCENE_2 -> scene { scenes -> scenes.asSequence().drop(1).firstOrNull() } - Op.SCENE_3 -> scene { scenes -> scenes.asSequence().drop(2).firstOrNull() } - Op.STUDIO_TRANSITION -> { - controller.triggerStudioModeTransition { response -> - log.debug("studio transitioned: ${response.isSuccessful}") - } - } - Op.PAN_UP -> transform { old -> positionY(old.positionY - panAmount ) } - Op.PAN_DOWN -> transform { old -> positionY(old.positionY + panAmount ) } - Op.PAN_LEFT -> transform { old -> positionX(old.positionX - panAmount ) } - Op.PAN_RIGHT -> transform { old -> positionX(old.positionX + panAmount ) } - } - } - - /** - * Select a scene from the scene list with the supplied [selector] and set the selected scene (if any) - * as the current program scene. - */ - private fun scene(selector:(List)->Scene?) { - controller.getSceneList { response -> - val scene = selector(response.scenes.sortedBy(Scene::getSceneIndex).reversed()) - log.debug("select scene ${scene?.sceneName} index:${scene?.sceneIndex}") - if (scene != null) { - controller.setCurrentProgramScene(scene.sceneName) { response -> - log.debug("selected scene ${scene.sceneName}: ${response.isSuccessful}") - } - } - } - } - - /** - * Generate a transform for the lowest non-locked item in the current program scene with the - * supplied [transformBuilder], and apply that transform to the item. - */ - private fun transform(transformBuilder:TransformBuilder.(Transform)->TransformBuilder) { - controller.getCurrentProgramScene { response -> - val sceneName = response.currentProgramSceneName - log.debug("scene name: ${sceneName}") - controller.getSceneItemList(sceneName) { response -> - val items = response.sceneItems - // Even though locked status is in the response from OBS, the library does not parse it. - // So we have to ask for it explicitly: - controller.sendRequestBatch( - RequestBatch.builder() - .requests( - response.sceneItems.map { item -> - GetSceneItemLockedRequest.builder() - .sceneName(sceneName) - .sceneItemId(item.sceneItemId) - .build() - } - ) - .build() - ) { response -> - val item = response.data.results - .map { result -> - (result.responseData as GetSceneItemLockedResponse.SpecificData).sceneItemLocked - } - .zip(items) - .asSequence() - .filter { (locked,item) -> ! locked } - .map { (locked,item) -> item } - .sortedBy { item -> item.sceneItemIndex } - .firstOrNull() - log.debug("item to pan: ${item?.sceneItemId}") - if (item != null) { - controller.getSceneItemTransform(sceneName, item.sceneItemId) { response -> - val transform = response.sceneItemTransform - log.debug("position: ${transform.positionX} x ${transform.positionY}") - val newTransform = transformBuilder(Transform.builder(), transform).build() - controller.setSceneItemTransform(sceneName, item.sceneItemId, newTransform) { response -> - log.debug("transform successful: ${response.isSuccessful}") - // Have to set the current scene to take effect if in studio mode. - controller.setCurrentProgramScene(sceneName) { response -> - log.debug("set current program to ${sceneName}: ${response.isSuccessful}") - } - } - } + private val opThread = thread(name="obs-op", isDaemon=true, start=true) { + while(!closed.get()) { + val req = q.take() + log.debug("got req: ${req}, wait for ready") + ready.enter { + log.debug("ready") + if (!req.expired()) { + try { + req.block.invoke(controller) + } catch (e:InterruptedException) { + log.debug("interrupted") + throw e + } catch (e:Exception) { + log.error("req ${req} failed", e ) } } } } + log.debug("done") + } + + /** + * Submit a request to run when ready. + * + * @param timeout If this time has elapsed before ready, do not run. Always run if null. + * @param block the request to run + */ + fun submit( + timeout:Duration? = null, + block:(OBSRemoteController)->Unit, + ) { + q.put(Req(block, timeout?.inWholeNanoseconds)) } override fun close() { log.debug("close") - controller.disconnect() + closed.set(true) opThread.interrupt() + controller.disconnect() + controller.stop() } companion object { private val log = LoggerFactory.getLogger(Obs::class.java) + + /** + * Wrap a request and keep track of timeout. + */ + private class Req( + val block:(OBSRemoteController)->Unit, + val timeout:Long?, + ) { + val submitTime = System.nanoTime() + fun expired():Boolean = timeout != null && System.nanoTime() - submitTime > timeout + } } } + diff --git a/src/main/kotlin/net/eksb/obsdc/OpRunner.kt b/src/main/kotlin/net/eksb/obsdc/OpRunner.kt new file mode 100644 index 0000000..bb9a2a9 --- /dev/null +++ b/src/main/kotlin/net/eksb/obsdc/OpRunner.kt @@ -0,0 +1,116 @@ +package net.eksb.obsdc + +import io.obswebsocket.community.client.message.request.RequestBatch +import io.obswebsocket.community.client.message.request.sceneitems.GetSceneItemLockedRequest +import io.obswebsocket.community.client.message.response.sceneitems.GetSceneItemLockedResponse +import io.obswebsocket.community.client.model.Scene +import io.obswebsocket.community.client.model.SceneItem.Transform +import io.obswebsocket.community.client.model.SceneItem.Transform.TransformBuilder +import org.slf4j.LoggerFactory + +/** + * Use an [obs] to run [Op]s. + * + * Call [run] to run an OBS operation. + */ +class OpRunner(private val obs:Obs) { + + /** How much to pan. */ + private val panAmount = 50F + + private val controller = obs.controller + + fun run(op:Op) { + obs.submit { controller -> + when(op) { + Op.SCENE_1 -> scene { scenes -> scenes.firstOrNull() } + Op.SCENE_2 -> scene { scenes -> scenes.asSequence().drop(1).firstOrNull() } + Op.SCENE_3 -> scene { scenes -> scenes.asSequence().drop(2).firstOrNull() } + Op.STUDIO_TRANSITION -> { + controller.triggerStudioModeTransition { response -> + log.debug("studio transitioned: ${response.isSuccessful}") + } + } + Op.PAN_UP -> transform { old -> positionY(old.positionY - panAmount ) } + Op.PAN_DOWN -> transform { old -> positionY(old.positionY + panAmount ) } + Op.PAN_LEFT -> transform { old -> positionX(old.positionX - panAmount ) } + Op.PAN_RIGHT -> transform { old -> positionX(old.positionX + panAmount ) } + } + } + } + + /** + * Select a scene from the scene list with the supplied [selector] and set the selected scene (if any) + * as the current program scene. + */ + private fun scene(selector:(List)->Scene?) { + controller.getSceneList { response -> + val scene = selector(response.scenes.sortedBy(Scene::getSceneIndex).reversed()) + log.debug("select scene ${scene?.sceneName} index:${scene?.sceneIndex}") + if (scene != null) { + controller.setCurrentProgramScene(scene.sceneName) { response -> + log.debug("selected scene ${scene.sceneName}: ${response.isSuccessful}") + } + } + } + } + + /** + * Generate a transform for the lowest non-locked item in the current program scene with the + * supplied [transformBuilder], and apply that transform to the item. + */ + private fun transform(transformBuilder:TransformBuilder.(Transform)->TransformBuilder) { + controller.getCurrentProgramScene { response -> + val sceneName = response.currentProgramSceneName + log.debug("scene name: ${sceneName}") + controller.getSceneItemList(sceneName) { response -> + val items = response.sceneItems + // Even though locked status is in the response from OBS, the library does not parse it. + // So we have to ask for it explicitly: + controller.sendRequestBatch( + RequestBatch.builder() + .requests( + response.sceneItems.map { item -> + GetSceneItemLockedRequest.builder() + .sceneName(sceneName) + .sceneItemId(item.sceneItemId) + .build() + } + ) + .build() + ) { response -> + val item = response.data.results + .map { result -> + (result.responseData as GetSceneItemLockedResponse.SpecificData).sceneItemLocked + } + .zip(items) + .asSequence() + .filter { (locked,item) -> ! locked } + .map { (locked,item) -> item } + .sortedBy { item -> item.sceneItemIndex } + .firstOrNull() + log.debug("item to pan: ${item?.sceneItemId}") + if (item != null) { + controller.getSceneItemTransform(sceneName, item.sceneItemId) { response -> + val transform = response.sceneItemTransform + log.debug("position: ${transform.positionX} x ${transform.positionY}") + val newTransform = transformBuilder(Transform.builder(), transform).build() + controller.setSceneItemTransform(sceneName, item.sceneItemId, newTransform) { response -> + log.debug("transform successful: ${response.isSuccessful}") + // Have to set the current scene to take effect if in studio mode. + controller.setCurrentProgramScene(sceneName) { response -> + log.debug("set current program to ${sceneName}: ${response.isSuccessful}") + } + } + } + } + } + } + } + } + + companion object { + private val log = LoggerFactory.getLogger(OpRunner::class.java) + } +} + diff --git a/src/main/kotlin/net/eksb/obsdc/Util.kt b/src/main/kotlin/net/eksb/obsdc/Util.kt index 1c08f24..15133dd 100644 --- a/src/main/kotlin/net/eksb/obsdc/Util.kt +++ b/src/main/kotlin/net/eksb/obsdc/Util.kt @@ -2,6 +2,8 @@ package net.eksb.obsdc import java.io.File import java.util.Properties +import java.util.concurrent.CountDownLatch +import kotlin.concurrent.thread val HOME:File = System.getProperty("user.home")?.let(::File) ?: error("No user.home") val CONFIG_HOME:File = System.getenv("XDG_CONFIG_HOME")?.let(::File) ?: File(HOME, ".config") @@ -12,4 +14,14 @@ fun File.properties(): Properties = Properties() if (isFile) { inputStream().use(properties::load) } - } \ No newline at end of file + } + +fun addShutdownHook(block:()->Unit) = Runtime.getRuntime().addShutdownHook(thread(start=false) { block() }) + +fun waitForShutdown() { + val latch = CountDownLatch(1) + addShutdownHook { + latch.countDown() + } + latch.await() +} \ No newline at end of file diff --git a/src/main/resources/simplelogger.properties b/src/main/resources/simplelogger.properties new file mode 100644 index 0000000..a5749f8 --- /dev/null +++ b/src/main/resources/simplelogger.properties @@ -0,0 +1 @@ +org.slf4j.simpleLogger.log.net.eksb=debug