diff options
Diffstat (limited to 'src/lwt/generic/lwt_unix_impl.ml')
-rw-r--r-- | src/lwt/generic/lwt_unix_impl.ml | 508 |
1 files changed, 508 insertions, 0 deletions
diff --git a/src/lwt/generic/lwt_unix_impl.ml b/src/lwt/generic/lwt_unix_impl.ml new file mode 100644 index 0000000..111def4 --- /dev/null +++ b/src/lwt/generic/lwt_unix_impl.ml @@ -0,0 +1,508 @@ +(* +Non-blocking I/O and select does not (fully) work under Windows. +The libray therefore does not use them under Windows, and will +therefore have the following limitations: +- No read will be performed while there are some threads ready to run + or waiting to write; +- When a read is pending, everything else will be blocked: [sleep] + will not terminate and other reads will not be performed before + this read terminates; +- A write on a socket or a pipe can block the execution of the program + if the data are never consumed at the other end of the connection. + In particular, if both ends use this library and write at the same + time, this could result in a dead-lock. +- [connect] is blocking +*) +let windows_hack = Sys.os_type <> "Unix" +let recent_ocaml = + Scanf.sscanf Sys.ocaml_version "%d.%d" + (fun maj min -> (maj = 3 && min >= 11) || maj > 3) + +module SleepQueue = + Pqueue.Make (struct + type t = float * int * unit Lwt.t + let compare (t, i, _) (t', i', _) = + let c = compare t t' in + if c = 0 then i - i' else c + end) +let sleep_queue = ref SleepQueue.empty + +let event_counter = ref 0 + +let sleep d = + let res = Lwt.wait () in + incr event_counter; + let t = if d <= 0. then 0. else Unix.gettimeofday () +. d in + sleep_queue := + SleepQueue.add (t, !event_counter, res) !sleep_queue; + res + +let yield () = sleep 0. + +let get_time t = + if !t = -1. then t := Unix.gettimeofday (); + !t + +let in_the_past now t = + t = 0. || t <= get_time now + +let rec restart_threads imax now = + match + try Some (SleepQueue.find_min !sleep_queue) with Not_found -> None + with + Some (time, i, thr) when in_the_past now time && i - imax <= 0 -> + sleep_queue := SleepQueue.remove_min !sleep_queue; + Lwt.wakeup thr (); + restart_threads imax now + | _ -> + () + +type file_descr = Unix.file_descr + +let of_unix_file_descr fd = if not windows_hack then Unix.set_nonblock fd; fd + +let inputs = ref [] +let outputs = ref [] +let wait_children = ref [] + +let child_exited = ref false +let _ = + if not windows_hack then + ignore(Sys.signal Sys.sigchld (Sys.Signal_handle (fun _ -> child_exited := true))) + +let bad_fd fd = + try ignore (Unix.LargeFile.fstat fd); false with + Unix.Unix_error (_, _, _) -> + true + +let wrap_syscall queue fd cont syscall = + let res = + try + Some (syscall ()) + with + Exit + | Unix.Unix_error ((Unix.EAGAIN | Unix.EWOULDBLOCK | Unix.EINTR), _, _) -> + (* EINTR because we are catching SIG_CHLD hence the system call + might be interrupted to handle the signal; this lets us restart + the system call eventually. *) + None + | e -> + queue := List.remove_assoc fd !queue; + Lwt.wakeup_exn cont e; + None + in + match res with + Some v -> + queue := List.remove_assoc fd !queue; + Lwt.wakeup cont v + | None -> + () + +let rec run thread = + match Lwt.poll thread with + Some v -> + v + | None -> + let next_event = + try + let (time, _, _) = SleepQueue.find_min !sleep_queue in Some time + with Not_found -> + None + in + let now = ref (-1.) in + let delay = + match next_event with + None -> -1. + | Some 0. -> 0. + | Some time -> max 0. (time -. get_time now) + in + let infds = List.map fst !inputs in + let outfds = List.map fst !outputs in + let (readers, writers, _) = + if windows_hack && not recent_ocaml then + let writers = outfds in + let readers = + if delay = 0. || writers <> [] then [] else infds in + (readers, writers, []) + else if infds = [] && outfds = [] && delay = 0. then + ([], [], []) + else + try + let res = Unix.select infds outfds [] delay in + if delay > 0. && !now <> -1. then now := !now +. delay; + res + with + Unix.Unix_error (Unix.EINTR, _, _) -> + ([], [], []) + | Unix.Unix_error (Unix.EBADF, _, _) -> + (List.filter bad_fd infds, List.filter bad_fd outfds, []) + | Unix.Unix_error (Unix.EPIPE, _, _) + when windows_hack && recent_ocaml -> + (* Workaround for a bug in Ocaml 3.11: select fails with an + EPIPE error when the file descriptor is remotely closed *) + (infds, [], []) + in + restart_threads !event_counter now; + List.iter + (fun fd -> + try + match List.assoc fd !inputs with + `Read (buf, pos, len, res) -> + wrap_syscall inputs fd res + (fun () -> Unix.read fd buf pos len) + | `Accept res -> + wrap_syscall inputs fd res + (fun () -> + let (s, _) as v = Unix.accept fd in + if not windows_hack then Unix.set_nonblock s; + v) + | `Wait res -> + wrap_syscall inputs fd res (fun () -> ()) + with Not_found -> + ()) + readers; + List.iter + (fun fd -> + try + match List.assoc fd !outputs with + `Write (buf, pos, len, res) -> + wrap_syscall outputs fd res + (fun () -> Unix.write fd buf pos len) + | `CheckSocket res -> + wrap_syscall outputs fd res + (fun () -> + try ignore (Unix.getpeername fd) with + Unix.Unix_error (Unix.ENOTCONN, _, _) -> + ignore (Unix.read fd " " 0 1)) + | `Wait res -> + wrap_syscall inputs fd res (fun () -> ()) + with Not_found -> + ()) + writers; + if !child_exited then begin + child_exited := false; + List.iter + (fun (id, (res, flags, pid)) -> + wrap_syscall wait_children id res + (fun () -> + let (pid', _) as v = Unix.waitpid flags pid in + if pid' = 0 then raise Exit; + v)) + !wait_children + end; + run thread + +(****) + +let wait_read ch = + let res = Lwt.wait () in + inputs := (ch, `Wait res) :: !inputs; + res + +let wait_write ch = + let res = Lwt.wait () in + outputs := (ch, `Wait res) :: !outputs; + res + +let read ch buf pos len = + try + if windows_hack then raise (Unix.Unix_error (Unix.EAGAIN, "", "")); + Lwt.return (Unix.read ch buf pos len) + with + Unix.Unix_error ((Unix.EAGAIN | Unix.EWOULDBLOCK), _, _) -> + let res = Lwt.wait () in + inputs := (ch, `Read (buf, pos, len, res)) :: !inputs; + res + | e -> + Lwt.fail e + +let write ch buf pos len = + try + if windows_hack && recent_ocaml then + raise (Unix.Unix_error (Unix.EAGAIN, "", "")); + Lwt.return (Unix.write ch buf pos len) + with + Unix.Unix_error ((Unix.EAGAIN | Unix.EWOULDBLOCK), _, _) -> + let res = Lwt.wait () in + outputs := (ch, `Write (buf, pos, len, res)) :: !outputs; + res + | e -> + Lwt.fail e + +(* +let pipe () = + let (in_fd, out_fd) as fd_pair = Unix.pipe() in + if not windows_hack then begin + Unix.set_nonblock in_fd; + Unix.set_nonblock out_fd + end; + fd_pair +*) + +let pipe_in () = + let (in_fd, out_fd) as fd_pair = Unix.pipe() in + if not windows_hack then + Unix.set_nonblock in_fd; + fd_pair + +let pipe_out () = + let (in_fd, out_fd) as fd_pair = Unix.pipe() in + if not windows_hack then + Unix.set_nonblock out_fd; + fd_pair + +let socket dom typ proto = + let s = Unix.socket dom typ proto in + if not windows_hack then Unix.set_nonblock s; + s + +let socketpair dom typ proto = + let (s1, s2) as spair = Unix.socketpair dom typ proto in + if not windows_hack then begin + Unix.set_nonblock s1; Unix.set_nonblock s2 + end; + Lwt.return spair + +let bind = Unix.bind +let setsockopt = Unix.setsockopt +let listen = Unix.listen +let close = Unix.close +let set_close_on_exec = Unix.set_close_on_exec + +let accept ch = + let res = Lwt.wait () in + inputs := (ch, `Accept res) :: !inputs; + res + +let check_socket ch = + let res = Lwt.wait () in + outputs := (ch, `CheckSocket res) :: !outputs; + res + +let connect s addr = + try + Unix.connect s addr; + Lwt.return () + with + Unix.Unix_error + ((Unix.EINPROGRESS | Unix.EWOULDBLOCK | Unix.EAGAIN), _, _) -> + check_socket s + | e -> + Lwt.fail e + +let ids = ref 0 +let new_id () = incr ids; !ids + +let _waitpid flags pid = + try + Lwt.return (Unix.waitpid flags pid) + with e -> + Lwt.fail e + +let waitpid flags pid = + if List.mem Unix.WNOHANG flags || windows_hack then + _waitpid flags pid + else + let flags = Unix.WNOHANG :: flags in + Lwt.bind (_waitpid flags pid) (fun ((pid', _) as res) -> + if pid' <> 0 then + Lwt.return res + else + let res = Lwt.wait () in + wait_children := (new_id (), (res, flags, pid)) :: !wait_children; + res) + +let wait () = waitpid [] (-1) + +let system cmd = + match Unix.fork () with + 0 -> Unix.execv "/bin/sh" [| "/bin/sh"; "-c"; cmd |] + | id -> Lwt.bind (waitpid [] id) (fun (pid, status) -> Lwt.return status) + +(****) + +type lwt_in_channel = in_channel +type lwt_out_channel = out_channel + +let intern_in_channel ch = + Unix.set_nonblock (Unix.descr_of_in_channel ch); ch +let intern_out_channel ch = + Unix.set_nonblock (Unix.descr_of_out_channel ch); ch + + +let wait_inchan ic = wait_read (Unix.descr_of_in_channel ic) +let wait_outchan oc = wait_write (Unix.descr_of_out_channel oc) + +let rec input_char ic = + try + Lwt.return (Pervasives.input_char ic) + with + Sys_blocked_io -> + Lwt.bind (wait_inchan ic) (fun () -> input_char ic) + | e -> + Lwt.fail e + +let rec input ic s ofs len = + try + Lwt.return (Pervasives.input ic s ofs len) + with + Sys_blocked_io -> + Lwt.bind (wait_inchan ic) (fun () -> input ic s ofs len) + | e -> + Lwt.fail e + +let rec unsafe_really_input ic s ofs len = + if len <= 0 then + Lwt.return () + else begin + Lwt.bind (input ic s ofs len) (fun r -> + if r = 0 + then Lwt.fail End_of_file + else unsafe_really_input ic s (ofs+r) (len-r)) + end + +let really_input ic s ofs len = + if ofs < 0 || len < 0 || ofs > String.length s - len + then Lwt.fail (Invalid_argument "really_input") + else unsafe_really_input ic s ofs len + +let input_line ic = + let buf = ref (String.create 128) in + let pos = ref 0 in + let rec loop () = + if !pos = String.length !buf then begin + let newbuf = String.create (2 * !pos) in + String.blit !buf 0 newbuf 0 !pos; + buf := newbuf + end; + Lwt.bind (input_char ic) (fun c -> + if c = '\n' then + Lwt.return () + else begin + !buf.[!pos] <- c; + incr pos; + loop () + end) + in + Lwt.bind + (Lwt.catch loop + (fun e -> + match e with + End_of_file when !pos <> 0 -> + Lwt.return () + | _ -> + Lwt.fail e)) + (fun () -> + let res = String.create !pos in + String.blit !buf 0 res 0 !pos; + Lwt.return res) + +(****) + +type popen_process = + Process of in_channel * out_channel + | Process_in of in_channel + | Process_out of out_channel + | Process_full of in_channel * out_channel * in_channel + +let popen_processes = (Hashtbl.create 7 : (popen_process, int) Hashtbl.t) + +let open_proc cmd proc input output toclose = + match Unix.fork () with + 0 -> if input <> Unix.stdin then begin + Unix.dup2 input Unix.stdin; + Unix.close input + end; + if output <> Unix.stdout then begin + Unix.dup2 output Unix.stdout; + Unix.close output + end; + List.iter Unix.close toclose; + Unix.execv "/bin/sh" [| "/bin/sh"; "-c"; cmd |] + | id -> Hashtbl.add popen_processes proc id + +let open_process_in cmd = + let (in_read, in_write) = pipe_in () in + let inchan = Unix.in_channel_of_descr in_read in + open_proc cmd (Process_in inchan) Unix.stdin in_write [in_read]; + Unix.close in_write; + Lwt.return inchan + +let open_process_out cmd = + let (out_read, out_write) = pipe_out () in + let outchan = Unix.out_channel_of_descr out_write in + open_proc cmd (Process_out outchan) out_read Unix.stdout [out_write]; + Unix.close out_read; + Lwt.return outchan + +let open_process cmd = + let (in_read, in_write) = pipe_in () in + let (out_read, out_write) = pipe_out () in + let inchan = Unix.in_channel_of_descr in_read in + let outchan = Unix.out_channel_of_descr out_write in + open_proc cmd (Process(inchan, outchan)) out_read in_write + [in_read; out_write]; + Unix.close out_read; + Unix.close in_write; + Lwt.return (inchan, outchan) + +(* FIX: Subprocesses that use /dev/tty to print things on the terminal + will NOT have this output captured and returned to the caller of this + function. There's an argument that this is correct, but if we are + running from a GUI the user may not be looking at any terminal and it + will appear that the process is just hanging. This can be fixed, in + principle, by writing a little C code that opens /dev/tty and then uses + the TIOCNOTTY ioctl control to detach the terminal. *) + +let open_proc_full cmd env proc input output error toclose = + match Unix.fork () with + 0 -> Unix.dup2 input Unix.stdin; Unix.close input; + Unix.dup2 output Unix.stdout; Unix.close output; + Unix.dup2 error Unix.stderr; Unix.close error; + List.iter Unix.close toclose; + Unix.execve "/bin/sh" [| "/bin/sh"; "-c"; cmd |] env + | id -> Hashtbl.add popen_processes proc id + +let open_process_full cmd env = + let (in_read, in_write) = pipe_in () in + let (out_read, out_write) = pipe_out () in + let (err_read, err_write) = pipe_in () in + let inchan = Unix.in_channel_of_descr in_read in + let outchan = Unix.out_channel_of_descr out_write in + let errchan = Unix.in_channel_of_descr err_read in + open_proc_full cmd env (Process_full(inchan, outchan, errchan)) + out_read in_write err_write [in_write; out_read; err_read]; + Unix.close out_read; + Unix.close in_write; + Unix.close err_write; + Lwt.return (inchan, outchan, errchan) + +let find_proc_id fun_name proc = + try + let pid = Hashtbl.find popen_processes proc in + Hashtbl.remove popen_processes proc; + pid + with Not_found -> + raise (Unix.Unix_error (Unix.EBADF, fun_name, "")) + +let close_process_in inchan = + let pid = find_proc_id "close_process_in" (Process_in inchan) in + close_in inchan; + Lwt.bind (waitpid [] pid) (fun (_, status) -> Lwt.return status) + +let close_process_out outchan = + let pid = find_proc_id "close_process_out" (Process_out outchan) in + close_out outchan; + Lwt.bind (waitpid [] pid) (fun (_, status) -> Lwt.return status) + +let close_process (inchan, outchan) = + let pid = find_proc_id "close_process" (Process(inchan, outchan)) in + close_in inchan; close_out outchan; + Lwt.bind (waitpid [] pid) (fun (_, status) -> Lwt.return status) + +let close_process_full (outchan, inchan, errchan) = + let pid = + find_proc_id "close_process_full" + (Process_full(outchan, inchan, errchan)) in + close_out inchan; close_in outchan; close_in errchan; + Lwt.bind (waitpid [] pid) (fun (_, status) -> Lwt.return status) |