RollingFileMessageWriter.java
/*
* Copyright 2015-2016 Providence Authors
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package net.morimekta.providence.logging;
import net.morimekta.providence.PMessage;
import net.morimekta.providence.PMessageOrBuilder;
import net.morimekta.providence.PServiceCall;
import net.morimekta.providence.serializer.Serializer;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
/**
* A simple rolling file message writer in the same manner that logging
* often does, e.g. the 'RollingFileAppender' from lockback.
* <p>
* the message writer MUST be assigned a rolling policy, and MAY be
* assigned a cleanup policy. Note that the cleanup policy will only be
* triggered when the rolling policy triggers a file update.
* <p>
* Also note that the RollingFileMessageWriter is NOT thread safe. So
* if you need to write to the message writer from multiple threads, you
* will either have to synchronize the calls yourself, or use the
* {@link QueuedMessageWriter}.
*/
public class RollingFileMessageWriter implements MessageWriter {
/**
* Create a rolling file message writer without a cleanup policy.
*
* @param directory The directory to place the message files into.
* @param serializer The message serializer to use.
* @param currentName The name of the current file symbolic link.
* @param rollingPolicy The rolling policy.
*/
public RollingFileMessageWriter(@Nonnull File directory,
@Nonnull Serializer serializer,
@Nonnull String currentName,
@Nonnull RollingPolicy rollingPolicy) {
this(directory, serializer, currentName, rollingPolicy, null);
}
/**
* Create a rolling file message writer.
*
* @param directory The directory to place the message files into.
* @param serializer The message serializer to use.
* @param currentName The name of the current file symbolic link.
* @param rollingPolicy The rolling policy.
* @param cleanupPolicy Optional cleanup policy.
*/
public RollingFileMessageWriter(@Nonnull File directory,
@Nonnull Serializer serializer,
@Nonnull String currentName,
@Nonnull RollingPolicy rollingPolicy,
@Nullable CleanupPolicy cleanupPolicy) {
try {
this.directory = directory.getCanonicalFile().getAbsoluteFile();
this.serializer = serializer;
this.currentName = currentName;
this.rollingPolicy = rollingPolicy;
this.cleanupPolicy = cleanupPolicy;
Files.createDirectories(directory.toPath());
} catch (IOException e) {
throw new UncheckedIOException(e.getMessage(), e);
}
}
/**
* Interface for rolling policy implementations.
*/
@FunctionalInterface
public interface RollingPolicy {
/**
* Maybe call the current file updater.
* @param onRollFile The current file updater to call if the current file
* should roll over.
* @param initialCall If this is the initial call, and the current
* file updater should be called regardless.
* @throws IOException If the file roll or update check failed.
*/
void maybeUpdateCurrentFile(@Nonnull CurrentFileUpdater onRollFile,
boolean initialCall) throws IOException;
}
/**
* Interface for calling back to the rolling file message writen when a file roll
* is supposed to happen.
*/
@FunctionalInterface
public interface CurrentFileUpdater {
void updateCurrentFile(@Nonnull String newFileName) throws IOException;
}
/**
* Interface for cleanup policy implementations.
*/
@FunctionalInterface
public interface CleanupPolicy {
/**
* Get a list of files that needs to be deleted because of the cleanup policy.
*
* @param candidateFiles List of the files that can be cleaned up. This does NOT
* include the currently written files (current file and
* symlink).
* @param currentFileName The current file name.
* @return List of files that needs to be deleted.
*/
@Nonnull List<String> getFilesToDelete(@Nonnull List<String> candidateFiles,
@Nonnull String currentFileName);
}
@Override
public <Message extends PMessage<Message>> int write(PMessageOrBuilder<Message> message)
throws IOException {
FileMessageWriter writer = getWriter();
int i = writer.write(message);
i += writer.separator();
return i;
}
@Override
public <Message extends PMessage<Message>> int write(PServiceCall<Message> call)
throws IOException {
FileMessageWriter writer = getWriter();
int i = writer.write(call);
i += writer.separator();
return i;
}
@Override
public int separator() {
return 0;
}
@Override
public void close() throws IOException {
if (currentWriter != null) {
try {
currentWriter.close();
} finally {
currentWriter = null;
}
}
}
private final Serializer serializer;
private final File directory;
private final String currentName;
private final RollingPolicy rollingPolicy;
private final CleanupPolicy cleanupPolicy;
private File currentFile;
private FileMessageWriter currentWriter;
private boolean shouldDoCleanup;
private void updateWriter(String rollToFile) throws IOException {
if (rollToFile.contains(File.separator)) {
throw new IllegalArgumentException("rolling file path " + rollToFile + " is not contained in output directory.");
}
close(); // close the old writer, it it was opened.
currentFile = new File(directory, rollToFile);
Path link = new File(directory, currentName).toPath();
currentWriter = new FileMessageWriter(currentFile, serializer, true);
currentWriter.getOutputStream(); // triggers creation of the file.
if (!rollToFile.equals(currentName)) {
// This should result in an atomic switch from old to new "current" logfile.
Path tmp = Files.createTempFile(directory.toPath(), ".pvd.", ".link");
Files.deleteIfExists(tmp);
Files.createSymbolicLink(tmp, currentFile.toPath());
Files.move(tmp, link, StandardCopyOption.REPLACE_EXISTING);
}
if (cleanupPolicy != null) {
shouldDoCleanup = true;
}
}
private FileMessageWriter getWriter() throws IOException {
rollingPolicy.maybeUpdateCurrentFile(this::updateWriter, currentWriter == null);
if (currentWriter == null) {
updateWriter(currentName);
}
if (shouldDoCleanup) {
shouldDoCleanup = false;
String[] files = directory.list();
if (files != null && files.length > 2) {
// More than the currentFile and the symlink.
List<String> toDelete = cleanupPolicy.getFilesToDelete(
Arrays.stream(files)
.filter(f -> !f.equals(currentName) && !f.equals(currentFile.getName()))
.collect(Collectors.toList()),
currentFile.getName());
toDelete.forEach(del -> {
try {
// Delete if exists, just in case the file was deleted by someone else
// while we figured out.
Files.deleteIfExists(new File(directory, del).toPath());
} catch (IOException e) {
throw new UncheckedIOException(e.getMessage(), e);
}
});
}
}
return currentWriter;
}
}