Skip to content

Commit

Permalink
Change upgrade task to feed from S3
Browse files Browse the repository at this point in the history
  • Loading branch information
noboruma committed Nov 9, 2023
1 parent ab37c50 commit b9bc264
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 211 deletions.
3 changes: 2 additions & 1 deletion .github/workflows/build-publish-binaries.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ jobs:
docker cp $id:/usr/local/discovery/deepfence-discovery - > deepfence-discovery
docker cp $id:/opt/td-agent-bit/bin/fluent-bit - > fluent-bit
docker cp $id:/usr/local/bin/compliance_check/compliance - > compliance
tar zcvf binaries.tar.gz ./*
docker rm -v $id
- name: Upload to S3
run: aws s3 sync /tmp/binaries s3://deepfence-tm-binaries
run: aws s3 sync --exclude "*" --include "*.tar.gz" /tmp/binaries s3://deepfence-tm-binaries
2 changes: 1 addition & 1 deletion deepfence_agent/plugins/SecretScanner
Submodule SecretScanner updated 1 files
+26 −12 Dockerfile
2 changes: 1 addition & 1 deletion deepfence_agent/plugins/YaraHunter
254 changes: 47 additions & 207 deletions deepfence_worker/cronjobs/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,26 @@ import (
"fmt"
"net/http"
url2 "net/url"
"os"
"os/exec"
"strings"
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
"github.com/deepfence/ThreatMapper/deepfence_utils/directory"
"github.com/deepfence/ThreatMapper/deepfence_utils/log"
"github.com/hibiken/asynq"
m "github.com/minio/minio-go/v7"
"github.com/neo4j/neo4j-go-driver/v4/neo4j"
)

var (
//TODO: compliance
plugins = []string{"fluentbit", "discovery", "package_scanner", "secret_scanner", "malware_scanner"}
)

func getVersionMetadata(url string, result *[]map[string]interface{}) error {
resp, err := http.Get(url)
if err != nil {
Expand All @@ -39,99 +47,62 @@ func getVersionMetadata(url string, result *[]map[string]interface{}) error {
func CheckAgentUpgrade(ctx context.Context, task *asynq.Task) error {
log.Info().Msg("Start agent version check")

res := []map[string]interface{}{}
err := getVersionMetadata("https://api.github.com/repos/deepfence/ThreatMapper/tags", &res)
sess, _ := session.NewSession(&aws.Config{
Region: aws.String("us-east-2"), Credentials: credentials.AnonymousCredentials},
)

svc := s3.New(sess)

resp, err := svc.ListObjects(&s3.ListObjectsInput{
Bucket: aws.String("deepfence-tm-binaries"),
})

if err != nil {
return err
}

tags_to_ingest := []string{}
for _, tag := range res {
if strings.HasPrefix(tag["name"].(string), "v") {
tags_to_ingest = append(tags_to_ingest, tag["name"].(string))
}
}
downloader := s3manager.NewDownloader(sess)

tags_with_urls, err := prepareAgentReleases(ctx, tags_to_ingest)
if err != nil {
log.Error().Msgf("Prepare agent releases: %v", err)
} else {
err = ingestAgentVersion(ctx, tags_with_urls)
versioned_tarball := map[string]*aws.WriteAtBuffer{}
for _, item := range resp.Contents {
buf := []byte{}
b := aws.NewWriteAtBuffer(buf)
_, err := downloader.Download(b,
&s3.GetObjectInput{
Bucket: aws.String("deepfence-tm-binaries"),
Key: item.Key,
})
if err != nil {
log.Error().Msgf("ingest agent version: %v", err)
log.Error().Msgf("S3 download failed: %v", err)
continue
}

key := (*item.Key)[:strings.IndexByte(*item.Key, '/')]
versioned_tarball[key] = b
}

plugin_tags_with_urls, err := prepareAgentPluginReleases(ctx, tags_to_ingest)
tags_with_urls, err := prepareAgentBinariesReleases(ctx, versioned_tarball)
if err != nil {
return err
}

return ingestAgentPluginsVersion(ctx, plugin_tags_with_urls)
err = ingestAgentVersion(ctx, tags_with_urls)
return err
}

func prepareAgentReleases(ctx context.Context, tags_to_ingest []string) (map[string]string, error) {
func prepareAgentBinariesReleases(ctx context.Context, versioned_tarball map[string]*aws.WriteAtBuffer) (map[string]string, error) {
processed_tags := map[string]string{}
minio, err := directory.MinioClient(ctx)
if err != nil {
return processed_tags, err
}

for _, tag := range tags_to_ingest {
local_root := "/tmp/" + tag
cmd := exec.Command("rm", []string{"-rf", local_root}...)
if err := cmd.Run(); err != nil {
log.Warn().Err(err).Msg("rm")
}
cmd = exec.Command("mkdir", []string{"-p", local_root + "/home"}...)
if err := cmd.Run(); err != nil {
log.Error().Err(err).Msg("mkdir1")
continue
}
cmd = exec.Command("mkdir", []string{"-p", local_root + "/usr/local"}...)
if err := cmd.Run(); err != nil {
log.Error().Err(err).Msg("mkdir2")
continue
}

agent_image := "deepfenceio/deepfence_agent_ce:" + tag[1:]
cmd = exec.Command("docker", []string{"pull", agent_image}...)
if err := cmd.Run(); err != nil {
log.Error().Err(err).Msg("Docker pull")
continue
}
cmd = exec.Command("docker", []string{"create", "--name=dummy", agent_image}...)
if err := cmd.Run(); err != nil {
log.Error().Err(err).Msg("Docker create")
continue
}
cmd = exec.Command("docker", []string{"cp", "dummy:/home/deepfence", local_root + "/home"}...)
if err := cmd.Run(); err != nil {
log.Error().Err(err).Msg("Docker cp")
continue
}
cmd = exec.Command("docker", []string{"cp", "dummy:/usr/local/discovery", local_root + "/usr/local"}...)
if err := cmd.Run(); err != nil {
log.Error().Err(err).Msg("Docker cp")
continue
}
cmd = exec.Command("docker", []string{"rm", "dummy"}...)
if err := cmd.Run(); err != nil {
log.Error().Err(err).Msg("Docker rm")
continue
}
out_file := fmt.Sprintf("%s.tar.gz", tag)
cmd = exec.Command("tar", []string{"zcvf", out_file, "-C", local_root, "."}...)
if err := cmd.Run(); err != nil {
log.Error().Err(err).Msg("Untar")
continue
}
b, err := os.ReadFile(out_file)
if err != nil {
log.Error().Err(err).Msg("ReadFile")
continue
}
res, err := minio.UploadFile(ctx, out_file, b, false, m.PutObjectOptions{ContentType: "application/gzip"})
for version, b := range versioned_tarball {
res, err := minio.UploadFile(ctx,
version,
b.Bytes(),
false,
m.PutObjectOptions{ContentType: "application/gzip"})
key := ""
if err != nil {
ape, ok := err.(directory.AlreadyPresentError)
Expand All @@ -152,7 +123,7 @@ func prepareAgentReleases(ctx context.Context, tags_to_ingest []string) (map[str
continue
}
log.Debug().Msgf("Exposed URL: %v", url)
processed_tags[tag] = url
processed_tags[version] = url
}
return processed_tags, nil
}
Expand All @@ -179,141 +150,10 @@ func ingestAgentVersion(ctx context.Context, tags_to_url map[string]string) erro
if _, err = tx.Run(`
UNWIND $batch as row
MERGE (n:AgentVersion{node_id: row.tag})
SET n.url = row.url`,
ON CREATE SET n.url = row.url`,
map[string]interface{}{"batch": tags_to_ingest}); err != nil {
return err
}

return tx.Commit()
}

var agent_plugins = map[string]string{
"fluentbit": "/opt/td-agent-bit/bin/td-agent-bit",
"discovery": "/usr/local/discovery/deepfence-discovery",
"package_scanner": "/home/deepfence/bin/package-scanner",
"secret_scanner": "/home/deepfence/bin/secret-scanner/SecretScanner",
"malware_scanner": "/home/deepfence/bin/secret-scanner/malware_scanner",
"self": "/bin/deepfenced",
}

func prepareAgentPluginReleases(ctx context.Context, tags_to_ingest []string) (map[string]map[string]string, error) {
processed_tags := map[string]map[string]string{}
minio, err := directory.MinioClient(ctx)
if err != nil {
return processed_tags, err
}

for _, tag := range tags_to_ingest {
processed_tags[tag] = map[string]string{}
local_root := "/tmp/" + tag
cmd := exec.Command("rm", []string{"-rf", local_root}...)
if err := cmd.Run(); err != nil {
log.Warn().Err(err).Msg("rm")
}
cmd = exec.Command("mkdir", []string{"-p", local_root + "/home"}...)
if err := cmd.Run(); err != nil {
log.Error().Err(err).Msg("mkdir1")
continue
}
cmd = exec.Command("mkdir", []string{"-p", local_root + "/usr/local"}...)
if err := cmd.Run(); err != nil {
log.Error().Err(err).Msg("mkdir2")
continue
}

agent_image := "deepfenceio/deepfence_agent_ce:" + tag[1:]
cmd = exec.Command("docker", []string{"pull", agent_image}...)
if err := cmd.Run(); err != nil {
log.Error().Err(err).Msg("Docker pull")
continue
}
cmd = exec.Command("docker", []string{"create", "--name=dummy", agent_image}...)
if err := cmd.Run(); err != nil {
log.Error().Err(err).Msg("Docker create")
continue
}
for plugin_name, plugin_path := range agent_plugins {
cmd = exec.Command("docker", []string{"cp", "dummy:" + plugin_path, local_root + "/" + plugin_name}...)
if err := cmd.Run(); err != nil {
log.Error().Err(err).Msg("Docker cp")
continue
}
out_file := fmt.Sprintf("%s.gz", tag)
cmd = exec.Command("gzip", []string{local_root + "/" + plugin_name}...)
if err := cmd.Run(); err != nil {
log.Error().Err(err).Msg("Gzip")
continue
}
b, err := os.ReadFile(out_file)
if err != nil {
log.Error().Err(err).Msg("ReadFile")
continue
}
res, err := minio.UploadFile(ctx, out_file, b, false, m.PutObjectOptions{ContentType: "application/gzip"})
key := ""
if err != nil {
ape, ok := err.(directory.AlreadyPresentError)
if ok {
log.Warn().Err(err).Msg("Skip upload")
key = ape.Path
} else {
log.Error().Err(err).Msg("Upload")
continue
}
} else {
key = res.Key
}

url, err := minio.ExposeFile(ctx, key, false, 10*time.Hour, url2.Values{})
if err != nil {
log.Error().Err(err)
continue
}
log.Debug().Msgf("Exposed URL: %v", url)

processed_tags[tag][plugin_name] = url
}

cmd = exec.Command("docker", []string{"rm", "dummy"}...)
if err := cmd.Run(); err != nil {
log.Error().Err(err).Msg("Docker rm")
continue
}

cmd = exec.Command("docker", []string{"rmi", agent_image}...)
if err := cmd.Run(); err != nil {
log.Error().Err(err).Msg("Docker rmi")
continue
}
}
return processed_tags, nil
}

func ingestAgentPluginsVersion(ctx context.Context, tags_to_url map[string]map[string]string) error {
nc, err := directory.Neo4jClient(ctx)
if err != nil {
return err
}
session := nc.NewSession(neo4j.SessionConfig{AccessMode: neo4j.AccessModeWrite})
defer session.Close()

tx, err := session.BeginTransaction(neo4j.WithTxTimeout(15 * time.Second))
if err != nil {
return err
}
defer tx.Close()

for k, v := range tags_to_url {
for plugin_name, plugin_url := range v {
query := fmt.Sprintf(`
MERGE (n:%sVersion{node_id: tag})
SET n.url = url`, plugin_name)
if _, err = tx.Run(query,
map[string]interface{}{"tag": k, "url": plugin_url}); err != nil {
return err
}
}
}

return tx.Commit()
}
4 changes: 3 additions & 1 deletion deepfence_worker/go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
module github.com/deepfence/ThreatMapper/deepfence_worker

go 1.21
go 1.21.0

toolchain go1.21.1

replace github.com/deepfence/golang_deepfence_sdk/client => ../golang_deepfence_sdk/client/

Expand Down

0 comments on commit b9bc264

Please sign in to comment.