-
Notifications
You must be signed in to change notification settings - Fork 103
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Adding queued querier to knapsack, and OsqueryVersion to menu #1180
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
pretty neat. moving to callbacks is interesting, and worth thinking about.
Today, we have a synchronous interface. And many callers seem to expect that style. Some have their own backoff/retry mechanisms. I don't know if the callback style is a good fit for them, maybe it'll just take migration.
I think I like the callback though. So maybe we should exposer Query
and QueryCallback
or something. And think about what needs a context and timeout?
I guess the other thing to consider, is how we want to handle things that query on an interval. Should the interval move in here? Something like QueryRepeatedly(callback, timer)
but that feels messy. Maybe kick repeating forward.
type Querier interface { | ||
// Query will attempt to send a query to the osquery client. The result of the | ||
// query will be passed to the callback function provided. | ||
Query(query string, callback func(result []map[string]string, err error)) error |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is changing out common Query
interface to being a callback. What do people think about that? Should we give it a different name?
Should we include context? Or some other mechanism for timeouts?
pkg/agent/knapsack/queued_querier.go
Outdated
ctx, qq.cancel = context.WithCancel(ctx) | ||
for { | ||
if qq.syncQuerier != nil { | ||
for { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this get some kind of healthcheck to make sure the underlying querier is actually alive?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm maybe? If it does die, queries will fall on the floor and I assume an error will be returned quickly to the client.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe just a health check in the initial startup. But I'm thinking about intent, I assume the purpose of the callback interface, is to allow various "get info" style things to get chucked into this queue on startup, before osquery finishes booting. From that perspective, we should health check here. We know the first couple queries may fail.
qq := &queuedQuerier{ | ||
logger: log.With(logger, "component", "querier"), | ||
queue: q, | ||
queueCh: make(chan struct{}, 10), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why 10?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I arbitrarily chose 10, but I don't have a good feel for what's appropriate.
This would be the number of queries in the queue when Query
invocations become blocking, and waiting for other queries to process. Maybe it should be larger just to provide plenty of room?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hmm ... I worry there is a chance that some other part of the code gets blocked why waiting on the channel to drain. Why don't we just leave it unbuffered and put the channel send in the Query
func in a goroutine?
func (qq *queuedQuerier) Query(query string, callback func(result []map[string]string, err error)) error {
if qq == nil {
return errors.New("queuedQuerier is nil")
}
// Push the item on to the queue, and notify the channel so the query is processed
qq.push(&queueItem{query: query, callback: callback})
go func() {
qq.queueCh <- struct{}{}
}()
return nil
}
Or we could set the buffer 1 and check that len(qq.queueCh) == 0
before pushing
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reasonable! This should only really get used for the internal stuff, so order of 10 seems okay. Comment please?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm I like @James-Pickett suggestion, it would be nice to guarantee that Query
does not block.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think James is probably correct. There's already the list there.
It makes me wonder if this channel is needed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The channel is mostly to wake up the processing thread in cases where the queue is empty, or the querier is not yet available.
We could just spawn a goroutine for every query instead of queueing and processing serially... That would assume there is proper thread safety within the querier impls. If we spawn 100 goroutines all submitting queries, would that just work? That would also mean clients should not expect their queries to be run in order necessarily.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We could just spawn a goroutine for every query instead of queueing and processing serially... That would assume there is proper thread safety within the querier impls. If we spawn 100 goroutines all submitting queries, would that just work? That would also mean clients should not expect their queries to be run in order necessarily.
I was thinking the same ... effectively have a stack of blocking goroutines as an unordered queue. I don't think we care about order at this level.
If the caller wanted, they could force their own queries to by synchronous at their level. ie: send query A ... wait for results .. send query B
@@ -721,3 +732,33 @@ func iconFilename() string { | |||
func (r *DesktopUsersProcessesRunner) iconFileLocation() string { | |||
return filepath.Join(r.usersFilesRoot, iconFilename()) | |||
} | |||
|
|||
func (r *DesktopUsersProcessesRunner) refreshOsqueryData() { | |||
osqueryVersion := "Unknown" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Right now, when desktop first starts, the menu item will display as osquery Version:
. Upon the query result being returned it will change to osquery Version: <the version number>
or osquery Version: Unknown
in case of an error.
Is osquery Version:
what we want for that waiting-for-osquery state? It could be something like osquery Version: Waiting for osquery...
🤷
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don'y think it matters much. Unknown
or Waiting...
Not going to merge this right now. See also #1477 |
This PR adds a new piece of common functionality to
Knapsack
- a querier for issuing queries to osquery and getting results in an asynchronous fashion.Why?
We already have the
Querier
interface and concept scattered around the code base. Some places have reimplemented the same things like backoff/retries, and sleeping to wait for osquery to be ready. It seems logical to consolidate and streamline this functionality, and provide a simple API for doing this. Adding this toKnapsack
enables this functionality to be easily accessed throughout the code base.How?
Knapsack
now conforms to theQuerier
interface and this dependency is injected into it, either as a concrete implementation or a mock. The implementation I've provided is a FIFOqueuedQuerier
which asynchronously processes queries as they are added, and invokes callbacks to provide errors/results to the original clients.Additionally, I've added
OsqueryVersion
as a menu template variable, since we already wanted that, and it provides a nice opportunity to use the queued querier in action. The menu is initially built with a default value forOsqueryVersion
, and when the query result is returned, the menu is refreshed with the new data.