NamedPipeInputStream.java
1:
2:
3:
4:
5:
6:
7:
8:
9:
10:
11:
12:
13: package xeij;
14:
15: import java.io.*;
16: import java.lang.foreign.*;
17: import java.lang.invoke.*;
18: import java.util.*;
19:
20:
21:
22: public abstract class NamedPipeInputStream extends InputStream {
23:
24:
25:
26:
27:
28: public static NamedPipeInputStream create (String name) throws IOException {
29: return System.getProperty ("os.name").indexOf ("Windows") < 0 ? new Gen (name) : new Win (name);
30: }
31:
32: protected String path;
33: protected volatile boolean closed;
34: protected volatile boolean connecting;
35: protected volatile boolean reading;
36: protected Thread thread;
37: protected ByteQueue queue;
38:
39:
40:
41:
42:
43: private NamedPipeInputStream (String name) throws IOException {
44:
45: osStart (name);
46:
47: queue = new ByteQueue ();
48:
49: thread = new Thread () {
50: byte[] b = new byte[1024];
51: @Override public void run () {
52: try {
53:
54: connecting = true;
55: osOpenAndConnect ();
56: connecting = false;
57: while (!closed) {
58:
59: reading = true;
60: int k = osRead (b);
61: reading = false;
62: if (closed) {
63: break;
64: }
65: if (k == -1) {
66: osClose ();
67: connecting = true;
68: osOpenAndConnect ();
69: connecting = false;
70: continue;
71: }
72:
73: queue.write (b, 0, k);
74: }
75: } catch (IOException ioe) {
76: connecting = false;
77: reading = false;
78: }
79:
80: osClose ();
81: }
82: };
83: thread.start ();
84: if (false) {
85: System.out.println ("named pipe " + path + " opened");
86: }
87: }
88:
89:
90:
91:
92:
93: @Override
94: public int available () throws IOException {
95:
96: if (closed) {
97: throw new IOException ("named pipe " + path + " closed");
98: }
99:
100: return queue.used ();
101: }
102:
103:
104:
105:
106: @Override
107: public void close () {
108:
109: if (closed) {
110: return;
111: }
112: closed = true;
113:
114: osUnblock ();
115:
116: osClose ();
117:
118: try {
119: thread.join (100L);
120: } catch (InterruptedException ie) {
121: }
122:
123: osEnd ();
124:
125: queue.cancel ();
126: if (false) {
127: System.out.println ("named pipe " + path + " closed");
128: }
129: }
130:
131:
132:
133:
134:
135: @Override
136: public int read () throws IOException {
137:
138: if (closed) {
139: throw new IOException ("named pipe " + path + " closed");
140: }
141:
142: return queue.waitAndRead ();
143: }
144:
145:
146:
147:
148:
149:
150: @Override
151: public int read (byte[] b) throws IOException {
152:
153: if (closed) {
154: throw new IOException ("named pipe " + path + " closed");
155: }
156:
157: return queue.waitAndRead (b, 0, b.length);
158: }
159:
160:
161:
162:
163:
164:
165:
166:
167: @Override
168: public int read (byte[] b, int o, int n) throws IOException {
169:
170: if (closed) {
171: throw new IOException ("named pipe " + path + " closed");
172: }
173:
174: if (o < 0 || n < 0 || b.length < o + n) {
175: throw new IndexOutOfBoundsException ("b.length=" + b.length + ", o=" + o + ", n=" + n);
176: }
177:
178: return queue.waitAndRead (b, o, n);
179: }
180:
181:
182:
183:
184:
185:
186:
187:
188: @Override
189: public int readNBytes (byte[] b, int o, int n) throws IOException {
190:
191: if (closed) {
192: throw new IOException ("named pipe " + path + " closed");
193: }
194:
195: if (o < 0 || n < 0 || b.length < o + n) {
196: throw new IndexOutOfBoundsException ("b.length=" + b.length + ", o=" + o + ", n=" + n);
197: }
198:
199: int k = 0;
200: while (k < n) {
201: int t = queue.waitAndRead (b, o + k, n - k);
202: if (t == -1) {
203: return -1;
204: }
205: k += t;
206: }
207:
208: return k;
209: }
210:
211: protected abstract void osStart (String name) throws IOException;
212: protected abstract void osOpenAndConnect () throws IOException;
213: protected abstract int osRead (byte[] b) throws IOException;
214: protected abstract void osUnblock ();
215: protected abstract void osClose ();
216: protected abstract void osEnd ();
217:
218:
219:
220: private static class Gen extends NamedPipeInputStream {
221:
222: private File file;
223: private FileInputStream stream;
224:
225:
226:
227: protected Gen (String name) throws IOException {
228: super (name);
229: }
230:
231:
232:
233: @Override
234: protected void osStart (String name) throws IOException {
235:
236: path = System.getProperty ("java.io.tmpdir") + "/" + name;
237:
238: file = new File (path);
239:
240: file.delete ();
241: Process process;
242: try {
243: process = new ProcessBuilder ("mkfifo", path).inheritIO ().start ();
244: } catch (IOException ioe) {
245: throw new IOException ("mkfifo " + path + " not started");
246: }
247: try {
248: int exitCode = process.waitFor ();
249: if (exitCode != 0) {
250: file.delete ();
251: throw new IOException ("mkfifo " + path + " terminated with exit code " + exitCode);
252: }
253: } catch (InterruptedException ie) {
254: file.delete ();
255: throw new IOException ("mkfifo " + path + " interrupted");
256: }
257: }
258:
259:
260:
261: @Override
262: protected void osOpenAndConnect () throws IOException {
263: if (stream == null) {
264: stream = new FileInputStream (file);
265: }
266: }
267:
268:
269:
270: @Override
271: protected int osRead (byte[] b) throws IOException {
272: return stream.read (b);
273: }
274:
275:
276:
277: @Override
278: protected void osUnblock () {
279:
280:
281:
282: if (connecting) {
283: try {
284: Thread.sleep (50L);
285: } catch (InterruptedException ie) {
286: }
287: if (connecting) {
288: try {
289: new FileOutputStream (file).close ();
290: } catch (IOException ioe) {
291: }
292: for (int i = 0; i < 100 && connecting; i++) {
293: try {
294: Thread.sleep (50L);
295: } catch (InterruptedException ie) {
296: }
297: }
298: if (connecting) {
299: System.out.println ("named pipe " + path + " unblocking timeout");
300: }
301: }
302: }
303:
304:
305: if (reading) {
306: for (int i = 0; i < 100 && reading; i++) {
307: try {
308: Thread.sleep (50L);
309: } catch (InterruptedException ie) {
310: }
311: }
312: if (reading) {
313: System.out.println ("user operation required to unblock named pipe " + path);
314: while (reading) {
315: try {
316: Thread.sleep (1000L);
317: } catch (InterruptedException ie) {
318: }
319: }
320: }
321: }
322: }
323:
324:
325:
326: @Override
327: protected void osClose () {
328:
329: if (stream != null) {
330: try {
331: stream.close ();
332: } catch (IOException ioe) {
333: }
334: stream = null;
335: }
336: }
337:
338:
339:
340: @Override
341: protected void osEnd () {
342: file.delete ();
343: }
344:
345: }
346:
347:
348:
349: private static class Win extends NamedPipeInputStream {
350:
351:
352:
353: private static final int ERROR_BROKEN_PIPE = 109;
354: private static final int ERROR_BAD_PIPE = 230;
355: private static final int ERROR_NO_DATA = 232;
356: private static final int ERROR_PIPE_CONNECTED = 535;
357: private static final int ERROR_PIPE_LISTENING = 536;
358: private static final int ERROR_OPERATION_ABORTED = 995;
359: private static final int ERROR_NOT_FOUND = 1168;
360: private static final long INVALID_HANDLE_VALUE = -1L;
361:
362:
363: private Linker linker;
364: private MethodHandle downcallHandle (MemorySegment address, FunctionDescriptor function) {
365: return linker.downcallHandle (address, function);
366: }
367:
368:
369: private Arena arena;
370:
371:
372: private MethodHandle CancelIoEx;
373: private MethodHandle CloseHandle;
374: private MethodHandle ConnectNamedPipe;
375: private static final int PIPE_ACCESS_INBOUND = 0x00000001;
376: private static final int PIPE_TYPE_BYTE = 0x00000000;
377: private static final int PIPE_WAIT = 0x00000000;
378: private static final int PIPE_UNLIMITED_INSTANCES = 255;
379: private static final int BUFFER_SIZE = 8192;
380: private MethodHandle CreateNamedPipeA;
381: private MethodHandle GetLastError;
382: private MethodHandle ReadFile;
383:
384: private MemorySegment handle;
385:
386:
387:
388: protected Win (String name) throws IOException {
389: super (name);
390: }
391:
392:
393:
394: @Override
395: protected void osStart (String name) throws IOException {
396:
397: linker = Linker.nativeLinker ();
398:
399: arena = Arena.ofAuto ();
400:
401: SymbolLookup kernel32 = SymbolLookup.libraryLookup ("kernel32", arena);
402:
403: try {
404:
405:
406: CancelIoEx = downcallHandle (
407: kernel32.findOrThrow ("CancelIoEx"),
408: FunctionDescriptor.of (
409: ValueLayout.JAVA_INT,
410: ValueLayout.ADDRESS,
411: ValueLayout.ADDRESS));
412:
413:
414: CloseHandle = downcallHandle (
415: kernel32.findOrThrow ("CloseHandle"),
416: FunctionDescriptor.of (
417: ValueLayout.JAVA_INT,
418: ValueLayout.ADDRESS));
419:
420:
421: ConnectNamedPipe = downcallHandle (
422: kernel32.findOrThrow ("ConnectNamedPipe"),
423: FunctionDescriptor.of (
424: ValueLayout.JAVA_INT,
425: ValueLayout.ADDRESS,
426: ValueLayout.ADDRESS));
427:
428:
429: CreateNamedPipeA = downcallHandle (
430: kernel32.findOrThrow ("CreateNamedPipeA"),
431: FunctionDescriptor.of (
432: ValueLayout.ADDRESS,
433: ValueLayout.ADDRESS,
434: ValueLayout.JAVA_INT,
435: ValueLayout.JAVA_INT,
436: ValueLayout.JAVA_INT,
437: ValueLayout.JAVA_INT,
438: ValueLayout.JAVA_INT,
439: ValueLayout.JAVA_INT,
440: ValueLayout.ADDRESS));
441:
442:
443: GetLastError = downcallHandle (
444: kernel32.findOrThrow ("GetLastError"),
445: FunctionDescriptor.of (
446: ValueLayout.JAVA_INT));
447:
448:
449: ReadFile = downcallHandle (
450: kernel32.findOrThrow ("ReadFile"),
451: FunctionDescriptor.of (
452: ValueLayout.JAVA_INT,
453: ValueLayout.ADDRESS,
454: ValueLayout.ADDRESS,
455: ValueLayout.JAVA_INT,
456: ValueLayout.ADDRESS,
457: ValueLayout.ADDRESS));
458: } catch (NoSuchElementException nsee) {
459: nsee.printStackTrace ();
460: }
461:
462: path = "\\\\.\\pipe\\" + name;
463: }
464:
465:
466:
467: @Override
468: protected void osOpenAndConnect () throws IOException {
469:
470: if (handle == null) {
471: try {
472: int error;
473:
474: if ((handle = (MemorySegment) CreateNamedPipeA.invoke (
475: arena.allocateFrom (path),
476: PIPE_ACCESS_INBOUND,
477: PIPE_TYPE_BYTE | PIPE_WAIT,
478: PIPE_UNLIMITED_INSTANCES,
479: 0,
480: BUFFER_SIZE,
481: 0,
482: MemorySegment.NULL
483: )).address () == INVALID_HANDLE_VALUE &&
484: (error = (int) GetLastError.invoke ()) != -1) {
485: handle = null;
486:
487: throw new IOException ("CreateNamedPipeA returned error code " + error);
488: }
489: } catch (IOException ioe) {
490: throw ioe;
491: } catch (Throwable e) {
492: e.printStackTrace ();
493: throw new IOException ("CreateNamedPipeA invocation failed");
494: }
495: }
496:
497: try {
498: int error;
499:
500: if ((int) ConnectNamedPipe.invoke (
501: handle,
502: MemorySegment.NULL) == 0 &&
503: (error = (int) GetLastError.invoke ()) != -1) {
504: if (error == 0 ||
505: error == ERROR_NO_DATA ||
506: error == ERROR_PIPE_CONNECTED) {
507: } else if (error == ERROR_OPERATION_ABORTED) {
508: throw new InterruptedIOException ("ConnectNamedPipe aborted");
509: } else {
510: throw new IOException ("ConnectNamedPipe returned error code " + error);
511: }
512: }
513: } catch (IOException ioe) {
514: throw ioe;
515: } catch (Throwable e) {
516: e.printStackTrace ();
517: throw new IOException ("ConnectNamedPipe invocation failed");
518: }
519: }
520:
521:
522:
523: @Override
524: protected int osRead (byte[] b) throws IOException {
525: int n = b.length;
526: MemorySegment buf = arena.allocate ((long) n);
527: int k = 0;
528: MemorySegment t = arena.allocate (ValueLayout.JAVA_INT);
529: t.set (ValueLayout.JAVA_INT, 0L, 0);
530: try {
531: while (k < 1) {
532: t.set (ValueLayout.JAVA_INT, 0L, 0);
533: int error;
534:
535: if ((int) ReadFile.invoke (
536: handle,
537: buf.asSlice ((long) k),
538: n - k,
539: t,
540: MemorySegment.NULL) == 0 &&
541: (error = (int) GetLastError.invoke ()) != -1) {
542: if (closed) {
543: throw new IOException ("named pipe " + path + " closed");
544: }
545: if (error == 0 ||
546: error == ERROR_BROKEN_PIPE ||
547: error == ERROR_PIPE_LISTENING) {
548:
549: return -1;
550: } else if (error == ERROR_OPERATION_ABORTED) {
551: throw new InterruptedIOException ("ReadFile aborted");
552: } else {
553: throw new IOException ("ReadFile returned error code " + error);
554: }
555: }
556: if (closed) {
557: throw new IOException ("named pipe " + path + " closed");
558: }
559: k += t.get (ValueLayout.JAVA_INT, 0L);
560: }
561: } catch (IOException ioe) {
562: throw ioe;
563: } catch (Throwable e) {
564: e.printStackTrace ();
565: throw new IOException ("ReadFile invocation failed");
566: }
567: System.arraycopy (buf.toArray (ValueLayout.JAVA_BYTE), 0, b, 0, k);
568: return k;
569: }
570:
571:
572:
573: @Override
574: protected void osUnblock () {
575:
576:
577:
578: if (connecting || reading) {
579: try {
580: Thread.sleep (50L);
581: } catch (InterruptedException ie) {
582: }
583: if (connecting) {
584: try {
585: int error;
586:
587: if ((int) CancelIoEx.invoke (
588: handle,
589: MemorySegment.NULL) == 0 &&
590: (error = (int) GetLastError.invoke ()) != -1) {
591:
592:
593: }
594: } catch (Throwable e) {
595:
596:
597: }
598: for (int i = 0; i < 100 && (connecting || reading); i++) {
599: try {
600: Thread.sleep (50L);
601: } catch (InterruptedException ie) {
602: }
603: }
604: if (connecting || reading) {
605: System.out.println ("named pipe " + path + " unblocking timeout");
606: }
607: }
608: }
609: }
610:
611:
612:
613: @Override
614: protected void osClose () {
615:
616: if (handle != null) {
617: try {
618: int error;
619:
620: if ((int) CloseHandle.invoke (
621: handle) == 0 &&
622: (error = (int) GetLastError.invoke ()) != -1) {
623:
624: }
625:
626:
627: } catch (Throwable e) {
628: e.printStackTrace ();
629:
630: }
631: handle = null;
632: }
633: }
634:
635:
636:
637: @Override
638: protected void osEnd () {
639: }
640:
641: }
642:
643: }