xeij/NamedPipeInputStream.java
//========================================================================================
//  NamedPipeInputStream.java
//    en:Input from named pipe
//    ja:名前付きパイプから入力します
//  Copyright (C) 2003-2025 Makoto Kamada
//
//  This file is part of the XEiJ (X68000 Emulator in Java).
//  You can use, modify and redistribute the XEiJ if the conditions are met.
//  Read the XEiJ License for more details.
//  https://stdkmd.net/xeij/
//========================================================================================

package xeij;

import java.io.*;
import java.lang.foreign.*;  //Arena,FunctionDescriptor,Linker,MemorySegment,SymbolLookup,ValueLayout
import java.lang.invoke.*;  //MethodHandle
import java.util.*;  //NoSuchElementException

//class NamedPipeInputStream
//  名前付きパイプInputStream
public abstract class NamedPipeInputStream extends InputStream {

  //npis = NamedPipeInputStream.create (name)
  //  コンストラクタ
  //  npis  名前付きパイプInputStream
  //  name  名前付きパイプの名前。プレフィックスを含まない
  public static NamedPipeInputStream create (String name) throws IOException {
    return System.getProperty ("os.name").indexOf ("Windows") < 0 ? new Gen (name) : new Win (name);
  }  //open

  protected String path;  //パス。プレフィックスを含む
  protected volatile boolean closed;  //true=すでに閉じた
  protected volatile boolean connecting;  //true=接続待ちブロック中
  protected volatile boolean reading;  //true=受信待ちブロック中
  protected Thread thread;  //受信スレッド
  protected ByteQueue queue;  //受信キュー

  //npis = new NamedPipeInputStream (name)
  //  コンストラクタ
  //  npis  名前付きパイプInputStream
  //  name  名前付きパイプの名前。プレフィックスを含まない
  private NamedPipeInputStream (String name) throws IOException {
    //開始
    osStart (name);
    //受信キューを作る
    queue = new ByteQueue ();
    //受信スレッドを開始する
    thread = new Thread () {
      byte[] b = new byte[1024];
      @Override public void run () {
        try {
          //開いて接続する
          connecting = true;
          osOpenAndConnect ();  //送信側が開くまでブロックする
          connecting = false;
          while (!closed) {
            //受信する
            reading = true;
            int k = osRead (b);  //少なくとも1バイト受信するまでブロックする
            reading = false;
            if (closed) {
              break;
            }
            if (k == -1) {  //EOFのとき
              osClose ();  //直ちに接続し直す
              connecting = true;
              osOpenAndConnect ();  //送信側が開くまでブロックする
              connecting = false;
              continue;
            }
            //キューに書き込む
            queue.write (b, 0, k);
          }
        } catch (IOException ioe) {
          connecting = false;
          reading = false;
        }
        //閉じる
        osClose ();
      }
    };
    thread.start ();
    if (false) {
      System.out.println ("named pipe " + path + " opened");
    }
  }  //NamedPipeInputStream

  //------------------------------------------------------------------------
  //n = available ()
  //  ブロックせずに受信できる長さを返す
  //  n  ブロックせずに受信できる長さ
  @Override
  public int available () throws IOException {
    //閉じていたら失敗
    if (closed) {
      throw new IOException ("named pipe " + path + " closed");
    }
    //ブロックせずに受信できる長さを返す
    return queue.used ();
  }  //available

  //------------------------------------------------------------------------
  //close ()
  //  名前付きパイプを閉じる
  @Override
  public void close () {
    //閉じていたら何もしない
    if (closed) {
      return;
    }
    closed = true;
    //ブロックを解除する
    osUnblock ();
    //閉じる
    osClose ();
    //受信スレッドの終了を待つ
    try {
      thread.join (100L);
    } catch (InterruptedException ie) {
    }
    //終了
    osEnd ();
    //キューの読み出しをキャンセルする
    queue.cancel ();  //waitAndReadのブロックを解く
    if (false) {
      System.out.println ("named pipe " + path + " closed");
    }
  }  //close

