xeij/ByteQueue.java
//========================================================================================
//  ByteQueue.java
//    en:Byte queue
//    ja:バイトキュー
//  Copyright (C) 2003-2026 Makoto Kamada
//
//  This file is part of the XEiJ (X68000 Emulator in Java).
//  You can use, modify and redistribute the XEiJ if the conditions are met.
//  Read the XEiJ License for more details.
//  https://stdkmd.net/xeij/
//========================================================================================

package xeij;

import java.util.*;  //Arrays

//class ByteQueue
//  バイトキュー。要素がbyteのFIFOの実装
//  ブロック(小さいキュー)を連結することで容量を可変にしている。上限は約2GB
//  書き込むスレッドと読み出すスレッドはそれぞれ1つであること
public final class ByteQueue {

  private static final boolean DEBUG = false;
  private static final boolean TEST = false;


  //class Block
  //  ブロック
  private static final class Block {

    private volatile Block newer = null;  //これより新しいブロック。null=最も新しいブロック
    private volatile Block older = null;  //これより古いブロック。null=最も古いブロック

    private static final int size = 65536;  //ブロックの容量。2の累乗
    private final byte[] buff = new byte[size];  //配列
    private volatile int wcnt = 0;  //これまでに書き込んだ長さの下位32bit
    private volatile int rcnt = 0;  //これまでに読み出した長さの下位32bit
    //  buff[wcnt&(size-1)]   次に書き込む位置
    //  buff[rcnt&(size-1)]   次に読み出す位置
    //  wcnt-rcnt             読み出せる長さ
    //  size-(wcnt-rcnt)      書き込める長さ
    //  (wcnt-rcnt)==0        空
    //  (wcnt-rcnt)==size     満杯
    //  0<=(wcnt-rcnt)<=size

    //length = clear ()
    //  空にする。読み飛ばせるだけ読み飛ばす
    //結果
    //  length  読み飛ばせた長さ。0=空で読み飛ばせなかった
    private final int clear () {
      int length = wcnt - rcnt;  //読み飛ばす長さ
      if (0 < length) {  //1バイト以上読み飛ばせて1バイト以上読み飛ばすとき
        rcnt += length;  //これまでに読み出した長さを更新する
      }
      return length;  //読み飛ばせた長さを返す
    }  //clear

    //data = read ()
    //  1バイト読み出す
    //結果
    //  data  読み出したデータ。0~255。-1=空で読み出せなかった
    private final int read () {
      return ((wcnt - rcnt) == 0 ? -1 : //空のとき-1を返す
              buff[(rcnt++) & (size - 1)] & 255);  //1バイト読み出して0~255の範囲で返す
    }  //read

    //length = read (array, offset, length)
    //  配列へ読み出す
    //引数
    //  array   配列
    //  offset  配列の位置
    //  length  読み出す長さ
    //結果
    //  length  読み出せた長さ。0=空で読み出せなかった
    private final int read (byte[] array, int offset, int length) {
      if (length < 0) {
        throw new IllegalArgumentException (String.format ("length=%d", length));
      }
      length = Math.min (length, wcnt - rcnt);  //読み出せる長さに抑える
      if (0 < length) {  //1バイト以上読み出せて1バイト以上読み出すとき
        int index = rcnt & (size - 1);  //キューの位置
        int first = Math.min (length, size - index);  //1回目はキューの末尾まで
        System.arraycopy (buff, index,  //キューから
                          array, offset,  //配列へ
                          first);  //コピーする
        if (first < length) {  //2回目があるとき
          System.arraycopy (buff, 0,  //キューから
                            array, offset + first,  //配列へ
                            length - first);  //コピーする
        }
        rcnt += length;  //これまでに読み出した長さを更新する
      }
      return length;  //読み出せた長さを返す
    }  //read

    //length = skip (length)
    //  読み飛ばす
    //引数
    //  length  読み飛ばす長さ
    //結果
    //  length  読み飛ばせた長さ。0=空で読み飛ばせなかった
    private final int skip (int length) {
      if (length < 0) {
        throw new IllegalArgumentException (String.format ("length=%d", length));
      }
      length = Math.min (length, wcnt - rcnt);  //読み飛ばせる長さに抑える
      if (0 < length) {  //1バイト以上読み飛ばせて1バイト以上読み飛ばすとき
        rcnt += length;  //これまでに読み出した長さを更新する
      }
      return length;  //読み飛ばせた長さを返す
    }  //skip

