adding local support

This commit is contained in:
ParkerTenBroeck 2026-03-10 18:14:50 -04:00
parent 191101591e
commit 92102e3629
5 changed files with 515 additions and 183 deletions

View file

@ -1,92 +1,100 @@
use std::{collections::HashMap, ops::Deref, path::{Path, PathBuf}}; use std::{
use tokio::{ collections::HashMap,
process::{Child, Command}, ops::Deref,
path::{Path, PathBuf},
}; };
use tokio::process::{Child, Command};
async fn ensure_ok(child: Child) -> Result<(), Box<dyn std::error::Error + Send + Sync>>{ async fn ensure_ok(child: Child) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let result = child.wait_with_output().await?; let result = child.wait_with_output().await?;
if !result.status.success(){ if !result.status.success() {
return Err(format!("{}\n{}", String::from_utf8_lossy(&result.stdout), String::from_utf8_lossy(&result.stderr)))? return Err(format!(
"{}\n{}",
String::from_utf8_lossy(&result.stdout),
String::from_utf8_lossy(&result.stderr)
))?;
} }
Ok(()) Ok(())
} }
pub struct TempDir(PathBuf); pub struct TempDir(PathBuf);
impl Drop for TempDir{ impl Drop for TempDir {
fn drop(&mut self) { fn drop(&mut self) {
_ = std::fs::remove_dir_all(&self.0) _ = std::fs::remove_dir_all(&self.0)
} }
} }
impl Deref for TempDir{ impl Deref for TempDir {
type Target = PathBuf; type Target = PathBuf;
fn deref(&self) -> &Self::Target { fn deref(&self) -> &Self::Target {
&self.0 &self.0
} }
} }
impl AsRef<PathBuf> for TempDir{ impl AsRef<PathBuf> for TempDir {
fn as_ref(&self) -> &PathBuf { fn as_ref(&self) -> &PathBuf {
&self.0 &self.0
} }
} }
impl AsRef<Path> for TempDir{ impl AsRef<Path> for TempDir {
fn as_ref(&self) -> &Path { fn as_ref(&self) -> &Path {
&self.0 &self.0
} }
} }
pub async fn build(
files: HashMap<String, String>,
) -> Result<TempDir, Box<dyn std::error::Error + Send + Sync>> {
use std::hash::*;
let mut hasher = std::hash::DefaultHasher::default();
for (key, value) in &files {
key.hash(&mut hasher);
value.hash(&mut hasher);
}
let hash = hasher.finish();
pub async fn build(files: HashMap<String, String>) -> Result<TempDir, Box<dyn std::error::Error + Send + Sync>>{ let mut work_dir = std::env::temp_dir();
use std::hash::*; work_dir.push(format!("ghdl-relay-{hash:x?}"));
let mut hasher = std::hash::DefaultHasher::default(); _ = std::fs::create_dir(&work_dir);
for (key, value) in &files{ let work_dir = TempDir(work_dir);
key.hash(&mut hasher);
value.hash(&mut hasher);
}
let hash = hasher.finish();
let mut work_dir = std::env::temp_dir(); for (name, contents) in &files {
work_dir.push(format!("ghdl-relay-{hash:x?}")); let mut path = work_dir.clone();
_ = std::fs::create_dir(&work_dir); path.push(name);
let work_dir = TempDir(work_dir); std::fs::write(path, contents)?;
}
let mut cmd = Command::new("ghdl");
cmd.kill_on_drop(true);
cmd.args(["-a", "-g", "--std=08"]);
for name in files.keys() {
let mut path = work_dir.clone();
path.push(name);
cmd.arg(path);
}
cmd.arg(std::fs::canonicalize("../rtl/tb.vhdl")?);
for (name, contents) in &files{ cmd.stdin(std::process::Stdio::piped())
let mut path = work_dir.clone(); .stdout(std::process::Stdio::piped())
path.push(name); .stderr(std::process::Stdio::piped());
std::fs::write(path, contents)?;
}
let mut cmd = Command::new("ghdl");
cmd.kill_on_drop(true);
cmd.args(["-a", "-g", "--std=08"]);
for name in files.keys(){
let mut path = work_dir.clone();
path.push(name);
cmd.arg(path);
}
cmd.arg(std::fs::canonicalize("../rtl/tb.vhdl")?);
cmd.stdin(std::process::Stdio::piped())
.stdout(std::process::Stdio::piped())
.stderr(std::process::Stdio::piped());
cmd.current_dir(&work_dir);
ensure_ok(cmd.spawn()?).await?;
let mut cmd = Command::new("ghdl"); cmd.current_dir(&work_dir);
cmd.kill_on_drop(true); ensure_ok(cmd.spawn()?).await?;
cmd.args(["-e", "--std=08"]);
cmd.arg(format!("-Wl,{}", std::fs::canonicalize("../conn/target/release/libvhdl_ui.a")?.display()));
cmd.arg("tb");
cmd.current_dir(&work_dir);
cmd.stdin(std::process::Stdio::piped())
.stdout(std::process::Stdio::piped())
.stderr(std::process::Stdio::piped());
ensure_ok(cmd.spawn()?).await?;
Ok(work_dir) let mut cmd = Command::new("ghdl");
} cmd.kill_on_drop(true);
cmd.args(["-e", "--std=08"]);
cmd.arg(format!(
"-Wl,{}",
std::fs::canonicalize("../conn/target/release/libvhdl_ui.a")?.display()
));
cmd.arg("tb");
cmd.current_dir(&work_dir);
cmd.stdin(std::process::Stdio::piped())
.stdout(std::process::Stdio::piped())
.stderr(std::process::Stdio::piped());
ensure_ok(cmd.spawn()?).await?;
Ok(work_dir)
}

