package alluxio.worker.block;

import alluxio.Configuration;
import alluxio.Sessions;
import alluxio.exception.AlluxioException;
import alluxio.exception.BlockAlreadyExistsException;
import alluxio.exception.BlockDoesNotExistException;
import alluxio.exception.ConnectionFailedException;
import alluxio.exception.InvalidWorkerStateException;
import alluxio.exception.WorkerOutOfSpaceException;
import alluxio.heartbeat.HeartbeatThread;
import alluxio.thrift.BlockWorkerClientService;
import alluxio.util.CommonUtils;
import alluxio.util.ThreadFactoryUtils;
import alluxio.util.io.FileUtils;
import alluxio.util.network.NetworkAddressUtils;
import alluxio.wire.FileInfo;
import alluxio.wire.WorkerNetAddress;
import alluxio.worker.AbstractWorker;
import alluxio.worker.DataServer;
import alluxio.worker.WorkerContext;
import alluxio.worker.WorkerIdRegistry;
import alluxio.worker.block.io.BlockReader;
import alluxio.worker.block.io.BlockWriter;
import alluxio.worker.block.meta.BlockMeta;
import alluxio.worker.file.FileSystemMasterClient;
import com.google.common.base.Throwables;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executors;
import javax.annotation.concurrent.NotThreadSafe;
import org.apache.thrift.TProcessor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Worker-side facade over a {@link BlockStore}: it wires the store to the master clients, the
 * data server and the periodic background tasks, and exposes the block operations by delegating
 * to the store.
 *
 * <p>Most methods are thin pass-throughs; their arguments are forwarded to the {@link BlockStore}
 * method of the same (or a closely related) name in the order they are received.
 */
@NotThreadSafe
public final class BlockWorker extends AbstractWorker {
    private static final Logger LOG = LoggerFactory.getLogger("alluxio.logger.type");

    // Background tasks; created in start(), so they are null until start() has run.
    private BlockMasterSync mBlockMasterSync;
    private PinListSync mPinListSync;
    private SessionCleaner mSessionCleanerThread;
    // Only created in start() when "alluxio.worker.tieredstore.reserver.enabled" is true.
    private SpaceReserver mSpaceReserver;

    private final BlockWorkerClientServiceHandler mServiceHandler;
    private final DataServer mDataServer;
    private final BlockMasterClient mBlockMasterClient;
    private final FileSystemMasterClient mFileSystemMasterClient;
    private final Configuration mConf;
    private final BlockHeartbeatReporter mHeartbeatReporter;
    private final BlockMetricsReporter mMetricsReporter;
    private final Sessions mSessions;
    private final BlockStore mBlockStore;

    /**
     * @return the block store this worker delegates to
     */
    public BlockStore getBlockStore() {
        return this.mBlockStore;
    }

    /**
     * @return the handler registered for the "BlockWorkerClient" service
     */
    public BlockWorkerClientServiceHandler getWorkerServiceHandler() {
        return this.mServiceHandler;
    }

    /**
     * @return the bind host reported by the data server
     */
    public String getDataBindHost() {
        return this.mDataServer.getBindHost();
    }

    /**
     * @return the port reported by the data server
     */
    public int getDataLocalPort() {
        return this.mDataServer.getPort();
    }

