From 92102e3629354f4bf913c2cd5fa9888c1eb43b3a Mon Sep 17 00:00:00 2001 From: ParkerTenBroeck <51721964+ParkerTenBroeck@users.noreply.github.com> Date: Tue, 10 Mar 2026 18:14:50 -0400 Subject: [PATCH] adding local support --- relay/src/build.rs | 124 +++++++++++---------- relay/src/local.rs | 258 ++++++++++++++++++++++++++++++++++++++++++++ relay/src/main.rs | 156 +++++++-------------------- relay/src/remote.rs | 128 ++++++++++++++++++++++ relay/src/run.rs | 32 ++++-- 5 files changed, 515 insertions(+), 183 deletions(-) create mode 100644 relay/src/local.rs create mode 100644 relay/src/remote.rs diff --git a/relay/src/build.rs b/relay/src/build.rs index e5298c6..1d5eafa 100644 --- a/relay/src/build.rs +++ b/relay/src/build.rs @@ -1,92 +1,100 @@ -use std::{collections::HashMap, ops::Deref, path::{Path, PathBuf}}; -use tokio::{ - process::{Child, Command}, +use std::{ + collections::HashMap, + ops::Deref, + path::{Path, PathBuf}, }; +use tokio::process::{Child, Command}; -async fn ensure_ok(child: Child) -> Result<(), Box>{ +async fn ensure_ok(child: Child) -> Result<(), Box> { let result = child.wait_with_output().await?; - if !result.status.success(){ - return Err(format!("{}\n{}", String::from_utf8_lossy(&result.stdout), String::from_utf8_lossy(&result.stderr)))? + if !result.status.success() { + return Err(format!( + "{}\n{}", + String::from_utf8_lossy(&result.stdout), + String::from_utf8_lossy(&result.stderr) + ))?; } Ok(()) } pub struct TempDir(PathBuf); -impl Drop for TempDir{ +impl Drop for TempDir { fn drop(&mut self) { _ = std::fs::remove_dir_all(&self.0) } } -impl Deref for TempDir{ +impl Deref for TempDir { type Target = PathBuf; - + fn deref(&self) -> &Self::Target { &self.0 } } -impl AsRef for TempDir{ +impl AsRef for TempDir { fn as_ref(&self) -> &PathBuf { &self.0 } } -impl AsRef for TempDir{ +impl AsRef for TempDir { fn as_ref(&self) -> &Path { &self.0 } } +pub async fn build( + files: HashMap, +) -> Result> { + use std::hash::*; + let mut hasher = std::hash::DefaultHasher::default(); + for (key, value) in &files { + key.hash(&mut hasher); + value.hash(&mut hasher); + } + let hash = hasher.finish(); -pub async fn build(files: HashMap) -> Result>{ - use std::hash::*; - let mut hasher = std::hash::DefaultHasher::default(); - for (key, value) in &files{ - key.hash(&mut hasher); - value.hash(&mut hasher); - } - let hash = hasher.finish(); + let mut work_dir = std::env::temp_dir(); + work_dir.push(format!("ghdl-relay-{hash:x?}")); + _ = std::fs::create_dir(&work_dir); + let work_dir = TempDir(work_dir); - let mut work_dir = std::env::temp_dir(); - work_dir.push(format!("ghdl-relay-{hash:x?}")); - _ = std::fs::create_dir(&work_dir); - let work_dir = TempDir(work_dir); + for (name, contents) in &files { + let mut path = work_dir.clone(); + path.push(name); + std::fs::write(path, contents)?; + } + let mut cmd = Command::new("ghdl"); + cmd.kill_on_drop(true); + cmd.args(["-a", "-g", "--std=08"]); + for name in files.keys() { + let mut path = work_dir.clone(); + path.push(name); + cmd.arg(path); + } + cmd.arg(std::fs::canonicalize("../rtl/tb.vhdl")?); - for (name, contents) in &files{ - let mut path = work_dir.clone(); - path.push(name); - std::fs::write(path, contents)?; - } - - - let mut cmd = Command::new("ghdl"); - cmd.kill_on_drop(true); - cmd.args(["-a", "-g", "--std=08"]); - for name in files.keys(){ - let mut path = work_dir.clone(); - path.push(name); - cmd.arg(path); - } - cmd.arg(std::fs::canonicalize("../rtl/tb.vhdl")?); - - cmd.stdin(std::process::Stdio::piped()) - .stdout(std::process::Stdio::piped()) - .stderr(std::process::Stdio::piped()); - - cmd.current_dir(&work_dir); - ensure_ok(cmd.spawn()?).await?; + cmd.stdin(std::process::Stdio::piped()) + .stdout(std::process::Stdio::piped()) + .stderr(std::process::Stdio::piped()); - let mut cmd = Command::new("ghdl"); - cmd.kill_on_drop(true); - cmd.args(["-e", "--std=08"]); - cmd.arg(format!("-Wl,{}", std::fs::canonicalize("../conn/target/release/libvhdl_ui.a")?.display())); - cmd.arg("tb"); - cmd.current_dir(&work_dir); - cmd.stdin(std::process::Stdio::piped()) - .stdout(std::process::Stdio::piped()) - .stderr(std::process::Stdio::piped()); - ensure_ok(cmd.spawn()?).await?; + cmd.current_dir(&work_dir); + ensure_ok(cmd.spawn()?).await?; - Ok(work_dir) -} \ No newline at end of file + let mut cmd = Command::new("ghdl"); + cmd.kill_on_drop(true); + cmd.args(["-e", "--std=08"]); + cmd.arg(format!( + "-Wl,{}", + std::fs::canonicalize("../conn/target/release/libvhdl_ui.a")?.display() + )); + cmd.arg("tb"); + cmd.current_dir(&work_dir); + cmd.stdin(std::process::Stdio::piped()) + .stdout(std::process::Stdio::piped()) + .stderr(std::process::Stdio::piped()); + ensure_ok(cmd.spawn()?).await?; + + Ok(work_dir) +} diff --git a/relay/src/local.rs b/relay/src/local.rs new file mode 100644 index 0000000..7dfeea6 --- /dev/null +++ b/relay/src/local.rs @@ -0,0 +1,258 @@ +use axum::{ + Error, extract::ws::{Message, WebSocket} +}; +use futures_util::{ + SinkExt, StreamExt, + stream::{SplitSink, SplitStream}, +}; +use serde::{Deserialize, Serialize}; +use std::{collections::HashMap, path::PathBuf, time::Duration}; +use tokio::{ + io::{AsyncBufReadExt, BufReader, Lines}, + process::{Child, ChildStderr, ChildStdin, ChildStdout}, +}; + +use crate::{build, run}; + + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(tag = "type", rename_all = "snake_case")] +enum ClientMsg { + Compile(Option>), + Start, + Stop, + Input { + /// bitfield of 32 switches + switch: u32, + /// bitfield of 32 buttons + buttons: u32, + }, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +enum ServerMsg<'a> { + Log { stream: &'a str, line: &'a str }, + Start, + Stop, + Led(u32), + Seg0(u32), + Seg1(u32), + Seg2(u32), + Seg3(u32), +} + +struct Process { + process: Child, + + stderr: Lines>, + stdout: Lines>, + stdin: ChildStdin, +} + +enum Mode { + SingleLocal, + Remote, +} + +struct Handler { + sender: SplitSink, + receiver: SplitStream, + + build_dir: PathBuf, + src_dir: PathBuf, + + program: Option, + process: Option, + + refresh_time: Duration, +} + +type HResult = Result>; + +impl Handler { + + async fn local(socket: WebSocket, build: PathBuf, src: PathBuf) -> Self { + let (sender, receiver) = socket.split(); + Self { + sender, + receiver, + build_dir: build, + src_dir: src, + program: None, + process: None, + refresh_time: Duration::from_millis(30), + } + } + + async fn print(&mut self, msg: impl AsRef) { + println!("stdout: {}", msg.as_ref()); + let msg = ServerMsg::Log { + stream: "stdout", + line: msg.as_ref(), + }; + _ = self.sender.send(Message::Text(serde_json::to_string(&msg).unwrap_or_default().into())).await; + } + + pub async fn eprint(&mut self, msg: impl AsRef) { + println!("stderr: {}", msg.as_ref()); + let msg = ServerMsg::Log { + stream: "stderr", + line: msg.as_ref(), + }; + _ = self.sender.send(Message::Text(serde_json::to_string(&msg).unwrap_or_default().into())).await; + } + + async fn stop_process(&mut self) { + self.process = None; + _ = self.sender.send(Message::Text(serde_json::to_string(&ServerMsg::Stop).unwrap_or_default().into())).await; + } + + async fn handle_websocket_msg(&mut self, msg: ClientMsg) -> HResult { + match msg{ + ClientMsg::Compile(_) => todo!(), + ClientMsg::Start => self.run_program().await, + ClientMsg::Stop => self.stop_process().await, + ClientMsg::Input { switch, buttons } => { + if let Some(process) = &mut self.process{ + use tokio::io::AsyncWriteExt; + process.stdin.write_all(format!("btn={}\n", buttons).as_bytes()).await?; + process.stdin.write_all(format!("sw={}\n", switch).as_bytes()).await?; + } + }, + } + Ok(false) + } + + async fn handle_websocket_receive( + &mut self, + msg: Option>, + ) -> bool { + match msg { + Some(Ok(Message::Close(_))) => true, + Some(Ok(Message::Text(msg))) => { + let msg = match serde_json::from_str(msg.as_str()){ + Ok(msg) => msg, + Err(err) => { + self.eprint(format!("Client message error {err}")).await; + return false; + } + }; + self.handle_websocket_msg(msg).await; + false + } + Some(Ok(_)) => false, + Some(Err(err)) => { + self.eprint(format!("Client websocket error {err}")).await; + true + }, + None => true, + } + } + + async fn run(&mut self) -> Result<(), Box> { + loop { + if let Some(process) = &mut self.process { + if let Ok(Some(_)) = process.process.try_wait(){ + self.stop_process().await; + continue; + } + tokio::select! { + receive = self.receiver.next() => { + if self.handle_websocket_receive(receive).await{ + break; + } + } + out = process.stdout.next_line() => { + match out { + Ok(Some(line)) => self.print(line).await, + Ok(None) => self.stop_process().await, + Err(err) => { + self.eprint(format!("Failed to read proccess sout: {err}")).await; + self.stop_process().await; + } + } + } + err = process.stderr.next_line() => { + match err{ + Ok(Some(line)) => { + let msg = if let Some(repr) = line.strip_prefix("led="){ + ServerMsg::Led(repr.parse().unwrap_or(0)) + }else if let Some(repr) = line.strip_prefix("seg0="){ + ServerMsg::Seg0(repr.parse().unwrap_or(0)) + }else if let Some(repr) = line.strip_prefix("seg1="){ + ServerMsg::Seg1(repr.parse().unwrap_or(0)) + }else if let Some(repr) = line.strip_prefix("seg2="){ + ServerMsg::Seg2(repr.parse().unwrap_or(0)) + }else if let Some(repr) = line.strip_prefix("seg3="){ + ServerMsg::Seg3(repr.parse().unwrap_or(0)) + }else{ + self.eprint(line).await; + continue; + }; + self.sender.send(Message::Text(serde_json::to_string(&msg)?.into())).await?; + }, + Ok(None) => self.stop_process().await, + Err(err) => { + self.eprint(format!("Failed to read proccess serr: {err}")).await; + self.stop_process().await; + } + } + } + _ = tokio::time::sleep(self.refresh_time) => { + use tokio::io::AsyncWriteExt; + _ = process.stdin.write_all("\n".as_bytes()).await; + } + } + }else{ + let res = self.receiver.next().await; + if self.handle_websocket_receive(res).await{ + break; + } + } + } + Ok(()) + } + + async fn build_program(&mut self) { + let files = if let Some(Ok(Message::Text(msg))) = self.receiver.next().await + && let Ok(files) = serde_json::from_str::<'_, HashMap>(&msg) + { + files + } else { + return; + }; + + let artifact_dir = match build::build(files).await { + Ok(dir) => dir, + Err(err) => { + _ = self + .sender + .send(Message::Text(format!("Failed to build: {err}").into())) + .await; + return; + } + }; + } + + async fn run_program(&mut self) { + let process = match run::run(&self.build_dir).await { + Ok(process) => process, + Err(err) => { + self.eprint(format!("Failed to run: {err}")).await; + return; + } + }; + let stdout = BufReader::new(process.stdout).lines(); + let stderr = BufReader::new(process.stderr).lines(); + let stdin = process.stdin; + + self.process = Some( + Process { process: process.child, stderr, stdout, stdin } + ) + } +} + +pub async fn ws_handler(socket: WebSocket) { + Handler::local(socket, "target".into(), "src".into()); +} diff --git a/relay/src/main.rs b/relay/src/main.rs index b30cd34..0ae2b39 100644 --- a/relay/src/main.rs +++ b/relay/src/main.rs @@ -3,24 +3,30 @@ use axum::{ extract::ws::{Message, WebSocket, WebSocketUpgrade}, routing::get, }; -use futures_util::{SinkExt, StreamExt}; +use futures_util::{ + stream::{SplitSink, SplitStream}, +}; use serde::{Deserialize, Serialize}; -use std::{collections::HashMap, net::SocketAddr}; +use std::{collections::HashMap, net::SocketAddr, path::PathBuf, time::Duration}; use tokio::{ - io::{AsyncBufReadExt, BufReader}, + io::{BufReader, Lines}, + process::{Child, ChildStderr, ChildStdin, ChildStdout}, }; use tower_http::services::ServeDir; +use crate::build::TempDir; + pub mod build; pub mod run; - +pub mod local; +pub mod remote; #[tokio::main] async fn main() { let app = Router::new() .route( "/ws", - get(|ws: WebSocketUpgrade| async move { ws.on_upgrade(ws_handler) }), + get(|ws: WebSocketUpgrade| async move { ws.on_upgrade(remote::ws_handler) }), ) .fallback_service(ServeDir::new("ui")); @@ -32,132 +38,52 @@ async fn main() { .unwrap(); } - #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(tag = "type", rename_all = "snake_case")] -struct ClientInput{ - /// bitfield of 32 switches - switch: u32, - /// bitfield of 32 buttons - buttons: u32 +pub enum ClientMsg { + Compile(Option>), + Start, + Stop, + Input { + /// bitfield of 32 switches + switch: u32, + /// bitfield of 32 buttons + buttons: u32, + }, } #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(rename_all = "snake_case")] -enum ServerMsg<'a> { +pub enum ServerMsg<'a> { Log { stream: &'a str, line: &'a str }, + Start, + Stop, Led(u32), Seg0(u32), Seg1(u32), Seg2(u32), - Seg3(u32) + Seg3(u32), } -async fn ws_handler(socket: WebSocket) { - let (mut sender, mut receiver) = socket.split(); +struct Process { + process: Child, - let files = if let Some(Ok(Message::Text(msg))) = receiver.next().await - && let Ok(files) = serde_json::from_str::<'_, HashMap>(&msg) - { - files - } else { - return; - }; + stderr: Lines>, + stdout: Lines>, + stdin: ChildStdin, +} +struct Handler { + sender: SplitSink, + receiver: SplitStream, - let artifact_dir = match build::build(files).await{ - Ok(dir) => dir, - Err(err) => { - _ = sender.send(Message::Text(format!("Failed to build: {err}").into())).await; - return; - }, - }; + build_dir: TempDir, + src_dir: PathBuf, - let mut process = match run::run(&artifact_dir).await{ - Ok(process) => process, - Err(err) => { - _ = sender.send(Message::Text(format!("Failed to run: {err}").into())).await; - return; - }, - }; - let mut sout = BufReader::new(process.stdout).lines(); - let mut serr = BufReader::new(process.stderr).lines(); + program: Option, + process: Option, - let artifact_prefix = artifact_dir.to_str().unwrap_or("\0\0NOPE"); - - let result: Result<(), Box> = async { - loop{ - tokio::select! { - receive = receiver.next() => { - match receive{ - Some(Ok(Message::Close(_))) => break, - Some(Ok(Message::Text(msg))) => { - let input = serde_json::from_str::<'_, ClientInput>(&msg)?; - use tokio::io::AsyncWriteExt; - process.stdin.write_all(format!("btn={}\n", input.buttons).as_bytes()).await?; - process.stdin.write_all(format!("sw={}\n", input.switch).as_bytes()).await?; - }, - Some(Ok(_)) => {}, - Some(Err(err)) => Err(err)?, - _ => break, - } - } - out = sout.next_line() => { - match out{ - Ok(Some(line)) => { - - let msg = ServerMsg::Log { - stream: "stdout", - line: line.strip_prefix(artifact_prefix).unwrap_or(&line), - }; - sender.send(Message::Text(serde_json::to_string(&msg)?.into())).await?; - }, - Ok(None) => break, - Err(err) => { - Err(format!("Failed to read proccess sout: {err}"))?; - } - } - } - err = serr.next_line() => { - match err{ - Ok(Some(line)) => { - let msg = if let Some(repr) = line.strip_prefix("led="){ - ServerMsg::Led(repr.parse().unwrap_or(0)) - }else if let Some(repr) = line.strip_prefix("seg0="){ - ServerMsg::Seg0(repr.parse().unwrap_or(0)) - }else if let Some(repr) = line.strip_prefix("seg1="){ - ServerMsg::Seg1(repr.parse().unwrap_or(0)) - }else if let Some(repr) = line.strip_prefix("seg2="){ - ServerMsg::Seg2(repr.parse().unwrap_or(0)) - }else if let Some(repr) = line.strip_prefix("seg3="){ - ServerMsg::Seg3(repr.parse().unwrap_or(0)) - }else{ - ServerMsg::Log { - stream: "stderr", - line: line.strip_prefix(artifact_prefix).unwrap_or(&line), - } - }; - sender.send(Message::Text(serde_json::to_string(&msg)?.into())).await?; - }, - Ok(None) => break, - Err(err) => { - Err(format!("Failed to read proccess serr: {err}"))? - } - } - } - _ = tokio::time::sleep(std::time::Duration::from_millis(30)) => { - use tokio::io::AsyncWriteExt; - process.stdin.write_all("\n".as_bytes()).await?; - } - } - } - Ok(()) - }.await; + refresh_time: Duration, +} - match result{ - Ok(_) => {}, - Err(err) => { - _ = sender.send(Message::Text(format!("{err}").into())).await; - }, - } -} \ No newline at end of file +pub type HResult = Result>; \ No newline at end of file diff --git a/relay/src/remote.rs b/relay/src/remote.rs new file mode 100644 index 0000000..64cd133 --- /dev/null +++ b/relay/src/remote.rs @@ -0,0 +1,128 @@ +use axum::{ + extract::ws::{Message, WebSocket}, +}; +use futures_util::{ + SinkExt, StreamExt, +}; +use std::collections::HashMap; +use tokio::{ + io::{AsyncBufReadExt, BufReader}, +}; + +use crate::{ClientMsg, HResult, ServerMsg, build, run}; + +pub async fn ws_handler(socket: WebSocket) { + let (mut sender, mut receiver) = socket.split(); + + let files = if let Some(Ok(Message::Text(msg))) = receiver.next().await + && let Ok(files) = serde_json::from_str::<'_, HashMap>(&msg) + { + files + } else { + return; + }; + + + let artifact_dir = match build::build(files).await{ + Ok(dir) => dir, + Err(err) => { + _ = sender.send(Message::Text(format!("Failed to build: {err}").into())).await; + return; + }, + }; + + let mut process = match run::run(&artifact_dir).await{ + Ok(process) => process, + Err(err) => { + _ = sender.send(Message::Text(format!("Failed to run: {err}").into())).await; + return; + }, + }; + let mut sout = BufReader::new(process.stdout).lines(); + let mut serr = BufReader::new(process.stderr).lines(); + + let artifact_prefix = artifact_dir.to_str().unwrap_or("\0\0NOPE"); + + let result: HResult<()> = async { + loop{ + tokio::select! { + receive = receiver.next() => { + match receive{ + Some(Ok(Message::Close(_))) => break, + Some(Ok(Message::Text(msg))) => { + let input = serde_json::from_str::<'_, ClientMsg>(&msg)?; + match input{ + ClientMsg::Compile(_) => {}, + ClientMsg::Start => {}, + ClientMsg::Stop => break, + ClientMsg::Input { switch, buttons } => { + use tokio::io::AsyncWriteExt; + process.stdin.write_all(format!("btn={}\n", buttons).as_bytes()).await?; + process.stdin.write_all(format!("sw={}\n", switch).as_bytes()).await?; + }, + } + }, + Some(Ok(_)) => {}, + Some(Err(err)) => Err(err)?, + _ => break, + } + } + out = sout.next_line() => { + match out{ + Ok(Some(line)) => { + + let msg = ServerMsg::Log { + stream: "stdout", + line: line.strip_prefix(artifact_prefix).unwrap_or(&line), + }; + sender.send(Message::Text(serde_json::to_string(&msg)?.into())).await?; + }, + Ok(None) => break, + Err(err) => { + Err(format!("Failed to read proccess sout: {err}"))?; + } + } + } + err = serr.next_line() => { + match err{ + Ok(Some(line)) => { + let msg = if let Some(repr) = line.strip_prefix("led="){ + ServerMsg::Led(repr.parse().unwrap_or(0)) + }else if let Some(repr) = line.strip_prefix("seg0="){ + ServerMsg::Seg0(repr.parse().unwrap_or(0)) + }else if let Some(repr) = line.strip_prefix("seg1="){ + ServerMsg::Seg1(repr.parse().unwrap_or(0)) + }else if let Some(repr) = line.strip_prefix("seg2="){ + ServerMsg::Seg2(repr.parse().unwrap_or(0)) + }else if let Some(repr) = line.strip_prefix("seg3="){ + ServerMsg::Seg3(repr.parse().unwrap_or(0)) + }else{ + ServerMsg::Log { + stream: "stderr", + line: line.strip_prefix(artifact_prefix).unwrap_or(&line), + } + }; + sender.send(Message::Text(serde_json::to_string(&msg)?.into())).await?; + }, + Ok(None) => break, + Err(err) => { + Err(format!("Failed to read proccess serr: {err}"))? + } + } + } + _ = tokio::time::sleep(std::time::Duration::from_millis(30)) => { + use tokio::io::AsyncWriteExt; + process.stdin.write_all("\n".as_bytes()).await?; + } + } + } + Ok(()) + }.await; + + match result{ + Ok(_) => {}, + Err(err) => { + _ = sender.send(Message::Text(format!("{err}").into())).await; + }, + } +} \ No newline at end of file diff --git a/relay/src/run.rs b/relay/src/run.rs index 37764d2..08c898b 100644 --- a/relay/src/run.rs +++ b/relay/src/run.rs @@ -1,17 +1,24 @@ -use std::{path::Path}; +use std::path::Path; -use tokio::process::{Child, ChildStdin, ChildStdout, ChildStderr, Command}; +use tokio::process::{Child, ChildStderr, ChildStdin, ChildStdout, Command}; -pub struct Process{ +pub struct Process { pub child: Child, pub stdin: ChildStdin, - pub stdout: ChildStdout, - pub stderr: ChildStderr + pub stdout: ChildStdout, + pub stderr: ChildStderr, } -pub async fn run(artifact_dir: &Path) -> Result>{ - let mut cmd = Command::new("ghdl"); - cmd.args(["-r", "--std=08", "tb", "--stop-delta=4294967296", "--unbuffered", "--"]); +pub async fn run(artifact_dir: &Path) -> Result> { + let mut cmd = Command::new("ghdl"); + cmd.args([ + "-r", + "--std=08", + "tb", + "--stop-delta=4294967296", + "--unbuffered", + "--", + ]); cmd.args(std::env::args_os()); cmd.current_dir(artifact_dir); cmd.kill_on_drop(true); @@ -26,5 +33,10 @@ pub async fn run(artifact_dir: &Path) -> Result