    //length = unused ()
    //  書き込める長さ
    //結果
    //  length  書き込める長さ。0=満杯で書き込めない
    private final int unused () {
      return size - (wcnt - rcnt);  //書き込める長さ
    }  //unused

    //length = used ()
    //  読み出せる長さ
    //結果
    //  length  読み出せる長さ。0=空で読み出せない
    private final int used () {
      return wcnt - rcnt;  //読み出せる長さ
    }  //used

    //length = write (data)
    //  1バイト書き込む
    //引数
    //  data    書き込むデータ。下位8bitのみ有効
    //結果
    //  length  書き込めた長さ。0=満杯で書き込めなかった。1=書き込めた
    private final int write (int data) {
      if ((wcnt - rcnt) == size) {  //書き込めないとき
        return 0;  //0を返す
      }
      buff[(wcnt++) & (size - 1)] = (byte) data;  //1バイト書き込む
      return 1;  //1を返す
    }  //write

    //length = write (array, offset, length)
    //  配列から書き込む
    //引数
    //  array   配列
    //  offset  配列の位置
    //  length  書き込む長さ
    //結果
    //  length  書き込めた長さ。0=満杯で書き込めなかった
    private final int write (byte[] array, int offset, int length) {
      if (length < 0) {
        throw new IllegalArgumentException (String.format ("length=%d", length));
      }
      length = Math.min (length, size - (wcnt - rcnt));  //書き込める長さに抑える
      if (0 < length) {  //1バイト以上書き込めて1バイト以上書き込むとき
        int index = wcnt & (size - 1);  //キューの位置
        int first = Math.min (length, size - index);  //1回目はキューの末尾まで
        System.arraycopy (array, offset,  //配列から
                          buff, index,  //キューへ
                          first);  //コピーする
        if (first < length) {  //2回目があるとき
          System.arraycopy (array, offset + first,  //配列から
                            buff, 0,  //キューへ
                            length - first);  //コピーする
        }
        wcnt += length;  //これまでに書き込んだ長さを更新する
      }
      return length;  //書き込めた長さを返す
    }  //write

  }  //class Block


  private volatile Block newest = new Block ();  //最も新しいブロック。newest.newer==null
  private volatile Block oldest = newest;  //最も古いブロック。oldest.older==null

  private static final int size = 0x7fffffff;  //キューの容量。ブロックの容量の倍数である必要はない
  private volatile int wcnt = 0;  //これまでに書き込んだ長さの下位32bit
  private volatile int rcnt = 0;  //これまでに読み出した長さの下位32bit
  //  wcnt-rcnt             読み出せる長さ
  //  size-(wcnt-rcnt)      書き込める長さ
  //  (wcnt-rcnt)==0        空
  //  (wcnt-rcnt)==size     満杯
  //  0<=(wcnt-rcnt)<=size

  private final Object lock = new Object ();  //ブロックのためのオブジェクト
  private volatile boolean canceled = false;  //true=キャンセルされた

  //cancel ()
  //  waitAndReadのブロックを解除する
  //  以後キューは動作しない
  public final void cancel () {
    if (canceled) {  //キャンセルされた
      return;
    }
    if (DEBUG) {
      System.out.println ("cancel start");
    }
    canceled = true;  //キャンセルする
    synchronized (lock) {
      lock.notify ();  //waitAndReadのブロックを解除する
    }
    if (DEBUG) {
      System.out.println ("cancel end");
    }
  }  //cancel

