From 300692e35e36b21c27e568a0c6a169b8ea3d53a6 Mon Sep 17 00:00:00 2001
From: Juan Miguel Olmo
Date: Fri, 15 Jun 2018 09:05:23 +0200
Subject: [PATCH 1/4] Add files via upload

---
 .../tests/test_sds_sync.py | 287 ++++++++++++++++++
 1 file changed, 287 insertions(+)

diff --git a/tendrl/gluster_integration/tests/test_sds_sync.py b/tendrl/gluster_integration/tests/test_sds_sync.py
index 088a304..e0900a7 100644
--- a/tendrl/gluster_integration/tests/test_sds_sync.py
+++ b/tendrl/gluster_integration/tests/test_sds_sync.py
@@ -1,12 +1,20 @@
 import etcd
 import importlib
+
 from mock import MagicMock
+from mock import mock
 from mock import patch
 
 from tendrl.commons.objects import BaseObject
 from tendrl.commons.utils import etcd_utils
 from tendrl.commons.utils import event_utils
+from tendrl.gluster_integration.sds_sync import check_peers
+from tendrl.gluster_integration.sds_sync import check_volumes
+from tendrl.gluster_integration.sds_sync import cluster_not_ready
+from tendrl.gluster_integration.sds_sync import get_volume_alert_counts
+from tendrl.gluster_integration.sds_sync import sync_by_provisioner
+
 
 @patch.object(BaseObject, "save")
 @patch.object(BaseObject, "load_all")
