added task handles to runtime

This commit is contained in:
Parker TenBroeck 2025-05-04 20:12:08 -04:00
parent 0d23093d3b
commit 0dd6fb237d
2 changed files with 100 additions and 54 deletions

View file

@ -8,6 +8,7 @@ import generators.loadtime.future.Cancellation;
import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.file.Path; import java.nio.file.Path;
import java.util.function.Function;
import java.util.function.Supplier; import java.util.function.Supplier;
public class Main implements Runnable { public class Main implements Runnable {
@ -17,40 +18,50 @@ public class Main implements Runnable {
@Override @Override
public void run() { public void run() {
int value = 0;
String meow = "";
Function<Integer, String> test = new Function<Integer, String>() {
int i = 0;
@Override
public String apply(Integer i) {
return Main.this + meow + value + i;
}
};
System.out.println(test.apply(12));
// lexer(); // lexer();
// await(); //// await();
try { // try {
new Jokio().blocking(files()); // System.out.println(new Jokio().blocking(files()));
} catch (IOException ignore) {} // } catch (IOException ignore) {}
}
static Future<Void, Throwable> files() throws IOException{
try(@Cancellation("close") var file = File.open(Path.of("./src/Main.java"))){
var buf = ByteBuffer.allocate((int) file.size());
var read = file.read_all(buf).await();
System.out.println(new String(buf.array(), 0, read));
}
return Future.ret();
}
void async_lambda(Supplier<Future<?, ?>> lambda){
new Jokio().blocking(lambda.get());
}
void await(){
try{
new Jokio().blocking(AsyncExamples.run());
}catch (Exception e){
throw new RuntimeException(e);
}
}
void lexer(){
var gen = Lexer.parse("f7(x,y,z,w, u,v, othersIg) = v-(x*y+y+ln(z)^2*sin(z*pi/2))/(w*u)+sqrt(othersIg*120e-1)");
while(gen.next() instanceof Gen.Yield(var tok)) {
System.out.println(tok);
}
} }
//
// static Future<String, IOException> files() throws IOException{
// try(@Cancellation("close") var file = File.open(Path.of("./src/Main.java"))){
// var buf = ByteBuffer.allocate((int) file.size());
// var read = file.read_all(buf).await();
// return Future.ret(new String(buf.array(), 0, read));
// }
// }
//
// <T, E extends Throwable> T async_lambda(Supplier<Future<T, E>> lambda) throws E{
// return new Jokio().blocking(lambda.get());
// }
//
//
// void await(){
// try{
// new Jokio().blocking(AsyncExamples.run());
// }catch (Exception e){
// throw new RuntimeException(e);
// }
// }
//
//
// void lexer(){
// var gen = Lexer.parse("f7(x,y,z,w, u,v, othersIg) = v-(x*y+y+ln(z)^2*sin(z*pi/2))/(w*u)+sqrt(othersIg*120e-1)");
// while(gen.next() instanceof Gen.Yield(var tok)) {
// System.out.println(tok);
// }
// }
} }

View file

@ -7,10 +7,12 @@ import java.util.ArrayDeque;
import java.util.HashSet; import java.util.HashSet;
public class Jokio implements Runnable{ public class Jokio implements Runnable{
private class Task<T, E extends Throwable> implements Waker{ public class TaskHandle<T, E extends Throwable> implements Waker, Future<T, E>{
public final Future<T, E> future; private final Future<T, E> future;
private Object result = Pending.INSTANCE;
private Throwable err;
private Task(Future<T, E> future) { private TaskHandle(Future<T, E> future) {
this.future = future; this.future = future;
} }
@ -26,37 +28,60 @@ public class Jokio implements Runnable{
public Jokio runtime(){ public Jokio runtime(){
return Jokio.this; return Jokio.this;
} }
@Override
public Object poll(Waker waker) throws E {
if(err!=null)throw (E)err;
return result;
}
public T blocking() throws E{
while(result == Pending.INSTANCE) {
try {
this.wait();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
if(err!=null)throw (E)err;
}
return (T) result;
}
@Override
public synchronized void cancel() throws E {
synchronized (Jokio.this){
currentSet.remove(this);
}
future.cancel();
}
} }
public static Future<Jokio, RuntimeException> runtime(){ public static Future<Jokio, RuntimeException> runtime(){
return new Future<>() { return Jokio::runtime;
@Override
public Jokio poll(Waker waker) {
return ((Task<?, ?>)waker).runtime();
}
};
} }
public static Jokio runtime(Waker waker){ public static Jokio runtime(Waker waker){
return ((Task<?, ?>)waker).runtime(); return ((TaskHandle<?, ?>)waker).runtime();
} }
private final ArrayDeque<Task<?, ?>> wokeQueue = new ArrayDeque<>(); private final ArrayDeque<TaskHandle<?, ?>> wokeQueue = new ArrayDeque<>();
private final HashSet<Task<?, ?>> wokeSet = new HashSet<>(); private final HashSet<TaskHandle<?, ?>> wokeSet = new HashSet<>();
private final HashSet<Task<?, ?>> currentSet = new HashSet<>(); private final HashSet<TaskHandle<?, ?>> currentSet = new HashSet<>();
public void blocking(Future<?, ?> fut){ public <T, E extends Throwable> T blocking(Future<T, E> fut) throws E {
spawn(fut).run(); var result = spawn(fut);
run();
return result.blocking();
} }
public Jokio spawn(Future<?, ?> future){ public <T, E extends Throwable> TaskHandle<T, E> spawn(Future<T, E> future){
var task = new Task<>(future); var task = new TaskHandle<>(future);
synchronized (this){ synchronized (this){
currentSet.add(task); currentSet.add(task);
wokeQueue.add(task); wokeQueue.add(task);
wokeSet.add(task); wokeSet.add(task);
} }
return this; return task;
} }
@Override @Override
@ -72,7 +97,7 @@ public class Jokio implements Runnable{
} }
} }
} }
Task<?, ?> task; TaskHandle<?, ?> task;
synchronized (this){ synchronized (this){
task = wokeQueue.poll(); task = wokeQueue.poll();
wokeSet.remove(task); wokeSet.remove(task);
@ -80,8 +105,14 @@ public class Jokio implements Runnable{
} }
Object result; Object result;
try{ try{
synchronized (task){
result = task.future.poll(task); result = task.future.poll(task);
}
}catch (Throwable t){ }catch (Throwable t){
synchronized (task){
task.err = t;
task.notify();
}
System.out.println("Future " + task.future + " Threw Exception"); System.out.println("Future " + task.future + " Threw Exception");
t.printStackTrace(); t.printStackTrace();
synchronized (this){ synchronized (this){
@ -91,6 +122,10 @@ public class Jokio implements Runnable{
} }
synchronized (this){ synchronized (this){
if(result!=Future.Pending.INSTANCE) { if(result!=Future.Pending.INSTANCE) {
synchronized (task){
task.result = result;
task.notify();
}
currentSet.remove(task); currentSet.remove(task);
System.out.println(result); System.out.println(result);
} }