Connect from connect thread. Avoids stack overflow

This commit is contained in:
2023-11-30 14:59:36 -05:00
parent 9ded3dfdc6
commit 145bf4d7b1

View File

@@ -28,12 +28,14 @@ class Obs(
/** Queue of requests to run. */
private val q:BlockingQueue<Req> = LinkedBlockingQueue()
/** Backoff on errors. */
private val backoff = Backoff()
/** Flag to set when closed to stop queue poll loop. */
private val closed = AtomicBoolean(false)
/**
* Flag set when we start trying to connect, unset when disconnected.
* Used to determine if we should reconnect on controller error.
*/
private val connectingOrConnected = AtomicBoolean(false)
/**
* Flag to set when connected, unset when disconnected.
* Used to determine if we should reconnect on controller error.
@@ -69,9 +71,21 @@ class Obs(
addShutdownHook {
close()
}
// connect() blocks until OBS is up, so fork it.
thread(name="obs-init-connect", isDaemon=true, start=true) {
controller.connect()
// Thread that connects if we are not connected/connecting.
thread(name="obs-connect", isDaemon=true, start=true) {
while(!closed.get()) {
if (connectingOrConnected.compareAndSet(false,true)) {
log.debug("Not closed, not connected. Try to connect...")
try {
// Only call connect from here; if you try to call connect from [onControllerError] or [onDisconnect]
// eventually you will get stack overflow.
controller.connect()
} catch (e:Exception) {
log.warn("failed to connect: ${e.message}", e)
}
}
Thread.sleep(connectionTimeout.toLong() * 1000L) // in case the error was immediate
}
}
}
@@ -82,11 +96,10 @@ class Obs(
private fun onControllerError(e:ReasonThrowable) {
log.debug("controller error - ${e.reason}",e.throwable)
if (!closed.get() && !connected.get()) {
log.debug("connection failed")
backoff.backoff()
log.debug("reconnect after connection failed...")
controller.connect()
// If we are not connected, a controller error means that connection failed and we will not connect.
// If we are connected, it does not mean we are/will be disconnected.
if (!connected.get()) {
connectingOrConnected.set(false)
}
}
private fun onCommError(e:ReasonThrowable) {
@@ -94,12 +107,8 @@ class Obs(
}
private fun onDisconnect() {
log.debug("disconnected")
connectingOrConnected.set(false)
connected.set(false)
if (! closed.get()) {
backoff.backoff()
log.debug("reconnect after disconnected..")
controller.connect()
}
}
private fun onReady() {