diff --git a/controllers/replication.storage/replication/replication.go b/controllers/replication.storage/replication/replication.go index 82634ca05..dd5e82f9c 100644 --- a/controllers/replication.storage/replication/replication.go +++ b/controllers/replication.storage/replication/replication.go @@ -107,6 +107,18 @@ func (r *Replication) Resync() *Response { return &Response{Response: resp, Error: err} } +func (r *Replication) GetInfo() *Response { + resp, err := r.Params.Replication.GetVolumeReplicationInfo( + r.Params.VolumeID, + r.Params.ReplicationID, + r.Params.SecretName, + r.Params.SecretNamespace, + r.Params.Parameters, + ) + + return &Response{Response: resp, Error: err} +} + func (r *Response) HasKnownGRPCError(knownErrors []codes.Code) bool { if r.Error == nil { return false diff --git a/controllers/replication.storage/volumereplication_controller.go b/controllers/replication.storage/volumereplication_controller.go index 14f129c8f..ed5b6debf 100644 --- a/controllers/replication.storage/volumereplication_controller.go +++ b/controllers/replication.storage/volumereplication_controller.go @@ -125,6 +125,8 @@ func (r *VolumeReplicationReconciler) Reconcile(ctx context.Context, req ctrl.Re } // remove the prefix keys in volume replication class parameters parameters := filterPrefixedParameters(replicationParameterPrefix, vrcObj.Spec.Parameters) + schedulingTime := parameters["schedulingInterval"] + scheduleTime, _ := time.ParseDuration(schedulingTime) // get secret secretName := vrcObj.Spec.Parameters[prefixedReplicationSecretNameKey] @@ -363,6 +365,14 @@ func (r *VolumeReplicationReconciler) Reconcile(ctx context.Context, req ctrl.Re } instance.Status.LastCompletionTime = getCurrentTime() + var last_sync_time int64 + var requeueForInfo bool + if instance.Spec.ReplicationState == replicationv1alpha1.Primary { + last_sync_time, err = r.getVolumeReplicationInfo(vr) + instance.Status.LastSyncTime = last_sync_time + requeueForInfo = true + + } err = r.updateReplicationStatus(instance, logger, getReplicationState(instance), msg) if err != nil { return ctrl.Result{}, err @@ -370,6 +380,14 @@ func (r *VolumeReplicationReconciler) Reconcile(ctx context.Context, req ctrl.Re logger.Info(msg) + if requeueForInfo { + logger.Info("volume is primary, requeuing to get volume replication info") + return ctrl.Result{ + Requeue: true, + RequeueAfter: scheduleTime, + }, nil + } + return ctrl.Result{}, nil } @@ -614,6 +632,34 @@ func (r *VolumeReplicationReconciler) enableReplication(vr *volumeReplicationIns return nil } +// getVolumeReplicationInfo gets volume replication info. +func (r *VolumeReplicationReconciler) getVolumeReplicationInfo(vr *volumeReplicationInstance) (int64, error) { + volumeReplication := replication.Replication{ + Params: vr.commonRequestParameters, + } + + resp := volumeReplication.GetInfo() + if resp.Error != nil { + vr.logger.Error(resp.Error, "failed to get volume replication info") + + return 0, resp.Error + } + + infoResponse, ok := resp.Response.(*proto.GetVolumeReplicationInfoResponse) + if !ok { + err := fmt.Errorf("received response of unexpected type") + vr.logger.Error(err, "unable to parse response") + + return 0, err + } + + var last_sync_time int64 + + last_sync_time = infoResponse.LastSyncTime + + return last_sync_time, nil +} + func getReplicationState(instance *replicationv1alpha1.VolumeReplication) replicationv1alpha1.State { switch instance.Spec.ReplicationState { case replicationv1alpha1.Primary: diff --git a/internal/client/fake/replication-client.go b/internal/client/fake/replication-client.go index a7dba7c34..dbdfe3acf 100644 --- a/internal/client/fake/replication-client.go +++ b/internal/client/fake/replication-client.go @@ -30,6 +30,8 @@ type ReplicationClient struct { DemoteVolumeMock func(volumeID, replicationID string, secretName, secretNamespace string, parameters map[string]string) (*proto.DemoteVolumeResponse, error) // ResyncVolumeMock mocks ResyncVolume RPC call. ResyncVolumeMock func(volumeID, replicationID string, secretName, secretNamespace string, parameters map[string]string) (*proto.ResyncVolumeResponse, error) + // GetVolumeReplicationInfo mocks GetVolumeReplicationInfo RPC call. + GetVolumeReplicationInfoMock func(volumeID, replicationID string, secretName, secretNamespace string, parameters map[string]string) (*proto.GetVolumeReplicationInfoResponse, error) } // EnableVolumeReplication calls EnableVolumeReplicationMock mock function. @@ -87,3 +89,14 @@ func (rc *ReplicationClient) ResyncVolume( error) { return rc.ResyncVolumeMock(volumeID, replicationID, secretName, secretNamespace, parameters) } + +// GetVolumeReplicationInfo calls GetVolumeReplicationInfoMock function. +func (rc *ReplicationClient) GetVolumeReplicationInfo( + volumeID, + replicationID string, + secretName, secretNamespace string, + parameters map[string]string) ( + *proto.GetVolumeReplicationInfoResponse, + error) { + return rc.GetVolumeReplicationInfoMock(volumeID, replicationID, secretName, secretNamespace, parameters) +} diff --git a/internal/client/replication-client.go b/internal/client/replication-client.go index de3cc4f7e..2ad9581a9 100644 --- a/internal/client/replication-client.go +++ b/internal/client/replication-client.go @@ -45,6 +45,8 @@ type VolumeReplication interface { // ResyncVolume RPC call to resync the volume. ResyncVolume(volumeID, replicationID string, force bool, secretName, secretNamespace string, parameters map[string]string) (*proto. ResyncVolumeResponse, error) + // GetVolumeReplicationInfo RPC call to get volume replication info. + GetVolumeReplicationInfo(volumeID, replicationID string, secretName, secretNamespace string, parameters map[string]string) (*proto.GetVolumeReplicationInfoResponse, error) } // NewReplicationClient returns VolumeReplication interface which has the RPC @@ -143,3 +145,21 @@ func (rc *replicationClient) ResyncVolume(volumeID, replicationID string, force return resp, err } + +// GetVolumeReplicationInfo RPC call to get volume replication info. +func (rc *replicationClient) GetVolumeReplicationInfo(volumeID, replicationID string, + secretName, secretNamespace string, parameters map[string]string) (*proto.GetVolumeReplicationInfoResponse, error) { + req := &proto.GetVolumeReplicationInfoRequest{ + VolumeId: volumeID, + ReplicationId: replicationID, + Parameters: parameters, + SecretName: secretName, + SecretNamespace: secretNamespace, + } + + createCtx, cancel := context.WithTimeout(context.Background(), rc.timeout) + defer cancel() + resp, err := rc.client.GetVolumeReplicationInfo(createCtx, req) + + return resp, err +} diff --git a/internal/sidecar/service/volumereplication.go b/internal/sidecar/service/volumereplication.go index 6b7d1146b..6ef50bc0a 100644 --- a/internal/sidecar/service/volumereplication.go +++ b/internal/sidecar/service/volumereplication.go @@ -191,3 +191,32 @@ func (rs *ReplicationServer) ResyncVolume( Ready: resp.Ready, }, nil } + +// GetVolumeReplicationInfo fetches required information from kubernetes cluster and calls +// CSI-Addons GetVolumeReplicationInfo service. +func (rs *ReplicationServer) GetVolumeReplicationInfo( + ctx context.Context, + req *proto.GetVolumeReplicationInfoRequest) (*proto.GetVolumeReplicationInfoResponse, error) { + // Get the secrets from the k8s cluster + data, err := kube.GetSecret(ctx, rs.kubeClient, req.GetSecretName(), req.GetSecretNamespace()) + if err != nil { + klog.Errorf("Failed to get secret %s in namespace %s: %v", req.GetSecretName(), req.GetSecretNamespace(), err) + return nil, status.Error(codes.Internal, err.Error()) + } + + resp, err := rs.controllerClient.GetVolumeReplicationInfo(ctx, + &csiReplication.GetVolumeReplicationInfoRequest{ + VolumeId: req.VolumeId, + ReplicationId: req.ReplicationId, + Parameters: req.Parameters, + Secrets: data, + }) + if err != nil { + klog.Errorf("Failed to enable volume replication: %v", err) + return nil, err + } + + return &proto.GetVolumeReplicationInfoResponse{ + LastSyncTime: resp.last_sync_time, + }, nil +}