Skip to content

feat: initial snmp support #92

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 6 commits into
base: develop
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ endif
# install-dev-tools: installs the tparse tool (latest version) with `go install`.
install-dev-tools:
@go install github.com/mfridman/tparse@latest

# deps: runs `go mod tidy`; declared phony because it does not produce a file named "deps".
.PHONY: deps
deps:
@go mod tidy

agent_bin:
echo "ORB_VERSION: $(ORB_VERSION)-$(COMMIT_HASH)"
Expand Down Expand Up @@ -95,4 +98,4 @@ pull-latest-otel-collector-contrib:
cp ./agent/backend/otel/otelcol-contrib .
rm ./agent/backend/otel/otelcol_contrib.tar.gz
rm ./agent/backend/otel/LICENSE
rm ./agent/backend/otel/README.md
rm ./agent/backend/otel/README.md
312 changes: 312 additions & 0 deletions agent/backend/snmpdiscovery/snmp_discovery.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,312 @@
package snmpdiscovery

import (
	"bytes"
	"context"
	"errors"
	"fmt"
	"log/slog"
	"net/http"
	"net/url"
	"time"

	"gopkg.in/yaml.v3"

	"github.com/netboxlabs/orb-agent/agent/backend"
	"github.com/netboxlabs/orb-agent/agent/config"
	"github.com/netboxlabs/orb-agent/agent/policies"
)

// Compile-time assertion that *snmpDiscoveryBackend satisfies backend.Backend.
var _ backend.Backend = (*snmpDiscoveryBackend)(nil)

// Request timeouts, readiness retry settings and defaults for the
// snmp-discovery backend. The timeout values are passed as-is to
// backend.CommonRequest; their unit is defined there (presumably seconds —
// TODO confirm).
const (
// versionTimeout is used by Version for the status request.
versionTimeout = 2
// capabilitiesTimeout is used by GetCapabilities.
capabilitiesTimeout = 5
// readinessBackoff is the number of readiness attempts made by Start;
// the wait after attempt i is i seconds.
readinessBackoff = 10
// readinessTimeout is not referenced in the visible part of this file.
readinessTimeout = 10
// applyPolicyTimeout is used by ApplyPolicy for the policy POST request.
applyPolicyTimeout = 10
// removePolicyTimeout is used by RemovePolicy for the policy DELETE request.
removePolicyTimeout = 20
// defaultExec is the executable that Register configures for the backend.
defaultExec = "snmp-discovery"
// defaultAPIHost is used when the "host" config entry is absent or not a string.
defaultAPIHost = "localhost"
// defaultAPIPort is used when the "port" config entry is absent or not a string.
defaultAPIPort = "8073"
)

// snmpDiscoveryBackend runs the snmp-discovery executable as a child process
// and talks to it through its REST API.
type snmpDiscoveryBackend struct {
// logger receives all log output of the backend, including the child's
// stdout/stderr lines.
logger *slog.Logger
// policyRepo is stored by Configure; it is not read in the visible code.
policyRepo policies.PolicyRepo
// exec is the executable launched by Start.
exec string

// apiHost, apiPort and apiProtocol are used to build the REST API URLs;
// host and port are also passed to the process as --host / --port.
apiHost string
apiPort string
apiProtocol string

// diode* values come from the common backend configuration and are
// forwarded to the process as --diode-* flags.
diodeTarget string
diodeAPIKey string
diodeAppNamePrefix string

// startTime is set at the beginning of Start.
startTime time.Time
// proc is the handle of the child process created in Start.
proc backend.Commander
// statusChan is returned by proc.Start(); Stop receives one status from it.
statusChan <-chan backend.CmdStatus
// cancelFunc is stored by Start and invoked when Stop returns.
cancelFunc context.CancelFunc
// ctx is stored by Start; it is not read in the visible code.
ctx context.Context
}

// info is the payload decoded from the /api/v1/status endpoint.
type info struct {
// Version is the version string returned by Version.
Version string `json:"version"`
// NOTE(review): the field name suggests minutes while the JSON key is
// "up_time_seconds" — confirm the unit before relying on this value.
UpTimeMin float64 `json:"up_time_seconds"`
}

// Register registers the snmp discovery backend
func Register() bool {
backend.Register("snmp_discovery", &snmpDiscoveryBackend{
apiProtocol: "http",
exec: defaultExec,
})
return true
}