    /**
     * Creates the worker: builds both master clients and the data server from the worker
     * configuration, then creates the block store and registers the heartbeat and metrics
     * reporters as its event listeners.
     *
     * @throws IOException declared by the constructor; propagated from the collaborators it builds
     */
    public BlockWorker() throws IOException {
        super(Executors.newFixedThreadPool(4, ThreadFactoryUtils.build("block-worker-heartbeat-%d", true)));
        this.mConf = WorkerContext.getConf();
        this.mBlockMasterClient = new BlockMasterClient(NetworkAddressUtils.getConnectAddress(NetworkAddressUtils.ServiceType.MASTER_RPC, this.mConf), this.mConf);
        this.mFileSystemMasterClient = new FileSystemMasterClient(NetworkAddressUtils.getConnectAddress(NetworkAddressUtils.ServiceType.MASTER_RPC, this.mConf), this.mConf);
        // NOTE(review): `this` is handed to the data server before the remaining fields are set;
        // the statement order is kept unchanged on purpose.
        this.mDataServer = DataServer.Factory.create(NetworkAddressUtils.getBindAddress(NetworkAddressUtils.ServiceType.WORKER_DATA, this.mConf), this, this.mConf);
        // Write the port the data server actually reports back into the configuration.
        this.mConf.set("alluxio.worker.data.port", Integer.toString(this.mDataServer.getPort()));
        this.mServiceHandler = new BlockWorkerClientServiceHandler(this);
        this.mHeartbeatReporter = new BlockHeartbeatReporter();
        this.mMetricsReporter = new BlockMetricsReporter(WorkerContext.getWorkerSource());
        this.mSessions = new Sessions();
        this.mBlockStore = new TieredBlockStore();
        this.mBlockStore.registerBlockStoreEventListener(this.mHeartbeatReporter);
        this.mBlockStore.registerBlockStoreEventListener(this.mMetricsReporter);
    }

    /**
     * @return a fresh map holding the single "BlockWorkerClient" service processor
     */
    @Override // alluxio.worker.Worker
    public Map<String, TProcessor> getServices() {
        Map<String, TProcessor> services = new HashMap<String, TProcessor>();
        services.put("BlockWorkerClient", new BlockWorkerClientService.Processor(getWorkerServiceHandler()));
        return services;
    }

    /**
     * Registers this worker with the block master, creates the background tasks and submits them
     * to the executor service.
     *
     * @throws IOException declared by the worker interface
     * @throws RuntimeException (via {@link Throwables#propagate}) when registration fails with a
     *         {@link ConnectionFailedException}
     */
    @Override // alluxio.worker.Worker
    public void start() throws IOException {
        try {
            WorkerNetAddress netAddress = WorkerContext.getNetAddress();
            WorkerIdRegistry.registerWithBlockMaster(this.mBlockMasterClient, netAddress);
            this.mBlockMasterSync = new BlockMasterSync(this, netAddress, this.mBlockMasterClient);
            this.mPinListSync = new PinListSync(this, this.mFileSystemMasterClient);
            this.mSessionCleanerThread = new SessionCleaner(this);
            if (this.mConf.getBoolean("alluxio.worker.tieredstore.reserver.enabled")) {
                this.mSpaceReserver = new SpaceReserver(this);
            }

            // Both heartbeat threads use the same interval setting.
            int heartbeatIntervalMs = WorkerContext.getConf().getInt("alluxio.worker.block.heartbeat.interval.ms");
            Runnable blockSync = new HeartbeatThread("Worker Block Sync", this.mBlockMasterSync, heartbeatIntervalMs);
            Runnable pinListSync = new HeartbeatThread("Worker Pin List Sync", this.mPinListSync, heartbeatIntervalMs);
            getExecutorService().submit(blockSync);
            getExecutorService().submit(pinListSync);
            getExecutorService().submit(this.mSessionCleanerThread);
            if (this.mSpaceReserver != null) {
                getExecutorService().submit(this.mSpaceReserver);
            }
        } catch (ConnectionFailedException e) {
            LOG.error("Failed to get a worker id from block master", e);
            throw Throwables.propagate(e);
        }
    }

    /**
     * Stops the background tasks, closes both master clients, shuts the executor service down and
     * keeps closing the data server until it reports closed.
     *
     * <p>Safe to call even if {@link #start()} never completed: tasks that were never created are
     * skipped instead of failing with a {@link NullPointerException}.
     *
     * @throws IOException declared by the worker interface; propagated from the resources closed
     */
    @Override // alluxio.worker.Worker
    public void stop() throws IOException {
        this.mDataServer.close();
        // The session cleaner only exists once start() has run.
        if (this.mSessionCleanerThread != null) {
            this.mSessionCleanerThread.stop();
        }
        this.mBlockMasterClient.close();
        if (this.mSpaceReserver != null) {
            this.mSpaceReserver.stop();
        }
        this.mFileSystemMasterClient.close();
        getExecutorService().shutdownNow();
        // Poll until the data server confirms it is closed, re-issuing close() every 100 ms.
        while (!this.mDataServer.isClosed()) {
            this.mDataServer.close();
            CommonUtils.sleepMs(100L);
        }
    }

