diff options
-rw-r--r-- | doc/Makefile | 4 | ||||
-rw-r--r-- | src/.depend | 14 | ||||
-rw-r--r-- | src/Makefile.OCaml | 16 | ||||
-rw-r--r-- | src/RECENTNEWS | 7 | ||||
-rw-r--r-- | src/fingerprint.ml | 1 | ||||
-rw-r--r-- | src/lwt/generic/.gitignore | 3 | ||||
-rw-r--r-- | src/lwt/generic/lwt_unix_impl.ml | 508 | ||||
-rw-r--r-- | src/lwt/lwt_unix.ml | 509 | ||||
-rw-r--r-- | src/lwt/lwt_unix.mli | 28 | ||||
-rw-r--r-- | src/lwt/lwt_unix_stubs.c | 600 | ||||
-rw-r--r-- | src/lwt/win/.gitignore | 3 | ||||
-rw-r--r-- | src/lwt/win/lwt_unix_impl.ml | 645 | ||||
-rw-r--r-- | src/mkProjectInfo.ml | 1 | ||||
-rw-r--r-- | src/osx.ml | 1 | ||||
-rw-r--r-- | src/osxsupport.c | 22 | ||||
-rw-r--r-- | src/remote.ml | 42 | ||||
-rw-r--r-- | src/system/win/.gitignore | 3 |
17 files changed, 1822 insertions, 585 deletions
diff --git a/doc/Makefile b/doc/Makefile index 1f2fd3c..e857983 100644 --- a/doc/Makefile +++ b/doc/Makefile @@ -34,7 +34,7 @@ once: postproc$(EXEC_EXT) $(STRINGS) docs$(EXEC_EXT) @echo HEVEAPATH = $(HEVEAPATH) @(if [ ! -f prefs.tmp ]; then $(MAKE) prefs.tmp; fi) @(if [ ! -f prefsdocs.tmp ]; then $(MAKE) prefsdocs.tmp; fi) -ifdef HEVEA +ifeq ($(HEVEA),true) printf '$(TEXDIRECTIVES)\\textversiontrue\\draftfalse' \ > texdirectives.tex latex unison-manual.tex @@ -90,4 +90,4 @@ clean:: ../src/mkProjectInfo > $@ ../src/mkProjectInfo: ../src/mkProjectInfo.ml - ocamlc -o $@ $^ + ocamlc str.cma -o $@ $^ diff --git a/src/.depend b/src/.depend index 76aedab..9f0b177 100644 --- a/src/.depend +++ b/src/.depend @@ -7,7 +7,7 @@ common.cmi: uutil.cmi props.cmi path.cmi osx.cmi os.cmi name.cmi fspath.cmi \ fileinfo.cmi copy.cmi: uutil.cmi props.cmi path.cmi osx.cmi os.cmi lwt/lwt.cmi fspath.cmi \ fileinfo.cmi common.cmi -external.cmi: +external.cmi: lwt/lwt.cmi fileinfo.cmi: system.cmi props.cmi path.cmi osx.cmi fspath.cmi files.cmi: uutil.cmi system.cmi props.cmi path.cmi lwt/lwt_util.cmi \ lwt/lwt.cmi common.cmi @@ -154,9 +154,9 @@ pred.cmo: ubase/util.cmi ubase/safelist.cmi ubase/rx.cmi ubase/prefs.cmi \ pred.cmx: ubase/util.cmx ubase/safelist.cmx ubase/rx.cmx ubase/prefs.cmx \ case.cmx pred.cmi props.cmo: uutil.cmi ubase/util.cmi ubase/prefs.cmi path.cmi osx.cmi \ - fspath.cmi fs.cmi external.cmi props.cmi + lwt/lwt_unix.cmi fspath.cmi fs.cmi external.cmi props.cmi props.cmx: uutil.cmx ubase/util.cmx ubase/prefs.cmx path.cmx osx.cmx \ - fspath.cmx fs.cmx external.cmx props.cmi + lwt/lwt_unix.cmx fspath.cmx fs.cmx external.cmx props.cmi recon.cmo: ubase/util.cmi update.cmi tree.cmi ubase/trace.cmi sortri.cmi \ ubase/safelist.cmi props.cmi ubase/prefs.cmi pred.cmi path.cmi name.cmi \ globals.cmi fileinfo.cmi common.cmi recon.cmi @@ -295,8 +295,8 @@ xferhint.cmx: ubase/util.cmx ubase/trace.cmx ubase/prefs.cmx path.cmx os.cmx \ fspath.cmx xferhint.cmi lwt/lwt.cmo: lwt/lwt.cmi lwt/lwt.cmx: lwt/lwt.cmi -lwt/lwt_unix.cmo: lwt/pqueue.cmi lwt/lwt.cmi lwt/lwt_unix.cmi -lwt/lwt_unix.cmx: lwt/pqueue.cmx lwt/lwt.cmx lwt/lwt_unix.cmi +lwt/lwt_unix.cmo: lwt/lwt_unix.cmi +lwt/lwt_unix.cmx: lwt/lwt_unix.cmi lwt/lwt_util.cmo: lwt/lwt.cmi lwt/lwt_util.cmi lwt/lwt_util.cmx: lwt/lwt.cmx lwt/lwt_util.cmi lwt/pqueue.cmo: lwt/pqueue.cmi @@ -350,6 +350,10 @@ lwt/example/editor.cmo: lwt/lwt_unix.cmi lwt/example/editor.cmx: lwt/lwt_unix.cmx lwt/example/relay.cmo: lwt/lwt_unix.cmi lwt/lwt.cmi lwt/example/relay.cmx: lwt/lwt_unix.cmx lwt/lwt.cmx +lwt/generic/lwt_unix_impl.cmo: lwt/pqueue.cmi lwt/lwt.cmi +lwt/generic/lwt_unix_impl.cmx: lwt/pqueue.cmx lwt/lwt.cmx +lwt/win/lwt_unix_impl.cmo: lwt/pqueue.cmi lwt/lwt.cmi +lwt/win/lwt_unix_impl.cmx: lwt/pqueue.cmx lwt/lwt.cmx system/generic/system_impl.cmo: system/system_generic.cmo system/generic/system_impl.cmx: system/system_generic.cmx system/win/system_impl.cmo: system/system_win.cmo system/system_generic.cmo diff --git a/src/Makefile.OCaml b/src/Makefile.OCaml index 070efe9..8284ea6 100644 --- a/src/Makefile.OCaml +++ b/src/Makefile.OCaml @@ -5,6 +5,11 @@ #################################################################### ### Try to automatically guess OS +ifeq (${OSCOMP},cross) # Cross-compilation under Linux + OSARCH=win32gnuc + PATH := /usr/i586-mingw32msvc/bin:$(PATH) +endif + ifeq (${OSCOMP},cygwingnuc) # Define this if compiling with Cygwin GNU C OSARCH=win32gnuc ETAGS=/bin/etags @@ -86,7 +91,7 @@ buildexecutable:: INCLFLAGS=-I lwt -I ubase -I system CAMLFLAGS+=$(INCLFLAGS) -CAMLFLAGS+=-I system/$(SYSTEM) +CAMLFLAGS+=-I system/$(SYSTEM) -I lwt/$(SYSTEM) ifeq ($(OSARCH),win32) # Win32 system @@ -100,7 +105,7 @@ ifeq ($(OSARCH),win32) # issue." # CLIBS+=-cclib win32rc/unison.res # STATICLIBS+=-cclib win32rc/unison.res - COBJS+=system/system_win_stubs$(OBJ_EXT) + COBJS+=system/system_win_stubs$(OBJ_EXT) lwt/lwt_unix_stubs$(OBJ_EXT) WINOBJS=system/system_win.cmo SYSTEM=win CLIBS+=-cclib "-link win32rc/unison.res" @@ -113,7 +118,7 @@ else ifeq ($(OSARCH),win32gnuc) CWD=. EXEC_EXT=.exe - COBJS+=system/system_win_stubs$(OBJ_EXT) + COBJS+=system/system_win_stubs$(OBJ_EXT) lwt/lwt_unix_stubs$(OBJ_EXT) WINOBJS=system/system_win.cmo SYSTEM=win CLIBS+=-cclib win32rc/unison.res.lib @@ -206,7 +211,8 @@ OCAMLOBJS += \ ubase/uprintf.cmo ubase/util.cmo ubase/uarg.cmo \ ubase/prefs.cmo ubase/trace.cmo ubase/proplist.cmo \ \ - lwt/pqueue.cmo lwt/lwt.cmo lwt/lwt_util.cmo lwt/lwt_unix.cmo \ + lwt/pqueue.cmo lwt/lwt.cmo lwt/lwt_util.cmo \ + lwt/$(SYSTEM)/lwt_unix_impl.cmo lwt/lwt_unix.cmo \ \ case.cmo pred.cmo uutil.cmo \ fileutil.cmo name.cmo path.cmo fspath.cmo fs.cmo fingerprint.cmo \ @@ -303,6 +309,8 @@ include .depend # Additional dependencied depending on the system system.cmo fspath.cmo fs.cmo: system/$(SYSTEM)/system_impl.cmo system.cmx fspath.cmx fs.cmx: system/$(SYSTEM)/system_impl.cmx +lwt/lwt_unix.cmo: lwt/$(SYSTEM)/lwt_unix_impl.cmo +lwt/lwt_unix.cmx: lwt/$(SYSTEM)/lwt_unix_impl.cmx ifeq ($(OSARCH), OpenBSD) ifeq ($(shell echo type ocamldot | ksh), file) diff --git a/src/RECENTNEWS b/src/RECENTNEWS index 3a49029..f399d15 100644 --- a/src/RECENTNEWS +++ b/src/RECENTNEWS @@ -1,5 +1,12 @@ CHANGES FROM VERSION 2.39.6 +* Fixed bug which made Unison ignore finder information and resource + fork when compiled to 64bit on Mac OSX. +* Use asynchronous I/O under Windows + +------------------------------- +CHANGES FROM VERSION 2.39.6 + * Made a server waiting on a socket more resilient to unexpected lost connections from the client. * Fixed possible race condition in half-duplex communication mode. diff --git a/src/fingerprint.ml b/src/fingerprint.ml index cb9d2ca..e583a89 100644 --- a/src/fingerprint.ml +++ b/src/fingerprint.ml @@ -84,6 +84,7 @@ let hash d = if d == dummy then 1234577 else begin + assert (String.length d >= 3); Char.code (String.unsafe_get d 0) + (Char.code (String.unsafe_get d 1) lsl 8) + (Char.code (String.unsafe_get d 2) lsl 16) diff --git a/src/lwt/generic/.gitignore b/src/lwt/generic/.gitignore new file mode 100644 index 0000000..6c55acc --- /dev/null +++ b/src/lwt/generic/.gitignore @@ -0,0 +1,3 @@ +/*.cmx +/*.cmi +/*.cmo diff --git a/src/lwt/generic/lwt_unix_impl.ml b/src/lwt/generic/lwt_unix_impl.ml new file mode 100644 index 0000000..111def4 --- /dev/null +++ b/src/lwt/generic/lwt_unix_impl.ml @@ -0,0 +1,508 @@ +(* +Non-blocking I/O and select does not (fully) work under Windows. +The libray therefore does not use them under Windows, and will +therefore have the following limitations: +- No read will be performed while there are some threads ready to run + or waiting to write; +- When a read is pending, everything else will be blocked: [sleep] + will not terminate and other reads will not be performed before + this read terminates; +- A write on a socket or a pipe can block the execution of the program + if the data are never consumed at the other end of the connection. + In particular, if both ends use this library and write at the same + time, this could result in a dead-lock. +- [connect] is blocking +*) +let windows_hack = Sys.os_type <> "Unix" +let recent_ocaml = + Scanf.sscanf Sys.ocaml_version "%d.%d" + (fun maj min -> (maj = 3 && min >= 11) || maj > 3) + +module SleepQueue = + Pqueue.Make (struct + type t = float * int * unit Lwt.t + let compare (t, i, _) (t', i', _) = + let c = compare t t' in + if c = 0 then i - i' else c + end) +let sleep_queue = ref SleepQueue.empty + +let event_counter = ref 0 + +let sleep d = + let res = Lwt.wait () in + incr event_counter; + let t = if d <= 0. then 0. else Unix.gettimeofday () +. d in + sleep_queue := + SleepQueue.add (t, !event_counter, res) !sleep_queue; + res + +let yield () = sleep 0. + +let get_time t = + if !t = -1. then t := Unix.gettimeofday (); + !t + +let in_the_past now t = + t = 0. || t <= get_time now + +let rec restart_threads imax now = + match + try Some (SleepQueue.find_min !sleep_queue) with Not_found -> None + with + Some (time, i, thr) when in_the_past now time && i - imax <= 0 -> + sleep_queue := SleepQueue.remove_min !sleep_queue; + Lwt.wakeup thr (); + restart_threads imax now + | _ -> + () + +type file_descr = Unix.file_descr + +let of_unix_file_descr fd = if not windows_hack then Unix.set_nonblock fd; fd + +let inputs = ref [] +let outputs = ref [] +let wait_children = ref [] + +let child_exited = ref false +let _ = + if not windows_hack then + ignore(Sys.signal Sys.sigchld (Sys.Signal_handle (fun _ -> child_exited := true))) + +let bad_fd fd = + try ignore (Unix.LargeFile.fstat fd); false with + Unix.Unix_error (_, _, _) -> + true + +let wrap_syscall queue fd cont syscall = + let res = + try + Some (syscall ()) + with + Exit + | Unix.Unix_error ((Unix.EAGAIN | Unix.EWOULDBLOCK | Unix.EINTR), _, _) -> + (* EINTR because we are catching SIG_CHLD hence the system call + might be interrupted to handle the signal; this lets us restart + the system call eventually. *) + None + | e -> + queue := List.remove_assoc fd !queue; + Lwt.wakeup_exn cont e; + None + in + match res with + Some v -> + queue := List.remove_assoc fd !queue; + Lwt.wakeup cont v + | None -> + () + +let rec run thread = + match Lwt.poll thread with + Some v -> + v + | None -> + let next_event = + try + let (time, _, _) = SleepQueue.find_min !sleep_queue in Some time + with Not_found -> + None + in + let now = ref (-1.) in + let delay = + match next_event with + None -> -1. + | Some 0. -> 0. + | Some time -> max 0. (time -. get_time now) + in + let infds = List.map fst !inputs in + let outfds = List.map fst !outputs in + let (readers, writers, _) = + if windows_hack && not recent_ocaml then + let writers = outfds in + let readers = + if delay = 0. || writers <> [] then [] else infds in + (readers, writers, []) + else if infds = [] && outfds = [] && delay = 0. then + ([], [], []) + else + try + let res = Unix.select infds outfds [] delay in + if delay > 0. && !now <> -1. then now := !now +. delay; + res + with + Unix.Unix_error (Unix.EINTR, _, _) -> + ([], [], []) + | Unix.Unix_error (Unix.EBADF, _, _) -> + (List.filter bad_fd infds, List.filter bad_fd outfds, []) + | Unix.Unix_error (Unix.EPIPE, _, _) + when windows_hack && recent_ocaml -> + (* Workaround for a bug in Ocaml 3.11: select fails with an + EPIPE error when the file descriptor is remotely closed *) + (infds, [], []) + in + restart_threads !event_counter now; + List.iter + (fun fd -> + try + match List.assoc fd !inputs with + `Read (buf, pos, len, res) -> + wrap_syscall inputs fd res + (fun () -> Unix.read fd buf pos len) + | `Accept res -> + wrap_syscall inputs fd res + (fun () -> + let (s, _) as v = Unix.accept fd in + if not windows_hack then Unix.set_nonblock s; + v) + | `Wait res -> + wrap_syscall inputs fd res (fun () -> ()) + with Not_found -> + ()) + readers; + List.iter + (fun fd -> + try + match List.assoc fd !outputs with + `Write (buf, pos, len, res) -> + wrap_syscall outputs fd res + (fun () -> Unix.write fd buf pos len) + | `CheckSocket res -> + wrap_syscall outputs fd res + (fun () -> + try ignore (Unix.getpeername fd) with + Unix.Unix_error (Unix.ENOTCONN, _, _) -> + ignore (Unix.read fd " " 0 1)) + | `Wait res -> + wrap_syscall inputs fd res (fun () -> ()) + with Not_found -> + ()) + writers; + if !child_exited then begin + child_exited := false; + List.iter + (fun (id, (res, flags, pid)) -> + wrap_syscall wait_children id res + (fun () -> + let (pid', _) as v = Unix.waitpid flags pid in + if pid' = 0 then raise Exit; + v)) + !wait_children + end; + run thread + +(****) + +let wait_read ch = + let res = Lwt.wait () in + inputs := (ch, `Wait res) :: !inputs; + res + +let wait_write ch = + let res = Lwt.wait () in + outputs := (ch, `Wait res) :: !outputs; + res + +let read ch buf pos len = + try + if windows_hack then raise (Unix.Unix_error (Unix.EAGAIN, "", "")); + Lwt.return (Unix.read ch buf pos len) + with + Unix.Unix_error ((Unix.EAGAIN | Unix.EWOULDBLOCK), _, _) -> + let res = Lwt.wait () in + inputs := (ch, `Read (buf, pos, len, res)) :: !inputs; + res + | e -> + Lwt.fail e + +let write ch buf pos len = + try + if windows_hack && recent_ocaml then + raise (Unix.Unix_error (Unix.EAGAIN, "", "")); + Lwt.return (Unix.write ch buf pos len) + with + Unix.Unix_error ((Unix.EAGAIN | Unix.EWOULDBLOCK), _, _) -> + let res = Lwt.wait () in + outputs := (ch, `Write (buf, pos, len, res)) :: !outputs; + res + | e -> + Lwt.fail e + +(* +let pipe () = + let (in_fd, out_fd) as fd_pair = Unix.pipe() in + if not windows_hack then begin + Unix.set_nonblock in_fd; + Unix.set_nonblock out_fd + end; + fd_pair +*) + +let pipe_in () = + let (in_fd, out_fd) as fd_pair = Unix.pipe() in + if not windows_hack then + Unix.set_nonblock in_fd; + fd_pair + +let pipe_out () = + let (in_fd, out_fd) as fd_pair = Unix.pipe() in + if not windows_hack then + Unix.set_nonblock out_fd; + fd_pair + +let socket dom typ proto = + let s = Unix.socket dom typ proto in + if not windows_hack then Unix.set_nonblock s; + s + +let socketpair dom typ proto = + let (s1, s2) as spair = Unix.socketpair dom typ proto in + if not windows_hack then begin + Unix.set_nonblock s1; Unix.set_nonblock s2 + end; + Lwt.return spair + +let bind = Unix.bind +let setsockopt = Unix.setsockopt +let listen = Unix.listen +let close = Unix.close +let set_close_on_exec = Unix.set_close_on_exec + +let accept ch = + let res = Lwt.wait () in + inputs := (ch, `Accept res) :: !inputs; + res + +let check_socket ch = + let res = Lwt.wait () in + outputs := (ch, `CheckSocket res) :: !outputs; + res + +let connect s addr = + try + Unix.connect s addr; + Lwt.return () + with + Unix.Unix_error + ((Unix.EINPROGRESS | Unix.EWOULDBLOCK | Unix.EAGAIN), _, _) -> + check_socket s + | e -> + Lwt.fail e + +let ids = ref 0 +let new_id () = incr ids; !ids + +let _waitpid flags pid = + try + Lwt.return (Unix.waitpid flags pid) + with e -> + Lwt.fail e + +let waitpid flags pid = + if List.mem Unix.WNOHANG flags || windows_hack then + _waitpid flags pid + else + let flags = Unix.WNOHANG :: flags in + Lwt.bind (_waitpid flags pid) (fun ((pid', _) as res) -> + if pid' <> 0 then + Lwt.return res + else + let res = Lwt.wait () in + wait_children := (new_id (), (res, flags, pid)) :: !wait_children; + res) + +let wait () = waitpid [] (-1) + +let system cmd = + match Unix.fork () with + 0 -> Unix.execv "/bin/sh" [| "/bin/sh"; "-c"; cmd |] + | id -> Lwt.bind (waitpid [] id) (fun (pid, status) -> Lwt.return status) + +(****) + +type lwt_in_channel = in_channel +type lwt_out_channel = out_channel + +let intern_in_channel ch = + Unix.set_nonblock (Unix.descr_of_in_channel ch); ch +let intern_out_channel ch = + Unix.set_nonblock (Unix.descr_of_out_channel ch); ch + + +let wait_inchan ic = wait_read (Unix.descr_of_in_channel ic) +let wait_outchan oc = wait_write (Unix.descr_of_out_channel oc) + +let rec input_char ic = + try + Lwt.return (Pervasives.input_char ic) + with + Sys_blocked_io -> + Lwt.bind (wait_inchan ic) (fun () -> input_char ic) + | e -> + Lwt.fail e + +let rec input ic s ofs len = + try + Lwt.return (Pervasives.input ic s ofs len) + with + Sys_blocked_io -> + Lwt.bind (wait_inchan ic) (fun () -> input ic s ofs len) + | e -> + Lwt.fail e + +let rec unsafe_really_input ic s ofs len = + if len <= 0 then + Lwt.return () + else begin + Lwt.bind (input ic s ofs len) (fun r -> + if r = 0 + then Lwt.fail End_of_file + else unsafe_really_input ic s (ofs+r) (len-r)) + end + +let really_input ic s ofs len = + if ofs < 0 || len < 0 || ofs > String.length s - len + then Lwt.fail (Invalid_argument "really_input") + else unsafe_really_input ic s ofs len + +let input_line ic = + let buf = ref (String.create 128) in + let pos = ref 0 in + let rec loop () = + if !pos = String.length !buf then begin + let newbuf = String.create (2 * !pos) in + String.blit !buf 0 newbuf 0 !pos; + buf := newbuf + end; + Lwt.bind (input_char ic) (fun c -> + if c = '\n' then + Lwt.return () + else begin + !buf.[!pos] <- c; + incr pos; + loop () + end) + in + Lwt.bind + (Lwt.catch loop + (fun e -> + match e with + End_of_file when !pos <> 0 -> + Lwt.return () + | _ -> + Lwt.fail e)) + (fun () -> + let res = String.create !pos in + String.blit !buf 0 res 0 !pos; + Lwt.return res) + +(****) + +type popen_process = + Process of in_channel * out_channel + | Process_in of in_channel + | Process_out of out_channel + | Process_full of in_channel * out_channel * in_channel + +let popen_processes = (Hashtbl.create 7 : (popen_process, int) Hashtbl.t) + +let open_proc cmd proc input output toclose = + match Unix.fork () with + 0 -> if input <> Unix.stdin then begin + Unix.dup2 input Unix.stdin; + Unix.close input + end; + if output <> Unix.stdout then begin + Unix.dup2 output Unix.stdout; + Unix.close output + end; + List.iter Unix.close toclose; + Unix.execv "/bin/sh" [| "/bin/sh"; "-c"; cmd |] + | id -> Hashtbl.add popen_processes proc id + +let open_process_in cmd = + let (in_read, in_write) = pipe_in () in + let inchan = Unix.in_channel_of_descr in_read in + open_proc cmd (Process_in inchan) Unix.stdin in_write [in_read]; + Unix.close in_write; + Lwt.return inchan + +let open_process_out cmd = + let (out_read, out_write) = pipe_out () in + let outchan = Unix.out_channel_of_descr out_write in + open_proc cmd (Process_out outchan) out_read Unix.stdout [out_write]; + Unix.close out_read; + Lwt.return outchan + +let open_process cmd = + let (in_read, in_write) = pipe_in () in + let (out_read, out_write) = pipe_out () in + let inchan = Unix.in_channel_of_descr in_read in + let outchan = Unix.out_channel_of_descr out_write in + open_proc cmd (Process(inchan, outchan)) out_read in_write + [in_read; out_write]; + Unix.close out_read; + Unix.close in_write; + Lwt.return (inchan, outchan) + +(* FIX: Subprocesses that use /dev/tty to print things on the terminal + will NOT have this output captured and returned to the caller of this + function. There's an argument that this is correct, but if we are + running from a GUI the user may not be looking at any terminal and it + will appear that the process is just hanging. This can be fixed, in + principle, by writing a little C code that opens /dev/tty and then uses + the TIOCNOTTY ioctl control to detach the terminal. *) + +let open_proc_full cmd env proc input output error toclose = + match Unix.fork () with + 0 -> Unix.dup2 input Unix.stdin; Unix.close input; + Unix.dup2 output Unix.stdout; Unix.close output; + Unix.dup2 error Unix.stderr; Unix.close error; + List.iter Unix.close toclose; + Unix.execve "/bin/sh" [| "/bin/sh"; "-c"; cmd |] env + | id -> Hashtbl.add popen_processes proc id + +let open_process_full cmd env = + let (in_read, in_write) = pipe_in () in + let (out_read, out_write) = pipe_out () in + let (err_read, err_write) = pipe_in () in + let inchan = Unix.in_channel_of_descr in_read in + let outchan = Unix.out_channel_of_descr out_write in + let errchan = Unix.in_channel_of_descr err_read in + open_proc_full cmd env (Process_full(inchan, outchan, errchan)) + out_read in_write err_write [in_write; out_read; err_read]; + Unix.close out_read; + Unix.close in_write; + Unix.close err_write; + Lwt.return (inchan, outchan, errchan) + +let find_proc_id fun_name proc = + try + let pid = Hashtbl.find popen_processes proc in + Hashtbl.remove popen_processes proc; + pid + with Not_found -> + raise (Unix.Unix_error (Unix.EBADF, fun_name, "")) + +let close_process_in inchan = + let pid = find_proc_id "close_process_in" (Process_in inchan) in + close_in inchan; + Lwt.bind (waitpid [] pid) (fun (_, status) -> Lwt.return status) + +let close_process_out outchan = + let pid = find_proc_id "close_process_out" (Process_out outchan) in + close_out outchan; + Lwt.bind (waitpid [] pid) (fun (_, status) -> Lwt.return status) + +let close_process (inchan, outchan) = + let pid = find_proc_id "close_process" (Process(inchan, outchan)) in + close_in inchan; close_out outchan; + Lwt.bind (waitpid [] pid) (fun (_, status) -> Lwt.return status) + +let close_process_full (outchan, inchan, errchan) = + let pid = + find_proc_id "close_process_full" + (Process_full(outchan, inchan, errchan)) in + close_out inchan; close_in outchan; close_in errchan; + Lwt.bind (waitpid [] pid) (fun (_, status) -> Lwt.return status) diff --git a/src/lwt/lwt_unix.ml b/src/lwt/lwt_unix.ml index 111def4..356d25b 100644 --- a/src/lwt/lwt_unix.ml +++ b/src/lwt/lwt_unix.ml @@ -1,508 +1 @@ -(* -Non-blocking I/O and select does not (fully) work under Windows. -The libray therefore does not use them under Windows, and will -therefore have the following limitations: -- No read will be performed while there are some threads ready to run - or waiting to write; -- When a read is pending, everything else will be blocked: [sleep] - will not terminate and other reads will not be performed before - this read terminates; -- A write on a socket or a pipe can block the execution of the program - if the data are never consumed at the other end of the connection. - In particular, if both ends use this library and write at the same - time, this could result in a dead-lock. -- [connect] is blocking -*) -let windows_hack = Sys.os_type <> "Unix" -let recent_ocaml = - Scanf.sscanf Sys.ocaml_version "%d.%d" - (fun maj min -> (maj = 3 && min >= 11) || maj > 3) - -module SleepQueue = - Pqueue.Make (struct - type t = float * int * unit Lwt.t - let compare (t, i, _) (t', i', _) = - let c = compare t t' in - if c = 0 then i - i' else c - end) -let sleep_queue = ref SleepQueue.empty - -let event_counter = ref 0 - -let sleep d = - let res = Lwt.wait () in - incr event_counter; - let t = if d <= 0. then 0. else Unix.gettimeofday () +. d in - sleep_queue := - SleepQueue.add (t, !event_counter, res) !sleep_queue; - res - -let yield () = sleep 0. - -let get_time t = - if !t = -1. then t := Unix.gettimeofday (); - !t - -let in_the_past now t = - t = 0. || t <= get_time now - -let rec restart_threads imax now = - match - try Some (SleepQueue.find_min !sleep_queue) with Not_found -> None - with - Some (time, i, thr) when in_the_past now time && i - imax <= 0 -> - sleep_queue := SleepQueue.remove_min !sleep_queue; - Lwt.wakeup thr (); - restart_threads imax now - | _ -> - () - -type file_descr = Unix.file_descr - -let of_unix_file_descr fd = if not windows_hack then Unix.set_nonblock fd; fd - -let inputs = ref [] -let outputs = ref [] -let wait_children = ref [] - -let child_exited = ref false -let _ = - if not windows_hack then - ignore(Sys.signal Sys.sigchld (Sys.Signal_handle (fun _ -> child_exited := true))) - -let bad_fd fd = - try ignore (Unix.LargeFile.fstat fd); false with - Unix.Unix_error (_, _, _) -> - true - -let wrap_syscall queue fd cont syscall = - let res = - try - Some (syscall ()) - with - Exit - | Unix.Unix_error ((Unix.EAGAIN | Unix.EWOULDBLOCK | Unix.EINTR), _, _) -> - (* EINTR because we are catching SIG_CHLD hence the system call - might be interrupted to handle the signal; this lets us restart - the system call eventually. *) - None - | e -> - queue := List.remove_assoc fd !queue; - Lwt.wakeup_exn cont e; - None - in - match res with - Some v -> - queue := List.remove_assoc fd !queue; - Lwt.wakeup cont v - | None -> - () - -let rec run thread = - match Lwt.poll thread with - Some v -> - v - | None -> - let next_event = - try - let (time, _, _) = SleepQueue.find_min !sleep_queue in Some time - with Not_found -> - None - in - let now = ref (-1.) in - let delay = - match next_event with - None -> -1. - | Some 0. -> 0. - | Some time -> max 0. (time -. get_time now) - in - let infds = List.map fst !inputs in - let outfds = List.map fst !outputs in - let (readers, writers, _) = - if windows_hack && not recent_ocaml then - let writers = outfds in - let readers = - if delay = 0. || writers <> [] then [] else infds in - (readers, writers, []) - else if infds = [] && outfds = [] && delay = 0. then - ([], [], []) - else - try - let res = Unix.select infds outfds [] delay in - if delay > 0. && !now <> -1. then now := !now +. delay; - res - with - Unix.Unix_error (Unix.EINTR, _, _) -> - ([], [], []) - | Unix.Unix_error (Unix.EBADF, _, _) -> - (List.filter bad_fd infds, List.filter bad_fd outfds, []) - | Unix.Unix_error (Unix.EPIPE, _, _) - when windows_hack && recent_ocaml -> - (* Workaround for a bug in Ocaml 3.11: select fails with an - EPIPE error when the file descriptor is remotely closed *) - (infds, [], []) - in - restart_threads !event_counter now; - List.iter - (fun fd -> - try - match List.assoc fd !inputs with - `Read (buf, pos, len, res) -> - wrap_syscall inputs fd res - (fun () -> Unix.read fd buf pos len) - | `Accept res -> - wrap_syscall inputs fd res - (fun () -> - let (s, _) as v = Unix.accept fd in - if not windows_hack then Unix.set_nonblock s; - v) - | `Wait res -> - wrap_syscall inputs fd res (fun () -> ()) - with Not_found -> - ()) - readers; - List.iter - (fun fd -> - try - match List.assoc fd !outputs with - `Write (buf, pos, len, res) -> - wrap_syscall outputs fd res - (fun () -> Unix.write fd buf pos len) - | `CheckSocket res -> - wrap_syscall outputs fd res - (fun () -> - try ignore (Unix.getpeername fd) with - Unix.Unix_error (Unix.ENOTCONN, _, _) -> - ignore (Unix.read fd " " 0 1)) - | `Wait res -> - wrap_syscall inputs fd res (fun () -> ()) - with Not_found -> - ()) - writers; - if !child_exited then begin - child_exited := false; - List.iter - (fun (id, (res, flags, pid)) -> - wrap_syscall wait_children id res - (fun () -> - let (pid', _) as v = Unix.waitpid flags pid in - if pid' = 0 then raise Exit; - v)) - !wait_children - end; - run thread - -(****) - -let wait_read ch = - let res = Lwt.wait () in - inputs := (ch, `Wait res) :: !inputs; - res - -let wait_write ch = - let res = Lwt.wait () in - outputs := (ch, `Wait res) :: !outputs; - res - -let read ch buf pos len = - try - if windows_hack then raise (Unix.Unix_error (Unix.EAGAIN, "", "")); - Lwt.return (Unix.read ch buf pos len) - with - Unix.Unix_error ((Unix.EAGAIN | Unix.EWOULDBLOCK), _, _) -> - let res = Lwt.wait () in - inputs := (ch, `Read (buf, pos, len, res)) :: !inputs; - res - | e -> - Lwt.fail e - -let write ch buf pos len = - try - if windows_hack && recent_ocaml then - raise (Unix.Unix_error (Unix.EAGAIN, "", "")); - Lwt.return (Unix.write ch buf pos len) - with - Unix.Unix_error ((Unix.EAGAIN | Unix.EWOULDBLOCK), _, _) -> - let res = Lwt.wait () in - outputs := (ch, `Write (buf, pos, len, res)) :: !outputs; - res - | e -> - Lwt.fail e - -(* -let pipe () = - let (in_fd, out_fd) as fd_pair = Unix.pipe() in - if not windows_hack then begin - Unix.set_nonblock in_fd; - Unix.set_nonblock out_fd - end; - fd_pair -*) - -let pipe_in () = - let (in_fd, out_fd) as fd_pair = Unix.pipe() in - if not windows_hack then - Unix.set_nonblock in_fd; - fd_pair - -let pipe_out () = - let (in_fd, out_fd) as fd_pair = Unix.pipe() in - if not windows_hack then - Unix.set_nonblock out_fd; - fd_pair - -let socket dom typ proto = - let s = Unix.socket dom typ proto in - if not windows_hack then Unix.set_nonblock s; - s - -let socketpair dom typ proto = - let (s1, s2) as spair = Unix.socketpair dom typ proto in - if not windows_hack then begin - Unix.set_nonblock s1; Unix.set_nonblock s2 - end; - Lwt.return spair - -let bind = Unix.bind -let setsockopt = Unix.setsockopt -let listen = Unix.listen -let close = Unix.close -let set_close_on_exec = Unix.set_close_on_exec - -let accept ch = - let res = Lwt.wait () in - inputs := (ch, `Accept res) :: !inputs; - res - -let check_socket ch = - let res = Lwt.wait () in - outputs := (ch, `CheckSocket res) :: !outputs; - res - -let connect s addr = - try - Unix.connect s addr; - Lwt.return () - with - Unix.Unix_error - ((Unix.EINPROGRESS | Unix.EWOULDBLOCK | Unix.EAGAIN), _, _) -> - check_socket s - | e -> - Lwt.fail e - -let ids = ref 0 -let new_id () = incr ids; !ids - -let _waitpid flags pid = - try - Lwt.return (Unix.waitpid flags pid) - with e -> - Lwt.fail e - -let waitpid flags pid = - if List.mem Unix.WNOHANG flags || windows_hack then - _waitpid flags pid - else - let flags = Unix.WNOHANG :: flags in - Lwt.bind (_waitpid flags pid) (fun ((pid', _) as res) -> - if pid' <> 0 then - Lwt.return res - else - let res = Lwt.wait () in - wait_children := (new_id (), (res, flags, pid)) :: !wait_children; - res) - -let wait () = waitpid [] (-1) - -let system cmd = - match Unix.fork () with - 0 -> Unix.execv "/bin/sh" [| "/bin/sh"; "-c"; cmd |] - | id -> Lwt.bind (waitpid [] id) (fun (pid, status) -> Lwt.return status) - -(****) - -type lwt_in_channel = in_channel -type lwt_out_channel = out_channel - -let intern_in_channel ch = - Unix.set_nonblock (Unix.descr_of_in_channel ch); ch -let intern_out_channel ch = - Unix.set_nonblock (Unix.descr_of_out_channel ch); ch - - -let wait_inchan ic = wait_read (Unix.descr_of_in_channel ic) -let wait_outchan oc = wait_write (Unix.descr_of_out_channel oc) - -let rec input_char ic = - try - Lwt.return (Pervasives.input_char ic) - with - Sys_blocked_io -> - Lwt.bind (wait_inchan ic) (fun () -> input_char ic) - | e -> - Lwt.fail e - -let rec input ic s ofs len = - try - Lwt.return (Pervasives.input ic s ofs len) - with - Sys_blocked_io -> - Lwt.bind (wait_inchan ic) (fun () -> input ic s ofs len) - | e -> - Lwt.fail e - -let rec unsafe_really_input ic s ofs len = - if len <= 0 then - Lwt.return () - else begin - Lwt.bind (input ic s ofs len) (fun r -> - if r = 0 - then Lwt.fail End_of_file - else unsafe_really_input ic s (ofs+r) (len-r)) - end - -let really_input ic s ofs len = - if ofs < 0 || len < 0 || ofs > String.length s - len - then Lwt.fail (Invalid_argument "really_input") - else unsafe_really_input ic s ofs len - -let input_line ic = - let buf = ref (String.create 128) in - let pos = ref 0 in - let rec loop () = - if !pos = String.length !buf then begin - let newbuf = String.create (2 * !pos) in - String.blit !buf 0 newbuf 0 !pos; - buf := newbuf - end; - Lwt.bind (input_char ic) (fun c -> - if c = '\n' then - Lwt.return () - else begin - !buf.[!pos] <- c; - incr pos; - loop () - end) - in - Lwt.bind - (Lwt.catch loop - (fun e -> - match e with - End_of_file when !pos <> 0 -> - Lwt.return () - | _ -> - Lwt.fail e)) - (fun () -> - let res = String.create !pos in - String.blit !buf 0 res 0 !pos; - Lwt.return res) - -(****) - -type popen_process = - Process of in_channel * out_channel - | Process_in of in_channel - | Process_out of out_channel - | Process_full of in_channel * out_channel * in_channel - -let popen_processes = (Hashtbl.create 7 : (popen_process, int) Hashtbl.t) - -let open_proc cmd proc input output toclose = - match Unix.fork () with - 0 -> if input <> Unix.stdin then begin - Unix.dup2 input Unix.stdin; - Unix.close input - end; - if output <> Unix.stdout then begin - Unix.dup2 output Unix.stdout; - Unix.close output - end; - List.iter Unix.close toclose; - Unix.execv "/bin/sh" [| "/bin/sh"; "-c"; cmd |] - | id -> Hashtbl.add popen_processes proc id - -let open_process_in cmd = - let (in_read, in_write) = pipe_in () in - let inchan = Unix.in_channel_of_descr in_read in - open_proc cmd (Process_in inchan) Unix.stdin in_write [in_read]; - Unix.close in_write; - Lwt.return inchan - -let open_process_out cmd = - let (out_read, out_write) = pipe_out () in - let outchan = Unix.out_channel_of_descr out_write in - open_proc cmd (Process_out outchan) out_read Unix.stdout [out_write]; - Unix.close out_read; - Lwt.return outchan - -let open_process cmd = - let (in_read, in_write) = pipe_in () in - let (out_read, out_write) = pipe_out () in - let inchan = Unix.in_channel_of_descr in_read in - let outchan = Unix.out_channel_of_descr out_write in - open_proc cmd (Process(inchan, outchan)) out_read in_write - [in_read; out_write]; - Unix.close out_read; - Unix.close in_write; - Lwt.return (inchan, outchan) - -(* FIX: Subprocesses that use /dev/tty to print things on the terminal - will NOT have this output captured and returned to the caller of this - function. There's an argument that this is correct, but if we are - running from a GUI the user may not be looking at any terminal and it - will appear that the process is just hanging. This can be fixed, in - principle, by writing a little C code that opens /dev/tty and then uses - the TIOCNOTTY ioctl control to detach the terminal. *) - -let open_proc_full cmd env proc input output error toclose = - match Unix.fork () with - 0 -> Unix.dup2 input Unix.stdin; Unix.close input; - Unix.dup2 output Unix.stdout; Unix.close output; - Unix.dup2 error Unix.stderr; Unix.close error; - List.iter Unix.close toclose; - Unix.execve "/bin/sh" [| "/bin/sh"; "-c"; cmd |] env - | id -> Hashtbl.add popen_processes proc id - -let open_process_full cmd env = - let (in_read, in_write) = pipe_in () in - let (out_read, out_write) = pipe_out () in - let (err_read, err_write) = pipe_in () in - let inchan = Unix.in_channel_of_descr in_read in - let outchan = Unix.out_channel_of_descr out_write in - let errchan = Unix.in_channel_of_descr err_read in - open_proc_full cmd env (Process_full(inchan, outchan, errchan)) - out_read in_write err_write [in_write; out_read; err_read]; - Unix.close out_read; - Unix.close in_write; - Unix.close err_write; - Lwt.return (inchan, outchan, errchan) - -let find_proc_id fun_name proc = - try - let pid = Hashtbl.find popen_processes proc in - Hashtbl.remove popen_processes proc; - pid - with Not_found -> - raise (Unix.Unix_error (Unix.EBADF, fun_name, "")) - -let close_process_in inchan = - let pid = find_proc_id "close_process_in" (Process_in inchan) in - close_in inchan; - Lwt.bind (waitpid [] pid) (fun (_, status) -> Lwt.return status) - -let close_process_out outchan = - let pid = find_proc_id "close_process_out" (Process_out outchan) in - close_out outchan; - Lwt.bind (waitpid [] pid) (fun (_, status) -> Lwt.return status) - -let close_process (inchan, outchan) = - let pid = find_proc_id "close_process" (Process(inchan, outchan)) in - close_in inchan; close_out outchan; - Lwt.bind (waitpid [] pid) (fun (_, status) -> Lwt.return status) - -let close_process_full (outchan, inchan, errchan) = - let pid = - find_proc_id "close_process_full" - (Process_full(outchan, inchan, errchan)) in - close_out inchan; close_in outchan; close_in errchan; - Lwt.bind (waitpid [] pid) (fun (_, status) -> Lwt.return status) +include Lwt_unix_impl diff --git a/src/lwt/lwt_unix.mli b/src/lwt/lwt_unix.mli index f6cf39d..4a95c29 100644 --- a/src/lwt/lwt_unix.mli +++ b/src/lwt/lwt_unix.mli @@ -42,9 +42,6 @@ val pipe_in : unit -> file_descr * Unix.file_descr val pipe_out : unit -> Unix.file_descr * file_descr val socket : Unix.socket_domain -> Unix.socket_type -> int -> file_descr -val socketpair : - Unix.socket_domain -> Unix.socket_type -> int -> - (file_descr * file_descr) Lwt.t val bind : file_descr -> Unix.sockaddr -> unit val setsockopt : file_descr -> Unix.socket_bool_option -> bool -> unit val accept : file_descr -> (file_descr * Unix.sockaddr) Lwt.t @@ -53,32 +50,7 @@ val listen : file_descr -> int -> unit val close : file_descr -> unit val set_close_on_exec : file_descr -> unit -val wait : unit -> (int * Unix.process_status) Lwt.t -val waitpid : Unix.wait_flag list -> int -> (int * Unix.process_status) Lwt.t - -val system : string -> Unix.process_status Lwt.t - type lwt_in_channel -type lwt_out_channel val intern_in_channel : in_channel -> lwt_in_channel -val intern_out_channel : out_channel -> lwt_out_channel - -val input_char : lwt_in_channel -> char Lwt.t val input_line : lwt_in_channel -> string Lwt.t -val input : lwt_in_channel -> string -> int -> int -> int Lwt.t -val really_input : lwt_in_channel -> string -> int -> int -> unit Lwt.t - -val open_process_in: string -> lwt_in_channel Lwt.t -val open_process_out: string -> lwt_out_channel Lwt.t -val open_process: string -> (lwt_in_channel * lwt_out_channel) Lwt.t -val open_process_full: - string -> string array -> - (lwt_in_channel * lwt_out_channel * lwt_in_channel) Lwt.t -val close_process_in: lwt_in_channel -> Unix.process_status Lwt.t -val close_process_out: lwt_out_channel -> Unix.process_status Lwt.t -val close_process: - lwt_in_channel * lwt_out_channel -> Unix.process_status Lwt.t -val close_process_full: - lwt_in_channel * lwt_out_channel * lwt_in_channel -> - Unix.process_status Lwt.t diff --git a/src/lwt/lwt_unix_stubs.c b/src/lwt/lwt_unix_stubs.c new file mode 100644 index 0000000..91d4908 --- /dev/null +++ b/src/lwt/lwt_unix_stubs.c @@ -0,0 +1,600 @@ +#include <wtypes.h>
+#include <winbase.h>
+#include <mswsock.h>
+#include <winsock2.h>
+#include <errno.h>
+#include <stdio.h>
+
+#include <caml/mlvalues.h>
+#include <caml/alloc.h>
+#include <caml/memory.h>
+#include <caml/fail.h>
+#include <caml/bigarray.h>
+#include <caml/callback.h>
+
+//#define D(x) x
+#define D(x) while(0){}
+
+#define UNIX_BUFFER_SIZE 16384
+#define Nothing ((value) 0)
+
+typedef struct
+{
+ OVERLAPPED overlapped;
+ long id;
+ long action;
+} completionData;
+
+struct filedescr {
+ union {
+ HANDLE handle;
+ SOCKET socket;
+ } fd;
+ enum { KIND_HANDLE, KIND_SOCKET } kind;
+ int crt_fd;
+};
+#define Handle_val(v) (((struct filedescr *) Data_custom_val(v))->fd.handle)
+#define Socket_val(v) (((struct filedescr *) Data_custom_val(v))->fd.socket)
+
+extern void win32_maperr (DWORD errcode);
+extern void uerror (char * cmdname, value arg);
+extern value unix_error_of_code (int errcode);
+extern value win_alloc_handle (HANDLE h);
+extern value win_alloc_socket(SOCKET);
+extern void get_sockaddr (value mladdr,
+ struct sockaddr * addr /*out*/,
+ int * addr_len /*out*/);
+
+#define Array_data(a, i) (((char *) a->data) + Long_val(i))
+
+CAMLprim value ml_blit_string_to_buffer
+(value s, value i, value a, value j, value l)
+{
+ char *src = String_val(s) + Int_val(i);
+ char *dest = Array_data(Bigarray_val(a), j);
+ memcpy(dest, src, Long_val(l));
+ return Val_unit;
+}
+
+CAMLprim value ml_blit_buffer_to_string
+(value a, value i, value s, value j, value l)
+{
+ char *src = Array_data(Bigarray_val(a), i);
+ char *dest = String_val(s) + Long_val(j);
+ memcpy(dest, src, Long_val(l));
+ return Val_unit;
+}
+
+/****/
+
+#define READ 0
+#define WRITE 1
+#define READ_OVERLAPPED 2
+#define WRITE_OVERLAPPED 3
+
+static char * action_name[4] = {
+ "read", "write", "read(overlapped)", "write(overlapped)"
+};
+
+static value completionCallback;
+
+static void invoke_completion_callback
+(long id, long len, long errCode, long action) {
+ CAMLlocal2 (err, name);
+ value args[4];
+ err = Val_long(0);
+ if (errCode != NO_ERROR) {
+ len = -1;
+ win32_maperr (errCode);
+ err = unix_error_of_code(errno);
+ }
+ name = copy_string (action_name[action]);
+ D(printf("Action %s completed: id %ld -> len %ld / err %d (errCode %ld)\n",
+ action_name[action], id, len, errno, errCode));
+ args[0] = Val_long(id);
+ args[1] = Val_long(len);
+ args[2] = err;
+ args[3] = name;
+ caml_callbackN(completionCallback, 4, args);
+ D(printf("Callback performed\n"));
+}
+
+typedef struct {
+ long id;
+ long len;
+ long errCode;
+ long action; } completionInfo;
+
+int compN = 0;
+int complQueueSize = 0;
+completionInfo * complQueue = NULL;
+
+static void completion (long id, long len, long errCode, long action) {
+ D(printf("Queueing action %s: id %ld -> len %ld / err %d (errCode %ld)\n",
+ action_name[action], id, len, errno, errCode));
+ if (compN + 1 > complQueueSize) {
+ int n = complQueueSize * 2 + 1;
+ D(printf("Resizing queue to %d\n", n));
+ completionInfo * queue =
+ (completionInfo *) GlobalAlloc(GPTR, n * sizeof(completionInfo));
+ if (complQueue != NULL)
+ CopyMemory (queue, complQueue, complQueueSize * sizeof(completionInfo));
+ complQueue = queue;
+ complQueueSize = n;
+ }
+ complQueue[compN].id = id;
+ complQueue[compN].len = len;
+ complQueue[compN].errCode = errCode;
+ complQueue[compN].action = action;
+ compN++;
+}
+
+CAMLprim value get_queue (value unit) {
+ CAMLparam1 (unit);
+ int i;
+ for (i = 0; i < compN; i++)
+ invoke_completion_callback
+ (complQueue[i].id, complQueue[i].len,
+ complQueue[i].errCode, complQueue[i].action);
+ compN = 0;
+ CAMLreturn (Val_unit);
+}
+
+/****/
+
+static HANDLE main_thread;
+
+static DWORD CALLBACK helper_thread (void * param) {
+ D(printf("Helper thread created\n"));
+ while (1) SleepEx(INFINITE, TRUE);
+}
+
+static VOID CALLBACK exit_thread(ULONG_PTR param) {
+ D(printf("Helper thread exiting\n"));
+ ExitThread(0);
+}
+
+static HANDLE get_helper_thread (value threads, int kind) {
+ HANDLE h = (HANDLE) Field(threads, kind);
+
+ if (h != INVALID_HANDLE_VALUE) return h;
+
+ h = CreateThread (NULL, 0, helper_thread, NULL, 0, NULL);
+ if (h == NULL) {
+ win32_maperr (GetLastError ());
+ uerror("createHelperThread", Nothing);
+ }
+ Field(threads, kind) = (value) h;
+ return h;
+}
+
+static void kill_thread (HANDLE *h) {
+ D(printf("Killing thread\n"));
+ QueueUserAPC(exit_thread, *h, 0);
+ CloseHandle(*h);
+ *h = INVALID_HANDLE_VALUE;
+}
+
+CAMLprim value win_kill_threads (value fd) {
+ CAMLparam1(fd);
+ if (Field(fd, 1) != Val_long(0)) {
+ kill_thread((HANDLE *) &Field(Field(fd, 1), READ));
+ kill_thread((HANDLE *) &Field(Field(fd, 1), WRITE));
+ }
+ CAMLreturn(Val_unit);
+}
+
+CAMLprim value win_wrap_fd (value fd) {
+ CAMLparam1(fd);
+ CAMLlocal2(th, res);
+ D(printf("Wrapping file descriptor (sync)\n"));
+ res = caml_alloc_tuple(2);
+ Store_field(res, 0, fd);
+ th = caml_alloc(2, Abstract_tag);
+ Field(th, READ) = (value) INVALID_HANDLE_VALUE;
+ Field(th, WRITE) = (value) INVALID_HANDLE_VALUE;
+ Store_field(res, 1, th);
+ CAMLreturn(res);
+}
+
+/****/
+
+typedef struct {
+ long action;
+ long id;
+ HANDLE fd;
+ char * buffer;
+ long len;
+ long error;
+} ioInfo;
+
+
+static VOID CALLBACK thread_completion(ULONG_PTR param) {
+ ioInfo * info = (ioInfo *) param;
+ completion (info->id, info->len, info->error, info->action);
+ GlobalFree (info);
+}
+
+static VOID CALLBACK perform_io_on_thread(ULONG_PTR param) {
+ ioInfo * info = (ioInfo *) param;
+ DWORD l;
+ BOOL res;
+
+ D(printf("Starting %s: id %ld, len %ld\n",
+ action_name[info->action], info->id, info->len));
+
+ res =
+ (info->action == READ)?
+ ReadFile(info->fd, info->buffer,info->len, &l, NULL):
+ WriteFile(info->fd, info->buffer,info->len, &l, NULL);
+ if (!res) {
+ info->len = -1;
+ info->error = GetLastError ();
+ } else {
+ info->len = l;
+ info->error = NO_ERROR;
+ }
+ D(printf("Action %s done: id %ld -> len %ld / err %d (errCode %ld)\n",
+ action_name[info->action],
+ info->id, info->len, errno, info->error));
+ QueueUserAPC(thread_completion, main_thread, param);
+}
+
+static void thread_io
+(long action, long id, value threads, HANDLE h, char * buf, long len) {
+ struct caml_bigarray *buf_arr = Bigarray_val(buf);
+ ioInfo * info = GlobalAlloc(GPTR, sizeof(ioInfo));
+ if (info == NULL) {
+ errno = ENOMEM;
+ uerror(action_name[action], Nothing);
+ }
+
+ info->action = action;
+ info->id = id;
+ info->fd = h;
+ info->buffer = buf;
+ info->len = len;
+
+ h = get_helper_thread(threads, action);
+ QueueUserAPC(perform_io_on_thread, h, (ULONG_PTR) info);
+}
+
+/****/
+
+static void CALLBACK overlapped_completion
+(DWORD errCode, DWORD len, LPOVERLAPPED overlapped) {
+ completionData * d = (completionData * )overlapped;
+ completion (d->id, len, errCode, d->action);
+ GlobalFree (d);
+}
+
+static void overlapped_action(long action, long id,
+ HANDLE fd, char *buf, long len) {
+ BOOL res;
+ long err;
+ completionData * d = GlobalAlloc(GPTR, sizeof(completionData));
+ if (d == NULL) {
+ errno = ENOMEM;
+ uerror(action_name[action], Nothing);
+ }
+ d->id = id;
+ d->action = action;
+
+ D(printf("Starting %s: id %ld, len %ld\n", action_name[action], id, len));
+ res =
+ (action == READ_OVERLAPPED)?
+ ReadFileEx(fd, buf, len, &(d->overlapped), overlapped_completion):
+ WriteFileEx(fd, buf, len, &(d->overlapped), overlapped_completion);
+
+ if (!res) {
+ err = GetLastError ();
+ if (err != ERROR_IO_PENDING) {
+ win32_maperr (err);
+ D(printf("Action %s failed: id %ld -> err %d (errCode %ld)\n",
+ action_name[action], id, errno, err));
+ uerror("ReadFileEx", Nothing);
+ }
+ }
+}
+
+CAMLprim value win_wrap_overlapped (value fd) {
+ CAMLparam1(fd);
+ CAMLlocal1(res);
+ D(printf("Wrapping file descriptor (async)\n"));
+ res = caml_alloc_tuple(2);
+ Store_field(res, 0, fd);
+ Store_field(res, 1, Val_long(0));
+ CAMLreturn(res);
+}
+
+/****/
+
+#define Handle(fd) Handle_val(Field(fd, 0))
+
+CAMLprim value win_read
+(value fd, value buf, value ofs, value len, value id) {
+ CAMLparam4(fd, buf, ofs, len);
+ struct caml_bigarray *buf_arr = Bigarray_val(buf);
+
+ if (Field(fd, 1) == Val_long(0))
+ overlapped_action (READ_OVERLAPPED, Long_val(id), Handle(fd),
+ Array_data (buf_arr, ofs), Long_val(len));
+ else
+ thread_io (READ, Long_val(id), Field(fd, 1), Handle(fd),
+ Array_data (buf_arr, ofs), Long_val(len));
+ CAMLreturn (Val_unit);
+}
+
+CAMLprim value win_write
+(value fd, value buf, value ofs, value len, value id) {
+ CAMLparam4(fd, buf, ofs, len);
+ struct caml_bigarray *buf_arr = Bigarray_val(buf);
+
+ if (Field(fd, 1) == Val_long(0))
+ overlapped_action (WRITE_OVERLAPPED, Long_val(id), Handle(fd),
+ Array_data (buf_arr, ofs), Long_val(len));
+ else
+ thread_io (WRITE, Long_val(id), Field(fd, 1), Handle(fd),
+ Array_data (buf_arr, ofs), Long_val(len));
+ CAMLreturn (Val_unit);
+}
+
+/*
+#ifndef SO_UPDATE_CONNECT_CONTEXT
+#define SO_UPDATE_CONNECT_CONTEXT 0x7010
+#endif
+
+static void after_connect (SOCKET s) {
+ if (!setsockopt(s, SOL_SOCKET, SO_UPDATE_CONNECT_CONTEXT, NULL, 0)) {
+ win32_maperr (GetLastError ());
+ uerror("after_connect", Nothing);
+ }
+}
+*/
+
+static HANDLE events[MAXIMUM_WAIT_OBJECTS];
+//static OVERLAPPED oData[MAXIMUM_WAIT_OBJECTS];
+
+CAMLprim value win_register_wait (value socket, value kind, value idx) {
+ CAMLparam3(socket, kind, idx);
+ long i = Long_val(idx);
+ long mask;
+
+ D(printf("Register: i %ld, kind %ld\n", Long_val(i), Long_val(kind)));
+ events[i] = CreateEvent(NULL, TRUE, FALSE, NULL);
+ mask = (Long_val(kind) == 0) ? FD_CONNECT : FD_ACCEPT;
+ if (WSAEventSelect(Socket_val(socket), events[i], mask) == SOCKET_ERROR) {
+ win32_maperr(WSAGetLastError ());
+ uerror("WSAEventSelect", Nothing);
+ }
+
+ CAMLreturn (Val_unit);
+}
+
+CAMLprim value win_check_connection (value socket, value kind, value idx) {
+ CAMLparam3 (socket, kind, idx);
+ WSANETWORKEVENTS evs;
+ int res, err, i = Long_val(idx);
+
+ D(printf("Check connection... %d\n", i));
+ if (WSAEnumNetworkEvents(Socket_val(socket), NULL, &evs)) {
+ win32_maperr(WSAGetLastError ());
+ uerror("WSAEnumNetworkEvents", Nothing);
+ }
+ if (WSAEventSelect(Socket_val(socket), NULL, 0) == SOCKET_ERROR) {
+ win32_maperr(WSAGetLastError ());
+ uerror("WSAEventSelect", Nothing);
+ }
+ if (!CloseHandle(events[i])) {
+ win32_maperr(GetLastError ());
+ uerror("CloseHandle", Nothing);
+ }
+ err =
+ evs.iErrorCode[(Long_val(kind) == 0) ? FD_CONNECT_BIT : FD_ACCEPT_BIT];
+ D(printf("Check connection: %ld, err %d\n", evs.lNetworkEvents, err));
+ if (err != 0) {
+ win32_maperr(err);
+ uerror("check_connection", Nothing);
+ }
+ CAMLreturn (Val_unit);
+}
+
+static HANDLE dummyEvent;
+
+CAMLprim value init_lwt (value callback) {
+ CAMLparam1 (callback);
+ // GUID GuidConnectEx = WSAID_CONNECTEX;
+ // SOCKET s;
+ // DWORD l;
+ int i;
+
+ D(printf("Init...\n"));
+ register_global_root (&completionCallback);
+ completionCallback = callback;
+
+ dummyEvent = CreateEvent(NULL, TRUE, FALSE, NULL); // Dummy event
+
+ DuplicateHandle (GetCurrentProcess (), GetCurrentThread (),
+ GetCurrentProcess (), &main_thread,
+ 0, FALSE, DUPLICATE_SAME_ACCESS);
+
+ /*
+ s = socket(AF_INET, SOCK_STREAM, 0);
+ if (s == INVALID_SOCKET) return Val_unit;
+ WSAIoctl(s, SIO_GET_EXTENSION_FUNCTION_POINTER,
+ &GuidConnectEx, sizeof(GuidConnectEx),
+ &ConnectEx, sizeof(ConnectExPtr),
+ &l, NULL, NULL);
+ closesocket(s);
+ */
+
+ D(printf("Init done\n"));
+ CAMLreturn (Val_long (MAXIMUM_WAIT_OBJECTS));
+}
+
+CAMLprim value win_wait (value timeout, value event_count) {
+ CAMLparam2(timeout, event_count);
+ DWORD t, t2;
+ DWORD res;
+ long ret, n = Long_val(event_count);
+ t = Long_val(timeout);
+ if (t < 0) t = INFINITE;
+ t2 = (compN > 0) ? 0 : t;
+ D(printf("Waiting: %ld events, timeout %ldms -> %ldms\n", n, t, t2));
+ res =
+ (n > 0) ?
+ WaitForMultipleObjectsEx(n, events, FALSE, t, TRUE) :
+ WaitForMultipleObjectsEx(1, &dummyEvent, FALSE, t, TRUE);
+ D(printf("Done waiting\n"));
+ if ((t != t2) && (res == WAIT_TIMEOUT)) res = WAIT_IO_COMPLETION;
+ switch (res) {
+ case WAIT_TIMEOUT:
+ D(printf("Timeout\n"));
+ ret = -1;
+ break;
+ case WAIT_IO_COMPLETION:
+ D(printf("I/O completion\n"));
+ ret = -2;
+ break;
+ case WAIT_FAILED:
+ D(printf("Wait failed\n"));
+ ret = 0;
+ win32_maperr (GetLastError ());
+ uerror("WaitForMultipleObjectsEx", Nothing);
+ break;
+ default:
+ ret = res;
+ D(printf("Event: %ld\n", res));
+ break;
+ }
+ get_queue (Val_unit);
+ CAMLreturn (Val_long(ret));
+}
+
+static long pipeSerial;
+
+value win_pipe(long readMode, long writeMode) {
+ CAMLparam0();
+ SECURITY_ATTRIBUTES attr;
+ HANDLE readh, writeh;
+ CHAR name[MAX_PATH];
+ CAMLlocal3(readfd, writefd, res);
+
+ attr.nLength = sizeof(attr);
+ attr.lpSecurityDescriptor = NULL;
+ attr.bInheritHandle = TRUE;
+
+ sprintf(name, "\\\\.\\Pipe\\UnisonAnonPipe.%08lx.%08lx",
+ GetCurrentProcessId(), pipeSerial++);
+
+ readh =
+ CreateNamedPipeA
+ (name, PIPE_ACCESS_INBOUND | readMode, PIPE_TYPE_BYTE | PIPE_WAIT,
+ 1, UNIX_BUFFER_SIZE, UNIX_BUFFER_SIZE, 0, &attr);
+
+ if (readh == INVALID_HANDLE_VALUE) {
+ win32_maperr(GetLastError());
+ uerror("CreateNamedPipe", Nothing);
+ return FALSE;
+ }
+
+ writeh =
+ CreateFileA
+ (name, GENERIC_WRITE, 0, &attr, OPEN_EXISTING,
+ FILE_ATTRIBUTE_NORMAL | writeMode, NULL);
+
+ if (writeh == INVALID_HANDLE_VALUE) {
+ win32_maperr(GetLastError());
+ CloseHandle(readh);
+ uerror("CreateFile", Nothing);
+ return FALSE;
+ }
+
+ readfd = win_alloc_handle(readh);
+ writefd = win_alloc_handle(writeh);
+ res = alloc_small(2, 0);
+ Store_field(res, 0, readfd);
+ Store_field(res, 1, writefd);
+ CAMLreturn (res);
+}
+
+CAMLprim value win_pipe_in (value unit) {
+ CAMLparam0();
+ CAMLreturn (win_pipe (FILE_FLAG_OVERLAPPED, 0));
+}
+
+CAMLprim value win_pipe_out (value unit) {
+ CAMLparam0();
+ CAMLreturn (win_pipe (0, FILE_FLAG_OVERLAPPED));
+}
+
+static int socket_domain_table[] = {
+ PF_UNIX, PF_INET
+};
+
+static int socket_type_table[] = {
+ SOCK_STREAM, SOCK_DGRAM, SOCK_RAW, SOCK_SEQPACKET
+};
+
+CAMLprim value win_socket (value domain, value type, value proto) {
+ CAMLparam3(domain, type, proto);
+ SOCKET s;
+
+ s = WSASocket(socket_domain_table[Int_val(domain)],
+ socket_type_table[Int_val(type)],
+ Int_val(proto),
+ NULL, 0, WSA_FLAG_OVERLAPPED);
+ D(printf("Created socket %lx\n", (long)s));
+ if (s == INVALID_SOCKET) {
+ win32_maperr(WSAGetLastError ());
+ uerror("WSASocket", Nothing);
+ }
+ CAMLreturn(win_alloc_socket(s));
+}
+
+/*
+#ifndef WSAID_CONNECTEX
+#define WSAID_CONNECTEX \
+ {0x25a207b9,0xddf3,0x4660,{0x8e,0xe9,0x76,0xe5,0x8c,0x74,0x06,0x3e}}
+#endif
+
+typedef BOOL (WINAPI *ConnectExPtr)(SOCKET, const struct sockaddr *, int, PVOID, DWORD, LPDWORD, LPOVERLAPPED);
+
+static ConnectExPtr ConnectEx = NULL;
+
+CAMLprim value win_connect (value socket, value address, value id) {
+ CAMLparam3(socket, address, id);
+ SOCKET s = Socket_val (socket);
+ struct sockaddr addr;
+ int addr_len;
+ DWORD err;
+ int i;
+
+ if (ConnectEx == NULL) {
+ errno = ENOSYS;
+ uerror("ConnectEx", Nothing);
+ }
+ if (eventCount == MAXIMUM_WAIT_OBJECTS) {
+ errno = EAGAIN;
+ uerror("ConnectEx", Nothing);
+ }
+ i = free_list[eventCount];
+ eventCount++;
+
+ ZeroMemory(&(oData[i]), sizeof(OVERLAPPED));
+ oData[i].hEvent = events[i];
+ ids[i] = Long_val(id);
+ sockets[i] = s;
+
+ get_sockaddr(address, &addr, &addr_len);
+ if (!ConnectEx(s, &addr, addr_len, NULL, 0, 0, &(oData[i]))) {
+ err = WSAGetLastError ();
+ if (err != ERROR_IO_PENDING) {
+ win32_maperr(err);
+ uerror("ConnectEx", Nothing);
+ }
+ } else
+ after_connect(s);
+ CAMLreturn (Val_unit);
+}
+*/
diff --git a/src/lwt/win/.gitignore b/src/lwt/win/.gitignore new file mode 100644 index 0000000..6c55acc --- /dev/null +++ b/src/lwt/win/.gitignore @@ -0,0 +1,3 @@ +/*.cmx +/*.cmi +/*.cmo diff --git a/src/lwt/win/lwt_unix_impl.ml b/src/lwt/win/lwt_unix_impl.ml new file mode 100644 index 0000000..08dc891 --- /dev/null +++ b/src/lwt/win/lwt_unix_impl.ml @@ -0,0 +1,645 @@ +(* +- should check all events before looping again for avoiding race + conditions... + (we have the first, scan the subsequent ones) +*) + +let no_overlapped_io = false +let d = ref false + +(****) + +type buffer = + (char, Bigarray.int8_unsigned_elt, Bigarray.c_layout) Bigarray.Array1.t + +let buffer_create l = Bigarray.Array1.create Bigarray.char Bigarray.c_layout l + +external unsafe_blit_string_to_buffer : + string -> int -> buffer -> int -> int -> unit = "ml_blit_string_to_buffer" +external unsafe_blit_buffer_to_string : + buffer -> int -> string -> int -> int -> unit = "ml_blit_buffer_to_string" + +let buffer_length = Bigarray.Array1.dim + +let blit_string_to_buffer s i a j l = + if l < 0 || i < 0 || i > String.length s - l + || j < 0 || j > buffer_length a - l + then invalid_arg "Lwt_unix.blit_string_to_buffer" + else unsafe_blit_string_to_buffer s i a j l + +let blit_buffer_to_string a i s j l = + if l < 0 || i < 0 || i > buffer_length a - l + || j < 0 || j > String.length s - l + then invalid_arg "Lwt_unix.blit_buffer_to_string" + else unsafe_blit_buffer_to_string a i s j l + +let buffer_size = 16384 + +let avail_buffers = ref [] + +let acquire_buffer () = + match !avail_buffers with + [] -> buffer_create buffer_size + | b :: r -> avail_buffers := r; b + +let release_buffer b = avail_buffers := b :: !avail_buffers + +(****) + +let last_id = ref 0 +let free_list = ref (Array.init 1 (fun i -> i)) + +let acquire_id () = + let len = Array.length !free_list in + if !last_id = len then begin + let a = Array.init (len * 2) (fun i -> i) in + Array.blit !free_list 0 a 0 len; + free_list := a + end; + let i = !free_list.(!last_id) in + incr last_id; + i + +let release_id i = + decr last_id; + !free_list.(!last_id) <- i + +(****) + +let completionEvents = ref [] + +let actionCompleted id len errno name = + completionEvents := (id, len, errno, name) :: !completionEvents + +external init_lwt : + (int -> int -> Unix.error -> string -> unit) -> int = "init_lwt" + +let max_event_count = init_lwt actionCompleted + +let event_count = ref 0 +let free_list = Array.init max_event_count (fun i -> i) + +let acquire_event nm = + if !event_count = max_event_count then + raise (Unix.Unix_error (Unix.EAGAIN, nm, "")); + let i = free_list.(!event_count) in + incr event_count; + i + +let release_event i = + decr event_count; + free_list.(!event_count) <- i + +(****) + +type helpers +type file_descr = { fd : Unix.file_descr; helpers : helpers } + +external of_unix_file_descr : Unix.file_descr -> file_descr = "win_wrap_fd" + +external win_wrap_async : Unix.file_descr -> file_descr = "win_wrap_overlapped" + +let wrap_async = + if no_overlapped_io then of_unix_file_descr else win_wrap_async + +(****) + +module SleepQueue = + Pqueue.Make (struct + type t = float * int * unit Lwt.t + let compare (t, i, _) (t', i', _) = + let c = compare t t' in + if c = 0 then i - i' else c + end) +let sleep_queue = ref SleepQueue.empty + +let event_counter = ref 0 + +let sleep d = + let res = Lwt.wait () in + incr event_counter; + let t = if d <= 0. then 0. else Unix.gettimeofday () +. d in + sleep_queue := + SleepQueue.add (t, !event_counter, res) !sleep_queue; + res + +let yield () = sleep 0. + +let get_time t = + if !t = -1. then t := Unix.gettimeofday (); + !t + +let in_the_past now t = + t = 0. || t <= get_time now + +let rec restart_threads imax now = + match + try Some (SleepQueue.find_min !sleep_queue) with Not_found -> None + with + Some (time, i, thr) when in_the_past now time && i - imax <= 0 -> + sleep_queue := SleepQueue.remove_min !sleep_queue; +if !d then Format.eprintf "RESTART@."; + Lwt.wakeup thr (); +if !d then Format.eprintf "RESTART...DONE@."; + restart_threads imax now + | _ -> + () + +module IntTbl = + Hashtbl.Make + (struct type t = int let equal (x : int) y = x = y let hash x = x end) + +let ioInFlight = IntTbl.create 17 +let connInFlight = IntTbl.create 17 + +let handleCompletionEvent (id, len, errno, name) = +if !d then Format.eprintf "Handling event %d (len %d)@." id len; + let (action, buf, res) = + try IntTbl.find ioInFlight id with Not_found -> assert false + in + begin match action with + `Write -> () + | `Read (s, pos) -> if len > 0 then blit_buffer_to_string buf 0 s pos len + end; + IntTbl.remove ioInFlight id; + release_id id; + release_buffer buf; + if len = -1 then + Lwt.wakeup_exn res (Unix.Unix_error (errno, name, "")) + else + Lwt.wakeup res len + +type kind = CONNECT | ACCEPT + +external win_wait : int -> int -> int = "win_wait" + +external win_register_wait : + Unix.file_descr -> kind -> int -> unit = "win_register_wait" + +external win_check_connection : + Unix.file_descr -> kind -> int -> unit = "win_check_connection" + +let handle_wait_event i ch kind cont action = +if !d then prerr_endline "MMM"; + let res = + try + Some (action ()) + with + Unix.Unix_error ((Unix.EAGAIN | Unix.EWOULDBLOCK | Unix.EINTR), _, _) -> +if !d then prerr_endline "NNN"; + win_register_wait ch.fd kind i; + None + | e -> +if !d then prerr_endline "OOO"; + release_event i; + IntTbl.remove connInFlight i; + Lwt.wakeup_exn cont e; + None + in + match res with + Some v -> +if !d then prerr_endline "PPP"; + release_event i; + IntTbl.remove connInFlight i; + Lwt.wakeup cont v + | None -> + () + +let rec run thread = +if !d then Format.eprintf "Main loop@."; + match Lwt.poll thread with + Some v -> +if !d then Format.eprintf "DONE!@."; + v + | None -> + let next_event = + try + let (time, _, _) = SleepQueue.find_min !sleep_queue in Some time + with Not_found -> + None + in + let now = ref (-1.) in + let delay = + match next_event with + None -> -1. + | Some 0. -> 0. + | Some time -> max 0. (time -. get_time now) + in +if !d then Format.eprintf "vvv@."; + let i = + try + win_wait (truncate (ceil (delay *. 1000.))) !event_count + with e -> assert false + in +if !d then Format.eprintf "^^^@."; + if i = -1 then now := !now +. delay; + restart_threads !event_counter now; +if !d then Format.eprintf "threads restarted@."; + let ev = !completionEvents in + completionEvents := []; + List.iter handleCompletionEvent (List.rev ev); + if i >= 0 then begin + let (kind, ch) = + try IntTbl.find connInFlight i with Not_found -> assert false in + match kind with + `CheckSocket res -> +if !d then prerr_endline "CHECK CONN"; + handle_wait_event i ch CONNECT res + (fun () -> win_check_connection ch.fd CONNECT i) + | `Accept res -> +if !d then prerr_endline "ACCEPT"; + handle_wait_event i ch ACCEPT res + (fun () -> + win_check_connection ch.fd ACCEPT i; + let (v, info) = Unix.accept ch.fd in + (wrap_async v, info)) + end; +(* + let infds = List.map fst !inputs in + let outfds = List.map fst !outputs in + let (readers, writers, _) = + if windows_hack && not recent_ocaml then + let writers = outfds in + let readers = + if delay = 0. || writers <> [] then [] else infds in + (readers, writers, []) + else if infds = [] && outfds = [] && delay = 0. then + ([], [], []) + else + try + let res = Unix.select infds outfds [] delay in + if delay > 0. && !now <> -1. then now := !now +. delay; + res + with + Unix.Unix_error (Unix.EINTR, _, _) -> + ([], [], []) + | Unix.Unix_error (Unix.EBADF, _, _) -> + (List.filter bad_fd infds, List.filter bad_fd outfds, []) + | Unix.Unix_error (Unix.EPIPE, _, _) + when windows_hack && recent_ocaml -> + (* Workaround for a bug in Ocaml 3.11: select fails with an + EPIPE error when the file descriptor is remotely closed *) + (infds, [], []) + in + restart_threads !event_counter now; + List.iter + (fun fd -> + try + match List.assoc fd !inputs with + `Read (buf, pos, len, res) -> + wrap_syscall inputs fd res + (fun () -> Unix.read fd buf pos len) + | `Accept res -> + wrap_syscall inputs fd res + (fun () -> + let (s, i) = Unix.accept fd.fd in + if not windows_hack then Unix.set_nonblock s; + (wrap_async s, i)) + | `Wait res -> + wrap_syscall inputs fd res (fun () -> ()) + with Not_found -> + ()) + readers; + List.iter + (fun fd -> + try + match List.assoc fd !outputs with + `Write (buf, pos, len, res) -> + wrap_syscall outputs fd res + (fun () -> Unix.write fd buf pos len) + | `Wait res -> + wrap_syscall inputs fd res (fun () -> ()) + with Not_found -> + ()) + writers; + if !child_exited then begin + child_exited := false; + List.iter + (fun (id, (res, flags, pid)) -> + wrap_syscall wait_children id res + (fun () -> + let (pid', _) as v = Unix.waitpid flags pid in + if pid' = 0 then raise Exit; + v)) + !wait_children + end; +*) + run thread + +(****) + +let wait_read ch = assert false + +let wait_write ch = assert false + +external start_read : + file_descr -> buffer -> int -> int -> int -> unit = "win_read" +external start_write : + file_descr -> buffer -> int -> int -> int -> unit = "win_write" + +let read ch s pos len = +if !d then Format.eprintf "Start reading@."; + let id = acquire_id () in + let buf = acquire_buffer () in + let len = if len > buffer_size then buffer_size else len in + let res = Lwt.wait () in + IntTbl.add ioInFlight id (`Read (s, pos), buf, res); + start_read ch buf 0 len id; +if !d then Format.eprintf "Reading started@."; + res + +let write ch s pos len = +if !d then Format.eprintf "Start writing@."; + let id = acquire_id () in + let buf = acquire_buffer () in + let len = if len > buffer_size then buffer_size else len in + blit_string_to_buffer s pos buf 0 len; + let res = Lwt.wait () in + IntTbl.add ioInFlight id (`Write, buf, res); + start_write ch buf 0 len id; +if !d then Format.eprintf "Writing started@."; + res + +external win_pipe_in : + unit -> Unix.file_descr * Unix.file_descr = "win_pipe_in" +external win_pipe_out : + unit -> Unix.file_descr * Unix.file_descr = "win_pipe_out" + +let pipe_in () = + let (i, o) = if no_overlapped_io then Unix.pipe () else win_pipe_in () in + (wrap_async i, o) +let pipe_out () = + let (i, o) = if no_overlapped_io then Unix.pipe () else win_pipe_out () in + (i, wrap_async o) + +external win_socket : + Unix.socket_domain -> Unix.socket_type -> int -> Unix.file_descr = + "win_socket" + +let socket d t p = + let s = if no_overlapped_io then Unix.socket d t p else win_socket d t p in + Unix.set_nonblock s; + wrap_async s + +let bind ch addr = Unix.bind ch.fd addr +let setsockopt ch opt v = Unix.setsockopt ch.fd opt v +let listen ch n = Unix.listen ch.fd n +let set_close_on_exec ch = Unix.set_close_on_exec ch.fd + +external kill_threads : file_descr -> unit = "win_kill_threads" + +let close ch = Unix.close ch.fd; kill_threads ch + +let accept ch = + let res = Lwt.wait () in + let i = acquire_event "accept" in + IntTbl.add connInFlight i (`Accept res, ch); + win_register_wait ch.fd ACCEPT i; + res + +let check_socket ch = + let res = Lwt.wait () in + let i = acquire_event "connect" in + IntTbl.add connInFlight i (`CheckSocket res, ch); + win_register_wait ch.fd CONNECT i; + res + +let connect s addr = + try + Unix.connect s.fd addr; +if !d then prerr_endline "AAA"; + Lwt.return () + with + Unix.Unix_error + ((Unix.EINPROGRESS | Unix.EWOULDBLOCK | Unix.EAGAIN), _, _) -> +if !d then prerr_endline "BBB"; + check_socket s + | e -> +if !d then prerr_endline "CCC"; + Lwt.fail e + +(* +let ids = ref 0 +let new_id () = incr ids; !ids + +let _waitpid flags pid = + try + Lwt.return (Unix.waitpid flags pid) + with e -> + Lwt.fail e + +let waitpid flags pid = + if List.mem Unix.WNOHANG flags || windows_hack then + _waitpid flags pid + else + let flags = Unix.WNOHANG :: flags in + Lwt.bind (_waitpid flags pid) (fun ((pid', _) as res) -> + if pid' <> 0 then + Lwt.return res + else + let res = Lwt.wait () in + wait_children := (new_id (), (res, flags, pid)) :: !wait_children; + res) + +let wait () = waitpid [] (-1) + +let system cmd = + match Unix.fork () with + 0 -> Unix.execv "/bin/sh" [| "/bin/sh"; "-c"; cmd |] + | id -> Lwt.bind (waitpid [] id) (fun (pid, status) -> Lwt.return status) +*) + +(****) +(* +type lwt_in_channel = in_channel +type lwt_out_channel = out_channel + +let intern_in_channel ch = + Unix.set_nonblock (Unix.descr_of_in_channel ch); ch +let intern_out_channel ch = + Unix.set_nonblock (Unix.descr_of_out_channel ch); ch + + +let wait_inchan ic = wait_read (Unix.descr_of_in_channel ic) +let wait_outchan oc = wait_write (Unix.descr_of_out_channel oc) + +let rec input_char ic = + try + Lwt.return (Pervasives.input_char ic) + with + Sys_blocked_io -> + Lwt.bind (wait_inchan ic) (fun () -> input_char ic) + | e -> + Lwt.fail e + +let rec input ic s ofs len = + try + Lwt.return (Pervasives.input ic s ofs len) + with + Sys_blocked_io -> + Lwt.bind (wait_inchan ic) (fun () -> input ic s ofs len) + | e -> + Lwt.fail e + +let rec unsafe_really_input ic s ofs len = + if len <= 0 then + Lwt.return () + else begin + Lwt.bind (input ic s ofs len) (fun r -> + if r = 0 + then Lwt.fail End_of_file + else unsafe_really_input ic s (ofs+r) (len-r)) + end + +let really_input ic s ofs len = + if ofs < 0 || len < 0 || ofs > String.length s - len + then Lwt.fail (Invalid_argument "really_input") + else unsafe_really_input ic s ofs len + +let input_line ic = + let buf = ref (String.create 128) in + let pos = ref 0 in + let rec loop () = + if !pos = String.length !buf then begin + let newbuf = String.create (2 * !pos) in + String.blit !buf 0 newbuf 0 !pos; + buf := newbuf + end; + Lwt.bind (input_char ic) (fun c -> + if c = '\n' then + Lwt.return () + else begin + !buf.[!pos] <- c; + incr pos; + loop () + end) + in + Lwt.bind + (Lwt.catch loop + (fun e -> + match e with + End_of_file when !pos <> 0 -> + Lwt.return () + | _ -> + Lwt.fail e)) + (fun () -> + let res = String.create !pos in + String.blit !buf 0 res 0 !pos; + Lwt.return res) +*) +(****) + +(* +type popen_process = + Process of in_channel * out_channel + | Process_in of in_channel + | Process_out of out_channel + | Process_full of in_channel * out_channel * in_channel + +let popen_processes = (Hashtbl.create 7 : (popen_process, int) Hashtbl.t) + +let open_proc cmd proc input output toclose = + match Unix.fork () with + 0 -> if input <> Unix.stdin then begin + Unix.dup2 input Unix.stdin; + Unix.close input + end; + if output <> Unix.stdout then begin + Unix.dup2 output Unix.stdout; + Unix.close output + end; + List.iter Unix.close toclose; + Unix.execv "/bin/sh" [| "/bin/sh"; "-c"; cmd |] + | id -> Hashtbl.add popen_processes proc id + +let open_process_in cmd = + let (in_read, in_write) = pipe_in () in + let inchan = Unix.in_channel_of_descr in_read in + open_proc cmd (Process_in inchan) Unix.stdin in_write [in_read]; + Unix.close in_write; + Lwt.return inchan + +let open_process_out cmd = + let (out_read, out_write) = pipe_out () in + let outchan = Unix.out_channel_of_descr out_write in + open_proc cmd (Process_out outchan) out_read Unix.stdout [out_write]; + Unix.close out_read; + Lwt.return outchan + +let open_process cmd = + let (in_read, in_write) = pipe_in () in + let (out_read, out_write) = pipe_out () in + let inchan = Unix.in_channel_of_descr in_read in + let outchan = Unix.out_channel_of_descr out_write in + open_proc cmd (Process(inchan, outchan)) out_read in_write + [in_read; out_write]; + Unix.close out_read; + Unix.close in_write; + Lwt.return (inchan, outchan) + +(* FIX: Subprocesses that use /dev/tty to print things on the terminal + will NOT have this output captured and returned to the caller of this + function. There's an argument that this is correct, but if we are + running from a GUI the user may not be looking at any terminal and it + will appear that the process is just hanging. This can be fixed, in + principle, by writing a little C code that opens /dev/tty and then uses + the TIOCNOTTY ioctl control to detach the terminal. *) + +let open_proc_full cmd env proc input output error toclose = + match Unix.fork () with + 0 -> Unix.dup2 input Unix.stdin; Unix.close input; + Unix.dup2 output Unix.stdout; Unix.close output; + Unix.dup2 error Unix.stderr; Unix.close error; + List.iter Unix.close toclose; + Unix.execve "/bin/sh" [| "/bin/sh"; "-c"; cmd |] env + | id -> Hashtbl.add popen_processes proc id + +let open_process_full cmd env = + let (in_read, in_write) = pipe_in () in + let (out_read, out_write) = pipe_out () in + let (err_read, err_write) = pipe_in () in + let inchan = Unix.in_channel_of_descr in_read in + let outchan = Unix.out_channel_of_descr out_write in + let errchan = Unix.in_channel_of_descr err_read in + open_proc_full cmd env (Process_full(inchan, outchan, errchan)) + out_read in_write err_write [in_write; out_read; err_read]; + Unix.close out_read; + Unix.close in_write; + Unix.close err_write; + Lwt.return (inchan, outchan, errchan) + +let find_proc_id fun_name proc = + try + let pid = Hashtbl.find popen_processes proc in + Hashtbl.remove popen_processes proc; + pid + with Not_found -> + raise (Unix.Unix_error (Unix.EBADF, fun_name, "")) +*) +(* +let close_process_in inchan = + let pid = find_proc_id "close_process_in" (Process_in inchan) in + close_in inchan; + Lwt.bind (waitpid [] pid) (fun (_, status) -> Lwt.return status) + +let close_process_out outchan = + let pid = find_proc_id "close_process_out" (Process_out outchan) in + close_out outchan; + Lwt.bind (waitpid [] pid) (fun (_, status) -> Lwt.return status) + +let close_process (inchan, outchan) = + let pid = find_proc_id "close_process" (Process(inchan, outchan)) in + close_in inchan; close_out outchan; + Lwt.bind (waitpid [] pid) (fun (_, status) -> Lwt.return status) + +let close_process_full (outchan, inchan, errchan) = + let pid = + find_proc_id "close_process_full" + (Process_full(outchan, inchan, errchan)) in + close_out inchan; close_in outchan; close_in errchan; + Lwt.bind (waitpid [] pid) (fun (_, status) -> Lwt.return status) +*) + +type lwt_in_channel +let input_line _ = assert false (*XXXXX*) +let intern_in_channel _ = assert false (*XXXXX*) diff --git a/src/mkProjectInfo.ml b/src/mkProjectInfo.ml index 63f1769..e16ea86 100644 --- a/src/mkProjectInfo.ml +++ b/src/mkProjectInfo.ml @@ -98,3 +98,4 @@ Printf.printf "VERSION=%d.%d.%d\n" majorVersion minorVersion pointVersion;; Printf.printf "NAME=%s\n" projectName;; + @@ -514,6 +514,7 @@ let openRessOut fspath path length = output_string outch "\000\000\014\176"; (* length *) output_string outch "\000\000\000\002"; (* Resource fork *) output_string outch "\000\000\014\226"; (* offset *) +(* FIX: should check for overflow! *) output_string outch (setInt4 (Uutil.Filesize.toInt64 length)); (* length *) output_string outch (emptyFinderInfo ()); diff --git a/src/osxsupport.c b/src/osxsupport.c index 2a1f638..95e818f 100644 --- a/src/osxsupport.c +++ b/src/osxsupport.c @@ -33,12 +33,12 @@ CAMLprim value getFileInfos (value path, value need_size) { CAMLlocal3(res, fInfo, length); int retcode; struct attrlist attrList; - unsigned long options = 0; + unsigned long options = FSOPT_REPORT_FULLSIZE; struct { - unsigned long length; - char finderInfo [32]; - off_t rsrcLength; - } attrBuf; + u_int32_t length; + char finderInfo [32]; + off_t rsrcLength; + } __attribute__ ((packed)) attrBuf; attrList.bitmapcount = ATTR_BIT_MAP_COUNT; attrList.reserved = 0; @@ -58,10 +58,10 @@ CAMLprim value getFileInfos (value path, value need_size) { if (Bool_val (need_size)) { if (attrBuf.length != sizeof attrBuf) - unix_error (EOPNOTSUPP, "getattrlist", path); + unix_error (EINVAL, "getattrlist", path); } else { - if (attrBuf.length < sizeof (unsigned long) + 32) - unix_error (EOPNOTSUPP, "getattrlist", path); + if (attrBuf.length != sizeof (u_int32_t) + 32) + unix_error (EINVAL, "getattrlist", path); } fInfo = alloc_string (32); @@ -92,9 +92,9 @@ CAMLprim value setFileInfos (value path, value fInfo) { struct attrlist attrList; unsigned long options = 0; struct { - unsigned long length; - char finderInfo [32]; - } attrBuf; + u_int32_t length; + char finderInfo [32]; + } __attribute__ ((packed)) attrBuf; attrList.bitmapcount = ATTR_BIT_MAP_COUNT; attrList.reserved = 0; diff --git a/src/remote.ml b/src/remote.ml index bec5882..d198b92 100644 --- a/src/remote.ml +++ b/src/remote.ml @@ -28,10 +28,9 @@ let debugT = Trace.debug "remote+" But that resulted in huge amounts of output from '-debug all'. *) -let windowsHack = Sys.os_type <> "Unix" -let recent_ocaml = - Scanf.sscanf Sys.ocaml_version "%d.%d" - (fun maj min -> (maj = 3 && min >= 11) || maj > 3) +let _ = + if Sys.os_type = "Unix" then + ignore(Sys.set_signal Sys.sigpipe Sys.Signal_ignore) let _ = if Sys.os_type = "Unix" then @@ -53,8 +52,6 @@ let _ = But then, there is the risk that the two sides exchange spurious messages. *) -let needFlowControl = windowsHack -let readOrWrite = needFlowControl && not recent_ocaml (****) @@ -307,10 +304,9 @@ let makeOutputQueue isServer flush = type connection = { inputBuffer : ioBuffer; outputBuffer : ioBuffer; - outputQueue : outputQueue; - receiver : (unit -> unit Lwt.t) option ref } + outputQueue : outputQueue } -let maybeFlush receiver pendingFlush q buf = +let maybeFlush pendingFlush q buf = (* We return immediately if a flush is already scheduled, or if the output buffer is already empty. *) (* If we are doing flow control and we can write, we need to send @@ -335,25 +331,19 @@ let maybeFlush receiver pendingFlush q buf = flushBuffer buf end else flushBuffer buf) >>= fun () -> - assert (not (q.flowControl && q.canWrite)); - (* Restart the reader thread if needed *) - match !receiver with - None -> Lwt.return () - | Some f -> f () + Lwt.return () end else Lwt.return () end let makeConnection isServer inCh outCh = let pendingFlush = ref false in - let receiver = ref None in let outputBuffer = makeBuffer outCh in - { inputBuffer = makeBuffer inCh; - outputBuffer = outputBuffer; - outputQueue = - makeOutputQueue isServer - (fun q -> maybeFlush receiver pendingFlush q outputBuffer); - receiver = receiver } + { inputBuffer = makeBuffer inCh; + outputBuffer = outputBuffer; + outputQueue = + makeOutputQueue isServer + (fun q -> maybeFlush pendingFlush q outputBuffer) } (* Send message [l] *) let dump conn l = @@ -694,9 +684,7 @@ let find_receiver id = (* Receiving thread: read a message and dispatch it to the right thread or create a new thread to process requests. *) let rec receive conn = - if readOrWrite && conn.outputQueue.canWrite then begin - conn.receiver := Some (fun () -> receive conn); Lwt.return () - end else begin + begin debugE (fun () -> Util.msg "Waiting for next message\n"); (* Get the message ID *) let id = Bytearray.create intSize in @@ -966,15 +954,15 @@ let halfduplex = in a deadlock." let negociateFlowControlLocal conn () = - if not needFlowControl then disableFlowControl conn.outputQueue; - Lwt.return needFlowControl + disableFlowControl conn.outputQueue; + Lwt.return false let negociateFlowControlRemote = registerServerCmd "negociateFlowControl" negociateFlowControlLocal let negociateFlowControl conn = (* Flow control negociation can be done asynchronously. *) - if not (needFlowControl || Prefs.read halfduplex) then + if not (Prefs.read halfduplex) then Lwt.ignore_result (negociateFlowControlRemote conn () >>= fun needed -> if not needed then diff --git a/src/system/win/.gitignore b/src/system/win/.gitignore new file mode 100644 index 0000000..6c55acc --- /dev/null +++ b/src/system/win/.gitignore @@ -0,0 +1,3 @@ +/*.cmx +/*.cmi +/*.cmo |