summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAijay Adams <aijay@fb.com>2016-02-05 11:26:38 -0800
committerAijay Adams <aijay@fb.com>2016-02-05 11:26:38 -0800
commit2ceefb504234069d43bf89063703b55d83be5ea4 (patch)
tree5331eaafb79bb8501b503146400faeed122aaa5f
downloadfbtracert-2ceefb504234069d43bf89063703b55d83be5ea4.zip
fbtracert-2ceefb504234069d43bf89063703b55d83be5ea4.tar.gz
fbtracert-2ceefb504234069d43bf89063703b55d83be5ea4.tar.bz2
adding project to github
-rw-r--r--LICENSE30
-rw-r--r--PATENTS34
-rw-r--r--README.md68
-rw-r--r--TARGETS11
-rw-r--r--main.go789
-rw-r--r--tcp.go172
-rw-r--r--util.go74
7 files changed, 1178 insertions, 0 deletions
diff --git a/LICENSE b/LICENSE
new file mode 100644
index 0000000..6bf7744
--- /dev/null
+++ b/LICENSE
@@ -0,0 +1,30 @@
+BSD License
+
+For fbtracert software
+
+Copyright (c) 2016-present, Facebook, Inc. All rights reserved.
+
+Redistribution and use in source and binary forms, with or without modification,
+are permitted provided that the following conditions are met:
+
+ * Redistributions of source code must retain the above copyright notice, this
+ list of conditions and the following disclaimer.
+
+ * Redistributions in binary form must reproduce the above copyright notice,
+ this list of conditions and the following disclaimer in the documentation
+ and/or other materials provided with the distribution.
+
+ * Neither the name Facebook nor the names of its contributors may be used to
+ endorse or promote products derived from this software without specific
+ prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
+ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR
+ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
+ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
diff --git a/PATENTS b/PATENTS
new file mode 100644
index 0000000..a9b0b77
--- /dev/null
+++ b/PATENTS
@@ -0,0 +1,34 @@
+
+Additional Grant of Patent Rights Version 2
+
+"Software" means the fbtracert software distributed by Facebook, Inc.
+
+Facebook, Inc. ("Facebook") hereby grants to each recipient of the Software
+("you") a perpetual, worldwide, royalty-free, non-exclusive, irrevocable
+(subject to the termination provision below) license under any Necessary
+Claims, to make, have made, use, sell, offer to sell, import, and otherwise
+transfer the Software. For avoidance of doubt, no license is granted under
+Facebook’s rights in any patent claims that are infringed by (i) modifications
+to the Software made by you or any third party or (ii) the Software in
+combination with any software or other technology.
+
+The license granted hereunder will terminate, automatically and without notice,
+if you (or any of your subsidiaries, corporate affiliates or agents) initiate
+directly or indirectly, or take a direct financial interest in, any Patent
+Assertion: (i) against Facebook or any of its subsidiaries or corporate
+affiliates, (ii) against any party if such Patent Assertion arises in whole or
+in part from any software, technology, product or service of Facebook or any of
+its subsidiaries or corporate affiliates, or (iii) against any party relating
+to the Software. Notwithstanding the foregoing, if Facebook or any of its
+subsidiaries or corporate affiliates files a lawsuit alleging patent
+infringement against you in the first instance, and you respond by filing a
+patent infringement counterclaim in that lawsuit against that party that is
+unrelated to the Software, the license granted hereunder will not terminate
+under section (i) of this paragraph due to such counterclaim.
+
+A "Necessary Claim" is a claim of a patent owned by Facebook that is
+necessarily infringed by the Software standing alone.
+
+A "Patent Assertion" is any lawsuit or other action alleging direct, indirect,
+or contributory infringement or inducement to infringe any patent, including a
+cross-claim or counterclaim.
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..550924e
--- /dev/null
+++ b/README.md
@@ -0,0 +1,68 @@
+# fbtracert
+(pronounced ef-BEE-tracerTEE)
+
+## Installing
+
+Requires golang >= 1.5.1
+
+go get -d github.com/facebook/fbtracert
+go install github.com/facebook/fbtracert
+$GOPATH/bin/fbtracert --help
+
+## Full documentation
+
+### Fault isolation in ECMP networks via multi-port traceroute
+
+This tool attempts to identify the network component that drops packets by employing traceroute logic
+that explores multiple parallel paths. The following describes the main goroutines and their logic.
+
+### Sender
+
+We start this goroutine for every TTL that we expect on the path to the destination. We start with some max TTL
+value, and then stop all senders that have a TTL above the distance to the target. For every TTL, the sender
+loops over a range of source ports, and emits a TCP SYN packet towards the destination with the set target port.
+The Sender also emits "Probe" objects on a special channel so that the analysis part may know what packets
+have been injected into the network (srcPort and Ttl).
+
+Notice how we encode the sending time-stamp and the TTL in the ISN of the TCP SYN packet. This allows for measuring
+the probe RTT, and recovering the TTL of the response. Just like regular traceroute, we expect the network to return
+us either an ICMP Time Exceeded message (TTL exceeded) or a TCP RST message (when we hit the ultimate hop).
+
+The Sender thread stops once it completes the requested number of iterations over the source port range.
+
+### ICMP Receiver
+
+We run only one ICMP receiver goroutine: it is responsible for receiving the ICMP Time Exceeded messages and recovering
+the original probe information from them. We only use the first 8 bytes of the TCP packet embedded into the ICMP Time Exceeded
+message, though in the IPv6 case we could have more. This is sufficient anyway to recover the TTL and the timestamp of the
+original probe.
+
+Upon reception of an ICMP message, we build an IcmpResponse struct and forward it to the input work queue of the Resolver
+goroutine ensemble. This is needed to resolve the IP address of the node that sent us the response into its DNS name.
+
+### TCP Receiver
+
+Similar to IcmpReceiver in logic, but this goroutine intercepts the TCP RST/ACK packets sent by the ultimate destinations of
+our probes. These responses are processed and have the TTL extracted, and are then forwarded to the Resolver thread. We can
+work both with closed ports (RST expected) and open ports (ACK expected). Be careful and make sure there are no open
+connections from your machine to the target on the port you are probing - this may confuse the hell out of TcpReceiver.
+
+### Resolver
+
+This goroutine listens to the incoming Icmp/Tcp Response messages and resolves the names embedded into the Icmp responses.
+We start lots of those so we can handle concurrent name resolution. The resolver is effectively a transformation function
+on the stream of messages.
+
+### Main goroutine
+
+This one is responsible for starting all other goroutines, and then assembling their output. It is also responsible for
+terminating the unnecessary Senders. This is done by seeing what TTL hops actually return TCP RST messages; once we receive
+a TCP RST for TTL x, we can safely stop all senders for TTL > x.
+
+The main loop expects to receive all "Probes" from the channels fed by the Sender goroutines. The Sender will close its
+output channel once it is done sending. This serves as an indicator that all sending has completed. After that, we
+wait a few more seconds and tell the TcpReceiver and IcmpReceiver to stop by closing their "signal" channel.
+
+After that, we process all data that the Receivers have fed to the main thread. We need to find the source ports
+whose paths show consistent packet loss after a given hop N. We then output these paths as the "suspects" along with the
+counts of sent/received packets per hop.
diff --git a/TARGETS b/TARGETS
new file mode 100644
index 0000000..74c33b7
--- /dev/null
+++ b/TARGETS
@@ -0,0 +1,11 @@
+go_binary(
+ name = "fbtracert",
+ srcs = util.files("**/*.go"),
+ go_external_deps = [
+ ("github.com/golang/glog",
+ "d1c4472bf2efd3826f2b5bdcc02d8416798d678c"),
+ ("github.com/olekukonko/tablewriter",
+ "bc39950e081b457853031334b3c8b95cdfe428ba"),
+ ],
+ go_version = "1.5.1",
+)
diff --git a/main.go b/main.go
new file mode 100644
index 0000000..e0892db
--- /dev/null
+++ b/main.go
@@ -0,0 +1,789 @@
+/**
+ * Copyright (c) 2016-present, Facebook, Inc.
+ * All rights reserved.
+ *
+ * This source code is licensed under the BSD-style license found in the
+ * LICENSE file in the root directory of this source tree. An additional grant
+ * of patent rights can be found in the PATENTS file in the same directory.
+ */
+
+package main
+
+import (
+ "encoding/json"
+ "flag"
+ "fmt"
+ "github.com/golang/glog"
+ "github.com/olekukonko/tablewriter"
+ "math/rand"
+ "net"
+ "os"
+ "runtime"
+ "syscall"
+ "time"
+)
+
+//
+// Command line flags. Each variable points at the parsed value once
+// flag.Parse() has run; until then it holds the default given here.
+//
+var (
+	// TTL range covered by the senders (one sender is started per TTL)
+	maxTtl = flag.Int("maxTtl", 30, "The maximum ttl to use")
+	minTtl = flag.Int("minTtl", 1, "The ttl to start at")
+
+	// probe volume and pacing
+	maxSrcPorts = flag.Int("maxSrcPorts", 256, "The maximum number of source ports to use")
+	maxTime     = flag.Int("maxTime", 60, "The time to run the process for")
+	targetPort  = flag.Int("targetPort", 22, "The target port to trace to")
+	probeRate   = flag.Int("probeRate", 96, "The probe rate per ttl layer")
+	tosValue    = flag.Int("tosValue", 140, "The TOS/TC to use in probes")
+
+	// name resolution and addressing
+	numResolvers = flag.Int("numResolvers", 32, "The number of DNS resolver goroutines")
+	addrFamily   = flag.String("addrFamily", "ip4", "The address family (ip4/ip6) to use for testing")
+
+	// reporting
+	maxColumns = flag.Int("maxColumns", 4, "Maximum number of columns in report tables")
+	showAll    = flag.Bool("showAll", false, "Show all paths, regardless of loss detection")
+
+	srcAddr     = flag.String("srcAddr", "", "The source address for pings, default to auto-discover")
+	jsonOutput  = flag.Bool("jsonOutput", false, "Output raw JSON data")
+	baseSrcPort = flag.Int("baseSrcPort", 32768, "The base source port to start probing from")
+)
+
+//
+// Discover the source address for pinging.
+//
+// When srcAddr is non-empty it is resolved in the address family af
+// ("ip4" or "ip6") and the result is returned. Otherwise the first
+// non-loopback interface address that belongs to af is returned.
+//
+// An error is returned when srcAddr cannot be resolved in af, when the
+// interface addresses cannot be listed, or when no interface address
+// matches af.
+//
+func getSourceAddr(af string, srcAddr string) (*net.IP, error) {
+	if srcAddr != "" {
+		// resolve in the family the caller asked for, not in whatever the
+		// global -addrFamily flag happens to hold
+		addr, err := net.ResolveIPAddr(af, srcAddr)
+		if err != nil {
+			return nil, err
+		}
+		return &addr.IP, nil
+	}
+
+	addrs, err := net.InterfaceAddrs()
+	if err != nil {
+		return nil, err
+	}
+	for _, a := range addrs {
+		ipnet, ok := a.(*net.IPNet)
+		if !ok || ipnet.IP.IsLoopback() {
+			continue
+		}
+		// To4() is non-nil only for addresses representable as IPv4
+		isV4 := ipnet.IP.To4() != nil
+		if (isV4 && af == "ip4") || (!isV4 && af == "ip6") {
+			return &ipnet.IP, nil
+		}
+	}
+	return nil, fmt.Errorf("Could not find a source address in af %s", af)
+}
+
+//
+// Resolve given hostname/address in the given address family.
+//
+// af is passed to net.ResolveIPAddr as the network name ("ip4", "ip6").
+// On failure the returned address is nil and the resolver error is
+// returned unchanged.
+//
+func resolveName(dest string, af string) (*net.IP, error) {
+	addr, err := net.ResolveIPAddr(af, dest)
+	if err != nil {
+		// addr is nil here; touching addr.IP would panic
+		return nil, err
+	}
+	return &addr.IP, nil
+}
+
+//
+// Probe specification, emitted by sender
+//
+// A Probe identifies one injected TCP SYN by the two values the analysis
+// keys on. It is also embedded in the response structs so that a response
+// can be matched back to the probe that triggered it.
+//
+type Probe struct {
+ // TCP source port the probe was sent from
+ srcPort int
+ // TTL (hop limit) the probe was sent with
+ ttl int
+}
+
+//
+// Emitted by IcmpReceiver
+//
+// Describes one ICMP message that quoted one of our probes. The embedded
+// Probe carries the source port and TTL recovered from the quoted TCP header.
+//
+type IcmpResponse struct {
+ Probe
+ // address the ICMP message was received from
+ fromAddr *net.IP
+ // name for fromAddr; left empty by IcmpReceiver and filled in by Resolver
+ // ("?" when the reverse lookup fails)
+ fromName string
+ // difference between the receive time and the timestamp carried in the
+ // probe, both taken as the low 24 bits of a millisecond clock
+ rtt uint32
+}
+
+//
+// Emitted by TcpReceiver
+//
+// Describes one TCP RST/ACK segment received from the target. The embedded
+// Probe carries the probe's source port (the segment's destination port)
+// and the TTL recovered from the acknowledgement number.
+//
+type TcpResponse struct {
+ Probe
+ // difference between the receive time and the timestamp carried in the
+ // acknowledgement number, both taken as the low 24 bits of a millisecond clock
+ rtt uint32
+}
+
+//
+// Feed on TCP RST/ACK messages we receive from the end host; we use lots of
+// parameters to check if the incoming packet is actually a response to our
+// probe. We create TcpResponse structs and emit them on the output channel.
+//
+// Parameters:
+//   done           - closing this channel stops the receiver
+//   af             - address family, "ip4" or "ip6"
+//   targetAddr     - textual address of the target; segments from any other
+//                    source are ignored
+//   probePortStart - first source port used by the probes (inclusive)
+//   probePortEnd   - end of the probe source port range (exclusive)
+//   targetPort     - port being probed; segments must originate from it
+//   maxTtl         - largest TTL a valid response may encode
+//
+// The returned channel carries TcpResponse values and is closed once done is
+// closed. An error is returned for an unknown address family or when the raw
+// socket cannot be created.
+//
+func TcpReceiver(done <-chan struct{}, af string, targetAddr string, probePortStart, probePortEnd, targetPort, maxTtl int) (chan interface{}, error) {
+	var recvSocket int
+	var err error
+	var ipHdrSize int
+
+	glog.V(2).Infoln("TcpReceiver starting...")
+
+	// create the socket
+	switch af {
+	case "ip4":
+		recvSocket, err = syscall.Socket(syscall.AF_INET, syscall.SOCK_RAW, syscall.IPPROTO_TCP)
+		// IPv4 header is always included with the ipv4 raw socket receive
+		// NOTE(review): assumes a 20 byte header, i.e. no IP options
+		ipHdrSize = 20
+	case "ip6":
+		recvSocket, err = syscall.Socket(syscall.AF_INET6, syscall.SOCK_RAW, syscall.IPPROTO_TCP)
+		// no IPv6 header present on TCP packets received on the raw socket
+		ipHdrSize = 0
+	default:
+		return nil, fmt.Errorf("Unknown address family supplied")
+	}
+
+	if err != nil {
+		return nil, err
+	}
+
+	// we'll be writing the TcpResponse structs to this channel
+	out := make(chan interface{})
+
+	// this channel is fed from the socket
+	recv := make(chan TcpResponse)
+	go func() {
+		const tcpHdrSize int = 20
+		// IP + TCP header; anything beyond that is truncated by the kernel
+		packet := make([]byte, ipHdrSize+tcpHdrSize)
+
+		for {
+			n, from, err := syscall.Recvfrom(recvSocket, packet, 0)
+			// parent has closed the socket likely
+			if err != nil {
+				break
+			}
+
+			// IP + TCP header size
+			if n < ipHdrSize+tcpHdrSize {
+				continue
+			}
+
+			// is that from the target port we expect?
+			tcpHdr := parseTcpHeader(packet[ipHdrSize:n])
+			if int(tcpHdr.Source) != targetPort {
+				continue
+			}
+
+			// is that addressed to one of the source ports we probe from?
+			// anything else belongs to an unrelated connection and would
+			// later be used as a key the caller never allocated
+			probePort := int(tcpHdr.Destination)
+			if probePort < probePortStart || probePort >= probePortEnd {
+				continue
+			}
+
+			// is that TCP RST or TCP ACK?
+			if tcpHdr.Flags&RST != RST && tcpHdr.Flags&ACK != ACK {
+				continue
+			}
+
+			var fromAddrStr string
+
+			switch sa := from.(type) {
+			case *syscall.SockaddrInet4:
+				fromAddrStr = net.IP(sa.Addr[:]).String()
+			case *syscall.SockaddrInet6:
+				fromAddrStr = net.IP(sa.Addr[:]).String()
+			}
+
+			// is that from our target?
+			if fromAddrStr != targetAddr {
+				continue
+			}
+
+			// we extract the original TTL and timestamp from the ack number
+			// (the peer acknowledges our ISN + 1)
+			ackNum := tcpHdr.AckNum - 1
+			ttl := int(ackNum >> 24)
+
+			if ttl > maxTtl || ttl < 1 {
+				continue
+			}
+
+			// recover the time-stamp from the ack #
+			ts := ackNum & 0x00ffffff
+			now := uint32(time.Now().UnixNano()/(1000*1000)) & 0x00ffffff
+
+			// received timestamp is higher than local time; it is possible
+			// that ts == now, since our clock resolution is coarse
+			if ts > now {
+				continue
+			}
+
+			resp := TcpResponse{Probe: Probe{srcPort: probePort, ttl: ttl}, rtt: now - ts}
+			select {
+			case recv <- resp:
+			case <-done:
+				// nobody is forwarding any more, do not block forever
+				return
+			}
+		}
+	}()
+
+	go func() {
+		// closing the socket also unblocks the reader goroutine above
+		defer syscall.Close(recvSocket)
+		defer close(out)
+		for {
+			select {
+			case response := <-recv:
+				out <- response
+			case <-done:
+				glog.V(2).Infoln("TcpReceiver terminating...")
+				return
+			}
+		}
+	}()
+
+	return out, nil
+}
+
+//
+// This runs on its own collecting Icmp responses until it's explicitly told to stop.
+//
+// Parameters:
+//   done - closing this channel stops the receiver
+//   af   - address family, "ip4" or "ip6"
+//
+// The returned channel carries IcmpResponse values (fromName is left empty)
+// and is closed once done is closed. An error is returned for an unknown
+// address family or when the raw socket cannot be created.
+//
+func IcmpReceiver(done <-chan struct{}, af string) (chan interface{}, error) {
+	var recvSocket int
+	var err error
+	var outerIpHdrSize int
+	var innerIpHdrSize int
+	var icmpMsgType byte
+
+	const (
+		icmpHdrSize int = 8
+		// only the first 8 bytes of the quoted TCP header are used
+		tcpHdrSize int = 8
+	)
+
+	switch af {
+	case "ip4":
+		recvSocket, err = syscall.Socket(syscall.AF_INET, syscall.SOCK_RAW, syscall.IPPROTO_ICMP)
+		// IPv4 raw socket always prepend the transport IPv4 header
+		outerIpHdrSize = 20
+		// the size of the original IPv4 header that was on the TCP packet sent out
+		innerIpHdrSize = 20
+		// hardcoded: time to live exceeded
+		icmpMsgType = 11
+	case "ip6":
+		recvSocket, err = syscall.Socket(syscall.AF_INET6, syscall.SOCK_RAW, syscall.IPPROTO_ICMPV6)
+		// IPv6 raw socket does not prepend the original transport IPv6 header
+		outerIpHdrSize = 0
+		// this is the size of IPv6 header of the original TCP packet we used in the probes
+		innerIpHdrSize = 40
+		// time to live exceeded
+		icmpMsgType = 3
+	default:
+		// without this the zero-valued descriptor (fd 0) would be read from
+		// and eventually closed
+		return nil, fmt.Errorf("Unknown address family supplied")
+	}
+
+	if err != nil {
+		return nil, err
+	}
+
+	glog.V(2).Infoln("IcmpReceiver is starting...")
+
+	recv := make(chan interface{})
+
+	go func() {
+		// TODO: remove hardcode; 20 bytes for IP header, 8 bytes for ICMP header, 8 bytes for TCP header
+		packet := make([]byte, outerIpHdrSize+icmpHdrSize+innerIpHdrSize+tcpHdrSize)
+		for {
+			n, from, err := syscall.Recvfrom(recvSocket, packet, 0)
+			if err != nil {
+				break
+			}
+			// extract the 8 bytes of the original TCP header
+			if n < outerIpHdrSize+icmpHdrSize+innerIpHdrSize+tcpHdrSize {
+				continue
+			}
+			// not ttl exceeded (wrong ICMP type, or code other than 0)
+			if packet[outerIpHdrSize] != icmpMsgType || packet[outerIpHdrSize+1] != 0 {
+				continue
+			}
+			glog.V(4).Infof("Received icmp response message %d: %x\n", len(packet), packet)
+			tcpHdr := parseTcpHeader(packet[outerIpHdrSize+icmpHdrSize+innerIpHdrSize : n])
+
+			var fromAddr net.IP
+
+			switch sa := from.(type) {
+			case *syscall.SockaddrInet4:
+				fromAddr = net.IP(sa.Addr[:])
+			case *syscall.SockaddrInet6:
+				fromAddr = net.IP(sa.Addr[:])
+			}
+
+			// extract ttl bits from the ISN; shift before converting so the
+			// result is 0..255 regardless of the platform's int size
+			ttl := int(tcpHdr.SeqNum >> 24)
+
+			// our probes always carry ttl >= 1, anything else is not ours
+			if ttl < 1 {
+				continue
+			}
+
+			// extract the timestamp from the ISN
+			ts := tcpHdr.SeqNum & 0x00ffffff
+			// scale the current time
+			now := uint32(time.Now().UnixNano()/(1000*1000)) & 0x00ffffff
+
+			resp := IcmpResponse{Probe: Probe{srcPort: int(tcpHdr.Source), ttl: ttl}, fromAddr: &fromAddr, rtt: now - ts}
+			select {
+			case recv <- resp:
+			case <-done:
+				// nobody is forwarding any more, do not block forever
+				return
+			}
+		}
+	}()
+
+	out := make(chan interface{})
+	go func() {
+		// closing the socket also unblocks the reader goroutine above
+		defer syscall.Close(recvSocket)
+		defer close(out)
+		for {
+			select {
+			// read Icmp struct
+			case response := <-recv:
+				out <- response
+			case <-done:
+				glog.V(2).Infoln("IcmpReceiver done")
+				return
+			}
+		}
+	}()
+
+	return out, nil
+}
+
+//
+// Resolve names in incoming IcmpResponse messages
+// Everything else is passed through as is
+//
+// For every IcmpResponse read from input, fromName is set to the first name
+// returned by a reverse lookup of fromAddr, or to "?" when the lookup fails
+// or yields nothing. The returned channel is closed once input is closed and
+// drained. The error result is always nil.
+//
+func Resolver(input chan interface{}) (chan interface{}, error) {
+	out := make(chan interface{})
+	go func() {
+		defer close(out)
+
+		for val := range input {
+			resp, ok := val.(IcmpResponse)
+			if !ok {
+				out <- val
+				continue
+			}
+
+			resp.fromName = "?"
+			if resp.fromAddr != nil {
+				names, err := net.LookupAddr(resp.fromAddr.String())
+				// never index an empty answer
+				if err == nil && len(names) > 0 {
+					resp.fromName = names[0]
+				}
+			}
+			out <- resp
+		}
+	}()
+	return out, nil
+}
+
+//
+// Generate TCP SYN packet probes with given TTL at given packet per second rate
+// The packet descriptions are published to the output channel as Probe messages
+// As a side effect, the packets are injected into raw socket
+//
+// Parameters:
+//   done        - closing this channel stops the sender early
+//   srcAddr     - local address the raw socket is bound to
+//   af          - address family, "ip4" or "ip6"
+//   dest        - target host name or address
+//   dstPort     - target TCP port
+//   baseSrcPort - first source port of the probed range
+//   maxSrcPorts - number of consecutive source ports to cycle through
+//   maxIters    - number of passes over the source port range
+//   ttl         - TTL / hop limit set on every probe of this sender
+//   pps         - probes per second, must be positive
+//   tos         - TOS (IPv4) / traffic class (IPv6) set on every probe
+//
+// The returned channel carries one Probe per packet sent and is closed when
+// the sender finishes. An error is returned when the arguments are invalid,
+// the target cannot be resolved or the socket cannot be set up.
+//
+func Sender(done <-chan struct{}, srcAddr *net.IP, af, dest string, dstPort, baseSrcPort, maxSrcPorts, maxIters, ttl, pps, tos int) (chan interface{}, error) {
+	// 1/pps is computed below; reject rates that would divide by zero
+	if pps <= 0 {
+		return nil, fmt.Errorf("Invalid probe rate %d", pps)
+	}
+	if af != "ip4" && af != "ip6" {
+		return nil, fmt.Errorf("Unknown address family supplied")
+	}
+
+	glog.V(2).Infof("Sender for ttl %d starting\n", ttl)
+
+	dstAddr, err := resolveName(dest, af)
+	if err != nil {
+		return nil, err
+	}
+
+	var sendSocket int
+
+	// create the socket
+	switch af {
+	case "ip4":
+		sendSocket, err = syscall.Socket(syscall.AF_INET, syscall.SOCK_RAW, syscall.IPPROTO_TCP)
+	case "ip6":
+		sendSocket, err = syscall.Socket(syscall.AF_INET6, syscall.SOCK_RAW, syscall.IPPROTO_TCP)
+	}
+
+	if err != nil {
+		return nil, err
+	}
+
+	// from here on every error return must release the socket
+	fail := func(err error) (chan interface{}, error) {
+		syscall.Close(sendSocket)
+		return nil, err
+	}
+
+	// bind the socket, and prepare the destination address used for sending
+	var dstSockaddr syscall.Sockaddr
+	switch af {
+	case "ip4":
+		var local, remote [4]byte
+		copy(local[:], srcAddr.To4())
+		copy(remote[:], dstAddr.To4())
+		dstSockaddr = &syscall.SockaddrInet4{Port: 0, Addr: remote}
+		err = syscall.Bind(sendSocket, &syscall.SockaddrInet4{Port: 0, Addr: local})
+	case "ip6":
+		var local, remote [16]byte
+		copy(local[:], srcAddr.To16())
+		copy(remote[:], dstAddr.To16())
+		// with IPv6 the dst port must be zero, otherwise the syscall fails
+		dstSockaddr = &syscall.SockaddrInet6{Port: 0, Addr: remote}
+		err = syscall.Bind(sendSocket, &syscall.SockaddrInet6{Port: 0, Addr: local})
+	}
+
+	if err != nil {
+		return fail(err)
+	}
+
+	// set the ttl on the socket
+	switch af {
+	case "ip4":
+		err = syscall.SetsockoptInt(sendSocket, syscall.IPPROTO_IP, syscall.IP_TTL, ttl)
+	case "ip6":
+		err = syscall.SetsockoptInt(sendSocket, syscall.IPPROTO_IPV6, syscall.IPV6_UNICAST_HOPS, ttl)
+	}
+
+	if err != nil {
+		return fail(err)
+	}
+
+	// set the tos on the socket
+	switch af {
+	case "ip4":
+		err = syscall.SetsockoptInt(sendSocket, syscall.IPPROTO_IP, syscall.IP_TOS, tos)
+	case "ip6":
+		err = syscall.SetsockoptInt(sendSocket, syscall.IPPROTO_IPV6, syscall.IPV6_TCLASS, tos)
+	}
+
+	if err != nil {
+		return fail(err)
+	}
+
+	out := make(chan interface{})
+
+	// spawn a new goroutine and return the channel to be used for reading
+	go func() {
+		defer syscall.Close(sendSocket)
+		defer close(out)
+
+		// nominal gap between two probes; computed on nanoseconds so that
+		// rates above 1000 pps do not collapse to a zero delay
+		delay := time.Second / time.Duration(pps)
+
+		for i := 0; i < maxSrcPorts*maxIters; i++ {
+			srcPort := baseSrcPort + i%maxSrcPorts
+			probe := Probe{srcPort: srcPort, ttl: ttl}
+			// ISN layout: ttl in the top 8 bits, millisecond clock in the low 24
+			now := uint32(time.Now().UnixNano()/(1000*1000)) & 0x00ffffff
+			seqNum := ((uint32(ttl) & 0xff) << 24) | now
+			packet := makeTcpHeader(af, srcAddr, dstAddr, srcPort, dstPort, seqNum)
+
+			if err := syscall.Sendto(sendSocket, packet, 0, dstSockaddr); err != nil {
+				glog.Errorf("Error sending packet %s\n", err)
+				break
+			}
+
+			// grab time before blocking on send channel
+			start := time.Now()
+			select {
+			case out <- probe:
+				// up to +/-2.5% of the delay; kept as a float until the end
+				// so that it does not truncate to zero
+				jitter := time.Duration((rand.Float64() - 0.5) / 20 * float64(delay))
+				if wait := delay + jitter - time.Since(start); wait > 0 {
+					time.Sleep(wait)
+				}
+			case <-done:
+				glog.V(2).Infof("Sender for ttl %d exiting prematurely\n", ttl)
+				return
+			}
+		}
+		glog.V(2).Infoln("Sender done")
+	}()
+
+	return out, nil
+}
+
+//
+// Normalize rcvd by send count to get the hit rate
+//
+// result[i] is rcvd[i]/sent[i]. A hop with no probes sent gets a hit rate
+// of 0 rather than NaN or +Inf, because every ordered comparison against
+// NaN is false, which would make such a hop look worse than any other to
+// the loss detection. An error is returned when the two slices differ in
+// length.
+//
+func normalizeRcvd(sent, rcvd []int) ([]float64, error) {
+	if len(rcvd) != len(sent) {
+		return nil, fmt.Errorf("Length mismatch for sent/rcvd")
+	}
+
+	result := make([]float64, len(rcvd))
+	for i, s := range sent {
+		if s <= 0 {
+			// nothing sent at this hop: leave the zero value in place
+			continue
+		}
+		result[i] = float64(rcvd[i]) / float64(s)
+	}
+
+	return result, nil
+}
+
+//
+// Report whether the hit rates show a breaking point: a sample [i] such
+// that every later sample has a strictly lower hit rate than [i].
+// Only the first such sample is considered, and it only counts when the
+// segment starting at it spans more than two samples, so that a drop
+// confined to the last hop does not raise an alarm.
+//
+func isLossy(hitRates []float64) bool {
+	total := len(hitRates)
+
+pivots:
+	for pivot := 0; pivot < total-1; pivot++ {
+		for _, later := range hitRates[pivot+1:] {
+			if later >= hitRates[pivot] {
+				// some later hop does at least as well, not a breaking point
+				continue pivots
+			}
+		}
+		// first breaking point found; do not alarm on single-hop segment
+		return total-pivot > 2
+	}
+	return false
+}
+
+//
+// print the paths reported as having losses
+//
+// One table is written to stdout per group of at most maxColumns source
+// ports. Each table has a row per TTL from 1 to maxTtl-1, and for every
+// port a hop name column and a sent/rcvd counter column.
+//
+// Parameters:
+//   sent, rcvd - per source port, packet counters indexed by ttl-1
+//   hops       - per source port, hop names indexed by ttl-1; its keys
+//                select the ports that are printed
+//   maxColumns - maximum number of ports per table; values below 1 are
+//                treated as 1
+//   maxTtl     - one more than the last TTL to print
+//
+func printLossyPaths(sent, rcvd map[int] /* src port */ []int, hops map[int] /* src port */ []string, maxColumns, maxTtl int) {
+	if maxColumns < 1 {
+		maxColumns = 1
+	}
+
+	allPorts := make([]int, 0, len(hops))
+	for srcPort := range hops {
+		allPorts = append(allPorts, srcPort)
+	}
+
+	// split in multiple tables to fit the columns on the screen; stepping by
+	// maxColumns also covers the last, possibly shorter, group of ports
+	for first := 0; first < len(allPorts); first += maxColumns {
+		last := first + maxColumns
+		if last > len(allPorts) {
+			last = len(allPorts)
+		}
+		ports := allPorts[first:last]
+
+		table := tablewriter.NewWriter(os.Stdout)
+		header := []string{"TTL"}
+		for _, srcPort := range ports {
+			header = append(header, fmt.Sprintf("port: %d", srcPort), "sent/rcvd")
+		}
+		table.SetHeader(header)
+
+		for ttl := 0; ttl < maxTtl-1; ttl++ {
+			row := make([]string, 2*len(ports)+1)
+			row[0] = fmt.Sprintf("%d", ttl+1)
+			for j, srcPort := range ports {
+				// leave the cells blank rather than panic when a path is
+				// shorter than the requested ttl range
+				if ttl >= len(hops[srcPort]) || ttl >= len(sent[srcPort]) || ttl >= len(rcvd[srcPort]) {
+					continue
+				}
+				row[2*j+1] = hops[srcPort][ttl]
+				row[2*j+2] = fmt.Sprintf("%02d/%02d", sent[srcPort][ttl], rcvd[srcPort][ttl])
+			}
+			table.Append(row)
+		}
+
+		table.Render()
+		fmt.Fprintf(os.Stdout, "\n")
+	}
+}
+
+//
+// Report is the document emitted by the JSON output mode of fbtracert.
+// All three maps are keyed by the source port rendered as a decimal string.
+//
+type Report struct {
+	// Hop names along the path taken by each source port
+	Paths map[string] /* srcPort */ []string /* path hops */
+	// Number of probes sent, per source port and hop
+	Sent map[string][]int
+	// Number of responses received, per source port and hop
+	Rcvd map[string][]int
+}
+
+//
+// newReport returns a Report whose maps are allocated and empty, so that
+// entries can be added right away.
+//
+func newReport() Report {
+	return Report{
+		Paths: make(map[string][]string),
+		Sent:  make(map[string][]int),
+		Rcvd:  make(map[string][]int),
+	}
+}
+
+//
+// Dump the collected paths as indented JSON on stdout, for consumption by an
+// external program. One entry is emitted per source port present in hops;
+// the matching counters are taken from sent and rcvd. maxTtl is not used.
+// A marshalling failure is logged and nothing is printed.
+//
+func printLossyPathsJson(sent, rcvd map[int] /* src port */ []int, hops map[int] /* src port */ []string, maxTtl int) {
+	report := newReport()
+
+	for srcPort, path := range hops {
+		// JSON object keys are strings, so ports are rendered in decimal
+		key := fmt.Sprintf("%d", srcPort)
+		report.Paths[key] = path
+		report.Sent[key] = sent[srcPort]
+		report.Rcvd[key] = rcvd[srcPort]
+	}
+
+	encoded, err := json.MarshalIndent(report, "", "\t")
+	if err != nil {
+		glog.Errorf("Could not generate JSON %s", err)
+		return
+	}
+	fmt.Fprintf(os.Stdout, "%s\n", encoded)
+}
+
+//
+// Entry point: starts one Sender per TTL plus the ICMP/TCP receivers and the
+// resolvers, accounts every probe and response per source port and TTL, and
+// finally prints the paths that look lossy (or all paths with -showAll).
+//
+func main() {
+	runtime.GOMAXPROCS(runtime.NumCPU())
+
+	flag.Parse()
+	target := flag.Arg(0)
+	if target == "" {
+		fmt.Fprintf(os.Stderr, "Usage: %s [flags] <target>\n", os.Args[0])
+		return
+	}
+
+	// the ttl is used as a slice index (ttl-1) and is carried in 8 bits of
+	// the probe's sequence number
+	if *minTtl < 1 || *maxTtl < *minTtl || *maxTtl > 255 {
+		fmt.Fprintf(os.Stderr, "Invalid ttl range %d-%d, expected 1 <= minTtl <= maxTtl <= 255\n", *minTtl, *maxTtl)
+		return
+	}
+
+	// guards the division right below
+	if *maxSrcPorts < 1 {
+		fmt.Fprintf(os.Stderr, "The number of source ports must be at least 1\n")
+		return
+	}
+
+	var probes []chan interface{}
+
+	numIters := *maxTime * *probeRate / *maxSrcPorts
+
+	if numIters <= 1 {
+		fmt.Fprintf(os.Stderr, "Number of iterations too low, increase probe rate / run time or decrease src port range...\n")
+		return
+	}
+
+	source, err := getSourceAddr(*addrFamily, *srcAddr)
+
+	if err != nil {
+		fmt.Fprintf(os.Stderr, "Could not identify a source address to trace from\n")
+		return
+	}
+
+	fmt.Fprintf(os.Stderr, "Starting fbtracert with %d probes per second/ttl, base src port %d and with the port span of %d\n", *probeRate, *baseSrcPort, *maxSrcPorts)
+	fmt.Fprintf(os.Stderr, "Use '-logtostderr=true' cmd line option to see GLOG output\n")
+
+	// this will catch senders quitting - we have one sender per ttl
+	// entries below minTtl-1 stay nil, since no sender is started for them
+	senderDone := make([]chan struct{}, *maxTtl)
+	for ttl := *minTtl; ttl <= *maxTtl; ttl++ {
+		senderDone[ttl-1] = make(chan struct{})
+		c, err := Sender(senderDone[ttl-1], source, *addrFamily, target, *targetPort, *baseSrcPort, *maxSrcPorts, numIters, ttl, *probeRate, *tosValue)
+		if err != nil {
+			glog.Fatalf("Failed to start sender for ttl %d, %s\n", ttl, err)
+			return
+		}
+		probes = append(probes, c)
+	}
+
+	// channel to tell receivers to stop
+	recvDone := make(chan struct{})
+
+	// collect icmp unreachable messages for our probes
+	icmpResp, err := IcmpReceiver(recvDone, *addrFamily)
+	if err != nil {
+		fmt.Fprintf(os.Stderr, "Could not start the ICMP receiver: %s\n", err)
+		return
+	}
+
+	// collect TCP RST's from the target
+	targetAddr, err := resolveName(target, *addrFamily)
+	if err != nil {
+		fmt.Fprintf(os.Stderr, "Could not resolve %s: %s\n", target, err)
+		return
+	}
+	tcpResp, err := TcpReceiver(recvDone, *addrFamily, targetAddr.String(), *baseSrcPort, *baseSrcPort+*maxSrcPorts, *targetPort, *maxTtl)
+	if err != nil {
+		fmt.Fprintf(os.Stderr, "Could not start the TCP receiver: %s\n", err)
+		return
+	}
+
+	// add DNS name resolvers to the mix
+	var resolved []chan interface{}
+	unresolved := merge(tcpResp, icmpResp)
+
+	for i := 0; i < *numResolvers; i++ {
+		c, err := Resolver(unresolved)
+		if err != nil {
+			return
+		}
+		resolved = append(resolved, c)
+	}
+
+	// maps that store various counters per source port/ttl
+	// e.g. sent, for every source port, contains vector
+	// of sent packets for each TTL
+	sent := make(map[int] /*src Port */ []int /* pkts sent */)
+	rcvd := make(map[int] /*src Port */ []int /* pkts rcvd */)
+	hops := make(map[int] /*src Port */ []string /* hop name */)
+
+	for srcPort := *baseSrcPort; srcPort < *baseSrcPort+*maxSrcPorts; srcPort++ {
+		sent[srcPort] = make([]int, *maxTtl)
+		rcvd[srcPort] = make([]int, *maxTtl)
+		hops[srcPort] = make([]string, *maxTtl)
+
+		for i := 0; i < *maxTtl; i++ {
+			hops[srcPort][i] = "?"
+		}
+	}
+
+	// a response is only accounted when it maps onto a counter allocated
+	// above; unrelated ICMP/TCP traffic may decode to any port or ttl
+	known := func(p Probe) bool {
+		return p.ttl >= 1 && p.ttl <= *maxTtl &&
+			p.srcPort >= *baseSrcPort && p.srcPort < *baseSrcPort+*maxSrcPorts
+	}
+
+	// collect all probe specs emitted by senders
+	// once all senders terminate, tell receivers to quit too
+	go func() {
+		for val := range merge(probes...) {
+			probe := val.(Probe)
+			sent[probe.srcPort][probe.ttl-1]++
+		}
+		glog.V(2).Infoln("All senders finished!")
+		// give receivers time to catch up on in-flight data
+		time.Sleep(2 * time.Second)
+		// tell receivers to stop receiving
+		close(recvDone)
+	}()
+
+	// src ports that changed their paths in process of tracing
+	flappedPorts := make(map[int]bool)
+
+	lastClosed := *maxTtl
+	for val := range merge(resolved...) {
+		switch resp := val.(type) {
+		case IcmpResponse:
+			if !known(resp.Probe) {
+				glog.V(4).Infof("Ignoring ICMP response for port %d ttl %d\n", resp.srcPort, resp.ttl)
+				continue
+			}
+			rcvd[resp.srcPort][resp.ttl-1]++
+			currName := hops[resp.srcPort][resp.ttl-1]
+			if currName != "?" && currName != resp.fromName {
+				glog.V(2).Infof("%d: Source port %d flapped at ttl %d from: %s to %s\n", time.Now().UnixNano()/(1000*1000), resp.srcPort, resp.ttl, currName, resp.fromName)
+				flappedPorts[resp.srcPort] = true
+			}
+			hops[resp.srcPort][resp.ttl-1] = resp.fromName
+		case TcpResponse:
+			if !known(resp.Probe) {
+				glog.V(4).Infof("Ignoring TCP response for port %d ttl %d\n", resp.srcPort, resp.ttl)
+				continue
+			}
+			// stop all senders sending above this ttl, since they are not needed
+			// XXX: this is not always optimal, i.e. we may receive TCP RST for
+			// a port mapped to a short WAN path, and it would tell us to terminate
+			// probing at higher TTL, thus cutting visibility on "long" paths
+			// however, this mostly concerns the last few hops...
+			for i := resp.ttl; i < lastClosed; i++ {
+				// no sender (and no channel) exists below minTtl
+				if senderDone[i] != nil {
+					close(senderDone[i])
+				}
+			}
+			// update the last closed ttl, so we don't double-close the channels
+			if resp.ttl < lastClosed {
+				lastClosed = resp.ttl
+			}
+			rcvd[resp.srcPort][resp.ttl-1]++
+			hops[resp.srcPort][resp.ttl-1] = target
+		}
+	}
+
+	for srcPort, hopVector := range hops {
+		for i := range hopVector {
+			// truncate lists once we hit the target name
+			if hopVector[i] == target && i < *maxTtl-1 {
+				sent[srcPort] = sent[srcPort][:i+1]
+				rcvd[srcPort] = rcvd[srcPort][:i+1]
+				// store the truncated vector, assigning to the loop
+				// variable alone would be lost
+				hops[srcPort] = hopVector[:i+1]
+				break
+			}
+		}
+	}
+
+	if len(flappedPorts) > 0 {
+		glog.Infof("A total of %d ports out of %d changed their paths while tracing\n", len(flappedPorts), *maxSrcPorts)
+	}
+
+	lossyPathSent := make(map[int] /*src port */ []int)
+	lossyPathRcvd := make(map[int] /* src port */ []int)
+	lossyPathHops := make(map[int] /*src port*/ []string)
+
+	// process the accumulated data, find and output lossy paths
+	for port, sentVector := range sent {
+		if flappedPorts[port] {
+			continue
+		}
+		rcvdVector, ok := rcvd[port]
+		if !ok {
+			glog.Errorf("No responses received for port %d", port)
+			continue
+		}
+
+		norm, err := normalizeRcvd(sentVector, rcvdVector)
+		if err != nil {
+			glog.Errorf("Could not normalize %v / %v", rcvdVector, sentVector)
+			continue
+		}
+
+		if isLossy(norm) || *showAll {
+			hosts := make([]string, len(norm))
+			copy(hosts, hops[port])
+			lossyPathSent[port] = sentVector
+			lossyPathRcvd[port] = rcvdVector
+			lossyPathHops[port] = hosts
+		}
+	}
+
+	if len(lossyPathHops) > 0 {
+		if *jsonOutput {
+			printLossyPathsJson(lossyPathSent, lossyPathRcvd, lossyPathHops, lastClosed+1)
+		} else {
+			printLossyPaths(lossyPathSent, lossyPathRcvd, lossyPathHops, *maxColumns, lastClosed+1)
+		}
+		return
+	}
+	glog.Infof("Did not find any faulty paths\n")
+}
diff --git a/tcp.go b/tcp.go
new file mode 100644
index 0000000..9514bea
--- /dev/null
+++ b/tcp.go
@@ -0,0 +1,172 @@
+/**
+ * Copyright (c) 2016-present, Facebook, Inc.
+ * All rights reserved.
+ *
+ * This source code is licensed under the BSD-style license found in the
+ * LICENSE file in the root directory of this source tree. An additional grant
+ * of patent rights can be found in the PATENTS file in the same directory.
+ */
+
+package main
+
+import (
+ "bytes"
+ "encoding/binary"
+ "net"
+)
+
+//
+// TCP flags: one constant per bit of the header's flags field, listed from
+// the least significant bit (FIN) upwards
+//
+const (
+	FIN = 1 << iota
+	SYN
+	RST
+	PSH
+	ACK
+	URG
+)
+
+//
+// TcpHeader holds the TCP header fields that makeTcpHeader fills in,
+// Serialize writes out and parseTcpHeader reads back. DataOffset, Reserved
+// and Flags are stored as separate bytes here but are packed together into
+// a single 16-bit word when serialized.
+//
+type TcpHeader struct {
+	Source, Destination uint16 // source and destination ports
+	SeqNum, AckNum      uint32
+	DataOffset          uint8 // 4 bits
+	Reserved            uint8 // 6 bits
+	Flags               uint8 // 6 bits, see the TCP flag constants
+	Window              uint16
+	Checksum            uint16
+	Urgent              uint16
+}
+
+//
+// makeTcpHeader builds the header of a TCP SYN for the given port pair and
+// returns it in serialized form with the checksum filled in. ts is carried
+// in the sequence number field. af, srcAddr and dstAddr are only handed to
+// tcpChecksum (v4/v6 pseudo header).
+//
+func makeTcpHeader(af string, srcAddr, dstAddr *net.IP, srcPort, dstPort int, ts uint32) []byte {
+	// fields not listed (AckNum, Reserved, Checksum, Urgent) start at zero
+	hdr := TcpHeader{
+		Source:      uint16(srcPort),
+		Destination: uint16(dstPort),
+		SeqNum:      ts,
+		DataOffset:  5,      // 4 bits
+		Flags:       SYN,    // 6 bits (000010, SYN bit set)
+		Window:      0xffff, // max window
+	}
+
+	// the checksum is computed over the header serialized while its
+	// Checksum field is still zero, then the header is serialized again
+	hdr.Checksum = tcpChecksum(af, hdr.Serialize(), srcAddr, dstAddr)
+
+	return hdr.Serialize()
+}
+
+// parseTcpHeader decodes the leading bytes of data as a big-endian TCP
+// header. Read errors are ignored: when data is too short, the fields that
+// could not be read completely keep their zero value.
+func parseTcpHeader(data []byte) *TcpHeader {
+	hdr := new(TcpHeader)
+	src := bytes.NewReader(data)
+
+	// 16-bit word that packs data offset, reserved bits and flags
+	var packed uint16
+
+	// decode field by field in wire order; the reader advances as we go
+	wireOrder := []interface{}{
+		&hdr.Source,
+		&hdr.Destination,
+		&hdr.SeqNum,
+		&hdr.AckNum,
+		&packed,
+		&hdr.Window,
+		&hdr.Checksum,
+		&hdr.Urgent,
+	}
+	for _, field := range wireOrder {
+		binary.Read(src, binary.BigEndian, field)
+	}
+
+	hdr.DataOffset = uint8(packed >> 12)       // most significant 4 bits
+	hdr.Reserved = uint8((packed >> 6) & 0x3f) // reserved part - 6 bits
+	hdr.Flags = uint8(packed & 0x3f)           // flags - 6 bits
+
+	return hdr
+}
+
+//
+// Serialize emits the raw bytes of the header: 20 bytes, every field in
+// big-endian order. DataOffset (4 bits), Reserved (6 bits) and Flags
+// (6 bits) are packed into one 16-bit word; bits outside those widths are
+// dropped.
+//
+func (tcp *TcpHeader) Serialize() []byte {
+	// Reserved occupies bits 6-11 of the packed word, directly above the
+	// flags. This is the position parseTcpHeader reads it back from.
+	mix := uint16(tcp.DataOffset&0x0f)<<12 |
+		uint16(tcp.Reserved&0x3f)<<6 |
+		uint16(tcp.Flags&0x3f)
+
+	out := make([]byte, 20)
+	binary.BigEndian.PutUint16(out[0:], tcp.Source)
+	binary.BigEndian.PutUint16(out[2:], tcp.Destination)
+	binary.BigEndian.PutUint32(out[4:], tcp.SeqNum)
+	binary.BigEndian.PutUint32(out[8:], tcp.AckNum)
+	binary.BigEndian.PutUint16(out[12:], mix)
+	binary.BigEndian.PutUint16(out[14:], tcp.Window)
+	binary.BigEndian.PutUint16(out[16:], tcp.Checksum)
+	binary.BigEndian.PutUint16(out[18:], tcp.Urgent)
+
+	return out
+}
+
+//
+// tcpChecksum returns the TCP checksum of data (the serialized TCP segment)
+// preceded by a pseudo header built from srcip and dstip. af selects the
+// pseudo header layout: "ip4" or "ip6". For any other value only the two
+// addresses precede data. The result is the complement of the 16-bit
+// one's complement sum, works for both v4 and v6 IP addresses.
+//
+func tcpChecksum(af string, data []byte, srcip, dstip *net.IP) uint16 {
+	const protoTCP = 6 // protocol number for TCP
+
+	// TCP length, w/o pseudo header; encoded in full rather than as its
+	// low byte only, so segments longer than 255 bytes sum correctly
+	tcpLen := len(data)
+
+	// part of the pseudo header that follows the two addresses
+	var tail []byte
+	switch af {
+	case "ip4":
+		// zero, protocol, TCP length (16 bits)
+		tail = []byte{0, protoTCP, byte(tcpLen >> 8), byte(tcpLen)}
+	case "ip6":
+		// TCP length (32 bits), three zero bytes, protocol
+		tail = []byte{
+			byte(tcpLen >> 24), byte(tcpLen >> 16), byte(tcpLen >> 8), byte(tcpLen),
+			0, 0, 0, protoTCP,
+		}
+	}
+
+	body := make([]byte, 0, len(*srcip)+len(*dstip)+len(tail)+len(data)+1)
+	body = append(body, *srcip...)
+	body = append(body, *dstip...)
+	body = append(body, tail...)
+	body = append(body, data...)
+
+	// an odd trailing byte is the high-order half of the last 16-bit
+	// word: pad with a zero byte on the right
+	if len(body)%2 != 0 {
+		body = append(body, 0)
+	}
+
+	var csum uint32
+	for i := 0; i < len(body); i += 2 {
+		csum += uint32(body[i])<<8 | uint32(body[i+1])
+	}
+
+	// fold the carries back in until the sum fits in 16 bits
+	for csum>>16 != 0 {
+		csum = (csum >> 16) + (csum & 0xffff)
+	}
+
+	return ^uint16(csum)
+}
diff --git a/util.go b/util.go
new file mode 100644
index 0000000..3fa3fb7
--- /dev/null
+++ b/util.go
@@ -0,0 +1,74 @@
+/**
+ * Copyright (c) 2016-present, Facebook, Inc.
+ * All rights reserved.
+ *
+ * This source code is licensed under the BSD-style license found in the
+ * LICENSE file in the root directory of this source tree. An additional grant
+ * of patent rights can be found in the PATENTS file in the same directory.
+ */
+
+package main
+
+import (
+ "sync"
+)
+
+//
+// filter forwards the values received on in for which f returns true to
+// the returned channel and drops the rest. The returned channel is closed
+// once in has been closed and drained, so receivers can range over it.
+//
+func filter(f func(interface{}) bool, in chan interface{}) chan interface{} {
+	out := make(chan interface{})
+
+	go func() {
+		// this goroutine is the only sender on out: close it when the
+		// input ends, otherwise receivers would block forever
+		defer close(out)
+
+		for val := range in {
+			if f(val) {
+				out <- val
+			}
+		}
+	}()
+
+	return out
+}
+
+//
+// fork copies every value received on in to both returned channels, out1
+// first and then out2. Both channels are unbuffered, so each value has to
+// be taken from out1 and from out2 before the next one is read from in.
+// Both outputs are closed once in has been closed and drained.
+//
+func fork(in <-chan interface{}) (out1, out2 chan interface{}) {
+	out1, out2 = make(chan interface{}), make(chan interface{})
+
+	go func() {
+		// this goroutine is the only sender on both outputs: close them
+		// when the input ends, otherwise receivers would block forever
+		defer close(out1)
+		defer close(out2)
+
+		for val := range in {
+			out1 <- val
+			out2 <- val
+		}
+	}()
+
+	return out1, out2
+}
+
+//
+// merge funnels the values of all channels in cs into the single returned
+// channel. One goroutine per input copies values across; the returned
+// channel is closed after every input has been closed and drained.
+//
+func merge(cs ...chan interface{}) chan interface{} {
+	merged := make(chan interface{})
+
+	// counts the inputs that are still being copied
+	var pending sync.WaitGroup
+	pending.Add(len(cs))
+
+	for _, src := range cs {
+		go func(src <-chan interface{}) {
+			defer pending.Done()
+			for v := range src {
+				merged <- v
+			}
+		}(src)
+	}
+
+	// close the output only after the last copier has finished
+	go func() {
+		pending.Wait()
+		close(merged)
+	}()
+
+	return merged
+}