summaryrefslogtreecommitdiffstats
path: root/src/lwt/win/lwt_unix_impl.ml
diff options
context:
space:
mode:
Diffstat (limited to 'src/lwt/win/lwt_unix_impl.ml')
-rw-r--r--src/lwt/win/lwt_unix_impl.ml645
1 files changed, 645 insertions, 0 deletions
diff --git a/src/lwt/win/lwt_unix_impl.ml b/src/lwt/win/lwt_unix_impl.ml
new file mode 100644
index 0000000..08dc891
--- /dev/null
+++ b/src/lwt/win/lwt_unix_impl.ml
@@ -0,0 +1,645 @@
+(*
+- should check all events before looping again for avoiding race
+ conditions...
+ (we have the first, scan the subsequent ones)
+*)
+
+let no_overlapped_io = false
+let d = ref false
+
+(****)
+
+type buffer =
+ (char, Bigarray.int8_unsigned_elt, Bigarray.c_layout) Bigarray.Array1.t
+
+let buffer_create l = Bigarray.Array1.create Bigarray.char Bigarray.c_layout l
+
+external unsafe_blit_string_to_buffer :
+ string -> int -> buffer -> int -> int -> unit = "ml_blit_string_to_buffer"
+external unsafe_blit_buffer_to_string :
+ buffer -> int -> string -> int -> int -> unit = "ml_blit_buffer_to_string"
+
+let buffer_length = Bigarray.Array1.dim
+
+let blit_string_to_buffer s i a j l =
+ if l < 0 || i < 0 || i > String.length s - l
+ || j < 0 || j > buffer_length a - l
+ then invalid_arg "Lwt_unix.blit_string_to_buffer"
+ else unsafe_blit_string_to_buffer s i a j l
+
+let blit_buffer_to_string a i s j l =
+ if l < 0 || i < 0 || i > buffer_length a - l
+ || j < 0 || j > String.length s - l
+ then invalid_arg "Lwt_unix.blit_buffer_to_string"
+ else unsafe_blit_buffer_to_string a i s j l
+
+let buffer_size = 16384
+
+let avail_buffers = ref []
+
+let acquire_buffer () =
+ match !avail_buffers with
+ [] -> buffer_create buffer_size
+ | b :: r -> avail_buffers := r; b
+
+let release_buffer b = avail_buffers := b :: !avail_buffers
+
+(****)
+
+let last_id = ref 0
+let free_list = ref (Array.init 1 (fun i -> i))
+
+let acquire_id () =
+ let len = Array.length !free_list in
+ if !last_id = len then begin
+ let a = Array.init (len * 2) (fun i -> i) in
+ Array.blit !free_list 0 a 0 len;
+ free_list := a
+ end;
+ let i = !free_list.(!last_id) in
+ incr last_id;
+ i
+
+let release_id i =
+ decr last_id;
+ !free_list.(!last_id) <- i
+
+(****)
+
+let completionEvents = ref []
+
+let actionCompleted id len errno name =
+ completionEvents := (id, len, errno, name) :: !completionEvents
+
+external init_lwt :
+ (int -> int -> Unix.error -> string -> unit) -> int = "init_lwt"
+
+let max_event_count = init_lwt actionCompleted
+
+let event_count = ref 0
+let free_list = Array.init max_event_count (fun i -> i)
+
+let acquire_event nm =
+ if !event_count = max_event_count then
+ raise (Unix.Unix_error (Unix.EAGAIN, nm, ""));
+ let i = free_list.(!event_count) in
+ incr event_count;
+ i
+
+let release_event i =
+ decr event_count;
+ free_list.(!event_count) <- i
+
+(****)
+
+type helpers
+type file_descr = { fd : Unix.file_descr; helpers : helpers }
+
+external of_unix_file_descr : Unix.file_descr -> file_descr = "win_wrap_fd"
+
+external win_wrap_async : Unix.file_descr -> file_descr = "win_wrap_overlapped"
+
+let wrap_async =
+ if no_overlapped_io then of_unix_file_descr else win_wrap_async
+
+(****)
+
+module SleepQueue =
+ Pqueue.Make (struct
+ type t = float * int * unit Lwt.t
+ let compare (t, i, _) (t', i', _) =
+ let c = compare t t' in
+ if c = 0 then i - i' else c
+ end)
+let sleep_queue = ref SleepQueue.empty
+
+let event_counter = ref 0
+
+let sleep d =
+ let res = Lwt.wait () in
+ incr event_counter;
+ let t = if d <= 0. then 0. else Unix.gettimeofday () +. d in
+ sleep_queue :=
+ SleepQueue.add (t, !event_counter, res) !sleep_queue;
+ res
+
+let yield () = sleep 0.
+
+let get_time t =
+ if !t = -1. then t := Unix.gettimeofday ();
+ !t
+
+let in_the_past now t =
+ t = 0. || t <= get_time now
+
+let rec restart_threads imax now =
+ match
+ try Some (SleepQueue.find_min !sleep_queue) with Not_found -> None
+ with
+ Some (time, i, thr) when in_the_past now time && i - imax <= 0 ->
+ sleep_queue := SleepQueue.remove_min !sleep_queue;
+if !d then Format.eprintf "RESTART@.";
+ Lwt.wakeup thr ();
+if !d then Format.eprintf "RESTART...DONE@.";
+ restart_threads imax now
+ | _ ->
+ ()
+
+module IntTbl =
+ Hashtbl.Make
+ (struct type t = int let equal (x : int) y = x = y let hash x = x end)
+
+let ioInFlight = IntTbl.create 17
+let connInFlight = IntTbl.create 17
+
+let handleCompletionEvent (id, len, errno, name) =
+if !d then Format.eprintf "Handling event %d (len %d)@." id len;
+ let (action, buf, res) =
+ try IntTbl.find ioInFlight id with Not_found -> assert false
+ in
+ begin match action with
+ `Write -> ()
+ | `Read (s, pos) -> if len > 0 then blit_buffer_to_string buf 0 s pos len
+ end;
+ IntTbl.remove ioInFlight id;
+ release_id id;
+ release_buffer buf;
+ if len = -1 then
+ Lwt.wakeup_exn res (Unix.Unix_error (errno, name, ""))
+ else
+ Lwt.wakeup res len
+
+type kind = CONNECT | ACCEPT
+
+external win_wait : int -> int -> int = "win_wait"
+
+external win_register_wait :
+ Unix.file_descr -> kind -> int -> unit = "win_register_wait"
+
+external win_check_connection :
+ Unix.file_descr -> kind -> int -> unit = "win_check_connection"
+
+let handle_wait_event i ch kind cont action =
+if !d then prerr_endline "MMM";
+ let res =
+ try
+ Some (action ())
+ with
+ Unix.Unix_error ((Unix.EAGAIN | Unix.EWOULDBLOCK | Unix.EINTR), _, _) ->
+if !d then prerr_endline "NNN";
+ win_register_wait ch.fd kind i;
+ None
+ | e ->
+if !d then prerr_endline "OOO";
+ release_event i;
+ IntTbl.remove connInFlight i;
+ Lwt.wakeup_exn cont e;
+ None
+ in
+ match res with
+ Some v ->
+if !d then prerr_endline "PPP";
+ release_event i;
+ IntTbl.remove connInFlight i;
+ Lwt.wakeup cont v
+ | None ->
+ ()
+
+let rec run thread =
+if !d then Format.eprintf "Main loop@.";
+ match Lwt.poll thread with
+ Some v ->
+if !d then Format.eprintf "DONE!@.";
+ v
+ | None ->
+ let next_event =
+ try
+ let (time, _, _) = SleepQueue.find_min !sleep_queue in Some time
+ with Not_found ->
+ None
+ in
+ let now = ref (-1.) in
+ let delay =
+ match next_event with
+ None -> -1.
+ | Some 0. -> 0.
+ | Some time -> max 0. (time -. get_time now)
+ in
+if !d then Format.eprintf "vvv@.";
+ let i =
+ try
+ win_wait (truncate (ceil (delay *. 1000.))) !event_count
+ with e -> assert false
+ in
+if !d then Format.eprintf "^^^@.";
+ if i = -1 then now := !now +. delay;
+ restart_threads !event_counter now;
+if !d then Format.eprintf "threads restarted@.";
+ let ev = !completionEvents in
+ completionEvents := [];
+ List.iter handleCompletionEvent (List.rev ev);
+ if i >= 0 then begin
+ let (kind, ch) =
+ try IntTbl.find connInFlight i with Not_found -> assert false in
+ match kind with
+ `CheckSocket res ->
+if !d then prerr_endline "CHECK CONN";
+ handle_wait_event i ch CONNECT res
+ (fun () -> win_check_connection ch.fd CONNECT i)
+ | `Accept res ->
+if !d then prerr_endline "ACCEPT";
+ handle_wait_event i ch ACCEPT res
+ (fun () ->
+ win_check_connection ch.fd ACCEPT i;
+ let (v, info) = Unix.accept ch.fd in
+ (wrap_async v, info))
+ end;
+(*
+ let infds = List.map fst !inputs in
+ let outfds = List.map fst !outputs in
+ let (readers, writers, _) =
+ if windows_hack && not recent_ocaml then
+ let writers = outfds in
+ let readers =
+ if delay = 0. || writers <> [] then [] else infds in
+ (readers, writers, [])
+ else if infds = [] && outfds = [] && delay = 0. then
+ ([], [], [])
+ else
+ try
+ let res = Unix.select infds outfds [] delay in
+ if delay > 0. && !now <> -1. then now := !now +. delay;
+ res
+ with
+ Unix.Unix_error (Unix.EINTR, _, _) ->
+ ([], [], [])
+ | Unix.Unix_error (Unix.EBADF, _, _) ->
+ (List.filter bad_fd infds, List.filter bad_fd outfds, [])
+ | Unix.Unix_error (Unix.EPIPE, _, _)
+ when windows_hack && recent_ocaml ->
+ (* Workaround for a bug in Ocaml 3.11: select fails with an
+ EPIPE error when the file descriptor is remotely closed *)
+ (infds, [], [])
+ in
+ restart_threads !event_counter now;
+ List.iter
+ (fun fd ->
+ try
+ match List.assoc fd !inputs with
+ `Read (buf, pos, len, res) ->
+ wrap_syscall inputs fd res
+ (fun () -> Unix.read fd buf pos len)
+ | `Accept res ->
+ wrap_syscall inputs fd res
+ (fun () ->
+ let (s, i) = Unix.accept fd.fd in
+ if not windows_hack then Unix.set_nonblock s;
+ (wrap_async s, i))
+ | `Wait res ->
+ wrap_syscall inputs fd res (fun () -> ())
+ with Not_found ->
+ ())
+ readers;
+ List.iter
+ (fun fd ->
+ try
+ match List.assoc fd !outputs with
+ `Write (buf, pos, len, res) ->
+ wrap_syscall outputs fd res
+ (fun () -> Unix.write fd buf pos len)
+ | `Wait res ->
+ wrap_syscall inputs fd res (fun () -> ())
+ with Not_found ->
+ ())
+ writers;
+ if !child_exited then begin
+ child_exited := false;
+ List.iter
+ (fun (id, (res, flags, pid)) ->
+ wrap_syscall wait_children id res
+ (fun () ->
+ let (pid', _) as v = Unix.waitpid flags pid in
+ if pid' = 0 then raise Exit;
+ v))
+ !wait_children
+ end;
+*)
+ run thread
+
+(****)
+
+let wait_read ch = assert false
+
+let wait_write ch = assert false
+
+external start_read :
+ file_descr -> buffer -> int -> int -> int -> unit = "win_read"
+external start_write :
+ file_descr -> buffer -> int -> int -> int -> unit = "win_write"
+
+let read ch s pos len =
+if !d then Format.eprintf "Start reading@.";
+ let id = acquire_id () in
+ let buf = acquire_buffer () in
+ let len = if len > buffer_size then buffer_size else len in
+ let res = Lwt.wait () in
+ IntTbl.add ioInFlight id (`Read (s, pos), buf, res);
+ start_read ch buf 0 len id;
+if !d then Format.eprintf "Reading started@.";
+ res
+
+let write ch s pos len =
+if !d then Format.eprintf "Start writing@.";
+ let id = acquire_id () in
+ let buf = acquire_buffer () in
+ let len = if len > buffer_size then buffer_size else len in
+ blit_string_to_buffer s pos buf 0 len;
+ let res = Lwt.wait () in
+ IntTbl.add ioInFlight id (`Write, buf, res);
+ start_write ch buf 0 len id;
+if !d then Format.eprintf "Writing started@.";
+ res
+
+external win_pipe_in :
+ unit -> Unix.file_descr * Unix.file_descr = "win_pipe_in"
+external win_pipe_out :
+ unit -> Unix.file_descr * Unix.file_descr = "win_pipe_out"
+
+let pipe_in () =
+ let (i, o) = if no_overlapped_io then Unix.pipe () else win_pipe_in () in
+ (wrap_async i, o)
+let pipe_out () =
+ let (i, o) = if no_overlapped_io then Unix.pipe () else win_pipe_out () in
+ (i, wrap_async o)
+
+external win_socket :
+ Unix.socket_domain -> Unix.socket_type -> int -> Unix.file_descr =
+ "win_socket"
+
+let socket d t p =
+ let s = if no_overlapped_io then Unix.socket d t p else win_socket d t p in
+ Unix.set_nonblock s;
+ wrap_async s
+
+let bind ch addr = Unix.bind ch.fd addr
+let setsockopt ch opt v = Unix.setsockopt ch.fd opt v
+let listen ch n = Unix.listen ch.fd n
+let set_close_on_exec ch = Unix.set_close_on_exec ch.fd
+
+external kill_threads : file_descr -> unit = "win_kill_threads"
+
+let close ch = Unix.close ch.fd; kill_threads ch
+
+let accept ch =
+ let res = Lwt.wait () in
+ let i = acquire_event "accept" in
+ IntTbl.add connInFlight i (`Accept res, ch);
+ win_register_wait ch.fd ACCEPT i;
+ res
+
+let check_socket ch =
+ let res = Lwt.wait () in
+ let i = acquire_event "connect" in
+ IntTbl.add connInFlight i (`CheckSocket res, ch);
+ win_register_wait ch.fd CONNECT i;
+ res
+
+let connect s addr =
+ try
+ Unix.connect s.fd addr;
+if !d then prerr_endline "AAA";
+ Lwt.return ()
+ with
+ Unix.Unix_error
+ ((Unix.EINPROGRESS | Unix.EWOULDBLOCK | Unix.EAGAIN), _, _) ->
+if !d then prerr_endline "BBB";
+ check_socket s
+ | e ->
+if !d then prerr_endline "CCC";
+ Lwt.fail e
+
+(*
+let ids = ref 0
+let new_id () = incr ids; !ids
+
+let _waitpid flags pid =
+ try
+ Lwt.return (Unix.waitpid flags pid)
+ with e ->
+ Lwt.fail e
+
+let waitpid flags pid =
+ if List.mem Unix.WNOHANG flags || windows_hack then
+ _waitpid flags pid
+ else
+ let flags = Unix.WNOHANG :: flags in
+ Lwt.bind (_waitpid flags pid) (fun ((pid', _) as res) ->
+ if pid' <> 0 then
+ Lwt.return res
+ else
+ let res = Lwt.wait () in
+ wait_children := (new_id (), (res, flags, pid)) :: !wait_children;
+ res)
+
+let wait () = waitpid [] (-1)
+
+let system cmd =
+ match Unix.fork () with
+ 0 -> Unix.execv "/bin/sh" [| "/bin/sh"; "-c"; cmd |]
+ | id -> Lwt.bind (waitpid [] id) (fun (pid, status) -> Lwt.return status)
+*)
+
+(****)
+(*
+type lwt_in_channel = in_channel
+type lwt_out_channel = out_channel
+
+let intern_in_channel ch =
+ Unix.set_nonblock (Unix.descr_of_in_channel ch); ch
+let intern_out_channel ch =
+ Unix.set_nonblock (Unix.descr_of_out_channel ch); ch
+
+
+let wait_inchan ic = wait_read (Unix.descr_of_in_channel ic)
+let wait_outchan oc = wait_write (Unix.descr_of_out_channel oc)
+
+let rec input_char ic =
+ try
+ Lwt.return (Pervasives.input_char ic)
+ with
+ Sys_blocked_io ->
+ Lwt.bind (wait_inchan ic) (fun () -> input_char ic)
+ | e ->
+ Lwt.fail e
+
+let rec input ic s ofs len =
+ try
+ Lwt.return (Pervasives.input ic s ofs len)
+ with
+ Sys_blocked_io ->
+ Lwt.bind (wait_inchan ic) (fun () -> input ic s ofs len)
+ | e ->
+ Lwt.fail e
+
+let rec unsafe_really_input ic s ofs len =
+ if len <= 0 then
+ Lwt.return ()
+ else begin
+ Lwt.bind (input ic s ofs len) (fun r ->
+ if r = 0
+ then Lwt.fail End_of_file
+ else unsafe_really_input ic s (ofs+r) (len-r))
+ end
+
+let really_input ic s ofs len =
+ if ofs < 0 || len < 0 || ofs > String.length s - len
+ then Lwt.fail (Invalid_argument "really_input")
+ else unsafe_really_input ic s ofs len
+
+let input_line ic =
+ let buf = ref (String.create 128) in
+ let pos = ref 0 in
+ let rec loop () =
+ if !pos = String.length !buf then begin
+ let newbuf = String.create (2 * !pos) in
+ String.blit !buf 0 newbuf 0 !pos;
+ buf := newbuf
+ end;
+ Lwt.bind (input_char ic) (fun c ->
+ if c = '\n' then
+ Lwt.return ()
+ else begin
+ !buf.[!pos] <- c;
+ incr pos;
+ loop ()
+ end)
+ in
+ Lwt.bind
+ (Lwt.catch loop
+ (fun e ->
+ match e with
+ End_of_file when !pos <> 0 ->
+ Lwt.return ()
+ | _ ->
+ Lwt.fail e))
+ (fun () ->
+ let res = String.create !pos in
+ String.blit !buf 0 res 0 !pos;
+ Lwt.return res)
+*)
+(****)
+
+(*
+type popen_process =
+ Process of in_channel * out_channel
+ | Process_in of in_channel
+ | Process_out of out_channel
+ | Process_full of in_channel * out_channel * in_channel
+
+let popen_processes = (Hashtbl.create 7 : (popen_process, int) Hashtbl.t)
+
+let open_proc cmd proc input output toclose =
+ match Unix.fork () with
+ 0 -> if input <> Unix.stdin then begin
+ Unix.dup2 input Unix.stdin;
+ Unix.close input
+ end;
+ if output <> Unix.stdout then begin
+ Unix.dup2 output Unix.stdout;
+ Unix.close output
+ end;
+ List.iter Unix.close toclose;
+ Unix.execv "/bin/sh" [| "/bin/sh"; "-c"; cmd |]
+ | id -> Hashtbl.add popen_processes proc id
+
+let open_process_in cmd =
+ let (in_read, in_write) = pipe_in () in
+ let inchan = Unix.in_channel_of_descr in_read in
+ open_proc cmd (Process_in inchan) Unix.stdin in_write [in_read];
+ Unix.close in_write;
+ Lwt.return inchan
+
+let open_process_out cmd =
+ let (out_read, out_write) = pipe_out () in
+ let outchan = Unix.out_channel_of_descr out_write in
+ open_proc cmd (Process_out outchan) out_read Unix.stdout [out_write];
+ Unix.close out_read;
+ Lwt.return outchan
+
+let open_process cmd =
+ let (in_read, in_write) = pipe_in () in
+ let (out_read, out_write) = pipe_out () in
+ let inchan = Unix.in_channel_of_descr in_read in
+ let outchan = Unix.out_channel_of_descr out_write in
+ open_proc cmd (Process(inchan, outchan)) out_read in_write
+ [in_read; out_write];
+ Unix.close out_read;
+ Unix.close in_write;
+ Lwt.return (inchan, outchan)
+
+(* FIX: Subprocesses that use /dev/tty to print things on the terminal
+ will NOT have this output captured and returned to the caller of this
+ function. There's an argument that this is correct, but if we are
+ running from a GUI the user may not be looking at any terminal and it
+ will appear that the process is just hanging. This can be fixed, in
+ principle, by writing a little C code that opens /dev/tty and then uses
+ the TIOCNOTTY ioctl control to detach the terminal. *)
+
+let open_proc_full cmd env proc input output error toclose =
+ match Unix.fork () with
+ 0 -> Unix.dup2 input Unix.stdin; Unix.close input;
+ Unix.dup2 output Unix.stdout; Unix.close output;
+ Unix.dup2 error Unix.stderr; Unix.close error;
+ List.iter Unix.close toclose;
+ Unix.execve "/bin/sh" [| "/bin/sh"; "-c"; cmd |] env
+ | id -> Hashtbl.add popen_processes proc id
+
+let open_process_full cmd env =
+ let (in_read, in_write) = pipe_in () in
+ let (out_read, out_write) = pipe_out () in
+ let (err_read, err_write) = pipe_in () in
+ let inchan = Unix.in_channel_of_descr in_read in
+ let outchan = Unix.out_channel_of_descr out_write in
+ let errchan = Unix.in_channel_of_descr err_read in
+ open_proc_full cmd env (Process_full(inchan, outchan, errchan))
+ out_read in_write err_write [in_write; out_read; err_read];
+ Unix.close out_read;
+ Unix.close in_write;
+ Unix.close err_write;
+ Lwt.return (inchan, outchan, errchan)
+
+let find_proc_id fun_name proc =
+ try
+ let pid = Hashtbl.find popen_processes proc in
+ Hashtbl.remove popen_processes proc;
+ pid
+ with Not_found ->
+ raise (Unix.Unix_error (Unix.EBADF, fun_name, ""))
+*)
+(*
+let close_process_in inchan =
+ let pid = find_proc_id "close_process_in" (Process_in inchan) in
+ close_in inchan;
+ Lwt.bind (waitpid [] pid) (fun (_, status) -> Lwt.return status)
+
+let close_process_out outchan =
+ let pid = find_proc_id "close_process_out" (Process_out outchan) in
+ close_out outchan;
+ Lwt.bind (waitpid [] pid) (fun (_, status) -> Lwt.return status)
+
+let close_process (inchan, outchan) =
+ let pid = find_proc_id "close_process" (Process(inchan, outchan)) in
+ close_in inchan; close_out outchan;
+ Lwt.bind (waitpid [] pid) (fun (_, status) -> Lwt.return status)
+
+let close_process_full (outchan, inchan, errchan) =
+ let pid =
+ find_proc_id "close_process_full"
+ (Process_full(outchan, inchan, errchan)) in
+ close_out inchan; close_in outchan; close_in errchan;
+ Lwt.bind (waitpid [] pid) (fun (_, status) -> Lwt.return status)
+*)
+
+type lwt_in_channel
+let input_line _ = assert false (*XXXXX*)
+let intern_in_channel _ = assert false (*XXXXX*)