@@ -66,3 +74,282 @@ def test_brick_status_alert(
     sds_sync.brick_status_alert(
         "dhcp12-12.lab.abc.com"
     )
+
+
+@patch.object(BaseObject, "load_all")
+def test_get_volume_alert_counts(load_all):
+
+    # helper objects
+    gluster_volume_a = NS.tendrl.objects.GlusterVolume(
+        integration_id="77deef29-b8e5-4dc5-8247-21e2a409a66a",
+        vol_id=1,
+        name='a'
+    )
+    gluster_volume_b = NS.tendrl.objects.GlusterVolume(
+        integration_id="77deef29-b8e5-4dc5-8247-21e2a409a66a",
+        vol_id=2,
+        name='b'
+    )
+
+    # List returned with the information of all volumes
+    load_all.return_value = [gluster_volume_a, gluster_volume_b]
+
+    # Check that the alert counts of all volumes are set to 0
+    expected_result = {'a': {'vol_id': 1, 'alert_count': 0},
+                       'b': {'vol_id': 2, 'alert_count': 0}}
+
+    result = get_volume_alert_counts()
+    assert result == expected_result
+
+
+def test_cluster_not_ready():
+
+    mock_cluster = MagicMock()
+    mock_cluster.current_job = {}
+
+    # Scene 1: Cluster not ready - condition 1
+    mock_cluster.status = "importing"
+    mock_cluster.current_job["status"] = "failed"
+    assert cluster_not_ready(mock_cluster)
+
+    # Scene 2: Cluster not ready - condition 2
+    mock_cluster.status = "unmanaging"
+    mock_cluster.current_job["status"] = "it does not matter"
+    assert cluster_not_ready(mock_cluster)
+
+    # Scene 3: Cluster not ready - condition 3
+    mock_cluster.status = "set_volume_profiling"
+    mock_cluster.current_job["status"] = "it does not matter"
+    assert cluster_not_ready(mock_cluster)
+
+    # Scene 4: Cluster ready
+    mock_cluster.status = "other thing"
+    mock_cluster.current_job["status"] = "it does not matter"
+    assert not cluster_not_ready(mock_cluster)
+
+
+@patch.object(BaseObject, "load")
+@patch.object(BaseObject, "save")
+@patch.object(event_utils, "emit_event")
+def test_check_peers(mock_emit_event,
+                     mock_baseobject_save,
+                     mock_baseobject_load):
+
+    state_raw_data = {"Peers": {"peer1.uuid": 1,
+                                "peer1.primary_hostname": "p1",
+                                "peer1.state": "the_state",
+                                "peer1.connected": "Connected",
+                                "peer2.uuid": 2,
+                                "peer2.primary_hostname": "p2",
+                                "peer2.state": "the_state",
+                                "peer2.connected": "Connected"}}
+    sync_time = 1
+
+    mock_baseobject_save.return_value = True
+
+    peer_mock = MagicMock()
+    mock_baseobject_load.return_value = peer_mock
+
+    # Scene 1: No discordance between peer data retrieved from gluster and
+    # peer data retrieved from etcd
+
+    peer_mock.connected = "Connected"
+    synctime = check_peers("test_cluster", state_raw_data, sync_time)
+
+    # two peers checked
+    assert synctime == 11
+
+    # no problems detected
+    mock_emit_event.assert_not_called()
+
+    # Scene 2: Discordance between the different sources:
+    # the etcd value differs from the data retrieved from gluster,
+    # but gluster reports the peer as Connected, so an INFO message should
+    # be delivered
+    peer_mock.connected = "Disconnected"  # <--- read from etcd
+    mock_baseobject_load.return_value = peer_mock
+    mock_emit_event.reset_mock()
+
+    synctime = check_peers("test_cluster", state_raw_data, sync_time)
+
+    # two peers checked
+    assert synctime == 11
+
+    # Right kind of events emitted
+    mock_emit_event.assert_called_with(
+        "peer_status",
+        "Connected",
+        "Peer p2 in cluster test_cluster is Connected",
+        "peer_p2",
+        "INFO")
+
+    # Scene 3: Discordance between the different sources:
+    # the data retrieved from gluster shows a peer Disconnected,
+    # so a WARNING message should be delivered
+
+    # this is read from etcd db
+    peer_mock.connected = "Connected"
+    # this is obtained from gluster cmd
+    state_raw_data["Peers"]["peer2.connected"] = "Disconnected"
+
+    mock_baseobject_load.return_value = peer_mock
+    mock_emit_event.reset_mock()
+
+    synctime = check_peers("test_cluster", state_raw_data, sync_time)
+
+    # two peers checked
+    assert synctime == 11
+
+    # Right kind of events emitted
+    mock_emit_event.assert_called_with(
+        "peer_status",
+        "Disconnected",
+        "Peer p2 in cluster test_cluster is Disconnected",
+        "peer_p2",
+        "WARNING")
+
+
+@mock.patch('tendrl.gluster_integration.sds_sync.sync_volumes')
+@patch.object(BaseObject, "load")
+def test_check_volumes(mock_glusterVolume_load, mock_sync_volumes):
+
+    # Scene 1: One volume with options
+    sync_ttl = 1
+    cluster_short_name = "test_cluster"
+    raw_data = {"Volumes": {"volume1.id": 1,
+                            "volume1.status": "Ok",
+                            "volume1.name": "vol1",
+                            "volume1.options.count": 2,
+                            "volume1.options.value2": "trusted.glusterfs.dht",
+                            "volume1.options.key2": "cluster.dht-xattr-name",
+                            "volume1.options.value1": "(null)",
+                            "volume1.options.key1": "cluster.extra-hash-regex"}
+                }
+    raw_data_options = {"Volume Options": "volume options"}
+
+    mock_sync_volumes.side_effect = [True, KeyError]
+
+    the_volume = MagicMock()
+    the_volume.options = {"cluster.min-free-inodes": "5%"}
+
+    mock_glusterVolume_load.return_value = the_volume
+
+    sync_ttl = check_volumes(cluster_short_name,
+                             raw_data, raw_data_options, sync_ttl)
+
+    # Only 1 Volume
+    assert sync_ttl == 2
+
+    # The Volume is loaded; it is needed to add the options coming from
+    # the gluster cmd details
+    mock_glusterVolume_load.assert_called()
+
+    # 5 options for the volume must be saved (this could probably be improved
+    # by saving all the options in a single call)
+    assert len(the_volume.mock_calls) == 5
+
+    # Scene 2: One volume without options (is it right not to update options
+    # when there are no options in the loaded volume?)
+    sync_ttl = 1
+    mock_sync_volumes.side_effect = [True, KeyError]
+
+    the_volume.reset_mock()
+    the_volume.options = None
+    mock_glusterVolume_load.reset_mock()
+    mock_glusterVolume_load.return_value = the_volume
+
+    sync_ttl = check_volumes(cluster_short_name, raw_data, raw_data_options,
+                             sync_ttl)
+
+    # Only 1 Volume
+    assert sync_ttl == 2
+
+    # The Volume is loaded; it is needed to add the options coming from
+    # the gluster cmd details
+    mock_glusterVolume_load.assert_called()
+
+    # No options for the volume will be saved
+    assert len(the_volume.mock_calls) == 0
+
+
+@mock.patch(
+    "tendrl.gluster_integration.sds_sync.cluster_status.sync_cluster_status")
+@mock.patch(
+    "tendrl.gluster_integration.sds_sync.utilization.sync_utilization_details")
+@mock.patch(
+    "tendrl.gluster_integration.sds_sync.client_connections.sync_volume_connections")
+@mock.patch(
+    "tendrl.gluster_integration.sds_sync.georep_details.aggregate_session_status")
+@mock.patch(
+    "tendrl.gluster_integration.sds_sync.rebalance_status.sync_volume_rebalance_status")
+@mock.patch(
+    "tendrl.gluster_integration.sds_sync.rebalance_status.sync_volume_rebalance_estimated_time")
+@mock.patch(
+    "tendrl.gluster_integration.sds_sync.snapshots.sync_volume_snapshots")
+@mock.patch(
+    "tendrl.gluster_integration.message.process_events.process_events")
+@patch.object(BaseObject, "load_all")
+def test_sync_by_provisioner(mock_gluster_volume_load_all,
+                             mock_evt,
+                             mock_sync_volume_snapshots,
+                             mock_sync_volume_rebalance_estimated_time,
+                             mock_sync_volume_rebalance_status,
+                             mock_aggregate_session_status,
+                             mock_sync_volume_connections,
+                             mock_sync_utilization_details,
+                             mock_sync_cluster_status):
+
+    raw_data = {"Volumes": {"volume1.id": 1,
+                            "volume1.status": "Ok",
+                            "volume1.name": "vol1",
+                            "volume1.options.count": 2,
+                            "volume1.options.value2": "trusted.glusterfs.dht",
+                            "volume1.options.key2": "cluster.dht-xattr-name",
+                            "volume1.options.value1": "(null)",
+                            "volume1.options.key1": "cluster.extra-hash-regex"}
+                }
+
+    # Sync a couple of volumes
+    integration_id = "77deef29-b8e5-4dc5-8247-21e2a409a66a"
+    node_context = MagicMock()
+    node_context.tags = "provisioner/%s" % integration_id
+    sync_ttl = 1
+
+    volume_1 = MagicMock()
+    volume_2 = MagicMock()
+    volumes_returned = [volume_1, volume_2]
+
+    i = 1
+    for volume in volumes_returned:
+        volume.id = i
+        volume.deleted = False
+        volume.current_job = {"status": ""}
+        i += 1
+
+    mock_gluster_volume_load_all.return_value = volumes_returned
+
+    sync_by_provisioner(integration_id, node_context, raw_data, sync_ttl)
+
+    # Check that different sync methods are called with the right parameters
+    # cluster_status.sync_cluster_status
+    assert len(mock_sync_cluster_status.call_args[0][0]) == 2
+    assert [v.id for v in mock_sync_cluster_status.call_args[0][0]] == [1, 2]
+    assert mock_sync_cluster_status.call_args[0][1] == 351
+
+    for mock_fn in [mock_sync_utilization_details,
+                    mock_sync_volume_connections,
+                    mock_sync_volume_rebalance_estimated_time,
+                    mock_sync_volume_rebalance_status,
+                    ]:
+        assert len(mock_fn.call_args[0][0]) == 2
+        assert [v.id for v in mock_fn.call_args[0][0]] == [1, 2]
+
+    # georep_details.aggregate_session_status
+    assert mock_aggregate_session_status.called
+
+    # process_events.process_events
+    assert mock_evt.called
+
+    # snapshots.sync_volume_snapshots
+    # 18 = 10 (default sync interval) + (2 volumes * 4)
+    mock_sync_volume_snapshots.assert_called_with(raw_data["Volumes"], 18)

From 79a68069e0f242cabbb8e6dd6c8310dc840dd369 Mon Sep 17 00:00:00 2001
From: Juan Miguel Olmo
Date: Fri, 15 Jun 2018 09:10:33 +0200
Subject: [PATCH 2/4] Add files via upload

---
 .../gluster_integration/sds_sync/__init__.py | 510 +++++++++++-------
 1 file changed, 313 insertions(+), 197 deletions(-)

diff --git a/tendrl/gluster_integration/sds_sync/__init__.py b/tendrl/gluster_integration/sds_sync/__init__.py
index 5c04fe7..6e7f440 100644
--- a/tendrl/gluster_integration/sds_sync/__init__.py
+++ b/tendrl/gluster_integration/sds_sync/__init__.py
@@ -35,18 +35,29 @@
 BRICK_STARTED = "started"
 
 
-class GlusterIntegrationSdsSyncStateThread(sds_sync.SdsSyncThread):
+def cluster_not_ready(cluster):
+    """Indicates whether a cluster is not ready to be synced
 
-    def __init__(self):
-        super(GlusterIntegrationSdsSyncStateThread, self).__init__()
-        self._complete = threading.Event()
+    @cluster: A Tendrl Cluster object
+    @returns: True if the cluster is not ready
+    """
+    status_not_ready = (cluster.status == "importing" and
+                        cluster.current_job['status'] == 'failed') or \
+        cluster.status == "unmanaging" or \
+        cluster.status == "set_volume_profiling"
 
-    def run(self):
-        logger.log(
-            "info",
-            NS.publisher_id,
-            {"message": "%s running" % self.__class__.__name__}
-        )
+    return status_not_ready
+
+
+def retrieve_cluster(integration_id):
+    """Retrieve a cluster and refresh the network node information in it
+
+    @integration_id: integration id of the cluster
+    @return: a Tendrl Cluster object
+    """
+
+    cluster = None
+    try:
         gluster_brick_dir = NS.gluster.objects.GlusterBrickDir()
         gluster_brick_dir.save()
@@ -54,6 +65,7 @@ def run(self):
         cluster = NS.tendrl.objects.Cluster(
             integration_id=NS.tendrl_context.integration_id
         ).load()
+
         if cluster.cluster_network in [None, ""]:
             try:
                 node_networks = NS.tendrl.objects.NodeNetwork().load_all()
@@ -65,13 +77,272 @@ def run(self):
                 NS.publisher_id,
                 {"message": "Failed to sync cluster network details"}
             )
