added async file IO

This commit is contained in:
Parker TenBroeck 2025-05-04 17:14:33 -04:00
parent 386ec9d0d7
commit 0d23093d3b
10 changed files with 252 additions and 80 deletions

View file

@ -1,10 +1,7 @@
import async_runtime.Delay;
import async_runtime.Jokio;
import async_runtime.Util;
import async_runtime.net.ServerSocket;
import async_runtime.net.Socket;
import async_runtime.io.net.ServerSocket;
import async_runtime.io.net.Socket;
import future.Future;
import future.Waker;
import generators.loadtime.future.Cancellation;
import java.io.IOException;
@ -14,94 +11,52 @@ import java.nio.charset.StandardCharsets;
public class AsyncExamples {
private static long sent = 0;
private static long received = 0;
public static Future<Void, RuntimeException> test(){
public static Future<Void, RuntimeException> run() throws IOException {
Jokio.runtime().await().spawn(server());
for(int i = 0; i < 50; i ++){
var builder = new StringBuilder();
for(int c = 0; c < 4096*16*3; c ++)
for(int c = 0; c < 4096; c ++)
builder.append((char)((Math.random()*('z'-'a')+'a')));
Jokio.runtime().await().spawn(echoForever(builder.toString()));
}
var start = System.currentTimeMillis();
while(true){
Delay.delay(100).await();
var now = System.currentTimeMillis();
System.out.println(sent + " " + received + " " + (now-start));
start = now;
Jokio.runtime().await().spawn(verify_echo(builder.toString()));
}
return Future.ret();
}
public static Future<Integer, RuntimeException> number(){
return Future.ret(12);
}
public static Future<Void, IOException> server(){
try(var ss = ServerSocket.bind(new InetSocketAddress("0.0.0.0", 42069))){
public static Future<Void, IOException> server() throws IOException {
try(@Cancellation("close") var ss = ServerSocket.bind(new InetSocketAddress("0.0.0.0", 42069))){
while (true){
Util.select(
Util.selectee(ss.accept(), socket -> {
Jokio.runtime(Waker.waker()).spawn(echo(socket));
return Future.ret();
}),
Util.selectee(Delay.delay(500), _ -> {
System.out.println("Timeout");
return Future.ret();
})
).await().await();
Jokio.runtime().await().spawn(echo(ss.accept().await()));
}
} catch (Throwable e) {
e.printStackTrace();
}
return Future.ret(null);
}
public static synchronized Future<Void, RuntimeException> meow(){
Delay.delay(1000).await();
return Future.ret();
}
public synchronized Future<Void, RuntimeException> meow2(){
Delay.delay(1000).await();
return Future.ret();
}
public static Future<Void, IOException> echo(Socket socket){
public static Future<Void, IOException> echo(@Cancellation("close") Socket socket) throws IOException{
try(socket){
@Cancellation var buffer = ByteBuffer.allocate(4096*16*3);
var buffer = ByteBuffer.allocate(4096);
while(true){
var read = socket.read(buffer).await();
buffer.clear().limit(read);
socket.read(buffer).await();
buffer.flip();
socket.write_all(buffer).await();
buffer.clear();
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
public static Future<Void, IOException> echoForever(String message){
public static Future<Void, IOException> verify_echo(String message) throws IOException {
byte[] msg_bytes = message.getBytes(StandardCharsets.UTF_8);
try(@Cancellation("close") var socket = Socket.connect(new InetSocketAddress("localhost", 42069)).await()){
var buffer = ByteBuffer.allocate(message.length());
while(true){
buffer.limit(message.length()).put(msg_bytes).position(0);
var wrote = socket.write_all(buffer).await();
sent++;
buffer.clear().limit(wrote);
socket.read_all(buffer).await();
// if(!buffer.position(0).equals(ByteBuffer.wrap(msg_bytes)))
// throw new RuntimeException();
received++;
if(!buffer.position(0).equals(ByteBuffer.wrap(msg_bytes)))
throw new RuntimeException();
buffer.clear();
}
} catch (Exception e) {
e.printStackTrace();
}
return Future.ret(null);
}
}

View file

@ -1,11 +1,13 @@
import async_runtime.Delay;
import async_runtime.Jokio;
import async_runtime.Util;
import async_runtime.io.fs.File;
import generators.RT;
import future.Future;
import gen.Gen;
import generators.loadtime.future.Cancellation;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.Path;
import java.util.function.Supplier;
public class Main implements Runnable {
@ -15,8 +17,20 @@ public class Main implements Runnable {
@Override
public void run() {
lexer();
await();
// lexer();
// await();
try {
new Jokio().blocking(files());
} catch (IOException ignore) {}
}
static Future<Void, Throwable> files() throws IOException{
try(@Cancellation("close") var file = File.open(Path.of("./src/Main.java"))){
var buf = ByteBuffer.allocate((int) file.size());
var read = file.read_all(buf).await();
System.out.println(new String(buf.array(), 0, read));
}
return Future.ret();
}
void async_lambda(Supplier<Future<?, ?>> lambda){
@ -25,7 +39,11 @@ public class Main implements Runnable {
void await(){
new Jokio().blocking(AsyncExamples.test());
try{
new Jokio().blocking(AsyncExamples.run());
}catch (Exception e){
throw new RuntimeException(e);
}
}

View file

@ -0,0 +1,10 @@
package async_runtime.io;
import future.Future;
import java.nio.ByteBuffer;
public interface Readable<E extends Throwable> {
Future<Integer, E> read(ByteBuffer buffer);
Future<Integer, E> read_all(ByteBuffer buffer);
}

View file

@ -1,4 +1,4 @@
package async_runtime;
package async_runtime.io;
import java.io.IOException;
import java.nio.channels.*;

View file

@ -0,0 +1,10 @@
package async_runtime.io;
import future.Future;
import java.nio.ByteBuffer;
public interface Writable<E extends Throwable> {
Future<Integer, E> write(ByteBuffer buffer);
Future<Integer, E> write_all(ByteBuffer buffer);
}

View file

@ -0,0 +1,165 @@
package async_runtime.io.fs;
import async_runtime.io.Readable;
import async_runtime.io.Writable;
import future.Future;
import future.Waker;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
public class File implements AutoCloseable, Readable<IOException>, Writable<IOException> {
private final AsynchronousFileChannel channel;
protected File(AsynchronousFileChannel sc){
this.channel = sc;
}
public static File open(Path path) throws IOException {
return new File(AsynchronousFileChannel.open(path, StandardOpenOption.READ));
}
public long size() throws IOException {
return channel.size();
}
@Override
public Future<Integer, IOException> read(ByteBuffer buffer) {
return read(buffer, 0);
}
@Override
public Future<Integer, IOException> read_all(ByteBuffer buffer) {
return read_all(buffer, 0);
}
@Override
public Future<Integer, IOException> write(ByteBuffer buffer) {
return write(buffer, 0);
}
@Override
public Future<Integer, IOException> write_all(ByteBuffer buffer) {
return write_all(buffer, 0);
}
public Future<Integer, IOException> write_all(ByteBuffer buffer, long position){
return new Future<>() {
int written = 0;
Throwable t;
@Override
public Object poll(Waker waker) throws IOException {
if(t!=null)throw (IOException) t;
if(!buffer.hasRemaining()) return written;
channel.write(buffer, written+position, waker, new CompletionHandler<>() {
@Override
public void completed(Integer result, Waker attachment) {
written = result;
waker.wake();
}
@Override
public void failed(Throwable exc, Waker attachment) {
t = exc;
attachment.wake();;
}
});
return Pending.INSTANCE;
}
};
}
public Future<Integer, IOException> write(ByteBuffer buffer, long position){
return new Future<>() {
int written = 0;
Throwable t;
@Override
public Object poll(Waker waker) throws IOException {
if(t!=null)throw (IOException) t;
if(written!=0) return written;
channel.write(buffer, written+position, waker, new CompletionHandler<>() {
@Override
public void completed(Integer result, Waker attachment) {
written = result;
waker.wake();
}
@Override
public void failed(Throwable exc, Waker attachment) {
t = exc;
attachment.wake();;
}
});
return Pending.INSTANCE;
}
};
}
public Future<Integer, IOException> read(ByteBuffer buffer, long position){
return new Future<>() {
int read = 0;
boolean eos = false;
Throwable t;
@Override
public Object poll(Waker waker) throws IOException {
if(t!=null)throw (IOException) t;
if(eos) return read;
if(read!=0) return read;
channel.read(buffer, read+position, waker, new CompletionHandler<>() {
@Override
public void completed(Integer result, Waker attachment) {
if(result==-1)eos = true;
else read += result;
waker.wake();
waker.wake();
}
@Override
public void failed(Throwable exc, Waker attachment) {
t = exc;
attachment.wake();;
}
});
return Pending.INSTANCE;
}
};
}
public Future<Integer, IOException> read_all(ByteBuffer buffer, long position){
return new Future<>() {
int read = 0;
boolean eos = false;
Throwable t;
@Override
public Object poll(Waker waker) throws IOException {
if(t!=null)throw (IOException) t;
if(eos) return read;
if(!buffer.hasRemaining()) return read;
channel.read(buffer, read+position, waker, new CompletionHandler<>() {
@Override
public void completed(Integer result, Waker attachment) {
if(result==-1)eos = true;
else read += result;
waker.wake();
}
@Override
public void failed(Throwable exc, Waker attachment) {
t = exc;
attachment.wake();;
}
});
return Pending.INSTANCE;
}
};
}
@Override
public void close() throws IOException {
channel.close();
}
}

View file

@ -1,6 +1,8 @@
package async_runtime.net;
package async_runtime.io.net;
import async_runtime.SelectorThread;
import async_runtime.io.Readable;
import async_runtime.io.SelectorThread;
import async_runtime.io.Writable;
import future.Future;
import future.Waker;
@ -12,7 +14,7 @@ import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
import java.nio.channels.SelectionKey;
public class DatagramSocket implements AutoCloseable{
public class DatagramSocket implements AutoCloseable, Readable<IOException>, Writable<IOException> {
private final static SelectorThread<DatagramChannel, Waker> SELECTOR;
static {
@ -76,6 +78,7 @@ public class DatagramSocket implements AutoCloseable{
return socket.getRemoteAddress();
}
@Override
public Future<Integer, IOException> write(ByteBuffer buffer){
return waker -> {
var wrote = socket.write(buffer);
@ -85,6 +88,7 @@ public class DatagramSocket implements AutoCloseable{
};
}
@Override
public Future<Integer, IOException> write_all(ByteBuffer buffer){
var wrote = buffer.remaining();
return waker -> {
@ -113,7 +117,7 @@ public class DatagramSocket implements AutoCloseable{
};
}
@Override
public Future<Integer, IOException> read(ByteBuffer buffer){
return waker -> {
var read = socket.read(buffer);
@ -123,6 +127,7 @@ public class DatagramSocket implements AutoCloseable{
};
}
@Override
public Future<Integer, IOException> read_all(ByteBuffer buffer){
int read = buffer.remaining();
return waker -> {

View file

@ -1,9 +1,12 @@
package async_runtime.net;
package async_runtime.io.net;
import async_runtime.SelectorThread;
import async_runtime.io.Readable;
import async_runtime.io.SelectorThread;
import async_runtime.io.Writable;
import future.Future;
import future.Waker;
import java.io.IO;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
@ -11,7 +14,7 @@ import java.net.SocketOption;
import java.nio.channels.SelectionKey;
import java.nio.channels.ServerSocketChannel;
public class ServerSocket implements AutoCloseable{
public class ServerSocket implements AutoCloseable {
private final static SelectorThread<ServerSocketChannel, Waker> SELECTOR;

View file

@ -1,6 +1,8 @@
package async_runtime.net;
package async_runtime.io.net;
import async_runtime.SelectorThread;
import async_runtime.io.Readable;
import async_runtime.io.SelectorThread;
import async_runtime.io.Writable;
import future.Future;
import future.Waker;
@ -12,7 +14,7 @@ import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
public class Socket implements AutoCloseable{
public class Socket implements AutoCloseable, Readable<IOException>, Writable<IOException> {
private final static SelectorThread<SocketChannel, Waker> SELECTOR;
static {
@ -76,6 +78,7 @@ public class Socket implements AutoCloseable{
return socket.getRemoteAddress();
}
@Override
public Future<Integer, IOException> write(ByteBuffer buffer){
return waker -> {
var wrote = socket.write(buffer);
@ -85,6 +88,7 @@ public class Socket implements AutoCloseable{
};
}
@Override
public Future<Integer, IOException> write_all(ByteBuffer buffer){
var wrote = buffer.remaining();
return waker -> {
@ -95,6 +99,7 @@ public class Socket implements AutoCloseable{
};
}
@Override
public Future<Integer, IOException> read(ByteBuffer buffer){
return waker -> {
var read = socket.read(buffer);
@ -104,6 +109,7 @@ public class Socket implements AutoCloseable{
};
}
@Override
public Future<Integer, IOException> read_all(ByteBuffer buffer){
int read = buffer.remaining();
return waker -> {

View file

@ -63,8 +63,8 @@ public class GeneratorClassLoader extends ClassLoader {
return ClassFile.of(ClassFile.AttributesProcessingOption.PASS_ALL_ATTRIBUTES, ClassFile.StackMapsOption.STACK_MAPS_WHEN_REQUIRED).build(clm.thisClass().asSymbol(), cb -> {
for (var ce : clm) {
if (ce instanceof MethodModel mem && !isGen && !isFuture) {
StateMachineBuilder builder;
if (ce instanceof MethodModel mem && !isGen && !isFuture && mem.code().isPresent()) {
StateMachineBuilder<?> builder;
if(mem.methodTypeSymbol().returnType().descriptorString().equals(Gen.class.descriptorString())){
builder = new GenSMBuilder(clm, mem, mem.code().get());
}else if(mem.methodTypeSymbol().returnType().descriptorString().equals(Future.class.descriptorString())){