(*
Non-blocking I/O and select does not (fully) work under Windows.
The library therefore does not use them under Windows, and will
therefore have the following limitations:
- No read will be performed while there are some threads ready to run
  or waiting to write;
- When a read is pending, everything else will be blocked: [sleep]
  will not terminate and other reads will not be performed before
  this read terminates;
- A write on a socket or a pipe can block the execution of the program
  if the data are never consumed at the other end of the connection.
  In particular, if both ends use this library and write at the same
  time, this could result in a dead-lock.
- [connect] is blocking
*)

(* True on every platform whose [Sys.os_type] is not Unix; selects the
   blocking fallbacks described above. *)
let windows_hack = Sys.os_type <> "Unix"

(* True when the running compiler is version 3.11 or later. *)
let recent_ocaml =
  Scanf.sscanf Sys.ocaml_version "%d.%d"
    (fun maj min -> (maj = 3 && min >= 11) || maj > 3)

(* Priority queue of sleeping threads, ordered by wake-up time and then
   by registration counter.  The counter difference (rather than a plain
   comparison) is used as the tie-breaker. *)
module SleepQueue =
  Pqueue.Make (struct
    type t = float * int * unit Lwt.t
    let compare (t, i, _) (t', i', _) =
      let c = compare t t' in
      if c = 0 then i - i' else c
  end)

let sleep_queue = ref SleepQueue.empty

(* Incremented on every call to [sleep]; stored with each queue entry. *)
let event_counter = ref 0

(* [sleep d] registers a new thread in [sleep_queue] and returns it.
   The deadline is the current time plus [d]; a non-positive [d] is
   stored as deadline 0, which [in_the_past] always treats as due. *)
let sleep d =
  let res = Lwt.wait () in
  incr event_counter;
  let t = if d <= 0. then 0. else Unix.gettimeofday () +. d in
  sleep_queue := SleepQueue.add (t, !event_counter, res) !sleep_queue;
  res

(* [yield ()] is [sleep 0.]. *)
let yield () = sleep 0.

(* [get_time t] returns the time cached in [t], reading the clock first
   if [t] still holds the sentinel -1. *)
let get_time t =
  if !t = -1. then t := Unix.gettimeofday ();
  !t

(* A deadline is due when it is 0 or not later than the cached time.
   The 0 test comes first so that the clock is not read needlessly. *)
let in_the_past now t =
  t = 0. || t <= get_time now

(* Wakes, in queue order, every sleeping thread whose deadline is due
   and whose counter is not greater than [imax].  Threads registered
   after [imax] was sampled are left for a later iteration. *)
let rec restart_threads imax now =
  match
    try Some (SleepQueue.find_min !sleep_queue) with Not_found -> None
  with
    Some (time, i, thr) when in_the_past now time && i - imax <= 0 ->
      sleep_queue := SleepQueue.remove_min !sleep_queue;
      Lwt.wakeup thr ();
      restart_threads imax now
  | _ ->
      ()

type file_descr = Unix.file_descr

(* Puts [fd] in non-blocking mode (except under [windows_hack]) and
   returns it. *)
let of_unix_file_descr fd = if not windows_hack then Unix.set_nonblock fd; fd

(* Pending operations, as association lists keyed by file descriptor
   ([inputs], [outputs]) or by a fresh integer id ([wait_children]). *)
let inputs = ref []
let outputs = ref []
let wait_children = ref []

(* Set by the SIGCHLD handler below; polled and reset by [run]. *)
let child_exited = ref false
let () =
  if not windows_hack then
    ignore(Sys.signal Sys.sigchld
             (Sys.Signal_handle (fun _ -> child_exited := true)))

(* [bad_fd fd] is true when [fstat] on [fd] fails with a Unix error. *)
let bad_fd fd =
  try ignore (Unix.LargeFile.fstat fd); false with
    Unix.Unix_error (_, _, _) ->
      true

(* [wrap_syscall queue fd cont syscall] runs [syscall ()].
   - On success the entry for [fd] is removed from [queue] and [cont]
     is woken with the result.
   - On [Exit], EAGAIN, EWOULDBLOCK or EINTR nothing happens: the entry
     stays in [queue] so the operation is retried later.
   - On any other exception the entry is removed and [cont] is woken
     with that exception. *)
