From f99beb721b60721101a0e5b4b18f8e285a507039 Mon Sep 17 00:00:00 2001 From: ParkerTenBroeck <51721964+ParkerTenBroeck@users.noreply.github.com> Date: Thu, 1 May 2025 21:37:34 -0400 Subject: [PATCH] moved selector code into own class --- src/async_runtime/SelectorThread.java | 57 +++++++ src/async_runtime/net/DatagramSocket.java | 122 +++++++++++++++ src/async_runtime/net/ServerSocket.java | 96 ++++-------- src/async_runtime/net/Socket.java | 176 ++++++++-------------- src/future/Future.java | 4 +- 5 files changed, 270 insertions(+), 185 deletions(-) create mode 100644 src/async_runtime/SelectorThread.java create mode 100644 src/async_runtime/net/DatagramSocket.java diff --git a/src/async_runtime/SelectorThread.java b/src/async_runtime/SelectorThread.java new file mode 100644 index 0000000..5110665 --- /dev/null +++ b/src/async_runtime/SelectorThread.java @@ -0,0 +1,57 @@ +package async_runtime; + +import java.io.IOException; +import java.nio.channels.*; +import java.util.ArrayDeque; + +public abstract class SelectorThread extends Thread{ + + private final Selector selector; + + private record ToRegister(T sc, int ops, A waker){} + private final ArrayDeque> to_register = new ArrayDeque<>(); + + + public SelectorThread(String name) throws IOException { + selector = Selector.open(); + this.setName(name + " Polling Thread"); + this.setDaemon(true); + this.start(); + } + + public abstract void handle(SelectionKey key, T t, A a) throws IOException; + + public void register(T sc, int ops, A waker){ + synchronized (to_register){ + to_register.add(new ToRegister<>(sc, ops, waker)); + } + selector.wakeup(); + } + + @SuppressWarnings("unchecked") + @Override + public void run() { + while(!Thread.currentThread().isInterrupted()){ + try{ + synchronized (to_register){ + while(!to_register.isEmpty()){ + var to = to_register.poll(); + to.sc.register(selector, to.ops, to.waker); + } + } + + selector.select(); + var keys = selector.selectedKeys().iterator(); + + while (keys.hasNext()) { + SelectionKey key = keys.next(); + keys.remove(); + + handle(key, (T)key.channel(), (A)key.attachment()); + } + }catch (Exception e){ + e.printStackTrace(); + } + } + } +} diff --git a/src/async_runtime/net/DatagramSocket.java b/src/async_runtime/net/DatagramSocket.java new file mode 100644 index 0000000..22cf28d --- /dev/null +++ b/src/async_runtime/net/DatagramSocket.java @@ -0,0 +1,122 @@ +package async_runtime.net; + +import async_runtime.SelectorThread; +import future.Future; +import future.Waker; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.net.SocketOption; +import java.nio.ByteBuffer; +import java.nio.channels.DatagramChannel; +import java.nio.channels.SelectionKey; + +public class DatagramSocket implements AutoCloseable{ + private final static SelectorThread SELECTOR; + + static { + try { + SELECTOR = new SelectorThread<>("DatagramSocket") { + @Override + public void handle(SelectionKey key, DatagramChannel c, Waker w) { + if (!key.isValid()) { + }else if(key.isAcceptable()){ + }else if(key.isConnectable()){ + w.wake(); + }else if(key.isReadable()){ + w.wake(); + }else if(key.isWritable()){ + w.wake(); + } + } + }; + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + private final DatagramChannel socket; + + protected DatagramSocket(DatagramChannel sc){ + this.socket = sc; + } + + public static DatagramSocket open(InetSocketAddress inet) throws IOException { + var socket = DatagramChannel.open(); + socket.configureBlocking(false); + return new DatagramSocket(socket); + } + + public DatagramSocket bind(InetSocketAddress inet) throws IOException { + socket.bind(inet); + return this; + } + + public DatagramSocket connect(InetSocketAddress inet) throws IOException { + socket.connect(inet); + return this; + } + + public DatagramSocket disconnect() throws IOException{ + socket.disconnect(); + return this; + } + + public DatagramSocket set_options(SocketOption option, T value) throws IOException{ + socket.setOption(option, value); + return this; + } + + public SocketAddress local_address() throws IOException { + return socket.getLocalAddress(); + } + + public SocketAddress remote_address() throws IOException { + return socket.getRemoteAddress(); + } + + public Future write(ByteBuffer buffer){ + return waker -> { + var wrote = socket.write(buffer); + if(wrote!=0) return wrote; + SELECTOR.register(socket, SelectionKey.OP_WRITE, waker); + return Future.Pending.INSTANCE; + }; + } + + public Future write_all(ByteBuffer buffer){ + var wrote = buffer.remaining(); + return waker -> { + socket.write(buffer); + if(!buffer.hasRemaining()) return wrote; + SELECTOR.register(socket, SelectionKey.OP_WRITE, waker); + return Future.Pending.INSTANCE; + }; + } + + public Future read(ByteBuffer buffer){ + return waker -> { + var read = socket.read(buffer); + if(read!=0) return read; + SELECTOR.register(socket, SelectionKey.OP_READ, waker); + return Future.Pending.INSTANCE; + }; + } + + public Future read_all(ByteBuffer buffer){ + int read = buffer.remaining(); + return waker -> { + var read_now = socket.read(buffer); + if(read_now ==-1)throw new IOException("Reached EOS while filling buffer"); + if(!buffer.hasRemaining()) return read; + SELECTOR.register(socket, SelectionKey.OP_READ, waker); + return Future.Pending.INSTANCE; + }; + } + + @Override + public void close() throws Exception { + socket.close(); + } +} diff --git a/src/async_runtime/net/ServerSocket.java b/src/async_runtime/net/ServerSocket.java index 5832ecf..becfd92 100644 --- a/src/async_runtime/net/ServerSocket.java +++ b/src/async_runtime/net/ServerSocket.java @@ -1,66 +1,34 @@ package async_runtime.net; +import async_runtime.SelectorThread; import future.Future; import future.Waker; import java.io.IOException; import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.net.SocketOption; import java.nio.channels.SelectionKey; -import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; -import java.util.ArrayDeque; public class ServerSocket implements AutoCloseable{ - private final static Selector SELECTOR; - private record ToRegister(ServerSocketChannel sc, int ops, Waker waker){} - private final static ArrayDeque to_register = new ArrayDeque<>(); - static{ + private final static SelectorThread SELECTOR; + + static { try { - SELECTOR = Selector.open(); + SELECTOR = new SelectorThread<>("ServerSocket") { + @Override + public void handle(SelectionKey key, ServerSocketChannel c, Waker w) { + if (!key.isValid()) { + }else if(key.isAcceptable()){ + w.wake(); + } + } + }; } catch (IOException e) { throw new RuntimeException(e); } - - var thread = new Thread(() -> { - while(!Thread.currentThread().isInterrupted()){ - try{ - synchronized (to_register){ - while(!to_register.isEmpty()){ - var to = to_register.poll(); - to.sc.register(SELECTOR, to.ops, to.waker); - } - } - - SELECTOR.select(); - var keys = SELECTOR.selectedKeys().iterator(); - - while (keys.hasNext()) { - SelectionKey key = keys.next(); - keys.remove(); - var c = (ServerSocketChannel)key.channel(); - var w = (Waker)key.attachment(); - - if (!key.isValid()) { - }else if(key.isAcceptable()){ - w.wake(); - } - } - }catch (Exception e){ - e.printStackTrace(); - } - } - }); - thread.setName("ServerSocket Polling Thread"); - thread.setDaemon(true); - thread.start(); - } - - private static void register(ServerSocketChannel sc, int ops, Waker waker){ - synchronized (to_register){ - to_register.add(new ToRegister(sc, ops, waker)); - } - SELECTOR.wakeup(); } private final ServerSocketChannel socket; @@ -77,29 +45,25 @@ public class ServerSocket implements AutoCloseable{ } public Future accept(){ - return new Future<>() { - @Override - public Object poll(Waker waker) throws IOException { - var socc = socket.accept(); - if(socc==null) { - register(socket, SelectionKey.OP_ACCEPT, waker); - return Pending.INSTANCE; - } - socc.configureBlocking(false); - return new Socket(socc); - } - - @Override - public void cancel() { - try { - if(socket!=null) socket.close(); - } catch (IOException e) { - throw new RuntimeException(e); - } + return waker -> { + var accepted = socket.accept(); + if(accepted==null) { + SELECTOR.register(socket, SelectionKey.OP_ACCEPT, waker); + return Future.Pending.INSTANCE; } + accepted.configureBlocking(false); + return new Socket(accepted); }; } + public void set_options(SocketOption option, T value) throws IOException{ + socket.setOption(option, value); + } + + public SocketAddress local_address() throws IOException { + return socket.getLocalAddress(); + } + @Override public void close() throws Exception { socket.close(); diff --git a/src/async_runtime/net/Socket.java b/src/async_runtime/net/Socket.java index eaf8063..3913d19 100644 --- a/src/async_runtime/net/Socket.java +++ b/src/async_runtime/net/Socket.java @@ -1,73 +1,40 @@ package async_runtime.net; +import async_runtime.SelectorThread; import future.Future; import future.Waker; import java.io.IOException; import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.net.SocketOption; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; -import java.nio.channels.Selector; import java.nio.channels.SocketChannel; -import java.util.ArrayDeque; public class Socket implements AutoCloseable{ + private final static SelectorThread SELECTOR; - private final static Selector SELECTOR; - private record ToRegister(SocketChannel sc, int ops, Waker waker){} - private final static ArrayDeque to_register = new ArrayDeque<>(); - static{ + static { try { - SELECTOR = Selector.open(); + SELECTOR = new SelectorThread<>("Socket") { + @Override + public void handle(SelectionKey key, SocketChannel c, Waker w) throws IOException { + if (!key.isValid()) { + }else if(key.isAcceptable()){ + }else if(key.isConnectable()){ + c.finishConnect(); + w.wake(); + }else if(key.isReadable()){ + w.wake(); + }else if(key.isWritable()){ + w.wake(); + } + } + }; } catch (IOException e) { throw new RuntimeException(e); } - - var thread = new Thread(() -> { - while(!Thread.currentThread().isInterrupted()){ - try{ - synchronized (to_register){ - while(!to_register.isEmpty()){ - var to = to_register.poll(); - to.sc.register(SELECTOR, to.ops, to.waker); - } - } - - SELECTOR.select(); - var keys = SELECTOR.selectedKeys().iterator(); - - while (keys.hasNext()) { - SelectionKey key = keys.next(); - keys.remove(); - var c = (SocketChannel)key.channel(); - var w = (Waker)key.attachment(); - - if (!key.isValid()) { - }else if(key.isAcceptable()){ - }else if(key.isConnectable()){ - c.finishConnect(); - w.wake(); - }else if(key.isReadable()){ - w.wake(); - }else if(key.isWritable()){ - w.wake(); - } - } - }catch (Exception e){ - e.printStackTrace(); - } - } - }); - thread.setName("Socket Polling Thread"); - thread.setDaemon(true); - thread.start(); - } - - private static void register(SocketChannel sc, int ops, Waker waker){ - synchronized (to_register){ - to_register.add(new ToRegister(sc, ops, waker)); - } - SELECTOR.wakeup(); } private final SocketChannel socket; @@ -86,88 +53,65 @@ public class Socket implements AutoCloseable{ socket.configureBlocking(false); var connected = socket.connect(inet); if(!connected) { - register(socket, SelectionKey.OP_CONNECT, waker); + SELECTOR.register(socket, SelectionKey.OP_CONNECT, waker); return Pending.INSTANCE; } } if(socket.isConnected()) return new Socket(socket); return Pending.INSTANCE; } + }; + } - @Override - public void cancel() { - try { - if(socket!=null) socket.close(); - } catch (IOException e) { - throw new RuntimeException(e); - } - } + public Socket set_options(SocketOption option, T value) throws IOException{ + socket.setOption(option, value); + return this; + } + + public SocketAddress local_address() throws IOException { + return socket.getLocalAddress(); + } + + public SocketAddress remote_address() throws IOException { + return socket.getRemoteAddress(); + } + + public Future write(ByteBuffer buffer){ + return waker -> { + var wrote = socket.write(buffer); + if(wrote!=0) return wrote; + SELECTOR.register(socket, SelectionKey.OP_WRITE, waker); + return Future.Pending.INSTANCE; }; } public Future write_all(ByteBuffer buffer){ - return new Future<>() { - int wrote = 0; - @Override - public Object poll(Waker waker) throws IOException { - wrote += socket.write(buffer); - if(!buffer.hasRemaining()) return wrote; - register(socket, SelectionKey.OP_WRITE, waker); - return Pending.INSTANCE; - } - - @Override - public void cancel() { - try { - if(socket!=null) socket.close(); - } catch (IOException e) { - throw new RuntimeException(e); - } - } + var wrote = buffer.remaining(); + return waker -> { + socket.write(buffer); + if(!buffer.hasRemaining()) return wrote; + SELECTOR.register(socket, SelectionKey.OP_WRITE, waker); + return Future.Pending.INSTANCE; }; } public Future read(ByteBuffer buffer){ - return new Future<>() { - int read = 0; - @Override - public Object poll(Waker waker) throws IOException { - read += socket.read(buffer); - if(read>0) return read; - register(socket, SelectionKey.OP_READ, waker); - return Pending.INSTANCE; - } - - @Override - public void cancel() { - try { - if(socket!=null) socket.close(); - } catch (IOException e) { - throw new RuntimeException(e); - } - } + return waker -> { + var read = socket.read(buffer); + if(read!=0) return read; + SELECTOR.register(socket, SelectionKey.OP_READ, waker); + return Future.Pending.INSTANCE; }; } public Future read_all(ByteBuffer buffer){ - return new Future<>() { - int read = 0; - @Override - public Object poll(Waker waker) throws IOException { - read += socket.read(buffer); - if(!buffer.hasRemaining()) return read; - register(socket, SelectionKey.OP_READ, waker); - return Pending.INSTANCE; - } - - @Override - public void cancel() { - try { - if(socket!=null) socket.close(); - } catch (IOException e) { - throw new RuntimeException(e); - } - } + int read = buffer.remaining(); + return waker -> { + var read_now = socket.read(buffer); + if(read_now ==-1)throw new IOException("Reached EOS while filling buffer"); + if(!buffer.hasRemaining()) return read; + SELECTOR.register(socket, SelectionKey.OP_READ, waker); + return Future.Pending.INSTANCE; }; } diff --git a/src/future/Future.java b/src/future/Future.java index a64d7ad..4411c3a 100644 --- a/src/future/Future.java +++ b/src/future/Future.java @@ -2,9 +2,7 @@ package future; public interface Future { - default Object poll(Waker waker) throws E{ - return Pending.INSTANCE; - } + Object poll(Waker waker) throws E; default R await() throws E{ throw new RuntimeException("NO!");