moved selector code into own class

This commit is contained in:
ParkerTenBroeck 2025-05-01 21:37:34 -04:00
parent 93a764c986
commit f99beb721b
5 changed files with 270 additions and 185 deletions

View file

@ -0,0 +1,57 @@
package async_runtime;
import java.io.IOException;
import java.nio.channels.*;
import java.util.ArrayDeque;
public abstract class SelectorThread<T extends SelectableChannel, A> extends Thread{
private final Selector selector;
private record ToRegister<T, A>(T sc, int ops, A waker){}
private final ArrayDeque<ToRegister<T, A>> to_register = new ArrayDeque<>();
public SelectorThread(String name) throws IOException {
selector = Selector.open();
this.setName(name + " Polling Thread");
this.setDaemon(true);
this.start();
}
public abstract void handle(SelectionKey key, T t, A a) throws IOException;
public void register(T sc, int ops, A waker){
synchronized (to_register){
to_register.add(new ToRegister<>(sc, ops, waker));
}
selector.wakeup();
}
@SuppressWarnings("unchecked")
@Override
public void run() {
while(!Thread.currentThread().isInterrupted()){
try{
synchronized (to_register){
while(!to_register.isEmpty()){
var to = to_register.poll();
to.sc.register(selector, to.ops, to.waker);
}
}
selector.select();
var keys = selector.selectedKeys().iterator();
while (keys.hasNext()) {
SelectionKey key = keys.next();
keys.remove();
handle(key, (T)key.channel(), (A)key.attachment());
}
}catch (Exception e){
e.printStackTrace();
}
}
}
}

View file

@ -0,0 +1,122 @@
package async_runtime.net;
import async_runtime.SelectorThread;
import future.Future;
import future.Waker;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.SocketOption;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
import java.nio.channels.SelectionKey;
public class DatagramSocket implements AutoCloseable{
private final static SelectorThread<DatagramChannel, Waker> SELECTOR;
static {
try {
SELECTOR = new SelectorThread<>("DatagramSocket") {
@Override
public void handle(SelectionKey key, DatagramChannel c, Waker w) {
if (!key.isValid()) {
}else if(key.isAcceptable()){
}else if(key.isConnectable()){
w.wake();
}else if(key.isReadable()){
w.wake();
}else if(key.isWritable()){
w.wake();
}
}
};
} catch (IOException e) {
throw new RuntimeException(e);
}
}
private final DatagramChannel socket;
protected DatagramSocket(DatagramChannel sc){
this.socket = sc;
}
public static DatagramSocket open(InetSocketAddress inet) throws IOException {
var socket = DatagramChannel.open();
socket.configureBlocking(false);
return new DatagramSocket(socket);
}
public DatagramSocket bind(InetSocketAddress inet) throws IOException {
socket.bind(inet);
return this;
}
public DatagramSocket connect(InetSocketAddress inet) throws IOException {
socket.connect(inet);
return this;
}
public DatagramSocket disconnect() throws IOException{
socket.disconnect();
return this;
}
public <T> DatagramSocket set_options(SocketOption<T> option, T value) throws IOException{
socket.setOption(option, value);
return this;
}
public SocketAddress local_address() throws IOException {
return socket.getLocalAddress();
}
public SocketAddress remote_address() throws IOException {
return socket.getRemoteAddress();
}
public Future<Integer, IOException> write(ByteBuffer buffer){
return waker -> {
var wrote = socket.write(buffer);
if(wrote!=0) return wrote;
SELECTOR.register(socket, SelectionKey.OP_WRITE, waker);
return Future.Pending.INSTANCE;
};
}
public Future<Integer, IOException> write_all(ByteBuffer buffer){
var wrote = buffer.remaining();
return waker -> {
socket.write(buffer);
if(!buffer.hasRemaining()) return wrote;
SELECTOR.register(socket, SelectionKey.OP_WRITE, waker);
return Future.Pending.INSTANCE;
};
}
public Future<Integer, IOException> read(ByteBuffer buffer){
return waker -> {
var read = socket.read(buffer);
if(read!=0) return read;
SELECTOR.register(socket, SelectionKey.OP_READ, waker);
return Future.Pending.INSTANCE;
};
}
public Future<Integer, IOException> read_all(ByteBuffer buffer){
int read = buffer.remaining();
return waker -> {
var read_now = socket.read(buffer);
if(read_now ==-1)throw new IOException("Reached EOS while filling buffer");
if(!buffer.hasRemaining()) return read;
SELECTOR.register(socket, SelectionKey.OP_READ, waker);
return Future.Pending.INSTANCE;
};
}
@Override
public void close() throws Exception {
socket.close();
}
}

View file

