Separate OBS retry/queue logic (Obs) from ops (OpRunner).

This commit is contained in:
2023-10-22 10:45:04 -04:00
parent 917af288ab
commit ee173f7eae
8 changed files with 269 additions and 157 deletions

View File

@@ -2,12 +2,12 @@
OBS control from D-Bus messages OBS control from D-Bus messages
Send a D-Bus signal to perform an operation with [src/obsdc-signal], which takes a single operation name Send a D-Bus signal to perform an operation with `src/obsdc-signal`, which takes a single operation name
as an argument. as an argument.
## Operations ## Operations
See [src/main/kotlin/net/eksb/obsdc/Op] for a list of operations. See [Op](src/main/kotlin/net/eksb/obsdc/Op.kt) for a list of operations.
## Configuration ## Configuration
@@ -33,7 +33,16 @@ Configuration is in `$XDG_CONFIG_HOME/net.eksb.obsdc/config.properties`
## Code ## Code
Start reading [src/main/kotlin/net/eksb/obsdc/Main]. Start at [Main](src/main/kotlin/net/eksb/obsdc/Main.kt).
[Obs](src/main/kotlin/net/eksb/obsdc/Obs.kt) is a wrapper around
[obs-websocket-java](https://github.com/obs-websocket-community-projects/obs-websocket-java)
that handles reconnects and queuing operations until the websocket it ready.
[OpRunner](src/main/kotlin/net/eksb/obsdc/OpRunner.kt) has the logic to run [Op](src/main/kotlin/net/eksb/obsdc/Op.kt)s
using an `Obs`.
[DBus](src/main/kotlin/net/eksb/obsdc/DBus.kt) listens for the `D-Bus` signals and calls `OpRunner`.
## History/Rationale ## History/Rationale
@@ -45,4 +54,4 @@ My [window manager](https://swaywm.org/) is configured to call `obsdc-signal` wi
keys emitted by one knob and `PAN_RIGHT`/`PAN_LEFT` for the keys emitted by the other knob. keys emitted by one knob and `PAN_RIGHT`/`PAN_LEFT` for the keys emitted by the other knob.
The regular keys on the macropad are mapped to the other actions. The regular keys on the macropad are mapped to the other actions.
So now I can quickly control what people can see without having to focus (or even have visible) the OBS window. So now I can quickly control what people can see without having to focus (or even make visible) the OBS window.

View File

@@ -7,8 +7,6 @@ import org.freedesktop.dbus.interfaces.DBusInterface
import org.freedesktop.dbus.interfaces.DBusSigHandler import org.freedesktop.dbus.interfaces.DBusSigHandler
import org.freedesktop.dbus.messages.DBusSignal import org.freedesktop.dbus.messages.DBusSignal
import org.slf4j.LoggerFactory import org.slf4j.LoggerFactory
import java.util.concurrent.BlockingQueue
import java.util.concurrent.TimeUnit
/** /**
* Listen to signals on the session DBUS, and send the ops to [q]. * Listen to signals on the session DBUS, and send the ops to [q].
@@ -20,7 +18,7 @@ import java.util.concurrent.TimeUnit
* *
* `src/scripts/obsdc-signal` takes an [Op] name and sends the signal. * `src/scripts/obsdc-signal` takes an [Op] name and sends the signal.
*/ */
class DBus(private val q: BlockingQueue<Op>): AutoCloseable { class DBus(private val opRunner:OpRunner): AutoCloseable {
// To monitor DBUS: `dbus-monitor` // To monitor DBUS: `dbus-monitor`
// To see what is registered: `qdbus net.eksb.obsdc /` // To see what is registered: `qdbus net.eksb.obsdc /`
@@ -36,11 +34,7 @@ class DBus(private val q: BlockingQueue<Op>): AutoCloseable {
log.debug("signal: ${signal.op}") log.debug("signal: ${signal.op}")
val op = Op.valueOf(signal.op) val op = Op.valueOf(signal.op)
log.debug("op: ${op}") log.debug("op: ${op}")
try { opRunner.run(op)
q.offer(op, 1, TimeUnit.SECONDS)
} catch (e: InterruptedException) {
log.debug("queue offer interrupted")
}
} }
} }

View File

@@ -0,0 +1,50 @@
package net.eksb.obsdc
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.locks.Lock
import java.util.concurrent.locks.ReentrantLock
/**
* A Gate blocks [enter] until the gate is opened by [open].
*/
class Gate {
private val lock:Lock = ReentrantLock()
private val open = AtomicBoolean(false)
private val condition = lock.newCondition()
/**
* Open the gate; allow all waiting [enter]s to run.
*/
fun open() {
lock.lock()
try {
open.set(true)
condition.signalAll()
} finally {
lock.unlock()
}
}
/**
* Close the gate; any subsequent calls to [enter] will block until [open] is called.
*/
fun close() {
open.set(false)
}
/**
* Enter the gate: run the specified [block] as soon as the gate is open.
*/
fun <R> enter(block:()->R): R {
lock.lock()
try {
while(!open.get()) {
condition.await()
}
return block()
} finally {
lock.unlock()
}
}
}

View File

@@ -1,26 +1,18 @@
package net.eksb.obsdc package net.eksb.obsdc
import java.util.concurrent.BlockingQueue
import java.util.concurrent.LinkedBlockingQueue
object Main { object Main {
@JvmStatic @JvmStatic
fun main(args: Array<String>) { fun main(args: Array<String>) {
// Create a queue to send ops coming in from DBUS to OBS.
val q:BlockingQueue<Op> = LinkedBlockingQueue()
val config = CONFIG_FILE.properties() val config = CONFIG_FILE.properties()
// Listen for DBUS signals and send to q.
DBus(q)
// Send requests to OBS. Forks a non-daemon thread.
Obs( Obs(
q,
host = config.getProperty("host") ?: "localhost", host = config.getProperty("host") ?: "localhost",
port = config.getProperty("port")?.toInt() ?: 4455, port = config.getProperty("port")?.toInt() ?: 4455,
password = config.getProperty("password") ?: error("config missing \"password\""), password = config.getProperty("password") ?: error("config missing \"password\""),
connectionTimeout = config.getProperty("connectionTimeout")?.toInt() ?: 5 connectionTimeout = config.getProperty("connectionTimeout")?.toInt() ?: 5
) ).use { obs ->
DBus(OpRunner(obs)).use { dbus ->
waitForShutdown()
}
}
} }
} }

View File

@@ -3,45 +3,48 @@ package net.eksb.obsdc
import io.obswebsocket.community.client.OBSRemoteController import io.obswebsocket.community.client.OBSRemoteController
import io.obswebsocket.community.client.WebSocketCloseCode import io.obswebsocket.community.client.WebSocketCloseCode
import io.obswebsocket.community.client.listener.lifecycle.ReasonThrowable import io.obswebsocket.community.client.listener.lifecycle.ReasonThrowable
import io.obswebsocket.community.client.message.request.RequestBatch
import io.obswebsocket.community.client.message.request.sceneitems.GetSceneItemLockedRequest
import io.obswebsocket.community.client.message.response.sceneitems.GetSceneItemLockedResponse
import io.obswebsocket.community.client.model.Scene
import io.obswebsocket.community.client.model.SceneItem.Transform
import io.obswebsocket.community.client.model.SceneItem.Transform.TransformBuilder
import java.util.concurrent.BlockingQueue import java.util.concurrent.BlockingQueue
import java.util.concurrent.atomic.AtomicBoolean import java.util.concurrent.atomic.AtomicBoolean
import kotlin.concurrent.thread import kotlin.concurrent.thread
import org.slf4j.LoggerFactory import org.slf4j.LoggerFactory
import java.util.concurrent.LinkedBlockingQueue
import kotlin.time.Duration
/** /**
* Send ops from [q] to OBS. * Wrapper for an [OBSRemoteController] that handles connecting, reconnecting, and queuing operations
* until ready.
*
* Call [submit] to submit a request.
* *
* protocol docs: https://github.com/obsproject/obs-websocket/blob/master/docs/generated/protocol.md * protocol docs: https://github.com/obsproject/obs-websocket/blob/master/docs/generated/protocol.md
*/ */
class Obs( class Obs(
private val q:BlockingQueue<Op>,
host:String = "localhost", host:String = "localhost",
port:Int = 4455, port:Int = 4455,
password:String, password:String,
connectionTimeout:Int = 5 // seconds connectionTimeout:Int = 5 // seconds
): AutoCloseable { ): AutoCloseable {
/** How much to pan. */ /** Queue of requests to run. */
private val panAmount = 50F private val q:BlockingQueue<Req> = LinkedBlockingQueue()
/** Backoff on errors. */
private val backoff = Backoff() private val backoff = Backoff()
/** Set if this is closed. */ /** Flag to set when closed to stop queue poll loop. */
private val closed = AtomicBoolean(false) private val closed = AtomicBoolean(false)
/** Set when connected, unset when disconnected. */ /**
* Flag to set when connected, unset when disconnected.
* Used to determine if we should reconnect on controller error.
*/
private val connected = AtomicBoolean(false) private val connected = AtomicBoolean(false)
/** Set when ready, unset on error or disconnect. */ /** Gate to block queue poll loop when not ready. */
private val ready = AtomicBoolean(false) private val ready:Gate = Gate()
private val controller = OBSRemoteController.builder() /** The OBS controller. */
val controller:OBSRemoteController = OBSRemoteController.builder()
.host(host) .host(host)
.port(port) .port(port)
.password(password) .password(password)
@@ -61,14 +64,16 @@ class Obs(
.build() .build()
init { init {
Runtime.getRuntime().addShutdownHook(thread(start=false) { // OBSRemoteController starts a non-daemon thread. It probably should not do that.
close() // Kill it on shutdown.
}) addShutdownHook {
controller.stop()
}
} }
private fun onClose(e:WebSocketCloseCode) { private fun onClose(e:WebSocketCloseCode) {
log.debug("closed: ${e.code}") log.debug("closed: ${e.code}")
ready.set(false) ready.close()
} }
private fun onControllerError(e:ReasonThrowable) { private fun onControllerError(e:ReasonThrowable) {
@@ -85,7 +90,6 @@ class Obs(
} }
private fun onDisconnect() { private fun onDisconnect() {
log.debug("disconnected") log.debug("disconnected")
ready.set(false)
connected.set(false) connected.set(false)
if (! closed.get()) { if (! closed.get()) {
backoff.backoff() backoff.backoff()
@@ -96,7 +100,7 @@ class Obs(
private fun onReady() { private fun onReady() {
log.debug("ready") log.debug("ready")
ready.set(true) ready.open()
// The docs say that you are only supposed to send requests from the [onReady] handler, // The docs say that you are only supposed to send requests from the [onReady] handler,
// but you cannot block the [onReady] handler. // but you cannot block the [onReady] handler.
// (If you block the [onReady] handler other handlers are not called. [onDisconnect] is not called so you // (If you block the [onReady] handler other handlers are not called. [onDisconnect] is not called so you
@@ -106,129 +110,63 @@ class Obs(
} }
/** /**
* Thread that polls [q], checks if ready, and calls [op] to send requests to OBS. * Thread that runs submitted requests from [q] when [ready].
* If not ready, ops are dropped.
*/ */
private val opThread = thread(name="obs-op", isDaemon=false, start=true) { private val opThread = thread(name="obs-op", isDaemon=true, start=true) {
while(true) { while(!closed.get()) {
val op = q.take() val req = q.take()
log.debug("got op: ${op}") log.debug("got req: ${req}, wait for ready")
if (ready.get()) { ready.enter {
log.debug("ready")
if (!req.expired()) {
try { try {
op(op) req.block.invoke(controller)
} catch (e:InterruptedException) { } catch (e:InterruptedException) {
log.debug("op thread interrupted") log.debug("interrupted")
break throw e
} catch (e:Exception) { } catch (e:Exception) {
log.error("op ${op} failed", e ) log.error("req ${req} failed", e )
}
} else {
// This would be way more complicated if we had to buffer ops.
log.debug("skipping op ${op} because not yet ready")
} }
} }
} }
}
log.debug("done")
}
/** /**
* Send the request to OBS for the op. * Submit a request to run when ready.
* *
* When the op requires multiple chained requests, those requests are made here in response handlers, * @param timeout If this time has elapsed before ready, do not run. Always run if null.
* and this method blocks until all requests are complete. * @param block the request to run
*/ */
private fun op(op:Op) { fun submit(
when(op) { timeout:Duration? = null,
Op.SCENE_1 -> scene { scenes -> scenes.firstOrNull() } block:(OBSRemoteController)->Unit,
Op.SCENE_2 -> scene { scenes -> scenes.asSequence().drop(1).firstOrNull() } ) {
Op.SCENE_3 -> scene { scenes -> scenes.asSequence().drop(2).firstOrNull() } q.put(Req(block, timeout?.inWholeNanoseconds))
Op.STUDIO_TRANSITION -> {
controller.triggerStudioModeTransition { response ->
log.debug("studio transitioned: ${response.isSuccessful}")
}
}
Op.PAN_UP -> transform { old -> positionY(old.positionY - panAmount ) }
Op.PAN_DOWN -> transform { old -> positionY(old.positionY + panAmount ) }
Op.PAN_LEFT -> transform { old -> positionX(old.positionX - panAmount ) }
Op.PAN_RIGHT -> transform { old -> positionX(old.positionX + panAmount ) }
}
}
/**
* Select a scene from the scene list with the supplied [selector] and set the selected scene (if any)
* as the current program scene.
*/
private fun scene(selector:(List<Scene>)->Scene?) {
controller.getSceneList { response ->
val scene = selector(response.scenes.sortedBy(Scene::getSceneIndex).reversed())
log.debug("select scene ${scene?.sceneName} index:${scene?.sceneIndex}")
if (scene != null) {
controller.setCurrentProgramScene(scene.sceneName) { response ->
log.debug("selected scene ${scene.sceneName}: ${response.isSuccessful}")
}
}
}
}
/**
* Generate a transform for the lowest non-locked item in the current program scene with the
* supplied [transformBuilder], and apply that transform to the item.
*/
private fun transform(transformBuilder:TransformBuilder.(Transform)->TransformBuilder) {
controller.getCurrentProgramScene { response ->
val sceneName = response.currentProgramSceneName
log.debug("scene name: ${sceneName}")
controller.getSceneItemList(sceneName) { response ->
val items = response.sceneItems
// Even though locked status is in the response from OBS, the library does not parse it.
// So we have to ask for it explicitly:
controller.sendRequestBatch(
RequestBatch.builder()
.requests(
response.sceneItems.map { item ->
GetSceneItemLockedRequest.builder()
.sceneName(sceneName)
.sceneItemId(item.sceneItemId)
.build()
}
)
.build()
) { response ->
val item = response.data.results
.map { result ->
(result.responseData as GetSceneItemLockedResponse.SpecificData).sceneItemLocked
}
.zip(items)
.asSequence()
.filter { (locked,item) -> ! locked }
.map { (locked,item) -> item }
.sortedBy { item -> item.sceneItemIndex }
.firstOrNull()
log.debug("item to pan: ${item?.sceneItemId}")
if (item != null) {
controller.getSceneItemTransform(sceneName, item.sceneItemId) { response ->
val transform = response.sceneItemTransform
log.debug("position: ${transform.positionX} x ${transform.positionY}")
val newTransform = transformBuilder(Transform.builder(), transform).build()
controller.setSceneItemTransform(sceneName, item.sceneItemId, newTransform) { response ->
log.debug("transform successful: ${response.isSuccessful}")
// Have to set the current scene to take effect if in studio mode.
controller.setCurrentProgramScene(sceneName) { response ->
log.debug("set current program to ${sceneName}: ${response.isSuccessful}")
}
}
}
}
}
}
}
} }
override fun close() { override fun close() {
log.debug("close") log.debug("close")
controller.disconnect() closed.set(true)
opThread.interrupt() opThread.interrupt()
controller.disconnect()
controller.stop()
} }
companion object { companion object {
private val log = LoggerFactory.getLogger(Obs::class.java) private val log = LoggerFactory.getLogger(Obs::class.java)
/**
* Wrap a request and keep track of timeout.
*/
private class Req(
val block:(OBSRemoteController)->Unit,
val timeout:Long?,
) {
val submitTime = System.nanoTime()
fun expired():Boolean = timeout != null && System.nanoTime() - submitTime > timeout
} }
} }
}

View File

@@ -0,0 +1,116 @@
package net.eksb.obsdc
import io.obswebsocket.community.client.message.request.RequestBatch
import io.obswebsocket.community.client.message.request.sceneitems.GetSceneItemLockedRequest
import io.obswebsocket.community.client.message.response.sceneitems.GetSceneItemLockedResponse
import io.obswebsocket.community.client.model.Scene
import io.obswebsocket.community.client.model.SceneItem.Transform
import io.obswebsocket.community.client.model.SceneItem.Transform.TransformBuilder
import org.slf4j.LoggerFactory
/**
* Use an [obs] to run [Op]s.
*
* Call [run] to run an OBS operation.
*/
class OpRunner(private val obs:Obs) {
/** How much to pan. */
private val panAmount = 50F
private val controller = obs.controller
fun run(op:Op) {
obs.submit { controller ->
when(op) {
Op.SCENE_1 -> scene { scenes -> scenes.firstOrNull() }
Op.SCENE_2 -> scene { scenes -> scenes.asSequence().drop(1).firstOrNull() }
Op.SCENE_3 -> scene { scenes -> scenes.asSequence().drop(2).firstOrNull() }
Op.STUDIO_TRANSITION -> {
controller.triggerStudioModeTransition { response ->
log.debug("studio transitioned: ${response.isSuccessful}")
}
}
Op.PAN_UP -> transform { old -> positionY(old.positionY - panAmount ) }
Op.PAN_DOWN -> transform { old -> positionY(old.positionY + panAmount ) }
Op.PAN_LEFT -> transform { old -> positionX(old.positionX - panAmount ) }
Op.PAN_RIGHT -> transform { old -> positionX(old.positionX + panAmount ) }
}
}
}
/**
* Select a scene from the scene list with the supplied [selector] and set the selected scene (if any)
* as the current program scene.
*/
private fun scene(selector:(List<Scene>)->Scene?) {
controller.getSceneList { response ->
val scene = selector(response.scenes.sortedBy(Scene::getSceneIndex).reversed())
log.debug("select scene ${scene?.sceneName} index:${scene?.sceneIndex}")
if (scene != null) {
controller.setCurrentProgramScene(scene.sceneName) { response ->
log.debug("selected scene ${scene.sceneName}: ${response.isSuccessful}")
}
}
}
}
/**
* Generate a transform for the lowest non-locked item in the current program scene with the
* supplied [transformBuilder], and apply that transform to the item.
*/
private fun transform(transformBuilder:TransformBuilder.(Transform)->TransformBuilder) {
controller.getCurrentProgramScene { response ->
val sceneName = response.currentProgramSceneName
log.debug("scene name: ${sceneName}")
controller.getSceneItemList(sceneName) { response ->
val items = response.sceneItems
// Even though locked status is in the response from OBS, the library does not parse it.
// So we have to ask for it explicitly:
controller.sendRequestBatch(
RequestBatch.builder()
.requests(
response.sceneItems.map { item ->
GetSceneItemLockedRequest.builder()
.sceneName(sceneName)
.sceneItemId(item.sceneItemId)
.build()
}
)
.build()
) { response ->
val item = response.data.results
.map { result ->
(result.responseData as GetSceneItemLockedResponse.SpecificData).sceneItemLocked
}
.zip(items)
.asSequence()
.filter { (locked,item) -> ! locked }
.map { (locked,item) -> item }
.sortedBy { item -> item.sceneItemIndex }
.firstOrNull()
log.debug("item to pan: ${item?.sceneItemId}")
if (item != null) {
controller.getSceneItemTransform(sceneName, item.sceneItemId) { response ->
val transform = response.sceneItemTransform
log.debug("position: ${transform.positionX} x ${transform.positionY}")
val newTransform = transformBuilder(Transform.builder(), transform).build()
controller.setSceneItemTransform(sceneName, item.sceneItemId, newTransform) { response ->
log.debug("transform successful: ${response.isSuccessful}")
// Have to set the current scene to take effect if in studio mode.
controller.setCurrentProgramScene(sceneName) { response ->
log.debug("set current program to ${sceneName}: ${response.isSuccessful}")
}
}
}
}
}
}
}
}
companion object {
private val log = LoggerFactory.getLogger(OpRunner::class.java)
}
}

View File

@@ -2,6 +2,8 @@ package net.eksb.obsdc
import java.io.File import java.io.File
import java.util.Properties import java.util.Properties
import java.util.concurrent.CountDownLatch
import kotlin.concurrent.thread
val HOME:File = System.getProperty("user.home")?.let(::File) ?: error("No user.home") val HOME:File = System.getProperty("user.home")?.let(::File) ?: error("No user.home")
val CONFIG_HOME:File = System.getenv("XDG_CONFIG_HOME")?.let(::File) ?: File(HOME, ".config") val CONFIG_HOME:File = System.getenv("XDG_CONFIG_HOME")?.let(::File) ?: File(HOME, ".config")
@@ -13,3 +15,13 @@ fun File.properties(): Properties = Properties()
inputStream().use(properties::load) inputStream().use(properties::load)
} }
} }
fun addShutdownHook(block:()->Unit) = Runtime.getRuntime().addShutdownHook(thread(start=false) { block() })
fun waitForShutdown() {
val latch = CountDownLatch(1)
addShutdownHook {
latch.countDown()
}
latch.await()
}

View File

@@ -0,0 +1 @@
org.slf4j.simpleLogger.log.net.eksb=debug