send ops to obs from separate thread; handle obs restart

This commit is contained in:
2023-10-21 11:42:23 -04:00
parent 467a54c607
commit 449fae8e22
5 changed files with 90 additions and 48 deletions

View File

@@ -0,0 +1,7 @@
package net.eksb.obsdc
class Backoff {
fun backoff() {
Thread.sleep(5_000)
}
}

View File

@@ -15,7 +15,7 @@ class DBus(private val q: BlockingQueue<Op>) {
private val thread = thread( private val thread = thread(
start = true, start = true,
isDaemon = true, isDaemon = false,
name = "dbus", name = "dbus",
) { ) {
DBusConnectionBuilder.forSessionBus().build().use { dbus -> DBusConnectionBuilder.forSessionBus().build().use { dbus ->

View File

@@ -7,7 +7,7 @@ object Main {
@JvmStatic @JvmStatic
fun main(args: Array<String>) { fun main(args: Array<String>) {
val q:BlockingQueue<Op> = LinkedBlockingQueue() val q:BlockingQueue<Op> = LinkedBlockingQueue()
DBus(q) // forks daemon thread DBus(q) // forks non-daemon thread
Obs(q).run() // blocks Obs(q).start() // forks non-daemon thread
} }
} }

View File

@@ -1,24 +1,29 @@
package net.eksb.obsdc package net.eksb.obsdc
import io.obswebsocket.community.client.OBSRemoteController import io.obswebsocket.community.client.OBSRemoteController
import io.obswebsocket.community.client.message.event.ui.StudioModeStateChangedEvent import io.obswebsocket.community.client.WebSocketCloseCode
import io.obswebsocket.community.client.listener.lifecycle.ReasonThrowable
import io.obswebsocket.community.client.message.request.RequestBatch import io.obswebsocket.community.client.message.request.RequestBatch
import io.obswebsocket.community.client.message.request.sceneitems.GetSceneItemLockedRequest import io.obswebsocket.community.client.message.request.sceneitems.GetSceneItemLockedRequest
import io.obswebsocket.community.client.message.response.sceneitems.GetSceneItemLockedResponse import io.obswebsocket.community.client.message.response.sceneitems.GetSceneItemLockedResponse
import io.obswebsocket.community.client.model.Scene import io.obswebsocket.community.client.model.Scene
import io.obswebsocket.community.client.model.SceneItem.Transform import io.obswebsocket.community.client.model.SceneItem.Transform
import io.obswebsocket.community.client.model.SceneItem.Transform.TransformBuilder import io.obswebsocket.community.client.model.SceneItem.Transform.TransformBuilder
import org.slf4j.LoggerFactory
import java.util.concurrent.BlockingQueue import java.util.concurrent.BlockingQueue
import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicBoolean
import kotlin.concurrent.thread import kotlin.concurrent.thread
import org.slf4j.LoggerFactory
// protocol docs: https://github.com/obsproject/obs-websocket/blob/master/docs/generated/protocol.md // protocol docs: https://github.com/obsproject/obs-websocket/blob/master/docs/generated/protocol.md
class Obs(private val q:BlockingQueue<Op>): AutoCloseable { class Obs(private val q:BlockingQueue<Op>): AutoCloseable {
private val panAmount = 50F private val panAmount = 50F
private var alive = true private val backoff = Backoff()
private val started = AtomicBoolean(false)
private val connected = AtomicBoolean(false)
private val ready = AtomicBoolean(false)
private val controller = OBSRemoteController.builder() private val controller = OBSRemoteController.builder()
.host("localhost") .host("localhost")
@@ -27,49 +32,84 @@ class Obs(private val q:BlockingQueue<Op>): AutoCloseable {
.autoConnect(false) .autoConnect(false)
.connectionTimeout(3) .connectionTimeout(3)
.lifecycle() .lifecycle()
.onReady(::ready) .onReady(::onReady)
.onClose { code -> log.error("closed:${code}")} .onClose(::onClose)
.onControllerError { e -> log.error("controller error", e ) } .onControllerError(::onControllerError)
.onCommunicatorError { e -> log.error("comm error", e ) } .onCommunicatorError(::onCommError)
.onDisconnect { .onDisconnect(::onDisconnect)
log.info("disconnected") .onConnect {
if (alive) reconnect() log.info("connected")
connected.set(true)
} }
.and() .and()
.registerEventListener(StudioModeStateChangedEvent::class.java) {
log.info("studio mode state change: ${it}")
}
.build() .build()
init { init {
Runtime.getRuntime().addShutdownHook(thread(start=false) { Runtime.getRuntime().addShutdownHook(thread(start=false) {
log.info("shutdown") close()
alive = false
controller.stop()
}) })
} }
private fun reconnect() { fun start() {
controller.connect() if (!started.compareAndExchange(false,true)) {
controller.connect()
}
}
fun stop() {
if (started.compareAndExchange(true,false)) {
controller.disconnect()
}
} }
fun run() { private fun onClose(e:WebSocketCloseCode) {
controller.connect() log.info("closed: ${e.code}")
ready.set(false)
} }
override fun close() { private fun onControllerError(e:ReasonThrowable) {
controller.disconnect() log.info("controller error - ${e.reason}",e.throwable)
if (started.get() && ! connected.get()) {
log.info("connection failed")
backoff.backoff()
log.info("reconnect after connection failed...")
controller.connect()
}
}
private fun onCommError(e:ReasonThrowable) {
log.info("comm error - ${e.reason}",e.throwable)
}
private fun onDisconnect() {
log.info("disconnected")
ready.set(false)
connected.set(false)
if (started.get()) {
backoff.backoff()
log.info("reconnect after disconnected..")
controller.connect()
}
} }
private fun ready() { private fun onReady() {
log.info("ready") log.info("ready")
while(alive) { ready.set(true)
val op: Op? = q.poll(1, TimeUnit.SECONDS) // blocks }
if (op != null) {
log.info("op: ${op}") private val opThread = thread(name="obs-op", isDaemon=false, start=true) {
op(op) while(true) {
break val op = q.take()
log.info("got op: ${op}")
if (ready.get()) {
try {
op(op)
} catch (e:InterruptedException) {
log.info("op thread interrupted")
break
} catch (e:Exception) {
log.error("op ${op} failed", e )
}
} else {
// This would be way more complicated if we had to buffer ops.
log.info("skipping op ${op} because not yet ready")
} }
} }
} }
@@ -81,19 +121,13 @@ class Obs(private val q:BlockingQueue<Op>): AutoCloseable {
Op.SCENE_3 -> scene { scenes -> scenes.asSequence().drop(2).firstOrNull() } Op.SCENE_3 -> scene { scenes -> scenes.asSequence().drop(2).firstOrNull() }
Op.STUDIO_TRANSITION -> { Op.STUDIO_TRANSITION -> {
controller.triggerStudioModeTransition { response -> controller.triggerStudioModeTransition { response ->
// This does not get called? log.info("studio transitioned: ${response.isSuccessful}")
log.info("Response successful: ${response.isSuccessful}")
ready()
} }
} }
Op.PAN_UP -> pan { old -> positionY(old.positionY - panAmount ) } Op.PAN_UP -> pan { old -> positionY(old.positionY - panAmount ) }
Op.PAN_DOWN -> pan { old -> positionY(old.positionY + panAmount ) } Op.PAN_DOWN -> pan { old -> positionY(old.positionY + panAmount ) }
Op.PAN_LEFT -> pan { old -> positionX(old.positionX - panAmount ) } Op.PAN_LEFT -> pan { old -> positionX(old.positionX - panAmount ) }
Op.PAN_RIGHT -> pan { old -> positionX(old.positionX + panAmount ) } Op.PAN_RIGHT -> pan { old -> positionX(old.positionX + panAmount ) }
Op.TODO -> {
log.info("OP=TODO")
ready()
}
} }
} }
@@ -103,10 +137,8 @@ class Obs(private val q:BlockingQueue<Op>): AutoCloseable {
log.info("select scene ${scene?.sceneName} index:${scene?.sceneIndex}") log.info("select scene ${scene?.sceneName} index:${scene?.sceneIndex}")
if (scene != null) { if (scene != null) {
controller.setCurrentProgramScene(scene.sceneName) { response -> controller.setCurrentProgramScene(scene.sceneName) { response ->
ready() log.info("selected scene ${scene.sceneName}: ${response.isSuccessful}")
} }
} else {
ready()
} }
} }
} }
@@ -152,18 +184,22 @@ class Obs(private val q:BlockingQueue<Op>): AutoCloseable {
log.info("transform successful: ${response.isSuccessful}") log.info("transform successful: ${response.isSuccessful}")
// Have to set the current scene to take effect if in studio mode. // Have to set the current scene to take effect if in studio mode.
controller.setCurrentProgramScene(sceneName) { response -> controller.setCurrentProgramScene(sceneName) { response ->
ready() log.info("set current program to ${sceneName}: ${response.isSuccessful}")
} }
} }
} }
} else {
ready()
} }
} }
} }
} }
} }
override fun close() {
log.info("close")
stop()
opThread.interrupt()
}
companion object { companion object {
private val log = LoggerFactory.getLogger(Obs::class.java) private val log = LoggerFactory.getLogger(Obs::class.java)
} }

View File

@@ -9,6 +9,5 @@ enum class Op {
PAN_DOWN, PAN_DOWN,
PAN_LEFT, PAN_LEFT,
PAN_RIGHT, PAN_RIGHT,
TODO,
; ;
} }