    /**
     * Delegates to {@link BlockStore#abortBlock} with both arguments in order.
     *
     * @param j first argument forwarded to the store (presumably the session id; confirm against BlockStore)
     * @param j2 second argument forwarded to the store (presumably the block id; confirm against BlockStore)
     */
    public void abortBlock(long j, long j2) throws BlockAlreadyExistsException, BlockDoesNotExistException, InvalidWorkerStateException, IOException {
        this.mBlockStore.abortBlock(j, j2);
    }

    /**
     * Delegates to {@link BlockStore#accessBlock} with both arguments in order.
     *
     * @param j first argument forwarded to the store
     * @param j2 second argument forwarded to the store
     */
    public void accessBlock(long j, long j2) throws BlockDoesNotExistException {
        this.mBlockStore.accessBlock(j, j2);
    }

    /**
     * Removes every session reported as timed out by {@link Sessions#getTimedOutSessions()} and
     * asks the block store to clean up after it.
     */
    public void cleanupSessions() {
        for (long sessionId : this.mSessions.getTimedOutSessions()) {
            this.mSessions.removeSession(sessionId);
            this.mBlockStore.cleanupSession(sessionId);
        }
    }

    /**
     * Commits the block in the local store, then reports the commit to the block master while
     * holding a lock on the block. The lock is released exactly once, whatever the outcome.
     *
     * @param j first argument forwarded to the store calls (presumably the session id; confirm against BlockStore)
     * @param j2 second argument forwarded to the store calls; also passed to the master as the committed block
     * @throws IOException if reporting to the master fails, including when the underlying failure
     *         is a {@link ConnectionFailedException}
     */
    public void commitBlock(long j, long j2) throws BlockAlreadyExistsException, BlockDoesNotExistException, InvalidWorkerStateException, IOException, WorkerOutOfSpaceException {
        this.mBlockStore.commitBlock(j, j2);
        long lockId = this.mBlockStore.lockBlock(j, j2);
        try {
            BlockMeta meta = this.mBlockStore.getBlockMeta(j, j2, lockId);
            BlockStoreLocation location = meta.getBlockLocation();
            long length = meta.getBlockSize();
            long workerId = WorkerIdRegistry.getWorkerId().longValue();
            long usedBytesOnTier = this.mBlockStore.getBlockStoreMeta().getUsedBytesOnTiers().get(location.tierAlias()).longValue();
            this.mBlockMasterClient.commitBlock(workerId, usedBytesOnTier, location.tierAlias(), j2, length);
        } catch (ConnectionFailedException e) {
            throw new IOException("Failed to commit block to master.", e);
        } catch (IOException e) {
            throw new IOException("Failed to commit block to master.", e);
        } finally {
            this.mBlockStore.unlockBlock(lockId);
        }
    }

    /**
     * Creates block metadata anywhere in the given tier and returns the path of the result.
     *
     * @param j first argument forwarded to {@link BlockStore#createBlockMeta}
     * @param j2 second argument forwarded to {@link BlockStore#createBlockMeta}
     * @param str tier alias used to build the target location
     * @param j3 last argument forwarded to {@link BlockStore#createBlockMeta} (presumably the initial size in bytes; confirm)
     * @return the path reported by the created metadata
     */
    public String createBlock(long j, long j2, String str, long j3) throws BlockAlreadyExistsException, WorkerOutOfSpaceException, IOException {
        BlockStoreLocation location = BlockStoreLocation.anyDirInTier(str);
        return this.mBlockStore.createBlockMeta(j, j2, location, j3).getPath();
    }

