ByteQueue.java
     1: //========================================================================================
     2: //  ByteQueue.java
     3: //    en:Byte queue
     4: //    ja:バイトキュー
     5: //  Copyright (C) 2003-2025 Makoto Kamada
     6: //
     7: //  This file is part of the XEiJ (X68000 Emulator in Java).
     8: //  You can use, modify and redistribute the XEiJ if the conditions are met.
     9: //  Read the XEiJ License for more details.
    10: //  https://stdkmd.net/xeij/
    11: //========================================================================================
    12: 
    13: package xeij;
    14: 
    15: import java.util.*;  //Arrays
    16: 
    17: //class ByteQueue
    18: //  バイトキュー。要素がbyteのFIFOの実装
    19: //  ブロック(小さいキュー)を連結することで容量を可変にしている。上限は約2GB
    20: //  書き込むスレッドと読み出すスレッドはそれぞれ1つであること
    21: public final class ByteQueue {
    22: 
    23:   private static final boolean DEBUG = false;
    24:   private static final boolean TEST = false;
    25: 
    26: 
    27:   //class Block
    28:   //  ブロック
    29:   private static final class Block {
    30: 
    31:     private volatile Block newer = null;  //これより新しいブロック。null=最も新しいブロック
    32:     private volatile Block older = null;  //これより古いブロック。null=最も古いブロック
    33: 
    34:     private static final int size = 65536;  //ブロックの容量。2の累乗
    35:     private final byte[] buff = new byte[size];  //配列
    36:     private volatile int wcnt = 0;  //これまでに書き込んだ長さの下位32bit
    37:     private volatile int rcnt = 0;  //これまでに読み出した長さの下位32bit
    38:     //  buff[wcnt&(size-1)]   次に書き込む位置
    39:     //  buff[rcnt&(size-1)]   次に読み出す位置
    40:     //  wcnt-rcnt             読み出せる長さ
    41:     //  size-(wcnt-rcnt)      書き込める長さ
    42:     //  (wcnt-rcnt)==0        空
    43:     //  (wcnt-rcnt)==size     満杯
    44:     //  0<=(wcnt-rcnt)<=size
    45: 
    46:     //length = clear ()
    47:     //  空にする。読み飛ばせるだけ読み飛ばす
    48:     //結果
    49:     //  length  読み飛ばせた長さ。0=空で読み飛ばせなかった
    50:     private final int clear () {
    51:       int length = wcnt - rcnt;  //読み飛ばす長さ
    52:       if (0 < length) {  //1バイト以上読み飛ばせて1バイト以上読み飛ばすとき
    53:         rcnt += length;  //これまでに読み出した長さを更新する
    54:       }
    55:       return length;  //読み飛ばせた長さを返す
    56:     }  //clear
    57: 
    58:     //data = read ()
    59:     //  1バイト読み出す
    60:     //結果
    61:     //  data  読み出したデータ。0~255。-1=空で読み出せなかった
    62:     private final int read () {
    63:       return ((wcnt - rcnt) == 0 ? -1 : //空のとき-1を返す
    64:               buff[(rcnt++) & (size - 1)] & 255);  //1バイト読み出して0~255の範囲で返す
    65:     }  //read
    66: 
    67:     //length = read (array, offset, length)
    68:     //  配列へ読み出す
    69:     //引数
    70:     //  array   配列
    71:     //  offset  配列の位置
    72:     //  length  読み出す長さ
    73:     //結果
    74:     //  length  読み出せた長さ。0=空で読み出せなかった
    75:     private final int read (byte[] array, int offset, int length) {
    76:       if (length < 0) {
    77:         throw new IllegalArgumentException (String.format ("length=%d", length));
    78:       }
    79:       length = Math.min (length, wcnt - rcnt);  //読み出せる長さに抑える
    80:       if (0 < length) {  //1バイト以上読み出せて1バイト以上読み出すとき
    81:         int index = rcnt & (size - 1);  //キューの位置
    82:         int first = Math.min (length, size - index);  //1回目はキューの末尾まで
    83:         System.arraycopy (buff, index,  //キューから
    84:                           array, offset,  //配列へ
    85:                           first);  //コピーする
    86:         if (first < length) {  //2回目があるとき
    87:           System.arraycopy (buff, 0,  //キューから
    88:                             array, offset + first,  //配列へ
    89:                             length - first);  //コピーする
    90:         }
    91:         rcnt += length;  //これまでに読み出した長さを更新する
    92:       }
    93:       return length;  //読み出せた長さを返す
    94:     }  //read
    95: 
    96:     //length = skip (length)
    97:     //  読み飛ばす
    98:     //引数
    99:     //  length  読み飛ばす長さ
   100:     //結果
   101:     //  length  読み飛ばせた長さ。0=空で読み飛ばせなかった
   102:     private final int skip (int length) {
   103:       if (length < 0) {
   104:         throw new IllegalArgumentException (String.format ("length=%d", length));
   105:       }
   106:       length = Math.min (length, wcnt - rcnt);  //読み飛ばせる長さに抑える
   107:       if (0 < length) {  //1バイト以上読み飛ばせて1バイト以上読み飛ばすとき
   108:         rcnt += length;  //これまでに読み出した長さを更新する
   109:       }
   110:       return length;  //読み飛ばせた長さを返す
   111:     }  //skip
   112: 
   113:     //length = unused ()
   114:     //  書き込める長さ
   115:     //結果
   116:     //  length  書き込める長さ。0=満杯で書き込めない
   117:     private final int unused () {
   118:       return size - (wcnt - rcnt);  //書き込める長さ
   119:     }  //unused
   120: 
   121:     //length = used ()
   122:     //  読み出せる長さ
   123:     //結果
   124:     //  length  読み出せる長さ。0=空で読み出せない
   125:     private final int used () {
   126:       return wcnt - rcnt;  //読み出せる長さ
   127:     }  //used
   128: 
   129:     //length = write (data)
   130:     //  1バイト書き込む
   131:     //引数
   132:     //  data    書き込むデータ。下位8bitのみ有効
   133:     //結果
   134:     //  length  書き込めた長さ。0=満杯で書き込めなかった。1=書き込めた
   135:     private final int write (int data) {
   136:       if ((wcnt - rcnt) == size) {  //書き込めないとき
   137:         return 0;  //0を返す
   138:       }
   139:       buff[(wcnt++) & (size - 1)] = (byte) data;  //1バイト書き込む
   140:       return 1;  //1を返す
   141:     }  //write
   142: 
   143:     //length = write (array, offset, length)
   144:     //  配列から書き込む
   145:     //引数
   146:     //  array   配列
   147:     //  offset  配列の位置
   148:     //  length  書き込む長さ
   149:     //結果
   150:     //  length  書き込めた長さ。0=満杯で書き込めなかった
   151:     private final int write (byte[] array, int offset, int length) {
   152:       if (length < 0) {
   153:         throw new IllegalArgumentException (String.format ("length=%d", length));
   154:       }
   155:       length = Math.min (length, size - (wcnt - rcnt));  //書き込める長さに抑える
   156:       if (0 < length) {  //1バイト以上書き込めて1バイト以上書き込むとき
   157:         int index = wcnt & (size - 1);  //キューの位置
   158:         int first = Math.min (length, size - index);  //1回目はキューの末尾まで
   159:         System.arraycopy (array, offset,  //配列から
   160:                           buff, index,  //キューへ
   161:                           first);  //コピーする
   162:         if (first < length) {  //2回目があるとき
   163:           System.arraycopy (array, offset + first,  //配列から
   164:                             buff, 0,  //キューへ
   165:                             length - first);  //コピーする
   166:         }
   167:         wcnt += length;  //これまでに書き込んだ長さを更新する
   168:       }
   169:       return length;  //書き込めた長さを返す
   170:     }  //write
   171: 
   172:   }  //class Block
   173: 
   174: 
   175:   private volatile Block newest = new Block ();  //最も新しいブロック。newest.newer==null
   176:   private volatile Block oldest = newest;  //最も古いブロック。oldest.older==null
   177: 
   178:   private static final int size = 0x7fffffff;  //キューの容量。ブロックの容量の倍数である必要はない
   179:   private volatile int wcnt = 0;  //これまでに書き込んだ長さの下位32bit
   180:   private volatile int rcnt = 0;  //これまでに読み出した長さの下位32bit
   181:   //  wcnt-rcnt             読み出せる長さ
   182:   //  size-(wcnt-rcnt)      書き込める長さ
   183:   //  (wcnt-rcnt)==0        空
   184:   //  (wcnt-rcnt)==size     満杯
   185:   //  0<=(wcnt-rcnt)<=size
   186: 
   187:   //length = clear ()
   188:   //  空にする。読み飛ばせるだけ読み飛ばす
   189:   //結果
   190:   //  length  読み飛ばせた長さ。0=空で読み飛ばせなかった
   191:   public final int clear () {
   192:     return skip (wcnt - rcnt);  //読み飛ばせるだけ読み飛ばす
   193:   }  //clear
   194: 
   195:   //data = read ()
   196:   //  1バイト読み出す
   197:   //結果
   198:   //  data  読み出したデータ。0~255。-1=空で読み出せなかった
   199:   public final int read () {
   200:     if (DEBUG) {
   201:       System.out.println ("read start");
   202:     }
   203:     int data = -1;
   204:     if (0 < (wcnt - rcnt)) {  //1バイト以上読み出せるとき
   205:       data = oldest.read ();  //最も古いブロックから読み出す
   206:       if (data < 0) {  //最も古いブロックが空で読み出せなかったとき
   207:         //最も古いブロックを削除する
   208:         //  oldest     newer
   209:         //      <-X-older
   210:         //      newer-X->
   211:         //             oldest
   212:         Block newer = oldest.newer;
   213:         newer.older = null;
   214:         oldest.newer = null;
   215:         oldest = newer;
   216:         //
   217:         data = oldest.read ();  //最も古いブロックから読み出す
   218:       }
   219:       rcnt++;
   220:     }
   221:     if (DEBUG) {
   222:       System.out.println ("read end " + data);
   223:     }
   224:     return data;  //読み出したデータ
   225:   }  //read
   226: 
   227:   //length = read (array, offset, length)
   228:   //  配列へ読み出す
   229:   //引数
   230:   //  array   配列
   231:   //  offset  配列の位置
   232:   //  length  読み出す長さ
   233:   //結果
   234:   //  length  読み出せた長さ。0=空で読み出せなかった
   235:   public final int read (byte[] array, int offset, int length) {
   236:     if (DEBUG) {
   237:       System.out.println ("read start array," + offset + "," + length);
   238:     }
   239:     if (length < 0) {
   240:       throw new IllegalArgumentException (String.format ("length=%d", length));
   241:     }
   242:     length = Math.min (length, wcnt - rcnt);  //読み出せる長さに抑える
   243:     if (0 < length) {  //1バイト以上読み出せて1バイト以上読み出すとき
   244:       int o = offset;  //次の位置
   245:       int l = length;  //残りの長さ
   246:       int k = oldest.read (array, o, l);  //最も古いブロックから読み出す
   247:       o += k;
   248:       l -= k;
   249:       while (0 < l) {
   250:         //最も古いブロックを削除する
   251:         //  oldest     newer
   252:         //      <-X-older
   253:         //      newer-X->
   254:         //             oldest
   255:         Block newer = oldest.newer;
   256:         newer.older = null;
   257:         oldest.newer = null;
   258:         oldest = newer;
   259:         //
   260:         k = oldest.read (array, o, l);  //最も古いブロックから読み出す
   261:         o += k;
   262:         l -= k;
   263:       }
   264:       rcnt += length;  //これまでに読み出した長さを更新する
   265:     }
   266:     if (DEBUG) {
   267:       System.out.println ("read end " + length);
   268:     }
   269:     return length;  //読み出せた長さ
   270:   }  //read
   271: 
   272:   //length = skip (length)
   273:   //  読み飛ばす
   274:   //引数
   275:   //  length  読み飛ばす長さ
   276:   //結果
   277:   //  length  読み飛ばせた長さ。0=空で読み飛ばせなかった
   278:   public final int skip (int length) {
   279:     if (DEBUG) {
   280:       System.out.println ("skip start " + length);
   281:     }
   282:     if (length < 0) {
   283:       throw new IllegalArgumentException (String.format ("length=%d", length));
   284:     }
   285:     length = Math.min (length, wcnt - rcnt);  //読み飛ばせる長さに抑える
   286:     if (0 < length) {  //1バイト以上読み飛ばせて1バイト以上読み飛ばすとき
   287:       int l = length;  //残りの長さ
   288:       int k = oldest.skip (l);  //最も古いブロックから読み飛ばす
   289:       l -= k;
   290:       while (0 < l) {
   291:         //最も古いブロックを削除する
   292:         //  oldest     newer
   293:         //      <-X-older
   294:         //      newer-X->
   295:         //             oldest
   296:         Block newer = oldest.newer;
   297:         newer.older = null;
   298:         oldest.newer = null;
   299:         oldest = newer;
   300:         //
   301:         k = oldest.skip (l);  //最も古いブロックから読み飛ばす
   302:         l -= k;
   303:       }
   304:       rcnt += length;  //これまでに読み出した長さを更新する
   305:     }
   306:     if (DEBUG) {
   307:       System.out.println ("skip end " + length);
   308:     }
   309:     return length;  //読み飛ばせた長さ
   310:   }  //skip
   311: 
   312:   //length = unused ()
   313:   //  書き込める長さ
   314:   //結果
   315:   //  length  書き込める長さ。0=満杯で書き込めない
   316:   public final int unused () {
   317:     return size - (wcnt - rcnt);  //書き込める長さ
   318:   }  //unused
   319: 
   320:   //length = used ()
   321:   //  読み出せる長さ
   322:   //結果
   323:   //  length  読み出せる長さ。0=空で読み出せない
   324:   public final int used () {
   325:     return wcnt - rcnt;  //読み出せる長さ
   326:   }  //used
   327: 
   328:   //length = write (data)
   329:   //  1バイト書き込む
   330:   //引数
   331:   //  data    書き込むデータ。下位8bitのみ有効
   332:   //結果
   333:   //  length  書き込めた長さ。0=満杯で書き込めなかった。1=書き込めた
   334:   public final int write (int data) {
   335:     if (DEBUG) {
   336:       System.out.println ("write start " + data);
   337:     }
   338:     int length = 0;
   339:     if ((wcnt - rcnt) < size) {  //1バイト以上書き込めるとき
   340:       length = newest.write (data);  //最も新しいブロックに書き込む
   341:       if (length == 0) {  //最も新しいブロックが満杯で書き込めなかったとき
   342:         //最も新しいブロックを追加する
   343:         //  newest     newer
   344:         //      <---older
   345:         //      newer--->
   346:         //             newest
   347:         Block newer = new Block ();
   348:         newer.older = newest;
   349:         newest.newer = newer;
   350:         newest = newer;
   351:         //
   352:         length = newest.write (data);  //最も新しいブロックに書き込む
   353:       }
   354:       wcnt += length;  //これまでに書き込んだ長さを更新する
   355:     }
   356:     Thread wt = waitThread;  //待機中のスレッドが
   357:     if (wt != null) {  //あれば
   358:       wt.interrupt ();  //割り込む
   359:     }
   360:     if (DEBUG) {
   361:       System.out.println ("write end " + length);
   362:     }
   363:     return length;
   364:   }  //write
   365: 
   366:   //length = write (array, offset, length)
   367:   //  配列から書き込む
   368:   //引数
   369:   //  array   配列
   370:   //  offset  配列の位置
   371:   //  length  書き込む長さ
   372:   //結果
   373:   //  length  書き込めた長さ。0=満杯で書き込めなかった
   374:   public final int write (byte[] array, int offset, int length) {
   375:     if (DEBUG) {
   376:       System.out.println ("write start array," + offset + "," + length);
   377:     }
   378:     length = Math.min (length, size - (wcnt - rcnt));  //書き込める長さに抑える
   379:     if (0 < length) {  //1バイト以上書き込めて1バイト以上書き込むとき
   380:       int o = offset;  //次の位置
   381:       int l = length;  //残りの長さ
   382:       int k = newest.write (array, o, l);  //最も新しいブロックに書き込む
   383:       o += k;
   384:       l -= k;
   385:       while (0 < l) {
   386:         //最も新しいブロックを追加する
   387:         //  newest     newer
   388:         //      <---older
   389:         //      newer--->
   390:         //             newest
   391:         Block newer = new Block ();
   392:         newer.older = newest;
   393:         newest.newer = newer;
   394:         newest = newer;
   395:         //
   396:         k = newest.write (array, o, l);  //最も新しいブロックに書き込む
   397:         o += k;
   398:         l -= k;
   399:       }
   400:       wcnt += length;  //これまでに書き込んだ長さを更新する
   401:       Thread wt = waitThread;  //待機中のスレッドが
   402:       if (wt != null) {  //あれば
   403:         wt.interrupt ();  //割り込む
   404:       }
   405:     }
   406:     if (DEBUG) {
   407:       System.out.println ("write end " + length);
   408:     }
   409:     return length;  //書き込めた長さ
   410:   }  //write
   411: 
   412: 
   413:   private volatile Thread waitThread = null;  //待機中のスレッド。null=ないかキャンセルされた
   414: 
   415:   //cancel ()
   416:   //  waitAndReadをキャンセルする
   417:   public final void cancel () {
   418:     if (DEBUG) {
   419:       System.out.println ("cancel start");
   420:     }
   421:     Thread wt = waitThread;  //待機中のスレッドが
   422:     if (wt != null) {  //あれば
   423:       waitThread = null;  //キャンセルしてから
   424:       wt.interrupt ();  //割り込む
   425:     }
   426:     if (DEBUG) {
   427:       System.out.println ("cancel end");
   428:     }
   429:   }  //cancel
   430: 
   431:   //data = waitAndRead ()
   432:   //  1バイト読み出す。1バイト読み出せるまでブロックする
   433:   //結果
   434:   //  data  読み出したデータ。0~255。-1=キャンセルされた
   435:   public final int waitAndRead () {
   436:     if (DEBUG) {
   437:       System.out.println ("waitAndRead start");
   438:     }
   439:     int data = -1;
   440:     waitThread = Thread.currentThread ();  //待機中のスレッドは現在のスレッド
   441:     while (waitThread != null) {  //キャンセルされていなければ繰り返す
   442:       data = read ();  //1バイト読み出す
   443:       if (0 <= data ||  //読み出せたか
   444:           waitThread == null) {  //キャンセルされたら
   445:         break;  //終了する
   446:       }
   447:       try {
   448:         Thread.sleep (1000L);  //割り込みを待つ
   449:       } catch (InterruptedException ie) {
   450:       }
   451:     }
   452:     waitThread = null;  //待機中のスレッドはない
   453:     if (DEBUG) {
   454:       System.out.println ("waitAndRead end " + data);
   455:     }
   456:     return data;  //データを返す
   457:   }  //waitAndRead
   458: 
   459:   //length = waitAndRead (array, offset, length)
   460:   //  配列へ読み出す。1バイト以上読み出せるまでブロックする
   461:   //引数
   462:   //  array   配列
   463:   //  offset  配列の位置
   464:   //  length  読み出す長さ
   465:   //結果
   466:   //  length  読み出せた長さ。-1=キャンセルされた
   467:   public final int waitAndRead (byte[] array, int offset, int length) {
   468:     if (DEBUG) {
   469:       System.out.println ("waitAndRead start array," + offset + "," + length);
   470:     }
   471:     if (length < 0) {
   472:       throw new IllegalArgumentException (String.format ("length=%d", length));
   473:     }
   474:     if (0 < length) {  //1バイト以上読み出すとき
   475:       int data = waitAndRead ();  //1バイト読み出す。1バイト読み出せるまでブロックする
   476:       if (data < 0) {  //キャンセルされたとき
   477:         length = -1;  //-1を返す
   478:       } else {  //読み出せたとき
   479:         array[offset] = (byte) data;  //1バイト目
   480:         length = 1 + read (array, offset + 1, length - 1);  //2バイト目以降
   481:       }
   482:     }
   483:     if (DEBUG) {
   484:       System.out.println ("waitAndRead end " + length);
   485:     }
   486:     return length;
   487:   }  //waitAndRead
   488: 
   489: 
   490:   //動作確認
   491:   static {
   492:     if (TEST) {
   493:       System.out.println ("ByteQueue test");
   494:       long start = System.currentTimeMillis ();
   495:       final ByteQueue queue = new ByteQueue ();
   496:       final int unit = Math.max (1, Block.size >> 6);
   497:       Thread writer = new Thread () {
   498:         @Override public void run () {
   499:           byte[] a = new byte[unit * 243];
   500:           int x = 0;
   501:           for (int n = 0; n < 243; n++) {
   502:             int l = unit * (x + 1);  //xの並びの長さ
   503:             if ((n & 1) == 0) {  //nが偶数のとき
   504:               for (int i = 0; i < l; i++) {
   505:                 queue.write (x);  //1バイトずつ書き込む
   506:               }
   507:             } else {  //nが奇数のとき
   508:               Arrays.fill (a, 0, l, (byte) x);
   509:               queue.write (a, 0, l);  //まとめて書き込む
   510:             }
   511:             x = (x * 7 + 1) % 243;  //次のx
   512:           }
   513:         }
   514:       };
   515:       Thread reader = new Thread () {
   516:         @Override public void run () {
   517:           int session = 0;  //waitAndReadを呼び出した回数
   518:           int error = 0;  //一致しなかったデータの数
   519:           int t = unit * (243 * (243 + 1) / 2);  //全体の長さ
   520:           byte[] a = new byte[t];
   521:           int x = 0;
   522:           int l = unit * (x + 1);  //xの並びの長さ
   523:           for (int n = 0; n < 243; ) {
   524:             session++;
   525:             int k = queue.waitAndRead (a, 0, t);  //読めるだけ読む
   526:             t -= k;
   527:             int i = 0;  //aのインデックス
   528:             while (k != 0) {
   529:               int m = Math.min (k, l);  //xの並びの読めた部分の長さ
   530:               k -= m;
   531:               for (int j = 0; j < m; j++) {
   532:                 int data = a[i++] & 255;
   533:                 if (data != x) {
   534:                   error++;
   535:                 }
   536:               }
   537:               l -= m;
   538:               if (l == 0) {  //xの並びが終わった
   539:                 x = (x * 7 + 1) % 243;  //次のx
   540:                 l = unit * (x + 1);  //xの並びの長さ
   541:                 n++;
   542:               }
   543:             }
   544:           }
   545:           System.out.println (session + " sessions");
   546:           System.out.println (error + " errors");
   547:         }
   548:       };
   549:       reader.start ();
   550:       writer.start ();
   551:       try {
   552:         writer.join ();
   553:       } catch (InterruptedException ie) {
   554:       }
   555:       try {
   556:         reader.join ();
   557:       } catch (InterruptedException ie) {
   558:       }
   559:       long end = System.currentTimeMillis ();
   560:       System.out.println ((end - start) + " ms");
   561:     }  //if
   562:   }  //static
   563: 
   564: 
   565: }  //class ByteQueue