ByteQueue.java
1:
2:
3:
4:
5:
6:
7:
8:
9:
10:
11:
12:
13: package xeij;
14:
15: import java.util.*;
16:
17:
18:
19:
20:
/**
 * Byte FIFO built from a linked chain of fixed-size ring-buffer blocks.
 *
 * <p>Bytes are appended to the {@code newest} block and consumed from the {@code oldest} block.
 * When the newest block is full a new block is linked in; when the oldest block is drained while
 * the queue still holds data, it is unlinked and the next block becomes the oldest.
 *
 * <p>Thread-safety: all counters and links are {@code volatile}, but they are updated with plain
 * (non-atomic) read-modify-write operations and no mutual exclusion. The read-side methods
 * ({@code clear}, {@code read}, {@code skip}, {@code waitAndRead}) only advance the read counters
 * and the write-side methods ({@code write}) only advance the write counters, so concurrent use
 * by more than one reading thread or more than one writing thread is not safe.
 * The {@code lock} monitor is used only to block and wake {@code waitAndRead} callers.
 */
public final class ByteQueue {

  /** When true, every public operation traces its start and end on standard output. */
  private static final boolean DEBUG = false;
  /** When true, the static initializer runs a writer/reader self-test. */
  private static final boolean TEST = false;

  /**
   * Fixed-capacity ring buffer holding {@link #size} bytes, plus links to its neighbours.
   *
   * <p>{@code wcnt} and {@code rcnt} are free-running counters: their difference is the number
   * of stored bytes and their low bits (masked with {@code size - 1}) are the buffer index, which
   * is why {@code size} must be a power of two.
   */
  private static final class Block {

    /** Next block toward the write end, or null. */
    private volatile Block newer = null;
    /** Next block toward the read end, or null. */
    private volatile Block older = null;

    /** Capacity in bytes; a power of two so that counters can be masked into indexes. */
    private static final int size = 65536;
    private final byte[] buff = new byte[size];
    /** Total number of bytes ever written to this block (wraps). */
    private volatile int wcnt = 0;
    /** Total number of bytes ever read or skipped from this block (wraps). */
    private volatile int rcnt = 0;

    /**
     * Discards every stored byte.
     *
     * @return the number of bytes discarded
     */
    private final int clear () {
      int length = wcnt - rcnt;
      if (0 < length) {
        rcnt += length;
      }
      return length;
    }

    /**
     * Reads one byte.
     *
     * @return the byte as 0..255, or -1 if the block is empty
     */
    private final int read () {
      if ((wcnt - rcnt) == 0) {
        return -1;
      }
      return buff[(rcnt++) & (size - 1)] & 255;
    }

    /**
     * Reads up to {@code length} bytes into {@code array} starting at {@code offset}.
     *
     * @return the number of bytes actually copied (0 if the block is empty)
     * @throws IllegalArgumentException if {@code length} is negative
     */
    private final int read (byte[] array, int offset, int length) {
      if (length < 0) {
        throw new IllegalArgumentException (String.format ("length=%d", length));
      }
      length = Math.min (length, wcnt - rcnt);
      if (0 < length) {
        int index = rcnt & (size - 1);
        // The stored range may wrap past the end of buff, so copy in up to two pieces.
        int first = Math.min (length, size - index);
        System.arraycopy (buff, index, array, offset, first);
        if (first < length) {
          System.arraycopy (buff, 0, array, offset + first, length - first);
        }
        // Publish the consumption only after the data has been copied out.
        rcnt += length;
      }
      return length;
    }

    /**
     * Discards up to {@code length} stored bytes.
     *
     * @return the number of bytes actually discarded
     * @throws IllegalArgumentException if {@code length} is negative
     */
    private final int skip (int length) {
      if (length < 0) {
        throw new IllegalArgumentException (String.format ("length=%d", length));
      }
      length = Math.min (length, wcnt - rcnt);
      if (0 < length) {
        rcnt += length;
      }
      return length;
    }

    /** Returns the number of bytes that can still be written to this block. */
    private final int unused () {
      return size - (wcnt - rcnt);
    }

    /** Returns the number of bytes currently stored in this block. */
    private final int used () {
      return wcnt - rcnt;
    }

    /**
     * Writes one byte (the low 8 bits of {@code data}).
     *
     * @return 1 if the byte was stored, 0 if the block is full
     */
    private final int write (int data) {
      if ((wcnt - rcnt) == size) {
        return 0;
      }
      buff[(wcnt++) & (size - 1)] = (byte) data;
      return 1;
    }

    /**
     * Writes up to {@code length} bytes from {@code array} starting at {@code offset}.
     *
     * @return the number of bytes actually stored (0 if the block is full)
     * @throws IllegalArgumentException if {@code length} is negative
     */
    private final int write (byte[] array, int offset, int length) {
      if (length < 0) {
        throw new IllegalArgumentException (String.format ("length=%d", length));
      }
      length = Math.min (length, size - (wcnt - rcnt));
      if (0 < length) {
        int index = wcnt & (size - 1);
        // The free range may wrap past the end of buff, so copy in up to two pieces.
        int first = Math.min (length, size - index);
        System.arraycopy (array, offset, buff, index, first);
        if (first < length) {
          System.arraycopy (array, offset + first, buff, 0, length - first);
        }
        // Publish the new bytes only after they are in place.
        wcnt += length;
      }
      return length;
    }

  }

  /** Block that receives writes. */
  private volatile Block newest = new Block ();
  /** Block that serves reads. */
  private volatile Block oldest = newest;

  /** Maximum number of bytes the queue will hold. */
  private static final int size = 0x7fffffff;
  /** Total number of bytes ever written to the queue (wraps). */
  private volatile int wcnt = 0;
  /** Total number of bytes ever read or skipped from the queue (wraps). */
  private volatile int rcnt = 0;

  /** Monitor on which {@code waitAndRead} blocks; notified by {@code write} and {@code cancel}. */
  private final Object lock = new Object ();
  /** Once true, every operation returns its "nothing happened" value immediately. */
  private volatile boolean canceled = false;

  /** Unlinks the drained oldest block and makes its newer neighbour the oldest. */
  private void dropOldest () {
    Block next = oldest.newer;
    next.older = null;
    oldest.newer = null;
    oldest = next;
  }

  /** Links a fresh empty block after the newest block and makes it the newest. */
  private void appendBlock () {
    Block fresh = new Block ();
    fresh.older = newest;
    newest.newer = fresh;
    newest = fresh;
  }

  /**
   * Cancels the queue. Subsequent operations return immediately, and every thread blocked in
   * {@code waitAndRead} is woken so that it can observe the cancellation.
   */
  public final void cancel () {
    if (canceled) {
      return;
    }
    if (DEBUG) {
      System.out.println ("cancel start");
    }
    canceled = true;
    synchronized (lock) {
      // notifyAll rather than notify: no waiter may be left blocked after cancellation.
      lock.notifyAll ();
    }
    if (DEBUG) {
      System.out.println ("cancel end");
    }
  }

  /**
   * Discards every byte currently stored.
   *
   * @return the number of bytes discarded, or 0 if the queue is canceled
   */
  public final int clear () {
    if (canceled) {
      return 0;
    }
    if (DEBUG) {
      System.out.println ("clear start");
    }
    int length = wcnt - rcnt;
    if (0 < length) {
      int remaining = length - oldest.skip (length);
      while (0 < remaining) {
        // The oldest block is drained but the queue still holds data, so move on.
        dropOldest ();
        remaining -= oldest.skip (remaining);
      }
      rcnt += length;
    }
    if (DEBUG) {
      System.out.println ("clear end " + length);
    }
    return length;
  }

  /**
   * Reads one byte without blocking.
   *
   * @return the byte as 0..255, or -1 if the queue is empty or canceled
   */
  public final int read () {
    if (canceled) {
      return -1;
    }
    if (DEBUG) {
      System.out.println ("read start");
    }
    int data = -1;
    if (0 < (wcnt - rcnt)) {
      data = oldest.read ();
      if (data < 0) {
        // The oldest block is drained but the queue still holds data, so move on.
        dropOldest ();
        data = oldest.read ();
      }
      rcnt++;
    }
    if (DEBUG) {
      System.out.println ("read end " + data);
    }
    return data;
  }

  /**
   * Reads up to {@code length} bytes into {@code array} without blocking.
   *
   * @param array destination
   * @param offset index in {@code array} of the first byte to store
   * @param length maximum number of bytes to read
   * @return the number of bytes actually read; 0 if the queue is empty or canceled
   * @throws IllegalArgumentException if {@code length} is negative (and the queue is not canceled)
   */
  public final int read (byte[] array, int offset, int length) {
    if (canceled) {
      return 0;
    }
    if (DEBUG) {
      System.out.println ("read start array," + offset + "," + length);
    }
    if (length < 0) {
      throw new IllegalArgumentException (String.format ("length=%d", length));
    }
    length = Math.min (length, wcnt - rcnt);
    if (0 < length) {
      int position = offset;
      int remaining = length;
      int count = oldest.read (array, position, remaining);
      position += count;
      remaining -= count;
      while (0 < remaining) {
        // The oldest block is drained but more bytes were promised, so move on.
        dropOldest ();
        count = oldest.read (array, position, remaining);
        position += count;
        remaining -= count;
      }
      rcnt += length;
    }
    if (DEBUG) {
      System.out.println ("read end " + length);
    }
    return length;
  }

  /**
   * Discards up to {@code length} bytes without blocking.
   *
   * @param length maximum number of bytes to discard
   * @return the number of bytes actually discarded; 0 if the queue is empty or canceled
   * @throws IllegalArgumentException if {@code length} is negative (and the queue is not canceled)
   */
  public final int skip (int length) {
    if (canceled) {
      return 0;
    }
    if (DEBUG) {
      System.out.println ("skip start " + length);
    }
    if (length < 0) {
      throw new IllegalArgumentException (String.format ("length=%d", length));
    }
    length = Math.min (length, wcnt - rcnt);
    if (0 < length) {
      int remaining = length - oldest.skip (length);
      while (0 < remaining) {
        // The oldest block is drained but more bytes were promised, so move on.
        dropOldest ();
        remaining -= oldest.skip (remaining);
      }
      rcnt += length;
    }
    if (DEBUG) {
      System.out.println ("skip end " + length);
    }
    return length;
  }

  /**
   * Returns the number of bytes that can still be written, or the full capacity if the queue is
   * canceled.
   */
  public final int unused () {
    if (canceled) {
      return size;
    }
    return size - (wcnt - rcnt);
  }

  /** Returns the number of bytes currently stored, or 0 if the queue is canceled. */
  public final int used () {
    if (canceled) {
      return 0;
    }
    return wcnt - rcnt;
  }

  /**
   * Reads one byte, blocking until a byte is available, the queue is canceled, or the calling
   * thread is interrupted.
   *
   * <p>If the thread is interrupted while waiting, its interrupt status is restored and -1 is
   * returned.
   *
   * @return the byte as 0..255, or -1 if the queue was canceled or the wait was interrupted
   */
  public final int waitAndRead () {
    if (canceled) {
      return -1;
    }
    if (DEBUG) {
      System.out.println ("waitAndRead start");
    }
    int data = read ();
    if (data < 0) {
      try {
        synchronized (lock) {
          // Re-check the condition under the lock BEFORE every wait. write() and cancel()
          // update their state first and then notify while holding this lock, so a wakeup
          // issued between the unlocked check above and this point can no longer be missed.
          while (!canceled && (data = read ()) < 0) {
            lock.wait ();
          }
        }
      } catch (InterruptedException ie) {
        // Keep the interrupt visible to the caller instead of swallowing it.
        Thread.currentThread ().interrupt ();
      }
    }
    if (DEBUG) {
      System.out.println ("waitAndRead end " + data);
    }
    return data;
  }

  /**
   * Reads at least one and up to {@code length} bytes into {@code array}, blocking only for the
   * first byte. After the first byte arrives, whatever else is immediately available (up to
   * {@code length - 1} further bytes) is read without blocking.
   *
   * @param array destination
   * @param offset index in {@code array} of the first byte to store
   * @param length maximum number of bytes to read
   * @return the number of bytes read; 0 if {@code length} is 0; -1 if the queue was canceled or
   *         the wait was interrupted before any byte was read
   * @throws IllegalArgumentException if {@code length} is negative (and the queue is not canceled)
   */
  public final int waitAndRead (byte[] array, int offset, int length) {
    if (canceled) {
      return -1;
    }
    if (DEBUG) {
      System.out.println ("waitAndRead start array," + offset + "," + length);
    }
    if (length < 0) {
      throw new IllegalArgumentException (String.format ("length=%d", length));
    }
    if (0 < length) {
      int data = waitAndRead ();
      if (data < 0) {
        length = -1;
      } else {
        array[offset] = (byte) data;
        length = 1 + read (array, offset + 1, length - 1);
      }
    }
    if (DEBUG) {
      System.out.println ("waitAndRead end " + length);
    }
    return length;
  }

  /**
   * Writes one byte (the low 8 bits of {@code data}) and wakes a waiting reader.
   *
   * @return 1 if the byte was stored; 0 if the queue is full or canceled
   */
  public final int write (int data) {
    if (canceled) {
      return 0;
    }
    if (DEBUG) {
      System.out.println ("write start " + data);
    }
    int length = 0;
    if ((wcnt - rcnt) < size) {
      length = newest.write (data);
      if (length == 0) {
        // The newest block is full; chain a fresh one and store the byte there.
        appendBlock ();
        length = newest.write (data);
      }
      // Publish the byte first, then notify under the lock.
      wcnt += length;
      synchronized (lock) {
        lock.notify ();
      }
    }
    if (DEBUG) {
      System.out.println ("write end " + length);
    }
    return length;
  }

  /**
   * Writes up to {@code length} bytes from {@code array} and wakes a waiting reader.
   *
   * @param array source
   * @param offset index in {@code array} of the first byte to write
   * @param length number of bytes to write
   * @return the number of bytes actually stored; 0 if the queue is full or canceled
   * @throws IllegalArgumentException if {@code length} is negative (and the queue is not canceled)
   */
  public final int write (byte[] array, int offset, int length) {
    if (canceled) {
      return 0;
    }
    if (DEBUG) {
      System.out.println ("write start array," + offset + "," + length);
    }
    // Reject a negative length the same way read and skip do, instead of returning it.
    if (length < 0) {
      throw new IllegalArgumentException (String.format ("length=%d", length));
    }
    length = Math.min (length, size - (wcnt - rcnt));
    if (0 < length) {
      int position = offset;
      int remaining = length;
      int count = newest.write (array, position, remaining);
      position += count;
      remaining -= count;
      while (0 < remaining) {
        // The newest block is full; chain a fresh one and continue there.
        appendBlock ();
        count = newest.write (array, position, remaining);
        position += count;
        remaining -= count;
      }
      // Publish the bytes first, then notify under the lock.
      wcnt += length;
      synchronized (lock) {
        lock.notify ();
      }
    }
    if (DEBUG) {
      System.out.println ("write end " + length);
    }
    return length;
  }

  // Self-test, compiled in but skipped unless TEST is true. A writer thread pushes 243 runs of
  // identical bytes (alternating single-byte and array writes) while a reader thread pulls them
  // with waitAndRead and counts mismatches.
  static {
    if (TEST) {
      System.out.println ("ByteQueue test");
      long start = System.currentTimeMillis ();
      final ByteQueue queue = new ByteQueue ();
      final int unit = Math.max (1, Block.size >> 6);
      Thread writer = new Thread () {
        @Override public void run () {
          byte[] a = new byte[unit * 243];
          int x = 0;
          for (int n = 0; n < 243; n++) {
            int l = unit * (x + 1);
            if ((n & 1) == 0) {
              for (int i = 0; i < l; i++) {
                queue.write (x);
              }
            } else {
              Arrays.fill (a, 0, l, (byte) x);
              queue.write (a, 0, l);
            }
            x = (x * 7 + 1) % 243;
          }
        }
      };
      Thread reader = new Thread () {
        @Override public void run () {
          int session = 0;
          int error = 0;
          int t = unit * (243 * (243 + 1) / 2);
          byte[] a = new byte[t];
          int x = 0;
          int l = unit * (x + 1);
          for (int n = 0; n < 243; ) {
            session++;
            int k = queue.waitAndRead (a, 0, t);
            t -= k;
            int i = 0;
            while (k != 0) {
              int m = Math.min (k, l);
              k -= m;
              for (int j = 0; j < m; j++) {
                int data = a[i++] & 255;
                if (data != x) {
                  error++;
                }
              }
              l -= m;
              if (l == 0) {
                x = (x * 7 + 1) % 243;
                l = unit * (x + 1);
                n++;
              }
            }
          }
          System.out.println (session + " sessions");
          System.out.println (error + " errors");
        }
      };
      reader.start ();
      writer.start ();
      try {
        writer.join ();
      } catch (InterruptedException ie) {
      }
      try {
        reader.join ();
      } catch (InterruptedException ie) {
      }
      long end = System.currentTimeMillis ();
      System.out.println ((end - start) + " ms");
    }
  }

}