pipe.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419
  1. // +build windows
  2. package winio
  3. import (
  4. "errors"
  5. "io"
  6. "net"
  7. "os"
  8. "syscall"
  9. "time"
  10. "unsafe"
  11. )
  12. //sys connectNamedPipe(pipe syscall.Handle, o *syscall.Overlapped) (err error) = ConnectNamedPipe
  13. //sys createNamedPipe(name string, flags uint32, pipeMode uint32, maxInstances uint32, outSize uint32, inSize uint32, defaultTimeout uint32, sa *syscall.SecurityAttributes) (handle syscall.Handle, err error) [failretval==syscall.InvalidHandle] = CreateNamedPipeW
  14. //sys createFile(name string, access uint32, mode uint32, sa *syscall.SecurityAttributes, createmode uint32, attrs uint32, templatefile syscall.Handle) (handle syscall.Handle, err error) [failretval==syscall.InvalidHandle] = CreateFileW
  15. //sys waitNamedPipe(name string, timeout uint32) (err error) = WaitNamedPipeW
  16. //sys getNamedPipeInfo(pipe syscall.Handle, flags *uint32, outSize *uint32, inSize *uint32, maxInstances *uint32) (err error) = GetNamedPipeInfo
  17. //sys getNamedPipeHandleState(pipe syscall.Handle, state *uint32, curInstances *uint32, maxCollectionCount *uint32, collectDataTimeout *uint32, userName *uint16, maxUserNameSize uint32) (err error) = GetNamedPipeHandleStateW
  18. //sys localAlloc(uFlags uint32, length uint32) (ptr uintptr) = LocalAlloc
  19. const (
  20. cERROR_PIPE_BUSY = syscall.Errno(231)
  21. cERROR_NO_DATA = syscall.Errno(232)
  22. cERROR_PIPE_CONNECTED = syscall.Errno(535)
  23. cERROR_SEM_TIMEOUT = syscall.Errno(121)
  24. cPIPE_ACCESS_DUPLEX = 0x3
  25. cFILE_FLAG_FIRST_PIPE_INSTANCE = 0x80000
  26. cSECURITY_SQOS_PRESENT = 0x100000
  27. cSECURITY_ANONYMOUS = 0
  28. cPIPE_REJECT_REMOTE_CLIENTS = 0x8
  29. cPIPE_UNLIMITED_INSTANCES = 255
  30. cNMPWAIT_USE_DEFAULT_WAIT = 0
  31. cNMPWAIT_NOWAIT = 1
  32. cPIPE_TYPE_MESSAGE = 4
  33. cPIPE_READMODE_MESSAGE = 2
  34. )
  35. var (
  36. // ErrPipeListenerClosed is returned for pipe operations on listeners that have been closed.
  37. // This error should match net.errClosing since docker takes a dependency on its text.
  38. ErrPipeListenerClosed = errors.New("use of closed network connection")
  39. errPipeWriteClosed = errors.New("pipe has been closed for write")
  40. )
  41. type win32Pipe struct {
  42. *win32File
  43. path string
  44. }
  45. type win32MessageBytePipe struct {
  46. win32Pipe
  47. writeClosed bool
  48. readEOF bool
  49. }
  50. type pipeAddress string
  51. func (f *win32Pipe) LocalAddr() net.Addr {
  52. return pipeAddress(f.path)
  53. }
  54. func (f *win32Pipe) RemoteAddr() net.Addr {
  55. return pipeAddress(f.path)
  56. }
  57. func (f *win32Pipe) SetDeadline(t time.Time) error {
  58. f.SetReadDeadline(t)
  59. f.SetWriteDeadline(t)
  60. return nil
  61. }
  62. // CloseWrite closes the write side of a message pipe in byte mode.
  63. func (f *win32MessageBytePipe) CloseWrite() error {
  64. if f.writeClosed {
  65. return errPipeWriteClosed
  66. }
  67. err := f.win32File.Flush()
  68. if err != nil {
  69. return err
  70. }
  71. _, err = f.win32File.Write(nil)
  72. if err != nil {
  73. return err
  74. }
  75. f.writeClosed = true
  76. return nil
  77. }
  78. // Write writes bytes to a message pipe in byte mode. Zero-byte writes are ignored, since
  79. // they are used to implement CloseWrite().
  80. func (f *win32MessageBytePipe) Write(b []byte) (int, error) {
  81. if f.writeClosed {
  82. return 0, errPipeWriteClosed
  83. }
  84. if len(b) == 0 {
  85. return 0, nil
  86. }
  87. return f.win32File.Write(b)
  88. }
  89. // Read reads bytes from a message pipe in byte mode. A read of a zero-byte message on a message
  90. // mode pipe will return io.EOF, as will all subsequent reads.
  91. func (f *win32MessageBytePipe) Read(b []byte) (int, error) {
  92. if f.readEOF {
  93. return 0, io.EOF
  94. }
  95. n, err := f.win32File.Read(b)
  96. if err == io.EOF {
  97. // If this was the result of a zero-byte read, then
  98. // it is possible that the read was due to a zero-size
  99. // message. Since we are simulating CloseWrite with a
  100. // zero-byte message, ensure that all future Read() calls
  101. // also return EOF.
  102. f.readEOF = true
  103. } else if err == syscall.ERROR_MORE_DATA {
  104. // ERROR_MORE_DATA indicates that the pipe's read mode is message mode
  105. // and the message still has more bytes. Treat this as a success, since
  106. // this package presents all named pipes as byte streams.
  107. err = nil
  108. }
  109. return n, err
  110. }
  111. func (s pipeAddress) Network() string {
  112. return "pipe"
  113. }
  114. func (s pipeAddress) String() string {
  115. return string(s)
  116. }
  117. // DialPipe connects to a named pipe by path, timing out if the connection
  118. // takes longer than the specified duration. If timeout is nil, then the timeout
  119. // is the default timeout established by the pipe server.
  120. func DialPipe(path string, timeout *time.Duration) (net.Conn, error) {
  121. var absTimeout time.Time
  122. if timeout != nil {
  123. absTimeout = time.Now().Add(*timeout)
  124. }
  125. var err error
  126. var h syscall.Handle
  127. for {
  128. h, err = createFile(path, syscall.GENERIC_READ|syscall.GENERIC_WRITE, 0, nil, syscall.OPEN_EXISTING, syscall.FILE_FLAG_OVERLAPPED|cSECURITY_SQOS_PRESENT|cSECURITY_ANONYMOUS, 0)
  129. if err != cERROR_PIPE_BUSY {
  130. break
  131. }
  132. now := time.Now()
  133. var ms uint32
  134. if absTimeout.IsZero() {
  135. ms = cNMPWAIT_USE_DEFAULT_WAIT
  136. } else if now.After(absTimeout) {
  137. ms = cNMPWAIT_NOWAIT
  138. } else {
  139. ms = uint32(absTimeout.Sub(now).Nanoseconds() / 1000 / 1000)
  140. }
  141. err = waitNamedPipe(path, ms)
  142. if err != nil {
  143. if err == cERROR_SEM_TIMEOUT {
  144. return nil, ErrTimeout
  145. }
  146. break
  147. }
  148. }
  149. if err != nil {
  150. return nil, &os.PathError{Op: "open", Path: path, Err: err}
  151. }
  152. var flags uint32
  153. err = getNamedPipeInfo(h, &flags, nil, nil, nil)
  154. if err != nil {
  155. return nil, err
  156. }
  157. f, err := makeWin32File(h)
  158. if err != nil {
  159. syscall.Close(h)
  160. return nil, err
  161. }
  162. // If the pipe is in message mode, return a message byte pipe, which
  163. // supports CloseWrite().
  164. if flags&cPIPE_TYPE_MESSAGE != 0 {
  165. return &win32MessageBytePipe{
  166. win32Pipe: win32Pipe{win32File: f, path: path},
  167. }, nil
  168. }
  169. return &win32Pipe{win32File: f, path: path}, nil
  170. }
  171. type acceptResponse struct {
  172. f *win32File
  173. err error
  174. }
  175. type win32PipeListener struct {
  176. firstHandle syscall.Handle
  177. path string
  178. securityDescriptor []byte
  179. config PipeConfig
  180. acceptCh chan (chan acceptResponse)
  181. closeCh chan int
  182. doneCh chan int
  183. }
  184. func makeServerPipeHandle(path string, securityDescriptor []byte, c *PipeConfig, first bool) (syscall.Handle, error) {
  185. var flags uint32 = cPIPE_ACCESS_DUPLEX | syscall.FILE_FLAG_OVERLAPPED
  186. if first {
  187. flags |= cFILE_FLAG_FIRST_PIPE_INSTANCE
  188. }
  189. var mode uint32 = cPIPE_REJECT_REMOTE_CLIENTS
  190. if c.MessageMode {
  191. mode |= cPIPE_TYPE_MESSAGE
  192. }
  193. sa := &syscall.SecurityAttributes{}
  194. sa.Length = uint32(unsafe.Sizeof(*sa))
  195. if securityDescriptor != nil {
  196. len := uint32(len(securityDescriptor))
  197. sa.SecurityDescriptor = localAlloc(0, len)
  198. defer localFree(sa.SecurityDescriptor)
  199. copy((*[0xffff]byte)(unsafe.Pointer(sa.SecurityDescriptor))[:], securityDescriptor)
  200. }
  201. h, err := createNamedPipe(path, flags, mode, cPIPE_UNLIMITED_INSTANCES, uint32(c.OutputBufferSize), uint32(c.InputBufferSize), 0, sa)
  202. if err != nil {
  203. return 0, &os.PathError{Op: "open", Path: path, Err: err}
  204. }
  205. return h, nil
  206. }
  207. func (l *win32PipeListener) makeServerPipe() (*win32File, error) {
  208. h, err := makeServerPipeHandle(l.path, l.securityDescriptor, &l.config, false)
  209. if err != nil {
  210. return nil, err
  211. }
  212. f, err := makeWin32File(h)
  213. if err != nil {
  214. syscall.Close(h)
  215. return nil, err
  216. }
  217. return f, nil
  218. }
  219. func (l *win32PipeListener) makeConnectedServerPipe() (*win32File, error) {
  220. p, err := l.makeServerPipe()
  221. if err != nil {
  222. return nil, err
  223. }
  224. // Wait for the client to connect.
  225. ch := make(chan error)
  226. go func(p *win32File) {
  227. ch <- connectPipe(p)
  228. }(p)
  229. select {
  230. case err = <-ch:
  231. if err != nil {
  232. p.Close()
  233. p = nil
  234. }
  235. case <-l.closeCh:
  236. // Abort the connect request by closing the handle.
  237. p.Close()
  238. p = nil
  239. err = <-ch
  240. if err == nil || err == ErrFileClosed {
  241. err = ErrPipeListenerClosed
  242. }
  243. }
  244. return p, err
  245. }
  246. func (l *win32PipeListener) listenerRoutine() {
  247. closed := false
  248. for !closed {
  249. select {
  250. case <-l.closeCh:
  251. closed = true
  252. case responseCh := <-l.acceptCh:
  253. var (
  254. p *win32File
  255. err error
  256. )
  257. for {
  258. p, err = l.makeConnectedServerPipe()
  259. // If the connection was immediately closed by the client, try
  260. // again.
  261. if err != cERROR_NO_DATA {
  262. break
  263. }
  264. }
  265. responseCh <- acceptResponse{p, err}
  266. closed = err == ErrPipeListenerClosed
  267. }
  268. }
  269. syscall.Close(l.firstHandle)
  270. l.firstHandle = 0
  271. // Notify Close() and Accept() callers that the handle has been closed.
  272. close(l.doneCh)
  273. }
  274. // PipeConfig contain configuration for the pipe listener.
  275. type PipeConfig struct {
  276. // SecurityDescriptor contains a Windows security descriptor in SDDL format.
  277. SecurityDescriptor string
  278. // MessageMode determines whether the pipe is in byte or message mode. In either
  279. // case the pipe is read in byte mode by default. The only practical difference in
  280. // this implementation is that CloseWrite() is only supported for message mode pipes;
  281. // CloseWrite() is implemented as a zero-byte write, but zero-byte writes are only
  282. // transferred to the reader (and returned as io.EOF in this implementation)
  283. // when the pipe is in message mode.
  284. MessageMode bool
  285. // InputBufferSize specifies the size the input buffer, in bytes.
  286. InputBufferSize int32
  287. // OutputBufferSize specifies the size the input buffer, in bytes.
  288. OutputBufferSize int32
  289. }
  290. // ListenPipe creates a listener on a Windows named pipe path, e.g. \\.\pipe\mypipe.
  291. // The pipe must not already exist.
  292. func ListenPipe(path string, c *PipeConfig) (net.Listener, error) {
  293. var (
  294. sd []byte
  295. err error
  296. )
  297. if c == nil {
  298. c = &PipeConfig{}
  299. }
  300. if c.SecurityDescriptor != "" {
  301. sd, err = SddlToSecurityDescriptor(c.SecurityDescriptor)
  302. if err != nil {
  303. return nil, err
  304. }
  305. }
  306. h, err := makeServerPipeHandle(path, sd, c, true)
  307. if err != nil {
  308. return nil, err
  309. }
  310. // Immediately open and then close a client handle so that the named pipe is
  311. // created but not currently accepting connections.
  312. h2, err := createFile(path, 0, 0, nil, syscall.OPEN_EXISTING, cSECURITY_SQOS_PRESENT|cSECURITY_ANONYMOUS, 0)
  313. if err != nil {
  314. syscall.Close(h)
  315. return nil, err
  316. }
  317. syscall.Close(h2)
  318. l := &win32PipeListener{
  319. firstHandle: h,
  320. path: path,
  321. securityDescriptor: sd,
  322. config: *c,
  323. acceptCh: make(chan (chan acceptResponse)),
  324. closeCh: make(chan int),
  325. doneCh: make(chan int),
  326. }
  327. go l.listenerRoutine()
  328. return l, nil
  329. }
  330. func connectPipe(p *win32File) error {
  331. c, err := p.prepareIo()
  332. if err != nil {
  333. return err
  334. }
  335. defer p.wg.Done()
  336. err = connectNamedPipe(p.handle, &c.o)
  337. _, err = p.asyncIo(c, nil, 0, err)
  338. if err != nil && err != cERROR_PIPE_CONNECTED {
  339. return err
  340. }
  341. return nil
  342. }
  343. func (l *win32PipeListener) Accept() (net.Conn, error) {
  344. ch := make(chan acceptResponse)
  345. select {
  346. case l.acceptCh <- ch:
  347. response := <-ch
  348. err := response.err
  349. if err != nil {
  350. return nil, err
  351. }
  352. if l.config.MessageMode {
  353. return &win32MessageBytePipe{
  354. win32Pipe: win32Pipe{win32File: response.f, path: l.path},
  355. }, nil
  356. }
  357. return &win32Pipe{win32File: response.f, path: l.path}, nil
  358. case <-l.doneCh:
  359. return nil, ErrPipeListenerClosed
  360. }
  361. }
  362. func (l *win32PipeListener) Close() error {
  363. select {
  364. case l.closeCh <- 1:
  365. <-l.doneCh
  366. case <-l.doneCh:
  367. }
  368. return nil
  369. }
  370. func (l *win32PipeListener) Addr() net.Addr {
  371. return pipeAddress(l.path)
  372. }