wifictld-analyzer/capture/collector.go

171 lines
3.4 KiB
Go
Raw Normal View History

2018-06-02 01:00:54 +02:00
package capture
import (
"net"
"dev.sum7.eu/wifictld/analyzer/data"
log "github.com/sirupsen/logrus"
)
//Collector for capture
type Collector struct {
connections map[string]*net.UDPConn
handler data.Handler
queue chan *Packet
stop chan interface{}
}
// NewCollector creates a Collector struct
2018-07-10 21:40:38 +02:00
func NewCollector(handler data.Handler, ifaces []*IFaceConfig) *Collector {
2018-06-02 01:00:54 +02:00
coll := &Collector{
handler: handler,
queue: make(chan *Packet, 400),
stop: make(chan interface{}),
connections: make(map[string]*net.UDPConn),
}
for _, iface := range ifaces {
2018-07-10 21:40:38 +02:00
if iface.Port == 0 {
iface.Port = Port
}
if iface.IPAddress == "" {
iface.IPAddress = MulticastAddressDefault
}
2018-06-02 01:00:54 +02:00
coll.listenUDP(iface)
}
go coll.parser()
return coll
}
// Close Collector
func (coll *Collector) Close() {
close(coll.stop)
for _, conn := range coll.connections {
conn.Close()
}
close(coll.queue)
}
2018-07-10 21:40:38 +02:00
func (coll *Collector) listenUDP(iface *IFaceConfig) {
2018-06-02 01:00:54 +02:00
ip := net.ParseIP(iface.IPAddress)
var conn *net.UDPConn
var err error
if ip.IsMulticast() {
ifx, err := net.InterfaceByName(iface.InterfaceName)
if err != nil {
log.Panic(err)
}
// Open socket
conn, err = net.ListenMulticastUDP("udp", ifx, &net.UDPAddr{
IP: ip,
Port: iface.Port,
})
if err != nil {
log.Panic(err)
}
} else {
// Open socket
conn, err = net.ListenUDP("udp", &net.UDPAddr{
IP: ip,
Port: iface.Port,
Zone: iface.InterfaceName,
})
if err != nil {
log.Panic(err)
}
}
conn.SetReadBuffer(MaxDataGramSize)
coll.connections[iface.InterfaceName] = conn
// Start receiver
go coll.receiver(conn)
}
// Packet of the respond request
type Packet struct {
Address *net.UDPAddr
Raw []byte
}
func (coll *Collector) parser() {
for obj := range coll.queue {
msg, err := data.NewSocketMSG(obj.Raw)
if err != nil {
log.Warnf("unable to unmarshal request from %s: %s", obj.Address.String(), err)
continue
}
2018-06-03 20:37:52 +02:00
log.Debugf("recv[%s]: %s", obj.Address, msg.String())
2018-06-02 01:00:54 +02:00
response, err := coll.handler(obj.Address, msg)
if err != nil {
log.Warnf("unable to handle request from %s: %s", obj.Address.String(), err)
continue
}
if response != nil {
coll.SendTo(obj.Address, response)
}
}
}
// SendTo a specifical address
func (coll *Collector) SendTo(addr *net.UDPAddr, msg *data.SocketMSG) {
2018-06-03 20:37:52 +02:00
log.Debugf("send[%s]: %s", addr, msg.String())
2018-06-02 01:00:54 +02:00
data, err := msg.Marshal()
if err != nil {
log.Warnf("unable to marshal response for %s: %s", addr.String(), err)
return
}
conn, ok := coll.connections[addr.Zone]
if ok {
conn.WriteToUDP(data, addr)
return
}
for _, conn := range coll.connections {
conn.WriteToUDP(data, addr)
}
}
// Send to every connection to default address
func (coll *Collector) Send(msg *data.SocketMSG) {
2018-06-03 20:37:52 +02:00
log.Debugf("send: %s", msg.String())
2018-06-02 01:00:54 +02:00
data, err := msg.Marshal()
if err != nil {
log.Warnf("unable to marshal response: %s", err)
return
}
for ifname, conn := range coll.connections {
conn.WriteToUDP(data, &net.UDPAddr{
IP: net.ParseIP(MulticastAddressDefault),
Port: Port,
Zone: ifname,
})
}
}
func (coll *Collector) receiver(conn *net.UDPConn) {
buf := make([]byte, MaxDataGramSize)
for {
n, src, err := conn.ReadFromUDP(buf)
if err != nil {
log.Warnf("ReadFromUDP failed: %s", err)
return
}
raw := make([]byte, n)
copy(raw, buf)
coll.queue <- &Packet{
Address: src,
Raw: raw,
}
}
}