  //------------------------------------------------------------------------
  //d = read ()
  //  1バイト受信する。1バイト受信するまでブロックする
  //  d  受信したデータ。0~255。-1=キャンセルされた
  @Override
  public int read () throws IOException {
    //閉じていたら失敗
    if (closed) {
      throw new IOException ("named pipe " + path + " closed");
    }
    //キューから1バイト読み出す
    return queue.waitAndRead ();  //1バイト読み出すまでブロックする
  }  //read

  //------------------------------------------------------------------------
  //k = read (b)
  //  配列に受信する。少なくとも1バイト受信するまでブロックする
  //  k  受信した長さ。-1=キャンセルされた
  //  b  配列
  @Override
  public int read (byte[] b) throws IOException {
    //閉じていたら失敗
    if (closed) {
      throw new IOException ("named pipe " + path + " closed");
    }
    //キューから配列に読み出す
    return queue.waitAndRead (b, 0, b.length);  //少なくとも1バイト読み出すまでブロックする
  }  //read

  //------------------------------------------------------------------------
  //k = read (b, o, n)
  //  配列に受信する。少なくとも1バイト受信するまでブロックする
  //  k  受信した長さ。-1=キャンセルされた
  //  b  配列
  //  o  位置
  //  n  長さ
  @Override
  public int read (byte[] b, int o, int n) throws IOException {
    //閉じていたら失敗
    if (closed) {
      throw new IOException ("named pipe " + path + " closed");
    }
    //引数を確認する
    if (o < 0 || n < 0 || b.length < o + n) {  //範囲外
      throw new IndexOutOfBoundsException ("b.length=" + b.length + ", o=" + o + ", n=" + n);
    }
    //キューから配列に読み出す。少なくとも1バイト読み出すまでブロックする
    return queue.waitAndRead (b, o, n);
  }  //read

  //------------------------------------------------------------------------
  //k = readNBytes (b, o, n)
  //  配列に受信する。nバイト受信するまでブロックする
  //  k  受信した長さ。-1=キャンセルされた
  //  b  配列
  //  o  位置
  //  n  長さ
  @Override
  public int readNBytes (byte[] b, int o, int n) throws IOException {
    //閉じていたら失敗
    if (closed) {
      throw new IOException ("named pipe " + path + " closed");
    }
    //引数を確認する
    if (o < 0 || n < 0 || b.length < o + n) {  //範囲外
      throw new IndexOutOfBoundsException ("b.length=" + b.length + ", o=" + o + ", n=" + n);
    }
    //キューから配列に読み出す
    int k = 0;
    while (k < n) {
      int t = queue.waitAndRead (b, o + k, n - k);  //少なくとも1バイト読み出すまでブロックする
      if (t == -1) {  //キャンセルされた
        return -1;
      }
      k += t;
    }
    //受信した長さを返す
    return k;
  }  //readNBytes

  protected abstract void osStart (String name) throws IOException;
  protected abstract void osOpenAndConnect () throws IOException;
  protected abstract int osRead (byte[] b) throws IOException;
  protected abstract void osUnblock ();
  protected abstract void osClose ();
  protected abstract void osEnd ();

  //class Gen
  //  UNIX系OS用
  private static class Gen extends NamedPipeInputStream {

    private File file;  //ファイル
    private FileInputStream stream;  //FileInputStream。null=まだ開いていないまたはすでに閉じた

    //npis = new Gen (name)
    //  コンストラクタ
    protected Gen (String name) throws IOException {
      super (name);
    }  //Gen

    //osStart (name)
    //  開始
    @Override
    protected void osStart (String name) throws IOException {
      //パス
      path = System.getProperty ("java.io.tmpdir") + "/" + name;  //通常は/tmp/~
      //ファイル
      file = new File (path);
      //mkfifoでFIFOを作る
      file.delete ();
      Process process;
      try {
        process = new ProcessBuilder ("mkfifo", path).inheritIO ().start ();
      } catch (IOException ioe) {  //start失敗。mkfifoを開始できなかった
        throw new IOException ("mkfifo " + path + " not started");
      }
      try {
        int exitCode = process.waitFor ();
        if (exitCode != 0) {  //mkfifoがエラー終了した
          file.delete ();
          throw new IOException ("mkfifo " + path + " terminated with exit code " + exitCode);
        }
      } catch (InterruptedException ie) {  //waitFor中断。mkfifoの終了を確認できなかった
        file.delete ();
        throw new IOException ("mkfifo " + path + " interrupted");
      }
    }  //osStart

