write

package
v1.9.1-community Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 9, 2025 License: Apache-2.0 Imports: 30 Imported by: 0

Documentation

Index

Constants

View Source
// IPv6Type is the IPv6 type value (0x86DD) as defined in the IEEE 802
// numbers registry.
const IPv6Type uint16 = 0x86DD

IPv6Type value as defined in IEEE 802: https://www.iana.org/assignments/ieee-802-numbers/ieee-802-numbers.xhtml

Variables

View Source
var (
	// IANAFields lists the information element names that are appended to
	// both IPv4IANAFields and IPv6IANAFields.
	IANAFields = []string{
		"ethernetType",
		"flowDirection",
		"sourceMacAddress",
		"destinationMacAddress",
		"protocolIdentifier",
		"sourceTransportPort",
		"destinationTransportPort",
		"octetDeltaCount",
		"flowStartMilliseconds",
		"flowEndMilliseconds",
		"packetDeltaCount",
		"interfaceName",
		"tcpControlBits",
	}
	// IPv4IANAFields holds the IPv4-specific element names followed by every
	// entry of IANAFields.
	IPv4IANAFields = append([]string{
		"sourceIPv4Address",
		"destinationIPv4Address",
		"icmpTypeIPv4",
		"icmpCodeIPv4",
	}, IANAFields...)
	// IPv6IANAFields holds the IPv6-specific element names followed by every
	// entry of IANAFields.
	IPv6IANAFields = append([]string{
		"sourceIPv6Address",
		"destinationIPv6Address",
		"nextHeaderIPv6",
		"icmpTypeIPv6",
		"icmpCodeIPv6",
	}, IANAFields...)
	// KubeFields declares the string information elements (IDs 7733 to 7738,
	// Len 65535) for the source and destination pod namespace, pod name and
	// node name.
	KubeFields = []entities.InfoElement{
		{Name: "sourcePodNamespace", ElementId: 7733, DataType: entities.String, Len: 65535},
		{Name: "sourcePodName", ElementId: 7734, DataType: entities.String, Len: 65535},
		{Name: "destinationPodNamespace", ElementId: 7735, DataType: entities.String, Len: 65535},
		{Name: "destinationPodName", ElementId: 7736, DataType: entities.String, Len: 65535},
		{Name: "sourceNodeName", ElementId: 7737, DataType: entities.String, Len: 65535},
		{Name: "destinationNodeName", ElementId: 7738, DataType: entities.String, Len: 65535},
	}
	// CustomNetworkFields declares the additional information elements
	// timeFlowRttNs (Unsigned64), interfaces and directions (both String).
	CustomNetworkFields = []entities.InfoElement{
		{Name: "timeFlowRttNs", ElementId: 7740, DataType: entities.Unsigned64, Len: 8},
		{Name: "interfaces", ElementId: 7741, DataType: entities.String, Len: 65535},
		{Name: "directions", ElementId: 7742, DataType: entities.String, Len: 65535},
	}

	// MapIPFIXKeys associates each information element name with a FieldMap:
	// a Key (presumably the flow record field name; confirm against callers)
	// plus the callbacks that convert between the element value and the value
	// stored under that key.
	//
	// Several element names share one Key: the IPv4 and IPv6 address and ICMP
	// elements, protocolIdentifier and nextHeaderIPv6, flowDirection and
	// directions, interfaceName and interfaces.
	//
	// Setters written as a bare type assertion (for example rec.(string))
	// panic when rec has a different dynamic type. Setters and Matchers
	// written with the two-value assertion form ignore or reject such values
	// instead.
	MapIPFIXKeys = map[string]FieldMap{
		"sourceIPv4Address": {
			Key:    "SrcAddr",
			Getter: func(elt entities.InfoElementWithValue) any { return elt.GetIPAddressValue().String() },
			Setter: func(elt entities.InfoElementWithValue, rec any) { elt.SetIPAddressValue(net.ParseIP(rec.(string))) },
		},
		"destinationIPv4Address": {
			Key:    "DstAddr",
			Getter: func(elt entities.InfoElementWithValue) any { return elt.GetIPAddressValue().String() },
			Setter: func(elt entities.InfoElementWithValue, rec any) { elt.SetIPAddressValue(net.ParseIP(rec.(string))) },
		},
		"sourceIPv6Address": {
			Key:    "SrcAddr",
			Getter: func(elt entities.InfoElementWithValue) any { return elt.GetIPAddressValue().String() },
			Setter: func(elt entities.InfoElementWithValue, rec any) { elt.SetIPAddressValue(net.ParseIP(rec.(string))) },
		},
		"destinationIPv6Address": {
			Key:    "DstAddr",
			Getter: func(elt entities.InfoElementWithValue) any { return elt.GetIPAddressValue().String() },
			Setter: func(elt entities.InfoElementWithValue, rec any) { elt.SetIPAddressValue(net.ParseIP(rec.(string))) },
		},
		"nextHeaderIPv6": {
			Key:    "Proto",
			Getter: func(elt entities.InfoElementWithValue) any { return elt.GetUnsigned8Value() },
			Setter: func(elt entities.InfoElementWithValue, rec any) { elt.SetUnsigned8Value(rec.(uint8)) },
		},
		"sourceMacAddress": {
			Key:    "SrcMac",
			Getter: func(elt entities.InfoElementWithValue) any { return elt.GetMacAddressValue().String() },
			Setter: func(elt entities.InfoElementWithValue, rec any) {
				// The parse error is deliberately ignored: on failure
				// net.ParseMAC returns a nil address, which is passed on as is.
				mac, _ := net.ParseMAC(rec.(string))
				elt.SetMacAddressValue(mac)
			},
		},
		"destinationMacAddress": {
			Key:    "DstMac",
			Getter: func(elt entities.InfoElementWithValue) any { return elt.GetMacAddressValue().String() },
			Setter: func(elt entities.InfoElementWithValue, rec any) {
				// The parse error is deliberately ignored: on failure
				// net.ParseMAC returns a nil address, which is passed on as is.
				mac, _ := net.ParseMAC(rec.(string))
				elt.SetMacAddressValue(mac)
			},
		},
		"ethernetType": {
			Key:    "Etype",
			Getter: func(elt entities.InfoElementWithValue) any { return elt.GetUnsigned16Value() },
			Setter: func(elt entities.InfoElementWithValue, rec any) { elt.SetUnsigned16Value(rec.(uint16)) },
		},
		// flowDirection only carries the first entry of the []int value, so it
		// has a Matcher (comparing against that first entry) instead of a Getter.
		"flowDirection": {
			Key: "IfDirections",
			Setter: func(elt entities.InfoElementWithValue, rec any) {
				if dirs, ok := rec.([]int); ok && len(dirs) > 0 {
					elt.SetUnsigned8Value(uint8(dirs[0]))
				}
			},
			Matcher: func(elt entities.InfoElementWithValue, expected any) bool {
				// Mirror the Setter's guard: a value that is not a non-empty
				// []int cannot match, and must not panic.
				ifdirs, ok := expected.([]int)
				if !ok || len(ifdirs) == 0 {
					return false
				}
				return int(elt.GetUnsigned8Value()) == ifdirs[0]
			},
		},
		// directions stores the whole []int value as a comma-separated string.
		"directions": {
			Key: "IfDirections",
			Getter: func(elt entities.InfoElementWithValue) any {
				var dirs []int
				for _, dir := range strings.Split(elt.GetStringValue(), ",") {
					// The conversion error is deliberately ignored: a piece that
					// is not a valid integer is recorded as 0. Since splitting an
					// empty string yields one empty piece, an empty element value
					// produces []int{0}.
					d, _ := strconv.Atoi(dir)
					dirs = append(dirs, d)
				}
				return dirs
			},
			Setter: func(elt entities.InfoElementWithValue, rec any) {
				if dirs, ok := rec.([]int); ok && len(dirs) > 0 {
					var asStr []string
					for _, dir := range dirs {
						asStr = append(asStr, strconv.Itoa(dir))
					}
					elt.SetStringValue(strings.Join(asStr, ","))
				}
			},
		},
		"protocolIdentifier": {
			Key:    "Proto",
			Getter: func(elt entities.InfoElementWithValue) any { return elt.GetUnsigned8Value() },
			Setter: func(elt entities.InfoElementWithValue, rec any) { elt.SetUnsigned8Value(rec.(uint8)) },
		},
		"sourceTransportPort": {
			Key:    "SrcPort",
			Getter: func(elt entities.InfoElementWithValue) any { return elt.GetUnsigned16Value() },
			Setter: func(elt entities.InfoElementWithValue, rec any) { elt.SetUnsigned16Value(rec.(uint16)) },
		},
		"destinationTransportPort": {
			Key:    "DstPort",
			Getter: func(elt entities.InfoElementWithValue) any { return elt.GetUnsigned16Value() },
			Setter: func(elt entities.InfoElementWithValue, rec any) { elt.SetUnsigned16Value(rec.(uint16)) },
		},
		"octetDeltaCount": {
			Key:    "Bytes",
			Getter: func(elt entities.InfoElementWithValue) any { return elt.GetUnsigned64Value() },
			Setter: func(elt entities.InfoElementWithValue, rec any) { elt.SetUnsigned64Value(rec.(uint64)) },
		},
		// The two timestamp elements are unsigned 64-bit on the element side
		// and int64 on the other side; both directions are plain conversions.
		"flowStartMilliseconds": {
			Key:    "TimeFlowStartMs",
			Getter: func(elt entities.InfoElementWithValue) any { return int64(elt.GetUnsigned64Value()) },
			Setter: func(elt entities.InfoElementWithValue, rec any) { elt.SetUnsigned64Value(uint64(rec.(int64))) },
		},
		"flowEndMilliseconds": {
			Key:    "TimeFlowEndMs",
			Getter: func(elt entities.InfoElementWithValue) any { return int64(elt.GetUnsigned64Value()) },
			Setter: func(elt entities.InfoElementWithValue, rec any) { elt.SetUnsigned64Value(uint64(rec.(int64))) },
		},
		// packetDeltaCount is unsigned 64-bit on the element side and uint32
		// on the other side, so the Getter narrows the value.
		"packetDeltaCount": {
			Key:    "Packets",
			Getter: func(elt entities.InfoElementWithValue) any { return uint32(elt.GetUnsigned64Value()) },
			Setter: func(elt entities.InfoElementWithValue, rec any) { elt.SetUnsigned64Value(uint64(rec.(uint32))) },
		},
		// interfaceName only carries the first entry of the []string value, so
		// it has a Matcher (comparing against that first entry) instead of a
		// Getter.
		"interfaceName": {
			Key: "Interfaces",
			Setter: func(elt entities.InfoElementWithValue, rec any) {
				if ifs, ok := rec.([]string); ok && len(ifs) > 0 {
					elt.SetStringValue(ifs[0])
				}
			},
			Matcher: func(elt entities.InfoElementWithValue, expected any) bool {
				// Mirror the Setter's guard: a value that is not a non-empty
				// []string cannot match, and must not panic.
				ifs, ok := expected.([]string)
				if !ok || len(ifs) == 0 {
					return false
				}
				return elt.GetStringValue() == ifs[0]
			},
		},
		// tcpControlBits accepts either a []string, which is passed through
		// utils.EncodeTCPFlags, or a raw uint16 value.
		"tcpControlBits": {
			Key: "Flags",
			Getter: func(elt entities.InfoElementWithValue) any {
				return elt.GetUnsigned16Value()
			},
			Setter: func(elt entities.InfoElementWithValue, rec any) {
				// Any other dynamic type leaves the element untouched.
				switch flags := rec.(type) {
				case []string:
					elt.SetUnsigned16Value(uint16(utils.EncodeTCPFlags(flags)))
				case uint16:
					elt.SetUnsigned16Value(flags)
				}
			},
			Matcher: func(elt entities.InfoElementWithValue, expected any) bool {
				received := elt.GetUnsigned16Value()
				if expSlice, isSlice := expected.([]string); isSlice {
					// Compare element-wise, in order, against the decoded form
					// of the received value.
					decoded := utils.DecodeTCPFlags(uint(received))
					if len(expSlice) != len(decoded) {
						return false
					}
					for i, flag := range expSlice {
						if flag != decoded[i] {
							return false
						}
					}
					return true
				}
				// A missing expectation matches only a zero value.
				if expected == nil {
					return received == 0
				}
				// Interface comparison: true only when expected holds a uint16
				// equal to received.
				return received == expected
			},
		},
		"icmpTypeIPv4": {
			Key:    "IcmpType",
			Getter: func(elt entities.InfoElementWithValue) any { return elt.GetUnsigned8Value() },
			Setter: func(elt entities.InfoElementWithValue, rec any) { elt.SetUnsigned8Value(rec.(uint8)) },
		},
		"icmpCodeIPv4": {
			Key:    "IcmpCode",
			Getter: func(elt entities.InfoElementWithValue) any { return elt.GetUnsigned8Value() },
			Setter: func(elt entities.InfoElementWithValue, rec any) { elt.SetUnsigned8Value(rec.(uint8)) },
		},
		"icmpTypeIPv6": {
			Key:    "IcmpType",
			Getter: func(elt entities.InfoElementWithValue) any { return elt.GetUnsigned8Value() },
			Setter: func(elt entities.InfoElementWithValue, rec any) { elt.SetUnsigned8Value(rec.(uint8)) },
		},
		"icmpCodeIPv6": {
			Key:    "IcmpCode",
			Getter: func(elt entities.InfoElementWithValue) any { return elt.GetUnsigned8Value() },
			Setter: func(elt entities.InfoElementWithValue, rec any) { elt.SetUnsigned8Value(rec.(uint8)) },
		},
		// interfaces stores the whole []string value as a comma-separated string.
		"interfaces": {
			Key:    "Interfaces",
			Getter: func(elt entities.InfoElementWithValue) any { return strings.Split(elt.GetStringValue(), ",") },
			Setter: func(elt entities.InfoElementWithValue, rec any) {
				if ifs, ok := rec.([]string); ok {
					elt.SetStringValue(strings.Join(ifs, ","))
				}
			},
		},
		"sourcePodNamespace": {
			Key:    "SrcK8S_Namespace",
			Getter: func(elt entities.InfoElementWithValue) any { return elt.GetStringValue() },
			Setter: func(elt entities.InfoElementWithValue, rec any) { elt.SetStringValue(rec.(string)) },
		},
		"sourcePodName": {
			Key:    "SrcK8S_Name",
			Getter: func(elt entities.InfoElementWithValue) any { return elt.GetStringValue() },
			Setter: func(elt entities.InfoElementWithValue, rec any) { elt.SetStringValue(rec.(string)) },
		},
		"destinationPodNamespace": {
			Key:    "DstK8S_Namespace",
			Getter: func(elt entities.InfoElementWithValue) any { return elt.GetStringValue() },
			Setter: func(elt entities.InfoElementWithValue, rec any) { elt.SetStringValue(rec.(string)) },
		},
		"destinationPodName": {
			Key:    "DstK8S_Name",
			Getter: func(elt entities.InfoElementWithValue) any { return elt.GetStringValue() },
			Setter: func(elt entities.InfoElementWithValue, rec any) { elt.SetStringValue(rec.(string)) },
		},
		"sourceNodeName": {
			Key:    "SrcK8S_HostName",
			Getter: func(elt entities.InfoElementWithValue) any { return elt.GetStringValue() },
			Setter: func(elt entities.InfoElementWithValue, rec any) { elt.SetStringValue(rec.(string)) },
		},
		"destinationNodeName": {
			Key:    "DstK8S_HostName",
			Getter: func(elt entities.InfoElementWithValue) any { return elt.GetStringValue() },
			Setter: func(elt entities.InfoElementWithValue, rec any) { elt.SetStringValue(rec.(string)) },
		},
		"timeFlowRttNs": {
			Key:    "TimeFlowRttNs",
			Getter: func(elt entities.InfoElementWithValue) any { return int64(elt.GetUnsigned64Value()) },
			Setter: func(elt entities.InfoElementWithValue, rec any) { elt.SetUnsigned64Value(uint64(rec.(int64))) },
		},
	}
)