258
relay/src/local.rs Normal file
View file

@ -0,0 +1,258 @@
use axum::{
Error, extract::ws::{Message, WebSocket}
};
use futures_util::{
SinkExt, StreamExt,
stream::{SplitSink, SplitStream},
};
use serde::{Deserialize, Serialize};
use std::{collections::HashMap, path::PathBuf, time::Duration};
use tokio::{
io::{AsyncBufReadExt, BufReader, Lines},
process::{Child, ChildStderr, ChildStdin, ChildStdout},
};
use crate::{build, run};
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
enum ClientMsg {
Compile(Option<HashMap<String, String>>),
Start,
Stop,
Input {
/// bitfield of 32 switches
switch: u32,
/// bitfield of 32 buttons
buttons: u32,
},
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
enum ServerMsg<'a> {
Log { stream: &'a str, line: &'a str },
Start,
Stop,
Led(u32),
Seg0(u32),
Seg1(u32),
Seg2(u32),
Seg3(u32),
}
struct Process {
process: Child,
stderr: Lines<BufReader<ChildStderr>>,
stdout: Lines<BufReader<ChildStdout>>,
stdin: ChildStdin,
}
enum Mode {
SingleLocal,
Remote,
}
struct Handler {
sender: SplitSink<WebSocket, Message>,
receiver: SplitStream<WebSocket>,
build_dir: PathBuf,
src_dir: PathBuf,
program: Option<PathBuf>,
process: Option<Process>,
refresh_time: Duration,
}
type HResult<T> = Result<T, Box<dyn std::error::Error + Sync + Send>>;
impl Handler {
async fn local(socket: WebSocket, build: PathBuf, src: PathBuf) -> Self {
let (sender, receiver) = socket.split();
Self {
sender,
receiver,
build_dir: build,
src_dir: src,
program: None,
process: None,
refresh_time: Duration::from_millis(30),
}
}
async fn print(&mut self, msg: impl AsRef<str>) {
println!("stdout: {}", msg.as_ref());
let msg = ServerMsg::Log {
stream: "stdout",
line: msg.as_ref(),
};
_ = self.sender.send(Message::Text(serde_json::to_string(&msg).unwrap_or_default().into())).await;
}
pub async fn eprint(&mut self, msg: impl AsRef<str>) {
println!("stderr: {}", msg.as_ref());
let msg = ServerMsg::Log {
stream: "stderr",
line: msg.as_ref(),
};
_ = self.sender.send(Message::Text(serde_json::to_string(&msg).unwrap_or_default().into())).await;
}
async fn stop_process(&mut self) {
self.process = None;
_ = self.sender.send(Message::Text(serde_json::to_string(&ServerMsg::Stop).unwrap_or_default().into())).await;
}
async fn handle_websocket_msg(&mut self, msg: ClientMsg) -> HResult<bool> {
match msg{
ClientMsg::Compile(_) => todo!(),
ClientMsg::Start => self.run_program().await,
ClientMsg::Stop => self.stop_process().await,
ClientMsg::Input { switch, buttons } => {
if let Some(process) = &mut self.process{
use tokio::io::AsyncWriteExt;
process.stdin.write_all(format!("btn={}\n", buttons).as_bytes()).await?;
process.stdin.write_all(format!("sw={}\n", switch).as_bytes()).await?;
}
},
}
Ok(false)
}
async fn handle_websocket_receive(
&mut self,
msg: Option<Result<Message, Error>>,
) -> bool {
match msg {
Some(Ok(Message::Close(_))) => true,
Some(Ok(Message::Text(msg))) => {
let msg = match serde_json::from_str(msg.as_str()){
Ok(msg) => msg,
Err(err) => {
self.eprint(format!("Client message error {err}")).await;
return false;
}
};
self.handle_websocket_msg(msg).await;
false
}
Some(Ok(_)) => false,
Some(Err(err)) => {
self.eprint(format!("Client websocket error {err}")).await;
true
},
None => true,
}
}
async fn run(&mut self) -> Result<(), Box<dyn std::error::Error + Sync + Send>> {
loop {
if let Some(process) = &mut self.process {
if let Ok(Some(_)) = process.process.try_wait(){
self.stop_process().await;
continue;
}
tokio::select! {
receive = self.receiver.next() => {
if self.handle_websocket_receive(receive).await{
break;
}
}
out = process.stdout.next_line() => {
match out {
Ok(Some(line)) => self.print(line).await,
Ok(None) => self.stop_process().await,
Err(err) => {
self.eprint(format!("Failed to read proccess sout: {err}")).await;
self.stop_process().await;
}
}
}
err = process.stderr.next_line() => {
match err{
Ok(Some(line)) => {
let msg = if let Some(repr) = line.strip_prefix("led="){
ServerMsg::Led(repr.parse().unwrap_or(0))
}else if let Some(repr) = line.strip_prefix("seg0="){
ServerMsg::Seg0(repr.parse().unwrap_or(0))
}else if let Some(repr) = line.strip_prefix("seg1="){
ServerMsg::Seg1(repr.parse().unwrap_or(0))
}else if let Some(repr) = line.strip_prefix("seg2="){
ServerMsg::Seg2(repr.parse().unwrap_or(0))
}else if let Some(repr) = line.strip_prefix("seg3="){
ServerMsg::Seg3(repr.parse().unwrap_or(0))
}else{
self.eprint(line).await;
continue;
};
self.sender.send(Message::Text(serde_json::to_string(&msg)?.into())).await?;
},
Ok(None) => self.stop_process().await,
Err(err) => {
self.eprint(format!("Failed to read proccess serr: {err}")).await;
self.stop_process().await;
}
}
}
_ = tokio::time::sleep(self.refresh_time) => {
use tokio::io::AsyncWriteExt;
_ = process.stdin.write_all("\n".as_bytes()).await;
}
}
}else{
let res = self.receiver.next().await;
if self.handle_websocket_receive(res).await{
break;
}
}
}
Ok(())
}
async fn build_program(&mut self) {
let files = if let Some(Ok(Message::Text(msg))) = self.receiver.next().await
&& let Ok(files) = serde_json::from_str::<'_, HashMap<String, String>>(&msg)
{
files
} else {
return;
};
let artifact_dir = match build::build(files).await {
Ok(dir) => dir,
Err(err) => {
_ = self
.sender
.send(Message::Text(format!("Failed to build: {err}").into()))
.await;
return;
}
};
}
async fn run_program(&mut self) {
let process = match run::run(&self.build_dir).await {
Ok(process) => process,
Err(err) => {
self.eprint(format!("Failed to run: {err}")).await;
return;
}
};
let stdout = BufReader::new(process.stdout).lines();
let stderr = BufReader::new(process.stderr).lines();
let stdin = process.stdin;
self.process = Some(
Process { process: process.child, stderr, stdout, stdin }
)
}
}
pub async fn ws_handler(socket: WebSocket) {
Handler::local(socket, "target".into(), "src".into());
}