// Configure stores the logger and policy repository and reads the backend
// settings.
//
// The "host" entry of config is used when it is a string, otherwise
// defaultAPIHost applies. The "port" entry is accepted as a string or as a
// number (an unquoted YAML port decodes as an integer and used to be silently
// replaced by the default); any other type falls back to defaultAPIPort.
// The diode target, API key and agent name are taken from common.
// Configure always returns nil.
func (d *snmpDiscoveryBackend) Configure(logger *slog.Logger, repo policies.PolicyRepo,
	config map[string]any, common config.BackendCommons,
) error {
	d.logger = logger
	d.policyRepo = repo

	var prs bool
	if d.apiHost, prs = config["host"].(string); !prs {
		d.apiHost = defaultAPIHost
	}

	switch port := config["port"].(type) {
	case string:
		d.apiPort = port
	case int, int32, int64, uint, uint16, uint32, uint64, float64:
		// Numeric ports (e.g. `port: 8073` in YAML, or JSON numbers) are
		// rendered in their plain decimal form.
		d.apiPort = fmt.Sprint(port)
	default:
		d.apiPort = defaultAPIPort
	}

	d.diodeTarget = common.Diode.Target
	d.diodeAPIKey = common.Diode.APIKey
	d.diodeAppNamePrefix = common.Diode.AgentName

	return nil
}

// Version requests /api/v1/status from the snmp-discovery REST API and returns
// the version field of the response. On request failure it returns an empty
// string and the error.
func (d *snmpDiscoveryBackend) Version() (string, error) {
	endpoint := fmt.Sprintf("%s://%s:%s/api/v1/status", d.apiProtocol, d.apiHost, d.apiPort)

	var status info
	if reqErr := backend.CommonRequest("snmp-discovery", d.proc, d.logger, endpoint, &status, http.MethodGet,
		http.NoBody, "application/json", versionTimeout, "detail"); reqErr != nil {
		return "", reqErr
	}
	return status.Version, nil
}

// Start launches the snmp-discovery process and waits until its REST API
// answers.
//
// It records the start time, context and cancel function on the backend,
// starts the executable with the configured host, port and diode flags,
// forwards the child's stdout/stderr lines to the logger, and then calls
// Version up to readinessBackoff times, sleeping i seconds after failed
// attempt i (no sleep after the final attempt).
//
// An error is returned when the process reports a startup error, when it has
// already completed one second after launch, or when it never becomes ready;
// in the latter two cases the process is stopped before returning.
func (d *snmpDiscoveryBackend) Start(ctx context.Context, cancelFunc context.CancelFunc) error {
	d.startTime = time.Now()
	d.cancelFunc = cancelFunc
	d.ctx = ctx

	// Build the argument list with the API key masked so that it can be logged
	// safely. The position of the placeholder is recorded instead of being
	// hard-coded, so adding or reordering flags cannot leak or misplace the key.
	pvOptions := []string{
		"--host", d.apiHost,
		"--port", d.apiPort,
		"--diode-target", d.diodeTarget,
		"--diode-api-key",
	}
	apiKeyIdx := len(pvOptions)
	pvOptions = append(pvOptions,
		"********",
		"--diode-app-name-prefix", d.diodeAppNamePrefix,
	)

	d.logger.Info("snmp-discovery startup", slog.Any("arguments", pvOptions))

	// Swap the placeholder for the real key only after logging.
	pvOptions[apiKeyIdx] = d.diodeAPIKey

	d.proc = backend.NewCmdOptions(backend.CmdOptions{
		Buffered:  false,
		Streaming: true,
	}, d.exec, pvOptions...)
	d.statusChan = d.proc.Start()

	// log STDOUT and STDERR lines streaming from Cmd; the goroutine ends once
	// both streams have been closed.
	go func() {
		stdout := d.proc.GetStdout()
		stderr := d.proc.GetStderr()
		for stdout != nil || stderr != nil {
			select {
			case line, open := <-stdout:
				if !open {
					// A nil channel is never selected again.
					stdout = nil
					continue
				}
				d.logger.Info("snmp-discovery stdout", slog.String("log", line))
			case line, open := <-stderr:
				if !open {
					stderr = nil
					continue
				}
				d.logger.Info("snmp-discovery stderr", slog.String("log", line))
			}
		}
	}()

	// wait for simple startup errors
	time.Sleep(time.Second)

	status := d.proc.Status()

	if status.Error != nil {
		d.logger.Error("snmp-discovery startup error", slog.Any("error", status.Error))
		return status.Error
	}

	if status.Complete {
		if err := d.proc.Stop(); err != nil {
			d.logger.Error("proc.Stop error", slog.Any("error", err))
		}
		return errors.New("snmp-discovery startup error, check log")
	}

	d.logger.Info("snmp-discovery process started", slog.Int("pid", status.PID))

	var readinessErr error
	for backoff := range readinessBackoff {
		var version string
		version, readinessErr = d.Version()
		if readinessErr == nil {
			// NOTE(review): the "network_discovery_version" key looks copied
			// from another backend; kept unchanged for log compatibility.
			d.logger.Info("snmp-discovery readiness ok, got version ",
				slog.String("network_discovery_version", version))
			break
		}
		if backoff == readinessBackoff-1 {
			// Last attempt failed: no retry follows, so do not announce one
			// or delay the error by another sleep.
			break
		}
		backoffDuration := time.Duration(backoff) * time.Second
		d.logger.Info("snmp-discovery is not ready, trying again with backoff",
			slog.String("backoff backoffDuration", backoffDuration.String()))
		time.Sleep(backoffDuration)
	}

	if readinessErr != nil {
		d.logger.Error("snmp-discovery error on readiness", slog.Any("error", readinessErr))
		if err := d.proc.Stop(); err != nil {
			d.logger.Error("proc.Stop error", slog.Any("error", err))
		}
		return readinessErr
	}

	return nil
}