Functions

This section is empty.

Types

type Fake

// Fake is a writer whose Write method stores all records in memory.
// AllRecords returns a []config.GenericMap (presumably the stored records;
// confirm against the implementation).
type Fake struct {
	// contains filtered or unexported fields
}

func (*Fake) AllRecords

func (w *Fake) AllRecords() []config.GenericMap

func (*Fake) Write

func (w *Fake) Write(in config.GenericMap)

Write stores all records in memory.

type FieldMap

// FieldMap groups a key name with three optional callbacks operating on an
// entities.InfoElementWithValue:
//   - Getter takes the element and returns a value.
//   - Setter takes the element and a value, and returns nothing.
//   - Matcher takes the element and a value, and reports a boolean.
//
// NOTE(review): Key is presumably the name of the matching flow record
// field; confirm against the code that consumes MapIPFIXKeys.
type FieldMap struct {
	Key     string
	Getter  func(entities.InfoElementWithValue) any
	Setter  func(entities.InfoElementWithValue, any)
	Matcher func(entities.InfoElementWithValue, any) bool
}

type Loki

// Loki is the Loki record writer. Instances are created from configuration
// with NewWriteLoki.
type Loki struct {
	// contains filtered or unexported fields
}

Loki record writer

func NewWriteLoki

func NewWriteLoki(opMetrics *operational.Metrics, params config.StageParam) (*Loki, error)