  //length = clear ()
  //  空にする。読み飛ばせるだけ読み飛ばす
  //結果
  //  length  読み飛ばせた長さ。0=空で読み飛ばせなかった
  public final int clear () {
    if (canceled) {  //キャンセルされた
      return 0;
    }
    if (DEBUG) {
      System.out.println ("clear start");
    }
    int length = wcnt - rcnt;  //読み飛ばせる長さ
    if (0 < length) {  //1バイト以上読み飛ばせるとき
      int l = length;  //残りの長さ
      int k = oldest.skip (l);  //最も古いブロックから読み飛ばす
      l -= k;
      while (0 < l) {
        //最も古いブロックを削除する
        //  oldest     newer
        //      <-X-older
        //      newer-X->
        //             oldest
        Block newer = oldest.newer;
        newer.older = null;
        oldest.newer = null;
        oldest = newer;
        //
        k = oldest.skip (l);  //最も古いブロックから読み飛ばす
        l -= k;
      }
      rcnt += length;  //これまでに読み出した長さを更新する
    }  //if 0<length
    if (DEBUG) {
      System.out.println ("clear end " + length);
    }
    return length;  //読み飛ばせた長さ
  }  //clear

  //data = read ()
  //  1バイト読み出す
  //結果
  //  data  読み出したデータ。0~255。-1=空で読み出せなかった
  public final int read () {
    if (canceled) {  //キャンセルされた
      return -1;
    }
    if (DEBUG) {
      System.out.println ("read start");
    }
    int data = -1;
    if (0 < (wcnt - rcnt)) {  //1バイト以上読み出せるとき
      data = oldest.read ();  //最も古いブロックから読み出す
      if (data < 0) {  //最も古いブロックが空で読み出せなかったとき
        //最も古いブロックを削除する
        //  oldest     newer
        //      <-X-older
        //      newer-X->
        //             oldest
        Block newer = oldest.newer;
        newer.older = null;
        oldest.newer = null;
        oldest = newer;
        //
        data = oldest.read ();  //最も古いブロックから読み出す
      }
      rcnt++;
    }  //if 0<(wcnt-rcnt)
    if (DEBUG) {
      System.out.println ("read end " + data);
    }
    return data;  //読み出したデータ
  }  //read

  //length = read (array, offset, length)
  //  配列へ読み出す
  //引数
  //  array   配列
  //  offset  配列の位置
  //  length  読み出す長さ
  //結果
  //  length  読み出せた長さ。0=空で読み出せなかった
  public final int read (byte[] array, int offset, int length) {
    if (canceled) {  //キャンセルされた
      return 0;
    }
    if (DEBUG) {
      System.out.println ("read start array," + offset + "," + length);
    }
    if (length < 0) {
      throw new IllegalArgumentException (String.format ("length=%d", length));
    }
    length = Math.min (length, wcnt - rcnt);  //読み出せる長さに抑える
    if (0 < length) {  //1バイト以上読み出せて1バイト以上読み出すとき
      int o = offset;  //次の位置
      int l = length;  //残りの長さ
      int k = oldest.read (array, o, l);  //最も古いブロックから読み出す
      o += k;
      l -= k;
      while (0 < l) {
        //最も古いブロックを削除する
        //  oldest     newer
        //      <-X-older
        //      newer-X->
        //             oldest
        Block newer = oldest.newer;
        newer.older = null;
        oldest.newer = null;
        oldest = newer;
        //
        k = oldest.read (array, o, l);  //最も古いブロックから読み出す
        o += k;
        l -= k;
      }
      rcnt += length;  //これまでに読み出した長さを更新する
    }
    if (DEBUG) {
      System.out.println ("read end " + length);
    }
    return length;  //読み出せた長さ
  }  //read

  //length = skip (length)
  //  読み飛ばす
  //引数
  //  length  読み飛ばす長さ
  //結果
  //  length  読み飛ばせた長さ。0=空で読み飛ばせなかった
  public final int skip (int length) {
    if (canceled) {  //キャンセルされた
      return 0;
    }
    if (DEBUG) {
      System.out.println ("skip start " + length);
    }
    if (length < 0) {
      throw new IllegalArgumentException (String.format ("length=%d", length));
    }
    length = Math.min (length, wcnt - rcnt);  //読み飛ばせる長さに抑える
    if (0 < length) {  //1バイト以上読み飛ばせて1バイト以上読み飛ばすとき
      int l = length;  //残りの長さ
      int k = oldest.skip (l);  //最も古いブロックから読み飛ばす
      l -= k;
      while (0 < l) {
        //最も古いブロックを削除する
        //  oldest     newer
        //      <-X-older
        //      newer-X->
        //             oldest
        Block newer = oldest.newer;
        newer.older = null;
        oldest.newer = null;
        oldest = newer;
        //
        k = oldest.skip (l);  //最も古いブロックから読み飛ばす
        l -= k;
      }
      rcnt += length;  //これまでに読み出した長さを更新する
    }
    if (DEBUG) {
      System.out.println ("skip end " + length);
    }
    return length;  //読み飛ばせた長さ
  }  //skip

