wifictld-analyzer/capture/collector.go

170 lines
3.3 KiB
Go

package capture
import (
"net"
"github.com/bdlm/log"
)
// Collector receives datagrams on one UDP socket per configured interface,
// queues them, and hands every successfully parsed message to a handler.
type Collector struct {
// connections holds the open UDP sockets, keyed by interface name.
connections map[string]*net.UDPConn
// handler is invoked by the parser goroutine for each parsed message.
handler Handler
// queue carries received packets from the receiver goroutines to the parser.
queue chan *Packet
// stop is closed by Close.
stop chan interface{}
}
// NewCollector builds a Collector that delivers parsed messages to handler.
//
// For every entry in ifaces a UDP socket is opened and a receiver goroutine
// is started. Entries with a zero Port or an empty IPAddress are filled in
// with the package defaults (Port and MulticastAddressDefault); note that the
// defaults are written into the caller's IFaceConfig values. Once all sockets
// are listening, the parser goroutine is started.
func NewCollector(handler Handler, ifaces []*IFaceConfig) *Collector {
	c := new(Collector)
	c.handler = handler
	c.connections = make(map[string]*net.UDPConn)
	c.queue = make(chan *Packet, 400)
	c.stop = make(chan interface{})

	for i := range ifaces {
		cfg := ifaces[i]
		// Fall back to the package-wide defaults for unset fields.
		if cfg.IPAddress == "" {
			cfg.IPAddress = MulticastAddressDefault
		}
		if cfg.Port == 0 {
			cfg.Port = Port
		}
		c.listenUDP(cfg)
	}

	// The parser is started only after every socket has been registered.
	go c.parser()
	return c
}
// Close shuts the collector down: it closes the stop channel, closes every
// UDP socket (so the blocked reads in the receiver goroutines fail and those
// goroutines return) and finally closes the packet queue so the parser
// goroutine leaves its range loop.
//
// Calling Close again after it has completed is a no-op. Close is not safe to
// call from several goroutines at the same time.
func (coll *Collector) Close() {
	select {
	case <-coll.stop:
		// Already shut down; closing the channels a second time would panic.
		return
	default:
	}
	close(coll.stop)

	for name, conn := range coll.connections {
		if err := conn.Close(); err != nil {
			log.Warnf("unable to close connection on %s: %s", name, err)
		}
	}

	// NOTE(review): receiver goroutines are not waited for. One that has read
	// a datagram but not yet enqueued it can still try to send on queue after
	// this close; waiting for the receivers first would remove that window.
	close(coll.queue)
}
// listenUDP opens the UDP socket described by iface, registers it in
// coll.connections under the interface name and starts a receiver goroutine
// for it.
//
// If the configured address is a multicast address, the group is joined on
// the named interface. Any other address is bound with net.ListenUDP, using
// the interface name as the address zone. Failure to look up the interface or
// to open the socket is reported through log.Panic.
func (coll *Collector) listenUDP(iface *IFaceConfig) {
	ip := net.ParseIP(iface.IPAddress)
	if ip == nil {
		// ParseIP yields nil for malformed input; ListenUDP treats a nil IP
		// as "all local addresses", so make the fallback visible.
		log.Warnf("invalid ip address %q for %s, listening on all addresses", iface.IPAddress, iface.InterfaceName)
	}
	addr := &net.UDPAddr{
		IP:   ip,
		Port: iface.Port,
	}

	// Declared once up front so no branch shadows err with ":=".
	var (
		conn *net.UDPConn
		err  error
	)
	if ip.IsMulticast() {
		var ifx *net.Interface
		ifx, err = net.InterfaceByName(iface.InterfaceName)
		if err != nil {
			log.Panic(err)
		}
		conn, err = net.ListenMulticastUDP("udp", ifx, addr)
	} else {
		addr.Zone = iface.InterfaceName
		conn, err = net.ListenUDP("udp", addr)
	}
	if err != nil {
		log.Panic(err)
	}

	// A failed buffer resize is not fatal: the socket keeps its previous
	// receive buffer size.
	if err := conn.SetReadBuffer(MaxDataGramSize); err != nil {
		log.Warnf("unable to set read buffer on %s: %s", iface.InterfaceName, err)
	}

	coll.connections[iface.InterfaceName] = conn

	// Start receiver
	go coll.receiver(conn)
}
// Packet is one datagram received on a collector socket, together with the
// address it was sent from.
type Packet struct {
// Address is the source address reported by ReadFromUDP.
Address *net.UDPAddr
// Raw is a copy of the datagram payload.
Raw []byte
}
// parser consumes packets from the queue until the queue is closed. Each
// packet is decoded with NewSocketMSG and passed to the handler; a non-nil
// response returned without error is sent back to the packet's source
// address. Decode and handler errors are logged and the packet is dropped.
func (coll *Collector) parser() {
	for {
		pkt, open := <-coll.queue
		if !open {
			return
		}
		from := pkt.Address

		msg, err := NewSocketMSG(pkt.Raw)
		if err != nil {
			log.Warnf("unable to unmarshal request from %s: %s", from.String(), err)
			continue
		}
		log.Debugf("recv[%s]: %s", from, msg.String())

		reply, err := coll.handler(from, msg)
		switch {
		case err != nil:
			log.Warnf("unable to handle request from %s: %s", from.String(), err)
		case reply != nil:
			coll.SendTo(from, reply)
		}
	}
}
// SendTo marshals msg and sends it to addr.
//
// If a connection is registered under addr.Zone, only that connection is
// used. Otherwise the datagram is written through every connection on a
// best-effort basis. Marshal and write failures are logged, not returned.
func (coll *Collector) SendTo(addr *net.UDPAddr, msg *SocketMSG) {
	log.Debugf("send[%s]: %s", addr, msg.String())
	data, err := msg.Marshal()
	if err != nil {
		log.Warnf("unable to marshal response for %s: %s", addr.String(), err)
		return
	}

	if conn, ok := coll.connections[addr.Zone]; ok {
		if _, err := conn.WriteToUDP(data, addr); err != nil {
			log.Warnf("unable to send response to %s: %s", addr.String(), err)
		}
		return
	}

	// No connection matches the zone: try all of them. Individual failures
	// are expected here (not every socket can reach addr), so they are only
	// logged at debug level; a warning is raised when no write succeeded.
	var lastErr error
	sent := false
	for ifname, conn := range coll.connections {
		if _, err := conn.WriteToUDP(data, addr); err != nil {
			log.Debugf("send[%s] via %s failed: %s", addr, ifname, err)
			lastErr = err
			continue
		}
		sent = true
	}
	if !sent && lastErr != nil {
		log.Warnf("unable to send response to %s: %s", addr.String(), lastErr)
	}
}
// Send marshals msg and writes it through every connection to the default
// multicast address (MulticastAddressDefault, Port), using the connection's
// interface name as the address zone. Marshal and write failures are logged,
// not returned.
func (coll *Collector) Send(msg *SocketMSG) {
	log.Debugf("send: %s", msg.String())
	data, err := msg.Marshal()
	if err != nil {
		log.Warnf("unable to marshal response: %s", err)
		return
	}

	// The destination IP is the same for every connection; parse it once.
	ip := net.ParseIP(MulticastAddressDefault)
	for ifname, conn := range coll.connections {
		dst := &net.UDPAddr{
			IP:   ip,
			Port: Port,
			Zone: ifname,
		}
		if _, err := conn.WriteToUDP(data, dst); err != nil {
			log.Warnf("unable to send to %s: %s", dst.String(), err)
		}
	}
}
// receiver reads datagrams from conn and enqueues a copy of each one,
// together with its source address, for the parser. It returns when a read
// fails or when the stop channel has been closed.
//
// NOTE(review): Close also closes queue without waiting for receivers, so a
// narrow window remains in which the enqueue below can hit a closed channel;
// observing stop here removes the common cases (a receiver blocked on a full
// queue, or one that is shut down between packets).
func (coll *Collector) receiver(conn *net.UDPConn) {
	buf := make([]byte, MaxDataGramSize)
	for {
		n, src, err := conn.ReadFromUDP(buf)
		if err != nil {
			select {
			case <-coll.stop:
				// The socket was closed as part of shutdown; not worth a warning.
			default:
				log.Warnf("ReadFromUDP failed: %s", err)
			}
			return
		}

		// buf is reused for the next read, so the payload must be copied.
		raw := make([]byte, n)
		copy(raw, buf[:n])

		// Check stop on its own first: once it is closed the queue may be
		// closed as well, and a send must not be attempted.
		select {
		case <-coll.stop:
			return
		default:
		}

		// Do not block forever on a full queue once shutdown has begun.
		select {
		case coll.queue <- &Packet{
			Address: src,
			Raw:     raw,
		}:
		case <-coll.stop:
			return
		}
	}
}