import { eventChannel } from "redux-saga";
import { call, take } from "redux-saga/effects";
import { implementPromiseAction } from "@adobe/redux-saga-promise";
import { sendSocketMessage } from "./ws.saga";
import { socketApiEvents } from "../socket/socket.actions";
import {
  ORCHESTRATION_COMPLETED,
  ORCHESTRATION_STEP_STARTED,
  ORCHESTRATION_TRIGGERED,
} from "../executions/constants";

const EXECUTION_UPDATE = "orchestration:execution-update";

/**
 * @typedef OrchestrationTriggeredMessage
 * @type {{
 *  correlationId: string,
 *  executionId: string,
 *  precedingMessageId: string,
 *  topic: string
 *}}
 */

/**
 * Listens for execution-update messages when an orchestration is triggered
 * @param {import("socket.io-client").Socket} socket
 * @returns {import("redux-saga").EventChannel<OrchestrationTriggeredMessage>}
 */
export const createOrcTriggeredChannel = (socket) =>
  eventChannel((emit) => {
    const handleEvent = (data) => {
      if (
        [
          ORCHESTRATION_TRIGGERED,
          ORCHESTRATION_STEP_STARTED,
          ORCHESTRATION_COMPLETED,
        ].includes(data.topic)
      ) {
        emit(data);
      }
    };
    socket.on(EXECUTION_UPDATE, handleEvent);
    return () => {
      socket.off(EXECUTION_UPDATE, handleEvent);
    };
  });

/**
 * @param {{orchestrationId: string, data: any}} message
 * @returns {Promise<{messageId: string}>}
 */
const triggerOrchestration = (message) => {
  return sendSocketMessage(socketApiEvents.MANUAL_TRIGGER, message);
};

/**
 * Emits a manual-trigger message and then waits for the corresponding execution id
 * @param {import("redux-saga").EventChannel<OrchestrationTriggeredMessage>} chan
 * @param {{payload: { orchestrationId: string, data: any }}} action
 */
export function* triggerAndWaitForExecution(chan, action) {
  return yield call(implementPromiseAction, action, function* () {
    const result = yield call(triggerOrchestration, action.payload);
    let executionId = null;
    while (true) {
      const executionUpdate = yield take(chan);
      if (executionUpdate.precedingMessageId === result.messageId) {
        executionId = executionUpdate.executionId;
      }
      if (
        executionId === executionUpdate.executionId &&
        executionUpdate.topic !== ORCHESTRATION_TRIGGERED
      ) {
        return executionUpdate;
      }
    }
  });
}