View file

@ -3,24 +3,30 @@ use axum::{
extract::ws::{Message, WebSocket, WebSocketUpgrade}, extract::ws::{Message, WebSocket, WebSocketUpgrade},
routing::get, routing::get,
}; };
use futures_util::{SinkExt, StreamExt}; use futures_util::{
stream::{SplitSink, SplitStream},
};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::{collections::HashMap, net::SocketAddr}; use std::{collections::HashMap, net::SocketAddr, path::PathBuf, time::Duration};
use tokio::{ use tokio::{
io::{AsyncBufReadExt, BufReader}, io::{BufReader, Lines},
process::{Child, ChildStderr, ChildStdin, ChildStdout},
}; };
use tower_http::services::ServeDir; use tower_http::services::ServeDir;
use crate::build::TempDir;
pub mod build; pub mod build;
pub mod run; pub mod run;
pub mod local;
pub mod remote;
#[tokio::main] #[tokio::main]
async fn main() { async fn main() {
let app = Router::new() let app = Router::new()
.route( .route(
"/ws", "/ws",
get(|ws: WebSocketUpgrade| async move { ws.on_upgrade(ws_handler) }), get(|ws: WebSocketUpgrade| async move { ws.on_upgrade(remote::ws_handler) }),
) )
.fallback_service(ServeDir::new("ui")); .fallback_service(ServeDir::new("ui"));
@ -32,132 +38,52 @@ async fn main() {
.unwrap(); .unwrap();
} }
#[derive(Debug, Clone, Serialize, Deserialize)] #[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")] #[serde(tag = "type", rename_all = "snake_case")]
struct ClientInput{ pub enum ClientMsg {
/// bitfield of 32 switches Compile(Option<HashMap<String, String>>),
switch: u32, Start,
/// bitfield of 32 buttons Stop,
buttons: u32 Input {
/// bitfield of 32 switches
switch: u32,
/// bitfield of 32 buttons
buttons: u32,
},
} }
#[derive(Debug, Clone, Serialize, Deserialize)] #[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")] #[serde(rename_all = "snake_case")]
enum ServerMsg<'a> { pub enum ServerMsg<'a> {
Log { stream: &'a str, line: &'a str }, Log { stream: &'a str, line: &'a str },
Start,
Stop,
Led(u32), Led(u32),
Seg0(u32), Seg0(u32),
Seg1(u32), Seg1(u32),
Seg2(u32), Seg2(u32),
Seg3(u32) Seg3(u32),
} }
async fn ws_handler(socket: WebSocket) { struct Process {
let (mut sender, mut receiver) = socket.split(); process: Child,
let files = if let Some(Ok(Message::Text(msg))) = receiver.next().await stderr: Lines<BufReader<ChildStderr>>,
&& let Ok(files) = serde_json::from_str::<'_, HashMap<String, String>>(&msg) stdout: Lines<BufReader<ChildStdout>>,
{ stdin: ChildStdin,
files }
} else {
return;
};
struct Handler {
sender: SplitSink<WebSocket, Message>,
receiver: SplitStream<WebSocket>,
let artifact_dir = match build::build(files).await{ build_dir: TempDir,
Ok(dir) => dir, src_dir: PathBuf,
Err(err) => {
_ = sender.send(Message::Text(format!("Failed to build: {err}").into())).await;
return;
},
};
let mut process = match run::run(&artifact_dir).await{ program: Option<PathBuf>,
Ok(process) => process, process: Option<Process>,
Err(err) => {
_ = sender.send(Message::Text(format!("Failed to run: {err}").into())).await;
return;
},
};
let mut sout = BufReader::new(process.stdout).lines();
let mut serr = BufReader::new(process.stderr).lines();
let artifact_prefix = artifact_dir.to_str().unwrap_or("\0\0NOPE"); refresh_time: Duration,
}
let result: Result<(), Box<dyn std::error::Error + Sync + Send>> = async {
loop{
tokio::select! {
receive = receiver.next() => {
match receive{
Some(Ok(Message::Close(_))) => break,
Some(Ok(Message::Text(msg))) => {
let input = serde_json::from_str::<'_, ClientInput>(&msg)?;
use tokio::io::AsyncWriteExt;
process.stdin.write_all(format!("btn={}\n", input.buttons).as_bytes()).await?;
process.stdin.write_all(format!("sw={}\n", input.switch).as_bytes()).await?;
},
Some(Ok(_)) => {},
Some(Err(err)) => Err(err)?,
_ => break,
}
}
out = sout.next_line() => {
match out{
Ok(Some(line)) => {
let msg = ServerMsg::Log {
stream: "stdout",
line: line.strip_prefix(artifact_prefix).unwrap_or(&line),
};
sender.send(Message::Text(serde_json::to_string(&msg)?.into())).await?;
},
Ok(None) => break,
Err(err) => {
Err(format!("Failed to read proccess sout: {err}"))?;
}
}
}
err = serr.next_line() => {
match err{
Ok(Some(line)) => {
let msg = if let Some(repr) = line.strip_prefix("led="){
ServerMsg::Led(repr.parse().unwrap_or(0))
}else if let Some(repr) = line.strip_prefix("seg0="){
ServerMsg::Seg0(repr.parse().unwrap_or(0))
}else if let Some(repr) = line.strip_prefix("seg1="){
ServerMsg::Seg1(repr.parse().unwrap_or(0))
}else if let Some(repr) = line.strip_prefix("seg2="){
ServerMsg::Seg2(repr.parse().unwrap_or(0))
}else if let Some(repr) = line.strip_prefix("seg3="){
ServerMsg::Seg3(repr.parse().unwrap_or(0))
}else{
ServerMsg::Log {
stream: "stderr",
line: line.strip_prefix(artifact_prefix).unwrap_or(&line),
}
};
sender.send(Message::Text(serde_json::to_string(&msg)?.into())).await?;
},
Ok(None) => break,
Err(err) => {
Err(format!("Failed to read proccess serr: {err}"))?
}
}
}
_ = tokio::time::sleep(std::time::Duration::from_millis(30)) => {
use tokio::io::AsyncWriteExt;
process.stdin.write_all("\n".as_bytes()).await?;
}
}
}
Ok(())
}.await;
match result{ pub type HResult<T> = Result<T, Box<dyn std::error::Error + Sync + Send>>;
Ok(_) => {},
Err(err) => {
_ = sender.send(Message::Text(format!("{err}").into())).await;
},
}
}

