diff options
author | Aijay Adams <aijay@fb.com> | 2016-02-05 11:26:38 -0800 |
---|---|---|
committer | Aijay Adams <aijay@fb.com> | 2016-02-05 11:26:38 -0800 |
commit | 2ceefb504234069d43bf89063703b55d83be5ea4 (patch) | |
tree | 5331eaafb79bb8501b503146400faeed122aaa5f | |
download | fbtracert-2ceefb504234069d43bf89063703b55d83be5ea4.zip fbtracert-2ceefb504234069d43bf89063703b55d83be5ea4.tar.gz fbtracert-2ceefb504234069d43bf89063703b55d83be5ea4.tar.bz2 |
adding project to github
-rw-r--r-- | LICENSE | 30 | ||||
-rw-r--r-- | PATENTS | 34 | ||||
-rw-r--r-- | README.md | 68 | ||||
-rw-r--r-- | TARGETS | 11 | ||||
-rw-r--r-- | main.go | 789 | ||||
-rw-r--r-- | tcp.go | 172 | ||||
-rw-r--r-- | util.go | 74 |
7 files changed, 1178 insertions, 0 deletions
@@ -0,0 +1,30 @@ +BSD License + +For fbtracert software + +Copyright (c) 2016-present, Facebook, Inc. All rights reserved. + +Redistribution and use in source and binary forms, with or without modification, +are permitted provided that the following conditions are met: + + * Redistributions of source code must retain the above copyright notice, this + list of conditions and the following disclaimer. + + * Redistributions in binary form must reproduce the above copyright notice, + this list of conditions and the following disclaimer in the documentation + and/or other materials provided with the distribution. + + * Neither the name Facebook nor the names of its contributors may be used to + endorse or promote products derived from this software without specific + prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND +ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR +ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES +(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; +LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON +ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS +SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. @@ -0,0 +1,34 @@ + +Additional Grant of Patent Rights Version 2 + +"Software" means the fbtracert software distributed by Facebook, Inc. + +Facebook, Inc. 
("Facebook") hereby grants to each recipient of the Software +("you") a perpetual, worldwide, royalty-free, non-exclusive, irrevocable +(subject to the termination provision below) license under any Necessary +Claims, to make, have made, use, sell, offer to sell, import, and otherwise +transfer the Software. For avoidance of doubt, no license is granted under +Facebook's rights in any patent claims that are infringed by (i) modifications +to the Software made by you or any third party or (ii) the Software in +combination with any software or other technology. + +The license granted hereunder will terminate, automatically and without notice, +if you (or any of your subsidiaries, corporate affiliates or agents) initiate +directly or indirectly, or take a direct financial interest in, any Patent +Assertion: (i) against Facebook or any of its subsidiaries or corporate +affiliates, (ii) against any party if such Patent Assertion arises in whole or +in part from any software, technology, product or service of Facebook or any of +its subsidiaries or corporate affiliates, or (iii) against any party relating +to the Software. Notwithstanding the foregoing, if Facebook or any of its +subsidiaries or corporate affiliates files a lawsuit alleging patent +infringement against you in the first instance, and you respond by filing a +patent infringement counterclaim in that lawsuit against that party that is +unrelated to the Software, the license granted hereunder will not terminate +under section (i) of this paragraph due to such counterclaim. + +A "Necessary Claim" is a claim of a patent owned by Facebook that is +necessarily infringed by the Software standing alone. + +A "Patent Assertion" is any lawsuit or other action alleging direct, indirect, +or contributory infringement or inducement to infringe any patent, including a +cross-claim or counterclaim. 
diff --git a/README.md b/README.md new file mode 100644 index 0000000..550924e --- /dev/null +++ b/README.md @@ -0,0 +1,68 @@ +# fbtracert +(pronounced ef-BEE-tracerTEE) + +## Installing + +Requires golang >= 1.5.1 + +go get -d github.com/facebook/fbtracert +go install github.com/facebook/fbtracert +$GOPATH/bin/fbtracert --help + +## Full documentation + +### Fault isolation in ECMP networks via multi-port traceroute + +This tool attempts to identify the network component that drops packets by employing the traceroute logic +that explores multiple parallel paths. The following describes the main goroutines and their logic. + +### Sender + +We start this go-routine for every TTL that we expect on the path to the destination. We start with some max TTL +value, and then stop all senders that have TTL above the distance to the target. For every TTL, the sender +loops over a range of source ports, and emits a TCP SYN packet towards the destination with the set target port. +The Sender also emits "Probe" objects on a special channel so that the analysis part may know what packets +have been injected in the network (srcPort and Ttl). + +Notice how we encode the sending time-stamp and the ttl in the ISN of the TCP SYN packet. This allows for measuring +the probe RTT, and recovering the TTL of the response. Just like regular traceroute, we expect the network to return +us either ICMP Unreachable message (TTL exceeded) or TCP RST message (when we hit the ultimate hop) + +The Sender thread stops once it completes the requested number of iterations over the source port range. + +### ICMP Receiver + +We run only one ICMP receiver goroutine: it is responsible for receiving the ICMP Unreachable messages and recovering +the original probe information from them. We only use the first 8 bytes of the TCP packet embedded into ICMP Unreachable +message, though in IPv6 case we could have more. This is sufficient anyways to recover the TTL and the timestamp of the +original probe. 
+ +Upon reception of an ICMP message, we build an IcmpResponse struct and forward it to the input work queue of the Resolver +goroutine ensemble. This is needed to resolve the IP address of the node that sent us the response into its DNS name. + +### TCP Receiver + +Similar to IcmpReceiver in logic, but this goroutine intercepts the TCP RST/ACK packets sent by the ultimate destinations of +our probes. These responses are processed and have TTL extracted, and then forwarded to the Resolver thread. We can +work both with closed ports (RST expected) and open ports (ACK expected). Be careful and make sure there are no open +connections from your machine to the target on the port you are probing - this may confuse the hell out of TcpReceiver + +### Resolver + +This goroutine listens to the incoming Icmp/Tcp Response messages and resolves the names embedded into the Icmp responses. +We start lots of those so we can handle concurrent name resolution. The resolver is effectively a transformation function +on the stream of messages. + +### Main goroutine + +This one is responsible for starting all other goroutines, and then assembling their output. It is also responsible for +terminating the unnecessary Senders. This is done by seeing what TTL hops actually return TCP RST messages; once we receive +TCP RST for TTL x, we can safely stop all senders for TTL > x + +The main loop expects to receive all "Probes" from the channels fed by the Sender goroutines. The Sender will close its +output channels once it's done sending. This serves as an indicator that all sending has completed. After that, we +wait a few more seconds and tell the TcpReceiver and IcmpReceiver to stop by closing their "signal" channel. + +After that, we process all data that the Receivers have fed to the main thread. We need to find the source ports +whose paths show consistent packet loss after a given hop N. We then output these paths as the "suspects" along with the +counts of sent/received packets per hop. 
@@ -0,0 +1,11 @@ +go_binary( + name = "fbtracert", + srcs = util.files("**/*.go"), + go_external_deps = [ + ("github.com/golang/glog", + "d1c4472bf2efd3826f2b5bdcc02d8416798d678c"), + ("github.com/olekukonko/tablewriter", + "bc39950e081b457853031334b3c8b95cdfe428ba"), + ], + go_version = "1.5.1", +) @@ -0,0 +1,789 @@ +/** + * Copyright (c) 2016-present, Facebook, Inc. + * All rights reserved. + * + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. An additional grant + * of patent rights can be found in the PATENTS file in the same directory. + */ + +package main + +import ( + "encoding/json" + "flag" + "fmt" + "github.com/golang/glog" + "github.com/olekukonko/tablewriter" + "math/rand" + "net" + "os" + "runtime" + "syscall" + "time" +) + +// +// Command line flags +// +var maxTtl *int = flag.Int("maxTtl", 30, "The maximum ttl to use") +var minTtl *int = flag.Int("minTtl", 1, "The ttl to start at") +var maxSrcPorts *int = flag.Int("maxSrcPorts", 256, "The maximum number of source ports to use") +var maxTime *int = flag.Int("maxTime", 60, "The time to run the process for") +var targetPort *int = flag.Int("targetPort", 22, "The target port to trace to") +var probeRate *int = flag.Int("probeRate", 96, "The probe rate per ttl layer") +var tosValue *int = flag.Int("tosValue", 140, "The TOS/TC to use in probes") +var numResolvers *int = flag.Int("numResolvers", 32, "The number of DNS resolver goroutines") +var addrFamily *string = flag.String("addrFamily", "ip4", "The address family (ip4/ip6) to use for testing") +var maxColumns *int = flag.Int("maxColumns", 4, "Maximum number of columns in report tables") +var showAll *bool = flag.Bool("showAll", false, "Show all paths, regardless of loss detection") +var srcAddr *string = flag.String("srcAddr", "", "The source address for pings, default to auto-discover") +var jsonOutput *bool = flag.Bool("jsonOutput", false, "Output raw JSON data") +var 
baseSrcPort *int = flag.Int("baseSrcPort", 32768, "The base source port to start probing from") + +// +// Discover the source address for pinging +// +func getSourceAddr(af string, srcAddr string) (*net.IP, error) { + + if srcAddr != "" { + addr, err := net.ResolveIPAddr(*addrFamily, srcAddr) + if err != nil { + return nil, err + } + return &addr.IP, nil + } + + addrs, err := net.InterfaceAddrs() + if err != nil { + return nil, err + } + for _, a := range addrs { + if ipnet, ok := a.(*net.IPNet); ok && !ipnet.IP.IsLoopback() { + if (ipnet.IP.To4() != nil && af == "ip4") || (ipnet.IP.To4() == nil && af == "ip6") { + return &ipnet.IP, nil + } + } + } + return nil, fmt.Errorf("Could not find a source address in af %s", af) +} + +// +// Resolve given hostname/address in the given address family +// +func resolveName(dest string, af string) (*net.IP, error) { + addr, err := net.ResolveIPAddr(af, dest) + return &addr.IP, err +} + +// +// Probe specification, emitted by sender +// +type Probe struct { + srcPort int + ttl int +} + +// +// Emitted by IcmpReceiver +// +type IcmpResponse struct { + Probe + fromAddr *net.IP + fromName string + rtt uint32 +} + +// +// Emitted by TcpReceiver +// +type TcpResponse struct { + Probe + rtt uint32 +} + +// +// Feed on TCP RST messages we receive from the end host; we use lots of parameters to check if the incoming packet +// is actually a response to our probe. 
We create TcpResponse structs and emit them on the output channel +// +func TcpReceiver(done <-chan struct{}, af string, targetAddr string, probePortStart, probePortEnd, targetPort, maxTtl int) (chan interface{}, error) { + var recvSocket int + var err error + var ipHdrSize int + + glog.V(2).Infoln("TcpReceiver starting...") + + // create the socket + switch { + case af == "ip4": + recvSocket, err = syscall.Socket(syscall.AF_INET, syscall.SOCK_RAW, syscall.IPPROTO_TCP) + // IPv4 header is always included with the ipv4 raw socket receive + ipHdrSize = 20 + case af == "ip6": + recvSocket, err = syscall.Socket(syscall.AF_INET6, syscall.SOCK_RAW, syscall.IPPROTO_TCP) + // no IPv6 header present on TCP packets received on the raw socket + ipHdrSize = 0 + default: + return nil, fmt.Errorf("Unknown address family supplied") + } + + if err != nil { + return nil, err + } + + // we'll be writing the TcpResponse structs to this channel + out := make(chan interface{}) + + // IP + TCP header, this channel is fed from the socket + recv := make(chan TcpResponse) + go func() { + const tcpHdrSize int = 20 + packet := make([]byte, ipHdrSize+tcpHdrSize) + + for { + n, from, err := syscall.Recvfrom(recvSocket, packet, 0) + // parent has closed the socket likely + if err != nil { + break + } + + // IP + TCP header size + if n < ipHdrSize+tcpHdrSize { + continue + } + + // is that from the target port we expect? + tcpHdr := parseTcpHeader(packet[ipHdrSize:n]) + if int(tcpHdr.Source) != targetPort { + continue + } + + // is that TCP RST TCP ACK? + if tcpHdr.Flags&RST != RST && tcpHdr.Flags&ACK != ACK { + continue + } + + var fromAddrStr string + + switch { + case af == "ip4": + fromAddrStr = net.IP((from.(*syscall.SockaddrInet4).Addr)[:]).String() + case af == "ip6": + fromAddrStr = net.IP((from.(*syscall.SockaddrInet6).Addr)[:]).String() + } + + // is that from our target? 
+ if fromAddrStr != targetAddr { + continue + } + + // we extract the original TTL and timestamp from the ack number + ackNum := tcpHdr.AckNum - 1 + ttl := int(ackNum >> 24) + + if ttl > maxTtl || ttl < 1 { + continue + } + + // recover the time-stamp from the ack # + ts := ackNum & 0x00ffffff + now := uint32(time.Now().UnixNano()/(1000*1000)) & 0x00ffffff + + // received timestamp is higher than local time; it is possible + // that ts == now, since our clock resolution is coarse + if ts > now { + continue + } + + recv <- TcpResponse{Probe: Probe{srcPort: int(tcpHdr.Destination), ttl: ttl}, rtt: now - ts} + } + }() + + go func() { + defer syscall.Close(recvSocket) + defer close(out) + for { + select { + case response := <-recv: + out <- response + case <-done: + glog.V(2).Infoln("TcpReceiver terminating...") + return + } + } + }() + + return out, nil +} + +// +// This runs on its own collecting Icmp responses until its explicitly told to stop +// +func IcmpReceiver(done <-chan struct{}, af string) (chan interface{}, error) { + var recvSocket int + var err error + var outerIpHdrSize int + var innerIpHdrSize int + var icmpMsgType byte + + const ( + icmpHdrSize int = 8 + tcpHdrSize int = 8 + ) + + switch { + case af == "ip4": + recvSocket, err = syscall.Socket(syscall.AF_INET, syscall.SOCK_RAW, syscall.IPPROTO_ICMP) + // IPv4 raw socket always prepend the transport IPv4 header + outerIpHdrSize = 20 + // the size of the original IPv4 header that was on the TCP packet sent out + innerIpHdrSize = 20 + // hardcoded: time to live exceeded + icmpMsgType = 11 + case af == "ip6": + recvSocket, err = syscall.Socket(syscall.AF_INET6, syscall.SOCK_RAW, syscall.IPPROTO_ICMPV6) + // IPv6 raw socket does not prepend the original transport IPv6 header + outerIpHdrSize = 0 + // this is the size of IPv6 header of the original TCP packet we used in the probes + innerIpHdrSize = 40 + // time to live exceeded + icmpMsgType = 3 + } + + if err != nil { + return nil, err + } + + 
glog.V(2).Infoln("IcmpReceiver is starting...") + + recv := make(chan interface{}) + + go func() { + // TODO: remove hardcode; 20 bytes for IP header, 8 bytes for ICMP header, 8 bytes for TCP header + packet := make([]byte, outerIpHdrSize+icmpHdrSize+innerIpHdrSize+tcpHdrSize) + for { + n, from, err := syscall.Recvfrom(recvSocket, packet, 0) + if err != nil { + break + } + // extract the 8 bytes of the original TCP header + if n < outerIpHdrSize+icmpHdrSize+innerIpHdrSize+tcpHdrSize { + continue + } + // not ttl exceeded + if packet[outerIpHdrSize] != icmpMsgType || packet[outerIpHdrSize+1] != 0 { + continue + } + glog.V(4).Infof("Received icmp response message %d: %x\n", len(packet), packet) + tcpHdr := parseTcpHeader(packet[outerIpHdrSize+icmpHdrSize+innerIpHdrSize : n]) + + var fromAddr net.IP + + switch { + case af == "ip4": + fromAddr = net.IP(from.(*syscall.SockaddrInet4).Addr[:]) + case af == "ip6": + fromAddr = net.IP(from.(*syscall.SockaddrInet6).Addr[:]) + } + + // extract ttl bits from the ISN + ttl := int(tcpHdr.SeqNum) >> 24 + + // extract the timestamp from the ISN + ts := tcpHdr.SeqNum & 0x00ffffff + // scale the current time + now := uint32(time.Now().UnixNano()/(1000*1000)) & 0x00ffffff + recv <- IcmpResponse{Probe: Probe{srcPort: int(tcpHdr.Source), ttl: ttl}, fromAddr: &fromAddr, rtt: now - ts} + } + }() + + out := make(chan interface{}) + go func() { + defer syscall.Close(recvSocket) + defer close(out) + for { + select { + // read Icmp struct + case response := <-recv: + out <- response + case <-done: + glog.V(2).Infoln("IcmpReceiver done") + return + } + } + }() + + return out, nil +} + +// +// Resolve names in incoming IcmpResponse messages +// Everything else is passed through as is +// +func Resolver(input chan interface{}) (chan interface{}, error) { + out := make(chan interface{}) + go func() { + defer close(out) + + for val := range input { + switch val.(type) { + case IcmpResponse: + resp := val.(IcmpResponse) + names, err := 
net.LookupAddr(resp.fromAddr.String()) + if err != nil { + resp.fromName = "?" + } else { + resp.fromName = names[0] + } + out <- resp + default: + out <- val + } + } + }() + return out, nil +} + +// +// Generate TCP SYN packet probes with given TTL at given packet per second rate +// The packet descriptions are published to the output channel as Probe messages +// As a side effect, the packets are injected into raw socket +// +func Sender(done <-chan struct{}, srcAddr *net.IP, af, dest string, dstPort, baseSrcPort, maxSrcPorts, maxIters, ttl, pps, tos int) (chan interface{}, error) { + var err error + + out := make(chan interface{}) + + glog.V(2).Infof("Sender for ttl %d starting\n", ttl) + + dstAddr, err := resolveName(dest, af) + if err != nil { + return nil, err + } + + var sendSocket int + + // create the socket + switch { + case af == "ip4": + sendSocket, err = syscall.Socket(syscall.AF_INET, syscall.SOCK_RAW, syscall.IPPROTO_TCP) + case af == "ip6": + sendSocket, err = syscall.Socket(syscall.AF_INET6, syscall.SOCK_RAW, syscall.IPPROTO_TCP) + } + + if err != nil { + return nil, err + } + + // bind the socket + switch { + case af == "ip4": + var sockaddr [4]byte + copy(sockaddr[:], srcAddr.To4()) + err = syscall.Bind(sendSocket, &syscall.SockaddrInet4{Port: 0, Addr: sockaddr}) + case af == "ip6": + var sockaddr [16]byte + copy(sockaddr[:], srcAddr.To16()) + err = syscall.Bind(sendSocket, &syscall.SockaddrInet6{Port: 0, Addr: sockaddr}) + } + + if err != nil { + return nil, err + } + + // set the ttl on the socket + switch { + case af == "ip4": + err = syscall.SetsockoptInt(sendSocket, syscall.IPPROTO_IP, syscall.IP_TTL, ttl) + case af == "ip6": + err = syscall.SetsockoptInt(sendSocket, syscall.IPPROTO_IPV6, syscall.IPV6_UNICAST_HOPS, ttl) + } + + if err != nil { + return nil, err + } + + // set the tos on the socket + switch { + case af == "ip4": + err = syscall.SetsockoptInt(sendSocket, syscall.IPPROTO_IP, syscall.IP_TOS, tos) + case af == "ip6": + err = 
syscall.SetsockoptInt(sendSocket, syscall.IPPROTO_IPV6, syscall.IPV6_TCLASS, tos) + } + + if err != nil { + return nil, err + } + + // spawn a new goroutine and return the channel to be used for reading + go func() { + defer syscall.Close(sendSocket) + defer close(out) + + delay := time.Duration(1000/pps) * time.Millisecond + + for i := 0; i < maxSrcPorts*maxIters; i++ { + srcPort := baseSrcPort + i%maxSrcPorts + probe := Probe{srcPort: srcPort, ttl: ttl} + now := uint32(time.Now().UnixNano()/(1000*1000)) & 0x00ffffff + seqNum := ((uint32(ttl) & 0xff) << 24) | (now & 0x00ffffff) + packet := makeTcpHeader(af, srcAddr, dstAddr, srcPort, dstPort, seqNum) + + switch { + case af == "ip4": + var sockaddr [4]byte + copy(sockaddr[:], dstAddr.To4()) + err = syscall.Sendto(sendSocket, packet, 0, &syscall.SockaddrInet4{Port: 0, Addr: sockaddr}) + case af == "ip6": + var sockaddr [16]byte + copy(sockaddr[:], dstAddr.To16()) + // with IPv6 the dst port must be zero, otherwise the syscall fails + err = syscall.Sendto(sendSocket, packet, 0, &syscall.SockaddrInet6{Port: 0, Addr: sockaddr}) + } + + if err != nil { + glog.Errorf("Error sending packet %s\n", err) + break + } + + // grab time before blocking on send channel + start := time.Now() + select { + case out <- probe: + end := time.Now() + jitter := time.Duration(((rand.Float64()-0.5)/20)*1000/float64(pps)) * time.Millisecond + if end.Sub(start) < delay+jitter { + time.Sleep(delay + jitter - (end.Sub(start))) + } + case <-done: + glog.V(2).Infof("Sender for ttl %d exiting prematurely\n", ttl) + return + } + } + glog.V(2).Infoln("Sender done") + }() + + return out, nil +} + +// +// Normalize rcvd by send count to get the hit rate +// +func normalizeRcvd(sent, rcvd []int) ([]float64, error) { + if len(rcvd) != len(sent) { + return nil, fmt.Errorf("Length mismatch for sent/rcvd") + } + + result := make([]float64, len(rcvd)) + for i := range sent { + result[i] = float64(rcvd[i]) / float64(sent[i]) + } + + return result, nil +} + 
+// +// Detect a pattern where all samples after +// a sample [i] have lower hit rate than [i] +// this normally indicates a breaking point after [i] +// +func isLossy(hitRates []float64) bool { + var found bool + var segLen int + for i := 0; i < len(hitRates)-1 && !found; i++ { + found = true + segLen = len(hitRates) - i + for j := i + 1; j < len(hitRates); j++ { + if hitRates[j] >= hitRates[i] { + found = false + break + } + } + } + // do not alarm on single-hop segment + if segLen > 2 { + return found + } + return false +} + +// +// print the paths reported as having losses +// +func printLossyPaths(sent, rcvd map[int] /* src port */ []int, hops map[int] /* src port */ []string, maxColumns, maxTtl int) { + var allPorts []int + + for srcPort, _ := range hops { + allPorts = append(allPorts, srcPort) + } + + // split in multiple tables to fit the columns on the screen + for i := 0; i < len(allPorts)/maxColumns; i++ { + data := make([][]string, maxTtl) + table := tablewriter.NewWriter(os.Stdout) + header := []string{"TTL"} + + maxOffset := (i + 1) * maxColumns + if maxOffset > len(allPorts) { + maxOffset = len(allPorts) + } + + for _, srcPort := range allPorts[i*maxColumns : maxOffset] { + header = append(header, fmt.Sprintf("port: %d", srcPort), fmt.Sprintf("sent/rcvd")) + } + + table.SetHeader(header) + + for ttl := 0; ttl < maxTtl-1; ttl++ { + data[ttl] = make([]string, 2*(maxOffset-i*maxColumns)+1) + data[ttl][0] = fmt.Sprintf("%d", ttl+1) + for j, srcPort := range allPorts[i*maxColumns : maxOffset] { + data[ttl][2*j+1] = hops[srcPort][ttl] + data[ttl][2*j+2] = fmt.Sprintf("%02d/%02d", sent[srcPort][ttl], rcvd[srcPort][ttl]) + } + } + + for _, v := range data { + table.Append(v) + } + + table.Render() + fmt.Fprintf(os.Stdout, "\n") + } +} + +// +// Define a JSON report from go/fbtracert +// +type Report struct { + // The path map + Paths map[string] /* srcPort */ []string /* path hops */ + // Probe count sent per source port/hop name + Sent map[string][]int + // 
Probe count received per source port/hop name + Rcvd map[string][]int +} + +func newReport() (report Report) { + report.Paths = make(map[string][]string) + report.Sent = make(map[string][]int) + report.Rcvd = make(map[string][]int) + + return report +} + +// +// Raw Json output for external program to analyze +// +func printLossyPathsJson(sent, rcvd map[int] /* src port */ []int, hops map[int] /* src port */ []string, maxTtl int) { + var report = newReport() + + for srcPort, path := range hops { + report.Paths[fmt.Sprintf("%d", srcPort)] = path + report.Sent[fmt.Sprintf("%d", srcPort)] = sent[srcPort] + report.Rcvd[fmt.Sprintf("%d", srcPort)] = rcvd[srcPort] + } + + b, err := json.MarshalIndent(report, "", "\t") + if err != nil { + glog.Errorf("Could not generate JSON %s", err) + return + } + fmt.Fprintf(os.Stdout, "%s\n", b) +} + +func main() { + runtime.GOMAXPROCS(runtime.NumCPU()) + + flag.Parse() + target := flag.Arg(0) + + var probes []chan interface{} + + numIters := int(*maxTime * *probeRate / *maxSrcPorts) + + if numIters <= 1 { + fmt.Fprintf(os.Stderr, "Number of iterations too low, increase probe rate / run time or decrease src port range...\n") + return + } + + source, err := getSourceAddr(*addrFamily, *srcAddr) + + if err != nil { + fmt.Fprintf(os.Stderr, "Could not identify a source address to trace from\n") + return + } + + fmt.Fprintf(os.Stderr, "Starting fbtracert with %d probes per second/ttl, base src port %d and with the port span of %d\n", *probeRate, *baseSrcPort, *maxSrcPorts) + fmt.Fprintf(os.Stderr, "Use '-logtostderr=true' cmd line option to see GLOG output\n") + + // this will catch senders quitting - we have one sender per ttl + senderDone := make([]chan struct{}, *maxTtl) + for ttl := *minTtl; ttl <= *maxTtl; ttl++ { + senderDone[ttl-1] = make(chan struct{}) + c, err := Sender(senderDone[ttl-1], source, *addrFamily, target, *targetPort, *baseSrcPort, *maxSrcPorts, numIters, ttl, *probeRate, *tosValue) + if err != nil { + 
glog.Fatalf("Failed to start sender for ttl %d, %s\n", ttl, err) + return + } + probes = append(probes, c) + } + + // channel to tell receivers to stop + recvDone := make(chan struct{}) + + // collect icmp unreachable messages for our probes + icmpResp, err := IcmpReceiver(recvDone, *addrFamily) + if err != nil { + return + } + + // collect TCP RST's from the target + targetAddr, err := resolveName(target, *addrFamily) + tcpResp, err := TcpReceiver(recvDone, *addrFamily, targetAddr.String(), *baseSrcPort, *baseSrcPort+*maxSrcPorts, *targetPort, *maxTtl) + if err != nil { + return + } + + // add DNS name resolvers to the mix + var resolved []chan interface{} + unresolved := merge(tcpResp, icmpResp) + + for i := 0; i < *numResolvers; i++ { + c, err := Resolver(unresolved) + if err != nil { + return + } + resolved = append(resolved, c) + } + + // maps that store various counters per source port/ttl + // e..g sent, for every soruce port, contains vector + // of sent packets for each TTL + sent := make(map[int] /*src Port */ []int /* pkts sent */) + rcvd := make(map[int] /*src Port */ []int /* pkts rcvd */) + hops := make(map[int] /*src Port */ []string /* hop name */) + + for srcPort := *baseSrcPort; srcPort < *baseSrcPort+*maxSrcPorts; srcPort++ { + sent[srcPort] = make([]int, *maxTtl) + rcvd[srcPort] = make([]int, *maxTtl) + hops[srcPort] = make([]string, *maxTtl) + //hops[srcPort][*maxTtl-1] = target + + for i := 0; i < *maxTtl; i++ { + hops[srcPort][i] = "?" + } + } + + // collect all probe specs emitted by senders + // once all senders terminate, tell receivers to quit too + go func() { + for val := range merge(probes...) 
{ + probe := val.(Probe) + sent[probe.srcPort][probe.ttl-1]++ + } + glog.V(2).Infoln("All senders finished!") + // give receivers time to catch up on in-flight data + time.Sleep(2 * time.Second) + // tell receivers to stop receiving + close(recvDone) + }() + + // this store DNS names of all nodes that ever replied to us + var names []string + + // src ports that changed their paths in process of tracing + var flappedPorts map[int]bool = make(map[int]bool) + + lastClosed := *maxTtl + for val := range merge(resolved...) { + switch val.(type) { + case IcmpResponse: + resp := val.(IcmpResponse) + rcvd[resp.srcPort][resp.ttl-1]++ + currName := hops[resp.srcPort][resp.ttl-1] + if currName != "?" && currName != resp.fromName { + glog.V(2).Infof("%d: Source port %d flapped at ttl %d from: %s to %s\n", time.Now().UnixNano()/(1000*1000), resp.srcPort, resp.ttl, currName, resp.fromName) + flappedPorts[resp.srcPort] = true + } + hops[resp.srcPort][resp.ttl-1] = resp.fromName + // accumulate all names for processing later + // XXX: we may have duplicates, which is OK, + // but not very efficient + names = append(names, resp.fromName) + case TcpResponse: + resp := val.(TcpResponse) + // stop all senders sending above this ttl, since they are not needed + // XXX: this is not always optimal, i.e. we may receive TCP RST for + // a port mapped to a short WAN path, and it would tell us to terminate + // probing at higher TTL, thus cutting visibility on "long" paths + // however, this mostly concerned that last few hops... 
+ for i := resp.ttl; i < lastClosed; i++ { + close(senderDone[i]) + } + // update the last closed ttl, so we don't double-close the channels + if resp.ttl < lastClosed { + lastClosed = resp.ttl + } + rcvd[resp.srcPort][resp.ttl-1]++ + hops[resp.srcPort][resp.ttl-1] = target + } + } + + for srcPort, hopVector := range hops { + for i := range hopVector { + // truncate lists once we hit the target name + if hopVector[i] == target && i < *maxTtl-1 { + sent[srcPort] = sent[srcPort][:i+1] + rcvd[srcPort] = rcvd[srcPort][:i+1] + hopVector = hopVector[:i+1] + break + } + } + } + + if len(flappedPorts) > 0 { + glog.Infof("A total of %d ports out of %d changed their paths while tracing\n", len(flappedPorts), *maxSrcPorts) + } + + lossyPathSent := make(map[int] /*src port */ []int) + lossyPathRcvd := make(map[int] /* src port */ []int) + lossyPathHops := make(map[int] /*src port*/ []string) + + // process the accumulated data, find and output lossy paths + for port, sentVector := range sent { + if flappedPorts[port] { + continue + } + if rcvdVector, ok := rcvd[port]; ok { + norm, err := normalizeRcvd(sentVector, rcvdVector) + + if err != nil { + glog.Errorf("Could not normalize %v / %v", rcvdVector, sentVector) + continue + } + + if isLossy(norm) || *showAll { + hosts := make([]string, len(norm)) + for i := range norm { + hosts[i] = hops[port][i] + } + lossyPathSent[port] = sentVector + lossyPathRcvd[port] = rcvdVector + lossyPathHops[port] = hosts + } + } else { + glog.Errorf("No responses received for port %d", port) + } + } + + if len(lossyPathHops) > 0 { + if *jsonOutput { + printLossyPathsJson(lossyPathSent, lossyPathRcvd, lossyPathHops, lastClosed+1) + } else { + printLossyPaths(lossyPathSent, lossyPathRcvd, lossyPathHops, *maxColumns, lastClosed+1) + } + return + } + glog.Infof("Did not find any faulty paths\n") +} @@ -0,0 +1,172 @@ +/** + * Copyright (c) 2016-present, Facebook, Inc. + * All rights reserved. 
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.

