diff --git a/src/main/kotlin/net/eksb/obsdc/Backoff.kt b/src/main/kotlin/net/eksb/obsdc/Backoff.kt new file mode 100644 index 0000000..e47797d --- /dev/null +++ b/src/main/kotlin/net/eksb/obsdc/Backoff.kt @@ -0,0 +1,7 @@ +package net.eksb.obsdc + +class Backoff { + fun backoff() { + Thread.sleep(5_000) + } +} \ No newline at end of file diff --git a/src/main/kotlin/net/eksb/obsdc/DBus.kt b/src/main/kotlin/net/eksb/obsdc/DBus.kt index 2efb49f..b729a50 100644 --- a/src/main/kotlin/net/eksb/obsdc/DBus.kt +++ b/src/main/kotlin/net/eksb/obsdc/DBus.kt @@ -15,7 +15,7 @@ class DBus(private val q: BlockingQueue) { private val thread = thread( start = true, - isDaemon = true, + isDaemon = false, name = "dbus", ) { DBusConnectionBuilder.forSessionBus().build().use { dbus -> diff --git a/src/main/kotlin/net/eksb/obsdc/Main.kt b/src/main/kotlin/net/eksb/obsdc/Main.kt index 39eb058..5ade101 100644 --- a/src/main/kotlin/net/eksb/obsdc/Main.kt +++ b/src/main/kotlin/net/eksb/obsdc/Main.kt @@ -7,7 +7,7 @@ object Main { @JvmStatic fun main(args: Array) { val q:BlockingQueue = LinkedBlockingQueue() - DBus(q) // forks daemon thread - Obs(q).run() // blocks + DBus(q) // forks non-daemon thread + Obs(q).start() // forks non-daemon thread } } \ No newline at end of file diff --git a/src/main/kotlin/net/eksb/obsdc/Obs.kt b/src/main/kotlin/net/eksb/obsdc/Obs.kt index 60dc15d..95788aa 100644 --- a/src/main/kotlin/net/eksb/obsdc/Obs.kt +++ b/src/main/kotlin/net/eksb/obsdc/Obs.kt @@ -1,24 +1,29 @@ package net.eksb.obsdc import io.obswebsocket.community.client.OBSRemoteController -import io.obswebsocket.community.client.message.event.ui.StudioModeStateChangedEvent +import io.obswebsocket.community.client.WebSocketCloseCode +import io.obswebsocket.community.client.listener.lifecycle.ReasonThrowable import io.obswebsocket.community.client.message.request.RequestBatch import io.obswebsocket.community.client.message.request.sceneitems.GetSceneItemLockedRequest import io.obswebsocket.community.client.message.response.sceneitems.GetSceneItemLockedResponse import io.obswebsocket.community.client.model.Scene import io.obswebsocket.community.client.model.SceneItem.Transform import io.obswebsocket.community.client.model.SceneItem.Transform.TransformBuilder -import org.slf4j.LoggerFactory import java.util.concurrent.BlockingQueue -import java.util.concurrent.TimeUnit +import java.util.concurrent.atomic.AtomicBoolean import kotlin.concurrent.thread +import org.slf4j.LoggerFactory // protocol docs: https://github.com/obsproject/obs-websocket/blob/master/docs/generated/protocol.md class Obs(private val q:BlockingQueue): AutoCloseable { private val panAmount = 50F - private var alive = true + private val backoff = Backoff() + + private val started = AtomicBoolean(false) + private val connected = AtomicBoolean(false) + private val ready = AtomicBoolean(false) private val controller = OBSRemoteController.builder() .host("localhost") @@ -27,49 +32,84 @@ class Obs(private val q:BlockingQueue): AutoCloseable { .autoConnect(false) .connectionTimeout(3) .lifecycle() - .onReady(::ready) - .onClose { code -> log.error("closed:${code}")} - .onControllerError { e -> log.error("controller error", e ) } - .onCommunicatorError { e -> log.error("comm error", e ) } - .onDisconnect { - log.info("disconnected") - if (alive) reconnect() + .onReady(::onReady) + .onClose(::onClose) + .onControllerError(::onControllerError) + .onCommunicatorError(::onCommError) + .onDisconnect(::onDisconnect) + .onConnect { + log.info("connected") + connected.set(true) } .and() - .registerEventListener(StudioModeStateChangedEvent::class.java) { - log.info("studio mode state change: ${it}") - } .build() - init { Runtime.getRuntime().addShutdownHook(thread(start=false) { - log.info("shutdown") - alive = false - controller.stop() + close() }) } - private fun reconnect() { - controller.connect() + fun start() { + if (!started.compareAndExchange(false,true)) { + controller.connect() + } + } + fun stop() { + if (started.compareAndExchange(true,false)) { + controller.disconnect() + } } - fun run() { - controller.connect() + private fun onClose(e:WebSocketCloseCode) { + log.info("closed: ${e.code}") + ready.set(false) } - override fun close() { - controller.disconnect() + private fun onControllerError(e:ReasonThrowable) { + log.info("controller error - ${e.reason}",e.throwable) + if (started.get() && ! connected.get()) { + log.info("connection failed") + backoff.backoff() + log.info("reconnect after connection failed...") + controller.connect() + } + } + private fun onCommError(e:ReasonThrowable) { + log.info("comm error - ${e.reason}",e.throwable) + } + private fun onDisconnect() { + log.info("disconnected") + ready.set(false) + connected.set(false) + if (started.get()) { + backoff.backoff() + log.info("reconnect after disconnected..") + controller.connect() + } } - private fun ready() { + private fun onReady() { log.info("ready") - while(alive) { - val op: Op? = q.poll(1, TimeUnit.SECONDS) // blocks - if (op != null) { - log.info("op: ${op}") - op(op) - break + ready.set(true) + } + + private val opThread = thread(name="obs-op", isDaemon=false, start=true) { + while(true) { + val op = q.take() + log.info("got op: ${op}") + if (ready.get()) { + try { + op(op) + } catch (e:InterruptedException) { + log.info("op thread interrupted") + break + } catch (e:Exception) { + log.error("op ${op} failed", e ) + } + } else { + // This would be way more complicated if we had to buffer ops. + log.info("skipping op ${op} because not yet ready") } } } @@ -81,19 +121,13 @@ class Obs(private val q:BlockingQueue): AutoCloseable { Op.SCENE_3 -> scene { scenes -> scenes.asSequence().drop(2).firstOrNull() } Op.STUDIO_TRANSITION -> { controller.triggerStudioModeTransition { response -> - // This does not get called? - log.info("Response successful: ${response.isSuccessful}") - ready() + log.info("studio transitioned: ${response.isSuccessful}") } } Op.PAN_UP -> pan { old -> positionY(old.positionY - panAmount ) } Op.PAN_DOWN -> pan { old -> positionY(old.positionY + panAmount ) } Op.PAN_LEFT -> pan { old -> positionX(old.positionX - panAmount ) } Op.PAN_RIGHT -> pan { old -> positionX(old.positionX + panAmount ) } - Op.TODO -> { - log.info("OP=TODO") - ready() - } } } @@ -103,10 +137,8 @@ class Obs(private val q:BlockingQueue): AutoCloseable { log.info("select scene ${scene?.sceneName} index:${scene?.sceneIndex}") if (scene != null) { controller.setCurrentProgramScene(scene.sceneName) { response -> - ready() + log.info("selected scene ${scene.sceneName}: ${response.isSuccessful}") } - } else { - ready() } } } @@ -152,18 +184,22 @@ class Obs(private val q:BlockingQueue): AutoCloseable { log.info("transform successful: ${response.isSuccessful}") // Have to set the current scene to take effect if in studio mode. controller.setCurrentProgramScene(sceneName) { response -> - ready() + log.info("set current program to ${sceneName}: ${response.isSuccessful}") } } } - } else { - ready() } } } } } + override fun close() { + log.info("close") + stop() + opThread.interrupt() + } + companion object { private val log = LoggerFactory.getLogger(Obs::class.java) } diff --git a/src/main/kotlin/net/eksb/obsdc/Op.kt b/src/main/kotlin/net/eksb/obsdc/Op.kt index 1a56317..b639f4d 100644 --- a/src/main/kotlin/net/eksb/obsdc/Op.kt +++ b/src/main/kotlin/net/eksb/obsdc/Op.kt @@ -9,6 +9,5 @@ enum class Op { PAN_DOWN, PAN_LEFT, PAN_RIGHT, - TODO, ; } \ No newline at end of file