    //osOpenAndConnect ()
    //  開いて接続する。送信側が開くまでブロックする
    @Override
    protected void osOpenAndConnect () throws IOException {
      if (stream == null) {
        stream = new FileInputStream (file);  //送信側が開くまでブロックする
      }
    }  //osOpenAndConnect

    //k = osRead (b)
    //  受信する。少なくとも1バイト受信するまでブロックする
    @Override
    protected int osRead (byte[] b) throws IOException {
      return stream.read (b);
    }  //osRead

    //osUnblock ()
    //  ブロックを解除する
    @Override
    protected void osUnblock () {
      //new FileInputStream()のブロックを解除する
      //  new FileInputStream()は送信側が開くまでブロックする。new FileOutputStream()でダミーの送信側を開くことで解除できる
      //  new FileInputStream()でブロックしていないのにnew FileOutputStream()を行うとnew FileOutputStream()がブロックしてしまう可能性がある
      if (connecting) {
        try {
          Thread.sleep (50L);
        } catch (InterruptedException ie) {
        }
        if (connecting) {
          try {
            new FileOutputStream (file).close ();
          } catch (IOException ioe) {
          }
          for (int i = 0; i < 100 && connecting; i++) {
            try {
              Thread.sleep (50L);
            } catch (InterruptedException ie) {
            }
          }
          if (connecting) {
            System.out.println ("named pipe " + path + " unblocking timeout");
          }
        }
      }
      //FileInputStream.read()のブロックが解除されるまで待つ
      //  FileInputStream.read()は送信側が開いたまま送信しないとブロックする。送信側が送信するか閉じるまでブロックを解除できない
      if (reading) {
        for (int i = 0; i < 100 && reading; i++) {
          try {
            Thread.sleep (50L);
          } catch (InterruptedException ie) {
          }
        }
        if (reading) {
          System.out.println ("user operation required to unblock named pipe " + path);
          while (reading) {
            try {
              Thread.sleep (1000L);
            } catch (InterruptedException ie) {
            }
          }
        }
      }
    }  //osUnblock

    //osClose ()
    //  閉じる
    @Override
    protected void osClose () {
      //開いていたら閉じる
      if (stream != null) {
        try {
          stream.close ();
        } catch (IOException ioe) {
        }
        stream = null;
      }
    }  //osClose

    //osEnd ()
    //  終了
    @Override
    protected void osEnd () {
      file.delete ();
    }  //osEnd

  }  //class Gen

  //class Win
  //  Windows用
  private static class Win extends NamedPipeInputStream {

    //エラーコード
    //  https://learn.microsoft.com/en-us/windows/win32/debug/system-error-codes
    private static final int ERROR_BROKEN_PIPE = 109;  //パイプが終了した。誰かが開いて閉じた
    private static final int ERROR_BAD_PIPE = 230;  //パイプの状態が無効。誰も開いていない
    private static final int ERROR_NO_DATA = 232;  //既に切断された
    private static final int ERROR_PIPE_CONNECTED = 535;  //既に接続している
    private static final int ERROR_PIPE_LISTENING = 536;  //誰も開いていない
    private static final int ERROR_OPERATION_ABORTED = 995;  //スレッドが終了したか操作が取り消された
    private static final int ERROR_NOT_FOUND = 1168;  //取り消す操作がない
    private static final long INVALID_HANDLE_VALUE = -1L;

    //リンカ
    private Linker linker;
    private MethodHandle downcallHandle (MemorySegment address, FunctionDescriptor function) {
      return linker.downcallHandle (address, function);
    }

    //アリーナ
    private Arena arena;