// Stop stops the snmp-discovery process, receives its final status from the
// status channel and logs the PID and exit code. The stored cancel function is
// invoked when Stop returns. A failure reported by the process stop is logged
// only; Stop always returns nil.
func (d *snmpDiscoveryBackend) Stop(ctx context.Context) error {
	routine := ctx.Value(config.ContextKey("routine"))
	d.logger.Info("routine call to stop snmp-discovery", slog.Any("routine", routine))
	defer d.cancelFunc()

	stopErr := d.proc.Stop()
	// Receive the final status before reporting the outcome.
	exitStatus := <-d.statusChan
	if stopErr != nil {
		d.logger.Error("snmp-discovery shutdown error", slog.Any("error", stopErr))
	}

	d.logger.Info("snmp-discovery process stopped",
		slog.Int("pid", exitStatus.PID),
		slog.Int("exit_code", exitStatus.Exit),
	)
	return nil
}

// FullReset restarts the backend: if the process is currently running it is
// stopped first, then Start is called with a fresh cancellable context that
// carries the routine name "snmp-discovery". The first stop or start error is
// logged and returned.
func (d *snmpDiscoveryBackend) FullReset(ctx context.Context) error {
	// Stop only when the process is reported as running.
	state, _, _ := backend.GetRunningStatus(d.proc)
	if state == backend.Running {
		if stopErr := d.Stop(ctx); stopErr != nil {
			d.logger.Error("failed to stop backend on restart procedure", slog.Any("error", stopErr))
			return stopErr
		}
	}

	// Derive the context handed to the restarted backend.
	routineCtx := context.WithValue(ctx, config.ContextKey("routine"), "snmp-discovery")
	backendCtx, cancelFunc := context.WithCancel(routineCtx)

	startErr := d.Start(backendCtx, cancelFunc)
	if startErr != nil {
		d.logger.Error("failed to start backend on restart procedure", slog.Any("error", startErr))
		return startErr
	}
	return nil
}

// GetStartTime returns the time recorded at the beginning of the last Start
// call (the zero time if Start has not been called).
func (d *snmpDiscoveryBackend) GetStartTime() time.Time {
return d.startTime
}

// GetCapabilities requests /api/v1/capabilities from the snmp-discovery REST
// API and returns the decoded response as a map. On request failure it returns
// a nil map and the error.
func (d *snmpDiscoveryBackend) GetCapabilities() (map[string]any, error) {
	endpoint := fmt.Sprintf("%s://%s:%s/api/v1/capabilities", d.apiProtocol, d.apiHost, d.apiPort)

	capabilities := make(map[string]any)
	if reqErr := backend.CommonRequest("snmp-discovery", d.proc, d.logger, endpoint, &capabilities, http.MethodGet,
		http.NoBody, "application/json", capabilitiesTimeout, "detail"); reqErr != nil {
		return nil, reqErr
	}
	return capabilities, nil
}