// TCP flags, expressed as bit masks over the 6-bit flags field that
// TcpHeader.Serialize and parseTcpHeader keep in TcpHeader.Flags.
const (
	FIN = 1 << 0
	SYN = 1 << 1
	RST = 1 << 2
	PSH = 1 << 3
	ACK = 1 << 4
	URG = 1 << 5
)

// TcpHeader holds the fields of a fixed-size (20 byte, option-less) TCP
// header. DataOffset, Reserved and Flags share a single 16-bit word on the
// wire; only the low 4, 6 and 6 bits of them respectively are serialized.
type TcpHeader struct {
	Source      uint16
	Destination uint16
	SeqNum      uint32
	AckNum      uint32
	DataOffset  uint8 // 4 bits
	Reserved    uint8 // 6 bits
	Flags       uint8 // 6 bits
	Window      uint16
	Checksum    uint16
	Urgent      uint16
}

// makeTcpHeader builds a serialized SYN header with the checksum filled in.
//
// af selects the pseudo-header layout used for the checksum ("ip4" or "ip6",
// see tcpChecksum). srcPort and dstPort are truncated to 16 bits, and ts is
// placed in the sequence number field. The returned slice is the 20-byte
// header in network byte order.
func makeTcpHeader(af string, srcAddr, dstAddr *net.IP, srcPort, dstPort int, ts uint32) []byte {
	tcpHeader := TcpHeader{
		Source:      uint16(srcPort),
		Destination: uint16(dstPort),
		SeqNum:      ts,
		AckNum:      0,
		DataOffset:  5,      // header length in 32-bit words
		Reserved:    0,      // 6 bits
		Flags:       SYN,    // 6 bits (000010, SYN bit set)
		Window:      0xffff, // max window
		Checksum:    0,
		Urgent:      0,
	}

	// The checksum is computed over the header while its checksum field is
	// still zero, then the header is serialized again with the result.
	unsummed := tcpHeader.Serialize()
	tcpHeader.Checksum = tcpChecksum(af, unsummed, srcAddr, dstAddr)

	return tcpHeader.Serialize()
}