    /**
     * Same as {@link #createBlock} but, instead of returning the path, creates it on disk through
     * {@link FileUtils#createBlockPath}.
     *
     * @param j first argument forwarded to {@link BlockStore#createBlockMeta}
     * @param j2 second argument forwarded to {@link BlockStore#createBlockMeta}
     * @param str tier alias used to build the target location
     * @param j3 last argument forwarded to {@link BlockStore#createBlockMeta}
     */
    public void createBlockRemote(long j, long j2, String str, long j3) throws BlockAlreadyExistsException, WorkerOutOfSpaceException, IOException {
        BlockStoreLocation location = BlockStoreLocation.anyDirInTier(str);
        String blockPath = this.mBlockStore.createBlockMeta(j, j2, location, j3).getPath();
        FileUtils.createBlockPath(blockPath);
    }

    /**
     * Delegates to {@link BlockStore#freeSpace} for any directory in the given tier.
     *
     * @param j first argument forwarded to the store
     * @param j2 second argument forwarded to the store (presumably the number of bytes to free; confirm)
     * @param str tier alias used to build the target location
     */
    public void freeSpace(long j, long j2, String str) throws WorkerOutOfSpaceException, BlockDoesNotExistException, IOException, BlockAlreadyExistsException, InvalidWorkerStateException {
        this.mBlockStore.freeSpace(j, j2, BlockStoreLocation.anyDirInTier(str));
    }

    /**
     * Delegates to {@link BlockStore#getBlockWriter} with both arguments in order.
     *
     * @param j first argument forwarded to the store
     * @param j2 second argument forwarded to the store
     * @return the writer obtained from the store
     */
    public BlockWriter getTempBlockWriterRemote(long j, long j2) throws BlockDoesNotExistException, IOException {
        return this.mBlockStore.getBlockWriter(j, j2);
    }

    /**
     * @return the report generated by the heartbeat reporter
     */
    public BlockHeartbeatReport getReport() {
        return this.mHeartbeatReporter.generateReport();
    }

    /**
     * @return the block store's metadata as returned by {@link BlockStore#getBlockStoreMeta()}
     */
    public BlockStoreMeta getStoreMeta() {
        return this.mBlockStore.getBlockStoreMeta();
    }

    /**
     * Delegates to {@link BlockStore#getVolatileBlockMeta}.
     *
     * @param j argument forwarded to the store (presumably the block id; confirm)
     * @return the metadata obtained from the store
     */
    public BlockMeta getVolatileBlockMeta(long j) throws BlockDoesNotExistException {
        return this.mBlockStore.getVolatileBlockMeta(j);
    }

    /**
     * Delegates to {@link BlockStore#hasBlockMeta}.
     *
     * @param j argument forwarded to the store (presumably the block id; confirm)
     * @return whatever the store reports
     */
    public boolean hasBlockMeta(long j) {
        return this.mBlockStore.hasBlockMeta(j);
    }

    /**
     * Delegates to {@link BlockStore#lockBlock} with both arguments in order.
     *
     * @param j first argument forwarded to the store
     * @param j2 second argument forwarded to the store
     * @return the lock id returned by the store
     */
    public long lockBlock(long j, long j2) throws BlockDoesNotExistException {
        return this.mBlockStore.lockBlock(j, j2);
    }

    /**
     * Moves a block into the given tier unless its current location already belongs to it.
     *
     * <p>The block is locked only while its current location is inspected; the lock is released
     * exactly once before the move itself is requested from the store.
     *
     * @param j first argument forwarded to the store calls
     * @param j2 second argument forwarded to the store calls
     * @param str alias of the destination tier
     */
    public void moveBlock(long j, long j2, String str) throws BlockDoesNotExistException, BlockAlreadyExistsException, InvalidWorkerStateException, WorkerOutOfSpaceException, IOException {
        BlockStoreLocation destination = BlockStoreLocation.anyDirInTier(str);
        long lockId = this.mBlockStore.lockBlock(j, j2);
        try {
            BlockMeta meta = this.mBlockStore.getBlockMeta(j, j2, lockId);
            if (meta.getBlockLocation().belongsTo(destination)) {
                // Already in the requested tier: nothing to move.
                return;
            }
        } finally {
            this.mBlockStore.unlockBlock(lockId);
        }
        this.mBlockStore.moveBlock(j, j2, destination);
    }