@ -1,66 +1,34 @@
package async_runtime.net; package async_runtime.net;
import async_runtime.SelectorThread;
import future.Future; import future.Future;
import future.Waker; import future.Waker;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.SocketOption;
import java.nio.channels.SelectionKey; import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel; import java.nio.channels.ServerSocketChannel;
import java.util.ArrayDeque;
public class ServerSocket implements AutoCloseable{ public class ServerSocket implements AutoCloseable{
private final static Selector SELECTOR; private final static SelectorThread<ServerSocketChannel, Waker> SELECTOR;
private record ToRegister(ServerSocketChannel sc, int ops, Waker waker){}
private final static ArrayDeque<ToRegister> to_register = new ArrayDeque<>(); static {
static{
try { try {
SELECTOR = Selector.open(); SELECTOR = new SelectorThread<>("ServerSocket") {
@Override
public void handle(SelectionKey key, ServerSocketChannel c, Waker w) {
if (!key.isValid()) {
}else if(key.isAcceptable()){
w.wake();
}
}
};
} catch (IOException e) { } catch (IOException e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
var thread = new Thread(() -> {
while(!Thread.currentThread().isInterrupted()){
try{
synchronized (to_register){
while(!to_register.isEmpty()){
var to = to_register.poll();
to.sc.register(SELECTOR, to.ops, to.waker);
}
}
SELECTOR.select();
var keys = SELECTOR.selectedKeys().iterator();
while (keys.hasNext()) {
SelectionKey key = keys.next();
keys.remove();
var c = (ServerSocketChannel)key.channel();
var w = (Waker)key.attachment();
if (!key.isValid()) {
}else if(key.isAcceptable()){
w.wake();
}
}
}catch (Exception e){
e.printStackTrace();
}
}
});
thread.setName("ServerSocket Polling Thread");
thread.setDaemon(true);
thread.start();
}
private static void register(ServerSocketChannel sc, int ops, Waker waker){
synchronized (to_register){
to_register.add(new ToRegister(sc, ops, waker));
}
SELECTOR.wakeup();
} }
private final ServerSocketChannel socket; private final ServerSocketChannel socket;
@ -77,29 +45,25 @@ public class ServerSocket implements AutoCloseable{
} }
public Future<Socket, IOException> accept(){ public Future<Socket, IOException> accept(){
return new Future<>() { return waker -> {
@Override var accepted = socket.accept();
public Object poll(Waker waker) throws IOException { if(accepted==null) {
var socc = socket.accept(); SELECTOR.register(socket, SelectionKey.OP_ACCEPT, waker);
if(socc==null) { return Future.Pending.INSTANCE;
register(socket, SelectionKey.OP_ACCEPT, waker);
return Pending.INSTANCE;
}
socc.configureBlocking(false);
return new Socket(socc);
}
@Override
public void cancel() {
try {
if(socket!=null) socket.close();
} catch (IOException e) {
throw new RuntimeException(e);
}
} }
accepted.configureBlocking(false);
return new Socket(accepted);
}; };
} }
public <T> void set_options(SocketOption<T> option, T value) throws IOException{
socket.setOption(option, value);
}
public SocketAddress local_address() throws IOException {
return socket.getLocalAddress();
}
@Override @Override
public void close() throws Exception { public void close() throws Exception {
socket.close(); socket.close();

View file

@ -1,73 +1,40 @@
package async_runtime.net; package async_runtime.net;
import async_runtime.SelectorThread;
import future.Future; import future.Future;
import future.Waker; import future.Waker;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.SocketOption;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey; import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel; import java.nio.channels.SocketChannel;
import java.util.ArrayDeque;
public class Socket implements AutoCloseable{ public class Socket implements AutoCloseable{
private final static SelectorThread<SocketChannel, Waker> SELECTOR;
private final static Selector SELECTOR; static {
private record ToRegister(SocketChannel sc, int ops, Waker waker){}
private final static ArrayDeque<ToRegister> to_register = new ArrayDeque<>();
static{
try { try {
SELECTOR = Selector.open(); SELECTOR = new SelectorThread<>("Socket") {
@Override
public void handle(SelectionKey key, SocketChannel c, Waker w) throws IOException {
if (!key.isValid()) {
}else if(key.isAcceptable()){
}else if(key.isConnectable()){
c.finishConnect();
w.wake();
}else if(key.isReadable()){
w.wake();
}else if(key.isWritable()){
w.wake();
}
}
};
} catch (IOException e) { } catch (IOException e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
var thread = new Thread(() -> {
while(!Thread.currentThread().isInterrupted()){
try{
synchronized (to_register){
while(!to_register.isEmpty()){
var to = to_register.poll();
to.sc.register(SELECTOR, to.ops, to.waker);
}
}
SELECTOR.select();
var keys = SELECTOR.selectedKeys().iterator();
while (keys.hasNext()) {
SelectionKey key = keys.next();
keys.remove();
var c = (SocketChannel)key.channel();
var w = (Waker)key.attachment();
if (!key.isValid()) {
}else if(key.isAcceptable()){
}else if(key.isConnectable()){
c.finishConnect();
w.wake();
}else if(key.isReadable()){
w.wake();
}else if(key.isWritable()){
w.wake();
}
}
}catch (Exception e){
e.printStackTrace();
}
}
});
thread.setName("Socket Polling Thread");
thread.setDaemon(true);
thread.start();
}
private static void register(SocketChannel sc, int ops, Waker waker){
synchronized (to_register){
to_register.add(new ToRegister(sc, ops, waker));
}
SELECTOR.wakeup();
} }
private final SocketChannel socket; private final SocketChannel socket;
@ -86,88 +53,65 @@ public class Socket implements AutoCloseable{
socket.configureBlocking(false); socket.configureBlocking(false);
var connected = socket.connect(inet); var connected = socket.connect(inet);
if(!connected) { if(!connected) {
register(socket, SelectionKey.OP_CONNECT, waker); SELECTOR.register(socket, SelectionKey.OP_CONNECT, waker);
return Pending.INSTANCE; return Pending.INSTANCE;
} }
} }
if(socket.isConnected()) return new Socket(socket); if(socket.isConnected()) return new Socket(socket);
return Pending.INSTANCE; return Pending.INSTANCE;
} }
};
}
@Override public <T> Socket set_options(SocketOption<T> option, T value) throws IOException{
public void cancel() { socket.setOption(option, value);
try { return this;
if(socket!=null) socket.close(); }
} catch (IOException e) {
throw new RuntimeException(e); public SocketAddress local_address() throws IOException {
} return socket.getLocalAddress();
} }
public SocketAddress remote_address() throws IOException {
return socket.getRemoteAddress();
}
public Future<Integer, IOException> write(ByteBuffer buffer){
return waker -> {
var wrote = socket.write(buffer);
if(wrote!=0) return wrote;
SELECTOR.register(socket, SelectionKey.OP_WRITE, waker);
return Future.Pending.INSTANCE;
}; };
} }
public Future<Integer, IOException> write_all(ByteBuffer buffer){ public Future<Integer, IOException> write_all(ByteBuffer buffer){
return new Future<>() { var wrote = buffer.remaining();
int wrote = 0; return waker -> {
@Override socket.write(buffer);
public Object poll(Waker waker) throws IOException { if(!buffer.hasRemaining()) return wrote;
wrote += socket.write(buffer); SELECTOR.register(socket, SelectionKey.OP_WRITE, waker);
if(!buffer.hasRemaining()) return wrote; return Future.Pending.INSTANCE;
register(socket, SelectionKey.OP_WRITE, waker);
return Pending.INSTANCE;
}
@Override
public void cancel() {
try {
if(socket!=null) socket.close();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}; };
} }
public Future<Integer, IOException> read(ByteBuffer buffer){ public Future<Integer, IOException> read(ByteBuffer buffer){
return new Future<>() { return waker -> {
int read = 0; var read = socket.read(buffer);
@Override if(read!=0) return read;
public Object poll(Waker waker) throws IOException { SELECTOR.register(socket, SelectionKey.OP_READ, waker);
read += socket.read(buffer); return Future.Pending.INSTANCE;
if(read>0) return read;
register(socket, SelectionKey.OP_READ, waker);
return Pending.INSTANCE;
}
@Override
public void cancel() {
try {
if(socket!=null) socket.close();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}; };
} }
public Future<Integer, IOException> read_all(ByteBuffer buffer){ public Future<Integer, IOException> read_all(ByteBuffer buffer){
return new Future<>() { int read = buffer.remaining();
int read = 0; return waker -> {
@Override var read_now = socket.read(buffer);
public Object poll(Waker waker) throws IOException { if(read_now ==-1)throw new IOException("Reached EOS while filling buffer");
read += socket.read(buffer); if(!buffer.hasRemaining()) return read;
if(!buffer.hasRemaining()) return read; SELECTOR.register(socket, SelectionKey.OP_READ, waker);
register(socket, SelectionKey.OP_READ, waker); return Future.Pending.INSTANCE;
return Pending.INSTANCE;
}
@Override
public void cancel() {
try {
if(socket!=null) socket.close();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}; };
} }

View file

@ -2,9 +2,7 @@ package future;
public interface Future<R, E extends Throwable> { public interface Future<R, E extends Throwable> {
default Object poll(Waker waker) throws E{ Object poll(Waker waker) throws E;
return Pending.INSTANCE;
}
default R await() throws E{ default R await() throws E{
throw new RuntimeException("NO!"); throw new RuntimeException("NO!");