mirror of
https://github.com/ParkerTenBroeck/coroutines.git
synced 2026-06-07 05:08:51 -04:00
made progress
This commit is contained in:
parent
90bfb8a631
commit
f52827f97b
6 changed files with 175 additions and 38 deletions
|
|
@ -5,48 +5,71 @@ import generator.future.Waker;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.channels.SelectionKey;
|
||||
import java.nio.channels.Selector;
|
||||
import java.nio.channels.SocketChannel;
|
||||
import java.util.ArrayDeque;
|
||||
|
||||
public class Socket implements AutoCloseable{
|
||||
|
||||
private final static Selector selector;
|
||||
|
||||
private final static Selector SELECTOR;
|
||||
private record ToRegister(SocketChannel sc, int ops, Waker waker){}
|
||||
private final static ArrayDeque<ToRegister> to_register = new ArrayDeque<>();
|
||||
static{
|
||||
try {
|
||||
selector = Selector.open();
|
||||
SELECTOR = Selector.open();
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
|
||||
new Thread(() -> {
|
||||
var thread = new Thread(() -> {
|
||||
while(!Thread.currentThread().isInterrupted()){
|
||||
try{
|
||||
selector.select();
|
||||
var keys = selector.selectedKeys().iterator();
|
||||
synchronized (to_register){
|
||||
while(!to_register.isEmpty()){
|
||||
var to = to_register.poll();
|
||||
to.sc.register(SELECTOR, to.ops, to.waker);
|
||||
}
|
||||
}
|
||||
|
||||
SELECTOR.select();
|
||||
var keys = SELECTOR.selectedKeys().iterator();
|
||||
|
||||
while (keys.hasNext()) {
|
||||
SelectionKey key = keys.next();
|
||||
keys.remove();
|
||||
var c = (SocketChannel)key.channel();
|
||||
var w = (Waker)key.attachment();
|
||||
|
||||
System.out.println(key);
|
||||
if (!key.isValid()) {
|
||||
|
||||
}else if(key.isAcceptable()){
|
||||
}
|
||||
if(key.isAcceptable()){
|
||||
|
||||
}else if(key.isConnectable()){
|
||||
|
||||
c.finishConnect();
|
||||
}else if(key.isReadable()){
|
||||
|
||||
}else if(key.isWritable()){
|
||||
|
||||
}
|
||||
w.wake();
|
||||
}
|
||||
}catch (Exception e){
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
}).start();
|
||||
});
|
||||
thread.setName("Socket Polling Thread");
|
||||
thread.setDaemon(true);
|
||||
thread.start();
|
||||
}
|
||||
|
||||
private static void register(SocketChannel sc, int ops, Waker waker){
|
||||
synchronized (to_register){
|
||||
to_register.add(new ToRegister(sc, ops, waker));
|
||||
}
|
||||
SELECTOR.wakeup();
|
||||
}
|
||||
|
||||
private final SocketChannel socket;
|
||||
|
|
@ -55,25 +78,84 @@ public class Socket implements AutoCloseable{
|
|||
this.socket = sc;
|
||||
}
|
||||
|
||||
public static Future<Socket> bind(InetSocketAddress inet) throws IOException{
|
||||
var socket = SocketChannel.open();
|
||||
public static Future<Socket> connect(InetSocketAddress inet) {
|
||||
return new Future<>() {
|
||||
public SocketChannel socket;
|
||||
@Override
|
||||
public Socket poll(Waker waker) {
|
||||
try{
|
||||
socket.configureBlocking(false);
|
||||
socket.socket().bind(new InetSocketAddress("localhost", 8080));
|
||||
socket.register(selector, SelectionKey.OP_ACCEPT | SelectionKey.OP_CONNECT | SelectionKey.OP_READ | SelectionKey.OP_WRITE);
|
||||
return new Socket(socket);
|
||||
}catch (Exception e){
|
||||
throw new RuntimeException(e);
|
||||
public Object poll(Waker waker) {
|
||||
if(socket==null){
|
||||
try{
|
||||
socket = SocketChannel.open();
|
||||
socket.configureBlocking(false);
|
||||
var connected = socket.connect(inet);
|
||||
if(!connected) {
|
||||
register(socket, SelectionKey.OP_CONNECT, waker);
|
||||
return Pending.INSTANCE;
|
||||
}
|
||||
}catch (Exception e){
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
if(socket.isConnected()) return new Socket(socket);
|
||||
return Pending.INSTANCE;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void cancel() {
|
||||
try {
|
||||
socket.close();
|
||||
if(socket!=null) socket.close();
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
public Future<Integer> write_all(ByteBuffer buffer){
|
||||
return new Future<>() {
|
||||
int wrote = 0;
|
||||
@Override
|
||||
public Object poll(Waker waker) {
|
||||
try {
|
||||
wrote += socket.write(buffer);
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
if(!buffer.hasRemaining()) return wrote;
|
||||
register(socket, SelectionKey.OP_WRITE, waker);
|
||||
return Pending.INSTANCE;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void cancel() {
|
||||
try {
|
||||
if(socket!=null) socket.close();
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
public Future<Integer> read_all(ByteBuffer buffer){
|
||||
return new Future<>() {
|
||||
int read = 0;
|
||||
@Override
|
||||
public Object poll(Waker waker) {
|
||||
try {
|
||||
read += socket.read(buffer);
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
if(!buffer.hasRemaining()) return read;
|
||||
register(socket, SelectionKey.OP_READ, waker);
|
||||
return Pending.INSTANCE;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void cancel() {
|
||||
try {
|
||||
if(socket!=null) socket.close();
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue