ByteQueue.java
1:
2:
3:
4:
5:
6:
7:
8:
9:
10:
11:
12:
13: package xeij;
14:
15: import java.util.*;
16:
17:
18:
19:
20:
//ByteQueue
//  A first-in first-out queue of bytes.
//  The data is held in a linked chain of fixed-size ring buffers (Block). The writer appends a new block
//  when the newest one is full, and the reader unlinks the oldest block once it has been drained while
//  the queue still holds data.
//  All mutable fields are volatile and no locks are used.
//  NOTE(review): the counters are updated with non-atomic += / ++, so this looks intended for exactly one
//  writer thread and one reader thread (as in the self test below) -- confirm before sharing more widely.
public final class ByteQueue {

  private static final boolean DEBUG = false;  //true=trace every public operation to System.out
  private static final boolean TEST = false;  //true=run the self test in the static initializer

  //Block
  //  One fixed-size ring buffer of the chain.
  //  wcnt and rcnt are running totals of bytes written and read. The number of stored bytes is
  //  wcnt - rcnt (int subtraction wraps, so the difference stays valid when the totals overflow) and a
  //  total is turned into an array index with & (size - 1), which works because size is a power of two.
  private static final class Block {

    private volatile Block newer = null;  //block written after this one, null if this is the newest
    private volatile Block older = null;  //block written before this one, null if this is the oldest

    private static final int size = 65536;  //capacity in bytes. must be a power of two
    private final byte[] buff = new byte[size];
    private volatile int wcnt = 0;  //total number of bytes written to this block
    private volatile int rcnt = 0;  //total number of bytes read from this block

    //length = clear ()
    //  Discards every byte stored in this block.
    //  Returns the number of bytes discarded.
    private final int clear () {
      int length = wcnt - rcnt;
      if (0 < length) {
        rcnt += length;
      }
      return length;
    }

    //data = read ()
    //  Takes one byte out of this block.
    //  Returns the byte as 0..255, or -1 if the block is empty.
    private final int read () {
      return ((wcnt - rcnt) == 0 ? -1 :
              buff[(rcnt++) & (size - 1)] & 255);
    }

    //length = read (array, offset, length)
    //  Copies up to length bytes from this block into array[offset..] and removes them from the block.
    //  Returns the number of bytes actually copied, which is limited by the number of stored bytes.
    //  Throws IllegalArgumentException if length is negative.
    private final int read (byte[] array, int offset, int length) {
      if (length < 0) {
        throw new IllegalArgumentException (String.format ("length=%d", length));
      }
      length = Math.min (length, wcnt - rcnt);
      if (0 < length) {
        int index = rcnt & (size - 1);
        int first = Math.min (length, size - index);  //bytes available before the end of buff
        System.arraycopy (buff, index,
                          array, offset,
                          first);
        if (first < length) {
          //the stored data wraps around to the start of buff
          System.arraycopy (buff, 0,
                            array, offset + first,
                            length - first);
        }
        rcnt += length;  //publish the new read position only after the data has been copied out
      }
      return length;
    }

    //length = skip (length)
    //  Discards up to length bytes from this block.
    //  Returns the number of bytes actually discarded.
    //  Throws IllegalArgumentException if length is negative.
    private final int skip (int length) {
      if (length < 0) {
        throw new IllegalArgumentException (String.format ("length=%d", length));
      }
      length = Math.min (length, wcnt - rcnt);
      if (0 < length) {
        rcnt += length;
      }
      return length;
    }

    //length = unused ()
    //  Returns the number of bytes that can still be written to this block.
    private final int unused () {
      return size - (wcnt - rcnt);
    }

    //length = used ()
    //  Returns the number of bytes stored in this block.
    private final int used () {
      return wcnt - rcnt;
    }

    //length = write (data)
    //  Stores the low 8 bits of data in this block.
    //  Returns 1 if the byte was stored, 0 if the block is full.
    private final int write (int data) {
      if ((wcnt - rcnt) == size) {
        return 0;
      }
      buff[(wcnt++) & (size - 1)] = (byte) data;
      return 1;
    }

    //length = write (array, offset, length)
    //  Copies up to length bytes from array[offset..] into this block.
    //  Returns the number of bytes actually copied, which is limited by the free space of the block.
    //  Throws IllegalArgumentException if length is negative.
    private final int write (byte[] array, int offset, int length) {
      if (length < 0) {
        throw new IllegalArgumentException (String.format ("length=%d", length));
      }
      length = Math.min (length, size - (wcnt - rcnt));
      if (0 < length) {
        int index = wcnt & (size - 1);
        int first = Math.min (length, size - index);  //free bytes before the end of buff
        System.arraycopy (array, offset,
                          buff, index,
                          first);
        if (first < length) {
          //the free space wraps around to the start of buff
          System.arraycopy (array, offset + first,
                            buff, 0,
                            length - first);
        }
        wcnt += length;  //publish the new write position only after the data has been copied in
      }
      return length;
    }

  }  //class Block


  private volatile Block newest = new Block ();  //block the writer appends to
  private volatile Block oldest = newest;  //block the reader takes from

  private static final int size = 0x7fffffff;  //maximum number of bytes the whole queue may hold
  private volatile int wcnt = 0;  //total number of bytes written to the queue
  private volatile int rcnt = 0;  //total number of bytes read from the queue

  private volatile Thread waitThread = null;  //thread currently inside waitAndRead (), null if none

  //advanceOldest ()
  //  Unlinks the drained oldest block and makes its successor the oldest.
  //  Callers only use this while the queue-level counters say more data exists, in which case the
  //  writer has already linked a successor before publishing the queue-level wcnt.
  private void advanceOldest () {
    Block newer = oldest.newer;
    newer.older = null;
    oldest.newer = null;
    oldest = newer;
  }

  //appendBlock ()
  //  Links a fresh empty block after the newest one and makes it the newest.
  private void appendBlock () {
    Block newer = new Block ();
    newer.older = newest;
    newest.newer = newer;
    newest = newer;
  }

  //wakeWaiter ()
  //  Interrupts the thread sleeping in waitAndRead (), if any, so that it polls the queue again.
  private void wakeWaiter () {
    Thread wt = waitThread;  //read the volatile once so the null check and the call see the same value
    if (wt != null) {
      wt.interrupt ();
    }
  }

  //length = clear ()
  //  Discards every byte stored in the queue.
  //  Returns the number of bytes discarded.
  public final int clear () {
    return skip (wcnt - rcnt);
  }

  //data = read ()
  //  Takes one byte out of the queue without waiting.
  //  Returns the byte as 0..255, or -1 if the queue is empty.
  public final int read () {
    if (DEBUG) {
      System.out.println ("read start");
    }
    int data = -1;
    if (0 < (wcnt - rcnt)) {
      data = oldest.read ();
      if (data < 0) {
        //the oldest block is drained but the queue is not empty, so the data is in the next block
        advanceOldest ();
        data = oldest.read ();
      }
      rcnt++;
    }
    if (DEBUG) {
      System.out.println ("read end " + data);
    }
    return data;
  }

  //length = read (array, offset, length)
  //  Copies up to length bytes from the queue into array[offset..] without waiting.
  //  Returns the number of bytes actually copied, 0 if the queue is empty.
  //  Throws IllegalArgumentException if length is negative.
  public final int read (byte[] array, int offset, int length) {
    if (DEBUG) {
      System.out.println ("read start array," + offset + "," + length);
    }
    if (length < 0) {
      throw new IllegalArgumentException (String.format ("length=%d", length));
    }
    length = Math.min (length, wcnt - rcnt);
    if (0 < length) {
      int o = offset;  //next destination index
      int l = length;  //bytes still to copy
      int k = oldest.read (array, o, l);
      o += k;
      l -= k;
      while (0 < l) {
        //the oldest block is drained. continue with the next block
        advanceOldest ();
        k = oldest.read (array, o, l);
        o += k;
        l -= k;
      }
      rcnt += length;
    }
    if (DEBUG) {
      System.out.println ("read end " + length);
    }
    return length;
  }

  //length = skip (length)
  //  Discards up to length bytes from the queue.
  //  Returns the number of bytes actually discarded.
  //  Throws IllegalArgumentException if length is negative.
  public final int skip (int length) {
    if (DEBUG) {
      System.out.println ("skip start " + length);
    }
    if (length < 0) {
      throw new IllegalArgumentException (String.format ("length=%d", length));
    }
    length = Math.min (length, wcnt - rcnt);
    if (0 < length) {
      int l = length;  //bytes still to discard
      int k = oldest.skip (l);
      l -= k;
      while (0 < l) {
        //the oldest block is drained. continue with the next block
        advanceOldest ();
        k = oldest.skip (l);
        l -= k;
      }
      rcnt += length;
    }
    if (DEBUG) {
      System.out.println ("skip end " + length);
    }
    return length;
  }

  //length = unused ()
  //  Returns the number of bytes that can still be written to the queue.
  public final int unused () {
    return size - (wcnt - rcnt);
  }

  //length = used ()
  //  Returns the number of bytes stored in the queue.
  public final int used () {
    return wcnt - rcnt;
  }

  //length = write (data)
  //  Appends the low 8 bits of data to the queue and wakes the thread in waitAndRead (), if any.
  //  Returns 1 if the byte was stored, 0 if the queue already holds the maximum number of bytes.
  public final int write (int data) {
    if (DEBUG) {
      System.out.println ("write start " + data);
    }
    int length = 0;
    if ((wcnt - rcnt) < size) {
      length = newest.write (data);
      if (length == 0) {
        //the newest block is full. append a new block and store the byte there
        appendBlock ();
        length = newest.write (data);
      }
      wcnt += length;
    }
    wakeWaiter ();  //note: the waiter is woken even when nothing was stored
    if (DEBUG) {
      System.out.println ("write end " + length);
    }
    return length;
  }

  //length = write (array, offset, length)
  //  Appends up to length bytes from array[offset..] to the queue and, if anything was stored,
  //  wakes the thread in waitAndRead ().
  //  Returns the number of bytes actually stored, which is limited by the free space of the queue.
  //  Throws IllegalArgumentException if length is negative, like the other array methods.
  public final int write (byte[] array, int offset, int length) {
    if (DEBUG) {
      System.out.println ("write start array," + offset + "," + length);
    }
    if (length < 0) {
      //without this check a negative length would be returned as if it were a byte count
      throw new IllegalArgumentException (String.format ("length=%d", length));
    }
    length = Math.min (length, size - (wcnt - rcnt));
    if (0 < length) {
      int o = offset;  //next source index
      int l = length;  //bytes still to store
      int k = newest.write (array, o, l);
      o += k;
      l -= k;
      while (0 < l) {
        //the newest block is full. append a new block and continue there
        appendBlock ();
        k = newest.write (array, o, l);
        o += k;
        l -= k;
      }
      wcnt += length;  //the reader sees the new data only from here on
      wakeWaiter ();
    }
    if (DEBUG) {
      System.out.println ("write end " + length);
    }
    return length;
  }

  //cancel ()
  //  Makes the thread blocked in waitAndRead (), if any, give up waiting.
  public final void cancel () {
    if (DEBUG) {
      System.out.println ("cancel start");
    }
    Thread wt = waitThread;
    if (wt != null) {
      waitThread = null;  //tells the waiter that this wake-up is a cancellation
      wt.interrupt ();
    }
    if (DEBUG) {
      System.out.println ("cancel end");
    }
  }

  //data = waitAndRead ()
  //  Takes one byte out of the queue, waiting until a byte is written or cancel () is called.
  //  Returns the byte as 0..255, or -1 if the wait was cancelled before a byte could be read.
  public final int waitAndRead () {
    if (DEBUG) {
      System.out.println ("waitAndRead start");
    }
    int data = -1;
    waitThread = Thread.currentThread ();  //register before polling so that no write is missed
    while (waitThread != null) {
      data = read ();
      if (0 <= data ||
          waitThread == null) {
        break;
      }
      try {
        Thread.sleep (1000L);  //poll again after one second at the latest
      } catch (InterruptedException ignored) {
        //expected: write () and cancel () interrupt this thread to end the sleep early
      }
    }
    waitThread = null;
    if (DEBUG) {
      System.out.println ("waitAndRead end " + data);
    }
    return data;
  }

  //length = waitAndRead (array, offset, length)
  //  Waits for the first byte as waitAndRead () does, then copies whatever else is already available,
  //  up to length bytes in total, into array[offset..].
  //  Returns the number of bytes copied, 0 if length is 0, or -1 if the wait was cancelled.
  //  Throws IllegalArgumentException if length is negative.
  public final int waitAndRead (byte[] array, int offset, int length) {
    if (DEBUG) {
      System.out.println ("waitAndRead start array," + offset + "," + length);
    }
    if (length < 0) {
      throw new IllegalArgumentException (String.format ("length=%d", length));
    }
    if (0 < length) {
      int data = waitAndRead ();
      if (data < 0) {
        length = -1;
      } else {
        array[offset] = (byte) data;
        length = 1 + read (array, offset + 1, length - 1);
      }
    }
    if (DEBUG) {
      System.out.println ("waitAndRead end " + length);
    }
    return length;
  }


  //Self test, executed at class initialization when TEST is true.
  //  A writer thread stores 243 runs of identical bytes, alternating between the single-byte and the
  //  array write, while a reader thread takes them out with waitAndRead and counts mismatches.
  //  Run n consists of unit * (x + 1) bytes of value x, where x follows x = (x * 7 + 1) % 243 from 0.
  static {
    if (TEST) {
      System.out.println ("ByteQueue test");
      long start = System.currentTimeMillis ();
      final ByteQueue queue = new ByteQueue ();
      final int unit = Math.max (1, Block.size >> 6);
      Thread writer = new Thread () {
        @Override public void run () {
          byte[] a = new byte[unit * 243];  //large enough for the longest run
          int x = 0;
          for (int n = 0; n < 243; n++) {
            int l = unit * (x + 1);
            if ((n & 1) == 0) {
              //even runs are written byte by byte
              for (int i = 0; i < l; i++) {
                queue.write (x);
              }
            } else {
              //odd runs are written with a single array write
              Arrays.fill (a, 0, l, (byte) x);
              queue.write (a, 0, l);
            }
            x = (x * 7 + 1) % 243;
          }
        }
      };
      Thread reader = new Thread () {
        @Override public void run () {
          int session = 0;  //number of waitAndRead calls
          int error = 0;  //number of bytes that differ from the expected value
          int t = unit * (243 * (243 + 1) / 2);  //bytes still expected in total
          byte[] a = new byte[t];
          int x = 0;  //expected value of the current run
          int l = unit * (x + 1);  //bytes still expected in the current run
          for (int n = 0; n < 243; ) {
            session++;
            int k = queue.waitAndRead (a, 0, t);
            t -= k;
            int i = 0;
            while (k != 0) {
              int m = Math.min (k, l);
              k -= m;
              for (int j = 0; j < m; j++) {
                int data = a[i++] & 255;
                if (data != x) {
                  error++;
                }
              }
              l -= m;
              if (l == 0) {
                //the current run is complete. move on to the next run
                x = (x * 7 + 1) % 243;
                l = unit * (x + 1);
                n++;
              }
            }
          }
          System.out.println (session + " sessions");
          System.out.println (error + " errors");
        }
      };
      reader.start ();
      writer.start ();
      try {
        writer.join ();
      } catch (InterruptedException ignored) {
        //best effort: the test only reports the elapsed time afterwards
      }
      try {
        reader.join ();
      } catch (InterruptedException ignored) {
        //best effort: the test only reports the elapsed time afterwards
      }
      long end = System.currentTimeMillis ();
      System.out.println ((end - start) + " ms");
    }
  }


}  //class ByteQueue