165 lines
		
	
	
		
			3.3 KiB
		
	
	
	
		
			Go
		
	
	
	
			
		
		
	
	
			165 lines
		
	
	
		
			3.3 KiB
		
	
	
	
		
			Go
		
	
	
	
package capture
 | 
						|
 | 
						|
import (
 | 
						|
	"net"
 | 
						|
 | 
						|
	"dev.sum7.eu/wifictld/analyzer/data"
 | 
						|
	log "github.com/sirupsen/logrus"
 | 
						|
)
 | 
						|
 | 
						|
//Collector for capture
 | 
						|
type Collector struct {
 | 
						|
	connections map[string]*net.UDPConn
 | 
						|
	handler     data.Handler
 | 
						|
	queue       chan *Packet
 | 
						|
	stop        chan interface{}
 | 
						|
}
 | 
						|
 | 
						|
// NewCollector creates a Collector struct
 | 
						|
func NewCollector(handler data.Handler, ifaces []IFaceConfig) *Collector {
 | 
						|
 | 
						|
	coll := &Collector{
 | 
						|
		handler:     handler,
 | 
						|
		queue:       make(chan *Packet, 400),
 | 
						|
		stop:        make(chan interface{}),
 | 
						|
		connections: make(map[string]*net.UDPConn),
 | 
						|
	}
 | 
						|
 | 
						|
	for _, iface := range ifaces {
 | 
						|
		coll.listenUDP(iface)
 | 
						|
	}
 | 
						|
 | 
						|
	go coll.parser()
 | 
						|
 | 
						|
	return coll
 | 
						|
}
 | 
						|
 | 
						|
// Close Collector
 | 
						|
func (coll *Collector) Close() {
 | 
						|
	close(coll.stop)
 | 
						|
	for _, conn := range coll.connections {
 | 
						|
		conn.Close()
 | 
						|
	}
 | 
						|
	close(coll.queue)
 | 
						|
}
 | 
						|
 | 
						|
func (coll *Collector) listenUDP(iface IFaceConfig) {
 | 
						|
	ip := net.ParseIP(iface.IPAddress)
 | 
						|
	var conn *net.UDPConn
 | 
						|
	var err error
 | 
						|
 | 
						|
	if ip.IsMulticast() {
 | 
						|
		ifx, err := net.InterfaceByName(iface.InterfaceName)
 | 
						|
		if err != nil {
 | 
						|
			log.Panic(err)
 | 
						|
		}
 | 
						|
 | 
						|
		// Open socket
 | 
						|
		conn, err = net.ListenMulticastUDP("udp", ifx, &net.UDPAddr{
 | 
						|
			IP:   ip,
 | 
						|
			Port: iface.Port,
 | 
						|
		})
 | 
						|
		if err != nil {
 | 
						|
			log.Panic(err)
 | 
						|
		}
 | 
						|
	} else {
 | 
						|
		// Open socket
 | 
						|
		conn, err = net.ListenUDP("udp", &net.UDPAddr{
 | 
						|
			IP:   ip,
 | 
						|
			Port: iface.Port,
 | 
						|
			Zone: iface.InterfaceName,
 | 
						|
		})
 | 
						|
		if err != nil {
 | 
						|
			log.Panic(err)
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	conn.SetReadBuffer(MaxDataGramSize)
 | 
						|
 | 
						|
	coll.connections[iface.InterfaceName] = conn
 | 
						|
 | 
						|
	// Start receiver
 | 
						|
	go coll.receiver(conn)
 | 
						|
}
 | 
						|
 | 
						|
// Packet of the respond request
 | 
						|
type Packet struct {
 | 
						|
	Address *net.UDPAddr
 | 
						|
	Raw     []byte
 | 
						|
}
 | 
						|
 | 
						|
func (coll *Collector) parser() {
 | 
						|
	for obj := range coll.queue {
 | 
						|
		msg, err := data.NewSocketMSG(obj.Raw)
 | 
						|
		if err != nil {
 | 
						|
			log.Warnf("unable to unmarshal request from %s: %s", obj.Address.String(), err)
 | 
						|
			continue
 | 
						|
		}
 | 
						|
		log.Debugf("recv[%s]: %s", obj.Address, msg.String())
 | 
						|
		response, err := coll.handler(obj.Address, msg)
 | 
						|
		if err != nil {
 | 
						|
			log.Warnf("unable to handle request from %s: %s", obj.Address.String(), err)
 | 
						|
			continue
 | 
						|
		}
 | 
						|
		if response != nil {
 | 
						|
			coll.SendTo(obj.Address, response)
 | 
						|
		}
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// SendTo a specifical address
 | 
						|
func (coll *Collector) SendTo(addr *net.UDPAddr, msg *data.SocketMSG) {
 | 
						|
	log.Debugf("send[%s]: %s", addr, msg.String())
 | 
						|
	data, err := msg.Marshal()
 | 
						|
	if err != nil {
 | 
						|
		log.Warnf("unable to marshal response for %s: %s", addr.String(), err)
 | 
						|
		return
 | 
						|
	}
 | 
						|
	conn, ok := coll.connections[addr.Zone]
 | 
						|
	if ok {
 | 
						|
		conn.WriteToUDP(data, addr)
 | 
						|
		return
 | 
						|
	}
 | 
						|
	for _, conn := range coll.connections {
 | 
						|
		conn.WriteToUDP(data, addr)
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// Send to every connection to default address
 | 
						|
func (coll *Collector) Send(msg *data.SocketMSG) {
 | 
						|
	log.Debugf("send: %s", msg.String())
 | 
						|
	data, err := msg.Marshal()
 | 
						|
	if err != nil {
 | 
						|
		log.Warnf("unable to marshal response: %s", err)
 | 
						|
		return
 | 
						|
	}
 | 
						|
	for ifname, conn := range coll.connections {
 | 
						|
		conn.WriteToUDP(data, &net.UDPAddr{
 | 
						|
			IP:   net.ParseIP(MulticastAddressDefault),
 | 
						|
			Port: Port,
 | 
						|
			Zone: ifname,
 | 
						|
		})
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func (coll *Collector) receiver(conn *net.UDPConn) {
 | 
						|
	buf := make([]byte, MaxDataGramSize)
 | 
						|
 | 
						|
	for {
 | 
						|
		n, src, err := conn.ReadFromUDP(buf)
 | 
						|
 | 
						|
		if err != nil {
 | 
						|
			log.Warnf("ReadFromUDP failed: %s", err)
 | 
						|
			return
 | 
						|
		}
 | 
						|
 | 
						|
		raw := make([]byte, n)
 | 
						|
		copy(raw, buf)
 | 
						|
 | 
						|
		coll.queue <- &Packet{
 | 
						|
			Address: src,
 | 
						|
			Raw:     raw,
 | 
						|
		}
 | 
						|
	}
 | 
						|
}
 |