+    except Exception as ex:
+        logger.log(
+            "error",
+            NS.publisher_id,
+            {"message": "Failed to retrieve cluster data: %s" % ex}
+        )
+
+    return cluster
+
+
+def gluster_state_cmd(cmd, tmpfile):
+    """Execute a gluster get-state command and retrieve the output
+
+    @cmd: Command to execute
+    @tmpfile: temp file where the output of the command is stored
+    @return: a dict with the output retrieved from the command
+    """
+
+    subprocess.call(
+        [
+            'gluster',
+            'get-state',
+            'glusterd',
+            'odir',
+            '/var/run',
+            'file',
+            tmpfile,
+            cmd
+        ]
+    )
+
+    the_file = '/var/run/%s' % tmpfile
+    raw_data = ini2json.ini_to_dict(the_file)
+
+    subprocess.call(['rm', '-rf', the_file])
+
+    return raw_data
+
+
+def check_peers(cluster_short_name, raw_data, sync_ttl):
+    """Compare peer status retrieved from gluster and status found in etcd.
+
+    If the statuses are not equal, an event is raised.
+    Brick alerts are also raised for disconnected hosts.
+
+    @cluster_short_name: short name of the cluster
+    @raw_data: a dict with gluster state detail
+    @sync_ttl: Current sync time delay
+    @return: the sync_ttl value updated
+    """
+
+    if "Peers" not in raw_data.keys():
+        return sync_ttl
+
+    index = 1
+    peers = raw_data["Peers"]
+    disconnected_hosts = []
+
+    while True:
+        try:
+            # Peer object created with status data retrieved from gluster
+            # details cmd
+            peer = NS.tendrl.\
+                objects.GlusterPeer(
+                    peer_uuid=peers['peer%s.uuid' % index],
+                    hostname=peers[
+                        'peer%s.primary_hostname' % index
+                    ],
+                    state=peers['peer%s.state' % index],
+                    connected=peers['peer%s.connected' % index]
+                )
+            try:
+                # Peer retrieved from etcd
+                stored_peer = NS.tendrl.objects.GlusterPeer(
+                    peer_uuid=peers['peer%s.uuid' % index]
+                ).load()
+                stored_peer_status = stored_peer.connected
+                current_status = peer.connected
+
+                # manage event messages for the peer
+                if stored_peer_status and \
+                        current_status != stored_peer_status:
+
+                    the_peer = 'peer%s.primary_hostname' % index
+
+                    # Compose message
+                    msg = ("Peer %s in cluster %s is %s") % (
+                        peers[the_peer],
+                        cluster_short_name,
+                        current_status
+                    )
+                    instance = "peer_%s" % peers[the_peer]
+
+                    if current_status != 'Connected':
+                        message_level = 'WARNING'
+                    else:
+                        message_level = 'INFO'
+
+                    # Send message in event
+                    event_utils.emit_event(
+                        "peer_status",
+                        current_status,
+                        msg,
+                        instance,
+                        message_level
+                    )
+
+                    # Disconnected host name to raise brick alert
+                    if current_status.lower() == "disconnected":
+                        disconnected_hosts.append(peers[the_peer])
+
+            except etcd.EtcdKeyNotFound:
+                pass
+
+            sync_ttl += 5
+            peer.save(ttl=sync_ttl)
+            index += 1
+
+        except KeyError:
+            break
+
+    # Raise an alert for bricks when peer disconnected
+    # or node goes down
+    for disconnected_host in disconnected_hosts:
+        brick_status_alert(
+            disconnected_host
+        )
+
+    return sync_ttl
+
+
+def check_volumes(cluster_short_name, raw_data, raw_data_options, sync_ttl):
+    """Load volume options in each volume
+
+    @cluster_short_name: short name of the cluster
+    @raw_data: a dict with gluster state detail cmd output
+    @raw_data_options: a dict with gluster volumeoptions cmd output
+    @sync_ttl: Current sync time delay
+    @return: the sync_ttl value updated
+    """
+
+    if "Volumes" not in raw_data.keys():
+        return sync_ttl
+
+    index = 1
+    volumes = raw_data['Volumes']
+    while True:
+        try:
+            sync_volumes(
+                volumes, index,
+                raw_data_options['Volume Options'],
+                # sync_interval + 100 + no of peers + 350
+                sync_ttl + 350,
+                cluster_short_name
+            )
+            index += 1
+            sync_ttl += 1
+        except KeyError:
+            break
+
+    # populate the volume specific options
+    reg_ex = re.compile("^volume[0-9]+.options+")
+    options = {}
+    for key in volumes.keys():
+        if reg_ex.match(key):
+            options[key] = volumes[key]
+
+    for key in options.keys():
+        volname = key.split('.')[0]
+        vol_id = volumes['%s.id' % volname]
+        dict1 = {}
+        for k, v in options.items():
+            if k.startswith('%s.options' % volname):
+                dict1['.'.join(k.split(".")[2:])] = v
+                options.pop(k, None)
+        volume = NS.tendrl.objects.GlusterVolume(
+            NS.tendrl_context.integration_id,
+            vol_id=vol_id
+        ).load()
+
+        if volume.options is not None:
+            dest = dict(volume.options)
+            dest.update(dict1)
+            volume.options = dest
+            volume.save()
+
+    return sync_ttl
+
+
+def sync_by_provisioner(integration_id, node_context, raw_data, sync_ttl):
+    """Launch synchronization of several items
+
+    @integration_id: Context integration id
+    @node_context: node context object with the node tags
+    @raw_data: a dict with gluster state detail
+    @sync_ttl: Current sync time delay
+    """
+
+    if "Volumes" not in raw_data.keys():
+        raw_data_volumes = {}
+    else:
+        raw_data_volumes = raw_data["Volumes"]
+
+    if "provisioner/%s" % integration_id not in node_context.tags:
+        return
+
+    all_volumes = NS.tendrl.objects.GlusterVolume(integration_id).load_all() or []
+    volumes = []
+    for volume in all_volumes:
+        if not str(volume.deleted).lower() == "true" or \
+                volume.current_job.get('status', '') \
+                in ['', 'finished', 'failed']:
+            volumes.append(volume)
+
+    cluster_status.sync_cluster_status(volumes, sync_ttl + 350)
+    utilization.sync_utilization_details(volumes)
+    client_connections.sync_volume_connections(volumes)
+    georep_details.aggregate_session_status()
+
+    try:
+        evt.process_events()
+    except etcd.EtcdKeyNotFound:
+        pass
+
+    rebalance_status.sync_volume_rebalance_status(volumes)
+    rebalance_status.sync_volume_rebalance_estimated_time(volumes)
+
+    snapshots.sync_volume_snapshots(
+        raw_data_volumes,
+        int(NS.config.data.get(
+            "sync_interval", 10
+        )) + len(volumes) * 4
+    )
+
+
+class GlusterIntegrationSdsSyncStateThread(sds_sync.SdsSyncThread):
+
+    def __init__(self):
+        super(GlusterIntegrationSdsSyncStateThread, self).__init__()
+        self._complete = threading.Event()
+
+    def run(self):
+        logger.log(
+            "info",
+            NS.publisher_id,
+            {"message": "%s running" % self.__class__.__name__}
+        )
+
+        # Get the cluster
+        cluster = retrieve_cluster(NS.tendrl_context.integration_id)
+
+        # Errors prevent the job from being done
+        if not cluster:
+            return
+
+        # Init time to refresh cluster state
         _sleep = 0