128
relay/src/remote.rs Normal file
View file

@ -0,0 +1,128 @@
use axum::{
extract::ws::{Message, WebSocket},
};
use futures_util::{
SinkExt, StreamExt,
};
use std::collections::HashMap;
use tokio::{
io::{AsyncBufReadExt, BufReader},
};
use crate::{ClientMsg, HResult, ServerMsg, build, run};
pub async fn ws_handler(socket: WebSocket) {
let (mut sender, mut receiver) = socket.split();
let files = if let Some(Ok(Message::Text(msg))) = receiver.next().await
&& let Ok(files) = serde_json::from_str::<'_, HashMap<String, String>>(&msg)
{
files
} else {
return;
};
let artifact_dir = match build::build(files).await{
Ok(dir) => dir,
Err(err) => {
_ = sender.send(Message::Text(format!("Failed to build: {err}").into())).await;
return;
},
};
let mut process = match run::run(&artifact_dir).await{
Ok(process) => process,
Err(err) => {
_ = sender.send(Message::Text(format!("Failed to run: {err}").into())).await;
return;
},
};
let mut sout = BufReader::new(process.stdout).lines();
let mut serr = BufReader::new(process.stderr).lines();
let artifact_prefix = artifact_dir.to_str().unwrap_or("\0\0NOPE");
let result: HResult<()> = async {
loop{
tokio::select! {
receive = receiver.next() => {
match receive{
Some(Ok(Message::Close(_))) => break,
Some(Ok(Message::Text(msg))) => {
let input = serde_json::from_str::<'_, ClientMsg>(&msg)?;
match input{
ClientMsg::Compile(_) => {},
ClientMsg::Start => {},
ClientMsg::Stop => break,
ClientMsg::Input { switch, buttons } => {
use tokio::io::AsyncWriteExt;
process.stdin.write_all(format!("btn={}\n", buttons).as_bytes()).await?;
process.stdin.write_all(format!("sw={}\n", switch).as_bytes()).await?;
},
}
},
Some(Ok(_)) => {},
Some(Err(err)) => Err(err)?,
_ => break,
}
}
out = sout.next_line() => {
match out{
Ok(Some(line)) => {
let msg = ServerMsg::Log {
stream: "stdout",
line: line.strip_prefix(artifact_prefix).unwrap_or(&line),
};
sender.send(Message::Text(serde_json::to_string(&msg)?.into())).await?;
},
Ok(None) => break,
Err(err) => {
Err(format!("Failed to read proccess sout: {err}"))?;
}
}
}
err = serr.next_line() => {
match err{
Ok(Some(line)) => {
let msg = if let Some(repr) = line.strip_prefix("led="){
ServerMsg::Led(repr.parse().unwrap_or(0))
}else if let Some(repr) = line.strip_prefix("seg0="){
ServerMsg::Seg0(repr.parse().unwrap_or(0))
}else if let Some(repr) = line.strip_prefix("seg1="){
ServerMsg::Seg1(repr.parse().unwrap_or(0))
}else if let Some(repr) = line.strip_prefix("seg2="){
ServerMsg::Seg2(repr.parse().unwrap_or(0))
}else if let Some(repr) = line.strip_prefix("seg3="){
ServerMsg::Seg3(repr.parse().unwrap_or(0))
}else{
ServerMsg::Log {
stream: "stderr",
line: line.strip_prefix(artifact_prefix).unwrap_or(&line),
}
};
sender.send(Message::Text(serde_json::to_string(&msg)?.into())).await?;
},
Ok(None) => break,
Err(err) => {
Err(format!("Failed to read proccess serr: {err}"))?
}
}
}
_ = tokio::time::sleep(std::time::Duration::from_millis(30)) => {
use tokio::io::AsyncWriteExt;
process.stdin.write_all("\n".as_bytes()).await?;
}
}
}
Ok(())
}.await;
match result{
Ok(_) => {},
Err(err) => {
_ = sender.send(Message::Text(format!("{err}").into())).await;
},
}
}