  //length = unused ()
  //  書き込める長さ
  //結果
  //  length  書き込める長さ。0=満杯で書き込めない
  public final int unused () {
    if (canceled) {  //キャンセルされた
      return size;
    }
    return size - (wcnt - rcnt);  //書き込める長さ
  }  //unused

  //length = used ()
  //  読み出せる長さ
  //結果
  //  length  読み出せる長さ。0=空で読み出せない
  public final int used () {
    if (canceled) {  //キャンセルされた
      return 0;
    }
    return wcnt - rcnt;  //読み出せる長さ
  }  //used

  //data = waitAndRead ()
  //  1バイト読み出す。1バイト読み出せるまでブロックする
  //結果
  //  data  読み出したデータ。0~255。-1=キャンセルされた
  public final int waitAndRead () {
    if (canceled) {  //キャンセルされた
      return -1;
    }
    if (DEBUG) {
      System.out.println ("waitAndRead start");
    }
    int data = read ();  //1バイト読み出す
    if (data < 0) {  //読み出せない
      try {
        synchronized (lock) {
          do {
            lock.wait ();  //ブロックする
            if (canceled) {  //キャンセルされた
              break;
            }
            data = read ();  //1バイト読み出す
          } while (data < 0);  //読み出せない
        }
      } catch (InterruptedException ie) {
      }
    }
    if (DEBUG) {
      System.out.println ("waitAndRead end " + data);
    }
    return data;  //データを返す
  }  //waitAndRead

  //length = waitAndRead (array, offset, length)
  //  配列へ読み出す。1バイト以上読み出せるまでブロックする
  //引数
  //  array   配列
  //  offset  配列の位置
  //  length  読み出す長さ
  //結果
  //  length  読み出せた長さ。-1=キャンセルされた
  public final int waitAndRead (byte[] array, int offset, int length) {
    if (canceled) {  //キャンセルされた
      return -1;
    }
    if (DEBUG) {
      System.out.println ("waitAndRead start array," + offset + "," + length);
    }
    if (length < 0) {
      throw new IllegalArgumentException (String.format ("length=%d", length));
    }
    if (0 < length) {  //1バイト以上読み出すとき
      int data = waitAndRead ();  //1バイト読み出す。1バイト読み出せるまでブロックする
      if (data < 0) {  //キャンセルされたとき
        length = -1;  //-1を返す
      } else {  //読み出せたとき
        array[offset] = (byte) data;  //1バイト目
        length = 1 + read (array, offset + 1, length - 1);  //2バイト目以降
      }
    }
    if (DEBUG) {
      System.out.println ("waitAndRead end " + length);
    }
    return length;
  }  //waitAndRead

  //length = write (data)
  //  1バイト書き込む
  //引数
  //  data    書き込むデータ。下位8bitのみ有効
  //結果
  //  length  書き込めた長さ。0=満杯で書き込めなかった。1=書き込めた
  public final int write (int data) {
    if (canceled) {  //キャンセルされた
      return 0;
    }
    if (DEBUG) {
      System.out.println ("write start " + data);
    }
    int length = 0;
    if ((wcnt - rcnt) < size) {  //1バイト以上書き込めるとき
      length = newest.write (data);  //最も新しいブロックに書き込む
      if (length == 0) {  //最も新しいブロックが満杯で書き込めなかったとき
        //最も新しいブロックを追加する
        //  newest     newer
        //      <---older
        //      newer--->
        //             newest
        Block newer = new Block ();
        newer.older = newest;
        newest.newer = newer;
        newest = newer;
        //
        length = newest.write (data);  //最も新しいブロックに書き込む
      }
      wcnt += length;  //これまでに書き込んだ長さを更新する
      synchronized (lock) {
        lock.notify ();  //waitAndReadのブロックを解除する
      }
    }
    if (DEBUG) {
      System.out.println ("write end " + length);
    }
    return length;
  }  //write