+
         while not self._complete.is_set():
             # To detect out of band deletes
             # refresh gluster object inventory at config['sync_interval']
+
+            # Refresh context
             SYNC_TTL = int(NS.config.data.get("sync_interval", 10)) + 100
             NS.node_context = NS.node_context.load()
             NS.tendrl_context = NS.tendrl_context.load()
+
+            # Increase cycle times
             if _sleep > 5:
                 _sleep = int(NS.config.data.get("sync_interval", 10))
             else:
@@ -81,207 +352,51 @@ def run(self):
             _cluster = NS.tendrl.objects.Cluster(
                 integration_id=NS.tendrl_context.integration_id
             ).load()
-            if (_cluster.status == "importing" and
-                    _cluster.current_job['status'] == 'failed') or \
-                    _cluster.status == "unmanaging" or \
-                    _cluster.status == "set_volume_profiling":
+
+            # Another attempt if the cluster is not ready, with an
+            # increased pause in each try.
+            if cluster_not_ready(_cluster):
                 continue
 
+            # Get Cluster node context
             _cnc = NS.tendrl.objects.ClusterNodeContext(
                 node_id=NS.node_context.node_id
             ).load()
             _cnc.is_managed = "yes"
             _cnc.save()
-            subprocess.call(
-                [
-                    'gluster',
-                    'get-state',
-                    'glusterd',
-                    'odir',
-                    '/var/run',
-                    'file',
-                    'glusterd-state',
-                    'detail'
-                ]
-            )
-            raw_data = ini2json.ini_to_dict(
-                '/var/run/glusterd-state'
-            )
-            subprocess.call(['rm', '-rf', '/var/run/glusterd-state'])
-            subprocess.call(
-                [
-                    'gluster',
-                    'get-state',
-                    'glusterd',
-                    'odir',
-                    '/var/run',
-                    'file',
-                    'glusterd-state-vol-opts',
-                    'volumeoptions'
-                ]
-            )
-            raw_data_options = ini2json.ini_to_dict(
-                '/var/run/glusterd-state-vol-opts'
-            )
-            subprocess.call(
-                [
-                    'rm',
-                    '-rf',
-                    '/var/run/glusterd-state-vol-opts'
-                ]
-            )
+
+            # Load gluster state and volume options
+            raw_data = gluster_state_cmd("detail", "tmp_file_gl-state")
+            raw_data_options = gluster_state_cmd("volumeoptions",
+                                                 "tmp_file_gl-vol-opts")
+
             sync_object = NS.gluster.objects.\
                 SyncObject(data=json.dumps(raw_data))
             sync_object.save()