NewWriteLoki creates a Loki writer from configuration

func (*Loki) ProcessRecord

func (l *Loki) ProcessRecord(in config.GenericMap) error

func (*Loki) Write

func (l *Loki) Write(entry config.GenericMap)

Write writes a flow before it is stored.

type None

// None is a writer that provides a Write method and a PrevRecords method
// returning a []config.GenericMap.
type None struct {
	// contains filtered or unexported fields
}

func (*None) PrevRecords

func (t *None) PrevRecords() []config.GenericMap

func (*None) Write

func (t *None) Write(in config.GenericMap)

Write writes entries

type Writer

// Writer is the interface returned by the NewWriteFake, NewWriteGRPC,
// NewWriteIpfix, NewWriteNone and NewWriteStdout constructors.
type Writer interface {
	// Write receives one record as a config.GenericMap.
	Write(in config.GenericMap)
}

func NewWriteFake added in v0.1.3

func NewWriteFake(_ config.StageParam) (Writer, error)

NewWriteFake creates a new Fake writer.

func NewWriteGRPC

func NewWriteGRPC(params config.StageParam) (Writer, error)

NewWriteGRPC creates a new gRPC writer.

func NewWriteIpfix added in v0.1.8

func NewWriteIpfix(params config.StageParam) (Writer, error)

NewWriteIpfix creates a new IPFIX writer.

func NewWriteNone

func NewWriteNone() (Writer, error)

NewWriteNone creates a new None writer.

func NewWriteStdout

func NewWriteStdout(params config.StageParam) (Writer, error)

NewWriteStdout creates a new stdout writer.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL