From f715506ace63b1d6f98fd12f884cf692825969fb Mon Sep 17 00:00:00 2001 From: Parker TenBroeck <51721964+ParkerTenBroeck@users.noreply.github.com> Date: Wed, 30 Apr 2025 21:45:20 -0400 Subject: [PATCH] added more async networking features --- src/Examples.java | 64 ++++++++++--- src/Main.java | 2 +- src/async_example/Delay.java | 8 +- src/async_example/Jokio.java | 69 +++++++++----- src/async_example/net/ServerSocket.java | 114 ++++++++++++++++++++++++ src/async_example/{ => net}/Socket.java | 67 ++++++++------ src/generator/future/Future.java | 10 +-- src/generator/runtime/FrameTracker.java | 14 +-- 8 files changed, 268 insertions(+), 80 deletions(-) create mode 100644 src/async_example/net/ServerSocket.java rename src/async_example/{ => net}/Socket.java (71%) diff --git a/src/Examples.java b/src/Examples.java index ddd8c3e..77d0ed3 100644 --- a/src/Examples.java +++ b/src/Examples.java @@ -1,30 +1,74 @@ +import async_example.Delay; import async_example.Jokio; -import async_example.Socket; +import async_example.net.ServerSocket; +import async_example.net.Socket; import generator.future.Future; import generator.future.Waker; +import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; public class Examples { - public static Future test(){ - for(int i = 0; i < 1000; i ++){ - Jokio.runtime(Waker.waker()).spawn(echoForever("Message " + i + "\n")); + static long sent = 0; + static long received = 0; + public static Future test(){ + Jokio.runtime(Waker.waker()).spawn(server()); + + for(int i = 0; i < 10000; i ++){ + var builder = new StringBuilder(); + for(int c = 0; c < 4096*2; c ++) + builder.append((char)((Math.random()*('z'-'a')+'a'))); + Jokio.runtime(Waker.waker()).spawn(echoForever(builder.toString())); + } + while(true){ + System.out.println(sent + " " + received + " " + Jokio.polled); + Delay.delay(100).await(); + } + } + + + public static Future server(){ + try(var ss = ServerSocket.bind(new InetSocketAddress("0.0.0.0", 42069))){ + while (true){ + var socket = ss.accept().await(); + Jokio.runtime(Waker.waker()).spawn(echo(socket)); + } + } catch (Exception e) { + e.printStackTrace(); } return Future.ret(null); } - public static Future echoForever(String message){ - try(var socket = Socket.connect(new InetSocketAddress("45.79.112.203", 4242)).await()){ - var buffer = ByteBuffer.allocate(500); + public static Future echo(Socket socket){ + try(socket){ + var buffer = ByteBuffer.allocate(4096); while(true){ - buffer.limit(message.length()).put(message.getBytes(StandardCharsets.UTF_8)).position(0); + var read = socket.read(buffer).await(); + buffer.clear().limit(read); + socket.write_all(buffer).await(); + buffer.clear(); + } + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + public static Future echoForever(String message){ + byte[] msg_bytes = message.getBytes(StandardCharsets.UTF_8); + try(var socket = Socket.connect(new InetSocketAddress("localhost", 42069)).await()){ + var buffer = ByteBuffer.allocate(message.length()); + while(true){ + buffer.limit(message.length()).put(msg_bytes).position(0); var wrote = socket.write_all(buffer).await(); + sent++; buffer.clear().limit(wrote); - var read = socket.read_all(buffer).await(); - System.out.print(new String(buffer.array(), 0, read)); + socket.read_all(buffer).await(); + if(!buffer.position(0).equals(ByteBuffer.wrap(msg_bytes))) + throw new RuntimeException(); + received++; buffer.clear(); } } catch (Exception e) { diff --git a/src/Main.java b/src/Main.java index defda30..ec9c29c 100644 --- a/src/Main.java +++ b/src/Main.java @@ -32,7 +32,7 @@ public class Main implements Runnable { // } // } - Object simple_async_rt(Future fut){ + Object simple_async_rt(Future fut){ final var waker = new Waker(){ @Override public void wake() { diff --git a/src/async_example/Delay.java b/src/async_example/Delay.java index f10df07..c57c1c4 100644 --- a/src/async_example/Delay.java +++ b/src/async_example/Delay.java @@ -6,7 +6,7 @@ import generator.future.Waker; import java.util.Timer; import java.util.TimerTask; -public class Delay implements Future { +public class Delay implements Future { private final static Timer timer; private TimerTask task; @@ -17,11 +17,15 @@ public class Delay implements Future { private int delay; private boolean ready; - public Delay(int ms) { + protected Delay(int ms) { if (ms < 0) throw new IllegalArgumentException("async_example.Delay cannot be negative"); delay = ms; } + public static Future delay(int ms){ + return new Delay(ms); + } + @Override public void cancel() { if (task != null) task.cancel(); diff --git a/src/async_example/Jokio.java b/src/async_example/Jokio.java index 8168642..c6cf18c 100644 --- a/src/async_example/Jokio.java +++ b/src/async_example/Jokio.java @@ -5,23 +5,22 @@ import generator.future.Waker; import java.util.ArrayDeque; import java.util.HashSet; -import java.util.Queue; -import java.util.concurrent.ConcurrentLinkedDeque; -import java.util.concurrent.atomic.AtomicInteger; public class Jokio implements Runnable{ - private class Task implements Waker{ - public final Future future; + public static long polled = 0; + private class Task implements Waker{ + public final Future future; - private Task(Future future) { + private Task(Future future) { this.future = future; } @Override public void wake() { - woke.add(this); synchronized (Jokio.this){ + if(wokeSet.add(this)) + wokeQueue.add(this); Jokio.this.notifyAll(); } } @@ -31,38 +30,42 @@ public class Jokio implements Runnable{ } } - public static Future runtime(){ + public static Future runtime(){ return new Future<>() { @Override public Jokio poll(Waker waker) { - return ((Task)waker).runtime(); + return ((Task)waker).runtime(); } }; } public static Jokio runtime(Waker waker){ - return ((Task)waker).runtime(); + return ((Task)waker).runtime(); } - private final AtomicInteger current = new AtomicInteger(0); - private final ConcurrentLinkedDeque> woke = new ConcurrentLinkedDeque<>(); + private volatile long current = 0; + private final ArrayDeque> wokeQueue = new ArrayDeque<>(); + private final HashSet> wokeSet = new HashSet<>(); - public void blocking(Future fut){ + public void blocking(Future fut){ spawn(fut).run(); } - public Jokio spawn(Future future){ + public Jokio spawn(Future future){ var task = new Task<>(future); - current.getAndIncrement(); - woke.add(task); + synchronized (this){ + current++; + wokeQueue.add(task); + wokeSet.add(task); + } return this; } @Override public void run(){ - while(current.get() > 0) { + while(current > 0) { synchronized (this) { - while (woke.isEmpty()) { + while (wokeQueue.isEmpty()) { try { this.wait(); } catch (InterruptedException e) { @@ -70,12 +73,32 @@ public class Jokio implements Runnable{ } } } - var task = woke.poll(); - var result = task.future.poll(task); - if(result!=Future.Pending.INSTANCE) { - current.getAndDecrement(); - System.out.println(result); + Task task; + synchronized (this){ + task = wokeQueue.poll(); + wokeSet.remove(task); } + Object result; + try{ + result = task.future.poll(task); + }catch (Throwable t){ + throw new RuntimeException(t); +//// System.out.println("Future " + task.future + " Threw Exception"); +//// t.printStackTrace(); +// synchronized (this){ +// current--; +// polled++; +// } +// continue; + } + synchronized (this){ + if(result!=Future.Pending.INSTANCE) { + current--; + System.out.println(result); + } + polled++; + } + } } } diff --git a/src/async_example/net/ServerSocket.java b/src/async_example/net/ServerSocket.java new file mode 100644 index 0000000..b19dd73 --- /dev/null +++ b/src/async_example/net/ServerSocket.java @@ -0,0 +1,114 @@ +package async_example.net; + +import generator.future.Future; +import generator.future.Waker; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.nio.channels.ServerSocketChannel; +import java.nio.channels.SocketChannel; +import java.util.ArrayDeque; + +public class ServerSocket implements AutoCloseable{ + + private final static Selector SELECTOR; + private record ToRegister(ServerSocketChannel sc, int ops, Waker waker){} + private final static ArrayDeque to_register = new ArrayDeque<>(); + static{ + try { + SELECTOR = Selector.open(); + } catch (IOException e) { + throw new RuntimeException(e); + } + + var thread = new Thread(() -> { + while(!Thread.currentThread().isInterrupted()){ + try{ + synchronized (to_register){ + while(!to_register.isEmpty()){ + var to = to_register.poll(); + to.sc.register(SELECTOR, to.ops, to.waker); + } + } + + SELECTOR.select(); + var keys = SELECTOR.selectedKeys().iterator(); + + while (keys.hasNext()) { + SelectionKey key = keys.next(); + keys.remove(); + var c = (ServerSocketChannel)key.channel(); + var w = (Waker)key.attachment(); + + if (!key.isValid()) { + }else if(key.isAcceptable()){ + w.wake(); + } +// else if(key.isConnectable()){ +// }else if(key.isReadable()){ +// w.wake(); +// }else if(key.isWritable()){ +// w.wake(); +// } + } + }catch (Exception e){ + e.printStackTrace(); + } + } + }); + thread.setName("ServerSocket Polling Thread"); + thread.setDaemon(true); + thread.start(); + } + + private static void register(ServerSocketChannel sc, int ops, Waker waker){ + synchronized (to_register){ + to_register.add(new ToRegister(sc, ops, waker)); + } + SELECTOR.wakeup(); + } + + private final ServerSocketChannel socket; + + private ServerSocket(ServerSocketChannel sc){ + this.socket = sc; + } + + public static ServerSocket bind(InetSocketAddress inet) throws IOException { + var socket = ServerSocketChannel.open(); + socket.configureBlocking(false); + socket.bind(inet); + return new ServerSocket(socket); + } + + public Future accept(){ + return new Future<>() { + @Override + public Object poll(Waker waker) throws IOException { + var socc = socket.accept(); + if(socc==null) { + register(socket, SelectionKey.OP_ACCEPT, waker); + return Pending.INSTANCE; + } + socc.configureBlocking(false); + return new Socket(socc); + } + + @Override + public void cancel() { + try { + if(socket!=null) socket.close(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + }; + } + + @Override + public void close() throws Exception { + socket.close(); + } +} diff --git a/src/async_example/Socket.java b/src/async_example/net/Socket.java similarity index 71% rename from src/async_example/Socket.java rename to src/async_example/net/Socket.java index 8dc447e..e8cd424 100644 --- a/src/async_example/Socket.java +++ b/src/async_example/net/Socket.java @@ -1,4 +1,4 @@ -package async_example; +package async_example.net; import generator.future.Future; import generator.future.Waker; @@ -43,16 +43,15 @@ public class Socket implements AutoCloseable{ var w = (Waker)key.attachment(); if (!key.isValid()) { - } - if(key.isAcceptable()){ - + }else if(key.isAcceptable()){ }else if(key.isConnectable()){ c.finishConnect(); + w.wake(); }else if(key.isReadable()){ + w.wake(); }else if(key.isWritable()){ - + w.wake(); } - w.wake(); } }catch (Exception e){ e.printStackTrace(); @@ -73,26 +72,22 @@ public class Socket implements AutoCloseable{ private final SocketChannel socket; - private Socket(SocketChannel sc){ + protected Socket(SocketChannel sc){ this.socket = sc; } - public static Future connect(InetSocketAddress inet) { + public static Future connect(InetSocketAddress inet) { return new Future<>() { public SocketChannel socket; @Override - public Object poll(Waker waker) { + public Object poll(Waker waker) throws IOException { if(socket==null){ - try{ - socket = SocketChannel.open(); - socket.configureBlocking(false); - var connected = socket.connect(inet); - if(!connected) { - register(socket, SelectionKey.OP_CONNECT, waker); - return Pending.INSTANCE; - } - }catch (Exception e){ - throw new RuntimeException(e); + socket = SocketChannel.open(); + socket.configureBlocking(false); + var connected = socket.connect(inet); + if(!connected) { + register(socket, SelectionKey.OP_CONNECT, waker); + return Pending.INSTANCE; } } if(socket.isConnected()) return new Socket(socket); @@ -110,16 +105,12 @@ public class Socket implements AutoCloseable{ }; } - public Future write_all(ByteBuffer buffer){ + public Future write_all(ByteBuffer buffer){ return new Future<>() { int wrote = 0; @Override - public Object poll(Waker waker) { - try { - wrote += socket.write(buffer); - } catch (IOException e) { - throw new RuntimeException(e); - } + public Object poll(Waker waker) throws IOException { + wrote += socket.write(buffer); if(!buffer.hasRemaining()) return wrote; register(socket, SelectionKey.OP_WRITE, waker); return Pending.INSTANCE; @@ -136,16 +127,34 @@ public class Socket implements AutoCloseable{ }; } - public Future read_all(ByteBuffer buffer){ + public Future read(ByteBuffer buffer){ return new Future<>() { int read = 0; @Override - public Object poll(Waker waker) { + public Object poll(Waker waker) throws IOException { + read += socket.read(buffer); + if(read>0) return read; + register(socket, SelectionKey.OP_READ, waker); + return Pending.INSTANCE; + } + + @Override + public void cancel() { try { - read += socket.read(buffer); + if(socket!=null) socket.close(); } catch (IOException e) { throw new RuntimeException(e); } + } + }; + } + + public Future read_all(ByteBuffer buffer){ + return new Future<>() { + int read = 0; + @Override + public Object poll(Waker waker) throws IOException { + read += socket.read(buffer); if(!buffer.hasRemaining()) return read; register(socket, SelectionKey.OP_READ, waker); return Pending.INSTANCE; diff --git a/src/generator/future/Future.java b/src/generator/future/Future.java index d404ef3..e4fd208 100644 --- a/src/generator/future/Future.java +++ b/src/generator/future/Future.java @@ -1,22 +1,22 @@ package generator.future; -public interface Future { +public interface Future { - default Object poll(Waker waker){ + default Object poll(Waker waker) throws E{ return Pending.INSTANCE; } - default R await(){ + default R await() throws E{ throw new RuntimeException("NO!"); } default void cancel(){} - static Future ret(R r){ + static Future ret(R r){ throw new RuntimeException("NO!"); } - static Future ret(){ + static Future ret(){ throw new RuntimeException(); } diff --git a/src/generator/runtime/FrameTracker.java b/src/generator/runtime/FrameTracker.java index a437f00..fb8897c 100644 --- a/src/generator/runtime/FrameTracker.java +++ b/src/generator/runtime/FrameTracker.java @@ -6,10 +6,7 @@ import java.lang.classfile.attribute.RuntimeVisibleTypeAnnotationsAttribute; import java.lang.classfile.attribute.StackMapFrameInfo; import java.lang.classfile.attribute.StackMapTableAttribute; import java.lang.classfile.instruction.*; -import java.lang.constant.ClassDesc; -import java.lang.constant.DynamicConstantDesc; -import java.lang.constant.MethodHandleDesc; -import java.lang.constant.MethodTypeDesc; +import java.lang.constant.*; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -91,7 +88,7 @@ public class FrameTracker { case ITEM_BYTE -> "byte"; case ITEM_SHORT -> "short"; case ITEM_CHAR -> "char"; - case ITEM_LONG_2ND -> "float2"; + case ITEM_LONG_2ND -> "long2"; case ITEM_DOUBLE_2ND -> "double2"; default -> throw new IllegalStateException("Unexpected value: " + tag); }; @@ -311,10 +308,10 @@ public class FrameTracker { case ConstantInstruction c when ins.opcode() == Opcode.ACONST_NULL -> pushStack(Type.NULL_TYPE); case ConstantInstruction c -> { switch(c.constantValue()){ - case Double _ -> pushStack(Type.DOUBLE_TYPE); + case Double _ -> pushStack(Type.DOUBLE_TYPE, Type.DOUBLE2_TYPE); case Float _ -> pushStack(Type.FLOAT_TYPE); case Integer _ -> pushStack(Type.INTEGER_TYPE); - case Long _ -> pushStack(Type.LONG_TYPE); + case Long _ -> pushStack(Type.LONG_TYPE, Type.LONG2_TYPE); case String _ -> pushStack(Type.STRING_TYPE); case ClassDesc desc -> pushStack(desc); case DynamicConstantDesc dynamicConstantDesc -> pushStack(dynamicConstantDesc.constantType()); @@ -352,9 +349,6 @@ public class FrameTracker { case InvokeInstruction i -> { for(var param : i.typeSymbol().parameterArray()) decStack(TypeKind.from(param).slotSize()); - if(stack.isEmpty()){ - System.out.println(Arrays.toString(i.typeSymbol().parameterArray())); - } popStack(); pushStack(i.typeSymbol().returnType()); }