pipe_test.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472
  1. // Copyright 2015 The Go Authors. All rights reserved.
  2. // Use of this source code is governed by a BSD-style
  3. // license that can be found in the LICENSE file.
  4. // Test broken pipes on Unix systems.
  5. //go:build !plan9 && !js
  6. package os_test
  7. import (
  8. "bufio"
  9. "bytes"
  10. "fmt"
  11. "internal/testenv"
  12. "io"
  13. "io/fs"
  14. "os"
  15. osexec "os/exec"
  16. "os/signal"
  17. "runtime"
  18. "strconv"
  19. "strings"
  20. "sync"
  21. "syscall"
  22. "testing"
  23. "time"
  24. )
  // TestEPIPE checks that writing to a pipe whose read end has been closed
  // fails on every attempt, not only the first, and that the underlying
  // error is EPIPE (or ERROR_NO_DATA on Windows).
  func TestEPIPE(t *testing.T) {
  	reader, writer, err := os.Pipe()
  	if err != nil {
  		t.Fatal(err)
  	}
  	if closeErr := reader.Close(); closeErr != nil {
  		t.Fatal(closeErr)
  	}

  	want := syscall.EPIPE
  	if runtime.GOOS == "windows" {
  		// 232 is Windows error code ERROR_NO_DATA, "The pipe is being closed".
  		want = syscall.Errno(232)
  	}

  	// The pipe stays broken, so each write must report the same error.
  	const attempts = 20
  	payload := []byte("hi")
  	for attempt := 0; attempt < attempts; attempt++ {
  		_, writeErr := writer.Write(payload)
  		if writeErr == nil {
  			t.Fatal("unexpected success of Write to broken pipe")
  		}
  		// Peel off the wrappers the os package may add before comparing.
  		if pathErr, ok := writeErr.(*fs.PathError); ok {
  			writeErr = pathErr.Err
  		}
  		if sysErr, ok := writeErr.(*os.SyscallError); ok {
  			writeErr = sysErr.Err
  		}
  		if writeErr != want {
  			t.Errorf("iteration %d: got %v, expected %v", attempt, writeErr, want)
  		}
  	}
  }
  55. func TestStdPipe(t *testing.T) {
  56. switch runtime.GOOS {
  57. case "windows":
  58. t.Skip("Windows doesn't support SIGPIPE")
  59. }
  60. testenv.MustHaveExec(t)
  61. r, w, err := os.Pipe()
  62. if err != nil {
  63. t.Fatal(err)
  64. }
  65. if err := r.Close(); err != nil {
  66. t.Fatal(err)
  67. }
  68. // Invoke the test program to run the test and write to a closed pipe.
  69. // If sig is false:
  70. // writing to stdout or stderr should cause an immediate SIGPIPE;
  71. // writing to descriptor 3 should fail with EPIPE and then exit 0.
  72. // If sig is true:
  73. // all writes should fail with EPIPE and then exit 0.
  74. for _, sig := range []bool{false, true} {
  75. for dest := 1; dest < 4; dest++ {
  76. cmd := osexec.Command(os.Args[0], "-test.run", "TestStdPipeHelper")
  77. cmd.Stdout = w
  78. cmd.Stderr = w
  79. cmd.ExtraFiles = []*os.File{w}
  80. cmd.Env = append(os.Environ(), fmt.Sprintf("GO_TEST_STD_PIPE_HELPER=%d", dest))
  81. if sig {
  82. cmd.Env = append(cmd.Env, "GO_TEST_STD_PIPE_HELPER_SIGNAL=1")
  83. }
  84. if err := cmd.Run(); err == nil {
  85. if !sig && dest < 3 {
  86. t.Errorf("unexpected success of write to closed pipe %d sig %t in child", dest, sig)
  87. }
  88. } else if ee, ok := err.(*osexec.ExitError); !ok {
  89. t.Errorf("unexpected exec error type %T: %v", err, err)
  90. } else if ws, ok := ee.Sys().(syscall.WaitStatus); !ok {
  91. t.Errorf("unexpected wait status type %T: %v", ee.Sys(), ee.Sys())
  92. } else if ws.Signaled() && ws.Signal() == syscall.SIGPIPE {
  93. if sig || dest > 2 {
  94. t.Errorf("unexpected SIGPIPE signal for descriptor %d sig %t", dest, sig)
  95. }
  96. } else {
  97. t.Errorf("unexpected exit status %v for descriptor %d sig %t", err, dest, sig)
  98. }
  99. }
  100. }
  101. // Test redirecting stdout but not stderr. Issue 40076.
  102. cmd := osexec.Command(os.Args[0], "-test.run", "TestStdPipeHelper")
  103. cmd.Stdout = w
  104. var stderr bytes.Buffer
  105. cmd.Stderr = &stderr
  106. cmd.Env = append(os.Environ(), "GO_TEST_STD_PIPE_HELPER=1")
  107. if err := cmd.Run(); err == nil {
  108. t.Errorf("unexpected success of write to closed stdout")
  109. } else if ee, ok := err.(*osexec.ExitError); !ok {
  110. t.Errorf("unexpected exec error type %T: %v", err, err)
  111. } else if ws, ok := ee.Sys().(syscall.WaitStatus); !ok {
  112. t.Errorf("unexpected wait status type %T: %v", ee.Sys(), ee.Sys())
  113. } else if !ws.Signaled() || ws.Signal() != syscall.SIGPIPE {
  114. t.Errorf("unexpected exit status %v for write to closed stdout", err)
  115. }
  116. if output := stderr.Bytes(); len(output) > 0 {
  117. t.Errorf("unexpected output on stderr: %s", output)
  118. }
  119. }
  // TestStdPipeHelper is the child-process half of TestStdPipe, not a test
  // in its own right. GO_TEST_STD_PIPE_HELPER selects the descriptor to
  // write to (1, 2 or 3); when it is unset or holds anything else the
  // helper skips. A non-empty GO_TEST_STD_PIPE_HELPER_SIGNAL makes it
  // register for SIGPIPE notifications before writing.
  func TestStdPipeHelper(t *testing.T) {
  	if os.Getenv("GO_TEST_STD_PIPE_HELPER_SIGNAL") != "" {
  		sigpipe := make(chan os.Signal, 1)
  		signal.Notify(sigpipe, syscall.SIGPIPE)
  	}

  	target := os.Getenv("GO_TEST_STD_PIPE_HELPER")
  	if target == "1" {
  		os.Stdout.Write([]byte("stdout"))
  	} else if target == "2" {
  		os.Stderr.Write([]byte("stderr"))
  	} else if target == "3" {
  		extra := os.NewFile(3, "3")
  		if _, err := extra.Write([]byte("3")); err == nil {
  			// The write was supposed to fail; signal that with a distinct status.
  			os.Exit(3)
  		}
  	} else {
  		t.Skip("skipping test helper")
  	}

  	// A stdout/stderr write is expected to have killed the process with a
  	// broken pipe error already; the parent looks for that status, so a
  	// normal exit here shows up as a failure in the parent.
  	// For descriptor 3, a normal exit is the expected outcome.
  	os.Exit(0)
  }
  // testClosedPipeRace starts a blocking Read (read == true) or Write
  // (read == false) on a pipe and, from a second goroutine, closes the same
  // end of the pipe that is performing the I/O. The I/O call must then fail
  // with a *fs.PathError wrapping fs.ErrClosed.
  func testClosedPipeRace(t *testing.T, read bool) {
  	limit := 1
  	if !read {
  		// Get the amount we have to write to overload a pipe
  		// with no reader.
  		limit = 131073
  		if b, err := os.ReadFile("/proc/sys/fs/pipe-max-size"); err == nil {
  			if i, err := strconv.Atoi(strings.TrimSpace(string(b))); err == nil {
  				limit = i + 1
  			}
  		}
  		t.Logf("using pipe write limit of %d", limit)
  	}

  	r, w, err := os.Pipe()
  	if err != nil {
  		t.Fatal(err)
  	}
  	defer r.Close()
  	defer w.Close()

  	// Close the end of the pipe we are about to block on from another
  	// goroutine. closed is used to join that goroutine before returning,
  	// so that it never reports through t after the test has completed.
  	closed := make(chan struct{})
  	go func() {
  		defer close(closed)

  		// Give the main goroutine a chance to enter the Read or
  		// Write call. This is sloppy but the test will pass even
  		// if we close before the read/write.
  		time.Sleep(20 * time.Millisecond)

  		var err error
  		if read {
  			err = r.Close()
  		} else {
  			err = w.Close()
  		}
  		if err != nil {
  			t.Error(err)
  		}
  	}()

  	b := make([]byte, limit)
  	if read {
  		_, err = r.Read(b)
  	} else {
  		_, err = w.Write(b)
  	}

  	if err == nil {
  		t.Error("I/O on closed pipe unexpectedly succeeded")
  	} else if pe, ok := err.(*fs.PathError); !ok {
  		// Report the dynamic type of the error actually returned; pe is
  		// a nil *fs.PathError here and would always print the same type.
  		t.Errorf("I/O on closed pipe returned unexpected error type %T; expected fs.PathError", err)
  	} else if pe.Err != fs.ErrClosed {
  		t.Errorf("got error %q but expected %q", pe.Err, fs.ErrClosed)
  	} else {
  		t.Logf("I/O returned expected error %q", err)
  	}

  	// The closer always finishes after its short sleep, so this cannot hang.
  	<-closed
  }
  195. func TestClosedPipeRaceRead(t *testing.T) {
  196. testClosedPipeRace(t, true)
  197. }
  198. func TestClosedPipeRaceWrite(t *testing.T) {
  199. testClosedPipeRace(t, false)
  200. }
  201. // Issue 20915: Reading on nonblocking fd should not return "waiting
  202. // for unsupported file type." Currently it returns EAGAIN; it is
  203. // possible that in the future it will simply wait for data.
  204. func TestReadNonblockingFd(t *testing.T) {
  205. switch runtime.GOOS {
  206. case "windows":
  207. t.Skip("Windows doesn't support SetNonblock")
  208. }
  209. if os.Getenv("GO_WANT_READ_NONBLOCKING_FD") == "1" {
  210. fd := syscallDescriptor(os.Stdin.Fd())
  211. syscall.SetNonblock(fd, true)
  212. defer syscall.SetNonblock(fd, false)
  213. _, err := os.Stdin.Read(make([]byte, 1))
  214. if err != nil {
  215. if perr, ok := err.(*fs.PathError); !ok || perr.Err != syscall.EAGAIN {
  216. t.Fatalf("read on nonblocking stdin got %q, should have gotten EAGAIN", err)
  217. }
  218. }
  219. os.Exit(0)
  220. }
  221. testenv.MustHaveExec(t)
  222. r, w, err := os.Pipe()
  223. if err != nil {
  224. t.Fatal(err)
  225. }
  226. defer r.Close()
  227. defer w.Close()
  228. cmd := osexec.Command(os.Args[0], "-test.run="+t.Name())
  229. cmd.Env = append(os.Environ(), "GO_WANT_READ_NONBLOCKING_FD=1")
  230. cmd.Stdin = r
  231. output, err := cmd.CombinedOutput()
  232. t.Logf("%s", output)
  233. if err != nil {
  234. t.Errorf("child process failed: %v", err)
  235. }
  236. }
  237. func TestCloseWithBlockingReadByNewFile(t *testing.T) {
  238. var p [2]syscallDescriptor
  239. err := syscall.Pipe(p[:])
  240. if err != nil {
  241. t.Fatal(err)
  242. }
  243. // os.NewFile returns a blocking mode file.
  244. testCloseWithBlockingRead(t, os.NewFile(uintptr(p[0]), "reader"), os.NewFile(uintptr(p[1]), "writer"))
  245. }
  246. func TestCloseWithBlockingReadByFd(t *testing.T) {
  247. r, w, err := os.Pipe()
  248. if err != nil {
  249. t.Fatal(err)
  250. }
  251. // Calling Fd will put the file into blocking mode.
  252. _ = r.Fd()
  253. testCloseWithBlockingRead(t, r, w)
  254. }
  // testCloseWithBlockingRead checks that a Read blocked on r does not stop
  // r.Close from completing. One goroutine closes r after a short delay,
  // another reads from r; the main goroutine waits for both, closing w once
  // the Close has returned so that a Read still hanging is released.
  func testCloseWithBlockingRead(t *testing.T, r, w *os.File) {
  	defer r.Close()
  	defer w.Close()

  	closeDone := make(chan bool)
  	readDone := make(chan bool)

  	var wg sync.WaitGroup
  	wg.Add(2)

  	// Closer. The channel is passed as an argument because the variable
  	// in the enclosing function is set to nil by the wait loop below.
  	go func(done chan bool) {
  		defer wg.Done()
  		// Give the other goroutine a chance to enter the Read
  		// or Write call. This is sloppy but the test will
  		// pass even if we close before the read/write.
  		time.Sleep(20 * time.Millisecond)
  		if err := r.Close(); err != nil {
  			t.Error(err)
  		}
  		close(done)
  	}(closeDone)

  	// Reader.
  	go func(done chan bool) {
  		defer wg.Done()
  		var buf [1]byte
  		_, err := r.Read(buf[:])
  		close(done)
  		if err == nil {
  			t.Error("I/O on closed pipe unexpectedly succeeded")
  		}
  		if pathErr, ok := err.(*fs.PathError); ok {
  			err = pathErr.Err
  		}
  		if err != io.EOF && err != fs.ErrClosed {
  			t.Errorf("got %v, expected EOF or closed", err)
  		}
  	}(readDone)

  	// A nil channel marks a goroutine that has already reported in;
  	// receiving from nil blocks forever, so that case drops out of the select.
  	for closeDone != nil || readDone != nil {
  		select {
  		case <-closeDone:
  			closeDone = nil
  			// r.Close has completed, but the blocking Read
  			// is hanging. Close the writer to unblock it.
  			w.Close()
  		case <-readDone:
  			readDone = nil
  		case <-time.After(1 * time.Second):
  			closePending := closeDone != nil
  			readPending := readDone != nil
  			if closePending && readPending {
  				t.Error("timed out waiting for Read and Close")
  				w.Close()
  			} else if closePending {
  				t.Error("timed out waiting for Close")
  			} else if readPending {
  				t.Error("timed out waiting for Read")
  			} else {
  				t.Error("impossible case")
  			}
  		}
  	}

  	wg.Wait()
  }
  // TestPipeEOF covers issue 24164 for pipes: a reader consuming lines from
  // a pipe must see io.EOF once the writer has written its lines and closed
  // its end, rather than blocking until the one-second timeout.
  func TestPipeEOF(t *testing.T) {
  	r, w, err := os.Pipe()
  	if err != nil {
  		t.Fatal(err)
  	}

  	const pause = 10 * time.Millisecond

  	// Writer: three lines with a pause before each, another pause, then close.
  	var writerWG sync.WaitGroup
  	writerWG.Add(1)
  	go func() {
  		defer writerWG.Done()
  		defer func() {
  			if closeErr := w.Close(); closeErr != nil {
  				t.Errorf("error closing writer: %v", closeErr)
  			}
  		}()

  		for n := 0; n < 3; n++ {
  			time.Sleep(pause)
  			if _, writeErr := fmt.Fprintf(w, "line %d\n", n); writeErr != nil {
  				t.Errorf("error writing to fifo: %v", writeErr)
  				return
  			}
  		}
  		time.Sleep(pause)
  	}()
  	defer writerWG.Wait()

  	// Reader: log every line until EOF, then close the read end.
  	readerDone := make(chan bool)
  	go func() {
  		defer close(readerDone)
  		defer func() {
  			if closeErr := r.Close(); closeErr != nil {
  				t.Errorf("error closing reader: %v", closeErr)
  			}
  		}()

  		lines := bufio.NewReader(r)
  		for {
  			line, readErr := lines.ReadBytes('\n')
  			switch {
  			case readErr == io.EOF:
  				return
  			case readErr != nil:
  				t.Error(readErr)
  				return
  			}
  			t.Logf("%s\n", bytes.TrimSpace(line))
  		}
  	}()

  	select {
  	case <-readerDone:
  		// Test succeeded.
  	case <-time.After(time.Second):
  		t.Error("timed out waiting for read")
  		// Close the reader to force the read to complete.
  		r.Close()
  	}
  }
  // TestFdRace covers issue 24481: it calls Fd on the same pipe file from
  // many goroutines at once and waits for all of the calls to return.
  func TestFdRace(t *testing.T) {
  	r, w, err := os.Pipe()
  	if err != nil {
  		t.Fatal(err)
  	}
  	defer r.Close()
  	defer w.Close()

  	const tries = 100

  	var wg sync.WaitGroup
  	wg.Add(tries)
  	for remaining := tries; remaining > 0; remaining-- {
  		go func() {
  			defer wg.Done()
  			w.Fd()
  		}()
  	}
  	wg.Wait()
  }
  // TestFdReadRace calls Fd on the read end of a pipe while another
  // goroutine is (most likely) blocked in Read on it with a one-minute
  // deadline, and fails if that Read ends by timing out.
  func TestFdReadRace(t *testing.T) {
  	t.Parallel()

  	r, w, err := os.Pipe()
  	if err != nil {
  		t.Fatal(err)
  	}
  	defer r.Close()
  	defer w.Close()

  	const count = 10

  	aboutToRead := make(chan bool, 1)
  	var wg sync.WaitGroup
  	wg.Add(2)

  	// Reader: announce that the Read is about to start, then block in it.
  	go func() {
  		defer wg.Done()
  		var buf [count]byte
  		r.SetReadDeadline(time.Now().Add(time.Minute))
  		aboutToRead <- true
  		_, readErr := r.Read(buf[:])
  		if os.IsTimeout(readErr) {
  			t.Error("read timed out")
  		}
  	}()

  	// Fd caller.
  	go func() {
  		defer wg.Done()
  		<-aboutToRead
  		// Give the other goroutine a chance to enter the Read.
  		// It doesn't matter if this occasionally fails, the test
  		// will still pass, it just won't test anything.
  		time.Sleep(10 * time.Millisecond)
  		r.Fd()

  		// The bug was that Fd would hang until Read timed out.
  		// If the bug is fixed, then writing to w and closing r here
  		// will cause the Read to exit before the timeout expires.
  		w.Write(make([]byte, count))
  		r.Close()
  	}()

  	wg.Wait()
  }