From 0d23093d3b5a06498c89e35770d3f22e51a04853 Mon Sep 17 00:00:00 2001 From: Parker TenBroeck <51721964+ParkerTenBroeck@users.noreply.github.com> Date: Sun, 4 May 2025 17:14:33 -0400 Subject: [PATCH] added async file IO --- src/AsyncExamples.java | 79 ++------- src/Main.java | 28 ++- src/async_runtime/io/Readable.java | 10 ++ .../{ => io}/SelectorThread.java | 2 +- src/async_runtime/io/Writable.java | 10 ++ src/async_runtime/io/fs/File.java | 165 ++++++++++++++++++ .../{ => io}/net/DatagramSocket.java | 13 +- .../{ => io}/net/ServerSocket.java | 9 +- src/async_runtime/{ => io}/net/Socket.java | 12 +- .../loadtime/GeneratorClassLoader.java | 4 +- 10 files changed, 252 insertions(+), 80 deletions(-) create mode 100644 src/async_runtime/io/Readable.java rename src/async_runtime/{ => io}/SelectorThread.java (98%) create mode 100644 src/async_runtime/io/Writable.java create mode 100644 src/async_runtime/io/fs/File.java rename src/async_runtime/{ => io}/net/DatagramSocket.java (93%) rename src/async_runtime/{ => io}/net/ServerSocket.java (90%) rename src/async_runtime/{ => io}/net/Socket.java (93%) diff --git a/src/AsyncExamples.java b/src/AsyncExamples.java index 84e247e..09958e5 100644 --- a/src/AsyncExamples.java +++ b/src/AsyncExamples.java @@ -1,10 +1,7 @@ -import async_runtime.Delay; import async_runtime.Jokio; -import async_runtime.Util; -import async_runtime.net.ServerSocket; -import async_runtime.net.Socket; +import async_runtime.io.net.ServerSocket; +import async_runtime.io.net.Socket; import future.Future; -import future.Waker; import generators.loadtime.future.Cancellation; import java.io.IOException; @@ -14,94 +11,52 @@ import java.nio.charset.StandardCharsets; public class AsyncExamples { - private static long sent = 0; - private static long received = 0; - public static Future test(){ + public static Future run() throws IOException { Jokio.runtime().await().spawn(server()); - for(int i = 0; i < 50; i ++){ var builder = new StringBuilder(); - for(int c = 0; c < 4096*16*3; c ++) + for(int c = 0; c < 4096; c ++) builder.append((char)((Math.random()*('z'-'a')+'a'))); - Jokio.runtime().await().spawn(echoForever(builder.toString())); - } - var start = System.currentTimeMillis(); - while(true){ - Delay.delay(100).await(); - var now = System.currentTimeMillis(); - System.out.println(sent + " " + received + " " + (now-start)); - start = now; + Jokio.runtime().await().spawn(verify_echo(builder.toString())); } + + return Future.ret(); } - public static Future number(){ - return Future.ret(12); - } - - public static Future server(){ - try(var ss = ServerSocket.bind(new InetSocketAddress("0.0.0.0", 42069))){ + public static Future server() throws IOException { + try(@Cancellation("close") var ss = ServerSocket.bind(new InetSocketAddress("0.0.0.0", 42069))){ while (true){ - Util.select( - Util.selectee(ss.accept(), socket -> { - Jokio.runtime(Waker.waker()).spawn(echo(socket)); - return Future.ret(); - }), - Util.selectee(Delay.delay(500), _ -> { - System.out.println("Timeout"); - return Future.ret(); - }) - ).await().await(); + Jokio.runtime().await().spawn(echo(ss.accept().await())); } - } catch (Throwable e) { - e.printStackTrace(); } - return Future.ret(null); } - public static synchronized Future meow(){ - Delay.delay(1000).await(); - return Future.ret(); - } - - public synchronized Future meow2(){ - Delay.delay(1000).await(); - return Future.ret(); - } - - public static Future echo(Socket socket){ + public static Future echo(@Cancellation("close") Socket socket) throws IOException{ try(socket){ - @Cancellation var buffer = ByteBuffer.allocate(4096*16*3); + var buffer = ByteBuffer.allocate(4096); while(true){ - var read = socket.read(buffer).await(); - buffer.clear().limit(read); + socket.read(buffer).await(); + buffer.flip(); socket.write_all(buffer).await(); buffer.clear(); } - } catch (Exception e) { - throw new RuntimeException(e); } } - public static Future echoForever(String message){ + public static Future verify_echo(String message) throws IOException { byte[] msg_bytes = message.getBytes(StandardCharsets.UTF_8); try(@Cancellation("close") var socket = Socket.connect(new InetSocketAddress("localhost", 42069)).await()){ var buffer = ByteBuffer.allocate(message.length()); while(true){ buffer.limit(message.length()).put(msg_bytes).position(0); var wrote = socket.write_all(buffer).await(); - sent++; buffer.clear().limit(wrote); socket.read_all(buffer).await(); -// if(!buffer.position(0).equals(ByteBuffer.wrap(msg_bytes))) -// throw new RuntimeException(); - received++; + if(!buffer.position(0).equals(ByteBuffer.wrap(msg_bytes))) + throw new RuntimeException(); buffer.clear(); } - } catch (Exception e) { - - e.printStackTrace(); } - return Future.ret(null); } } diff --git a/src/Main.java b/src/Main.java index f1679ea..11dc7e6 100644 --- a/src/Main.java +++ b/src/Main.java @@ -1,11 +1,13 @@ -import async_runtime.Delay; import async_runtime.Jokio; -import async_runtime.Util; +import async_runtime.io.fs.File; import generators.RT; import future.Future; import gen.Gen; import generators.loadtime.future.Cancellation; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.file.Path; import java.util.function.Supplier; public class Main implements Runnable { @@ -15,8 +17,20 @@ public class Main implements Runnable { @Override public void run() { - lexer(); - await(); +// lexer(); +// await(); + try { + new Jokio().blocking(files()); + } catch (IOException ignore) {} + } + + static Future files() throws IOException{ + try(@Cancellation("close") var file = File.open(Path.of("./src/Main.java"))){ + var buf = ByteBuffer.allocate((int) file.size()); + var read = file.read_all(buf).await(); + System.out.println(new String(buf.array(), 0, read)); + } + return Future.ret(); } void async_lambda(Supplier> lambda){ @@ -25,7 +39,11 @@ public class Main implements Runnable { void await(){ - new Jokio().blocking(AsyncExamples.test()); + try{ + new Jokio().blocking(AsyncExamples.run()); + }catch (Exception e){ + throw new RuntimeException(e); + } } diff --git a/src/async_runtime/io/Readable.java b/src/async_runtime/io/Readable.java new file mode 100644 index 0000000..fdcb947 --- /dev/null +++ b/src/async_runtime/io/Readable.java @@ -0,0 +1,10 @@ +package async_runtime.io; + +import future.Future; + +import java.nio.ByteBuffer; + +public interface Readable { + Future read(ByteBuffer buffer); + Future read_all(ByteBuffer buffer); +} diff --git a/src/async_runtime/SelectorThread.java b/src/async_runtime/io/SelectorThread.java similarity index 98% rename from src/async_runtime/SelectorThread.java rename to src/async_runtime/io/SelectorThread.java index 5110665..d0c33b7 100644 --- a/src/async_runtime/SelectorThread.java +++ b/src/async_runtime/io/SelectorThread.java @@ -1,4 +1,4 @@ -package async_runtime; +package async_runtime.io; import java.io.IOException; import java.nio.channels.*; diff --git a/src/async_runtime/io/Writable.java b/src/async_runtime/io/Writable.java new file mode 100644 index 0000000..2640fcc --- /dev/null +++ b/src/async_runtime/io/Writable.java @@ -0,0 +1,10 @@ +package async_runtime.io; + +import future.Future; + +import java.nio.ByteBuffer; + +public interface Writable { + Future write(ByteBuffer buffer); + Future write_all(ByteBuffer buffer); +} diff --git a/src/async_runtime/io/fs/File.java b/src/async_runtime/io/fs/File.java new file mode 100644 index 0000000..6584e19 --- /dev/null +++ b/src/async_runtime/io/fs/File.java @@ -0,0 +1,165 @@ +package async_runtime.io.fs; + +import async_runtime.io.Readable; +import async_runtime.io.Writable; +import future.Future; +import future.Waker; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.*; +import java.nio.file.Path; +import java.nio.file.StandardOpenOption; + +public class File implements AutoCloseable, Readable, Writable { + private final AsynchronousFileChannel channel; + + protected File(AsynchronousFileChannel sc){ + this.channel = sc; + } + + public static File open(Path path) throws IOException { + return new File(AsynchronousFileChannel.open(path, StandardOpenOption.READ)); + } + + public long size() throws IOException { + return channel.size(); + } + + + @Override + public Future read(ByteBuffer buffer) { + return read(buffer, 0); + } + + @Override + public Future read_all(ByteBuffer buffer) { + return read_all(buffer, 0); + } + + @Override + public Future write(ByteBuffer buffer) { + return write(buffer, 0); + } + + @Override + public Future write_all(ByteBuffer buffer) { + return write_all(buffer, 0); + } + + public Future write_all(ByteBuffer buffer, long position){ + return new Future<>() { + int written = 0; + Throwable t; + @Override + public Object poll(Waker waker) throws IOException { + if(t!=null)throw (IOException) t; + if(!buffer.hasRemaining()) return written; + channel.write(buffer, written+position, waker, new CompletionHandler<>() { + @Override + public void completed(Integer result, Waker attachment) { + written = result; + waker.wake(); + } + + @Override + public void failed(Throwable exc, Waker attachment) { + t = exc; + attachment.wake();; + } + }); + return Pending.INSTANCE; + } + }; + } + + public Future write(ByteBuffer buffer, long position){ + return new Future<>() { + int written = 0; + Throwable t; + @Override + public Object poll(Waker waker) throws IOException { + if(t!=null)throw (IOException) t; + if(written!=0) return written; + channel.write(buffer, written+position, waker, new CompletionHandler<>() { + @Override + public void completed(Integer result, Waker attachment) { + written = result; + waker.wake(); + } + + @Override + public void failed(Throwable exc, Waker attachment) { + t = exc; + attachment.wake();; + } + }); + return Pending.INSTANCE; + } + }; + } + + public Future read(ByteBuffer buffer, long position){ + return new Future<>() { + int read = 0; + boolean eos = false; + Throwable t; + @Override + public Object poll(Waker waker) throws IOException { + if(t!=null)throw (IOException) t; + if(eos) return read; + if(read!=0) return read; + channel.read(buffer, read+position, waker, new CompletionHandler<>() { + @Override + public void completed(Integer result, Waker attachment) { + if(result==-1)eos = true; + else read += result; + waker.wake(); + waker.wake(); + } + + @Override + public void failed(Throwable exc, Waker attachment) { + t = exc; + attachment.wake();; + } + }); + return Pending.INSTANCE; + } + }; + } + + public Future read_all(ByteBuffer buffer, long position){ + return new Future<>() { + int read = 0; + boolean eos = false; + Throwable t; + @Override + public Object poll(Waker waker) throws IOException { + if(t!=null)throw (IOException) t; + if(eos) return read; + if(!buffer.hasRemaining()) return read; + channel.read(buffer, read+position, waker, new CompletionHandler<>() { + @Override + public void completed(Integer result, Waker attachment) { + if(result==-1)eos = true; + else read += result; + waker.wake(); + } + + @Override + public void failed(Throwable exc, Waker attachment) { + t = exc; + attachment.wake();; + } + }); + return Pending.INSTANCE; + } + }; + } + + @Override + public void close() throws IOException { + channel.close(); + } +} diff --git a/src/async_runtime/net/DatagramSocket.java b/src/async_runtime/io/net/DatagramSocket.java similarity index 93% rename from src/async_runtime/net/DatagramSocket.java rename to src/async_runtime/io/net/DatagramSocket.java index d308714..387e483 100644 --- a/src/async_runtime/net/DatagramSocket.java +++ b/src/async_runtime/io/net/DatagramSocket.java @@ -1,6 +1,8 @@ -package async_runtime.net; +package async_runtime.io.net; -import async_runtime.SelectorThread; +import async_runtime.io.Readable; +import async_runtime.io.SelectorThread; +import async_runtime.io.Writable; import future.Future; import future.Waker; @@ -12,7 +14,7 @@ import java.nio.ByteBuffer; import java.nio.channels.DatagramChannel; import java.nio.channels.SelectionKey; -public class DatagramSocket implements AutoCloseable{ +public class DatagramSocket implements AutoCloseable, Readable, Writable { private final static SelectorThread SELECTOR; static { @@ -76,6 +78,7 @@ public class DatagramSocket implements AutoCloseable{ return socket.getRemoteAddress(); } + @Override public Future write(ByteBuffer buffer){ return waker -> { var wrote = socket.write(buffer); @@ -85,6 +88,7 @@ public class DatagramSocket implements AutoCloseable{ }; } + @Override public Future write_all(ByteBuffer buffer){ var wrote = buffer.remaining(); return waker -> { @@ -113,7 +117,7 @@ public class DatagramSocket implements AutoCloseable{ }; } - + @Override public Future read(ByteBuffer buffer){ return waker -> { var read = socket.read(buffer); @@ -123,6 +127,7 @@ public class DatagramSocket implements AutoCloseable{ }; } + @Override public Future read_all(ByteBuffer buffer){ int read = buffer.remaining(); return waker -> { diff --git a/src/async_runtime/net/ServerSocket.java b/src/async_runtime/io/net/ServerSocket.java similarity index 90% rename from src/async_runtime/net/ServerSocket.java rename to src/async_runtime/io/net/ServerSocket.java index 99b85d4..8827b3a 100644 --- a/src/async_runtime/net/ServerSocket.java +++ b/src/async_runtime/io/net/ServerSocket.java @@ -1,9 +1,12 @@ -package async_runtime.net; +package async_runtime.io.net; -import async_runtime.SelectorThread; +import async_runtime.io.Readable; +import async_runtime.io.SelectorThread; +import async_runtime.io.Writable; import future.Future; import future.Waker; +import java.io.IO; import java.io.IOException; import java.net.InetSocketAddress; import java.net.SocketAddress; @@ -11,7 +14,7 @@ import java.net.SocketOption; import java.nio.channels.SelectionKey; import java.nio.channels.ServerSocketChannel; -public class ServerSocket implements AutoCloseable{ +public class ServerSocket implements AutoCloseable { private final static SelectorThread SELECTOR; diff --git a/src/async_runtime/net/Socket.java b/src/async_runtime/io/net/Socket.java similarity index 93% rename from src/async_runtime/net/Socket.java rename to src/async_runtime/io/net/Socket.java index ae94251..de2ca28 100644 --- a/src/async_runtime/net/Socket.java +++ b/src/async_runtime/io/net/Socket.java @@ -1,6 +1,8 @@ -package async_runtime.net; +package async_runtime.io.net; -import async_runtime.SelectorThread; +import async_runtime.io.Readable; +import async_runtime.io.SelectorThread; +import async_runtime.io.Writable; import future.Future; import future.Waker; @@ -12,7 +14,7 @@ import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.SocketChannel; -public class Socket implements AutoCloseable{ +public class Socket implements AutoCloseable, Readable, Writable { private final static SelectorThread SELECTOR; static { @@ -76,6 +78,7 @@ public class Socket implements AutoCloseable{ return socket.getRemoteAddress(); } + @Override public Future write(ByteBuffer buffer){ return waker -> { var wrote = socket.write(buffer); @@ -85,6 +88,7 @@ public class Socket implements AutoCloseable{ }; } + @Override public Future write_all(ByteBuffer buffer){ var wrote = buffer.remaining(); return waker -> { @@ -95,6 +99,7 @@ public class Socket implements AutoCloseable{ }; } + @Override public Future read(ByteBuffer buffer){ return waker -> { var read = socket.read(buffer); @@ -104,6 +109,7 @@ public class Socket implements AutoCloseable{ }; } + @Override public Future read_all(ByteBuffer buffer){ int read = buffer.remaining(); return waker -> { diff --git a/src/generators/loadtime/GeneratorClassLoader.java b/src/generators/loadtime/GeneratorClassLoader.java index 29d2e71..479c64c 100644 --- a/src/generators/loadtime/GeneratorClassLoader.java +++ b/src/generators/loadtime/GeneratorClassLoader.java @@ -63,8 +63,8 @@ public class GeneratorClassLoader extends ClassLoader { return ClassFile.of(ClassFile.AttributesProcessingOption.PASS_ALL_ATTRIBUTES, ClassFile.StackMapsOption.STACK_MAPS_WHEN_REQUIRED).build(clm.thisClass().asSymbol(), cb -> { for (var ce : clm) { - if (ce instanceof MethodModel mem && !isGen && !isFuture) { - StateMachineBuilder builder; + if (ce instanceof MethodModel mem && !isGen && !isFuture && mem.code().isPresent()) { + StateMachineBuilder builder; if(mem.methodTypeSymbol().returnType().descriptorString().equals(Gen.class.descriptorString())){ builder = new GenSMBuilder(clm, mem, mem.code().get()); }else if(mem.methodTypeSymbol().returnType().descriptorString().equals(Future.class.descriptorString())){