  //length = write (array, offset, length)
  //  配列から書き込む
  //引数
  //  array   配列
  //  offset  配列の位置
  //  length  書き込む長さ
  //結果
  //  length  書き込めた長さ。0=満杯で書き込めなかった
  public final int write (byte[] array, int offset, int length) {
    if (canceled) {  //キャンセルされた
      return 0;
    }
    if (DEBUG) {
      System.out.println ("write start array," + offset + "," + length);
    }
    length = Math.min (length, size - (wcnt - rcnt));  //書き込める長さに抑える
    if (0 < length) {  //1バイト以上書き込めて1バイト以上書き込むとき
      int o = offset;  //次の位置
      int l = length;  //残りの長さ
      int k = newest.write (array, o, l);  //最も新しいブロックに書き込む
      o += k;
      l -= k;
      while (0 < l) {
        //最も新しいブロックを追加する
        //  newest     newer
        //      <---older
        //      newer--->
        //             newest
        Block newer = new Block ();
        newer.older = newest;
        newest.newer = newer;
        newest = newer;
        //
        k = newest.write (array, o, l);  //最も新しいブロックに書き込む
        o += k;
        l -= k;
      }
      wcnt += length;  //これまでに書き込んだ長さを更新する
      synchronized (lock) {
        lock.notify ();  //waitAndReadのブロックを解除する
      }
    }
    if (DEBUG) {
      System.out.println ("write end " + length);
    }
    return length;  //書き込めた長さ
  }  //write


  //動作確認
  static {
    if (TEST) {
      System.out.println ("ByteQueue test");
      long start = System.currentTimeMillis ();
      final ByteQueue queue = new ByteQueue ();
      final int unit = Math.max (1, Block.size >> 6);
      Thread writer = new Thread () {
        @Override public void run () {
          byte[] a = new byte[unit * 243];
          int x = 0;
          for (int n = 0; n < 243; n++) {
            int l = unit * (x + 1);  //xの並びの長さ
            if ((n & 1) == 0) {  //nが偶数のとき
              for (int i = 0; i < l; i++) {
                queue.write (x);  //1バイトずつ書き込む
              }
            } else {  //nが奇数のとき
              Arrays.fill (a, 0, l, (byte) x);
              queue.write (a, 0, l);  //まとめて書き込む
            }
            x = (x * 7 + 1) % 243;  //次のx
          }
        }
      };
      Thread reader = new Thread () {
        @Override public void run () {
          int session = 0;  //waitAndReadを呼び出した回数
          int error = 0;  //一致しなかったデータの数
          int t = unit * (243 * (243 + 1) / 2);  //全体の長さ
          byte[] a = new byte[t];
          int x = 0;
          int l = unit * (x + 1);  //xの並びの長さ
          for (int n = 0; n < 243; ) {
            session++;
            int k = queue.waitAndRead (a, 0, t);  //読めるだけ読む
            t -= k;
            int i = 0;  //aのインデックス
            while (k != 0) {
              int m = Math.min (k, l);  //xの並びの読めた部分の長さ
              k -= m;
              for (int j = 0; j < m; j++) {
                int data = a[i++] & 255;
                if (data != x) {
                  error++;
                }
              }
              l -= m;
              if (l == 0) {  //xの並びが終わった
                x = (x * 7 + 1) % 243;  //次のx
                l = unit * (x + 1);  //xの並びの長さ
                n++;
              }
            }
          }
          System.out.println (session + " sessions");
          System.out.println (error + " errors");
        }
      };
      reader.start ();
      writer.start ();
      try {
        writer.join ();
      } catch (InterruptedException ie) {
      }
      try {
        reader.join ();
      } catch (InterruptedException ie) {
      }
      long end = System.currentTimeMillis ();
      System.out.println ((end - start) + " ms");
    }  //if
  }  //static


}  //class ByteQueue