let wrap_syscall queue fd cont syscall =
  let res =
    try
      Some (syscall ())
    with
      Exit
    | Unix.Unix_error ((Unix.EAGAIN | Unix.EWOULDBLOCK | Unix.EINTR), _, _) ->
        (* EINTR because we are catching SIG_CHLD hence the system call
           might be interrupted to handle the signal; this lets us restart
           the system call eventually. *)
        None
    | e ->
        queue := List.remove_assoc fd !queue;
        Lwt.wakeup_exn cont e;
        None
  in
  match res with
    Some v ->
      queue := List.remove_assoc fd !queue;
      Lwt.wakeup cont v
  | None ->
      ()

(* [run thread] is the main loop.  It returns the value of [thread] as
   soon as [Lwt.poll] reports one.  Otherwise it waits (via select, or
   the Windows fallback) for the next deadline or ready descriptor,
   wakes due sleepers, services ready readers and writers, reaps
   children if SIGCHLD was seen, and loops. *)
let rec run thread =
  match Lwt.poll thread with
    Some v ->
      v
  | None ->
      let next_event =
        try
          let (time, _, _) = SleepQueue.find_min !sleep_queue in Some time
        with Not_found ->
          None
      in
      (* -1 means that the clock has not been read yet in this iteration *)
      let now = ref (-1.) in
      (* Negative delay: no sleeper, so select may wait indefinitely. *)
      let delay =
        match next_event with
          None      -> -1.
        | Some 0.   -> 0.
        | Some time -> max 0. (time -. get_time now)
      in
      let infds = List.map fst !inputs in
      let outfds = List.map fst !outputs in
      let (readers, writers, _) =
        if windows_hack && not recent_ocaml then
          (* No usable select: every writer is assumed ready, and reads
             are attempted only when nothing else can make progress. *)
          let writers = outfds in
          let readers =
            if delay = 0. || writers <> [] then [] else infds in
          (readers, writers, [])
        else if infds = [] && outfds = [] && delay = 0. then
          ([], [], [])
        else
          try
            let (readers, writers, _) as res =
              Unix.select infds outfds [] delay in
            (* Select timed out: advance the cached clock by the delay
               instead of reading the clock again. *)
            if delay > 0. && !now <> -1. && readers = [] && writers = [] then
              now := !now +. delay;
            res
          with
            Unix.Unix_error (Unix.EINTR, _, _) ->
              ([], [], [])
          | Unix.Unix_error (Unix.EBADF, _, _) ->
              (* Report the invalid descriptors as ready so that the
                 pending operation on them fails and is dequeued. *)
              (List.filter bad_fd infds, List.filter bad_fd outfds, [])
          | Unix.Unix_error (Unix.EPIPE, _, _)
            when windows_hack && recent_ocaml ->
              (* Workaround for a bug in Ocaml 3.11: select fails with an
                 EPIPE error when the file descriptor is remotely closed *)
              (infds, [], [])
      in
      restart_threads !event_counter now;
      List.iter
        (fun fd ->
           try
             match List.assoc fd !inputs with
               `Read (buf, pos, len, res) ->
                  wrap_syscall inputs fd res
                    (fun () -> Unix.read fd buf pos len)
             | `Accept res ->
                  wrap_syscall inputs fd res
                    (fun () ->
                       let (s, _) as v = Unix.accept fd in
                       if not windows_hack then Unix.set_nonblock s;
                       v)
             | `Wait res ->
                  wrap_syscall inputs fd res (fun () -> ())
           with Not_found ->
             ())
        readers;
      List.iter
        (fun fd ->
           try
             match List.assoc fd !outputs with
               `Write (buf, pos, len, res) ->
                  wrap_syscall outputs fd res
                    (fun () -> Unix.write fd buf pos len)
             | `CheckSocket res ->
                  wrap_syscall outputs fd res
                    (fun () ->
                       try ignore (Unix.getpeername fd) with
                         Unix.Unix_error (Unix.ENOTCONN, _, _) ->
                           (* Not connected: a read is attempted so that
                              its exception is what the waiter receives.
                              A fresh one-byte buffer is used instead of
                              a string literal. *)
                           ignore (Unix.read fd (Bytes.create 1) 0 1))
             | `Wait res ->
                  (* The entry lives in [outputs] (see [wait_write]), so
                     it must be removed from [outputs], not [inputs];
                     otherwise it would stay registered forever. *)
                  wrap_syscall outputs fd res (fun () -> ())
           with Not_found ->
             ())
        writers;
      if !child_exited then begin
        child_exited := false;
        List.iter
          (fun (id, (res, flags, pid)) ->
             wrap_syscall wait_children id res
               (fun () ->
                  let (pid', _) as v = Unix.waitpid flags pid in
                  (* pid 0: the child has not changed state yet; [Exit]
                     keeps the entry queued. *)
                  if pid' = 0 then raise Exit;
                  v))
          !wait_children
      end;
      run thread

(****)

(* [wait_read ch] returns a thread registered in [inputs] that [run]
   wakes when [ch] is reported readable. *)
let wait_read ch =
  let res = Lwt.wait () in
  inputs := (ch, `Wait res) :: !inputs;
  res

(* [wait_write ch] returns a thread registered in [outputs] that [run]
   wakes when [ch] is reported writable. *)
let wait_write ch =
  let res = Lwt.wait () in
  outputs := (ch, `Wait res) :: !outputs;
  res

(* [read ch buf pos len] tries [Unix.read] immediately.  On EAGAIN or
   EWOULDBLOCK (always simulated under [windows_hack]) the read is
   queued in [inputs] for [run] to perform.  Other exceptions are
   returned as a failed thread. *)
let read ch buf pos len =
  try
    if windows_hack then raise (Unix.Unix_error (Unix.EAGAIN, "", ""));
    Lwt.return (Unix.read ch buf pos len)
  with
    Unix.Unix_error ((Unix.EAGAIN | Unix.EWOULDBLOCK), _, _) ->
      let res = Lwt.wait () in
      inputs := (ch, `Read (buf, pos, len, res)) :: !inputs;
      res
  | e ->
      Lwt.fail e

(* [write ch buf pos len] is the counterpart of [read] for [outputs].
   The immediate attempt is skipped only when both [windows_hack] and
   [recent_ocaml] hold. *)
let write ch buf pos len =
  try
    if windows_hack && recent_ocaml then
      raise (Unix.Unix_error (Unix.EAGAIN, "", ""));
    Lwt.return (Unix.write ch buf pos len)
  with
    Unix.Unix_error ((Unix.EAGAIN | Unix.EWOULDBLOCK), _, _) ->
      let res = Lwt.wait () in
      outputs := (ch, `Write (buf, pos, len, res)) :: !outputs;
      res
  | e ->
      Lwt.fail e

(*
let pipe () =
  let (in_fd, out_fd) as fd_pair = Unix.pipe() in
  if not windows_hack then begin
    Unix.set_nonblock in_fd;
    Unix.set_nonblock out_fd
  end;
  fd_pair
*)

(* Creates a pipe whose read end only is made non-blocking. *)
let pipe_in () =
  let (in_fd, _) as fd_pair = Unix.pipe() in
  if not windows_hack then Unix.set_nonblock in_fd;
  fd_pair

(* Creates a pipe whose write end only is made non-blocking. *)
let pipe_out () =
  let (_, out_fd) as fd_pair = Unix.pipe() in
  if not windows_hack then Unix.set_nonblock out_fd;
  fd_pair

(* Creates a socket, non-blocking except under [windows_hack]. *)
let socket dom typ proto =
  let s = Unix.socket dom typ proto in
  if not windows_hack then Unix.set_nonblock s;
  s

(* Creates a socket pair, both ends non-blocking except under
   [windows_hack], and returns it as an already completed thread. *)
let socketpair dom typ proto =
  let (s1, s2) as spair = Unix.socketpair dom typ proto in
  if not windows_hack then begin
    Unix.set_nonblock s1; Unix.set_nonblock s2
  end;
  Lwt.return spair

let bind = Unix.bind
let setsockopt = Unix.setsockopt
let listen = Unix.listen
let close = Unix.close
let set_close_on_exec = Unix.set_close_on_exec

(* [accept ch] queues an accept in [inputs]; [run] performs it once
   [ch] is reported readable. *)
let accept ch =
  let res = Lwt.wait () in
  inputs := (ch, `Accept res) :: !inputs;
  res

(* [check_socket ch] queues a connection check in [outputs]; [run]
   performs it once [ch] is reported writable. *)
let check_socket ch =
  let res = Lwt.wait () in
  outputs := (ch, `CheckSocket res) :: !outputs;
  res

(* [connect s addr] calls [Unix.connect].  If the call reports that the
   connection is still in progress, the result is deferred to
   [check_socket]; other exceptions become a failed thread. *)
let connect s addr =
  try
    Unix.connect s addr;
    Lwt.return ()
  with
    Unix.Unix_error
      ((Unix.EINPROGRESS | Unix.EWOULDBLOCK | Unix.EAGAIN), _, _) ->
        check_socket s
  | e ->
      Lwt.fail e

(* Source of the integer keys used in [wait_children]. *)
let ids = ref 0
let new_id () = incr ids; !ids

(* [Unix.waitpid] with any exception turned into a failed thread. *)
let _waitpid flags pid =
  try
    Lwt.return (Unix.waitpid flags pid)
  with e ->
    Lwt.fail e

(* [waitpid flags pid] calls [Unix.waitpid] directly when the caller
   asked for WNOHANG or under [windows_hack].  Otherwise it polls once
   with WNOHANG added; if no child is reported (pid 0) the request is
   queued in [wait_children] and retried by [run] after a SIGCHLD. *)
let waitpid flags pid =
  if List.mem Unix.WNOHANG flags || windows_hack then
    _waitpid flags pid
  else
    let flags = Unix.WNOHANG :: flags in
    Lwt.bind (_waitpid flags pid) (fun ((pid', _) as res) ->
    if pid' <> 0 then
      Lwt.return res
    else
      let res = Lwt.wait () in
      wait_children := (new_id (), (res, flags, pid)) :: !wait_children;
      res)
(* [wait ()] waits for any child: [waitpid [] (-1)]. *)
let wait () = waitpid [] (-1)

(* [system cmd] forks, runs [cmd] through /bin/sh -c in the child, and
   returns a thread yielding the exit status of the child. *)
let system cmd =
  match Unix.fork () with
     0 -> Unix.execv "/bin/sh" [| "/bin/sh"; "-c"; cmd |]
  | id -> Lwt.bind (waitpid [] id) (fun (_, status) -> Lwt.return status)

(****)

type lwt_in_channel = in_channel
type lwt_out_channel = out_channel

(* Put the descriptor underlying a channel in non-blocking mode and
   return the channel itself. *)
let intern_in_channel ch =
  Unix.set_nonblock (Unix.descr_of_in_channel ch); ch
let intern_out_channel ch =
  Unix.set_nonblock (Unix.descr_of_out_channel ch); ch

(* Wait until the descriptor underlying the channel is readable
   (respectively writable). *)
let wait_inchan ic = wait_read (Unix.descr_of_in_channel ic)
let wait_outchan oc = wait_write (Unix.descr_of_out_channel oc)

(* [input_char ic] reads one character.  When the channel raises
   [Sys_blocked_io] it waits for [ic] to become readable and retries;
   any other exception becomes a failed thread. *)
let rec input_char ic =
  try
    Lwt.return (Pervasives.input_char ic)
  with
    Sys_blocked_io ->
      Lwt.bind (wait_inchan ic) (fun () -> input_char ic)
  | e ->
      Lwt.fail e

(* [input ic s ofs len] is the same retry scheme around
   [Pervasives.input]; it yields the number of characters read. *)
let rec input ic s ofs len =
  try
    Lwt.return (Pervasives.input ic s ofs len)
  with
    Sys_blocked_io ->
      Lwt.bind (wait_inchan ic) (fun () -> input ic s ofs len)
  | e ->
      Lwt.fail e

(* Reads exactly [len] characters into [s] starting at [ofs], without
   bounds checking; fails with [End_of_file] if [input] returns 0
   before the buffer is filled. *)
let rec unsafe_really_input ic s ofs len =
  if len <= 0 then
    Lwt.return ()
  else begin
    Lwt.bind (input ic s ofs len) (fun r ->
    if r = 0
    then Lwt.fail End_of_file
    else unsafe_really_input ic s (ofs+r) (len-r))
  end

(* [really_input ic s ofs len] checks that [ofs] and [len] designate a
   valid range of [s] and then delegates to [unsafe_really_input].
   An invalid range yields a thread failed with
   [Invalid_argument "really_input"].
   [s] is the buffer handed to [Pervasives.input], so its length is
   taken with [Bytes.length]. *)
let really_input ic s ofs len =
  if ofs < 0 || len < 0 || ofs > Bytes.length s - len
  then Lwt.fail (Invalid_argument "really_input")
  else unsafe_really_input ic s ofs len

(* [input_line ic] reads characters up to, and not including, the next
   newline.  [End_of_file] is swallowed when at least one character has
   been read, so a final line without a newline is still returned;
   otherwise the exception is propagated.
   The accumulator is a [Bytes] buffer, doubled whenever it is full, and
   only [Bytes] operations are applied to it. *)
let input_line ic =
  let buf = ref (Bytes.create 128) in
  let pos = ref 0 in
  let rec loop () =
    if !pos = Bytes.length !buf then begin
      let newbuf = Bytes.create (2 * !pos) in
      Bytes.blit !buf 0 newbuf 0 !pos;
      buf := newbuf
    end;
    Lwt.bind (input_char ic) (fun c ->
    if c = '\n' then
      Lwt.return ()
    else begin
      Bytes.set !buf !pos c;
      incr pos;
      loop ()
    end)
  in
  Lwt.bind
    (Lwt.catch loop
       (fun e ->
          match e with
            End_of_file when !pos <> 0 ->
              Lwt.return ()
          | _ ->
              Lwt.fail e))
    (fun () ->
       (* NOTE(review): the result is a fresh [bytes] value, as before;
          if the interface expects a string, use [Bytes.sub_string]. *)
       Lwt.return (Bytes.sub !buf 0 !pos))

(****)

(* Key identifying a process started by one of the [open_process*]
   functions: the channel(s) handed back to the caller. *)
type popen_process =
    Process of in_channel * out_channel
  | Process_in of in_channel
  | Process_out of out_channel
  | Process_full of in_channel * out_channel * in_channel
(* Maps the channels of each process started below to its pid. *)
let popen_processes = (Hashtbl.create 7 : (popen_process, int) Hashtbl.t)

(* [open_proc cmd proc input output toclose] forks.
   The child installs [input] and [output] as its stdin and stdout,
   closes every descriptor in [toclose], and executes [cmd] through
   /bin/sh -c.  The parent records the pid of the child under [proc].
   [toclose] must not contain [input] or [output]: these are already
   closed after the dup2 and closing them again would raise. *)
let open_proc cmd proc input output toclose =
  match Unix.fork () with
     0 -> if input <> Unix.stdin then begin
            Unix.dup2 input Unix.stdin;
            Unix.close input
          end;
          if output <> Unix.stdout then begin
            Unix.dup2 output Unix.stdout;
            Unix.close output
          end;
          List.iter Unix.close toclose;
          Unix.execv "/bin/sh" [| "/bin/sh"; "-c"; cmd |]
  | id -> Hashtbl.add popen_processes proc id

(* Starts [cmd] with its stdout connected to the returned channel. *)
let open_process_in cmd =
  let (in_read, in_write) = pipe_in () in
  let inchan = Unix.in_channel_of_descr in_read in
  open_proc cmd (Process_in inchan) Unix.stdin in_write [in_read];
  Unix.close in_write;
  Lwt.return inchan

(* Starts [cmd] with its stdin connected to the returned channel. *)
let open_process_out cmd =
  let (out_read, out_write) = pipe_out () in
  let outchan = Unix.out_channel_of_descr out_write in
  open_proc cmd (Process_out outchan) out_read Unix.stdout [out_write];
  Unix.close out_read;
  Lwt.return outchan

(* Starts [cmd] with both its stdout and its stdin connected to the
   returned pair of channels. *)
let open_process cmd =
  let (in_read, in_write) = pipe_in () in
  let (out_read, out_write) = pipe_out () in
  let inchan = Unix.in_channel_of_descr in_read in
  let outchan = Unix.out_channel_of_descr out_write in
  open_proc cmd (Process(inchan, outchan)) out_read in_write
                                           [in_read; out_write];
  Unix.close out_read;
  Unix.close in_write;
  Lwt.return (inchan, outchan)

(* FIX: Subprocesses that use /dev/tty to print things on the terminal
   will NOT have this output captured and returned to the caller of this
   function.  There's an argument that this is correct, but if we are
   running from a GUI the user may not be looking at any terminal and it
   will appear that the process is just hanging.  This can be fixed, in
   principle, by writing a little C code that opens /dev/tty and then uses
   the TIOCNOTTY ioctl control to detach the terminal. *)

(* Same as [open_proc], with a third descriptor installed as stderr and
   an explicit environment [env] passed to execve.  Unlike [open_proc],
   the three descriptors are always dup2-ed and closed. *)
let open_proc_full cmd env proc input output error toclose =
  match Unix.fork () with
     0 -> Unix.dup2 input Unix.stdin; Unix.close input;
          Unix.dup2 output Unix.stdout; Unix.close output;
          Unix.dup2 error Unix.stderr; Unix.close error;
          List.iter Unix.close toclose;
          Unix.execve "/bin/sh" [| "/bin/sh"; "-c"; cmd |] env
  | id -> Hashtbl.add popen_processes proc id

(* Starts [cmd] in environment [env] and returns channels connected to
   its stdout, stdin and stderr, in that order. *)
let open_process_full cmd env =
  let (in_read, in_write) = pipe_in () in
  let (out_read, out_write) = pipe_out () in
  let (err_read, err_write) = pipe_in () in
  let inchan = Unix.in_channel_of_descr in_read in
  let outchan = Unix.out_channel_of_descr out_write in
  let errchan = Unix.in_channel_of_descr err_read in
  (* The close list holds the ends kept by the parent.  The ends used
     by the child (out_read, in_write, err_write) are closed by
     [open_proc_full] itself right after the dup2, so listing them here
     would close them twice and raise in the child before the exec. *)
  open_proc_full cmd env (Process_full(inchan, outchan, errchan))
                 out_read in_write err_write [in_read; out_write; err_read];
  Unix.close out_read;
  Unix.close in_write;
  Unix.close err_write;
  Lwt.return (inchan, outchan, errchan)

(* Looks up and removes the pid recorded for [proc].  Raises
   [Unix.Unix_error (EBADF, fun_name, "")] when [proc] is unknown. *)
let find_proc_id fun_name proc =
  try
    let pid = Hashtbl.find popen_processes proc in
    Hashtbl.remove popen_processes proc;
    pid
  with Not_found ->
    raise (Unix.Unix_error (Unix.EBADF, fun_name, ""))

(* The [close_process*] functions close the channel(s) of a process
   started by the matching [open_process*] function, then wait for the
   process and return its status. *)
let close_process_in inchan =
  let pid = find_proc_id "close_process_in" (Process_in inchan) in
  close_in inchan;
  Lwt.bind (waitpid [] pid) (fun (_, status) -> Lwt.return status)

let close_process_out outchan =
  let pid = find_proc_id "close_process_out" (Process_out outchan) in
  close_out outchan;
  Lwt.bind (waitpid [] pid) (fun (_, status) -> Lwt.return status)

let close_process (inchan, outchan) =
  let pid = find_proc_id "close_process" (Process(inchan, outchan)) in
  close_in inchan; close_out outchan;
  Lwt.bind (waitpid [] pid) (fun (_, status) -> Lwt.return status)

(* The output channel is closed first, then the two input channels. *)
let close_process_full (inchan, outchan, errchan) =
  let pid =
    find_proc_id "close_process_full"
                 (Process_full(inchan, outchan, errchan)) in
  close_out outchan; close_in inchan; close_in errchan;
  Lwt.bind (waitpid [] pid) (fun (_, status) -> Lwt.return status)