WebsocketDemoActivity.java 5.18 KB
package com.huaheng.mobilewms.websocket;

import android.annotation.SuppressLint;
import android.content.Intent;
import android.os.Bundle;
import android.util.Log;
import android.view.View;
import android.widget.Button;
import android.widget.EditText;
import android.widget.TextView;

import com.huaheng.mobilewms.R;
import com.huaheng.mobilewms.WMSApplication;
import com.huaheng.mobilewms.activity.model.CommonActivity;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;

import io.reactivex.schedulers.Schedulers;
import rx.android.schedulers.AndroidSchedulers;
import ua.naiksoftware.stomp.Stomp;
import ua.naiksoftware.stomp.StompClient;
import ua.naiksoftware.stomp.dto.StompCommand;
import ua.naiksoftware.stomp.dto.StompHeader;
import ua.naiksoftware.stomp.dto.StompMessage;

public class WebsocketDemoActivity extends CommonActivity implements View.OnClickListener {
    private Button btConnect;
    private Button btDisconnect;
    private Button btSend;
    private Button btService;
    private EditText edWebsockUri;
    private StompClient mStompClient;
    private EditText edTextSend;
    private TextView textEcho;

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_websocket_demo2);
        btConnect = findViewById(R.id.btConnect);
        btDisconnect = findViewById(R.id.btDisconnect);
        btSend = findViewById(R.id.btSend);
        btService = findViewById(R.id.btService);
        edWebsockUri = findViewById(R.id.websockUri);
        edTextSend = findViewById(R.id.textSend);
        textEcho = findViewById(R.id.textEcho);

        btConnect.setOnClickListener(this);
        btDisconnect.setOnClickListener(this);
        btSend.setOnClickListener(this);
        btService.setOnClickListener(this);
    }


    @Override
    public void onClick(View v) {
        switch (v.getId()) {
            case R.id.btConnect:
                createStompClient(edWebsockUri.getText().toString());
                break;
            case R.id.btDisconnect:
                if(mStompClient != null && mStompClient.isConnected()) {
                    mStompClient.disconnect();
                }
                break;
            case R.id.btSend:
                sendMessage(WebsocketConstants.UP_STRING_MESSAGE_DEST, edTextSend.getText().toString());
                break;
            case R.id.btService:
                Intent intent = new Intent(this, WebsocketService.class);
                startService(intent);
                break;
            default:
                break;
        }
    }

    private void sendMessage(String uri, String msg){
        mStompClient.send(uri, msg).subscribe();
    }

    /**
     * 回复收到消息
     * @param msg
     */
    private void confirmMessage(StompMessage msg){
        for(StompHeader header: msg.getStompHeaders()){
            if(header.getKey().equals(WebsocketConstants.HEADER_MEG_ID)){
                String msg_id = header.getValue();
                List<StompHeader> headers = new ArrayList<>();
//                headers.add(new StompHeader(WebsocketConstants.HEADER_MEG_ID, msg_id));
                headers.add(new StompHeader(StompHeader.DESTINATION, WebsocketConstants.UP_CONFIRM_MESSAGE_DEST));
                mStompClient.send(new StompMessage(StompCommand.SEND, headers, null)).subscribe();
            }
        }
    }

    @SuppressLint("CheckResult")
    private void createStompClient(String uri) {
        HashMap<String, String> headers = new HashMap<>();
        headers.put("Cookie", WMSApplication.getOkhttpCookie());
        mStompClient = Stomp.over(Stomp.ConnectionProvider.OKHTTP, uri, headers);
        mStompClient.connect();
        mStompClient.withClientHeartbeat(WebsocketConstants.HEART_BEAT_CLIENT).withServerHeartbeat(WebsocketConstants.HEART_BEAT_SERVER);
        mStompClient.lifecycle().subscribe(lifecycleEvent -> {
            switch (lifecycleEvent.getType()) {

                case OPENED:
                    Log.d("TAG", "Stomp connection opened");
                    mStompClient.topic(WebsocketConstants.SUBSCRIBE_TOPIC_MESSAGE)
                            .subscribeOn(Schedulers.io())
//                            .observeOn(AndroidSchedulers.mainThread())
                            .subscribe(topicMessage -> {
//                                showShort(topicMessage.getPayload());
                            });
                    mStompClient.topic(WebsocketConstants.SUBSCRIBE_USER_TOPIC_MESSAGE)
                            .subscribeOn(Schedulers.io())
//                            .observeOn(AndroidSchedulers.mainThread())
                            .subscribe(topicMessage -> {
//                                showShort(topicMessage.getPayload());
                                confirmMessage(topicMessage);
                            });
                    break;

                case ERROR:
                    Log.e("TAG", "Error", lifecycleEvent.getException());
//                    startActivity(LoginActivity.class);
                    break;

                case CLOSED:
                    Log.d("TAG", "Stomp connection closed");
                    break;
            }
        });

    }
}