View file

@ -1,17 +1,24 @@
use std::{path::Path}; use std::path::Path;
use tokio::process::{Child, ChildStdin, ChildStdout, ChildStderr, Command}; use tokio::process::{Child, ChildStderr, ChildStdin, ChildStdout, Command};
pub struct Process{ pub struct Process {
pub child: Child, pub child: Child,
pub stdin: ChildStdin, pub stdin: ChildStdin,
pub stdout: ChildStdout, pub stdout: ChildStdout,
pub stderr: ChildStderr pub stderr: ChildStderr,
} }
pub async fn run(artifact_dir: &Path) -> Result<Process, Box<dyn std::error::Error + Send + Sync>>{ pub async fn run(artifact_dir: &Path) -> Result<Process, Box<dyn std::error::Error + Send + Sync>> {
let mut cmd = Command::new("ghdl"); let mut cmd = Command::new("ghdl");
cmd.args(["-r", "--std=08", "tb", "--stop-delta=4294967296", "--unbuffered", "--"]); cmd.args([
"-r",
"--std=08",
"tb",
"--stop-delta=4294967296",
"--unbuffered",
"--",
]);
cmd.args(std::env::args_os()); cmd.args(std::env::args_os());
cmd.current_dir(artifact_dir); cmd.current_dir(artifact_dir);
cmd.kill_on_drop(true); cmd.kill_on_drop(true);
@ -26,5 +33,10 @@ pub async fn run(artifact_dir: &Path) -> Result<Process, Box<dyn std::error::Er
let stdout = child.stdout.take().ok_or("no stdout")?; let stdout = child.stdout.take().ok_or("no stdout")?;
let stderr = child.stderr.take().ok_or("no stderr")?; let stderr = child.stderr.take().ok_or("no stderr")?;
Ok(Process { child, stdin, stdout, stderr }) Ok(Process {
} child,
stdin,
stdout,
stderr,
})
}