// GetRunningStatus reports the backend state. The process status is checked
// first and returned unchanged when the process is not running. When it is
// running, the REST API is probed through Version; a failing probe yields
// backend.BackendError together with the probe error.
func (d *snmpDiscoveryBackend) GetRunningStatus() (backend.RunningStatus, string, error) {
	procState, procMsg, procErr := backend.GetRunningStatus(d.proc)
	if procState == backend.Running {
		// The process is up; make sure the REST API is reachable as well.
		if _, apiErr := d.Version(); apiErr != nil {
			return backend.BackendError, "process running, REST API unavailable", apiErr
		}
		return procState, "", nil
	}
	// Not running: pass the process-level result through.
	return procState, procMsg, procErr
}

// GetInitialState returns the state reported for this backend before any
// status is known; it is always backend.Unknown.
func (d *snmpDiscoveryBackend) GetInitialState() backend.RunningStatus {
return backend.Unknown
}

// ApplyPolicy sends a policy to snmp-discovery. The policy data is wrapped as
// {"policies": {<name>: <data>}}, marshalled to YAML and POSTed to
// /api/v1/policies. When updatePolicy is true the existing policy is removed
// first; a removal failure is logged as a warning and does not abort the
// apply. Marshal and request errors are logged and returned.
func (d *snmpDiscoveryBackend) ApplyPolicy(data policies.PolicyData, updatePolicy bool) error {
	if updatePolicy {
		// An update is performed as remove + apply of the new version.
		if removeErr := d.RemovePolicy(data); removeErr != nil {
			d.logger.Warn("policy failed to remove", slog.String("policy_id", data.ID),
				slog.String("policy_name", data.Name), slog.Any("error", removeErr))
		}
	}

	d.logger.Debug("snmp-discovery policy apply", slog.String("policy_id", data.ID), slog.Any("data", data.Data))

	wrapped := map[string]any{
		"policies": map[string]any{
			data.Name: data.Data,
		},
	}
	payload, marshalErr := yaml.Marshal(wrapped)
	if marshalErr != nil {
		d.logger.Warn("policy yaml marshal failure", slog.String("policy_id", data.ID), slog.String("policy_name", data.Name))
		return marshalErr
	}

	endpoint := fmt.Sprintf("%s://%s:%s/api/v1/%s", d.apiProtocol, d.apiHost, d.apiPort, "policies")
	var reply map[string]any
	if reqErr := backend.CommonRequest("snmp-discovery", d.proc, d.logger, endpoint, &reply, http.MethodPost,
		bytes.NewBuffer(payload), "application/x-yaml", applyPolicyTimeout, "detail"); reqErr != nil {
		d.logger.Warn("policy application failure", slog.String("policy_id", data.ID), slog.String("policy_name", data.Name))
		return reqErr
	}

	return nil
}

// RemovePolicy deletes a policy from snmp-discovery by issuing a DELETE to
// /api/v1/policies/<name>.
//
// Policies are addressed by name, not ID: when data carries previous policy
// data whose name differs from the current one, the previous name is removed.
// The name is path-escaped so that names containing spaces, '/', '?' or '#'
// still address a single path segment. The request error, if any, is returned.
func (d *snmpDiscoveryBackend) RemovePolicy(data policies.PolicyData) error {
	d.logger.Debug("snmp-discovery policy remove", slog.String("policy_id", data.ID))

	name := data.Name
	// Since we use Name for removing policies not IDs, if there is a change,
	// we need to remove the previous name of the policy.
	if data.PreviousPolicyData != nil && data.PreviousPolicyData.Name != data.Name {
		name = data.PreviousPolicyData.Name
	}

	endpoint := fmt.Sprintf("%s://%s:%s/api/v1/policies/%s",
		d.apiProtocol, d.apiHost, d.apiPort, url.PathEscape(name))

	var resp any
	return backend.CommonRequest("snmp-discovery", d.proc, d.logger, endpoint, &resp, http.MethodDelete,
		http.NoBody, "application/json", removePolicyTimeout, "detail")
}
Loading