// parseTcpHeader decodes the leading bytes of data into a TcpHeader.
//
// Fields are read in wire order as big-endian integers. Input shorter than
// 20 bytes is tolerated: the first field that cannot be read in full, and
// every field after it, is left at its zero value. The result is never nil.
func parseTcpHeader(data []byte) *TcpHeader {
	// Once one read comes up short the remaining input is discarded, so a
	// later, smaller field can never be decoded from leftover bytes.
	next16 := func() uint16 {
		if len(data) < 2 {
			data = nil
			return 0
		}
		v := binary.BigEndian.Uint16(data)
		data = data[2:]
		return v
	}
	next32 := func() uint32 {
		if len(data) < 4 {
			data = nil
			return 0
		}
		v := binary.BigEndian.Uint32(data)
		data = data[4:]
		return v
	}

	tcp := new(TcpHeader)
	tcp.Source = next16()
	tcp.Destination = next16()
	tcp.SeqNum = next32()
	tcp.AckNum = next32()

	// Data offset, reserved bits and flags are packed into one 16-bit word.
	field := next16()
	tcp.DataOffset = byte(field >> 12)     // most significant 4 bits
	tcp.Reserved = byte(field >> 6 & 0x3f) // middle 6 bits
	tcp.Flags = byte(field & 0x3f)         // least significant 6 bits

	tcp.Window = next16()
	tcp.Checksum = next16()
	tcp.Urgent = next16()

	return tcp
}