    //メソッド
    private MethodHandle CancelIoEx;
    private MethodHandle CloseHandle;
    private MethodHandle ConnectNamedPipe;
    private static final int PIPE_ACCESS_INBOUND = 0x00000001;
    private static final int PIPE_TYPE_BYTE = 0x00000000;
    private static final int PIPE_WAIT = 0x00000000;
    private static final int PIPE_UNLIMITED_INSTANCES = 255;
    private static final int BUFFER_SIZE = 8192;
    private MethodHandle CreateNamedPipeA;
    private MethodHandle GetLastError;
    private MethodHandle ReadFile;

    private MemorySegment handle;  //HANDLE。null=まだ開いていないまたはすでに閉じた

    //npis = new Win (name)
    //  コンストラクタ
    protected Win (String name) throws IOException {
      super (name);
    }  //Win

    //osStart (name)
    //  開始
    @Override
    protected void osStart (String name) throws IOException {
      //リンカ
      linker = Linker.nativeLinker ();
      //アリーナ
      arena = Arena.ofAuto ();
      //ライブラリ
      SymbolLookup kernel32 = SymbolLookup.libraryLookup ("kernel32", arena);
      //メソッド
      try {
        //CancelIoEx関数
        //  https://learn.microsoft.com/ja-jp/windows/win32/fileio/cancelioex-func
        CancelIoEx = downcallHandle (
          kernel32.findOrThrow ("CancelIoEx"),
          FunctionDescriptor.of (
            ValueLayout.JAVA_INT,  //BOOL
            ValueLayout.ADDRESS,  //HANDLE hFile
            ValueLayout.ADDRESS));  //LPOVERLAPPED lpOverlapped
        //CloseHandle関数
        //  https://learn.microsoft.com/ja-jp/windows/win32/api/handleapi/nf-handleapi-closehandle
        CloseHandle = downcallHandle (
          kernel32.findOrThrow ("CloseHandle"),
          FunctionDescriptor.of (
            ValueLayout.JAVA_INT,  //BOOL
            ValueLayout.ADDRESS));  //HANDLE hObject
        //ConnectNamedPipe関数
        //  https://learn.microsoft.com/ja-jp/windows/win32/api/namedpipeapi/nf-namedpipeapi-connectnamedpipe
        ConnectNamedPipe = downcallHandle (
          kernel32.findOrThrow ("ConnectNamedPipe"),
          FunctionDescriptor.of (
            ValueLayout.JAVA_INT,  //BOOL
            ValueLayout.ADDRESS,  //HANDLE hNamedPipe
            ValueLayout.ADDRESS));  //LPOVERLAPPED lpOverlapped
        //CreateNamedPipeA関数
        //  https://learn.microsoft.com/ja-jp/windows/win32/api/winbase/nf-winbase-createnamedpipea
        CreateNamedPipeA = downcallHandle (
          kernel32.findOrThrow ("CreateNamedPipeA"),
          FunctionDescriptor.of (
            ValueLayout.ADDRESS,  //HANDLE
            ValueLayout.ADDRESS,  //LPCSTR lpName
            ValueLayout.JAVA_INT,  //DWORD dwOpenMode
            ValueLayout.JAVA_INT,  //DWORD dwPipeMode
            ValueLayout.JAVA_INT,  //DWORD nMaxInstances
            ValueLayout.JAVA_INT,  //DWORD nOutBufferSize
            ValueLayout.JAVA_INT,  //DWORD nInBufferSize
            ValueLayout.JAVA_INT,  //DWORD nDefaultTimeOut
            ValueLayout.ADDRESS));  //LPSECURITY_ATTRIBUTES lpSecurityAttributes
        //GetLastError関数
        //  https://learn.microsoft.com/ja-jp/windows/win32/api/errhandlingapi/nf-errhandlingapi-getlasterror
        GetLastError = downcallHandle (
          kernel32.findOrThrow ("GetLastError"),
          FunctionDescriptor.of (
            ValueLayout.JAVA_INT));  //DWORD
        //ReadFile関数
        //  https://learn.microsoft.com/ja-jp/windows/win32/api/fileapi/nf-fileapi-readfilef
        ReadFile = downcallHandle (
          kernel32.findOrThrow ("ReadFile"),
          FunctionDescriptor.of (
            ValueLayout.JAVA_INT,  //BOOL
            ValueLayout.ADDRESS,  //HANDLE hFile
            ValueLayout.ADDRESS,  //LPVOID lpBuffer
            ValueLayout.JAVA_INT,  //DWORD nNumberOfBytesToRead
            ValueLayout.ADDRESS,  //LPDWORD lpNumberOfBytesRead
            ValueLayout.ADDRESS));  //LPOVERLAPPED lpOverlapped
      } catch (NoSuchElementException nsee) {
        nsee.printStackTrace ();
      }
      //パス
      path = "\\\\.\\pipe\\" + name;
    }  //osStart

