fixed bug in saving to local for await, fixed async runtime (again)

This commit is contained in:
Parker TenBroeck 2025-05-01 09:55:56 -04:00
parent f715506ace
commit 8593eada03
4 changed files with 30 additions and 31 deletions

View file

@ -15,20 +15,28 @@ public class Examples {
static long sent = 0; static long sent = 0;
static long received = 0; static long received = 0;
public static Future<Void, RuntimeException> test(){ public static Future<Void, RuntimeException> test(){
Jokio.runtime(Waker.waker()).spawn(server()); Jokio.runtime().await().spawn(server());
for(int i = 0; i < 10000; i ++){
for(int i = 0; i < 100; i ++){
var builder = new StringBuilder(); var builder = new StringBuilder();
for(int c = 0; c < 4096*2; c ++) for(int c = 0; c < 4096*16*3; c ++)
builder.append((char)((Math.random()*('z'-'a')+'a'))); builder.append((char)((Math.random()*('z'-'a')+'a')));
Jokio.runtime(Waker.waker()).spawn(echoForever(builder.toString())); Jokio.runtime().await().spawn(echoForever(builder.toString()));
} }
var start = System.currentTimeMillis();
while(true){ while(true){
System.out.println(sent + " " + received + " " + Jokio.polled);
Delay.delay(100).await(); Delay.delay(100).await();
var now = System.currentTimeMillis();
System.out.println(sent + " " + received + " " + (now-start));
start = now;
} }
} }
public static Future<Integer, RuntimeException> number(){
return Future.ret(12);
}
public static Future<Void, IOException> server(){ public static Future<Void, IOException> server(){
try(var ss = ServerSocket.bind(new InetSocketAddress("0.0.0.0", 42069))){ try(var ss = ServerSocket.bind(new InetSocketAddress("0.0.0.0", 42069))){
@ -44,7 +52,7 @@ public class Examples {
public static Future<Void, IOException> echo(Socket socket){ public static Future<Void, IOException> echo(Socket socket){
try(socket){ try(socket){
var buffer = ByteBuffer.allocate(4096); var buffer = ByteBuffer.allocate(4096*16*3);
while(true){ while(true){
var read = socket.read(buffer).await(); var read = socket.read(buffer).await();
buffer.clear().limit(read); buffer.clear().limit(read);
@ -66,8 +74,8 @@ public class Examples {
sent++; sent++;
buffer.clear().limit(wrote); buffer.clear().limit(wrote);
socket.read_all(buffer).await(); socket.read_all(buffer).await();
if(!buffer.position(0).equals(ByteBuffer.wrap(msg_bytes))) // if(!buffer.position(0).equals(ByteBuffer.wrap(msg_bytes)))
throw new RuntimeException(); // throw new RuntimeException();
received++; received++;
buffer.clear(); buffer.clear();
} }

View file

@ -7,8 +7,6 @@ import java.util.ArrayDeque;
import java.util.HashSet; import java.util.HashSet;
public class Jokio implements Runnable{ public class Jokio implements Runnable{
public static long polled = 0;
private class Task<T, E extends Throwable> implements Waker{ private class Task<T, E extends Throwable> implements Waker{
public final Future<T, E> future; public final Future<T, E> future;
@ -19,7 +17,7 @@ public class Jokio implements Runnable{
@Override @Override
public void wake() { public void wake() {
synchronized (Jokio.this){ synchronized (Jokio.this){
if(wokeSet.add(this)) if(currentSet.contains(this)&&wokeSet.add(this))
wokeQueue.add(this); wokeQueue.add(this);
Jokio.this.notifyAll(); Jokio.this.notifyAll();
} }
@ -43,9 +41,9 @@ public class Jokio implements Runnable{
return ((Task<?, ?>)waker).runtime(); return ((Task<?, ?>)waker).runtime();
} }
private volatile long current = 0;
private final ArrayDeque<Task<?, ?>> wokeQueue = new ArrayDeque<>(); private final ArrayDeque<Task<?, ?>> wokeQueue = new ArrayDeque<>();
private final HashSet<Task<?, ?>> wokeSet = new HashSet<>(); private final HashSet<Task<?, ?>> wokeSet = new HashSet<>();
private final HashSet<Task<?, ?>> currentSet = new HashSet<>();
public void blocking(Future<?, RuntimeException> fut){ public void blocking(Future<?, RuntimeException> fut){
spawn(fut).run(); spawn(fut).run();
@ -54,7 +52,7 @@ public class Jokio implements Runnable{
public Jokio spawn(Future<?, ?> future){ public Jokio spawn(Future<?, ?> future){
var task = new Task<>(future); var task = new Task<>(future);
synchronized (this){ synchronized (this){
current++; currentSet.add(task);
wokeQueue.add(task); wokeQueue.add(task);
wokeSet.add(task); wokeSet.add(task);
} }
@ -63,8 +61,9 @@ public class Jokio implements Runnable{
@Override @Override
public void run(){ public void run(){
while(current > 0) { while(true) {
synchronized (this) { synchronized (this) {
if(currentSet.isEmpty())break;
while (wokeQueue.isEmpty()) { while (wokeQueue.isEmpty()) {
try { try {
this.wait(); this.wait();
@ -77,26 +76,24 @@ public class Jokio implements Runnable{
synchronized (this){ synchronized (this){
task = wokeQueue.poll(); task = wokeQueue.poll();
wokeSet.remove(task); wokeSet.remove(task);
if(!currentSet.contains(task))continue;
} }
Object result; Object result;
try{ try{
result = task.future.poll(task); result = task.future.poll(task);
}catch (Throwable t){ }catch (Throwable t){
throw new RuntimeException(t); System.out.println("Future " + task.future + " Threw Exception");
//// System.out.println("Future " + task.future + " Threw Exception"); t.printStackTrace();
//// t.printStackTrace(); synchronized (this){
// synchronized (this){ currentSet.remove(task);
// current--; }
// polled++; continue;
// }
// continue;
} }
synchronized (this){ synchronized (this){
if(result!=Future.Pending.INSTANCE) { if(result!=Future.Pending.INSTANCE) {
current--; currentSet.remove(task);
System.out.println(result); System.out.println(result);
} }
polled++;
} }
} }

View file

@ -46,12 +46,6 @@ public class ServerSocket implements AutoCloseable{
}else if(key.isAcceptable()){ }else if(key.isAcceptable()){
w.wake(); w.wake();
} }
// else if(key.isConnectable()){
// }else if(key.isReadable()){
// w.wake();
// }else if(key.isWritable()){
// w.wake();
// }
} }
}catch (Exception e){ }catch (Exception e){
e.printStackTrace(); e.printStackTrace();

View file

@ -47,8 +47,8 @@ public class FutureSMBuilder extends StateMachineBuilder {
var sst = new SavedStateTracker(); var sst = new SavedStateTracker();
bcb.storeLocal(TypeKind.REFERENCE, frame.locals().length+2);
frame.save_locals(smb, cob, sst,2); frame.save_locals(smb, cob, sst,2);
bcb.storeLocal(TypeKind.REFERENCE, frame.locals().length+2);
frame.save_stack(smb, cob, sst,1); frame.save_stack(smb, cob, sst,1);
bcb.loadLocal(TypeKind.REFERENCE, frame.locals().length+2); bcb.loadLocal(TypeKind.REFERENCE, frame.locals().length+2);
bcb.areturn().labelBinding(restore_label); bcb.areturn().labelBinding(restore_label);