Improve cluster management tools #13

Open
3 of 4 tasks
delgadom opened this issue Oct 6, 2018 · 6 comments

delgadom commented Oct 6, 2018

Goals

Given a list of futures (partially completed) and a cluster, it should be easy to:

  • Get the state, run duration, and any exceptions
  • Get the worker holding/processing the task
  • Get the pod the worker is running on
  • Execute arbitrary kubernetes commands on the pod and return the result

These should be tested on RHG hub, and, if stable/useful, submitted as PRs to dask_kubernetes.
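
As a rough illustration of the first goal, here is a minimal sketch that collects the state and any exception for each future. It assumes only the `key` and `status` attributes and the `exception()` method that `dask.distributed` futures expose; `summarize_futures` is a hypothetical helper name, and run duration is not covered here.

```python
def summarize_futures(futures):
    """Return one dict per future with its key, status, and exception (if any)."""
    summary = []
    for future in futures:
        record = {'key': future.key, 'status': future.status, 'exception': None}
        # Only ask for the exception once the task has failed; calling
        # exception() on a pending future would block until it finishes.
        if future.status == 'error':
            record['exception'] = future.exception()
        summary.append(record)
    return summary
```

Checking `status` first keeps the call non-blocking for futures that are still running.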

What we have so far

class FutureNotFoundError(KeyError):
    pass

class PodNotFoundError(KeyError):
    pass

def get_worker_ip_from_future(future, cluster):
    # Completed tasks: the scheduler records which workers hold the result
    try:
        return list(cluster.scheduler.who_has[future.key])[0]
    except KeyError:
        pass

    # Otherwise, scan each worker's in-flight and in-memory tasks by key
    for w in cluster.scheduler.workers.values():
        keys = [t.key for t in w.processing] + [t.key for t in w.has_what]
        if future.key in keys:
            return w.address
    
    raise FutureNotFoundError('task {} not found in cluster tasks'.format(future))

def get_pod_from_ip(ip, cluster):
    ip = ip.split('://')[-1].split(':')[0]
    
    for p in cluster.pods():
        if p.status.pod_ip == ip:
            return p

    raise PodNotFoundError('No pod found with IP address {}'.format(ip))

These functions are admittedly very hacky, and there have to be faster ways of doing this, but I don't see them in the dask_kubernetes docs.
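
One small piece that could be made less fragile is the address parsing in get_pod_from_ip. The sketch below uses the standard library's `urllib.parse.urlsplit` instead of chained `split` calls; `host_from_address` is a hypothetical helper name, and the example addresses are made up.

```python
from urllib.parse import urlsplit


def host_from_address(address):
    """Extract the host from a worker address such as 'tcp://10.0.0.1:8786'."""
    # urlsplit only parses the host when the address has a '//' netloc marker,
    # so add one for bare 'host:port' strings.
    if '://' not in address:
        address = '//' + address
    return urlsplit(address).hostname
```

This should also cope with bracketed IPv6 addresses, which the `split(':')` approach would truncate.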

The next steps of executing things on the pods could be done from the command line, but using the kubernetes api would obviously be more elegant. Pods returned by get_pod_from_ip have a bunch of useful info. It's possible we could get the appropriately configured/authenticated kubernetes client from the dask cluster or client and use it to execute commands. The kubernetes package has some examples of calling exec using the kubernetes api.

delgadom commented May 4, 2019

Alternatively, I think these are slightly less hacky?

class WorkerNotFoundError(Exception):
    pass

class PodNotFoundError(Exception):
    pass

def find_worker_processing_future(future, cluster):
    for worker in cluster.scheduler.workers.values():
        if future.key in list(map(lambda x: x.key, worker.processing)):
            return worker
        
    raise WorkerNotFoundError('No worker found processing future {}'.format(future.key))
    
def find_pod_containing_worker(worker, cluster):
    for pod in cluster.pods():
        if pod.status.pod_ip == worker.host:
            return pod
    
    raise PodNotFoundError('No pod found running {}'.format(worker.__repr__()))
    
def get_pod_processing_future(future, cluster):
    return find_pod_containing_worker(
        find_worker_processing_future(future, cluster),
        cluster)
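
When looking up many futures at once, the per-future scan above repeats the same walk over every worker. A possible alternative, sketched here under the same assumption that `cluster.scheduler.workers` maps to objects with a `processing` collection of tasks carrying a `key`, is to build the index once. Both function names are hypothetical.

```python
def index_processing_tasks(cluster):
    """Map task key -> worker for every task currently being processed."""
    index = {}
    for worker in cluster.scheduler.workers.values():
        for task in worker.processing:
            index[task.key] = worker
    return index


def find_workers_processing_futures(futures, cluster):
    """Return {future key: worker or None} using a single pass over workers."""
    index = index_processing_tasks(cluster)
    return {future.key: index.get(future.key) for future in futures}
```

Futures that are not currently being processed map to `None` rather than raising, so the caller can decide how to handle them.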

delgadom commented May 4, 2019

Also, this kubernetes exec example seems to show promise, though I can't get the auth to work:

def exec_command_on_pod(pod, cluster, exec_command):

    res = cluster.core_api.connect_get_namespaced_pod_exec(
        name=pod.metadata.name,
        namespace=pod.metadata.namespace,
        command=exec_command,
        stderr=True, stdin=False,
        stdout=True, tty=False)

    return res

delgadom commented May 4, 2019

@bolliger32 the second comment may be helpful. I want the last bit to work, but this gets us at least partway there...

delgadom commented May 6, 2019

'''

To install google cloud SDK, follow the instructions in the
google cloud docs: https://cloud.google.com/sdk/docs/. On
compute.rhg.com, simply make sure your installation is up
to date:

.. code-block:: bash

    sudo apt-get update && sudo apt-get upgrade -y google-cloud-sdk

Then, log in and set up your account:

.. code-block:: bash

    gcloud init

To install kubectl:

.. code-block:: bash

    sudo apt-get install kubectl

To set up credentials for directly executing commands on the pods:

.. code-block:: bash

    # Get credentials for the cluster. For compute.rhg.com, the
    # cluster is "jhub-cluster", and the project "rhg-project-1".
    # For compute.impactlab.org, the cluster is "impactlab-hub",
    # and the project "compute-impactlab".
    gcloud container clusters get-credentials jhub-cluster \
        --zone us-west1-a --project rhg-project-1

These steps only need to be executed once per machine

'''

import os
from tempfile import TemporaryFile
import tarfile

from kubernetes.stream import stream as kubernetes_stream
from kubernetes import client as kube_client, config as kube_config

_kube_core_v1_api = None


def _reload_kubernetes_config():
    global _kube_core_v1_api

    _kube_core_v1_api = kube_client.CoreV1Api()
    kube_config.load_kube_config(
        config_file='/home/jovyan/.kube/config',
        client_configuration=_kube_core_v1_api.api_client.configuration,
        persist_config=True)


class WorkerNotFoundError(Exception):
    pass


class PodNotFoundError(Exception):
    pass


def find_worker_processing_future(future, cluster):
    '''
    Look up the worker on a cluster currently processing a future
    '''
    _reload_kubernetes_config()

    for worker in cluster.scheduler.workers.values():
        if future.key in list(map(lambda x: x.key, worker.processing)):
            return worker
        
    raise WorkerNotFoundError('No worker found processing future {}'.format(future.key))


def find_pod_containing_worker(worker, cluster):
    '''
    Look up the pod on a cluster which hosts a worker
    '''
    _reload_kubernetes_config()

    for pod in cluster.pods():
        if pod.status.pod_ip == worker.host:
            return pod
    
    raise PodNotFoundError('No pod found running {}'.format(worker.__repr__()))


def get_pod_processing_future(future, cluster):
    '''
    Look up the pod on a cluster currently processing a future
    '''

    _reload_kubernetes_config()

    return find_pod_containing_worker(
        find_worker_processing_future(future, cluster),
        cluster)

    
def execute_command_on_pod(pod, command):
    '''
    Execute an arbitrary command on a pod
    
    Adapted from
    https://github.com/aogier/k8s-client-python/blob/master/examples/exec.py
    
    Parameters
    ----------
    pod : object
        A :py:class:`kubernetes.client.models.v1_pod.V1Pod` object
    command : list
        the command to execute, formatted as a list, with
        the program to execute first, and the arguments after
        
    Examples
    --------
    
    .. code-block:: python
    
        >>> res = execute_command_on_pod(pod, ['ls', '/'])
        >>> print(res)
        bin
        boot
        dev
        etc
        home
        lib
        lib64
        media
        mnt
        opt
        proc
        root
        run
        sbin
        srv
        sys
        tmp
        usr
        var
    '''

    _reload_kubernetes_config()

    resp = kubernetes_stream(
        _kube_core_v1_api.connect_get_namespaced_pod_exec,
        name=pod.metadata.name,
        namespace=pod.metadata.namespace,
        command=command,
        stderr=True, stdin=False,
        stdout=True, tty=False)

    return resp


def copy_file_to_pod(pod, src, dst):
    '''
    Upload a file to the pod
    
    adapted from
    https://github.com/aogier/k8s-client-python/blob/12f1443895e80ee24d689c419b5642de96c58cc8/examples/exec.py#L101
    
    Parameters
    ----------
    pod : object
        A :py:class:`kubernetes.client.models.v1_pod.V1Pod` object
    src : str
        The location of the file to upload on the local machine
    dst : str
        The destination of the file on the pod
    '''
    
    _reload_kubernetes_config()

    # Copying file client -> pod
    exec_command = ['tar', 'xvf', '-', '-C', os.path.dirname(dst)]
    resp = kubernetes_stream(
        _kube_core_v1_api.connect_get_namespaced_pod_exec,
        pod.metadata.name,
        pod.metadata.namespace,
        command=exec_command,
        stderr=True, stdin=True,
        stdout=True, tty=False,
        _preload_content=False)

    with TemporaryFile() as tar_buffer:
        with tarfile.open(fileobj=tar_buffer, mode='w') as tar:
            # Add the file once, stored under the destination's base name
            tar.add(name=src, arcname=os.path.basename(dst))
        tar_buffer.seek(0)
        commands = []
        commands.append(tar_buffer.read())

        while resp.is_open():
            resp.update(timeout=1)
            if resp.peek_stdout():
                print("STDOUT: %s" % resp.read_stdout())
            if resp.peek_stderr():
                print("STDERR: %s" % resp.read_stderr())
            if commands:
                c = commands.pop(0)
                #print("Running command... %s\n" % c)
                resp.write_stdin(c.decode())
            else:
                break

        resp.close()

def copy_file_from_pod(pod, src, dst):
    '''
    Download a file from the pod
    
    adapted from
    https://github.com/aogier/k8s-client-python/blob/12f1443895e80ee24d689c419b5642de96c58cc8/examples/exec.py#L101
    
    Parameters
    ----------
    pod : object
        A :py:class:`kubernetes.client.models.v1_pod.V1Pod` object
    src : str
        The location of the file on the pod
    dst : str
        The destination of the file on the local machine
    '''

    _reload_kubernetes_config()
    
    # Copying file pod -> client
    exec_command = ['tar', 'cf', '-', '-C', os.path.dirname(src), os.path.basename(src)]

    with TemporaryFile() as tar_buffer:

        resp = kubernetes_stream(
            _kube_core_v1_api.connect_get_namespaced_pod_exec,
            pod.metadata.name,
            pod.metadata.namespace,
            command=exec_command,
            stderr=True, stdin=True,
            stdout=True, tty=False,
            _preload_content=False)

        while resp.is_open():
            resp.update(timeout=1)
            if resp.peek_stdout():
                out = resp.read_stdout()
                #print("STDOUT: %s" % len(out))
                tar_buffer.write(out.encode('ascii'))
            if resp.peek_stderr():
                print("STDERR: %s" % resp.read_stderr())

        resp.close()

        tar_buffer.flush()
        tar_buffer.seek(0)

        with tarfile.open(fileobj=tar_buffer, mode='r:') as tar:
            tar.extract(os.path.basename(src), os.path.dirname(dst))

This can be used to execute an arbitrary command on a worker currently processing a future, e.g.:

In [1]: pod = get_pod_processing_future(running_futures[0], cluster)
In [2]: res = execute_command_on_pod(pod, ['cat', '/mytmpdir/geoclaw.err'])
In [3]: print(res)
Out[3]:
/opt/conda/envs/worker/lib/python3.6/site-packages/numpy/core/_methods.py:29: RuntimeWarning: invalid value encountered in reduce
  return umr_minimum(a, axis, None, out, keepdims)

delgadom commented May 7, 2019

System prerequisites for using the block of code above:

sudo apt-get update && sudo apt-get install --upgrade google-cloud-sdk kubectl
pip install --upgrade parameterize_jobs rhg_compute_tools
rm -rf /home/jovyan/.kube/

Log into google cloud:

gcloud init

Log into your personal account (create a new profile). I hope to get rid of this soon, and enable the use of service accounts, but for now this works. Use rhg-project-1/us-west1-a (13) for the project/region for compute.rhg.com. Use compute-impactlab for the compute.impactlab.org project.

Then pull your credentials for the kubernetes cluster, e.g.:

gcloud container clusters get-credentials jhub-cluster --zone us-west1-a --project rhg-project-1

I've found I usually need to execute a kubectl exec command from the command line before the python version will work. Not sure why this is... it's super frustrating. But then this works fine... for a while, and then you'll have to run the get-credentials and kubectl exec lines again. Obviously something to do with the command modifying the config or caching something, but not sure what's missing from our python setup. E.g. run the following:

kubectl exec dask-delgadom-08c6d5e7-1gfwsc --namespace rhodium-jupyter -- ls /

and then you should be good to go for a while, even with different pods. You'll periodically need to run the last two commands again to refresh.
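
Since these two commands have to be rerun periodically, it may be convenient to wrap them. The sketch below only assembles the same `gcloud` and `kubectl` command lines shown above and hands them to `subprocess.run`; the function names are hypothetical, and it assumes both CLIs are installed and on the PATH.

```python
import subprocess


def build_refresh_commands(cluster_name, zone, project, pod_name, namespace):
    """Return the argv lists for the get-credentials and kubectl exec steps."""
    return [
        ['gcloud', 'container', 'clusters', 'get-credentials', cluster_name,
         '--zone', zone, '--project', project],
        ['kubectl', 'exec', pod_name, '--namespace', namespace, '--', 'ls', '/'],
    ]


def refresh_kubernetes_credentials(cluster_name, zone, project, pod_name,
                                   namespace, run=subprocess.run):
    """Run both refresh commands in order, raising if either one fails."""
    for argv in build_refresh_commands(
            cluster_name, zone, project, pod_name, namespace):
        run(argv, check=True)
```

For compute.rhg.com this would be called with 'jhub-cluster', 'us-west1-a', 'rhg-project-1', the name of any running worker pod, and 'rhodium-jupyter'.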

delgadom commented May 8, 2019

Note that the upload/download functions currently only work for text files: the upload decodes the tar stream to a string and the download encodes stdout as ASCII, so binary content is not supported. See this thread... apparently the kubernetes stream objects expect the data to be strings.
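
One possible workaround, not tested against a real cluster, would be to base64-encode the tar stream so that only ASCII text passes through the kubernetes stream. The sketch below shows just the local encode/decode halves; the helper names are hypothetical, and it assumes the pod image provides a `base64` binary so the remote side could run something like `sh -c 'base64 -d | tar xf - -C <dir>'`.

```python
import base64
import io
import os
import tarfile


def tar_file_as_text(src, arcname):
    """Pack one file into a tar archive and return it as base64 ASCII text."""
    buffer = io.BytesIO()
    with tarfile.open(fileobj=buffer, mode='w') as tar:
        tar.add(src, arcname=arcname)
    return base64.b64encode(buffer.getvalue()).decode('ascii')


def untar_text(text, dest_dir):
    """Decode base64 text back into a tar archive and extract it."""
    raw = base64.b64decode(text.encode('ascii'))
    with tarfile.open(fileobj=io.BytesIO(raw), mode='r:') as tar:
        tar.extractall(dest_dir)
```

The encoded text is about a third larger than the raw archive, which is the price of keeping the stream string-safe.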