-            if "Peers" in raw_data:
-                index = 1
-                peers = raw_data["Peers"]
-                disconnected_hosts = []
-                while True:
-                    try:
-                        peer = NS.tendrl.\
-                            objects.GlusterPeer(
-                                peer_uuid=peers['peer%s.uuid' % index],
-                                hostname=peers[
-                                    'peer%s.primary_hostname' % index
-                                ],
-                                state=peers['peer%s.state' % index],
-                                connected=peers['peer%s.connected' % index]
-                            )
-                        try:
-                            stored_peer = NS.tendrl.objects.GlusterPeer(
-                                peer_uuid=peers['peer%s.uuid' % index]
-                            ).load()
-                            stored_peer_status = stored_peer.connected
-                            current_status = peers[
-                                'peer%s.connected' % index
-                            ]
-                            if stored_peer_status and \
-                                    current_status != stored_peer_status:
-                                msg = (
-                                    "Peer %s in cluster %s "
-                                    "is %s"
-                                ) % (
-                                    peers[
-                                        'peer%s.primary_hostname' %
-                                        index
-                                    ],
-                                    _cluster.short_name,
-                                    current_status
-                                )
-                                instance = "peer_%s" % peers[
-                                    'peer%s.primary_hostname' % index
-                                ]
-                                event_utils.emit_event(
-                                    "peer_status",
-                                    current_status,
-                                    msg,
-                                    instance,
-                                    'WARNING' if current_status !=
-                                    'Connected'
-                                    else 'INFO'
-                                )
-                                # Disconnected host name to
-                                # raise brick alert
-                                if current_status.lower() == \
-                                        "disconnected":
-                                    disconnected_hosts.append(
-                                        peers[
-                                            'peer%s.primary_hostname' %
-                                            index
-                                        ]
-                                    )
-                        except etcd.EtcdKeyNotFound:
-                            pass
-                        SYNC_TTL += 5
-                        peer.save(ttl=SYNC_TTL)
-                        index += 1
-                    except KeyError:
-                        break
-                # Raise an alert for bricks when peer disconnected
-                # or node goes down
-                for disconnected_host in disconnected_hosts:
-                    brick_status_alert(
-                        disconnected_host
-                    )
-            if "Volumes" in raw_data:
-                index = 1
-                volumes = raw_data['Volumes']
-                while True:
-                    try:
-                        sync_volumes(
-                            volumes, index,
-                            raw_data_options.get('Volume Options'),
-                            # sync_interval + 100 + no of peers + 350
-                            SYNC_TTL + 350,
-                            _cluster.short_name
-                        )
-                        index += 1
-                        SYNC_TTL += 1
-                    except KeyError:
-                        break
-                # populate the volume specific options
-                reg_ex = re.compile("^volume[0-9]+.options+")
-                options = {}
-                for key in volumes.keys():
-                    if reg_ex.match(key):
-                        options[key] = volumes[key]
-                for key in options.keys():
-                    volname = key.split('.')[0]
-                    vol_id = volumes['%s.id' % volname]
-                    dict1 = {}
-                    for k, v in options.items():
-                        if k.startswith('%s.options' % volname):
-                            dict1['.'.join(k.split(".")[2:])] = v
-                            options.pop(k, None)
-                    volume = NS.tendrl.objects.GlusterVolume(
-                        NS.tendrl_context.integration_id,
-                        vol_id=vol_id
-                    ).load()
-                    if volume.options is not None:
-                        dest = dict(volume.options)
-                        dest.update(dict1)
-                        volume.options = dest
-                        volume.save()
-
-            # Sync cluster global details
-            if "provisioner/%s" % NS.tendrl_context.integration_id \
-                    in NS.node_context.tags:
-                all_volumes = NS.tendrl.objects.GlusterVolume(
-                    NS.tendrl_context.integration_id
-                ).load_all() or []
-                volumes = []
-                for volume in all_volumes:
-                    if not str(volume.deleted).lower() == "true" or \
-                            volume.current_job.get('status', '') \
-                            in ['', 'finished', 'failed']:
-                        volumes.append(volume)
-                cluster_status.sync_cluster_status(volumes, SYNC_TTL + 350)
-                utilization.sync_utilization_details(volumes)
-                client_connections.sync_volume_connections(volumes)
-                georep_details.aggregate_session_status()
-                try:
-                    evt.process_events()
-                except etcd.EtcdKeyNotFound:
-                    pass
-                rebalance_status.sync_volume_rebalance_status(volumes)
-                rebalance_status.sync_volume_rebalance_estimated_time(
-                    volumes
-                )
-                snapshots.sync_volume_snapshots(
-                    raw_data['Volumes'],
-                    int(NS.config.data.get(
-                        "sync_interval", 10
-                    )) + len(volumes) * 4
-                )
-                # update alert count
-                update_cluster_alert_count()
+            # Check peers
+            SYNC_TTL = check_peers(_cluster.short_name, raw_data, SYNC_TTL)
+
+            # Check Volumes
+            SYNC_TTL = check_volumes(_cluster.short_name,
+                                     raw_data,
+                                     raw_data_options,
+                                     SYNC_TTL
+                                     )
+            # Sync data
+            sync_by_provisioner(NS.tendrl_context.integration_id,
+                                NS.node_context,
+                                raw_data,
+                                SYNC_TTL)
+
             # check and enable volume profiling