    /**
     * Looks up the block metadata in the store and returns its path.
     *
     * @param j first argument forwarded to {@link BlockStore#getBlockMeta}
     * @param j2 second argument forwarded to {@link BlockStore#getBlockMeta}
     * @param j3 third argument forwarded to {@link BlockStore#getBlockMeta} (the lock id position used elsewhere in this class)
     * @return the path reported by the metadata
     */
    public String readBlock(long j, long j2, long j3) throws BlockDoesNotExistException, InvalidWorkerStateException {
        BlockMeta meta = this.mBlockStore.getBlockMeta(j, j2, j3);
        return meta.getPath();
    }

    /**
     * Delegates to {@link BlockStore#getBlockReader} with all arguments in order.
     *
     * @param j first argument forwarded to the store
     * @param j2 second argument forwarded to the store
     * @param j3 third argument forwarded to the store
     * @return the reader obtained from the store
     */
    public BlockReader readBlockRemote(long j, long j2, long j3) throws BlockDoesNotExistException, InvalidWorkerStateException, IOException {
        return this.mBlockStore.getBlockReader(j, j2, j3);
    }

    /**
     * Delegates to {@link BlockStore#removeBlock} with both arguments in order.
     *
     * @param j first argument forwarded to the store
     * @param j2 second argument forwarded to the store
     */
    public void removeBlock(long j, long j2) throws InvalidWorkerStateException, BlockDoesNotExistException, IOException {
        this.mBlockStore.removeBlock(j, j2);
    }

    /**
     * Delegates to {@link BlockStore#requestSpace} with all arguments in order.
     *
     * @param j first argument forwarded to the store
     * @param j2 second argument forwarded to the store
     * @param j3 third argument forwarded to the store (presumably the additional bytes requested; confirm)
     */
    public void requestSpace(long j, long j2, long j3) throws BlockDoesNotExistException, WorkerOutOfSpaceException, IOException {
        this.mBlockStore.requestSpace(j, j2, j3);
    }

    /**
     * Delegates to the single-argument {@link BlockStore#unlockBlock(long)}.
     *
     * @param j argument forwarded to the store (a lock id, as used by the other methods of this class)
     */
    public void unlockBlock(long j) throws BlockDoesNotExistException {
        this.mBlockStore.unlockBlock(j);
    }

    /**
     * Delegates to the two-argument {@code BlockStore#unlockBlock} with both arguments in order.
     *
     * @param j first argument forwarded to the store
     * @param j2 second argument forwarded to the store
     */
    public void unlockBlock(long j, long j2) throws BlockDoesNotExistException {
        this.mBlockStore.unlockBlock(j, j2);
    }

    /**
     * Records a heartbeat for a session and pushes the supplied values to the metrics reporter.
     *
     * @param j session identifier passed to {@link Sessions#sessionHeartbeat}
     * @param list values passed to {@code BlockMetricsReporter#updateClientMetrics}
     */
    public void sessionHeartbeat(long j, List<Long> list) {
        this.mSessions.sessionHeartbeat(j);
        this.mMetricsReporter.updateClientMetrics(list);
    }

    /**
     * Delegates to {@link BlockStore#updatePinnedInodes}.
     *
     * @param set the set forwarded to the store
     */
    public void updatePinList(Set<Long> set) {
        this.mBlockStore.updatePinnedInodes(set);
    }

    /**
     * Fetches file information through the file system master client.
     *
     * @param j argument forwarded to {@code FileSystemMasterClient#getFileInfo}
     * @return the file information returned by the master client
     * @throws IOException if the client fails; an {@link AlluxioException} is wrapped as the cause
     */
    public FileInfo getFileInfo(long j) throws IOException {
        try {
            return this.mFileSystemMasterClient.getFileInfo(j);
        } catch (AlluxioException e) {
            throw new IOException(e);
        }
    }
}