// Serialize emits the 20 raw header bytes in network byte order.
//
// It is the inverse of parseTcpHeader: DataOffset occupies the top 4 bits of
// the 13th/14th byte pair, Reserved the next 6 and Flags the low 6. Bits of
// those fields outside their width are dropped.
func (tcp *TcpHeader) Serialize() []byte {
	out := make([]byte, 20)

	binary.BigEndian.PutUint16(out[0:], tcp.Source)
	binary.BigEndian.PutUint16(out[2:], tcp.Destination)
	binary.BigEndian.PutUint32(out[4:], tcp.SeqNum)
	binary.BigEndian.PutUint32(out[8:], tcp.AckNum)

	// Reserved sits directly above the 6 flag bits, i.e. at bit 6; shifting it
	// any further would spill into the data offset nibble.
	mix := uint16(tcp.DataOffset)<<12 |
		uint16(tcp.Reserved&0x3f)<<6 |
		uint16(tcp.Flags&0x3f)
	binary.BigEndian.PutUint16(out[12:], mix)

	binary.BigEndian.PutUint16(out[14:], tcp.Window)
	binary.BigEndian.PutUint16(out[16:], tcp.Checksum)
	binary.BigEndian.PutUint16(out[18:], tcp.Urgent)

	return out
}

// tcpChecksum returns the one's-complement checksum of data (a TCP header
// plus any payload) preceded by the pseudo-header for the given family.
//
// For af == "ip4" the pseudo-header is src, dst, a zero byte, the protocol
// number and a 16-bit TCP length; for af == "ip6" it is src, dst, a 32-bit
// TCP length, three zero bytes and the protocol number. Any other af
// contributes only the two addresses. The address bytes are used exactly as
// stored in *srcip and *dstip. Both pointers must be non-nil.
func tcpChecksum(af string, data []byte, srcip, dstip *net.IP) uint16 {
	tcpLen := len(data)

	// Family-specific tail of the pseudo-header. The length is written in
	// full rather than as a single byte so segments over 255 bytes sum
	// correctly.
	var tail []byte
	switch af {
	case "ip4":
		tail = []byte{
			0,
			6, // protocol number for TCP
			byte(tcpLen >> 8), byte(tcpLen), // TCP length (16 bits), w/o pseudo-header
		}
	case "ip6":
		tail = []byte{
			byte(tcpLen >> 24), byte(tcpLen >> 16), byte(tcpLen >> 8), byte(tcpLen), // TCP length (32 bits)
			0, 0, 0,
			6, // protocol number for TCP
		}
	}

	body := make([]byte, 0, len(*srcip)+len(*dstip)+len(tail)+len(data))
	body = append(body, *srcip...)
	body = append(body, *dstip...)
	body = append(body, tail...)
	body = append(body, data...)

	var csum uint32
	for i := 0; i+1 < len(body); i += 2 {
		csum += uint32(binary.BigEndian.Uint16(body[i:]))
	}

	// An odd trailing byte is the high octet of a final word padded with zero.
	if len(body)%2 != 0 {
		csum += uint32(body[len(body)-1]) << 8
	}

	// Fold the carries back into the low 16 bits.
	for csum>>16 != 0 {
		csum = (csum >> 16) + (csum & 0xffff)
	}

	return ^uint16(csum)
}

//
// Channel helpers (util.go)
//

// filter forwards every value from in for which f returns true. The returned
// channel is closed once in has been closed and drained.
func filter(f func(interface{}) bool, in chan interface{}) chan interface{} {
	out := make(chan interface{})

	go func() {
		// Propagate end-of-stream so consumers ranging over out terminate.
		defer close(out)
		for val := range in {
			if f(val) {
				out <- val
			}
		}
	}()

	return out
}

// fork copies every value from in to both returned channels, sending to out1
// before out2. The channels are unbuffered, so both must be consumed for the
// copy to make progress. Both are closed once in has been closed and drained.
func fork(in <-chan interface{}) (out1, out2 chan interface{}) {
	out1, out2 = make(chan interface{}), make(chan interface{})

	go func() {
		defer close(out2)
		defer close(out1)
		for val := range in {
			out1 <- val
			out2 <- val
		}
	}()

	return out1, out2
}

// merge funnels the values of all channels in cs into a single channel, which
// is closed after every input channel has been closed and drained. Values
// from different inputs are interleaved in no particular order.
func merge(cs ...chan interface{}) chan interface{} {
	var wg sync.WaitGroup
	out := make(chan interface{})

	wg.Add(len(cs))
	for _, ch := range cs {
		go func(c <-chan interface{}) {
			defer wg.Done()
			for val := range c {
				out <- val
			}
		}(ch)
	}

	// Close out only after every forwarding goroutine has finished sending.
	go func() {
		wg.Wait()
		close(out)
	}()

	return out
}