    //osOpenAndConnect ()
    //  開いて接続する。送信側が開くまでブロックする
    @Override
    protected void osOpenAndConnect () throws IOException {
      //閉じていたら開く
      if (handle == null) {
        try {
          int error;
          //  https://learn.microsoft.com/ja-jp/windows/win32/api/winbase/nf-winbase-createnamedpipea
          if ((handle = (MemorySegment) CreateNamedPipeA.invoke (
            arena.allocateFrom (path),  //LPCSTR lpName
            PIPE_ACCESS_INBOUND,  //DWORD dwOpenMode
            PIPE_TYPE_BYTE | PIPE_WAIT,  //DWORD dwPipeMode
            PIPE_UNLIMITED_INSTANCES,  //DWORD nMaxInstances
            0,  //DWORD nOutBufferSize
            BUFFER_SIZE,  //DWORD nInBufferSize
            0,  //DWORD nDefaultTimeOut
            MemorySegment.NULL  //LPSECURITY_ATTRIBUTES lpSecurityAttributes
            )).address () == INVALID_HANDLE_VALUE &&
              (error = (int) GetLastError.invoke ()) != -1) {
            handle = null;
            //開けなかったら失敗
            throw new IOException ("CreateNamedPipeA returned error code " + error);
          }
        } catch (IOException ioe) {
          throw ioe;
        } catch (Throwable e) {
          e.printStackTrace ();
          throw new IOException ("CreateNamedPipeA invocation failed");
        }
      }
      //接続を待つ。送信側が開くまでブロックする
      try {
        int error;
        //  https://learn.microsoft.com/ja-jp/windows/win32/api/namedpipeapi/nf-namedpipeapi-connectnamedpipe
        if ((int) ConnectNamedPipe.invoke (
          handle,  //HANDLE hNamedPipe
          MemorySegment.NULL) == 0 &&  //LPOVERLAPPED lpOverlapped
            (error = (int) GetLastError.invoke ()) != -1) {
          if (error == 0 ||  //誰かがエラーコードを上書きした
              error == ERROR_NO_DATA ||  //232 既に切断された
              error == ERROR_PIPE_CONNECTED) {  //535 既に接続している
          } else if (error == ERROR_OPERATION_ABORTED) {  //995 スレッドが終了したか操作が取り消された
            throw new InterruptedIOException ("ConnectNamedPipe aborted");
          } else {
            throw new IOException ("ConnectNamedPipe returned error code " + error);
          }
        }
      } catch (IOException ioe) {
        throw ioe;
      } catch (Throwable e) {
        e.printStackTrace ();
        throw new IOException ("ConnectNamedPipe invocation failed");
      }
    }  //osOpenAndConnect