-            if "provisioner/%s" % NS.tendrl_context.integration_id in \
-                    NS.node_context.tags:
+            if "provisioner/%s" % NS.tendrl_context.integration_id \
+                    not in NS.node_context.tags:
                 self._enable_disable_volume_profiling()
 
+            # update alert count
+            update_cluster_alert_count()
+
             _cluster = NS.tendrl.objects.Cluster(
                 integration_id=NS.tendrl_context.integration_id
             ).load()
@@ -300,6 +415,7 @@ def run(self):
                 ) in ['', 'finished', 'failed'] and \
                     _cluster.status in [None, ""]:
                 _cluster.save()
+
         except Exception as ex:
             Event(
                 ExceptionMessage(

From a22fe451faacc561776df0354d184330e313509d Mon Sep 17 00:00:00 2001
From: Juan Miguel Olmo
Date: Fri, 15 Jun 2018 17:59:01 +0200
Subject: [PATCH 3/4] Long lines avoided

---
 tendrl/gluster_integration/tests/test_sds_sync.py | 12 ++++++++----
 1 file changed, 8 insertions(+), 4 deletions(-)

diff --git a/tendrl/gluster_integration/tests/test_sds_sync.py b/tendrl/gluster_integration/tests/test_sds_sync.py
index e0900a7..7a285dc 100644
--- a/tendrl/gluster_integration/tests/test_sds_sync.py
+++ b/tendrl/gluster_integration/tests/test_sds_sync.py
@@ -277,13 +277,17 @@ def test_check_volumes(mock_glusterVolume_load, mock_sync_volumes):
 @mock.patch(
     "tendrl.gluster_integration.sds_sync.utilization.sync_utilization_details")
 @mock.patch(
-    "tendrl.gluster_integration.sds_sync.client_connections.sync_volume_connections")
+    "tendrl.gluster_integration.sds_sync.client_connections."
+    "sync_volume_connections")
 @mock.patch(
-    "tendrl.gluster_integration.sds_sync.georep_details.aggregate_session_status")
+    "tendrl.gluster_integration.sds_sync.georep_details."
+    "aggregate_session_status")
 @mock.patch(
-    "tendrl.gluster_integration.sds_sync.rebalance_status.sync_volume_rebalance_status")
+    "tendrl.gluster_integration.sds_sync.rebalance_status."
+    "sync_volume_rebalance_status")
 @mock.patch(
-    "tendrl.gluster_integration.sds_sync.rebalance_status.sync_volume_rebalance_estimated_time")
+    "tendrl.gluster_integration.sds_sync.rebalance_status."
+    "sync_volume_rebalance_estimated_time")
 @mock.patch(
     "tendrl.gluster_integration.sds_sync.snapshots.sync_volume_snapshots")
 @mock.patch(

From 44961634c97a2949e005c6a7ff2b2381d671a001 Mon Sep 17 00:00:00 2001
From: Juan Miguel Olmo
Date: Fri, 15 Jun 2018 18:02:58 +0200
Subject: [PATCH 4/4] Cosmetic changes

---
 tendrl/gluster_integration/sds_sync/__init__.py | 10 ++++++----
 1 file changed, 6 insertions(+), 4 deletions(-)

diff --git a/tendrl/gluster_integration/sds_sync/__init__.py b/tendrl/gluster_integration/sds_sync/__init__.py
index 267ed58..e82083f 100644
--- a/tendrl/gluster_integration/sds_sync/__init__.py
+++ b/tendrl/gluster_integration/sds_sync/__init__.py
@@ -41,10 +41,10 @@ def cluster_not_ready(cluster):
     @cluster: A Tendrl Cluster object
     @returns: True if the cluster is not ready
     """
-    status_not_ready = (cluster.status == "importing" and
-                        cluster.current_job['status'] == 'failed') or \
+    status_not_ready = ((cluster.status == "importing" and
+                         cluster.current_job['status'] == 'failed') or \
         cluster.status == "unmanaging" or \
-        cluster.status == "set_volume_profiling"
+        cluster.status == "set_volume_profiling")
 
     return status_not_ready
 
@@ -281,7 +281,9 @@ def sync_by_provisioner(integration_id, node_context, raw_data, sync_ttl):
     if "provisioner/%s" % integration_id not in node_context.tags:
         return
 
-    all_volumes = NS.tendrl.objects.GlusterVolume(integration_id).load_all() or []
+    all_volumes = (
+        NS.tendrl.objects.GlusterVolume(integration_id).load_all() or [])
+
     volumes = []
     for volume in all_volumes:
         if not str(volume.deleted).lower() == "true" or \