summaryrefslogtreecommitdiffstats
path: root/src/lwt/lwt_unix.ml
diff options
context:
space:
mode:
Diffstat (limited to 'src/lwt/lwt_unix.ml')
-rw-r--r--src/lwt/lwt_unix.ml509
1 files changed, 1 insertions, 508 deletions
diff --git a/src/lwt/lwt_unix.ml b/src/lwt/lwt_unix.ml
index 111def4..356d25b 100644
--- a/src/lwt/lwt_unix.ml
+++ b/src/lwt/lwt_unix.ml
@@ -1,508 +1 @@
-(*
-Non-blocking I/O and select does not (fully) work under Windows.
-The libray therefore does not use them under Windows, and will
-therefore have the following limitations:
-- No read will be performed while there are some threads ready to run
- or waiting to write;
-- When a read is pending, everything else will be blocked: [sleep]
- will not terminate and other reads will not be performed before
- this read terminates;
-- A write on a socket or a pipe can block the execution of the program
- if the data are never consumed at the other end of the connection.
- In particular, if both ends use this library and write at the same
- time, this could result in a dead-lock.
-- [connect] is blocking
-*)
-let windows_hack = Sys.os_type <> "Unix"
-let recent_ocaml =
- Scanf.sscanf Sys.ocaml_version "%d.%d"
- (fun maj min -> (maj = 3 && min >= 11) || maj > 3)
-
-module SleepQueue =
- Pqueue.Make (struct
- type t = float * int * unit Lwt.t
- let compare (t, i, _) (t', i', _) =
- let c = compare t t' in
- if c = 0 then i - i' else c
- end)
-let sleep_queue = ref SleepQueue.empty
-
-let event_counter = ref 0
-
-let sleep d =
- let res = Lwt.wait () in
- incr event_counter;
- let t = if d <= 0. then 0. else Unix.gettimeofday () +. d in
- sleep_queue :=
- SleepQueue.add (t, !event_counter, res) !sleep_queue;
- res
-
-let yield () = sleep 0.
-
-let get_time t =
- if !t = -1. then t := Unix.gettimeofday ();
- !t
-
-let in_the_past now t =
- t = 0. || t <= get_time now
-
-let rec restart_threads imax now =
- match
- try Some (SleepQueue.find_min !sleep_queue) with Not_found -> None
- with
- Some (time, i, thr) when in_the_past now time && i - imax <= 0 ->
- sleep_queue := SleepQueue.remove_min !sleep_queue;
- Lwt.wakeup thr ();
- restart_threads imax now
- | _ ->
- ()
-
-type file_descr = Unix.file_descr
-
-let of_unix_file_descr fd = if not windows_hack then Unix.set_nonblock fd; fd
-
-let inputs = ref []
-let outputs = ref []
-let wait_children = ref []
-
-let child_exited = ref false
-let _ =
- if not windows_hack then
- ignore(Sys.signal Sys.sigchld (Sys.Signal_handle (fun _ -> child_exited := true)))
-
-let bad_fd fd =
- try ignore (Unix.LargeFile.fstat fd); false with
- Unix.Unix_error (_, _, _) ->
- true
-
-let wrap_syscall queue fd cont syscall =
- let res =
- try
- Some (syscall ())
- with
- Exit
- | Unix.Unix_error ((Unix.EAGAIN | Unix.EWOULDBLOCK | Unix.EINTR), _, _) ->
- (* EINTR because we are catching SIG_CHLD hence the system call
- might be interrupted to handle the signal; this lets us restart
- the system call eventually. *)
- None
- | e ->
- queue := List.remove_assoc fd !queue;
- Lwt.wakeup_exn cont e;
- None
- in
- match res with
- Some v ->
- queue := List.remove_assoc fd !queue;
- Lwt.wakeup cont v
- | None ->
- ()
-
-let rec run thread =
- match Lwt.poll thread with
- Some v ->
- v
- | None ->
- let next_event =
- try
- let (time, _, _) = SleepQueue.find_min !sleep_queue in Some time
- with Not_found ->
- None
- in
- let now = ref (-1.) in
- let delay =
- match next_event with
- None -> -1.
- | Some 0. -> 0.
- | Some time -> max 0. (time -. get_time now)
- in
- let infds = List.map fst !inputs in
- let outfds = List.map fst !outputs in
- let (readers, writers, _) =
- if windows_hack && not recent_ocaml then
- let writers = outfds in
- let readers =
- if delay = 0. || writers <> [] then [] else infds in
- (readers, writers, [])
- else if infds = [] && outfds = [] && delay = 0. then
- ([], [], [])
- else
- try
- let res = Unix.select infds outfds [] delay in
- if delay > 0. && !now <> -1. then now := !now +. delay;
- res
- with
- Unix.Unix_error (Unix.EINTR, _, _) ->
- ([], [], [])
- | Unix.Unix_error (Unix.EBADF, _, _) ->
- (List.filter bad_fd infds, List.filter bad_fd outfds, [])
- | Unix.Unix_error (Unix.EPIPE, _, _)
- when windows_hack && recent_ocaml ->
- (* Workaround for a bug in Ocaml 3.11: select fails with an
- EPIPE error when the file descriptor is remotely closed *)
- (infds, [], [])
- in
- restart_threads !event_counter now;
- List.iter
- (fun fd ->
- try
- match List.assoc fd !inputs with
- `Read (buf, pos, len, res) ->
- wrap_syscall inputs fd res
- (fun () -> Unix.read fd buf pos len)
- | `Accept res ->
- wrap_syscall inputs fd res
- (fun () ->
- let (s, _) as v = Unix.accept fd in
- if not windows_hack then Unix.set_nonblock s;
- v)
- | `Wait res ->
- wrap_syscall inputs fd res (fun () -> ())
- with Not_found ->
- ())
- readers;
- List.iter
- (fun fd ->
- try
- match List.assoc fd !outputs with
- `Write (buf, pos, len, res) ->
- wrap_syscall outputs fd res
- (fun () -> Unix.write fd buf pos len)
- | `CheckSocket res ->
- wrap_syscall outputs fd res
- (fun () ->
- try ignore (Unix.getpeername fd) with
- Unix.Unix_error (Unix.ENOTCONN, _, _) ->
- ignore (Unix.read fd " " 0 1))
- | `Wait res ->
- wrap_syscall inputs fd res (fun () -> ())
- with Not_found ->
- ())
- writers;
- if !child_exited then begin
- child_exited := false;
- List.iter
- (fun (id, (res, flags, pid)) ->
- wrap_syscall wait_children id res
- (fun () ->
- let (pid', _) as v = Unix.waitpid flags pid in
- if pid' = 0 then raise Exit;
- v))
- !wait_children
- end;
- run thread
-
-(****)
-
-let wait_read ch =
- let res = Lwt.wait () in
- inputs := (ch, `Wait res) :: !inputs;
- res
-
-let wait_write ch =
- let res = Lwt.wait () in
- outputs := (ch, `Wait res) :: !outputs;
- res
-
-let read ch buf pos len =
- try
- if windows_hack then raise (Unix.Unix_error (Unix.EAGAIN, "", ""));
- Lwt.return (Unix.read ch buf pos len)
- with
- Unix.Unix_error ((Unix.EAGAIN | Unix.EWOULDBLOCK), _, _) ->
- let res = Lwt.wait () in
- inputs := (ch, `Read (buf, pos, len, res)) :: !inputs;
- res
- | e ->
- Lwt.fail e
-
-let write ch buf pos len =
- try
- if windows_hack && recent_ocaml then
- raise (Unix.Unix_error (Unix.EAGAIN, "", ""));
- Lwt.return (Unix.write ch buf pos len)
- with
- Unix.Unix_error ((Unix.EAGAIN | Unix.EWOULDBLOCK), _, _) ->
- let res = Lwt.wait () in
- outputs := (ch, `Write (buf, pos, len, res)) :: !outputs;
- res
- | e ->
- Lwt.fail e
-
-(*
-let pipe () =
- let (in_fd, out_fd) as fd_pair = Unix.pipe() in
- if not windows_hack then begin
- Unix.set_nonblock in_fd;
- Unix.set_nonblock out_fd
- end;
- fd_pair
-*)
-
-let pipe_in () =
- let (in_fd, out_fd) as fd_pair = Unix.pipe() in
- if not windows_hack then
- Unix.set_nonblock in_fd;
- fd_pair
-
-let pipe_out () =
- let (in_fd, out_fd) as fd_pair = Unix.pipe() in
- if not windows_hack then
- Unix.set_nonblock out_fd;
- fd_pair
-
-let socket dom typ proto =
- let s = Unix.socket dom typ proto in
- if not windows_hack then Unix.set_nonblock s;
- s
-
-let socketpair dom typ proto =
- let (s1, s2) as spair = Unix.socketpair dom typ proto in
- if not windows_hack then begin
- Unix.set_nonblock s1; Unix.set_nonblock s2
- end;
- Lwt.return spair
-
-let bind = Unix.bind
-let setsockopt = Unix.setsockopt
-let listen = Unix.listen
-let close = Unix.close
-let set_close_on_exec = Unix.set_close_on_exec
-
-let accept ch =
- let res = Lwt.wait () in
- inputs := (ch, `Accept res) :: !inputs;
- res
-
-let check_socket ch =
- let res = Lwt.wait () in
- outputs := (ch, `CheckSocket res) :: !outputs;
- res
-
-let connect s addr =
- try
- Unix.connect s addr;
- Lwt.return ()
- with
- Unix.Unix_error
- ((Unix.EINPROGRESS | Unix.EWOULDBLOCK | Unix.EAGAIN), _, _) ->
- check_socket s
- | e ->
- Lwt.fail e
-
-let ids = ref 0
-let new_id () = incr ids; !ids
-
-let _waitpid flags pid =
- try
- Lwt.return (Unix.waitpid flags pid)
- with e ->
- Lwt.fail e
-
-let waitpid flags pid =
- if List.mem Unix.WNOHANG flags || windows_hack then
- _waitpid flags pid
- else
- let flags = Unix.WNOHANG :: flags in
- Lwt.bind (_waitpid flags pid) (fun ((pid', _) as res) ->
- if pid' <> 0 then
- Lwt.return res
- else
- let res = Lwt.wait () in
- wait_children := (new_id (), (res, flags, pid)) :: !wait_children;
- res)
-
-let wait () = waitpid [] (-1)
-
-let system cmd =
- match Unix.fork () with
- 0 -> Unix.execv "/bin/sh" [| "/bin/sh"; "-c"; cmd |]
- | id -> Lwt.bind (waitpid [] id) (fun (pid, status) -> Lwt.return status)
-
-(****)
-
-type lwt_in_channel = in_channel
-type lwt_out_channel = out_channel
-
-let intern_in_channel ch =
- Unix.set_nonblock (Unix.descr_of_in_channel ch); ch
-let intern_out_channel ch =
- Unix.set_nonblock (Unix.descr_of_out_channel ch); ch
-
-
-let wait_inchan ic = wait_read (Unix.descr_of_in_channel ic)
-let wait_outchan oc = wait_write (Unix.descr_of_out_channel oc)
-
-let rec input_char ic =
- try
- Lwt.return (Pervasives.input_char ic)
- with
- Sys_blocked_io ->
- Lwt.bind (wait_inchan ic) (fun () -> input_char ic)
- | e ->
- Lwt.fail e
-
-let rec input ic s ofs len =
- try
- Lwt.return (Pervasives.input ic s ofs len)
- with
- Sys_blocked_io ->
- Lwt.bind (wait_inchan ic) (fun () -> input ic s ofs len)
- | e ->
- Lwt.fail e
-
-let rec unsafe_really_input ic s ofs len =
- if len <= 0 then
- Lwt.return ()
- else begin
- Lwt.bind (input ic s ofs len) (fun r ->
- if r = 0
- then Lwt.fail End_of_file
- else unsafe_really_input ic s (ofs+r) (len-r))
- end
-
-let really_input ic s ofs len =
- if ofs < 0 || len < 0 || ofs > String.length s - len
- then Lwt.fail (Invalid_argument "really_input")
- else unsafe_really_input ic s ofs len
-
-let input_line ic =
- let buf = ref (String.create 128) in
- let pos = ref 0 in
- let rec loop () =
- if !pos = String.length !buf then begin
- let newbuf = String.create (2 * !pos) in
- String.blit !buf 0 newbuf 0 !pos;
- buf := newbuf
- end;
- Lwt.bind (input_char ic) (fun c ->
- if c = '\n' then
- Lwt.return ()
- else begin
- !buf.[!pos] <- c;
- incr pos;
- loop ()
- end)
- in
- Lwt.bind
- (Lwt.catch loop
- (fun e ->
- match e with
- End_of_file when !pos <> 0 ->
- Lwt.return ()
- | _ ->
- Lwt.fail e))
- (fun () ->
- let res = String.create !pos in
- String.blit !buf 0 res 0 !pos;
- Lwt.return res)
-
-(****)
-
-type popen_process =
- Process of in_channel * out_channel
- | Process_in of in_channel
- | Process_out of out_channel
- | Process_full of in_channel * out_channel * in_channel
-
-let popen_processes = (Hashtbl.create 7 : (popen_process, int) Hashtbl.t)
-
-let open_proc cmd proc input output toclose =
- match Unix.fork () with
- 0 -> if input <> Unix.stdin then begin
- Unix.dup2 input Unix.stdin;
- Unix.close input
- end;
- if output <> Unix.stdout then begin
- Unix.dup2 output Unix.stdout;
- Unix.close output
- end;
- List.iter Unix.close toclose;
- Unix.execv "/bin/sh" [| "/bin/sh"; "-c"; cmd |]
- | id -> Hashtbl.add popen_processes proc id
-
-let open_process_in cmd =
- let (in_read, in_write) = pipe_in () in
- let inchan = Unix.in_channel_of_descr in_read in
- open_proc cmd (Process_in inchan) Unix.stdin in_write [in_read];
- Unix.close in_write;
- Lwt.return inchan
-
-let open_process_out cmd =
- let (out_read, out_write) = pipe_out () in
- let outchan = Unix.out_channel_of_descr out_write in
- open_proc cmd (Process_out outchan) out_read Unix.stdout [out_write];
- Unix.close out_read;
- Lwt.return outchan
-
-let open_process cmd =
- let (in_read, in_write) = pipe_in () in
- let (out_read, out_write) = pipe_out () in
- let inchan = Unix.in_channel_of_descr in_read in
- let outchan = Unix.out_channel_of_descr out_write in
- open_proc cmd (Process(inchan, outchan)) out_read in_write
- [in_read; out_write];
- Unix.close out_read;
- Unix.close in_write;
- Lwt.return (inchan, outchan)
-
-(* FIX: Subprocesses that use /dev/tty to print things on the terminal
- will NOT have this output captured and returned to the caller of this
- function. There's an argument that this is correct, but if we are
- running from a GUI the user may not be looking at any terminal and it
- will appear that the process is just hanging. This can be fixed, in
- principle, by writing a little C code that opens /dev/tty and then uses
- the TIOCNOTTY ioctl control to detach the terminal. *)
-
-let open_proc_full cmd env proc input output error toclose =
- match Unix.fork () with
- 0 -> Unix.dup2 input Unix.stdin; Unix.close input;
- Unix.dup2 output Unix.stdout; Unix.close output;
- Unix.dup2 error Unix.stderr; Unix.close error;
- List.iter Unix.close toclose;
- Unix.execve "/bin/sh" [| "/bin/sh"; "-c"; cmd |] env
- | id -> Hashtbl.add popen_processes proc id
-
-let open_process_full cmd env =
- let (in_read, in_write) = pipe_in () in
- let (out_read, out_write) = pipe_out () in
- let (err_read, err_write) = pipe_in () in
- let inchan = Unix.in_channel_of_descr in_read in
- let outchan = Unix.out_channel_of_descr out_write in
- let errchan = Unix.in_channel_of_descr err_read in
- open_proc_full cmd env (Process_full(inchan, outchan, errchan))
- out_read in_write err_write [in_write; out_read; err_read];
- Unix.close out_read;
- Unix.close in_write;
- Unix.close err_write;
- Lwt.return (inchan, outchan, errchan)
-
-let find_proc_id fun_name proc =
- try
- let pid = Hashtbl.find popen_processes proc in
- Hashtbl.remove popen_processes proc;
- pid
- with Not_found ->
- raise (Unix.Unix_error (Unix.EBADF, fun_name, ""))
-
-let close_process_in inchan =
- let pid = find_proc_id "close_process_in" (Process_in inchan) in
- close_in inchan;
- Lwt.bind (waitpid [] pid) (fun (_, status) -> Lwt.return status)
-
-let close_process_out outchan =
- let pid = find_proc_id "close_process_out" (Process_out outchan) in
- close_out outchan;
- Lwt.bind (waitpid [] pid) (fun (_, status) -> Lwt.return status)
-
-let close_process (inchan, outchan) =
- let pid = find_proc_id "close_process" (Process(inchan, outchan)) in
- close_in inchan; close_out outchan;
- Lwt.bind (waitpid [] pid) (fun (_, status) -> Lwt.return status)
-
-let close_process_full (outchan, inchan, errchan) =
- let pid =
- find_proc_id "close_process_full"
- (Process_full(outchan, inchan, errchan)) in
- close_out inchan; close_in outchan; close_in errchan;
- Lwt.bind (waitpid [] pid) (fun (_, status) -> Lwt.return status)
+include Lwt_unix_impl