ByteQueue.java
     1: //========================================================================================
     2: //  ByteQueue.java
     3: //    en:Byte queue
     4: //    ja:バイトキュー
     5: //  Copyright (C) 2003-2026 Makoto Kamada
     6: //
     7: //  This file is part of the XEiJ (X68000 Emulator in Java).
     8: //  You can use, modify and redistribute the XEiJ if the conditions are met.
     9: //  Read the XEiJ License for more details.
    10: //  https://stdkmd.net/xeij/
    11: //========================================================================================
    12: 
    13: package xeij;
    14: 
    15: import java.util.*;  //Arrays
    16: 
    17: //class ByteQueue
    18: //  バイトキュー。要素がbyteのFIFOの実装
    19: //  ブロック(小さいキュー)を連結することで容量を可変にしている。上限は約2GB
    20: //  書き込むスレッドと読み出すスレッドはそれぞれ1つであること
    21: public final class ByteQueue {
    22: 
    23:   private static final boolean DEBUG = false;
    24:   private static final boolean TEST = false;
    25: 
    26: 
    27:   //class Block
    28:   //  ブロック
    29:   private static final class Block {
    30: 
    31:     private volatile Block newer = null;  //これより新しいブロック。null=最も新しいブロック
    32:     private volatile Block older = null;  //これより古いブロック。null=最も古いブロック
    33: 
    34:     private static final int size = 65536;  //ブロックの容量。2の累乗
    35:     private final byte[] buff = new byte[size];  //配列
    36:     private volatile int wcnt = 0;  //これまでに書き込んだ長さの下位32bit
    37:     private volatile int rcnt = 0;  //これまでに読み出した長さの下位32bit
    38:     //  buff[wcnt&(size-1)]   次に書き込む位置
    39:     //  buff[rcnt&(size-1)]   次に読み出す位置
    40:     //  wcnt-rcnt             読み出せる長さ
    41:     //  size-(wcnt-rcnt)      書き込める長さ
    42:     //  (wcnt-rcnt)==0        空
    43:     //  (wcnt-rcnt)==size     満杯
    44:     //  0<=(wcnt-rcnt)<=size
    45: 
    46:     //length = clear ()
    47:     //  空にする。読み飛ばせるだけ読み飛ばす
    48:     //結果
    49:     //  length  読み飛ばせた長さ。0=空で読み飛ばせなかった
    50:     private final int clear () {
    51:       int length = wcnt - rcnt;  //読み飛ばす長さ
    52:       if (0 < length) {  //1バイト以上読み飛ばせて1バイト以上読み飛ばすとき
    53:         rcnt += length;  //これまでに読み出した長さを更新する
    54:       }
    55:       return length;  //読み飛ばせた長さを返す
    56:     }  //clear
    57: 
    58:     //data = read ()
    59:     //  1バイト読み出す
    60:     //結果
    61:     //  data  読み出したデータ。0~255。-1=空で読み出せなかった
    62:     private final int read () {
    63:       return ((wcnt - rcnt) == 0 ? -1 : //空のとき-1を返す
    64:               buff[(rcnt++) & (size - 1)] & 255);  //1バイト読み出して0~255の範囲で返す
    65:     }  //read
    66: 
    67:     //length = read (array, offset, length)
    68:     //  配列へ読み出す
    69:     //引数
    70:     //  array   配列
    71:     //  offset  配列の位置
    72:     //  length  読み出す長さ
    73:     //結果
    74:     //  length  読み出せた長さ。0=空で読み出せなかった
    75:     private final int read (byte[] array, int offset, int length) {
    76:       if (length < 0) {
    77:         throw new IllegalArgumentException (String.format ("length=%d", length));
    78:       }
    79:       length = Math.min (length, wcnt - rcnt);  //読み出せる長さに抑える
    80:       if (0 < length) {  //1バイト以上読み出せて1バイト以上読み出すとき
    81:         int index = rcnt & (size - 1);  //キューの位置
    82:         int first = Math.min (length, size - index);  //1回目はキューの末尾まで
    83:         System.arraycopy (buff, index,  //キューから
    84:                           array, offset,  //配列へ
    85:                           first);  //コピーする
    86:         if (first < length) {  //2回目があるとき
    87:           System.arraycopy (buff, 0,  //キューから
    88:                             array, offset + first,  //配列へ
    89:                             length - first);  //コピーする
    90:         }
    91:         rcnt += length;  //これまでに読み出した長さを更新する
    92:       }
    93:       return length;  //読み出せた長さを返す
    94:     }  //read
    95: 
    96:     //length = skip (length)
    97:     //  読み飛ばす
    98:     //引数
    99:     //  length  読み飛ばす長さ
   100:     //結果
   101:     //  length  読み飛ばせた長さ。0=空で読み飛ばせなかった
   102:     private final int skip (int length) {
   103:       if (length < 0) {
   104:         throw new IllegalArgumentException (String.format ("length=%d", length));
   105:       }
   106:       length = Math.min (length, wcnt - rcnt);  //読み飛ばせる長さに抑える
   107:       if (0 < length) {  //1バイト以上読み飛ばせて1バイト以上読み飛ばすとき
   108:         rcnt += length;  //これまでに読み出した長さを更新する
   109:       }
   110:       return length;  //読み飛ばせた長さを返す
   111:     }  //skip
   112: 
   113:     //length = unused ()
   114:     //  書き込める長さ
   115:     //結果
   116:     //  length  書き込める長さ。0=満杯で書き込めない
   117:     private final int unused () {
   118:       return size - (wcnt - rcnt);  //書き込める長さ
   119:     }  //unused
   120: 
   121:     //length = used ()
   122:     //  読み出せる長さ
   123:     //結果
   124:     //  length  読み出せる長さ。0=空で読み出せない
   125:     private final int used () {
   126:       return wcnt - rcnt;  //読み出せる長さ
   127:     }  //used
   128: 
   129:     //length = write (data)
   130:     //  1バイト書き込む
   131:     //引数
   132:     //  data    書き込むデータ。下位8bitのみ有効
   133:     //結果
   134:     //  length  書き込めた長さ。0=満杯で書き込めなかった。1=書き込めた
   135:     private final int write (int data) {
   136:       if ((wcnt - rcnt) == size) {  //書き込めないとき
   137:         return 0;  //0を返す
   138:       }
   139:       buff[(wcnt++) & (size - 1)] = (byte) data;  //1バイト書き込む
   140:       return 1;  //1を返す
   141:     }  //write
   142: 
   143:     //length = write (array, offset, length)
   144:     //  配列から書き込む
   145:     //引数
   146:     //  array   配列
   147:     //  offset  配列の位置
   148:     //  length  書き込む長さ
   149:     //結果
   150:     //  length  書き込めた長さ。0=満杯で書き込めなかった
   151:     private final int write (byte[] array, int offset, int length) {
   152:       if (length < 0) {
   153:         throw new IllegalArgumentException (String.format ("length=%d", length));
   154:       }
   155:       length = Math.min (length, size - (wcnt - rcnt));  //書き込める長さに抑える
   156:       if (0 < length) {  //1バイト以上書き込めて1バイト以上書き込むとき
   157:         int index = wcnt & (size - 1);  //キューの位置
   158:         int first = Math.min (length, size - index);  //1回目はキューの末尾まで
   159:         System.arraycopy (array, offset,  //配列から
   160:                           buff, index,  //キューへ
   161:                           first);  //コピーする
   162:         if (first < length) {  //2回目があるとき
   163:           System.arraycopy (array, offset + first,  //配列から
   164:                             buff, 0,  //キューへ
   165:                             length - first);  //コピーする
   166:         }
   167:         wcnt += length;  //これまでに書き込んだ長さを更新する
   168:       }
   169:       return length;  //書き込めた長さを返す
   170:     }  //write
   171: 
   172:   }  //class Block
   173: 
   174: 
   175:   private volatile Block newest = new Block ();  //最も新しいブロック。newest.newer==null
   176:   private volatile Block oldest = newest;  //最も古いブロック。oldest.older==null
   177: 
   178:   private static final int size = 0x7fffffff;  //キューの容量。ブロックの容量の倍数である必要はない
   179:   private volatile int wcnt = 0;  //これまでに書き込んだ長さの下位32bit
   180:   private volatile int rcnt = 0;  //これまでに読み出した長さの下位32bit
   181:   //  wcnt-rcnt             読み出せる長さ
   182:   //  size-(wcnt-rcnt)      書き込める長さ
   183:   //  (wcnt-rcnt)==0        空
   184:   //  (wcnt-rcnt)==size     満杯
   185:   //  0<=(wcnt-rcnt)<=size
   186: 
   187:   private final Object lock = new Object ();  //ブロックのためのオブジェクト
   188:   private volatile boolean canceled = false;  //true=キャンセルされた
   189: 
   190:   //cancel ()
   191:   //  waitAndReadのブロックを解除する
   192:   //  以後キューは動作しない
   193:   public final void cancel () {
   194:     if (canceled) {  //キャンセルされた
   195:       return;
   196:     }
   197:     if (DEBUG) {
   198:       System.out.println ("cancel start");
   199:     }
   200:     canceled = true;  //キャンセルする
   201:     synchronized (lock) {
   202:       lock.notify ();  //waitAndReadのブロックを解除する
   203:     }
   204:     if (DEBUG) {
   205:       System.out.println ("cancel end");
   206:     }
   207:   }  //cancel
   208: 
   209:   //length = clear ()
   210:   //  空にする。読み飛ばせるだけ読み飛ばす
   211:   //結果
   212:   //  length  読み飛ばせた長さ。0=空で読み飛ばせなかった
   213:   public final int clear () {
   214:     if (canceled) {  //キャンセルされた
   215:       return 0;
   216:     }
   217:     if (DEBUG) {
   218:       System.out.println ("clear start");
   219:     }
   220:     int length = wcnt - rcnt;  //読み飛ばせる長さ
   221:     if (0 < length) {  //1バイト以上読み飛ばせるとき
   222:       int l = length;  //残りの長さ
   223:       int k = oldest.skip (l);  //最も古いブロックから読み飛ばす
   224:       l -= k;
   225:       while (0 < l) {
   226:         //最も古いブロックを削除する
   227:         //  oldest     newer
   228:         //      <-X-older
   229:         //      newer-X->
   230:         //             oldest
   231:         Block newer = oldest.newer;
   232:         newer.older = null;
   233:         oldest.newer = null;
   234:         oldest = newer;
   235:         //
   236:         k = oldest.skip (l);  //最も古いブロックから読み飛ばす
   237:         l -= k;
   238:       }
   239:       rcnt += length;  //これまでに読み出した長さを更新する
   240:     }  //if 0<length
   241:     if (DEBUG) {
   242:       System.out.println ("clear end " + length);
   243:     }
   244:     return length;  //読み飛ばせた長さ
   245:   }  //clear
   246: 
   247:   //data = read ()
   248:   //  1バイト読み出す
   249:   //結果
   250:   //  data  読み出したデータ。0~255。-1=空で読み出せなかった
   251:   public final int read () {
   252:     if (canceled) {  //キャンセルされた
   253:       return -1;
   254:     }
   255:     if (DEBUG) {
   256:       System.out.println ("read start");
   257:     }
   258:     int data = -1;
   259:     if (0 < (wcnt - rcnt)) {  //1バイト以上読み出せるとき
   260:       data = oldest.read ();  //最も古いブロックから読み出す
   261:       if (data < 0) {  //最も古いブロックが空で読み出せなかったとき
   262:         //最も古いブロックを削除する
   263:         //  oldest     newer
   264:         //      <-X-older
   265:         //      newer-X->
   266:         //             oldest
   267:         Block newer = oldest.newer;
   268:         newer.older = null;
   269:         oldest.newer = null;
   270:         oldest = newer;
   271:         //
   272:         data = oldest.read ();  //最も古いブロックから読み出す
   273:       }
   274:       rcnt++;
   275:     }  //if 0<(wcnt-rcnt)
   276:     if (DEBUG) {
   277:       System.out.println ("read end " + data);
   278:     }
   279:     return data;  //読み出したデータ
   280:   }  //read
   281: 
   282:   //length = read (array, offset, length)
   283:   //  配列へ読み出す
   284:   //引数
   285:   //  array   配列
   286:   //  offset  配列の位置
   287:   //  length  読み出す長さ
   288:   //結果
   289:   //  length  読み出せた長さ。0=空で読み出せなかった
   290:   public final int read (byte[] array, int offset, int length) {
   291:     if (canceled) {  //キャンセルされた
   292:       return 0;
   293:     }
   294:     if (DEBUG) {
   295:       System.out.println ("read start array," + offset + "," + length);
   296:     }
   297:     if (length < 0) {
   298:       throw new IllegalArgumentException (String.format ("length=%d", length));
   299:     }
   300:     length = Math.min (length, wcnt - rcnt);  //読み出せる長さに抑える
   301:     if (0 < length) {  //1バイト以上読み出せて1バイト以上読み出すとき
   302:       int o = offset;  //次の位置
   303:       int l = length;  //残りの長さ
   304:       int k = oldest.read (array, o, l);  //最も古いブロックから読み出す
   305:       o += k;
   306:       l -= k;
   307:       while (0 < l) {
   308:         //最も古いブロックを削除する
   309:         //  oldest     newer
   310:         //      <-X-older
   311:         //      newer-X->
   312:         //             oldest
   313:         Block newer = oldest.newer;
   314:         newer.older = null;
   315:         oldest.newer = null;
   316:         oldest = newer;
   317:         //
   318:         k = oldest.read (array, o, l);  //最も古いブロックから読み出す
   319:         o += k;
   320:         l -= k;
   321:       }
   322:       rcnt += length;  //これまでに読み出した長さを更新する
   323:     }
   324:     if (DEBUG) {
   325:       System.out.println ("read end " + length);
   326:     }
   327:     return length;  //読み出せた長さ
   328:   }  //read
   329: 
   330:   //length = skip (length)
   331:   //  読み飛ばす
   332:   //引数
   333:   //  length  読み飛ばす長さ
   334:   //結果
   335:   //  length  読み飛ばせた長さ。0=空で読み飛ばせなかった
   336:   public final int skip (int length) {
   337:     if (canceled) {  //キャンセルされた
   338:       return 0;
   339:     }
   340:     if (DEBUG) {
   341:       System.out.println ("skip start " + length);
   342:     }
   343:     if (length < 0) {
   344:       throw new IllegalArgumentException (String.format ("length=%d", length));
   345:     }
   346:     length = Math.min (length, wcnt - rcnt);  //読み飛ばせる長さに抑える
   347:     if (0 < length) {  //1バイト以上読み飛ばせて1バイト以上読み飛ばすとき
   348:       int l = length;  //残りの長さ
   349:       int k = oldest.skip (l);  //最も古いブロックから読み飛ばす
   350:       l -= k;
   351:       while (0 < l) {
   352:         //最も古いブロックを削除する
   353:         //  oldest     newer
   354:         //      <-X-older
   355:         //      newer-X->
   356:         //             oldest
   357:         Block newer = oldest.newer;
   358:         newer.older = null;
   359:         oldest.newer = null;
   360:         oldest = newer;
   361:         //
   362:         k = oldest.skip (l);  //最も古いブロックから読み飛ばす
   363:         l -= k;
   364:       }
   365:       rcnt += length;  //これまでに読み出した長さを更新する
   366:     }
   367:     if (DEBUG) {
   368:       System.out.println ("skip end " + length);
   369:     }
   370:     return length;  //読み飛ばせた長さ
   371:   }  //skip
   372: 
   373:   //length = unused ()
   374:   //  書き込める長さ
   375:   //結果
   376:   //  length  書き込める長さ。0=満杯で書き込めない
   377:   public final int unused () {
   378:     if (canceled) {  //キャンセルされた
   379:       return size;
   380:     }
   381:     return size - (wcnt - rcnt);  //書き込める長さ
   382:   }  //unused
   383: 
   384:   //length = used ()
   385:   //  読み出せる長さ
   386:   //結果
   387:   //  length  読み出せる長さ。0=空で読み出せない
   388:   public final int used () {
   389:     if (canceled) {  //キャンセルされた
   390:       return 0;
   391:     }
   392:     return wcnt - rcnt;  //読み出せる長さ
   393:   }  //used
   394: 
   395:   //data = waitAndRead ()
   396:   //  1バイト読み出す。1バイト読み出せるまでブロックする
   397:   //結果
   398:   //  data  読み出したデータ。0~255。-1=キャンセルされた
   399:   public final int waitAndRead () {
   400:     if (canceled) {  //キャンセルされた
   401:       return -1;
   402:     }
   403:     if (DEBUG) {
   404:       System.out.println ("waitAndRead start");
   405:     }
   406:     int data = read ();  //1バイト読み出す
   407:     if (data < 0) {  //読み出せない
   408:       try {
   409:         synchronized (lock) {
   410:           do {
   411:             lock.wait ();  //ブロックする
   412:             if (canceled) {  //キャンセルされた
   413:               break;
   414:             }
   415:             data = read ();  //1バイト読み出す
   416:           } while (data < 0);  //読み出せない
   417:         }
   418:       } catch (InterruptedException ie) {
   419:       }
   420:     }
   421:     if (DEBUG) {
   422:       System.out.println ("waitAndRead end " + data);
   423:     }
   424:     return data;  //データを返す
   425:   }  //waitAndRead
   426: 
   427:   //length = waitAndRead (array, offset, length)
   428:   //  配列へ読み出す。1バイト以上読み出せるまでブロックする
   429:   //引数
   430:   //  array   配列
   431:   //  offset  配列の位置
   432:   //  length  読み出す長さ
   433:   //結果
   434:   //  length  読み出せた長さ。-1=キャンセルされた
   435:   public final int waitAndRead (byte[] array, int offset, int length) {
   436:     if (canceled) {  //キャンセルされた
   437:       return -1;
   438:     }
   439:     if (DEBUG) {
   440:       System.out.println ("waitAndRead start array," + offset + "," + length);
   441:     }
   442:     if (length < 0) {
   443:       throw new IllegalArgumentException (String.format ("length=%d", length));
   444:     }
   445:     if (0 < length) {  //1バイト以上読み出すとき
   446:       int data = waitAndRead ();  //1バイト読み出す。1バイト読み出せるまでブロックする
   447:       if (data < 0) {  //キャンセルされたとき
   448:         length = -1;  //-1を返す
   449:       } else {  //読み出せたとき
   450:         array[offset] = (byte) data;  //1バイト目
   451:         length = 1 + read (array, offset + 1, length - 1);  //2バイト目以降
   452:       }
   453:     }
   454:     if (DEBUG) {
   455:       System.out.println ("waitAndRead end " + length);
   456:     }
   457:     return length;
   458:   }  //waitAndRead
   459: 
   460:   //length = write (data)
   461:   //  1バイト書き込む
   462:   //引数
   463:   //  data    書き込むデータ。下位8bitのみ有効
   464:   //結果
   465:   //  length  書き込めた長さ。0=満杯で書き込めなかった。1=書き込めた
   466:   public final int write (int data) {
   467:     if (canceled) {  //キャンセルされた
   468:       return 0;
   469:     }
   470:     if (DEBUG) {
   471:       System.out.println ("write start " + data);
   472:     }
   473:     int length = 0;
   474:     if ((wcnt - rcnt) < size) {  //1バイト以上書き込めるとき
   475:       length = newest.write (data);  //最も新しいブロックに書き込む
   476:       if (length == 0) {  //最も新しいブロックが満杯で書き込めなかったとき
   477:         //最も新しいブロックを追加する
   478:         //  newest     newer
   479:         //      <---older
   480:         //      newer--->
   481:         //             newest
   482:         Block newer = new Block ();
   483:         newer.older = newest;
   484:         newest.newer = newer;
   485:         newest = newer;
   486:         //
   487:         length = newest.write (data);  //最も新しいブロックに書き込む
   488:       }
   489:       wcnt += length;  //これまでに書き込んだ長さを更新する
   490:       synchronized (lock) {
   491:         lock.notify ();  //waitAndReadのブロックを解除する
   492:       }
   493:     }
   494:     if (DEBUG) {
   495:       System.out.println ("write end " + length);
   496:     }
   497:     return length;
   498:   }  //write
   499: 
   500:   //length = write (array, offset, length)
   501:   //  配列から書き込む
   502:   //引数
   503:   //  array   配列
   504:   //  offset  配列の位置
   505:   //  length  書き込む長さ
   506:   //結果
   507:   //  length  書き込めた長さ。0=満杯で書き込めなかった
   508:   public final int write (byte[] array, int offset, int length) {
   509:     if (canceled) {  //キャンセルされた
   510:       return 0;
   511:     }
   512:     if (DEBUG) {
   513:       System.out.println ("write start array," + offset + "," + length);
   514:     }
   515:     length = Math.min (length, size - (wcnt - rcnt));  //書き込める長さに抑える
   516:     if (0 < length) {  //1バイト以上書き込めて1バイト以上書き込むとき
   517:       int o = offset;  //次の位置
   518:       int l = length;  //残りの長さ
   519:       int k = newest.write (array, o, l);  //最も新しいブロックに書き込む
   520:       o += k;
   521:       l -= k;
   522:       while (0 < l) {
   523:         //最も新しいブロックを追加する
   524:         //  newest     newer
   525:         //      <---older
   526:         //      newer--->
   527:         //             newest
   528:         Block newer = new Block ();
   529:         newer.older = newest;
   530:         newest.newer = newer;
   531:         newest = newer;
   532:         //
   533:         k = newest.write (array, o, l);  //最も新しいブロックに書き込む
   534:         o += k;
   535:         l -= k;
   536:       }
   537:       wcnt += length;  //これまでに書き込んだ長さを更新する
   538:       synchronized (lock) {
   539:         lock.notify ();  //waitAndReadのブロックを解除する
   540:       }
   541:     }
   542:     if (DEBUG) {
   543:       System.out.println ("write end " + length);
   544:     }
   545:     return length;  //書き込めた長さ
   546:   }  //write
   547: 
   548: 
   549:   //動作確認
   550:   static {
   551:     if (TEST) {
   552:       System.out.println ("ByteQueue test");
   553:       long start = System.currentTimeMillis ();
   554:       final ByteQueue queue = new ByteQueue ();
   555:       final int unit = Math.max (1, Block.size >> 6);
   556:       Thread writer = new Thread () {
   557:         @Override public void run () {
   558:           byte[] a = new byte[unit * 243];
   559:           int x = 0;
   560:           for (int n = 0; n < 243; n++) {
   561:             int l = unit * (x + 1);  //xの並びの長さ
   562:             if ((n & 1) == 0) {  //nが偶数のとき
   563:               for (int i = 0; i < l; i++) {
   564:                 queue.write (x);  //1バイトずつ書き込む
   565:               }
   566:             } else {  //nが奇数のとき
   567:               Arrays.fill (a, 0, l, (byte) x);
   568:               queue.write (a, 0, l);  //まとめて書き込む
   569:             }
   570:             x = (x * 7 + 1) % 243;  //次のx
   571:           }
   572:         }
   573:       };
   574:       Thread reader = new Thread () {
   575:         @Override public void run () {
   576:           int session = 0;  //waitAndReadを呼び出した回数
   577:           int error = 0;  //一致しなかったデータの数
   578:           int t = unit * (243 * (243 + 1) / 2);  //全体の長さ
   579:           byte[] a = new byte[t];
   580:           int x = 0;
   581:           int l = unit * (x + 1);  //xの並びの長さ
   582:           for (int n = 0; n < 243; ) {
   583:             session++;
   584:             int k = queue.waitAndRead (a, 0, t);  //読めるだけ読む
   585:             t -= k;
   586:             int i = 0;  //aのインデックス
   587:             while (k != 0) {
   588:               int m = Math.min (k, l);  //xの並びの読めた部分の長さ
   589:               k -= m;
   590:               for (int j = 0; j < m; j++) {
   591:                 int data = a[i++] & 255;
   592:                 if (data != x) {
   593:                   error++;
   594:                 }
   595:               }
   596:               l -= m;
   597:               if (l == 0) {  //xの並びが終わった
   598:                 x = (x * 7 + 1) % 243;  //次のx
   599:                 l = unit * (x + 1);  //xの並びの長さ
   600:                 n++;
   601:               }
   602:             }
   603:           }
   604:           System.out.println (session + " sessions");
   605:           System.out.println (error + " errors");
   606:         }
   607:       };
   608:       reader.start ();
   609:       writer.start ();
   610:       try {
   611:         writer.join ();
   612:       } catch (InterruptedException ie) {
   613:       }
   614:       try {
   615:         reader.join ();
   616:       } catch (InterruptedException ie) {
   617:       }
   618:       long end = System.currentTimeMillis ();
   619:       System.out.println ((end - start) + " ms");
   620:     }  //if
   621:   }  //static
   622: 
   623: 
   624: }  //class ByteQueue