diff --git a/gradle.properties b/gradle.properties index 5d8344847..9be6eddd3 100644 --- a/gradle.properties +++ b/gradle.properties @@ -20,3 +20,4 @@ buildconfigVersion=6.0.7 # We should probably stop using grgit, see: # https://andrewoberstar.com/posts/2024-04-02-dont-commit-to-grgit/ grgitVersion=5.3.3 +wireVersion=5.3.1 diff --git a/server/core/src/main/java/dev/slimevr/config/settings.kt b/server/core/src/main/java/dev/slimevr/config/settings.kt index a85f5533b..e20796da9 100644 --- a/server/core/src/main/java/dev/slimevr/config/settings.kt +++ b/server/core/src/main/java/dev/slimevr/config/settings.kt @@ -41,6 +41,7 @@ data class SettingsState( ) sealed interface SettingsActions { + data class Update(val transform: SettingsState.() -> SettingsState) : SettingsActions data class LoadProfile(val newState: SettingsState) : SettingsActions } @@ -56,7 +57,9 @@ data class Settings( val DefaultSettingsModule = SettingsModule( reducer = { s, a -> when (a) { + is SettingsActions.Update -> a.transform(s) is SettingsActions.LoadProfile -> a.newState + else -> s } }, ) @@ -98,4 +101,4 @@ suspend fun createSettings(scope: CoroutineScope, configDir: File, name: String) } return Settings(context, configDir = settingsDir.toString(), swap) -} \ No newline at end of file +} diff --git a/server/core/src/main/java/dev/slimevr/config/user.kt b/server/core/src/main/java/dev/slimevr/config/user.kt index ead6936ad..e9a4c41d9 100644 --- a/server/core/src/main/java/dev/slimevr/config/user.kt +++ b/server/core/src/main/java/dev/slimevr/config/user.kt @@ -3,6 +3,7 @@ package dev.slimevr.config import dev.slimevr.context.Context import dev.slimevr.context.CustomModule import dev.slimevr.context.createContext +import dev.slimevr.tracker.DeviceActions import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Job import kotlinx.coroutines.cancelAndJoin @@ -57,6 +58,7 @@ data class UserConfig( val DefaultUserModule = UserConfigModule( reducer = { s, a -> when (a) { + is UserConfigActions.Update -> a.transform(s) is UserConfigActions.LoadProfile -> a.newState else -> s } diff --git a/server/core/src/main/java/dev/slimevr/platform.kt b/server/core/src/main/java/dev/slimevr/platform.kt index 67ba9f0b7..b741df356 100644 --- a/server/core/src/main/java/dev/slimevr/platform.kt +++ b/server/core/src/main/java/dev/slimevr/platform.kt @@ -1,4 +1,4 @@ -package io.eiren.util +package dev.slimevr import java.io.File import java.nio.file.Path diff --git a/server/core/src/main/java/dev/slimevr/solarxr/server.kt b/server/core/src/main/java/dev/slimevr/solarxr/ws-server.kt similarity index 88% rename from server/core/src/main/java/dev/slimevr/solarxr/server.kt rename to server/core/src/main/java/dev/slimevr/solarxr/ws-server.kt index 984cf7742..c83d146d7 100644 --- a/server/core/src/main/java/dev/slimevr/solarxr/server.kt +++ b/server/core/src/main/java/dev/slimevr/solarxr/ws-server.kt @@ -8,13 +8,9 @@ import io.ktor.server.netty.Netty import io.ktor.server.routing.routing import io.ktor.server.websocket.* import io.ktor.websocket.Frame -import io.ktor.websocket.readBytes -import kotlinx.coroutines.runBlocking -import kotlinx.coroutines.sync.Mutex -import kotlinx.coroutines.sync.withLock import solarxr_protocol.MessageBundle +import solarxr_protocol.rpc.ResetRequest import java.nio.ByteBuffer -import kotlin.reflect.KClass const val SOLARXR_PORT = 21110 @@ -27,10 +23,16 @@ suspend fun onSolarXRMessage(message: ByteBuffer, context: SolarXRConnection) { context.dataFeedDispatcher.emit(msg) } + // FIXME: temporary test + context.rpcDispatcher.on { + println("RESET $it") + } + messageBundle.rpcMsgs?.forEach { val msg = it.message ?: return; context.rpcDispatcher.emit(msg) } + } diff --git a/server/desktop/build.gradle.kts b/server/desktop/build.gradle.kts index 38eea50a4..db22322f9 100644 --- a/server/desktop/build.gradle.kts +++ b/server/desktop/build.gradle.kts @@ -14,6 +14,7 @@ plugins { id("com.gradleup.shadow") id("com.github.gmazzo.buildconfig") id("org.ajoberstar.grgit") + id("com.squareup.wire") } kotlin { @@ -54,6 +55,23 @@ allprojects { } } +val downloadDriverProto by tasks.registering { + val protoFile = layout.buildDirectory.file("proto/ProtobufMessages.proto") + outputs.file(protoFile) + doLast { + val url = "https://raw.githubusercontent.com/SlimeVR/SlimeVR-OpenVR-Driver/main/src/bridge/ProtobufMessages.proto" + protoFile.get().asFile.parentFile.mkdirs() + uri(url).toURL().openStream().use { it.copyTo(protoFile.get().asFile.outputStream()) } + } +} + +wire { + sourcePath { + srcDir(downloadDriverProto.map { layout.buildDirectory.dir("proto") }) + } + kotlin { } +} + dependencies { implementation(project(":server:core")) implementation(project(":solarxr-protocol")) diff --git a/server/desktop/src/main/java/dev/slimevr/desktop/Main.kt b/server/desktop/src/main/java/dev/slimevr/desktop/Main.kt index 573491a36..e08970964 100644 --- a/server/desktop/src/main/java/dev/slimevr/desktop/Main.kt +++ b/server/desktop/src/main/java/dev/slimevr/desktop/Main.kt @@ -4,9 +4,10 @@ package dev.slimevr.desktop import dev.slimevr.VRServer import dev.slimevr.config.createAppConfig +import dev.slimevr.desktop.ipc.createIpcServers import dev.slimevr.solarxr.createSolarXRWebsocketServer import dev.slimevr.tracker.udp.createUDPTrackerServer -import io.eiren.util.resolveConfigDirectory +import dev.slimevr.resolveConfigDirectory import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking @@ -21,5 +22,6 @@ fun main(args: Array) = runBlocking { launch { createSolarXRWebsocketServer(server) } + createIpcServers(server) Unit } diff --git a/server/desktop/src/main/java/dev/slimevr/desktop/ipc/ipc.kt b/server/desktop/src/main/java/dev/slimevr/desktop/ipc/ipc.kt new file mode 100644 index 000000000..81649da94 --- /dev/null +++ b/server/desktop/src/main/java/dev/slimevr/desktop/ipc/ipc.kt @@ -0,0 +1,31 @@ +package dev.slimevr.desktop.ipc + +import dev.slimevr.VRServer +import dev.slimevr.CURRENT_PLATFORM +import dev.slimevr.Platform +import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.launch + +const val DRIVER_SOCKET_NAME = "SlimeVRDriver" +const val FEEDER_SOCKET_NAME = "SlimeVRInput" +const val SOLARXR_SOCKET_NAME = "SlimeVRRpc" + +const val DRIVER_PIPE = "\\\\.\\pipe\\SlimeVRDriver" +const val FEEDER_PIPE = "\\\\.\\pipe\\SlimeVRInput" +const val SOLARXR_PIPE = "\\\\.\\pipe\\SlimeVRRpc" + +suspend fun createIpcServers(server: VRServer) = coroutineScope { + when (CURRENT_PLATFORM) { + Platform.LINUX, Platform.OSX -> { + launch { createUnixDriverSocket(server) } + launch { createUnixFeederSocket(server) } + launch { createUnixSolarXRSocket(server) } + } + Platform.WINDOWS -> { + launch { createWindowsDriverPipe(server) } + launch { createWindowsFeederPipe(server) } + launch { createWindowsSolarXRPipe(server) } + } + else -> Unit + } +} diff --git a/server/desktop/src/main/java/dev/slimevr/desktop/ipc/linux.kt b/server/desktop/src/main/java/dev/slimevr/desktop/ipc/linux.kt new file mode 100644 index 000000000..73467db7c --- /dev/null +++ b/server/desktop/src/main/java/dev/slimevr/desktop/ipc/linux.kt @@ -0,0 +1,95 @@ +package dev.slimevr.desktop.ipc + +import dev.slimevr.VRServer +import dev.slimevr.getSocketDirectory +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.flow.flow +import kotlinx.coroutines.flow.flowOn +import kotlinx.coroutines.isActive +import kotlinx.coroutines.launch +import kotlinx.coroutines.withContext +import java.net.StandardProtocolFamily +import java.net.UnixDomainSocketAddress +import java.nio.ByteBuffer +import java.nio.ByteOrder +import java.nio.channels.ServerSocketChannel +import java.nio.channels.SocketChannel +import kotlin.io.path.Path + +suspend fun createUnixDriverSocket(server: VRServer) = + acceptUnixClients(DRIVER_SOCKET_NAME) { channel -> + handleDriverConnection( + server = server, + messages = readFramedMessages(channel), + send = { bytes -> withContext(Dispatchers.IO) { writeFramed(channel, bytes) } }, + ) + } + +suspend fun createUnixFeederSocket(server: VRServer) = + acceptUnixClients(FEEDER_SOCKET_NAME) { channel -> + handleFeederConnection( + server = server, + messages = readFramedMessages(channel), + send = { bytes -> withContext(Dispatchers.IO) { writeFramed(channel, bytes) } }, + ) + } + +suspend fun createUnixSolarXRSocket(server: VRServer) = + acceptUnixClients(SOLARXR_SOCKET_NAME) { channel -> + handleSolarXRConnection( + server = server, + messages = readFramedMessages(channel), + send = { bytes -> withContext(Dispatchers.IO) { writeFramed(channel, bytes) } }, + ) + } + +private fun isSocketInUse(socketPath: String): Boolean = try { + SocketChannel.open(StandardProtocolFamily.UNIX).use { + it.connect(UnixDomainSocketAddress.of(socketPath)) + true + } +} catch (_: Exception) { false } + +// Length field is LE u32 and includes the 4-byte header itself +private fun readFramedMessages(channel: SocketChannel) = flow { + val lenBuf = ByteBuffer.allocate(4).order(ByteOrder.LITTLE_ENDIAN) + while (true) { + lenBuf.clear() + if (channel.read(lenBuf) == -1) break + lenBuf.flip() + + val dataBuf = ByteBuffer.allocate(lenBuf.int - 4) + while (dataBuf.hasRemaining()) { + if (channel.read(dataBuf) == -1) break + } + emit(dataBuf.array()) + } +}.flowOn(Dispatchers.IO) + +private fun writeFramed(channel: SocketChannel, bytes: ByteArray) { + val header = ByteBuffer.allocate(4).order(ByteOrder.LITTLE_ENDIAN).putInt(bytes.size + 4).flip() + channel.write(arrayOf(header, ByteBuffer.wrap(bytes))) +} + +private suspend fun acceptUnixClients( + name: String, + handle: suspend (SocketChannel) -> Unit, +) = withContext(Dispatchers.IO) { + val path = Path(getSocketDirectory(), name) + val file = path.toFile() + if (file.exists()) { + check(!isSocketInUse(path.toString())) { + "$name socket is already in use by another process" + } + file.delete() + } + file.deleteOnExit() + + ServerSocketChannel.open(StandardProtocolFamily.UNIX).use { server -> + server.bind(UnixDomainSocketAddress.of(path)) + while (isActive) { + val client = server.accept() + launch { handle(client) } + } + } +} \ No newline at end of file diff --git a/server/desktop/src/main/java/dev/slimevr/desktop/ipc/protocol.kt b/server/desktop/src/main/java/dev/slimevr/desktop/ipc/protocol.kt new file mode 100644 index 000000000..04e6c1c57 --- /dev/null +++ b/server/desktop/src/main/java/dev/slimevr/desktop/ipc/protocol.kt @@ -0,0 +1,156 @@ +package dev.slimevr.desktop.ipc + +import dev.slimevr.VRServer +import dev.slimevr.VRServerActions +import dev.slimevr.desktop.platform.Position +import dev.slimevr.desktop.platform.ProtobufMessage +import dev.slimevr.desktop.platform.TrackerAdded +import dev.slimevr.desktop.platform.Version +import dev.slimevr.solarxr.createSolarXRConnection +import dev.slimevr.solarxr.onSolarXRMessage +import dev.slimevr.tracker.DeviceOrigin +import dev.slimevr.tracker.TrackerActions +import dev.slimevr.tracker.createDevice +import dev.slimevr.tracker.createTracker +import io.github.axisangles.ktmath.Quaternion +import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.launch +import kotlinx.coroutines.sync.Mutex +import kotlinx.coroutines.sync.withLock +import solarxr_protocol.datatypes.hardware_info.ImuType +import java.nio.ByteBuffer + +const val PROTOCOL_VERSION = 5 + +suspend fun handleDriverConnection( + server: VRServer, + messages: Flow, + send: suspend (ByteArray) -> Unit, +) = coroutineScope { + val sendMutex = Mutex() + + suspend fun sendMsg(msg: ProtobufMessage) = sendMutex.withLock { + send(ProtobufMessage.ADAPTER.encode(msg)) + } + + sendMsg(ProtobufMessage(version = Version(protocol_version = PROTOCOL_VERSION))) + + val subscribedTrackers = mutableSetOf() + + val observerJob = launch { + server.context.state.collect { state -> + state.trackers.values.forEach { tracker -> + val trackerState = tracker.context.state.value + if (trackerState.origin == DeviceOrigin.DRIVER) return@forEach + if (subscribedTrackers.add(trackerState.id)) { + sendMsg( + ProtobufMessage( + tracker_added = TrackerAdded( + tracker_id = trackerState.id, + tracker_serial = trackerState.hardwareId, + tracker_name = trackerState.customName ?: trackerState.name, + ), + ), + ) + launch { + tracker.context.state.collect { ts -> + sendMsg( + ProtobufMessage( + position = Position( + tracker_id = ts.id, + qx = ts.rawRotation.x, + qy = ts.rawRotation.y, + qz = ts.rawRotation.z, + qw = ts.rawRotation.w, + ), + ), + ) + } + } + } + } + } + } + + try { + messages.collect { bytes -> + val msg = ProtobufMessage.ADAPTER.decode(bytes) + msg.user_action?.let { + // TODO: dispatch user actions (reset, etc.) to VRServer + } + msg.version?.let { + // TODO: store remote protocol version if needed + } + } + } finally { + observerJob.cancel() + } +} + +suspend fun handleFeederConnection( + server: VRServer, + messages: Flow, + send: suspend (ByteArray) -> Unit, +) = coroutineScope { + send(ProtobufMessage.ADAPTER.encode(ProtobufMessage(version = Version(protocol_version = PROTOCOL_VERSION)))) + + messages.collect { bytes -> + val msg = ProtobufMessage.ADAPTER.decode(bytes) + + if (msg.tracker_added != null) { + val deviceId = server.nextHandle() + val device = createDevice( + scope = this, + id = deviceId, + address = msg.tracker_added.tracker_serial, + origin = DeviceOrigin.FEEDER, + serverContext = server, + ) + server.context.dispatch(VRServerActions.NewDevice(deviceId, device)) + + val trackerId = server.nextHandle() + val tracker = createTracker( + scope = this, + id = trackerId, + deviceId = deviceId, + sensorType = ImuType.MPU9250, + hardwareId = msg.tracker_added.tracker_serial, + origin = DeviceOrigin.FEEDER, + serverContext = server, + ) + server.context.dispatch(VRServerActions.NewTracker(trackerId, tracker)) + } + + if (msg.position != null) { + server.getTracker(msg.position.tracker_id)?.context?.dispatch( + TrackerActions.Update { + copy( + rawRotation = Quaternion( + w = msg.position.qw, + x = msg.position.qx, + y = msg.position.qy, + z = msg.position.qz, + ), + ) + }, + ) + } + } +} + +suspend fun handleSolarXRConnection( + server: VRServer, + messages: Flow, + send: suspend (ByteArray) -> Unit, +) = coroutineScope { + val connection = createSolarXRConnection( + serverContext = server, + scope = this, + onSend = send, + ) + + messages.collect { bytes -> + onSolarXRMessage(ByteBuffer.wrap(bytes), connection) + } +} \ No newline at end of file diff --git a/server/desktop/src/main/java/dev/slimevr/desktop/ipc/windows.kt b/server/desktop/src/main/java/dev/slimevr/desktop/ipc/windows.kt new file mode 100644 index 000000000..5622cd6e9 --- /dev/null +++ b/server/desktop/src/main/java/dev/slimevr/desktop/ipc/windows.kt @@ -0,0 +1,126 @@ +package dev.slimevr.desktop.ipc + +import com.sun.jna.platform.win32.Advapi32 +import com.sun.jna.platform.win32.Kernel32 +import com.sun.jna.platform.win32.WinBase +import com.sun.jna.platform.win32.WinError +import com.sun.jna.platform.win32.WinNT +import com.sun.jna.ptr.IntByReference +import dev.slimevr.VRServer +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.flow.flow +import kotlinx.coroutines.flow.flowOn +import kotlinx.coroutines.isActive +import kotlinx.coroutines.launch +import kotlinx.coroutines.withContext +import java.nio.ByteBuffer +import java.nio.ByteOrder + +private val k32 = Kernel32.INSTANCE +private val adv32 = Advapi32.INSTANCE + +suspend fun createWindowsDriverPipe(server: VRServer) = + acceptWindowsClients(DRIVER_PIPE) { handle -> + handleDriverConnection( + server = server, + messages = readFramedMessages(handle), + send = { bytes -> withContext(Dispatchers.IO) { writeFramedPipe(handle, bytes) } }, + ) + } + +suspend fun createWindowsFeederPipe(server: VRServer) = + acceptWindowsClients(FEEDER_PIPE) { handle -> + handleFeederConnection( + server = server, + messages = readFramedMessages(handle), + send = { bytes -> withContext(Dispatchers.IO) { writeFramedPipe(handle, bytes) } }, + ) + } + +suspend fun createWindowsSolarXRPipe(server: VRServer) = + acceptWindowsClients(SOLARXR_PIPE) { handle -> + handleSolarXRConnection( + server = server, + messages = readFramedMessages(handle), + send = { bytes -> withContext(Dispatchers.IO) { writeFramedPipe(handle, bytes) } }, + ) + } + +// Length field is LE u32 and includes the 4-byte header itself +private fun readFramedMessages(handle: WinNT.HANDLE) = flow { + val lenBuf = ByteArray(4) + while (true) { + if (!readExact(handle, lenBuf, 4)) break + val totalLen = ByteBuffer.wrap(lenBuf).order(ByteOrder.LITTLE_ENDIAN).int + + val dataBuf = ByteArray(totalLen - 4) + if (!readExact(handle, dataBuf, totalLen - 4)) break + emit(dataBuf) + } +}.flowOn(Dispatchers.IO) + +private fun readExact(handle: WinNT.HANDLE, buf: ByteArray, len: Int): Boolean { + var offset = 0 + val bytesRead = IntByReference() + while (offset < len) { + val ok = k32.ReadFile(handle, buf, len - offset, bytesRead, null) + if (!ok || bytesRead.value == 0) return false + offset += bytesRead.value + } + return true +} + +private fun writeFramedPipe(handle: WinNT.HANDLE, bytes: ByteArray) { + val buf = ByteArray(bytes.size + 4) + ByteBuffer.wrap(buf).order(ByteOrder.LITTLE_ENDIAN).putInt(bytes.size + 4) + bytes.copyInto(buf, destinationOffset = 4) + k32.WriteFile(handle, buf, buf.size, IntByReference(), null) +} + +private fun createSecurePipe(pipeName: String): WinNT.HANDLE { + // Null DACL allows any process (including SteamVR driver) to connect + val descriptor = WinNT.SECURITY_DESCRIPTOR(64 * 1024) + adv32.InitializeSecurityDescriptor(descriptor, WinNT.SECURITY_DESCRIPTOR_REVISION) + adv32.SetSecurityDescriptorDacl(descriptor, true, null, false) + + val attributes = WinBase.SECURITY_ATTRIBUTES() + attributes.lpSecurityDescriptor = descriptor.pointer + attributes.bInheritHandle = false + + val pipe = k32.CreateNamedPipe( + pipeName, + WinBase.PIPE_ACCESS_DUPLEX, + WinBase.PIPE_TYPE_BYTE or WinBase.PIPE_READMODE_BYTE or WinBase.PIPE_WAIT, + WinBase.PIPE_UNLIMITED_INSTANCES, + 65536, 65536, 0, attributes, + ) + check(pipe != WinNT.INVALID_HANDLE_VALUE) { + "CreateNamedPipe failed for $pipeName: ${k32.GetLastError()}" + } + return pipe +} + +private suspend fun acceptWindowsClients( + pipeName: String, + handle: suspend (WinNT.HANDLE) -> Unit, +) = withContext(Dispatchers.IO) { + while (isActive) { + val pipe = createSecurePipe(pipeName) + + val ok = k32.ConnectNamedPipe(pipe, null) + val err = k32.GetLastError() + if (!ok && err != WinError.ERROR_PIPE_CONNECTED) { + k32.CloseHandle(pipe) + continue + } + + launch { + try { + handle(pipe) + } finally { + k32.DisconnectNamedPipe(pipe) + k32.CloseHandle(pipe) + } + } + } +} \ No newline at end of file diff --git a/settings.gradle.kts b/settings.gradle.kts index a87de8acd..7c7f9abf2 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -21,6 +21,7 @@ pluginManagement { val shadowJarVersion: String by settings val buildconfigVersion: String by settings val grgitVersion: String by settings + val wireVersion: String by settings plugins { kotlin("plugin.serialization") version kotlinVersion kotlin("jvm") version kotlinVersion @@ -29,6 +30,7 @@ pluginManagement { id("com.gradleup.shadow") version shadowJarVersion id("com.github.gmazzo.buildconfig") version buildconfigVersion id("org.ajoberstar.grgit") version grgitVersion + id("com.squareup.wire") version wireVersion } }