[KATC] Update config schema #1770

Closed
49 changes: 37 additions & 12 deletions ee/katc/config.go
@@ -16,8 +16,11 @@ import (
 // identifier parsed from the JSON KATC config, and the `dataFunc` is the function
 // that performs the query against the source.
 type katcSourceType struct {
-	name     string
-	dataFunc func(ctx context.Context, slogger *slog.Logger, sourcePattern string, query string, sourceConstraints *table.ConstraintList) ([]sourceData, error)
+	name     string
+	// `pathConstraints` comes from the live query or scheduled query, and constrains `sourcePaths` to a particular value.
+	// For example, `sourcePaths` may allow for querying files belonging to any user ("/Users/%/path/to/file"), and
+	// `pathConstraints` may narrow the query to a path for a particular user ("/Users/example/path/to/file").
+	dataFunc func(ctx context.Context, slogger *slog.Logger, sourcePaths []string, query string, pathConstraints *table.ConstraintList) ([]sourceData, error)
Contributor:

I'm a little unsure about how pathConstraints will be used. Can you provide an example? (In this comment is fine, doesn't need to be docs)

Contributor:
I see what the code is doing, but I'm not sure it adds a lot over pushing sourcePath through `filepath.Glob(...)`.
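
For reference, the pattern-to-glob translation under discussion could look roughly like this — a sketch only; the actual `sourcePatternToGlobbablePattern` in this PR may handle escaping and path separators differently:

    package katc

    import "strings"

    // Sketch: translate the SQL-style wildcards that KATC configs support
    // (% and _) into filepath.Glob syntax (* and ?).
    func sourcePatternToGlobbablePattern(sourcePattern string) string {
        globPattern := strings.ReplaceAll(sourcePattern, "%", "*")
        return strings.ReplaceAll(globPattern, "_", "?")
    }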

Contributor (Author):
Added documentation with an example
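
To make the example concrete: if `source_paths` is `["/Users/%/path/to/db.sqlite"]`, the glob expands `%` across every user, and a live query with a LIKE clause on the table's path column (an assumption about the table schema) arrives as a constraint that narrows the expanded paths back down. A minimal sketch of that check, assuming osquery-go's `table.ConstraintList` types — the real `checkPathConstraints` may differ:

    package katc

    import (
        "fmt"
        "path/filepath"
        "strings"

        "github.com/osquery/osquery-go/plugin/table"
    )

    // Sketch only: report whether `path` satisfies the constraints that the
    // caller's live/scheduled query placed on the path column.
    func checkPathConstraints(path string, pathConstraints *table.ConstraintList) (bool, error) {
        if pathConstraints == nil {
            return true, nil
        }
        for _, c := range pathConstraints.Constraints {
            switch c.Operator {
            case table.OperatorEquals:
                if path != c.Expression {
                    return false, nil
                }
            case table.OperatorLike:
                // Reuse the same wildcard translation as the source patterns.
                pattern := strings.ReplaceAll(strings.ReplaceAll(c.Expression, "%", "*"), "_", "?")
                matched, err := filepath.Match(pattern, path)
                if err != nil {
                    return false, fmt.Errorf("matching %s against %s: %w", path, pattern, err)
                }
                if !matched {
                    return false, nil
                }
            default:
                return false, fmt.Errorf("unsupported operator %v", c.Operator)
            }
        }
        return true, nil
    }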

 }

 // sourceData holds the result of calling `katcSourceType.dataFunc`. It maps the

@@ -101,35 +104,57 @@ func (r *rowTransformStep) UnmarshalJSON(data []byte) error {
 // katcTableConfig is the configuration for a specific KATC table. The control server
 // sends down these configurations.
 type katcTableConfig struct {
+	Name              string             `json:"name"`
 	SourceType        katcSourceType     `json:"source_type"`
-	Source            string             `json:"source"` // Describes how to connect to source (e.g. path to db) -- % and _ wildcards supported
-	Platform          string             `json:"platform"`
+	SourcePaths       []string           `json:"source_paths"` // Describes how to connect to source (e.g. path to db) -- % and _ wildcards supported
+	Filter            string             `json:"filter"`
 	Columns           []string           `json:"columns"`
-	Query             string             `json:"query"` // Query to run against `path`
+	SourceQuery       string             `json:"source_query"` // Query to run against each source path
 	RowTransformSteps []rowTransformStep `json:"row_transform_steps"`
 }

 // ConstructKATCTables takes stored configuration of KATC tables, parses the configuration,
 // and returns the constructed tables.
 func ConstructKATCTables(config map[string]string, slogger *slog.Logger) []osquery.OsqueryPlugin {
 	plugins := make([]osquery.OsqueryPlugin, 0)
-	for tableName, tableConfigStr := range config {
+
+	tableConfigs, tableConfigsExist := config["tables"]
+	if !tableConfigsExist {
+		slogger.Log(context.TODO(), slog.LevelWarn,
+			"missing top-level tables key in KATC config, cannot construct tables",
+		)
+
+		return plugins
+	}
+
+	// We want to unmarshal each table config separately, so that we don't fail to configure all tables
+	// if only some are malformed.
+	var rawTableConfigs []json.RawMessage
+	if err := json.Unmarshal([]byte(tableConfigs), &rawTableConfigs); err != nil {
+		slogger.Log(context.TODO(), slog.LevelWarn,
+			"could not unmarshal tables in KATC config",
+			"err", err,
+		)
+		return plugins
+	}
+
+	for _, rawTableConfig := range rawTableConfigs {
 		var cfg katcTableConfig
-		if err := json.Unmarshal([]byte(tableConfigStr), &cfg); err != nil {
+		if err := json.Unmarshal(rawTableConfig, &cfg); err != nil {
 			slogger.Log(context.TODO(), slog.LevelWarn,
-				"unable to unmarshal config for Kolide ATC table, skipping",
-				"table_name", tableName,
+				"unable to unmarshal config for KATC table, skipping",
				"err", err,
 			)
 			continue
 		}

-		if cfg.Platform != runtime.GOOS {
+		// For now, the filter is simply the OS
Contributor:
I think we probably want to future-proof this a little.

Brainstorming... We could define a rich DSL with boolean logic. Or maybe we can just have a series of strings? Or maybe a series of K:V pairs? Early use might be goos:darwin or something?

Contributor (Author):
What about making filter into filters, a map[string]string? Simple to unmarshal and therefore parse, and flexible for future additions?

    {
        "name": "kolide_indexeddb_leveldb_test",
        "source_type": "indexeddb_leveldb",
        "filters": {
            "goos": "darwin"
        },
        "columns": ["data"],
        "source_paths": ["/some/path/to/db.indexeddb.leveldb"],
        "source_query": "db.store",
        "row_transform_steps": ["deserialize_chrome"]
    }

Contributor:
filters makes sense to me. I don't know if that should be an array of strings, or a map. Array might be a little more flexible; I think either can work.

I think we should ponder whether that's an AND or an OR. If we go the way of overlays, this should be an AND, because the overlays handle OR.

Contributor (Author):
I've updated to filters in the other PR #1772 -- if we decide to go with this approach I'll make the update here as well.
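
For the record, the proposed map-based filters would unmarshal and evaluate roughly like this, with AND semantics across entries. This is a hypothetical sketch — the field names and supported keys are only settled in #1772:

    package katc

    import "runtime"

    // Sketch of the proposed `filters` field with AND semantics: every
    // key/value pair must match for the table to be constructed on this host.
    type filteredTableConfig struct {
        Filters map[string]string `json:"filters"`
    }

    func (c filteredTableConfig) filtersMatch() bool {
        for key, value := range c.Filters {
            switch key {
            case "goos":
                if value != runtime.GOOS {
                    return false
                }
            default:
                // Unknown filter keys fail closed, so filters added
                // server-side don't accidentally enable tables on old clients.
                return false
            }
        }
        return true
    }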

+		if cfg.Filter != runtime.GOOS {
 			continue
 		}

-		t, columns := newKatcTable(tableName, cfg, slogger)
-		plugins = append(plugins, table.NewPlugin(tableName, columns, t.generate))
+		t, columns := newKatcTable(cfg, slogger)
+		plugins = append(plugins, table.NewPlugin(cfg.Name, columns, t.generate))
 	}

 	return plugins
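
For orientation, a minimal sketch of feeding a stored config into ConstructKATCTables — the payload shape follows the tests below; the logger choice is illustrative:

    package katc

    import "log/slog"

    func exampleConstruct() {
        // Sketch: the stored KATC config is a key/value map whose "tables" key
        // holds a JSON array of table configs.
        cfg := map[string]string{
            "tables": `[{"name":"kolide_example","source_type":"sqlite","filter":"darwin","columns":["data"],"source_paths":["/some/path/%/db.sqlite"],"source_query":"SELECT data FROM object_data;","row_transform_steps":["snappy"]}]`,
        }
        plugins := ConstructKATCTables(cfg, slog.Default())
        _ = plugins
    }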
143 changes: 96 additions & 47 deletions ee/katc/config_test.go
@@ -21,84 +21,133 @@ func TestConstructKATCTables(t *testing.T) {
 		{
 			testCaseName: "snappy_sqlite",
 			katcConfig: map[string]string{
-				"kolide_snappy_sqlite_test": fmt.Sprintf(`{
-					"source_type": "sqlite",
-					"platform": "%s",
-					"columns": ["data"],
-					"source": "/some/path/to/db.sqlite",
-					"query": "SELECT data FROM object_data JOIN object_store ON (object_data.object_store_id = object_store.id) WHERE object_store.name=\"testtable\";",
-					"row_transform_steps": ["snappy"]
-				}`, runtime.GOOS),
+				"tables": fmt.Sprintf(`[
+					{
+						"name": "kolide_snappy_sqlite_test",
+						"source_type": "sqlite",
+						"filter": "%s",
+						"columns": ["data"],
+						"source_paths": ["/some/path/to/db.sqlite"],
+						"source_query": "SELECT data FROM object_data JOIN object_store ON (object_data.object_store_id = object_store.id) WHERE object_store.name=\"testtable\";",
+						"row_transform_steps": ["snappy"]
+					}
+				]`, runtime.GOOS),
 			},
 			expectedPluginCount: 1,
 		},
 		{
 			testCaseName: "indexeddb_leveldb",
 			katcConfig: map[string]string{
-				"kolide_indexeddb_leveldb_test": fmt.Sprintf(`{
-					"source_type": "indexeddb_leveldb",
-					"platform": "%s",
-					"columns": ["data"],
-					"source": "/some/path/to/db.indexeddb.leveldb",
-					"query": "db.store",
-					"row_transform_steps": ["deserialize_chrome"]
-				}`, runtime.GOOS),
+				"tables": fmt.Sprintf(`[
+					{
+						"name": "kolide_indexeddb_leveldb_test",
+						"source_type": "indexeddb_leveldb",
+						"filter": "%s",
+						"columns": ["data"],
+						"source_paths": ["/some/path/to/db.indexeddb.leveldb"],
+						"source_query": "db.store",
+						"row_transform_steps": ["deserialize_chrome"]
+					}
+				]`, runtime.GOOS),
 			},
 			expectedPluginCount: 1,
 		},
 		{
 			testCaseName: "multiple plugins",
 			katcConfig: map[string]string{
-				"test_1": fmt.Sprintf(`{
-					"source_type": "sqlite",
-					"platform": "%s",
-					"columns": ["data"],
-					"source": "/some/path/to/db.sqlite",
-					"query": "SELECT data FROM object_data;",
-					"row_transform_steps": ["snappy"]
-				}`, runtime.GOOS),
-				"test_2": fmt.Sprintf(`{
-					"source_type": "sqlite",
-					"platform": "%s",
-					"columns": ["col1", "col2"],
-					"source": "/some/path/to/a/different/db.sqlite",
-					"query": "SELECT col1, col2 FROM some_table;",
-					"row_transform_steps": ["camel_to_snake"]
-				}`, runtime.GOOS),
+				"tables": fmt.Sprintf(`[
+					{
+						"name": "test_1",
+						"source_type": "sqlite",
+						"filter": "%s",
+						"columns": ["data"],
+						"source_paths": ["/some/path/to/db.sqlite"],
+						"source_query": "SELECT data FROM object_data;",
+						"row_transform_steps": ["snappy"]
+					},
+					{
+						"name": "test_2",
+						"source_type": "sqlite",
+						"filter": "%s",
+						"columns": ["col1", "col2"],
+						"source_paths": ["/some/path/to/a/different/db.sqlite"],
+						"source_query": "SELECT col1, col2 FROM some_table;",
+						"row_transform_steps": ["camel_to_snake"]
+					}
+				]`, runtime.GOOS, runtime.GOOS),
 			},
 			expectedPluginCount: 2,
 		},
+		{
+			testCaseName: "skips invalid tables and returns valid tables",
+			katcConfig: map[string]string{
+				"tables": fmt.Sprintf(`[
+					{
+						"name": "not_a_valid_table",
+						"source_type": "not a real type",
+						"filter": "%s",
+						"columns": ["col1", "col2"],
+						"source_paths": ["/some/path/to/a/different/db.sqlite"],
+						"source_query": "SELECT col1, col2 FROM some_table;",
+						"row_transform_steps": ["not a real row transform step"]
+					},
+					{
+						"name": "valid_table",
+						"source_type": "sqlite",
+						"filter": "%s",
+						"columns": ["data"],
+						"source_paths": ["/some/path/to/db.sqlite"],
+						"source_query": "SELECT data FROM object_data;",
+						"row_transform_steps": ["snappy"]
+					}
+				]`, runtime.GOOS, runtime.GOOS),
+			},
+			expectedPluginCount: 1,
+		},
-		{
-			testCaseName: "malformed config",
-			katcConfig: map[string]string{
-				"malformed_test": "this is not a config",
-			},
-			expectedPluginCount: 0,
-		},
+		{
+			testCaseName: "malformed table",
+			katcConfig: map[string]string{
+				"tables": "this is not a config",
+			},
+			expectedPluginCount: 0,
+		},
 		{
 			testCaseName: "invalid table source",
 			katcConfig: map[string]string{
-				"kolide_snappy_test": fmt.Sprintf(`{
-					"source_type": "unknown_source",
-					"platform": "%s",
-					"columns": ["data"],
-					"source": "/some/path/to/db.sqlite",
-					"query": "SELECT data FROM object_data;"
-				}`, runtime.GOOS),
+				"tables": fmt.Sprintf(`[
+					{
+						"name": "kolide_snappy_test",
+						"source_type": "unknown_source",
+						"filter": "%s",
+						"columns": ["data"],
+						"source_paths": ["/some/path/to/db.sqlite"],
+						"source_query": "SELECT data FROM object_data;"
+					}
+				]`, runtime.GOOS),
 			},
 			expectedPluginCount: 0,
 		},
 		{
 			testCaseName: "invalid data processing step type",
 			katcConfig: map[string]string{
-				"kolide_snappy_test": fmt.Sprintf(`{
-					"source_type": "sqlite",
-					"platform": "%s",
-					"columns": ["data"],
-					"source": "/some/path/to/db.sqlite",
-					"query": "SELECT data FROM object_data;",
-					"row_transform_steps": ["unknown_step"]
-				}`, runtime.GOOS),
+				"tables": fmt.Sprintf(`[
+					{
+						"name": "kolide_snappy_test",
+						"source_type": "sqlite",
+						"filter": "%s",
+						"columns": ["data"],
+						"source_paths": ["/some/path/to/db.sqlite"],
+						"source_query": "SELECT data FROM object_data;",
+						"row_transform_steps": ["unknown_step"]
+					}
+				]`, runtime.GOOS),
 			},
 			expectedPluginCount: 0,
 		},
56 changes: 29 additions & 27 deletions ee/katc/indexeddb_leveldb.go
@@ -15,39 +15,41 @@ import (
 // found at the filepath in `sourcePattern`. It retrieves all rows from the database
 // and object store specified in `query`, which it expects to be in the format
 // `<db name>.<object store name>`.
-func indexeddbLeveldbData(ctx context.Context, slogger *slog.Logger, sourcePattern string, query string, sourceConstraints *table.ConstraintList) ([]sourceData, error) {
-	pathPattern := sourcePatternToGlobbablePattern(sourcePattern)
-	leveldbs, err := filepath.Glob(pathPattern)
-	if err != nil {
-		return nil, fmt.Errorf("globbing for leveldb files: %w", err)
-	}
-
-	// Extract database and table from query
-	dbName, objectStoreName, err := extractQueryTargets(query)
-	if err != nil {
-		return nil, fmt.Errorf("getting db and object store names: %w", err)
-	}
-
-	// Query databases
+func indexeddbLeveldbData(ctx context.Context, slogger *slog.Logger, sourcePaths []string, query string, pathConstraints *table.ConstraintList) ([]sourceData, error) {
 	results := make([]sourceData, 0)
-	for _, db := range leveldbs {
-		// Check to make sure `db` adheres to sourceConstraints
-		valid, err := checkSourceConstraints(db, sourceConstraints)
+	for _, sourcePath := range sourcePaths {
+		pathPattern := sourcePatternToGlobbablePattern(sourcePath)
+		leveldbs, err := filepath.Glob(pathPattern)
 		if err != nil {
-			return nil, fmt.Errorf("checking source path constraints: %w", err)
-		}
-		if !valid {
-			continue
+			return nil, fmt.Errorf("globbing for leveldb files: %w", err)
 		}

-		rowsFromDb, err := indexeddb.QueryIndexeddbObjectStore(db, dbName, objectStoreName)
+		// Extract database and table from query
+		dbName, objectStoreName, err := extractQueryTargets(query)
 		if err != nil {
-			return nil, fmt.Errorf("querying %s: %w", db, err)
+			return nil, fmt.Errorf("getting db and object store names: %w", err)
 		}
+
+		// Query databases
+		for _, db := range leveldbs {
+			// Check to make sure `db` adheres to pathConstraints
+			valid, err := checkPathConstraints(db, pathConstraints)
+			if err != nil {
+				return nil, fmt.Errorf("checking source path constraints: %w", err)
+			}
+			if !valid {
+				continue
+			}
+
+			rowsFromDb, err := indexeddb.QueryIndexeddbObjectStore(db, dbName, objectStoreName)
+			if err != nil {
+				return nil, fmt.Errorf("querying %s: %w", db, err)
+			}
+			results = append(results, sourceData{
+				path: db,
+				rows: rowsFromDb,
+			})
+		}
-		results = append(results, sourceData{
-			path: db,
-			rows: rowsFromDb,
-		})
 	}

 	return results, nil
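
As the doc comment above notes, the IndexedDB source query takes the form `<db name>.<object store name>`. `extractQueryTargets` isn't shown in this diff; a plausible sketch (the real helper may validate differently):

    package katc

    import (
        "fmt"
        "strings"
    )

    // Sketch only: split a source query of the form
    // `<db name>.<object store name>` into its two components.
    func extractQueryTargets(query string) (string, string, error) {
        parts := strings.SplitN(query, ".", 2)
        if len(parts) != 2 || parts[0] == "" || parts[1] == "" {
            return "", "", fmt.Errorf("query `%s` is not in expected format `<db name>.<object store name>`", query)
        }
        return parts[0], parts[1], nil
    }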
44 changes: 23 additions & 21 deletions ee/katc/sqlite.go
@@ -13,32 +13,34 @@ import (
 )

 // sqliteData is the dataFunc for sqlite KATC tables
-func sqliteData(ctx context.Context, slogger *slog.Logger, sourcePattern string, query string, sourceConstraints *table.ConstraintList) ([]sourceData, error) {
-	pathPattern := sourcePatternToGlobbablePattern(sourcePattern)
-	sqliteDbs, err := filepath.Glob(pathPattern)
-	if err != nil {
-		return nil, fmt.Errorf("globbing for files with pattern %s: %w", pathPattern, err)
-	}
-
+func sqliteData(ctx context.Context, slogger *slog.Logger, sourcePaths []string, query string, pathConstraints *table.ConstraintList) ([]sourceData, error) {
 	results := make([]sourceData, 0)
-	for _, sqliteDb := range sqliteDbs {
-		// Check to make sure `sqliteDb` adheres to sourceConstraints
-		valid, err := checkSourceConstraints(sqliteDb, sourceConstraints)
+	for _, sourcePath := range sourcePaths {
+		pathPattern := sourcePatternToGlobbablePattern(sourcePath)
+		sqliteDbs, err := filepath.Glob(pathPattern)
 		if err != nil {
-			return nil, fmt.Errorf("checking source path constraints: %w", err)
-		}
-		if !valid {
-			continue
+			return nil, fmt.Errorf("globbing for files with pattern %s: %w", pathPattern, err)
 		}

-		rowsFromDb, err := querySqliteDb(ctx, slogger, sqliteDb, query)
-		if err != nil {
-			return nil, fmt.Errorf("querying %s: %w", sqliteDb, err)
+		for _, sqliteDb := range sqliteDbs {
+			// Check to make sure `sqliteDb` adheres to pathConstraints
+			valid, err := checkPathConstraints(sqliteDb, pathConstraints)
+			if err != nil {
+				return nil, fmt.Errorf("checking source path constraints: %w", err)
+			}
+			if !valid {
+				continue
+			}
+
+			rowsFromDb, err := querySqliteDb(ctx, slogger, sqliteDb, query)
+			if err != nil {
+				return nil, fmt.Errorf("querying %s: %w", sqliteDb, err)
+			}
+			results = append(results, sourceData{
+				path: sqliteDb,
+				rows: rowsFromDb,
+			})
+		}
-		results = append(results, sourceData{
-			path: sqliteDb,
-			rows: rowsFromDb,
-		})
 	}

 	return results, nil
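
`querySqliteDb` is also not shown in this diff. A hedged sketch of what it might do — open each matched database read-only, run the configured `source_query`, and return rows as column-name-to-bytes maps; the driver choice (`modernc.org/sqlite`) and the row representation are assumptions:

    package katc

    import (
        "context"
        "database/sql"
        "fmt"
        "log/slog"

        _ "modernc.org/sqlite" // assumption: a CGO-free sqlite driver registered as "sqlite"
    )

    // Sketch only: open the db read-only, run the source query, and collect
    // each row as a map of column name to raw bytes.
    func querySqliteDb(ctx context.Context, slogger *slog.Logger, path string, query string) ([]map[string][]byte, error) {
        conn, err := sql.Open("sqlite", fmt.Sprintf("file:%s?mode=ro", path))
        if err != nil {
            return nil, fmt.Errorf("opening %s: %w", path, err)
        }
        defer conn.Close()

        slogger.Log(ctx, slog.LevelDebug, "querying sqlite db", "path", path)

        rows, err := conn.QueryContext(ctx, query)
        if err != nil {
            return nil, fmt.Errorf("running query against %s: %w", path, err)
        }
        defer rows.Close()

        columns, err := rows.Columns()
        if err != nil {
            return nil, fmt.Errorf("getting columns: %w", err)
        }

        results := make([]map[string][]byte, 0)
        for rows.Next() {
            rawValues := make([]sql.RawBytes, len(columns))
            scanArgs := make([]any, len(columns))
            for i := range rawValues {
                scanArgs[i] = &rawValues[i]
            }
            if err := rows.Scan(scanArgs...); err != nil {
                return nil, fmt.Errorf("scanning row: %w", err)
            }
            row := make(map[string][]byte, len(columns))
            for i, col := range columns {
                // Copy out of RawBytes, which is only valid until the next Next().
                row[col] = append([]byte(nil), rawValues[i]...)
            }
            results = append(results, row)
        }

        return results, rows.Err()
    }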