added more async networking features

This commit is contained in:
Parker TenBroeck 2025-04-30 21:45:20 -04:00
parent 7bb3547cda
commit f715506ace
8 changed files with 268 additions and 80 deletions

View file

@ -1,30 +1,74 @@
import async_example.Delay;
import async_example.Jokio;
import async_example.Socket;
import async_example.net.ServerSocket;
import async_example.net.Socket;
import generator.future.Future;
import generator.future.Waker;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
public class Examples {
public static Future<Void> test(){
for(int i = 0; i < 1000; i ++){
Jokio.runtime(Waker.waker()).spawn(echoForever("Message " + i + "\n"));
static long sent = 0;
static long received = 0;
public static Future<Void, RuntimeException> test(){
Jokio.runtime(Waker.waker()).spawn(server());
for(int i = 0; i < 10000; i ++){
var builder = new StringBuilder();
for(int c = 0; c < 4096*2; c ++)
builder.append((char)((Math.random()*('z'-'a')+'a')));
Jokio.runtime(Waker.waker()).spawn(echoForever(builder.toString()));
}
while(true){
System.out.println(sent + " " + received + " " + Jokio.polled);
Delay.delay(100).await();
}
}
public static Future<Void, IOException> server(){
try(var ss = ServerSocket.bind(new InetSocketAddress("0.0.0.0", 42069))){
while (true){
var socket = ss.accept().await();
Jokio.runtime(Waker.waker()).spawn(echo(socket));
}
} catch (Exception e) {
e.printStackTrace();
}
return Future.ret(null);
}
public static Future<Void> echoForever(String message){
try(var socket = Socket.connect(new InetSocketAddress("45.79.112.203", 4242)).await()){
var buffer = ByteBuffer.allocate(500);
public static Future<Void, IOException> echo(Socket socket){
try(socket){
var buffer = ByteBuffer.allocate(4096);
while(true){
buffer.limit(message.length()).put(message.getBytes(StandardCharsets.UTF_8)).position(0);
var read = socket.read(buffer).await();
buffer.clear().limit(read);
socket.write_all(buffer).await();
buffer.clear();
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
public static Future<Void, IOException> echoForever(String message){
byte[] msg_bytes = message.getBytes(StandardCharsets.UTF_8);
try(var socket = Socket.connect(new InetSocketAddress("localhost", 42069)).await()){
var buffer = ByteBuffer.allocate(message.length());
while(true){
buffer.limit(message.length()).put(msg_bytes).position(0);
var wrote = socket.write_all(buffer).await();
sent++;
buffer.clear().limit(wrote);
var read = socket.read_all(buffer).await();
System.out.print(new String(buffer.array(), 0, read));
socket.read_all(buffer).await();
if(!buffer.position(0).equals(ByteBuffer.wrap(msg_bytes)))
throw new RuntimeException();
received++;
buffer.clear();
}
} catch (Exception e) {

View file

@ -32,7 +32,7 @@ public class Main implements Runnable {
// }
// }
Object simple_async_rt(Future<?> fut){
Object simple_async_rt(Future<?, RuntimeException> fut){
final var waker = new Waker(){
@Override
public void wake() {

View file

@ -6,7 +6,7 @@ import generator.future.Waker;
import java.util.Timer;
import java.util.TimerTask;
public class Delay implements Future<String> {
public class Delay implements Future<Void, RuntimeException> {
private final static Timer timer;
private TimerTask task;
@ -17,11 +17,15 @@ public class Delay implements Future<String> {
private int delay;
private boolean ready;
public Delay(int ms) {
protected Delay(int ms) {
if (ms < 0) throw new IllegalArgumentException("async_example.Delay cannot be negative");
delay = ms;
}
public static Future<Void, RuntimeException> delay(int ms){
return new Delay(ms);
}
@Override
public void cancel() {
if (task != null) task.cancel();

View file

@ -5,23 +5,22 @@ import generator.future.Waker;
import java.util.ArrayDeque;
import java.util.HashSet;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.atomic.AtomicInteger;
public class Jokio implements Runnable{
private class Task<T> implements Waker{
public final Future<T> future;
public static long polled = 0;
private class Task<T, E extends Throwable> implements Waker{
public final Future<T, E> future;
private Task(Future<T> future) {
private Task(Future<T, E> future) {
this.future = future;
}
@Override
public void wake() {
woke.add(this);
synchronized (Jokio.this){
if(wokeSet.add(this))
wokeQueue.add(this);
Jokio.this.notifyAll();
}
}
@ -31,38 +30,42 @@ public class Jokio implements Runnable{
}
}
public static Future<Jokio> runtime(){
public static Future<Jokio, RuntimeException> runtime(){
return new Future<>() {
@Override
public Jokio poll(Waker waker) {
return ((Task<?>)waker).runtime();
return ((Task<?, ?>)waker).runtime();
}
};
}
public static Jokio runtime(Waker waker){
return ((Task<?>)waker).runtime();
return ((Task<?, ?>)waker).runtime();
}
private final AtomicInteger current = new AtomicInteger(0);
private final ConcurrentLinkedDeque<Task<?>> woke = new ConcurrentLinkedDeque<>();
private volatile long current = 0;
private final ArrayDeque<Task<?, ?>> wokeQueue = new ArrayDeque<>();
private final HashSet<Task<?, ?>> wokeSet = new HashSet<>();
public void blocking(Future<?> fut){
public void blocking(Future<?, RuntimeException> fut){
spawn(fut).run();
}
public Jokio spawn(Future<?> future){
public Jokio spawn(Future<?, ?> future){
var task = new Task<>(future);
current.getAndIncrement();
woke.add(task);
synchronized (this){
current++;
wokeQueue.add(task);
wokeSet.add(task);
}
return this;
}
@Override
public void run(){
while(current.get() > 0) {
while(current > 0) {
synchronized (this) {
while (woke.isEmpty()) {
while (wokeQueue.isEmpty()) {
try {
this.wait();
} catch (InterruptedException e) {
@ -70,12 +73,32 @@ public class Jokio implements Runnable{
}
}
}
var task = woke.poll();
var result = task.future.poll(task);
Task<?, ?> task;
synchronized (this){
task = wokeQueue.poll();
wokeSet.remove(task);
}
Object result;
try{
result = task.future.poll(task);
}catch (Throwable t){
throw new RuntimeException(t);
//// System.out.println("Future " + task.future + " Threw Exception");
//// t.printStackTrace();
// synchronized (this){
// current--;
// polled++;
// }
// continue;
}
synchronized (this){
if(result!=Future.Pending.INSTANCE) {
current.getAndDecrement();
current--;
System.out.println(result);
}
polled++;
}
}
}
}

View file

@ -0,0 +1,114 @@
package async_example.net;
import generator.future.Future;
import generator.future.Waker;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayDeque;
public class ServerSocket implements AutoCloseable{
private final static Selector SELECTOR;
private record ToRegister(ServerSocketChannel sc, int ops, Waker waker){}
private final static ArrayDeque<ToRegister> to_register = new ArrayDeque<>();
static{
try {
SELECTOR = Selector.open();
} catch (IOException e) {
throw new RuntimeException(e);
}
var thread = new Thread(() -> {
while(!Thread.currentThread().isInterrupted()){
try{
synchronized (to_register){
while(!to_register.isEmpty()){
var to = to_register.poll();
to.sc.register(SELECTOR, to.ops, to.waker);
}
}
SELECTOR.select();
var keys = SELECTOR.selectedKeys().iterator();
while (keys.hasNext()) {
SelectionKey key = keys.next();
keys.remove();
var c = (ServerSocketChannel)key.channel();
var w = (Waker)key.attachment();
if (!key.isValid()) {
}else if(key.isAcceptable()){
w.wake();
}
// else if(key.isConnectable()){
// }else if(key.isReadable()){
// w.wake();
// }else if(key.isWritable()){
// w.wake();
// }
}
}catch (Exception e){
e.printStackTrace();
}
}
});
thread.setName("ServerSocket Polling Thread");
thread.setDaemon(true);
thread.start();
}
private static void register(ServerSocketChannel sc, int ops, Waker waker){
synchronized (to_register){
to_register.add(new ToRegister(sc, ops, waker));
}
SELECTOR.wakeup();
}
private final ServerSocketChannel socket;
private ServerSocket(ServerSocketChannel sc){
this.socket = sc;
}
public static ServerSocket bind(InetSocketAddress inet) throws IOException {
var socket = ServerSocketChannel.open();
socket.configureBlocking(false);
socket.bind(inet);
return new ServerSocket(socket);
}
public Future<Socket, IOException> accept(){
return new Future<>() {
@Override
public Object poll(Waker waker) throws IOException {
var socc = socket.accept();
if(socc==null) {
register(socket, SelectionKey.OP_ACCEPT, waker);
return Pending.INSTANCE;
}
socc.configureBlocking(false);
return new Socket(socc);
}
@Override
public void cancel() {
try {
if(socket!=null) socket.close();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
};
}
@Override
public void close() throws Exception {
socket.close();
}
}

View file

@ -1,4 +1,4 @@
package async_example;
package async_example.net;
import generator.future.Future;
import generator.future.Waker;
@ -43,16 +43,15 @@ public class Socket implements AutoCloseable{
var w = (Waker)key.attachment();
if (!key.isValid()) {
}
if(key.isAcceptable()){
}else if(key.isAcceptable()){
}else if(key.isConnectable()){
c.finishConnect();
}else if(key.isReadable()){
}else if(key.isWritable()){
}
w.wake();
}else if(key.isReadable()){
w.wake();
}else if(key.isWritable()){
w.wake();
}
}
}catch (Exception e){
e.printStackTrace();
@ -73,17 +72,16 @@ public class Socket implements AutoCloseable{
private final SocketChannel socket;
private Socket(SocketChannel sc){
protected Socket(SocketChannel sc){
this.socket = sc;
}
public static Future<Socket> connect(InetSocketAddress inet) {
public static Future<Socket, IOException> connect(InetSocketAddress inet) {
return new Future<>() {
public SocketChannel socket;
@Override
public Object poll(Waker waker) {
public Object poll(Waker waker) throws IOException {
if(socket==null){
try{
socket = SocketChannel.open();
socket.configureBlocking(false);
var connected = socket.connect(inet);
@ -91,9 +89,6 @@ public class Socket implements AutoCloseable{
register(socket, SelectionKey.OP_CONNECT, waker);
return Pending.INSTANCE;
}
}catch (Exception e){
throw new RuntimeException(e);
}
}
if(socket.isConnected()) return new Socket(socket);
return Pending.INSTANCE;
@ -110,16 +105,12 @@ public class Socket implements AutoCloseable{
};
}
public Future<Integer> write_all(ByteBuffer buffer){
public Future<Integer, IOException> write_all(ByteBuffer buffer){
return new Future<>() {
int wrote = 0;
@Override
public Object poll(Waker waker) {
try {
public Object poll(Waker waker) throws IOException {
wrote += socket.write(buffer);
} catch (IOException e) {
throw new RuntimeException(e);
}
if(!buffer.hasRemaining()) return wrote;
register(socket, SelectionKey.OP_WRITE, waker);
return Pending.INSTANCE;
@ -136,16 +127,34 @@ public class Socket implements AutoCloseable{
};
}
public Future<Integer> read_all(ByteBuffer buffer){
public Future<Integer, IOException> read(ByteBuffer buffer){
return new Future<>() {
int read = 0;
@Override
public Object poll(Waker waker) {
try {
public Object poll(Waker waker) throws IOException {
read += socket.read(buffer);
if(read>0) return read;
register(socket, SelectionKey.OP_READ, waker);
return Pending.INSTANCE;
}
@Override
public void cancel() {
try {
if(socket!=null) socket.close();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
};
}
public Future<Integer, IOException> read_all(ByteBuffer buffer){
return new Future<>() {
int read = 0;
@Override
public Object poll(Waker waker) throws IOException {
read += socket.read(buffer);
if(!buffer.hasRemaining()) return read;
register(socket, SelectionKey.OP_READ, waker);
return Pending.INSTANCE;

View file

@ -1,22 +1,22 @@
package generator.future;
public interface Future<R> {
public interface Future<R, E extends Throwable> {
default Object poll(Waker waker){
default Object poll(Waker waker) throws E{
return Pending.INSTANCE;
}
default R await(){
default R await() throws E{
throw new RuntimeException("NO!");
}
default void cancel(){}
static <R> Future<R> ret(R r){
static <R, E extends Throwable> Future<R, E> ret(R r){
throw new RuntimeException("NO!");
}
static Future<Void> ret(){
static <E extends Throwable> Future<Void, E> ret(){
throw new RuntimeException();
}

View file

@ -6,10 +6,7 @@ import java.lang.classfile.attribute.RuntimeVisibleTypeAnnotationsAttribute;
import java.lang.classfile.attribute.StackMapFrameInfo;
import java.lang.classfile.attribute.StackMapTableAttribute;
import java.lang.classfile.instruction.*;
import java.lang.constant.ClassDesc;
import java.lang.constant.DynamicConstantDesc;
import java.lang.constant.MethodHandleDesc;
import java.lang.constant.MethodTypeDesc;
import java.lang.constant.*;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
@ -91,7 +88,7 @@ public class FrameTracker {
case ITEM_BYTE -> "byte";
case ITEM_SHORT -> "short";
case ITEM_CHAR -> "char";
case ITEM_LONG_2ND -> "float2";
case ITEM_LONG_2ND -> "long2";
case ITEM_DOUBLE_2ND -> "double2";
default -> throw new IllegalStateException("Unexpected value: " + tag);
};
@ -311,10 +308,10 @@ public class FrameTracker {
case ConstantInstruction c when ins.opcode() == Opcode.ACONST_NULL -> pushStack(Type.NULL_TYPE);
case ConstantInstruction c -> {
switch(c.constantValue()){
case Double _ -> pushStack(Type.DOUBLE_TYPE);
case Double _ -> pushStack(Type.DOUBLE_TYPE, Type.DOUBLE2_TYPE);
case Float _ -> pushStack(Type.FLOAT_TYPE);
case Integer _ -> pushStack(Type.INTEGER_TYPE);
case Long _ -> pushStack(Type.LONG_TYPE);
case Long _ -> pushStack(Type.LONG_TYPE, Type.LONG2_TYPE);
case String _ -> pushStack(Type.STRING_TYPE);
case ClassDesc desc -> pushStack(desc);
case DynamicConstantDesc dynamicConstantDesc -> pushStack(dynamicConstantDesc.constantType());
@ -352,9 +349,6 @@ public class FrameTracker {
case InvokeInstruction i -> {
for(var param : i.typeSymbol().parameterArray())
decStack(TypeKind.from(param).slotSize());
if(stack.isEmpty()){
System.out.println(Arrays.toString(i.typeSymbol().parameterArray()));
}
popStack();
pushStack(i.typeSymbol().returnType());
}