conjur module
#
# Copyright (C) 2014 Conjur Inc
#
# Permission is hereby granted, free of charge, to any person obtaining a copy of
# this software and associated documentation files (the "Software"), to deal in
# the Software without restriction, including without limitation the rights to
# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
# the Software, and to permit persons to whom the Software is furnished to do so,
# subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
# FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
# COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
# IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
import os
from config import Config
from api import API
from group import Group
from user import User
from host import Host
from layer import Layer
from resource import Resource
from role import Role
from variable import Variable
from exceptions import ConjurException
from config import config
def _config(given):
if given is None:
env_config_file = os.getenv('CONJURRC')
if env_config_file is not None:
config.load(env_config_file)
return config
return given
def configure(**kwargs):
"""
Convenience function to merge multiple settings into the default global
config.
Example:
>>> import conjur
>>> conjur.configure(appliance_url='https://conjur.example.com/api',
... account='example',
... cert_path='/path/to/cert.pem')
"""
config.update(**kwargs)
return config
def new_from_netrc(netrc_file=None, configuration=None):
"""
Create a `conjur.API` instance using an identity loaded from netrc. This method
uses the identity stored for the host `config.authn_url`.
`netrc_file` is an alternative path to the netrc formatted file. Defaults
to ~/.netrc on unixy systems.
`configuration` is a `conjur.Config` instance used to determine the host
in the netrc file, and also passed to the `conjur.new_from_key` method to
create the API instance using the identity.
"""
import netrc
configuration = _config(configuration)
auth = netrc.netrc(netrc_file).authenticators(configuration.authn_url)
if auth is None:
raise ValueError("No authenticators found for authn_url '%s' in %s" % (
configuration.authn_url,
(netrc_file or '~/.netrc')
))
login, _, api_key = auth
return new_from_key(login, api_key, configuration)
def new_from_key(login, api_key, configuration=None):
"""
Create a `conjur.API` instance that will authenticate on demand as the identity given
by `login` and `api_key`.
`login` is the identity of the Conjur user or host to authenticate as.
`api_key` is the api key *or* password to use when authenticating.
`configuration` is a `conjur.Config` instance for the api. If not given the global
`Config` instance (`conjur.config`) will be used.
"""
return API(credentials=(login, api_key), config=_config(configuration))
def new_from_token(token, configuration=None):
"""
Create a `conjur.API` instance that will authenticate using the given signed Conjur
token.
This is useful if you want to act on behalf of a the identity of an
HTTP request containing a user's signed token.
`token` is the json formatted, *not* base64'd, Conjur authentication Token.
`configuration` is a conjur.Config instance for the api. If not given, the global Config
instance (`conjur.config`) will be used.
"""
return API(token=token, config=_config(configuration))
__all__ = (
'config', 'Config', 'Group', 'API', 'User', 'Host', 'Layer', 'Resource', 'Role', 'Variable',
'new_from_key', 'new_from_netrc', 'new_from_token', 'configure', 'ConjurException'
)
Module variables
var config
Functions
def configure(
**kwargs)
Convenience function to merge multiple settings into the default global config.
Example:
>>> import conjur
>>> conjur.configure(appliance_url='https://conjur.example.com/api',
... account='example',
... cert_path='/path/to/cert.pem')
def configure(**kwargs):
"""
Convenience function to merge multiple settings into the default global
config.
Example:
>>> import conjur
>>> conjur.configure(appliance_url='https://conjur.example.com/api',
... account='example',
... cert_path='/path/to/cert.pem')
"""
config.update(**kwargs)
return config
def new_from_key(
login, api_key, configuration=None)
Create a API
instance that will authenticate on demand as the identity given
by login
and api_key
.
login
is the identity of the Conjur user or host to authenticate as.
api_key
is the api key or password to use when authenticating.
configuration
is a Config
instance for the api. If not given the global
Config
instance (config
) will be used.
def new_from_key(login, api_key, configuration=None):
"""
Create a `conjur.API` instance that will authenticate on demand as the identity given
by `login` and `api_key`.
`login` is the identity of the Conjur user or host to authenticate as.
`api_key` is the api key *or* password to use when authenticating.
`configuration` is a `conjur.Config` instance for the api. If not given the global
`Config` instance (`conjur.config`) will be used.
"""
return API(credentials=(login, api_key), config=_config(configuration))
def new_from_netrc(
netrc_file=None, configuration=None)
Create a API
instance using an identity loaded from netrc. This method
uses the identity stored for the host config.authn_url
.
netrc_file
is an alternative path to the netrc formatted file. Defaults
to ~/.netrc on unixy systems.
configuration
is a Config
instance used to determine the host
in the netrc file, and also passed to the new_from_key
method to
create the API instance using the identity.
def new_from_netrc(netrc_file=None, configuration=None):
"""
Create a `conjur.API` instance using an identity loaded from netrc. This method
uses the identity stored for the host `config.authn_url`.
`netrc_file` is an alternative path to the netrc formatted file. Defaults
to ~/.netrc on unixy systems.
`configuration` is a `conjur.Config` instance used to determine the host
in the netrc file, and also passed to the `conjur.new_from_key` method to
create the API instance using the identity.
"""
import netrc
configuration = _config(configuration)
auth = netrc.netrc(netrc_file).authenticators(configuration.authn_url)
if auth is None:
raise ValueError("No authenticators found for authn_url '%s' in %s" % (
configuration.authn_url,
(netrc_file or '~/.netrc')
))
login, _, api_key = auth
return new_from_key(login, api_key, configuration)
def new_from_token(
token, configuration=None)
Create a API
instance that will authenticate using the given signed Conjur
token.
This is useful if you want to act on behalf of a the identity of an HTTP request containing a user's signed token.
token
is the json formatted, not base64'd, Conjur authentication Token.
configuration
is a conjur.Config instance for the api. If not given, the global Config
instance (config
) will be used.
def new_from_token(token, configuration=None):
"""
Create a `conjur.API` instance that will authenticate using the given signed Conjur
token.
This is useful if you want to act on behalf of a the identity of an
HTTP request containing a user's signed token.
`token` is the json formatted, *not* base64'd, Conjur authentication Token.
`configuration` is a conjur.Config instance for the api. If not given, the global Config
instance (`conjur.config`) will be used.
"""
return API(token=token, config=_config(configuration))
Classes
class API
class API(object):
def __init__(self, credentials=None, token=None, config=None):
"""
Creates an API instance configured with the given credentials or token
and config.
Generally you should use `conjur.new_from_key`, `conjur.new_from_netrc`,
or `conjur.new_from_token` to get an API instance instead of calling
this constructor directly.
`credentials` should be a `(login,api_key)` tuple if present.
`token` should be a string containing a Conjur JSON token to use when authenticating.
`config` should be a `conjur.Config` instance, and defaults to `conjur.config`.
"""
if credentials:
self.login, self.api_key = credentials
self.token = None
elif token:
self.token = token
self.login = self.api_key = None
else:
raise TypeError("must be given a credentials or token argument")
if config:
self.config = config
else:
from conjur.config import config as default_config
self.config = default_config
def authenticate(self, cached=True):
"""
Authenticate with Conjur and return a token (str) that can be used
to establish identity to Conjur services.
Returns the json formatted signed Conjur authentication Token.
It is an error to call this method if the API was created with a token
rather than a login and api key.
When `cached` is True, a cached token value will be used if it is
available, otherwise the token will be fetched whether or not a
cached value is present.
"""
if cached and self.token:
return self.token
if not self.login or not self.api_key:
raise ConjurException(
"API created without credentials can't authenticate")
url = "%s/users/%s/authenticate" % (self.config.authn_url,
urlescape(self.login))
self.token = self._request('post', url, self.api_key).text
return self.token
def auth_header(self):
"""
Get the value of an Authorization header to make Conjur requests,
performing authentication if necessary.
Returns a string suitable for use as an `Authorization` header value.
"""
token = self.authenticate()
enc = base64.b64encode(token)
return 'Token token="%s"' % enc
def request(self, method, url, **kwargs):
"""
Make an authenticated request with the given method and url.
Additional arguments are passed to requests..
Returns a `requests.Response` object.
If the response status is not 2xx, raises a `conjur.ConjurException`.
`method` is of the standard HTTP verbs (case insensitive).
`url` is the full url to request.
If `kwargs['check_errors']` is `True`, non-2xx responses will raise a `conjur.ConjurException`.
Otherwise, it is the callers responsibility to check the status of the returned `requests.Response`.
Additional are passed through to the requests. call after adding an `Authorization`
header and HTTPS verification settings.
"""
headers = kwargs.setdefault('headers', {})
headers['Authorization'] = self.auth_header()
return self._request(method, url, **kwargs)
def _request(self, method, url, *args, **kwargs):
if 'verify' not in kwargs:
kwargs['verify'] = self.config.verify
check_errors = kwargs.pop('check_errors', True)
response = getattr(requests, method.lower())(url, *args, **kwargs)
if check_errors and response.status_code >= 300:
raise ConjurException("Request failed: %d" % response.status_code)
return response
def get(self, url, **kwargs):
"""
**NOTE** You will generally not need to use this method directly.
Makes an authenticated GET request to the given :url:.
Returns a requests.Response object.
If the response status is not 2xx, raises a ConjurException.
`url` is the full url to request.
Keyword arguments are passed through to `requests.get`.
"""
return self.request('get', url, **kwargs)
def post(self, url, **kwargs):
"""
**NOTE** You will generally not need to use this method directly.
Makes an authenticated `POST` request to the given `url`
Returns a `requests.Response` object.
If the response status is not 2xx, raises a `conjur.ConjurException`.
`url` is the full url to request.
Keyword arguments are passed through to `requests.post`.
"""
return self.request('post', url, **kwargs)
def put(self,url, **kwargs):
"""
**NOTE** You will generally not need to use this method directly.
Makes an authenticated `PUT` request to the given `url`
Returns a `requests.Response` object.
If the response status is not 2xx, raises a `conjur.ConjurException`.
`url` is the full url to request.
Keyword arguments are passed through to `requests.put`.
"""
return self.request('put', url, **kwargs)
def delete(self, url, **kwargs):
"""
**NOTE** You will generally not need to use this method directly.
Makes an authenticated `DELETE` request to the given `url`
Returns a `requests.Response` object.
If the response status is not 2xx, raises a `conjur.ConjurException`.
`url` is the full url to request.
Keyword arguments are passed through to `requests.delete`.
"""
return self.request('delete', url, **kwargs)
def role(self, kind, identifier):
"""
Return a `conjur.Role` object with the given kind and id.
This method neither creates nor checks for the roles's existence.
`kind` should be the role kind - for example, `"group"`, `"user"`,
or `"host"`.
`identifier` should be the *unqualified* Conjur id. For example, to
get the role for a user named bub, you would call `api.role('user', 'bub')`.
"""
return Role(self, kind, identifier)
def resource(self, kind, identifier):
"""
Return a `conjur.Resource` object with the given kind and id.
This method neither creates nor checks for the resource's existence.
`kind` should be the resource kind - for example, `"variable"`, `"webservice"`,
or `"configuration"`.
`identifier` should be the *unqualified* Conjur id. For example, to
get the resource for a user variable named db_password, you would call
`api.resource('variable', 'db_password')`.
"""
return Resource(self, kind, identifier)
def group(self, id):
"""
Return a `conjur.Group` object with the given id.
This method neither creates nor checks for the groups's existence.
`id` is the *unqualified* id of the group, and does not include the account or kind.
"""
return Group(self, id)
def create_group(self, id):
"""
Creates a Conjur Group and returns a `conjur.Group` object representing it.
`id` is the identifier of the group to create.
"""
self.post('{0}/groups'.format(self.config.core_url), data={'id': id})
return Group(self, id)
def variable(self, id):
"""
Return a `conjur.Variable` object with the given `id`.
This method neither creates nor checks for the variable's existence.
"""
return Variable(self, id)
def create_variable(self, id=None, mime_type='text/plain', kind='secret',
value=None):
"""
Creates a Conjur variable.
Returns a `conjur.Variable` object.
`id` is an identifier for the new variable. If not given, a unique id will
be generated.
`mime_type` is a string like `text/plain` indicating the content type stored by the
variable. This determines the Content-Type header of responses returning the variable's value.
`kind` is a string indicating a user defined role for the variable.
Ignored by Conjur, but useful for making a variable's
purpose.
`value` is a string assigning an initial value for the variable.
"""
data = {'mime_type': mime_type, 'kind': kind}
if id is not None:
data['id'] = id
if value is not None:
data['value'] = value
attrs = self.post("%s/variables" % self.config.core_url, data=data).json()
id = id or attrs['id']
return Variable(self, id, attrs)
def layer(self, layer_id):
"""
Return a `conjur.Layer` object with the given `layer_id`.
This method neither creates nor checks for the layer's existence.
"""
return Layer(self, layer_id)
def host(self, host_id):
"""
Return a `conjur.Host` object with the given `host_id`.
This method neither creates nor checks for the host's existence.
"""
return Host(self, host_id)
def create_host(self, host_id):
"""
Creates a Conjur Host and returns a `conjur.Host` object that represents it.
`host_id` is the id of the Host to be created. The `conjur.Host` object returned by
this method will have an `api_key` attribute, but when the Host is fetched in the future this attribute
is not available.
"""
attrs = self.post("{0}/hosts".format(self.config.core_url),
data={'id': host_id}).json()
return Host(self, host_id, attrs)
def user(self, login):
"""
Returns an object representing a Conjur user with the given login.
The user is *not* created by this method, and may in fact not exist.
"""
return User(self, login)
def create_user(self, login, password=None):
"""
Create a Conjur user with the given `login` and password, and returns a `conjur.User` object
representing it.
If `password` is not given, the user will only be able to authenticate using the generated api_key
attribute of the returned User instance. Note that this `api_key` will not be available when the User
is fetched in the future.
"""
data = {'login': login}
if password is not None:
data['password'] = password
url = "{0}/users".format(self.config.core_url)
return User(self, login, self.post(url, data=data).json())
def _public_key_url(self, *args):
return '/'.join([self.config.pubkeys_url] +
[urlescape(arg) for arg in args])
def add_public_key(self, username, key):
"""
Upload an openssh formatted public key to be made available for the user
given by `username`.
The key should be formatted like `ssh-rsa bob@example.com`.
"""
self.post(self._public_key_url(username), data=key)
def remove_public_key(self, username, keyname):
"""
Remove a specific public key for the user identified by `username`.
The `keyname` argument refers to the name field in the openssh formatted key
to be deleted.
For example, if they key contents are `ssh-rsa bob@example.com`,
the `keyname` should be `bob@example.com`
"""
self.delete(self._public_key_url(username, keyname))
def remove_public_keys(self, username):
"""
Remove all public keys for the user represented by `username`.
"""
for keyname in self.public_key_names(username):
self.remove_public_key(username, keyname)
def public_keys(self, username):
"""
Returns all keys for the user given by `username`, as a newline delimited string.
The odd format is chosen to support the Conjur SSH login implementation.
"""
return self.get(self._public_key_url(username)).text
def public_key(self, username, keyname):
"""
Return the contents of a specific public key given by `keyname`,
for the user given by `username` as a string.
The name of the key is based on the name entry of the openssh formatted key that was uploaded.
For example, if they key contents are `ssh-rsa bob@example.com`,
the `keyname` should be `bob@example.com`
"""
return self.get(self._public_key_url(username, keyname)).text
def public_key_names(self, username):
"""
Return the names of public keys for the user given by `username`.
The names of the keys are based on the name entry of the openssh formatted key that was uploaded.
For example, if they key contents are `ssh-rsa bob@example.com`,
the `keyname` should be `bob@example.com`
"""
return [k.split(' ')[-1] for k in self.public_keys(username).split('\n')]
Ancestors (in MRO)
Methods
def __init__(
self, credentials=None, token=None, config=None)
Creates an API instance configured with the given credentials or token and config.
Generally you should use new_from_key
, new_from_netrc
,
or new_from_token
to get an API instance instead of calling
this constructor directly.
credentials
should be a (login,api_key)
tuple if present.
token
should be a string containing a Conjur JSON token to use when authenticating.
def __init__(self, credentials=None, token=None, config=None):
"""
Creates an API instance configured with the given credentials or token
and config.
Generally you should use `conjur.new_from_key`, `conjur.new_from_netrc`,
or `conjur.new_from_token` to get an API instance instead of calling
this constructor directly.
`credentials` should be a `(login,api_key)` tuple if present.
`token` should be a string containing a Conjur JSON token to use when authenticating.
`config` should be a `conjur.Config` instance, and defaults to `conjur.config`.
"""
if credentials:
self.login, self.api_key = credentials
self.token = None
elif token:
self.token = token
self.login = self.api_key = None
else:
raise TypeError("must be given a credentials or token argument")
if config:
self.config = config
else:
from conjur.config import config as default_config
self.config = default_config
def add_public_key(
self, username, key)
Upload an openssh formatted public key to be made available for the user
given by username
.
The key should be formatted like ssh-rsa <data...> bob@example.com
.
def add_public_key(self, username, key):
"""
Upload an openssh formatted public key to be made available for the user
given by `username`.
The key should be formatted like `ssh-rsa bob@example.com`.
"""
self.post(self._public_key_url(username), data=key)
def auth_header(
self)
Get the value of an Authorization header to make Conjur requests, performing authentication if necessary.
Returns a string suitable for use as an Authorization
header value.
def auth_header(self):
"""
Get the value of an Authorization header to make Conjur requests,
performing authentication if necessary.
Returns a string suitable for use as an `Authorization` header value.
"""
token = self.authenticate()
enc = base64.b64encode(token)
return 'Token token="%s"' % enc
def authenticate(
self, cached=True)
Authenticate with Conjur and return a token (str) that can be used to establish identity to Conjur services.
Returns the json formatted signed Conjur authentication Token.
It is an error to call this method if the API was created with a token rather than a login and api key.
When cached
is True, a cached token value will be used if it is
available, otherwise the token will be fetched whether or not a
cached value is present.
def authenticate(self, cached=True):
"""
Authenticate with Conjur and return a token (str) that can be used
to establish identity to Conjur services.
Returns the json formatted signed Conjur authentication Token.
It is an error to call this method if the API was created with a token
rather than a login and api key.
When `cached` is True, a cached token value will be used if it is
available, otherwise the token will be fetched whether or not a
cached value is present.
"""
if cached and self.token:
return self.token
if not self.login or not self.api_key:
raise ConjurException(
"API created without credentials can't authenticate")
url = "%s/users/%s/authenticate" % (self.config.authn_url,
urlescape(self.login))
self.token = self._request('post', url, self.api_key).text
return self.token
def create_group(
self, id)
Creates a Conjur Group and returns a Group
object representing it.
id
is the identifier of the group to create.
def create_group(self, id):
"""
Creates a Conjur Group and returns a `conjur.Group` object representing it.
`id` is the identifier of the group to create.
"""
self.post('{0}/groups'.format(self.config.core_url), data={'id': id})
return Group(self, id)
def create_host(
self, host_id)
Creates a Conjur Host and returns a Host
object that represents it.
host_id
is the id of the Host to be created. The Host
object returned by
this method will have an api_key
attribute, but when the Host is fetched in the future this attribute
is not available.
def create_host(self, host_id):
"""
Creates a Conjur Host and returns a `conjur.Host` object that represents it.
`host_id` is the id of the Host to be created. The `conjur.Host` object returned by
this method will have an `api_key` attribute, but when the Host is fetched in the future this attribute
is not available.
"""
attrs = self.post("{0}/hosts".format(self.config.core_url),
data={'id': host_id}).json()
return Host(self, host_id, attrs)
def create_user(
self, login, password=None)
Create a Conjur user with the given login
and password, and returns a User
object
representing it.
If password
is not given, the user will only be able to authenticate using the generated api_key
attribute of the returned User instance. Note that this api_key
will not be available when the User
is fetched in the future.
def create_user(self, login, password=None):
"""
Create a Conjur user with the given `login` and password, and returns a `conjur.User` object
representing it.
If `password` is not given, the user will only be able to authenticate using the generated api_key
attribute of the returned User instance. Note that this `api_key` will not be available when the User
is fetched in the future.
"""
data = {'login': login}
if password is not None:
data['password'] = password
url = "{0}/users".format(self.config.core_url)
return User(self, login, self.post(url, data=data).json())
def create_variable(
self, id=None, mime_type='text/plain', kind='secret', value=None)
Creates a Conjur variable.
Returns a Variable
object.
id
is an identifier for the new variable. If not given, a unique id will
be generated.
mime_type
is a string like text/plain
indicating the content type stored by the
variable. This determines the Content-Type header of responses returning the variable's value.
kind
is a string indicating a user defined role for the variable.
Ignored by Conjur, but useful for making a variable's
purpose.
value
is a string assigning an initial value for the variable.
def create_variable(self, id=None, mime_type='text/plain', kind='secret',
value=None):
"""
Creates a Conjur variable.
Returns a `conjur.Variable` object.
`id` is an identifier for the new variable. If not given, a unique id will
be generated.
`mime_type` is a string like `text/plain` indicating the content type stored by the
variable. This determines the Content-Type header of responses returning the variable's value.
`kind` is a string indicating a user defined role for the variable.
Ignored by Conjur, but useful for making a variable's
purpose.
`value` is a string assigning an initial value for the variable.
"""
data = {'mime_type': mime_type, 'kind': kind}
if id is not None:
data['id'] = id
if value is not None:
data['value'] = value
attrs = self.post("%s/variables" % self.config.core_url, data=data).json()
id = id or attrs['id']
return Variable(self, id, attrs)
def delete(
self, url, **kwargs)
NOTE You will generally not need to use this method directly.
Makes an authenticated DELETE
request to the given url
Returns a requests.Response
object.
If the response status is not 2xx, raises a ConjurException
.
url
is the full url to request.
Keyword arguments are passed through to requests.delete
.
def delete(self, url, **kwargs):
"""
**NOTE** You will generally not need to use this method directly.
Makes an authenticated `DELETE` request to the given `url`
Returns a `requests.Response` object.
If the response status is not 2xx, raises a `conjur.ConjurException`.
`url` is the full url to request.
Keyword arguments are passed through to `requests.delete`.
"""
return self.request('delete', url, **kwargs)
def get(
self, url, **kwargs)
NOTE You will generally not need to use this method directly.
Makes an authenticated GET request to the given :url:.
Returns a requests.Response object.
If the response status is not 2xx, raises a ConjurException.
url
is the full url to request.
Keyword arguments are passed through to requests.get
.
def get(self, url, **kwargs):
"""
**NOTE** You will generally not need to use this method directly.
Makes an authenticated GET request to the given :url:.
Returns a requests.Response object.
If the response status is not 2xx, raises a ConjurException.
`url` is the full url to request.
Keyword arguments are passed through to `requests.get`.
"""
return self.request('get', url, **kwargs)
def group(
self, id)
Return a Group
object with the given id.
This method neither creates nor checks for the groups's existence.
id
is the unqualified id of the group, and does not include the account or kind.
def group(self, id):
"""
Return a `conjur.Group` object with the given id.
This method neither creates nor checks for the groups's existence.
`id` is the *unqualified* id of the group, and does not include the account or kind.
"""
return Group(self, id)
def host(
self, host_id)
Return a Host
object with the given host_id
.
This method neither creates nor checks for the host's existence.
def host(self, host_id):
"""
Return a `conjur.Host` object with the given `host_id`.
This method neither creates nor checks for the host's existence.
"""
return Host(self, host_id)
def layer(
self, layer_id)
Return a Layer
object with the given layer_id
.
This method neither creates nor checks for the layer's existence.
def layer(self, layer_id):
"""
Return a `conjur.Layer` object with the given `layer_id`.
This method neither creates nor checks for the layer's existence.
"""
return Layer(self, layer_id)
def post(
self, url, **kwargs)
NOTE You will generally not need to use this method directly.
Makes an authenticated POST
request to the given url
Returns a requests.Response
object.
If the response status is not 2xx, raises a ConjurException
.
url
is the full url to request.
Keyword arguments are passed through to requests.post
.
def post(self, url, **kwargs):
"""
**NOTE** You will generally not need to use this method directly.
Makes an authenticated `POST` request to the given `url`
Returns a `requests.Response` object.
If the response status is not 2xx, raises a `conjur.ConjurException`.
`url` is the full url to request.
Keyword arguments are passed through to `requests.post`.
"""
return self.request('post', url, **kwargs)
def public_key(
self, username, keyname)
Return the contents of a specific public key given by keyname
,
for the user given by username
as a string.
The name of the key is based on the name entry of the openssh formatted key that was uploaded.
For example, if they key contents are ssh-rsa <data...> bob@example.com
,
the keyname
should be bob@example.com
def public_key(self, username, keyname):
"""
Return the contents of a specific public key given by `keyname`,
for the user given by `username` as a string.
The name of the key is based on the name entry of the openssh formatted key that was uploaded.
For example, if they key contents are `ssh-rsa bob@example.com`,
the `keyname` should be `bob@example.com`
"""
return self.get(self._public_key_url(username, keyname)).text
def public_key_names(
self, username)
Return the names of public keys for the user given by username
.
The names of the keys are based on the name entry of the openssh formatted key that was uploaded.
For example, if they key contents are ssh-rsa <data...> bob@example.com
,
the keyname
should be bob@example.com
def public_key_names(self, username):
"""
Return the names of public keys for the user given by `username`.
The names of the keys are based on the name entry of the openssh formatted key that was uploaded.
For example, if they key contents are `ssh-rsa bob@example.com`,
the `keyname` should be `bob@example.com`
"""
return [k.split(' ')[-1] for k in self.public_keys(username).split('\n')]
def public_keys(
self, username)
Returns all keys for the user given by username
, as a newline delimited string.
The odd format is chosen to support the Conjur SSH login implementation.
def public_keys(self, username):
"""
Returns all keys for the user given by `username`, as a newline delimited string.
The odd format is chosen to support the Conjur SSH login implementation.
"""
return self.get(self._public_key_url(username)).text
def put(
self, url, **kwargs)
NOTE You will generally not need to use this method directly.
Makes an authenticated PUT
request to the given url
Returns a requests.Response
object.
If the response status is not 2xx, raises a ConjurException
.
url
is the full url to request.
Keyword arguments are passed through to requests.put
.
def put(self,url, **kwargs):
"""
**NOTE** You will generally not need to use this method directly.
Makes an authenticated `PUT` request to the given `url`
Returns a `requests.Response` object.
If the response status is not 2xx, raises a `conjur.ConjurException`.
`url` is the full url to request.
Keyword arguments are passed through to `requests.put`.
"""
return self.request('put', url, **kwargs)
def remove_public_key(
self, username, keyname)
Remove a specific public key for the user identified by username
.
The keyname
argument refers to the name field in the openssh formatted key
to be deleted.
For example, if they key contents are ssh-rsa <data...> bob@example.com
,
the keyname
should be bob@example.com
def remove_public_key(self, username, keyname):
"""
Remove a specific public key for the user identified by `username`.
The `keyname` argument refers to the name field in the openssh formatted key
to be deleted.
For example, if they key contents are `ssh-rsa bob@example.com`,
the `keyname` should be `bob@example.com`
"""
self.delete(self._public_key_url(username, keyname))
def remove_public_keys(
self, username)
Remove all public keys for the user represented by username
.
def remove_public_keys(self, username):
"""
Remove all public keys for the user represented by `username`.
"""
for keyname in self.public_key_names(username):
self.remove_public_key(username, keyname)
def request(
self, method, url, **kwargs)
Make an authenticated request with the given method and url.
Additional arguments are passed to requests.
Returns a requests.Response
object.
If the response status is not 2xx, raises a ConjurException
.
method
is of the standard HTTP verbs (case insensitive).
url
is the full url to request.
If kwargs['check_errors']
is True
, non-2xx responses will raise a ConjurException
.
Otherwise, it is the callers responsibility to check the status of the returned requests.Response
.
Additional are passed through to the requests.Authorization
header and HTTPS verification settings.
def request(self, method, url, **kwargs):
"""
Make an authenticated request with the given method and url.
Additional arguments are passed to requests..
Returns a `requests.Response` object.
If the response status is not 2xx, raises a `conjur.ConjurException`.
`method` is of the standard HTTP verbs (case insensitive).
`url` is the full url to request.
If `kwargs['check_errors']` is `True`, non-2xx responses will raise a `conjur.ConjurException`.
Otherwise, it is the callers responsibility to check the status of the returned `requests.Response`.
Additional are passed through to the requests. call after adding an `Authorization`
header and HTTPS verification settings.
"""
headers = kwargs.setdefault('headers', {})
headers['Authorization'] = self.auth_header()
return self._request(method, url, **kwargs)
def resource(
self, kind, identifier)
Return a Resource
object with the given kind and id.
This method neither creates nor checks for the resource's existence.
kind
should be the resource kind - for example, "variable"
, "webservice"
,
or "configuration"
.
identifier
should be the unqualified Conjur id. For example, to
get the resource for a user variable named db_password, you would call
api.resource('variable', 'db_password')
.
def resource(self, kind, identifier):
"""
Return a `conjur.Resource` object with the given kind and id.
This method neither creates nor checks for the resource's existence.
`kind` should be the resource kind - for example, `"variable"`, `"webservice"`,
or `"configuration"`.
`identifier` should be the *unqualified* Conjur id. For example, to
get the resource for a user variable named db_password, you would call
`api.resource('variable', 'db_password')`.
"""
return Resource(self, kind, identifier)
def role(
self, kind, identifier)
Return a Role
object with the given kind and id.
This method neither creates nor checks for the roles's existence.
kind
should be the role kind - for example, "group"
, "user"
,
or "host"
.
identifier
should be the unqualified Conjur id. For example, to
get the role for a user named bub, you would call api.role('user', 'bub')
.
def role(self, kind, identifier):
"""
Return a `conjur.Role` object with the given kind and id.
This method neither creates nor checks for the roles's existence.
`kind` should be the role kind - for example, `"group"`, `"user"`,
or `"host"`.
`identifier` should be the *unqualified* Conjur id. For example, to
get the role for a user named bub, you would call `api.role('user', 'bub')`.
"""
return Role(self, kind, identifier)
def user(
self, login)
Returns an object representing a Conjur user with the given login.
The user is not created by this method, and may in fact not exist.
def user(self, login):
"""
Returns an object representing a Conjur user with the given login.
The user is *not* created by this method, and may in fact not exist.
"""
return User(self, login)
def variable(
self, id)
Return a Variable
object with the given id
.
This method neither creates nor checks for the variable's existence.
def variable(self, id):
"""
Return a `conjur.Variable` object with the given `id`.
This method neither creates nor checks for the variable's existence.
"""
return Variable(self, id)
class Config
class Config(object):
def __init__(self, **kwargs):
self._config = {}
self.update(kwargs)
def load(self, input):
import yaml
if isinstance(input, str):
input = open(input, 'r')
conf = yaml.safe_load(input)
self.update(conf)
def update(self, *dicts, **kwargs):
for d in dicts + (kwargs, ):
self._config.update(d)
def service_url(self, service, per_account=True):
key = '%s_url' % service
if key in self._config:
return self._config[key]
if self.appliance_url is not None:
url_parts = [self.appliance_url]
if service != "core":
url_parts.append(service)
return "/".join(url_parts)
else:
raise ConfigException('Missing appliance_url')
def get(self, key, default=_DEFAULT):
if key in self._config:
return self._config[key]
env_key = 'CONJUR_' + key.upper()
if env_key in os.environ:
value = os.environ[env_key]
self._config[key] = value
return value
if default is _DEFAULT:
raise Exception("config setting %s is required" % key)
return default
def set(self, key, value):
self._config[key] = value
authn_url = _service_url('authn', doc='URL for the authn service')
core_url = _service_url('core', doc='URL for the core service')
authz_url = _service_url('authz',
per_account=False,
doc='URL for the authz service')
pubkeys_url = _service_url('pubkeys', doc='URL for the pubkeys service')
cert_file = _setting('cert_file', None,
"Path to certificate to verify ssl requests \
to appliance")
account = _setting('account', 'conjur', 'Conjur account identifier')
appliance_url = _setting('appliance_url', None, 'URL for Conjur appliance')
@property
def verify(self):
'''
Argument to be passed to `requests` methods `verify` keyword argument.
'''
if self.cert_file is not None:
return self.cert_file
else:
return True
Ancestors (in MRO)
Class variables
var account
var appliance_url
var authn_url
var authz_url
var cert_file
var core_url
var pubkeys_url
Instance variables
var account
Conjur account identifier
var appliance_url
URL for Conjur appliance
var authn_url
URL for the authn service
var authz_url
URL for the authz service
var cert_file
Path to certificate to verify ssl requests to appliance
var core_url
URL for the core service
var pubkeys_url
URL for the pubkeys service
var verify
Argument to be passed to requests
methods verify
keyword argument.
Methods
def __init__(
self, **kwargs)
def __init__(self, **kwargs):
self._config = {}
self.update(kwargs)
def get(
self, key, default=<object object at 0x7f42b7b900e0>)
def get(self, key, default=_DEFAULT):
if key in self._config:
return self._config[key]
env_key = 'CONJUR_' + key.upper()
if env_key in os.environ:
value = os.environ[env_key]
self._config[key] = value
return value
if default is _DEFAULT:
raise Exception("config setting %s is required" % key)
return default
def load(
self, input)
def load(self, input):
import yaml
if isinstance(input, str):
input = open(input, 'r')
conf = yaml.safe_load(input)
self.update(conf)
def service_url(
self, service, per_account=True)
def service_url(self, service, per_account=True):
key = '%s_url' % service
if key in self._config:
return self._config[key]
if self.appliance_url is not None:
url_parts = [self.appliance_url]
if service != "core":
url_parts.append(service)
return "/".join(url_parts)
else:
raise ConfigException('Missing appliance_url')
def set(
self, key, value)
def set(self, key, value):
self._config[key] = value
def update(
self, *dicts, **kwargs)
def update(self, *dicts, **kwargs):
for d in dicts + (kwargs, ):
self._config.update(d)
class ConjurException
class ConjurException(Exception):
pass
Ancestors (in MRO)
Class variables
var args
var message
class Group
Represents a Conjur group.
Generally you won't create instances of this class, but use the conjur.API.group(id)
method.
A group is a role that contains other roles, typically users and other groups. A Group
object can list its members with the members
method, and also manage them with the add_member
and remove_member
methods.
class Group(object):
"""
Represents a Conjur [group](https://developer.conjur.net/reference/services/directory/group).
Generally you won't create instances of this class, but use the `conjur.API.group(id)` method.
A group is a role that contains other roles, typically users and other groups. A `conjur.Group`
object can list its members with the `members` method, and also manage them with the `add_member`
and `remove_member` methods.
"""
def __init__(self, api, id):
self.api = api
"""
Instance of `conjur.API` used to implement Conjur operations.
"""
self.id = id
"""
Identifier (unqualified) of the group.
"""
self.role = api.role('group', id)
"""
Represents the `conjur.Role` associated with this group.
"""
def members(self):
"""
Return a list of members of this group. Members are returned as `dict`s
with the following keys:
* `'member'` the fully qualified identifier of the group
* `'role'` the fully qualified identifier of the group (redundant)
* `'grantor'` the role that granted the membership
* `'admin_option'` whether this member can grant membership in the group to other roles.
Example: print member ids (fully qualified) and whether they are admins of the group.
>>> group = api.group('security_admin')
>>> for member in group.members():
... print('{} is a member of security_admin ({} admin option)'.format(
... member['member'],
... 'with' if member['admin_option'] else 'without'
... ))
"""
return self.role.members()
def add_member(self, member, admin=False):
"""
Add a member to this group.
`member` is the member we want to add to the group, and should be a qualified Conjur id,
or an object with a `role` attribute or a `roleid` method. Examples of such objects
include `conjur.User`, `conjur.Role`, and `conjur.Group`.
If `admin` is True, the member will be allowed to add other members to this group.
"""
self.role.grant_to(member, admin)
def remove_member(self, member):
"""
Remove a member from the group.
`member` is the member to remove, and should be a qualified Conjur id,
or an object with a `role` attribute or a `roleid` method. Examples of such objects
include `conjur.User`, `conjur.Role`, and `conjur.Group`.
"""
self.role.revoke_from(member)
Ancestors (in MRO)
Instance variables
var id
Identifier (unqualified) of the group.
Methods
def __init__(
self, api, id)
def __init__(self, api, id):
self.api = api
"""
Instance of `conjur.API` used to implement Conjur operations.
"""
self.id = id
"""
Identifier (unqualified) of the group.
"""
self.role = api.role('group', id)
"""
Represents the `conjur.Role` associated with this group.
"""
def add_member(
self, member, admin=False)
Add a member to this group.
member
is the member we want to add to the group, and should be a qualified Conjur id,
or an object with a role
attribute or a roleid
method. Examples of such objects
include User
, Role
, and Group
.
If admin
is True, the member will be allowed to add other members to this group.
def add_member(self, member, admin=False):
"""
Add a member to this group.
`member` is the member we want to add to the group, and should be a qualified Conjur id,
or an object with a `role` attribute or a `roleid` method. Examples of such objects
include `conjur.User`, `conjur.Role`, and `conjur.Group`.
If `admin` is True, the member will be allowed to add other members to this group.
"""
self.role.grant_to(member, admin)
def members(
self)
Return a list of members of this group. Members are returned as dict
s
with the following keys:
'member'
the fully qualified identifier of the group'role'
the fully qualified identifier of the group (redundant)'grantor'
the role that granted the membership'admin_option'
whether this member can grant membership in the group to other roles.
Example: print member ids (fully qualified) and whether they are admins of the group. >>> group = api.group('security_admin') >>> for member in group.members(): ... print('{} is a member of security_admin ({} admin option)'.format( ... member['member'], ... 'with' if member['admin_option'] else 'without' ... ))
def members(self):
"""
Return a list of members of this group. Members are returned as `dict`s
with the following keys:
* `'member'` the fully qualified identifier of the group
* `'role'` the fully qualified identifier of the group (redundant)
* `'grantor'` the role that granted the membership
* `'admin_option'` whether this member can grant membership in the group to other roles.
Example: print member ids (fully qualified) and whether they are admins of the group.
>>> group = api.group('security_admin')
>>> for member in group.members():
... print('{} is a member of security_admin ({} admin option)'.format(
... member['member'],
... 'with' if member['admin_option'] else 'without'
... ))
"""
return self.role.members()
def remove_member(
self, member)
Remove a member from the group.
member
is the member to remove, and should be a qualified Conjur id,
or an object with a role
attribute or a roleid
method. Examples of such objects
include User
, Role
, and Group
.
def remove_member(self, member):
"""
Remove a member from the group.
`member` is the member to remove, and should be a qualified Conjur id,
or an object with a `role` attribute or a `roleid` method. Examples of such objects
include `conjur.User`, `conjur.Role`, and `conjur.Group`.
"""
self.role.revoke_from(member)
class Host
A Conjur Host
is a role corresponding to a machine or machine identity.
The Host
class provides the ability to check for existence and read attributes of the
host.
Attributes (such as the ownerid
) are fetched lazily.
Newly created hosts, as returned by create_host
, have an api_key
attribute,
but existing hosts retrieved with host
or the constructor of this class do not
have one.
Example:
>>> # Create a host and save it's api key to a file.
>>> host = api.create_host('jenkins')
>>> api_key = host.api_key
>>> with open('/etc/conjur.identity') as f:
... f.write(api_key)
Example:
>>> # See if a host named `jenkins` exists:
>>> if api.host('jenkins').exists():
... print("Host 'jenkins' exists")
... else:
... print("Host 'jenkins' does not exist")
class Host(object):
"""
A Conjur `Host` is a role corresponding to a machine or machine identity.
The `Host` class provides the ability to check for existence and read attributes of the
host.
Attributes (such as the `ownerid`) are fetched lazily.
Newly created hosts, as returned by `conjur.API.create_host`, have an `api_key` attribute,
but existing hosts retrieved with `conjur.API.host` or the constructor of this class *do not*
have one.
Example:
>>> # Create a host and save it's api key to a file.
>>> host = api.create_host('jenkins')
>>> api_key = host.api_key
>>> with open('/etc/conjur.identity') as f:
... f.write(api_key)
Example:
>>> # See if a host named `jenkins` exists:
>>> if api.host('jenkins').exists():
... print("Host 'jenkins' exists")
... else:
... print("Host 'jenkins' does not exist")
"""
def __init__(self, api, id, attrs=None):
self.api = api
self.id = id
self._attrs = attrs
self.role = self.api.role('host', self.id)
def exists(self):
"""
Return `True` if this host exists.
"""
status = self.api.get(self._url(), check_errors=False).status_code
if status == 200:
return True
if status == 404:
return False
raise ConjurException("Request Failed: {0}".format(status))
def _fetch(self):
self._attrs = self.api.get(self._url()).json()
def _url(self):
return "{0}/hosts/{1}".format(self.api.config.core_url,
urlescape(self.id))
def __getattr__(self, item):
if self._attrs is None:
self._fetch()
try:
return self._attrs[item]
except KeyError:
raise AttributeError(item)
Ancestors (in MRO)
Instance variables
var api
var id
var role
Methods
def __init__(
self, api, id, attrs=None)
def __init__(self, api, id, attrs=None):
self.api = api
self.id = id
self._attrs = attrs
self.role = self.api.role('host', self.id)
def exists(
self)
Return True
if this host exists.
def exists(self):
"""
Return `True` if this host exists.
"""
status = self.api.get(self._url(), check_errors=False).status_code
if status == 200:
return True
if status == 404:
return False
raise ConjurException("Request Failed: {0}".format(status))
class Layer
class Layer(object):
def __init__(self, api, id, attrs=None):
self.api = api
self.id = id
self._attrs = {} if attrs is None else attrs
def add_host(self, host):
hostid = authzid(host, 'role', with_account=False)
self.api.post(self._hosts_url(), data={'hostid': hostid})
def remove_host(self, host):
hostid = authzid(host, 'role')
self.api.delete(self._host_url(hostid))
def exists(self):
resp = self.api.get(self._url(), check_errors=False)
if resp.status_code == 200:
return True
if resp.status_code == 404:
return False
raise ConjurException("Request Failed: {0}".format(resp.status_code))
def _url(self):
return "{0}/layers/{1}".format(self.api.config.core_url,
urlescape(self.id))
def _hosts_url(self):
return "{0}/hosts".format(self._url())
def _host_url(self, host_id):
return "{0}/{1}".format(self._hosts_url(), urlescape(host_id))
def _fetch(self):
self._attrs = self.api.get(self._url()).json()
def __getattr__(self, item):
if self._attrs is None:
self._fetch()
try:
return self._attrs[item]
except KeyError:
raise AttributeError(item)
Ancestors (in MRO)
Instance variables
var api
var id
Methods
def __init__(
self, api, id, attrs=None)
def __init__(self, api, id, attrs=None):
self.api = api
self.id = id
self._attrs = {} if attrs is None else attrs
def add_host(
self, host)
def add_host(self, host):
hostid = authzid(host, 'role', with_account=False)
self.api.post(self._hosts_url(), data={'hostid': hostid})
def exists(
self)
def exists(self):
resp = self.api.get(self._url(), check_errors=False)
if resp.status_code == 200:
return True
if resp.status_code == 404:
return False
raise ConjurException("Request Failed: {0}".format(resp.status_code))
def remove_host(
self, host)
def remove_host(self, host):
hostid = authzid(host, 'role')
self.api.delete(self._host_url(hostid))
class Resource
A Resource
represents an object on which Role
s can
be permitted to perform certain actions.
Generally you will not construct these directly, but call the role
method
to do so.
class Resource(object):
"""
A `Resource` represents an object on which `Role`s can
be permitted to perform certain actions.
Generally you will not construct these directly, but call the `conjur.API.role` method
to do so.
"""
def __init__(self, api, kind, identifier):
self.api = api
self.kind = kind
self.identifier = identifier
@property
def resourceid(self):
"""
The fully qualified resource id as a string, like `'the-account:variable:db-password`.
"""
return ":".join([self.api.config.account, self.kind, self.identifier])
def permit(self, role, privilege, grant_option=False):
"""
Permit `role` to perform `privilege` on this resource.
`role` is a qualified conjur identifier (e.g. `'user:alice`') or an object
with a `role` attribute or `roleid` method, such as a `conjur.User` or
`conjur.Group`.
If `grant_option` is True, the role will be able to grant this
permission to other resources.
You must own the resource or have the permission with `grant_option`
to call this method.
"""
data = {}
params = {
'permit': 'true',
'privilege': privilege,
'role': authzid(role, 'role')
}
if grant_option:
data['grant_option'] = 'true'
self.api.post(self.url(), data=data, params=params)
def deny(self, role, privilege):
"""
Deny `role` permission to perform `privilege` on this resource.
You must own the resource or have the permission with `grant_option`
on it to call this method.
"""
params = {
'permit': 'true',
'privilege': privilege,
'role': authzid(role)
}
self.api.post(self.url(), params=params)
def permitted(self, privilege, role=None):
"""
Return True if `role` has `privilege` on this resource.
`role` may be a `conjur.Role` instance, an object with a `role` method,
or a qualified role id as a string.
If `role` is not given, check the permission for the currently
authenticated role.
Example: Check that the currently authenticated role is allowed to `'read'`
a resource.
>>> service = api.resource('service', 'gateway')
>>> if service.permitted('read'):
... print("I can read 'service:gateway'")
... else:
... print("I cannot read 'service:gateway'")
Example: Check whether members of group 'security_admin' can 'update'
a resource.
>>> service = api.resource('service', 'gateway')
>>> security_admin = api.group('security_admin')
>>> if service.permitted('update', security_admin):
... print("security_admin members can update service:gateway")
>>> else:
... print("security_admin members cannot update service:gateway")
"""
if role is None:
# Handle self role check
response = self.api.get(self.url(),
params={'check': 'true',
'privilege': privilege},
check_errors=False)
if response.status_code == 204:
return True
elif response.status_code in (404, 403):
return False
else:
raise ConjurException("Request failed: %d" % response.status_code)
else:
# Otherwise call role.is_permitted
return Role.from_roleid(self.api, role).is_permitted(self, privilege)
def url(self):
"""
Internal method to return a url for this object as a string.
"""
return "/".join([
self.api.config.authz_url,
self.api.config.account,
'resources',
self.kind,
self.identifier
])
Ancestors (in MRO)
Instance variables
var api
var identifier
var kind
var resourceid
The fully qualified resource id as a string, like 'the-account:variable:db-password
.
Methods
def __init__(
self, api, kind, identifier)
def __init__(self, api, kind, identifier):
self.api = api
self.kind = kind
self.identifier = identifier
def deny(
self, role, privilege)
Deny role
permission to perform privilege
on this resource.
You must own the resource or have the permission with grant_option
on it to call this method.
def deny(self, role, privilege):
"""
Deny `role` permission to perform `privilege` on this resource.
You must own the resource or have the permission with `grant_option`
on it to call this method.
"""
params = {
'permit': 'true',
'privilege': privilege,
'role': authzid(role)
}
self.api.post(self.url(), params=params)
def permit(
self, role, privilege, grant_option=False)
Permit role
to perform privilege
on this resource.
role
is a qualified conjur identifier (e.g. 'user:alice
') or an object
with a role
attribute or roleid
method, such as a User
or
Group
.
If grant_option
is True, the role will be able to grant this
permission to other resources.
You must own the resource or have the permission with grant_option
to call this method.
def permit(self, role, privilege, grant_option=False):
"""
Permit `role` to perform `privilege` on this resource.
`role` is a qualified conjur identifier (e.g. `'user:alice`') or an object
with a `role` attribute or `roleid` method, such as a `conjur.User` or
`conjur.Group`.
If `grant_option` is True, the role will be able to grant this
permission to other resources.
You must own the resource or have the permission with `grant_option`
to call this method.
"""
data = {}
params = {
'permit': 'true',
'privilege': privilege,
'role': authzid(role, 'role')
}
if grant_option:
data['grant_option'] = 'true'
self.api.post(self.url(), data=data, params=params)
def permitted(
self, privilege, role=None)
Return True if role
has privilege
on this resource.
role
may be a Role
instance, an object with a role
method,
or a qualified role id as a string.
If role
is not given, check the permission for the currently
authenticated role.
Example: Check that the currently authenticated role is allowed to 'read'
a resource.
>>> service = api.resource('service', 'gateway')
>>> if service.permitted('read'):
... print("I can read 'service:gateway'")
... else:
... print("I cannot read 'service:gateway'")
Example: Check whether members of group 'security_admin' can 'update' a resource.
>>> service = api.resource('service', 'gateway')
>>> security_admin = api.group('security_admin')
>>> if service.permitted('update', security_admin):
... print("security_admin members can update service:gateway")
>>> else:
... print("security_admin members cannot update service:gateway")
def permitted(self, privilege, role=None):
"""
Return True if `role` has `privilege` on this resource.
`role` may be a `conjur.Role` instance, an object with a `role` method,
or a qualified role id as a string.
If `role` is not given, check the permission for the currently
authenticated role.
Example: Check that the currently authenticated role is allowed to `'read'`
a resource.
>>> service = api.resource('service', 'gateway')
>>> if service.permitted('read'):
... print("I can read 'service:gateway'")
... else:
... print("I cannot read 'service:gateway'")
Example: Check whether members of group 'security_admin' can 'update'
a resource.
>>> service = api.resource('service', 'gateway')
>>> security_admin = api.group('security_admin')
>>> if service.permitted('update', security_admin):
... print("security_admin members can update service:gateway")
>>> else:
... print("security_admin members cannot update service:gateway")
"""
if role is None:
# Handle self role check
response = self.api.get(self.url(),
params={'check': 'true',
'privilege': privilege},
check_errors=False)
if response.status_code == 204:
return True
elif response.status_code in (404, 403):
return False
else:
raise ConjurException("Request failed: %d" % response.status_code)
else:
# Otherwise call role.is_permitted
return Role.from_roleid(self.api, role).is_permitted(self, privilege)
def url(
self)
Internal method to return a url for this object as a string.
def url(self):
"""
Internal method to return a url for this object as a string.
"""
return "/".join([
self.api.config.authz_url,
self.api.config.account,
'resources',
self.kind,
self.identifier
])
class Role
Represents a Conjur role.
An instance of this class does not know whether the role in question exists.
Generally you should create instances of this class through the role
method,
or the Role.from_roleid
classmethod.
Roles can provide information about their members and can check whether the role they represent is allowed to perform certain operations on resources.
User
and Group
objects have role
members that reference the role corresponding
to that Conjur asset.
class Role(object):
"""
Represents a Conjur [role](https://developer.conjur.net/key_concepts/rbac.html#rbac-roles).
An instance of this class does not know whether the role in question exists.
Generally you should create instances of this class through the `conjur.API.role` method,
or the `Role.from_roleid` classmethod.
Roles can provide information about their members and can check whether the role they represent
is allowed to perform certain operations on resources.
`conjur.User` and `conjur.Group` objects have `role` members that reference the role corresponding
to that Conjur asset.
"""
def __init__(self, api, kind, identifier):
"""
Create a role to represent the Conjur role with id `:`. For
example, to represent the role associated with a user named bob,
role = Role(api, 'user', 'bob')
`api` must be a `conjur.API` instance, used to implement this classes interactions with Conjur
`kind` is a string giving the role kind
`identifier` is the unqualified identifier of the role.
"""
self.api = api
"""
The `conjur.API` instance used to implement our methods.
"""
self.kind = kind
"""
The `kind` portion of the role's id.
"""
self.identifier = identifier
"""
The `identifier` portion of the role's id.
"""
@classmethod
def from_roleid(cls, api, roleid):
"""
Creates an instance of `conjur.Role` from a full role id string.
`api` is an instance of `conjur.API`
`roleid` is a fully or partially qualified Conjur identifier, for example,
`"the-account:service:some-service"` or `"service:some-service"` resolve to the same role.
"""
tokens = authzid(roleid, 'role').split(':', 3)
if len(tokens) == 3:
tokens.pop(0)
return cls(api, *tokens)
@property
def roleid(self):
"""
Return the full role id as a string.
Example:
>>> role = api.role('user', 'bob')
>>> role.roleid
'the-account:user:bob'
"""
return ':'.join([self.api.config.account, self.kind, self.identifier])
def is_permitted(self, resource, privilege):
"""
Check whether `resource` has `privilege` on this role.
`resource` is a qualified identifier for the resource.
`privilege` is a string like `"update"` or `"execute"`.
Example:
>>> role = api.role('user', 'alice')
>>> if role.is_permitted('variable:db-password', 'execute'):
... print("Alice can fetch the database password")
... else:
... print("Alice cannot fetch the database password")
"""
params = {
'check': 'true',
'resource_id': authzid(resource, 'resource'),
'privilege': privilege
}
response = self.api.get(self._url(), params=params,
check_errors=False)
if response.status_code == 204:
return True
elif response.status_code in (403, 404):
return False
else:
raise ConjurException("Request failed: %d" % response.status_code)
def grant_to(self, member, admin=None):
"""
Grant this role to `member`.
`member` is a string or object with a `role` attribute or `roleid` method,
such as a `conjur.User` or `conjur.Group`.
`admin` whether the member can grant this role to others.
"""
data = {}
if admin is not None:
data['admin'] = 'true' if admin else 'false'
self.api.put(self._membership_url(member), data=data)
def revoke_from(self, member):
"""
The inverse of `conjur.Role.grant_to`. Removes `member` from the members of this
role.
`member` is a string or object with a `role` attribute or `roleid` method,
such as a `conjur.User` or `conjur.Group`.
"""
self.api.delete(self._membership_url(member))
def members(self):
"""
Return a list of members of this role. Members are returned as `dict`s
with the following keys:
* `'member'` the fully qualified identifier of the group
* `'role'` the fully qualified identifier of the group (redundant)
* `'grantor'` the role that granted the membership
* `'admin_option'` whether this member can grant membership in the group to other roles.
"""
return self.api.get(self._membership_url()).json()
def _membership_url(self, member=None):
url = self._url() + "?members"
if member is not None:
memberid = authzid(member, 'role')
url += "&member=" + urlescape(memberid)
return url
def _url(self, *args):
return "/".join([self.api.config.authz_url,
self.api.config.account,
'roles',
self.kind,
self.identifier] + list(args))
Ancestors (in MRO)
Instance variables
var identifier
The identifier
portion of the role's id.
var kind
The kind
portion of the role's id.
var roleid
Return the full role id as a string.
Example:
>>> role = api.role('user', 'bob')
>>> role.roleid
'the-account:user:bob'
Methods
def __init__(
self, api, kind, identifier)
Create a role to represent the Conjur role with id <kind>:<identifier>
. For
example, to represent the role associated with a user named bob,
role = Role(api, 'user', 'bob')
api
must be a API
instance, used to implement this classes interactions with Conjur
kind
is a string giving the role kind
identifier
is the unqualified identifier of the role.
def __init__(self, api, kind, identifier):
"""
Create a role to represent the Conjur role with id `:`. For
example, to represent the role associated with a user named bob,
role = Role(api, 'user', 'bob')
`api` must be a `conjur.API` instance, used to implement this classes interactions with Conjur
`kind` is a string giving the role kind
`identifier` is the unqualified identifier of the role.
"""
self.api = api
"""
The `conjur.API` instance used to implement our methods.
"""
self.kind = kind
"""
The `kind` portion of the role's id.
"""
self.identifier = identifier
"""
The `identifier` portion of the role's id.
"""
def from_roleid(
cls, api, roleid)
Creates an instance of Role
from a full role id string.
api
is an instance of API
roleid
is a fully or partially qualified Conjur identifier, for example,
"the-account:service:some-service"
or "service:some-service"
resolve to the same role.
@classmethod
def from_roleid(cls, api, roleid):
"""
Creates an instance of `conjur.Role` from a full role id string.
`api` is an instance of `conjur.API`
`roleid` is a fully or partially qualified Conjur identifier, for example,
`"the-account:service:some-service"` or `"service:some-service"` resolve to the same role.
"""
tokens = authzid(roleid, 'role').split(':', 3)
if len(tokens) == 3:
tokens.pop(0)
return cls(api, *tokens)
def grant_to(
self, member, admin=None)
Grant this role to member
.
member
is a string or object with a role
attribute or roleid
method,
such as a User
or Group
.
admin
whether the member can grant this role to others.
def grant_to(self, member, admin=None):
"""
Grant this role to `member`.
`member` is a string or object with a `role` attribute or `roleid` method,
such as a `conjur.User` or `conjur.Group`.
`admin` whether the member can grant this role to others.
"""
data = {}
if admin is not None:
data['admin'] = 'true' if admin else 'false'
self.api.put(self._membership_url(member), data=data)
def is_permitted(
self, resource, privilege)
Check whether resource
has privilege
on this role.
resource
is a qualified identifier for the resource.
privilege
is a string like "update"
or "execute"
.
Example:
>>> role = api.role('user', 'alice')
>>> if role.is_permitted('variable:db-password', 'execute'):
... print("Alice can fetch the database password")
... else:
... print("Alice cannot fetch the database password")
def is_permitted(self, resource, privilege):
"""
Check whether `resource` has `privilege` on this role.
`resource` is a qualified identifier for the resource.
`privilege` is a string like `"update"` or `"execute"`.
Example:
>>> role = api.role('user', 'alice')
>>> if role.is_permitted('variable:db-password', 'execute'):
... print("Alice can fetch the database password")
... else:
... print("Alice cannot fetch the database password")
"""
params = {
'check': 'true',
'resource_id': authzid(resource, 'resource'),
'privilege': privilege
}
response = self.api.get(self._url(), params=params,
check_errors=False)
if response.status_code == 204:
return True
elif response.status_code in (403, 404):
return False
else:
raise ConjurException("Request failed: %d" % response.status_code)
def members(
self)
Return a list of members of this role. Members are returned as dict
s
with the following keys:
'member'
the fully qualified identifier of the group'role'
the fully qualified identifier of the group (redundant)'grantor'
the role that granted the membership'admin_option'
whether this member can grant membership in the group to other roles.
def members(self):
"""
Return a list of members of this role. Members are returned as `dict`s
with the following keys:
* `'member'` the fully qualified identifier of the group
* `'role'` the fully qualified identifier of the group (redundant)
* `'grantor'` the role that granted the membership
* `'admin_option'` whether this member can grant membership in the group to other roles.
"""
return self.api.get(self._membership_url()).json()
def revoke_from(
self, member)
The inverse of grant_to
. Removes member
from the members of this
role.
member
is a string or object with a role
attribute or roleid
method,
such as a User
or Group
.
def revoke_from(self, member):
"""
The inverse of `conjur.Role.grant_to`. Removes `member` from the members of this
role.
`member` is a string or object with a `role` attribute or `roleid` method,
such as a `conjur.User` or `conjur.Group`.
"""
self.api.delete(self._membership_url(member))
class User
class User(object):
def __init__(self, api, login, attrs=None):
self.api = api
self.login = login
# support as_role
self.role = api.role('user', login)
self._attrs = attrs
def exists(self):
resp = self.api.get(self.url(), check_errors=False)
return resp.status_code != 404
def __getattr__(self, item):
if self._attrs is None:
self._fetch()
try:
return self._attrs[item]
except KeyError:
raise AttributeError(item)
def _fetch(self):
self._attrs = self.api.get(self.url()).json()
def url(self):
return "{0}/users/{1}".format(self.api.config.core_url,
urlescape(self.login))
Ancestors (in MRO)
Instance variables
var api
var login
var role
Methods
def __init__(
self, api, login, attrs=None)
def __init__(self, api, login, attrs=None):
self.api = api
self.login = login
# support as_role
self.role = api.role('user', login)
self._attrs = attrs
def exists(
self)
def exists(self):
resp = self.api.get(self.url(), check_errors=False)
return resp.status_code != 404
def url(
self)
def url(self):
return "{0}/users/{1}".format(self.api.config.core_url,
urlescape(self.login))
class Variable
A Variable
represents a versioned secret stored in Conjur.
Generally you will get an instance of this class by calling create_variable
or variable
.
Instances of this class allow you to fetch values of the variable, and store new ones.
Example:
>>> # Print the current value of the variable `mysql-password`
>>> variable = api.variable('mysql-password')
>>> print("mysql-password is {}".format(variable.value()))
Example:
>>> # Print all versions of the same variable
>>> variable = api.variable('mysql-password')
>>> for i in range(1, variable.version_count + 1): # version numbers are 1 based
... print("version {} of 'mysql-password' is {}".format(i, variable.value(i)))
class Variable(object):
"""
A `Variable` represents a versioned secret stored in Conjur.
Generally you will get an instance of this class by calling `conjur.API.create_variable`
or `conjur.API.variable`.
Instances of this class allow you to fetch values of the variable, and store new ones.
Example:
>>> # Print the current value of the variable `mysql-password`
>>> variable = api.variable('mysql-password')
>>> print("mysql-password is {}".format(variable.value()))
Example:
>>> # Print all versions of the same variable
>>> variable = api.variable('mysql-password')
>>> for i in range(1, variable.version_count + 1): # version numbers are 1 based
... print("version {} of 'mysql-password' is {}".format(i, variable.value(i)))
"""
def __init__(self, api, id, attrs=None):
self.id = id
self.api = api
self._attrs = attrs
def value(self, version=None):
"""
Retrieve the secret stored in a variable.
`version` is a *one based* index of the version to be retrieved.
If no such version exists, a 404 error is raised.
Returns the value of the variable as a string.
"""
url = "%s/variables/%s/value" % (self.api.config.core_url,
urlescape(self.id))
if version is not None:
url = "%s?version=%s" % (url, version)
return self.api.get(url).text
def add_value(self, value):
"""
Stores a new version of the secret in this variable.
`value` is a string giving the new value to store.
This increments the variable's `version_count` member by one.
"""
self._attrs = None
data = {'value': value}
url = "%s/variables/%s/values" % (self.api.config.core_url,
urlescape(self.id))
self.api.post(url, data=data)
def __getattr__(self, item):
if self._attrs is None:
self._fetch()
try:
return self._attrs[item]
except KeyError:
raise AttributeError(item)
def _fetch(self):
self._attrs = self.api.get(
"{0}/variables/{1}".format(self.api.config.core_url,
urlescape(self.id))
).json()
Ancestors (in MRO)
Instance variables
var api
var id
Methods
def __init__(
self, api, id, attrs=None)
def __init__(self, api, id, attrs=None):
self.id = id
self.api = api
self._attrs = attrs
def add_value(
self, value)
Stores a new version of the secret in this variable.
value
is a string giving the new value to store.
This increments the variable's version_count
member by one.
def add_value(self, value):
"""
Stores a new version of the secret in this variable.
`value` is a string giving the new value to store.
This increments the variable's `version_count` member by one.
"""
self._attrs = None
data = {'value': value}
url = "%s/variables/%s/values" % (self.api.config.core_url,
urlescape(self.id))
self.api.post(url, data=data)
def value(
self, version=None)
Retrieve the secret stored in a variable.
version
is a one based index of the version to be retrieved.
If no such version exists, a 404 error is raised.
Returns the value of the variable as a string.
def value(self, version=None):
"""
Retrieve the secret stored in a variable.
`version` is a *one based* index of the version to be retrieved.
If no such version exists, a 404 error is raised.
Returns the value of the variable as a string.
"""
url = "%s/variables/%s/value" % (self.api.config.core_url,
urlescape(self.id))
if version is not None:
url = "%s?version=%s" % (url, version)
return self.api.get(url).text