    //k = osRead (b)
    //  受信する。少なくとも1バイト受信するまでブロックする
    @Override
    protected int osRead (byte[] b) throws IOException {
      int n = b.length;
      MemorySegment buf = arena.allocate ((long) n);  //MemorySegment.ofArray(b)はasSlice(o+k)できない
      int k = 0;
      MemorySegment t = arena.allocate (ValueLayout.JAVA_INT);
      t.set (ValueLayout.JAVA_INT, 0L, 0);
      try {
        while (k < 1) {
          t.set (ValueLayout.JAVA_INT, 0L, 0);
          int error;
          //  https://learn.microsoft.com/ja-jp/windows/win32/api/fileapi/nf-fileapi-readfile
          if ((int) ReadFile.invoke (
            handle,  //HANDLE hFile
            buf.asSlice ((long) k),  //LPVOID lpBuffer
            n - k,  //DWORD nNumberOfBytesToRead
            t,  //LPDWORD lpNumberOfBytesRead
            MemorySegment.NULL) == 0 &&  //LPOVERLAPPED lpOverlapped
              (error = (int) GetLastError.invoke ()) != -1) {
            if (closed) {
              throw new IOException ("named pipe " + path + " closed");
            }
            if (error == 0 ||  //誰かがエラーコードを上書きした
                error == ERROR_BROKEN_PIPE ||  //109 パイプが終了した。誰かが開いて閉じた
                error == ERROR_PIPE_LISTENING) {  //536 誰も開いていない
              //0のとき閉じたのか取り消されたのかわからないが、後者はclosedなのでここには来ない
              return -1;  //EOF
            } else if (error == ERROR_OPERATION_ABORTED) {  //995 スレッドが終了したか操作が取り消された
              throw new InterruptedIOException ("ReadFile aborted");
            } else {
              throw new IOException ("ReadFile returned error code " + error);
            }
          }
          if (closed) {
            throw new IOException ("named pipe " + path + " closed");
          }
          k += t.get (ValueLayout.JAVA_INT, 0L);
        }
      } catch (IOException ioe) {
        throw ioe;
      } catch (Throwable e) {
        e.printStackTrace ();
        throw new IOException ("ReadFile invocation failed");
      }
      System.arraycopy (buf.toArray (ValueLayout.JAVA_BYTE), 0, b, 0, k);
      return k;
    }  //osRead

    //osUnblock ()
    //  ブロックを解除する
    @Override
    protected void osUnblock () {
      //ConnectNamedPipeまたはReadFileのブロックを解除する
      //  CancelIoExで取り消された操作はERROR_OPERATION_ABORTEDになる
      //  MethodHandle.invoke()の問題で、取り消された操作がERROR_OPERATION_ABORTEDでなく0を受け取る可能性がある
      if (connecting || reading) {
        try {
          Thread.sleep (50L);
        } catch (InterruptedException ie) {
        }
        if (connecting) {
          try {
            int error;
            //  https://learn.microsoft.com/ja-jp/windows/win32/fileio/cancelioex-func
            if ((int) CancelIoEx.invoke (
              handle,  //HANDLE hFile
              MemorySegment.NULL) == 0 &&  //LPOVERLAPPED lpOverlapped
                (error = (int) GetLastError.invoke ()) != -1) {
              //if (error == ERROR_NOT_FOUND) {  //1168 取り消す操作がない
              //}
            }
          } catch (Throwable e) {
            //e.printStackTrace ();
            //throw new IOException ("CancelIoEx invocation failed");
          }
          for (int i = 0; i < 100 && (connecting || reading); i++) {
            try {
              Thread.sleep (50L);
            } catch (InterruptedException ie) {
            }
          }
          if (connecting || reading) {
            System.out.println ("named pipe " + path + " unblocking timeout");
          }
        }
      }
    }  //osUnblock

    //osClose ()
    //  閉じる
    @Override
    protected void osClose () {
      //開いていたら閉じる
      if (handle != null) {
        try {
          int error;
          //  https://learn.microsoft.com/ja-jp/windows/win32/api/handleapi/nf-handleapi-closehandle
          if ((int) CloseHandle.invoke (
            handle) == 0 &&  //HANDLE hObject
              (error = (int) GetLastError.invoke ()) != -1) {
            //throw new IOException ("CloseHandle returned error code " + error);
          }
          //} catch (IOException ioe) {
          //  throw ioe;
        } catch (Throwable e) {
          e.printStackTrace ();
          //throw new IOException ("CloseHandle invocation failed");
        }
        handle = null;
      }
    }  //osClose

    //osEnd ()
    //  終了
    @Override
    protected void osEnd () {
    }  //osEnd

  }  //class Win

}  //class NamedPipeInputStream